mdns_sd/
service_daemon.rs

1//! Service daemon for mDNS Service Discovery.
2
3// How DNS-based Service Discovery works in a nutshell:
4//
5// (excerpt from RFC 6763)
6// .... that a particular service instance can be
7//    described using a DNS SRV [RFC2782] and DNS TXT [RFC1035] record.
8//    The SRV record has a name of the form "<Instance>.<Service>.<Domain>"
9//    and gives the target host and port where the service instance can be
10//    reached.  The DNS TXT record of the same name gives additional
11//    information about this instance, in a structured form using key/value
12//    pairs, described in Section 6.  A client discovers the list of
13//    available instances of a given service type using a query for a DNS
14//    PTR [RFC1035] record with a name of the form "<Service>.<Domain>",
15//    which returns a set of zero or more names, which are the names of the
16//    aforementioned DNS SRV/TXT record pairs.
17//
18// Some naming conventions in this source code:
19//
20// `ty_domain` refers to service type together with domain name, i.e. <service>.<domain>.
// Every <service> consists of two labels: the service name itself and either "_udp" or "_tcp".
22// See RFC 6763 section 7 Service Names.
23//     for example: `_my-service._udp.local.`
24//
25// `fullname` refers to a full Service Instance Name, i.e. <instance>.<service>.<domain>
26//     for example: `my_home._my-service._udp.local.`
27//
// In mDNS and DNS, the basic data structure is the "Resource Record" (RR), whereas
// in Service Discovery, the basic data structure is the "Service Info". One Service Info
// corresponds to a set of DNS Resource Records.
31#[cfg(feature = "logging")]
32use crate::log::{debug, trace};
33use crate::{
34    dns_cache::{current_time_millis, DnsCache},
35    dns_parser::{
36        ip_address_rr_type, DnsAddress, DnsEntryExt, DnsIncoming, DnsOutgoing, DnsPointer,
37        DnsRecordBox, DnsRecordExt, DnsSrv, DnsTxt, InterfaceId, RRType, ScopedIp,
38        CLASS_CACHE_FLUSH, CLASS_IN, FLAGS_AA, FLAGS_QR_QUERY, FLAGS_QR_RESPONSE, MAX_MSG_ABSOLUTE,
39    },
40    error::{e_fmt, Error, Result},
41    service_info::{DnsRegistry, MyIntf, Probe, ServiceInfo, ServiceStatus},
42    Receiver, ResolvedService, TxtProperties,
43};
44use flume::{bounded, Sender, TrySendError};
45use if_addrs::{IfAddr, Interface};
46use mio::{event::Source, net::UdpSocket as MioUdpSocket, Interest, Poll, Registry, Token};
47use socket2::Domain;
48use socket_pktinfo::PktInfoUdpSocket;
49use std::{
50    cmp::{self, Reverse},
51    collections::{hash_map::Entry, BinaryHeap, HashMap, HashSet},
52    fmt, io,
53    net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6, UdpSocket},
54    str, thread,
55    time::Duration,
56    vec,
57};
58
/// The default max length of the service name without domain, not including the
/// leading underscore (`_`). It is set to 15 per
/// [RFC 6763 section 7.2](https://www.rfc-editor.org/rfc/rfc6763#section-7.2).
pub const SERVICE_NAME_LEN_MAX_DEFAULT: u8 = 15;

/// The default interval, in seconds, for checking IP changes automatically.
pub const IP_CHECK_INTERVAL_IN_SECS_DEFAULT: u32 = 5;

/// The default timeout for [ServiceDaemon::verify] is 10 seconds, per
/// [RFC 6762 section 10.4](https://datatracker.ietf.org/doc/html/rfc6762#section-10.4)
pub const VERIFY_TIMEOUT_DEFAULT: Duration = Duration::from_secs(10);

/// The UDP port that mDNS sockets bind to and that multicast packets are sent to.
const MDNS_PORT: u16 = 5353;
/// The IPv4 multicast group joined to receive mDNS packets.
const GROUP_ADDR_V4: Ipv4Addr = Ipv4Addr::new(224, 0, 0, 251);
/// The IPv6 multicast group (`ff02::fb`) joined to receive mDNS packets.
const GROUP_ADDR_V6: Ipv6Addr = Ipv6Addr::new(0xff02, 0, 0, 0, 0, 0, 0, 0xfb);
/// The IPv4 loopback address that the daemon's signal socket is bound to.
const LOOPBACK_V4: Ipv4Addr = Ipv4Addr::new(127, 0, 0, 1);

// NOTE(review): not referenced in this part of the file; presumably a wait time
// in milliseconds used while resolving -- confirm at the use sites.
const RESOLVE_WAIT_IN_MILLIS: u64 = 500;
77
/// Response status code for the service `unregister` call.
///
/// The status is delivered over the channel returned by
/// [`ServiceDaemon::unregister`]. It derives `Clone`, `PartialEq` and `Eq`
/// (like [`DaemonStatus`]) so callers can compare a received status directly.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum UnregisterStatus {
    /// Unregister was successful.
    OK,
    /// The service was not found in the registration.
    NotFound,
}
86
/// Status code for the service daemon.
///
/// Marked `#[non_exhaustive]`: more variants may be added, so matches outside
/// this crate need a wildcard arm.
#[derive(Debug, PartialEq, Clone, Eq)]
#[non_exhaustive]
pub enum DaemonStatus {
    /// The daemon is running as normal.
    Running,

    /// The daemon has been shut down.
    Shutdown,
}
97
/// Different counters included in the metrics.
/// Currently all counters are for outgoing packets.
///
/// The `Display` form of a counter is a stable kebab-case label.
#[derive(Hash, Eq, PartialEq)]
enum Counter {
    Register,
    RegisterResend,
    Unregister,
    UnregisterResend,
    Browse,
    ResolveHostname,
    Respond,
    CacheRefreshPTR,
    CacheRefreshSrvTxt,
    CacheRefreshAddr,
    KnownAnswerSuppression,
    CachedPTR,
    CachedSRV,
    CachedAddr,
    CachedTxt,
    CachedNSec,
    CachedSubtype,
    DnsRegistryProbe,
    DnsRegistryActive,
    DnsRegistryTimer,
    DnsRegistryNameChange,
    Timer,
}

impl fmt::Display for Counter {
    /// Writes the kebab-case label of this counter.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        // Resolve the label first, then emit it with a single write.
        let label = match self {
            Self::Register => "register",
            Self::RegisterResend => "register-resend",
            Self::Unregister => "unregister",
            Self::UnregisterResend => "unregister-resend",
            Self::Browse => "browse",
            Self::ResolveHostname => "resolve-hostname",
            Self::Respond => "respond",
            Self::CacheRefreshPTR => "cache-refresh-ptr",
            Self::CacheRefreshSrvTxt => "cache-refresh-srv-txt",
            Self::CacheRefreshAddr => "cache-refresh-addr",
            Self::KnownAnswerSuppression => "known-answer-suppression",
            Self::CachedPTR => "cached-ptr",
            Self::CachedSRV => "cached-srv",
            Self::CachedAddr => "cached-addr",
            Self::CachedTxt => "cached-txt",
            Self::CachedNSec => "cached-nsec",
            Self::CachedSubtype => "cached-subtype",
            Self::DnsRegistryProbe => "dns-registry-probe",
            Self::DnsRegistryActive => "dns-registry-active",
            Self::DnsRegistryTimer => "dns-registry-timer",
            Self::DnsRegistryNameChange => "dns-registry-name-change",
            Self::Timer => "timer",
        };
        f.write_str(label)
    }
}
154
/// A wrapper around UDP socket used by the mDNS daemon.
///
/// We do this because `mio` does not support PKTINFO and
/// does not provide a way to implement `Source` trait directly and safely.
///
/// The `mio` field is built from a clone of `pktinfo` (see `MyUdpSocket::new`).
struct MyUdpSocket {
    /// The underlying socket that supports control messages like
    /// `IP_PKTINFO` for IPv4 and `IPV6_PKTINFO` for IPv6.
    pktinfo: PktInfoUdpSocket,

    /// The mio UDP socket that is a clone of `pktinfo` and
    /// is used for event polling.
    mio: MioUdpSocket,
}
168
169impl MyUdpSocket {
170    pub fn new(pktinfo: PktInfoUdpSocket) -> io::Result<Self> {
171        let std_sock = pktinfo.try_clone_std()?;
172        let mio = MioUdpSocket::from_std(std_sock);
173
174        Ok(Self { pktinfo, mio })
175    }
176}
177
178/// Implements the mio `Source` trait so that we can use `MyUdpSocket` with `Poll`.
179impl Source for MyUdpSocket {
180    fn register(
181        &mut self,
182        registry: &Registry,
183        token: Token,
184        interests: Interest,
185    ) -> io::Result<()> {
186        self.mio.register(registry, token, interests)
187    }
188
189    fn reregister(
190        &mut self,
191        registry: &Registry,
192        token: Token,
193        interests: Interest,
194    ) -> io::Result<()> {
195        self.mio.reregister(registry, token, interests)
196    }
197
198    fn deregister(&mut self, registry: &Registry) -> std::io::Result<()> {
199        self.mio.deregister(registry)
200    }
201}
202
/// The metrics are a `HashMap` of (name_key, i64_value).
/// The main purpose is to help monitor the mDNS packet traffic.
pub type Metrics = HashMap<String, i64>;

const IPV4_SOCK_EVENT_KEY: usize = 4; // Pick a key just to indicate IPv4.
const IPV6_SOCK_EVENT_KEY: usize = 6; // Pick a key just to indicate IPv6.
const SIGNAL_SOCK_EVENT_KEY: usize = usize::MAX - 1; // avoid overlapping with zc.poll_ids
210
/// A daemon thread for mDNS
///
/// This struct provides a handle and an API to the daemon. It is cloneable.
///
/// Commands are queued on `sender`, and a datagram is then sent to
/// `signal_addr` to notify the daemon that a command is waiting.
#[derive(Clone)]
pub struct ServiceDaemon {
    /// Sender handle of the channel to the daemon.
    sender: Sender<Command>,

    /// Send to this addr to signal that a `Command` is coming.
    ///
    /// The daemon listens on this addr together with other mDNS sockets,
    /// to avoid busy polling the flume channel. If there is a way to poll
    /// the channel and mDNS sockets together, then this can be removed.
    signal_addr: SocketAddr,
}
226
227impl ServiceDaemon {
228    /// Creates a new daemon and spawns a thread to run the daemon.
229    ///
230    /// The daemon (re)uses the default mDNS port 5353. To keep it simple, we don't
231    /// ask callers to set the port.
232    pub fn new() -> Result<Self> {
233        // Use port 0 to allow the system assign a random available port,
234        // no need for a pre-defined port number.
235        let signal_addr = SocketAddrV4::new(LOOPBACK_V4, 0);
236
237        let signal_sock = UdpSocket::bind(signal_addr)
238            .map_err(|e| e_fmt!("failed to create signal_sock for daemon: {}", e))?;
239
240        // Get the socket with the OS chosen port
241        let signal_addr = signal_sock
242            .local_addr()
243            .map_err(|e| e_fmt!("failed to get signal sock addr: {}", e))?;
244
245        // Must be nonblocking so we can listen to it together with mDNS sockets.
246        signal_sock
247            .set_nonblocking(true)
248            .map_err(|e| e_fmt!("failed to set nonblocking for signal socket: {}", e))?;
249
250        let poller = Poll::new().map_err(|e| e_fmt!("failed to create mio Poll: {e}"))?;
251
252        let (sender, receiver) = bounded(100);
253
254        // Spawn the daemon thread
255        let mio_sock = MioUdpSocket::from_std(signal_sock);
256        thread::Builder::new()
257            .name("mDNS_daemon".to_string())
258            .spawn(move || Self::daemon_thread(mio_sock, poller, receiver))
259            .map_err(|e| e_fmt!("thread builder failed to spawn: {}", e))?;
260
261        Ok(Self {
262            sender,
263            signal_addr,
264        })
265    }
266
267    /// Sends `cmd` to the daemon via its channel, and sends a signal
268    /// to its sock addr to notify.
269    fn send_cmd(&self, cmd: Command) -> Result<()> {
270        let cmd_name = cmd.to_string();
271
272        // First, send to the flume channel.
273        self.sender.try_send(cmd).map_err(|e| match e {
274            TrySendError::Full(_) => Error::Again,
275            e => e_fmt!("flume::channel::send failed: {}", e),
276        })?;
277
278        // Second, send a signal to notify the daemon.
279        let addr = SocketAddrV4::new(LOOPBACK_V4, 0);
280        let socket = UdpSocket::bind(addr)
281            .map_err(|e| e_fmt!("Failed to create socket to send signal: {}", e))?;
282        socket
283            .send_to(cmd_name.as_bytes(), self.signal_addr)
284            .map_err(|e| {
285                e_fmt!(
286                    "signal socket send_to {} ({}) failed: {}",
287                    self.signal_addr,
288                    cmd_name,
289                    e
290                )
291            })?;
292
293        Ok(())
294    }
295
296    /// Starts browsing for a specific service type.
297    ///
298    /// `service_type` must end with a valid mDNS domain: '._tcp.local.' or '._udp.local.'
299    ///
300    /// Returns a channel `Receiver` to receive events about the service. The caller
301    /// can call `.recv_async().await` on this receiver to handle events in an
302    /// async environment or call `.recv()` in a sync environment.
303    ///
304    /// When a new instance is found, the daemon automatically tries to resolve, i.e.
305    /// finding more details, i.e. SRV records and TXT records.
306    pub fn browse(&self, service_type: &str) -> Result<Receiver<ServiceEvent>> {
307        check_domain_suffix(service_type)?;
308
309        let (resp_s, resp_r) = bounded(10);
310        self.send_cmd(Command::Browse(service_type.to_string(), 1, false, resp_s))?;
311        Ok(resp_r)
312    }
313
314    /// Preforms a "cache-only" browse.
315    ///
316    /// `service_type` must end with a valid mDNS domain: '._tcp.local.' or '._udp.local.'
317    ///
318    /// The functionality is identical to 'browse', but the service events are based solely on the contents
319    /// of the daemon's cache. No actual mDNS query is sent to the network.
320    ///
321    /// See [accept_unsolicited](Self::accept_unsolicited) if you want to do cache-only browsing.
322    pub fn browse_cache(&self, service_type: &str) -> Result<Receiver<ServiceEvent>> {
323        check_domain_suffix(service_type)?;
324
325        let (resp_s, resp_r) = bounded(10);
326        self.send_cmd(Command::Browse(service_type.to_string(), 1, true, resp_s))?;
327        Ok(resp_r)
328    }
329
330    /// Stops searching for a specific service type.
331    ///
332    /// When an error is returned, the caller should retry only when
333    /// the error is `Error::Again`, otherwise should log and move on.
334    pub fn stop_browse(&self, ty_domain: &str) -> Result<()> {
335        self.send_cmd(Command::StopBrowse(ty_domain.to_string()))
336    }
337
338    /// Starts querying for the ip addresses of a hostname.
339    ///
340    /// Returns a channel `Receiver` to receive events about the hostname.
341    /// The caller can call `.recv_async().await` on this receiver to handle events in an
342    /// async environment or call `.recv()` in a sync environment.
343    ///
344    /// The `timeout` is specified in milliseconds.
345    pub fn resolve_hostname(
346        &self,
347        hostname: &str,
348        timeout: Option<u64>,
349    ) -> Result<Receiver<HostnameResolutionEvent>> {
350        check_hostname(hostname)?;
351        let (resp_s, resp_r) = bounded(10);
352        self.send_cmd(Command::ResolveHostname(
353            hostname.to_string(),
354            1,
355            resp_s,
356            timeout,
357        ))?;
358        Ok(resp_r)
359    }
360
361    /// Stops querying for the ip addresses of a hostname.
362    ///
363    /// When an error is returned, the caller should retry only when
364    /// the error is `Error::Again`, otherwise should log and move on.
365    pub fn stop_resolve_hostname(&self, hostname: &str) -> Result<()> {
366        self.send_cmd(Command::StopResolveHostname(hostname.to_string()))
367    }
368
369    /// Registers a service provided by this host.
370    ///
371    /// If `service_info` has no addresses yet and its `addr_auto` is enabled,
372    /// this method will automatically fill in addresses from the host.
373    ///
374    /// To re-announce a service with an updated `service_info`, just call
375    /// this `register` function again. No need to call `unregister` first.
376    pub fn register(&self, service_info: ServiceInfo) -> Result<()> {
377        check_service_name(service_info.get_fullname())?;
378        check_hostname(service_info.get_hostname())?;
379
380        self.send_cmd(Command::Register(service_info))
381    }
382
383    /// Unregisters a service. This is a graceful shutdown of a service.
384    ///
385    /// Returns a channel receiver that is used to receive the status code
386    /// of the unregister.
387    ///
388    /// When an error is returned, the caller should retry only when
389    /// the error is `Error::Again`, otherwise should log and move on.
390    pub fn unregister(&self, fullname: &str) -> Result<Receiver<UnregisterStatus>> {
391        let (resp_s, resp_r) = bounded(1);
392        self.send_cmd(Command::Unregister(fullname.to_lowercase(), resp_s))?;
393        Ok(resp_r)
394    }
395
396    /// Starts to monitor events from the daemon.
397    ///
398    /// Returns a channel [`Receiver`] of [`DaemonEvent`].
399    pub fn monitor(&self) -> Result<Receiver<DaemonEvent>> {
400        let (resp_s, resp_r) = bounded(100);
401        self.send_cmd(Command::Monitor(resp_s))?;
402        Ok(resp_r)
403    }
404
405    /// Shuts down the daemon thread and returns a channel to receive the status.
406    ///
407    /// When an error is returned, the caller should retry only when
408    /// the error is `Error::Again`, otherwise should log and move on.
409    pub fn shutdown(&self) -> Result<Receiver<DaemonStatus>> {
410        let (resp_s, resp_r) = bounded(1);
411        self.send_cmd(Command::Exit(resp_s))?;
412        Ok(resp_r)
413    }
414
415    /// Returns the status of the daemon.
416    ///
417    /// When an error is returned, the caller should retry only when
418    /// the error is `Error::Again`, otherwise should consider the daemon
419    /// stopped working and move on.
420    pub fn status(&self) -> Result<Receiver<DaemonStatus>> {
421        let (resp_s, resp_r) = bounded(1);
422
423        if self.sender.is_disconnected() {
424            resp_s
425                .send(DaemonStatus::Shutdown)
426                .map_err(|e| e_fmt!("failed to send daemon status to the client: {}", e))?;
427        } else {
428            self.send_cmd(Command::GetStatus(resp_s))?;
429        }
430
431        Ok(resp_r)
432    }
433
434    /// Returns a channel receiver for the metrics, e.g. input/output counters.
435    ///
436    /// The metrics returned is a snapshot. Hence the caller should call
437    /// this method repeatedly if they want to monitor the metrics continuously.
438    pub fn get_metrics(&self) -> Result<Receiver<Metrics>> {
439        let (resp_s, resp_r) = bounded(1);
440        self.send_cmd(Command::GetMetrics(resp_s))?;
441        Ok(resp_r)
442    }
443
444    /// Change the max length allowed for a service name.
445    ///
446    /// As RFC 6763 defines a length max for a service name, a user should not call
447    /// this method unless they have to. See [`SERVICE_NAME_LEN_MAX_DEFAULT`].
448    ///
449    /// `len_max` is capped at an internal limit, which is currently 30.
450    pub fn set_service_name_len_max(&self, len_max: u8) -> Result<()> {
451        const SERVICE_NAME_LEN_MAX_LIMIT: u8 = 30; // Double the default length max.
452
453        if len_max > SERVICE_NAME_LEN_MAX_LIMIT {
454            return Err(Error::Msg(format!(
455                "service name length max {len_max} is too large"
456            )));
457        }
458
459        self.send_cmd(Command::SetOption(DaemonOption::ServiceNameLenMax(len_max)))
460    }
461
462    /// Change the interval for checking IP changes automatically.
463    ///
464    /// Setting the interval to 0 disables the IP check.
465    ///
466    /// See [`IP_CHECK_INTERVAL_IN_SECS_DEFAULT`] for the default interval.
467    pub fn set_ip_check_interval(&self, interval_in_secs: u32) -> Result<()> {
468        let interval_in_millis = interval_in_secs as u64 * 1000;
469        self.send_cmd(Command::SetOption(DaemonOption::IpCheckInterval(
470            interval_in_millis,
471        )))
472    }
473
474    /// Get the current interval in seconds for checking IP changes automatically.
475    pub fn get_ip_check_interval(&self) -> Result<u32> {
476        let (resp_s, resp_r) = bounded(1);
477        self.send_cmd(Command::GetOption(resp_s))?;
478
479        let option = resp_r
480            .recv_timeout(Duration::from_secs(10))
481            .map_err(|e| e_fmt!("failed to receive ip check interval: {}", e))?;
482        let ip_check_interval_in_secs = option.ip_check_interval / 1000;
483        Ok(ip_check_interval_in_secs as u32)
484    }
485
486    /// Include interfaces that match `if_kind` for this service daemon.
487    ///
488    /// For example:
489    /// ```ignore
490    ///     daemon.enable_interface("en0")?;
491    /// ```
492    pub fn enable_interface(&self, if_kind: impl IntoIfKindVec) -> Result<()> {
493        let if_kind_vec = if_kind.into_vec();
494        self.send_cmd(Command::SetOption(DaemonOption::EnableInterface(
495            if_kind_vec.kinds,
496        )))
497    }
498
499    /// Ignore/exclude interfaces that match `if_kind` for this daemon.
500    ///
501    /// For example:
502    /// ```ignore
503    ///     daemon.disable_interface(IfKind::IPv6)?;
504    /// ```
505    pub fn disable_interface(&self, if_kind: impl IntoIfKindVec) -> Result<()> {
506        let if_kind_vec = if_kind.into_vec();
507        self.send_cmd(Command::SetOption(DaemonOption::DisableInterface(
508            if_kind_vec.kinds,
509        )))
510    }
511
512    /// If `accept` is true, accept and cache all responses, even if there is no active querier
513    /// for a given service type. This is useful / necessary when doing cache-only browsing. See
514    /// [browse_cache](Self::browse_cache).
515    ///
516    /// If `accept` is false (default), accept only responses matching queries that we have initiated.
517    ///
518    /// For example:
519    /// ```ignore
520    ///     daemon.accept_unsolicited(true)?;
521    /// ```
522    pub fn accept_unsolicited(&self, accept: bool) -> Result<()> {
523        self.send_cmd(Command::SetOption(DaemonOption::AcceptUnsolicited(accept)))
524    }
525
526    #[cfg(test)]
527    pub fn test_down_interface(&self, ifname: &str) -> Result<()> {
528        self.send_cmd(Command::SetOption(DaemonOption::TestDownInterface(
529            ifname.to_string(),
530        )))
531    }
532
533    #[cfg(test)]
534    pub fn test_up_interface(&self, ifname: &str) -> Result<()> {
535        self.send_cmd(Command::SetOption(DaemonOption::TestUpInterface(
536            ifname.to_string(),
537        )))
538    }
539
540    /// Enable or disable the loopback for locally sent multicast packets in IPv4.
541    ///
542    /// By default, multicast loop is enabled for IPv4. When disabled, a querier will not
543    /// receive announcements from a responder on the same host.
544    ///
545    /// Reference: <https://learn.microsoft.com/en-us/windows/win32/winsock/ip-multicast-2>
546    ///
547    /// "The Winsock version of the IP_MULTICAST_LOOP option is semantically different than
548    /// the UNIX version of the IP_MULTICAST_LOOP option:
549    ///
550    /// In Winsock, the IP_MULTICAST_LOOP option applies only to the receive path.
551    /// In the UNIX version, the IP_MULTICAST_LOOP option applies to the send path."
552    ///
553    /// Which means, in order NOT to receive localhost announcements, you want to call
554    /// this API on the querier side on Windows, but on the responder side on Unix.
555    pub fn set_multicast_loop_v4(&self, on: bool) -> Result<()> {
556        self.send_cmd(Command::SetOption(DaemonOption::MulticastLoopV4(on)))
557    }
558
559    /// Enable or disable the loopback for locally sent multicast packets in IPv6.
560    ///
561    /// By default, multicast loop is enabled for IPv6. When disabled, a querier will not
562    /// receive announcements from a responder on the same host.
563    ///
564    /// Reference: <https://learn.microsoft.com/en-us/windows/win32/winsock/ip-multicast-2>
565    ///
566    /// "The Winsock version of the IP_MULTICAST_LOOP option is semantically different than
567    /// the UNIX version of the IP_MULTICAST_LOOP option:
568    ///
569    /// In Winsock, the IP_MULTICAST_LOOP option applies only to the receive path.
570    /// In the UNIX version, the IP_MULTICAST_LOOP option applies to the send path."
571    ///
572    /// Which means, in order NOT to receive localhost announcements, you want to call
573    /// this API on the querier side on Windows, but on the responder side on Unix.
574    pub fn set_multicast_loop_v6(&self, on: bool) -> Result<()> {
575        self.send_cmd(Command::SetOption(DaemonOption::MulticastLoopV6(on)))
576    }
577
578    /// Proactively confirms whether a service instance still valid.
579    ///
580    /// This call will issue queries for a service instance's SRV record and Address records.
581    ///
582    /// For `timeout`, most users should use [VERIFY_TIMEOUT_DEFAULT]
583    /// unless there is a reason not to follow RFC.
584    ///
585    /// If no response is received within `timeout`, the current resource
586    /// records will be flushed, and if needed, `ServiceRemoved` event will be
587    /// sent to active queriers.
588    ///
589    /// Reference: [RFC 6762](https://datatracker.ietf.org/doc/html/rfc6762#section-10.4)
590    pub fn verify(&self, instance_fullname: String, timeout: Duration) -> Result<()> {
591        self.send_cmd(Command::Verify(instance_fullname, timeout))
592    }
593
594    fn daemon_thread(signal_sock: MioUdpSocket, poller: Poll, receiver: Receiver<Command>) {
595        let mut zc = Zeroconf::new(signal_sock, poller);
596
597        if let Some(cmd) = zc.run(receiver) {
598            match cmd {
599                Command::Exit(resp_s) => {
600                    // It is guaranteed that the receiver already dropped,
601                    // i.e. the daemon command channel closed.
602                    if let Err(e) = resp_s.send(DaemonStatus::Shutdown) {
603                        debug!("exit: failed to send response of shutdown: {}", e);
604                    }
605                }
606                _ => {
607                    debug!("Unexpected command: {:?}", cmd);
608                }
609            }
610        }
611    }
612}
613
/// Creates a new UDP socket that uses `intf` to send and recv multicast.
///
/// The socket is bound to the unspecified address on the mDNS port, joins the
/// mDNS multicast group via `intf`, and uses `intf` for outgoing multicast.
///
/// `should_loop`: when `false`, multicast loopback is turned off for IPv4.
/// NOTE(review): `should_loop` is not applied in the IPv6 branch -- confirm
/// whether that is intentional.
///
/// Returns an error if any socket option cannot be set, if the IPv4 test send
/// fails, or if the socket cannot be wrapped in `MyUdpSocket`.
fn _new_socket_bind(intf: &Interface, should_loop: bool) -> Result<MyUdpSocket> {
    // Use the same socket for receiving and sending multicast packets.
    // Such socket has to bind to INADDR_ANY or IN6ADDR_ANY.
    let intf_ip = &intf.ip();
    match intf_ip {
        IpAddr::V4(ip) => {
            let addr = SocketAddrV4::new(Ipv4Addr::new(0, 0, 0, 0), MDNS_PORT);
            let sock = new_socket(addr.into(), true)?;

            // Join mDNS group to receive packets.
            sock.join_multicast_v4(&GROUP_ADDR_V4, ip)
                .map_err(|e| e_fmt!("join multicast group on addr {}: {}", intf_ip, e))?;

            // Set IP_MULTICAST_IF to send packets.
            sock.set_multicast_if_v4(ip)
                .map_err(|e| e_fmt!("set multicast_if on addr {}: {}", ip, e))?;

            // Per RFC 6762 section 11:
            // "All Multicast DNS responses (including responses sent via unicast) SHOULD
            // be sent with IP TTL set to 255."
            // Here we set the TTL to 255 for multicast as we don't support unicast yet.
            sock.set_multicast_ttl_v4(255)
                .map_err(|e| e_fmt!("set set_multicast_ttl_v4 on addr {}: {}", ip, e))?;

            // Loopback is only touched when the caller asked to turn it off.
            if !should_loop {
                sock.set_multicast_loop_v4(false)
                    .map_err(|e| e_fmt!("failed to set multicast loop v4 for {ip}: {e}"))?;
            }

            // Test if we can send packets successfully.
            let multicast_addr = SocketAddrV4::new(GROUP_ADDR_V4, MDNS_PORT).into();
            let test_packets = DnsOutgoing::new(0).to_data_on_wire();
            for packet in test_packets {
                sock.send_to(&packet, &multicast_addr)
                    .map_err(|e| e_fmt!("send multicast packet on addr {}: {}", ip, e))?;
            }
            MyUdpSocket::new(sock)
                .map_err(|e| e_fmt!("failed to create MySocket for interface {}: {e}", intf.name))
        }
        IpAddr::V6(ip) => {
            let addr = SocketAddrV6::new(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 0), MDNS_PORT, 0, 0);
            let sock = new_socket(addr.into(), true)?;

            // Falls back to index 0 when the interface reports no index.
            let if_index = intf.index.unwrap_or(0);

            // Join mDNS group to receive packets.
            sock.join_multicast_v6(&GROUP_ADDR_V6, if_index)
                .map_err(|e| e_fmt!("join multicast group on addr {}: {}", ip, e))?;

            // Set IPV6_MULTICAST_IF to send packets.
            sock.set_multicast_if_v6(if_index)
                .map_err(|e| e_fmt!("set multicast_if on addr {}: {}", ip, e))?;

            // We are not sending multicast packets to test this socket as there might
            // be many IPv6 interfaces on a host and could cause such send error:
            // "No buffer space available (os error 55)".

            MyUdpSocket::new(sock)
                .map_err(|e| e_fmt!("failed to create MySocket for interface {}: {e}", intf.name))
        }
    }
}
677
678/// Creates a new UDP socket to bind to `port` with REUSEPORT option.
679/// `non_block` indicates whether to set O_NONBLOCK for the socket.
680fn new_socket(addr: SocketAddr, non_block: bool) -> Result<PktInfoUdpSocket> {
681    let domain = match addr {
682        SocketAddr::V4(_) => socket2::Domain::IPV4,
683        SocketAddr::V6(_) => socket2::Domain::IPV6,
684    };
685
686    let fd = PktInfoUdpSocket::new(domain).map_err(|e| e_fmt!("create socket failed: {}", e))?;
687
688    fd.set_reuse_address(true)
689        .map_err(|e| e_fmt!("set ReuseAddr failed: {}", e))?;
690    #[cfg(unix)] // this is currently restricted to Unix's in socket2
691    fd.set_reuse_port(true)
692        .map_err(|e| e_fmt!("set ReusePort failed: {}", e))?;
693
694    if non_block {
695        fd.set_nonblocking(true)
696            .map_err(|e| e_fmt!("set O_NONBLOCK: {}", e))?;
697    }
698
699    fd.bind(&addr.into())
700        .map_err(|e| e_fmt!("socket bind to {} failed: {}", &addr, e))?;
701
702    trace!("new socket bind to {}", &addr);
703    Ok(fd)
704}
705
/// Specifies a UNIX timestamp in millis at which `command` should be run next.
struct ReRun {
    /// UNIX timestamp in millis.
    next_time: u64,
    /// The command to run at `next_time`.
    command: Command,
}
712
/// Specifies kinds of interfaces. It is used to enable or to disable interfaces in the daemon.
///
/// Note that for ergonomic reasons, `From<&str>`, `From<&String>` and `From<IpAddr>`
/// are implemented.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub enum IfKind {
    /// All interfaces.
    All,

    /// All IPv4 interfaces.
    IPv4,

    /// All IPv6 interfaces.
    IPv6,

    /// By the interface name, for example "en0"
    Name(String),

    /// By an IPv4 or IPv6 address.
    Addr(IpAddr),

    /// 127.0.0.1 (or anything in 127.0.0.0/8), disabled by default.
    ///
    /// Use [ServiceDaemon::enable_interface] to support registering services on loopback interfaces,
    /// which is required by some use cases (e.g., OSCQuery) that publish via mDNS.
    LoopbackV4,

    /// ::1/128, disabled by default.
    LoopbackV6,
}
743
744impl IfKind {
745    /// Checks if `intf` matches with this interface kind.
746    fn matches(&self, intf: &Interface) -> bool {
747        match self {
748            Self::All => true,
749            Self::IPv4 => intf.ip().is_ipv4(),
750            Self::IPv6 => intf.ip().is_ipv6(),
751            Self::Name(ifname) => ifname == &intf.name,
752            Self::Addr(addr) => addr == &intf.ip(),
753            Self::LoopbackV4 => intf.is_loopback() && intf.ip().is_ipv4(),
754            Self::LoopbackV6 => intf.is_loopback() && intf.ip().is_ipv6(),
755        }
756    }
757}
758
759/// The first use case of specifying an interface was to
760/// use an interface name. Hence adding this for ergonomic reasons.
761impl From<&str> for IfKind {
762    fn from(val: &str) -> Self {
763        Self::Name(val.to_string())
764    }
765}
766
767impl From<&String> for IfKind {
768    fn from(val: &String) -> Self {
769        Self::Name(val.to_string())
770    }
771}
772
773/// Still for ergonomic reasons.
774impl From<IpAddr> for IfKind {
775    fn from(val: IpAddr) -> Self {
776        Self::Addr(val)
777    }
778}
779
/// A list of `IfKind` that can be used to match interfaces.
///
/// Values of this type are produced by [`IntoIfKindVec::into_vec`].
pub struct IfKindVec {
    /// The interface kinds, in the order they were supplied.
    kinds: Vec<IfKind>,
}
784
/// A trait that converts a type into a Vec of `IfKind`.
///
/// It is implemented for any single value that converts into an `IfKind`,
/// and for a `Vec` of such values.
pub trait IntoIfKindVec {
    /// Consumes `self` and returns the equivalent list of interface kinds.
    fn into_vec(self) -> IfKindVec;
}
789
790impl<T: Into<IfKind>> IntoIfKindVec for T {
791    fn into_vec(self) -> IfKindVec {
792        let if_kind: IfKind = self.into();
793        IfKindVec {
794            kinds: vec![if_kind],
795        }
796    }
797}
798
799impl<T: Into<IfKind>> IntoIfKindVec for Vec<T> {
800    fn into_vec(self) -> IfKindVec {
801        let kinds: Vec<IfKind> = self.into_iter().map(|x| x.into()).collect();
802        IfKindVec { kinds }
803    }
804}
805
/// Selection of interfaces.
///
/// Selections are kept in a list and applied in order: when several
/// selections match the same interface, the one applied last decides.
struct IfSelection {
    /// The interfaces to be selected.
    if_kind: IfKind,

    /// Whether the `if_kind` should be enabled or not.
    selected: bool,
}
814
/// A struct holding the state. It was inspired by `zeroconf` package in Python.
///
/// An instance is created by `Zeroconf::new` and driven by `Zeroconf::run`,
/// which polls the sockets and executes commands in a loop.
struct Zeroconf {
    /// Local interfaces keyed by interface index.
    my_intfs: HashMap<u32, MyIntf>,

    /// A common socket for IPv4 interfaces.
    ipv4_sock: MyUdpSocket,

    /// A common socket for IPv6 interfaces.
    ipv6_sock: MyUdpSocket,

    /// Local registered services, keyed by service full names.
    my_services: HashMap<String, ServiceInfo>,

    /// Received DNS records.
    cache: DnsCache,

    /// Registered service records, keyed by interface index.
    dns_registry_map: HashMap<u32, DnsRegistry>,

    /// Active "Browse" commands.
    service_queriers: HashMap<String, Sender<ServiceEvent>>, // <ty_domain, channel::sender>

    /// Active "ResolveHostname" commands.
    ///
    /// The timestamps are set at the future timestamp when the command should timeout.
    /// `hostname` is case-insensitive and stored in lowercase.
    hostname_resolvers: HashMap<String, (Sender<HostnameResolutionEvent>, Option<u64>)>, // <hostname, (channel::sender, UNIX timestamp in millis)>

    /// All repeating transmissions.
    retransmissions: Vec<ReRun>,

    /// Daemon counters; the run loop updates them via `increase_counter`.
    counters: Metrics,

    /// Waits for incoming packets.
    poller: Poll,

    /// Channels to notify events.
    monitors: Vec<Sender<DaemonEvent>>,

    /// Options
    service_name_len_max: u8,

    /// Interval in millis to check IP address changes.
    /// The run loop treats 0 as "IP checks disabled".
    ip_check_interval: u64,

    /// All interface selections called to the daemon.
    if_selections: Vec<IfSelection>,

    /// Socket for signaling.
    signal_sock: MioUdpSocket,

    /// Timestamps marking where we need another iteration of the run loop,
    /// to react to events like retransmissions, cache refreshes, interface IP address changes, etc.
    ///
    /// When the run loop goes through a single iteration, it will
    /// set its timeout to the earliest timer in this list.
    timers: BinaryHeap<Reverse<u64>>,

    /// Current status of the daemon: `Running` after construction,
    /// `Shutdown` once the run loop receives an `Exit` command.
    status: DaemonStatus,

    /// Service instances that are pending for resolving SRV and TXT.
    pending_resolves: HashSet<String>,

    /// Service instances that are already resolved.
    resolved: HashSet<String>,

    /// Last value requested for the IPv4 multicast loop option (initially `true`).
    multicast_loop_v4: bool,

    /// Last value requested for the IPv6 multicast loop option (initially `true`).
    multicast_loop_v6: bool,

    /// Set via `DaemonOption::AcceptUnsolicited` (initially `false`).
    // NOTE(review): presumably controls whether unsolicited responses are
    // accepted; the consumer of this flag is outside this part of the file.
    accept_unsolicited: bool,

    /// Names of interfaces that tests pretend are down: `check_ip_changes`
    /// filters them out of the interface list.
    #[cfg(test)]
    test_down_interfaces: HashSet<String>,
}
891
892/// Join the multicast group for the given interface.
893fn join_multicast_group(my_sock: &PktInfoUdpSocket, intf: &Interface) -> Result<()> {
894    let intf_ip = &intf.ip();
895    match intf_ip {
896        IpAddr::V4(ip) => {
897            // Join mDNS group to receive packets.
898            debug!("join multicast group V4 on addr {}", ip);
899            my_sock
900                .join_multicast_v4(&GROUP_ADDR_V4, ip)
901                .map_err(|e| e_fmt!("PKT join multicast group on addr {}: {}", intf_ip, e))?;
902        }
903        IpAddr::V6(ip) => {
904            let if_index = intf.index.unwrap_or(0);
905            // Join mDNS group to receive packets.
906            debug!(
907                "join multicast group V6 on addr {} with index {}",
908                ip, if_index
909            );
910            my_sock
911                .join_multicast_v6(&GROUP_ADDR_V6, if_index)
912                .map_err(|e| e_fmt!("PKT join multicast group on addr {}: {}", ip, e))?;
913        }
914    }
915    Ok(())
916}
917
918impl Zeroconf {
919    fn new(signal_sock: MioUdpSocket, poller: Poll) -> Self {
920        // Get interfaces.
921        let my_ifaddrs = my_ip_interfaces(false);
922
923        // Create a socket for every IP addr.
924        // Note: it is possible that `my_ifaddrs` contains the same IP addr with different interface names,
925        // or the same interface name with different IP addrs.
926        let mut my_intfs = HashMap::new();
927        let mut dns_registry_map = HashMap::new();
928
929        // Use the same socket for receiving and sending multicast packets.
930        // Such socket has to bind to INADDR_ANY or IN6ADDR_ANY.
931        let addr = SocketAddrV4::new(Ipv4Addr::new(0, 0, 0, 0), MDNS_PORT);
932        let sock = new_socket(addr.into(), true).unwrap();
933
934        // Per RFC 6762 section 11:
935        // "All Multicast DNS responses (including responses sent via unicast) SHOULD
936        // be sent with IP TTL set to 255."
937        // Here we set the TTL to 255 for multicast as we don't support unicast yet.
938        sock.set_multicast_ttl_v4(255)
939            .map_err(|e| e_fmt!("set set_multicast_ttl_v4 on addr: {}", e))
940            .unwrap();
941
942        // This clones a socket.
943        let ipv4_sock = MyUdpSocket::new(sock).unwrap();
944
945        let addr = SocketAddrV6::new(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 0), MDNS_PORT, 0, 0);
946        let sock = new_socket(addr.into(), true).unwrap();
947
948        // Per RFC 6762 section 11:
949        // "All Multicast DNS responses (including responses sent via unicast) SHOULD
950        // be sent with IP TTL set to 255."
951        sock.set_multicast_hops_v6(255)
952            .map_err(|e| e_fmt!("set set_multicast_hops_v6: {}", e))
953            .unwrap();
954
955        // This clones the ipv6 socket.
956        let ipv6_sock = MyUdpSocket::new(sock).unwrap();
957
958        // Configure sockets to join multicast groups.
959        for intf in my_ifaddrs {
960            let sock = if intf.ip().is_ipv4() {
961                &ipv4_sock
962            } else {
963                &ipv6_sock
964            };
965
966            if let Err(e) = join_multicast_group(&sock.pktinfo, &intf) {
967                debug!(
968                    "config socket to join multicast: {}: {e}. Skipped.",
969                    &intf.ip()
970                );
971            }
972
973            let if_index = intf.index.unwrap_or(0);
974
975            // Add this interface address if not already present.
976            dns_registry_map
977                .entry(if_index)
978                .or_insert_with(DnsRegistry::new);
979
980            my_intfs
981                .entry(if_index)
982                .and_modify(|v: &mut MyIntf| {
983                    v.addrs.insert(intf.addr.clone());
984                })
985                .or_insert(MyIntf {
986                    name: intf.name.clone(),
987                    index: if_index,
988                    addrs: HashSet::from([intf.addr]),
989                });
990        }
991
992        let monitors = Vec::new();
993        let service_name_len_max = SERVICE_NAME_LEN_MAX_DEFAULT;
994        let ip_check_interval = IP_CHECK_INTERVAL_IN_SECS_DEFAULT as u64 * 1000;
995
996        let timers = BinaryHeap::new();
997
998        // Disable loopback by default.
999        let if_selections = vec![
1000            IfSelection {
1001                if_kind: IfKind::LoopbackV4,
1002                selected: false,
1003            },
1004            IfSelection {
1005                if_kind: IfKind::LoopbackV6,
1006                selected: false,
1007            },
1008        ];
1009
1010        let status = DaemonStatus::Running;
1011
1012        Self {
1013            my_intfs,
1014            ipv4_sock,
1015            ipv6_sock,
1016            // poll_ids: HashMap::new(),
1017            my_services: HashMap::new(),
1018            cache: DnsCache::new(),
1019            dns_registry_map,
1020            hostname_resolvers: HashMap::new(),
1021            service_queriers: HashMap::new(),
1022            retransmissions: Vec::new(),
1023            counters: HashMap::new(),
1024            poller,
1025            monitors,
1026            service_name_len_max,
1027            ip_check_interval,
1028            if_selections,
1029            signal_sock,
1030            timers,
1031            status,
1032            pending_resolves: HashSet::new(),
1033            resolved: HashSet::new(),
1034            multicast_loop_v4: true,
1035            multicast_loop_v6: true,
1036            accept_unsolicited: false,
1037
1038            #[cfg(test)]
1039            test_down_interfaces: HashSet::new(),
1040        }
1041    }
1042
1043    /// The main event loop of the daemon thread
1044    ///
1045    /// In each round, it will:
1046    /// 1. select the listening sockets with a timeout.
1047    /// 2. process the incoming packets if any.
1048    /// 3. try_recv on its channel and execute commands.
1049    /// 4. announce its registered services.
1050    /// 5. process retransmissions if any.
1051    fn run(&mut self, receiver: Receiver<Command>) -> Option<Command> {
1052        // Add the daemon's signal socket to the poller.
1053        if let Err(e) = self.poller.registry().register(
1054            &mut self.signal_sock,
1055            mio::Token(SIGNAL_SOCK_EVENT_KEY),
1056            mio::Interest::READABLE,
1057        ) {
1058            debug!("failed to add signal socket to the poller: {}", e);
1059            return None;
1060        }
1061
1062        if let Err(e) = self.poller.registry().register(
1063            &mut self.ipv4_sock,
1064            mio::Token(IPV4_SOCK_EVENT_KEY),
1065            mio::Interest::READABLE,
1066        ) {
1067            debug!("failed to register ipv4 socket: {}", e);
1068            return None;
1069        }
1070
1071        if let Err(e) = self.poller.registry().register(
1072            &mut self.ipv6_sock,
1073            mio::Token(IPV6_SOCK_EVENT_KEY),
1074            mio::Interest::READABLE,
1075        ) {
1076            debug!("failed to register ipv6 socket: {}", e);
1077            return None;
1078        }
1079
1080        // Setup timer for IP checks.
1081        let mut next_ip_check = if self.ip_check_interval > 0 {
1082            current_time_millis() + self.ip_check_interval
1083        } else {
1084            0
1085        };
1086
1087        if next_ip_check > 0 {
1088            self.add_timer(next_ip_check);
1089        }
1090
1091        // Start the run loop.
1092
1093        let mut events = mio::Events::with_capacity(1024);
1094        loop {
1095            let now = current_time_millis();
1096
1097            let earliest_timer = self.peek_earliest_timer();
1098            let timeout = earliest_timer.map(|timer| {
1099                // If `timer` already passed, set `timeout` to be 1ms.
1100                let millis = if timer > now { timer - now } else { 1 };
1101                Duration::from_millis(millis)
1102            });
1103
1104            // Process incoming packets, command events and optional timeout.
1105            events.clear();
1106            match self.poller.poll(&mut events, timeout) {
1107                Ok(_) => self.handle_poller_events(&events),
1108                Err(e) => debug!("failed to select from sockets: {}", e),
1109            }
1110
1111            let now = current_time_millis();
1112
1113            // Remove the timers if already passed.
1114            self.pop_timers_till(now);
1115
1116            // Remove hostname resolvers with expired timeouts.
1117            for hostname in self
1118                .hostname_resolvers
1119                .clone()
1120                .into_iter()
1121                .filter(|(_, (_, timeout))| timeout.map(|t| now >= t).unwrap_or(false))
1122                .map(|(hostname, _)| hostname)
1123            {
1124                trace!("hostname resolver timeout for {}", &hostname);
1125                call_hostname_resolution_listener(
1126                    &self.hostname_resolvers,
1127                    &hostname,
1128                    HostnameResolutionEvent::SearchTimeout(hostname.to_owned()),
1129                );
1130                call_hostname_resolution_listener(
1131                    &self.hostname_resolvers,
1132                    &hostname,
1133                    HostnameResolutionEvent::SearchStopped(hostname.to_owned()),
1134                );
1135                self.hostname_resolvers.remove(&hostname);
1136            }
1137
1138            // process commands from the command channel
1139            while let Ok(command) = receiver.try_recv() {
1140                if matches!(command, Command::Exit(_)) {
1141                    self.status = DaemonStatus::Shutdown;
1142                    return Some(command);
1143                }
1144                self.exec_command(command, false);
1145            }
1146
1147            // check for repeated commands and run them if their time is up.
1148            let mut i = 0;
1149            while i < self.retransmissions.len() {
1150                if now >= self.retransmissions[i].next_time {
1151                    let rerun = self.retransmissions.remove(i);
1152                    self.exec_command(rerun.command, true);
1153                } else {
1154                    i += 1;
1155                }
1156            }
1157
1158            // Refresh cached service records with active queriers
1159            self.refresh_active_services();
1160
1161            // Refresh cached A/AAAA records with active queriers
1162            let mut query_count = 0;
1163            for (hostname, _sender) in self.hostname_resolvers.iter() {
1164                for (hostname, ip_addr) in
1165                    self.cache.refresh_due_hostname_resolutions(hostname).iter()
1166                {
1167                    self.send_query(hostname, ip_address_rr_type(&ip_addr.to_ip_addr()));
1168                    query_count += 1;
1169                }
1170            }
1171
1172            self.increase_counter(Counter::CacheRefreshAddr, query_count);
1173
1174            // check and evict expired records in our cache
1175            let now = current_time_millis();
1176
1177            // Notify service listeners about the expired records.
1178            let expired_services = self.cache.evict_expired_services(now);
1179            if !expired_services.is_empty() {
1180                debug!(
1181                    "run: send {} service removal to listeners",
1182                    expired_services.len()
1183                );
1184                self.notify_service_removal(expired_services);
1185            }
1186
1187            // Notify hostname listeners about the expired records.
1188            let expired_addrs = self.cache.evict_expired_addr(now);
1189            for (hostname, addrs) in expired_addrs {
1190                call_hostname_resolution_listener(
1191                    &self.hostname_resolvers,
1192                    &hostname,
1193                    HostnameResolutionEvent::AddressesRemoved(hostname.clone(), addrs),
1194                );
1195                let instances = self.cache.get_instances_on_host(&hostname);
1196                let instance_set: HashSet<String> = instances.into_iter().collect();
1197                self.resolve_updated_instances(&instance_set);
1198            }
1199
1200            // Send out probing queries.
1201            self.probing_handler();
1202
1203            // check IP changes if next_ip_check is reached.
1204            if now >= next_ip_check && next_ip_check > 0 {
1205                next_ip_check = now + self.ip_check_interval;
1206                self.add_timer(next_ip_check);
1207
1208                self.check_ip_changes();
1209            }
1210        }
1211    }
1212
1213    fn process_set_option(&mut self, daemon_opt: DaemonOption) {
1214        match daemon_opt {
1215            DaemonOption::ServiceNameLenMax(length) => self.service_name_len_max = length,
1216            DaemonOption::IpCheckInterval(interval) => self.ip_check_interval = interval,
1217            DaemonOption::EnableInterface(if_kind) => self.enable_interface(if_kind),
1218            DaemonOption::DisableInterface(if_kind) => self.disable_interface(if_kind),
1219            DaemonOption::MulticastLoopV4(on) => self.set_multicast_loop_v4(on),
1220            DaemonOption::MulticastLoopV6(on) => self.set_multicast_loop_v6(on),
1221            DaemonOption::AcceptUnsolicited(accept) => self.set_accept_unsolicited(accept),
1222            #[cfg(test)]
1223            DaemonOption::TestDownInterface(ifname) => {
1224                self.test_down_interfaces.insert(ifname);
1225            }
1226            #[cfg(test)]
1227            DaemonOption::TestUpInterface(ifname) => {
1228                self.test_down_interfaces.remove(&ifname);
1229            }
1230        }
1231    }
1232
1233    fn enable_interface(&mut self, kinds: Vec<IfKind>) {
1234        debug!("enable_interface: {:?}", kinds);
1235        for if_kind in kinds {
1236            self.if_selections.push(IfSelection {
1237                if_kind,
1238                selected: true,
1239            });
1240        }
1241
1242        self.apply_intf_selections(my_ip_interfaces(true));
1243    }
1244
1245    fn disable_interface(&mut self, kinds: Vec<IfKind>) {
1246        debug!("disable_interface: {:?}", kinds);
1247        for if_kind in kinds {
1248            self.if_selections.push(IfSelection {
1249                if_kind,
1250                selected: false,
1251            });
1252        }
1253
1254        self.apply_intf_selections(my_ip_interfaces(true));
1255    }
1256
1257    fn set_multicast_loop_v4(&mut self, on: bool) {
1258        self.multicast_loop_v4 = on;
1259        self.ipv4_sock
1260            .pktinfo
1261            .set_multicast_loop_v4(on)
1262            .map_err(|e| e_fmt!("failed to set multicast loop v4: {}", e))
1263            .unwrap();
1264    }
1265
1266    fn set_multicast_loop_v6(&mut self, on: bool) {
1267        self.multicast_loop_v6 = on;
1268        self.ipv6_sock
1269            .pktinfo
1270            .set_multicast_loop_v6(on)
1271            .map_err(|e| e_fmt!("failed to set multicast loop v6: {}", e))
1272            .unwrap();
1273    }
1274
    /// Stores the new value of the `accept_unsolicited` option.
    fn set_accept_unsolicited(&mut self, accept: bool) {
        self.accept_unsolicited = accept;
    }
1278
1279    fn notify_monitors(&mut self, event: DaemonEvent) {
1280        // Only retain the monitors that are still connected.
1281        self.monitors.retain(|sender| {
1282            if let Err(e) = sender.try_send(event.clone()) {
1283                debug!("notify_monitors: try_send: {}", &e);
1284                if matches!(e, TrySendError::Disconnected(_)) {
1285                    return false; // This monitor is dropped.
1286                }
1287            }
1288            true
1289        });
1290    }
1291
1292    /// Remove `addr` in my services that enabled `addr_auto`.
1293    fn del_addr_in_my_services(&mut self, addr: &IpAddr) {
1294        for (_, service_info) in self.my_services.iter_mut() {
1295            if service_info.is_addr_auto() {
1296                service_info.remove_ipaddr(addr);
1297            }
1298        }
1299    }
1300
    /// Schedules a wake-up of the run loop at `next_time` (UNIX timestamp in millis).
    fn add_timer(&mut self, next_time: u64) {
        // `Reverse` orders the heap so that the earliest timestamp is on top.
        self.timers.push(Reverse(next_time));
    }
1304
1305    fn peek_earliest_timer(&self) -> Option<u64> {
1306        self.timers.peek().map(|Reverse(v)| *v)
1307    }
1308
1309    fn _pop_earliest_timer(&mut self) -> Option<u64> {
1310        self.timers.pop().map(|Reverse(v)| v)
1311    }
1312
1313    /// Pop all timers that are already passed till `now`.
1314    fn pop_timers_till(&mut self, now: u64) {
1315        while let Some(Reverse(v)) = self.timers.peek() {
1316            if *v > now {
1317                break;
1318            }
1319            self.timers.pop();
1320        }
1321    }
1322
1323    /// Apply all selections to `interfaces` and return the selected addresses.
1324    fn selected_addrs(&self, interfaces: Vec<Interface>) -> HashSet<IpAddr> {
1325        let intf_count = interfaces.len();
1326        let mut intf_selections = vec![true; intf_count];
1327
1328        // apply if_selections
1329        for selection in self.if_selections.iter() {
1330            // Mark the interfaces for this selection.
1331            for i in 0..intf_count {
1332                if selection.if_kind.matches(&interfaces[i]) {
1333                    intf_selections[i] = selection.selected;
1334                }
1335            }
1336        }
1337
1338        let mut selected_addrs = HashSet::new();
1339        for i in 0..intf_count {
1340            if intf_selections[i] {
1341                selected_addrs.insert(interfaces[i].addr.ip());
1342            }
1343        }
1344
1345        selected_addrs
1346    }
1347
1348    /// Apply all selections to `interfaces`.
1349    ///
1350    /// For any interface, add it if selected but not bound yet,
1351    /// delete it if not selected but still bound.
1352    fn apply_intf_selections(&mut self, interfaces: Vec<Interface>) {
1353        // By default, we enable all interfaces.
1354        let intf_count = interfaces.len();
1355        let mut intf_selections = vec![true; intf_count];
1356
1357        // apply if_selections
1358        for selection in self.if_selections.iter() {
1359            // Mark the interfaces for this selection.
1360            for i in 0..intf_count {
1361                if selection.if_kind.matches(&interfaces[i]) {
1362                    intf_selections[i] = selection.selected;
1363                }
1364            }
1365        }
1366
1367        // Update `my_intfs` based on the selections.
1368        for (idx, intf) in interfaces.into_iter().enumerate() {
1369            if intf_selections[idx] {
1370                // Add the interface
1371                self.add_interface(intf);
1372            } else {
1373                // Remove the interface
1374                self.del_interface(&intf);
1375            }
1376        }
1377    }
1378
    /// Handles the loss of a local IP address: removes `ip` from the services
    /// that have `addr_auto` enabled, then notifies monitors with `IpDel`.
    fn del_ip(&mut self, ip: IpAddr) {
        self.del_addr_in_my_services(&ip);
        self.notify_monitors(DaemonEvent::IpDel(ip));
    }
1383
    /// Check for IP changes and update [my_intfs] as needed.
    ///
    /// The current system interfaces are compared with the known ones:
    /// addresses that disappeared are reported via `del_ip`, interfaces left
    /// without any address are removed (leaving the multicast group and
    /// dropping their cache records), and finally the interface selections
    /// are applied to the current interfaces so that new ones get added.
    fn check_ip_changes(&mut self) {
        // Get the current interfaces.
        let my_ifaddrs = my_ip_interfaces(true);

        // In tests, pretend that the interfaces marked as "down" do not exist.
        #[cfg(test)]
        let my_ifaddrs: Vec<_> = my_ifaddrs
            .into_iter()
            .filter(|intf| !self.test_down_interfaces.contains(&intf.name))
            .collect();

        // Group the current addresses by interface index (0 if the index is unknown).
        let ifaddrs_map: HashMap<u32, Vec<&IfAddr>> =
            my_ifaddrs.iter().fold(HashMap::new(), |mut acc, intf| {
                let if_index = intf.index.unwrap_or(0);
                acc.entry(if_index).or_default().push(&intf.addr);
                acc
            });

        // Collected first and processed after the loop, because the loop
        // holds a mutable borrow of `self.my_intfs`.
        let mut deleted_intfs = Vec::new();
        let mut deleted_ips = Vec::new();

        for (if_index, my_intf) in self.my_intfs.iter_mut() {
            // The last removed address of each family, used below to leave
            // the multicast group when the interface is deleted.
            let mut last_ipv4 = None;
            let mut last_ipv6 = None;

            if let Some(current_addrs) = ifaddrs_map.get(if_index) {
                // The interface still exists: keep only the addresses it still has.
                my_intf.addrs.retain(|addr| {
                    if current_addrs.contains(&addr) {
                        true
                    } else {
                        match addr.ip() {
                            IpAddr::V4(ipv4) => last_ipv4 = Some(ipv4),
                            IpAddr::V6(ipv6) => last_ipv6 = Some(ipv6),
                        }
                        deleted_ips.push(addr.ip());
                        false
                    }
                });
                if my_intf.addrs.is_empty() {
                    deleted_intfs.push((*if_index, last_ipv4, last_ipv6))
                }
            } else {
                // If it does not exist, remove the interface.
                debug!(
                    "check_ip_changes: interface {} ({}) no longer exists, removing",
                    my_intf.name, if_index
                );
                for addr in my_intf.addrs.iter() {
                    match addr.ip() {
                        IpAddr::V4(ipv4) => last_ipv4 = Some(ipv4),
                        IpAddr::V6(ipv6) => last_ipv6 = Some(ipv6),
                    }
                    deleted_ips.push(addr.ip())
                }
                deleted_intfs.push((*if_index, last_ipv4, last_ipv6));
            }
        }

        if !deleted_ips.is_empty() || !deleted_intfs.is_empty() {
            debug!(
                "check_ip_changes: {} deleted ips {} deleted intfs",
                deleted_ips.len(),
                deleted_intfs.len()
            );
        }

        // Update the `addr_auto` services and notify monitors for each removed address.
        for ip in deleted_ips {
            self.del_ip(ip);
        }

        for (if_index, last_ipv4, last_ipv6) in deleted_intfs {
            let Some(my_intf) = self.my_intfs.remove(&if_index) else {
                continue;
            };

            // IPv4 membership is left by address.
            if let Some(ipv4) = last_ipv4 {
                debug!("leave multicast for {ipv4}");
                if let Err(e) = self
                    .ipv4_sock
                    .pktinfo
                    .leave_multicast_v4(&GROUP_ADDR_V4, &ipv4)
                {
                    debug!("leave multicast group for addr {ipv4}: {e}");
                }
            }

            // IPv6 membership is left by interface index.
            if let Some(ipv6) = last_ipv6 {
                debug!("leave multicast for {ipv6}");
                if let Err(e) = self
                    .ipv6_sock
                    .pktinfo
                    .leave_multicast_v6(&GROUP_ADDR_V6, my_intf.index)
                {
                    debug!("leave multicast group for IPv6: {ipv6}: {e}");
                }
            }

            // Remove cache records for this interface.
            let intf_id = InterfaceId {
                name: my_intf.name.to_string(),
                index: my_intf.index,
            };
            let removed_instances = self.cache.remove_records_on_intf(intf_id);
            self.notify_service_removal(removed_instances);
        }

        // Add newly found interfaces only if in our selections.
        self.apply_intf_selections(my_ifaddrs);
    }
1493
1494    fn del_interface(&mut self, intf: &Interface) {
1495        let if_index = intf.index.unwrap_or(0);
1496        trace!(
1497            "del_interface: {} ({if_index}) addr {}",
1498            intf.name,
1499            intf.ip()
1500        );
1501
1502        let Some(my_intf) = self.my_intfs.get_mut(&if_index) else {
1503            debug!("del_interface: interface {} not found", intf.name);
1504            return;
1505        };
1506
1507        let mut ip_removed = false;
1508
1509        if my_intf.addrs.remove(&intf.addr) {
1510            ip_removed = true;
1511
1512            match intf.addr.ip() {
1513                IpAddr::V4(ipv4) => {
1514                    if my_intf.next_ifaddr_v4().is_none() {
1515                        if let Err(e) = self
1516                            .ipv4_sock
1517                            .pktinfo
1518                            .leave_multicast_v4(&GROUP_ADDR_V4, &ipv4)
1519                        {
1520                            debug!("leave multicast group for addr {ipv4}: {e}");
1521                        }
1522                    }
1523                }
1524
1525                IpAddr::V6(ipv6) => {
1526                    if my_intf.next_ifaddr_v6().is_none() {
1527                        if let Err(e) = self
1528                            .ipv6_sock
1529                            .pktinfo
1530                            .leave_multicast_v6(&GROUP_ADDR_V6, if_index)
1531                        {
1532                            debug!("leave multicast group for addr {ipv6}: {e}");
1533                        }
1534                    }
1535                }
1536            }
1537
1538            if my_intf.addrs.is_empty() {
1539                // If no more addresses, remove the interface.
1540                debug!("del_interface: removing interface {}", intf.name);
1541                self.my_intfs.remove(&if_index);
1542                self.dns_registry_map.remove(&if_index);
1543                self.cache.remove_addrs_on_disabled_intf(if_index);
1544            }
1545        }
1546
1547        if ip_removed {
1548            // Notify the monitors.
1549            self.notify_monitors(DaemonEvent::IpDel(intf.ip()));
1550            // Remove the interface from my services that enabled `addr_auto`.
1551            self.del_addr_in_my_services(&intf.ip());
1552        }
1553    }
1554
    /// Adds the address of `intf` to the known interfaces, if it is new.
    ///
    /// When the address is new, this joins the multicast group, adds the
    /// address to the services that have `addr_auto` enabled and announces
    /// (or starts probing for) them on the interface, re-runs all pending
    /// "Browse" retransmissions, and notifies monitors with `IpAdd`.
    fn add_interface(&mut self, intf: Interface) {
        // Pick the socket of the matching IP family.
        let sock = if intf.ip().is_ipv4() {
            &self.ipv4_sock
        } else {
            &self.ipv6_sock
        };

        let if_index = intf.index.unwrap_or(0);
        let mut new_addr = false;

        match self.my_intfs.entry(if_index) {
            Entry::Occupied(mut entry) => {
                // If intf has a new address, add it to the existing interface.
                let my_intf = entry.get_mut();
                if !my_intf.addrs.contains(&intf.addr) {
                    // A failed join is only logged here; the address is still added.
                    if let Err(e) = join_multicast_group(&sock.pktinfo, &intf) {
                        debug!("add_interface: socket_config {}: {e}", &intf.name);
                    }
                    my_intf.addrs.insert(intf.addr.clone());
                    new_addr = true;
                }
            }
            Entry::Vacant(entry) => {
                // For an unknown interface, a failed join skips the interface entirely.
                if let Err(e) = join_multicast_group(&sock.pktinfo, &intf) {
                    debug!("add_interface: socket_config {}: {e}. Skipped.", &intf.name);
                    return;
                }

                new_addr = true;
                let new_intf = MyIntf {
                    name: intf.name.clone(),
                    index: if_index,
                    addrs: HashSet::from([intf.addr.clone()]),
                };
                entry.insert(new_intf);
            }
        }

        if !new_addr {
            trace!("add_interface: interface {} already exists", &intf.name);
            return;
        }

        debug!("add new interface {}: {}", intf.name, intf.ip());

        let Some(my_intf) = self.my_intfs.get(&if_index) else {
            debug!("add_interface: cannot find if_index {if_index}");
            return;
        };

        // Get the DNS registry of this interface, creating it if missing.
        let dns_registry = match self.dns_registry_map.get_mut(&if_index) {
            Some(registry) => registry,
            None => self
                .dns_registry_map
                .entry(if_index)
                .or_insert_with(DnsRegistry::new),
        };

        // Services with `addr_auto` pick up the new address and are announced
        // on this interface.
        for (_, service_info) in self.my_services.iter_mut() {
            if service_info.is_addr_auto() {
                let new_ip = intf.ip();
                service_info.insert_ipaddr(new_ip);

                if announce_service_on_intf(dns_registry, service_info, my_intf, &sock.pktinfo) {
                    debug!(
                        "Announce service {} on {}",
                        service_info.get_fullname(),
                        intf.ip()
                    );
                    service_info.set_status(if_index, ServiceStatus::Announced);
                } else {
                    // Not announced: take over the timers queued in the
                    // registry and mark the service as probing.
                    for timer in dns_registry.new_timers.drain(..) {
                        self.timers.push(Reverse(timer));
                    }
                    service_info.set_status(if_index, ServiceStatus::Probing);
                }
            }
        }

        // As we added a new interface, we want to execute all active "Browse" reruns now.
        let mut browse_reruns = Vec::new();
        let mut i = 0;
        while i < self.retransmissions.len() {
            if matches!(self.retransmissions[i].command, Command::Browse(..)) {
                browse_reruns.push(self.retransmissions.remove(i));
            } else {
                i += 1;
            }
        }

        for rerun in browse_reruns {
            self.exec_command(rerun.command, true);
        }

        // Notify the monitors.
        self.notify_monitors(DaemonEvent::IpAdd(intf.ip()));
    }
1652
1653    /// Registers a service.
1654    ///
1655    /// RFC 6762 section 8.3.
1656    /// ...the Multicast DNS responder MUST send
1657    ///    an unsolicited Multicast DNS response containing, in the Answer
1658    ///    Section, all of its newly registered resource records
1659    ///
1660    /// Zeroconf will then respond to requests for information about this service.
1661    fn register_service(&mut self, mut info: ServiceInfo) {
1662        // Check the service name length.
1663        if let Err(e) = check_service_name_length(info.get_type(), self.service_name_len_max) {
1664            debug!("check_service_name_length: {}", &e);
1665            self.notify_monitors(DaemonEvent::Error(e));
1666            return;
1667        }
1668
1669        if info.is_addr_auto() {
1670            let selected_addrs = self.selected_addrs(my_ip_interfaces(true));
1671            for addr in selected_addrs {
1672                info.insert_ipaddr(addr);
1673            }
1674        }
1675
1676        debug!("register service {:?}", &info);
1677
1678        let outgoing_addrs = self.send_unsolicited_response(&mut info);
1679        if !outgoing_addrs.is_empty() {
1680            self.notify_monitors(DaemonEvent::Announce(
1681                info.get_fullname().to_string(),
1682                format!("{:?}", &outgoing_addrs),
1683            ));
1684        }
1685
1686        // The key has to be lower case letter as DNS record name is case insensitive.
1687        // The info will have the original name.
1688        let service_fullname = info.get_fullname().to_lowercase();
1689        self.my_services.insert(service_fullname, info);
1690    }
1691
1692    /// Sends out announcement of `info` on every valid interface.
1693    /// Returns the list of interface IPs that sent out the announcement.
1694    fn send_unsolicited_response(&mut self, info: &mut ServiceInfo) -> Vec<IpAddr> {
1695        let mut outgoing_addrs = Vec::new();
1696        let mut outgoing_intfs = HashSet::new();
1697
1698        for (if_index, intf) in self.my_intfs.iter() {
1699            let dns_registry = match self.dns_registry_map.get_mut(if_index) {
1700                Some(registry) => registry,
1701                None => self
1702                    .dns_registry_map
1703                    .entry(*if_index)
1704                    .or_insert_with(DnsRegistry::new),
1705            };
1706
1707            let mut announced = false;
1708
1709            // IPv4
1710            if announce_service_on_intf(dns_registry, info, intf, &self.ipv4_sock.pktinfo) {
1711                for addr in intf.addrs.iter().filter(|a| a.ip().is_ipv4()) {
1712                    outgoing_addrs.push(addr.ip());
1713                }
1714                outgoing_intfs.insert(intf.index);
1715
1716                debug!(
1717                    "Announce service IPv4 {} on {}",
1718                    info.get_fullname(),
1719                    intf.name
1720                );
1721                announced = true;
1722            }
1723
1724            if announce_service_on_intf(dns_registry, info, intf, &self.ipv6_sock.pktinfo) {
1725                for addr in intf.addrs.iter().filter(|a| a.ip().is_ipv6()) {
1726                    outgoing_addrs.push(addr.ip());
1727                }
1728                outgoing_intfs.insert(intf.index);
1729
1730                debug!(
1731                    "Announce service IPv6 {} on {}",
1732                    info.get_fullname(),
1733                    intf.name
1734                );
1735                announced = true;
1736            }
1737
1738            if announced {
1739                info.set_status(intf.index, ServiceStatus::Announced);
1740            } else {
1741                for timer in dns_registry.new_timers.drain(..) {
1742                    self.timers.push(Reverse(timer));
1743                }
1744                info.set_status(*if_index, ServiceStatus::Probing);
1745            }
1746        }
1747
1748        // RFC 6762 section 8.3.
1749        // ..The Multicast DNS responder MUST send at least two unsolicited
1750        //    responses, one second apart.
1751        let next_time = current_time_millis() + 1000;
1752        for if_index in outgoing_intfs {
1753            self.add_retransmission(
1754                next_time,
1755                Command::RegisterResend(info.get_fullname().to_string(), if_index),
1756            );
1757        }
1758
1759        outgoing_addrs
1760    }
1761
1762    /// Send probings or finish them if expired. Notify waiting services.
1763    fn probing_handler(&mut self) {
1764        let now = current_time_millis();
1765
1766        for (if_index, intf) in self.my_intfs.iter() {
1767            let Some(dns_registry) = self.dns_registry_map.get_mut(if_index) else {
1768                continue;
1769            };
1770
1771            let (out, expired_probes) = check_probing(dns_registry, &mut self.timers, now);
1772
1773            // send probing.
1774            if !out.questions().is_empty() {
1775                trace!("sending out probing of questions: {:?}", out.questions());
1776                send_dns_outgoing(&out, intf, &self.ipv4_sock.pktinfo);
1777                send_dns_outgoing(&out, intf, &self.ipv6_sock.pktinfo);
1778            }
1779
1780            // For finished probes, wake up services that are waiting for the probes.
1781            let waiting_services =
1782                handle_expired_probes(expired_probes, &intf.name, dns_registry, &mut self.monitors);
1783
1784            for service_name in waiting_services {
1785                // service names are lowercase
1786                if let Some(info) = self.my_services.get_mut(&service_name.to_lowercase()) {
1787                    if info.get_status(*if_index) == ServiceStatus::Announced {
1788                        debug!("service {} already announced", info.get_fullname());
1789                        continue;
1790                    }
1791
1792                    let announced_v4 =
1793                        announce_service_on_intf(dns_registry, info, intf, &self.ipv4_sock.pktinfo);
1794                    let announced_v6 =
1795                        announce_service_on_intf(dns_registry, info, intf, &self.ipv6_sock.pktinfo);
1796
1797                    if announced_v4 || announced_v6 {
1798                        let next_time = now + 1000;
1799                        let command =
1800                            Command::RegisterResend(info.get_fullname().to_string(), *if_index);
1801                        self.retransmissions.push(ReRun { next_time, command });
1802                        self.timers.push(Reverse(next_time));
1803
1804                        let fullname = match dns_registry.name_changes.get(&service_name) {
1805                            Some(new_name) => new_name.to_string(),
1806                            None => service_name.to_string(),
1807                        };
1808
1809                        let mut hostname = info.get_hostname();
1810                        if let Some(new_name) = dns_registry.name_changes.get(hostname) {
1811                            hostname = new_name;
1812                        }
1813
1814                        debug!("wake up: announce service {} on {}", fullname, intf.name);
1815                        notify_monitors(
1816                            &mut self.monitors,
1817                            DaemonEvent::Announce(fullname, format!("{}:{}", hostname, &intf.name)),
1818                        );
1819
1820                        info.set_status(*if_index, ServiceStatus::Announced);
1821                    }
1822                }
1823            }
1824        }
1825    }
1826
1827    fn unregister_service(
1828        &self,
1829        info: &ServiceInfo,
1830        intf: &MyIntf,
1831        sock: &PktInfoUdpSocket,
1832    ) -> Vec<u8> {
1833        let is_ipv4 = sock.domain() == Domain::IPV4;
1834
1835        let mut out = DnsOutgoing::new(FLAGS_QR_RESPONSE | FLAGS_AA);
1836        out.add_answer_at_time(
1837            DnsPointer::new(
1838                info.get_type(),
1839                RRType::PTR,
1840                CLASS_IN,
1841                0,
1842                info.get_fullname().to_string(),
1843            ),
1844            0,
1845        );
1846
1847        if let Some(sub) = info.get_subtype() {
1848            trace!("Adding subdomain {}", sub);
1849            out.add_answer_at_time(
1850                DnsPointer::new(
1851                    sub,
1852                    RRType::PTR,
1853                    CLASS_IN,
1854                    0,
1855                    info.get_fullname().to_string(),
1856                ),
1857                0,
1858            );
1859        }
1860
1861        out.add_answer_at_time(
1862            DnsSrv::new(
1863                info.get_fullname(),
1864                CLASS_IN | CLASS_CACHE_FLUSH,
1865                0,
1866                info.get_priority(),
1867                info.get_weight(),
1868                info.get_port(),
1869                info.get_hostname().to_string(),
1870            ),
1871            0,
1872        );
1873        out.add_answer_at_time(
1874            DnsTxt::new(
1875                info.get_fullname(),
1876                CLASS_IN | CLASS_CACHE_FLUSH,
1877                0,
1878                info.generate_txt(),
1879            ),
1880            0,
1881        );
1882
1883        let if_addrs = if is_ipv4 {
1884            info.get_addrs_on_my_intf_v4(intf)
1885        } else {
1886            info.get_addrs_on_my_intf_v6(intf)
1887        };
1888
1889        if if_addrs.is_empty() {
1890            return vec![];
1891        }
1892
1893        for address in if_addrs {
1894            out.add_answer_at_time(
1895                DnsAddress::new(
1896                    info.get_hostname(),
1897                    ip_address_rr_type(&address),
1898                    CLASS_IN | CLASS_CACHE_FLUSH,
1899                    0,
1900                    address,
1901                    intf.into(),
1902                ),
1903                0,
1904            );
1905        }
1906
1907        // `out` data is non-empty, hence we can do this.
1908        send_dns_outgoing(&out, intf, sock).remove(0)
1909    }
1910
1911    /// Binds a channel `listener` to querying mDNS hostnames.
1912    ///
1913    /// If there is already a `listener`, it will be updated, i.e. overwritten.
1914    fn add_hostname_resolver(
1915        &mut self,
1916        hostname: String,
1917        listener: Sender<HostnameResolutionEvent>,
1918        timeout: Option<u64>,
1919    ) {
1920        let real_timeout = timeout.map(|t| current_time_millis() + t);
1921        self.hostname_resolvers
1922            .insert(hostname.to_lowercase(), (listener, real_timeout));
1923        if let Some(t) = real_timeout {
1924            self.add_timer(t);
1925        }
1926    }
1927
1928    /// Sends a multicast query for `name` with `qtype`.
1929    fn send_query(&self, name: &str, qtype: RRType) {
1930        self.send_query_vec(&[(name, qtype)]);
1931    }
1932
1933    /// Sends out a list of `questions` (i.e. DNS questions) via multicast.
1934    fn send_query_vec(&self, questions: &[(&str, RRType)]) {
1935        trace!("Sending query questions: {:?}", questions);
1936        let mut out = DnsOutgoing::new(FLAGS_QR_QUERY);
1937        let now = current_time_millis();
1938
1939        for (name, qtype) in questions {
1940            out.add_question(name, *qtype);
1941
1942            for record in self.cache.get_known_answers(name, *qtype, now) {
1943                /*
1944                RFC 6762 section 7.1: https://datatracker.ietf.org/doc/html/rfc6762#section-7.1
1945                ...
1946                    When a Multicast DNS querier sends a query to which it already knows
1947                    some answers, it populates the Answer Section of the DNS query
1948                    message with those answers.
1949                 */
1950                trace!("add known answer: {:?}", record.record);
1951                let mut new_record = record.record.clone();
1952                new_record.get_record_mut().update_ttl(now);
1953                out.add_answer_box(new_record);
1954            }
1955        }
1956
1957        for (_, intf) in self.my_intfs.iter() {
1958            send_dns_outgoing(&out, intf, &self.ipv4_sock.pktinfo);
1959            send_dns_outgoing(&out, intf, &self.ipv6_sock.pktinfo);
1960        }
1961    }
1962
1963    /// Reads one UDP datagram from the socket of `intf`.
1964    ///
1965    /// Returns false if failed to receive a packet,
1966    /// otherwise returns true.
1967    fn handle_read(&mut self, event_key: usize) -> bool {
1968        let sock = match event_key {
1969            IPV4_SOCK_EVENT_KEY => &mut self.ipv4_sock,
1970            IPV6_SOCK_EVENT_KEY => &mut self.ipv6_sock,
1971            _ => {
1972                debug!("handle_read: unknown token {}", event_key);
1973                return false;
1974            }
1975        };
1976        let mut buf = vec![0u8; MAX_MSG_ABSOLUTE];
1977
1978        // Read the next mDNS UDP datagram.
1979        //
1980        // If the datagram is larger than `buf`, excess bytes may or may not
1981        // be truncated by the socket layer depending on the platform's libc.
1982        // In any case, such large datagram will not be decoded properly and
1983        // this function should return false but should not crash.
1984        let (sz, pktinfo) = match sock.pktinfo.recv(&mut buf) {
1985            Ok(sz) => sz,
1986            Err(e) => {
1987                if e.kind() != std::io::ErrorKind::WouldBlock {
1988                    debug!("listening socket read failed: {}", e);
1989                }
1990                return false;
1991            }
1992        };
1993
1994        // Find the interface that received the packet.
1995        let pkt_if_index = pktinfo.if_index as u32;
1996        let Some(my_intf) = self.my_intfs.get(&pkt_if_index) else {
1997            debug!(
1998                "handle_read: no interface found for pktinfo if_index: {}",
1999                pktinfo.if_index
2000            );
2001            return true; // We still return true to indicate that we read something.
2002        };
2003
2004        buf.truncate(sz); // reduce potential processing errors
2005
2006        match DnsIncoming::new(buf, my_intf.into()) {
2007            Ok(msg) => {
2008                if msg.is_query() {
2009                    self.handle_query(msg, pkt_if_index, event_key == IPV4_SOCK_EVENT_KEY);
2010                } else if msg.is_response() {
2011                    self.handle_response(msg, pkt_if_index);
2012                } else {
2013                    debug!("Invalid message: not query and not response");
2014                }
2015            }
2016            Err(e) => debug!("Invalid incoming DNS message: {}", e),
2017        }
2018
2019        true
2020    }
2021
2022    /// Returns true, if sent query. Returns false if SRV already exists.
2023    fn query_unresolved(&mut self, instance: &str) -> bool {
2024        if !valid_instance_name(instance) {
2025            trace!("instance name {} not valid", instance);
2026            return false;
2027        }
2028
2029        if let Some(records) = self.cache.get_srv(instance) {
2030            for record in records {
2031                if let Some(srv) = record.record.any().downcast_ref::<DnsSrv>() {
2032                    if self.cache.get_addr(srv.host()).is_none() {
2033                        self.send_query_vec(&[(srv.host(), RRType::A), (srv.host(), RRType::AAAA)]);
2034                        return true;
2035                    }
2036                }
2037            }
2038        } else {
2039            self.send_query(instance, RRType::ANY);
2040            return true;
2041        }
2042
2043        false
2044    }
2045
2046    /// Checks if `ty_domain` has records in the cache. If yes, sends the
2047    /// cached records via `sender`.
2048    fn query_cache_for_service(
2049        &mut self,
2050        ty_domain: &str,
2051        sender: &Sender<ServiceEvent>,
2052        now: u64,
2053    ) {
2054        let mut resolved: HashSet<String> = HashSet::new();
2055        let mut unresolved: HashSet<String> = HashSet::new();
2056
2057        if let Some(records) = self.cache.get_ptr(ty_domain) {
2058            for record in records.iter().filter(|r| !r.record.expires_soon(now)) {
2059                if let Some(ptr) = record.record.any().downcast_ref::<DnsPointer>() {
2060                    let mut new_event = None;
2061                    match self.resolve_service_from_cache(ty_domain, ptr.alias()) {
2062                        Ok(resolved_service) => {
2063                            if resolved_service.is_valid() {
2064                                debug!("Resolved service from cache: {}", ptr.alias());
2065                                new_event =
2066                                    Some(ServiceEvent::ServiceResolved(Box::new(resolved_service)));
2067                            } else {
2068                                debug!("Resolved service is not valid: {}", ptr.alias());
2069                            }
2070                        }
2071                        Err(err) => {
2072                            debug!("Error while resolving service from cache: {}", err);
2073                            continue;
2074                        }
2075                    }
2076
2077                    match sender.send(ServiceEvent::ServiceFound(
2078                        ty_domain.to_string(),
2079                        ptr.alias().to_string(),
2080                    )) {
2081                        Ok(()) => debug!("sent service found {}", ptr.alias()),
2082                        Err(e) => {
2083                            debug!("failed to send service found: {}", e);
2084                            continue;
2085                        }
2086                    }
2087
2088                    if let Some(event) = new_event {
2089                        resolved.insert(ptr.alias().to_string());
2090                        match sender.send(event) {
2091                            Ok(()) => debug!("sent service resolved: {}", ptr.alias()),
2092                            Err(e) => debug!("failed to send service resolved: {}", e),
2093                        }
2094                    } else {
2095                        unresolved.insert(ptr.alias().to_string());
2096                    }
2097                }
2098            }
2099        }
2100
2101        for instance in resolved.drain() {
2102            self.pending_resolves.remove(&instance);
2103            self.resolved.insert(instance);
2104        }
2105
2106        for instance in unresolved.drain() {
2107            self.add_pending_resolve(instance);
2108        }
2109    }
2110
2111    /// Checks if `hostname` has records in the cache. If yes, sends the
2112    /// cached records via `sender`.
2113    fn query_cache_for_hostname(
2114        &mut self,
2115        hostname: &str,
2116        sender: Sender<HostnameResolutionEvent>,
2117    ) {
2118        let addresses_map = self.cache.get_addresses_for_host(hostname);
2119        for (name, addresses) in addresses_map {
2120            match sender.send(HostnameResolutionEvent::AddressesFound(name, addresses)) {
2121                Ok(()) => trace!("sent hostname addresses found"),
2122                Err(e) => debug!("failed to send hostname addresses found: {}", e),
2123            }
2124        }
2125    }
2126
2127    fn add_pending_resolve(&mut self, instance: String) {
2128        if !self.pending_resolves.contains(&instance) {
2129            let next_time = current_time_millis() + RESOLVE_WAIT_IN_MILLIS;
2130            self.add_retransmission(next_time, Command::Resolve(instance.clone(), 1));
2131            self.pending_resolves.insert(instance);
2132        }
2133    }
2134
2135    /// Creates a `ResolvedService` from the cache.
2136    fn resolve_service_from_cache(
2137        &self,
2138        ty_domain: &str,
2139        fullname: &str,
2140    ) -> Result<ResolvedService> {
2141        let now = current_time_millis();
2142        let mut resolved_service = ResolvedService {
2143            ty_domain: ty_domain.to_string(),
2144            sub_ty_domain: None,
2145            fullname: fullname.to_string(),
2146            host: String::new(),
2147            port: 0,
2148            addresses: HashSet::new(),
2149            txt_properties: TxtProperties::new(),
2150        };
2151
2152        // Be sure setting `subtype` if available even when querying for the parent domain.
2153        if let Some(subtype) = self.cache.get_subtype(fullname) {
2154            trace!(
2155                "ty_domain: {} found subtype {} for instance: {}",
2156                ty_domain,
2157                subtype,
2158                fullname
2159            );
2160            if resolved_service.sub_ty_domain.is_none() {
2161                resolved_service.sub_ty_domain = Some(subtype.to_string());
2162            }
2163        }
2164
2165        // resolve SRV record
2166        if let Some(records) = self.cache.get_srv(fullname) {
2167            if let Some(answer) = records.iter().find(|r| !r.record.expires_soon(now)) {
2168                if let Some(dns_srv) = answer.record.any().downcast_ref::<DnsSrv>() {
2169                    resolved_service.host = dns_srv.host().to_string();
2170                    resolved_service.port = dns_srv.port();
2171                }
2172            }
2173        }
2174
2175        // resolve TXT record
2176        if let Some(records) = self.cache.get_txt(fullname) {
2177            if let Some(record) = records.iter().find(|r| !r.record.expires_soon(now)) {
2178                if let Some(dns_txt) = record.record.any().downcast_ref::<DnsTxt>() {
2179                    resolved_service.txt_properties = dns_txt.text().into();
2180                }
2181            }
2182        }
2183
2184        // resolve A and AAAA records
2185        if let Some(records) = self.cache.get_addr(&resolved_service.host) {
2186            for answer in records.iter() {
2187                if let Some(dns_a) = answer.record.any().downcast_ref::<DnsAddress>() {
2188                    if dns_a.expires_soon(now) {
2189                        trace!(
2190                            "Addr expired or expires soon: {}",
2191                            dns_a.address().to_ip_addr()
2192                        );
2193                    } else {
2194                        resolved_service.addresses.insert(dns_a.address());
2195                    }
2196                }
2197            }
2198        }
2199
2200        Ok(resolved_service)
2201    }
2202
2203    fn handle_poller_events(&mut self, events: &mio::Events) {
2204        for ev in events.iter() {
2205            trace!("event received with key {:?}", ev.token());
2206            if ev.token().0 == SIGNAL_SOCK_EVENT_KEY {
2207                // Drain signals as we will drain commands as well.
2208                self.signal_sock_drain();
2209
2210                if let Err(e) = self.poller.registry().reregister(
2211                    &mut self.signal_sock,
2212                    ev.token(),
2213                    mio::Interest::READABLE,
2214                ) {
2215                    debug!("failed to modify poller for signal socket: {}", e);
2216                }
2217                continue; // Next event.
2218            }
2219
2220            // Read until no more packets available.
2221            while self.handle_read(ev.token().0) {}
2222
2223            // we continue to monitor this socket.
2224            if ev.token().0 == IPV4_SOCK_EVENT_KEY {
2225                // Re-register the IPv4 socket for reading.
2226                if let Err(e) = self.poller.registry().reregister(
2227                    &mut self.ipv4_sock,
2228                    ev.token(),
2229                    mio::Interest::READABLE,
2230                ) {
2231                    debug!("modify poller for IPv4 socket: {}", e);
2232                }
2233            } else if ev.token().0 == IPV6_SOCK_EVENT_KEY {
2234                // Re-register the IPv6 socket for reading.
2235                if let Err(e) = self.poller.registry().reregister(
2236                    &mut self.ipv6_sock,
2237                    ev.token(),
2238                    mio::Interest::READABLE,
2239                ) {
2240                    debug!("modify poller for IPv6 socket: {}", e);
2241                }
2242            }
2243        }
2244    }
2245
2246    /// Deal with incoming response packets.  All answers
2247    /// are held in the cache, and listeners are notified.
2248    fn handle_response(&mut self, mut msg: DnsIncoming, if_index: u32) {
2249        let now = current_time_millis();
2250
2251        // remove records that are expired.
2252        let mut record_predicate = |record: &DnsRecordBox| {
2253            if !record.get_record().is_expired(now) {
2254                return true;
2255            }
2256
2257            debug!("record is expired, removing it from cache.");
2258            if self.cache.remove(record) {
2259                // for PTR records, send event to listeners
2260                if let Some(dns_ptr) = record.any().downcast_ref::<DnsPointer>() {
2261                    call_service_listener(
2262                        &self.service_queriers,
2263                        dns_ptr.get_name(),
2264                        ServiceEvent::ServiceRemoved(
2265                            dns_ptr.get_name().to_string(),
2266                            dns_ptr.alias().to_string(),
2267                        ),
2268                    );
2269                }
2270            }
2271            false
2272        };
2273        msg.answers_mut().retain(&mut record_predicate);
2274        msg.authorities_mut().retain(&mut record_predicate);
2275        msg.additionals_mut().retain(&mut record_predicate);
2276
2277        // check possible conflicts and handle them.
2278        self.conflict_handler(&msg, if_index);
2279
2280        // check if the message is for us.
2281        let mut is_for_us = true; // assume it is for us.
2282
2283        // If there are any PTR records in the answers, there should be
2284        // at least one PTR for us. Otherwise, the message is not for us.
2285        // If there are no PTR records at all, assume this message is for us.
2286        for answer in msg.answers() {
2287            if answer.get_type() == RRType::PTR {
2288                if self.service_queriers.contains_key(answer.get_name()) {
2289                    is_for_us = true;
2290                    break; // OK to break: at least one PTR for us.
2291                } else {
2292                    is_for_us = false;
2293                }
2294            } else if answer.get_type() == RRType::A || answer.get_type() == RRType::AAAA {
2295                // If there is a hostname querier for this address, then it is for us.
2296                let answer_lowercase = answer.get_name().to_lowercase();
2297                if self.hostname_resolvers.contains_key(&answer_lowercase) {
2298                    is_for_us = true;
2299                    break; // OK to break: at least one hostname for us.
2300                }
2301            }
2302        }
2303
        // If we explicitly want to accept unsolicited responses, we should consider all messages as for us.
2305        if self.accept_unsolicited {
2306            is_for_us = true;
2307        }
2308
2309        /// Represents a DNS record change that involves one service instance.
2310        struct InstanceChange {
2311            ty: RRType,   // The type of DNS record for the instance.
2312            name: String, // The name of the record.
2313        }
2314
2315        // Go through all answers to get the new and updated records.
2316        // For new PTR records, send out ServiceFound immediately. For others,
2317        // collect them into `changes`.
2318        //
2319        // Note: we don't try to identify the update instances based on
2320        // each record immediately as the answers are likely related to each
2321        // other.
2322        let mut changes = Vec::new();
2323        let mut timers = Vec::new();
2324        let Some(my_intf) = self.my_intfs.get(&if_index) else {
2325            return;
2326        };
2327        for record in msg.all_records() {
2328            match self
2329                .cache
2330                .add_or_update(my_intf, record, &mut timers, is_for_us)
2331            {
2332                Some((dns_record, true)) => {
2333                    timers.push(dns_record.record.get_record().get_expire_time());
2334                    timers.push(dns_record.record.get_record().get_refresh_time());
2335
2336                    let ty = dns_record.record.get_type();
2337                    let name = dns_record.record.get_name();
2338
2339                    // Only process PTR that does not expire soon (i.e. TTL > 1).
2340                    if ty == RRType::PTR && dns_record.record.get_record().get_ttl() > 1 {
2341                        if self.service_queriers.contains_key(name) {
2342                            timers.push(dns_record.record.get_record().get_refresh_time());
2343                        }
2344
2345                        // send ServiceFound
2346                        if let Some(dns_ptr) = dns_record.record.any().downcast_ref::<DnsPointer>()
2347                        {
2348                            debug!("calling listener with service found: {name}");
2349                            call_service_listener(
2350                                &self.service_queriers,
2351                                name,
2352                                ServiceEvent::ServiceFound(
2353                                    name.to_string(),
2354                                    dns_ptr.alias().to_string(),
2355                                ),
2356                            );
2357                            changes.push(InstanceChange {
2358                                ty,
2359                                name: dns_ptr.alias().to_string(),
2360                            });
2361                        }
2362                    } else {
2363                        changes.push(InstanceChange {
2364                            ty,
2365                            name: name.to_string(),
2366                        });
2367                    }
2368                }
2369                Some((dns_record, false)) => {
2370                    timers.push(dns_record.record.get_record().get_expire_time());
2371                    timers.push(dns_record.record.get_record().get_refresh_time());
2372                }
2373                _ => {}
2374            }
2375        }
2376
2377        // Add timers for the new records.
2378        for t in timers {
2379            self.add_timer(t);
2380        }
2381
2382        // Go through remaining changes to see if any hostname resolutions were found or updated.
2383        for change in changes
2384            .iter()
2385            .filter(|change| change.ty == RRType::A || change.ty == RRType::AAAA)
2386        {
2387            let addr_map = self.cache.get_addresses_for_host(&change.name);
2388            for (name, addresses) in addr_map {
2389                call_hostname_resolution_listener(
2390                    &self.hostname_resolvers,
2391                    &change.name,
2392                    HostnameResolutionEvent::AddressesFound(name, addresses),
2393                )
2394            }
2395        }
2396
2397        // Identify the instances that need to be "resolved".
2398        let mut updated_instances = HashSet::new();
2399        for update in changes {
2400            match update.ty {
2401                RRType::PTR | RRType::SRV | RRType::TXT => {
2402                    updated_instances.insert(update.name);
2403                }
2404                RRType::A | RRType::AAAA => {
2405                    let instances = self.cache.get_instances_on_host(&update.name);
2406                    updated_instances.extend(instances);
2407                }
2408                _ => {}
2409            }
2410        }
2411
2412        self.resolve_updated_instances(&updated_instances);
2413    }
2414
2415    fn conflict_handler(&mut self, msg: &DnsIncoming, if_index: u32) {
2416        let Some(my_intf) = self.my_intfs.get(&if_index) else {
2417            debug!("handle_response: no intf found for index {if_index}");
2418            return;
2419        };
2420
2421        let Some(dns_registry) = self.dns_registry_map.get_mut(&if_index) else {
2422            return;
2423        };
2424
2425        for answer in msg.answers().iter() {
2426            let mut new_records = Vec::new();
2427
2428            let name = answer.get_name();
2429            let Some(probe) = dns_registry.probing.get_mut(name) else {
2430                continue;
2431            };
2432
2433            // check against possible multicast forwarding
2434            if answer.get_type() == RRType::A || answer.get_type() == RRType::AAAA {
2435                if let Some(answer_addr) = answer.any().downcast_ref::<DnsAddress>() {
2436                    if answer_addr.interface_id.index != if_index {
2437                        debug!(
2438                            "conflict handler: answer addr {:?} not in the subnet of intf {}",
2439                            answer_addr, my_intf.name
2440                        );
2441                        continue;
2442                    }
2443                }
2444
2445                // double check if any other address record matches rrdata,
2446                // as there could be multiple addresses for the same name.
2447                let any_match = probe.records.iter().any(|r| {
2448                    r.get_type() == answer.get_type()
2449                        && r.get_class() == answer.get_class()
2450                        && r.rrdata_match(answer.as_ref())
2451                });
2452                if any_match {
2453                    continue; // no conflict for this answer.
2454                }
2455            }
2456
2457            probe.records.retain(|record| {
2458                if record.get_type() == answer.get_type()
2459                    && record.get_class() == answer.get_class()
2460                    && !record.rrdata_match(answer.as_ref())
2461                {
2462                    debug!(
2463                        "found conflict name: '{name}' record: {}: {} PEER: {}",
2464                        record.get_type(),
2465                        record.rdata_print(),
2466                        answer.rdata_print()
2467                    );
2468
2469                    // create a new name for this record
2470                    // then remove the old record in probing.
2471                    let mut new_record = record.clone();
2472                    let new_name = match record.get_type() {
2473                        RRType::A => hostname_change(name),
2474                        RRType::AAAA => hostname_change(name),
2475                        _ => name_change(name),
2476                    };
2477                    new_record.get_record_mut().set_new_name(new_name);
2478                    new_records.push(new_record);
2479                    return false; // old record is dropped from the probe.
2480                }
2481
2482                true
2483            });
2484
2485            // ?????
2486            // if probe.records.is_empty() {
2487            //     dns_registry.probing.remove(name);
2488            // }
2489
2490            // Probing again with the new names.
2491            let create_time = current_time_millis() + fastrand::u64(0..250);
2492
2493            let waiting_services = probe.waiting_services.clone();
2494
2495            for record in new_records {
2496                if dns_registry.update_hostname(name, record.get_name(), create_time) {
2497                    self.timers.push(Reverse(create_time));
2498                }
2499
2500                // remember the name changes (note: `name` might not be the original, it could be already changed once.)
2501                dns_registry.name_changes.insert(
2502                    record.get_record().get_original_name().to_string(),
2503                    record.get_name().to_string(),
2504                );
2505
2506                let new_probe = match dns_registry.probing.get_mut(record.get_name()) {
2507                    Some(p) => p,
2508                    None => {
2509                        let new_probe = dns_registry
2510                            .probing
2511                            .entry(record.get_name().to_string())
2512                            .or_insert_with(|| {
2513                                debug!("conflict handler: new probe of {}", record.get_name());
2514                                Probe::new(create_time)
2515                            });
2516                        self.timers.push(Reverse(new_probe.next_send));
2517                        new_probe
2518                    }
2519                };
2520
2521                debug!(
2522                    "insert record with new name '{}' {} into probe",
2523                    record.get_name(),
2524                    record.get_type()
2525                );
2526                new_probe.insert_record(record);
2527
2528                new_probe.waiting_services.extend(waiting_services.clone());
2529            }
2530        }
2531    }
2532
2533    /// Resolve the updated (including new) instances.
2534    ///
2535    /// Note: it is possible that more than 1 PTR pointing to the same
2536    /// instance. For example, a regular service type PTR and a sub-type
2537    /// service type PTR can both point to the same service instance.
2538    /// This loop automatically handles the sub-type PTRs.
2539    fn resolve_updated_instances(&mut self, updated_instances: &HashSet<String>) {
2540        let mut resolved: HashSet<String> = HashSet::new();
2541        let mut unresolved: HashSet<String> = HashSet::new();
2542        let mut removed_instances = HashMap::new();
2543
2544        let now = current_time_millis();
2545
2546        for (ty_domain, records) in self.cache.all_ptr().iter() {
2547            if !self.service_queriers.contains_key(ty_domain) {
2548                // No need to resolve if not in our queries.
2549                continue;
2550            }
2551
2552            for record in records.iter().filter(|r| !r.record.expires_soon(now)) {
2553                if let Some(dns_ptr) = record.record.any().downcast_ref::<DnsPointer>() {
2554                    if updated_instances.contains(dns_ptr.alias()) {
2555                        let mut instance_found = false;
2556                        let mut new_event = None;
2557
2558                        if let Ok(resolved) =
2559                            self.resolve_service_from_cache(ty_domain, dns_ptr.alias())
2560                        {
2561                            debug!("resolve_updated_instances: from cache: {}", dns_ptr.alias());
2562                            instance_found = true;
2563                            if resolved.is_valid() {
2564                                new_event = Some(ServiceEvent::ServiceResolved(Box::new(resolved)));
2565                            } else {
2566                                debug!("Resolved service is not valid: {}", dns_ptr.alias());
2567                            }
2568                        }
2569
2570                        if instance_found {
2571                            if let Some(event) = new_event {
2572                                debug!("call queriers to resolve {}", dns_ptr.alias());
2573                                resolved.insert(dns_ptr.alias().to_string());
2574                                call_service_listener(&self.service_queriers, ty_domain, event);
2575                            } else {
2576                                if self.resolved.remove(dns_ptr.alias()) {
2577                                    removed_instances
2578                                        .entry(ty_domain.to_string())
2579                                        .or_insert_with(HashSet::new)
2580                                        .insert(dns_ptr.alias().to_string());
2581                                }
2582                                unresolved.insert(dns_ptr.alias().to_string());
2583                            }
2584                        }
2585                    }
2586                }
2587            }
2588        }
2589
2590        for instance in resolved.drain() {
2591            self.pending_resolves.remove(&instance);
2592            self.resolved.insert(instance);
2593        }
2594
2595        for instance in unresolved.drain() {
2596            self.add_pending_resolve(instance);
2597        }
2598
2599        if !removed_instances.is_empty() {
2600            debug!(
2601                "resolve_updated_instances: removed {}",
2602                &removed_instances.len()
2603            );
2604            self.notify_service_removal(removed_instances);
2605        }
2606    }
2607
2608    /// Handle incoming query packets, figure out whether and what to respond.
2609    fn handle_query(&mut self, msg: DnsIncoming, if_index: u32, is_ipv4: bool) {
2610        let sock = if is_ipv4 {
2611            &self.ipv4_sock
2612        } else {
2613            &self.ipv6_sock
2614        };
2615        let mut out = DnsOutgoing::new(FLAGS_QR_RESPONSE | FLAGS_AA);
2616
2617        // Special meta-query "_services._dns-sd._udp.<Domain>".
2618        // See https://datatracker.ietf.org/doc/html/rfc6763#section-9
2619        const META_QUERY: &str = "_services._dns-sd._udp.local.";
2620
2621        let Some(dns_registry) = self.dns_registry_map.get_mut(&if_index) else {
2622            debug!("missing dns registry for intf {}", if_index);
2623            return;
2624        };
2625
2626        let Some(intf) = self.my_intfs.get(&if_index) else {
2627            return;
2628        };
2629
2630        for question in msg.questions().iter() {
2631            let qtype = question.entry_type();
2632
2633            if qtype == RRType::PTR {
2634                for service in self.my_services.values() {
2635                    if service.get_status(if_index) != ServiceStatus::Announced {
2636                        continue;
2637                    }
2638
2639                    if question.entry_name() == service.get_type()
2640                        || service
2641                            .get_subtype()
2642                            .as_ref()
2643                            .is_some_and(|v| v == question.entry_name())
2644                    {
2645                        add_answer_with_additionals(
2646                            &mut out,
2647                            &msg,
2648                            service,
2649                            intf,
2650                            dns_registry,
2651                            is_ipv4,
2652                        );
2653                    } else if question.entry_name() == META_QUERY {
2654                        let ptr_added = out.add_answer(
2655                            &msg,
2656                            DnsPointer::new(
2657                                question.entry_name(),
2658                                RRType::PTR,
2659                                CLASS_IN,
2660                                service.get_other_ttl(),
2661                                service.get_type().to_string(),
2662                            ),
2663                        );
2664                        if !ptr_added {
2665                            trace!("answer was not added for meta-query {:?}", &question);
2666                        }
2667                    }
2668                }
2669            } else {
2670                // Simultaneous Probe Tiebreaking (RFC 6762 section 8.2)
2671                if qtype == RRType::ANY && msg.num_authorities() > 0 {
2672                    let probe_name = question.entry_name();
2673
2674                    if let Some(probe) = dns_registry.probing.get_mut(probe_name) {
2675                        let now = current_time_millis();
2676
2677                        // Only do tiebreaking if probe already started.
2678                        // This check also helps avoid redo tiebreaking if start time
2679                        // was postponed.
2680                        if probe.start_time < now {
2681                            let incoming_records: Vec<_> = msg
2682                                .authorities()
2683                                .iter()
2684                                .filter(|r| r.get_name() == probe_name)
2685                                .collect();
2686
2687                            probe.tiebreaking(&incoming_records, now, probe_name);
2688                        }
2689                    }
2690                }
2691
2692                if qtype == RRType::A || qtype == RRType::AAAA || qtype == RRType::ANY {
2693                    for service in self.my_services.values() {
2694                        if service.get_status(if_index) != ServiceStatus::Announced {
2695                            continue;
2696                        }
2697
2698                        let service_hostname =
2699                            match dns_registry.name_changes.get(service.get_hostname()) {
2700                                Some(new_name) => new_name,
2701                                None => service.get_hostname(),
2702                            };
2703
2704                        if service_hostname.to_lowercase() == question.entry_name().to_lowercase() {
2705                            let intf_addrs = if is_ipv4 {
2706                                service.get_addrs_on_my_intf_v4(intf)
2707                            } else {
2708                                service.get_addrs_on_my_intf_v6(intf)
2709                            };
2710                            if intf_addrs.is_empty()
2711                                && (qtype == RRType::A || qtype == RRType::AAAA)
2712                            {
2713                                let t = match qtype {
2714                                    RRType::A => "TYPE_A",
2715                                    RRType::AAAA => "TYPE_AAAA",
2716                                    _ => "invalid_type",
2717                                };
2718                                trace!(
2719                                    "Cannot find valid addrs for {} response on intf {:?}",
2720                                    t,
2721                                    &intf
2722                                );
2723                                return;
2724                            }
2725                            for address in intf_addrs {
2726                                out.add_answer(
2727                                    &msg,
2728                                    DnsAddress::new(
2729                                        service_hostname,
2730                                        ip_address_rr_type(&address),
2731                                        CLASS_IN | CLASS_CACHE_FLUSH,
2732                                        service.get_host_ttl(),
2733                                        address,
2734                                        intf.into(),
2735                                    ),
2736                                );
2737                            }
2738                        }
2739                    }
2740                }
2741
2742                let query_name = question.entry_name().to_lowercase();
2743                let service_opt = self
2744                    .my_services
2745                    .iter()
2746                    .find(|(k, _v)| {
2747                        let service_name = match dns_registry.name_changes.get(k.as_str()) {
2748                            Some(new_name) => new_name,
2749                            None => k,
2750                        };
2751                        service_name == &query_name
2752                    })
2753                    .map(|(_, v)| v);
2754
2755                let Some(service) = service_opt else {
2756                    continue;
2757                };
2758
2759                if service.get_status(if_index) != ServiceStatus::Announced {
2760                    continue;
2761                }
2762
2763                let intf_addrs = if is_ipv4 {
2764                    service.get_addrs_on_my_intf_v4(intf)
2765                } else {
2766                    service.get_addrs_on_my_intf_v6(intf)
2767                };
2768                if intf_addrs.is_empty() {
2769                    debug!(
2770                        "Cannot find valid addrs for TYPE_SRV response on intf {:?}",
2771                        &intf
2772                    );
2773                    continue;
2774                }
2775
2776                add_answer_of_service(
2777                    &mut out,
2778                    &msg,
2779                    question.entry_name(),
2780                    service,
2781                    qtype,
2782                    intf_addrs,
2783                );
2784            }
2785        }
2786
2787        if !out.answers_count() > 0 {
2788            out.set_id(msg.id());
2789            send_dns_outgoing(&out, intf, &sock.pktinfo);
2790
2791            let if_name = intf.name.clone();
2792
2793            self.increase_counter(Counter::Respond, 1);
2794            self.notify_monitors(DaemonEvent::Respond(if_name));
2795        }
2796
2797        self.increase_counter(Counter::KnownAnswerSuppression, out.known_answer_count());
2798    }
2799
2800    /// Increases the value of `counter` by `count`.
2801    fn increase_counter(&mut self, counter: Counter, count: i64) {
2802        let key = counter.to_string();
2803        match self.counters.get_mut(&key) {
2804            Some(v) => *v += count,
2805            None => {
2806                self.counters.insert(key, count);
2807            }
2808        }
2809    }
2810
2811    /// Sets the value of `counter` to `count`.
2812    fn set_counter(&mut self, counter: Counter, count: i64) {
2813        let key = counter.to_string();
2814        self.counters.insert(key, count);
2815    }
2816
2817    fn signal_sock_drain(&self) {
2818        let mut signal_buf = [0; 1024];
2819
2820        // This recv is non-blocking as the socket is non-blocking.
2821        while let Ok(sz) = self.signal_sock.recv(&mut signal_buf) {
2822            trace!(
2823                "signal socket recvd: {}",
2824                String::from_utf8_lossy(&signal_buf[0..sz])
2825            );
2826        }
2827    }
2828
2829    fn add_retransmission(&mut self, next_time: u64, command: Command) {
2830        self.retransmissions.push(ReRun { next_time, command });
2831        self.add_timer(next_time);
2832    }
2833
2834    /// Sends service removal event to listeners for expired service records.
2835    /// `expired`: map of service type domain to set of instance names.
2836    fn notify_service_removal(&self, expired: HashMap<String, HashSet<String>>) {
2837        for (ty_domain, sender) in self.service_queriers.iter() {
2838            if let Some(instances) = expired.get(ty_domain) {
2839                for instance_name in instances {
2840                    let event = ServiceEvent::ServiceRemoved(
2841                        ty_domain.to_string(),
2842                        instance_name.to_string(),
2843                    );
2844                    match sender.send(event) {
2845                        Ok(()) => debug!("notify_service_removal: sent ServiceRemoved to listener of {ty_domain}: {instance_name}"),
2846                        Err(e) => debug!("Failed to send event: {}", e),
2847                    }
2848                }
2849            }
2850        }
2851    }
2852
2853    /// The entry point that executes all commands received by the daemon.
2854    ///
2855    /// `repeating`: whether this is a retransmission.
2856    fn exec_command(&mut self, command: Command, repeating: bool) {
2857        trace!("exec_command: {:?} repeating: {}", &command, repeating);
2858        match command {
2859            Command::Browse(ty, next_delay, cache_only, listener) => {
2860                self.exec_command_browse(repeating, ty, next_delay, cache_only, listener);
2861            }
2862
2863            Command::ResolveHostname(hostname, next_delay, listener, timeout) => {
2864                self.exec_command_resolve_hostname(
2865                    repeating, hostname, next_delay, listener, timeout,
2866                );
2867            }
2868
2869            Command::Register(service_info) => {
2870                self.register_service(service_info);
2871                self.increase_counter(Counter::Register, 1);
2872            }
2873
2874            Command::RegisterResend(fullname, intf) => {
2875                trace!("register-resend service: {fullname} on {}", &intf);
2876                self.exec_command_register_resend(fullname, intf);
2877            }
2878
2879            Command::Unregister(fullname, resp_s) => {
2880                trace!("unregister service {} repeat {}", &fullname, &repeating);
2881                self.exec_command_unregister(repeating, fullname, resp_s);
2882            }
2883
2884            Command::UnregisterResend(packet, if_index, is_ipv4) => {
2885                self.exec_command_unregister_resend(packet, if_index, is_ipv4);
2886            }
2887
2888            Command::StopBrowse(ty_domain) => self.exec_command_stop_browse(ty_domain),
2889
2890            Command::StopResolveHostname(hostname) => {
2891                self.exec_command_stop_resolve_hostname(hostname.to_lowercase())
2892            }
2893
2894            Command::Resolve(instance, try_count) => self.exec_command_resolve(instance, try_count),
2895
2896            Command::GetMetrics(resp_s) => self.exec_command_get_metrics(resp_s),
2897
2898            Command::GetStatus(resp_s) => match resp_s.send(self.status.clone()) {
2899                Ok(()) => trace!("Sent status to the client"),
2900                Err(e) => debug!("Failed to send status: {}", e),
2901            },
2902
2903            Command::Monitor(resp_s) => {
2904                self.monitors.push(resp_s);
2905            }
2906
2907            Command::SetOption(daemon_opt) => {
2908                self.process_set_option(daemon_opt);
2909            }
2910
2911            Command::GetOption(resp_s) => {
2912                let val = DaemonOptionVal {
2913                    _service_name_len_max: self.service_name_len_max,
2914                    ip_check_interval: self.ip_check_interval,
2915                };
2916                if let Err(e) = resp_s.send(val) {
2917                    debug!("Failed to send options: {}", e);
2918                }
2919            }
2920
2921            Command::Verify(instance_fullname, timeout) => {
2922                self.exec_command_verify(instance_fullname, timeout, repeating);
2923            }
2924
2925            _ => {
2926                debug!("unexpected command: {:?}", &command);
2927            }
2928        }
2929    }
2930
2931    fn exec_command_get_metrics(&mut self, resp_s: Sender<HashMap<String, i64>>) {
2932        self.set_counter(Counter::CachedPTR, self.cache.ptr_count() as i64);
2933        self.set_counter(Counter::CachedSRV, self.cache.srv_count() as i64);
2934        self.set_counter(Counter::CachedAddr, self.cache.addr_count() as i64);
2935        self.set_counter(Counter::CachedTxt, self.cache.txt_count() as i64);
2936        self.set_counter(Counter::CachedNSec, self.cache.nsec_count() as i64);
2937        self.set_counter(Counter::CachedSubtype, self.cache.subtype_count() as i64);
2938        self.set_counter(Counter::Timer, self.timers.len() as i64);
2939
2940        let dns_registry_probe_count: usize = self
2941            .dns_registry_map
2942            .values()
2943            .map(|r| r.probing.len())
2944            .sum();
2945        self.set_counter(Counter::DnsRegistryProbe, dns_registry_probe_count as i64);
2946
2947        let dns_registry_active_count: usize = self
2948            .dns_registry_map
2949            .values()
2950            .map(|r| r.active.values().map(|a| a.len()).sum::<usize>())
2951            .sum();
2952        self.set_counter(Counter::DnsRegistryActive, dns_registry_active_count as i64);
2953
2954        let dns_registry_timer_count: usize = self
2955            .dns_registry_map
2956            .values()
2957            .map(|r| r.new_timers.len())
2958            .sum();
2959        self.set_counter(Counter::DnsRegistryTimer, dns_registry_timer_count as i64);
2960
2961        let dns_registry_name_change_count: usize = self
2962            .dns_registry_map
2963            .values()
2964            .map(|r| r.name_changes.len())
2965            .sum();
2966        self.set_counter(
2967            Counter::DnsRegistryNameChange,
2968            dns_registry_name_change_count as i64,
2969        );
2970
2971        // Send the metrics to the client.
2972        if let Err(e) = resp_s.send(self.counters.clone()) {
2973            debug!("Failed to send metrics: {}", e);
2974        }
2975    }
2976
2977    fn exec_command_browse(
2978        &mut self,
2979        repeating: bool,
2980        ty: String,
2981        next_delay: u32,
2982        cache_only: bool,
2983        listener: Sender<ServiceEvent>,
2984    ) {
2985        let pretty_addrs: Vec<String> = self
2986            .my_intfs
2987            .iter()
2988            .map(|(if_index, itf)| format!("{} ({if_index})", itf.name))
2989            .collect();
2990
2991        if let Err(e) = listener.send(ServiceEvent::SearchStarted(format!(
2992            "{ty} on {} interfaces [{}]",
2993            pretty_addrs.len(),
2994            pretty_addrs.join(", ")
2995        ))) {
2996            debug!(
2997                "Failed to send SearchStarted({})(repeating:{}): {}",
2998                &ty, repeating, e
2999            );
3000            return;
3001        }
3002
3003        let now = current_time_millis();
3004        if !repeating {
3005            // Binds a `listener` to querying mDNS domain type `ty`.
3006            //
3007            // If there is already a `listener`, it will be updated, i.e. overwritten.
3008            self.service_queriers.insert(ty.clone(), listener.clone());
3009
3010            // if we already have the records in our cache, just send them
3011            self.query_cache_for_service(&ty, &listener, now);
3012        }
3013
3014        if cache_only {
3015            // If cache_only is true, we do not send a query.
3016            match listener.send(ServiceEvent::SearchStopped(ty.clone())) {
3017                Ok(()) => debug!("SearchStopped sent for {}", &ty),
3018                Err(e) => debug!("Failed to send SearchStopped: {}", e),
3019            }
3020            return;
3021        }
3022
3023        self.send_query(&ty, RRType::PTR);
3024        self.increase_counter(Counter::Browse, 1);
3025
3026        let next_time = now + (next_delay * 1000) as u64;
3027        let max_delay = 60 * 60;
3028        let delay = cmp::min(next_delay * 2, max_delay);
3029        self.add_retransmission(next_time, Command::Browse(ty, delay, cache_only, listener));
3030    }
3031
3032    fn exec_command_resolve_hostname(
3033        &mut self,
3034        repeating: bool,
3035        hostname: String,
3036        next_delay: u32,
3037        listener: Sender<HostnameResolutionEvent>,
3038        timeout: Option<u64>,
3039    ) {
3040        let addr_list: Vec<_> = self.my_intfs.iter().collect();
3041        if let Err(e) = listener.send(HostnameResolutionEvent::SearchStarted(format!(
3042            "{} on addrs {:?}",
3043            &hostname, &addr_list
3044        ))) {
3045            debug!(
3046                "Failed to send ResolveStarted({})(repeating:{}): {}",
3047                &hostname, repeating, e
3048            );
3049            return;
3050        }
3051        if !repeating {
3052            self.add_hostname_resolver(hostname.to_owned(), listener.clone(), timeout);
3053            // if we already have the records in our cache, just send them
3054            self.query_cache_for_hostname(&hostname, listener.clone());
3055        }
3056
3057        self.send_query_vec(&[(&hostname, RRType::A), (&hostname, RRType::AAAA)]);
3058        self.increase_counter(Counter::ResolveHostname, 1);
3059
3060        let now = current_time_millis();
3061        let next_time = now + u64::from(next_delay) * 1000;
3062        let max_delay = 60 * 60;
3063        let delay = cmp::min(next_delay * 2, max_delay);
3064
3065        // Only add retransmission if it does not exceed the hostname resolver timeout, if any.
3066        if self
3067            .hostname_resolvers
3068            .get(&hostname)
3069            .and_then(|(_sender, timeout)| *timeout)
3070            .map(|timeout| next_time < timeout)
3071            .unwrap_or(true)
3072        {
3073            self.add_retransmission(
3074                next_time,
3075                Command::ResolveHostname(hostname, delay, listener, None),
3076            );
3077        }
3078    }
3079
3080    fn exec_command_resolve(&mut self, instance: String, try_count: u16) {
3081        let pending_query = self.query_unresolved(&instance);
3082        let max_try = 3;
3083        if pending_query && try_count < max_try {
3084            // Note that if the current try already succeeds, the next retransmission
3085            // will be no-op as the cache has been updated.
3086            let next_time = current_time_millis() + RESOLVE_WAIT_IN_MILLIS;
3087            self.add_retransmission(next_time, Command::Resolve(instance, try_count + 1));
3088        }
3089    }
3090
3091    fn exec_command_unregister(
3092        &mut self,
3093        repeating: bool,
3094        fullname: String,
3095        resp_s: Sender<UnregisterStatus>,
3096    ) {
3097        let response = match self.my_services.remove_entry(&fullname) {
3098            None => {
3099                debug!("unregister: cannot find such service {}", &fullname);
3100                UnregisterStatus::NotFound
3101            }
3102            Some((_k, info)) => {
3103                let mut timers = Vec::new();
3104
3105                for (if_index, intf) in self.my_intfs.iter() {
3106                    let packet = self.unregister_service(&info, intf, &self.ipv4_sock.pktinfo);
3107                    // repeat for one time just in case some peers miss the message
3108                    if !repeating && !packet.is_empty() {
3109                        let next_time = current_time_millis() + 120;
3110                        self.retransmissions.push(ReRun {
3111                            next_time,
3112                            command: Command::UnregisterResend(packet, *if_index, true),
3113                        });
3114                        timers.push(next_time);
3115                    }
3116
3117                    // ipv6
3118
3119                    let packet = self.unregister_service(&info, intf, &self.ipv6_sock.pktinfo);
3120                    if !repeating && !packet.is_empty() {
3121                        let next_time = current_time_millis() + 120;
3122                        self.retransmissions.push(ReRun {
3123                            next_time,
3124                            command: Command::UnregisterResend(packet, *if_index, false),
3125                        });
3126                        timers.push(next_time);
3127                    }
3128                }
3129
3130                for t in timers {
3131                    self.add_timer(t);
3132                }
3133
3134                self.increase_counter(Counter::Unregister, 1);
3135                UnregisterStatus::OK
3136            }
3137        };
3138        if let Err(e) = resp_s.send(response) {
3139            debug!("unregister: failed to send response: {}", e);
3140        }
3141    }
3142
3143    fn exec_command_unregister_resend(&mut self, packet: Vec<u8>, if_index: u32, is_ipv4: bool) {
3144        let Some(intf) = self.my_intfs.get(&if_index) else {
3145            return;
3146        };
3147        let sock = if is_ipv4 {
3148            &self.ipv4_sock.pktinfo
3149        } else {
3150            &self.ipv6_sock.pktinfo
3151        };
3152
3153        let if_addr = if is_ipv4 {
3154            match intf.next_ifaddr_v4() {
3155                Some(addr) => addr,
3156                None => return,
3157            }
3158        } else {
3159            match intf.next_ifaddr_v6() {
3160                Some(addr) => addr,
3161                None => return,
3162            }
3163        };
3164
3165        debug!("UnregisterResend from {:?}", if_addr);
3166        multicast_on_intf(&packet[..], &intf.name, intf.index, if_addr, sock);
3167
3168        self.increase_counter(Counter::UnregisterResend, 1);
3169    }
3170
3171    fn exec_command_stop_browse(&mut self, ty_domain: String) {
3172        match self.service_queriers.remove_entry(&ty_domain) {
3173            None => debug!("StopBrowse: cannot find querier for {}", &ty_domain),
3174            Some((ty, sender)) => {
3175                // Remove pending browse commands in the reruns.
3176                trace!("StopBrowse: removed queryer for {}", &ty);
3177                let mut i = 0;
3178                while i < self.retransmissions.len() {
3179                    if let Command::Browse(t, _, _, _) = &self.retransmissions[i].command {
3180                        if t == &ty {
3181                            self.retransmissions.remove(i);
3182                            trace!("StopBrowse: removed retransmission for {}", &ty);
3183                            continue;
3184                        }
3185                    }
3186                    i += 1;
3187                }
3188
3189                // Remove cache entries.
3190                self.cache.remove_service_type(&ty_domain);
3191
3192                // Notify the client.
3193                match sender.send(ServiceEvent::SearchStopped(ty_domain)) {
3194                    Ok(()) => trace!("Sent SearchStopped to the listener"),
3195                    Err(e) => debug!("Failed to send SearchStopped: {}", e),
3196                }
3197            }
3198        }
3199    }
3200
3201    fn exec_command_stop_resolve_hostname(&mut self, hostname: String) {
3202        if let Some((host, (sender, _timeout))) = self.hostname_resolvers.remove_entry(&hostname) {
3203            // Remove pending resolve commands in the reruns.
3204            trace!("StopResolve: removed queryer for {}", &host);
3205            let mut i = 0;
3206            while i < self.retransmissions.len() {
3207                if let Command::Resolve(t, _) = &self.retransmissions[i].command {
3208                    if t == &host {
3209                        self.retransmissions.remove(i);
3210                        trace!("StopResolve: removed retransmission for {}", &host);
3211                        continue;
3212                    }
3213                }
3214                i += 1;
3215            }
3216
3217            // Notify the client.
3218            match sender.send(HostnameResolutionEvent::SearchStopped(hostname)) {
3219                Ok(()) => trace!("Sent SearchStopped to the listener"),
3220                Err(e) => debug!("Failed to send SearchStopped: {}", e),
3221            }
3222        }
3223    }
3224
3225    fn exec_command_register_resend(&mut self, fullname: String, if_index: u32) {
3226        let Some(info) = self.my_services.get_mut(&fullname) else {
3227            trace!("announce: cannot find such service {}", &fullname);
3228            return;
3229        };
3230
3231        let Some(dns_registry) = self.dns_registry_map.get_mut(&if_index) else {
3232            return;
3233        };
3234
3235        let Some(intf) = self.my_intfs.get(&if_index) else {
3236            return;
3237        };
3238
3239        let announced_v4 =
3240            announce_service_on_intf(dns_registry, info, intf, &self.ipv4_sock.pktinfo);
3241        let announced_v6 =
3242            announce_service_on_intf(dns_registry, info, intf, &self.ipv6_sock.pktinfo);
3243
3244        if announced_v4 || announced_v6 {
3245            let mut hostname = info.get_hostname();
3246            if let Some(new_name) = dns_registry.name_changes.get(hostname) {
3247                hostname = new_name;
3248            }
3249            let service_name = match dns_registry.name_changes.get(&fullname) {
3250                Some(new_name) => new_name.to_string(),
3251                None => fullname,
3252            };
3253
3254            debug!("resend: announce service {service_name} on {}", intf.name);
3255
3256            notify_monitors(
3257                &mut self.monitors,
3258                DaemonEvent::Announce(service_name, format!("{}:{}", hostname, &intf.name)),
3259            );
3260            info.set_status(if_index, ServiceStatus::Announced);
3261        } else {
3262            debug!("register-resend should not fail");
3263        }
3264
3265        self.increase_counter(Counter::RegisterResend, 1);
3266    }
3267
3268    fn exec_command_verify(&mut self, instance: String, timeout: Duration, repeating: bool) {
3269        /*
3270        RFC 6762 section 10.4:
3271        ...
3272        When the cache receives this hint that it should reconfirm some
3273        record, it MUST issue two or more queries for the resource record in
3274        dispute.  If no response is received within ten seconds, then, even
3275        though its TTL may indicate that it is not yet due to expire, that
3276        record SHOULD be promptly flushed from the cache.
3277        */
3278        let now = current_time_millis();
3279        let expire_at = if repeating {
3280            None
3281        } else {
3282            Some(now + timeout.as_millis() as u64)
3283        };
3284
3285        // send query for the resource records.
3286        let record_vec = self.cache.service_verify_queries(&instance, expire_at);
3287
3288        if !record_vec.is_empty() {
3289            let query_vec: Vec<(&str, RRType)> = record_vec
3290                .iter()
3291                .map(|(record, rr_type)| (record.as_str(), *rr_type))
3292                .collect();
3293            self.send_query_vec(&query_vec);
3294
3295            if let Some(new_expire) = expire_at {
3296                self.add_timer(new_expire); // ensure a check for the new expire time.
3297
3298                // schedule a resend 1 second later
3299                self.add_retransmission(now + 1000, Command::Verify(instance, timeout));
3300            }
3301        }
3302    }
3303
3304    /// Refresh cached service records with active queriers
3305    fn refresh_active_services(&mut self) {
3306        let mut query_ptr_count = 0;
3307        let mut query_srv_count = 0;
3308        let mut new_timers = HashSet::new();
3309        let mut query_addr_count = 0;
3310
3311        for (ty_domain, _sender) in self.service_queriers.iter() {
3312            let refreshed_timers = self.cache.refresh_due_ptr(ty_domain);
3313            if !refreshed_timers.is_empty() {
3314                trace!("sending refresh query for PTR: {}", ty_domain);
3315                self.send_query(ty_domain, RRType::PTR);
3316                query_ptr_count += 1;
3317                new_timers.extend(refreshed_timers);
3318            }
3319
3320            let (instances, timers) = self.cache.refresh_due_srv_txt(ty_domain);
3321            for (instance, types) in instances {
3322                trace!("sending refresh query for: {}", &instance);
3323                let query_vec = types
3324                    .into_iter()
3325                    .map(|ty| (instance.as_str(), ty))
3326                    .collect::<Vec<_>>();
3327                self.send_query_vec(&query_vec);
3328                query_srv_count += 1;
3329            }
3330            new_timers.extend(timers);
3331            let (hostnames, timers) = self.cache.refresh_due_hosts(ty_domain);
3332            for hostname in hostnames.iter() {
3333                trace!("sending refresh queries for A and AAAA:  {}", hostname);
3334                self.send_query_vec(&[(hostname, RRType::A), (hostname, RRType::AAAA)]);
3335                query_addr_count += 2;
3336            }
3337            new_timers.extend(timers);
3338        }
3339
3340        for timer in new_timers {
3341            self.add_timer(timer);
3342        }
3343
3344        self.increase_counter(Counter::CacheRefreshPTR, query_ptr_count);
3345        self.increase_counter(Counter::CacheRefreshSrvTxt, query_srv_count);
3346        self.increase_counter(Counter::CacheRefreshAddr, query_addr_count);
3347    }
3348}
3349
3350/// Adds one or more answers of a service for incoming msg and RR entry name.
3351fn add_answer_of_service(
3352    out: &mut DnsOutgoing,
3353    msg: &DnsIncoming,
3354    entry_name: &str,
3355    service: &ServiceInfo,
3356    qtype: RRType,
3357    intf_addrs: Vec<IpAddr>,
3358) {
3359    if qtype == RRType::SRV || qtype == RRType::ANY {
3360        out.add_answer(
3361            msg,
3362            DnsSrv::new(
3363                entry_name,
3364                CLASS_IN | CLASS_CACHE_FLUSH,
3365                service.get_host_ttl(),
3366                service.get_priority(),
3367                service.get_weight(),
3368                service.get_port(),
3369                service.get_hostname().to_string(),
3370            ),
3371        );
3372    }
3373
3374    if qtype == RRType::TXT || qtype == RRType::ANY {
3375        out.add_answer(
3376            msg,
3377            DnsTxt::new(
3378                entry_name,
3379                CLASS_IN | CLASS_CACHE_FLUSH,
3380                service.get_other_ttl(),
3381                service.generate_txt(),
3382            ),
3383        );
3384    }
3385
3386    if qtype == RRType::SRV {
3387        for address in intf_addrs {
3388            out.add_additional_answer(DnsAddress::new(
3389                service.get_hostname(),
3390                ip_address_rr_type(&address),
3391                CLASS_IN | CLASS_CACHE_FLUSH,
3392                service.get_host_ttl(),
3393                address,
3394                InterfaceId::default(),
3395            ));
3396        }
3397    }
3398}
3399
/// All possible events sent to the client from the daemon
/// regarding service discovery.
///
/// The enum is `#[non_exhaustive]`, so client code matching on it needs a
/// wildcard arm.
#[derive(Clone, Debug)]
#[non_exhaustive]
pub enum ServiceEvent {
    /// Started searching for a service type.
    SearchStarted(String),

    /// Found a specific service instance: (service_type, fullname).
    ServiceFound(String, String),

    /// Resolved a service instance; the details are in the boxed
    /// `ResolvedService` struct.
    ServiceResolved(Box<ResolvedService>),

    /// A service instance (service_type, fullname) was removed.
    ServiceRemoved(String, String),

    /// Stopped searching for a service type. The payload is the service type
    /// with domain (`ty_domain`) whose browsing was stopped.
    SearchStopped(String),
}
3420
/// All possible events sent to the client from the daemon
/// regarding host resolution.
///
/// The enum is `#[non_exhaustive]`, so client code matching on it needs a
/// wildcard arm.
#[derive(Clone, Debug)]
#[non_exhaustive]
pub enum HostnameResolutionEvent {
    /// Started searching for the IP addresses of a hostname.
    /// The payload is a descriptive string of the form
    /// `"<hostname> on addrs <interfaces>"`, not the bare hostname.
    SearchStarted(String),
    /// One or more addresses for a hostname have been found: (hostname, addresses).
    AddressesFound(String, HashSet<ScopedIp>),
    /// One or more addresses for a hostname have been removed: (hostname, addresses).
    AddressesRemoved(String, HashSet<ScopedIp>),
    /// The search for the IP addresses of a hostname has timed out.
    SearchTimeout(String),
    /// Stopped searching for the IP addresses of a hostname. The payload is
    /// the hostname.
    SearchStopped(String),
}
3437
/// Some notable events from the daemon besides [`ServiceEvent`].
/// These events are expected to happen infrequently.
///
/// The enum is `#[non_exhaustive]`, so client code matching on it needs a
/// wildcard arm.
#[derive(Clone, Debug)]
#[non_exhaustive]
pub enum DaemonEvent {
    /// Daemon announced a service from an interface without being asked
    /// (an unsolicited announcement).
    ///
    /// The fields are the service name and a string of the form
    /// `"<hostname>:<interface name>"`.
    Announce(String, String),

    /// Daemon encountered an error.
    Error(Error),

    /// Daemon detected a new IP address from the host.
    IpAdd(IpAddr),

    /// Daemon detected an IP address removed from the host.
    IpDel(IpAddr),

    /// Daemon resolved a name conflict by changing one of its names.
    /// See [DnsNameChange] for more details.
    NameChange(DnsNameChange),

    /// Sent out a multicast response via an interface.
    Respond(String),
}
3462
/// Represents a name change due to a name conflict resolution.
/// See [RFC 6762 section 9](https://datatracker.ietf.org/doc/html/rfc6762#section-9)
///
/// Delivered to monitors inside [`DaemonEvent::NameChange`].
#[derive(Clone, Debug)]
pub struct DnsNameChange {
    /// The original name set in `ServiceInfo` by the user.
    pub original: String,

    /// The new name, created by appending a suffix to the original name.
    ///
    /// - for a service instance name, the suffix is `(N)`, where N starts at 2.
    /// - for a host name, the suffix is `-N`, where N starts at 2.
    ///
    /// For example:
    ///
    /// - Service name `foo._service-type._udp` becomes `foo (2)._service-type._udp`
    /// - Host name `foo.local.` becomes `foo-2.local.`
    pub new_name: String,

    /// The resource record type of the record whose name was changed.
    pub rr_type: RRType,

    /// The name of the interface where the name conflict and its change happened.
    pub intf_name: String,
}
3487
/// Commands supported by the daemon.
///
/// Besides being issued by clients, some commands are queued by the daemon
/// itself as retransmissions to be run again at a later time.
#[derive(Debug)]
enum Command {
    /// Browsing for a service type:
    /// (ty_domain, next_time_delay_in_seconds, bool flag, channel::sender).
    // NOTE(review): the meaning of the `bool` field is not visible in this
    // part of the file; confirm at the place where `Browse` is handled.
    Browse(String, u32, bool, Sender<ServiceEvent>),

    /// Resolve a hostname to IP addresses:
    /// (hostname, next_time_delay_in_seconds, sender, timeout_in_milliseconds).
    ResolveHostname(String, u32, Sender<HostnameResolutionEvent>, Option<u64>),

    /// Register a service.
    Register(ServiceInfo),

    /// Unregister a service: (fullname, sender for the resulting status).
    Unregister(String, Sender<UnregisterStatus>),

    /// Announce again a service to local network: (fullname, u32).
    // NOTE(review): the `u32` is presumably the interface index (if_index),
    // as in `UnregisterResend`; confirm against the command dispatcher.
    RegisterResend(String, u32),

    /// Resend unregister packet: (packet content, if_index, is_ipv4).
    UnregisterResend(Vec<u8>, u32, bool),

    /// Stop browsing a service type: (ty_domain).
    StopBrowse(String),

    /// Stop resolving a hostname: (hostname).
    StopResolveHostname(String),

    /// Send query to resolve a service instance:
    /// (service_instance_fullname, try_count).
    /// This is used when a PTR record exists but SRV & TXT records are missing.
    Resolve(String, u16),

    /// Read the current values of the counters.
    GetMetrics(Sender<Metrics>),

    /// Get the current status of the daemon.
    GetStatus(Sender<DaemonStatus>),

    /// Monitor noticeable events in the daemon.
    Monitor(Sender<DaemonEvent>),

    /// Set a daemon option; see [`DaemonOption`] for the available options.
    SetOption(DaemonOption),

    /// Read daemon option values; they are returned as a [`DaemonOptionVal`]
    /// on the given sender.
    GetOption(Sender<DaemonOptionVal>),

    /// Proactively confirm a DNS resource record: (instance, timeout).
    ///
    /// The intention is to check if a service name or IP address is still
    /// valid before its TTL expires.
    Verify(String, Duration),

    /// Exit the daemon.
    // NOTE(review): presumably the sender receives the final daemon status;
    // the handler is not visible in this part of the file.
    Exit(Sender<DaemonStatus>),
}
3540
3541impl fmt::Display for Command {
3542    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
3543        match self {
3544            Self::Browse(_, _, _, _) => write!(f, "Command Browse"),
3545            Self::ResolveHostname(_, _, _, _) => write!(f, "Command ResolveHostname"),
3546            Self::Exit(_) => write!(f, "Command Exit"),
3547            Self::GetStatus(_) => write!(f, "Command GetStatus"),
3548            Self::GetMetrics(_) => write!(f, "Command GetMetrics"),
3549            Self::Monitor(_) => write!(f, "Command Monitor"),
3550            Self::Register(_) => write!(f, "Command Register"),
3551            Self::RegisterResend(_, _) => write!(f, "Command RegisterResend"),
3552            Self::SetOption(_) => write!(f, "Command SetOption"),
3553            Self::GetOption(_) => write!(f, "Command GetOption"),
3554            Self::StopBrowse(_) => write!(f, "Command StopBrowse"),
3555            Self::StopResolveHostname(_) => write!(f, "Command StopResolveHostname"),
3556            Self::Unregister(_, _) => write!(f, "Command Unregister"),
3557            Self::UnregisterResend(_, _, _) => write!(f, "Command UnregisterResend"),
3558            Self::Resolve(_, _) => write!(f, "Command Resolve"),
3559            Self::Verify(_, _) => write!(f, "Command VerifyResource"),
3560        }
3561    }
3562}
3563
/// Daemon option values, sent back in reply to `Command::GetOption`.
struct DaemonOptionVal {
    // NOTE(review): presumably mirrors `DaemonOption::ServiceNameLenMax`; the
    // leading underscore suggests the field is currently not read.
    _service_name_len_max: u8,
    // NOTE(review): presumably mirrors `DaemonOption::IpCheckInterval`; the
    // time unit is not visible in this part of the file.
    ip_check_interval: u64,
}
3568
/// Options of the daemon that can be changed via `Command::SetOption`.
///
/// NOTE(review): the handlers of these options are not visible in this part
/// of the file; the per-variant notes below are inferred from the names and
/// payload types only and should be confirmed against the handlers.
#[derive(Debug)]
enum DaemonOption {
    /// Presumably the maximum accepted service name length.
    ServiceNameLenMax(u8),
    /// Presumably the interval of the periodic IP address check.
    IpCheckInterval(u64),
    /// Presumably enables the given kinds of interfaces.
    EnableInterface(Vec<IfKind>),
    /// Presumably disables the given kinds of interfaces.
    DisableInterface(Vec<IfKind>),
    /// Presumably toggles multicast loopback for IPv4.
    MulticastLoopV4(bool),
    /// Presumably toggles multicast loopback for IPv6.
    MulticastLoopV6(bool),
    /// Presumably toggles whether unsolicited responses are accepted.
    AcceptUnsolicited(bool),
    /// Test-only option carrying an interface name.
    #[cfg(test)]
    TestDownInterface(String),
    /// Test-only option carrying an interface name.
    #[cfg(test)]
    TestUpInterface(String),
}
3583
/// The length in bytes of the service domain suffix supported in this lib.
///
/// It is computed from `"._tcp.local."`; the other supported suffix,
/// `"._udp.local."`, has the same length.
const DOMAIN_LEN: usize = "._tcp.local.".len();
3586
3587/// Validate the length of "service_name" in a "_<service_name>.<domain_name>." string.
3588fn check_service_name_length(ty_domain: &str, limit: u8) -> Result<()> {
3589    if ty_domain.len() <= DOMAIN_LEN + 1 {
3590        // service name cannot be empty or only '_'.
3591        return Err(e_fmt!("Service type name cannot be empty: {}", ty_domain));
3592    }
3593
3594    let service_name_len = ty_domain.len() - DOMAIN_LEN - 1; // exclude the leading `_`
3595    if service_name_len > limit as usize {
3596        return Err(e_fmt!("Service name length must be <= {} bytes", limit));
3597    }
3598    Ok(())
3599}
3600
3601/// Checks if `name` ends with a valid domain: '._tcp.local.' or '._udp.local.'
3602fn check_domain_suffix(name: &str) -> Result<()> {
3603    if !(name.ends_with("._tcp.local.") || name.ends_with("._udp.local.")) {
3604        return Err(e_fmt!(
3605            "mDNS service {} must end with '._tcp.local.' or '._udp.local.'",
3606            name
3607        ));
3608    }
3609
3610    Ok(())
3611}
3612
3613/// Validate the service name in a fully qualified name.
3614///
3615/// A Full Name = <Instance>.<Service>.<Domain>
3616/// The only `<Domain>` supported are "._tcp.local." and "._udp.local.".
3617///
3618/// Note: this function does not check for the length of the service name.
3619/// Instead, `register_service` method will check the length.
3620fn check_service_name(fullname: &str) -> Result<()> {
3621    check_domain_suffix(fullname)?;
3622
3623    let remaining: Vec<&str> = fullname[..fullname.len() - DOMAIN_LEN].split('.').collect();
3624    let name = remaining.last().ok_or_else(|| e_fmt!("No service name"))?;
3625
3626    if &name[0..1] != "_" {
3627        return Err(e_fmt!("Service name must start with '_'"));
3628    }
3629
3630    let name = &name[1..];
3631
3632    if name.contains("--") {
3633        return Err(e_fmt!("Service name must not contain '--'"));
3634    }
3635
3636    if name.starts_with('-') || name.ends_with('-') {
3637        return Err(e_fmt!("Service name (%s) may not start or end with '-'"));
3638    }
3639
3640    let ascii_count = name.chars().filter(|c| c.is_ascii_alphabetic()).count();
3641    if ascii_count < 1 {
3642        return Err(e_fmt!(
3643            "Service name must contain at least one letter (eg: 'A-Za-z')"
3644        ));
3645    }
3646
3647    Ok(())
3648}
3649
3650/// Validate a hostname.
3651fn check_hostname(hostname: &str) -> Result<()> {
3652    if !hostname.ends_with(".local.") {
3653        return Err(e_fmt!("Hostname must end with '.local.': {hostname}"));
3654    }
3655
3656    if hostname == ".local." {
3657        return Err(e_fmt!(
3658            "The part of the hostname before '.local.' cannot be empty"
3659        ));
3660    }
3661
3662    if hostname.len() > 255 {
3663        return Err(e_fmt!("Hostname length must be <= 255 bytes"));
3664    }
3665
3666    Ok(())
3667}
3668
3669fn call_service_listener(
3670    listeners_map: &HashMap<String, Sender<ServiceEvent>>,
3671    ty_domain: &str,
3672    event: ServiceEvent,
3673) {
3674    if let Some(listener) = listeners_map.get(ty_domain) {
3675        match listener.send(event) {
3676            Ok(()) => trace!("Sent event to listener successfully"),
3677            Err(e) => debug!("Failed to send event: {}", e),
3678        }
3679    }
3680}
3681
3682fn call_hostname_resolution_listener(
3683    listeners_map: &HashMap<String, (Sender<HostnameResolutionEvent>, Option<u64>)>,
3684    hostname: &str,
3685    event: HostnameResolutionEvent,
3686) {
3687    let hostname_lower = hostname.to_lowercase();
3688    if let Some(listener) = listeners_map.get(&hostname_lower).map(|(l, _)| l) {
3689        match listener.send(event) {
3690            Ok(()) => trace!("Sent event to listener successfully"),
3691            Err(e) => debug!("Failed to send event: {}", e),
3692        }
3693    }
3694}
3695
3696/// Returns valid network interfaces in the host system.
3697/// Operational down interfaces are excluded.
3698/// Loopback interfaces are excluded if `with_loopback` is false.
3699fn my_ip_interfaces(with_loopback: bool) -> Vec<Interface> {
3700    if_addrs::get_if_addrs()
3701        .unwrap_or_default()
3702        .into_iter()
3703        .filter(|i| i.is_oper_up() && (!i.is_loopback() || with_loopback))
3704        .collect()
3705}
3706
3707fn send_dns_outgoing(out: &DnsOutgoing, my_intf: &MyIntf, sock: &PktInfoUdpSocket) -> Vec<Vec<u8>> {
3708    let if_name = &my_intf.name;
3709
3710    let if_addr = if sock.domain() == Domain::IPV4 {
3711        match my_intf.next_ifaddr_v4() {
3712            Some(addr) => addr,
3713            None => return vec![],
3714        }
3715    } else {
3716        match my_intf.next_ifaddr_v6() {
3717            Some(addr) => addr,
3718            None => return vec![],
3719        }
3720    };
3721
3722    send_dns_outgoing_impl(out, if_name, my_intf.index, if_addr, sock)
3723}
3724
3725/// Send an outgoing mDNS query or response, and returns the packet bytes.
3726fn send_dns_outgoing_impl(
3727    out: &DnsOutgoing,
3728    if_name: &str,
3729    if_index: u32,
3730    if_addr: &IfAddr,
3731    sock: &PktInfoUdpSocket,
3732) -> Vec<Vec<u8>> {
3733    let qtype = if out.is_query() {
3734        "query"
3735    } else {
3736        if out.answers_count() == 0 && out.additionals().is_empty() {
3737            return vec![]; // no need to send empty response
3738        }
3739        "response"
3740    };
3741    trace!(
3742        "send {}: {} questions {} answers {} authorities {} additional",
3743        qtype,
3744        out.questions().len(),
3745        out.answers_count(),
3746        out.authorities().len(),
3747        out.additionals().len()
3748    );
3749
3750    match if_addr.ip() {
3751        IpAddr::V4(ipv4) => {
3752            if let Err(e) = sock.set_multicast_if_v4(&ipv4) {
3753                debug!(
3754                    "send_dns_outgoing: failed to set multicast interface for IPv4 {}: {}",
3755                    ipv4, e
3756                );
3757                return vec![]; // cannot send without a valid interface
3758            }
3759        }
3760        IpAddr::V6(ipv6) => {
3761            if let Err(e) = sock.set_multicast_if_v6(if_index) {
3762                debug!(
3763                    "send_dns_outgoing: failed to set multicast interface for IPv6 {}: {}",
3764                    ipv6, e
3765                );
3766                return vec![]; // cannot send without a valid interface
3767            }
3768        }
3769    }
3770
3771    let packet_list = out.to_data_on_wire();
3772    for packet in packet_list.iter() {
3773        multicast_on_intf(packet, if_name, if_index, if_addr, sock);
3774    }
3775    packet_list
3776}
3777
3778/// Sends a multicast packet, and returns the packet bytes.
3779fn multicast_on_intf(
3780    packet: &[u8],
3781    if_name: &str,
3782    if_index: u32,
3783    if_addr: &IfAddr,
3784    socket: &PktInfoUdpSocket,
3785) {
3786    if packet.len() > MAX_MSG_ABSOLUTE {
3787        debug!("Drop over-sized packet ({})", packet.len());
3788        return;
3789    }
3790
3791    let addr: SocketAddr = match if_addr {
3792        if_addrs::IfAddr::V4(_) => SocketAddrV4::new(GROUP_ADDR_V4, MDNS_PORT).into(),
3793        if_addrs::IfAddr::V6(_) => {
3794            let mut sock = SocketAddrV6::new(GROUP_ADDR_V6, MDNS_PORT, 0, 0);
3795            sock.set_scope_id(if_index); // Choose iface for multicast
3796            sock.into()
3797        }
3798    };
3799
3800    // Sends out `packet` to `addr` on the socket.
3801    let sock_addr = addr.into();
3802    match socket.send_to(packet, &sock_addr) {
3803        Ok(sz) => trace!(
3804            "sent out {} bytes on interface {} (idx {}) addr {}",
3805            sz,
3806            if_name,
3807            if_index,
3808            if_addr.ip()
3809        ),
3810        Err(e) => trace!("Failed to send to {} via {:?}: {}", addr, &if_name, e),
3811    }
3812}
3813
/// Returns true if `name` has the shape of a service instance name:
/// <instance>.<service_type>.<_udp|_tcp>.local.
///
/// Only the number of dot-separated parts is checked: there must be at least
/// five, counting the empty part after the trailing dot.
/// Note: <instance> could contain '.' as well.
fn valid_instance_name(name: &str) -> bool {
    // A fifth part (index 4) exists exactly when there are five or more parts.
    name.split('.').nth(4).is_some()
}
3820
3821fn notify_monitors(monitors: &mut Vec<Sender<DaemonEvent>>, event: DaemonEvent) {
3822    monitors.retain(|sender| {
3823        if let Err(e) = sender.try_send(event.clone()) {
3824            debug!("notify_monitors: try_send: {}", &e);
3825            if matches!(e, TrySendError::Disconnected(_)) {
3826                return false; // This monitor is dropped.
3827            }
3828        }
3829        true
3830    });
3831}
3832
/// Check if all unique records passed "probing", and if yes, create a packet
/// to announce the service.
///
/// The announcement is built for the addresses of `info` on `intf`, of the
/// IPv4 family if `is_ipv4` is true and of the IPv6 family otherwise. Names
/// that were changed by conflict resolution (recorded in
/// `dns_registry.name_changes`) are used in place of the original ones.
///
/// Returns `None` when the service has no address of the requested family on
/// `intf`, or when at least one SRV / TXT / address record has not finished
/// probing yet. Otherwise returns the response message holding the PTR
/// record(s), the SRV record, the TXT record and the address records.
fn prepare_announce(
    info: &ServiceInfo,
    intf: &MyIntf,
    dns_registry: &mut DnsRegistry,
    is_ipv4: bool,
) -> Option<DnsOutgoing> {
    let intf_addrs = if is_ipv4 {
        info.get_addrs_on_my_intf_v4(intf)
    } else {
        info.get_addrs_on_my_intf_v6(intf)
    };

    if intf_addrs.is_empty() {
        debug!(
            "prepare_announce (ipv4: {is_ipv4}): no valid addrs on interface {}",
            &intf.name
        );
        return None;
    }

    // check if we changed our name due to conflicts.
    let service_fullname = match dns_registry.name_changes.get(info.get_fullname()) {
        Some(new_name) => new_name,
        None => info.get_fullname(),
    };

    debug!(
        "prepare to announce service {service_fullname} on {:?}",
        &intf_addrs
    );

    // Number of records that have not finished probing yet.
    let mut probing_count = 0;
    let mut out = DnsOutgoing::new(FLAGS_QR_RESPONSE | FLAGS_AA);
    // The current time plus a random offset below 250 ms; it is handed to
    // every `is_probing_done` call below.
    let create_time = current_time_millis() + fastrand::u64(0..250);

    // PTR record from the service type to the (possibly renamed) instance.
    // Unlike the records below, PTR records are created without the
    // cache-flush bit and are added without a probing check.
    out.add_answer_at_time(
        DnsPointer::new(
            info.get_type(),
            RRType::PTR,
            CLASS_IN,
            info.get_other_ttl(),
            service_fullname.to_string(),
        ),
        0,
    );

    // A second PTR record for the subtype, if the service has one.
    if let Some(sub) = info.get_subtype() {
        trace!("Adding subdomain {}", sub);
        out.add_answer_at_time(
            DnsPointer::new(
                sub,
                RRType::PTR,
                CLASS_IN,
                info.get_other_ttl(),
                service_fullname.to_string(),
            ),
            0,
        );
    }

    // SRV records.
    // The SRV target is the (possibly renamed) hostname.
    let hostname = match dns_registry.name_changes.get(info.get_hostname()) {
        Some(new_name) => new_name.to_string(),
        None => info.get_hostname().to_string(),
    };

    // The record is created under the original fullname; a changed name is
    // attached to it as its "new name" right below.
    let mut srv = DnsSrv::new(
        info.get_fullname(),
        CLASS_IN | CLASS_CACHE_FLUSH,
        info.get_host_ttl(),
        info.get_priority(),
        info.get_weight(),
        info.get_port(),
        hostname,
    );

    if let Some(new_name) = dns_registry.name_changes.get(info.get_fullname()) {
        srv.get_record_mut().set_new_name(new_name.to_string());
    }

    // A record is added if the service does not require probing, or if its
    // probing is done; otherwise it is counted as still probing.
    if !info.requires_probe()
        || dns_registry.is_probing_done(&srv, info.get_fullname(), create_time)
    {
        out.add_answer_at_time(srv, 0);
    } else {
        probing_count += 1;
    }

    // TXT records.

    let mut txt = DnsTxt::new(
        info.get_fullname(),
        CLASS_IN | CLASS_CACHE_FLUSH,
        info.get_other_ttl(),
        info.generate_txt(),
    );

    if let Some(new_name) = dns_registry.name_changes.get(info.get_fullname()) {
        txt.get_record_mut().set_new_name(new_name.to_string());
    }

    if !info.requires_probe()
        || dns_registry.is_probing_done(&txt, info.get_fullname(), create_time)
    {
        out.add_answer_at_time(txt, 0);
    } else {
        probing_count += 1;
    }

    // Address records. (A and AAAA)

    // The original hostname is used here; a changed hostname is attached to
    // each address record as its "new name".
    let hostname = info.get_hostname();
    for address in intf_addrs {
        let mut dns_addr = DnsAddress::new(
            hostname,
            ip_address_rr_type(&address),
            CLASS_IN | CLASS_CACHE_FLUSH,
            info.get_host_ttl(),
            address,
            intf.into(),
        );

        if let Some(new_name) = dns_registry.name_changes.get(hostname) {
            dns_addr.get_record_mut().set_new_name(new_name.to_string());
        }

        if !info.requires_probe()
            || dns_registry.is_probing_done(&dns_addr, info.get_fullname(), create_time)
        {
            out.add_answer_at_time(dns_addr, 0);
        } else {
            probing_count += 1;
        }
    }

    // Do not announce while any record is still probing.
    if probing_count > 0 {
        return None;
    }

    Some(out)
}
3976
3977/// Send an unsolicited response for owned service via `intf` and `sock`.
3978/// Returns true if sent out successfully for IPv4 or IPv6.
3979fn announce_service_on_intf(
3980    dns_registry: &mut DnsRegistry,
3981    info: &ServiceInfo,
3982    intf: &MyIntf,
3983    sock: &PktInfoUdpSocket,
3984) -> bool {
3985    let is_ipv4 = sock.domain() == Domain::IPV4;
3986    if let Some(out) = prepare_announce(info, intf, dns_registry, is_ipv4) {
3987        send_dns_outgoing(&out, intf, sock);
3988        return true;
3989    }
3990
3991    false
3992}
3993
/// Returns a new name based on the `original` to avoid conflicts.
///
/// Only the first label (the text before the first `.`) is changed; the rest
/// of the name is kept as is. If the first label already ends with a decimal
/// number in parentheses, i.e. ` (<num>)`, that number is incremented.
/// Otherwise ` (2)` is appended. Appending is also the fallback when the text
/// in parentheses is not made of plain digits, or when incrementing it would
/// overflow `u32`.
///
/// Examples:
/// - `foo.local.` becomes `foo (2).local.`
/// - `foo (2).local.` becomes `foo (3).local.`
/// - `foo (9)` becomes `foo (10)`
/// - `foo (abc)` becomes `foo (abc) (2)`
fn name_change(original: &str) -> String {
    // Split off the first label. `rest` keeps its leading '.', if there is one.
    let (first_part, rest) = match original.find('.') {
        Some(dot_pos) => original.split_at(dot_pos),
        None => (original, ""),
    };

    // Check if there is already a ` (<num>)` suffix that can be incremented.
    let incremented = first_part
        .strip_suffix(')')
        .and_then(|label| label.rsplit_once(" ("))
        .and_then(|(base_name, digits)| {
            // Only plain digits count as a number: `parse` alone would also
            // accept a leading '+' sign.
            if !digits.bytes().all(|b| b.is_ascii_digit()) {
                return None;
            }
            // `checked_add` avoids an overflow on `u32::MAX`.
            let next = digits.parse::<u32>().ok()?.checked_add(1)?;
            Some(format!("{base_name} ({next})"))
        });

    let new_name = incremented.unwrap_or_else(|| format!("{first_part} (2)"));
    format!("{new_name}{rest}")
}
4029
/// Returns a new hostname based on the `original` to avoid conflicts.
///
/// Only the first label (the text before the first `.`) is changed; the rest
/// of the name is kept as is. If the first label already ends with a
/// hyphenated decimal number, i.e. `-<num>`, that number is incremented.
/// Otherwise `-2` is appended. Appending is also the fallback when the text
/// after the last hyphen is not made of plain digits, or when incrementing it
/// would overflow `u32`.
///
/// Examples:
/// - `foo.local.` becomes `foo-2.local.`
/// - `foo-2.local.` becomes `foo-3.local.`
/// - `foo` becomes `foo-2`
/// - `foo-bar` becomes `foo-bar-2`
fn hostname_change(original: &str) -> String {
    // Split off the first label. `rest` keeps its leading '.', if there is one.
    let (first_part, rest) = match original.find('.') {
        Some(dot_pos) => original.split_at(dot_pos),
        None => (original, ""),
    };

    // Check if there is already a `-<num>` suffix that can be incremented.
    let incremented = first_part
        .rsplit_once('-')
        .and_then(|(base_name, digits)| {
            // Only plain digits count as a number: `parse` alone would also
            // accept a leading '+' sign.
            if !digits.bytes().all(|b| b.is_ascii_digit()) {
                return None;
            }
            // `checked_add` avoids an overflow on `u32::MAX`.
            let next = digits.parse::<u32>().ok()?.checked_add(1)?;
            Some(format!("{base_name}-{next}"))
        });

    let new_name = incremented.unwrap_or_else(|| format!("{first_part}-2"));
    format!("{new_name}{rest}")
}
4057
4058fn add_answer_with_additionals(
4059    out: &mut DnsOutgoing,
4060    msg: &DnsIncoming,
4061    service: &ServiceInfo,
4062    intf: &MyIntf,
4063    dns_registry: &DnsRegistry,
4064    is_ipv4: bool,
4065) {
4066    let intf_addrs = if is_ipv4 {
4067        service.get_addrs_on_my_intf_v4(intf)
4068    } else {
4069        service.get_addrs_on_my_intf_v6(intf)
4070    };
4071    if intf_addrs.is_empty() {
4072        trace!("No addrs on LAN of intf {:?}", intf);
4073        return;
4074    }
4075
4076    // check if we changed our name due to conflicts.
4077    let service_fullname = match dns_registry.name_changes.get(service.get_fullname()) {
4078        Some(new_name) => new_name,
4079        None => service.get_fullname(),
4080    };
4081
4082    let hostname = match dns_registry.name_changes.get(service.get_hostname()) {
4083        Some(new_name) => new_name,
4084        None => service.get_hostname(),
4085    };
4086
4087    let ptr_added = out.add_answer(
4088        msg,
4089        DnsPointer::new(
4090            service.get_type(),
4091            RRType::PTR,
4092            CLASS_IN,
4093            service.get_other_ttl(),
4094            service_fullname.to_string(),
4095        ),
4096    );
4097
4098    if !ptr_added {
4099        trace!("answer was not added for msg {:?}", msg);
4100        return;
4101    }
4102
4103    if let Some(sub) = service.get_subtype() {
4104        trace!("Adding subdomain {}", sub);
4105        out.add_additional_answer(DnsPointer::new(
4106            sub,
4107            RRType::PTR,
4108            CLASS_IN,
4109            service.get_other_ttl(),
4110            service_fullname.to_string(),
4111        ));
4112    }
4113
4114    // Add recommended additional answers according to
4115    // https://tools.ietf.org/html/rfc6763#section-12.1.
4116    out.add_additional_answer(DnsSrv::new(
4117        service_fullname,
4118        CLASS_IN | CLASS_CACHE_FLUSH,
4119        service.get_host_ttl(),
4120        service.get_priority(),
4121        service.get_weight(),
4122        service.get_port(),
4123        hostname.to_string(),
4124    ));
4125
4126    out.add_additional_answer(DnsTxt::new(
4127        service_fullname,
4128        CLASS_IN | CLASS_CACHE_FLUSH,
4129        service.get_other_ttl(),
4130        service.generate_txt(),
4131    ));
4132
4133    for address in intf_addrs {
4134        out.add_additional_answer(DnsAddress::new(
4135            hostname,
4136            ip_address_rr_type(&address),
4137            CLASS_IN | CLASS_CACHE_FLUSH,
4138            service.get_host_ttl(),
4139            address,
4140            intf.into(),
4141        ));
4142    }
4143}
4144
4145/// Check probes in a registry and returns: a probing packet to send out, and a list of probe names
4146/// that are finished.
4147fn check_probing(
4148    dns_registry: &mut DnsRegistry,
4149    timers: &mut BinaryHeap<Reverse<u64>>,
4150    now: u64,
4151) -> (DnsOutgoing, Vec<String>) {
4152    let mut expired_probes = Vec::new();
4153    let mut out = DnsOutgoing::new(FLAGS_QR_QUERY);
4154
4155    for (name, probe) in dns_registry.probing.iter_mut() {
4156        if now >= probe.next_send {
4157            if probe.expired(now) {
4158                // move the record to active
4159                expired_probes.push(name.clone());
4160            } else {
4161                out.add_question(name, RRType::ANY);
4162
4163                /*
4164                RFC 6762 section 8.2: https://datatracker.ietf.org/doc/html/rfc6762#section-8.2
4165                ...
4166                for tiebreaking to work correctly in all
4167                cases, the Authority Section must contain *all* the records and
4168                proposed rdata being probed for uniqueness.
4169                    */
4170                for record in probe.records.iter() {
4171                    out.add_authority(record.clone());
4172                }
4173
4174                probe.update_next_send(now);
4175
4176                // add timer
4177                timers.push(Reverse(probe.next_send));
4178            }
4179        }
4180    }
4181
4182    (out, expired_probes)
4183}
4184
4185/// Process expired probes on an interface and return a list of services
4186/// that are waiting for the probe to finish.
4187///
4188/// `DnsNameChange` events are sent to the monitors.
4189fn handle_expired_probes(
4190    expired_probes: Vec<String>,
4191    intf_name: &str,
4192    dns_registry: &mut DnsRegistry,
4193    monitors: &mut Vec<Sender<DaemonEvent>>,
4194) -> HashSet<String> {
4195    let mut waiting_services = HashSet::new();
4196
4197    for name in expired_probes {
4198        let Some(probe) = dns_registry.probing.remove(&name) else {
4199            continue;
4200        };
4201
4202        // send notifications about name changes
4203        for record in probe.records.iter() {
4204            if let Some(new_name) = record.get_record().get_new_name() {
4205                dns_registry
4206                    .name_changes
4207                    .insert(name.clone(), new_name.to_string());
4208
4209                let event = DnsNameChange {
4210                    original: record.get_record().get_original_name().to_string(),
4211                    new_name: new_name.to_string(),
4212                    rr_type: record.get_type(),
4213                    intf_name: intf_name.to_string(),
4214                };
4215                debug!("Name change event: {:?}", &event);
4216                notify_monitors(monitors, DaemonEvent::NameChange(event));
4217            }
4218        }
4219
4220        // move RR from probe to active.
4221        debug!(
4222            "probe of '{name}' finished: move {} records to active. ({} waiting services)",
4223            probe.records.len(),
4224            probe.waiting_services.len(),
4225        );
4226
4227        // Move records to active and plan to wake up services if records are not empty.
4228        if !probe.records.is_empty() {
4229            match dns_registry.active.get_mut(&name) {
4230                Some(records) => {
4231                    records.extend(probe.records);
4232                }
4233                None => {
4234                    dns_registry.active.insert(name, probe.records);
4235                }
4236            }
4237
4238            waiting_services.extend(probe.waiting_services);
4239        }
4240    }
4241
4242    waiting_services
4243}
4244
4245#[cfg(test)]
4246mod tests {
4247    use super::{
4248        _new_socket_bind, check_domain_suffix, check_service_name_length, hostname_change,
4249        my_ip_interfaces, name_change, send_dns_outgoing_impl, valid_instance_name,
4250        HostnameResolutionEvent, ServiceDaemon, ServiceEvent, ServiceInfo, GROUP_ADDR_V4,
4251        MDNS_PORT,
4252    };
4253    use crate::{
4254        dns_parser::{
4255            DnsIncoming, DnsOutgoing, DnsPointer, InterfaceId, RRType, ScopedIp, CLASS_IN,
4256            FLAGS_AA, FLAGS_QR_RESPONSE,
4257        },
4258        service_daemon::{add_answer_of_service, check_hostname},
4259    };
4260    use std::{
4261        net::{SocketAddr, SocketAddrV4},
4262        time::{Duration, SystemTime},
4263    };
4264    use test_log::test;
4265
4266    #[test]
4267    fn test_socketaddr_print() {
4268        let addr: SocketAddr = SocketAddrV4::new(GROUP_ADDR_V4, MDNS_PORT).into();
4269        let print = format!("{}", addr);
4270        assert_eq!(print, "224.0.0.251:5353");
4271    }
4272
4273    #[test]
4274    fn test_instance_name() {
4275        assert!(valid_instance_name("my-laser._printer._tcp.local."));
4276        assert!(valid_instance_name("my-laser.._printer._tcp.local."));
4277        assert!(!valid_instance_name("_printer._tcp.local."));
4278    }
4279
4280    #[test]
4281    fn test_check_service_name_length() {
4282        let result = check_service_name_length("_tcp", 100);
4283        assert!(result.is_err());
4284        if let Err(e) = result {
4285            println!("{}", e);
4286        }
4287    }
4288
4289    #[test]
4290    fn test_check_hostname() {
4291        // valid hostnames
4292        for hostname in &[
4293            "my_host.local.",
4294            &("A".repeat(255 - ".local.".len()) + ".local."),
4295        ] {
4296            let result = check_hostname(hostname);
4297            assert!(result.is_ok());
4298        }
4299
4300        // erroneous hostnames
4301        for hostname in &[
4302            "my_host.local",
4303            ".local.",
4304            &("A".repeat(256 - ".local.".len()) + ".local."),
4305        ] {
4306            let result = check_hostname(hostname);
4307            assert!(result.is_err());
4308            if let Err(e) = result {
4309                println!("{}", e);
4310            }
4311        }
4312    }
4313
4314    #[test]
4315    fn test_check_domain_suffix() {
4316        assert!(check_domain_suffix("_missing_dot._tcp.local").is_err());
4317        assert!(check_domain_suffix("_missing_bar.tcp.local.").is_err());
4318        assert!(check_domain_suffix("_mis_spell._tpp.local.").is_err());
4319        assert!(check_domain_suffix("_mis_spell._upp.local.").is_err());
4320        assert!(check_domain_suffix("_has_dot._tcp.local.").is_ok());
4321        assert!(check_domain_suffix("_goodname._udp.local.").is_ok());
4322    }
4323
    /// Registers a service and resolves it by browsing, then stops the browse,
    /// sends a PTR record with TTL 0 for the service on every interface, and
    /// checks that a second browse resolves the service again.
    #[test]
    fn test_service_with_temporarily_invalidated_ptr() {
        // Create a daemon
        let d = ServiceDaemon::new().expect("Failed to create daemon");

        // Register a service on all addresses returned by `my_ip_interfaces`.
        let service = "_test_inval_ptr._udp.local.";
        let host_name = "my_host_tmp_invalidated_ptr.local.";
        let intfs: Vec<_> = my_ip_interfaces(false);
        let intf_ips: Vec<_> = intfs.iter().map(|intf| intf.ip()).collect();
        let port = 5201;
        let my_service =
            ServiceInfo::new(service, "my_instance", host_name, &intf_ips[..], port, None)
                .expect("invalid service info")
                .enable_addr_auto();
        let result = d.register(my_service.clone());
        assert!(result.is_ok());

        // Browse for a service
        let browse_chan = d.browse(service).unwrap();
        let timeout = Duration::from_secs(2);
        let mut resolved = false;

        // Wait for the first `ServiceResolved` event; give up when no event
        // arrives within `timeout`.
        while let Ok(event) = browse_chan.recv_timeout(timeout) {
            match event {
                ServiceEvent::ServiceResolved(info) => {
                    resolved = true;
                    println!("Resolved a service of {}", &info.fullname);
                    break;
                }
                e => {
                    println!("Received event {:?}", e);
                }
            }
        }

        assert!(resolved);

        println!("Stopping browse of {}", service);
        // Pause browsing so restarting will cause a new immediate query.
        // Unregistering will not work here, it will invalidate all the records.
        d.stop_browse(service).unwrap();

        // Ensure the search is stopped.
        // Reduces the chance of receiving an answer adding the ptr back to the
        // cache causing the later browse to return directly from the cache.
        // (which invalidates what this test is trying to test for.)
        let mut stopped = false;
        while let Ok(event) = browse_chan.recv_timeout(timeout) {
            match event {
                ServiceEvent::SearchStopped(_) => {
                    stopped = true;
                    println!("Stopped browsing service");
                    break;
                }
                // Other `ServiceResolved` messages may be received
                // here as they come from different interfaces.
                // That's fine for this test.
                e => {
                    println!("Received event {:?}", e);
                }
            }
        }

        assert!(stopped);

        // Invalidate the ptr from the service to the host.
        // The PTR record is built with a TTL of 0.
        let invalidate_ptr_packet = DnsPointer::new(
            my_service.get_type(),
            RRType::PTR,
            CLASS_IN,
            0,
            my_service.get_fullname().to_string(),
        );

        let mut packet_buffer = DnsOutgoing::new(FLAGS_QR_RESPONSE | FLAGS_AA);
        packet_buffer.add_additional_answer(invalidate_ptr_packet);

        // Send the invalidation from a freshly bound socket on every interface.
        for intf in intfs {
            let sock = _new_socket_bind(&intf, true).unwrap();
            send_dns_outgoing_impl(
                &packet_buffer,
                &intf.name,
                intf.index.unwrap_or(0),
                &intf.addr,
                &sock.pktinfo,
            );
        }

        println!(
            "Sent PTR record invalidation. Starting second browse for {}",
            service
        );

        // Restart the browse to force the sender to re-send the announcements.
        let browse_chan = d.browse(service).unwrap();

        // The service must be resolved again by the second browse.
        resolved = false;
        while let Ok(event) = browse_chan.recv_timeout(timeout) {
            match event {
                ServiceEvent::ServiceResolved(info) => {
                    resolved = true;
                    println!("Resolved a service of {}", &info.fullname);
                    break;
                }
                e => {
                    println!("Received event {:?}", e);
                }
            }
        }

        assert!(resolved);
        d.shutdown().unwrap();
    }
4437
    /// Registers a service with a short host TTL, resolves it from a second
    /// daemon, shuts the server down and then waits for a `ServiceRemoved`
    /// event on the client.
    ///
    /// NOTE(review): only the resolution is asserted. The final loop prints the
    /// `ServiceRemoved` event if it arrives, but the test also passes when it
    /// never does.
    #[test]
    fn test_expired_srv() {
        // construct service info
        let service_type = "_expired-srv._udp.local.";
        let instance = "test_instance";
        let host_name = "expired_srv_host.local.";
        let mut my_service = ServiceInfo::new(service_type, instance, host_name, "", 5023, None)
            .unwrap()
            .enable_addr_auto();

        // set SRV to expire soon.
        let new_ttl = 3; // for testing only.
        my_service._set_host_ttl(new_ttl);

        // register my service
        let mdns_server = ServiceDaemon::new().expect("Failed to create mdns server");
        let result = mdns_server.register(my_service);
        assert!(result.is_ok());

        // Browse from a separate daemon acting as the client.
        let mdns_client = ServiceDaemon::new().expect("Failed to create mdns client");
        let browse_chan = mdns_client.browse(service_type).unwrap();
        let timeout = Duration::from_secs(2);
        let mut resolved = false;

        // Wait for the first `ServiceResolved` event; give up when no event
        // arrives within `timeout`.
        while let Ok(event) = browse_chan.recv_timeout(timeout) {
            match event {
                ServiceEvent::ServiceResolved(info) => {
                    resolved = true;
                    println!("Resolved a service of {}", &info.fullname);
                    break;
                }
                _ => {}
            }
        }

        assert!(resolved);

        // Exit the server so that no more responses.
        mdns_server.shutdown().unwrap();

        // SRV record in the client cache will expire.
        // Each receive waits at most `new_ttl` seconds for the next event.
        let expire_timeout = Duration::from_secs(new_ttl as u64);
        while let Ok(event) = browse_chan.recv_timeout(expire_timeout) {
            match event {
                ServiceEvent::ServiceRemoved(service_type, full_name) => {
                    println!("Service removed: {}: {}", &service_type, &full_name);
                    break;
                }
                _ => {}
            }
        }
    }
4491
    /// Resolves a hostname served by one daemon from a second daemon, shuts the
    /// server down, and checks that the client reports the address as removed
    /// once the short host TTL has passed.
    #[test]
    fn test_hostname_resolution_address_removed() {
        // Create a mDNS server
        let server = ServiceDaemon::new().expect("Failed to create server");
        let hostname = "addr_remove_host._tcp.local.";
        // Use the first IPv4 address found on the local interfaces.
        let service_ip_addr: ScopedIp = my_ip_interfaces(false)
            .iter()
            .find(|iface| iface.ip().is_ipv4())
            .map(|iface| iface.ip().into())
            .unwrap();

        let mut my_service = ServiceInfo::new(
            "_host_res_test._tcp.local.",
            "my_instance",
            hostname,
            &service_ip_addr.to_ip_addr(),
            1234,
            None,
        )
        .expect("invalid service info");

        // Set a short TTL for addresses for testing.
        let addr_ttl = 2;
        my_service._set_host_ttl(addr_ttl); // Expire soon

        server.register(my_service).unwrap();

        // Create a mDNS client for resolving the hostname.
        let client = ServiceDaemon::new().expect("Failed to create client");
        let event_receiver = client.resolve_hostname(hostname, None).unwrap();
        // Block until the address is found; a stopped search or a closed
        // channel counts as "not resolved".
        let resolved = loop {
            match event_receiver.recv() {
                Ok(HostnameResolutionEvent::AddressesFound(found_hostname, addresses)) => {
                    assert!(found_hostname == hostname);
                    assert!(addresses.contains(&service_ip_addr));
                    println!("address found: {:?}", &addresses);
                    break true;
                }
                Ok(HostnameResolutionEvent::SearchStopped(_)) => break false,
                Ok(_event) => {}
                Err(_) => break false,
            }
        };

        assert!(resolved);

        // Shutdown the server so no more responses / refreshes for addresses.
        server.shutdown().unwrap();

        // Wait till hostname address record expires, with 1 second grace period.
        let timeout = Duration::from_secs(addr_ttl as u64 + 1);
        // Any receive error, including a timeout, counts as "not removed".
        let removed = loop {
            match event_receiver.recv_timeout(timeout) {
                Ok(HostnameResolutionEvent::AddressesRemoved(removed_host, addresses)) => {
                    assert!(removed_host == hostname);
                    assert!(addresses.contains(&service_ip_addr));

                    println!(
                        "address removed: hostname: {} addresses: {:?}",
                        &hostname, &addresses
                    );
                    break true;
                }
                Ok(_event) => {}
                Err(_) => {
                    break false;
                }
            }
        };

        assert!(removed);

        client.shutdown().unwrap();
    }
4566
    /// Registers a service with a short "other" TTL, resolves it from a second
    /// daemon, waits for most of the TTL to pass, and checks the client's
    /// `cache-refresh-ptr` and `cache-refresh-srv-txt` metrics.
    #[test]
    fn test_refresh_ptr() {
        // construct service info
        let service_type = "_refresh-ptr._udp.local.";
        let instance = "test_instance";
        let host_name = "refresh_ptr_host.local.";
        // Use the first IPv4 address found on the local interfaces.
        let service_ip_addr = my_ip_interfaces(false)
            .iter()
            .find(|iface| iface.ip().is_ipv4())
            .map(|iface| iface.ip())
            .unwrap();

        let mut my_service = ServiceInfo::new(
            service_type,
            instance,
            host_name,
            &service_ip_addr,
            5023,
            None,
        )
        .unwrap();

        let new_ttl = 3; // for testing only.
        my_service._set_other_ttl(new_ttl);

        // register my service
        let mdns_server = ServiceDaemon::new().expect("Failed to create mdns server");
        let result = mdns_server.register(my_service);
        assert!(result.is_ok());

        // Browse from a separate daemon acting as the client.
        let mdns_client = ServiceDaemon::new().expect("Failed to create mdns client");
        let browse_chan = mdns_client.browse(service_type).unwrap();
        let timeout = Duration::from_millis(1500); // Give at least 1 second for the service probing.
        let mut resolved = false;

        // resolve the service first.
        while let Ok(event) = browse_chan.recv_timeout(timeout) {
            match event {
                ServiceEvent::ServiceResolved(info) => {
                    resolved = true;
                    println!("Resolved a service of {}", &info.fullname);
                    break;
                }
                _ => {}
            }
        }

        assert!(resolved);

        // wait over 80% of TTL, and refresh PTR should be sent out.
        // The timeout used here is 90% of the TTL; the loop ends once no event
        // arrives within that time.
        let timeout = Duration::from_millis(new_ttl as u64 * 1000 * 90 / 100);
        while let Ok(event) = browse_chan.recv_timeout(timeout) {
            println!("event: {:?}", &event);
        }

        // verify refresh counter.
        // Exactly one refresh is expected for each of the two counters.
        let metrics_chan = mdns_client.get_metrics().unwrap();
        let metrics = metrics_chan.recv_timeout(timeout).unwrap();
        let ptr_refresh_counter = metrics["cache-refresh-ptr"];
        assert_eq!(ptr_refresh_counter, 1);
        let srvtxt_refresh_counter = metrics["cache-refresh-srv-txt"];
        assert_eq!(srvtxt_refresh_counter, 1);

        // Exit the server so that no more responses.
        mdns_server.shutdown().unwrap();
        mdns_client.shutdown().unwrap();
    }
4634
4635    #[test]
4636    fn test_name_change() {
4637        assert_eq!(name_change("foo.local."), "foo (2).local.");
4638        assert_eq!(name_change("foo (2).local."), "foo (3).local.");
4639        assert_eq!(name_change("foo (9).local."), "foo (10).local.");
4640        assert_eq!(name_change("foo"), "foo (2)");
4641        assert_eq!(name_change("foo (2)"), "foo (3)");
4642        assert_eq!(name_change(""), " (2)");
4643
4644        // Additional edge cases
4645        assert_eq!(name_change("foo (abc)"), "foo (abc) (2)"); // Invalid number
4646        assert_eq!(name_change("foo (2"), "foo (2 (2)"); // Missing closing parenthesis
4647        assert_eq!(name_change("foo (2) extra"), "foo (2) extra (2)"); // Extra text after number
4648    }
4649
4650    #[test]
4651    fn test_hostname_change() {
4652        assert_eq!(hostname_change("foo.local."), "foo-2.local.");
4653        assert_eq!(hostname_change("foo"), "foo-2");
4654        assert_eq!(hostname_change("foo-2.local."), "foo-3.local.");
4655        assert_eq!(hostname_change("foo-9"), "foo-10");
4656        assert_eq!(hostname_change("test-42.domain."), "test-43.domain.");
4657    }
4658
4659    #[test]
4660    fn test_add_answer_txt_ttl() {
4661        // construct a simple service info
4662        let service_type = "_test_add_answer._udp.local.";
4663        let instance = "test_instance";
4664        let host_name = "add_answer_host.local.";
4665        let service_intf = my_ip_interfaces(false)
4666            .into_iter()
4667            .find(|iface| iface.ip().is_ipv4())
4668            .unwrap();
4669        let service_ip_addr = service_intf.ip();
4670        let my_service = ServiceInfo::new(
4671            service_type,
4672            instance,
4673            host_name,
4674            &service_ip_addr,
4675            5023,
4676            None,
4677        )
4678        .unwrap();
4679
4680        // construct a DnsOutgoing message
4681        let mut out = DnsOutgoing::new(FLAGS_QR_RESPONSE | FLAGS_AA);
4682
4683        // Construct a dummy DnsIncoming message
4684        let mut dummy_data = out.to_data_on_wire();
4685        let interface_id = InterfaceId::from(&service_intf);
4686        let incoming = DnsIncoming::new(dummy_data.pop().unwrap(), interface_id).unwrap();
4687
4688        // Add an answer of TXT type for the service.
4689        let if_addrs = vec![service_intf.ip()];
4690        add_answer_of_service(
4691            &mut out,
4692            &incoming,
4693            instance,
4694            &my_service,
4695            RRType::TXT,
4696            if_addrs,
4697        );
4698
4699        // Check if the answer was added correctly
4700        assert!(
4701            out.answers_count() > 0,
4702            "No answers added to the outgoing message"
4703        );
4704
4705        // Check if the first answer is of type TXT
4706        let answer = out._answers().first().unwrap();
4707        assert_eq!(answer.0.get_type(), RRType::TXT);
4708
4709        // Check TTL is set properly for the TXT record
4710        assert_eq!(answer.0.get_record().get_ttl(), my_service.get_other_ttl());
4711    }
4712
    /// Resolves a service from a client daemon, takes the interface down on the
    /// client via `test_down_interface` and expects `ServiceRemoved`, then
    /// brings it back via `test_up_interface` and expects `ServiceResolved`
    /// again.
    #[test]
    fn test_interface_flip() {
        // start a server
        let ty_domain = "_intf-flip._udp.local.";
        let host_name = "intf_flip.local.";
        let now = SystemTime::now()
            .duration_since(SystemTime::UNIX_EPOCH)
            .unwrap();
        let instance_name = now.as_micros().to_string(); // Create a unique name.
        let port = 5200;

        // Get a single IPv4 address
        let (ip_addr1, intf_name) = my_ip_interfaces(false)
            .iter()
            .find(|iface| iface.ip().is_ipv4())
            .map(|iface| (iface.ip(), iface.name.clone()))
            .unwrap();

        println!("Using interface {} with IP {}", intf_name, ip_addr1);

        // Register the service.
        let service1 =
            ServiceInfo::new(ty_domain, &instance_name, host_name, &ip_addr1, port, None)
                .expect("valid service info");
        let server1 = ServiceDaemon::new().expect("failed to start server");
        server1
            .register(service1)
            .expect("Failed to register service1");

        // wait for the service announced.
        std::thread::sleep(Duration::from_secs(2));

        // start a client
        let client = ServiceDaemon::new().expect("failed to start client");

        let receiver = client.browse(ty_domain).unwrap();

        let timeout = Duration::from_secs(3);
        let mut got_data = false;

        // Wait for the first `ServiceResolved` event; give up when no event
        // arrives within `timeout`.
        while let Ok(event) = receiver.recv_timeout(timeout) {
            match event {
                ServiceEvent::ServiceResolved(_) => {
                    println!("Received ServiceResolved event");
                    got_data = true;
                    break;
                }
                _ => {}
            }
        }

        assert!(got_data, "Should receive ServiceResolved event");

        // Set a short IP check interval to detect interface changes quickly.
        client.set_ip_check_interval(1).unwrap();

        // Now shutdown the interface and expect the client to lose the service.
        println!("Shutting down interface {}", &intf_name);
        client.test_down_interface(&intf_name).unwrap();

        let mut got_removed = false;

        while let Ok(event) = receiver.recv_timeout(timeout) {
            match event {
                ServiceEvent::ServiceRemoved(ty_domain, instance) => {
                    got_removed = true;
                    println!("removed: {ty_domain} : {instance}");
                    break;
                }
                _ => {}
            }
        }
        assert!(got_removed, "Should receive ServiceRemoved event");

        // Bring the interface back and expect the service to be resolved again.
        println!("Bringing up interface {}", &intf_name);
        client.test_up_interface(&intf_name).unwrap();
        let mut got_data = false;
        while let Ok(event) = receiver.recv_timeout(timeout) {
            match event {
                ServiceEvent::ServiceResolved(resolved) => {
                    got_data = true;
                    println!("Received ServiceResolved: {:?}", resolved);
                    break;
                }
                _ => {}
            }
        }
        assert!(
            got_data,
            "Should receive ServiceResolved event after interface is back up"
        );

        server1.shutdown().unwrap();
        client.shutdown().unwrap();
    }
4808
4809    #[test]
4810    fn test_cache_only() {
4811        // construct service info
4812        let service_type = "_cache_only._udp.local.";
4813        let instance = "test_instance";
4814        let host_name = "cache_only_host.local.";
4815        let service_ip_addr = my_ip_interfaces(false)
4816            .iter()
4817            .find(|iface| iface.ip().is_ipv4())
4818            .map(|iface| iface.ip())
4819            .unwrap();
4820
4821        let mut my_service = ServiceInfo::new(
4822            service_type,
4823            instance,
4824            host_name,
4825            &service_ip_addr,
4826            5023,
4827            None,
4828        )
4829        .unwrap();
4830
4831        let new_ttl = 3; // for testing only.
4832        my_service._set_other_ttl(new_ttl);
4833
4834        let mdns_client = ServiceDaemon::new().expect("Failed to create mdns client");
4835
4836        // make a single browse request to record that we are interested in the service.  This ensures that
4837        // subsequent announcements are cached.
4838        let browse_chan = mdns_client.browse_cache(service_type).unwrap();
4839        std::thread::sleep(Duration::from_secs(2));
4840
4841        // register my service
4842        let mdns_server = ServiceDaemon::new().expect("Failed to create mdns server");
4843        let result = mdns_server.register(my_service);
4844        assert!(result.is_ok());
4845
4846        let timeout = Duration::from_millis(1500); // Give at least 1 second for the service probing.
4847        let mut resolved = false;
4848
4849        // resolve the service.
4850        while let Ok(event) = browse_chan.recv_timeout(timeout) {
4851            match event {
4852                ServiceEvent::ServiceResolved(info) => {
4853                    resolved = true;
4854                    println!("Resolved a service of {}", &info.get_fullname());
4855                    break;
4856                }
4857                _ => {}
4858            }
4859        }
4860
4861        assert!(resolved);
4862
4863        // Exit the server so that no more responses.
4864        mdns_server.shutdown().unwrap();
4865        mdns_client.shutdown().unwrap();
4866    }
4867
4868    #[test]
4869    fn test_cache_only_unsolicited() {
4870        // construct service info
4871        let service_type = "_cache_only._udp.local.";
4872        let instance = "test_instance";
4873        let host_name = "cache_only_host.local.";
4874        let service_ip_addr = my_ip_interfaces(false)
4875            .iter()
4876            .find(|iface| iface.ip().is_ipv4())
4877            .map(|iface| iface.ip())
4878            .unwrap();
4879
4880        let mut my_service = ServiceInfo::new(
4881            service_type,
4882            instance,
4883            host_name,
4884            &service_ip_addr,
4885            5023,
4886            None,
4887        )
4888        .unwrap();
4889
4890        let new_ttl = 3; // for testing only.
4891        my_service._set_other_ttl(new_ttl);
4892
4893        // register my service
4894        let mdns_server = ServiceDaemon::new().expect("Failed to create mdns server");
4895        let result = mdns_server.register(my_service);
4896        assert!(result.is_ok());
4897
4898        let mdns_client = ServiceDaemon::new().expect("Failed to create mdns client");
4899        mdns_client.accept_unsolicited(true).unwrap();
4900
4901        // Wait a bit for the service announcements to go out, before calling browse_cache.  This ensures
4902        // that the announcements are treated as unsolicited
4903        std::thread::sleep(Duration::from_secs(2));
4904        let browse_chan = mdns_client.browse_cache(service_type).unwrap();
4905        let timeout = Duration::from_millis(1500); // Give at least 1 second for the service probing.
4906        let mut resolved = false;
4907
4908        // resolve the service.
4909        while let Ok(event) = browse_chan.recv_timeout(timeout) {
4910            match event {
4911                ServiceEvent::ServiceResolved(info) => {
4912                    resolved = true;
4913                    println!("Resolved a service of {}", &info.get_fullname());
4914                    break;
4915                }
4916                _ => {}
4917            }
4918        }
4919
4920        assert!(resolved);
4921
4922        // Exit the server so that no more responses.
4923        mdns_server.shutdown().unwrap();
4924        mdns_client.shutdown().unwrap();
4925    }
4926}