Skip to main content

mdns_sd/
service_daemon.rs

1//! Service daemon for mDNS Service Discovery.
2
3// How DNS-based Service Discovery works in a nutshell:
4//
5// (excerpt from RFC 6763)
6// .... that a particular service instance can be
7//    described using a DNS SRV [RFC2782] and DNS TXT [RFC1035] record.
8//    The SRV record has a name of the form "<Instance>.<Service>.<Domain>"
9//    and gives the target host and port where the service instance can be
10//    reached.  The DNS TXT record of the same name gives additional
11//    information about this instance, in a structured form using key/value
12//    pairs, described in Section 6.  A client discovers the list of
13//    available instances of a given service type using a query for a DNS
14//    PTR [RFC1035] record with a name of the form "<Service>.<Domain>",
15//    which returns a set of zero or more names, which are the names of the
16//    aforementioned DNS SRV/TXT record pairs.
17//
18// Some naming conventions in this source code:
19//
20// `ty_domain` refers to service type together with domain name, i.e. <service>.<domain>.
21// Every <service> consists of two labels: service itself and "_udp." or "_tcp".
22// See RFC 6763 section 7 Service Names.
23//     for example: `_my-service._udp.local.`
24//
25// `fullname` refers to a full Service Instance Name, i.e. <instance>.<service>.<domain>
26//     for example: `my_home._my-service._udp.local.`
27//
28// In mDNS and DNS, the basic data structure is "Resource Record" (RR), where
29// in Service Discovery, the basic data structure is "Service Info". One Service Info
30// corresponds to a set of DNS Resource Records.
31#[cfg(feature = "logging")]
32use crate::log::{debug, trace};
33use crate::{
34    dns_cache::{current_time_millis, DnsCache},
35    dns_parser::{
36        ip_address_rr_type, DnsAddress, DnsEntryExt, DnsIncoming, DnsOutgoing, DnsPointer,
37        DnsRecordBox, DnsRecordExt, DnsSrv, DnsTxt, InterfaceId, RRType, ScopedIp,
38        CLASS_CACHE_FLUSH, CLASS_IN, FLAGS_AA, FLAGS_QR_QUERY, FLAGS_QR_RESPONSE, MAX_MSG_ABSOLUTE,
39    },
40    error::{e_fmt, Error, Result},
41    service_info::{DnsRegistry, MyIntf, Probe, ServiceInfo, ServiceStatus},
42    Receiver, ResolvedService, TxtProperties,
43};
44use flume::{bounded, Sender, TrySendError};
45use if_addrs::{IfAddr, Interface};
46use mio::{event::Source, net::UdpSocket as MioUdpSocket, Interest, Poll, Registry, Token};
47use socket2::Domain;
48use socket_pktinfo::PktInfoUdpSocket;
49use std::{
50    cmp::{self, Reverse},
51    collections::{hash_map::Entry, BinaryHeap, HashMap, HashSet},
52    fmt, io,
53    net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6, UdpSocket},
54    str, thread,
55    time::Duration,
56    vec,
57};
58
59/// The default max length of the service name without domain, not including the
60/// leading underscore (`_`). It is set to 15 per
61/// [RFC 6763 section 7.2](https://www.rfc-editor.org/rfc/rfc6763#section-7.2).
62pub const SERVICE_NAME_LEN_MAX_DEFAULT: u8 = 15;
63
64/// The default interval for checking IP changes automatically.
65pub const IP_CHECK_INTERVAL_IN_SECS_DEFAULT: u32 = 5;
66
67/// The default time out for [ServiceDaemon::verify] is 10 seconds, per
68/// [RFC 6762 section 10.4](https://datatracker.ietf.org/doc/html/rfc6762#section-10.4)
69pub const VERIFY_TIMEOUT_DEFAULT: Duration = Duration::from_secs(10);
70
71const MDNS_PORT: u16 = 5353;
72const GROUP_ADDR_V4: Ipv4Addr = Ipv4Addr::new(224, 0, 0, 251);
73const GROUP_ADDR_V6: Ipv6Addr = Ipv6Addr::new(0xff02, 0, 0, 0, 0, 0, 0, 0xfb);
74const LOOPBACK_V4: Ipv4Addr = Ipv4Addr::new(127, 0, 0, 1);
75
76const RESOLVE_WAIT_IN_MILLIS: u64 = 500;
77
78/// Response status code for the service `unregister` call.
79#[derive(Debug)]
80pub enum UnregisterStatus {
81    /// Unregister was successful.
82    OK,
83    /// The service was not found in the registration.
84    NotFound,
85}
86
87/// Status code for the service daemon.
88#[derive(Debug, PartialEq, Clone, Eq)]
89#[non_exhaustive]
90pub enum DaemonStatus {
91    /// The daemon is running as normal.
92    Running,
93
94    /// The daemon has been shutdown.
95    Shutdown,
96}
97
98/// Different counters included in the metrics.
99/// Currently all counters are for outgoing packets.
100#[derive(Hash, Eq, PartialEq)]
101enum Counter {
102    Register,
103    RegisterResend,
104    Unregister,
105    UnregisterResend,
106    Browse,
107    ResolveHostname,
108    Respond,
109    CacheRefreshPTR,
110    CacheRefreshSrvTxt,
111    CacheRefreshAddr,
112    KnownAnswerSuppression,
113    CachedPTR,
114    CachedSRV,
115    CachedAddr,
116    CachedTxt,
117    CachedNSec,
118    CachedSubtype,
119    DnsRegistryProbe,
120    DnsRegistryActive,
121    DnsRegistryTimer,
122    DnsRegistryNameChange,
123    Timer,
124}
125
126impl fmt::Display for Counter {
127    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
128        match self {
129            Self::Register => write!(f, "register"),
130            Self::RegisterResend => write!(f, "register-resend"),
131            Self::Unregister => write!(f, "unregister"),
132            Self::UnregisterResend => write!(f, "unregister-resend"),
133            Self::Browse => write!(f, "browse"),
134            Self::ResolveHostname => write!(f, "resolve-hostname"),
135            Self::Respond => write!(f, "respond"),
136            Self::CacheRefreshPTR => write!(f, "cache-refresh-ptr"),
137            Self::CacheRefreshSrvTxt => write!(f, "cache-refresh-srv-txt"),
138            Self::CacheRefreshAddr => write!(f, "cache-refresh-addr"),
139            Self::KnownAnswerSuppression => write!(f, "known-answer-suppression"),
140            Self::CachedPTR => write!(f, "cached-ptr"),
141            Self::CachedSRV => write!(f, "cached-srv"),
142            Self::CachedAddr => write!(f, "cached-addr"),
143            Self::CachedTxt => write!(f, "cached-txt"),
144            Self::CachedNSec => write!(f, "cached-nsec"),
145            Self::CachedSubtype => write!(f, "cached-subtype"),
146            Self::DnsRegistryProbe => write!(f, "dns-registry-probe"),
147            Self::DnsRegistryActive => write!(f, "dns-registry-active"),
148            Self::DnsRegistryTimer => write!(f, "dns-registry-timer"),
149            Self::DnsRegistryNameChange => write!(f, "dns-registry-name-change"),
150            Self::Timer => write!(f, "timer"),
151        }
152    }
153}
154
155/// A wrapper around UDP socket used by the mDNS daemon.
156///
157/// We do this because `mio` does not support PKTINFO and
158/// does not provide a way to implement `Source` trait directly and safely.
159struct MyUdpSocket {
160    /// The underlying socket that supports control messages like
161    /// `IP_PKTINFO` for IPv4 and `IPV6_PKTINFO` for IPv6.
162    pktinfo: PktInfoUdpSocket,
163
164    /// The mio UDP socket that is a clone of `pktinfo` and
165    /// is used for event polling.
166    mio: MioUdpSocket,
167}
168
169impl MyUdpSocket {
170    pub fn new(pktinfo: PktInfoUdpSocket) -> io::Result<Self> {
171        let std_sock = pktinfo.try_clone_std()?;
172        let mio = MioUdpSocket::from_std(std_sock);
173
174        Ok(Self { pktinfo, mio })
175    }
176}
177
178/// Implements the mio `Source` trait so that we can use `MyUdpSocket` with `Poll`.
179impl Source for MyUdpSocket {
180    fn register(
181        &mut self,
182        registry: &Registry,
183        token: Token,
184        interests: Interest,
185    ) -> io::Result<()> {
186        self.mio.register(registry, token, interests)
187    }
188
189    fn reregister(
190        &mut self,
191        registry: &Registry,
192        token: Token,
193        interests: Interest,
194    ) -> io::Result<()> {
195        self.mio.reregister(registry, token, interests)
196    }
197
198    fn deregister(&mut self, registry: &Registry) -> std::io::Result<()> {
199        self.mio.deregister(registry)
200    }
201}
202
203/// The metrics is a HashMap of (name_key, i64_value).
204/// The main purpose is to help monitoring the mDNS packet traffic.
205pub type Metrics = HashMap<String, i64>;
206
207const IPV4_SOCK_EVENT_KEY: usize = 4; // Pick a key just to indicate IPv4.
208const IPV6_SOCK_EVENT_KEY: usize = 6; // Pick a key just to indicate IPv6.
209const SIGNAL_SOCK_EVENT_KEY: usize = usize::MAX - 1; // avoid to overlap with zc.poll_ids
210
211/// A daemon thread for mDNS
212///
213/// This struct provides a handle and an API to the daemon. It is cloneable.
214#[derive(Clone)]
215pub struct ServiceDaemon {
216    /// Sender handle of the channel to the daemon.
217    sender: Sender<Command>,
218
219    /// Send to this addr to signal that a `Command` is coming.
220    ///
221    /// The daemon listens on this addr together with other mDNS sockets,
222    /// to avoid busy polling the flume channel. If there is a way to poll
223    /// the channel and mDNS sockets together, then this can be removed.
224    signal_addr: SocketAddr,
225}
226
227impl ServiceDaemon {
228    /// Creates a new daemon and spawns a thread to run the daemon.
229    ///
230    /// The daemon (re)uses the default mDNS port 5353. To keep it simple, we don't
231    /// ask callers to set the port.
232    pub fn new() -> Result<Self> {
233        // Use port 0 to allow the system assign a random available port,
234        // no need for a pre-defined port number.
235        let signal_addr = SocketAddrV4::new(LOOPBACK_V4, 0);
236
237        let signal_sock = UdpSocket::bind(signal_addr)
238            .map_err(|e| e_fmt!("failed to create signal_sock for daemon: {}", e))?;
239
240        // Get the socket with the OS chosen port
241        let signal_addr = signal_sock
242            .local_addr()
243            .map_err(|e| e_fmt!("failed to get signal sock addr: {}", e))?;
244
245        // Must be nonblocking so we can listen to it together with mDNS sockets.
246        signal_sock
247            .set_nonblocking(true)
248            .map_err(|e| e_fmt!("failed to set nonblocking for signal socket: {}", e))?;
249
250        let poller = Poll::new().map_err(|e| e_fmt!("failed to create mio Poll: {e}"))?;
251
252        let (sender, receiver) = bounded(100);
253
254        // Spawn the daemon thread
255        let mio_sock = MioUdpSocket::from_std(signal_sock);
256        thread::Builder::new()
257            .name("mDNS_daemon".to_string())
258            .spawn(move || Self::daemon_thread(mio_sock, poller, receiver))
259            .map_err(|e| e_fmt!("thread builder failed to spawn: {}", e))?;
260
261        Ok(Self {
262            sender,
263            signal_addr,
264        })
265    }
266
267    /// Sends `cmd` to the daemon via its channel, and sends a signal
268    /// to its sock addr to notify.
269    fn send_cmd(&self, cmd: Command) -> Result<()> {
270        let cmd_name = cmd.to_string();
271
272        // First, send to the flume channel.
273        self.sender.try_send(cmd).map_err(|e| match e {
274            TrySendError::Full(_) => Error::Again,
275            e => e_fmt!("flume::channel::send failed: {}", e),
276        })?;
277
278        // Second, send a signal to notify the daemon.
279        let addr = SocketAddrV4::new(LOOPBACK_V4, 0);
280        let socket = UdpSocket::bind(addr)
281            .map_err(|e| e_fmt!("Failed to create socket to send signal: {}", e))?;
282        socket
283            .send_to(cmd_name.as_bytes(), self.signal_addr)
284            .map_err(|e| {
285                e_fmt!(
286                    "signal socket send_to {} ({}) failed: {}",
287                    self.signal_addr,
288                    cmd_name,
289                    e
290                )
291            })?;
292
293        Ok(())
294    }
295
296    /// Starts browsing for a specific service type.
297    ///
298    /// `service_type` must end with a valid mDNS domain: '._tcp.local.' or '._udp.local.'
299    ///
300    /// Returns a channel `Receiver` to receive events about the service. The caller
301    /// can call `.recv_async().await` on this receiver to handle events in an
302    /// async environment or call `.recv()` in a sync environment.
303    ///
304    /// When a new instance is found, the daemon automatically tries to resolve, i.e.
305    /// finding more details, i.e. SRV records and TXT records.
306    pub fn browse(&self, service_type: &str) -> Result<Receiver<ServiceEvent>> {
307        check_domain_suffix(service_type)?;
308
309        let (resp_s, resp_r) = bounded(10);
310        self.send_cmd(Command::Browse(service_type.to_string(), 1, false, resp_s))?;
311        Ok(resp_r)
312    }
313
314    /// Preforms a "cache-only" browse.
315    ///
316    /// `service_type` must end with a valid mDNS domain: '._tcp.local.' or '._udp.local.'
317    ///
318    /// The functionality is identical to 'browse', but the service events are based solely on the contents
319    /// of the daemon's cache. No actual mDNS query is sent to the network.
320    ///
321    /// See [accept_unsolicited](Self::accept_unsolicited) if you want to do cache-only browsing.
322    pub fn browse_cache(&self, service_type: &str) -> Result<Receiver<ServiceEvent>> {
323        check_domain_suffix(service_type)?;
324
325        let (resp_s, resp_r) = bounded(10);
326        self.send_cmd(Command::Browse(service_type.to_string(), 1, true, resp_s))?;
327        Ok(resp_r)
328    }
329
330    /// Stops searching for a specific service type.
331    ///
332    /// When an error is returned, the caller should retry only when
333    /// the error is `Error::Again`, otherwise should log and move on.
334    pub fn stop_browse(&self, ty_domain: &str) -> Result<()> {
335        self.send_cmd(Command::StopBrowse(ty_domain.to_string()))
336    }
337
338    /// Starts querying for the ip addresses of a hostname.
339    ///
340    /// Returns a channel `Receiver` to receive events about the hostname.
341    /// The caller can call `.recv_async().await` on this receiver to handle events in an
342    /// async environment or call `.recv()` in a sync environment.
343    ///
344    /// The `timeout` is specified in milliseconds.
345    pub fn resolve_hostname(
346        &self,
347        hostname: &str,
348        timeout: Option<u64>,
349    ) -> Result<Receiver<HostnameResolutionEvent>> {
350        check_hostname(hostname)?;
351        let (resp_s, resp_r) = bounded(10);
352        self.send_cmd(Command::ResolveHostname(
353            hostname.to_string(),
354            1,
355            resp_s,
356            timeout,
357        ))?;
358        Ok(resp_r)
359    }
360
361    /// Stops querying for the ip addresses of a hostname.
362    ///
363    /// When an error is returned, the caller should retry only when
364    /// the error is `Error::Again`, otherwise should log and move on.
365    pub fn stop_resolve_hostname(&self, hostname: &str) -> Result<()> {
366        self.send_cmd(Command::StopResolveHostname(hostname.to_string()))
367    }
368
369    /// Registers a service provided by this host.
370    ///
371    /// If `service_info` has no addresses yet and its `addr_auto` is enabled,
372    /// this method will automatically fill in addresses from the host.
373    ///
374    /// To re-announce a service with an updated `service_info`, just call
375    /// this `register` function again. No need to call `unregister` first.
376    pub fn register(&self, service_info: ServiceInfo) -> Result<()> {
377        check_service_name(service_info.get_fullname())?;
378        check_hostname(service_info.get_hostname())?;
379
380        self.send_cmd(Command::Register(service_info))
381    }
382
383    /// Unregisters a service. This is a graceful shutdown of a service.
384    ///
385    /// Returns a channel receiver that is used to receive the status code
386    /// of the unregister.
387    ///
388    /// When an error is returned, the caller should retry only when
389    /// the error is `Error::Again`, otherwise should log and move on.
390    pub fn unregister(&self, fullname: &str) -> Result<Receiver<UnregisterStatus>> {
391        let (resp_s, resp_r) = bounded(1);
392        self.send_cmd(Command::Unregister(fullname.to_lowercase(), resp_s))?;
393        Ok(resp_r)
394    }
395
396    /// Starts to monitor events from the daemon.
397    ///
398    /// Returns a channel [`Receiver`] of [`DaemonEvent`].
399    pub fn monitor(&self) -> Result<Receiver<DaemonEvent>> {
400        let (resp_s, resp_r) = bounded(100);
401        self.send_cmd(Command::Monitor(resp_s))?;
402        Ok(resp_r)
403    }
404
405    /// Shuts down the daemon thread and returns a channel to receive the status.
406    ///
407    /// When an error is returned, the caller should retry only when
408    /// the error is `Error::Again`, otherwise should log and move on.
409    pub fn shutdown(&self) -> Result<Receiver<DaemonStatus>> {
410        let (resp_s, resp_r) = bounded(1);
411        self.send_cmd(Command::Exit(resp_s))?;
412        Ok(resp_r)
413    }
414
415    /// Returns the status of the daemon.
416    ///
417    /// When an error is returned, the caller should retry only when
418    /// the error is `Error::Again`, otherwise should consider the daemon
419    /// stopped working and move on.
420    pub fn status(&self) -> Result<Receiver<DaemonStatus>> {
421        let (resp_s, resp_r) = bounded(1);
422
423        if self.sender.is_disconnected() {
424            resp_s
425                .send(DaemonStatus::Shutdown)
426                .map_err(|e| e_fmt!("failed to send daemon status to the client: {}", e))?;
427        } else {
428            self.send_cmd(Command::GetStatus(resp_s))?;
429        }
430
431        Ok(resp_r)
432    }
433
434    /// Returns a channel receiver for the metrics, e.g. input/output counters.
435    ///
436    /// The metrics returned is a snapshot. Hence the caller should call
437    /// this method repeatedly if they want to monitor the metrics continuously.
438    pub fn get_metrics(&self) -> Result<Receiver<Metrics>> {
439        let (resp_s, resp_r) = bounded(1);
440        self.send_cmd(Command::GetMetrics(resp_s))?;
441        Ok(resp_r)
442    }
443
444    /// Change the max length allowed for a service name.
445    ///
446    /// As RFC 6763 defines a length max for a service name, a user should not call
447    /// this method unless they have to. See [`SERVICE_NAME_LEN_MAX_DEFAULT`].
448    ///
449    /// `len_max` is capped at an internal limit, which is currently 30.
450    pub fn set_service_name_len_max(&self, len_max: u8) -> Result<()> {
451        const SERVICE_NAME_LEN_MAX_LIMIT: u8 = 30; // Double the default length max.
452
453        if len_max > SERVICE_NAME_LEN_MAX_LIMIT {
454            return Err(Error::Msg(format!(
455                "service name length max {len_max} is too large"
456            )));
457        }
458
459        self.send_cmd(Command::SetOption(DaemonOption::ServiceNameLenMax(len_max)))
460    }
461
462    /// Change the interval for checking IP changes automatically.
463    ///
464    /// Setting the interval to 0 disables the IP check.
465    ///
466    /// See [`IP_CHECK_INTERVAL_IN_SECS_DEFAULT`] for the default interval.
467    pub fn set_ip_check_interval(&self, interval_in_secs: u32) -> Result<()> {
468        let interval_in_millis = interval_in_secs as u64 * 1000;
469        self.send_cmd(Command::SetOption(DaemonOption::IpCheckInterval(
470            interval_in_millis,
471        )))
472    }
473
474    /// Get the current interval in seconds for checking IP changes automatically.
475    pub fn get_ip_check_interval(&self) -> Result<u32> {
476        let (resp_s, resp_r) = bounded(1);
477        self.send_cmd(Command::GetOption(resp_s))?;
478
479        let option = resp_r
480            .recv_timeout(Duration::from_secs(10))
481            .map_err(|e| e_fmt!("failed to receive ip check interval: {}", e))?;
482        let ip_check_interval_in_secs = option.ip_check_interval / 1000;
483        Ok(ip_check_interval_in_secs as u32)
484    }
485
486    /// Include interfaces that match `if_kind` for this service daemon.
487    ///
488    /// For example:
489    /// ```ignore
490    ///     daemon.enable_interface("en0")?;
491    /// ```
492    pub fn enable_interface(&self, if_kind: impl IntoIfKindVec) -> Result<()> {
493        let if_kind_vec = if_kind.into_vec();
494        self.send_cmd(Command::SetOption(DaemonOption::EnableInterface(
495            if_kind_vec.kinds,
496        )))
497    }
498
499    /// Ignore/exclude interfaces that match `if_kind` for this daemon.
500    ///
501    /// For example:
502    /// ```ignore
503    ///     daemon.disable_interface(IfKind::IPv6)?;
504    /// ```
505    pub fn disable_interface(&self, if_kind: impl IntoIfKindVec) -> Result<()> {
506        let if_kind_vec = if_kind.into_vec();
507        self.send_cmd(Command::SetOption(DaemonOption::DisableInterface(
508            if_kind_vec.kinds,
509        )))
510    }
511
512    /// If `accept` is true, accept and cache all responses, even if there is no active querier
513    /// for a given service type. This is useful / necessary when doing cache-only browsing. See
514    /// [browse_cache](Self::browse_cache).
515    ///
516    /// If `accept` is false (default), accept only responses matching queries that we have initiated.
517    ///
518    /// For example:
519    /// ```ignore
520    ///     daemon.accept_unsolicited(true)?;
521    /// ```
522    pub fn accept_unsolicited(&self, accept: bool) -> Result<()> {
523        self.send_cmd(Command::SetOption(DaemonOption::AcceptUnsolicited(accept)))
524    }
525
526    #[cfg(test)]
527    pub fn test_down_interface(&self, ifname: &str) -> Result<()> {
528        self.send_cmd(Command::SetOption(DaemonOption::TestDownInterface(
529            ifname.to_string(),
530        )))
531    }
532
533    #[cfg(test)]
534    pub fn test_up_interface(&self, ifname: &str) -> Result<()> {
535        self.send_cmd(Command::SetOption(DaemonOption::TestUpInterface(
536            ifname.to_string(),
537        )))
538    }
539
540    /// Enable or disable the loopback for locally sent multicast packets in IPv4.
541    ///
542    /// By default, multicast loop is enabled for IPv4. When disabled, a querier will not
543    /// receive announcements from a responder on the same host.
544    ///
545    /// Reference: <https://learn.microsoft.com/en-us/windows/win32/winsock/ip-multicast-2>
546    ///
547    /// "The Winsock version of the IP_MULTICAST_LOOP option is semantically different than
548    /// the UNIX version of the IP_MULTICAST_LOOP option:
549    ///
550    /// In Winsock, the IP_MULTICAST_LOOP option applies only to the receive path.
551    /// In the UNIX version, the IP_MULTICAST_LOOP option applies to the send path."
552    ///
553    /// Which means, in order NOT to receive localhost announcements, you want to call
554    /// this API on the querier side on Windows, but on the responder side on Unix.
555    pub fn set_multicast_loop_v4(&self, on: bool) -> Result<()> {
556        self.send_cmd(Command::SetOption(DaemonOption::MulticastLoopV4(on)))
557    }
558
559    /// Enable or disable the loopback for locally sent multicast packets in IPv6.
560    ///
561    /// By default, multicast loop is enabled for IPv6. When disabled, a querier will not
562    /// receive announcements from a responder on the same host.
563    ///
564    /// Reference: <https://learn.microsoft.com/en-us/windows/win32/winsock/ip-multicast-2>
565    ///
566    /// "The Winsock version of the IP_MULTICAST_LOOP option is semantically different than
567    /// the UNIX version of the IP_MULTICAST_LOOP option:
568    ///
569    /// In Winsock, the IP_MULTICAST_LOOP option applies only to the receive path.
570    /// In the UNIX version, the IP_MULTICAST_LOOP option applies to the send path."
571    ///
572    /// Which means, in order NOT to receive localhost announcements, you want to call
573    /// this API on the querier side on Windows, but on the responder side on Unix.
574    pub fn set_multicast_loop_v6(&self, on: bool) -> Result<()> {
575        self.send_cmd(Command::SetOption(DaemonOption::MulticastLoopV6(on)))
576    }
577
578    /// Proactively confirms whether a service instance still valid.
579    ///
580    /// This call will issue queries for a service instance's SRV record and Address records.
581    ///
582    /// For `timeout`, most users should use [VERIFY_TIMEOUT_DEFAULT]
583    /// unless there is a reason not to follow RFC.
584    ///
585    /// If no response is received within `timeout`, the current resource
586    /// records will be flushed, and if needed, `ServiceRemoved` event will be
587    /// sent to active queriers.
588    ///
589    /// Reference: [RFC 6762](https://datatracker.ietf.org/doc/html/rfc6762#section-10.4)
590    pub fn verify(&self, instance_fullname: String, timeout: Duration) -> Result<()> {
591        self.send_cmd(Command::Verify(instance_fullname, timeout))
592    }
593
594    fn daemon_thread(signal_sock: MioUdpSocket, poller: Poll, receiver: Receiver<Command>) {
595        let mut zc = Zeroconf::new(signal_sock, poller);
596
597        if let Some(cmd) = zc.run(receiver) {
598            match cmd {
599                Command::Exit(resp_s) => {
600                    // It is guaranteed that the receiver already dropped,
601                    // i.e. the daemon command channel closed.
602                    if let Err(e) = resp_s.send(DaemonStatus::Shutdown) {
603                        debug!("exit: failed to send response of shutdown: {}", e);
604                    }
605                }
606                _ => {
607                    debug!("Unexpected command: {:?}", cmd);
608                }
609            }
610        }
611    }
612}
613
614/// Creates a new UDP socket that uses `intf` to send and recv multicast.
615fn _new_socket_bind(intf: &Interface, should_loop: bool) -> Result<MyUdpSocket> {
616    // Use the same socket for receiving and sending multicast packets.
617    // Such socket has to bind to INADDR_ANY or IN6ADDR_ANY.
618    let intf_ip = &intf.ip();
619    match intf_ip {
620        IpAddr::V4(ip) => {
621            let addr = SocketAddrV4::new(Ipv4Addr::new(0, 0, 0, 0), MDNS_PORT);
622            let sock = new_socket(addr.into(), true)?;
623
624            // Join mDNS group to receive packets.
625            sock.join_multicast_v4(&GROUP_ADDR_V4, ip)
626                .map_err(|e| e_fmt!("join multicast group on addr {}: {}", intf_ip, e))?;
627
628            // Set IP_MULTICAST_IF to send packets.
629            sock.set_multicast_if_v4(ip)
630                .map_err(|e| e_fmt!("set multicast_if on addr {}: {}", ip, e))?;
631
632            // Per RFC 6762 section 11:
633            // "All Multicast DNS responses (including responses sent via unicast) SHOULD
634            // be sent with IP TTL set to 255."
635            // Here we set the TTL to 255 for multicast as we don't support unicast yet.
636            sock.set_multicast_ttl_v4(255)
637                .map_err(|e| e_fmt!("set set_multicast_ttl_v4 on addr {}: {}", ip, e))?;
638
639            if !should_loop {
640                sock.set_multicast_loop_v4(false)
641                    .map_err(|e| e_fmt!("failed to set multicast loop v4 for {ip}: {e}"))?;
642            }
643
644            // Test if we can send packets successfully.
645            let multicast_addr = SocketAddrV4::new(GROUP_ADDR_V4, MDNS_PORT).into();
646            let test_packets = DnsOutgoing::new(0).to_data_on_wire();
647            for packet in test_packets {
648                sock.send_to(&packet, &multicast_addr)
649                    .map_err(|e| e_fmt!("send multicast packet on addr {}: {}", ip, e))?;
650            }
651            MyUdpSocket::new(sock)
652                .map_err(|e| e_fmt!("failed to create MySocket for interface {}: {e}", intf.name))
653        }
654        IpAddr::V6(ip) => {
655            let addr = SocketAddrV6::new(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 0), MDNS_PORT, 0, 0);
656            let sock = new_socket(addr.into(), true)?;
657
658            let if_index = intf.index.unwrap_or(0);
659
660            // Join mDNS group to receive packets.
661            sock.join_multicast_v6(&GROUP_ADDR_V6, if_index)
662                .map_err(|e| e_fmt!("join multicast group on addr {}: {}", ip, e))?;
663
664            // Set IPV6_MULTICAST_IF to send packets.
665            sock.set_multicast_if_v6(if_index)
666                .map_err(|e| e_fmt!("set multicast_if on addr {}: {}", ip, e))?;
667
668            // We are not sending multicast packets to test this socket as there might
669            // be many IPv6 interfaces on a host and could cause such send error:
670            // "No buffer space available (os error 55)".
671
672            MyUdpSocket::new(sock)
673                .map_err(|e| e_fmt!("failed to create MySocket for interface {}: {e}", intf.name))
674        }
675    }
676}
677
678/// Creates a new UDP socket to bind to `port` with REUSEPORT option.
679/// `non_block` indicates whether to set O_NONBLOCK for the socket.
680fn new_socket(addr: SocketAddr, non_block: bool) -> Result<PktInfoUdpSocket> {
681    let domain = match addr {
682        SocketAddr::V4(_) => socket2::Domain::IPV4,
683        SocketAddr::V6(_) => socket2::Domain::IPV6,
684    };
685
686    let fd = PktInfoUdpSocket::new(domain).map_err(|e| e_fmt!("create socket failed: {}", e))?;
687
688    fd.set_reuse_address(true)
689        .map_err(|e| e_fmt!("set ReuseAddr failed: {}", e))?;
690    #[cfg(unix)] // this is currently restricted to Unix's in socket2
691    fd.set_reuse_port(true)
692        .map_err(|e| e_fmt!("set ReusePort failed: {}", e))?;
693
694    if non_block {
695        fd.set_nonblocking(true)
696            .map_err(|e| e_fmt!("set O_NONBLOCK: {}", e))?;
697    }
698
699    fd.bind(&addr.into())
700        .map_err(|e| e_fmt!("socket bind to {} failed: {}", &addr, e))?;
701
702    trace!("new socket bind to {}", &addr);
703    Ok(fd)
704}
705
706/// Specify a UNIX timestamp in millis to run `command` for the next time.
707struct ReRun {
708    /// UNIX timestamp in millis.
709    next_time: u64,
710    command: Command,
711}
712
713/// Specify kinds of interfaces. It is used to enable or to disable interfaces in the daemon.
714///
715/// Note that for ergonomic reasons, `From<&str>` and `From<IpAddr>` are implemented.
716#[derive(Debug, Clone)]
717#[non_exhaustive]
718pub enum IfKind {
719    /// All interfaces.
720    All,
721
722    /// All IPv4 interfaces.
723    IPv4,
724
725    /// All IPv6 interfaces.
726    IPv6,
727
728    /// By the interface name, for example "en0"
729    Name(String),
730
731    /// By an IPv4 or IPv6 address.
732    Addr(IpAddr),
733
734    /// 127.0.0.1 (or anything in 127.0.0.0/8), disabled by default.
735    ///
736    /// Use [ServiceDaemon::enable_interface] to support registering services on loopback interfaces,
737    /// which is required by some use cases (e.g., OSCQuery) that publish via mDNS.
738    LoopbackV4,
739
740    /// ::1/128, disabled by default.
741    LoopbackV6,
742}
743
744impl IfKind {
745    /// Checks if `intf` matches with this interface kind.
746    fn matches(&self, intf: &Interface) -> bool {
747        match self {
748            Self::All => true,
749            Self::IPv4 => intf.ip().is_ipv4(),
750            Self::IPv6 => intf.ip().is_ipv6(),
751            Self::Name(ifname) => ifname == &intf.name,
752            Self::Addr(addr) => addr == &intf.ip(),
753            Self::LoopbackV4 => intf.is_loopback() && intf.ip().is_ipv4(),
754            Self::LoopbackV6 => intf.is_loopback() && intf.ip().is_ipv6(),
755        }
756    }
757}
758
759/// The first use case of specifying an interface was to
760/// use an interface name. Hence adding this for ergonomic reasons.
761impl From<&str> for IfKind {
762    fn from(val: &str) -> Self {
763        Self::Name(val.to_string())
764    }
765}
766
767impl From<&String> for IfKind {
768    fn from(val: &String) -> Self {
769        Self::Name(val.to_string())
770    }
771}
772
773/// Still for ergonomic reasons.
774impl From<IpAddr> for IfKind {
775    fn from(val: IpAddr) -> Self {
776        Self::Addr(val)
777    }
778}
779
780/// A list of `IfKind` that can be used to match interfaces.
781pub struct IfKindVec {
782    kinds: Vec<IfKind>,
783}
784
785/// A trait that converts a type into a Vec of `IfKind`.
786pub trait IntoIfKindVec {
787    fn into_vec(self) -> IfKindVec;
788}
789
790impl<T: Into<IfKind>> IntoIfKindVec for T {
791    fn into_vec(self) -> IfKindVec {
792        let if_kind: IfKind = self.into();
793        IfKindVec {
794            kinds: vec![if_kind],
795        }
796    }
797}
798
799impl<T: Into<IfKind>> IntoIfKindVec for Vec<T> {
800    fn into_vec(self) -> IfKindVec {
801        let kinds: Vec<IfKind> = self.into_iter().map(|x| x.into()).collect();
802        IfKindVec { kinds }
803    }
804}
805
806/// Selection of interfaces.
807struct IfSelection {
808    /// The interfaces to be selected.
809    if_kind: IfKind,
810
811    /// Whether the `if_kind` should be enabled or not.
812    selected: bool,
813}
814
815/// A struct holding the state. It was inspired by `zeroconf` package in Python.
816struct Zeroconf {
817    /// Local interfaces keyed by interface index.
818    my_intfs: HashMap<u32, MyIntf>,
819
820    /// A common socket for IPv4 interfaces.
821    ipv4_sock: MyUdpSocket,
822
823    /// A common socket for IPv6 interfaces.
824    ipv6_sock: MyUdpSocket,
825
826    /// Local registered services, keyed by service full names.
827    my_services: HashMap<String, ServiceInfo>,
828
829    /// Received DNS records.
830    cache: DnsCache,
831
832    /// Registered service records, keyed by interface index.
833    dns_registry_map: HashMap<u32, DnsRegistry>,
834
835    /// Active "Browse" commands.
836    service_queriers: HashMap<String, Sender<ServiceEvent>>, // <ty_domain, channel::sender>
837
838    /// Active "ResolveHostname" commands.
839    ///
840    /// The timestamps are set at the future timestamp when the command should timeout.
841    /// `hostname` is case-insensitive and stored in lowercase.
842    hostname_resolvers: HashMap<String, (Sender<HostnameResolutionEvent>, Option<u64>)>, // <hostname, (channel::sender, UNIX timestamp in millis)>
843
844    /// All repeating transmissions.
845    retransmissions: Vec<ReRun>,
846
847    counters: Metrics,
848
849    /// Waits for incoming packets.
850    poller: Poll,
851
852    /// Channels to notify events.
853    monitors: Vec<Sender<DaemonEvent>>,
854
855    /// Options
856    service_name_len_max: u8,
857
858    /// Interval in millis to check IP address changes.
859    ip_check_interval: u64,
860
861    /// All interface selections called to the daemon.
862    if_selections: Vec<IfSelection>,
863
864    /// Socket for signaling.
865    signal_sock: MioUdpSocket,
866
867    /// Timestamps marking where we need another iteration of the run loop,
868    /// to react to events like retransmissions, cache refreshes, interface IP address changes, etc.
869    ///
870    /// When the run loop goes through a single iteration, it will
871    /// set its timeout to the earliest timer in this list.
872    timers: BinaryHeap<Reverse<u64>>,
873
874    status: DaemonStatus,
875
876    /// Service instances that are pending for resolving SRV and TXT.
877    pending_resolves: HashSet<String>,
878
879    /// Service instances that are already resolved.
880    resolved: HashSet<String>,
881
882    multicast_loop_v4: bool,
883
884    multicast_loop_v6: bool,
885
886    accept_unsolicited: bool,
887
888    #[cfg(test)]
889    test_down_interfaces: HashSet<String>,
890}
891
892/// Join the multicast group for the given interface.
893fn join_multicast_group(my_sock: &PktInfoUdpSocket, intf: &Interface) -> Result<()> {
894    let intf_ip = &intf.ip();
895    match intf_ip {
896        IpAddr::V4(ip) => {
897            // Join mDNS group to receive packets.
898            debug!("join multicast group V4 on addr {}", ip);
899            my_sock
900                .join_multicast_v4(&GROUP_ADDR_V4, ip)
901                .map_err(|e| e_fmt!("PKT join multicast group on addr {}: {}", intf_ip, e))?;
902        }
903        IpAddr::V6(ip) => {
904            let if_index = intf.index.unwrap_or(0);
905            // Join mDNS group to receive packets.
906            debug!(
907                "join multicast group V6 on addr {} with index {}",
908                ip, if_index
909            );
910            my_sock
911                .join_multicast_v6(&GROUP_ADDR_V6, if_index)
912                .map_err(|e| e_fmt!("PKT join multicast group on addr {}: {}", ip, e))?;
913        }
914    }
915    Ok(())
916}
917
918impl Zeroconf {
919    fn new(signal_sock: MioUdpSocket, poller: Poll) -> Self {
920        // Get interfaces.
921        let my_ifaddrs = my_ip_interfaces(false);
922
923        // Create a socket for every IP addr.
924        // Note: it is possible that `my_ifaddrs` contains the same IP addr with different interface names,
925        // or the same interface name with different IP addrs.
926        let mut my_intfs = HashMap::new();
927        let mut dns_registry_map = HashMap::new();
928
929        // Use the same socket for receiving and sending multicast packets.
930        // Such socket has to bind to INADDR_ANY or IN6ADDR_ANY.
931        let addr = SocketAddrV4::new(Ipv4Addr::new(0, 0, 0, 0), MDNS_PORT);
932        let sock = new_socket(addr.into(), true).unwrap();
933
934        // Per RFC 6762 section 11:
935        // "All Multicast DNS responses (including responses sent via unicast) SHOULD
936        // be sent with IP TTL set to 255."
937        // Here we set the TTL to 255 for multicast as we don't support unicast yet.
938        sock.set_multicast_ttl_v4(255)
939            .map_err(|e| e_fmt!("set set_multicast_ttl_v4 on addr: {}", e))
940            .unwrap();
941
942        // This clones a socket.
943        let ipv4_sock = MyUdpSocket::new(sock).unwrap();
944
945        let addr = SocketAddrV6::new(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 0), MDNS_PORT, 0, 0);
946        let sock = new_socket(addr.into(), true).unwrap();
947
948        // Per RFC 6762 section 11:
949        // "All Multicast DNS responses (including responses sent via unicast) SHOULD
950        // be sent with IP TTL set to 255."
951        sock.set_multicast_hops_v6(255)
952            .map_err(|e| e_fmt!("set set_multicast_hops_v6: {}", e))
953            .unwrap();
954
955        // This clones the ipv6 socket.
956        let ipv6_sock = MyUdpSocket::new(sock).unwrap();
957
958        // Configure sockets to join multicast groups.
959        for intf in my_ifaddrs {
960            let sock = if intf.ip().is_ipv4() {
961                &ipv4_sock
962            } else {
963                &ipv6_sock
964            };
965
966            if let Err(e) = join_multicast_group(&sock.pktinfo, &intf) {
967                debug!(
968                    "config socket to join multicast: {}: {e}. Skipped.",
969                    &intf.ip()
970                );
971            }
972
973            let if_index = intf.index.unwrap_or(0);
974
975            // Add this interface address if not already present.
976            dns_registry_map
977                .entry(if_index)
978                .or_insert_with(DnsRegistry::new);
979
980            my_intfs
981                .entry(if_index)
982                .and_modify(|v: &mut MyIntf| {
983                    v.addrs.insert(intf.addr.clone());
984                })
985                .or_insert(MyIntf {
986                    name: intf.name.clone(),
987                    index: if_index,
988                    addrs: HashSet::from([intf.addr]),
989                });
990        }
991
992        let monitors = Vec::new();
993        let service_name_len_max = SERVICE_NAME_LEN_MAX_DEFAULT;
994        let ip_check_interval = IP_CHECK_INTERVAL_IN_SECS_DEFAULT as u64 * 1000;
995
996        let timers = BinaryHeap::new();
997
998        // Disable loopback by default.
999        let if_selections = vec![
1000            IfSelection {
1001                if_kind: IfKind::LoopbackV4,
1002                selected: false,
1003            },
1004            IfSelection {
1005                if_kind: IfKind::LoopbackV6,
1006                selected: false,
1007            },
1008        ];
1009
1010        let status = DaemonStatus::Running;
1011
1012        Self {
1013            my_intfs,
1014            ipv4_sock,
1015            ipv6_sock,
1016            // poll_ids: HashMap::new(),
1017            my_services: HashMap::new(),
1018            cache: DnsCache::new(),
1019            dns_registry_map,
1020            hostname_resolvers: HashMap::new(),
1021            service_queriers: HashMap::new(),
1022            retransmissions: Vec::new(),
1023            counters: HashMap::new(),
1024            poller,
1025            monitors,
1026            service_name_len_max,
1027            ip_check_interval,
1028            if_selections,
1029            signal_sock,
1030            timers,
1031            status,
1032            pending_resolves: HashSet::new(),
1033            resolved: HashSet::new(),
1034            multicast_loop_v4: true,
1035            multicast_loop_v6: true,
1036            accept_unsolicited: false,
1037
1038            #[cfg(test)]
1039            test_down_interfaces: HashSet::new(),
1040        }
1041    }
1042
1043    /// The main event loop of the daemon thread
1044    ///
1045    /// In each round, it will:
1046    /// 1. select the listening sockets with a timeout.
1047    /// 2. process the incoming packets if any.
1048    /// 3. try_recv on its channel and execute commands.
1049    /// 4. announce its registered services.
1050    /// 5. process retransmissions if any.
1051    fn run(&mut self, receiver: Receiver<Command>) -> Option<Command> {
1052        // Add the daemon's signal socket to the poller.
1053        if let Err(e) = self.poller.registry().register(
1054            &mut self.signal_sock,
1055            mio::Token(SIGNAL_SOCK_EVENT_KEY),
1056            mio::Interest::READABLE,
1057        ) {
1058            debug!("failed to add signal socket to the poller: {}", e);
1059            return None;
1060        }
1061
1062        if let Err(e) = self.poller.registry().register(
1063            &mut self.ipv4_sock,
1064            mio::Token(IPV4_SOCK_EVENT_KEY),
1065            mio::Interest::READABLE,
1066        ) {
1067            debug!("failed to register ipv4 socket: {}", e);
1068            return None;
1069        }
1070
1071        if let Err(e) = self.poller.registry().register(
1072            &mut self.ipv6_sock,
1073            mio::Token(IPV6_SOCK_EVENT_KEY),
1074            mio::Interest::READABLE,
1075        ) {
1076            debug!("failed to register ipv6 socket: {}", e);
1077            return None;
1078        }
1079
1080        // Setup timer for IP checks.
1081        let mut next_ip_check = if self.ip_check_interval > 0 {
1082            current_time_millis() + self.ip_check_interval
1083        } else {
1084            0
1085        };
1086
1087        if next_ip_check > 0 {
1088            self.add_timer(next_ip_check);
1089        }
1090
1091        // Start the run loop.
1092
1093        let mut events = mio::Events::with_capacity(1024);
1094        loop {
1095            let now = current_time_millis();
1096
1097            let earliest_timer = self.peek_earliest_timer();
1098            let timeout = earliest_timer.map(|timer| {
1099                // If `timer` already passed, set `timeout` to be 1ms.
1100                let millis = if timer > now { timer - now } else { 1 };
1101                Duration::from_millis(millis)
1102            });
1103
1104            // Process incoming packets, command events and optional timeout.
1105            events.clear();
1106            match self.poller.poll(&mut events, timeout) {
1107                Ok(_) => self.handle_poller_events(&events),
1108                Err(e) => debug!("failed to select from sockets: {}", e),
1109            }
1110
1111            let now = current_time_millis();
1112
1113            // Remove the timers if already passed.
1114            self.pop_timers_till(now);
1115
1116            // Remove hostname resolvers with expired timeouts.
1117            for hostname in self
1118                .hostname_resolvers
1119                .clone()
1120                .into_iter()
1121                .filter(|(_, (_, timeout))| timeout.map(|t| now >= t).unwrap_or(false))
1122                .map(|(hostname, _)| hostname)
1123            {
1124                trace!("hostname resolver timeout for {}", &hostname);
1125                call_hostname_resolution_listener(
1126                    &self.hostname_resolvers,
1127                    &hostname,
1128                    HostnameResolutionEvent::SearchTimeout(hostname.to_owned()),
1129                );
1130                call_hostname_resolution_listener(
1131                    &self.hostname_resolvers,
1132                    &hostname,
1133                    HostnameResolutionEvent::SearchStopped(hostname.to_owned()),
1134                );
1135                self.hostname_resolvers.remove(&hostname);
1136            }
1137
1138            // process commands from the command channel
1139            while let Ok(command) = receiver.try_recv() {
1140                if matches!(command, Command::Exit(_)) {
1141                    self.status = DaemonStatus::Shutdown;
1142                    return Some(command);
1143                }
1144                self.exec_command(command, false);
1145            }
1146
1147            // check for repeated commands and run them if their time is up.
1148            let mut i = 0;
1149            while i < self.retransmissions.len() {
1150                if now >= self.retransmissions[i].next_time {
1151                    let rerun = self.retransmissions.remove(i);
1152                    self.exec_command(rerun.command, true);
1153                } else {
1154                    i += 1;
1155                }
1156            }
1157
1158            // Refresh cached service records with active queriers
1159            self.refresh_active_services();
1160
1161            // Refresh cached A/AAAA records with active queriers
1162            let mut query_count = 0;
1163            for (hostname, _sender) in self.hostname_resolvers.iter() {
1164                for (hostname, ip_addr) in
1165                    self.cache.refresh_due_hostname_resolutions(hostname).iter()
1166                {
1167                    self.send_query(hostname, ip_address_rr_type(&ip_addr.to_ip_addr()));
1168                    query_count += 1;
1169                }
1170            }
1171
1172            self.increase_counter(Counter::CacheRefreshAddr, query_count);
1173
1174            // check and evict expired records in our cache
1175            let now = current_time_millis();
1176
1177            // Notify service listeners about the expired records.
1178            let expired_services = self.cache.evict_expired_services(now);
1179            if !expired_services.is_empty() {
1180                debug!(
1181                    "run: send {} service removal to listeners",
1182                    expired_services.len()
1183                );
1184                self.notify_service_removal(expired_services);
1185            }
1186
1187            // Notify hostname listeners about the expired records.
1188            let expired_addrs = self.cache.evict_expired_addr(now);
1189            for (hostname, addrs) in expired_addrs {
1190                call_hostname_resolution_listener(
1191                    &self.hostname_resolvers,
1192                    &hostname,
1193                    HostnameResolutionEvent::AddressesRemoved(hostname.clone(), addrs),
1194                );
1195                let instances = self.cache.get_instances_on_host(&hostname);
1196                let instance_set: HashSet<String> = instances.into_iter().collect();
1197                self.resolve_updated_instances(&instance_set);
1198            }
1199
1200            // Send out probing queries.
1201            self.probing_handler();
1202
1203            // check IP changes if next_ip_check is reached.
1204            if now >= next_ip_check && next_ip_check > 0 {
1205                next_ip_check = now + self.ip_check_interval;
1206                self.add_timer(next_ip_check);
1207
1208                self.check_ip_changes();
1209            }
1210        }
1211    }
1212
1213    fn process_set_option(&mut self, daemon_opt: DaemonOption) {
1214        match daemon_opt {
1215            DaemonOption::ServiceNameLenMax(length) => self.service_name_len_max = length,
1216            DaemonOption::IpCheckInterval(interval) => self.ip_check_interval = interval,
1217            DaemonOption::EnableInterface(if_kind) => self.enable_interface(if_kind),
1218            DaemonOption::DisableInterface(if_kind) => self.disable_interface(if_kind),
1219            DaemonOption::MulticastLoopV4(on) => self.set_multicast_loop_v4(on),
1220            DaemonOption::MulticastLoopV6(on) => self.set_multicast_loop_v6(on),
1221            DaemonOption::AcceptUnsolicited(accept) => self.set_accept_unsolicited(accept),
1222            #[cfg(test)]
1223            DaemonOption::TestDownInterface(ifname) => {
1224                self.test_down_interfaces.insert(ifname);
1225            }
1226            #[cfg(test)]
1227            DaemonOption::TestUpInterface(ifname) => {
1228                self.test_down_interfaces.remove(&ifname);
1229            }
1230        }
1231    }
1232
1233    fn enable_interface(&mut self, kinds: Vec<IfKind>) {
1234        debug!("enable_interface: {:?}", kinds);
1235        for if_kind in kinds {
1236            self.if_selections.push(IfSelection {
1237                if_kind,
1238                selected: true,
1239            });
1240        }
1241
1242        self.apply_intf_selections(my_ip_interfaces(true));
1243    }
1244
1245    fn disable_interface(&mut self, kinds: Vec<IfKind>) {
1246        debug!("disable_interface: {:?}", kinds);
1247        for if_kind in kinds {
1248            self.if_selections.push(IfSelection {
1249                if_kind,
1250                selected: false,
1251            });
1252        }
1253
1254        self.apply_intf_selections(my_ip_interfaces(true));
1255    }
1256
1257    fn set_multicast_loop_v4(&mut self, on: bool) {
1258        self.multicast_loop_v4 = on;
1259        self.ipv4_sock
1260            .pktinfo
1261            .set_multicast_loop_v4(on)
1262            .map_err(|e| e_fmt!("failed to set multicast loop v4: {}", e))
1263            .unwrap();
1264    }
1265
1266    fn set_multicast_loop_v6(&mut self, on: bool) {
1267        self.multicast_loop_v6 = on;
1268        self.ipv6_sock
1269            .pktinfo
1270            .set_multicast_loop_v6(on)
1271            .map_err(|e| e_fmt!("failed to set multicast loop v6: {}", e))
1272            .unwrap();
1273    }
1274
1275    fn set_accept_unsolicited(&mut self, accept: bool) {
1276        self.accept_unsolicited = accept;
1277    }
1278
1279    fn notify_monitors(&mut self, event: DaemonEvent) {
1280        // Only retain the monitors that are still connected.
1281        self.monitors.retain(|sender| {
1282            if let Err(e) = sender.try_send(event.clone()) {
1283                debug!("notify_monitors: try_send: {}", &e);
1284                if matches!(e, TrySendError::Disconnected(_)) {
1285                    return false; // This monitor is dropped.
1286                }
1287            }
1288            true
1289        });
1290    }
1291
1292    /// Remove `addr` in my services that enabled `addr_auto`.
1293    fn del_addr_in_my_services(&mut self, addr: &IpAddr) {
1294        for (_, service_info) in self.my_services.iter_mut() {
1295            if service_info.is_addr_auto() {
1296                service_info.remove_ipaddr(addr);
1297            }
1298        }
1299    }
1300
1301    fn add_timer(&mut self, next_time: u64) {
1302        self.timers.push(Reverse(next_time));
1303    }
1304
1305    fn peek_earliest_timer(&self) -> Option<u64> {
1306        self.timers.peek().map(|Reverse(v)| *v)
1307    }
1308
1309    fn _pop_earliest_timer(&mut self) -> Option<u64> {
1310        self.timers.pop().map(|Reverse(v)| v)
1311    }
1312
1313    /// Pop all timers that are already passed till `now`.
1314    fn pop_timers_till(&mut self, now: u64) {
1315        while let Some(Reverse(v)) = self.timers.peek() {
1316            if *v > now {
1317                break;
1318            }
1319            self.timers.pop();
1320        }
1321    }
1322
1323    /// Apply all selections to `interfaces` and return the selected addresses.
1324    fn selected_addrs(&self, interfaces: Vec<Interface>) -> HashSet<IpAddr> {
1325        let intf_count = interfaces.len();
1326        let mut intf_selections = vec![true; intf_count];
1327
1328        // apply if_selections
1329        for selection in self.if_selections.iter() {
1330            // Mark the interfaces for this selection.
1331            for i in 0..intf_count {
1332                if selection.if_kind.matches(&interfaces[i]) {
1333                    intf_selections[i] = selection.selected;
1334                }
1335            }
1336        }
1337
1338        let mut selected_addrs = HashSet::new();
1339        for i in 0..intf_count {
1340            if intf_selections[i] {
1341                selected_addrs.insert(interfaces[i].addr.ip());
1342            }
1343        }
1344
1345        selected_addrs
1346    }
1347
1348    /// Apply all selections to `interfaces`.
1349    ///
1350    /// For any interface, add it if selected but not bound yet,
1351    /// delete it if not selected but still bound.
1352    fn apply_intf_selections(&mut self, interfaces: Vec<Interface>) {
1353        // By default, we enable all interfaces.
1354        let intf_count = interfaces.len();
1355        let mut intf_selections = vec![true; intf_count];
1356
1357        // apply if_selections
1358        for selection in self.if_selections.iter() {
1359            // Mark the interfaces for this selection.
1360            for i in 0..intf_count {
1361                if selection.if_kind.matches(&interfaces[i]) {
1362                    intf_selections[i] = selection.selected;
1363                }
1364            }
1365        }
1366
1367        // Update `my_intfs` based on the selections.
1368        for (idx, intf) in interfaces.into_iter().enumerate() {
1369            if intf_selections[idx] {
1370                // Add the interface
1371                self.add_interface(intf);
1372            } else {
1373                // Remove the interface
1374                self.del_interface(&intf);
1375            }
1376        }
1377    }
1378
1379    fn del_ip(&mut self, ip: IpAddr) {
1380        self.del_addr_in_my_services(&ip);
1381        self.notify_monitors(DaemonEvent::IpDel(ip));
1382    }
1383
1384    /// Check for IP changes and update [my_intfs] as needed.
1385    fn check_ip_changes(&mut self) {
1386        // Get the current interfaces.
1387        let my_ifaddrs = my_ip_interfaces(true);
1388
1389        #[cfg(test)]
1390        let my_ifaddrs: Vec<_> = my_ifaddrs
1391            .into_iter()
1392            .filter(|intf| !self.test_down_interfaces.contains(&intf.name))
1393            .collect();
1394
1395        let ifaddrs_map: HashMap<u32, Vec<&IfAddr>> =
1396            my_ifaddrs.iter().fold(HashMap::new(), |mut acc, intf| {
1397                let if_index = intf.index.unwrap_or(0);
1398                acc.entry(if_index).or_default().push(&intf.addr);
1399                acc
1400            });
1401
1402        let mut deleted_intfs = Vec::new();
1403        let mut deleted_ips = Vec::new();
1404
1405        for (if_index, my_intf) in self.my_intfs.iter_mut() {
1406            let mut last_ipv4 = None;
1407            let mut last_ipv6 = None;
1408
1409            if let Some(current_addrs) = ifaddrs_map.get(if_index) {
1410                my_intf.addrs.retain(|addr| {
1411                    if current_addrs.contains(&addr) {
1412                        true
1413                    } else {
1414                        match addr.ip() {
1415                            IpAddr::V4(ipv4) => last_ipv4 = Some(ipv4),
1416                            IpAddr::V6(ipv6) => last_ipv6 = Some(ipv6),
1417                        }
1418                        deleted_ips.push(addr.ip());
1419                        false
1420                    }
1421                });
1422                if my_intf.addrs.is_empty() {
1423                    deleted_intfs.push((*if_index, last_ipv4, last_ipv6))
1424                }
1425            } else {
1426                // If it does not exist, remove the interface.
1427                debug!(
1428                    "check_ip_changes: interface {} ({}) no longer exists, removing",
1429                    my_intf.name, if_index
1430                );
1431                for addr in my_intf.addrs.iter() {
1432                    match addr.ip() {
1433                        IpAddr::V4(ipv4) => last_ipv4 = Some(ipv4),
1434                        IpAddr::V6(ipv6) => last_ipv6 = Some(ipv6),
1435                    }
1436                    deleted_ips.push(addr.ip())
1437                }
1438                deleted_intfs.push((*if_index, last_ipv4, last_ipv6));
1439            }
1440        }
1441
1442        if !deleted_ips.is_empty() || !deleted_intfs.is_empty() {
1443            debug!(
1444                "check_ip_changes: {} deleted ips {} deleted intfs",
1445                deleted_ips.len(),
1446                deleted_intfs.len()
1447            );
1448        }
1449
1450        for ip in deleted_ips {
1451            self.del_ip(ip);
1452        }
1453
1454        for (if_index, last_ipv4, last_ipv6) in deleted_intfs {
1455            let Some(my_intf) = self.my_intfs.remove(&if_index) else {
1456                continue;
1457            };
1458
1459            if let Some(ipv4) = last_ipv4 {
1460                debug!("leave multicast for {ipv4}");
1461                if let Err(e) = self
1462                    .ipv4_sock
1463                    .pktinfo
1464                    .leave_multicast_v4(&GROUP_ADDR_V4, &ipv4)
1465                {
1466                    debug!("leave multicast group for addr {ipv4}: {e}");
1467                }
1468            }
1469
1470            if let Some(ipv6) = last_ipv6 {
1471                debug!("leave multicast for {ipv6}");
1472                if let Err(e) = self
1473                    .ipv6_sock
1474                    .pktinfo
1475                    .leave_multicast_v6(&GROUP_ADDR_V6, my_intf.index)
1476                {
1477                    debug!("leave multicast group for IPv6: {ipv6}: {e}");
1478                }
1479            }
1480
1481            // Remove cache records for this interface.
1482            let intf_id = InterfaceId {
1483                name: my_intf.name.to_string(),
1484                index: my_intf.index,
1485            };
1486            let removed_instances = self.cache.remove_records_on_intf(intf_id);
1487            self.notify_service_removal(removed_instances);
1488        }
1489
1490        // Add newly found interfaces only if in our selections.
1491        self.apply_intf_selections(my_ifaddrs);
1492    }
1493
1494    fn del_interface(&mut self, intf: &Interface) {
1495        let if_index = intf.index.unwrap_or(0);
1496        trace!(
1497            "del_interface: {} ({if_index}) addr {}",
1498            intf.name,
1499            intf.ip()
1500        );
1501
1502        let Some(my_intf) = self.my_intfs.get_mut(&if_index) else {
1503            debug!("del_interface: interface {} not found", intf.name);
1504            return;
1505        };
1506
1507        let mut ip_removed = false;
1508
1509        if my_intf.addrs.remove(&intf.addr) {
1510            ip_removed = true;
1511
1512            match intf.addr.ip() {
1513                IpAddr::V4(ipv4) => {
1514                    if my_intf.next_ifaddr_v4().is_none() {
1515                        if let Err(e) = self
1516                            .ipv4_sock
1517                            .pktinfo
1518                            .leave_multicast_v4(&GROUP_ADDR_V4, &ipv4)
1519                        {
1520                            debug!("leave multicast group for addr {ipv4}: {e}");
1521                        }
1522                    }
1523                }
1524
1525                IpAddr::V6(ipv6) => {
1526                    if my_intf.next_ifaddr_v6().is_none() {
1527                        if let Err(e) = self
1528                            .ipv6_sock
1529                            .pktinfo
1530                            .leave_multicast_v6(&GROUP_ADDR_V6, if_index)
1531                        {
1532                            debug!("leave multicast group for addr {ipv6}: {e}");
1533                        }
1534                    }
1535                }
1536            }
1537
1538            if my_intf.addrs.is_empty() {
1539                // If no more addresses, remove the interface.
1540                debug!("del_interface: removing interface {}", intf.name);
1541                self.my_intfs.remove(&if_index);
1542                self.dns_registry_map.remove(&if_index);
1543                self.cache.remove_addrs_on_disabled_intf(if_index);
1544            }
1545        }
1546
1547        if ip_removed {
1548            // Notify the monitors.
1549            self.notify_monitors(DaemonEvent::IpDel(intf.ip()));
1550            // Remove the interface from my services that enabled `addr_auto`.
1551            self.del_addr_in_my_services(&intf.ip());
1552        }
1553    }
1554
1555    fn add_interface(&mut self, intf: Interface) {
1556        let sock = if intf.ip().is_ipv4() {
1557            &self.ipv4_sock
1558        } else {
1559            &self.ipv6_sock
1560        };
1561
1562        let if_index = intf.index.unwrap_or(0);
1563        let mut new_addr = false;
1564
1565        match self.my_intfs.entry(if_index) {
1566            Entry::Occupied(mut entry) => {
1567                // If intf has a new address, add it to the existing interface.
1568                let my_intf = entry.get_mut();
1569                if !my_intf.addrs.contains(&intf.addr) {
1570                    if let Err(e) = join_multicast_group(&sock.pktinfo, &intf) {
1571                        debug!("add_interface: socket_config {}: {e}", &intf.name);
1572                    }
1573                    my_intf.addrs.insert(intf.addr.clone());
1574                    new_addr = true;
1575                }
1576            }
1577            Entry::Vacant(entry) => {
1578                if let Err(e) = join_multicast_group(&sock.pktinfo, &intf) {
1579                    debug!("add_interface: socket_config {}: {e}. Skipped.", &intf.name);
1580                    return;
1581                }
1582
1583                new_addr = true;
1584                let new_intf = MyIntf {
1585                    name: intf.name.clone(),
1586                    index: if_index,
1587                    addrs: HashSet::from([intf.addr.clone()]),
1588                };
1589                entry.insert(new_intf);
1590            }
1591        }
1592
1593        if !new_addr {
1594            trace!("add_interface: interface {} already exists", &intf.name);
1595            return;
1596        }
1597
1598        debug!("add new interface {}: {}", intf.name, intf.ip());
1599
1600        let Some(my_intf) = self.my_intfs.get(&if_index) else {
1601            debug!("add_interface: cannot find if_index {if_index}");
1602            return;
1603        };
1604
1605        let dns_registry = match self.dns_registry_map.get_mut(&if_index) {
1606            Some(registry) => registry,
1607            None => self
1608                .dns_registry_map
1609                .entry(if_index)
1610                .or_insert_with(DnsRegistry::new),
1611        };
1612
1613        for (_, service_info) in self.my_services.iter_mut() {
1614            if service_info.is_addr_auto() {
1615                let new_ip = intf.ip();
1616                service_info.insert_ipaddr(new_ip);
1617
1618                if announce_service_on_intf(dns_registry, service_info, my_intf, &sock.pktinfo) {
1619                    debug!(
1620                        "Announce service {} on {}",
1621                        service_info.get_fullname(),
1622                        intf.ip()
1623                    );
1624                    service_info.set_status(if_index, ServiceStatus::Announced);
1625                } else {
1626                    for timer in dns_registry.new_timers.drain(..) {
1627                        self.timers.push(Reverse(timer));
1628                    }
1629                    service_info.set_status(if_index, ServiceStatus::Probing);
1630                }
1631            }
1632        }
1633
1634        // As we added a new interface, we want to execute all active "Browse" reruns now.
1635        let mut browse_reruns = Vec::new();
1636        let mut i = 0;
1637        while i < self.retransmissions.len() {
1638            if matches!(self.retransmissions[i].command, Command::Browse(..)) {
1639                browse_reruns.push(self.retransmissions.remove(i));
1640            } else {
1641                i += 1;
1642            }
1643        }
1644
1645        for rerun in browse_reruns {
1646            self.exec_command(rerun.command, true);
1647        }
1648
1649        // Notify the monitors.
1650        self.notify_monitors(DaemonEvent::IpAdd(intf.ip()));
1651    }
1652
1653    /// Registers a service.
1654    ///
1655    /// RFC 6762 section 8.3.
1656    /// ...the Multicast DNS responder MUST send
1657    ///    an unsolicited Multicast DNS response containing, in the Answer
1658    ///    Section, all of its newly registered resource records
1659    ///
1660    /// Zeroconf will then respond to requests for information about this service.
1661    fn register_service(&mut self, mut info: ServiceInfo) {
1662        // Check the service name length.
1663        if let Err(e) = check_service_name_length(info.get_type(), self.service_name_len_max) {
1664            debug!("check_service_name_length: {}", &e);
1665            self.notify_monitors(DaemonEvent::Error(e));
1666            return;
1667        }
1668
1669        if info.is_addr_auto() {
1670            let selected_addrs = self.selected_addrs(my_ip_interfaces(true));
1671            for addr in selected_addrs {
1672                info.insert_ipaddr(addr);
1673            }
1674        }
1675
1676        debug!("register service {:?}", &info);
1677
1678        let outgoing_addrs = self.send_unsolicited_response(&mut info);
1679        if !outgoing_addrs.is_empty() {
1680            self.notify_monitors(DaemonEvent::Announce(
1681                info.get_fullname().to_string(),
1682                format!("{:?}", &outgoing_addrs),
1683            ));
1684        }
1685
1686        // The key has to be lower case letter as DNS record name is case insensitive.
1687        // The info will have the original name.
1688        let service_fullname = info.get_fullname().to_lowercase();
1689        self.my_services.insert(service_fullname, info);
1690    }
1691
1692    /// Sends out announcement of `info` on every valid interface.
1693    /// Returns the list of interface IPs that sent out the announcement.
1694    fn send_unsolicited_response(&mut self, info: &mut ServiceInfo) -> Vec<IpAddr> {
1695        let mut outgoing_addrs = Vec::new();
1696        let mut outgoing_intfs = HashSet::new();
1697
1698        for (if_index, intf) in self.my_intfs.iter() {
1699            let dns_registry = match self.dns_registry_map.get_mut(if_index) {
1700                Some(registry) => registry,
1701                None => self
1702                    .dns_registry_map
1703                    .entry(*if_index)
1704                    .or_insert_with(DnsRegistry::new),
1705            };
1706
1707            let mut announced = false;
1708
1709            // IPv4
1710            if announce_service_on_intf(dns_registry, info, intf, &self.ipv4_sock.pktinfo) {
1711                for addr in intf.addrs.iter().filter(|a| a.ip().is_ipv4()) {
1712                    outgoing_addrs.push(addr.ip());
1713                }
1714                outgoing_intfs.insert(intf.index);
1715
1716                debug!(
1717                    "Announce service IPv4 {} on {}",
1718                    info.get_fullname(),
1719                    intf.name
1720                );
1721                announced = true;
1722            }
1723
1724            if announce_service_on_intf(dns_registry, info, intf, &self.ipv6_sock.pktinfo) {
1725                for addr in intf.addrs.iter().filter(|a| a.ip().is_ipv6()) {
1726                    outgoing_addrs.push(addr.ip());
1727                }
1728                outgoing_intfs.insert(intf.index);
1729
1730                debug!(
1731                    "Announce service IPv6 {} on {}",
1732                    info.get_fullname(),
1733                    intf.name
1734                );
1735                announced = true;
1736            }
1737
1738            if announced {
1739                info.set_status(intf.index, ServiceStatus::Announced);
1740            } else {
1741                for timer in dns_registry.new_timers.drain(..) {
1742                    self.timers.push(Reverse(timer));
1743                }
1744                info.set_status(*if_index, ServiceStatus::Probing);
1745            }
1746        }
1747
1748        // RFC 6762 section 8.3.
1749        // ..The Multicast DNS responder MUST send at least two unsolicited
1750        //    responses, one second apart.
1751        let next_time = current_time_millis() + 1000;
1752        for if_index in outgoing_intfs {
1753            self.add_retransmission(
1754                next_time,
1755                Command::RegisterResend(info.get_fullname().to_string(), if_index),
1756            );
1757        }
1758
1759        outgoing_addrs
1760    }
1761
1762    /// Send probings or finish them if expired. Notify waiting services.
1763    fn probing_handler(&mut self) {
1764        let now = current_time_millis();
1765
1766        for (if_index, intf) in self.my_intfs.iter() {
1767            let Some(dns_registry) = self.dns_registry_map.get_mut(if_index) else {
1768                continue;
1769            };
1770
1771            let (out, expired_probes) = check_probing(dns_registry, &mut self.timers, now);
1772
1773            // send probing.
1774            if !out.questions().is_empty() {
1775                trace!("sending out probing of questions: {:?}", out.questions());
1776                send_dns_outgoing(&out, intf, &self.ipv4_sock.pktinfo);
1777                send_dns_outgoing(&out, intf, &self.ipv6_sock.pktinfo);
1778            }
1779
1780            // For finished probes, wake up services that are waiting for the probes.
1781            let waiting_services =
1782                handle_expired_probes(expired_probes, &intf.name, dns_registry, &mut self.monitors);
1783
1784            for service_name in waiting_services {
1785                // service names are lowercase
1786                if let Some(info) = self.my_services.get_mut(&service_name.to_lowercase()) {
1787                    if info.get_status(*if_index) == ServiceStatus::Announced {
1788                        debug!("service {} already announced", info.get_fullname());
1789                        continue;
1790                    }
1791
1792                    let announced_v4 =
1793                        announce_service_on_intf(dns_registry, info, intf, &self.ipv4_sock.pktinfo);
1794                    let announced_v6 =
1795                        announce_service_on_intf(dns_registry, info, intf, &self.ipv6_sock.pktinfo);
1796
1797                    if announced_v4 || announced_v6 {
1798                        let next_time = now + 1000;
1799                        let command =
1800                            Command::RegisterResend(info.get_fullname().to_string(), *if_index);
1801                        self.retransmissions.push(ReRun { next_time, command });
1802                        self.timers.push(Reverse(next_time));
1803
1804                        let fullname = match dns_registry.name_changes.get(&service_name) {
1805                            Some(new_name) => new_name.to_string(),
1806                            None => service_name.to_string(),
1807                        };
1808
1809                        let mut hostname = info.get_hostname();
1810                        if let Some(new_name) = dns_registry.name_changes.get(hostname) {
1811                            hostname = new_name;
1812                        }
1813
1814                        debug!("wake up: announce service {} on {}", fullname, intf.name);
1815                        notify_monitors(
1816                            &mut self.monitors,
1817                            DaemonEvent::Announce(fullname, format!("{}:{}", hostname, &intf.name)),
1818                        );
1819
1820                        info.set_status(*if_index, ServiceStatus::Announced);
1821                    }
1822                }
1823            }
1824        }
1825    }
1826
1827    fn unregister_service(
1828        &self,
1829        info: &ServiceInfo,
1830        intf: &MyIntf,
1831        sock: &PktInfoUdpSocket,
1832    ) -> Vec<u8> {
1833        let is_ipv4 = sock.domain() == Domain::IPV4;
1834
1835        let mut out = DnsOutgoing::new(FLAGS_QR_RESPONSE | FLAGS_AA);
1836        out.add_answer_at_time(
1837            DnsPointer::new(
1838                info.get_type(),
1839                RRType::PTR,
1840                CLASS_IN,
1841                0,
1842                info.get_fullname().to_string(),
1843            ),
1844            0,
1845        );
1846
1847        if let Some(sub) = info.get_subtype() {
1848            trace!("Adding subdomain {}", sub);
1849            out.add_answer_at_time(
1850                DnsPointer::new(
1851                    sub,
1852                    RRType::PTR,
1853                    CLASS_IN,
1854                    0,
1855                    info.get_fullname().to_string(),
1856                ),
1857                0,
1858            );
1859        }
1860
1861        out.add_answer_at_time(
1862            DnsSrv::new(
1863                info.get_fullname(),
1864                CLASS_IN | CLASS_CACHE_FLUSH,
1865                0,
1866                info.get_priority(),
1867                info.get_weight(),
1868                info.get_port(),
1869                info.get_hostname().to_string(),
1870            ),
1871            0,
1872        );
1873        out.add_answer_at_time(
1874            DnsTxt::new(
1875                info.get_fullname(),
1876                CLASS_IN | CLASS_CACHE_FLUSH,
1877                0,
1878                info.generate_txt(),
1879            ),
1880            0,
1881        );
1882
1883        let if_addrs = if is_ipv4 {
1884            info.get_addrs_on_my_intf_v4(intf)
1885        } else {
1886            info.get_addrs_on_my_intf_v6(intf)
1887        };
1888
1889        if if_addrs.is_empty() {
1890            return vec![];
1891        }
1892
1893        for address in if_addrs {
1894            out.add_answer_at_time(
1895                DnsAddress::new(
1896                    info.get_hostname(),
1897                    ip_address_rr_type(&address),
1898                    CLASS_IN | CLASS_CACHE_FLUSH,
1899                    0,
1900                    address,
1901                    intf.into(),
1902                ),
1903                0,
1904            );
1905        }
1906
1907        // Only (at most) one packet is expected to be sent out.
1908        send_dns_outgoing(&out, intf, sock)
1909            .into_iter()
1910            .next()
1911            .unwrap_or_default()
1912    }
1913
1914    /// Binds a channel `listener` to querying mDNS hostnames.
1915    ///
1916    /// If there is already a `listener`, it will be updated, i.e. overwritten.
1917    fn add_hostname_resolver(
1918        &mut self,
1919        hostname: String,
1920        listener: Sender<HostnameResolutionEvent>,
1921        timeout: Option<u64>,
1922    ) {
1923        let real_timeout = timeout.map(|t| current_time_millis() + t);
1924        self.hostname_resolvers
1925            .insert(hostname.to_lowercase(), (listener, real_timeout));
1926        if let Some(t) = real_timeout {
1927            self.add_timer(t);
1928        }
1929    }
1930
1931    /// Sends a multicast query for `name` with `qtype`.
1932    fn send_query(&self, name: &str, qtype: RRType) {
1933        self.send_query_vec(&[(name, qtype)]);
1934    }
1935
1936    /// Sends out a list of `questions` (i.e. DNS questions) via multicast.
1937    fn send_query_vec(&self, questions: &[(&str, RRType)]) {
1938        trace!("Sending query questions: {:?}", questions);
1939        let mut out = DnsOutgoing::new(FLAGS_QR_QUERY);
1940        let now = current_time_millis();
1941
1942        for (name, qtype) in questions {
1943            out.add_question(name, *qtype);
1944
1945            for record in self.cache.get_known_answers(name, *qtype, now) {
1946                /*
1947                RFC 6762 section 7.1: https://datatracker.ietf.org/doc/html/rfc6762#section-7.1
1948                ...
1949                    When a Multicast DNS querier sends a query to which it already knows
1950                    some answers, it populates the Answer Section of the DNS query
1951                    message with those answers.
1952                 */
1953                trace!("add known answer: {:?}", record.record);
1954                let mut new_record = record.record.clone();
1955                new_record.get_record_mut().update_ttl(now);
1956                out.add_answer_box(new_record);
1957            }
1958        }
1959
1960        for (_, intf) in self.my_intfs.iter() {
1961            send_dns_outgoing(&out, intf, &self.ipv4_sock.pktinfo);
1962            send_dns_outgoing(&out, intf, &self.ipv6_sock.pktinfo);
1963        }
1964    }
1965
1966    /// Reads one UDP datagram from the socket of `intf`.
1967    ///
1968    /// Returns false if failed to receive a packet,
1969    /// otherwise returns true.
1970    fn handle_read(&mut self, event_key: usize) -> bool {
1971        let sock = match event_key {
1972            IPV4_SOCK_EVENT_KEY => &mut self.ipv4_sock,
1973            IPV6_SOCK_EVENT_KEY => &mut self.ipv6_sock,
1974            _ => {
1975                debug!("handle_read: unknown token {}", event_key);
1976                return false;
1977            }
1978        };
1979        let mut buf = vec![0u8; MAX_MSG_ABSOLUTE];
1980
1981        // Read the next mDNS UDP datagram.
1982        //
1983        // If the datagram is larger than `buf`, excess bytes may or may not
1984        // be truncated by the socket layer depending on the platform's libc.
1985        // In any case, such large datagram will not be decoded properly and
1986        // this function should return false but should not crash.
1987        let (sz, pktinfo) = match sock.pktinfo.recv(&mut buf) {
1988            Ok(sz) => sz,
1989            Err(e) => {
1990                if e.kind() != std::io::ErrorKind::WouldBlock {
1991                    debug!("listening socket read failed: {}", e);
1992                }
1993                return false;
1994            }
1995        };
1996
1997        // Find the interface that received the packet.
1998        let pkt_if_index = pktinfo.if_index as u32;
1999        let Some(my_intf) = self.my_intfs.get(&pkt_if_index) else {
2000            debug!(
2001                "handle_read: no interface found for pktinfo if_index: {}",
2002                pktinfo.if_index
2003            );
2004            return true; // We still return true to indicate that we read something.
2005        };
2006
2007        buf.truncate(sz); // reduce potential processing errors
2008
2009        match DnsIncoming::new(buf, my_intf.into()) {
2010            Ok(msg) => {
2011                if msg.is_query() {
2012                    self.handle_query(msg, pkt_if_index, event_key == IPV4_SOCK_EVENT_KEY);
2013                } else if msg.is_response() {
2014                    self.handle_response(msg, pkt_if_index);
2015                } else {
2016                    debug!("Invalid message: not query and not response");
2017                }
2018            }
2019            Err(e) => debug!("Invalid incoming DNS message: {}", e),
2020        }
2021
2022        true
2023    }
2024
2025    /// Returns true, if sent query. Returns false if SRV already exists.
2026    fn query_unresolved(&mut self, instance: &str) -> bool {
2027        if !valid_instance_name(instance) {
2028            trace!("instance name {} not valid", instance);
2029            return false;
2030        }
2031
2032        if let Some(records) = self.cache.get_srv(instance) {
2033            for record in records {
2034                if let Some(srv) = record.record.any().downcast_ref::<DnsSrv>() {
2035                    if self.cache.get_addr(srv.host()).is_none() {
2036                        self.send_query_vec(&[(srv.host(), RRType::A), (srv.host(), RRType::AAAA)]);
2037                        return true;
2038                    }
2039                }
2040            }
2041        } else {
2042            self.send_query(instance, RRType::ANY);
2043            return true;
2044        }
2045
2046        false
2047    }
2048
2049    /// Checks if `ty_domain` has records in the cache. If yes, sends the
2050    /// cached records via `sender`.
2051    fn query_cache_for_service(
2052        &mut self,
2053        ty_domain: &str,
2054        sender: &Sender<ServiceEvent>,
2055        now: u64,
2056    ) {
2057        let mut resolved: HashSet<String> = HashSet::new();
2058        let mut unresolved: HashSet<String> = HashSet::new();
2059
2060        if let Some(records) = self.cache.get_ptr(ty_domain) {
2061            for record in records.iter().filter(|r| !r.record.expires_soon(now)) {
2062                if let Some(ptr) = record.record.any().downcast_ref::<DnsPointer>() {
2063                    let mut new_event = None;
2064                    match self.resolve_service_from_cache(ty_domain, ptr.alias()) {
2065                        Ok(resolved_service) => {
2066                            if resolved_service.is_valid() {
2067                                debug!("Resolved service from cache: {}", ptr.alias());
2068                                new_event =
2069                                    Some(ServiceEvent::ServiceResolved(Box::new(resolved_service)));
2070                            } else {
2071                                debug!("Resolved service is not valid: {}", ptr.alias());
2072                            }
2073                        }
2074                        Err(err) => {
2075                            debug!("Error while resolving service from cache: {}", err);
2076                            continue;
2077                        }
2078                    }
2079
2080                    match sender.send(ServiceEvent::ServiceFound(
2081                        ty_domain.to_string(),
2082                        ptr.alias().to_string(),
2083                    )) {
2084                        Ok(()) => debug!("sent service found {}", ptr.alias()),
2085                        Err(e) => {
2086                            debug!("failed to send service found: {}", e);
2087                            continue;
2088                        }
2089                    }
2090
2091                    if let Some(event) = new_event {
2092                        resolved.insert(ptr.alias().to_string());
2093                        match sender.send(event) {
2094                            Ok(()) => debug!("sent service resolved: {}", ptr.alias()),
2095                            Err(e) => debug!("failed to send service resolved: {}", e),
2096                        }
2097                    } else {
2098                        unresolved.insert(ptr.alias().to_string());
2099                    }
2100                }
2101            }
2102        }
2103
2104        for instance in resolved.drain() {
2105            self.pending_resolves.remove(&instance);
2106            self.resolved.insert(instance);
2107        }
2108
2109        for instance in unresolved.drain() {
2110            self.add_pending_resolve(instance);
2111        }
2112    }
2113
2114    /// Checks if `hostname` has records in the cache. If yes, sends the
2115    /// cached records via `sender`.
2116    fn query_cache_for_hostname(
2117        &mut self,
2118        hostname: &str,
2119        sender: Sender<HostnameResolutionEvent>,
2120    ) {
2121        let addresses_map = self.cache.get_addresses_for_host(hostname);
2122        for (name, addresses) in addresses_map {
2123            match sender.send(HostnameResolutionEvent::AddressesFound(name, addresses)) {
2124                Ok(()) => trace!("sent hostname addresses found"),
2125                Err(e) => debug!("failed to send hostname addresses found: {}", e),
2126            }
2127        }
2128    }
2129
2130    fn add_pending_resolve(&mut self, instance: String) {
2131        if !self.pending_resolves.contains(&instance) {
2132            let next_time = current_time_millis() + RESOLVE_WAIT_IN_MILLIS;
2133            self.add_retransmission(next_time, Command::Resolve(instance.clone(), 1));
2134            self.pending_resolves.insert(instance);
2135        }
2136    }
2137
2138    /// Creates a `ResolvedService` from the cache.
2139    fn resolve_service_from_cache(
2140        &self,
2141        ty_domain: &str,
2142        fullname: &str,
2143    ) -> Result<ResolvedService> {
2144        let now = current_time_millis();
2145        let mut resolved_service = ResolvedService {
2146            ty_domain: ty_domain.to_string(),
2147            sub_ty_domain: None,
2148            fullname: fullname.to_string(),
2149            host: String::new(),
2150            port: 0,
2151            addresses: HashSet::new(),
2152            txt_properties: TxtProperties::new(),
2153        };
2154
2155        // Be sure setting `subtype` if available even when querying for the parent domain.
2156        if let Some(subtype) = self.cache.get_subtype(fullname) {
2157            trace!(
2158                "ty_domain: {} found subtype {} for instance: {}",
2159                ty_domain,
2160                subtype,
2161                fullname
2162            );
2163            if resolved_service.sub_ty_domain.is_none() {
2164                resolved_service.sub_ty_domain = Some(subtype.to_string());
2165            }
2166        }
2167
2168        // resolve SRV record
2169        if let Some(records) = self.cache.get_srv(fullname) {
2170            if let Some(answer) = records.iter().find(|r| !r.record.expires_soon(now)) {
2171                if let Some(dns_srv) = answer.record.any().downcast_ref::<DnsSrv>() {
2172                    resolved_service.host = dns_srv.host().to_string();
2173                    resolved_service.port = dns_srv.port();
2174                }
2175            }
2176        }
2177
2178        // resolve TXT record
2179        if let Some(records) = self.cache.get_txt(fullname) {
2180            if let Some(record) = records.iter().find(|r| !r.record.expires_soon(now)) {
2181                if let Some(dns_txt) = record.record.any().downcast_ref::<DnsTxt>() {
2182                    resolved_service.txt_properties = dns_txt.text().into();
2183                }
2184            }
2185        }
2186
2187        // resolve A and AAAA records
2188        if let Some(records) = self.cache.get_addr(&resolved_service.host) {
2189            for answer in records.iter() {
2190                if let Some(dns_a) = answer.record.any().downcast_ref::<DnsAddress>() {
2191                    if dns_a.expires_soon(now) {
2192                        trace!(
2193                            "Addr expired or expires soon: {}",
2194                            dns_a.address().to_ip_addr()
2195                        );
2196                    } else {
2197                        resolved_service.addresses.insert(dns_a.address());
2198                    }
2199                }
2200            }
2201        }
2202
2203        Ok(resolved_service)
2204    }
2205
2206    fn handle_poller_events(&mut self, events: &mio::Events) {
2207        for ev in events.iter() {
2208            trace!("event received with key {:?}", ev.token());
2209            if ev.token().0 == SIGNAL_SOCK_EVENT_KEY {
2210                // Drain signals as we will drain commands as well.
2211                self.signal_sock_drain();
2212
2213                if let Err(e) = self.poller.registry().reregister(
2214                    &mut self.signal_sock,
2215                    ev.token(),
2216                    mio::Interest::READABLE,
2217                ) {
2218                    debug!("failed to modify poller for signal socket: {}", e);
2219                }
2220                continue; // Next event.
2221            }
2222
2223            // Read until no more packets available.
2224            while self.handle_read(ev.token().0) {}
2225
2226            // we continue to monitor this socket.
2227            if ev.token().0 == IPV4_SOCK_EVENT_KEY {
2228                // Re-register the IPv4 socket for reading.
2229                if let Err(e) = self.poller.registry().reregister(
2230                    &mut self.ipv4_sock,
2231                    ev.token(),
2232                    mio::Interest::READABLE,
2233                ) {
2234                    debug!("modify poller for IPv4 socket: {}", e);
2235                }
2236            } else if ev.token().0 == IPV6_SOCK_EVENT_KEY {
2237                // Re-register the IPv6 socket for reading.
2238                if let Err(e) = self.poller.registry().reregister(
2239                    &mut self.ipv6_sock,
2240                    ev.token(),
2241                    mio::Interest::READABLE,
2242                ) {
2243                    debug!("modify poller for IPv6 socket: {}", e);
2244                }
2245            }
2246        }
2247    }
2248
2249    /// Deal with incoming response packets.  All answers
2250    /// are held in the cache, and listeners are notified.
2251    fn handle_response(&mut self, mut msg: DnsIncoming, if_index: u32) {
2252        let now = current_time_millis();
2253
2254        // remove records that are expired.
2255        let mut record_predicate = |record: &DnsRecordBox| {
2256            if !record.get_record().is_expired(now) {
2257                return true;
2258            }
2259
2260            debug!("record is expired, removing it from cache.");
2261            if self.cache.remove(record) {
2262                // for PTR records, send event to listeners
2263                if let Some(dns_ptr) = record.any().downcast_ref::<DnsPointer>() {
2264                    call_service_listener(
2265                        &self.service_queriers,
2266                        dns_ptr.get_name(),
2267                        ServiceEvent::ServiceRemoved(
2268                            dns_ptr.get_name().to_string(),
2269                            dns_ptr.alias().to_string(),
2270                        ),
2271                    );
2272                }
2273            }
2274            false
2275        };
2276        msg.answers_mut().retain(&mut record_predicate);
2277        msg.authorities_mut().retain(&mut record_predicate);
2278        msg.additionals_mut().retain(&mut record_predicate);
2279
2280        // check possible conflicts and handle them.
2281        self.conflict_handler(&msg, if_index);
2282
2283        // check if the message is for us.
2284        let mut is_for_us = true; // assume it is for us.
2285
2286        // If there are any PTR records in the answers, there should be
2287        // at least one PTR for us. Otherwise, the message is not for us.
2288        // If there are no PTR records at all, assume this message is for us.
2289        for answer in msg.answers() {
2290            if answer.get_type() == RRType::PTR {
2291                if self.service_queriers.contains_key(answer.get_name()) {
2292                    is_for_us = true;
2293                    break; // OK to break: at least one PTR for us.
2294                } else {
2295                    is_for_us = false;
2296                }
2297            } else if answer.get_type() == RRType::A || answer.get_type() == RRType::AAAA {
2298                // If there is a hostname querier for this address, then it is for us.
2299                let answer_lowercase = answer.get_name().to_lowercase();
2300                if self.hostname_resolvers.contains_key(&answer_lowercase) {
2301                    is_for_us = true;
2302                    break; // OK to break: at least one hostname for us.
2303                }
2304            }
2305        }
2306
2307        // if we explicitily want to accept unsolicited responses, we should consider all messages as for us.
2308        if self.accept_unsolicited {
2309            is_for_us = true;
2310        }
2311
2312        /// Represents a DNS record change that involves one service instance.
2313        struct InstanceChange {
2314            ty: RRType,   // The type of DNS record for the instance.
2315            name: String, // The name of the record.
2316        }
2317
2318        // Go through all answers to get the new and updated records.
2319        // For new PTR records, send out ServiceFound immediately. For others,
2320        // collect them into `changes`.
2321        //
2322        // Note: we don't try to identify the update instances based on
2323        // each record immediately as the answers are likely related to each
2324        // other.
2325        let mut changes = Vec::new();
2326        let mut timers = Vec::new();
2327        let Some(my_intf) = self.my_intfs.get(&if_index) else {
2328            return;
2329        };
2330        for record in msg.all_records() {
2331            match self
2332                .cache
2333                .add_or_update(my_intf, record, &mut timers, is_for_us)
2334            {
2335                Some((dns_record, true)) => {
2336                    timers.push(dns_record.record.get_record().get_expire_time());
2337                    timers.push(dns_record.record.get_record().get_refresh_time());
2338
2339                    let ty = dns_record.record.get_type();
2340                    let name = dns_record.record.get_name();
2341
2342                    // Only process PTR that does not expire soon (i.e. TTL > 1).
2343                    if ty == RRType::PTR && dns_record.record.get_record().get_ttl() > 1 {
2344                        if self.service_queriers.contains_key(name) {
2345                            timers.push(dns_record.record.get_record().get_refresh_time());
2346                        }
2347
2348                        // send ServiceFound
2349                        if let Some(dns_ptr) = dns_record.record.any().downcast_ref::<DnsPointer>()
2350                        {
2351                            debug!("calling listener with service found: {name}");
2352                            call_service_listener(
2353                                &self.service_queriers,
2354                                name,
2355                                ServiceEvent::ServiceFound(
2356                                    name.to_string(),
2357                                    dns_ptr.alias().to_string(),
2358                                ),
2359                            );
2360                            changes.push(InstanceChange {
2361                                ty,
2362                                name: dns_ptr.alias().to_string(),
2363                            });
2364                        }
2365                    } else {
2366                        changes.push(InstanceChange {
2367                            ty,
2368                            name: name.to_string(),
2369                        });
2370                    }
2371                }
2372                Some((dns_record, false)) => {
2373                    timers.push(dns_record.record.get_record().get_expire_time());
2374                    timers.push(dns_record.record.get_record().get_refresh_time());
2375                }
2376                _ => {}
2377            }
2378        }
2379
2380        // Add timers for the new records.
2381        for t in timers {
2382            self.add_timer(t);
2383        }
2384
2385        // Go through remaining changes to see if any hostname resolutions were found or updated.
2386        for change in changes
2387            .iter()
2388            .filter(|change| change.ty == RRType::A || change.ty == RRType::AAAA)
2389        {
2390            let addr_map = self.cache.get_addresses_for_host(&change.name);
2391            for (name, addresses) in addr_map {
2392                call_hostname_resolution_listener(
2393                    &self.hostname_resolvers,
2394                    &change.name,
2395                    HostnameResolutionEvent::AddressesFound(name, addresses),
2396                )
2397            }
2398        }
2399
2400        // Identify the instances that need to be "resolved".
2401        let mut updated_instances = HashSet::new();
2402        for update in changes {
2403            match update.ty {
2404                RRType::PTR | RRType::SRV | RRType::TXT => {
2405                    updated_instances.insert(update.name);
2406                }
2407                RRType::A | RRType::AAAA => {
2408                    let instances = self.cache.get_instances_on_host(&update.name);
2409                    updated_instances.extend(instances);
2410                }
2411                _ => {}
2412            }
2413        }
2414
2415        self.resolve_updated_instances(&updated_instances);
2416    }
2417
2418    fn conflict_handler(&mut self, msg: &DnsIncoming, if_index: u32) {
2419        let Some(my_intf) = self.my_intfs.get(&if_index) else {
2420            debug!("handle_response: no intf found for index {if_index}");
2421            return;
2422        };
2423
2424        let Some(dns_registry) = self.dns_registry_map.get_mut(&if_index) else {
2425            return;
2426        };
2427
2428        for answer in msg.answers().iter() {
2429            let mut new_records = Vec::new();
2430
2431            let name = answer.get_name();
2432            let Some(probe) = dns_registry.probing.get_mut(name) else {
2433                continue;
2434            };
2435
2436            // check against possible multicast forwarding
2437            if answer.get_type() == RRType::A || answer.get_type() == RRType::AAAA {
2438                if let Some(answer_addr) = answer.any().downcast_ref::<DnsAddress>() {
2439                    if answer_addr.interface_id.index != if_index {
2440                        debug!(
2441                            "conflict handler: answer addr {:?} not in the subnet of intf {}",
2442                            answer_addr, my_intf.name
2443                        );
2444                        continue;
2445                    }
2446                }
2447
2448                // double check if any other address record matches rrdata,
2449                // as there could be multiple addresses for the same name.
2450                let any_match = probe.records.iter().any(|r| {
2451                    r.get_type() == answer.get_type()
2452                        && r.get_class() == answer.get_class()
2453                        && r.rrdata_match(answer.as_ref())
2454                });
2455                if any_match {
2456                    continue; // no conflict for this answer.
2457                }
2458            }
2459
2460            probe.records.retain(|record| {
2461                if record.get_type() == answer.get_type()
2462                    && record.get_class() == answer.get_class()
2463                    && !record.rrdata_match(answer.as_ref())
2464                {
2465                    debug!(
2466                        "found conflict name: '{name}' record: {}: {} PEER: {}",
2467                        record.get_type(),
2468                        record.rdata_print(),
2469                        answer.rdata_print()
2470                    );
2471
2472                    // create a new name for this record
2473                    // then remove the old record in probing.
2474                    let mut new_record = record.clone();
2475                    let new_name = match record.get_type() {
2476                        RRType::A => hostname_change(name),
2477                        RRType::AAAA => hostname_change(name),
2478                        _ => name_change(name),
2479                    };
2480                    new_record.get_record_mut().set_new_name(new_name);
2481                    new_records.push(new_record);
2482                    return false; // old record is dropped from the probe.
2483                }
2484
2485                true
2486            });
2487
2488            // ?????
2489            // if probe.records.is_empty() {
2490            //     dns_registry.probing.remove(name);
2491            // }
2492
2493            // Probing again with the new names.
2494            let create_time = current_time_millis() + fastrand::u64(0..250);
2495
2496            let waiting_services = probe.waiting_services.clone();
2497
2498            for record in new_records {
2499                if dns_registry.update_hostname(name, record.get_name(), create_time) {
2500                    self.timers.push(Reverse(create_time));
2501                }
2502
2503                // remember the name changes (note: `name` might not be the original, it could be already changed once.)
2504                dns_registry.name_changes.insert(
2505                    record.get_record().get_original_name().to_string(),
2506                    record.get_name().to_string(),
2507                );
2508
2509                let new_probe = match dns_registry.probing.get_mut(record.get_name()) {
2510                    Some(p) => p,
2511                    None => {
2512                        let new_probe = dns_registry
2513                            .probing
2514                            .entry(record.get_name().to_string())
2515                            .or_insert_with(|| {
2516                                debug!("conflict handler: new probe of {}", record.get_name());
2517                                Probe::new(create_time)
2518                            });
2519                        self.timers.push(Reverse(new_probe.next_send));
2520                        new_probe
2521                    }
2522                };
2523
2524                debug!(
2525                    "insert record with new name '{}' {} into probe",
2526                    record.get_name(),
2527                    record.get_type()
2528                );
2529                new_probe.insert_record(record);
2530
2531                new_probe.waiting_services.extend(waiting_services.clone());
2532            }
2533        }
2534    }
2535
2536    /// Resolve the updated (including new) instances.
2537    ///
2538    /// Note: it is possible that more than 1 PTR pointing to the same
2539    /// instance. For example, a regular service type PTR and a sub-type
2540    /// service type PTR can both point to the same service instance.
2541    /// This loop automatically handles the sub-type PTRs.
2542    fn resolve_updated_instances(&mut self, updated_instances: &HashSet<String>) {
2543        let mut resolved: HashSet<String> = HashSet::new();
2544        let mut unresolved: HashSet<String> = HashSet::new();
2545        let mut removed_instances = HashMap::new();
2546
2547        let now = current_time_millis();
2548
2549        for (ty_domain, records) in self.cache.all_ptr().iter() {
2550            if !self.service_queriers.contains_key(ty_domain) {
2551                // No need to resolve if not in our queries.
2552                continue;
2553            }
2554
2555            for record in records.iter().filter(|r| !r.record.expires_soon(now)) {
2556                if let Some(dns_ptr) = record.record.any().downcast_ref::<DnsPointer>() {
2557                    if updated_instances.contains(dns_ptr.alias()) {
2558                        let mut instance_found = false;
2559                        let mut new_event = None;
2560
2561                        if let Ok(resolved) =
2562                            self.resolve_service_from_cache(ty_domain, dns_ptr.alias())
2563                        {
2564                            debug!("resolve_updated_instances: from cache: {}", dns_ptr.alias());
2565                            instance_found = true;
2566                            if resolved.is_valid() {
2567                                new_event = Some(ServiceEvent::ServiceResolved(Box::new(resolved)));
2568                            } else {
2569                                debug!("Resolved service is not valid: {}", dns_ptr.alias());
2570                            }
2571                        }
2572
2573                        if instance_found {
2574                            if let Some(event) = new_event {
2575                                debug!("call queriers to resolve {}", dns_ptr.alias());
2576                                resolved.insert(dns_ptr.alias().to_string());
2577                                call_service_listener(&self.service_queriers, ty_domain, event);
2578                            } else {
2579                                if self.resolved.remove(dns_ptr.alias()) {
2580                                    removed_instances
2581                                        .entry(ty_domain.to_string())
2582                                        .or_insert_with(HashSet::new)
2583                                        .insert(dns_ptr.alias().to_string());
2584                                }
2585                                unresolved.insert(dns_ptr.alias().to_string());
2586                            }
2587                        }
2588                    }
2589                }
2590            }
2591        }
2592
2593        for instance in resolved.drain() {
2594            self.pending_resolves.remove(&instance);
2595            self.resolved.insert(instance);
2596        }
2597
2598        for instance in unresolved.drain() {
2599            self.add_pending_resolve(instance);
2600        }
2601
2602        if !removed_instances.is_empty() {
2603            debug!(
2604                "resolve_updated_instances: removed {}",
2605                &removed_instances.len()
2606            );
2607            self.notify_service_removal(removed_instances);
2608        }
2609    }
2610
2611    /// Handle incoming query packets, figure out whether and what to respond.
2612    fn handle_query(&mut self, msg: DnsIncoming, if_index: u32, is_ipv4: bool) {
2613        let sock = if is_ipv4 {
2614            &self.ipv4_sock
2615        } else {
2616            &self.ipv6_sock
2617        };
2618        let mut out = DnsOutgoing::new(FLAGS_QR_RESPONSE | FLAGS_AA);
2619
2620        // Special meta-query "_services._dns-sd._udp.<Domain>".
2621        // See https://datatracker.ietf.org/doc/html/rfc6763#section-9
2622        const META_QUERY: &str = "_services._dns-sd._udp.local.";
2623
2624        let Some(dns_registry) = self.dns_registry_map.get_mut(&if_index) else {
2625            debug!("missing dns registry for intf {}", if_index);
2626            return;
2627        };
2628
2629        let Some(intf) = self.my_intfs.get(&if_index) else {
2630            return;
2631        };
2632
2633        for question in msg.questions().iter() {
2634            let qtype = question.entry_type();
2635
2636            if qtype == RRType::PTR {
2637                for service in self.my_services.values() {
2638                    if service.get_status(if_index) != ServiceStatus::Announced {
2639                        continue;
2640                    }
2641
2642                    if question.entry_name() == service.get_type()
2643                        || service
2644                            .get_subtype()
2645                            .as_ref()
2646                            .is_some_and(|v| v == question.entry_name())
2647                    {
2648                        add_answer_with_additionals(
2649                            &mut out,
2650                            &msg,
2651                            service,
2652                            intf,
2653                            dns_registry,
2654                            is_ipv4,
2655                        );
2656                    } else if question.entry_name() == META_QUERY {
2657                        let ptr_added = out.add_answer(
2658                            &msg,
2659                            DnsPointer::new(
2660                                question.entry_name(),
2661                                RRType::PTR,
2662                                CLASS_IN,
2663                                service.get_other_ttl(),
2664                                service.get_type().to_string(),
2665                            ),
2666                        );
2667                        if !ptr_added {
2668                            trace!("answer was not added for meta-query {:?}", &question);
2669                        }
2670                    }
2671                }
2672            } else {
2673                // Simultaneous Probe Tiebreaking (RFC 6762 section 8.2)
2674                if qtype == RRType::ANY && msg.num_authorities() > 0 {
2675                    let probe_name = question.entry_name();
2676
2677                    if let Some(probe) = dns_registry.probing.get_mut(probe_name) {
2678                        let now = current_time_millis();
2679
2680                        // Only do tiebreaking if probe already started.
2681                        // This check also helps avoid redo tiebreaking if start time
2682                        // was postponed.
2683                        if probe.start_time < now {
2684                            let incoming_records: Vec<_> = msg
2685                                .authorities()
2686                                .iter()
2687                                .filter(|r| r.get_name() == probe_name)
2688                                .collect();
2689
2690                            probe.tiebreaking(&incoming_records, now, probe_name);
2691                        }
2692                    }
2693                }
2694
2695                if qtype == RRType::A || qtype == RRType::AAAA || qtype == RRType::ANY {
2696                    for service in self.my_services.values() {
2697                        if service.get_status(if_index) != ServiceStatus::Announced {
2698                            continue;
2699                        }
2700
2701                        let service_hostname =
2702                            match dns_registry.name_changes.get(service.get_hostname()) {
2703                                Some(new_name) => new_name,
2704                                None => service.get_hostname(),
2705                            };
2706
2707                        if service_hostname.to_lowercase() == question.entry_name().to_lowercase() {
2708                            let intf_addrs = if is_ipv4 {
2709                                service.get_addrs_on_my_intf_v4(intf)
2710                            } else {
2711                                service.get_addrs_on_my_intf_v6(intf)
2712                            };
2713                            if intf_addrs.is_empty()
2714                                && (qtype == RRType::A || qtype == RRType::AAAA)
2715                            {
2716                                let t = match qtype {
2717                                    RRType::A => "TYPE_A",
2718                                    RRType::AAAA => "TYPE_AAAA",
2719                                    _ => "invalid_type",
2720                                };
2721                                trace!(
2722                                    "Cannot find valid addrs for {} response on intf {:?}",
2723                                    t,
2724                                    &intf
2725                                );
2726                                return;
2727                            }
2728                            for address in intf_addrs {
2729                                out.add_answer(
2730                                    &msg,
2731                                    DnsAddress::new(
2732                                        service_hostname,
2733                                        ip_address_rr_type(&address),
2734                                        CLASS_IN | CLASS_CACHE_FLUSH,
2735                                        service.get_host_ttl(),
2736                                        address,
2737                                        intf.into(),
2738                                    ),
2739                                );
2740                            }
2741                        }
2742                    }
2743                }
2744
2745                let query_name = question.entry_name().to_lowercase();
2746                let service_opt = self
2747                    .my_services
2748                    .iter()
2749                    .find(|(k, _v)| {
2750                        let service_name = match dns_registry.name_changes.get(k.as_str()) {
2751                            Some(new_name) => new_name,
2752                            None => k,
2753                        };
2754                        service_name == &query_name
2755                    })
2756                    .map(|(_, v)| v);
2757
2758                let Some(service) = service_opt else {
2759                    continue;
2760                };
2761
2762                if service.get_status(if_index) != ServiceStatus::Announced {
2763                    continue;
2764                }
2765
2766                let intf_addrs = if is_ipv4 {
2767                    service.get_addrs_on_my_intf_v4(intf)
2768                } else {
2769                    service.get_addrs_on_my_intf_v6(intf)
2770                };
2771                if intf_addrs.is_empty() {
2772                    debug!(
2773                        "Cannot find valid addrs for TYPE_SRV response on intf {:?}",
2774                        &intf
2775                    );
2776                    continue;
2777                }
2778
2779                add_answer_of_service(
2780                    &mut out,
2781                    &msg,
2782                    question.entry_name(),
2783                    service,
2784                    qtype,
2785                    intf_addrs,
2786                );
2787            }
2788        }
2789
2790        if !out.answers_count() > 0 {
2791            out.set_id(msg.id());
2792            send_dns_outgoing(&out, intf, &sock.pktinfo);
2793
2794            let if_name = intf.name.clone();
2795
2796            self.increase_counter(Counter::Respond, 1);
2797            self.notify_monitors(DaemonEvent::Respond(if_name));
2798        }
2799
2800        self.increase_counter(Counter::KnownAnswerSuppression, out.known_answer_count());
2801    }
2802
2803    /// Increases the value of `counter` by `count`.
2804    fn increase_counter(&mut self, counter: Counter, count: i64) {
2805        let key = counter.to_string();
2806        match self.counters.get_mut(&key) {
2807            Some(v) => *v += count,
2808            None => {
2809                self.counters.insert(key, count);
2810            }
2811        }
2812    }
2813
2814    /// Sets the value of `counter` to `count`.
2815    fn set_counter(&mut self, counter: Counter, count: i64) {
2816        let key = counter.to_string();
2817        self.counters.insert(key, count);
2818    }
2819
2820    fn signal_sock_drain(&self) {
2821        let mut signal_buf = [0; 1024];
2822
2823        // This recv is non-blocking as the socket is non-blocking.
2824        while let Ok(sz) = self.signal_sock.recv(&mut signal_buf) {
2825            trace!(
2826                "signal socket recvd: {}",
2827                String::from_utf8_lossy(&signal_buf[0..sz])
2828            );
2829        }
2830    }
2831
2832    fn add_retransmission(&mut self, next_time: u64, command: Command) {
2833        self.retransmissions.push(ReRun { next_time, command });
2834        self.add_timer(next_time);
2835    }
2836
2837    /// Sends service removal event to listeners for expired service records.
2838    /// `expired`: map of service type domain to set of instance names.
2839    fn notify_service_removal(&self, expired: HashMap<String, HashSet<String>>) {
2840        for (ty_domain, sender) in self.service_queriers.iter() {
2841            if let Some(instances) = expired.get(ty_domain) {
2842                for instance_name in instances {
2843                    let event = ServiceEvent::ServiceRemoved(
2844                        ty_domain.to_string(),
2845                        instance_name.to_string(),
2846                    );
2847                    match sender.send(event) {
2848                        Ok(()) => debug!("notify_service_removal: sent ServiceRemoved to listener of {ty_domain}: {instance_name}"),
2849                        Err(e) => debug!("Failed to send event: {}", e),
2850                    }
2851                }
2852            }
2853        }
2854    }
2855
2856    /// The entry point that executes all commands received by the daemon.
2857    ///
2858    /// `repeating`: whether this is a retransmission.
2859    fn exec_command(&mut self, command: Command, repeating: bool) {
2860        trace!("exec_command: {:?} repeating: {}", &command, repeating);
2861        match command {
2862            Command::Browse(ty, next_delay, cache_only, listener) => {
2863                self.exec_command_browse(repeating, ty, next_delay, cache_only, listener);
2864            }
2865
2866            Command::ResolveHostname(hostname, next_delay, listener, timeout) => {
2867                self.exec_command_resolve_hostname(
2868                    repeating, hostname, next_delay, listener, timeout,
2869                );
2870            }
2871
2872            Command::Register(service_info) => {
2873                self.register_service(service_info);
2874                self.increase_counter(Counter::Register, 1);
2875            }
2876
2877            Command::RegisterResend(fullname, intf) => {
2878                trace!("register-resend service: {fullname} on {}", &intf);
2879                self.exec_command_register_resend(fullname, intf);
2880            }
2881
2882            Command::Unregister(fullname, resp_s) => {
2883                trace!("unregister service {} repeat {}", &fullname, &repeating);
2884                self.exec_command_unregister(repeating, fullname, resp_s);
2885            }
2886
2887            Command::UnregisterResend(packet, if_index, is_ipv4) => {
2888                self.exec_command_unregister_resend(packet, if_index, is_ipv4);
2889            }
2890
2891            Command::StopBrowse(ty_domain) => self.exec_command_stop_browse(ty_domain),
2892
2893            Command::StopResolveHostname(hostname) => {
2894                self.exec_command_stop_resolve_hostname(hostname.to_lowercase())
2895            }
2896
2897            Command::Resolve(instance, try_count) => self.exec_command_resolve(instance, try_count),
2898
2899            Command::GetMetrics(resp_s) => self.exec_command_get_metrics(resp_s),
2900
2901            Command::GetStatus(resp_s) => match resp_s.send(self.status.clone()) {
2902                Ok(()) => trace!("Sent status to the client"),
2903                Err(e) => debug!("Failed to send status: {}", e),
2904            },
2905
2906            Command::Monitor(resp_s) => {
2907                self.monitors.push(resp_s);
2908            }
2909
2910            Command::SetOption(daemon_opt) => {
2911                self.process_set_option(daemon_opt);
2912            }
2913
2914            Command::GetOption(resp_s) => {
2915                let val = DaemonOptionVal {
2916                    _service_name_len_max: self.service_name_len_max,
2917                    ip_check_interval: self.ip_check_interval,
2918                };
2919                if let Err(e) = resp_s.send(val) {
2920                    debug!("Failed to send options: {}", e);
2921                }
2922            }
2923
2924            Command::Verify(instance_fullname, timeout) => {
2925                self.exec_command_verify(instance_fullname, timeout, repeating);
2926            }
2927
2928            _ => {
2929                debug!("unexpected command: {:?}", &command);
2930            }
2931        }
2932    }
2933
2934    fn exec_command_get_metrics(&mut self, resp_s: Sender<HashMap<String, i64>>) {
2935        self.set_counter(Counter::CachedPTR, self.cache.ptr_count() as i64);
2936        self.set_counter(Counter::CachedSRV, self.cache.srv_count() as i64);
2937        self.set_counter(Counter::CachedAddr, self.cache.addr_count() as i64);
2938        self.set_counter(Counter::CachedTxt, self.cache.txt_count() as i64);
2939        self.set_counter(Counter::CachedNSec, self.cache.nsec_count() as i64);
2940        self.set_counter(Counter::CachedSubtype, self.cache.subtype_count() as i64);
2941        self.set_counter(Counter::Timer, self.timers.len() as i64);
2942
2943        let dns_registry_probe_count: usize = self
2944            .dns_registry_map
2945            .values()
2946            .map(|r| r.probing.len())
2947            .sum();
2948        self.set_counter(Counter::DnsRegistryProbe, dns_registry_probe_count as i64);
2949
2950        let dns_registry_active_count: usize = self
2951            .dns_registry_map
2952            .values()
2953            .map(|r| r.active.values().map(|a| a.len()).sum::<usize>())
2954            .sum();
2955        self.set_counter(Counter::DnsRegistryActive, dns_registry_active_count as i64);
2956
2957        let dns_registry_timer_count: usize = self
2958            .dns_registry_map
2959            .values()
2960            .map(|r| r.new_timers.len())
2961            .sum();
2962        self.set_counter(Counter::DnsRegistryTimer, dns_registry_timer_count as i64);
2963
2964        let dns_registry_name_change_count: usize = self
2965            .dns_registry_map
2966            .values()
2967            .map(|r| r.name_changes.len())
2968            .sum();
2969        self.set_counter(
2970            Counter::DnsRegistryNameChange,
2971            dns_registry_name_change_count as i64,
2972        );
2973
2974        // Send the metrics to the client.
2975        if let Err(e) = resp_s.send(self.counters.clone()) {
2976            debug!("Failed to send metrics: {}", e);
2977        }
2978    }
2979
2980    fn exec_command_browse(
2981        &mut self,
2982        repeating: bool,
2983        ty: String,
2984        next_delay: u32,
2985        cache_only: bool,
2986        listener: Sender<ServiceEvent>,
2987    ) {
2988        let pretty_addrs: Vec<String> = self
2989            .my_intfs
2990            .iter()
2991            .map(|(if_index, itf)| format!("{} ({if_index})", itf.name))
2992            .collect();
2993
2994        if let Err(e) = listener.send(ServiceEvent::SearchStarted(format!(
2995            "{ty} on {} interfaces [{}]",
2996            pretty_addrs.len(),
2997            pretty_addrs.join(", ")
2998        ))) {
2999            debug!(
3000                "Failed to send SearchStarted({})(repeating:{}): {}",
3001                &ty, repeating, e
3002            );
3003            return;
3004        }
3005
3006        let now = current_time_millis();
3007        if !repeating {
3008            // Binds a `listener` to querying mDNS domain type `ty`.
3009            //
3010            // If there is already a `listener`, it will be updated, i.e. overwritten.
3011            self.service_queriers.insert(ty.clone(), listener.clone());
3012
3013            // if we already have the records in our cache, just send them
3014            self.query_cache_for_service(&ty, &listener, now);
3015        }
3016
3017        if cache_only {
3018            // If cache_only is true, we do not send a query.
3019            match listener.send(ServiceEvent::SearchStopped(ty.clone())) {
3020                Ok(()) => debug!("SearchStopped sent for {}", &ty),
3021                Err(e) => debug!("Failed to send SearchStopped: {}", e),
3022            }
3023            return;
3024        }
3025
3026        self.send_query(&ty, RRType::PTR);
3027        self.increase_counter(Counter::Browse, 1);
3028
3029        let next_time = now + (next_delay * 1000) as u64;
3030        let max_delay = 60 * 60;
3031        let delay = cmp::min(next_delay * 2, max_delay);
3032        self.add_retransmission(next_time, Command::Browse(ty, delay, cache_only, listener));
3033    }
3034
3035    fn exec_command_resolve_hostname(
3036        &mut self,
3037        repeating: bool,
3038        hostname: String,
3039        next_delay: u32,
3040        listener: Sender<HostnameResolutionEvent>,
3041        timeout: Option<u64>,
3042    ) {
3043        let addr_list: Vec<_> = self.my_intfs.iter().collect();
3044        if let Err(e) = listener.send(HostnameResolutionEvent::SearchStarted(format!(
3045            "{} on addrs {:?}",
3046            &hostname, &addr_list
3047        ))) {
3048            debug!(
3049                "Failed to send ResolveStarted({})(repeating:{}): {}",
3050                &hostname, repeating, e
3051            );
3052            return;
3053        }
3054        if !repeating {
3055            self.add_hostname_resolver(hostname.to_owned(), listener.clone(), timeout);
3056            // if we already have the records in our cache, just send them
3057            self.query_cache_for_hostname(&hostname, listener.clone());
3058        }
3059
3060        self.send_query_vec(&[(&hostname, RRType::A), (&hostname, RRType::AAAA)]);
3061        self.increase_counter(Counter::ResolveHostname, 1);
3062
3063        let now = current_time_millis();
3064        let next_time = now + u64::from(next_delay) * 1000;
3065        let max_delay = 60 * 60;
3066        let delay = cmp::min(next_delay * 2, max_delay);
3067
3068        // Only add retransmission if it does not exceed the hostname resolver timeout, if any.
3069        if self
3070            .hostname_resolvers
3071            .get(&hostname)
3072            .and_then(|(_sender, timeout)| *timeout)
3073            .map(|timeout| next_time < timeout)
3074            .unwrap_or(true)
3075        {
3076            self.add_retransmission(
3077                next_time,
3078                Command::ResolveHostname(hostname, delay, listener, None),
3079            );
3080        }
3081    }
3082
3083    fn exec_command_resolve(&mut self, instance: String, try_count: u16) {
3084        let pending_query = self.query_unresolved(&instance);
3085        let max_try = 3;
3086        if pending_query && try_count < max_try {
3087            // Note that if the current try already succeeds, the next retransmission
3088            // will be no-op as the cache has been updated.
3089            let next_time = current_time_millis() + RESOLVE_WAIT_IN_MILLIS;
3090            self.add_retransmission(next_time, Command::Resolve(instance, try_count + 1));
3091        }
3092    }
3093
3094    fn exec_command_unregister(
3095        &mut self,
3096        repeating: bool,
3097        fullname: String,
3098        resp_s: Sender<UnregisterStatus>,
3099    ) {
3100        let response = match self.my_services.remove_entry(&fullname) {
3101            None => {
3102                debug!("unregister: cannot find such service {}", &fullname);
3103                UnregisterStatus::NotFound
3104            }
3105            Some((_k, info)) => {
3106                let mut timers = Vec::new();
3107
3108                for (if_index, intf) in self.my_intfs.iter() {
3109                    let packet = self.unregister_service(&info, intf, &self.ipv4_sock.pktinfo);
3110                    // repeat for one time just in case some peers miss the message
3111                    if !repeating && !packet.is_empty() {
3112                        let next_time = current_time_millis() + 120;
3113                        self.retransmissions.push(ReRun {
3114                            next_time,
3115                            command: Command::UnregisterResend(packet, *if_index, true),
3116                        });
3117                        timers.push(next_time);
3118                    }
3119
3120                    // ipv6
3121
3122                    let packet = self.unregister_service(&info, intf, &self.ipv6_sock.pktinfo);
3123                    if !repeating && !packet.is_empty() {
3124                        let next_time = current_time_millis() + 120;
3125                        self.retransmissions.push(ReRun {
3126                            next_time,
3127                            command: Command::UnregisterResend(packet, *if_index, false),
3128                        });
3129                        timers.push(next_time);
3130                    }
3131                }
3132
3133                for t in timers {
3134                    self.add_timer(t);
3135                }
3136
3137                self.increase_counter(Counter::Unregister, 1);
3138                UnregisterStatus::OK
3139            }
3140        };
3141        if let Err(e) = resp_s.send(response) {
3142            debug!("unregister: failed to send response: {}", e);
3143        }
3144    }
3145
3146    fn exec_command_unregister_resend(&mut self, packet: Vec<u8>, if_index: u32, is_ipv4: bool) {
3147        let Some(intf) = self.my_intfs.get(&if_index) else {
3148            return;
3149        };
3150        let sock = if is_ipv4 {
3151            &self.ipv4_sock.pktinfo
3152        } else {
3153            &self.ipv6_sock.pktinfo
3154        };
3155
3156        let if_addr = if is_ipv4 {
3157            match intf.next_ifaddr_v4() {
3158                Some(addr) => addr,
3159                None => return,
3160            }
3161        } else {
3162            match intf.next_ifaddr_v6() {
3163                Some(addr) => addr,
3164                None => return,
3165            }
3166        };
3167
3168        debug!("UnregisterResend from {:?}", if_addr);
3169        multicast_on_intf(&packet[..], &intf.name, intf.index, if_addr, sock);
3170
3171        self.increase_counter(Counter::UnregisterResend, 1);
3172    }
3173
3174    fn exec_command_stop_browse(&mut self, ty_domain: String) {
3175        match self.service_queriers.remove_entry(&ty_domain) {
3176            None => debug!("StopBrowse: cannot find querier for {}", &ty_domain),
3177            Some((ty, sender)) => {
3178                // Remove pending browse commands in the reruns.
3179                trace!("StopBrowse: removed queryer for {}", &ty);
3180                let mut i = 0;
3181                while i < self.retransmissions.len() {
3182                    if let Command::Browse(t, _, _, _) = &self.retransmissions[i].command {
3183                        if t == &ty {
3184                            self.retransmissions.remove(i);
3185                            trace!("StopBrowse: removed retransmission for {}", &ty);
3186                            continue;
3187                        }
3188                    }
3189                    i += 1;
3190                }
3191
3192                // Remove cache entries.
3193                self.cache.remove_service_type(&ty_domain);
3194
3195                // Notify the client.
3196                match sender.send(ServiceEvent::SearchStopped(ty_domain)) {
3197                    Ok(()) => trace!("Sent SearchStopped to the listener"),
3198                    Err(e) => debug!("Failed to send SearchStopped: {}", e),
3199                }
3200            }
3201        }
3202    }
3203
3204    fn exec_command_stop_resolve_hostname(&mut self, hostname: String) {
3205        if let Some((host, (sender, _timeout))) = self.hostname_resolvers.remove_entry(&hostname) {
3206            // Remove pending resolve commands in the reruns.
3207            trace!("StopResolve: removed queryer for {}", &host);
3208            let mut i = 0;
3209            while i < self.retransmissions.len() {
3210                if let Command::Resolve(t, _) = &self.retransmissions[i].command {
3211                    if t == &host {
3212                        self.retransmissions.remove(i);
3213                        trace!("StopResolve: removed retransmission for {}", &host);
3214                        continue;
3215                    }
3216                }
3217                i += 1;
3218            }
3219
3220            // Notify the client.
3221            match sender.send(HostnameResolutionEvent::SearchStopped(hostname)) {
3222                Ok(()) => trace!("Sent SearchStopped to the listener"),
3223                Err(e) => debug!("Failed to send SearchStopped: {}", e),
3224            }
3225        }
3226    }
3227
3228    fn exec_command_register_resend(&mut self, fullname: String, if_index: u32) {
3229        let Some(info) = self.my_services.get_mut(&fullname) else {
3230            trace!("announce: cannot find such service {}", &fullname);
3231            return;
3232        };
3233
3234        let Some(dns_registry) = self.dns_registry_map.get_mut(&if_index) else {
3235            return;
3236        };
3237
3238        let Some(intf) = self.my_intfs.get(&if_index) else {
3239            return;
3240        };
3241
3242        let announced_v4 =
3243            announce_service_on_intf(dns_registry, info, intf, &self.ipv4_sock.pktinfo);
3244        let announced_v6 =
3245            announce_service_on_intf(dns_registry, info, intf, &self.ipv6_sock.pktinfo);
3246
3247        if announced_v4 || announced_v6 {
3248            let mut hostname = info.get_hostname();
3249            if let Some(new_name) = dns_registry.name_changes.get(hostname) {
3250                hostname = new_name;
3251            }
3252            let service_name = match dns_registry.name_changes.get(&fullname) {
3253                Some(new_name) => new_name.to_string(),
3254                None => fullname,
3255            };
3256
3257            debug!("resend: announce service {service_name} on {}", intf.name);
3258
3259            notify_monitors(
3260                &mut self.monitors,
3261                DaemonEvent::Announce(service_name, format!("{}:{}", hostname, &intf.name)),
3262            );
3263            info.set_status(if_index, ServiceStatus::Announced);
3264        } else {
3265            debug!("register-resend should not fail");
3266        }
3267
3268        self.increase_counter(Counter::RegisterResend, 1);
3269    }
3270
3271    fn exec_command_verify(&mut self, instance: String, timeout: Duration, repeating: bool) {
3272        /*
3273        RFC 6762 section 10.4:
3274        ...
3275        When the cache receives this hint that it should reconfirm some
3276        record, it MUST issue two or more queries for the resource record in
3277        dispute.  If no response is received within ten seconds, then, even
3278        though its TTL may indicate that it is not yet due to expire, that
3279        record SHOULD be promptly flushed from the cache.
3280        */
3281        let now = current_time_millis();
3282        let expire_at = if repeating {
3283            None
3284        } else {
3285            Some(now + timeout.as_millis() as u64)
3286        };
3287
3288        // send query for the resource records.
3289        let record_vec = self.cache.service_verify_queries(&instance, expire_at);
3290
3291        if !record_vec.is_empty() {
3292            let query_vec: Vec<(&str, RRType)> = record_vec
3293                .iter()
3294                .map(|(record, rr_type)| (record.as_str(), *rr_type))
3295                .collect();
3296            self.send_query_vec(&query_vec);
3297
3298            if let Some(new_expire) = expire_at {
3299                self.add_timer(new_expire); // ensure a check for the new expire time.
3300
3301                // schedule a resend 1 second later
3302                self.add_retransmission(now + 1000, Command::Verify(instance, timeout));
3303            }
3304        }
3305    }
3306
3307    /// Refresh cached service records with active queriers
3308    fn refresh_active_services(&mut self) {
3309        let mut query_ptr_count = 0;
3310        let mut query_srv_count = 0;
3311        let mut new_timers = HashSet::new();
3312        let mut query_addr_count = 0;
3313
3314        for (ty_domain, _sender) in self.service_queriers.iter() {
3315            let refreshed_timers = self.cache.refresh_due_ptr(ty_domain);
3316            if !refreshed_timers.is_empty() {
3317                trace!("sending refresh query for PTR: {}", ty_domain);
3318                self.send_query(ty_domain, RRType::PTR);
3319                query_ptr_count += 1;
3320                new_timers.extend(refreshed_timers);
3321            }
3322
3323            let (instances, timers) = self.cache.refresh_due_srv_txt(ty_domain);
3324            for (instance, types) in instances {
3325                trace!("sending refresh query for: {}", &instance);
3326                let query_vec = types
3327                    .into_iter()
3328                    .map(|ty| (instance.as_str(), ty))
3329                    .collect::<Vec<_>>();
3330                self.send_query_vec(&query_vec);
3331                query_srv_count += 1;
3332            }
3333            new_timers.extend(timers);
3334            let (hostnames, timers) = self.cache.refresh_due_hosts(ty_domain);
3335            for hostname in hostnames.iter() {
3336                trace!("sending refresh queries for A and AAAA:  {}", hostname);
3337                self.send_query_vec(&[(hostname, RRType::A), (hostname, RRType::AAAA)]);
3338                query_addr_count += 2;
3339            }
3340            new_timers.extend(timers);
3341        }
3342
3343        for timer in new_timers {
3344            self.add_timer(timer);
3345        }
3346
3347        self.increase_counter(Counter::CacheRefreshPTR, query_ptr_count);
3348        self.increase_counter(Counter::CacheRefreshSrvTxt, query_srv_count);
3349        self.increase_counter(Counter::CacheRefreshAddr, query_addr_count);
3350    }
3351}
3352
3353/// Adds one or more answers of a service for incoming msg and RR entry name.
3354fn add_answer_of_service(
3355    out: &mut DnsOutgoing,
3356    msg: &DnsIncoming,
3357    entry_name: &str,
3358    service: &ServiceInfo,
3359    qtype: RRType,
3360    intf_addrs: Vec<IpAddr>,
3361) {
3362    if qtype == RRType::SRV || qtype == RRType::ANY {
3363        out.add_answer(
3364            msg,
3365            DnsSrv::new(
3366                entry_name,
3367                CLASS_IN | CLASS_CACHE_FLUSH,
3368                service.get_host_ttl(),
3369                service.get_priority(),
3370                service.get_weight(),
3371                service.get_port(),
3372                service.get_hostname().to_string(),
3373            ),
3374        );
3375    }
3376
3377    if qtype == RRType::TXT || qtype == RRType::ANY {
3378        out.add_answer(
3379            msg,
3380            DnsTxt::new(
3381                entry_name,
3382                CLASS_IN | CLASS_CACHE_FLUSH,
3383                service.get_other_ttl(),
3384                service.generate_txt(),
3385            ),
3386        );
3387    }
3388
3389    if qtype == RRType::SRV {
3390        for address in intf_addrs {
3391            out.add_additional_answer(DnsAddress::new(
3392                service.get_hostname(),
3393                ip_address_rr_type(&address),
3394                CLASS_IN | CLASS_CACHE_FLUSH,
3395                service.get_host_ttl(),
3396                address,
3397                InterfaceId::default(),
3398            ));
3399        }
3400    }
3401}
3402
3403/// All possible events sent to the client from the daemon
3404/// regarding service discovery.
3405#[derive(Clone, Debug)]
3406#[non_exhaustive]
3407pub enum ServiceEvent {
3408    /// Started searching for a service type.
3409    SearchStarted(String),
3410
3411    /// Found a specific (service_type, fullname).
3412    ServiceFound(String, String),
3413
3414    /// Resolved a service instance in a ResolvedService struct.
3415    ServiceResolved(Box<ResolvedService>),
3416
3417    /// A service instance (service_type, fullname) was removed.
3418    ServiceRemoved(String, String),
3419
3420    /// Stopped searching for a service type.
3421    SearchStopped(String),
3422}
3423
3424/// All possible events sent to the client from the daemon
3425/// regarding host resolution.
3426#[derive(Clone, Debug)]
3427#[non_exhaustive]
3428pub enum HostnameResolutionEvent {
3429    /// Started searching for the ip address of a hostname.
3430    SearchStarted(String),
3431    /// One or more addresses for a hostname has been found.
3432    AddressesFound(String, HashSet<ScopedIp>),
3433    /// One or more addresses for a hostname has been removed.
3434    AddressesRemoved(String, HashSet<ScopedIp>),
3435    /// The search for the ip address of a hostname has timed out.
3436    SearchTimeout(String),
3437    /// Stopped searching for the ip address of a hostname.
3438    SearchStopped(String),
3439}
3440
3441/// Some notable events from the daemon besides [`ServiceEvent`].
3442/// These events are expected to happen infrequently.
3443#[derive(Clone, Debug)]
3444#[non_exhaustive]
3445pub enum DaemonEvent {
3446    /// Daemon unsolicitly announced a service from an interface.
3447    Announce(String, String),
3448
3449    /// Daemon encountered an error.
3450    Error(Error),
3451
3452    /// Daemon detected a new IP address from the host.
3453    IpAdd(IpAddr),
3454
3455    /// Daemon detected a IP address removed from the host.
3456    IpDel(IpAddr),
3457
3458    /// Daemon resolved a name conflict by changing one of its names.
3459    /// see [DnsNameChange] for more details.
3460    NameChange(DnsNameChange),
3461
3462    /// Send out a multicast response via an interface.
3463    Respond(String),
3464}
3465
3466/// Represents a name change due to a name conflict resolution.
3467/// See [RFC 6762 section 9](https://datatracker.ietf.org/doc/html/rfc6762#section-9)
3468#[derive(Clone, Debug)]
3469pub struct DnsNameChange {
3470    /// The original name set in `ServiceInfo` by the user.
3471    pub original: String,
3472
3473    /// A new name is created by appending a suffix after the original name.
3474    ///
3475    /// - for a service instance name, the suffix is `(N)`, where N starts at 2.
3476    /// - for a host name, the suffix is `-N`, where N starts at 2.
3477    ///
3478    /// For example:
3479    ///
3480    /// - Service name `foo._service-type._udp` becomes `foo (2)._service-type._udp`
3481    /// - Host name `foo.local.` becomes `foo-2.local.`
3482    pub new_name: String,
3483
3484    /// The resource record type
3485    pub rr_type: RRType,
3486
3487    /// The interface where the name conflict and its change happened.
3488    pub intf_name: String,
3489}
3490
3491/// Commands supported by the daemon
3492#[derive(Debug)]
3493enum Command {
3494    /// Browsing for a service type (ty_domain, next_time_delay_in_seconds, channel::sender)
3495    Browse(String, u32, bool, Sender<ServiceEvent>),
3496
3497    /// Resolve a hostname to IP addresses.
3498    ResolveHostname(String, u32, Sender<HostnameResolutionEvent>, Option<u64>), // (hostname, next_time_delay_in_seconds, sender, timeout_in_milliseconds)
3499
3500    /// Register a service
3501    Register(ServiceInfo),
3502
3503    /// Unregister a service
3504    Unregister(String, Sender<UnregisterStatus>), // (fullname)
3505
3506    /// Announce again a service to local network
3507    RegisterResend(String, u32), // (fullname)
3508
3509    /// Resend unregister packet.
3510    UnregisterResend(Vec<u8>, u32, bool), // (packet content, if_index, is_ipv4)
3511
3512    /// Stop browsing a service type
3513    StopBrowse(String), // (ty_domain)
3514
3515    /// Stop resolving a hostname
3516    StopResolveHostname(String), // (hostname)
3517
3518    /// Send query to resolve a service instance.
3519    /// This is used when a PTR record exists but SRV & TXT records are missing.
3520    Resolve(String, u16), // (service_instance_fullname, try_count)
3521
3522    /// Read the current values of the counters
3523    GetMetrics(Sender<Metrics>),
3524
3525    /// Get the current status of the daemon.
3526    GetStatus(Sender<DaemonStatus>),
3527
3528    /// Monitor noticeable events in the daemon.
3529    Monitor(Sender<DaemonEvent>),
3530
3531    SetOption(DaemonOption),
3532
3533    GetOption(Sender<DaemonOptionVal>),
3534
3535    /// Proactively confirm a DNS resource record.
3536    ///
3537    /// The intention is to check if a service name or IP address still valid
3538    /// before its TTL expires.
3539    Verify(String, Duration),
3540
3541    Exit(Sender<DaemonStatus>),
3542}
3543
3544impl fmt::Display for Command {
3545    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
3546        match self {
3547            Self::Browse(_, _, _, _) => write!(f, "Command Browse"),
3548            Self::ResolveHostname(_, _, _, _) => write!(f, "Command ResolveHostname"),
3549            Self::Exit(_) => write!(f, "Command Exit"),
3550            Self::GetStatus(_) => write!(f, "Command GetStatus"),
3551            Self::GetMetrics(_) => write!(f, "Command GetMetrics"),
3552            Self::Monitor(_) => write!(f, "Command Monitor"),
3553            Self::Register(_) => write!(f, "Command Register"),
3554            Self::RegisterResend(_, _) => write!(f, "Command RegisterResend"),
3555            Self::SetOption(_) => write!(f, "Command SetOption"),
3556            Self::GetOption(_) => write!(f, "Command GetOption"),
3557            Self::StopBrowse(_) => write!(f, "Command StopBrowse"),
3558            Self::StopResolveHostname(_) => write!(f, "Command StopResolveHostname"),
3559            Self::Unregister(_, _) => write!(f, "Command Unregister"),
3560            Self::UnregisterResend(_, _, _) => write!(f, "Command UnregisterResend"),
3561            Self::Resolve(_, _) => write!(f, "Command Resolve"),
3562            Self::Verify(_, _) => write!(f, "Command VerifyResource"),
3563        }
3564    }
3565}
3566
3567struct DaemonOptionVal {
3568    _service_name_len_max: u8,
3569    ip_check_interval: u64,
3570}
3571
3572#[derive(Debug)]
3573enum DaemonOption {
3574    ServiceNameLenMax(u8),
3575    IpCheckInterval(u64),
3576    EnableInterface(Vec<IfKind>),
3577    DisableInterface(Vec<IfKind>),
3578    MulticastLoopV4(bool),
3579    MulticastLoopV6(bool),
3580    AcceptUnsolicited(bool),
3581    #[cfg(test)]
3582    TestDownInterface(String),
3583    #[cfg(test)]
3584    TestUpInterface(String),
3585}
3586
3587/// The length of Service Domain name supported in this lib.
3588const DOMAIN_LEN: usize = "._tcp.local.".len();
3589
3590/// Validate the length of "service_name" in a "_<service_name>.<domain_name>." string.
3591fn check_service_name_length(ty_domain: &str, limit: u8) -> Result<()> {
3592    if ty_domain.len() <= DOMAIN_LEN + 1 {
3593        // service name cannot be empty or only '_'.
3594        return Err(e_fmt!("Service type name cannot be empty: {}", ty_domain));
3595    }
3596
3597    let service_name_len = ty_domain.len() - DOMAIN_LEN - 1; // exclude the leading `_`
3598    if service_name_len > limit as usize {
3599        return Err(e_fmt!("Service name length must be <= {} bytes", limit));
3600    }
3601    Ok(())
3602}
3603
3604/// Checks if `name` ends with a valid domain: '._tcp.local.' or '._udp.local.'
3605fn check_domain_suffix(name: &str) -> Result<()> {
3606    if !(name.ends_with("._tcp.local.") || name.ends_with("._udp.local.")) {
3607        return Err(e_fmt!(
3608            "mDNS service {} must end with '._tcp.local.' or '._udp.local.'",
3609            name
3610        ));
3611    }
3612
3613    Ok(())
3614}
3615
3616/// Validate the service name in a fully qualified name.
3617///
3618/// A Full Name = <Instance>.<Service>.<Domain>
3619/// The only `<Domain>` supported are "._tcp.local." and "._udp.local.".
3620///
3621/// Note: this function does not check for the length of the service name.
3622/// Instead, `register_service` method will check the length.
3623fn check_service_name(fullname: &str) -> Result<()> {
3624    check_domain_suffix(fullname)?;
3625
3626    let remaining: Vec<&str> = fullname[..fullname.len() - DOMAIN_LEN].split('.').collect();
3627    let name = remaining.last().ok_or_else(|| e_fmt!("No service name"))?;
3628
3629    if &name[0..1] != "_" {
3630        return Err(e_fmt!("Service name must start with '_'"));
3631    }
3632
3633    let name = &name[1..];
3634
3635    if name.contains("--") {
3636        return Err(e_fmt!("Service name must not contain '--'"));
3637    }
3638
3639    if name.starts_with('-') || name.ends_with('-') {
3640        return Err(e_fmt!("Service name (%s) may not start or end with '-'"));
3641    }
3642
3643    let ascii_count = name.chars().filter(|c| c.is_ascii_alphabetic()).count();
3644    if ascii_count < 1 {
3645        return Err(e_fmt!(
3646            "Service name must contain at least one letter (eg: 'A-Za-z')"
3647        ));
3648    }
3649
3650    Ok(())
3651}
3652
3653/// Validate a hostname.
3654fn check_hostname(hostname: &str) -> Result<()> {
3655    if !hostname.ends_with(".local.") {
3656        return Err(e_fmt!("Hostname must end with '.local.': {hostname}"));
3657    }
3658
3659    if hostname == ".local." {
3660        return Err(e_fmt!(
3661            "The part of the hostname before '.local.' cannot be empty"
3662        ));
3663    }
3664
3665    if hostname.len() > 255 {
3666        return Err(e_fmt!("Hostname length must be <= 255 bytes"));
3667    }
3668
3669    Ok(())
3670}
3671
3672fn call_service_listener(
3673    listeners_map: &HashMap<String, Sender<ServiceEvent>>,
3674    ty_domain: &str,
3675    event: ServiceEvent,
3676) {
3677    if let Some(listener) = listeners_map.get(ty_domain) {
3678        match listener.send(event) {
3679            Ok(()) => trace!("Sent event to listener successfully"),
3680            Err(e) => debug!("Failed to send event: {}", e),
3681        }
3682    }
3683}
3684
3685fn call_hostname_resolution_listener(
3686    listeners_map: &HashMap<String, (Sender<HostnameResolutionEvent>, Option<u64>)>,
3687    hostname: &str,
3688    event: HostnameResolutionEvent,
3689) {
3690    let hostname_lower = hostname.to_lowercase();
3691    if let Some(listener) = listeners_map.get(&hostname_lower).map(|(l, _)| l) {
3692        match listener.send(event) {
3693            Ok(()) => trace!("Sent event to listener successfully"),
3694            Err(e) => debug!("Failed to send event: {}", e),
3695        }
3696    }
3697}
3698
3699/// Returns valid network interfaces in the host system.
3700/// Operational down interfaces are excluded.
3701/// Loopback interfaces are excluded if `with_loopback` is false.
3702fn my_ip_interfaces(with_loopback: bool) -> Vec<Interface> {
3703    if_addrs::get_if_addrs()
3704        .unwrap_or_default()
3705        .into_iter()
3706        .filter(|i| i.is_oper_up() && (!i.is_loopback() || with_loopback))
3707        .collect()
3708}
3709
3710/// Send an outgoing mDNS query or response, and returns the packet bytes.
3711/// Returns empty vec if no valid interface address is found.
3712fn send_dns_outgoing(out: &DnsOutgoing, my_intf: &MyIntf, sock: &PktInfoUdpSocket) -> Vec<Vec<u8>> {
3713    let if_name = &my_intf.name;
3714
3715    let if_addr = if sock.domain() == Domain::IPV4 {
3716        match my_intf.next_ifaddr_v4() {
3717            Some(addr) => addr,
3718            None => return vec![],
3719        }
3720    } else {
3721        match my_intf.next_ifaddr_v6() {
3722            Some(addr) => addr,
3723            None => return vec![],
3724        }
3725    };
3726
3727    send_dns_outgoing_impl(out, if_name, my_intf.index, if_addr, sock)
3728}
3729
3730/// Send an outgoing mDNS query or response, and returns the packet bytes.
3731fn send_dns_outgoing_impl(
3732    out: &DnsOutgoing,
3733    if_name: &str,
3734    if_index: u32,
3735    if_addr: &IfAddr,
3736    sock: &PktInfoUdpSocket,
3737) -> Vec<Vec<u8>> {
3738    let qtype = if out.is_query() {
3739        "query"
3740    } else {
3741        if out.answers_count() == 0 && out.additionals().is_empty() {
3742            return vec![]; // no need to send empty response
3743        }
3744        "response"
3745    };
3746    trace!(
3747        "send {}: {} questions {} answers {} authorities {} additional",
3748        qtype,
3749        out.questions().len(),
3750        out.answers_count(),
3751        out.authorities().len(),
3752        out.additionals().len()
3753    );
3754
3755    match if_addr.ip() {
3756        IpAddr::V4(ipv4) => {
3757            if let Err(e) = sock.set_multicast_if_v4(&ipv4) {
3758                debug!(
3759                    "send_dns_outgoing: failed to set multicast interface for IPv4 {}: {}",
3760                    ipv4, e
3761                );
3762                return vec![]; // cannot send without a valid interface
3763            }
3764        }
3765        IpAddr::V6(ipv6) => {
3766            if let Err(e) = sock.set_multicast_if_v6(if_index) {
3767                debug!(
3768                    "send_dns_outgoing: failed to set multicast interface for IPv6 {}: {}",
3769                    ipv6, e
3770                );
3771                return vec![]; // cannot send without a valid interface
3772            }
3773        }
3774    }
3775
3776    let packet_list = out.to_data_on_wire();
3777    for packet in packet_list.iter() {
3778        multicast_on_intf(packet, if_name, if_index, if_addr, sock);
3779    }
3780    packet_list
3781}
3782
3783/// Sends a multicast packet, and returns the packet bytes.
3784fn multicast_on_intf(
3785    packet: &[u8],
3786    if_name: &str,
3787    if_index: u32,
3788    if_addr: &IfAddr,
3789    socket: &PktInfoUdpSocket,
3790) {
3791    if packet.len() > MAX_MSG_ABSOLUTE {
3792        debug!("Drop over-sized packet ({})", packet.len());
3793        return;
3794    }
3795
3796    let addr: SocketAddr = match if_addr {
3797        if_addrs::IfAddr::V4(_) => SocketAddrV4::new(GROUP_ADDR_V4, MDNS_PORT).into(),
3798        if_addrs::IfAddr::V6(_) => {
3799            let mut sock = SocketAddrV6::new(GROUP_ADDR_V6, MDNS_PORT, 0, 0);
3800            sock.set_scope_id(if_index); // Choose iface for multicast
3801            sock.into()
3802        }
3803    };
3804
3805    // Sends out `packet` to `addr` on the socket.
3806    let sock_addr = addr.into();
3807    match socket.send_to(packet, &sock_addr) {
3808        Ok(sz) => trace!(
3809            "sent out {} bytes on interface {} (idx {}) addr {}",
3810            sz,
3811            if_name,
3812            if_index,
3813            if_addr.ip()
3814        ),
3815        Err(e) => trace!("Failed to send to {} via {:?}: {}", addr, &if_name, e),
3816    }
3817}
3818
3819/// Returns true if `name` is a valid instance name of format:
3820/// <instance>.<service_type>.<_udp|_tcp>.local.
3821/// Note: <instance> could contain '.' as well.
3822fn valid_instance_name(name: &str) -> bool {
3823    name.split('.').count() >= 5
3824}
3825
3826fn notify_monitors(monitors: &mut Vec<Sender<DaemonEvent>>, event: DaemonEvent) {
3827    monitors.retain(|sender| {
3828        if let Err(e) = sender.try_send(event.clone()) {
3829            debug!("notify_monitors: try_send: {}", &e);
3830            if matches!(e, TrySendError::Disconnected(_)) {
3831                return false; // This monitor is dropped.
3832            }
3833        }
3834        true
3835    });
3836}
3837
3838/// Check if all unique records passed "probing", and if yes, create a packet
3839/// to announce the service.
3840fn prepare_announce(
3841    info: &ServiceInfo,
3842    intf: &MyIntf,
3843    dns_registry: &mut DnsRegistry,
3844    is_ipv4: bool,
3845) -> Option<DnsOutgoing> {
3846    let intf_addrs = if is_ipv4 {
3847        info.get_addrs_on_my_intf_v4(intf)
3848    } else {
3849        info.get_addrs_on_my_intf_v6(intf)
3850    };
3851
3852    if intf_addrs.is_empty() {
3853        debug!(
3854            "prepare_announce (ipv4: {is_ipv4}): no valid addrs on interface {}",
3855            &intf.name
3856        );
3857        return None;
3858    }
3859
3860    // check if we changed our name due to conflicts.
3861    let service_fullname = match dns_registry.name_changes.get(info.get_fullname()) {
3862        Some(new_name) => new_name,
3863        None => info.get_fullname(),
3864    };
3865
3866    debug!(
3867        "prepare to announce service {service_fullname} on {:?}",
3868        &intf_addrs
3869    );
3870
3871    let mut probing_count = 0;
3872    let mut out = DnsOutgoing::new(FLAGS_QR_RESPONSE | FLAGS_AA);
3873    let create_time = current_time_millis() + fastrand::u64(0..250);
3874
3875    out.add_answer_at_time(
3876        DnsPointer::new(
3877            info.get_type(),
3878            RRType::PTR,
3879            CLASS_IN,
3880            info.get_other_ttl(),
3881            service_fullname.to_string(),
3882        ),
3883        0,
3884    );
3885
3886    if let Some(sub) = info.get_subtype() {
3887        trace!("Adding subdomain {}", sub);
3888        out.add_answer_at_time(
3889            DnsPointer::new(
3890                sub,
3891                RRType::PTR,
3892                CLASS_IN,
3893                info.get_other_ttl(),
3894                service_fullname.to_string(),
3895            ),
3896            0,
3897        );
3898    }
3899
3900    // SRV records.
3901    let hostname = match dns_registry.name_changes.get(info.get_hostname()) {
3902        Some(new_name) => new_name.to_string(),
3903        None => info.get_hostname().to_string(),
3904    };
3905
3906    let mut srv = DnsSrv::new(
3907        info.get_fullname(),
3908        CLASS_IN | CLASS_CACHE_FLUSH,
3909        info.get_host_ttl(),
3910        info.get_priority(),
3911        info.get_weight(),
3912        info.get_port(),
3913        hostname,
3914    );
3915
3916    if let Some(new_name) = dns_registry.name_changes.get(info.get_fullname()) {
3917        srv.get_record_mut().set_new_name(new_name.to_string());
3918    }
3919
3920    if !info.requires_probe()
3921        || dns_registry.is_probing_done(&srv, info.get_fullname(), create_time)
3922    {
3923        out.add_answer_at_time(srv, 0);
3924    } else {
3925        probing_count += 1;
3926    }
3927
3928    // TXT records.
3929
3930    let mut txt = DnsTxt::new(
3931        info.get_fullname(),
3932        CLASS_IN | CLASS_CACHE_FLUSH,
3933        info.get_other_ttl(),
3934        info.generate_txt(),
3935    );
3936
3937    if let Some(new_name) = dns_registry.name_changes.get(info.get_fullname()) {
3938        txt.get_record_mut().set_new_name(new_name.to_string());
3939    }
3940
3941    if !info.requires_probe()
3942        || dns_registry.is_probing_done(&txt, info.get_fullname(), create_time)
3943    {
3944        out.add_answer_at_time(txt, 0);
3945    } else {
3946        probing_count += 1;
3947    }
3948
3949    // Address records. (A and AAAA)
3950
3951    let hostname = info.get_hostname();
3952    for address in intf_addrs {
3953        let mut dns_addr = DnsAddress::new(
3954            hostname,
3955            ip_address_rr_type(&address),
3956            CLASS_IN | CLASS_CACHE_FLUSH,
3957            info.get_host_ttl(),
3958            address,
3959            intf.into(),
3960        );
3961
3962        if let Some(new_name) = dns_registry.name_changes.get(hostname) {
3963            dns_addr.get_record_mut().set_new_name(new_name.to_string());
3964        }
3965
3966        if !info.requires_probe()
3967            || dns_registry.is_probing_done(&dns_addr, info.get_fullname(), create_time)
3968        {
3969            out.add_answer_at_time(dns_addr, 0);
3970        } else {
3971            probing_count += 1;
3972        }
3973    }
3974
3975    if probing_count > 0 {
3976        return None;
3977    }
3978
3979    Some(out)
3980}
3981
3982/// Send an unsolicited response for owned service via `intf` and `sock`.
3983/// Returns true if sent out successfully for IPv4 or IPv6.
3984fn announce_service_on_intf(
3985    dns_registry: &mut DnsRegistry,
3986    info: &ServiceInfo,
3987    intf: &MyIntf,
3988    sock: &PktInfoUdpSocket,
3989) -> bool {
3990    let is_ipv4 = sock.domain() == Domain::IPV4;
3991    if let Some(out) = prepare_announce(info, intf, dns_registry, is_ipv4) {
3992        send_dns_outgoing(&out, intf, sock);
3993        return true;
3994    }
3995
3996    false
3997}
3998
3999/// Returns a new name based on the `original` to avoid conflicts.
4000/// If the name already contains a number in parentheses, increments that number.
4001///
4002/// Examples:
4003/// - `foo.local.` becomes `foo (2).local.`
4004/// - `foo (2).local.` becomes `foo (3).local.`
4005/// - `foo (9)` becomes `foo (10)`
4006fn name_change(original: &str) -> String {
4007    let mut parts: Vec<_> = original.split('.').collect();
4008    let Some(first_part) = parts.get_mut(0) else {
4009        return format!("{original} (2)");
4010    };
4011
4012    let mut new_name = format!("{first_part} (2)");
4013
4014    // check if there is already has `(<num>)` suffix.
4015    if let Some(paren_pos) = first_part.rfind(" (") {
4016        // Check if there's a closing parenthesis
4017        if let Some(end_paren) = first_part[paren_pos..].find(')') {
4018            let absolute_end_pos = paren_pos + end_paren;
4019            // Only process if the closing parenthesis is the last character
4020            if absolute_end_pos == first_part.len() - 1 {
4021                let num_start = paren_pos + 2; // Skip " ("
4022                                               // Try to parse the number between parentheses
4023                if let Ok(number) = first_part[num_start..absolute_end_pos].parse::<u32>() {
4024                    let base_name = &first_part[..paren_pos];
4025                    new_name = format!("{} ({})", base_name, number + 1)
4026                }
4027            }
4028        }
4029    }
4030
4031    *first_part = &new_name;
4032    parts.join(".")
4033}
4034
4035/// Returns a new name based on the `original` to avoid conflicts.
4036/// If the name already contains a hyphenated number, increments that number.
4037///
4038/// Examples:
4039/// - `foo.local.` becomes `foo-2.local.`
4040/// - `foo-2.local.` becomes `foo-3.local.`
4041/// - `foo` becomes `foo-2`
4042fn hostname_change(original: &str) -> String {
4043    let mut parts: Vec<_> = original.split('.').collect();
4044    let Some(first_part) = parts.get_mut(0) else {
4045        return format!("{original}-2");
4046    };
4047
4048    let mut new_name = format!("{first_part}-2");
4049
4050    // check if there is already a `-<num>` suffix
4051    if let Some(hyphen_pos) = first_part.rfind('-') {
4052        // Try to parse everything after the hyphen as a number
4053        if let Ok(number) = first_part[hyphen_pos + 1..].parse::<u32>() {
4054            let base_name = &first_part[..hyphen_pos];
4055            new_name = format!("{}-{}", base_name, number + 1);
4056        }
4057    }
4058
4059    *first_part = &new_name;
4060    parts.join(".")
4061}
4062
4063fn add_answer_with_additionals(
4064    out: &mut DnsOutgoing,
4065    msg: &DnsIncoming,
4066    service: &ServiceInfo,
4067    intf: &MyIntf,
4068    dns_registry: &DnsRegistry,
4069    is_ipv4: bool,
4070) {
4071    let intf_addrs = if is_ipv4 {
4072        service.get_addrs_on_my_intf_v4(intf)
4073    } else {
4074        service.get_addrs_on_my_intf_v6(intf)
4075    };
4076    if intf_addrs.is_empty() {
4077        trace!("No addrs on LAN of intf {:?}", intf);
4078        return;
4079    }
4080
4081    // check if we changed our name due to conflicts.
4082    let service_fullname = match dns_registry.name_changes.get(service.get_fullname()) {
4083        Some(new_name) => new_name,
4084        None => service.get_fullname(),
4085    };
4086
4087    let hostname = match dns_registry.name_changes.get(service.get_hostname()) {
4088        Some(new_name) => new_name,
4089        None => service.get_hostname(),
4090    };
4091
4092    let ptr_added = out.add_answer(
4093        msg,
4094        DnsPointer::new(
4095            service.get_type(),
4096            RRType::PTR,
4097            CLASS_IN,
4098            service.get_other_ttl(),
4099            service_fullname.to_string(),
4100        ),
4101    );
4102
4103    if !ptr_added {
4104        trace!("answer was not added for msg {:?}", msg);
4105        return;
4106    }
4107
4108    if let Some(sub) = service.get_subtype() {
4109        trace!("Adding subdomain {}", sub);
4110        out.add_additional_answer(DnsPointer::new(
4111            sub,
4112            RRType::PTR,
4113            CLASS_IN,
4114            service.get_other_ttl(),
4115            service_fullname.to_string(),
4116        ));
4117    }
4118
4119    // Add recommended additional answers according to
4120    // https://tools.ietf.org/html/rfc6763#section-12.1.
4121    out.add_additional_answer(DnsSrv::new(
4122        service_fullname,
4123        CLASS_IN | CLASS_CACHE_FLUSH,
4124        service.get_host_ttl(),
4125        service.get_priority(),
4126        service.get_weight(),
4127        service.get_port(),
4128        hostname.to_string(),
4129    ));
4130
4131    out.add_additional_answer(DnsTxt::new(
4132        service_fullname,
4133        CLASS_IN | CLASS_CACHE_FLUSH,
4134        service.get_other_ttl(),
4135        service.generate_txt(),
4136    ));
4137
4138    for address in intf_addrs {
4139        out.add_additional_answer(DnsAddress::new(
4140            hostname,
4141            ip_address_rr_type(&address),
4142            CLASS_IN | CLASS_CACHE_FLUSH,
4143            service.get_host_ttl(),
4144            address,
4145            intf.into(),
4146        ));
4147    }
4148}
4149
4150/// Check probes in a registry and returns: a probing packet to send out, and a list of probe names
4151/// that are finished.
4152fn check_probing(
4153    dns_registry: &mut DnsRegistry,
4154    timers: &mut BinaryHeap<Reverse<u64>>,
4155    now: u64,
4156) -> (DnsOutgoing, Vec<String>) {
4157    let mut expired_probes = Vec::new();
4158    let mut out = DnsOutgoing::new(FLAGS_QR_QUERY);
4159
4160    for (name, probe) in dns_registry.probing.iter_mut() {
4161        if now >= probe.next_send {
4162            if probe.expired(now) {
4163                // move the record to active
4164                expired_probes.push(name.clone());
4165            } else {
4166                out.add_question(name, RRType::ANY);
4167
4168                /*
4169                RFC 6762 section 8.2: https://datatracker.ietf.org/doc/html/rfc6762#section-8.2
4170                ...
4171                for tiebreaking to work correctly in all
4172                cases, the Authority Section must contain *all* the records and
4173                proposed rdata being probed for uniqueness.
4174                    */
4175                for record in probe.records.iter() {
4176                    out.add_authority(record.clone());
4177                }
4178
4179                probe.update_next_send(now);
4180
4181                // add timer
4182                timers.push(Reverse(probe.next_send));
4183            }
4184        }
4185    }
4186
4187    (out, expired_probes)
4188}
4189
4190/// Process expired probes on an interface and return a list of services
4191/// that are waiting for the probe to finish.
4192///
4193/// `DnsNameChange` events are sent to the monitors.
4194fn handle_expired_probes(
4195    expired_probes: Vec<String>,
4196    intf_name: &str,
4197    dns_registry: &mut DnsRegistry,
4198    monitors: &mut Vec<Sender<DaemonEvent>>,
4199) -> HashSet<String> {
4200    let mut waiting_services = HashSet::new();
4201
4202    for name in expired_probes {
4203        let Some(probe) = dns_registry.probing.remove(&name) else {
4204            continue;
4205        };
4206
4207        // send notifications about name changes
4208        for record in probe.records.iter() {
4209            if let Some(new_name) = record.get_record().get_new_name() {
4210                dns_registry
4211                    .name_changes
4212                    .insert(name.clone(), new_name.to_string());
4213
4214                let event = DnsNameChange {
4215                    original: record.get_record().get_original_name().to_string(),
4216                    new_name: new_name.to_string(),
4217                    rr_type: record.get_type(),
4218                    intf_name: intf_name.to_string(),
4219                };
4220                debug!("Name change event: {:?}", &event);
4221                notify_monitors(monitors, DaemonEvent::NameChange(event));
4222            }
4223        }
4224
4225        // move RR from probe to active.
4226        debug!(
4227            "probe of '{name}' finished: move {} records to active. ({} waiting services)",
4228            probe.records.len(),
4229            probe.waiting_services.len(),
4230        );
4231
4232        // Move records to active and plan to wake up services if records are not empty.
4233        if !probe.records.is_empty() {
4234            match dns_registry.active.get_mut(&name) {
4235                Some(records) => {
4236                    records.extend(probe.records);
4237                }
4238                None => {
4239                    dns_registry.active.insert(name, probe.records);
4240                }
4241            }
4242
4243            waiting_services.extend(probe.waiting_services);
4244        }
4245    }
4246
4247    waiting_services
4248}
4249
4250#[cfg(test)]
4251mod tests {
4252    use super::{
4253        _new_socket_bind, check_domain_suffix, check_service_name_length, hostname_change,
4254        my_ip_interfaces, name_change, send_dns_outgoing_impl, valid_instance_name,
4255        HostnameResolutionEvent, ServiceDaemon, ServiceEvent, ServiceInfo, GROUP_ADDR_V4,
4256        MDNS_PORT,
4257    };
4258    use crate::{
4259        dns_parser::{
4260            DnsIncoming, DnsOutgoing, DnsPointer, InterfaceId, RRType, ScopedIp, CLASS_IN,
4261            FLAGS_AA, FLAGS_QR_RESPONSE,
4262        },
4263        service_daemon::{add_answer_of_service, check_hostname},
4264    };
4265    use std::{
4266        net::{SocketAddr, SocketAddrV4},
4267        time::{Duration, SystemTime},
4268    };
4269    use test_log::test;
4270
4271    #[test]
4272    fn test_socketaddr_print() {
4273        let addr: SocketAddr = SocketAddrV4::new(GROUP_ADDR_V4, MDNS_PORT).into();
4274        let print = format!("{}", addr);
4275        assert_eq!(print, "224.0.0.251:5353");
4276    }
4277
4278    #[test]
4279    fn test_instance_name() {
4280        assert!(valid_instance_name("my-laser._printer._tcp.local."));
4281        assert!(valid_instance_name("my-laser.._printer._tcp.local."));
4282        assert!(!valid_instance_name("_printer._tcp.local."));
4283    }
4284
4285    #[test]
4286    fn test_check_service_name_length() {
4287        let result = check_service_name_length("_tcp", 100);
4288        assert!(result.is_err());
4289        if let Err(e) = result {
4290            println!("{}", e);
4291        }
4292    }
4293
4294    #[test]
4295    fn test_check_hostname() {
4296        // valid hostnames
4297        for hostname in &[
4298            "my_host.local.",
4299            &("A".repeat(255 - ".local.".len()) + ".local."),
4300        ] {
4301            let result = check_hostname(hostname);
4302            assert!(result.is_ok());
4303        }
4304
4305        // erroneous hostnames
4306        for hostname in &[
4307            "my_host.local",
4308            ".local.",
4309            &("A".repeat(256 - ".local.".len()) + ".local."),
4310        ] {
4311            let result = check_hostname(hostname);
4312            assert!(result.is_err());
4313            if let Err(e) = result {
4314                println!("{}", e);
4315            }
4316        }
4317    }
4318
4319    #[test]
4320    fn test_check_domain_suffix() {
4321        assert!(check_domain_suffix("_missing_dot._tcp.local").is_err());
4322        assert!(check_domain_suffix("_missing_bar.tcp.local.").is_err());
4323        assert!(check_domain_suffix("_mis_spell._tpp.local.").is_err());
4324        assert!(check_domain_suffix("_mis_spell._upp.local.").is_err());
4325        assert!(check_domain_suffix("_has_dot._tcp.local.").is_ok());
4326        assert!(check_domain_suffix("_goodname._udp.local.").is_ok());
4327    }
4328
4329    #[test]
4330    fn test_service_with_temporarily_invalidated_ptr() {
4331        // Create a daemon
4332        let d = ServiceDaemon::new().expect("Failed to create daemon");
4333
4334        let service = "_test_inval_ptr._udp.local.";
4335        let host_name = "my_host_tmp_invalidated_ptr.local.";
4336        let intfs: Vec<_> = my_ip_interfaces(false);
4337        let intf_ips: Vec<_> = intfs.iter().map(|intf| intf.ip()).collect();
4338        let port = 5201;
4339        let my_service =
4340            ServiceInfo::new(service, "my_instance", host_name, &intf_ips[..], port, None)
4341                .expect("invalid service info")
4342                .enable_addr_auto();
4343        let result = d.register(my_service.clone());
4344        assert!(result.is_ok());
4345
4346        // Browse for a service
4347        let browse_chan = d.browse(service).unwrap();
4348        let timeout = Duration::from_secs(2);
4349        let mut resolved = false;
4350
4351        while let Ok(event) = browse_chan.recv_timeout(timeout) {
4352            match event {
4353                ServiceEvent::ServiceResolved(info) => {
4354                    resolved = true;
4355                    println!("Resolved a service of {}", &info.fullname);
4356                    break;
4357                }
4358                e => {
4359                    println!("Received event {:?}", e);
4360                }
4361            }
4362        }
4363
4364        assert!(resolved);
4365
4366        println!("Stopping browse of {}", service);
4367        // Pause browsing so restarting will cause a new immediate query.
4368        // Unregistering will not work here, it will invalidate all the records.
4369        d.stop_browse(service).unwrap();
4370
4371        // Ensure the search is stopped.
4372        // Reduces the chance of receiving an answer adding the ptr back to the
4373        // cache causing the later browse to return directly from the cache.
4374        // (which invalidates what this test is trying to test for.)
4375        let mut stopped = false;
4376        while let Ok(event) = browse_chan.recv_timeout(timeout) {
4377            match event {
4378                ServiceEvent::SearchStopped(_) => {
4379                    stopped = true;
4380                    println!("Stopped browsing service");
4381                    break;
4382                }
4383                // Other `ServiceResolved` messages may be received
4384                // here as they come from different interfaces.
4385                // That's fine for this test.
4386                e => {
4387                    println!("Received event {:?}", e);
4388                }
4389            }
4390        }
4391
4392        assert!(stopped);
4393
4394        // Invalidate the ptr from the service to the host.
4395        let invalidate_ptr_packet = DnsPointer::new(
4396            my_service.get_type(),
4397            RRType::PTR,
4398            CLASS_IN,
4399            0,
4400            my_service.get_fullname().to_string(),
4401        );
4402
4403        let mut packet_buffer = DnsOutgoing::new(FLAGS_QR_RESPONSE | FLAGS_AA);
4404        packet_buffer.add_additional_answer(invalidate_ptr_packet);
4405
4406        for intf in intfs {
4407            let sock = _new_socket_bind(&intf, true).unwrap();
4408            send_dns_outgoing_impl(
4409                &packet_buffer,
4410                &intf.name,
4411                intf.index.unwrap_or(0),
4412                &intf.addr,
4413                &sock.pktinfo,
4414            );
4415        }
4416
4417        println!(
4418            "Sent PTR record invalidation. Starting second browse for {}",
4419            service
4420        );
4421
4422        // Restart the browse to force the sender to re-send the announcements.
4423        let browse_chan = d.browse(service).unwrap();
4424
4425        resolved = false;
4426        while let Ok(event) = browse_chan.recv_timeout(timeout) {
4427            match event {
4428                ServiceEvent::ServiceResolved(info) => {
4429                    resolved = true;
4430                    println!("Resolved a service of {}", &info.fullname);
4431                    break;
4432                }
4433                e => {
4434                    println!("Received event {:?}", e);
4435                }
4436            }
4437        }
4438
4439        assert!(resolved);
4440        d.shutdown().unwrap();
4441    }
4442
4443    #[test]
4444    fn test_expired_srv() {
4445        // construct service info
4446        let service_type = "_expired-srv._udp.local.";
4447        let instance = "test_instance";
4448        let host_name = "expired_srv_host.local.";
4449        let mut my_service = ServiceInfo::new(service_type, instance, host_name, "", 5023, None)
4450            .unwrap()
4451            .enable_addr_auto();
4452        // let fullname = my_service.get_fullname().to_string();
4453
4454        // set SRV to expire soon.
4455        let new_ttl = 3; // for testing only.
4456        my_service._set_host_ttl(new_ttl);
4457
4458        // register my service
4459        let mdns_server = ServiceDaemon::new().expect("Failed to create mdns server");
4460        let result = mdns_server.register(my_service);
4461        assert!(result.is_ok());
4462
4463        let mdns_client = ServiceDaemon::new().expect("Failed to create mdns client");
4464        let browse_chan = mdns_client.browse(service_type).unwrap();
4465        let timeout = Duration::from_secs(2);
4466        let mut resolved = false;
4467
4468        while let Ok(event) = browse_chan.recv_timeout(timeout) {
4469            match event {
4470                ServiceEvent::ServiceResolved(info) => {
4471                    resolved = true;
4472                    println!("Resolved a service of {}", &info.fullname);
4473                    break;
4474                }
4475                _ => {}
4476            }
4477        }
4478
4479        assert!(resolved);
4480
4481        // Exit the server so that no more responses.
4482        mdns_server.shutdown().unwrap();
4483
4484        // SRV record in the client cache will expire.
4485        let expire_timeout = Duration::from_secs(new_ttl as u64);
4486        while let Ok(event) = browse_chan.recv_timeout(expire_timeout) {
4487            match event {
4488                ServiceEvent::ServiceRemoved(service_type, full_name) => {
4489                    println!("Service removed: {}: {}", &service_type, &full_name);
4490                    break;
4491                }
4492                _ => {}
4493            }
4494        }
4495    }
4496
4497    #[test]
4498    fn test_hostname_resolution_address_removed() {
4499        // Create a mDNS server
4500        let server = ServiceDaemon::new().expect("Failed to create server");
4501        let hostname = "addr_remove_host._tcp.local.";
4502        let service_ip_addr: ScopedIp = my_ip_interfaces(false)
4503            .iter()
4504            .find(|iface| iface.ip().is_ipv4())
4505            .map(|iface| iface.ip().into())
4506            .unwrap();
4507
4508        let mut my_service = ServiceInfo::new(
4509            "_host_res_test._tcp.local.",
4510            "my_instance",
4511            hostname,
4512            &service_ip_addr.to_ip_addr(),
4513            1234,
4514            None,
4515        )
4516        .expect("invalid service info");
4517
4518        // Set a short TTL for addresses for testing.
4519        let addr_ttl = 2;
4520        my_service._set_host_ttl(addr_ttl); // Expire soon
4521
4522        server.register(my_service).unwrap();
4523
4524        // Create a mDNS client for resolving the hostname.
4525        let client = ServiceDaemon::new().expect("Failed to create client");
4526        let event_receiver = client.resolve_hostname(hostname, None).unwrap();
4527        let resolved = loop {
4528            match event_receiver.recv() {
4529                Ok(HostnameResolutionEvent::AddressesFound(found_hostname, addresses)) => {
4530                    assert!(found_hostname == hostname);
4531                    assert!(addresses.contains(&service_ip_addr));
4532                    println!("address found: {:?}", &addresses);
4533                    break true;
4534                }
4535                Ok(HostnameResolutionEvent::SearchStopped(_)) => break false,
4536                Ok(_event) => {}
4537                Err(_) => break false,
4538            }
4539        };
4540
4541        assert!(resolved);
4542
4543        // Shutdown the server so no more responses / refreshes for addresses.
4544        server.shutdown().unwrap();
4545
4546        // Wait till hostname address record expires, with 1 second grace period.
4547        let timeout = Duration::from_secs(addr_ttl as u64 + 1);
4548        let removed = loop {
4549            match event_receiver.recv_timeout(timeout) {
4550                Ok(HostnameResolutionEvent::AddressesRemoved(removed_host, addresses)) => {
4551                    assert!(removed_host == hostname);
4552                    assert!(addresses.contains(&service_ip_addr));
4553
4554                    println!(
4555                        "address removed: hostname: {} addresses: {:?}",
4556                        &hostname, &addresses
4557                    );
4558                    break true;
4559                }
4560                Ok(_event) => {}
4561                Err(_) => {
4562                    break false;
4563                }
4564            }
4565        };
4566
4567        assert!(removed);
4568
4569        client.shutdown().unwrap();
4570    }
4571
4572    #[test]
4573    fn test_refresh_ptr() {
4574        // construct service info
4575        let service_type = "_refresh-ptr._udp.local.";
4576        let instance = "test_instance";
4577        let host_name = "refresh_ptr_host.local.";
4578        let service_ip_addr = my_ip_interfaces(false)
4579            .iter()
4580            .find(|iface| iface.ip().is_ipv4())
4581            .map(|iface| iface.ip())
4582            .unwrap();
4583
4584        let mut my_service = ServiceInfo::new(
4585            service_type,
4586            instance,
4587            host_name,
4588            &service_ip_addr,
4589            5023,
4590            None,
4591        )
4592        .unwrap();
4593
4594        let new_ttl = 3; // for testing only.
4595        my_service._set_other_ttl(new_ttl);
4596
4597        // register my service
4598        let mdns_server = ServiceDaemon::new().expect("Failed to create mdns server");
4599        let result = mdns_server.register(my_service);
4600        assert!(result.is_ok());
4601
4602        let mdns_client = ServiceDaemon::new().expect("Failed to create mdns client");
4603        let browse_chan = mdns_client.browse(service_type).unwrap();
4604        let timeout = Duration::from_millis(1500); // Give at least 1 second for the service probing.
4605        let mut resolved = false;
4606
4607        // resolve the service first.
4608        while let Ok(event) = browse_chan.recv_timeout(timeout) {
4609            match event {
4610                ServiceEvent::ServiceResolved(info) => {
4611                    resolved = true;
4612                    println!("Resolved a service of {}", &info.fullname);
4613                    break;
4614                }
4615                _ => {}
4616            }
4617        }
4618
4619        assert!(resolved);
4620
4621        // wait over 80% of TTL, and refresh PTR should be sent out.
4622        let timeout = Duration::from_millis(new_ttl as u64 * 1000 * 90 / 100);
4623        while let Ok(event) = browse_chan.recv_timeout(timeout) {
4624            println!("event: {:?}", &event);
4625        }
4626
4627        // verify refresh counter.
4628        let metrics_chan = mdns_client.get_metrics().unwrap();
4629        let metrics = metrics_chan.recv_timeout(timeout).unwrap();
4630        let ptr_refresh_counter = metrics["cache-refresh-ptr"];
4631        assert_eq!(ptr_refresh_counter, 1);
4632        let srvtxt_refresh_counter = metrics["cache-refresh-srv-txt"];
4633        assert_eq!(srvtxt_refresh_counter, 1);
4634
4635        // Exit the server so that no more responses.
4636        mdns_server.shutdown().unwrap();
4637        mdns_client.shutdown().unwrap();
4638    }
4639
4640    #[test]
4641    fn test_name_change() {
4642        assert_eq!(name_change("foo.local."), "foo (2).local.");
4643        assert_eq!(name_change("foo (2).local."), "foo (3).local.");
4644        assert_eq!(name_change("foo (9).local."), "foo (10).local.");
4645        assert_eq!(name_change("foo"), "foo (2)");
4646        assert_eq!(name_change("foo (2)"), "foo (3)");
4647        assert_eq!(name_change(""), " (2)");
4648
4649        // Additional edge cases
4650        assert_eq!(name_change("foo (abc)"), "foo (abc) (2)"); // Invalid number
4651        assert_eq!(name_change("foo (2"), "foo (2 (2)"); // Missing closing parenthesis
4652        assert_eq!(name_change("foo (2) extra"), "foo (2) extra (2)"); // Extra text after number
4653    }
4654
4655    #[test]
4656    fn test_hostname_change() {
4657        assert_eq!(hostname_change("foo.local."), "foo-2.local.");
4658        assert_eq!(hostname_change("foo"), "foo-2");
4659        assert_eq!(hostname_change("foo-2.local."), "foo-3.local.");
4660        assert_eq!(hostname_change("foo-9"), "foo-10");
4661        assert_eq!(hostname_change("test-42.domain."), "test-43.domain.");
4662    }
4663
4664    #[test]
4665    fn test_add_answer_txt_ttl() {
4666        // construct a simple service info
4667        let service_type = "_test_add_answer._udp.local.";
4668        let instance = "test_instance";
4669        let host_name = "add_answer_host.local.";
4670        let service_intf = my_ip_interfaces(false)
4671            .into_iter()
4672            .find(|iface| iface.ip().is_ipv4())
4673            .unwrap();
4674        let service_ip_addr = service_intf.ip();
4675        let my_service = ServiceInfo::new(
4676            service_type,
4677            instance,
4678            host_name,
4679            &service_ip_addr,
4680            5023,
4681            None,
4682        )
4683        .unwrap();
4684
4685        // construct a DnsOutgoing message
4686        let mut out = DnsOutgoing::new(FLAGS_QR_RESPONSE | FLAGS_AA);
4687
4688        // Construct a dummy DnsIncoming message
4689        let mut dummy_data = out.to_data_on_wire();
4690        let interface_id = InterfaceId::from(&service_intf);
4691        let incoming = DnsIncoming::new(dummy_data.pop().unwrap(), interface_id).unwrap();
4692
4693        // Add an answer of TXT type for the service.
4694        let if_addrs = vec![service_intf.ip()];
4695        add_answer_of_service(
4696            &mut out,
4697            &incoming,
4698            instance,
4699            &my_service,
4700            RRType::TXT,
4701            if_addrs,
4702        );
4703
4704        // Check if the answer was added correctly
4705        assert!(
4706            out.answers_count() > 0,
4707            "No answers added to the outgoing message"
4708        );
4709
4710        // Check if the first answer is of type TXT
4711        let answer = out._answers().first().unwrap();
4712        assert_eq!(answer.0.get_type(), RRType::TXT);
4713
4714        // Check TTL is set properly for the TXT record
4715        assert_eq!(answer.0.get_record().get_ttl(), my_service.get_other_ttl());
4716    }
4717
4718    #[test]
4719    fn test_interface_flip() {
4720        // start a server
4721        let ty_domain = "_intf-flip._udp.local.";
4722        let host_name = "intf_flip.local.";
4723        let now = SystemTime::now()
4724            .duration_since(SystemTime::UNIX_EPOCH)
4725            .unwrap();
4726        let instance_name = now.as_micros().to_string(); // Create a unique name.
4727        let port = 5200;
4728
4729        // Get a single IPv4 address
4730        let (ip_addr1, intf_name) = my_ip_interfaces(false)
4731            .iter()
4732            .find(|iface| iface.ip().is_ipv4())
4733            .map(|iface| (iface.ip(), iface.name.clone()))
4734            .unwrap();
4735
4736        println!("Using interface {} with IP {}", intf_name, ip_addr1);
4737
4738        // Register the service.
4739        let service1 =
4740            ServiceInfo::new(ty_domain, &instance_name, host_name, &ip_addr1, port, None)
4741                .expect("valid service info");
4742        let server1 = ServiceDaemon::new().expect("failed to start server");
4743        server1
4744            .register(service1)
4745            .expect("Failed to register service1");
4746
4747        // wait for the service announced.
4748        std::thread::sleep(Duration::from_secs(2));
4749
4750        // start a client
4751        let client = ServiceDaemon::new().expect("failed to start client");
4752
4753        let receiver = client.browse(ty_domain).unwrap();
4754
4755        let timeout = Duration::from_secs(3);
4756        let mut got_data = false;
4757
4758        while let Ok(event) = receiver.recv_timeout(timeout) {
4759            match event {
4760                ServiceEvent::ServiceResolved(_) => {
4761                    println!("Received ServiceResolved event");
4762                    got_data = true;
4763                    break;
4764                }
4765                _ => {}
4766            }
4767        }
4768
4769        assert!(got_data, "Should receive ServiceResolved event");
4770
4771        // Set a short IP check interval to detect interface changes quickly.
4772        client.set_ip_check_interval(1).unwrap();
4773
4774        // Now shutdown the interface and expect the client to lose the service.
4775        println!("Shutting down interface {}", &intf_name);
4776        client.test_down_interface(&intf_name).unwrap();
4777
4778        let mut got_removed = false;
4779
4780        while let Ok(event) = receiver.recv_timeout(timeout) {
4781            match event {
4782                ServiceEvent::ServiceRemoved(ty_domain, instance) => {
4783                    got_removed = true;
4784                    println!("removed: {ty_domain} : {instance}");
4785                    break;
4786                }
4787                _ => {}
4788            }
4789        }
4790        assert!(got_removed, "Should receive ServiceRemoved event");
4791
4792        println!("Bringing up interface {}", &intf_name);
4793        client.test_up_interface(&intf_name).unwrap();
4794        let mut got_data = false;
4795        while let Ok(event) = receiver.recv_timeout(timeout) {
4796            match event {
4797                ServiceEvent::ServiceResolved(resolved) => {
4798                    got_data = true;
4799                    println!("Received ServiceResolved: {:?}", resolved);
4800                    break;
4801                }
4802                _ => {}
4803            }
4804        }
4805        assert!(
4806            got_data,
4807            "Should receive ServiceResolved event after interface is back up"
4808        );
4809
4810        server1.shutdown().unwrap();
4811        client.shutdown().unwrap();
4812    }
4813
4814    #[test]
4815    fn test_cache_only() {
4816        // construct service info
4817        let service_type = "_cache_only._udp.local.";
4818        let instance = "test_instance";
4819        let host_name = "cache_only_host.local.";
4820        let service_ip_addr = my_ip_interfaces(false)
4821            .iter()
4822            .find(|iface| iface.ip().is_ipv4())
4823            .map(|iface| iface.ip())
4824            .unwrap();
4825
4826        let mut my_service = ServiceInfo::new(
4827            service_type,
4828            instance,
4829            host_name,
4830            &service_ip_addr,
4831            5023,
4832            None,
4833        )
4834        .unwrap();
4835
4836        let new_ttl = 3; // for testing only.
4837        my_service._set_other_ttl(new_ttl);
4838
4839        let mdns_client = ServiceDaemon::new().expect("Failed to create mdns client");
4840
4841        // make a single browse request to record that we are interested in the service.  This ensures that
4842        // subsequent announcements are cached.
4843        let browse_chan = mdns_client.browse_cache(service_type).unwrap();
4844        std::thread::sleep(Duration::from_secs(2));
4845
4846        // register my service
4847        let mdns_server = ServiceDaemon::new().expect("Failed to create mdns server");
4848        let result = mdns_server.register(my_service);
4849        assert!(result.is_ok());
4850
4851        let timeout = Duration::from_millis(1500); // Give at least 1 second for the service probing.
4852        let mut resolved = false;
4853
4854        // resolve the service.
4855        while let Ok(event) = browse_chan.recv_timeout(timeout) {
4856            match event {
4857                ServiceEvent::ServiceResolved(info) => {
4858                    resolved = true;
4859                    println!("Resolved a service of {}", &info.get_fullname());
4860                    break;
4861                }
4862                _ => {}
4863            }
4864        }
4865
4866        assert!(resolved);
4867
4868        // Exit the server so that no more responses.
4869        mdns_server.shutdown().unwrap();
4870        mdns_client.shutdown().unwrap();
4871    }
4872
4873    #[test]
4874    fn test_cache_only_unsolicited() {
4875        // construct service info
4876        let service_type = "_cache_only._udp.local.";
4877        let instance = "test_instance";
4878        let host_name = "cache_only_host.local.";
4879        let service_ip_addr = my_ip_interfaces(false)
4880            .iter()
4881            .find(|iface| iface.ip().is_ipv4())
4882            .map(|iface| iface.ip())
4883            .unwrap();
4884
4885        let mut my_service = ServiceInfo::new(
4886            service_type,
4887            instance,
4888            host_name,
4889            &service_ip_addr,
4890            5023,
4891            None,
4892        )
4893        .unwrap();
4894
4895        let new_ttl = 3; // for testing only.
4896        my_service._set_other_ttl(new_ttl);
4897
4898        // register my service
4899        let mdns_server = ServiceDaemon::new().expect("Failed to create mdns server");
4900        let result = mdns_server.register(my_service);
4901        assert!(result.is_ok());
4902
4903        let mdns_client = ServiceDaemon::new().expect("Failed to create mdns client");
4904        mdns_client.accept_unsolicited(true).unwrap();
4905
4906        // Wait a bit for the service announcements to go out, before calling browse_cache.  This ensures
4907        // that the announcements are treated as unsolicited
4908        std::thread::sleep(Duration::from_secs(2));
4909        let browse_chan = mdns_client.browse_cache(service_type).unwrap();
4910        let timeout = Duration::from_millis(1500); // Give at least 1 second for the service probing.
4911        let mut resolved = false;
4912
4913        // resolve the service.
4914        while let Ok(event) = browse_chan.recv_timeout(timeout) {
4915            match event {
4916                ServiceEvent::ServiceResolved(info) => {
4917                    resolved = true;
4918                    println!("Resolved a service of {}", &info.get_fullname());
4919                    break;
4920                }
4921                _ => {}
4922            }
4923        }
4924
4925        assert!(resolved);
4926
4927        // Exit the server so that no more responses.
4928        mdns_server.shutdown().unwrap();
4929        mdns_client.shutdown().unwrap();
4930    }
4931}