Skip to main content

mdns_sd/
service_daemon.rs

1//! Service daemon for mDNS Service Discovery.
2
3// How DNS-based Service Discovery works in a nutshell:
4//
5// (excerpt from RFC 6763)
6// .... that a particular service instance can be
7//    described using a DNS SRV [RFC2782] and DNS TXT [RFC1035] record.
8//    The SRV record has a name of the form "<Instance>.<Service>.<Domain>"
9//    and gives the target host and port where the service instance can be
10//    reached.  The DNS TXT record of the same name gives additional
11//    information about this instance, in a structured form using key/value
12//    pairs, described in Section 6.  A client discovers the list of
13//    available instances of a given service type using a query for a DNS
14//    PTR [RFC1035] record with a name of the form "<Service>.<Domain>",
15//    which returns a set of zero or more names, which are the names of the
16//    aforementioned DNS SRV/TXT record pairs.
17//
18// Some naming conventions in this source code:
19//
20// `ty_domain` refers to service type together with domain name, i.e. <service>.<domain>.
21// Every <service> consists of two labels: service itself and "_udp." or "_tcp".
22// See RFC 6763 section 7 Service Names.
23//     for example: `_my-service._udp.local.`
24//
25// `fullname` refers to a full Service Instance Name, i.e. <instance>.<service>.<domain>
26//     for example: `my_home._my-service._udp.local.`
27//
28// In mDNS and DNS, the basic data structure is "Resource Record" (RR), where
29// in Service Discovery, the basic data structure is "Service Info". One Service Info
30// corresponds to a set of DNS Resource Records.
31#[cfg(feature = "logging")]
32use crate::log::{debug, error, trace};
33use crate::{
34    dns_cache::{current_time_millis, DnsCache, IpType},
35    dns_parser::{
36        ip_address_rr_type, DnsAddress, DnsEntryExt, DnsIncoming, DnsOutgoing, DnsPointer,
37        DnsRecordBox, DnsRecordExt, DnsSrv, DnsTxt, InterfaceId, RRType, ScopedIp,
38        CLASS_CACHE_FLUSH, CLASS_IN, FLAGS_AA, FLAGS_QR_QUERY, FLAGS_QR_RESPONSE, MAX_MSG_ABSOLUTE,
39    },
40    error::{e_fmt, Error, Result},
41    service_info::{DnsRegistry, MyIntf, Probe, ServiceInfo, ServiceStatus},
42    Receiver, ResolvedService, TxtProperties,
43};
44use flume::{bounded, Sender, TrySendError};
45use if_addrs::{IfAddr, Interface};
46use mio::{event::Source, net::UdpSocket as MioUdpSocket, Interest, Poll, Registry, Token};
47use socket2::Domain;
48use socket_pktinfo::PktInfoUdpSocket;
49use std::{
50    cmp::{self, Reverse},
51    collections::{hash_map::Entry, BinaryHeap, HashMap, HashSet},
52    fmt, io,
53    net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6, UdpSocket},
54    str, thread,
55    time::Duration,
56    vec,
57};
58
/// Default upper bound on the length of a service name, counted without the
/// domain and without the leading underscore (`_`). The value 15 comes from
/// [RFC 6763 section 7.2](https://www.rfc-editor.org/rfc/rfc6763#section-7.2).
pub const SERVICE_NAME_LEN_MAX_DEFAULT: u8 = 15;

/// Default interval, in seconds, between automatic checks for IP changes.
pub const IP_CHECK_INTERVAL_IN_SECS_DEFAULT: u32 = 5;

/// Default timeout for [ServiceDaemon::verify]: 10 seconds, per
/// [RFC 6762 section 10.4](https://datatracker.ietf.org/doc/html/rfc6762#section-10.4)
pub const VERIFY_TIMEOUT_DEFAULT: Duration = Duration::from_secs(10);

/// The mDNS port number per RFC 6762.
pub const MDNS_PORT: u16 = 5353;

// Multicast groups joined by the mDNS sockets, plus the IPv4 loopback address
// that the daemon's signal socket binds to.
const GROUP_ADDR_V4: Ipv4Addr = Ipv4Addr::new(224, 0, 0, 251);
const GROUP_ADDR_V6: Ipv6Addr = Ipv6Addr::new(0xff02, 0, 0, 0, 0, 0, 0, 0xfb);
const LOOPBACK_V4: Ipv4Addr = Ipv4Addr::LOCALHOST;

const RESOLVE_WAIT_IN_MILLIS: u64 = 500;
79
/// Response status code for the service `unregister` call.
///
/// The enum is comparable and cloneable (like [`DaemonStatus`]) so that callers
/// can check the received status with `==` / `assert_eq!` instead of having to
/// pattern-match every time.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum UnregisterStatus {
    /// Unregister was successful.
    OK,
    /// The service was not found in the registration.
    NotFound,
}
88
/// Life-cycle state reported by the service daemon.
///
/// Marked `#[non_exhaustive]`: more states may be added later, so matches
/// outside this crate need a wildcard arm.
#[derive(Debug, Clone, PartialEq, Eq)]
#[non_exhaustive]
pub enum DaemonStatus {
    /// The daemon is up and operating normally.
    Running,

    /// The daemon has been shut down.
    Shutdown,
}
99
/// Names of the counters reported in the metrics.
/// Currently all counters are for outgoing packets.
#[derive(Hash, Eq, PartialEq)]
enum Counter {
    Register,
    RegisterResend,
    Unregister,
    UnregisterResend,
    Browse,
    ResolveHostname,
    Respond,
    CacheRefreshPTR,
    CacheRefreshSrvTxt,
    CacheRefreshAddr,
    KnownAnswerSuppression,
    CachedPTR,
    CachedSRV,
    CachedAddr,
    CachedTxt,
    CachedNSec,
    CachedSubtype,
    DnsRegistryProbe,
    DnsRegistryActive,
    DnsRegistryTimer,
    DnsRegistryNameChange,
    Timer,
}

/// Renders each counter as its kebab-case label.
impl fmt::Display for Counter {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        // Pick the static label first, then emit it with a single write.
        let label = match self {
            Self::Register => "register",
            Self::RegisterResend => "register-resend",
            Self::Unregister => "unregister",
            Self::UnregisterResend => "unregister-resend",
            Self::Browse => "browse",
            Self::ResolveHostname => "resolve-hostname",
            Self::Respond => "respond",
            Self::CacheRefreshPTR => "cache-refresh-ptr",
            Self::CacheRefreshSrvTxt => "cache-refresh-srv-txt",
            Self::CacheRefreshAddr => "cache-refresh-addr",
            Self::KnownAnswerSuppression => "known-answer-suppression",
            Self::CachedPTR => "cached-ptr",
            Self::CachedSRV => "cached-srv",
            Self::CachedAddr => "cached-addr",
            Self::CachedTxt => "cached-txt",
            Self::CachedNSec => "cached-nsec",
            Self::CachedSubtype => "cached-subtype",
            Self::DnsRegistryProbe => "dns-registry-probe",
            Self::DnsRegistryActive => "dns-registry-active",
            Self::DnsRegistryTimer => "dns-registry-timer",
            Self::DnsRegistryNameChange => "dns-registry-name-change",
            Self::Timer => "timer",
        };
        f.write_str(label)
    }
}
156
157#[derive(Debug)]
158enum InternalError {
159    IntfAddrInvalid(Interface),
160}
161
162impl fmt::Display for InternalError {
163    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
164        match self {
165            InternalError::IntfAddrInvalid(iface) => write!(f, "interface addr invalid: {iface:?}"),
166        }
167    }
168}
169
170type MyResult<T> = core::result::Result<T, InternalError>;
171
172/// A wrapper around UDP socket used by the mDNS daemon.
173///
174/// We do this because `mio` does not support PKTINFO and
175/// does not provide a way to implement `Source` trait directly and safely.
176struct MyUdpSocket {
177    /// The underlying socket that supports control messages like
178    /// `IP_PKTINFO` for IPv4 and `IPV6_PKTINFO` for IPv6.
179    pktinfo: PktInfoUdpSocket,
180
181    /// The mio UDP socket that is a clone of `pktinfo` and
182    /// is used for event polling.
183    mio: MioUdpSocket,
184}
185
186impl MyUdpSocket {
187    pub fn new(pktinfo: PktInfoUdpSocket) -> io::Result<Self> {
188        let std_sock = pktinfo.try_clone_std()?;
189        let mio = MioUdpSocket::from_std(std_sock);
190
191        Ok(Self { pktinfo, mio })
192    }
193}
194
195/// Implements the mio `Source` trait so that we can use `MyUdpSocket` with `Poll`.
196impl Source for MyUdpSocket {
197    fn register(
198        &mut self,
199        registry: &Registry,
200        token: Token,
201        interests: Interest,
202    ) -> io::Result<()> {
203        self.mio.register(registry, token, interests)
204    }
205
206    fn reregister(
207        &mut self,
208        registry: &Registry,
209        token: Token,
210        interests: Interest,
211    ) -> io::Result<()> {
212        self.mio.reregister(registry, token, interests)
213    }
214
215    fn deregister(&mut self, registry: &Registry) -> std::io::Result<()> {
216        self.mio.deregister(registry)
217    }
218}
219
/// Metrics are exposed as a map from a counter name to its `i64` value.
/// The main purpose is to help monitoring the mDNS packet traffic.
pub type Metrics = HashMap<String, i64>;

// Event keys for the daemon's sockets.
const IPV4_SOCK_EVENT_KEY: usize = 4; // Arbitrary value that merely hints at "IPv4".
const IPV6_SOCK_EVENT_KEY: usize = 6; // Arbitrary value that merely hints at "IPv6".
const SIGNAL_SOCK_EVENT_KEY: usize = usize::MAX - 1; // avoid to overlap with zc.poll_ids
227
228/// A daemon thread for mDNS
229///
230/// This struct provides a handle and an API to the daemon. It is cloneable.
231#[derive(Clone)]
232pub struct ServiceDaemon {
233    /// Sender handle of the channel to the daemon.
234    sender: Sender<Command>,
235
236    /// Send to this addr to signal that a `Command` is coming.
237    ///
238    /// The daemon listens on this addr together with other mDNS sockets,
239    /// to avoid busy polling the flume channel. If there is a way to poll
240    /// the channel and mDNS sockets together, then this can be removed.
241    signal_addr: SocketAddr,
242}
243
244impl ServiceDaemon {
245    /// Creates a new daemon and spawns a thread to run the daemon.
246    ///
247    /// Creates a new mDNS service daemon using the default port (5353).
248    ///
249    /// For development/testing with custom ports, use [`ServiceDaemon::new_with_port`].
250    pub fn new() -> Result<Self> {
251        Self::new_with_port(MDNS_PORT)
252    }
253
254    /// Creates a new mDNS service daemon using a custom port.
255    ///
256    /// # Arguments
257    ///
258    /// * `port` - The UDP port to bind for mDNS communication.
259    ///   - In production, this should be `MDNS_PORT` (5353) per RFC 6762.
260    ///   - For development/testing, you can use a non-standard port (e.g., 5454)
261    ///     to avoid conflicts with system mDNS services.
262    ///   - Both publisher and browser must use the same port to communicate.
263    ///
264    /// # Example
265    ///
266    /// ```no_run
267    /// use mdns_sd::ServiceDaemon;
268    ///
269    /// // Use standard mDNS port (production)
270    /// let daemon = ServiceDaemon::new_with_port(5353)?;
271    ///
272    /// // Use custom port for development (avoids macOS Bonjour conflict)
273    /// let daemon_dev = ServiceDaemon::new_with_port(5454)?;
274    /// # Ok::<(), mdns_sd::Error>(())
275    /// ```
276    pub fn new_with_port(port: u16) -> Result<Self> {
277        // Use port 0 to allow the system assign a random available port,
278        // no need for a pre-defined port number.
279        let signal_addr = SocketAddrV4::new(LOOPBACK_V4, 0);
280
281        let signal_sock = UdpSocket::bind(signal_addr)
282            .map_err(|e| e_fmt!("failed to create signal_sock for daemon: {}", e))?;
283
284        // Get the socket with the OS chosen port
285        let signal_addr = signal_sock
286            .local_addr()
287            .map_err(|e| e_fmt!("failed to get signal sock addr: {}", e))?;
288
289        // Must be nonblocking so we can listen to it together with mDNS sockets.
290        signal_sock
291            .set_nonblocking(true)
292            .map_err(|e| e_fmt!("failed to set nonblocking for signal socket: {}", e))?;
293
294        let poller = Poll::new().map_err(|e| e_fmt!("failed to create mio Poll: {e}"))?;
295
296        let (sender, receiver) = bounded(100);
297
298        // Spawn the daemon thread
299        let mio_sock = MioUdpSocket::from_std(signal_sock);
300        let cmd_sender = sender.clone();
301        thread::Builder::new()
302            .name("mDNS_daemon".to_string())
303            .spawn(move || {
304                Self::daemon_thread(mio_sock, poller, receiver, port, cmd_sender, signal_addr)
305            })
306            .map_err(|e| e_fmt!("thread builder failed to spawn: {}", e))?;
307
308        Ok(Self {
309            sender,
310            signal_addr,
311        })
312    }
313
314    /// Sends `cmd` to the daemon via its channel, and sends a signal
315    /// to its sock addr to notify.
316    fn send_cmd(&self, cmd: Command) -> Result<()> {
317        let cmd_name = cmd.to_string();
318
319        // First, send to the flume channel.
320        self.sender.try_send(cmd).map_err(|e| match e {
321            TrySendError::Full(_) => Error::Again,
322            e => e_fmt!("flume::channel::send failed: {}", e),
323        })?;
324
325        // Second, send a signal to notify the daemon.
326        let addr = SocketAddrV4::new(LOOPBACK_V4, 0);
327        let socket = UdpSocket::bind(addr)
328            .map_err(|e| e_fmt!("Failed to create socket to send signal: {}", e))?;
329        socket
330            .send_to(cmd_name.as_bytes(), self.signal_addr)
331            .map_err(|e| {
332                e_fmt!(
333                    "signal socket send_to {} ({}) failed: {}",
334                    self.signal_addr,
335                    cmd_name,
336                    e
337                )
338            })?;
339
340        Ok(())
341    }
342
343    /// Starts browsing for a specific service type.
344    ///
345    /// `service_type` must end with a valid mDNS domain: '._tcp.local.' or '._udp.local.'
346    ///
347    /// Returns a channel `Receiver` to receive events about the service. The caller
348    /// can call `.recv_async().await` on this receiver to handle events in an
349    /// async environment or call `.recv()` in a sync environment.
350    ///
351    /// When a new instance is found, the daemon automatically tries to resolve, i.e.
352    /// finding more details, i.e. SRV records and TXT records.
353    pub fn browse(&self, service_type: &str) -> Result<Receiver<ServiceEvent>> {
354        check_domain_suffix(service_type)?;
355
356        let (resp_s, resp_r) = bounded(10);
357        self.send_cmd(Command::Browse(service_type.to_string(), 1, false, resp_s))?;
358        Ok(resp_r)
359    }
360
361    /// Preforms a "cache-only" browse.
362    ///
363    /// `service_type` must end with a valid mDNS domain: '._tcp.local.' or '._udp.local.'
364    ///
365    /// The functionality is identical to 'browse', but the service events are based solely on the contents
366    /// of the daemon's cache. No actual mDNS query is sent to the network.
367    ///
368    /// See [accept_unsolicited](Self::accept_unsolicited) if you want to do cache-only browsing.
369    pub fn browse_cache(&self, service_type: &str) -> Result<Receiver<ServiceEvent>> {
370        check_domain_suffix(service_type)?;
371
372        let (resp_s, resp_r) = bounded(10);
373        self.send_cmd(Command::Browse(service_type.to_string(), 1, true, resp_s))?;
374        Ok(resp_r)
375    }
376
377    /// Stops searching for a specific service type.
378    ///
379    /// When an error is returned, the caller should retry only when
380    /// the error is `Error::Again`, otherwise should log and move on.
381    pub fn stop_browse(&self, ty_domain: &str) -> Result<()> {
382        self.send_cmd(Command::StopBrowse(ty_domain.to_string()))
383    }
384
385    /// Starts querying for the ip addresses of a hostname.
386    ///
387    /// Returns a channel `Receiver` to receive events about the hostname.
388    /// The caller can call `.recv_async().await` on this receiver to handle events in an
389    /// async environment or call `.recv()` in a sync environment.
390    ///
391    /// The `timeout` is specified in milliseconds.
392    pub fn resolve_hostname(
393        &self,
394        hostname: &str,
395        timeout: Option<u64>,
396    ) -> Result<Receiver<HostnameResolutionEvent>> {
397        check_hostname(hostname)?;
398        let (resp_s, resp_r) = bounded(10);
399        self.send_cmd(Command::ResolveHostname(
400            hostname.to_string(),
401            1,
402            resp_s,
403            timeout,
404        ))?;
405        Ok(resp_r)
406    }
407
408    /// Stops querying for the ip addresses of a hostname.
409    ///
410    /// When an error is returned, the caller should retry only when
411    /// the error is `Error::Again`, otherwise should log and move on.
412    pub fn stop_resolve_hostname(&self, hostname: &str) -> Result<()> {
413        self.send_cmd(Command::StopResolveHostname(hostname.to_string()))
414    }
415
416    /// Registers a service provided by this host.
417    ///
418    /// If `service_info` has no addresses yet and its `addr_auto` is enabled,
419    /// this method will automatically fill in addresses from the host.
420    ///
421    /// To re-announce a service with an updated `service_info`, just call
422    /// this `register` function again. No need to call `unregister` first.
423    pub fn register(&self, service_info: ServiceInfo) -> Result<()> {
424        check_service_name(service_info.get_fullname())?;
425        check_hostname(service_info.get_hostname())?;
426
427        self.send_cmd(Command::Register(service_info.into()))
428    }
429
430    /// Unregisters a service. This is a graceful shutdown of a service.
431    ///
432    /// Returns a channel receiver that is used to receive the status code
433    /// of the unregister.
434    ///
435    /// When an error is returned, the caller should retry only when
436    /// the error is `Error::Again`, otherwise should log and move on.
437    pub fn unregister(&self, fullname: &str) -> Result<Receiver<UnregisterStatus>> {
438        let (resp_s, resp_r) = bounded(1);
439        self.send_cmd(Command::Unregister(fullname.to_lowercase(), resp_s))?;
440        Ok(resp_r)
441    }
442
443    /// Starts to monitor events from the daemon.
444    ///
445    /// Returns a channel [`Receiver`] of [`DaemonEvent`].
446    pub fn monitor(&self) -> Result<Receiver<DaemonEvent>> {
447        let (resp_s, resp_r) = bounded(100);
448        self.send_cmd(Command::Monitor(resp_s))?;
449        Ok(resp_r)
450    }
451
452    /// Shuts down the daemon thread and returns a channel to receive the status.
453    ///
454    /// When an error is returned, the caller should retry only when
455    /// the error is `Error::Again`, otherwise should log and move on.
456    pub fn shutdown(&self) -> Result<Receiver<DaemonStatus>> {
457        let (resp_s, resp_r) = bounded(1);
458        self.send_cmd(Command::Exit(resp_s))?;
459        Ok(resp_r)
460    }
461
462    /// Returns the status of the daemon.
463    ///
464    /// When an error is returned, the caller should retry only when
465    /// the error is `Error::Again`, otherwise should consider the daemon
466    /// stopped working and move on.
467    pub fn status(&self) -> Result<Receiver<DaemonStatus>> {
468        let (resp_s, resp_r) = bounded(1);
469
470        if self.sender.is_disconnected() {
471            resp_s
472                .send(DaemonStatus::Shutdown)
473                .map_err(|e| e_fmt!("failed to send daemon status to the client: {}", e))?;
474        } else {
475            self.send_cmd(Command::GetStatus(resp_s))?;
476        }
477
478        Ok(resp_r)
479    }
480
481    /// Returns a channel receiver for the metrics, e.g. input/output counters.
482    ///
483    /// The metrics returned is a snapshot. Hence the caller should call
484    /// this method repeatedly if they want to monitor the metrics continuously.
485    pub fn get_metrics(&self) -> Result<Receiver<Metrics>> {
486        let (resp_s, resp_r) = bounded(1);
487        self.send_cmd(Command::GetMetrics(resp_s))?;
488        Ok(resp_r)
489    }
490
491    /// Change the max length allowed for a service name.
492    ///
493    /// As RFC 6763 defines a length max for a service name, a user should not call
494    /// this method unless they have to. See [`SERVICE_NAME_LEN_MAX_DEFAULT`].
495    ///
496    /// `len_max` is capped at an internal limit, which is currently 30.
497    pub fn set_service_name_len_max(&self, len_max: u8) -> Result<()> {
498        const SERVICE_NAME_LEN_MAX_LIMIT: u8 = 30; // Double the default length max.
499
500        if len_max > SERVICE_NAME_LEN_MAX_LIMIT {
501            return Err(Error::Msg(format!(
502                "service name length max {len_max} is too large"
503            )));
504        }
505
506        self.send_cmd(Command::SetOption(DaemonOption::ServiceNameLenMax(len_max)))
507    }
508
509    /// Change the interval for checking IP changes automatically.
510    ///
511    /// Setting the interval to 0 disables the IP check.
512    ///
513    /// See [`IP_CHECK_INTERVAL_IN_SECS_DEFAULT`] for the default interval.
514    pub fn set_ip_check_interval(&self, interval_in_secs: u32) -> Result<()> {
515        let interval_in_millis = interval_in_secs as u64 * 1000;
516        self.send_cmd(Command::SetOption(DaemonOption::IpCheckInterval(
517            interval_in_millis,
518        )))
519    }
520
521    /// Get the current interval in seconds for checking IP changes automatically.
522    pub fn get_ip_check_interval(&self) -> Result<u32> {
523        let (resp_s, resp_r) = bounded(1);
524        self.send_cmd(Command::GetOption(resp_s))?;
525
526        let option = resp_r
527            .recv_timeout(Duration::from_secs(10))
528            .map_err(|e| e_fmt!("failed to receive ip check interval: {}", e))?;
529        let ip_check_interval_in_secs = option.ip_check_interval / 1000;
530        Ok(ip_check_interval_in_secs as u32)
531    }
532
533    /// Include interfaces that match `if_kind` for this service daemon.
534    ///
535    /// For example:
536    /// ```ignore
537    ///     daemon.enable_interface("en0")?;
538    /// ```
539    pub fn enable_interface(&self, if_kind: impl IntoIfKindVec) -> Result<()> {
540        let if_kind_vec = if_kind.into_vec();
541        self.send_cmd(Command::SetOption(DaemonOption::EnableInterface(
542            if_kind_vec.kinds,
543        )))
544    }
545
546    /// Ignore/exclude interfaces that match `if_kind` for this daemon.
547    ///
548    /// For example:
549    /// ```ignore
550    ///     daemon.disable_interface(IfKind::IPv6)?;
551    /// ```
552    pub fn disable_interface(&self, if_kind: impl IntoIfKindVec) -> Result<()> {
553        let if_kind_vec = if_kind.into_vec();
554        self.send_cmd(Command::SetOption(DaemonOption::DisableInterface(
555            if_kind_vec.kinds,
556        )))
557    }
558
559    /// If `accept` is true, accept and cache all responses, even if there is no active querier
560    /// for a given service type. This is useful / necessary when doing cache-only browsing. See
561    /// [browse_cache](Self::browse_cache).
562    ///
563    /// If `accept` is false (default), accept only responses matching queries that we have initiated.
564    ///
565    /// For example:
566    /// ```ignore
567    ///     daemon.accept_unsolicited(true)?;
568    /// ```
569    pub fn accept_unsolicited(&self, accept: bool) -> Result<()> {
570        self.send_cmd(Command::SetOption(DaemonOption::AcceptUnsolicited(accept)))
571    }
572
573    /// Include or exclude Apple P2P interfaces, e.g. "awdl0", "llw0".
574    /// By default, they are excluded.
575    pub fn include_apple_p2p(&self, include: bool) -> Result<()> {
576        self.send_cmd(Command::SetOption(DaemonOption::IncludeAppleP2P(include)))
577    }
578
579    #[cfg(test)]
580    pub fn test_down_interface(&self, ifname: &str) -> Result<()> {
581        self.send_cmd(Command::SetOption(DaemonOption::TestDownInterface(
582            ifname.to_string(),
583        )))
584    }
585
586    #[cfg(test)]
587    pub fn test_up_interface(&self, ifname: &str) -> Result<()> {
588        self.send_cmd(Command::SetOption(DaemonOption::TestUpInterface(
589            ifname.to_string(),
590        )))
591    }
592
593    /// Enable or disable the loopback for locally sent multicast packets in IPv4.
594    ///
595    /// By default, multicast loop is enabled for IPv4. When disabled, a querier will not
596    /// receive announcements from a responder on the same host.
597    ///
598    /// Reference: <https://learn.microsoft.com/en-us/windows/win32/winsock/ip-multicast-2>
599    ///
600    /// "The Winsock version of the IP_MULTICAST_LOOP option is semantically different than
601    /// the UNIX version of the IP_MULTICAST_LOOP option:
602    ///
603    /// In Winsock, the IP_MULTICAST_LOOP option applies only to the receive path.
604    /// In the UNIX version, the IP_MULTICAST_LOOP option applies to the send path."
605    ///
606    /// Which means, in order NOT to receive localhost announcements, you want to call
607    /// this API on the querier side on Windows, but on the responder side on Unix.
608    pub fn set_multicast_loop_v4(&self, on: bool) -> Result<()> {
609        self.send_cmd(Command::SetOption(DaemonOption::MulticastLoopV4(on)))
610    }
611
612    /// Enable or disable the loopback for locally sent multicast packets in IPv6.
613    ///
614    /// By default, multicast loop is enabled for IPv6. When disabled, a querier will not
615    /// receive announcements from a responder on the same host.
616    ///
617    /// Reference: <https://learn.microsoft.com/en-us/windows/win32/winsock/ip-multicast-2>
618    ///
619    /// "The Winsock version of the IP_MULTICAST_LOOP option is semantically different than
620    /// the UNIX version of the IP_MULTICAST_LOOP option:
621    ///
622    /// In Winsock, the IP_MULTICAST_LOOP option applies only to the receive path.
623    /// In the UNIX version, the IP_MULTICAST_LOOP option applies to the send path."
624    ///
625    /// Which means, in order NOT to receive localhost announcements, you want to call
626    /// this API on the querier side on Windows, but on the responder side on Unix.
627    pub fn set_multicast_loop_v6(&self, on: bool) -> Result<()> {
628        self.send_cmd(Command::SetOption(DaemonOption::MulticastLoopV6(on)))
629    }
630
631    /// Proactively confirms whether a service instance still valid.
632    ///
633    /// This call will issue queries for a service instance's SRV record and Address records.
634    ///
635    /// For `timeout`, most users should use [VERIFY_TIMEOUT_DEFAULT]
636    /// unless there is a reason not to follow RFC.
637    ///
638    /// If no response is received within `timeout`, the current resource
639    /// records will be flushed, and if needed, `ServiceRemoved` event will be
640    /// sent to active queriers.
641    ///
642    /// Reference: [RFC 6762](https://datatracker.ietf.org/doc/html/rfc6762#section-10.4)
643    pub fn verify(&self, instance_fullname: String, timeout: Duration) -> Result<()> {
644        self.send_cmd(Command::Verify(instance_fullname, timeout))
645    }
646
647    fn daemon_thread(
648        signal_sock: MioUdpSocket,
649        poller: Poll,
650        receiver: Receiver<Command>,
651        port: u16,
652        cmd_sender: Sender<Command>,
653        signal_addr: SocketAddr,
654    ) {
655        let mut zc = Zeroconf::new(signal_sock, poller, port, cmd_sender, signal_addr);
656
657        if let Some(cmd) = zc.run(receiver) {
658            match cmd {
659                Command::Exit(resp_s) => {
660                    // It is guaranteed that the receiver already dropped,
661                    // i.e. the daemon command channel closed.
662                    if let Err(e) = resp_s.send(DaemonStatus::Shutdown) {
663                        debug!("exit: failed to send response of shutdown: {}", e);
664                    }
665                }
666                _ => {
667                    debug!("Unexpected command: {:?}", cmd);
668                }
669            }
670        }
671    }
672}
673
674/// Creates a new UDP socket that uses `intf` to send and recv multicast.
675fn _new_socket_bind(intf: &Interface, should_loop: bool) -> Result<MyUdpSocket> {
676    // Use the same socket for receiving and sending multicast packets.
677    // Such socket has to bind to INADDR_ANY or IN6ADDR_ANY.
678    let intf_ip = &intf.ip();
679    match intf_ip {
680        IpAddr::V4(ip) => {
681            let addr = SocketAddrV4::new(Ipv4Addr::new(0, 0, 0, 0), MDNS_PORT);
682            let sock = new_socket(addr.into(), true)?;
683
684            // Join mDNS group to receive packets.
685            sock.join_multicast_v4(&GROUP_ADDR_V4, ip)
686                .map_err(|e| e_fmt!("join multicast group on addr {}: {}", intf_ip, e))?;
687
688            // Set IP_MULTICAST_IF to send packets.
689            sock.set_multicast_if_v4(ip)
690                .map_err(|e| e_fmt!("set multicast_if on addr {}: {}", ip, e))?;
691
692            // Per RFC 6762 section 11:
693            // "All Multicast DNS responses (including responses sent via unicast) SHOULD
694            // be sent with IP TTL set to 255."
695            // Here we set the TTL to 255 for multicast as we don't support unicast yet.
696            sock.set_multicast_ttl_v4(255)
697                .map_err(|e| e_fmt!("set set_multicast_ttl_v4 on addr {}: {}", ip, e))?;
698
699            if !should_loop {
700                sock.set_multicast_loop_v4(false)
701                    .map_err(|e| e_fmt!("failed to set multicast loop v4 for {ip}: {e}"))?;
702            }
703
704            // Test if we can send packets successfully.
705            let multicast_addr = SocketAddrV4::new(GROUP_ADDR_V4, MDNS_PORT).into();
706            let test_packets = DnsOutgoing::new(0).to_data_on_wire();
707            for packet in test_packets {
708                sock.send_to(&packet, &multicast_addr)
709                    .map_err(|e| e_fmt!("send multicast packet on addr {}: {}", ip, e))?;
710            }
711            MyUdpSocket::new(sock)
712                .map_err(|e| e_fmt!("failed to create MySocket for interface {}: {e}", intf.name))
713        }
714        IpAddr::V6(ip) => {
715            let addr = SocketAddrV6::new(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 0), MDNS_PORT, 0, 0);
716            let sock = new_socket(addr.into(), true)?;
717
718            let if_index = intf.index.unwrap_or(0);
719
720            // Join mDNS group to receive packets.
721            sock.join_multicast_v6(&GROUP_ADDR_V6, if_index)
722                .map_err(|e| e_fmt!("join multicast group on addr {}: {}", ip, e))?;
723
724            // Set IPV6_MULTICAST_IF to send packets.
725            sock.set_multicast_if_v6(if_index)
726                .map_err(|e| e_fmt!("set multicast_if on addr {}: {}", ip, e))?;
727
728            // We are not sending multicast packets to test this socket as there might
729            // be many IPv6 interfaces on a host and could cause such send error:
730            // "No buffer space available (os error 55)".
731
732            MyUdpSocket::new(sock)
733                .map_err(|e| e_fmt!("failed to create MySocket for interface {}: {e}", intf.name))
734        }
735    }
736}
737
738/// Creates a new UDP socket to bind to `port` with REUSEPORT option.
739/// `non_block` indicates whether to set O_NONBLOCK for the socket.
740fn new_socket(addr: SocketAddr, non_block: bool) -> Result<PktInfoUdpSocket> {
741    let domain = match addr {
742        SocketAddr::V4(_) => socket2::Domain::IPV4,
743        SocketAddr::V6(_) => socket2::Domain::IPV6,
744    };
745
746    let fd = PktInfoUdpSocket::new(domain).map_err(|e| e_fmt!("create socket failed: {}", e))?;
747
748    fd.set_reuse_address(true)
749        .map_err(|e| e_fmt!("set ReuseAddr failed: {}", e))?;
750    #[cfg(unix)]
751    if let Err(e) = fd.set_reuse_port(true) {
752        debug!(
753            "SO_REUSEPORT is not supported, continuing without it: {}",
754            e
755        );
756    }
757
758    if non_block {
759        fd.set_nonblocking(true)
760            .map_err(|e| e_fmt!("set O_NONBLOCK: {}", e))?;
761    }
762
763    fd.bind(&addr.into())
764        .map_err(|e| e_fmt!("socket bind to {} failed: {}", &addr, e))?;
765
766    trace!("new socket bind to {}", &addr);
767    Ok(fd)
768}
769
770/// Specify a UNIX timestamp in millis to run `command` for the next time.
771struct ReRun {
772    /// UNIX timestamp in millis.
773    next_time: u64,
774    command: Command,
775}
776
/// Describes a set of network interfaces.
///
/// It is used to tell the daemon which interfaces to enable or disable.
///
/// For ergonomic reasons, conversions from `&str`, `&String` (both producing
/// [`IfKind::Name`]) and from `IpAddr` (producing [`IfKind::Addr`]) are implemented.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub enum IfKind {
    /// Every interface.
    All,

    /// Every IPv4 interface.
    IPv4,

    /// Every IPv6 interface.
    IPv6,

    /// The interface with the given name, for example "en0".
    Name(String),

    /// An interface looked up by one of its IPv4 or IPv6 addresses.
    ///
    /// The address is only used to find the interface: the semantics is to
    /// identify an interface of IPv4 or IPv6, not a specific address on it.
    Addr(IpAddr),

    /// 127.0.0.1 (or anything in 127.0.0.0/8), enabled by default.
    ///
    /// Some use cases (e.g., OSCQuery) need loopback interfaces for publishing.
    LoopbackV4,

    /// ::1/128, enabled by default.
    LoopbackV6,

    /// The interface with the given index, IPv4 only.
    IndexV4(u32),

    /// The interface with the given index, IPv6 only.
    IndexV6(u32),
}
814
815impl IfKind {
816    /// Checks if `intf` matches with this interface kind.
817    fn matches(&self, intf: &Interface) -> bool {
818        match self {
819            Self::All => true,
820            Self::IPv4 => intf.ip().is_ipv4(),
821            Self::IPv6 => intf.ip().is_ipv6(),
822            Self::Name(ifname) => ifname == &intf.name,
823            Self::Addr(addr) => addr == &intf.ip(),
824            Self::LoopbackV4 => intf.is_loopback() && intf.ip().is_ipv4(),
825            Self::LoopbackV6 => intf.is_loopback() && intf.ip().is_ipv6(),
826            Self::IndexV4(idx) => intf.index == Some(*idx) && intf.ip().is_ipv4(),
827            Self::IndexV6(idx) => intf.index == Some(*idx) && intf.ip().is_ipv6(),
828        }
829    }
830}
831
832/// The first use case of specifying an interface was to
833/// use an interface name. Hence adding this for ergonomic reasons.
834impl From<&str> for IfKind {
835    fn from(val: &str) -> Self {
836        Self::Name(val.to_string())
837    }
838}
839
840impl From<&String> for IfKind {
841    fn from(val: &String) -> Self {
842        Self::Name(val.to_string())
843    }
844}
845
846/// Still for ergonomic reasons.
847impl From<IpAddr> for IfKind {
848    fn from(val: IpAddr) -> Self {
849        Self::Addr(val)
850    }
851}
852
853/// A list of `IfKind` that can be used to match interfaces.
854pub struct IfKindVec {
855    kinds: Vec<IfKind>,
856}
857
858/// A trait that converts a type into a Vec of `IfKind`.
859pub trait IntoIfKindVec {
860    fn into_vec(self) -> IfKindVec;
861}
862
863impl<T: Into<IfKind>> IntoIfKindVec for T {
864    fn into_vec(self) -> IfKindVec {
865        let if_kind: IfKind = self.into();
866        IfKindVec {
867            kinds: vec![if_kind],
868        }
869    }
870}
871
872impl<T: Into<IfKind>> IntoIfKindVec for Vec<T> {
873    fn into_vec(self) -> IfKindVec {
874        let kinds: Vec<IfKind> = self.into_iter().map(|x| x.into()).collect();
875        IfKindVec { kinds }
876    }
877}
878
879/// Selection of interfaces.
880struct IfSelection {
881    /// The interfaces to be selected.
882    if_kind: IfKind,
883
884    /// Whether the `if_kind` should be enabled or not.
885    selected: bool,
886}
887
888/// A struct holding the state. It was inspired by `zeroconf` package in Python.
889struct Zeroconf {
890    /// The mDNS port number to use for socket binding.
891    /// Typically MDNS_PORT (5353), but can be customized for development/testing.
892    port: u16,
893
894    /// Local interfaces keyed by interface index.
895    my_intfs: HashMap<u32, MyIntf>,
896
897    /// A common socket for IPv4 interfaces. It's None if IPv4 is disabled in OS kernel.
898    ipv4_sock: Option<MyUdpSocket>,
899
900    /// A common socket for IPv6 interfaces. It's None if IPv6 is disabled in OS kernel.
901    ipv6_sock: Option<MyUdpSocket>,
902
903    /// Local registered services, keyed by service full names.
904    my_services: HashMap<String, ServiceInfo>,
905
906    /// Received DNS records.
907    cache: DnsCache,
908
909    /// Registered service records, keyed by interface index.
910    dns_registry_map: HashMap<u32, DnsRegistry>,
911
912    /// Active "Browse" commands.
913    service_queriers: HashMap<String, Sender<ServiceEvent>>, // <ty_domain, channel::sender>
914
915    /// Active "ResolveHostname" commands.
916    ///
917    /// The timestamps are set at the future timestamp when the command should timeout.
918    /// `hostname` is case-insensitive and stored in lowercase.
919    hostname_resolvers: HashMap<String, (Sender<HostnameResolutionEvent>, Option<u64>)>, // <hostname, (channel::sender, UNIX timestamp in millis)>
920
921    /// All repeating transmissions.
922    retransmissions: Vec<ReRun>,
923
924    counters: Metrics,
925
926    /// Waits for incoming packets.
927    poller: Poll,
928
929    /// Channels to notify events.
930    monitors: Vec<Sender<DaemonEvent>>,
931
932    /// Options
933    service_name_len_max: u8,
934
935    /// Interval in millis to check IP address changes.
936    ip_check_interval: u64,
937
938    /// All interface selections called to the daemon.
939    if_selections: Vec<IfSelection>,
940
941    /// Socket for signaling.
942    signal_sock: MioUdpSocket,
943
944    /// Timestamps marking where we need another iteration of the run loop,
945    /// to react to events like retransmissions, cache refreshes, interface IP address changes, etc.
946    ///
947    /// When the run loop goes through a single iteration, it will
948    /// set its timeout to the earliest timer in this list.
949    timers: BinaryHeap<Reverse<u64>>,
950
951    status: DaemonStatus,
952
953    /// Service instances that are pending for resolving SRV and TXT.
954    pending_resolves: HashSet<String>,
955
956    /// Service instances that are already resolved.
957    resolved: HashSet<String>,
958
959    multicast_loop_v4: bool,
960
961    multicast_loop_v6: bool,
962
963    accept_unsolicited: bool,
964
965    include_apple_p2p: bool,
966
967    cmd_sender: Sender<Command>,
968
969    signal_addr: SocketAddr,
970
971    #[cfg(test)]
972    test_down_interfaces: HashSet<String>,
973}
974
975/// Join the multicast group for the given interface.
976fn join_multicast_group(my_sock: &PktInfoUdpSocket, intf: &Interface) -> Result<()> {
977    let intf_ip = &intf.ip();
978    match intf_ip {
979        IpAddr::V4(ip) => {
980            // Join mDNS group to receive packets.
981            debug!("join multicast group V4 on {} addr {ip}", intf.name);
982            my_sock
983                .join_multicast_v4(&GROUP_ADDR_V4, ip)
984                .map_err(|e| e_fmt!("PKT join multicast group on addr {}: {}", intf_ip, e))?;
985        }
986        IpAddr::V6(ip) => {
987            let if_index = intf.index.unwrap_or(0);
988            // Join mDNS group to receive packets.
989            debug!(
990                "join multicast group V6 on {} addr {ip} with index {if_index}",
991                intf.name
992            );
993            my_sock
994                .join_multicast_v6(&GROUP_ADDR_V6, if_index)
995                .map_err(|e| e_fmt!("PKT join multicast group on addr {}: {}", ip, e))?;
996        }
997    }
998    Ok(())
999}
1000
1001impl Zeroconf {
1002    fn new(
1003        signal_sock: MioUdpSocket,
1004        poller: Poll,
1005        port: u16,
1006        cmd_sender: Sender<Command>,
1007        signal_addr: SocketAddr,
1008    ) -> Self {
1009        // Get interfaces.
1010        let my_ifaddrs = my_ip_interfaces(true);
1011
1012        // Create a socket for every IP addr.
1013        // Note: it is possible that `my_ifaddrs` contains the same IP addr with different interface names,
1014        // or the same interface name with different IP addrs.
1015        let mut my_intfs = HashMap::new();
1016        let mut dns_registry_map = HashMap::new();
1017
1018        // Use the same socket for receiving and sending multicast packets.
1019        // Such socket has to bind to INADDR_ANY or IN6ADDR_ANY.
1020        let mut ipv4_sock = None;
1021        let addr = SocketAddrV4::new(Ipv4Addr::new(0, 0, 0, 0), port);
1022        match new_socket(addr.into(), true) {
1023            Ok(sock) => {
1024                // Per RFC 6762 section 11:
1025                // "All Multicast DNS responses (including responses sent via unicast) SHOULD
1026                // be sent with IP TTL set to 255."
1027                // Here we set the TTL to 255 for multicast as we don't support unicast yet.
1028                sock.set_multicast_ttl_v4(255)
1029                    .map_err(|e| e_fmt!("set set_multicast_ttl_v4 on addr: {}", e))
1030                    .ok();
1031
1032                // This clones a socket.
1033                ipv4_sock = match MyUdpSocket::new(sock) {
1034                    Ok(s) => Some(s),
1035                    Err(e) => {
1036                        debug!("failed to create IPv4 MyUdpSocket: {e}");
1037                        None
1038                    }
1039                };
1040            }
1041            // Per RFC 6762 section 11:}
1042            Err(e) => debug!("failed to create IPv4 socket: {e}"),
1043        }
1044
1045        let mut ipv6_sock = None;
1046        let addr = SocketAddrV6::new(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 0), port, 0, 0);
1047        match new_socket(addr.into(), true) {
1048            Ok(sock) => {
1049                // Per RFC 6762 section 11:
1050                // "All Multicast DNS responses (including responses sent via unicast) SHOULD
1051                // be sent with IP TTL set to 255."
1052                sock.set_multicast_hops_v6(255)
1053                    .map_err(|e| e_fmt!("set set_multicast_hops_v6: {}", e))
1054                    .ok();
1055
1056                // This clones the ipv6 socket.
1057                ipv6_sock = match MyUdpSocket::new(sock) {
1058                    Ok(s) => Some(s),
1059                    Err(e) => {
1060                        debug!("failed to create IPv6 MyUdpSocket: {e}");
1061                        None
1062                    }
1063                };
1064            }
1065            Err(e) => debug!("failed to create IPv6 socket: {e}"),
1066        }
1067
1068        // Configure sockets to join multicast groups.
1069        for intf in my_ifaddrs {
1070            let sock_opt = if intf.ip().is_ipv4() {
1071                &ipv4_sock
1072            } else {
1073                &ipv6_sock
1074            };
1075            let Some(sock) = sock_opt else {
1076                debug!(
1077                    "no socket available for interface {} with addr {}. Skipped.",
1078                    intf.name,
1079                    intf.ip()
1080                );
1081                continue;
1082            };
1083
1084            if let Err(e) = join_multicast_group(&sock.pktinfo, &intf) {
1085                debug!("failed to join multicast: {}: {e}. Skipped.", &intf.ip());
1086            }
1087
1088            let if_index = intf.index.unwrap_or(0);
1089
1090            // Add this interface address if not already present.
1091            dns_registry_map
1092                .entry(if_index)
1093                .or_insert_with(DnsRegistry::new);
1094
1095            my_intfs
1096                .entry(if_index)
1097                .and_modify(|v: &mut MyIntf| {
1098                    v.addrs.insert(intf.addr.clone());
1099                })
1100                .or_insert(MyIntf {
1101                    name: intf.name.clone(),
1102                    index: if_index,
1103                    addrs: HashSet::from([intf.addr]),
1104                });
1105        }
1106
1107        let monitors = Vec::new();
1108        let service_name_len_max = SERVICE_NAME_LEN_MAX_DEFAULT;
1109        let ip_check_interval = IP_CHECK_INTERVAL_IN_SECS_DEFAULT as u64 * 1000;
1110
1111        let timers = BinaryHeap::new();
1112
1113        // Enable everything, including loopback interfaces.
1114        let if_selections = vec![];
1115
1116        let status = DaemonStatus::Running;
1117
1118        Self {
1119            port,
1120            my_intfs,
1121            ipv4_sock,
1122            ipv6_sock,
1123            my_services: HashMap::new(),
1124            cache: DnsCache::new(),
1125            dns_registry_map,
1126            hostname_resolvers: HashMap::new(),
1127            service_queriers: HashMap::new(),
1128            retransmissions: Vec::new(),
1129            counters: HashMap::new(),
1130            poller,
1131            monitors,
1132            service_name_len_max,
1133            ip_check_interval,
1134            if_selections,
1135            signal_sock,
1136            timers,
1137            status,
1138            pending_resolves: HashSet::new(),
1139            resolved: HashSet::new(),
1140            multicast_loop_v4: true,
1141            multicast_loop_v6: true,
1142            accept_unsolicited: false,
1143            include_apple_p2p: false,
1144            cmd_sender,
1145            signal_addr,
1146
1147            #[cfg(test)]
1148            test_down_interfaces: HashSet::new(),
1149        }
1150    }
1151
1152    /// Send a Command into the daemon channel and poke the signal socket to wake the poll loop.
1153    fn send_cmd_to_self(&self, cmd: Command) -> Result<()> {
1154        let cmd_name = cmd.to_string();
1155
1156        self.cmd_sender.try_send(cmd).map_err(|e| match e {
1157            TrySendError::Full(_) => Error::Again,
1158            e => e_fmt!("flume::channel::send failed: {}", e),
1159        })?;
1160
1161        let addr = SocketAddrV4::new(LOOPBACK_V4, 0);
1162        let socket = UdpSocket::bind(addr)
1163            .map_err(|e| e_fmt!("Failed to create socket to send signal: {}", e))?;
1164        socket
1165            .send_to(cmd_name.as_bytes(), self.signal_addr)
1166            .map_err(|e| {
1167                e_fmt!(
1168                    "signal socket send_to {} ({}) failed: {}",
1169                    self.signal_addr,
1170                    cmd_name,
1171                    e
1172                )
1173            })?;
1174
1175        Ok(())
1176    }
1177
1178    /// Clean up all resources before shutdown.
1179    ///
1180    /// This method:
1181    /// 1. Unregisters all registered services (sends goodbye packets)
1182    /// 2. Stops all active browse operations
1183    /// 3. Stops all active hostname resolution operations
1184    /// 4. Clears all retransmissions
1185    fn cleanup(&mut self) {
1186        debug!("Starting cleanup for shutdown");
1187
1188        // 1. Unregister all services - send goodbye packets
1189        let service_names: Vec<String> = self.my_services.keys().cloned().collect();
1190        for fullname in service_names {
1191            if let Some(info) = self.my_services.get(&fullname) {
1192                debug!("Unregistering service during shutdown: {}", &fullname);
1193
1194                for intf in self.my_intfs.values() {
1195                    if let Some(sock) = self.ipv4_sock.as_ref() {
1196                        self.unregister_service(info, intf, &sock.pktinfo);
1197                    }
1198
1199                    if let Some(sock) = self.ipv6_sock.as_ref() {
1200                        self.unregister_service(info, intf, &sock.pktinfo);
1201                    }
1202                }
1203            }
1204        }
1205        self.my_services.clear();
1206
1207        // 2. Stop all browse operations
1208        let browse_types: Vec<String> = self.service_queriers.keys().cloned().collect();
1209        for ty_domain in browse_types {
1210            debug!("Stopping browse during shutdown: {}", &ty_domain);
1211            if let Some(sender) = self.service_queriers.remove(&ty_domain) {
1212                // Notify the client
1213                if let Err(e) = sender.send(ServiceEvent::SearchStopped(ty_domain.clone())) {
1214                    debug!("Failed to send SearchStopped during shutdown: {}", e);
1215                }
1216            }
1217        }
1218
1219        // 3. Stop all hostname resolution operations
1220        let hostnames: Vec<String> = self.hostname_resolvers.keys().cloned().collect();
1221        for hostname in hostnames {
1222            debug!(
1223                "Stopping hostname resolution during shutdown: {}",
1224                &hostname
1225            );
1226            if let Some((sender, _timeout)) = self.hostname_resolvers.remove(&hostname) {
1227                // Notify the client
1228                if let Err(e) =
1229                    sender.send(HostnameResolutionEvent::SearchStopped(hostname.clone()))
1230                {
1231                    debug!(
1232                        "Failed to send HostnameResolutionEvent::SearchStopped during shutdown: {}",
1233                        e
1234                    );
1235                }
1236            }
1237        }
1238
1239        // 4. Clear all retransmissions
1240        self.retransmissions.clear();
1241
1242        debug!("Cleanup completed");
1243    }
1244
1245    /// The main event loop of the daemon thread
1246    ///
1247    /// In each round, it will:
1248    /// 1. select the listening sockets with a timeout.
1249    /// 2. process the incoming packets if any.
1250    /// 3. try_recv on its channel and execute commands.
1251    /// 4. announce its registered services.
1252    /// 5. process retransmissions if any.
1253    fn run(&mut self, receiver: Receiver<Command>) -> Option<Command> {
1254        // Add the daemon's signal socket to the poller.
1255        if let Err(e) = self.poller.registry().register(
1256            &mut self.signal_sock,
1257            mio::Token(SIGNAL_SOCK_EVENT_KEY),
1258            mio::Interest::READABLE,
1259        ) {
1260            debug!("failed to add signal socket to the poller: {}", e);
1261            return None;
1262        }
1263
1264        if let Some(sock) = self.ipv4_sock.as_mut() {
1265            if let Err(e) = self.poller.registry().register(
1266                sock,
1267                mio::Token(IPV4_SOCK_EVENT_KEY),
1268                mio::Interest::READABLE,
1269            ) {
1270                debug!("failed to register ipv4 socket: {}", e);
1271                return None;
1272            }
1273        }
1274
1275        if let Some(sock) = self.ipv6_sock.as_mut() {
1276            if let Err(e) = self.poller.registry().register(
1277                sock,
1278                mio::Token(IPV6_SOCK_EVENT_KEY),
1279                mio::Interest::READABLE,
1280            ) {
1281                debug!("failed to register ipv6 socket: {}", e);
1282                return None;
1283            }
1284        }
1285
1286        // Setup timer for IP checks.
1287        let mut next_ip_check = if self.ip_check_interval > 0 {
1288            current_time_millis() + self.ip_check_interval
1289        } else {
1290            0
1291        };
1292
1293        if next_ip_check > 0 {
1294            self.add_timer(next_ip_check);
1295        }
1296
1297        // Start the run loop.
1298
1299        let mut events = mio::Events::with_capacity(1024);
1300        loop {
1301            let now = current_time_millis();
1302
1303            let earliest_timer = self.peek_earliest_timer();
1304            let timeout = earliest_timer.map(|timer| {
1305                // If `timer` already passed, set `timeout` to be 1ms.
1306                let millis = if timer > now { timer - now } else { 1 };
1307                Duration::from_millis(millis)
1308            });
1309
1310            // Process incoming packets, command events and optional timeout.
1311            events.clear();
1312            match self.poller.poll(&mut events, timeout) {
1313                Ok(_) => self.handle_poller_events(&events),
1314                Err(e) => debug!("failed to select from sockets: {}", e),
1315            }
1316
1317            let now = current_time_millis();
1318
1319            // Remove the timers if already passed.
1320            self.pop_timers_till(now);
1321
1322            // Remove hostname resolvers with expired timeouts.
1323            for hostname in self
1324                .hostname_resolvers
1325                .clone()
1326                .into_iter()
1327                .filter(|(_, (_, timeout))| timeout.map(|t| now >= t).unwrap_or(false))
1328                .map(|(hostname, _)| hostname)
1329            {
1330                trace!("hostname resolver timeout for {}", &hostname);
1331                call_hostname_resolution_listener(
1332                    &self.hostname_resolvers,
1333                    &hostname,
1334                    HostnameResolutionEvent::SearchTimeout(hostname.to_owned()),
1335                );
1336                call_hostname_resolution_listener(
1337                    &self.hostname_resolvers,
1338                    &hostname,
1339                    HostnameResolutionEvent::SearchStopped(hostname.to_owned()),
1340                );
1341                self.hostname_resolvers.remove(&hostname);
1342            }
1343
1344            // process commands from the command channel
1345            while let Ok(command) = receiver.try_recv() {
1346                if matches!(command, Command::Exit(_)) {
1347                    debug!("Exit command received, performing cleanup");
1348                    self.cleanup();
1349                    self.status = DaemonStatus::Shutdown;
1350                    return Some(command);
1351                }
1352                self.exec_command(command, false);
1353            }
1354
1355            // check for repeated commands and run them if their time is up.
1356            let mut i = 0;
1357            while i < self.retransmissions.len() {
1358                if now >= self.retransmissions[i].next_time {
1359                    let rerun = self.retransmissions.remove(i);
1360                    self.exec_command(rerun.command, true);
1361                } else {
1362                    i += 1;
1363                }
1364            }
1365
1366            // Refresh cached service records with active queriers
1367            self.refresh_active_services();
1368
1369            // Refresh cached A/AAAA records with active queriers
1370            let mut query_count = 0;
1371            for (hostname, _sender) in self.hostname_resolvers.iter() {
1372                for (hostname, ip_addr) in
1373                    self.cache.refresh_due_hostname_resolutions(hostname).iter()
1374                {
1375                    self.send_query(hostname, ip_address_rr_type(&ip_addr.to_ip_addr()));
1376                    query_count += 1;
1377                }
1378            }
1379
1380            self.increase_counter(Counter::CacheRefreshAddr, query_count);
1381
1382            // check and evict expired records in our cache
1383            let now = current_time_millis();
1384
1385            // Notify service listeners about the expired records.
1386            let expired_services = self.cache.evict_expired_services(now);
1387            if !expired_services.is_empty() {
1388                debug!(
1389                    "run: send {} service removal to listeners",
1390                    expired_services.len()
1391                );
1392                self.notify_service_removal(expired_services);
1393            }
1394
1395            // Notify hostname listeners about the expired records.
1396            let expired_addrs = self.cache.evict_expired_addr(now);
1397            for (hostname, addrs) in expired_addrs {
1398                call_hostname_resolution_listener(
1399                    &self.hostname_resolvers,
1400                    &hostname,
1401                    HostnameResolutionEvent::AddressesRemoved(hostname.clone(), addrs),
1402                );
1403                let instances = self.cache.get_instances_on_host(&hostname);
1404                let instance_set: HashSet<String> = instances.into_iter().collect();
1405                self.resolve_updated_instances(&instance_set);
1406            }
1407
1408            // Send out probing queries.
1409            self.probing_handler();
1410
1411            // check IP changes if next_ip_check is reached.
1412            if now >= next_ip_check && next_ip_check > 0 {
1413                next_ip_check = now + self.ip_check_interval;
1414                self.add_timer(next_ip_check);
1415
1416                self.check_ip_changes();
1417            }
1418        }
1419    }
1420
1421    fn process_set_option(&mut self, daemon_opt: DaemonOption) {
1422        match daemon_opt {
1423            DaemonOption::ServiceNameLenMax(length) => self.service_name_len_max = length,
1424            DaemonOption::IpCheckInterval(interval) => self.ip_check_interval = interval,
1425            DaemonOption::EnableInterface(if_kind) => self.enable_interface(if_kind),
1426            DaemonOption::DisableInterface(if_kind) => self.disable_interface(if_kind),
1427            DaemonOption::MulticastLoopV4(on) => self.set_multicast_loop_v4(on),
1428            DaemonOption::MulticastLoopV6(on) => self.set_multicast_loop_v6(on),
1429            DaemonOption::AcceptUnsolicited(accept) => self.set_accept_unsolicited(accept),
1430            DaemonOption::IncludeAppleP2P(enable) => self.set_apple_p2p(enable),
1431            #[cfg(test)]
1432            DaemonOption::TestDownInterface(ifname) => {
1433                self.test_down_interfaces.insert(ifname);
1434            }
1435            #[cfg(test)]
1436            DaemonOption::TestUpInterface(ifname) => {
1437                self.test_down_interfaces.remove(&ifname);
1438            }
1439        }
1440    }
1441
1442    fn enable_interface(&mut self, kinds: Vec<IfKind>) {
1443        debug!("enable_interface: {:?}", kinds);
1444        let interfaces = my_ip_interfaces_inner(true, self.include_apple_p2p);
1445
1446        for if_kind in kinds {
1447            self.if_selections.push(IfSelection {
1448                if_kind: resolve_addr_to_index(if_kind, &interfaces),
1449                selected: true,
1450            });
1451        }
1452
1453        self.apply_intf_selections(interfaces);
1454    }
1455
1456    fn disable_interface(&mut self, kinds: Vec<IfKind>) {
1457        debug!("disable_interface: {:?}", kinds);
1458        let interfaces = my_ip_interfaces_inner(true, self.include_apple_p2p);
1459
1460        for if_kind in kinds {
1461            self.if_selections.push(IfSelection {
1462                if_kind: resolve_addr_to_index(if_kind, &interfaces),
1463                selected: false,
1464            });
1465        }
1466
1467        self.apply_intf_selections(interfaces);
1468    }
1469
1470    fn set_multicast_loop_v4(&mut self, on: bool) {
1471        let Some(sock) = self.ipv4_sock.as_mut() else {
1472            return;
1473        };
1474        self.multicast_loop_v4 = on;
1475        sock.pktinfo
1476            .set_multicast_loop_v4(on)
1477            .map_err(|e| e_fmt!("failed to set multicast loop v4: {}", e))
1478            .unwrap();
1479    }
1480
1481    fn set_multicast_loop_v6(&mut self, on: bool) {
1482        let Some(sock) = self.ipv6_sock.as_mut() else {
1483            return;
1484        };
1485        self.multicast_loop_v6 = on;
1486        sock.pktinfo
1487            .set_multicast_loop_v6(on)
1488            .map_err(|e| e_fmt!("failed to set multicast loop v6: {}", e))
1489            .unwrap();
1490    }
1491
1492    fn set_accept_unsolicited(&mut self, accept: bool) {
1493        self.accept_unsolicited = accept;
1494    }
1495
1496    fn set_apple_p2p(&mut self, include: bool) {
1497        if self.include_apple_p2p != include {
1498            self.include_apple_p2p = include;
1499            self.apply_intf_selections(my_ip_interfaces_inner(true, self.include_apple_p2p));
1500        }
1501    }
1502
1503    fn notify_monitors(&mut self, event: DaemonEvent) {
1504        // Only retain the monitors that are still connected.
1505        self.monitors.retain(|sender| {
1506            if let Err(e) = sender.try_send(event.clone()) {
1507                debug!("notify_monitors: try_send: {}", &e);
1508                if matches!(e, TrySendError::Disconnected(_)) {
1509                    return false; // This monitor is dropped.
1510                }
1511            }
1512            true
1513        });
1514    }
1515
1516    /// Remove `addr` in my services that enabled `addr_auto`.
1517    fn del_addr_in_my_services(&mut self, addr: &IpAddr) {
1518        for (_, service_info) in self.my_services.iter_mut() {
1519            if service_info.is_addr_auto() {
1520                service_info.remove_ipaddr(addr);
1521            }
1522        }
1523    }
1524
1525    fn add_timer(&mut self, next_time: u64) {
1526        self.timers.push(Reverse(next_time));
1527    }
1528
1529    fn peek_earliest_timer(&self) -> Option<u64> {
1530        self.timers.peek().map(|Reverse(v)| *v)
1531    }
1532
1533    fn _pop_earliest_timer(&mut self) -> Option<u64> {
1534        self.timers.pop().map(|Reverse(v)| v)
1535    }
1536
1537    /// Pop all timers that are already passed till `now`.
1538    fn pop_timers_till(&mut self, now: u64) {
1539        while let Some(Reverse(v)) = self.timers.peek() {
1540            if *v > now {
1541                break;
1542            }
1543            self.timers.pop();
1544        }
1545    }
1546
1547    /// Apply all selections to `interfaces` and return the selected addresses.
1548    fn selected_intfs(&self, interfaces: Vec<Interface>) -> HashSet<Interface> {
1549        let intf_count = interfaces.len();
1550        let mut intf_selections = vec![true; intf_count];
1551
1552        // apply if_selections
1553        for selection in self.if_selections.iter() {
1554            // Mark the interfaces for this selection.
1555            for i in 0..intf_count {
1556                if selection.if_kind.matches(&interfaces[i]) {
1557                    intf_selections[i] = selection.selected;
1558                }
1559            }
1560        }
1561
1562        let mut selected_addrs = HashSet::new();
1563        for i in 0..intf_count {
1564            if intf_selections[i] {
1565                selected_addrs.insert(interfaces[i].clone());
1566            }
1567        }
1568
1569        selected_addrs
1570    }
1571
1572    /// Apply all selections to `interfaces`.
1573    ///
1574    /// For any interface, add it if selected but not bound yet,
1575    /// delete it if not selected but still bound.
1576    fn apply_intf_selections(&mut self, interfaces: Vec<Interface>) {
1577        // By default, we enable all interfaces.
1578        let intf_count = interfaces.len();
1579        let mut intf_selections = vec![true; intf_count];
1580
1581        // apply if_selections
1582        for selection in self.if_selections.iter() {
1583            // Mark the interfaces for this selection.
1584            for i in 0..intf_count {
1585                if selection.if_kind.matches(&interfaces[i]) {
1586                    intf_selections[i] = selection.selected;
1587                }
1588            }
1589        }
1590
1591        // Update `my_intfs` based on the selections.
1592        for (idx, intf) in interfaces.into_iter().enumerate() {
1593            if intf_selections[idx] {
1594                // Add the interface
1595                self.add_interface(intf);
1596            } else {
1597                // Remove the interface
1598                self.del_interface_addr(&intf);
1599            }
1600        }
1601    }
1602
1603    fn del_ip(&mut self, ip: IpAddr) {
1604        self.del_addr_in_my_services(&ip);
1605        self.notify_monitors(DaemonEvent::IpDel(ip));
1606    }
1607
1608    /// Check for IP changes and update [my_intfs] as needed.
1609    fn check_ip_changes(&mut self) {
1610        // Get the current interfaces.
1611        let my_ifaddrs = my_ip_interfaces_inner(true, self.include_apple_p2p);
1612
1613        #[cfg(test)]
1614        let my_ifaddrs: Vec<_> = my_ifaddrs
1615            .into_iter()
1616            .filter(|intf| !self.test_down_interfaces.contains(&intf.name))
1617            .collect();
1618
1619        let ifaddrs_map: HashMap<u32, Vec<&IfAddr>> =
1620            my_ifaddrs.iter().fold(HashMap::new(), |mut acc, intf| {
1621                let if_index = intf.index.unwrap_or(0);
1622                acc.entry(if_index).or_default().push(&intf.addr);
1623                acc
1624            });
1625
1626        let mut deleted_intfs = Vec::new();
1627        let mut deleted_ips = Vec::new();
1628
1629        for (if_index, my_intf) in self.my_intfs.iter_mut() {
1630            let mut last_ipv4 = None;
1631            let mut last_ipv6 = None;
1632
1633            if let Some(current_addrs) = ifaddrs_map.get(if_index) {
1634                my_intf.addrs.retain(|addr| {
1635                    if current_addrs.contains(&addr) {
1636                        true
1637                    } else {
1638                        match addr.ip() {
1639                            IpAddr::V4(ipv4) => last_ipv4 = Some(ipv4),
1640                            IpAddr::V6(ipv6) => last_ipv6 = Some(ipv6),
1641                        }
1642                        deleted_ips.push(addr.ip());
1643                        false
1644                    }
1645                });
1646                if my_intf.addrs.is_empty() {
1647                    deleted_intfs.push((*if_index, last_ipv4, last_ipv6))
1648                }
1649            } else {
1650                // If it does not exist, remove the interface.
1651                debug!(
1652                    "check_ip_changes: interface {} ({}) no longer exists, removing",
1653                    my_intf.name, if_index
1654                );
1655                for addr in my_intf.addrs.iter() {
1656                    match addr.ip() {
1657                        IpAddr::V4(ipv4) => last_ipv4 = Some(ipv4),
1658                        IpAddr::V6(ipv6) => last_ipv6 = Some(ipv6),
1659                    }
1660                    deleted_ips.push(addr.ip())
1661                }
1662                deleted_intfs.push((*if_index, last_ipv4, last_ipv6));
1663            }
1664        }
1665
1666        if !deleted_ips.is_empty() || !deleted_intfs.is_empty() {
1667            debug!(
1668                "check_ip_changes: {} deleted ips {} deleted intfs",
1669                deleted_ips.len(),
1670                deleted_intfs.len()
1671            );
1672        }
1673
1674        for ip in deleted_ips {
1675            self.del_ip(ip);
1676        }
1677
1678        for (if_index, last_ipv4, last_ipv6) in deleted_intfs {
1679            let Some(my_intf) = self.my_intfs.remove(&if_index) else {
1680                continue;
1681            };
1682
1683            if let Some(ipv4) = last_ipv4 {
1684                debug!("leave multicast for {ipv4}");
1685                if let Some(sock) = self.ipv4_sock.as_mut() {
1686                    if let Err(e) = sock.pktinfo.leave_multicast_v4(&GROUP_ADDR_V4, &ipv4) {
1687                        debug!("leave multicast group for addr {ipv4}: {e}");
1688                    }
1689                }
1690            }
1691
1692            if let Some(ipv6) = last_ipv6 {
1693                debug!("leave multicast for {ipv6}");
1694                if let Some(sock) = self.ipv6_sock.as_mut() {
1695                    if let Err(e) = sock
1696                        .pktinfo
1697                        .leave_multicast_v6(&GROUP_ADDR_V6, my_intf.index)
1698                    {
1699                        debug!("leave multicast group for IPv6: {ipv6}: {e}");
1700                    }
1701                }
1702            }
1703
1704            // Remove cache records for this interface.
1705            let intf_id = InterfaceId {
1706                name: my_intf.name.to_string(),
1707                index: my_intf.index,
1708            };
1709            let removed_instances = self.cache.remove_records_on_intf(intf_id);
1710            self.notify_service_removal(removed_instances);
1711        }
1712
1713        // Add newly found interfaces only if in our selections.
1714        self.apply_intf_selections(my_ifaddrs);
1715    }
1716
1717    /// Remove an interface address when it was down, disabled or removed from the system.
1718    /// If no more addresses on the interface, remove the interface as well.
1719    fn del_interface_addr(&mut self, intf: &Interface) {
1720        let if_index = intf.index.unwrap_or(0);
1721        debug!(
1722            "del_interface_addr: {} ({if_index}) addr {}",
1723            intf.name,
1724            intf.ip()
1725        );
1726
1727        let Some(my_intf) = self.my_intfs.get_mut(&if_index) else {
1728            debug!("del_interface_addr: interface {} not found", intf.name);
1729            return;
1730        };
1731
1732        let mut ip_removed = false;
1733
1734        if my_intf.addrs.remove(&intf.addr) {
1735            ip_removed = true;
1736
1737            match intf.addr.ip() {
1738                IpAddr::V4(ipv4) => {
1739                    if my_intf.next_ifaddr_v4().is_none() {
1740                        if let Some(sock) = self.ipv4_sock.as_mut() {
1741                            if let Err(e) = sock.pktinfo.leave_multicast_v4(&GROUP_ADDR_V4, &ipv4) {
1742                                debug!("leave multicast group for addr {ipv4}: {e}");
1743                            } else {
1744                                debug!("leave multicast for {ipv4}");
1745                            }
1746                        }
1747                    }
1748                }
1749
1750                IpAddr::V6(ipv6) => {
1751                    if my_intf.next_ifaddr_v6().is_none() {
1752                        if let Some(sock) = self.ipv6_sock.as_mut() {
1753                            if let Err(e) =
1754                                sock.pktinfo.leave_multicast_v6(&GROUP_ADDR_V6, if_index)
1755                            {
1756                                debug!("leave multicast group for addr {ipv6}: {e}");
1757                            }
1758                        }
1759                    }
1760                }
1761            }
1762
1763            if my_intf.addrs.is_empty() {
1764                // If no more addresses, remove the interface.
1765                debug!("del_interface_addr: removing interface {}", intf.name);
1766                self.my_intfs.remove(&if_index);
1767                self.dns_registry_map.remove(&if_index);
1768                self.cache
1769                    .remove_addrs_on_disabled_intf(if_index, IpType::BOTH);
1770            } else {
1771                // Interface still has addresses of the other IP version.
1772                // Remove cached address records for the disabled IP version
1773                // only if no more addresses of that version remain.
1774                let is_v4 = intf.addr.ip().is_ipv4();
1775                let version_gone = if is_v4 {
1776                    my_intf.next_ifaddr_v4().is_none()
1777                } else {
1778                    my_intf.next_ifaddr_v6().is_none()
1779                };
1780                if version_gone {
1781                    let ip_type = if is_v4 { IpType::V4 } else { IpType::V6 };
1782                    self.cache.remove_addrs_on_disabled_intf(if_index, ip_type);
1783                }
1784            }
1785        }
1786
1787        if ip_removed {
1788            // Notify the monitors.
1789            self.notify_monitors(DaemonEvent::IpDel(intf.ip()));
1790            // Remove the interface from my services that enabled `addr_auto`.
1791            self.del_addr_in_my_services(&intf.ip());
1792        }
1793    }
1794
1795    fn add_interface(&mut self, intf: Interface) {
1796        let sock_opt = if intf.ip().is_ipv4() {
1797            &self.ipv4_sock
1798        } else {
1799            &self.ipv6_sock
1800        };
1801
1802        let Some(sock) = sock_opt else {
1803            debug!(
1804                "add_interface: no socket available for interface {} with addr {}. Skipped.",
1805                intf.name,
1806                intf.ip()
1807            );
1808            return;
1809        };
1810
1811        let if_index = intf.index.unwrap_or(0);
1812        let mut new_addr = false;
1813
1814        match self.my_intfs.entry(if_index) {
1815            Entry::Occupied(mut entry) => {
1816                // If intf has a new address, add it to the existing interface.
1817                let my_intf = entry.get_mut();
1818                if !my_intf.addrs.contains(&intf.addr) {
1819                    if let Err(e) = join_multicast_group(&sock.pktinfo, &intf) {
1820                        debug!("add_interface: socket_config {}: {e}", &intf.name);
1821                    }
1822                    my_intf.addrs.insert(intf.addr.clone());
1823                    new_addr = true;
1824                }
1825            }
1826            Entry::Vacant(entry) => {
1827                if let Err(e) = join_multicast_group(&sock.pktinfo, &intf) {
1828                    debug!("add_interface: socket_config {}: {e}. Skipped.", &intf.name);
1829                    return;
1830                }
1831
1832                new_addr = true;
1833                let new_intf = MyIntf {
1834                    name: intf.name.clone(),
1835                    index: if_index,
1836                    addrs: HashSet::from([intf.addr.clone()]),
1837                };
1838                entry.insert(new_intf);
1839            }
1840        }
1841
1842        if !new_addr {
1843            trace!("add_interface: interface {} already exists", &intf.name);
1844            return;
1845        }
1846
1847        debug!("add new interface {}: {}", intf.name, intf.ip());
1848
1849        let Some(my_intf) = self.my_intfs.get(&if_index) else {
1850            debug!("add_interface: cannot find if_index {if_index}");
1851            return;
1852        };
1853
1854        let dns_registry = match self.dns_registry_map.get_mut(&if_index) {
1855            Some(registry) => registry,
1856            None => self
1857                .dns_registry_map
1858                .entry(if_index)
1859                .or_insert_with(DnsRegistry::new),
1860        };
1861
1862        for (_, service_info) in self.my_services.iter_mut() {
1863            if service_info.is_addr_auto() {
1864                service_info.insert_ipaddr(&intf);
1865
1866                if let Ok(true) = announce_service_on_intf(
1867                    dns_registry,
1868                    service_info,
1869                    my_intf,
1870                    &sock.pktinfo,
1871                    self.port,
1872                ) {
1873                    debug!(
1874                        "Announce service {} on {}",
1875                        service_info.get_fullname(),
1876                        intf.ip()
1877                    );
1878                    service_info.set_status(if_index, ServiceStatus::Announced);
1879                } else {
1880                    for timer in dns_registry.new_timers.drain(..) {
1881                        self.timers.push(Reverse(timer));
1882                    }
1883                    service_info.set_status(if_index, ServiceStatus::Probing);
1884                }
1885            }
1886        }
1887
1888        // As we added a new interface, we want to execute all active "Browse" reruns now.
1889        let mut browse_reruns = Vec::new();
1890        let mut i = 0;
1891        while i < self.retransmissions.len() {
1892            if matches!(self.retransmissions[i].command, Command::Browse(..)) {
1893                browse_reruns.push(self.retransmissions.remove(i));
1894            } else {
1895                i += 1;
1896            }
1897        }
1898
1899        for rerun in browse_reruns {
1900            self.exec_command(rerun.command, true);
1901        }
1902
1903        // Notify the monitors.
1904        self.notify_monitors(DaemonEvent::IpAdd(intf.ip()));
1905    }
1906
1907    /// Registers a service.
1908    ///
1909    /// RFC 6762 section 8.3.
1910    /// ...the Multicast DNS responder MUST send
1911    ///    an unsolicited Multicast DNS response containing, in the Answer
1912    ///    Section, all of its newly registered resource records
1913    ///
1914    /// Zeroconf will then respond to requests for information about this service.
1915    fn register_service(&mut self, mut info: ServiceInfo) {
1916        // Check the service name length.
1917        if let Err(e) = check_service_name_length(info.get_type(), self.service_name_len_max) {
1918            error!("check_service_name_length: {}", &e);
1919            self.notify_monitors(DaemonEvent::Error(e));
1920            return;
1921        }
1922
1923        if info.is_addr_auto() {
1924            let selected_intfs =
1925                self.selected_intfs(my_ip_interfaces_inner(true, self.include_apple_p2p));
1926            for intf in selected_intfs {
1927                info.insert_ipaddr(&intf);
1928            }
1929        }
1930
1931        debug!("register service {:?}", &info);
1932
1933        let outgoing_addrs = self.send_unsolicited_response(&mut info);
1934        if !outgoing_addrs.is_empty() {
1935            self.notify_monitors(DaemonEvent::Announce(
1936                info.get_fullname().to_string(),
1937                format!("{:?}", &outgoing_addrs),
1938            ));
1939        }
1940
1941        // The key has to be lower case letter as DNS record name is case insensitive.
1942        // The info will have the original name.
1943        let service_fullname = info.get_fullname().to_lowercase();
1944        self.my_services.insert(service_fullname, info);
1945    }
1946
1947    /// Sends out announcement of `info` on every valid interface.
1948    /// Returns the list of interface IPs that sent out the announcement.
1949    fn send_unsolicited_response(&mut self, info: &mut ServiceInfo) -> Vec<IpAddr> {
1950        let mut outgoing_addrs = Vec::new();
1951        let mut outgoing_intfs = HashSet::new();
1952
1953        let mut invalid_intf_addrs = HashSet::new();
1954
1955        for (if_index, intf) in self.my_intfs.iter() {
1956            let dns_registry = match self.dns_registry_map.get_mut(if_index) {
1957                Some(registry) => registry,
1958                None => self
1959                    .dns_registry_map
1960                    .entry(*if_index)
1961                    .or_insert_with(DnsRegistry::new),
1962            };
1963
1964            let mut announced = false;
1965
1966            // IPv4
1967            if let Some(sock) = self.ipv4_sock.as_mut() {
1968                match announce_service_on_intf(dns_registry, info, intf, &sock.pktinfo, self.port) {
1969                    Ok(true) => {
1970                        for addr in intf.addrs.iter().filter(|a| a.ip().is_ipv4()) {
1971                            outgoing_addrs.push(addr.ip());
1972                        }
1973                        outgoing_intfs.insert(intf.index);
1974
1975                        debug!(
1976                            "Announce service IPv4 {} on {}",
1977                            info.get_fullname(),
1978                            intf.name
1979                        );
1980                        announced = true;
1981                    }
1982                    Ok(false) => {}
1983                    Err(InternalError::IntfAddrInvalid(intf_addr)) => {
1984                        invalid_intf_addrs.insert(intf_addr);
1985                    }
1986                }
1987            }
1988
1989            if let Some(sock) = self.ipv6_sock.as_mut() {
1990                match announce_service_on_intf(dns_registry, info, intf, &sock.pktinfo, self.port) {
1991                    Ok(true) => {
1992                        for addr in intf.addrs.iter().filter(|a| a.ip().is_ipv6()) {
1993                            outgoing_addrs.push(addr.ip());
1994                        }
1995                        outgoing_intfs.insert(intf.index);
1996
1997                        debug!(
1998                            "Announce service IPv6 {} on {}",
1999                            info.get_fullname(),
2000                            intf.name
2001                        );
2002                        announced = true;
2003                    }
2004                    Ok(false) => {}
2005                    Err(InternalError::IntfAddrInvalid(intf_addr)) => {
2006                        invalid_intf_addrs.insert(intf_addr);
2007                    }
2008                }
2009            }
2010
2011            if announced {
2012                info.set_status(intf.index, ServiceStatus::Announced);
2013            } else {
2014                for timer in dns_registry.new_timers.drain(..) {
2015                    self.timers.push(Reverse(timer));
2016                }
2017                info.set_status(*if_index, ServiceStatus::Probing);
2018            }
2019        }
2020
2021        if !invalid_intf_addrs.is_empty() {
2022            let _ = self.send_cmd_to_self(Command::InvalidIntfAddrs(invalid_intf_addrs));
2023        }
2024
2025        // RFC 6762 section 8.3.
2026        // ..The Multicast DNS responder MUST send at least two unsolicited
2027        //    responses, one second apart.
2028        let next_time = current_time_millis() + 1000;
2029        for if_index in outgoing_intfs {
2030            self.add_retransmission(
2031                next_time,
2032                Command::RegisterResend(info.get_fullname().to_string(), if_index),
2033            );
2034        }
2035
2036        outgoing_addrs
2037    }
2038
2039    /// Send probings or finish them if expired. Notify waiting services.
2040    fn probing_handler(&mut self) {
2041        let now = current_time_millis();
2042        let mut invalid_intf_addrs = HashSet::new();
2043
2044        for (if_index, intf) in self.my_intfs.iter() {
2045            let Some(dns_registry) = self.dns_registry_map.get_mut(if_index) else {
2046                continue;
2047            };
2048
2049            let (out, expired_probes) = check_probing(dns_registry, &mut self.timers, now);
2050
2051            // send probing.
2052            if !out.questions().is_empty() {
2053                trace!("sending out probing of questions: {:?}", out.questions());
2054                if let Some(sock) = self.ipv4_sock.as_mut() {
2055                    if let Err(InternalError::IntfAddrInvalid(intf_addr)) =
2056                        send_dns_outgoing(&out, intf, &sock.pktinfo, self.port)
2057                    {
2058                        invalid_intf_addrs.insert(intf_addr);
2059                    }
2060                }
2061                if let Some(sock) = self.ipv6_sock.as_mut() {
2062                    if let Err(InternalError::IntfAddrInvalid(intf_addr)) =
2063                        send_dns_outgoing(&out, intf, &sock.pktinfo, self.port)
2064                    {
2065                        invalid_intf_addrs.insert(intf_addr);
2066                    }
2067                }
2068            }
2069
2070            // For finished probes, wake up services that are waiting for the probes.
2071            let waiting_services =
2072                handle_expired_probes(expired_probes, &intf.name, dns_registry, &mut self.monitors);
2073
2074            for service_name in waiting_services {
2075                // service names are lowercase
2076                if let Some(info) = self.my_services.get_mut(&service_name.to_lowercase()) {
2077                    if info.get_status(*if_index) == ServiceStatus::Announced {
2078                        debug!("service {} already announced", info.get_fullname());
2079                        continue;
2080                    }
2081
2082                    let announced_v4 = if let Some(sock) = self.ipv4_sock.as_mut() {
2083                        match announce_service_on_intf(
2084                            dns_registry,
2085                            info,
2086                            intf,
2087                            &sock.pktinfo,
2088                            self.port,
2089                        ) {
2090                            Ok(announced) => announced,
2091                            Err(InternalError::IntfAddrInvalid(intf_addr)) => {
2092                                invalid_intf_addrs.insert(intf_addr);
2093                                false
2094                            }
2095                        }
2096                    } else {
2097                        false
2098                    };
2099                    let announced_v6 = if let Some(sock) = self.ipv6_sock.as_mut() {
2100                        match announce_service_on_intf(
2101                            dns_registry,
2102                            info,
2103                            intf,
2104                            &sock.pktinfo,
2105                            self.port,
2106                        ) {
2107                            Ok(announced) => announced,
2108                            Err(InternalError::IntfAddrInvalid(intf_addr)) => {
2109                                invalid_intf_addrs.insert(intf_addr);
2110                                false
2111                            }
2112                        }
2113                    } else {
2114                        false
2115                    };
2116
2117                    if announced_v4 || announced_v6 {
2118                        let next_time = now + 1000;
2119                        let command =
2120                            Command::RegisterResend(info.get_fullname().to_string(), *if_index);
2121                        self.retransmissions.push(ReRun { next_time, command });
2122                        self.timers.push(Reverse(next_time));
2123
2124                        let fullname = match dns_registry.name_changes.get(&service_name) {
2125                            Some(new_name) => new_name.to_string(),
2126                            None => service_name.to_string(),
2127                        };
2128
2129                        let mut hostname = info.get_hostname();
2130                        if let Some(new_name) = dns_registry.name_changes.get(hostname) {
2131                            hostname = new_name;
2132                        }
2133
2134                        debug!("wake up: announce service {} on {}", fullname, intf.name);
2135                        notify_monitors(
2136                            &mut self.monitors,
2137                            DaemonEvent::Announce(fullname, format!("{}:{}", hostname, &intf.name)),
2138                        );
2139
2140                        info.set_status(*if_index, ServiceStatus::Announced);
2141                    }
2142                }
2143            }
2144        }
2145
2146        if !invalid_intf_addrs.is_empty() {
2147            let _ = self.send_cmd_to_self(Command::InvalidIntfAddrs(invalid_intf_addrs));
2148        }
2149    }
2150
2151    fn unregister_service(
2152        &self,
2153        info: &ServiceInfo,
2154        intf: &MyIntf,
2155        sock: &PktInfoUdpSocket,
2156    ) -> Vec<u8> {
2157        let is_ipv4 = sock.domain() == Domain::IPV4;
2158
2159        let mut out = DnsOutgoing::new(FLAGS_QR_RESPONSE | FLAGS_AA);
2160        out.add_answer_at_time(
2161            DnsPointer::new(
2162                info.get_type(),
2163                RRType::PTR,
2164                CLASS_IN,
2165                0,
2166                info.get_fullname().to_string(),
2167            ),
2168            0,
2169        );
2170
2171        if let Some(sub) = info.get_subtype() {
2172            trace!("Adding subdomain {}", sub);
2173            out.add_answer_at_time(
2174                DnsPointer::new(
2175                    sub,
2176                    RRType::PTR,
2177                    CLASS_IN,
2178                    0,
2179                    info.get_fullname().to_string(),
2180                ),
2181                0,
2182            );
2183        }
2184
2185        out.add_answer_at_time(
2186            DnsSrv::new(
2187                info.get_fullname(),
2188                CLASS_IN | CLASS_CACHE_FLUSH,
2189                0,
2190                info.get_priority(),
2191                info.get_weight(),
2192                info.get_port(),
2193                info.get_hostname().to_string(),
2194            ),
2195            0,
2196        );
2197        out.add_answer_at_time(
2198            DnsTxt::new(
2199                info.get_fullname(),
2200                CLASS_IN | CLASS_CACHE_FLUSH,
2201                0,
2202                info.generate_txt(),
2203            ),
2204            0,
2205        );
2206
2207        let if_addrs = if is_ipv4 {
2208            info.get_addrs_on_my_intf_v4(intf)
2209        } else {
2210            info.get_addrs_on_my_intf_v6(intf)
2211        };
2212
2213        if if_addrs.is_empty() {
2214            return vec![];
2215        }
2216
2217        for address in if_addrs {
2218            out.add_answer_at_time(
2219                DnsAddress::new(
2220                    info.get_hostname(),
2221                    ip_address_rr_type(&address),
2222                    CLASS_IN | CLASS_CACHE_FLUSH,
2223                    0,
2224                    address,
2225                    intf.into(),
2226                ),
2227                0,
2228            );
2229        }
2230
2231        // Only (at most) one packet is expected to be sent out.
2232        let sent_vec = match send_dns_outgoing(&out, intf, sock, self.port) {
2233            Ok(sent_vec) => sent_vec,
2234            Err(InternalError::IntfAddrInvalid(intf_addr)) => {
2235                let invalid_intf_addrs = HashSet::from([intf_addr]);
2236                let _ = self.send_cmd_to_self(Command::InvalidIntfAddrs(invalid_intf_addrs));
2237                vec![]
2238            }
2239        };
2240        sent_vec.into_iter().next().unwrap_or_default()
2241    }
2242
2243    /// Binds a channel `listener` to querying mDNS hostnames.
2244    ///
2245    /// If there is already a `listener`, it will be updated, i.e. overwritten.
2246    fn add_hostname_resolver(
2247        &mut self,
2248        hostname: String,
2249        listener: Sender<HostnameResolutionEvent>,
2250        timeout: Option<u64>,
2251    ) {
2252        let real_timeout = timeout.map(|t| current_time_millis() + t);
2253        self.hostname_resolvers
2254            .insert(hostname.to_lowercase(), (listener, real_timeout));
2255        if let Some(t) = real_timeout {
2256            self.add_timer(t);
2257        }
2258    }
2259
2260    /// Sends a multicast query for `name` with `qtype`.
2261    fn send_query(&self, name: &str, qtype: RRType) {
2262        self.send_query_vec(&[(name, qtype)]);
2263    }
2264
2265    /// Sends out a list of `questions` (i.e. DNS questions) via multicast.
2266    fn send_query_vec(&self, questions: &[(&str, RRType)]) {
2267        let mut out = DnsOutgoing::new(FLAGS_QR_QUERY);
2268        let now = current_time_millis();
2269
2270        for (name, qtype) in questions {
2271            out.add_question(name, *qtype);
2272
2273            for record in self.cache.get_known_answers(name, *qtype, now) {
2274                /*
2275                RFC 6762 section 7.1: https://datatracker.ietf.org/doc/html/rfc6762#section-7.1
2276                ...
2277                    When a Multicast DNS querier sends a query to which it already knows
2278                    some answers, it populates the Answer Section of the DNS query
2279                    message with those answers.
2280                 */
2281                trace!("add known answer: {:?}", record.record);
2282                let mut new_record = record.record.clone();
2283                new_record.get_record_mut().update_ttl(now);
2284                out.add_answer_box(new_record);
2285            }
2286        }
2287
2288        let mut invalid_intf_addrs = HashSet::new();
2289        for (_, intf) in self.my_intfs.iter() {
2290            if let Some(sock) = self.ipv4_sock.as_ref() {
2291                if let Err(InternalError::IntfAddrInvalid(intf_addr)) =
2292                    send_dns_outgoing(&out, intf, &sock.pktinfo, self.port)
2293                {
2294                    invalid_intf_addrs.insert(intf_addr);
2295                }
2296            }
2297            if let Some(sock) = self.ipv6_sock.as_ref() {
2298                if let Err(InternalError::IntfAddrInvalid(intf_addr)) =
2299                    send_dns_outgoing(&out, intf, &sock.pktinfo, self.port)
2300                {
2301                    invalid_intf_addrs.insert(intf_addr);
2302                }
2303            }
2304        }
2305
2306        if !invalid_intf_addrs.is_empty() {
2307            let _ = self.send_cmd_to_self(Command::InvalidIntfAddrs(invalid_intf_addrs));
2308        }
2309    }
2310
2311    /// Reads one UDP datagram from the socket of `intf`.
2312    ///
2313    /// Returns false if failed to receive a packet,
2314    /// otherwise returns true.
2315    fn handle_read(&mut self, event_key: usize) -> bool {
2316        let sock_opt = match event_key {
2317            IPV4_SOCK_EVENT_KEY => &mut self.ipv4_sock,
2318            IPV6_SOCK_EVENT_KEY => &mut self.ipv6_sock,
2319            _ => {
2320                debug!("handle_read: unknown token {}", event_key);
2321                return false;
2322            }
2323        };
2324        let Some(sock) = sock_opt.as_mut() else {
2325            debug!("handle_read: socket not available for token {}", event_key);
2326            return false;
2327        };
2328        let mut buf = vec![0u8; MAX_MSG_ABSOLUTE];
2329
2330        // Read the next mDNS UDP datagram.
2331        //
2332        // If the datagram is larger than `buf`, excess bytes may or may not
2333        // be truncated by the socket layer depending on the platform's libc.
2334        // In any case, such large datagram will not be decoded properly and
2335        // this function should return false but should not crash.
2336        let (sz, pktinfo) = match sock.pktinfo.recv(&mut buf) {
2337            Ok(sz) => sz,
2338            Err(e) => {
2339                if e.kind() != std::io::ErrorKind::WouldBlock {
2340                    debug!("listening socket read failed: {}", e);
2341                }
2342                return false;
2343            }
2344        };
2345
2346        // Find the interface that received the packet.
2347        let pkt_if_index = pktinfo.if_index as u32;
2348        let Some(my_intf) = self.my_intfs.get(&pkt_if_index) else {
2349            debug!(
2350                "handle_read: no interface found for pktinfo if_index: {}",
2351                pktinfo.if_index
2352            );
2353            return true; // We still return true to indicate that we read something.
2354        };
2355
2356        // Drop packets for an IP version that has been disabled on this interface.
2357        // This is needed because some times the socket layer may still receive packets
2358        // for an IP version even after we left the multicast group for that IP version.
2359        // We want to drop such packets to avoid unnecessary processing.
2360        let is_ipv4 = event_key == IPV4_SOCK_EVENT_KEY;
2361        if (is_ipv4 && my_intf.next_ifaddr_v4().is_none())
2362            || (!is_ipv4 && my_intf.next_ifaddr_v6().is_none())
2363        {
2364            debug!(
2365                "handle_read: dropping {} packet on intf {} (disabled)",
2366                if is_ipv4 { "IPv4" } else { "IPv6" },
2367                my_intf.name
2368            );
2369            return true;
2370        }
2371
2372        buf.truncate(sz); // reduce potential processing errors
2373
2374        match DnsIncoming::new(buf, my_intf.into()) {
2375            Ok(msg) => {
2376                if msg.is_query() {
2377                    self.handle_query(msg, pkt_if_index, event_key == IPV4_SOCK_EVENT_KEY);
2378                } else if msg.is_response() {
2379                    self.handle_response(msg, pkt_if_index);
2380                } else {
2381                    debug!("Invalid message: not query and not response");
2382                }
2383            }
2384            Err(e) => debug!("Invalid incoming DNS message: {}", e),
2385        }
2386
2387        true
2388    }
2389
2390    /// Returns true, if sent query. Returns false if SRV already exists.
2391    fn query_unresolved(&mut self, instance: &str) -> bool {
2392        if !valid_instance_name(instance) {
2393            trace!("instance name {} not valid", instance);
2394            return false;
2395        }
2396
2397        if let Some(records) = self.cache.get_srv(instance) {
2398            for record in records {
2399                if let Some(srv) = record.record.any().downcast_ref::<DnsSrv>() {
2400                    if self.cache.get_addr(srv.host()).is_none() {
2401                        self.send_query_vec(&[(srv.host(), RRType::A), (srv.host(), RRType::AAAA)]);
2402                        return true;
2403                    }
2404                }
2405            }
2406        } else {
2407            self.send_query(instance, RRType::ANY);
2408            return true;
2409        }
2410
2411        false
2412    }
2413
2414    /// Checks if `ty_domain` has records in the cache. If yes, sends the
2415    /// cached records via `sender`.
2416    fn query_cache_for_service(
2417        &mut self,
2418        ty_domain: &str,
2419        sender: &Sender<ServiceEvent>,
2420        now: u64,
2421    ) {
2422        let mut resolved: HashSet<String> = HashSet::new();
2423        let mut unresolved: HashSet<String> = HashSet::new();
2424
2425        if let Some(records) = self.cache.get_ptr(ty_domain) {
2426            for record in records.iter().filter(|r| !r.record.expires_soon(now)) {
2427                if let Some(ptr) = record.record.any().downcast_ref::<DnsPointer>() {
2428                    let mut new_event = None;
2429                    match self.resolve_service_from_cache(ty_domain, ptr.alias()) {
2430                        Ok(resolved_service) => {
2431                            if resolved_service.is_valid() {
2432                                debug!("Resolved service from cache: {}", ptr.alias());
2433                                new_event =
2434                                    Some(ServiceEvent::ServiceResolved(Box::new(resolved_service)));
2435                            } else {
2436                                debug!("Resolved service is not valid: {}", ptr.alias());
2437                            }
2438                        }
2439                        Err(err) => {
2440                            debug!("Error while resolving service from cache: {}", err);
2441                            continue;
2442                        }
2443                    }
2444
2445                    match sender.send(ServiceEvent::ServiceFound(
2446                        ty_domain.to_string(),
2447                        ptr.alias().to_string(),
2448                    )) {
2449                        Ok(()) => debug!("sent service found {}", ptr.alias()),
2450                        Err(e) => {
2451                            debug!("failed to send service found: {}", e);
2452                            continue;
2453                        }
2454                    }
2455
2456                    if let Some(event) = new_event {
2457                        resolved.insert(ptr.alias().to_string());
2458                        match sender.send(event) {
2459                            Ok(()) => debug!("sent service resolved: {}", ptr.alias()),
2460                            Err(e) => debug!("failed to send service resolved: {}", e),
2461                        }
2462                    } else {
2463                        unresolved.insert(ptr.alias().to_string());
2464                    }
2465                }
2466            }
2467        }
2468
2469        for instance in resolved.drain() {
2470            self.pending_resolves.remove(&instance);
2471            self.resolved.insert(instance);
2472        }
2473
2474        for instance in unresolved.drain() {
2475            self.add_pending_resolve(instance);
2476        }
2477    }
2478
2479    /// Checks if `hostname` has records in the cache. If yes, sends the
2480    /// cached records via `sender`.
2481    fn query_cache_for_hostname(
2482        &mut self,
2483        hostname: &str,
2484        sender: Sender<HostnameResolutionEvent>,
2485    ) {
2486        let addresses_map = self.cache.get_addresses_for_host(hostname);
2487        for (name, addresses) in addresses_map {
2488            match sender.send(HostnameResolutionEvent::AddressesFound(name, addresses)) {
2489                Ok(()) => trace!("sent hostname addresses found"),
2490                Err(e) => debug!("failed to send hostname addresses found: {}", e),
2491            }
2492        }
2493    }
2494
2495    fn add_pending_resolve(&mut self, instance: String) {
2496        if !self.pending_resolves.contains(&instance) {
2497            let next_time = current_time_millis() + RESOLVE_WAIT_IN_MILLIS;
2498            self.add_retransmission(next_time, Command::Resolve(instance.clone(), 1));
2499            self.pending_resolves.insert(instance);
2500        }
2501    }
2502
2503    /// Creates a `ResolvedService` from the cache.
2504    fn resolve_service_from_cache(
2505        &self,
2506        ty_domain: &str,
2507        fullname: &str,
2508    ) -> Result<ResolvedService> {
2509        let now = current_time_millis();
2510        let mut resolved_service = ResolvedService {
2511            ty_domain: ty_domain.to_string(),
2512            sub_ty_domain: None,
2513            fullname: fullname.to_string(),
2514            host: String::new(),
2515            port: 0,
2516            addresses: HashSet::new(),
2517            txt_properties: TxtProperties::new(),
2518        };
2519
2520        // Be sure setting `subtype` if available even when querying for the parent domain.
2521        if let Some(subtype) = self.cache.get_subtype(fullname) {
2522            trace!(
2523                "ty_domain: {} found subtype {} for instance: {}",
2524                ty_domain,
2525                subtype,
2526                fullname
2527            );
2528            if resolved_service.sub_ty_domain.is_none() {
2529                resolved_service.sub_ty_domain = Some(subtype.to_string());
2530            }
2531        }
2532
2533        // resolve SRV record
2534        if let Some(records) = self.cache.get_srv(fullname) {
2535            if let Some(answer) = records.iter().find(|r| !r.record.expires_soon(now)) {
2536                if let Some(dns_srv) = answer.record.any().downcast_ref::<DnsSrv>() {
2537                    resolved_service.host = dns_srv.host().to_string();
2538                    resolved_service.port = dns_srv.port();
2539                }
2540            }
2541        }
2542
2543        // resolve TXT record
2544        if let Some(records) = self.cache.get_txt(fullname) {
2545            if let Some(record) = records.iter().find(|r| !r.record.expires_soon(now)) {
2546                if let Some(dns_txt) = record.record.any().downcast_ref::<DnsTxt>() {
2547                    resolved_service.txt_properties = dns_txt.text().into();
2548                }
2549            }
2550        }
2551
2552        // resolve A and AAAA records
2553        if let Some(records) = self.cache.get_addr(&resolved_service.host) {
2554            for answer in records.iter() {
2555                if let Some(dns_a) = answer.record.any().downcast_ref::<DnsAddress>() {
2556                    if dns_a.expires_soon(now) {
2557                        trace!(
2558                            "Addr expired or expires soon: {}",
2559                            dns_a.address().to_ip_addr()
2560                        );
2561                    } else {
2562                        resolved_service.addresses.insert(dns_a.address());
2563                    }
2564                }
2565            }
2566        }
2567
2568        Ok(resolved_service)
2569    }
2570
2571    fn handle_poller_events(&mut self, events: &mio::Events) {
2572        for ev in events.iter() {
2573            trace!("event received with key {:?}", ev.token());
2574            if ev.token().0 == SIGNAL_SOCK_EVENT_KEY {
2575                // Drain signals as we will drain commands as well.
2576                self.signal_sock_drain();
2577
2578                if let Err(e) = self.poller.registry().reregister(
2579                    &mut self.signal_sock,
2580                    ev.token(),
2581                    mio::Interest::READABLE,
2582                ) {
2583                    debug!("failed to modify poller for signal socket: {}", e);
2584                }
2585                continue; // Next event.
2586            }
2587
2588            // Read until no more packets available.
2589            while self.handle_read(ev.token().0) {}
2590
2591            // we continue to monitor this socket.
2592            if ev.token().0 == IPV4_SOCK_EVENT_KEY {
2593                // Re-register the IPv4 socket for reading.
2594                if let Some(sock) = self.ipv4_sock.as_mut() {
2595                    if let Err(e) =
2596                        self.poller
2597                            .registry()
2598                            .reregister(sock, ev.token(), mio::Interest::READABLE)
2599                    {
2600                        debug!("modify poller for IPv4 socket: {}", e);
2601                    }
2602                }
2603            } else if ev.token().0 == IPV6_SOCK_EVENT_KEY {
2604                // Re-register the IPv6 socket for reading.
2605                if let Some(sock) = self.ipv6_sock.as_mut() {
2606                    if let Err(e) =
2607                        self.poller
2608                            .registry()
2609                            .reregister(sock, ev.token(), mio::Interest::READABLE)
2610                    {
2611                        debug!("modify poller for IPv6 socket: {}", e);
2612                    }
2613                }
2614            }
2615        }
2616    }
2617
2618    /// Deal with incoming response packets.  All answers
2619    /// are held in the cache, and listeners are notified.
2620    fn handle_response(&mut self, mut msg: DnsIncoming, if_index: u32) {
2621        let now = current_time_millis();
2622
2623        // remove records that are expired.
2624        let mut record_predicate = |record: &DnsRecordBox| {
2625            if !record.get_record().is_expired(now) {
2626                return true;
2627            }
2628
2629            debug!("record is expired, removing it from cache.");
2630            if self.cache.remove(record) {
2631                // for PTR records, send event to listeners
2632                if let Some(dns_ptr) = record.any().downcast_ref::<DnsPointer>() {
2633                    call_service_listener(
2634                        &self.service_queriers,
2635                        dns_ptr.get_name(),
2636                        ServiceEvent::ServiceRemoved(
2637                            dns_ptr.get_name().to_string(),
2638                            dns_ptr.alias().to_string(),
2639                        ),
2640                    );
2641                }
2642            }
2643            false
2644        };
2645        msg.answers_mut().retain(&mut record_predicate);
2646        msg.authorities_mut().retain(&mut record_predicate);
2647        msg.additionals_mut().retain(&mut record_predicate);
2648
2649        // check possible conflicts and handle them.
2650        self.conflict_handler(&msg, if_index);
2651
2652        // check if the message is for us.
2653        let mut is_for_us = true; // assume it is for us.
2654
2655        // If there are any PTR records in the answers, there should be
2656        // at least one PTR for us. Otherwise, the message is not for us.
2657        // If there are no PTR records at all, assume this message is for us.
2658        for answer in msg.answers() {
2659            if answer.get_type() == RRType::PTR {
2660                if self.service_queriers.contains_key(answer.get_name()) {
2661                    is_for_us = true;
2662                    break; // OK to break: at least one PTR for us.
2663                } else {
2664                    is_for_us = false;
2665                }
2666            } else if answer.get_type() == RRType::A || answer.get_type() == RRType::AAAA {
2667                // If there is a hostname querier for this address, then it is for us.
2668                let answer_lowercase = answer.get_name().to_lowercase();
2669                if self.hostname_resolvers.contains_key(&answer_lowercase) {
2670                    is_for_us = true;
2671                    break; // OK to break: at least one hostname for us.
2672                }
2673            }
2674        }
2675
2676        // if we explicitily want to accept unsolicited responses, we should consider all messages as for us.
2677        if self.accept_unsolicited {
2678            is_for_us = true;
2679        }
2680
2681        /// Represents a DNS record change that involves one service instance.
2682        struct InstanceChange {
2683            ty: RRType,   // The type of DNS record for the instance.
2684            name: String, // The name of the record.
2685        }
2686
2687        // Go through all answers to get the new and updated records.
2688        // For new PTR records, send out ServiceFound immediately. For others,
2689        // collect them into `changes`.
2690        //
2691        // Note: we don't try to identify the update instances based on
2692        // each record immediately as the answers are likely related to each
2693        // other.
2694        let mut changes = Vec::new();
2695        let mut timers = Vec::new();
2696        let Some(my_intf) = self.my_intfs.get(&if_index) else {
2697            return;
2698        };
2699        for record in msg.all_records() {
2700            match self
2701                .cache
2702                .add_or_update(my_intf, record, &mut timers, is_for_us)
2703            {
2704                Some((dns_record, true)) => {
2705                    timers.push(dns_record.record.get_record().get_expire_time());
2706                    timers.push(dns_record.record.get_record().get_refresh_time());
2707
2708                    let ty = dns_record.record.get_type();
2709                    let name = dns_record.record.get_name();
2710
2711                    // Only process PTR that does not expire soon (i.e. TTL > 1).
2712                    if ty == RRType::PTR && dns_record.record.get_record().get_ttl() > 1 {
2713                        if self.service_queriers.contains_key(name) {
2714                            timers.push(dns_record.record.get_record().get_refresh_time());
2715                        }
2716
2717                        // send ServiceFound
2718                        if let Some(dns_ptr) = dns_record.record.any().downcast_ref::<DnsPointer>()
2719                        {
2720                            debug!("calling listener with service found: {name}");
2721                            call_service_listener(
2722                                &self.service_queriers,
2723                                name,
2724                                ServiceEvent::ServiceFound(
2725                                    name.to_string(),
2726                                    dns_ptr.alias().to_string(),
2727                                ),
2728                            );
2729                            changes.push(InstanceChange {
2730                                ty,
2731                                name: dns_ptr.alias().to_string(),
2732                            });
2733                        }
2734                    } else {
2735                        changes.push(InstanceChange {
2736                            ty,
2737                            name: name.to_string(),
2738                        });
2739                    }
2740                }
2741                Some((dns_record, false)) => {
2742                    timers.push(dns_record.record.get_record().get_expire_time());
2743                    timers.push(dns_record.record.get_record().get_refresh_time());
2744                }
2745                _ => {}
2746            }
2747        }
2748
2749        // Add timers for the new records.
2750        for t in timers {
2751            self.add_timer(t);
2752        }
2753
2754        // Go through remaining changes to see if any hostname resolutions were found or updated.
2755        for change in changes
2756            .iter()
2757            .filter(|change| change.ty == RRType::A || change.ty == RRType::AAAA)
2758        {
2759            let addr_map = self.cache.get_addresses_for_host(&change.name);
2760            for (name, addresses) in addr_map {
2761                call_hostname_resolution_listener(
2762                    &self.hostname_resolvers,
2763                    &change.name,
2764                    HostnameResolutionEvent::AddressesFound(name, addresses),
2765                )
2766            }
2767        }
2768
2769        // Identify the instances that need to be "resolved".
2770        let mut updated_instances = HashSet::new();
2771        for update in changes {
2772            match update.ty {
2773                RRType::PTR | RRType::SRV | RRType::TXT => {
2774                    updated_instances.insert(update.name);
2775                }
2776                RRType::A | RRType::AAAA => {
2777                    let instances = self.cache.get_instances_on_host(&update.name);
2778                    updated_instances.extend(instances);
2779                }
2780                _ => {}
2781            }
2782        }
2783
2784        self.resolve_updated_instances(&updated_instances);
2785    }
2786
2787    fn conflict_handler(&mut self, msg: &DnsIncoming, if_index: u32) {
2788        let Some(my_intf) = self.my_intfs.get(&if_index) else {
2789            debug!("handle_response: no intf found for index {if_index}");
2790            return;
2791        };
2792
2793        let Some(dns_registry) = self.dns_registry_map.get_mut(&if_index) else {
2794            return;
2795        };
2796
2797        for answer in msg.answers().iter() {
2798            let mut new_records = Vec::new();
2799
2800            let name = answer.get_name();
2801            let Some(probe) = dns_registry.probing.get_mut(name) else {
2802                continue;
2803            };
2804
2805            // check against possible multicast forwarding
2806            if answer.get_type() == RRType::A || answer.get_type() == RRType::AAAA {
2807                if let Some(answer_addr) = answer.any().downcast_ref::<DnsAddress>() {
2808                    if answer_addr.interface_id.index != if_index {
2809                        debug!(
2810                            "conflict handler: answer addr {:?} not in the subnet of intf {}",
2811                            answer_addr, my_intf.name
2812                        );
2813                        continue;
2814                    }
2815                }
2816
2817                // double check if any other address record matches rrdata,
2818                // as there could be multiple addresses for the same name.
2819                let any_match = probe.records.iter().any(|r| {
2820                    r.get_type() == answer.get_type()
2821                        && r.get_class() == answer.get_class()
2822                        && r.rrdata_match(answer.as_ref())
2823                });
2824                if any_match {
2825                    continue; // no conflict for this answer.
2826                }
2827            }
2828
2829            probe.records.retain(|record| {
2830                if record.get_type() == answer.get_type()
2831                    && record.get_class() == answer.get_class()
2832                    && !record.rrdata_match(answer.as_ref())
2833                {
2834                    debug!(
2835                        "found conflict name: '{name}' record: {}: {} PEER: {}",
2836                        record.get_type(),
2837                        record.rdata_print(),
2838                        answer.rdata_print()
2839                    );
2840
2841                    // create a new name for this record
2842                    // then remove the old record in probing.
2843                    let mut new_record = record.clone();
2844                    let new_name = match record.get_type() {
2845                        RRType::A => hostname_change(name),
2846                        RRType::AAAA => hostname_change(name),
2847                        _ => name_change(name),
2848                    };
2849                    new_record.get_record_mut().set_new_name(new_name);
2850                    new_records.push(new_record);
2851                    return false; // old record is dropped from the probe.
2852                }
2853
2854                true
2855            });
2856
2857            // ?????
2858            // if probe.records.is_empty() {
2859            //     dns_registry.probing.remove(name);
2860            // }
2861
2862            // Probing again with the new names.
2863            let create_time = current_time_millis() + fastrand::u64(0..250);
2864
2865            let waiting_services = probe.waiting_services.clone();
2866
2867            for record in new_records {
2868                if dns_registry.update_hostname(name, record.get_name(), create_time) {
2869                    self.timers.push(Reverse(create_time));
2870                }
2871
2872                // remember the name changes (note: `name` might not be the original, it could be already changed once.)
2873                dns_registry.name_changes.insert(
2874                    record.get_record().get_original_name().to_string(),
2875                    record.get_name().to_string(),
2876                );
2877
2878                let new_probe = match dns_registry.probing.get_mut(record.get_name()) {
2879                    Some(p) => p,
2880                    None => {
2881                        let new_probe = dns_registry
2882                            .probing
2883                            .entry(record.get_name().to_string())
2884                            .or_insert_with(|| {
2885                                debug!("conflict handler: new probe of {}", record.get_name());
2886                                Probe::new(create_time)
2887                            });
2888                        self.timers.push(Reverse(new_probe.next_send));
2889                        new_probe
2890                    }
2891                };
2892
2893                debug!(
2894                    "insert record with new name '{}' {} into probe",
2895                    record.get_name(),
2896                    record.get_type()
2897                );
2898                new_probe.insert_record(record);
2899
2900                new_probe.waiting_services.extend(waiting_services.clone());
2901            }
2902        }
2903    }
2904
2905    /// Resolve the updated (including new) instances.
2906    ///
2907    /// Note: it is possible that more than 1 PTR pointing to the same
2908    /// instance. For example, a regular service type PTR and a sub-type
2909    /// service type PTR can both point to the same service instance.
2910    /// This loop automatically handles the sub-type PTRs.
2911    fn resolve_updated_instances(&mut self, updated_instances: &HashSet<String>) {
2912        let mut resolved: HashSet<String> = HashSet::new();
2913        let mut unresolved: HashSet<String> = HashSet::new();
2914        let mut removed_instances = HashMap::new();
2915
2916        let now = current_time_millis();
2917
2918        for (ty_domain, records) in self.cache.all_ptr().iter() {
2919            if !self.service_queriers.contains_key(ty_domain) {
2920                // No need to resolve if not in our queries.
2921                continue;
2922            }
2923
2924            for ptr in records.iter().filter(|r| !r.record.expires_soon(now)) {
2925                let Some(dns_ptr) = ptr.record.any().downcast_ref::<DnsPointer>() else {
2926                    continue;
2927                };
2928
2929                let instance = dns_ptr.alias();
2930                if !updated_instances.contains(instance) {
2931                    continue;
2932                }
2933
2934                let Ok(resolved_service) = self.resolve_service_from_cache(ty_domain, instance)
2935                else {
2936                    continue;
2937                };
2938
2939                debug!("resolve_updated_instances: from cache: {instance}");
2940                if resolved_service.is_valid() {
2941                    debug!("call queriers to resolve {instance}");
2942                    resolved.insert(instance.to_string());
2943                    let event = ServiceEvent::ServiceResolved(Box::new(resolved_service));
2944                    call_service_listener(&self.service_queriers, ty_domain, event);
2945                } else {
2946                    debug!("Resolved service is not valid: {instance}");
2947                    if self.resolved.remove(dns_ptr.alias()) {
2948                        removed_instances
2949                            .entry(ty_domain.to_string())
2950                            .or_insert_with(HashSet::new)
2951                            .insert(instance.to_string());
2952                    }
2953                    unresolved.insert(instance.to_string());
2954                }
2955            }
2956        }
2957
2958        for instance in resolved.drain() {
2959            self.pending_resolves.remove(&instance);
2960            self.resolved.insert(instance);
2961        }
2962
2963        for instance in unresolved.drain() {
2964            self.add_pending_resolve(instance);
2965        }
2966
2967        if !removed_instances.is_empty() {
2968            debug!(
2969                "resolve_updated_instances: removed {}",
2970                &removed_instances.len()
2971            );
2972            self.notify_service_removal(removed_instances);
2973        }
2974    }
2975
2976    /// Handle incoming query packets, figure out whether and what to respond.
2977    fn handle_query(&mut self, msg: DnsIncoming, if_index: u32, is_ipv4: bool) {
2978        let sock_opt = if is_ipv4 {
2979            &self.ipv4_sock
2980        } else {
2981            &self.ipv6_sock
2982        };
2983        let Some(sock) = sock_opt.as_ref() else {
2984            debug!("handle_query: socket not available for intf {}", if_index);
2985            return;
2986        };
2987        let mut out = DnsOutgoing::new(FLAGS_QR_RESPONSE | FLAGS_AA);
2988
2989        // Special meta-query "_services._dns-sd._udp.<Domain>".
2990        // See https://datatracker.ietf.org/doc/html/rfc6763#section-9
2991        const META_QUERY: &str = "_services._dns-sd._udp.local.";
2992
2993        let Some(dns_registry) = self.dns_registry_map.get_mut(&if_index) else {
2994            debug!("missing dns registry for intf {}", if_index);
2995            return;
2996        };
2997
2998        let Some(intf) = self.my_intfs.get(&if_index) else {
2999            debug!("handle_query: no intf found for index {if_index}");
3000            return;
3001        };
3002
3003        for question in msg.questions().iter() {
3004            let qtype = question.entry_type();
3005
3006            if qtype == RRType::PTR {
3007                for service in self.my_services.values() {
3008                    if service.get_status(if_index) != ServiceStatus::Announced {
3009                        continue;
3010                    }
3011
3012                    if question.entry_name() == service.get_type()
3013                        || service
3014                            .get_subtype()
3015                            .as_ref()
3016                            .is_some_and(|v| v == question.entry_name())
3017                    {
3018                        add_answer_with_additionals(
3019                            &mut out,
3020                            &msg,
3021                            service,
3022                            intf,
3023                            dns_registry,
3024                            is_ipv4,
3025                        );
3026                    } else if question.entry_name() == META_QUERY {
3027                        let ptr_added = out.add_answer(
3028                            &msg,
3029                            DnsPointer::new(
3030                                question.entry_name(),
3031                                RRType::PTR,
3032                                CLASS_IN,
3033                                service.get_other_ttl(),
3034                                service.get_type().to_string(),
3035                            ),
3036                        );
3037                        if !ptr_added {
3038                            trace!("answer was not added for meta-query {:?}", &question);
3039                        }
3040                    }
3041                }
3042            } else {
3043                // Simultaneous Probe Tiebreaking (RFC 6762 section 8.2)
3044                if qtype == RRType::ANY && msg.num_authorities() > 0 {
3045                    let probe_name = question.entry_name();
3046
3047                    if let Some(probe) = dns_registry.probing.get_mut(probe_name) {
3048                        let now = current_time_millis();
3049
3050                        // Only do tiebreaking if probe already started.
3051                        // This check also helps avoid redo tiebreaking if start time
3052                        // was postponed.
3053                        if probe.start_time < now {
3054                            let incoming_records: Vec<_> = msg
3055                                .authorities()
3056                                .iter()
3057                                .filter(|r| r.get_name() == probe_name)
3058                                .collect();
3059
3060                            probe.tiebreaking(&incoming_records, now, probe_name);
3061                        }
3062                    }
3063                }
3064
3065                if qtype == RRType::A || qtype == RRType::AAAA || qtype == RRType::ANY {
3066                    for service in self.my_services.values() {
3067                        if service.get_status(if_index) != ServiceStatus::Announced {
3068                            continue;
3069                        }
3070
3071                        let service_hostname =
3072                            match dns_registry.name_changes.get(service.get_hostname()) {
3073                                Some(new_name) => new_name,
3074                                None => service.get_hostname(),
3075                            };
3076
3077                        if service_hostname.to_lowercase() == question.entry_name().to_lowercase() {
3078                            let intf_addrs = if is_ipv4 {
3079                                service.get_addrs_on_my_intf_v4(intf)
3080                            } else {
3081                                service.get_addrs_on_my_intf_v6(intf)
3082                            };
3083                            if intf_addrs.is_empty()
3084                                && (qtype == RRType::A || qtype == RRType::AAAA)
3085                            {
3086                                let t = match qtype {
3087                                    RRType::A => "TYPE_A",
3088                                    RRType::AAAA => "TYPE_AAAA",
3089                                    _ => "invalid_type",
3090                                };
3091                                trace!(
3092                                    "Cannot find valid addrs for {} response on intf {:?}",
3093                                    t,
3094                                    &intf
3095                                );
3096                                return;
3097                            }
3098                            for address in intf_addrs {
3099                                out.add_answer(
3100                                    &msg,
3101                                    DnsAddress::new(
3102                                        service_hostname,
3103                                        ip_address_rr_type(&address),
3104                                        CLASS_IN | CLASS_CACHE_FLUSH,
3105                                        service.get_host_ttl(),
3106                                        address,
3107                                        intf.into(),
3108                                    ),
3109                                );
3110                            }
3111                        }
3112                    }
3113                }
3114
3115                let query_name = question.entry_name().to_lowercase();
3116                let service_opt = self
3117                    .my_services
3118                    .iter()
3119                    .find(|(k, _v)| {
3120                        let service_name = match dns_registry.name_changes.get(k.as_str()) {
3121                            Some(new_name) => new_name,
3122                            None => k,
3123                        };
3124                        service_name == &query_name
3125                    })
3126                    .map(|(_, v)| v);
3127
3128                let Some(service) = service_opt else {
3129                    continue;
3130                };
3131
3132                if service.get_status(if_index) != ServiceStatus::Announced {
3133                    continue;
3134                }
3135
3136                let intf_addrs = if is_ipv4 {
3137                    service.get_addrs_on_my_intf_v4(intf)
3138                } else {
3139                    service.get_addrs_on_my_intf_v6(intf)
3140                };
3141                if intf_addrs.is_empty() {
3142                    debug!(
3143                        "Cannot find valid addrs for TYPE_SRV response on intf {:?}",
3144                        &intf
3145                    );
3146                    continue;
3147                }
3148
3149                add_answer_of_service(
3150                    &mut out,
3151                    &msg,
3152                    question.entry_name(),
3153                    service,
3154                    qtype,
3155                    intf_addrs,
3156                );
3157            }
3158        }
3159
3160        if out.answers_count() > 0 {
3161            debug!("sending response on intf {}", &intf.name);
3162            out.set_id(msg.id());
3163            if let Err(InternalError::IntfAddrInvalid(intf_addr)) =
3164                send_dns_outgoing(&out, intf, &sock.pktinfo, self.port)
3165            {
3166                let invalid_intf_addr = HashSet::from([intf_addr]);
3167                let _ = self.send_cmd_to_self(Command::InvalidIntfAddrs(invalid_intf_addr));
3168            }
3169
3170            let if_name = intf.name.clone();
3171
3172            self.increase_counter(Counter::Respond, 1);
3173            self.notify_monitors(DaemonEvent::Respond(if_name));
3174        }
3175
3176        self.increase_counter(Counter::KnownAnswerSuppression, out.known_answer_count());
3177    }
3178
3179    /// Increases the value of `counter` by `count`.
3180    fn increase_counter(&mut self, counter: Counter, count: i64) {
3181        let key = counter.to_string();
3182        match self.counters.get_mut(&key) {
3183            Some(v) => *v += count,
3184            None => {
3185                self.counters.insert(key, count);
3186            }
3187        }
3188    }
3189
3190    /// Sets the value of `counter` to `count`.
3191    fn set_counter(&mut self, counter: Counter, count: i64) {
3192        let key = counter.to_string();
3193        self.counters.insert(key, count);
3194    }
3195
3196    fn signal_sock_drain(&self) {
3197        let mut signal_buf = [0; 1024];
3198
3199        // This recv is non-blocking as the socket is non-blocking.
3200        while let Ok(sz) = self.signal_sock.recv(&mut signal_buf) {
3201            trace!(
3202                "signal socket recvd: {}",
3203                String::from_utf8_lossy(&signal_buf[0..sz])
3204            );
3205        }
3206    }
3207
3208    fn add_retransmission(&mut self, next_time: u64, command: Command) {
3209        self.retransmissions.push(ReRun { next_time, command });
3210        self.add_timer(next_time);
3211    }
3212
3213    /// Sends service removal event to listeners for expired service records.
3214    /// `expired`: map of service type domain to set of instance names.
3215    fn notify_service_removal(&self, expired: HashMap<String, HashSet<String>>) {
3216        for (ty_domain, sender) in self.service_queriers.iter() {
3217            if let Some(instances) = expired.get(ty_domain) {
3218                for instance_name in instances {
3219                    let event = ServiceEvent::ServiceRemoved(
3220                        ty_domain.to_string(),
3221                        instance_name.to_string(),
3222                    );
3223                    match sender.send(event) {
3224                        Ok(()) => debug!("notify_service_removal: sent ServiceRemoved to listener of {ty_domain}: {instance_name}"),
3225                        Err(e) => debug!("Failed to send event: {}", e),
3226                    }
3227                }
3228            }
3229        }
3230    }
3231
3232    /// The entry point that executes all commands received by the daemon.
3233    ///
3234    /// `repeating`: whether this is a retransmission.
3235    fn exec_command(&mut self, command: Command, repeating: bool) {
3236        trace!("exec_command: {:?} repeating: {}", &command, repeating);
3237        match command {
3238            Command::Browse(ty, next_delay, cache_only, listener) => {
3239                self.exec_command_browse(repeating, ty, next_delay, cache_only, listener);
3240            }
3241
3242            Command::ResolveHostname(hostname, next_delay, listener, timeout) => {
3243                self.exec_command_resolve_hostname(
3244                    repeating, hostname, next_delay, listener, timeout,
3245                );
3246            }
3247
3248            Command::Register(service_info) => {
3249                self.register_service(*service_info);
3250                self.increase_counter(Counter::Register, 1);
3251            }
3252
3253            Command::RegisterResend(fullname, intf) => {
3254                trace!("register-resend service: {fullname} on {}", &intf);
3255                if let Err(InternalError::IntfAddrInvalid(intf_addr)) =
3256                    self.exec_command_register_resend(fullname, intf)
3257                {
3258                    let invalid_intf_addr = HashSet::from([intf_addr]);
3259                    let _ = self.send_cmd_to_self(Command::InvalidIntfAddrs(invalid_intf_addr));
3260                }
3261            }
3262
3263            Command::Unregister(fullname, resp_s) => {
3264                trace!("unregister service {} repeat {}", &fullname, &repeating);
3265                self.exec_command_unregister(repeating, fullname, resp_s);
3266            }
3267
3268            Command::UnregisterResend(packet, if_index, is_ipv4) => {
3269                self.exec_command_unregister_resend(packet, if_index, is_ipv4);
3270            }
3271
3272            Command::StopBrowse(ty_domain) => self.exec_command_stop_browse(ty_domain),
3273
3274            Command::StopResolveHostname(hostname) => {
3275                self.exec_command_stop_resolve_hostname(hostname.to_lowercase())
3276            }
3277
3278            Command::Resolve(instance, try_count) => self.exec_command_resolve(instance, try_count),
3279
3280            Command::GetMetrics(resp_s) => self.exec_command_get_metrics(resp_s),
3281
3282            Command::GetStatus(resp_s) => match resp_s.send(self.status.clone()) {
3283                Ok(()) => trace!("Sent status to the client"),
3284                Err(e) => debug!("Failed to send status: {}", e),
3285            },
3286
3287            Command::Monitor(resp_s) => {
3288                self.monitors.push(resp_s);
3289            }
3290
3291            Command::SetOption(daemon_opt) => {
3292                self.process_set_option(daemon_opt);
3293            }
3294
3295            Command::GetOption(resp_s) => {
3296                let val = DaemonOptionVal {
3297                    _service_name_len_max: self.service_name_len_max,
3298                    ip_check_interval: self.ip_check_interval,
3299                };
3300                if let Err(e) = resp_s.send(val) {
3301                    debug!("Failed to send options: {}", e);
3302                }
3303            }
3304
3305            Command::Verify(instance_fullname, timeout) => {
3306                self.exec_command_verify(instance_fullname, timeout, repeating);
3307            }
3308
3309            Command::InvalidIntfAddrs(invalid_intf_addrs) => {
3310                for intf_addr in invalid_intf_addrs {
3311                    self.del_interface_addr(&intf_addr);
3312                }
3313
3314                self.check_ip_changes();
3315            }
3316
3317            _ => {
3318                debug!("unexpected command: {:?}", &command);
3319            }
3320        }
3321    }
3322
3323    fn exec_command_get_metrics(&mut self, resp_s: Sender<HashMap<String, i64>>) {
3324        self.set_counter(Counter::CachedPTR, self.cache.ptr_count() as i64);
3325        self.set_counter(Counter::CachedSRV, self.cache.srv_count() as i64);
3326        self.set_counter(Counter::CachedAddr, self.cache.addr_count() as i64);
3327        self.set_counter(Counter::CachedTxt, self.cache.txt_count() as i64);
3328        self.set_counter(Counter::CachedNSec, self.cache.nsec_count() as i64);
3329        self.set_counter(Counter::CachedSubtype, self.cache.subtype_count() as i64);
3330        self.set_counter(Counter::Timer, self.timers.len() as i64);
3331
3332        let dns_registry_probe_count: usize = self
3333            .dns_registry_map
3334            .values()
3335            .map(|r| r.probing.len())
3336            .sum();
3337        self.set_counter(Counter::DnsRegistryProbe, dns_registry_probe_count as i64);
3338
3339        let dns_registry_active_count: usize = self
3340            .dns_registry_map
3341            .values()
3342            .map(|r| r.active.values().map(|a| a.len()).sum::<usize>())
3343            .sum();
3344        self.set_counter(Counter::DnsRegistryActive, dns_registry_active_count as i64);
3345
3346        let dns_registry_timer_count: usize = self
3347            .dns_registry_map
3348            .values()
3349            .map(|r| r.new_timers.len())
3350            .sum();
3351        self.set_counter(Counter::DnsRegistryTimer, dns_registry_timer_count as i64);
3352
3353        let dns_registry_name_change_count: usize = self
3354            .dns_registry_map
3355            .values()
3356            .map(|r| r.name_changes.len())
3357            .sum();
3358        self.set_counter(
3359            Counter::DnsRegistryNameChange,
3360            dns_registry_name_change_count as i64,
3361        );
3362
3363        // Send the metrics to the client.
3364        if let Err(e) = resp_s.send(self.counters.clone()) {
3365            debug!("Failed to send metrics: {}", e);
3366        }
3367    }
3368
3369    fn exec_command_browse(
3370        &mut self,
3371        repeating: bool,
3372        ty: String,
3373        next_delay: u32,
3374        cache_only: bool,
3375        listener: Sender<ServiceEvent>,
3376    ) {
3377        let pretty_addrs: Vec<String> = self
3378            .my_intfs
3379            .iter()
3380            .map(|(if_index, itf)| format!("{} ({if_index})", itf.name))
3381            .collect();
3382
3383        if let Err(e) = listener.send(ServiceEvent::SearchStarted(format!(
3384            "{ty} on {} interfaces [{}]",
3385            pretty_addrs.len(),
3386            pretty_addrs.join(", ")
3387        ))) {
3388            debug!(
3389                "Failed to send SearchStarted({})(repeating:{}): {}",
3390                &ty, repeating, e
3391            );
3392            return;
3393        }
3394
3395        let now = current_time_millis();
3396        if !repeating {
3397            // Binds a `listener` to querying mDNS domain type `ty`.
3398            //
3399            // If there is already a `listener`, it will be updated, i.e. overwritten.
3400            self.service_queriers.insert(ty.clone(), listener.clone());
3401
3402            // if we already have the records in our cache, just send them
3403            self.query_cache_for_service(&ty, &listener, now);
3404        }
3405
3406        if cache_only {
3407            // If cache_only is true, we do not send a query.
3408            match listener.send(ServiceEvent::SearchStopped(ty.clone())) {
3409                Ok(()) => debug!("SearchStopped sent for {}", &ty),
3410                Err(e) => debug!("Failed to send SearchStopped: {}", e),
3411            }
3412            return;
3413        }
3414
3415        self.send_query(&ty, RRType::PTR);
3416        self.increase_counter(Counter::Browse, 1);
3417
3418        let next_time = now + (next_delay * 1000) as u64;
3419        let max_delay = 60 * 60;
3420        let delay = cmp::min(next_delay * 2, max_delay);
3421        self.add_retransmission(next_time, Command::Browse(ty, delay, cache_only, listener));
3422    }
3423
3424    fn exec_command_resolve_hostname(
3425        &mut self,
3426        repeating: bool,
3427        hostname: String,
3428        next_delay: u32,
3429        listener: Sender<HostnameResolutionEvent>,
3430        timeout: Option<u64>,
3431    ) {
3432        let addr_list: Vec<_> = self.my_intfs.iter().collect();
3433        if let Err(e) = listener.send(HostnameResolutionEvent::SearchStarted(format!(
3434            "{} on addrs {:?}",
3435            &hostname, &addr_list
3436        ))) {
3437            debug!(
3438                "Failed to send ResolveStarted({})(repeating:{}): {}",
3439                &hostname, repeating, e
3440            );
3441            return;
3442        }
3443        if !repeating {
3444            self.add_hostname_resolver(hostname.to_owned(), listener.clone(), timeout);
3445            // if we already have the records in our cache, just send them
3446            self.query_cache_for_hostname(&hostname, listener.clone());
3447        }
3448
3449        self.send_query_vec(&[(&hostname, RRType::A), (&hostname, RRType::AAAA)]);
3450        self.increase_counter(Counter::ResolveHostname, 1);
3451
3452        let now = current_time_millis();
3453        let next_time = now + u64::from(next_delay) * 1000;
3454        let max_delay = 60 * 60;
3455        let delay = cmp::min(next_delay * 2, max_delay);
3456
3457        // Only add retransmission if it does not exceed the hostname resolver timeout, if any.
3458        if self
3459            .hostname_resolvers
3460            .get(&hostname)
3461            .and_then(|(_sender, timeout)| *timeout)
3462            .map(|timeout| next_time < timeout)
3463            .unwrap_or(true)
3464        {
3465            self.add_retransmission(
3466                next_time,
3467                Command::ResolveHostname(hostname, delay, listener, None),
3468            );
3469        }
3470    }
3471
3472    fn exec_command_resolve(&mut self, instance: String, try_count: u16) {
3473        let pending_query = self.query_unresolved(&instance);
3474        let max_try = 3;
3475        if pending_query && try_count < max_try {
3476            // Note that if the current try already succeeds, the next retransmission
3477            // will be no-op as the cache has been updated.
3478            let next_time = current_time_millis() + RESOLVE_WAIT_IN_MILLIS;
3479            self.add_retransmission(next_time, Command::Resolve(instance, try_count + 1));
3480        }
3481    }
3482
3483    fn exec_command_unregister(
3484        &mut self,
3485        repeating: bool,
3486        fullname: String,
3487        resp_s: Sender<UnregisterStatus>,
3488    ) {
3489        let response = match self.my_services.remove_entry(&fullname) {
3490            None => {
3491                debug!("unregister: cannot find such service {}", &fullname);
3492                UnregisterStatus::NotFound
3493            }
3494            Some((_k, info)) => {
3495                let mut timers = Vec::new();
3496
3497                for (if_index, intf) in self.my_intfs.iter() {
3498                    if let Some(sock) = self.ipv4_sock.as_ref() {
3499                        let packet = self.unregister_service(&info, intf, &sock.pktinfo);
3500                        // repeat for one time just in case some peers miss the message
3501                        if !repeating && !packet.is_empty() {
3502                            let next_time = current_time_millis() + 120;
3503                            self.retransmissions.push(ReRun {
3504                                next_time,
3505                                command: Command::UnregisterResend(packet, *if_index, true),
3506                            });
3507                            timers.push(next_time);
3508                        }
3509                    }
3510
3511                    // ipv6
3512                    if let Some(sock) = self.ipv6_sock.as_ref() {
3513                        let packet = self.unregister_service(&info, intf, &sock.pktinfo);
3514                        if !repeating && !packet.is_empty() {
3515                            let next_time = current_time_millis() + 120;
3516                            self.retransmissions.push(ReRun {
3517                                next_time,
3518                                command: Command::UnregisterResend(packet, *if_index, false),
3519                            });
3520                            timers.push(next_time);
3521                        }
3522                    }
3523                }
3524
3525                for t in timers {
3526                    self.add_timer(t);
3527                }
3528
3529                self.increase_counter(Counter::Unregister, 1);
3530                UnregisterStatus::OK
3531            }
3532        };
3533        if let Err(e) = resp_s.send(response) {
3534            debug!("unregister: failed to send response: {}", e);
3535        }
3536    }
3537
3538    fn exec_command_unregister_resend(&mut self, packet: Vec<u8>, if_index: u32, is_ipv4: bool) {
3539        let Some(intf) = self.my_intfs.get(&if_index) else {
3540            return;
3541        };
3542        let sock_opt = if is_ipv4 {
3543            &self.ipv4_sock
3544        } else {
3545            &self.ipv6_sock
3546        };
3547        let Some(sock) = sock_opt else {
3548            return;
3549        };
3550
3551        let if_addr = if is_ipv4 {
3552            match intf.next_ifaddr_v4() {
3553                Some(addr) => addr,
3554                None => return,
3555            }
3556        } else {
3557            match intf.next_ifaddr_v6() {
3558                Some(addr) => addr,
3559                None => return,
3560            }
3561        };
3562
3563        debug!("UnregisterResend from {:?}", if_addr);
3564        multicast_on_intf(
3565            &packet[..],
3566            &intf.name,
3567            intf.index,
3568            if_addr,
3569            &sock.pktinfo,
3570            self.port,
3571        );
3572
3573        self.increase_counter(Counter::UnregisterResend, 1);
3574    }
3575
3576    fn exec_command_stop_browse(&mut self, ty_domain: String) {
3577        match self.service_queriers.remove_entry(&ty_domain) {
3578            None => debug!("StopBrowse: cannot find querier for {}", &ty_domain),
3579            Some((ty, sender)) => {
3580                // Remove pending browse commands in the reruns.
3581                trace!("StopBrowse: removed queryer for {}", &ty);
3582                let mut i = 0;
3583                while i < self.retransmissions.len() {
3584                    if let Command::Browse(t, _, _, _) = &self.retransmissions[i].command {
3585                        if t == &ty {
3586                            self.retransmissions.remove(i);
3587                            trace!("StopBrowse: removed retransmission for {}", &ty);
3588                            continue;
3589                        }
3590                    }
3591                    i += 1;
3592                }
3593
3594                // Remove cache entries.
3595                self.cache.remove_service_type(&ty_domain);
3596
3597                // Notify the client.
3598                match sender.send(ServiceEvent::SearchStopped(ty_domain)) {
3599                    Ok(()) => trace!("Sent SearchStopped to the listener"),
3600                    Err(e) => debug!("Failed to send SearchStopped: {}", e),
3601                }
3602            }
3603        }
3604    }
3605
3606    fn exec_command_stop_resolve_hostname(&mut self, hostname: String) {
3607        if let Some((host, (sender, _timeout))) = self.hostname_resolvers.remove_entry(&hostname) {
3608            // Remove pending resolve commands in the reruns.
3609            trace!("StopResolve: removed queryer for {}", &host);
3610            let mut i = 0;
3611            while i < self.retransmissions.len() {
3612                if let Command::Resolve(t, _) = &self.retransmissions[i].command {
3613                    if t == &host {
3614                        self.retransmissions.remove(i);
3615                        trace!("StopResolve: removed retransmission for {}", &host);
3616                        continue;
3617                    }
3618                }
3619                i += 1;
3620            }
3621
3622            // Notify the client.
3623            match sender.send(HostnameResolutionEvent::SearchStopped(hostname)) {
3624                Ok(()) => trace!("Sent SearchStopped to the listener"),
3625                Err(e) => debug!("Failed to send SearchStopped: {}", e),
3626            }
3627        }
3628    }
3629
3630    fn exec_command_register_resend(&mut self, fullname: String, if_index: u32) -> MyResult<()> {
3631        let Some(info) = self.my_services.get_mut(&fullname) else {
3632            trace!("announce: cannot find such service {}", &fullname);
3633            return Ok(());
3634        };
3635
3636        let Some(dns_registry) = self.dns_registry_map.get_mut(&if_index) else {
3637            return Ok(());
3638        };
3639
3640        let Some(intf) = self.my_intfs.get(&if_index) else {
3641            return Ok(());
3642        };
3643
3644        let announced_v4 = if let Some(sock) = self.ipv4_sock.as_ref() {
3645            announce_service_on_intf(dns_registry, info, intf, &sock.pktinfo, self.port)?
3646        } else {
3647            false
3648        };
3649        let announced_v6 = if let Some(sock) = self.ipv6_sock.as_ref() {
3650            announce_service_on_intf(dns_registry, info, intf, &sock.pktinfo, self.port)?
3651        } else {
3652            false
3653        };
3654
3655        if announced_v4 || announced_v6 {
3656            let mut hostname = info.get_hostname();
3657            if let Some(new_name) = dns_registry.name_changes.get(hostname) {
3658                hostname = new_name;
3659            }
3660            let service_name = match dns_registry.name_changes.get(&fullname) {
3661                Some(new_name) => new_name.to_string(),
3662                None => fullname,
3663            };
3664
3665            debug!("resend: announce service {service_name} on {}", intf.name);
3666
3667            notify_monitors(
3668                &mut self.monitors,
3669                DaemonEvent::Announce(service_name, format!("{}:{}", hostname, &intf.name)),
3670            );
3671            info.set_status(if_index, ServiceStatus::Announced);
3672        } else {
3673            debug!("register-resend should not fail");
3674        }
3675
3676        self.increase_counter(Counter::RegisterResend, 1);
3677        Ok(())
3678    }
3679
3680    fn exec_command_verify(&mut self, instance: String, timeout: Duration, repeating: bool) {
3681        /*
3682        RFC 6762 section 10.4:
3683        ...
3684        When the cache receives this hint that it should reconfirm some
3685        record, it MUST issue two or more queries for the resource record in
3686        dispute.  If no response is received within ten seconds, then, even
3687        though its TTL may indicate that it is not yet due to expire, that
3688        record SHOULD be promptly flushed from the cache.
3689        */
3690        let now = current_time_millis();
3691        let expire_at = if repeating {
3692            None
3693        } else {
3694            Some(now + timeout.as_millis() as u64)
3695        };
3696
3697        // send query for the resource records.
3698        let record_vec = self.cache.service_verify_queries(&instance, expire_at);
3699
3700        if !record_vec.is_empty() {
3701            let query_vec: Vec<(&str, RRType)> = record_vec
3702                .iter()
3703                .map(|(record, rr_type)| (record.as_str(), *rr_type))
3704                .collect();
3705            self.send_query_vec(&query_vec);
3706
3707            if let Some(new_expire) = expire_at {
3708                self.add_timer(new_expire); // ensure a check for the new expire time.
3709
3710                // schedule a resend 1 second later
3711                self.add_retransmission(now + 1000, Command::Verify(instance, timeout));
3712            }
3713        }
3714    }
3715
3716    /// Refresh cached service records with active queriers
3717    fn refresh_active_services(&mut self) {
3718        let mut query_ptr_count = 0;
3719        let mut query_srv_count = 0;
3720        let mut new_timers = HashSet::new();
3721        let mut query_addr_count = 0;
3722
3723        for (ty_domain, _sender) in self.service_queriers.iter() {
3724            let refreshed_timers = self.cache.refresh_due_ptr(ty_domain);
3725            if !refreshed_timers.is_empty() {
3726                trace!("sending refresh query for PTR: {}", ty_domain);
3727                self.send_query(ty_domain, RRType::PTR);
3728                query_ptr_count += 1;
3729                new_timers.extend(refreshed_timers);
3730            }
3731
3732            let (instances, timers) = self.cache.refresh_due_srv_txt(ty_domain);
3733            for (instance, types) in instances {
3734                trace!("sending refresh query for: {}", &instance);
3735                let query_vec = types
3736                    .into_iter()
3737                    .map(|ty| (instance.as_str(), ty))
3738                    .collect::<Vec<_>>();
3739                self.send_query_vec(&query_vec);
3740                query_srv_count += 1;
3741            }
3742            new_timers.extend(timers);
3743            let (hostnames, timers) = self.cache.refresh_due_hosts(ty_domain);
3744            for hostname in hostnames.iter() {
3745                trace!("sending refresh queries for A and AAAA:  {}", hostname);
3746                self.send_query_vec(&[(hostname, RRType::A), (hostname, RRType::AAAA)]);
3747                query_addr_count += 2;
3748            }
3749            new_timers.extend(timers);
3750        }
3751
3752        for timer in new_timers {
3753            self.add_timer(timer);
3754        }
3755
3756        self.increase_counter(Counter::CacheRefreshPTR, query_ptr_count);
3757        self.increase_counter(Counter::CacheRefreshSrvTxt, query_srv_count);
3758        self.increase_counter(Counter::CacheRefreshAddr, query_addr_count);
3759    }
3760}
3761
3762/// Adds one or more answers of a service for incoming msg and RR entry name.
3763fn add_answer_of_service(
3764    out: &mut DnsOutgoing,
3765    msg: &DnsIncoming,
3766    entry_name: &str,
3767    service: &ServiceInfo,
3768    qtype: RRType,
3769    intf_addrs: Vec<IpAddr>,
3770) {
3771    if qtype == RRType::SRV || qtype == RRType::ANY {
3772        out.add_answer(
3773            msg,
3774            DnsSrv::new(
3775                entry_name,
3776                CLASS_IN | CLASS_CACHE_FLUSH,
3777                service.get_host_ttl(),
3778                service.get_priority(),
3779                service.get_weight(),
3780                service.get_port(),
3781                service.get_hostname().to_string(),
3782            ),
3783        );
3784    }
3785
3786    if qtype == RRType::TXT || qtype == RRType::ANY {
3787        out.add_answer(
3788            msg,
3789            DnsTxt::new(
3790                entry_name,
3791                CLASS_IN | CLASS_CACHE_FLUSH,
3792                service.get_other_ttl(),
3793                service.generate_txt(),
3794            ),
3795        );
3796    }
3797
3798    if qtype == RRType::SRV {
3799        for address in intf_addrs {
3800            out.add_additional_answer(DnsAddress::new(
3801                service.get_hostname(),
3802                ip_address_rr_type(&address),
3803                CLASS_IN | CLASS_CACHE_FLUSH,
3804                service.get_host_ttl(),
3805                address,
3806                InterfaceId::default(),
3807            ));
3808        }
3809    }
3810}
3811
/// All possible events sent to the client from the daemon
/// regarding service discovery.
///
/// The enum is `#[non_exhaustive]`: matches in client code need a wildcard
/// arm because more variants may be added.
#[derive(Clone, Debug)]
#[non_exhaustive]
pub enum ServiceEvent {
    /// Started searching for a service type.
    ///
    /// The string is a human-readable description that includes the service
    /// type and the interfaces being searched.
    SearchStarted(String),

    /// Found a specific (service_type, fullname).
    ServiceFound(String, String),

    /// Resolved a service instance; the details are in the boxed
    /// [`ResolvedService`].
    ServiceResolved(Box<ResolvedService>),

    /// A service instance (service_type, fullname) was removed.
    ServiceRemoved(String, String),

    /// Stopped searching for a service type; the string is that service type.
    SearchStopped(String),
}
3832
/// All possible events sent to the client from the daemon
/// regarding host resolution.
///
/// The enum is `#[non_exhaustive]`: matches in client code need a wildcard
/// arm because more variants may be added.
#[derive(Clone, Debug)]
#[non_exhaustive]
pub enum HostnameResolutionEvent {
    /// Started searching for the IP addresses of a hostname.
    ///
    /// The string is a human-readable description that includes the hostname.
    SearchStarted(String),
    /// One or more addresses for a hostname have been found.
    AddressesFound(String, HashSet<ScopedIp>),
    /// One or more addresses for a hostname have been removed.
    AddressesRemoved(String, HashSet<ScopedIp>),
    /// The search for the IP addresses of a hostname has timed out.
    SearchTimeout(String),
    /// Stopped searching for the IP addresses of a hostname; the string is
    /// that hostname.
    SearchStopped(String),
}
3849
3850/// Some notable events from the daemon besides [`ServiceEvent`].
3851/// These events are expected to happen infrequently.
3852#[derive(Clone, Debug)]
3853#[non_exhaustive]
3854pub enum DaemonEvent {
3855    /// Daemon unsolicitly announced a service from an interface.
3856    Announce(String, String),
3857
3858    /// Daemon encountered an error.
3859    Error(Error),
3860
3861    /// Daemon detected a new IP address from the host.
3862    IpAdd(IpAddr),
3863
3864    /// Daemon detected a IP address removed from the host.
3865    IpDel(IpAddr),
3866
3867    /// Daemon resolved a name conflict by changing one of its names.
3868    /// see [DnsNameChange] for more details.
3869    NameChange(DnsNameChange),
3870
3871    /// Send out a multicast response via an interface.
3872    Respond(String),
3873}
3874
3875/// Represents a name change due to a name conflict resolution.
3876/// See [RFC 6762 section 9](https://datatracker.ietf.org/doc/html/rfc6762#section-9)
3877#[derive(Clone, Debug)]
3878pub struct DnsNameChange {
3879    /// The original name set in `ServiceInfo` by the user.
3880    pub original: String,
3881
3882    /// A new name is created by appending a suffix after the original name.
3883    ///
3884    /// - for a service instance name, the suffix is `(N)`, where N starts at 2.
3885    /// - for a host name, the suffix is `-N`, where N starts at 2.
3886    ///
3887    /// For example:
3888    ///
3889    /// - Service name `foo._service-type._udp` becomes `foo (2)._service-type._udp`
3890    /// - Host name `foo.local.` becomes `foo-2.local.`
3891    pub new_name: String,
3892
3893    /// The resource record type
3894    pub rr_type: RRType,
3895
3896    /// The interface where the name conflict and its change happened.
3897    pub intf_name: String,
3898}
3899
3900/// Commands supported by the daemon
3901#[derive(Debug)]
3902enum Command {
3903    /// Browsing for a service type (ty_domain, next_time_delay_in_seconds, channel::sender)
3904    Browse(String, u32, bool, Sender<ServiceEvent>),
3905
3906    /// Resolve a hostname to IP addresses.
3907    ResolveHostname(String, u32, Sender<HostnameResolutionEvent>, Option<u64>), // (hostname, next_time_delay_in_seconds, sender, timeout_in_milliseconds)
3908
3909    /// Register a service
3910    Register(Box<ServiceInfo>),
3911
3912    /// Unregister a service
3913    Unregister(String, Sender<UnregisterStatus>), // (fullname)
3914
3915    /// Announce again a service to local network
3916    RegisterResend(String, u32), // (fullname)
3917
3918    /// Resend unregister packet.
3919    UnregisterResend(Vec<u8>, u32, bool), // (packet content, if_index, is_ipv4)
3920
3921    /// Stop browsing a service type
3922    StopBrowse(String), // (ty_domain)
3923
3924    /// Stop resolving a hostname
3925    StopResolveHostname(String), // (hostname)
3926
3927    /// Send query to resolve a service instance.
3928    /// This is used when a PTR record exists but SRV & TXT records are missing.
3929    Resolve(String, u16), // (service_instance_fullname, try_count)
3930
3931    /// Read the current values of the counters
3932    GetMetrics(Sender<Metrics>),
3933
3934    /// Get the current status of the daemon.
3935    GetStatus(Sender<DaemonStatus>),
3936
3937    /// Monitor noticeable events in the daemon.
3938    Monitor(Sender<DaemonEvent>),
3939
3940    SetOption(DaemonOption),
3941
3942    GetOption(Sender<DaemonOptionVal>),
3943
3944    /// Proactively confirm a DNS resource record.
3945    ///
3946    /// The intention is to check if a service name or IP address still valid
3947    /// before its TTL expires.
3948    Verify(String, Duration),
3949
3950    /// Invalidate some interface addresses.
3951    InvalidIntfAddrs(HashSet<Interface>),
3952
3953    Exit(Sender<DaemonStatus>),
3954}
3955
3956impl fmt::Display for Command {
3957    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
3958        match self {
3959            Self::Browse(_, _, _, _) => write!(f, "Command Browse"),
3960            Self::ResolveHostname(_, _, _, _) => write!(f, "Command ResolveHostname"),
3961            Self::Exit(_) => write!(f, "Command Exit"),
3962            Self::GetStatus(_) => write!(f, "Command GetStatus"),
3963            Self::GetMetrics(_) => write!(f, "Command GetMetrics"),
3964            Self::Monitor(_) => write!(f, "Command Monitor"),
3965            Self::Register(_) => write!(f, "Command Register"),
3966            Self::RegisterResend(_, _) => write!(f, "Command RegisterResend"),
3967            Self::SetOption(_) => write!(f, "Command SetOption"),
3968            Self::GetOption(_) => write!(f, "Command GetOption"),
3969            Self::StopBrowse(_) => write!(f, "Command StopBrowse"),
3970            Self::StopResolveHostname(_) => write!(f, "Command StopResolveHostname"),
3971            Self::Unregister(_, _) => write!(f, "Command Unregister"),
3972            Self::UnregisterResend(_, _, _) => write!(f, "Command UnregisterResend"),
3973            Self::Resolve(_, _) => write!(f, "Command Resolve"),
3974            Self::Verify(_, _) => write!(f, "Command VerifyResource"),
3975            Self::InvalidIntfAddrs(_) => write!(f, "Command InvalidIntfAddrs"),
3976        }
3977    }
3978}
3979
/// Snapshot of daemon option values, sent back in reply to `Command::GetOption`.
struct DaemonOptionVal {
    /// Presumably the value set via `DaemonOption::ServiceNameLenMax` - confirm.
    /// The leading underscore keeps the compiler quiet about a field that is
    /// written but never read.
    _service_name_len_max: u8,
    /// Presumably the value set via `DaemonOption::IpCheckInterval`; the unit
    /// is not visible here - confirm.
    ip_check_interval: u64,
}
3984
3985#[derive(Debug)]
3986enum DaemonOption {
3987    ServiceNameLenMax(u8),
3988    IpCheckInterval(u64),
3989    EnableInterface(Vec<IfKind>),
3990    DisableInterface(Vec<IfKind>),
3991    MulticastLoopV4(bool),
3992    MulticastLoopV6(bool),
3993    AcceptUnsolicited(bool),
3994    IncludeAppleP2P(bool),
3995    #[cfg(test)]
3996    TestDownInterface(String),
3997    #[cfg(test)]
3998    TestUpInterface(String),
3999}
4000
/// Byte length of the protocol-and-domain suffix this library supports.
///
/// Both supported suffixes, `._tcp.local.` and `._udp.local.`, have this length.
const DOMAIN_LEN: usize = "._tcp.local.".len();
4003
4004/// Validate the length of "service_name" in a "_<service_name>.<domain_name>." string.
4005fn check_service_name_length(ty_domain: &str, limit: u8) -> Result<()> {
4006    if ty_domain.len() <= DOMAIN_LEN + 1 {
4007        // service name cannot be empty or only '_'.
4008        return Err(e_fmt!("Service type name cannot be empty: {}", ty_domain));
4009    }
4010
4011    let service_name_len = ty_domain.len() - DOMAIN_LEN - 1; // exclude the leading `_`
4012    if service_name_len > limit as usize {
4013        return Err(e_fmt!("Service name length must be <= {} bytes", limit));
4014    }
4015    Ok(())
4016}
4017
4018/// Checks if `name` ends with a valid domain: '._tcp.local.' or '._udp.local.'
4019fn check_domain_suffix(name: &str) -> Result<()> {
4020    if !(name.ends_with("._tcp.local.") || name.ends_with("._udp.local.")) {
4021        return Err(e_fmt!(
4022            "mDNS service {} must end with '._tcp.local.' or '._udp.local.'",
4023            name
4024        ));
4025    }
4026
4027    Ok(())
4028}
4029
4030/// Validate the service name in a fully qualified name.
4031///
4032/// A Full Name = <Instance>.<Service>.<Domain>
4033/// The only `<Domain>` supported are "._tcp.local." and "._udp.local.".
4034///
4035/// Note: this function does not check for the length of the service name.
4036/// Instead, `register_service` method will check the length.
4037fn check_service_name(fullname: &str) -> Result<()> {
4038    check_domain_suffix(fullname)?;
4039
4040    let remaining: Vec<&str> = fullname[..fullname.len() - DOMAIN_LEN].split('.').collect();
4041    let name = remaining.last().ok_or_else(|| e_fmt!("No service name"))?;
4042
4043    if &name[0..1] != "_" {
4044        return Err(e_fmt!("Service name must start with '_'"));
4045    }
4046
4047    let name = &name[1..];
4048
4049    if name.contains("--") {
4050        return Err(e_fmt!("Service name must not contain '--'"));
4051    }
4052
4053    if name.starts_with('-') || name.ends_with('-') {
4054        return Err(e_fmt!("Service name (%s) may not start or end with '-'"));
4055    }
4056
4057    let ascii_count = name.chars().filter(|c| c.is_ascii_alphabetic()).count();
4058    if ascii_count < 1 {
4059        return Err(e_fmt!(
4060            "Service name must contain at least one letter (eg: 'A-Za-z')"
4061        ));
4062    }
4063
4064    Ok(())
4065}
4066
4067/// Validate a hostname.
4068fn check_hostname(hostname: &str) -> Result<()> {
4069    if !hostname.ends_with(".local.") {
4070        return Err(e_fmt!("Hostname must end with '.local.': {hostname}"));
4071    }
4072
4073    if hostname == ".local." {
4074        return Err(e_fmt!(
4075            "The part of the hostname before '.local.' cannot be empty"
4076        ));
4077    }
4078
4079    if hostname.len() > 255 {
4080        return Err(e_fmt!("Hostname length must be <= 255 bytes"));
4081    }
4082
4083    Ok(())
4084}
4085
4086fn call_service_listener(
4087    listeners_map: &HashMap<String, Sender<ServiceEvent>>,
4088    ty_domain: &str,
4089    event: ServiceEvent,
4090) {
4091    if let Some(listener) = listeners_map.get(ty_domain) {
4092        match listener.send(event) {
4093            Ok(()) => trace!("Sent event to listener successfully"),
4094            Err(e) => debug!("Failed to send event: {}", e),
4095        }
4096    }
4097}
4098
4099fn call_hostname_resolution_listener(
4100    listeners_map: &HashMap<String, (Sender<HostnameResolutionEvent>, Option<u64>)>,
4101    hostname: &str,
4102    event: HostnameResolutionEvent,
4103) {
4104    let hostname_lower = hostname.to_lowercase();
4105    if let Some(listener) = listeners_map.get(&hostname_lower).map(|(l, _)| l) {
4106        match listener.send(event) {
4107            Ok(()) => trace!("Sent event to listener successfully"),
4108            Err(e) => debug!("Failed to send event: {}", e),
4109        }
4110    }
4111}
4112
4113/// Returns valid network interfaces in the host system.
4114/// Operational down interfaces are excluded.
4115/// Loopback interfaces are excluded if `with_loopback` is false.
4116fn my_ip_interfaces(with_loopback: bool) -> Vec<Interface> {
4117    my_ip_interfaces_inner(with_loopback, false)
4118}
4119
4120fn my_ip_interfaces_inner(with_loopback: bool, with_apple_p2p: bool) -> Vec<Interface> {
4121    if_addrs::get_if_addrs()
4122        .unwrap_or_default()
4123        .into_iter()
4124        .filter(|i| {
4125            i.is_oper_up()
4126                && !i.is_p2p()
4127                && (!i.is_loopback() || with_loopback)
4128                && (with_apple_p2p || !is_apple_p2p_by_name(&i.name))
4129        })
4130        .collect()
4131}
4132
/// Tells whether an interface name looks like an Apple peer-to-peer interface
/// (it starts with `awdl` or `llw`). Such interfaces are ignored by default.
fn is_apple_p2p_by_name(name: &str) -> bool {
    name.starts_with("awdl") || name.starts_with("llw")
}
4139
4140/// Send an outgoing mDNS query or response, and returns the packet bytes.
4141/// Returns empty vec if no valid interface address is found.
4142fn send_dns_outgoing(
4143    out: &DnsOutgoing,
4144    my_intf: &MyIntf,
4145    sock: &PktInfoUdpSocket,
4146    port: u16,
4147) -> MyResult<Vec<Vec<u8>>> {
4148    let if_name = &my_intf.name;
4149
4150    let if_addr = if sock.domain() == Domain::IPV4 {
4151        match my_intf.next_ifaddr_v4() {
4152            Some(addr) => addr,
4153            None => return Ok(vec![]),
4154        }
4155    } else {
4156        match my_intf.next_ifaddr_v6() {
4157            Some(addr) => addr,
4158            None => return Ok(vec![]),
4159        }
4160    };
4161
4162    send_dns_outgoing_impl(out, if_name, my_intf.index, if_addr, sock, port)
4163}
4164
4165/// Send an outgoing mDNS query or response, and returns the packet bytes.
4166fn send_dns_outgoing_impl(
4167    out: &DnsOutgoing,
4168    if_name: &str,
4169    if_index: u32,
4170    if_addr: &IfAddr,
4171    sock: &PktInfoUdpSocket,
4172    port: u16,
4173) -> MyResult<Vec<Vec<u8>>> {
4174    let qtype = if out.is_query() {
4175        "query"
4176    } else {
4177        if out.answers_count() == 0 && out.additionals().is_empty() {
4178            return Ok(vec![]); // no need to send empty response
4179        }
4180        "response"
4181    };
4182    trace!(
4183        "send {}: {} questions {} answers {} authorities {} additional",
4184        qtype,
4185        out.questions().len(),
4186        out.answers_count(),
4187        out.authorities().len(),
4188        out.additionals().len()
4189    );
4190
4191    match if_addr.ip() {
4192        IpAddr::V4(ipv4) => {
4193            if let Err(e) = sock.set_multicast_if_v4(&ipv4) {
4194                debug!(
4195                    "send_dns_outgoing: failed to set multicast interface for IPv4 {}: {}",
4196                    ipv4, e
4197                );
4198                // cannot send without a valid interface
4199                if e.kind() == std::io::ErrorKind::AddrNotAvailable {
4200                    let intf_addr = Interface {
4201                        name: if_name.to_string(),
4202                        addr: if_addr.clone(),
4203                        index: Some(if_index),
4204                        oper_status: if_addrs::IfOperStatus::Down,
4205                        is_p2p: false,
4206                        #[cfg(windows)]
4207                        adapter_name: String::new(),
4208                    };
4209                    return Err(InternalError::IntfAddrInvalid(intf_addr));
4210                }
4211                return Ok(vec![]); // non-fatal other failure
4212            }
4213        }
4214        IpAddr::V6(ipv6) => {
4215            if let Err(e) = sock.set_multicast_if_v6(if_index) {
4216                debug!(
4217                    "send_dns_outgoing: failed to set multicast interface for IPv6 {}: {}",
4218                    ipv6, e
4219                );
4220                // cannot send without a valid interface
4221                if e.kind() == std::io::ErrorKind::AddrNotAvailable {
4222                    let intf_addr = Interface {
4223                        name: if_name.to_string(),
4224                        addr: if_addr.clone(),
4225                        index: Some(if_index),
4226                        oper_status: if_addrs::IfOperStatus::Down,
4227                        is_p2p: false,
4228                        #[cfg(windows)]
4229                        adapter_name: String::new(),
4230                    };
4231                    return Err(InternalError::IntfAddrInvalid(intf_addr));
4232                }
4233                return Ok(vec![]); // non-fatal other failure
4234            }
4235        }
4236    }
4237
4238    let packet_list = out.to_data_on_wire();
4239    for packet in packet_list.iter() {
4240        multicast_on_intf(packet, if_name, if_index, if_addr, sock, port);
4241    }
4242    Ok(packet_list)
4243}
4244
4245/// Sends a multicast packet, and returns the packet bytes.
4246fn multicast_on_intf(
4247    packet: &[u8],
4248    if_name: &str,
4249    if_index: u32,
4250    if_addr: &IfAddr,
4251    socket: &PktInfoUdpSocket,
4252    port: u16,
4253) {
4254    if packet.len() > MAX_MSG_ABSOLUTE {
4255        debug!("Drop over-sized packet ({})", packet.len());
4256        return;
4257    }
4258
4259    let addr: SocketAddr = match if_addr {
4260        if_addrs::IfAddr::V4(_) => SocketAddrV4::new(GROUP_ADDR_V4, port).into(),
4261        if_addrs::IfAddr::V6(_) => {
4262            let mut sock = SocketAddrV6::new(GROUP_ADDR_V6, port, 0, 0);
4263            sock.set_scope_id(if_index); // Choose iface for multicast
4264            sock.into()
4265        }
4266    };
4267
4268    // Sends out `packet` to `addr` on the socket.
4269    let sock_addr = addr.into();
4270    match socket.send_to(packet, &sock_addr) {
4271        Ok(sz) => trace!(
4272            "sent out {} bytes on interface {} (idx {}) addr {}",
4273            sz,
4274            if_name,
4275            if_index,
4276            if_addr.ip()
4277        ),
4278        Err(e) => trace!("Failed to send to {} via {:?}: {}", addr, &if_name, e),
4279    }
4280}
4281
/// Tells whether `name` has the shape of a service instance name:
/// <instance>.<service_type>.<_udp|_tcp>.local.
///
/// Only the number of dot-separated parts is checked: a name qualifies when it
/// contains at least four `.` characters. <instance> may itself contain '.'.
fn valid_instance_name(name: &str) -> bool {
    // N dots give N + 1 parts, so ">= 5 parts" is ">= 4 dots".
    // '.' is ASCII, so counting bytes is exact for UTF-8 input.
    let dots = name.bytes().filter(|&b| b == b'.').count();
    dots >= 4
}
4288
4289fn notify_monitors(monitors: &mut Vec<Sender<DaemonEvent>>, event: DaemonEvent) {
4290    monitors.retain(|sender| {
4291        if let Err(e) = sender.try_send(event.clone()) {
4292            debug!("notify_monitors: try_send: {}", &e);
4293            if matches!(e, TrySendError::Disconnected(_)) {
4294                return false; // This monitor is dropped.
4295            }
4296        }
4297        true
4298    });
4299}
4300
4301/// Check if all unique records passed "probing", and if yes, create a packet
4302/// to announce the service.
4303fn prepare_announce(
4304    info: &ServiceInfo,
4305    intf: &MyIntf,
4306    dns_registry: &mut DnsRegistry,
4307    is_ipv4: bool,
4308) -> Option<DnsOutgoing> {
4309    let intf_addrs = if is_ipv4 {
4310        info.get_addrs_on_my_intf_v4(intf)
4311    } else {
4312        info.get_addrs_on_my_intf_v6(intf)
4313    };
4314
4315    if intf_addrs.is_empty() {
4316        debug!(
4317            "prepare_announce (ipv4: {is_ipv4}): no valid addrs on interface {}",
4318            &intf.name
4319        );
4320        return None;
4321    }
4322
4323    // check if we changed our name due to conflicts.
4324    let service_fullname = match dns_registry.name_changes.get(info.get_fullname()) {
4325        Some(new_name) => new_name,
4326        None => info.get_fullname(),
4327    };
4328
4329    debug!(
4330        "prepare to announce service {service_fullname} on {:?}",
4331        &intf_addrs
4332    );
4333
4334    let mut probing_count = 0;
4335    let mut out = DnsOutgoing::new(FLAGS_QR_RESPONSE | FLAGS_AA);
4336    let create_time = current_time_millis() + fastrand::u64(0..250);
4337
4338    out.add_answer_at_time(
4339        DnsPointer::new(
4340            info.get_type(),
4341            RRType::PTR,
4342            CLASS_IN,
4343            info.get_other_ttl(),
4344            service_fullname.to_string(),
4345        ),
4346        0,
4347    );
4348
4349    if let Some(sub) = info.get_subtype() {
4350        trace!("Adding subdomain {}", sub);
4351        out.add_answer_at_time(
4352            DnsPointer::new(
4353                sub,
4354                RRType::PTR,
4355                CLASS_IN,
4356                info.get_other_ttl(),
4357                service_fullname.to_string(),
4358            ),
4359            0,
4360        );
4361    }
4362
4363    // SRV records.
4364    let hostname = match dns_registry.name_changes.get(info.get_hostname()) {
4365        Some(new_name) => new_name.to_string(),
4366        None => info.get_hostname().to_string(),
4367    };
4368
4369    let mut srv = DnsSrv::new(
4370        info.get_fullname(),
4371        CLASS_IN | CLASS_CACHE_FLUSH,
4372        info.get_host_ttl(),
4373        info.get_priority(),
4374        info.get_weight(),
4375        info.get_port(),
4376        hostname,
4377    );
4378
4379    if let Some(new_name) = dns_registry.name_changes.get(info.get_fullname()) {
4380        srv.get_record_mut().set_new_name(new_name.to_string());
4381    }
4382
4383    if !info.requires_probe()
4384        || dns_registry.is_probing_done(&srv, info.get_fullname(), create_time)
4385    {
4386        out.add_answer_at_time(srv, 0);
4387    } else {
4388        probing_count += 1;
4389    }
4390
4391    // TXT records.
4392
4393    let mut txt = DnsTxt::new(
4394        info.get_fullname(),
4395        CLASS_IN | CLASS_CACHE_FLUSH,
4396        info.get_other_ttl(),
4397        info.generate_txt(),
4398    );
4399
4400    if let Some(new_name) = dns_registry.name_changes.get(info.get_fullname()) {
4401        txt.get_record_mut().set_new_name(new_name.to_string());
4402    }
4403
4404    if !info.requires_probe()
4405        || dns_registry.is_probing_done(&txt, info.get_fullname(), create_time)
4406    {
4407        out.add_answer_at_time(txt, 0);
4408    } else {
4409        probing_count += 1;
4410    }
4411
4412    // Address records. (A and AAAA)
4413
4414    let hostname = info.get_hostname();
4415    for address in intf_addrs {
4416        let mut dns_addr = DnsAddress::new(
4417            hostname,
4418            ip_address_rr_type(&address),
4419            CLASS_IN | CLASS_CACHE_FLUSH,
4420            info.get_host_ttl(),
4421            address,
4422            intf.into(),
4423        );
4424
4425        if let Some(new_name) = dns_registry.name_changes.get(hostname) {
4426            dns_addr.get_record_mut().set_new_name(new_name.to_string());
4427        }
4428
4429        if !info.requires_probe()
4430            || dns_registry.is_probing_done(&dns_addr, info.get_fullname(), create_time)
4431        {
4432            out.add_answer_at_time(dns_addr, 0);
4433        } else {
4434            probing_count += 1;
4435        }
4436    }
4437
4438    if probing_count > 0 {
4439        return None;
4440    }
4441
4442    Some(out)
4443}
4444
4445/// Send an unsolicited response for owned service via `intf` and `sock`.
4446/// Returns true if sent out successfully for IPv4 or IPv6.
4447fn announce_service_on_intf(
4448    dns_registry: &mut DnsRegistry,
4449    info: &ServiceInfo,
4450    intf: &MyIntf,
4451    sock: &PktInfoUdpSocket,
4452    port: u16,
4453) -> MyResult<bool> {
4454    let is_ipv4 = sock.domain() == Domain::IPV4;
4455    if let Some(out) = prepare_announce(info, intf, dns_registry, is_ipv4) {
4456        let _ = send_dns_outgoing(&out, intf, sock, port)?;
4457        return Ok(true);
4458    }
4459
4460    Ok(false)
4461}
4462
/// Returns a new name based on the `original` to avoid conflicts.
///
/// Only the first dot-separated label is changed. If that label ends with a
/// number in parentheses preceded by a space (` (N)`), the number is
/// incremented; otherwise ` (2)` is appended to the label.
///
/// If incrementing would overflow `u32`, the label is treated as having no
/// numeric suffix and ` (2)` is appended, instead of panicking or wrapping.
///
/// Examples:
/// - `foo.local.` becomes `foo (2).local.`
/// - `foo (2).local.` becomes `foo (3).local.`
/// - `foo (9)` becomes `foo (10)`
/// - `foo (4294967295)` becomes `foo (4294967295) (2)`
fn name_change(original: &str) -> String {
    // Split off the first label; everything after the first '.' is kept verbatim.
    let (first_part, rest) = match original.split_once('.') {
        Some((first, rest)) => (first, Some(rest)),
        None => (original, None),
    };

    // Try to bump an existing ` (<num>)` suffix: the label must end with ')',
    // and the text between the last " (" and that ')' must parse as a number.
    let bumped = first_part
        .strip_suffix(')')
        .and_then(|without_paren| without_paren.rsplit_once(" ("))
        .and_then(|(base_name, digits)| {
            let number = digits.parse::<u32>().ok()?;
            let next = number.checked_add(1)?;
            Some(format!("{base_name} ({next})"))
        });

    let new_name = bumped.unwrap_or_else(|| format!("{first_part} (2)"));

    match rest {
        Some(rest) => format!("{new_name}.{rest}"),
        None => new_name,
    }
}
4498
/// Returns a new name based on the `original` to avoid conflicts.
///
/// Only the first dot-separated label is changed. If the text after the last
/// `-` in that label is a number, the number is incremented; otherwise `-2`
/// is appended to the label.
///
/// If incrementing would overflow `u32`, the label is treated as having no
/// numeric suffix and `-2` is appended, instead of panicking or wrapping.
///
/// Examples:
/// - `foo.local.` becomes `foo-2.local.`
/// - `foo-2.local.` becomes `foo-3.local.`
/// - `foo` becomes `foo-2`
/// - `foo-4294967295` becomes `foo-4294967295-2`
fn hostname_change(original: &str) -> String {
    // Split off the first label; everything after the first '.' is kept verbatim.
    let (first_part, rest) = match original.split_once('.') {
        Some((first, rest)) => (first, Some(rest)),
        None => (original, None),
    };

    // Try to bump an existing `-<num>` suffix.
    let bumped = first_part.rsplit_once('-').and_then(|(base_name, digits)| {
        let number = digits.parse::<u32>().ok()?;
        let next = number.checked_add(1)?;
        Some(format!("{base_name}-{next}"))
    });

    let new_name = bumped.unwrap_or_else(|| format!("{first_part}-2"));

    match rest {
        Some(rest) => format!("{new_name}.{rest}"),
        None => new_name,
    }
}
4526
4527fn add_answer_with_additionals(
4528    out: &mut DnsOutgoing,
4529    msg: &DnsIncoming,
4530    service: &ServiceInfo,
4531    intf: &MyIntf,
4532    dns_registry: &DnsRegistry,
4533    is_ipv4: bool,
4534) {
4535    let intf_addrs = if is_ipv4 {
4536        service.get_addrs_on_my_intf_v4(intf)
4537    } else {
4538        service.get_addrs_on_my_intf_v6(intf)
4539    };
4540    if intf_addrs.is_empty() {
4541        trace!("No addrs on LAN of intf {:?}", intf);
4542        return;
4543    }
4544
4545    // check if we changed our name due to conflicts.
4546    let service_fullname = match dns_registry.name_changes.get(service.get_fullname()) {
4547        Some(new_name) => new_name,
4548        None => service.get_fullname(),
4549    };
4550
4551    let hostname = match dns_registry.name_changes.get(service.get_hostname()) {
4552        Some(new_name) => new_name,
4553        None => service.get_hostname(),
4554    };
4555
4556    let ptr_added = out.add_answer(
4557        msg,
4558        DnsPointer::new(
4559            service.get_type(),
4560            RRType::PTR,
4561            CLASS_IN,
4562            service.get_other_ttl(),
4563            service_fullname.to_string(),
4564        ),
4565    );
4566
4567    if !ptr_added {
4568        trace!("answer was not added for msg {:?}", msg);
4569        return;
4570    }
4571
4572    if let Some(sub) = service.get_subtype() {
4573        trace!("Adding subdomain {}", sub);
4574        out.add_additional_answer(DnsPointer::new(
4575            sub,
4576            RRType::PTR,
4577            CLASS_IN,
4578            service.get_other_ttl(),
4579            service_fullname.to_string(),
4580        ));
4581    }
4582
4583    // Add recommended additional answers according to
4584    // https://tools.ietf.org/html/rfc6763#section-12.1.
4585    out.add_additional_answer(DnsSrv::new(
4586        service_fullname,
4587        CLASS_IN | CLASS_CACHE_FLUSH,
4588        service.get_host_ttl(),
4589        service.get_priority(),
4590        service.get_weight(),
4591        service.get_port(),
4592        hostname.to_string(),
4593    ));
4594
4595    out.add_additional_answer(DnsTxt::new(
4596        service_fullname,
4597        CLASS_IN | CLASS_CACHE_FLUSH,
4598        service.get_other_ttl(),
4599        service.generate_txt(),
4600    ));
4601
4602    for address in intf_addrs {
4603        out.add_additional_answer(DnsAddress::new(
4604            hostname,
4605            ip_address_rr_type(&address),
4606            CLASS_IN | CLASS_CACHE_FLUSH,
4607            service.get_host_ttl(),
4608            address,
4609            intf.into(),
4610        ));
4611    }
4612}
4613
4614/// Check probes in a registry and returns: a probing packet to send out, and a list of probe names
4615/// that are finished.
4616fn check_probing(
4617    dns_registry: &mut DnsRegistry,
4618    timers: &mut BinaryHeap<Reverse<u64>>,
4619    now: u64,
4620) -> (DnsOutgoing, Vec<String>) {
4621    let mut expired_probes = Vec::new();
4622    let mut out = DnsOutgoing::new(FLAGS_QR_QUERY);
4623
4624    for (name, probe) in dns_registry.probing.iter_mut() {
4625        if now >= probe.next_send {
4626            if probe.expired(now) {
4627                // move the record to active
4628                expired_probes.push(name.clone());
4629            } else {
4630                out.add_question(name, RRType::ANY);
4631
4632                /*
4633                RFC 6762 section 8.2: https://datatracker.ietf.org/doc/html/rfc6762#section-8.2
4634                ...
4635                for tiebreaking to work correctly in all
4636                cases, the Authority Section must contain *all* the records and
4637                proposed rdata being probed for uniqueness.
4638                    */
4639                for record in probe.records.iter() {
4640                    out.add_authority(record.clone());
4641                }
4642
4643                probe.update_next_send(now);
4644
4645                // add timer
4646                timers.push(Reverse(probe.next_send));
4647            }
4648        }
4649    }
4650
4651    (out, expired_probes)
4652}
4653
4654/// Process expired probes on an interface and return a list of services
4655/// that are waiting for the probe to finish.
4656///
4657/// `DnsNameChange` events are sent to the monitors.
4658fn handle_expired_probes(
4659    expired_probes: Vec<String>,
4660    intf_name: &str,
4661    dns_registry: &mut DnsRegistry,
4662    monitors: &mut Vec<Sender<DaemonEvent>>,
4663) -> HashSet<String> {
4664    let mut waiting_services = HashSet::new();
4665
4666    for name in expired_probes {
4667        let Some(probe) = dns_registry.probing.remove(&name) else {
4668            continue;
4669        };
4670
4671        // send notifications about name changes
4672        for record in probe.records.iter() {
4673            if let Some(new_name) = record.get_record().get_new_name() {
4674                dns_registry
4675                    .name_changes
4676                    .insert(name.clone(), new_name.to_string());
4677
4678                let event = DnsNameChange {
4679                    original: record.get_record().get_original_name().to_string(),
4680                    new_name: new_name.to_string(),
4681                    rr_type: record.get_type(),
4682                    intf_name: intf_name.to_string(),
4683                };
4684                debug!("Name change event: {:?}", &event);
4685                notify_monitors(monitors, DaemonEvent::NameChange(event));
4686            }
4687        }
4688
4689        // move RR from probe to active.
4690        debug!(
4691            "probe of '{name}' finished: move {} records to active. ({} waiting services)",
4692            probe.records.len(),
4693            probe.waiting_services.len(),
4694        );
4695
4696        // Move records to active and plan to wake up services if records are not empty.
4697        if !probe.records.is_empty() {
4698            match dns_registry.active.get_mut(&name) {
4699                Some(records) => {
4700                    records.extend(probe.records);
4701                }
4702                None => {
4703                    dns_registry.active.insert(name, probe.records);
4704                }
4705            }
4706
4707            waiting_services.extend(probe.waiting_services);
4708        }
4709    }
4710
4711    waiting_services
4712}
4713
4714/// Resolves `IfKind::Addr(ip)` to `IndexV4(if_index)` or `IndexV6(if_index)`.
4715fn resolve_addr_to_index(if_kind: IfKind, interfaces: &[Interface]) -> IfKind {
4716    if let IfKind::Addr(addr) = &if_kind {
4717        if let Some(intf) = interfaces.iter().find(|intf| &intf.ip() == addr) {
4718            let if_index = intf.index.unwrap_or(0);
4719            return if addr.is_ipv4() {
4720                IfKind::IndexV4(if_index)
4721            } else {
4722                IfKind::IndexV6(if_index)
4723            };
4724        }
4725    }
4726    if_kind
4727}
4728
#[cfg(test)]
mod tests {
    use super::{
        _new_socket_bind, check_domain_suffix, check_service_name_length, hostname_change,
        my_ip_interfaces, name_change, send_dns_outgoing_impl, valid_instance_name,
        HostnameResolutionEvent, ServiceDaemon, ServiceEvent, ServiceInfo, MDNS_PORT,
    };
    use crate::{
        dns_parser::{
            DnsIncoming, DnsOutgoing, DnsPointer, InterfaceId, RRType, ScopedIp, CLASS_IN,
            FLAGS_AA, FLAGS_QR_RESPONSE,
        },
        service_daemon::{add_answer_of_service, check_hostname},
    };
    use std::time::{Duration, SystemTime};
    use test_log::test;

    /// Panic message shared by the tests that cannot run without an IPv4 interface.
    const NEED_IPV4: &str = "Test requires an IPv4 interface";

    /// An instance name must have at least one label in front of the service type.
    #[test]
    fn test_instance_name() {
        assert!(valid_instance_name("my-laser._printer._tcp.local."));
        assert!(valid_instance_name("my-laser.._printer._tcp.local."));
        assert!(!valid_instance_name("_printer._tcp.local."));
    }

    /// `check_service_name_length` rejects `"_tcp"` with a limit of 100.
    #[test]
    fn test_check_service_name_length() {
        match check_service_name_length("_tcp", 100) {
            Ok(_) => panic!("expected '_tcp' to be rejected"),
            Err(e) => println!("{}", e),
        }
    }

    /// `check_hostname` accepts well-formed names up to 255 bytes and rejects the rest.
    #[test]
    fn test_check_hostname() {
        // valid hostnames
        for hostname in &[
            "my_host.local.",
            &("A".repeat(255 - ".local.".len()) + ".local."),
        ] {
            let result = check_hostname(hostname);
            assert!(result.is_ok(), "expected '{}' to be accepted", hostname);
        }

        // erroneous hostnames
        for hostname in &[
            "my_host.local",
            ".local.",
            &("A".repeat(256 - ".local.".len()) + ".local."),
        ] {
            match check_hostname(hostname) {
                Ok(_) => panic!("expected '{}' to be rejected", hostname),
                Err(e) => println!("{}", e),
            }
        }
    }

    /// `check_domain_suffix` requires a `._tcp.local.` or `._udp.local.` suffix.
    #[test]
    fn test_check_domain_suffix() {
        assert!(check_domain_suffix("_missing_dot._tcp.local").is_err());
        assert!(check_domain_suffix("_missing_bar.tcp.local.").is_err());
        assert!(check_domain_suffix("_mis_spell._tpp.local.").is_err());
        assert!(check_domain_suffix("_mis_spell._upp.local.").is_err());
        assert!(check_domain_suffix("_has_dot._tcp.local.").is_ok());
        assert!(check_domain_suffix("_goodname._udp.local.").is_ok());
    }

    /// A service must still resolve on a fresh browse after its PTR record was
    /// invalidated by a TTL-0 response.
    #[test]
    fn test_service_with_temporarily_invalidated_ptr() {
        // Create a daemon
        let d = ServiceDaemon::new().expect("Failed to create daemon");

        let service = "_test_inval_ptr._udp.local.";
        let host_name = "my_host_tmp_invalidated_ptr.local.";
        let intfs: Vec<_> = my_ip_interfaces(false);
        let intf_ips: Vec<_> = intfs.iter().map(|intf| intf.ip()).collect();
        let port = 5201;
        let my_service =
            ServiceInfo::new(service, "my_instance", host_name, &intf_ips[..], port, None)
                .expect("invalid service info")
                .enable_addr_auto();
        d.register(my_service.clone())
            .expect("Failed to register service");

        // Browse for a service
        let browse_chan = d.browse(service).unwrap();
        let timeout = Duration::from_secs(2);
        let mut resolved = false;

        while let Ok(event) = browse_chan.recv_timeout(timeout) {
            match event {
                ServiceEvent::ServiceResolved(info) => {
                    resolved = true;
                    println!("Resolved a service of {}", &info.fullname);
                    break;
                }
                e => {
                    println!("Received event {:?}", e);
                }
            }
        }

        assert!(resolved, "Service should resolve on the first browse");

        println!("Stopping browse of {}", service);
        // Pause browsing so restarting will cause a new immediate query.
        // Unregistering will not work here, it will invalidate all the records.
        d.stop_browse(service).unwrap();

        // Ensure the search is stopped.
        // Reduces the chance of receiving an answer adding the ptr back to the
        // cache causing the later browse to return directly from the cache.
        // (which invalidates what this test is trying to test for.)
        let mut stopped = false;
        while let Ok(event) = browse_chan.recv_timeout(timeout) {
            match event {
                ServiceEvent::SearchStopped(_) => {
                    stopped = true;
                    println!("Stopped browsing service");
                    break;
                }
                // Other `ServiceResolved` messages may be received
                // here as they come from different interfaces.
                // That's fine for this test.
                e => {
                    println!("Received event {:?}", e);
                }
            }
        }

        assert!(stopped, "Browse should report SearchStopped");

        // Invalidate the ptr from the service to the host (TTL of 0).
        let invalidate_ptr_packet = DnsPointer::new(
            my_service.get_type(),
            RRType::PTR,
            CLASS_IN,
            0,
            my_service.get_fullname().to_string(),
        );

        let mut packet_buffer = DnsOutgoing::new(FLAGS_QR_RESPONSE | FLAGS_AA);
        packet_buffer.add_additional_answer(invalidate_ptr_packet);

        for intf in intfs {
            let sock = _new_socket_bind(&intf, true).unwrap();
            send_dns_outgoing_impl(
                &packet_buffer,
                &intf.name,
                intf.index.unwrap_or(0),
                &intf.addr,
                &sock.pktinfo,
                MDNS_PORT,
            )
            .unwrap();
        }

        println!(
            "Sent PTR record invalidation. Starting second browse for {}",
            service
        );

        // Restart the browse to force the sender to re-send the announcements.
        let browse_chan = d.browse(service).unwrap();

        resolved = false;
        while let Ok(event) = browse_chan.recv_timeout(timeout) {
            match event {
                ServiceEvent::ServiceResolved(info) => {
                    resolved = true;
                    println!("Resolved a service of {}", &info.fullname);
                    break;
                }
                e => {
                    println!("Received event {:?}", e);
                }
            }
        }

        assert!(resolved, "Service should resolve again after PTR invalidation");
        d.shutdown().unwrap();
    }

    /// Once the server is gone and the short host TTL has elapsed, the client
    /// must report the service as removed.
    #[test]
    fn test_expired_srv() {
        // construct service info
        let service_type = "_expired-srv._udp.local.";
        let instance = "test_instance";
        let host_name = "expired_srv_host.local.";
        let mut my_service = ServiceInfo::new(service_type, instance, host_name, "", 5023, None)
            .unwrap()
            .enable_addr_auto();

        // set SRV to expire soon.
        let new_ttl = 3; // for testing only.
        my_service._set_host_ttl(new_ttl);

        // register my service
        let mdns_server = ServiceDaemon::new().expect("Failed to create mdns server");
        mdns_server
            .register(my_service)
            .expect("Failed to register service");

        let mdns_client = ServiceDaemon::new().expect("Failed to create mdns client");
        let browse_chan = mdns_client.browse(service_type).unwrap();
        let timeout = Duration::from_secs(2);
        let mut resolved = false;

        while let Ok(event) = browse_chan.recv_timeout(timeout) {
            if let ServiceEvent::ServiceResolved(info) = event {
                resolved = true;
                println!("Resolved a service of {}", &info.fullname);
                break;
            }
        }

        assert!(resolved, "Service should resolve before the server exits");

        // Exit the server so that no more responses.
        mdns_server.shutdown().unwrap();

        // SRV record in the client cache will expire. Wait for the TTL plus a
        // 1 second grace period so a slightly late eviction is still observed.
        let expire_timeout = Duration::from_secs(new_ttl as u64 + 1);
        let mut removed = false;
        while let Ok(event) = browse_chan.recv_timeout(expire_timeout) {
            if let ServiceEvent::ServiceRemoved(ty_domain, full_name) = event {
                removed = true;
                println!("Service removed: {}: {}", &ty_domain, &full_name);
                break;
            }
        }

        assert!(
            removed,
            "Should receive ServiceRemoved after the SRV record expires"
        );

        mdns_client.shutdown().unwrap();
    }

    /// A resolved hostname must report its address as removed once the server
    /// is gone and the short address TTL has elapsed.
    #[test]
    fn test_hostname_resolution_address_removed() {
        // Create a mDNS server
        let server = ServiceDaemon::new().expect("Failed to create server");
        let hostname = "addr_remove_host._tcp.local.";
        let service_ip_addr: ScopedIp = my_ip_interfaces(false)
            .iter()
            .find(|iface| iface.ip().is_ipv4())
            .map(|iface| iface.ip().into())
            .expect(NEED_IPV4);

        let mut my_service = ServiceInfo::new(
            "_host_res_test._tcp.local.",
            "my_instance",
            hostname,
            service_ip_addr.to_ip_addr(),
            1234,
            None,
        )
        .expect("invalid service info");

        // Set a short TTL for addresses for testing.
        let addr_ttl = 2;
        my_service._set_host_ttl(addr_ttl); // Expire soon

        server.register(my_service).unwrap();

        // Create a mDNS client for resolving the hostname.
        let client = ServiceDaemon::new().expect("Failed to create client");
        let event_receiver = client.resolve_hostname(hostname, None).unwrap();
        let resolved = loop {
            match event_receiver.recv() {
                Ok(HostnameResolutionEvent::AddressesFound(found_hostname, addresses)) => {
                    assert_eq!(found_hostname, hostname);
                    assert!(addresses.contains(&service_ip_addr));
                    println!("address found: {:?}", &addresses);
                    break true;
                }
                Ok(HostnameResolutionEvent::SearchStopped(_)) => break false,
                Ok(_event) => {}
                Err(_) => break false,
            }
        };

        assert!(resolved, "Hostname should resolve to the registered address");

        // Shutdown the server so no more responses / refreshes for addresses.
        server.shutdown().unwrap();

        // Wait till hostname address record expires, with 1 second grace period.
        let timeout = Duration::from_secs(addr_ttl as u64 + 1);
        let removed = loop {
            match event_receiver.recv_timeout(timeout) {
                Ok(HostnameResolutionEvent::AddressesRemoved(removed_host, addresses)) => {
                    assert_eq!(removed_host, hostname);
                    assert!(addresses.contains(&service_ip_addr));

                    println!(
                        "address removed: hostname: {} addresses: {:?}",
                        &hostname, &addresses
                    );
                    break true;
                }
                Ok(_event) => {}
                Err(_) => {
                    break false;
                }
            }
        };

        assert!(removed, "Should receive AddressesRemoved after the TTL");

        client.shutdown().unwrap();
    }

    /// After most of a short TTL has passed, the client's metrics must show one
    /// PTR refresh and one SRV/TXT refresh.
    #[test]
    fn test_refresh_ptr() {
        // construct service info
        let service_type = "_refresh-ptr._udp.local.";
        let instance = "test_instance";
        let host_name = "refresh_ptr_host.local.";
        let service_ip_addr = my_ip_interfaces(false)
            .iter()
            .find(|iface| iface.ip().is_ipv4())
            .map(|iface| iface.ip())
            .expect(NEED_IPV4);

        let mut my_service = ServiceInfo::new(
            service_type,
            instance,
            host_name,
            service_ip_addr,
            5023,
            None,
        )
        .unwrap();

        let new_ttl = 3; // for testing only.
        my_service._set_other_ttl(new_ttl);

        // register my service
        let mdns_server = ServiceDaemon::new().expect("Failed to create mdns server");
        mdns_server
            .register(my_service)
            .expect("Failed to register service");

        let mdns_client = ServiceDaemon::new().expect("Failed to create mdns client");
        let browse_chan = mdns_client.browse(service_type).unwrap();
        let timeout = Duration::from_millis(1500); // Give at least 1 second for the service probing.
        let mut resolved = false;

        // resolve the service first.
        while let Ok(event) = browse_chan.recv_timeout(timeout) {
            if let ServiceEvent::ServiceResolved(info) = event {
                resolved = true;
                println!("Resolved a service of {}", &info.fullname);
                break;
            }
        }

        assert!(resolved, "Service should resolve before the refresh check");

        // wait over 80% of TTL (90% is used here), and refresh PTR should be sent out.
        let timeout = Duration::from_millis(new_ttl as u64 * 1000 * 90 / 100);
        while let Ok(event) = browse_chan.recv_timeout(timeout) {
            println!("event: {:?}", &event);
        }

        // verify refresh counter.
        let metrics_chan = mdns_client.get_metrics().unwrap();
        let metrics = metrics_chan.recv_timeout(timeout).unwrap();
        let ptr_refresh_counter = metrics["cache-refresh-ptr"];
        assert_eq!(ptr_refresh_counter, 1);
        let srvtxt_refresh_counter = metrics["cache-refresh-srv-txt"];
        assert_eq!(srvtxt_refresh_counter, 1);

        // Exit the server so that no more responses.
        mdns_server.shutdown().unwrap();
        mdns_client.shutdown().unwrap();
    }

    /// `name_change` appends or increments a ` (N)` suffix on the first label.
    #[test]
    fn test_name_change() {
        assert_eq!(name_change("foo.local."), "foo (2).local.");
        assert_eq!(name_change("foo (2).local."), "foo (3).local.");
        assert_eq!(name_change("foo (9).local."), "foo (10).local.");
        assert_eq!(name_change("foo"), "foo (2)");
        assert_eq!(name_change("foo (2)"), "foo (3)");
        assert_eq!(name_change(""), " (2)");

        // Additional edge cases
        assert_eq!(name_change("foo (abc)"), "foo (abc) (2)"); // Invalid number
        assert_eq!(name_change("foo (2"), "foo (2 (2)"); // Missing closing parenthesis
        assert_eq!(name_change("foo (2) extra"), "foo (2) extra (2)"); // Extra text after number
    }

    /// `hostname_change` appends or increments a `-N` suffix on the first label.
    #[test]
    fn test_hostname_change() {
        assert_eq!(hostname_change("foo.local."), "foo-2.local.");
        assert_eq!(hostname_change("foo"), "foo-2");
        assert_eq!(hostname_change("foo-2.local."), "foo-3.local.");
        assert_eq!(hostname_change("foo-9"), "foo-10");
        assert_eq!(hostname_change("test-42.domain."), "test-43.domain.");
    }

    /// A TXT answer added for a service must carry the service's "other" TTL.
    #[test]
    fn test_add_answer_txt_ttl() {
        // construct a simple service info
        let service_type = "_test_add_answer._udp.local.";
        let instance = "test_instance";
        let host_name = "add_answer_host.local.";
        let service_intf = my_ip_interfaces(false)
            .into_iter()
            .find(|iface| iface.ip().is_ipv4())
            .expect(NEED_IPV4);
        let service_ip_addr = service_intf.ip();
        let my_service = ServiceInfo::new(
            service_type,
            instance,
            host_name,
            service_ip_addr,
            5023,
            None,
        )
        .unwrap();

        // construct a DnsOutgoing message
        let mut out = DnsOutgoing::new(FLAGS_QR_RESPONSE | FLAGS_AA);

        // Construct a dummy DnsIncoming message
        let mut dummy_data = out.to_data_on_wire();
        let interface_id = InterfaceId::from(&service_intf);
        let incoming = DnsIncoming::new(dummy_data.pop().unwrap(), interface_id).unwrap();

        // Add an answer of TXT type for the service.
        let if_addrs = vec![service_intf.ip()];
        add_answer_of_service(
            &mut out,
            &incoming,
            instance,
            &my_service,
            RRType::TXT,
            if_addrs,
        );

        // Check if the answer was added correctly
        assert!(
            out.answers_count() > 0,
            "No answers added to the outgoing message"
        );

        // Check if the first answer is of type TXT
        let answer = out._answers().first().unwrap();
        assert_eq!(answer.0.get_type(), RRType::TXT);

        // Check TTL is set properly for the TXT record
        assert_eq!(answer.0.get_record().get_ttl(), my_service.get_other_ttl());
    }

    /// Taking the client's interface down must remove the service; bringing it
    /// back up must resolve the service again.
    #[test]
    fn test_interface_flip() {
        // start a server
        let ty_domain = "_intf-flip._udp.local.";
        let host_name = "intf_flip.local.";
        let now = SystemTime::now()
            .duration_since(SystemTime::UNIX_EPOCH)
            .unwrap();
        let instance_name = now.as_micros().to_string(); // Create a unique name.
        let port = 5200;

        // Get a single IPv4 address
        let (ip_addr1, intf_name) = my_ip_interfaces(false)
            .iter()
            .find(|iface| iface.ip().is_ipv4())
            .map(|iface| (iface.ip(), iface.name.clone()))
            .expect(NEED_IPV4);

        println!("Using interface {} with IP {}", intf_name, ip_addr1);

        // Register the service.
        let service1 = ServiceInfo::new(ty_domain, &instance_name, host_name, ip_addr1, port, None)
            .expect("valid service info");
        let server1 = ServiceDaemon::new().expect("failed to start server");
        server1
            .register(service1)
            .expect("Failed to register service1");

        // wait for the service announced.
        std::thread::sleep(Duration::from_secs(2));

        // start a client
        let client = ServiceDaemon::new().expect("failed to start client");

        let receiver = client.browse(ty_domain).unwrap();

        let timeout = Duration::from_secs(3);
        let mut got_data = false;

        while let Ok(event) = receiver.recv_timeout(timeout) {
            if let ServiceEvent::ServiceResolved(_) = event {
                println!("Received ServiceResolved event");
                got_data = true;
                break;
            }
        }

        assert!(got_data, "Should receive ServiceResolved event");

        // Set a short IP check interval to detect interface changes quickly.
        client.set_ip_check_interval(1).unwrap();

        // Now shutdown the interface and expect the client to lose the service.
        println!("Shutting down interface {}", &intf_name);
        client.test_down_interface(&intf_name).unwrap();

        let mut got_removed = false;

        while let Ok(event) = receiver.recv_timeout(timeout) {
            if let ServiceEvent::ServiceRemoved(ty_domain, instance) = event {
                got_removed = true;
                println!("removed: {ty_domain} : {instance}");
                break;
            }
        }
        assert!(got_removed, "Should receive ServiceRemoved event");

        println!("Bringing up interface {}", &intf_name);
        client.test_up_interface(&intf_name).unwrap();
        let mut got_data = false;
        while let Ok(event) = receiver.recv_timeout(timeout) {
            if let ServiceEvent::ServiceResolved(resolved) = event {
                got_data = true;
                println!("Received ServiceResolved: {:?}", resolved);
                break;
            }
        }
        assert!(
            got_data,
            "Should receive ServiceResolved event after interface is back up"
        );

        server1.shutdown().unwrap();
        client.shutdown().unwrap();
    }

    /// A `browse_cache` started before the service is registered must still
    /// resolve the service from its later announcements.
    #[test]
    fn test_cache_only() {
        // construct service info
        let service_type = "_cache_only._udp.local.";
        let instance = "test_instance";
        let host_name = "cache_only_host.local.";
        let service_ip_addr = my_ip_interfaces(false)
            .iter()
            .find(|iface| iface.ip().is_ipv4())
            .map(|iface| iface.ip())
            .expect(NEED_IPV4);

        let mut my_service = ServiceInfo::new(
            service_type,
            instance,
            host_name,
            service_ip_addr,
            5023,
            None,
        )
        .unwrap();

        let new_ttl = 3; // for testing only.
        my_service._set_other_ttl(new_ttl);

        let mdns_client = ServiceDaemon::new().expect("Failed to create mdns client");

        // make a single browse request to record that we are interested in the service.  This ensures that
        // subsequent announcements are cached.
        let browse_chan = mdns_client.browse_cache(service_type).unwrap();
        std::thread::sleep(Duration::from_secs(2));

        // register my service
        let mdns_server = ServiceDaemon::new().expect("Failed to create mdns server");
        mdns_server
            .register(my_service)
            .expect("Failed to register service");

        let timeout = Duration::from_millis(1500); // Give at least 1 second for the service probing.
        let mut resolved = false;

        // resolve the service.
        while let Ok(event) = browse_chan.recv_timeout(timeout) {
            if let ServiceEvent::ServiceResolved(info) = event {
                resolved = true;
                println!("Resolved a service of {}", &info.get_fullname());
                break;
            }
        }

        assert!(resolved, "Service should resolve from cached announcements");

        // Exit the server so that no more responses.
        mdns_server.shutdown().unwrap();
        mdns_client.shutdown().unwrap();
    }

    /// With `accept_unsolicited(true)`, a `browse_cache` started after the
    /// announcements went out must still resolve the service.
    #[test]
    fn test_cache_only_unsolicited() {
        let service_type = "_c_unsolicit._udp.local.";
        let instance = "test_instance";
        let host_name = "c_unsolicit_host.local.";
        let service_ip_addr = my_ip_interfaces(false)
            .iter()
            .find(|iface| iface.ip().is_ipv4())
            .map(|iface| iface.ip())
            .expect(NEED_IPV4);

        let my_service = ServiceInfo::new(
            service_type,
            instance,
            host_name,
            service_ip_addr,
            5023,
            None,
        )
        .unwrap();

        // register my service
        let mdns_server = ServiceDaemon::new().expect("Failed to create mdns server");
        mdns_server
            .register(my_service)
            .expect("Failed to register service");

        let mdns_client = ServiceDaemon::new().expect("Failed to create mdns client");
        mdns_client.accept_unsolicited(true).unwrap();

        // Wait a bit for the service announcements to go out, before calling browse_cache.  This ensures
        // that the announcements are treated as unsolicited
        std::thread::sleep(Duration::from_secs(2));
        let browse_chan = mdns_client.browse_cache(service_type).unwrap();
        let timeout = Duration::from_millis(1500); // Give at least 1 second for the service probing.
        let mut resolved = false;

        // resolve the service.
        while let Ok(event) = browse_chan.recv_timeout(timeout) {
            if let ServiceEvent::ServiceResolved(info) = event {
                resolved = true;
                println!("Resolved a service of {}", &info.get_fullname());
                break;
            }
        }

        assert!(resolved, "Service should resolve from unsolicited records");

        // Exit the server so that no more responses.
        mdns_server.shutdown().unwrap();
        mdns_client.shutdown().unwrap();
    }

    /// Daemons sharing a custom mDNS port see each other, while daemons on
    /// different mDNS ports do not.
    #[test]
    fn test_custom_port_isolation() {
        // This test verifies:
        // 1. Daemons on a custom port can communicate with each other
        // 2. Daemons on different ports are isolated (no cross-talk)

        let service_type = "_custom_port._udp.local.";
        let instance_custom = "custom_port_instance";
        let instance_default = "default_port_instance";
        let host_name = "custom_port_host.local.";

        let service_ip_addr = my_ip_interfaces(false)
            .iter()
            .find(|iface| iface.ip().is_ipv4())
            .map(|iface| iface.ip())
            .expect(NEED_IPV4);

        // Service for the daemon on the custom mDNS port (5454).
        // It advertises service port 8080.
        let service_custom = ServiceInfo::new(
            service_type,
            instance_custom,
            host_name,
            service_ip_addr,
            8080,
            None,
        )
        .unwrap();

        // Service for the daemon on the default mDNS port (5353).
        // It advertises service port 8081.
        let service_default = ServiceInfo::new(
            service_type,
            instance_default,
            host_name,
            service_ip_addr,
            8081,
            None,
        )
        .unwrap();

        // Create two daemons on custom port 5454
        let custom_port = 5454u16;
        let server_custom =
            ServiceDaemon::new_with_port(custom_port).expect("Failed to create custom port server");
        let client_custom =
            ServiceDaemon::new_with_port(custom_port).expect("Failed to create custom port client");

        // Create daemon on default port (5353)
        let server_default = ServiceDaemon::new().expect("Failed to create default port server");

        // Register service on custom port
        server_custom
            .register(service_custom.clone())
            .expect("Failed to register custom port service");

        // Register service on default port
        server_default
            .register(service_default.clone())
            .expect("Failed to register default port service");

        // Browse from custom port client
        let browse_custom = client_custom
            .browse(service_type)
            .expect("Failed to browse on custom port");

        let timeout = Duration::from_secs(3);
        let mut found_custom = false;
        let mut found_default_on_custom = false;

        // Custom port client should find the custom port service.
        // The loop drains events until a timeout (no early break) so that any
        // cross-talk from the default port would be noticed.
        while let Ok(event) = browse_custom.recv_timeout(timeout) {
            if let ServiceEvent::ServiceResolved(info) = event {
                println!(
                    "Custom port client resolved: {} on port {}",
                    info.get_fullname(),
                    info.get_port()
                );
                if info.get_fullname().starts_with(instance_custom) {
                    found_custom = true;
                    assert_eq!(info.get_port(), 8080);
                }
                if info.get_fullname().starts_with(instance_default) {
                    found_default_on_custom = true;
                }
            }
        }

        assert!(
            found_custom,
            "Custom port client should find service on custom port"
        );
        assert!(
            !found_default_on_custom,
            "Custom port client should NOT find service on default port"
        );

        // Now verify the default port daemon can find its own services
        // but not the custom port services
        let client_default = ServiceDaemon::new().expect("Failed to create default port client");
        let browse_default = client_default
            .browse(service_type)
            .expect("Failed to browse on default port");

        let mut found_default = false;
        let mut found_custom_on_default = false;

        while let Ok(event) = browse_default.recv_timeout(timeout) {
            if let ServiceEvent::ServiceResolved(info) = event {
                println!(
                    "Default port client resolved: {} on port {}",
                    info.get_fullname(),
                    info.get_port()
                );
                if info.get_fullname().starts_with(instance_default) {
                    found_default = true;
                    assert_eq!(info.get_port(), 8081);
                }
                if info.get_fullname().starts_with(instance_custom) {
                    found_custom_on_default = true;
                }
            }
        }

        assert!(
            found_default,
            "Default port client should find service on default port"
        );
        assert!(
            !found_custom_on_default,
            "Default port client should NOT find service on custom port"
        );

        // Cleanup
        server_custom.shutdown().unwrap();
        client_custom.shutdown().unwrap();
        server_default.shutdown().unwrap();
        client_default.shutdown().unwrap();
    }
}