mdns_sd/
service_daemon.rs

1//! Service daemon for mDNS Service Discovery.
2
3// How DNS-based Service Discovery works in a nutshell:
4//
5// (excerpt from RFC 6763)
6// .... that a particular service instance can be
7//    described using a DNS SRV [RFC2782] and DNS TXT [RFC1035] record.
8//    The SRV record has a name of the form "<Instance>.<Service>.<Domain>"
9//    and gives the target host and port where the service instance can be
10//    reached.  The DNS TXT record of the same name gives additional
11//    information about this instance, in a structured form using key/value
12//    pairs, described in Section 6.  A client discovers the list of
13//    available instances of a given service type using a query for a DNS
14//    PTR [RFC1035] record with a name of the form "<Service>.<Domain>",
15//    which returns a set of zero or more names, which are the names of the
16//    aforementioned DNS SRV/TXT record pairs.
17//
18// Some naming conventions in this source code:
19//
20// `ty_domain` refers to service type together with domain name, i.e. <service>.<domain>.
21// Every <service> consists of two labels: service itself and "_udp." or "_tcp".
22// See RFC 6763 section 7 Service Names.
23//     for example: `_my-service._udp.local.`
24//
25// `fullname` refers to a full Service Instance Name, i.e. <instance>.<service>.<domain>
26//     for example: `my_home._my-service._udp.local.`
27//
28// In mDNS and DNS, the basic data structure is "Resource Record" (RR), where
29// in Service Discovery, the basic data structure is "Service Info". One Service Info
30// corresponds to a set of DNS Resource Records.
31#[cfg(feature = "logging")]
32use crate::log::{debug, trace};
33use crate::{
34    dns_cache::{current_time_millis, DnsCache},
35    dns_parser::{
36        ip_address_rr_type, DnsAddress, DnsEntryExt, DnsIncoming, DnsOutgoing, DnsPointer,
37        DnsRecordBox, DnsRecordExt, DnsSrv, DnsTxt, RRType, CLASS_CACHE_FLUSH, CLASS_IN, FLAGS_AA,
38        FLAGS_QR_QUERY, FLAGS_QR_RESPONSE, MAX_MSG_ABSOLUTE,
39    },
40    error::{e_fmt, Error, Result},
41    service_info::{
42        split_sub_domain, valid_ip_on_intf, DnsRegistry, Probe, ServiceInfo, ServiceStatus,
43    },
44    Receiver,
45};
46use flume::{bounded, Sender, TrySendError};
47use if_addrs::{IfAddr, Interface};
48use mio::{net::UdpSocket as MioUdpSocket, Poll};
49use socket2::Socket;
50use std::{
51    cmp::{self, Reverse},
52    collections::{BinaryHeap, HashMap, HashSet},
53    fmt,
54    net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6, UdpSocket},
55    str, thread,
56    time::Duration,
57    vec,
58};
59
60/// The default max length of the service name without domain, not including the
61/// leading underscore (`_`). It is set to 15 per
62/// [RFC 6763 section 7.2](https://www.rfc-editor.org/rfc/rfc6763#section-7.2).
63pub const SERVICE_NAME_LEN_MAX_DEFAULT: u8 = 15;
64
65/// The default interval for checking IP changes automatically.
66pub const IP_CHECK_INTERVAL_IN_SECS_DEFAULT: u32 = 5;
67
68/// The default time out for [ServiceDaemon::verify] is 10 seconds, per
69/// [RFC 6762 section 10.4](https://datatracker.ietf.org/doc/html/rfc6762#section-10.4)
70pub const VERIFY_TIMEOUT_DEFAULT: Duration = Duration::from_secs(10);
71
72const MDNS_PORT: u16 = 5353;
73const GROUP_ADDR_V4: Ipv4Addr = Ipv4Addr::new(224, 0, 0, 251);
74const GROUP_ADDR_V6: Ipv6Addr = Ipv6Addr::new(0xff02, 0, 0, 0, 0, 0, 0, 0xfb);
75const LOOPBACK_V4: Ipv4Addr = Ipv4Addr::new(127, 0, 0, 1);
76
77const RESOLVE_WAIT_IN_MILLIS: u64 = 500;
78
79/// Response status code for the service `unregister` call.
80#[derive(Debug)]
81pub enum UnregisterStatus {
82    /// Unregister was successful.
83    OK,
84    /// The service was not found in the registration.
85    NotFound,
86}
87
88/// Status code for the service daemon.
89#[derive(Debug, PartialEq, Clone, Eq)]
90#[non_exhaustive]
91pub enum DaemonStatus {
92    /// The daemon is running as normal.
93    Running,
94
95    /// The daemon has been shutdown.
96    Shutdown,
97}
98
99/// Different counters included in the metrics.
100/// Currently all counters are for outgoing packets.
101#[derive(Hash, Eq, PartialEq)]
102enum Counter {
103    Register,
104    RegisterResend,
105    Unregister,
106    UnregisterResend,
107    Browse,
108    ResolveHostname,
109    Respond,
110    CacheRefreshPTR,
111    CacheRefreshSRV,
112    CacheRefreshAddr,
113    KnownAnswerSuppression,
114    CachedPTR,
115    CachedSRV,
116    CachedAddr,
117    CachedTxt,
118    CachedNSec,
119    CachedSubtype,
120    DnsRegistryProbe,
121    DnsRegistryActive,
122    DnsRegistryTimer,
123    DnsRegistryNameChange,
124    Timer,
125}
126
127impl fmt::Display for Counter {
128    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
129        match self {
130            Self::Register => write!(f, "register"),
131            Self::RegisterResend => write!(f, "register-resend"),
132            Self::Unregister => write!(f, "unregister"),
133            Self::UnregisterResend => write!(f, "unregister-resend"),
134            Self::Browse => write!(f, "browse"),
135            Self::ResolveHostname => write!(f, "resolve-hostname"),
136            Self::Respond => write!(f, "respond"),
137            Self::CacheRefreshPTR => write!(f, "cache-refresh-ptr"),
138            Self::CacheRefreshSRV => write!(f, "cache-refresh-srv"),
139            Self::CacheRefreshAddr => write!(f, "cache-refresh-addr"),
140            Self::KnownAnswerSuppression => write!(f, "known-answer-suppression"),
141            Self::CachedPTR => write!(f, "cached-ptr"),
142            Self::CachedSRV => write!(f, "cached-srv"),
143            Self::CachedAddr => write!(f, "cached-addr"),
144            Self::CachedTxt => write!(f, "cached-txt"),
145            Self::CachedNSec => write!(f, "cached-nsec"),
146            Self::CachedSubtype => write!(f, "cached-subtype"),
147            Self::DnsRegistryProbe => write!(f, "dns-registry-probe"),
148            Self::DnsRegistryActive => write!(f, "dns-registry-active"),
149            Self::DnsRegistryTimer => write!(f, "dns-registry-timer"),
150            Self::DnsRegistryNameChange => write!(f, "dns-registry-name-change"),
151            Self::Timer => write!(f, "timer"),
152        }
153    }
154}
155
156/// The metrics is a HashMap of (name_key, i64_value).
157/// The main purpose is to help monitoring the mDNS packet traffic.
158pub type Metrics = HashMap<String, i64>;
159
160const SIGNAL_SOCK_EVENT_KEY: usize = usize::MAX - 1; // avoid to overlap with zc.poll_ids
161
162/// A daemon thread for mDNS
163///
164/// This struct provides a handle and an API to the daemon. It is cloneable.
165#[derive(Clone)]
166pub struct ServiceDaemon {
167    /// Sender handle of the channel to the daemon.
168    sender: Sender<Command>,
169
170    /// Send to this addr to signal that a `Command` is coming.
171    ///
172    /// The daemon listens on this addr together with other mDNS sockets,
173    /// to avoid busy polling the flume channel. If there is a way to poll
174    /// the channel and mDNS sockets together, then this can be removed.
175    signal_addr: SocketAddr,
176}
177
178impl ServiceDaemon {
179    /// Creates a new daemon and spawns a thread to run the daemon.
180    ///
181    /// The daemon (re)uses the default mDNS port 5353. To keep it simple, we don't
182    /// ask callers to set the port.
183    pub fn new() -> Result<Self> {
184        // Use port 0 to allow the system assign a random available port,
185        // no need for a pre-defined port number.
186        let signal_addr = SocketAddrV4::new(LOOPBACK_V4, 0);
187
188        let signal_sock = UdpSocket::bind(signal_addr)
189            .map_err(|e| e_fmt!("failed to create signal_sock for daemon: {}", e))?;
190
191        // Get the socket with the OS chosen port
192        let signal_addr = signal_sock
193            .local_addr()
194            .map_err(|e| e_fmt!("failed to get signal sock addr: {}", e))?;
195
196        // Must be nonblocking so we can listen to it together with mDNS sockets.
197        signal_sock
198            .set_nonblocking(true)
199            .map_err(|e| e_fmt!("failed to set nonblocking for signal socket: {}", e))?;
200
201        let poller = Poll::new().map_err(|e| e_fmt!("failed to create mio Poll: {e}"))?;
202
203        let (sender, receiver) = bounded(100);
204
205        // Spawn the daemon thread
206        let mio_sock = MioUdpSocket::from_std(signal_sock);
207        thread::Builder::new()
208            .name("mDNS_daemon".to_string())
209            .spawn(move || Self::daemon_thread(mio_sock, poller, receiver))
210            .map_err(|e| e_fmt!("thread builder failed to spawn: {}", e))?;
211
212        Ok(Self {
213            sender,
214            signal_addr,
215        })
216    }
217
218    /// Sends `cmd` to the daemon via its channel, and sends a signal
219    /// to its sock addr to notify.
220    fn send_cmd(&self, cmd: Command) -> Result<()> {
221        let cmd_name = cmd.to_string();
222
223        // First, send to the flume channel.
224        self.sender.try_send(cmd).map_err(|e| match e {
225            TrySendError::Full(_) => Error::Again,
226            e => e_fmt!("flume::channel::send failed: {}", e),
227        })?;
228
229        // Second, send a signal to notify the daemon.
230        let addr = SocketAddrV4::new(LOOPBACK_V4, 0);
231        let socket = UdpSocket::bind(addr)
232            .map_err(|e| e_fmt!("Failed to create socket to send signal: {}", e))?;
233        socket
234            .send_to(cmd_name.as_bytes(), self.signal_addr)
235            .map_err(|e| {
236                e_fmt!(
237                    "signal socket send_to {} ({}) failed: {}",
238                    self.signal_addr,
239                    cmd_name,
240                    e
241                )
242            })?;
243
244        Ok(())
245    }
246
247    /// Starts browsing for a specific service type.
248    ///
249    /// `service_type` must end with a valid mDNS domain: '._tcp.local.' or '._udp.local.'
250    ///
251    /// Returns a channel `Receiver` to receive events about the service. The caller
252    /// can call `.recv_async().await` on this receiver to handle events in an
253    /// async environment or call `.recv()` in a sync environment.
254    ///
255    /// When a new instance is found, the daemon automatically tries to resolve, i.e.
256    /// finding more details, i.e. SRV records and TXT records.
257    pub fn browse(&self, service_type: &str) -> Result<Receiver<ServiceEvent>> {
258        check_domain_suffix(service_type)?;
259
260        let (resp_s, resp_r) = bounded(10);
261        self.send_cmd(Command::Browse(service_type.to_string(), 1, resp_s))?;
262        Ok(resp_r)
263    }
264
265    /// Stops searching for a specific service type.
266    ///
267    /// When an error is returned, the caller should retry only when
268    /// the error is `Error::Again`, otherwise should log and move on.
269    pub fn stop_browse(&self, ty_domain: &str) -> Result<()> {
270        self.send_cmd(Command::StopBrowse(ty_domain.to_string()))
271    }
272
273    /// Starts querying for the ip addresses of a hostname.
274    ///
275    /// Returns a channel `Receiver` to receive events about the hostname.
276    /// The caller can call `.recv_async().await` on this receiver to handle events in an
277    /// async environment or call `.recv()` in a sync environment.
278    ///
279    /// The `timeout` is specified in milliseconds.
280    pub fn resolve_hostname(
281        &self,
282        hostname: &str,
283        timeout: Option<u64>,
284    ) -> Result<Receiver<HostnameResolutionEvent>> {
285        check_hostname(hostname)?;
286        let (resp_s, resp_r) = bounded(10);
287        self.send_cmd(Command::ResolveHostname(
288            hostname.to_string(),
289            1,
290            resp_s,
291            timeout,
292        ))?;
293        Ok(resp_r)
294    }
295
296    /// Stops querying for the ip addresses of a hostname.
297    ///
298    /// When an error is returned, the caller should retry only when
299    /// the error is `Error::Again`, otherwise should log and move on.
300    pub fn stop_resolve_hostname(&self, hostname: &str) -> Result<()> {
301        self.send_cmd(Command::StopResolveHostname(hostname.to_string()))
302    }
303
304    /// Registers a service provided by this host.
305    ///
306    /// If `service_info` has no addresses yet and its `addr_auto` is enabled,
307    /// this method will automatically fill in addresses from the host.
308    ///
309    /// To re-announce a service with an updated `service_info`, just call
310    /// this `register` function again. No need to call `unregister` first.
311    pub fn register(&self, service_info: ServiceInfo) -> Result<()> {
312        check_service_name(service_info.get_fullname())?;
313        check_hostname(service_info.get_hostname())?;
314
315        self.send_cmd(Command::Register(service_info))
316    }
317
318    /// Unregisters a service. This is a graceful shutdown of a service.
319    ///
320    /// Returns a channel receiver that is used to receive the status code
321    /// of the unregister.
322    ///
323    /// When an error is returned, the caller should retry only when
324    /// the error is `Error::Again`, otherwise should log and move on.
325    pub fn unregister(&self, fullname: &str) -> Result<Receiver<UnregisterStatus>> {
326        let (resp_s, resp_r) = bounded(1);
327        self.send_cmd(Command::Unregister(fullname.to_lowercase(), resp_s))?;
328        Ok(resp_r)
329    }
330
331    /// Starts to monitor events from the daemon.
332    ///
333    /// Returns a channel [`Receiver`] of [`DaemonEvent`].
334    pub fn monitor(&self) -> Result<Receiver<DaemonEvent>> {
335        let (resp_s, resp_r) = bounded(100);
336        self.send_cmd(Command::Monitor(resp_s))?;
337        Ok(resp_r)
338    }
339
340    /// Shuts down the daemon thread and returns a channel to receive the status.
341    ///
342    /// When an error is returned, the caller should retry only when
343    /// the error is `Error::Again`, otherwise should log and move on.
344    pub fn shutdown(&self) -> Result<Receiver<DaemonStatus>> {
345        let (resp_s, resp_r) = bounded(1);
346        self.send_cmd(Command::Exit(resp_s))?;
347        Ok(resp_r)
348    }
349
350    /// Returns the status of the daemon.
351    ///
352    /// When an error is returned, the caller should retry only when
353    /// the error is `Error::Again`, otherwise should consider the daemon
354    /// stopped working and move on.
355    pub fn status(&self) -> Result<Receiver<DaemonStatus>> {
356        let (resp_s, resp_r) = bounded(1);
357
358        if self.sender.is_disconnected() {
359            resp_s
360                .send(DaemonStatus::Shutdown)
361                .map_err(|e| e_fmt!("failed to send daemon status to the client: {}", e))?;
362        } else {
363            self.send_cmd(Command::GetStatus(resp_s))?;
364        }
365
366        Ok(resp_r)
367    }
368
369    /// Returns a channel receiver for the metrics, e.g. input/output counters.
370    ///
371    /// The metrics returned is a snapshot. Hence the caller should call
372    /// this method repeatedly if they want to monitor the metrics continuously.
373    pub fn get_metrics(&self) -> Result<Receiver<Metrics>> {
374        let (resp_s, resp_r) = bounded(1);
375        self.send_cmd(Command::GetMetrics(resp_s))?;
376        Ok(resp_r)
377    }
378
379    /// Change the max length allowed for a service name.
380    ///
381    /// As RFC 6763 defines a length max for a service name, a user should not call
382    /// this method unless they have to. See [`SERVICE_NAME_LEN_MAX_DEFAULT`].
383    ///
384    /// `len_max` is capped at an internal limit, which is currently 30.
385    pub fn set_service_name_len_max(&self, len_max: u8) -> Result<()> {
386        const SERVICE_NAME_LEN_MAX_LIMIT: u8 = 30; // Double the default length max.
387
388        if len_max > SERVICE_NAME_LEN_MAX_LIMIT {
389            return Err(Error::Msg(format!(
390                "service name length max {} is too large",
391                len_max
392            )));
393        }
394
395        self.send_cmd(Command::SetOption(DaemonOption::ServiceNameLenMax(len_max)))
396    }
397
398    /// Change the interval for checking IP changes automatically.
399    ///
400    /// Setting the interval to 0 disables the IP check.
401    ///
402    /// See [`IP_CHECK_INTERVAL_IN_SECS_DEFAULT`] for the default interval.
403    pub fn set_ip_check_interval(&self, interval_in_secs: u32) -> Result<()> {
404        let interval_in_millis = interval_in_secs as u64 * 1000;
405        self.send_cmd(Command::SetOption(DaemonOption::IpCheckInterval(
406            interval_in_millis,
407        )))
408    }
409
410    /// Get the current interval in seconds for checking IP changes automatically.
411    pub fn get_ip_check_interval(&self) -> Result<u32> {
412        let (resp_s, resp_r) = bounded(1);
413        self.send_cmd(Command::GetOption(resp_s))?;
414
415        let option = resp_r
416            .recv_timeout(Duration::from_secs(10))
417            .map_err(|e| e_fmt!("failed to receive ip check interval: {}", e))?;
418        let ip_check_interval_in_secs = option.ip_check_interval / 1000;
419        Ok(ip_check_interval_in_secs as u32)
420    }
421
422    /// Include interfaces that match `if_kind` for this service daemon.
423    ///
424    /// For example:
425    /// ```ignore
426    ///     daemon.enable_interface("en0")?;
427    /// ```
428    pub fn enable_interface(&self, if_kind: impl IntoIfKindVec) -> Result<()> {
429        let if_kind_vec = if_kind.into_vec();
430        self.send_cmd(Command::SetOption(DaemonOption::EnableInterface(
431            if_kind_vec.kinds,
432        )))
433    }
434
435    /// Ignore/exclude interfaces that match `if_kind` for this daemon.
436    ///
437    /// For example:
438    /// ```ignore
439    ///     daemon.disable_interface(IfKind::IPv6)?;
440    /// ```
441    pub fn disable_interface(&self, if_kind: impl IntoIfKindVec) -> Result<()> {
442        let if_kind_vec = if_kind.into_vec();
443        self.send_cmd(Command::SetOption(DaemonOption::DisableInterface(
444            if_kind_vec.kinds,
445        )))
446    }
447
448    /// Enable or disable the loopback for locally sent multicast packets in IPv4.
449    ///
450    /// By default, multicast loop is enabled for IPv4. When disabled, a querier will not
451    /// receive announcements from a responder on the same host.
452    ///
453    /// Reference: <https://learn.microsoft.com/en-us/windows/win32/winsock/ip-multicast-2>
454    ///
455    /// "The Winsock version of the IP_MULTICAST_LOOP option is semantically different than
456    /// the UNIX version of the IP_MULTICAST_LOOP option:
457    ///
458    /// In Winsock, the IP_MULTICAST_LOOP option applies only to the receive path.
459    /// In the UNIX version, the IP_MULTICAST_LOOP option applies to the send path."
460    ///
461    /// Which means, in order NOT to receive localhost announcements, you want to call
462    /// this API on the querier side on Windows, but on the responder side on Unix.
463    pub fn set_multicast_loop_v4(&self, on: bool) -> Result<()> {
464        self.send_cmd(Command::SetOption(DaemonOption::MulticastLoopV4(on)))
465    }
466
467    /// Enable or disable the loopback for locally sent multicast packets in IPv6.
468    ///
469    /// By default, multicast loop is enabled for IPv6. When disabled, a querier will not
470    /// receive announcements from a responder on the same host.
471    ///
472    /// Reference: <https://learn.microsoft.com/en-us/windows/win32/winsock/ip-multicast-2>
473    ///
474    /// "The Winsock version of the IP_MULTICAST_LOOP option is semantically different than
475    /// the UNIX version of the IP_MULTICAST_LOOP option:
476    ///
477    /// In Winsock, the IP_MULTICAST_LOOP option applies only to the receive path.
478    /// In the UNIX version, the IP_MULTICAST_LOOP option applies to the send path."
479    ///
480    /// Which means, in order NOT to receive localhost announcements, you want to call
481    /// this API on the querier side on Windows, but on the responder side on Unix.
482    pub fn set_multicast_loop_v6(&self, on: bool) -> Result<()> {
483        self.send_cmd(Command::SetOption(DaemonOption::MulticastLoopV6(on)))
484    }
485
486    /// Proactively confirms whether a service instance still valid.
487    ///
488    /// This call will issue queries for a service instance's SRV record and Address records.
489    ///
490    /// For `timeout`, most users should use [VERIFY_TIMEOUT_DEFAULT]
491    /// unless there is a reason not to follow RFC.
492    ///
493    /// If no response is received within `timeout`, the current resource
494    /// records will be flushed, and if needed, `ServiceRemoved` event will be
495    /// sent to active queriers.
496    ///
497    /// Reference: [RFC 6762](https://datatracker.ietf.org/doc/html/rfc6762#section-10.4)
498    pub fn verify(&self, instance_fullname: String, timeout: Duration) -> Result<()> {
499        self.send_cmd(Command::Verify(instance_fullname, timeout))
500    }
501
502    fn daemon_thread(signal_sock: MioUdpSocket, poller: Poll, receiver: Receiver<Command>) {
503        let zc = Zeroconf::new(signal_sock, poller);
504
505        if let Some(cmd) = Self::run(zc, receiver) {
506            match cmd {
507                Command::Exit(resp_s) => {
508                    // It is guaranteed that the receiver already dropped,
509                    // i.e. the daemon command channel closed.
510                    if let Err(e) = resp_s.send(DaemonStatus::Shutdown) {
511                        debug!("exit: failed to send response of shutdown: {}", e);
512                    }
513                }
514                _ => {
515                    debug!("Unexpected command: {:?}", cmd);
516                }
517            }
518        }
519    }
520
521    /// The main event loop of the daemon thread
522    ///
523    /// In each round, it will:
524    /// 1. select the listening sockets with a timeout.
525    /// 2. process the incoming packets if any.
526    /// 3. try_recv on its channel and execute commands.
527    /// 4. announce its registered services.
528    /// 5. process retransmissions if any.
529    fn run(mut zc: Zeroconf, receiver: Receiver<Command>) -> Option<Command> {
530        // Add the daemon's signal socket to the poller.
531        if let Err(e) = zc.poller.registry().register(
532            &mut zc.signal_sock,
533            mio::Token(SIGNAL_SOCK_EVENT_KEY),
534            mio::Interest::READABLE,
535        ) {
536            debug!("failed to add signal socket to the poller: {}", e);
537            return None;
538        }
539
540        // Add mDNS sockets to the poller.
541        for (intf, sock) in zc.intf_socks.iter_mut() {
542            let key =
543                Zeroconf::add_poll_impl(&mut zc.poll_ids, &mut zc.poll_id_count, intf.clone());
544
545            if let Err(e) =
546                zc.poller
547                    .registry()
548                    .register(sock, mio::Token(key), mio::Interest::READABLE)
549            {
550                debug!("add socket of {:?} to poller: {e}", intf);
551                return None;
552            }
553        }
554
555        // Setup timer for IP checks.
556        let mut next_ip_check = if zc.ip_check_interval > 0 {
557            current_time_millis() + zc.ip_check_interval
558        } else {
559            0
560        };
561
562        if next_ip_check > 0 {
563            zc.add_timer(next_ip_check);
564        }
565
566        // Start the run loop.
567
568        let mut events = mio::Events::with_capacity(1024);
569        loop {
570            let now = current_time_millis();
571
572            let earliest_timer = zc.peek_earliest_timer();
573            let timeout = earliest_timer.map(|timer| {
574                // If `timer` already passed, set `timeout` to be 1ms.
575                let millis = if timer > now { timer - now } else { 1 };
576                Duration::from_millis(millis)
577            });
578
579            // Process incoming packets, command events and optional timeout.
580            events.clear();
581            match zc.poller.poll(&mut events, timeout) {
582                Ok(_) => zc.handle_poller_events(&events),
583                Err(e) => debug!("failed to select from sockets: {}", e),
584            }
585
586            let now = current_time_millis();
587
588            // Remove the timers if already passed.
589            zc.pop_timers_till(now);
590
591            // Remove hostname resolvers with expired timeouts.
592            for hostname in zc
593                .hostname_resolvers
594                .clone()
595                .into_iter()
596                .filter(|(_, (_, timeout))| timeout.map(|t| now >= t).unwrap_or(false))
597                .map(|(hostname, _)| hostname)
598            {
599                trace!("hostname resolver timeout for {}", &hostname);
600                call_hostname_resolution_listener(
601                    &zc.hostname_resolvers,
602                    &hostname,
603                    HostnameResolutionEvent::SearchTimeout(hostname.to_owned()),
604                );
605                call_hostname_resolution_listener(
606                    &zc.hostname_resolvers,
607                    &hostname,
608                    HostnameResolutionEvent::SearchStopped(hostname.to_owned()),
609                );
610                zc.hostname_resolvers.remove(&hostname);
611            }
612
613            // process commands from the command channel
614            while let Ok(command) = receiver.try_recv() {
615                if matches!(command, Command::Exit(_)) {
616                    zc.status = DaemonStatus::Shutdown;
617                    return Some(command);
618                }
619                zc.exec_command(command, false);
620            }
621
622            // check for repeated commands and run them if their time is up.
623            let mut i = 0;
624            while i < zc.retransmissions.len() {
625                if now >= zc.retransmissions[i].next_time {
626                    let rerun = zc.retransmissions.remove(i);
627                    zc.exec_command(rerun.command, true);
628                } else {
629                    i += 1;
630                }
631            }
632
633            // Refresh cached service records with active queriers
634            zc.refresh_active_services();
635
636            // Refresh cached A/AAAA records with active queriers
637            let mut query_count = 0;
638            for (hostname, _sender) in zc.hostname_resolvers.iter() {
639                for (hostname, ip_addr) in
640                    zc.cache.refresh_due_hostname_resolutions(hostname).iter()
641                {
642                    zc.send_query(hostname, ip_address_rr_type(ip_addr));
643                    query_count += 1;
644                }
645            }
646
647            zc.increase_counter(Counter::CacheRefreshAddr, query_count);
648
649            // check and evict expired records in our cache
650            let now = current_time_millis();
651
652            // Notify service listeners about the expired records.
653            let expired_services = zc.cache.evict_expired_services(now);
654            zc.notify_service_removal(expired_services);
655
656            // Notify hostname listeners about the expired records.
657            let expired_addrs = zc.cache.evict_expired_addr(now);
658            for (hostname, addrs) in expired_addrs {
659                call_hostname_resolution_listener(
660                    &zc.hostname_resolvers,
661                    &hostname,
662                    HostnameResolutionEvent::AddressesRemoved(hostname.clone(), addrs),
663                );
664                let instances = zc.cache.get_instances_on_host(&hostname);
665                let instance_set: HashSet<String> = instances.into_iter().collect();
666                zc.resolve_updated_instances(&instance_set);
667            }
668
669            // Send out probing queries.
670            zc.probing_handler();
671
672            // check IP changes if next_ip_check is reached.
673            if now >= next_ip_check && next_ip_check > 0 {
674                next_ip_check = now + zc.ip_check_interval;
675                zc.add_timer(next_ip_check);
676
677                zc.check_ip_changes();
678            }
679        }
680    }
681}
682
683/// Creates a new UDP socket that uses `intf` to send and recv multicast.
684fn new_socket_bind(intf: &Interface, should_loop: bool) -> Result<MioUdpSocket> {
685    // Use the same socket for receiving and sending multicast packets.
686    // Such socket has to bind to INADDR_ANY or IN6ADDR_ANY.
687    let intf_ip = &intf.ip();
688    match intf_ip {
689        IpAddr::V4(ip) => {
690            let addr = SocketAddrV4::new(Ipv4Addr::new(0, 0, 0, 0), MDNS_PORT);
691            let sock = new_socket(addr.into(), true)?;
692
693            // Join mDNS group to receive packets.
694            sock.join_multicast_v4(&GROUP_ADDR_V4, ip)
695                .map_err(|e| e_fmt!("join multicast group on addr {}: {}", intf_ip, e))?;
696
697            // Set IP_MULTICAST_IF to send packets.
698            sock.set_multicast_if_v4(ip)
699                .map_err(|e| e_fmt!("set multicast_if on addr {}: {}", ip, e))?;
700
701            if !should_loop {
702                sock.set_multicast_loop_v4(false)
703                    .map_err(|e| e_fmt!("failed to set multicast loop v4 for {ip}: {e}"))?;
704            }
705
706            // Test if we can send packets successfully.
707            let multicast_addr = SocketAddrV4::new(GROUP_ADDR_V4, MDNS_PORT).into();
708            let test_packets = DnsOutgoing::new(0).to_data_on_wire();
709            for packet in test_packets {
710                sock.send_to(&packet, &multicast_addr)
711                    .map_err(|e| e_fmt!("send multicast packet on addr {}: {}", ip, e))?;
712            }
713            Ok(MioUdpSocket::from_std(UdpSocket::from(sock)))
714        }
715        IpAddr::V6(ip) => {
716            let addr = SocketAddrV6::new(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 0), MDNS_PORT, 0, 0);
717            let sock = new_socket(addr.into(), true)?;
718
719            // Join mDNS group to receive packets.
720            sock.join_multicast_v6(&GROUP_ADDR_V6, intf.index.unwrap_or(0))
721                .map_err(|e| e_fmt!("join multicast group on addr {}: {}", ip, e))?;
722
723            // Set IPV6_MULTICAST_IF to send packets.
724            sock.set_multicast_if_v6(intf.index.unwrap_or(0))
725                .map_err(|e| e_fmt!("set multicast_if on addr {}: {}", ip, e))?;
726
727            // We are not sending multicast packets to test this socket as there might
728            // be many IPv6 interfaces on a host and could cause such send error:
729            // "No buffer space available (os error 55)".
730
731            Ok(MioUdpSocket::from_std(UdpSocket::from(sock)))
732        }
733    }
734}
735
736/// Creates a new UDP socket to bind to `port` with REUSEPORT option.
737/// `non_block` indicates whether to set O_NONBLOCK for the socket.
738fn new_socket(addr: SocketAddr, non_block: bool) -> Result<Socket> {
739    let domain = match addr {
740        SocketAddr::V4(_) => socket2::Domain::IPV4,
741        SocketAddr::V6(_) => socket2::Domain::IPV6,
742    };
743
744    let fd = Socket::new(domain, socket2::Type::DGRAM, None)
745        .map_err(|e| e_fmt!("create socket failed: {}", e))?;
746
747    fd.set_reuse_address(true)
748        .map_err(|e| e_fmt!("set ReuseAddr failed: {}", e))?;
749    #[cfg(unix)] // this is currently restricted to Unix's in socket2
750    fd.set_reuse_port(true)
751        .map_err(|e| e_fmt!("set ReusePort failed: {}", e))?;
752
753    if non_block {
754        fd.set_nonblocking(true)
755            .map_err(|e| e_fmt!("set O_NONBLOCK: {}", e))?;
756    }
757
758    fd.bind(&addr.into())
759        .map_err(|e| e_fmt!("socket bind to {} failed: {}", &addr, e))?;
760
761    trace!("new socket bind to {}", &addr);
762    Ok(fd)
763}
764
765/// Specify a UNIX timestamp in millis to run `command` for the next time.
766struct ReRun {
767    /// UNIX timestamp in millis.
768    next_time: u64,
769    command: Command,
770}
771
772/// Enum to represent the IP version.
773#[derive(Debug, Eq, Hash, PartialEq)]
774enum IpVersion {
775    V4,
776    V6,
777}
778
779/// A struct to track multicast send status for a network interface.
780#[derive(Debug, Eq, Hash, PartialEq)]
781struct MulticastSendTracker {
782    intf_index: u32,
783    ip_version: IpVersion,
784}
785
786/// Returns the multicast send tracker if the interface index is valid
787fn multicast_send_tracker(intf: &Interface) -> Option<MulticastSendTracker> {
788    match intf.index {
789        Some(index) => {
790            let ip_ver = match intf.addr {
791                IfAddr::V4(_) => IpVersion::V4,
792                IfAddr::V6(_) => IpVersion::V6,
793            };
794            Some(MulticastSendTracker {
795                intf_index: index,
796                ip_version: ip_ver,
797            })
798        }
799        None => None,
800    }
801}
802
803/// Specify kinds of interfaces. It is used to enable or to disable interfaces in the daemon.
804///
805/// Note that for ergonomic reasons, `From<&str>` and `From<IpAddr>` are implemented.
806#[derive(Debug, Clone)]
807#[non_exhaustive]
808pub enum IfKind {
809    /// All interfaces.
810    All,
811
812    /// All IPv4 interfaces.
813    IPv4,
814
815    /// All IPv6 interfaces.
816    IPv6,
817
818    /// By the interface name, for example "en0"
819    Name(String),
820
821    /// By an IPv4 or IPv6 address.
822    Addr(IpAddr),
823
824    /// 127.0.0.1 (or anything in 127.0.0.0/8), disabled by default.
825    ///
826    /// Use [ServiceDaemon::enable_interface] to support registering services on loopback interfaces,
827    /// which is required by some use cases (e.g., OSCQuery) that publish via mDNS.
828    LoopbackV4,
829
830    /// ::1/128, disabled by default.
831    LoopbackV6,
832}
833
834impl IfKind {
835    /// Checks if `intf` matches with this interface kind.
836    fn matches(&self, intf: &Interface) -> bool {
837        match self {
838            Self::All => true,
839            Self::IPv4 => intf.ip().is_ipv4(),
840            Self::IPv6 => intf.ip().is_ipv6(),
841            Self::Name(ifname) => ifname == &intf.name,
842            Self::Addr(addr) => addr == &intf.ip(),
843            Self::LoopbackV4 => intf.is_loopback() && intf.ip().is_ipv4(),
844            Self::LoopbackV6 => intf.is_loopback() && intf.ip().is_ipv6(),
845        }
846    }
847}
848
849/// The first use case of specifying an interface was to
850/// use an interface name. Hence adding this for ergonomic reasons.
851impl From<&str> for IfKind {
852    fn from(val: &str) -> Self {
853        Self::Name(val.to_string())
854    }
855}
856
857impl From<&String> for IfKind {
858    fn from(val: &String) -> Self {
859        Self::Name(val.to_string())
860    }
861}
862
863/// Still for ergonomic reasons.
864impl From<IpAddr> for IfKind {
865    fn from(val: IpAddr) -> Self {
866        Self::Addr(val)
867    }
868}
869
870/// A list of `IfKind` that can be used to match interfaces.
871pub struct IfKindVec {
872    kinds: Vec<IfKind>,
873}
874
875/// A trait that converts a type into a Vec of `IfKind`.
876pub trait IntoIfKindVec {
877    fn into_vec(self) -> IfKindVec;
878}
879
880impl<T: Into<IfKind>> IntoIfKindVec for T {
881    fn into_vec(self) -> IfKindVec {
882        let if_kind: IfKind = self.into();
883        IfKindVec {
884            kinds: vec![if_kind],
885        }
886    }
887}
888
889impl<T: Into<IfKind>> IntoIfKindVec for Vec<T> {
890    fn into_vec(self) -> IfKindVec {
891        let kinds: Vec<IfKind> = self.into_iter().map(|x| x.into()).collect();
892        IfKindVec { kinds }
893    }
894}
895
896/// Selection of interfaces.
897struct IfSelection {
898    /// The interfaces to be selected.
899    if_kind: IfKind,
900
901    /// Whether the `if_kind` should be enabled or not.
902    selected: bool,
903}
904
905/// A struct holding the state. It was inspired by `zeroconf` package in Python.
906struct Zeroconf {
907    /// Local interfaces with sockets to recv/send on these interfaces.
908    intf_socks: HashMap<Interface, MioUdpSocket>,
909
910    /// Map poll id to Interface.
911    poll_ids: HashMap<usize, Interface>,
912
913    /// Next poll id value
914    poll_id_count: usize,
915
916    /// Local registered services, keyed by service full names.
917    my_services: HashMap<String, ServiceInfo>,
918
919    /// Received DNS records.
920    cache: DnsCache,
921
922    /// Registered service records.
923    dns_registry_map: HashMap<Interface, DnsRegistry>,
924
925    /// Active "Browse" commands.
926    service_queriers: HashMap<String, Sender<ServiceEvent>>, // <ty_domain, channel::sender>
927
928    /// Active "ResolveHostname" commands.
929    ///
930    /// The timestamps are set at the future timestamp when the command should timeout.
931    /// `hostname` is case-insensitive and stored in lowercase.
932    hostname_resolvers: HashMap<String, (Sender<HostnameResolutionEvent>, Option<u64>)>, // <hostname, (channel::sender, UNIX timestamp in millis)>
933
934    /// All repeating transmissions.
935    retransmissions: Vec<ReRun>,
936
937    counters: Metrics,
938
939    /// Waits for incoming packets.
940    poller: Poll,
941
942    /// Channels to notify events.
943    monitors: Vec<Sender<DaemonEvent>>,
944
945    /// Options
946    service_name_len_max: u8,
947
948    /// Interval in millis to check IP address changes.
949    ip_check_interval: u64,
950
951    /// All interface selections called to the daemon.
952    if_selections: Vec<IfSelection>,
953
954    /// Socket for signaling.
955    signal_sock: MioUdpSocket,
956
957    /// Timestamps marking where we need another iteration of the run loop,
958    /// to react to events like retransmissions, cache refreshes, interface IP address changes, etc.
959    ///
960    /// When the run loop goes through a single iteration, it will
961    /// set its timeout to the earliest timer in this list.
962    timers: BinaryHeap<Reverse<u64>>,
963
964    status: DaemonStatus,
965
966    /// Service instances that are pending for resolving SRV and TXT.
967    pending_resolves: HashSet<String>,
968
969    /// Service instances that are already resolved.
970    resolved: HashSet<String>,
971
972    multicast_loop_v4: bool,
973
974    multicast_loop_v6: bool,
975}
976
977impl Zeroconf {
978    fn new(signal_sock: MioUdpSocket, poller: Poll) -> Self {
979        // Get interfaces.
980        let my_ifaddrs = my_ip_interfaces(false);
981
982        // Create a socket for every IP addr.
983        // Note: it is possible that `my_ifaddrs` contains the same IP addr with different interface names,
984        // or the same interface name with different IP addrs.
985        let mut intf_socks = HashMap::new();
986        let mut dns_registry_map = HashMap::new();
987
988        for intf in my_ifaddrs {
989            let sock = match new_socket_bind(&intf, true) {
990                Ok(s) => s,
991                Err(e) => {
992                    trace!("bind a socket to {}: {}. Skipped.", &intf.ip(), e);
993                    continue;
994                }
995            };
996
997            dns_registry_map.insert(intf.clone(), DnsRegistry::new());
998
999            intf_socks.insert(intf, sock);
1000        }
1001
1002        let monitors = Vec::new();
1003        let service_name_len_max = SERVICE_NAME_LEN_MAX_DEFAULT;
1004        let ip_check_interval = IP_CHECK_INTERVAL_IN_SECS_DEFAULT as u64 * 1000;
1005
1006        let timers = BinaryHeap::new();
1007
1008        // Disable loopback by default.
1009        let if_selections = vec![
1010            IfSelection {
1011                if_kind: IfKind::LoopbackV4,
1012                selected: false,
1013            },
1014            IfSelection {
1015                if_kind: IfKind::LoopbackV6,
1016                selected: false,
1017            },
1018        ];
1019
1020        let status = DaemonStatus::Running;
1021
1022        Self {
1023            intf_socks,
1024            poll_ids: HashMap::new(),
1025            poll_id_count: 0,
1026            my_services: HashMap::new(),
1027            cache: DnsCache::new(),
1028            dns_registry_map,
1029            hostname_resolvers: HashMap::new(),
1030            service_queriers: HashMap::new(),
1031            retransmissions: Vec::new(),
1032            counters: HashMap::new(),
1033            poller,
1034            monitors,
1035            service_name_len_max,
1036            ip_check_interval,
1037            if_selections,
1038            signal_sock,
1039            timers,
1040            status,
1041            pending_resolves: HashSet::new(),
1042            resolved: HashSet::new(),
1043            multicast_loop_v4: true,
1044            multicast_loop_v6: true,
1045        }
1046    }
1047
1048    fn process_set_option(&mut self, daemon_opt: DaemonOption) {
1049        match daemon_opt {
1050            DaemonOption::ServiceNameLenMax(length) => self.service_name_len_max = length,
1051            DaemonOption::IpCheckInterval(interval) => self.ip_check_interval = interval,
1052            DaemonOption::EnableInterface(if_kind) => self.enable_interface(if_kind),
1053            DaemonOption::DisableInterface(if_kind) => self.disable_interface(if_kind),
1054            DaemonOption::MulticastLoopV4(on) => self.set_multicast_loop_v4(on),
1055            DaemonOption::MulticastLoopV6(on) => self.set_multicast_loop_v6(on),
1056        }
1057    }
1058
1059    fn enable_interface(&mut self, kinds: Vec<IfKind>) {
1060        for if_kind in kinds {
1061            self.if_selections.push(IfSelection {
1062                if_kind,
1063                selected: true,
1064            });
1065        }
1066
1067        self.apply_intf_selections(my_ip_interfaces(true));
1068    }
1069
1070    fn disable_interface(&mut self, kinds: Vec<IfKind>) {
1071        for if_kind in kinds {
1072            self.if_selections.push(IfSelection {
1073                if_kind,
1074                selected: false,
1075            });
1076        }
1077
1078        self.apply_intf_selections(my_ip_interfaces(true));
1079    }
1080
1081    fn set_multicast_loop_v4(&mut self, on: bool) {
1082        for (_, sock) in self.intf_socks.iter_mut() {
1083            if let Err(e) = sock.set_multicast_loop_v4(on) {
1084                debug!("failed to set multicast loop v4: {e}");
1085            }
1086        }
1087    }
1088
1089    fn set_multicast_loop_v6(&mut self, on: bool) {
1090        for (_, sock) in self.intf_socks.iter_mut() {
1091            if let Err(e) = sock.set_multicast_loop_v6(on) {
1092                debug!("failed to set multicast loop v6: {e}");
1093            }
1094        }
1095    }
1096
1097    fn notify_monitors(&mut self, event: DaemonEvent) {
1098        // Only retain the monitors that are still connected.
1099        self.monitors.retain(|sender| {
1100            if let Err(e) = sender.try_send(event.clone()) {
1101                debug!("notify_monitors: try_send: {}", &e);
1102                if matches!(e, TrySendError::Disconnected(_)) {
1103                    return false; // This monitor is dropped.
1104                }
1105            }
1106            true
1107        });
1108    }
1109
1110    /// Remove `addr` in my services that enabled `addr_auto`.
1111    fn del_addr_in_my_services(&mut self, addr: &IpAddr) {
1112        for (_, service_info) in self.my_services.iter_mut() {
1113            if service_info.is_addr_auto() {
1114                service_info.remove_ipaddr(addr);
1115            }
1116        }
1117    }
1118
1119    /// Insert a new interface into the poll map and return key
1120    fn add_poll(&mut self, intf: Interface) -> usize {
1121        Self::add_poll_impl(&mut self.poll_ids, &mut self.poll_id_count, intf)
1122    }
1123
1124    /// Insert a new interface into the poll map and return its key.
1125    ///
1126    /// This exists to satisfy the borrow checker
1127    fn add_poll_impl(
1128        poll_ids: &mut HashMap<usize, Interface>,
1129        poll_id_count: &mut usize,
1130        intf: Interface,
1131    ) -> usize {
1132        let key = *poll_id_count;
1133        *poll_id_count += 1;
1134        let _ = (*poll_ids).insert(key, intf);
1135        key
1136    }
1137
1138    fn add_timer(&mut self, next_time: u64) {
1139        self.timers.push(Reverse(next_time));
1140    }
1141
1142    fn peek_earliest_timer(&self) -> Option<u64> {
1143        self.timers.peek().map(|Reverse(v)| *v)
1144    }
1145
1146    fn _pop_earliest_timer(&mut self) -> Option<u64> {
1147        self.timers.pop().map(|Reverse(v)| v)
1148    }
1149
1150    /// Pop all timers that are already passed till `now`.
1151    fn pop_timers_till(&mut self, now: u64) {
1152        while let Some(Reverse(v)) = self.timers.peek() {
1153            if *v > now {
1154                break;
1155            }
1156            self.timers.pop();
1157        }
1158    }
1159
1160    /// Apply all selections to `interfaces` and return the selected addresses.
1161    fn selected_addrs(&self, interfaces: Vec<Interface>) -> HashSet<IpAddr> {
1162        let intf_count = interfaces.len();
1163        let mut intf_selections = vec![true; intf_count];
1164
1165        // apply if_selections
1166        for selection in self.if_selections.iter() {
1167            // Mark the interfaces for this selection.
1168            for i in 0..intf_count {
1169                if selection.if_kind.matches(&interfaces[i]) {
1170                    intf_selections[i] = selection.selected;
1171                }
1172            }
1173        }
1174
1175        let mut selected_addrs = HashSet::new();
1176        for i in 0..intf_count {
1177            if intf_selections[i] {
1178                selected_addrs.insert(interfaces[i].addr.ip());
1179            }
1180        }
1181
1182        selected_addrs
1183    }
1184
1185    /// Apply all selections to `interfaces`.
1186    ///
1187    /// For any interface, add it if selected but not bound yet,
1188    /// delete it if not selected but still bound.
1189    fn apply_intf_selections(&mut self, interfaces: Vec<Interface>) {
1190        // By default, we enable all interfaces.
1191        let intf_count = interfaces.len();
1192        let mut intf_selections = vec![true; intf_count];
1193
1194        // apply if_selections
1195        for selection in self.if_selections.iter() {
1196            // Mark the interfaces for this selection.
1197            for i in 0..intf_count {
1198                if selection.if_kind.matches(&interfaces[i]) {
1199                    intf_selections[i] = selection.selected;
1200                }
1201            }
1202        }
1203
1204        // Update `intf_socks` based on the selections.
1205        for (idx, intf) in interfaces.into_iter().enumerate() {
1206            if intf_selections[idx] {
1207                // Add the interface
1208                if !self.intf_socks.contains_key(&intf) {
1209                    debug!("apply_intf_selections: add {:?}", &intf.ip());
1210                    self.add_new_interface(intf);
1211                }
1212            } else {
1213                // Remove the interface
1214                if let Some(mut sock) = self.intf_socks.remove(&intf) {
1215                    match self.poller.registry().deregister(&mut sock) {
1216                        Ok(()) => debug!("apply_intf_selections: deregister {:?}", &intf.ip()),
1217                        Err(e) => debug!("apply_intf_selections: poller.delete {:?}: {}", &intf, e),
1218                    }
1219
1220                    // Remove from poll_ids
1221                    self.poll_ids.retain(|_, v| v != &intf);
1222
1223                    // Remove cache records for this interface.
1224                    self.cache.remove_addrs_on_disabled_intf(&intf);
1225                }
1226            }
1227        }
1228    }
1229
1230    /// Check for IP changes and update intf_socks as needed.
1231    fn check_ip_changes(&mut self) {
1232        // Get the current interfaces.
1233        let my_ifaddrs = my_ip_interfaces(true);
1234
1235        let poll_ids = &mut self.poll_ids;
1236        let poller = &mut self.poller;
1237        // Remove unused sockets in the poller.
1238        let deleted_addrs = self
1239            .intf_socks
1240            .iter_mut()
1241            .filter_map(|(intf, sock)| {
1242                if !my_ifaddrs.contains(intf) {
1243                    if let Err(e) = poller.registry().deregister(sock) {
1244                        debug!("check_ip_changes: poller.delete {:?}: {}", intf, e);
1245                    }
1246                    // Remove from poll_ids
1247                    poll_ids.retain(|_, v| v != intf);
1248                    Some(intf.ip())
1249                } else {
1250                    None
1251                }
1252            })
1253            .collect::<Vec<IpAddr>>();
1254
1255        // Remove deleted addrs from my services that enabled `addr_auto`.
1256        for ip in deleted_addrs.iter() {
1257            self.del_addr_in_my_services(ip);
1258            self.notify_monitors(DaemonEvent::IpDel(*ip));
1259        }
1260
1261        // Keep the interfaces only if they still exist.
1262        self.intf_socks.retain(|intf, _| my_ifaddrs.contains(intf));
1263
1264        // Add newly found interfaces only if in our selections.
1265        self.apply_intf_selections(my_ifaddrs);
1266    }
1267
1268    fn add_new_interface(&mut self, intf: Interface) {
1269        // Bind the new interface.
1270        let new_ip = intf.ip();
1271        let should_loop = if new_ip.is_ipv4() {
1272            self.multicast_loop_v4
1273        } else {
1274            self.multicast_loop_v6
1275        };
1276        let mut sock = match new_socket_bind(&intf, should_loop) {
1277            Ok(s) => s,
1278            Err(e) => {
1279                debug!("bind a socket to {}: {}. Skipped.", &intf.ip(), e);
1280                return;
1281            }
1282        };
1283
1284        // Add the new interface into the poller.
1285        let key = self.add_poll(intf.clone());
1286        if let Err(e) =
1287            self.poller
1288                .registry()
1289                .register(&mut sock, mio::Token(key), mio::Interest::READABLE)
1290        {
1291            debug!("check_ip_changes: poller add ip {}: {}", new_ip, e);
1292            return;
1293        }
1294
1295        debug!("add new interface {}: {new_ip}", intf.name);
1296        let dns_registry = match self.dns_registry_map.get_mut(&intf) {
1297            Some(registry) => registry,
1298            None => self
1299                .dns_registry_map
1300                .entry(intf.clone())
1301                .or_insert_with(DnsRegistry::new),
1302        };
1303
1304        for (_, service_info) in self.my_services.iter_mut() {
1305            if service_info.is_addr_auto() {
1306                service_info.insert_ipaddr(new_ip);
1307
1308                if announce_service_on_intf(dns_registry, service_info, &intf, &sock) {
1309                    debug!(
1310                        "Announce service {} on {}",
1311                        service_info.get_fullname(),
1312                        intf.ip()
1313                    );
1314                    service_info.set_status(&intf, ServiceStatus::Announced);
1315                } else {
1316                    for timer in dns_registry.new_timers.drain(..) {
1317                        self.timers.push(Reverse(timer));
1318                    }
1319                    service_info.set_status(&intf, ServiceStatus::Probing);
1320                }
1321            }
1322        }
1323
1324        self.intf_socks.insert(intf, sock);
1325
1326        // Notify the monitors.
1327        self.notify_monitors(DaemonEvent::IpAdd(new_ip));
1328    }
1329
1330    /// Registers a service.
1331    ///
1332    /// RFC 6762 section 8.3.
1333    /// ...the Multicast DNS responder MUST send
1334    ///    an unsolicited Multicast DNS response containing, in the Answer
1335    ///    Section, all of its newly registered resource records
1336    ///
1337    /// Zeroconf will then respond to requests for information about this service.
1338    fn register_service(&mut self, mut info: ServiceInfo) {
1339        // Check the service name length.
1340        if let Err(e) = check_service_name_length(info.get_type(), self.service_name_len_max) {
1341            debug!("check_service_name_length: {}", &e);
1342            self.notify_monitors(DaemonEvent::Error(e));
1343            return;
1344        }
1345
1346        if info.is_addr_auto() {
1347            let selected_addrs = self.selected_addrs(my_ip_interfaces(true));
1348            for addr in selected_addrs {
1349                info.insert_ipaddr(addr);
1350            }
1351        }
1352
1353        debug!("register service {:?}", &info);
1354
1355        let outgoing_addrs = self.send_unsolicited_response(&mut info);
1356        if !outgoing_addrs.is_empty() {
1357            self.notify_monitors(DaemonEvent::Announce(
1358                info.get_fullname().to_string(),
1359                format!("{:?}", &outgoing_addrs),
1360            ));
1361        }
1362
1363        // The key has to be lower case letter as DNS record name is case insensitive.
1364        // The info will have the original name.
1365        let service_fullname = info.get_fullname().to_lowercase();
1366        self.my_services.insert(service_fullname, info);
1367    }
1368
1369    /// Sends out announcement of `info` on every valid interface.
1370    /// Returns the list of interface IPs that sent out the announcement.
1371    fn send_unsolicited_response(&mut self, info: &mut ServiceInfo) -> Vec<IpAddr> {
1372        let mut outgoing_addrs = Vec::new();
1373        // Send the announcement on one interface per ip version.
1374        let mut multicast_sent_trackers = HashSet::new();
1375
1376        let mut outgoing_intfs = Vec::new();
1377
1378        for (intf, sock) in self.intf_socks.iter() {
1379            if let Some(tracker) = multicast_send_tracker(intf) {
1380                if multicast_sent_trackers.contains(&tracker) {
1381                    continue; // No need to send again on the same interface with same ip version.
1382                }
1383            }
1384
1385            let dns_registry = match self.dns_registry_map.get_mut(intf) {
1386                Some(registry) => registry,
1387                None => self
1388                    .dns_registry_map
1389                    .entry(intf.clone())
1390                    .or_insert_with(DnsRegistry::new),
1391            };
1392
1393            if announce_service_on_intf(dns_registry, info, intf, sock) {
1394                if let Some(tracker) = multicast_send_tracker(intf) {
1395                    multicast_sent_trackers.insert(tracker);
1396                }
1397                outgoing_addrs.push(intf.ip());
1398                outgoing_intfs.push(intf.clone());
1399
1400                debug!("Announce service {} on {}", info.get_fullname(), intf.ip());
1401
1402                info.set_status(intf, ServiceStatus::Announced);
1403            } else {
1404                for timer in dns_registry.new_timers.drain(..) {
1405                    self.timers.push(Reverse(timer));
1406                }
1407                info.set_status(intf, ServiceStatus::Probing);
1408            }
1409        }
1410
1411        // RFC 6762 section 8.3.
1412        // ..The Multicast DNS responder MUST send at least two unsolicited
1413        //    responses, one second apart.
1414        let next_time = current_time_millis() + 1000;
1415        for intf in outgoing_intfs {
1416            self.add_retransmission(
1417                next_time,
1418                Command::RegisterResend(info.get_fullname().to_string(), intf),
1419            );
1420        }
1421
1422        outgoing_addrs
1423    }
1424
1425    /// Send probings or finish them if expired. Notify waiting services.
1426    fn probing_handler(&mut self) {
1427        let now = current_time_millis();
1428
1429        for (intf, sock) in self.intf_socks.iter() {
1430            let Some(dns_registry) = self.dns_registry_map.get_mut(intf) else {
1431                continue;
1432            };
1433
1434            let mut expired_probe_names = Vec::new();
1435            let mut out = DnsOutgoing::new(FLAGS_QR_QUERY);
1436
1437            for (name, probe) in dns_registry.probing.iter_mut() {
1438                if now >= probe.next_send {
1439                    if probe.expired(now) {
1440                        // move the record to active
1441                        expired_probe_names.push(name.clone());
1442                    } else {
1443                        out.add_question(name, RRType::ANY);
1444
1445                        /*
1446                        RFC 6762 section 8.2: https://datatracker.ietf.org/doc/html/rfc6762#section-8.2
1447                        ...
1448                        for tiebreaking to work correctly in all
1449                        cases, the Authority Section must contain *all* the records and
1450                        proposed rdata being probed for uniqueness.
1451                         */
1452                        for record in probe.records.iter() {
1453                            out.add_authority(record.clone());
1454                        }
1455
1456                        probe.update_next_send(now);
1457
1458                        // add timer
1459                        self.timers.push(Reverse(probe.next_send));
1460                    }
1461                }
1462            }
1463
1464            // send probing.
1465            if !out.questions().is_empty() {
1466                debug!("sending out probing of {} questions", out.questions().len());
1467                send_dns_outgoing(&out, intf, sock);
1468            }
1469
1470            let mut waiting_services = HashSet::new();
1471
1472            for name in expired_probe_names {
1473                let Some(probe) = dns_registry.probing.remove(&name) else {
1474                    continue;
1475                };
1476
1477                // send notifications about name changes
1478                for record in probe.records.iter() {
1479                    if let Some(new_name) = record.get_record().get_new_name() {
1480                        dns_registry
1481                            .name_changes
1482                            .insert(name.clone(), new_name.to_string());
1483
1484                        let event = DnsNameChange {
1485                            original: record.get_record().get_original_name().to_string(),
1486                            new_name: new_name.to_string(),
1487                            rr_type: record.get_type(),
1488                            intf_name: intf.name.to_string(),
1489                        };
1490                        notify_monitors(&mut self.monitors, DaemonEvent::NameChange(event));
1491                    }
1492                }
1493
1494                // move RR from probe to active.
1495                debug!(
1496                    "probe of '{name}' finished: move {} records to active. ({} waiting services)",
1497                    probe.records.len(),
1498                    probe.waiting_services.len(),
1499                );
1500
1501                // Move records to active and plan to wake up services if records are not empty.
1502                if !probe.records.is_empty() {
1503                    match dns_registry.active.get_mut(&name) {
1504                        Some(records) => {
1505                            records.extend(probe.records);
1506                        }
1507                        None => {
1508                            dns_registry.active.insert(name, probe.records);
1509                        }
1510                    }
1511
1512                    waiting_services.extend(probe.waiting_services);
1513                }
1514            }
1515
1516            // wake up services waiting.
1517            for service_name in waiting_services {
1518                debug!(
1519                    "try to announce service {service_name} on intf {}",
1520                    intf.ip()
1521                );
1522                // service names are lowercase
1523                if let Some(info) = self.my_services.get_mut(&service_name.to_lowercase()) {
1524                    if info.get_status(intf) == ServiceStatus::Announced {
1525                        debug!("service {} already announced", info.get_fullname());
1526                        continue;
1527                    }
1528
1529                    if announce_service_on_intf(dns_registry, info, intf, sock) {
1530                        let next_time = now + 1000;
1531                        let command =
1532                            Command::RegisterResend(info.get_fullname().to_string(), intf.clone());
1533                        self.retransmissions.push(ReRun { next_time, command });
1534                        self.timers.push(Reverse(next_time));
1535
1536                        let fullname = match dns_registry.name_changes.get(&service_name) {
1537                            Some(new_name) => new_name.to_string(),
1538                            None => service_name.to_string(),
1539                        };
1540
1541                        let mut hostname = info.get_hostname();
1542                        if let Some(new_name) = dns_registry.name_changes.get(hostname) {
1543                            hostname = new_name;
1544                        }
1545
1546                        debug!("wake up: announce service {} on {}", fullname, intf.ip());
1547                        notify_monitors(
1548                            &mut self.monitors,
1549                            DaemonEvent::Announce(fullname, format!("{}:{}", hostname, &intf.ip())),
1550                        );
1551
1552                        info.set_status(intf, ServiceStatus::Announced);
1553                    }
1554                }
1555            }
1556        }
1557    }
1558
1559    fn unregister_service(
1560        &self,
1561        info: &ServiceInfo,
1562        intf: &Interface,
1563        sock: &MioUdpSocket,
1564    ) -> Vec<u8> {
1565        let mut out = DnsOutgoing::new(FLAGS_QR_RESPONSE | FLAGS_AA);
1566        out.add_answer_at_time(
1567            DnsPointer::new(
1568                info.get_type(),
1569                RRType::PTR,
1570                CLASS_IN,
1571                0,
1572                info.get_fullname().to_string(),
1573            ),
1574            0,
1575        );
1576
1577        if let Some(sub) = info.get_subtype() {
1578            trace!("Adding subdomain {}", sub);
1579            out.add_answer_at_time(
1580                DnsPointer::new(
1581                    sub,
1582                    RRType::PTR,
1583                    CLASS_IN,
1584                    0,
1585                    info.get_fullname().to_string(),
1586                ),
1587                0,
1588            );
1589        }
1590
1591        out.add_answer_at_time(
1592            DnsSrv::new(
1593                info.get_fullname(),
1594                CLASS_IN | CLASS_CACHE_FLUSH,
1595                0,
1596                info.get_priority(),
1597                info.get_weight(),
1598                info.get_port(),
1599                info.get_hostname().to_string(),
1600            ),
1601            0,
1602        );
1603        out.add_answer_at_time(
1604            DnsTxt::new(
1605                info.get_fullname(),
1606                CLASS_IN | CLASS_CACHE_FLUSH,
1607                0,
1608                info.generate_txt(),
1609            ),
1610            0,
1611        );
1612
1613        for address in info.get_addrs_on_intf(intf) {
1614            out.add_answer_at_time(
1615                DnsAddress::new(
1616                    info.get_hostname(),
1617                    ip_address_rr_type(&address),
1618                    CLASS_IN | CLASS_CACHE_FLUSH,
1619                    0,
1620                    address,
1621                ),
1622                0,
1623            );
1624        }
1625
1626        // `out` data is non-empty, hence we can do this.
1627        send_dns_outgoing(&out, intf, sock).remove(0)
1628    }
1629
1630    /// Binds a channel `listener` to querying mDNS hostnames.
1631    ///
1632    /// If there is already a `listener`, it will be updated, i.e. overwritten.
1633    fn add_hostname_resolver(
1634        &mut self,
1635        hostname: String,
1636        listener: Sender<HostnameResolutionEvent>,
1637        timeout: Option<u64>,
1638    ) {
1639        let real_timeout = timeout.map(|t| current_time_millis() + t);
1640        self.hostname_resolvers
1641            .insert(hostname.to_lowercase(), (listener, real_timeout));
1642        if let Some(t) = real_timeout {
1643            self.add_timer(t);
1644        }
1645    }
1646
1647    /// Sends a multicast query for `name` with `qtype`.
1648    fn send_query(&self, name: &str, qtype: RRType) {
1649        self.send_query_vec(&[(name, qtype)]);
1650    }
1651
1652    /// Sends out a list of `questions` (i.e. DNS questions) via multicast.
1653    fn send_query_vec(&self, questions: &[(&str, RRType)]) {
1654        trace!("Sending query questions: {:?}", questions);
1655        let mut out = DnsOutgoing::new(FLAGS_QR_QUERY);
1656        let now = current_time_millis();
1657
1658        for (name, qtype) in questions {
1659            out.add_question(name, *qtype);
1660
1661            for record in self.cache.get_known_answers(name, *qtype, now) {
1662                /*
1663                RFC 6762 section 7.1: https://datatracker.ietf.org/doc/html/rfc6762#section-7.1
1664                ...
1665                    When a Multicast DNS querier sends a query to which it already knows
1666                    some answers, it populates the Answer Section of the DNS query
1667                    message with those answers.
1668                 */
1669                trace!("add known answer: {:?}", record);
1670                let mut new_record = record.clone();
1671                new_record.get_record_mut().update_ttl(now);
1672                out.add_answer_box(new_record);
1673            }
1674        }
1675
1676        // Send the query on one interface per ip version.
1677        let mut multicast_sent_trackers = HashSet::new();
1678        for (intf, sock) in self.intf_socks.iter() {
1679            if let Some(tracker) = multicast_send_tracker(intf) {
1680                if multicast_sent_trackers.contains(&tracker) {
1681                    continue; // no need to send query the same interface with same ip version.
1682                }
1683                multicast_sent_trackers.insert(tracker);
1684            }
1685            send_dns_outgoing(&out, intf, sock);
1686        }
1687    }
1688
1689    /// Reads one UDP datagram from the socket of `intf`.
1690    ///
1691    /// Returns false if failed to receive a packet,
1692    /// otherwise returns true.
1693    fn handle_read(&mut self, intf: &Interface) -> bool {
1694        let sock = match self.intf_socks.get_mut(intf) {
1695            Some(if_sock) => if_sock,
1696            None => return false,
1697        };
1698        let mut buf = vec![0u8; MAX_MSG_ABSOLUTE];
1699
1700        // Read the next mDNS UDP datagram.
1701        //
1702        // If the datagram is larger than `buf`, excess bytes may or may not
1703        // be truncated by the socket layer depending on the platform's libc.
1704        // In any case, such large datagram will not be decoded properly and
1705        // this function should return false but should not crash.
1706        let sz = match sock.recv(&mut buf) {
1707            Ok(sz) => sz,
1708            Err(e) => {
1709                if e.kind() != std::io::ErrorKind::WouldBlock {
1710                    debug!("listening socket read failed: {}", e);
1711                }
1712                return false;
1713            }
1714        };
1715
1716        trace!("received {} bytes at IP: {}", sz, intf.ip());
1717
1718        // If sz is 0, it means sock reached End-of-File.
1719        if sz == 0 {
1720            debug!("socket {:?} was likely shutdown", &sock);
1721            if let Err(e) = self.poller.registry().deregister(sock) {
1722                debug!("failed to remove sock {:?} from poller: {}", sock, &e);
1723            }
1724
1725            // Replace the closed socket with a new one.
1726            let should_loop = if intf.ip().is_ipv4() {
1727                self.multicast_loop_v4
1728            } else {
1729                self.multicast_loop_v6
1730            };
1731            match new_socket_bind(intf, should_loop) {
1732                Ok(new_sock) => {
1733                    trace!("reset socket for IP {}", intf.ip());
1734                    self.intf_socks.insert(intf.clone(), new_sock);
1735                }
1736                Err(e) => debug!("re-bind a socket to {:?}: {}", intf, e),
1737            }
1738
1739            return false;
1740        }
1741
1742        buf.truncate(sz); // reduce potential processing errors
1743
1744        match DnsIncoming::new(buf) {
1745            Ok(msg) => {
1746                if msg.is_query() {
1747                    self.handle_query(msg, intf);
1748                } else if msg.is_response() {
1749                    self.handle_response(msg, intf);
1750                } else {
1751                    debug!("Invalid message: not query and not response");
1752                }
1753            }
1754            Err(e) => debug!("Invalid incoming DNS message: {}", e),
1755        }
1756
1757        true
1758    }
1759
1760    /// Returns true, if sent query. Returns false if SRV already exists.
1761    fn query_unresolved(&mut self, instance: &str) -> bool {
1762        if !valid_instance_name(instance) {
1763            trace!("instance name {} not valid", instance);
1764            return false;
1765        }
1766
1767        if let Some(records) = self.cache.get_srv(instance) {
1768            for record in records {
1769                if let Some(srv) = record.any().downcast_ref::<DnsSrv>() {
1770                    if self.cache.get_addr(srv.host()).is_none() {
1771                        self.send_query_vec(&[(srv.host(), RRType::A), (srv.host(), RRType::AAAA)]);
1772                        return true;
1773                    }
1774                }
1775            }
1776        } else {
1777            self.send_query(instance, RRType::ANY);
1778            return true;
1779        }
1780
1781        false
1782    }
1783
1784    /// Checks if `ty_domain` has records in the cache. If yes, sends the
1785    /// cached records via `sender`.
1786    fn query_cache_for_service(&mut self, ty_domain: &str, sender: &Sender<ServiceEvent>) {
1787        let mut resolved: HashSet<String> = HashSet::new();
1788        let mut unresolved: HashSet<String> = HashSet::new();
1789
1790        if let Some(records) = self.cache.get_ptr(ty_domain) {
1791            for record in records.iter() {
1792                if let Some(ptr) = record.any().downcast_ref::<DnsPointer>() {
1793                    let info = match self.create_service_info_from_cache(ty_domain, ptr.alias()) {
1794                        Ok(ok) => ok,
1795                        Err(err) => {
1796                            debug!("Error while creating service info from cache: {}", err);
1797                            continue;
1798                        }
1799                    };
1800
1801                    match sender.send(ServiceEvent::ServiceFound(
1802                        ty_domain.to_string(),
1803                        ptr.alias().to_string(),
1804                    )) {
1805                        Ok(()) => debug!("send service found {}", ptr.alias()),
1806                        Err(e) => {
1807                            debug!("failed to send service found: {}", e);
1808                            continue;
1809                        }
1810                    }
1811
1812                    if info.is_ready() {
1813                        resolved.insert(ptr.alias().to_string());
1814                        match sender.send(ServiceEvent::ServiceResolved(info)) {
1815                            Ok(()) => debug!("sent service resolved: {}", ptr.alias()),
1816                            Err(e) => debug!("failed to send service resolved: {}", e),
1817                        }
1818                    } else {
1819                        unresolved.insert(ptr.alias().to_string());
1820                    }
1821                }
1822            }
1823        }
1824
1825        for instance in resolved.drain() {
1826            self.pending_resolves.remove(&instance);
1827            self.resolved.insert(instance);
1828        }
1829
1830        for instance in unresolved.drain() {
1831            self.add_pending_resolve(instance);
1832        }
1833    }
1834
1835    /// Checks if `hostname` has records in the cache. If yes, sends the
1836    /// cached records via `sender`.
1837    fn query_cache_for_hostname(
1838        &mut self,
1839        hostname: &str,
1840        sender: Sender<HostnameResolutionEvent>,
1841    ) {
1842        let addresses_map = self.cache.get_addresses_for_host(hostname);
1843        for (name, addresses) in addresses_map {
1844            match sender.send(HostnameResolutionEvent::AddressesFound(name, addresses)) {
1845                Ok(()) => trace!("sent hostname addresses found"),
1846                Err(e) => debug!("failed to send hostname addresses found: {}", e),
1847            }
1848        }
1849    }
1850
1851    fn add_pending_resolve(&mut self, instance: String) {
1852        if !self.pending_resolves.contains(&instance) {
1853            let next_time = current_time_millis() + RESOLVE_WAIT_IN_MILLIS;
1854            self.add_retransmission(next_time, Command::Resolve(instance.clone(), 1));
1855            self.pending_resolves.insert(instance);
1856        }
1857    }
1858
1859    fn create_service_info_from_cache(
1860        &self,
1861        ty_domain: &str,
1862        fullname: &str,
1863    ) -> Result<ServiceInfo> {
1864        let my_name = {
1865            let name = fullname.trim_end_matches(split_sub_domain(ty_domain).0);
1866            name.strip_suffix('.').unwrap_or(name).to_string()
1867        };
1868
1869        let now = current_time_millis();
1870        let mut info = ServiceInfo::new(ty_domain, &my_name, "", (), 0, None)?;
1871
1872        // Be sure setting `subtype` if available even when querying for the parent domain.
1873        if let Some(subtype) = self.cache.get_subtype(fullname) {
1874            trace!(
1875                "ty_domain: {} found subtype {} for instance: {}",
1876                ty_domain,
1877                subtype,
1878                fullname
1879            );
1880            if info.get_subtype().is_none() {
1881                info.set_subtype(subtype.clone());
1882            }
1883        }
1884
1885        // resolve SRV record
1886        if let Some(records) = self.cache.get_srv(fullname) {
1887            if let Some(answer) = records.first() {
1888                if let Some(dns_srv) = answer.any().downcast_ref::<DnsSrv>() {
1889                    info.set_hostname(dns_srv.host().to_string());
1890                    info.set_port(dns_srv.port());
1891                }
1892            }
1893        }
1894
1895        // resolve TXT record
1896        if let Some(records) = self.cache.get_txt(fullname) {
1897            if let Some(record) = records.first() {
1898                if let Some(dns_txt) = record.any().downcast_ref::<DnsTxt>() {
1899                    info.set_properties_from_txt(dns_txt.text());
1900                }
1901            }
1902        }
1903
1904        // resolve A and AAAA records
1905        if let Some(records) = self.cache.get_addr(info.get_hostname()) {
1906            for answer in records.iter() {
1907                if let Some(dns_a) = answer.any().downcast_ref::<DnsAddress>() {
1908                    if dns_a.get_record().is_expired(now) {
1909                        trace!("Addr expired: {}", dns_a.address());
1910                    } else {
1911                        info.insert_ipaddr(dns_a.address());
1912                    }
1913                }
1914            }
1915        }
1916
1917        Ok(info)
1918    }
1919
1920    fn handle_poller_events(&mut self, events: &mio::Events) {
1921        for ev in events.iter() {
1922            trace!("event received with key {:?}", ev.token());
1923            if ev.token().0 == SIGNAL_SOCK_EVENT_KEY {
1924                // Drain signals as we will drain commands as well.
1925                self.signal_sock_drain();
1926
1927                if let Err(e) = self.poller.registry().reregister(
1928                    &mut self.signal_sock,
1929                    ev.token(),
1930                    mio::Interest::READABLE,
1931                ) {
1932                    debug!("failed to modify poller for signal socket: {}", e);
1933                }
1934                continue; // Next event.
1935            }
1936
1937            // Read until no more packets available.
1938            let intf = match self.poll_ids.get(&ev.token().0) {
1939                Some(interface) => interface.clone(),
1940                None => {
1941                    debug!("Ip for event key {} not found", ev.token().0);
1942                    break;
1943                }
1944            };
1945            while self.handle_read(&intf) {}
1946
1947            // we continue to monitor this socket.
1948            if let Some(sock) = self.intf_socks.get_mut(&intf) {
1949                if let Err(e) =
1950                    self.poller
1951                        .registry()
1952                        .reregister(sock, ev.token(), mio::Interest::READABLE)
1953                {
1954                    debug!("modify poller for interface {:?}: {}", &intf, e);
1955                    break;
1956                }
1957            }
1958        }
1959    }
1960
1961    /// Deal with incoming response packets.  All answers
1962    /// are held in the cache, and listeners are notified.
1963    fn handle_response(&mut self, mut msg: DnsIncoming, intf: &Interface) {
1964        trace!(
1965            "handle_response: {} answers {} authorities {} additionals",
1966            msg.answers().len(),
1967            &msg.authorities().len(),
1968            &msg.num_additionals()
1969        );
1970        let now = current_time_millis();
1971
1972        // remove records that are expired.
1973        let mut record_predicate = |record: &DnsRecordBox| {
1974            if !record.get_record().is_expired(now) {
1975                return true;
1976            }
1977
1978            debug!("record is expired, removing it from cache.");
1979            if self.cache.remove(record) {
1980                // for PTR records, send event to listeners
1981                if let Some(dns_ptr) = record.any().downcast_ref::<DnsPointer>() {
1982                    call_service_listener(
1983                        &self.service_queriers,
1984                        dns_ptr.get_name(),
1985                        ServiceEvent::ServiceRemoved(
1986                            dns_ptr.get_name().to_string(),
1987                            dns_ptr.alias().to_string(),
1988                        ),
1989                    );
1990                }
1991            }
1992            false
1993        };
1994        msg.answers_mut().retain(&mut record_predicate);
1995        msg.authorities_mut().retain(&mut record_predicate);
1996        msg.additionals_mut().retain(&mut record_predicate);
1997
1998        // check possible conflicts and handle them.
1999        self.conflict_handler(&msg, intf);
2000
2001        // check if the message is for us.
2002        let mut is_for_us = true; // assume it is for us.
2003
2004        // If there are any PTR records in the answers, there should be
2005        // at least one PTR for us. Otherwise, the message is not for us.
2006        // If there are no PTR records at all, assume this message is for us.
2007        for answer in msg.answers() {
2008            if answer.get_type() == RRType::PTR {
2009                if self.service_queriers.contains_key(answer.get_name()) {
2010                    is_for_us = true;
2011                    break; // OK to break: at least one PTR for us.
2012                } else {
2013                    is_for_us = false;
2014                }
2015            }
2016        }
2017
2018        /// Represents a DNS record change that involves one service instance.
2019        struct InstanceChange {
2020            ty: RRType,   // The type of DNS record for the instance.
2021            name: String, // The name of the record.
2022        }
2023
2024        // Go through all answers to get the new and updated records.
2025        // For new PTR records, send out ServiceFound immediately. For others,
2026        // collect them into `changes`.
2027        //
2028        // Note: we don't try to identify the update instances based on
2029        // each record immediately as the answers are likely related to each
2030        // other.
2031        let mut changes = Vec::new();
2032        let mut timers = Vec::new();
2033        for record in msg.all_records() {
2034            match self
2035                .cache
2036                .add_or_update(intf, record, &mut timers, is_for_us)
2037            {
2038                Some((dns_record, true)) => {
2039                    timers.push(dns_record.get_record().get_expire_time());
2040                    timers.push(dns_record.get_record().get_refresh_time());
2041
2042                    let ty = dns_record.get_type();
2043                    let name = dns_record.get_name();
2044                    if ty == RRType::PTR {
2045                        if self.service_queriers.contains_key(name) {
2046                            timers.push(dns_record.get_record().get_refresh_time());
2047                        }
2048
2049                        // send ServiceFound
2050                        if let Some(dns_ptr) = dns_record.any().downcast_ref::<DnsPointer>() {
2051                            call_service_listener(
2052                                &self.service_queriers,
2053                                name,
2054                                ServiceEvent::ServiceFound(
2055                                    name.to_string(),
2056                                    dns_ptr.alias().to_string(),
2057                                ),
2058                            );
2059                            changes.push(InstanceChange {
2060                                ty,
2061                                name: dns_ptr.alias().to_string(),
2062                            });
2063                        }
2064                    } else {
2065                        changes.push(InstanceChange {
2066                            ty,
2067                            name: name.to_string(),
2068                        });
2069                    }
2070                }
2071                Some((dns_record, false)) => {
2072                    timers.push(dns_record.get_record().get_expire_time());
2073                    timers.push(dns_record.get_record().get_refresh_time());
2074                }
2075                _ => {}
2076            }
2077        }
2078
2079        // Add timers for the new records.
2080        for t in timers {
2081            self.add_timer(t);
2082        }
2083
2084        // Go through remaining changes to see if any hostname resolutions were found or updated.
2085        for change in changes
2086            .iter()
2087            .filter(|change| change.ty == RRType::A || change.ty == RRType::AAAA)
2088        {
2089            let addr_map = self.cache.get_addresses_for_host(&change.name);
2090            for (name, addresses) in addr_map {
2091                call_hostname_resolution_listener(
2092                    &self.hostname_resolvers,
2093                    &change.name,
2094                    HostnameResolutionEvent::AddressesFound(name, addresses),
2095                )
2096            }
2097        }
2098
2099        // Identify the instances that need to be "resolved".
2100        let mut updated_instances = HashSet::new();
2101        for update in changes {
2102            match update.ty {
2103                RRType::PTR | RRType::SRV | RRType::TXT => {
2104                    updated_instances.insert(update.name);
2105                }
2106                RRType::A | RRType::AAAA => {
2107                    let instances = self.cache.get_instances_on_host(&update.name);
2108                    updated_instances.extend(instances);
2109                }
2110                _ => {}
2111            }
2112        }
2113
2114        self.resolve_updated_instances(&updated_instances);
2115    }
2116
2117    fn conflict_handler(&mut self, msg: &DnsIncoming, intf: &Interface) {
2118        let Some(dns_registry) = self.dns_registry_map.get_mut(intf) else {
2119            return;
2120        };
2121
2122        for answer in msg.answers().iter() {
2123            let mut new_records = Vec::new();
2124
2125            let name = answer.get_name();
2126            let Some(probe) = dns_registry.probing.get_mut(name) else {
2127                continue;
2128            };
2129
2130            // check against possible multicast forwarding
2131            if answer.get_type() == RRType::A || answer.get_type() == RRType::AAAA {
2132                if let Some(answer_addr) = answer.any().downcast_ref::<DnsAddress>() {
2133                    if !valid_ip_on_intf(&answer_addr.address(), intf) {
2134                        debug!(
2135                            "conflict handler: answer addr {:?} not in the subnet of {:?}",
2136                            answer_addr, intf
2137                        );
2138                        continue;
2139                    }
2140                }
2141
2142                // double check if any other address record matches rrdata,
2143                // as there could be multiple addresses for the same name.
2144                let any_match = probe.records.iter().any(|r| {
2145                    r.get_type() == answer.get_type()
2146                        && r.get_class() == answer.get_class()
2147                        && r.rrdata_match(answer.as_ref())
2148                });
2149                if any_match {
2150                    continue; // no conflict for this answer.
2151                }
2152            }
2153
2154            probe.records.retain(|record| {
2155                if record.get_type() == answer.get_type()
2156                    && record.get_class() == answer.get_class()
2157                    && !record.rrdata_match(answer.as_ref())
2158                {
2159                    debug!(
2160                        "found conflict name: '{name}' record: {}: {} PEER: {}",
2161                        record.get_type(),
2162                        record.rdata_print(),
2163                        answer.rdata_print()
2164                    );
2165
2166                    // create a new name for this record
2167                    // then remove the old record in probing.
2168                    let mut new_record = record.clone();
2169                    let new_name = match record.get_type() {
2170                        RRType::A => hostname_change(name),
2171                        RRType::AAAA => hostname_change(name),
2172                        _ => name_change(name),
2173                    };
2174                    new_record.get_record_mut().set_new_name(new_name);
2175                    new_records.push(new_record);
2176                    return false; // old record is dropped from the probe.
2177                }
2178
2179                true
2180            });
2181
2182            // ?????
2183            // if probe.records.is_empty() {
2184            //     dns_registry.probing.remove(name);
2185            // }
2186
2187            // Probing again with the new names.
2188            let create_time = current_time_millis() + fastrand::u64(0..250);
2189
2190            let waiting_services = probe.waiting_services.clone();
2191
2192            for record in new_records {
2193                if dns_registry.update_hostname(name, record.get_name(), create_time) {
2194                    self.timers.push(Reverse(create_time));
2195                }
2196
2197                // remember the name changes (note: `name` might not be the original, it could be already changed once.)
2198                dns_registry.name_changes.insert(
2199                    record.get_record().get_original_name().to_string(),
2200                    record.get_name().to_string(),
2201                );
2202
2203                let new_probe = match dns_registry.probing.get_mut(record.get_name()) {
2204                    Some(p) => p,
2205                    None => {
2206                        let new_probe = dns_registry
2207                            .probing
2208                            .entry(record.get_name().to_string())
2209                            .or_insert_with(|| {
2210                                debug!("conflict handler: new probe of {}", record.get_name());
2211                                Probe::new(create_time)
2212                            });
2213                        self.timers.push(Reverse(new_probe.next_send));
2214                        new_probe
2215                    }
2216                };
2217
2218                debug!(
2219                    "insert record with new name '{}' {} into probe",
2220                    record.get_name(),
2221                    record.get_type()
2222                );
2223                new_probe.insert_record(record);
2224
2225                new_probe.waiting_services.extend(waiting_services.clone());
2226            }
2227        }
2228    }
2229
2230    /// Resolve the updated (including new) instances.
2231    ///
2232    /// Note: it is possible that more than 1 PTR pointing to the same
2233    /// instance. For example, a regular service type PTR and a sub-type
2234    /// service type PTR can both point to the same service instance.
2235    /// This loop automatically handles the sub-type PTRs.
2236    fn resolve_updated_instances(&mut self, updated_instances: &HashSet<String>) {
2237        let mut resolved: HashSet<String> = HashSet::new();
2238        let mut unresolved: HashSet<String> = HashSet::new();
2239        let mut removed_instances = HashMap::new();
2240
2241        for (ty_domain, records) in self.cache.all_ptr().iter() {
2242            if !self.service_queriers.contains_key(ty_domain) {
2243                // No need to resolve if not in our queries.
2244                continue;
2245            }
2246
2247            for record in records.iter() {
2248                if let Some(dns_ptr) = record.any().downcast_ref::<DnsPointer>() {
2249                    if updated_instances.contains(dns_ptr.alias()) {
2250                        if let Ok(info) =
2251                            self.create_service_info_from_cache(ty_domain, dns_ptr.alias())
2252                        {
2253                            if info.is_ready() {
2254                                debug!("call queriers to resolve {}", dns_ptr.alias());
2255                                resolved.insert(dns_ptr.alias().to_string());
2256                                call_service_listener(
2257                                    &self.service_queriers,
2258                                    ty_domain,
2259                                    ServiceEvent::ServiceResolved(info),
2260                                );
2261                            } else {
2262                                if self.resolved.remove(dns_ptr.alias()) {
2263                                    removed_instances
2264                                        .entry(ty_domain.to_string())
2265                                        .or_insert_with(HashSet::new)
2266                                        .insert(dns_ptr.alias().to_string());
2267                                }
2268                                unresolved.insert(dns_ptr.alias().to_string());
2269                            }
2270                        }
2271                    }
2272                }
2273            }
2274        }
2275
2276        for instance in resolved.drain() {
2277            self.pending_resolves.remove(&instance);
2278            self.resolved.insert(instance);
2279        }
2280
2281        for instance in unresolved.drain() {
2282            self.add_pending_resolve(instance);
2283        }
2284
2285        self.notify_service_removal(removed_instances);
2286    }
2287
2288    /// Handle incoming query packets, figure out whether and what to respond.
2289    fn handle_query(&mut self, msg: DnsIncoming, intf: &Interface) {
2290        let sock = match self.intf_socks.get(intf) {
2291            Some(sock) => sock,
2292            None => return,
2293        };
2294        let mut out = DnsOutgoing::new(FLAGS_QR_RESPONSE | FLAGS_AA);
2295
2296        // Special meta-query "_services._dns-sd._udp.<Domain>".
2297        // See https://datatracker.ietf.org/doc/html/rfc6763#section-9
2298        const META_QUERY: &str = "_services._dns-sd._udp.local.";
2299
2300        let Some(dns_registry) = self.dns_registry_map.get_mut(intf) else {
2301            debug!("missing dns registry for intf {}", intf.ip());
2302            return;
2303        };
2304
2305        for question in msg.questions().iter() {
2306            trace!("query question: {:?}", &question);
2307
2308            let qtype = question.entry_type();
2309
2310            if qtype == RRType::PTR {
2311                for service in self.my_services.values() {
2312                    if service.get_status(intf) != ServiceStatus::Announced {
2313                        continue;
2314                    }
2315
2316                    if question.entry_name() == service.get_type()
2317                        || service
2318                            .get_subtype()
2319                            .as_ref()
2320                            .is_some_and(|v| v == question.entry_name())
2321                    {
2322                        add_answer_with_additionals(&mut out, &msg, service, intf, dns_registry);
2323                    } else if question.entry_name() == META_QUERY {
2324                        let ptr_added = out.add_answer(
2325                            &msg,
2326                            DnsPointer::new(
2327                                question.entry_name(),
2328                                RRType::PTR,
2329                                CLASS_IN,
2330                                service.get_other_ttl(),
2331                                service.get_type().to_string(),
2332                            ),
2333                        );
2334                        if !ptr_added {
2335                            trace!("answer was not added for meta-query {:?}", &question);
2336                        }
2337                    }
2338                }
2339            } else {
2340                // Simultaneous Probe Tiebreaking (RFC 6762 section 8.2)
2341                if qtype == RRType::ANY && msg.num_authorities() > 0 {
2342                    let probe_name = question.entry_name();
2343
2344                    if let Some(probe) = dns_registry.probing.get_mut(probe_name) {
2345                        let now = current_time_millis();
2346
2347                        // Only do tiebreaking if probe already started.
2348                        // This check also helps avoid redo tiebreaking if start time
2349                        // was postponed.
2350                        if probe.start_time < now {
2351                            let incoming_records: Vec<_> = msg
2352                                .authorities()
2353                                .iter()
2354                                .filter(|r| r.get_name() == probe_name)
2355                                .collect();
2356
2357                            probe.tiebreaking(&incoming_records, now, probe_name);
2358                        }
2359                    }
2360                }
2361
2362                if qtype == RRType::A || qtype == RRType::AAAA || qtype == RRType::ANY {
2363                    for service in self.my_services.values() {
2364                        if service.get_status(intf) != ServiceStatus::Announced {
2365                            continue;
2366                        }
2367
2368                        let service_hostname =
2369                            match dns_registry.name_changes.get(service.get_hostname()) {
2370                                Some(new_name) => new_name,
2371                                None => service.get_hostname(),
2372                            };
2373
2374                        if service_hostname.to_lowercase() == question.entry_name().to_lowercase() {
2375                            let intf_addrs = service.get_addrs_on_intf(intf);
2376                            if intf_addrs.is_empty()
2377                                && (qtype == RRType::A || qtype == RRType::AAAA)
2378                            {
2379                                let t = match qtype {
2380                                    RRType::A => "TYPE_A",
2381                                    RRType::AAAA => "TYPE_AAAA",
2382                                    _ => "invalid_type",
2383                                };
2384                                trace!(
2385                                    "Cannot find valid addrs for {} response on intf {:?}",
2386                                    t,
2387                                    &intf
2388                                );
2389                                return;
2390                            }
2391                            for address in intf_addrs {
2392                                out.add_answer(
2393                                    &msg,
2394                                    DnsAddress::new(
2395                                        service_hostname,
2396                                        ip_address_rr_type(&address),
2397                                        CLASS_IN | CLASS_CACHE_FLUSH,
2398                                        service.get_host_ttl(),
2399                                        address,
2400                                    ),
2401                                );
2402                            }
2403                        }
2404                    }
2405                }
2406
2407                let query_name = question.entry_name().to_lowercase();
2408                let service_opt = self
2409                    .my_services
2410                    .iter()
2411                    .find(|(k, _v)| {
2412                        let service_name = match dns_registry.name_changes.get(k.as_str()) {
2413                            Some(new_name) => new_name,
2414                            None => k,
2415                        };
2416                        service_name == &query_name
2417                    })
2418                    .map(|(_, v)| v);
2419
2420                let Some(service) = service_opt else {
2421                    continue;
2422                };
2423
2424                if service.get_status(intf) != ServiceStatus::Announced {
2425                    continue;
2426                }
2427
2428                if qtype == RRType::SRV || qtype == RRType::ANY {
2429                    out.add_answer(
2430                        &msg,
2431                        DnsSrv::new(
2432                            question.entry_name(),
2433                            CLASS_IN | CLASS_CACHE_FLUSH,
2434                            service.get_host_ttl(),
2435                            service.get_priority(),
2436                            service.get_weight(),
2437                            service.get_port(),
2438                            service.get_hostname().to_string(),
2439                        ),
2440                    );
2441                }
2442
2443                if qtype == RRType::TXT || qtype == RRType::ANY {
2444                    out.add_answer(
2445                        &msg,
2446                        DnsTxt::new(
2447                            question.entry_name(),
2448                            CLASS_IN | CLASS_CACHE_FLUSH,
2449                            service.get_host_ttl(),
2450                            service.generate_txt(),
2451                        ),
2452                    );
2453                }
2454
2455                if qtype == RRType::SRV {
2456                    let intf_addrs = service.get_addrs_on_intf(intf);
2457                    if intf_addrs.is_empty() {
2458                        debug!(
2459                            "Cannot find valid addrs for TYPE_SRV response on intf {:?}",
2460                            &intf
2461                        );
2462                        return;
2463                    }
2464                    for address in intf_addrs {
2465                        out.add_additional_answer(DnsAddress::new(
2466                            service.get_hostname(),
2467                            ip_address_rr_type(&address),
2468                            CLASS_IN | CLASS_CACHE_FLUSH,
2469                            service.get_host_ttl(),
2470                            address,
2471                        ));
2472                    }
2473                }
2474            }
2475        }
2476
2477        if !out.answers_count() > 0 {
2478            out.set_id(msg.id());
2479            send_dns_outgoing(&out, intf, sock);
2480
2481            self.increase_counter(Counter::Respond, 1);
2482            self.notify_monitors(DaemonEvent::Respond(intf.ip()));
2483        }
2484
2485        self.increase_counter(Counter::KnownAnswerSuppression, out.known_answer_count());
2486    }
2487
2488    /// Increases the value of `counter` by `count`.
2489    fn increase_counter(&mut self, counter: Counter, count: i64) {
2490        let key = counter.to_string();
2491        match self.counters.get_mut(&key) {
2492            Some(v) => *v += count,
2493            None => {
2494                self.counters.insert(key, count);
2495            }
2496        }
2497    }
2498
2499    /// Sets the value of `counter` to `count`.
2500    fn set_counter(&mut self, counter: Counter, count: i64) {
2501        let key = counter.to_string();
2502        self.counters.insert(key, count);
2503    }
2504
2505    fn signal_sock_drain(&self) {
2506        let mut signal_buf = [0; 1024];
2507
2508        // This recv is non-blocking as the socket is non-blocking.
2509        while let Ok(sz) = self.signal_sock.recv(&mut signal_buf) {
2510            trace!(
2511                "signal socket recvd: {}",
2512                String::from_utf8_lossy(&signal_buf[0..sz])
2513            );
2514        }
2515    }
2516
2517    fn add_retransmission(&mut self, next_time: u64, command: Command) {
2518        self.retransmissions.push(ReRun { next_time, command });
2519        self.add_timer(next_time);
2520    }
2521
2522    /// Sends service removal event to listeners for expired service records.
2523    fn notify_service_removal(&self, expired: HashMap<String, HashSet<String>>) {
2524        for (ty_domain, sender) in self.service_queriers.iter() {
2525            if let Some(instances) = expired.get(ty_domain) {
2526                for instance_name in instances {
2527                    let event = ServiceEvent::ServiceRemoved(
2528                        ty_domain.to_string(),
2529                        instance_name.to_string(),
2530                    );
2531                    match sender.send(event) {
2532                        Ok(()) => debug!("notify_service_removal: sent ServiceRemoved to listener of {ty_domain}: {instance_name}"),
2533                        Err(e) => debug!("Failed to send event: {}", e),
2534                    }
2535                }
2536            }
2537        }
2538    }
2539
2540    /// The entry point that executes all commands received by the daemon.
2541    ///
2542    /// `repeating`: whether this is a retransmission.
2543    fn exec_command(&mut self, command: Command, repeating: bool) {
2544        match command {
2545            Command::Browse(ty, next_delay, listener) => {
2546                self.exec_command_browse(repeating, ty, next_delay, listener);
2547            }
2548
2549            Command::ResolveHostname(hostname, next_delay, listener, timeout) => {
2550                self.exec_command_resolve_hostname(
2551                    repeating, hostname, next_delay, listener, timeout,
2552                );
2553            }
2554
2555            Command::Register(service_info) => {
2556                self.register_service(service_info);
2557                self.increase_counter(Counter::Register, 1);
2558            }
2559
2560            Command::RegisterResend(fullname, intf) => {
2561                trace!("register-resend service: {fullname} on {:?}", &intf.addr);
2562                self.exec_command_register_resend(fullname, intf);
2563            }
2564
2565            Command::Unregister(fullname, resp_s) => {
2566                trace!("unregister service {} repeat {}", &fullname, &repeating);
2567                self.exec_command_unregister(repeating, fullname, resp_s);
2568            }
2569
2570            Command::UnregisterResend(packet, ip) => {
2571                self.exec_command_unregister_resend(packet, ip);
2572            }
2573
2574            Command::StopBrowse(ty_domain) => self.exec_command_stop_browse(ty_domain),
2575
2576            Command::StopResolveHostname(hostname) => {
2577                self.exec_command_stop_resolve_hostname(hostname.to_lowercase())
2578            }
2579
2580            Command::Resolve(instance, try_count) => self.exec_command_resolve(instance, try_count),
2581
2582            Command::GetMetrics(resp_s) => self.exec_command_get_metrics(resp_s),
2583
2584            Command::GetStatus(resp_s) => match resp_s.send(self.status.clone()) {
2585                Ok(()) => trace!("Sent status to the client"),
2586                Err(e) => debug!("Failed to send status: {}", e),
2587            },
2588
2589            Command::Monitor(resp_s) => {
2590                self.monitors.push(resp_s);
2591            }
2592
2593            Command::SetOption(daemon_opt) => {
2594                self.process_set_option(daemon_opt);
2595            }
2596
2597            Command::GetOption(resp_s) => {
2598                let val = DaemonOptionVal {
2599                    _service_name_len_max: self.service_name_len_max,
2600                    ip_check_interval: self.ip_check_interval,
2601                };
2602                if let Err(e) = resp_s.send(val) {
2603                    debug!("Failed to send options: {}", e);
2604                }
2605            }
2606
2607            Command::Verify(instance_fullname, timeout) => {
2608                self.exec_command_verify(instance_fullname, timeout, repeating);
2609            }
2610
2611            _ => {
2612                debug!("unexpected command: {:?}", &command);
2613            }
2614        }
2615    }
2616
2617    fn exec_command_get_metrics(&mut self, resp_s: Sender<HashMap<String, i64>>) {
2618        self.set_counter(Counter::CachedPTR, self.cache.ptr_count() as i64);
2619        self.set_counter(Counter::CachedSRV, self.cache.srv_count() as i64);
2620        self.set_counter(Counter::CachedAddr, self.cache.addr_count() as i64);
2621        self.set_counter(Counter::CachedTxt, self.cache.txt_count() as i64);
2622        self.set_counter(Counter::CachedNSec, self.cache.nsec_count() as i64);
2623        self.set_counter(Counter::CachedSubtype, self.cache.subtype_count() as i64);
2624        self.set_counter(Counter::Timer, self.timers.len() as i64);
2625
2626        let dns_registry_probe_count: usize = self
2627            .dns_registry_map
2628            .values()
2629            .map(|r| r.probing.len())
2630            .sum();
2631        self.set_counter(Counter::DnsRegistryProbe, dns_registry_probe_count as i64);
2632
2633        let dns_registry_active_count: usize = self
2634            .dns_registry_map
2635            .values()
2636            .map(|r| r.active.values().map(|a| a.len()).sum::<usize>())
2637            .sum();
2638        self.set_counter(Counter::DnsRegistryActive, dns_registry_active_count as i64);
2639
2640        let dns_registry_timer_count: usize = self
2641            .dns_registry_map
2642            .values()
2643            .map(|r| r.new_timers.len())
2644            .sum();
2645        self.set_counter(Counter::DnsRegistryTimer, dns_registry_timer_count as i64);
2646
2647        let dns_registry_name_change_count: usize = self
2648            .dns_registry_map
2649            .values()
2650            .map(|r| r.name_changes.len())
2651            .sum();
2652        self.set_counter(
2653            Counter::DnsRegistryNameChange,
2654            dns_registry_name_change_count as i64,
2655        );
2656
2657        // Send the metrics to the client.
2658        if let Err(e) = resp_s.send(self.counters.clone()) {
2659            debug!("Failed to send metrics: {}", e);
2660        }
2661    }
2662
2663    fn exec_command_browse(
2664        &mut self,
2665        repeating: bool,
2666        ty: String,
2667        next_delay: u32,
2668        listener: Sender<ServiceEvent>,
2669    ) {
2670        let pretty_addrs: Vec<String> = self
2671            .intf_socks
2672            .keys()
2673            .map(|itf| format!("{} ({})", itf.ip(), itf.name))
2674            .collect();
2675
2676        if let Err(e) = listener.send(ServiceEvent::SearchStarted(format!(
2677            "{ty} on {} interfaces [{}]",
2678            pretty_addrs.len(),
2679            pretty_addrs.join(", ")
2680        ))) {
2681            debug!(
2682                "Failed to send SearchStarted({})(repeating:{}): {}",
2683                &ty, repeating, e
2684            );
2685            return;
2686        }
2687        if !repeating {
2688            // Binds a `listener` to querying mDNS domain type `ty`.
2689            //
2690            // If there is already a `listener`, it will be updated, i.e. overwritten.
2691            self.service_queriers.insert(ty.clone(), listener.clone());
2692
2693            // if we already have the records in our cache, just send them
2694            self.query_cache_for_service(&ty, &listener);
2695        }
2696
2697        self.send_query(&ty, RRType::PTR);
2698        self.increase_counter(Counter::Browse, 1);
2699
2700        let next_time = current_time_millis() + (next_delay * 1000) as u64;
2701        let max_delay = 60 * 60;
2702        let delay = cmp::min(next_delay * 2, max_delay);
2703        self.add_retransmission(next_time, Command::Browse(ty, delay, listener));
2704    }
2705
2706    fn exec_command_resolve_hostname(
2707        &mut self,
2708        repeating: bool,
2709        hostname: String,
2710        next_delay: u32,
2711        listener: Sender<HostnameResolutionEvent>,
2712        timeout: Option<u64>,
2713    ) {
2714        let addr_list: Vec<_> = self.intf_socks.keys().collect();
2715        if let Err(e) = listener.send(HostnameResolutionEvent::SearchStarted(format!(
2716            "{} on addrs {:?}",
2717            &hostname, &addr_list
2718        ))) {
2719            debug!(
2720                "Failed to send ResolveStarted({})(repeating:{}): {}",
2721                &hostname, repeating, e
2722            );
2723            return;
2724        }
2725        if !repeating {
2726            self.add_hostname_resolver(hostname.to_owned(), listener.clone(), timeout);
2727            // if we already have the records in our cache, just send them
2728            self.query_cache_for_hostname(&hostname, listener.clone());
2729        }
2730
2731        self.send_query_vec(&[(&hostname, RRType::A), (&hostname, RRType::AAAA)]);
2732        self.increase_counter(Counter::ResolveHostname, 1);
2733
2734        let now = current_time_millis();
2735        let next_time = now + u64::from(next_delay) * 1000;
2736        let max_delay = 60 * 60;
2737        let delay = cmp::min(next_delay * 2, max_delay);
2738
2739        // Only add retransmission if it does not exceed the hostname resolver timeout, if any.
2740        if self
2741            .hostname_resolvers
2742            .get(&hostname)
2743            .and_then(|(_sender, timeout)| *timeout)
2744            .map(|timeout| next_time < timeout)
2745            .unwrap_or(true)
2746        {
2747            self.add_retransmission(
2748                next_time,
2749                Command::ResolveHostname(hostname, delay, listener, None),
2750            );
2751        }
2752    }
2753
2754    fn exec_command_resolve(&mut self, instance: String, try_count: u16) {
2755        let pending_query = self.query_unresolved(&instance);
2756        let max_try = 3;
2757        if pending_query && try_count < max_try {
2758            // Note that if the current try already succeeds, the next retransmission
2759            // will be no-op as the cache has been updated.
2760            let next_time = current_time_millis() + RESOLVE_WAIT_IN_MILLIS;
2761            self.add_retransmission(next_time, Command::Resolve(instance, try_count + 1));
2762        }
2763    }
2764
2765    fn exec_command_unregister(
2766        &mut self,
2767        repeating: bool,
2768        fullname: String,
2769        resp_s: Sender<UnregisterStatus>,
2770    ) {
2771        let response = match self.my_services.remove_entry(&fullname) {
2772            None => {
2773                debug!("unregister: cannot find such service {}", &fullname);
2774                UnregisterStatus::NotFound
2775            }
2776            Some((_k, info)) => {
2777                let mut timers = Vec::new();
2778                // Send one unregister per interface and ip version
2779                let mut multicast_sent_trackers = HashSet::new();
2780
2781                for (intf, sock) in self.intf_socks.iter() {
2782                    if let Some(tracker) = multicast_send_tracker(intf) {
2783                        if multicast_sent_trackers.contains(&tracker) {
2784                            continue; // no need to send unregister the same interface with same ip version.
2785                        }
2786                        multicast_sent_trackers.insert(tracker);
2787                    }
2788                    let packet = self.unregister_service(&info, intf, sock);
2789                    // repeat for one time just in case some peers miss the message
2790                    if !repeating && !packet.is_empty() {
2791                        let next_time = current_time_millis() + 120;
2792                        self.retransmissions.push(ReRun {
2793                            next_time,
2794                            command: Command::UnregisterResend(packet, intf.clone()),
2795                        });
2796                        timers.push(next_time);
2797                    }
2798                }
2799
2800                for t in timers {
2801                    self.add_timer(t);
2802                }
2803
2804                self.increase_counter(Counter::Unregister, 1);
2805                UnregisterStatus::OK
2806            }
2807        };
2808        if let Err(e) = resp_s.send(response) {
2809            debug!("unregister: failed to send response: {}", e);
2810        }
2811    }
2812
2813    fn exec_command_unregister_resend(&mut self, packet: Vec<u8>, intf: Interface) {
2814        if let Some(sock) = self.intf_socks.get(&intf) {
2815            debug!("UnregisterResend from {}", &intf.ip());
2816            multicast_on_intf(&packet[..], &intf, sock);
2817            self.increase_counter(Counter::UnregisterResend, 1);
2818        }
2819    }
2820
2821    fn exec_command_stop_browse(&mut self, ty_domain: String) {
2822        match self.service_queriers.remove_entry(&ty_domain) {
2823            None => debug!("StopBrowse: cannot find querier for {}", &ty_domain),
2824            Some((ty, sender)) => {
2825                // Remove pending browse commands in the reruns.
2826                trace!("StopBrowse: removed queryer for {}", &ty);
2827                let mut i = 0;
2828                while i < self.retransmissions.len() {
2829                    if let Command::Browse(t, _, _) = &self.retransmissions[i].command {
2830                        if t == &ty {
2831                            self.retransmissions.remove(i);
2832                            trace!("StopBrowse: removed retransmission for {}", &ty);
2833                            continue;
2834                        }
2835                    }
2836                    i += 1;
2837                }
2838
2839                // Remove cache entries.
2840                self.cache.remove_service_type(&ty_domain);
2841
2842                // Notify the client.
2843                match sender.send(ServiceEvent::SearchStopped(ty_domain)) {
2844                    Ok(()) => trace!("Sent SearchStopped to the listener"),
2845                    Err(e) => debug!("Failed to send SearchStopped: {}", e),
2846                }
2847            }
2848        }
2849    }
2850
2851    fn exec_command_stop_resolve_hostname(&mut self, hostname: String) {
2852        if let Some((host, (sender, _timeout))) = self.hostname_resolvers.remove_entry(&hostname) {
2853            // Remove pending resolve commands in the reruns.
2854            trace!("StopResolve: removed queryer for {}", &host);
2855            let mut i = 0;
2856            while i < self.retransmissions.len() {
2857                if let Command::Resolve(t, _) = &self.retransmissions[i].command {
2858                    if t == &host {
2859                        self.retransmissions.remove(i);
2860                        trace!("StopResolve: removed retransmission for {}", &host);
2861                        continue;
2862                    }
2863                }
2864                i += 1;
2865            }
2866
2867            // Notify the client.
2868            match sender.send(HostnameResolutionEvent::SearchStopped(hostname)) {
2869                Ok(()) => trace!("Sent SearchStopped to the listener"),
2870                Err(e) => debug!("Failed to send SearchStopped: {}", e),
2871            }
2872        }
2873    }
2874
2875    fn exec_command_register_resend(&mut self, fullname: String, intf: Interface) {
2876        let Some(info) = self.my_services.get_mut(&fullname) else {
2877            trace!("announce: cannot find such service {}", &fullname);
2878            return;
2879        };
2880
2881        let Some(dns_registry) = self.dns_registry_map.get_mut(&intf) else {
2882            return;
2883        };
2884
2885        let Some(sock) = self.intf_socks.get(&intf) else {
2886            return;
2887        };
2888
2889        if announce_service_on_intf(dns_registry, info, &intf, sock) {
2890            let mut hostname = info.get_hostname();
2891            if let Some(new_name) = dns_registry.name_changes.get(hostname) {
2892                hostname = new_name;
2893            }
2894            let service_name = match dns_registry.name_changes.get(&fullname) {
2895                Some(new_name) => new_name.to_string(),
2896                None => fullname,
2897            };
2898
2899            debug!("resend: announce service {} on {}", service_name, intf.ip());
2900
2901            notify_monitors(
2902                &mut self.monitors,
2903                DaemonEvent::Announce(service_name, format!("{}:{}", hostname, &intf.ip())),
2904            );
2905            info.set_status(&intf, ServiceStatus::Announced);
2906        } else {
2907            debug!("register-resend should not fail");
2908        }
2909
2910        self.increase_counter(Counter::RegisterResend, 1);
2911    }
2912
2913    fn exec_command_verify(&mut self, instance: String, timeout: Duration, repeating: bool) {
2914        /*
2915        RFC 6762 section 10.4:
2916        ...
2917        When the cache receives this hint that it should reconfirm some
2918        record, it MUST issue two or more queries for the resource record in
2919        dispute.  If no response is received within ten seconds, then, even
2920        though its TTL may indicate that it is not yet due to expire, that
2921        record SHOULD be promptly flushed from the cache.
2922        */
2923        let now = current_time_millis();
2924        let expire_at = if repeating {
2925            None
2926        } else {
2927            Some(now + timeout.as_millis() as u64)
2928        };
2929
2930        // send query for the resource records.
2931        let record_vec = self.cache.service_verify_queries(&instance, expire_at);
2932
2933        if !record_vec.is_empty() {
2934            let query_vec: Vec<(&str, RRType)> = record_vec
2935                .iter()
2936                .map(|(record, rr_type)| (record.as_str(), *rr_type))
2937                .collect();
2938            self.send_query_vec(&query_vec);
2939
2940            if let Some(new_expire) = expire_at {
2941                self.add_timer(new_expire); // ensure a check for the new expire time.
2942
2943                // schedule a resend 1 second later
2944                self.add_retransmission(now + 1000, Command::Verify(instance, timeout));
2945            }
2946        }
2947    }
2948
2949    /// Refresh cached service records with active queriers
2950    fn refresh_active_services(&mut self) {
2951        let mut query_ptr_count = 0;
2952        let mut query_srv_count = 0;
2953        let mut new_timers = HashSet::new();
2954        let mut query_addr_count = 0;
2955
2956        for (ty_domain, _sender) in self.service_queriers.iter() {
2957            let refreshed_timers = self.cache.refresh_due_ptr(ty_domain);
2958            if !refreshed_timers.is_empty() {
2959                trace!("sending refresh query for PTR: {}", ty_domain);
2960                self.send_query(ty_domain, RRType::PTR);
2961                query_ptr_count += 1;
2962                new_timers.extend(refreshed_timers);
2963            }
2964
2965            let (instances, timers) = self.cache.refresh_due_srv(ty_domain);
2966            for instance in instances.iter() {
2967                trace!("sending refresh query for SRV: {}", instance);
2968                self.send_query(instance, RRType::SRV);
2969                query_srv_count += 1;
2970            }
2971            new_timers.extend(timers);
2972            let (hostnames, timers) = self.cache.refresh_due_hosts(ty_domain);
2973            for hostname in hostnames.iter() {
2974                trace!("sending refresh queries for A and AAAA:  {}", hostname);
2975                self.send_query_vec(&[(hostname, RRType::A), (hostname, RRType::AAAA)]);
2976                query_addr_count += 2;
2977            }
2978            new_timers.extend(timers);
2979        }
2980
2981        for timer in new_timers {
2982            self.add_timer(timer);
2983        }
2984
2985        self.increase_counter(Counter::CacheRefreshPTR, query_ptr_count);
2986        self.increase_counter(Counter::CacheRefreshSRV, query_srv_count);
2987        self.increase_counter(Counter::CacheRefreshAddr, query_addr_count);
2988    }
2989}
2990
2991/// All possible events sent to the client from the daemon
2992/// regarding service discovery.
2993#[derive(Debug)]
2994pub enum ServiceEvent {
2995    /// Started searching for a service type.
2996    SearchStarted(String),
2997
2998    /// Found a specific (service_type, fullname).
2999    ServiceFound(String, String),
3000
3001    /// Resolved a service instance with detailed info.
3002    ServiceResolved(ServiceInfo),
3003
3004    /// A service instance (service_type, fullname) was removed.
3005    ServiceRemoved(String, String),
3006
3007    /// Stopped searching for a service type.
3008    SearchStopped(String),
3009}
3010
3011/// All possible events sent to the client from the daemon
3012/// regarding host resolution.
3013#[derive(Debug)]
3014#[non_exhaustive]
3015pub enum HostnameResolutionEvent {
3016    /// Started searching for the ip address of a hostname.
3017    SearchStarted(String),
3018    /// One or more addresses for a hostname has been found.
3019    AddressesFound(String, HashSet<IpAddr>),
3020    /// One or more addresses for a hostname has been removed.
3021    AddressesRemoved(String, HashSet<IpAddr>),
3022    /// The search for the ip address of a hostname has timed out.
3023    SearchTimeout(String),
3024    /// Stopped searching for the ip address of a hostname.
3025    SearchStopped(String),
3026}
3027
3028/// Some notable events from the daemon besides [`ServiceEvent`].
3029/// These events are expected to happen infrequently.
3030#[derive(Clone, Debug)]
3031#[non_exhaustive]
3032pub enum DaemonEvent {
3033    /// Daemon unsolicitly announced a service from an interface.
3034    Announce(String, String),
3035
3036    /// Daemon encountered an error.
3037    Error(Error),
3038
3039    /// Daemon detected a new IP address from the host.
3040    IpAdd(IpAddr),
3041
3042    /// Daemon detected a IP address removed from the host.
3043    IpDel(IpAddr),
3044
3045    /// Daemon resolved a name conflict by changing one of its names.
3046    /// see [DnsNameChange] for more details.
3047    NameChange(DnsNameChange),
3048
3049    /// Send out a multicast response via an IP address.
3050    Respond(IpAddr),
3051}
3052
3053/// Represents a name change due to a name conflict resolution.
3054/// See [RFC 6762 section 9](https://datatracker.ietf.org/doc/html/rfc6762#section-9)
3055#[derive(Clone, Debug)]
3056pub struct DnsNameChange {
3057    /// The original name set in `ServiceInfo` by the user.
3058    pub original: String,
3059
3060    /// A new name is created by appending a suffix after the original name.
3061    ///
3062    /// - for a service instance name, the suffix is `(N)`, where N starts at 2.
3063    /// - for a host name, the suffix is `-N`, where N starts at 2.
3064    ///
3065    /// For example:
3066    ///
3067    /// - Service name `foo._service-type._udp` becomes `foo (2)._service-type._udp`
3068    /// - Host name `foo.local.` becomes `foo-2.local.`
3069    pub new_name: String,
3070
3071    /// The resource record type
3072    pub rr_type: RRType,
3073
3074    /// The interface where the name conflict and its change happened.
3075    pub intf_name: String,
3076}
3077
3078/// Commands supported by the daemon
3079#[derive(Debug)]
3080enum Command {
3081    /// Browsing for a service type (ty_domain, next_time_delay_in_seconds, channel::sender)
3082    Browse(String, u32, Sender<ServiceEvent>),
3083
3084    /// Resolve a hostname to IP addresses.
3085    ResolveHostname(String, u32, Sender<HostnameResolutionEvent>, Option<u64>), // (hostname, next_time_delay_in_seconds, sender, timeout_in_milliseconds)
3086
3087    /// Register a service
3088    Register(ServiceInfo),
3089
3090    /// Unregister a service
3091    Unregister(String, Sender<UnregisterStatus>), // (fullname)
3092
3093    /// Announce again a service to local network
3094    RegisterResend(String, Interface), // (fullname)
3095
3096    /// Resend unregister packet.
3097    UnregisterResend(Vec<u8>, Interface), // (packet content)
3098
3099    /// Stop browsing a service type
3100    StopBrowse(String), // (ty_domain)
3101
3102    /// Stop resolving a hostname
3103    StopResolveHostname(String), // (hostname)
3104
3105    /// Send query to resolve a service instance.
3106    /// This is used when a PTR record exists but SRV & TXT records are missing.
3107    Resolve(String, u16), // (service_instance_fullname, try_count)
3108
3109    /// Read the current values of the counters
3110    GetMetrics(Sender<Metrics>),
3111
3112    /// Get the current status of the daemon.
3113    GetStatus(Sender<DaemonStatus>),
3114
3115    /// Monitor noticable events in the daemon.
3116    Monitor(Sender<DaemonEvent>),
3117
3118    SetOption(DaemonOption),
3119
3120    GetOption(Sender<DaemonOptionVal>),
3121
3122    /// Proactively confirm a DNS resource record.
3123    ///
3124    /// The intention is to check if a service name or IP address still valid
3125    /// before its TTL expires.
3126    Verify(String, Duration),
3127
3128    Exit(Sender<DaemonStatus>),
3129}
3130
3131impl fmt::Display for Command {
3132    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
3133        match self {
3134            Self::Browse(_, _, _) => write!(f, "Command Browse"),
3135            Self::ResolveHostname(_, _, _, _) => write!(f, "Command ResolveHostname"),
3136            Self::Exit(_) => write!(f, "Command Exit"),
3137            Self::GetStatus(_) => write!(f, "Command GetStatus"),
3138            Self::GetMetrics(_) => write!(f, "Command GetMetrics"),
3139            Self::Monitor(_) => write!(f, "Command Monitor"),
3140            Self::Register(_) => write!(f, "Command Register"),
3141            Self::RegisterResend(_, _) => write!(f, "Command RegisterResend"),
3142            Self::SetOption(_) => write!(f, "Command SetOption"),
3143            Self::GetOption(_) => write!(f, "Command GetOption"),
3144            Self::StopBrowse(_) => write!(f, "Command StopBrowse"),
3145            Self::StopResolveHostname(_) => write!(f, "Command StopResolveHostname"),
3146            Self::Unregister(_, _) => write!(f, "Command Unregister"),
3147            Self::UnregisterResend(_, _) => write!(f, "Command UnregisterResend"),
3148            Self::Resolve(_, _) => write!(f, "Command Resolve"),
3149            Self::Verify(_, _) => write!(f, "Command VerifyResource"),
3150        }
3151    }
3152}
3153
3154struct DaemonOptionVal {
3155    _service_name_len_max: u8,
3156    ip_check_interval: u64,
3157}
3158
3159#[derive(Debug)]
3160enum DaemonOption {
3161    ServiceNameLenMax(u8),
3162    IpCheckInterval(u64),
3163    EnableInterface(Vec<IfKind>),
3164    DisableInterface(Vec<IfKind>),
3165    MulticastLoopV4(bool),
3166    MulticastLoopV6(bool),
3167}
3168
3169/// The length of Service Domain name supported in this lib.
3170const DOMAIN_LEN: usize = "._tcp.local.".len();
3171
3172/// Validate the length of "service_name" in a "_<service_name>.<domain_name>." string.
3173fn check_service_name_length(ty_domain: &str, limit: u8) -> Result<()> {
3174    if ty_domain.len() <= DOMAIN_LEN + 1 {
3175        // service name cannot be empty or only '_'.
3176        return Err(e_fmt!("Service type name cannot be empty: {}", ty_domain));
3177    }
3178
3179    let service_name_len = ty_domain.len() - DOMAIN_LEN - 1; // exclude the leading `_`
3180    if service_name_len > limit as usize {
3181        return Err(e_fmt!("Service name length must be <= {} bytes", limit));
3182    }
3183    Ok(())
3184}
3185
3186/// Checks if `name` ends with a valid domain: '._tcp.local.' or '._udp.local.'
3187fn check_domain_suffix(name: &str) -> Result<()> {
3188    if !(name.ends_with("._tcp.local.") || name.ends_with("._udp.local.")) {
3189        return Err(e_fmt!(
3190            "mDNS service {} must end with '._tcp.local.' or '._udp.local.'",
3191            name
3192        ));
3193    }
3194
3195    Ok(())
3196}
3197
3198/// Validate the service name in a fully qualified name.
3199///
3200/// A Full Name = <Instance>.<Service>.<Domain>
3201/// The only `<Domain>` supported are "._tcp.local." and "._udp.local.".
3202///
3203/// Note: this function does not check for the length of the service name.
3204/// Instead, `register_service` method will check the length.
3205fn check_service_name(fullname: &str) -> Result<()> {
3206    check_domain_suffix(fullname)?;
3207
3208    let remaining: Vec<&str> = fullname[..fullname.len() - DOMAIN_LEN].split('.').collect();
3209    let name = remaining.last().ok_or_else(|| e_fmt!("No service name"))?;
3210
3211    if &name[0..1] != "_" {
3212        return Err(e_fmt!("Service name must start with '_'"));
3213    }
3214
3215    let name = &name[1..];
3216
3217    if name.contains("--") {
3218        return Err(e_fmt!("Service name must not contain '--'"));
3219    }
3220
3221    if name.starts_with('-') || name.ends_with('-') {
3222        return Err(e_fmt!("Service name (%s) may not start or end with '-'"));
3223    }
3224
3225    let ascii_count = name.chars().filter(|c| c.is_ascii_alphabetic()).count();
3226    if ascii_count < 1 {
3227        return Err(e_fmt!(
3228            "Service name must contain at least one letter (eg: 'A-Za-z')"
3229        ));
3230    }
3231
3232    Ok(())
3233}
3234
3235/// Validate a hostname.
3236fn check_hostname(hostname: &str) -> Result<()> {
3237    if !hostname.ends_with(".local.") {
3238        return Err(e_fmt!("Hostname must end with '.local.': {hostname}"));
3239    }
3240
3241    if hostname == ".local." {
3242        return Err(e_fmt!(
3243            "The part of the hostname before '.local.' cannot be empty"
3244        ));
3245    }
3246
3247    if hostname.len() > 255 {
3248        return Err(e_fmt!("Hostname length must be <= 255 bytes"));
3249    }
3250
3251    Ok(())
3252}
3253
3254fn call_service_listener(
3255    listeners_map: &HashMap<String, Sender<ServiceEvent>>,
3256    ty_domain: &str,
3257    event: ServiceEvent,
3258) {
3259    if let Some(listener) = listeners_map.get(ty_domain) {
3260        match listener.send(event) {
3261            Ok(()) => trace!("Sent event to listener successfully"),
3262            Err(e) => debug!("Failed to send event: {}", e),
3263        }
3264    }
3265}
3266
3267fn call_hostname_resolution_listener(
3268    listeners_map: &HashMap<String, (Sender<HostnameResolutionEvent>, Option<u64>)>,
3269    hostname: &str,
3270    event: HostnameResolutionEvent,
3271) {
3272    let hostname_lower = hostname.to_lowercase();
3273    if let Some(listener) = listeners_map.get(&hostname_lower).map(|(l, _)| l) {
3274        match listener.send(event) {
3275            Ok(()) => trace!("Sent event to listener successfully"),
3276            Err(e) => debug!("Failed to send event: {}", e),
3277        }
3278    }
3279}
3280
3281/// Returns valid network interfaces in the host system.
3282/// Loopback interfaces are excluded.
3283fn my_ip_interfaces(with_loopback: bool) -> Vec<Interface> {
3284    if_addrs::get_if_addrs()
3285        .unwrap_or_default()
3286        .into_iter()
3287        .filter(|i| !i.is_loopback() || with_loopback)
3288        .collect()
3289}
3290
3291/// Send an outgoing mDNS query or response, and returns the packet bytes.
3292fn send_dns_outgoing(out: &DnsOutgoing, intf: &Interface, sock: &MioUdpSocket) -> Vec<Vec<u8>> {
3293    let qtype = if out.is_query() { "query" } else { "response" };
3294    trace!(
3295        "send outgoing {}: {} questions {} answers {} authorities {} additional",
3296        qtype,
3297        out.questions().len(),
3298        out.answers_count(),
3299        out.authorities().len(),
3300        out.additionals().len()
3301    );
3302    let packet_list = out.to_data_on_wire();
3303    for packet in packet_list.iter() {
3304        multicast_on_intf(packet, intf, sock);
3305    }
3306    packet_list
3307}
3308
3309/// Sends a multicast packet, and returns the packet bytes.
3310fn multicast_on_intf(packet: &[u8], intf: &Interface, socket: &MioUdpSocket) {
3311    if packet.len() > MAX_MSG_ABSOLUTE {
3312        debug!("Drop over-sized packet ({})", packet.len());
3313        return;
3314    }
3315
3316    let addr: SocketAddr = match intf.addr {
3317        if_addrs::IfAddr::V4(_) => SocketAddrV4::new(GROUP_ADDR_V4, MDNS_PORT).into(),
3318        if_addrs::IfAddr::V6(_) => {
3319            let mut sock = SocketAddrV6::new(GROUP_ADDR_V6, MDNS_PORT, 0, 0);
3320            sock.set_scope_id(intf.index.unwrap_or(0)); // Choose iface for multicast
3321            sock.into()
3322        }
3323    };
3324
3325    send_packet(packet, addr, intf, socket);
3326}
3327
3328/// Sends out `packet` to `addr` on the socket in `intf_sock`.
3329fn send_packet(packet: &[u8], addr: SocketAddr, intf: &Interface, sock: &MioUdpSocket) {
3330    match sock.send_to(packet, addr) {
3331        Ok(sz) => trace!("sent out {} bytes on interface {:?}", sz, intf),
3332        Err(e) => debug!("Failed to send to {} via {:?}: {}", addr, &intf, e),
3333    }
3334}
3335
3336/// Returns true if `name` is a valid instance name of format:
3337/// <instance>.<service_type>.<_udp|_tcp>.local.
3338/// Note: <instance> could contain '.' as well.
3339fn valid_instance_name(name: &str) -> bool {
3340    name.split('.').count() >= 5
3341}
3342
3343fn notify_monitors(monitors: &mut Vec<Sender<DaemonEvent>>, event: DaemonEvent) {
3344    monitors.retain(|sender| {
3345        if let Err(e) = sender.try_send(event.clone()) {
3346            debug!("notify_monitors: try_send: {}", &e);
3347            if matches!(e, TrySendError::Disconnected(_)) {
3348                return false; // This monitor is dropped.
3349            }
3350        }
3351        true
3352    });
3353}
3354
3355/// Check if all unique records passed "probing", and if yes, create a packet
3356/// to announce the service.
3357fn prepare_announce(
3358    info: &ServiceInfo,
3359    intf: &Interface,
3360    dns_registry: &mut DnsRegistry,
3361) -> Option<DnsOutgoing> {
3362    let intf_addrs = info.get_addrs_on_intf(intf);
3363    if intf_addrs.is_empty() {
3364        trace!("No valid addrs to add on intf {:?}", &intf);
3365        return None;
3366    }
3367
3368    // check if we changed our name due to conflicts.
3369    let service_fullname = match dns_registry.name_changes.get(info.get_fullname()) {
3370        Some(new_name) => new_name,
3371        None => info.get_fullname(),
3372    };
3373
3374    debug!(
3375        "prepare to announce service {service_fullname} on {}: {}",
3376        &intf.name,
3377        &intf.ip()
3378    );
3379
3380    let mut probing_count = 0;
3381    let mut out = DnsOutgoing::new(FLAGS_QR_RESPONSE | FLAGS_AA);
3382    let create_time = current_time_millis() + fastrand::u64(0..250);
3383
3384    out.add_answer_at_time(
3385        DnsPointer::new(
3386            info.get_type(),
3387            RRType::PTR,
3388            CLASS_IN,
3389            info.get_other_ttl(),
3390            service_fullname.to_string(),
3391        ),
3392        0,
3393    );
3394
3395    if let Some(sub) = info.get_subtype() {
3396        trace!("Adding subdomain {}", sub);
3397        out.add_answer_at_time(
3398            DnsPointer::new(
3399                sub,
3400                RRType::PTR,
3401                CLASS_IN,
3402                info.get_other_ttl(),
3403                service_fullname.to_string(),
3404            ),
3405            0,
3406        );
3407    }
3408
3409    // SRV records.
3410    let hostname = match dns_registry.name_changes.get(info.get_hostname()) {
3411        Some(new_name) => new_name.to_string(),
3412        None => info.get_hostname().to_string(),
3413    };
3414
3415    let mut srv = DnsSrv::new(
3416        info.get_fullname(),
3417        CLASS_IN | CLASS_CACHE_FLUSH,
3418        info.get_host_ttl(),
3419        info.get_priority(),
3420        info.get_weight(),
3421        info.get_port(),
3422        hostname,
3423    );
3424
3425    if let Some(new_name) = dns_registry.name_changes.get(info.get_fullname()) {
3426        srv.get_record_mut().set_new_name(new_name.to_string());
3427    }
3428
3429    if !info.requires_probe()
3430        || dns_registry.is_probing_done(&srv, info.get_fullname(), create_time)
3431    {
3432        out.add_answer_at_time(srv, 0);
3433    } else {
3434        probing_count += 1;
3435    }
3436
3437    // TXT records.
3438
3439    let mut txt = DnsTxt::new(
3440        info.get_fullname(),
3441        CLASS_IN | CLASS_CACHE_FLUSH,
3442        info.get_other_ttl(),
3443        info.generate_txt(),
3444    );
3445
3446    if let Some(new_name) = dns_registry.name_changes.get(info.get_fullname()) {
3447        txt.get_record_mut().set_new_name(new_name.to_string());
3448    }
3449
3450    if !info.requires_probe()
3451        || dns_registry.is_probing_done(&txt, info.get_fullname(), create_time)
3452    {
3453        out.add_answer_at_time(txt, 0);
3454    } else {
3455        probing_count += 1;
3456    }
3457
3458    // Address records. (A and AAAA)
3459
3460    let hostname = info.get_hostname();
3461    for address in intf_addrs {
3462        let mut dns_addr = DnsAddress::new(
3463            hostname,
3464            ip_address_rr_type(&address),
3465            CLASS_IN | CLASS_CACHE_FLUSH,
3466            info.get_host_ttl(),
3467            address,
3468        );
3469
3470        if let Some(new_name) = dns_registry.name_changes.get(hostname) {
3471            dns_addr.get_record_mut().set_new_name(new_name.to_string());
3472        }
3473
3474        if !info.requires_probe()
3475            || dns_registry.is_probing_done(&dns_addr, info.get_fullname(), create_time)
3476        {
3477            out.add_answer_at_time(dns_addr, 0);
3478        } else {
3479            probing_count += 1;
3480        }
3481    }
3482
3483    if probing_count > 0 {
3484        return None;
3485    }
3486
3487    Some(out)
3488}
3489
3490/// Send an unsolicited response for owned service via `intf` and `sock`.
3491/// Returns true if sent out successfully.
3492fn announce_service_on_intf(
3493    dns_registry: &mut DnsRegistry,
3494    info: &ServiceInfo,
3495    intf: &Interface,
3496    sock: &MioUdpSocket,
3497) -> bool {
3498    if let Some(out) = prepare_announce(info, intf, dns_registry) {
3499        send_dns_outgoing(&out, intf, sock);
3500        return true;
3501    }
3502    false
3503}
3504
3505/// Returns a new name based on the `original` to avoid conflicts.
3506/// If the name already contains a number in parentheses, increments that number.
3507///
3508/// Examples:
3509/// - `foo.local.` becomes `foo (2).local.`
3510/// - `foo (2).local.` becomes `foo (3).local.`
3511/// - `foo (9)` becomes `foo (10)`
3512fn name_change(original: &str) -> String {
3513    let mut parts: Vec<_> = original.split('.').collect();
3514    let Some(first_part) = parts.get_mut(0) else {
3515        return format!("{original} (2)");
3516    };
3517
3518    let mut new_name = format!("{} (2)", first_part);
3519
3520    // check if there is already has `(<num>)` suffix.
3521    if let Some(paren_pos) = first_part.rfind(" (") {
3522        // Check if there's a closing parenthesis
3523        if let Some(end_paren) = first_part[paren_pos..].find(')') {
3524            let absolute_end_pos = paren_pos + end_paren;
3525            // Only process if the closing parenthesis is the last character
3526            if absolute_end_pos == first_part.len() - 1 {
3527                let num_start = paren_pos + 2; // Skip " ("
3528                                               // Try to parse the number between parentheses
3529                if let Ok(number) = first_part[num_start..absolute_end_pos].parse::<u32>() {
3530                    let base_name = &first_part[..paren_pos];
3531                    new_name = format!("{} ({})", base_name, number + 1)
3532                }
3533            }
3534        }
3535    }
3536
3537    *first_part = &new_name;
3538    parts.join(".")
3539}
3540
3541/// Returns a new name based on the `original` to avoid conflicts.
3542/// If the name already contains a hyphenated number, increments that number.
3543///
3544/// Examples:
3545/// - `foo.local.` becomes `foo-2.local.`
3546/// - `foo-2.local.` becomes `foo-3.local.`
3547/// - `foo` becomes `foo-2`
3548fn hostname_change(original: &str) -> String {
3549    let mut parts: Vec<_> = original.split('.').collect();
3550    let Some(first_part) = parts.get_mut(0) else {
3551        return format!("{original}-2");
3552    };
3553
3554    let mut new_name = format!("{}-2", first_part);
3555
3556    // check if there is already a `-<num>` suffix
3557    if let Some(hyphen_pos) = first_part.rfind('-') {
3558        // Try to parse everything after the hyphen as a number
3559        if let Ok(number) = first_part[hyphen_pos + 1..].parse::<u32>() {
3560            let base_name = &first_part[..hyphen_pos];
3561            new_name = format!("{}-{}", base_name, number + 1);
3562        }
3563    }
3564
3565    *first_part = &new_name;
3566    parts.join(".")
3567}
3568
3569fn add_answer_with_additionals(
3570    out: &mut DnsOutgoing,
3571    msg: &DnsIncoming,
3572    service: &ServiceInfo,
3573    intf: &Interface,
3574    dns_registry: &DnsRegistry,
3575) {
3576    let intf_addrs = service.get_addrs_on_intf(intf);
3577    if intf_addrs.is_empty() {
3578        trace!("No addrs on LAN of intf {:?}", intf);
3579        return;
3580    }
3581
3582    // check if we changed our name due to conflicts.
3583    let service_fullname = match dns_registry.name_changes.get(service.get_fullname()) {
3584        Some(new_name) => new_name,
3585        None => service.get_fullname(),
3586    };
3587
3588    let hostname = match dns_registry.name_changes.get(service.get_hostname()) {
3589        Some(new_name) => new_name,
3590        None => service.get_hostname(),
3591    };
3592
3593    let ptr_added = out.add_answer(
3594        msg,
3595        DnsPointer::new(
3596            service.get_type(),
3597            RRType::PTR,
3598            CLASS_IN,
3599            service.get_other_ttl(),
3600            service_fullname.to_string(),
3601        ),
3602    );
3603
3604    if !ptr_added {
3605        trace!("answer was not added for msg {:?}", msg);
3606        return;
3607    }
3608
3609    if let Some(sub) = service.get_subtype() {
3610        trace!("Adding subdomain {}", sub);
3611        out.add_additional_answer(DnsPointer::new(
3612            sub,
3613            RRType::PTR,
3614            CLASS_IN,
3615            service.get_other_ttl(),
3616            service_fullname.to_string(),
3617        ));
3618    }
3619
3620    // Add recommended additional answers according to
3621    // https://tools.ietf.org/html/rfc6763#section-12.1.
3622    out.add_additional_answer(DnsSrv::new(
3623        service_fullname,
3624        CLASS_IN | CLASS_CACHE_FLUSH,
3625        service.get_host_ttl(),
3626        service.get_priority(),
3627        service.get_weight(),
3628        service.get_port(),
3629        hostname.to_string(),
3630    ));
3631
3632    out.add_additional_answer(DnsTxt::new(
3633        service_fullname,
3634        CLASS_IN | CLASS_CACHE_FLUSH,
3635        service.get_host_ttl(),
3636        service.generate_txt(),
3637    ));
3638
3639    for address in intf_addrs {
3640        out.add_additional_answer(DnsAddress::new(
3641            hostname,
3642            ip_address_rr_type(&address),
3643            CLASS_IN | CLASS_CACHE_FLUSH,
3644            service.get_host_ttl(),
3645            address,
3646        ));
3647    }
3648}
3649
3650#[cfg(test)]
3651mod tests {
3652    use super::{
3653        check_domain_suffix, check_service_name_length, hostname_change, my_ip_interfaces,
3654        name_change, new_socket_bind, send_dns_outgoing, valid_instance_name,
3655        HostnameResolutionEvent, ServiceDaemon, ServiceEvent, ServiceInfo, GROUP_ADDR_V4,
3656        MDNS_PORT,
3657    };
3658    use crate::{
3659        dns_parser::{DnsOutgoing, DnsPointer, RRType, CLASS_IN, FLAGS_AA, FLAGS_QR_RESPONSE},
3660        service_daemon::check_hostname,
3661    };
3662    use std::{
3663        net::{SocketAddr, SocketAddrV4},
3664        time::Duration,
3665    };
3666    use test_log::test;
3667
3668    #[test]
3669    fn test_socketaddr_print() {
3670        let addr: SocketAddr = SocketAddrV4::new(GROUP_ADDR_V4, MDNS_PORT).into();
3671        let print = format!("{}", addr);
3672        assert_eq!(print, "224.0.0.251:5353");
3673    }
3674
3675    #[test]
3676    fn test_instance_name() {
3677        assert!(valid_instance_name("my-laser._printer._tcp.local."));
3678        assert!(valid_instance_name("my-laser.._printer._tcp.local."));
3679        assert!(!valid_instance_name("_printer._tcp.local."));
3680    }
3681
3682    #[test]
3683    fn test_check_service_name_length() {
3684        let result = check_service_name_length("_tcp", 100);
3685        assert!(result.is_err());
3686        if let Err(e) = result {
3687            println!("{}", e);
3688        }
3689    }
3690
3691    #[test]
3692    fn test_check_hostname() {
3693        // valid hostnames
3694        for hostname in &[
3695            "my_host.local.",
3696            &("A".repeat(255 - ".local.".len()) + ".local."),
3697        ] {
3698            let result = check_hostname(hostname);
3699            assert!(result.is_ok());
3700        }
3701
3702        // erroneous hostnames
3703        for hostname in &[
3704            "my_host.local",
3705            ".local.",
3706            &("A".repeat(256 - ".local.".len()) + ".local."),
3707        ] {
3708            let result = check_hostname(hostname);
3709            assert!(result.is_err());
3710            if let Err(e) = result {
3711                println!("{}", e);
3712            }
3713        }
3714    }
3715
3716    #[test]
3717    fn test_check_domain_suffix() {
3718        assert!(check_domain_suffix("_missing_dot._tcp.local").is_err());
3719        assert!(check_domain_suffix("_missing_bar.tcp.local.").is_err());
3720        assert!(check_domain_suffix("_mis_spell._tpp.local.").is_err());
3721        assert!(check_domain_suffix("_mis_spell._upp.local.").is_err());
3722        assert!(check_domain_suffix("_has_dot._tcp.local.").is_ok());
3723        assert!(check_domain_suffix("_goodname._udp.local.").is_ok());
3724    }
3725
3726    #[test]
3727    fn test_service_with_temporarily_invalidated_ptr() {
3728        // Create a daemon
3729        let d = ServiceDaemon::new().expect("Failed to create daemon");
3730
3731        let service = "_test_inval_ptr._udp.local.";
3732        let host_name = "my_host_tmp_invalidated_ptr.local.";
3733        let intfs: Vec<_> = my_ip_interfaces(false);
3734        let intf_ips: Vec<_> = intfs.iter().map(|intf| intf.ip()).collect();
3735        let port = 5201;
3736        let my_service =
3737            ServiceInfo::new(service, "my_instance", host_name, &intf_ips[..], port, None)
3738                .expect("invalid service info")
3739                .enable_addr_auto();
3740        let result = d.register(my_service.clone());
3741        assert!(result.is_ok());
3742
3743        // Browse for a service
3744        let browse_chan = d.browse(service).unwrap();
3745        let timeout = Duration::from_secs(2);
3746        let mut resolved = false;
3747
3748        while let Ok(event) = browse_chan.recv_timeout(timeout) {
3749            match event {
3750                ServiceEvent::ServiceResolved(info) => {
3751                    resolved = true;
3752                    println!("Resolved a service of {}", &info.get_fullname());
3753                    break;
3754                }
3755                e => {
3756                    println!("Received event {:?}", e);
3757                }
3758            }
3759        }
3760
3761        assert!(resolved);
3762
3763        println!("Stopping browse of {}", service);
3764        // Pause browsing so restarting will cause a new immediate query.
3765        // Unregistering will not work here, it will invalidate all the records.
3766        d.stop_browse(service).unwrap();
3767
3768        // Ensure the search is stopped.
3769        // Reduces the chance of receiving an answer adding the ptr back to the
3770        // cache causing the later browse to return directly from the cache.
3771        // (which invalidates what this test is trying to test for.)
3772        let mut stopped = false;
3773        while let Ok(event) = browse_chan.recv_timeout(timeout) {
3774            match event {
3775                ServiceEvent::SearchStopped(_) => {
3776                    stopped = true;
3777                    println!("Stopped browsing service");
3778                    break;
3779                }
3780                // Other `ServiceResolved` messages may be received
3781                // here as they come from different interfaces.
3782                // That's fine for this test.
3783                e => {
3784                    println!("Received event {:?}", e);
3785                }
3786            }
3787        }
3788
3789        assert!(stopped);
3790
3791        // Invalidate the ptr from the service to the host.
3792        let invalidate_ptr_packet = DnsPointer::new(
3793            my_service.get_type(),
3794            RRType::PTR,
3795            CLASS_IN,
3796            0,
3797            my_service.get_fullname().to_string(),
3798        );
3799
3800        let mut packet_buffer = DnsOutgoing::new(FLAGS_QR_RESPONSE | FLAGS_AA);
3801        packet_buffer.add_additional_answer(invalidate_ptr_packet);
3802
3803        for intf in intfs {
3804            let sock = new_socket_bind(&intf, true).unwrap();
3805            send_dns_outgoing(&packet_buffer, &intf, &sock);
3806        }
3807
3808        println!(
3809            "Sent PTR record invalidation. Starting second browse for {}",
3810            service
3811        );
3812
3813        // Restart the browse to force the sender to re-send the announcements.
3814        let browse_chan = d.browse(service).unwrap();
3815
3816        resolved = false;
3817        while let Ok(event) = browse_chan.recv_timeout(timeout) {
3818            match event {
3819                ServiceEvent::ServiceResolved(info) => {
3820                    resolved = true;
3821                    println!("Resolved a service of {}", &info.get_fullname());
3822                    break;
3823                }
3824                e => {
3825                    println!("Received event {:?}", e);
3826                }
3827            }
3828        }
3829
3830        assert!(resolved);
3831        d.shutdown().unwrap();
3832    }
3833
3834    #[test]
3835    fn test_expired_srv() {
3836        // construct service info
3837        let service_type = "_expired-srv._udp.local.";
3838        let instance = "test_instance";
3839        let host_name = "expired_srv_host.local.";
3840        let mut my_service = ServiceInfo::new(service_type, instance, host_name, "", 5023, None)
3841            .unwrap()
3842            .enable_addr_auto();
3843        // let fullname = my_service.get_fullname().to_string();
3844
3845        // set SRV to expire soon.
3846        let new_ttl = 3; // for testing only.
3847        my_service._set_host_ttl(new_ttl);
3848
3849        // register my service
3850        let mdns_server = ServiceDaemon::new().expect("Failed to create mdns server");
3851        let result = mdns_server.register(my_service);
3852        assert!(result.is_ok());
3853
3854        let mdns_client = ServiceDaemon::new().expect("Failed to create mdns client");
3855        let browse_chan = mdns_client.browse(service_type).unwrap();
3856        let timeout = Duration::from_secs(2);
3857        let mut resolved = false;
3858
3859        while let Ok(event) = browse_chan.recv_timeout(timeout) {
3860            match event {
3861                ServiceEvent::ServiceResolved(info) => {
3862                    resolved = true;
3863                    println!("Resolved a service of {}", &info.get_fullname());
3864                    break;
3865                }
3866                _ => {}
3867            }
3868        }
3869
3870        assert!(resolved);
3871
3872        // Exit the server so that no more responses.
3873        mdns_server.shutdown().unwrap();
3874
3875        // SRV record in the client cache will expire.
3876        let expire_timeout = Duration::from_secs(new_ttl as u64);
3877        while let Ok(event) = browse_chan.recv_timeout(expire_timeout) {
3878            match event {
3879                ServiceEvent::ServiceRemoved(service_type, full_name) => {
3880                    println!("Service removed: {}: {}", &service_type, &full_name);
3881                    break;
3882                }
3883                _ => {}
3884            }
3885        }
3886    }
3887
3888    #[test]
3889    fn test_hostname_resolution_address_removed() {
3890        // Create a mDNS server
3891        let server = ServiceDaemon::new().expect("Failed to create server");
3892        let hostname = "addr_remove_host._tcp.local.";
3893        let service_ip_addr = my_ip_interfaces(false)
3894            .iter()
3895            .find(|iface| iface.ip().is_ipv4())
3896            .map(|iface| iface.ip())
3897            .unwrap();
3898
3899        let mut my_service = ServiceInfo::new(
3900            "_host_res_test._tcp.local.",
3901            "my_instance",
3902            hostname,
3903            &service_ip_addr,
3904            1234,
3905            None,
3906        )
3907        .expect("invalid service info");
3908
3909        // Set a short TTL for addresses for testing.
3910        let addr_ttl = 2;
3911        my_service._set_host_ttl(addr_ttl); // Expire soon
3912
3913        server.register(my_service).unwrap();
3914
3915        // Create a mDNS client for resolving the hostname.
3916        let client = ServiceDaemon::new().expect("Failed to create client");
3917        let event_receiver = client.resolve_hostname(hostname, None).unwrap();
3918        let resolved = loop {
3919            match event_receiver.recv() {
3920                Ok(HostnameResolutionEvent::AddressesFound(found_hostname, addresses)) => {
3921                    assert!(found_hostname == hostname);
3922                    assert!(addresses.contains(&service_ip_addr));
3923                    println!("address found: {:?}", &addresses);
3924                    break true;
3925                }
3926                Ok(HostnameResolutionEvent::SearchStopped(_)) => break false,
3927                Ok(_event) => {}
3928                Err(_) => break false,
3929            }
3930        };
3931
3932        assert!(resolved);
3933
3934        // Shutdown the server so no more responses / refreshes for addresses.
3935        server.shutdown().unwrap();
3936
3937        // Wait till hostname address record expires, with 1 second grace period.
3938        let timeout = Duration::from_secs(addr_ttl as u64 + 1);
3939        let removed = loop {
3940            match event_receiver.recv_timeout(timeout) {
3941                Ok(HostnameResolutionEvent::AddressesRemoved(removed_host, addresses)) => {
3942                    assert!(removed_host == hostname);
3943                    assert!(addresses.contains(&service_ip_addr));
3944
3945                    println!(
3946                        "address removed: hostname: {} addresses: {:?}",
3947                        &hostname, &addresses
3948                    );
3949                    break true;
3950                }
3951                Ok(_event) => {}
3952                Err(_) => {
3953                    break false;
3954                }
3955            }
3956        };
3957
3958        assert!(removed);
3959
3960        client.shutdown().unwrap();
3961    }
3962
3963    #[test]
3964    fn test_refresh_ptr() {
3965        // construct service info
3966        let service_type = "_refresh-ptr._udp.local.";
3967        let instance = "test_instance";
3968        let host_name = "refresh_ptr_host.local.";
3969        let service_ip_addr = my_ip_interfaces(false)
3970            .iter()
3971            .find(|iface| iface.ip().is_ipv4())
3972            .map(|iface| iface.ip())
3973            .unwrap();
3974
3975        let mut my_service = ServiceInfo::new(
3976            service_type,
3977            instance,
3978            host_name,
3979            &service_ip_addr,
3980            5023,
3981            None,
3982        )
3983        .unwrap();
3984
3985        let new_ttl = 3; // for testing only.
3986        my_service._set_other_ttl(new_ttl);
3987
3988        // register my service
3989        let mdns_server = ServiceDaemon::new().expect("Failed to create mdns server");
3990        let result = mdns_server.register(my_service);
3991        assert!(result.is_ok());
3992
3993        let mdns_client = ServiceDaemon::new().expect("Failed to create mdns client");
3994        let browse_chan = mdns_client.browse(service_type).unwrap();
3995        let timeout = Duration::from_millis(1500); // Give at least 1 second for the service probing.
3996        let mut resolved = false;
3997
3998        // resolve the service first.
3999        while let Ok(event) = browse_chan.recv_timeout(timeout) {
4000            match event {
4001                ServiceEvent::ServiceResolved(info) => {
4002                    resolved = true;
4003                    println!("Resolved a service of {}", &info.get_fullname());
4004                    break;
4005                }
4006                _ => {}
4007            }
4008        }
4009
4010        assert!(resolved);
4011
4012        // wait over 80% of TTL, and refresh PTR should be sent out.
4013        let timeout = Duration::from_millis(new_ttl as u64 * 1000 * 90 / 100);
4014        while let Ok(event) = browse_chan.recv_timeout(timeout) {
4015            println!("event: {:?}", &event);
4016        }
4017
4018        // verify refresh counter.
4019        let metrics_chan = mdns_client.get_metrics().unwrap();
4020        let metrics = metrics_chan.recv_timeout(timeout).unwrap();
4021        let refresh_counter = metrics["cache-refresh-ptr"];
4022        assert_eq!(refresh_counter, 1);
4023
4024        // Exit the server so that no more responses.
4025        mdns_server.shutdown().unwrap();
4026        mdns_client.shutdown().unwrap();
4027    }
4028
4029    #[test]
4030    fn test_name_change() {
4031        assert_eq!(name_change("foo.local."), "foo (2).local.");
4032        assert_eq!(name_change("foo (2).local."), "foo (3).local.");
4033        assert_eq!(name_change("foo (9).local."), "foo (10).local.");
4034        assert_eq!(name_change("foo"), "foo (2)");
4035        assert_eq!(name_change("foo (2)"), "foo (3)");
4036        assert_eq!(name_change(""), " (2)");
4037
4038        // Additional edge cases
4039        assert_eq!(name_change("foo (abc)"), "foo (abc) (2)"); // Invalid number
4040        assert_eq!(name_change("foo (2"), "foo (2 (2)"); // Missing closing parenthesis
4041        assert_eq!(name_change("foo (2) extra"), "foo (2) extra (2)"); // Extra text after number
4042    }
4043
4044    #[test]
4045    fn test_hostname_change() {
4046        assert_eq!(hostname_change("foo.local."), "foo-2.local.");
4047        assert_eq!(hostname_change("foo"), "foo-2");
4048        assert_eq!(hostname_change("foo-2.local."), "foo-3.local.");
4049        assert_eq!(hostname_change("foo-9"), "foo-10");
4050        assert_eq!(hostname_change("test-42.domain."), "test-43.domain.");
4051    }
4052}