mdns_sd/
service_daemon.rs

1//! Service daemon for mDNS Service Discovery.
2
3// How DNS-based Service Discovery works in a nutshell:
4//
5// (excerpt from RFC 6763)
6// .... that a particular service instance can be
7//    described using a DNS SRV [RFC2782] and DNS TXT [RFC1035] record.
8//    The SRV record has a name of the form "<Instance>.<Service>.<Domain>"
9//    and gives the target host and port where the service instance can be
10//    reached.  The DNS TXT record of the same name gives additional
11//    information about this instance, in a structured form using key/value
12//    pairs, described in Section 6.  A client discovers the list of
13//    available instances of a given service type using a query for a DNS
14//    PTR [RFC1035] record with a name of the form "<Service>.<Domain>",
15//    which returns a set of zero or more names, which are the names of the
16//    aforementioned DNS SRV/TXT record pairs.
17//
18// Some naming conventions in this source code:
19//
20// `ty_domain` refers to service type together with domain name, i.e. <service>.<domain>.
21// Every <service> consists of two labels: service itself and "_udp." or "_tcp".
22// See RFC 6763 section 7 Service Names.
23//     for example: `_my-service._udp.local.`
24//
25// `fullname` refers to a full Service Instance Name, i.e. <instance>.<service>.<domain>
26//     for example: `my_home._my-service._udp.local.`
27//
28// In mDNS and DNS, the basic data structure is "Resource Record" (RR), where
29// in Service Discovery, the basic data structure is "Service Info". One Service Info
30// corresponds to a set of DNS Resource Records.
31#[cfg(feature = "logging")]
32use crate::log::{debug, trace};
33use crate::{
34    dns_cache::{current_time_millis, DnsCache},
35    dns_parser::{
36        ip_address_rr_type, DnsAddress, DnsEntryExt, DnsIncoming, DnsOutgoing, DnsPointer,
37        DnsRecordBox, DnsRecordExt, DnsSrv, DnsTxt, InterfaceId, RRType, ScopedIp,
38        CLASS_CACHE_FLUSH, CLASS_IN, FLAGS_AA, FLAGS_QR_QUERY, FLAGS_QR_RESPONSE, MAX_MSG_ABSOLUTE,
39    },
40    error::{e_fmt, Error, Result},
41    service_info::{DnsRegistry, MyIntf, Probe, ServiceInfo, ServiceStatus},
42    Receiver, ResolvedService, TxtProperties,
43};
44use flume::{bounded, Sender, TrySendError};
45use if_addrs::{IfAddr, Interface};
46use mio::{event::Source, net::UdpSocket as MioUdpSocket, Interest, Poll, Registry, Token};
47use socket2::Domain;
48use socket_pktinfo::PktInfoUdpSocket;
49use std::{
50    cmp::{self, Reverse},
51    collections::{hash_map::Entry, BinaryHeap, HashMap, HashSet},
52    fmt, io,
53    net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6, UdpSocket},
54    str, thread,
55    time::Duration,
56    vec,
57};
58
59/// The default max length of the service name without domain, not including the
60/// leading underscore (`_`). It is set to 15 per
61/// [RFC 6763 section 7.2](https://www.rfc-editor.org/rfc/rfc6763#section-7.2).
62pub const SERVICE_NAME_LEN_MAX_DEFAULT: u8 = 15;
63
64/// The default interval for checking IP changes automatically.
65pub const IP_CHECK_INTERVAL_IN_SECS_DEFAULT: u32 = 5;
66
67/// The default time out for [ServiceDaemon::verify] is 10 seconds, per
68/// [RFC 6762 section 10.4](https://datatracker.ietf.org/doc/html/rfc6762#section-10.4)
69pub const VERIFY_TIMEOUT_DEFAULT: Duration = Duration::from_secs(10);
70
71/// The mDNS port number per RFC 6762.
72pub const MDNS_PORT: u16 = 5353;
73
74const GROUP_ADDR_V4: Ipv4Addr = Ipv4Addr::new(224, 0, 0, 251);
75const GROUP_ADDR_V6: Ipv6Addr = Ipv6Addr::new(0xff02, 0, 0, 0, 0, 0, 0, 0xfb);
76const LOOPBACK_V4: Ipv4Addr = Ipv4Addr::new(127, 0, 0, 1);
77
78const RESOLVE_WAIT_IN_MILLIS: u64 = 500;
79
80/// Response status code for the service `unregister` call.
81#[derive(Debug)]
82pub enum UnregisterStatus {
83    /// Unregister was successful.
84    OK,
85    /// The service was not found in the registration.
86    NotFound,
87}
88
89/// Status code for the service daemon.
90#[derive(Debug, PartialEq, Clone, Eq)]
91#[non_exhaustive]
92pub enum DaemonStatus {
93    /// The daemon is running as normal.
94    Running,
95
96    /// The daemon has been shutdown.
97    Shutdown,
98}
99
100/// Different counters included in the metrics.
101/// Currently all counters are for outgoing packets.
102#[derive(Hash, Eq, PartialEq)]
103enum Counter {
104    Register,
105    RegisterResend,
106    Unregister,
107    UnregisterResend,
108    Browse,
109    ResolveHostname,
110    Respond,
111    CacheRefreshPTR,
112    CacheRefreshSrvTxt,
113    CacheRefreshAddr,
114    KnownAnswerSuppression,
115    CachedPTR,
116    CachedSRV,
117    CachedAddr,
118    CachedTxt,
119    CachedNSec,
120    CachedSubtype,
121    DnsRegistryProbe,
122    DnsRegistryActive,
123    DnsRegistryTimer,
124    DnsRegistryNameChange,
125    Timer,
126}
127
128impl fmt::Display for Counter {
129    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
130        match self {
131            Self::Register => write!(f, "register"),
132            Self::RegisterResend => write!(f, "register-resend"),
133            Self::Unregister => write!(f, "unregister"),
134            Self::UnregisterResend => write!(f, "unregister-resend"),
135            Self::Browse => write!(f, "browse"),
136            Self::ResolveHostname => write!(f, "resolve-hostname"),
137            Self::Respond => write!(f, "respond"),
138            Self::CacheRefreshPTR => write!(f, "cache-refresh-ptr"),
139            Self::CacheRefreshSrvTxt => write!(f, "cache-refresh-srv-txt"),
140            Self::CacheRefreshAddr => write!(f, "cache-refresh-addr"),
141            Self::KnownAnswerSuppression => write!(f, "known-answer-suppression"),
142            Self::CachedPTR => write!(f, "cached-ptr"),
143            Self::CachedSRV => write!(f, "cached-srv"),
144            Self::CachedAddr => write!(f, "cached-addr"),
145            Self::CachedTxt => write!(f, "cached-txt"),
146            Self::CachedNSec => write!(f, "cached-nsec"),
147            Self::CachedSubtype => write!(f, "cached-subtype"),
148            Self::DnsRegistryProbe => write!(f, "dns-registry-probe"),
149            Self::DnsRegistryActive => write!(f, "dns-registry-active"),
150            Self::DnsRegistryTimer => write!(f, "dns-registry-timer"),
151            Self::DnsRegistryNameChange => write!(f, "dns-registry-name-change"),
152            Self::Timer => write!(f, "timer"),
153        }
154    }
155}
156
157/// A wrapper around UDP socket used by the mDNS daemon.
158///
159/// We do this because `mio` does not support PKTINFO and
160/// does not provide a way to implement `Source` trait directly and safely.
161struct MyUdpSocket {
162    /// The underlying socket that supports control messages like
163    /// `IP_PKTINFO` for IPv4 and `IPV6_PKTINFO` for IPv6.
164    pktinfo: PktInfoUdpSocket,
165
166    /// The mio UDP socket that is a clone of `pktinfo` and
167    /// is used for event polling.
168    mio: MioUdpSocket,
169}
170
171impl MyUdpSocket {
172    pub fn new(pktinfo: PktInfoUdpSocket) -> io::Result<Self> {
173        let std_sock = pktinfo.try_clone_std()?;
174        let mio = MioUdpSocket::from_std(std_sock);
175
176        Ok(Self { pktinfo, mio })
177    }
178}
179
180/// Implements the mio `Source` trait so that we can use `MyUdpSocket` with `Poll`.
181impl Source for MyUdpSocket {
182    fn register(
183        &mut self,
184        registry: &Registry,
185        token: Token,
186        interests: Interest,
187    ) -> io::Result<()> {
188        self.mio.register(registry, token, interests)
189    }
190
191    fn reregister(
192        &mut self,
193        registry: &Registry,
194        token: Token,
195        interests: Interest,
196    ) -> io::Result<()> {
197        self.mio.reregister(registry, token, interests)
198    }
199
200    fn deregister(&mut self, registry: &Registry) -> std::io::Result<()> {
201        self.mio.deregister(registry)
202    }
203}
204
205/// The metrics is a HashMap of (name_key, i64_value).
206/// The main purpose is to help monitoring the mDNS packet traffic.
207pub type Metrics = HashMap<String, i64>;
208
209const IPV4_SOCK_EVENT_KEY: usize = 4; // Pick a key just to indicate IPv4.
210const IPV6_SOCK_EVENT_KEY: usize = 6; // Pick a key just to indicate IPv6.
211const SIGNAL_SOCK_EVENT_KEY: usize = usize::MAX - 1; // avoid to overlap with zc.poll_ids
212
213/// A daemon thread for mDNS
214///
215/// This struct provides a handle and an API to the daemon. It is cloneable.
216#[derive(Clone)]
217pub struct ServiceDaemon {
218    /// Sender handle of the channel to the daemon.
219    sender: Sender<Command>,
220
221    /// Send to this addr to signal that a `Command` is coming.
222    ///
223    /// The daemon listens on this addr together with other mDNS sockets,
224    /// to avoid busy polling the flume channel. If there is a way to poll
225    /// the channel and mDNS sockets together, then this can be removed.
226    signal_addr: SocketAddr,
227}
228
229impl ServiceDaemon {
230    /// Creates a new daemon and spawns a thread to run the daemon.
231    ///
232    /// Creates a new mDNS service daemon using the default port (5353).
233    ///
234    /// For development/testing with custom ports, use [`ServiceDaemon::new_with_port`].
235    pub fn new() -> Result<Self> {
236        Self::new_with_port(MDNS_PORT)
237    }
238
239    /// Creates a new mDNS service daemon using a custom port.
240    ///
241    /// # Arguments
242    ///
243    /// * `port` - The UDP port to bind for mDNS communication.
244    ///   - In production, this should be `MDNS_PORT` (5353) per RFC 6762.
245    ///   - For development/testing, you can use a non-standard port (e.g., 5454)
246    ///     to avoid conflicts with system mDNS services.
247    ///   - Both publisher and browser must use the same port to communicate.
248    ///
249    /// # Example
250    ///
251    /// ```no_run
252    /// use mdns_sd::ServiceDaemon;
253    ///
254    /// // Use standard mDNS port (production)
255    /// let daemon = ServiceDaemon::new_with_port(5353)?;
256    ///
257    /// // Use custom port for development (avoids macOS Bonjour conflict)
258    /// let daemon_dev = ServiceDaemon::new_with_port(5454)?;
259    /// # Ok::<(), mdns_sd::Error>(())
260    /// ```
261    pub fn new_with_port(port: u16) -> Result<Self> {
262        // Use port 0 to allow the system assign a random available port,
263        // no need for a pre-defined port number.
264        let signal_addr = SocketAddrV4::new(LOOPBACK_V4, 0);
265
266        let signal_sock = UdpSocket::bind(signal_addr)
267            .map_err(|e| e_fmt!("failed to create signal_sock for daemon: {}", e))?;
268
269        // Get the socket with the OS chosen port
270        let signal_addr = signal_sock
271            .local_addr()
272            .map_err(|e| e_fmt!("failed to get signal sock addr: {}", e))?;
273
274        // Must be nonblocking so we can listen to it together with mDNS sockets.
275        signal_sock
276            .set_nonblocking(true)
277            .map_err(|e| e_fmt!("failed to set nonblocking for signal socket: {}", e))?;
278
279        let poller = Poll::new().map_err(|e| e_fmt!("failed to create mio Poll: {e}"))?;
280
281        let (sender, receiver) = bounded(100);
282
283        // Spawn the daemon thread
284        let mio_sock = MioUdpSocket::from_std(signal_sock);
285        thread::Builder::new()
286            .name("mDNS_daemon".to_string())
287            .spawn(move || Self::daemon_thread(mio_sock, poller, receiver, port))
288            .map_err(|e| e_fmt!("thread builder failed to spawn: {}", e))?;
289
290        Ok(Self {
291            sender,
292            signal_addr,
293        })
294    }
295
296    /// Sends `cmd` to the daemon via its channel, and sends a signal
297    /// to its sock addr to notify.
298    fn send_cmd(&self, cmd: Command) -> Result<()> {
299        let cmd_name = cmd.to_string();
300
301        // First, send to the flume channel.
302        self.sender.try_send(cmd).map_err(|e| match e {
303            TrySendError::Full(_) => Error::Again,
304            e => e_fmt!("flume::channel::send failed: {}", e),
305        })?;
306
307        // Second, send a signal to notify the daemon.
308        let addr = SocketAddrV4::new(LOOPBACK_V4, 0);
309        let socket = UdpSocket::bind(addr)
310            .map_err(|e| e_fmt!("Failed to create socket to send signal: {}", e))?;
311        socket
312            .send_to(cmd_name.as_bytes(), self.signal_addr)
313            .map_err(|e| {
314                e_fmt!(
315                    "signal socket send_to {} ({}) failed: {}",
316                    self.signal_addr,
317                    cmd_name,
318                    e
319                )
320            })?;
321
322        Ok(())
323    }
324
325    /// Starts browsing for a specific service type.
326    ///
327    /// `service_type` must end with a valid mDNS domain: '._tcp.local.' or '._udp.local.'
328    ///
329    /// Returns a channel `Receiver` to receive events about the service. The caller
330    /// can call `.recv_async().await` on this receiver to handle events in an
331    /// async environment or call `.recv()` in a sync environment.
332    ///
333    /// When a new instance is found, the daemon automatically tries to resolve, i.e.
334    /// finding more details, i.e. SRV records and TXT records.
335    pub fn browse(&self, service_type: &str) -> Result<Receiver<ServiceEvent>> {
336        check_domain_suffix(service_type)?;
337
338        let (resp_s, resp_r) = bounded(10);
339        self.send_cmd(Command::Browse(service_type.to_string(), 1, false, resp_s))?;
340        Ok(resp_r)
341    }
342
343    /// Preforms a "cache-only" browse.
344    ///
345    /// `service_type` must end with a valid mDNS domain: '._tcp.local.' or '._udp.local.'
346    ///
347    /// The functionality is identical to 'browse', but the service events are based solely on the contents
348    /// of the daemon's cache. No actual mDNS query is sent to the network.
349    ///
350    /// See [accept_unsolicited](Self::accept_unsolicited) if you want to do cache-only browsing.
351    pub fn browse_cache(&self, service_type: &str) -> Result<Receiver<ServiceEvent>> {
352        check_domain_suffix(service_type)?;
353
354        let (resp_s, resp_r) = bounded(10);
355        self.send_cmd(Command::Browse(service_type.to_string(), 1, true, resp_s))?;
356        Ok(resp_r)
357    }
358
359    /// Stops searching for a specific service type.
360    ///
361    /// When an error is returned, the caller should retry only when
362    /// the error is `Error::Again`, otherwise should log and move on.
363    pub fn stop_browse(&self, ty_domain: &str) -> Result<()> {
364        self.send_cmd(Command::StopBrowse(ty_domain.to_string()))
365    }
366
367    /// Starts querying for the ip addresses of a hostname.
368    ///
369    /// Returns a channel `Receiver` to receive events about the hostname.
370    /// The caller can call `.recv_async().await` on this receiver to handle events in an
371    /// async environment or call `.recv()` in a sync environment.
372    ///
373    /// The `timeout` is specified in milliseconds.
374    pub fn resolve_hostname(
375        &self,
376        hostname: &str,
377        timeout: Option<u64>,
378    ) -> Result<Receiver<HostnameResolutionEvent>> {
379        check_hostname(hostname)?;
380        let (resp_s, resp_r) = bounded(10);
381        self.send_cmd(Command::ResolveHostname(
382            hostname.to_string(),
383            1,
384            resp_s,
385            timeout,
386        ))?;
387        Ok(resp_r)
388    }
389
390    /// Stops querying for the ip addresses of a hostname.
391    ///
392    /// When an error is returned, the caller should retry only when
393    /// the error is `Error::Again`, otherwise should log and move on.
394    pub fn stop_resolve_hostname(&self, hostname: &str) -> Result<()> {
395        self.send_cmd(Command::StopResolveHostname(hostname.to_string()))
396    }
397
398    /// Registers a service provided by this host.
399    ///
400    /// If `service_info` has no addresses yet and its `addr_auto` is enabled,
401    /// this method will automatically fill in addresses from the host.
402    ///
403    /// To re-announce a service with an updated `service_info`, just call
404    /// this `register` function again. No need to call `unregister` first.
405    pub fn register(&self, service_info: ServiceInfo) -> Result<()> {
406        check_service_name(service_info.get_fullname())?;
407        check_hostname(service_info.get_hostname())?;
408
409        self.send_cmd(Command::Register(service_info.into()))
410    }
411
412    /// Unregisters a service. This is a graceful shutdown of a service.
413    ///
414    /// Returns a channel receiver that is used to receive the status code
415    /// of the unregister.
416    ///
417    /// When an error is returned, the caller should retry only when
418    /// the error is `Error::Again`, otherwise should log and move on.
419    pub fn unregister(&self, fullname: &str) -> Result<Receiver<UnregisterStatus>> {
420        let (resp_s, resp_r) = bounded(1);
421        self.send_cmd(Command::Unregister(fullname.to_lowercase(), resp_s))?;
422        Ok(resp_r)
423    }
424
425    /// Starts to monitor events from the daemon.
426    ///
427    /// Returns a channel [`Receiver`] of [`DaemonEvent`].
428    pub fn monitor(&self) -> Result<Receiver<DaemonEvent>> {
429        let (resp_s, resp_r) = bounded(100);
430        self.send_cmd(Command::Monitor(resp_s))?;
431        Ok(resp_r)
432    }
433
434    /// Shuts down the daemon thread and returns a channel to receive the status.
435    ///
436    /// When an error is returned, the caller should retry only when
437    /// the error is `Error::Again`, otherwise should log and move on.
438    pub fn shutdown(&self) -> Result<Receiver<DaemonStatus>> {
439        let (resp_s, resp_r) = bounded(1);
440        self.send_cmd(Command::Exit(resp_s))?;
441        Ok(resp_r)
442    }
443
444    /// Returns the status of the daemon.
445    ///
446    /// When an error is returned, the caller should retry only when
447    /// the error is `Error::Again`, otherwise should consider the daemon
448    /// stopped working and move on.
449    pub fn status(&self) -> Result<Receiver<DaemonStatus>> {
450        let (resp_s, resp_r) = bounded(1);
451
452        if self.sender.is_disconnected() {
453            resp_s
454                .send(DaemonStatus::Shutdown)
455                .map_err(|e| e_fmt!("failed to send daemon status to the client: {}", e))?;
456        } else {
457            self.send_cmd(Command::GetStatus(resp_s))?;
458        }
459
460        Ok(resp_r)
461    }
462
463    /// Returns a channel receiver for the metrics, e.g. input/output counters.
464    ///
465    /// The metrics returned is a snapshot. Hence the caller should call
466    /// this method repeatedly if they want to monitor the metrics continuously.
467    pub fn get_metrics(&self) -> Result<Receiver<Metrics>> {
468        let (resp_s, resp_r) = bounded(1);
469        self.send_cmd(Command::GetMetrics(resp_s))?;
470        Ok(resp_r)
471    }
472
473    /// Change the max length allowed for a service name.
474    ///
475    /// As RFC 6763 defines a length max for a service name, a user should not call
476    /// this method unless they have to. See [`SERVICE_NAME_LEN_MAX_DEFAULT`].
477    ///
478    /// `len_max` is capped at an internal limit, which is currently 30.
479    pub fn set_service_name_len_max(&self, len_max: u8) -> Result<()> {
480        const SERVICE_NAME_LEN_MAX_LIMIT: u8 = 30; // Double the default length max.
481
482        if len_max > SERVICE_NAME_LEN_MAX_LIMIT {
483            return Err(Error::Msg(format!(
484                "service name length max {len_max} is too large"
485            )));
486        }
487
488        self.send_cmd(Command::SetOption(DaemonOption::ServiceNameLenMax(len_max)))
489    }
490
491    /// Change the interval for checking IP changes automatically.
492    ///
493    /// Setting the interval to 0 disables the IP check.
494    ///
495    /// See [`IP_CHECK_INTERVAL_IN_SECS_DEFAULT`] for the default interval.
496    pub fn set_ip_check_interval(&self, interval_in_secs: u32) -> Result<()> {
497        let interval_in_millis = interval_in_secs as u64 * 1000;
498        self.send_cmd(Command::SetOption(DaemonOption::IpCheckInterval(
499            interval_in_millis,
500        )))
501    }
502
503    /// Get the current interval in seconds for checking IP changes automatically.
504    pub fn get_ip_check_interval(&self) -> Result<u32> {
505        let (resp_s, resp_r) = bounded(1);
506        self.send_cmd(Command::GetOption(resp_s))?;
507
508        let option = resp_r
509            .recv_timeout(Duration::from_secs(10))
510            .map_err(|e| e_fmt!("failed to receive ip check interval: {}", e))?;
511        let ip_check_interval_in_secs = option.ip_check_interval / 1000;
512        Ok(ip_check_interval_in_secs as u32)
513    }
514
515    /// Include interfaces that match `if_kind` for this service daemon.
516    ///
517    /// For example:
518    /// ```ignore
519    ///     daemon.enable_interface("en0")?;
520    /// ```
521    pub fn enable_interface(&self, if_kind: impl IntoIfKindVec) -> Result<()> {
522        let if_kind_vec = if_kind.into_vec();
523        self.send_cmd(Command::SetOption(DaemonOption::EnableInterface(
524            if_kind_vec.kinds,
525        )))
526    }
527
528    /// Ignore/exclude interfaces that match `if_kind` for this daemon.
529    ///
530    /// For example:
531    /// ```ignore
532    ///     daemon.disable_interface(IfKind::IPv6)?;
533    /// ```
534    pub fn disable_interface(&self, if_kind: impl IntoIfKindVec) -> Result<()> {
535        let if_kind_vec = if_kind.into_vec();
536        self.send_cmd(Command::SetOption(DaemonOption::DisableInterface(
537            if_kind_vec.kinds,
538        )))
539    }
540
541    /// If `accept` is true, accept and cache all responses, even if there is no active querier
542    /// for a given service type. This is useful / necessary when doing cache-only browsing. See
543    /// [browse_cache](Self::browse_cache).
544    ///
545    /// If `accept` is false (default), accept only responses matching queries that we have initiated.
546    ///
547    /// For example:
548    /// ```ignore
549    ///     daemon.accept_unsolicited(true)?;
550    /// ```
551    pub fn accept_unsolicited(&self, accept: bool) -> Result<()> {
552        self.send_cmd(Command::SetOption(DaemonOption::AcceptUnsolicited(accept)))
553    }
554
555    #[cfg(test)]
556    pub fn test_down_interface(&self, ifname: &str) -> Result<()> {
557        self.send_cmd(Command::SetOption(DaemonOption::TestDownInterface(
558            ifname.to_string(),
559        )))
560    }
561
562    #[cfg(test)]
563    pub fn test_up_interface(&self, ifname: &str) -> Result<()> {
564        self.send_cmd(Command::SetOption(DaemonOption::TestUpInterface(
565            ifname.to_string(),
566        )))
567    }
568
569    /// Enable or disable the loopback for locally sent multicast packets in IPv4.
570    ///
571    /// By default, multicast loop is enabled for IPv4. When disabled, a querier will not
572    /// receive announcements from a responder on the same host.
573    ///
574    /// Reference: <https://learn.microsoft.com/en-us/windows/win32/winsock/ip-multicast-2>
575    ///
576    /// "The Winsock version of the IP_MULTICAST_LOOP option is semantically different than
577    /// the UNIX version of the IP_MULTICAST_LOOP option:
578    ///
579    /// In Winsock, the IP_MULTICAST_LOOP option applies only to the receive path.
580    /// In the UNIX version, the IP_MULTICAST_LOOP option applies to the send path."
581    ///
582    /// Which means, in order NOT to receive localhost announcements, you want to call
583    /// this API on the querier side on Windows, but on the responder side on Unix.
584    pub fn set_multicast_loop_v4(&self, on: bool) -> Result<()> {
585        self.send_cmd(Command::SetOption(DaemonOption::MulticastLoopV4(on)))
586    }
587
588    /// Enable or disable the loopback for locally sent multicast packets in IPv6.
589    ///
590    /// By default, multicast loop is enabled for IPv6. When disabled, a querier will not
591    /// receive announcements from a responder on the same host.
592    ///
593    /// Reference: <https://learn.microsoft.com/en-us/windows/win32/winsock/ip-multicast-2>
594    ///
595    /// "The Winsock version of the IP_MULTICAST_LOOP option is semantically different than
596    /// the UNIX version of the IP_MULTICAST_LOOP option:
597    ///
598    /// In Winsock, the IP_MULTICAST_LOOP option applies only to the receive path.
599    /// In the UNIX version, the IP_MULTICAST_LOOP option applies to the send path."
600    ///
601    /// Which means, in order NOT to receive localhost announcements, you want to call
602    /// this API on the querier side on Windows, but on the responder side on Unix.
603    pub fn set_multicast_loop_v6(&self, on: bool) -> Result<()> {
604        self.send_cmd(Command::SetOption(DaemonOption::MulticastLoopV6(on)))
605    }
606
607    /// Proactively confirms whether a service instance still valid.
608    ///
609    /// This call will issue queries for a service instance's SRV record and Address records.
610    ///
611    /// For `timeout`, most users should use [VERIFY_TIMEOUT_DEFAULT]
612    /// unless there is a reason not to follow RFC.
613    ///
614    /// If no response is received within `timeout`, the current resource
615    /// records will be flushed, and if needed, `ServiceRemoved` event will be
616    /// sent to active queriers.
617    ///
618    /// Reference: [RFC 6762](https://datatracker.ietf.org/doc/html/rfc6762#section-10.4)
619    pub fn verify(&self, instance_fullname: String, timeout: Duration) -> Result<()> {
620        self.send_cmd(Command::Verify(instance_fullname, timeout))
621    }
622
623    fn daemon_thread(
624        signal_sock: MioUdpSocket,
625        poller: Poll,
626        receiver: Receiver<Command>,
627        port: u16,
628    ) {
629        let mut zc = Zeroconf::new(signal_sock, poller, port);
630
631        if let Some(cmd) = zc.run(receiver) {
632            match cmd {
633                Command::Exit(resp_s) => {
634                    // It is guaranteed that the receiver already dropped,
635                    // i.e. the daemon command channel closed.
636                    if let Err(e) = resp_s.send(DaemonStatus::Shutdown) {
637                        debug!("exit: failed to send response of shutdown: {}", e);
638                    }
639                }
640                _ => {
641                    debug!("Unexpected command: {:?}", cmd);
642                }
643            }
644        }
645    }
646}
647
648/// Creates a new UDP socket that uses `intf` to send and recv multicast.
649fn _new_socket_bind(intf: &Interface, should_loop: bool) -> Result<MyUdpSocket> {
650    // Use the same socket for receiving and sending multicast packets.
651    // Such socket has to bind to INADDR_ANY or IN6ADDR_ANY.
652    let intf_ip = &intf.ip();
653    match intf_ip {
654        IpAddr::V4(ip) => {
655            let addr = SocketAddrV4::new(Ipv4Addr::new(0, 0, 0, 0), MDNS_PORT);
656            let sock = new_socket(addr.into(), true)?;
657
658            // Join mDNS group to receive packets.
659            sock.join_multicast_v4(&GROUP_ADDR_V4, ip)
660                .map_err(|e| e_fmt!("join multicast group on addr {}: {}", intf_ip, e))?;
661
662            // Set IP_MULTICAST_IF to send packets.
663            sock.set_multicast_if_v4(ip)
664                .map_err(|e| e_fmt!("set multicast_if on addr {}: {}", ip, e))?;
665
666            // Per RFC 6762 section 11:
667            // "All Multicast DNS responses (including responses sent via unicast) SHOULD
668            // be sent with IP TTL set to 255."
669            // Here we set the TTL to 255 for multicast as we don't support unicast yet.
670            sock.set_multicast_ttl_v4(255)
671                .map_err(|e| e_fmt!("set set_multicast_ttl_v4 on addr {}: {}", ip, e))?;
672
673            if !should_loop {
674                sock.set_multicast_loop_v4(false)
675                    .map_err(|e| e_fmt!("failed to set multicast loop v4 for {ip}: {e}"))?;
676            }
677
678            // Test if we can send packets successfully.
679            let multicast_addr = SocketAddrV4::new(GROUP_ADDR_V4, MDNS_PORT).into();
680            let test_packets = DnsOutgoing::new(0).to_data_on_wire();
681            for packet in test_packets {
682                sock.send_to(&packet, &multicast_addr)
683                    .map_err(|e| e_fmt!("send multicast packet on addr {}: {}", ip, e))?;
684            }
685            MyUdpSocket::new(sock)
686                .map_err(|e| e_fmt!("failed to create MySocket for interface {}: {e}", intf.name))
687        }
688        IpAddr::V6(ip) => {
689            let addr = SocketAddrV6::new(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 0), MDNS_PORT, 0, 0);
690            let sock = new_socket(addr.into(), true)?;
691
692            let if_index = intf.index.unwrap_or(0);
693
694            // Join mDNS group to receive packets.
695            sock.join_multicast_v6(&GROUP_ADDR_V6, if_index)
696                .map_err(|e| e_fmt!("join multicast group on addr {}: {}", ip, e))?;
697
698            // Set IPV6_MULTICAST_IF to send packets.
699            sock.set_multicast_if_v6(if_index)
700                .map_err(|e| e_fmt!("set multicast_if on addr {}: {}", ip, e))?;
701
702            // We are not sending multicast packets to test this socket as there might
703            // be many IPv6 interfaces on a host and could cause such send error:
704            // "No buffer space available (os error 55)".
705
706            MyUdpSocket::new(sock)
707                .map_err(|e| e_fmt!("failed to create MySocket for interface {}: {e}", intf.name))
708        }
709    }
710}
711
712/// Creates a new UDP socket to bind to `port` with REUSEPORT option.
713/// `non_block` indicates whether to set O_NONBLOCK for the socket.
714fn new_socket(addr: SocketAddr, non_block: bool) -> Result<PktInfoUdpSocket> {
715    let domain = match addr {
716        SocketAddr::V4(_) => socket2::Domain::IPV4,
717        SocketAddr::V6(_) => socket2::Domain::IPV6,
718    };
719
720    let fd = PktInfoUdpSocket::new(domain).map_err(|e| e_fmt!("create socket failed: {}", e))?;
721
722    fd.set_reuse_address(true)
723        .map_err(|e| e_fmt!("set ReuseAddr failed: {}", e))?;
724    #[cfg(unix)] // this is currently restricted to Unix's in socket2
725    fd.set_reuse_port(true)
726        .map_err(|e| e_fmt!("set ReusePort failed: {}", e))?;
727
728    if non_block {
729        fd.set_nonblocking(true)
730            .map_err(|e| e_fmt!("set O_NONBLOCK: {}", e))?;
731    }
732
733    fd.bind(&addr.into())
734        .map_err(|e| e_fmt!("socket bind to {} failed: {}", &addr, e))?;
735
736    trace!("new socket bind to {}", &addr);
737    Ok(fd)
738}
739
740/// Specify a UNIX timestamp in millis to run `command` for the next time.
741struct ReRun {
742    /// UNIX timestamp in millis.
743    next_time: u64,
744    command: Command,
745}
746
747/// Specify kinds of interfaces. It is used to enable or to disable interfaces in the daemon.
748///
749/// Note that for ergonomic reasons, `From<&str>` and `From<IpAddr>` are implemented.
750#[derive(Debug, Clone)]
751#[non_exhaustive]
752pub enum IfKind {
753    /// All interfaces.
754    All,
755
756    /// All IPv4 interfaces.
757    IPv4,
758
759    /// All IPv6 interfaces.
760    IPv6,
761
762    /// By the interface name, for example "en0"
763    Name(String),
764
765    /// By an IPv4 or IPv6 address.
766    Addr(IpAddr),
767
768    /// 127.0.0.1 (or anything in 127.0.0.0/8), enabled by default.
769    ///
770    /// Loopback interfaces are required by some use cases (e.g., OSCQuery) for publishing.
771    LoopbackV4,
772
773    /// ::1/128, enabled by default.
774    LoopbackV6,
775}
776
777impl IfKind {
778    /// Checks if `intf` matches with this interface kind.
779    fn matches(&self, intf: &Interface) -> bool {
780        match self {
781            Self::All => true,
782            Self::IPv4 => intf.ip().is_ipv4(),
783            Self::IPv6 => intf.ip().is_ipv6(),
784            Self::Name(ifname) => ifname == &intf.name,
785            Self::Addr(addr) => addr == &intf.ip(),
786            Self::LoopbackV4 => intf.is_loopback() && intf.ip().is_ipv4(),
787            Self::LoopbackV6 => intf.is_loopback() && intf.ip().is_ipv6(),
788        }
789    }
790}
791
792/// The first use case of specifying an interface was to
793/// use an interface name. Hence adding this for ergonomic reasons.
794impl From<&str> for IfKind {
795    fn from(val: &str) -> Self {
796        Self::Name(val.to_string())
797    }
798}
799
800impl From<&String> for IfKind {
801    fn from(val: &String) -> Self {
802        Self::Name(val.to_string())
803    }
804}
805
806/// Still for ergonomic reasons.
807impl From<IpAddr> for IfKind {
808    fn from(val: IpAddr) -> Self {
809        Self::Addr(val)
810    }
811}
812
813/// A list of `IfKind` that can be used to match interfaces.
814pub struct IfKindVec {
815    kinds: Vec<IfKind>,
816}
817
818/// A trait that converts a type into a Vec of `IfKind`.
819pub trait IntoIfKindVec {
820    fn into_vec(self) -> IfKindVec;
821}
822
823impl<T: Into<IfKind>> IntoIfKindVec for T {
824    fn into_vec(self) -> IfKindVec {
825        let if_kind: IfKind = self.into();
826        IfKindVec {
827            kinds: vec![if_kind],
828        }
829    }
830}
831
832impl<T: Into<IfKind>> IntoIfKindVec for Vec<T> {
833    fn into_vec(self) -> IfKindVec {
834        let kinds: Vec<IfKind> = self.into_iter().map(|x| x.into()).collect();
835        IfKindVec { kinds }
836    }
837}
838
839/// Selection of interfaces.
840struct IfSelection {
841    /// The interfaces to be selected.
842    if_kind: IfKind,
843
844    /// Whether the `if_kind` should be enabled or not.
845    selected: bool,
846}
847
848/// A struct holding the state. It was inspired by `zeroconf` package in Python.
849struct Zeroconf {
850    /// The mDNS port number to use for socket binding.
851    /// Typically MDNS_PORT (5353), but can be customized for development/testing.
852    port: u16,
853
854    /// Local interfaces keyed by interface index.
855    my_intfs: HashMap<u32, MyIntf>,
856
857    /// A common socket for IPv4 interfaces. It's None if IPv4 is disabled in OS kernel.
858    ipv4_sock: Option<MyUdpSocket>,
859
860    /// A common socket for IPv6 interfaces. It's None if IPv6 is disabled in OS kernel.
861    ipv6_sock: Option<MyUdpSocket>,
862
863    /// Local registered services, keyed by service full names.
864    my_services: HashMap<String, ServiceInfo>,
865
866    /// Received DNS records.
867    cache: DnsCache,
868
869    /// Registered service records, keyed by interface index.
870    dns_registry_map: HashMap<u32, DnsRegistry>,
871
872    /// Active "Browse" commands.
873    service_queriers: HashMap<String, Sender<ServiceEvent>>, // <ty_domain, channel::sender>
874
875    /// Active "ResolveHostname" commands.
876    ///
877    /// The timestamps are set at the future timestamp when the command should timeout.
878    /// `hostname` is case-insensitive and stored in lowercase.
879    hostname_resolvers: HashMap<String, (Sender<HostnameResolutionEvent>, Option<u64>)>, // <hostname, (channel::sender, UNIX timestamp in millis)>
880
881    /// All repeating transmissions.
882    retransmissions: Vec<ReRun>,
883
884    counters: Metrics,
885
886    /// Waits for incoming packets.
887    poller: Poll,
888
889    /// Channels to notify events.
890    monitors: Vec<Sender<DaemonEvent>>,
891
892    /// Options
893    service_name_len_max: u8,
894
895    /// Interval in millis to check IP address changes.
896    ip_check_interval: u64,
897
898    /// All interface selections called to the daemon.
899    if_selections: Vec<IfSelection>,
900
901    /// Socket for signaling.
902    signal_sock: MioUdpSocket,
903
904    /// Timestamps marking where we need another iteration of the run loop,
905    /// to react to events like retransmissions, cache refreshes, interface IP address changes, etc.
906    ///
907    /// When the run loop goes through a single iteration, it will
908    /// set its timeout to the earliest timer in this list.
909    timers: BinaryHeap<Reverse<u64>>,
910
911    status: DaemonStatus,
912
913    /// Service instances that are pending for resolving SRV and TXT.
914    pending_resolves: HashSet<String>,
915
916    /// Service instances that are already resolved.
917    resolved: HashSet<String>,
918
919    multicast_loop_v4: bool,
920
921    multicast_loop_v6: bool,
922
923    accept_unsolicited: bool,
924
925    #[cfg(test)]
926    test_down_interfaces: HashSet<String>,
927}
928
929/// Join the multicast group for the given interface.
930fn join_multicast_group(my_sock: &PktInfoUdpSocket, intf: &Interface) -> Result<()> {
931    let intf_ip = &intf.ip();
932    match intf_ip {
933        IpAddr::V4(ip) => {
934            // Join mDNS group to receive packets.
935            debug!("join multicast group V4 on {} addr {ip}", intf.name);
936            my_sock
937                .join_multicast_v4(&GROUP_ADDR_V4, ip)
938                .map_err(|e| e_fmt!("PKT join multicast group on addr {}: {}", intf_ip, e))?;
939        }
940        IpAddr::V6(ip) => {
941            let if_index = intf.index.unwrap_or(0);
942            // Join mDNS group to receive packets.
943            debug!(
944                "join multicast group V6 on {} addr {ip} with index {if_index}",
945                intf.name
946            );
947            my_sock
948                .join_multicast_v6(&GROUP_ADDR_V6, if_index)
949                .map_err(|e| e_fmt!("PKT join multicast group on addr {}: {}", ip, e))?;
950        }
951    }
952    Ok(())
953}
954
955impl Zeroconf {
956    fn new(signal_sock: MioUdpSocket, poller: Poll, port: u16) -> Self {
957        // Get interfaces.
958        let my_ifaddrs = my_ip_interfaces(true);
959
960        // Create a socket for every IP addr.
961        // Note: it is possible that `my_ifaddrs` contains the same IP addr with different interface names,
962        // or the same interface name with different IP addrs.
963        let mut my_intfs = HashMap::new();
964        let mut dns_registry_map = HashMap::new();
965
966        // Use the same socket for receiving and sending multicast packets.
967        // Such socket has to bind to INADDR_ANY or IN6ADDR_ANY.
968        let mut ipv4_sock = None;
969        let addr = SocketAddrV4::new(Ipv4Addr::new(0, 0, 0, 0), port);
970        match new_socket(addr.into(), true) {
971            Ok(sock) => {
972                // Per RFC 6762 section 11:
973                // "All Multicast DNS responses (including responses sent via unicast) SHOULD
974                // be sent with IP TTL set to 255."
975                // Here we set the TTL to 255 for multicast as we don't support unicast yet.
976                sock.set_multicast_ttl_v4(255)
977                    .map_err(|e| e_fmt!("set set_multicast_ttl_v4 on addr: {}", e))
978                    .ok();
979
980                // This clones a socket.
981                ipv4_sock = match MyUdpSocket::new(sock) {
982                    Ok(s) => Some(s),
983                    Err(e) => {
984                        debug!("failed to create IPv4 MyUdpSocket: {e}");
985                        None
986                    }
987                };
988            }
989            // Per RFC 6762 section 11:}
990            Err(e) => debug!("failed to create IPv4 socket: {e}"),
991        }
992
993        let mut ipv6_sock = None;
994        let addr = SocketAddrV6::new(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 0), port, 0, 0);
995        match new_socket(addr.into(), true) {
996            Ok(sock) => {
997                // Per RFC 6762 section 11:
998                // "All Multicast DNS responses (including responses sent via unicast) SHOULD
999                // be sent with IP TTL set to 255."
1000                sock.set_multicast_hops_v6(255)
1001                    .map_err(|e| e_fmt!("set set_multicast_hops_v6: {}", e))
1002                    .ok();
1003
1004                // This clones the ipv6 socket.
1005                ipv6_sock = match MyUdpSocket::new(sock) {
1006                    Ok(s) => Some(s),
1007                    Err(e) => {
1008                        debug!("failed to create IPv6 MyUdpSocket: {e}");
1009                        None
1010                    }
1011                };
1012            }
1013            Err(e) => debug!("failed to create IPv6 socket: {e}"),
1014        }
1015
1016        // Configure sockets to join multicast groups.
1017        for intf in my_ifaddrs {
1018            let sock_opt = if intf.ip().is_ipv4() {
1019                &ipv4_sock
1020            } else {
1021                &ipv6_sock
1022            };
1023            let Some(sock) = sock_opt else {
1024                debug!(
1025                    "no socket available for interface {} with addr {}. Skipped.",
1026                    intf.name,
1027                    intf.ip()
1028                );
1029                continue;
1030            };
1031
1032            if let Err(e) = join_multicast_group(&sock.pktinfo, &intf) {
1033                debug!("failed to join multicast: {}: {e}. Skipped.", &intf.ip());
1034            }
1035
1036            let if_index = intf.index.unwrap_or(0);
1037
1038            // Add this interface address if not already present.
1039            dns_registry_map
1040                .entry(if_index)
1041                .or_insert_with(DnsRegistry::new);
1042
1043            my_intfs
1044                .entry(if_index)
1045                .and_modify(|v: &mut MyIntf| {
1046                    v.addrs.insert(intf.addr.clone());
1047                })
1048                .or_insert(MyIntf {
1049                    name: intf.name.clone(),
1050                    index: if_index,
1051                    addrs: HashSet::from([intf.addr]),
1052                });
1053        }
1054
1055        let monitors = Vec::new();
1056        let service_name_len_max = SERVICE_NAME_LEN_MAX_DEFAULT;
1057        let ip_check_interval = IP_CHECK_INTERVAL_IN_SECS_DEFAULT as u64 * 1000;
1058
1059        let timers = BinaryHeap::new();
1060
1061        // Enable everything, including loopback interfaces.
1062        let if_selections = vec![];
1063
1064        let status = DaemonStatus::Running;
1065
1066        Self {
1067            port,
1068            my_intfs,
1069            ipv4_sock,
1070            ipv6_sock,
1071            my_services: HashMap::new(),
1072            cache: DnsCache::new(),
1073            dns_registry_map,
1074            hostname_resolvers: HashMap::new(),
1075            service_queriers: HashMap::new(),
1076            retransmissions: Vec::new(),
1077            counters: HashMap::new(),
1078            poller,
1079            monitors,
1080            service_name_len_max,
1081            ip_check_interval,
1082            if_selections,
1083            signal_sock,
1084            timers,
1085            status,
1086            pending_resolves: HashSet::new(),
1087            resolved: HashSet::new(),
1088            multicast_loop_v4: true,
1089            multicast_loop_v6: true,
1090            accept_unsolicited: false,
1091
1092            #[cfg(test)]
1093            test_down_interfaces: HashSet::new(),
1094        }
1095    }
1096
1097    /// The main event loop of the daemon thread
1098    ///
1099    /// In each round, it will:
1100    /// 1. select the listening sockets with a timeout.
1101    /// 2. process the incoming packets if any.
1102    /// 3. try_recv on its channel and execute commands.
1103    /// 4. announce its registered services.
1104    /// 5. process retransmissions if any.
1105    fn run(&mut self, receiver: Receiver<Command>) -> Option<Command> {
1106        // Add the daemon's signal socket to the poller.
1107        if let Err(e) = self.poller.registry().register(
1108            &mut self.signal_sock,
1109            mio::Token(SIGNAL_SOCK_EVENT_KEY),
1110            mio::Interest::READABLE,
1111        ) {
1112            debug!("failed to add signal socket to the poller: {}", e);
1113            return None;
1114        }
1115
1116        if let Some(sock) = self.ipv4_sock.as_mut() {
1117            if let Err(e) = self.poller.registry().register(
1118                sock,
1119                mio::Token(IPV4_SOCK_EVENT_KEY),
1120                mio::Interest::READABLE,
1121            ) {
1122                debug!("failed to register ipv4 socket: {}", e);
1123                return None;
1124            }
1125        }
1126
1127        if let Some(sock) = self.ipv6_sock.as_mut() {
1128            if let Err(e) = self.poller.registry().register(
1129                sock,
1130                mio::Token(IPV6_SOCK_EVENT_KEY),
1131                mio::Interest::READABLE,
1132            ) {
1133                debug!("failed to register ipv6 socket: {}", e);
1134                return None;
1135            }
1136        }
1137
1138        // Setup timer for IP checks.
1139        let mut next_ip_check = if self.ip_check_interval > 0 {
1140            current_time_millis() + self.ip_check_interval
1141        } else {
1142            0
1143        };
1144
1145        if next_ip_check > 0 {
1146            self.add_timer(next_ip_check);
1147        }
1148
1149        // Start the run loop.
1150
1151        let mut events = mio::Events::with_capacity(1024);
1152        loop {
1153            let now = current_time_millis();
1154
1155            let earliest_timer = self.peek_earliest_timer();
1156            let timeout = earliest_timer.map(|timer| {
1157                // If `timer` already passed, set `timeout` to be 1ms.
1158                let millis = if timer > now { timer - now } else { 1 };
1159                Duration::from_millis(millis)
1160            });
1161
1162            // Process incoming packets, command events and optional timeout.
1163            events.clear();
1164            match self.poller.poll(&mut events, timeout) {
1165                Ok(_) => self.handle_poller_events(&events),
1166                Err(e) => debug!("failed to select from sockets: {}", e),
1167            }
1168
1169            let now = current_time_millis();
1170
1171            // Remove the timers if already passed.
1172            self.pop_timers_till(now);
1173
1174            // Remove hostname resolvers with expired timeouts.
1175            for hostname in self
1176                .hostname_resolvers
1177                .clone()
1178                .into_iter()
1179                .filter(|(_, (_, timeout))| timeout.map(|t| now >= t).unwrap_or(false))
1180                .map(|(hostname, _)| hostname)
1181            {
1182                trace!("hostname resolver timeout for {}", &hostname);
1183                call_hostname_resolution_listener(
1184                    &self.hostname_resolvers,
1185                    &hostname,
1186                    HostnameResolutionEvent::SearchTimeout(hostname.to_owned()),
1187                );
1188                call_hostname_resolution_listener(
1189                    &self.hostname_resolvers,
1190                    &hostname,
1191                    HostnameResolutionEvent::SearchStopped(hostname.to_owned()),
1192                );
1193                self.hostname_resolvers.remove(&hostname);
1194            }
1195
1196            // process commands from the command channel
1197            while let Ok(command) = receiver.try_recv() {
1198                if matches!(command, Command::Exit(_)) {
1199                    self.status = DaemonStatus::Shutdown;
1200                    return Some(command);
1201                }
1202                self.exec_command(command, false);
1203            }
1204
1205            // check for repeated commands and run them if their time is up.
1206            let mut i = 0;
1207            while i < self.retransmissions.len() {
1208                if now >= self.retransmissions[i].next_time {
1209                    let rerun = self.retransmissions.remove(i);
1210                    self.exec_command(rerun.command, true);
1211                } else {
1212                    i += 1;
1213                }
1214            }
1215
1216            // Refresh cached service records with active queriers
1217            self.refresh_active_services();
1218
1219            // Refresh cached A/AAAA records with active queriers
1220            let mut query_count = 0;
1221            for (hostname, _sender) in self.hostname_resolvers.iter() {
1222                for (hostname, ip_addr) in
1223                    self.cache.refresh_due_hostname_resolutions(hostname).iter()
1224                {
1225                    self.send_query(hostname, ip_address_rr_type(&ip_addr.to_ip_addr()));
1226                    query_count += 1;
1227                }
1228            }
1229
1230            self.increase_counter(Counter::CacheRefreshAddr, query_count);
1231
1232            // check and evict expired records in our cache
1233            let now = current_time_millis();
1234
1235            // Notify service listeners about the expired records.
1236            let expired_services = self.cache.evict_expired_services(now);
1237            if !expired_services.is_empty() {
1238                debug!(
1239                    "run: send {} service removal to listeners",
1240                    expired_services.len()
1241                );
1242                self.notify_service_removal(expired_services);
1243            }
1244
1245            // Notify hostname listeners about the expired records.
1246            let expired_addrs = self.cache.evict_expired_addr(now);
1247            for (hostname, addrs) in expired_addrs {
1248                call_hostname_resolution_listener(
1249                    &self.hostname_resolvers,
1250                    &hostname,
1251                    HostnameResolutionEvent::AddressesRemoved(hostname.clone(), addrs),
1252                );
1253                let instances = self.cache.get_instances_on_host(&hostname);
1254                let instance_set: HashSet<String> = instances.into_iter().collect();
1255                self.resolve_updated_instances(&instance_set);
1256            }
1257
1258            // Send out probing queries.
1259            self.probing_handler();
1260
1261            // check IP changes if next_ip_check is reached.
1262            if now >= next_ip_check && next_ip_check > 0 {
1263                next_ip_check = now + self.ip_check_interval;
1264                self.add_timer(next_ip_check);
1265
1266                self.check_ip_changes();
1267            }
1268        }
1269    }
1270
1271    fn process_set_option(&mut self, daemon_opt: DaemonOption) {
1272        match daemon_opt {
1273            DaemonOption::ServiceNameLenMax(length) => self.service_name_len_max = length,
1274            DaemonOption::IpCheckInterval(interval) => self.ip_check_interval = interval,
1275            DaemonOption::EnableInterface(if_kind) => self.enable_interface(if_kind),
1276            DaemonOption::DisableInterface(if_kind) => self.disable_interface(if_kind),
1277            DaemonOption::MulticastLoopV4(on) => self.set_multicast_loop_v4(on),
1278            DaemonOption::MulticastLoopV6(on) => self.set_multicast_loop_v6(on),
1279            DaemonOption::AcceptUnsolicited(accept) => self.set_accept_unsolicited(accept),
1280            #[cfg(test)]
1281            DaemonOption::TestDownInterface(ifname) => {
1282                self.test_down_interfaces.insert(ifname);
1283            }
1284            #[cfg(test)]
1285            DaemonOption::TestUpInterface(ifname) => {
1286                self.test_down_interfaces.remove(&ifname);
1287            }
1288        }
1289    }
1290
1291    fn enable_interface(&mut self, kinds: Vec<IfKind>) {
1292        debug!("enable_interface: {:?}", kinds);
1293        for if_kind in kinds {
1294            self.if_selections.push(IfSelection {
1295                if_kind,
1296                selected: true,
1297            });
1298        }
1299
1300        self.apply_intf_selections(my_ip_interfaces(true));
1301    }
1302
1303    fn disable_interface(&mut self, kinds: Vec<IfKind>) {
1304        debug!("disable_interface: {:?}", kinds);
1305        for if_kind in kinds {
1306            self.if_selections.push(IfSelection {
1307                if_kind,
1308                selected: false,
1309            });
1310        }
1311
1312        self.apply_intf_selections(my_ip_interfaces(true));
1313    }
1314
1315    fn set_multicast_loop_v4(&mut self, on: bool) {
1316        let Some(sock) = self.ipv4_sock.as_mut() else {
1317            return;
1318        };
1319        self.multicast_loop_v4 = on;
1320        sock.pktinfo
1321            .set_multicast_loop_v4(on)
1322            .map_err(|e| e_fmt!("failed to set multicast loop v4: {}", e))
1323            .unwrap();
1324    }
1325
1326    fn set_multicast_loop_v6(&mut self, on: bool) {
1327        let Some(sock) = self.ipv6_sock.as_mut() else {
1328            return;
1329        };
1330        self.multicast_loop_v6 = on;
1331        sock.pktinfo
1332            .set_multicast_loop_v6(on)
1333            .map_err(|e| e_fmt!("failed to set multicast loop v6: {}", e))
1334            .unwrap();
1335    }
1336
1337    fn set_accept_unsolicited(&mut self, accept: bool) {
1338        self.accept_unsolicited = accept;
1339    }
1340
1341    fn notify_monitors(&mut self, event: DaemonEvent) {
1342        // Only retain the monitors that are still connected.
1343        self.monitors.retain(|sender| {
1344            if let Err(e) = sender.try_send(event.clone()) {
1345                debug!("notify_monitors: try_send: {}", &e);
1346                if matches!(e, TrySendError::Disconnected(_)) {
1347                    return false; // This monitor is dropped.
1348                }
1349            }
1350            true
1351        });
1352    }
1353
1354    /// Remove `addr` in my services that enabled `addr_auto`.
1355    fn del_addr_in_my_services(&mut self, addr: &IpAddr) {
1356        for (_, service_info) in self.my_services.iter_mut() {
1357            if service_info.is_addr_auto() {
1358                service_info.remove_ipaddr(addr);
1359            }
1360        }
1361    }
1362
1363    fn add_timer(&mut self, next_time: u64) {
1364        self.timers.push(Reverse(next_time));
1365    }
1366
1367    fn peek_earliest_timer(&self) -> Option<u64> {
1368        self.timers.peek().map(|Reverse(v)| *v)
1369    }
1370
1371    fn _pop_earliest_timer(&mut self) -> Option<u64> {
1372        self.timers.pop().map(|Reverse(v)| v)
1373    }
1374
1375    /// Pop all timers that are already passed till `now`.
1376    fn pop_timers_till(&mut self, now: u64) {
1377        while let Some(Reverse(v)) = self.timers.peek() {
1378            if *v > now {
1379                break;
1380            }
1381            self.timers.pop();
1382        }
1383    }
1384
1385    /// Apply all selections to `interfaces` and return the selected addresses.
1386    fn selected_intfs(&self, interfaces: Vec<Interface>) -> HashSet<Interface> {
1387        let intf_count = interfaces.len();
1388        let mut intf_selections = vec![true; intf_count];
1389
1390        // apply if_selections
1391        for selection in self.if_selections.iter() {
1392            // Mark the interfaces for this selection.
1393            for i in 0..intf_count {
1394                if selection.if_kind.matches(&interfaces[i]) {
1395                    intf_selections[i] = selection.selected;
1396                }
1397            }
1398        }
1399
1400        let mut selected_addrs = HashSet::new();
1401        for i in 0..intf_count {
1402            if intf_selections[i] {
1403                selected_addrs.insert(interfaces[i].clone());
1404            }
1405        }
1406
1407        selected_addrs
1408    }
1409
1410    /// Apply all selections to `interfaces`.
1411    ///
1412    /// For any interface, add it if selected but not bound yet,
1413    /// delete it if not selected but still bound.
1414    fn apply_intf_selections(&mut self, interfaces: Vec<Interface>) {
1415        // By default, we enable all interfaces.
1416        let intf_count = interfaces.len();
1417        let mut intf_selections = vec![true; intf_count];
1418
1419        // apply if_selections
1420        for selection in self.if_selections.iter() {
1421            // Mark the interfaces for this selection.
1422            for i in 0..intf_count {
1423                if selection.if_kind.matches(&interfaces[i]) {
1424                    intf_selections[i] = selection.selected;
1425                }
1426            }
1427        }
1428
1429        // Update `my_intfs` based on the selections.
1430        for (idx, intf) in interfaces.into_iter().enumerate() {
1431            if intf_selections[idx] {
1432                // Add the interface
1433                self.add_interface(intf);
1434            } else {
1435                // Remove the interface
1436                self.del_interface(&intf);
1437            }
1438        }
1439    }
1440
1441    fn del_ip(&mut self, ip: IpAddr) {
1442        self.del_addr_in_my_services(&ip);
1443        self.notify_monitors(DaemonEvent::IpDel(ip));
1444    }
1445
1446    /// Check for IP changes and update [my_intfs] as needed.
1447    fn check_ip_changes(&mut self) {
1448        // Get the current interfaces.
1449        let my_ifaddrs = my_ip_interfaces(true);
1450
1451        #[cfg(test)]
1452        let my_ifaddrs: Vec<_> = my_ifaddrs
1453            .into_iter()
1454            .filter(|intf| !self.test_down_interfaces.contains(&intf.name))
1455            .collect();
1456
1457        let ifaddrs_map: HashMap<u32, Vec<&IfAddr>> =
1458            my_ifaddrs.iter().fold(HashMap::new(), |mut acc, intf| {
1459                let if_index = intf.index.unwrap_or(0);
1460                acc.entry(if_index).or_default().push(&intf.addr);
1461                acc
1462            });
1463
1464        let mut deleted_intfs = Vec::new();
1465        let mut deleted_ips = Vec::new();
1466
1467        for (if_index, my_intf) in self.my_intfs.iter_mut() {
1468            let mut last_ipv4 = None;
1469            let mut last_ipv6 = None;
1470
1471            if let Some(current_addrs) = ifaddrs_map.get(if_index) {
1472                my_intf.addrs.retain(|addr| {
1473                    if current_addrs.contains(&addr) {
1474                        true
1475                    } else {
1476                        match addr.ip() {
1477                            IpAddr::V4(ipv4) => last_ipv4 = Some(ipv4),
1478                            IpAddr::V6(ipv6) => last_ipv6 = Some(ipv6),
1479                        }
1480                        deleted_ips.push(addr.ip());
1481                        false
1482                    }
1483                });
1484                if my_intf.addrs.is_empty() {
1485                    deleted_intfs.push((*if_index, last_ipv4, last_ipv6))
1486                }
1487            } else {
1488                // If it does not exist, remove the interface.
1489                debug!(
1490                    "check_ip_changes: interface {} ({}) no longer exists, removing",
1491                    my_intf.name, if_index
1492                );
1493                for addr in my_intf.addrs.iter() {
1494                    match addr.ip() {
1495                        IpAddr::V4(ipv4) => last_ipv4 = Some(ipv4),
1496                        IpAddr::V6(ipv6) => last_ipv6 = Some(ipv6),
1497                    }
1498                    deleted_ips.push(addr.ip())
1499                }
1500                deleted_intfs.push((*if_index, last_ipv4, last_ipv6));
1501            }
1502        }
1503
1504        if !deleted_ips.is_empty() || !deleted_intfs.is_empty() {
1505            debug!(
1506                "check_ip_changes: {} deleted ips {} deleted intfs",
1507                deleted_ips.len(),
1508                deleted_intfs.len()
1509            );
1510        }
1511
1512        for ip in deleted_ips {
1513            self.del_ip(ip);
1514        }
1515
1516        for (if_index, last_ipv4, last_ipv6) in deleted_intfs {
1517            let Some(my_intf) = self.my_intfs.remove(&if_index) else {
1518                continue;
1519            };
1520
1521            if let Some(ipv4) = last_ipv4 {
1522                debug!("leave multicast for {ipv4}");
1523                if let Some(sock) = self.ipv4_sock.as_mut() {
1524                    if let Err(e) = sock.pktinfo.leave_multicast_v4(&GROUP_ADDR_V4, &ipv4) {
1525                        debug!("leave multicast group for addr {ipv4}: {e}");
1526                    }
1527                }
1528            }
1529
1530            if let Some(ipv6) = last_ipv6 {
1531                debug!("leave multicast for {ipv6}");
1532                if let Some(sock) = self.ipv6_sock.as_mut() {
1533                    if let Err(e) = sock
1534                        .pktinfo
1535                        .leave_multicast_v6(&GROUP_ADDR_V6, my_intf.index)
1536                    {
1537                        debug!("leave multicast group for IPv6: {ipv6}: {e}");
1538                    }
1539                }
1540            }
1541
1542            // Remove cache records for this interface.
1543            let intf_id = InterfaceId {
1544                name: my_intf.name.to_string(),
1545                index: my_intf.index,
1546            };
1547            let removed_instances = self.cache.remove_records_on_intf(intf_id);
1548            self.notify_service_removal(removed_instances);
1549        }
1550
1551        // Add newly found interfaces only if in our selections.
1552        self.apply_intf_selections(my_ifaddrs);
1553    }
1554
1555    fn del_interface(&mut self, intf: &Interface) {
1556        let if_index = intf.index.unwrap_or(0);
1557        trace!(
1558            "del_interface: {} ({if_index}) addr {}",
1559            intf.name,
1560            intf.ip()
1561        );
1562
1563        let Some(my_intf) = self.my_intfs.get_mut(&if_index) else {
1564            debug!("del_interface: interface {} not found", intf.name);
1565            return;
1566        };
1567
1568        let mut ip_removed = false;
1569
1570        if my_intf.addrs.remove(&intf.addr) {
1571            ip_removed = true;
1572
1573            match intf.addr.ip() {
1574                IpAddr::V4(ipv4) => {
1575                    if my_intf.next_ifaddr_v4().is_none() {
1576                        if let Some(sock) = self.ipv4_sock.as_mut() {
1577                            if let Err(e) = sock.pktinfo.leave_multicast_v4(&GROUP_ADDR_V4, &ipv4) {
1578                                debug!("leave multicast group for addr {ipv4}: {e}");
1579                            }
1580                        }
1581                    }
1582                }
1583
1584                IpAddr::V6(ipv6) => {
1585                    if my_intf.next_ifaddr_v6().is_none() {
1586                        if let Some(sock) = self.ipv6_sock.as_mut() {
1587                            if let Err(e) =
1588                                sock.pktinfo.leave_multicast_v6(&GROUP_ADDR_V6, if_index)
1589                            {
1590                                debug!("leave multicast group for addr {ipv6}: {e}");
1591                            }
1592                        }
1593                    }
1594                }
1595            }
1596
1597            if my_intf.addrs.is_empty() {
1598                // If no more addresses, remove the interface.
1599                debug!("del_interface: removing interface {}", intf.name);
1600                self.my_intfs.remove(&if_index);
1601                self.dns_registry_map.remove(&if_index);
1602                self.cache.remove_addrs_on_disabled_intf(if_index);
1603            }
1604        }
1605
1606        if ip_removed {
1607            // Notify the monitors.
1608            self.notify_monitors(DaemonEvent::IpDel(intf.ip()));
1609            // Remove the interface from my services that enabled `addr_auto`.
1610            self.del_addr_in_my_services(&intf.ip());
1611        }
1612    }
1613
1614    fn add_interface(&mut self, intf: Interface) {
1615        let sock_opt = if intf.ip().is_ipv4() {
1616            &self.ipv4_sock
1617        } else {
1618            &self.ipv6_sock
1619        };
1620
1621        let Some(sock) = sock_opt else {
1622            debug!(
1623                "add_interface: no socket available for interface {} with addr {}. Skipped.",
1624                intf.name,
1625                intf.ip()
1626            );
1627            return;
1628        };
1629
1630        let if_index = intf.index.unwrap_or(0);
1631        let mut new_addr = false;
1632
1633        match self.my_intfs.entry(if_index) {
1634            Entry::Occupied(mut entry) => {
1635                // If intf has a new address, add it to the existing interface.
1636                let my_intf = entry.get_mut();
1637                if !my_intf.addrs.contains(&intf.addr) {
1638                    if let Err(e) = join_multicast_group(&sock.pktinfo, &intf) {
1639                        debug!("add_interface: socket_config {}: {e}", &intf.name);
1640                    }
1641                    my_intf.addrs.insert(intf.addr.clone());
1642                    new_addr = true;
1643                }
1644            }
1645            Entry::Vacant(entry) => {
1646                if let Err(e) = join_multicast_group(&sock.pktinfo, &intf) {
1647                    debug!("add_interface: socket_config {}: {e}. Skipped.", &intf.name);
1648                    return;
1649                }
1650
1651                new_addr = true;
1652                let new_intf = MyIntf {
1653                    name: intf.name.clone(),
1654                    index: if_index,
1655                    addrs: HashSet::from([intf.addr.clone()]),
1656                };
1657                entry.insert(new_intf);
1658            }
1659        }
1660
1661        if !new_addr {
1662            trace!("add_interface: interface {} already exists", &intf.name);
1663            return;
1664        }
1665
1666        debug!("add new interface {}: {}", intf.name, intf.ip());
1667
1668        let Some(my_intf) = self.my_intfs.get(&if_index) else {
1669            debug!("add_interface: cannot find if_index {if_index}");
1670            return;
1671        };
1672
1673        let dns_registry = match self.dns_registry_map.get_mut(&if_index) {
1674            Some(registry) => registry,
1675            None => self
1676                .dns_registry_map
1677                .entry(if_index)
1678                .or_insert_with(DnsRegistry::new),
1679        };
1680
1681        for (_, service_info) in self.my_services.iter_mut() {
1682            if service_info.is_addr_auto() {
1683                service_info.insert_ipaddr(&intf);
1684
1685                if announce_service_on_intf(
1686                    dns_registry,
1687                    service_info,
1688                    my_intf,
1689                    &sock.pktinfo,
1690                    self.port,
1691                ) {
1692                    debug!(
1693                        "Announce service {} on {}",
1694                        service_info.get_fullname(),
1695                        intf.ip()
1696                    );
1697                    service_info.set_status(if_index, ServiceStatus::Announced);
1698                } else {
1699                    for timer in dns_registry.new_timers.drain(..) {
1700                        self.timers.push(Reverse(timer));
1701                    }
1702                    service_info.set_status(if_index, ServiceStatus::Probing);
1703                }
1704            }
1705        }
1706
1707        // As we added a new interface, we want to execute all active "Browse" reruns now.
1708        let mut browse_reruns = Vec::new();
1709        let mut i = 0;
1710        while i < self.retransmissions.len() {
1711            if matches!(self.retransmissions[i].command, Command::Browse(..)) {
1712                browse_reruns.push(self.retransmissions.remove(i));
1713            } else {
1714                i += 1;
1715            }
1716        }
1717
1718        for rerun in browse_reruns {
1719            self.exec_command(rerun.command, true);
1720        }
1721
1722        // Notify the monitors.
1723        self.notify_monitors(DaemonEvent::IpAdd(intf.ip()));
1724    }
1725
1726    /// Registers a service.
1727    ///
1728    /// RFC 6762 section 8.3.
1729    /// ...the Multicast DNS responder MUST send
1730    ///    an unsolicited Multicast DNS response containing, in the Answer
1731    ///    Section, all of its newly registered resource records
1732    ///
1733    /// Zeroconf will then respond to requests for information about this service.
1734    fn register_service(&mut self, mut info: ServiceInfo) {
1735        // Check the service name length.
1736        if let Err(e) = check_service_name_length(info.get_type(), self.service_name_len_max) {
1737            debug!("check_service_name_length: {}", &e);
1738            self.notify_monitors(DaemonEvent::Error(e));
1739            return;
1740        }
1741
1742        if info.is_addr_auto() {
1743            let selected_intfs = self.selected_intfs(my_ip_interfaces(true));
1744            for intf in selected_intfs {
1745                info.insert_ipaddr(&intf);
1746            }
1747        }
1748
1749        debug!("register service {:?}", &info);
1750
1751        let outgoing_addrs = self.send_unsolicited_response(&mut info);
1752        if !outgoing_addrs.is_empty() {
1753            self.notify_monitors(DaemonEvent::Announce(
1754                info.get_fullname().to_string(),
1755                format!("{:?}", &outgoing_addrs),
1756            ));
1757        }
1758
1759        // The key has to be lower case letter as DNS record name is case insensitive.
1760        // The info will have the original name.
1761        let service_fullname = info.get_fullname().to_lowercase();
1762        self.my_services.insert(service_fullname, info);
1763    }
1764
1765    /// Sends out announcement of `info` on every valid interface.
1766    /// Returns the list of interface IPs that sent out the announcement.
1767    fn send_unsolicited_response(&mut self, info: &mut ServiceInfo) -> Vec<IpAddr> {
1768        let mut outgoing_addrs = Vec::new();
1769        let mut outgoing_intfs = HashSet::new();
1770
1771        for (if_index, intf) in self.my_intfs.iter() {
1772            let dns_registry = match self.dns_registry_map.get_mut(if_index) {
1773                Some(registry) => registry,
1774                None => self
1775                    .dns_registry_map
1776                    .entry(*if_index)
1777                    .or_insert_with(DnsRegistry::new),
1778            };
1779
1780            let mut announced = false;
1781
1782            // IPv4
1783            if let Some(sock) = self.ipv4_sock.as_mut() {
1784                if announce_service_on_intf(dns_registry, info, intf, &sock.pktinfo, self.port) {
1785                    for addr in intf.addrs.iter().filter(|a| a.ip().is_ipv4()) {
1786                        outgoing_addrs.push(addr.ip());
1787                    }
1788                    outgoing_intfs.insert(intf.index);
1789
1790                    debug!(
1791                        "Announce service IPv4 {} on {}",
1792                        info.get_fullname(),
1793                        intf.name
1794                    );
1795                    announced = true;
1796                }
1797            }
1798
1799            if let Some(sock) = self.ipv6_sock.as_mut() {
1800                if announce_service_on_intf(dns_registry, info, intf, &sock.pktinfo, self.port) {
1801                    for addr in intf.addrs.iter().filter(|a| a.ip().is_ipv6()) {
1802                        outgoing_addrs.push(addr.ip());
1803                    }
1804                    outgoing_intfs.insert(intf.index);
1805
1806                    debug!(
1807                        "Announce service IPv6 {} on {}",
1808                        info.get_fullname(),
1809                        intf.name
1810                    );
1811                    announced = true;
1812                }
1813            }
1814
1815            if announced {
1816                info.set_status(intf.index, ServiceStatus::Announced);
1817            } else {
1818                for timer in dns_registry.new_timers.drain(..) {
1819                    self.timers.push(Reverse(timer));
1820                }
1821                info.set_status(*if_index, ServiceStatus::Probing);
1822            }
1823        }
1824
1825        // RFC 6762 section 8.3.
1826        // ..The Multicast DNS responder MUST send at least two unsolicited
1827        //    responses, one second apart.
1828        let next_time = current_time_millis() + 1000;
1829        for if_index in outgoing_intfs {
1830            self.add_retransmission(
1831                next_time,
1832                Command::RegisterResend(info.get_fullname().to_string(), if_index),
1833            );
1834        }
1835
1836        outgoing_addrs
1837    }
1838
1839    /// Send probings or finish them if expired. Notify waiting services.
1840    fn probing_handler(&mut self) {
1841        let now = current_time_millis();
1842
1843        for (if_index, intf) in self.my_intfs.iter() {
1844            let Some(dns_registry) = self.dns_registry_map.get_mut(if_index) else {
1845                continue;
1846            };
1847
1848            let (out, expired_probes) = check_probing(dns_registry, &mut self.timers, now);
1849
1850            // send probing.
1851            if !out.questions().is_empty() {
1852                trace!("sending out probing of questions: {:?}", out.questions());
1853                if let Some(sock) = self.ipv4_sock.as_mut() {
1854                    send_dns_outgoing(&out, intf, &sock.pktinfo, self.port);
1855                }
1856                if let Some(sock) = self.ipv6_sock.as_mut() {
1857                    send_dns_outgoing(&out, intf, &sock.pktinfo, self.port);
1858                }
1859            }
1860
1861            // For finished probes, wake up services that are waiting for the probes.
1862            let waiting_services =
1863                handle_expired_probes(expired_probes, &intf.name, dns_registry, &mut self.monitors);
1864
1865            for service_name in waiting_services {
1866                // service names are lowercase
1867                if let Some(info) = self.my_services.get_mut(&service_name.to_lowercase()) {
1868                    if info.get_status(*if_index) == ServiceStatus::Announced {
1869                        debug!("service {} already announced", info.get_fullname());
1870                        continue;
1871                    }
1872
1873                    let announced_v4 = if let Some(sock) = self.ipv4_sock.as_mut() {
1874                        announce_service_on_intf(dns_registry, info, intf, &sock.pktinfo, self.port)
1875                    } else {
1876                        false
1877                    };
1878                    let announced_v6 = if let Some(sock) = self.ipv6_sock.as_mut() {
1879                        announce_service_on_intf(dns_registry, info, intf, &sock.pktinfo, self.port)
1880                    } else {
1881                        false
1882                    };
1883
1884                    if announced_v4 || announced_v6 {
1885                        let next_time = now + 1000;
1886                        let command =
1887                            Command::RegisterResend(info.get_fullname().to_string(), *if_index);
1888                        self.retransmissions.push(ReRun { next_time, command });
1889                        self.timers.push(Reverse(next_time));
1890
1891                        let fullname = match dns_registry.name_changes.get(&service_name) {
1892                            Some(new_name) => new_name.to_string(),
1893                            None => service_name.to_string(),
1894                        };
1895
1896                        let mut hostname = info.get_hostname();
1897                        if let Some(new_name) = dns_registry.name_changes.get(hostname) {
1898                            hostname = new_name;
1899                        }
1900
1901                        debug!("wake up: announce service {} on {}", fullname, intf.name);
1902                        notify_monitors(
1903                            &mut self.monitors,
1904                            DaemonEvent::Announce(fullname, format!("{}:{}", hostname, &intf.name)),
1905                        );
1906
1907                        info.set_status(*if_index, ServiceStatus::Announced);
1908                    }
1909                }
1910            }
1911        }
1912    }
1913
1914    fn unregister_service(
1915        &self,
1916        info: &ServiceInfo,
1917        intf: &MyIntf,
1918        sock: &PktInfoUdpSocket,
1919    ) -> Vec<u8> {
1920        let is_ipv4 = sock.domain() == Domain::IPV4;
1921
1922        let mut out = DnsOutgoing::new(FLAGS_QR_RESPONSE | FLAGS_AA);
1923        out.add_answer_at_time(
1924            DnsPointer::new(
1925                info.get_type(),
1926                RRType::PTR,
1927                CLASS_IN,
1928                0,
1929                info.get_fullname().to_string(),
1930            ),
1931            0,
1932        );
1933
1934        if let Some(sub) = info.get_subtype() {
1935            trace!("Adding subdomain {}", sub);
1936            out.add_answer_at_time(
1937                DnsPointer::new(
1938                    sub,
1939                    RRType::PTR,
1940                    CLASS_IN,
1941                    0,
1942                    info.get_fullname().to_string(),
1943                ),
1944                0,
1945            );
1946        }
1947
1948        out.add_answer_at_time(
1949            DnsSrv::new(
1950                info.get_fullname(),
1951                CLASS_IN | CLASS_CACHE_FLUSH,
1952                0,
1953                info.get_priority(),
1954                info.get_weight(),
1955                info.get_port(),
1956                info.get_hostname().to_string(),
1957            ),
1958            0,
1959        );
1960        out.add_answer_at_time(
1961            DnsTxt::new(
1962                info.get_fullname(),
1963                CLASS_IN | CLASS_CACHE_FLUSH,
1964                0,
1965                info.generate_txt(),
1966            ),
1967            0,
1968        );
1969
1970        let if_addrs = if is_ipv4 {
1971            info.get_addrs_on_my_intf_v4(intf)
1972        } else {
1973            info.get_addrs_on_my_intf_v6(intf)
1974        };
1975
1976        if if_addrs.is_empty() {
1977            return vec![];
1978        }
1979
1980        for address in if_addrs {
1981            out.add_answer_at_time(
1982                DnsAddress::new(
1983                    info.get_hostname(),
1984                    ip_address_rr_type(&address),
1985                    CLASS_IN | CLASS_CACHE_FLUSH,
1986                    0,
1987                    address,
1988                    intf.into(),
1989                ),
1990                0,
1991            );
1992        }
1993
1994        // Only (at most) one packet is expected to be sent out.
1995        send_dns_outgoing(&out, intf, sock, self.port)
1996            .into_iter()
1997            .next()
1998            .unwrap_or_default()
1999    }
2000
2001    /// Binds a channel `listener` to querying mDNS hostnames.
2002    ///
2003    /// If there is already a `listener`, it will be updated, i.e. overwritten.
2004    fn add_hostname_resolver(
2005        &mut self,
2006        hostname: String,
2007        listener: Sender<HostnameResolutionEvent>,
2008        timeout: Option<u64>,
2009    ) {
2010        let real_timeout = timeout.map(|t| current_time_millis() + t);
2011        self.hostname_resolvers
2012            .insert(hostname.to_lowercase(), (listener, real_timeout));
2013        if let Some(t) = real_timeout {
2014            self.add_timer(t);
2015        }
2016    }
2017
2018    /// Sends a multicast query for `name` with `qtype`.
2019    fn send_query(&self, name: &str, qtype: RRType) {
2020        self.send_query_vec(&[(name, qtype)]);
2021    }
2022
2023    /// Sends out a list of `questions` (i.e. DNS questions) via multicast.
2024    fn send_query_vec(&self, questions: &[(&str, RRType)]) {
2025        let mut out = DnsOutgoing::new(FLAGS_QR_QUERY);
2026        let now = current_time_millis();
2027
2028        for (name, qtype) in questions {
2029            out.add_question(name, *qtype);
2030
2031            for record in self.cache.get_known_answers(name, *qtype, now) {
2032                /*
2033                RFC 6762 section 7.1: https://datatracker.ietf.org/doc/html/rfc6762#section-7.1
2034                ...
2035                    When a Multicast DNS querier sends a query to which it already knows
2036                    some answers, it populates the Answer Section of the DNS query
2037                    message with those answers.
2038                 */
2039                trace!("add known answer: {:?}", record.record);
2040                let mut new_record = record.record.clone();
2041                new_record.get_record_mut().update_ttl(now);
2042                out.add_answer_box(new_record);
2043            }
2044        }
2045
2046        for (_, intf) in self.my_intfs.iter() {
2047            if let Some(sock) = self.ipv4_sock.as_ref() {
2048                send_dns_outgoing(&out, intf, &sock.pktinfo, self.port);
2049            }
2050            if let Some(sock) = self.ipv6_sock.as_ref() {
2051                send_dns_outgoing(&out, intf, &sock.pktinfo, self.port);
2052            }
2053        }
2054    }
2055
2056    /// Reads one UDP datagram from the socket of `intf`.
2057    ///
2058    /// Returns false if failed to receive a packet,
2059    /// otherwise returns true.
2060    fn handle_read(&mut self, event_key: usize) -> bool {
2061        let sock_opt = match event_key {
2062            IPV4_SOCK_EVENT_KEY => &mut self.ipv4_sock,
2063            IPV6_SOCK_EVENT_KEY => &mut self.ipv6_sock,
2064            _ => {
2065                debug!("handle_read: unknown token {}", event_key);
2066                return false;
2067            }
2068        };
2069        let Some(sock) = sock_opt.as_mut() else {
2070            debug!("handle_read: socket not available for token {}", event_key);
2071            return false;
2072        };
2073        let mut buf = vec![0u8; MAX_MSG_ABSOLUTE];
2074
2075        // Read the next mDNS UDP datagram.
2076        //
2077        // If the datagram is larger than `buf`, excess bytes may or may not
2078        // be truncated by the socket layer depending on the platform's libc.
2079        // In any case, such large datagram will not be decoded properly and
2080        // this function should return false but should not crash.
2081        let (sz, pktinfo) = match sock.pktinfo.recv(&mut buf) {
2082            Ok(sz) => sz,
2083            Err(e) => {
2084                if e.kind() != std::io::ErrorKind::WouldBlock {
2085                    debug!("listening socket read failed: {}", e);
2086                }
2087                return false;
2088            }
2089        };
2090
2091        // Find the interface that received the packet.
2092        let pkt_if_index = pktinfo.if_index as u32;
2093        let Some(my_intf) = self.my_intfs.get(&pkt_if_index) else {
2094            debug!(
2095                "handle_read: no interface found for pktinfo if_index: {}",
2096                pktinfo.if_index
2097            );
2098            return true; // We still return true to indicate that we read something.
2099        };
2100
2101        buf.truncate(sz); // reduce potential processing errors
2102
2103        match DnsIncoming::new(buf, my_intf.into()) {
2104            Ok(msg) => {
2105                if msg.is_query() {
2106                    self.handle_query(msg, pkt_if_index, event_key == IPV4_SOCK_EVENT_KEY);
2107                } else if msg.is_response() {
2108                    self.handle_response(msg, pkt_if_index);
2109                } else {
2110                    debug!("Invalid message: not query and not response");
2111                }
2112            }
2113            Err(e) => debug!("Invalid incoming DNS message: {}", e),
2114        }
2115
2116        true
2117    }
2118
2119    /// Returns true, if sent query. Returns false if SRV already exists.
2120    fn query_unresolved(&mut self, instance: &str) -> bool {
2121        if !valid_instance_name(instance) {
2122            trace!("instance name {} not valid", instance);
2123            return false;
2124        }
2125
2126        if let Some(records) = self.cache.get_srv(instance) {
2127            for record in records {
2128                if let Some(srv) = record.record.any().downcast_ref::<DnsSrv>() {
2129                    if self.cache.get_addr(srv.host()).is_none() {
2130                        self.send_query_vec(&[(srv.host(), RRType::A), (srv.host(), RRType::AAAA)]);
2131                        return true;
2132                    }
2133                }
2134            }
2135        } else {
2136            self.send_query(instance, RRType::ANY);
2137            return true;
2138        }
2139
2140        false
2141    }
2142
2143    /// Checks if `ty_domain` has records in the cache. If yes, sends the
2144    /// cached records via `sender`.
2145    fn query_cache_for_service(
2146        &mut self,
2147        ty_domain: &str,
2148        sender: &Sender<ServiceEvent>,
2149        now: u64,
2150    ) {
2151        let mut resolved: HashSet<String> = HashSet::new();
2152        let mut unresolved: HashSet<String> = HashSet::new();
2153
2154        if let Some(records) = self.cache.get_ptr(ty_domain) {
2155            for record in records.iter().filter(|r| !r.record.expires_soon(now)) {
2156                if let Some(ptr) = record.record.any().downcast_ref::<DnsPointer>() {
2157                    let mut new_event = None;
2158                    match self.resolve_service_from_cache(ty_domain, ptr.alias()) {
2159                        Ok(resolved_service) => {
2160                            if resolved_service.is_valid() {
2161                                debug!("Resolved service from cache: {}", ptr.alias());
2162                                new_event =
2163                                    Some(ServiceEvent::ServiceResolved(Box::new(resolved_service)));
2164                            } else {
2165                                debug!("Resolved service is not valid: {}", ptr.alias());
2166                            }
2167                        }
2168                        Err(err) => {
2169                            debug!("Error while resolving service from cache: {}", err);
2170                            continue;
2171                        }
2172                    }
2173
2174                    match sender.send(ServiceEvent::ServiceFound(
2175                        ty_domain.to_string(),
2176                        ptr.alias().to_string(),
2177                    )) {
2178                        Ok(()) => debug!("sent service found {}", ptr.alias()),
2179                        Err(e) => {
2180                            debug!("failed to send service found: {}", e);
2181                            continue;
2182                        }
2183                    }
2184
2185                    if let Some(event) = new_event {
2186                        resolved.insert(ptr.alias().to_string());
2187                        match sender.send(event) {
2188                            Ok(()) => debug!("sent service resolved: {}", ptr.alias()),
2189                            Err(e) => debug!("failed to send service resolved: {}", e),
2190                        }
2191                    } else {
2192                        unresolved.insert(ptr.alias().to_string());
2193                    }
2194                }
2195            }
2196        }
2197
2198        for instance in resolved.drain() {
2199            self.pending_resolves.remove(&instance);
2200            self.resolved.insert(instance);
2201        }
2202
2203        for instance in unresolved.drain() {
2204            self.add_pending_resolve(instance);
2205        }
2206    }
2207
2208    /// Checks if `hostname` has records in the cache. If yes, sends the
2209    /// cached records via `sender`.
2210    fn query_cache_for_hostname(
2211        &mut self,
2212        hostname: &str,
2213        sender: Sender<HostnameResolutionEvent>,
2214    ) {
2215        let addresses_map = self.cache.get_addresses_for_host(hostname);
2216        for (name, addresses) in addresses_map {
2217            match sender.send(HostnameResolutionEvent::AddressesFound(name, addresses)) {
2218                Ok(()) => trace!("sent hostname addresses found"),
2219                Err(e) => debug!("failed to send hostname addresses found: {}", e),
2220            }
2221        }
2222    }
2223
2224    fn add_pending_resolve(&mut self, instance: String) {
2225        if !self.pending_resolves.contains(&instance) {
2226            let next_time = current_time_millis() + RESOLVE_WAIT_IN_MILLIS;
2227            self.add_retransmission(next_time, Command::Resolve(instance.clone(), 1));
2228            self.pending_resolves.insert(instance);
2229        }
2230    }
2231
2232    /// Creates a `ResolvedService` from the cache.
2233    fn resolve_service_from_cache(
2234        &self,
2235        ty_domain: &str,
2236        fullname: &str,
2237    ) -> Result<ResolvedService> {
2238        let now = current_time_millis();
2239        let mut resolved_service = ResolvedService {
2240            ty_domain: ty_domain.to_string(),
2241            sub_ty_domain: None,
2242            fullname: fullname.to_string(),
2243            host: String::new(),
2244            port: 0,
2245            addresses: HashSet::new(),
2246            txt_properties: TxtProperties::new(),
2247        };
2248
2249        // Be sure setting `subtype` if available even when querying for the parent domain.
2250        if let Some(subtype) = self.cache.get_subtype(fullname) {
2251            trace!(
2252                "ty_domain: {} found subtype {} for instance: {}",
2253                ty_domain,
2254                subtype,
2255                fullname
2256            );
2257            if resolved_service.sub_ty_domain.is_none() {
2258                resolved_service.sub_ty_domain = Some(subtype.to_string());
2259            }
2260        }
2261
2262        // resolve SRV record
2263        if let Some(records) = self.cache.get_srv(fullname) {
2264            if let Some(answer) = records.iter().find(|r| !r.record.expires_soon(now)) {
2265                if let Some(dns_srv) = answer.record.any().downcast_ref::<DnsSrv>() {
2266                    resolved_service.host = dns_srv.host().to_string();
2267                    resolved_service.port = dns_srv.port();
2268                }
2269            }
2270        }
2271
2272        // resolve TXT record
2273        if let Some(records) = self.cache.get_txt(fullname) {
2274            if let Some(record) = records.iter().find(|r| !r.record.expires_soon(now)) {
2275                if let Some(dns_txt) = record.record.any().downcast_ref::<DnsTxt>() {
2276                    resolved_service.txt_properties = dns_txt.text().into();
2277                }
2278            }
2279        }
2280
2281        // resolve A and AAAA records
2282        if let Some(records) = self.cache.get_addr(&resolved_service.host) {
2283            for answer in records.iter() {
2284                if let Some(dns_a) = answer.record.any().downcast_ref::<DnsAddress>() {
2285                    if dns_a.expires_soon(now) {
2286                        trace!(
2287                            "Addr expired or expires soon: {}",
2288                            dns_a.address().to_ip_addr()
2289                        );
2290                    } else {
2291                        resolved_service.addresses.insert(dns_a.address());
2292                    }
2293                }
2294            }
2295        }
2296
2297        Ok(resolved_service)
2298    }
2299
2300    fn handle_poller_events(&mut self, events: &mio::Events) {
2301        for ev in events.iter() {
2302            trace!("event received with key {:?}", ev.token());
2303            if ev.token().0 == SIGNAL_SOCK_EVENT_KEY {
2304                // Drain signals as we will drain commands as well.
2305                self.signal_sock_drain();
2306
2307                if let Err(e) = self.poller.registry().reregister(
2308                    &mut self.signal_sock,
2309                    ev.token(),
2310                    mio::Interest::READABLE,
2311                ) {
2312                    debug!("failed to modify poller for signal socket: {}", e);
2313                }
2314                continue; // Next event.
2315            }
2316
2317            // Read until no more packets available.
2318            while self.handle_read(ev.token().0) {}
2319
2320            // we continue to monitor this socket.
2321            if ev.token().0 == IPV4_SOCK_EVENT_KEY {
2322                // Re-register the IPv4 socket for reading.
2323                if let Some(sock) = self.ipv4_sock.as_mut() {
2324                    if let Err(e) =
2325                        self.poller
2326                            .registry()
2327                            .reregister(sock, ev.token(), mio::Interest::READABLE)
2328                    {
2329                        debug!("modify poller for IPv4 socket: {}", e);
2330                    }
2331                }
2332            } else if ev.token().0 == IPV6_SOCK_EVENT_KEY {
2333                // Re-register the IPv6 socket for reading.
2334                if let Some(sock) = self.ipv6_sock.as_mut() {
2335                    if let Err(e) =
2336                        self.poller
2337                            .registry()
2338                            .reregister(sock, ev.token(), mio::Interest::READABLE)
2339                    {
2340                        debug!("modify poller for IPv6 socket: {}", e);
2341                    }
2342                }
2343            }
2344        }
2345    }
2346
2347    /// Deal with incoming response packets.  All answers
2348    /// are held in the cache, and listeners are notified.
2349    fn handle_response(&mut self, mut msg: DnsIncoming, if_index: u32) {
2350        let now = current_time_millis();
2351
2352        // remove records that are expired.
2353        let mut record_predicate = |record: &DnsRecordBox| {
2354            if !record.get_record().is_expired(now) {
2355                return true;
2356            }
2357
2358            debug!("record is expired, removing it from cache.");
2359            if self.cache.remove(record) {
2360                // for PTR records, send event to listeners
2361                if let Some(dns_ptr) = record.any().downcast_ref::<DnsPointer>() {
2362                    call_service_listener(
2363                        &self.service_queriers,
2364                        dns_ptr.get_name(),
2365                        ServiceEvent::ServiceRemoved(
2366                            dns_ptr.get_name().to_string(),
2367                            dns_ptr.alias().to_string(),
2368                        ),
2369                    );
2370                }
2371            }
2372            false
2373        };
2374        msg.answers_mut().retain(&mut record_predicate);
2375        msg.authorities_mut().retain(&mut record_predicate);
2376        msg.additionals_mut().retain(&mut record_predicate);
2377
2378        // check possible conflicts and handle them.
2379        self.conflict_handler(&msg, if_index);
2380
2381        // check if the message is for us.
2382        let mut is_for_us = true; // assume it is for us.
2383
2384        // If there are any PTR records in the answers, there should be
2385        // at least one PTR for us. Otherwise, the message is not for us.
2386        // If there are no PTR records at all, assume this message is for us.
2387        for answer in msg.answers() {
2388            if answer.get_type() == RRType::PTR {
2389                if self.service_queriers.contains_key(answer.get_name()) {
2390                    is_for_us = true;
2391                    break; // OK to break: at least one PTR for us.
2392                } else {
2393                    is_for_us = false;
2394                }
2395            } else if answer.get_type() == RRType::A || answer.get_type() == RRType::AAAA {
2396                // If there is a hostname querier for this address, then it is for us.
2397                let answer_lowercase = answer.get_name().to_lowercase();
2398                if self.hostname_resolvers.contains_key(&answer_lowercase) {
2399                    is_for_us = true;
2400                    break; // OK to break: at least one hostname for us.
2401                }
2402            }
2403        }
2404
2405        // if we explicitily want to accept unsolicited responses, we should consider all messages as for us.
2406        if self.accept_unsolicited {
2407            is_for_us = true;
2408        }
2409
2410        /// Represents a DNS record change that involves one service instance.
2411        struct InstanceChange {
2412            ty: RRType,   // The type of DNS record for the instance.
2413            name: String, // The name of the record.
2414        }
2415
2416        // Go through all answers to get the new and updated records.
2417        // For new PTR records, send out ServiceFound immediately. For others,
2418        // collect them into `changes`.
2419        //
2420        // Note: we don't try to identify the update instances based on
2421        // each record immediately as the answers are likely related to each
2422        // other.
2423        let mut changes = Vec::new();
2424        let mut timers = Vec::new();
2425        let Some(my_intf) = self.my_intfs.get(&if_index) else {
2426            return;
2427        };
2428        for record in msg.all_records() {
2429            match self
2430                .cache
2431                .add_or_update(my_intf, record, &mut timers, is_for_us)
2432            {
2433                Some((dns_record, true)) => {
2434                    timers.push(dns_record.record.get_record().get_expire_time());
2435                    timers.push(dns_record.record.get_record().get_refresh_time());
2436
2437                    let ty = dns_record.record.get_type();
2438                    let name = dns_record.record.get_name();
2439
2440                    // Only process PTR that does not expire soon (i.e. TTL > 1).
2441                    if ty == RRType::PTR && dns_record.record.get_record().get_ttl() > 1 {
2442                        if self.service_queriers.contains_key(name) {
2443                            timers.push(dns_record.record.get_record().get_refresh_time());
2444                        }
2445
2446                        // send ServiceFound
2447                        if let Some(dns_ptr) = dns_record.record.any().downcast_ref::<DnsPointer>()
2448                        {
2449                            debug!("calling listener with service found: {name}");
2450                            call_service_listener(
2451                                &self.service_queriers,
2452                                name,
2453                                ServiceEvent::ServiceFound(
2454                                    name.to_string(),
2455                                    dns_ptr.alias().to_string(),
2456                                ),
2457                            );
2458                            changes.push(InstanceChange {
2459                                ty,
2460                                name: dns_ptr.alias().to_string(),
2461                            });
2462                        }
2463                    } else {
2464                        changes.push(InstanceChange {
2465                            ty,
2466                            name: name.to_string(),
2467                        });
2468                    }
2469                }
2470                Some((dns_record, false)) => {
2471                    timers.push(dns_record.record.get_record().get_expire_time());
2472                    timers.push(dns_record.record.get_record().get_refresh_time());
2473                }
2474                _ => {}
2475            }
2476        }
2477
2478        // Add timers for the new records.
2479        for t in timers {
2480            self.add_timer(t);
2481        }
2482
2483        // Go through remaining changes to see if any hostname resolutions were found or updated.
2484        for change in changes
2485            .iter()
2486            .filter(|change| change.ty == RRType::A || change.ty == RRType::AAAA)
2487        {
2488            let addr_map = self.cache.get_addresses_for_host(&change.name);
2489            for (name, addresses) in addr_map {
2490                call_hostname_resolution_listener(
2491                    &self.hostname_resolvers,
2492                    &change.name,
2493                    HostnameResolutionEvent::AddressesFound(name, addresses),
2494                )
2495            }
2496        }
2497
2498        // Identify the instances that need to be "resolved".
2499        let mut updated_instances = HashSet::new();
2500        for update in changes {
2501            match update.ty {
2502                RRType::PTR | RRType::SRV | RRType::TXT => {
2503                    updated_instances.insert(update.name);
2504                }
2505                RRType::A | RRType::AAAA => {
2506                    let instances = self.cache.get_instances_on_host(&update.name);
2507                    updated_instances.extend(instances);
2508                }
2509                _ => {}
2510            }
2511        }
2512
2513        self.resolve_updated_instances(&updated_instances);
2514    }
2515
2516    fn conflict_handler(&mut self, msg: &DnsIncoming, if_index: u32) {
2517        let Some(my_intf) = self.my_intfs.get(&if_index) else {
2518            debug!("handle_response: no intf found for index {if_index}");
2519            return;
2520        };
2521
2522        let Some(dns_registry) = self.dns_registry_map.get_mut(&if_index) else {
2523            return;
2524        };
2525
2526        for answer in msg.answers().iter() {
2527            let mut new_records = Vec::new();
2528
2529            let name = answer.get_name();
2530            let Some(probe) = dns_registry.probing.get_mut(name) else {
2531                continue;
2532            };
2533
2534            // check against possible multicast forwarding
2535            if answer.get_type() == RRType::A || answer.get_type() == RRType::AAAA {
2536                if let Some(answer_addr) = answer.any().downcast_ref::<DnsAddress>() {
2537                    if answer_addr.interface_id.index != if_index {
2538                        debug!(
2539                            "conflict handler: answer addr {:?} not in the subnet of intf {}",
2540                            answer_addr, my_intf.name
2541                        );
2542                        continue;
2543                    }
2544                }
2545
2546                // double check if any other address record matches rrdata,
2547                // as there could be multiple addresses for the same name.
2548                let any_match = probe.records.iter().any(|r| {
2549                    r.get_type() == answer.get_type()
2550                        && r.get_class() == answer.get_class()
2551                        && r.rrdata_match(answer.as_ref())
2552                });
2553                if any_match {
2554                    continue; // no conflict for this answer.
2555                }
2556            }
2557
2558            probe.records.retain(|record| {
2559                if record.get_type() == answer.get_type()
2560                    && record.get_class() == answer.get_class()
2561                    && !record.rrdata_match(answer.as_ref())
2562                {
2563                    debug!(
2564                        "found conflict name: '{name}' record: {}: {} PEER: {}",
2565                        record.get_type(),
2566                        record.rdata_print(),
2567                        answer.rdata_print()
2568                    );
2569
2570                    // create a new name for this record
2571                    // then remove the old record in probing.
2572                    let mut new_record = record.clone();
2573                    let new_name = match record.get_type() {
2574                        RRType::A => hostname_change(name),
2575                        RRType::AAAA => hostname_change(name),
2576                        _ => name_change(name),
2577                    };
2578                    new_record.get_record_mut().set_new_name(new_name);
2579                    new_records.push(new_record);
2580                    return false; // old record is dropped from the probe.
2581                }
2582
2583                true
2584            });
2585
2586            // ?????
2587            // if probe.records.is_empty() {
2588            //     dns_registry.probing.remove(name);
2589            // }
2590
2591            // Probing again with the new names.
2592            let create_time = current_time_millis() + fastrand::u64(0..250);
2593
2594            let waiting_services = probe.waiting_services.clone();
2595
2596            for record in new_records {
2597                if dns_registry.update_hostname(name, record.get_name(), create_time) {
2598                    self.timers.push(Reverse(create_time));
2599                }
2600
2601                // remember the name changes (note: `name` might not be the original, it could be already changed once.)
2602                dns_registry.name_changes.insert(
2603                    record.get_record().get_original_name().to_string(),
2604                    record.get_name().to_string(),
2605                );
2606
2607                let new_probe = match dns_registry.probing.get_mut(record.get_name()) {
2608                    Some(p) => p,
2609                    None => {
2610                        let new_probe = dns_registry
2611                            .probing
2612                            .entry(record.get_name().to_string())
2613                            .or_insert_with(|| {
2614                                debug!("conflict handler: new probe of {}", record.get_name());
2615                                Probe::new(create_time)
2616                            });
2617                        self.timers.push(Reverse(new_probe.next_send));
2618                        new_probe
2619                    }
2620                };
2621
2622                debug!(
2623                    "insert record with new name '{}' {} into probe",
2624                    record.get_name(),
2625                    record.get_type()
2626                );
2627                new_probe.insert_record(record);
2628
2629                new_probe.waiting_services.extend(waiting_services.clone());
2630            }
2631        }
2632    }
2633
2634    /// Resolve the updated (including new) instances.
2635    ///
2636    /// Note: it is possible that more than 1 PTR pointing to the same
2637    /// instance. For example, a regular service type PTR and a sub-type
2638    /// service type PTR can both point to the same service instance.
2639    /// This loop automatically handles the sub-type PTRs.
2640    fn resolve_updated_instances(&mut self, updated_instances: &HashSet<String>) {
2641        let mut resolved: HashSet<String> = HashSet::new();
2642        let mut unresolved: HashSet<String> = HashSet::new();
2643        let mut removed_instances = HashMap::new();
2644
2645        let now = current_time_millis();
2646
2647        for (ty_domain, records) in self.cache.all_ptr().iter() {
2648            if !self.service_queriers.contains_key(ty_domain) {
2649                // No need to resolve if not in our queries.
2650                continue;
2651            }
2652
2653            for ptr in records.iter().filter(|r| !r.record.expires_soon(now)) {
2654                let Some(dns_ptr) = ptr.record.any().downcast_ref::<DnsPointer>() else {
2655                    continue;
2656                };
2657
2658                let instance = dns_ptr.alias();
2659                if !updated_instances.contains(instance) {
2660                    continue;
2661                }
2662
2663                let Ok(resolved_service) = self.resolve_service_from_cache(ty_domain, instance)
2664                else {
2665                    continue;
2666                };
2667
2668                debug!("resolve_updated_instances: from cache: {instance}");
2669                if resolved_service.is_valid() {
2670                    debug!("call queriers to resolve {instance}");
2671                    resolved.insert(instance.to_string());
2672                    let event = ServiceEvent::ServiceResolved(Box::new(resolved_service));
2673                    call_service_listener(&self.service_queriers, ty_domain, event);
2674                } else {
2675                    debug!("Resolved service is not valid: {instance}");
2676                    if self.resolved.remove(dns_ptr.alias()) {
2677                        removed_instances
2678                            .entry(ty_domain.to_string())
2679                            .or_insert_with(HashSet::new)
2680                            .insert(instance.to_string());
2681                    }
2682                    unresolved.insert(instance.to_string());
2683                }
2684            }
2685        }
2686
2687        for instance in resolved.drain() {
2688            self.pending_resolves.remove(&instance);
2689            self.resolved.insert(instance);
2690        }
2691
2692        for instance in unresolved.drain() {
2693            self.add_pending_resolve(instance);
2694        }
2695
2696        if !removed_instances.is_empty() {
2697            debug!(
2698                "resolve_updated_instances: removed {}",
2699                &removed_instances.len()
2700            );
2701            self.notify_service_removal(removed_instances);
2702        }
2703    }
2704
2705    /// Handle incoming query packets, figure out whether and what to respond.
2706    fn handle_query(&mut self, msg: DnsIncoming, if_index: u32, is_ipv4: bool) {
2707        let sock_opt = if is_ipv4 {
2708            &self.ipv4_sock
2709        } else {
2710            &self.ipv6_sock
2711        };
2712        let Some(sock) = sock_opt.as_ref() else {
2713            debug!("handle_query: socket not available for intf {}", if_index);
2714            return;
2715        };
2716        let mut out = DnsOutgoing::new(FLAGS_QR_RESPONSE | FLAGS_AA);
2717
2718        // Special meta-query "_services._dns-sd._udp.<Domain>".
2719        // See https://datatracker.ietf.org/doc/html/rfc6763#section-9
2720        const META_QUERY: &str = "_services._dns-sd._udp.local.";
2721
2722        let Some(dns_registry) = self.dns_registry_map.get_mut(&if_index) else {
2723            debug!("missing dns registry for intf {}", if_index);
2724            return;
2725        };
2726
2727        let Some(intf) = self.my_intfs.get(&if_index) else {
2728            debug!("handle_query: no intf found for index {if_index}");
2729            return;
2730        };
2731
2732        for question in msg.questions().iter() {
2733            let qtype = question.entry_type();
2734
2735            if qtype == RRType::PTR {
2736                for service in self.my_services.values() {
2737                    if service.get_status(if_index) != ServiceStatus::Announced {
2738                        continue;
2739                    }
2740
2741                    if question.entry_name() == service.get_type()
2742                        || service
2743                            .get_subtype()
2744                            .as_ref()
2745                            .is_some_and(|v| v == question.entry_name())
2746                    {
2747                        add_answer_with_additionals(
2748                            &mut out,
2749                            &msg,
2750                            service,
2751                            intf,
2752                            dns_registry,
2753                            is_ipv4,
2754                        );
2755                    } else if question.entry_name() == META_QUERY {
2756                        let ptr_added = out.add_answer(
2757                            &msg,
2758                            DnsPointer::new(
2759                                question.entry_name(),
2760                                RRType::PTR,
2761                                CLASS_IN,
2762                                service.get_other_ttl(),
2763                                service.get_type().to_string(),
2764                            ),
2765                        );
2766                        if !ptr_added {
2767                            trace!("answer was not added for meta-query {:?}", &question);
2768                        }
2769                    }
2770                }
2771            } else {
2772                // Simultaneous Probe Tiebreaking (RFC 6762 section 8.2)
2773                if qtype == RRType::ANY && msg.num_authorities() > 0 {
2774                    let probe_name = question.entry_name();
2775
2776                    if let Some(probe) = dns_registry.probing.get_mut(probe_name) {
2777                        let now = current_time_millis();
2778
2779                        // Only do tiebreaking if probe already started.
2780                        // This check also helps avoid redo tiebreaking if start time
2781                        // was postponed.
2782                        if probe.start_time < now {
2783                            let incoming_records: Vec<_> = msg
2784                                .authorities()
2785                                .iter()
2786                                .filter(|r| r.get_name() == probe_name)
2787                                .collect();
2788
2789                            probe.tiebreaking(&incoming_records, now, probe_name);
2790                        }
2791                    }
2792                }
2793
2794                if qtype == RRType::A || qtype == RRType::AAAA || qtype == RRType::ANY {
2795                    for service in self.my_services.values() {
2796                        if service.get_status(if_index) != ServiceStatus::Announced {
2797                            continue;
2798                        }
2799
2800                        let service_hostname =
2801                            match dns_registry.name_changes.get(service.get_hostname()) {
2802                                Some(new_name) => new_name,
2803                                None => service.get_hostname(),
2804                            };
2805
2806                        if service_hostname.to_lowercase() == question.entry_name().to_lowercase() {
2807                            let intf_addrs = if is_ipv4 {
2808                                service.get_addrs_on_my_intf_v4(intf)
2809                            } else {
2810                                service.get_addrs_on_my_intf_v6(intf)
2811                            };
2812                            if intf_addrs.is_empty()
2813                                && (qtype == RRType::A || qtype == RRType::AAAA)
2814                            {
2815                                let t = match qtype {
2816                                    RRType::A => "TYPE_A",
2817                                    RRType::AAAA => "TYPE_AAAA",
2818                                    _ => "invalid_type",
2819                                };
2820                                trace!(
2821                                    "Cannot find valid addrs for {} response on intf {:?}",
2822                                    t,
2823                                    &intf
2824                                );
2825                                return;
2826                            }
2827                            for address in intf_addrs {
2828                                out.add_answer(
2829                                    &msg,
2830                                    DnsAddress::new(
2831                                        service_hostname,
2832                                        ip_address_rr_type(&address),
2833                                        CLASS_IN | CLASS_CACHE_FLUSH,
2834                                        service.get_host_ttl(),
2835                                        address,
2836                                        intf.into(),
2837                                    ),
2838                                );
2839                            }
2840                        }
2841                    }
2842                }
2843
2844                let query_name = question.entry_name().to_lowercase();
2845                let service_opt = self
2846                    .my_services
2847                    .iter()
2848                    .find(|(k, _v)| {
2849                        let service_name = match dns_registry.name_changes.get(k.as_str()) {
2850                            Some(new_name) => new_name,
2851                            None => k,
2852                        };
2853                        service_name == &query_name
2854                    })
2855                    .map(|(_, v)| v);
2856
2857                let Some(service) = service_opt else {
2858                    continue;
2859                };
2860
2861                if service.get_status(if_index) != ServiceStatus::Announced {
2862                    continue;
2863                }
2864
2865                let intf_addrs = if is_ipv4 {
2866                    service.get_addrs_on_my_intf_v4(intf)
2867                } else {
2868                    service.get_addrs_on_my_intf_v6(intf)
2869                };
2870                if intf_addrs.is_empty() {
2871                    debug!(
2872                        "Cannot find valid addrs for TYPE_SRV response on intf {:?}",
2873                        &intf
2874                    );
2875                    continue;
2876                }
2877
2878                add_answer_of_service(
2879                    &mut out,
2880                    &msg,
2881                    question.entry_name(),
2882                    service,
2883                    qtype,
2884                    intf_addrs,
2885                );
2886            }
2887        }
2888
2889        if out.answers_count() > 0 {
2890            debug!("sending response on intf {}", &intf.name);
2891            out.set_id(msg.id());
2892            send_dns_outgoing(&out, intf, &sock.pktinfo, self.port);
2893
2894            let if_name = intf.name.clone();
2895
2896            self.increase_counter(Counter::Respond, 1);
2897            self.notify_monitors(DaemonEvent::Respond(if_name));
2898        }
2899
2900        self.increase_counter(Counter::KnownAnswerSuppression, out.known_answer_count());
2901    }
2902
2903    /// Increases the value of `counter` by `count`.
2904    fn increase_counter(&mut self, counter: Counter, count: i64) {
2905        let key = counter.to_string();
2906        match self.counters.get_mut(&key) {
2907            Some(v) => *v += count,
2908            None => {
2909                self.counters.insert(key, count);
2910            }
2911        }
2912    }
2913
2914    /// Sets the value of `counter` to `count`.
2915    fn set_counter(&mut self, counter: Counter, count: i64) {
2916        let key = counter.to_string();
2917        self.counters.insert(key, count);
2918    }
2919
2920    fn signal_sock_drain(&self) {
2921        let mut signal_buf = [0; 1024];
2922
2923        // This recv is non-blocking as the socket is non-blocking.
2924        while let Ok(sz) = self.signal_sock.recv(&mut signal_buf) {
2925            trace!(
2926                "signal socket recvd: {}",
2927                String::from_utf8_lossy(&signal_buf[0..sz])
2928            );
2929        }
2930    }
2931
2932    fn add_retransmission(&mut self, next_time: u64, command: Command) {
2933        self.retransmissions.push(ReRun { next_time, command });
2934        self.add_timer(next_time);
2935    }
2936
2937    /// Sends service removal event to listeners for expired service records.
2938    /// `expired`: map of service type domain to set of instance names.
2939    fn notify_service_removal(&self, expired: HashMap<String, HashSet<String>>) {
2940        for (ty_domain, sender) in self.service_queriers.iter() {
2941            if let Some(instances) = expired.get(ty_domain) {
2942                for instance_name in instances {
2943                    let event = ServiceEvent::ServiceRemoved(
2944                        ty_domain.to_string(),
2945                        instance_name.to_string(),
2946                    );
2947                    match sender.send(event) {
2948                        Ok(()) => debug!("notify_service_removal: sent ServiceRemoved to listener of {ty_domain}: {instance_name}"),
2949                        Err(e) => debug!("Failed to send event: {}", e),
2950                    }
2951                }
2952            }
2953        }
2954    }
2955
2956    /// The entry point that executes all commands received by the daemon.
2957    ///
2958    /// `repeating`: whether this is a retransmission.
2959    fn exec_command(&mut self, command: Command, repeating: bool) {
2960        trace!("exec_command: {:?} repeating: {}", &command, repeating);
2961        match command {
2962            Command::Browse(ty, next_delay, cache_only, listener) => {
2963                self.exec_command_browse(repeating, ty, next_delay, cache_only, listener);
2964            }
2965
2966            Command::ResolveHostname(hostname, next_delay, listener, timeout) => {
2967                self.exec_command_resolve_hostname(
2968                    repeating, hostname, next_delay, listener, timeout,
2969                );
2970            }
2971
2972            Command::Register(service_info) => {
2973                self.register_service(*service_info);
2974                self.increase_counter(Counter::Register, 1);
2975            }
2976
2977            Command::RegisterResend(fullname, intf) => {
2978                trace!("register-resend service: {fullname} on {}", &intf);
2979                self.exec_command_register_resend(fullname, intf);
2980            }
2981
2982            Command::Unregister(fullname, resp_s) => {
2983                trace!("unregister service {} repeat {}", &fullname, &repeating);
2984                self.exec_command_unregister(repeating, fullname, resp_s);
2985            }
2986
2987            Command::UnregisterResend(packet, if_index, is_ipv4) => {
2988                self.exec_command_unregister_resend(packet, if_index, is_ipv4);
2989            }
2990
2991            Command::StopBrowse(ty_domain) => self.exec_command_stop_browse(ty_domain),
2992
2993            Command::StopResolveHostname(hostname) => {
2994                self.exec_command_stop_resolve_hostname(hostname.to_lowercase())
2995            }
2996
2997            Command::Resolve(instance, try_count) => self.exec_command_resolve(instance, try_count),
2998
2999            Command::GetMetrics(resp_s) => self.exec_command_get_metrics(resp_s),
3000
3001            Command::GetStatus(resp_s) => match resp_s.send(self.status.clone()) {
3002                Ok(()) => trace!("Sent status to the client"),
3003                Err(e) => debug!("Failed to send status: {}", e),
3004            },
3005
3006            Command::Monitor(resp_s) => {
3007                self.monitors.push(resp_s);
3008            }
3009
3010            Command::SetOption(daemon_opt) => {
3011                self.process_set_option(daemon_opt);
3012            }
3013
3014            Command::GetOption(resp_s) => {
3015                let val = DaemonOptionVal {
3016                    _service_name_len_max: self.service_name_len_max,
3017                    ip_check_interval: self.ip_check_interval,
3018                };
3019                if let Err(e) = resp_s.send(val) {
3020                    debug!("Failed to send options: {}", e);
3021                }
3022            }
3023
3024            Command::Verify(instance_fullname, timeout) => {
3025                self.exec_command_verify(instance_fullname, timeout, repeating);
3026            }
3027
3028            _ => {
3029                debug!("unexpected command: {:?}", &command);
3030            }
3031        }
3032    }
3033
3034    fn exec_command_get_metrics(&mut self, resp_s: Sender<HashMap<String, i64>>) {
3035        self.set_counter(Counter::CachedPTR, self.cache.ptr_count() as i64);
3036        self.set_counter(Counter::CachedSRV, self.cache.srv_count() as i64);
3037        self.set_counter(Counter::CachedAddr, self.cache.addr_count() as i64);
3038        self.set_counter(Counter::CachedTxt, self.cache.txt_count() as i64);
3039        self.set_counter(Counter::CachedNSec, self.cache.nsec_count() as i64);
3040        self.set_counter(Counter::CachedSubtype, self.cache.subtype_count() as i64);
3041        self.set_counter(Counter::Timer, self.timers.len() as i64);
3042
3043        let dns_registry_probe_count: usize = self
3044            .dns_registry_map
3045            .values()
3046            .map(|r| r.probing.len())
3047            .sum();
3048        self.set_counter(Counter::DnsRegistryProbe, dns_registry_probe_count as i64);
3049
3050        let dns_registry_active_count: usize = self
3051            .dns_registry_map
3052            .values()
3053            .map(|r| r.active.values().map(|a| a.len()).sum::<usize>())
3054            .sum();
3055        self.set_counter(Counter::DnsRegistryActive, dns_registry_active_count as i64);
3056
3057        let dns_registry_timer_count: usize = self
3058            .dns_registry_map
3059            .values()
3060            .map(|r| r.new_timers.len())
3061            .sum();
3062        self.set_counter(Counter::DnsRegistryTimer, dns_registry_timer_count as i64);
3063
3064        let dns_registry_name_change_count: usize = self
3065            .dns_registry_map
3066            .values()
3067            .map(|r| r.name_changes.len())
3068            .sum();
3069        self.set_counter(
3070            Counter::DnsRegistryNameChange,
3071            dns_registry_name_change_count as i64,
3072        );
3073
3074        // Send the metrics to the client.
3075        if let Err(e) = resp_s.send(self.counters.clone()) {
3076            debug!("Failed to send metrics: {}", e);
3077        }
3078    }
3079
3080    fn exec_command_browse(
3081        &mut self,
3082        repeating: bool,
3083        ty: String,
3084        next_delay: u32,
3085        cache_only: bool,
3086        listener: Sender<ServiceEvent>,
3087    ) {
3088        let pretty_addrs: Vec<String> = self
3089            .my_intfs
3090            .iter()
3091            .map(|(if_index, itf)| format!("{} ({if_index})", itf.name))
3092            .collect();
3093
3094        if let Err(e) = listener.send(ServiceEvent::SearchStarted(format!(
3095            "{ty} on {} interfaces [{}]",
3096            pretty_addrs.len(),
3097            pretty_addrs.join(", ")
3098        ))) {
3099            debug!(
3100                "Failed to send SearchStarted({})(repeating:{}): {}",
3101                &ty, repeating, e
3102            );
3103            return;
3104        }
3105
3106        let now = current_time_millis();
3107        if !repeating {
3108            // Binds a `listener` to querying mDNS domain type `ty`.
3109            //
3110            // If there is already a `listener`, it will be updated, i.e. overwritten.
3111            self.service_queriers.insert(ty.clone(), listener.clone());
3112
3113            // if we already have the records in our cache, just send them
3114            self.query_cache_for_service(&ty, &listener, now);
3115        }
3116
3117        if cache_only {
3118            // If cache_only is true, we do not send a query.
3119            match listener.send(ServiceEvent::SearchStopped(ty.clone())) {
3120                Ok(()) => debug!("SearchStopped sent for {}", &ty),
3121                Err(e) => debug!("Failed to send SearchStopped: {}", e),
3122            }
3123            return;
3124        }
3125
3126        self.send_query(&ty, RRType::PTR);
3127        self.increase_counter(Counter::Browse, 1);
3128
3129        let next_time = now + (next_delay * 1000) as u64;
3130        let max_delay = 60 * 60;
3131        let delay = cmp::min(next_delay * 2, max_delay);
3132        self.add_retransmission(next_time, Command::Browse(ty, delay, cache_only, listener));
3133    }
3134
3135    fn exec_command_resolve_hostname(
3136        &mut self,
3137        repeating: bool,
3138        hostname: String,
3139        next_delay: u32,
3140        listener: Sender<HostnameResolutionEvent>,
3141        timeout: Option<u64>,
3142    ) {
3143        let addr_list: Vec<_> = self.my_intfs.iter().collect();
3144        if let Err(e) = listener.send(HostnameResolutionEvent::SearchStarted(format!(
3145            "{} on addrs {:?}",
3146            &hostname, &addr_list
3147        ))) {
3148            debug!(
3149                "Failed to send ResolveStarted({})(repeating:{}): {}",
3150                &hostname, repeating, e
3151            );
3152            return;
3153        }
3154        if !repeating {
3155            self.add_hostname_resolver(hostname.to_owned(), listener.clone(), timeout);
3156            // if we already have the records in our cache, just send them
3157            self.query_cache_for_hostname(&hostname, listener.clone());
3158        }
3159
3160        self.send_query_vec(&[(&hostname, RRType::A), (&hostname, RRType::AAAA)]);
3161        self.increase_counter(Counter::ResolveHostname, 1);
3162
3163        let now = current_time_millis();
3164        let next_time = now + u64::from(next_delay) * 1000;
3165        let max_delay = 60 * 60;
3166        let delay = cmp::min(next_delay * 2, max_delay);
3167
3168        // Only add retransmission if it does not exceed the hostname resolver timeout, if any.
3169        if self
3170            .hostname_resolvers
3171            .get(&hostname)
3172            .and_then(|(_sender, timeout)| *timeout)
3173            .map(|timeout| next_time < timeout)
3174            .unwrap_or(true)
3175        {
3176            self.add_retransmission(
3177                next_time,
3178                Command::ResolveHostname(hostname, delay, listener, None),
3179            );
3180        }
3181    }
3182
3183    fn exec_command_resolve(&mut self, instance: String, try_count: u16) {
3184        let pending_query = self.query_unresolved(&instance);
3185        let max_try = 3;
3186        if pending_query && try_count < max_try {
3187            // Note that if the current try already succeeds, the next retransmission
3188            // will be no-op as the cache has been updated.
3189            let next_time = current_time_millis() + RESOLVE_WAIT_IN_MILLIS;
3190            self.add_retransmission(next_time, Command::Resolve(instance, try_count + 1));
3191        }
3192    }
3193
3194    fn exec_command_unregister(
3195        &mut self,
3196        repeating: bool,
3197        fullname: String,
3198        resp_s: Sender<UnregisterStatus>,
3199    ) {
3200        let response = match self.my_services.remove_entry(&fullname) {
3201            None => {
3202                debug!("unregister: cannot find such service {}", &fullname);
3203                UnregisterStatus::NotFound
3204            }
3205            Some((_k, info)) => {
3206                let mut timers = Vec::new();
3207
3208                for (if_index, intf) in self.my_intfs.iter() {
3209                    if let Some(sock) = self.ipv4_sock.as_ref() {
3210                        let packet = self.unregister_service(&info, intf, &sock.pktinfo);
3211                        // repeat for one time just in case some peers miss the message
3212                        if !repeating && !packet.is_empty() {
3213                            let next_time = current_time_millis() + 120;
3214                            self.retransmissions.push(ReRun {
3215                                next_time,
3216                                command: Command::UnregisterResend(packet, *if_index, true),
3217                            });
3218                            timers.push(next_time);
3219                        }
3220                    }
3221
3222                    // ipv6
3223                    if let Some(sock) = self.ipv6_sock.as_ref() {
3224                        let packet = self.unregister_service(&info, intf, &sock.pktinfo);
3225                        if !repeating && !packet.is_empty() {
3226                            let next_time = current_time_millis() + 120;
3227                            self.retransmissions.push(ReRun {
3228                                next_time,
3229                                command: Command::UnregisterResend(packet, *if_index, false),
3230                            });
3231                            timers.push(next_time);
3232                        }
3233                    }
3234                }
3235
3236                for t in timers {
3237                    self.add_timer(t);
3238                }
3239
3240                self.increase_counter(Counter::Unregister, 1);
3241                UnregisterStatus::OK
3242            }
3243        };
3244        if let Err(e) = resp_s.send(response) {
3245            debug!("unregister: failed to send response: {}", e);
3246        }
3247    }
3248
3249    fn exec_command_unregister_resend(&mut self, packet: Vec<u8>, if_index: u32, is_ipv4: bool) {
3250        let Some(intf) = self.my_intfs.get(&if_index) else {
3251            return;
3252        };
3253        let sock_opt = if is_ipv4 {
3254            &self.ipv4_sock
3255        } else {
3256            &self.ipv6_sock
3257        };
3258        let Some(sock) = sock_opt else {
3259            return;
3260        };
3261
3262        let if_addr = if is_ipv4 {
3263            match intf.next_ifaddr_v4() {
3264                Some(addr) => addr,
3265                None => return,
3266            }
3267        } else {
3268            match intf.next_ifaddr_v6() {
3269                Some(addr) => addr,
3270                None => return,
3271            }
3272        };
3273
3274        debug!("UnregisterResend from {:?}", if_addr);
3275        multicast_on_intf(
3276            &packet[..],
3277            &intf.name,
3278            intf.index,
3279            if_addr,
3280            &sock.pktinfo,
3281            self.port,
3282        );
3283
3284        self.increase_counter(Counter::UnregisterResend, 1);
3285    }
3286
3287    fn exec_command_stop_browse(&mut self, ty_domain: String) {
3288        match self.service_queriers.remove_entry(&ty_domain) {
3289            None => debug!("StopBrowse: cannot find querier for {}", &ty_domain),
3290            Some((ty, sender)) => {
3291                // Remove pending browse commands in the reruns.
3292                trace!("StopBrowse: removed queryer for {}", &ty);
3293                let mut i = 0;
3294                while i < self.retransmissions.len() {
3295                    if let Command::Browse(t, _, _, _) = &self.retransmissions[i].command {
3296                        if t == &ty {
3297                            self.retransmissions.remove(i);
3298                            trace!("StopBrowse: removed retransmission for {}", &ty);
3299                            continue;
3300                        }
3301                    }
3302                    i += 1;
3303                }
3304
3305                // Remove cache entries.
3306                self.cache.remove_service_type(&ty_domain);
3307
3308                // Notify the client.
3309                match sender.send(ServiceEvent::SearchStopped(ty_domain)) {
3310                    Ok(()) => trace!("Sent SearchStopped to the listener"),
3311                    Err(e) => debug!("Failed to send SearchStopped: {}", e),
3312                }
3313            }
3314        }
3315    }
3316
3317    fn exec_command_stop_resolve_hostname(&mut self, hostname: String) {
3318        if let Some((host, (sender, _timeout))) = self.hostname_resolvers.remove_entry(&hostname) {
3319            // Remove pending resolve commands in the reruns.
3320            trace!("StopResolve: removed queryer for {}", &host);
3321            let mut i = 0;
3322            while i < self.retransmissions.len() {
3323                if let Command::Resolve(t, _) = &self.retransmissions[i].command {
3324                    if t == &host {
3325                        self.retransmissions.remove(i);
3326                        trace!("StopResolve: removed retransmission for {}", &host);
3327                        continue;
3328                    }
3329                }
3330                i += 1;
3331            }
3332
3333            // Notify the client.
3334            match sender.send(HostnameResolutionEvent::SearchStopped(hostname)) {
3335                Ok(()) => trace!("Sent SearchStopped to the listener"),
3336                Err(e) => debug!("Failed to send SearchStopped: {}", e),
3337            }
3338        }
3339    }
3340
3341    fn exec_command_register_resend(&mut self, fullname: String, if_index: u32) {
3342        let Some(info) = self.my_services.get_mut(&fullname) else {
3343            trace!("announce: cannot find such service {}", &fullname);
3344            return;
3345        };
3346
3347        let Some(dns_registry) = self.dns_registry_map.get_mut(&if_index) else {
3348            return;
3349        };
3350
3351        let Some(intf) = self.my_intfs.get(&if_index) else {
3352            return;
3353        };
3354
3355        let announced_v4 = if let Some(sock) = self.ipv4_sock.as_ref() {
3356            announce_service_on_intf(dns_registry, info, intf, &sock.pktinfo, self.port)
3357        } else {
3358            false
3359        };
3360        let announced_v6 = if let Some(sock) = self.ipv6_sock.as_ref() {
3361            announce_service_on_intf(dns_registry, info, intf, &sock.pktinfo, self.port)
3362        } else {
3363            false
3364        };
3365
3366        if announced_v4 || announced_v6 {
3367            let mut hostname = info.get_hostname();
3368            if let Some(new_name) = dns_registry.name_changes.get(hostname) {
3369                hostname = new_name;
3370            }
3371            let service_name = match dns_registry.name_changes.get(&fullname) {
3372                Some(new_name) => new_name.to_string(),
3373                None => fullname,
3374            };
3375
3376            debug!("resend: announce service {service_name} on {}", intf.name);
3377
3378            notify_monitors(
3379                &mut self.monitors,
3380                DaemonEvent::Announce(service_name, format!("{}:{}", hostname, &intf.name)),
3381            );
3382            info.set_status(if_index, ServiceStatus::Announced);
3383        } else {
3384            debug!("register-resend should not fail");
3385        }
3386
3387        self.increase_counter(Counter::RegisterResend, 1);
3388    }
3389
3390    fn exec_command_verify(&mut self, instance: String, timeout: Duration, repeating: bool) {
3391        /*
3392        RFC 6762 section 10.4:
3393        ...
3394        When the cache receives this hint that it should reconfirm some
3395        record, it MUST issue two or more queries for the resource record in
3396        dispute.  If no response is received within ten seconds, then, even
3397        though its TTL may indicate that it is not yet due to expire, that
3398        record SHOULD be promptly flushed from the cache.
3399        */
3400        let now = current_time_millis();
3401        let expire_at = if repeating {
3402            None
3403        } else {
3404            Some(now + timeout.as_millis() as u64)
3405        };
3406
3407        // send query for the resource records.
3408        let record_vec = self.cache.service_verify_queries(&instance, expire_at);
3409
3410        if !record_vec.is_empty() {
3411            let query_vec: Vec<(&str, RRType)> = record_vec
3412                .iter()
3413                .map(|(record, rr_type)| (record.as_str(), *rr_type))
3414                .collect();
3415            self.send_query_vec(&query_vec);
3416
3417            if let Some(new_expire) = expire_at {
3418                self.add_timer(new_expire); // ensure a check for the new expire time.
3419
3420                // schedule a resend 1 second later
3421                self.add_retransmission(now + 1000, Command::Verify(instance, timeout));
3422            }
3423        }
3424    }
3425
3426    /// Refresh cached service records with active queriers
3427    fn refresh_active_services(&mut self) {
3428        let mut query_ptr_count = 0;
3429        let mut query_srv_count = 0;
3430        let mut new_timers = HashSet::new();
3431        let mut query_addr_count = 0;
3432
3433        for (ty_domain, _sender) in self.service_queriers.iter() {
3434            let refreshed_timers = self.cache.refresh_due_ptr(ty_domain);
3435            if !refreshed_timers.is_empty() {
3436                trace!("sending refresh query for PTR: {}", ty_domain);
3437                self.send_query(ty_domain, RRType::PTR);
3438                query_ptr_count += 1;
3439                new_timers.extend(refreshed_timers);
3440            }
3441
3442            let (instances, timers) = self.cache.refresh_due_srv_txt(ty_domain);
3443            for (instance, types) in instances {
3444                trace!("sending refresh query for: {}", &instance);
3445                let query_vec = types
3446                    .into_iter()
3447                    .map(|ty| (instance.as_str(), ty))
3448                    .collect::<Vec<_>>();
3449                self.send_query_vec(&query_vec);
3450                query_srv_count += 1;
3451            }
3452            new_timers.extend(timers);
3453            let (hostnames, timers) = self.cache.refresh_due_hosts(ty_domain);
3454            for hostname in hostnames.iter() {
3455                trace!("sending refresh queries for A and AAAA:  {}", hostname);
3456                self.send_query_vec(&[(hostname, RRType::A), (hostname, RRType::AAAA)]);
3457                query_addr_count += 2;
3458            }
3459            new_timers.extend(timers);
3460        }
3461
3462        for timer in new_timers {
3463            self.add_timer(timer);
3464        }
3465
3466        self.increase_counter(Counter::CacheRefreshPTR, query_ptr_count);
3467        self.increase_counter(Counter::CacheRefreshSrvTxt, query_srv_count);
3468        self.increase_counter(Counter::CacheRefreshAddr, query_addr_count);
3469    }
3470}
3471
3472/// Adds one or more answers of a service for incoming msg and RR entry name.
3473fn add_answer_of_service(
3474    out: &mut DnsOutgoing,
3475    msg: &DnsIncoming,
3476    entry_name: &str,
3477    service: &ServiceInfo,
3478    qtype: RRType,
3479    intf_addrs: Vec<IpAddr>,
3480) {
3481    if qtype == RRType::SRV || qtype == RRType::ANY {
3482        out.add_answer(
3483            msg,
3484            DnsSrv::new(
3485                entry_name,
3486                CLASS_IN | CLASS_CACHE_FLUSH,
3487                service.get_host_ttl(),
3488                service.get_priority(),
3489                service.get_weight(),
3490                service.get_port(),
3491                service.get_hostname().to_string(),
3492            ),
3493        );
3494    }
3495
3496    if qtype == RRType::TXT || qtype == RRType::ANY {
3497        out.add_answer(
3498            msg,
3499            DnsTxt::new(
3500                entry_name,
3501                CLASS_IN | CLASS_CACHE_FLUSH,
3502                service.get_other_ttl(),
3503                service.generate_txt(),
3504            ),
3505        );
3506    }
3507
3508    if qtype == RRType::SRV {
3509        for address in intf_addrs {
3510            out.add_additional_answer(DnsAddress::new(
3511                service.get_hostname(),
3512                ip_address_rr_type(&address),
3513                CLASS_IN | CLASS_CACHE_FLUSH,
3514                service.get_host_ttl(),
3515                address,
3516                InterfaceId::default(),
3517            ));
3518        }
3519    }
3520}
3521
3522/// All possible events sent to the client from the daemon
3523/// regarding service discovery.
3524#[derive(Clone, Debug)]
3525#[non_exhaustive]
3526pub enum ServiceEvent {
3527    /// Started searching for a service type.
3528    SearchStarted(String),
3529
3530    /// Found a specific (service_type, fullname).
3531    ServiceFound(String, String),
3532
3533    /// Resolved a service instance in a ResolvedService struct.
3534    ServiceResolved(Box<ResolvedService>),
3535
3536    /// A service instance (service_type, fullname) was removed.
3537    ServiceRemoved(String, String),
3538
3539    /// Stopped searching for a service type.
3540    SearchStopped(String),
3541}
3542
3543/// All possible events sent to the client from the daemon
3544/// regarding host resolution.
3545#[derive(Clone, Debug)]
3546#[non_exhaustive]
3547pub enum HostnameResolutionEvent {
3548    /// Started searching for the ip address of a hostname.
3549    SearchStarted(String),
3550    /// One or more addresses for a hostname has been found.
3551    AddressesFound(String, HashSet<ScopedIp>),
3552    /// One or more addresses for a hostname has been removed.
3553    AddressesRemoved(String, HashSet<ScopedIp>),
3554    /// The search for the ip address of a hostname has timed out.
3555    SearchTimeout(String),
3556    /// Stopped searching for the ip address of a hostname.
3557    SearchStopped(String),
3558}
3559
3560/// Some notable events from the daemon besides [`ServiceEvent`].
3561/// These events are expected to happen infrequently.
3562#[derive(Clone, Debug)]
3563#[non_exhaustive]
3564pub enum DaemonEvent {
3565    /// Daemon unsolicitly announced a service from an interface.
3566    Announce(String, String),
3567
3568    /// Daemon encountered an error.
3569    Error(Error),
3570
3571    /// Daemon detected a new IP address from the host.
3572    IpAdd(IpAddr),
3573
3574    /// Daemon detected a IP address removed from the host.
3575    IpDel(IpAddr),
3576
3577    /// Daemon resolved a name conflict by changing one of its names.
3578    /// see [DnsNameChange] for more details.
3579    NameChange(DnsNameChange),
3580
3581    /// Send out a multicast response via an interface.
3582    Respond(String),
3583}
3584
3585/// Represents a name change due to a name conflict resolution.
3586/// See [RFC 6762 section 9](https://datatracker.ietf.org/doc/html/rfc6762#section-9)
3587#[derive(Clone, Debug)]
3588pub struct DnsNameChange {
3589    /// The original name set in `ServiceInfo` by the user.
3590    pub original: String,
3591
3592    /// A new name is created by appending a suffix after the original name.
3593    ///
3594    /// - for a service instance name, the suffix is `(N)`, where N starts at 2.
3595    /// - for a host name, the suffix is `-N`, where N starts at 2.
3596    ///
3597    /// For example:
3598    ///
3599    /// - Service name `foo._service-type._udp` becomes `foo (2)._service-type._udp`
3600    /// - Host name `foo.local.` becomes `foo-2.local.`
3601    pub new_name: String,
3602
3603    /// The resource record type
3604    pub rr_type: RRType,
3605
3606    /// The interface where the name conflict and its change happened.
3607    pub intf_name: String,
3608}
3609
3610/// Commands supported by the daemon
3611#[derive(Debug)]
3612enum Command {
3613    /// Browsing for a service type (ty_domain, next_time_delay_in_seconds, channel::sender)
3614    Browse(String, u32, bool, Sender<ServiceEvent>),
3615
3616    /// Resolve a hostname to IP addresses.
3617    ResolveHostname(String, u32, Sender<HostnameResolutionEvent>, Option<u64>), // (hostname, next_time_delay_in_seconds, sender, timeout_in_milliseconds)
3618
3619    /// Register a service
3620    Register(Box<ServiceInfo>),
3621
3622    /// Unregister a service
3623    Unregister(String, Sender<UnregisterStatus>), // (fullname)
3624
3625    /// Announce again a service to local network
3626    RegisterResend(String, u32), // (fullname)
3627
3628    /// Resend unregister packet.
3629    UnregisterResend(Vec<u8>, u32, bool), // (packet content, if_index, is_ipv4)
3630
3631    /// Stop browsing a service type
3632    StopBrowse(String), // (ty_domain)
3633
3634    /// Stop resolving a hostname
3635    StopResolveHostname(String), // (hostname)
3636
3637    /// Send query to resolve a service instance.
3638    /// This is used when a PTR record exists but SRV & TXT records are missing.
3639    Resolve(String, u16), // (service_instance_fullname, try_count)
3640
3641    /// Read the current values of the counters
3642    GetMetrics(Sender<Metrics>),
3643
3644    /// Get the current status of the daemon.
3645    GetStatus(Sender<DaemonStatus>),
3646
3647    /// Monitor noticeable events in the daemon.
3648    Monitor(Sender<DaemonEvent>),
3649
3650    SetOption(DaemonOption),
3651
3652    GetOption(Sender<DaemonOptionVal>),
3653
3654    /// Proactively confirm a DNS resource record.
3655    ///
3656    /// The intention is to check if a service name or IP address still valid
3657    /// before its TTL expires.
3658    Verify(String, Duration),
3659
3660    Exit(Sender<DaemonStatus>),
3661}
3662
3663impl fmt::Display for Command {
3664    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
3665        match self {
3666            Self::Browse(_, _, _, _) => write!(f, "Command Browse"),
3667            Self::ResolveHostname(_, _, _, _) => write!(f, "Command ResolveHostname"),
3668            Self::Exit(_) => write!(f, "Command Exit"),
3669            Self::GetStatus(_) => write!(f, "Command GetStatus"),
3670            Self::GetMetrics(_) => write!(f, "Command GetMetrics"),
3671            Self::Monitor(_) => write!(f, "Command Monitor"),
3672            Self::Register(_) => write!(f, "Command Register"),
3673            Self::RegisterResend(_, _) => write!(f, "Command RegisterResend"),
3674            Self::SetOption(_) => write!(f, "Command SetOption"),
3675            Self::GetOption(_) => write!(f, "Command GetOption"),
3676            Self::StopBrowse(_) => write!(f, "Command StopBrowse"),
3677            Self::StopResolveHostname(_) => write!(f, "Command StopResolveHostname"),
3678            Self::Unregister(_, _) => write!(f, "Command Unregister"),
3679            Self::UnregisterResend(_, _, _) => write!(f, "Command UnregisterResend"),
3680            Self::Resolve(_, _) => write!(f, "Command Resolve"),
3681            Self::Verify(_, _) => write!(f, "Command VerifyResource"),
3682        }
3683    }
3684}
3685
3686struct DaemonOptionVal {
3687    _service_name_len_max: u8,
3688    ip_check_interval: u64,
3689}
3690
3691#[derive(Debug)]
3692enum DaemonOption {
3693    ServiceNameLenMax(u8),
3694    IpCheckInterval(u64),
3695    EnableInterface(Vec<IfKind>),
3696    DisableInterface(Vec<IfKind>),
3697    MulticastLoopV4(bool),
3698    MulticastLoopV6(bool),
3699    AcceptUnsolicited(bool),
3700    #[cfg(test)]
3701    TestDownInterface(String),
3702    #[cfg(test)]
3703    TestUpInterface(String),
3704}
3705
3706/// The length of Service Domain name supported in this lib.
3707const DOMAIN_LEN: usize = "._tcp.local.".len();
3708
3709/// Validate the length of "service_name" in a "_<service_name>.<domain_name>." string.
3710fn check_service_name_length(ty_domain: &str, limit: u8) -> Result<()> {
3711    if ty_domain.len() <= DOMAIN_LEN + 1 {
3712        // service name cannot be empty or only '_'.
3713        return Err(e_fmt!("Service type name cannot be empty: {}", ty_domain));
3714    }
3715
3716    let service_name_len = ty_domain.len() - DOMAIN_LEN - 1; // exclude the leading `_`
3717    if service_name_len > limit as usize {
3718        return Err(e_fmt!("Service name length must be <= {} bytes", limit));
3719    }
3720    Ok(())
3721}
3722
3723/// Checks if `name` ends with a valid domain: '._tcp.local.' or '._udp.local.'
3724fn check_domain_suffix(name: &str) -> Result<()> {
3725    if !(name.ends_with("._tcp.local.") || name.ends_with("._udp.local.")) {
3726        return Err(e_fmt!(
3727            "mDNS service {} must end with '._tcp.local.' or '._udp.local.'",
3728            name
3729        ));
3730    }
3731
3732    Ok(())
3733}
3734
3735/// Validate the service name in a fully qualified name.
3736///
3737/// A Full Name = <Instance>.<Service>.<Domain>
3738/// The only `<Domain>` supported are "._tcp.local." and "._udp.local.".
3739///
3740/// Note: this function does not check for the length of the service name.
3741/// Instead, `register_service` method will check the length.
3742fn check_service_name(fullname: &str) -> Result<()> {
3743    check_domain_suffix(fullname)?;
3744
3745    let remaining: Vec<&str> = fullname[..fullname.len() - DOMAIN_LEN].split('.').collect();
3746    let name = remaining.last().ok_or_else(|| e_fmt!("No service name"))?;
3747
3748    if &name[0..1] != "_" {
3749        return Err(e_fmt!("Service name must start with '_'"));
3750    }
3751
3752    let name = &name[1..];
3753
3754    if name.contains("--") {
3755        return Err(e_fmt!("Service name must not contain '--'"));
3756    }
3757
3758    if name.starts_with('-') || name.ends_with('-') {
3759        return Err(e_fmt!("Service name (%s) may not start or end with '-'"));
3760    }
3761
3762    let ascii_count = name.chars().filter(|c| c.is_ascii_alphabetic()).count();
3763    if ascii_count < 1 {
3764        return Err(e_fmt!(
3765            "Service name must contain at least one letter (eg: 'A-Za-z')"
3766        ));
3767    }
3768
3769    Ok(())
3770}
3771
3772/// Validate a hostname.
3773fn check_hostname(hostname: &str) -> Result<()> {
3774    if !hostname.ends_with(".local.") {
3775        return Err(e_fmt!("Hostname must end with '.local.': {hostname}"));
3776    }
3777
3778    if hostname == ".local." {
3779        return Err(e_fmt!(
3780            "The part of the hostname before '.local.' cannot be empty"
3781        ));
3782    }
3783
3784    if hostname.len() > 255 {
3785        return Err(e_fmt!("Hostname length must be <= 255 bytes"));
3786    }
3787
3788    Ok(())
3789}
3790
3791fn call_service_listener(
3792    listeners_map: &HashMap<String, Sender<ServiceEvent>>,
3793    ty_domain: &str,
3794    event: ServiceEvent,
3795) {
3796    if let Some(listener) = listeners_map.get(ty_domain) {
3797        match listener.send(event) {
3798            Ok(()) => trace!("Sent event to listener successfully"),
3799            Err(e) => debug!("Failed to send event: {}", e),
3800        }
3801    }
3802}
3803
3804fn call_hostname_resolution_listener(
3805    listeners_map: &HashMap<String, (Sender<HostnameResolutionEvent>, Option<u64>)>,
3806    hostname: &str,
3807    event: HostnameResolutionEvent,
3808) {
3809    let hostname_lower = hostname.to_lowercase();
3810    if let Some(listener) = listeners_map.get(&hostname_lower).map(|(l, _)| l) {
3811        match listener.send(event) {
3812            Ok(()) => trace!("Sent event to listener successfully"),
3813            Err(e) => debug!("Failed to send event: {}", e),
3814        }
3815    }
3816}
3817
3818/// Returns valid network interfaces in the host system.
3819/// Operational down interfaces are excluded.
3820/// Loopback interfaces are excluded if `with_loopback` is false.
3821fn my_ip_interfaces(with_loopback: bool) -> Vec<Interface> {
3822    if_addrs::get_if_addrs()
3823        .unwrap_or_default()
3824        .into_iter()
3825        .filter(|i| i.is_oper_up() && (!i.is_loopback() || with_loopback))
3826        .collect()
3827}
3828
3829/// Send an outgoing mDNS query or response, and returns the packet bytes.
3830/// Returns empty vec if no valid interface address is found.
3831fn send_dns_outgoing(
3832    out: &DnsOutgoing,
3833    my_intf: &MyIntf,
3834    sock: &PktInfoUdpSocket,
3835    port: u16,
3836) -> Vec<Vec<u8>> {
3837    let if_name = &my_intf.name;
3838
3839    let if_addr = if sock.domain() == Domain::IPV4 {
3840        match my_intf.next_ifaddr_v4() {
3841            Some(addr) => addr,
3842            None => return vec![],
3843        }
3844    } else {
3845        match my_intf.next_ifaddr_v6() {
3846            Some(addr) => addr,
3847            None => return vec![],
3848        }
3849    };
3850
3851    send_dns_outgoing_impl(out, if_name, my_intf.index, if_addr, sock, port)
3852}
3853
3854/// Send an outgoing mDNS query or response, and returns the packet bytes.
3855fn send_dns_outgoing_impl(
3856    out: &DnsOutgoing,
3857    if_name: &str,
3858    if_index: u32,
3859    if_addr: &IfAddr,
3860    sock: &PktInfoUdpSocket,
3861    port: u16,
3862) -> Vec<Vec<u8>> {
3863    let qtype = if out.is_query() {
3864        "query"
3865    } else {
3866        if out.answers_count() == 0 && out.additionals().is_empty() {
3867            return vec![]; // no need to send empty response
3868        }
3869        "response"
3870    };
3871    trace!(
3872        "send {}: {} questions {} answers {} authorities {} additional",
3873        qtype,
3874        out.questions().len(),
3875        out.answers_count(),
3876        out.authorities().len(),
3877        out.additionals().len()
3878    );
3879
3880    match if_addr.ip() {
3881        IpAddr::V4(ipv4) => {
3882            if let Err(e) = sock.set_multicast_if_v4(&ipv4) {
3883                debug!(
3884                    "send_dns_outgoing: failed to set multicast interface for IPv4 {}: {}",
3885                    ipv4, e
3886                );
3887                return vec![]; // cannot send without a valid interface
3888            }
3889        }
3890        IpAddr::V6(ipv6) => {
3891            if let Err(e) = sock.set_multicast_if_v6(if_index) {
3892                debug!(
3893                    "send_dns_outgoing: failed to set multicast interface for IPv6 {}: {}",
3894                    ipv6, e
3895                );
3896                return vec![]; // cannot send without a valid interface
3897            }
3898        }
3899    }
3900
3901    let packet_list = out.to_data_on_wire();
3902    for packet in packet_list.iter() {
3903        multicast_on_intf(packet, if_name, if_index, if_addr, sock, port);
3904    }
3905    packet_list
3906}
3907
3908/// Sends a multicast packet, and returns the packet bytes.
3909fn multicast_on_intf(
3910    packet: &[u8],
3911    if_name: &str,
3912    if_index: u32,
3913    if_addr: &IfAddr,
3914    socket: &PktInfoUdpSocket,
3915    port: u16,
3916) {
3917    if packet.len() > MAX_MSG_ABSOLUTE {
3918        debug!("Drop over-sized packet ({})", packet.len());
3919        return;
3920    }
3921
3922    let addr: SocketAddr = match if_addr {
3923        if_addrs::IfAddr::V4(_) => SocketAddrV4::new(GROUP_ADDR_V4, port).into(),
3924        if_addrs::IfAddr::V6(_) => {
3925            let mut sock = SocketAddrV6::new(GROUP_ADDR_V6, port, 0, 0);
3926            sock.set_scope_id(if_index); // Choose iface for multicast
3927            sock.into()
3928        }
3929    };
3930
3931    // Sends out `packet` to `addr` on the socket.
3932    let sock_addr = addr.into();
3933    match socket.send_to(packet, &sock_addr) {
3934        Ok(sz) => trace!(
3935            "sent out {} bytes on interface {} (idx {}) addr {}",
3936            sz,
3937            if_name,
3938            if_index,
3939            if_addr.ip()
3940        ),
3941        Err(e) => trace!("Failed to send to {} via {:?}: {}", addr, &if_name, e),
3942    }
3943}
3944
3945/// Returns true if `name` is a valid instance name of format:
3946/// <instance>.<service_type>.<_udp|_tcp>.local.
3947/// Note: <instance> could contain '.' as well.
3948fn valid_instance_name(name: &str) -> bool {
3949    name.split('.').count() >= 5
3950}
3951
3952fn notify_monitors(monitors: &mut Vec<Sender<DaemonEvent>>, event: DaemonEvent) {
3953    monitors.retain(|sender| {
3954        if let Err(e) = sender.try_send(event.clone()) {
3955            debug!("notify_monitors: try_send: {}", &e);
3956            if matches!(e, TrySendError::Disconnected(_)) {
3957                return false; // This monitor is dropped.
3958            }
3959        }
3960        true
3961    });
3962}
3963
3964/// Check if all unique records passed "probing", and if yes, create a packet
3965/// to announce the service.
3966fn prepare_announce(
3967    info: &ServiceInfo,
3968    intf: &MyIntf,
3969    dns_registry: &mut DnsRegistry,
3970    is_ipv4: bool,
3971) -> Option<DnsOutgoing> {
3972    let intf_addrs = if is_ipv4 {
3973        info.get_addrs_on_my_intf_v4(intf)
3974    } else {
3975        info.get_addrs_on_my_intf_v6(intf)
3976    };
3977
3978    if intf_addrs.is_empty() {
3979        debug!(
3980            "prepare_announce (ipv4: {is_ipv4}): no valid addrs on interface {}",
3981            &intf.name
3982        );
3983        return None;
3984    }
3985
3986    // check if we changed our name due to conflicts.
3987    let service_fullname = match dns_registry.name_changes.get(info.get_fullname()) {
3988        Some(new_name) => new_name,
3989        None => info.get_fullname(),
3990    };
3991
3992    debug!(
3993        "prepare to announce service {service_fullname} on {:?}",
3994        &intf_addrs
3995    );
3996
3997    let mut probing_count = 0;
3998    let mut out = DnsOutgoing::new(FLAGS_QR_RESPONSE | FLAGS_AA);
3999    let create_time = current_time_millis() + fastrand::u64(0..250);
4000
4001    out.add_answer_at_time(
4002        DnsPointer::new(
4003            info.get_type(),
4004            RRType::PTR,
4005            CLASS_IN,
4006            info.get_other_ttl(),
4007            service_fullname.to_string(),
4008        ),
4009        0,
4010    );
4011
4012    if let Some(sub) = info.get_subtype() {
4013        trace!("Adding subdomain {}", sub);
4014        out.add_answer_at_time(
4015            DnsPointer::new(
4016                sub,
4017                RRType::PTR,
4018                CLASS_IN,
4019                info.get_other_ttl(),
4020                service_fullname.to_string(),
4021            ),
4022            0,
4023        );
4024    }
4025
4026    // SRV records.
4027    let hostname = match dns_registry.name_changes.get(info.get_hostname()) {
4028        Some(new_name) => new_name.to_string(),
4029        None => info.get_hostname().to_string(),
4030    };
4031
4032    let mut srv = DnsSrv::new(
4033        info.get_fullname(),
4034        CLASS_IN | CLASS_CACHE_FLUSH,
4035        info.get_host_ttl(),
4036        info.get_priority(),
4037        info.get_weight(),
4038        info.get_port(),
4039        hostname,
4040    );
4041
4042    if let Some(new_name) = dns_registry.name_changes.get(info.get_fullname()) {
4043        srv.get_record_mut().set_new_name(new_name.to_string());
4044    }
4045
4046    if !info.requires_probe()
4047        || dns_registry.is_probing_done(&srv, info.get_fullname(), create_time)
4048    {
4049        out.add_answer_at_time(srv, 0);
4050    } else {
4051        probing_count += 1;
4052    }
4053
4054    // TXT records.
4055
4056    let mut txt = DnsTxt::new(
4057        info.get_fullname(),
4058        CLASS_IN | CLASS_CACHE_FLUSH,
4059        info.get_other_ttl(),
4060        info.generate_txt(),
4061    );
4062
4063    if let Some(new_name) = dns_registry.name_changes.get(info.get_fullname()) {
4064        txt.get_record_mut().set_new_name(new_name.to_string());
4065    }
4066
4067    if !info.requires_probe()
4068        || dns_registry.is_probing_done(&txt, info.get_fullname(), create_time)
4069    {
4070        out.add_answer_at_time(txt, 0);
4071    } else {
4072        probing_count += 1;
4073    }
4074
4075    // Address records. (A and AAAA)
4076
4077    let hostname = info.get_hostname();
4078    for address in intf_addrs {
4079        let mut dns_addr = DnsAddress::new(
4080            hostname,
4081            ip_address_rr_type(&address),
4082            CLASS_IN | CLASS_CACHE_FLUSH,
4083            info.get_host_ttl(),
4084            address,
4085            intf.into(),
4086        );
4087
4088        if let Some(new_name) = dns_registry.name_changes.get(hostname) {
4089            dns_addr.get_record_mut().set_new_name(new_name.to_string());
4090        }
4091
4092        if !info.requires_probe()
4093            || dns_registry.is_probing_done(&dns_addr, info.get_fullname(), create_time)
4094        {
4095            out.add_answer_at_time(dns_addr, 0);
4096        } else {
4097            probing_count += 1;
4098        }
4099    }
4100
4101    if probing_count > 0 {
4102        return None;
4103    }
4104
4105    Some(out)
4106}
4107
4108/// Send an unsolicited response for owned service via `intf` and `sock`.
4109/// Returns true if sent out successfully for IPv4 or IPv6.
4110fn announce_service_on_intf(
4111    dns_registry: &mut DnsRegistry,
4112    info: &ServiceInfo,
4113    intf: &MyIntf,
4114    sock: &PktInfoUdpSocket,
4115    port: u16,
4116) -> bool {
4117    let is_ipv4 = sock.domain() == Domain::IPV4;
4118    if let Some(out) = prepare_announce(info, intf, dns_registry, is_ipv4) {
4119        send_dns_outgoing(&out, intf, sock, port);
4120        return true;
4121    }
4122
4123    false
4124}
4125
4126/// Returns a new name based on the `original` to avoid conflicts.
4127/// If the name already contains a number in parentheses, increments that number.
4128///
4129/// Examples:
4130/// - `foo.local.` becomes `foo (2).local.`
4131/// - `foo (2).local.` becomes `foo (3).local.`
4132/// - `foo (9)` becomes `foo (10)`
4133fn name_change(original: &str) -> String {
4134    let mut parts: Vec<_> = original.split('.').collect();
4135    let Some(first_part) = parts.get_mut(0) else {
4136        return format!("{original} (2)");
4137    };
4138
4139    let mut new_name = format!("{first_part} (2)");
4140
4141    // check if there is already has `(<num>)` suffix.
4142    if let Some(paren_pos) = first_part.rfind(" (") {
4143        // Check if there's a closing parenthesis
4144        if let Some(end_paren) = first_part[paren_pos..].find(')') {
4145            let absolute_end_pos = paren_pos + end_paren;
4146            // Only process if the closing parenthesis is the last character
4147            if absolute_end_pos == first_part.len() - 1 {
4148                let num_start = paren_pos + 2; // Skip " ("
4149                                               // Try to parse the number between parentheses
4150                if let Ok(number) = first_part[num_start..absolute_end_pos].parse::<u32>() {
4151                    let base_name = &first_part[..paren_pos];
4152                    new_name = format!("{} ({})", base_name, number + 1)
4153                }
4154            }
4155        }
4156    }
4157
4158    *first_part = &new_name;
4159    parts.join(".")
4160}
4161
4162/// Returns a new name based on the `original` to avoid conflicts.
4163/// If the name already contains a hyphenated number, increments that number.
4164///
4165/// Examples:
4166/// - `foo.local.` becomes `foo-2.local.`
4167/// - `foo-2.local.` becomes `foo-3.local.`
4168/// - `foo` becomes `foo-2`
4169fn hostname_change(original: &str) -> String {
4170    let mut parts: Vec<_> = original.split('.').collect();
4171    let Some(first_part) = parts.get_mut(0) else {
4172        return format!("{original}-2");
4173    };
4174
4175    let mut new_name = format!("{first_part}-2");
4176
4177    // check if there is already a `-<num>` suffix
4178    if let Some(hyphen_pos) = first_part.rfind('-') {
4179        // Try to parse everything after the hyphen as a number
4180        if let Ok(number) = first_part[hyphen_pos + 1..].parse::<u32>() {
4181            let base_name = &first_part[..hyphen_pos];
4182            new_name = format!("{}-{}", base_name, number + 1);
4183        }
4184    }
4185
4186    *first_part = &new_name;
4187    parts.join(".")
4188}
4189
4190fn add_answer_with_additionals(
4191    out: &mut DnsOutgoing,
4192    msg: &DnsIncoming,
4193    service: &ServiceInfo,
4194    intf: &MyIntf,
4195    dns_registry: &DnsRegistry,
4196    is_ipv4: bool,
4197) {
4198    let intf_addrs = if is_ipv4 {
4199        service.get_addrs_on_my_intf_v4(intf)
4200    } else {
4201        service.get_addrs_on_my_intf_v6(intf)
4202    };
4203    if intf_addrs.is_empty() {
4204        trace!("No addrs on LAN of intf {:?}", intf);
4205        return;
4206    }
4207
4208    // check if we changed our name due to conflicts.
4209    let service_fullname = match dns_registry.name_changes.get(service.get_fullname()) {
4210        Some(new_name) => new_name,
4211        None => service.get_fullname(),
4212    };
4213
4214    let hostname = match dns_registry.name_changes.get(service.get_hostname()) {
4215        Some(new_name) => new_name,
4216        None => service.get_hostname(),
4217    };
4218
4219    let ptr_added = out.add_answer(
4220        msg,
4221        DnsPointer::new(
4222            service.get_type(),
4223            RRType::PTR,
4224            CLASS_IN,
4225            service.get_other_ttl(),
4226            service_fullname.to_string(),
4227        ),
4228    );
4229
4230    if !ptr_added {
4231        trace!("answer was not added for msg {:?}", msg);
4232        return;
4233    }
4234
4235    if let Some(sub) = service.get_subtype() {
4236        trace!("Adding subdomain {}", sub);
4237        out.add_additional_answer(DnsPointer::new(
4238            sub,
4239            RRType::PTR,
4240            CLASS_IN,
4241            service.get_other_ttl(),
4242            service_fullname.to_string(),
4243        ));
4244    }
4245
4246    // Add recommended additional answers according to
4247    // https://tools.ietf.org/html/rfc6763#section-12.1.
4248    out.add_additional_answer(DnsSrv::new(
4249        service_fullname,
4250        CLASS_IN | CLASS_CACHE_FLUSH,
4251        service.get_host_ttl(),
4252        service.get_priority(),
4253        service.get_weight(),
4254        service.get_port(),
4255        hostname.to_string(),
4256    ));
4257
4258    out.add_additional_answer(DnsTxt::new(
4259        service_fullname,
4260        CLASS_IN | CLASS_CACHE_FLUSH,
4261        service.get_other_ttl(),
4262        service.generate_txt(),
4263    ));
4264
4265    for address in intf_addrs {
4266        out.add_additional_answer(DnsAddress::new(
4267            hostname,
4268            ip_address_rr_type(&address),
4269            CLASS_IN | CLASS_CACHE_FLUSH,
4270            service.get_host_ttl(),
4271            address,
4272            intf.into(),
4273        ));
4274    }
4275}
4276
4277/// Check probes in a registry and returns: a probing packet to send out, and a list of probe names
4278/// that are finished.
4279fn check_probing(
4280    dns_registry: &mut DnsRegistry,
4281    timers: &mut BinaryHeap<Reverse<u64>>,
4282    now: u64,
4283) -> (DnsOutgoing, Vec<String>) {
4284    let mut expired_probes = Vec::new();
4285    let mut out = DnsOutgoing::new(FLAGS_QR_QUERY);
4286
4287    for (name, probe) in dns_registry.probing.iter_mut() {
4288        if now >= probe.next_send {
4289            if probe.expired(now) {
4290                // move the record to active
4291                expired_probes.push(name.clone());
4292            } else {
4293                out.add_question(name, RRType::ANY);
4294
4295                /*
4296                RFC 6762 section 8.2: https://datatracker.ietf.org/doc/html/rfc6762#section-8.2
4297                ...
4298                for tiebreaking to work correctly in all
4299                cases, the Authority Section must contain *all* the records and
4300                proposed rdata being probed for uniqueness.
4301                    */
4302                for record in probe.records.iter() {
4303                    out.add_authority(record.clone());
4304                }
4305
4306                probe.update_next_send(now);
4307
4308                // add timer
4309                timers.push(Reverse(probe.next_send));
4310            }
4311        }
4312    }
4313
4314    (out, expired_probes)
4315}
4316
4317/// Process expired probes on an interface and return a list of services
4318/// that are waiting for the probe to finish.
4319///
4320/// `DnsNameChange` events are sent to the monitors.
4321fn handle_expired_probes(
4322    expired_probes: Vec<String>,
4323    intf_name: &str,
4324    dns_registry: &mut DnsRegistry,
4325    monitors: &mut Vec<Sender<DaemonEvent>>,
4326) -> HashSet<String> {
4327    let mut waiting_services = HashSet::new();
4328
4329    for name in expired_probes {
4330        let Some(probe) = dns_registry.probing.remove(&name) else {
4331            continue;
4332        };
4333
4334        // send notifications about name changes
4335        for record in probe.records.iter() {
4336            if let Some(new_name) = record.get_record().get_new_name() {
4337                dns_registry
4338                    .name_changes
4339                    .insert(name.clone(), new_name.to_string());
4340
4341                let event = DnsNameChange {
4342                    original: record.get_record().get_original_name().to_string(),
4343                    new_name: new_name.to_string(),
4344                    rr_type: record.get_type(),
4345                    intf_name: intf_name.to_string(),
4346                };
4347                debug!("Name change event: {:?}", &event);
4348                notify_monitors(monitors, DaemonEvent::NameChange(event));
4349            }
4350        }
4351
4352        // move RR from probe to active.
4353        debug!(
4354            "probe of '{name}' finished: move {} records to active. ({} waiting services)",
4355            probe.records.len(),
4356            probe.waiting_services.len(),
4357        );
4358
4359        // Move records to active and plan to wake up services if records are not empty.
4360        if !probe.records.is_empty() {
4361            match dns_registry.active.get_mut(&name) {
4362                Some(records) => {
4363                    records.extend(probe.records);
4364                }
4365                None => {
4366                    dns_registry.active.insert(name, probe.records);
4367                }
4368            }
4369
4370            waiting_services.extend(probe.waiting_services);
4371        }
4372    }
4373
4374    waiting_services
4375}
4376
4377#[cfg(test)]
4378mod tests {
4379    use super::{
4380        _new_socket_bind, check_domain_suffix, check_service_name_length, hostname_change,
4381        my_ip_interfaces, name_change, send_dns_outgoing_impl, valid_instance_name,
4382        HostnameResolutionEvent, ServiceDaemon, ServiceEvent, ServiceInfo, MDNS_PORT,
4383    };
4384    use crate::{
4385        dns_parser::{
4386            DnsIncoming, DnsOutgoing, DnsPointer, InterfaceId, RRType, ScopedIp, CLASS_IN,
4387            FLAGS_AA, FLAGS_QR_RESPONSE,
4388        },
4389        service_daemon::{add_answer_of_service, check_hostname},
4390    };
4391    use std::time::{Duration, SystemTime};
4392    use test_log::test;
4393
4394    #[test]
4395    fn test_instance_name() {
4396        assert!(valid_instance_name("my-laser._printer._tcp.local."));
4397        assert!(valid_instance_name("my-laser.._printer._tcp.local."));
4398        assert!(!valid_instance_name("_printer._tcp.local."));
4399    }
4400
4401    #[test]
4402    fn test_check_service_name_length() {
4403        let result = check_service_name_length("_tcp", 100);
4404        assert!(result.is_err());
4405        if let Err(e) = result {
4406            println!("{}", e);
4407        }
4408    }
4409
4410    #[test]
4411    fn test_check_hostname() {
4412        // valid hostnames
4413        for hostname in &[
4414            "my_host.local.",
4415            &("A".repeat(255 - ".local.".len()) + ".local."),
4416        ] {
4417            let result = check_hostname(hostname);
4418            assert!(result.is_ok());
4419        }
4420
4421        // erroneous hostnames
4422        for hostname in &[
4423            "my_host.local",
4424            ".local.",
4425            &("A".repeat(256 - ".local.".len()) + ".local."),
4426        ] {
4427            let result = check_hostname(hostname);
4428            assert!(result.is_err());
4429            if let Err(e) = result {
4430                println!("{}", e);
4431            }
4432        }
4433    }
4434
4435    #[test]
4436    fn test_check_domain_suffix() {
4437        assert!(check_domain_suffix("_missing_dot._tcp.local").is_err());
4438        assert!(check_domain_suffix("_missing_bar.tcp.local.").is_err());
4439        assert!(check_domain_suffix("_mis_spell._tpp.local.").is_err());
4440        assert!(check_domain_suffix("_mis_spell._upp.local.").is_err());
4441        assert!(check_domain_suffix("_has_dot._tcp.local.").is_ok());
4442        assert!(check_domain_suffix("_goodname._udp.local.").is_ok());
4443    }
4444
4445    #[test]
4446    fn test_service_with_temporarily_invalidated_ptr() {
4447        // Create a daemon
4448        let d = ServiceDaemon::new().expect("Failed to create daemon");
4449
4450        let service = "_test_inval_ptr._udp.local.";
4451        let host_name = "my_host_tmp_invalidated_ptr.local.";
4452        let intfs: Vec<_> = my_ip_interfaces(false);
4453        let intf_ips: Vec<_> = intfs.iter().map(|intf| intf.ip()).collect();
4454        let port = 5201;
4455        let my_service =
4456            ServiceInfo::new(service, "my_instance", host_name, &intf_ips[..], port, None)
4457                .expect("invalid service info")
4458                .enable_addr_auto();
4459        let result = d.register(my_service.clone());
4460        assert!(result.is_ok());
4461
4462        // Browse for a service
4463        let browse_chan = d.browse(service).unwrap();
4464        let timeout = Duration::from_secs(2);
4465        let mut resolved = false;
4466
4467        while let Ok(event) = browse_chan.recv_timeout(timeout) {
4468            match event {
4469                ServiceEvent::ServiceResolved(info) => {
4470                    resolved = true;
4471                    println!("Resolved a service of {}", &info.fullname);
4472                    break;
4473                }
4474                e => {
4475                    println!("Received event {:?}", e);
4476                }
4477            }
4478        }
4479
4480        assert!(resolved);
4481
4482        println!("Stopping browse of {}", service);
4483        // Pause browsing so restarting will cause a new immediate query.
4484        // Unregistering will not work here, it will invalidate all the records.
4485        d.stop_browse(service).unwrap();
4486
4487        // Ensure the search is stopped.
4488        // Reduces the chance of receiving an answer adding the ptr back to the
4489        // cache causing the later browse to return directly from the cache.
4490        // (which invalidates what this test is trying to test for.)
4491        let mut stopped = false;
4492        while let Ok(event) = browse_chan.recv_timeout(timeout) {
4493            match event {
4494                ServiceEvent::SearchStopped(_) => {
4495                    stopped = true;
4496                    println!("Stopped browsing service");
4497                    break;
4498                }
4499                // Other `ServiceResolved` messages may be received
4500                // here as they come from different interfaces.
4501                // That's fine for this test.
4502                e => {
4503                    println!("Received event {:?}", e);
4504                }
4505            }
4506        }
4507
4508        assert!(stopped);
4509
4510        // Invalidate the ptr from the service to the host.
4511        let invalidate_ptr_packet = DnsPointer::new(
4512            my_service.get_type(),
4513            RRType::PTR,
4514            CLASS_IN,
4515            0,
4516            my_service.get_fullname().to_string(),
4517        );
4518
4519        let mut packet_buffer = DnsOutgoing::new(FLAGS_QR_RESPONSE | FLAGS_AA);
4520        packet_buffer.add_additional_answer(invalidate_ptr_packet);
4521
4522        for intf in intfs {
4523            let sock = _new_socket_bind(&intf, true).unwrap();
4524            send_dns_outgoing_impl(
4525                &packet_buffer,
4526                &intf.name,
4527                intf.index.unwrap_or(0),
4528                &intf.addr,
4529                &sock.pktinfo,
4530                MDNS_PORT,
4531            );
4532        }
4533
4534        println!(
4535            "Sent PTR record invalidation. Starting second browse for {}",
4536            service
4537        );
4538
4539        // Restart the browse to force the sender to re-send the announcements.
4540        let browse_chan = d.browse(service).unwrap();
4541
4542        resolved = false;
4543        while let Ok(event) = browse_chan.recv_timeout(timeout) {
4544            match event {
4545                ServiceEvent::ServiceResolved(info) => {
4546                    resolved = true;
4547                    println!("Resolved a service of {}", &info.fullname);
4548                    break;
4549                }
4550                e => {
4551                    println!("Received event {:?}", e);
4552                }
4553            }
4554        }
4555
4556        assert!(resolved);
4557        d.shutdown().unwrap();
4558    }
4559
4560    #[test]
4561    fn test_expired_srv() {
4562        // construct service info
4563        let service_type = "_expired-srv._udp.local.";
4564        let instance = "test_instance";
4565        let host_name = "expired_srv_host.local.";
4566        let mut my_service = ServiceInfo::new(service_type, instance, host_name, "", 5023, None)
4567            .unwrap()
4568            .enable_addr_auto();
4569        // let fullname = my_service.get_fullname().to_string();
4570
4571        // set SRV to expire soon.
4572        let new_ttl = 3; // for testing only.
4573        my_service._set_host_ttl(new_ttl);
4574
4575        // register my service
4576        let mdns_server = ServiceDaemon::new().expect("Failed to create mdns server");
4577        let result = mdns_server.register(my_service);
4578        assert!(result.is_ok());
4579
4580        let mdns_client = ServiceDaemon::new().expect("Failed to create mdns client");
4581        let browse_chan = mdns_client.browse(service_type).unwrap();
4582        let timeout = Duration::from_secs(2);
4583        let mut resolved = false;
4584
4585        while let Ok(event) = browse_chan.recv_timeout(timeout) {
4586            if let ServiceEvent::ServiceResolved(info) = event {
4587                resolved = true;
4588                println!("Resolved a service of {}", &info.fullname);
4589                break;
4590            }
4591        }
4592
4593        assert!(resolved);
4594
4595        // Exit the server so that no more responses.
4596        mdns_server.shutdown().unwrap();
4597
4598        // SRV record in the client cache will expire.
4599        let expire_timeout = Duration::from_secs(new_ttl as u64);
4600        while let Ok(event) = browse_chan.recv_timeout(expire_timeout) {
4601            if let ServiceEvent::ServiceRemoved(service_type, full_name) = event {
4602                println!("Service removed: {}: {}", &service_type, &full_name);
4603                break;
4604            }
4605        }
4606    }
4607
4608    #[test]
4609    fn test_hostname_resolution_address_removed() {
4610        // Create a mDNS server
4611        let server = ServiceDaemon::new().expect("Failed to create server");
4612        let hostname = "addr_remove_host._tcp.local.";
4613        let service_ip_addr: ScopedIp = my_ip_interfaces(false)
4614            .iter()
4615            .find(|iface| iface.ip().is_ipv4())
4616            .map(|iface| iface.ip().into())
4617            .unwrap();
4618
4619        let mut my_service = ServiceInfo::new(
4620            "_host_res_test._tcp.local.",
4621            "my_instance",
4622            hostname,
4623            service_ip_addr.to_ip_addr(),
4624            1234,
4625            None,
4626        )
4627        .expect("invalid service info");
4628
4629        // Set a short TTL for addresses for testing.
4630        let addr_ttl = 2;
4631        my_service._set_host_ttl(addr_ttl); // Expire soon
4632
4633        server.register(my_service).unwrap();
4634
4635        // Create a mDNS client for resolving the hostname.
4636        let client = ServiceDaemon::new().expect("Failed to create client");
4637        let event_receiver = client.resolve_hostname(hostname, None).unwrap();
4638        let resolved = loop {
4639            match event_receiver.recv() {
4640                Ok(HostnameResolutionEvent::AddressesFound(found_hostname, addresses)) => {
4641                    assert!(found_hostname == hostname);
4642                    assert!(addresses.contains(&service_ip_addr));
4643                    println!("address found: {:?}", &addresses);
4644                    break true;
4645                }
4646                Ok(HostnameResolutionEvent::SearchStopped(_)) => break false,
4647                Ok(_event) => {}
4648                Err(_) => break false,
4649            }
4650        };
4651
4652        assert!(resolved);
4653
4654        // Shutdown the server so no more responses / refreshes for addresses.
4655        server.shutdown().unwrap();
4656
4657        // Wait till hostname address record expires, with 1 second grace period.
4658        let timeout = Duration::from_secs(addr_ttl as u64 + 1);
4659        let removed = loop {
4660            match event_receiver.recv_timeout(timeout) {
4661                Ok(HostnameResolutionEvent::AddressesRemoved(removed_host, addresses)) => {
4662                    assert!(removed_host == hostname);
4663                    assert!(addresses.contains(&service_ip_addr));
4664
4665                    println!(
4666                        "address removed: hostname: {} addresses: {:?}",
4667                        &hostname, &addresses
4668                    );
4669                    break true;
4670                }
4671                Ok(_event) => {}
4672                Err(_) => {
4673                    break false;
4674                }
4675            }
4676        };
4677
4678        assert!(removed);
4679
4680        client.shutdown().unwrap();
4681    }
4682
4683    #[test]
4684    fn test_refresh_ptr() {
4685        // construct service info
4686        let service_type = "_refresh-ptr._udp.local.";
4687        let instance = "test_instance";
4688        let host_name = "refresh_ptr_host.local.";
4689        let service_ip_addr = my_ip_interfaces(false)
4690            .iter()
4691            .find(|iface| iface.ip().is_ipv4())
4692            .map(|iface| iface.ip())
4693            .unwrap();
4694
4695        let mut my_service = ServiceInfo::new(
4696            service_type,
4697            instance,
4698            host_name,
4699            service_ip_addr,
4700            5023,
4701            None,
4702        )
4703        .unwrap();
4704
4705        let new_ttl = 3; // for testing only.
4706        my_service._set_other_ttl(new_ttl);
4707
4708        // register my service
4709        let mdns_server = ServiceDaemon::new().expect("Failed to create mdns server");
4710        let result = mdns_server.register(my_service);
4711        assert!(result.is_ok());
4712
4713        let mdns_client = ServiceDaemon::new().expect("Failed to create mdns client");
4714        let browse_chan = mdns_client.browse(service_type).unwrap();
4715        let timeout = Duration::from_millis(1500); // Give at least 1 second for the service probing.
4716        let mut resolved = false;
4717
4718        // resolve the service first.
4719        while let Ok(event) = browse_chan.recv_timeout(timeout) {
4720            if let ServiceEvent::ServiceResolved(info) = event {
4721                resolved = true;
4722                println!("Resolved a service of {}", &info.fullname);
4723                break;
4724            }
4725        }
4726
4727        assert!(resolved);
4728
4729        // wait over 80% of TTL, and refresh PTR should be sent out.
4730        let timeout = Duration::from_millis(new_ttl as u64 * 1000 * 90 / 100);
4731        while let Ok(event) = browse_chan.recv_timeout(timeout) {
4732            println!("event: {:?}", &event);
4733        }
4734
4735        // verify refresh counter.
4736        let metrics_chan = mdns_client.get_metrics().unwrap();
4737        let metrics = metrics_chan.recv_timeout(timeout).unwrap();
4738        let ptr_refresh_counter = metrics["cache-refresh-ptr"];
4739        assert_eq!(ptr_refresh_counter, 1);
4740        let srvtxt_refresh_counter = metrics["cache-refresh-srv-txt"];
4741        assert_eq!(srvtxt_refresh_counter, 1);
4742
4743        // Exit the server so that no more responses.
4744        mdns_server.shutdown().unwrap();
4745        mdns_client.shutdown().unwrap();
4746    }
4747
4748    #[test]
4749    fn test_name_change() {
4750        assert_eq!(name_change("foo.local."), "foo (2).local.");
4751        assert_eq!(name_change("foo (2).local."), "foo (3).local.");
4752        assert_eq!(name_change("foo (9).local."), "foo (10).local.");
4753        assert_eq!(name_change("foo"), "foo (2)");
4754        assert_eq!(name_change("foo (2)"), "foo (3)");
4755        assert_eq!(name_change(""), " (2)");
4756
4757        // Additional edge cases
4758        assert_eq!(name_change("foo (abc)"), "foo (abc) (2)"); // Invalid number
4759        assert_eq!(name_change("foo (2"), "foo (2 (2)"); // Missing closing parenthesis
4760        assert_eq!(name_change("foo (2) extra"), "foo (2) extra (2)"); // Extra text after number
4761    }
4762
4763    #[test]
4764    fn test_hostname_change() {
4765        assert_eq!(hostname_change("foo.local."), "foo-2.local.");
4766        assert_eq!(hostname_change("foo"), "foo-2");
4767        assert_eq!(hostname_change("foo-2.local."), "foo-3.local.");
4768        assert_eq!(hostname_change("foo-9"), "foo-10");
4769        assert_eq!(hostname_change("test-42.domain."), "test-43.domain.");
4770    }
4771
4772    #[test]
4773    fn test_add_answer_txt_ttl() {
4774        // construct a simple service info
4775        let service_type = "_test_add_answer._udp.local.";
4776        let instance = "test_instance";
4777        let host_name = "add_answer_host.local.";
4778        let service_intf = my_ip_interfaces(false)
4779            .into_iter()
4780            .find(|iface| iface.ip().is_ipv4())
4781            .unwrap();
4782        let service_ip_addr = service_intf.ip();
4783        let my_service = ServiceInfo::new(
4784            service_type,
4785            instance,
4786            host_name,
4787            service_ip_addr,
4788            5023,
4789            None,
4790        )
4791        .unwrap();
4792
4793        // construct a DnsOutgoing message
4794        let mut out = DnsOutgoing::new(FLAGS_QR_RESPONSE | FLAGS_AA);
4795
4796        // Construct a dummy DnsIncoming message
4797        let mut dummy_data = out.to_data_on_wire();
4798        let interface_id = InterfaceId::from(&service_intf);
4799        let incoming = DnsIncoming::new(dummy_data.pop().unwrap(), interface_id).unwrap();
4800
4801        // Add an answer of TXT type for the service.
4802        let if_addrs = vec![service_intf.ip()];
4803        add_answer_of_service(
4804            &mut out,
4805            &incoming,
4806            instance,
4807            &my_service,
4808            RRType::TXT,
4809            if_addrs,
4810        );
4811
4812        // Check if the answer was added correctly
4813        assert!(
4814            out.answers_count() > 0,
4815            "No answers added to the outgoing message"
4816        );
4817
4818        // Check if the first answer is of type TXT
4819        let answer = out._answers().first().unwrap();
4820        assert_eq!(answer.0.get_type(), RRType::TXT);
4821
4822        // Check TTL is set properly for the TXT record
4823        assert_eq!(answer.0.get_record().get_ttl(), my_service.get_other_ttl());
4824    }
4825
4826    #[test]
4827    fn test_interface_flip() {
4828        // start a server
4829        let ty_domain = "_intf-flip._udp.local.";
4830        let host_name = "intf_flip.local.";
4831        let now = SystemTime::now()
4832            .duration_since(SystemTime::UNIX_EPOCH)
4833            .unwrap();
4834        let instance_name = now.as_micros().to_string(); // Create a unique name.
4835        let port = 5200;
4836
4837        // Get a single IPv4 address
4838        let (ip_addr1, intf_name) = my_ip_interfaces(false)
4839            .iter()
4840            .find(|iface| iface.ip().is_ipv4())
4841            .map(|iface| (iface.ip(), iface.name.clone()))
4842            .unwrap();
4843
4844        println!("Using interface {} with IP {}", intf_name, ip_addr1);
4845
4846        // Register the service.
4847        let service1 = ServiceInfo::new(ty_domain, &instance_name, host_name, ip_addr1, port, None)
4848            .expect("valid service info");
4849        let server1 = ServiceDaemon::new().expect("failed to start server");
4850        server1
4851            .register(service1)
4852            .expect("Failed to register service1");
4853
4854        // wait for the service announced.
4855        std::thread::sleep(Duration::from_secs(2));
4856
4857        // start a client
4858        let client = ServiceDaemon::new().expect("failed to start client");
4859
4860        let receiver = client.browse(ty_domain).unwrap();
4861
4862        let timeout = Duration::from_secs(3);
4863        let mut got_data = false;
4864
4865        while let Ok(event) = receiver.recv_timeout(timeout) {
4866            if let ServiceEvent::ServiceResolved(_) = event {
4867                println!("Received ServiceResolved event");
4868                got_data = true;
4869                break;
4870            }
4871        }
4872
4873        assert!(got_data, "Should receive ServiceResolved event");
4874
4875        // Set a short IP check interval to detect interface changes quickly.
4876        client.set_ip_check_interval(1).unwrap();
4877
4878        // Now shutdown the interface and expect the client to lose the service.
4879        println!("Shutting down interface {}", &intf_name);
4880        client.test_down_interface(&intf_name).unwrap();
4881
4882        let mut got_removed = false;
4883
4884        while let Ok(event) = receiver.recv_timeout(timeout) {
4885            if let ServiceEvent::ServiceRemoved(ty_domain, instance) = event {
4886                got_removed = true;
4887                println!("removed: {ty_domain} : {instance}");
4888                break;
4889            }
4890        }
4891        assert!(got_removed, "Should receive ServiceRemoved event");
4892
4893        println!("Bringing up interface {}", &intf_name);
4894        client.test_up_interface(&intf_name).unwrap();
4895        let mut got_data = false;
4896        while let Ok(event) = receiver.recv_timeout(timeout) {
4897            if let ServiceEvent::ServiceResolved(resolved) = event {
4898                got_data = true;
4899                println!("Received ServiceResolved: {:?}", resolved);
4900                break;
4901            }
4902        }
4903        assert!(
4904            got_data,
4905            "Should receive ServiceResolved event after interface is back up"
4906        );
4907
4908        server1.shutdown().unwrap();
4909        client.shutdown().unwrap();
4910    }
4911
4912    #[test]
4913    fn test_cache_only() {
4914        // construct service info
4915        let service_type = "_cache_only._udp.local.";
4916        let instance = "test_instance";
4917        let host_name = "cache_only_host.local.";
4918        let service_ip_addr = my_ip_interfaces(false)
4919            .iter()
4920            .find(|iface| iface.ip().is_ipv4())
4921            .map(|iface| iface.ip())
4922            .unwrap();
4923
4924        let mut my_service = ServiceInfo::new(
4925            service_type,
4926            instance,
4927            host_name,
4928            service_ip_addr,
4929            5023,
4930            None,
4931        )
4932        .unwrap();
4933
4934        let new_ttl = 3; // for testing only.
4935        my_service._set_other_ttl(new_ttl);
4936
4937        let mdns_client = ServiceDaemon::new().expect("Failed to create mdns client");
4938
4939        // make a single browse request to record that we are interested in the service.  This ensures that
4940        // subsequent announcements are cached.
4941        let browse_chan = mdns_client.browse_cache(service_type).unwrap();
4942        std::thread::sleep(Duration::from_secs(2));
4943
4944        // register my service
4945        let mdns_server = ServiceDaemon::new().expect("Failed to create mdns server");
4946        let result = mdns_server.register(my_service);
4947        assert!(result.is_ok());
4948
4949        let timeout = Duration::from_millis(1500); // Give at least 1 second for the service probing.
4950        let mut resolved = false;
4951
4952        // resolve the service.
4953        while let Ok(event) = browse_chan.recv_timeout(timeout) {
4954            if let ServiceEvent::ServiceResolved(info) = event {
4955                resolved = true;
4956                println!("Resolved a service of {}", &info.get_fullname());
4957                break;
4958            }
4959        }
4960
4961        assert!(resolved);
4962
4963        // Exit the server so that no more responses.
4964        mdns_server.shutdown().unwrap();
4965        mdns_client.shutdown().unwrap();
4966    }
4967
4968    #[test]
4969    fn test_cache_only_unsolicited() {
4970        // construct service info
4971        let service_type = "_cache_only._udp.local.";
4972        let instance = "test_instance";
4973        let host_name = "cache_only_host.local.";
4974        let service_ip_addr = my_ip_interfaces(false)
4975            .iter()
4976            .find(|iface| iface.ip().is_ipv4())
4977            .map(|iface| iface.ip())
4978            .unwrap();
4979
4980        let mut my_service = ServiceInfo::new(
4981            service_type,
4982            instance,
4983            host_name,
4984            service_ip_addr,
4985            5023,
4986            None,
4987        )
4988        .unwrap();
4989
4990        let new_ttl = 3; // for testing only.
4991        my_service._set_other_ttl(new_ttl);
4992
4993        // register my service
4994        let mdns_server = ServiceDaemon::new().expect("Failed to create mdns server");
4995        let result = mdns_server.register(my_service);
4996        assert!(result.is_ok());
4997
4998        let mdns_client = ServiceDaemon::new().expect("Failed to create mdns client");
4999        mdns_client.accept_unsolicited(true).unwrap();
5000
5001        // Wait a bit for the service announcements to go out, before calling browse_cache.  This ensures
5002        // that the announcements are treated as unsolicited
5003        std::thread::sleep(Duration::from_secs(2));
5004        let browse_chan = mdns_client.browse_cache(service_type).unwrap();
5005        let timeout = Duration::from_millis(1500); // Give at least 1 second for the service probing.
5006        let mut resolved = false;
5007
5008        // resolve the service.
5009        while let Ok(event) = browse_chan.recv_timeout(timeout) {
5010            if let ServiceEvent::ServiceResolved(info) = event {
5011                resolved = true;
5012                println!("Resolved a service of {}", &info.get_fullname());
5013                break;
5014            }
5015        }
5016
5017        assert!(resolved);
5018
5019        // Exit the server so that no more responses.
5020        mdns_server.shutdown().unwrap();
5021        mdns_client.shutdown().unwrap();
5022    }
5023
5024    #[test]
5025    fn test_custom_port_isolation() {
5026        // This test verifies:
5027        // 1. Daemons on a custom port can communicate with each other
5028        // 2. Daemons on different ports are isolated (no cross-talk)
5029
5030        let service_type = "_custom_port._udp.local.";
5031        let instance_custom = "custom_port_instance";
5032        let instance_default = "default_port_instance";
5033        let host_name = "custom_port_host.local.";
5034
5035        let service_ip_addr = my_ip_interfaces(false)
5036            .iter()
5037            .find(|iface| iface.ip().is_ipv4())
5038            .map(|iface| iface.ip())
5039            .expect("Test requires an IPv4 interface");
5040
5041        // Create service info for custom port (5454)
5042        let service_custom = ServiceInfo::new(
5043            service_type,
5044            instance_custom,
5045            host_name,
5046            service_ip_addr,
5047            8080,
5048            None,
5049        )
5050        .unwrap();
5051
5052        // Create service info for default port (5353)
5053        let service_default = ServiceInfo::new(
5054            service_type,
5055            instance_default,
5056            host_name,
5057            service_ip_addr,
5058            8081,
5059            None,
5060        )
5061        .unwrap();
5062
5063        // Create two daemons on custom port 5454
5064        let custom_port = 5454u16;
5065        let server_custom =
5066            ServiceDaemon::new_with_port(custom_port).expect("Failed to create custom port server");
5067        let client_custom =
5068            ServiceDaemon::new_with_port(custom_port).expect("Failed to create custom port client");
5069
5070        // Create daemon on default port (5353)
5071        let server_default = ServiceDaemon::new().expect("Failed to create default port server");
5072
5073        // Register service on custom port
5074        server_custom
5075            .register(service_custom.clone())
5076            .expect("Failed to register custom port service");
5077
5078        // Register service on default port
5079        server_default
5080            .register(service_default.clone())
5081            .expect("Failed to register default port service");
5082
5083        // Browse from custom port client
5084        let browse_custom = client_custom
5085            .browse(service_type)
5086            .expect("Failed to browse on custom port");
5087
5088        let timeout = Duration::from_secs(3);
5089        let mut found_custom = false;
5090        let mut found_default_on_custom = false;
5091
5092        // Custom port client should find the custom port service
5093        while let Ok(event) = browse_custom.recv_timeout(timeout) {
5094            if let ServiceEvent::ServiceResolved(info) = event {
5095                println!(
5096                    "Custom port client resolved: {} on port {}",
5097                    info.get_fullname(),
5098                    info.get_port()
5099                );
5100                if info.get_fullname().starts_with(instance_custom) {
5101                    found_custom = true;
5102                    assert_eq!(info.get_port(), 8080);
5103                }
5104                if info.get_fullname().starts_with(instance_default) {
5105                    found_default_on_custom = true;
5106                }
5107            }
5108        }
5109
5110        assert!(
5111            found_custom,
5112            "Custom port client should find service on custom port"
5113        );
5114        assert!(
5115            !found_default_on_custom,
5116            "Custom port client should NOT find service on default port"
5117        );
5118
5119        // Now verify the default port daemon can find its own services
5120        // but not the custom port services
5121        let client_default = ServiceDaemon::new().expect("Failed to create default port client");
5122        let browse_default = client_default
5123            .browse(service_type)
5124            .expect("Failed to browse on default port");
5125
5126        let mut found_default = false;
5127        let mut found_custom_on_default = false;
5128
5129        while let Ok(event) = browse_default.recv_timeout(timeout) {
5130            if let ServiceEvent::ServiceResolved(info) = event {
5131                println!(
5132                    "Default port client resolved: {} on port {}",
5133                    info.get_fullname(),
5134                    info.get_port()
5135                );
5136                if info.get_fullname().starts_with(instance_default) {
5137                    found_default = true;
5138                    assert_eq!(info.get_port(), 8081);
5139                }
5140                if info.get_fullname().starts_with(instance_custom) {
5141                    found_custom_on_default = true;
5142                }
5143            }
5144        }
5145
5146        assert!(
5147            found_default,
5148            "Default port client should find service on default port"
5149        );
5150        assert!(
5151            !found_custom_on_default,
5152            "Default port client should NOT find service on custom port"
5153        );
5154
5155        // Cleanup
5156        server_custom.shutdown().unwrap();
5157        client_custom.shutdown().unwrap();
5158        server_default.shutdown().unwrap();
5159        client_default.shutdown().unwrap();
5160    }
5161}