mdns_sd/
service_daemon.rs

1//! Service daemon for mDNS Service Discovery.
2
3// How DNS-based Service Discovery works in a nutshell:
4//
5// (excerpt from RFC 6763)
6// .... that a particular service instance can be
7//    described using a DNS SRV [RFC2782] and DNS TXT [RFC1035] record.
8//    The SRV record has a name of the form "<Instance>.<Service>.<Domain>"
9//    and gives the target host and port where the service instance can be
10//    reached.  The DNS TXT record of the same name gives additional
11//    information about this instance, in a structured form using key/value
12//    pairs, described in Section 6.  A client discovers the list of
13//    available instances of a given service type using a query for a DNS
14//    PTR [RFC1035] record with a name of the form "<Service>.<Domain>",
15//    which returns a set of zero or more names, which are the names of the
16//    aforementioned DNS SRV/TXT record pairs.
17//
18// Some naming conventions in this source code:
19//
20// `ty_domain` refers to service type together with domain name, i.e. <service>.<domain>.
// Every <service> consists of two labels: the service name itself and "_udp" or "_tcp".
22// See RFC 6763 section 7 Service Names.
23//     for example: `_my-service._udp.local.`
24//
25// `fullname` refers to a full Service Instance Name, i.e. <instance>.<service>.<domain>
26//     for example: `my_home._my-service._udp.local.`
27//
// In mDNS and DNS, the basic data structure is the "Resource Record" (RR), whereas
// in Service Discovery, the basic data structure is the "Service Info". One Service Info
// corresponds to a set of DNS Resource Records.
31#[cfg(feature = "logging")]
32use crate::log::{debug, trace};
33use crate::{
34    dns_cache::{current_time_millis, DnsCache},
35    dns_parser::{
36        ip_address_rr_type, DnsAddress, DnsEntryExt, DnsIncoming, DnsOutgoing, DnsPointer,
37        DnsRecordBox, DnsRecordExt, DnsSrv, DnsTxt, RRType, CLASS_CACHE_FLUSH, CLASS_IN, FLAGS_AA,
38        FLAGS_QR_QUERY, FLAGS_QR_RESPONSE, MAX_MSG_ABSOLUTE,
39    },
40    error::{e_fmt, Error, Result},
41    service_info::{
42        split_sub_domain, valid_ip_on_intf, DnsRegistry, Probe, ServiceInfo, ServiceStatus,
43    },
44    Receiver,
45};
46use flume::{bounded, Sender, TrySendError};
47use if_addrs::{IfAddr, Interface};
48use mio::{net::UdpSocket as MioUdpSocket, Poll};
49use socket2::Socket;
50use std::{
51    cmp::{self, Reverse},
52    collections::{BinaryHeap, HashMap, HashSet},
53    fmt,
54    net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6, UdpSocket},
55    str, thread,
56    time::Duration,
57    vec,
58};
59
/// Default upper bound for the length of a service name, with the domain
/// excluded and the leading underscore (`_`) not counted. The value 15 follows
/// [RFC 6763 section 7.2](https://www.rfc-editor.org/rfc/rfc6763#section-7.2).
pub const SERVICE_NAME_LEN_MAX_DEFAULT: u8 = 15;

/// Default number of seconds between two automatic checks for IP changes.
pub const IP_CHECK_INTERVAL_IN_SECS_DEFAULT: u32 = 5;

/// Default timeout for [ServiceDaemon::verify]: 10 seconds, as given in
/// [RFC 6762 section 10.4](https://datatracker.ietf.org/doc/html/rfc6762#section-10.4)
pub const VERIFY_TIMEOUT_DEFAULT: Duration = Duration::from_secs(10);

/// The default mDNS port; every mDNS socket of the daemon binds to it.
const MDNS_PORT: u16 = 5353;

/// IPv4 multicast group that mDNS sockets join and send to.
const GROUP_ADDR_V4: Ipv4Addr = Ipv4Addr::new(224, 0, 0, 251);

/// IPv6 multicast group that mDNS sockets join.
const GROUP_ADDR_V6: Ipv6Addr = Ipv6Addr::new(0xff02, 0, 0, 0, 0, 0, 0, 0xfb);

/// IPv4 loopback address (127.0.0.1), used for the daemon's signal socket.
const LOOPBACK_V4: Ipv4Addr = Ipv4Addr::LOCALHOST;

/// A wait time in milliseconds related to resolving.
/// NOTE(review): not used in this part of the file — confirm the exact meaning at its use sites.
const RESOLVE_WAIT_IN_MILLIS: u64 = 500;
78
/// Response status code for the service `unregister` call.
///
/// The status supports `==` / `!=` and `clone()`, so a caller can directly
/// compare the value received from the `unregister` response channel,
/// in the same way as [`DaemonStatus`].
#[derive(Debug, PartialEq, Clone, Eq)]
pub enum UnregisterStatus {
    /// Unregister was successful.
    OK,
    /// The service was not found in the registration.
    NotFound,
}
87
/// Lifecycle state reported by the service daemon.
#[derive(Clone, Debug, Eq, PartialEq)]
#[non_exhaustive]
pub enum DaemonStatus {
    /// Normal operation: the daemon is up and running.
    Running,

    /// The daemon has been shut down.
    Shutdown,
}
98
/// The kinds of counters tracked in the metrics.
/// At the moment every counter is about outgoing packets.
#[derive(PartialEq, Eq, Hash)]
enum Counter {
    Register,
    RegisterResend,
    Unregister,
    UnregisterResend,
    Browse,
    ResolveHostname,
    Respond,
    CacheRefreshPTR,
    CacheRefreshSRV,
    CacheRefreshAddr,
    KnownAnswerSuppression,
}

/// Renders a counter as its kebab-case label.
impl fmt::Display for Counter {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        // Pick the static label first, then emit it in one place.
        let label = match self {
            Self::Register => "register",
            Self::RegisterResend => "register-resend",
            Self::Unregister => "unregister",
            Self::UnregisterResend => "unregister-resend",
            Self::Browse => "browse",
            Self::ResolveHostname => "resolve-hostname",
            Self::Respond => "respond",
            Self::CacheRefreshPTR => "cache-refresh-ptr",
            Self::CacheRefreshSRV => "cache-refresh-srv",
            Self::CacheRefreshAddr => "cache-refresh-addr",
            Self::KnownAnswerSuppression => "known-answer-suppression",
        };
        f.write_str(label)
    }
}
133
/// Metrics are a `HashMap` from a counter name (`String`) to its `i64` value.
/// They exist mainly to help monitor the mDNS packet traffic.
pub type Metrics = HashMap<String, i64>;
137
/// Poller token key of the daemon's signal socket.
/// Set to `usize::MAX - 1` to avoid overlapping with the keys in `zc.poll_ids`.
const SIGNAL_SOCK_EVENT_KEY: usize = usize::MAX - 1;
139
140/// A daemon thread for mDNS
141///
142/// This struct provides a handle and an API to the daemon. It is cloneable.
143#[derive(Clone)]
144pub struct ServiceDaemon {
145    /// Sender handle of the channel to the daemon.
146    sender: Sender<Command>,
147
148    /// Send to this addr to signal that a `Command` is coming.
149    ///
150    /// The daemon listens on this addr together with other mDNS sockets,
151    /// to avoid busy polling the flume channel. If there is a way to poll
152    /// the channel and mDNS sockets together, then this can be removed.
153    signal_addr: SocketAddr,
154}
155
156impl ServiceDaemon {
157    /// Creates a new daemon and spawns a thread to run the daemon.
158    ///
159    /// The daemon (re)uses the default mDNS port 5353. To keep it simple, we don't
160    /// ask callers to set the port.
161    pub fn new() -> Result<Self> {
162        // Use port 0 to allow the system assign a random available port,
163        // no need for a pre-defined port number.
164        let signal_addr = SocketAddrV4::new(LOOPBACK_V4, 0);
165
166        let signal_sock = UdpSocket::bind(signal_addr)
167            .map_err(|e| e_fmt!("failed to create signal_sock for daemon: {}", e))?;
168
169        // Get the socket with the OS chosen port
170        let signal_addr = signal_sock
171            .local_addr()
172            .map_err(|e| e_fmt!("failed to get signal sock addr: {}", e))?;
173
174        // Must be nonblocking so we can listen to it together with mDNS sockets.
175        signal_sock
176            .set_nonblocking(true)
177            .map_err(|e| e_fmt!("failed to set nonblocking for signal socket: {}", e))?;
178
179        let poller = Poll::new().map_err(|e| e_fmt!("failed to create mio Poll: {e}"))?;
180
181        let (sender, receiver) = bounded(100);
182
183        // Spawn the daemon thread
184        let mio_sock = MioUdpSocket::from_std(signal_sock);
185        thread::Builder::new()
186            .name("mDNS_daemon".to_string())
187            .spawn(move || Self::daemon_thread(mio_sock, poller, receiver))
188            .map_err(|e| e_fmt!("thread builder failed to spawn: {}", e))?;
189
190        Ok(Self {
191            sender,
192            signal_addr,
193        })
194    }
195
196    /// Sends `cmd` to the daemon via its channel, and sends a signal
197    /// to its sock addr to notify.
198    fn send_cmd(&self, cmd: Command) -> Result<()> {
199        let cmd_name = cmd.to_string();
200
201        // First, send to the flume channel.
202        self.sender.try_send(cmd).map_err(|e| match e {
203            TrySendError::Full(_) => Error::Again,
204            e => e_fmt!("flume::channel::send failed: {}", e),
205        })?;
206
207        // Second, send a signal to notify the daemon.
208        let addr = SocketAddrV4::new(LOOPBACK_V4, 0);
209        let socket = UdpSocket::bind(addr)
210            .map_err(|e| e_fmt!("Failed to create socket to send signal: {}", e))?;
211        socket
212            .send_to(cmd_name.as_bytes(), self.signal_addr)
213            .map_err(|e| {
214                e_fmt!(
215                    "signal socket send_to {} ({}) failed: {}",
216                    self.signal_addr,
217                    cmd_name,
218                    e
219                )
220            })?;
221
222        Ok(())
223    }
224
225    /// Starts browsing for a specific service type.
226    ///
227    /// `service_type` must end with a valid mDNS domain: '._tcp.local.' or '._udp.local.'
228    ///
229    /// Returns a channel `Receiver` to receive events about the service. The caller
230    /// can call `.recv_async().await` on this receiver to handle events in an
231    /// async environment or call `.recv()` in a sync environment.
232    ///
233    /// When a new instance is found, the daemon automatically tries to resolve, i.e.
234    /// finding more details, i.e. SRV records and TXT records.
235    pub fn browse(&self, service_type: &str) -> Result<Receiver<ServiceEvent>> {
236        check_domain_suffix(service_type)?;
237
238        let (resp_s, resp_r) = bounded(10);
239        self.send_cmd(Command::Browse(service_type.to_string(), 1, resp_s))?;
240        Ok(resp_r)
241    }
242
243    /// Stops searching for a specific service type.
244    ///
245    /// When an error is returned, the caller should retry only when
246    /// the error is `Error::Again`, otherwise should log and move on.
247    pub fn stop_browse(&self, ty_domain: &str) -> Result<()> {
248        self.send_cmd(Command::StopBrowse(ty_domain.to_string()))
249    }
250
251    /// Starts querying for the ip addresses of a hostname.
252    ///
253    /// Returns a channel `Receiver` to receive events about the hostname.
254    /// The caller can call `.recv_async().await` on this receiver to handle events in an
255    /// async environment or call `.recv()` in a sync environment.
256    ///
257    /// The `timeout` is specified in milliseconds.
258    pub fn resolve_hostname(
259        &self,
260        hostname: &str,
261        timeout: Option<u64>,
262    ) -> Result<Receiver<HostnameResolutionEvent>> {
263        check_hostname(hostname)?;
264        let (resp_s, resp_r) = bounded(10);
265        self.send_cmd(Command::ResolveHostname(
266            hostname.to_string(),
267            1,
268            resp_s,
269            timeout,
270        ))?;
271        Ok(resp_r)
272    }
273
274    /// Stops querying for the ip addresses of a hostname.
275    ///
276    /// When an error is returned, the caller should retry only when
277    /// the error is `Error::Again`, otherwise should log and move on.
278    pub fn stop_resolve_hostname(&self, hostname: &str) -> Result<()> {
279        self.send_cmd(Command::StopResolveHostname(hostname.to_string()))
280    }
281
282    /// Registers a service provided by this host.
283    ///
284    /// If `service_info` has no addresses yet and its `addr_auto` is enabled,
285    /// this method will automatically fill in addresses from the host.
286    ///
287    /// To re-announce a service with an updated `service_info`, just call
288    /// this `register` function again. No need to call `unregister` first.
289    pub fn register(&self, service_info: ServiceInfo) -> Result<()> {
290        check_service_name(service_info.get_fullname())?;
291        check_hostname(service_info.get_hostname())?;
292
293        self.send_cmd(Command::Register(service_info))
294    }
295
296    /// Unregisters a service. This is a graceful shutdown of a service.
297    ///
298    /// Returns a channel receiver that is used to receive the status code
299    /// of the unregister.
300    ///
301    /// When an error is returned, the caller should retry only when
302    /// the error is `Error::Again`, otherwise should log and move on.
303    pub fn unregister(&self, fullname: &str) -> Result<Receiver<UnregisterStatus>> {
304        let (resp_s, resp_r) = bounded(1);
305        self.send_cmd(Command::Unregister(fullname.to_lowercase(), resp_s))?;
306        Ok(resp_r)
307    }
308
309    /// Starts to monitor events from the daemon.
310    ///
311    /// Returns a channel [`Receiver`] of [`DaemonEvent`].
312    pub fn monitor(&self) -> Result<Receiver<DaemonEvent>> {
313        let (resp_s, resp_r) = bounded(100);
314        self.send_cmd(Command::Monitor(resp_s))?;
315        Ok(resp_r)
316    }
317
318    /// Shuts down the daemon thread and returns a channel to receive the status.
319    ///
320    /// When an error is returned, the caller should retry only when
321    /// the error is `Error::Again`, otherwise should log and move on.
322    pub fn shutdown(&self) -> Result<Receiver<DaemonStatus>> {
323        let (resp_s, resp_r) = bounded(1);
324        self.send_cmd(Command::Exit(resp_s))?;
325        Ok(resp_r)
326    }
327
328    /// Returns the status of the daemon.
329    ///
330    /// When an error is returned, the caller should retry only when
331    /// the error is `Error::Again`, otherwise should consider the daemon
332    /// stopped working and move on.
333    pub fn status(&self) -> Result<Receiver<DaemonStatus>> {
334        let (resp_s, resp_r) = bounded(1);
335
336        if self.sender.is_disconnected() {
337            resp_s
338                .send(DaemonStatus::Shutdown)
339                .map_err(|e| e_fmt!("failed to send daemon status to the client: {}", e))?;
340        } else {
341            self.send_cmd(Command::GetStatus(resp_s))?;
342        }
343
344        Ok(resp_r)
345    }
346
347    /// Returns a channel receiver for the metrics, e.g. input/output counters.
348    ///
349    /// The metrics returned is a snapshot. Hence the caller should call
350    /// this method repeatedly if they want to monitor the metrics continuously.
351    pub fn get_metrics(&self) -> Result<Receiver<Metrics>> {
352        let (resp_s, resp_r) = bounded(1);
353        self.send_cmd(Command::GetMetrics(resp_s))?;
354        Ok(resp_r)
355    }
356
357    /// Change the max length allowed for a service name.
358    ///
359    /// As RFC 6763 defines a length max for a service name, a user should not call
360    /// this method unless they have to. See [`SERVICE_NAME_LEN_MAX_DEFAULT`].
361    ///
362    /// `len_max` is capped at an internal limit, which is currently 30.
363    pub fn set_service_name_len_max(&self, len_max: u8) -> Result<()> {
364        const SERVICE_NAME_LEN_MAX_LIMIT: u8 = 30; // Double the default length max.
365
366        if len_max > SERVICE_NAME_LEN_MAX_LIMIT {
367            return Err(Error::Msg(format!(
368                "service name length max {} is too large",
369                len_max
370            )));
371        }
372
373        self.send_cmd(Command::SetOption(DaemonOption::ServiceNameLenMax(len_max)))
374    }
375
376    /// Change the interval for checking IP changes automatically.
377    ///
378    /// Setting the interval to 0 disables the IP check.
379    ///
380    /// See [`IP_CHECK_INTERVAL_IN_SECS_DEFAULT`] for the default interval.
381    pub fn set_ip_check_interval(&self, interval_in_secs: u32) -> Result<()> {
382        let interval_in_millis = interval_in_secs as u64 * 1000;
383        self.send_cmd(Command::SetOption(DaemonOption::IpCheckInterval(
384            interval_in_millis,
385        )))
386    }
387
388    /// Get the current interval in seconds for checking IP changes automatically.
389    pub fn get_ip_check_interval(&self) -> Result<u32> {
390        let (resp_s, resp_r) = bounded(1);
391        self.send_cmd(Command::GetOption(resp_s))?;
392
393        let option = resp_r
394            .recv_timeout(Duration::from_secs(10))
395            .map_err(|e| e_fmt!("failed to receive ip check interval: {}", e))?;
396        let ip_check_interval_in_secs = option.ip_check_interval / 1000;
397        Ok(ip_check_interval_in_secs as u32)
398    }
399
400    /// Include interfaces that match `if_kind` for this service daemon.
401    ///
402    /// For example:
403    /// ```ignore
404    ///     daemon.enable_interface("en0")?;
405    /// ```
406    pub fn enable_interface(&self, if_kind: impl IntoIfKindVec) -> Result<()> {
407        let if_kind_vec = if_kind.into_vec();
408        self.send_cmd(Command::SetOption(DaemonOption::EnableInterface(
409            if_kind_vec.kinds,
410        )))
411    }
412
413    /// Ignore/exclude interfaces that match `if_kind` for this daemon.
414    ///
415    /// For example:
416    /// ```ignore
417    ///     daemon.disable_interface(IfKind::IPv6)?;
418    /// ```
419    pub fn disable_interface(&self, if_kind: impl IntoIfKindVec) -> Result<()> {
420        let if_kind_vec = if_kind.into_vec();
421        self.send_cmd(Command::SetOption(DaemonOption::DisableInterface(
422            if_kind_vec.kinds,
423        )))
424    }
425
426    /// Enable or disable the loopback for locally sent multicast packets in IPv4.
427    ///
428    /// By default, multicast loop is enabled for IPv4. When disabled, a querier will not
429    /// receive announcements from a responder on the same host.
430    ///
431    /// Reference: <https://learn.microsoft.com/en-us/windows/win32/winsock/ip-multicast-2>
432    ///
433    /// "The Winsock version of the IP_MULTICAST_LOOP option is semantically different than
434    /// the UNIX version of the IP_MULTICAST_LOOP option:
435    ///
436    /// In Winsock, the IP_MULTICAST_LOOP option applies only to the receive path.
437    /// In the UNIX version, the IP_MULTICAST_LOOP option applies to the send path."
438    ///
439    /// Which means, in order NOT to receive localhost announcements, you want to call
440    /// this API on the querier side on Windows, but on the responder side on Unix.
441    pub fn set_multicast_loop_v4(&self, on: bool) -> Result<()> {
442        self.send_cmd(Command::SetOption(DaemonOption::MulticastLoopV4(on)))
443    }
444
445    /// Enable or disable the loopback for locally sent multicast packets in IPv6.
446    ///
447    /// By default, multicast loop is enabled for IPv6. When disabled, a querier will not
448    /// receive announcements from a responder on the same host.
449    ///
450    /// Reference: <https://learn.microsoft.com/en-us/windows/win32/winsock/ip-multicast-2>
451    ///
452    /// "The Winsock version of the IP_MULTICAST_LOOP option is semantically different than
453    /// the UNIX version of the IP_MULTICAST_LOOP option:
454    ///
455    /// In Winsock, the IP_MULTICAST_LOOP option applies only to the receive path.
456    /// In the UNIX version, the IP_MULTICAST_LOOP option applies to the send path."
457    ///
458    /// Which means, in order NOT to receive localhost announcements, you want to call
459    /// this API on the querier side on Windows, but on the responder side on Unix.
460    pub fn set_multicast_loop_v6(&self, on: bool) -> Result<()> {
461        self.send_cmd(Command::SetOption(DaemonOption::MulticastLoopV6(on)))
462    }
463
464    /// Proactively confirms whether a service instance still valid.
465    ///
466    /// This call will issue queries for a service instance's SRV record and Address records.
467    ///
468    /// For `timeout`, most users should use [VERIFY_TIMEOUT_DEFAULT]
469    /// unless there is a reason not to follow RFC.
470    ///
471    /// If no response is received within `timeout`, the current resource
472    /// records will be flushed, and if needed, `ServiceRemoved` event will be
473    /// sent to active queriers.
474    ///
475    /// Reference: [RFC 6762](https://datatracker.ietf.org/doc/html/rfc6762#section-10.4)
476    pub fn verify(&self, instance_fullname: String, timeout: Duration) -> Result<()> {
477        self.send_cmd(Command::Verify(instance_fullname, timeout))
478    }
479
480    fn daemon_thread(signal_sock: MioUdpSocket, poller: Poll, receiver: Receiver<Command>) {
481        let zc = Zeroconf::new(signal_sock, poller);
482
483        if let Some(cmd) = Self::run(zc, receiver) {
484            match cmd {
485                Command::Exit(resp_s) => {
486                    // It is guaranteed that the receiver already dropped,
487                    // i.e. the daemon command channel closed.
488                    if let Err(e) = resp_s.send(DaemonStatus::Shutdown) {
489                        debug!("exit: failed to send response of shutdown: {}", e);
490                    }
491                }
492                _ => {
493                    debug!("Unexpected command: {:?}", cmd);
494                }
495            }
496        }
497    }
498
499    /// The main event loop of the daemon thread
500    ///
501    /// In each round, it will:
502    /// 1. select the listening sockets with a timeout.
503    /// 2. process the incoming packets if any.
504    /// 3. try_recv on its channel and execute commands.
505    /// 4. announce its registered services.
506    /// 5. process retransmissions if any.
507    fn run(mut zc: Zeroconf, receiver: Receiver<Command>) -> Option<Command> {
508        // Add the daemon's signal socket to the poller.
509        if let Err(e) = zc.poller.registry().register(
510            &mut zc.signal_sock,
511            mio::Token(SIGNAL_SOCK_EVENT_KEY),
512            mio::Interest::READABLE,
513        ) {
514            debug!("failed to add signal socket to the poller: {}", e);
515            return None;
516        }
517
518        // Add mDNS sockets to the poller.
519        for (intf, sock) in zc.intf_socks.iter_mut() {
520            let key =
521                Zeroconf::add_poll_impl(&mut zc.poll_ids, &mut zc.poll_id_count, intf.clone());
522
523            if let Err(e) =
524                zc.poller
525                    .registry()
526                    .register(sock, mio::Token(key), mio::Interest::READABLE)
527            {
528                debug!("add socket of {:?} to poller: {e}", intf);
529                return None;
530            }
531        }
532
533        // Setup timer for IP checks.
534        let mut next_ip_check = if zc.ip_check_interval > 0 {
535            current_time_millis() + zc.ip_check_interval
536        } else {
537            0
538        };
539
540        if next_ip_check > 0 {
541            zc.add_timer(next_ip_check);
542        }
543
544        // Start the run loop.
545
546        let mut events = mio::Events::with_capacity(1024);
547        loop {
548            let now = current_time_millis();
549
550            let earliest_timer = zc.peek_earliest_timer();
551            let timeout = earliest_timer.map(|timer| {
552                // If `timer` already passed, set `timeout` to be 1ms.
553                let millis = if timer > now { timer - now } else { 1 };
554                Duration::from_millis(millis)
555            });
556
557            // Process incoming packets, command events and optional timeout.
558            events.clear();
559            match zc.poller.poll(&mut events, timeout) {
560                Ok(_) => zc.handle_poller_events(&events),
561                Err(e) => debug!("failed to select from sockets: {}", e),
562            }
563
564            let now = current_time_millis();
565
566            // Remove the timer if already passed.
567            if let Some(timer) = earliest_timer {
568                if now >= timer {
569                    zc.pop_earliest_timer();
570                }
571            }
572
573            // Remove hostname resolvers with expired timeouts.
574            for hostname in zc
575                .hostname_resolvers
576                .clone()
577                .into_iter()
578                .filter(|(_, (_, timeout))| timeout.map(|t| now >= t).unwrap_or(false))
579                .map(|(hostname, _)| hostname)
580            {
581                trace!("hostname resolver timeout for {}", &hostname);
582                call_hostname_resolution_listener(
583                    &zc.hostname_resolvers,
584                    &hostname,
585                    HostnameResolutionEvent::SearchTimeout(hostname.to_owned()),
586                );
587                call_hostname_resolution_listener(
588                    &zc.hostname_resolvers,
589                    &hostname,
590                    HostnameResolutionEvent::SearchStopped(hostname.to_owned()),
591                );
592                zc.hostname_resolvers.remove(&hostname);
593            }
594
595            // process commands from the command channel
596            while let Ok(command) = receiver.try_recv() {
597                if matches!(command, Command::Exit(_)) {
598                    zc.status = DaemonStatus::Shutdown;
599                    return Some(command);
600                }
601                zc.exec_command(command, false);
602            }
603
604            // check for repeated commands and run them if their time is up.
605            let mut i = 0;
606            while i < zc.retransmissions.len() {
607                if now >= zc.retransmissions[i].next_time {
608                    let rerun = zc.retransmissions.remove(i);
609                    zc.exec_command(rerun.command, true);
610                } else {
611                    i += 1;
612                }
613            }
614
615            // Refresh cached service records with active queriers
616            zc.refresh_active_services();
617
618            // Refresh cached A/AAAA records with active queriers
619            let mut query_count = 0;
620            for (hostname, _sender) in zc.hostname_resolvers.iter() {
621                for (hostname, ip_addr) in
622                    zc.cache.refresh_due_hostname_resolutions(hostname).iter()
623                {
624                    zc.send_query(hostname, ip_address_rr_type(ip_addr));
625                    query_count += 1;
626                }
627            }
628
629            zc.increase_counter(Counter::CacheRefreshAddr, query_count);
630
631            // check and evict expired records in our cache
632            let now = current_time_millis();
633
634            // Notify service listeners about the expired records.
635            let expired_services = zc.cache.evict_expired_services(now);
636            zc.notify_service_removal(expired_services);
637
638            // Notify hostname listeners about the expired records.
639            let expired_addrs = zc.cache.evict_expired_addr(now);
640            for (hostname, addrs) in expired_addrs {
641                call_hostname_resolution_listener(
642                    &zc.hostname_resolvers,
643                    &hostname,
644                    HostnameResolutionEvent::AddressesRemoved(hostname.clone(), addrs),
645                );
646                let instances = zc.cache.get_instances_on_host(&hostname);
647                let instance_set: HashSet<String> = instances.into_iter().collect();
648                zc.resolve_updated_instances(&instance_set);
649            }
650
651            // Send out probing queries.
652            zc.probing_handler();
653
654            // check IP changes if next_ip_check is reached.
655            if now >= next_ip_check && next_ip_check > 0 {
656                next_ip_check = now + zc.ip_check_interval;
657                zc.add_timer(next_ip_check);
658
659                zc.check_ip_changes();
660            }
661        }
662    }
663}
664
665/// Creates a new UDP socket that uses `intf` to send and recv multicast.
666fn new_socket_bind(intf: &Interface, should_loop: bool) -> Result<MioUdpSocket> {
667    // Use the same socket for receiving and sending multicast packets.
668    // Such socket has to bind to INADDR_ANY or IN6ADDR_ANY.
669    let intf_ip = &intf.ip();
670    match intf_ip {
671        IpAddr::V4(ip) => {
672            let addr = SocketAddrV4::new(Ipv4Addr::new(0, 0, 0, 0), MDNS_PORT);
673            let sock = new_socket(addr.into(), true)?;
674
675            // Join mDNS group to receive packets.
676            sock.join_multicast_v4(&GROUP_ADDR_V4, ip)
677                .map_err(|e| e_fmt!("join multicast group on addr {}: {}", intf_ip, e))?;
678
679            // Set IP_MULTICAST_IF to send packets.
680            sock.set_multicast_if_v4(ip)
681                .map_err(|e| e_fmt!("set multicast_if on addr {}: {}", ip, e))?;
682
683            if !should_loop {
684                sock.set_multicast_loop_v4(false)
685                    .map_err(|e| e_fmt!("failed to set multicast loop v4 for {ip}: {e}"))?;
686            }
687
688            // Test if we can send packets successfully.
689            let multicast_addr = SocketAddrV4::new(GROUP_ADDR_V4, MDNS_PORT).into();
690            let test_packets = DnsOutgoing::new(0).to_data_on_wire();
691            for packet in test_packets {
692                sock.send_to(&packet, &multicast_addr)
693                    .map_err(|e| e_fmt!("send multicast packet on addr {}: {}", ip, e))?;
694            }
695            Ok(MioUdpSocket::from_std(UdpSocket::from(sock)))
696        }
697        IpAddr::V6(ip) => {
698            let addr = SocketAddrV6::new(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 0), MDNS_PORT, 0, 0);
699            let sock = new_socket(addr.into(), true)?;
700
701            // Join mDNS group to receive packets.
702            sock.join_multicast_v6(&GROUP_ADDR_V6, intf.index.unwrap_or(0))
703                .map_err(|e| e_fmt!("join multicast group on addr {}: {}", ip, e))?;
704
705            // Set IPV6_MULTICAST_IF to send packets.
706            sock.set_multicast_if_v6(intf.index.unwrap_or(0))
707                .map_err(|e| e_fmt!("set multicast_if on addr {}: {}", ip, e))?;
708
709            // We are not sending multicast packets to test this socket as there might
710            // be many IPv6 interfaces on a host and could cause such send error:
711            // "No buffer space available (os error 55)".
712
713            Ok(MioUdpSocket::from_std(UdpSocket::from(sock)))
714        }
715    }
716}
717
718/// Creates a new UDP socket to bind to `port` with REUSEPORT option.
719/// `non_block` indicates whether to set O_NONBLOCK for the socket.
720fn new_socket(addr: SocketAddr, non_block: bool) -> Result<Socket> {
721    let domain = match addr {
722        SocketAddr::V4(_) => socket2::Domain::IPV4,
723        SocketAddr::V6(_) => socket2::Domain::IPV6,
724    };
725
726    let fd = Socket::new(domain, socket2::Type::DGRAM, None)
727        .map_err(|e| e_fmt!("create socket failed: {}", e))?;
728
729    fd.set_reuse_address(true)
730        .map_err(|e| e_fmt!("set ReuseAddr failed: {}", e))?;
731    #[cfg(unix)] // this is currently restricted to Unix's in socket2
732    fd.set_reuse_port(true)
733        .map_err(|e| e_fmt!("set ReusePort failed: {}", e))?;
734
735    if non_block {
736        fd.set_nonblocking(true)
737            .map_err(|e| e_fmt!("set O_NONBLOCK: {}", e))?;
738    }
739
740    fd.bind(&addr.into())
741        .map_err(|e| e_fmt!("socket bind to {} failed: {}", &addr, e))?;
742
743    trace!("new socket bind to {}", &addr);
744    Ok(fd)
745}
746
747/// Specify a UNIX timestamp in millis to run `command` for the next time.
748struct ReRun {
749    /// UNIX timestamp in millis.
750    next_time: u64,
751    command: Command,
752}
753
/// The IP version: either IPv4 or IPv6.
#[derive(PartialEq, Eq, Hash, Debug)]
enum IpVersion {
    /// IPv4
    V4,
    /// IPv6
    V6,
}
760
/// Identifies a (network interface index, IP version) pair.
///
/// `send_unsolicited_response` collects these in a set so that it does not
/// announce twice for the same pair.
#[derive(Debug, Eq, Hash, PartialEq)]
struct MulticastSendTracker {
    /// Index of the network interface.
    intf_index: u32,
    /// IP version of the interface address.
    ip_version: IpVersion,
}
767
768/// Returns the multicast send tracker if the interface index is valid
769fn multicast_send_tracker(intf: &Interface) -> Option<MulticastSendTracker> {
770    match intf.index {
771        Some(index) => {
772            let ip_ver = match intf.addr {
773                IfAddr::V4(_) => IpVersion::V4,
774                IfAddr::V6(_) => IpVersion::V6,
775            };
776            Some(MulticastSendTracker {
777                intf_index: index,
778                ip_version: ip_ver,
779            })
780        }
781        None => None,
782    }
783}
784
/// Specify kinds of interfaces. It is used to enable or to disable interfaces in the daemon.
///
/// Note that for ergonomic reasons, `From<&str>`, `From<&String>` and `From<IpAddr>`
/// are implemented: strings convert to [`IfKind::Name`] and IP addresses to
/// [`IfKind::Addr`].
#[derive(Debug, Clone)]
#[non_exhaustive]
pub enum IfKind {
    /// All interfaces.
    All,

    /// All IPv4 interfaces.
    IPv4,

    /// All IPv6 interfaces.
    IPv6,

    /// By the interface name, for example "en0"
    Name(String),

    /// By an IPv4 or IPv6 address.
    Addr(IpAddr),

    /// IPv4 loopback interfaces: 127.0.0.1 (or anything in 127.0.0.0/8), disabled by default.
    ///
    /// Use [ServiceDaemon::enable_interface] to support registering services on loopback interfaces,
    /// which is required by some use cases (e.g., OSCQuery) that publish via mDNS.
    LoopbackV4,

    /// IPv6 loopback interfaces: ::1/128, disabled by default.
    LoopbackV6,
}
815
816impl IfKind {
817    /// Checks if `intf` matches with this interface kind.
818    fn matches(&self, intf: &Interface) -> bool {
819        match self {
820            Self::All => true,
821            Self::IPv4 => intf.ip().is_ipv4(),
822            Self::IPv6 => intf.ip().is_ipv6(),
823            Self::Name(ifname) => ifname == &intf.name,
824            Self::Addr(addr) => addr == &intf.ip(),
825            Self::LoopbackV4 => intf.is_loopback() && intf.ip().is_ipv4(),
826            Self::LoopbackV6 => intf.is_loopback() && intf.ip().is_ipv6(),
827        }
828    }
829}
830
831/// The first use case of specifying an interface was to
832/// use an interface name. Hence adding this for ergonomic reasons.
833impl From<&str> for IfKind {
834    fn from(val: &str) -> Self {
835        Self::Name(val.to_string())
836    }
837}
838
839impl From<&String> for IfKind {
840    fn from(val: &String) -> Self {
841        Self::Name(val.to_string())
842    }
843}
844
845/// Still for ergonomic reasons.
846impl From<IpAddr> for IfKind {
847    fn from(val: IpAddr) -> Self {
848        Self::Addr(val)
849    }
850}
851
/// A list of `IfKind` that can be used to match interfaces.
///
/// Values are produced through the [`IntoIfKindVec`] trait.
pub struct IfKindVec {
    /// The interface kinds held by this list.
    kinds: Vec<IfKind>,
}
856
/// A trait that converts a type into a Vec of `IfKind`.
///
/// It is implemented for any single value that converts into an `IfKind`,
/// and for a `Vec` of such values.
pub trait IntoIfKindVec {
    /// Consumes `self` and returns the equivalent [`IfKindVec`].
    fn into_vec(self) -> IfKindVec;
}
861
862impl<T: Into<IfKind>> IntoIfKindVec for T {
863    fn into_vec(self) -> IfKindVec {
864        let if_kind: IfKind = self.into();
865        IfKindVec {
866            kinds: vec![if_kind],
867        }
868    }
869}
870
871impl<T: Into<IfKind>> IntoIfKindVec for Vec<T> {
872    fn into_vec(self) -> IfKindVec {
873        let kinds: Vec<IfKind> = self.into_iter().map(|x| x.into()).collect();
874        IfKindVec { kinds }
875    }
876}
877
/// Selection of interfaces.
///
/// Selections are kept in a list and applied in order, so for a given
/// interface the last selection that matches it decides.
struct IfSelection {
    /// The interfaces to be selected.
    if_kind: IfKind,

    /// Whether the `if_kind` should be enabled or not.
    selected: bool,
}
886
/// A struct holding the state. It was inspired by `zeroconf` package in Python.
struct Zeroconf {
    /// Local interfaces with sockets to recv/send on these interfaces.
    intf_socks: HashMap<Interface, MioUdpSocket>,

    /// Map poll id to Interface.
    poll_ids: HashMap<usize, Interface>,

    /// Next poll id value
    poll_id_count: usize,

    /// Local registered services, keyed by service full names.
    ///
    /// The keys are the lowercased full names.
    my_services: HashMap<String, ServiceInfo>,

    /// Received DNS records.
    cache: DnsCache,

    /// Registered service records.
    ///
    /// One `DnsRegistry` per interface.
    dns_registry_map: HashMap<Interface, DnsRegistry>,

    /// Active "Browse" commands.
    service_queriers: HashMap<String, Sender<ServiceEvent>>, // <ty_domain, channel::sender>

    /// Active "ResolveHostname" commands.
    ///
    /// The timestamps are set at the future timestamp when the command should timeout.
    /// `hostname` is case-insensitive and stored in lowercase.
    hostname_resolvers: HashMap<String, (Sender<HostnameResolutionEvent>, Option<u64>)>, // <hostname, (channel::sender, UNIX timestamp in millis)>

    /// All repeating transmissions.
    retransmissions: Vec<ReRun>,

    /// Metrics collected by the daemon.
    /// NOTE(review): the key/value meaning is defined by the `Metrics` type — confirm there.
    counters: Metrics,

    /// Waits for incoming packets.
    poller: Poll,

    /// Channels to notify events.
    monitors: Vec<Sender<DaemonEvent>>,

    /// Options
    ///
    /// Limit passed to `check_service_name_length` when a service is registered.
    service_name_len_max: u8,

    /// Interval in millis to check IP address changes.
    ip_check_interval: u64,

    /// All interface selections called to the daemon.
    ///
    /// Applied in order; starts with both loopback kinds deselected.
    if_selections: Vec<IfSelection>,

    /// Socket for signaling.
    signal_sock: MioUdpSocket,

    /// Timestamps marking where we need another iteration of the run loop,
    /// to react to events like retransmissions, cache refreshes, interface IP address changes, etc.
    ///
    /// When the run loop goes through a single iteration, it will
    /// set its timeout to the earliest timer in this list.
    timers: BinaryHeap<Reverse<u64>>,

    /// Current status of the daemon; starts as `DaemonStatus::Running`.
    status: DaemonStatus,

    /// Service instances that are pending for resolving SRV and TXT.
    pending_resolves: HashSet<String>,

    /// Service instances that are already resolved.
    resolved: HashSet<String>,

    /// Flag that `add_new_interface` passes to `new_socket_bind` when it binds
    /// a socket for an IPv4 interface. Initialised to `true`.
    /// NOTE(review): presumably the multicast loopback setting — confirm in `new_socket_bind`.
    multicast_loop_v4: bool,

    /// Same as `multicast_loop_v4`, for IPv6 interfaces. Initialised to `true`.
    multicast_loop_v6: bool,
}
958
959impl Zeroconf {
960    fn new(signal_sock: MioUdpSocket, poller: Poll) -> Self {
961        // Get interfaces.
962        let my_ifaddrs = my_ip_interfaces(false);
963
964        // Create a socket for every IP addr.
965        // Note: it is possible that `my_ifaddrs` contains the same IP addr with different interface names,
966        // or the same interface name with different IP addrs.
967        let mut intf_socks = HashMap::new();
968        let mut dns_registry_map = HashMap::new();
969
970        for intf in my_ifaddrs {
971            let sock = match new_socket_bind(&intf, true) {
972                Ok(s) => s,
973                Err(e) => {
974                    trace!("bind a socket to {}: {}. Skipped.", &intf.ip(), e);
975                    continue;
976                }
977            };
978
979            dns_registry_map.insert(intf.clone(), DnsRegistry::new());
980
981            intf_socks.insert(intf, sock);
982        }
983
984        let monitors = Vec::new();
985        let service_name_len_max = SERVICE_NAME_LEN_MAX_DEFAULT;
986        let ip_check_interval = IP_CHECK_INTERVAL_IN_SECS_DEFAULT as u64 * 1000;
987
988        let timers = BinaryHeap::new();
989
990        // Disable loopback by default.
991        let if_selections = vec![
992            IfSelection {
993                if_kind: IfKind::LoopbackV4,
994                selected: false,
995            },
996            IfSelection {
997                if_kind: IfKind::LoopbackV6,
998                selected: false,
999            },
1000        ];
1001
1002        let status = DaemonStatus::Running;
1003
1004        Self {
1005            intf_socks,
1006            poll_ids: HashMap::new(),
1007            poll_id_count: 0,
1008            my_services: HashMap::new(),
1009            cache: DnsCache::new(),
1010            dns_registry_map,
1011            hostname_resolvers: HashMap::new(),
1012            service_queriers: HashMap::new(),
1013            retransmissions: Vec::new(),
1014            counters: HashMap::new(),
1015            poller,
1016            monitors,
1017            service_name_len_max,
1018            ip_check_interval,
1019            if_selections,
1020            signal_sock,
1021            timers,
1022            status,
1023            pending_resolves: HashSet::new(),
1024            resolved: HashSet::new(),
1025            multicast_loop_v4: true,
1026            multicast_loop_v6: true,
1027        }
1028    }
1029
1030    fn process_set_option(&mut self, daemon_opt: DaemonOption) {
1031        match daemon_opt {
1032            DaemonOption::ServiceNameLenMax(length) => self.service_name_len_max = length,
1033            DaemonOption::IpCheckInterval(interval) => self.ip_check_interval = interval,
1034            DaemonOption::EnableInterface(if_kind) => self.enable_interface(if_kind),
1035            DaemonOption::DisableInterface(if_kind) => self.disable_interface(if_kind),
1036            DaemonOption::MulticastLoopV4(on) => self.set_multicast_loop_v4(on),
1037            DaemonOption::MulticastLoopV6(on) => self.set_multicast_loop_v6(on),
1038        }
1039    }
1040
1041    fn enable_interface(&mut self, kinds: Vec<IfKind>) {
1042        for if_kind in kinds {
1043            self.if_selections.push(IfSelection {
1044                if_kind,
1045                selected: true,
1046            });
1047        }
1048
1049        self.apply_intf_selections(my_ip_interfaces(true));
1050    }
1051
1052    fn disable_interface(&mut self, kinds: Vec<IfKind>) {
1053        for if_kind in kinds {
1054            self.if_selections.push(IfSelection {
1055                if_kind,
1056                selected: false,
1057            });
1058        }
1059
1060        self.apply_intf_selections(my_ip_interfaces(true));
1061    }
1062
1063    fn set_multicast_loop_v4(&mut self, on: bool) {
1064        for (_, sock) in self.intf_socks.iter_mut() {
1065            if let Err(e) = sock.set_multicast_loop_v4(on) {
1066                debug!("failed to set multicast loop v4: {e}");
1067            }
1068        }
1069    }
1070
1071    fn set_multicast_loop_v6(&mut self, on: bool) {
1072        for (_, sock) in self.intf_socks.iter_mut() {
1073            if let Err(e) = sock.set_multicast_loop_v6(on) {
1074                debug!("failed to set multicast loop v6: {e}");
1075            }
1076        }
1077    }
1078
1079    fn notify_monitors(&mut self, event: DaemonEvent) {
1080        // Only retain the monitors that are still connected.
1081        self.monitors.retain(|sender| {
1082            if let Err(e) = sender.try_send(event.clone()) {
1083                debug!("notify_monitors: try_send: {}", &e);
1084                if matches!(e, TrySendError::Disconnected(_)) {
1085                    return false; // This monitor is dropped.
1086                }
1087            }
1088            true
1089        });
1090    }
1091
1092    /// Remove `addr` in my services that enabled `addr_auto`.
1093    fn del_addr_in_my_services(&mut self, addr: &IpAddr) {
1094        for (_, service_info) in self.my_services.iter_mut() {
1095            if service_info.is_addr_auto() {
1096                service_info.remove_ipaddr(addr);
1097            }
1098        }
1099    }
1100
1101    /// Insert a new interface into the poll map and return key
1102    fn add_poll(&mut self, intf: Interface) -> usize {
1103        Self::add_poll_impl(&mut self.poll_ids, &mut self.poll_id_count, intf)
1104    }
1105
1106    /// Insert a new interface into the poll map and return its key.
1107    ///
1108    /// This exists to satisfy the borrow checker
1109    fn add_poll_impl(
1110        poll_ids: &mut HashMap<usize, Interface>,
1111        poll_id_count: &mut usize,
1112        intf: Interface,
1113    ) -> usize {
1114        let key = *poll_id_count;
1115        *poll_id_count += 1;
1116        let _ = (*poll_ids).insert(key, intf);
1117        key
1118    }
1119
1120    fn add_timer(&mut self, next_time: u64) {
1121        self.timers.push(Reverse(next_time));
1122    }
1123
1124    fn peek_earliest_timer(&self) -> Option<u64> {
1125        self.timers.peek().map(|Reverse(v)| *v)
1126    }
1127
1128    fn pop_earliest_timer(&mut self) -> Option<u64> {
1129        self.timers.pop().map(|Reverse(v)| v)
1130    }
1131
1132    /// Apply all selections to `interfaces` and return the selected addresses.
1133    fn selected_addrs(&self, interfaces: Vec<Interface>) -> HashSet<IpAddr> {
1134        let intf_count = interfaces.len();
1135        let mut intf_selections = vec![true; intf_count];
1136
1137        // apply if_selections
1138        for selection in self.if_selections.iter() {
1139            // Mark the interfaces for this selection.
1140            for i in 0..intf_count {
1141                if selection.if_kind.matches(&interfaces[i]) {
1142                    intf_selections[i] = selection.selected;
1143                }
1144            }
1145        }
1146
1147        let mut selected_addrs = HashSet::new();
1148        for i in 0..intf_count {
1149            if intf_selections[i] {
1150                selected_addrs.insert(interfaces[i].addr.ip());
1151            }
1152        }
1153
1154        selected_addrs
1155    }
1156
1157    /// Apply all selections to `interfaces`.
1158    ///
1159    /// For any interface, add it if selected but not bound yet,
1160    /// delete it if not selected but still bound.
1161    fn apply_intf_selections(&mut self, interfaces: Vec<Interface>) {
1162        // By default, we enable all interfaces.
1163        let intf_count = interfaces.len();
1164        let mut intf_selections = vec![true; intf_count];
1165
1166        // apply if_selections
1167        for selection in self.if_selections.iter() {
1168            // Mark the interfaces for this selection.
1169            for i in 0..intf_count {
1170                if selection.if_kind.matches(&interfaces[i]) {
1171                    intf_selections[i] = selection.selected;
1172                }
1173            }
1174        }
1175
1176        // Update `intf_socks` based on the selections.
1177        for (idx, intf) in interfaces.into_iter().enumerate() {
1178            if intf_selections[idx] {
1179                // Add the interface
1180                if !self.intf_socks.contains_key(&intf) {
1181                    debug!("apply_intf_selections: add {:?}", &intf.ip());
1182                    self.add_new_interface(intf);
1183                }
1184            } else {
1185                // Remove the interface
1186                if let Some(mut sock) = self.intf_socks.remove(&intf) {
1187                    match self.poller.registry().deregister(&mut sock) {
1188                        Ok(()) => debug!("apply_intf_selections: deregister {:?}", &intf.ip()),
1189                        Err(e) => debug!("apply_intf_selections: poller.delete {:?}: {}", &intf, e),
1190                    }
1191
1192                    // Remove from poll_ids
1193                    self.poll_ids.retain(|_, v| v != &intf);
1194
1195                    // Remove cache records for this interface.
1196                    self.cache.remove_addrs_on_disabled_intf(&intf);
1197                }
1198            }
1199        }
1200    }
1201
1202    /// Check for IP changes and update intf_socks as needed.
1203    fn check_ip_changes(&mut self) {
1204        // Get the current interfaces.
1205        let my_ifaddrs = my_ip_interfaces(true);
1206
1207        let poll_ids = &mut self.poll_ids;
1208        let poller = &mut self.poller;
1209        // Remove unused sockets in the poller.
1210        let deleted_addrs = self
1211            .intf_socks
1212            .iter_mut()
1213            .filter_map(|(intf, sock)| {
1214                if !my_ifaddrs.contains(intf) {
1215                    if let Err(e) = poller.registry().deregister(sock) {
1216                        debug!("check_ip_changes: poller.delete {:?}: {}", intf, e);
1217                    }
1218                    // Remove from poll_ids
1219                    poll_ids.retain(|_, v| v != intf);
1220                    Some(intf.ip())
1221                } else {
1222                    None
1223                }
1224            })
1225            .collect::<Vec<IpAddr>>();
1226
1227        // Remove deleted addrs from my services that enabled `addr_auto`.
1228        for ip in deleted_addrs.iter() {
1229            self.del_addr_in_my_services(ip);
1230            self.notify_monitors(DaemonEvent::IpDel(*ip));
1231        }
1232
1233        // Keep the interfaces only if they still exist.
1234        self.intf_socks.retain(|intf, _| my_ifaddrs.contains(intf));
1235
1236        // Add newly found interfaces only if in our selections.
1237        self.apply_intf_selections(my_ifaddrs);
1238    }
1239
1240    fn add_new_interface(&mut self, intf: Interface) {
1241        // Bind the new interface.
1242        let new_ip = intf.ip();
1243        let should_loop = if new_ip.is_ipv4() {
1244            self.multicast_loop_v4
1245        } else {
1246            self.multicast_loop_v6
1247        };
1248        let mut sock = match new_socket_bind(&intf, should_loop) {
1249            Ok(s) => s,
1250            Err(e) => {
1251                debug!("bind a socket to {}: {}. Skipped.", &intf.ip(), e);
1252                return;
1253            }
1254        };
1255
1256        // Add the new interface into the poller.
1257        let key = self.add_poll(intf.clone());
1258        if let Err(e) =
1259            self.poller
1260                .registry()
1261                .register(&mut sock, mio::Token(key), mio::Interest::READABLE)
1262        {
1263            debug!("check_ip_changes: poller add ip {}: {}", new_ip, e);
1264            return;
1265        }
1266
1267        debug!("add new interface {}: {new_ip}", intf.name);
1268        let dns_registry = match self.dns_registry_map.get_mut(&intf) {
1269            Some(registry) => registry,
1270            None => self
1271                .dns_registry_map
1272                .entry(intf.clone())
1273                .or_insert_with(DnsRegistry::new),
1274        };
1275
1276        for (_, service_info) in self.my_services.iter_mut() {
1277            if service_info.is_addr_auto() {
1278                service_info.insert_ipaddr(new_ip);
1279
1280                if announce_service_on_intf(dns_registry, service_info, &intf, &sock) {
1281                    debug!(
1282                        "Announce service {} on {}",
1283                        service_info.get_fullname(),
1284                        intf.ip()
1285                    );
1286                    service_info.set_status(&intf, ServiceStatus::Announced);
1287                } else {
1288                    for timer in dns_registry.new_timers.drain(..) {
1289                        self.timers.push(Reverse(timer));
1290                    }
1291                    service_info.set_status(&intf, ServiceStatus::Probing);
1292                }
1293            }
1294        }
1295
1296        self.intf_socks.insert(intf, sock);
1297
1298        // Notify the monitors.
1299        self.notify_monitors(DaemonEvent::IpAdd(new_ip));
1300    }
1301
1302    /// Registers a service.
1303    ///
1304    /// RFC 6762 section 8.3.
1305    /// ...the Multicast DNS responder MUST send
1306    ///    an unsolicited Multicast DNS response containing, in the Answer
1307    ///    Section, all of its newly registered resource records
1308    ///
1309    /// Zeroconf will then respond to requests for information about this service.
1310    fn register_service(&mut self, mut info: ServiceInfo) {
1311        // Check the service name length.
1312        if let Err(e) = check_service_name_length(info.get_type(), self.service_name_len_max) {
1313            debug!("check_service_name_length: {}", &e);
1314            self.notify_monitors(DaemonEvent::Error(e));
1315            return;
1316        }
1317
1318        if info.is_addr_auto() {
1319            let selected_addrs = self.selected_addrs(my_ip_interfaces(true));
1320            for addr in selected_addrs {
1321                info.insert_ipaddr(addr);
1322            }
1323        }
1324
1325        debug!("register service {:?}", &info);
1326
1327        let outgoing_addrs = self.send_unsolicited_response(&mut info);
1328        if !outgoing_addrs.is_empty() {
1329            self.notify_monitors(DaemonEvent::Announce(
1330                info.get_fullname().to_string(),
1331                format!("{:?}", &outgoing_addrs),
1332            ));
1333        }
1334
1335        // The key has to be lower case letter as DNS record name is case insensitive.
1336        // The info will have the original name.
1337        let service_fullname = info.get_fullname().to_lowercase();
1338        self.my_services.insert(service_fullname, info);
1339    }
1340
1341    /// Sends out announcement of `info` on every valid interface.
1342    /// Returns the list of interface IPs that sent out the announcement.
1343    fn send_unsolicited_response(&mut self, info: &mut ServiceInfo) -> Vec<IpAddr> {
1344        let mut outgoing_addrs = Vec::new();
1345        // Send the announcement on one interface per ip version.
1346        let mut multicast_sent_trackers = HashSet::new();
1347
1348        let mut outgoing_intfs = Vec::new();
1349
1350        for (intf, sock) in self.intf_socks.iter() {
1351            if let Some(tracker) = multicast_send_tracker(intf) {
1352                if multicast_sent_trackers.contains(&tracker) {
1353                    continue; // No need to send again on the same interface with same ip version.
1354                }
1355            }
1356
1357            let dns_registry = match self.dns_registry_map.get_mut(intf) {
1358                Some(registry) => registry,
1359                None => self
1360                    .dns_registry_map
1361                    .entry(intf.clone())
1362                    .or_insert_with(DnsRegistry::new),
1363            };
1364
1365            if announce_service_on_intf(dns_registry, info, intf, sock) {
1366                if let Some(tracker) = multicast_send_tracker(intf) {
1367                    multicast_sent_trackers.insert(tracker);
1368                }
1369                outgoing_addrs.push(intf.ip());
1370                outgoing_intfs.push(intf.clone());
1371
1372                debug!("Announce service {} on {}", info.get_fullname(), intf.ip());
1373
1374                info.set_status(intf, ServiceStatus::Announced);
1375            } else {
1376                for timer in dns_registry.new_timers.drain(..) {
1377                    self.timers.push(Reverse(timer));
1378                }
1379                info.set_status(intf, ServiceStatus::Probing);
1380            }
1381        }
1382
1383        // RFC 6762 section 8.3.
1384        // ..The Multicast DNS responder MUST send at least two unsolicited
1385        //    responses, one second apart.
1386        let next_time = current_time_millis() + 1000;
1387        for intf in outgoing_intfs {
1388            self.add_retransmission(
1389                next_time,
1390                Command::RegisterResend(info.get_fullname().to_string(), intf),
1391            );
1392        }
1393
1394        outgoing_addrs
1395    }
1396
    /// Send probings or finish them if expired. Notify waiting services.
    ///
    /// For every bound interface that has a DNS registry, this runs three
    /// phases:
    ///
    /// 1. Walk the probes whose `next_send` is due. Expired probes are
    ///    collected by name; the others are added to one outgoing query
    ///    (question plus all probed records in the Authority section), get
    ///    their next send time updated, and a timer is queued for it.
    /// 2. Remove the expired probes from `probing`. Name changes found on
    ///    their records are stored in `name_changes` and reported as
    ///    `DaemonEvent::NameChange`. Non-empty record sets are moved into
    ///    `active`, and the services waiting on them are collected.
    /// 3. Try to announce each collected service that is not yet announced on
    ///    this interface. On success a resend is scheduled one second after
    ///    `now`, monitors receive `DaemonEvent::Announce`, and the service is
    ///    marked as announced.
    fn probing_handler(&mut self) {
        let now = current_time_millis();

        for (intf, sock) in self.intf_socks.iter() {
            // Interfaces without a registry have nothing to probe.
            let Some(dns_registry) = self.dns_registry_map.get_mut(intf) else {
                continue;
            };

            let mut expired_probe_names = Vec::new();
            let mut out = DnsOutgoing::new(FLAGS_QR_QUERY);

            for (name, probe) in dns_registry.probing.iter_mut() {
                if now >= probe.next_send {
                    if probe.expired(now) {
                        // move the record to active
                        // (only the name is recorded here; the move happens below)
                        expired_probe_names.push(name.clone());
                    } else {
                        out.add_question(name, RRType::ANY);

                        /*
                        RFC 6762 section 8.2: https://datatracker.ietf.org/doc/html/rfc6762#section-8.2
                        ...
                        for tiebreaking to work correctly in all
                        cases, the Authority Section must contain *all* the records and
                        proposed rdata being probed for uniqueness.
                         */
                        for record in probe.records.iter() {
                            out.add_authority(record.clone());
                        }

                        probe.update_next_send(now);

                        // add timer
                        self.timers.push(Reverse(probe.next_send));
                    }
                }
            }

            // send probing.
            // All due probes of this interface share the single message `out`.
            if !out.questions().is_empty() {
                debug!("sending out probing of {} questions", out.questions().len());
                send_dns_outgoing(&out, intf, sock);
            }

            // Names of services to (re)try announcing once their probes finished.
            let mut waiting_services = HashSet::new();

            for name in expired_probe_names {
                let Some(probe) = dns_registry.probing.remove(&name) else {
                    continue;
                };

                // send notifications about name changes
                for record in probe.records.iter() {
                    if let Some(new_name) = record.get_record().get_new_name() {
                        dns_registry
                            .name_changes
                            .insert(name.clone(), new_name.to_string());

                        let event = DnsNameChange {
                            original: record.get_record().get_original_name().to_string(),
                            new_name: new_name.to_string(),
                            rr_type: record.get_type(),
                            intf_name: intf.name.to_string(),
                        };
                        // The free function is used on the `monitors` field
                        // because other fields of `self` are borrowed by this loop.
                        notify_monitors(&mut self.monitors, DaemonEvent::NameChange(event));
                    }
                }

                // move RR from probe to active.
                debug!(
                    "probe of '{name}' finished: move {} records to active. ({} waiting services)",
                    probe.records.len(),
                    probe.waiting_services.len(),
                );

                // Move records to active and plan to wake up services if records are not empty.
                if !probe.records.is_empty() {
                    match dns_registry.active.get_mut(&name) {
                        Some(records) => {
                            records.extend(probe.records);
                        }
                        None => {
                            dns_registry.active.insert(name, probe.records);
                        }
                    }

                    waiting_services.extend(probe.waiting_services);
                }
            }

            // wake up services waiting.
            for service_name in waiting_services {
                debug!(
                    "try to announce service {service_name} on intf {}",
                    intf.ip()
                );
                // service names are lowercase
                // (`my_services` is keyed by the lowercased full name)
                if let Some(info) = self.my_services.get_mut(&service_name.to_lowercase()) {
                    if info.get_status(intf) == ServiceStatus::Announced {
                        debug!("service {} already announced", info.get_fullname());
                        continue;
                    }

                    if announce_service_on_intf(dns_registry, info, intf, sock) {
                        // Schedule a resend one second after `now`.
                        let next_time = now + 1000;
                        let command =
                            Command::RegisterResend(info.get_fullname().to_string(), intf.clone());
                        self.retransmissions.push(ReRun { next_time, command });
                        self.timers.push(Reverse(next_time));

                        // Report the changed name when one is recorded for this service.
                        let fullname = match dns_registry.name_changes.get(&service_name) {
                            Some(new_name) => new_name.to_string(),
                            None => service_name.to_string(),
                        };

                        // Same for the hostname.
                        let mut hostname = info.get_hostname();
                        if let Some(new_name) = dns_registry.name_changes.get(hostname) {
                            hostname = new_name;
                        }

                        debug!("wake up: announce service {} on {}", fullname, intf.ip());
                        notify_monitors(
                            &mut self.monitors,
                            DaemonEvent::Announce(fullname, format!("{}:{}", hostname, &intf.ip())),
                        );

                        info.set_status(intf, ServiceStatus::Announced);
                    }
                }
            }
        }
    }
1530
1531    fn unregister_service(
1532        &self,
1533        info: &ServiceInfo,
1534        intf: &Interface,
1535        sock: &MioUdpSocket,
1536    ) -> Vec<u8> {
1537        let mut out = DnsOutgoing::new(FLAGS_QR_RESPONSE | FLAGS_AA);
1538        out.add_answer_at_time(
1539            DnsPointer::new(
1540                info.get_type(),
1541                RRType::PTR,
1542                CLASS_IN,
1543                0,
1544                info.get_fullname().to_string(),
1545            ),
1546            0,
1547        );
1548
1549        if let Some(sub) = info.get_subtype() {
1550            trace!("Adding subdomain {}", sub);
1551            out.add_answer_at_time(
1552                DnsPointer::new(
1553                    sub,
1554                    RRType::PTR,
1555                    CLASS_IN,
1556                    0,
1557                    info.get_fullname().to_string(),
1558                ),
1559                0,
1560            );
1561        }
1562
1563        out.add_answer_at_time(
1564            DnsSrv::new(
1565                info.get_fullname(),
1566                CLASS_IN | CLASS_CACHE_FLUSH,
1567                0,
1568                info.get_priority(),
1569                info.get_weight(),
1570                info.get_port(),
1571                info.get_hostname().to_string(),
1572            ),
1573            0,
1574        );
1575        out.add_answer_at_time(
1576            DnsTxt::new(
1577                info.get_fullname(),
1578                CLASS_IN | CLASS_CACHE_FLUSH,
1579                0,
1580                info.generate_txt(),
1581            ),
1582            0,
1583        );
1584
1585        for address in info.get_addrs_on_intf(intf) {
1586            out.add_answer_at_time(
1587                DnsAddress::new(
1588                    info.get_hostname(),
1589                    ip_address_rr_type(&address),
1590                    CLASS_IN | CLASS_CACHE_FLUSH,
1591                    0,
1592                    address,
1593                ),
1594                0,
1595            );
1596        }
1597
1598        // `out` data is non-empty, hence we can do this.
1599        send_dns_outgoing(&out, intf, sock).remove(0)
1600    }
1601
1602    /// Binds a channel `listener` to querying mDNS hostnames.
1603    ///
1604    /// If there is already a `listener`, it will be updated, i.e. overwritten.
1605    fn add_hostname_resolver(
1606        &mut self,
1607        hostname: String,
1608        listener: Sender<HostnameResolutionEvent>,
1609        timeout: Option<u64>,
1610    ) {
1611        let real_timeout = timeout.map(|t| current_time_millis() + t);
1612        self.hostname_resolvers
1613            .insert(hostname.to_lowercase(), (listener, real_timeout));
1614        if let Some(t) = real_timeout {
1615            self.add_timer(t);
1616        }
1617    }
1618
1619    /// Sends a multicast query for `name` with `qtype`.
1620    fn send_query(&self, name: &str, qtype: RRType) {
1621        self.send_query_vec(&[(name, qtype)]);
1622    }
1623
1624    /// Sends out a list of `questions` (i.e. DNS questions) via multicast.
1625    fn send_query_vec(&self, questions: &[(&str, RRType)]) {
1626        trace!("Sending query questions: {:?}", questions);
1627        let mut out = DnsOutgoing::new(FLAGS_QR_QUERY);
1628        let now = current_time_millis();
1629
1630        for (name, qtype) in questions {
1631            out.add_question(name, *qtype);
1632
1633            for record in self.cache.get_known_answers(name, *qtype, now) {
1634                /*
1635                RFC 6762 section 7.1: https://datatracker.ietf.org/doc/html/rfc6762#section-7.1
1636                ...
1637                    When a Multicast DNS querier sends a query to which it already knows
1638                    some answers, it populates the Answer Section of the DNS query
1639                    message with those answers.
1640                 */
1641                trace!("add known answer: {:?}", record);
1642                let mut new_record = record.clone();
1643                new_record.get_record_mut().update_ttl(now);
1644                out.add_answer_box(new_record);
1645            }
1646        }
1647
1648        // Send the query on one interface per ip version.
1649        let mut multicast_sent_trackers = HashSet::new();
1650        for (intf, sock) in self.intf_socks.iter() {
1651            if let Some(tracker) = multicast_send_tracker(intf) {
1652                if multicast_sent_trackers.contains(&tracker) {
1653                    continue; // no need to send query the same interface with same ip version.
1654                }
1655                multicast_sent_trackers.insert(tracker);
1656            }
1657            send_dns_outgoing(&out, intf, sock);
1658        }
1659    }
1660
1661    /// Reads from the socket of `ip`.
1662    ///
1663    /// Returns false if failed to receive a packet,
1664    /// otherwise returns true.
1665    fn handle_read(&mut self, intf: &Interface) -> bool {
1666        let sock = match self.intf_socks.get_mut(intf) {
1667            Some(if_sock) => if_sock,
1668            None => return false,
1669        };
1670        let mut buf = vec![0u8; MAX_MSG_ABSOLUTE];
1671
1672        // Read the next mDNS UDP datagram.
1673        //
1674        // If the datagram is larger than `buf`, excess bytes may or may not
1675        // be truncated by the socket layer depending on the platform's libc.
1676        // In any case, such large datagram will not be decoded properly and
1677        // this function should return false but should not crash.
1678        let sz = match sock.recv(&mut buf) {
1679            Ok(sz) => sz,
1680            Err(e) => {
1681                if e.kind() != std::io::ErrorKind::WouldBlock {
1682                    debug!("listening socket read failed: {}", e);
1683                }
1684                return false;
1685            }
1686        };
1687
1688        trace!("received {} bytes at IP: {}", sz, intf.ip());
1689
1690        // If sz is 0, it means sock reached End-of-File.
1691        if sz == 0 {
1692            debug!("socket {:?} was likely shutdown", &sock);
1693            if let Err(e) = self.poller.registry().deregister(sock) {
1694                debug!("failed to remove sock {:?} from poller: {}", sock, &e);
1695            }
1696
1697            // Replace the closed socket with a new one.
1698            let should_loop = if intf.ip().is_ipv4() {
1699                self.multicast_loop_v4
1700            } else {
1701                self.multicast_loop_v6
1702            };
1703            match new_socket_bind(intf, should_loop) {
1704                Ok(new_sock) => {
1705                    trace!("reset socket for IP {}", intf.ip());
1706                    self.intf_socks.insert(intf.clone(), new_sock);
1707                }
1708                Err(e) => debug!("re-bind a socket to {:?}: {}", intf, e),
1709            }
1710
1711            return false;
1712        }
1713
1714        buf.truncate(sz); // reduce potential processing errors
1715
1716        match DnsIncoming::new(buf) {
1717            Ok(msg) => {
1718                if msg.is_query() {
1719                    self.handle_query(msg, intf);
1720                } else if msg.is_response() {
1721                    self.handle_response(msg, intf);
1722                } else {
1723                    debug!("Invalid message: not query and not response");
1724                }
1725            }
1726            Err(e) => debug!("Invalid incoming DNS message: {}", e),
1727        }
1728
1729        true
1730    }
1731
1732    /// Returns true, if sent query. Returns false if SRV already exists.
1733    fn query_unresolved(&mut self, instance: &str) -> bool {
1734        if !valid_instance_name(instance) {
1735            trace!("instance name {} not valid", instance);
1736            return false;
1737        }
1738
1739        if let Some(records) = self.cache.get_srv(instance) {
1740            for record in records {
1741                if let Some(srv) = record.any().downcast_ref::<DnsSrv>() {
1742                    if self.cache.get_addr(srv.host()).is_none() {
1743                        self.send_query_vec(&[(srv.host(), RRType::A), (srv.host(), RRType::AAAA)]);
1744                        return true;
1745                    }
1746                }
1747            }
1748        } else {
1749            self.send_query(instance, RRType::ANY);
1750            return true;
1751        }
1752
1753        false
1754    }
1755
1756    /// Checks if `ty_domain` has records in the cache. If yes, sends the
1757    /// cached records via `sender`.
1758    fn query_cache_for_service(&mut self, ty_domain: &str, sender: &Sender<ServiceEvent>) {
1759        let mut resolved: HashSet<String> = HashSet::new();
1760        let mut unresolved: HashSet<String> = HashSet::new();
1761
1762        if let Some(records) = self.cache.get_ptr(ty_domain) {
1763            for record in records.iter() {
1764                if let Some(ptr) = record.any().downcast_ref::<DnsPointer>() {
1765                    let info = match self.create_service_info_from_cache(ty_domain, ptr.alias()) {
1766                        Ok(ok) => ok,
1767                        Err(err) => {
1768                            debug!("Error while creating service info from cache: {}", err);
1769                            continue;
1770                        }
1771                    };
1772
1773                    match sender.send(ServiceEvent::ServiceFound(
1774                        ty_domain.to_string(),
1775                        ptr.alias().to_string(),
1776                    )) {
1777                        Ok(()) => debug!("send service found {}", ptr.alias()),
1778                        Err(e) => {
1779                            debug!("failed to send service found: {}", e);
1780                            continue;
1781                        }
1782                    }
1783
1784                    if info.is_ready() {
1785                        resolved.insert(ptr.alias().to_string());
1786                        match sender.send(ServiceEvent::ServiceResolved(info)) {
1787                            Ok(()) => debug!("sent service resolved: {}", ptr.alias()),
1788                            Err(e) => debug!("failed to send service resolved: {}", e),
1789                        }
1790                    } else {
1791                        unresolved.insert(ptr.alias().to_string());
1792                    }
1793                }
1794            }
1795        }
1796
1797        for instance in resolved.drain() {
1798            self.pending_resolves.remove(&instance);
1799            self.resolved.insert(instance);
1800        }
1801
1802        for instance in unresolved.drain() {
1803            self.add_pending_resolve(instance);
1804        }
1805    }
1806
1807    /// Checks if `hostname` has records in the cache. If yes, sends the
1808    /// cached records via `sender`.
1809    fn query_cache_for_hostname(
1810        &mut self,
1811        hostname: &str,
1812        sender: Sender<HostnameResolutionEvent>,
1813    ) {
1814        let addresses_map = self.cache.get_addresses_for_host(hostname);
1815        for (name, addresses) in addresses_map {
1816            match sender.send(HostnameResolutionEvent::AddressesFound(name, addresses)) {
1817                Ok(()) => trace!("sent hostname addresses found"),
1818                Err(e) => debug!("failed to send hostname addresses found: {}", e),
1819            }
1820        }
1821    }
1822
1823    fn add_pending_resolve(&mut self, instance: String) {
1824        if !self.pending_resolves.contains(&instance) {
1825            let next_time = current_time_millis() + RESOLVE_WAIT_IN_MILLIS;
1826            self.add_retransmission(next_time, Command::Resolve(instance.clone(), 1));
1827            self.pending_resolves.insert(instance);
1828        }
1829    }
1830
1831    fn create_service_info_from_cache(
1832        &self,
1833        ty_domain: &str,
1834        fullname: &str,
1835    ) -> Result<ServiceInfo> {
1836        let my_name = {
1837            let name = fullname.trim_end_matches(split_sub_domain(ty_domain).0);
1838            name.strip_suffix('.').unwrap_or(name).to_string()
1839        };
1840
1841        let now = current_time_millis();
1842        let mut info = ServiceInfo::new(ty_domain, &my_name, "", (), 0, None)?;
1843
1844        // Be sure setting `subtype` if available even when querying for the parent domain.
1845        if let Some(subtype) = self.cache.get_subtype(fullname) {
1846            trace!(
1847                "ty_domain: {} found subtype {} for instance: {}",
1848                ty_domain,
1849                subtype,
1850                fullname
1851            );
1852            if info.get_subtype().is_none() {
1853                info.set_subtype(subtype.clone());
1854            }
1855        }
1856
1857        // resolve SRV record
1858        if let Some(records) = self.cache.get_srv(fullname) {
1859            if let Some(answer) = records.first() {
1860                if let Some(dns_srv) = answer.any().downcast_ref::<DnsSrv>() {
1861                    info.set_hostname(dns_srv.host().to_string());
1862                    info.set_port(dns_srv.port());
1863                }
1864            }
1865        }
1866
1867        // resolve TXT record
1868        if let Some(records) = self.cache.get_txt(fullname) {
1869            if let Some(record) = records.first() {
1870                if let Some(dns_txt) = record.any().downcast_ref::<DnsTxt>() {
1871                    info.set_properties_from_txt(dns_txt.text());
1872                }
1873            }
1874        }
1875
1876        // resolve A and AAAA records
1877        if let Some(records) = self.cache.get_addr(info.get_hostname()) {
1878            for answer in records.iter() {
1879                if let Some(dns_a) = answer.any().downcast_ref::<DnsAddress>() {
1880                    if dns_a.get_record().is_expired(now) {
1881                        trace!("Addr expired: {}", dns_a.address());
1882                    } else {
1883                        info.insert_ipaddr(dns_a.address());
1884                    }
1885                }
1886            }
1887        }
1888
1889        Ok(info)
1890    }
1891
1892    fn handle_poller_events(&mut self, events: &mio::Events) {
1893        for ev in events.iter() {
1894            trace!("event received with key {:?}", ev.token());
1895            if ev.token().0 == SIGNAL_SOCK_EVENT_KEY {
1896                // Drain signals as we will drain commands as well.
1897                self.signal_sock_drain();
1898
1899                if let Err(e) = self.poller.registry().reregister(
1900                    &mut self.signal_sock,
1901                    ev.token(),
1902                    mio::Interest::READABLE,
1903                ) {
1904                    debug!("failed to modify poller for signal socket: {}", e);
1905                }
1906                continue; // Next event.
1907            }
1908
1909            // Read until no more packets available.
1910            let intf = match self.poll_ids.get(&ev.token().0) {
1911                Some(interface) => interface.clone(),
1912                None => {
1913                    debug!("Ip for event key {} not found", ev.token().0);
1914                    break;
1915                }
1916            };
1917            while self.handle_read(&intf) {}
1918
1919            // we continue to monitor this socket.
1920            if let Some(sock) = self.intf_socks.get_mut(&intf) {
1921                if let Err(e) =
1922                    self.poller
1923                        .registry()
1924                        .reregister(sock, ev.token(), mio::Interest::READABLE)
1925                {
1926                    debug!("modify poller for interface {:?}: {}", &intf, e);
1927                    break;
1928                }
1929            }
1930        }
1931    }
1932
    /// Deal with incoming response packets.  All answers
    /// are held in the cache, and listeners are notified.
    ///
    /// The steps, in order:
    /// 1. Drop the expired records from `msg`, removing them from the cache
    ///    and sending `ServiceRemoved` for removed PTR records.
    /// 2. Run the conflict handler against the remaining records.
    /// 3. Add or update every remaining record in the cache, collecting timers
    ///    and the list of changed records.
    /// 4. Notify hostname resolvers about changed address records.
    /// 5. Resolve the service instances affected by the changes.
    ///
    /// `intf` is the interface the message was received on.
    fn handle_response(&mut self, mut msg: DnsIncoming, intf: &Interface) {
        trace!(
            "handle_response: {} answers {} authorities {} additionals",
            msg.answers().len(),
            &msg.authorities().len(),
            &msg.num_additionals()
        );
        let now = current_time_millis();

        // remove records that are expired.
        //
        // The predicate returns `true` to keep a record in `msg`. For an
        // expired record it also tries to remove it from the cache and
        // returns `false`, so the record is not processed any further.
        let mut record_predicate = |record: &DnsRecordBox| {
            if !record.get_record().is_expired(now) {
                return true;
            }

            debug!("record is expired, removing it from cache.");
            if self.cache.remove(record) {
                // for PTR records, send event to listeners
                if let Some(dns_ptr) = record.any().downcast_ref::<DnsPointer>() {
                    call_service_listener(
                        &self.service_queriers,
                        dns_ptr.get_name(),
                        ServiceEvent::ServiceRemoved(
                            dns_ptr.get_name().to_string(),
                            dns_ptr.alias().to_string(),
                        ),
                    );
                }
            }
            false
        };
        // Apply the same predicate to all three record sections of the message.
        msg.answers_mut().retain(&mut record_predicate);
        msg.authorities_mut().retain(&mut record_predicate);
        msg.additionals_mut().retain(&mut record_predicate);

        // check possible conflicts and handle them.
        self.conflict_handler(&msg, intf);

        /// Represents a DNS record change that involves one service instance.
        struct InstanceChange {
            ty: RRType,   // The type of DNS record for the instance.
            name: String, // The name of the record.
        }

        // Go through all answers to get the new and updated records.
        // For new PTR records, send out ServiceFound immediately. For others,
        // collect them into `changes`.
        //
        // Note: we don't try to identify the update instances based on
        // each record immediately as the answers are likely related to each
        // other.
        let mut changes = Vec::new();
        let mut timers = Vec::new();
        for record in msg.all_records() {
            match self.cache.add_or_update(intf, record, &mut timers) {
                // NOTE(review): the `true` flag presumably marks a record that
                // is new or whose data changed — confirm against `add_or_update`.
                Some((dns_record, true)) => {
                    timers.push(dns_record.get_record().get_expire_time());
                    timers.push(dns_record.get_record().get_refresh_time());

                    let ty = dns_record.get_type();
                    let name = dns_record.get_name();
                    if ty == RRType::PTR {
                        // NOTE(review): the refresh time was already pushed
                        // above; this second push looks redundant — confirm.
                        if self.service_queriers.contains_key(name) {
                            timers.push(dns_record.get_record().get_refresh_time());
                        }

                        // send ServiceFound
                        if let Some(dns_ptr) = dns_record.any().downcast_ref::<DnsPointer>() {
                            call_service_listener(
                                &self.service_queriers,
                                name,
                                ServiceEvent::ServiceFound(
                                    name.to_string(),
                                    dns_ptr.alias().to_string(),
                                ),
                            );
                            // For a PTR the change is recorded under the
                            // instance it points to, not under the PTR name.
                            changes.push(InstanceChange {
                                ty,
                                name: dns_ptr.alias().to_string(),
                            });
                        }
                    } else {
                        changes.push(InstanceChange {
                            ty,
                            name: name.to_string(),
                        });
                    }
                }
                // No change is recorded for this record, but its timers are
                // still scheduled.
                Some((dns_record, false)) => {
                    timers.push(dns_record.get_record().get_expire_time());
                    timers.push(dns_record.get_record().get_refresh_time());
                }
                _ => {}
            }
        }

        // Add timers for the new records.
        for t in timers {
            self.add_timer(t);
        }

        // Go through remaining changes to see if any hostname resolutions were found or updated.
        for change in changes
            .iter()
            .filter(|change| change.ty == RRType::A || change.ty == RRType::AAAA)
        {
            let addr_map = self.cache.get_addresses_for_host(&change.name);
            for (name, addresses) in addr_map {
                call_hostname_resolution_listener(
                    &self.hostname_resolvers,
                    &change.name,
                    HostnameResolutionEvent::AddressesFound(name, addresses),
                )
            }
        }

        // Identify the instances that need to be "resolved".
        let mut updated_instances = HashSet::new();
        for update in changes {
            match update.ty {
                // For these types `update.name` already is the instance name.
                RRType::PTR | RRType::SRV | RRType::TXT => {
                    updated_instances.insert(update.name);
                }
                // For address records `update.name` is a hostname: every
                // instance on that host is affected.
                RRType::A | RRType::AAAA => {
                    let instances = self.cache.get_instances_on_host(&update.name);
                    updated_instances.extend(instances);
                }
                _ => {}
            }
        }

        self.resolve_updated_instances(&updated_instances);
    }
2068
    /// Checks the answers of `msg` against the records currently being probed
    /// on `intf` and renames the probed records that conflict.
    ///
    /// A probed record conflicts with an answer of the same name when type and
    /// class are equal but the rdata differs. Such a record is removed from
    /// its probe, given a new name, and inserted into the probe of the new
    /// name (created if needed) so that probing starts again.
    ///
    /// Does nothing if there is no DNS registry for `intf`.
    fn conflict_handler(&mut self, msg: &DnsIncoming, intf: &Interface) {
        let Some(dns_registry) = self.dns_registry_map.get_mut(intf) else {
            return;
        };

        for answer in msg.answers().iter() {
            // Renamed copies of the conflicting records found for this answer.
            let mut new_records = Vec::new();

            // Only names that are currently being probed are of interest.
            let name = answer.get_name();
            let Some(probe) = dns_registry.probing.get_mut(name) else {
                continue;
            };

            // check against possible multicast forwarding
            if answer.get_type() == RRType::A || answer.get_type() == RRType::AAAA {
                if let Some(answer_addr) = answer.any().downcast_ref::<DnsAddress>() {
                    if !valid_ip_on_intf(&answer_addr.address(), intf) {
                        debug!(
                            "conflict handler: answer addr {:?} not in the subnet of {:?}",
                            answer_addr, intf
                        );
                        continue;
                    }
                }

                // double check if any other address record matches rrdata,
                // as there could be multiple addresses for the same name.
                let any_match = probe.records.iter().any(|r| {
                    r.get_type() == answer.get_type()
                        && r.get_class() == answer.get_class()
                        && r.rrdata_match(answer.as_ref())
                });
                if any_match {
                    continue; // no conflict for this answer.
                }
            }

            // Keep the records that do not conflict; move a renamed copy of
            // each conflicting record into `new_records`.
            probe.records.retain(|record| {
                if record.get_type() == answer.get_type()
                    && record.get_class() == answer.get_class()
                    && !record.rrdata_match(answer.as_ref())
                {
                    debug!(
                        "found conflict name: '{name}' record: {}: {} PEER: {}",
                        record.get_type(),
                        record.rdata_print(),
                        answer.rdata_print()
                    );

                    // create a new name for this record
                    // then remove the old record in probing.
                    let mut new_record = record.clone();
                    let new_name = match record.get_type() {
                        RRType::A => hostname_change(name),
                        RRType::AAAA => hostname_change(name),
                        _ => name_change(name),
                    };
                    new_record.get_record_mut().set_new_name(new_name);
                    new_records.push(new_record);
                    return false; // old record is dropped from the probe.
                }

                true
            });

            // NOTE(review): the removal of an emptied probe below was left
            // disabled by the original author; it is unclear whether the old
            // probe should be dropped once all of its records were renamed.
            // if probe.records.is_empty() {
            //     dns_registry.probing.remove(name);
            // }

            // Probing again with the new names.
            // The start is delayed by a random amount below 250 milliseconds.
            let create_time = current_time_millis() + fastrand::u64(0..250);

            // Cloned so that `probe` is no longer borrowed when the registry
            // is modified below.
            let waiting_services = probe.waiting_services.clone();

            for record in new_records {
                if dns_registry.update_hostname(name, record.get_name(), create_time) {
                    self.timers.push(Reverse(create_time));
                }

                // remember the name changes (note: `name` might not be the original, it could be already changed once.)
                dns_registry.name_changes.insert(
                    record.get_record().get_original_name().to_string(),
                    record.get_name().to_string(),
                );

                // Reuse the probe of the new name if there is one, otherwise
                // create it and schedule a timer for its first send.
                let new_probe = match dns_registry.probing.get_mut(record.get_name()) {
                    Some(p) => p,
                    None => {
                        let new_probe = dns_registry
                            .probing
                            .entry(record.get_name().to_string())
                            .or_insert_with(|| {
                                debug!("conflict handler: new probe of {}", record.get_name());
                                Probe::new(create_time)
                            });
                        self.timers.push(Reverse(new_probe.next_send));
                        new_probe
                    }
                };

                debug!(
                    "insert record with new name '{}' {} into probe",
                    record.get_name(),
                    record.get_type()
                );
                new_probe.insert_record(record);

                // The services that waited on the old probe now wait on the new one.
                new_probe.waiting_services.extend(waiting_services.clone());
            }
        }
    }
2181
2182    /// Resolve the updated (including new) instances.
2183    ///
2184    /// Note: it is possible that more than 1 PTR pointing to the same
2185    /// instance. For example, a regular service type PTR and a sub-type
2186    /// service type PTR can both point to the same service instance.
2187    /// This loop automatically handles the sub-type PTRs.
2188    fn resolve_updated_instances(&mut self, updated_instances: &HashSet<String>) {
2189        let mut resolved: HashSet<String> = HashSet::new();
2190        let mut unresolved: HashSet<String> = HashSet::new();
2191        let mut removed_instances = HashMap::new();
2192
2193        for (ty_domain, records) in self.cache.all_ptr().iter() {
2194            if !self.service_queriers.contains_key(ty_domain) {
2195                // No need to resolve if not in our queries.
2196                continue;
2197            }
2198
2199            for record in records.iter() {
2200                if let Some(dns_ptr) = record.any().downcast_ref::<DnsPointer>() {
2201                    if updated_instances.contains(dns_ptr.alias()) {
2202                        if let Ok(info) =
2203                            self.create_service_info_from_cache(ty_domain, dns_ptr.alias())
2204                        {
2205                            if info.is_ready() {
2206                                debug!("call queriers to resolve {}", dns_ptr.alias());
2207                                resolved.insert(dns_ptr.alias().to_string());
2208                                call_service_listener(
2209                                    &self.service_queriers,
2210                                    ty_domain,
2211                                    ServiceEvent::ServiceResolved(info),
2212                                );
2213                            } else {
2214                                if self.resolved.remove(dns_ptr.alias()) {
2215                                    removed_instances
2216                                        .entry(ty_domain.to_string())
2217                                        .or_insert_with(HashSet::new)
2218                                        .insert(dns_ptr.alias().to_string());
2219                                }
2220                                unresolved.insert(dns_ptr.alias().to_string());
2221                            }
2222                        }
2223                    }
2224                }
2225            }
2226        }
2227
2228        for instance in resolved.drain() {
2229            self.pending_resolves.remove(&instance);
2230            self.resolved.insert(instance);
2231        }
2232
2233        for instance in unresolved.drain() {
2234            self.add_pending_resolve(instance);
2235        }
2236
2237        self.notify_service_removal(removed_instances);
2238    }
2239
2240    /// Handle incoming query packets, figure out whether and what to respond.
2241    fn handle_query(&mut self, msg: DnsIncoming, intf: &Interface) {
2242        let sock = match self.intf_socks.get(intf) {
2243            Some(sock) => sock,
2244            None => return,
2245        };
2246        let mut out = DnsOutgoing::new(FLAGS_QR_RESPONSE | FLAGS_AA);
2247
2248        // Special meta-query "_services._dns-sd._udp.<Domain>".
2249        // See https://datatracker.ietf.org/doc/html/rfc6763#section-9
2250        const META_QUERY: &str = "_services._dns-sd._udp.local.";
2251
2252        let Some(dns_registry) = self.dns_registry_map.get_mut(intf) else {
2253            debug!("missing dns registry for intf {}", intf.ip());
2254            return;
2255        };
2256
2257        for question in msg.questions().iter() {
2258            trace!("query question: {:?}", &question);
2259
2260            let qtype = question.entry_type();
2261
2262            if qtype == RRType::PTR {
2263                for service in self.my_services.values() {
2264                    if service.get_status(intf) != ServiceStatus::Announced {
2265                        continue;
2266                    }
2267
2268                    if question.entry_name() == service.get_type()
2269                        || service
2270                            .get_subtype()
2271                            .as_ref()
2272                            .is_some_and(|v| v == question.entry_name())
2273                    {
2274                        add_answer_with_additionals(&mut out, &msg, service, intf, dns_registry);
2275                    } else if question.entry_name() == META_QUERY {
2276                        let ptr_added = out.add_answer(
2277                            &msg,
2278                            DnsPointer::new(
2279                                question.entry_name(),
2280                                RRType::PTR,
2281                                CLASS_IN,
2282                                service.get_other_ttl(),
2283                                service.get_type().to_string(),
2284                            ),
2285                        );
2286                        if !ptr_added {
2287                            trace!("answer was not added for meta-query {:?}", &question);
2288                        }
2289                    }
2290                }
2291            } else {
2292                // Simultaneous Probe Tiebreaking (RFC 6762 section 8.2)
2293                if qtype == RRType::ANY && msg.num_authorities() > 0 {
2294                    let probe_name = question.entry_name();
2295
2296                    if let Some(probe) = dns_registry.probing.get_mut(probe_name) {
2297                        let now = current_time_millis();
2298
2299                        // Only do tiebreaking if probe already started.
2300                        // This check also helps avoid redo tiebreaking if start time
2301                        // was postponed.
2302                        if probe.start_time < now {
2303                            let incoming_records: Vec<_> = msg
2304                                .authorities()
2305                                .iter()
2306                                .filter(|r| r.get_name() == probe_name)
2307                                .collect();
2308
2309                            /*
2310                            RFC 6762 section 8.2: https://datatracker.ietf.org/doc/html/rfc6762#section-8.2
2311                            ...
2312                            if the host finds that its own data is lexicographically later, it
2313                            simply ignores the other host's probe.  If the host finds that its
2314                            own data is lexicographically earlier, then it defers to the winning
2315                            host by waiting one second, and then begins probing for this record
2316                            again.
2317                            */
2318                            match probe.tiebreaking(&incoming_records) {
2319                                cmp::Ordering::Less => {
2320                                    debug!(
2321                                        "tiebreaking '{}': LOST, will wait for one second",
2322                                        probe_name
2323                                    );
2324                                    probe.start_time = now + 1000; // wait and restart.
2325                                    probe.next_send = now + 1000;
2326                                }
2327                                ordering => {
2328                                    debug!("tiebreaking '{}': {:?}", probe_name, ordering);
2329                                }
2330                            }
2331                        }
2332                    }
2333                }
2334
2335                if qtype == RRType::A || qtype == RRType::AAAA || qtype == RRType::ANY {
2336                    for service in self.my_services.values() {
2337                        if service.get_status(intf) != ServiceStatus::Announced {
2338                            continue;
2339                        }
2340
2341                        let service_hostname =
2342                            match dns_registry.name_changes.get(service.get_hostname()) {
2343                                Some(new_name) => new_name,
2344                                None => service.get_hostname(),
2345                            };
2346
2347                        if service_hostname.to_lowercase() == question.entry_name().to_lowercase() {
2348                            let intf_addrs = service.get_addrs_on_intf(intf);
2349                            if intf_addrs.is_empty()
2350                                && (qtype == RRType::A || qtype == RRType::AAAA)
2351                            {
2352                                let t = match qtype {
2353                                    RRType::A => "TYPE_A",
2354                                    RRType::AAAA => "TYPE_AAAA",
2355                                    _ => "invalid_type",
2356                                };
2357                                trace!(
2358                                    "Cannot find valid addrs for {} response on intf {:?}",
2359                                    t,
2360                                    &intf
2361                                );
2362                                return;
2363                            }
2364                            for address in intf_addrs {
2365                                out.add_answer(
2366                                    &msg,
2367                                    DnsAddress::new(
2368                                        service_hostname,
2369                                        ip_address_rr_type(&address),
2370                                        CLASS_IN | CLASS_CACHE_FLUSH,
2371                                        service.get_host_ttl(),
2372                                        address,
2373                                    ),
2374                                );
2375                            }
2376                        }
2377                    }
2378                }
2379
2380                let query_name = question.entry_name().to_lowercase();
2381                let service_opt = self
2382                    .my_services
2383                    .iter()
2384                    .find(|(k, _v)| {
2385                        let service_name = match dns_registry.name_changes.get(k.as_str()) {
2386                            Some(new_name) => new_name,
2387                            None => k,
2388                        };
2389                        service_name == &query_name
2390                    })
2391                    .map(|(_, v)| v);
2392
2393                let Some(service) = service_opt else {
2394                    continue;
2395                };
2396
2397                if service.get_status(intf) != ServiceStatus::Announced {
2398                    continue;
2399                }
2400
2401                if qtype == RRType::SRV || qtype == RRType::ANY {
2402                    out.add_answer(
2403                        &msg,
2404                        DnsSrv::new(
2405                            question.entry_name(),
2406                            CLASS_IN | CLASS_CACHE_FLUSH,
2407                            service.get_host_ttl(),
2408                            service.get_priority(),
2409                            service.get_weight(),
2410                            service.get_port(),
2411                            service.get_hostname().to_string(),
2412                        ),
2413                    );
2414                }
2415
2416                if qtype == RRType::TXT || qtype == RRType::ANY {
2417                    out.add_answer(
2418                        &msg,
2419                        DnsTxt::new(
2420                            question.entry_name(),
2421                            CLASS_IN | CLASS_CACHE_FLUSH,
2422                            service.get_host_ttl(),
2423                            service.generate_txt(),
2424                        ),
2425                    );
2426                }
2427
2428                if qtype == RRType::SRV {
2429                    let intf_addrs = service.get_addrs_on_intf(intf);
2430                    if intf_addrs.is_empty() {
2431                        debug!(
2432                            "Cannot find valid addrs for TYPE_SRV response on intf {:?}",
2433                            &intf
2434                        );
2435                        return;
2436                    }
2437                    for address in intf_addrs {
2438                        out.add_additional_answer(DnsAddress::new(
2439                            service.get_hostname(),
2440                            ip_address_rr_type(&address),
2441                            CLASS_IN | CLASS_CACHE_FLUSH,
2442                            service.get_host_ttl(),
2443                            address,
2444                        ));
2445                    }
2446                }
2447            }
2448        }
2449
2450        if !out.answers_count() > 0 {
2451            out.set_id(msg.id());
2452            send_dns_outgoing(&out, intf, sock);
2453
2454            self.increase_counter(Counter::Respond, 1);
2455            self.notify_monitors(DaemonEvent::Respond(intf.ip()));
2456        }
2457
2458        self.increase_counter(Counter::KnownAnswerSuppression, out.known_answer_count());
2459    }
2460
2461    /// Increases the value of `counter` by `count`.
2462    fn increase_counter(&mut self, counter: Counter, count: i64) {
2463        let key = counter.to_string();
2464        match self.counters.get_mut(&key) {
2465            Some(v) => *v += count,
2466            None => {
2467                self.counters.insert(key, count);
2468            }
2469        }
2470    }
2471
2472    fn signal_sock_drain(&self) {
2473        let mut signal_buf = [0; 1024];
2474
2475        // This recv is non-blocking as the socket is non-blocking.
2476        while let Ok(sz) = self.signal_sock.recv(&mut signal_buf) {
2477            trace!(
2478                "signal socket recvd: {}",
2479                String::from_utf8_lossy(&signal_buf[0..sz])
2480            );
2481        }
2482    }
2483
2484    fn add_retransmission(&mut self, next_time: u64, command: Command) {
2485        self.retransmissions.push(ReRun { next_time, command });
2486        self.add_timer(next_time);
2487    }
2488
2489    /// Sends service removal event to listeners for expired service records.
2490    fn notify_service_removal(&self, expired: HashMap<String, HashSet<String>>) {
2491        for (ty_domain, sender) in self.service_queriers.iter() {
2492            if let Some(instances) = expired.get(ty_domain) {
2493                for instance_name in instances {
2494                    let event = ServiceEvent::ServiceRemoved(
2495                        ty_domain.to_string(),
2496                        instance_name.to_string(),
2497                    );
2498                    match sender.send(event) {
2499                        Ok(()) => debug!("notify_service_removal: sent ServiceRemoved to listener of {ty_domain}: {instance_name}"),
2500                        Err(e) => debug!("Failed to send event: {}", e),
2501                    }
2502                }
2503            }
2504        }
2505    }
2506
2507    /// The entry point that executes all commands received by the daemon.
2508    ///
2509    /// `repeating`: whether this is a retransmission.
2510    fn exec_command(&mut self, command: Command, repeating: bool) {
2511        match command {
2512            Command::Browse(ty, next_delay, listener) => {
2513                self.exec_command_browse(repeating, ty, next_delay, listener);
2514            }
2515
2516            Command::ResolveHostname(hostname, next_delay, listener, timeout) => {
2517                self.exec_command_resolve_hostname(
2518                    repeating, hostname, next_delay, listener, timeout,
2519                );
2520            }
2521
2522            Command::Register(service_info) => {
2523                self.register_service(service_info);
2524                self.increase_counter(Counter::Register, 1);
2525            }
2526
2527            Command::RegisterResend(fullname, intf) => {
2528                trace!("register-resend service: {fullname} on {:?}", &intf.addr);
2529                self.exec_command_register_resend(fullname, intf);
2530            }
2531
2532            Command::Unregister(fullname, resp_s) => {
2533                trace!("unregister service {} repeat {}", &fullname, &repeating);
2534                self.exec_command_unregister(repeating, fullname, resp_s);
2535            }
2536
2537            Command::UnregisterResend(packet, ip) => {
2538                self.exec_command_unregister_resend(packet, ip);
2539            }
2540
2541            Command::StopBrowse(ty_domain) => self.exec_command_stop_browse(ty_domain),
2542
2543            Command::StopResolveHostname(hostname) => {
2544                self.exec_command_stop_resolve_hostname(hostname.to_lowercase())
2545            }
2546
2547            Command::Resolve(instance, try_count) => self.exec_command_resolve(instance, try_count),
2548
2549            Command::GetMetrics(resp_s) => match resp_s.send(self.counters.clone()) {
2550                Ok(()) => trace!("Sent metrics to the client"),
2551                Err(e) => debug!("Failed to send metrics: {}", e),
2552            },
2553
2554            Command::GetStatus(resp_s) => match resp_s.send(self.status.clone()) {
2555                Ok(()) => trace!("Sent status to the client"),
2556                Err(e) => debug!("Failed to send status: {}", e),
2557            },
2558
2559            Command::Monitor(resp_s) => {
2560                self.monitors.push(resp_s);
2561            }
2562
2563            Command::SetOption(daemon_opt) => {
2564                self.process_set_option(daemon_opt);
2565            }
2566
2567            Command::GetOption(resp_s) => {
2568                let val = DaemonOptionVal {
2569                    _service_name_len_max: self.service_name_len_max,
2570                    ip_check_interval: self.ip_check_interval,
2571                };
2572                if let Err(e) = resp_s.send(val) {
2573                    debug!("Failed to send options: {}", e);
2574                }
2575            }
2576
2577            Command::Verify(instance_fullname, timeout) => {
2578                self.exec_command_verify(instance_fullname, timeout, repeating);
2579            }
2580
2581            _ => {
2582                debug!("unexpected command: {:?}", &command);
2583            }
2584        }
2585    }
2586
2587    fn exec_command_browse(
2588        &mut self,
2589        repeating: bool,
2590        ty: String,
2591        next_delay: u32,
2592        listener: Sender<ServiceEvent>,
2593    ) {
2594        let pretty_addrs: Vec<String> = self
2595            .intf_socks
2596            .keys()
2597            .map(|itf| format!("{} ({})", itf.ip(), itf.name))
2598            .collect();
2599
2600        if let Err(e) = listener.send(ServiceEvent::SearchStarted(format!(
2601            "{ty} on {} interfaces [{}]",
2602            pretty_addrs.len(),
2603            pretty_addrs.join(", ")
2604        ))) {
2605            debug!(
2606                "Failed to send SearchStarted({})(repeating:{}): {}",
2607                &ty, repeating, e
2608            );
2609            return;
2610        }
2611        if !repeating {
2612            // Binds a `listener` to querying mDNS domain type `ty`.
2613            //
2614            // If there is already a `listener`, it will be updated, i.e. overwritten.
2615            self.service_queriers.insert(ty.clone(), listener.clone());
2616
2617            // if we already have the records in our cache, just send them
2618            self.query_cache_for_service(&ty, &listener);
2619        }
2620
2621        self.send_query(&ty, RRType::PTR);
2622        self.increase_counter(Counter::Browse, 1);
2623
2624        let next_time = current_time_millis() + (next_delay * 1000) as u64;
2625        let max_delay = 60 * 60;
2626        let delay = cmp::min(next_delay * 2, max_delay);
2627        self.add_retransmission(next_time, Command::Browse(ty, delay, listener));
2628    }
2629
2630    fn exec_command_resolve_hostname(
2631        &mut self,
2632        repeating: bool,
2633        hostname: String,
2634        next_delay: u32,
2635        listener: Sender<HostnameResolutionEvent>,
2636        timeout: Option<u64>,
2637    ) {
2638        let addr_list: Vec<_> = self.intf_socks.keys().collect();
2639        if let Err(e) = listener.send(HostnameResolutionEvent::SearchStarted(format!(
2640            "{} on addrs {:?}",
2641            &hostname, &addr_list
2642        ))) {
2643            debug!(
2644                "Failed to send ResolveStarted({})(repeating:{}): {}",
2645                &hostname, repeating, e
2646            );
2647            return;
2648        }
2649        if !repeating {
2650            self.add_hostname_resolver(hostname.to_owned(), listener.clone(), timeout);
2651            // if we already have the records in our cache, just send them
2652            self.query_cache_for_hostname(&hostname, listener.clone());
2653        }
2654
2655        self.send_query_vec(&[(&hostname, RRType::A), (&hostname, RRType::AAAA)]);
2656        self.increase_counter(Counter::ResolveHostname, 1);
2657
2658        let now = current_time_millis();
2659        let next_time = now + u64::from(next_delay) * 1000;
2660        let max_delay = 60 * 60;
2661        let delay = cmp::min(next_delay * 2, max_delay);
2662
2663        // Only add retransmission if it does not exceed the hostname resolver timeout, if any.
2664        if self
2665            .hostname_resolvers
2666            .get(&hostname)
2667            .and_then(|(_sender, timeout)| *timeout)
2668            .map(|timeout| next_time < timeout)
2669            .unwrap_or(true)
2670        {
2671            self.add_retransmission(
2672                next_time,
2673                Command::ResolveHostname(hostname, delay, listener, None),
2674            );
2675        }
2676    }
2677
2678    fn exec_command_resolve(&mut self, instance: String, try_count: u16) {
2679        let pending_query = self.query_unresolved(&instance);
2680        let max_try = 3;
2681        if pending_query && try_count < max_try {
2682            // Note that if the current try already succeeds, the next retransmission
2683            // will be no-op as the cache has been updated.
2684            let next_time = current_time_millis() + RESOLVE_WAIT_IN_MILLIS;
2685            self.add_retransmission(next_time, Command::Resolve(instance, try_count + 1));
2686        }
2687    }
2688
2689    fn exec_command_unregister(
2690        &mut self,
2691        repeating: bool,
2692        fullname: String,
2693        resp_s: Sender<UnregisterStatus>,
2694    ) {
2695        let response = match self.my_services.remove_entry(&fullname) {
2696            None => {
2697                debug!("unregister: cannot find such service {}", &fullname);
2698                UnregisterStatus::NotFound
2699            }
2700            Some((_k, info)) => {
2701                let mut timers = Vec::new();
2702                // Send one unregister per interface and ip version
2703                let mut multicast_sent_trackers = HashSet::new();
2704
2705                for (intf, sock) in self.intf_socks.iter() {
2706                    if let Some(tracker) = multicast_send_tracker(intf) {
2707                        if multicast_sent_trackers.contains(&tracker) {
2708                            continue; // no need to send unregister the same interface with same ip version.
2709                        }
2710                        multicast_sent_trackers.insert(tracker);
2711                    }
2712                    let packet = self.unregister_service(&info, intf, sock);
2713                    // repeat for one time just in case some peers miss the message
2714                    if !repeating && !packet.is_empty() {
2715                        let next_time = current_time_millis() + 120;
2716                        self.retransmissions.push(ReRun {
2717                            next_time,
2718                            command: Command::UnregisterResend(packet, intf.clone()),
2719                        });
2720                        timers.push(next_time);
2721                    }
2722                }
2723
2724                for t in timers {
2725                    self.add_timer(t);
2726                }
2727
2728                self.increase_counter(Counter::Unregister, 1);
2729                UnregisterStatus::OK
2730            }
2731        };
2732        if let Err(e) = resp_s.send(response) {
2733            debug!("unregister: failed to send response: {}", e);
2734        }
2735    }
2736
2737    fn exec_command_unregister_resend(&mut self, packet: Vec<u8>, intf: Interface) {
2738        if let Some(sock) = self.intf_socks.get(&intf) {
2739            debug!("UnregisterResend from {}", &intf.ip());
2740            multicast_on_intf(&packet[..], &intf, sock);
2741            self.increase_counter(Counter::UnregisterResend, 1);
2742        }
2743    }
2744
2745    fn exec_command_stop_browse(&mut self, ty_domain: String) {
2746        match self.service_queriers.remove_entry(&ty_domain) {
2747            None => debug!("StopBrowse: cannot find querier for {}", &ty_domain),
2748            Some((ty, sender)) => {
2749                // Remove pending browse commands in the reruns.
2750                trace!("StopBrowse: removed queryer for {}", &ty);
2751                let mut i = 0;
2752                while i < self.retransmissions.len() {
2753                    if let Command::Browse(t, _, _) = &self.retransmissions[i].command {
2754                        if t == &ty {
2755                            self.retransmissions.remove(i);
2756                            trace!("StopBrowse: removed retransmission for {}", &ty);
2757                            continue;
2758                        }
2759                    }
2760                    i += 1;
2761                }
2762
2763                // Notify the client.
2764                match sender.send(ServiceEvent::SearchStopped(ty_domain)) {
2765                    Ok(()) => trace!("Sent SearchStopped to the listener"),
2766                    Err(e) => debug!("Failed to send SearchStopped: {}", e),
2767                }
2768            }
2769        }
2770    }
2771
2772    fn exec_command_stop_resolve_hostname(&mut self, hostname: String) {
2773        if let Some((host, (sender, _timeout))) = self.hostname_resolvers.remove_entry(&hostname) {
2774            // Remove pending resolve commands in the reruns.
2775            trace!("StopResolve: removed queryer for {}", &host);
2776            let mut i = 0;
2777            while i < self.retransmissions.len() {
2778                if let Command::Resolve(t, _) = &self.retransmissions[i].command {
2779                    if t == &host {
2780                        self.retransmissions.remove(i);
2781                        trace!("StopResolve: removed retransmission for {}", &host);
2782                        continue;
2783                    }
2784                }
2785                i += 1;
2786            }
2787
2788            // Notify the client.
2789            match sender.send(HostnameResolutionEvent::SearchStopped(hostname)) {
2790                Ok(()) => trace!("Sent SearchStopped to the listener"),
2791                Err(e) => debug!("Failed to send SearchStopped: {}", e),
2792            }
2793        }
2794    }
2795
2796    fn exec_command_register_resend(&mut self, fullname: String, intf: Interface) {
2797        let Some(info) = self.my_services.get_mut(&fullname) else {
2798            trace!("announce: cannot find such service {}", &fullname);
2799            return;
2800        };
2801
2802        let Some(dns_registry) = self.dns_registry_map.get_mut(&intf) else {
2803            return;
2804        };
2805
2806        let Some(sock) = self.intf_socks.get(&intf) else {
2807            return;
2808        };
2809
2810        if announce_service_on_intf(dns_registry, info, &intf, sock) {
2811            let mut hostname = info.get_hostname();
2812            if let Some(new_name) = dns_registry.name_changes.get(hostname) {
2813                hostname = new_name;
2814            }
2815            let service_name = match dns_registry.name_changes.get(&fullname) {
2816                Some(new_name) => new_name.to_string(),
2817                None => fullname,
2818            };
2819
2820            debug!("resend: announce service {} on {}", service_name, intf.ip());
2821
2822            notify_monitors(
2823                &mut self.monitors,
2824                DaemonEvent::Announce(service_name, format!("{}:{}", hostname, &intf.ip())),
2825            );
2826            info.set_status(&intf, ServiceStatus::Announced);
2827        } else {
2828            debug!("register-resend should not fail");
2829        }
2830
2831        self.increase_counter(Counter::RegisterResend, 1);
2832    }
2833
2834    fn exec_command_verify(&mut self, instance: String, timeout: Duration, repeating: bool) {
2835        /*
2836        RFC 6762 section 10.4:
2837        ...
2838        When the cache receives this hint that it should reconfirm some
2839        record, it MUST issue two or more queries for the resource record in
2840        dispute.  If no response is received within ten seconds, then, even
2841        though its TTL may indicate that it is not yet due to expire, that
2842        record SHOULD be promptly flushed from the cache.
2843        */
2844        let now = current_time_millis();
2845        let expire_at = if repeating {
2846            None
2847        } else {
2848            Some(now + timeout.as_millis() as u64)
2849        };
2850
2851        // send query for the resource records.
2852        let record_vec = self.cache.service_verify_queries(&instance, expire_at);
2853
2854        if !record_vec.is_empty() {
2855            let query_vec: Vec<(&str, RRType)> = record_vec
2856                .iter()
2857                .map(|(record, rr_type)| (record.as_str(), *rr_type))
2858                .collect();
2859            self.send_query_vec(&query_vec);
2860
2861            if let Some(new_expire) = expire_at {
2862                self.add_timer(new_expire); // ensure a check for the new expire time.
2863
2864                // schedule a resend 1 second later
2865                self.add_retransmission(now + 1000, Command::Verify(instance, timeout));
2866            }
2867        }
2868    }
2869
2870    /// Refresh cached service records with active queriers
2871    fn refresh_active_services(&mut self) {
2872        let mut query_ptr_count = 0;
2873        let mut query_srv_count = 0;
2874        let mut new_timers = HashSet::new();
2875        let mut query_addr_count = 0;
2876
2877        for (ty_domain, _sender) in self.service_queriers.iter() {
2878            let refreshed_timers = self.cache.refresh_due_ptr(ty_domain);
2879            if !refreshed_timers.is_empty() {
2880                trace!("sending refresh query for PTR: {}", ty_domain);
2881                self.send_query(ty_domain, RRType::PTR);
2882                query_ptr_count += 1;
2883                new_timers.extend(refreshed_timers);
2884            }
2885
2886            let (instances, timers) = self.cache.refresh_due_srv(ty_domain);
2887            for instance in instances.iter() {
2888                trace!("sending refresh query for SRV: {}", instance);
2889                self.send_query(instance, RRType::SRV);
2890                query_srv_count += 1;
2891            }
2892            new_timers.extend(timers);
2893            let (hostnames, timers) = self.cache.refresh_due_hosts(ty_domain);
2894            for hostname in hostnames.iter() {
2895                trace!("sending refresh queries for A and AAAA:  {}", hostname);
2896                self.send_query_vec(&[(hostname, RRType::A), (hostname, RRType::AAAA)]);
2897                query_addr_count += 2;
2898            }
2899            new_timers.extend(timers);
2900        }
2901
2902        for timer in new_timers {
2903            self.add_timer(timer);
2904        }
2905
2906        self.increase_counter(Counter::CacheRefreshPTR, query_ptr_count);
2907        self.increase_counter(Counter::CacheRefreshSRV, query_srv_count);
2908        self.increase_counter(Counter::CacheRefreshAddr, query_addr_count);
2909    }
2910}
2911
2912/// All possible events sent to the client from the daemon
2913/// regarding service discovery.
2914#[derive(Debug)]
2915pub enum ServiceEvent {
2916    /// Started searching for a service type.
2917    SearchStarted(String),
2918
2919    /// Found a specific (service_type, fullname).
2920    ServiceFound(String, String),
2921
2922    /// Resolved a service instance with detailed info.
2923    ServiceResolved(ServiceInfo),
2924
2925    /// A service instance (service_type, fullname) was removed.
2926    ServiceRemoved(String, String),
2927
2928    /// Stopped searching for a service type.
2929    SearchStopped(String),
2930}
2931
/// Events about hostname resolution that the daemon sends to a client
/// resolving a hostname.
#[derive(Debug)]
#[non_exhaustive]
pub enum HostnameResolutionEvent {
    /// The search for the IP addresses of a hostname has started.
    SearchStarted(String),
    /// One or more addresses have been found for a hostname.
    AddressesFound(String, HashSet<IpAddr>),
    /// One or more addresses have been removed for a hostname.
    AddressesRemoved(String, HashSet<IpAddr>),
    /// The search for the IP addresses of a hostname has timed out.
    SearchTimeout(String),
    /// The search for the IP addresses of a hostname has been stopped.
    SearchStopped(String),
}
2948
2949/// Some notable events from the daemon besides [`ServiceEvent`].
2950/// These events are expected to happen infrequently.
2951#[derive(Clone, Debug)]
2952#[non_exhaustive]
2953pub enum DaemonEvent {
2954    /// Daemon unsolicitly announced a service from an interface.
2955    Announce(String, String),
2956
2957    /// Daemon encountered an error.
2958    Error(Error),
2959
2960    /// Daemon detected a new IP address from the host.
2961    IpAdd(IpAddr),
2962
2963    /// Daemon detected a IP address removed from the host.
2964    IpDel(IpAddr),
2965
2966    /// Daemon resolved a name conflict by changing one of its names.
2967    /// see [DnsNameChange] for more details.
2968    NameChange(DnsNameChange),
2969
2970    /// Send out a multicast response via an IP address.
2971    Respond(IpAddr),
2972}
2973
2974/// Represents a name change due to a name conflict resolution.
2975/// See [RFC 6762 section 9](https://datatracker.ietf.org/doc/html/rfc6762#section-9)
2976#[derive(Clone, Debug)]
2977pub struct DnsNameChange {
2978    /// The original name set in `ServiceInfo` by the user.
2979    pub original: String,
2980
2981    /// A new name is created by appending a suffix after the original name.
2982    ///
2983    /// - for a service instance name, the suffix is `(N)`, where N starts at 2.
2984    /// - for a host name, the suffix is `-N`, where N starts at 2.
2985    ///
2986    /// For example:
2987    ///
2988    /// - Service name `foo._service-type._udp` becomes `foo (2)._service-type._udp`
2989    /// - Host name `foo.local.` becomes `foo-2.local.`
2990    pub new_name: String,
2991
2992    /// The resource record type
2993    pub rr_type: RRType,
2994
2995    /// The interface where the name conflict and its change happened.
2996    pub intf_name: String,
2997}
2998
2999/// Commands supported by the daemon
3000#[derive(Debug)]
3001enum Command {
3002    /// Browsing for a service type (ty_domain, next_time_delay_in_seconds, channel::sender)
3003    Browse(String, u32, Sender<ServiceEvent>),
3004
3005    /// Resolve a hostname to IP addresses.
3006    ResolveHostname(String, u32, Sender<HostnameResolutionEvent>, Option<u64>), // (hostname, next_time_delay_in_seconds, sender, timeout_in_milliseconds)
3007
3008    /// Register a service
3009    Register(ServiceInfo),
3010
3011    /// Unregister a service
3012    Unregister(String, Sender<UnregisterStatus>), // (fullname)
3013
3014    /// Announce again a service to local network
3015    RegisterResend(String, Interface), // (fullname)
3016
3017    /// Resend unregister packet.
3018    UnregisterResend(Vec<u8>, Interface), // (packet content)
3019
3020    /// Stop browsing a service type
3021    StopBrowse(String), // (ty_domain)
3022
3023    /// Stop resolving a hostname
3024    StopResolveHostname(String), // (hostname)
3025
3026    /// Send query to resolve a service instance.
3027    /// This is used when a PTR record exists but SRV & TXT records are missing.
3028    Resolve(String, u16), // (service_instance_fullname, try_count)
3029
3030    /// Read the current values of the counters
3031    GetMetrics(Sender<Metrics>),
3032
3033    /// Get the current status of the daemon.
3034    GetStatus(Sender<DaemonStatus>),
3035
3036    /// Monitor noticable events in the daemon.
3037    Monitor(Sender<DaemonEvent>),
3038
3039    SetOption(DaemonOption),
3040
3041    GetOption(Sender<DaemonOptionVal>),
3042
3043    /// Proactively confirm a DNS resource record.
3044    ///
3045    /// The intention is to check if a service name or IP address still valid
3046    /// before its TTL expires.
3047    Verify(String, Duration),
3048
3049    Exit(Sender<DaemonStatus>),
3050}
3051
3052impl fmt::Display for Command {
3053    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
3054        match self {
3055            Self::Browse(_, _, _) => write!(f, "Command Browse"),
3056            Self::ResolveHostname(_, _, _, _) => write!(f, "Command ResolveHostname"),
3057            Self::Exit(_) => write!(f, "Command Exit"),
3058            Self::GetStatus(_) => write!(f, "Command GetStatus"),
3059            Self::GetMetrics(_) => write!(f, "Command GetMetrics"),
3060            Self::Monitor(_) => write!(f, "Command Monitor"),
3061            Self::Register(_) => write!(f, "Command Register"),
3062            Self::RegisterResend(_, _) => write!(f, "Command RegisterResend"),
3063            Self::SetOption(_) => write!(f, "Command SetOption"),
3064            Self::GetOption(_) => write!(f, "Command GetOption"),
3065            Self::StopBrowse(_) => write!(f, "Command StopBrowse"),
3066            Self::StopResolveHostname(_) => write!(f, "Command StopResolveHostname"),
3067            Self::Unregister(_, _) => write!(f, "Command Unregister"),
3068            Self::UnregisterResend(_, _) => write!(f, "Command UnregisterResend"),
3069            Self::Resolve(_, _) => write!(f, "Command Resolve"),
3070            Self::Verify(_, _) => write!(f, "Command VerifyResource"),
3071        }
3072    }
3073}
3074
/// Values of daemon options; this is the payload type of the sender carried
/// by `Command::GetOption`.
struct DaemonOptionVal {
    /// Limit on the service name length.
    /// NOTE(review): the leading underscore suggests this field is not read
    /// anywhere yet — confirm.
    _service_name_len_max: u8,
    /// Interval used for IP checks.
    /// NOTE(review): the unit is not visible in this part of the file — confirm.
    ip_check_interval: u64,
}
3079
/// A single daemon option together with its new value; this is the payload of
/// `Command::SetOption`.
///
/// NOTE(review): the per-variant notes below are inferred from the variant
/// names and payload types only — confirm against the code that handles
/// `SetOption`.
#[derive(Debug)]
enum DaemonOption {
    /// Presumably the limit later passed to `check_service_name_length`.
    ServiceNameLenMax(u8),
    /// Presumably the new IP check interval (unit not visible here).
    IpCheckInterval(u64),
    /// Presumably the kinds of interfaces to enable.
    EnableInterface(Vec<IfKind>),
    /// Presumably the kinds of interfaces to disable.
    DisableInterface(Vec<IfKind>),
    /// Presumably switches multicast loopback on or off for IPv4 sockets.
    MulticastLoopV4(bool),
    /// Presumably switches multicast loopback on or off for IPv6 sockets.
    MulticastLoopV6(bool),
}
3089
/// Length in bytes of the service domain suffix supported in this lib,
/// `._tcp.local.`. The other supported suffix, `._udp.local.`, has the same
/// length, so this one constant is used for both.
const DOMAIN_LEN: usize = "._tcp.local.".len();
3092
3093/// Validate the length of "service_name" in a "_<service_name>.<domain_name>." string.
3094fn check_service_name_length(ty_domain: &str, limit: u8) -> Result<()> {
3095    if ty_domain.len() <= DOMAIN_LEN + 1 {
3096        // service name cannot be empty or only '_'.
3097        return Err(e_fmt!("Service type name cannot be empty: {}", ty_domain));
3098    }
3099
3100    let service_name_len = ty_domain.len() - DOMAIN_LEN - 1; // exclude the leading `_`
3101    if service_name_len > limit as usize {
3102        return Err(e_fmt!("Service name length must be <= {} bytes", limit));
3103    }
3104    Ok(())
3105}
3106
3107/// Checks if `name` ends with a valid domain: '._tcp.local.' or '._udp.local.'
3108fn check_domain_suffix(name: &str) -> Result<()> {
3109    if !(name.ends_with("._tcp.local.") || name.ends_with("._udp.local.")) {
3110        return Err(e_fmt!(
3111            "mDNS service {} must end with '._tcp.local.' or '._udp.local.'",
3112            name
3113        ));
3114    }
3115
3116    Ok(())
3117}
3118
3119/// Validate the service name in a fully qualified name.
3120///
3121/// A Full Name = <Instance>.<Service>.<Domain>
3122/// The only `<Domain>` supported are "._tcp.local." and "._udp.local.".
3123///
3124/// Note: this function does not check for the length of the service name.
3125/// Instead, `register_service` method will check the length.
3126fn check_service_name(fullname: &str) -> Result<()> {
3127    check_domain_suffix(fullname)?;
3128
3129    let remaining: Vec<&str> = fullname[..fullname.len() - DOMAIN_LEN].split('.').collect();
3130    let name = remaining.last().ok_or_else(|| e_fmt!("No service name"))?;
3131
3132    if &name[0..1] != "_" {
3133        return Err(e_fmt!("Service name must start with '_'"));
3134    }
3135
3136    let name = &name[1..];
3137
3138    if name.contains("--") {
3139        return Err(e_fmt!("Service name must not contain '--'"));
3140    }
3141
3142    if name.starts_with('-') || name.ends_with('-') {
3143        return Err(e_fmt!("Service name (%s) may not start or end with '-'"));
3144    }
3145
3146    let ascii_count = name.chars().filter(|c| c.is_ascii_alphabetic()).count();
3147    if ascii_count < 1 {
3148        return Err(e_fmt!(
3149            "Service name must contain at least one letter (eg: 'A-Za-z')"
3150        ));
3151    }
3152
3153    Ok(())
3154}
3155
3156/// Validate a hostname.
3157fn check_hostname(hostname: &str) -> Result<()> {
3158    if !hostname.ends_with(".local.") {
3159        return Err(e_fmt!("Hostname must end with '.local.': {hostname}"));
3160    }
3161
3162    if hostname == ".local." {
3163        return Err(e_fmt!(
3164            "The part of the hostname before '.local.' cannot be empty"
3165        ));
3166    }
3167
3168    if hostname.len() > 255 {
3169        return Err(e_fmt!("Hostname length must be <= 255 bytes"));
3170    }
3171
3172    Ok(())
3173}
3174
3175fn call_service_listener(
3176    listeners_map: &HashMap<String, Sender<ServiceEvent>>,
3177    ty_domain: &str,
3178    event: ServiceEvent,
3179) {
3180    if let Some(listener) = listeners_map.get(ty_domain) {
3181        match listener.send(event) {
3182            Ok(()) => trace!("Sent event to listener successfully"),
3183            Err(e) => debug!("Failed to send event: {}", e),
3184        }
3185    }
3186}
3187
3188fn call_hostname_resolution_listener(
3189    listeners_map: &HashMap<String, (Sender<HostnameResolutionEvent>, Option<u64>)>,
3190    hostname: &str,
3191    event: HostnameResolutionEvent,
3192) {
3193    let hostname_lower = hostname.to_lowercase();
3194    if let Some(listener) = listeners_map.get(&hostname_lower).map(|(l, _)| l) {
3195        match listener.send(event) {
3196            Ok(()) => trace!("Sent event to listener successfully"),
3197            Err(e) => debug!("Failed to send event: {}", e),
3198        }
3199    }
3200}
3201
3202/// Returns valid network interfaces in the host system.
3203/// Loopback interfaces are excluded.
3204fn my_ip_interfaces(with_loopback: bool) -> Vec<Interface> {
3205    if_addrs::get_if_addrs()
3206        .unwrap_or_default()
3207        .into_iter()
3208        .filter(|i| !i.is_loopback() || with_loopback)
3209        .collect()
3210}
3211
3212/// Send an outgoing mDNS query or response, and returns the packet bytes.
3213fn send_dns_outgoing(out: &DnsOutgoing, intf: &Interface, sock: &MioUdpSocket) -> Vec<Vec<u8>> {
3214    let qtype = if out.is_query() { "query" } else { "response" };
3215    trace!(
3216        "send outgoing {}: {} questions {} answers {} authorities {} additional",
3217        qtype,
3218        out.questions().len(),
3219        out.answers_count(),
3220        out.authorities().len(),
3221        out.additionals().len()
3222    );
3223    let packet_list = out.to_data_on_wire();
3224    for packet in packet_list.iter() {
3225        multicast_on_intf(packet, intf, sock);
3226    }
3227    packet_list
3228}
3229
3230/// Sends a multicast packet, and returns the packet bytes.
3231fn multicast_on_intf(packet: &[u8], intf: &Interface, socket: &MioUdpSocket) {
3232    if packet.len() > MAX_MSG_ABSOLUTE {
3233        debug!("Drop over-sized packet ({})", packet.len());
3234        return;
3235    }
3236
3237    let addr: SocketAddr = match intf.addr {
3238        if_addrs::IfAddr::V4(_) => SocketAddrV4::new(GROUP_ADDR_V4, MDNS_PORT).into(),
3239        if_addrs::IfAddr::V6(_) => {
3240            let mut sock = SocketAddrV6::new(GROUP_ADDR_V6, MDNS_PORT, 0, 0);
3241            sock.set_scope_id(intf.index.unwrap_or(0)); // Choose iface for multicast
3242            sock.into()
3243        }
3244    };
3245
3246    send_packet(packet, addr, intf, socket);
3247}
3248
3249/// Sends out `packet` to `addr` on the socket in `intf_sock`.
3250fn send_packet(packet: &[u8], addr: SocketAddr, intf: &Interface, sock: &MioUdpSocket) {
3251    match sock.send_to(packet, addr) {
3252        Ok(sz) => trace!("sent out {} bytes on interface {:?}", sz, intf),
3253        Err(e) => debug!("Failed to send to {} via {:?}: {}", addr, &intf, e),
3254    }
3255}
3256
/// Returns true if `name` has the shape of an instance name:
/// <instance>.<service_type>.<_udp|_tcp>.local.
///
/// Only the number of dot-separated pieces is checked: there must be at least
/// five, counting the empty piece after the trailing dot. The pieces
/// themselves are not inspected.
/// Note: <instance> could contain '.' as well.
fn valid_instance_name(name: &str) -> bool {
    // At least five pieces exist exactly when a fifth piece can be reached.
    name.split('.').nth(4).is_some()
}
3263
3264fn notify_monitors(monitors: &mut Vec<Sender<DaemonEvent>>, event: DaemonEvent) {
3265    monitors.retain(|sender| {
3266        if let Err(e) = sender.try_send(event.clone()) {
3267            debug!("notify_monitors: try_send: {}", &e);
3268            if matches!(e, TrySendError::Disconnected(_)) {
3269                return false; // This monitor is dropped.
3270            }
3271        }
3272        true
3273    });
3274}
3275
/// Check if all unique records passed "probing", and if yes, create a packet
/// to announce the service.
///
/// The packet is a response (`FLAGS_QR_RESPONSE | FLAGS_AA`) that contains:
/// - a PTR record from the service type to the service fullname,
/// - a PTR record from the subtype to the service fullname, if `info` has a subtype,
/// - the SRV record, the TXT record, and one address record for every
///   address of `info` on `intf`.
///
/// Names found in `dns_registry.name_changes` replace the original service
/// fullname / hostname.
///
/// Returns `None` when `info` has no address on `intf`, or when at least one
/// of the SRV / TXT / address records is not yet cleared by
/// `dns_registry.is_probing_done` (only consulted if `info.requires_probe()`).
fn prepare_announce(
    info: &ServiceInfo,
    intf: &Interface,
    dns_registry: &mut DnsRegistry,
) -> Option<DnsOutgoing> {
    let intf_addrs = info.get_addrs_on_intf(intf);
    if intf_addrs.is_empty() {
        trace!("No valid addrs to add on intf {:?}", &intf);
        return None;
    }

    // check if we changed our name due to conflicts.
    let service_fullname = match dns_registry.name_changes.get(info.get_fullname()) {
        Some(new_name) => new_name,
        None => info.get_fullname(),
    };

    debug!(
        "prepare to announce service {service_fullname} on {}: {}",
        &intf.name,
        &intf.ip()
    );

    // Number of records that did not pass the probing check below.
    let mut probing_count = 0;
    let mut out = DnsOutgoing::new(FLAGS_QR_RESPONSE | FLAGS_AA);
    // Current time plus a random offset in 0..250, handed to `is_probing_done`.
    // NOTE(review): how `is_probing_done` uses this timestamp is not visible
    // here — confirm.
    let create_time = current_time_millis() + fastrand::u64(0..250);

    // PTR record: service type -> (possibly renamed) service fullname.
    out.add_answer_at_time(
        DnsPointer::new(
            info.get_type(),
            RRType::PTR,
            CLASS_IN,
            info.get_other_ttl(),
            service_fullname.to_string(),
        ),
        0,
    );

    // PTR record for the subtype, if any.
    if let Some(sub) = info.get_subtype() {
        trace!("Adding subdomain {}", sub);
        out.add_answer_at_time(
            DnsPointer::new(
                sub,
                RRType::PTR,
                CLASS_IN,
                info.get_other_ttl(),
                service_fullname.to_string(),
            ),
            0,
        );
    }

    // SRV records.
    // The SRV target is the hostname, replaced by its new name if it was changed.
    let hostname = match dns_registry.name_changes.get(info.get_hostname()) {
        Some(new_name) => new_name.to_string(),
        None => info.get_hostname().to_string(),
    };

    // The record is created under the original fullname; a changed name is
    // attached separately via `set_new_name` below.
    let mut srv = DnsSrv::new(
        info.get_fullname(),
        CLASS_IN | CLASS_CACHE_FLUSH,
        info.get_host_ttl(),
        info.get_priority(),
        info.get_weight(),
        info.get_port(),
        hostname,
    );

    if let Some(new_name) = dns_registry.name_changes.get(info.get_fullname()) {
        srv.get_record_mut().set_new_name(new_name.to_string());
    }

    // Add the record only when no probe is required or probing is done;
    // otherwise remember that something is still being probed.
    if !info.requires_probe()
        || dns_registry.is_probing_done(&srv, info.get_fullname(), create_time)
    {
        out.add_answer_at_time(srv, 0);
    } else {
        probing_count += 1;
    }

    // TXT records.

    let mut txt = DnsTxt::new(
        info.get_fullname(),
        CLASS_IN | CLASS_CACHE_FLUSH,
        info.get_other_ttl(),
        info.generate_txt(),
    );

    if let Some(new_name) = dns_registry.name_changes.get(info.get_fullname()) {
        txt.get_record_mut().set_new_name(new_name.to_string());
    }

    if !info.requires_probe()
        || dns_registry.is_probing_done(&txt, info.get_fullname(), create_time)
    {
        out.add_answer_at_time(txt, 0);
    } else {
        probing_count += 1;
    }

    // Address records. (A and AAAA)

    // Shadows the owned `hostname` above (already moved into the SRV record)
    // with the original hostname; a changed name is attached via `set_new_name`.
    let hostname = info.get_hostname();
    for address in intf_addrs {
        let mut dns_addr = DnsAddress::new(
            hostname,
            ip_address_rr_type(&address),
            CLASS_IN | CLASS_CACHE_FLUSH,
            info.get_host_ttl(),
            address,
        );

        if let Some(new_name) = dns_registry.name_changes.get(hostname) {
            dns_addr.get_record_mut().set_new_name(new_name.to_string());
        }

        if !info.requires_probe()
            || dns_registry.is_probing_done(&dns_addr, info.get_fullname(), create_time)
        {
            out.add_answer_at_time(dns_addr, 0);
        } else {
            probing_count += 1;
        }
    }

    // Do not announce while any record is still being probed.
    if probing_count > 0 {
        return None;
    }

    Some(out)
}
3410
3411/// Send an unsolicited response for owned service via `intf` and `sock`.
3412/// Returns true if sent out successfully.
3413fn announce_service_on_intf(
3414    dns_registry: &mut DnsRegistry,
3415    info: &ServiceInfo,
3416    intf: &Interface,
3417    sock: &MioUdpSocket,
3418) -> bool {
3419    if let Some(out) = prepare_announce(info, intf, dns_registry) {
3420        send_dns_outgoing(&out, intf, sock);
3421        return true;
3422    }
3423    false
3424}
3425
/// Returns a new name based on the `original` to avoid conflicts.
///
/// Only the first label (the text before the first `.`) is changed. If that
/// label ends with ` (<num>)`, the number is incremented; otherwise ` (2)` is
/// appended. A number that cannot be incremented without overflowing `u32`
/// is treated like any other non-numeric suffix, i.e. ` (2)` is appended.
///
/// Examples:
/// - `foo.local.` becomes `foo (2).local.`
/// - `foo (2).local.` becomes `foo (3).local.`
/// - `foo (9)` becomes `foo (10)`
/// - `foo (abc)` becomes `foo (abc) (2)`
fn name_change(original: &str) -> String {
    // Split into the first label and the untouched remainder, which keeps
    // its leading '.'.
    let (first_label, rest) = match original.find('.') {
        Some(dot) => original.split_at(dot),
        None => (original, ""),
    };

    // Look for a trailing ` (<num>)` and bump the number. `checked_add`
    // avoids an arithmetic overflow for `u32::MAX`.
    let bumped = first_label
        .strip_suffix(')')
        .and_then(|head| head.rsplit_once(" ("))
        .and_then(|(base, digits)| {
            let next = digits.parse::<u32>().ok()?.checked_add(1)?;
            Some(format!("{base} ({next})"))
        });

    let new_label = bumped.unwrap_or_else(|| format!("{first_label} (2)"));
    format!("{new_label}{rest}")
}
3461
/// Returns a new name based on the `original` to avoid conflicts.
///
/// Only the first label (the text before the first `.`) is changed. If that
/// label ends with `-<num>`, the number is incremented; otherwise `-2` is
/// appended. A number that cannot be incremented without overflowing `u32`
/// is treated like any other non-numeric suffix, i.e. `-2` is appended.
///
/// Examples:
/// - `foo.local.` becomes `foo-2.local.`
/// - `foo-2.local.` becomes `foo-3.local.`
/// - `foo` becomes `foo-2`
fn hostname_change(original: &str) -> String {
    // Split into the first label and the untouched remainder, which keeps
    // its leading '.'.
    let (first_label, rest) = match original.find('.') {
        Some(dot) => original.split_at(dot),
        None => (original, ""),
    };

    // Try to read everything after the last hyphen as a number and bump it.
    // `checked_add` avoids an arithmetic overflow for `u32::MAX`.
    let bumped = first_label.rsplit_once('-').and_then(|(base, digits)| {
        let next = digits.parse::<u32>().ok()?.checked_add(1)?;
        Some(format!("{base}-{next}"))
    });

    let new_label = bumped.unwrap_or_else(|| format!("{first_label}-2"));
    format!("{new_label}{rest}")
}
3489
3490fn add_answer_with_additionals(
3491    out: &mut DnsOutgoing,
3492    msg: &DnsIncoming,
3493    service: &ServiceInfo,
3494    intf: &Interface,
3495    dns_registry: &DnsRegistry,
3496) {
3497    let intf_addrs = service.get_addrs_on_intf(intf);
3498    if intf_addrs.is_empty() {
3499        trace!("No addrs on LAN of intf {:?}", intf);
3500        return;
3501    }
3502
3503    // check if we changed our name due to conflicts.
3504    let service_fullname = match dns_registry.name_changes.get(service.get_fullname()) {
3505        Some(new_name) => new_name,
3506        None => service.get_fullname(),
3507    };
3508
3509    let hostname = match dns_registry.name_changes.get(service.get_hostname()) {
3510        Some(new_name) => new_name,
3511        None => service.get_hostname(),
3512    };
3513
3514    let ptr_added = out.add_answer(
3515        msg,
3516        DnsPointer::new(
3517            service.get_type(),
3518            RRType::PTR,
3519            CLASS_IN,
3520            service.get_other_ttl(),
3521            service_fullname.to_string(),
3522        ),
3523    );
3524
3525    if !ptr_added {
3526        trace!("answer was not added for msg {:?}", msg);
3527        return;
3528    }
3529
3530    if let Some(sub) = service.get_subtype() {
3531        trace!("Adding subdomain {}", sub);
3532        out.add_additional_answer(DnsPointer::new(
3533            sub,
3534            RRType::PTR,
3535            CLASS_IN,
3536            service.get_other_ttl(),
3537            service_fullname.to_string(),
3538        ));
3539    }
3540
3541    // Add recommended additional answers according to
3542    // https://tools.ietf.org/html/rfc6763#section-12.1.
3543    out.add_additional_answer(DnsSrv::new(
3544        service_fullname,
3545        CLASS_IN | CLASS_CACHE_FLUSH,
3546        service.get_host_ttl(),
3547        service.get_priority(),
3548        service.get_weight(),
3549        service.get_port(),
3550        hostname.to_string(),
3551    ));
3552
3553    out.add_additional_answer(DnsTxt::new(
3554        service_fullname,
3555        CLASS_IN | CLASS_CACHE_FLUSH,
3556        service.get_host_ttl(),
3557        service.generate_txt(),
3558    ));
3559
3560    for address in intf_addrs {
3561        out.add_additional_answer(DnsAddress::new(
3562            hostname,
3563            ip_address_rr_type(&address),
3564            CLASS_IN | CLASS_CACHE_FLUSH,
3565            service.get_host_ttl(),
3566            address,
3567        ));
3568    }
3569}
3570
3571#[cfg(test)]
3572mod tests {
3573    use super::{
3574        check_domain_suffix, check_service_name_length, hostname_change, my_ip_interfaces,
3575        name_change, new_socket_bind, send_dns_outgoing, valid_instance_name,
3576        HostnameResolutionEvent, ServiceDaemon, ServiceEvent, ServiceInfo, GROUP_ADDR_V4,
3577        MDNS_PORT,
3578    };
3579    use crate::{
3580        dns_parser::{DnsOutgoing, DnsPointer, RRType, CLASS_IN, FLAGS_AA, FLAGS_QR_RESPONSE},
3581        service_daemon::check_hostname,
3582    };
3583    use std::{
3584        net::{SocketAddr, SocketAddrV4},
3585        time::Duration,
3586    };
3587    use test_log::test;
3588
3589    #[test]
3590    fn test_socketaddr_print() {
3591        let addr: SocketAddr = SocketAddrV4::new(GROUP_ADDR_V4, MDNS_PORT).into();
3592        let print = format!("{}", addr);
3593        assert_eq!(print, "224.0.0.251:5353");
3594    }
3595
3596    #[test]
3597    fn test_instance_name() {
3598        assert!(valid_instance_name("my-laser._printer._tcp.local."));
3599        assert!(valid_instance_name("my-laser.._printer._tcp.local."));
3600        assert!(!valid_instance_name("_printer._tcp.local."));
3601    }
3602
3603    #[test]
3604    fn test_check_service_name_length() {
3605        let result = check_service_name_length("_tcp", 100);
3606        assert!(result.is_err());
3607        if let Err(e) = result {
3608            println!("{}", e);
3609        }
3610    }
3611
3612    #[test]
3613    fn test_check_hostname() {
3614        // valid hostnames
3615        for hostname in &[
3616            "my_host.local.",
3617            &("A".repeat(255 - ".local.".len()) + ".local."),
3618        ] {
3619            let result = check_hostname(hostname);
3620            assert!(result.is_ok());
3621        }
3622
3623        // erroneous hostnames
3624        for hostname in &[
3625            "my_host.local",
3626            ".local.",
3627            &("A".repeat(256 - ".local.".len()) + ".local."),
3628        ] {
3629            let result = check_hostname(hostname);
3630            assert!(result.is_err());
3631            if let Err(e) = result {
3632                println!("{}", e);
3633            }
3634        }
3635    }
3636
3637    #[test]
3638    fn test_check_domain_suffix() {
3639        assert!(check_domain_suffix("_missing_dot._tcp.local").is_err());
3640        assert!(check_domain_suffix("_missing_bar.tcp.local.").is_err());
3641        assert!(check_domain_suffix("_mis_spell._tpp.local.").is_err());
3642        assert!(check_domain_suffix("_mis_spell._upp.local.").is_err());
3643        assert!(check_domain_suffix("_has_dot._tcp.local.").is_ok());
3644        assert!(check_domain_suffix("_goodname._udp.local.").is_ok());
3645    }
3646
    // Scenario: a service is registered and resolved, the browse is stopped,
    // a PTR record with TTL 0 for that service is multicast, and then a second
    // browse is started. The second browse must resolve the service again.
    #[test]
    fn test_service_with_temporarily_invalidated_ptr() {
        // Create a daemon
        let d = ServiceDaemon::new().expect("Failed to create daemon");

        let service = "_test_inval_ptr._udp.local.";
        let host_name = "my_host_tmp_invalidated_ptr.local.";
        let intfs: Vec<_> = my_ip_interfaces(false);
        let intf_ips: Vec<_> = intfs.iter().map(|intf| intf.ip()).collect();
        let port = 5201;
        let my_service =
            ServiceInfo::new(service, "my_instance", host_name, &intf_ips[..], port, None)
                .expect("invalid service info")
                .enable_addr_auto();
        let result = d.register(my_service.clone());
        assert!(result.is_ok());

        // Browse for a service
        let browse_chan = d.browse(service).unwrap();
        let timeout = Duration::from_secs(2);
        let mut resolved = false;

        // Wait for the first resolution; give up once no event arrives within `timeout`.
        while let Ok(event) = browse_chan.recv_timeout(timeout) {
            match event {
                ServiceEvent::ServiceResolved(info) => {
                    resolved = true;
                    println!("Resolved a service of {}", &info.get_fullname());
                    break;
                }
                e => {
                    println!("Received event {:?}", e);
                }
            }
        }

        assert!(resolved);

        println!("Stopping browse of {}", service);
        // Pause browsing so restarting will cause a new immediate query.
        // Unregistering will not work here, it will invalidate all the records.
        d.stop_browse(service).unwrap();

        // Ensure the search is stopped.
        // Reduces the chance of receiving an answer adding the ptr back to the
        // cache causing the later browse to return directly from the cache.
        // (which invalidates what this test is trying to test for.)
        let mut stopped = false;
        while let Ok(event) = browse_chan.recv_timeout(timeout) {
            match event {
                ServiceEvent::SearchStopped(_) => {
                    stopped = true;
                    println!("Stopped browsing service");
                    break;
                }
                // Other `ServiceResolved` messages may be received
                // here as they come from different interfaces.
                // That's fine for this test.
                e => {
                    println!("Received event {:?}", e);
                }
            }
        }

        assert!(stopped);

        // Invalidate the ptr from the service to the host.
        // The record is built with a TTL of 0.
        let invalidate_ptr_packet = DnsPointer::new(
            my_service.get_type(),
            RRType::PTR,
            CLASS_IN,
            0,
            my_service.get_fullname().to_string(),
        );

        let mut packet_buffer = DnsOutgoing::new(FLAGS_QR_RESPONSE | FLAGS_AA);
        packet_buffer.add_additional_answer(invalidate_ptr_packet);

        // Multicast the invalidation on every interface, each through a newly bound socket.
        for intf in intfs {
            let sock = new_socket_bind(&intf, true).unwrap();
            send_dns_outgoing(&packet_buffer, &intf, &sock);
        }

        println!(
            "Sent PTR record invalidation. Starting second browse for {}",
            service
        );

        // Restart the browse to force the sender to re-send the announcements.
        let browse_chan = d.browse(service).unwrap();

        // The service must be resolved again by the second browse.
        resolved = false;
        while let Ok(event) = browse_chan.recv_timeout(timeout) {
            match event {
                ServiceEvent::ServiceResolved(info) => {
                    resolved = true;
                    println!("Resolved a service of {}", &info.get_fullname());
                    break;
                }
                e => {
                    println!("Received event {:?}", e);
                }
            }
        }

        assert!(resolved);
        d.shutdown().unwrap();
    }
3754
3755    #[test]
3756    fn test_expired_srv() {
3757        // construct service info
3758        let service_type = "_expired-srv._udp.local.";
3759        let instance = "test_instance";
3760        let host_name = "expired_srv_host.local.";
3761        let mut my_service = ServiceInfo::new(service_type, instance, host_name, "", 5023, None)
3762            .unwrap()
3763            .enable_addr_auto();
3764        // let fullname = my_service.get_fullname().to_string();
3765
3766        // set SRV to expire soon.
3767        let new_ttl = 3; // for testing only.
3768        my_service._set_host_ttl(new_ttl);
3769
3770        // register my service
3771        let mdns_server = ServiceDaemon::new().expect("Failed to create mdns server");
3772        let result = mdns_server.register(my_service);
3773        assert!(result.is_ok());
3774
3775        let mdns_client = ServiceDaemon::new().expect("Failed to create mdns client");
3776        let browse_chan = mdns_client.browse(service_type).unwrap();
3777        let timeout = Duration::from_secs(2);
3778        let mut resolved = false;
3779
3780        while let Ok(event) = browse_chan.recv_timeout(timeout) {
3781            match event {
3782                ServiceEvent::ServiceResolved(info) => {
3783                    resolved = true;
3784                    println!("Resolved a service of {}", &info.get_fullname());
3785                    break;
3786                }
3787                _ => {}
3788            }
3789        }
3790
3791        assert!(resolved);
3792
3793        // Exit the server so that no more responses.
3794        mdns_server.shutdown().unwrap();
3795
3796        // SRV record in the client cache will expire.
3797        let expire_timeout = Duration::from_secs(new_ttl as u64);
3798        while let Ok(event) = browse_chan.recv_timeout(expire_timeout) {
3799            match event {
3800                ServiceEvent::ServiceRemoved(service_type, full_name) => {
3801                    println!("Service removed: {}: {}", &service_type, &full_name);
3802                    break;
3803                }
3804                _ => {}
3805            }
3806        }
3807    }
3808
3809    #[test]
3810    fn test_hostname_resolution_address_removed() {
3811        // Create a mDNS server
3812        let server = ServiceDaemon::new().expect("Failed to create server");
3813        let hostname = "addr_remove_host._tcp.local.";
3814        let service_ip_addr = my_ip_interfaces(false)
3815            .iter()
3816            .find(|iface| iface.ip().is_ipv4())
3817            .map(|iface| iface.ip())
3818            .unwrap();
3819
3820        let mut my_service = ServiceInfo::new(
3821            "_host_res_test._tcp.local.",
3822            "my_instance",
3823            hostname,
3824            &service_ip_addr,
3825            1234,
3826            None,
3827        )
3828        .expect("invalid service info");
3829
3830        // Set a short TTL for addresses for testing.
3831        let addr_ttl = 2;
3832        my_service._set_host_ttl(addr_ttl); // Expire soon
3833
3834        server.register(my_service).unwrap();
3835
3836        // Create a mDNS client for resolving the hostname.
3837        let client = ServiceDaemon::new().expect("Failed to create client");
3838        let event_receiver = client.resolve_hostname(hostname, None).unwrap();
3839        let resolved = loop {
3840            match event_receiver.recv() {
3841                Ok(HostnameResolutionEvent::AddressesFound(found_hostname, addresses)) => {
3842                    assert!(found_hostname == hostname);
3843                    assert!(addresses.contains(&service_ip_addr));
3844                    println!("address found: {:?}", &addresses);
3845                    break true;
3846                }
3847                Ok(HostnameResolutionEvent::SearchStopped(_)) => break false,
3848                Ok(_event) => {}
3849                Err(_) => break false,
3850            }
3851        };
3852
3853        assert!(resolved);
3854
3855        // Shutdown the server so no more responses / refreshes for addresses.
3856        server.shutdown().unwrap();
3857
3858        // Wait till hostname address record expires, with 1 second grace period.
3859        let timeout = Duration::from_secs(addr_ttl as u64 + 1);
3860        let removed = loop {
3861            match event_receiver.recv_timeout(timeout) {
3862                Ok(HostnameResolutionEvent::AddressesRemoved(removed_host, addresses)) => {
3863                    assert!(removed_host == hostname);
3864                    assert!(addresses.contains(&service_ip_addr));
3865
3866                    println!(
3867                        "address removed: hostname: {} addresses: {:?}",
3868                        &hostname, &addresses
3869                    );
3870                    break true;
3871                }
3872                Ok(_event) => {}
3873                Err(_) => {
3874                    break false;
3875                }
3876            }
3877        };
3878
3879        assert!(removed);
3880
3881        client.shutdown().unwrap();
3882    }
3883
3884    #[test]
3885    fn test_refresh_ptr() {
3886        // construct service info
3887        let service_type = "_refresh-ptr._udp.local.";
3888        let instance = "test_instance";
3889        let host_name = "refresh_ptr_host.local.";
3890        let service_ip_addr = my_ip_interfaces(false)
3891            .iter()
3892            .find(|iface| iface.ip().is_ipv4())
3893            .map(|iface| iface.ip())
3894            .unwrap();
3895
3896        let mut my_service = ServiceInfo::new(
3897            service_type,
3898            instance,
3899            host_name,
3900            &service_ip_addr,
3901            5023,
3902            None,
3903        )
3904        .unwrap();
3905
3906        let new_ttl = 3; // for testing only.
3907        my_service._set_other_ttl(new_ttl);
3908
3909        // register my service
3910        let mdns_server = ServiceDaemon::new().expect("Failed to create mdns server");
3911        let result = mdns_server.register(my_service);
3912        assert!(result.is_ok());
3913
3914        let mdns_client = ServiceDaemon::new().expect("Failed to create mdns client");
3915        let browse_chan = mdns_client.browse(service_type).unwrap();
3916        let timeout = Duration::from_millis(1500); // Give at least 1 second for the service probing.
3917        let mut resolved = false;
3918
3919        // resolve the service first.
3920        while let Ok(event) = browse_chan.recv_timeout(timeout) {
3921            match event {
3922                ServiceEvent::ServiceResolved(info) => {
3923                    resolved = true;
3924                    println!("Resolved a service of {}", &info.get_fullname());
3925                    break;
3926                }
3927                _ => {}
3928            }
3929        }
3930
3931        assert!(resolved);
3932
3933        // wait over 80% of TTL, and refresh PTR should be sent out.
3934        let timeout = Duration::from_millis(new_ttl as u64 * 1000 * 90 / 100);
3935        while let Ok(event) = browse_chan.recv_timeout(timeout) {
3936            println!("event: {:?}", &event);
3937        }
3938
3939        // verify refresh counter.
3940        let metrics_chan = mdns_client.get_metrics().unwrap();
3941        let metrics = metrics_chan.recv_timeout(timeout).unwrap();
3942        let refresh_counter = metrics["cache-refresh-ptr"];
3943        assert_eq!(refresh_counter, 1);
3944
3945        // Exit the server so that no more responses.
3946        mdns_server.shutdown().unwrap();
3947        mdns_client.shutdown().unwrap();
3948    }
3949
3950    #[test]
3951    fn test_name_change() {
3952        assert_eq!(name_change("foo.local."), "foo (2).local.");
3953        assert_eq!(name_change("foo (2).local."), "foo (3).local.");
3954        assert_eq!(name_change("foo (9).local."), "foo (10).local.");
3955        assert_eq!(name_change("foo"), "foo (2)");
3956        assert_eq!(name_change("foo (2)"), "foo (3)");
3957        assert_eq!(name_change(""), " (2)");
3958
3959        // Additional edge cases
3960        assert_eq!(name_change("foo (abc)"), "foo (abc) (2)"); // Invalid number
3961        assert_eq!(name_change("foo (2"), "foo (2 (2)"); // Missing closing parenthesis
3962        assert_eq!(name_change("foo (2) extra"), "foo (2) extra (2)"); // Extra text after number
3963    }
3964
3965    #[test]
3966    fn test_hostname_change() {
3967        assert_eq!(hostname_change("foo.local."), "foo-2.local.");
3968        assert_eq!(hostname_change("foo"), "foo-2");
3969        assert_eq!(hostname_change("foo-2.local."), "foo-3.local.");
3970        assert_eq!(hostname_change("foo-9"), "foo-10");
3971        assert_eq!(hostname_change("test-42.domain."), "test-43.domain.");
3972    }
3973}