mdns_sd/
service_daemon.rs

1//! Service daemon for mDNS Service Discovery.
2
3// How DNS-based Service Discovery works in a nutshell:
4//
5// (excerpt from RFC 6763)
6// .... that a particular service instance can be
7//    described using a DNS SRV [RFC2782] and DNS TXT [RFC1035] record.
8//    The SRV record has a name of the form "<Instance>.<Service>.<Domain>"
9//    and gives the target host and port where the service instance can be
10//    reached.  The DNS TXT record of the same name gives additional
11//    information about this instance, in a structured form using key/value
12//    pairs, described in Section 6.  A client discovers the list of
13//    available instances of a given service type using a query for a DNS
14//    PTR [RFC1035] record with a name of the form "<Service>.<Domain>",
15//    which returns a set of zero or more names, which are the names of the
16//    aforementioned DNS SRV/TXT record pairs.
17//
18// Some naming conventions in this source code:
19//
20// `ty_domain` refers to service type together with domain name, i.e. <service>.<domain>.
21// Every <service> consists of two labels: service itself and "_udp." or "_tcp".
22// See RFC 6763 section 7 Service Names.
23//     for example: `_my-service._udp.local.`
24//
25// `fullname` refers to a full Service Instance Name, i.e. <instance>.<service>.<domain>
26//     for example: `my_home._my-service._udp.local.`
27//
28// In mDNS and DNS, the basic data structure is "Resource Record" (RR), where
29// in Service Discovery, the basic data structure is "Service Info". One Service Info
30// corresponds to a set of DNS Resource Records.
31#[cfg(feature = "logging")]
32use crate::log::{debug, trace};
33use crate::{
34    dns_cache::{current_time_millis, DnsCache},
35    dns_parser::{
36        ip_address_rr_type, DnsAddress, DnsEntryExt, DnsIncoming, DnsOutgoing, DnsPointer,
37        DnsRecordBox, DnsRecordExt, DnsSrv, DnsTxt, InterfaceId, RRType, ScopedIp,
38        CLASS_CACHE_FLUSH, CLASS_IN, FLAGS_AA, FLAGS_QR_QUERY, FLAGS_QR_RESPONSE, MAX_MSG_ABSOLUTE,
39    },
40    error::{e_fmt, Error, Result},
41    service_info::{DnsRegistry, MyIntf, Probe, ServiceInfo, ServiceStatus},
42    Receiver, ResolvedService, TxtProperties,
43};
44use flume::{bounded, Sender, TrySendError};
45use if_addrs::{IfAddr, Interface};
46use mio::{event::Source, net::UdpSocket as MioUdpSocket, Interest, Poll, Registry, Token};
47use socket2::Domain;
48use socket_pktinfo::PktInfoUdpSocket;
49use std::{
50    cmp::{self, Reverse},
51    collections::{hash_map::Entry, BinaryHeap, HashMap, HashSet},
52    fmt, io,
53    net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6, UdpSocket},
54    str, thread,
55    time::Duration,
56    vec,
57};
58
59/// The default max length of the service name without domain, not including the
60/// leading underscore (`_`). It is set to 15 per
61/// [RFC 6763 section 7.2](https://www.rfc-editor.org/rfc/rfc6763#section-7.2).
62pub const SERVICE_NAME_LEN_MAX_DEFAULT: u8 = 15;
63
64/// The default interval for checking IP changes automatically.
65pub const IP_CHECK_INTERVAL_IN_SECS_DEFAULT: u32 = 5;
66
67/// The default time out for [ServiceDaemon::verify] is 10 seconds, per
68/// [RFC 6762 section 10.4](https://datatracker.ietf.org/doc/html/rfc6762#section-10.4)
69pub const VERIFY_TIMEOUT_DEFAULT: Duration = Duration::from_secs(10);
70
71const MDNS_PORT: u16 = 5353;
72const GROUP_ADDR_V4: Ipv4Addr = Ipv4Addr::new(224, 0, 0, 251);
73const GROUP_ADDR_V6: Ipv6Addr = Ipv6Addr::new(0xff02, 0, 0, 0, 0, 0, 0, 0xfb);
74const LOOPBACK_V4: Ipv4Addr = Ipv4Addr::new(127, 0, 0, 1);
75
76const RESOLVE_WAIT_IN_MILLIS: u64 = 500;
77
78/// Response status code for the service `unregister` call.
79#[derive(Debug)]
80pub enum UnregisterStatus {
81    /// Unregister was successful.
82    OK,
83    /// The service was not found in the registration.
84    NotFound,
85}
86
87/// Status code for the service daemon.
88#[derive(Debug, PartialEq, Clone, Eq)]
89#[non_exhaustive]
90pub enum DaemonStatus {
91    /// The daemon is running as normal.
92    Running,
93
94    /// The daemon has been shutdown.
95    Shutdown,
96}
97
98/// Different counters included in the metrics.
99/// Currently all counters are for outgoing packets.
100#[derive(Hash, Eq, PartialEq)]
101enum Counter {
102    Register,
103    RegisterResend,
104    Unregister,
105    UnregisterResend,
106    Browse,
107    ResolveHostname,
108    Respond,
109    CacheRefreshPTR,
110    CacheRefreshSrvTxt,
111    CacheRefreshAddr,
112    KnownAnswerSuppression,
113    CachedPTR,
114    CachedSRV,
115    CachedAddr,
116    CachedTxt,
117    CachedNSec,
118    CachedSubtype,
119    DnsRegistryProbe,
120    DnsRegistryActive,
121    DnsRegistryTimer,
122    DnsRegistryNameChange,
123    Timer,
124}
125
126impl fmt::Display for Counter {
127    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
128        match self {
129            Self::Register => write!(f, "register"),
130            Self::RegisterResend => write!(f, "register-resend"),
131            Self::Unregister => write!(f, "unregister"),
132            Self::UnregisterResend => write!(f, "unregister-resend"),
133            Self::Browse => write!(f, "browse"),
134            Self::ResolveHostname => write!(f, "resolve-hostname"),
135            Self::Respond => write!(f, "respond"),
136            Self::CacheRefreshPTR => write!(f, "cache-refresh-ptr"),
137            Self::CacheRefreshSrvTxt => write!(f, "cache-refresh-srv-txt"),
138            Self::CacheRefreshAddr => write!(f, "cache-refresh-addr"),
139            Self::KnownAnswerSuppression => write!(f, "known-answer-suppression"),
140            Self::CachedPTR => write!(f, "cached-ptr"),
141            Self::CachedSRV => write!(f, "cached-srv"),
142            Self::CachedAddr => write!(f, "cached-addr"),
143            Self::CachedTxt => write!(f, "cached-txt"),
144            Self::CachedNSec => write!(f, "cached-nsec"),
145            Self::CachedSubtype => write!(f, "cached-subtype"),
146            Self::DnsRegistryProbe => write!(f, "dns-registry-probe"),
147            Self::DnsRegistryActive => write!(f, "dns-registry-active"),
148            Self::DnsRegistryTimer => write!(f, "dns-registry-timer"),
149            Self::DnsRegistryNameChange => write!(f, "dns-registry-name-change"),
150            Self::Timer => write!(f, "timer"),
151        }
152    }
153}
154
155/// A wrapper around UDP socket used by the mDNS daemon.
156///
157/// We do this because `mio` does not support PKTINFO and
158/// does not provide a way to implement `Source` trait directly and safely.
159struct MyUdpSocket {
160    /// The underlying socket that supports control messages like
161    /// `IP_PKTINFO` for IPv4 and `IPV6_PKTINFO` for IPv6.
162    pktinfo: PktInfoUdpSocket,
163
164    /// The mio UDP socket that is a clone of `pktinfo` and
165    /// is used for event polling.
166    mio: MioUdpSocket,
167}
168
169impl MyUdpSocket {
170    pub fn new(pktinfo: PktInfoUdpSocket) -> io::Result<Self> {
171        let std_sock = pktinfo.try_clone_std()?;
172        let mio = MioUdpSocket::from_std(std_sock);
173
174        Ok(Self { pktinfo, mio })
175    }
176}
177
178/// Implements the mio `Source` trait so that we can use `MyUdpSocket` with `Poll`.
179impl Source for MyUdpSocket {
180    fn register(
181        &mut self,
182        registry: &Registry,
183        token: Token,
184        interests: Interest,
185    ) -> io::Result<()> {
186        self.mio.register(registry, token, interests)
187    }
188
189    fn reregister(
190        &mut self,
191        registry: &Registry,
192        token: Token,
193        interests: Interest,
194    ) -> io::Result<()> {
195        self.mio.reregister(registry, token, interests)
196    }
197
198    fn deregister(&mut self, registry: &Registry) -> std::io::Result<()> {
199        self.mio.deregister(registry)
200    }
201}
202
203/// The metrics is a HashMap of (name_key, i64_value).
204/// The main purpose is to help monitoring the mDNS packet traffic.
205pub type Metrics = HashMap<String, i64>;
206
207const IPV4_SOCK_EVENT_KEY: usize = 4; // Pick a key just to indicate IPv4.
208const IPV6_SOCK_EVENT_KEY: usize = 6; // Pick a key just to indicate IPv6.
209const SIGNAL_SOCK_EVENT_KEY: usize = usize::MAX - 1; // avoid to overlap with zc.poll_ids
210
211/// A daemon thread for mDNS
212///
213/// This struct provides a handle and an API to the daemon. It is cloneable.
214#[derive(Clone)]
215pub struct ServiceDaemon {
216    /// Sender handle of the channel to the daemon.
217    sender: Sender<Command>,
218
219    /// Send to this addr to signal that a `Command` is coming.
220    ///
221    /// The daemon listens on this addr together with other mDNS sockets,
222    /// to avoid busy polling the flume channel. If there is a way to poll
223    /// the channel and mDNS sockets together, then this can be removed.
224    signal_addr: SocketAddr,
225}
226
227impl ServiceDaemon {
228    /// Creates a new daemon and spawns a thread to run the daemon.
229    ///
230    /// The daemon (re)uses the default mDNS port 5353. To keep it simple, we don't
231    /// ask callers to set the port.
232    pub fn new() -> Result<Self> {
233        // Use port 0 to allow the system assign a random available port,
234        // no need for a pre-defined port number.
235        let signal_addr = SocketAddrV4::new(LOOPBACK_V4, 0);
236
237        let signal_sock = UdpSocket::bind(signal_addr)
238            .map_err(|e| e_fmt!("failed to create signal_sock for daemon: {}", e))?;
239
240        // Get the socket with the OS chosen port
241        let signal_addr = signal_sock
242            .local_addr()
243            .map_err(|e| e_fmt!("failed to get signal sock addr: {}", e))?;
244
245        // Must be nonblocking so we can listen to it together with mDNS sockets.
246        signal_sock
247            .set_nonblocking(true)
248            .map_err(|e| e_fmt!("failed to set nonblocking for signal socket: {}", e))?;
249
250        let poller = Poll::new().map_err(|e| e_fmt!("failed to create mio Poll: {e}"))?;
251
252        let (sender, receiver) = bounded(100);
253
254        // Spawn the daemon thread
255        let mio_sock = MioUdpSocket::from_std(signal_sock);
256        thread::Builder::new()
257            .name("mDNS_daemon".to_string())
258            .spawn(move || Self::daemon_thread(mio_sock, poller, receiver))
259            .map_err(|e| e_fmt!("thread builder failed to spawn: {}", e))?;
260
261        Ok(Self {
262            sender,
263            signal_addr,
264        })
265    }
266
267    /// Sends `cmd` to the daemon via its channel, and sends a signal
268    /// to its sock addr to notify.
269    fn send_cmd(&self, cmd: Command) -> Result<()> {
270        let cmd_name = cmd.to_string();
271
272        // First, send to the flume channel.
273        self.sender.try_send(cmd).map_err(|e| match e {
274            TrySendError::Full(_) => Error::Again,
275            e => e_fmt!("flume::channel::send failed: {}", e),
276        })?;
277
278        // Second, send a signal to notify the daemon.
279        let addr = SocketAddrV4::new(LOOPBACK_V4, 0);
280        let socket = UdpSocket::bind(addr)
281            .map_err(|e| e_fmt!("Failed to create socket to send signal: {}", e))?;
282        socket
283            .send_to(cmd_name.as_bytes(), self.signal_addr)
284            .map_err(|e| {
285                e_fmt!(
286                    "signal socket send_to {} ({}) failed: {}",
287                    self.signal_addr,
288                    cmd_name,
289                    e
290                )
291            })?;
292
293        Ok(())
294    }
295
296    /// Starts browsing for a specific service type.
297    ///
298    /// `service_type` must end with a valid mDNS domain: '._tcp.local.' or '._udp.local.'
299    ///
300    /// Returns a channel `Receiver` to receive events about the service. The caller
301    /// can call `.recv_async().await` on this receiver to handle events in an
302    /// async environment or call `.recv()` in a sync environment.
303    ///
304    /// When a new instance is found, the daemon automatically tries to resolve, i.e.
305    /// finding more details, i.e. SRV records and TXT records.
306    pub fn browse(&self, service_type: &str) -> Result<Receiver<ServiceEvent>> {
307        check_domain_suffix(service_type)?;
308
309        let (resp_s, resp_r) = bounded(10);
310        self.send_cmd(Command::Browse(service_type.to_string(), 1, false, resp_s))?;
311        Ok(resp_r)
312    }
313
314    /// Preforms a "cache-only" browse.
315    ///
316    /// `service_type` must end with a valid mDNS domain: '._tcp.local.' or '._udp.local.'
317    ///
318    /// The functionality is identical to 'browse', but the service events are based solely on the contents
319    /// of the daemon's cache. No actual mDNS query is sent to the network.
320    ///
321    /// See [accept_unsolicited](Self::accept_unsolicited) if you want to do cache-only browsing.
322    pub fn browse_cache(&self, service_type: &str) -> Result<Receiver<ServiceEvent>> {
323        check_domain_suffix(service_type)?;
324
325        let (resp_s, resp_r) = bounded(10);
326        self.send_cmd(Command::Browse(service_type.to_string(), 1, true, resp_s))?;
327        Ok(resp_r)
328    }
329
330    /// Stops searching for a specific service type.
331    ///
332    /// When an error is returned, the caller should retry only when
333    /// the error is `Error::Again`, otherwise should log and move on.
334    pub fn stop_browse(&self, ty_domain: &str) -> Result<()> {
335        self.send_cmd(Command::StopBrowse(ty_domain.to_string()))
336    }
337
338    /// Starts querying for the ip addresses of a hostname.
339    ///
340    /// Returns a channel `Receiver` to receive events about the hostname.
341    /// The caller can call `.recv_async().await` on this receiver to handle events in an
342    /// async environment or call `.recv()` in a sync environment.
343    ///
344    /// The `timeout` is specified in milliseconds.
345    pub fn resolve_hostname(
346        &self,
347        hostname: &str,
348        timeout: Option<u64>,
349    ) -> Result<Receiver<HostnameResolutionEvent>> {
350        check_hostname(hostname)?;
351        let (resp_s, resp_r) = bounded(10);
352        self.send_cmd(Command::ResolveHostname(
353            hostname.to_string(),
354            1,
355            resp_s,
356            timeout,
357        ))?;
358        Ok(resp_r)
359    }
360
361    /// Stops querying for the ip addresses of a hostname.
362    ///
363    /// When an error is returned, the caller should retry only when
364    /// the error is `Error::Again`, otherwise should log and move on.
365    pub fn stop_resolve_hostname(&self, hostname: &str) -> Result<()> {
366        self.send_cmd(Command::StopResolveHostname(hostname.to_string()))
367    }
368
369    /// Registers a service provided by this host.
370    ///
371    /// If `service_info` has no addresses yet and its `addr_auto` is enabled,
372    /// this method will automatically fill in addresses from the host.
373    ///
374    /// To re-announce a service with an updated `service_info`, just call
375    /// this `register` function again. No need to call `unregister` first.
376    pub fn register(&self, service_info: ServiceInfo) -> Result<()> {
377        check_service_name(service_info.get_fullname())?;
378        check_hostname(service_info.get_hostname())?;
379
380        self.send_cmd(Command::Register(service_info.into()))
381    }
382
383    /// Unregisters a service. This is a graceful shutdown of a service.
384    ///
385    /// Returns a channel receiver that is used to receive the status code
386    /// of the unregister.
387    ///
388    /// When an error is returned, the caller should retry only when
389    /// the error is `Error::Again`, otherwise should log and move on.
390    pub fn unregister(&self, fullname: &str) -> Result<Receiver<UnregisterStatus>> {
391        let (resp_s, resp_r) = bounded(1);
392        self.send_cmd(Command::Unregister(fullname.to_lowercase(), resp_s))?;
393        Ok(resp_r)
394    }
395
396    /// Starts to monitor events from the daemon.
397    ///
398    /// Returns a channel [`Receiver`] of [`DaemonEvent`].
399    pub fn monitor(&self) -> Result<Receiver<DaemonEvent>> {
400        let (resp_s, resp_r) = bounded(100);
401        self.send_cmd(Command::Monitor(resp_s))?;
402        Ok(resp_r)
403    }
404
405    /// Shuts down the daemon thread and returns a channel to receive the status.
406    ///
407    /// When an error is returned, the caller should retry only when
408    /// the error is `Error::Again`, otherwise should log and move on.
409    pub fn shutdown(&self) -> Result<Receiver<DaemonStatus>> {
410        let (resp_s, resp_r) = bounded(1);
411        self.send_cmd(Command::Exit(resp_s))?;
412        Ok(resp_r)
413    }
414
415    /// Returns the status of the daemon.
416    ///
417    /// When an error is returned, the caller should retry only when
418    /// the error is `Error::Again`, otherwise should consider the daemon
419    /// stopped working and move on.
420    pub fn status(&self) -> Result<Receiver<DaemonStatus>> {
421        let (resp_s, resp_r) = bounded(1);
422
423        if self.sender.is_disconnected() {
424            resp_s
425                .send(DaemonStatus::Shutdown)
426                .map_err(|e| e_fmt!("failed to send daemon status to the client: {}", e))?;
427        } else {
428            self.send_cmd(Command::GetStatus(resp_s))?;
429        }
430
431        Ok(resp_r)
432    }
433
434    /// Returns a channel receiver for the metrics, e.g. input/output counters.
435    ///
436    /// The metrics returned is a snapshot. Hence the caller should call
437    /// this method repeatedly if they want to monitor the metrics continuously.
438    pub fn get_metrics(&self) -> Result<Receiver<Metrics>> {
439        let (resp_s, resp_r) = bounded(1);
440        self.send_cmd(Command::GetMetrics(resp_s))?;
441        Ok(resp_r)
442    }
443
444    /// Change the max length allowed for a service name.
445    ///
446    /// As RFC 6763 defines a length max for a service name, a user should not call
447    /// this method unless they have to. See [`SERVICE_NAME_LEN_MAX_DEFAULT`].
448    ///
449    /// `len_max` is capped at an internal limit, which is currently 30.
450    pub fn set_service_name_len_max(&self, len_max: u8) -> Result<()> {
451        const SERVICE_NAME_LEN_MAX_LIMIT: u8 = 30; // Double the default length max.
452
453        if len_max > SERVICE_NAME_LEN_MAX_LIMIT {
454            return Err(Error::Msg(format!(
455                "service name length max {len_max} is too large"
456            )));
457        }
458
459        self.send_cmd(Command::SetOption(DaemonOption::ServiceNameLenMax(len_max)))
460    }
461
462    /// Change the interval for checking IP changes automatically.
463    ///
464    /// Setting the interval to 0 disables the IP check.
465    ///
466    /// See [`IP_CHECK_INTERVAL_IN_SECS_DEFAULT`] for the default interval.
467    pub fn set_ip_check_interval(&self, interval_in_secs: u32) -> Result<()> {
468        let interval_in_millis = interval_in_secs as u64 * 1000;
469        self.send_cmd(Command::SetOption(DaemonOption::IpCheckInterval(
470            interval_in_millis,
471        )))
472    }
473
474    /// Get the current interval in seconds for checking IP changes automatically.
475    pub fn get_ip_check_interval(&self) -> Result<u32> {
476        let (resp_s, resp_r) = bounded(1);
477        self.send_cmd(Command::GetOption(resp_s))?;
478
479        let option = resp_r
480            .recv_timeout(Duration::from_secs(10))
481            .map_err(|e| e_fmt!("failed to receive ip check interval: {}", e))?;
482        let ip_check_interval_in_secs = option.ip_check_interval / 1000;
483        Ok(ip_check_interval_in_secs as u32)
484    }
485
486    /// Include interfaces that match `if_kind` for this service daemon.
487    ///
488    /// For example:
489    /// ```ignore
490    ///     daemon.enable_interface("en0")?;
491    /// ```
492    pub fn enable_interface(&self, if_kind: impl IntoIfKindVec) -> Result<()> {
493        let if_kind_vec = if_kind.into_vec();
494        self.send_cmd(Command::SetOption(DaemonOption::EnableInterface(
495            if_kind_vec.kinds,
496        )))
497    }
498
499    /// Ignore/exclude interfaces that match `if_kind` for this daemon.
500    ///
501    /// For example:
502    /// ```ignore
503    ///     daemon.disable_interface(IfKind::IPv6)?;
504    /// ```
505    pub fn disable_interface(&self, if_kind: impl IntoIfKindVec) -> Result<()> {
506        let if_kind_vec = if_kind.into_vec();
507        self.send_cmd(Command::SetOption(DaemonOption::DisableInterface(
508            if_kind_vec.kinds,
509        )))
510    }
511
512    /// If `accept` is true, accept and cache all responses, even if there is no active querier
513    /// for a given service type. This is useful / necessary when doing cache-only browsing. See
514    /// [browse_cache](Self::browse_cache).
515    ///
516    /// If `accept` is false (default), accept only responses matching queries that we have initiated.
517    ///
518    /// For example:
519    /// ```ignore
520    ///     daemon.accept_unsolicited(true)?;
521    /// ```
522    pub fn accept_unsolicited(&self, accept: bool) -> Result<()> {
523        self.send_cmd(Command::SetOption(DaemonOption::AcceptUnsolicited(accept)))
524    }
525
526    #[cfg(test)]
527    pub fn test_down_interface(&self, ifname: &str) -> Result<()> {
528        self.send_cmd(Command::SetOption(DaemonOption::TestDownInterface(
529            ifname.to_string(),
530        )))
531    }
532
533    #[cfg(test)]
534    pub fn test_up_interface(&self, ifname: &str) -> Result<()> {
535        self.send_cmd(Command::SetOption(DaemonOption::TestUpInterface(
536            ifname.to_string(),
537        )))
538    }
539
540    /// Enable or disable the loopback for locally sent multicast packets in IPv4.
541    ///
542    /// By default, multicast loop is enabled for IPv4. When disabled, a querier will not
543    /// receive announcements from a responder on the same host.
544    ///
545    /// Reference: <https://learn.microsoft.com/en-us/windows/win32/winsock/ip-multicast-2>
546    ///
547    /// "The Winsock version of the IP_MULTICAST_LOOP option is semantically different than
548    /// the UNIX version of the IP_MULTICAST_LOOP option:
549    ///
550    /// In Winsock, the IP_MULTICAST_LOOP option applies only to the receive path.
551    /// In the UNIX version, the IP_MULTICAST_LOOP option applies to the send path."
552    ///
553    /// Which means, in order NOT to receive localhost announcements, you want to call
554    /// this API on the querier side on Windows, but on the responder side on Unix.
555    pub fn set_multicast_loop_v4(&self, on: bool) -> Result<()> {
556        self.send_cmd(Command::SetOption(DaemonOption::MulticastLoopV4(on)))
557    }
558
559    /// Enable or disable the loopback for locally sent multicast packets in IPv6.
560    ///
561    /// By default, multicast loop is enabled for IPv6. When disabled, a querier will not
562    /// receive announcements from a responder on the same host.
563    ///
564    /// Reference: <https://learn.microsoft.com/en-us/windows/win32/winsock/ip-multicast-2>
565    ///
566    /// "The Winsock version of the IP_MULTICAST_LOOP option is semantically different than
567    /// the UNIX version of the IP_MULTICAST_LOOP option:
568    ///
569    /// In Winsock, the IP_MULTICAST_LOOP option applies only to the receive path.
570    /// In the UNIX version, the IP_MULTICAST_LOOP option applies to the send path."
571    ///
572    /// Which means, in order NOT to receive localhost announcements, you want to call
573    /// this API on the querier side on Windows, but on the responder side on Unix.
574    pub fn set_multicast_loop_v6(&self, on: bool) -> Result<()> {
575        self.send_cmd(Command::SetOption(DaemonOption::MulticastLoopV6(on)))
576    }
577
578    /// Proactively confirms whether a service instance still valid.
579    ///
580    /// This call will issue queries for a service instance's SRV record and Address records.
581    ///
582    /// For `timeout`, most users should use [VERIFY_TIMEOUT_DEFAULT]
583    /// unless there is a reason not to follow RFC.
584    ///
585    /// If no response is received within `timeout`, the current resource
586    /// records will be flushed, and if needed, `ServiceRemoved` event will be
587    /// sent to active queriers.
588    ///
589    /// Reference: [RFC 6762](https://datatracker.ietf.org/doc/html/rfc6762#section-10.4)
590    pub fn verify(&self, instance_fullname: String, timeout: Duration) -> Result<()> {
591        self.send_cmd(Command::Verify(instance_fullname, timeout))
592    }
593
594    fn daemon_thread(signal_sock: MioUdpSocket, poller: Poll, receiver: Receiver<Command>) {
595        let mut zc = Zeroconf::new(signal_sock, poller);
596
597        if let Some(cmd) = zc.run(receiver) {
598            match cmd {
599                Command::Exit(resp_s) => {
600                    // It is guaranteed that the receiver already dropped,
601                    // i.e. the daemon command channel closed.
602                    if let Err(e) = resp_s.send(DaemonStatus::Shutdown) {
603                        debug!("exit: failed to send response of shutdown: {}", e);
604                    }
605                }
606                _ => {
607                    debug!("Unexpected command: {:?}", cmd);
608                }
609            }
610        }
611    }
612}
613
614/// Creates a new UDP socket that uses `intf` to send and recv multicast.
615fn _new_socket_bind(intf: &Interface, should_loop: bool) -> Result<MyUdpSocket> {
616    // Use the same socket for receiving and sending multicast packets.
617    // Such socket has to bind to INADDR_ANY or IN6ADDR_ANY.
618    let intf_ip = &intf.ip();
619    match intf_ip {
620        IpAddr::V4(ip) => {
621            let addr = SocketAddrV4::new(Ipv4Addr::new(0, 0, 0, 0), MDNS_PORT);
622            let sock = new_socket(addr.into(), true)?;
623
624            // Join mDNS group to receive packets.
625            sock.join_multicast_v4(&GROUP_ADDR_V4, ip)
626                .map_err(|e| e_fmt!("join multicast group on addr {}: {}", intf_ip, e))?;
627
628            // Set IP_MULTICAST_IF to send packets.
629            sock.set_multicast_if_v4(ip)
630                .map_err(|e| e_fmt!("set multicast_if on addr {}: {}", ip, e))?;
631
632            // Per RFC 6762 section 11:
633            // "All Multicast DNS responses (including responses sent via unicast) SHOULD
634            // be sent with IP TTL set to 255."
635            // Here we set the TTL to 255 for multicast as we don't support unicast yet.
636            sock.set_multicast_ttl_v4(255)
637                .map_err(|e| e_fmt!("set set_multicast_ttl_v4 on addr {}: {}", ip, e))?;
638
639            if !should_loop {
640                sock.set_multicast_loop_v4(false)
641                    .map_err(|e| e_fmt!("failed to set multicast loop v4 for {ip}: {e}"))?;
642            }
643
644            // Test if we can send packets successfully.
645            let multicast_addr = SocketAddrV4::new(GROUP_ADDR_V4, MDNS_PORT).into();
646            let test_packets = DnsOutgoing::new(0).to_data_on_wire();
647            for packet in test_packets {
648                sock.send_to(&packet, &multicast_addr)
649                    .map_err(|e| e_fmt!("send multicast packet on addr {}: {}", ip, e))?;
650            }
651            MyUdpSocket::new(sock)
652                .map_err(|e| e_fmt!("failed to create MySocket for interface {}: {e}", intf.name))
653        }
654        IpAddr::V6(ip) => {
655            let addr = SocketAddrV6::new(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 0), MDNS_PORT, 0, 0);
656            let sock = new_socket(addr.into(), true)?;
657
658            let if_index = intf.index.unwrap_or(0);
659
660            // Join mDNS group to receive packets.
661            sock.join_multicast_v6(&GROUP_ADDR_V6, if_index)
662                .map_err(|e| e_fmt!("join multicast group on addr {}: {}", ip, e))?;
663
664            // Set IPV6_MULTICAST_IF to send packets.
665            sock.set_multicast_if_v6(if_index)
666                .map_err(|e| e_fmt!("set multicast_if on addr {}: {}", ip, e))?;
667
668            // We are not sending multicast packets to test this socket as there might
669            // be many IPv6 interfaces on a host and could cause such send error:
670            // "No buffer space available (os error 55)".
671
672            MyUdpSocket::new(sock)
673                .map_err(|e| e_fmt!("failed to create MySocket for interface {}: {e}", intf.name))
674        }
675    }
676}
677
678/// Creates a new UDP socket to bind to `port` with REUSEPORT option.
679/// `non_block` indicates whether to set O_NONBLOCK for the socket.
680fn new_socket(addr: SocketAddr, non_block: bool) -> Result<PktInfoUdpSocket> {
681    let domain = match addr {
682        SocketAddr::V4(_) => socket2::Domain::IPV4,
683        SocketAddr::V6(_) => socket2::Domain::IPV6,
684    };
685
686    let fd = PktInfoUdpSocket::new(domain).map_err(|e| e_fmt!("create socket failed: {}", e))?;
687
688    fd.set_reuse_address(true)
689        .map_err(|e| e_fmt!("set ReuseAddr failed: {}", e))?;
690    #[cfg(unix)] // this is currently restricted to Unix's in socket2
691    fd.set_reuse_port(true)
692        .map_err(|e| e_fmt!("set ReusePort failed: {}", e))?;
693
694    if non_block {
695        fd.set_nonblocking(true)
696            .map_err(|e| e_fmt!("set O_NONBLOCK: {}", e))?;
697    }
698
699    fd.bind(&addr.into())
700        .map_err(|e| e_fmt!("socket bind to {} failed: {}", &addr, e))?;
701
702    trace!("new socket bind to {}", &addr);
703    Ok(fd)
704}
705
706/// Specify a UNIX timestamp in millis to run `command` for the next time.
707struct ReRun {
708    /// UNIX timestamp in millis.
709    next_time: u64,
710    command: Command,
711}
712
713/// Specify kinds of interfaces. It is used to enable or to disable interfaces in the daemon.
714///
715/// Note that for ergonomic reasons, `From<&str>` and `From<IpAddr>` are implemented.
716#[derive(Debug, Clone)]
717#[non_exhaustive]
718pub enum IfKind {
719    /// All interfaces.
720    All,
721
722    /// All IPv4 interfaces.
723    IPv4,
724
725    /// All IPv6 interfaces.
726    IPv6,
727
728    /// By the interface name, for example "en0"
729    Name(String),
730
731    /// By an IPv4 or IPv6 address.
732    Addr(IpAddr),
733
734    /// 127.0.0.1 (or anything in 127.0.0.0/8), enabled by default.
735    ///
736    /// Loopback interfaces are required by some use cases (e.g., OSCQuery) for publishing.
737    LoopbackV4,
738
739    /// ::1/128, enabled by default.
740    LoopbackV6,
741}
742
743impl IfKind {
744    /// Checks if `intf` matches with this interface kind.
745    fn matches(&self, intf: &Interface) -> bool {
746        match self {
747            Self::All => true,
748            Self::IPv4 => intf.ip().is_ipv4(),
749            Self::IPv6 => intf.ip().is_ipv6(),
750            Self::Name(ifname) => ifname == &intf.name,
751            Self::Addr(addr) => addr == &intf.ip(),
752            Self::LoopbackV4 => intf.is_loopback() && intf.ip().is_ipv4(),
753            Self::LoopbackV6 => intf.is_loopback() && intf.ip().is_ipv6(),
754        }
755    }
756}
757
758/// The first use case of specifying an interface was to
759/// use an interface name. Hence adding this for ergonomic reasons.
760impl From<&str> for IfKind {
761    fn from(val: &str) -> Self {
762        Self::Name(val.to_string())
763    }
764}
765
766impl From<&String> for IfKind {
767    fn from(val: &String) -> Self {
768        Self::Name(val.to_string())
769    }
770}
771
772/// Still for ergonomic reasons.
773impl From<IpAddr> for IfKind {
774    fn from(val: IpAddr) -> Self {
775        Self::Addr(val)
776    }
777}
778
779/// A list of `IfKind` that can be used to match interfaces.
780pub struct IfKindVec {
781    kinds: Vec<IfKind>,
782}
783
784/// A trait that converts a type into a Vec of `IfKind`.
785pub trait IntoIfKindVec {
786    fn into_vec(self) -> IfKindVec;
787}
788
789impl<T: Into<IfKind>> IntoIfKindVec for T {
790    fn into_vec(self) -> IfKindVec {
791        let if_kind: IfKind = self.into();
792        IfKindVec {
793            kinds: vec![if_kind],
794        }
795    }
796}
797
798impl<T: Into<IfKind>> IntoIfKindVec for Vec<T> {
799    fn into_vec(self) -> IfKindVec {
800        let kinds: Vec<IfKind> = self.into_iter().map(|x| x.into()).collect();
801        IfKindVec { kinds }
802    }
803}
804
805/// Selection of interfaces.
806struct IfSelection {
807    /// The interfaces to be selected.
808    if_kind: IfKind,
809
810    /// Whether the `if_kind` should be enabled or not.
811    selected: bool,
812}
813
814/// A struct holding the state. It was inspired by `zeroconf` package in Python.
815struct Zeroconf {
816    /// Local interfaces keyed by interface index.
817    my_intfs: HashMap<u32, MyIntf>,
818
819    /// A common socket for IPv4 interfaces. It's None if IPv4 is disabled in OS kernel.
820    ipv4_sock: Option<MyUdpSocket>,
821
822    /// A common socket for IPv6 interfaces. It's None if IPv6 is disabled in OS kernel.
823    ipv6_sock: Option<MyUdpSocket>,
824
825    /// Local registered services, keyed by service full names.
826    my_services: HashMap<String, ServiceInfo>,
827
828    /// Received DNS records.
829    cache: DnsCache,
830
831    /// Registered service records, keyed by interface index.
832    dns_registry_map: HashMap<u32, DnsRegistry>,
833
834    /// Active "Browse" commands.
835    service_queriers: HashMap<String, Sender<ServiceEvent>>, // <ty_domain, channel::sender>
836
837    /// Active "ResolveHostname" commands.
838    ///
839    /// The timestamps are set at the future timestamp when the command should timeout.
840    /// `hostname` is case-insensitive and stored in lowercase.
841    hostname_resolvers: HashMap<String, (Sender<HostnameResolutionEvent>, Option<u64>)>, // <hostname, (channel::sender, UNIX timestamp in millis)>
842
843    /// All repeating transmissions.
844    retransmissions: Vec<ReRun>,
845
846    counters: Metrics,
847
848    /// Waits for incoming packets.
849    poller: Poll,
850
851    /// Channels to notify events.
852    monitors: Vec<Sender<DaemonEvent>>,
853
854    /// Options
855    service_name_len_max: u8,
856
857    /// Interval in millis to check IP address changes.
858    ip_check_interval: u64,
859
860    /// All interface selections called to the daemon.
861    if_selections: Vec<IfSelection>,
862
863    /// Socket for signaling.
864    signal_sock: MioUdpSocket,
865
866    /// Timestamps marking where we need another iteration of the run loop,
867    /// to react to events like retransmissions, cache refreshes, interface IP address changes, etc.
868    ///
869    /// When the run loop goes through a single iteration, it will
870    /// set its timeout to the earliest timer in this list.
871    timers: BinaryHeap<Reverse<u64>>,
872
873    status: DaemonStatus,
874
875    /// Service instances that are pending for resolving SRV and TXT.
876    pending_resolves: HashSet<String>,
877
878    /// Service instances that are already resolved.
879    resolved: HashSet<String>,
880
881    multicast_loop_v4: bool,
882
883    multicast_loop_v6: bool,
884
885    accept_unsolicited: bool,
886
887    #[cfg(test)]
888    test_down_interfaces: HashSet<String>,
889}
890
891/// Join the multicast group for the given interface.
892fn join_multicast_group(my_sock: &PktInfoUdpSocket, intf: &Interface) -> Result<()> {
893    let intf_ip = &intf.ip();
894    match intf_ip {
895        IpAddr::V4(ip) => {
896            // Join mDNS group to receive packets.
897            debug!("join multicast group V4 on {} addr {ip}", intf.name);
898            my_sock
899                .join_multicast_v4(&GROUP_ADDR_V4, ip)
900                .map_err(|e| e_fmt!("PKT join multicast group on addr {}: {}", intf_ip, e))?;
901        }
902        IpAddr::V6(ip) => {
903            let if_index = intf.index.unwrap_or(0);
904            // Join mDNS group to receive packets.
905            debug!(
906                "join multicast group V6 on {} addr {ip} with index {if_index}",
907                intf.name
908            );
909            my_sock
910                .join_multicast_v6(&GROUP_ADDR_V6, if_index)
911                .map_err(|e| e_fmt!("PKT join multicast group on addr {}: {}", ip, e))?;
912        }
913    }
914    Ok(())
915}
916
917impl Zeroconf {
918    fn new(signal_sock: MioUdpSocket, poller: Poll) -> Self {
919        // Get interfaces.
920        let my_ifaddrs = my_ip_interfaces(true);
921
922        // Create a socket for every IP addr.
923        // Note: it is possible that `my_ifaddrs` contains the same IP addr with different interface names,
924        // or the same interface name with different IP addrs.
925        let mut my_intfs = HashMap::new();
926        let mut dns_registry_map = HashMap::new();
927
928        // Use the same socket for receiving and sending multicast packets.
929        // Such socket has to bind to INADDR_ANY or IN6ADDR_ANY.
930        let mut ipv4_sock = None;
931        let addr = SocketAddrV4::new(Ipv4Addr::new(0, 0, 0, 0), MDNS_PORT);
932        match new_socket(addr.into(), true) {
933            Ok(sock) => {
934                // Per RFC 6762 section 11:
935                // "All Multicast DNS responses (including responses sent via unicast) SHOULD
936                // be sent with IP TTL set to 255."
937                // Here we set the TTL to 255 for multicast as we don't support unicast yet.
938                sock.set_multicast_ttl_v4(255)
939                    .map_err(|e| e_fmt!("set set_multicast_ttl_v4 on addr: {}", e))
940                    .ok();
941
942                // This clones a socket.
943                ipv4_sock = match MyUdpSocket::new(sock) {
944                    Ok(s) => Some(s),
945                    Err(e) => {
946                        debug!("failed to create IPv4 MyUdpSocket: {e}");
947                        None
948                    }
949                };
950            }
951            // Per RFC 6762 section 11:}
952            Err(e) => debug!("failed to create IPv4 socket: {e}"),
953        }
954
955        let mut ipv6_sock = None;
956        let addr = SocketAddrV6::new(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 0), MDNS_PORT, 0, 0);
957        match new_socket(addr.into(), true) {
958            Ok(sock) => {
959                // Per RFC 6762 section 11:
960                // "All Multicast DNS responses (including responses sent via unicast) SHOULD
961                // be sent with IP TTL set to 255."
962                sock.set_multicast_hops_v6(255)
963                    .map_err(|e| e_fmt!("set set_multicast_hops_v6: {}", e))
964                    .ok();
965
966                // This clones the ipv6 socket.
967                ipv6_sock = match MyUdpSocket::new(sock) {
968                    Ok(s) => Some(s),
969                    Err(e) => {
970                        debug!("failed to create IPv6 MyUdpSocket: {e}");
971                        None
972                    }
973                };
974            }
975            Err(e) => debug!("failed to create IPv6 socket: {e}"),
976        }
977
978        // Configure sockets to join multicast groups.
979        for intf in my_ifaddrs {
980            let sock_opt = if intf.ip().is_ipv4() {
981                &ipv4_sock
982            } else {
983                &ipv6_sock
984            };
985            let Some(sock) = sock_opt else {
986                debug!(
987                    "no socket available for interface {} with addr {}. Skipped.",
988                    intf.name,
989                    intf.ip()
990                );
991                continue;
992            };
993
994            if let Err(e) = join_multicast_group(&sock.pktinfo, &intf) {
995                debug!("failed to join multicast: {}: {e}. Skipped.", &intf.ip());
996            }
997
998            let if_index = intf.index.unwrap_or(0);
999
1000            // Add this interface address if not already present.
1001            dns_registry_map
1002                .entry(if_index)
1003                .or_insert_with(DnsRegistry::new);
1004
1005            my_intfs
1006                .entry(if_index)
1007                .and_modify(|v: &mut MyIntf| {
1008                    v.addrs.insert(intf.addr.clone());
1009                })
1010                .or_insert(MyIntf {
1011                    name: intf.name.clone(),
1012                    index: if_index,
1013                    addrs: HashSet::from([intf.addr]),
1014                });
1015        }
1016
1017        let monitors = Vec::new();
1018        let service_name_len_max = SERVICE_NAME_LEN_MAX_DEFAULT;
1019        let ip_check_interval = IP_CHECK_INTERVAL_IN_SECS_DEFAULT as u64 * 1000;
1020
1021        let timers = BinaryHeap::new();
1022
1023        // Enable everything, including loopback interfaces.
1024        let if_selections = vec![];
1025
1026        let status = DaemonStatus::Running;
1027
1028        Self {
1029            my_intfs,
1030            ipv4_sock,
1031            ipv6_sock,
1032            my_services: HashMap::new(),
1033            cache: DnsCache::new(),
1034            dns_registry_map,
1035            hostname_resolvers: HashMap::new(),
1036            service_queriers: HashMap::new(),
1037            retransmissions: Vec::new(),
1038            counters: HashMap::new(),
1039            poller,
1040            monitors,
1041            service_name_len_max,
1042            ip_check_interval,
1043            if_selections,
1044            signal_sock,
1045            timers,
1046            status,
1047            pending_resolves: HashSet::new(),
1048            resolved: HashSet::new(),
1049            multicast_loop_v4: true,
1050            multicast_loop_v6: true,
1051            accept_unsolicited: false,
1052
1053            #[cfg(test)]
1054            test_down_interfaces: HashSet::new(),
1055        }
1056    }
1057
1058    /// The main event loop of the daemon thread
1059    ///
1060    /// In each round, it will:
1061    /// 1. select the listening sockets with a timeout.
1062    /// 2. process the incoming packets if any.
1063    /// 3. try_recv on its channel and execute commands.
1064    /// 4. announce its registered services.
1065    /// 5. process retransmissions if any.
1066    fn run(&mut self, receiver: Receiver<Command>) -> Option<Command> {
1067        // Add the daemon's signal socket to the poller.
1068        if let Err(e) = self.poller.registry().register(
1069            &mut self.signal_sock,
1070            mio::Token(SIGNAL_SOCK_EVENT_KEY),
1071            mio::Interest::READABLE,
1072        ) {
1073            debug!("failed to add signal socket to the poller: {}", e);
1074            return None;
1075        }
1076
1077        if let Some(sock) = self.ipv4_sock.as_mut() {
1078            if let Err(e) = self.poller.registry().register(
1079                sock,
1080                mio::Token(IPV4_SOCK_EVENT_KEY),
1081                mio::Interest::READABLE,
1082            ) {
1083                debug!("failed to register ipv4 socket: {}", e);
1084                return None;
1085            }
1086        }
1087
1088        if let Some(sock) = self.ipv6_sock.as_mut() {
1089            if let Err(e) = self.poller.registry().register(
1090                sock,
1091                mio::Token(IPV6_SOCK_EVENT_KEY),
1092                mio::Interest::READABLE,
1093            ) {
1094                debug!("failed to register ipv6 socket: {}", e);
1095                return None;
1096            }
1097        }
1098
1099        // Setup timer for IP checks.
1100        let mut next_ip_check = if self.ip_check_interval > 0 {
1101            current_time_millis() + self.ip_check_interval
1102        } else {
1103            0
1104        };
1105
1106        if next_ip_check > 0 {
1107            self.add_timer(next_ip_check);
1108        }
1109
1110        // Start the run loop.
1111
1112        let mut events = mio::Events::with_capacity(1024);
1113        loop {
1114            let now = current_time_millis();
1115
1116            let earliest_timer = self.peek_earliest_timer();
1117            let timeout = earliest_timer.map(|timer| {
1118                // If `timer` already passed, set `timeout` to be 1ms.
1119                let millis = if timer > now { timer - now } else { 1 };
1120                Duration::from_millis(millis)
1121            });
1122
1123            // Process incoming packets, command events and optional timeout.
1124            events.clear();
1125            match self.poller.poll(&mut events, timeout) {
1126                Ok(_) => self.handle_poller_events(&events),
1127                Err(e) => debug!("failed to select from sockets: {}", e),
1128            }
1129
1130            let now = current_time_millis();
1131
1132            // Remove the timers if already passed.
1133            self.pop_timers_till(now);
1134
1135            // Remove hostname resolvers with expired timeouts.
1136            for hostname in self
1137                .hostname_resolvers
1138                .clone()
1139                .into_iter()
1140                .filter(|(_, (_, timeout))| timeout.map(|t| now >= t).unwrap_or(false))
1141                .map(|(hostname, _)| hostname)
1142            {
1143                trace!("hostname resolver timeout for {}", &hostname);
1144                call_hostname_resolution_listener(
1145                    &self.hostname_resolvers,
1146                    &hostname,
1147                    HostnameResolutionEvent::SearchTimeout(hostname.to_owned()),
1148                );
1149                call_hostname_resolution_listener(
1150                    &self.hostname_resolvers,
1151                    &hostname,
1152                    HostnameResolutionEvent::SearchStopped(hostname.to_owned()),
1153                );
1154                self.hostname_resolvers.remove(&hostname);
1155            }
1156
1157            // process commands from the command channel
1158            while let Ok(command) = receiver.try_recv() {
1159                if matches!(command, Command::Exit(_)) {
1160                    self.status = DaemonStatus::Shutdown;
1161                    return Some(command);
1162                }
1163                self.exec_command(command, false);
1164            }
1165
1166            // check for repeated commands and run them if their time is up.
1167            let mut i = 0;
1168            while i < self.retransmissions.len() {
1169                if now >= self.retransmissions[i].next_time {
1170                    let rerun = self.retransmissions.remove(i);
1171                    self.exec_command(rerun.command, true);
1172                } else {
1173                    i += 1;
1174                }
1175            }
1176
1177            // Refresh cached service records with active queriers
1178            self.refresh_active_services();
1179
1180            // Refresh cached A/AAAA records with active queriers
1181            let mut query_count = 0;
1182            for (hostname, _sender) in self.hostname_resolvers.iter() {
1183                for (hostname, ip_addr) in
1184                    self.cache.refresh_due_hostname_resolutions(hostname).iter()
1185                {
1186                    self.send_query(hostname, ip_address_rr_type(&ip_addr.to_ip_addr()));
1187                    query_count += 1;
1188                }
1189            }
1190
1191            self.increase_counter(Counter::CacheRefreshAddr, query_count);
1192
1193            // check and evict expired records in our cache
1194            let now = current_time_millis();
1195
1196            // Notify service listeners about the expired records.
1197            let expired_services = self.cache.evict_expired_services(now);
1198            if !expired_services.is_empty() {
1199                debug!(
1200                    "run: send {} service removal to listeners",
1201                    expired_services.len()
1202                );
1203                self.notify_service_removal(expired_services);
1204            }
1205
1206            // Notify hostname listeners about the expired records.
1207            let expired_addrs = self.cache.evict_expired_addr(now);
1208            for (hostname, addrs) in expired_addrs {
1209                call_hostname_resolution_listener(
1210                    &self.hostname_resolvers,
1211                    &hostname,
1212                    HostnameResolutionEvent::AddressesRemoved(hostname.clone(), addrs),
1213                );
1214                let instances = self.cache.get_instances_on_host(&hostname);
1215                let instance_set: HashSet<String> = instances.into_iter().collect();
1216                self.resolve_updated_instances(&instance_set);
1217            }
1218
1219            // Send out probing queries.
1220            self.probing_handler();
1221
1222            // check IP changes if next_ip_check is reached.
1223            if now >= next_ip_check && next_ip_check > 0 {
1224                next_ip_check = now + self.ip_check_interval;
1225                self.add_timer(next_ip_check);
1226
1227                self.check_ip_changes();
1228            }
1229        }
1230    }
1231
1232    fn process_set_option(&mut self, daemon_opt: DaemonOption) {
1233        match daemon_opt {
1234            DaemonOption::ServiceNameLenMax(length) => self.service_name_len_max = length,
1235            DaemonOption::IpCheckInterval(interval) => self.ip_check_interval = interval,
1236            DaemonOption::EnableInterface(if_kind) => self.enable_interface(if_kind),
1237            DaemonOption::DisableInterface(if_kind) => self.disable_interface(if_kind),
1238            DaemonOption::MulticastLoopV4(on) => self.set_multicast_loop_v4(on),
1239            DaemonOption::MulticastLoopV6(on) => self.set_multicast_loop_v6(on),
1240            DaemonOption::AcceptUnsolicited(accept) => self.set_accept_unsolicited(accept),
1241            #[cfg(test)]
1242            DaemonOption::TestDownInterface(ifname) => {
1243                self.test_down_interfaces.insert(ifname);
1244            }
1245            #[cfg(test)]
1246            DaemonOption::TestUpInterface(ifname) => {
1247                self.test_down_interfaces.remove(&ifname);
1248            }
1249        }
1250    }
1251
1252    fn enable_interface(&mut self, kinds: Vec<IfKind>) {
1253        debug!("enable_interface: {:?}", kinds);
1254        for if_kind in kinds {
1255            self.if_selections.push(IfSelection {
1256                if_kind,
1257                selected: true,
1258            });
1259        }
1260
1261        self.apply_intf_selections(my_ip_interfaces(true));
1262    }
1263
1264    fn disable_interface(&mut self, kinds: Vec<IfKind>) {
1265        debug!("disable_interface: {:?}", kinds);
1266        for if_kind in kinds {
1267            self.if_selections.push(IfSelection {
1268                if_kind,
1269                selected: false,
1270            });
1271        }
1272
1273        self.apply_intf_selections(my_ip_interfaces(true));
1274    }
1275
1276    fn set_multicast_loop_v4(&mut self, on: bool) {
1277        let Some(sock) = self.ipv4_sock.as_mut() else {
1278            return;
1279        };
1280        self.multicast_loop_v4 = on;
1281        sock.pktinfo
1282            .set_multicast_loop_v4(on)
1283            .map_err(|e| e_fmt!("failed to set multicast loop v4: {}", e))
1284            .unwrap();
1285    }
1286
1287    fn set_multicast_loop_v6(&mut self, on: bool) {
1288        let Some(sock) = self.ipv6_sock.as_mut() else {
1289            return;
1290        };
1291        self.multicast_loop_v6 = on;
1292        sock.pktinfo
1293            .set_multicast_loop_v6(on)
1294            .map_err(|e| e_fmt!("failed to set multicast loop v6: {}", e))
1295            .unwrap();
1296    }
1297
1298    fn set_accept_unsolicited(&mut self, accept: bool) {
1299        self.accept_unsolicited = accept;
1300    }
1301
1302    fn notify_monitors(&mut self, event: DaemonEvent) {
1303        // Only retain the monitors that are still connected.
1304        self.monitors.retain(|sender| {
1305            if let Err(e) = sender.try_send(event.clone()) {
1306                debug!("notify_monitors: try_send: {}", &e);
1307                if matches!(e, TrySendError::Disconnected(_)) {
1308                    return false; // This monitor is dropped.
1309                }
1310            }
1311            true
1312        });
1313    }
1314
1315    /// Remove `addr` in my services that enabled `addr_auto`.
1316    fn del_addr_in_my_services(&mut self, addr: &IpAddr) {
1317        for (_, service_info) in self.my_services.iter_mut() {
1318            if service_info.is_addr_auto() {
1319                service_info.remove_ipaddr(addr);
1320            }
1321        }
1322    }
1323
1324    fn add_timer(&mut self, next_time: u64) {
1325        self.timers.push(Reverse(next_time));
1326    }
1327
1328    fn peek_earliest_timer(&self) -> Option<u64> {
1329        self.timers.peek().map(|Reverse(v)| *v)
1330    }
1331
1332    fn _pop_earliest_timer(&mut self) -> Option<u64> {
1333        self.timers.pop().map(|Reverse(v)| v)
1334    }
1335
1336    /// Pop all timers that are already passed till `now`.
1337    fn pop_timers_till(&mut self, now: u64) {
1338        while let Some(Reverse(v)) = self.timers.peek() {
1339            if *v > now {
1340                break;
1341            }
1342            self.timers.pop();
1343        }
1344    }
1345
1346    /// Apply all selections to `interfaces` and return the selected addresses.
1347    fn selected_intfs(&self, interfaces: Vec<Interface>) -> HashSet<Interface> {
1348        let intf_count = interfaces.len();
1349        let mut intf_selections = vec![true; intf_count];
1350
1351        // apply if_selections
1352        for selection in self.if_selections.iter() {
1353            // Mark the interfaces for this selection.
1354            for i in 0..intf_count {
1355                if selection.if_kind.matches(&interfaces[i]) {
1356                    intf_selections[i] = selection.selected;
1357                }
1358            }
1359        }
1360
1361        let mut selected_addrs = HashSet::new();
1362        for i in 0..intf_count {
1363            if intf_selections[i] {
1364                selected_addrs.insert(interfaces[i].clone());
1365            }
1366        }
1367
1368        selected_addrs
1369    }
1370
1371    /// Apply all selections to `interfaces`.
1372    ///
1373    /// For any interface, add it if selected but not bound yet,
1374    /// delete it if not selected but still bound.
1375    fn apply_intf_selections(&mut self, interfaces: Vec<Interface>) {
1376        // By default, we enable all interfaces.
1377        let intf_count = interfaces.len();
1378        let mut intf_selections = vec![true; intf_count];
1379
1380        // apply if_selections
1381        for selection in self.if_selections.iter() {
1382            // Mark the interfaces for this selection.
1383            for i in 0..intf_count {
1384                if selection.if_kind.matches(&interfaces[i]) {
1385                    intf_selections[i] = selection.selected;
1386                }
1387            }
1388        }
1389
1390        // Update `my_intfs` based on the selections.
1391        for (idx, intf) in interfaces.into_iter().enumerate() {
1392            if intf_selections[idx] {
1393                // Add the interface
1394                self.add_interface(intf);
1395            } else {
1396                // Remove the interface
1397                self.del_interface(&intf);
1398            }
1399        }
1400    }
1401
1402    fn del_ip(&mut self, ip: IpAddr) {
1403        self.del_addr_in_my_services(&ip);
1404        self.notify_monitors(DaemonEvent::IpDel(ip));
1405    }
1406
1407    /// Check for IP changes and update [my_intfs] as needed.
1408    fn check_ip_changes(&mut self) {
1409        // Get the current interfaces.
1410        let my_ifaddrs = my_ip_interfaces(true);
1411
1412        #[cfg(test)]
1413        let my_ifaddrs: Vec<_> = my_ifaddrs
1414            .into_iter()
1415            .filter(|intf| !self.test_down_interfaces.contains(&intf.name))
1416            .collect();
1417
1418        let ifaddrs_map: HashMap<u32, Vec<&IfAddr>> =
1419            my_ifaddrs.iter().fold(HashMap::new(), |mut acc, intf| {
1420                let if_index = intf.index.unwrap_or(0);
1421                acc.entry(if_index).or_default().push(&intf.addr);
1422                acc
1423            });
1424
1425        let mut deleted_intfs = Vec::new();
1426        let mut deleted_ips = Vec::new();
1427
1428        for (if_index, my_intf) in self.my_intfs.iter_mut() {
1429            let mut last_ipv4 = None;
1430            let mut last_ipv6 = None;
1431
1432            if let Some(current_addrs) = ifaddrs_map.get(if_index) {
1433                my_intf.addrs.retain(|addr| {
1434                    if current_addrs.contains(&addr) {
1435                        true
1436                    } else {
1437                        match addr.ip() {
1438                            IpAddr::V4(ipv4) => last_ipv4 = Some(ipv4),
1439                            IpAddr::V6(ipv6) => last_ipv6 = Some(ipv6),
1440                        }
1441                        deleted_ips.push(addr.ip());
1442                        false
1443                    }
1444                });
1445                if my_intf.addrs.is_empty() {
1446                    deleted_intfs.push((*if_index, last_ipv4, last_ipv6))
1447                }
1448            } else {
1449                // If it does not exist, remove the interface.
1450                debug!(
1451                    "check_ip_changes: interface {} ({}) no longer exists, removing",
1452                    my_intf.name, if_index
1453                );
1454                for addr in my_intf.addrs.iter() {
1455                    match addr.ip() {
1456                        IpAddr::V4(ipv4) => last_ipv4 = Some(ipv4),
1457                        IpAddr::V6(ipv6) => last_ipv6 = Some(ipv6),
1458                    }
1459                    deleted_ips.push(addr.ip())
1460                }
1461                deleted_intfs.push((*if_index, last_ipv4, last_ipv6));
1462            }
1463        }
1464
1465        if !deleted_ips.is_empty() || !deleted_intfs.is_empty() {
1466            debug!(
1467                "check_ip_changes: {} deleted ips {} deleted intfs",
1468                deleted_ips.len(),
1469                deleted_intfs.len()
1470            );
1471        }
1472
1473        for ip in deleted_ips {
1474            self.del_ip(ip);
1475        }
1476
1477        for (if_index, last_ipv4, last_ipv6) in deleted_intfs {
1478            let Some(my_intf) = self.my_intfs.remove(&if_index) else {
1479                continue;
1480            };
1481
1482            if let Some(ipv4) = last_ipv4 {
1483                debug!("leave multicast for {ipv4}");
1484                if let Some(sock) = self.ipv4_sock.as_mut() {
1485                    if let Err(e) = sock.pktinfo.leave_multicast_v4(&GROUP_ADDR_V4, &ipv4) {
1486                        debug!("leave multicast group for addr {ipv4}: {e}");
1487                    }
1488                }
1489            }
1490
1491            if let Some(ipv6) = last_ipv6 {
1492                debug!("leave multicast for {ipv6}");
1493                if let Some(sock) = self.ipv6_sock.as_mut() {
1494                    if let Err(e) = sock
1495                        .pktinfo
1496                        .leave_multicast_v6(&GROUP_ADDR_V6, my_intf.index)
1497                    {
1498                        debug!("leave multicast group for IPv6: {ipv6}: {e}");
1499                    }
1500                }
1501            }
1502
1503            // Remove cache records for this interface.
1504            let intf_id = InterfaceId {
1505                name: my_intf.name.to_string(),
1506                index: my_intf.index,
1507            };
1508            let removed_instances = self.cache.remove_records_on_intf(intf_id);
1509            self.notify_service_removal(removed_instances);
1510        }
1511
1512        // Add newly found interfaces only if in our selections.
1513        self.apply_intf_selections(my_ifaddrs);
1514    }
1515
1516    fn del_interface(&mut self, intf: &Interface) {
1517        let if_index = intf.index.unwrap_or(0);
1518        trace!(
1519            "del_interface: {} ({if_index}) addr {}",
1520            intf.name,
1521            intf.ip()
1522        );
1523
1524        let Some(my_intf) = self.my_intfs.get_mut(&if_index) else {
1525            debug!("del_interface: interface {} not found", intf.name);
1526            return;
1527        };
1528
1529        let mut ip_removed = false;
1530
1531        if my_intf.addrs.remove(&intf.addr) {
1532            ip_removed = true;
1533
1534            match intf.addr.ip() {
1535                IpAddr::V4(ipv4) => {
1536                    if my_intf.next_ifaddr_v4().is_none() {
1537                        if let Some(sock) = self.ipv4_sock.as_mut() {
1538                            if let Err(e) = sock.pktinfo.leave_multicast_v4(&GROUP_ADDR_V4, &ipv4) {
1539                                debug!("leave multicast group for addr {ipv4}: {e}");
1540                            }
1541                        }
1542                    }
1543                }
1544
1545                IpAddr::V6(ipv6) => {
1546                    if my_intf.next_ifaddr_v6().is_none() {
1547                        if let Some(sock) = self.ipv6_sock.as_mut() {
1548                            if let Err(e) =
1549                                sock.pktinfo.leave_multicast_v6(&GROUP_ADDR_V6, if_index)
1550                            {
1551                                debug!("leave multicast group for addr {ipv6}: {e}");
1552                            }
1553                        }
1554                    }
1555                }
1556            }
1557
1558            if my_intf.addrs.is_empty() {
1559                // If no more addresses, remove the interface.
1560                debug!("del_interface: removing interface {}", intf.name);
1561                self.my_intfs.remove(&if_index);
1562                self.dns_registry_map.remove(&if_index);
1563                self.cache.remove_addrs_on_disabled_intf(if_index);
1564            }
1565        }
1566
1567        if ip_removed {
1568            // Notify the monitors.
1569            self.notify_monitors(DaemonEvent::IpDel(intf.ip()));
1570            // Remove the interface from my services that enabled `addr_auto`.
1571            self.del_addr_in_my_services(&intf.ip());
1572        }
1573    }
1574
1575    fn add_interface(&mut self, intf: Interface) {
1576        let sock_opt = if intf.ip().is_ipv4() {
1577            &self.ipv4_sock
1578        } else {
1579            &self.ipv6_sock
1580        };
1581
1582        let Some(sock) = sock_opt else {
1583            debug!(
1584                "add_interface: no socket available for interface {} with addr {}. Skipped.",
1585                intf.name,
1586                intf.ip()
1587            );
1588            return;
1589        };
1590
1591        let if_index = intf.index.unwrap_or(0);
1592        let mut new_addr = false;
1593
1594        match self.my_intfs.entry(if_index) {
1595            Entry::Occupied(mut entry) => {
1596                // If intf has a new address, add it to the existing interface.
1597                let my_intf = entry.get_mut();
1598                if !my_intf.addrs.contains(&intf.addr) {
1599                    if let Err(e) = join_multicast_group(&sock.pktinfo, &intf) {
1600                        debug!("add_interface: socket_config {}: {e}", &intf.name);
1601                    }
1602                    my_intf.addrs.insert(intf.addr.clone());
1603                    new_addr = true;
1604                }
1605            }
1606            Entry::Vacant(entry) => {
1607                if let Err(e) = join_multicast_group(&sock.pktinfo, &intf) {
1608                    debug!("add_interface: socket_config {}: {e}. Skipped.", &intf.name);
1609                    return;
1610                }
1611
1612                new_addr = true;
1613                let new_intf = MyIntf {
1614                    name: intf.name.clone(),
1615                    index: if_index,
1616                    addrs: HashSet::from([intf.addr.clone()]),
1617                };
1618                entry.insert(new_intf);
1619            }
1620        }
1621
1622        if !new_addr {
1623            trace!("add_interface: interface {} already exists", &intf.name);
1624            return;
1625        }
1626
1627        debug!("add new interface {}: {}", intf.name, intf.ip());
1628
1629        let Some(my_intf) = self.my_intfs.get(&if_index) else {
1630            debug!("add_interface: cannot find if_index {if_index}");
1631            return;
1632        };
1633
1634        let dns_registry = match self.dns_registry_map.get_mut(&if_index) {
1635            Some(registry) => registry,
1636            None => self
1637                .dns_registry_map
1638                .entry(if_index)
1639                .or_insert_with(DnsRegistry::new),
1640        };
1641
1642        for (_, service_info) in self.my_services.iter_mut() {
1643            if service_info.is_addr_auto() {
1644                service_info.insert_ipaddr(&intf);
1645
1646                if announce_service_on_intf(dns_registry, service_info, my_intf, &sock.pktinfo) {
1647                    debug!(
1648                        "Announce service {} on {}",
1649                        service_info.get_fullname(),
1650                        intf.ip()
1651                    );
1652                    service_info.set_status(if_index, ServiceStatus::Announced);
1653                } else {
1654                    for timer in dns_registry.new_timers.drain(..) {
1655                        self.timers.push(Reverse(timer));
1656                    }
1657                    service_info.set_status(if_index, ServiceStatus::Probing);
1658                }
1659            }
1660        }
1661
1662        // As we added a new interface, we want to execute all active "Browse" reruns now.
1663        let mut browse_reruns = Vec::new();
1664        let mut i = 0;
1665        while i < self.retransmissions.len() {
1666            if matches!(self.retransmissions[i].command, Command::Browse(..)) {
1667                browse_reruns.push(self.retransmissions.remove(i));
1668            } else {
1669                i += 1;
1670            }
1671        }
1672
1673        for rerun in browse_reruns {
1674            self.exec_command(rerun.command, true);
1675        }
1676
1677        // Notify the monitors.
1678        self.notify_monitors(DaemonEvent::IpAdd(intf.ip()));
1679    }
1680
1681    /// Registers a service.
1682    ///
1683    /// RFC 6762 section 8.3.
1684    /// ...the Multicast DNS responder MUST send
1685    ///    an unsolicited Multicast DNS response containing, in the Answer
1686    ///    Section, all of its newly registered resource records
1687    ///
1688    /// Zeroconf will then respond to requests for information about this service.
1689    fn register_service(&mut self, mut info: ServiceInfo) {
1690        // Check the service name length.
1691        if let Err(e) = check_service_name_length(info.get_type(), self.service_name_len_max) {
1692            debug!("check_service_name_length: {}", &e);
1693            self.notify_monitors(DaemonEvent::Error(e));
1694            return;
1695        }
1696
1697        if info.is_addr_auto() {
1698            let selected_intfs = self.selected_intfs(my_ip_interfaces(true));
1699            for intf in selected_intfs {
1700                info.insert_ipaddr(&intf);
1701            }
1702        }
1703
1704        debug!("register service {:?}", &info);
1705
1706        let outgoing_addrs = self.send_unsolicited_response(&mut info);
1707        if !outgoing_addrs.is_empty() {
1708            self.notify_monitors(DaemonEvent::Announce(
1709                info.get_fullname().to_string(),
1710                format!("{:?}", &outgoing_addrs),
1711            ));
1712        }
1713
1714        // The key has to be lower case letter as DNS record name is case insensitive.
1715        // The info will have the original name.
1716        let service_fullname = info.get_fullname().to_lowercase();
1717        self.my_services.insert(service_fullname, info);
1718    }
1719
1720    /// Sends out announcement of `info` on every valid interface.
1721    /// Returns the list of interface IPs that sent out the announcement.
1722    fn send_unsolicited_response(&mut self, info: &mut ServiceInfo) -> Vec<IpAddr> {
1723        let mut outgoing_addrs = Vec::new();
1724        let mut outgoing_intfs = HashSet::new();
1725
1726        for (if_index, intf) in self.my_intfs.iter() {
1727            let dns_registry = match self.dns_registry_map.get_mut(if_index) {
1728                Some(registry) => registry,
1729                None => self
1730                    .dns_registry_map
1731                    .entry(*if_index)
1732                    .or_insert_with(DnsRegistry::new),
1733            };
1734
1735            let mut announced = false;
1736
1737            // IPv4
1738            if let Some(sock) = self.ipv4_sock.as_mut() {
1739                if announce_service_on_intf(dns_registry, info, intf, &sock.pktinfo) {
1740                    for addr in intf.addrs.iter().filter(|a| a.ip().is_ipv4()) {
1741                        outgoing_addrs.push(addr.ip());
1742                    }
1743                    outgoing_intfs.insert(intf.index);
1744
1745                    debug!(
1746                        "Announce service IPv4 {} on {}",
1747                        info.get_fullname(),
1748                        intf.name
1749                    );
1750                    announced = true;
1751                }
1752            }
1753
1754            if let Some(sock) = self.ipv6_sock.as_mut() {
1755                if announce_service_on_intf(dns_registry, info, intf, &sock.pktinfo) {
1756                    for addr in intf.addrs.iter().filter(|a| a.ip().is_ipv6()) {
1757                        outgoing_addrs.push(addr.ip());
1758                    }
1759                    outgoing_intfs.insert(intf.index);
1760
1761                    debug!(
1762                        "Announce service IPv6 {} on {}",
1763                        info.get_fullname(),
1764                        intf.name
1765                    );
1766                    announced = true;
1767                }
1768            }
1769
1770            if announced {
1771                info.set_status(intf.index, ServiceStatus::Announced);
1772            } else {
1773                for timer in dns_registry.new_timers.drain(..) {
1774                    self.timers.push(Reverse(timer));
1775                }
1776                info.set_status(*if_index, ServiceStatus::Probing);
1777            }
1778        }
1779
1780        // RFC 6762 section 8.3.
1781        // ..The Multicast DNS responder MUST send at least two unsolicited
1782        //    responses, one second apart.
1783        let next_time = current_time_millis() + 1000;
1784        for if_index in outgoing_intfs {
1785            self.add_retransmission(
1786                next_time,
1787                Command::RegisterResend(info.get_fullname().to_string(), if_index),
1788            );
1789        }
1790
1791        outgoing_addrs
1792    }
1793
1794    /// Send probings or finish them if expired. Notify waiting services.
1795    fn probing_handler(&mut self) {
1796        let now = current_time_millis();
1797
1798        for (if_index, intf) in self.my_intfs.iter() {
1799            let Some(dns_registry) = self.dns_registry_map.get_mut(if_index) else {
1800                continue;
1801            };
1802
1803            let (out, expired_probes) = check_probing(dns_registry, &mut self.timers, now);
1804
1805            // send probing.
1806            if !out.questions().is_empty() {
1807                trace!("sending out probing of questions: {:?}", out.questions());
1808                if let Some(sock) = self.ipv4_sock.as_mut() {
1809                    send_dns_outgoing(&out, intf, &sock.pktinfo);
1810                }
1811                if let Some(sock) = self.ipv6_sock.as_mut() {
1812                    send_dns_outgoing(&out, intf, &sock.pktinfo);
1813                }
1814            }
1815
1816            // For finished probes, wake up services that are waiting for the probes.
1817            let waiting_services =
1818                handle_expired_probes(expired_probes, &intf.name, dns_registry, &mut self.monitors);
1819
1820            for service_name in waiting_services {
1821                // service names are lowercase
1822                if let Some(info) = self.my_services.get_mut(&service_name.to_lowercase()) {
1823                    if info.get_status(*if_index) == ServiceStatus::Announced {
1824                        debug!("service {} already announced", info.get_fullname());
1825                        continue;
1826                    }
1827
1828                    let announced_v4 = if let Some(sock) = self.ipv4_sock.as_mut() {
1829                        announce_service_on_intf(dns_registry, info, intf, &sock.pktinfo)
1830                    } else {
1831                        false
1832                    };
1833                    let announced_v6 = if let Some(sock) = self.ipv6_sock.as_mut() {
1834                        announce_service_on_intf(dns_registry, info, intf, &sock.pktinfo)
1835                    } else {
1836                        false
1837                    };
1838
1839                    if announced_v4 || announced_v6 {
1840                        let next_time = now + 1000;
1841                        let command =
1842                            Command::RegisterResend(info.get_fullname().to_string(), *if_index);
1843                        self.retransmissions.push(ReRun { next_time, command });
1844                        self.timers.push(Reverse(next_time));
1845
1846                        let fullname = match dns_registry.name_changes.get(&service_name) {
1847                            Some(new_name) => new_name.to_string(),
1848                            None => service_name.to_string(),
1849                        };
1850
1851                        let mut hostname = info.get_hostname();
1852                        if let Some(new_name) = dns_registry.name_changes.get(hostname) {
1853                            hostname = new_name;
1854                        }
1855
1856                        debug!("wake up: announce service {} on {}", fullname, intf.name);
1857                        notify_monitors(
1858                            &mut self.monitors,
1859                            DaemonEvent::Announce(fullname, format!("{}:{}", hostname, &intf.name)),
1860                        );
1861
1862                        info.set_status(*if_index, ServiceStatus::Announced);
1863                    }
1864                }
1865            }
1866        }
1867    }
1868
1869    fn unregister_service(
1870        &self,
1871        info: &ServiceInfo,
1872        intf: &MyIntf,
1873        sock: &PktInfoUdpSocket,
1874    ) -> Vec<u8> {
1875        let is_ipv4 = sock.domain() == Domain::IPV4;
1876
1877        let mut out = DnsOutgoing::new(FLAGS_QR_RESPONSE | FLAGS_AA);
1878        out.add_answer_at_time(
1879            DnsPointer::new(
1880                info.get_type(),
1881                RRType::PTR,
1882                CLASS_IN,
1883                0,
1884                info.get_fullname().to_string(),
1885            ),
1886            0,
1887        );
1888
1889        if let Some(sub) = info.get_subtype() {
1890            trace!("Adding subdomain {}", sub);
1891            out.add_answer_at_time(
1892                DnsPointer::new(
1893                    sub,
1894                    RRType::PTR,
1895                    CLASS_IN,
1896                    0,
1897                    info.get_fullname().to_string(),
1898                ),
1899                0,
1900            );
1901        }
1902
1903        out.add_answer_at_time(
1904            DnsSrv::new(
1905                info.get_fullname(),
1906                CLASS_IN | CLASS_CACHE_FLUSH,
1907                0,
1908                info.get_priority(),
1909                info.get_weight(),
1910                info.get_port(),
1911                info.get_hostname().to_string(),
1912            ),
1913            0,
1914        );
1915        out.add_answer_at_time(
1916            DnsTxt::new(
1917                info.get_fullname(),
1918                CLASS_IN | CLASS_CACHE_FLUSH,
1919                0,
1920                info.generate_txt(),
1921            ),
1922            0,
1923        );
1924
1925        let if_addrs = if is_ipv4 {
1926            info.get_addrs_on_my_intf_v4(intf)
1927        } else {
1928            info.get_addrs_on_my_intf_v6(intf)
1929        };
1930
1931        if if_addrs.is_empty() {
1932            return vec![];
1933        }
1934
1935        for address in if_addrs {
1936            out.add_answer_at_time(
1937                DnsAddress::new(
1938                    info.get_hostname(),
1939                    ip_address_rr_type(&address),
1940                    CLASS_IN | CLASS_CACHE_FLUSH,
1941                    0,
1942                    address,
1943                    intf.into(),
1944                ),
1945                0,
1946            );
1947        }
1948
1949        // `out` data is non-empty, hence we can do this.
1950        send_dns_outgoing(&out, intf, sock).remove(0)
1951    }
1952
1953    /// Binds a channel `listener` to querying mDNS hostnames.
1954    ///
1955    /// If there is already a `listener`, it will be updated, i.e. overwritten.
1956    fn add_hostname_resolver(
1957        &mut self,
1958        hostname: String,
1959        listener: Sender<HostnameResolutionEvent>,
1960        timeout: Option<u64>,
1961    ) {
1962        let real_timeout = timeout.map(|t| current_time_millis() + t);
1963        self.hostname_resolvers
1964            .insert(hostname.to_lowercase(), (listener, real_timeout));
1965        if let Some(t) = real_timeout {
1966            self.add_timer(t);
1967        }
1968    }
1969
1970    /// Sends a multicast query for `name` with `qtype`.
1971    fn send_query(&self, name: &str, qtype: RRType) {
1972        self.send_query_vec(&[(name, qtype)]);
1973    }
1974
1975    /// Sends out a list of `questions` (i.e. DNS questions) via multicast.
1976    fn send_query_vec(&self, questions: &[(&str, RRType)]) {
1977        let mut out = DnsOutgoing::new(FLAGS_QR_QUERY);
1978        let now = current_time_millis();
1979
1980        for (name, qtype) in questions {
1981            out.add_question(name, *qtype);
1982
1983            for record in self.cache.get_known_answers(name, *qtype, now) {
1984                /*
1985                RFC 6762 section 7.1: https://datatracker.ietf.org/doc/html/rfc6762#section-7.1
1986                ...
1987                    When a Multicast DNS querier sends a query to which it already knows
1988                    some answers, it populates the Answer Section of the DNS query
1989                    message with those answers.
1990                 */
1991                trace!("add known answer: {:?}", record.record);
1992                let mut new_record = record.record.clone();
1993                new_record.get_record_mut().update_ttl(now);
1994                out.add_answer_box(new_record);
1995            }
1996        }
1997
1998        for (_, intf) in self.my_intfs.iter() {
1999            if let Some(sock) = self.ipv4_sock.as_ref() {
2000                send_dns_outgoing(&out, intf, &sock.pktinfo);
2001            }
2002            if let Some(sock) = self.ipv6_sock.as_ref() {
2003                send_dns_outgoing(&out, intf, &sock.pktinfo);
2004            }
2005        }
2006    }
2007
2008    /// Reads one UDP datagram from the socket of `intf`.
2009    ///
2010    /// Returns false if failed to receive a packet,
2011    /// otherwise returns true.
2012    fn handle_read(&mut self, event_key: usize) -> bool {
2013        let sock_opt = match event_key {
2014            IPV4_SOCK_EVENT_KEY => &mut self.ipv4_sock,
2015            IPV6_SOCK_EVENT_KEY => &mut self.ipv6_sock,
2016            _ => {
2017                debug!("handle_read: unknown token {}", event_key);
2018                return false;
2019            }
2020        };
2021        let Some(sock) = sock_opt.as_mut() else {
2022            debug!("handle_read: socket not available for token {}", event_key);
2023            return false;
2024        };
2025        let mut buf = vec![0u8; MAX_MSG_ABSOLUTE];
2026
2027        // Read the next mDNS UDP datagram.
2028        //
2029        // If the datagram is larger than `buf`, excess bytes may or may not
2030        // be truncated by the socket layer depending on the platform's libc.
2031        // In any case, such large datagram will not be decoded properly and
2032        // this function should return false but should not crash.
2033        let (sz, pktinfo) = match sock.pktinfo.recv(&mut buf) {
2034            Ok(sz) => sz,
2035            Err(e) => {
2036                if e.kind() != std::io::ErrorKind::WouldBlock {
2037                    debug!("listening socket read failed: {}", e);
2038                }
2039                return false;
2040            }
2041        };
2042
2043        // Find the interface that received the packet.
2044        let pkt_if_index = pktinfo.if_index as u32;
2045        let Some(my_intf) = self.my_intfs.get(&pkt_if_index) else {
2046            debug!(
2047                "handle_read: no interface found for pktinfo if_index: {}",
2048                pktinfo.if_index
2049            );
2050            return true; // We still return true to indicate that we read something.
2051        };
2052
2053        buf.truncate(sz); // reduce potential processing errors
2054
2055        match DnsIncoming::new(buf, my_intf.into()) {
2056            Ok(msg) => {
2057                if msg.is_query() {
2058                    self.handle_query(msg, pkt_if_index, event_key == IPV4_SOCK_EVENT_KEY);
2059                } else if msg.is_response() {
2060                    self.handle_response(msg, pkt_if_index);
2061                } else {
2062                    debug!("Invalid message: not query and not response");
2063                }
2064            }
2065            Err(e) => debug!("Invalid incoming DNS message: {}", e),
2066        }
2067
2068        true
2069    }
2070
2071    /// Returns true, if sent query. Returns false if SRV already exists.
2072    fn query_unresolved(&mut self, instance: &str) -> bool {
2073        if !valid_instance_name(instance) {
2074            trace!("instance name {} not valid", instance);
2075            return false;
2076        }
2077
2078        if let Some(records) = self.cache.get_srv(instance) {
2079            for record in records {
2080                if let Some(srv) = record.record.any().downcast_ref::<DnsSrv>() {
2081                    if self.cache.get_addr(srv.host()).is_none() {
2082                        self.send_query_vec(&[(srv.host(), RRType::A), (srv.host(), RRType::AAAA)]);
2083                        return true;
2084                    }
2085                }
2086            }
2087        } else {
2088            self.send_query(instance, RRType::ANY);
2089            return true;
2090        }
2091
2092        false
2093    }
2094
2095    /// Checks if `ty_domain` has records in the cache. If yes, sends the
2096    /// cached records via `sender`.
2097    fn query_cache_for_service(
2098        &mut self,
2099        ty_domain: &str,
2100        sender: &Sender<ServiceEvent>,
2101        now: u64,
2102    ) {
2103        let mut resolved: HashSet<String> = HashSet::new();
2104        let mut unresolved: HashSet<String> = HashSet::new();
2105
2106        if let Some(records) = self.cache.get_ptr(ty_domain) {
2107            for record in records.iter().filter(|r| !r.record.expires_soon(now)) {
2108                if let Some(ptr) = record.record.any().downcast_ref::<DnsPointer>() {
2109                    let mut new_event = None;
2110                    match self.resolve_service_from_cache(ty_domain, ptr.alias()) {
2111                        Ok(resolved_service) => {
2112                            if resolved_service.is_valid() {
2113                                debug!("Resolved service from cache: {}", ptr.alias());
2114                                new_event =
2115                                    Some(ServiceEvent::ServiceResolved(Box::new(resolved_service)));
2116                            } else {
2117                                debug!("Resolved service is not valid: {}", ptr.alias());
2118                            }
2119                        }
2120                        Err(err) => {
2121                            debug!("Error while resolving service from cache: {}", err);
2122                            continue;
2123                        }
2124                    }
2125
2126                    match sender.send(ServiceEvent::ServiceFound(
2127                        ty_domain.to_string(),
2128                        ptr.alias().to_string(),
2129                    )) {
2130                        Ok(()) => debug!("sent service found {}", ptr.alias()),
2131                        Err(e) => {
2132                            debug!("failed to send service found: {}", e);
2133                            continue;
2134                        }
2135                    }
2136
2137                    if let Some(event) = new_event {
2138                        resolved.insert(ptr.alias().to_string());
2139                        match sender.send(event) {
2140                            Ok(()) => debug!("sent service resolved: {}", ptr.alias()),
2141                            Err(e) => debug!("failed to send service resolved: {}", e),
2142                        }
2143                    } else {
2144                        unresolved.insert(ptr.alias().to_string());
2145                    }
2146                }
2147            }
2148        }
2149
2150        for instance in resolved.drain() {
2151            self.pending_resolves.remove(&instance);
2152            self.resolved.insert(instance);
2153        }
2154
2155        for instance in unresolved.drain() {
2156            self.add_pending_resolve(instance);
2157        }
2158    }
2159
2160    /// Checks if `hostname` has records in the cache. If yes, sends the
2161    /// cached records via `sender`.
2162    fn query_cache_for_hostname(
2163        &mut self,
2164        hostname: &str,
2165        sender: Sender<HostnameResolutionEvent>,
2166    ) {
2167        let addresses_map = self.cache.get_addresses_for_host(hostname);
2168        for (name, addresses) in addresses_map {
2169            match sender.send(HostnameResolutionEvent::AddressesFound(name, addresses)) {
2170                Ok(()) => trace!("sent hostname addresses found"),
2171                Err(e) => debug!("failed to send hostname addresses found: {}", e),
2172            }
2173        }
2174    }
2175
2176    fn add_pending_resolve(&mut self, instance: String) {
2177        if !self.pending_resolves.contains(&instance) {
2178            let next_time = current_time_millis() + RESOLVE_WAIT_IN_MILLIS;
2179            self.add_retransmission(next_time, Command::Resolve(instance.clone(), 1));
2180            self.pending_resolves.insert(instance);
2181        }
2182    }
2183
2184    /// Creates a `ResolvedService` from the cache.
2185    fn resolve_service_from_cache(
2186        &self,
2187        ty_domain: &str,
2188        fullname: &str,
2189    ) -> Result<ResolvedService> {
2190        let now = current_time_millis();
2191        let mut resolved_service = ResolvedService {
2192            ty_domain: ty_domain.to_string(),
2193            sub_ty_domain: None,
2194            fullname: fullname.to_string(),
2195            host: String::new(),
2196            port: 0,
2197            addresses: HashSet::new(),
2198            txt_properties: TxtProperties::new(),
2199        };
2200
2201        // Be sure setting `subtype` if available even when querying for the parent domain.
2202        if let Some(subtype) = self.cache.get_subtype(fullname) {
2203            trace!(
2204                "ty_domain: {} found subtype {} for instance: {}",
2205                ty_domain,
2206                subtype,
2207                fullname
2208            );
2209            if resolved_service.sub_ty_domain.is_none() {
2210                resolved_service.sub_ty_domain = Some(subtype.to_string());
2211            }
2212        }
2213
2214        // resolve SRV record
2215        if let Some(records) = self.cache.get_srv(fullname) {
2216            if let Some(answer) = records.iter().find(|r| !r.record.expires_soon(now)) {
2217                if let Some(dns_srv) = answer.record.any().downcast_ref::<DnsSrv>() {
2218                    resolved_service.host = dns_srv.host().to_string();
2219                    resolved_service.port = dns_srv.port();
2220                }
2221            }
2222        }
2223
2224        // resolve TXT record
2225        if let Some(records) = self.cache.get_txt(fullname) {
2226            if let Some(record) = records.iter().find(|r| !r.record.expires_soon(now)) {
2227                if let Some(dns_txt) = record.record.any().downcast_ref::<DnsTxt>() {
2228                    resolved_service.txt_properties = dns_txt.text().into();
2229                }
2230            }
2231        }
2232
2233        // resolve A and AAAA records
2234        if let Some(records) = self.cache.get_addr(&resolved_service.host) {
2235            for answer in records.iter() {
2236                if let Some(dns_a) = answer.record.any().downcast_ref::<DnsAddress>() {
2237                    if dns_a.expires_soon(now) {
2238                        trace!(
2239                            "Addr expired or expires soon: {}",
2240                            dns_a.address().to_ip_addr()
2241                        );
2242                    } else {
2243                        resolved_service.addresses.insert(dns_a.address());
2244                    }
2245                }
2246            }
2247        }
2248
2249        Ok(resolved_service)
2250    }
2251
2252    fn handle_poller_events(&mut self, events: &mio::Events) {
2253        for ev in events.iter() {
2254            trace!("event received with key {:?}", ev.token());
2255            if ev.token().0 == SIGNAL_SOCK_EVENT_KEY {
2256                // Drain signals as we will drain commands as well.
2257                self.signal_sock_drain();
2258
2259                if let Err(e) = self.poller.registry().reregister(
2260                    &mut self.signal_sock,
2261                    ev.token(),
2262                    mio::Interest::READABLE,
2263                ) {
2264                    debug!("failed to modify poller for signal socket: {}", e);
2265                }
2266                continue; // Next event.
2267            }
2268
2269            // Read until no more packets available.
2270            while self.handle_read(ev.token().0) {}
2271
2272            // we continue to monitor this socket.
2273            if ev.token().0 == IPV4_SOCK_EVENT_KEY {
2274                // Re-register the IPv4 socket for reading.
2275                if let Some(sock) = self.ipv4_sock.as_mut() {
2276                    if let Err(e) =
2277                        self.poller
2278                            .registry()
2279                            .reregister(sock, ev.token(), mio::Interest::READABLE)
2280                    {
2281                        debug!("modify poller for IPv4 socket: {}", e);
2282                    }
2283                }
2284            } else if ev.token().0 == IPV6_SOCK_EVENT_KEY {
2285                // Re-register the IPv6 socket for reading.
2286                if let Some(sock) = self.ipv6_sock.as_mut() {
2287                    if let Err(e) =
2288                        self.poller
2289                            .registry()
2290                            .reregister(sock, ev.token(), mio::Interest::READABLE)
2291                    {
2292                        debug!("modify poller for IPv6 socket: {}", e);
2293                    }
2294                }
2295            }
2296        }
2297    }
2298
2299    /// Deal with incoming response packets.  All answers
2300    /// are held in the cache, and listeners are notified.
2301    fn handle_response(&mut self, mut msg: DnsIncoming, if_index: u32) {
2302        let now = current_time_millis();
2303
2304        // remove records that are expired.
2305        let mut record_predicate = |record: &DnsRecordBox| {
2306            if !record.get_record().is_expired(now) {
2307                return true;
2308            }
2309
2310            debug!("record is expired, removing it from cache.");
2311            if self.cache.remove(record) {
2312                // for PTR records, send event to listeners
2313                if let Some(dns_ptr) = record.any().downcast_ref::<DnsPointer>() {
2314                    call_service_listener(
2315                        &self.service_queriers,
2316                        dns_ptr.get_name(),
2317                        ServiceEvent::ServiceRemoved(
2318                            dns_ptr.get_name().to_string(),
2319                            dns_ptr.alias().to_string(),
2320                        ),
2321                    );
2322                }
2323            }
2324            false
2325        };
2326        msg.answers_mut().retain(&mut record_predicate);
2327        msg.authorities_mut().retain(&mut record_predicate);
2328        msg.additionals_mut().retain(&mut record_predicate);
2329
2330        // check possible conflicts and handle them.
2331        self.conflict_handler(&msg, if_index);
2332
2333        // check if the message is for us.
2334        let mut is_for_us = true; // assume it is for us.
2335
2336        // If there are any PTR records in the answers, there should be
2337        // at least one PTR for us. Otherwise, the message is not for us.
2338        // If there are no PTR records at all, assume this message is for us.
2339        for answer in msg.answers() {
2340            if answer.get_type() == RRType::PTR {
2341                if self.service_queriers.contains_key(answer.get_name()) {
2342                    is_for_us = true;
2343                    break; // OK to break: at least one PTR for us.
2344                } else {
2345                    is_for_us = false;
2346                }
2347            } else if answer.get_type() == RRType::A || answer.get_type() == RRType::AAAA {
2348                // If there is a hostname querier for this address, then it is for us.
2349                let answer_lowercase = answer.get_name().to_lowercase();
2350                if self.hostname_resolvers.contains_key(&answer_lowercase) {
2351                    is_for_us = true;
2352                    break; // OK to break: at least one hostname for us.
2353                }
2354            }
2355        }
2356
2357        // if we explicitily want to accept unsolicited responses, we should consider all messages as for us.
2358        if self.accept_unsolicited {
2359            is_for_us = true;
2360        }
2361
2362        /// Represents a DNS record change that involves one service instance.
2363        struct InstanceChange {
2364            ty: RRType,   // The type of DNS record for the instance.
2365            name: String, // The name of the record.
2366        }
2367
2368        // Go through all answers to get the new and updated records.
2369        // For new PTR records, send out ServiceFound immediately. For others,
2370        // collect them into `changes`.
2371        //
2372        // Note: we don't try to identify the update instances based on
2373        // each record immediately as the answers are likely related to each
2374        // other.
2375        let mut changes = Vec::new();
2376        let mut timers = Vec::new();
2377        let Some(my_intf) = self.my_intfs.get(&if_index) else {
2378            return;
2379        };
2380        for record in msg.all_records() {
2381            match self
2382                .cache
2383                .add_or_update(my_intf, record, &mut timers, is_for_us)
2384            {
2385                Some((dns_record, true)) => {
2386                    timers.push(dns_record.record.get_record().get_expire_time());
2387                    timers.push(dns_record.record.get_record().get_refresh_time());
2388
2389                    let ty = dns_record.record.get_type();
2390                    let name = dns_record.record.get_name();
2391
2392                    // Only process PTR that does not expire soon (i.e. TTL > 1).
2393                    if ty == RRType::PTR && dns_record.record.get_record().get_ttl() > 1 {
2394                        if self.service_queriers.contains_key(name) {
2395                            timers.push(dns_record.record.get_record().get_refresh_time());
2396                        }
2397
2398                        // send ServiceFound
2399                        if let Some(dns_ptr) = dns_record.record.any().downcast_ref::<DnsPointer>()
2400                        {
2401                            debug!("calling listener with service found: {name}");
2402                            call_service_listener(
2403                                &self.service_queriers,
2404                                name,
2405                                ServiceEvent::ServiceFound(
2406                                    name.to_string(),
2407                                    dns_ptr.alias().to_string(),
2408                                ),
2409                            );
2410                            changes.push(InstanceChange {
2411                                ty,
2412                                name: dns_ptr.alias().to_string(),
2413                            });
2414                        }
2415                    } else {
2416                        changes.push(InstanceChange {
2417                            ty,
2418                            name: name.to_string(),
2419                        });
2420                    }
2421                }
2422                Some((dns_record, false)) => {
2423                    timers.push(dns_record.record.get_record().get_expire_time());
2424                    timers.push(dns_record.record.get_record().get_refresh_time());
2425                }
2426                _ => {}
2427            }
2428        }
2429
2430        // Add timers for the new records.
2431        for t in timers {
2432            self.add_timer(t);
2433        }
2434
2435        // Go through remaining changes to see if any hostname resolutions were found or updated.
2436        for change in changes
2437            .iter()
2438            .filter(|change| change.ty == RRType::A || change.ty == RRType::AAAA)
2439        {
2440            let addr_map = self.cache.get_addresses_for_host(&change.name);
2441            for (name, addresses) in addr_map {
2442                call_hostname_resolution_listener(
2443                    &self.hostname_resolvers,
2444                    &change.name,
2445                    HostnameResolutionEvent::AddressesFound(name, addresses),
2446                )
2447            }
2448        }
2449
2450        // Identify the instances that need to be "resolved".
2451        let mut updated_instances = HashSet::new();
2452        for update in changes {
2453            match update.ty {
2454                RRType::PTR | RRType::SRV | RRType::TXT => {
2455                    updated_instances.insert(update.name);
2456                }
2457                RRType::A | RRType::AAAA => {
2458                    let instances = self.cache.get_instances_on_host(&update.name);
2459                    updated_instances.extend(instances);
2460                }
2461                _ => {}
2462            }
2463        }
2464
2465        self.resolve_updated_instances(&updated_instances);
2466    }
2467
2468    fn conflict_handler(&mut self, msg: &DnsIncoming, if_index: u32) {
2469        let Some(my_intf) = self.my_intfs.get(&if_index) else {
2470            debug!("handle_response: no intf found for index {if_index}");
2471            return;
2472        };
2473
2474        let Some(dns_registry) = self.dns_registry_map.get_mut(&if_index) else {
2475            return;
2476        };
2477
2478        for answer in msg.answers().iter() {
2479            let mut new_records = Vec::new();
2480
2481            let name = answer.get_name();
2482            let Some(probe) = dns_registry.probing.get_mut(name) else {
2483                continue;
2484            };
2485
2486            // check against possible multicast forwarding
2487            if answer.get_type() == RRType::A || answer.get_type() == RRType::AAAA {
2488                if let Some(answer_addr) = answer.any().downcast_ref::<DnsAddress>() {
2489                    if answer_addr.interface_id.index != if_index {
2490                        debug!(
2491                            "conflict handler: answer addr {:?} not in the subnet of intf {}",
2492                            answer_addr, my_intf.name
2493                        );
2494                        continue;
2495                    }
2496                }
2497
2498                // double check if any other address record matches rrdata,
2499                // as there could be multiple addresses for the same name.
2500                let any_match = probe.records.iter().any(|r| {
2501                    r.get_type() == answer.get_type()
2502                        && r.get_class() == answer.get_class()
2503                        && r.rrdata_match(answer.as_ref())
2504                });
2505                if any_match {
2506                    continue; // no conflict for this answer.
2507                }
2508            }
2509
2510            probe.records.retain(|record| {
2511                if record.get_type() == answer.get_type()
2512                    && record.get_class() == answer.get_class()
2513                    && !record.rrdata_match(answer.as_ref())
2514                {
2515                    debug!(
2516                        "found conflict name: '{name}' record: {}: {} PEER: {}",
2517                        record.get_type(),
2518                        record.rdata_print(),
2519                        answer.rdata_print()
2520                    );
2521
2522                    // create a new name for this record
2523                    // then remove the old record in probing.
2524                    let mut new_record = record.clone();
2525                    let new_name = match record.get_type() {
2526                        RRType::A => hostname_change(name),
2527                        RRType::AAAA => hostname_change(name),
2528                        _ => name_change(name),
2529                    };
2530                    new_record.get_record_mut().set_new_name(new_name);
2531                    new_records.push(new_record);
2532                    return false; // old record is dropped from the probe.
2533                }
2534
2535                true
2536            });
2537
2538            // ?????
2539            // if probe.records.is_empty() {
2540            //     dns_registry.probing.remove(name);
2541            // }
2542
2543            // Probing again with the new names.
2544            let create_time = current_time_millis() + fastrand::u64(0..250);
2545
2546            let waiting_services = probe.waiting_services.clone();
2547
2548            for record in new_records {
2549                if dns_registry.update_hostname(name, record.get_name(), create_time) {
2550                    self.timers.push(Reverse(create_time));
2551                }
2552
2553                // remember the name changes (note: `name` might not be the original, it could be already changed once.)
2554                dns_registry.name_changes.insert(
2555                    record.get_record().get_original_name().to_string(),
2556                    record.get_name().to_string(),
2557                );
2558
2559                let new_probe = match dns_registry.probing.get_mut(record.get_name()) {
2560                    Some(p) => p,
2561                    None => {
2562                        let new_probe = dns_registry
2563                            .probing
2564                            .entry(record.get_name().to_string())
2565                            .or_insert_with(|| {
2566                                debug!("conflict handler: new probe of {}", record.get_name());
2567                                Probe::new(create_time)
2568                            });
2569                        self.timers.push(Reverse(new_probe.next_send));
2570                        new_probe
2571                    }
2572                };
2573
2574                debug!(
2575                    "insert record with new name '{}' {} into probe",
2576                    record.get_name(),
2577                    record.get_type()
2578                );
2579                new_probe.insert_record(record);
2580
2581                new_probe.waiting_services.extend(waiting_services.clone());
2582            }
2583        }
2584    }
2585
2586    /// Resolve the updated (including new) instances.
2587    ///
2588    /// Note: it is possible that more than 1 PTR pointing to the same
2589    /// instance. For example, a regular service type PTR and a sub-type
2590    /// service type PTR can both point to the same service instance.
2591    /// This loop automatically handles the sub-type PTRs.
2592    fn resolve_updated_instances(&mut self, updated_instances: &HashSet<String>) {
2593        let mut resolved: HashSet<String> = HashSet::new();
2594        let mut unresolved: HashSet<String> = HashSet::new();
2595        let mut removed_instances = HashMap::new();
2596
2597        let now = current_time_millis();
2598
2599        for (ty_domain, records) in self.cache.all_ptr().iter() {
2600            if !self.service_queriers.contains_key(ty_domain) {
2601                // No need to resolve if not in our queries.
2602                continue;
2603            }
2604
2605            for ptr in records.iter().filter(|r| !r.record.expires_soon(now)) {
2606                let Some(dns_ptr) = ptr.record.any().downcast_ref::<DnsPointer>() else {
2607                    continue;
2608                };
2609
2610                let instance = dns_ptr.alias();
2611                if !updated_instances.contains(instance) {
2612                    continue;
2613                }
2614
2615                let Ok(resolved_service) = self.resolve_service_from_cache(ty_domain, instance)
2616                else {
2617                    continue;
2618                };
2619
2620                debug!("resolve_updated_instances: from cache: {instance}");
2621                if resolved_service.is_valid() {
2622                    debug!("call queriers to resolve {instance}");
2623                    resolved.insert(instance.to_string());
2624                    let event = ServiceEvent::ServiceResolved(Box::new(resolved_service));
2625                    call_service_listener(&self.service_queriers, ty_domain, event);
2626                } else {
2627                    debug!("Resolved service is not valid: {instance}");
2628                    if self.resolved.remove(dns_ptr.alias()) {
2629                        removed_instances
2630                            .entry(ty_domain.to_string())
2631                            .or_insert_with(HashSet::new)
2632                            .insert(instance.to_string());
2633                    }
2634                    unresolved.insert(instance.to_string());
2635                }
2636            }
2637        }
2638
2639        for instance in resolved.drain() {
2640            self.pending_resolves.remove(&instance);
2641            self.resolved.insert(instance);
2642        }
2643
2644        for instance in unresolved.drain() {
2645            self.add_pending_resolve(instance);
2646        }
2647
2648        if !removed_instances.is_empty() {
2649            debug!(
2650                "resolve_updated_instances: removed {}",
2651                &removed_instances.len()
2652            );
2653            self.notify_service_removal(removed_instances);
2654        }
2655    }
2656
2657    /// Handle incoming query packets, figure out whether and what to respond.
2658    fn handle_query(&mut self, msg: DnsIncoming, if_index: u32, is_ipv4: bool) {
2659        let sock_opt = if is_ipv4 {
2660            &self.ipv4_sock
2661        } else {
2662            &self.ipv6_sock
2663        };
2664        let Some(sock) = sock_opt.as_ref() else {
2665            debug!("handle_query: socket not available for intf {}", if_index);
2666            return;
2667        };
2668        let mut out = DnsOutgoing::new(FLAGS_QR_RESPONSE | FLAGS_AA);
2669
2670        // Special meta-query "_services._dns-sd._udp.<Domain>".
2671        // See https://datatracker.ietf.org/doc/html/rfc6763#section-9
2672        const META_QUERY: &str = "_services._dns-sd._udp.local.";
2673
2674        let Some(dns_registry) = self.dns_registry_map.get_mut(&if_index) else {
2675            debug!("missing dns registry for intf {}", if_index);
2676            return;
2677        };
2678
2679        let Some(intf) = self.my_intfs.get(&if_index) else {
2680            debug!("handle_query: no intf found for index {if_index}");
2681            return;
2682        };
2683
2684        for question in msg.questions().iter() {
2685            let qtype = question.entry_type();
2686
2687            if qtype == RRType::PTR {
2688                for service in self.my_services.values() {
2689                    if service.get_status(if_index) != ServiceStatus::Announced {
2690                        continue;
2691                    }
2692
2693                    if question.entry_name() == service.get_type()
2694                        || service
2695                            .get_subtype()
2696                            .as_ref()
2697                            .is_some_and(|v| v == question.entry_name())
2698                    {
2699                        add_answer_with_additionals(
2700                            &mut out,
2701                            &msg,
2702                            service,
2703                            intf,
2704                            dns_registry,
2705                            is_ipv4,
2706                        );
2707                    } else if question.entry_name() == META_QUERY {
2708                        let ptr_added = out.add_answer(
2709                            &msg,
2710                            DnsPointer::new(
2711                                question.entry_name(),
2712                                RRType::PTR,
2713                                CLASS_IN,
2714                                service.get_other_ttl(),
2715                                service.get_type().to_string(),
2716                            ),
2717                        );
2718                        if !ptr_added {
2719                            trace!("answer was not added for meta-query {:?}", &question);
2720                        }
2721                    }
2722                }
2723            } else {
2724                // Simultaneous Probe Tiebreaking (RFC 6762 section 8.2)
2725                if qtype == RRType::ANY && msg.num_authorities() > 0 {
2726                    let probe_name = question.entry_name();
2727
2728                    if let Some(probe) = dns_registry.probing.get_mut(probe_name) {
2729                        let now = current_time_millis();
2730
2731                        // Only do tiebreaking if probe already started.
2732                        // This check also helps avoid redo tiebreaking if start time
2733                        // was postponed.
2734                        if probe.start_time < now {
2735                            let incoming_records: Vec<_> = msg
2736                                .authorities()
2737                                .iter()
2738                                .filter(|r| r.get_name() == probe_name)
2739                                .collect();
2740
2741                            probe.tiebreaking(&incoming_records, now, probe_name);
2742                        }
2743                    }
2744                }
2745
2746                if qtype == RRType::A || qtype == RRType::AAAA || qtype == RRType::ANY {
2747                    for service in self.my_services.values() {
2748                        if service.get_status(if_index) != ServiceStatus::Announced {
2749                            continue;
2750                        }
2751
2752                        let service_hostname =
2753                            match dns_registry.name_changes.get(service.get_hostname()) {
2754                                Some(new_name) => new_name,
2755                                None => service.get_hostname(),
2756                            };
2757
2758                        if service_hostname.to_lowercase() == question.entry_name().to_lowercase() {
2759                            let intf_addrs = if is_ipv4 {
2760                                service.get_addrs_on_my_intf_v4(intf)
2761                            } else {
2762                                service.get_addrs_on_my_intf_v6(intf)
2763                            };
2764                            if intf_addrs.is_empty()
2765                                && (qtype == RRType::A || qtype == RRType::AAAA)
2766                            {
2767                                let t = match qtype {
2768                                    RRType::A => "TYPE_A",
2769                                    RRType::AAAA => "TYPE_AAAA",
2770                                    _ => "invalid_type",
2771                                };
2772                                trace!(
2773                                    "Cannot find valid addrs for {} response on intf {:?}",
2774                                    t,
2775                                    &intf
2776                                );
2777                                return;
2778                            }
2779                            for address in intf_addrs {
2780                                out.add_answer(
2781                                    &msg,
2782                                    DnsAddress::new(
2783                                        service_hostname,
2784                                        ip_address_rr_type(&address),
2785                                        CLASS_IN | CLASS_CACHE_FLUSH,
2786                                        service.get_host_ttl(),
2787                                        address,
2788                                        intf.into(),
2789                                    ),
2790                                );
2791                            }
2792                        }
2793                    }
2794                }
2795
2796                let query_name = question.entry_name().to_lowercase();
2797                let service_opt = self
2798                    .my_services
2799                    .iter()
2800                    .find(|(k, _v)| {
2801                        let service_name = match dns_registry.name_changes.get(k.as_str()) {
2802                            Some(new_name) => new_name,
2803                            None => k,
2804                        };
2805                        service_name == &query_name
2806                    })
2807                    .map(|(_, v)| v);
2808
2809                let Some(service) = service_opt else {
2810                    continue;
2811                };
2812
2813                if service.get_status(if_index) != ServiceStatus::Announced {
2814                    continue;
2815                }
2816
2817                let intf_addrs = if is_ipv4 {
2818                    service.get_addrs_on_my_intf_v4(intf)
2819                } else {
2820                    service.get_addrs_on_my_intf_v6(intf)
2821                };
2822                if intf_addrs.is_empty() {
2823                    debug!(
2824                        "Cannot find valid addrs for TYPE_SRV response on intf {:?}",
2825                        &intf
2826                    );
2827                    continue;
2828                }
2829
2830                add_answer_of_service(
2831                    &mut out,
2832                    &msg,
2833                    question.entry_name(),
2834                    service,
2835                    qtype,
2836                    intf_addrs,
2837                );
2838            }
2839        }
2840
2841        if out.answers_count() > 0 {
2842            debug!("sending response on intf {}", &intf.name);
2843            out.set_id(msg.id());
2844            send_dns_outgoing(&out, intf, &sock.pktinfo);
2845
2846            let if_name = intf.name.clone();
2847
2848            self.increase_counter(Counter::Respond, 1);
2849            self.notify_monitors(DaemonEvent::Respond(if_name));
2850        }
2851
2852        self.increase_counter(Counter::KnownAnswerSuppression, out.known_answer_count());
2853    }
2854
2855    /// Increases the value of `counter` by `count`.
2856    fn increase_counter(&mut self, counter: Counter, count: i64) {
2857        let key = counter.to_string();
2858        match self.counters.get_mut(&key) {
2859            Some(v) => *v += count,
2860            None => {
2861                self.counters.insert(key, count);
2862            }
2863        }
2864    }
2865
2866    /// Sets the value of `counter` to `count`.
2867    fn set_counter(&mut self, counter: Counter, count: i64) {
2868        let key = counter.to_string();
2869        self.counters.insert(key, count);
2870    }
2871
2872    fn signal_sock_drain(&self) {
2873        let mut signal_buf = [0; 1024];
2874
2875        // This recv is non-blocking as the socket is non-blocking.
2876        while let Ok(sz) = self.signal_sock.recv(&mut signal_buf) {
2877            trace!(
2878                "signal socket recvd: {}",
2879                String::from_utf8_lossy(&signal_buf[0..sz])
2880            );
2881        }
2882    }
2883
2884    fn add_retransmission(&mut self, next_time: u64, command: Command) {
2885        self.retransmissions.push(ReRun { next_time, command });
2886        self.add_timer(next_time);
2887    }
2888
2889    /// Sends service removal event to listeners for expired service records.
2890    /// `expired`: map of service type domain to set of instance names.
2891    fn notify_service_removal(&self, expired: HashMap<String, HashSet<String>>) {
2892        for (ty_domain, sender) in self.service_queriers.iter() {
2893            if let Some(instances) = expired.get(ty_domain) {
2894                for instance_name in instances {
2895                    let event = ServiceEvent::ServiceRemoved(
2896                        ty_domain.to_string(),
2897                        instance_name.to_string(),
2898                    );
2899                    match sender.send(event) {
2900                        Ok(()) => debug!("notify_service_removal: sent ServiceRemoved to listener of {ty_domain}: {instance_name}"),
2901                        Err(e) => debug!("Failed to send event: {}", e),
2902                    }
2903                }
2904            }
2905        }
2906    }
2907
2908    /// The entry point that executes all commands received by the daemon.
2909    ///
2910    /// `repeating`: whether this is a retransmission.
2911    fn exec_command(&mut self, command: Command, repeating: bool) {
2912        trace!("exec_command: {:?} repeating: {}", &command, repeating);
2913        match command {
2914            Command::Browse(ty, next_delay, cache_only, listener) => {
2915                self.exec_command_browse(repeating, ty, next_delay, cache_only, listener);
2916            }
2917
2918            Command::ResolveHostname(hostname, next_delay, listener, timeout) => {
2919                self.exec_command_resolve_hostname(
2920                    repeating, hostname, next_delay, listener, timeout,
2921                );
2922            }
2923
2924            Command::Register(service_info) => {
2925                self.register_service(*service_info);
2926                self.increase_counter(Counter::Register, 1);
2927            }
2928
2929            Command::RegisterResend(fullname, intf) => {
2930                trace!("register-resend service: {fullname} on {}", &intf);
2931                self.exec_command_register_resend(fullname, intf);
2932            }
2933
2934            Command::Unregister(fullname, resp_s) => {
2935                trace!("unregister service {} repeat {}", &fullname, &repeating);
2936                self.exec_command_unregister(repeating, fullname, resp_s);
2937            }
2938
2939            Command::UnregisterResend(packet, if_index, is_ipv4) => {
2940                self.exec_command_unregister_resend(packet, if_index, is_ipv4);
2941            }
2942
2943            Command::StopBrowse(ty_domain) => self.exec_command_stop_browse(ty_domain),
2944
2945            Command::StopResolveHostname(hostname) => {
2946                self.exec_command_stop_resolve_hostname(hostname.to_lowercase())
2947            }
2948
2949            Command::Resolve(instance, try_count) => self.exec_command_resolve(instance, try_count),
2950
2951            Command::GetMetrics(resp_s) => self.exec_command_get_metrics(resp_s),
2952
2953            Command::GetStatus(resp_s) => match resp_s.send(self.status.clone()) {
2954                Ok(()) => trace!("Sent status to the client"),
2955                Err(e) => debug!("Failed to send status: {}", e),
2956            },
2957
2958            Command::Monitor(resp_s) => {
2959                self.monitors.push(resp_s);
2960            }
2961
2962            Command::SetOption(daemon_opt) => {
2963                self.process_set_option(daemon_opt);
2964            }
2965
2966            Command::GetOption(resp_s) => {
2967                let val = DaemonOptionVal {
2968                    _service_name_len_max: self.service_name_len_max,
2969                    ip_check_interval: self.ip_check_interval,
2970                };
2971                if let Err(e) = resp_s.send(val) {
2972                    debug!("Failed to send options: {}", e);
2973                }
2974            }
2975
2976            Command::Verify(instance_fullname, timeout) => {
2977                self.exec_command_verify(instance_fullname, timeout, repeating);
2978            }
2979
2980            _ => {
2981                debug!("unexpected command: {:?}", &command);
2982            }
2983        }
2984    }
2985
2986    fn exec_command_get_metrics(&mut self, resp_s: Sender<HashMap<String, i64>>) {
2987        self.set_counter(Counter::CachedPTR, self.cache.ptr_count() as i64);
2988        self.set_counter(Counter::CachedSRV, self.cache.srv_count() as i64);
2989        self.set_counter(Counter::CachedAddr, self.cache.addr_count() as i64);
2990        self.set_counter(Counter::CachedTxt, self.cache.txt_count() as i64);
2991        self.set_counter(Counter::CachedNSec, self.cache.nsec_count() as i64);
2992        self.set_counter(Counter::CachedSubtype, self.cache.subtype_count() as i64);
2993        self.set_counter(Counter::Timer, self.timers.len() as i64);
2994
2995        let dns_registry_probe_count: usize = self
2996            .dns_registry_map
2997            .values()
2998            .map(|r| r.probing.len())
2999            .sum();
3000        self.set_counter(Counter::DnsRegistryProbe, dns_registry_probe_count as i64);
3001
3002        let dns_registry_active_count: usize = self
3003            .dns_registry_map
3004            .values()
3005            .map(|r| r.active.values().map(|a| a.len()).sum::<usize>())
3006            .sum();
3007        self.set_counter(Counter::DnsRegistryActive, dns_registry_active_count as i64);
3008
3009        let dns_registry_timer_count: usize = self
3010            .dns_registry_map
3011            .values()
3012            .map(|r| r.new_timers.len())
3013            .sum();
3014        self.set_counter(Counter::DnsRegistryTimer, dns_registry_timer_count as i64);
3015
3016        let dns_registry_name_change_count: usize = self
3017            .dns_registry_map
3018            .values()
3019            .map(|r| r.name_changes.len())
3020            .sum();
3021        self.set_counter(
3022            Counter::DnsRegistryNameChange,
3023            dns_registry_name_change_count as i64,
3024        );
3025
3026        // Send the metrics to the client.
3027        if let Err(e) = resp_s.send(self.counters.clone()) {
3028            debug!("Failed to send metrics: {}", e);
3029        }
3030    }
3031
3032    fn exec_command_browse(
3033        &mut self,
3034        repeating: bool,
3035        ty: String,
3036        next_delay: u32,
3037        cache_only: bool,
3038        listener: Sender<ServiceEvent>,
3039    ) {
3040        let pretty_addrs: Vec<String> = self
3041            .my_intfs
3042            .iter()
3043            .map(|(if_index, itf)| format!("{} ({if_index})", itf.name))
3044            .collect();
3045
3046        if let Err(e) = listener.send(ServiceEvent::SearchStarted(format!(
3047            "{ty} on {} interfaces [{}]",
3048            pretty_addrs.len(),
3049            pretty_addrs.join(", ")
3050        ))) {
3051            debug!(
3052                "Failed to send SearchStarted({})(repeating:{}): {}",
3053                &ty, repeating, e
3054            );
3055            return;
3056        }
3057
3058        let now = current_time_millis();
3059        if !repeating {
3060            // Binds a `listener` to querying mDNS domain type `ty`.
3061            //
3062            // If there is already a `listener`, it will be updated, i.e. overwritten.
3063            self.service_queriers.insert(ty.clone(), listener.clone());
3064
3065            // if we already have the records in our cache, just send them
3066            self.query_cache_for_service(&ty, &listener, now);
3067        }
3068
3069        if cache_only {
3070            // If cache_only is true, we do not send a query.
3071            match listener.send(ServiceEvent::SearchStopped(ty.clone())) {
3072                Ok(()) => debug!("SearchStopped sent for {}", &ty),
3073                Err(e) => debug!("Failed to send SearchStopped: {}", e),
3074            }
3075            return;
3076        }
3077
3078        self.send_query(&ty, RRType::PTR);
3079        self.increase_counter(Counter::Browse, 1);
3080
3081        let next_time = now + (next_delay * 1000) as u64;
3082        let max_delay = 60 * 60;
3083        let delay = cmp::min(next_delay * 2, max_delay);
3084        self.add_retransmission(next_time, Command::Browse(ty, delay, cache_only, listener));
3085    }
3086
3087    fn exec_command_resolve_hostname(
3088        &mut self,
3089        repeating: bool,
3090        hostname: String,
3091        next_delay: u32,
3092        listener: Sender<HostnameResolutionEvent>,
3093        timeout: Option<u64>,
3094    ) {
3095        let addr_list: Vec<_> = self.my_intfs.iter().collect();
3096        if let Err(e) = listener.send(HostnameResolutionEvent::SearchStarted(format!(
3097            "{} on addrs {:?}",
3098            &hostname, &addr_list
3099        ))) {
3100            debug!(
3101                "Failed to send ResolveStarted({})(repeating:{}): {}",
3102                &hostname, repeating, e
3103            );
3104            return;
3105        }
3106        if !repeating {
3107            self.add_hostname_resolver(hostname.to_owned(), listener.clone(), timeout);
3108            // if we already have the records in our cache, just send them
3109            self.query_cache_for_hostname(&hostname, listener.clone());
3110        }
3111
3112        self.send_query_vec(&[(&hostname, RRType::A), (&hostname, RRType::AAAA)]);
3113        self.increase_counter(Counter::ResolveHostname, 1);
3114
3115        let now = current_time_millis();
3116        let next_time = now + u64::from(next_delay) * 1000;
3117        let max_delay = 60 * 60;
3118        let delay = cmp::min(next_delay * 2, max_delay);
3119
3120        // Only add retransmission if it does not exceed the hostname resolver timeout, if any.
3121        if self
3122            .hostname_resolvers
3123            .get(&hostname)
3124            .and_then(|(_sender, timeout)| *timeout)
3125            .map(|timeout| next_time < timeout)
3126            .unwrap_or(true)
3127        {
3128            self.add_retransmission(
3129                next_time,
3130                Command::ResolveHostname(hostname, delay, listener, None),
3131            );
3132        }
3133    }
3134
3135    fn exec_command_resolve(&mut self, instance: String, try_count: u16) {
3136        let pending_query = self.query_unresolved(&instance);
3137        let max_try = 3;
3138        if pending_query && try_count < max_try {
3139            // Note that if the current try already succeeds, the next retransmission
3140            // will be no-op as the cache has been updated.
3141            let next_time = current_time_millis() + RESOLVE_WAIT_IN_MILLIS;
3142            self.add_retransmission(next_time, Command::Resolve(instance, try_count + 1));
3143        }
3144    }
3145
3146    fn exec_command_unregister(
3147        &mut self,
3148        repeating: bool,
3149        fullname: String,
3150        resp_s: Sender<UnregisterStatus>,
3151    ) {
3152        let response = match self.my_services.remove_entry(&fullname) {
3153            None => {
3154                debug!("unregister: cannot find such service {}", &fullname);
3155                UnregisterStatus::NotFound
3156            }
3157            Some((_k, info)) => {
3158                let mut timers = Vec::new();
3159
3160                for (if_index, intf) in self.my_intfs.iter() {
3161                    if let Some(sock) = self.ipv4_sock.as_ref() {
3162                        let packet = self.unregister_service(&info, intf, &sock.pktinfo);
3163                        // repeat for one time just in case some peers miss the message
3164                        if !repeating && !packet.is_empty() {
3165                            let next_time = current_time_millis() + 120;
3166                            self.retransmissions.push(ReRun {
3167                                next_time,
3168                                command: Command::UnregisterResend(packet, *if_index, true),
3169                            });
3170                            timers.push(next_time);
3171                        }
3172                    }
3173
3174                    // ipv6
3175                    if let Some(sock) = self.ipv6_sock.as_ref() {
3176                        let packet = self.unregister_service(&info, intf, &sock.pktinfo);
3177                        if !repeating && !packet.is_empty() {
3178                            let next_time = current_time_millis() + 120;
3179                            self.retransmissions.push(ReRun {
3180                                next_time,
3181                                command: Command::UnregisterResend(packet, *if_index, false),
3182                            });
3183                            timers.push(next_time);
3184                        }
3185                    }
3186                }
3187
3188                for t in timers {
3189                    self.add_timer(t);
3190                }
3191
3192                self.increase_counter(Counter::Unregister, 1);
3193                UnregisterStatus::OK
3194            }
3195        };
3196        if let Err(e) = resp_s.send(response) {
3197            debug!("unregister: failed to send response: {}", e);
3198        }
3199    }
3200
3201    fn exec_command_unregister_resend(&mut self, packet: Vec<u8>, if_index: u32, is_ipv4: bool) {
3202        let Some(intf) = self.my_intfs.get(&if_index) else {
3203            return;
3204        };
3205        let sock_opt = if is_ipv4 {
3206            &self.ipv4_sock
3207        } else {
3208            &self.ipv6_sock
3209        };
3210        let Some(sock) = sock_opt else {
3211            return;
3212        };
3213
3214        let if_addr = if is_ipv4 {
3215            match intf.next_ifaddr_v4() {
3216                Some(addr) => addr,
3217                None => return,
3218            }
3219        } else {
3220            match intf.next_ifaddr_v6() {
3221                Some(addr) => addr,
3222                None => return,
3223            }
3224        };
3225
3226        debug!("UnregisterResend from {:?}", if_addr);
3227        multicast_on_intf(&packet[..], &intf.name, intf.index, if_addr, &sock.pktinfo);
3228
3229        self.increase_counter(Counter::UnregisterResend, 1);
3230    }
3231
3232    fn exec_command_stop_browse(&mut self, ty_domain: String) {
3233        match self.service_queriers.remove_entry(&ty_domain) {
3234            None => debug!("StopBrowse: cannot find querier for {}", &ty_domain),
3235            Some((ty, sender)) => {
3236                // Remove pending browse commands in the reruns.
3237                trace!("StopBrowse: removed queryer for {}", &ty);
3238                let mut i = 0;
3239                while i < self.retransmissions.len() {
3240                    if let Command::Browse(t, _, _, _) = &self.retransmissions[i].command {
3241                        if t == &ty {
3242                            self.retransmissions.remove(i);
3243                            trace!("StopBrowse: removed retransmission for {}", &ty);
3244                            continue;
3245                        }
3246                    }
3247                    i += 1;
3248                }
3249
3250                // Remove cache entries.
3251                self.cache.remove_service_type(&ty_domain);
3252
3253                // Notify the client.
3254                match sender.send(ServiceEvent::SearchStopped(ty_domain)) {
3255                    Ok(()) => trace!("Sent SearchStopped to the listener"),
3256                    Err(e) => debug!("Failed to send SearchStopped: {}", e),
3257                }
3258            }
3259        }
3260    }
3261
3262    fn exec_command_stop_resolve_hostname(&mut self, hostname: String) {
3263        if let Some((host, (sender, _timeout))) = self.hostname_resolvers.remove_entry(&hostname) {
3264            // Remove pending resolve commands in the reruns.
3265            trace!("StopResolve: removed queryer for {}", &host);
3266            let mut i = 0;
3267            while i < self.retransmissions.len() {
3268                if let Command::Resolve(t, _) = &self.retransmissions[i].command {
3269                    if t == &host {
3270                        self.retransmissions.remove(i);
3271                        trace!("StopResolve: removed retransmission for {}", &host);
3272                        continue;
3273                    }
3274                }
3275                i += 1;
3276            }
3277
3278            // Notify the client.
3279            match sender.send(HostnameResolutionEvent::SearchStopped(hostname)) {
3280                Ok(()) => trace!("Sent SearchStopped to the listener"),
3281                Err(e) => debug!("Failed to send SearchStopped: {}", e),
3282            }
3283        }
3284    }
3285
3286    fn exec_command_register_resend(&mut self, fullname: String, if_index: u32) {
3287        let Some(info) = self.my_services.get_mut(&fullname) else {
3288            trace!("announce: cannot find such service {}", &fullname);
3289            return;
3290        };
3291
3292        let Some(dns_registry) = self.dns_registry_map.get_mut(&if_index) else {
3293            return;
3294        };
3295
3296        let Some(intf) = self.my_intfs.get(&if_index) else {
3297            return;
3298        };
3299
3300        let announced_v4 = if let Some(sock) = self.ipv4_sock.as_ref() {
3301            announce_service_on_intf(dns_registry, info, intf, &sock.pktinfo)
3302        } else {
3303            false
3304        };
3305        let announced_v6 = if let Some(sock) = self.ipv6_sock.as_ref() {
3306            announce_service_on_intf(dns_registry, info, intf, &sock.pktinfo)
3307        } else {
3308            false
3309        };
3310
3311        if announced_v4 || announced_v6 {
3312            let mut hostname = info.get_hostname();
3313            if let Some(new_name) = dns_registry.name_changes.get(hostname) {
3314                hostname = new_name;
3315            }
3316            let service_name = match dns_registry.name_changes.get(&fullname) {
3317                Some(new_name) => new_name.to_string(),
3318                None => fullname,
3319            };
3320
3321            debug!("resend: announce service {service_name} on {}", intf.name);
3322
3323            notify_monitors(
3324                &mut self.monitors,
3325                DaemonEvent::Announce(service_name, format!("{}:{}", hostname, &intf.name)),
3326            );
3327            info.set_status(if_index, ServiceStatus::Announced);
3328        } else {
3329            debug!("register-resend should not fail");
3330        }
3331
3332        self.increase_counter(Counter::RegisterResend, 1);
3333    }
3334
3335    fn exec_command_verify(&mut self, instance: String, timeout: Duration, repeating: bool) {
3336        /*
3337        RFC 6762 section 10.4:
3338        ...
3339        When the cache receives this hint that it should reconfirm some
3340        record, it MUST issue two or more queries for the resource record in
3341        dispute.  If no response is received within ten seconds, then, even
3342        though its TTL may indicate that it is not yet due to expire, that
3343        record SHOULD be promptly flushed from the cache.
3344        */
3345        let now = current_time_millis();
3346        let expire_at = if repeating {
3347            None
3348        } else {
3349            Some(now + timeout.as_millis() as u64)
3350        };
3351
3352        // send query for the resource records.
3353        let record_vec = self.cache.service_verify_queries(&instance, expire_at);
3354
3355        if !record_vec.is_empty() {
3356            let query_vec: Vec<(&str, RRType)> = record_vec
3357                .iter()
3358                .map(|(record, rr_type)| (record.as_str(), *rr_type))
3359                .collect();
3360            self.send_query_vec(&query_vec);
3361
3362            if let Some(new_expire) = expire_at {
3363                self.add_timer(new_expire); // ensure a check for the new expire time.
3364
3365                // schedule a resend 1 second later
3366                self.add_retransmission(now + 1000, Command::Verify(instance, timeout));
3367            }
3368        }
3369    }
3370
3371    /// Refresh cached service records with active queriers
3372    fn refresh_active_services(&mut self) {
3373        let mut query_ptr_count = 0;
3374        let mut query_srv_count = 0;
3375        let mut new_timers = HashSet::new();
3376        let mut query_addr_count = 0;
3377
3378        for (ty_domain, _sender) in self.service_queriers.iter() {
3379            let refreshed_timers = self.cache.refresh_due_ptr(ty_domain);
3380            if !refreshed_timers.is_empty() {
3381                trace!("sending refresh query for PTR: {}", ty_domain);
3382                self.send_query(ty_domain, RRType::PTR);
3383                query_ptr_count += 1;
3384                new_timers.extend(refreshed_timers);
3385            }
3386
3387            let (instances, timers) = self.cache.refresh_due_srv_txt(ty_domain);
3388            for (instance, types) in instances {
3389                trace!("sending refresh query for: {}", &instance);
3390                let query_vec = types
3391                    .into_iter()
3392                    .map(|ty| (instance.as_str(), ty))
3393                    .collect::<Vec<_>>();
3394                self.send_query_vec(&query_vec);
3395                query_srv_count += 1;
3396            }
3397            new_timers.extend(timers);
3398            let (hostnames, timers) = self.cache.refresh_due_hosts(ty_domain);
3399            for hostname in hostnames.iter() {
3400                trace!("sending refresh queries for A and AAAA:  {}", hostname);
3401                self.send_query_vec(&[(hostname, RRType::A), (hostname, RRType::AAAA)]);
3402                query_addr_count += 2;
3403            }
3404            new_timers.extend(timers);
3405        }
3406
3407        for timer in new_timers {
3408            self.add_timer(timer);
3409        }
3410
3411        self.increase_counter(Counter::CacheRefreshPTR, query_ptr_count);
3412        self.increase_counter(Counter::CacheRefreshSrvTxt, query_srv_count);
3413        self.increase_counter(Counter::CacheRefreshAddr, query_addr_count);
3414    }
3415}
3416
3417/// Adds one or more answers of a service for incoming msg and RR entry name.
3418fn add_answer_of_service(
3419    out: &mut DnsOutgoing,
3420    msg: &DnsIncoming,
3421    entry_name: &str,
3422    service: &ServiceInfo,
3423    qtype: RRType,
3424    intf_addrs: Vec<IpAddr>,
3425) {
3426    if qtype == RRType::SRV || qtype == RRType::ANY {
3427        out.add_answer(
3428            msg,
3429            DnsSrv::new(
3430                entry_name,
3431                CLASS_IN | CLASS_CACHE_FLUSH,
3432                service.get_host_ttl(),
3433                service.get_priority(),
3434                service.get_weight(),
3435                service.get_port(),
3436                service.get_hostname().to_string(),
3437            ),
3438        );
3439    }
3440
3441    if qtype == RRType::TXT || qtype == RRType::ANY {
3442        out.add_answer(
3443            msg,
3444            DnsTxt::new(
3445                entry_name,
3446                CLASS_IN | CLASS_CACHE_FLUSH,
3447                service.get_other_ttl(),
3448                service.generate_txt(),
3449            ),
3450        );
3451    }
3452
3453    if qtype == RRType::SRV {
3454        for address in intf_addrs {
3455            out.add_additional_answer(DnsAddress::new(
3456                service.get_hostname(),
3457                ip_address_rr_type(&address),
3458                CLASS_IN | CLASS_CACHE_FLUSH,
3459                service.get_host_ttl(),
3460                address,
3461                InterfaceId::default(),
3462            ));
3463        }
3464    }
3465}
3466
3467/// All possible events sent to the client from the daemon
3468/// regarding service discovery.
3469#[derive(Clone, Debug)]
3470#[non_exhaustive]
3471pub enum ServiceEvent {
3472    /// Started searching for a service type.
3473    SearchStarted(String),
3474
3475    /// Found a specific (service_type, fullname).
3476    ServiceFound(String, String),
3477
3478    /// Resolved a service instance in a ResolvedService struct.
3479    ServiceResolved(Box<ResolvedService>),
3480
3481    /// A service instance (service_type, fullname) was removed.
3482    ServiceRemoved(String, String),
3483
3484    /// Stopped searching for a service type.
3485    SearchStopped(String),
3486}
3487
3488/// All possible events sent to the client from the daemon
3489/// regarding host resolution.
3490#[derive(Clone, Debug)]
3491#[non_exhaustive]
3492pub enum HostnameResolutionEvent {
3493    /// Started searching for the ip address of a hostname.
3494    SearchStarted(String),
3495    /// One or more addresses for a hostname has been found.
3496    AddressesFound(String, HashSet<ScopedIp>),
3497    /// One or more addresses for a hostname has been removed.
3498    AddressesRemoved(String, HashSet<ScopedIp>),
3499    /// The search for the ip address of a hostname has timed out.
3500    SearchTimeout(String),
3501    /// Stopped searching for the ip address of a hostname.
3502    SearchStopped(String),
3503}
3504
3505/// Some notable events from the daemon besides [`ServiceEvent`].
3506/// These events are expected to happen infrequently.
3507#[derive(Clone, Debug)]
3508#[non_exhaustive]
3509pub enum DaemonEvent {
3510    /// Daemon unsolicitly announced a service from an interface.
3511    Announce(String, String),
3512
3513    /// Daemon encountered an error.
3514    Error(Error),
3515
3516    /// Daemon detected a new IP address from the host.
3517    IpAdd(IpAddr),
3518
3519    /// Daemon detected a IP address removed from the host.
3520    IpDel(IpAddr),
3521
3522    /// Daemon resolved a name conflict by changing one of its names.
3523    /// see [DnsNameChange] for more details.
3524    NameChange(DnsNameChange),
3525
3526    /// Send out a multicast response via an interface.
3527    Respond(String),
3528}
3529
3530/// Represents a name change due to a name conflict resolution.
3531/// See [RFC 6762 section 9](https://datatracker.ietf.org/doc/html/rfc6762#section-9)
3532#[derive(Clone, Debug)]
3533pub struct DnsNameChange {
3534    /// The original name set in `ServiceInfo` by the user.
3535    pub original: String,
3536
3537    /// A new name is created by appending a suffix after the original name.
3538    ///
3539    /// - for a service instance name, the suffix is `(N)`, where N starts at 2.
3540    /// - for a host name, the suffix is `-N`, where N starts at 2.
3541    ///
3542    /// For example:
3543    ///
3544    /// - Service name `foo._service-type._udp` becomes `foo (2)._service-type._udp`
3545    /// - Host name `foo.local.` becomes `foo-2.local.`
3546    pub new_name: String,
3547
3548    /// The resource record type
3549    pub rr_type: RRType,
3550
3551    /// The interface where the name conflict and its change happened.
3552    pub intf_name: String,
3553}
3554
3555/// Commands supported by the daemon
3556#[derive(Debug)]
3557enum Command {
3558    /// Browsing for a service type (ty_domain, next_time_delay_in_seconds, channel::sender)
3559    Browse(String, u32, bool, Sender<ServiceEvent>),
3560
3561    /// Resolve a hostname to IP addresses.
3562    ResolveHostname(String, u32, Sender<HostnameResolutionEvent>, Option<u64>), // (hostname, next_time_delay_in_seconds, sender, timeout_in_milliseconds)
3563
3564    /// Register a service
3565    Register(Box<ServiceInfo>),
3566
3567    /// Unregister a service
3568    Unregister(String, Sender<UnregisterStatus>), // (fullname)
3569
3570    /// Announce again a service to local network
3571    RegisterResend(String, u32), // (fullname)
3572
3573    /// Resend unregister packet.
3574    UnregisterResend(Vec<u8>, u32, bool), // (packet content, if_index, is_ipv4)
3575
3576    /// Stop browsing a service type
3577    StopBrowse(String), // (ty_domain)
3578
3579    /// Stop resolving a hostname
3580    StopResolveHostname(String), // (hostname)
3581
3582    /// Send query to resolve a service instance.
3583    /// This is used when a PTR record exists but SRV & TXT records are missing.
3584    Resolve(String, u16), // (service_instance_fullname, try_count)
3585
3586    /// Read the current values of the counters
3587    GetMetrics(Sender<Metrics>),
3588
3589    /// Get the current status of the daemon.
3590    GetStatus(Sender<DaemonStatus>),
3591
3592    /// Monitor noticeable events in the daemon.
3593    Monitor(Sender<DaemonEvent>),
3594
3595    SetOption(DaemonOption),
3596
3597    GetOption(Sender<DaemonOptionVal>),
3598
3599    /// Proactively confirm a DNS resource record.
3600    ///
3601    /// The intention is to check if a service name or IP address still valid
3602    /// before its TTL expires.
3603    Verify(String, Duration),
3604
3605    Exit(Sender<DaemonStatus>),
3606}
3607
3608impl fmt::Display for Command {
3609    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
3610        match self {
3611            Self::Browse(_, _, _, _) => write!(f, "Command Browse"),
3612            Self::ResolveHostname(_, _, _, _) => write!(f, "Command ResolveHostname"),
3613            Self::Exit(_) => write!(f, "Command Exit"),
3614            Self::GetStatus(_) => write!(f, "Command GetStatus"),
3615            Self::GetMetrics(_) => write!(f, "Command GetMetrics"),
3616            Self::Monitor(_) => write!(f, "Command Monitor"),
3617            Self::Register(_) => write!(f, "Command Register"),
3618            Self::RegisterResend(_, _) => write!(f, "Command RegisterResend"),
3619            Self::SetOption(_) => write!(f, "Command SetOption"),
3620            Self::GetOption(_) => write!(f, "Command GetOption"),
3621            Self::StopBrowse(_) => write!(f, "Command StopBrowse"),
3622            Self::StopResolveHostname(_) => write!(f, "Command StopResolveHostname"),
3623            Self::Unregister(_, _) => write!(f, "Command Unregister"),
3624            Self::UnregisterResend(_, _, _) => write!(f, "Command UnregisterResend"),
3625            Self::Resolve(_, _) => write!(f, "Command Resolve"),
3626            Self::Verify(_, _) => write!(f, "Command VerifyResource"),
3627        }
3628    }
3629}
3630
3631struct DaemonOptionVal {
3632    _service_name_len_max: u8,
3633    ip_check_interval: u64,
3634}
3635
3636#[derive(Debug)]
3637enum DaemonOption {
3638    ServiceNameLenMax(u8),
3639    IpCheckInterval(u64),
3640    EnableInterface(Vec<IfKind>),
3641    DisableInterface(Vec<IfKind>),
3642    MulticastLoopV4(bool),
3643    MulticastLoopV6(bool),
3644    AcceptUnsolicited(bool),
3645    #[cfg(test)]
3646    TestDownInterface(String),
3647    #[cfg(test)]
3648    TestUpInterface(String),
3649}
3650
3651/// The length of Service Domain name supported in this lib.
3652const DOMAIN_LEN: usize = "._tcp.local.".len();
3653
3654/// Validate the length of "service_name" in a "_<service_name>.<domain_name>." string.
3655fn check_service_name_length(ty_domain: &str, limit: u8) -> Result<()> {
3656    if ty_domain.len() <= DOMAIN_LEN + 1 {
3657        // service name cannot be empty or only '_'.
3658        return Err(e_fmt!("Service type name cannot be empty: {}", ty_domain));
3659    }
3660
3661    let service_name_len = ty_domain.len() - DOMAIN_LEN - 1; // exclude the leading `_`
3662    if service_name_len > limit as usize {
3663        return Err(e_fmt!("Service name length must be <= {} bytes", limit));
3664    }
3665    Ok(())
3666}
3667
3668/// Checks if `name` ends with a valid domain: '._tcp.local.' or '._udp.local.'
3669fn check_domain_suffix(name: &str) -> Result<()> {
3670    if !(name.ends_with("._tcp.local.") || name.ends_with("._udp.local.")) {
3671        return Err(e_fmt!(
3672            "mDNS service {} must end with '._tcp.local.' or '._udp.local.'",
3673            name
3674        ));
3675    }
3676
3677    Ok(())
3678}
3679
3680/// Validate the service name in a fully qualified name.
3681///
3682/// A Full Name = <Instance>.<Service>.<Domain>
3683/// The only `<Domain>` supported are "._tcp.local." and "._udp.local.".
3684///
3685/// Note: this function does not check for the length of the service name.
3686/// Instead, `register_service` method will check the length.
3687fn check_service_name(fullname: &str) -> Result<()> {
3688    check_domain_suffix(fullname)?;
3689
3690    let remaining: Vec<&str> = fullname[..fullname.len() - DOMAIN_LEN].split('.').collect();
3691    let name = remaining.last().ok_or_else(|| e_fmt!("No service name"))?;
3692
3693    if &name[0..1] != "_" {
3694        return Err(e_fmt!("Service name must start with '_'"));
3695    }
3696
3697    let name = &name[1..];
3698
3699    if name.contains("--") {
3700        return Err(e_fmt!("Service name must not contain '--'"));
3701    }
3702
3703    if name.starts_with('-') || name.ends_with('-') {
3704        return Err(e_fmt!("Service name (%s) may not start or end with '-'"));
3705    }
3706
3707    let ascii_count = name.chars().filter(|c| c.is_ascii_alphabetic()).count();
3708    if ascii_count < 1 {
3709        return Err(e_fmt!(
3710            "Service name must contain at least one letter (eg: 'A-Za-z')"
3711        ));
3712    }
3713
3714    Ok(())
3715}
3716
3717/// Validate a hostname.
3718fn check_hostname(hostname: &str) -> Result<()> {
3719    if !hostname.ends_with(".local.") {
3720        return Err(e_fmt!("Hostname must end with '.local.': {hostname}"));
3721    }
3722
3723    if hostname == ".local." {
3724        return Err(e_fmt!(
3725            "The part of the hostname before '.local.' cannot be empty"
3726        ));
3727    }
3728
3729    if hostname.len() > 255 {
3730        return Err(e_fmt!("Hostname length must be <= 255 bytes"));
3731    }
3732
3733    Ok(())
3734}
3735
3736fn call_service_listener(
3737    listeners_map: &HashMap<String, Sender<ServiceEvent>>,
3738    ty_domain: &str,
3739    event: ServiceEvent,
3740) {
3741    if let Some(listener) = listeners_map.get(ty_domain) {
3742        match listener.send(event) {
3743            Ok(()) => trace!("Sent event to listener successfully"),
3744            Err(e) => debug!("Failed to send event: {}", e),
3745        }
3746    }
3747}
3748
3749fn call_hostname_resolution_listener(
3750    listeners_map: &HashMap<String, (Sender<HostnameResolutionEvent>, Option<u64>)>,
3751    hostname: &str,
3752    event: HostnameResolutionEvent,
3753) {
3754    let hostname_lower = hostname.to_lowercase();
3755    if let Some(listener) = listeners_map.get(&hostname_lower).map(|(l, _)| l) {
3756        match listener.send(event) {
3757            Ok(()) => trace!("Sent event to listener successfully"),
3758            Err(e) => debug!("Failed to send event: {}", e),
3759        }
3760    }
3761}
3762
3763/// Returns valid network interfaces in the host system.
3764/// Operational down interfaces are excluded.
3765/// Loopback interfaces are excluded if `with_loopback` is false.
3766fn my_ip_interfaces(with_loopback: bool) -> Vec<Interface> {
3767    if_addrs::get_if_addrs()
3768        .unwrap_or_default()
3769        .into_iter()
3770        .filter(|i| i.is_oper_up() && (!i.is_loopback() || with_loopback))
3771        .collect()
3772}
3773
3774fn send_dns_outgoing(out: &DnsOutgoing, my_intf: &MyIntf, sock: &PktInfoUdpSocket) -> Vec<Vec<u8>> {
3775    let if_name = &my_intf.name;
3776
3777    let if_addr = if sock.domain() == Domain::IPV4 {
3778        match my_intf.next_ifaddr_v4() {
3779            Some(addr) => addr,
3780            None => return vec![],
3781        }
3782    } else {
3783        match my_intf.next_ifaddr_v6() {
3784            Some(addr) => addr,
3785            None => return vec![],
3786        }
3787    };
3788
3789    send_dns_outgoing_impl(out, if_name, my_intf.index, if_addr, sock)
3790}
3791
3792/// Send an outgoing mDNS query or response, and returns the packet bytes.
3793fn send_dns_outgoing_impl(
3794    out: &DnsOutgoing,
3795    if_name: &str,
3796    if_index: u32,
3797    if_addr: &IfAddr,
3798    sock: &PktInfoUdpSocket,
3799) -> Vec<Vec<u8>> {
3800    let qtype = if out.is_query() {
3801        "query"
3802    } else {
3803        if out.answers_count() == 0 && out.additionals().is_empty() {
3804            return vec![]; // no need to send empty response
3805        }
3806        "response"
3807    };
3808    trace!(
3809        "send {}: {} questions {} answers {} authorities {} additional",
3810        qtype,
3811        out.questions().len(),
3812        out.answers_count(),
3813        out.authorities().len(),
3814        out.additionals().len()
3815    );
3816
3817    match if_addr.ip() {
3818        IpAddr::V4(ipv4) => {
3819            if let Err(e) = sock.set_multicast_if_v4(&ipv4) {
3820                debug!(
3821                    "send_dns_outgoing: failed to set multicast interface for IPv4 {}: {}",
3822                    ipv4, e
3823                );
3824                return vec![]; // cannot send without a valid interface
3825            }
3826        }
3827        IpAddr::V6(ipv6) => {
3828            if let Err(e) = sock.set_multicast_if_v6(if_index) {
3829                debug!(
3830                    "send_dns_outgoing: failed to set multicast interface for IPv6 {}: {}",
3831                    ipv6, e
3832                );
3833                return vec![]; // cannot send without a valid interface
3834            }
3835        }
3836    }
3837
3838    let packet_list = out.to_data_on_wire();
3839    for packet in packet_list.iter() {
3840        multicast_on_intf(packet, if_name, if_index, if_addr, sock);
3841    }
3842    packet_list
3843}
3844
3845/// Sends a multicast packet, and returns the packet bytes.
3846fn multicast_on_intf(
3847    packet: &[u8],
3848    if_name: &str,
3849    if_index: u32,
3850    if_addr: &IfAddr,
3851    socket: &PktInfoUdpSocket,
3852) {
3853    if packet.len() > MAX_MSG_ABSOLUTE {
3854        debug!("Drop over-sized packet ({})", packet.len());
3855        return;
3856    }
3857
3858    let addr: SocketAddr = match if_addr {
3859        if_addrs::IfAddr::V4(_) => SocketAddrV4::new(GROUP_ADDR_V4, MDNS_PORT).into(),
3860        if_addrs::IfAddr::V6(_) => {
3861            let mut sock = SocketAddrV6::new(GROUP_ADDR_V6, MDNS_PORT, 0, 0);
3862            sock.set_scope_id(if_index); // Choose iface for multicast
3863            sock.into()
3864        }
3865    };
3866
3867    // Sends out `packet` to `addr` on the socket.
3868    let sock_addr = addr.into();
3869    match socket.send_to(packet, &sock_addr) {
3870        Ok(sz) => trace!(
3871            "sent out {} bytes on interface {} (idx {}) addr {}",
3872            sz,
3873            if_name,
3874            if_index,
3875            if_addr.ip()
3876        ),
3877        Err(e) => trace!("Failed to send to {} via {:?}: {}", addr, &if_name, e),
3878    }
3879}
3880
3881/// Returns true if `name` is a valid instance name of format:
3882/// <instance>.<service_type>.<_udp|_tcp>.local.
3883/// Note: <instance> could contain '.' as well.
3884fn valid_instance_name(name: &str) -> bool {
3885    name.split('.').count() >= 5
3886}
3887
3888fn notify_monitors(monitors: &mut Vec<Sender<DaemonEvent>>, event: DaemonEvent) {
3889    monitors.retain(|sender| {
3890        if let Err(e) = sender.try_send(event.clone()) {
3891            debug!("notify_monitors: try_send: {}", &e);
3892            if matches!(e, TrySendError::Disconnected(_)) {
3893                return false; // This monitor is dropped.
3894            }
3895        }
3896        true
3897    });
3898}
3899
3900/// Check if all unique records passed "probing", and if yes, create a packet
3901/// to announce the service.
3902fn prepare_announce(
3903    info: &ServiceInfo,
3904    intf: &MyIntf,
3905    dns_registry: &mut DnsRegistry,
3906    is_ipv4: bool,
3907) -> Option<DnsOutgoing> {
3908    let intf_addrs = if is_ipv4 {
3909        info.get_addrs_on_my_intf_v4(intf)
3910    } else {
3911        info.get_addrs_on_my_intf_v6(intf)
3912    };
3913
3914    if intf_addrs.is_empty() {
3915        debug!(
3916            "prepare_announce (ipv4: {is_ipv4}): no valid addrs on interface {}",
3917            &intf.name
3918        );
3919        return None;
3920    }
3921
3922    // check if we changed our name due to conflicts.
3923    let service_fullname = match dns_registry.name_changes.get(info.get_fullname()) {
3924        Some(new_name) => new_name,
3925        None => info.get_fullname(),
3926    };
3927
3928    debug!(
3929        "prepare to announce service {service_fullname} on {:?}",
3930        &intf_addrs
3931    );
3932
3933    let mut probing_count = 0;
3934    let mut out = DnsOutgoing::new(FLAGS_QR_RESPONSE | FLAGS_AA);
3935    let create_time = current_time_millis() + fastrand::u64(0..250);
3936
3937    out.add_answer_at_time(
3938        DnsPointer::new(
3939            info.get_type(),
3940            RRType::PTR,
3941            CLASS_IN,
3942            info.get_other_ttl(),
3943            service_fullname.to_string(),
3944        ),
3945        0,
3946    );
3947
3948    if let Some(sub) = info.get_subtype() {
3949        trace!("Adding subdomain {}", sub);
3950        out.add_answer_at_time(
3951            DnsPointer::new(
3952                sub,
3953                RRType::PTR,
3954                CLASS_IN,
3955                info.get_other_ttl(),
3956                service_fullname.to_string(),
3957            ),
3958            0,
3959        );
3960    }
3961
3962    // SRV records.
3963    let hostname = match dns_registry.name_changes.get(info.get_hostname()) {
3964        Some(new_name) => new_name.to_string(),
3965        None => info.get_hostname().to_string(),
3966    };
3967
3968    let mut srv = DnsSrv::new(
3969        info.get_fullname(),
3970        CLASS_IN | CLASS_CACHE_FLUSH,
3971        info.get_host_ttl(),
3972        info.get_priority(),
3973        info.get_weight(),
3974        info.get_port(),
3975        hostname,
3976    );
3977
3978    if let Some(new_name) = dns_registry.name_changes.get(info.get_fullname()) {
3979        srv.get_record_mut().set_new_name(new_name.to_string());
3980    }
3981
3982    if !info.requires_probe()
3983        || dns_registry.is_probing_done(&srv, info.get_fullname(), create_time)
3984    {
3985        out.add_answer_at_time(srv, 0);
3986    } else {
3987        probing_count += 1;
3988    }
3989
3990    // TXT records.
3991
3992    let mut txt = DnsTxt::new(
3993        info.get_fullname(),
3994        CLASS_IN | CLASS_CACHE_FLUSH,
3995        info.get_other_ttl(),
3996        info.generate_txt(),
3997    );
3998
3999    if let Some(new_name) = dns_registry.name_changes.get(info.get_fullname()) {
4000        txt.get_record_mut().set_new_name(new_name.to_string());
4001    }
4002
4003    if !info.requires_probe()
4004        || dns_registry.is_probing_done(&txt, info.get_fullname(), create_time)
4005    {
4006        out.add_answer_at_time(txt, 0);
4007    } else {
4008        probing_count += 1;
4009    }
4010
4011    // Address records. (A and AAAA)
4012
4013    let hostname = info.get_hostname();
4014    for address in intf_addrs {
4015        let mut dns_addr = DnsAddress::new(
4016            hostname,
4017            ip_address_rr_type(&address),
4018            CLASS_IN | CLASS_CACHE_FLUSH,
4019            info.get_host_ttl(),
4020            address,
4021            intf.into(),
4022        );
4023
4024        if let Some(new_name) = dns_registry.name_changes.get(hostname) {
4025            dns_addr.get_record_mut().set_new_name(new_name.to_string());
4026        }
4027
4028        if !info.requires_probe()
4029            || dns_registry.is_probing_done(&dns_addr, info.get_fullname(), create_time)
4030        {
4031            out.add_answer_at_time(dns_addr, 0);
4032        } else {
4033            probing_count += 1;
4034        }
4035    }
4036
4037    if probing_count > 0 {
4038        return None;
4039    }
4040
4041    Some(out)
4042}
4043
4044/// Send an unsolicited response for owned service via `intf` and `sock`.
4045/// Returns true if sent out successfully for IPv4 or IPv6.
4046fn announce_service_on_intf(
4047    dns_registry: &mut DnsRegistry,
4048    info: &ServiceInfo,
4049    intf: &MyIntf,
4050    sock: &PktInfoUdpSocket,
4051) -> bool {
4052    let is_ipv4 = sock.domain() == Domain::IPV4;
4053    if let Some(out) = prepare_announce(info, intf, dns_registry, is_ipv4) {
4054        send_dns_outgoing(&out, intf, sock);
4055        return true;
4056    }
4057
4058    false
4059}
4060
4061/// Returns a new name based on the `original` to avoid conflicts.
4062/// If the name already contains a number in parentheses, increments that number.
4063///
4064/// Examples:
4065/// - `foo.local.` becomes `foo (2).local.`
4066/// - `foo (2).local.` becomes `foo (3).local.`
4067/// - `foo (9)` becomes `foo (10)`
4068fn name_change(original: &str) -> String {
4069    let mut parts: Vec<_> = original.split('.').collect();
4070    let Some(first_part) = parts.get_mut(0) else {
4071        return format!("{original} (2)");
4072    };
4073
4074    let mut new_name = format!("{first_part} (2)");
4075
4076    // check if there is already has `(<num>)` suffix.
4077    if let Some(paren_pos) = first_part.rfind(" (") {
4078        // Check if there's a closing parenthesis
4079        if let Some(end_paren) = first_part[paren_pos..].find(')') {
4080            let absolute_end_pos = paren_pos + end_paren;
4081            // Only process if the closing parenthesis is the last character
4082            if absolute_end_pos == first_part.len() - 1 {
4083                let num_start = paren_pos + 2; // Skip " ("
4084                                               // Try to parse the number between parentheses
4085                if let Ok(number) = first_part[num_start..absolute_end_pos].parse::<u32>() {
4086                    let base_name = &first_part[..paren_pos];
4087                    new_name = format!("{} ({})", base_name, number + 1)
4088                }
4089            }
4090        }
4091    }
4092
4093    *first_part = &new_name;
4094    parts.join(".")
4095}
4096
4097/// Returns a new name based on the `original` to avoid conflicts.
4098/// If the name already contains a hyphenated number, increments that number.
4099///
4100/// Examples:
4101/// - `foo.local.` becomes `foo-2.local.`
4102/// - `foo-2.local.` becomes `foo-3.local.`
4103/// - `foo` becomes `foo-2`
4104fn hostname_change(original: &str) -> String {
4105    let mut parts: Vec<_> = original.split('.').collect();
4106    let Some(first_part) = parts.get_mut(0) else {
4107        return format!("{original}-2");
4108    };
4109
4110    let mut new_name = format!("{first_part}-2");
4111
4112    // check if there is already a `-<num>` suffix
4113    if let Some(hyphen_pos) = first_part.rfind('-') {
4114        // Try to parse everything after the hyphen as a number
4115        if let Ok(number) = first_part[hyphen_pos + 1..].parse::<u32>() {
4116            let base_name = &first_part[..hyphen_pos];
4117            new_name = format!("{}-{}", base_name, number + 1);
4118        }
4119    }
4120
4121    *first_part = &new_name;
4122    parts.join(".")
4123}
4124
4125fn add_answer_with_additionals(
4126    out: &mut DnsOutgoing,
4127    msg: &DnsIncoming,
4128    service: &ServiceInfo,
4129    intf: &MyIntf,
4130    dns_registry: &DnsRegistry,
4131    is_ipv4: bool,
4132) {
4133    let intf_addrs = if is_ipv4 {
4134        service.get_addrs_on_my_intf_v4(intf)
4135    } else {
4136        service.get_addrs_on_my_intf_v6(intf)
4137    };
4138    if intf_addrs.is_empty() {
4139        trace!("No addrs on LAN of intf {:?}", intf);
4140        return;
4141    }
4142
4143    // check if we changed our name due to conflicts.
4144    let service_fullname = match dns_registry.name_changes.get(service.get_fullname()) {
4145        Some(new_name) => new_name,
4146        None => service.get_fullname(),
4147    };
4148
4149    let hostname = match dns_registry.name_changes.get(service.get_hostname()) {
4150        Some(new_name) => new_name,
4151        None => service.get_hostname(),
4152    };
4153
4154    let ptr_added = out.add_answer(
4155        msg,
4156        DnsPointer::new(
4157            service.get_type(),
4158            RRType::PTR,
4159            CLASS_IN,
4160            service.get_other_ttl(),
4161            service_fullname.to_string(),
4162        ),
4163    );
4164
4165    if !ptr_added {
4166        trace!("answer was not added for msg {:?}", msg);
4167        return;
4168    }
4169
4170    if let Some(sub) = service.get_subtype() {
4171        trace!("Adding subdomain {}", sub);
4172        out.add_additional_answer(DnsPointer::new(
4173            sub,
4174            RRType::PTR,
4175            CLASS_IN,
4176            service.get_other_ttl(),
4177            service_fullname.to_string(),
4178        ));
4179    }
4180
4181    // Add recommended additional answers according to
4182    // https://tools.ietf.org/html/rfc6763#section-12.1.
4183    out.add_additional_answer(DnsSrv::new(
4184        service_fullname,
4185        CLASS_IN | CLASS_CACHE_FLUSH,
4186        service.get_host_ttl(),
4187        service.get_priority(),
4188        service.get_weight(),
4189        service.get_port(),
4190        hostname.to_string(),
4191    ));
4192
4193    out.add_additional_answer(DnsTxt::new(
4194        service_fullname,
4195        CLASS_IN | CLASS_CACHE_FLUSH,
4196        service.get_other_ttl(),
4197        service.generate_txt(),
4198    ));
4199
4200    for address in intf_addrs {
4201        out.add_additional_answer(DnsAddress::new(
4202            hostname,
4203            ip_address_rr_type(&address),
4204            CLASS_IN | CLASS_CACHE_FLUSH,
4205            service.get_host_ttl(),
4206            address,
4207            intf.into(),
4208        ));
4209    }
4210}
4211
4212/// Check probes in a registry and returns: a probing packet to send out, and a list of probe names
4213/// that are finished.
4214fn check_probing(
4215    dns_registry: &mut DnsRegistry,
4216    timers: &mut BinaryHeap<Reverse<u64>>,
4217    now: u64,
4218) -> (DnsOutgoing, Vec<String>) {
4219    let mut expired_probes = Vec::new();
4220    let mut out = DnsOutgoing::new(FLAGS_QR_QUERY);
4221
4222    for (name, probe) in dns_registry.probing.iter_mut() {
4223        if now >= probe.next_send {
4224            if probe.expired(now) {
4225                // move the record to active
4226                expired_probes.push(name.clone());
4227            } else {
4228                out.add_question(name, RRType::ANY);
4229
4230                /*
4231                RFC 6762 section 8.2: https://datatracker.ietf.org/doc/html/rfc6762#section-8.2
4232                ...
4233                for tiebreaking to work correctly in all
4234                cases, the Authority Section must contain *all* the records and
4235                proposed rdata being probed for uniqueness.
4236                    */
4237                for record in probe.records.iter() {
4238                    out.add_authority(record.clone());
4239                }
4240
4241                probe.update_next_send(now);
4242
4243                // add timer
4244                timers.push(Reverse(probe.next_send));
4245            }
4246        }
4247    }
4248
4249    (out, expired_probes)
4250}
4251
4252/// Process expired probes on an interface and return a list of services
4253/// that are waiting for the probe to finish.
4254///
4255/// `DnsNameChange` events are sent to the monitors.
4256fn handle_expired_probes(
4257    expired_probes: Vec<String>,
4258    intf_name: &str,
4259    dns_registry: &mut DnsRegistry,
4260    monitors: &mut Vec<Sender<DaemonEvent>>,
4261) -> HashSet<String> {
4262    let mut waiting_services = HashSet::new();
4263
4264    for name in expired_probes {
4265        let Some(probe) = dns_registry.probing.remove(&name) else {
4266            continue;
4267        };
4268
4269        // send notifications about name changes
4270        for record in probe.records.iter() {
4271            if let Some(new_name) = record.get_record().get_new_name() {
4272                dns_registry
4273                    .name_changes
4274                    .insert(name.clone(), new_name.to_string());
4275
4276                let event = DnsNameChange {
4277                    original: record.get_record().get_original_name().to_string(),
4278                    new_name: new_name.to_string(),
4279                    rr_type: record.get_type(),
4280                    intf_name: intf_name.to_string(),
4281                };
4282                debug!("Name change event: {:?}", &event);
4283                notify_monitors(monitors, DaemonEvent::NameChange(event));
4284            }
4285        }
4286
4287        // move RR from probe to active.
4288        debug!(
4289            "probe of '{name}' finished: move {} records to active. ({} waiting services)",
4290            probe.records.len(),
4291            probe.waiting_services.len(),
4292        );
4293
4294        // Move records to active and plan to wake up services if records are not empty.
4295        if !probe.records.is_empty() {
4296            match dns_registry.active.get_mut(&name) {
4297                Some(records) => {
4298                    records.extend(probe.records);
4299                }
4300                None => {
4301                    dns_registry.active.insert(name, probe.records);
4302                }
4303            }
4304
4305            waiting_services.extend(probe.waiting_services);
4306        }
4307    }
4308
4309    waiting_services
4310}
4311
4312#[cfg(test)]
4313mod tests {
4314    use super::{
4315        _new_socket_bind, check_domain_suffix, check_service_name_length, hostname_change,
4316        my_ip_interfaces, name_change, send_dns_outgoing_impl, valid_instance_name,
4317        HostnameResolutionEvent, ServiceDaemon, ServiceEvent, ServiceInfo,
4318    };
4319    use crate::{
4320        dns_parser::{
4321            DnsIncoming, DnsOutgoing, DnsPointer, InterfaceId, RRType, ScopedIp, CLASS_IN,
4322            FLAGS_AA, FLAGS_QR_RESPONSE,
4323        },
4324        service_daemon::{add_answer_of_service, check_hostname},
4325    };
4326    use std::time::{Duration, SystemTime};
4327    use test_log::test;
4328
4329    #[test]
4330    fn test_instance_name() {
4331        assert!(valid_instance_name("my-laser._printer._tcp.local."));
4332        assert!(valid_instance_name("my-laser.._printer._tcp.local."));
4333        assert!(!valid_instance_name("_printer._tcp.local."));
4334    }
4335
4336    #[test]
4337    fn test_check_service_name_length() {
4338        let result = check_service_name_length("_tcp", 100);
4339        assert!(result.is_err());
4340        if let Err(e) = result {
4341            println!("{}", e);
4342        }
4343    }
4344
4345    #[test]
4346    fn test_check_hostname() {
4347        // valid hostnames
4348        for hostname in &[
4349            "my_host.local.",
4350            &("A".repeat(255 - ".local.".len()) + ".local."),
4351        ] {
4352            let result = check_hostname(hostname);
4353            assert!(result.is_ok());
4354        }
4355
4356        // erroneous hostnames
4357        for hostname in &[
4358            "my_host.local",
4359            ".local.",
4360            &("A".repeat(256 - ".local.".len()) + ".local."),
4361        ] {
4362            let result = check_hostname(hostname);
4363            assert!(result.is_err());
4364            if let Err(e) = result {
4365                println!("{}", e);
4366            }
4367        }
4368    }
4369
4370    #[test]
4371    fn test_check_domain_suffix() {
4372        assert!(check_domain_suffix("_missing_dot._tcp.local").is_err());
4373        assert!(check_domain_suffix("_missing_bar.tcp.local.").is_err());
4374        assert!(check_domain_suffix("_mis_spell._tpp.local.").is_err());
4375        assert!(check_domain_suffix("_mis_spell._upp.local.").is_err());
4376        assert!(check_domain_suffix("_has_dot._tcp.local.").is_ok());
4377        assert!(check_domain_suffix("_goodname._udp.local.").is_ok());
4378    }
4379
4380    #[test]
4381    fn test_service_with_temporarily_invalidated_ptr() {
4382        // Create a daemon
4383        let d = ServiceDaemon::new().expect("Failed to create daemon");
4384
4385        let service = "_test_inval_ptr._udp.local.";
4386        let host_name = "my_host_tmp_invalidated_ptr.local.";
4387        let intfs: Vec<_> = my_ip_interfaces(false);
4388        let intf_ips: Vec<_> = intfs.iter().map(|intf| intf.ip()).collect();
4389        let port = 5201;
4390        let my_service =
4391            ServiceInfo::new(service, "my_instance", host_name, &intf_ips[..], port, None)
4392                .expect("invalid service info")
4393                .enable_addr_auto();
4394        let result = d.register(my_service.clone());
4395        assert!(result.is_ok());
4396
4397        // Browse for a service
4398        let browse_chan = d.browse(service).unwrap();
4399        let timeout = Duration::from_secs(2);
4400        let mut resolved = false;
4401
4402        while let Ok(event) = browse_chan.recv_timeout(timeout) {
4403            match event {
4404                ServiceEvent::ServiceResolved(info) => {
4405                    resolved = true;
4406                    println!("Resolved a service of {}", &info.fullname);
4407                    break;
4408                }
4409                e => {
4410                    println!("Received event {:?}", e);
4411                }
4412            }
4413        }
4414
4415        assert!(resolved);
4416
4417        println!("Stopping browse of {}", service);
4418        // Pause browsing so restarting will cause a new immediate query.
4419        // Unregistering will not work here, it will invalidate all the records.
4420        d.stop_browse(service).unwrap();
4421
4422        // Ensure the search is stopped.
4423        // Reduces the chance of receiving an answer adding the ptr back to the
4424        // cache causing the later browse to return directly from the cache.
4425        // (which invalidates what this test is trying to test for.)
4426        let mut stopped = false;
4427        while let Ok(event) = browse_chan.recv_timeout(timeout) {
4428            match event {
4429                ServiceEvent::SearchStopped(_) => {
4430                    stopped = true;
4431                    println!("Stopped browsing service");
4432                    break;
4433                }
4434                // Other `ServiceResolved` messages may be received
4435                // here as they come from different interfaces.
4436                // That's fine for this test.
4437                e => {
4438                    println!("Received event {:?}", e);
4439                }
4440            }
4441        }
4442
4443        assert!(stopped);
4444
4445        // Invalidate the ptr from the service to the host.
4446        let invalidate_ptr_packet = DnsPointer::new(
4447            my_service.get_type(),
4448            RRType::PTR,
4449            CLASS_IN,
4450            0,
4451            my_service.get_fullname().to_string(),
4452        );
4453
4454        let mut packet_buffer = DnsOutgoing::new(FLAGS_QR_RESPONSE | FLAGS_AA);
4455        packet_buffer.add_additional_answer(invalidate_ptr_packet);
4456
4457        for intf in intfs {
4458            let sock = _new_socket_bind(&intf, true).unwrap();
4459            send_dns_outgoing_impl(
4460                &packet_buffer,
4461                &intf.name,
4462                intf.index.unwrap_or(0),
4463                &intf.addr,
4464                &sock.pktinfo,
4465            );
4466        }
4467
4468        println!(
4469            "Sent PTR record invalidation. Starting second browse for {}",
4470            service
4471        );
4472
4473        // Restart the browse to force the sender to re-send the announcements.
4474        let browse_chan = d.browse(service).unwrap();
4475
4476        resolved = false;
4477        while let Ok(event) = browse_chan.recv_timeout(timeout) {
4478            match event {
4479                ServiceEvent::ServiceResolved(info) => {
4480                    resolved = true;
4481                    println!("Resolved a service of {}", &info.fullname);
4482                    break;
4483                }
4484                e => {
4485                    println!("Received event {:?}", e);
4486                }
4487            }
4488        }
4489
4490        assert!(resolved);
4491        d.shutdown().unwrap();
4492    }
4493
4494    #[test]
4495    fn test_expired_srv() {
4496        // construct service info
4497        let service_type = "_expired-srv._udp.local.";
4498        let instance = "test_instance";
4499        let host_name = "expired_srv_host.local.";
4500        let mut my_service = ServiceInfo::new(service_type, instance, host_name, "", 5023, None)
4501            .unwrap()
4502            .enable_addr_auto();
4503        // let fullname = my_service.get_fullname().to_string();
4504
4505        // set SRV to expire soon.
4506        let new_ttl = 3; // for testing only.
4507        my_service._set_host_ttl(new_ttl);
4508
4509        // register my service
4510        let mdns_server = ServiceDaemon::new().expect("Failed to create mdns server");
4511        let result = mdns_server.register(my_service);
4512        assert!(result.is_ok());
4513
4514        let mdns_client = ServiceDaemon::new().expect("Failed to create mdns client");
4515        let browse_chan = mdns_client.browse(service_type).unwrap();
4516        let timeout = Duration::from_secs(2);
4517        let mut resolved = false;
4518
4519        while let Ok(event) = browse_chan.recv_timeout(timeout) {
4520            if let ServiceEvent::ServiceResolved(info) = event {
4521                resolved = true;
4522                println!("Resolved a service of {}", &info.fullname);
4523                break;
4524            }
4525        }
4526
4527        assert!(resolved);
4528
4529        // Exit the server so that no more responses.
4530        mdns_server.shutdown().unwrap();
4531
4532        // SRV record in the client cache will expire.
4533        let expire_timeout = Duration::from_secs(new_ttl as u64);
4534        while let Ok(event) = browse_chan.recv_timeout(expire_timeout) {
4535            if let ServiceEvent::ServiceRemoved(service_type, full_name) = event {
4536                println!("Service removed: {}: {}", &service_type, &full_name);
4537                break;
4538            }
4539        }
4540    }
4541
4542    #[test]
4543    fn test_hostname_resolution_address_removed() {
4544        // Create a mDNS server
4545        let server = ServiceDaemon::new().expect("Failed to create server");
4546        let hostname = "addr_remove_host._tcp.local.";
4547        let service_ip_addr: ScopedIp = my_ip_interfaces(false)
4548            .iter()
4549            .find(|iface| iface.ip().is_ipv4())
4550            .map(|iface| iface.ip().into())
4551            .unwrap();
4552
4553        let mut my_service = ServiceInfo::new(
4554            "_host_res_test._tcp.local.",
4555            "my_instance",
4556            hostname,
4557            service_ip_addr.to_ip_addr(),
4558            1234,
4559            None,
4560        )
4561        .expect("invalid service info");
4562
4563        // Set a short TTL for addresses for testing.
4564        let addr_ttl = 2;
4565        my_service._set_host_ttl(addr_ttl); // Expire soon
4566
4567        server.register(my_service).unwrap();
4568
4569        // Create a mDNS client for resolving the hostname.
4570        let client = ServiceDaemon::new().expect("Failed to create client");
4571        let event_receiver = client.resolve_hostname(hostname, None).unwrap();
4572        let resolved = loop {
4573            match event_receiver.recv() {
4574                Ok(HostnameResolutionEvent::AddressesFound(found_hostname, addresses)) => {
4575                    assert!(found_hostname == hostname);
4576                    assert!(addresses.contains(&service_ip_addr));
4577                    println!("address found: {:?}", &addresses);
4578                    break true;
4579                }
4580                Ok(HostnameResolutionEvent::SearchStopped(_)) => break false,
4581                Ok(_event) => {}
4582                Err(_) => break false,
4583            }
4584        };
4585
4586        assert!(resolved);
4587
4588        // Shutdown the server so no more responses / refreshes for addresses.
4589        server.shutdown().unwrap();
4590
4591        // Wait till hostname address record expires, with 1 second grace period.
4592        let timeout = Duration::from_secs(addr_ttl as u64 + 1);
4593        let removed = loop {
4594            match event_receiver.recv_timeout(timeout) {
4595                Ok(HostnameResolutionEvent::AddressesRemoved(removed_host, addresses)) => {
4596                    assert!(removed_host == hostname);
4597                    assert!(addresses.contains(&service_ip_addr));
4598
4599                    println!(
4600                        "address removed: hostname: {} addresses: {:?}",
4601                        &hostname, &addresses
4602                    );
4603                    break true;
4604                }
4605                Ok(_event) => {}
4606                Err(_) => {
4607                    break false;
4608                }
4609            }
4610        };
4611
4612        assert!(removed);
4613
4614        client.shutdown().unwrap();
4615    }
4616
4617    #[test]
4618    fn test_refresh_ptr() {
4619        // construct service info
4620        let service_type = "_refresh-ptr._udp.local.";
4621        let instance = "test_instance";
4622        let host_name = "refresh_ptr_host.local.";
4623        let service_ip_addr = my_ip_interfaces(false)
4624            .iter()
4625            .find(|iface| iface.ip().is_ipv4())
4626            .map(|iface| iface.ip())
4627            .unwrap();
4628
4629        let mut my_service = ServiceInfo::new(
4630            service_type,
4631            instance,
4632            host_name,
4633            service_ip_addr,
4634            5023,
4635            None,
4636        )
4637        .unwrap();
4638
4639        let new_ttl = 3; // for testing only.
4640        my_service._set_other_ttl(new_ttl);
4641
4642        // register my service
4643        let mdns_server = ServiceDaemon::new().expect("Failed to create mdns server");
4644        let result = mdns_server.register(my_service);
4645        assert!(result.is_ok());
4646
4647        let mdns_client = ServiceDaemon::new().expect("Failed to create mdns client");
4648        let browse_chan = mdns_client.browse(service_type).unwrap();
4649        let timeout = Duration::from_millis(1500); // Give at least 1 second for the service probing.
4650        let mut resolved = false;
4651
4652        // resolve the service first.
4653        while let Ok(event) = browse_chan.recv_timeout(timeout) {
4654            if let ServiceEvent::ServiceResolved(info) = event {
4655                resolved = true;
4656                println!("Resolved a service of {}", &info.fullname);
4657                break;
4658            }
4659        }
4660
4661        assert!(resolved);
4662
4663        // wait over 80% of TTL, and refresh PTR should be sent out.
4664        let timeout = Duration::from_millis(new_ttl as u64 * 1000 * 90 / 100);
4665        while let Ok(event) = browse_chan.recv_timeout(timeout) {
4666            println!("event: {:?}", &event);
4667        }
4668
4669        // verify refresh counter.
4670        let metrics_chan = mdns_client.get_metrics().unwrap();
4671        let metrics = metrics_chan.recv_timeout(timeout).unwrap();
4672        let ptr_refresh_counter = metrics["cache-refresh-ptr"];
4673        assert_eq!(ptr_refresh_counter, 1);
4674        let srvtxt_refresh_counter = metrics["cache-refresh-srv-txt"];
4675        assert_eq!(srvtxt_refresh_counter, 1);
4676
4677        // Exit the server so that no more responses.
4678        mdns_server.shutdown().unwrap();
4679        mdns_client.shutdown().unwrap();
4680    }
4681
4682    #[test]
4683    fn test_name_change() {
4684        assert_eq!(name_change("foo.local."), "foo (2).local.");
4685        assert_eq!(name_change("foo (2).local."), "foo (3).local.");
4686        assert_eq!(name_change("foo (9).local."), "foo (10).local.");
4687        assert_eq!(name_change("foo"), "foo (2)");
4688        assert_eq!(name_change("foo (2)"), "foo (3)");
4689        assert_eq!(name_change(""), " (2)");
4690
4691        // Additional edge cases
4692        assert_eq!(name_change("foo (abc)"), "foo (abc) (2)"); // Invalid number
4693        assert_eq!(name_change("foo (2"), "foo (2 (2)"); // Missing closing parenthesis
4694        assert_eq!(name_change("foo (2) extra"), "foo (2) extra (2)"); // Extra text after number
4695    }
4696
4697    #[test]
4698    fn test_hostname_change() {
4699        assert_eq!(hostname_change("foo.local."), "foo-2.local.");
4700        assert_eq!(hostname_change("foo"), "foo-2");
4701        assert_eq!(hostname_change("foo-2.local."), "foo-3.local.");
4702        assert_eq!(hostname_change("foo-9"), "foo-10");
4703        assert_eq!(hostname_change("test-42.domain."), "test-43.domain.");
4704    }
4705
4706    #[test]
4707    fn test_add_answer_txt_ttl() {
4708        // construct a simple service info
4709        let service_type = "_test_add_answer._udp.local.";
4710        let instance = "test_instance";
4711        let host_name = "add_answer_host.local.";
4712        let service_intf = my_ip_interfaces(false)
4713            .into_iter()
4714            .find(|iface| iface.ip().is_ipv4())
4715            .unwrap();
4716        let service_ip_addr = service_intf.ip();
4717        let my_service = ServiceInfo::new(
4718            service_type,
4719            instance,
4720            host_name,
4721            service_ip_addr,
4722            5023,
4723            None,
4724        )
4725        .unwrap();
4726
4727        // construct a DnsOutgoing message
4728        let mut out = DnsOutgoing::new(FLAGS_QR_RESPONSE | FLAGS_AA);
4729
4730        // Construct a dummy DnsIncoming message
4731        let mut dummy_data = out.to_data_on_wire();
4732        let interface_id = InterfaceId::from(&service_intf);
4733        let incoming = DnsIncoming::new(dummy_data.pop().unwrap(), interface_id).unwrap();
4734
4735        // Add an answer of TXT type for the service.
4736        let if_addrs = vec![service_intf.ip()];
4737        add_answer_of_service(
4738            &mut out,
4739            &incoming,
4740            instance,
4741            &my_service,
4742            RRType::TXT,
4743            if_addrs,
4744        );
4745
4746        // Check if the answer was added correctly
4747        assert!(
4748            out.answers_count() > 0,
4749            "No answers added to the outgoing message"
4750        );
4751
4752        // Check if the first answer is of type TXT
4753        let answer = out._answers().first().unwrap();
4754        assert_eq!(answer.0.get_type(), RRType::TXT);
4755
4756        // Check TTL is set properly for the TXT record
4757        assert_eq!(answer.0.get_record().get_ttl(), my_service.get_other_ttl());
4758    }
4759
4760    #[test]
4761    fn test_interface_flip() {
4762        // start a server
4763        let ty_domain = "_intf-flip._udp.local.";
4764        let host_name = "intf_flip.local.";
4765        let now = SystemTime::now()
4766            .duration_since(SystemTime::UNIX_EPOCH)
4767            .unwrap();
4768        let instance_name = now.as_micros().to_string(); // Create a unique name.
4769        let port = 5200;
4770
4771        // Get a single IPv4 address
4772        let (ip_addr1, intf_name) = my_ip_interfaces(false)
4773            .iter()
4774            .find(|iface| iface.ip().is_ipv4())
4775            .map(|iface| (iface.ip(), iface.name.clone()))
4776            .unwrap();
4777
4778        println!("Using interface {} with IP {}", intf_name, ip_addr1);
4779
4780        // Register the service.
4781        let service1 = ServiceInfo::new(ty_domain, &instance_name, host_name, ip_addr1, port, None)
4782            .expect("valid service info");
4783        let server1 = ServiceDaemon::new().expect("failed to start server");
4784        server1
4785            .register(service1)
4786            .expect("Failed to register service1");
4787
4788        // wait for the service announced.
4789        std::thread::sleep(Duration::from_secs(2));
4790
4791        // start a client
4792        let client = ServiceDaemon::new().expect("failed to start client");
4793
4794        let receiver = client.browse(ty_domain).unwrap();
4795
4796        let timeout = Duration::from_secs(3);
4797        let mut got_data = false;
4798
4799        while let Ok(event) = receiver.recv_timeout(timeout) {
4800            if let ServiceEvent::ServiceResolved(_) = event {
4801                println!("Received ServiceResolved event");
4802                got_data = true;
4803                break;
4804            }
4805        }
4806
4807        assert!(got_data, "Should receive ServiceResolved event");
4808
4809        // Set a short IP check interval to detect interface changes quickly.
4810        client.set_ip_check_interval(1).unwrap();
4811
4812        // Now shutdown the interface and expect the client to lose the service.
4813        println!("Shutting down interface {}", &intf_name);
4814        client.test_down_interface(&intf_name).unwrap();
4815
4816        let mut got_removed = false;
4817
4818        while let Ok(event) = receiver.recv_timeout(timeout) {
4819            if let ServiceEvent::ServiceRemoved(ty_domain, instance) = event {
4820                got_removed = true;
4821                println!("removed: {ty_domain} : {instance}");
4822                break;
4823            }
4824        }
4825        assert!(got_removed, "Should receive ServiceRemoved event");
4826
4827        println!("Bringing up interface {}", &intf_name);
4828        client.test_up_interface(&intf_name).unwrap();
4829        let mut got_data = false;
4830        while let Ok(event) = receiver.recv_timeout(timeout) {
4831            if let ServiceEvent::ServiceResolved(resolved) = event {
4832                got_data = true;
4833                println!("Received ServiceResolved: {:?}", resolved);
4834                break;
4835            }
4836        }
4837        assert!(
4838            got_data,
4839            "Should receive ServiceResolved event after interface is back up"
4840        );
4841
4842        server1.shutdown().unwrap();
4843        client.shutdown().unwrap();
4844    }
4845
4846    #[test]
4847    fn test_cache_only() {
4848        // construct service info
4849        let service_type = "_cache_only._udp.local.";
4850        let instance = "test_instance";
4851        let host_name = "cache_only_host.local.";
4852        let service_ip_addr = my_ip_interfaces(false)
4853            .iter()
4854            .find(|iface| iface.ip().is_ipv4())
4855            .map(|iface| iface.ip())
4856            .unwrap();
4857
4858        let mut my_service = ServiceInfo::new(
4859            service_type,
4860            instance,
4861            host_name,
4862            service_ip_addr,
4863            5023,
4864            None,
4865        )
4866        .unwrap();
4867
4868        let new_ttl = 3; // for testing only.
4869        my_service._set_other_ttl(new_ttl);
4870
4871        let mdns_client = ServiceDaemon::new().expect("Failed to create mdns client");
4872
4873        // make a single browse request to record that we are interested in the service.  This ensures that
4874        // subsequent announcements are cached.
4875        let browse_chan = mdns_client.browse_cache(service_type).unwrap();
4876        std::thread::sleep(Duration::from_secs(2));
4877
4878        // register my service
4879        let mdns_server = ServiceDaemon::new().expect("Failed to create mdns server");
4880        let result = mdns_server.register(my_service);
4881        assert!(result.is_ok());
4882
4883        let timeout = Duration::from_millis(1500); // Give at least 1 second for the service probing.
4884        let mut resolved = false;
4885
4886        // resolve the service.
4887        while let Ok(event) = browse_chan.recv_timeout(timeout) {
4888            if let ServiceEvent::ServiceResolved(info) = event {
4889                resolved = true;
4890                println!("Resolved a service of {}", &info.get_fullname());
4891                break;
4892            }
4893        }
4894
4895        assert!(resolved);
4896
4897        // Exit the server so that no more responses.
4898        mdns_server.shutdown().unwrap();
4899        mdns_client.shutdown().unwrap();
4900    }
4901
4902    #[test]
4903    fn test_cache_only_unsolicited() {
4904        // construct service info
4905        let service_type = "_cache_only._udp.local.";
4906        let instance = "test_instance";
4907        let host_name = "cache_only_host.local.";
4908        let service_ip_addr = my_ip_interfaces(false)
4909            .iter()
4910            .find(|iface| iface.ip().is_ipv4())
4911            .map(|iface| iface.ip())
4912            .unwrap();
4913
4914        let mut my_service = ServiceInfo::new(
4915            service_type,
4916            instance,
4917            host_name,
4918            service_ip_addr,
4919            5023,
4920            None,
4921        )
4922        .unwrap();
4923
4924        let new_ttl = 3; // for testing only.
4925        my_service._set_other_ttl(new_ttl);
4926
4927        // register my service
4928        let mdns_server = ServiceDaemon::new().expect("Failed to create mdns server");
4929        let result = mdns_server.register(my_service);
4930        assert!(result.is_ok());
4931
4932        let mdns_client = ServiceDaemon::new().expect("Failed to create mdns client");
4933        mdns_client.accept_unsolicited(true).unwrap();
4934
4935        // Wait a bit for the service announcements to go out, before calling browse_cache.  This ensures
4936        // that the announcements are treated as unsolicited
4937        std::thread::sleep(Duration::from_secs(2));
4938        let browse_chan = mdns_client.browse_cache(service_type).unwrap();
4939        let timeout = Duration::from_millis(1500); // Give at least 1 second for the service probing.
4940        let mut resolved = false;
4941
4942        // resolve the service.
4943        while let Ok(event) = browse_chan.recv_timeout(timeout) {
4944            if let ServiceEvent::ServiceResolved(info) = event {
4945                resolved = true;
4946                println!("Resolved a service of {}", &info.get_fullname());
4947                break;
4948            }
4949        }
4950
4951        assert!(resolved);
4952
4953        // Exit the server so that no more responses.
4954        mdns_server.shutdown().unwrap();
4955        mdns_client.shutdown().unwrap();
4956    }
4957}