mdns_sd/
service_daemon.rs

1//! Service daemon for mDNS Service Discovery.
2
3// How DNS-based Service Discovery works in a nutshell:
4//
5// (excerpt from RFC 6763)
6// .... that a particular service instance can be
7//    described using a DNS SRV [RFC2782] and DNS TXT [RFC1035] record.
8//    The SRV record has a name of the form "<Instance>.<Service>.<Domain>"
9//    and gives the target host and port where the service instance can be
10//    reached.  The DNS TXT record of the same name gives additional
11//    information about this instance, in a structured form using key/value
12//    pairs, described in Section 6.  A client discovers the list of
13//    available instances of a given service type using a query for a DNS
14//    PTR [RFC1035] record with a name of the form "<Service>.<Domain>",
15//    which returns a set of zero or more names, which are the names of the
16//    aforementioned DNS SRV/TXT record pairs.
17//
18// Some naming conventions in this source code:
19//
20// `ty_domain` refers to service type together with domain name, i.e. <service>.<domain>.
21// Every <service> consists of two labels: service itself and "_udp." or "_tcp".
22// See RFC 6763 section 7 Service Names.
23//     for example: `_my-service._udp.local.`
24//
25// `fullname` refers to a full Service Instance Name, i.e. <instance>.<service>.<domain>
26//     for example: `my_home._my-service._udp.local.`
27//
28// In mDNS and DNS, the basic data structure is "Resource Record" (RR), where
29// in Service Discovery, the basic data structure is "Service Info". One Service Info
30// corresponds to a set of DNS Resource Records.
31#[cfg(feature = "logging")]
32use crate::log::{debug, trace};
33use crate::{
34    dns_cache::{current_time_millis, DnsCache},
35    dns_parser::{
36        ip_address_rr_type, DnsAddress, DnsEntryExt, DnsIncoming, DnsOutgoing, DnsPointer,
37        DnsRecordBox, DnsRecordExt, DnsSrv, DnsTxt, InterfaceId, RRType, ScopedIp,
38        CLASS_CACHE_FLUSH, CLASS_IN, FLAGS_AA, FLAGS_QR_QUERY, FLAGS_QR_RESPONSE, MAX_MSG_ABSOLUTE,
39    },
40    error::{e_fmt, Error, Result},
41    service_info::{DnsRegistry, MyIntf, Probe, ServiceInfo, ServiceStatus},
42    Receiver, ResolvedService, TxtProperties,
43};
44use flume::{bounded, Sender, TrySendError};
45use if_addrs::{IfAddr, Interface};
46use mio::{event::Source, net::UdpSocket as MioUdpSocket, Interest, Poll, Registry, Token};
47use socket2::Domain;
48use socket_pktinfo::PktInfoUdpSocket;
49use std::{
50    cmp::{self, Reverse},
51    collections::{hash_map::Entry, BinaryHeap, HashMap, HashSet},
52    fmt, io,
53    net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6, UdpSocket},
54    str, thread,
55    time::Duration,
56    vec,
57};
58
/// Default upper bound on the length of a service name, with the domain
/// excluded and the leading underscore (`_`) not counted. The value 15 follows
/// [RFC 6763 section 7.2](https://www.rfc-editor.org/rfc/rfc6763#section-7.2).
pub const SERVICE_NAME_LEN_MAX_DEFAULT: u8 = 15;

/// Default interval, in seconds, between automatic checks for IP changes.
pub const IP_CHECK_INTERVAL_IN_SECS_DEFAULT: u32 = 5;

/// Default timeout for [ServiceDaemon::verify]: 10 seconds, per
/// [RFC 6762 section 10.4](https://datatracker.ietf.org/doc/html/rfc6762#section-10.4)
pub const VERIFY_TIMEOUT_DEFAULT: Duration = Duration::from_secs(10);

/// The default mDNS port; multicast sockets bind to it.
const MDNS_PORT: u16 = 5353;
/// The mDNS multicast group address for IPv4.
const GROUP_ADDR_V4: Ipv4Addr = Ipv4Addr::new(224, 0, 0, 251);
/// The mDNS multicast group address for IPv6.
const GROUP_ADDR_V6: Ipv6Addr = Ipv6Addr::new(0xff02, 0, 0, 0, 0, 0, 0, 0xfb);
/// IPv4 loopback address (127.0.0.1), used for the daemon's signal socket.
const LOOPBACK_V4: Ipv4Addr = Ipv4Addr::LOCALHOST;

/// A wait time in milliseconds.
// NOTE(review): the name suggests it is used while resolving; its use site is
// outside this part of the file.
const RESOLVE_WAIT_IN_MILLIS: u64 = 500;
77
/// Response status code for the service `unregister` call.
///
/// The enum is fieldless, so it can be cloned and compared with `==`,
/// e.g. `status == UnregisterStatus::OK`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum UnregisterStatus {
    /// Unregister was successful.
    OK,
    /// The service was not found in the registration.
    NotFound,
}
86
/// The state of the service daemon as reported to clients.
#[non_exhaustive]
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum DaemonStatus {
    /// Normal operation: the daemon is running.
    Running,

    /// The daemon has been shut down.
    Shutdown,
}
97
/// Identifies the individual counters that make up the metrics.
/// Currently all counters are for outgoing packets.
#[derive(PartialEq, Eq, Hash)]
enum Counter {
    Register,
    RegisterResend,
    Unregister,
    UnregisterResend,
    Browse,
    ResolveHostname,
    Respond,
    CacheRefreshPTR,
    CacheRefreshSrvTxt,
    CacheRefreshAddr,
    KnownAnswerSuppression,
    CachedPTR,
    CachedSRV,
    CachedAddr,
    CachedTxt,
    CachedNSec,
    CachedSubtype,
    DnsRegistryProbe,
    DnsRegistryActive,
    DnsRegistryTimer,
    DnsRegistryNameChange,
    Timer,
}

/// Renders each counter as its kebab-case name.
impl fmt::Display for Counter {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        // Look up the label first, then emit it verbatim in a single write.
        let label = match self {
            Self::Register => "register",
            Self::RegisterResend => "register-resend",
            Self::Unregister => "unregister",
            Self::UnregisterResend => "unregister-resend",
            Self::Browse => "browse",
            Self::ResolveHostname => "resolve-hostname",
            Self::Respond => "respond",
            Self::CacheRefreshPTR => "cache-refresh-ptr",
            Self::CacheRefreshSrvTxt => "cache-refresh-srv-txt",
            Self::CacheRefreshAddr => "cache-refresh-addr",
            Self::KnownAnswerSuppression => "known-answer-suppression",
            Self::CachedPTR => "cached-ptr",
            Self::CachedSRV => "cached-srv",
            Self::CachedAddr => "cached-addr",
            Self::CachedTxt => "cached-txt",
            Self::CachedNSec => "cached-nsec",
            Self::CachedSubtype => "cached-subtype",
            Self::DnsRegistryProbe => "dns-registry-probe",
            Self::DnsRegistryActive => "dns-registry-active",
            Self::DnsRegistryTimer => "dns-registry-timer",
            Self::DnsRegistryNameChange => "dns-registry-name-change",
            Self::Timer => "timer",
        };
        f.write_str(label)
    }
}
154
155/// A wrapper around UDP socket used by the mDNS daemon.
156///
157/// We do this because `mio` does not support PKTINFO and
158/// does not provide a way to implement `Source` trait directly and safely.
159struct MyUdpSocket {
160    /// The underlying socket that supports control messages like
161    /// `IP_PKTINFO` for IPv4 and `IPV6_PKTINFO` for IPv6.
162    pktinfo: PktInfoUdpSocket,
163
164    /// The mio UDP socket that is a clone of `pktinfo` and
165    /// is used for event polling.
166    mio: MioUdpSocket,
167}
168
169impl MyUdpSocket {
170    pub fn new(pktinfo: PktInfoUdpSocket) -> io::Result<Self> {
171        let std_sock = pktinfo.try_clone_std()?;
172        let mio = MioUdpSocket::from_std(std_sock);
173
174        Ok(Self { pktinfo, mio })
175    }
176}
177
178/// Implements the mio `Source` trait so that we can use `MyUdpSocket` with `Poll`.
179impl Source for MyUdpSocket {
180    fn register(
181        &mut self,
182        registry: &Registry,
183        token: Token,
184        interests: Interest,
185    ) -> io::Result<()> {
186        self.mio.register(registry, token, interests)
187    }
188
189    fn reregister(
190        &mut self,
191        registry: &Registry,
192        token: Token,
193        interests: Interest,
194    ) -> io::Result<()> {
195        self.mio.reregister(registry, token, interests)
196    }
197
198    fn deregister(&mut self, registry: &Registry) -> std::io::Result<()> {
199        self.mio.deregister(registry)
200    }
201}
202
/// Metrics are kept as a HashMap of (name_key, i64_value).
/// Their main purpose is to help monitor the mDNS packet traffic.
pub type Metrics = HashMap<String, i64>;

/// Event key for the IPv4 socket. The value is picked just to indicate IPv4.
const IPV4_SOCK_EVENT_KEY: usize = 4;
/// Event key for the IPv6 socket. The value is picked just to indicate IPv6.
const IPV6_SOCK_EVENT_KEY: usize = 6;
/// Event key for the signal socket, chosen to avoid overlapping with `zc.poll_ids`.
const SIGNAL_SOCK_EVENT_KEY: usize = usize::MAX - 1;
210
211/// A daemon thread for mDNS
212///
213/// This struct provides a handle and an API to the daemon. It is cloneable.
214#[derive(Clone)]
215pub struct ServiceDaemon {
216    /// Sender handle of the channel to the daemon.
217    sender: Sender<Command>,
218
219    /// Send to this addr to signal that a `Command` is coming.
220    ///
221    /// The daemon listens on this addr together with other mDNS sockets,
222    /// to avoid busy polling the flume channel. If there is a way to poll
223    /// the channel and mDNS sockets together, then this can be removed.
224    signal_addr: SocketAddr,
225}
226
227impl ServiceDaemon {
228    /// Creates a new daemon and spawns a thread to run the daemon.
229    ///
230    /// The daemon (re)uses the default mDNS port 5353. To keep it simple, we don't
231    /// ask callers to set the port.
232    pub fn new() -> Result<Self> {
233        // Use port 0 to allow the system assign a random available port,
234        // no need for a pre-defined port number.
235        let signal_addr = SocketAddrV4::new(LOOPBACK_V4, 0);
236
237        let signal_sock = UdpSocket::bind(signal_addr)
238            .map_err(|e| e_fmt!("failed to create signal_sock for daemon: {}", e))?;
239
240        // Get the socket with the OS chosen port
241        let signal_addr = signal_sock
242            .local_addr()
243            .map_err(|e| e_fmt!("failed to get signal sock addr: {}", e))?;
244
245        // Must be nonblocking so we can listen to it together with mDNS sockets.
246        signal_sock
247            .set_nonblocking(true)
248            .map_err(|e| e_fmt!("failed to set nonblocking for signal socket: {}", e))?;
249
250        let poller = Poll::new().map_err(|e| e_fmt!("failed to create mio Poll: {e}"))?;
251
252        let (sender, receiver) = bounded(100);
253
254        // Spawn the daemon thread
255        let mio_sock = MioUdpSocket::from_std(signal_sock);
256        thread::Builder::new()
257            .name("mDNS_daemon".to_string())
258            .spawn(move || Self::daemon_thread(mio_sock, poller, receiver))
259            .map_err(|e| e_fmt!("thread builder failed to spawn: {}", e))?;
260
261        Ok(Self {
262            sender,
263            signal_addr,
264        })
265    }
266
267    /// Sends `cmd` to the daemon via its channel, and sends a signal
268    /// to its sock addr to notify.
269    fn send_cmd(&self, cmd: Command) -> Result<()> {
270        let cmd_name = cmd.to_string();
271
272        // First, send to the flume channel.
273        self.sender.try_send(cmd).map_err(|e| match e {
274            TrySendError::Full(_) => Error::Again,
275            e => e_fmt!("flume::channel::send failed: {}", e),
276        })?;
277
278        // Second, send a signal to notify the daemon.
279        let addr = SocketAddrV4::new(LOOPBACK_V4, 0);
280        let socket = UdpSocket::bind(addr)
281            .map_err(|e| e_fmt!("Failed to create socket to send signal: {}", e))?;
282        socket
283            .send_to(cmd_name.as_bytes(), self.signal_addr)
284            .map_err(|e| {
285                e_fmt!(
286                    "signal socket send_to {} ({}) failed: {}",
287                    self.signal_addr,
288                    cmd_name,
289                    e
290                )
291            })?;
292
293        Ok(())
294    }
295
296    /// Starts browsing for a specific service type.
297    ///
298    /// `service_type` must end with a valid mDNS domain: '._tcp.local.' or '._udp.local.'
299    ///
300    /// Returns a channel `Receiver` to receive events about the service. The caller
301    /// can call `.recv_async().await` on this receiver to handle events in an
302    /// async environment or call `.recv()` in a sync environment.
303    ///
304    /// When a new instance is found, the daemon automatically tries to resolve, i.e.
305    /// finding more details, i.e. SRV records and TXT records.
306    pub fn browse(&self, service_type: &str) -> Result<Receiver<ServiceEvent>> {
307        check_domain_suffix(service_type)?;
308
309        let (resp_s, resp_r) = bounded(10);
310        self.send_cmd(Command::Browse(service_type.to_string(), 1, false, resp_s))?;
311        Ok(resp_r)
312    }
313
314    /// Preforms a "cache-only" browse.
315    ///
316    /// `service_type` must end with a valid mDNS domain: '._tcp.local.' or '._udp.local.'
317    ///
318    /// The functionality is identical to 'browse', but the service events are based solely on the contents
319    /// of the daemon's cache. No actual mDNS query is sent to the network.
320    ///
321    /// See [accept_unsolicited](Self::accept_unsolicited) if you want to do cache-only browsing.
322    pub fn browse_cache(&self, service_type: &str) -> Result<Receiver<ServiceEvent>> {
323        check_domain_suffix(service_type)?;
324
325        let (resp_s, resp_r) = bounded(10);
326        self.send_cmd(Command::Browse(service_type.to_string(), 1, true, resp_s))?;
327        Ok(resp_r)
328    }
329
330    /// Stops searching for a specific service type.
331    ///
332    /// When an error is returned, the caller should retry only when
333    /// the error is `Error::Again`, otherwise should log and move on.
334    pub fn stop_browse(&self, ty_domain: &str) -> Result<()> {
335        self.send_cmd(Command::StopBrowse(ty_domain.to_string()))
336    }
337
338    /// Starts querying for the ip addresses of a hostname.
339    ///
340    /// Returns a channel `Receiver` to receive events about the hostname.
341    /// The caller can call `.recv_async().await` on this receiver to handle events in an
342    /// async environment or call `.recv()` in a sync environment.
343    ///
344    /// The `timeout` is specified in milliseconds.
345    pub fn resolve_hostname(
346        &self,
347        hostname: &str,
348        timeout: Option<u64>,
349    ) -> Result<Receiver<HostnameResolutionEvent>> {
350        check_hostname(hostname)?;
351        let (resp_s, resp_r) = bounded(10);
352        self.send_cmd(Command::ResolveHostname(
353            hostname.to_string(),
354            1,
355            resp_s,
356            timeout,
357        ))?;
358        Ok(resp_r)
359    }
360
361    /// Stops querying for the ip addresses of a hostname.
362    ///
363    /// When an error is returned, the caller should retry only when
364    /// the error is `Error::Again`, otherwise should log and move on.
365    pub fn stop_resolve_hostname(&self, hostname: &str) -> Result<()> {
366        self.send_cmd(Command::StopResolveHostname(hostname.to_string()))
367    }
368
369    /// Registers a service provided by this host.
370    ///
371    /// If `service_info` has no addresses yet and its `addr_auto` is enabled,
372    /// this method will automatically fill in addresses from the host.
373    ///
374    /// To re-announce a service with an updated `service_info`, just call
375    /// this `register` function again. No need to call `unregister` first.
376    pub fn register(&self, service_info: ServiceInfo) -> Result<()> {
377        check_service_name(service_info.get_fullname())?;
378        check_hostname(service_info.get_hostname())?;
379
380        self.send_cmd(Command::Register(service_info))
381    }
382
383    /// Unregisters a service. This is a graceful shutdown of a service.
384    ///
385    /// Returns a channel receiver that is used to receive the status code
386    /// of the unregister.
387    ///
388    /// When an error is returned, the caller should retry only when
389    /// the error is `Error::Again`, otherwise should log and move on.
390    pub fn unregister(&self, fullname: &str) -> Result<Receiver<UnregisterStatus>> {
391        let (resp_s, resp_r) = bounded(1);
392        self.send_cmd(Command::Unregister(fullname.to_lowercase(), resp_s))?;
393        Ok(resp_r)
394    }
395
396    /// Starts to monitor events from the daemon.
397    ///
398    /// Returns a channel [`Receiver`] of [`DaemonEvent`].
399    pub fn monitor(&self) -> Result<Receiver<DaemonEvent>> {
400        let (resp_s, resp_r) = bounded(100);
401        self.send_cmd(Command::Monitor(resp_s))?;
402        Ok(resp_r)
403    }
404
405    /// Shuts down the daemon thread and returns a channel to receive the status.
406    ///
407    /// When an error is returned, the caller should retry only when
408    /// the error is `Error::Again`, otherwise should log and move on.
409    pub fn shutdown(&self) -> Result<Receiver<DaemonStatus>> {
410        let (resp_s, resp_r) = bounded(1);
411        self.send_cmd(Command::Exit(resp_s))?;
412        Ok(resp_r)
413    }
414
415    /// Returns the status of the daemon.
416    ///
417    /// When an error is returned, the caller should retry only when
418    /// the error is `Error::Again`, otherwise should consider the daemon
419    /// stopped working and move on.
420    pub fn status(&self) -> Result<Receiver<DaemonStatus>> {
421        let (resp_s, resp_r) = bounded(1);
422
423        if self.sender.is_disconnected() {
424            resp_s
425                .send(DaemonStatus::Shutdown)
426                .map_err(|e| e_fmt!("failed to send daemon status to the client: {}", e))?;
427        } else {
428            self.send_cmd(Command::GetStatus(resp_s))?;
429        }
430
431        Ok(resp_r)
432    }
433
434    /// Returns a channel receiver for the metrics, e.g. input/output counters.
435    ///
436    /// The metrics returned is a snapshot. Hence the caller should call
437    /// this method repeatedly if they want to monitor the metrics continuously.
438    pub fn get_metrics(&self) -> Result<Receiver<Metrics>> {
439        let (resp_s, resp_r) = bounded(1);
440        self.send_cmd(Command::GetMetrics(resp_s))?;
441        Ok(resp_r)
442    }
443
444    /// Change the max length allowed for a service name.
445    ///
446    /// As RFC 6763 defines a length max for a service name, a user should not call
447    /// this method unless they have to. See [`SERVICE_NAME_LEN_MAX_DEFAULT`].
448    ///
449    /// `len_max` is capped at an internal limit, which is currently 30.
450    pub fn set_service_name_len_max(&self, len_max: u8) -> Result<()> {
451        const SERVICE_NAME_LEN_MAX_LIMIT: u8 = 30; // Double the default length max.
452
453        if len_max > SERVICE_NAME_LEN_MAX_LIMIT {
454            return Err(Error::Msg(format!(
455                "service name length max {len_max} is too large"
456            )));
457        }
458
459        self.send_cmd(Command::SetOption(DaemonOption::ServiceNameLenMax(len_max)))
460    }
461
462    /// Change the interval for checking IP changes automatically.
463    ///
464    /// Setting the interval to 0 disables the IP check.
465    ///
466    /// See [`IP_CHECK_INTERVAL_IN_SECS_DEFAULT`] for the default interval.
467    pub fn set_ip_check_interval(&self, interval_in_secs: u32) -> Result<()> {
468        let interval_in_millis = interval_in_secs as u64 * 1000;
469        self.send_cmd(Command::SetOption(DaemonOption::IpCheckInterval(
470            interval_in_millis,
471        )))
472    }
473
474    /// Get the current interval in seconds for checking IP changes automatically.
475    pub fn get_ip_check_interval(&self) -> Result<u32> {
476        let (resp_s, resp_r) = bounded(1);
477        self.send_cmd(Command::GetOption(resp_s))?;
478
479        let option = resp_r
480            .recv_timeout(Duration::from_secs(10))
481            .map_err(|e| e_fmt!("failed to receive ip check interval: {}", e))?;
482        let ip_check_interval_in_secs = option.ip_check_interval / 1000;
483        Ok(ip_check_interval_in_secs as u32)
484    }
485
486    /// Include interfaces that match `if_kind` for this service daemon.
487    ///
488    /// For example:
489    /// ```ignore
490    ///     daemon.enable_interface("en0")?;
491    /// ```
492    pub fn enable_interface(&self, if_kind: impl IntoIfKindVec) -> Result<()> {
493        let if_kind_vec = if_kind.into_vec();
494        self.send_cmd(Command::SetOption(DaemonOption::EnableInterface(
495            if_kind_vec.kinds,
496        )))
497    }
498
499    /// Ignore/exclude interfaces that match `if_kind` for this daemon.
500    ///
501    /// For example:
502    /// ```ignore
503    ///     daemon.disable_interface(IfKind::IPv6)?;
504    /// ```
505    pub fn disable_interface(&self, if_kind: impl IntoIfKindVec) -> Result<()> {
506        let if_kind_vec = if_kind.into_vec();
507        self.send_cmd(Command::SetOption(DaemonOption::DisableInterface(
508            if_kind_vec.kinds,
509        )))
510    }
511
512    /// If `accept` is true, accept and cache all responses, even if there is no active querier
513    /// for a given service type. This is useful / necessary when doing cache-only browsing. See
514    /// [browse_cache](Self::browse_cache).
515    ///
516    /// If `accept` is false (default), accept only responses matching queries that we have initiated.
517    ///
518    /// For example:
519    /// ```ignore
520    ///     daemon.accept_unsolicited(true)?;
521    /// ```
522    pub fn accept_unsolicited(&self, accept: bool) -> Result<()> {
523        self.send_cmd(Command::SetOption(DaemonOption::AcceptUnsolicited(accept)))
524    }
525
526    #[cfg(test)]
527    pub fn test_down_interface(&self, ifname: &str) -> Result<()> {
528        self.send_cmd(Command::SetOption(DaemonOption::TestDownInterface(
529            ifname.to_string(),
530        )))
531    }
532
533    #[cfg(test)]
534    pub fn test_up_interface(&self, ifname: &str) -> Result<()> {
535        self.send_cmd(Command::SetOption(DaemonOption::TestUpInterface(
536            ifname.to_string(),
537        )))
538    }
539
540    /// Enable or disable the loopback for locally sent multicast packets in IPv4.
541    ///
542    /// By default, multicast loop is enabled for IPv4. When disabled, a querier will not
543    /// receive announcements from a responder on the same host.
544    ///
545    /// Reference: <https://learn.microsoft.com/en-us/windows/win32/winsock/ip-multicast-2>
546    ///
547    /// "The Winsock version of the IP_MULTICAST_LOOP option is semantically different than
548    /// the UNIX version of the IP_MULTICAST_LOOP option:
549    ///
550    /// In Winsock, the IP_MULTICAST_LOOP option applies only to the receive path.
551    /// In the UNIX version, the IP_MULTICAST_LOOP option applies to the send path."
552    ///
553    /// Which means, in order NOT to receive localhost announcements, you want to call
554    /// this API on the querier side on Windows, but on the responder side on Unix.
555    pub fn set_multicast_loop_v4(&self, on: bool) -> Result<()> {
556        self.send_cmd(Command::SetOption(DaemonOption::MulticastLoopV4(on)))
557    }
558
559    /// Enable or disable the loopback for locally sent multicast packets in IPv6.
560    ///
561    /// By default, multicast loop is enabled for IPv6. When disabled, a querier will not
562    /// receive announcements from a responder on the same host.
563    ///
564    /// Reference: <https://learn.microsoft.com/en-us/windows/win32/winsock/ip-multicast-2>
565    ///
566    /// "The Winsock version of the IP_MULTICAST_LOOP option is semantically different than
567    /// the UNIX version of the IP_MULTICAST_LOOP option:
568    ///
569    /// In Winsock, the IP_MULTICAST_LOOP option applies only to the receive path.
570    /// In the UNIX version, the IP_MULTICAST_LOOP option applies to the send path."
571    ///
572    /// Which means, in order NOT to receive localhost announcements, you want to call
573    /// this API on the querier side on Windows, but on the responder side on Unix.
574    pub fn set_multicast_loop_v6(&self, on: bool) -> Result<()> {
575        self.send_cmd(Command::SetOption(DaemonOption::MulticastLoopV6(on)))
576    }
577
578    /// Proactively confirms whether a service instance still valid.
579    ///
580    /// This call will issue queries for a service instance's SRV record and Address records.
581    ///
582    /// For `timeout`, most users should use [VERIFY_TIMEOUT_DEFAULT]
583    /// unless there is a reason not to follow RFC.
584    ///
585    /// If no response is received within `timeout`, the current resource
586    /// records will be flushed, and if needed, `ServiceRemoved` event will be
587    /// sent to active queriers.
588    ///
589    /// Reference: [RFC 6762](https://datatracker.ietf.org/doc/html/rfc6762#section-10.4)
590    pub fn verify(&self, instance_fullname: String, timeout: Duration) -> Result<()> {
591        self.send_cmd(Command::Verify(instance_fullname, timeout))
592    }
593
594    fn daemon_thread(signal_sock: MioUdpSocket, poller: Poll, receiver: Receiver<Command>) {
595        let mut zc = Zeroconf::new(signal_sock, poller);
596
597        if let Some(cmd) = zc.run(receiver) {
598            match cmd {
599                Command::Exit(resp_s) => {
600                    // It is guaranteed that the receiver already dropped,
601                    // i.e. the daemon command channel closed.
602                    if let Err(e) = resp_s.send(DaemonStatus::Shutdown) {
603                        debug!("exit: failed to send response of shutdown: {}", e);
604                    }
605                }
606                _ => {
607                    debug!("Unexpected command: {:?}", cmd);
608                }
609            }
610        }
611    }
612}
613
614/// Creates a new UDP socket that uses `intf` to send and recv multicast.
615fn _new_socket_bind(intf: &Interface, should_loop: bool) -> Result<MyUdpSocket> {
616    // Use the same socket for receiving and sending multicast packets.
617    // Such socket has to bind to INADDR_ANY or IN6ADDR_ANY.
618    let intf_ip = &intf.ip();
619    match intf_ip {
620        IpAddr::V4(ip) => {
621            let addr = SocketAddrV4::new(Ipv4Addr::new(0, 0, 0, 0), MDNS_PORT);
622            let sock = new_socket(addr.into(), true)?;
623
624            // Join mDNS group to receive packets.
625            sock.join_multicast_v4(&GROUP_ADDR_V4, ip)
626                .map_err(|e| e_fmt!("join multicast group on addr {}: {}", intf_ip, e))?;
627
628            // Set IP_MULTICAST_IF to send packets.
629            sock.set_multicast_if_v4(ip)
630                .map_err(|e| e_fmt!("set multicast_if on addr {}: {}", ip, e))?;
631
632            // Per RFC 6762 section 11:
633            // "All Multicast DNS responses (including responses sent via unicast) SHOULD
634            // be sent with IP TTL set to 255."
635            // Here we set the TTL to 255 for multicast as we don't support unicast yet.
636            sock.set_multicast_ttl_v4(255)
637                .map_err(|e| e_fmt!("set set_multicast_ttl_v4 on addr {}: {}", ip, e))?;
638
639            if !should_loop {
640                sock.set_multicast_loop_v4(false)
641                    .map_err(|e| e_fmt!("failed to set multicast loop v4 for {ip}: {e}"))?;
642            }
643
644            // Test if we can send packets successfully.
645            let multicast_addr = SocketAddrV4::new(GROUP_ADDR_V4, MDNS_PORT).into();
646            let test_packets = DnsOutgoing::new(0).to_data_on_wire();
647            for packet in test_packets {
648                sock.send_to(&packet, &multicast_addr)
649                    .map_err(|e| e_fmt!("send multicast packet on addr {}: {}", ip, e))?;
650            }
651            MyUdpSocket::new(sock)
652                .map_err(|e| e_fmt!("failed to create MySocket for interface {}: {e}", intf.name))
653        }
654        IpAddr::V6(ip) => {
655            let addr = SocketAddrV6::new(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 0), MDNS_PORT, 0, 0);
656            let sock = new_socket(addr.into(), true)?;
657
658            let if_index = intf.index.unwrap_or(0);
659
660            // Join mDNS group to receive packets.
661            sock.join_multicast_v6(&GROUP_ADDR_V6, if_index)
662                .map_err(|e| e_fmt!("join multicast group on addr {}: {}", ip, e))?;
663
664            // Set IPV6_MULTICAST_IF to send packets.
665            sock.set_multicast_if_v6(if_index)
666                .map_err(|e| e_fmt!("set multicast_if on addr {}: {}", ip, e))?;
667
668            // We are not sending multicast packets to test this socket as there might
669            // be many IPv6 interfaces on a host and could cause such send error:
670            // "No buffer space available (os error 55)".
671
672            MyUdpSocket::new(sock)
673                .map_err(|e| e_fmt!("failed to create MySocket for interface {}: {e}", intf.name))
674        }
675    }
676}
677
678/// Creates a new UDP socket to bind to `port` with REUSEPORT option.
679/// `non_block` indicates whether to set O_NONBLOCK for the socket.
680fn new_socket(addr: SocketAddr, non_block: bool) -> Result<PktInfoUdpSocket> {
681    let domain = match addr {
682        SocketAddr::V4(_) => socket2::Domain::IPV4,
683        SocketAddr::V6(_) => socket2::Domain::IPV6,
684    };
685
686    let fd = PktInfoUdpSocket::new(domain).map_err(|e| e_fmt!("create socket failed: {}", e))?;
687
688    fd.set_reuse_address(true)
689        .map_err(|e| e_fmt!("set ReuseAddr failed: {}", e))?;
690    #[cfg(unix)] // this is currently restricted to Unix's in socket2
691    fd.set_reuse_port(true)
692        .map_err(|e| e_fmt!("set ReusePort failed: {}", e))?;
693
694    if non_block {
695        fd.set_nonblocking(true)
696            .map_err(|e| e_fmt!("set O_NONBLOCK: {}", e))?;
697    }
698
699    fd.bind(&addr.into())
700        .map_err(|e| e_fmt!("socket bind to {} failed: {}", &addr, e))?;
701
702    trace!("new socket bind to {}", &addr);
703    Ok(fd)
704}
705
706/// Specify a UNIX timestamp in millis to run `command` for the next time.
707struct ReRun {
708    /// UNIX timestamp in millis.
709    next_time: u64,
710    command: Command,
711}
712
/// Kinds of interfaces, used to enable or to disable interfaces in the daemon.
///
/// For ergonomic reasons, `From<&str>` and `From<IpAddr>` are implemented.
#[non_exhaustive]
#[derive(Clone, Debug)]
pub enum IfKind {
    /// Every interface.
    All,

    /// Every IPv4 interface.
    IPv4,

    /// Every IPv6 interface.
    IPv6,

    /// The interface with the given name, for example "en0"
    Name(String),

    /// The interface with the given IPv4 or IPv6 address.
    Addr(IpAddr),

    /// 127.0.0.1 (or anything in 127.0.0.0/8), disabled by default.
    ///
    /// Use [ServiceDaemon::enable_interface] to support registering services on loopback interfaces,
    /// which is required by some use cases (e.g., OSCQuery) that publish via mDNS.
    LoopbackV4,

    /// ::1/128, disabled by default.
    LoopbackV6,
}
743
744impl IfKind {
745    /// Checks if `intf` matches with this interface kind.
746    fn matches(&self, intf: &Interface) -> bool {
747        match self {
748            Self::All => true,
749            Self::IPv4 => intf.ip().is_ipv4(),
750            Self::IPv6 => intf.ip().is_ipv6(),
751            Self::Name(ifname) => ifname == &intf.name,
752            Self::Addr(addr) => addr == &intf.ip(),
753            Self::LoopbackV4 => intf.is_loopback() && intf.ip().is_ipv4(),
754            Self::LoopbackV6 => intf.is_loopback() && intf.ip().is_ipv6(),
755        }
756    }
757}
758
759/// The first use case of specifying an interface was to
760/// use an interface name. Hence adding this for ergonomic reasons.
761impl From<&str> for IfKind {
762    fn from(val: &str) -> Self {
763        Self::Name(val.to_string())
764    }
765}
766
767impl From<&String> for IfKind {
768    fn from(val: &String) -> Self {
769        Self::Name(val.to_string())
770    }
771}
772
773/// Still for ergonomic reasons.
774impl From<IpAddr> for IfKind {
775    fn from(val: IpAddr) -> Self {
776        Self::Addr(val)
777    }
778}
779
/// A list of `IfKind` that can be used to match interfaces.
///
/// Values are produced through [`IntoIfKindVec::into_vec`], from either a single
/// `IfKind`-convertible value or a `Vec` of them.
pub struct IfKindVec {
    /// The interface kinds, in the order they were supplied.
    kinds: Vec<IfKind>,
}
784
/// A trait that converts a type into a Vec of `IfKind`.
///
/// It is implemented for any single value that converts into `IfKind`
/// and for a `Vec` of such values.
pub trait IntoIfKindVec {
    /// Consumes `self` and returns the equivalent [`IfKindVec`].
    fn into_vec(self) -> IfKindVec;
}
789
790impl<T: Into<IfKind>> IntoIfKindVec for T {
791    fn into_vec(self) -> IfKindVec {
792        let if_kind: IfKind = self.into();
793        IfKindVec {
794            kinds: vec![if_kind],
795        }
796    }
797}
798
799impl<T: Into<IfKind>> IntoIfKindVec for Vec<T> {
800    fn into_vec(self) -> IfKindVec {
801        let kinds: Vec<IfKind> = self.into_iter().map(|x| x.into()).collect();
802        IfKindVec { kinds }
803    }
804}
805
/// Selection of interfaces.
///
/// Selections are kept in the order they were requested and applied in that order,
/// so for a given interface the last matching selection decides whether it is enabled.
struct IfSelection {
    /// The interfaces to be selected.
    if_kind: IfKind,

    /// Whether the `if_kind` should be enabled or not.
    selected: bool,
}
814
/// A struct holding the state. It was inspired by `zeroconf` package in Python.
///
/// All fields are owned by the daemon and mutated from its run loop.
struct Zeroconf {
    /// Local interfaces keyed by interface index.
    my_intfs: HashMap<u32, MyIntf>,

    /// A common socket for IPv4 interfaces. It's None if IPv4 is disabled in OS kernel.
    ipv4_sock: Option<MyUdpSocket>,

    /// A common socket for IPv6 interfaces. It's None if IPv6 is disabled in OS kernel.
    ipv6_sock: Option<MyUdpSocket>,

    /// Local registered services, keyed by service full names.
    my_services: HashMap<String, ServiceInfo>,

    /// Received DNS records.
    cache: DnsCache,

    /// Registered service records, keyed by interface index.
    dns_registry_map: HashMap<u32, DnsRegistry>,

    /// Active "Browse" commands.
    service_queriers: HashMap<String, Sender<ServiceEvent>>, // <ty_domain, channel::sender>

    /// Active "ResolveHostname" commands.
    ///
    /// The timestamps are set at the future timestamp when the command should timeout.
    /// `hostname` is case-insensitive and stored in lowercase.
    hostname_resolvers: HashMap<String, (Sender<HostnameResolutionEvent>, Option<u64>)>, // <hostname, (channel::sender, UNIX timestamp in millis)>

    /// All repeating transmissions.
    retransmissions: Vec<ReRun>,

    /// Metric counters, updated through `increase_counter`.
    counters: Metrics,

    /// Waits for incoming packets.
    poller: Poll,

    /// Channels to notify events.
    monitors: Vec<Sender<DaemonEvent>>,

    /// Option: set via `DaemonOption::ServiceNameLenMax`.
    service_name_len_max: u8,

    /// Interval in millis to check IP address changes.
    /// The run loop treats `0` as "no IP check scheduled".
    ip_check_interval: u64,

    /// All interface selections called to the daemon.
    if_selections: Vec<IfSelection>,

    /// Socket for signaling.
    signal_sock: MioUdpSocket,

    /// Timestamps marking where we need another iteration of the run loop,
    /// to react to events like retransmissions, cache refreshes, interface IP address changes, etc.
    ///
    /// When the run loop goes through a single iteration, it will
    /// set its timeout to the earliest timer in this list.
    timers: BinaryHeap<Reverse<u64>>,

    /// Daemon status: starts as `Running`, set to `Shutdown` when an `Exit` command arrives.
    status: DaemonStatus,

    /// Service instances that are pending for resolving SRV and TXT.
    pending_resolves: HashSet<String>,

    /// Service instances that are already resolved.
    resolved: HashSet<String>,

    /// Last value applied via `set_multicast_loop_v4`; starts as `true`.
    multicast_loop_v4: bool,

    /// Last value applied via `set_multicast_loop_v6`; starts as `true`.
    multicast_loop_v6: bool,

    /// Set via `DaemonOption::AcceptUnsolicited`; starts as `false`.
    // NOTE(review): presumably controls whether unsolicited responses are accepted — confirm at the use site.
    accept_unsolicited: bool,

    /// Test only: names of interfaces that `check_ip_changes` treats as absent.
    #[cfg(test)]
    test_down_interfaces: HashSet<String>,
}
891
892/// Join the multicast group for the given interface.
893fn join_multicast_group(my_sock: &PktInfoUdpSocket, intf: &Interface) -> Result<()> {
894    let intf_ip = &intf.ip();
895    match intf_ip {
896        IpAddr::V4(ip) => {
897            // Join mDNS group to receive packets.
898            debug!("join multicast group V4 on {} addr {ip}", intf.name);
899            my_sock
900                .join_multicast_v4(&GROUP_ADDR_V4, ip)
901                .map_err(|e| e_fmt!("PKT join multicast group on addr {}: {}", intf_ip, e))?;
902        }
903        IpAddr::V6(ip) => {
904            let if_index = intf.index.unwrap_or(0);
905            // Join mDNS group to receive packets.
906            debug!(
907                "join multicast group V6 on {} addr {ip} with index {if_index}",
908                intf.name
909            );
910            my_sock
911                .join_multicast_v6(&GROUP_ADDR_V6, if_index)
912                .map_err(|e| e_fmt!("PKT join multicast group on addr {}: {}", ip, e))?;
913        }
914    }
915    Ok(())
916}
917
918impl Zeroconf {
919    fn new(signal_sock: MioUdpSocket, poller: Poll) -> Self {
920        // Get interfaces.
921        let my_ifaddrs = my_ip_interfaces(false);
922
923        // Create a socket for every IP addr.
924        // Note: it is possible that `my_ifaddrs` contains the same IP addr with different interface names,
925        // or the same interface name with different IP addrs.
926        let mut my_intfs = HashMap::new();
927        let mut dns_registry_map = HashMap::new();
928
929        // Use the same socket for receiving and sending multicast packets.
930        // Such socket has to bind to INADDR_ANY or IN6ADDR_ANY.
931        let mut ipv4_sock = None;
932        let addr = SocketAddrV4::new(Ipv4Addr::new(0, 0, 0, 0), MDNS_PORT);
933        match new_socket(addr.into(), true) {
934            Ok(sock) => {
935                // Per RFC 6762 section 11:
936                // "All Multicast DNS responses (including responses sent via unicast) SHOULD
937                // be sent with IP TTL set to 255."
938                // Here we set the TTL to 255 for multicast as we don't support unicast yet.
939                sock.set_multicast_ttl_v4(255)
940                    .map_err(|e| e_fmt!("set set_multicast_ttl_v4 on addr: {}", e))
941                    .ok();
942
943                // This clones a socket.
944                ipv4_sock = match MyUdpSocket::new(sock) {
945                    Ok(s) => Some(s),
946                    Err(e) => {
947                        debug!("failed to create IPv4 MyUdpSocket: {e}");
948                        None
949                    }
950                };
951            }
952            // Per RFC 6762 section 11:}
953            Err(e) => debug!("failed to create IPv4 socket: {e}"),
954        }
955
956        let mut ipv6_sock = None;
957        let addr = SocketAddrV6::new(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 0), MDNS_PORT, 0, 0);
958        match new_socket(addr.into(), true) {
959            Ok(sock) => {
960                // Per RFC 6762 section 11:
961                // "All Multicast DNS responses (including responses sent via unicast) SHOULD
962                // be sent with IP TTL set to 255."
963                sock.set_multicast_hops_v6(255)
964                    .map_err(|e| e_fmt!("set set_multicast_hops_v6: {}", e))
965                    .ok();
966
967                // This clones the ipv6 socket.
968                ipv6_sock = match MyUdpSocket::new(sock) {
969                    Ok(s) => Some(s),
970                    Err(e) => {
971                        debug!("failed to create IPv6 MyUdpSocket: {e}");
972                        None
973                    }
974                };
975            }
976            Err(e) => debug!("failed to create IPv6 socket: {e}"),
977        }
978
979        // Configure sockets to join multicast groups.
980        for intf in my_ifaddrs {
981            let sock_opt = if intf.ip().is_ipv4() {
982                &ipv4_sock
983            } else {
984                &ipv6_sock
985            };
986            let Some(sock) = sock_opt else {
987                debug!(
988                    "no socket available for interface {} with addr {}. Skipped.",
989                    intf.name,
990                    intf.ip()
991                );
992                continue;
993            };
994
995            if let Err(e) = join_multicast_group(&sock.pktinfo, &intf) {
996                debug!(
997                    "config socket to join multicast: {}: {e}. Skipped.",
998                    &intf.ip()
999                );
1000            }
1001
1002            let if_index = intf.index.unwrap_or(0);
1003
1004            // Add this interface address if not already present.
1005            dns_registry_map
1006                .entry(if_index)
1007                .or_insert_with(DnsRegistry::new);
1008
1009            my_intfs
1010                .entry(if_index)
1011                .and_modify(|v: &mut MyIntf| {
1012                    v.addrs.insert(intf.addr.clone());
1013                })
1014                .or_insert(MyIntf {
1015                    name: intf.name.clone(),
1016                    index: if_index,
1017                    addrs: HashSet::from([intf.addr]),
1018                });
1019        }
1020
1021        let monitors = Vec::new();
1022        let service_name_len_max = SERVICE_NAME_LEN_MAX_DEFAULT;
1023        let ip_check_interval = IP_CHECK_INTERVAL_IN_SECS_DEFAULT as u64 * 1000;
1024
1025        let timers = BinaryHeap::new();
1026
1027        // Disable loopback by default.
1028        let if_selections = vec![
1029            IfSelection {
1030                if_kind: IfKind::LoopbackV4,
1031                selected: false,
1032            },
1033            IfSelection {
1034                if_kind: IfKind::LoopbackV6,
1035                selected: false,
1036            },
1037        ];
1038
1039        let status = DaemonStatus::Running;
1040
1041        Self {
1042            my_intfs,
1043            ipv4_sock,
1044            ipv6_sock,
1045            // poll_ids: HashMap::new(),
1046            my_services: HashMap::new(),
1047            cache: DnsCache::new(),
1048            dns_registry_map,
1049            hostname_resolvers: HashMap::new(),
1050            service_queriers: HashMap::new(),
1051            retransmissions: Vec::new(),
1052            counters: HashMap::new(),
1053            poller,
1054            monitors,
1055            service_name_len_max,
1056            ip_check_interval,
1057            if_selections,
1058            signal_sock,
1059            timers,
1060            status,
1061            pending_resolves: HashSet::new(),
1062            resolved: HashSet::new(),
1063            multicast_loop_v4: true,
1064            multicast_loop_v6: true,
1065            accept_unsolicited: false,
1066
1067            #[cfg(test)]
1068            test_down_interfaces: HashSet::new(),
1069        }
1070    }
1071
1072    /// The main event loop of the daemon thread
1073    ///
1074    /// In each round, it will:
1075    /// 1. select the listening sockets with a timeout.
1076    /// 2. process the incoming packets if any.
1077    /// 3. try_recv on its channel and execute commands.
1078    /// 4. announce its registered services.
1079    /// 5. process retransmissions if any.
1080    fn run(&mut self, receiver: Receiver<Command>) -> Option<Command> {
1081        // Add the daemon's signal socket to the poller.
1082        if let Err(e) = self.poller.registry().register(
1083            &mut self.signal_sock,
1084            mio::Token(SIGNAL_SOCK_EVENT_KEY),
1085            mio::Interest::READABLE,
1086        ) {
1087            debug!("failed to add signal socket to the poller: {}", e);
1088            return None;
1089        }
1090
1091        if let Some(sock) = self.ipv4_sock.as_mut() {
1092            if let Err(e) = self.poller.registry().register(
1093                sock,
1094                mio::Token(IPV4_SOCK_EVENT_KEY),
1095                mio::Interest::READABLE,
1096            ) {
1097                debug!("failed to register ipv4 socket: {}", e);
1098                return None;
1099            }
1100        }
1101
1102        if let Some(sock) = self.ipv6_sock.as_mut() {
1103            if let Err(e) = self.poller.registry().register(
1104                sock,
1105                mio::Token(IPV6_SOCK_EVENT_KEY),
1106                mio::Interest::READABLE,
1107            ) {
1108                debug!("failed to register ipv6 socket: {}", e);
1109                return None;
1110            }
1111        }
1112
1113        // Setup timer for IP checks.
1114        let mut next_ip_check = if self.ip_check_interval > 0 {
1115            current_time_millis() + self.ip_check_interval
1116        } else {
1117            0
1118        };
1119
1120        if next_ip_check > 0 {
1121            self.add_timer(next_ip_check);
1122        }
1123
1124        // Start the run loop.
1125
1126        let mut events = mio::Events::with_capacity(1024);
1127        loop {
1128            let now = current_time_millis();
1129
1130            let earliest_timer = self.peek_earliest_timer();
1131            let timeout = earliest_timer.map(|timer| {
1132                // If `timer` already passed, set `timeout` to be 1ms.
1133                let millis = if timer > now { timer - now } else { 1 };
1134                Duration::from_millis(millis)
1135            });
1136
1137            // Process incoming packets, command events and optional timeout.
1138            events.clear();
1139            match self.poller.poll(&mut events, timeout) {
1140                Ok(_) => self.handle_poller_events(&events),
1141                Err(e) => debug!("failed to select from sockets: {}", e),
1142            }
1143
1144            let now = current_time_millis();
1145
1146            // Remove the timers if already passed.
1147            self.pop_timers_till(now);
1148
1149            // Remove hostname resolvers with expired timeouts.
1150            for hostname in self
1151                .hostname_resolvers
1152                .clone()
1153                .into_iter()
1154                .filter(|(_, (_, timeout))| timeout.map(|t| now >= t).unwrap_or(false))
1155                .map(|(hostname, _)| hostname)
1156            {
1157                trace!("hostname resolver timeout for {}", &hostname);
1158                call_hostname_resolution_listener(
1159                    &self.hostname_resolvers,
1160                    &hostname,
1161                    HostnameResolutionEvent::SearchTimeout(hostname.to_owned()),
1162                );
1163                call_hostname_resolution_listener(
1164                    &self.hostname_resolvers,
1165                    &hostname,
1166                    HostnameResolutionEvent::SearchStopped(hostname.to_owned()),
1167                );
1168                self.hostname_resolvers.remove(&hostname);
1169            }
1170
1171            // process commands from the command channel
1172            while let Ok(command) = receiver.try_recv() {
1173                if matches!(command, Command::Exit(_)) {
1174                    self.status = DaemonStatus::Shutdown;
1175                    return Some(command);
1176                }
1177                self.exec_command(command, false);
1178            }
1179
1180            // check for repeated commands and run them if their time is up.
1181            let mut i = 0;
1182            while i < self.retransmissions.len() {
1183                if now >= self.retransmissions[i].next_time {
1184                    let rerun = self.retransmissions.remove(i);
1185                    self.exec_command(rerun.command, true);
1186                } else {
1187                    i += 1;
1188                }
1189            }
1190
1191            // Refresh cached service records with active queriers
1192            self.refresh_active_services();
1193
1194            // Refresh cached A/AAAA records with active queriers
1195            let mut query_count = 0;
1196            for (hostname, _sender) in self.hostname_resolvers.iter() {
1197                for (hostname, ip_addr) in
1198                    self.cache.refresh_due_hostname_resolutions(hostname).iter()
1199                {
1200                    self.send_query(hostname, ip_address_rr_type(&ip_addr.to_ip_addr()));
1201                    query_count += 1;
1202                }
1203            }
1204
1205            self.increase_counter(Counter::CacheRefreshAddr, query_count);
1206
1207            // check and evict expired records in our cache
1208            let now = current_time_millis();
1209
1210            // Notify service listeners about the expired records.
1211            let expired_services = self.cache.evict_expired_services(now);
1212            if !expired_services.is_empty() {
1213                debug!(
1214                    "run: send {} service removal to listeners",
1215                    expired_services.len()
1216                );
1217                self.notify_service_removal(expired_services);
1218            }
1219
1220            // Notify hostname listeners about the expired records.
1221            let expired_addrs = self.cache.evict_expired_addr(now);
1222            for (hostname, addrs) in expired_addrs {
1223                call_hostname_resolution_listener(
1224                    &self.hostname_resolvers,
1225                    &hostname,
1226                    HostnameResolutionEvent::AddressesRemoved(hostname.clone(), addrs),
1227                );
1228                let instances = self.cache.get_instances_on_host(&hostname);
1229                let instance_set: HashSet<String> = instances.into_iter().collect();
1230                self.resolve_updated_instances(&instance_set);
1231            }
1232
1233            // Send out probing queries.
1234            self.probing_handler();
1235
1236            // check IP changes if next_ip_check is reached.
1237            if now >= next_ip_check && next_ip_check > 0 {
1238                next_ip_check = now + self.ip_check_interval;
1239                self.add_timer(next_ip_check);
1240
1241                self.check_ip_changes();
1242            }
1243        }
1244    }
1245
1246    fn process_set_option(&mut self, daemon_opt: DaemonOption) {
1247        match daemon_opt {
1248            DaemonOption::ServiceNameLenMax(length) => self.service_name_len_max = length,
1249            DaemonOption::IpCheckInterval(interval) => self.ip_check_interval = interval,
1250            DaemonOption::EnableInterface(if_kind) => self.enable_interface(if_kind),
1251            DaemonOption::DisableInterface(if_kind) => self.disable_interface(if_kind),
1252            DaemonOption::MulticastLoopV4(on) => self.set_multicast_loop_v4(on),
1253            DaemonOption::MulticastLoopV6(on) => self.set_multicast_loop_v6(on),
1254            DaemonOption::AcceptUnsolicited(accept) => self.set_accept_unsolicited(accept),
1255            #[cfg(test)]
1256            DaemonOption::TestDownInterface(ifname) => {
1257                self.test_down_interfaces.insert(ifname);
1258            }
1259            #[cfg(test)]
1260            DaemonOption::TestUpInterface(ifname) => {
1261                self.test_down_interfaces.remove(&ifname);
1262            }
1263        }
1264    }
1265
1266    fn enable_interface(&mut self, kinds: Vec<IfKind>) {
1267        debug!("enable_interface: {:?}", kinds);
1268        for if_kind in kinds {
1269            self.if_selections.push(IfSelection {
1270                if_kind,
1271                selected: true,
1272            });
1273        }
1274
1275        self.apply_intf_selections(my_ip_interfaces(true));
1276    }
1277
1278    fn disable_interface(&mut self, kinds: Vec<IfKind>) {
1279        debug!("disable_interface: {:?}", kinds);
1280        for if_kind in kinds {
1281            self.if_selections.push(IfSelection {
1282                if_kind,
1283                selected: false,
1284            });
1285        }
1286
1287        self.apply_intf_selections(my_ip_interfaces(true));
1288    }
1289
1290    fn set_multicast_loop_v4(&mut self, on: bool) {
1291        let Some(sock) = self.ipv4_sock.as_mut() else {
1292            return;
1293        };
1294        self.multicast_loop_v4 = on;
1295        sock.pktinfo
1296            .set_multicast_loop_v4(on)
1297            .map_err(|e| e_fmt!("failed to set multicast loop v4: {}", e))
1298            .unwrap();
1299    }
1300
1301    fn set_multicast_loop_v6(&mut self, on: bool) {
1302        let Some(sock) = self.ipv6_sock.as_mut() else {
1303            return;
1304        };
1305        self.multicast_loop_v6 = on;
1306        sock.pktinfo
1307            .set_multicast_loop_v6(on)
1308            .map_err(|e| e_fmt!("failed to set multicast loop v6: {}", e))
1309            .unwrap();
1310    }
1311
    /// Stores the `accept_unsolicited` option.
    // NOTE(review): the flag is only stored here; where it is consulted is outside this method.
    fn set_accept_unsolicited(&mut self, accept: bool) {
        self.accept_unsolicited = accept;
    }
1315
1316    fn notify_monitors(&mut self, event: DaemonEvent) {
1317        // Only retain the monitors that are still connected.
1318        self.monitors.retain(|sender| {
1319            if let Err(e) = sender.try_send(event.clone()) {
1320                debug!("notify_monitors: try_send: {}", &e);
1321                if matches!(e, TrySendError::Disconnected(_)) {
1322                    return false; // This monitor is dropped.
1323                }
1324            }
1325            true
1326        });
1327    }
1328
1329    /// Remove `addr` in my services that enabled `addr_auto`.
1330    fn del_addr_in_my_services(&mut self, addr: &IpAddr) {
1331        for (_, service_info) in self.my_services.iter_mut() {
1332            if service_info.is_addr_auto() {
1333                service_info.remove_ipaddr(addr);
1334            }
1335        }
1336    }
1337
    /// Schedules another run-loop wake-up at `next_time` (UNIX timestamp in millis).
    ///
    /// Timers are wrapped in `Reverse` so the heap yields the earliest one first.
    fn add_timer(&mut self, next_time: u64) {
        self.timers.push(Reverse(next_time));
    }
1341
1342    fn peek_earliest_timer(&self) -> Option<u64> {
1343        self.timers.peek().map(|Reverse(v)| *v)
1344    }
1345
1346    fn _pop_earliest_timer(&mut self) -> Option<u64> {
1347        self.timers.pop().map(|Reverse(v)| v)
1348    }
1349
1350    /// Pop all timers that are already passed till `now`.
1351    fn pop_timers_till(&mut self, now: u64) {
1352        while let Some(Reverse(v)) = self.timers.peek() {
1353            if *v > now {
1354                break;
1355            }
1356            self.timers.pop();
1357        }
1358    }
1359
1360    /// Apply all selections to `interfaces` and return the selected addresses.
1361    fn selected_addrs(&self, interfaces: Vec<Interface>) -> HashSet<IpAddr> {
1362        let intf_count = interfaces.len();
1363        let mut intf_selections = vec![true; intf_count];
1364
1365        // apply if_selections
1366        for selection in self.if_selections.iter() {
1367            // Mark the interfaces for this selection.
1368            for i in 0..intf_count {
1369                if selection.if_kind.matches(&interfaces[i]) {
1370                    intf_selections[i] = selection.selected;
1371                }
1372            }
1373        }
1374
1375        let mut selected_addrs = HashSet::new();
1376        for i in 0..intf_count {
1377            if intf_selections[i] {
1378                selected_addrs.insert(interfaces[i].addr.ip());
1379            }
1380        }
1381
1382        selected_addrs
1383    }
1384
1385    /// Apply all selections to `interfaces`.
1386    ///
1387    /// For any interface, add it if selected but not bound yet,
1388    /// delete it if not selected but still bound.
1389    fn apply_intf_selections(&mut self, interfaces: Vec<Interface>) {
1390        // By default, we enable all interfaces.
1391        let intf_count = interfaces.len();
1392        let mut intf_selections = vec![true; intf_count];
1393
1394        // apply if_selections
1395        for selection in self.if_selections.iter() {
1396            // Mark the interfaces for this selection.
1397            for i in 0..intf_count {
1398                if selection.if_kind.matches(&interfaces[i]) {
1399                    intf_selections[i] = selection.selected;
1400                }
1401            }
1402        }
1403
1404        // Update `my_intfs` based on the selections.
1405        for (idx, intf) in interfaces.into_iter().enumerate() {
1406            if intf_selections[idx] {
1407                // Add the interface
1408                self.add_interface(intf);
1409            } else {
1410                // Remove the interface
1411                self.del_interface(&intf);
1412            }
1413        }
1414    }
1415
    /// Handles the loss of a local IP address: removes `ip` from the services
    /// that enabled `addr_auto`, then sends `DaemonEvent::IpDel` to the monitors.
    fn del_ip(&mut self, ip: IpAddr) {
        self.del_addr_in_my_services(&ip);
        self.notify_monitors(DaemonEvent::IpDel(ip));
    }
1420
    /// Check for IP changes and update `my_intfs` as needed.
    ///
    /// Compares the tracked interfaces with the ones currently reported by
    /// `my_ip_interfaces(true)`:
    /// - addresses that disappeared are removed and reported via `del_ip`;
    /// - interfaces left without any address, or no longer present, are removed,
    ///   their multicast membership is dropped and their cache records are purged;
    /// - finally the interface selections are re-applied to the current interfaces.
    fn check_ip_changes(&mut self) {
        // Get the current interfaces.
        let my_ifaddrs = my_ip_interfaces(true);

        // In tests, interfaces marked as "down" are treated as if they did not exist.
        #[cfg(test)]
        let my_ifaddrs: Vec<_> = my_ifaddrs
            .into_iter()
            .filter(|intf| !self.test_down_interfaces.contains(&intf.name))
            .collect();

        // Group the current addresses by interface index (0 when the index is unknown).
        let ifaddrs_map: HashMap<u32, Vec<&IfAddr>> =
            my_ifaddrs.iter().fold(HashMap::new(), |mut acc, intf| {
                let if_index = intf.index.unwrap_or(0);
                acc.entry(if_index).or_default().push(&intf.addr);
                acc
            });

        // Collected first and processed after the loop, because `my_intfs` is
        // mutably borrowed while iterating.
        let mut deleted_intfs = Vec::new();
        let mut deleted_ips = Vec::new();

        for (if_index, my_intf) in self.my_intfs.iter_mut() {
            // Remember one removed address per IP family; they are used below
            // when leaving the multicast groups of a removed interface.
            let mut last_ipv4 = None;
            let mut last_ipv6 = None;

            if let Some(current_addrs) = ifaddrs_map.get(if_index) {
                // The interface still exists: drop only the addresses that are gone.
                my_intf.addrs.retain(|addr| {
                    if current_addrs.contains(&addr) {
                        true
                    } else {
                        match addr.ip() {
                            IpAddr::V4(ipv4) => last_ipv4 = Some(ipv4),
                            IpAddr::V6(ipv6) => last_ipv6 = Some(ipv6),
                        }
                        deleted_ips.push(addr.ip());
                        false
                    }
                });
                // No address left: the interface itself is removed as well.
                if my_intf.addrs.is_empty() {
                    deleted_intfs.push((*if_index, last_ipv4, last_ipv6))
                }
            } else {
                // If it does not exist, remove the interface.
                debug!(
                    "check_ip_changes: interface {} ({}) no longer exists, removing",
                    my_intf.name, if_index
                );
                for addr in my_intf.addrs.iter() {
                    match addr.ip() {
                        IpAddr::V4(ipv4) => last_ipv4 = Some(ipv4),
                        IpAddr::V6(ipv6) => last_ipv6 = Some(ipv6),
                    }
                    deleted_ips.push(addr.ip())
                }
                deleted_intfs.push((*if_index, last_ipv4, last_ipv6));
            }
        }

        if !deleted_ips.is_empty() || !deleted_intfs.is_empty() {
            debug!(
                "check_ip_changes: {} deleted ips {} deleted intfs",
                deleted_ips.len(),
                deleted_intfs.len()
            );
        }

        // Update the registered services and notify the monitors for every lost address.
        for ip in deleted_ips {
            self.del_ip(ip);
        }

        for (if_index, last_ipv4, last_ipv6) in deleted_intfs {
            let Some(my_intf) = self.my_intfs.remove(&if_index) else {
                continue;
            };

            // IPv4 membership is left by address.
            if let Some(ipv4) = last_ipv4 {
                debug!("leave multicast for {ipv4}");
                if let Some(sock) = self.ipv4_sock.as_mut() {
                    if let Err(e) = sock.pktinfo.leave_multicast_v4(&GROUP_ADDR_V4, &ipv4) {
                        debug!("leave multicast group for addr {ipv4}: {e}");
                    }
                }
            }

            // IPv6 membership is left by interface index; the address is only logged.
            if let Some(ipv6) = last_ipv6 {
                debug!("leave multicast for {ipv6}");
                if let Some(sock) = self.ipv6_sock.as_mut() {
                    if let Err(e) = sock
                        .pktinfo
                        .leave_multicast_v6(&GROUP_ADDR_V6, my_intf.index)
                    {
                        debug!("leave multicast group for IPv6: {ipv6}: {e}");
                    }
                }
            }

            // Remove cache records for this interface.
            let intf_id = InterfaceId {
                name: my_intf.name.to_string(),
                index: my_intf.index,
            };
            let removed_instances = self.cache.remove_records_on_intf(intf_id);
            self.notify_service_removal(removed_instances);
        }

        // Add newly found interfaces only if in our selections.
        self.apply_intf_selections(my_ifaddrs);
    }
1529
1530    fn del_interface(&mut self, intf: &Interface) {
1531        let if_index = intf.index.unwrap_or(0);
1532        trace!(
1533            "del_interface: {} ({if_index}) addr {}",
1534            intf.name,
1535            intf.ip()
1536        );
1537
1538        let Some(my_intf) = self.my_intfs.get_mut(&if_index) else {
1539            debug!("del_interface: interface {} not found", intf.name);
1540            return;
1541        };
1542
1543        let mut ip_removed = false;
1544
1545        if my_intf.addrs.remove(&intf.addr) {
1546            ip_removed = true;
1547
1548            match intf.addr.ip() {
1549                IpAddr::V4(ipv4) => {
1550                    if my_intf.next_ifaddr_v4().is_none() {
1551                        if let Some(sock) = self.ipv4_sock.as_mut() {
1552                            if let Err(e) = sock.pktinfo.leave_multicast_v4(&GROUP_ADDR_V4, &ipv4) {
1553                                debug!("leave multicast group for addr {ipv4}: {e}");
1554                            }
1555                        }
1556                    }
1557                }
1558
1559                IpAddr::V6(ipv6) => {
1560                    if my_intf.next_ifaddr_v6().is_none() {
1561                        if let Some(sock) = self.ipv6_sock.as_mut() {
1562                            if let Err(e) =
1563                                sock.pktinfo.leave_multicast_v6(&GROUP_ADDR_V6, if_index)
1564                            {
1565                                debug!("leave multicast group for addr {ipv6}: {e}");
1566                            }
1567                        }
1568                    }
1569                }
1570            }
1571
1572            if my_intf.addrs.is_empty() {
1573                // If no more addresses, remove the interface.
1574                debug!("del_interface: removing interface {}", intf.name);
1575                self.my_intfs.remove(&if_index);
1576                self.dns_registry_map.remove(&if_index);
1577                self.cache.remove_addrs_on_disabled_intf(if_index);
1578            }
1579        }
1580
1581        if ip_removed {
1582            // Notify the monitors.
1583            self.notify_monitors(DaemonEvent::IpDel(intf.ip()));
1584            // Remove the interface from my services that enabled `addr_auto`.
1585            self.del_addr_in_my_services(&intf.ip());
1586        }
1587    }
1588
1589    fn add_interface(&mut self, intf: Interface) {
1590        let sock_opt = if intf.ip().is_ipv4() {
1591            &self.ipv4_sock
1592        } else {
1593            &self.ipv6_sock
1594        };
1595
1596        let Some(sock) = sock_opt else {
1597            debug!(
1598                "add_interface: no socket available for interface {} with addr {}. Skipped.",
1599                intf.name,
1600                intf.ip()
1601            );
1602            return;
1603        };
1604
1605        let if_index = intf.index.unwrap_or(0);
1606        let mut new_addr = false;
1607
1608        match self.my_intfs.entry(if_index) {
1609            Entry::Occupied(mut entry) => {
1610                // If intf has a new address, add it to the existing interface.
1611                let my_intf = entry.get_mut();
1612                if !my_intf.addrs.contains(&intf.addr) {
1613                    if let Err(e) = join_multicast_group(&sock.pktinfo, &intf) {
1614                        debug!("add_interface: socket_config {}: {e}", &intf.name);
1615                    }
1616                    my_intf.addrs.insert(intf.addr.clone());
1617                    new_addr = true;
1618                }
1619            }
1620            Entry::Vacant(entry) => {
1621                if let Err(e) = join_multicast_group(&sock.pktinfo, &intf) {
1622                    debug!("add_interface: socket_config {}: {e}. Skipped.", &intf.name);
1623                    return;
1624                }
1625
1626                new_addr = true;
1627                let new_intf = MyIntf {
1628                    name: intf.name.clone(),
1629                    index: if_index,
1630                    addrs: HashSet::from([intf.addr.clone()]),
1631                };
1632                entry.insert(new_intf);
1633            }
1634        }
1635
1636        if !new_addr {
1637            trace!("add_interface: interface {} already exists", &intf.name);
1638            return;
1639        }
1640
1641        debug!("add new interface {}: {}", intf.name, intf.ip());
1642
1643        let Some(my_intf) = self.my_intfs.get(&if_index) else {
1644            debug!("add_interface: cannot find if_index {if_index}");
1645            return;
1646        };
1647
1648        let dns_registry = match self.dns_registry_map.get_mut(&if_index) {
1649            Some(registry) => registry,
1650            None => self
1651                .dns_registry_map
1652                .entry(if_index)
1653                .or_insert_with(DnsRegistry::new),
1654        };
1655
1656        for (_, service_info) in self.my_services.iter_mut() {
1657            if service_info.is_addr_auto() {
1658                let new_ip = intf.ip();
1659                service_info.insert_ipaddr(new_ip);
1660
1661                if announce_service_on_intf(dns_registry, service_info, my_intf, &sock.pktinfo) {
1662                    debug!(
1663                        "Announce service {} on {}",
1664                        service_info.get_fullname(),
1665                        intf.ip()
1666                    );
1667                    service_info.set_status(if_index, ServiceStatus::Announced);
1668                } else {
1669                    for timer in dns_registry.new_timers.drain(..) {
1670                        self.timers.push(Reverse(timer));
1671                    }
1672                    service_info.set_status(if_index, ServiceStatus::Probing);
1673                }
1674            }
1675        }
1676
1677        // As we added a new interface, we want to execute all active "Browse" reruns now.
1678        let mut browse_reruns = Vec::new();
1679        let mut i = 0;
1680        while i < self.retransmissions.len() {
1681            if matches!(self.retransmissions[i].command, Command::Browse(..)) {
1682                browse_reruns.push(self.retransmissions.remove(i));
1683            } else {
1684                i += 1;
1685            }
1686        }
1687
1688        for rerun in browse_reruns {
1689            self.exec_command(rerun.command, true);
1690        }
1691
1692        // Notify the monitors.
1693        self.notify_monitors(DaemonEvent::IpAdd(intf.ip()));
1694    }
1695
1696    /// Registers a service.
1697    ///
1698    /// RFC 6762 section 8.3.
1699    /// ...the Multicast DNS responder MUST send
1700    ///    an unsolicited Multicast DNS response containing, in the Answer
1701    ///    Section, all of its newly registered resource records
1702    ///
1703    /// Zeroconf will then respond to requests for information about this service.
1704    fn register_service(&mut self, mut info: ServiceInfo) {
1705        // Check the service name length.
1706        if let Err(e) = check_service_name_length(info.get_type(), self.service_name_len_max) {
1707            debug!("check_service_name_length: {}", &e);
1708            self.notify_monitors(DaemonEvent::Error(e));
1709            return;
1710        }
1711
1712        if info.is_addr_auto() {
1713            let selected_addrs = self.selected_addrs(my_ip_interfaces(true));
1714            for addr in selected_addrs {
1715                info.insert_ipaddr(addr);
1716            }
1717        }
1718
1719        debug!("register service {:?}", &info);
1720
1721        let outgoing_addrs = self.send_unsolicited_response(&mut info);
1722        if !outgoing_addrs.is_empty() {
1723            self.notify_monitors(DaemonEvent::Announce(
1724                info.get_fullname().to_string(),
1725                format!("{:?}", &outgoing_addrs),
1726            ));
1727        }
1728
1729        // The key has to be lower case letter as DNS record name is case insensitive.
1730        // The info will have the original name.
1731        let service_fullname = info.get_fullname().to_lowercase();
1732        self.my_services.insert(service_fullname, info);
1733    }
1734
1735    /// Sends out announcement of `info` on every valid interface.
1736    /// Returns the list of interface IPs that sent out the announcement.
1737    fn send_unsolicited_response(&mut self, info: &mut ServiceInfo) -> Vec<IpAddr> {
1738        let mut outgoing_addrs = Vec::new();
1739        let mut outgoing_intfs = HashSet::new();
1740
1741        for (if_index, intf) in self.my_intfs.iter() {
1742            let dns_registry = match self.dns_registry_map.get_mut(if_index) {
1743                Some(registry) => registry,
1744                None => self
1745                    .dns_registry_map
1746                    .entry(*if_index)
1747                    .or_insert_with(DnsRegistry::new),
1748            };
1749
1750            let mut announced = false;
1751
1752            // IPv4
1753            if let Some(sock) = self.ipv4_sock.as_mut() {
1754                if announce_service_on_intf(dns_registry, info, intf, &sock.pktinfo) {
1755                    for addr in intf.addrs.iter().filter(|a| a.ip().is_ipv4()) {
1756                        outgoing_addrs.push(addr.ip());
1757                    }
1758                    outgoing_intfs.insert(intf.index);
1759
1760                    debug!(
1761                        "Announce service IPv4 {} on {}",
1762                        info.get_fullname(),
1763                        intf.name
1764                    );
1765                    announced = true;
1766                }
1767            }
1768
1769            if let Some(sock) = self.ipv6_sock.as_mut() {
1770                if announce_service_on_intf(dns_registry, info, intf, &sock.pktinfo) {
1771                    for addr in intf.addrs.iter().filter(|a| a.ip().is_ipv6()) {
1772                        outgoing_addrs.push(addr.ip());
1773                    }
1774                    outgoing_intfs.insert(intf.index);
1775
1776                    debug!(
1777                        "Announce service IPv6 {} on {}",
1778                        info.get_fullname(),
1779                        intf.name
1780                    );
1781                    announced = true;
1782                }
1783            }
1784
1785            if announced {
1786                info.set_status(intf.index, ServiceStatus::Announced);
1787            } else {
1788                for timer in dns_registry.new_timers.drain(..) {
1789                    self.timers.push(Reverse(timer));
1790                }
1791                info.set_status(*if_index, ServiceStatus::Probing);
1792            }
1793        }
1794
1795        // RFC 6762 section 8.3.
1796        // ..The Multicast DNS responder MUST send at least two unsolicited
1797        //    responses, one second apart.
1798        let next_time = current_time_millis() + 1000;
1799        for if_index in outgoing_intfs {
1800            self.add_retransmission(
1801                next_time,
1802                Command::RegisterResend(info.get_fullname().to_string(), if_index),
1803            );
1804        }
1805
1806        outgoing_addrs
1807    }
1808
    /// Sends pending probes, or finishes the ones that have expired, and
    /// announces the services that were waiting on them.
    ///
    /// For every interface in `my_intfs` that has an entry in
    /// `dns_registry_map`:
    /// 1. `check_probing` yields an outgoing message plus the expired probes.
    /// 2. If the message has questions, it is sent on the IPv4 and IPv6
    ///    sockets that exist.
    /// 3. `handle_expired_probes` returns the names of services waiting for
    ///    the finished probes; each one not yet `Announced` on the interface
    ///    is announced. On success a `RegisterResend` is scheduled one second
    ///    later, monitors receive `DaemonEvent::Announce`, and the status for
    ///    the interface becomes `Announced`.
    fn probing_handler(&mut self) {
        let now = current_time_millis();

        for (if_index, intf) in self.my_intfs.iter() {
            // Interfaces without a registry have nothing to probe.
            let Some(dns_registry) = self.dns_registry_map.get_mut(if_index) else {
                continue;
            };

            let (out, expired_probes) = check_probing(dns_registry, &mut self.timers, now);

            // send probing.
            if !out.questions().is_empty() {
                trace!("sending out probing of questions: {:?}", out.questions());
                if let Some(sock) = self.ipv4_sock.as_mut() {
                    send_dns_outgoing(&out, intf, &sock.pktinfo);
                }
                if let Some(sock) = self.ipv6_sock.as_mut() {
                    send_dns_outgoing(&out, intf, &sock.pktinfo);
                }
            }

            // For finished probes, wake up services that are waiting for the probes.
            let waiting_services =
                handle_expired_probes(expired_probes, &intf.name, dns_registry, &mut self.monitors);

            for service_name in waiting_services {
                // service names are lowercase
                if let Some(info) = self.my_services.get_mut(&service_name.to_lowercase()) {
                    if info.get_status(*if_index) == ServiceStatus::Announced {
                        debug!("service {} already announced", info.get_fullname());
                        continue;
                    }

                    // Both families are always attempted (no short-circuit):
                    // IPv4 first, then IPv6.
                    let announced_v4 = if let Some(sock) = self.ipv4_sock.as_mut() {
                        announce_service_on_intf(dns_registry, info, intf, &sock.pktinfo)
                    } else {
                        false
                    };
                    let announced_v6 = if let Some(sock) = self.ipv6_sock.as_mut() {
                        announce_service_on_intf(dns_registry, info, intf, &sock.pktinfo)
                    } else {
                        false
                    };

                    if announced_v4 || announced_v6 {
                        // Schedule the second announcement one second from `now`
                        // and make sure a timer fires for it.
                        let next_time = now + 1000;
                        let command =
                            Command::RegisterResend(info.get_fullname().to_string(), *if_index);
                        self.retransmissions.push(ReRun { next_time, command });
                        self.timers.push(Reverse(next_time));

                        // Report the replacement name if the registry recorded a
                        // name change for this service.
                        let fullname = match dns_registry.name_changes.get(&service_name) {
                            Some(new_name) => new_name.to_string(),
                            None => service_name.to_string(),
                        };

                        // Same for the hostname.
                        let mut hostname = info.get_hostname();
                        if let Some(new_name) = dns_registry.name_changes.get(hostname) {
                            hostname = new_name;
                        }

                        debug!("wake up: announce service {} on {}", fullname, intf.name);
                        notify_monitors(
                            &mut self.monitors,
                            DaemonEvent::Announce(fullname, format!("{}:{}", hostname, &intf.name)),
                        );

                        info.set_status(*if_index, ServiceStatus::Announced);
                    }
                }
            }
        }
    }
1883
1884    fn unregister_service(
1885        &self,
1886        info: &ServiceInfo,
1887        intf: &MyIntf,
1888        sock: &PktInfoUdpSocket,
1889    ) -> Vec<u8> {
1890        let is_ipv4 = sock.domain() == Domain::IPV4;
1891
1892        let mut out = DnsOutgoing::new(FLAGS_QR_RESPONSE | FLAGS_AA);
1893        out.add_answer_at_time(
1894            DnsPointer::new(
1895                info.get_type(),
1896                RRType::PTR,
1897                CLASS_IN,
1898                0,
1899                info.get_fullname().to_string(),
1900            ),
1901            0,
1902        );
1903
1904        if let Some(sub) = info.get_subtype() {
1905            trace!("Adding subdomain {}", sub);
1906            out.add_answer_at_time(
1907                DnsPointer::new(
1908                    sub,
1909                    RRType::PTR,
1910                    CLASS_IN,
1911                    0,
1912                    info.get_fullname().to_string(),
1913                ),
1914                0,
1915            );
1916        }
1917
1918        out.add_answer_at_time(
1919            DnsSrv::new(
1920                info.get_fullname(),
1921                CLASS_IN | CLASS_CACHE_FLUSH,
1922                0,
1923                info.get_priority(),
1924                info.get_weight(),
1925                info.get_port(),
1926                info.get_hostname().to_string(),
1927            ),
1928            0,
1929        );
1930        out.add_answer_at_time(
1931            DnsTxt::new(
1932                info.get_fullname(),
1933                CLASS_IN | CLASS_CACHE_FLUSH,
1934                0,
1935                info.generate_txt(),
1936            ),
1937            0,
1938        );
1939
1940        let if_addrs = if is_ipv4 {
1941            info.get_addrs_on_my_intf_v4(intf)
1942        } else {
1943            info.get_addrs_on_my_intf_v6(intf)
1944        };
1945
1946        if if_addrs.is_empty() {
1947            return vec![];
1948        }
1949
1950        for address in if_addrs {
1951            out.add_answer_at_time(
1952                DnsAddress::new(
1953                    info.get_hostname(),
1954                    ip_address_rr_type(&address),
1955                    CLASS_IN | CLASS_CACHE_FLUSH,
1956                    0,
1957                    address,
1958                    intf.into(),
1959                ),
1960                0,
1961            );
1962        }
1963
1964        // `out` data is non-empty, hence we can do this.
1965        send_dns_outgoing(&out, intf, sock).remove(0)
1966    }
1967
1968    /// Binds a channel `listener` to querying mDNS hostnames.
1969    ///
1970    /// If there is already a `listener`, it will be updated, i.e. overwritten.
1971    fn add_hostname_resolver(
1972        &mut self,
1973        hostname: String,
1974        listener: Sender<HostnameResolutionEvent>,
1975        timeout: Option<u64>,
1976    ) {
1977        let real_timeout = timeout.map(|t| current_time_millis() + t);
1978        self.hostname_resolvers
1979            .insert(hostname.to_lowercase(), (listener, real_timeout));
1980        if let Some(t) = real_timeout {
1981            self.add_timer(t);
1982        }
1983    }
1984
1985    /// Sends a multicast query for `name` with `qtype`.
1986    fn send_query(&self, name: &str, qtype: RRType) {
1987        self.send_query_vec(&[(name, qtype)]);
1988    }
1989
1990    /// Sends out a list of `questions` (i.e. DNS questions) via multicast.
1991    fn send_query_vec(&self, questions: &[(&str, RRType)]) {
1992        trace!("Sending query questions: {:?}", questions);
1993        let mut out = DnsOutgoing::new(FLAGS_QR_QUERY);
1994        let now = current_time_millis();
1995
1996        for (name, qtype) in questions {
1997            out.add_question(name, *qtype);
1998
1999            for record in self.cache.get_known_answers(name, *qtype, now) {
2000                /*
2001                RFC 6762 section 7.1: https://datatracker.ietf.org/doc/html/rfc6762#section-7.1
2002                ...
2003                    When a Multicast DNS querier sends a query to which it already knows
2004                    some answers, it populates the Answer Section of the DNS query
2005                    message with those answers.
2006                 */
2007                trace!("add known answer: {:?}", record.record);
2008                let mut new_record = record.record.clone();
2009                new_record.get_record_mut().update_ttl(now);
2010                out.add_answer_box(new_record);
2011            }
2012        }
2013
2014        for (_, intf) in self.my_intfs.iter() {
2015            if let Some(sock) = self.ipv4_sock.as_ref() {
2016                send_dns_outgoing(&out, intf, &sock.pktinfo);
2017            }
2018            if let Some(sock) = self.ipv6_sock.as_ref() {
2019                send_dns_outgoing(&out, intf, &sock.pktinfo);
2020            }
2021        }
2022    }
2023
2024    /// Reads one UDP datagram from the socket of `intf`.
2025    ///
2026    /// Returns false if failed to receive a packet,
2027    /// otherwise returns true.
2028    fn handle_read(&mut self, event_key: usize) -> bool {
2029        let sock_opt = match event_key {
2030            IPV4_SOCK_EVENT_KEY => &mut self.ipv4_sock,
2031            IPV6_SOCK_EVENT_KEY => &mut self.ipv6_sock,
2032            _ => {
2033                debug!("handle_read: unknown token {}", event_key);
2034                return false;
2035            }
2036        };
2037        let Some(sock) = sock_opt.as_mut() else {
2038            debug!("handle_read: socket not available for token {}", event_key);
2039            return false;
2040        };
2041        let mut buf = vec![0u8; MAX_MSG_ABSOLUTE];
2042
2043        // Read the next mDNS UDP datagram.
2044        //
2045        // If the datagram is larger than `buf`, excess bytes may or may not
2046        // be truncated by the socket layer depending on the platform's libc.
2047        // In any case, such large datagram will not be decoded properly and
2048        // this function should return false but should not crash.
2049        let (sz, pktinfo) = match sock.pktinfo.recv(&mut buf) {
2050            Ok(sz) => sz,
2051            Err(e) => {
2052                if e.kind() != std::io::ErrorKind::WouldBlock {
2053                    debug!("listening socket read failed: {}", e);
2054                }
2055                return false;
2056            }
2057        };
2058
2059        // Find the interface that received the packet.
2060        let pkt_if_index = pktinfo.if_index as u32;
2061        let Some(my_intf) = self.my_intfs.get(&pkt_if_index) else {
2062            debug!(
2063                "handle_read: no interface found for pktinfo if_index: {}",
2064                pktinfo.if_index
2065            );
2066            return true; // We still return true to indicate that we read something.
2067        };
2068
2069        buf.truncate(sz); // reduce potential processing errors
2070
2071        match DnsIncoming::new(buf, my_intf.into()) {
2072            Ok(msg) => {
2073                if msg.is_query() {
2074                    self.handle_query(msg, pkt_if_index, event_key == IPV4_SOCK_EVENT_KEY);
2075                } else if msg.is_response() {
2076                    self.handle_response(msg, pkt_if_index);
2077                } else {
2078                    debug!("Invalid message: not query and not response");
2079                }
2080            }
2081            Err(e) => debug!("Invalid incoming DNS message: {}", e),
2082        }
2083
2084        true
2085    }
2086
2087    /// Returns true, if sent query. Returns false if SRV already exists.
2088    fn query_unresolved(&mut self, instance: &str) -> bool {
2089        if !valid_instance_name(instance) {
2090            trace!("instance name {} not valid", instance);
2091            return false;
2092        }
2093
2094        if let Some(records) = self.cache.get_srv(instance) {
2095            for record in records {
2096                if let Some(srv) = record.record.any().downcast_ref::<DnsSrv>() {
2097                    if self.cache.get_addr(srv.host()).is_none() {
2098                        self.send_query_vec(&[(srv.host(), RRType::A), (srv.host(), RRType::AAAA)]);
2099                        return true;
2100                    }
2101                }
2102            }
2103        } else {
2104            self.send_query(instance, RRType::ANY);
2105            return true;
2106        }
2107
2108        false
2109    }
2110
2111    /// Checks if `ty_domain` has records in the cache. If yes, sends the
2112    /// cached records via `sender`.
2113    fn query_cache_for_service(
2114        &mut self,
2115        ty_domain: &str,
2116        sender: &Sender<ServiceEvent>,
2117        now: u64,
2118    ) {
2119        let mut resolved: HashSet<String> = HashSet::new();
2120        let mut unresolved: HashSet<String> = HashSet::new();
2121
2122        if let Some(records) = self.cache.get_ptr(ty_domain) {
2123            for record in records.iter().filter(|r| !r.record.expires_soon(now)) {
2124                if let Some(ptr) = record.record.any().downcast_ref::<DnsPointer>() {
2125                    let mut new_event = None;
2126                    match self.resolve_service_from_cache(ty_domain, ptr.alias()) {
2127                        Ok(resolved_service) => {
2128                            if resolved_service.is_valid() {
2129                                debug!("Resolved service from cache: {}", ptr.alias());
2130                                new_event =
2131                                    Some(ServiceEvent::ServiceResolved(Box::new(resolved_service)));
2132                            } else {
2133                                debug!("Resolved service is not valid: {}", ptr.alias());
2134                            }
2135                        }
2136                        Err(err) => {
2137                            debug!("Error while resolving service from cache: {}", err);
2138                            continue;
2139                        }
2140                    }
2141
2142                    match sender.send(ServiceEvent::ServiceFound(
2143                        ty_domain.to_string(),
2144                        ptr.alias().to_string(),
2145                    )) {
2146                        Ok(()) => debug!("sent service found {}", ptr.alias()),
2147                        Err(e) => {
2148                            debug!("failed to send service found: {}", e);
2149                            continue;
2150                        }
2151                    }
2152
2153                    if let Some(event) = new_event {
2154                        resolved.insert(ptr.alias().to_string());
2155                        match sender.send(event) {
2156                            Ok(()) => debug!("sent service resolved: {}", ptr.alias()),
2157                            Err(e) => debug!("failed to send service resolved: {}", e),
2158                        }
2159                    } else {
2160                        unresolved.insert(ptr.alias().to_string());
2161                    }
2162                }
2163            }
2164        }
2165
2166        for instance in resolved.drain() {
2167            self.pending_resolves.remove(&instance);
2168            self.resolved.insert(instance);
2169        }
2170
2171        for instance in unresolved.drain() {
2172            self.add_pending_resolve(instance);
2173        }
2174    }
2175
2176    /// Checks if `hostname` has records in the cache. If yes, sends the
2177    /// cached records via `sender`.
2178    fn query_cache_for_hostname(
2179        &mut self,
2180        hostname: &str,
2181        sender: Sender<HostnameResolutionEvent>,
2182    ) {
2183        let addresses_map = self.cache.get_addresses_for_host(hostname);
2184        for (name, addresses) in addresses_map {
2185            match sender.send(HostnameResolutionEvent::AddressesFound(name, addresses)) {
2186                Ok(()) => trace!("sent hostname addresses found"),
2187                Err(e) => debug!("failed to send hostname addresses found: {}", e),
2188            }
2189        }
2190    }
2191
2192    fn add_pending_resolve(&mut self, instance: String) {
2193        if !self.pending_resolves.contains(&instance) {
2194            let next_time = current_time_millis() + RESOLVE_WAIT_IN_MILLIS;
2195            self.add_retransmission(next_time, Command::Resolve(instance.clone(), 1));
2196            self.pending_resolves.insert(instance);
2197        }
2198    }
2199
2200    /// Creates a `ResolvedService` from the cache.
2201    fn resolve_service_from_cache(
2202        &self,
2203        ty_domain: &str,
2204        fullname: &str,
2205    ) -> Result<ResolvedService> {
2206        let now = current_time_millis();
2207        let mut resolved_service = ResolvedService {
2208            ty_domain: ty_domain.to_string(),
2209            sub_ty_domain: None,
2210            fullname: fullname.to_string(),
2211            host: String::new(),
2212            port: 0,
2213            addresses: HashSet::new(),
2214            txt_properties: TxtProperties::new(),
2215        };
2216
2217        // Be sure setting `subtype` if available even when querying for the parent domain.
2218        if let Some(subtype) = self.cache.get_subtype(fullname) {
2219            trace!(
2220                "ty_domain: {} found subtype {} for instance: {}",
2221                ty_domain,
2222                subtype,
2223                fullname
2224            );
2225            if resolved_service.sub_ty_domain.is_none() {
2226                resolved_service.sub_ty_domain = Some(subtype.to_string());
2227            }
2228        }
2229
2230        // resolve SRV record
2231        if let Some(records) = self.cache.get_srv(fullname) {
2232            if let Some(answer) = records.iter().find(|r| !r.record.expires_soon(now)) {
2233                if let Some(dns_srv) = answer.record.any().downcast_ref::<DnsSrv>() {
2234                    resolved_service.host = dns_srv.host().to_string();
2235                    resolved_service.port = dns_srv.port();
2236                }
2237            }
2238        }
2239
2240        // resolve TXT record
2241        if let Some(records) = self.cache.get_txt(fullname) {
2242            if let Some(record) = records.iter().find(|r| !r.record.expires_soon(now)) {
2243                if let Some(dns_txt) = record.record.any().downcast_ref::<DnsTxt>() {
2244                    resolved_service.txt_properties = dns_txt.text().into();
2245                }
2246            }
2247        }
2248
2249        // resolve A and AAAA records
2250        if let Some(records) = self.cache.get_addr(&resolved_service.host) {
2251            for answer in records.iter() {
2252                if let Some(dns_a) = answer.record.any().downcast_ref::<DnsAddress>() {
2253                    if dns_a.expires_soon(now) {
2254                        trace!(
2255                            "Addr expired or expires soon: {}",
2256                            dns_a.address().to_ip_addr()
2257                        );
2258                    } else {
2259                        resolved_service.addresses.insert(dns_a.address());
2260                    }
2261                }
2262            }
2263        }
2264
2265        Ok(resolved_service)
2266    }
2267
2268    fn handle_poller_events(&mut self, events: &mio::Events) {
2269        for ev in events.iter() {
2270            trace!("event received with key {:?}", ev.token());
2271            if ev.token().0 == SIGNAL_SOCK_EVENT_KEY {
2272                // Drain signals as we will drain commands as well.
2273                self.signal_sock_drain();
2274
2275                if let Err(e) = self.poller.registry().reregister(
2276                    &mut self.signal_sock,
2277                    ev.token(),
2278                    mio::Interest::READABLE,
2279                ) {
2280                    debug!("failed to modify poller for signal socket: {}", e);
2281                }
2282                continue; // Next event.
2283            }
2284
2285            // Read until no more packets available.
2286            while self.handle_read(ev.token().0) {}
2287
2288            // we continue to monitor this socket.
2289            if ev.token().0 == IPV4_SOCK_EVENT_KEY {
2290                // Re-register the IPv4 socket for reading.
2291                if let Some(sock) = self.ipv4_sock.as_mut() {
2292                    if let Err(e) =
2293                        self.poller
2294                            .registry()
2295                            .reregister(sock, ev.token(), mio::Interest::READABLE)
2296                    {
2297                        debug!("modify poller for IPv4 socket: {}", e);
2298                    }
2299                }
2300            } else if ev.token().0 == IPV6_SOCK_EVENT_KEY {
2301                // Re-register the IPv6 socket for reading.
2302                if let Some(sock) = self.ipv6_sock.as_mut() {
2303                    if let Err(e) =
2304                        self.poller
2305                            .registry()
2306                            .reregister(sock, ev.token(), mio::Interest::READABLE)
2307                    {
2308                        debug!("modify poller for IPv6 socket: {}", e);
2309                    }
2310                }
2311            }
2312        }
2313    }
2314
2315    /// Deal with incoming response packets.  All answers
2316    /// are held in the cache, and listeners are notified.
2317    fn handle_response(&mut self, mut msg: DnsIncoming, if_index: u32) {
2318        let now = current_time_millis();
2319
2320        // remove records that are expired.
2321        let mut record_predicate = |record: &DnsRecordBox| {
2322            if !record.get_record().is_expired(now) {
2323                return true;
2324            }
2325
2326            debug!("record is expired, removing it from cache.");
2327            if self.cache.remove(record) {
2328                // for PTR records, send event to listeners
2329                if let Some(dns_ptr) = record.any().downcast_ref::<DnsPointer>() {
2330                    call_service_listener(
2331                        &self.service_queriers,
2332                        dns_ptr.get_name(),
2333                        ServiceEvent::ServiceRemoved(
2334                            dns_ptr.get_name().to_string(),
2335                            dns_ptr.alias().to_string(),
2336                        ),
2337                    );
2338                }
2339            }
2340            false
2341        };
2342        msg.answers_mut().retain(&mut record_predicate);
2343        msg.authorities_mut().retain(&mut record_predicate);
2344        msg.additionals_mut().retain(&mut record_predicate);
2345
2346        // check possible conflicts and handle them.
2347        self.conflict_handler(&msg, if_index);
2348
2349        // check if the message is for us.
2350        let mut is_for_us = true; // assume it is for us.
2351
2352        // If there are any PTR records in the answers, there should be
2353        // at least one PTR for us. Otherwise, the message is not for us.
2354        // If there are no PTR records at all, assume this message is for us.
2355        for answer in msg.answers() {
2356            if answer.get_type() == RRType::PTR {
2357                if self.service_queriers.contains_key(answer.get_name()) {
2358                    is_for_us = true;
2359                    break; // OK to break: at least one PTR for us.
2360                } else {
2361                    is_for_us = false;
2362                }
2363            } else if answer.get_type() == RRType::A || answer.get_type() == RRType::AAAA {
2364                // If there is a hostname querier for this address, then it is for us.
2365                let answer_lowercase = answer.get_name().to_lowercase();
2366                if self.hostname_resolvers.contains_key(&answer_lowercase) {
2367                    is_for_us = true;
2368                    break; // OK to break: at least one hostname for us.
2369                }
2370            }
2371        }
2372
2373        // if we explicitily want to accept unsolicited responses, we should consider all messages as for us.
2374        if self.accept_unsolicited {
2375            is_for_us = true;
2376        }
2377
2378        /// Represents a DNS record change that involves one service instance.
2379        struct InstanceChange {
2380            ty: RRType,   // The type of DNS record for the instance.
2381            name: String, // The name of the record.
2382        }
2383
2384        // Go through all answers to get the new and updated records.
2385        // For new PTR records, send out ServiceFound immediately. For others,
2386        // collect them into `changes`.
2387        //
2388        // Note: we don't try to identify the update instances based on
2389        // each record immediately as the answers are likely related to each
2390        // other.
2391        let mut changes = Vec::new();
2392        let mut timers = Vec::new();
2393        let Some(my_intf) = self.my_intfs.get(&if_index) else {
2394            return;
2395        };
2396        for record in msg.all_records() {
2397            match self
2398                .cache
2399                .add_or_update(my_intf, record, &mut timers, is_for_us)
2400            {
2401                Some((dns_record, true)) => {
2402                    timers.push(dns_record.record.get_record().get_expire_time());
2403                    timers.push(dns_record.record.get_record().get_refresh_time());
2404
2405                    let ty = dns_record.record.get_type();
2406                    let name = dns_record.record.get_name();
2407
2408                    // Only process PTR that does not expire soon (i.e. TTL > 1).
2409                    if ty == RRType::PTR && dns_record.record.get_record().get_ttl() > 1 {
2410                        if self.service_queriers.contains_key(name) {
2411                            timers.push(dns_record.record.get_record().get_refresh_time());
2412                        }
2413
2414                        // send ServiceFound
2415                        if let Some(dns_ptr) = dns_record.record.any().downcast_ref::<DnsPointer>()
2416                        {
2417                            debug!("calling listener with service found: {name}");
2418                            call_service_listener(
2419                                &self.service_queriers,
2420                                name,
2421                                ServiceEvent::ServiceFound(
2422                                    name.to_string(),
2423                                    dns_ptr.alias().to_string(),
2424                                ),
2425                            );
2426                            changes.push(InstanceChange {
2427                                ty,
2428                                name: dns_ptr.alias().to_string(),
2429                            });
2430                        }
2431                    } else {
2432                        changes.push(InstanceChange {
2433                            ty,
2434                            name: name.to_string(),
2435                        });
2436                    }
2437                }
2438                Some((dns_record, false)) => {
2439                    timers.push(dns_record.record.get_record().get_expire_time());
2440                    timers.push(dns_record.record.get_record().get_refresh_time());
2441                }
2442                _ => {}
2443            }
2444        }
2445
2446        // Add timers for the new records.
2447        for t in timers {
2448            self.add_timer(t);
2449        }
2450
2451        // Go through remaining changes to see if any hostname resolutions were found or updated.
2452        for change in changes
2453            .iter()
2454            .filter(|change| change.ty == RRType::A || change.ty == RRType::AAAA)
2455        {
2456            let addr_map = self.cache.get_addresses_for_host(&change.name);
2457            for (name, addresses) in addr_map {
2458                call_hostname_resolution_listener(
2459                    &self.hostname_resolvers,
2460                    &change.name,
2461                    HostnameResolutionEvent::AddressesFound(name, addresses),
2462                )
2463            }
2464        }
2465
2466        // Identify the instances that need to be "resolved".
2467        let mut updated_instances = HashSet::new();
2468        for update in changes {
2469            match update.ty {
2470                RRType::PTR | RRType::SRV | RRType::TXT => {
2471                    updated_instances.insert(update.name);
2472                }
2473                RRType::A | RRType::AAAA => {
2474                    let instances = self.cache.get_instances_on_host(&update.name);
2475                    updated_instances.extend(instances);
2476                }
2477                _ => {}
2478            }
2479        }
2480
2481        self.resolve_updated_instances(&updated_instances);
2482    }
2483
2484    fn conflict_handler(&mut self, msg: &DnsIncoming, if_index: u32) {
2485        let Some(my_intf) = self.my_intfs.get(&if_index) else {
2486            debug!("handle_response: no intf found for index {if_index}");
2487            return;
2488        };
2489
2490        let Some(dns_registry) = self.dns_registry_map.get_mut(&if_index) else {
2491            return;
2492        };
2493
2494        for answer in msg.answers().iter() {
2495            let mut new_records = Vec::new();
2496
2497            let name = answer.get_name();
2498            let Some(probe) = dns_registry.probing.get_mut(name) else {
2499                continue;
2500            };
2501
2502            // check against possible multicast forwarding
2503            if answer.get_type() == RRType::A || answer.get_type() == RRType::AAAA {
2504                if let Some(answer_addr) = answer.any().downcast_ref::<DnsAddress>() {
2505                    if answer_addr.interface_id.index != if_index {
2506                        debug!(
2507                            "conflict handler: answer addr {:?} not in the subnet of intf {}",
2508                            answer_addr, my_intf.name
2509                        );
2510                        continue;
2511                    }
2512                }
2513
2514                // double check if any other address record matches rrdata,
2515                // as there could be multiple addresses for the same name.
2516                let any_match = probe.records.iter().any(|r| {
2517                    r.get_type() == answer.get_type()
2518                        && r.get_class() == answer.get_class()
2519                        && r.rrdata_match(answer.as_ref())
2520                });
2521                if any_match {
2522                    continue; // no conflict for this answer.
2523                }
2524            }
2525
2526            probe.records.retain(|record| {
2527                if record.get_type() == answer.get_type()
2528                    && record.get_class() == answer.get_class()
2529                    && !record.rrdata_match(answer.as_ref())
2530                {
2531                    debug!(
2532                        "found conflict name: '{name}' record: {}: {} PEER: {}",
2533                        record.get_type(),
2534                        record.rdata_print(),
2535                        answer.rdata_print()
2536                    );
2537
2538                    // create a new name for this record
2539                    // then remove the old record in probing.
2540                    let mut new_record = record.clone();
2541                    let new_name = match record.get_type() {
2542                        RRType::A => hostname_change(name),
2543                        RRType::AAAA => hostname_change(name),
2544                        _ => name_change(name),
2545                    };
2546                    new_record.get_record_mut().set_new_name(new_name);
2547                    new_records.push(new_record);
2548                    return false; // old record is dropped from the probe.
2549                }
2550
2551                true
2552            });
2553
2554            // ?????
2555            // if probe.records.is_empty() {
2556            //     dns_registry.probing.remove(name);
2557            // }
2558
2559            // Probing again with the new names.
2560            let create_time = current_time_millis() + fastrand::u64(0..250);
2561
2562            let waiting_services = probe.waiting_services.clone();
2563
2564            for record in new_records {
2565                if dns_registry.update_hostname(name, record.get_name(), create_time) {
2566                    self.timers.push(Reverse(create_time));
2567                }
2568
2569                // remember the name changes (note: `name` might not be the original, it could be already changed once.)
2570                dns_registry.name_changes.insert(
2571                    record.get_record().get_original_name().to_string(),
2572                    record.get_name().to_string(),
2573                );
2574
2575                let new_probe = match dns_registry.probing.get_mut(record.get_name()) {
2576                    Some(p) => p,
2577                    None => {
2578                        let new_probe = dns_registry
2579                            .probing
2580                            .entry(record.get_name().to_string())
2581                            .or_insert_with(|| {
2582                                debug!("conflict handler: new probe of {}", record.get_name());
2583                                Probe::new(create_time)
2584                            });
2585                        self.timers.push(Reverse(new_probe.next_send));
2586                        new_probe
2587                    }
2588                };
2589
2590                debug!(
2591                    "insert record with new name '{}' {} into probe",
2592                    record.get_name(),
2593                    record.get_type()
2594                );
2595                new_probe.insert_record(record);
2596
2597                new_probe.waiting_services.extend(waiting_services.clone());
2598            }
2599        }
2600    }
2601
2602    /// Resolve the updated (including new) instances.
2603    ///
2604    /// Note: it is possible that more than 1 PTR pointing to the same
2605    /// instance. For example, a regular service type PTR and a sub-type
2606    /// service type PTR can both point to the same service instance.
2607    /// This loop automatically handles the sub-type PTRs.
2608    fn resolve_updated_instances(&mut self, updated_instances: &HashSet<String>) {
2609        let mut resolved: HashSet<String> = HashSet::new();
2610        let mut unresolved: HashSet<String> = HashSet::new();
2611        let mut removed_instances = HashMap::new();
2612
2613        let now = current_time_millis();
2614
2615        for (ty_domain, records) in self.cache.all_ptr().iter() {
2616            if !self.service_queriers.contains_key(ty_domain) {
2617                // No need to resolve if not in our queries.
2618                continue;
2619            }
2620
2621            for ptr in records.iter().filter(|r| !r.record.expires_soon(now)) {
2622                let Some(dns_ptr) = ptr.record.any().downcast_ref::<DnsPointer>() else {
2623                    continue;
2624                };
2625
2626                let instance = dns_ptr.alias();
2627                if !updated_instances.contains(instance) {
2628                    continue;
2629                }
2630
2631                let Ok(resolved_service) = self.resolve_service_from_cache(ty_domain, instance)
2632                else {
2633                    continue;
2634                };
2635
2636                debug!("resolve_updated_instances: from cache: {instance}");
2637                if resolved_service.is_valid() {
2638                    debug!("call queriers to resolve {instance}");
2639                    resolved.insert(instance.to_string());
2640                    let event = ServiceEvent::ServiceResolved(Box::new(resolved_service));
2641                    call_service_listener(&self.service_queriers, ty_domain, event);
2642                } else {
2643                    debug!("Resolved service is not valid: {instance}");
2644                    if self.resolved.remove(dns_ptr.alias()) {
2645                        removed_instances
2646                            .entry(ty_domain.to_string())
2647                            .or_insert_with(HashSet::new)
2648                            .insert(instance.to_string());
2649                    }
2650                    unresolved.insert(instance.to_string());
2651                }
2652            }
2653        }
2654
2655        for instance in resolved.drain() {
2656            self.pending_resolves.remove(&instance);
2657            self.resolved.insert(instance);
2658        }
2659
2660        for instance in unresolved.drain() {
2661            self.add_pending_resolve(instance);
2662        }
2663
2664        if !removed_instances.is_empty() {
2665            debug!(
2666                "resolve_updated_instances: removed {}",
2667                &removed_instances.len()
2668            );
2669            self.notify_service_removal(removed_instances);
2670        }
2671    }
2672
2673    /// Handle incoming query packets, figure out whether and what to respond.
2674    fn handle_query(&mut self, msg: DnsIncoming, if_index: u32, is_ipv4: bool) {
2675        let sock_opt = if is_ipv4 {
2676            &self.ipv4_sock
2677        } else {
2678            &self.ipv6_sock
2679        };
2680        let Some(sock) = sock_opt.as_ref() else {
2681            debug!("handle_query: socket not available for intf {}", if_index);
2682            return;
2683        };
2684        let mut out = DnsOutgoing::new(FLAGS_QR_RESPONSE | FLAGS_AA);
2685
2686        // Special meta-query "_services._dns-sd._udp.<Domain>".
2687        // See https://datatracker.ietf.org/doc/html/rfc6763#section-9
2688        const META_QUERY: &str = "_services._dns-sd._udp.local.";
2689
2690        let Some(dns_registry) = self.dns_registry_map.get_mut(&if_index) else {
2691            debug!("missing dns registry for intf {}", if_index);
2692            return;
2693        };
2694
2695        let Some(intf) = self.my_intfs.get(&if_index) else {
2696            return;
2697        };
2698
2699        for question in msg.questions().iter() {
2700            let qtype = question.entry_type();
2701
2702            if qtype == RRType::PTR {
2703                for service in self.my_services.values() {
2704                    if service.get_status(if_index) != ServiceStatus::Announced {
2705                        continue;
2706                    }
2707
2708                    if question.entry_name() == service.get_type()
2709                        || service
2710                            .get_subtype()
2711                            .as_ref()
2712                            .is_some_and(|v| v == question.entry_name())
2713                    {
2714                        add_answer_with_additionals(
2715                            &mut out,
2716                            &msg,
2717                            service,
2718                            intf,
2719                            dns_registry,
2720                            is_ipv4,
2721                        );
2722                    } else if question.entry_name() == META_QUERY {
2723                        let ptr_added = out.add_answer(
2724                            &msg,
2725                            DnsPointer::new(
2726                                question.entry_name(),
2727                                RRType::PTR,
2728                                CLASS_IN,
2729                                service.get_other_ttl(),
2730                                service.get_type().to_string(),
2731                            ),
2732                        );
2733                        if !ptr_added {
2734                            trace!("answer was not added for meta-query {:?}", &question);
2735                        }
2736                    }
2737                }
2738            } else {
2739                // Simultaneous Probe Tiebreaking (RFC 6762 section 8.2)
2740                if qtype == RRType::ANY && msg.num_authorities() > 0 {
2741                    let probe_name = question.entry_name();
2742
2743                    if let Some(probe) = dns_registry.probing.get_mut(probe_name) {
2744                        let now = current_time_millis();
2745
2746                        // Only do tiebreaking if probe already started.
2747                        // This check also helps avoid redo tiebreaking if start time
2748                        // was postponed.
2749                        if probe.start_time < now {
2750                            let incoming_records: Vec<_> = msg
2751                                .authorities()
2752                                .iter()
2753                                .filter(|r| r.get_name() == probe_name)
2754                                .collect();
2755
2756                            probe.tiebreaking(&incoming_records, now, probe_name);
2757                        }
2758                    }
2759                }
2760
2761                if qtype == RRType::A || qtype == RRType::AAAA || qtype == RRType::ANY {
2762                    for service in self.my_services.values() {
2763                        if service.get_status(if_index) != ServiceStatus::Announced {
2764                            continue;
2765                        }
2766
2767                        let service_hostname =
2768                            match dns_registry.name_changes.get(service.get_hostname()) {
2769                                Some(new_name) => new_name,
2770                                None => service.get_hostname(),
2771                            };
2772
2773                        if service_hostname.to_lowercase() == question.entry_name().to_lowercase() {
2774                            let intf_addrs = if is_ipv4 {
2775                                service.get_addrs_on_my_intf_v4(intf)
2776                            } else {
2777                                service.get_addrs_on_my_intf_v6(intf)
2778                            };
2779                            if intf_addrs.is_empty()
2780                                && (qtype == RRType::A || qtype == RRType::AAAA)
2781                            {
2782                                let t = match qtype {
2783                                    RRType::A => "TYPE_A",
2784                                    RRType::AAAA => "TYPE_AAAA",
2785                                    _ => "invalid_type",
2786                                };
2787                                trace!(
2788                                    "Cannot find valid addrs for {} response on intf {:?}",
2789                                    t,
2790                                    &intf
2791                                );
2792                                return;
2793                            }
2794                            for address in intf_addrs {
2795                                out.add_answer(
2796                                    &msg,
2797                                    DnsAddress::new(
2798                                        service_hostname,
2799                                        ip_address_rr_type(&address),
2800                                        CLASS_IN | CLASS_CACHE_FLUSH,
2801                                        service.get_host_ttl(),
2802                                        address,
2803                                        intf.into(),
2804                                    ),
2805                                );
2806                            }
2807                        }
2808                    }
2809                }
2810
2811                let query_name = question.entry_name().to_lowercase();
2812                let service_opt = self
2813                    .my_services
2814                    .iter()
2815                    .find(|(k, _v)| {
2816                        let service_name = match dns_registry.name_changes.get(k.as_str()) {
2817                            Some(new_name) => new_name,
2818                            None => k,
2819                        };
2820                        service_name == &query_name
2821                    })
2822                    .map(|(_, v)| v);
2823
2824                let Some(service) = service_opt else {
2825                    continue;
2826                };
2827
2828                if service.get_status(if_index) != ServiceStatus::Announced {
2829                    continue;
2830                }
2831
2832                let intf_addrs = if is_ipv4 {
2833                    service.get_addrs_on_my_intf_v4(intf)
2834                } else {
2835                    service.get_addrs_on_my_intf_v6(intf)
2836                };
2837                if intf_addrs.is_empty() {
2838                    debug!(
2839                        "Cannot find valid addrs for TYPE_SRV response on intf {:?}",
2840                        &intf
2841                    );
2842                    continue;
2843                }
2844
2845                add_answer_of_service(
2846                    &mut out,
2847                    &msg,
2848                    question.entry_name(),
2849                    service,
2850                    qtype,
2851                    intf_addrs,
2852                );
2853            }
2854        }
2855
2856        if !out.answers_count() > 0 {
2857            out.set_id(msg.id());
2858            send_dns_outgoing(&out, intf, &sock.pktinfo);
2859
2860            let if_name = intf.name.clone();
2861
2862            self.increase_counter(Counter::Respond, 1);
2863            self.notify_monitors(DaemonEvent::Respond(if_name));
2864        }
2865
2866        self.increase_counter(Counter::KnownAnswerSuppression, out.known_answer_count());
2867    }
2868
2869    /// Increases the value of `counter` by `count`.
2870    fn increase_counter(&mut self, counter: Counter, count: i64) {
2871        let key = counter.to_string();
2872        match self.counters.get_mut(&key) {
2873            Some(v) => *v += count,
2874            None => {
2875                self.counters.insert(key, count);
2876            }
2877        }
2878    }
2879
2880    /// Sets the value of `counter` to `count`.
2881    fn set_counter(&mut self, counter: Counter, count: i64) {
2882        let key = counter.to_string();
2883        self.counters.insert(key, count);
2884    }
2885
2886    fn signal_sock_drain(&self) {
2887        let mut signal_buf = [0; 1024];
2888
2889        // This recv is non-blocking as the socket is non-blocking.
2890        while let Ok(sz) = self.signal_sock.recv(&mut signal_buf) {
2891            trace!(
2892                "signal socket recvd: {}",
2893                String::from_utf8_lossy(&signal_buf[0..sz])
2894            );
2895        }
2896    }
2897
2898    fn add_retransmission(&mut self, next_time: u64, command: Command) {
2899        self.retransmissions.push(ReRun { next_time, command });
2900        self.add_timer(next_time);
2901    }
2902
2903    /// Sends service removal event to listeners for expired service records.
2904    /// `expired`: map of service type domain to set of instance names.
2905    fn notify_service_removal(&self, expired: HashMap<String, HashSet<String>>) {
2906        for (ty_domain, sender) in self.service_queriers.iter() {
2907            if let Some(instances) = expired.get(ty_domain) {
2908                for instance_name in instances {
2909                    let event = ServiceEvent::ServiceRemoved(
2910                        ty_domain.to_string(),
2911                        instance_name.to_string(),
2912                    );
2913                    match sender.send(event) {
2914                        Ok(()) => debug!("notify_service_removal: sent ServiceRemoved to listener of {ty_domain}: {instance_name}"),
2915                        Err(e) => debug!("Failed to send event: {}", e),
2916                    }
2917                }
2918            }
2919        }
2920    }
2921
2922    /// The entry point that executes all commands received by the daemon.
2923    ///
2924    /// `repeating`: whether this is a retransmission.
2925    fn exec_command(&mut self, command: Command, repeating: bool) {
2926        trace!("exec_command: {:?} repeating: {}", &command, repeating);
2927        match command {
2928            Command::Browse(ty, next_delay, cache_only, listener) => {
2929                self.exec_command_browse(repeating, ty, next_delay, cache_only, listener);
2930            }
2931
2932            Command::ResolveHostname(hostname, next_delay, listener, timeout) => {
2933                self.exec_command_resolve_hostname(
2934                    repeating, hostname, next_delay, listener, timeout,
2935                );
2936            }
2937
2938            Command::Register(service_info) => {
2939                self.register_service(service_info);
2940                self.increase_counter(Counter::Register, 1);
2941            }
2942
2943            Command::RegisterResend(fullname, intf) => {
2944                trace!("register-resend service: {fullname} on {}", &intf);
2945                self.exec_command_register_resend(fullname, intf);
2946            }
2947
2948            Command::Unregister(fullname, resp_s) => {
2949                trace!("unregister service {} repeat {}", &fullname, &repeating);
2950                self.exec_command_unregister(repeating, fullname, resp_s);
2951            }
2952
2953            Command::UnregisterResend(packet, if_index, is_ipv4) => {
2954                self.exec_command_unregister_resend(packet, if_index, is_ipv4);
2955            }
2956
2957            Command::StopBrowse(ty_domain) => self.exec_command_stop_browse(ty_domain),
2958
2959            Command::StopResolveHostname(hostname) => {
2960                self.exec_command_stop_resolve_hostname(hostname.to_lowercase())
2961            }
2962
2963            Command::Resolve(instance, try_count) => self.exec_command_resolve(instance, try_count),
2964
2965            Command::GetMetrics(resp_s) => self.exec_command_get_metrics(resp_s),
2966
2967            Command::GetStatus(resp_s) => match resp_s.send(self.status.clone()) {
2968                Ok(()) => trace!("Sent status to the client"),
2969                Err(e) => debug!("Failed to send status: {}", e),
2970            },
2971
2972            Command::Monitor(resp_s) => {
2973                self.monitors.push(resp_s);
2974            }
2975
2976            Command::SetOption(daemon_opt) => {
2977                self.process_set_option(daemon_opt);
2978            }
2979
2980            Command::GetOption(resp_s) => {
2981                let val = DaemonOptionVal {
2982                    _service_name_len_max: self.service_name_len_max,
2983                    ip_check_interval: self.ip_check_interval,
2984                };
2985                if let Err(e) = resp_s.send(val) {
2986                    debug!("Failed to send options: {}", e);
2987                }
2988            }
2989
2990            Command::Verify(instance_fullname, timeout) => {
2991                self.exec_command_verify(instance_fullname, timeout, repeating);
2992            }
2993
2994            _ => {
2995                debug!("unexpected command: {:?}", &command);
2996            }
2997        }
2998    }
2999
3000    fn exec_command_get_metrics(&mut self, resp_s: Sender<HashMap<String, i64>>) {
3001        self.set_counter(Counter::CachedPTR, self.cache.ptr_count() as i64);
3002        self.set_counter(Counter::CachedSRV, self.cache.srv_count() as i64);
3003        self.set_counter(Counter::CachedAddr, self.cache.addr_count() as i64);
3004        self.set_counter(Counter::CachedTxt, self.cache.txt_count() as i64);
3005        self.set_counter(Counter::CachedNSec, self.cache.nsec_count() as i64);
3006        self.set_counter(Counter::CachedSubtype, self.cache.subtype_count() as i64);
3007        self.set_counter(Counter::Timer, self.timers.len() as i64);
3008
3009        let dns_registry_probe_count: usize = self
3010            .dns_registry_map
3011            .values()
3012            .map(|r| r.probing.len())
3013            .sum();
3014        self.set_counter(Counter::DnsRegistryProbe, dns_registry_probe_count as i64);
3015
3016        let dns_registry_active_count: usize = self
3017            .dns_registry_map
3018            .values()
3019            .map(|r| r.active.values().map(|a| a.len()).sum::<usize>())
3020            .sum();
3021        self.set_counter(Counter::DnsRegistryActive, dns_registry_active_count as i64);
3022
3023        let dns_registry_timer_count: usize = self
3024            .dns_registry_map
3025            .values()
3026            .map(|r| r.new_timers.len())
3027            .sum();
3028        self.set_counter(Counter::DnsRegistryTimer, dns_registry_timer_count as i64);
3029
3030        let dns_registry_name_change_count: usize = self
3031            .dns_registry_map
3032            .values()
3033            .map(|r| r.name_changes.len())
3034            .sum();
3035        self.set_counter(
3036            Counter::DnsRegistryNameChange,
3037            dns_registry_name_change_count as i64,
3038        );
3039
3040        // Send the metrics to the client.
3041        if let Err(e) = resp_s.send(self.counters.clone()) {
3042            debug!("Failed to send metrics: {}", e);
3043        }
3044    }
3045
3046    fn exec_command_browse(
3047        &mut self,
3048        repeating: bool,
3049        ty: String,
3050        next_delay: u32,
3051        cache_only: bool,
3052        listener: Sender<ServiceEvent>,
3053    ) {
3054        let pretty_addrs: Vec<String> = self
3055            .my_intfs
3056            .iter()
3057            .map(|(if_index, itf)| format!("{} ({if_index})", itf.name))
3058            .collect();
3059
3060        if let Err(e) = listener.send(ServiceEvent::SearchStarted(format!(
3061            "{ty} on {} interfaces [{}]",
3062            pretty_addrs.len(),
3063            pretty_addrs.join(", ")
3064        ))) {
3065            debug!(
3066                "Failed to send SearchStarted({})(repeating:{}): {}",
3067                &ty, repeating, e
3068            );
3069            return;
3070        }
3071
3072        let now = current_time_millis();
3073        if !repeating {
3074            // Binds a `listener` to querying mDNS domain type `ty`.
3075            //
3076            // If there is already a `listener`, it will be updated, i.e. overwritten.
3077            self.service_queriers.insert(ty.clone(), listener.clone());
3078
3079            // if we already have the records in our cache, just send them
3080            self.query_cache_for_service(&ty, &listener, now);
3081        }
3082
3083        if cache_only {
3084            // If cache_only is true, we do not send a query.
3085            match listener.send(ServiceEvent::SearchStopped(ty.clone())) {
3086                Ok(()) => debug!("SearchStopped sent for {}", &ty),
3087                Err(e) => debug!("Failed to send SearchStopped: {}", e),
3088            }
3089            return;
3090        }
3091
3092        self.send_query(&ty, RRType::PTR);
3093        self.increase_counter(Counter::Browse, 1);
3094
3095        let next_time = now + (next_delay * 1000) as u64;
3096        let max_delay = 60 * 60;
3097        let delay = cmp::min(next_delay * 2, max_delay);
3098        self.add_retransmission(next_time, Command::Browse(ty, delay, cache_only, listener));
3099    }
3100
3101    fn exec_command_resolve_hostname(
3102        &mut self,
3103        repeating: bool,
3104        hostname: String,
3105        next_delay: u32,
3106        listener: Sender<HostnameResolutionEvent>,
3107        timeout: Option<u64>,
3108    ) {
3109        let addr_list: Vec<_> = self.my_intfs.iter().collect();
3110        if let Err(e) = listener.send(HostnameResolutionEvent::SearchStarted(format!(
3111            "{} on addrs {:?}",
3112            &hostname, &addr_list
3113        ))) {
3114            debug!(
3115                "Failed to send ResolveStarted({})(repeating:{}): {}",
3116                &hostname, repeating, e
3117            );
3118            return;
3119        }
3120        if !repeating {
3121            self.add_hostname_resolver(hostname.to_owned(), listener.clone(), timeout);
3122            // if we already have the records in our cache, just send them
3123            self.query_cache_for_hostname(&hostname, listener.clone());
3124        }
3125
3126        self.send_query_vec(&[(&hostname, RRType::A), (&hostname, RRType::AAAA)]);
3127        self.increase_counter(Counter::ResolveHostname, 1);
3128
3129        let now = current_time_millis();
3130        let next_time = now + u64::from(next_delay) * 1000;
3131        let max_delay = 60 * 60;
3132        let delay = cmp::min(next_delay * 2, max_delay);
3133
3134        // Only add retransmission if it does not exceed the hostname resolver timeout, if any.
3135        if self
3136            .hostname_resolvers
3137            .get(&hostname)
3138            .and_then(|(_sender, timeout)| *timeout)
3139            .map(|timeout| next_time < timeout)
3140            .unwrap_or(true)
3141        {
3142            self.add_retransmission(
3143                next_time,
3144                Command::ResolveHostname(hostname, delay, listener, None),
3145            );
3146        }
3147    }
3148
3149    fn exec_command_resolve(&mut self, instance: String, try_count: u16) {
3150        let pending_query = self.query_unresolved(&instance);
3151        let max_try = 3;
3152        if pending_query && try_count < max_try {
3153            // Note that if the current try already succeeds, the next retransmission
3154            // will be no-op as the cache has been updated.
3155            let next_time = current_time_millis() + RESOLVE_WAIT_IN_MILLIS;
3156            self.add_retransmission(next_time, Command::Resolve(instance, try_count + 1));
3157        }
3158    }
3159
3160    fn exec_command_unregister(
3161        &mut self,
3162        repeating: bool,
3163        fullname: String,
3164        resp_s: Sender<UnregisterStatus>,
3165    ) {
3166        let response = match self.my_services.remove_entry(&fullname) {
3167            None => {
3168                debug!("unregister: cannot find such service {}", &fullname);
3169                UnregisterStatus::NotFound
3170            }
3171            Some((_k, info)) => {
3172                let mut timers = Vec::new();
3173
3174                for (if_index, intf) in self.my_intfs.iter() {
3175                    if let Some(sock) = self.ipv4_sock.as_ref() {
3176                        let packet = self.unregister_service(&info, intf, &sock.pktinfo);
3177                        // repeat for one time just in case some peers miss the message
3178                        if !repeating && !packet.is_empty() {
3179                            let next_time = current_time_millis() + 120;
3180                            self.retransmissions.push(ReRun {
3181                                next_time,
3182                                command: Command::UnregisterResend(packet, *if_index, true),
3183                            });
3184                            timers.push(next_time);
3185                        }
3186                    }
3187
3188                    // ipv6
3189                    if let Some(sock) = self.ipv6_sock.as_ref() {
3190                        let packet = self.unregister_service(&info, intf, &sock.pktinfo);
3191                        if !repeating && !packet.is_empty() {
3192                            let next_time = current_time_millis() + 120;
3193                            self.retransmissions.push(ReRun {
3194                                next_time,
3195                                command: Command::UnregisterResend(packet, *if_index, false),
3196                            });
3197                            timers.push(next_time);
3198                        }
3199                    }
3200                }
3201
3202                for t in timers {
3203                    self.add_timer(t);
3204                }
3205
3206                self.increase_counter(Counter::Unregister, 1);
3207                UnregisterStatus::OK
3208            }
3209        };
3210        if let Err(e) = resp_s.send(response) {
3211            debug!("unregister: failed to send response: {}", e);
3212        }
3213    }
3214
3215    fn exec_command_unregister_resend(&mut self, packet: Vec<u8>, if_index: u32, is_ipv4: bool) {
3216        let Some(intf) = self.my_intfs.get(&if_index) else {
3217            return;
3218        };
3219        let sock_opt = if is_ipv4 {
3220            &self.ipv4_sock
3221        } else {
3222            &self.ipv6_sock
3223        };
3224        let Some(sock) = sock_opt else {
3225            return;
3226        };
3227
3228        let if_addr = if is_ipv4 {
3229            match intf.next_ifaddr_v4() {
3230                Some(addr) => addr,
3231                None => return,
3232            }
3233        } else {
3234            match intf.next_ifaddr_v6() {
3235                Some(addr) => addr,
3236                None => return,
3237            }
3238        };
3239
3240        debug!("UnregisterResend from {:?}", if_addr);
3241        multicast_on_intf(&packet[..], &intf.name, intf.index, if_addr, &sock.pktinfo);
3242
3243        self.increase_counter(Counter::UnregisterResend, 1);
3244    }
3245
3246    fn exec_command_stop_browse(&mut self, ty_domain: String) {
3247        match self.service_queriers.remove_entry(&ty_domain) {
3248            None => debug!("StopBrowse: cannot find querier for {}", &ty_domain),
3249            Some((ty, sender)) => {
3250                // Remove pending browse commands in the reruns.
3251                trace!("StopBrowse: removed queryer for {}", &ty);
3252                let mut i = 0;
3253                while i < self.retransmissions.len() {
3254                    if let Command::Browse(t, _, _, _) = &self.retransmissions[i].command {
3255                        if t == &ty {
3256                            self.retransmissions.remove(i);
3257                            trace!("StopBrowse: removed retransmission for {}", &ty);
3258                            continue;
3259                        }
3260                    }
3261                    i += 1;
3262                }
3263
3264                // Remove cache entries.
3265                self.cache.remove_service_type(&ty_domain);
3266
3267                // Notify the client.
3268                match sender.send(ServiceEvent::SearchStopped(ty_domain)) {
3269                    Ok(()) => trace!("Sent SearchStopped to the listener"),
3270                    Err(e) => debug!("Failed to send SearchStopped: {}", e),
3271                }
3272            }
3273        }
3274    }
3275
3276    fn exec_command_stop_resolve_hostname(&mut self, hostname: String) {
3277        if let Some((host, (sender, _timeout))) = self.hostname_resolvers.remove_entry(&hostname) {
3278            // Remove pending resolve commands in the reruns.
3279            trace!("StopResolve: removed queryer for {}", &host);
3280            let mut i = 0;
3281            while i < self.retransmissions.len() {
3282                if let Command::Resolve(t, _) = &self.retransmissions[i].command {
3283                    if t == &host {
3284                        self.retransmissions.remove(i);
3285                        trace!("StopResolve: removed retransmission for {}", &host);
3286                        continue;
3287                    }
3288                }
3289                i += 1;
3290            }
3291
3292            // Notify the client.
3293            match sender.send(HostnameResolutionEvent::SearchStopped(hostname)) {
3294                Ok(()) => trace!("Sent SearchStopped to the listener"),
3295                Err(e) => debug!("Failed to send SearchStopped: {}", e),
3296            }
3297        }
3298    }
3299
3300    fn exec_command_register_resend(&mut self, fullname: String, if_index: u32) {
3301        let Some(info) = self.my_services.get_mut(&fullname) else {
3302            trace!("announce: cannot find such service {}", &fullname);
3303            return;
3304        };
3305
3306        let Some(dns_registry) = self.dns_registry_map.get_mut(&if_index) else {
3307            return;
3308        };
3309
3310        let Some(intf) = self.my_intfs.get(&if_index) else {
3311            return;
3312        };
3313
3314        let announced_v4 = if let Some(sock) = self.ipv4_sock.as_ref() {
3315            announce_service_on_intf(dns_registry, info, intf, &sock.pktinfo)
3316        } else {
3317            false
3318        };
3319        let announced_v6 = if let Some(sock) = self.ipv6_sock.as_ref() {
3320            announce_service_on_intf(dns_registry, info, intf, &sock.pktinfo)
3321        } else {
3322            false
3323        };
3324
3325        if announced_v4 || announced_v6 {
3326            let mut hostname = info.get_hostname();
3327            if let Some(new_name) = dns_registry.name_changes.get(hostname) {
3328                hostname = new_name;
3329            }
3330            let service_name = match dns_registry.name_changes.get(&fullname) {
3331                Some(new_name) => new_name.to_string(),
3332                None => fullname,
3333            };
3334
3335            debug!("resend: announce service {service_name} on {}", intf.name);
3336
3337            notify_monitors(
3338                &mut self.monitors,
3339                DaemonEvent::Announce(service_name, format!("{}:{}", hostname, &intf.name)),
3340            );
3341            info.set_status(if_index, ServiceStatus::Announced);
3342        } else {
3343            debug!("register-resend should not fail");
3344        }
3345
3346        self.increase_counter(Counter::RegisterResend, 1);
3347    }
3348
3349    fn exec_command_verify(&mut self, instance: String, timeout: Duration, repeating: bool) {
3350        /*
3351        RFC 6762 section 10.4:
3352        ...
3353        When the cache receives this hint that it should reconfirm some
3354        record, it MUST issue two or more queries for the resource record in
3355        dispute.  If no response is received within ten seconds, then, even
3356        though its TTL may indicate that it is not yet due to expire, that
3357        record SHOULD be promptly flushed from the cache.
3358        */
3359        let now = current_time_millis();
3360        let expire_at = if repeating {
3361            None
3362        } else {
3363            Some(now + timeout.as_millis() as u64)
3364        };
3365
3366        // send query for the resource records.
3367        let record_vec = self.cache.service_verify_queries(&instance, expire_at);
3368
3369        if !record_vec.is_empty() {
3370            let query_vec: Vec<(&str, RRType)> = record_vec
3371                .iter()
3372                .map(|(record, rr_type)| (record.as_str(), *rr_type))
3373                .collect();
3374            self.send_query_vec(&query_vec);
3375
3376            if let Some(new_expire) = expire_at {
3377                self.add_timer(new_expire); // ensure a check for the new expire time.
3378
3379                // schedule a resend 1 second later
3380                self.add_retransmission(now + 1000, Command::Verify(instance, timeout));
3381            }
3382        }
3383    }
3384
3385    /// Refresh cached service records with active queriers
3386    fn refresh_active_services(&mut self) {
3387        let mut query_ptr_count = 0;
3388        let mut query_srv_count = 0;
3389        let mut new_timers = HashSet::new();
3390        let mut query_addr_count = 0;
3391
3392        for (ty_domain, _sender) in self.service_queriers.iter() {
3393            let refreshed_timers = self.cache.refresh_due_ptr(ty_domain);
3394            if !refreshed_timers.is_empty() {
3395                trace!("sending refresh query for PTR: {}", ty_domain);
3396                self.send_query(ty_domain, RRType::PTR);
3397                query_ptr_count += 1;
3398                new_timers.extend(refreshed_timers);
3399            }
3400
3401            let (instances, timers) = self.cache.refresh_due_srv_txt(ty_domain);
3402            for (instance, types) in instances {
3403                trace!("sending refresh query for: {}", &instance);
3404                let query_vec = types
3405                    .into_iter()
3406                    .map(|ty| (instance.as_str(), ty))
3407                    .collect::<Vec<_>>();
3408                self.send_query_vec(&query_vec);
3409                query_srv_count += 1;
3410            }
3411            new_timers.extend(timers);
3412            let (hostnames, timers) = self.cache.refresh_due_hosts(ty_domain);
3413            for hostname in hostnames.iter() {
3414                trace!("sending refresh queries for A and AAAA:  {}", hostname);
3415                self.send_query_vec(&[(hostname, RRType::A), (hostname, RRType::AAAA)]);
3416                query_addr_count += 2;
3417            }
3418            new_timers.extend(timers);
3419        }
3420
3421        for timer in new_timers {
3422            self.add_timer(timer);
3423        }
3424
3425        self.increase_counter(Counter::CacheRefreshPTR, query_ptr_count);
3426        self.increase_counter(Counter::CacheRefreshSrvTxt, query_srv_count);
3427        self.increase_counter(Counter::CacheRefreshAddr, query_addr_count);
3428    }
3429}
3430
3431/// Adds one or more answers of a service for incoming msg and RR entry name.
3432fn add_answer_of_service(
3433    out: &mut DnsOutgoing,
3434    msg: &DnsIncoming,
3435    entry_name: &str,
3436    service: &ServiceInfo,
3437    qtype: RRType,
3438    intf_addrs: Vec<IpAddr>,
3439) {
3440    if qtype == RRType::SRV || qtype == RRType::ANY {
3441        out.add_answer(
3442            msg,
3443            DnsSrv::new(
3444                entry_name,
3445                CLASS_IN | CLASS_CACHE_FLUSH,
3446                service.get_host_ttl(),
3447                service.get_priority(),
3448                service.get_weight(),
3449                service.get_port(),
3450                service.get_hostname().to_string(),
3451            ),
3452        );
3453    }
3454
3455    if qtype == RRType::TXT || qtype == RRType::ANY {
3456        out.add_answer(
3457            msg,
3458            DnsTxt::new(
3459                entry_name,
3460                CLASS_IN | CLASS_CACHE_FLUSH,
3461                service.get_other_ttl(),
3462                service.generate_txt(),
3463            ),
3464        );
3465    }
3466
3467    if qtype == RRType::SRV {
3468        for address in intf_addrs {
3469            out.add_additional_answer(DnsAddress::new(
3470                service.get_hostname(),
3471                ip_address_rr_type(&address),
3472                CLASS_IN | CLASS_CACHE_FLUSH,
3473                service.get_host_ttl(),
3474                address,
3475                InterfaceId::default(),
3476            ));
3477        }
3478    }
3479}
3480
3481/// All possible events sent to the client from the daemon
3482/// regarding service discovery.
3483#[derive(Clone, Debug)]
3484#[non_exhaustive]
3485pub enum ServiceEvent {
3486    /// Started searching for a service type.
3487    SearchStarted(String),
3488
3489    /// Found a specific (service_type, fullname).
3490    ServiceFound(String, String),
3491
3492    /// Resolved a service instance in a ResolvedService struct.
3493    ServiceResolved(Box<ResolvedService>),
3494
3495    /// A service instance (service_type, fullname) was removed.
3496    ServiceRemoved(String, String),
3497
3498    /// Stopped searching for a service type.
3499    SearchStopped(String),
3500}
3501
3502/// All possible events sent to the client from the daemon
3503/// regarding host resolution.
3504#[derive(Clone, Debug)]
3505#[non_exhaustive]
3506pub enum HostnameResolutionEvent {
3507    /// Started searching for the ip address of a hostname.
3508    SearchStarted(String),
3509    /// One or more addresses for a hostname has been found.
3510    AddressesFound(String, HashSet<ScopedIp>),
3511    /// One or more addresses for a hostname has been removed.
3512    AddressesRemoved(String, HashSet<ScopedIp>),
3513    /// The search for the ip address of a hostname has timed out.
3514    SearchTimeout(String),
3515    /// Stopped searching for the ip address of a hostname.
3516    SearchStopped(String),
3517}
3518
3519/// Some notable events from the daemon besides [`ServiceEvent`].
3520/// These events are expected to happen infrequently.
3521#[derive(Clone, Debug)]
3522#[non_exhaustive]
3523pub enum DaemonEvent {
3524    /// Daemon unsolicitly announced a service from an interface.
3525    Announce(String, String),
3526
3527    /// Daemon encountered an error.
3528    Error(Error),
3529
3530    /// Daemon detected a new IP address from the host.
3531    IpAdd(IpAddr),
3532
3533    /// Daemon detected a IP address removed from the host.
3534    IpDel(IpAddr),
3535
3536    /// Daemon resolved a name conflict by changing one of its names.
3537    /// see [DnsNameChange] for more details.
3538    NameChange(DnsNameChange),
3539
3540    /// Send out a multicast response via an interface.
3541    Respond(String),
3542}
3543
3544/// Represents a name change due to a name conflict resolution.
3545/// See [RFC 6762 section 9](https://datatracker.ietf.org/doc/html/rfc6762#section-9)
3546#[derive(Clone, Debug)]
3547pub struct DnsNameChange {
3548    /// The original name set in `ServiceInfo` by the user.
3549    pub original: String,
3550
3551    /// A new name is created by appending a suffix after the original name.
3552    ///
3553    /// - for a service instance name, the suffix is `(N)`, where N starts at 2.
3554    /// - for a host name, the suffix is `-N`, where N starts at 2.
3555    ///
3556    /// For example:
3557    ///
3558    /// - Service name `foo._service-type._udp` becomes `foo (2)._service-type._udp`
3559    /// - Host name `foo.local.` becomes `foo-2.local.`
3560    pub new_name: String,
3561
3562    /// The resource record type
3563    pub rr_type: RRType,
3564
3565    /// The interface where the name conflict and its change happened.
3566    pub intf_name: String,
3567}
3568
3569/// Commands supported by the daemon
3570#[derive(Debug)]
3571enum Command {
3572    /// Browsing for a service type (ty_domain, next_time_delay_in_seconds, channel::sender)
3573    Browse(String, u32, bool, Sender<ServiceEvent>),
3574
3575    /// Resolve a hostname to IP addresses.
3576    ResolveHostname(String, u32, Sender<HostnameResolutionEvent>, Option<u64>), // (hostname, next_time_delay_in_seconds, sender, timeout_in_milliseconds)
3577
3578    /// Register a service
3579    Register(ServiceInfo),
3580
3581    /// Unregister a service
3582    Unregister(String, Sender<UnregisterStatus>), // (fullname)
3583
3584    /// Announce again a service to local network
3585    RegisterResend(String, u32), // (fullname)
3586
3587    /// Resend unregister packet.
3588    UnregisterResend(Vec<u8>, u32, bool), // (packet content, if_index, is_ipv4)
3589
3590    /// Stop browsing a service type
3591    StopBrowse(String), // (ty_domain)
3592
3593    /// Stop resolving a hostname
3594    StopResolveHostname(String), // (hostname)
3595
3596    /// Send query to resolve a service instance.
3597    /// This is used when a PTR record exists but SRV & TXT records are missing.
3598    Resolve(String, u16), // (service_instance_fullname, try_count)
3599
3600    /// Read the current values of the counters
3601    GetMetrics(Sender<Metrics>),
3602
3603    /// Get the current status of the daemon.
3604    GetStatus(Sender<DaemonStatus>),
3605
3606    /// Monitor noticeable events in the daemon.
3607    Monitor(Sender<DaemonEvent>),
3608
3609    SetOption(DaemonOption),
3610
3611    GetOption(Sender<DaemonOptionVal>),
3612
3613    /// Proactively confirm a DNS resource record.
3614    ///
3615    /// The intention is to check if a service name or IP address still valid
3616    /// before its TTL expires.
3617    Verify(String, Duration),
3618
3619    Exit(Sender<DaemonStatus>),
3620}
3621
3622impl fmt::Display for Command {
3623    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
3624        match self {
3625            Self::Browse(_, _, _, _) => write!(f, "Command Browse"),
3626            Self::ResolveHostname(_, _, _, _) => write!(f, "Command ResolveHostname"),
3627            Self::Exit(_) => write!(f, "Command Exit"),
3628            Self::GetStatus(_) => write!(f, "Command GetStatus"),
3629            Self::GetMetrics(_) => write!(f, "Command GetMetrics"),
3630            Self::Monitor(_) => write!(f, "Command Monitor"),
3631            Self::Register(_) => write!(f, "Command Register"),
3632            Self::RegisterResend(_, _) => write!(f, "Command RegisterResend"),
3633            Self::SetOption(_) => write!(f, "Command SetOption"),
3634            Self::GetOption(_) => write!(f, "Command GetOption"),
3635            Self::StopBrowse(_) => write!(f, "Command StopBrowse"),
3636            Self::StopResolveHostname(_) => write!(f, "Command StopResolveHostname"),
3637            Self::Unregister(_, _) => write!(f, "Command Unregister"),
3638            Self::UnregisterResend(_, _, _) => write!(f, "Command UnregisterResend"),
3639            Self::Resolve(_, _) => write!(f, "Command Resolve"),
3640            Self::Verify(_, _) => write!(f, "Command VerifyResource"),
3641        }
3642    }
3643}
3644
/// Snapshot of daemon option values, sent back in reply to `Command::GetOption`.
struct DaemonOptionVal {
    /// Maximum service name length; presumably the value set through
    /// `DaemonOption::ServiceNameLenMax` (the leading underscore marks it as not read).
    _service_name_len_max: u8,
    /// IP check interval; presumably the value set through
    /// `DaemonOption::IpCheckInterval` (unit not visible here).
    ip_check_interval: u64,
}
3649
3650#[derive(Debug)]
3651enum DaemonOption {
3652    ServiceNameLenMax(u8),
3653    IpCheckInterval(u64),
3654    EnableInterface(Vec<IfKind>),
3655    DisableInterface(Vec<IfKind>),
3656    MulticastLoopV4(bool),
3657    MulticastLoopV6(bool),
3658    AcceptUnsolicited(bool),
3659    #[cfg(test)]
3660    TestDownInterface(String),
3661    #[cfg(test)]
3662    TestUpInterface(String),
3663}
3664
/// Byte length of a supported service domain suffix.
///
/// Both supported suffixes, `._tcp.local.` and `._udp.local.`, have this length.
const DOMAIN_LEN: usize = "._tcp.local.".len();
3667
3668/// Validate the length of "service_name" in a "_<service_name>.<domain_name>." string.
3669fn check_service_name_length(ty_domain: &str, limit: u8) -> Result<()> {
3670    if ty_domain.len() <= DOMAIN_LEN + 1 {
3671        // service name cannot be empty or only '_'.
3672        return Err(e_fmt!("Service type name cannot be empty: {}", ty_domain));
3673    }
3674
3675    let service_name_len = ty_domain.len() - DOMAIN_LEN - 1; // exclude the leading `_`
3676    if service_name_len > limit as usize {
3677        return Err(e_fmt!("Service name length must be <= {} bytes", limit));
3678    }
3679    Ok(())
3680}
3681
3682/// Checks if `name` ends with a valid domain: '._tcp.local.' or '._udp.local.'
3683fn check_domain_suffix(name: &str) -> Result<()> {
3684    if !(name.ends_with("._tcp.local.") || name.ends_with("._udp.local.")) {
3685        return Err(e_fmt!(
3686            "mDNS service {} must end with '._tcp.local.' or '._udp.local.'",
3687            name
3688        ));
3689    }
3690
3691    Ok(())
3692}
3693
3694/// Validate the service name in a fully qualified name.
3695///
3696/// A Full Name = <Instance>.<Service>.<Domain>
3697/// The only `<Domain>` supported are "._tcp.local." and "._udp.local.".
3698///
3699/// Note: this function does not check for the length of the service name.
3700/// Instead, `register_service` method will check the length.
3701fn check_service_name(fullname: &str) -> Result<()> {
3702    check_domain_suffix(fullname)?;
3703
3704    let remaining: Vec<&str> = fullname[..fullname.len() - DOMAIN_LEN].split('.').collect();
3705    let name = remaining.last().ok_or_else(|| e_fmt!("No service name"))?;
3706
3707    if &name[0..1] != "_" {
3708        return Err(e_fmt!("Service name must start with '_'"));
3709    }
3710
3711    let name = &name[1..];
3712
3713    if name.contains("--") {
3714        return Err(e_fmt!("Service name must not contain '--'"));
3715    }
3716
3717    if name.starts_with('-') || name.ends_with('-') {
3718        return Err(e_fmt!("Service name (%s) may not start or end with '-'"));
3719    }
3720
3721    let ascii_count = name.chars().filter(|c| c.is_ascii_alphabetic()).count();
3722    if ascii_count < 1 {
3723        return Err(e_fmt!(
3724            "Service name must contain at least one letter (eg: 'A-Za-z')"
3725        ));
3726    }
3727
3728    Ok(())
3729}
3730
3731/// Validate a hostname.
3732fn check_hostname(hostname: &str) -> Result<()> {
3733    if !hostname.ends_with(".local.") {
3734        return Err(e_fmt!("Hostname must end with '.local.': {hostname}"));
3735    }
3736
3737    if hostname == ".local." {
3738        return Err(e_fmt!(
3739            "The part of the hostname before '.local.' cannot be empty"
3740        ));
3741    }
3742
3743    if hostname.len() > 255 {
3744        return Err(e_fmt!("Hostname length must be <= 255 bytes"));
3745    }
3746
3747    Ok(())
3748}
3749
3750fn call_service_listener(
3751    listeners_map: &HashMap<String, Sender<ServiceEvent>>,
3752    ty_domain: &str,
3753    event: ServiceEvent,
3754) {
3755    if let Some(listener) = listeners_map.get(ty_domain) {
3756        match listener.send(event) {
3757            Ok(()) => trace!("Sent event to listener successfully"),
3758            Err(e) => debug!("Failed to send event: {}", e),
3759        }
3760    }
3761}
3762
3763fn call_hostname_resolution_listener(
3764    listeners_map: &HashMap<String, (Sender<HostnameResolutionEvent>, Option<u64>)>,
3765    hostname: &str,
3766    event: HostnameResolutionEvent,
3767) {
3768    let hostname_lower = hostname.to_lowercase();
3769    if let Some(listener) = listeners_map.get(&hostname_lower).map(|(l, _)| l) {
3770        match listener.send(event) {
3771            Ok(()) => trace!("Sent event to listener successfully"),
3772            Err(e) => debug!("Failed to send event: {}", e),
3773        }
3774    }
3775}
3776
3777/// Returns valid network interfaces in the host system.
3778/// Operational down interfaces are excluded.
3779/// Loopback interfaces are excluded if `with_loopback` is false.
3780fn my_ip_interfaces(with_loopback: bool) -> Vec<Interface> {
3781    if_addrs::get_if_addrs()
3782        .unwrap_or_default()
3783        .into_iter()
3784        .filter(|i| i.is_oper_up() && (!i.is_loopback() || with_loopback))
3785        .collect()
3786}
3787
3788fn send_dns_outgoing(out: &DnsOutgoing, my_intf: &MyIntf, sock: &PktInfoUdpSocket) -> Vec<Vec<u8>> {
3789    let if_name = &my_intf.name;
3790
3791    let if_addr = if sock.domain() == Domain::IPV4 {
3792        match my_intf.next_ifaddr_v4() {
3793            Some(addr) => addr,
3794            None => return vec![],
3795        }
3796    } else {
3797        match my_intf.next_ifaddr_v6() {
3798            Some(addr) => addr,
3799            None => return vec![],
3800        }
3801    };
3802
3803    send_dns_outgoing_impl(out, if_name, my_intf.index, if_addr, sock)
3804}
3805
3806/// Send an outgoing mDNS query or response, and returns the packet bytes.
3807fn send_dns_outgoing_impl(
3808    out: &DnsOutgoing,
3809    if_name: &str,
3810    if_index: u32,
3811    if_addr: &IfAddr,
3812    sock: &PktInfoUdpSocket,
3813) -> Vec<Vec<u8>> {
3814    let qtype = if out.is_query() {
3815        "query"
3816    } else {
3817        if out.answers_count() == 0 && out.additionals().is_empty() {
3818            return vec![]; // no need to send empty response
3819        }
3820        "response"
3821    };
3822    trace!(
3823        "send {}: {} questions {} answers {} authorities {} additional",
3824        qtype,
3825        out.questions().len(),
3826        out.answers_count(),
3827        out.authorities().len(),
3828        out.additionals().len()
3829    );
3830
3831    match if_addr.ip() {
3832        IpAddr::V4(ipv4) => {
3833            if let Err(e) = sock.set_multicast_if_v4(&ipv4) {
3834                debug!(
3835                    "send_dns_outgoing: failed to set multicast interface for IPv4 {}: {}",
3836                    ipv4, e
3837                );
3838                return vec![]; // cannot send without a valid interface
3839            }
3840        }
3841        IpAddr::V6(ipv6) => {
3842            if let Err(e) = sock.set_multicast_if_v6(if_index) {
3843                debug!(
3844                    "send_dns_outgoing: failed to set multicast interface for IPv6 {}: {}",
3845                    ipv6, e
3846                );
3847                return vec![]; // cannot send without a valid interface
3848            }
3849        }
3850    }
3851
3852    let packet_list = out.to_data_on_wire();
3853    for packet in packet_list.iter() {
3854        multicast_on_intf(packet, if_name, if_index, if_addr, sock);
3855    }
3856    packet_list
3857}
3858
3859/// Sends a multicast packet, and returns the packet bytes.
3860fn multicast_on_intf(
3861    packet: &[u8],
3862    if_name: &str,
3863    if_index: u32,
3864    if_addr: &IfAddr,
3865    socket: &PktInfoUdpSocket,
3866) {
3867    if packet.len() > MAX_MSG_ABSOLUTE {
3868        debug!("Drop over-sized packet ({})", packet.len());
3869        return;
3870    }
3871
3872    let addr: SocketAddr = match if_addr {
3873        if_addrs::IfAddr::V4(_) => SocketAddrV4::new(GROUP_ADDR_V4, MDNS_PORT).into(),
3874        if_addrs::IfAddr::V6(_) => {
3875            let mut sock = SocketAddrV6::new(GROUP_ADDR_V6, MDNS_PORT, 0, 0);
3876            sock.set_scope_id(if_index); // Choose iface for multicast
3877            sock.into()
3878        }
3879    };
3880
3881    // Sends out `packet` to `addr` on the socket.
3882    let sock_addr = addr.into();
3883    match socket.send_to(packet, &sock_addr) {
3884        Ok(sz) => trace!(
3885            "sent out {} bytes on interface {} (idx {}) addr {}",
3886            sz,
3887            if_name,
3888            if_index,
3889            if_addr.ip()
3890        ),
3891        Err(e) => trace!("Failed to send to {} via {:?}: {}", addr, &if_name, e),
3892    }
3893}
3894
/// Returns true if `name` has the shape of an instance name:
/// <instance>.<service_type>.<_udp|_tcp>.local.
///
/// Only the number of '.'-separated parts is checked: there must be at least five
/// (the trailing dot contributes a final empty part), i.e. at least four dots.
/// Note: <instance> could contain '.' as well.
fn valid_instance_name(name: &str) -> bool {
    const MIN_DOTS: usize = 4;
    name.matches('.').count() >= MIN_DOTS
}
3901
3902fn notify_monitors(monitors: &mut Vec<Sender<DaemonEvent>>, event: DaemonEvent) {
3903    monitors.retain(|sender| {
3904        if let Err(e) = sender.try_send(event.clone()) {
3905            debug!("notify_monitors: try_send: {}", &e);
3906            if matches!(e, TrySendError::Disconnected(_)) {
3907                return false; // This monitor is dropped.
3908            }
3909        }
3910        true
3911    });
3912}
3913
3914/// Check if all unique records passed "probing", and if yes, create a packet
3915/// to announce the service.
3916fn prepare_announce(
3917    info: &ServiceInfo,
3918    intf: &MyIntf,
3919    dns_registry: &mut DnsRegistry,
3920    is_ipv4: bool,
3921) -> Option<DnsOutgoing> {
3922    let intf_addrs = if is_ipv4 {
3923        info.get_addrs_on_my_intf_v4(intf)
3924    } else {
3925        info.get_addrs_on_my_intf_v6(intf)
3926    };
3927
3928    if intf_addrs.is_empty() {
3929        debug!(
3930            "prepare_announce (ipv4: {is_ipv4}): no valid addrs on interface {}",
3931            &intf.name
3932        );
3933        return None;
3934    }
3935
3936    // check if we changed our name due to conflicts.
3937    let service_fullname = match dns_registry.name_changes.get(info.get_fullname()) {
3938        Some(new_name) => new_name,
3939        None => info.get_fullname(),
3940    };
3941
3942    debug!(
3943        "prepare to announce service {service_fullname} on {:?}",
3944        &intf_addrs
3945    );
3946
3947    let mut probing_count = 0;
3948    let mut out = DnsOutgoing::new(FLAGS_QR_RESPONSE | FLAGS_AA);
3949    let create_time = current_time_millis() + fastrand::u64(0..250);
3950
3951    out.add_answer_at_time(
3952        DnsPointer::new(
3953            info.get_type(),
3954            RRType::PTR,
3955            CLASS_IN,
3956            info.get_other_ttl(),
3957            service_fullname.to_string(),
3958        ),
3959        0,
3960    );
3961
3962    if let Some(sub) = info.get_subtype() {
3963        trace!("Adding subdomain {}", sub);
3964        out.add_answer_at_time(
3965            DnsPointer::new(
3966                sub,
3967                RRType::PTR,
3968                CLASS_IN,
3969                info.get_other_ttl(),
3970                service_fullname.to_string(),
3971            ),
3972            0,
3973        );
3974    }
3975
3976    // SRV records.
3977    let hostname = match dns_registry.name_changes.get(info.get_hostname()) {
3978        Some(new_name) => new_name.to_string(),
3979        None => info.get_hostname().to_string(),
3980    };
3981
3982    let mut srv = DnsSrv::new(
3983        info.get_fullname(),
3984        CLASS_IN | CLASS_CACHE_FLUSH,
3985        info.get_host_ttl(),
3986        info.get_priority(),
3987        info.get_weight(),
3988        info.get_port(),
3989        hostname,
3990    );
3991
3992    if let Some(new_name) = dns_registry.name_changes.get(info.get_fullname()) {
3993        srv.get_record_mut().set_new_name(new_name.to_string());
3994    }
3995
3996    if !info.requires_probe()
3997        || dns_registry.is_probing_done(&srv, info.get_fullname(), create_time)
3998    {
3999        out.add_answer_at_time(srv, 0);
4000    } else {
4001        probing_count += 1;
4002    }
4003
4004    // TXT records.
4005
4006    let mut txt = DnsTxt::new(
4007        info.get_fullname(),
4008        CLASS_IN | CLASS_CACHE_FLUSH,
4009        info.get_other_ttl(),
4010        info.generate_txt(),
4011    );
4012
4013    if let Some(new_name) = dns_registry.name_changes.get(info.get_fullname()) {
4014        txt.get_record_mut().set_new_name(new_name.to_string());
4015    }
4016
4017    if !info.requires_probe()
4018        || dns_registry.is_probing_done(&txt, info.get_fullname(), create_time)
4019    {
4020        out.add_answer_at_time(txt, 0);
4021    } else {
4022        probing_count += 1;
4023    }
4024
4025    // Address records. (A and AAAA)
4026
4027    let hostname = info.get_hostname();
4028    for address in intf_addrs {
4029        let mut dns_addr = DnsAddress::new(
4030            hostname,
4031            ip_address_rr_type(&address),
4032            CLASS_IN | CLASS_CACHE_FLUSH,
4033            info.get_host_ttl(),
4034            address,
4035            intf.into(),
4036        );
4037
4038        if let Some(new_name) = dns_registry.name_changes.get(hostname) {
4039            dns_addr.get_record_mut().set_new_name(new_name.to_string());
4040        }
4041
4042        if !info.requires_probe()
4043            || dns_registry.is_probing_done(&dns_addr, info.get_fullname(), create_time)
4044        {
4045            out.add_answer_at_time(dns_addr, 0);
4046        } else {
4047            probing_count += 1;
4048        }
4049    }
4050
4051    if probing_count > 0 {
4052        return None;
4053    }
4054
4055    Some(out)
4056}
4057
4058/// Send an unsolicited response for owned service via `intf` and `sock`.
4059/// Returns true if sent out successfully for IPv4 or IPv6.
4060fn announce_service_on_intf(
4061    dns_registry: &mut DnsRegistry,
4062    info: &ServiceInfo,
4063    intf: &MyIntf,
4064    sock: &PktInfoUdpSocket,
4065) -> bool {
4066    let is_ipv4 = sock.domain() == Domain::IPV4;
4067    if let Some(out) = prepare_announce(info, intf, dns_registry, is_ipv4) {
4068        send_dns_outgoing(&out, intf, sock);
4069        return true;
4070    }
4071
4072    false
4073}
4074
/// Returns a new name based on the `original` to avoid conflicts.
///
/// Only the first dot-separated label is modified. If that label ends with a
/// ` (<num>)` suffix, the number is incremented; otherwise ` (2)` is appended.
/// A suffix that is not a valid `u32`, or that cannot be incremented without
/// overflowing `u32`, is treated as part of the name and ` (2)` is appended.
///
/// Examples:
/// - `foo.local.` becomes `foo (2).local.`
/// - `foo (2).local.` becomes `foo (3).local.`
/// - `foo (9)` becomes `foo (10)`
/// - `foo (abc)` becomes `foo (abc) (2)`
/// - `foo (4294967295)` becomes `foo (4294967295) (2)`
fn name_change(original: &str) -> String {
    let mut parts: Vec<&str> = original.split('.').collect();
    let Some(first_part) = parts.first_mut() else {
        return format!("{original} (2)");
    };

    // Look for a trailing ` (<num>)` and try to bump the number.
    let incremented = first_part
        .strip_suffix(')')
        .and_then(|without_paren| without_paren.rsplit_once(" ("))
        .and_then(|(base_name, digits)| {
            // `checked_add` avoids an overflow panic / wrap-around at `u32::MAX`.
            let next = digits.parse::<u32>().ok()?.checked_add(1)?;
            Some(format!("{base_name} ({next})"))
        });

    let new_name = incremented.unwrap_or_else(|| format!("{first_part} (2)"));

    *first_part = &new_name;
    parts.join(".")
}
4110
/// Returns a new name based on the `original` to avoid conflicts.
///
/// Only the first dot-separated label is modified. If that label ends with a
/// `-<num>` suffix, the number is incremented; otherwise `-2` is appended.
/// A suffix that is not a valid `u32`, or that cannot be incremented without
/// overflowing `u32`, is treated as part of the name and `-2` is appended.
///
/// Examples:
/// - `foo.local.` becomes `foo-2.local.`
/// - `foo-2.local.` becomes `foo-3.local.`
/// - `foo` becomes `foo-2`
/// - `foo-4294967295` becomes `foo-4294967295-2`
fn hostname_change(original: &str) -> String {
    let mut parts: Vec<&str> = original.split('.').collect();
    let Some(first_part) = parts.first_mut() else {
        return format!("{original}-2");
    };

    // Try to parse everything after the last hyphen as a number and bump it.
    let incremented = first_part.rsplit_once('-').and_then(|(base_name, digits)| {
        // `checked_add` avoids an overflow panic / wrap-around at `u32::MAX`.
        let next = digits.parse::<u32>().ok()?.checked_add(1)?;
        Some(format!("{base_name}-{next}"))
    });

    let new_name = incremented.unwrap_or_else(|| format!("{first_part}-2"));

    *first_part = &new_name;
    parts.join(".")
}
4138
4139fn add_answer_with_additionals(
4140    out: &mut DnsOutgoing,
4141    msg: &DnsIncoming,
4142    service: &ServiceInfo,
4143    intf: &MyIntf,
4144    dns_registry: &DnsRegistry,
4145    is_ipv4: bool,
4146) {
4147    let intf_addrs = if is_ipv4 {
4148        service.get_addrs_on_my_intf_v4(intf)
4149    } else {
4150        service.get_addrs_on_my_intf_v6(intf)
4151    };
4152    if intf_addrs.is_empty() {
4153        trace!("No addrs on LAN of intf {:?}", intf);
4154        return;
4155    }
4156
4157    // check if we changed our name due to conflicts.
4158    let service_fullname = match dns_registry.name_changes.get(service.get_fullname()) {
4159        Some(new_name) => new_name,
4160        None => service.get_fullname(),
4161    };
4162
4163    let hostname = match dns_registry.name_changes.get(service.get_hostname()) {
4164        Some(new_name) => new_name,
4165        None => service.get_hostname(),
4166    };
4167
4168    let ptr_added = out.add_answer(
4169        msg,
4170        DnsPointer::new(
4171            service.get_type(),
4172            RRType::PTR,
4173            CLASS_IN,
4174            service.get_other_ttl(),
4175            service_fullname.to_string(),
4176        ),
4177    );
4178
4179    if !ptr_added {
4180        trace!("answer was not added for msg {:?}", msg);
4181        return;
4182    }
4183
4184    if let Some(sub) = service.get_subtype() {
4185        trace!("Adding subdomain {}", sub);
4186        out.add_additional_answer(DnsPointer::new(
4187            sub,
4188            RRType::PTR,
4189            CLASS_IN,
4190            service.get_other_ttl(),
4191            service_fullname.to_string(),
4192        ));
4193    }
4194
4195    // Add recommended additional answers according to
4196    // https://tools.ietf.org/html/rfc6763#section-12.1.
4197    out.add_additional_answer(DnsSrv::new(
4198        service_fullname,
4199        CLASS_IN | CLASS_CACHE_FLUSH,
4200        service.get_host_ttl(),
4201        service.get_priority(),
4202        service.get_weight(),
4203        service.get_port(),
4204        hostname.to_string(),
4205    ));
4206
4207    out.add_additional_answer(DnsTxt::new(
4208        service_fullname,
4209        CLASS_IN | CLASS_CACHE_FLUSH,
4210        service.get_other_ttl(),
4211        service.generate_txt(),
4212    ));
4213
4214    for address in intf_addrs {
4215        out.add_additional_answer(DnsAddress::new(
4216            hostname,
4217            ip_address_rr_type(&address),
4218            CLASS_IN | CLASS_CACHE_FLUSH,
4219            service.get_host_ttl(),
4220            address,
4221            intf.into(),
4222        ));
4223    }
4224}
4225
4226/// Check probes in a registry and returns: a probing packet to send out, and a list of probe names
4227/// that are finished.
4228fn check_probing(
4229    dns_registry: &mut DnsRegistry,
4230    timers: &mut BinaryHeap<Reverse<u64>>,
4231    now: u64,
4232) -> (DnsOutgoing, Vec<String>) {
4233    let mut expired_probes = Vec::new();
4234    let mut out = DnsOutgoing::new(FLAGS_QR_QUERY);
4235
4236    for (name, probe) in dns_registry.probing.iter_mut() {
4237        if now >= probe.next_send {
4238            if probe.expired(now) {
4239                // move the record to active
4240                expired_probes.push(name.clone());
4241            } else {
4242                out.add_question(name, RRType::ANY);
4243
4244                /*
4245                RFC 6762 section 8.2: https://datatracker.ietf.org/doc/html/rfc6762#section-8.2
4246                ...
4247                for tiebreaking to work correctly in all
4248                cases, the Authority Section must contain *all* the records and
4249                proposed rdata being probed for uniqueness.
4250                    */
4251                for record in probe.records.iter() {
4252                    out.add_authority(record.clone());
4253                }
4254
4255                probe.update_next_send(now);
4256
4257                // add timer
4258                timers.push(Reverse(probe.next_send));
4259            }
4260        }
4261    }
4262
4263    (out, expired_probes)
4264}
4265
4266/// Process expired probes on an interface and return a list of services
4267/// that are waiting for the probe to finish.
4268///
4269/// `DnsNameChange` events are sent to the monitors.
4270fn handle_expired_probes(
4271    expired_probes: Vec<String>,
4272    intf_name: &str,
4273    dns_registry: &mut DnsRegistry,
4274    monitors: &mut Vec<Sender<DaemonEvent>>,
4275) -> HashSet<String> {
4276    let mut waiting_services = HashSet::new();
4277
4278    for name in expired_probes {
4279        let Some(probe) = dns_registry.probing.remove(&name) else {
4280            continue;
4281        };
4282
4283        // send notifications about name changes
4284        for record in probe.records.iter() {
4285            if let Some(new_name) = record.get_record().get_new_name() {
4286                dns_registry
4287                    .name_changes
4288                    .insert(name.clone(), new_name.to_string());
4289
4290                let event = DnsNameChange {
4291                    original: record.get_record().get_original_name().to_string(),
4292                    new_name: new_name.to_string(),
4293                    rr_type: record.get_type(),
4294                    intf_name: intf_name.to_string(),
4295                };
4296                debug!("Name change event: {:?}", &event);
4297                notify_monitors(monitors, DaemonEvent::NameChange(event));
4298            }
4299        }
4300
4301        // move RR from probe to active.
4302        debug!(
4303            "probe of '{name}' finished: move {} records to active. ({} waiting services)",
4304            probe.records.len(),
4305            probe.waiting_services.len(),
4306        );
4307
4308        // Move records to active and plan to wake up services if records are not empty.
4309        if !probe.records.is_empty() {
4310            match dns_registry.active.get_mut(&name) {
4311                Some(records) => {
4312                    records.extend(probe.records);
4313                }
4314                None => {
4315                    dns_registry.active.insert(name, probe.records);
4316                }
4317            }
4318
4319            waiting_services.extend(probe.waiting_services);
4320        }
4321    }
4322
4323    waiting_services
4324}
4325
4326#[cfg(test)]
4327mod tests {
4328    use super::{
4329        _new_socket_bind, check_domain_suffix, check_service_name_length, hostname_change,
4330        my_ip_interfaces, name_change, send_dns_outgoing_impl, valid_instance_name,
4331        HostnameResolutionEvent, ServiceDaemon, ServiceEvent, ServiceInfo, GROUP_ADDR_V4,
4332        MDNS_PORT,
4333    };
4334    use crate::{
4335        dns_parser::{
4336            DnsIncoming, DnsOutgoing, DnsPointer, InterfaceId, RRType, ScopedIp, CLASS_IN,
4337            FLAGS_AA, FLAGS_QR_RESPONSE,
4338        },
4339        service_daemon::{add_answer_of_service, check_hostname},
4340    };
4341    use std::{
4342        net::{SocketAddr, SocketAddrV4},
4343        time::{Duration, SystemTime},
4344    };
4345    use test_log::test;
4346
4347    #[test]
4348    fn test_socketaddr_print() {
4349        let addr: SocketAddr = SocketAddrV4::new(GROUP_ADDR_V4, MDNS_PORT).into();
4350        let print = format!("{}", addr);
4351        assert_eq!(print, "224.0.0.251:5353");
4352    }
4353
4354    #[test]
4355    fn test_instance_name() {
4356        assert!(valid_instance_name("my-laser._printer._tcp.local."));
4357        assert!(valid_instance_name("my-laser.._printer._tcp.local."));
4358        assert!(!valid_instance_name("_printer._tcp.local."));
4359    }
4360
4361    #[test]
4362    fn test_check_service_name_length() {
4363        let result = check_service_name_length("_tcp", 100);
4364        assert!(result.is_err());
4365        if let Err(e) = result {
4366            println!("{}", e);
4367        }
4368    }
4369
4370    #[test]
4371    fn test_check_hostname() {
4372        // valid hostnames
4373        for hostname in &[
4374            "my_host.local.",
4375            &("A".repeat(255 - ".local.".len()) + ".local."),
4376        ] {
4377            let result = check_hostname(hostname);
4378            assert!(result.is_ok());
4379        }
4380
4381        // erroneous hostnames
4382        for hostname in &[
4383            "my_host.local",
4384            ".local.",
4385            &("A".repeat(256 - ".local.".len()) + ".local."),
4386        ] {
4387            let result = check_hostname(hostname);
4388            assert!(result.is_err());
4389            if let Err(e) = result {
4390                println!("{}", e);
4391            }
4392        }
4393    }
4394
4395    #[test]
4396    fn test_check_domain_suffix() {
4397        assert!(check_domain_suffix("_missing_dot._tcp.local").is_err());
4398        assert!(check_domain_suffix("_missing_bar.tcp.local.").is_err());
4399        assert!(check_domain_suffix("_mis_spell._tpp.local.").is_err());
4400        assert!(check_domain_suffix("_mis_spell._upp.local.").is_err());
4401        assert!(check_domain_suffix("_has_dot._tcp.local.").is_ok());
4402        assert!(check_domain_suffix("_goodname._udp.local.").is_ok());
4403    }
4404
4405    #[test]
4406    fn test_service_with_temporarily_invalidated_ptr() {
4407        // Create a daemon
4408        let d = ServiceDaemon::new().expect("Failed to create daemon");
4409
4410        let service = "_test_inval_ptr._udp.local.";
4411        let host_name = "my_host_tmp_invalidated_ptr.local.";
4412        let intfs: Vec<_> = my_ip_interfaces(false);
4413        let intf_ips: Vec<_> = intfs.iter().map(|intf| intf.ip()).collect();
4414        let port = 5201;
4415        let my_service =
4416            ServiceInfo::new(service, "my_instance", host_name, &intf_ips[..], port, None)
4417                .expect("invalid service info")
4418                .enable_addr_auto();
4419        let result = d.register(my_service.clone());
4420        assert!(result.is_ok());
4421
4422        // Browse for a service
4423        let browse_chan = d.browse(service).unwrap();
4424        let timeout = Duration::from_secs(2);
4425        let mut resolved = false;
4426
4427        while let Ok(event) = browse_chan.recv_timeout(timeout) {
4428            match event {
4429                ServiceEvent::ServiceResolved(info) => {
4430                    resolved = true;
4431                    println!("Resolved a service of {}", &info.fullname);
4432                    break;
4433                }
4434                e => {
4435                    println!("Received event {:?}", e);
4436                }
4437            }
4438        }
4439
4440        assert!(resolved);
4441
4442        println!("Stopping browse of {}", service);
4443        // Pause browsing so restarting will cause a new immediate query.
4444        // Unregistering will not work here, it will invalidate all the records.
4445        d.stop_browse(service).unwrap();
4446
4447        // Ensure the search is stopped.
4448        // Reduces the chance of receiving an answer adding the ptr back to the
4449        // cache causing the later browse to return directly from the cache.
4450        // (which invalidates what this test is trying to test for.)
4451        let mut stopped = false;
4452        while let Ok(event) = browse_chan.recv_timeout(timeout) {
4453            match event {
4454                ServiceEvent::SearchStopped(_) => {
4455                    stopped = true;
4456                    println!("Stopped browsing service");
4457                    break;
4458                }
4459                // Other `ServiceResolved` messages may be received
4460                // here as they come from different interfaces.
4461                // That's fine for this test.
4462                e => {
4463                    println!("Received event {:?}", e);
4464                }
4465            }
4466        }
4467
4468        assert!(stopped);
4469
4470        // Invalidate the ptr from the service to the host.
4471        let invalidate_ptr_packet = DnsPointer::new(
4472            my_service.get_type(),
4473            RRType::PTR,
4474            CLASS_IN,
4475            0,
4476            my_service.get_fullname().to_string(),
4477        );
4478
4479        let mut packet_buffer = DnsOutgoing::new(FLAGS_QR_RESPONSE | FLAGS_AA);
4480        packet_buffer.add_additional_answer(invalidate_ptr_packet);
4481
4482        for intf in intfs {
4483            let sock = _new_socket_bind(&intf, true).unwrap();
4484            send_dns_outgoing_impl(
4485                &packet_buffer,
4486                &intf.name,
4487                intf.index.unwrap_or(0),
4488                &intf.addr,
4489                &sock.pktinfo,
4490            );
4491        }
4492
4493        println!(
4494            "Sent PTR record invalidation. Starting second browse for {}",
4495            service
4496        );
4497
4498        // Restart the browse to force the sender to re-send the announcements.
4499        let browse_chan = d.browse(service).unwrap();
4500
4501        resolved = false;
4502        while let Ok(event) = browse_chan.recv_timeout(timeout) {
4503            match event {
4504                ServiceEvent::ServiceResolved(info) => {
4505                    resolved = true;
4506                    println!("Resolved a service of {}", &info.fullname);
4507                    break;
4508                }
4509                e => {
4510                    println!("Received event {:?}", e);
4511                }
4512            }
4513        }
4514
4515        assert!(resolved);
4516        d.shutdown().unwrap();
4517    }
4518
4519    #[test]
4520    fn test_expired_srv() {
4521        // construct service info
4522        let service_type = "_expired-srv._udp.local.";
4523        let instance = "test_instance";
4524        let host_name = "expired_srv_host.local.";
4525        let mut my_service = ServiceInfo::new(service_type, instance, host_name, "", 5023, None)
4526            .unwrap()
4527            .enable_addr_auto();
4528        // let fullname = my_service.get_fullname().to_string();
4529
4530        // set SRV to expire soon.
4531        let new_ttl = 3; // for testing only.
4532        my_service._set_host_ttl(new_ttl);
4533
4534        // register my service
4535        let mdns_server = ServiceDaemon::new().expect("Failed to create mdns server");
4536        let result = mdns_server.register(my_service);
4537        assert!(result.is_ok());
4538
4539        let mdns_client = ServiceDaemon::new().expect("Failed to create mdns client");
4540        let browse_chan = mdns_client.browse(service_type).unwrap();
4541        let timeout = Duration::from_secs(2);
4542        let mut resolved = false;
4543
4544        while let Ok(event) = browse_chan.recv_timeout(timeout) {
4545            match event {
4546                ServiceEvent::ServiceResolved(info) => {
4547                    resolved = true;
4548                    println!("Resolved a service of {}", &info.fullname);
4549                    break;
4550                }
4551                _ => {}
4552            }
4553        }
4554
4555        assert!(resolved);
4556
4557        // Exit the server so that no more responses.
4558        mdns_server.shutdown().unwrap();
4559
4560        // SRV record in the client cache will expire.
4561        let expire_timeout = Duration::from_secs(new_ttl as u64);
4562        while let Ok(event) = browse_chan.recv_timeout(expire_timeout) {
4563            match event {
4564                ServiceEvent::ServiceRemoved(service_type, full_name) => {
4565                    println!("Service removed: {}: {}", &service_type, &full_name);
4566                    break;
4567                }
4568                _ => {}
4569            }
4570        }
4571    }
4572
4573    #[test]
4574    fn test_hostname_resolution_address_removed() {
4575        // Create a mDNS server
4576        let server = ServiceDaemon::new().expect("Failed to create server");
4577        let hostname = "addr_remove_host._tcp.local.";
4578        let service_ip_addr: ScopedIp = my_ip_interfaces(false)
4579            .iter()
4580            .find(|iface| iface.ip().is_ipv4())
4581            .map(|iface| iface.ip().into())
4582            .unwrap();
4583
4584        let mut my_service = ServiceInfo::new(
4585            "_host_res_test._tcp.local.",
4586            "my_instance",
4587            hostname,
4588            &service_ip_addr.to_ip_addr(),
4589            1234,
4590            None,
4591        )
4592        .expect("invalid service info");
4593
4594        // Set a short TTL for addresses for testing.
4595        let addr_ttl = 2;
4596        my_service._set_host_ttl(addr_ttl); // Expire soon
4597
4598        server.register(my_service).unwrap();
4599
4600        // Create a mDNS client for resolving the hostname.
4601        let client = ServiceDaemon::new().expect("Failed to create client");
4602        let event_receiver = client.resolve_hostname(hostname, None).unwrap();
4603        let resolved = loop {
4604            match event_receiver.recv() {
4605                Ok(HostnameResolutionEvent::AddressesFound(found_hostname, addresses)) => {
4606                    assert!(found_hostname == hostname);
4607                    assert!(addresses.contains(&service_ip_addr));
4608                    println!("address found: {:?}", &addresses);
4609                    break true;
4610                }
4611                Ok(HostnameResolutionEvent::SearchStopped(_)) => break false,
4612                Ok(_event) => {}
4613                Err(_) => break false,
4614            }
4615        };
4616
4617        assert!(resolved);
4618
4619        // Shutdown the server so no more responses / refreshes for addresses.
4620        server.shutdown().unwrap();
4621
4622        // Wait till hostname address record expires, with 1 second grace period.
4623        let timeout = Duration::from_secs(addr_ttl as u64 + 1);
4624        let removed = loop {
4625            match event_receiver.recv_timeout(timeout) {
4626                Ok(HostnameResolutionEvent::AddressesRemoved(removed_host, addresses)) => {
4627                    assert!(removed_host == hostname);
4628                    assert!(addresses.contains(&service_ip_addr));
4629
4630                    println!(
4631                        "address removed: hostname: {} addresses: {:?}",
4632                        &hostname, &addresses
4633                    );
4634                    break true;
4635                }
4636                Ok(_event) => {}
4637                Err(_) => {
4638                    break false;
4639                }
4640            }
4641        };
4642
4643        assert!(removed);
4644
4645        client.shutdown().unwrap();
4646    }
4647
4648    #[test]
4649    fn test_refresh_ptr() {
4650        // construct service info
4651        let service_type = "_refresh-ptr._udp.local.";
4652        let instance = "test_instance";
4653        let host_name = "refresh_ptr_host.local.";
4654        let service_ip_addr = my_ip_interfaces(false)
4655            .iter()
4656            .find(|iface| iface.ip().is_ipv4())
4657            .map(|iface| iface.ip())
4658            .unwrap();
4659
4660        let mut my_service = ServiceInfo::new(
4661            service_type,
4662            instance,
4663            host_name,
4664            &service_ip_addr,
4665            5023,
4666            None,
4667        )
4668        .unwrap();
4669
4670        let new_ttl = 3; // for testing only.
4671        my_service._set_other_ttl(new_ttl);
4672
4673        // register my service
4674        let mdns_server = ServiceDaemon::new().expect("Failed to create mdns server");
4675        let result = mdns_server.register(my_service);
4676        assert!(result.is_ok());
4677
4678        let mdns_client = ServiceDaemon::new().expect("Failed to create mdns client");
4679        let browse_chan = mdns_client.browse(service_type).unwrap();
4680        let timeout = Duration::from_millis(1500); // Give at least 1 second for the service probing.
4681        let mut resolved = false;
4682
4683        // resolve the service first.
4684        while let Ok(event) = browse_chan.recv_timeout(timeout) {
4685            match event {
4686                ServiceEvent::ServiceResolved(info) => {
4687                    resolved = true;
4688                    println!("Resolved a service of {}", &info.fullname);
4689                    break;
4690                }
4691                _ => {}
4692            }
4693        }
4694
4695        assert!(resolved);
4696
4697        // wait over 80% of TTL, and refresh PTR should be sent out.
4698        let timeout = Duration::from_millis(new_ttl as u64 * 1000 * 90 / 100);
4699        while let Ok(event) = browse_chan.recv_timeout(timeout) {
4700            println!("event: {:?}", &event);
4701        }
4702
4703        // verify refresh counter.
4704        let metrics_chan = mdns_client.get_metrics().unwrap();
4705        let metrics = metrics_chan.recv_timeout(timeout).unwrap();
4706        let ptr_refresh_counter = metrics["cache-refresh-ptr"];
4707        assert_eq!(ptr_refresh_counter, 1);
4708        let srvtxt_refresh_counter = metrics["cache-refresh-srv-txt"];
4709        assert_eq!(srvtxt_refresh_counter, 1);
4710
4711        // Exit the server so that no more responses.
4712        mdns_server.shutdown().unwrap();
4713        mdns_client.shutdown().unwrap();
4714    }
4715
4716    #[test]
4717    fn test_name_change() {
4718        assert_eq!(name_change("foo.local."), "foo (2).local.");
4719        assert_eq!(name_change("foo (2).local."), "foo (3).local.");
4720        assert_eq!(name_change("foo (9).local."), "foo (10).local.");
4721        assert_eq!(name_change("foo"), "foo (2)");
4722        assert_eq!(name_change("foo (2)"), "foo (3)");
4723        assert_eq!(name_change(""), " (2)");
4724
4725        // Additional edge cases
4726        assert_eq!(name_change("foo (abc)"), "foo (abc) (2)"); // Invalid number
4727        assert_eq!(name_change("foo (2"), "foo (2 (2)"); // Missing closing parenthesis
4728        assert_eq!(name_change("foo (2) extra"), "foo (2) extra (2)"); // Extra text after number
4729    }
4730
4731    #[test]
4732    fn test_hostname_change() {
4733        assert_eq!(hostname_change("foo.local."), "foo-2.local.");
4734        assert_eq!(hostname_change("foo"), "foo-2");
4735        assert_eq!(hostname_change("foo-2.local."), "foo-3.local.");
4736        assert_eq!(hostname_change("foo-9"), "foo-10");
4737        assert_eq!(hostname_change("test-42.domain."), "test-43.domain.");
4738    }
4739
4740    #[test]
4741    fn test_add_answer_txt_ttl() {
4742        // construct a simple service info
4743        let service_type = "_test_add_answer._udp.local.";
4744        let instance = "test_instance";
4745        let host_name = "add_answer_host.local.";
4746        let service_intf = my_ip_interfaces(false)
4747            .into_iter()
4748            .find(|iface| iface.ip().is_ipv4())
4749            .unwrap();
4750        let service_ip_addr = service_intf.ip();
4751        let my_service = ServiceInfo::new(
4752            service_type,
4753            instance,
4754            host_name,
4755            &service_ip_addr,
4756            5023,
4757            None,
4758        )
4759        .unwrap();
4760
4761        // construct a DnsOutgoing message
4762        let mut out = DnsOutgoing::new(FLAGS_QR_RESPONSE | FLAGS_AA);
4763
4764        // Construct a dummy DnsIncoming message
4765        let mut dummy_data = out.to_data_on_wire();
4766        let interface_id = InterfaceId::from(&service_intf);
4767        let incoming = DnsIncoming::new(dummy_data.pop().unwrap(), interface_id).unwrap();
4768
4769        // Add an answer of TXT type for the service.
4770        let if_addrs = vec![service_intf.ip()];
4771        add_answer_of_service(
4772            &mut out,
4773            &incoming,
4774            instance,
4775            &my_service,
4776            RRType::TXT,
4777            if_addrs,
4778        );
4779
4780        // Check if the answer was added correctly
4781        assert!(
4782            out.answers_count() > 0,
4783            "No answers added to the outgoing message"
4784        );
4785
4786        // Check if the first answer is of type TXT
4787        let answer = out._answers().first().unwrap();
4788        assert_eq!(answer.0.get_type(), RRType::TXT);
4789
4790        // Check TTL is set properly for the TXT record
4791        assert_eq!(answer.0.get_record().get_ttl(), my_service.get_other_ttl());
4792    }
4793
4794    #[test]
4795    fn test_interface_flip() {
4796        // start a server
4797        let ty_domain = "_intf-flip._udp.local.";
4798        let host_name = "intf_flip.local.";
4799        let now = SystemTime::now()
4800            .duration_since(SystemTime::UNIX_EPOCH)
4801            .unwrap();
4802        let instance_name = now.as_micros().to_string(); // Create a unique name.
4803        let port = 5200;
4804
4805        // Get a single IPv4 address
4806        let (ip_addr1, intf_name) = my_ip_interfaces(false)
4807            .iter()
4808            .find(|iface| iface.ip().is_ipv4())
4809            .map(|iface| (iface.ip(), iface.name.clone()))
4810            .unwrap();
4811
4812        println!("Using interface {} with IP {}", intf_name, ip_addr1);
4813
4814        // Register the service.
4815        let service1 =
4816            ServiceInfo::new(ty_domain, &instance_name, host_name, &ip_addr1, port, None)
4817                .expect("valid service info");
4818        let server1 = ServiceDaemon::new().expect("failed to start server");
4819        server1
4820            .register(service1)
4821            .expect("Failed to register service1");
4822
4823        // wait for the service announced.
4824        std::thread::sleep(Duration::from_secs(2));
4825
4826        // start a client
4827        let client = ServiceDaemon::new().expect("failed to start client");
4828
4829        let receiver = client.browse(ty_domain).unwrap();
4830
4831        let timeout = Duration::from_secs(3);
4832        let mut got_data = false;
4833
4834        while let Ok(event) = receiver.recv_timeout(timeout) {
4835            match event {
4836                ServiceEvent::ServiceResolved(_) => {
4837                    println!("Received ServiceResolved event");
4838                    got_data = true;
4839                    break;
4840                }
4841                _ => {}
4842            }
4843        }
4844
4845        assert!(got_data, "Should receive ServiceResolved event");
4846
4847        // Set a short IP check interval to detect interface changes quickly.
4848        client.set_ip_check_interval(1).unwrap();
4849
4850        // Now shutdown the interface and expect the client to lose the service.
4851        println!("Shutting down interface {}", &intf_name);
4852        client.test_down_interface(&intf_name).unwrap();
4853
4854        let mut got_removed = false;
4855
4856        while let Ok(event) = receiver.recv_timeout(timeout) {
4857            match event {
4858                ServiceEvent::ServiceRemoved(ty_domain, instance) => {
4859                    got_removed = true;
4860                    println!("removed: {ty_domain} : {instance}");
4861                    break;
4862                }
4863                _ => {}
4864            }
4865        }
4866        assert!(got_removed, "Should receive ServiceRemoved event");
4867
4868        println!("Bringing up interface {}", &intf_name);
4869        client.test_up_interface(&intf_name).unwrap();
4870        let mut got_data = false;
4871        while let Ok(event) = receiver.recv_timeout(timeout) {
4872            match event {
4873                ServiceEvent::ServiceResolved(resolved) => {
4874                    got_data = true;
4875                    println!("Received ServiceResolved: {:?}", resolved);
4876                    break;
4877                }
4878                _ => {}
4879            }
4880        }
4881        assert!(
4882            got_data,
4883            "Should receive ServiceResolved event after interface is back up"
4884        );
4885
4886        server1.shutdown().unwrap();
4887        client.shutdown().unwrap();
4888    }
4889
4890    #[test]
4891    fn test_cache_only() {
4892        // construct service info
4893        let service_type = "_cache_only._udp.local.";
4894        let instance = "test_instance";
4895        let host_name = "cache_only_host.local.";
4896        let service_ip_addr = my_ip_interfaces(false)
4897            .iter()
4898            .find(|iface| iface.ip().is_ipv4())
4899            .map(|iface| iface.ip())
4900            .unwrap();
4901
4902        let mut my_service = ServiceInfo::new(
4903            service_type,
4904            instance,
4905            host_name,
4906            &service_ip_addr,
4907            5023,
4908            None,
4909        )
4910        .unwrap();
4911
4912        let new_ttl = 3; // for testing only.
4913        my_service._set_other_ttl(new_ttl);
4914
4915        let mdns_client = ServiceDaemon::new().expect("Failed to create mdns client");
4916
4917        // make a single browse request to record that we are interested in the service.  This ensures that
4918        // subsequent announcements are cached.
4919        let browse_chan = mdns_client.browse_cache(service_type).unwrap();
4920        std::thread::sleep(Duration::from_secs(2));
4921
4922        // register my service
4923        let mdns_server = ServiceDaemon::new().expect("Failed to create mdns server");
4924        let result = mdns_server.register(my_service);
4925        assert!(result.is_ok());
4926
4927        let timeout = Duration::from_millis(1500); // Give at least 1 second for the service probing.
4928        let mut resolved = false;
4929
4930        // resolve the service.
4931        while let Ok(event) = browse_chan.recv_timeout(timeout) {
4932            match event {
4933                ServiceEvent::ServiceResolved(info) => {
4934                    resolved = true;
4935                    println!("Resolved a service of {}", &info.get_fullname());
4936                    break;
4937                }
4938                _ => {}
4939            }
4940        }
4941
4942        assert!(resolved);
4943
4944        // Exit the server so that no more responses.
4945        mdns_server.shutdown().unwrap();
4946        mdns_client.shutdown().unwrap();
4947    }
4948
4949    #[test]
4950    fn test_cache_only_unsolicited() {
4951        // construct service info
4952        let service_type = "_cache_only._udp.local.";
4953        let instance = "test_instance";
4954        let host_name = "cache_only_host.local.";
4955        let service_ip_addr = my_ip_interfaces(false)
4956            .iter()
4957            .find(|iface| iface.ip().is_ipv4())
4958            .map(|iface| iface.ip())
4959            .unwrap();
4960
4961        let mut my_service = ServiceInfo::new(
4962            service_type,
4963            instance,
4964            host_name,
4965            &service_ip_addr,
4966            5023,
4967            None,
4968        )
4969        .unwrap();
4970
4971        let new_ttl = 3; // for testing only.
4972        my_service._set_other_ttl(new_ttl);
4973
4974        // register my service
4975        let mdns_server = ServiceDaemon::new().expect("Failed to create mdns server");
4976        let result = mdns_server.register(my_service);
4977        assert!(result.is_ok());
4978
4979        let mdns_client = ServiceDaemon::new().expect("Failed to create mdns client");
4980        mdns_client.accept_unsolicited(true).unwrap();
4981
4982        // Wait a bit for the service announcements to go out, before calling browse_cache.  This ensures
4983        // that the announcements are treated as unsolicited
4984        std::thread::sleep(Duration::from_secs(2));
4985        let browse_chan = mdns_client.browse_cache(service_type).unwrap();
4986        let timeout = Duration::from_millis(1500); // Give at least 1 second for the service probing.
4987        let mut resolved = false;
4988
4989        // resolve the service.
4990        while let Ok(event) = browse_chan.recv_timeout(timeout) {
4991            match event {
4992                ServiceEvent::ServiceResolved(info) => {
4993                    resolved = true;
4994                    println!("Resolved a service of {}", &info.get_fullname());
4995                    break;
4996                }
4997                _ => {}
4998            }
4999        }
5000
5001        assert!(resolved);
5002
5003        // Exit the server so that no more responses.
5004        mdns_server.shutdown().unwrap();
5005        mdns_client.shutdown().unwrap();
5006    }
5007}