Skip to main content

mdns_sd/
service_daemon.rs

1//! Service daemon for mDNS Service Discovery.
2
3// How DNS-based Service Discovery works in a nutshell:
4//
5// (excerpt from RFC 6763)
6// .... that a particular service instance can be
7//    described using a DNS SRV [RFC2782] and DNS TXT [RFC1035] record.
8//    The SRV record has a name of the form "<Instance>.<Service>.<Domain>"
9//    and gives the target host and port where the service instance can be
10//    reached.  The DNS TXT record of the same name gives additional
11//    information about this instance, in a structured form using key/value
12//    pairs, described in Section 6.  A client discovers the list of
13//    available instances of a given service type using a query for a DNS
14//    PTR [RFC1035] record with a name of the form "<Service>.<Domain>",
15//    which returns a set of zero or more names, which are the names of the
16//    aforementioned DNS SRV/TXT record pairs.
17//
18// Some naming conventions in this source code:
19//
20// `ty_domain` refers to service type together with domain name, i.e. <service>.<domain>.
21// Every <service> consists of two labels: service itself and "_udp." or "_tcp".
22// See RFC 6763 section 7 Service Names.
23//     for example: `_my-service._udp.local.`
24//
25// `fullname` refers to a full Service Instance Name, i.e. <instance>.<service>.<domain>
26//     for example: `my_home._my-service._udp.local.`
27//
28// In mDNS and DNS, the basic data structure is "Resource Record" (RR), where
29// in Service Discovery, the basic data structure is "Service Info". One Service Info
30// corresponds to a set of DNS Resource Records.
31#[cfg(feature = "logging")]
32use crate::log::{debug, error, trace};
33use crate::{
34    dns_cache::{current_time_millis, DnsCache},
35    dns_parser::{
36        ip_address_rr_type, DnsAddress, DnsEntryExt, DnsIncoming, DnsOutgoing, DnsPointer,
37        DnsRecordBox, DnsRecordExt, DnsSrv, DnsTxt, InterfaceId, RRType, ScopedIp,
38        CLASS_CACHE_FLUSH, CLASS_IN, FLAGS_AA, FLAGS_QR_QUERY, FLAGS_QR_RESPONSE, MAX_MSG_ABSOLUTE,
39    },
40    error::{e_fmt, Error, Result},
41    service_info::{DnsRegistry, MyIntf, Probe, ServiceInfo, ServiceStatus},
42    Receiver, ResolvedService, TxtProperties,
43};
44use flume::{bounded, Sender, TrySendError};
45use if_addrs::{IfAddr, Interface};
46use mio::{event::Source, net::UdpSocket as MioUdpSocket, Interest, Poll, Registry, Token};
47use socket2::Domain;
48use socket_pktinfo::PktInfoUdpSocket;
49use std::{
50    cmp::{self, Reverse},
51    collections::{hash_map::Entry, BinaryHeap, HashMap, HashSet},
52    fmt, io,
53    net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6, UdpSocket},
54    str, thread,
55    time::Duration,
56    vec,
57};
58
59/// The default max length of the service name without domain, not including the
60/// leading underscore (`_`). It is set to 15 per
61/// [RFC 6763 section 7.2](https://www.rfc-editor.org/rfc/rfc6763#section-7.2).
62pub const SERVICE_NAME_LEN_MAX_DEFAULT: u8 = 15;
63
64/// The default interval for checking IP changes automatically.
65pub const IP_CHECK_INTERVAL_IN_SECS_DEFAULT: u32 = 5;
66
67/// The default time out for [ServiceDaemon::verify] is 10 seconds, per
68/// [RFC 6762 section 10.4](https://datatracker.ietf.org/doc/html/rfc6762#section-10.4)
69pub const VERIFY_TIMEOUT_DEFAULT: Duration = Duration::from_secs(10);
70
71/// The mDNS port number per RFC 6762.
72pub const MDNS_PORT: u16 = 5353;
73
74const GROUP_ADDR_V4: Ipv4Addr = Ipv4Addr::new(224, 0, 0, 251);
75const GROUP_ADDR_V6: Ipv6Addr = Ipv6Addr::new(0xff02, 0, 0, 0, 0, 0, 0, 0xfb);
76const LOOPBACK_V4: Ipv4Addr = Ipv4Addr::new(127, 0, 0, 1);
77
78const RESOLVE_WAIT_IN_MILLIS: u64 = 500;
79
80/// Response status code for the service `unregister` call.
81#[derive(Debug)]
82pub enum UnregisterStatus {
83    /// Unregister was successful.
84    OK,
85    /// The service was not found in the registration.
86    NotFound,
87}
88
89/// Status code for the service daemon.
90#[derive(Debug, PartialEq, Clone, Eq)]
91#[non_exhaustive]
92pub enum DaemonStatus {
93    /// The daemon is running as normal.
94    Running,
95
96    /// The daemon has been shutdown.
97    Shutdown,
98}
99
100/// Different counters included in the metrics.
101/// Currently all counters are for outgoing packets.
102#[derive(Hash, Eq, PartialEq)]
103enum Counter {
104    Register,
105    RegisterResend,
106    Unregister,
107    UnregisterResend,
108    Browse,
109    ResolveHostname,
110    Respond,
111    CacheRefreshPTR,
112    CacheRefreshSrvTxt,
113    CacheRefreshAddr,
114    KnownAnswerSuppression,
115    CachedPTR,
116    CachedSRV,
117    CachedAddr,
118    CachedTxt,
119    CachedNSec,
120    CachedSubtype,
121    DnsRegistryProbe,
122    DnsRegistryActive,
123    DnsRegistryTimer,
124    DnsRegistryNameChange,
125    Timer,
126}
127
128impl fmt::Display for Counter {
129    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
130        match self {
131            Self::Register => write!(f, "register"),
132            Self::RegisterResend => write!(f, "register-resend"),
133            Self::Unregister => write!(f, "unregister"),
134            Self::UnregisterResend => write!(f, "unregister-resend"),
135            Self::Browse => write!(f, "browse"),
136            Self::ResolveHostname => write!(f, "resolve-hostname"),
137            Self::Respond => write!(f, "respond"),
138            Self::CacheRefreshPTR => write!(f, "cache-refresh-ptr"),
139            Self::CacheRefreshSrvTxt => write!(f, "cache-refresh-srv-txt"),
140            Self::CacheRefreshAddr => write!(f, "cache-refresh-addr"),
141            Self::KnownAnswerSuppression => write!(f, "known-answer-suppression"),
142            Self::CachedPTR => write!(f, "cached-ptr"),
143            Self::CachedSRV => write!(f, "cached-srv"),
144            Self::CachedAddr => write!(f, "cached-addr"),
145            Self::CachedTxt => write!(f, "cached-txt"),
146            Self::CachedNSec => write!(f, "cached-nsec"),
147            Self::CachedSubtype => write!(f, "cached-subtype"),
148            Self::DnsRegistryProbe => write!(f, "dns-registry-probe"),
149            Self::DnsRegistryActive => write!(f, "dns-registry-active"),
150            Self::DnsRegistryTimer => write!(f, "dns-registry-timer"),
151            Self::DnsRegistryNameChange => write!(f, "dns-registry-name-change"),
152            Self::Timer => write!(f, "timer"),
153        }
154    }
155}
156
157#[derive(Debug)]
158enum InternalError {
159    IntfAddrInvalid(Interface),
160}
161
162impl fmt::Display for InternalError {
163    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
164        match self {
165            InternalError::IntfAddrInvalid(iface) => write!(f, "interface addr invalid: {iface:?}"),
166        }
167    }
168}
169
170type MyResult<T> = core::result::Result<T, InternalError>;
171
172/// A wrapper around UDP socket used by the mDNS daemon.
173///
174/// We do this because `mio` does not support PKTINFO and
175/// does not provide a way to implement `Source` trait directly and safely.
176struct MyUdpSocket {
177    /// The underlying socket that supports control messages like
178    /// `IP_PKTINFO` for IPv4 and `IPV6_PKTINFO` for IPv6.
179    pktinfo: PktInfoUdpSocket,
180
181    /// The mio UDP socket that is a clone of `pktinfo` and
182    /// is used for event polling.
183    mio: MioUdpSocket,
184}
185
186impl MyUdpSocket {
187    pub fn new(pktinfo: PktInfoUdpSocket) -> io::Result<Self> {
188        let std_sock = pktinfo.try_clone_std()?;
189        let mio = MioUdpSocket::from_std(std_sock);
190
191        Ok(Self { pktinfo, mio })
192    }
193}
194
195/// Implements the mio `Source` trait so that we can use `MyUdpSocket` with `Poll`.
196impl Source for MyUdpSocket {
197    fn register(
198        &mut self,
199        registry: &Registry,
200        token: Token,
201        interests: Interest,
202    ) -> io::Result<()> {
203        self.mio.register(registry, token, interests)
204    }
205
206    fn reregister(
207        &mut self,
208        registry: &Registry,
209        token: Token,
210        interests: Interest,
211    ) -> io::Result<()> {
212        self.mio.reregister(registry, token, interests)
213    }
214
215    fn deregister(&mut self, registry: &Registry) -> std::io::Result<()> {
216        self.mio.deregister(registry)
217    }
218}
219
220/// The metrics is a HashMap of (name_key, i64_value).
221/// The main purpose is to help monitoring the mDNS packet traffic.
222pub type Metrics = HashMap<String, i64>;
223
224const IPV4_SOCK_EVENT_KEY: usize = 4; // Pick a key just to indicate IPv4.
225const IPV6_SOCK_EVENT_KEY: usize = 6; // Pick a key just to indicate IPv6.
226const SIGNAL_SOCK_EVENT_KEY: usize = usize::MAX - 1; // avoid to overlap with zc.poll_ids
227
228/// A daemon thread for mDNS
229///
230/// This struct provides a handle and an API to the daemon. It is cloneable.
231#[derive(Clone)]
232pub struct ServiceDaemon {
233    /// Sender handle of the channel to the daemon.
234    sender: Sender<Command>,
235
236    /// Send to this addr to signal that a `Command` is coming.
237    ///
238    /// The daemon listens on this addr together with other mDNS sockets,
239    /// to avoid busy polling the flume channel. If there is a way to poll
240    /// the channel and mDNS sockets together, then this can be removed.
241    signal_addr: SocketAddr,
242}
243
244impl ServiceDaemon {
245    /// Creates a new daemon and spawns a thread to run the daemon.
246    ///
247    /// Creates a new mDNS service daemon using the default port (5353).
248    ///
249    /// For development/testing with custom ports, use [`ServiceDaemon::new_with_port`].
250    pub fn new() -> Result<Self> {
251        Self::new_with_port(MDNS_PORT)
252    }
253
254    /// Creates a new mDNS service daemon using a custom port.
255    ///
256    /// # Arguments
257    ///
258    /// * `port` - The UDP port to bind for mDNS communication.
259    ///   - In production, this should be `MDNS_PORT` (5353) per RFC 6762.
260    ///   - For development/testing, you can use a non-standard port (e.g., 5454)
261    ///     to avoid conflicts with system mDNS services.
262    ///   - Both publisher and browser must use the same port to communicate.
263    ///
264    /// # Example
265    ///
266    /// ```no_run
267    /// use mdns_sd::ServiceDaemon;
268    ///
269    /// // Use standard mDNS port (production)
270    /// let daemon = ServiceDaemon::new_with_port(5353)?;
271    ///
272    /// // Use custom port for development (avoids macOS Bonjour conflict)
273    /// let daemon_dev = ServiceDaemon::new_with_port(5454)?;
274    /// # Ok::<(), mdns_sd::Error>(())
275    /// ```
276    pub fn new_with_port(port: u16) -> Result<Self> {
277        // Use port 0 to allow the system assign a random available port,
278        // no need for a pre-defined port number.
279        let signal_addr = SocketAddrV4::new(LOOPBACK_V4, 0);
280
281        let signal_sock = UdpSocket::bind(signal_addr)
282            .map_err(|e| e_fmt!("failed to create signal_sock for daemon: {}", e))?;
283
284        // Get the socket with the OS chosen port
285        let signal_addr = signal_sock
286            .local_addr()
287            .map_err(|e| e_fmt!("failed to get signal sock addr: {}", e))?;
288
289        // Must be nonblocking so we can listen to it together with mDNS sockets.
290        signal_sock
291            .set_nonblocking(true)
292            .map_err(|e| e_fmt!("failed to set nonblocking for signal socket: {}", e))?;
293
294        let poller = Poll::new().map_err(|e| e_fmt!("failed to create mio Poll: {e}"))?;
295
296        let (sender, receiver) = bounded(100);
297
298        // Spawn the daemon thread
299        let mio_sock = MioUdpSocket::from_std(signal_sock);
300        let cmd_sender = sender.clone();
301        thread::Builder::new()
302            .name("mDNS_daemon".to_string())
303            .spawn(move || {
304                Self::daemon_thread(mio_sock, poller, receiver, port, cmd_sender, signal_addr)
305            })
306            .map_err(|e| e_fmt!("thread builder failed to spawn: {}", e))?;
307
308        Ok(Self {
309            sender,
310            signal_addr,
311        })
312    }
313
314    /// Sends `cmd` to the daemon via its channel, and sends a signal
315    /// to its sock addr to notify.
316    fn send_cmd(&self, cmd: Command) -> Result<()> {
317        let cmd_name = cmd.to_string();
318
319        // First, send to the flume channel.
320        self.sender.try_send(cmd).map_err(|e| match e {
321            TrySendError::Full(_) => Error::Again,
322            e => e_fmt!("flume::channel::send failed: {}", e),
323        })?;
324
325        // Second, send a signal to notify the daemon.
326        let addr = SocketAddrV4::new(LOOPBACK_V4, 0);
327        let socket = UdpSocket::bind(addr)
328            .map_err(|e| e_fmt!("Failed to create socket to send signal: {}", e))?;
329        socket
330            .send_to(cmd_name.as_bytes(), self.signal_addr)
331            .map_err(|e| {
332                e_fmt!(
333                    "signal socket send_to {} ({}) failed: {}",
334                    self.signal_addr,
335                    cmd_name,
336                    e
337                )
338            })?;
339
340        Ok(())
341    }
342
343    /// Starts browsing for a specific service type.
344    ///
345    /// `service_type` must end with a valid mDNS domain: '._tcp.local.' or '._udp.local.'
346    ///
347    /// Returns a channel `Receiver` to receive events about the service. The caller
348    /// can call `.recv_async().await` on this receiver to handle events in an
349    /// async environment or call `.recv()` in a sync environment.
350    ///
351    /// When a new instance is found, the daemon automatically tries to resolve, i.e.
352    /// finding more details, i.e. SRV records and TXT records.
353    pub fn browse(&self, service_type: &str) -> Result<Receiver<ServiceEvent>> {
354        check_domain_suffix(service_type)?;
355
356        let (resp_s, resp_r) = bounded(10);
357        self.send_cmd(Command::Browse(service_type.to_string(), 1, false, resp_s))?;
358        Ok(resp_r)
359    }
360
361    /// Preforms a "cache-only" browse.
362    ///
363    /// `service_type` must end with a valid mDNS domain: '._tcp.local.' or '._udp.local.'
364    ///
365    /// The functionality is identical to 'browse', but the service events are based solely on the contents
366    /// of the daemon's cache. No actual mDNS query is sent to the network.
367    ///
368    /// See [accept_unsolicited](Self::accept_unsolicited) if you want to do cache-only browsing.
369    pub fn browse_cache(&self, service_type: &str) -> Result<Receiver<ServiceEvent>> {
370        check_domain_suffix(service_type)?;
371
372        let (resp_s, resp_r) = bounded(10);
373        self.send_cmd(Command::Browse(service_type.to_string(), 1, true, resp_s))?;
374        Ok(resp_r)
375    }
376
377    /// Stops searching for a specific service type.
378    ///
379    /// When an error is returned, the caller should retry only when
380    /// the error is `Error::Again`, otherwise should log and move on.
381    pub fn stop_browse(&self, ty_domain: &str) -> Result<()> {
382        self.send_cmd(Command::StopBrowse(ty_domain.to_string()))
383    }
384
385    /// Starts querying for the ip addresses of a hostname.
386    ///
387    /// Returns a channel `Receiver` to receive events about the hostname.
388    /// The caller can call `.recv_async().await` on this receiver to handle events in an
389    /// async environment or call `.recv()` in a sync environment.
390    ///
391    /// The `timeout` is specified in milliseconds.
392    pub fn resolve_hostname(
393        &self,
394        hostname: &str,
395        timeout: Option<u64>,
396    ) -> Result<Receiver<HostnameResolutionEvent>> {
397        check_hostname(hostname)?;
398        let (resp_s, resp_r) = bounded(10);
399        self.send_cmd(Command::ResolveHostname(
400            hostname.to_string(),
401            1,
402            resp_s,
403            timeout,
404        ))?;
405        Ok(resp_r)
406    }
407
408    /// Stops querying for the ip addresses of a hostname.
409    ///
410    /// When an error is returned, the caller should retry only when
411    /// the error is `Error::Again`, otherwise should log and move on.
412    pub fn stop_resolve_hostname(&self, hostname: &str) -> Result<()> {
413        self.send_cmd(Command::StopResolveHostname(hostname.to_string()))
414    }
415
416    /// Registers a service provided by this host.
417    ///
418    /// If `service_info` has no addresses yet and its `addr_auto` is enabled,
419    /// this method will automatically fill in addresses from the host.
420    ///
421    /// To re-announce a service with an updated `service_info`, just call
422    /// this `register` function again. No need to call `unregister` first.
423    pub fn register(&self, service_info: ServiceInfo) -> Result<()> {
424        check_service_name(service_info.get_fullname())?;
425        check_hostname(service_info.get_hostname())?;
426
427        self.send_cmd(Command::Register(service_info.into()))
428    }
429
430    /// Unregisters a service. This is a graceful shutdown of a service.
431    ///
432    /// Returns a channel receiver that is used to receive the status code
433    /// of the unregister.
434    ///
435    /// When an error is returned, the caller should retry only when
436    /// the error is `Error::Again`, otherwise should log and move on.
437    pub fn unregister(&self, fullname: &str) -> Result<Receiver<UnregisterStatus>> {
438        let (resp_s, resp_r) = bounded(1);
439        self.send_cmd(Command::Unregister(fullname.to_lowercase(), resp_s))?;
440        Ok(resp_r)
441    }
442
443    /// Starts to monitor events from the daemon.
444    ///
445    /// Returns a channel [`Receiver`] of [`DaemonEvent`].
446    pub fn monitor(&self) -> Result<Receiver<DaemonEvent>> {
447        let (resp_s, resp_r) = bounded(100);
448        self.send_cmd(Command::Monitor(resp_s))?;
449        Ok(resp_r)
450    }
451
452    /// Shuts down the daemon thread and returns a channel to receive the status.
453    ///
454    /// When an error is returned, the caller should retry only when
455    /// the error is `Error::Again`, otherwise should log and move on.
456    pub fn shutdown(&self) -> Result<Receiver<DaemonStatus>> {
457        let (resp_s, resp_r) = bounded(1);
458        self.send_cmd(Command::Exit(resp_s))?;
459        Ok(resp_r)
460    }
461
462    /// Returns the status of the daemon.
463    ///
464    /// When an error is returned, the caller should retry only when
465    /// the error is `Error::Again`, otherwise should consider the daemon
466    /// stopped working and move on.
467    pub fn status(&self) -> Result<Receiver<DaemonStatus>> {
468        let (resp_s, resp_r) = bounded(1);
469
470        if self.sender.is_disconnected() {
471            resp_s
472                .send(DaemonStatus::Shutdown)
473                .map_err(|e| e_fmt!("failed to send daemon status to the client: {}", e))?;
474        } else {
475            self.send_cmd(Command::GetStatus(resp_s))?;
476        }
477
478        Ok(resp_r)
479    }
480
481    /// Returns a channel receiver for the metrics, e.g. input/output counters.
482    ///
483    /// The metrics returned is a snapshot. Hence the caller should call
484    /// this method repeatedly if they want to monitor the metrics continuously.
485    pub fn get_metrics(&self) -> Result<Receiver<Metrics>> {
486        let (resp_s, resp_r) = bounded(1);
487        self.send_cmd(Command::GetMetrics(resp_s))?;
488        Ok(resp_r)
489    }
490
491    /// Change the max length allowed for a service name.
492    ///
493    /// As RFC 6763 defines a length max for a service name, a user should not call
494    /// this method unless they have to. See [`SERVICE_NAME_LEN_MAX_DEFAULT`].
495    ///
496    /// `len_max` is capped at an internal limit, which is currently 30.
497    pub fn set_service_name_len_max(&self, len_max: u8) -> Result<()> {
498        const SERVICE_NAME_LEN_MAX_LIMIT: u8 = 30; // Double the default length max.
499
500        if len_max > SERVICE_NAME_LEN_MAX_LIMIT {
501            return Err(Error::Msg(format!(
502                "service name length max {len_max} is too large"
503            )));
504        }
505
506        self.send_cmd(Command::SetOption(DaemonOption::ServiceNameLenMax(len_max)))
507    }
508
509    /// Change the interval for checking IP changes automatically.
510    ///
511    /// Setting the interval to 0 disables the IP check.
512    ///
513    /// See [`IP_CHECK_INTERVAL_IN_SECS_DEFAULT`] for the default interval.
514    pub fn set_ip_check_interval(&self, interval_in_secs: u32) -> Result<()> {
515        let interval_in_millis = interval_in_secs as u64 * 1000;
516        self.send_cmd(Command::SetOption(DaemonOption::IpCheckInterval(
517            interval_in_millis,
518        )))
519    }
520
521    /// Get the current interval in seconds for checking IP changes automatically.
522    pub fn get_ip_check_interval(&self) -> Result<u32> {
523        let (resp_s, resp_r) = bounded(1);
524        self.send_cmd(Command::GetOption(resp_s))?;
525
526        let option = resp_r
527            .recv_timeout(Duration::from_secs(10))
528            .map_err(|e| e_fmt!("failed to receive ip check interval: {}", e))?;
529        let ip_check_interval_in_secs = option.ip_check_interval / 1000;
530        Ok(ip_check_interval_in_secs as u32)
531    }
532
533    /// Include interfaces that match `if_kind` for this service daemon.
534    ///
535    /// For example:
536    /// ```ignore
537    ///     daemon.enable_interface("en0")?;
538    /// ```
539    pub fn enable_interface(&self, if_kind: impl IntoIfKindVec) -> Result<()> {
540        let if_kind_vec = if_kind.into_vec();
541        self.send_cmd(Command::SetOption(DaemonOption::EnableInterface(
542            if_kind_vec.kinds,
543        )))
544    }
545
546    /// Ignore/exclude interfaces that match `if_kind` for this daemon.
547    ///
548    /// For example:
549    /// ```ignore
550    ///     daemon.disable_interface(IfKind::IPv6)?;
551    /// ```
552    pub fn disable_interface(&self, if_kind: impl IntoIfKindVec) -> Result<()> {
553        let if_kind_vec = if_kind.into_vec();
554        self.send_cmd(Command::SetOption(DaemonOption::DisableInterface(
555            if_kind_vec.kinds,
556        )))
557    }
558
559    /// If `accept` is true, accept and cache all responses, even if there is no active querier
560    /// for a given service type. This is useful / necessary when doing cache-only browsing. See
561    /// [browse_cache](Self::browse_cache).
562    ///
563    /// If `accept` is false (default), accept only responses matching queries that we have initiated.
564    ///
565    /// For example:
566    /// ```ignore
567    ///     daemon.accept_unsolicited(true)?;
568    /// ```
569    pub fn accept_unsolicited(&self, accept: bool) -> Result<()> {
570        self.send_cmd(Command::SetOption(DaemonOption::AcceptUnsolicited(accept)))
571    }
572
573    /// Include or exclude Apple P2P interfaces, e.g. "awdl0", "llw0".
574    /// By default, they are excluded.
575    pub fn include_apple_p2p(&self, include: bool) -> Result<()> {
576        self.send_cmd(Command::SetOption(DaemonOption::IncludeAppleP2P(include)))
577    }
578
579    #[cfg(test)]
580    pub fn test_down_interface(&self, ifname: &str) -> Result<()> {
581        self.send_cmd(Command::SetOption(DaemonOption::TestDownInterface(
582            ifname.to_string(),
583        )))
584    }
585
586    #[cfg(test)]
587    pub fn test_up_interface(&self, ifname: &str) -> Result<()> {
588        self.send_cmd(Command::SetOption(DaemonOption::TestUpInterface(
589            ifname.to_string(),
590        )))
591    }
592
593    /// Enable or disable the loopback for locally sent multicast packets in IPv4.
594    ///
595    /// By default, multicast loop is enabled for IPv4. When disabled, a querier will not
596    /// receive announcements from a responder on the same host.
597    ///
598    /// Reference: <https://learn.microsoft.com/en-us/windows/win32/winsock/ip-multicast-2>
599    ///
600    /// "The Winsock version of the IP_MULTICAST_LOOP option is semantically different than
601    /// the UNIX version of the IP_MULTICAST_LOOP option:
602    ///
603    /// In Winsock, the IP_MULTICAST_LOOP option applies only to the receive path.
604    /// In the UNIX version, the IP_MULTICAST_LOOP option applies to the send path."
605    ///
606    /// Which means, in order NOT to receive localhost announcements, you want to call
607    /// this API on the querier side on Windows, but on the responder side on Unix.
608    pub fn set_multicast_loop_v4(&self, on: bool) -> Result<()> {
609        self.send_cmd(Command::SetOption(DaemonOption::MulticastLoopV4(on)))
610    }
611
612    /// Enable or disable the loopback for locally sent multicast packets in IPv6.
613    ///
614    /// By default, multicast loop is enabled for IPv6. When disabled, a querier will not
615    /// receive announcements from a responder on the same host.
616    ///
617    /// Reference: <https://learn.microsoft.com/en-us/windows/win32/winsock/ip-multicast-2>
618    ///
619    /// "The Winsock version of the IP_MULTICAST_LOOP option is semantically different than
620    /// the UNIX version of the IP_MULTICAST_LOOP option:
621    ///
622    /// In Winsock, the IP_MULTICAST_LOOP option applies only to the receive path.
623    /// In the UNIX version, the IP_MULTICAST_LOOP option applies to the send path."
624    ///
625    /// Which means, in order NOT to receive localhost announcements, you want to call
626    /// this API on the querier side on Windows, but on the responder side on Unix.
627    pub fn set_multicast_loop_v6(&self, on: bool) -> Result<()> {
628        self.send_cmd(Command::SetOption(DaemonOption::MulticastLoopV6(on)))
629    }
630
631    /// Proactively confirms whether a service instance still valid.
632    ///
633    /// This call will issue queries for a service instance's SRV record and Address records.
634    ///
635    /// For `timeout`, most users should use [VERIFY_TIMEOUT_DEFAULT]
636    /// unless there is a reason not to follow RFC.
637    ///
638    /// If no response is received within `timeout`, the current resource
639    /// records will be flushed, and if needed, `ServiceRemoved` event will be
640    /// sent to active queriers.
641    ///
642    /// Reference: [RFC 6762](https://datatracker.ietf.org/doc/html/rfc6762#section-10.4)
643    pub fn verify(&self, instance_fullname: String, timeout: Duration) -> Result<()> {
644        self.send_cmd(Command::Verify(instance_fullname, timeout))
645    }
646
647    fn daemon_thread(
648        signal_sock: MioUdpSocket,
649        poller: Poll,
650        receiver: Receiver<Command>,
651        port: u16,
652        cmd_sender: Sender<Command>,
653        signal_addr: SocketAddr,
654    ) {
655        let mut zc = Zeroconf::new(signal_sock, poller, port, cmd_sender, signal_addr);
656
657        if let Some(cmd) = zc.run(receiver) {
658            match cmd {
659                Command::Exit(resp_s) => {
660                    // It is guaranteed that the receiver already dropped,
661                    // i.e. the daemon command channel closed.
662                    if let Err(e) = resp_s.send(DaemonStatus::Shutdown) {
663                        debug!("exit: failed to send response of shutdown: {}", e);
664                    }
665                }
666                _ => {
667                    debug!("Unexpected command: {:?}", cmd);
668                }
669            }
670        }
671    }
672}
673
674/// Creates a new UDP socket that uses `intf` to send and recv multicast.
675fn _new_socket_bind(intf: &Interface, should_loop: bool) -> Result<MyUdpSocket> {
676    // Use the same socket for receiving and sending multicast packets.
677    // Such socket has to bind to INADDR_ANY or IN6ADDR_ANY.
678    let intf_ip = &intf.ip();
679    match intf_ip {
680        IpAddr::V4(ip) => {
681            let addr = SocketAddrV4::new(Ipv4Addr::new(0, 0, 0, 0), MDNS_PORT);
682            let sock = new_socket(addr.into(), true)?;
683
684            // Join mDNS group to receive packets.
685            sock.join_multicast_v4(&GROUP_ADDR_V4, ip)
686                .map_err(|e| e_fmt!("join multicast group on addr {}: {}", intf_ip, e))?;
687
688            // Set IP_MULTICAST_IF to send packets.
689            sock.set_multicast_if_v4(ip)
690                .map_err(|e| e_fmt!("set multicast_if on addr {}: {}", ip, e))?;
691
692            // Per RFC 6762 section 11:
693            // "All Multicast DNS responses (including responses sent via unicast) SHOULD
694            // be sent with IP TTL set to 255."
695            // Here we set the TTL to 255 for multicast as we don't support unicast yet.
696            sock.set_multicast_ttl_v4(255)
697                .map_err(|e| e_fmt!("set set_multicast_ttl_v4 on addr {}: {}", ip, e))?;
698
699            if !should_loop {
700                sock.set_multicast_loop_v4(false)
701                    .map_err(|e| e_fmt!("failed to set multicast loop v4 for {ip}: {e}"))?;
702            }
703
704            // Test if we can send packets successfully.
705            let multicast_addr = SocketAddrV4::new(GROUP_ADDR_V4, MDNS_PORT).into();
706            let test_packets = DnsOutgoing::new(0).to_data_on_wire();
707            for packet in test_packets {
708                sock.send_to(&packet, &multicast_addr)
709                    .map_err(|e| e_fmt!("send multicast packet on addr {}: {}", ip, e))?;
710            }
711            MyUdpSocket::new(sock)
712                .map_err(|e| e_fmt!("failed to create MySocket for interface {}: {e}", intf.name))
713        }
714        IpAddr::V6(ip) => {
715            let addr = SocketAddrV6::new(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 0), MDNS_PORT, 0, 0);
716            let sock = new_socket(addr.into(), true)?;
717
718            let if_index = intf.index.unwrap_or(0);
719
720            // Join mDNS group to receive packets.
721            sock.join_multicast_v6(&GROUP_ADDR_V6, if_index)
722                .map_err(|e| e_fmt!("join multicast group on addr {}: {}", ip, e))?;
723
724            // Set IPV6_MULTICAST_IF to send packets.
725            sock.set_multicast_if_v6(if_index)
726                .map_err(|e| e_fmt!("set multicast_if on addr {}: {}", ip, e))?;
727
728            // We are not sending multicast packets to test this socket as there might
729            // be many IPv6 interfaces on a host and could cause such send error:
730            // "No buffer space available (os error 55)".
731
732            MyUdpSocket::new(sock)
733                .map_err(|e| e_fmt!("failed to create MySocket for interface {}: {e}", intf.name))
734        }
735    }
736}
737
738/// Creates a new UDP socket to bind to `port` with REUSEPORT option.
739/// `non_block` indicates whether to set O_NONBLOCK for the socket.
740fn new_socket(addr: SocketAddr, non_block: bool) -> Result<PktInfoUdpSocket> {
741    let domain = match addr {
742        SocketAddr::V4(_) => socket2::Domain::IPV4,
743        SocketAddr::V6(_) => socket2::Domain::IPV6,
744    };
745
746    let fd = PktInfoUdpSocket::new(domain).map_err(|e| e_fmt!("create socket failed: {}", e))?;
747
748    fd.set_reuse_address(true)
749        .map_err(|e| e_fmt!("set ReuseAddr failed: {}", e))?;
750    #[cfg(unix)]
751    if let Err(e) = fd.set_reuse_port(true) {
752        debug!(
753            "SO_REUSEPORT is not supported, continuing without it: {}",
754            e
755        );
756    }
757
758    if non_block {
759        fd.set_nonblocking(true)
760            .map_err(|e| e_fmt!("set O_NONBLOCK: {}", e))?;
761    }
762
763    fd.bind(&addr.into())
764        .map_err(|e| e_fmt!("socket bind to {} failed: {}", &addr, e))?;
765
766    trace!("new socket bind to {}", &addr);
767    Ok(fd)
768}
769
770/// Specify a UNIX timestamp in millis to run `command` for the next time.
771struct ReRun {
772    /// UNIX timestamp in millis.
773    next_time: u64,
774    command: Command,
775}
776
777/// Specify kinds of interfaces. It is used to enable or to disable interfaces in the daemon.
778///
779/// Note that for ergonomic reasons, `From<&str>` and `From<IpAddr>` are implemented.
780#[derive(Debug, Clone)]
781#[non_exhaustive]
782pub enum IfKind {
783    /// All interfaces.
784    All,
785
786    /// All IPv4 interfaces.
787    IPv4,
788
789    /// All IPv6 interfaces.
790    IPv6,
791
792    /// By the interface name, for example "en0"
793    Name(String),
794
795    /// By an IPv4 or IPv6 address.
796    Addr(IpAddr),
797
798    /// 127.0.0.1 (or anything in 127.0.0.0/8), enabled by default.
799    ///
800    /// Loopback interfaces are required by some use cases (e.g., OSCQuery) for publishing.
801    LoopbackV4,
802
803    /// ::1/128, enabled by default.
804    LoopbackV6,
805}
806
807impl IfKind {
808    /// Checks if `intf` matches with this interface kind.
809    fn matches(&self, intf: &Interface) -> bool {
810        match self {
811            Self::All => true,
812            Self::IPv4 => intf.ip().is_ipv4(),
813            Self::IPv6 => intf.ip().is_ipv6(),
814            Self::Name(ifname) => ifname == &intf.name,
815            Self::Addr(addr) => addr == &intf.ip(),
816            Self::LoopbackV4 => intf.is_loopback() && intf.ip().is_ipv4(),
817            Self::LoopbackV6 => intf.is_loopback() && intf.ip().is_ipv6(),
818        }
819    }
820}
821
822/// The first use case of specifying an interface was to
823/// use an interface name. Hence adding this for ergonomic reasons.
824impl From<&str> for IfKind {
825    fn from(val: &str) -> Self {
826        Self::Name(val.to_string())
827    }
828}
829
830impl From<&String> for IfKind {
831    fn from(val: &String) -> Self {
832        Self::Name(val.to_string())
833    }
834}
835
836/// Still for ergonomic reasons.
837impl From<IpAddr> for IfKind {
838    fn from(val: IpAddr) -> Self {
839        Self::Addr(val)
840    }
841}
842
843/// A list of `IfKind` that can be used to match interfaces.
844pub struct IfKindVec {
845    kinds: Vec<IfKind>,
846}
847
848/// A trait that converts a type into a Vec of `IfKind`.
849pub trait IntoIfKindVec {
850    fn into_vec(self) -> IfKindVec;
851}
852
853impl<T: Into<IfKind>> IntoIfKindVec for T {
854    fn into_vec(self) -> IfKindVec {
855        let if_kind: IfKind = self.into();
856        IfKindVec {
857            kinds: vec![if_kind],
858        }
859    }
860}
861
862impl<T: Into<IfKind>> IntoIfKindVec for Vec<T> {
863    fn into_vec(self) -> IfKindVec {
864        let kinds: Vec<IfKind> = self.into_iter().map(|x| x.into()).collect();
865        IfKindVec { kinds }
866    }
867}
868
869/// Selection of interfaces.
870struct IfSelection {
871    /// The interfaces to be selected.
872    if_kind: IfKind,
873
874    /// Whether the `if_kind` should be enabled or not.
875    selected: bool,
876}
877
878/// A struct holding the state. It was inspired by `zeroconf` package in Python.
879struct Zeroconf {
880    /// The mDNS port number to use for socket binding.
881    /// Typically MDNS_PORT (5353), but can be customized for development/testing.
882    port: u16,
883
884    /// Local interfaces keyed by interface index.
885    my_intfs: HashMap<u32, MyIntf>,
886
887    /// A common socket for IPv4 interfaces. It's None if IPv4 is disabled in OS kernel.
888    ipv4_sock: Option<MyUdpSocket>,
889
890    /// A common socket for IPv6 interfaces. It's None if IPv6 is disabled in OS kernel.
891    ipv6_sock: Option<MyUdpSocket>,
892
893    /// Local registered services, keyed by service full names.
894    my_services: HashMap<String, ServiceInfo>,
895
896    /// Received DNS records.
897    cache: DnsCache,
898
899    /// Registered service records, keyed by interface index.
900    dns_registry_map: HashMap<u32, DnsRegistry>,
901
902    /// Active "Browse" commands.
903    service_queriers: HashMap<String, Sender<ServiceEvent>>, // <ty_domain, channel::sender>
904
905    /// Active "ResolveHostname" commands.
906    ///
907    /// The timestamps are set at the future timestamp when the command should timeout.
908    /// `hostname` is case-insensitive and stored in lowercase.
909    hostname_resolvers: HashMap<String, (Sender<HostnameResolutionEvent>, Option<u64>)>, // <hostname, (channel::sender, UNIX timestamp in millis)>
910
911    /// All repeating transmissions.
912    retransmissions: Vec<ReRun>,
913
914    counters: Metrics,
915
916    /// Waits for incoming packets.
917    poller: Poll,
918
919    /// Channels to notify events.
920    monitors: Vec<Sender<DaemonEvent>>,
921
922    /// Options
923    service_name_len_max: u8,
924
925    /// Interval in millis to check IP address changes.
926    ip_check_interval: u64,
927
928    /// All interface selections called to the daemon.
929    if_selections: Vec<IfSelection>,
930
931    /// Socket for signaling.
932    signal_sock: MioUdpSocket,
933
934    /// Timestamps marking where we need another iteration of the run loop,
935    /// to react to events like retransmissions, cache refreshes, interface IP address changes, etc.
936    ///
937    /// When the run loop goes through a single iteration, it will
938    /// set its timeout to the earliest timer in this list.
939    timers: BinaryHeap<Reverse<u64>>,
940
941    status: DaemonStatus,
942
943    /// Service instances that are pending for resolving SRV and TXT.
944    pending_resolves: HashSet<String>,
945
946    /// Service instances that are already resolved.
947    resolved: HashSet<String>,
948
949    multicast_loop_v4: bool,
950
951    multicast_loop_v6: bool,
952
953    accept_unsolicited: bool,
954
955    include_apple_p2p: bool,
956
957    cmd_sender: Sender<Command>,
958
959    signal_addr: SocketAddr,
960
961    #[cfg(test)]
962    test_down_interfaces: HashSet<String>,
963}
964
965/// Join the multicast group for the given interface.
966fn join_multicast_group(my_sock: &PktInfoUdpSocket, intf: &Interface) -> Result<()> {
967    let intf_ip = &intf.ip();
968    match intf_ip {
969        IpAddr::V4(ip) => {
970            // Join mDNS group to receive packets.
971            debug!("join multicast group V4 on {} addr {ip}", intf.name);
972            my_sock
973                .join_multicast_v4(&GROUP_ADDR_V4, ip)
974                .map_err(|e| e_fmt!("PKT join multicast group on addr {}: {}", intf_ip, e))?;
975        }
976        IpAddr::V6(ip) => {
977            let if_index = intf.index.unwrap_or(0);
978            // Join mDNS group to receive packets.
979            debug!(
980                "join multicast group V6 on {} addr {ip} with index {if_index}",
981                intf.name
982            );
983            my_sock
984                .join_multicast_v6(&GROUP_ADDR_V6, if_index)
985                .map_err(|e| e_fmt!("PKT join multicast group on addr {}: {}", ip, e))?;
986        }
987    }
988    Ok(())
989}
990
991impl Zeroconf {
992    fn new(
993        signal_sock: MioUdpSocket,
994        poller: Poll,
995        port: u16,
996        cmd_sender: Sender<Command>,
997        signal_addr: SocketAddr,
998    ) -> Self {
999        // Get interfaces.
1000        let my_ifaddrs = my_ip_interfaces(true);
1001
1002        // Create a socket for every IP addr.
1003        // Note: it is possible that `my_ifaddrs` contains the same IP addr with different interface names,
1004        // or the same interface name with different IP addrs.
1005        let mut my_intfs = HashMap::new();
1006        let mut dns_registry_map = HashMap::new();
1007
1008        // Use the same socket for receiving and sending multicast packets.
1009        // Such socket has to bind to INADDR_ANY or IN6ADDR_ANY.
1010        let mut ipv4_sock = None;
1011        let addr = SocketAddrV4::new(Ipv4Addr::new(0, 0, 0, 0), port);
1012        match new_socket(addr.into(), true) {
1013            Ok(sock) => {
1014                // Per RFC 6762 section 11:
1015                // "All Multicast DNS responses (including responses sent via unicast) SHOULD
1016                // be sent with IP TTL set to 255."
1017                // Here we set the TTL to 255 for multicast as we don't support unicast yet.
1018                sock.set_multicast_ttl_v4(255)
1019                    .map_err(|e| e_fmt!("set set_multicast_ttl_v4 on addr: {}", e))
1020                    .ok();
1021
1022                // This clones a socket.
1023                ipv4_sock = match MyUdpSocket::new(sock) {
1024                    Ok(s) => Some(s),
1025                    Err(e) => {
1026                        debug!("failed to create IPv4 MyUdpSocket: {e}");
1027                        None
1028                    }
1029                };
1030            }
1031            // Per RFC 6762 section 11:}
1032            Err(e) => debug!("failed to create IPv4 socket: {e}"),
1033        }
1034
1035        let mut ipv6_sock = None;
1036        let addr = SocketAddrV6::new(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 0), port, 0, 0);
1037        match new_socket(addr.into(), true) {
1038            Ok(sock) => {
1039                // Per RFC 6762 section 11:
1040                // "All Multicast DNS responses (including responses sent via unicast) SHOULD
1041                // be sent with IP TTL set to 255."
1042                sock.set_multicast_hops_v6(255)
1043                    .map_err(|e| e_fmt!("set set_multicast_hops_v6: {}", e))
1044                    .ok();
1045
1046                // This clones the ipv6 socket.
1047                ipv6_sock = match MyUdpSocket::new(sock) {
1048                    Ok(s) => Some(s),
1049                    Err(e) => {
1050                        debug!("failed to create IPv6 MyUdpSocket: {e}");
1051                        None
1052                    }
1053                };
1054            }
1055            Err(e) => debug!("failed to create IPv6 socket: {e}"),
1056        }
1057
1058        // Configure sockets to join multicast groups.
1059        for intf in my_ifaddrs {
1060            let sock_opt = if intf.ip().is_ipv4() {
1061                &ipv4_sock
1062            } else {
1063                &ipv6_sock
1064            };
1065            let Some(sock) = sock_opt else {
1066                debug!(
1067                    "no socket available for interface {} with addr {}. Skipped.",
1068                    intf.name,
1069                    intf.ip()
1070                );
1071                continue;
1072            };
1073
1074            if let Err(e) = join_multicast_group(&sock.pktinfo, &intf) {
1075                debug!("failed to join multicast: {}: {e}. Skipped.", &intf.ip());
1076            }
1077
1078            let if_index = intf.index.unwrap_or(0);
1079
1080            // Add this interface address if not already present.
1081            dns_registry_map
1082                .entry(if_index)
1083                .or_insert_with(DnsRegistry::new);
1084
1085            my_intfs
1086                .entry(if_index)
1087                .and_modify(|v: &mut MyIntf| {
1088                    v.addrs.insert(intf.addr.clone());
1089                })
1090                .or_insert(MyIntf {
1091                    name: intf.name.clone(),
1092                    index: if_index,
1093                    addrs: HashSet::from([intf.addr]),
1094                });
1095        }
1096
1097        let monitors = Vec::new();
1098        let service_name_len_max = SERVICE_NAME_LEN_MAX_DEFAULT;
1099        let ip_check_interval = IP_CHECK_INTERVAL_IN_SECS_DEFAULT as u64 * 1000;
1100
1101        let timers = BinaryHeap::new();
1102
1103        // Enable everything, including loopback interfaces.
1104        let if_selections = vec![];
1105
1106        let status = DaemonStatus::Running;
1107
1108        Self {
1109            port,
1110            my_intfs,
1111            ipv4_sock,
1112            ipv6_sock,
1113            my_services: HashMap::new(),
1114            cache: DnsCache::new(),
1115            dns_registry_map,
1116            hostname_resolvers: HashMap::new(),
1117            service_queriers: HashMap::new(),
1118            retransmissions: Vec::new(),
1119            counters: HashMap::new(),
1120            poller,
1121            monitors,
1122            service_name_len_max,
1123            ip_check_interval,
1124            if_selections,
1125            signal_sock,
1126            timers,
1127            status,
1128            pending_resolves: HashSet::new(),
1129            resolved: HashSet::new(),
1130            multicast_loop_v4: true,
1131            multicast_loop_v6: true,
1132            accept_unsolicited: false,
1133            include_apple_p2p: false,
1134            cmd_sender,
1135            signal_addr,
1136
1137            #[cfg(test)]
1138            test_down_interfaces: HashSet::new(),
1139        }
1140    }
1141
1142    /// Send a Command into the daemon channel and poke the signal socket to wake the poll loop.
1143    fn send_cmd_to_self(&self, cmd: Command) -> Result<()> {
1144        let cmd_name = cmd.to_string();
1145
1146        self.cmd_sender.try_send(cmd).map_err(|e| match e {
1147            TrySendError::Full(_) => Error::Again,
1148            e => e_fmt!("flume::channel::send failed: {}", e),
1149        })?;
1150
1151        let addr = SocketAddrV4::new(LOOPBACK_V4, 0);
1152        let socket = UdpSocket::bind(addr)
1153            .map_err(|e| e_fmt!("Failed to create socket to send signal: {}", e))?;
1154        socket
1155            .send_to(cmd_name.as_bytes(), self.signal_addr)
1156            .map_err(|e| {
1157                e_fmt!(
1158                    "signal socket send_to {} ({}) failed: {}",
1159                    self.signal_addr,
1160                    cmd_name,
1161                    e
1162                )
1163            })?;
1164
1165        Ok(())
1166    }
1167
1168    /// Clean up all resources before shutdown.
1169    ///
1170    /// This method:
1171    /// 1. Unregisters all registered services (sends goodbye packets)
1172    /// 2. Stops all active browse operations
1173    /// 3. Stops all active hostname resolution operations
1174    /// 4. Clears all retransmissions
1175    fn cleanup(&mut self) {
1176        debug!("Starting cleanup for shutdown");
1177
1178        // 1. Unregister all services - send goodbye packets
1179        let service_names: Vec<String> = self.my_services.keys().cloned().collect();
1180        for fullname in service_names {
1181            if let Some(info) = self.my_services.get(&fullname) {
1182                debug!("Unregistering service during shutdown: {}", &fullname);
1183
1184                for intf in self.my_intfs.values() {
1185                    if let Some(sock) = self.ipv4_sock.as_ref() {
1186                        self.unregister_service(info, intf, &sock.pktinfo);
1187                    }
1188
1189                    if let Some(sock) = self.ipv6_sock.as_ref() {
1190                        self.unregister_service(info, intf, &sock.pktinfo);
1191                    }
1192                }
1193            }
1194        }
1195        self.my_services.clear();
1196
1197        // 2. Stop all browse operations
1198        let browse_types: Vec<String> = self.service_queriers.keys().cloned().collect();
1199        for ty_domain in browse_types {
1200            debug!("Stopping browse during shutdown: {}", &ty_domain);
1201            if let Some(sender) = self.service_queriers.remove(&ty_domain) {
1202                // Notify the client
1203                if let Err(e) = sender.send(ServiceEvent::SearchStopped(ty_domain.clone())) {
1204                    debug!("Failed to send SearchStopped during shutdown: {}", e);
1205                }
1206            }
1207        }
1208
1209        // 3. Stop all hostname resolution operations
1210        let hostnames: Vec<String> = self.hostname_resolvers.keys().cloned().collect();
1211        for hostname in hostnames {
1212            debug!(
1213                "Stopping hostname resolution during shutdown: {}",
1214                &hostname
1215            );
1216            if let Some((sender, _timeout)) = self.hostname_resolvers.remove(&hostname) {
1217                // Notify the client
1218                if let Err(e) =
1219                    sender.send(HostnameResolutionEvent::SearchStopped(hostname.clone()))
1220                {
1221                    debug!(
1222                        "Failed to send HostnameResolutionEvent::SearchStopped during shutdown: {}",
1223                        e
1224                    );
1225                }
1226            }
1227        }
1228
1229        // 4. Clear all retransmissions
1230        self.retransmissions.clear();
1231
1232        debug!("Cleanup completed");
1233    }
1234
1235    /// The main event loop of the daemon thread
1236    ///
1237    /// In each round, it will:
1238    /// 1. select the listening sockets with a timeout.
1239    /// 2. process the incoming packets if any.
1240    /// 3. try_recv on its channel and execute commands.
1241    /// 4. announce its registered services.
1242    /// 5. process retransmissions if any.
1243    fn run(&mut self, receiver: Receiver<Command>) -> Option<Command> {
1244        // Add the daemon's signal socket to the poller.
1245        if let Err(e) = self.poller.registry().register(
1246            &mut self.signal_sock,
1247            mio::Token(SIGNAL_SOCK_EVENT_KEY),
1248            mio::Interest::READABLE,
1249        ) {
1250            debug!("failed to add signal socket to the poller: {}", e);
1251            return None;
1252        }
1253
1254        if let Some(sock) = self.ipv4_sock.as_mut() {
1255            if let Err(e) = self.poller.registry().register(
1256                sock,
1257                mio::Token(IPV4_SOCK_EVENT_KEY),
1258                mio::Interest::READABLE,
1259            ) {
1260                debug!("failed to register ipv4 socket: {}", e);
1261                return None;
1262            }
1263        }
1264
1265        if let Some(sock) = self.ipv6_sock.as_mut() {
1266            if let Err(e) = self.poller.registry().register(
1267                sock,
1268                mio::Token(IPV6_SOCK_EVENT_KEY),
1269                mio::Interest::READABLE,
1270            ) {
1271                debug!("failed to register ipv6 socket: {}", e);
1272                return None;
1273            }
1274        }
1275
1276        // Setup timer for IP checks.
1277        let mut next_ip_check = if self.ip_check_interval > 0 {
1278            current_time_millis() + self.ip_check_interval
1279        } else {
1280            0
1281        };
1282
1283        if next_ip_check > 0 {
1284            self.add_timer(next_ip_check);
1285        }
1286
1287        // Start the run loop.
1288
1289        let mut events = mio::Events::with_capacity(1024);
1290        loop {
1291            let now = current_time_millis();
1292
1293            let earliest_timer = self.peek_earliest_timer();
1294            let timeout = earliest_timer.map(|timer| {
1295                // If `timer` already passed, set `timeout` to be 1ms.
1296                let millis = if timer > now { timer - now } else { 1 };
1297                Duration::from_millis(millis)
1298            });
1299
1300            // Process incoming packets, command events and optional timeout.
1301            events.clear();
1302            match self.poller.poll(&mut events, timeout) {
1303                Ok(_) => self.handle_poller_events(&events),
1304                Err(e) => debug!("failed to select from sockets: {}", e),
1305            }
1306
1307            let now = current_time_millis();
1308
1309            // Remove the timers if already passed.
1310            self.pop_timers_till(now);
1311
1312            // Remove hostname resolvers with expired timeouts.
1313            for hostname in self
1314                .hostname_resolvers
1315                .clone()
1316                .into_iter()
1317                .filter(|(_, (_, timeout))| timeout.map(|t| now >= t).unwrap_or(false))
1318                .map(|(hostname, _)| hostname)
1319            {
1320                trace!("hostname resolver timeout for {}", &hostname);
1321                call_hostname_resolution_listener(
1322                    &self.hostname_resolvers,
1323                    &hostname,
1324                    HostnameResolutionEvent::SearchTimeout(hostname.to_owned()),
1325                );
1326                call_hostname_resolution_listener(
1327                    &self.hostname_resolvers,
1328                    &hostname,
1329                    HostnameResolutionEvent::SearchStopped(hostname.to_owned()),
1330                );
1331                self.hostname_resolvers.remove(&hostname);
1332            }
1333
1334            // process commands from the command channel
1335            while let Ok(command) = receiver.try_recv() {
1336                if matches!(command, Command::Exit(_)) {
1337                    debug!("Exit command received, performing cleanup");
1338                    self.cleanup();
1339                    self.status = DaemonStatus::Shutdown;
1340                    return Some(command);
1341                }
1342                self.exec_command(command, false);
1343            }
1344
1345            // check for repeated commands and run them if their time is up.
1346            let mut i = 0;
1347            while i < self.retransmissions.len() {
1348                if now >= self.retransmissions[i].next_time {
1349                    let rerun = self.retransmissions.remove(i);
1350                    self.exec_command(rerun.command, true);
1351                } else {
1352                    i += 1;
1353                }
1354            }
1355
1356            // Refresh cached service records with active queriers
1357            self.refresh_active_services();
1358
1359            // Refresh cached A/AAAA records with active queriers
1360            let mut query_count = 0;
1361            for (hostname, _sender) in self.hostname_resolvers.iter() {
1362                for (hostname, ip_addr) in
1363                    self.cache.refresh_due_hostname_resolutions(hostname).iter()
1364                {
1365                    self.send_query(hostname, ip_address_rr_type(&ip_addr.to_ip_addr()));
1366                    query_count += 1;
1367                }
1368            }
1369
1370            self.increase_counter(Counter::CacheRefreshAddr, query_count);
1371
1372            // check and evict expired records in our cache
1373            let now = current_time_millis();
1374
1375            // Notify service listeners about the expired records.
1376            let expired_services = self.cache.evict_expired_services(now);
1377            if !expired_services.is_empty() {
1378                debug!(
1379                    "run: send {} service removal to listeners",
1380                    expired_services.len()
1381                );
1382                self.notify_service_removal(expired_services);
1383            }
1384
1385            // Notify hostname listeners about the expired records.
1386            let expired_addrs = self.cache.evict_expired_addr(now);
1387            for (hostname, addrs) in expired_addrs {
1388                call_hostname_resolution_listener(
1389                    &self.hostname_resolvers,
1390                    &hostname,
1391                    HostnameResolutionEvent::AddressesRemoved(hostname.clone(), addrs),
1392                );
1393                let instances = self.cache.get_instances_on_host(&hostname);
1394                let instance_set: HashSet<String> = instances.into_iter().collect();
1395                self.resolve_updated_instances(&instance_set);
1396            }
1397
1398            // Send out probing queries.
1399            self.probing_handler();
1400
1401            // check IP changes if next_ip_check is reached.
1402            if now >= next_ip_check && next_ip_check > 0 {
1403                next_ip_check = now + self.ip_check_interval;
1404                self.add_timer(next_ip_check);
1405
1406                self.check_ip_changes();
1407            }
1408        }
1409    }
1410
1411    fn process_set_option(&mut self, daemon_opt: DaemonOption) {
1412        match daemon_opt {
1413            DaemonOption::ServiceNameLenMax(length) => self.service_name_len_max = length,
1414            DaemonOption::IpCheckInterval(interval) => self.ip_check_interval = interval,
1415            DaemonOption::EnableInterface(if_kind) => self.enable_interface(if_kind),
1416            DaemonOption::DisableInterface(if_kind) => self.disable_interface(if_kind),
1417            DaemonOption::MulticastLoopV4(on) => self.set_multicast_loop_v4(on),
1418            DaemonOption::MulticastLoopV6(on) => self.set_multicast_loop_v6(on),
1419            DaemonOption::AcceptUnsolicited(accept) => self.set_accept_unsolicited(accept),
1420            DaemonOption::IncludeAppleP2P(enable) => self.set_apple_p2p(enable),
1421            #[cfg(test)]
1422            DaemonOption::TestDownInterface(ifname) => {
1423                self.test_down_interfaces.insert(ifname);
1424            }
1425            #[cfg(test)]
1426            DaemonOption::TestUpInterface(ifname) => {
1427                self.test_down_interfaces.remove(&ifname);
1428            }
1429        }
1430    }
1431
1432    fn enable_interface(&mut self, kinds: Vec<IfKind>) {
1433        debug!("enable_interface: {:?}", kinds);
1434        for if_kind in kinds {
1435            self.if_selections.push(IfSelection {
1436                if_kind,
1437                selected: true,
1438            });
1439        }
1440
1441        self.apply_intf_selections(my_ip_interfaces_inner(true, self.include_apple_p2p));
1442    }
1443
1444    fn disable_interface(&mut self, kinds: Vec<IfKind>) {
1445        debug!("disable_interface: {:?}", kinds);
1446        for if_kind in kinds {
1447            self.if_selections.push(IfSelection {
1448                if_kind,
1449                selected: false,
1450            });
1451        }
1452
1453        self.apply_intf_selections(my_ip_interfaces_inner(true, self.include_apple_p2p));
1454    }
1455
1456    fn set_multicast_loop_v4(&mut self, on: bool) {
1457        let Some(sock) = self.ipv4_sock.as_mut() else {
1458            return;
1459        };
1460        self.multicast_loop_v4 = on;
1461        sock.pktinfo
1462            .set_multicast_loop_v4(on)
1463            .map_err(|e| e_fmt!("failed to set multicast loop v4: {}", e))
1464            .unwrap();
1465    }
1466
1467    fn set_multicast_loop_v6(&mut self, on: bool) {
1468        let Some(sock) = self.ipv6_sock.as_mut() else {
1469            return;
1470        };
1471        self.multicast_loop_v6 = on;
1472        sock.pktinfo
1473            .set_multicast_loop_v6(on)
1474            .map_err(|e| e_fmt!("failed to set multicast loop v6: {}", e))
1475            .unwrap();
1476    }
1477
1478    fn set_accept_unsolicited(&mut self, accept: bool) {
1479        self.accept_unsolicited = accept;
1480    }
1481
1482    fn set_apple_p2p(&mut self, include: bool) {
1483        if self.include_apple_p2p != include {
1484            self.include_apple_p2p = include;
1485            self.apply_intf_selections(my_ip_interfaces_inner(true, self.include_apple_p2p));
1486        }
1487    }
1488
1489    fn notify_monitors(&mut self, event: DaemonEvent) {
1490        // Only retain the monitors that are still connected.
1491        self.monitors.retain(|sender| {
1492            if let Err(e) = sender.try_send(event.clone()) {
1493                debug!("notify_monitors: try_send: {}", &e);
1494                if matches!(e, TrySendError::Disconnected(_)) {
1495                    return false; // This monitor is dropped.
1496                }
1497            }
1498            true
1499        });
1500    }
1501
1502    /// Remove `addr` in my services that enabled `addr_auto`.
1503    fn del_addr_in_my_services(&mut self, addr: &IpAddr) {
1504        for (_, service_info) in self.my_services.iter_mut() {
1505            if service_info.is_addr_auto() {
1506                service_info.remove_ipaddr(addr);
1507            }
1508        }
1509    }
1510
1511    fn add_timer(&mut self, next_time: u64) {
1512        self.timers.push(Reverse(next_time));
1513    }
1514
1515    fn peek_earliest_timer(&self) -> Option<u64> {
1516        self.timers.peek().map(|Reverse(v)| *v)
1517    }
1518
1519    fn _pop_earliest_timer(&mut self) -> Option<u64> {
1520        self.timers.pop().map(|Reverse(v)| v)
1521    }
1522
1523    /// Pop all timers that are already passed till `now`.
1524    fn pop_timers_till(&mut self, now: u64) {
1525        while let Some(Reverse(v)) = self.timers.peek() {
1526            if *v > now {
1527                break;
1528            }
1529            self.timers.pop();
1530        }
1531    }
1532
1533    /// Apply all selections to `interfaces` and return the selected addresses.
1534    fn selected_intfs(&self, interfaces: Vec<Interface>) -> HashSet<Interface> {
1535        let intf_count = interfaces.len();
1536        let mut intf_selections = vec![true; intf_count];
1537
1538        // apply if_selections
1539        for selection in self.if_selections.iter() {
1540            // Mark the interfaces for this selection.
1541            for i in 0..intf_count {
1542                if selection.if_kind.matches(&interfaces[i]) {
1543                    intf_selections[i] = selection.selected;
1544                }
1545            }
1546        }
1547
1548        let mut selected_addrs = HashSet::new();
1549        for i in 0..intf_count {
1550            if intf_selections[i] {
1551                selected_addrs.insert(interfaces[i].clone());
1552            }
1553        }
1554
1555        selected_addrs
1556    }
1557
1558    /// Apply all selections to `interfaces`.
1559    ///
1560    /// For any interface, add it if selected but not bound yet,
1561    /// delete it if not selected but still bound.
1562    fn apply_intf_selections(&mut self, interfaces: Vec<Interface>) {
1563        // By default, we enable all interfaces.
1564        let intf_count = interfaces.len();
1565        let mut intf_selections = vec![true; intf_count];
1566
1567        // apply if_selections
1568        for selection in self.if_selections.iter() {
1569            // Mark the interfaces for this selection.
1570            for i in 0..intf_count {
1571                if selection.if_kind.matches(&interfaces[i]) {
1572                    intf_selections[i] = selection.selected;
1573                }
1574            }
1575        }
1576
1577        // Update `my_intfs` based on the selections.
1578        for (idx, intf) in interfaces.into_iter().enumerate() {
1579            if intf_selections[idx] {
1580                // Add the interface
1581                self.add_interface(intf);
1582            } else {
1583                // Remove the interface
1584                self.del_interface_addr(&intf);
1585            }
1586        }
1587    }
1588
1589    fn del_ip(&mut self, ip: IpAddr) {
1590        self.del_addr_in_my_services(&ip);
1591        self.notify_monitors(DaemonEvent::IpDel(ip));
1592    }
1593
1594    /// Check for IP changes and update [my_intfs] as needed.
1595    fn check_ip_changes(&mut self) {
1596        // Get the current interfaces.
1597        let my_ifaddrs = my_ip_interfaces_inner(true, self.include_apple_p2p);
1598
1599        #[cfg(test)]
1600        let my_ifaddrs: Vec<_> = my_ifaddrs
1601            .into_iter()
1602            .filter(|intf| !self.test_down_interfaces.contains(&intf.name))
1603            .collect();
1604
1605        let ifaddrs_map: HashMap<u32, Vec<&IfAddr>> =
1606            my_ifaddrs.iter().fold(HashMap::new(), |mut acc, intf| {
1607                let if_index = intf.index.unwrap_or(0);
1608                acc.entry(if_index).or_default().push(&intf.addr);
1609                acc
1610            });
1611
1612        let mut deleted_intfs = Vec::new();
1613        let mut deleted_ips = Vec::new();
1614
1615        for (if_index, my_intf) in self.my_intfs.iter_mut() {
1616            let mut last_ipv4 = None;
1617            let mut last_ipv6 = None;
1618
1619            if let Some(current_addrs) = ifaddrs_map.get(if_index) {
1620                my_intf.addrs.retain(|addr| {
1621                    if current_addrs.contains(&addr) {
1622                        true
1623                    } else {
1624                        match addr.ip() {
1625                            IpAddr::V4(ipv4) => last_ipv4 = Some(ipv4),
1626                            IpAddr::V6(ipv6) => last_ipv6 = Some(ipv6),
1627                        }
1628                        deleted_ips.push(addr.ip());
1629                        false
1630                    }
1631                });
1632                if my_intf.addrs.is_empty() {
1633                    deleted_intfs.push((*if_index, last_ipv4, last_ipv6))
1634                }
1635            } else {
1636                // If it does not exist, remove the interface.
1637                debug!(
1638                    "check_ip_changes: interface {} ({}) no longer exists, removing",
1639                    my_intf.name, if_index
1640                );
1641                for addr in my_intf.addrs.iter() {
1642                    match addr.ip() {
1643                        IpAddr::V4(ipv4) => last_ipv4 = Some(ipv4),
1644                        IpAddr::V6(ipv6) => last_ipv6 = Some(ipv6),
1645                    }
1646                    deleted_ips.push(addr.ip())
1647                }
1648                deleted_intfs.push((*if_index, last_ipv4, last_ipv6));
1649            }
1650        }
1651
1652        if !deleted_ips.is_empty() || !deleted_intfs.is_empty() {
1653            debug!(
1654                "check_ip_changes: {} deleted ips {} deleted intfs",
1655                deleted_ips.len(),
1656                deleted_intfs.len()
1657            );
1658        }
1659
1660        for ip in deleted_ips {
1661            self.del_ip(ip);
1662        }
1663
1664        for (if_index, last_ipv4, last_ipv6) in deleted_intfs {
1665            let Some(my_intf) = self.my_intfs.remove(&if_index) else {
1666                continue;
1667            };
1668
1669            if let Some(ipv4) = last_ipv4 {
1670                debug!("leave multicast for {ipv4}");
1671                if let Some(sock) = self.ipv4_sock.as_mut() {
1672                    if let Err(e) = sock.pktinfo.leave_multicast_v4(&GROUP_ADDR_V4, &ipv4) {
1673                        debug!("leave multicast group for addr {ipv4}: {e}");
1674                    }
1675                }
1676            }
1677
1678            if let Some(ipv6) = last_ipv6 {
1679                debug!("leave multicast for {ipv6}");
1680                if let Some(sock) = self.ipv6_sock.as_mut() {
1681                    if let Err(e) = sock
1682                        .pktinfo
1683                        .leave_multicast_v6(&GROUP_ADDR_V6, my_intf.index)
1684                    {
1685                        debug!("leave multicast group for IPv6: {ipv6}: {e}");
1686                    }
1687                }
1688            }
1689
1690            // Remove cache records for this interface.
1691            let intf_id = InterfaceId {
1692                name: my_intf.name.to_string(),
1693                index: my_intf.index,
1694            };
1695            let removed_instances = self.cache.remove_records_on_intf(intf_id);
1696            self.notify_service_removal(removed_instances);
1697        }
1698
1699        // Add newly found interfaces only if in our selections.
1700        self.apply_intf_selections(my_ifaddrs);
1701    }
1702
1703    /// Remove an interface address when it was down, disabled or removed from the system.
1704    /// If no more addresses on the interface, remove the interface as well.
1705    fn del_interface_addr(&mut self, intf: &Interface) {
1706        let if_index = intf.index.unwrap_or(0);
1707        trace!(
1708            "del_interface_addr: {} ({if_index}) addr {}",
1709            intf.name,
1710            intf.ip()
1711        );
1712
1713        let Some(my_intf) = self.my_intfs.get_mut(&if_index) else {
1714            debug!("del_interface_addr: interface {} not found", intf.name);
1715            return;
1716        };
1717
1718        let mut ip_removed = false;
1719
1720        if my_intf.addrs.remove(&intf.addr) {
1721            ip_removed = true;
1722
1723            match intf.addr.ip() {
1724                IpAddr::V4(ipv4) => {
1725                    if my_intf.next_ifaddr_v4().is_none() {
1726                        if let Some(sock) = self.ipv4_sock.as_mut() {
1727                            if let Err(e) = sock.pktinfo.leave_multicast_v4(&GROUP_ADDR_V4, &ipv4) {
1728                                debug!("leave multicast group for addr {ipv4}: {e}");
1729                            }
1730                        }
1731                    }
1732                }
1733
1734                IpAddr::V6(ipv6) => {
1735                    if my_intf.next_ifaddr_v6().is_none() {
1736                        if let Some(sock) = self.ipv6_sock.as_mut() {
1737                            if let Err(e) =
1738                                sock.pktinfo.leave_multicast_v6(&GROUP_ADDR_V6, if_index)
1739                            {
1740                                debug!("leave multicast group for addr {ipv6}: {e}");
1741                            }
1742                        }
1743                    }
1744                }
1745            }
1746
1747            if my_intf.addrs.is_empty() {
1748                // If no more addresses, remove the interface.
1749                debug!("del_interface_addr: removing interface {}", intf.name);
1750                self.my_intfs.remove(&if_index);
1751                self.dns_registry_map.remove(&if_index);
1752                self.cache.remove_addrs_on_disabled_intf(if_index);
1753            }
1754        }
1755
1756        if ip_removed {
1757            // Notify the monitors.
1758            self.notify_monitors(DaemonEvent::IpDel(intf.ip()));
1759            // Remove the interface from my services that enabled `addr_auto`.
1760            self.del_addr_in_my_services(&intf.ip());
1761        }
1762    }
1763
1764    fn add_interface(&mut self, intf: Interface) {
1765        let sock_opt = if intf.ip().is_ipv4() {
1766            &self.ipv4_sock
1767        } else {
1768            &self.ipv6_sock
1769        };
1770
1771        let Some(sock) = sock_opt else {
1772            debug!(
1773                "add_interface: no socket available for interface {} with addr {}. Skipped.",
1774                intf.name,
1775                intf.ip()
1776            );
1777            return;
1778        };
1779
1780        let if_index = intf.index.unwrap_or(0);
1781        let mut new_addr = false;
1782
1783        match self.my_intfs.entry(if_index) {
1784            Entry::Occupied(mut entry) => {
1785                // If intf has a new address, add it to the existing interface.
1786                let my_intf = entry.get_mut();
1787                if !my_intf.addrs.contains(&intf.addr) {
1788                    if let Err(e) = join_multicast_group(&sock.pktinfo, &intf) {
1789                        debug!("add_interface: socket_config {}: {e}", &intf.name);
1790                    }
1791                    my_intf.addrs.insert(intf.addr.clone());
1792                    new_addr = true;
1793                }
1794            }
1795            Entry::Vacant(entry) => {
1796                if let Err(e) = join_multicast_group(&sock.pktinfo, &intf) {
1797                    debug!("add_interface: socket_config {}: {e}. Skipped.", &intf.name);
1798                    return;
1799                }
1800
1801                new_addr = true;
1802                let new_intf = MyIntf {
1803                    name: intf.name.clone(),
1804                    index: if_index,
1805                    addrs: HashSet::from([intf.addr.clone()]),
1806                };
1807                entry.insert(new_intf);
1808            }
1809        }
1810
1811        if !new_addr {
1812            trace!("add_interface: interface {} already exists", &intf.name);
1813            return;
1814        }
1815
1816        debug!("add new interface {}: {}", intf.name, intf.ip());
1817
1818        let Some(my_intf) = self.my_intfs.get(&if_index) else {
1819            debug!("add_interface: cannot find if_index {if_index}");
1820            return;
1821        };
1822
1823        let dns_registry = match self.dns_registry_map.get_mut(&if_index) {
1824            Some(registry) => registry,
1825            None => self
1826                .dns_registry_map
1827                .entry(if_index)
1828                .or_insert_with(DnsRegistry::new),
1829        };
1830
1831        for (_, service_info) in self.my_services.iter_mut() {
1832            if service_info.is_addr_auto() {
1833                service_info.insert_ipaddr(&intf);
1834
1835                if let Ok(true) = announce_service_on_intf(
1836                    dns_registry,
1837                    service_info,
1838                    my_intf,
1839                    &sock.pktinfo,
1840                    self.port,
1841                ) {
1842                    debug!(
1843                        "Announce service {} on {}",
1844                        service_info.get_fullname(),
1845                        intf.ip()
1846                    );
1847                    service_info.set_status(if_index, ServiceStatus::Announced);
1848                } else {
1849                    for timer in dns_registry.new_timers.drain(..) {
1850                        self.timers.push(Reverse(timer));
1851                    }
1852                    service_info.set_status(if_index, ServiceStatus::Probing);
1853                }
1854            }
1855        }
1856
1857        // As we added a new interface, we want to execute all active "Browse" reruns now.
1858        let mut browse_reruns = Vec::new();
1859        let mut i = 0;
1860        while i < self.retransmissions.len() {
1861            if matches!(self.retransmissions[i].command, Command::Browse(..)) {
1862                browse_reruns.push(self.retransmissions.remove(i));
1863            } else {
1864                i += 1;
1865            }
1866        }
1867
1868        for rerun in browse_reruns {
1869            self.exec_command(rerun.command, true);
1870        }
1871
1872        // Notify the monitors.
1873        self.notify_monitors(DaemonEvent::IpAdd(intf.ip()));
1874    }
1875
1876    /// Registers a service.
1877    ///
1878    /// RFC 6762 section 8.3.
1879    /// ...the Multicast DNS responder MUST send
1880    ///    an unsolicited Multicast DNS response containing, in the Answer
1881    ///    Section, all of its newly registered resource records
1882    ///
1883    /// Zeroconf will then respond to requests for information about this service.
1884    fn register_service(&mut self, mut info: ServiceInfo) {
1885        // Check the service name length.
1886        if let Err(e) = check_service_name_length(info.get_type(), self.service_name_len_max) {
1887            error!("check_service_name_length: {}", &e);
1888            self.notify_monitors(DaemonEvent::Error(e));
1889            return;
1890        }
1891
1892        if info.is_addr_auto() {
1893            let selected_intfs =
1894                self.selected_intfs(my_ip_interfaces_inner(true, self.include_apple_p2p));
1895            for intf in selected_intfs {
1896                info.insert_ipaddr(&intf);
1897            }
1898        }
1899
1900        debug!("register service {:?}", &info);
1901
1902        let outgoing_addrs = self.send_unsolicited_response(&mut info);
1903        if !outgoing_addrs.is_empty() {
1904            self.notify_monitors(DaemonEvent::Announce(
1905                info.get_fullname().to_string(),
1906                format!("{:?}", &outgoing_addrs),
1907            ));
1908        }
1909
1910        // The key has to be lower case letter as DNS record name is case insensitive.
1911        // The info will have the original name.
1912        let service_fullname = info.get_fullname().to_lowercase();
1913        self.my_services.insert(service_fullname, info);
1914    }
1915
1916    /// Sends out announcement of `info` on every valid interface.
1917    /// Returns the list of interface IPs that sent out the announcement.
1918    fn send_unsolicited_response(&mut self, info: &mut ServiceInfo) -> Vec<IpAddr> {
1919        let mut outgoing_addrs = Vec::new();
1920        let mut outgoing_intfs = HashSet::new();
1921
1922        let mut invalid_intf_addrs = HashSet::new();
1923
1924        for (if_index, intf) in self.my_intfs.iter() {
1925            let dns_registry = match self.dns_registry_map.get_mut(if_index) {
1926                Some(registry) => registry,
1927                None => self
1928                    .dns_registry_map
1929                    .entry(*if_index)
1930                    .or_insert_with(DnsRegistry::new),
1931            };
1932
1933            let mut announced = false;
1934
1935            // IPv4
1936            if let Some(sock) = self.ipv4_sock.as_mut() {
1937                match announce_service_on_intf(dns_registry, info, intf, &sock.pktinfo, self.port) {
1938                    Ok(true) => {
1939                        for addr in intf.addrs.iter().filter(|a| a.ip().is_ipv4()) {
1940                            outgoing_addrs.push(addr.ip());
1941                        }
1942                        outgoing_intfs.insert(intf.index);
1943
1944                        debug!(
1945                            "Announce service IPv4 {} on {}",
1946                            info.get_fullname(),
1947                            intf.name
1948                        );
1949                        announced = true;
1950                    }
1951                    Ok(false) => {}
1952                    Err(InternalError::IntfAddrInvalid(intf_addr)) => {
1953                        invalid_intf_addrs.insert(intf_addr);
1954                    }
1955                }
1956            }
1957
1958            if let Some(sock) = self.ipv6_sock.as_mut() {
1959                match announce_service_on_intf(dns_registry, info, intf, &sock.pktinfo, self.port) {
1960                    Ok(true) => {
1961                        for addr in intf.addrs.iter().filter(|a| a.ip().is_ipv6()) {
1962                            outgoing_addrs.push(addr.ip());
1963                        }
1964                        outgoing_intfs.insert(intf.index);
1965
1966                        debug!(
1967                            "Announce service IPv6 {} on {}",
1968                            info.get_fullname(),
1969                            intf.name
1970                        );
1971                        announced = true;
1972                    }
1973                    Ok(false) => {}
1974                    Err(InternalError::IntfAddrInvalid(intf_addr)) => {
1975                        invalid_intf_addrs.insert(intf_addr);
1976                    }
1977                }
1978            }
1979
1980            if announced {
1981                info.set_status(intf.index, ServiceStatus::Announced);
1982            } else {
1983                for timer in dns_registry.new_timers.drain(..) {
1984                    self.timers.push(Reverse(timer));
1985                }
1986                info.set_status(*if_index, ServiceStatus::Probing);
1987            }
1988        }
1989
1990        if !invalid_intf_addrs.is_empty() {
1991            let _ = self.send_cmd_to_self(Command::InvalidIntfAddrs(invalid_intf_addrs));
1992        }
1993
1994        // RFC 6762 section 8.3.
1995        // ..The Multicast DNS responder MUST send at least two unsolicited
1996        //    responses, one second apart.
1997        let next_time = current_time_millis() + 1000;
1998        for if_index in outgoing_intfs {
1999            self.add_retransmission(
2000                next_time,
2001                Command::RegisterResend(info.get_fullname().to_string(), if_index),
2002            );
2003        }
2004
2005        outgoing_addrs
2006    }
2007
2008    /// Send probings or finish them if expired. Notify waiting services.
2009    fn probing_handler(&mut self) {
2010        let now = current_time_millis();
2011        let mut invalid_intf_addrs = HashSet::new();
2012
2013        for (if_index, intf) in self.my_intfs.iter() {
2014            let Some(dns_registry) = self.dns_registry_map.get_mut(if_index) else {
2015                continue;
2016            };
2017
2018            let (out, expired_probes) = check_probing(dns_registry, &mut self.timers, now);
2019
2020            // send probing.
2021            if !out.questions().is_empty() {
2022                trace!("sending out probing of questions: {:?}", out.questions());
2023                if let Some(sock) = self.ipv4_sock.as_mut() {
2024                    if let Err(InternalError::IntfAddrInvalid(intf_addr)) =
2025                        send_dns_outgoing(&out, intf, &sock.pktinfo, self.port)
2026                    {
2027                        invalid_intf_addrs.insert(intf_addr);
2028                    }
2029                }
2030                if let Some(sock) = self.ipv6_sock.as_mut() {
2031                    if let Err(InternalError::IntfAddrInvalid(intf_addr)) =
2032                        send_dns_outgoing(&out, intf, &sock.pktinfo, self.port)
2033                    {
2034                        invalid_intf_addrs.insert(intf_addr);
2035                    }
2036                }
2037            }
2038
2039            // For finished probes, wake up services that are waiting for the probes.
2040            let waiting_services =
2041                handle_expired_probes(expired_probes, &intf.name, dns_registry, &mut self.monitors);
2042
2043            for service_name in waiting_services {
2044                // service names are lowercase
2045                if let Some(info) = self.my_services.get_mut(&service_name.to_lowercase()) {
2046                    if info.get_status(*if_index) == ServiceStatus::Announced {
2047                        debug!("service {} already announced", info.get_fullname());
2048                        continue;
2049                    }
2050
2051                    let announced_v4 = if let Some(sock) = self.ipv4_sock.as_mut() {
2052                        match announce_service_on_intf(
2053                            dns_registry,
2054                            info,
2055                            intf,
2056                            &sock.pktinfo,
2057                            self.port,
2058                        ) {
2059                            Ok(announced) => announced,
2060                            Err(InternalError::IntfAddrInvalid(intf_addr)) => {
2061                                invalid_intf_addrs.insert(intf_addr);
2062                                false
2063                            }
2064                        }
2065                    } else {
2066                        false
2067                    };
2068                    let announced_v6 = if let Some(sock) = self.ipv6_sock.as_mut() {
2069                        match announce_service_on_intf(
2070                            dns_registry,
2071                            info,
2072                            intf,
2073                            &sock.pktinfo,
2074                            self.port,
2075                        ) {
2076                            Ok(announced) => announced,
2077                            Err(InternalError::IntfAddrInvalid(intf_addr)) => {
2078                                invalid_intf_addrs.insert(intf_addr);
2079                                false
2080                            }
2081                        }
2082                    } else {
2083                        false
2084                    };
2085
2086                    if announced_v4 || announced_v6 {
2087                        let next_time = now + 1000;
2088                        let command =
2089                            Command::RegisterResend(info.get_fullname().to_string(), *if_index);
2090                        self.retransmissions.push(ReRun { next_time, command });
2091                        self.timers.push(Reverse(next_time));
2092
2093                        let fullname = match dns_registry.name_changes.get(&service_name) {
2094                            Some(new_name) => new_name.to_string(),
2095                            None => service_name.to_string(),
2096                        };
2097
2098                        let mut hostname = info.get_hostname();
2099                        if let Some(new_name) = dns_registry.name_changes.get(hostname) {
2100                            hostname = new_name;
2101                        }
2102
2103                        debug!("wake up: announce service {} on {}", fullname, intf.name);
2104                        notify_monitors(
2105                            &mut self.monitors,
2106                            DaemonEvent::Announce(fullname, format!("{}:{}", hostname, &intf.name)),
2107                        );
2108
2109                        info.set_status(*if_index, ServiceStatus::Announced);
2110                    }
2111                }
2112            }
2113        }
2114
2115        if !invalid_intf_addrs.is_empty() {
2116            let _ = self.send_cmd_to_self(Command::InvalidIntfAddrs(invalid_intf_addrs));
2117        }
2118    }
2119
2120    fn unregister_service(
2121        &self,
2122        info: &ServiceInfo,
2123        intf: &MyIntf,
2124        sock: &PktInfoUdpSocket,
2125    ) -> Vec<u8> {
2126        let is_ipv4 = sock.domain() == Domain::IPV4;
2127
2128        let mut out = DnsOutgoing::new(FLAGS_QR_RESPONSE | FLAGS_AA);
2129        out.add_answer_at_time(
2130            DnsPointer::new(
2131                info.get_type(),
2132                RRType::PTR,
2133                CLASS_IN,
2134                0,
2135                info.get_fullname().to_string(),
2136            ),
2137            0,
2138        );
2139
2140        if let Some(sub) = info.get_subtype() {
2141            trace!("Adding subdomain {}", sub);
2142            out.add_answer_at_time(
2143                DnsPointer::new(
2144                    sub,
2145                    RRType::PTR,
2146                    CLASS_IN,
2147                    0,
2148                    info.get_fullname().to_string(),
2149                ),
2150                0,
2151            );
2152        }
2153
2154        out.add_answer_at_time(
2155            DnsSrv::new(
2156                info.get_fullname(),
2157                CLASS_IN | CLASS_CACHE_FLUSH,
2158                0,
2159                info.get_priority(),
2160                info.get_weight(),
2161                info.get_port(),
2162                info.get_hostname().to_string(),
2163            ),
2164            0,
2165        );
2166        out.add_answer_at_time(
2167            DnsTxt::new(
2168                info.get_fullname(),
2169                CLASS_IN | CLASS_CACHE_FLUSH,
2170                0,
2171                info.generate_txt(),
2172            ),
2173            0,
2174        );
2175
2176        let if_addrs = if is_ipv4 {
2177            info.get_addrs_on_my_intf_v4(intf)
2178        } else {
2179            info.get_addrs_on_my_intf_v6(intf)
2180        };
2181
2182        if if_addrs.is_empty() {
2183            return vec![];
2184        }
2185
2186        for address in if_addrs {
2187            out.add_answer_at_time(
2188                DnsAddress::new(
2189                    info.get_hostname(),
2190                    ip_address_rr_type(&address),
2191                    CLASS_IN | CLASS_CACHE_FLUSH,
2192                    0,
2193                    address,
2194                    intf.into(),
2195                ),
2196                0,
2197            );
2198        }
2199
2200        // Only (at most) one packet is expected to be sent out.
2201        let sent_vec = match send_dns_outgoing(&out, intf, sock, self.port) {
2202            Ok(sent_vec) => sent_vec,
2203            Err(InternalError::IntfAddrInvalid(intf_addr)) => {
2204                let invalid_intf_addrs = HashSet::from([intf_addr]);
2205                let _ = self.send_cmd_to_self(Command::InvalidIntfAddrs(invalid_intf_addrs));
2206                vec![]
2207            }
2208        };
2209        sent_vec.into_iter().next().unwrap_or_default()
2210    }
2211
2212    /// Binds a channel `listener` to querying mDNS hostnames.
2213    ///
2214    /// If there is already a `listener`, it will be updated, i.e. overwritten.
2215    fn add_hostname_resolver(
2216        &mut self,
2217        hostname: String,
2218        listener: Sender<HostnameResolutionEvent>,
2219        timeout: Option<u64>,
2220    ) {
2221        let real_timeout = timeout.map(|t| current_time_millis() + t);
2222        self.hostname_resolvers
2223            .insert(hostname.to_lowercase(), (listener, real_timeout));
2224        if let Some(t) = real_timeout {
2225            self.add_timer(t);
2226        }
2227    }
2228
2229    /// Sends a multicast query for `name` with `qtype`.
2230    fn send_query(&self, name: &str, qtype: RRType) {
2231        self.send_query_vec(&[(name, qtype)]);
2232    }
2233
2234    /// Sends out a list of `questions` (i.e. DNS questions) via multicast.
2235    fn send_query_vec(&self, questions: &[(&str, RRType)]) {
2236        let mut out = DnsOutgoing::new(FLAGS_QR_QUERY);
2237        let now = current_time_millis();
2238
2239        for (name, qtype) in questions {
2240            out.add_question(name, *qtype);
2241
2242            for record in self.cache.get_known_answers(name, *qtype, now) {
2243                /*
2244                RFC 6762 section 7.1: https://datatracker.ietf.org/doc/html/rfc6762#section-7.1
2245                ...
2246                    When a Multicast DNS querier sends a query to which it already knows
2247                    some answers, it populates the Answer Section of the DNS query
2248                    message with those answers.
2249                 */
2250                trace!("add known answer: {:?}", record.record);
2251                let mut new_record = record.record.clone();
2252                new_record.get_record_mut().update_ttl(now);
2253                out.add_answer_box(new_record);
2254            }
2255        }
2256
2257        let mut invalid_intf_addrs = HashSet::new();
2258        for (_, intf) in self.my_intfs.iter() {
2259            if let Some(sock) = self.ipv4_sock.as_ref() {
2260                if let Err(InternalError::IntfAddrInvalid(intf_addr)) =
2261                    send_dns_outgoing(&out, intf, &sock.pktinfo, self.port)
2262                {
2263                    invalid_intf_addrs.insert(intf_addr);
2264                }
2265            }
2266            if let Some(sock) = self.ipv6_sock.as_ref() {
2267                if let Err(InternalError::IntfAddrInvalid(intf_addr)) =
2268                    send_dns_outgoing(&out, intf, &sock.pktinfo, self.port)
2269                {
2270                    invalid_intf_addrs.insert(intf_addr);
2271                }
2272            }
2273        }
2274
2275        if !invalid_intf_addrs.is_empty() {
2276            let _ = self.send_cmd_to_self(Command::InvalidIntfAddrs(invalid_intf_addrs));
2277        }
2278    }
2279
2280    /// Reads one UDP datagram from the socket of `intf`.
2281    ///
2282    /// Returns false if failed to receive a packet,
2283    /// otherwise returns true.
2284    fn handle_read(&mut self, event_key: usize) -> bool {
2285        let sock_opt = match event_key {
2286            IPV4_SOCK_EVENT_KEY => &mut self.ipv4_sock,
2287            IPV6_SOCK_EVENT_KEY => &mut self.ipv6_sock,
2288            _ => {
2289                debug!("handle_read: unknown token {}", event_key);
2290                return false;
2291            }
2292        };
2293        let Some(sock) = sock_opt.as_mut() else {
2294            debug!("handle_read: socket not available for token {}", event_key);
2295            return false;
2296        };
2297        let mut buf = vec![0u8; MAX_MSG_ABSOLUTE];
2298
2299        // Read the next mDNS UDP datagram.
2300        //
2301        // If the datagram is larger than `buf`, excess bytes may or may not
2302        // be truncated by the socket layer depending on the platform's libc.
2303        // In any case, such large datagram will not be decoded properly and
2304        // this function should return false but should not crash.
2305        let (sz, pktinfo) = match sock.pktinfo.recv(&mut buf) {
2306            Ok(sz) => sz,
2307            Err(e) => {
2308                if e.kind() != std::io::ErrorKind::WouldBlock {
2309                    debug!("listening socket read failed: {}", e);
2310                }
2311                return false;
2312            }
2313        };
2314
2315        // Find the interface that received the packet.
2316        let pkt_if_index = pktinfo.if_index as u32;
2317        let Some(my_intf) = self.my_intfs.get(&pkt_if_index) else {
2318            debug!(
2319                "handle_read: no interface found for pktinfo if_index: {}",
2320                pktinfo.if_index
2321            );
2322            return true; // We still return true to indicate that we read something.
2323        };
2324
2325        buf.truncate(sz); // reduce potential processing errors
2326
2327        match DnsIncoming::new(buf, my_intf.into()) {
2328            Ok(msg) => {
2329                if msg.is_query() {
2330                    self.handle_query(msg, pkt_if_index, event_key == IPV4_SOCK_EVENT_KEY);
2331                } else if msg.is_response() {
2332                    self.handle_response(msg, pkt_if_index);
2333                } else {
2334                    debug!("Invalid message: not query and not response");
2335                }
2336            }
2337            Err(e) => debug!("Invalid incoming DNS message: {}", e),
2338        }
2339
2340        true
2341    }
2342
2343    /// Returns true, if sent query. Returns false if SRV already exists.
2344    fn query_unresolved(&mut self, instance: &str) -> bool {
2345        if !valid_instance_name(instance) {
2346            trace!("instance name {} not valid", instance);
2347            return false;
2348        }
2349
2350        if let Some(records) = self.cache.get_srv(instance) {
2351            for record in records {
2352                if let Some(srv) = record.record.any().downcast_ref::<DnsSrv>() {
2353                    if self.cache.get_addr(srv.host()).is_none() {
2354                        self.send_query_vec(&[(srv.host(), RRType::A), (srv.host(), RRType::AAAA)]);
2355                        return true;
2356                    }
2357                }
2358            }
2359        } else {
2360            self.send_query(instance, RRType::ANY);
2361            return true;
2362        }
2363
2364        false
2365    }
2366
2367    /// Checks if `ty_domain` has records in the cache. If yes, sends the
2368    /// cached records via `sender`.
2369    fn query_cache_for_service(
2370        &mut self,
2371        ty_domain: &str,
2372        sender: &Sender<ServiceEvent>,
2373        now: u64,
2374    ) {
2375        let mut resolved: HashSet<String> = HashSet::new();
2376        let mut unresolved: HashSet<String> = HashSet::new();
2377
2378        if let Some(records) = self.cache.get_ptr(ty_domain) {
2379            for record in records.iter().filter(|r| !r.record.expires_soon(now)) {
2380                if let Some(ptr) = record.record.any().downcast_ref::<DnsPointer>() {
2381                    let mut new_event = None;
2382                    match self.resolve_service_from_cache(ty_domain, ptr.alias()) {
2383                        Ok(resolved_service) => {
2384                            if resolved_service.is_valid() {
2385                                debug!("Resolved service from cache: {}", ptr.alias());
2386                                new_event =
2387                                    Some(ServiceEvent::ServiceResolved(Box::new(resolved_service)));
2388                            } else {
2389                                debug!("Resolved service is not valid: {}", ptr.alias());
2390                            }
2391                        }
2392                        Err(err) => {
2393                            debug!("Error while resolving service from cache: {}", err);
2394                            continue;
2395                        }
2396                    }
2397
2398                    match sender.send(ServiceEvent::ServiceFound(
2399                        ty_domain.to_string(),
2400                        ptr.alias().to_string(),
2401                    )) {
2402                        Ok(()) => debug!("sent service found {}", ptr.alias()),
2403                        Err(e) => {
2404                            debug!("failed to send service found: {}", e);
2405                            continue;
2406                        }
2407                    }
2408
2409                    if let Some(event) = new_event {
2410                        resolved.insert(ptr.alias().to_string());
2411                        match sender.send(event) {
2412                            Ok(()) => debug!("sent service resolved: {}", ptr.alias()),
2413                            Err(e) => debug!("failed to send service resolved: {}", e),
2414                        }
2415                    } else {
2416                        unresolved.insert(ptr.alias().to_string());
2417                    }
2418                }
2419            }
2420        }
2421
2422        for instance in resolved.drain() {
2423            self.pending_resolves.remove(&instance);
2424            self.resolved.insert(instance);
2425        }
2426
2427        for instance in unresolved.drain() {
2428            self.add_pending_resolve(instance);
2429        }
2430    }
2431
2432    /// Checks if `hostname` has records in the cache. If yes, sends the
2433    /// cached records via `sender`.
2434    fn query_cache_for_hostname(
2435        &mut self,
2436        hostname: &str,
2437        sender: Sender<HostnameResolutionEvent>,
2438    ) {
2439        let addresses_map = self.cache.get_addresses_for_host(hostname);
2440        for (name, addresses) in addresses_map {
2441            match sender.send(HostnameResolutionEvent::AddressesFound(name, addresses)) {
2442                Ok(()) => trace!("sent hostname addresses found"),
2443                Err(e) => debug!("failed to send hostname addresses found: {}", e),
2444            }
2445        }
2446    }
2447
2448    fn add_pending_resolve(&mut self, instance: String) {
2449        if !self.pending_resolves.contains(&instance) {
2450            let next_time = current_time_millis() + RESOLVE_WAIT_IN_MILLIS;
2451            self.add_retransmission(next_time, Command::Resolve(instance.clone(), 1));
2452            self.pending_resolves.insert(instance);
2453        }
2454    }
2455
2456    /// Creates a `ResolvedService` from the cache.
2457    fn resolve_service_from_cache(
2458        &self,
2459        ty_domain: &str,
2460        fullname: &str,
2461    ) -> Result<ResolvedService> {
2462        let now = current_time_millis();
2463        let mut resolved_service = ResolvedService {
2464            ty_domain: ty_domain.to_string(),
2465            sub_ty_domain: None,
2466            fullname: fullname.to_string(),
2467            host: String::new(),
2468            port: 0,
2469            addresses: HashSet::new(),
2470            txt_properties: TxtProperties::new(),
2471        };
2472
2473        // Be sure setting `subtype` if available even when querying for the parent domain.
2474        if let Some(subtype) = self.cache.get_subtype(fullname) {
2475            trace!(
2476                "ty_domain: {} found subtype {} for instance: {}",
2477                ty_domain,
2478                subtype,
2479                fullname
2480            );
2481            if resolved_service.sub_ty_domain.is_none() {
2482                resolved_service.sub_ty_domain = Some(subtype.to_string());
2483            }
2484        }
2485
2486        // resolve SRV record
2487        if let Some(records) = self.cache.get_srv(fullname) {
2488            if let Some(answer) = records.iter().find(|r| !r.record.expires_soon(now)) {
2489                if let Some(dns_srv) = answer.record.any().downcast_ref::<DnsSrv>() {
2490                    resolved_service.host = dns_srv.host().to_string();
2491                    resolved_service.port = dns_srv.port();
2492                }
2493            }
2494        }
2495
2496        // resolve TXT record
2497        if let Some(records) = self.cache.get_txt(fullname) {
2498            if let Some(record) = records.iter().find(|r| !r.record.expires_soon(now)) {
2499                if let Some(dns_txt) = record.record.any().downcast_ref::<DnsTxt>() {
2500                    resolved_service.txt_properties = dns_txt.text().into();
2501                }
2502            }
2503        }
2504
2505        // resolve A and AAAA records
2506        if let Some(records) = self.cache.get_addr(&resolved_service.host) {
2507            for answer in records.iter() {
2508                if let Some(dns_a) = answer.record.any().downcast_ref::<DnsAddress>() {
2509                    if dns_a.expires_soon(now) {
2510                        trace!(
2511                            "Addr expired or expires soon: {}",
2512                            dns_a.address().to_ip_addr()
2513                        );
2514                    } else {
2515                        resolved_service.addresses.insert(dns_a.address());
2516                    }
2517                }
2518            }
2519        }
2520
2521        Ok(resolved_service)
2522    }
2523
2524    fn handle_poller_events(&mut self, events: &mio::Events) {
2525        for ev in events.iter() {
2526            trace!("event received with key {:?}", ev.token());
2527            if ev.token().0 == SIGNAL_SOCK_EVENT_KEY {
2528                // Drain signals as we will drain commands as well.
2529                self.signal_sock_drain();
2530
2531                if let Err(e) = self.poller.registry().reregister(
2532                    &mut self.signal_sock,
2533                    ev.token(),
2534                    mio::Interest::READABLE,
2535                ) {
2536                    debug!("failed to modify poller for signal socket: {}", e);
2537                }
2538                continue; // Next event.
2539            }
2540
2541            // Read until no more packets available.
2542            while self.handle_read(ev.token().0) {}
2543
2544            // we continue to monitor this socket.
2545            if ev.token().0 == IPV4_SOCK_EVENT_KEY {
2546                // Re-register the IPv4 socket for reading.
2547                if let Some(sock) = self.ipv4_sock.as_mut() {
2548                    if let Err(e) =
2549                        self.poller
2550                            .registry()
2551                            .reregister(sock, ev.token(), mio::Interest::READABLE)
2552                    {
2553                        debug!("modify poller for IPv4 socket: {}", e);
2554                    }
2555                }
2556            } else if ev.token().0 == IPV6_SOCK_EVENT_KEY {
2557                // Re-register the IPv6 socket for reading.
2558                if let Some(sock) = self.ipv6_sock.as_mut() {
2559                    if let Err(e) =
2560                        self.poller
2561                            .registry()
2562                            .reregister(sock, ev.token(), mio::Interest::READABLE)
2563                    {
2564                        debug!("modify poller for IPv6 socket: {}", e);
2565                    }
2566                }
2567            }
2568        }
2569    }
2570
2571    /// Deal with incoming response packets.  All answers
2572    /// are held in the cache, and listeners are notified.
2573    fn handle_response(&mut self, mut msg: DnsIncoming, if_index: u32) {
2574        let now = current_time_millis();
2575
2576        // remove records that are expired.
2577        let mut record_predicate = |record: &DnsRecordBox| {
2578            if !record.get_record().is_expired(now) {
2579                return true;
2580            }
2581
2582            debug!("record is expired, removing it from cache.");
2583            if self.cache.remove(record) {
2584                // for PTR records, send event to listeners
2585                if let Some(dns_ptr) = record.any().downcast_ref::<DnsPointer>() {
2586                    call_service_listener(
2587                        &self.service_queriers,
2588                        dns_ptr.get_name(),
2589                        ServiceEvent::ServiceRemoved(
2590                            dns_ptr.get_name().to_string(),
2591                            dns_ptr.alias().to_string(),
2592                        ),
2593                    );
2594                }
2595            }
2596            false
2597        };
2598        msg.answers_mut().retain(&mut record_predicate);
2599        msg.authorities_mut().retain(&mut record_predicate);
2600        msg.additionals_mut().retain(&mut record_predicate);
2601
2602        // check possible conflicts and handle them.
2603        self.conflict_handler(&msg, if_index);
2604
2605        // check if the message is for us.
2606        let mut is_for_us = true; // assume it is for us.
2607
2608        // If there are any PTR records in the answers, there should be
2609        // at least one PTR for us. Otherwise, the message is not for us.
2610        // If there are no PTR records at all, assume this message is for us.
2611        for answer in msg.answers() {
2612            if answer.get_type() == RRType::PTR {
2613                if self.service_queriers.contains_key(answer.get_name()) {
2614                    is_for_us = true;
2615                    break; // OK to break: at least one PTR for us.
2616                } else {
2617                    is_for_us = false;
2618                }
2619            } else if answer.get_type() == RRType::A || answer.get_type() == RRType::AAAA {
2620                // If there is a hostname querier for this address, then it is for us.
2621                let answer_lowercase = answer.get_name().to_lowercase();
2622                if self.hostname_resolvers.contains_key(&answer_lowercase) {
2623                    is_for_us = true;
2624                    break; // OK to break: at least one hostname for us.
2625                }
2626            }
2627        }
2628
2629        // if we explicitily want to accept unsolicited responses, we should consider all messages as for us.
2630        if self.accept_unsolicited {
2631            is_for_us = true;
2632        }
2633
2634        /// Represents a DNS record change that involves one service instance.
2635        struct InstanceChange {
2636            ty: RRType,   // The type of DNS record for the instance.
2637            name: String, // The name of the record.
2638        }
2639
2640        // Go through all answers to get the new and updated records.
2641        // For new PTR records, send out ServiceFound immediately. For others,
2642        // collect them into `changes`.
2643        //
2644        // Note: we don't try to identify the update instances based on
2645        // each record immediately as the answers are likely related to each
2646        // other.
2647        let mut changes = Vec::new();
2648        let mut timers = Vec::new();
2649        let Some(my_intf) = self.my_intfs.get(&if_index) else {
2650            return;
2651        };
2652        for record in msg.all_records() {
2653            match self
2654                .cache
2655                .add_or_update(my_intf, record, &mut timers, is_for_us)
2656            {
2657                Some((dns_record, true)) => {
2658                    timers.push(dns_record.record.get_record().get_expire_time());
2659                    timers.push(dns_record.record.get_record().get_refresh_time());
2660
2661                    let ty = dns_record.record.get_type();
2662                    let name = dns_record.record.get_name();
2663
2664                    // Only process PTR that does not expire soon (i.e. TTL > 1).
2665                    if ty == RRType::PTR && dns_record.record.get_record().get_ttl() > 1 {
2666                        if self.service_queriers.contains_key(name) {
2667                            timers.push(dns_record.record.get_record().get_refresh_time());
2668                        }
2669
2670                        // send ServiceFound
2671                        if let Some(dns_ptr) = dns_record.record.any().downcast_ref::<DnsPointer>()
2672                        {
2673                            debug!("calling listener with service found: {name}");
2674                            call_service_listener(
2675                                &self.service_queriers,
2676                                name,
2677                                ServiceEvent::ServiceFound(
2678                                    name.to_string(),
2679                                    dns_ptr.alias().to_string(),
2680                                ),
2681                            );
2682                            changes.push(InstanceChange {
2683                                ty,
2684                                name: dns_ptr.alias().to_string(),
2685                            });
2686                        }
2687                    } else {
2688                        changes.push(InstanceChange {
2689                            ty,
2690                            name: name.to_string(),
2691                        });
2692                    }
2693                }
2694                Some((dns_record, false)) => {
2695                    timers.push(dns_record.record.get_record().get_expire_time());
2696                    timers.push(dns_record.record.get_record().get_refresh_time());
2697                }
2698                _ => {}
2699            }
2700        }
2701
2702        // Add timers for the new records.
2703        for t in timers {
2704            self.add_timer(t);
2705        }
2706
2707        // Go through remaining changes to see if any hostname resolutions were found or updated.
2708        for change in changes
2709            .iter()
2710            .filter(|change| change.ty == RRType::A || change.ty == RRType::AAAA)
2711        {
2712            let addr_map = self.cache.get_addresses_for_host(&change.name);
2713            for (name, addresses) in addr_map {
2714                call_hostname_resolution_listener(
2715                    &self.hostname_resolvers,
2716                    &change.name,
2717                    HostnameResolutionEvent::AddressesFound(name, addresses),
2718                )
2719            }
2720        }
2721
2722        // Identify the instances that need to be "resolved".
2723        let mut updated_instances = HashSet::new();
2724        for update in changes {
2725            match update.ty {
2726                RRType::PTR | RRType::SRV | RRType::TXT => {
2727                    updated_instances.insert(update.name);
2728                }
2729                RRType::A | RRType::AAAA => {
2730                    let instances = self.cache.get_instances_on_host(&update.name);
2731                    updated_instances.extend(instances);
2732                }
2733                _ => {}
2734            }
2735        }
2736
2737        self.resolve_updated_instances(&updated_instances);
2738    }
2739
2740    fn conflict_handler(&mut self, msg: &DnsIncoming, if_index: u32) {
2741        let Some(my_intf) = self.my_intfs.get(&if_index) else {
2742            debug!("handle_response: no intf found for index {if_index}");
2743            return;
2744        };
2745
2746        let Some(dns_registry) = self.dns_registry_map.get_mut(&if_index) else {
2747            return;
2748        };
2749
2750        for answer in msg.answers().iter() {
2751            let mut new_records = Vec::new();
2752
2753            let name = answer.get_name();
2754            let Some(probe) = dns_registry.probing.get_mut(name) else {
2755                continue;
2756            };
2757
2758            // check against possible multicast forwarding
2759            if answer.get_type() == RRType::A || answer.get_type() == RRType::AAAA {
2760                if let Some(answer_addr) = answer.any().downcast_ref::<DnsAddress>() {
2761                    if answer_addr.interface_id.index != if_index {
2762                        debug!(
2763                            "conflict handler: answer addr {:?} not in the subnet of intf {}",
2764                            answer_addr, my_intf.name
2765                        );
2766                        continue;
2767                    }
2768                }
2769
2770                // double check if any other address record matches rrdata,
2771                // as there could be multiple addresses for the same name.
2772                let any_match = probe.records.iter().any(|r| {
2773                    r.get_type() == answer.get_type()
2774                        && r.get_class() == answer.get_class()
2775                        && r.rrdata_match(answer.as_ref())
2776                });
2777                if any_match {
2778                    continue; // no conflict for this answer.
2779                }
2780            }
2781
2782            probe.records.retain(|record| {
2783                if record.get_type() == answer.get_type()
2784                    && record.get_class() == answer.get_class()
2785                    && !record.rrdata_match(answer.as_ref())
2786                {
2787                    debug!(
2788                        "found conflict name: '{name}' record: {}: {} PEER: {}",
2789                        record.get_type(),
2790                        record.rdata_print(),
2791                        answer.rdata_print()
2792                    );
2793
2794                    // create a new name for this record
2795                    // then remove the old record in probing.
2796                    let mut new_record = record.clone();
2797                    let new_name = match record.get_type() {
2798                        RRType::A => hostname_change(name),
2799                        RRType::AAAA => hostname_change(name),
2800                        _ => name_change(name),
2801                    };
2802                    new_record.get_record_mut().set_new_name(new_name);
2803                    new_records.push(new_record);
2804                    return false; // old record is dropped from the probe.
2805                }
2806
2807                true
2808            });
2809
2810            // ?????
2811            // if probe.records.is_empty() {
2812            //     dns_registry.probing.remove(name);
2813            // }
2814
2815            // Probing again with the new names.
2816            let create_time = current_time_millis() + fastrand::u64(0..250);
2817
2818            let waiting_services = probe.waiting_services.clone();
2819
2820            for record in new_records {
2821                if dns_registry.update_hostname(name, record.get_name(), create_time) {
2822                    self.timers.push(Reverse(create_time));
2823                }
2824
2825                // remember the name changes (note: `name` might not be the original, it could be already changed once.)
2826                dns_registry.name_changes.insert(
2827                    record.get_record().get_original_name().to_string(),
2828                    record.get_name().to_string(),
2829                );
2830
2831                let new_probe = match dns_registry.probing.get_mut(record.get_name()) {
2832                    Some(p) => p,
2833                    None => {
2834                        let new_probe = dns_registry
2835                            .probing
2836                            .entry(record.get_name().to_string())
2837                            .or_insert_with(|| {
2838                                debug!("conflict handler: new probe of {}", record.get_name());
2839                                Probe::new(create_time)
2840                            });
2841                        self.timers.push(Reverse(new_probe.next_send));
2842                        new_probe
2843                    }
2844                };
2845
2846                debug!(
2847                    "insert record with new name '{}' {} into probe",
2848                    record.get_name(),
2849                    record.get_type()
2850                );
2851                new_probe.insert_record(record);
2852
2853                new_probe.waiting_services.extend(waiting_services.clone());
2854            }
2855        }
2856    }
2857
2858    /// Resolve the updated (including new) instances.
2859    ///
2860    /// Note: it is possible that more than 1 PTR pointing to the same
2861    /// instance. For example, a regular service type PTR and a sub-type
2862    /// service type PTR can both point to the same service instance.
2863    /// This loop automatically handles the sub-type PTRs.
2864    fn resolve_updated_instances(&mut self, updated_instances: &HashSet<String>) {
2865        let mut resolved: HashSet<String> = HashSet::new();
2866        let mut unresolved: HashSet<String> = HashSet::new();
2867        let mut removed_instances = HashMap::new();
2868
2869        let now = current_time_millis();
2870
2871        for (ty_domain, records) in self.cache.all_ptr().iter() {
2872            if !self.service_queriers.contains_key(ty_domain) {
2873                // No need to resolve if not in our queries.
2874                continue;
2875            }
2876
2877            for ptr in records.iter().filter(|r| !r.record.expires_soon(now)) {
2878                let Some(dns_ptr) = ptr.record.any().downcast_ref::<DnsPointer>() else {
2879                    continue;
2880                };
2881
2882                let instance = dns_ptr.alias();
2883                if !updated_instances.contains(instance) {
2884                    continue;
2885                }
2886
2887                let Ok(resolved_service) = self.resolve_service_from_cache(ty_domain, instance)
2888                else {
2889                    continue;
2890                };
2891
2892                debug!("resolve_updated_instances: from cache: {instance}");
2893                if resolved_service.is_valid() {
2894                    debug!("call queriers to resolve {instance}");
2895                    resolved.insert(instance.to_string());
2896                    let event = ServiceEvent::ServiceResolved(Box::new(resolved_service));
2897                    call_service_listener(&self.service_queriers, ty_domain, event);
2898                } else {
2899                    debug!("Resolved service is not valid: {instance}");
2900                    if self.resolved.remove(dns_ptr.alias()) {
2901                        removed_instances
2902                            .entry(ty_domain.to_string())
2903                            .or_insert_with(HashSet::new)
2904                            .insert(instance.to_string());
2905                    }
2906                    unresolved.insert(instance.to_string());
2907                }
2908            }
2909        }
2910
2911        for instance in resolved.drain() {
2912            self.pending_resolves.remove(&instance);
2913            self.resolved.insert(instance);
2914        }
2915
2916        for instance in unresolved.drain() {
2917            self.add_pending_resolve(instance);
2918        }
2919
2920        if !removed_instances.is_empty() {
2921            debug!(
2922                "resolve_updated_instances: removed {}",
2923                &removed_instances.len()
2924            );
2925            self.notify_service_removal(removed_instances);
2926        }
2927    }
2928
2929    /// Handle incoming query packets, figure out whether and what to respond.
2930    fn handle_query(&mut self, msg: DnsIncoming, if_index: u32, is_ipv4: bool) {
2931        let sock_opt = if is_ipv4 {
2932            &self.ipv4_sock
2933        } else {
2934            &self.ipv6_sock
2935        };
2936        let Some(sock) = sock_opt.as_ref() else {
2937            debug!("handle_query: socket not available for intf {}", if_index);
2938            return;
2939        };
2940        let mut out = DnsOutgoing::new(FLAGS_QR_RESPONSE | FLAGS_AA);
2941
2942        // Special meta-query "_services._dns-sd._udp.<Domain>".
2943        // See https://datatracker.ietf.org/doc/html/rfc6763#section-9
2944        const META_QUERY: &str = "_services._dns-sd._udp.local.";
2945
2946        let Some(dns_registry) = self.dns_registry_map.get_mut(&if_index) else {
2947            debug!("missing dns registry for intf {}", if_index);
2948            return;
2949        };
2950
2951        let Some(intf) = self.my_intfs.get(&if_index) else {
2952            debug!("handle_query: no intf found for index {if_index}");
2953            return;
2954        };
2955
2956        for question in msg.questions().iter() {
2957            let qtype = question.entry_type();
2958
2959            if qtype == RRType::PTR {
2960                for service in self.my_services.values() {
2961                    if service.get_status(if_index) != ServiceStatus::Announced {
2962                        continue;
2963                    }
2964
2965                    if question.entry_name() == service.get_type()
2966                        || service
2967                            .get_subtype()
2968                            .as_ref()
2969                            .is_some_and(|v| v == question.entry_name())
2970                    {
2971                        add_answer_with_additionals(
2972                            &mut out,
2973                            &msg,
2974                            service,
2975                            intf,
2976                            dns_registry,
2977                            is_ipv4,
2978                        );
2979                    } else if question.entry_name() == META_QUERY {
2980                        let ptr_added = out.add_answer(
2981                            &msg,
2982                            DnsPointer::new(
2983                                question.entry_name(),
2984                                RRType::PTR,
2985                                CLASS_IN,
2986                                service.get_other_ttl(),
2987                                service.get_type().to_string(),
2988                            ),
2989                        );
2990                        if !ptr_added {
2991                            trace!("answer was not added for meta-query {:?}", &question);
2992                        }
2993                    }
2994                }
2995            } else {
2996                // Simultaneous Probe Tiebreaking (RFC 6762 section 8.2)
2997                if qtype == RRType::ANY && msg.num_authorities() > 0 {
2998                    let probe_name = question.entry_name();
2999
3000                    if let Some(probe) = dns_registry.probing.get_mut(probe_name) {
3001                        let now = current_time_millis();
3002
3003                        // Only do tiebreaking if probe already started.
3004                        // This check also helps avoid redo tiebreaking if start time
3005                        // was postponed.
3006                        if probe.start_time < now {
3007                            let incoming_records: Vec<_> = msg
3008                                .authorities()
3009                                .iter()
3010                                .filter(|r| r.get_name() == probe_name)
3011                                .collect();
3012
3013                            probe.tiebreaking(&incoming_records, now, probe_name);
3014                        }
3015                    }
3016                }
3017
3018                if qtype == RRType::A || qtype == RRType::AAAA || qtype == RRType::ANY {
3019                    for service in self.my_services.values() {
3020                        if service.get_status(if_index) != ServiceStatus::Announced {
3021                            continue;
3022                        }
3023
3024                        let service_hostname =
3025                            match dns_registry.name_changes.get(service.get_hostname()) {
3026                                Some(new_name) => new_name,
3027                                None => service.get_hostname(),
3028                            };
3029
3030                        if service_hostname.to_lowercase() == question.entry_name().to_lowercase() {
3031                            let intf_addrs = if is_ipv4 {
3032                                service.get_addrs_on_my_intf_v4(intf)
3033                            } else {
3034                                service.get_addrs_on_my_intf_v6(intf)
3035                            };
3036                            if intf_addrs.is_empty()
3037                                && (qtype == RRType::A || qtype == RRType::AAAA)
3038                            {
3039                                let t = match qtype {
3040                                    RRType::A => "TYPE_A",
3041                                    RRType::AAAA => "TYPE_AAAA",
3042                                    _ => "invalid_type",
3043                                };
3044                                trace!(
3045                                    "Cannot find valid addrs for {} response on intf {:?}",
3046                                    t,
3047                                    &intf
3048                                );
3049                                return;
3050                            }
3051                            for address in intf_addrs {
3052                                out.add_answer(
3053                                    &msg,
3054                                    DnsAddress::new(
3055                                        service_hostname,
3056                                        ip_address_rr_type(&address),
3057                                        CLASS_IN | CLASS_CACHE_FLUSH,
3058                                        service.get_host_ttl(),
3059                                        address,
3060                                        intf.into(),
3061                                    ),
3062                                );
3063                            }
3064                        }
3065                    }
3066                }
3067
3068                let query_name = question.entry_name().to_lowercase();
3069                let service_opt = self
3070                    .my_services
3071                    .iter()
3072                    .find(|(k, _v)| {
3073                        let service_name = match dns_registry.name_changes.get(k.as_str()) {
3074                            Some(new_name) => new_name,
3075                            None => k,
3076                        };
3077                        service_name == &query_name
3078                    })
3079                    .map(|(_, v)| v);
3080
3081                let Some(service) = service_opt else {
3082                    continue;
3083                };
3084
3085                if service.get_status(if_index) != ServiceStatus::Announced {
3086                    continue;
3087                }
3088
3089                let intf_addrs = if is_ipv4 {
3090                    service.get_addrs_on_my_intf_v4(intf)
3091                } else {
3092                    service.get_addrs_on_my_intf_v6(intf)
3093                };
3094                if intf_addrs.is_empty() {
3095                    debug!(
3096                        "Cannot find valid addrs for TYPE_SRV response on intf {:?}",
3097                        &intf
3098                    );
3099                    continue;
3100                }
3101
3102                add_answer_of_service(
3103                    &mut out,
3104                    &msg,
3105                    question.entry_name(),
3106                    service,
3107                    qtype,
3108                    intf_addrs,
3109                );
3110            }
3111        }
3112
3113        if out.answers_count() > 0 {
3114            debug!("sending response on intf {}", &intf.name);
3115            out.set_id(msg.id());
3116            if let Err(InternalError::IntfAddrInvalid(intf_addr)) =
3117                send_dns_outgoing(&out, intf, &sock.pktinfo, self.port)
3118            {
3119                let invalid_intf_addr = HashSet::from([intf_addr]);
3120                let _ = self.send_cmd_to_self(Command::InvalidIntfAddrs(invalid_intf_addr));
3121            }
3122
3123            let if_name = intf.name.clone();
3124
3125            self.increase_counter(Counter::Respond, 1);
3126            self.notify_monitors(DaemonEvent::Respond(if_name));
3127        }
3128
3129        self.increase_counter(Counter::KnownAnswerSuppression, out.known_answer_count());
3130    }
3131
3132    /// Increases the value of `counter` by `count`.
3133    fn increase_counter(&mut self, counter: Counter, count: i64) {
3134        let key = counter.to_string();
3135        match self.counters.get_mut(&key) {
3136            Some(v) => *v += count,
3137            None => {
3138                self.counters.insert(key, count);
3139            }
3140        }
3141    }
3142
3143    /// Sets the value of `counter` to `count`.
3144    fn set_counter(&mut self, counter: Counter, count: i64) {
3145        let key = counter.to_string();
3146        self.counters.insert(key, count);
3147    }
3148
3149    fn signal_sock_drain(&self) {
3150        let mut signal_buf = [0; 1024];
3151
3152        // This recv is non-blocking as the socket is non-blocking.
3153        while let Ok(sz) = self.signal_sock.recv(&mut signal_buf) {
3154            trace!(
3155                "signal socket recvd: {}",
3156                String::from_utf8_lossy(&signal_buf[0..sz])
3157            );
3158        }
3159    }
3160
3161    fn add_retransmission(&mut self, next_time: u64, command: Command) {
3162        self.retransmissions.push(ReRun { next_time, command });
3163        self.add_timer(next_time);
3164    }
3165
3166    /// Sends service removal event to listeners for expired service records.
3167    /// `expired`: map of service type domain to set of instance names.
3168    fn notify_service_removal(&self, expired: HashMap<String, HashSet<String>>) {
3169        for (ty_domain, sender) in self.service_queriers.iter() {
3170            if let Some(instances) = expired.get(ty_domain) {
3171                for instance_name in instances {
3172                    let event = ServiceEvent::ServiceRemoved(
3173                        ty_domain.to_string(),
3174                        instance_name.to_string(),
3175                    );
3176                    match sender.send(event) {
3177                        Ok(()) => debug!("notify_service_removal: sent ServiceRemoved to listener of {ty_domain}: {instance_name}"),
3178                        Err(e) => debug!("Failed to send event: {}", e),
3179                    }
3180                }
3181            }
3182        }
3183    }
3184
3185    /// The entry point that executes all commands received by the daemon.
3186    ///
3187    /// `repeating`: whether this is a retransmission.
3188    fn exec_command(&mut self, command: Command, repeating: bool) {
3189        trace!("exec_command: {:?} repeating: {}", &command, repeating);
3190        match command {
3191            Command::Browse(ty, next_delay, cache_only, listener) => {
3192                self.exec_command_browse(repeating, ty, next_delay, cache_only, listener);
3193            }
3194
3195            Command::ResolveHostname(hostname, next_delay, listener, timeout) => {
3196                self.exec_command_resolve_hostname(
3197                    repeating, hostname, next_delay, listener, timeout,
3198                );
3199            }
3200
3201            Command::Register(service_info) => {
3202                self.register_service(*service_info);
3203                self.increase_counter(Counter::Register, 1);
3204            }
3205
3206            Command::RegisterResend(fullname, intf) => {
3207                trace!("register-resend service: {fullname} on {}", &intf);
3208                if let Err(InternalError::IntfAddrInvalid(intf_addr)) =
3209                    self.exec_command_register_resend(fullname, intf)
3210                {
3211                    let invalid_intf_addr = HashSet::from([intf_addr]);
3212                    let _ = self.send_cmd_to_self(Command::InvalidIntfAddrs(invalid_intf_addr));
3213                }
3214            }
3215
3216            Command::Unregister(fullname, resp_s) => {
3217                trace!("unregister service {} repeat {}", &fullname, &repeating);
3218                self.exec_command_unregister(repeating, fullname, resp_s);
3219            }
3220
3221            Command::UnregisterResend(packet, if_index, is_ipv4) => {
3222                self.exec_command_unregister_resend(packet, if_index, is_ipv4);
3223            }
3224
3225            Command::StopBrowse(ty_domain) => self.exec_command_stop_browse(ty_domain),
3226
3227            Command::StopResolveHostname(hostname) => {
3228                self.exec_command_stop_resolve_hostname(hostname.to_lowercase())
3229            }
3230
3231            Command::Resolve(instance, try_count) => self.exec_command_resolve(instance, try_count),
3232
3233            Command::GetMetrics(resp_s) => self.exec_command_get_metrics(resp_s),
3234
3235            Command::GetStatus(resp_s) => match resp_s.send(self.status.clone()) {
3236                Ok(()) => trace!("Sent status to the client"),
3237                Err(e) => debug!("Failed to send status: {}", e),
3238            },
3239
3240            Command::Monitor(resp_s) => {
3241                self.monitors.push(resp_s);
3242            }
3243
3244            Command::SetOption(daemon_opt) => {
3245                self.process_set_option(daemon_opt);
3246            }
3247
3248            Command::GetOption(resp_s) => {
3249                let val = DaemonOptionVal {
3250                    _service_name_len_max: self.service_name_len_max,
3251                    ip_check_interval: self.ip_check_interval,
3252                };
3253                if let Err(e) = resp_s.send(val) {
3254                    debug!("Failed to send options: {}", e);
3255                }
3256            }
3257
3258            Command::Verify(instance_fullname, timeout) => {
3259                self.exec_command_verify(instance_fullname, timeout, repeating);
3260            }
3261
3262            Command::InvalidIntfAddrs(invalid_intf_addrs) => {
3263                for intf_addr in invalid_intf_addrs {
3264                    self.del_interface_addr(&intf_addr);
3265                }
3266
3267                self.check_ip_changes();
3268            }
3269
3270            _ => {
3271                debug!("unexpected command: {:?}", &command);
3272            }
3273        }
3274    }
3275
3276    fn exec_command_get_metrics(&mut self, resp_s: Sender<HashMap<String, i64>>) {
3277        self.set_counter(Counter::CachedPTR, self.cache.ptr_count() as i64);
3278        self.set_counter(Counter::CachedSRV, self.cache.srv_count() as i64);
3279        self.set_counter(Counter::CachedAddr, self.cache.addr_count() as i64);
3280        self.set_counter(Counter::CachedTxt, self.cache.txt_count() as i64);
3281        self.set_counter(Counter::CachedNSec, self.cache.nsec_count() as i64);
3282        self.set_counter(Counter::CachedSubtype, self.cache.subtype_count() as i64);
3283        self.set_counter(Counter::Timer, self.timers.len() as i64);
3284
3285        let dns_registry_probe_count: usize = self
3286            .dns_registry_map
3287            .values()
3288            .map(|r| r.probing.len())
3289            .sum();
3290        self.set_counter(Counter::DnsRegistryProbe, dns_registry_probe_count as i64);
3291
3292        let dns_registry_active_count: usize = self
3293            .dns_registry_map
3294            .values()
3295            .map(|r| r.active.values().map(|a| a.len()).sum::<usize>())
3296            .sum();
3297        self.set_counter(Counter::DnsRegistryActive, dns_registry_active_count as i64);
3298
3299        let dns_registry_timer_count: usize = self
3300            .dns_registry_map
3301            .values()
3302            .map(|r| r.new_timers.len())
3303            .sum();
3304        self.set_counter(Counter::DnsRegistryTimer, dns_registry_timer_count as i64);
3305
3306        let dns_registry_name_change_count: usize = self
3307            .dns_registry_map
3308            .values()
3309            .map(|r| r.name_changes.len())
3310            .sum();
3311        self.set_counter(
3312            Counter::DnsRegistryNameChange,
3313            dns_registry_name_change_count as i64,
3314        );
3315
3316        // Send the metrics to the client.
3317        if let Err(e) = resp_s.send(self.counters.clone()) {
3318            debug!("Failed to send metrics: {}", e);
3319        }
3320    }
3321
3322    fn exec_command_browse(
3323        &mut self,
3324        repeating: bool,
3325        ty: String,
3326        next_delay: u32,
3327        cache_only: bool,
3328        listener: Sender<ServiceEvent>,
3329    ) {
3330        let pretty_addrs: Vec<String> = self
3331            .my_intfs
3332            .iter()
3333            .map(|(if_index, itf)| format!("{} ({if_index})", itf.name))
3334            .collect();
3335
3336        if let Err(e) = listener.send(ServiceEvent::SearchStarted(format!(
3337            "{ty} on {} interfaces [{}]",
3338            pretty_addrs.len(),
3339            pretty_addrs.join(", ")
3340        ))) {
3341            debug!(
3342                "Failed to send SearchStarted({})(repeating:{}): {}",
3343                &ty, repeating, e
3344            );
3345            return;
3346        }
3347
3348        let now = current_time_millis();
3349        if !repeating {
3350            // Binds a `listener` to querying mDNS domain type `ty`.
3351            //
3352            // If there is already a `listener`, it will be updated, i.e. overwritten.
3353            self.service_queriers.insert(ty.clone(), listener.clone());
3354
3355            // if we already have the records in our cache, just send them
3356            self.query_cache_for_service(&ty, &listener, now);
3357        }
3358
3359        if cache_only {
3360            // If cache_only is true, we do not send a query.
3361            match listener.send(ServiceEvent::SearchStopped(ty.clone())) {
3362                Ok(()) => debug!("SearchStopped sent for {}", &ty),
3363                Err(e) => debug!("Failed to send SearchStopped: {}", e),
3364            }
3365            return;
3366        }
3367
3368        self.send_query(&ty, RRType::PTR);
3369        self.increase_counter(Counter::Browse, 1);
3370
3371        let next_time = now + (next_delay * 1000) as u64;
3372        let max_delay = 60 * 60;
3373        let delay = cmp::min(next_delay * 2, max_delay);
3374        self.add_retransmission(next_time, Command::Browse(ty, delay, cache_only, listener));
3375    }
3376
3377    fn exec_command_resolve_hostname(
3378        &mut self,
3379        repeating: bool,
3380        hostname: String,
3381        next_delay: u32,
3382        listener: Sender<HostnameResolutionEvent>,
3383        timeout: Option<u64>,
3384    ) {
3385        let addr_list: Vec<_> = self.my_intfs.iter().collect();
3386        if let Err(e) = listener.send(HostnameResolutionEvent::SearchStarted(format!(
3387            "{} on addrs {:?}",
3388            &hostname, &addr_list
3389        ))) {
3390            debug!(
3391                "Failed to send ResolveStarted({})(repeating:{}): {}",
3392                &hostname, repeating, e
3393            );
3394            return;
3395        }
3396        if !repeating {
3397            self.add_hostname_resolver(hostname.to_owned(), listener.clone(), timeout);
3398            // if we already have the records in our cache, just send them
3399            self.query_cache_for_hostname(&hostname, listener.clone());
3400        }
3401
3402        self.send_query_vec(&[(&hostname, RRType::A), (&hostname, RRType::AAAA)]);
3403        self.increase_counter(Counter::ResolveHostname, 1);
3404
3405        let now = current_time_millis();
3406        let next_time = now + u64::from(next_delay) * 1000;
3407        let max_delay = 60 * 60;
3408        let delay = cmp::min(next_delay * 2, max_delay);
3409
3410        // Only add retransmission if it does not exceed the hostname resolver timeout, if any.
3411        if self
3412            .hostname_resolvers
3413            .get(&hostname)
3414            .and_then(|(_sender, timeout)| *timeout)
3415            .map(|timeout| next_time < timeout)
3416            .unwrap_or(true)
3417        {
3418            self.add_retransmission(
3419                next_time,
3420                Command::ResolveHostname(hostname, delay, listener, None),
3421            );
3422        }
3423    }
3424
3425    fn exec_command_resolve(&mut self, instance: String, try_count: u16) {
3426        let pending_query = self.query_unresolved(&instance);
3427        let max_try = 3;
3428        if pending_query && try_count < max_try {
3429            // Note that if the current try already succeeds, the next retransmission
3430            // will be no-op as the cache has been updated.
3431            let next_time = current_time_millis() + RESOLVE_WAIT_IN_MILLIS;
3432            self.add_retransmission(next_time, Command::Resolve(instance, try_count + 1));
3433        }
3434    }
3435
3436    fn exec_command_unregister(
3437        &mut self,
3438        repeating: bool,
3439        fullname: String,
3440        resp_s: Sender<UnregisterStatus>,
3441    ) {
3442        let response = match self.my_services.remove_entry(&fullname) {
3443            None => {
3444                debug!("unregister: cannot find such service {}", &fullname);
3445                UnregisterStatus::NotFound
3446            }
3447            Some((_k, info)) => {
3448                let mut timers = Vec::new();
3449
3450                for (if_index, intf) in self.my_intfs.iter() {
3451                    if let Some(sock) = self.ipv4_sock.as_ref() {
3452                        let packet = self.unregister_service(&info, intf, &sock.pktinfo);
3453                        // repeat for one time just in case some peers miss the message
3454                        if !repeating && !packet.is_empty() {
3455                            let next_time = current_time_millis() + 120;
3456                            self.retransmissions.push(ReRun {
3457                                next_time,
3458                                command: Command::UnregisterResend(packet, *if_index, true),
3459                            });
3460                            timers.push(next_time);
3461                        }
3462                    }
3463
3464                    // ipv6
3465                    if let Some(sock) = self.ipv6_sock.as_ref() {
3466                        let packet = self.unregister_service(&info, intf, &sock.pktinfo);
3467                        if !repeating && !packet.is_empty() {
3468                            let next_time = current_time_millis() + 120;
3469                            self.retransmissions.push(ReRun {
3470                                next_time,
3471                                command: Command::UnregisterResend(packet, *if_index, false),
3472                            });
3473                            timers.push(next_time);
3474                        }
3475                    }
3476                }
3477
3478                for t in timers {
3479                    self.add_timer(t);
3480                }
3481
3482                self.increase_counter(Counter::Unregister, 1);
3483                UnregisterStatus::OK
3484            }
3485        };
3486        if let Err(e) = resp_s.send(response) {
3487            debug!("unregister: failed to send response: {}", e);
3488        }
3489    }
3490
3491    fn exec_command_unregister_resend(&mut self, packet: Vec<u8>, if_index: u32, is_ipv4: bool) {
3492        let Some(intf) = self.my_intfs.get(&if_index) else {
3493            return;
3494        };
3495        let sock_opt = if is_ipv4 {
3496            &self.ipv4_sock
3497        } else {
3498            &self.ipv6_sock
3499        };
3500        let Some(sock) = sock_opt else {
3501            return;
3502        };
3503
3504        let if_addr = if is_ipv4 {
3505            match intf.next_ifaddr_v4() {
3506                Some(addr) => addr,
3507                None => return,
3508            }
3509        } else {
3510            match intf.next_ifaddr_v6() {
3511                Some(addr) => addr,
3512                None => return,
3513            }
3514        };
3515
3516        debug!("UnregisterResend from {:?}", if_addr);
3517        multicast_on_intf(
3518            &packet[..],
3519            &intf.name,
3520            intf.index,
3521            if_addr,
3522            &sock.pktinfo,
3523            self.port,
3524        );
3525
3526        self.increase_counter(Counter::UnregisterResend, 1);
3527    }
3528
3529    fn exec_command_stop_browse(&mut self, ty_domain: String) {
3530        match self.service_queriers.remove_entry(&ty_domain) {
3531            None => debug!("StopBrowse: cannot find querier for {}", &ty_domain),
3532            Some((ty, sender)) => {
3533                // Remove pending browse commands in the reruns.
3534                trace!("StopBrowse: removed queryer for {}", &ty);
3535                let mut i = 0;
3536                while i < self.retransmissions.len() {
3537                    if let Command::Browse(t, _, _, _) = &self.retransmissions[i].command {
3538                        if t == &ty {
3539                            self.retransmissions.remove(i);
3540                            trace!("StopBrowse: removed retransmission for {}", &ty);
3541                            continue;
3542                        }
3543                    }
3544                    i += 1;
3545                }
3546
3547                // Remove cache entries.
3548                self.cache.remove_service_type(&ty_domain);
3549
3550                // Notify the client.
3551                match sender.send(ServiceEvent::SearchStopped(ty_domain)) {
3552                    Ok(()) => trace!("Sent SearchStopped to the listener"),
3553                    Err(e) => debug!("Failed to send SearchStopped: {}", e),
3554                }
3555            }
3556        }
3557    }
3558
3559    fn exec_command_stop_resolve_hostname(&mut self, hostname: String) {
3560        if let Some((host, (sender, _timeout))) = self.hostname_resolvers.remove_entry(&hostname) {
3561            // Remove pending resolve commands in the reruns.
3562            trace!("StopResolve: removed queryer for {}", &host);
3563            let mut i = 0;
3564            while i < self.retransmissions.len() {
3565                if let Command::Resolve(t, _) = &self.retransmissions[i].command {
3566                    if t == &host {
3567                        self.retransmissions.remove(i);
3568                        trace!("StopResolve: removed retransmission for {}", &host);
3569                        continue;
3570                    }
3571                }
3572                i += 1;
3573            }
3574
3575            // Notify the client.
3576            match sender.send(HostnameResolutionEvent::SearchStopped(hostname)) {
3577                Ok(()) => trace!("Sent SearchStopped to the listener"),
3578                Err(e) => debug!("Failed to send SearchStopped: {}", e),
3579            }
3580        }
3581    }
3582
3583    fn exec_command_register_resend(&mut self, fullname: String, if_index: u32) -> MyResult<()> {
3584        let Some(info) = self.my_services.get_mut(&fullname) else {
3585            trace!("announce: cannot find such service {}", &fullname);
3586            return Ok(());
3587        };
3588
3589        let Some(dns_registry) = self.dns_registry_map.get_mut(&if_index) else {
3590            return Ok(());
3591        };
3592
3593        let Some(intf) = self.my_intfs.get(&if_index) else {
3594            return Ok(());
3595        };
3596
3597        let announced_v4 = if let Some(sock) = self.ipv4_sock.as_ref() {
3598            announce_service_on_intf(dns_registry, info, intf, &sock.pktinfo, self.port)?
3599        } else {
3600            false
3601        };
3602        let announced_v6 = if let Some(sock) = self.ipv6_sock.as_ref() {
3603            announce_service_on_intf(dns_registry, info, intf, &sock.pktinfo, self.port)?
3604        } else {
3605            false
3606        };
3607
3608        if announced_v4 || announced_v6 {
3609            let mut hostname = info.get_hostname();
3610            if let Some(new_name) = dns_registry.name_changes.get(hostname) {
3611                hostname = new_name;
3612            }
3613            let service_name = match dns_registry.name_changes.get(&fullname) {
3614                Some(new_name) => new_name.to_string(),
3615                None => fullname,
3616            };
3617
3618            debug!("resend: announce service {service_name} on {}", intf.name);
3619
3620            notify_monitors(
3621                &mut self.monitors,
3622                DaemonEvent::Announce(service_name, format!("{}:{}", hostname, &intf.name)),
3623            );
3624            info.set_status(if_index, ServiceStatus::Announced);
3625        } else {
3626            debug!("register-resend should not fail");
3627        }
3628
3629        self.increase_counter(Counter::RegisterResend, 1);
3630        Ok(())
3631    }
3632
3633    fn exec_command_verify(&mut self, instance: String, timeout: Duration, repeating: bool) {
3634        /*
3635        RFC 6762 section 10.4:
3636        ...
3637        When the cache receives this hint that it should reconfirm some
3638        record, it MUST issue two or more queries for the resource record in
3639        dispute.  If no response is received within ten seconds, then, even
3640        though its TTL may indicate that it is not yet due to expire, that
3641        record SHOULD be promptly flushed from the cache.
3642        */
3643        let now = current_time_millis();
3644        let expire_at = if repeating {
3645            None
3646        } else {
3647            Some(now + timeout.as_millis() as u64)
3648        };
3649
3650        // send query for the resource records.
3651        let record_vec = self.cache.service_verify_queries(&instance, expire_at);
3652
3653        if !record_vec.is_empty() {
3654            let query_vec: Vec<(&str, RRType)> = record_vec
3655                .iter()
3656                .map(|(record, rr_type)| (record.as_str(), *rr_type))
3657                .collect();
3658            self.send_query_vec(&query_vec);
3659
3660            if let Some(new_expire) = expire_at {
3661                self.add_timer(new_expire); // ensure a check for the new expire time.
3662
3663                // schedule a resend 1 second later
3664                self.add_retransmission(now + 1000, Command::Verify(instance, timeout));
3665            }
3666        }
3667    }
3668
3669    /// Refresh cached service records with active queriers
3670    fn refresh_active_services(&mut self) {
3671        let mut query_ptr_count = 0;
3672        let mut query_srv_count = 0;
3673        let mut new_timers = HashSet::new();
3674        let mut query_addr_count = 0;
3675
3676        for (ty_domain, _sender) in self.service_queriers.iter() {
3677            let refreshed_timers = self.cache.refresh_due_ptr(ty_domain);
3678            if !refreshed_timers.is_empty() {
3679                trace!("sending refresh query for PTR: {}", ty_domain);
3680                self.send_query(ty_domain, RRType::PTR);
3681                query_ptr_count += 1;
3682                new_timers.extend(refreshed_timers);
3683            }
3684
3685            let (instances, timers) = self.cache.refresh_due_srv_txt(ty_domain);
3686            for (instance, types) in instances {
3687                trace!("sending refresh query for: {}", &instance);
3688                let query_vec = types
3689                    .into_iter()
3690                    .map(|ty| (instance.as_str(), ty))
3691                    .collect::<Vec<_>>();
3692                self.send_query_vec(&query_vec);
3693                query_srv_count += 1;
3694            }
3695            new_timers.extend(timers);
3696            let (hostnames, timers) = self.cache.refresh_due_hosts(ty_domain);
3697            for hostname in hostnames.iter() {
3698                trace!("sending refresh queries for A and AAAA:  {}", hostname);
3699                self.send_query_vec(&[(hostname, RRType::A), (hostname, RRType::AAAA)]);
3700                query_addr_count += 2;
3701            }
3702            new_timers.extend(timers);
3703        }
3704
3705        for timer in new_timers {
3706            self.add_timer(timer);
3707        }
3708
3709        self.increase_counter(Counter::CacheRefreshPTR, query_ptr_count);
3710        self.increase_counter(Counter::CacheRefreshSrvTxt, query_srv_count);
3711        self.increase_counter(Counter::CacheRefreshAddr, query_addr_count);
3712    }
3713}
3714
3715/// Adds one or more answers of a service for incoming msg and RR entry name.
3716fn add_answer_of_service(
3717    out: &mut DnsOutgoing,
3718    msg: &DnsIncoming,
3719    entry_name: &str,
3720    service: &ServiceInfo,
3721    qtype: RRType,
3722    intf_addrs: Vec<IpAddr>,
3723) {
3724    if qtype == RRType::SRV || qtype == RRType::ANY {
3725        out.add_answer(
3726            msg,
3727            DnsSrv::new(
3728                entry_name,
3729                CLASS_IN | CLASS_CACHE_FLUSH,
3730                service.get_host_ttl(),
3731                service.get_priority(),
3732                service.get_weight(),
3733                service.get_port(),
3734                service.get_hostname().to_string(),
3735            ),
3736        );
3737    }
3738
3739    if qtype == RRType::TXT || qtype == RRType::ANY {
3740        out.add_answer(
3741            msg,
3742            DnsTxt::new(
3743                entry_name,
3744                CLASS_IN | CLASS_CACHE_FLUSH,
3745                service.get_other_ttl(),
3746                service.generate_txt(),
3747            ),
3748        );
3749    }
3750
3751    if qtype == RRType::SRV {
3752        for address in intf_addrs {
3753            out.add_additional_answer(DnsAddress::new(
3754                service.get_hostname(),
3755                ip_address_rr_type(&address),
3756                CLASS_IN | CLASS_CACHE_FLUSH,
3757                service.get_host_ttl(),
3758                address,
3759                InterfaceId::default(),
3760            ));
3761        }
3762    }
3763}
3764
3765/// All possible events sent to the client from the daemon
3766/// regarding service discovery.
3767#[derive(Clone, Debug)]
3768#[non_exhaustive]
3769pub enum ServiceEvent {
3770    /// Started searching for a service type.
3771    SearchStarted(String),
3772
3773    /// Found a specific (service_type, fullname).
3774    ServiceFound(String, String),
3775
3776    /// Resolved a service instance in a ResolvedService struct.
3777    ServiceResolved(Box<ResolvedService>),
3778
3779    /// A service instance (service_type, fullname) was removed.
3780    ServiceRemoved(String, String),
3781
3782    /// Stopped searching for a service type.
3783    SearchStopped(String),
3784}
3785
3786/// All possible events sent to the client from the daemon
3787/// regarding host resolution.
3788#[derive(Clone, Debug)]
3789#[non_exhaustive]
3790pub enum HostnameResolutionEvent {
3791    /// Started searching for the ip address of a hostname.
3792    SearchStarted(String),
3793    /// One or more addresses for a hostname has been found.
3794    AddressesFound(String, HashSet<ScopedIp>),
3795    /// One or more addresses for a hostname has been removed.
3796    AddressesRemoved(String, HashSet<ScopedIp>),
3797    /// The search for the ip address of a hostname has timed out.
3798    SearchTimeout(String),
3799    /// Stopped searching for the ip address of a hostname.
3800    SearchStopped(String),
3801}
3802
3803/// Some notable events from the daemon besides [`ServiceEvent`].
3804/// These events are expected to happen infrequently.
3805#[derive(Clone, Debug)]
3806#[non_exhaustive]
3807pub enum DaemonEvent {
3808    /// Daemon unsolicitly announced a service from an interface.
3809    Announce(String, String),
3810
3811    /// Daemon encountered an error.
3812    Error(Error),
3813
3814    /// Daemon detected a new IP address from the host.
3815    IpAdd(IpAddr),
3816
3817    /// Daemon detected a IP address removed from the host.
3818    IpDel(IpAddr),
3819
3820    /// Daemon resolved a name conflict by changing one of its names.
3821    /// see [DnsNameChange] for more details.
3822    NameChange(DnsNameChange),
3823
3824    /// Send out a multicast response via an interface.
3825    Respond(String),
3826}
3827
3828/// Represents a name change due to a name conflict resolution.
3829/// See [RFC 6762 section 9](https://datatracker.ietf.org/doc/html/rfc6762#section-9)
3830#[derive(Clone, Debug)]
3831pub struct DnsNameChange {
3832    /// The original name set in `ServiceInfo` by the user.
3833    pub original: String,
3834
3835    /// A new name is created by appending a suffix after the original name.
3836    ///
3837    /// - for a service instance name, the suffix is `(N)`, where N starts at 2.
3838    /// - for a host name, the suffix is `-N`, where N starts at 2.
3839    ///
3840    /// For example:
3841    ///
3842    /// - Service name `foo._service-type._udp` becomes `foo (2)._service-type._udp`
3843    /// - Host name `foo.local.` becomes `foo-2.local.`
3844    pub new_name: String,
3845
3846    /// The resource record type
3847    pub rr_type: RRType,
3848
3849    /// The interface where the name conflict and its change happened.
3850    pub intf_name: String,
3851}
3852
3853/// Commands supported by the daemon
3854#[derive(Debug)]
3855enum Command {
3856    /// Browsing for a service type (ty_domain, next_time_delay_in_seconds, channel::sender)
3857    Browse(String, u32, bool, Sender<ServiceEvent>),
3858
3859    /// Resolve a hostname to IP addresses.
3860    ResolveHostname(String, u32, Sender<HostnameResolutionEvent>, Option<u64>), // (hostname, next_time_delay_in_seconds, sender, timeout_in_milliseconds)
3861
3862    /// Register a service
3863    Register(Box<ServiceInfo>),
3864
3865    /// Unregister a service
3866    Unregister(String, Sender<UnregisterStatus>), // (fullname)
3867
3868    /// Announce again a service to local network
3869    RegisterResend(String, u32), // (fullname)
3870
3871    /// Resend unregister packet.
3872    UnregisterResend(Vec<u8>, u32, bool), // (packet content, if_index, is_ipv4)
3873
3874    /// Stop browsing a service type
3875    StopBrowse(String), // (ty_domain)
3876
3877    /// Stop resolving a hostname
3878    StopResolveHostname(String), // (hostname)
3879
3880    /// Send query to resolve a service instance.
3881    /// This is used when a PTR record exists but SRV & TXT records are missing.
3882    Resolve(String, u16), // (service_instance_fullname, try_count)
3883
3884    /// Read the current values of the counters
3885    GetMetrics(Sender<Metrics>),
3886
3887    /// Get the current status of the daemon.
3888    GetStatus(Sender<DaemonStatus>),
3889
3890    /// Monitor noticeable events in the daemon.
3891    Monitor(Sender<DaemonEvent>),
3892
3893    SetOption(DaemonOption),
3894
3895    GetOption(Sender<DaemonOptionVal>),
3896
3897    /// Proactively confirm a DNS resource record.
3898    ///
3899    /// The intention is to check if a service name or IP address still valid
3900    /// before its TTL expires.
3901    Verify(String, Duration),
3902
3903    /// Invalidate some interface addresses.
3904    InvalidIntfAddrs(HashSet<Interface>),
3905
3906    Exit(Sender<DaemonStatus>),
3907}
3908
3909impl fmt::Display for Command {
3910    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
3911        match self {
3912            Self::Browse(_, _, _, _) => write!(f, "Command Browse"),
3913            Self::ResolveHostname(_, _, _, _) => write!(f, "Command ResolveHostname"),
3914            Self::Exit(_) => write!(f, "Command Exit"),
3915            Self::GetStatus(_) => write!(f, "Command GetStatus"),
3916            Self::GetMetrics(_) => write!(f, "Command GetMetrics"),
3917            Self::Monitor(_) => write!(f, "Command Monitor"),
3918            Self::Register(_) => write!(f, "Command Register"),
3919            Self::RegisterResend(_, _) => write!(f, "Command RegisterResend"),
3920            Self::SetOption(_) => write!(f, "Command SetOption"),
3921            Self::GetOption(_) => write!(f, "Command GetOption"),
3922            Self::StopBrowse(_) => write!(f, "Command StopBrowse"),
3923            Self::StopResolveHostname(_) => write!(f, "Command StopResolveHostname"),
3924            Self::Unregister(_, _) => write!(f, "Command Unregister"),
3925            Self::UnregisterResend(_, _, _) => write!(f, "Command UnregisterResend"),
3926            Self::Resolve(_, _) => write!(f, "Command Resolve"),
3927            Self::Verify(_, _) => write!(f, "Command VerifyResource"),
3928            Self::InvalidIntfAddrs(_) => write!(f, "Command InvalidIntfAddrs"),
3929        }
3930    }
3931}
3932
3933struct DaemonOptionVal {
3934    _service_name_len_max: u8,
3935    ip_check_interval: u64,
3936}
3937
3938#[derive(Debug)]
3939enum DaemonOption {
3940    ServiceNameLenMax(u8),
3941    IpCheckInterval(u64),
3942    EnableInterface(Vec<IfKind>),
3943    DisableInterface(Vec<IfKind>),
3944    MulticastLoopV4(bool),
3945    MulticastLoopV6(bool),
3946    AcceptUnsolicited(bool),
3947    IncludeAppleP2P(bool),
3948    #[cfg(test)]
3949    TestDownInterface(String),
3950    #[cfg(test)]
3951    TestUpInterface(String),
3952}
3953
3954/// The length of Service Domain name supported in this lib.
3955const DOMAIN_LEN: usize = "._tcp.local.".len();
3956
3957/// Validate the length of "service_name" in a "_<service_name>.<domain_name>." string.
3958fn check_service_name_length(ty_domain: &str, limit: u8) -> Result<()> {
3959    if ty_domain.len() <= DOMAIN_LEN + 1 {
3960        // service name cannot be empty or only '_'.
3961        return Err(e_fmt!("Service type name cannot be empty: {}", ty_domain));
3962    }
3963
3964    let service_name_len = ty_domain.len() - DOMAIN_LEN - 1; // exclude the leading `_`
3965    if service_name_len > limit as usize {
3966        return Err(e_fmt!("Service name length must be <= {} bytes", limit));
3967    }
3968    Ok(())
3969}
3970
3971/// Checks if `name` ends with a valid domain: '._tcp.local.' or '._udp.local.'
3972fn check_domain_suffix(name: &str) -> Result<()> {
3973    if !(name.ends_with("._tcp.local.") || name.ends_with("._udp.local.")) {
3974        return Err(e_fmt!(
3975            "mDNS service {} must end with '._tcp.local.' or '._udp.local.'",
3976            name
3977        ));
3978    }
3979
3980    Ok(())
3981}
3982
3983/// Validate the service name in a fully qualified name.
3984///
3985/// A Full Name = <Instance>.<Service>.<Domain>
3986/// The only `<Domain>` supported are "._tcp.local." and "._udp.local.".
3987///
3988/// Note: this function does not check for the length of the service name.
3989/// Instead, `register_service` method will check the length.
3990fn check_service_name(fullname: &str) -> Result<()> {
3991    check_domain_suffix(fullname)?;
3992
3993    let remaining: Vec<&str> = fullname[..fullname.len() - DOMAIN_LEN].split('.').collect();
3994    let name = remaining.last().ok_or_else(|| e_fmt!("No service name"))?;
3995
3996    if &name[0..1] != "_" {
3997        return Err(e_fmt!("Service name must start with '_'"));
3998    }
3999
4000    let name = &name[1..];
4001
4002    if name.contains("--") {
4003        return Err(e_fmt!("Service name must not contain '--'"));
4004    }
4005
4006    if name.starts_with('-') || name.ends_with('-') {
4007        return Err(e_fmt!("Service name (%s) may not start or end with '-'"));
4008    }
4009
4010    let ascii_count = name.chars().filter(|c| c.is_ascii_alphabetic()).count();
4011    if ascii_count < 1 {
4012        return Err(e_fmt!(
4013            "Service name must contain at least one letter (eg: 'A-Za-z')"
4014        ));
4015    }
4016
4017    Ok(())
4018}
4019
4020/// Validate a hostname.
4021fn check_hostname(hostname: &str) -> Result<()> {
4022    if !hostname.ends_with(".local.") {
4023        return Err(e_fmt!("Hostname must end with '.local.': {hostname}"));
4024    }
4025
4026    if hostname == ".local." {
4027        return Err(e_fmt!(
4028            "The part of the hostname before '.local.' cannot be empty"
4029        ));
4030    }
4031
4032    if hostname.len() > 255 {
4033        return Err(e_fmt!("Hostname length must be <= 255 bytes"));
4034    }
4035
4036    Ok(())
4037}
4038
4039fn call_service_listener(
4040    listeners_map: &HashMap<String, Sender<ServiceEvent>>,
4041    ty_domain: &str,
4042    event: ServiceEvent,
4043) {
4044    if let Some(listener) = listeners_map.get(ty_domain) {
4045        match listener.send(event) {
4046            Ok(()) => trace!("Sent event to listener successfully"),
4047            Err(e) => debug!("Failed to send event: {}", e),
4048        }
4049    }
4050}
4051
4052fn call_hostname_resolution_listener(
4053    listeners_map: &HashMap<String, (Sender<HostnameResolutionEvent>, Option<u64>)>,
4054    hostname: &str,
4055    event: HostnameResolutionEvent,
4056) {
4057    let hostname_lower = hostname.to_lowercase();
4058    if let Some(listener) = listeners_map.get(&hostname_lower).map(|(l, _)| l) {
4059        match listener.send(event) {
4060            Ok(()) => trace!("Sent event to listener successfully"),
4061            Err(e) => debug!("Failed to send event: {}", e),
4062        }
4063    }
4064}
4065
4066/// Returns valid network interfaces in the host system.
4067/// Operational down interfaces are excluded.
4068/// Loopback interfaces are excluded if `with_loopback` is false.
4069fn my_ip_interfaces(with_loopback: bool) -> Vec<Interface> {
4070    my_ip_interfaces_inner(with_loopback, false)
4071}
4072
4073fn my_ip_interfaces_inner(with_loopback: bool, with_apple_p2p: bool) -> Vec<Interface> {
4074    if_addrs::get_if_addrs()
4075        .unwrap_or_default()
4076        .into_iter()
4077        .filter(|i| {
4078            i.is_oper_up()
4079                && !i.is_p2p()
4080                && (!i.is_loopback() || with_loopback)
4081                && (with_apple_p2p || !is_apple_p2p_by_name(&i.name))
4082        })
4083        .collect()
4084}
4085
4086/// Checks if the interface name indicates it's an Apple peer-to-peer interface,
4087/// which should be ignored by default.
4088fn is_apple_p2p_by_name(name: &str) -> bool {
4089    let p2p_prefixes = ["awdl", "llw"];
4090    p2p_prefixes.iter().any(|prefix| name.starts_with(prefix))
4091}
4092
4093/// Send an outgoing mDNS query or response, and returns the packet bytes.
4094/// Returns empty vec if no valid interface address is found.
4095fn send_dns_outgoing(
4096    out: &DnsOutgoing,
4097    my_intf: &MyIntf,
4098    sock: &PktInfoUdpSocket,
4099    port: u16,
4100) -> MyResult<Vec<Vec<u8>>> {
4101    let if_name = &my_intf.name;
4102
4103    let if_addr = if sock.domain() == Domain::IPV4 {
4104        match my_intf.next_ifaddr_v4() {
4105            Some(addr) => addr,
4106            None => return Ok(vec![]),
4107        }
4108    } else {
4109        match my_intf.next_ifaddr_v6() {
4110            Some(addr) => addr,
4111            None => return Ok(vec![]),
4112        }
4113    };
4114
4115    send_dns_outgoing_impl(out, if_name, my_intf.index, if_addr, sock, port)
4116}
4117
4118/// Send an outgoing mDNS query or response, and returns the packet bytes.
4119fn send_dns_outgoing_impl(
4120    out: &DnsOutgoing,
4121    if_name: &str,
4122    if_index: u32,
4123    if_addr: &IfAddr,
4124    sock: &PktInfoUdpSocket,
4125    port: u16,
4126) -> MyResult<Vec<Vec<u8>>> {
4127    let qtype = if out.is_query() {
4128        "query"
4129    } else {
4130        if out.answers_count() == 0 && out.additionals().is_empty() {
4131            return Ok(vec![]); // no need to send empty response
4132        }
4133        "response"
4134    };
4135    trace!(
4136        "send {}: {} questions {} answers {} authorities {} additional",
4137        qtype,
4138        out.questions().len(),
4139        out.answers_count(),
4140        out.authorities().len(),
4141        out.additionals().len()
4142    );
4143
4144    match if_addr.ip() {
4145        IpAddr::V4(ipv4) => {
4146            if let Err(e) = sock.set_multicast_if_v4(&ipv4) {
4147                debug!(
4148                    "send_dns_outgoing: failed to set multicast interface for IPv4 {}: {}",
4149                    ipv4, e
4150                );
4151                // cannot send without a valid interface
4152                if e.kind() == std::io::ErrorKind::AddrNotAvailable {
4153                    let intf_addr = Interface {
4154                        name: if_name.to_string(),
4155                        addr: if_addr.clone(),
4156                        index: Some(if_index),
4157                        oper_status: if_addrs::IfOperStatus::Down,
4158                        is_p2p: false,
4159                        #[cfg(windows)]
4160                        adapter_name: String::new(),
4161                    };
4162                    return Err(InternalError::IntfAddrInvalid(intf_addr));
4163                }
4164                return Ok(vec![]); // non-fatal other failure
4165            }
4166        }
4167        IpAddr::V6(ipv6) => {
4168            if let Err(e) = sock.set_multicast_if_v6(if_index) {
4169                debug!(
4170                    "send_dns_outgoing: failed to set multicast interface for IPv6 {}: {}",
4171                    ipv6, e
4172                );
4173                // cannot send without a valid interface
4174                if e.kind() == std::io::ErrorKind::AddrNotAvailable {
4175                    let intf_addr = Interface {
4176                        name: if_name.to_string(),
4177                        addr: if_addr.clone(),
4178                        index: Some(if_index),
4179                        oper_status: if_addrs::IfOperStatus::Down,
4180                        is_p2p: false,
4181                        #[cfg(windows)]
4182                        adapter_name: String::new(),
4183                    };
4184                    return Err(InternalError::IntfAddrInvalid(intf_addr));
4185                }
4186                return Ok(vec![]); // non-fatal other failure
4187            }
4188        }
4189    }
4190
4191    let packet_list = out.to_data_on_wire();
4192    for packet in packet_list.iter() {
4193        multicast_on_intf(packet, if_name, if_index, if_addr, sock, port);
4194    }
4195    Ok(packet_list)
4196}
4197
4198/// Sends a multicast packet, and returns the packet bytes.
4199fn multicast_on_intf(
4200    packet: &[u8],
4201    if_name: &str,
4202    if_index: u32,
4203    if_addr: &IfAddr,
4204    socket: &PktInfoUdpSocket,
4205    port: u16,
4206) {
4207    if packet.len() > MAX_MSG_ABSOLUTE {
4208        debug!("Drop over-sized packet ({})", packet.len());
4209        return;
4210    }
4211
4212    let addr: SocketAddr = match if_addr {
4213        if_addrs::IfAddr::V4(_) => SocketAddrV4::new(GROUP_ADDR_V4, port).into(),
4214        if_addrs::IfAddr::V6(_) => {
4215            let mut sock = SocketAddrV6::new(GROUP_ADDR_V6, port, 0, 0);
4216            sock.set_scope_id(if_index); // Choose iface for multicast
4217            sock.into()
4218        }
4219    };
4220
4221    // Sends out `packet` to `addr` on the socket.
4222    let sock_addr = addr.into();
4223    match socket.send_to(packet, &sock_addr) {
4224        Ok(sz) => trace!(
4225            "sent out {} bytes on interface {} (idx {}) addr {}",
4226            sz,
4227            if_name,
4228            if_index,
4229            if_addr.ip()
4230        ),
4231        Err(e) => trace!("Failed to send to {} via {:?}: {}", addr, &if_name, e),
4232    }
4233}
4234
4235/// Returns true if `name` is a valid instance name of format:
4236/// <instance>.<service_type>.<_udp|_tcp>.local.
4237/// Note: <instance> could contain '.' as well.
4238fn valid_instance_name(name: &str) -> bool {
4239    name.split('.').count() >= 5
4240}
4241
4242fn notify_monitors(monitors: &mut Vec<Sender<DaemonEvent>>, event: DaemonEvent) {
4243    monitors.retain(|sender| {
4244        if let Err(e) = sender.try_send(event.clone()) {
4245            debug!("notify_monitors: try_send: {}", &e);
4246            if matches!(e, TrySendError::Disconnected(_)) {
4247                return false; // This monitor is dropped.
4248            }
4249        }
4250        true
4251    });
4252}
4253
4254/// Check if all unique records passed "probing", and if yes, create a packet
4255/// to announce the service.
4256fn prepare_announce(
4257    info: &ServiceInfo,
4258    intf: &MyIntf,
4259    dns_registry: &mut DnsRegistry,
4260    is_ipv4: bool,
4261) -> Option<DnsOutgoing> {
4262    let intf_addrs = if is_ipv4 {
4263        info.get_addrs_on_my_intf_v4(intf)
4264    } else {
4265        info.get_addrs_on_my_intf_v6(intf)
4266    };
4267
4268    if intf_addrs.is_empty() {
4269        debug!(
4270            "prepare_announce (ipv4: {is_ipv4}): no valid addrs on interface {}",
4271            &intf.name
4272        );
4273        return None;
4274    }
4275
4276    // check if we changed our name due to conflicts.
4277    let service_fullname = match dns_registry.name_changes.get(info.get_fullname()) {
4278        Some(new_name) => new_name,
4279        None => info.get_fullname(),
4280    };
4281
4282    debug!(
4283        "prepare to announce service {service_fullname} on {:?}",
4284        &intf_addrs
4285    );
4286
4287    let mut probing_count = 0;
4288    let mut out = DnsOutgoing::new(FLAGS_QR_RESPONSE | FLAGS_AA);
4289    let create_time = current_time_millis() + fastrand::u64(0..250);
4290
4291    out.add_answer_at_time(
4292        DnsPointer::new(
4293            info.get_type(),
4294            RRType::PTR,
4295            CLASS_IN,
4296            info.get_other_ttl(),
4297            service_fullname.to_string(),
4298        ),
4299        0,
4300    );
4301
4302    if let Some(sub) = info.get_subtype() {
4303        trace!("Adding subdomain {}", sub);
4304        out.add_answer_at_time(
4305            DnsPointer::new(
4306                sub,
4307                RRType::PTR,
4308                CLASS_IN,
4309                info.get_other_ttl(),
4310                service_fullname.to_string(),
4311            ),
4312            0,
4313        );
4314    }
4315
4316    // SRV records.
4317    let hostname = match dns_registry.name_changes.get(info.get_hostname()) {
4318        Some(new_name) => new_name.to_string(),
4319        None => info.get_hostname().to_string(),
4320    };
4321
4322    let mut srv = DnsSrv::new(
4323        info.get_fullname(),
4324        CLASS_IN | CLASS_CACHE_FLUSH,
4325        info.get_host_ttl(),
4326        info.get_priority(),
4327        info.get_weight(),
4328        info.get_port(),
4329        hostname,
4330    );
4331
4332    if let Some(new_name) = dns_registry.name_changes.get(info.get_fullname()) {
4333        srv.get_record_mut().set_new_name(new_name.to_string());
4334    }
4335
4336    if !info.requires_probe()
4337        || dns_registry.is_probing_done(&srv, info.get_fullname(), create_time)
4338    {
4339        out.add_answer_at_time(srv, 0);
4340    } else {
4341        probing_count += 1;
4342    }
4343
4344    // TXT records.
4345
4346    let mut txt = DnsTxt::new(
4347        info.get_fullname(),
4348        CLASS_IN | CLASS_CACHE_FLUSH,
4349        info.get_other_ttl(),
4350        info.generate_txt(),
4351    );
4352
4353    if let Some(new_name) = dns_registry.name_changes.get(info.get_fullname()) {
4354        txt.get_record_mut().set_new_name(new_name.to_string());
4355    }
4356
4357    if !info.requires_probe()
4358        || dns_registry.is_probing_done(&txt, info.get_fullname(), create_time)
4359    {
4360        out.add_answer_at_time(txt, 0);
4361    } else {
4362        probing_count += 1;
4363    }
4364
4365    // Address records. (A and AAAA)
4366
4367    let hostname = info.get_hostname();
4368    for address in intf_addrs {
4369        let mut dns_addr = DnsAddress::new(
4370            hostname,
4371            ip_address_rr_type(&address),
4372            CLASS_IN | CLASS_CACHE_FLUSH,
4373            info.get_host_ttl(),
4374            address,
4375            intf.into(),
4376        );
4377
4378        if let Some(new_name) = dns_registry.name_changes.get(hostname) {
4379            dns_addr.get_record_mut().set_new_name(new_name.to_string());
4380        }
4381
4382        if !info.requires_probe()
4383            || dns_registry.is_probing_done(&dns_addr, info.get_fullname(), create_time)
4384        {
4385            out.add_answer_at_time(dns_addr, 0);
4386        } else {
4387            probing_count += 1;
4388        }
4389    }
4390
4391    if probing_count > 0 {
4392        return None;
4393    }
4394
4395    Some(out)
4396}
4397
4398/// Send an unsolicited response for owned service via `intf` and `sock`.
4399/// Returns true if sent out successfully for IPv4 or IPv6.
4400fn announce_service_on_intf(
4401    dns_registry: &mut DnsRegistry,
4402    info: &ServiceInfo,
4403    intf: &MyIntf,
4404    sock: &PktInfoUdpSocket,
4405    port: u16,
4406) -> MyResult<bool> {
4407    let is_ipv4 = sock.domain() == Domain::IPV4;
4408    if let Some(out) = prepare_announce(info, intf, dns_registry, is_ipv4) {
4409        let _ = send_dns_outgoing(&out, intf, sock, port)?;
4410        return Ok(true);
4411    }
4412
4413    Ok(false)
4414}
4415
4416/// Returns a new name based on the `original` to avoid conflicts.
4417/// If the name already contains a number in parentheses, increments that number.
4418///
4419/// Examples:
4420/// - `foo.local.` becomes `foo (2).local.`
4421/// - `foo (2).local.` becomes `foo (3).local.`
4422/// - `foo (9)` becomes `foo (10)`
4423fn name_change(original: &str) -> String {
4424    let mut parts: Vec<_> = original.split('.').collect();
4425    let Some(first_part) = parts.get_mut(0) else {
4426        return format!("{original} (2)");
4427    };
4428
4429    let mut new_name = format!("{first_part} (2)");
4430
4431    // check if there is already has `(<num>)` suffix.
4432    if let Some(paren_pos) = first_part.rfind(" (") {
4433        // Check if there's a closing parenthesis
4434        if let Some(end_paren) = first_part[paren_pos..].find(')') {
4435            let absolute_end_pos = paren_pos + end_paren;
4436            // Only process if the closing parenthesis is the last character
4437            if absolute_end_pos == first_part.len() - 1 {
4438                let num_start = paren_pos + 2; // Skip " ("
4439                                               // Try to parse the number between parentheses
4440                if let Ok(number) = first_part[num_start..absolute_end_pos].parse::<u32>() {
4441                    let base_name = &first_part[..paren_pos];
4442                    new_name = format!("{} ({})", base_name, number + 1)
4443                }
4444            }
4445        }
4446    }
4447
4448    *first_part = &new_name;
4449    parts.join(".")
4450}
4451
4452/// Returns a new name based on the `original` to avoid conflicts.
4453/// If the name already contains a hyphenated number, increments that number.
4454///
4455/// Examples:
4456/// - `foo.local.` becomes `foo-2.local.`
4457/// - `foo-2.local.` becomes `foo-3.local.`
4458/// - `foo` becomes `foo-2`
4459fn hostname_change(original: &str) -> String {
4460    let mut parts: Vec<_> = original.split('.').collect();
4461    let Some(first_part) = parts.get_mut(0) else {
4462        return format!("{original}-2");
4463    };
4464
4465    let mut new_name = format!("{first_part}-2");
4466
4467    // check if there is already a `-<num>` suffix
4468    if let Some(hyphen_pos) = first_part.rfind('-') {
4469        // Try to parse everything after the hyphen as a number
4470        if let Ok(number) = first_part[hyphen_pos + 1..].parse::<u32>() {
4471            let base_name = &first_part[..hyphen_pos];
4472            new_name = format!("{}-{}", base_name, number + 1);
4473        }
4474    }
4475
4476    *first_part = &new_name;
4477    parts.join(".")
4478}
4479
4480fn add_answer_with_additionals(
4481    out: &mut DnsOutgoing,
4482    msg: &DnsIncoming,
4483    service: &ServiceInfo,
4484    intf: &MyIntf,
4485    dns_registry: &DnsRegistry,
4486    is_ipv4: bool,
4487) {
4488    let intf_addrs = if is_ipv4 {
4489        service.get_addrs_on_my_intf_v4(intf)
4490    } else {
4491        service.get_addrs_on_my_intf_v6(intf)
4492    };
4493    if intf_addrs.is_empty() {
4494        trace!("No addrs on LAN of intf {:?}", intf);
4495        return;
4496    }
4497
4498    // check if we changed our name due to conflicts.
4499    let service_fullname = match dns_registry.name_changes.get(service.get_fullname()) {
4500        Some(new_name) => new_name,
4501        None => service.get_fullname(),
4502    };
4503
4504    let hostname = match dns_registry.name_changes.get(service.get_hostname()) {
4505        Some(new_name) => new_name,
4506        None => service.get_hostname(),
4507    };
4508
4509    let ptr_added = out.add_answer(
4510        msg,
4511        DnsPointer::new(
4512            service.get_type(),
4513            RRType::PTR,
4514            CLASS_IN,
4515            service.get_other_ttl(),
4516            service_fullname.to_string(),
4517        ),
4518    );
4519
4520    if !ptr_added {
4521        trace!("answer was not added for msg {:?}", msg);
4522        return;
4523    }
4524
4525    if let Some(sub) = service.get_subtype() {
4526        trace!("Adding subdomain {}", sub);
4527        out.add_additional_answer(DnsPointer::new(
4528            sub,
4529            RRType::PTR,
4530            CLASS_IN,
4531            service.get_other_ttl(),
4532            service_fullname.to_string(),
4533        ));
4534    }
4535
4536    // Add recommended additional answers according to
4537    // https://tools.ietf.org/html/rfc6763#section-12.1.
4538    out.add_additional_answer(DnsSrv::new(
4539        service_fullname,
4540        CLASS_IN | CLASS_CACHE_FLUSH,
4541        service.get_host_ttl(),
4542        service.get_priority(),
4543        service.get_weight(),
4544        service.get_port(),
4545        hostname.to_string(),
4546    ));
4547
4548    out.add_additional_answer(DnsTxt::new(
4549        service_fullname,
4550        CLASS_IN | CLASS_CACHE_FLUSH,
4551        service.get_other_ttl(),
4552        service.generate_txt(),
4553    ));
4554
4555    for address in intf_addrs {
4556        out.add_additional_answer(DnsAddress::new(
4557            hostname,
4558            ip_address_rr_type(&address),
4559            CLASS_IN | CLASS_CACHE_FLUSH,
4560            service.get_host_ttl(),
4561            address,
4562            intf.into(),
4563        ));
4564    }
4565}
4566
4567/// Check probes in a registry and returns: a probing packet to send out, and a list of probe names
4568/// that are finished.
4569fn check_probing(
4570    dns_registry: &mut DnsRegistry,
4571    timers: &mut BinaryHeap<Reverse<u64>>,
4572    now: u64,
4573) -> (DnsOutgoing, Vec<String>) {
4574    let mut expired_probes = Vec::new();
4575    let mut out = DnsOutgoing::new(FLAGS_QR_QUERY);
4576
4577    for (name, probe) in dns_registry.probing.iter_mut() {
4578        if now >= probe.next_send {
4579            if probe.expired(now) {
4580                // move the record to active
4581                expired_probes.push(name.clone());
4582            } else {
4583                out.add_question(name, RRType::ANY);
4584
4585                /*
4586                RFC 6762 section 8.2: https://datatracker.ietf.org/doc/html/rfc6762#section-8.2
4587                ...
4588                for tiebreaking to work correctly in all
4589                cases, the Authority Section must contain *all* the records and
4590                proposed rdata being probed for uniqueness.
4591                    */
4592                for record in probe.records.iter() {
4593                    out.add_authority(record.clone());
4594                }
4595
4596                probe.update_next_send(now);
4597
4598                // add timer
4599                timers.push(Reverse(probe.next_send));
4600            }
4601        }
4602    }
4603
4604    (out, expired_probes)
4605}
4606
4607/// Process expired probes on an interface and return a list of services
4608/// that are waiting for the probe to finish.
4609///
4610/// `DnsNameChange` events are sent to the monitors.
4611fn handle_expired_probes(
4612    expired_probes: Vec<String>,
4613    intf_name: &str,
4614    dns_registry: &mut DnsRegistry,
4615    monitors: &mut Vec<Sender<DaemonEvent>>,
4616) -> HashSet<String> {
4617    let mut waiting_services = HashSet::new();
4618
4619    for name in expired_probes {
4620        let Some(probe) = dns_registry.probing.remove(&name) else {
4621            continue;
4622        };
4623
4624        // send notifications about name changes
4625        for record in probe.records.iter() {
4626            if let Some(new_name) = record.get_record().get_new_name() {
4627                dns_registry
4628                    .name_changes
4629                    .insert(name.clone(), new_name.to_string());
4630
4631                let event = DnsNameChange {
4632                    original: record.get_record().get_original_name().to_string(),
4633                    new_name: new_name.to_string(),
4634                    rr_type: record.get_type(),
4635                    intf_name: intf_name.to_string(),
4636                };
4637                debug!("Name change event: {:?}", &event);
4638                notify_monitors(monitors, DaemonEvent::NameChange(event));
4639            }
4640        }
4641
4642        // move RR from probe to active.
4643        debug!(
4644            "probe of '{name}' finished: move {} records to active. ({} waiting services)",
4645            probe.records.len(),
4646            probe.waiting_services.len(),
4647        );
4648
4649        // Move records to active and plan to wake up services if records are not empty.
4650        if !probe.records.is_empty() {
4651            match dns_registry.active.get_mut(&name) {
4652                Some(records) => {
4653                    records.extend(probe.records);
4654                }
4655                None => {
4656                    dns_registry.active.insert(name, probe.records);
4657                }
4658            }
4659
4660            waiting_services.extend(probe.waiting_services);
4661        }
4662    }
4663
4664    waiting_services
4665}
4666
4667#[cfg(test)]
4668mod tests {
4669    use super::{
4670        _new_socket_bind, check_domain_suffix, check_service_name_length, hostname_change,
4671        my_ip_interfaces, name_change, send_dns_outgoing_impl, valid_instance_name,
4672        HostnameResolutionEvent, ServiceDaemon, ServiceEvent, ServiceInfo, MDNS_PORT,
4673    };
4674    use crate::{
4675        dns_parser::{
4676            DnsIncoming, DnsOutgoing, DnsPointer, InterfaceId, RRType, ScopedIp, CLASS_IN,
4677            FLAGS_AA, FLAGS_QR_RESPONSE,
4678        },
4679        service_daemon::{add_answer_of_service, check_hostname},
4680    };
4681    use std::time::{Duration, SystemTime};
4682    use test_log::test;
4683
4684    #[test]
4685    fn test_instance_name() {
4686        assert!(valid_instance_name("my-laser._printer._tcp.local."));
4687        assert!(valid_instance_name("my-laser.._printer._tcp.local."));
4688        assert!(!valid_instance_name("_printer._tcp.local."));
4689    }
4690
4691    #[test]
4692    fn test_check_service_name_length() {
4693        let result = check_service_name_length("_tcp", 100);
4694        assert!(result.is_err());
4695        if let Err(e) = result {
4696            println!("{}", e);
4697        }
4698    }
4699
4700    #[test]
4701    fn test_check_hostname() {
4702        // valid hostnames
4703        for hostname in &[
4704            "my_host.local.",
4705            &("A".repeat(255 - ".local.".len()) + ".local."),
4706        ] {
4707            let result = check_hostname(hostname);
4708            assert!(result.is_ok());
4709        }
4710
4711        // erroneous hostnames
4712        for hostname in &[
4713            "my_host.local",
4714            ".local.",
4715            &("A".repeat(256 - ".local.".len()) + ".local."),
4716        ] {
4717            let result = check_hostname(hostname);
4718            assert!(result.is_err());
4719            if let Err(e) = result {
4720                println!("{}", e);
4721            }
4722        }
4723    }
4724
4725    #[test]
4726    fn test_check_domain_suffix() {
4727        assert!(check_domain_suffix("_missing_dot._tcp.local").is_err());
4728        assert!(check_domain_suffix("_missing_bar.tcp.local.").is_err());
4729        assert!(check_domain_suffix("_mis_spell._tpp.local.").is_err());
4730        assert!(check_domain_suffix("_mis_spell._upp.local.").is_err());
4731        assert!(check_domain_suffix("_has_dot._tcp.local.").is_ok());
4732        assert!(check_domain_suffix("_goodname._udp.local.").is_ok());
4733    }
4734
4735    #[test]
4736    fn test_service_with_temporarily_invalidated_ptr() {
4737        // Create a daemon
4738        let d = ServiceDaemon::new().expect("Failed to create daemon");
4739
4740        let service = "_test_inval_ptr._udp.local.";
4741        let host_name = "my_host_tmp_invalidated_ptr.local.";
4742        let intfs: Vec<_> = my_ip_interfaces(false);
4743        let intf_ips: Vec<_> = intfs.iter().map(|intf| intf.ip()).collect();
4744        let port = 5201;
4745        let my_service =
4746            ServiceInfo::new(service, "my_instance", host_name, &intf_ips[..], port, None)
4747                .expect("invalid service info")
4748                .enable_addr_auto();
4749        let result = d.register(my_service.clone());
4750        assert!(result.is_ok());
4751
4752        // Browse for a service
4753        let browse_chan = d.browse(service).unwrap();
4754        let timeout = Duration::from_secs(2);
4755        let mut resolved = false;
4756
4757        while let Ok(event) = browse_chan.recv_timeout(timeout) {
4758            match event {
4759                ServiceEvent::ServiceResolved(info) => {
4760                    resolved = true;
4761                    println!("Resolved a service of {}", &info.fullname);
4762                    break;
4763                }
4764                e => {
4765                    println!("Received event {:?}", e);
4766                }
4767            }
4768        }
4769
4770        assert!(resolved);
4771
4772        println!("Stopping browse of {}", service);
4773        // Pause browsing so restarting will cause a new immediate query.
4774        // Unregistering will not work here, it will invalidate all the records.
4775        d.stop_browse(service).unwrap();
4776
4777        // Ensure the search is stopped.
4778        // Reduces the chance of receiving an answer adding the ptr back to the
4779        // cache causing the later browse to return directly from the cache.
4780        // (which invalidates what this test is trying to test for.)
4781        let mut stopped = false;
4782        while let Ok(event) = browse_chan.recv_timeout(timeout) {
4783            match event {
4784                ServiceEvent::SearchStopped(_) => {
4785                    stopped = true;
4786                    println!("Stopped browsing service");
4787                    break;
4788                }
4789                // Other `ServiceResolved` messages may be received
4790                // here as they come from different interfaces.
4791                // That's fine for this test.
4792                e => {
4793                    println!("Received event {:?}", e);
4794                }
4795            }
4796        }
4797
4798        assert!(stopped);
4799
4800        // Invalidate the ptr from the service to the host.
4801        let invalidate_ptr_packet = DnsPointer::new(
4802            my_service.get_type(),
4803            RRType::PTR,
4804            CLASS_IN,
4805            0,
4806            my_service.get_fullname().to_string(),
4807        );
4808
4809        let mut packet_buffer = DnsOutgoing::new(FLAGS_QR_RESPONSE | FLAGS_AA);
4810        packet_buffer.add_additional_answer(invalidate_ptr_packet);
4811
4812        for intf in intfs {
4813            let sock = _new_socket_bind(&intf, true).unwrap();
4814            send_dns_outgoing_impl(
4815                &packet_buffer,
4816                &intf.name,
4817                intf.index.unwrap_or(0),
4818                &intf.addr,
4819                &sock.pktinfo,
4820                MDNS_PORT,
4821            )
4822            .unwrap();
4823        }
4824
4825        println!(
4826            "Sent PTR record invalidation. Starting second browse for {}",
4827            service
4828        );
4829
4830        // Restart the browse to force the sender to re-send the announcements.
4831        let browse_chan = d.browse(service).unwrap();
4832
4833        resolved = false;
4834        while let Ok(event) = browse_chan.recv_timeout(timeout) {
4835            match event {
4836                ServiceEvent::ServiceResolved(info) => {
4837                    resolved = true;
4838                    println!("Resolved a service of {}", &info.fullname);
4839                    break;
4840                }
4841                e => {
4842                    println!("Received event {:?}", e);
4843                }
4844            }
4845        }
4846
4847        assert!(resolved);
4848        d.shutdown().unwrap();
4849    }
4850
4851    #[test]
4852    fn test_expired_srv() {
4853        // construct service info
4854        let service_type = "_expired-srv._udp.local.";
4855        let instance = "test_instance";
4856        let host_name = "expired_srv_host.local.";
4857        let mut my_service = ServiceInfo::new(service_type, instance, host_name, "", 5023, None)
4858            .unwrap()
4859            .enable_addr_auto();
4860        // let fullname = my_service.get_fullname().to_string();
4861
4862        // set SRV to expire soon.
4863        let new_ttl = 3; // for testing only.
4864        my_service._set_host_ttl(new_ttl);
4865
4866        // register my service
4867        let mdns_server = ServiceDaemon::new().expect("Failed to create mdns server");
4868        let result = mdns_server.register(my_service);
4869        assert!(result.is_ok());
4870
4871        let mdns_client = ServiceDaemon::new().expect("Failed to create mdns client");
4872        let browse_chan = mdns_client.browse(service_type).unwrap();
4873        let timeout = Duration::from_secs(2);
4874        let mut resolved = false;
4875
4876        while let Ok(event) = browse_chan.recv_timeout(timeout) {
4877            if let ServiceEvent::ServiceResolved(info) = event {
4878                resolved = true;
4879                println!("Resolved a service of {}", &info.fullname);
4880                break;
4881            }
4882        }
4883
4884        assert!(resolved);
4885
4886        // Exit the server so that no more responses.
4887        mdns_server.shutdown().unwrap();
4888
4889        // SRV record in the client cache will expire.
4890        let expire_timeout = Duration::from_secs(new_ttl as u64);
4891        while let Ok(event) = browse_chan.recv_timeout(expire_timeout) {
4892            if let ServiceEvent::ServiceRemoved(service_type, full_name) = event {
4893                println!("Service removed: {}: {}", &service_type, &full_name);
4894                break;
4895            }
4896        }
4897    }
4898
4899    #[test]
4900    fn test_hostname_resolution_address_removed() {
4901        // Create a mDNS server
4902        let server = ServiceDaemon::new().expect("Failed to create server");
4903        let hostname = "addr_remove_host._tcp.local.";
4904        let service_ip_addr: ScopedIp = my_ip_interfaces(false)
4905            .iter()
4906            .find(|iface| iface.ip().is_ipv4())
4907            .map(|iface| iface.ip().into())
4908            .unwrap();
4909
4910        let mut my_service = ServiceInfo::new(
4911            "_host_res_test._tcp.local.",
4912            "my_instance",
4913            hostname,
4914            service_ip_addr.to_ip_addr(),
4915            1234,
4916            None,
4917        )
4918        .expect("invalid service info");
4919
4920        // Set a short TTL for addresses for testing.
4921        let addr_ttl = 2;
4922        my_service._set_host_ttl(addr_ttl); // Expire soon
4923
4924        server.register(my_service).unwrap();
4925
4926        // Create a mDNS client for resolving the hostname.
4927        let client = ServiceDaemon::new().expect("Failed to create client");
4928        let event_receiver = client.resolve_hostname(hostname, None).unwrap();
4929        let resolved = loop {
4930            match event_receiver.recv() {
4931                Ok(HostnameResolutionEvent::AddressesFound(found_hostname, addresses)) => {
4932                    assert!(found_hostname == hostname);
4933                    assert!(addresses.contains(&service_ip_addr));
4934                    println!("address found: {:?}", &addresses);
4935                    break true;
4936                }
4937                Ok(HostnameResolutionEvent::SearchStopped(_)) => break false,
4938                Ok(_event) => {}
4939                Err(_) => break false,
4940            }
4941        };
4942
4943        assert!(resolved);
4944
4945        // Shutdown the server so no more responses / refreshes for addresses.
4946        server.shutdown().unwrap();
4947
4948        // Wait till hostname address record expires, with 1 second grace period.
4949        let timeout = Duration::from_secs(addr_ttl as u64 + 1);
4950        let removed = loop {
4951            match event_receiver.recv_timeout(timeout) {
4952                Ok(HostnameResolutionEvent::AddressesRemoved(removed_host, addresses)) => {
4953                    assert!(removed_host == hostname);
4954                    assert!(addresses.contains(&service_ip_addr));
4955
4956                    println!(
4957                        "address removed: hostname: {} addresses: {:?}",
4958                        &hostname, &addresses
4959                    );
4960                    break true;
4961                }
4962                Ok(_event) => {}
4963                Err(_) => {
4964                    break false;
4965                }
4966            }
4967        };
4968
4969        assert!(removed);
4970
4971        client.shutdown().unwrap();
4972    }
4973
4974    #[test]
4975    fn test_refresh_ptr() {
4976        // construct service info
4977        let service_type = "_refresh-ptr._udp.local.";
4978        let instance = "test_instance";
4979        let host_name = "refresh_ptr_host.local.";
4980        let service_ip_addr = my_ip_interfaces(false)
4981            .iter()
4982            .find(|iface| iface.ip().is_ipv4())
4983            .map(|iface| iface.ip())
4984            .unwrap();
4985
4986        let mut my_service = ServiceInfo::new(
4987            service_type,
4988            instance,
4989            host_name,
4990            service_ip_addr,
4991            5023,
4992            None,
4993        )
4994        .unwrap();
4995
4996        let new_ttl = 3; // for testing only.
4997        my_service._set_other_ttl(new_ttl);
4998
4999        // register my service
5000        let mdns_server = ServiceDaemon::new().expect("Failed to create mdns server");
5001        let result = mdns_server.register(my_service);
5002        assert!(result.is_ok());
5003
5004        let mdns_client = ServiceDaemon::new().expect("Failed to create mdns client");
5005        let browse_chan = mdns_client.browse(service_type).unwrap();
5006        let timeout = Duration::from_millis(1500); // Give at least 1 second for the service probing.
5007        let mut resolved = false;
5008
5009        // resolve the service first.
5010        while let Ok(event) = browse_chan.recv_timeout(timeout) {
5011            if let ServiceEvent::ServiceResolved(info) = event {
5012                resolved = true;
5013                println!("Resolved a service of {}", &info.fullname);
5014                break;
5015            }
5016        }
5017
5018        assert!(resolved);
5019
5020        // wait over 80% of TTL, and refresh PTR should be sent out.
5021        let timeout = Duration::from_millis(new_ttl as u64 * 1000 * 90 / 100);
5022        while let Ok(event) = browse_chan.recv_timeout(timeout) {
5023            println!("event: {:?}", &event);
5024        }
5025
5026        // verify refresh counter.
5027        let metrics_chan = mdns_client.get_metrics().unwrap();
5028        let metrics = metrics_chan.recv_timeout(timeout).unwrap();
5029        let ptr_refresh_counter = metrics["cache-refresh-ptr"];
5030        assert_eq!(ptr_refresh_counter, 1);
5031        let srvtxt_refresh_counter = metrics["cache-refresh-srv-txt"];
5032        assert_eq!(srvtxt_refresh_counter, 1);
5033
5034        // Exit the server so that no more responses.
5035        mdns_server.shutdown().unwrap();
5036        mdns_client.shutdown().unwrap();
5037    }
5038
5039    #[test]
5040    fn test_name_change() {
5041        assert_eq!(name_change("foo.local."), "foo (2).local.");
5042        assert_eq!(name_change("foo (2).local."), "foo (3).local.");
5043        assert_eq!(name_change("foo (9).local."), "foo (10).local.");
5044        assert_eq!(name_change("foo"), "foo (2)");
5045        assert_eq!(name_change("foo (2)"), "foo (3)");
5046        assert_eq!(name_change(""), " (2)");
5047
5048        // Additional edge cases
5049        assert_eq!(name_change("foo (abc)"), "foo (abc) (2)"); // Invalid number
5050        assert_eq!(name_change("foo (2"), "foo (2 (2)"); // Missing closing parenthesis
5051        assert_eq!(name_change("foo (2) extra"), "foo (2) extra (2)"); // Extra text after number
5052    }
5053
5054    #[test]
5055    fn test_hostname_change() {
5056        assert_eq!(hostname_change("foo.local."), "foo-2.local.");
5057        assert_eq!(hostname_change("foo"), "foo-2");
5058        assert_eq!(hostname_change("foo-2.local."), "foo-3.local.");
5059        assert_eq!(hostname_change("foo-9"), "foo-10");
5060        assert_eq!(hostname_change("test-42.domain."), "test-43.domain.");
5061    }
5062
5063    #[test]
5064    fn test_add_answer_txt_ttl() {
5065        // construct a simple service info
5066        let service_type = "_test_add_answer._udp.local.";
5067        let instance = "test_instance";
5068        let host_name = "add_answer_host.local.";
5069        let service_intf = my_ip_interfaces(false)
5070            .into_iter()
5071            .find(|iface| iface.ip().is_ipv4())
5072            .unwrap();
5073        let service_ip_addr = service_intf.ip();
5074        let my_service = ServiceInfo::new(
5075            service_type,
5076            instance,
5077            host_name,
5078            service_ip_addr,
5079            5023,
5080            None,
5081        )
5082        .unwrap();
5083
5084        // construct a DnsOutgoing message
5085        let mut out = DnsOutgoing::new(FLAGS_QR_RESPONSE | FLAGS_AA);
5086
5087        // Construct a dummy DnsIncoming message
5088        let mut dummy_data = out.to_data_on_wire();
5089        let interface_id = InterfaceId::from(&service_intf);
5090        let incoming = DnsIncoming::new(dummy_data.pop().unwrap(), interface_id).unwrap();
5091
5092        // Add an answer of TXT type for the service.
5093        let if_addrs = vec![service_intf.ip()];
5094        add_answer_of_service(
5095            &mut out,
5096            &incoming,
5097            instance,
5098            &my_service,
5099            RRType::TXT,
5100            if_addrs,
5101        );
5102
5103        // Check if the answer was added correctly
5104        assert!(
5105            out.answers_count() > 0,
5106            "No answers added to the outgoing message"
5107        );
5108
5109        // Check if the first answer is of type TXT
5110        let answer = out._answers().first().unwrap();
5111        assert_eq!(answer.0.get_type(), RRType::TXT);
5112
5113        // Check TTL is set properly for the TXT record
5114        assert_eq!(answer.0.get_record().get_ttl(), my_service.get_other_ttl());
5115    }
5116
5117    #[test]
5118    fn test_interface_flip() {
5119        // start a server
5120        let ty_domain = "_intf-flip._udp.local.";
5121        let host_name = "intf_flip.local.";
5122        let now = SystemTime::now()
5123            .duration_since(SystemTime::UNIX_EPOCH)
5124            .unwrap();
5125        let instance_name = now.as_micros().to_string(); // Create a unique name.
5126        let port = 5200;
5127
5128        // Get a single IPv4 address
5129        let (ip_addr1, intf_name) = my_ip_interfaces(false)
5130            .iter()
5131            .find(|iface| iface.ip().is_ipv4())
5132            .map(|iface| (iface.ip(), iface.name.clone()))
5133            .unwrap();
5134
5135        println!("Using interface {} with IP {}", intf_name, ip_addr1);
5136
5137        // Register the service.
5138        let service1 = ServiceInfo::new(ty_domain, &instance_name, host_name, ip_addr1, port, None)
5139            .expect("valid service info");
5140        let server1 = ServiceDaemon::new().expect("failed to start server");
5141        server1
5142            .register(service1)
5143            .expect("Failed to register service1");
5144
5145        // wait for the service announced.
5146        std::thread::sleep(Duration::from_secs(2));
5147
5148        // start a client
5149        let client = ServiceDaemon::new().expect("failed to start client");
5150
5151        let receiver = client.browse(ty_domain).unwrap();
5152
5153        let timeout = Duration::from_secs(3);
5154        let mut got_data = false;
5155
5156        while let Ok(event) = receiver.recv_timeout(timeout) {
5157            if let ServiceEvent::ServiceResolved(_) = event {
5158                println!("Received ServiceResolved event");
5159                got_data = true;
5160                break;
5161            }
5162        }
5163
5164        assert!(got_data, "Should receive ServiceResolved event");
5165
5166        // Set a short IP check interval to detect interface changes quickly.
5167        client.set_ip_check_interval(1).unwrap();
5168
5169        // Now shutdown the interface and expect the client to lose the service.
5170        println!("Shutting down interface {}", &intf_name);
5171        client.test_down_interface(&intf_name).unwrap();
5172
5173        let mut got_removed = false;
5174
5175        while let Ok(event) = receiver.recv_timeout(timeout) {
5176            if let ServiceEvent::ServiceRemoved(ty_domain, instance) = event {
5177                got_removed = true;
5178                println!("removed: {ty_domain} : {instance}");
5179                break;
5180            }
5181        }
5182        assert!(got_removed, "Should receive ServiceRemoved event");
5183
5184        println!("Bringing up interface {}", &intf_name);
5185        client.test_up_interface(&intf_name).unwrap();
5186        let mut got_data = false;
5187        while let Ok(event) = receiver.recv_timeout(timeout) {
5188            if let ServiceEvent::ServiceResolved(resolved) = event {
5189                got_data = true;
5190                println!("Received ServiceResolved: {:?}", resolved);
5191                break;
5192            }
5193        }
5194        assert!(
5195            got_data,
5196            "Should receive ServiceResolved event after interface is back up"
5197        );
5198
5199        server1.shutdown().unwrap();
5200        client.shutdown().unwrap();
5201    }
5202
5203    #[test]
5204    fn test_cache_only() {
5205        // construct service info
5206        let service_type = "_cache_only._udp.local.";
5207        let instance = "test_instance";
5208        let host_name = "cache_only_host.local.";
5209        let service_ip_addr = my_ip_interfaces(false)
5210            .iter()
5211            .find(|iface| iface.ip().is_ipv4())
5212            .map(|iface| iface.ip())
5213            .unwrap();
5214
5215        let mut my_service = ServiceInfo::new(
5216            service_type,
5217            instance,
5218            host_name,
5219            service_ip_addr,
5220            5023,
5221            None,
5222        )
5223        .unwrap();
5224
5225        let new_ttl = 3; // for testing only.
5226        my_service._set_other_ttl(new_ttl);
5227
5228        let mdns_client = ServiceDaemon::new().expect("Failed to create mdns client");
5229
5230        // make a single browse request to record that we are interested in the service.  This ensures that
5231        // subsequent announcements are cached.
5232        let browse_chan = mdns_client.browse_cache(service_type).unwrap();
5233        std::thread::sleep(Duration::from_secs(2));
5234
5235        // register my service
5236        let mdns_server = ServiceDaemon::new().expect("Failed to create mdns server");
5237        let result = mdns_server.register(my_service);
5238        assert!(result.is_ok());
5239
5240        let timeout = Duration::from_millis(1500); // Give at least 1 second for the service probing.
5241        let mut resolved = false;
5242
5243        // resolve the service.
5244        while let Ok(event) = browse_chan.recv_timeout(timeout) {
5245            if let ServiceEvent::ServiceResolved(info) = event {
5246                resolved = true;
5247                println!("Resolved a service of {}", &info.get_fullname());
5248                break;
5249            }
5250        }
5251
5252        assert!(resolved);
5253
5254        // Exit the server so that no more responses.
5255        mdns_server.shutdown().unwrap();
5256        mdns_client.shutdown().unwrap();
5257    }
5258
5259    #[test]
5260    fn test_cache_only_unsolicited() {
5261        let service_type = "_c_unsolicit._udp.local.";
5262        let instance = "test_instance";
5263        let host_name = "c_unsolicit_host.local.";
5264        let service_ip_addr = my_ip_interfaces(false)
5265            .iter()
5266            .find(|iface| iface.ip().is_ipv4())
5267            .map(|iface| iface.ip())
5268            .unwrap();
5269
5270        let my_service = ServiceInfo::new(
5271            service_type,
5272            instance,
5273            host_name,
5274            service_ip_addr,
5275            5023,
5276            None,
5277        )
5278        .unwrap();
5279
5280        // register my service
5281        let mdns_server = ServiceDaemon::new().expect("Failed to create mdns server");
5282        let result = mdns_server.register(my_service);
5283        assert!(result.is_ok());
5284
5285        let mdns_client = ServiceDaemon::new().expect("Failed to create mdns client");
5286        mdns_client.accept_unsolicited(true).unwrap();
5287
5288        // Wait a bit for the service announcements to go out, before calling browse_cache.  This ensures
5289        // that the announcements are treated as unsolicited
5290        std::thread::sleep(Duration::from_secs(2));
5291        let browse_chan = mdns_client.browse_cache(service_type).unwrap();
5292        let timeout = Duration::from_millis(1500); // Give at least 1 second for the service probing.
5293        let mut resolved = false;
5294
5295        // resolve the service.
5296        while let Ok(event) = browse_chan.recv_timeout(timeout) {
5297            if let ServiceEvent::ServiceResolved(info) = event {
5298                resolved = true;
5299                println!("Resolved a service of {}", &info.get_fullname());
5300                break;
5301            }
5302        }
5303
5304        assert!(resolved);
5305
5306        // Exit the server so that no more responses.
5307        mdns_server.shutdown().unwrap();
5308        mdns_client.shutdown().unwrap();
5309    }
5310
5311    #[test]
5312    fn test_custom_port_isolation() {
5313        // This test verifies:
5314        // 1. Daemons on a custom port can communicate with each other
5315        // 2. Daemons on different ports are isolated (no cross-talk)
5316
5317        let service_type = "_custom_port._udp.local.";
5318        let instance_custom = "custom_port_instance";
5319        let instance_default = "default_port_instance";
5320        let host_name = "custom_port_host.local.";
5321
5322        let service_ip_addr = my_ip_interfaces(false)
5323            .iter()
5324            .find(|iface| iface.ip().is_ipv4())
5325            .map(|iface| iface.ip())
5326            .expect("Test requires an IPv4 interface");
5327
5328        // Create service info for custom port (5454)
5329        let service_custom = ServiceInfo::new(
5330            service_type,
5331            instance_custom,
5332            host_name,
5333            service_ip_addr,
5334            8080,
5335            None,
5336        )
5337        .unwrap();
5338
5339        // Create service info for default port (5353)
5340        let service_default = ServiceInfo::new(
5341            service_type,
5342            instance_default,
5343            host_name,
5344            service_ip_addr,
5345            8081,
5346            None,
5347        )
5348        .unwrap();
5349
5350        // Create two daemons on custom port 5454
5351        let custom_port = 5454u16;
5352        let server_custom =
5353            ServiceDaemon::new_with_port(custom_port).expect("Failed to create custom port server");
5354        let client_custom =
5355            ServiceDaemon::new_with_port(custom_port).expect("Failed to create custom port client");
5356
5357        // Create daemon on default port (5353)
5358        let server_default = ServiceDaemon::new().expect("Failed to create default port server");
5359
5360        // Register service on custom port
5361        server_custom
5362            .register(service_custom.clone())
5363            .expect("Failed to register custom port service");
5364
5365        // Register service on default port
5366        server_default
5367            .register(service_default.clone())
5368            .expect("Failed to register default port service");
5369
5370        // Browse from custom port client
5371        let browse_custom = client_custom
5372            .browse(service_type)
5373            .expect("Failed to browse on custom port");
5374
5375        let timeout = Duration::from_secs(3);
5376        let mut found_custom = false;
5377        let mut found_default_on_custom = false;
5378
5379        // Custom port client should find the custom port service
5380        while let Ok(event) = browse_custom.recv_timeout(timeout) {
5381            if let ServiceEvent::ServiceResolved(info) = event {
5382                println!(
5383                    "Custom port client resolved: {} on port {}",
5384                    info.get_fullname(),
5385                    info.get_port()
5386                );
5387                if info.get_fullname().starts_with(instance_custom) {
5388                    found_custom = true;
5389                    assert_eq!(info.get_port(), 8080);
5390                }
5391                if info.get_fullname().starts_with(instance_default) {
5392                    found_default_on_custom = true;
5393                }
5394            }
5395        }
5396
5397        assert!(
5398            found_custom,
5399            "Custom port client should find service on custom port"
5400        );
5401        assert!(
5402            !found_default_on_custom,
5403            "Custom port client should NOT find service on default port"
5404        );
5405
5406        // Now verify the default port daemon can find its own services
5407        // but not the custom port services
5408        let client_default = ServiceDaemon::new().expect("Failed to create default port client");
5409        let browse_default = client_default
5410            .browse(service_type)
5411            .expect("Failed to browse on default port");
5412
5413        let mut found_default = false;
5414        let mut found_custom_on_default = false;
5415
5416        while let Ok(event) = browse_default.recv_timeout(timeout) {
5417            if let ServiceEvent::ServiceResolved(info) = event {
5418                println!(
5419                    "Default port client resolved: {} on port {}",
5420                    info.get_fullname(),
5421                    info.get_port()
5422                );
5423                if info.get_fullname().starts_with(instance_default) {
5424                    found_default = true;
5425                    assert_eq!(info.get_port(), 8081);
5426                }
5427                if info.get_fullname().starts_with(instance_custom) {
5428                    found_custom_on_default = true;
5429                }
5430            }
5431        }
5432
5433        assert!(
5434            found_default,
5435            "Default port client should find service on default port"
5436        );
5437        assert!(
5438            !found_custom_on_default,
5439            "Default port client should NOT find service on custom port"
5440        );
5441
5442        // Cleanup
5443        server_custom.shutdown().unwrap();
5444        client_custom.shutdown().unwrap();
5445        server_default.shutdown().unwrap();
5446        client_default.shutdown().unwrap();
5447    }
5448}