mdns_sd/
service_daemon.rs

1//! Service daemon for mDNS Service Discovery.
2
3// How DNS-based Service Discovery works in a nutshell:
4//
5// (excerpt from RFC 6763)
6// .... that a particular service instance can be
7//    described using a DNS SRV [RFC2782] and DNS TXT [RFC1035] record.
8//    The SRV record has a name of the form "<Instance>.<Service>.<Domain>"
9//    and gives the target host and port where the service instance can be
10//    reached.  The DNS TXT record of the same name gives additional
11//    information about this instance, in a structured form using key/value
12//    pairs, described in Section 6.  A client discovers the list of
13//    available instances of a given service type using a query for a DNS
14//    PTR [RFC1035] record with a name of the form "<Service>.<Domain>",
15//    which returns a set of zero or more names, which are the names of the
16//    aforementioned DNS SRV/TXT record pairs.
17//
18// Some naming conventions in this source code:
19//
20// `ty_domain` refers to service type together with domain name, i.e. <service>.<domain>.
// Every <service> consists of two labels: the service name itself and "_udp" or "_tcp".
22// See RFC 6763 section 7 Service Names.
23//     for example: `_my-service._udp.local.`
24//
25// `fullname` refers to a full Service Instance Name, i.e. <instance>.<service>.<domain>
26//     for example: `my_home._my-service._udp.local.`
27//
// In mDNS and DNS, the basic data structure is the "Resource Record" (RR), whereas
// in Service Discovery, the basic data structure is the "Service Info". One Service Info
// corresponds to a set of DNS Resource Records.
31#[cfg(feature = "logging")]
32use crate::log::{debug, trace};
33use crate::{
34    dns_cache::{current_time_millis, DnsCache},
35    dns_parser::{
36        ip_address_rr_type, DnsAddress, DnsEntryExt, DnsIncoming, DnsOutgoing, DnsPointer,
37        DnsRecordBox, DnsRecordExt, DnsSrv, DnsTxt, InterfaceId, RRType, ScopedIp,
38        CLASS_CACHE_FLUSH, CLASS_IN, FLAGS_AA, FLAGS_QR_QUERY, FLAGS_QR_RESPONSE, MAX_MSG_ABSOLUTE,
39    },
40    error::{e_fmt, Error, Result},
41    service_info::{DnsRegistry, MyIntf, Probe, ServiceInfo, ServiceStatus},
42    Receiver, ResolvedService, TxtProperties,
43};
44use flume::{bounded, Sender, TrySendError};
45use if_addrs::{IfAddr, Interface};
46use mio::{event::Source, net::UdpSocket as MioUdpSocket, Interest, Poll, Registry, Token};
47use socket2::Domain;
48use socket_pktinfo::PktInfoUdpSocket;
49use std::{
50    cmp::{self, Reverse},
51    collections::{hash_map::Entry, BinaryHeap, HashMap, HashSet},
52    fmt, io,
53    net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6, UdpSocket},
54    str, thread,
55    time::Duration,
56    vec,
57};
58
/// The default max length of the service name without domain, not including the
/// leading underscore (`_`). It is set to 15 per
/// [RFC 6763 section 7.2](https://www.rfc-editor.org/rfc/rfc6763#section-7.2).
pub const SERVICE_NAME_LEN_MAX_DEFAULT: u8 = 15;

/// The default interval, in seconds, for checking IP changes automatically.
pub const IP_CHECK_INTERVAL_IN_SECS_DEFAULT: u32 = 5;

/// The default time out for [ServiceDaemon::verify] is 10 seconds, per
/// [RFC 6762 section 10.4](https://datatracker.ietf.org/doc/html/rfc6762#section-10.4)
pub const VERIFY_TIMEOUT_DEFAULT: Duration = Duration::from_secs(10);

/// UDP port that the mDNS sockets bind to and that multicast packets are sent to.
const MDNS_PORT: u16 = 5353;
/// IPv4 multicast group (224.0.0.251) joined by the IPv4 mDNS socket.
const GROUP_ADDR_V4: Ipv4Addr = Ipv4Addr::new(224, 0, 0, 251);
/// IPv6 multicast group (ff02::fb) joined by the IPv6 mDNS socket.
const GROUP_ADDR_V6: Ipv6Addr = Ipv6Addr::new(0xff02, 0, 0, 0, 0, 0, 0, 0xfb);
/// IPv4 loopback address that the daemon's signal sockets bind to.
const LOOPBACK_V4: Ipv4Addr = Ipv4Addr::new(127, 0, 0, 1);

// NOTE(review): presumably a wait, in milliseconds, applied before resolving
// a service instance; its use is outside this part of the file -- confirm.
const RESOLVE_WAIT_IN_MILLIS: u64 = 500;
77
/// Response status code for the service `unregister` call.
///
/// The value is delivered through the channel returned by
/// [`ServiceDaemon::unregister`]. It derives `Clone`, `PartialEq` and `Eq`
/// (in line with [`DaemonStatus`]) so that callers can compare a received
/// status directly, e.g. `status == UnregisterStatus::OK`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum UnregisterStatus {
    /// Unregister was successful.
    OK,
    /// The service was not found in the registration.
    NotFound,
}
86
/// Status code for the service daemon.
///
/// The enum is `#[non_exhaustive]`, so code outside this crate that matches
/// on it needs a wildcard arm.
#[derive(Debug, PartialEq, Clone, Eq)]
#[non_exhaustive]
pub enum DaemonStatus {
    /// The daemon is running as normal.
    Running,

    /// The daemon has been shutdown.
    Shutdown,
}
97
/// Kinds of counters tracked in the metrics.
///
/// Each variant has a stable, kebab-case textual name produced by its
/// `Display` implementation.
#[derive(Hash, Eq, PartialEq)]
enum Counter {
    Register,
    RegisterResend,
    Unregister,
    UnregisterResend,
    Browse,
    ResolveHostname,
    Respond,
    CacheRefreshPTR,
    CacheRefreshSrvTxt,
    CacheRefreshAddr,
    KnownAnswerSuppression,
    CachedPTR,
    CachedSRV,
    CachedAddr,
    CachedTxt,
    CachedNSec,
    CachedSubtype,
    DnsRegistryProbe,
    DnsRegistryActive,
    DnsRegistryTimer,
    DnsRegistryNameChange,
    Timer,
}

impl fmt::Display for Counter {
    /// Writes the textual name of the counter.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        // Pick the label first, then emit it with a single write.
        let label = match self {
            Self::Register => "register",
            Self::RegisterResend => "register-resend",
            Self::Unregister => "unregister",
            Self::UnregisterResend => "unregister-resend",
            Self::Browse => "browse",
            Self::ResolveHostname => "resolve-hostname",
            Self::Respond => "respond",
            Self::CacheRefreshPTR => "cache-refresh-ptr",
            Self::CacheRefreshSrvTxt => "cache-refresh-srv-txt",
            Self::CacheRefreshAddr => "cache-refresh-addr",
            Self::KnownAnswerSuppression => "known-answer-suppression",
            Self::CachedPTR => "cached-ptr",
            Self::CachedSRV => "cached-srv",
            Self::CachedAddr => "cached-addr",
            Self::CachedTxt => "cached-txt",
            Self::CachedNSec => "cached-nsec",
            Self::CachedSubtype => "cached-subtype",
            Self::DnsRegistryProbe => "dns-registry-probe",
            Self::DnsRegistryActive => "dns-registry-active",
            Self::DnsRegistryTimer => "dns-registry-timer",
            Self::DnsRegistryNameChange => "dns-registry-name-change",
            Self::Timer => "timer",
        };
        f.write_str(label)
    }
}
154
155/// A wrapper around UDP socket used by the mDNS daemon.
156///
157/// We do this because `mio` does not support PKTINFO and
158/// does not provide a way to implement `Source` trait directly and safely.
159struct MyUdpSocket {
160    /// The underlying socket that supports control messages like
161    /// `IP_PKTINFO` for IPv4 and `IPV6_PKTINFO` for IPv6.
162    pktinfo: PktInfoUdpSocket,
163
164    /// The mio UDP socket that is a clone of `pktinfo` and
165    /// is used for event polling.
166    mio: MioUdpSocket,
167}
168
169impl MyUdpSocket {
170    pub fn new(pktinfo: PktInfoUdpSocket) -> io::Result<Self> {
171        let std_sock = pktinfo.try_clone_std()?;
172        let mio = MioUdpSocket::from_std(std_sock);
173
174        Ok(Self { pktinfo, mio })
175    }
176}
177
178/// Implements the mio `Source` trait so that we can use `MyUdpSocket` with `Poll`.
179impl Source for MyUdpSocket {
180    fn register(
181        &mut self,
182        registry: &Registry,
183        token: Token,
184        interests: Interest,
185    ) -> io::Result<()> {
186        self.mio.register(registry, token, interests)
187    }
188
189    fn reregister(
190        &mut self,
191        registry: &Registry,
192        token: Token,
193        interests: Interest,
194    ) -> io::Result<()> {
195        self.mio.reregister(registry, token, interests)
196    }
197
198    fn deregister(&mut self, registry: &Registry) -> std::io::Result<()> {
199        self.mio.deregister(registry)
200    }
201}
202
/// The metrics is a HashMap of (name_key, i64_value).
/// The main purpose is to help monitoring the mDNS packet traffic.
///
/// A snapshot of it is what [`ServiceDaemon::get_metrics`] delivers to callers.
pub type Metrics = HashMap<String, i64>;

// Event keys identifying which socket an event belongs to.
// NOTE(review): presumably these are used as mio `Token` values in the
// daemon's run loop, which is outside this part of the file -- confirm.
const IPV4_SOCK_EVENT_KEY: usize = 4; // Pick a key just to indicate IPv4.
const IPV6_SOCK_EVENT_KEY: usize = 6; // Pick a key just to indicate IPv6.
const SIGNAL_SOCK_EVENT_KEY: usize = usize::MAX - 1; // avoid to overlap with zc.poll_ids
210
/// A daemon thread for mDNS
///
/// This struct provides a handle and an API to the daemon. It is cloneable.
///
/// Every API call is turned into a `Command` that is pushed to the daemon
/// over a channel, followed by a small UDP datagram to `signal_addr` that
/// notifies the daemon a command is waiting.
#[derive(Clone)]
pub struct ServiceDaemon {
    /// Sender handle of the channel to the daemon.
    sender: Sender<Command>,

    /// Send to this addr to signal that a `Command` is coming.
    ///
    /// The daemon listens on this addr together with other mDNS sockets,
    /// to avoid busy polling the flume channel. If there is a way to poll
    /// the channel and mDNS sockets together, then this can be removed.
    signal_addr: SocketAddr,
}
226
227impl ServiceDaemon {
228    /// Creates a new daemon and spawns a thread to run the daemon.
229    ///
230    /// The daemon (re)uses the default mDNS port 5353. To keep it simple, we don't
231    /// ask callers to set the port.
232    pub fn new() -> Result<Self> {
233        // Use port 0 to allow the system assign a random available port,
234        // no need for a pre-defined port number.
235        let signal_addr = SocketAddrV4::new(LOOPBACK_V4, 0);
236
237        let signal_sock = UdpSocket::bind(signal_addr)
238            .map_err(|e| e_fmt!("failed to create signal_sock for daemon: {}", e))?;
239
240        // Get the socket with the OS chosen port
241        let signal_addr = signal_sock
242            .local_addr()
243            .map_err(|e| e_fmt!("failed to get signal sock addr: {}", e))?;
244
245        // Must be nonblocking so we can listen to it together with mDNS sockets.
246        signal_sock
247            .set_nonblocking(true)
248            .map_err(|e| e_fmt!("failed to set nonblocking for signal socket: {}", e))?;
249
250        let poller = Poll::new().map_err(|e| e_fmt!("failed to create mio Poll: {e}"))?;
251
252        let (sender, receiver) = bounded(100);
253
254        // Spawn the daemon thread
255        let mio_sock = MioUdpSocket::from_std(signal_sock);
256        thread::Builder::new()
257            .name("mDNS_daemon".to_string())
258            .spawn(move || Self::daemon_thread(mio_sock, poller, receiver))
259            .map_err(|e| e_fmt!("thread builder failed to spawn: {}", e))?;
260
261        Ok(Self {
262            sender,
263            signal_addr,
264        })
265    }
266
267    /// Sends `cmd` to the daemon via its channel, and sends a signal
268    /// to its sock addr to notify.
269    fn send_cmd(&self, cmd: Command) -> Result<()> {
270        let cmd_name = cmd.to_string();
271
272        // First, send to the flume channel.
273        self.sender.try_send(cmd).map_err(|e| match e {
274            TrySendError::Full(_) => Error::Again,
275            e => e_fmt!("flume::channel::send failed: {}", e),
276        })?;
277
278        // Second, send a signal to notify the daemon.
279        let addr = SocketAddrV4::new(LOOPBACK_V4, 0);
280        let socket = UdpSocket::bind(addr)
281            .map_err(|e| e_fmt!("Failed to create socket to send signal: {}", e))?;
282        socket
283            .send_to(cmd_name.as_bytes(), self.signal_addr)
284            .map_err(|e| {
285                e_fmt!(
286                    "signal socket send_to {} ({}) failed: {}",
287                    self.signal_addr,
288                    cmd_name,
289                    e
290                )
291            })?;
292
293        Ok(())
294    }
295
296    /// Starts browsing for a specific service type.
297    ///
298    /// `service_type` must end with a valid mDNS domain: '._tcp.local.' or '._udp.local.'
299    ///
300    /// Returns a channel `Receiver` to receive events about the service. The caller
301    /// can call `.recv_async().await` on this receiver to handle events in an
302    /// async environment or call `.recv()` in a sync environment.
303    ///
304    /// When a new instance is found, the daemon automatically tries to resolve, i.e.
305    /// finding more details, i.e. SRV records and TXT records.
306    pub fn browse(&self, service_type: &str) -> Result<Receiver<ServiceEvent>> {
307        check_domain_suffix(service_type)?;
308
309        let (resp_s, resp_r) = bounded(10);
310        self.send_cmd(Command::Browse(service_type.to_string(), 1, resp_s))?;
311        Ok(resp_r)
312    }
313
314    /// Stops searching for a specific service type.
315    ///
316    /// When an error is returned, the caller should retry only when
317    /// the error is `Error::Again`, otherwise should log and move on.
318    pub fn stop_browse(&self, ty_domain: &str) -> Result<()> {
319        self.send_cmd(Command::StopBrowse(ty_domain.to_string()))
320    }
321
322    /// Starts querying for the ip addresses of a hostname.
323    ///
324    /// Returns a channel `Receiver` to receive events about the hostname.
325    /// The caller can call `.recv_async().await` on this receiver to handle events in an
326    /// async environment or call `.recv()` in a sync environment.
327    ///
328    /// The `timeout` is specified in milliseconds.
329    pub fn resolve_hostname(
330        &self,
331        hostname: &str,
332        timeout: Option<u64>,
333    ) -> Result<Receiver<HostnameResolutionEvent>> {
334        check_hostname(hostname)?;
335        let (resp_s, resp_r) = bounded(10);
336        self.send_cmd(Command::ResolveHostname(
337            hostname.to_string(),
338            1,
339            resp_s,
340            timeout,
341        ))?;
342        Ok(resp_r)
343    }
344
345    /// Stops querying for the ip addresses of a hostname.
346    ///
347    /// When an error is returned, the caller should retry only when
348    /// the error is `Error::Again`, otherwise should log and move on.
349    pub fn stop_resolve_hostname(&self, hostname: &str) -> Result<()> {
350        self.send_cmd(Command::StopResolveHostname(hostname.to_string()))
351    }
352
353    /// Registers a service provided by this host.
354    ///
355    /// If `service_info` has no addresses yet and its `addr_auto` is enabled,
356    /// this method will automatically fill in addresses from the host.
357    ///
358    /// To re-announce a service with an updated `service_info`, just call
359    /// this `register` function again. No need to call `unregister` first.
360    pub fn register(&self, service_info: ServiceInfo) -> Result<()> {
361        check_service_name(service_info.get_fullname())?;
362        check_hostname(service_info.get_hostname())?;
363
364        self.send_cmd(Command::Register(service_info))
365    }
366
367    /// Unregisters a service. This is a graceful shutdown of a service.
368    ///
369    /// Returns a channel receiver that is used to receive the status code
370    /// of the unregister.
371    ///
372    /// When an error is returned, the caller should retry only when
373    /// the error is `Error::Again`, otherwise should log and move on.
374    pub fn unregister(&self, fullname: &str) -> Result<Receiver<UnregisterStatus>> {
375        let (resp_s, resp_r) = bounded(1);
376        self.send_cmd(Command::Unregister(fullname.to_lowercase(), resp_s))?;
377        Ok(resp_r)
378    }
379
380    /// Starts to monitor events from the daemon.
381    ///
382    /// Returns a channel [`Receiver`] of [`DaemonEvent`].
383    pub fn monitor(&self) -> Result<Receiver<DaemonEvent>> {
384        let (resp_s, resp_r) = bounded(100);
385        self.send_cmd(Command::Monitor(resp_s))?;
386        Ok(resp_r)
387    }
388
389    /// Shuts down the daemon thread and returns a channel to receive the status.
390    ///
391    /// When an error is returned, the caller should retry only when
392    /// the error is `Error::Again`, otherwise should log and move on.
393    pub fn shutdown(&self) -> Result<Receiver<DaemonStatus>> {
394        let (resp_s, resp_r) = bounded(1);
395        self.send_cmd(Command::Exit(resp_s))?;
396        Ok(resp_r)
397    }
398
399    /// Returns the status of the daemon.
400    ///
401    /// When an error is returned, the caller should retry only when
402    /// the error is `Error::Again`, otherwise should consider the daemon
403    /// stopped working and move on.
404    pub fn status(&self) -> Result<Receiver<DaemonStatus>> {
405        let (resp_s, resp_r) = bounded(1);
406
407        if self.sender.is_disconnected() {
408            resp_s
409                .send(DaemonStatus::Shutdown)
410                .map_err(|e| e_fmt!("failed to send daemon status to the client: {}", e))?;
411        } else {
412            self.send_cmd(Command::GetStatus(resp_s))?;
413        }
414
415        Ok(resp_r)
416    }
417
418    /// Returns a channel receiver for the metrics, e.g. input/output counters.
419    ///
420    /// The metrics returned is a snapshot. Hence the caller should call
421    /// this method repeatedly if they want to monitor the metrics continuously.
422    pub fn get_metrics(&self) -> Result<Receiver<Metrics>> {
423        let (resp_s, resp_r) = bounded(1);
424        self.send_cmd(Command::GetMetrics(resp_s))?;
425        Ok(resp_r)
426    }
427
428    /// Change the max length allowed for a service name.
429    ///
430    /// As RFC 6763 defines a length max for a service name, a user should not call
431    /// this method unless they have to. See [`SERVICE_NAME_LEN_MAX_DEFAULT`].
432    ///
433    /// `len_max` is capped at an internal limit, which is currently 30.
434    pub fn set_service_name_len_max(&self, len_max: u8) -> Result<()> {
435        const SERVICE_NAME_LEN_MAX_LIMIT: u8 = 30; // Double the default length max.
436
437        if len_max > SERVICE_NAME_LEN_MAX_LIMIT {
438            return Err(Error::Msg(format!(
439                "service name length max {len_max} is too large"
440            )));
441        }
442
443        self.send_cmd(Command::SetOption(DaemonOption::ServiceNameLenMax(len_max)))
444    }
445
446    /// Change the interval for checking IP changes automatically.
447    ///
448    /// Setting the interval to 0 disables the IP check.
449    ///
450    /// See [`IP_CHECK_INTERVAL_IN_SECS_DEFAULT`] for the default interval.
451    pub fn set_ip_check_interval(&self, interval_in_secs: u32) -> Result<()> {
452        let interval_in_millis = interval_in_secs as u64 * 1000;
453        self.send_cmd(Command::SetOption(DaemonOption::IpCheckInterval(
454            interval_in_millis,
455        )))
456    }
457
458    /// Get the current interval in seconds for checking IP changes automatically.
459    pub fn get_ip_check_interval(&self) -> Result<u32> {
460        let (resp_s, resp_r) = bounded(1);
461        self.send_cmd(Command::GetOption(resp_s))?;
462
463        let option = resp_r
464            .recv_timeout(Duration::from_secs(10))
465            .map_err(|e| e_fmt!("failed to receive ip check interval: {}", e))?;
466        let ip_check_interval_in_secs = option.ip_check_interval / 1000;
467        Ok(ip_check_interval_in_secs as u32)
468    }
469
470    /// Include interfaces that match `if_kind` for this service daemon.
471    ///
472    /// For example:
473    /// ```ignore
474    ///     daemon.enable_interface("en0")?;
475    /// ```
476    pub fn enable_interface(&self, if_kind: impl IntoIfKindVec) -> Result<()> {
477        let if_kind_vec = if_kind.into_vec();
478        self.send_cmd(Command::SetOption(DaemonOption::EnableInterface(
479            if_kind_vec.kinds,
480        )))
481    }
482
483    /// Ignore/exclude interfaces that match `if_kind` for this daemon.
484    ///
485    /// For example:
486    /// ```ignore
487    ///     daemon.disable_interface(IfKind::IPv6)?;
488    /// ```
489    pub fn disable_interface(&self, if_kind: impl IntoIfKindVec) -> Result<()> {
490        let if_kind_vec = if_kind.into_vec();
491        self.send_cmd(Command::SetOption(DaemonOption::DisableInterface(
492            if_kind_vec.kinds,
493        )))
494    }
495
496    #[cfg(test)]
497    pub fn test_down_interface(&self, ifname: &str) -> Result<()> {
498        self.send_cmd(Command::SetOption(DaemonOption::TestDownInterface(
499            ifname.to_string(),
500        )))
501    }
502
503    #[cfg(test)]
504    pub fn test_up_interface(&self, ifname: &str) -> Result<()> {
505        self.send_cmd(Command::SetOption(DaemonOption::TestUpInterface(
506            ifname.to_string(),
507        )))
508    }
509
510    /// Enable or disable the loopback for locally sent multicast packets in IPv4.
511    ///
512    /// By default, multicast loop is enabled for IPv4. When disabled, a querier will not
513    /// receive announcements from a responder on the same host.
514    ///
515    /// Reference: <https://learn.microsoft.com/en-us/windows/win32/winsock/ip-multicast-2>
516    ///
517    /// "The Winsock version of the IP_MULTICAST_LOOP option is semantically different than
518    /// the UNIX version of the IP_MULTICAST_LOOP option:
519    ///
520    /// In Winsock, the IP_MULTICAST_LOOP option applies only to the receive path.
521    /// In the UNIX version, the IP_MULTICAST_LOOP option applies to the send path."
522    ///
523    /// Which means, in order NOT to receive localhost announcements, you want to call
524    /// this API on the querier side on Windows, but on the responder side on Unix.
525    pub fn set_multicast_loop_v4(&self, on: bool) -> Result<()> {
526        self.send_cmd(Command::SetOption(DaemonOption::MulticastLoopV4(on)))
527    }
528
529    /// Enable or disable the loopback for locally sent multicast packets in IPv6.
530    ///
531    /// By default, multicast loop is enabled for IPv6. When disabled, a querier will not
532    /// receive announcements from a responder on the same host.
533    ///
534    /// Reference: <https://learn.microsoft.com/en-us/windows/win32/winsock/ip-multicast-2>
535    ///
536    /// "The Winsock version of the IP_MULTICAST_LOOP option is semantically different than
537    /// the UNIX version of the IP_MULTICAST_LOOP option:
538    ///
539    /// In Winsock, the IP_MULTICAST_LOOP option applies only to the receive path.
540    /// In the UNIX version, the IP_MULTICAST_LOOP option applies to the send path."
541    ///
542    /// Which means, in order NOT to receive localhost announcements, you want to call
543    /// this API on the querier side on Windows, but on the responder side on Unix.
544    pub fn set_multicast_loop_v6(&self, on: bool) -> Result<()> {
545        self.send_cmd(Command::SetOption(DaemonOption::MulticastLoopV6(on)))
546    }
547
548    /// Proactively confirms whether a service instance still valid.
549    ///
550    /// This call will issue queries for a service instance's SRV record and Address records.
551    ///
552    /// For `timeout`, most users should use [VERIFY_TIMEOUT_DEFAULT]
553    /// unless there is a reason not to follow RFC.
554    ///
555    /// If no response is received within `timeout`, the current resource
556    /// records will be flushed, and if needed, `ServiceRemoved` event will be
557    /// sent to active queriers.
558    ///
559    /// Reference: [RFC 6762](https://datatracker.ietf.org/doc/html/rfc6762#section-10.4)
560    pub fn verify(&self, instance_fullname: String, timeout: Duration) -> Result<()> {
561        self.send_cmd(Command::Verify(instance_fullname, timeout))
562    }
563
564    fn daemon_thread(signal_sock: MioUdpSocket, poller: Poll, receiver: Receiver<Command>) {
565        let mut zc = Zeroconf::new(signal_sock, poller);
566
567        if let Some(cmd) = zc.run(receiver) {
568            match cmd {
569                Command::Exit(resp_s) => {
570                    // It is guaranteed that the receiver already dropped,
571                    // i.e. the daemon command channel closed.
572                    if let Err(e) = resp_s.send(DaemonStatus::Shutdown) {
573                        debug!("exit: failed to send response of shutdown: {}", e);
574                    }
575                }
576                _ => {
577                    debug!("Unexpected command: {:?}", cmd);
578                }
579            }
580        }
581    }
582}
583
584/// Creates a new UDP socket that uses `intf` to send and recv multicast.
585fn _new_socket_bind(intf: &Interface, should_loop: bool) -> Result<MyUdpSocket> {
586    // Use the same socket for receiving and sending multicast packets.
587    // Such socket has to bind to INADDR_ANY or IN6ADDR_ANY.
588    let intf_ip = &intf.ip();
589    match intf_ip {
590        IpAddr::V4(ip) => {
591            let addr = SocketAddrV4::new(Ipv4Addr::new(0, 0, 0, 0), MDNS_PORT);
592            let sock = new_socket(addr.into(), true)?;
593
594            // Join mDNS group to receive packets.
595            sock.join_multicast_v4(&GROUP_ADDR_V4, ip)
596                .map_err(|e| e_fmt!("join multicast group on addr {}: {}", intf_ip, e))?;
597
598            // Set IP_MULTICAST_IF to send packets.
599            sock.set_multicast_if_v4(ip)
600                .map_err(|e| e_fmt!("set multicast_if on addr {}: {}", ip, e))?;
601
602            // Per RFC 6762 section 11:
603            // "All Multicast DNS responses (including responses sent via unicast) SHOULD
604            // be sent with IP TTL set to 255."
605            // Here we set the TTL to 255 for multicast as we don't support unicast yet.
606            sock.set_multicast_ttl_v4(255)
607                .map_err(|e| e_fmt!("set set_multicast_ttl_v4 on addr {}: {}", ip, e))?;
608
609            if !should_loop {
610                sock.set_multicast_loop_v4(false)
611                    .map_err(|e| e_fmt!("failed to set multicast loop v4 for {ip}: {e}"))?;
612            }
613
614            // Test if we can send packets successfully.
615            let multicast_addr = SocketAddrV4::new(GROUP_ADDR_V4, MDNS_PORT).into();
616            let test_packets = DnsOutgoing::new(0).to_data_on_wire();
617            for packet in test_packets {
618                sock.send_to(&packet, &multicast_addr)
619                    .map_err(|e| e_fmt!("send multicast packet on addr {}: {}", ip, e))?;
620            }
621            MyUdpSocket::new(sock)
622                .map_err(|e| e_fmt!("failed to create MySocket for interface {}: {e}", intf.name))
623        }
624        IpAddr::V6(ip) => {
625            let addr = SocketAddrV6::new(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 0), MDNS_PORT, 0, 0);
626            let sock = new_socket(addr.into(), true)?;
627
628            let if_index = intf.index.unwrap_or(0);
629
630            // Join mDNS group to receive packets.
631            sock.join_multicast_v6(&GROUP_ADDR_V6, if_index)
632                .map_err(|e| e_fmt!("join multicast group on addr {}: {}", ip, e))?;
633
634            // Set IPV6_MULTICAST_IF to send packets.
635            sock.set_multicast_if_v6(if_index)
636                .map_err(|e| e_fmt!("set multicast_if on addr {}: {}", ip, e))?;
637
638            // We are not sending multicast packets to test this socket as there might
639            // be many IPv6 interfaces on a host and could cause such send error:
640            // "No buffer space available (os error 55)".
641
642            MyUdpSocket::new(sock)
643                .map_err(|e| e_fmt!("failed to create MySocket for interface {}: {e}", intf.name))
644        }
645    }
646}
647
648/// Creates a new UDP socket to bind to `port` with REUSEPORT option.
649/// `non_block` indicates whether to set O_NONBLOCK for the socket.
650fn new_socket(addr: SocketAddr, non_block: bool) -> Result<PktInfoUdpSocket> {
651    let domain = match addr {
652        SocketAddr::V4(_) => socket2::Domain::IPV4,
653        SocketAddr::V6(_) => socket2::Domain::IPV6,
654    };
655
656    let fd = PktInfoUdpSocket::new(domain).map_err(|e| e_fmt!("create socket failed: {}", e))?;
657
658    fd.set_reuse_address(true)
659        .map_err(|e| e_fmt!("set ReuseAddr failed: {}", e))?;
660    #[cfg(unix)] // this is currently restricted to Unix's in socket2
661    fd.set_reuse_port(true)
662        .map_err(|e| e_fmt!("set ReusePort failed: {}", e))?;
663
664    if non_block {
665        fd.set_nonblocking(true)
666            .map_err(|e| e_fmt!("set O_NONBLOCK: {}", e))?;
667    }
668
669    fd.bind(&addr.into())
670        .map_err(|e| e_fmt!("socket bind to {} failed: {}", &addr, e))?;
671
672    trace!("new socket bind to {}", &addr);
673    Ok(fd)
674}
675
/// Specify a UNIX timestamp in millis to run `command` for the next time.
struct ReRun {
    /// UNIX timestamp in millis.
    next_time: u64,
    /// The command to run at `next_time`.
    command: Command,
}
682
/// Specify kinds of interfaces. It is used to enable or to disable interfaces in the daemon.
///
/// Note that for ergonomic reasons, `From<&str>` and `From<IpAddr>` are implemented.
/// `From<&String>` is implemented as well and, like `From<&str>`, yields [`IfKind::Name`].
#[derive(Debug, Clone)]
#[non_exhaustive]
pub enum IfKind {
    /// All interfaces.
    All,

    /// All IPv4 interfaces.
    IPv4,

    /// All IPv6 interfaces.
    IPv6,

    /// By the interface name, for example "en0"
    Name(String),

    /// By an IPv4 or IPv6 address.
    Addr(IpAddr),

    /// 127.0.0.1 (or anything in 127.0.0.0/8), disabled by default.
    ///
    /// Use [ServiceDaemon::enable_interface] to support registering services on loopback interfaces,
    /// which is required by some use cases (e.g., OSCQuery) that publish via mDNS.
    LoopbackV4,

    /// ::1/128, disabled by default.
    LoopbackV6,
}
713
714impl IfKind {
715    /// Checks if `intf` matches with this interface kind.
716    fn matches(&self, intf: &Interface) -> bool {
717        match self {
718            Self::All => true,
719            Self::IPv4 => intf.ip().is_ipv4(),
720            Self::IPv6 => intf.ip().is_ipv6(),
721            Self::Name(ifname) => ifname == &intf.name,
722            Self::Addr(addr) => addr == &intf.ip(),
723            Self::LoopbackV4 => intf.is_loopback() && intf.ip().is_ipv4(),
724            Self::LoopbackV6 => intf.is_loopback() && intf.ip().is_ipv6(),
725        }
726    }
727}
728
729/// The first use case of specifying an interface was to
730/// use an interface name. Hence adding this for ergonomic reasons.
731impl From<&str> for IfKind {
732    fn from(val: &str) -> Self {
733        Self::Name(val.to_string())
734    }
735}
736
737impl From<&String> for IfKind {
738    fn from(val: &String) -> Self {
739        Self::Name(val.to_string())
740    }
741}
742
743/// Still for ergonomic reasons.
744impl From<IpAddr> for IfKind {
745    fn from(val: IpAddr) -> Self {
746        Self::Addr(val)
747    }
748}
749
/// A list of `IfKind` that can be used to match interfaces.
///
/// Values of this type are produced by [`IntoIfKindVec::into_vec`].
pub struct IfKindVec {
    /// The interface kinds in this list.
    kinds: Vec<IfKind>,
}
754
/// A trait that converts a type into a Vec of `IfKind`.
///
/// It is implemented for any single value that converts into `IfKind`
/// and for a `Vec` of such values.
pub trait IntoIfKindVec {
    /// Consumes `self` and returns the equivalent list of `IfKind`.
    fn into_vec(self) -> IfKindVec;
}
759
760impl<T: Into<IfKind>> IntoIfKindVec for T {
761    fn into_vec(self) -> IfKindVec {
762        let if_kind: IfKind = self.into();
763        IfKindVec {
764            kinds: vec![if_kind],
765        }
766    }
767}
768
769impl<T: Into<IfKind>> IntoIfKindVec for Vec<T> {
770    fn into_vec(self) -> IfKindVec {
771        let kinds: Vec<IfKind> = self.into_iter().map(|x| x.into()).collect();
772        IfKindVec { kinds }
773    }
774}
775
/// Selection of interfaces.
///
/// Pairs an interface kind with whether that kind is enabled or disabled.
struct IfSelection {
    /// The interfaces to be selected.
    if_kind: IfKind,

    /// Whether the `if_kind` should be enabled or not.
    selected: bool,
}
784
/// A struct holding the state. It was inspired by `zeroconf` package in Python.
struct Zeroconf {
    /// Local interfaces keyed by interface index.
    my_intfs: HashMap<u32, MyIntf>,

    /// A common socket for IPv4 interfaces.
    ipv4_sock: MyUdpSocket,

    /// A common socket for IPv6 interfaces.
    ipv6_sock: MyUdpSocket,

    /// Local registered services, keyed by service full names.
    ///
    /// The keys are the lowercased full names.
    my_services: HashMap<String, ServiceInfo>,

    /// Received DNS records.
    cache: DnsCache,

    /// Registered service records, keyed by interface index.
    dns_registry_map: HashMap<u32, DnsRegistry>,

    /// Active "Browse" commands.
    service_queriers: HashMap<String, Sender<ServiceEvent>>, // <ty_domain, channel::sender>

    /// Active "ResolveHostname" commands.
    ///
    /// The timestamps are set at the future timestamp when the command should timeout.
    /// `hostname` is case-insensitive and stored in lowercase.
    hostname_resolvers: HashMap<String, (Sender<HostnameResolutionEvent>, Option<u64>)>, // <hostname, (channel::sender, UNIX timestamp in millis)>

    /// All repeating transmissions.
    retransmissions: Vec<ReRun>,

    /// Metrics counters of the daemon.
    // NOTE(review): presumably keyed by counter name; confirm against the `Metrics` alias.
    counters: Metrics,

    /// Waits for incoming packets.
    poller: Poll,

    /// Channels to notify events.
    monitors: Vec<Sender<DaemonEvent>>,

    /// Options
    service_name_len_max: u8,

    /// Interval in millis to check IP address changes.
    ///
    /// The run loop does not schedule IP checks when this is 0 at start-up.
    ip_check_interval: u64,

    /// All interface selections requested of the daemon, in the order they were made.
    if_selections: Vec<IfSelection>,

    /// Socket for signaling.
    signal_sock: MioUdpSocket,

    /// Timestamps marking where we need another iteration of the run loop,
    /// to react to events like retransmissions, cache refreshes, interface IP address changes, etc.
    ///
    /// When the run loop goes through a single iteration, it will
    /// set its timeout to the earliest timer in this list.
    timers: BinaryHeap<Reverse<u64>>,

    /// Current status of the daemon: `Running` after construction,
    /// `Shutdown` once an `Exit` command has been received by the run loop.
    status: DaemonStatus,

    /// Service instances that are pending for resolving SRV and TXT.
    pending_resolves: HashSet<String>,

    /// Service instances that are already resolved.
    resolved: HashSet<String>,

    /// The multicast-loop setting last requested for the IPv4 socket.
    multicast_loop_v4: bool,

    /// The multicast-loop setting last requested for the IPv6 socket.
    multicast_loop_v6: bool,

    /// Test only: names of interfaces that IP-change checks treat as absent.
    #[cfg(test)]
    test_down_interfaces: HashSet<String>,
}
859
860/// Join the multicast group for the given interface.
861fn join_multicast_group(my_sock: &PktInfoUdpSocket, intf: &Interface) -> Result<()> {
862    let intf_ip = &intf.ip();
863    match intf_ip {
864        IpAddr::V4(ip) => {
865            // Join mDNS group to receive packets.
866            debug!("join multicast group V4 on addr {}", ip);
867            my_sock
868                .join_multicast_v4(&GROUP_ADDR_V4, ip)
869                .map_err(|e| e_fmt!("PKT join multicast group on addr {}: {}", intf_ip, e))?;
870        }
871        IpAddr::V6(ip) => {
872            let if_index = intf.index.unwrap_or(0);
873            // Join mDNS group to receive packets.
874            debug!(
875                "join multicast group V6 on addr {} with index {}",
876                ip, if_index
877            );
878            my_sock
879                .join_multicast_v6(&GROUP_ADDR_V6, if_index)
880                .map_err(|e| e_fmt!("PKT join multicast group on addr {}: {}", ip, e))?;
881        }
882    }
883    Ok(())
884}
885
886impl Zeroconf {
    /// Creates the daemon state.
    ///
    /// Creates one shared IPv4 socket and one shared IPv6 socket on `MDNS_PORT`,
    /// tries to join the mDNS multicast group for every interface address returned by
    /// `my_ip_interfaces(false)`, and records each interface in `my_intfs` together
    /// with an empty DNS registry. Loopback interfaces are deselected by default.
    ///
    /// # Panics
    ///
    /// Panics if a socket cannot be created, wrapped, or configured with the
    /// multicast TTL / hop limit.
    fn new(signal_sock: MioUdpSocket, poller: Poll) -> Self {
        // Get interfaces.
        let my_ifaddrs = my_ip_interfaces(false);

        // Track every interface address, grouped by interface index.
        // Note: it is possible that `my_ifaddrs` contains the same IP addr with different interface names,
        // or the same interface name with different IP addrs.
        let mut my_intfs = HashMap::new();
        let mut dns_registry_map = HashMap::new();

        // Use the same socket for receiving and sending multicast packets.
        // Such socket has to bind to INADDR_ANY or IN6ADDR_ANY.
        let addr = SocketAddrV4::new(Ipv4Addr::new(0, 0, 0, 0), MDNS_PORT);
        let sock = new_socket(addr.into(), true).unwrap();

        // Per RFC 6762 section 11:
        // "All Multicast DNS responses (including responses sent via unicast) SHOULD
        // be sent with IP TTL set to 255."
        // Here we set the TTL to 255 for multicast as we don't support unicast yet.
        sock.set_multicast_ttl_v4(255)
            .map_err(|e| e_fmt!("set set_multicast_ttl_v4 on addr: {}", e))
            .unwrap();

        // This clones a socket.
        let ipv4_sock = MyUdpSocket::new(sock).unwrap();

        let addr = SocketAddrV6::new(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 0), MDNS_PORT, 0, 0);
        let sock = new_socket(addr.into(), true).unwrap();

        // Per RFC 6762 section 11:
        // "All Multicast DNS responses (including responses sent via unicast) SHOULD
        // be sent with IP TTL set to 255."
        sock.set_multicast_hops_v6(255)
            .map_err(|e| e_fmt!("set set_multicast_hops_v6: {}", e))
            .unwrap();

        // This clones the ipv6 socket.
        let ipv6_sock = MyUdpSocket::new(sock).unwrap();

        // Configure sockets to join multicast groups.
        for intf in my_ifaddrs {
            let sock = if intf.ip().is_ipv4() {
                &ipv4_sock
            } else {
                &ipv6_sock
            };

            // A failed join is only logged; the interface is still recorded below.
            if let Err(e) = join_multicast_group(&sock.pktinfo, &intf) {
                debug!(
                    "config socket to join multicast: {}: {e}. Skipped.",
                    &intf.ip()
                );
            }

            // Interfaces without a known index share index 0.
            let if_index = intf.index.unwrap_or(0);

            // Make sure a DNS registry exists for this interface index.
            dns_registry_map
                .entry(if_index)
                .or_insert_with(DnsRegistry::new);

            // Add the address to an already known interface, or record a new interface.
            my_intfs
                .entry(if_index)
                .and_modify(|v: &mut MyIntf| {
                    v.addrs.insert(intf.addr.clone());
                })
                .or_insert(MyIntf {
                    name: intf.name.clone(),
                    index: if_index,
                    addrs: HashSet::from([intf.addr]),
                });
        }

        let monitors = Vec::new();
        let service_name_len_max = SERVICE_NAME_LEN_MAX_DEFAULT;
        // The default is given in seconds; the field is kept in milliseconds.
        let ip_check_interval = IP_CHECK_INTERVAL_IN_SECS_DEFAULT as u64 * 1000;

        let timers = BinaryHeap::new();

        // Disable loopback by default.
        let if_selections = vec![
            IfSelection {
                if_kind: IfKind::LoopbackV4,
                selected: false,
            },
            IfSelection {
                if_kind: IfKind::LoopbackV6,
                selected: false,
            },
        ];

        let status = DaemonStatus::Running;

        Self {
            my_intfs,
            ipv4_sock,
            ipv6_sock,
            my_services: HashMap::new(),
            cache: DnsCache::new(),
            dns_registry_map,
            hostname_resolvers: HashMap::new(),
            service_queriers: HashMap::new(),
            retransmissions: Vec::new(),
            counters: HashMap::new(),
            poller,
            monitors,
            service_name_len_max,
            ip_check_interval,
            if_selections,
            signal_sock,
            timers,
            status,
            pending_resolves: HashSet::new(),
            resolved: HashSet::new(),
            multicast_loop_v4: true,
            multicast_loop_v6: true,

            #[cfg(test)]
            test_down_interfaces: HashSet::new(),
        }
    }
1009
1010    /// The main event loop of the daemon thread
1011    ///
1012    /// In each round, it will:
1013    /// 1. select the listening sockets with a timeout.
1014    /// 2. process the incoming packets if any.
1015    /// 3. try_recv on its channel and execute commands.
1016    /// 4. announce its registered services.
1017    /// 5. process retransmissions if any.
1018    fn run(&mut self, receiver: Receiver<Command>) -> Option<Command> {
1019        // Add the daemon's signal socket to the poller.
1020        if let Err(e) = self.poller.registry().register(
1021            &mut self.signal_sock,
1022            mio::Token(SIGNAL_SOCK_EVENT_KEY),
1023            mio::Interest::READABLE,
1024        ) {
1025            debug!("failed to add signal socket to the poller: {}", e);
1026            return None;
1027        }
1028
1029        if let Err(e) = self.poller.registry().register(
1030            &mut self.ipv4_sock,
1031            mio::Token(IPV4_SOCK_EVENT_KEY),
1032            mio::Interest::READABLE,
1033        ) {
1034            debug!("failed to register ipv4 socket: {}", e);
1035            return None;
1036        }
1037
1038        if let Err(e) = self.poller.registry().register(
1039            &mut self.ipv6_sock,
1040            mio::Token(IPV6_SOCK_EVENT_KEY),
1041            mio::Interest::READABLE,
1042        ) {
1043            debug!("failed to register ipv6 socket: {}", e);
1044            return None;
1045        }
1046
1047        // Setup timer for IP checks.
1048        let mut next_ip_check = if self.ip_check_interval > 0 {
1049            current_time_millis() + self.ip_check_interval
1050        } else {
1051            0
1052        };
1053
1054        if next_ip_check > 0 {
1055            self.add_timer(next_ip_check);
1056        }
1057
1058        // Start the run loop.
1059
1060        let mut events = mio::Events::with_capacity(1024);
1061        loop {
1062            let now = current_time_millis();
1063
1064            let earliest_timer = self.peek_earliest_timer();
1065            let timeout = earliest_timer.map(|timer| {
1066                // If `timer` already passed, set `timeout` to be 1ms.
1067                let millis = if timer > now { timer - now } else { 1 };
1068                Duration::from_millis(millis)
1069            });
1070
1071            // Process incoming packets, command events and optional timeout.
1072            events.clear();
1073            match self.poller.poll(&mut events, timeout) {
1074                Ok(_) => self.handle_poller_events(&events),
1075                Err(e) => debug!("failed to select from sockets: {}", e),
1076            }
1077
1078            let now = current_time_millis();
1079
1080            // Remove the timers if already passed.
1081            self.pop_timers_till(now);
1082
1083            // Remove hostname resolvers with expired timeouts.
1084            for hostname in self
1085                .hostname_resolvers
1086                .clone()
1087                .into_iter()
1088                .filter(|(_, (_, timeout))| timeout.map(|t| now >= t).unwrap_or(false))
1089                .map(|(hostname, _)| hostname)
1090            {
1091                trace!("hostname resolver timeout for {}", &hostname);
1092                call_hostname_resolution_listener(
1093                    &self.hostname_resolvers,
1094                    &hostname,
1095                    HostnameResolutionEvent::SearchTimeout(hostname.to_owned()),
1096                );
1097                call_hostname_resolution_listener(
1098                    &self.hostname_resolvers,
1099                    &hostname,
1100                    HostnameResolutionEvent::SearchStopped(hostname.to_owned()),
1101                );
1102                self.hostname_resolvers.remove(&hostname);
1103            }
1104
1105            // process commands from the command channel
1106            while let Ok(command) = receiver.try_recv() {
1107                if matches!(command, Command::Exit(_)) {
1108                    self.status = DaemonStatus::Shutdown;
1109                    return Some(command);
1110                }
1111                self.exec_command(command, false);
1112            }
1113
1114            // check for repeated commands and run them if their time is up.
1115            let mut i = 0;
1116            while i < self.retransmissions.len() {
1117                if now >= self.retransmissions[i].next_time {
1118                    let rerun = self.retransmissions.remove(i);
1119                    self.exec_command(rerun.command, true);
1120                } else {
1121                    i += 1;
1122                }
1123            }
1124
1125            // Refresh cached service records with active queriers
1126            self.refresh_active_services();
1127
1128            // Refresh cached A/AAAA records with active queriers
1129            let mut query_count = 0;
1130            for (hostname, _sender) in self.hostname_resolvers.iter() {
1131                for (hostname, ip_addr) in
1132                    self.cache.refresh_due_hostname_resolutions(hostname).iter()
1133                {
1134                    self.send_query(hostname, ip_address_rr_type(&ip_addr.to_ip_addr()));
1135                    query_count += 1;
1136                }
1137            }
1138
1139            self.increase_counter(Counter::CacheRefreshAddr, query_count);
1140
1141            // check and evict expired records in our cache
1142            let now = current_time_millis();
1143
1144            // Notify service listeners about the expired records.
1145            let expired_services = self.cache.evict_expired_services(now);
1146            if !expired_services.is_empty() {
1147                debug!(
1148                    "run: send {} service removal to listeners",
1149                    expired_services.len()
1150                );
1151                self.notify_service_removal(expired_services);
1152            }
1153
1154            // Notify hostname listeners about the expired records.
1155            let expired_addrs = self.cache.evict_expired_addr(now);
1156            for (hostname, addrs) in expired_addrs {
1157                call_hostname_resolution_listener(
1158                    &self.hostname_resolvers,
1159                    &hostname,
1160                    HostnameResolutionEvent::AddressesRemoved(hostname.clone(), addrs),
1161                );
1162                let instances = self.cache.get_instances_on_host(&hostname);
1163                let instance_set: HashSet<String> = instances.into_iter().collect();
1164                self.resolve_updated_instances(&instance_set);
1165            }
1166
1167            // Send out probing queries.
1168            self.probing_handler();
1169
1170            // check IP changes if next_ip_check is reached.
1171            if now >= next_ip_check && next_ip_check > 0 {
1172                next_ip_check = now + self.ip_check_interval;
1173                self.add_timer(next_ip_check);
1174
1175                self.check_ip_changes();
1176            }
1177        }
1178    }
1179
1180    fn process_set_option(&mut self, daemon_opt: DaemonOption) {
1181        match daemon_opt {
1182            DaemonOption::ServiceNameLenMax(length) => self.service_name_len_max = length,
1183            DaemonOption::IpCheckInterval(interval) => self.ip_check_interval = interval,
1184            DaemonOption::EnableInterface(if_kind) => self.enable_interface(if_kind),
1185            DaemonOption::DisableInterface(if_kind) => self.disable_interface(if_kind),
1186            DaemonOption::MulticastLoopV4(on) => self.set_multicast_loop_v4(on),
1187            DaemonOption::MulticastLoopV6(on) => self.set_multicast_loop_v6(on),
1188            #[cfg(test)]
1189            DaemonOption::TestDownInterface(ifname) => {
1190                self.test_down_interfaces.insert(ifname);
1191            }
1192            #[cfg(test)]
1193            DaemonOption::TestUpInterface(ifname) => {
1194                self.test_down_interfaces.remove(&ifname);
1195            }
1196        }
1197    }
1198
1199    fn enable_interface(&mut self, kinds: Vec<IfKind>) {
1200        debug!("enable_interface: {:?}", kinds);
1201        for if_kind in kinds {
1202            self.if_selections.push(IfSelection {
1203                if_kind,
1204                selected: true,
1205            });
1206        }
1207
1208        self.apply_intf_selections(my_ip_interfaces(true));
1209    }
1210
1211    fn disable_interface(&mut self, kinds: Vec<IfKind>) {
1212        debug!("disable_interface: {:?}", kinds);
1213        for if_kind in kinds {
1214            self.if_selections.push(IfSelection {
1215                if_kind,
1216                selected: false,
1217            });
1218        }
1219
1220        self.apply_intf_selections(my_ip_interfaces(true));
1221    }
1222
1223    fn set_multicast_loop_v4(&mut self, on: bool) {
1224        self.multicast_loop_v4 = on;
1225        self.ipv4_sock
1226            .pktinfo
1227            .set_multicast_loop_v4(on)
1228            .map_err(|e| e_fmt!("failed to set multicast loop v4: {}", e))
1229            .unwrap();
1230    }
1231
1232    fn set_multicast_loop_v6(&mut self, on: bool) {
1233        self.multicast_loop_v6 = on;
1234        self.ipv6_sock
1235            .pktinfo
1236            .set_multicast_loop_v6(on)
1237            .map_err(|e| e_fmt!("failed to set multicast loop v6: {}", e))
1238            .unwrap();
1239    }
1240
1241    fn notify_monitors(&mut self, event: DaemonEvent) {
1242        // Only retain the monitors that are still connected.
1243        self.monitors.retain(|sender| {
1244            if let Err(e) = sender.try_send(event.clone()) {
1245                debug!("notify_monitors: try_send: {}", &e);
1246                if matches!(e, TrySendError::Disconnected(_)) {
1247                    return false; // This monitor is dropped.
1248                }
1249            }
1250            true
1251        });
1252    }
1253
1254    /// Remove `addr` in my services that enabled `addr_auto`.
1255    fn del_addr_in_my_services(&mut self, addr: &IpAddr) {
1256        for (_, service_info) in self.my_services.iter_mut() {
1257            if service_info.is_addr_auto() {
1258                service_info.remove_ipaddr(addr);
1259            }
1260        }
1261    }
1262
1263    fn add_timer(&mut self, next_time: u64) {
1264        self.timers.push(Reverse(next_time));
1265    }
1266
1267    fn peek_earliest_timer(&self) -> Option<u64> {
1268        self.timers.peek().map(|Reverse(v)| *v)
1269    }
1270
1271    fn _pop_earliest_timer(&mut self) -> Option<u64> {
1272        self.timers.pop().map(|Reverse(v)| v)
1273    }
1274
1275    /// Pop all timers that are already passed till `now`.
1276    fn pop_timers_till(&mut self, now: u64) {
1277        while let Some(Reverse(v)) = self.timers.peek() {
1278            if *v > now {
1279                break;
1280            }
1281            self.timers.pop();
1282        }
1283    }
1284
1285    /// Apply all selections to `interfaces` and return the selected addresses.
1286    fn selected_addrs(&self, interfaces: Vec<Interface>) -> HashSet<IpAddr> {
1287        let intf_count = interfaces.len();
1288        let mut intf_selections = vec![true; intf_count];
1289
1290        // apply if_selections
1291        for selection in self.if_selections.iter() {
1292            // Mark the interfaces for this selection.
1293            for i in 0..intf_count {
1294                if selection.if_kind.matches(&interfaces[i]) {
1295                    intf_selections[i] = selection.selected;
1296                }
1297            }
1298        }
1299
1300        let mut selected_addrs = HashSet::new();
1301        for i in 0..intf_count {
1302            if intf_selections[i] {
1303                selected_addrs.insert(interfaces[i].addr.ip());
1304            }
1305        }
1306
1307        selected_addrs
1308    }
1309
1310    /// Apply all selections to `interfaces`.
1311    ///
1312    /// For any interface, add it if selected but not bound yet,
1313    /// delete it if not selected but still bound.
1314    fn apply_intf_selections(&mut self, interfaces: Vec<Interface>) {
1315        // By default, we enable all interfaces.
1316        let intf_count = interfaces.len();
1317        let mut intf_selections = vec![true; intf_count];
1318
1319        // apply if_selections
1320        for selection in self.if_selections.iter() {
1321            // Mark the interfaces for this selection.
1322            for i in 0..intf_count {
1323                if selection.if_kind.matches(&interfaces[i]) {
1324                    intf_selections[i] = selection.selected;
1325                }
1326            }
1327        }
1328
1329        // Update `my_intfs` based on the selections.
1330        for (idx, intf) in interfaces.into_iter().enumerate() {
1331            if intf_selections[idx] {
1332                // Add the interface
1333                self.add_interface(intf);
1334            } else {
1335                // Remove the interface
1336                self.del_interface(&intf);
1337            }
1338        }
1339    }
1340
    /// Handles the removal of a local IP address: removes `ip` from the registered
    /// services that enabled `addr_auto`, then sends `DaemonEvent::IpDel` to the monitors.
    fn del_ip(&mut self, ip: IpAddr) {
        self.del_addr_in_my_services(&ip);
        self.notify_monitors(DaemonEvent::IpDel(ip));
    }
1345
    /// Check for IP changes and update [my_intfs] as needed.
    ///
    /// Compares the addresses recorded in `my_intfs` with the current system
    /// interfaces. Addresses that disappeared are removed and reported via `del_ip`;
    /// interfaces left without any address (or missing entirely) are dropped, their
    /// multicast membership is left and their cache records are removed. Finally the
    /// interface selections are applied to the current interfaces, which adds any
    /// newly found, selected interface.
    fn check_ip_changes(&mut self) {
        // Get the current interfaces.
        let my_ifaddrs = my_ip_interfaces(true);

        // In tests, interfaces marked as "down" are treated as if they did not exist.
        #[cfg(test)]
        let my_ifaddrs: Vec<_> = my_ifaddrs
            .into_iter()
            .filter(|intf| !self.test_down_interfaces.contains(&intf.name))
            .collect();

        // Current addresses grouped by interface index (0 when the index is unknown).
        let ifaddrs_map: HashMap<u32, Vec<&IfAddr>> =
            my_ifaddrs.iter().fold(HashMap::new(), |mut acc, intf| {
                let if_index = intf.index.unwrap_or(0);
                acc.entry(if_index).or_default().push(&intf.addr);
                acc
            });

        // Interfaces to drop, with the last removed IPv4 / IPv6 address of each (if any).
        let mut deleted_intfs = Vec::new();
        // Every address that is no longer present.
        let mut deleted_ips = Vec::new();

        for (if_index, my_intf) in self.my_intfs.iter_mut() {
            let mut last_ipv4 = None;
            let mut last_ipv6 = None;

            if let Some(current_addrs) = ifaddrs_map.get(if_index) {
                // The interface still exists: keep only the addresses it still has.
                my_intf.addrs.retain(|addr| {
                    if current_addrs.contains(&addr) {
                        true
                    } else {
                        match addr.ip() {
                            IpAddr::V4(ipv4) => last_ipv4 = Some(ipv4),
                            IpAddr::V6(ipv6) => last_ipv6 = Some(ipv6),
                        }
                        deleted_ips.push(addr.ip());
                        false
                    }
                });
                // An interface without any address left is dropped as well.
                if my_intf.addrs.is_empty() {
                    deleted_intfs.push((*if_index, last_ipv4, last_ipv6))
                }
            } else {
                // If it does not exist, remove the interface.
                debug!(
                    "check_ip_changes: interface {} ({}) no longer exists, removing",
                    my_intf.name, if_index
                );
                for addr in my_intf.addrs.iter() {
                    match addr.ip() {
                        IpAddr::V4(ipv4) => last_ipv4 = Some(ipv4),
                        IpAddr::V6(ipv6) => last_ipv6 = Some(ipv6),
                    }
                    deleted_ips.push(addr.ip())
                }
                deleted_intfs.push((*if_index, last_ipv4, last_ipv6));
            }
        }

        if !deleted_ips.is_empty() || !deleted_intfs.is_empty() {
            debug!(
                "check_ip_changes: {} deleted ips {} deleted intfs",
                deleted_ips.len(),
                deleted_intfs.len()
            );
        }

        // Update the registered services and notify the monitors for every removed address.
        for ip in deleted_ips {
            self.del_ip(ip);
        }

        for (if_index, last_ipv4, last_ipv6) in deleted_intfs {
            let Some(my_intf) = self.my_intfs.remove(&if_index) else {
                continue;
            };

            // Leave the IPv4 multicast group using the last removed IPv4 address.
            if let Some(ipv4) = last_ipv4 {
                debug!("leave multicast for {ipv4}");
                if let Err(e) = self
                    .ipv4_sock
                    .pktinfo
                    .leave_multicast_v4(&GROUP_ADDR_V4, &ipv4)
                {
                    debug!("leave multicast group for addr {ipv4}: {e}");
                }
            }

            // Leave the IPv6 multicast group using the interface index.
            if let Some(ipv6) = last_ipv6 {
                debug!("leave multicast for {ipv6}");
                if let Err(e) = self
                    .ipv6_sock
                    .pktinfo
                    .leave_multicast_v6(&GROUP_ADDR_V6, my_intf.index)
                {
                    debug!("leave multicast group for IPv6: {ipv6}: {e}");
                }
            }

            // Remove cache records for this interface.
            let intf_id = InterfaceId {
                name: my_intf.name.to_string(),
                index: my_intf.index,
            };
            let removed_instances = self.cache.remove_records_on_intf(intf_id);
            self.notify_service_removal(removed_instances);
        }

        // Add newly found interfaces only if in our selections.
        self.apply_intf_selections(my_ifaddrs);
    }
1455
1456    fn del_interface(&mut self, intf: &Interface) {
1457        let if_index = intf.index.unwrap_or(0);
1458        trace!(
1459            "del_interface: {} ({if_index}) addr {}",
1460            intf.name,
1461            intf.ip()
1462        );
1463
1464        let Some(my_intf) = self.my_intfs.get_mut(&if_index) else {
1465            debug!("del_interface: interface {} not found", intf.name);
1466            return;
1467        };
1468
1469        let mut ip_removed = false;
1470
1471        if my_intf.addrs.remove(&intf.addr) {
1472            ip_removed = true;
1473
1474            match intf.addr.ip() {
1475                IpAddr::V4(ipv4) => {
1476                    if my_intf.next_ifaddr_v4().is_none() {
1477                        if let Err(e) = self
1478                            .ipv4_sock
1479                            .pktinfo
1480                            .leave_multicast_v4(&GROUP_ADDR_V4, &ipv4)
1481                        {
1482                            debug!("leave multicast group for addr {ipv4}: {e}");
1483                        }
1484                    }
1485                }
1486
1487                IpAddr::V6(ipv6) => {
1488                    if my_intf.next_ifaddr_v6().is_none() {
1489                        if let Err(e) = self
1490                            .ipv6_sock
1491                            .pktinfo
1492                            .leave_multicast_v6(&GROUP_ADDR_V6, if_index)
1493                        {
1494                            debug!("leave multicast group for addr {ipv6}: {e}");
1495                        }
1496                    }
1497                }
1498            }
1499
1500            if my_intf.addrs.is_empty() {
1501                // If no more addresses, remove the interface.
1502                debug!("del_interface: removing interface {}", intf.name);
1503                self.my_intfs.remove(&if_index);
1504                self.dns_registry_map.remove(&if_index);
1505                self.cache.remove_addrs_on_disabled_intf(if_index);
1506            }
1507        }
1508
1509        if ip_removed {
1510            // Notify the monitors.
1511            self.notify_monitors(DaemonEvent::IpDel(intf.ip()));
1512            // Remove the interface from my services that enabled `addr_auto`.
1513            self.del_addr_in_my_services(&intf.ip());
1514        }
1515    }
1516
1517    fn add_interface(&mut self, intf: Interface) {
1518        let sock = if intf.ip().is_ipv4() {
1519            &self.ipv4_sock
1520        } else {
1521            &self.ipv6_sock
1522        };
1523
1524        let if_index = intf.index.unwrap_or(0);
1525        let mut new_addr = false;
1526
1527        match self.my_intfs.entry(if_index) {
1528            Entry::Occupied(mut entry) => {
1529                // If intf has a new address, add it to the existing interface.
1530                let my_intf = entry.get_mut();
1531                if !my_intf.addrs.contains(&intf.addr) {
1532                    if let Err(e) = join_multicast_group(&sock.pktinfo, &intf) {
1533                        debug!("add_interface: socket_config {}: {e}", &intf.name);
1534                    }
1535                    my_intf.addrs.insert(intf.addr.clone());
1536                    new_addr = true;
1537                }
1538            }
1539            Entry::Vacant(entry) => {
1540                if let Err(e) = join_multicast_group(&sock.pktinfo, &intf) {
1541                    debug!("add_interface: socket_config {}: {e}. Skipped.", &intf.name);
1542                    return;
1543                }
1544
1545                new_addr = true;
1546                let new_intf = MyIntf {
1547                    name: intf.name.clone(),
1548                    index: if_index,
1549                    addrs: HashSet::from([intf.addr.clone()]),
1550                };
1551                entry.insert(new_intf);
1552            }
1553        }
1554
1555        if !new_addr {
1556            trace!("add_interface: interface {} already exists", &intf.name);
1557            return;
1558        }
1559
1560        debug!("add new interface {}: {}", intf.name, intf.ip());
1561
1562        let Some(my_intf) = self.my_intfs.get(&if_index) else {
1563            debug!("add_interface: cannot find if_index {if_index}");
1564            return;
1565        };
1566
1567        let dns_registry = match self.dns_registry_map.get_mut(&if_index) {
1568            Some(registry) => registry,
1569            None => self
1570                .dns_registry_map
1571                .entry(if_index)
1572                .or_insert_with(DnsRegistry::new),
1573        };
1574
1575        for (_, service_info) in self.my_services.iter_mut() {
1576            if service_info.is_addr_auto() {
1577                let new_ip = intf.ip();
1578                service_info.insert_ipaddr(new_ip);
1579
1580                if announce_service_on_intf(dns_registry, service_info, my_intf, &sock.pktinfo) {
1581                    debug!(
1582                        "Announce service {} on {}",
1583                        service_info.get_fullname(),
1584                        intf.ip()
1585                    );
1586                    service_info.set_status(if_index, ServiceStatus::Announced);
1587                } else {
1588                    for timer in dns_registry.new_timers.drain(..) {
1589                        self.timers.push(Reverse(timer));
1590                    }
1591                    service_info.set_status(if_index, ServiceStatus::Probing);
1592                }
1593            }
1594        }
1595
1596        // As we added a new interface, we want to execute all active "Browse" reruns now.
1597        let mut browse_reruns = Vec::new();
1598        let mut i = 0;
1599        while i < self.retransmissions.len() {
1600            if matches!(self.retransmissions[i].command, Command::Browse(..)) {
1601                browse_reruns.push(self.retransmissions.remove(i));
1602            } else {
1603                i += 1;
1604            }
1605        }
1606
1607        for rerun in browse_reruns {
1608            self.exec_command(rerun.command, true);
1609        }
1610
1611        // Notify the monitors.
1612        self.notify_monitors(DaemonEvent::IpAdd(intf.ip()));
1613    }
1614
1615    /// Registers a service.
1616    ///
1617    /// RFC 6762 section 8.3.
1618    /// ...the Multicast DNS responder MUST send
1619    ///    an unsolicited Multicast DNS response containing, in the Answer
1620    ///    Section, all of its newly registered resource records
1621    ///
1622    /// Zeroconf will then respond to requests for information about this service.
1623    fn register_service(&mut self, mut info: ServiceInfo) {
1624        // Check the service name length.
1625        if let Err(e) = check_service_name_length(info.get_type(), self.service_name_len_max) {
1626            debug!("check_service_name_length: {}", &e);
1627            self.notify_monitors(DaemonEvent::Error(e));
1628            return;
1629        }
1630
1631        if info.is_addr_auto() {
1632            let selected_addrs = self.selected_addrs(my_ip_interfaces(true));
1633            for addr in selected_addrs {
1634                info.insert_ipaddr(addr);
1635            }
1636        }
1637
1638        debug!("register service {:?}", &info);
1639
1640        let outgoing_addrs = self.send_unsolicited_response(&mut info);
1641        if !outgoing_addrs.is_empty() {
1642            self.notify_monitors(DaemonEvent::Announce(
1643                info.get_fullname().to_string(),
1644                format!("{:?}", &outgoing_addrs),
1645            ));
1646        }
1647
1648        // The key has to be lower case letter as DNS record name is case insensitive.
1649        // The info will have the original name.
1650        let service_fullname = info.get_fullname().to_lowercase();
1651        self.my_services.insert(service_fullname, info);
1652    }
1653
1654    /// Sends out announcement of `info` on every valid interface.
1655    /// Returns the list of interface IPs that sent out the announcement.
1656    fn send_unsolicited_response(&mut self, info: &mut ServiceInfo) -> Vec<IpAddr> {
1657        let mut outgoing_addrs = Vec::new();
1658        let mut outgoing_intfs = HashSet::new();
1659
1660        for (if_index, intf) in self.my_intfs.iter() {
1661            let dns_registry = match self.dns_registry_map.get_mut(if_index) {
1662                Some(registry) => registry,
1663                None => self
1664                    .dns_registry_map
1665                    .entry(*if_index)
1666                    .or_insert_with(DnsRegistry::new),
1667            };
1668
1669            let mut announced = false;
1670
1671            // IPv4
1672            if announce_service_on_intf(dns_registry, info, intf, &self.ipv4_sock.pktinfo) {
1673                for addr in intf.addrs.iter().filter(|a| a.ip().is_ipv4()) {
1674                    outgoing_addrs.push(addr.ip());
1675                }
1676                outgoing_intfs.insert(intf.index);
1677
1678                debug!(
1679                    "Announce service IPv4 {} on {}",
1680                    info.get_fullname(),
1681                    intf.name
1682                );
1683                announced = true;
1684            }
1685
1686            if announce_service_on_intf(dns_registry, info, intf, &self.ipv6_sock.pktinfo) {
1687                for addr in intf.addrs.iter().filter(|a| a.ip().is_ipv6()) {
1688                    outgoing_addrs.push(addr.ip());
1689                }
1690                outgoing_intfs.insert(intf.index);
1691
1692                debug!(
1693                    "Announce service IPv6 {} on {}",
1694                    info.get_fullname(),
1695                    intf.name
1696                );
1697                announced = true;
1698            }
1699
1700            if announced {
1701                info.set_status(intf.index, ServiceStatus::Announced);
1702            } else {
1703                for timer in dns_registry.new_timers.drain(..) {
1704                    self.timers.push(Reverse(timer));
1705                }
1706                info.set_status(*if_index, ServiceStatus::Probing);
1707            }
1708        }
1709
1710        // RFC 6762 section 8.3.
1711        // ..The Multicast DNS responder MUST send at least two unsolicited
1712        //    responses, one second apart.
1713        let next_time = current_time_millis() + 1000;
1714        for if_index in outgoing_intfs {
1715            self.add_retransmission(
1716                next_time,
1717                Command::RegisterResend(info.get_fullname().to_string(), if_index),
1718            );
1719        }
1720
1721        outgoing_addrs
1722    }
1723
1724    /// Send probings or finish them if expired. Notify waiting services.
1725    fn probing_handler(&mut self) {
1726        let now = current_time_millis();
1727
1728        for (if_index, intf) in self.my_intfs.iter() {
1729            let Some(dns_registry) = self.dns_registry_map.get_mut(if_index) else {
1730                continue;
1731            };
1732
1733            let (out, expired_probes) = check_probing(dns_registry, &mut self.timers, now);
1734
1735            // send probing.
1736            if !out.questions().is_empty() {
1737                trace!("sending out probing of questions: {:?}", out.questions());
1738                send_dns_outgoing(&out, intf, &self.ipv4_sock.pktinfo);
1739                send_dns_outgoing(&out, intf, &self.ipv6_sock.pktinfo);
1740            }
1741
1742            // For finished probes, wake up services that are waiting for the probes.
1743            let waiting_services =
1744                handle_expired_probes(expired_probes, &intf.name, dns_registry, &mut self.monitors);
1745
1746            for service_name in waiting_services {
1747                // service names are lowercase
1748                if let Some(info) = self.my_services.get_mut(&service_name.to_lowercase()) {
1749                    if info.get_status(*if_index) == ServiceStatus::Announced {
1750                        debug!("service {} already announced", info.get_fullname());
1751                        continue;
1752                    }
1753
1754                    let announced_v4 =
1755                        announce_service_on_intf(dns_registry, info, intf, &self.ipv4_sock.pktinfo);
1756                    let announced_v6 =
1757                        announce_service_on_intf(dns_registry, info, intf, &self.ipv6_sock.pktinfo);
1758
1759                    if announced_v4 || announced_v6 {
1760                        let next_time = now + 1000;
1761                        let command =
1762                            Command::RegisterResend(info.get_fullname().to_string(), *if_index);
1763                        self.retransmissions.push(ReRun { next_time, command });
1764                        self.timers.push(Reverse(next_time));
1765
1766                        let fullname = match dns_registry.name_changes.get(&service_name) {
1767                            Some(new_name) => new_name.to_string(),
1768                            None => service_name.to_string(),
1769                        };
1770
1771                        let mut hostname = info.get_hostname();
1772                        if let Some(new_name) = dns_registry.name_changes.get(hostname) {
1773                            hostname = new_name;
1774                        }
1775
1776                        debug!("wake up: announce service {} on {}", fullname, intf.name);
1777                        notify_monitors(
1778                            &mut self.monitors,
1779                            DaemonEvent::Announce(fullname, format!("{}:{}", hostname, &intf.name)),
1780                        );
1781
1782                        info.set_status(*if_index, ServiceStatus::Announced);
1783                    }
1784                }
1785            }
1786        }
1787    }
1788
1789    fn unregister_service(
1790        &self,
1791        info: &ServiceInfo,
1792        intf: &MyIntf,
1793        sock: &PktInfoUdpSocket,
1794    ) -> Vec<u8> {
1795        let is_ipv4 = sock.domain() == Domain::IPV4;
1796
1797        let mut out = DnsOutgoing::new(FLAGS_QR_RESPONSE | FLAGS_AA);
1798        out.add_answer_at_time(
1799            DnsPointer::new(
1800                info.get_type(),
1801                RRType::PTR,
1802                CLASS_IN,
1803                0,
1804                info.get_fullname().to_string(),
1805            ),
1806            0,
1807        );
1808
1809        if let Some(sub) = info.get_subtype() {
1810            trace!("Adding subdomain {}", sub);
1811            out.add_answer_at_time(
1812                DnsPointer::new(
1813                    sub,
1814                    RRType::PTR,
1815                    CLASS_IN,
1816                    0,
1817                    info.get_fullname().to_string(),
1818                ),
1819                0,
1820            );
1821        }
1822
1823        out.add_answer_at_time(
1824            DnsSrv::new(
1825                info.get_fullname(),
1826                CLASS_IN | CLASS_CACHE_FLUSH,
1827                0,
1828                info.get_priority(),
1829                info.get_weight(),
1830                info.get_port(),
1831                info.get_hostname().to_string(),
1832            ),
1833            0,
1834        );
1835        out.add_answer_at_time(
1836            DnsTxt::new(
1837                info.get_fullname(),
1838                CLASS_IN | CLASS_CACHE_FLUSH,
1839                0,
1840                info.generate_txt(),
1841            ),
1842            0,
1843        );
1844
1845        let if_addrs = if is_ipv4 {
1846            info.get_addrs_on_my_intf_v4(intf)
1847        } else {
1848            info.get_addrs_on_my_intf_v6(intf)
1849        };
1850
1851        if if_addrs.is_empty() {
1852            return vec![];
1853        }
1854
1855        for address in if_addrs {
1856            out.add_answer_at_time(
1857                DnsAddress::new(
1858                    info.get_hostname(),
1859                    ip_address_rr_type(&address),
1860                    CLASS_IN | CLASS_CACHE_FLUSH,
1861                    0,
1862                    address,
1863                    intf.into(),
1864                ),
1865                0,
1866            );
1867        }
1868
1869        // `out` data is non-empty, hence we can do this.
1870        send_dns_outgoing(&out, intf, sock).remove(0)
1871    }
1872
1873    /// Binds a channel `listener` to querying mDNS hostnames.
1874    ///
1875    /// If there is already a `listener`, it will be updated, i.e. overwritten.
1876    fn add_hostname_resolver(
1877        &mut self,
1878        hostname: String,
1879        listener: Sender<HostnameResolutionEvent>,
1880        timeout: Option<u64>,
1881    ) {
1882        let real_timeout = timeout.map(|t| current_time_millis() + t);
1883        self.hostname_resolvers
1884            .insert(hostname.to_lowercase(), (listener, real_timeout));
1885        if let Some(t) = real_timeout {
1886            self.add_timer(t);
1887        }
1888    }
1889
1890    /// Sends a multicast query for `name` with `qtype`.
1891    fn send_query(&self, name: &str, qtype: RRType) {
1892        self.send_query_vec(&[(name, qtype)]);
1893    }
1894
1895    /// Sends out a list of `questions` (i.e. DNS questions) via multicast.
1896    fn send_query_vec(&self, questions: &[(&str, RRType)]) {
1897        trace!("Sending query questions: {:?}", questions);
1898        let mut out = DnsOutgoing::new(FLAGS_QR_QUERY);
1899        let now = current_time_millis();
1900
1901        for (name, qtype) in questions {
1902            out.add_question(name, *qtype);
1903
1904            for record in self.cache.get_known_answers(name, *qtype, now) {
1905                /*
1906                RFC 6762 section 7.1: https://datatracker.ietf.org/doc/html/rfc6762#section-7.1
1907                ...
1908                    When a Multicast DNS querier sends a query to which it already knows
1909                    some answers, it populates the Answer Section of the DNS query
1910                    message with those answers.
1911                 */
1912                trace!("add known answer: {:?}", record.record);
1913                let mut new_record = record.record.clone();
1914                new_record.get_record_mut().update_ttl(now);
1915                out.add_answer_box(new_record);
1916            }
1917        }
1918
1919        for (_, intf) in self.my_intfs.iter() {
1920            send_dns_outgoing(&out, intf, &self.ipv4_sock.pktinfo);
1921            send_dns_outgoing(&out, intf, &self.ipv6_sock.pktinfo);
1922        }
1923    }
1924
1925    /// Reads one UDP datagram from the socket of `intf`.
1926    ///
1927    /// Returns false if failed to receive a packet,
1928    /// otherwise returns true.
1929    fn handle_read(&mut self, event_key: usize) -> bool {
1930        let sock = match event_key {
1931            IPV4_SOCK_EVENT_KEY => &mut self.ipv4_sock,
1932            IPV6_SOCK_EVENT_KEY => &mut self.ipv6_sock,
1933            _ => {
1934                debug!("handle_read: unknown token {}", event_key);
1935                return false;
1936            }
1937        };
1938        let mut buf = vec![0u8; MAX_MSG_ABSOLUTE];
1939
1940        // Read the next mDNS UDP datagram.
1941        //
1942        // If the datagram is larger than `buf`, excess bytes may or may not
1943        // be truncated by the socket layer depending on the platform's libc.
1944        // In any case, such large datagram will not be decoded properly and
1945        // this function should return false but should not crash.
1946        let (sz, pktinfo) = match sock.pktinfo.recv(&mut buf) {
1947            Ok(sz) => sz,
1948            Err(e) => {
1949                if e.kind() != std::io::ErrorKind::WouldBlock {
1950                    debug!("listening socket read failed: {}", e);
1951                }
1952                return false;
1953            }
1954        };
1955
1956        // Find the interface that received the packet.
1957        let pkt_if_index = pktinfo.if_index as u32;
1958        let Some(my_intf) = self.my_intfs.get(&pkt_if_index) else {
1959            debug!(
1960                "handle_read: no interface found for pktinfo if_index: {}",
1961                pktinfo.if_index
1962            );
1963            return true; // We still return true to indicate that we read something.
1964        };
1965
1966        buf.truncate(sz); // reduce potential processing errors
1967
1968        match DnsIncoming::new(buf, my_intf.into()) {
1969            Ok(msg) => {
1970                if msg.is_query() {
1971                    self.handle_query(msg, pkt_if_index, event_key == IPV4_SOCK_EVENT_KEY);
1972                } else if msg.is_response() {
1973                    self.handle_response(msg, pkt_if_index);
1974                } else {
1975                    debug!("Invalid message: not query and not response");
1976                }
1977            }
1978            Err(e) => debug!("Invalid incoming DNS message: {}", e),
1979        }
1980
1981        true
1982    }
1983
1984    /// Returns true, if sent query. Returns false if SRV already exists.
1985    fn query_unresolved(&mut self, instance: &str) -> bool {
1986        if !valid_instance_name(instance) {
1987            trace!("instance name {} not valid", instance);
1988            return false;
1989        }
1990
1991        if let Some(records) = self.cache.get_srv(instance) {
1992            for record in records {
1993                if let Some(srv) = record.record.any().downcast_ref::<DnsSrv>() {
1994                    if self.cache.get_addr(srv.host()).is_none() {
1995                        self.send_query_vec(&[(srv.host(), RRType::A), (srv.host(), RRType::AAAA)]);
1996                        return true;
1997                    }
1998                }
1999            }
2000        } else {
2001            self.send_query(instance, RRType::ANY);
2002            return true;
2003        }
2004
2005        false
2006    }
2007
2008    /// Checks if `ty_domain` has records in the cache. If yes, sends the
2009    /// cached records via `sender`.
2010    fn query_cache_for_service(
2011        &mut self,
2012        ty_domain: &str,
2013        sender: &Sender<ServiceEvent>,
2014        now: u64,
2015    ) {
2016        let mut resolved: HashSet<String> = HashSet::new();
2017        let mut unresolved: HashSet<String> = HashSet::new();
2018
2019        if let Some(records) = self.cache.get_ptr(ty_domain) {
2020            for record in records.iter().filter(|r| !r.record.expires_soon(now)) {
2021                if let Some(ptr) = record.record.any().downcast_ref::<DnsPointer>() {
2022                    let mut new_event = None;
2023                    match self.resolve_service_from_cache(ty_domain, ptr.alias()) {
2024                        Ok(resolved_service) => {
2025                            if resolved_service.is_valid() {
2026                                debug!("Resolved service from cache: {}", ptr.alias());
2027                                new_event =
2028                                    Some(ServiceEvent::ServiceResolved(Box::new(resolved_service)));
2029                            } else {
2030                                debug!("Resolved service is not valid: {}", ptr.alias());
2031                            }
2032                        }
2033                        Err(err) => {
2034                            debug!("Error while resolving service from cache: {}", err);
2035                            continue;
2036                        }
2037                    }
2038
2039                    match sender.send(ServiceEvent::ServiceFound(
2040                        ty_domain.to_string(),
2041                        ptr.alias().to_string(),
2042                    )) {
2043                        Ok(()) => debug!("sent service found {}", ptr.alias()),
2044                        Err(e) => {
2045                            debug!("failed to send service found: {}", e);
2046                            continue;
2047                        }
2048                    }
2049
2050                    if let Some(event) = new_event {
2051                        resolved.insert(ptr.alias().to_string());
2052                        match sender.send(event) {
2053                            Ok(()) => debug!("sent service resolved: {}", ptr.alias()),
2054                            Err(e) => debug!("failed to send service resolved: {}", e),
2055                        }
2056                    } else {
2057                        unresolved.insert(ptr.alias().to_string());
2058                    }
2059                }
2060            }
2061        }
2062
2063        for instance in resolved.drain() {
2064            self.pending_resolves.remove(&instance);
2065            self.resolved.insert(instance);
2066        }
2067
2068        for instance in unresolved.drain() {
2069            self.add_pending_resolve(instance);
2070        }
2071    }
2072
2073    /// Checks if `hostname` has records in the cache. If yes, sends the
2074    /// cached records via `sender`.
2075    fn query_cache_for_hostname(
2076        &mut self,
2077        hostname: &str,
2078        sender: Sender<HostnameResolutionEvent>,
2079    ) {
2080        let addresses_map = self.cache.get_addresses_for_host(hostname);
2081        for (name, addresses) in addresses_map {
2082            match sender.send(HostnameResolutionEvent::AddressesFound(name, addresses)) {
2083                Ok(()) => trace!("sent hostname addresses found"),
2084                Err(e) => debug!("failed to send hostname addresses found: {}", e),
2085            }
2086        }
2087    }
2088
2089    fn add_pending_resolve(&mut self, instance: String) {
2090        if !self.pending_resolves.contains(&instance) {
2091            let next_time = current_time_millis() + RESOLVE_WAIT_IN_MILLIS;
2092            self.add_retransmission(next_time, Command::Resolve(instance.clone(), 1));
2093            self.pending_resolves.insert(instance);
2094        }
2095    }
2096
2097    /// Creates a `ResolvedService` from the cache.
2098    fn resolve_service_from_cache(
2099        &self,
2100        ty_domain: &str,
2101        fullname: &str,
2102    ) -> Result<ResolvedService> {
2103        let now = current_time_millis();
2104        let mut resolved_service = ResolvedService {
2105            ty_domain: ty_domain.to_string(),
2106            sub_ty_domain: None,
2107            fullname: fullname.to_string(),
2108            host: String::new(),
2109            port: 0,
2110            addresses: HashSet::new(),
2111            txt_properties: TxtProperties::new(),
2112        };
2113
2114        // Be sure setting `subtype` if available even when querying for the parent domain.
2115        if let Some(subtype) = self.cache.get_subtype(fullname) {
2116            trace!(
2117                "ty_domain: {} found subtype {} for instance: {}",
2118                ty_domain,
2119                subtype,
2120                fullname
2121            );
2122            if resolved_service.sub_ty_domain.is_none() {
2123                resolved_service.sub_ty_domain = Some(subtype.to_string());
2124            }
2125        }
2126
2127        // resolve SRV record
2128        if let Some(records) = self.cache.get_srv(fullname) {
2129            if let Some(answer) = records.iter().find(|r| !r.record.expires_soon(now)) {
2130                if let Some(dns_srv) = answer.record.any().downcast_ref::<DnsSrv>() {
2131                    resolved_service.host = dns_srv.host().to_string();
2132                    resolved_service.port = dns_srv.port();
2133                }
2134            }
2135        }
2136
2137        // resolve TXT record
2138        if let Some(records) = self.cache.get_txt(fullname) {
2139            if let Some(record) = records.iter().find(|r| !r.record.expires_soon(now)) {
2140                if let Some(dns_txt) = record.record.any().downcast_ref::<DnsTxt>() {
2141                    resolved_service.txt_properties = dns_txt.text().into();
2142                }
2143            }
2144        }
2145
2146        // resolve A and AAAA records
2147        if let Some(records) = self.cache.get_addr(&resolved_service.host) {
2148            for answer in records.iter() {
2149                if let Some(dns_a) = answer.record.any().downcast_ref::<DnsAddress>() {
2150                    if dns_a.expires_soon(now) {
2151                        trace!(
2152                            "Addr expired or expires soon: {}",
2153                            dns_a.address().to_ip_addr()
2154                        );
2155                    } else {
2156                        resolved_service.addresses.insert(dns_a.address());
2157                    }
2158                }
2159            }
2160        }
2161
2162        Ok(resolved_service)
2163    }
2164
2165    fn handle_poller_events(&mut self, events: &mio::Events) {
2166        for ev in events.iter() {
2167            trace!("event received with key {:?}", ev.token());
2168            if ev.token().0 == SIGNAL_SOCK_EVENT_KEY {
2169                // Drain signals as we will drain commands as well.
2170                self.signal_sock_drain();
2171
2172                if let Err(e) = self.poller.registry().reregister(
2173                    &mut self.signal_sock,
2174                    ev.token(),
2175                    mio::Interest::READABLE,
2176                ) {
2177                    debug!("failed to modify poller for signal socket: {}", e);
2178                }
2179                continue; // Next event.
2180            }
2181
2182            // Read until no more packets available.
2183            while self.handle_read(ev.token().0) {}
2184
2185            // we continue to monitor this socket.
2186            if ev.token().0 == IPV4_SOCK_EVENT_KEY {
2187                // Re-register the IPv4 socket for reading.
2188                if let Err(e) = self.poller.registry().reregister(
2189                    &mut self.ipv4_sock,
2190                    ev.token(),
2191                    mio::Interest::READABLE,
2192                ) {
2193                    debug!("modify poller for IPv4 socket: {}", e);
2194                }
2195            } else if ev.token().0 == IPV6_SOCK_EVENT_KEY {
2196                // Re-register the IPv6 socket for reading.
2197                if let Err(e) = self.poller.registry().reregister(
2198                    &mut self.ipv6_sock,
2199                    ev.token(),
2200                    mio::Interest::READABLE,
2201                ) {
2202                    debug!("modify poller for IPv6 socket: {}", e);
2203                }
2204            }
2205        }
2206    }
2207
    /// Deals with an incoming response packet received on interface `if_index`.
    ///
    /// Steps, in order:
    /// 1. Drop the expired records from `msg`, removing each of them from the
    ///    cache and sending `ServiceRemoved` for removed PTR records.
    /// 2. Run the conflict handler on the remaining message.
    /// 3. Decide whether the message is "for us", based on the PTR and
    ///    A/AAAA answers and the registered queriers / hostname resolvers.
    /// 4. Add or update every record in the cache, collecting timers and the
    ///    changes that involve service instances. `ServiceFound` is sent for
    ///    PTR changes with TTL > 1.
    /// 5. Notify hostname resolution listeners of address changes.
    /// 6. Resolve the instances affected by the changes.
    ///
    /// If `if_index` is not a known interface, the function returns after
    /// step 3 without touching the cache any further.
    fn handle_response(&mut self, mut msg: DnsIncoming, if_index: u32) {
        let now = current_time_millis();

        // remove records that are expired.
        // The predicate is shared by the three `retain` calls below: it keeps
        // non-expired records and, as a side effect, purges expired ones from
        // the cache.
        let mut record_predicate = |record: &DnsRecordBox| {
            if !record.get_record().is_expired(now) {
                return true;
            }

            debug!("record is expired, removing it from cache.");
            if self.cache.remove(record) {
                // for PTR records, send event to listeners
                if let Some(dns_ptr) = record.any().downcast_ref::<DnsPointer>() {
                    call_service_listener(
                        &self.service_queriers,
                        dns_ptr.get_name(),
                        ServiceEvent::ServiceRemoved(
                            dns_ptr.get_name().to_string(),
                            dns_ptr.alias().to_string(),
                        ),
                    );
                }
            }
            false
        };
        msg.answers_mut().retain(&mut record_predicate);
        msg.authorities_mut().retain(&mut record_predicate);
        msg.additionals_mut().retain(&mut record_predicate);

        // check possible conflicts and handle them.
        self.conflict_handler(&msg, if_index);

        // check if the message is for us.
        let mut is_for_us = true; // assume it is for us.

        // If there are any PTR records in the answers, there should be
        // at least one PTR for us. Otherwise, the message is not for us.
        // If there are no PTR records at all, assume this message is for us.
        for answer in msg.answers() {
            if answer.get_type() == RRType::PTR {
                if self.service_queriers.contains_key(answer.get_name()) {
                    is_for_us = true;
                    break; // OK to break: at least one PTR for us.
                } else {
                    is_for_us = false;
                }
            } else if answer.get_type() == RRType::A || answer.get_type() == RRType::AAAA {
                // If there is a hostname querier for this address, then it is for us.
                // Hostname resolvers are looked up by lowercased name.
                let answer_lowercase = answer.get_name().to_lowercase();
                if self.hostname_resolvers.contains_key(&answer_lowercase) {
                    is_for_us = true;
                    break; // OK to break: at least one hostname for us.
                }
            }
        }

        /// Represents a DNS record change that involves one service instance.
        struct InstanceChange {
            ty: RRType,   // The type of DNS record for the instance.
            name: String, // The name of the record.
        }

        // Go through all answers to get the new and updated records.
        // For new PTR records, send out ServiceFound immediately. For others,
        // collect them into `changes`.
        //
        // Note: we don't try to identify the update instances based on
        // each record immediately as the answers are likely related to each
        // other.
        let mut changes = Vec::new();
        let mut timers = Vec::new();
        // Without a known interface the records cannot be cached: stop here.
        let Some(my_intf) = self.my_intfs.get(&if_index) else {
            return;
        };
        for record in msg.all_records() {
            match self
                .cache
                .add_or_update(my_intf, record, &mut timers, is_for_us)
            {
                // NOTE(review): the `true` flag presumably marks a new or
                // changed record — confirm against `DnsCache::add_or_update`.
                Some((dns_record, true)) => {
                    timers.push(dns_record.record.get_record().get_expire_time());
                    timers.push(dns_record.record.get_record().get_refresh_time());

                    let ty = dns_record.record.get_type();
                    let name = dns_record.record.get_name();

                    // Only process PTR that does not expire soon (i.e. TTL > 1).
                    if ty == RRType::PTR && dns_record.record.get_record().get_ttl() > 1 {
                        if self.service_queriers.contains_key(name) {
                            timers.push(dns_record.record.get_record().get_refresh_time());
                        }

                        // send ServiceFound
                        if let Some(dns_ptr) = dns_record.record.any().downcast_ref::<DnsPointer>()
                        {
                            debug!("calling listener with service found: {name}");
                            call_service_listener(
                                &self.service_queriers,
                                name,
                                ServiceEvent::ServiceFound(
                                    name.to_string(),
                                    dns_ptr.alias().to_string(),
                                ),
                            );
                            // For a PTR, the affected instance is the alias,
                            // not the record name.
                            changes.push(InstanceChange {
                                ty,
                                name: dns_ptr.alias().to_string(),
                            });
                        }
                    } else {
                        changes.push(InstanceChange {
                            ty,
                            name: name.to_string(),
                        });
                    }
                }
                // Flag is `false`: only the timers are scheduled, no change
                // is recorded.
                Some((dns_record, false)) => {
                    timers.push(dns_record.record.get_record().get_expire_time());
                    timers.push(dns_record.record.get_record().get_refresh_time());
                }
                _ => {}
            }
        }

        // Add timers for the new records.
        for t in timers {
            self.add_timer(t);
        }

        // Go through remaining changes to see if any hostname resolutions were found or updated.
        for change in changes
            .iter()
            .filter(|change| change.ty == RRType::A || change.ty == RRType::AAAA)
        {
            let addr_map = self.cache.get_addresses_for_host(&change.name);
            for (name, addresses) in addr_map {
                call_hostname_resolution_listener(
                    &self.hostname_resolvers,
                    &change.name,
                    HostnameResolutionEvent::AddressesFound(name, addresses),
                )
            }
        }

        // Identify the instances that need to be "resolved".
        let mut updated_instances = HashSet::new();
        for update in changes {
            match update.ty {
                // The change name is already the instance name.
                RRType::PTR | RRType::SRV | RRType::TXT => {
                    updated_instances.insert(update.name);
                }
                // An address change affects every instance on that host.
                RRType::A | RRType::AAAA => {
                    let instances = self.cache.get_instances_on_host(&update.name);
                    updated_instances.extend(instances);
                }
                _ => {}
            }
        }

        self.resolve_updated_instances(&updated_instances);
    }
2371
2372    fn conflict_handler(&mut self, msg: &DnsIncoming, if_index: u32) {
2373        let Some(my_intf) = self.my_intfs.get(&if_index) else {
2374            debug!("handle_response: no intf found for index {if_index}");
2375            return;
2376        };
2377
2378        let Some(dns_registry) = self.dns_registry_map.get_mut(&if_index) else {
2379            return;
2380        };
2381
2382        for answer in msg.answers().iter() {
2383            let mut new_records = Vec::new();
2384
2385            let name = answer.get_name();
2386            let Some(probe) = dns_registry.probing.get_mut(name) else {
2387                continue;
2388            };
2389
2390            // check against possible multicast forwarding
2391            if answer.get_type() == RRType::A || answer.get_type() == RRType::AAAA {
2392                if let Some(answer_addr) = answer.any().downcast_ref::<DnsAddress>() {
2393                    if answer_addr.interface_id.index != if_index {
2394                        debug!(
2395                            "conflict handler: answer addr {:?} not in the subnet of intf {}",
2396                            answer_addr, my_intf.name
2397                        );
2398                        continue;
2399                    }
2400                }
2401
2402                // double check if any other address record matches rrdata,
2403                // as there could be multiple addresses for the same name.
2404                let any_match = probe.records.iter().any(|r| {
2405                    r.get_type() == answer.get_type()
2406                        && r.get_class() == answer.get_class()
2407                        && r.rrdata_match(answer.as_ref())
2408                });
2409                if any_match {
2410                    continue; // no conflict for this answer.
2411                }
2412            }
2413
2414            probe.records.retain(|record| {
2415                if record.get_type() == answer.get_type()
2416                    && record.get_class() == answer.get_class()
2417                    && !record.rrdata_match(answer.as_ref())
2418                {
2419                    debug!(
2420                        "found conflict name: '{name}' record: {}: {} PEER: {}",
2421                        record.get_type(),
2422                        record.rdata_print(),
2423                        answer.rdata_print()
2424                    );
2425
2426                    // create a new name for this record
2427                    // then remove the old record in probing.
2428                    let mut new_record = record.clone();
2429                    let new_name = match record.get_type() {
2430                        RRType::A => hostname_change(name),
2431                        RRType::AAAA => hostname_change(name),
2432                        _ => name_change(name),
2433                    };
2434                    new_record.get_record_mut().set_new_name(new_name);
2435                    new_records.push(new_record);
2436                    return false; // old record is dropped from the probe.
2437                }
2438
2439                true
2440            });
2441
2442            // ?????
2443            // if probe.records.is_empty() {
2444            //     dns_registry.probing.remove(name);
2445            // }
2446
2447            // Probing again with the new names.
2448            let create_time = current_time_millis() + fastrand::u64(0..250);
2449
2450            let waiting_services = probe.waiting_services.clone();
2451
2452            for record in new_records {
2453                if dns_registry.update_hostname(name, record.get_name(), create_time) {
2454                    self.timers.push(Reverse(create_time));
2455                }
2456
2457                // remember the name changes (note: `name` might not be the original, it could be already changed once.)
2458                dns_registry.name_changes.insert(
2459                    record.get_record().get_original_name().to_string(),
2460                    record.get_name().to_string(),
2461                );
2462
2463                let new_probe = match dns_registry.probing.get_mut(record.get_name()) {
2464                    Some(p) => p,
2465                    None => {
2466                        let new_probe = dns_registry
2467                            .probing
2468                            .entry(record.get_name().to_string())
2469                            .or_insert_with(|| {
2470                                debug!("conflict handler: new probe of {}", record.get_name());
2471                                Probe::new(create_time)
2472                            });
2473                        self.timers.push(Reverse(new_probe.next_send));
2474                        new_probe
2475                    }
2476                };
2477
2478                debug!(
2479                    "insert record with new name '{}' {} into probe",
2480                    record.get_name(),
2481                    record.get_type()
2482                );
2483                new_probe.insert_record(record);
2484
2485                new_probe.waiting_services.extend(waiting_services.clone());
2486            }
2487        }
2488    }
2489
2490    /// Resolve the updated (including new) instances.
2491    ///
2492    /// Note: it is possible that more than 1 PTR pointing to the same
2493    /// instance. For example, a regular service type PTR and a sub-type
2494    /// service type PTR can both point to the same service instance.
2495    /// This loop automatically handles the sub-type PTRs.
2496    fn resolve_updated_instances(&mut self, updated_instances: &HashSet<String>) {
2497        let mut resolved: HashSet<String> = HashSet::new();
2498        let mut unresolved: HashSet<String> = HashSet::new();
2499        let mut removed_instances = HashMap::new();
2500
2501        let now = current_time_millis();
2502
2503        for (ty_domain, records) in self.cache.all_ptr().iter() {
2504            if !self.service_queriers.contains_key(ty_domain) {
2505                // No need to resolve if not in our queries.
2506                continue;
2507            }
2508
2509            for record in records.iter().filter(|r| !r.record.expires_soon(now)) {
2510                if let Some(dns_ptr) = record.record.any().downcast_ref::<DnsPointer>() {
2511                    if updated_instances.contains(dns_ptr.alias()) {
2512                        let mut instance_found = false;
2513                        let mut new_event = None;
2514
2515                        if let Ok(resolved) =
2516                            self.resolve_service_from_cache(ty_domain, dns_ptr.alias())
2517                        {
2518                            debug!("resolve_updated_instances: from cache: {}", dns_ptr.alias());
2519                            instance_found = true;
2520                            if resolved.is_valid() {
2521                                new_event = Some(ServiceEvent::ServiceResolved(Box::new(resolved)));
2522                            } else {
2523                                debug!("Resolved service is not valid: {}", dns_ptr.alias());
2524                            }
2525                        }
2526
2527                        if instance_found {
2528                            if let Some(event) = new_event {
2529                                debug!("call queriers to resolve {}", dns_ptr.alias());
2530                                resolved.insert(dns_ptr.alias().to_string());
2531                                call_service_listener(&self.service_queriers, ty_domain, event);
2532                            } else {
2533                                if self.resolved.remove(dns_ptr.alias()) {
2534                                    removed_instances
2535                                        .entry(ty_domain.to_string())
2536                                        .or_insert_with(HashSet::new)
2537                                        .insert(dns_ptr.alias().to_string());
2538                                }
2539                                unresolved.insert(dns_ptr.alias().to_string());
2540                            }
2541                        }
2542                    }
2543                }
2544            }
2545        }
2546
2547        for instance in resolved.drain() {
2548            self.pending_resolves.remove(&instance);
2549            self.resolved.insert(instance);
2550        }
2551
2552        for instance in unresolved.drain() {
2553            self.add_pending_resolve(instance);
2554        }
2555
2556        if !removed_instances.is_empty() {
2557            debug!(
2558                "resolve_updated_instances: removed {}",
2559                &removed_instances.len()
2560            );
2561            self.notify_service_removal(removed_instances);
2562        }
2563    }
2564
2565    /// Handle incoming query packets, figure out whether and what to respond.
2566    fn handle_query(&mut self, msg: DnsIncoming, if_index: u32, is_ipv4: bool) {
2567        let sock = if is_ipv4 {
2568            &self.ipv4_sock
2569        } else {
2570            &self.ipv6_sock
2571        };
2572        let mut out = DnsOutgoing::new(FLAGS_QR_RESPONSE | FLAGS_AA);
2573
2574        // Special meta-query "_services._dns-sd._udp.<Domain>".
2575        // See https://datatracker.ietf.org/doc/html/rfc6763#section-9
2576        const META_QUERY: &str = "_services._dns-sd._udp.local.";
2577
2578        let Some(dns_registry) = self.dns_registry_map.get_mut(&if_index) else {
2579            debug!("missing dns registry for intf {}", if_index);
2580            return;
2581        };
2582
2583        let Some(intf) = self.my_intfs.get(&if_index) else {
2584            return;
2585        };
2586
2587        for question in msg.questions().iter() {
2588            let qtype = question.entry_type();
2589
2590            if qtype == RRType::PTR {
2591                for service in self.my_services.values() {
2592                    if service.get_status(if_index) != ServiceStatus::Announced {
2593                        continue;
2594                    }
2595
2596                    if question.entry_name() == service.get_type()
2597                        || service
2598                            .get_subtype()
2599                            .as_ref()
2600                            .is_some_and(|v| v == question.entry_name())
2601                    {
2602                        add_answer_with_additionals(
2603                            &mut out,
2604                            &msg,
2605                            service,
2606                            intf,
2607                            dns_registry,
2608                            is_ipv4,
2609                        );
2610                    } else if question.entry_name() == META_QUERY {
2611                        let ptr_added = out.add_answer(
2612                            &msg,
2613                            DnsPointer::new(
2614                                question.entry_name(),
2615                                RRType::PTR,
2616                                CLASS_IN,
2617                                service.get_other_ttl(),
2618                                service.get_type().to_string(),
2619                            ),
2620                        );
2621                        if !ptr_added {
2622                            trace!("answer was not added for meta-query {:?}", &question);
2623                        }
2624                    }
2625                }
2626            } else {
2627                // Simultaneous Probe Tiebreaking (RFC 6762 section 8.2)
2628                if qtype == RRType::ANY && msg.num_authorities() > 0 {
2629                    let probe_name = question.entry_name();
2630
2631                    if let Some(probe) = dns_registry.probing.get_mut(probe_name) {
2632                        let now = current_time_millis();
2633
2634                        // Only do tiebreaking if probe already started.
2635                        // This check also helps avoid redo tiebreaking if start time
2636                        // was postponed.
2637                        if probe.start_time < now {
2638                            let incoming_records: Vec<_> = msg
2639                                .authorities()
2640                                .iter()
2641                                .filter(|r| r.get_name() == probe_name)
2642                                .collect();
2643
2644                            probe.tiebreaking(&incoming_records, now, probe_name);
2645                        }
2646                    }
2647                }
2648
2649                if qtype == RRType::A || qtype == RRType::AAAA || qtype == RRType::ANY {
2650                    for service in self.my_services.values() {
2651                        if service.get_status(if_index) != ServiceStatus::Announced {
2652                            continue;
2653                        }
2654
2655                        let service_hostname =
2656                            match dns_registry.name_changes.get(service.get_hostname()) {
2657                                Some(new_name) => new_name,
2658                                None => service.get_hostname(),
2659                            };
2660
2661                        if service_hostname.to_lowercase() == question.entry_name().to_lowercase() {
2662                            let intf_addrs = if is_ipv4 {
2663                                service.get_addrs_on_my_intf_v4(intf)
2664                            } else {
2665                                service.get_addrs_on_my_intf_v6(intf)
2666                            };
2667                            if intf_addrs.is_empty()
2668                                && (qtype == RRType::A || qtype == RRType::AAAA)
2669                            {
2670                                let t = match qtype {
2671                                    RRType::A => "TYPE_A",
2672                                    RRType::AAAA => "TYPE_AAAA",
2673                                    _ => "invalid_type",
2674                                };
2675                                trace!(
2676                                    "Cannot find valid addrs for {} response on intf {:?}",
2677                                    t,
2678                                    &intf
2679                                );
2680                                return;
2681                            }
2682                            for address in intf_addrs {
2683                                out.add_answer(
2684                                    &msg,
2685                                    DnsAddress::new(
2686                                        service_hostname,
2687                                        ip_address_rr_type(&address),
2688                                        CLASS_IN | CLASS_CACHE_FLUSH,
2689                                        service.get_host_ttl(),
2690                                        address,
2691                                        intf.into(),
2692                                    ),
2693                                );
2694                            }
2695                        }
2696                    }
2697                }
2698
2699                let query_name = question.entry_name().to_lowercase();
2700                let service_opt = self
2701                    .my_services
2702                    .iter()
2703                    .find(|(k, _v)| {
2704                        let service_name = match dns_registry.name_changes.get(k.as_str()) {
2705                            Some(new_name) => new_name,
2706                            None => k,
2707                        };
2708                        service_name == &query_name
2709                    })
2710                    .map(|(_, v)| v);
2711
2712                let Some(service) = service_opt else {
2713                    continue;
2714                };
2715
2716                if service.get_status(if_index) != ServiceStatus::Announced {
2717                    continue;
2718                }
2719
2720                let intf_addrs = if is_ipv4 {
2721                    service.get_addrs_on_my_intf_v4(intf)
2722                } else {
2723                    service.get_addrs_on_my_intf_v6(intf)
2724                };
2725                if intf_addrs.is_empty() {
2726                    debug!(
2727                        "Cannot find valid addrs for TYPE_SRV response on intf {:?}",
2728                        &intf
2729                    );
2730                    continue;
2731                }
2732
2733                add_answer_of_service(
2734                    &mut out,
2735                    &msg,
2736                    question.entry_name(),
2737                    service,
2738                    qtype,
2739                    intf_addrs,
2740                );
2741            }
2742        }
2743
2744        if !out.answers_count() > 0 {
2745            out.set_id(msg.id());
2746            send_dns_outgoing(&out, intf, &sock.pktinfo);
2747
2748            let if_name = intf.name.clone();
2749
2750            self.increase_counter(Counter::Respond, 1);
2751            self.notify_monitors(DaemonEvent::Respond(if_name));
2752        }
2753
2754        self.increase_counter(Counter::KnownAnswerSuppression, out.known_answer_count());
2755    }
2756
2757    /// Increases the value of `counter` by `count`.
2758    fn increase_counter(&mut self, counter: Counter, count: i64) {
2759        let key = counter.to_string();
2760        match self.counters.get_mut(&key) {
2761            Some(v) => *v += count,
2762            None => {
2763                self.counters.insert(key, count);
2764            }
2765        }
2766    }
2767
2768    /// Sets the value of `counter` to `count`.
2769    fn set_counter(&mut self, counter: Counter, count: i64) {
2770        let key = counter.to_string();
2771        self.counters.insert(key, count);
2772    }
2773
2774    fn signal_sock_drain(&self) {
2775        let mut signal_buf = [0; 1024];
2776
2777        // This recv is non-blocking as the socket is non-blocking.
2778        while let Ok(sz) = self.signal_sock.recv(&mut signal_buf) {
2779            trace!(
2780                "signal socket recvd: {}",
2781                String::from_utf8_lossy(&signal_buf[0..sz])
2782            );
2783        }
2784    }
2785
2786    fn add_retransmission(&mut self, next_time: u64, command: Command) {
2787        self.retransmissions.push(ReRun { next_time, command });
2788        self.add_timer(next_time);
2789    }
2790
2791    /// Sends service removal event to listeners for expired service records.
2792    /// `expired`: map of service type domain to set of instance names.
2793    fn notify_service_removal(&self, expired: HashMap<String, HashSet<String>>) {
2794        for (ty_domain, sender) in self.service_queriers.iter() {
2795            if let Some(instances) = expired.get(ty_domain) {
2796                for instance_name in instances {
2797                    let event = ServiceEvent::ServiceRemoved(
2798                        ty_domain.to_string(),
2799                        instance_name.to_string(),
2800                    );
2801                    match sender.send(event) {
2802                        Ok(()) => debug!("notify_service_removal: sent ServiceRemoved to listener of {ty_domain}: {instance_name}"),
2803                        Err(e) => debug!("Failed to send event: {}", e),
2804                    }
2805                }
2806            }
2807        }
2808    }
2809
2810    /// The entry point that executes all commands received by the daemon.
2811    ///
2812    /// `repeating`: whether this is a retransmission.
2813    fn exec_command(&mut self, command: Command, repeating: bool) {
2814        trace!("exec_command: {:?} repeating: {}", &command, repeating);
2815        match command {
2816            Command::Browse(ty, next_delay, listener) => {
2817                self.exec_command_browse(repeating, ty, next_delay, listener);
2818            }
2819
2820            Command::ResolveHostname(hostname, next_delay, listener, timeout) => {
2821                self.exec_command_resolve_hostname(
2822                    repeating, hostname, next_delay, listener, timeout,
2823                );
2824            }
2825
2826            Command::Register(service_info) => {
2827                self.register_service(service_info);
2828                self.increase_counter(Counter::Register, 1);
2829            }
2830
2831            Command::RegisterResend(fullname, intf) => {
2832                trace!("register-resend service: {fullname} on {}", &intf);
2833                self.exec_command_register_resend(fullname, intf);
2834            }
2835
2836            Command::Unregister(fullname, resp_s) => {
2837                trace!("unregister service {} repeat {}", &fullname, &repeating);
2838                self.exec_command_unregister(repeating, fullname, resp_s);
2839            }
2840
2841            Command::UnregisterResend(packet, if_index, is_ipv4) => {
2842                self.exec_command_unregister_resend(packet, if_index, is_ipv4);
2843            }
2844
2845            Command::StopBrowse(ty_domain) => self.exec_command_stop_browse(ty_domain),
2846
2847            Command::StopResolveHostname(hostname) => {
2848                self.exec_command_stop_resolve_hostname(hostname.to_lowercase())
2849            }
2850
2851            Command::Resolve(instance, try_count) => self.exec_command_resolve(instance, try_count),
2852
2853            Command::GetMetrics(resp_s) => self.exec_command_get_metrics(resp_s),
2854
2855            Command::GetStatus(resp_s) => match resp_s.send(self.status.clone()) {
2856                Ok(()) => trace!("Sent status to the client"),
2857                Err(e) => debug!("Failed to send status: {}", e),
2858            },
2859
2860            Command::Monitor(resp_s) => {
2861                self.monitors.push(resp_s);
2862            }
2863
2864            Command::SetOption(daemon_opt) => {
2865                self.process_set_option(daemon_opt);
2866            }
2867
2868            Command::GetOption(resp_s) => {
2869                let val = DaemonOptionVal {
2870                    _service_name_len_max: self.service_name_len_max,
2871                    ip_check_interval: self.ip_check_interval,
2872                };
2873                if let Err(e) = resp_s.send(val) {
2874                    debug!("Failed to send options: {}", e);
2875                }
2876            }
2877
2878            Command::Verify(instance_fullname, timeout) => {
2879                self.exec_command_verify(instance_fullname, timeout, repeating);
2880            }
2881
2882            _ => {
2883                debug!("unexpected command: {:?}", &command);
2884            }
2885        }
2886    }
2887
2888    fn exec_command_get_metrics(&mut self, resp_s: Sender<HashMap<String, i64>>) {
2889        self.set_counter(Counter::CachedPTR, self.cache.ptr_count() as i64);
2890        self.set_counter(Counter::CachedSRV, self.cache.srv_count() as i64);
2891        self.set_counter(Counter::CachedAddr, self.cache.addr_count() as i64);
2892        self.set_counter(Counter::CachedTxt, self.cache.txt_count() as i64);
2893        self.set_counter(Counter::CachedNSec, self.cache.nsec_count() as i64);
2894        self.set_counter(Counter::CachedSubtype, self.cache.subtype_count() as i64);
2895        self.set_counter(Counter::Timer, self.timers.len() as i64);
2896
2897        let dns_registry_probe_count: usize = self
2898            .dns_registry_map
2899            .values()
2900            .map(|r| r.probing.len())
2901            .sum();
2902        self.set_counter(Counter::DnsRegistryProbe, dns_registry_probe_count as i64);
2903
2904        let dns_registry_active_count: usize = self
2905            .dns_registry_map
2906            .values()
2907            .map(|r| r.active.values().map(|a| a.len()).sum::<usize>())
2908            .sum();
2909        self.set_counter(Counter::DnsRegistryActive, dns_registry_active_count as i64);
2910
2911        let dns_registry_timer_count: usize = self
2912            .dns_registry_map
2913            .values()
2914            .map(|r| r.new_timers.len())
2915            .sum();
2916        self.set_counter(Counter::DnsRegistryTimer, dns_registry_timer_count as i64);
2917
2918        let dns_registry_name_change_count: usize = self
2919            .dns_registry_map
2920            .values()
2921            .map(|r| r.name_changes.len())
2922            .sum();
2923        self.set_counter(
2924            Counter::DnsRegistryNameChange,
2925            dns_registry_name_change_count as i64,
2926        );
2927
2928        // Send the metrics to the client.
2929        if let Err(e) = resp_s.send(self.counters.clone()) {
2930            debug!("Failed to send metrics: {}", e);
2931        }
2932    }
2933
2934    fn exec_command_browse(
2935        &mut self,
2936        repeating: bool,
2937        ty: String,
2938        next_delay: u32,
2939        listener: Sender<ServiceEvent>,
2940    ) {
2941        let pretty_addrs: Vec<String> = self
2942            .my_intfs
2943            .iter()
2944            .map(|(if_index, itf)| format!("{} ({if_index})", itf.name))
2945            .collect();
2946
2947        if let Err(e) = listener.send(ServiceEvent::SearchStarted(format!(
2948            "{ty} on {} interfaces [{}]",
2949            pretty_addrs.len(),
2950            pretty_addrs.join(", ")
2951        ))) {
2952            debug!(
2953                "Failed to send SearchStarted({})(repeating:{}): {}",
2954                &ty, repeating, e
2955            );
2956            return;
2957        }
2958
2959        let now = current_time_millis();
2960        if !repeating {
2961            // Binds a `listener` to querying mDNS domain type `ty`.
2962            //
2963            // If there is already a `listener`, it will be updated, i.e. overwritten.
2964            self.service_queriers.insert(ty.clone(), listener.clone());
2965
2966            // if we already have the records in our cache, just send them
2967            self.query_cache_for_service(&ty, &listener, now);
2968        }
2969
2970        self.send_query(&ty, RRType::PTR);
2971        self.increase_counter(Counter::Browse, 1);
2972
2973        let next_time = now + (next_delay * 1000) as u64;
2974        let max_delay = 60 * 60;
2975        let delay = cmp::min(next_delay * 2, max_delay);
2976        self.add_retransmission(next_time, Command::Browse(ty, delay, listener));
2977    }
2978
2979    fn exec_command_resolve_hostname(
2980        &mut self,
2981        repeating: bool,
2982        hostname: String,
2983        next_delay: u32,
2984        listener: Sender<HostnameResolutionEvent>,
2985        timeout: Option<u64>,
2986    ) {
2987        let addr_list: Vec<_> = self.my_intfs.iter().collect();
2988        if let Err(e) = listener.send(HostnameResolutionEvent::SearchStarted(format!(
2989            "{} on addrs {:?}",
2990            &hostname, &addr_list
2991        ))) {
2992            debug!(
2993                "Failed to send ResolveStarted({})(repeating:{}): {}",
2994                &hostname, repeating, e
2995            );
2996            return;
2997        }
2998        if !repeating {
2999            self.add_hostname_resolver(hostname.to_owned(), listener.clone(), timeout);
3000            // if we already have the records in our cache, just send them
3001            self.query_cache_for_hostname(&hostname, listener.clone());
3002        }
3003
3004        self.send_query_vec(&[(&hostname, RRType::A), (&hostname, RRType::AAAA)]);
3005        self.increase_counter(Counter::ResolveHostname, 1);
3006
3007        let now = current_time_millis();
3008        let next_time = now + u64::from(next_delay) * 1000;
3009        let max_delay = 60 * 60;
3010        let delay = cmp::min(next_delay * 2, max_delay);
3011
3012        // Only add retransmission if it does not exceed the hostname resolver timeout, if any.
3013        if self
3014            .hostname_resolvers
3015            .get(&hostname)
3016            .and_then(|(_sender, timeout)| *timeout)
3017            .map(|timeout| next_time < timeout)
3018            .unwrap_or(true)
3019        {
3020            self.add_retransmission(
3021                next_time,
3022                Command::ResolveHostname(hostname, delay, listener, None),
3023            );
3024        }
3025    }
3026
3027    fn exec_command_resolve(&mut self, instance: String, try_count: u16) {
3028        let pending_query = self.query_unresolved(&instance);
3029        let max_try = 3;
3030        if pending_query && try_count < max_try {
3031            // Note that if the current try already succeeds, the next retransmission
3032            // will be no-op as the cache has been updated.
3033            let next_time = current_time_millis() + RESOLVE_WAIT_IN_MILLIS;
3034            self.add_retransmission(next_time, Command::Resolve(instance, try_count + 1));
3035        }
3036    }
3037
3038    fn exec_command_unregister(
3039        &mut self,
3040        repeating: bool,
3041        fullname: String,
3042        resp_s: Sender<UnregisterStatus>,
3043    ) {
3044        let response = match self.my_services.remove_entry(&fullname) {
3045            None => {
3046                debug!("unregister: cannot find such service {}", &fullname);
3047                UnregisterStatus::NotFound
3048            }
3049            Some((_k, info)) => {
3050                let mut timers = Vec::new();
3051
3052                for (if_index, intf) in self.my_intfs.iter() {
3053                    let packet = self.unregister_service(&info, intf, &self.ipv4_sock.pktinfo);
3054                    // repeat for one time just in case some peers miss the message
3055                    if !repeating && !packet.is_empty() {
3056                        let next_time = current_time_millis() + 120;
3057                        self.retransmissions.push(ReRun {
3058                            next_time,
3059                            command: Command::UnregisterResend(packet, *if_index, true),
3060                        });
3061                        timers.push(next_time);
3062                    }
3063
3064                    // ipv6
3065
3066                    let packet = self.unregister_service(&info, intf, &self.ipv6_sock.pktinfo);
3067                    if !repeating && !packet.is_empty() {
3068                        let next_time = current_time_millis() + 120;
3069                        self.retransmissions.push(ReRun {
3070                            next_time,
3071                            command: Command::UnregisterResend(packet, *if_index, false),
3072                        });
3073                        timers.push(next_time);
3074                    }
3075                }
3076
3077                for t in timers {
3078                    self.add_timer(t);
3079                }
3080
3081                self.increase_counter(Counter::Unregister, 1);
3082                UnregisterStatus::OK
3083            }
3084        };
3085        if let Err(e) = resp_s.send(response) {
3086            debug!("unregister: failed to send response: {}", e);
3087        }
3088    }
3089
3090    fn exec_command_unregister_resend(&mut self, packet: Vec<u8>, if_index: u32, is_ipv4: bool) {
3091        let Some(intf) = self.my_intfs.get(&if_index) else {
3092            return;
3093        };
3094        let sock = if is_ipv4 {
3095            &self.ipv4_sock.pktinfo
3096        } else {
3097            &self.ipv6_sock.pktinfo
3098        };
3099
3100        let if_addr = if is_ipv4 {
3101            match intf.next_ifaddr_v4() {
3102                Some(addr) => addr,
3103                None => return,
3104            }
3105        } else {
3106            match intf.next_ifaddr_v6() {
3107                Some(addr) => addr,
3108                None => return,
3109            }
3110        };
3111
3112        debug!("UnregisterResend from {:?}", if_addr);
3113        multicast_on_intf(&packet[..], &intf.name, intf.index, if_addr, sock);
3114
3115        self.increase_counter(Counter::UnregisterResend, 1);
3116    }
3117
3118    fn exec_command_stop_browse(&mut self, ty_domain: String) {
3119        match self.service_queriers.remove_entry(&ty_domain) {
3120            None => debug!("StopBrowse: cannot find querier for {}", &ty_domain),
3121            Some((ty, sender)) => {
3122                // Remove pending browse commands in the reruns.
3123                trace!("StopBrowse: removed queryer for {}", &ty);
3124                let mut i = 0;
3125                while i < self.retransmissions.len() {
3126                    if let Command::Browse(t, _, _) = &self.retransmissions[i].command {
3127                        if t == &ty {
3128                            self.retransmissions.remove(i);
3129                            trace!("StopBrowse: removed retransmission for {}", &ty);
3130                            continue;
3131                        }
3132                    }
3133                    i += 1;
3134                }
3135
3136                // Remove cache entries.
3137                self.cache.remove_service_type(&ty_domain);
3138
3139                // Notify the client.
3140                match sender.send(ServiceEvent::SearchStopped(ty_domain)) {
3141                    Ok(()) => trace!("Sent SearchStopped to the listener"),
3142                    Err(e) => debug!("Failed to send SearchStopped: {}", e),
3143                }
3144            }
3145        }
3146    }
3147
3148    fn exec_command_stop_resolve_hostname(&mut self, hostname: String) {
3149        if let Some((host, (sender, _timeout))) = self.hostname_resolvers.remove_entry(&hostname) {
3150            // Remove pending resolve commands in the reruns.
3151            trace!("StopResolve: removed queryer for {}", &host);
3152            let mut i = 0;
3153            while i < self.retransmissions.len() {
3154                if let Command::Resolve(t, _) = &self.retransmissions[i].command {
3155                    if t == &host {
3156                        self.retransmissions.remove(i);
3157                        trace!("StopResolve: removed retransmission for {}", &host);
3158                        continue;
3159                    }
3160                }
3161                i += 1;
3162            }
3163
3164            // Notify the client.
3165            match sender.send(HostnameResolutionEvent::SearchStopped(hostname)) {
3166                Ok(()) => trace!("Sent SearchStopped to the listener"),
3167                Err(e) => debug!("Failed to send SearchStopped: {}", e),
3168            }
3169        }
3170    }
3171
3172    fn exec_command_register_resend(&mut self, fullname: String, if_index: u32) {
3173        let Some(info) = self.my_services.get_mut(&fullname) else {
3174            trace!("announce: cannot find such service {}", &fullname);
3175            return;
3176        };
3177
3178        let Some(dns_registry) = self.dns_registry_map.get_mut(&if_index) else {
3179            return;
3180        };
3181
3182        let Some(intf) = self.my_intfs.get(&if_index) else {
3183            return;
3184        };
3185
3186        let announced_v4 =
3187            announce_service_on_intf(dns_registry, info, intf, &self.ipv4_sock.pktinfo);
3188        let announced_v6 =
3189            announce_service_on_intf(dns_registry, info, intf, &self.ipv6_sock.pktinfo);
3190
3191        if announced_v4 || announced_v6 {
3192            let mut hostname = info.get_hostname();
3193            if let Some(new_name) = dns_registry.name_changes.get(hostname) {
3194                hostname = new_name;
3195            }
3196            let service_name = match dns_registry.name_changes.get(&fullname) {
3197                Some(new_name) => new_name.to_string(),
3198                None => fullname,
3199            };
3200
3201            debug!("resend: announce service {service_name} on {}", intf.name);
3202
3203            notify_monitors(
3204                &mut self.monitors,
3205                DaemonEvent::Announce(service_name, format!("{}:{}", hostname, &intf.name)),
3206            );
3207            info.set_status(if_index, ServiceStatus::Announced);
3208        } else {
3209            debug!("register-resend should not fail");
3210        }
3211
3212        self.increase_counter(Counter::RegisterResend, 1);
3213    }
3214
3215    fn exec_command_verify(&mut self, instance: String, timeout: Duration, repeating: bool) {
3216        /*
3217        RFC 6762 section 10.4:
3218        ...
3219        When the cache receives this hint that it should reconfirm some
3220        record, it MUST issue two or more queries for the resource record in
3221        dispute.  If no response is received within ten seconds, then, even
3222        though its TTL may indicate that it is not yet due to expire, that
3223        record SHOULD be promptly flushed from the cache.
3224        */
3225        let now = current_time_millis();
3226        let expire_at = if repeating {
3227            None
3228        } else {
3229            Some(now + timeout.as_millis() as u64)
3230        };
3231
3232        // send query for the resource records.
3233        let record_vec = self.cache.service_verify_queries(&instance, expire_at);
3234
3235        if !record_vec.is_empty() {
3236            let query_vec: Vec<(&str, RRType)> = record_vec
3237                .iter()
3238                .map(|(record, rr_type)| (record.as_str(), *rr_type))
3239                .collect();
3240            self.send_query_vec(&query_vec);
3241
3242            if let Some(new_expire) = expire_at {
3243                self.add_timer(new_expire); // ensure a check for the new expire time.
3244
3245                // schedule a resend 1 second later
3246                self.add_retransmission(now + 1000, Command::Verify(instance, timeout));
3247            }
3248        }
3249    }
3250
3251    /// Refresh cached service records with active queriers
3252    fn refresh_active_services(&mut self) {
3253        let mut query_ptr_count = 0;
3254        let mut query_srv_count = 0;
3255        let mut new_timers = HashSet::new();
3256        let mut query_addr_count = 0;
3257
3258        for (ty_domain, _sender) in self.service_queriers.iter() {
3259            let refreshed_timers = self.cache.refresh_due_ptr(ty_domain);
3260            if !refreshed_timers.is_empty() {
3261                trace!("sending refresh query for PTR: {}", ty_domain);
3262                self.send_query(ty_domain, RRType::PTR);
3263                query_ptr_count += 1;
3264                new_timers.extend(refreshed_timers);
3265            }
3266
3267            let (instances, timers) = self.cache.refresh_due_srv_txt(ty_domain);
3268            for (instance, types) in instances {
3269                trace!("sending refresh query for: {}", &instance);
3270                let query_vec = types
3271                    .into_iter()
3272                    .map(|ty| (instance.as_str(), ty))
3273                    .collect::<Vec<_>>();
3274                self.send_query_vec(&query_vec);
3275                query_srv_count += 1;
3276            }
3277            new_timers.extend(timers);
3278            let (hostnames, timers) = self.cache.refresh_due_hosts(ty_domain);
3279            for hostname in hostnames.iter() {
3280                trace!("sending refresh queries for A and AAAA:  {}", hostname);
3281                self.send_query_vec(&[(hostname, RRType::A), (hostname, RRType::AAAA)]);
3282                query_addr_count += 2;
3283            }
3284            new_timers.extend(timers);
3285        }
3286
3287        for timer in new_timers {
3288            self.add_timer(timer);
3289        }
3290
3291        self.increase_counter(Counter::CacheRefreshPTR, query_ptr_count);
3292        self.increase_counter(Counter::CacheRefreshSrvTxt, query_srv_count);
3293        self.increase_counter(Counter::CacheRefreshAddr, query_addr_count);
3294    }
3295}
3296
3297/// Adds one or more answers of a service for incoming msg and RR entry name.
3298fn add_answer_of_service(
3299    out: &mut DnsOutgoing,
3300    msg: &DnsIncoming,
3301    entry_name: &str,
3302    service: &ServiceInfo,
3303    qtype: RRType,
3304    intf_addrs: Vec<IpAddr>,
3305) {
3306    if qtype == RRType::SRV || qtype == RRType::ANY {
3307        out.add_answer(
3308            msg,
3309            DnsSrv::new(
3310                entry_name,
3311                CLASS_IN | CLASS_CACHE_FLUSH,
3312                service.get_host_ttl(),
3313                service.get_priority(),
3314                service.get_weight(),
3315                service.get_port(),
3316                service.get_hostname().to_string(),
3317            ),
3318        );
3319    }
3320
3321    if qtype == RRType::TXT || qtype == RRType::ANY {
3322        out.add_answer(
3323            msg,
3324            DnsTxt::new(
3325                entry_name,
3326                CLASS_IN | CLASS_CACHE_FLUSH,
3327                service.get_other_ttl(),
3328                service.generate_txt(),
3329            ),
3330        );
3331    }
3332
3333    if qtype == RRType::SRV {
3334        for address in intf_addrs {
3335            out.add_additional_answer(DnsAddress::new(
3336                service.get_hostname(),
3337                ip_address_rr_type(&address),
3338                CLASS_IN | CLASS_CACHE_FLUSH,
3339                service.get_host_ttl(),
3340                address,
3341                InterfaceId::default(),
3342            ));
3343        }
3344    }
3345}
3346
/// All possible events sent to the client from the daemon
/// regarding service discovery.
///
/// The enum is `#[non_exhaustive]`: matches outside this crate need a
/// wildcard arm.
#[derive(Clone, Debug)]
#[non_exhaustive]
pub enum ServiceEvent {
    /// Started searching for a service type.
    SearchStarted(String),

    /// Found a specific (service_type, fullname).
    ServiceFound(String, String),

    /// Resolved a service instance in a ResolvedService struct.
    ServiceResolved(Box<ResolvedService>),

    /// A service instance (service_type, fullname) was removed.
    ServiceRemoved(String, String),

    /// Stopped searching for a service type.
    ///
    /// Carries the `ty_domain` that was being browsed.
    SearchStopped(String),
}
3367
/// All possible events sent to the client from the daemon
/// regarding host resolution.
///
/// The enum is `#[non_exhaustive]`: matches outside this crate need a
/// wildcard arm.
#[derive(Clone, Debug)]
#[non_exhaustive]
pub enum HostnameResolutionEvent {
    /// Started searching for the IP addresses of a hostname.
    SearchStarted(String),
    /// One or more addresses for a hostname have been found: (hostname, addresses).
    AddressesFound(String, HashSet<ScopedIp>),
    /// One or more addresses for a hostname have been removed: (hostname, addresses).
    AddressesRemoved(String, HashSet<ScopedIp>),
    /// The search for the IP addresses of a hostname has timed out.
    SearchTimeout(String),
    /// Stopped searching for the IP addresses of a hostname.
    ///
    /// Carries the hostname given in the stop request.
    SearchStopped(String),
}
3384
/// Some notable events from the daemon besides [`ServiceEvent`].
/// These events are expected to happen infrequently.
///
/// They are delivered to the channels registered as monitors.
#[derive(Clone, Debug)]
#[non_exhaustive]
pub enum DaemonEvent {
    /// Daemon announced a service from an interface without being solicited.
    ///
    /// Fields: the service name, and a string of the form
    /// `<hostname>:<interface name>`.
    Announce(String, String),

    /// Daemon encountered an error.
    Error(Error),

    /// Daemon detected a new IP address from the host.
    IpAdd(IpAddr),

    /// Daemon detected an IP address removed from the host.
    IpDel(IpAddr),

    /// Daemon resolved a name conflict by changing one of its names.
    /// see [DnsNameChange] for more details.
    NameChange(DnsNameChange),

    /// Send out a multicast response via an interface.
    Respond(String),
}
3409
/// Represents a name change due to a name conflict resolution.
/// See [RFC 6762 section 9](https://datatracker.ietf.org/doc/html/rfc6762#section-9)
///
/// Delivered to monitors inside [`DaemonEvent::NameChange`].
#[derive(Clone, Debug)]
pub struct DnsNameChange {
    /// The original name set in `ServiceInfo` by the user.
    pub original: String,

    /// A new name is created by appending a suffix after the original name.
    ///
    /// - for a service instance name, the suffix is ` (N)` (a space, then
    ///   the number in parentheses), where N starts at 2.
    /// - for a host name, the suffix is `-N`, where N starts at 2.
    ///
    /// The suffix is appended to the first label of the name. For example:
    ///
    /// - Service name `foo._service-type._udp` becomes `foo (2)._service-type._udp`
    /// - Host name `foo.local.` becomes `foo-2.local.`
    pub new_name: String,

    /// The resource record type
    pub rr_type: RRType,

    /// The interface where the name conflict and its change happened.
    pub intf_name: String,
}
3434
/// Commands supported by the daemon
///
/// Besides being executed directly, a command can be queued for a later
/// re-run (see the `retransmissions` handling in the `exec_command_*` methods).
#[derive(Debug)]
enum Command {
    /// Browsing for a service type (ty_domain, next_time_delay_in_seconds, channel::sender)
    Browse(String, u32, Sender<ServiceEvent>),

    /// Resolve a hostname to IP addresses.
    ///
    /// Fields: (hostname, next_time_delay_in_seconds, sender, timeout_in_milliseconds)
    ResolveHostname(String, u32, Sender<HostnameResolutionEvent>, Option<u64>),

    /// Register a service
    Register(ServiceInfo),

    /// Unregister a service
    ///
    /// Fields: (fullname, sender that receives the resulting status)
    Unregister(String, Sender<UnregisterStatus>),

    /// Announce again a service to local network
    ///
    /// Fields: (fullname, if_index)
    RegisterResend(String, u32),

    /// Resend unregister packet.
    UnregisterResend(Vec<u8>, u32, bool), // (packet content, if_index, is_ipv4)

    /// Stop browsing a service type
    StopBrowse(String), // (ty_domain)

    /// Stop resolving a hostname
    StopResolveHostname(String), // (hostname)

    /// Send query to resolve a service instance.
    /// This is used when a PTR record exists but SRV & TXT records are missing.
    Resolve(String, u16), // (service_instance_fullname, try_count)

    /// Read the current values of the counters
    GetMetrics(Sender<Metrics>),

    /// Get the current status of the daemon.
    GetStatus(Sender<DaemonStatus>),

    /// Monitor noticeable events in the daemon.
    Monitor(Sender<DaemonEvent>),

    /// Set a daemon option; see [`DaemonOption`] for the available options.
    SetOption(DaemonOption),

    /// Read daemon option values; the result is sent as a [`DaemonOptionVal`].
    GetOption(Sender<DaemonOptionVal>),

    /// Proactively confirm a DNS resource record.
    ///
    /// The intention is to check if a service name or IP address still valid
    /// before its TTL expires.
    ///
    /// Fields: (service instance name, timeout)
    Verify(String, Duration),

    /// Ask the daemon to exit.
    // NOTE(review): presumably the final status is reported through the
    // sender before the daemon stops; confirm against the handler.
    Exit(Sender<DaemonStatus>),
}
3487
3488impl fmt::Display for Command {
3489    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
3490        match self {
3491            Self::Browse(_, _, _) => write!(f, "Command Browse"),
3492            Self::ResolveHostname(_, _, _, _) => write!(f, "Command ResolveHostname"),
3493            Self::Exit(_) => write!(f, "Command Exit"),
3494            Self::GetStatus(_) => write!(f, "Command GetStatus"),
3495            Self::GetMetrics(_) => write!(f, "Command GetMetrics"),
3496            Self::Monitor(_) => write!(f, "Command Monitor"),
3497            Self::Register(_) => write!(f, "Command Register"),
3498            Self::RegisterResend(_, _) => write!(f, "Command RegisterResend"),
3499            Self::SetOption(_) => write!(f, "Command SetOption"),
3500            Self::GetOption(_) => write!(f, "Command GetOption"),
3501            Self::StopBrowse(_) => write!(f, "Command StopBrowse"),
3502            Self::StopResolveHostname(_) => write!(f, "Command StopResolveHostname"),
3503            Self::Unregister(_, _) => write!(f, "Command Unregister"),
3504            Self::UnregisterResend(_, _, _) => write!(f, "Command UnregisterResend"),
3505            Self::Resolve(_, _) => write!(f, "Command Resolve"),
3506            Self::Verify(_, _) => write!(f, "Command VerifyResource"),
3507        }
3508    }
3509}
3510
/// Daemon option values, sent back in reply to `Command::GetOption`.
struct DaemonOptionVal {
    // NOTE(review): the leading underscore suggests this field is currently
    // not read anywhere; presumably it mirrors `DaemonOption::ServiceNameLenMax`.
    _service_name_len_max: u8,
    // NOTE(review): presumably mirrors `DaemonOption::IpCheckInterval`; the
    // time unit is not visible here, confirm before relying on it.
    ip_check_interval: u64,
}
3515
/// Options that can be changed at runtime via `Command::SetOption`.
// NOTE(review): the per-variant meanings are not visible in this part of the
// file; the names suggest: a limit on the service name length, the interval of
// the IP address check, enabling/disabling sets of interfaces, and toggling
// multicast loopback per IP version. Confirm against the option handler.
#[derive(Debug)]
enum DaemonOption {
    ServiceNameLenMax(u8),
    IpCheckInterval(u64),
    EnableInterface(Vec<IfKind>),
    DisableInterface(Vec<IfKind>),
    MulticastLoopV4(bool),
    MulticastLoopV6(bool),
    // The two variants below exist in test builds only.
    #[cfg(test)]
    TestDownInterface(String),
    #[cfg(test)]
    TestUpInterface(String),
}
3529
/// The byte length of the domain suffix supported in this lib.
///
/// Computed from `"._tcp.local."`; the other accepted suffix,
/// `"._udp.local."`, has the same length, so this constant can be used to
/// cut either suffix off a name.
const DOMAIN_LEN: usize = "._tcp.local.".len();
3532
3533/// Validate the length of "service_name" in a "_<service_name>.<domain_name>." string.
3534fn check_service_name_length(ty_domain: &str, limit: u8) -> Result<()> {
3535    if ty_domain.len() <= DOMAIN_LEN + 1 {
3536        // service name cannot be empty or only '_'.
3537        return Err(e_fmt!("Service type name cannot be empty: {}", ty_domain));
3538    }
3539
3540    let service_name_len = ty_domain.len() - DOMAIN_LEN - 1; // exclude the leading `_`
3541    if service_name_len > limit as usize {
3542        return Err(e_fmt!("Service name length must be <= {} bytes", limit));
3543    }
3544    Ok(())
3545}
3546
3547/// Checks if `name` ends with a valid domain: '._tcp.local.' or '._udp.local.'
3548fn check_domain_suffix(name: &str) -> Result<()> {
3549    if !(name.ends_with("._tcp.local.") || name.ends_with("._udp.local.")) {
3550        return Err(e_fmt!(
3551            "mDNS service {} must end with '._tcp.local.' or '._udp.local.'",
3552            name
3553        ));
3554    }
3555
3556    Ok(())
3557}
3558
3559/// Validate the service name in a fully qualified name.
3560///
3561/// A Full Name = <Instance>.<Service>.<Domain>
3562/// The only `<Domain>` supported are "._tcp.local." and "._udp.local.".
3563///
3564/// Note: this function does not check for the length of the service name.
3565/// Instead, `register_service` method will check the length.
3566fn check_service_name(fullname: &str) -> Result<()> {
3567    check_domain_suffix(fullname)?;
3568
3569    let remaining: Vec<&str> = fullname[..fullname.len() - DOMAIN_LEN].split('.').collect();
3570    let name = remaining.last().ok_or_else(|| e_fmt!("No service name"))?;
3571
3572    if &name[0..1] != "_" {
3573        return Err(e_fmt!("Service name must start with '_'"));
3574    }
3575
3576    let name = &name[1..];
3577
3578    if name.contains("--") {
3579        return Err(e_fmt!("Service name must not contain '--'"));
3580    }
3581
3582    if name.starts_with('-') || name.ends_with('-') {
3583        return Err(e_fmt!("Service name (%s) may not start or end with '-'"));
3584    }
3585
3586    let ascii_count = name.chars().filter(|c| c.is_ascii_alphabetic()).count();
3587    if ascii_count < 1 {
3588        return Err(e_fmt!(
3589            "Service name must contain at least one letter (eg: 'A-Za-z')"
3590        ));
3591    }
3592
3593    Ok(())
3594}
3595
3596/// Validate a hostname.
3597fn check_hostname(hostname: &str) -> Result<()> {
3598    if !hostname.ends_with(".local.") {
3599        return Err(e_fmt!("Hostname must end with '.local.': {hostname}"));
3600    }
3601
3602    if hostname == ".local." {
3603        return Err(e_fmt!(
3604            "The part of the hostname before '.local.' cannot be empty"
3605        ));
3606    }
3607
3608    if hostname.len() > 255 {
3609        return Err(e_fmt!("Hostname length must be <= 255 bytes"));
3610    }
3611
3612    Ok(())
3613}
3614
3615fn call_service_listener(
3616    listeners_map: &HashMap<String, Sender<ServiceEvent>>,
3617    ty_domain: &str,
3618    event: ServiceEvent,
3619) {
3620    if let Some(listener) = listeners_map.get(ty_domain) {
3621        match listener.send(event) {
3622            Ok(()) => trace!("Sent event to listener successfully"),
3623            Err(e) => debug!("Failed to send event: {}", e),
3624        }
3625    }
3626}
3627
3628fn call_hostname_resolution_listener(
3629    listeners_map: &HashMap<String, (Sender<HostnameResolutionEvent>, Option<u64>)>,
3630    hostname: &str,
3631    event: HostnameResolutionEvent,
3632) {
3633    let hostname_lower = hostname.to_lowercase();
3634    if let Some(listener) = listeners_map.get(&hostname_lower).map(|(l, _)| l) {
3635        match listener.send(event) {
3636            Ok(()) => trace!("Sent event to listener successfully"),
3637            Err(e) => debug!("Failed to send event: {}", e),
3638        }
3639    }
3640}
3641
3642/// Returns valid network interfaces in the host system.
3643/// Operational down interfaces are excluded.
3644/// Loopback interfaces are excluded if `with_loopback` is false.
3645fn my_ip_interfaces(with_loopback: bool) -> Vec<Interface> {
3646    if_addrs::get_if_addrs()
3647        .unwrap_or_default()
3648        .into_iter()
3649        .filter(|i| i.is_oper_up() && (!i.is_loopback() || with_loopback))
3650        .collect()
3651}
3652
3653fn send_dns_outgoing(out: &DnsOutgoing, my_intf: &MyIntf, sock: &PktInfoUdpSocket) -> Vec<Vec<u8>> {
3654    let if_name = &my_intf.name;
3655
3656    let if_addr = if sock.domain() == Domain::IPV4 {
3657        match my_intf.next_ifaddr_v4() {
3658            Some(addr) => addr,
3659            None => return vec![],
3660        }
3661    } else {
3662        match my_intf.next_ifaddr_v6() {
3663            Some(addr) => addr,
3664            None => return vec![],
3665        }
3666    };
3667
3668    send_dns_outgoing_impl(out, if_name, my_intf.index, if_addr, sock)
3669}
3670
3671/// Send an outgoing mDNS query or response, and returns the packet bytes.
3672fn send_dns_outgoing_impl(
3673    out: &DnsOutgoing,
3674    if_name: &str,
3675    if_index: u32,
3676    if_addr: &IfAddr,
3677    sock: &PktInfoUdpSocket,
3678) -> Vec<Vec<u8>> {
3679    let qtype = if out.is_query() {
3680        "query"
3681    } else {
3682        if out.answers_count() == 0 && out.additionals().is_empty() {
3683            return vec![]; // no need to send empty response
3684        }
3685        "response"
3686    };
3687    trace!(
3688        "send {}: {} questions {} answers {} authorities {} additional",
3689        qtype,
3690        out.questions().len(),
3691        out.answers_count(),
3692        out.authorities().len(),
3693        out.additionals().len()
3694    );
3695
3696    match if_addr.ip() {
3697        IpAddr::V4(ipv4) => {
3698            if let Err(e) = sock.set_multicast_if_v4(&ipv4) {
3699                debug!(
3700                    "send_dns_outgoing: failed to set multicast interface for IPv4 {}: {}",
3701                    ipv4, e
3702                );
3703                return vec![]; // cannot send without a valid interface
3704            }
3705        }
3706        IpAddr::V6(ipv6) => {
3707            if let Err(e) = sock.set_multicast_if_v6(if_index) {
3708                debug!(
3709                    "send_dns_outgoing: failed to set multicast interface for IPv6 {}: {}",
3710                    ipv6, e
3711                );
3712                return vec![]; // cannot send without a valid interface
3713            }
3714        }
3715    }
3716
3717    let packet_list = out.to_data_on_wire();
3718    for packet in packet_list.iter() {
3719        multicast_on_intf(packet, if_name, if_index, if_addr, sock);
3720    }
3721    packet_list
3722}
3723
3724/// Sends a multicast packet, and returns the packet bytes.
3725fn multicast_on_intf(
3726    packet: &[u8],
3727    if_name: &str,
3728    if_index: u32,
3729    if_addr: &IfAddr,
3730    socket: &PktInfoUdpSocket,
3731) {
3732    if packet.len() > MAX_MSG_ABSOLUTE {
3733        debug!("Drop over-sized packet ({})", packet.len());
3734        return;
3735    }
3736
3737    let addr: SocketAddr = match if_addr {
3738        if_addrs::IfAddr::V4(_) => SocketAddrV4::new(GROUP_ADDR_V4, MDNS_PORT).into(),
3739        if_addrs::IfAddr::V6(_) => {
3740            let mut sock = SocketAddrV6::new(GROUP_ADDR_V6, MDNS_PORT, 0, 0);
3741            sock.set_scope_id(if_index); // Choose iface for multicast
3742            sock.into()
3743        }
3744    };
3745
3746    // Sends out `packet` to `addr` on the socket.
3747    let sock_addr = addr.into();
3748    match socket.send_to(packet, &sock_addr) {
3749        Ok(sz) => trace!(
3750            "sent out {} bytes on interface {} (idx {}) addr {}",
3751            sz,
3752            if_name,
3753            if_index,
3754            if_addr.ip()
3755        ),
3756        Err(e) => trace!("Failed to send to {} via {:?}: {}", addr, &if_name, e),
3757    }
3758}
3759
/// Returns true if `name` has the shape of a service instance name:
/// `<instance>.<service_type>.<_udp|_tcp>.local.`
///
/// Only the number of `.` separators is checked: there must be at least four
/// of them, i.e. at least five dot-separated parts (the part after a trailing
/// dot counts). `<instance>` may itself contain `.`.
fn valid_instance_name(name: &str) -> bool {
    const MIN_DOTS: usize = 4;
    name.bytes().filter(|&b| b == b'.').count() >= MIN_DOTS
}
3766
3767fn notify_monitors(monitors: &mut Vec<Sender<DaemonEvent>>, event: DaemonEvent) {
3768    monitors.retain(|sender| {
3769        if let Err(e) = sender.try_send(event.clone()) {
3770            debug!("notify_monitors: try_send: {}", &e);
3771            if matches!(e, TrySendError::Disconnected(_)) {
3772                return false; // This monitor is dropped.
3773            }
3774        }
3775        true
3776    });
3777}
3778
/// Check if all unique records passed "probing", and if yes, create a packet
/// to announce the service.
///
/// The packet is an authoritative response holding, in this order: the PTR
/// record of the service type, a PTR record of the subtype (if any), the SRV
/// record, the TXT record and one address record per address of the service
/// on `intf` (IPv4 addresses if `is_ipv4`, IPv6 otherwise).
///
/// Names recorded in `dns_registry.name_changes` replace the original
/// service fullname and hostname in the records.
///
/// Returns `None` if:
/// - the service has no address of the requested IP version on `intf`, or
/// - the service requires probing and `dns_registry.is_probing_done`
///   returned false for at least one SRV / TXT / address record.
fn prepare_announce(
    info: &ServiceInfo,
    intf: &MyIntf,
    dns_registry: &mut DnsRegistry,
    is_ipv4: bool,
) -> Option<DnsOutgoing> {
    let intf_addrs = if is_ipv4 {
        info.get_addrs_on_my_intf_v4(intf)
    } else {
        info.get_addrs_on_my_intf_v6(intf)
    };

    if intf_addrs.is_empty() {
        debug!(
            "prepare_announce (ipv4: {is_ipv4}): no valid addrs on interface {}",
            &intf.name
        );
        return None;
    }

    // check if we changed our name due to conflicts.
    let service_fullname = match dns_registry.name_changes.get(info.get_fullname()) {
        Some(new_name) => new_name,
        None => info.get_fullname(),
    };

    debug!(
        "prepare to announce service {service_fullname} on {:?}",
        &intf_addrs
    );

    // Number of records for which `is_probing_done` returned false.
    let mut probing_count = 0;
    let mut out = DnsOutgoing::new(FLAGS_QR_RESPONSE | FLAGS_AA);
    // Current time plus a random offset in [0, 250) ms; handed to every
    // `is_probing_done` call below.
    // NOTE(review): presumably the random probe delay of RFC 6762 section 8.1;
    // confirm against `DnsRegistry::is_probing_done`.
    let create_time = current_time_millis() + fastrand::u64(0..250);

    // PTR records use plain CLASS_IN, without the cache-flush bit, and are
    // added regardless of the probing state.
    out.add_answer_at_time(
        DnsPointer::new(
            info.get_type(),
            RRType::PTR,
            CLASS_IN,
            info.get_other_ttl(),
            service_fullname.to_string(),
        ),
        0,
    );

    if let Some(sub) = info.get_subtype() {
        trace!("Adding subdomain {}", sub);
        out.add_answer_at_time(
            DnsPointer::new(
                sub,
                RRType::PTR,
                CLASS_IN,
                info.get_other_ttl(),
                service_fullname.to_string(),
            ),
            0,
        );
    }

    // SRV records.
    // The SRV target is the (possibly renamed) hostname.
    let hostname = match dns_registry.name_changes.get(info.get_hostname()) {
        Some(new_name) => new_name.to_string(),
        None => info.get_hostname().to_string(),
    };

    // The record is created under the original fullname; a renamed fullname
    // is attached afterwards through `set_new_name`.
    let mut srv = DnsSrv::new(
        info.get_fullname(),
        CLASS_IN | CLASS_CACHE_FLUSH,
        info.get_host_ttl(),
        info.get_priority(),
        info.get_weight(),
        info.get_port(),
        hostname,
    );

    if let Some(new_name) = dns_registry.name_changes.get(info.get_fullname()) {
        srv.get_record_mut().set_new_name(new_name.to_string());
    }

    // `is_probing_done` is only consulted when the service requires probing.
    if !info.requires_probe()
        || dns_registry.is_probing_done(&srv, info.get_fullname(), create_time)
    {
        out.add_answer_at_time(srv, 0);
    } else {
        probing_count += 1;
    }

    // TXT records.

    let mut txt = DnsTxt::new(
        info.get_fullname(),
        CLASS_IN | CLASS_CACHE_FLUSH,
        info.get_other_ttl(),
        info.generate_txt(),
    );

    if let Some(new_name) = dns_registry.name_changes.get(info.get_fullname()) {
        txt.get_record_mut().set_new_name(new_name.to_string());
    }

    if !info.requires_probe()
        || dns_registry.is_probing_done(&txt, info.get_fullname(), create_time)
    {
        out.add_answer_at_time(txt, 0);
    } else {
        probing_count += 1;
    }

    // Address records. (A and AAAA)

    // Here `hostname` is the original hostname again; a rename is attached
    // to each record through `set_new_name`.
    let hostname = info.get_hostname();
    for address in intf_addrs {
        let mut dns_addr = DnsAddress::new(
            hostname,
            ip_address_rr_type(&address),
            CLASS_IN | CLASS_CACHE_FLUSH,
            info.get_host_ttl(),
            address,
            intf.into(),
        );

        if let Some(new_name) = dns_registry.name_changes.get(hostname) {
            dns_addr.get_record_mut().set_new_name(new_name.to_string());
        }

        if !info.requires_probe()
            || dns_registry.is_probing_done(&dns_addr, info.get_fullname(), create_time)
        {
            out.add_answer_at_time(dns_addr, 0);
        } else {
            probing_count += 1;
        }
    }

    // Announce only when every unique record is past probing; the partially
    // filled packet is discarded otherwise.
    if probing_count > 0 {
        return None;
    }

    Some(out)
}
3922
3923/// Send an unsolicited response for owned service via `intf` and `sock`.
3924/// Returns true if sent out successfully for IPv4 or IPv6.
3925fn announce_service_on_intf(
3926    dns_registry: &mut DnsRegistry,
3927    info: &ServiceInfo,
3928    intf: &MyIntf,
3929    sock: &PktInfoUdpSocket,
3930) -> bool {
3931    let is_ipv4 = sock.domain() == Domain::IPV4;
3932    if let Some(out) = prepare_announce(info, intf, dns_registry, is_ipv4) {
3933        send_dns_outgoing(&out, intf, sock);
3934        return true;
3935    }
3936
3937    false
3938}
3939
/// Returns a new name based on the `original` to avoid conflicts.
///
/// Only the first label (the text before the first `.`) is changed:
/// - if it ends with ` (<number>)`, the number is incremented;
/// - otherwise ` (2)` is appended. This includes the case where the number
///   cannot be incremented without overflowing `u32`.
///
/// Examples:
/// - `foo.local.` becomes `foo (2).local.`
/// - `foo (2).local.` becomes `foo (3).local.`
/// - `foo (9)` becomes `foo (10)`
fn name_change(original: &str) -> String {
    // Split off the first label; `rest` keeps everything after the first dot.
    let (first_label, rest) = match original.split_once('.') {
        Some((label, rest)) => (label, Some(rest)),
        None => (original, None),
    };

    // Try to bump an existing ` (<num>)` suffix; fall back to appending ` (2)`.
    let new_label = first_label
        .strip_suffix(')')
        .and_then(|without_paren| without_paren.rsplit_once(" ("))
        .and_then(|(base_name, number)| {
            // `checked_add` avoids an overflow panic (or a wrap to 0) on `u32::MAX`.
            let next = number.parse::<u32>().ok()?.checked_add(1)?;
            Some(format!("{base_name} ({next})"))
        })
        .unwrap_or_else(|| format!("{first_label} (2)"));

    match rest {
        Some(rest) => format!("{new_label}.{rest}"),
        None => new_label,
    }
}
3975
/// Returns a new hostname based on the `original` to avoid conflicts.
///
/// Only the first dot-separated label is modified; the remaining labels are
/// kept as they are. If that label ends with `-<num>`, where `<num>` is a
/// plain decimal number, the number is incremented. Otherwise `-2` is
/// appended to the label.
///
/// A suffix after the last hyphen that is not made of ASCII digits only (for
/// example `-+5`), or whose number cannot be incremented without overflowing
/// `u32`, is treated as part of the name: `-2` is appended after it instead
/// of panicking or wrapping around.
///
/// Examples:
/// - `foo.local.` becomes `foo-2.local.`
/// - `foo-2.local.` becomes `foo-3.local.`
/// - `foo` becomes `foo-2`
/// - `foo-4294967295` becomes `foo-4294967295-2`
fn hostname_change(original: &str) -> String {
    let mut parts: Vec<_> = original.split('.').collect();
    // `split` always yields at least one item, so this branch is purely defensive.
    let Some(first_part) = parts.get_mut(0) else {
        return format!("{original}-2");
    };
    let label: &str = *first_part;

    // Look for an existing `-<num>` suffix after the last hyphen.
    let bumped = label
        .rsplit_once('-')
        // `parse::<u32>` would also accept a leading `+`, so insist on digits only.
        .filter(|(_, digits)| !digits.is_empty() && digits.bytes().all(|b| b.is_ascii_digit()))
        .and_then(|(base, digits)| {
            // `checked_add` keeps `u32::MAX` from overflowing.
            let next = digits.parse::<u32>().ok()?.checked_add(1)?;
            Some(format!("{base}-{next}"))
        });

    let new_name = bumped.unwrap_or_else(|| format!("{label}-2"));

    *first_part = &new_name;
    parts.join(".")
}
4003
4004fn add_answer_with_additionals(
4005    out: &mut DnsOutgoing,
4006    msg: &DnsIncoming,
4007    service: &ServiceInfo,
4008    intf: &MyIntf,
4009    dns_registry: &DnsRegistry,
4010    is_ipv4: bool,
4011) {
4012    let intf_addrs = if is_ipv4 {
4013        service.get_addrs_on_my_intf_v4(intf)
4014    } else {
4015        service.get_addrs_on_my_intf_v6(intf)
4016    };
4017    if intf_addrs.is_empty() {
4018        trace!("No addrs on LAN of intf {:?}", intf);
4019        return;
4020    }
4021
4022    // check if we changed our name due to conflicts.
4023    let service_fullname = match dns_registry.name_changes.get(service.get_fullname()) {
4024        Some(new_name) => new_name,
4025        None => service.get_fullname(),
4026    };
4027
4028    let hostname = match dns_registry.name_changes.get(service.get_hostname()) {
4029        Some(new_name) => new_name,
4030        None => service.get_hostname(),
4031    };
4032
4033    let ptr_added = out.add_answer(
4034        msg,
4035        DnsPointer::new(
4036            service.get_type(),
4037            RRType::PTR,
4038            CLASS_IN,
4039            service.get_other_ttl(),
4040            service_fullname.to_string(),
4041        ),
4042    );
4043
4044    if !ptr_added {
4045        trace!("answer was not added for msg {:?}", msg);
4046        return;
4047    }
4048
4049    if let Some(sub) = service.get_subtype() {
4050        trace!("Adding subdomain {}", sub);
4051        out.add_additional_answer(DnsPointer::new(
4052            sub,
4053            RRType::PTR,
4054            CLASS_IN,
4055            service.get_other_ttl(),
4056            service_fullname.to_string(),
4057        ));
4058    }
4059
4060    // Add recommended additional answers according to
4061    // https://tools.ietf.org/html/rfc6763#section-12.1.
4062    out.add_additional_answer(DnsSrv::new(
4063        service_fullname,
4064        CLASS_IN | CLASS_CACHE_FLUSH,
4065        service.get_host_ttl(),
4066        service.get_priority(),
4067        service.get_weight(),
4068        service.get_port(),
4069        hostname.to_string(),
4070    ));
4071
4072    out.add_additional_answer(DnsTxt::new(
4073        service_fullname,
4074        CLASS_IN | CLASS_CACHE_FLUSH,
4075        service.get_other_ttl(),
4076        service.generate_txt(),
4077    ));
4078
4079    for address in intf_addrs {
4080        out.add_additional_answer(DnsAddress::new(
4081            hostname,
4082            ip_address_rr_type(&address),
4083            CLASS_IN | CLASS_CACHE_FLUSH,
4084            service.get_host_ttl(),
4085            address,
4086            intf.into(),
4087        ));
4088    }
4089}
4090
4091/// Check probes in a registry and returns: a probing packet to send out, and a list of probe names
4092/// that are finished.
4093fn check_probing(
4094    dns_registry: &mut DnsRegistry,
4095    timers: &mut BinaryHeap<Reverse<u64>>,
4096    now: u64,
4097) -> (DnsOutgoing, Vec<String>) {
4098    let mut expired_probes = Vec::new();
4099    let mut out = DnsOutgoing::new(FLAGS_QR_QUERY);
4100
4101    for (name, probe) in dns_registry.probing.iter_mut() {
4102        if now >= probe.next_send {
4103            if probe.expired(now) {
4104                // move the record to active
4105                expired_probes.push(name.clone());
4106            } else {
4107                out.add_question(name, RRType::ANY);
4108
4109                /*
4110                RFC 6762 section 8.2: https://datatracker.ietf.org/doc/html/rfc6762#section-8.2
4111                ...
4112                for tiebreaking to work correctly in all
4113                cases, the Authority Section must contain *all* the records and
4114                proposed rdata being probed for uniqueness.
4115                    */
4116                for record in probe.records.iter() {
4117                    out.add_authority(record.clone());
4118                }
4119
4120                probe.update_next_send(now);
4121
4122                // add timer
4123                timers.push(Reverse(probe.next_send));
4124            }
4125        }
4126    }
4127
4128    (out, expired_probes)
4129}
4130
4131/// Process expired probes on an interface and return a list of services
4132/// that are waiting for the probe to finish.
4133///
4134/// `DnsNameChange` events are sent to the monitors.
4135fn handle_expired_probes(
4136    expired_probes: Vec<String>,
4137    intf_name: &str,
4138    dns_registry: &mut DnsRegistry,
4139    monitors: &mut Vec<Sender<DaemonEvent>>,
4140) -> HashSet<String> {
4141    let mut waiting_services = HashSet::new();
4142
4143    for name in expired_probes {
4144        let Some(probe) = dns_registry.probing.remove(&name) else {
4145            continue;
4146        };
4147
4148        // send notifications about name changes
4149        for record in probe.records.iter() {
4150            if let Some(new_name) = record.get_record().get_new_name() {
4151                dns_registry
4152                    .name_changes
4153                    .insert(name.clone(), new_name.to_string());
4154
4155                let event = DnsNameChange {
4156                    original: record.get_record().get_original_name().to_string(),
4157                    new_name: new_name.to_string(),
4158                    rr_type: record.get_type(),
4159                    intf_name: intf_name.to_string(),
4160                };
4161                debug!("Name change event: {:?}", &event);
4162                notify_monitors(monitors, DaemonEvent::NameChange(event));
4163            }
4164        }
4165
4166        // move RR from probe to active.
4167        debug!(
4168            "probe of '{name}' finished: move {} records to active. ({} waiting services)",
4169            probe.records.len(),
4170            probe.waiting_services.len(),
4171        );
4172
4173        // Move records to active and plan to wake up services if records are not empty.
4174        if !probe.records.is_empty() {
4175            match dns_registry.active.get_mut(&name) {
4176                Some(records) => {
4177                    records.extend(probe.records);
4178                }
4179                None => {
4180                    dns_registry.active.insert(name, probe.records);
4181                }
4182            }
4183
4184            waiting_services.extend(probe.waiting_services);
4185        }
4186    }
4187
4188    waiting_services
4189}
4190
4191#[cfg(test)]
4192mod tests {
4193    use super::{
4194        _new_socket_bind, check_domain_suffix, check_service_name_length, hostname_change,
4195        my_ip_interfaces, name_change, send_dns_outgoing_impl, valid_instance_name,
4196        HostnameResolutionEvent, ServiceDaemon, ServiceEvent, ServiceInfo, GROUP_ADDR_V4,
4197        MDNS_PORT,
4198    };
4199    use crate::{
4200        dns_parser::{
4201            DnsIncoming, DnsOutgoing, DnsPointer, InterfaceId, RRType, ScopedIp, CLASS_IN,
4202            FLAGS_AA, FLAGS_QR_RESPONSE,
4203        },
4204        service_daemon::{add_answer_of_service, check_hostname},
4205    };
4206    use std::{
4207        net::{SocketAddr, SocketAddrV4},
4208        time::{Duration, SystemTime},
4209    };
4210    use test_log::test;
4211
4212    #[test]
4213    fn test_socketaddr_print() {
4214        let addr: SocketAddr = SocketAddrV4::new(GROUP_ADDR_V4, MDNS_PORT).into();
4215        let print = format!("{}", addr);
4216        assert_eq!(print, "224.0.0.251:5353");
4217    }
4218
4219    #[test]
4220    fn test_instance_name() {
4221        assert!(valid_instance_name("my-laser._printer._tcp.local."));
4222        assert!(valid_instance_name("my-laser.._printer._tcp.local."));
4223        assert!(!valid_instance_name("_printer._tcp.local."));
4224    }
4225
4226    #[test]
4227    fn test_check_service_name_length() {
4228        let result = check_service_name_length("_tcp", 100);
4229        assert!(result.is_err());
4230        if let Err(e) = result {
4231            println!("{}", e);
4232        }
4233    }
4234
4235    #[test]
4236    fn test_check_hostname() {
4237        // valid hostnames
4238        for hostname in &[
4239            "my_host.local.",
4240            &("A".repeat(255 - ".local.".len()) + ".local."),
4241        ] {
4242            let result = check_hostname(hostname);
4243            assert!(result.is_ok());
4244        }
4245
4246        // erroneous hostnames
4247        for hostname in &[
4248            "my_host.local",
4249            ".local.",
4250            &("A".repeat(256 - ".local.".len()) + ".local."),
4251        ] {
4252            let result = check_hostname(hostname);
4253            assert!(result.is_err());
4254            if let Err(e) = result {
4255                println!("{}", e);
4256            }
4257        }
4258    }
4259
4260    #[test]
4261    fn test_check_domain_suffix() {
4262        assert!(check_domain_suffix("_missing_dot._tcp.local").is_err());
4263        assert!(check_domain_suffix("_missing_bar.tcp.local.").is_err());
4264        assert!(check_domain_suffix("_mis_spell._tpp.local.").is_err());
4265        assert!(check_domain_suffix("_mis_spell._upp.local.").is_err());
4266        assert!(check_domain_suffix("_has_dot._tcp.local.").is_ok());
4267        assert!(check_domain_suffix("_goodname._udp.local.").is_ok());
4268    }
4269
    /// Registers a service, resolves it through a browse, stops the browse,
    /// sends a PTR record with TTL 0 for the service on every interface, and
    /// then checks that a fresh browse resolves the service again.
    #[test]
    fn test_service_with_temporarily_invalidated_ptr() {
        // Create a daemon
        let d = ServiceDaemon::new().expect("Failed to create daemon");

        let service = "_test_inval_ptr._udp.local.";
        let host_name = "my_host_tmp_invalidated_ptr.local.";
        let intfs: Vec<_> = my_ip_interfaces(false);
        let intf_ips: Vec<_> = intfs.iter().map(|intf| intf.ip()).collect();
        let port = 5201;
        let my_service =
            ServiceInfo::new(service, "my_instance", host_name, &intf_ips[..], port, None)
                .expect("invalid service info")
                .enable_addr_auto();
        let result = d.register(my_service.clone());
        assert!(result.is_ok());

        // Browse for a service
        let browse_chan = d.browse(service).unwrap();
        let timeout = Duration::from_secs(2);
        let mut resolved = false;

        // Wait for the first `ServiceResolved`; give up once no event arrives within `timeout`.
        while let Ok(event) = browse_chan.recv_timeout(timeout) {
            match event {
                ServiceEvent::ServiceResolved(info) => {
                    resolved = true;
                    println!("Resolved a service of {}", &info.fullname);
                    break;
                }
                e => {
                    println!("Received event {:?}", e);
                }
            }
        }

        assert!(resolved);

        println!("Stopping browse of {}", service);
        // Pause browsing so restarting will cause a new immediate query.
        // Unregistering will not work here, it will invalidate all the records.
        d.stop_browse(service).unwrap();

        // Ensure the search is stopped.
        // Reduces the chance of receiving an answer adding the ptr back to the
        // cache causing the later browse to return directly from the cache.
        // (which invalidates what this test is trying to test for.)
        let mut stopped = false;
        while let Ok(event) = browse_chan.recv_timeout(timeout) {
            match event {
                ServiceEvent::SearchStopped(_) => {
                    stopped = true;
                    println!("Stopped browsing service");
                    break;
                }
                // Other `ServiceResolved` messages may be received
                // here as they come from different interfaces.
                // That's fine for this test.
                e => {
                    println!("Received event {:?}", e);
                }
            }
        }

        assert!(stopped);

        // Invalidate the ptr from the service to the host.
        // The record is built with a TTL of 0.
        let invalidate_ptr_packet = DnsPointer::new(
            my_service.get_type(),
            RRType::PTR,
            CLASS_IN,
            0,
            my_service.get_fullname().to_string(),
        );

        let mut packet_buffer = DnsOutgoing::new(FLAGS_QR_RESPONSE | FLAGS_AA);
        packet_buffer.add_additional_answer(invalidate_ptr_packet);

        // Send the invalidation from a freshly bound socket on each interface.
        for intf in intfs {
            let sock = _new_socket_bind(&intf, true).unwrap();
            send_dns_outgoing_impl(
                &packet_buffer,
                &intf.name,
                intf.index.unwrap_or(0),
                &intf.addr,
                &sock.pktinfo,
            );
        }

        println!(
            "Sent PTR record invalidation. Starting second browse for {}",
            service
        );

        // Restart the browse to force the sender to re-send the announcements.
        let browse_chan = d.browse(service).unwrap();

        // The service must be resolved again on the new browse channel.
        resolved = false;
        while let Ok(event) = browse_chan.recv_timeout(timeout) {
            match event {
                ServiceEvent::ServiceResolved(info) => {
                    resolved = true;
                    println!("Resolved a service of {}", &info.fullname);
                    break;
                }
                e => {
                    println!("Received event {:?}", e);
                }
            }
        }

        assert!(resolved);
        d.shutdown().unwrap();
    }
4383
    /// Registers a service with a short host TTL on one daemon, resolves it
    /// from a second daemon, shuts the first daemon down and then waits for a
    /// `ServiceRemoved` event on the browsing side.
    ///
    /// NOTE(review): the final loop only prints the removal; nothing asserts
    /// that `ServiceRemoved` was actually received, so the test also passes
    /// when the loop ends on a timeout. The client daemon is not shut down
    /// explicitly either.
    #[test]
    fn test_expired_srv() {
        // construct service info
        let service_type = "_expired-srv._udp.local.";
        let instance = "test_instance";
        let host_name = "expired_srv_host.local.";
        let mut my_service = ServiceInfo::new(service_type, instance, host_name, "", 5023, None)
            .unwrap()
            .enable_addr_auto();

        // set SRV to expire soon.
        let new_ttl = 3; // for testing only.
        my_service._set_host_ttl(new_ttl);

        // register my service
        let mdns_server = ServiceDaemon::new().expect("Failed to create mdns server");
        let result = mdns_server.register(my_service);
        assert!(result.is_ok());

        // Browse for the service type from a separate daemon acting as the client.
        let mdns_client = ServiceDaemon::new().expect("Failed to create mdns client");
        let browse_chan = mdns_client.browse(service_type).unwrap();
        let timeout = Duration::from_secs(2);
        let mut resolved = false;

        // Wait for the first `ServiceResolved`; give up once no event arrives within `timeout`.
        while let Ok(event) = browse_chan.recv_timeout(timeout) {
            match event {
                ServiceEvent::ServiceResolved(info) => {
                    resolved = true;
                    println!("Resolved a service of {}", &info.fullname);
                    break;
                }
                _ => {}
            }
        }

        assert!(resolved);

        // Exit the server so that no more responses.
        mdns_server.shutdown().unwrap();

        // SRV record in the client cache will expire.
        // Each wait for an event is bounded by `new_ttl` seconds.
        let expire_timeout = Duration::from_secs(new_ttl as u64);
        while let Ok(event) = browse_chan.recv_timeout(expire_timeout) {
            match event {
                ServiceEvent::ServiceRemoved(service_type, full_name) => {
                    println!("Service removed: {}: {}", &service_type, &full_name);
                    break;
                }
                _ => {}
            }
        }
    }
4437
    /// Registers a service whose address records have a short TTL, resolves
    /// its hostname from a second daemon, shuts the server down and expects an
    /// `AddressesRemoved` event for the same hostname and address.
    #[test]
    fn test_hostname_resolution_address_removed() {
        // Create a mDNS server
        let server = ServiceDaemon::new().expect("Failed to create server");
        let hostname = "addr_remove_host._tcp.local.";
        // Use the first IPv4 interface address; the `unwrap` panics if there is none.
        let service_ip_addr: ScopedIp = my_ip_interfaces(false)
            .iter()
            .find(|iface| iface.ip().is_ipv4())
            .map(|iface| iface.ip().into())
            .unwrap();

        let mut my_service = ServiceInfo::new(
            "_host_res_test._tcp.local.",
            "my_instance",
            hostname,
            &service_ip_addr.to_ip_addr(),
            1234,
            None,
        )
        .expect("invalid service info");

        // Set a short TTL for addresses for testing.
        let addr_ttl = 2;
        my_service._set_host_ttl(addr_ttl); // Expire soon

        server.register(my_service).unwrap();

        // Create a mDNS client for resolving the hostname.
        let client = ServiceDaemon::new().expect("Failed to create client");
        let event_receiver = client.resolve_hostname(hostname, None).unwrap();
        // Block until the address is found, or the search stops / the channel fails.
        let resolved = loop {
            match event_receiver.recv() {
                Ok(HostnameResolutionEvent::AddressesFound(found_hostname, addresses)) => {
                    assert!(found_hostname == hostname);
                    assert!(addresses.contains(&service_ip_addr));
                    println!("address found: {:?}", &addresses);
                    break true;
                }
                Ok(HostnameResolutionEvent::SearchStopped(_)) => break false,
                Ok(_event) => {}
                Err(_) => break false,
            }
        };

        assert!(resolved);

        // Shutdown the server so no more responses / refreshes for addresses.
        server.shutdown().unwrap();

        // Wait till hostname address record expires, with 1 second grace period.
        let timeout = Duration::from_secs(addr_ttl as u64 + 1);
        // Other events are ignored; a timeout or channel error counts as "not removed".
        let removed = loop {
            match event_receiver.recv_timeout(timeout) {
                Ok(HostnameResolutionEvent::AddressesRemoved(removed_host, addresses)) => {
                    assert!(removed_host == hostname);
                    assert!(addresses.contains(&service_ip_addr));

                    println!(
                        "address removed: hostname: {} addresses: {:?}",
                        &hostname, &addresses
                    );
                    break true;
                }
                Ok(_event) => {}
                Err(_) => {
                    break false;
                }
            }
        };

        assert!(removed);

        client.shutdown().unwrap();
    }
4512
    /// Registers a service with a short "other" TTL, resolves it from a second
    /// daemon, waits for a large part of that TTL and then checks the client's
    /// `cache-refresh-ptr` and `cache-refresh-srv-txt` metrics.
    #[test]
    fn test_refresh_ptr() {
        // construct service info
        let service_type = "_refresh-ptr._udp.local.";
        let instance = "test_instance";
        let host_name = "refresh_ptr_host.local.";
        // Use the first IPv4 interface address; the `unwrap` panics if there is none.
        let service_ip_addr = my_ip_interfaces(false)
            .iter()
            .find(|iface| iface.ip().is_ipv4())
            .map(|iface| iface.ip())
            .unwrap();

        let mut my_service = ServiceInfo::new(
            service_type,
            instance,
            host_name,
            &service_ip_addr,
            5023,
            None,
        )
        .unwrap();

        let new_ttl = 3; // for testing only.
        my_service._set_other_ttl(new_ttl);

        // register my service
        let mdns_server = ServiceDaemon::new().expect("Failed to create mdns server");
        let result = mdns_server.register(my_service);
        assert!(result.is_ok());

        let mdns_client = ServiceDaemon::new().expect("Failed to create mdns client");
        let browse_chan = mdns_client.browse(service_type).unwrap();
        let timeout = Duration::from_millis(1500); // Give at least 1 second for the service probing.
        let mut resolved = false;

        // resolve the service first.
        while let Ok(event) = browse_chan.recv_timeout(timeout) {
            match event {
                ServiceEvent::ServiceResolved(info) => {
                    resolved = true;
                    println!("Resolved a service of {}", &info.fullname);
                    break;
                }
                _ => {}
            }
        }

        assert!(resolved);

        // wait over 80% of TTL, and refresh PTR should be sent out.
        // The per-event timeout used here is 90% of `new_ttl`; the loop ends
        // once no event arrives within that time.
        let timeout = Duration::from_millis(new_ttl as u64 * 1000 * 90 / 100);
        while let Ok(event) = browse_chan.recv_timeout(timeout) {
            println!("event: {:?}", &event);
        }

        // verify refresh counter.
        // Exactly one refresh is expected for each of the two counters.
        let metrics_chan = mdns_client.get_metrics().unwrap();
        let metrics = metrics_chan.recv_timeout(timeout).unwrap();
        let ptr_refresh_counter = metrics["cache-refresh-ptr"];
        assert_eq!(ptr_refresh_counter, 1);
        let srvtxt_refresh_counter = metrics["cache-refresh-srv-txt"];
        assert_eq!(srvtxt_refresh_counter, 1);

        // Exit the server so that no more responses.
        mdns_server.shutdown().unwrap();
        mdns_client.shutdown().unwrap();
    }
4580
4581    #[test]
4582    fn test_name_change() {
4583        assert_eq!(name_change("foo.local."), "foo (2).local.");
4584        assert_eq!(name_change("foo (2).local."), "foo (3).local.");
4585        assert_eq!(name_change("foo (9).local."), "foo (10).local.");
4586        assert_eq!(name_change("foo"), "foo (2)");
4587        assert_eq!(name_change("foo (2)"), "foo (3)");
4588        assert_eq!(name_change(""), " (2)");
4589
4590        // Additional edge cases
4591        assert_eq!(name_change("foo (abc)"), "foo (abc) (2)"); // Invalid number
4592        assert_eq!(name_change("foo (2"), "foo (2 (2)"); // Missing closing parenthesis
4593        assert_eq!(name_change("foo (2) extra"), "foo (2) extra (2)"); // Extra text after number
4594    }
4595
4596    #[test]
4597    fn test_hostname_change() {
4598        assert_eq!(hostname_change("foo.local."), "foo-2.local.");
4599        assert_eq!(hostname_change("foo"), "foo-2");
4600        assert_eq!(hostname_change("foo-2.local."), "foo-3.local.");
4601        assert_eq!(hostname_change("foo-9"), "foo-10");
4602        assert_eq!(hostname_change("test-42.domain."), "test-43.domain.");
4603    }
4604
    /// Calls `add_answer_of_service` for a TXT query and checks that the first
    /// answer in the outgoing message is a TXT record whose TTL equals the
    /// service's `get_other_ttl()`.
    #[test]
    fn test_add_answer_txt_ttl() {
        // construct a simple service info
        let service_type = "_test_add_answer._udp.local.";
        let instance = "test_instance";
        let host_name = "add_answer_host.local.";
        // Use the first IPv4 interface; the `unwrap` panics if there is none.
        let service_intf = my_ip_interfaces(false)
            .into_iter()
            .find(|iface| iface.ip().is_ipv4())
            .unwrap();
        let service_ip_addr = service_intf.ip();
        let my_service = ServiceInfo::new(
            service_type,
            instance,
            host_name,
            &service_ip_addr,
            5023,
            None,
        )
        .unwrap();

        // construct a DnsOutgoing message
        let mut out = DnsOutgoing::new(FLAGS_QR_RESPONSE | FLAGS_AA);

        // Construct a dummy DnsIncoming message
        // It is parsed from the wire form of the still empty outgoing message.
        let mut dummy_data = out.to_data_on_wire();
        let interface_id = InterfaceId::from(&service_intf);
        let incoming = DnsIncoming::new(dummy_data.pop().unwrap(), interface_id).unwrap();

        // Add an answer of TXT type for the service.
        let if_addrs = vec![service_intf.ip()];
        add_answer_of_service(
            &mut out,
            &incoming,
            instance,
            &my_service,
            RRType::TXT,
            if_addrs,
        );

        // Check if the answer was added correctly
        assert!(
            out.answers_count() > 0,
            "No answers added to the outgoing message"
        );

        // Check if the first answer is of type TXT
        let answer = out._answers().first().unwrap();
        assert_eq!(answer.0.get_type(), RRType::TXT);

        // Check TTL is set properly for the TXT record
        assert_eq!(answer.0.get_record().get_ttl(), my_service.get_other_ttl());
    }
4658
    /// Resolves a service from a client daemon, takes the interface down on
    /// the client via `test_down_interface` and expects `ServiceRemoved`, then
    /// brings it back with `test_up_interface` and expects `ServiceResolved`
    /// again.
    #[test]
    fn test_interface_flip() {
        // start a server
        let ty_domain = "_intf-flip._udp.local.";
        let host_name = "intf_flip.local.";
        let now = SystemTime::now()
            .duration_since(SystemTime::UNIX_EPOCH)
            .unwrap();
        let instance_name = now.as_micros().to_string(); // Create a unique name.
        let port = 5200;

        // Get a single IPv4 address
        // together with the name of the interface it belongs to.
        let (ip_addr1, intf_name) = my_ip_interfaces(false)
            .iter()
            .find(|iface| iface.ip().is_ipv4())
            .map(|iface| (iface.ip(), iface.name.clone()))
            .unwrap();

        println!("Using interface {} with IP {}", intf_name, ip_addr1);

        // Register the service.
        let service1 =
            ServiceInfo::new(ty_domain, &instance_name, host_name, &ip_addr1, port, None)
                .expect("valid service info");
        let server1 = ServiceDaemon::new().expect("failed to start server");
        server1
            .register(service1)
            .expect("Failed to register service1");

        // wait for the service announced.
        std::thread::sleep(Duration::from_secs(2));

        // start a client
        let client = ServiceDaemon::new().expect("failed to start client");

        let receiver = client.browse(ty_domain).unwrap();

        let timeout = Duration::from_secs(3);
        let mut got_data = false;

        // Wait for the first `ServiceResolved`; give up once no event arrives within `timeout`.
        while let Ok(event) = receiver.recv_timeout(timeout) {
            match event {
                ServiceEvent::ServiceResolved(_) => {
                    println!("Received ServiceResolved event");
                    got_data = true;
                    break;
                }
                _ => {}
            }
        }

        assert!(got_data, "Should receive ServiceResolved event");

        // Set a short IP check interval to detect interface changes quickly.
        client.set_ip_check_interval(1).unwrap();

        // Now shutdown the interface and expect the client to lose the service.
        println!("Shutting down interface {}", &intf_name);
        client.test_down_interface(&intf_name).unwrap();

        let mut got_removed = false;

        while let Ok(event) = receiver.recv_timeout(timeout) {
            match event {
                ServiceEvent::ServiceRemoved(ty_domain, instance) => {
                    got_removed = true;
                    println!("removed: {ty_domain} : {instance}");
                    break;
                }
                _ => {}
            }
        }
        assert!(got_removed, "Should receive ServiceRemoved event");

        // Bring the interface back and expect the service to be resolved again.
        println!("Bringing up interface {}", &intf_name);
        client.test_up_interface(&intf_name).unwrap();
        let mut got_data = false;
        while let Ok(event) = receiver.recv_timeout(timeout) {
            match event {
                ServiceEvent::ServiceResolved(resolved) => {
                    got_data = true;
                    println!("Received ServiceResolved: {:?}", resolved);
                    break;
                }
                _ => {}
            }
        }
        assert!(
            got_data,
            "Should receive ServiceResolved event after interface is back up"
        );

        server1.shutdown().unwrap();
        client.shutdown().unwrap();
    }
4754}