mdns_sd/
service_daemon.rs

1//! Service daemon for mDNS Service Discovery.
2
3// How DNS-based Service Discovery works in a nutshell:
4//
5// (excerpt from RFC 6763)
6// .... that a particular service instance can be
7//    described using a DNS SRV [RFC2782] and DNS TXT [RFC1035] record.
8//    The SRV record has a name of the form "<Instance>.<Service>.<Domain>"
9//    and gives the target host and port where the service instance can be
10//    reached.  The DNS TXT record of the same name gives additional
11//    information about this instance, in a structured form using key/value
12//    pairs, described in Section 6.  A client discovers the list of
13//    available instances of a given service type using a query for a DNS
14//    PTR [RFC1035] record with a name of the form "<Service>.<Domain>",
15//    which returns a set of zero or more names, which are the names of the
16//    aforementioned DNS SRV/TXT record pairs.
17//
18// Some naming conventions in this source code:
19//
20// `ty_domain` refers to service type together with domain name, i.e. <service>.<domain>.
// Every <service> consists of two labels: the service name itself and either "_udp" or "_tcp".
22// See RFC 6763 section 7 Service Names.
23//     for example: `_my-service._udp.local.`
24//
25// `fullname` refers to a full Service Instance Name, i.e. <instance>.<service>.<domain>
26//     for example: `my_home._my-service._udp.local.`
27//
// In mDNS and DNS, the basic data structure is the "Resource Record" (RR), whereas
// in Service Discovery, the basic data structure is the "Service Info". One Service Info
// corresponds to a set of DNS Resource Records.
31#[cfg(feature = "logging")]
32use crate::log::{debug, trace};
33use crate::{
34    dns_cache::{current_time_millis, DnsCache},
35    dns_parser::{
36        ip_address_rr_type, DnsAddress, DnsEntryExt, DnsIncoming, DnsOutgoing, DnsPointer,
37        DnsRecordBox, DnsRecordExt, DnsSrv, DnsTxt, RRType, CLASS_CACHE_FLUSH, CLASS_IN, FLAGS_AA,
38        FLAGS_QR_QUERY, FLAGS_QR_RESPONSE, MAX_MSG_ABSOLUTE,
39    },
40    error::{e_fmt, Error, Result},
41    service_info::{
42        split_sub_domain, valid_ip_on_intf, DnsRegistry, Probe, ServiceInfo, ServiceStatus,
43    },
44    Receiver,
45};
46use flume::{bounded, Sender, TrySendError};
47use if_addrs::{IfAddr, Interface};
48use mio::{net::UdpSocket as MioUdpSocket, Poll};
49use socket2::Socket;
50use std::{
51    cmp::{self, Reverse},
52    collections::{BinaryHeap, HashMap, HashSet},
53    fmt,
54    net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6, UdpSocket},
55    str, thread,
56    time::Duration,
57    vec,
58};
59
/// The default max length of the service name without domain, not including the
/// leading underscore (`_`). It is set to 15 per
/// [RFC 6763 section 7.2](https://www.rfc-editor.org/rfc/rfc6763#section-7.2).
pub const SERVICE_NAME_LEN_MAX_DEFAULT: u8 = 15;

/// The default timeout for [ServiceDaemon::verify] is 10 seconds, per
/// [RFC 6762 section 10.4](https://datatracker.ietf.org/doc/html/rfc6762#section-10.4)
pub const VERIFY_TIMEOUT_DEFAULT: Duration = Duration::from_secs(10);

/// UDP port that the mDNS sockets bind to and send multicast packets to.
const MDNS_PORT: u16 = 5353;
/// IPv4 multicast group (224.0.0.251) joined by the mDNS sockets.
const GROUP_ADDR_V4: Ipv4Addr = Ipv4Addr::new(224, 0, 0, 251);
/// IPv6 multicast group (ff02::fb) joined by the mDNS sockets.
const GROUP_ADDR_V6: Ipv6Addr = Ipv6Addr::new(0xff02, 0, 0, 0, 0, 0, 0, 0xfb);
/// IPv4 loopback address that the daemon's signal socket is bound to.
const LOOPBACK_V4: Ipv4Addr = Ipv4Addr::new(127, 0, 0, 1);

// NOTE(review): not referenced in this part of the file; presumably a wait
// time in milliseconds used when resolving services -- confirm at its use site.
const RESOLVE_WAIT_IN_MILLIS: u64 = 500;
75
/// Response status code for the service `unregister` call.
///
/// It is received through the channel returned by [`ServiceDaemon::unregister`].
/// The type is a plain, field-less enum, so it is cheap to copy and can be
/// compared directly (e.g. `status == UnregisterStatus::OK`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum UnregisterStatus {
    /// Unregister was successful.
    OK,
    /// The service was not found in the registration.
    NotFound,
}
84
/// Status code for the service daemon.
///
/// It is received through the channels returned by [`ServiceDaemon::status`]
/// and [`ServiceDaemon::shutdown`].
#[derive(Debug, PartialEq, Clone, Eq)]
#[non_exhaustive]
pub enum DaemonStatus {
    /// The daemon is running as normal.
    Running,

    /// The daemon has been shut down.
    Shutdown,
}
95
/// Different counters included in the metrics.
/// Currently all counters are for outgoing packets.
#[derive(Hash, Eq, PartialEq)]
enum Counter {
    Register,
    RegisterResend,
    Unregister,
    UnregisterResend,
    Browse,
    ResolveHostname,
    Respond,
    CacheRefreshPTR,
    CacheRefreshSRV,
    CacheRefreshAddr,
    KnownAnswerSuppression,
}

/// Renders each counter as its kebab-case name.
impl fmt::Display for Counter {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        // Pick the static label first, then write it out in a single place.
        let label = match self {
            Self::Register => "register",
            Self::RegisterResend => "register-resend",
            Self::Unregister => "unregister",
            Self::UnregisterResend => "unregister-resend",
            Self::Browse => "browse",
            Self::ResolveHostname => "resolve-hostname",
            Self::Respond => "respond",
            Self::CacheRefreshPTR => "cache-refresh-ptr",
            Self::CacheRefreshSRV => "cache-refresh-srv",
            Self::CacheRefreshAddr => "cache-refresh-addr",
            Self::KnownAnswerSuppression => "known-answer-suppression",
        };
        f.write_str(label)
    }
}
130
/// The metrics are a `HashMap` of (name_key, i64_value).
/// The main purpose is to help monitor the mDNS packet traffic.
pub type Metrics = HashMap<String, i64>;
134
/// Poller token value used for the daemon's signal socket.
const SIGNAL_SOCK_EVENT_KEY: usize = usize::MAX - 1; // avoids overlapping with zc.poll_ids
136
/// A daemon thread for mDNS
///
/// This struct provides a handle and an API to the daemon. It is cloneable.
#[derive(Clone)]
pub struct ServiceDaemon {
    /// Sender handle of the channel to the daemon.
    sender: Sender<Command>,

    /// Send to this addr to signal that a `Command` is coming.
    ///
    /// The daemon listens on this addr together with other mDNS sockets,
    /// to avoid busy polling the flume channel. If there is a way to poll
    /// the channel and mDNS sockets together, then this can be removed.
    signal_addr: SocketAddr,
}
152
153impl ServiceDaemon {
154    /// Creates a new daemon and spawns a thread to run the daemon.
155    ///
156    /// The daemon (re)uses the default mDNS port 5353. To keep it simple, we don't
157    /// ask callers to set the port.
158    pub fn new() -> Result<Self> {
159        // Use port 0 to allow the system assign a random available port,
160        // no need for a pre-defined port number.
161        let signal_addr = SocketAddrV4::new(LOOPBACK_V4, 0);
162
163        let signal_sock = UdpSocket::bind(signal_addr)
164            .map_err(|e| e_fmt!("failed to create signal_sock for daemon: {}", e))?;
165
166        // Get the socket with the OS chosen port
167        let signal_addr = signal_sock
168            .local_addr()
169            .map_err(|e| e_fmt!("failed to get signal sock addr: {}", e))?;
170
171        // Must be nonblocking so we can listen to it together with mDNS sockets.
172        signal_sock
173            .set_nonblocking(true)
174            .map_err(|e| e_fmt!("failed to set nonblocking for signal socket: {}", e))?;
175
176        let poller = Poll::new().map_err(|e| e_fmt!("failed to create mio Poll: {e}"))?;
177
178        let (sender, receiver) = bounded(100);
179
180        // Spawn the daemon thread
181        let mio_sock = MioUdpSocket::from_std(signal_sock);
182        thread::Builder::new()
183            .name("mDNS_daemon".to_string())
184            .spawn(move || Self::daemon_thread(mio_sock, poller, receiver))
185            .map_err(|e| e_fmt!("thread builder failed to spawn: {}", e))?;
186
187        Ok(Self {
188            sender,
189            signal_addr,
190        })
191    }
192
193    /// Sends `cmd` to the daemon via its channel, and sends a signal
194    /// to its sock addr to notify.
195    fn send_cmd(&self, cmd: Command) -> Result<()> {
196        let cmd_name = cmd.to_string();
197
198        // First, send to the flume channel.
199        self.sender.try_send(cmd).map_err(|e| match e {
200            TrySendError::Full(_) => Error::Again,
201            e => e_fmt!("flume::channel::send failed: {}", e),
202        })?;
203
204        // Second, send a signal to notify the daemon.
205        let addr = SocketAddrV4::new(LOOPBACK_V4, 0);
206        let socket = UdpSocket::bind(addr)
207            .map_err(|e| e_fmt!("Failed to create socket to send signal: {}", e))?;
208        socket
209            .send_to(cmd_name.as_bytes(), self.signal_addr)
210            .map_err(|e| {
211                e_fmt!(
212                    "signal socket send_to {} ({}) failed: {}",
213                    self.signal_addr,
214                    cmd_name,
215                    e
216                )
217            })?;
218
219        Ok(())
220    }
221
222    /// Starts browsing for a specific service type.
223    ///
224    /// `service_type` must end with a valid mDNS domain: '._tcp.local.' or '._udp.local.'
225    ///
226    /// Returns a channel `Receiver` to receive events about the service. The caller
227    /// can call `.recv_async().await` on this receiver to handle events in an
228    /// async environment or call `.recv()` in a sync environment.
229    ///
230    /// When a new instance is found, the daemon automatically tries to resolve, i.e.
231    /// finding more details, i.e. SRV records and TXT records.
232    pub fn browse(&self, service_type: &str) -> Result<Receiver<ServiceEvent>> {
233        check_domain_suffix(service_type)?;
234
235        let (resp_s, resp_r) = bounded(10);
236        self.send_cmd(Command::Browse(service_type.to_string(), 1, resp_s))?;
237        Ok(resp_r)
238    }
239
240    /// Stops searching for a specific service type.
241    ///
242    /// When an error is returned, the caller should retry only when
243    /// the error is `Error::Again`, otherwise should log and move on.
244    pub fn stop_browse(&self, ty_domain: &str) -> Result<()> {
245        self.send_cmd(Command::StopBrowse(ty_domain.to_string()))
246    }
247
248    /// Starts querying for the ip addresses of a hostname.
249    ///
250    /// Returns a channel `Receiver` to receive events about the hostname.
251    /// The caller can call `.recv_async().await` on this receiver to handle events in an
252    /// async environment or call `.recv()` in a sync environment.
253    ///
254    /// The `timeout` is specified in milliseconds.
255    pub fn resolve_hostname(
256        &self,
257        hostname: &str,
258        timeout: Option<u64>,
259    ) -> Result<Receiver<HostnameResolutionEvent>> {
260        check_hostname(hostname)?;
261        let (resp_s, resp_r) = bounded(10);
262        self.send_cmd(Command::ResolveHostname(
263            hostname.to_string(),
264            1,
265            resp_s,
266            timeout,
267        ))?;
268        Ok(resp_r)
269    }
270
271    /// Stops querying for the ip addresses of a hostname.
272    ///
273    /// When an error is returned, the caller should retry only when
274    /// the error is `Error::Again`, otherwise should log and move on.
275    pub fn stop_resolve_hostname(&self, hostname: &str) -> Result<()> {
276        self.send_cmd(Command::StopResolveHostname(hostname.to_string()))
277    }
278
279    /// Registers a service provided by this host.
280    ///
281    /// If `service_info` has no addresses yet and its `addr_auto` is enabled,
282    /// this method will automatically fill in addresses from the host.
283    ///
284    /// To re-announce a service with an updated `service_info`, just call
285    /// this `register` function again. No need to call `unregister` first.
286    pub fn register(&self, service_info: ServiceInfo) -> Result<()> {
287        check_service_name(service_info.get_fullname())?;
288        check_hostname(service_info.get_hostname())?;
289
290        self.send_cmd(Command::Register(service_info))
291    }
292
293    /// Unregisters a service. This is a graceful shutdown of a service.
294    ///
295    /// Returns a channel receiver that is used to receive the status code
296    /// of the unregister.
297    ///
298    /// When an error is returned, the caller should retry only when
299    /// the error is `Error::Again`, otherwise should log and move on.
300    pub fn unregister(&self, fullname: &str) -> Result<Receiver<UnregisterStatus>> {
301        let (resp_s, resp_r) = bounded(1);
302        self.send_cmd(Command::Unregister(fullname.to_lowercase(), resp_s))?;
303        Ok(resp_r)
304    }
305
306    /// Starts to monitor events from the daemon.
307    ///
308    /// Returns a channel [`Receiver`] of [`DaemonEvent`].
309    pub fn monitor(&self) -> Result<Receiver<DaemonEvent>> {
310        let (resp_s, resp_r) = bounded(100);
311        self.send_cmd(Command::Monitor(resp_s))?;
312        Ok(resp_r)
313    }
314
315    /// Shuts down the daemon thread and returns a channel to receive the status.
316    ///
317    /// When an error is returned, the caller should retry only when
318    /// the error is `Error::Again`, otherwise should log and move on.
319    pub fn shutdown(&self) -> Result<Receiver<DaemonStatus>> {
320        let (resp_s, resp_r) = bounded(1);
321        self.send_cmd(Command::Exit(resp_s))?;
322        Ok(resp_r)
323    }
324
325    /// Returns the status of the daemon.
326    ///
327    /// When an error is returned, the caller should retry only when
328    /// the error is `Error::Again`, otherwise should consider the daemon
329    /// stopped working and move on.
330    pub fn status(&self) -> Result<Receiver<DaemonStatus>> {
331        let (resp_s, resp_r) = bounded(1);
332
333        if self.sender.is_disconnected() {
334            resp_s
335                .send(DaemonStatus::Shutdown)
336                .map_err(|e| e_fmt!("failed to send daemon status to the client: {}", e))?;
337        } else {
338            self.send_cmd(Command::GetStatus(resp_s))?;
339        }
340
341        Ok(resp_r)
342    }
343
344    /// Returns a channel receiver for the metrics, e.g. input/output counters.
345    ///
346    /// The metrics returned is a snapshot. Hence the caller should call
347    /// this method repeatedly if they want to monitor the metrics continuously.
348    pub fn get_metrics(&self) -> Result<Receiver<Metrics>> {
349        let (resp_s, resp_r) = bounded(1);
350        self.send_cmd(Command::GetMetrics(resp_s))?;
351        Ok(resp_r)
352    }
353
354    /// Change the max length allowed for a service name.
355    ///
356    /// As RFC 6763 defines a length max for a service name, a user should not call
357    /// this method unless they have to. See [`SERVICE_NAME_LEN_MAX_DEFAULT`].
358    ///
359    /// `len_max` is capped at an internal limit, which is currently 30.
360    pub fn set_service_name_len_max(&self, len_max: u8) -> Result<()> {
361        const SERVICE_NAME_LEN_MAX_LIMIT: u8 = 30; // Double the default length max.
362
363        if len_max > SERVICE_NAME_LEN_MAX_LIMIT {
364            return Err(Error::Msg(format!(
365                "service name length max {} is too large",
366                len_max
367            )));
368        }
369
370        self.send_cmd(Command::SetOption(DaemonOption::ServiceNameLenMax(len_max)))
371    }
372
373    /// Include interfaces that match `if_kind` for this service daemon.
374    ///
375    /// For example:
376    /// ```ignore
377    ///     daemon.enable_interface("en0")?;
378    /// ```
379    pub fn enable_interface(&self, if_kind: impl IntoIfKindVec) -> Result<()> {
380        let if_kind_vec = if_kind.into_vec();
381        self.send_cmd(Command::SetOption(DaemonOption::EnableInterface(
382            if_kind_vec.kinds,
383        )))
384    }
385
386    /// Ignore/exclude interfaces that match `if_kind` for this daemon.
387    ///
388    /// For example:
389    /// ```ignore
390    ///     daemon.disable_interface(IfKind::IPv6)?;
391    /// ```
392    pub fn disable_interface(&self, if_kind: impl IntoIfKindVec) -> Result<()> {
393        let if_kind_vec = if_kind.into_vec();
394        self.send_cmd(Command::SetOption(DaemonOption::DisableInterface(
395            if_kind_vec.kinds,
396        )))
397    }
398
399    /// Enable or disable the loopback for locally sent multicast packets in IPv4.
400    ///
401    /// By default, multicast loop is enabled for IPv4. When disabled, a querier will not
402    /// receive announcements from a responder on the same host.
403    ///
404    /// Reference: <https://learn.microsoft.com/en-us/windows/win32/winsock/ip-multicast-2>
405    ///
406    /// "The Winsock version of the IP_MULTICAST_LOOP option is semantically different than
407    /// the UNIX version of the IP_MULTICAST_LOOP option:
408    ///
409    /// In Winsock, the IP_MULTICAST_LOOP option applies only to the receive path.
410    /// In the UNIX version, the IP_MULTICAST_LOOP option applies to the send path."
411    ///
412    /// Which means, in order NOT to receive localhost announcements, you want to call
413    /// this API on the querier side on Windows, but on the responder side on Unix.
414    pub fn set_multicast_loop_v4(&self, on: bool) -> Result<()> {
415        self.send_cmd(Command::SetOption(DaemonOption::MulticastLoopV4(on)))
416    }
417
418    /// Enable or disable the loopback for locally sent multicast packets in IPv6.
419    ///
420    /// By default, multicast loop is enabled for IPv6. When disabled, a querier will not
421    /// receive announcements from a responder on the same host.
422    ///
423    /// Reference: <https://learn.microsoft.com/en-us/windows/win32/winsock/ip-multicast-2>
424    ///
425    /// "The Winsock version of the IP_MULTICAST_LOOP option is semantically different than
426    /// the UNIX version of the IP_MULTICAST_LOOP option:
427    ///
428    /// In Winsock, the IP_MULTICAST_LOOP option applies only to the receive path.
429    /// In the UNIX version, the IP_MULTICAST_LOOP option applies to the send path."
430    ///
431    /// Which means, in order NOT to receive localhost announcements, you want to call
432    /// this API on the querier side on Windows, but on the responder side on Unix.
433    pub fn set_multicast_loop_v6(&self, on: bool) -> Result<()> {
434        self.send_cmd(Command::SetOption(DaemonOption::MulticastLoopV6(on)))
435    }
436
437    /// Proactively confirms whether a service instance still valid.
438    ///
439    /// This call will issue queries for a service instance's SRV record and Address records.
440    ///
441    /// For `timeout`, most users should use [VERIFY_TIMEOUT_DEFAULT]
442    /// unless there is a reason not to follow RFC.
443    ///
444    /// If no response is received within `timeout`, the current resource
445    /// records will be flushed, and if needed, `ServiceRemoved` event will be
446    /// sent to active queriers.
447    ///
448    /// Reference: [RFC 6762](https://datatracker.ietf.org/doc/html/rfc6762#section-10.4)
449    pub fn verify(&self, instance_fullname: String, timeout: Duration) -> Result<()> {
450        self.send_cmd(Command::Verify(instance_fullname, timeout))
451    }
452
453    fn daemon_thread(signal_sock: MioUdpSocket, poller: Poll, receiver: Receiver<Command>) {
454        let zc = Zeroconf::new(signal_sock, poller);
455
456        if let Some(cmd) = Self::run(zc, receiver) {
457            match cmd {
458                Command::Exit(resp_s) => {
459                    // It is guaranteed that the receiver already dropped,
460                    // i.e. the daemon command channel closed.
461                    if let Err(e) = resp_s.send(DaemonStatus::Shutdown) {
462                        debug!("exit: failed to send response of shutdown: {}", e);
463                    }
464                }
465                _ => {
466                    debug!("Unexpected command: {:?}", cmd);
467                }
468            }
469        }
470    }
471
472    /// The main event loop of the daemon thread
473    ///
474    /// In each round, it will:
475    /// 1. select the listening sockets with a timeout.
476    /// 2. process the incoming packets if any.
477    /// 3. try_recv on its channel and execute commands.
478    /// 4. announce its registered services.
479    /// 5. process retransmissions if any.
480    fn run(mut zc: Zeroconf, receiver: Receiver<Command>) -> Option<Command> {
481        // Add the daemon's signal socket to the poller.
482        if let Err(e) = zc.poller.registry().register(
483            &mut zc.signal_sock,
484            mio::Token(SIGNAL_SOCK_EVENT_KEY),
485            mio::Interest::READABLE,
486        ) {
487            debug!("failed to add signal socket to the poller: {}", e);
488            return None;
489        }
490
491        // Add mDNS sockets to the poller.
492        for (intf, sock) in zc.intf_socks.iter_mut() {
493            let key =
494                Zeroconf::add_poll_impl(&mut zc.poll_ids, &mut zc.poll_id_count, intf.clone());
495
496            if let Err(e) =
497                zc.poller
498                    .registry()
499                    .register(sock, mio::Token(key), mio::Interest::READABLE)
500            {
501                debug!("add socket of {:?} to poller: {e}", intf);
502                return None;
503            }
504        }
505
506        // Setup timer for IP checks.
507        const IP_CHECK_INTERVAL_MILLIS: u64 = 30_000;
508        let mut next_ip_check = current_time_millis() + IP_CHECK_INTERVAL_MILLIS;
509        zc.add_timer(next_ip_check);
510
511        // Start the run loop.
512
513        let mut events = mio::Events::with_capacity(1024);
514        loop {
515            let now = current_time_millis();
516
517            let earliest_timer = zc.peek_earliest_timer();
518            let timeout = earliest_timer.map(|timer| {
519                // If `timer` already passed, set `timeout` to be 1ms.
520                let millis = if timer > now { timer - now } else { 1 };
521                Duration::from_millis(millis)
522            });
523
524            // Process incoming packets, command events and optional timeout.
525            events.clear();
526            match zc.poller.poll(&mut events, timeout) {
527                Ok(_) => zc.handle_poller_events(&events),
528                Err(e) => debug!("failed to select from sockets: {}", e),
529            }
530
531            let now = current_time_millis();
532
533            // Remove the timer if already passed.
534            if let Some(timer) = earliest_timer {
535                if now >= timer {
536                    zc.pop_earliest_timer();
537                }
538            }
539
540            // Remove hostname resolvers with expired timeouts.
541            for hostname in zc
542                .hostname_resolvers
543                .clone()
544                .into_iter()
545                .filter(|(_, (_, timeout))| timeout.map(|t| now >= t).unwrap_or(false))
546                .map(|(hostname, _)| hostname)
547            {
548                trace!("hostname resolver timeout for {}", &hostname);
549                call_hostname_resolution_listener(
550                    &zc.hostname_resolvers,
551                    &hostname,
552                    HostnameResolutionEvent::SearchTimeout(hostname.to_owned()),
553                );
554                call_hostname_resolution_listener(
555                    &zc.hostname_resolvers,
556                    &hostname,
557                    HostnameResolutionEvent::SearchStopped(hostname.to_owned()),
558                );
559                zc.hostname_resolvers.remove(&hostname);
560            }
561
562            // process commands from the command channel
563            while let Ok(command) = receiver.try_recv() {
564                if matches!(command, Command::Exit(_)) {
565                    zc.status = DaemonStatus::Shutdown;
566                    return Some(command);
567                }
568                zc.exec_command(command, false);
569            }
570
571            // check for repeated commands and run them if their time is up.
572            let mut i = 0;
573            while i < zc.retransmissions.len() {
574                if now >= zc.retransmissions[i].next_time {
575                    let rerun = zc.retransmissions.remove(i);
576                    zc.exec_command(rerun.command, true);
577                } else {
578                    i += 1;
579                }
580            }
581
582            // Refresh cached service records with active queriers
583            zc.refresh_active_services();
584
585            // Refresh cached A/AAAA records with active queriers
586            let mut query_count = 0;
587            for (hostname, _sender) in zc.hostname_resolvers.iter() {
588                for (hostname, ip_addr) in
589                    zc.cache.refresh_due_hostname_resolutions(hostname).iter()
590                {
591                    zc.send_query(hostname, ip_address_rr_type(ip_addr));
592                    query_count += 1;
593                }
594            }
595
596            zc.increase_counter(Counter::CacheRefreshAddr, query_count);
597
598            // check and evict expired records in our cache
599            let now = current_time_millis();
600
601            // Notify service listeners about the expired records.
602            let expired_services = zc.cache.evict_expired_services(now);
603            zc.notify_service_removal(expired_services);
604
605            // Notify hostname listeners about the expired records.
606            let expired_addrs = zc.cache.evict_expired_addr(now);
607            for (hostname, addrs) in expired_addrs {
608                call_hostname_resolution_listener(
609                    &zc.hostname_resolvers,
610                    &hostname,
611                    HostnameResolutionEvent::AddressesRemoved(hostname.clone(), addrs),
612                );
613                let instances = zc.cache.get_instances_on_host(&hostname);
614                let instance_set: HashSet<String> = instances.into_iter().collect();
615                zc.resolve_updated_instances(&instance_set);
616            }
617
618            // Send out probing queries.
619            zc.probing_handler();
620
621            // check IP changes.
622            if now > next_ip_check {
623                next_ip_check = now + IP_CHECK_INTERVAL_MILLIS;
624                zc.check_ip_changes();
625                zc.add_timer(next_ip_check);
626            }
627        }
628    }
629}
630
631/// Creates a new UDP socket that uses `intf` to send and recv multicast.
632fn new_socket_bind(intf: &Interface, should_loop: bool) -> Result<MioUdpSocket> {
633    // Use the same socket for receiving and sending multicast packets.
634    // Such socket has to bind to INADDR_ANY or IN6ADDR_ANY.
635    let intf_ip = &intf.ip();
636    match intf_ip {
637        IpAddr::V4(ip) => {
638            let addr = SocketAddrV4::new(Ipv4Addr::new(0, 0, 0, 0), MDNS_PORT);
639            let sock = new_socket(addr.into(), true)?;
640
641            // Join mDNS group to receive packets.
642            sock.join_multicast_v4(&GROUP_ADDR_V4, ip)
643                .map_err(|e| e_fmt!("join multicast group on addr {}: {}", intf_ip, e))?;
644
645            // Set IP_MULTICAST_IF to send packets.
646            sock.set_multicast_if_v4(ip)
647                .map_err(|e| e_fmt!("set multicast_if on addr {}: {}", ip, e))?;
648
649            if !should_loop {
650                sock.set_multicast_loop_v4(false)
651                    .map_err(|e| e_fmt!("failed to set multicast loop v4 for {ip}: {e}"))?;
652            }
653
654            // Test if we can send packets successfully.
655            let multicast_addr = SocketAddrV4::new(GROUP_ADDR_V4, MDNS_PORT).into();
656            let test_packets = DnsOutgoing::new(0).to_data_on_wire();
657            for packet in test_packets {
658                sock.send_to(&packet, &multicast_addr)
659                    .map_err(|e| e_fmt!("send multicast packet on addr {}: {}", ip, e))?;
660            }
661            Ok(MioUdpSocket::from_std(UdpSocket::from(sock)))
662        }
663        IpAddr::V6(ip) => {
664            let addr = SocketAddrV6::new(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 0), MDNS_PORT, 0, 0);
665            let sock = new_socket(addr.into(), true)?;
666
667            // Join mDNS group to receive packets.
668            sock.join_multicast_v6(&GROUP_ADDR_V6, intf.index.unwrap_or(0))
669                .map_err(|e| e_fmt!("join multicast group on addr {}: {}", ip, e))?;
670
671            // Set IPV6_MULTICAST_IF to send packets.
672            sock.set_multicast_if_v6(intf.index.unwrap_or(0))
673                .map_err(|e| e_fmt!("set multicast_if on addr {}: {}", ip, e))?;
674
675            // We are not sending multicast packets to test this socket as there might
676            // be many IPv6 interfaces on a host and could cause such send error:
677            // "No buffer space available (os error 55)".
678
679            Ok(MioUdpSocket::from_std(UdpSocket::from(sock)))
680        }
681    }
682}
683
684/// Creates a new UDP socket to bind to `port` with REUSEPORT option.
685/// `non_block` indicates whether to set O_NONBLOCK for the socket.
686fn new_socket(addr: SocketAddr, non_block: bool) -> Result<Socket> {
687    let domain = match addr {
688        SocketAddr::V4(_) => socket2::Domain::IPV4,
689        SocketAddr::V6(_) => socket2::Domain::IPV6,
690    };
691
692    let fd = Socket::new(domain, socket2::Type::DGRAM, None)
693        .map_err(|e| e_fmt!("create socket failed: {}", e))?;
694
695    fd.set_reuse_address(true)
696        .map_err(|e| e_fmt!("set ReuseAddr failed: {}", e))?;
697    #[cfg(unix)] // this is currently restricted to Unix's in socket2
698    fd.set_reuse_port(true)
699        .map_err(|e| e_fmt!("set ReusePort failed: {}", e))?;
700
701    if non_block {
702        fd.set_nonblocking(true)
703            .map_err(|e| e_fmt!("set O_NONBLOCK: {}", e))?;
704    }
705
706    fd.bind(&addr.into())
707        .map_err(|e| e_fmt!("socket bind to {} failed: {}", &addr, e))?;
708
709    trace!("new socket bind to {}", &addr);
710    Ok(fd)
711}
712
/// A `command` scheduled to be run again, together with the UNIX timestamp
/// in millis of its next run.
struct ReRun {
    /// UNIX timestamp in millis.
    next_time: u64,
    /// The command to run again once `next_time` is reached.
    command: Command,
}
719
/// Enum to represent the IP version.
#[derive(Debug, Eq, Hash, PartialEq)]
enum IpVersion {
    /// IPv4.
    V4,
    /// IPv6.
    V6,
}
726
/// A struct to track multicast send status for a network interface.
///
/// It identifies an interface by its index together with the IP version of
/// its address.
#[derive(Debug, Eq, Hash, PartialEq)]
struct MulticastSendTracker {
    /// Index of the network interface.
    intf_index: u32,
    /// IP version of the interface address.
    ip_version: IpVersion,
}
733
734/// Returns the multicast send tracker if the interface index is valid
735fn multicast_send_tracker(intf: &Interface) -> Option<MulticastSendTracker> {
736    match intf.index {
737        Some(index) => {
738            let ip_ver = match intf.addr {
739                IfAddr::V4(_) => IpVersion::V4,
740                IfAddr::V6(_) => IpVersion::V6,
741            };
742            Some(MulticastSendTracker {
743                intf_index: index,
744                ip_version: ip_ver,
745            })
746        }
747        None => None,
748    }
749}
750
/// Specifies kinds of interfaces. It is used to enable or to disable interfaces in the daemon.
///
/// Note that for ergonomic reasons, `From<&str>`, `From<&String>` and
/// `From<IpAddr>` are implemented.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub enum IfKind {
    /// All interfaces.
    All,

    /// All IPv4 interfaces.
    IPv4,

    /// All IPv6 interfaces.
    IPv6,

    /// By the interface name, for example "en0"
    Name(String),

    /// By an IPv4 or IPv6 address.
    Addr(IpAddr),

    /// 127.0.0.1 (or anything in 127.0.0.0/8), disabled by default.
    ///
    /// Use [ServiceDaemon::enable_interface] to support registering services on loopback interfaces,
    /// which is required by some use cases (e.g., OSCQuery) that publish via mDNS.
    LoopbackV4,

    /// ::1/128, disabled by default.
    LoopbackV6,
}
781
782impl IfKind {
783    /// Checks if `intf` matches with this interface kind.
784    fn matches(&self, intf: &Interface) -> bool {
785        match self {
786            Self::All => true,
787            Self::IPv4 => intf.ip().is_ipv4(),
788            Self::IPv6 => intf.ip().is_ipv6(),
789            Self::Name(ifname) => ifname == &intf.name,
790            Self::Addr(addr) => addr == &intf.ip(),
791            Self::LoopbackV4 => intf.is_loopback() && intf.ip().is_ipv4(),
792            Self::LoopbackV6 => intf.is_loopback() && intf.ip().is_ipv6(),
793        }
794    }
795}
796
797/// The first use case of specifying an interface was to
798/// use an interface name. Hence adding this for ergonomic reasons.
799impl From<&str> for IfKind {
800    fn from(val: &str) -> Self {
801        Self::Name(val.to_string())
802    }
803}
804
805impl From<&String> for IfKind {
806    fn from(val: &String) -> Self {
807        Self::Name(val.to_string())
808    }
809}
810
811/// Still for ergonomic reasons.
812impl From<IpAddr> for IfKind {
813    fn from(val: IpAddr) -> Self {
814        Self::Addr(val)
815    }
816}
817
/// A list of `IfKind` that can be used to match interfaces.
///
/// Values are produced through [`IntoIfKindVec::into_vec`].
pub struct IfKindVec {
    /// The interface kinds, in the order they were supplied.
    kinds: Vec<IfKind>,
}
822
/// A trait that converts a type into a Vec of `IfKind`.
///
/// It is implemented for any single value that converts into an [`IfKind`]
/// and for a `Vec` of such values.
pub trait IntoIfKindVec {
    /// Consumes `self` and returns the equivalent [`IfKindVec`].
    fn into_vec(self) -> IfKindVec;
}
827
828impl<T: Into<IfKind>> IntoIfKindVec for T {
829    fn into_vec(self) -> IfKindVec {
830        let if_kind: IfKind = self.into();
831        IfKindVec {
832            kinds: vec![if_kind],
833        }
834    }
835}
836
837impl<T: Into<IfKind>> IntoIfKindVec for Vec<T> {
838    fn into_vec(self) -> IfKindVec {
839        let kinds: Vec<IfKind> = self.into_iter().map(|x| x.into()).collect();
840        IfKindVec { kinds }
841    }
842}
843
/// Selection of interfaces.
///
/// Selections are stored in `Zeroconf::if_selections` and applied in order:
/// for a given interface, the last selection whose `if_kind` matches decides.
struct IfSelection {
    /// The interfaces to be selected.
    if_kind: IfKind,

    /// Whether the `if_kind` should be enabled or not.
    selected: bool,
}
852
/// A struct holding the state. It was inspired by `zeroconf` package in Python.
///
/// Every field is initialized in `Zeroconf::new`.
struct Zeroconf {
    /// Local interfaces with sockets to recv/send on these interfaces.
    intf_socks: HashMap<Interface, MioUdpSocket>,

    /// Map poll id to Interface.
    poll_ids: HashMap<usize, Interface>,

    /// Next poll id value
    poll_id_count: usize,

    /// Local registered services, keyed by service full names.
    my_services: HashMap<String, ServiceInfo>,

    /// Received DNS records.
    cache: DnsCache,

    /// Registered service records.
    dns_registry_map: HashMap<Interface, DnsRegistry>,

    /// Active "Browse" commands.
    service_queriers: HashMap<String, Sender<ServiceEvent>>, // <ty_domain, channel::sender>

    /// Active "ResolveHostname" commands.
    ///
    /// The timestamps are set at the future timestamp when the command should timeout.
    hostname_resolvers: HashMap<String, (Sender<HostnameResolutionEvent>, Option<u64>)>, // <hostname, (channel::sender, UNIX timestamp in millis)>

    /// All repeating transmissions.
    retransmissions: Vec<ReRun>,

    /// Metrics counters; starts out empty.
    counters: Metrics,

    /// Waits for incoming packets.
    poller: Poll,

    /// Channels to notify events.
    monitors: Vec<Sender<DaemonEvent>>,

    /// Options
    ///
    /// Maximum length accepted by `check_service_name_length` when a service is registered.
    service_name_len_max: u8,

    /// All interface selections called to the daemon.
    ///
    /// Starts with two entries that disable the IPv4 and IPv6 loopback interfaces.
    if_selections: Vec<IfSelection>,

    /// Socket for signaling.
    signal_sock: MioUdpSocket,

    /// Timestamps marking where we need another iteration of the run loop,
    /// to react to events like retransmissions, cache refreshes, interface IP address changes, etc.
    ///
    /// When the run loop goes through a single iteration, it will
    /// set its timeout to the earliest timer in this list.
    timers: BinaryHeap<Reverse<u64>>,

    /// Status of the daemon; starts as `DaemonStatus::Running`.
    status: DaemonStatus,

    /// Service instances that are pending for resolving SRV and TXT.
    pending_resolves: HashSet<String>,

    /// Service instances that are already resolved.
    resolved: HashSet<String>,

    /// Multicast loopback flag handed to `new_socket_bind` when a socket is
    /// created for a newly added IPv4 interface. Starts as `true`.
    multicast_loop_v4: bool,

    /// Multicast loopback flag handed to `new_socket_bind` when a socket is
    /// created for a newly added IPv6 interface. Starts as `true`.
    multicast_loop_v6: bool,
}
920
921impl Zeroconf {
922    fn new(signal_sock: MioUdpSocket, poller: Poll) -> Self {
923        // Get interfaces.
924        let my_ifaddrs = my_ip_interfaces(false);
925
926        // Create a socket for every IP addr.
927        // Note: it is possible that `my_ifaddrs` contains the same IP addr with different interface names,
928        // or the same interface name with different IP addrs.
929        let mut intf_socks = HashMap::new();
930        let mut dns_registry_map = HashMap::new();
931
932        for intf in my_ifaddrs {
933            let sock = match new_socket_bind(&intf, true) {
934                Ok(s) => s,
935                Err(e) => {
936                    trace!("bind a socket to {}: {}. Skipped.", &intf.ip(), e);
937                    continue;
938                }
939            };
940
941            dns_registry_map.insert(intf.clone(), DnsRegistry::new());
942
943            intf_socks.insert(intf, sock);
944        }
945
946        let monitors = Vec::new();
947        let service_name_len_max = SERVICE_NAME_LEN_MAX_DEFAULT;
948
949        let timers = BinaryHeap::new();
950
951        // Disable loopback by default.
952        let if_selections = vec![
953            IfSelection {
954                if_kind: IfKind::LoopbackV4,
955                selected: false,
956            },
957            IfSelection {
958                if_kind: IfKind::LoopbackV6,
959                selected: false,
960            },
961        ];
962
963        let status = DaemonStatus::Running;
964
965        Self {
966            intf_socks,
967            poll_ids: HashMap::new(),
968            poll_id_count: 0,
969            my_services: HashMap::new(),
970            cache: DnsCache::new(),
971            dns_registry_map,
972            hostname_resolvers: HashMap::new(),
973            service_queriers: HashMap::new(),
974            retransmissions: Vec::new(),
975            counters: HashMap::new(),
976            poller,
977            monitors,
978            service_name_len_max,
979            if_selections,
980            signal_sock,
981            timers,
982            status,
983            pending_resolves: HashSet::new(),
984            resolved: HashSet::new(),
985            multicast_loop_v4: true,
986            multicast_loop_v6: true,
987        }
988    }
989
990    fn process_set_option(&mut self, daemon_opt: DaemonOption) {
991        match daemon_opt {
992            DaemonOption::ServiceNameLenMax(length) => self.service_name_len_max = length,
993            DaemonOption::EnableInterface(if_kind) => self.enable_interface(if_kind),
994            DaemonOption::DisableInterface(if_kind) => self.disable_interface(if_kind),
995            DaemonOption::MulticastLoopV4(on) => self.set_multicast_loop_v4(on),
996            DaemonOption::MulticastLoopV6(on) => self.set_multicast_loop_v6(on),
997        }
998    }
999
1000    fn enable_interface(&mut self, kinds: Vec<IfKind>) {
1001        for if_kind in kinds {
1002            self.if_selections.push(IfSelection {
1003                if_kind,
1004                selected: true,
1005            });
1006        }
1007
1008        self.apply_intf_selections(my_ip_interfaces(true));
1009    }
1010
1011    fn disable_interface(&mut self, kinds: Vec<IfKind>) {
1012        for if_kind in kinds {
1013            self.if_selections.push(IfSelection {
1014                if_kind,
1015                selected: false,
1016            });
1017        }
1018
1019        self.apply_intf_selections(my_ip_interfaces(true));
1020    }
1021
1022    fn set_multicast_loop_v4(&mut self, on: bool) {
1023        for (_, sock) in self.intf_socks.iter_mut() {
1024            if let Err(e) = sock.set_multicast_loop_v4(on) {
1025                debug!("failed to set multicast loop v4: {e}");
1026            }
1027        }
1028    }
1029
1030    fn set_multicast_loop_v6(&mut self, on: bool) {
1031        for (_, sock) in self.intf_socks.iter_mut() {
1032            if let Err(e) = sock.set_multicast_loop_v6(on) {
1033                debug!("failed to set multicast loop v6: {e}");
1034            }
1035        }
1036    }
1037
1038    fn notify_monitors(&mut self, event: DaemonEvent) {
1039        // Only retain the monitors that are still connected.
1040        self.monitors.retain(|sender| {
1041            if let Err(e) = sender.try_send(event.clone()) {
1042                debug!("notify_monitors: try_send: {}", &e);
1043                if matches!(e, TrySendError::Disconnected(_)) {
1044                    return false; // This monitor is dropped.
1045                }
1046            }
1047            true
1048        });
1049    }
1050
1051    /// Remove `addr` in my services that enabled `addr_auto`.
1052    fn del_addr_in_my_services(&mut self, addr: &IpAddr) {
1053        for (_, service_info) in self.my_services.iter_mut() {
1054            if service_info.is_addr_auto() {
1055                service_info.remove_ipaddr(addr);
1056            }
1057        }
1058    }
1059
1060    /// Insert a new interface into the poll map and return key
1061    fn add_poll(&mut self, intf: Interface) -> usize {
1062        Self::add_poll_impl(&mut self.poll_ids, &mut self.poll_id_count, intf)
1063    }
1064
1065    /// Insert a new interface into the poll map and return its key.
1066    ///
1067    /// This exists to satisfy the borrow checker
1068    fn add_poll_impl(
1069        poll_ids: &mut HashMap<usize, Interface>,
1070        poll_id_count: &mut usize,
1071        intf: Interface,
1072    ) -> usize {
1073        let key = *poll_id_count;
1074        *poll_id_count += 1;
1075        let _ = (*poll_ids).insert(key, intf);
1076        key
1077    }
1078
1079    fn add_timer(&mut self, next_time: u64) {
1080        self.timers.push(Reverse(next_time));
1081    }
1082
1083    fn peek_earliest_timer(&self) -> Option<u64> {
1084        self.timers.peek().map(|Reverse(v)| *v)
1085    }
1086
1087    fn pop_earliest_timer(&mut self) -> Option<u64> {
1088        self.timers.pop().map(|Reverse(v)| v)
1089    }
1090
1091    /// Apply all selections to `interfaces` and return the selected addresses.
1092    fn selected_addrs(&self, interfaces: Vec<Interface>) -> HashSet<IpAddr> {
1093        let intf_count = interfaces.len();
1094        let mut intf_selections = vec![true; intf_count];
1095
1096        // apply if_selections
1097        for selection in self.if_selections.iter() {
1098            // Mark the interfaces for this selection.
1099            for i in 0..intf_count {
1100                if selection.if_kind.matches(&interfaces[i]) {
1101                    intf_selections[i] = selection.selected;
1102                }
1103            }
1104        }
1105
1106        let mut selected_addrs = HashSet::new();
1107        for i in 0..intf_count {
1108            if intf_selections[i] {
1109                selected_addrs.insert(interfaces[i].addr.ip());
1110            }
1111        }
1112
1113        selected_addrs
1114    }
1115
1116    /// Apply all selections to `interfaces`.
1117    ///
1118    /// For any interface, add it if selected but not bound yet,
1119    /// delete it if not selected but still bound.
1120    fn apply_intf_selections(&mut self, interfaces: Vec<Interface>) {
1121        // By default, we enable all interfaces.
1122        let intf_count = interfaces.len();
1123        let mut intf_selections = vec![true; intf_count];
1124
1125        // apply if_selections
1126        for selection in self.if_selections.iter() {
1127            // Mark the interfaces for this selection.
1128            for i in 0..intf_count {
1129                if selection.if_kind.matches(&interfaces[i]) {
1130                    intf_selections[i] = selection.selected;
1131                }
1132            }
1133        }
1134
1135        // Update `intf_socks` based on the selections.
1136        for (idx, intf) in interfaces.into_iter().enumerate() {
1137            if intf_selections[idx] {
1138                // Add the interface
1139                if !self.intf_socks.contains_key(&intf) {
1140                    debug!("apply_intf_selections: add {:?}", &intf.ip());
1141                    self.add_new_interface(intf);
1142                }
1143            } else {
1144                // Remove the interface
1145                if let Some(mut sock) = self.intf_socks.remove(&intf) {
1146                    match self.poller.registry().deregister(&mut sock) {
1147                        Ok(()) => debug!("apply_intf_selections: deregister {:?}", &intf.ip()),
1148                        Err(e) => debug!("apply_intf_selections: poller.delete {:?}: {}", &intf, e),
1149                    }
1150
1151                    // Remove from poll_ids
1152                    self.poll_ids.retain(|_, v| v != &intf);
1153
1154                    // Remove cache records for this interface.
1155                    self.cache.remove_addrs_on_disabled_intf(&intf);
1156                }
1157            }
1158        }
1159    }
1160
1161    /// Check for IP changes and update intf_socks as needed.
1162    fn check_ip_changes(&mut self) {
1163        // Get the current interfaces.
1164        let my_ifaddrs = my_ip_interfaces(true);
1165
1166        let poll_ids = &mut self.poll_ids;
1167        let poller = &mut self.poller;
1168        // Remove unused sockets in the poller.
1169        let deleted_addrs = self
1170            .intf_socks
1171            .iter_mut()
1172            .filter_map(|(intf, sock)| {
1173                if !my_ifaddrs.contains(intf) {
1174                    if let Err(e) = poller.registry().deregister(sock) {
1175                        debug!("check_ip_changes: poller.delete {:?}: {}", intf, e);
1176                    }
1177                    // Remove from poll_ids
1178                    poll_ids.retain(|_, v| v != intf);
1179                    Some(intf.ip())
1180                } else {
1181                    None
1182                }
1183            })
1184            .collect::<Vec<IpAddr>>();
1185
1186        // Remove deleted addrs from my services that enabled `addr_auto`.
1187        for ip in deleted_addrs.iter() {
1188            self.del_addr_in_my_services(ip);
1189            self.notify_monitors(DaemonEvent::IpDel(*ip));
1190        }
1191
1192        // Keep the interfaces only if they still exist.
1193        self.intf_socks.retain(|intf, _| my_ifaddrs.contains(intf));
1194
1195        // Add newly found interfaces only if in our selections.
1196        self.apply_intf_selections(my_ifaddrs);
1197    }
1198
1199    fn add_new_interface(&mut self, intf: Interface) {
1200        // Bind the new interface.
1201        let new_ip = intf.ip();
1202        let should_loop = if new_ip.is_ipv4() {
1203            self.multicast_loop_v4
1204        } else {
1205            self.multicast_loop_v6
1206        };
1207        let mut sock = match new_socket_bind(&intf, should_loop) {
1208            Ok(s) => s,
1209            Err(e) => {
1210                debug!("bind a socket to {}: {}. Skipped.", &intf.ip(), e);
1211                return;
1212            }
1213        };
1214
1215        // Add the new interface into the poller.
1216        let key = self.add_poll(intf.clone());
1217        if let Err(e) =
1218            self.poller
1219                .registry()
1220                .register(&mut sock, mio::Token(key), mio::Interest::READABLE)
1221        {
1222            debug!("check_ip_changes: poller add ip {}: {}", new_ip, e);
1223            return;
1224        }
1225
1226        debug!("add new interface {}: {new_ip}", intf.name);
1227        let dns_registry = match self.dns_registry_map.get_mut(&intf) {
1228            Some(registry) => registry,
1229            None => self
1230                .dns_registry_map
1231                .entry(intf.clone())
1232                .or_insert_with(DnsRegistry::new),
1233        };
1234
1235        for (_, service_info) in self.my_services.iter_mut() {
1236            if service_info.is_addr_auto() {
1237                service_info.insert_ipaddr(new_ip);
1238
1239                if announce_service_on_intf(dns_registry, service_info, &intf, &sock) {
1240                    debug!(
1241                        "Announce service {} on {}",
1242                        service_info.get_fullname(),
1243                        intf.ip()
1244                    );
1245                    service_info.set_status(&intf, ServiceStatus::Announced);
1246                } else {
1247                    for timer in dns_registry.new_timers.drain(..) {
1248                        self.timers.push(Reverse(timer));
1249                    }
1250                    service_info.set_status(&intf, ServiceStatus::Probing);
1251                }
1252            }
1253        }
1254
1255        self.intf_socks.insert(intf, sock);
1256
1257        // Notify the monitors.
1258        self.notify_monitors(DaemonEvent::IpAdd(new_ip));
1259    }
1260
1261    /// Registers a service.
1262    ///
1263    /// RFC 6762 section 8.3.
1264    /// ...the Multicast DNS responder MUST send
1265    ///    an unsolicited Multicast DNS response containing, in the Answer
1266    ///    Section, all of its newly registered resource records
1267    ///
1268    /// Zeroconf will then respond to requests for information about this service.
1269    fn register_service(&mut self, mut info: ServiceInfo) {
1270        // Check the service name length.
1271        if let Err(e) = check_service_name_length(info.get_type(), self.service_name_len_max) {
1272            debug!("check_service_name_length: {}", &e);
1273            self.notify_monitors(DaemonEvent::Error(e));
1274            return;
1275        }
1276
1277        if info.is_addr_auto() {
1278            let selected_addrs = self.selected_addrs(my_ip_interfaces(true));
1279            for addr in selected_addrs {
1280                info.insert_ipaddr(addr);
1281            }
1282        }
1283
1284        debug!("register service {:?}", &info);
1285
1286        let outgoing_addrs = self.send_unsolicited_response(&mut info);
1287        if !outgoing_addrs.is_empty() {
1288            self.notify_monitors(DaemonEvent::Announce(
1289                info.get_fullname().to_string(),
1290                format!("{:?}", &outgoing_addrs),
1291            ));
1292        }
1293
1294        // The key has to be lower case letter as DNS record name is case insensitive.
1295        // The info will have the original name.
1296        let service_fullname = info.get_fullname().to_lowercase();
1297        self.my_services.insert(service_fullname, info);
1298    }
1299
1300    /// Sends out announcement of `info` on every valid interface.
1301    /// Returns the list of interface IPs that sent out the announcement.
1302    fn send_unsolicited_response(&mut self, info: &mut ServiceInfo) -> Vec<IpAddr> {
1303        let mut outgoing_addrs = Vec::new();
1304        // Send the announcement on one interface per ip version.
1305        let mut multicast_sent_trackers = HashSet::new();
1306
1307        let mut outgoing_intfs = Vec::new();
1308
1309        for (intf, sock) in self.intf_socks.iter() {
1310            if let Some(tracker) = multicast_send_tracker(intf) {
1311                if multicast_sent_trackers.contains(&tracker) {
1312                    continue; // No need to send again on the same interface with same ip version.
1313                }
1314            }
1315
1316            let dns_registry = match self.dns_registry_map.get_mut(intf) {
1317                Some(registry) => registry,
1318                None => self
1319                    .dns_registry_map
1320                    .entry(intf.clone())
1321                    .or_insert_with(DnsRegistry::new),
1322            };
1323
1324            if announce_service_on_intf(dns_registry, info, intf, sock) {
1325                if let Some(tracker) = multicast_send_tracker(intf) {
1326                    multicast_sent_trackers.insert(tracker);
1327                }
1328                outgoing_addrs.push(intf.ip());
1329                outgoing_intfs.push(intf.clone());
1330
1331                debug!("Announce service {} on {}", info.get_fullname(), intf.ip());
1332
1333                info.set_status(intf, ServiceStatus::Announced);
1334            } else {
1335                for timer in dns_registry.new_timers.drain(..) {
1336                    self.timers.push(Reverse(timer));
1337                }
1338                info.set_status(intf, ServiceStatus::Probing);
1339            }
1340        }
1341
1342        // RFC 6762 section 8.3.
1343        // ..The Multicast DNS responder MUST send at least two unsolicited
1344        //    responses, one second apart.
1345        let next_time = current_time_millis() + 1000;
1346        for intf in outgoing_intfs {
1347            self.add_retransmission(
1348                next_time,
1349                Command::RegisterResend(info.get_fullname().to_string(), intf),
1350            );
1351        }
1352
1353        outgoing_addrs
1354    }
1355
    /// Send probings or finish them if expired. Notify waiting services.
    ///
    /// For every bound interface that has a DNS registry:
    /// 1. each probe that is due is either re-sent (as a question plus its
    ///    records in the Authority Section) or, if expired, collected as finished;
    /// 2. the records of finished probes are moved to the active set, and
    ///    name changes are recorded and reported to the monitors;
    /// 3. services that were waiting for those probes are announced, a
    ///    resend is scheduled one second later, and the monitors are notified.
    fn probing_handler(&mut self) {
        let now = current_time_millis();

        for (intf, sock) in self.intf_socks.iter() {
            // Interfaces without a registry have nothing to probe.
            let Some(dns_registry) = self.dns_registry_map.get_mut(intf) else {
                continue;
            };

            let mut expired_probe_names = Vec::new();
            let mut out = DnsOutgoing::new(FLAGS_QR_QUERY);

            for (name, probe) in dns_registry.probing.iter_mut() {
                if now >= probe.next_send {
                    if probe.expired(now) {
                        // move the record to active
                        expired_probe_names.push(name.clone());
                    } else {
                        out.add_question(name, RRType::ANY);

                        /*
                        RFC 6762 section 8.2: https://datatracker.ietf.org/doc/html/rfc6762#section-8.2
                        ...
                        for tiebreaking to work correctly in all
                        cases, the Authority Section must contain *all* the records and
                        proposed rdata being probed for uniqueness.
                         */
                        for record in probe.records.iter() {
                            out.add_authority(record.clone());
                        }

                        probe.update_next_send(now);

                        // add timer
                        self.timers.push(Reverse(probe.next_send));
                    }
                }
            }

            // send probing.
            if !out.questions().is_empty() {
                debug!("sending out probing of {} questions", out.questions().len());
                send_dns_outgoing(&out, intf, sock);
            }

            // Names of services to announce once the finished probes are processed.
            let mut waiting_services = HashSet::new();

            for name in expired_probe_names {
                let Some(probe) = dns_registry.probing.remove(&name) else {
                    continue;
                };

                // send notifications about name changes
                for record in probe.records.iter() {
                    if let Some(new_name) = record.get_record().get_new_name() {
                        // Remember the mapping from the probed name to the new name.
                        dns_registry
                            .name_changes
                            .insert(name.clone(), new_name.to_string());

                        let event = DnsNameChange {
                            original: record.get_record().get_original_name().to_string(),
                            new_name: new_name.to_string(),
                            rr_type: record.get_type(),
                            intf_name: intf.name.to_string(),
                        };
                        notify_monitors(&mut self.monitors, DaemonEvent::NameChange(event));
                    }
                }

                // move RR from probe to active.
                debug!(
                    "probe of '{name}' finished: move {} records to active. ({} waiting services)",
                    probe.records.len(),
                    probe.waiting_services.len(),
                );

                // Move records to active and plan to wake up services if records are not empty.
                if !probe.records.is_empty() {
                    match dns_registry.active.get_mut(&name) {
                        Some(records) => {
                            records.extend(probe.records);
                        }
                        None => {
                            dns_registry.active.insert(name, probe.records);
                        }
                    }

                    waiting_services.extend(probe.waiting_services);
                }
            }

            // wake up services waiting.
            for service_name in waiting_services {
                debug!(
                    "try to announce service {service_name} on intf {}",
                    intf.ip()
                );
                // service names are lowercase
                if let Some(info) = self.my_services.get_mut(&service_name.to_lowercase()) {
                    if info.get_status(intf) == ServiceStatus::Announced {
                        debug!("service {} already announced", info.get_fullname());
                        continue;
                    }

                    if announce_service_on_intf(dns_registry, info, intf, sock) {
                        // Schedule a resend of the announcement one second from `now`.
                        let next_time = now + 1000;
                        let command =
                            Command::RegisterResend(info.get_fullname().to_string(), intf.clone());
                        self.retransmissions.push(ReRun { next_time, command });
                        self.timers.push(Reverse(next_time));

                        // Report the new names if probing changed them.
                        let fullname = match dns_registry.name_changes.get(&service_name) {
                            Some(new_name) => new_name.to_string(),
                            None => service_name.to_string(),
                        };

                        let mut hostname = info.get_hostname();
                        if let Some(new_name) = dns_registry.name_changes.get(hostname) {
                            hostname = new_name;
                        }

                        debug!("wake up: announce service {} on {}", fullname, intf.ip());
                        notify_monitors(
                            &mut self.monitors,
                            DaemonEvent::Announce(fullname, format!("{}:{}", hostname, &intf.ip())),
                        );

                        info.set_status(intf, ServiceStatus::Announced);
                    }
                }
            }
        }
    }
1489
1490    fn unregister_service(
1491        &self,
1492        info: &ServiceInfo,
1493        intf: &Interface,
1494        sock: &MioUdpSocket,
1495    ) -> Vec<u8> {
1496        let mut out = DnsOutgoing::new(FLAGS_QR_RESPONSE | FLAGS_AA);
1497        out.add_answer_at_time(
1498            DnsPointer::new(
1499                info.get_type(),
1500                RRType::PTR,
1501                CLASS_IN,
1502                0,
1503                info.get_fullname().to_string(),
1504            ),
1505            0,
1506        );
1507
1508        if let Some(sub) = info.get_subtype() {
1509            trace!("Adding subdomain {}", sub);
1510            out.add_answer_at_time(
1511                DnsPointer::new(
1512                    sub,
1513                    RRType::PTR,
1514                    CLASS_IN,
1515                    0,
1516                    info.get_fullname().to_string(),
1517                ),
1518                0,
1519            );
1520        }
1521
1522        out.add_answer_at_time(
1523            DnsSrv::new(
1524                info.get_fullname(),
1525                CLASS_IN | CLASS_CACHE_FLUSH,
1526                0,
1527                info.get_priority(),
1528                info.get_weight(),
1529                info.get_port(),
1530                info.get_hostname().to_string(),
1531            ),
1532            0,
1533        );
1534        out.add_answer_at_time(
1535            DnsTxt::new(
1536                info.get_fullname(),
1537                CLASS_IN | CLASS_CACHE_FLUSH,
1538                0,
1539                info.generate_txt(),
1540            ),
1541            0,
1542        );
1543
1544        for address in info.get_addrs_on_intf(intf) {
1545            out.add_answer_at_time(
1546                DnsAddress::new(
1547                    info.get_hostname(),
1548                    ip_address_rr_type(&address),
1549                    CLASS_IN | CLASS_CACHE_FLUSH,
1550                    0,
1551                    address,
1552                ),
1553                0,
1554            );
1555        }
1556
1557        // `out` data is non-empty, hence we can do this.
1558        send_dns_outgoing(&out, intf, sock).remove(0)
1559    }
1560
1561    /// Binds a channel `listener` to querying mDNS hostnames.
1562    ///
1563    /// If there is already a `listener`, it will be updated, i.e. overwritten.
1564    fn add_hostname_resolver(
1565        &mut self,
1566        hostname: String,
1567        listener: Sender<HostnameResolutionEvent>,
1568        timeout: Option<u64>,
1569    ) {
1570        let real_timeout = timeout.map(|t| current_time_millis() + t);
1571        self.hostname_resolvers
1572            .insert(hostname, (listener, real_timeout));
1573        if let Some(t) = real_timeout {
1574            self.add_timer(t);
1575        }
1576    }
1577
1578    /// Sends a multicast query for `name` with `qtype`.
1579    fn send_query(&self, name: &str, qtype: RRType) {
1580        self.send_query_vec(&[(name, qtype)]);
1581    }
1582
1583    /// Sends out a list of `questions` (i.e. DNS questions) via multicast.
1584    fn send_query_vec(&self, questions: &[(&str, RRType)]) {
1585        trace!("Sending query questions: {:?}", questions);
1586        let mut out = DnsOutgoing::new(FLAGS_QR_QUERY);
1587        let now = current_time_millis();
1588
1589        for (name, qtype) in questions {
1590            out.add_question(name, *qtype);
1591
1592            for record in self.cache.get_known_answers(name, *qtype, now) {
1593                /*
1594                RFC 6762 section 7.1: https://datatracker.ietf.org/doc/html/rfc6762#section-7.1
1595                ...
1596                    When a Multicast DNS querier sends a query to which it already knows
1597                    some answers, it populates the Answer Section of the DNS query
1598                    message with those answers.
1599                 */
1600                trace!("add known answer: {:?}", record);
1601                let mut new_record = record.clone();
1602                new_record.get_record_mut().update_ttl(now);
1603                out.add_answer_box(new_record);
1604            }
1605        }
1606
1607        // Send the query on one interface per ip version.
1608        let mut multicast_sent_trackers = HashSet::new();
1609        for (intf, sock) in self.intf_socks.iter() {
1610            if let Some(tracker) = multicast_send_tracker(intf) {
1611                if multicast_sent_trackers.contains(&tracker) {
1612                    continue; // no need to send query the same interface with same ip version.
1613                }
1614                multicast_sent_trackers.insert(tracker);
1615            }
1616            send_dns_outgoing(&out, intf, sock);
1617        }
1618    }
1619
1620    /// Reads from the socket of `ip`.
1621    ///
1622    /// Returns false if failed to receive a packet,
1623    /// otherwise returns true.
1624    fn handle_read(&mut self, intf: &Interface) -> bool {
1625        let sock = match self.intf_socks.get_mut(intf) {
1626            Some(if_sock) => if_sock,
1627            None => return false,
1628        };
1629        let mut buf = vec![0u8; MAX_MSG_ABSOLUTE];
1630
1631        // Read the next mDNS UDP datagram.
1632        //
1633        // If the datagram is larger than `buf`, excess bytes may or may not
1634        // be truncated by the socket layer depending on the platform's libc.
1635        // In any case, such large datagram will not be decoded properly and
1636        // this function should return false but should not crash.
1637        let sz = match sock.recv(&mut buf) {
1638            Ok(sz) => sz,
1639            Err(e) => {
1640                if e.kind() != std::io::ErrorKind::WouldBlock {
1641                    debug!("listening socket read failed: {}", e);
1642                }
1643                return false;
1644            }
1645        };
1646
1647        trace!("received {} bytes at IP: {}", sz, intf.ip());
1648
1649        // If sz is 0, it means sock reached End-of-File.
1650        if sz == 0 {
1651            debug!("socket {:?} was likely shutdown", &sock);
1652            if let Err(e) = self.poller.registry().deregister(sock) {
1653                debug!("failed to remove sock {:?} from poller: {}", sock, &e);
1654            }
1655
1656            // Replace the closed socket with a new one.
1657            let should_loop = if intf.ip().is_ipv4() {
1658                self.multicast_loop_v4
1659            } else {
1660                self.multicast_loop_v6
1661            };
1662            match new_socket_bind(intf, should_loop) {
1663                Ok(new_sock) => {
1664                    trace!("reset socket for IP {}", intf.ip());
1665                    self.intf_socks.insert(intf.clone(), new_sock);
1666                }
1667                Err(e) => debug!("re-bind a socket to {:?}: {}", intf, e),
1668            }
1669
1670            return false;
1671        }
1672
1673        buf.truncate(sz); // reduce potential processing errors
1674
1675        match DnsIncoming::new(buf) {
1676            Ok(msg) => {
1677                if msg.is_query() {
1678                    self.handle_query(msg, intf);
1679                } else if msg.is_response() {
1680                    self.handle_response(msg, intf);
1681                } else {
1682                    debug!("Invalid message: not query and not response");
1683                }
1684            }
1685            Err(e) => debug!("Invalid incoming DNS message: {}", e),
1686        }
1687
1688        true
1689    }
1690
1691    /// Returns true, if sent query. Returns false if SRV already exists.
1692    fn query_unresolved(&mut self, instance: &str) -> bool {
1693        if !valid_instance_name(instance) {
1694            trace!("instance name {} not valid", instance);
1695            return false;
1696        }
1697
1698        if let Some(records) = self.cache.get_srv(instance) {
1699            for record in records {
1700                if let Some(srv) = record.any().downcast_ref::<DnsSrv>() {
1701                    if self.cache.get_addr(srv.host()).is_none() {
1702                        self.send_query_vec(&[(srv.host(), RRType::A), (srv.host(), RRType::AAAA)]);
1703                        return true;
1704                    }
1705                }
1706            }
1707        } else {
1708            self.send_query(instance, RRType::ANY);
1709            return true;
1710        }
1711
1712        false
1713    }
1714
1715    /// Checks if `ty_domain` has records in the cache. If yes, sends the
1716    /// cached records via `sender`.
1717    fn query_cache_for_service(&mut self, ty_domain: &str, sender: &Sender<ServiceEvent>) {
1718        let mut resolved: HashSet<String> = HashSet::new();
1719        let mut unresolved: HashSet<String> = HashSet::new();
1720
1721        if let Some(records) = self.cache.get_ptr(ty_domain) {
1722            for record in records.iter() {
1723                if let Some(ptr) = record.any().downcast_ref::<DnsPointer>() {
1724                    let info = match self.create_service_info_from_cache(ty_domain, ptr.alias()) {
1725                        Ok(ok) => ok,
1726                        Err(err) => {
1727                            debug!("Error while creating service info from cache: {}", err);
1728                            continue;
1729                        }
1730                    };
1731
1732                    match sender.send(ServiceEvent::ServiceFound(
1733                        ty_domain.to_string(),
1734                        ptr.alias().to_string(),
1735                    )) {
1736                        Ok(()) => debug!("send service found {}", ptr.alias()),
1737                        Err(e) => {
1738                            debug!("failed to send service found: {}", e);
1739                            continue;
1740                        }
1741                    }
1742
1743                    if info.is_ready() {
1744                        resolved.insert(ptr.alias().to_string());
1745                        match sender.send(ServiceEvent::ServiceResolved(info)) {
1746                            Ok(()) => debug!("sent service resolved: {}", ptr.alias()),
1747                            Err(e) => debug!("failed to send service resolved: {}", e),
1748                        }
1749                    } else {
1750                        unresolved.insert(ptr.alias().to_string());
1751                    }
1752                }
1753            }
1754        }
1755
1756        for instance in resolved.drain() {
1757            self.pending_resolves.remove(&instance);
1758            self.resolved.insert(instance);
1759        }
1760
1761        for instance in unresolved.drain() {
1762            self.add_pending_resolve(instance);
1763        }
1764    }
1765
1766    /// Checks if `hostname` has records in the cache. If yes, sends the
1767    /// cached records via `sender`.
1768    fn query_cache_for_hostname(
1769        &mut self,
1770        hostname: &str,
1771        sender: Sender<HostnameResolutionEvent>,
1772    ) {
1773        let addresses = self.cache.get_addresses_for_host(hostname);
1774        if !addresses.is_empty() {
1775            match sender.send(HostnameResolutionEvent::AddressesFound(
1776                hostname.to_string(),
1777                addresses,
1778            )) {
1779                Ok(()) => trace!("sent hostname addresses found"),
1780                Err(e) => debug!("failed to send hostname addresses found: {}", e),
1781            }
1782        }
1783    }
1784
1785    fn add_pending_resolve(&mut self, instance: String) {
1786        if !self.pending_resolves.contains(&instance) {
1787            let next_time = current_time_millis() + RESOLVE_WAIT_IN_MILLIS;
1788            self.add_retransmission(next_time, Command::Resolve(instance.clone(), 1));
1789            self.pending_resolves.insert(instance);
1790        }
1791    }
1792
1793    fn create_service_info_from_cache(
1794        &self,
1795        ty_domain: &str,
1796        fullname: &str,
1797    ) -> Result<ServiceInfo> {
1798        let my_name = {
1799            let name = fullname.trim_end_matches(split_sub_domain(ty_domain).0);
1800            name.strip_suffix('.').unwrap_or(name).to_string()
1801        };
1802
1803        let now = current_time_millis();
1804        let mut info = ServiceInfo::new(ty_domain, &my_name, "", (), 0, None)?;
1805
1806        // Be sure setting `subtype` if available even when querying for the parent domain.
1807        if let Some(subtype) = self.cache.get_subtype(fullname) {
1808            trace!(
1809                "ty_domain: {} found subtype {} for instance: {}",
1810                ty_domain,
1811                subtype,
1812                fullname
1813            );
1814            if info.get_subtype().is_none() {
1815                info.set_subtype(subtype.clone());
1816            }
1817        }
1818
1819        // resolve SRV record
1820        if let Some(records) = self.cache.get_srv(fullname) {
1821            if let Some(answer) = records.first() {
1822                if let Some(dns_srv) = answer.any().downcast_ref::<DnsSrv>() {
1823                    info.set_hostname(dns_srv.host().to_string());
1824                    info.set_port(dns_srv.port());
1825                }
1826            }
1827        }
1828
1829        // resolve TXT record
1830        if let Some(records) = self.cache.get_txt(fullname) {
1831            if let Some(record) = records.first() {
1832                if let Some(dns_txt) = record.any().downcast_ref::<DnsTxt>() {
1833                    info.set_properties_from_txt(dns_txt.text());
1834                }
1835            }
1836        }
1837
1838        // resolve A and AAAA records
1839        if let Some(records) = self.cache.get_addr(info.get_hostname()) {
1840            for answer in records.iter() {
1841                if let Some(dns_a) = answer.any().downcast_ref::<DnsAddress>() {
1842                    if dns_a.get_record().is_expired(now) {
1843                        trace!("Addr expired: {}", dns_a.address());
1844                    } else {
1845                        info.insert_ipaddr(dns_a.address());
1846                    }
1847                }
1848            }
1849        }
1850
1851        Ok(info)
1852    }
1853
1854    fn handle_poller_events(&mut self, events: &mio::Events) {
1855        for ev in events.iter() {
1856            trace!("event received with key {:?}", ev.token());
1857            if ev.token().0 == SIGNAL_SOCK_EVENT_KEY {
1858                // Drain signals as we will drain commands as well.
1859                self.signal_sock_drain();
1860
1861                if let Err(e) = self.poller.registry().reregister(
1862                    &mut self.signal_sock,
1863                    ev.token(),
1864                    mio::Interest::READABLE,
1865                ) {
1866                    debug!("failed to modify poller for signal socket: {}", e);
1867                }
1868                continue; // Next event.
1869            }
1870
1871            // Read until no more packets available.
1872            let intf = match self.poll_ids.get(&ev.token().0) {
1873                Some(interface) => interface.clone(),
1874                None => {
1875                    debug!("Ip for event key {} not found", ev.token().0);
1876                    break;
1877                }
1878            };
1879            while self.handle_read(&intf) {}
1880
1881            // we continue to monitor this socket.
1882            if let Some(sock) = self.intf_socks.get_mut(&intf) {
1883                if let Err(e) =
1884                    self.poller
1885                        .registry()
1886                        .reregister(sock, ev.token(), mio::Interest::READABLE)
1887                {
1888                    debug!("modify poller for interface {:?}: {}", &intf, e);
1889                    break;
1890                }
1891            }
1892        }
1893    }
1894
1895    /// Deal with incoming response packets.  All answers
1896    /// are held in the cache, and listeners are notified.
1897    fn handle_response(&mut self, mut msg: DnsIncoming, intf: &Interface) {
1898        trace!(
1899            "handle_response: {} answers {} authorities {} additionals",
1900            msg.answers().len(),
1901            &msg.authorities().len(),
1902            &msg.num_additionals()
1903        );
1904        let now = current_time_millis();
1905
1906        // remove records that are expired.
1907        let mut record_predicate = |record: &DnsRecordBox| {
1908            if !record.get_record().is_expired(now) {
1909                return true;
1910            }
1911
1912            debug!("record is expired, removing it from cache.");
1913            if self.cache.remove(record) {
1914                // for PTR records, send event to listeners
1915                if let Some(dns_ptr) = record.any().downcast_ref::<DnsPointer>() {
1916                    call_service_listener(
1917                        &self.service_queriers,
1918                        dns_ptr.get_name(),
1919                        ServiceEvent::ServiceRemoved(
1920                            dns_ptr.get_name().to_string(),
1921                            dns_ptr.alias().to_string(),
1922                        ),
1923                    );
1924                }
1925            }
1926            false
1927        };
1928        msg.answers_mut().retain(&mut record_predicate);
1929        msg.authorities_mut().retain(&mut record_predicate);
1930        msg.additionals_mut().retain(&mut record_predicate);
1931
1932        // check possible conflicts and handle them.
1933        self.conflict_handler(&msg, intf);
1934
1935        /// Represents a DNS record change that involves one service instance.
1936        struct InstanceChange {
1937            ty: RRType,   // The type of DNS record for the instance.
1938            name: String, // The name of the record.
1939        }
1940
1941        // Go through all answers to get the new and updated records.
1942        // For new PTR records, send out ServiceFound immediately. For others,
1943        // collect them into `changes`.
1944        //
1945        // Note: we don't try to identify the update instances based on
1946        // each record immediately as the answers are likely related to each
1947        // other.
1948        let mut changes = Vec::new();
1949        let mut timers = Vec::new();
1950        for record in msg.all_records() {
1951            match self.cache.add_or_update(intf, record, &mut timers) {
1952                Some((dns_record, true)) => {
1953                    timers.push(dns_record.get_record().get_expire_time());
1954                    timers.push(dns_record.get_record().get_refresh_time());
1955
1956                    let ty = dns_record.get_type();
1957                    let name = dns_record.get_name();
1958                    if ty == RRType::PTR {
1959                        if self.service_queriers.contains_key(name) {
1960                            timers.push(dns_record.get_record().get_refresh_time());
1961                        }
1962
1963                        // send ServiceFound
1964                        if let Some(dns_ptr) = dns_record.any().downcast_ref::<DnsPointer>() {
1965                            call_service_listener(
1966                                &self.service_queriers,
1967                                name,
1968                                ServiceEvent::ServiceFound(
1969                                    name.to_string(),
1970                                    dns_ptr.alias().to_string(),
1971                                ),
1972                            );
1973                            changes.push(InstanceChange {
1974                                ty,
1975                                name: dns_ptr.alias().to_string(),
1976                            });
1977                        }
1978                    } else {
1979                        changes.push(InstanceChange {
1980                            ty,
1981                            name: name.to_string(),
1982                        });
1983                    }
1984                }
1985                Some((dns_record, false)) => {
1986                    timers.push(dns_record.get_record().get_expire_time());
1987                    timers.push(dns_record.get_record().get_refresh_time());
1988                }
1989                _ => {}
1990            }
1991        }
1992
1993        // Add timers for the new records.
1994        for t in timers {
1995            self.add_timer(t);
1996        }
1997
1998        // Go through remaining changes to see if any hostname resolutions were found or updated.
1999        changes
2000            .iter()
2001            .filter(|change| change.ty == RRType::A || change.ty == RRType::AAAA)
2002            .map(|change| change.name.clone())
2003            .collect::<HashSet<String>>()
2004            .iter()
2005            .map(|hostname| (hostname, self.cache.get_addresses_for_host(hostname)))
2006            .for_each(|(hostname, addresses)| {
2007                call_hostname_resolution_listener(
2008                    &self.hostname_resolvers,
2009                    hostname,
2010                    HostnameResolutionEvent::AddressesFound(hostname.to_string(), addresses),
2011                )
2012            });
2013
2014        // Identify the instances that need to be "resolved".
2015        let mut updated_instances = HashSet::new();
2016        for update in changes {
2017            match update.ty {
2018                RRType::PTR | RRType::SRV | RRType::TXT => {
2019                    updated_instances.insert(update.name);
2020                }
2021                RRType::A | RRType::AAAA => {
2022                    let instances = self.cache.get_instances_on_host(&update.name);
2023                    updated_instances.extend(instances);
2024                }
2025                _ => {}
2026            }
2027        }
2028
2029        self.resolve_updated_instances(&updated_instances);
2030    }
2031
2032    fn conflict_handler(&mut self, msg: &DnsIncoming, intf: &Interface) {
2033        let Some(dns_registry) = self.dns_registry_map.get_mut(intf) else {
2034            return;
2035        };
2036
2037        for answer in msg.answers().iter() {
2038            let mut new_records = Vec::new();
2039
2040            let name = answer.get_name();
2041            let Some(probe) = dns_registry.probing.get_mut(name) else {
2042                continue;
2043            };
2044
2045            // check against possible multicast forwarding
2046            if answer.get_type() == RRType::A || answer.get_type() == RRType::AAAA {
2047                if let Some(answer_addr) = answer.any().downcast_ref::<DnsAddress>() {
2048                    if !valid_ip_on_intf(&answer_addr.address(), intf) {
2049                        debug!(
2050                            "conflict handler: answer addr {:?} not in the subnet of {:?}",
2051                            answer_addr, intf
2052                        );
2053                        continue;
2054                    }
2055                }
2056
2057                // double check if any other address record matches rrdata,
2058                // as there could be multiple addresses for the same name.
2059                let any_match = probe.records.iter().any(|r| {
2060                    r.get_type() == answer.get_type()
2061                        && r.get_class() == answer.get_class()
2062                        && r.rrdata_match(answer.as_ref())
2063                });
2064                if any_match {
2065                    continue; // no conflict for this answer.
2066                }
2067            }
2068
2069            probe.records.retain(|record| {
2070                if record.get_type() == answer.get_type()
2071                    && record.get_class() == answer.get_class()
2072                    && !record.rrdata_match(answer.as_ref())
2073                {
2074                    debug!(
2075                        "found conflict name: '{name}' record: {}: {} PEER: {}",
2076                        record.get_type(),
2077                        record.rdata_print(),
2078                        answer.rdata_print()
2079                    );
2080
2081                    // create a new name for this record
2082                    // then remove the old record in probing.
2083                    let mut new_record = record.clone();
2084                    let new_name = match record.get_type() {
2085                        RRType::A => hostname_change(name),
2086                        RRType::AAAA => hostname_change(name),
2087                        _ => name_change(name),
2088                    };
2089                    new_record.get_record_mut().set_new_name(new_name);
2090                    new_records.push(new_record);
2091                    return false; // old record is dropped from the probe.
2092                }
2093
2094                true
2095            });
2096
2097            // ?????
2098            // if probe.records.is_empty() {
2099            //     dns_registry.probing.remove(name);
2100            // }
2101
2102            // Probing again with the new names.
2103            let create_time = current_time_millis() + fastrand::u64(0..250);
2104
2105            let waiting_services = probe.waiting_services.clone();
2106
2107            for record in new_records {
2108                if dns_registry.update_hostname(name, record.get_name(), create_time) {
2109                    self.timers.push(Reverse(create_time));
2110                }
2111
2112                // remember the name changes (note: `name` might not be the original, it could be already changed once.)
2113                dns_registry.name_changes.insert(
2114                    record.get_record().get_original_name().to_string(),
2115                    record.get_name().to_string(),
2116                );
2117
2118                let new_probe = match dns_registry.probing.get_mut(record.get_name()) {
2119                    Some(p) => p,
2120                    None => {
2121                        let new_probe = dns_registry
2122                            .probing
2123                            .entry(record.get_name().to_string())
2124                            .or_insert_with(|| {
2125                                debug!("conflict handler: new probe of {}", record.get_name());
2126                                Probe::new(create_time)
2127                            });
2128                        self.timers.push(Reverse(new_probe.next_send));
2129                        new_probe
2130                    }
2131                };
2132
2133                debug!(
2134                    "insert record with new name '{}' {} into probe",
2135                    record.get_name(),
2136                    record.get_type()
2137                );
2138                new_probe.insert_record(record);
2139
2140                new_probe.waiting_services.extend(waiting_services.clone());
2141            }
2142        }
2143    }
2144
2145    /// Resolve the updated (including new) instances.
2146    ///
2147    /// Note: it is possible that more than 1 PTR pointing to the same
2148    /// instance. For example, a regular service type PTR and a sub-type
2149    /// service type PTR can both point to the same service instance.
2150    /// This loop automatically handles the sub-type PTRs.
2151    fn resolve_updated_instances(&mut self, updated_instances: &HashSet<String>) {
2152        let mut resolved: HashSet<String> = HashSet::new();
2153        let mut unresolved: HashSet<String> = HashSet::new();
2154        let mut removed_instances = HashMap::new();
2155
2156        for (ty_domain, records) in self.cache.all_ptr().iter() {
2157            if !self.service_queriers.contains_key(ty_domain) {
2158                // No need to resolve if not in our queries.
2159                continue;
2160            }
2161
2162            for record in records.iter() {
2163                if let Some(dns_ptr) = record.any().downcast_ref::<DnsPointer>() {
2164                    if updated_instances.contains(dns_ptr.alias()) {
2165                        if let Ok(info) =
2166                            self.create_service_info_from_cache(ty_domain, dns_ptr.alias())
2167                        {
2168                            if info.is_ready() {
2169                                debug!("call queriers to resolve {}", dns_ptr.alias());
2170                                resolved.insert(dns_ptr.alias().to_string());
2171                                call_service_listener(
2172                                    &self.service_queriers,
2173                                    ty_domain,
2174                                    ServiceEvent::ServiceResolved(info),
2175                                );
2176                            } else {
2177                                if self.resolved.remove(dns_ptr.alias()) {
2178                                    removed_instances
2179                                        .entry(ty_domain.to_string())
2180                                        .or_insert_with(HashSet::new)
2181                                        .insert(dns_ptr.alias().to_string());
2182                                }
2183                                unresolved.insert(dns_ptr.alias().to_string());
2184                            }
2185                        }
2186                    }
2187                }
2188            }
2189        }
2190
2191        for instance in resolved.drain() {
2192            self.pending_resolves.remove(&instance);
2193            self.resolved.insert(instance);
2194        }
2195
2196        for instance in unresolved.drain() {
2197            self.add_pending_resolve(instance);
2198        }
2199
2200        self.notify_service_removal(removed_instances);
2201    }
2202
2203    /// Handle incoming query packets, figure out whether and what to respond.
2204    fn handle_query(&mut self, msg: DnsIncoming, intf: &Interface) {
2205        let sock = match self.intf_socks.get(intf) {
2206            Some(sock) => sock,
2207            None => return,
2208        };
2209        let mut out = DnsOutgoing::new(FLAGS_QR_RESPONSE | FLAGS_AA);
2210
2211        // Special meta-query "_services._dns-sd._udp.<Domain>".
2212        // See https://datatracker.ietf.org/doc/html/rfc6763#section-9
2213        const META_QUERY: &str = "_services._dns-sd._udp.local.";
2214
2215        let Some(dns_registry) = self.dns_registry_map.get_mut(intf) else {
2216            debug!("missing dns registry for intf {}", intf.ip());
2217            return;
2218        };
2219
2220        for question in msg.questions().iter() {
2221            trace!("query question: {:?}", &question);
2222
2223            let qtype = question.entry_type();
2224
2225            if qtype == RRType::PTR {
2226                for service in self.my_services.values() {
2227                    if service.get_status(intf) != ServiceStatus::Announced {
2228                        continue;
2229                    }
2230
2231                    if question.entry_name() == service.get_type()
2232                        || service
2233                            .get_subtype()
2234                            .as_ref()
2235                            .is_some_and(|v| v == question.entry_name())
2236                    {
2237                        add_answer_with_additionals(&mut out, &msg, service, intf, dns_registry);
2238                    } else if question.entry_name() == META_QUERY {
2239                        let ptr_added = out.add_answer(
2240                            &msg,
2241                            DnsPointer::new(
2242                                question.entry_name(),
2243                                RRType::PTR,
2244                                CLASS_IN,
2245                                service.get_other_ttl(),
2246                                service.get_type().to_string(),
2247                            ),
2248                        );
2249                        if !ptr_added {
2250                            trace!("answer was not added for meta-query {:?}", &question);
2251                        }
2252                    }
2253                }
2254            } else {
2255                // Simultaneous Probe Tiebreaking (RFC 6762 section 8.2)
2256                if qtype == RRType::ANY && msg.num_authorities() > 0 {
2257                    let probe_name = question.entry_name();
2258
2259                    if let Some(probe) = dns_registry.probing.get_mut(probe_name) {
2260                        let now = current_time_millis();
2261
2262                        // Only do tiebreaking if probe already started.
2263                        // This check also helps avoid redo tiebreaking if start time
2264                        // was postponed.
2265                        if probe.start_time < now {
2266                            let incoming_records: Vec<_> = msg
2267                                .authorities()
2268                                .iter()
2269                                .filter(|r| r.get_name() == probe_name)
2270                                .collect();
2271
2272                            /*
2273                            RFC 6762 section 8.2: https://datatracker.ietf.org/doc/html/rfc6762#section-8.2
2274                            ...
2275                            if the host finds that its own data is lexicographically later, it
2276                            simply ignores the other host's probe.  If the host finds that its
2277                            own data is lexicographically earlier, then it defers to the winning
2278                            host by waiting one second, and then begins probing for this record
2279                            again.
2280                            */
2281                            match probe.tiebreaking(&incoming_records) {
2282                                cmp::Ordering::Less => {
2283                                    debug!(
2284                                        "tiebreaking '{}': LOST, will wait for one second",
2285                                        probe_name
2286                                    );
2287                                    probe.start_time = now + 1000; // wait and restart.
2288                                    probe.next_send = now + 1000;
2289                                }
2290                                ordering => {
2291                                    debug!("tiebreaking '{}': {:?}", probe_name, ordering);
2292                                }
2293                            }
2294                        }
2295                    }
2296                }
2297
2298                if qtype == RRType::A || qtype == RRType::AAAA || qtype == RRType::ANY {
2299                    for service in self.my_services.values() {
2300                        if service.get_status(intf) != ServiceStatus::Announced {
2301                            continue;
2302                        }
2303
2304                        let service_hostname =
2305                            match dns_registry.name_changes.get(service.get_hostname()) {
2306                                Some(new_name) => new_name,
2307                                None => service.get_hostname(),
2308                            };
2309
2310                        if service_hostname.to_lowercase() == question.entry_name().to_lowercase() {
2311                            let intf_addrs = service.get_addrs_on_intf(intf);
2312                            if intf_addrs.is_empty()
2313                                && (qtype == RRType::A || qtype == RRType::AAAA)
2314                            {
2315                                let t = match qtype {
2316                                    RRType::A => "TYPE_A",
2317                                    RRType::AAAA => "TYPE_AAAA",
2318                                    _ => "invalid_type",
2319                                };
2320                                trace!(
2321                                    "Cannot find valid addrs for {} response on intf {:?}",
2322                                    t,
2323                                    &intf
2324                                );
2325                                return;
2326                            }
2327                            for address in intf_addrs {
2328                                out.add_answer(
2329                                    &msg,
2330                                    DnsAddress::new(
2331                                        question.entry_name(),
2332                                        ip_address_rr_type(&address),
2333                                        CLASS_IN | CLASS_CACHE_FLUSH,
2334                                        service.get_host_ttl(),
2335                                        address,
2336                                    ),
2337                                );
2338                            }
2339                        }
2340                    }
2341                }
2342
2343                let query_name = question.entry_name().to_lowercase();
2344                let service_opt = self
2345                    .my_services
2346                    .iter()
2347                    .find(|(k, _v)| {
2348                        let service_name = match dns_registry.name_changes.get(k.as_str()) {
2349                            Some(new_name) => new_name,
2350                            None => k,
2351                        };
2352                        service_name == &query_name
2353                    })
2354                    .map(|(_, v)| v);
2355
2356                let Some(service) = service_opt else {
2357                    continue;
2358                };
2359
2360                if service.get_status(intf) != ServiceStatus::Announced {
2361                    continue;
2362                }
2363
2364                if qtype == RRType::SRV || qtype == RRType::ANY {
2365                    out.add_answer(
2366                        &msg,
2367                        DnsSrv::new(
2368                            question.entry_name(),
2369                            CLASS_IN | CLASS_CACHE_FLUSH,
2370                            service.get_host_ttl(),
2371                            service.get_priority(),
2372                            service.get_weight(),
2373                            service.get_port(),
2374                            service.get_hostname().to_string(),
2375                        ),
2376                    );
2377                }
2378
2379                if qtype == RRType::TXT || qtype == RRType::ANY {
2380                    out.add_answer(
2381                        &msg,
2382                        DnsTxt::new(
2383                            question.entry_name(),
2384                            CLASS_IN | CLASS_CACHE_FLUSH,
2385                            service.get_host_ttl(),
2386                            service.generate_txt(),
2387                        ),
2388                    );
2389                }
2390
2391                if qtype == RRType::SRV {
2392                    let intf_addrs = service.get_addrs_on_intf(intf);
2393                    if intf_addrs.is_empty() {
2394                        debug!(
2395                            "Cannot find valid addrs for TYPE_SRV response on intf {:?}",
2396                            &intf
2397                        );
2398                        return;
2399                    }
2400                    for address in intf_addrs {
2401                        out.add_additional_answer(DnsAddress::new(
2402                            service.get_hostname(),
2403                            ip_address_rr_type(&address),
2404                            CLASS_IN | CLASS_CACHE_FLUSH,
2405                            service.get_host_ttl(),
2406                            address,
2407                        ));
2408                    }
2409                }
2410            }
2411        }
2412
2413        if !out.answers_count() > 0 {
2414            out.set_id(msg.id());
2415            send_dns_outgoing(&out, intf, sock);
2416
2417            self.increase_counter(Counter::Respond, 1);
2418            self.notify_monitors(DaemonEvent::Respond(intf.ip()));
2419        }
2420
2421        self.increase_counter(Counter::KnownAnswerSuppression, out.known_answer_count());
2422    }
2423
2424    /// Increases the value of `counter` by `count`.
2425    fn increase_counter(&mut self, counter: Counter, count: i64) {
2426        let key = counter.to_string();
2427        match self.counters.get_mut(&key) {
2428            Some(v) => *v += count,
2429            None => {
2430                self.counters.insert(key, count);
2431            }
2432        }
2433    }
2434
2435    fn signal_sock_drain(&self) {
2436        let mut signal_buf = [0; 1024];
2437
2438        // This recv is non-blocking as the socket is non-blocking.
2439        while let Ok(sz) = self.signal_sock.recv(&mut signal_buf) {
2440            trace!(
2441                "signal socket recvd: {}",
2442                String::from_utf8_lossy(&signal_buf[0..sz])
2443            );
2444        }
2445    }
2446
2447    fn add_retransmission(&mut self, next_time: u64, command: Command) {
2448        self.retransmissions.push(ReRun { next_time, command });
2449        self.add_timer(next_time);
2450    }
2451
2452    /// Sends service removal event to listeners for expired service records.
2453    fn notify_service_removal(&self, expired: HashMap<String, HashSet<String>>) {
2454        for (ty_domain, sender) in self.service_queriers.iter() {
2455            if let Some(instances) = expired.get(ty_domain) {
2456                for instance_name in instances {
2457                    let event = ServiceEvent::ServiceRemoved(
2458                        ty_domain.to_string(),
2459                        instance_name.to_string(),
2460                    );
2461                    match sender.send(event) {
2462                        Ok(()) => debug!("notify_service_removal: sent ServiceRemoved to listener of {ty_domain}: {instance_name}"),
2463                        Err(e) => debug!("Failed to send event: {}", e),
2464                    }
2465                }
2466            }
2467        }
2468    }
2469
2470    /// The entry point that executes all commands received by the daemon.
2471    ///
2472    /// `repeating`: whether this is a retransmission.
2473    fn exec_command(&mut self, command: Command, repeating: bool) {
2474        match command {
2475            Command::Browse(ty, next_delay, listener) => {
2476                self.exec_command_browse(repeating, ty, next_delay, listener);
2477            }
2478
2479            Command::ResolveHostname(hostname, next_delay, listener, timeout) => {
2480                self.exec_command_resolve_hostname(
2481                    repeating, hostname, next_delay, listener, timeout,
2482                );
2483            }
2484
2485            Command::Register(service_info) => {
2486                self.register_service(service_info);
2487                self.increase_counter(Counter::Register, 1);
2488            }
2489
2490            Command::RegisterResend(fullname, intf) => {
2491                trace!("register-resend service: {fullname} on {:?}", &intf.addr);
2492                self.exec_command_register_resend(fullname, intf);
2493            }
2494
2495            Command::Unregister(fullname, resp_s) => {
2496                trace!("unregister service {} repeat {}", &fullname, &repeating);
2497                self.exec_command_unregister(repeating, fullname, resp_s);
2498            }
2499
2500            Command::UnregisterResend(packet, ip) => {
2501                self.exec_command_unregister_resend(packet, ip);
2502            }
2503
2504            Command::StopBrowse(ty_domain) => self.exec_command_stop_browse(ty_domain),
2505
2506            Command::StopResolveHostname(hostname) => {
2507                self.exec_command_stop_resolve_hostname(hostname)
2508            }
2509
2510            Command::Resolve(instance, try_count) => self.exec_command_resolve(instance, try_count),
2511
2512            Command::GetMetrics(resp_s) => match resp_s.send(self.counters.clone()) {
2513                Ok(()) => trace!("Sent metrics to the client"),
2514                Err(e) => debug!("Failed to send metrics: {}", e),
2515            },
2516
2517            Command::GetStatus(resp_s) => match resp_s.send(self.status.clone()) {
2518                Ok(()) => trace!("Sent status to the client"),
2519                Err(e) => debug!("Failed to send status: {}", e),
2520            },
2521
2522            Command::Monitor(resp_s) => {
2523                self.monitors.push(resp_s);
2524            }
2525
2526            Command::SetOption(daemon_opt) => {
2527                self.process_set_option(daemon_opt);
2528            }
2529
2530            Command::Verify(instance_fullname, timeout) => {
2531                self.exec_command_verify(instance_fullname, timeout, repeating);
2532            }
2533
2534            _ => {
2535                debug!("unexpected command: {:?}", &command);
2536            }
2537        }
2538    }
2539
2540    fn exec_command_browse(
2541        &mut self,
2542        repeating: bool,
2543        ty: String,
2544        next_delay: u32,
2545        listener: Sender<ServiceEvent>,
2546    ) {
2547        let pretty_addrs: Vec<String> = self
2548            .intf_socks
2549            .keys()
2550            .map(|itf| format!("{} ({})", itf.ip(), itf.name))
2551            .collect();
2552
2553        if let Err(e) = listener.send(ServiceEvent::SearchStarted(format!(
2554            "{ty} on {} interfaces [{}]",
2555            pretty_addrs.len(),
2556            pretty_addrs.join(", ")
2557        ))) {
2558            debug!(
2559                "Failed to send SearchStarted({})(repeating:{}): {}",
2560                &ty, repeating, e
2561            );
2562            return;
2563        }
2564        if !repeating {
2565            // Binds a `listener` to querying mDNS domain type `ty`.
2566            //
2567            // If there is already a `listener`, it will be updated, i.e. overwritten.
2568            self.service_queriers.insert(ty.clone(), listener.clone());
2569
2570            // if we already have the records in our cache, just send them
2571            self.query_cache_for_service(&ty, &listener);
2572        }
2573
2574        self.send_query(&ty, RRType::PTR);
2575        self.increase_counter(Counter::Browse, 1);
2576
2577        let next_time = current_time_millis() + (next_delay * 1000) as u64;
2578        let max_delay = 60 * 60;
2579        let delay = cmp::min(next_delay * 2, max_delay);
2580        self.add_retransmission(next_time, Command::Browse(ty, delay, listener));
2581    }
2582
2583    fn exec_command_resolve_hostname(
2584        &mut self,
2585        repeating: bool,
2586        hostname: String,
2587        next_delay: u32,
2588        listener: Sender<HostnameResolutionEvent>,
2589        timeout: Option<u64>,
2590    ) {
2591        let addr_list: Vec<_> = self.intf_socks.keys().collect();
2592        if let Err(e) = listener.send(HostnameResolutionEvent::SearchStarted(format!(
2593            "{} on addrs {:?}",
2594            &hostname, &addr_list
2595        ))) {
2596            debug!(
2597                "Failed to send ResolveStarted({})(repeating:{}): {}",
2598                &hostname, repeating, e
2599            );
2600            return;
2601        }
2602        if !repeating {
2603            self.add_hostname_resolver(hostname.to_owned(), listener.clone(), timeout);
2604            // if we already have the records in our cache, just send them
2605            self.query_cache_for_hostname(&hostname, listener.clone());
2606        }
2607
2608        self.send_query_vec(&[(&hostname, RRType::A), (&hostname, RRType::AAAA)]);
2609        self.increase_counter(Counter::ResolveHostname, 1);
2610
2611        let now = current_time_millis();
2612        let next_time = now + u64::from(next_delay) * 1000;
2613        let max_delay = 60 * 60;
2614        let delay = cmp::min(next_delay * 2, max_delay);
2615
2616        // Only add retransmission if it does not exceed the hostname resolver timeout, if any.
2617        if self
2618            .hostname_resolvers
2619            .get(&hostname)
2620            .and_then(|(_sender, timeout)| *timeout)
2621            .map(|timeout| next_time < timeout)
2622            .unwrap_or(true)
2623        {
2624            self.add_retransmission(
2625                next_time,
2626                Command::ResolveHostname(hostname, delay, listener, None),
2627            );
2628        }
2629    }
2630
2631    fn exec_command_resolve(&mut self, instance: String, try_count: u16) {
2632        let pending_query = self.query_unresolved(&instance);
2633        let max_try = 3;
2634        if pending_query && try_count < max_try {
2635            // Note that if the current try already succeeds, the next retransmission
2636            // will be no-op as the cache has been updated.
2637            let next_time = current_time_millis() + RESOLVE_WAIT_IN_MILLIS;
2638            self.add_retransmission(next_time, Command::Resolve(instance, try_count + 1));
2639        }
2640    }
2641
2642    fn exec_command_unregister(
2643        &mut self,
2644        repeating: bool,
2645        fullname: String,
2646        resp_s: Sender<UnregisterStatus>,
2647    ) {
2648        let response = match self.my_services.remove_entry(&fullname) {
2649            None => {
2650                debug!("unregister: cannot find such service {}", &fullname);
2651                UnregisterStatus::NotFound
2652            }
2653            Some((_k, info)) => {
2654                let mut timers = Vec::new();
2655                // Send one unregister per interface and ip version
2656                let mut multicast_sent_trackers = HashSet::new();
2657
2658                for (intf, sock) in self.intf_socks.iter() {
2659                    if let Some(tracker) = multicast_send_tracker(intf) {
2660                        if multicast_sent_trackers.contains(&tracker) {
2661                            continue; // no need to send unregister the same interface with same ip version.
2662                        }
2663                        multicast_sent_trackers.insert(tracker);
2664                    }
2665                    let packet = self.unregister_service(&info, intf, sock);
2666                    // repeat for one time just in case some peers miss the message
2667                    if !repeating && !packet.is_empty() {
2668                        let next_time = current_time_millis() + 120;
2669                        self.retransmissions.push(ReRun {
2670                            next_time,
2671                            command: Command::UnregisterResend(packet, intf.clone()),
2672                        });
2673                        timers.push(next_time);
2674                    }
2675                }
2676
2677                for t in timers {
2678                    self.add_timer(t);
2679                }
2680
2681                self.increase_counter(Counter::Unregister, 1);
2682                UnregisterStatus::OK
2683            }
2684        };
2685        if let Err(e) = resp_s.send(response) {
2686            debug!("unregister: failed to send response: {}", e);
2687        }
2688    }
2689
2690    fn exec_command_unregister_resend(&mut self, packet: Vec<u8>, intf: Interface) {
2691        if let Some(sock) = self.intf_socks.get(&intf) {
2692            debug!("UnregisterResend from {}", &intf.ip());
2693            multicast_on_intf(&packet[..], &intf, sock);
2694            self.increase_counter(Counter::UnregisterResend, 1);
2695        }
2696    }
2697
2698    fn exec_command_stop_browse(&mut self, ty_domain: String) {
2699        match self.service_queriers.remove_entry(&ty_domain) {
2700            None => debug!("StopBrowse: cannot find querier for {}", &ty_domain),
2701            Some((ty, sender)) => {
2702                // Remove pending browse commands in the reruns.
2703                trace!("StopBrowse: removed queryer for {}", &ty);
2704                let mut i = 0;
2705                while i < self.retransmissions.len() {
2706                    if let Command::Browse(t, _, _) = &self.retransmissions[i].command {
2707                        if t == &ty {
2708                            self.retransmissions.remove(i);
2709                            trace!("StopBrowse: removed retransmission for {}", &ty);
2710                            continue;
2711                        }
2712                    }
2713                    i += 1;
2714                }
2715
2716                // Notify the client.
2717                match sender.send(ServiceEvent::SearchStopped(ty_domain)) {
2718                    Ok(()) => trace!("Sent SearchStopped to the listener"),
2719                    Err(e) => debug!("Failed to send SearchStopped: {}", e),
2720                }
2721            }
2722        }
2723    }
2724
2725    fn exec_command_stop_resolve_hostname(&mut self, hostname: String) {
2726        if let Some((host, (sender, _timeout))) = self.hostname_resolvers.remove_entry(&hostname) {
2727            // Remove pending resolve commands in the reruns.
2728            trace!("StopResolve: removed queryer for {}", &host);
2729            let mut i = 0;
2730            while i < self.retransmissions.len() {
2731                if let Command::Resolve(t, _) = &self.retransmissions[i].command {
2732                    if t == &host {
2733                        self.retransmissions.remove(i);
2734                        trace!("StopResolve: removed retransmission for {}", &host);
2735                        continue;
2736                    }
2737                }
2738                i += 1;
2739            }
2740
2741            // Notify the client.
2742            match sender.send(HostnameResolutionEvent::SearchStopped(hostname)) {
2743                Ok(()) => trace!("Sent SearchStopped to the listener"),
2744                Err(e) => debug!("Failed to send SearchStopped: {}", e),
2745            }
2746        }
2747    }
2748
2749    fn exec_command_register_resend(&mut self, fullname: String, intf: Interface) {
2750        let Some(info) = self.my_services.get_mut(&fullname) else {
2751            trace!("announce: cannot find such service {}", &fullname);
2752            return;
2753        };
2754
2755        let Some(dns_registry) = self.dns_registry_map.get_mut(&intf) else {
2756            return;
2757        };
2758
2759        let Some(sock) = self.intf_socks.get(&intf) else {
2760            return;
2761        };
2762
2763        if announce_service_on_intf(dns_registry, info, &intf, sock) {
2764            let mut hostname = info.get_hostname();
2765            if let Some(new_name) = dns_registry.name_changes.get(hostname) {
2766                hostname = new_name;
2767            }
2768            let service_name = match dns_registry.name_changes.get(&fullname) {
2769                Some(new_name) => new_name.to_string(),
2770                None => fullname,
2771            };
2772
2773            debug!("resend: announce service {} on {}", service_name, intf.ip());
2774
2775            notify_monitors(
2776                &mut self.monitors,
2777                DaemonEvent::Announce(service_name, format!("{}:{}", hostname, &intf.ip())),
2778            );
2779            info.set_status(&intf, ServiceStatus::Announced);
2780        } else {
2781            debug!("register-resend should not fail");
2782        }
2783
2784        self.increase_counter(Counter::RegisterResend, 1);
2785    }
2786
2787    fn exec_command_verify(&mut self, instance: String, timeout: Duration, repeating: bool) {
2788        /*
2789        RFC 6762 section 10.4:
2790        ...
2791        When the cache receives this hint that it should reconfirm some
2792        record, it MUST issue two or more queries for the resource record in
2793        dispute.  If no response is received within ten seconds, then, even
2794        though its TTL may indicate that it is not yet due to expire, that
2795        record SHOULD be promptly flushed from the cache.
2796        */
2797        let now = current_time_millis();
2798        let expire_at = if repeating {
2799            None
2800        } else {
2801            Some(now + timeout.as_millis() as u64)
2802        };
2803
2804        // send query for the resource records.
2805        let record_vec = self.cache.service_verify_queries(&instance, expire_at);
2806
2807        if !record_vec.is_empty() {
2808            let query_vec: Vec<(&str, RRType)> = record_vec
2809                .iter()
2810                .map(|(record, rr_type)| (record.as_str(), *rr_type))
2811                .collect();
2812            self.send_query_vec(&query_vec);
2813
2814            if let Some(new_expire) = expire_at {
2815                self.add_timer(new_expire); // ensure a check for the new expire time.
2816
2817                // schedule a resend 1 second later
2818                self.add_retransmission(now + 1000, Command::Verify(instance, timeout));
2819            }
2820        }
2821    }
2822
2823    /// Refresh cached service records with active queriers
2824    fn refresh_active_services(&mut self) {
2825        let mut query_ptr_count = 0;
2826        let mut query_srv_count = 0;
2827        let mut new_timers = HashSet::new();
2828        let mut query_addr_count = 0;
2829
2830        for (ty_domain, _sender) in self.service_queriers.iter() {
2831            let refreshed_timers = self.cache.refresh_due_ptr(ty_domain);
2832            if !refreshed_timers.is_empty() {
2833                trace!("sending refresh query for PTR: {}", ty_domain);
2834                self.send_query(ty_domain, RRType::PTR);
2835                query_ptr_count += 1;
2836                new_timers.extend(refreshed_timers);
2837            }
2838
2839            let (instances, timers) = self.cache.refresh_due_srv(ty_domain);
2840            for instance in instances.iter() {
2841                trace!("sending refresh query for SRV: {}", instance);
2842                self.send_query(instance, RRType::SRV);
2843                query_srv_count += 1;
2844            }
2845            new_timers.extend(timers);
2846            let (hostnames, timers) = self.cache.refresh_due_hosts(ty_domain);
2847            for hostname in hostnames.iter() {
2848                trace!("sending refresh queries for A and AAAA:  {}", hostname);
2849                self.send_query_vec(&[(hostname, RRType::A), (hostname, RRType::AAAA)]);
2850                query_addr_count += 2;
2851            }
2852            new_timers.extend(timers);
2853        }
2854
2855        for timer in new_timers {
2856            self.add_timer(timer);
2857        }
2858
2859        self.increase_counter(Counter::CacheRefreshPTR, query_ptr_count);
2860        self.increase_counter(Counter::CacheRefreshSRV, query_srv_count);
2861        self.increase_counter(Counter::CacheRefreshAddr, query_addr_count);
2862    }
2863}
2864
/// All possible events sent to the client from the daemon
/// regarding service discovery.
///
/// Events are delivered through the `Sender<ServiceEvent>` that the client
/// supplied when it started browsing.
#[derive(Debug)]
pub enum ServiceEvent {
    /// Started searching for a service type.
    ///
    /// The payload is a human-readable description containing the service
    /// type and the interfaces the search runs on.
    SearchStarted(String),

    /// Found a specific (service_type, fullname).
    ServiceFound(String, String),

    /// Resolved a service instance with detailed info.
    ServiceResolved(ServiceInfo),

    /// A service instance (service_type, fullname) was removed.
    ServiceRemoved(String, String),

    /// Stopped searching for a service type.
    ///
    /// The payload is the service type (`ty_domain`) that was being browsed.
    SearchStopped(String),
}
2884
/// All possible events sent to the client from the daemon
/// regarding host resolution.
///
/// Events are delivered through the `Sender<HostnameResolutionEvent>` that
/// the client supplied when it started resolving a hostname.
#[derive(Debug)]
#[non_exhaustive]
pub enum HostnameResolutionEvent {
    /// Started searching for the IP addresses of a hostname.
    ///
    /// The payload is a human-readable description containing the hostname
    /// and the interfaces the search runs on.
    SearchStarted(String),
    /// One or more addresses for a hostname have been found.
    AddressesFound(String, HashSet<IpAddr>),
    /// One or more addresses for a hostname have been removed.
    AddressesRemoved(String, HashSet<IpAddr>),
    /// The search for the IP addresses of a hostname has timed out.
    SearchTimeout(String),
    /// Stopped searching for the IP addresses of a hostname.
    ///
    /// The payload is the hostname that was being resolved.
    SearchStopped(String),
}
2901
/// Some notable events from the daemon besides [`ServiceEvent`].
/// These events are expected to happen infrequently.
#[derive(Clone, Debug)]
#[non_exhaustive]
pub enum DaemonEvent {
    /// Daemon announced a service from an interface without being queried.
    ///
    /// The fields are the announced service name and a `"<hostname>:<ip>"`
    /// string identifying the host name and interface address used.
    Announce(String, String),

    /// Daemon encountered an error.
    Error(Error),

    /// Daemon detected a new IP address from the host.
    IpAdd(IpAddr),

    /// Daemon detected that an IP address was removed from the host.
    IpDel(IpAddr),

    /// Daemon resolved a name conflict by changing one of its names.
    /// See [DnsNameChange] for more details.
    NameChange(DnsNameChange),

    /// Sent out a multicast response via an IP address.
    Respond(IpAddr),
}
2926
/// Represents a name change due to a name conflict resolution.
/// See [RFC 6762 section 9](https://datatracker.ietf.org/doc/html/rfc6762#section-9)
///
/// Delivered to monitors inside [`DaemonEvent::NameChange`].
#[derive(Clone, Debug)]
pub struct DnsNameChange {
    /// The original name set in `ServiceInfo` by the user.
    pub original: String,

    /// The new name, created by appending a suffix to the original name.
    ///
    /// - for a service instance name, the suffix is `(N)`, where N starts at 2.
    /// - for a host name, the suffix is `-N`, where N starts at 2.
    ///
    /// For example:
    ///
    /// - Service name `foo._service-type._udp` becomes `foo (2)._service-type._udp`
    /// - Host name `foo.local.` becomes `foo-2.local.`
    pub new_name: String,

    /// The type of the resource record whose name was changed.
    pub rr_type: RRType,

    /// The name of the interface where the name conflict and its change happened.
    pub intf_name: String,
}
2951
/// Commands supported by the daemon.
///
/// Most variants are executed by `exec_command`; several of them are also
/// re-queued as retransmissions with updated arguments.
#[derive(Debug)]
enum Command {
    /// Browsing for a service type (ty_domain, next_time_delay_in_seconds, channel::sender)
    Browse(String, u32, Sender<ServiceEvent>),

    /// Resolve a hostname to IP addresses.
    ResolveHostname(String, u32, Sender<HostnameResolutionEvent>, Option<u64>), // (hostname, next_time_delay_in_seconds, sender, timeout_in_milliseconds)

    /// Register a service
    Register(ServiceInfo),

    /// Unregister a service; the outcome is reported on the sender.
    Unregister(String, Sender<UnregisterStatus>), // (fullname, response channel)

    /// Announce again a service to local network
    RegisterResend(String, Interface), // (fullname, interface)

    /// Resend unregister packet.
    UnregisterResend(Vec<u8>, Interface), // (packet content, interface)

    /// Stop browsing a service type
    StopBrowse(String), // (ty_domain)

    /// Stop resolving a hostname
    StopResolveHostname(String), // (hostname)

    /// Send query to resolve a service instance.
    /// This is used when a PTR record exists but SRV & TXT records are missing.
    Resolve(String, u16), // (service_instance_fullname, try_count)

    /// Read the current values of the counters
    GetMetrics(Sender<Metrics>),

    /// Get the current status of the daemon.
    GetStatus(Sender<DaemonStatus>),

    /// Monitor noticeable events in the daemon.
    Monitor(Sender<DaemonEvent>),

    /// Change a daemon option; see [`DaemonOption`].
    SetOption(DaemonOption),

    /// Proactively confirm a DNS resource record.
    ///
    /// The intention is to check if a service name or IP address is still
    /// valid before its TTL expires.
    Verify(String, Duration), // (service_instance_fullname, timeout)

    /// NOTE(review): not handled by `exec_command` (it falls into the
    /// catch-all arm there); presumably processed by the daemon's main loop
    /// to shut down — confirm.
    Exit(Sender<DaemonStatus>),
}
3002
3003impl fmt::Display for Command {
3004    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
3005        match self {
3006            Self::Browse(_, _, _) => write!(f, "Command Browse"),
3007            Self::ResolveHostname(_, _, _, _) => write!(f, "Command ResolveHostname"),
3008            Self::Exit(_) => write!(f, "Command Exit"),
3009            Self::GetStatus(_) => write!(f, "Command GetStatus"),
3010            Self::GetMetrics(_) => write!(f, "Command GetMetrics"),
3011            Self::Monitor(_) => write!(f, "Command Monitor"),
3012            Self::Register(_) => write!(f, "Command Register"),
3013            Self::RegisterResend(_, _) => write!(f, "Command RegisterResend"),
3014            Self::SetOption(_) => write!(f, "Command SetOption"),
3015            Self::StopBrowse(_) => write!(f, "Command StopBrowse"),
3016            Self::StopResolveHostname(_) => write!(f, "Command StopResolveHostname"),
3017            Self::Unregister(_, _) => write!(f, "Command Unregister"),
3018            Self::UnregisterResend(_, _) => write!(f, "Command UnregisterResend"),
3019            Self::Resolve(_, _) => write!(f, "Command Resolve"),
3020            Self::Verify(_, _) => write!(f, "Command VerifyResource"),
3021        }
3022    }
3023}
3024
/// Options that can be changed on a running daemon via `Command::SetOption`.
///
/// NOTE(review): the per-variant descriptions below are inferred from the
/// variant names; confirm against `process_set_option`.
#[derive(Debug)]
enum DaemonOption {
    /// Presumably the maximum accepted service name length, in bytes.
    ServiceNameLenMax(u8),
    /// Presumably enables the interfaces matching the given kinds.
    EnableInterface(Vec<IfKind>),
    /// Presumably disables the interfaces matching the given kinds.
    DisableInterface(Vec<IfKind>),
    /// Presumably toggles multicast loopback for IPv4 sockets.
    MulticastLoopV4(bool),
    /// Presumably toggles multicast loopback for IPv6 sockets.
    MulticastLoopV6(bool),
}
3033
/// The byte length of a service domain suffix supported in this lib.
///
/// Computed from `"._tcp.local."`; the other accepted suffix,
/// `"._udp.local."`, has the same length.
const DOMAIN_LEN: usize = "._tcp.local.".len();
3036
3037/// Validate the length of "service_name" in a "_<service_name>.<domain_name>." string.
3038fn check_service_name_length(ty_domain: &str, limit: u8) -> Result<()> {
3039    if ty_domain.len() <= DOMAIN_LEN + 1 {
3040        // service name cannot be empty or only '_'.
3041        return Err(e_fmt!("Service type name cannot be empty: {}", ty_domain));
3042    }
3043
3044    let service_name_len = ty_domain.len() - DOMAIN_LEN - 1; // exclude the leading `_`
3045    if service_name_len > limit as usize {
3046        return Err(e_fmt!("Service name length must be <= {} bytes", limit));
3047    }
3048    Ok(())
3049}
3050
3051/// Checks if `name` ends with a valid domain: '._tcp.local.' or '._udp.local.'
3052fn check_domain_suffix(name: &str) -> Result<()> {
3053    if !(name.ends_with("._tcp.local.") || name.ends_with("._udp.local.")) {
3054        return Err(e_fmt!(
3055            "mDNS service {} must end with '._tcp.local.' or '._udp.local.'",
3056            name
3057        ));
3058    }
3059
3060    Ok(())
3061}
3062
3063/// Validate the service name in a fully qualified name.
3064///
3065/// A Full Name = <Instance>.<Service>.<Domain>
3066/// The only `<Domain>` supported are "._tcp.local." and "._udp.local.".
3067///
3068/// Note: this function does not check for the length of the service name.
3069/// Instead, `register_service` method will check the length.
3070fn check_service_name(fullname: &str) -> Result<()> {
3071    check_domain_suffix(fullname)?;
3072
3073    let remaining: Vec<&str> = fullname[..fullname.len() - DOMAIN_LEN].split('.').collect();
3074    let name = remaining.last().ok_or_else(|| e_fmt!("No service name"))?;
3075
3076    if &name[0..1] != "_" {
3077        return Err(e_fmt!("Service name must start with '_'"));
3078    }
3079
3080    let name = &name[1..];
3081
3082    if name.contains("--") {
3083        return Err(e_fmt!("Service name must not contain '--'"));
3084    }
3085
3086    if name.starts_with('-') || name.ends_with('-') {
3087        return Err(e_fmt!("Service name (%s) may not start or end with '-'"));
3088    }
3089
3090    let ascii_count = name.chars().filter(|c| c.is_ascii_alphabetic()).count();
3091    if ascii_count < 1 {
3092        return Err(e_fmt!(
3093            "Service name must contain at least one letter (eg: 'A-Za-z')"
3094        ));
3095    }
3096
3097    Ok(())
3098}
3099
3100/// Validate a hostname.
3101fn check_hostname(hostname: &str) -> Result<()> {
3102    if !hostname.ends_with(".local.") {
3103        return Err(e_fmt!("Hostname must end with '.local.': {hostname}"));
3104    }
3105
3106    if hostname == ".local." {
3107        return Err(e_fmt!(
3108            "The part of the hostname before '.local.' cannot be empty"
3109        ));
3110    }
3111
3112    if hostname.len() > 255 {
3113        return Err(e_fmt!("Hostname length must be <= 255 bytes"));
3114    }
3115
3116    Ok(())
3117}
3118
3119fn call_service_listener(
3120    listeners_map: &HashMap<String, Sender<ServiceEvent>>,
3121    ty_domain: &str,
3122    event: ServiceEvent,
3123) {
3124    if let Some(listener) = listeners_map.get(ty_domain) {
3125        match listener.send(event) {
3126            Ok(()) => trace!("Sent event to listener successfully"),
3127            Err(e) => debug!("Failed to send event: {}", e),
3128        }
3129    }
3130}
3131
3132fn call_hostname_resolution_listener(
3133    listeners_map: &HashMap<String, (Sender<HostnameResolutionEvent>, Option<u64>)>,
3134    hostname: &str,
3135    event: HostnameResolutionEvent,
3136) {
3137    if let Some(listener) = listeners_map.get(hostname).map(|(l, _)| l) {
3138        match listener.send(event) {
3139            Ok(()) => trace!("Sent event to listener successfully"),
3140            Err(e) => debug!("Failed to send event: {}", e),
3141        }
3142    }
3143}
3144
3145/// Returns valid network interfaces in the host system.
3146/// Loopback interfaces are excluded.
3147fn my_ip_interfaces(with_loopback: bool) -> Vec<Interface> {
3148    if_addrs::get_if_addrs()
3149        .unwrap_or_default()
3150        .into_iter()
3151        .filter(|i| !i.is_loopback() || with_loopback)
3152        .collect()
3153}
3154
3155/// Send an outgoing mDNS query or response, and returns the packet bytes.
3156fn send_dns_outgoing(out: &DnsOutgoing, intf: &Interface, sock: &MioUdpSocket) -> Vec<Vec<u8>> {
3157    let qtype = if out.is_query() { "query" } else { "response" };
3158    trace!(
3159        "send outgoing {}: {} questions {} answers {} authorities {} additional",
3160        qtype,
3161        out.questions().len(),
3162        out.answers_count(),
3163        out.authorities().len(),
3164        out.additionals().len()
3165    );
3166    let packet_list = out.to_data_on_wire();
3167    for packet in packet_list.iter() {
3168        multicast_on_intf(packet, intf, sock);
3169    }
3170    packet_list
3171}
3172
3173/// Sends a multicast packet, and returns the packet bytes.
3174fn multicast_on_intf(packet: &[u8], intf: &Interface, socket: &MioUdpSocket) {
3175    if packet.len() > MAX_MSG_ABSOLUTE {
3176        debug!("Drop over-sized packet ({})", packet.len());
3177        return;
3178    }
3179
3180    let addr: SocketAddr = match intf.addr {
3181        if_addrs::IfAddr::V4(_) => SocketAddrV4::new(GROUP_ADDR_V4, MDNS_PORT).into(),
3182        if_addrs::IfAddr::V6(_) => {
3183            let mut sock = SocketAddrV6::new(GROUP_ADDR_V6, MDNS_PORT, 0, 0);
3184            sock.set_scope_id(intf.index.unwrap_or(0)); // Choose iface for multicast
3185            sock.into()
3186        }
3187    };
3188
3189    send_packet(packet, addr, intf, socket);
3190}
3191
3192/// Sends out `packet` to `addr` on the socket in `intf_sock`.
3193fn send_packet(packet: &[u8], addr: SocketAddr, intf: &Interface, sock: &MioUdpSocket) {
3194    match sock.send_to(packet, addr) {
3195        Ok(sz) => trace!("sent out {} bytes on interface {:?}", sz, intf),
3196        Err(e) => debug!("Failed to send to {} via {:?}: {}", addr, &intf, e),
3197    }
3198}
3199
/// Returns true if `name` has the shape of an instance name:
/// <instance>.<service_type>.<_udp|_tcp>.local.
///
/// Only the number of '.' separators is checked: at least four are required.
/// Note: <instance> could contain '.' as well.
fn valid_instance_name(name: &str) -> bool {
    // N separators split the name into N + 1 pieces.
    let separators = name.matches('.').count();
    separators >= 4
}
3206
3207fn notify_monitors(monitors: &mut Vec<Sender<DaemonEvent>>, event: DaemonEvent) {
3208    monitors.retain(|sender| {
3209        if let Err(e) = sender.try_send(event.clone()) {
3210            debug!("notify_monitors: try_send: {}", &e);
3211            if matches!(e, TrySendError::Disconnected(_)) {
3212                return false; // This monitor is dropped.
3213            }
3214        }
3215        true
3216    });
3217}
3218
3219/// Check if all unique records passed "probing", and if yes, create a packet
3220/// to announce the service.
3221fn prepare_announce(
3222    info: &ServiceInfo,
3223    intf: &Interface,
3224    dns_registry: &mut DnsRegistry,
3225) -> Option<DnsOutgoing> {
3226    let intf_addrs = info.get_addrs_on_intf(intf);
3227    if intf_addrs.is_empty() {
3228        trace!("No valid addrs to add on intf {:?}", &intf);
3229        return None;
3230    }
3231
3232    // check if we changed our name due to conflicts.
3233    let service_fullname = match dns_registry.name_changes.get(info.get_fullname()) {
3234        Some(new_name) => new_name,
3235        None => info.get_fullname(),
3236    };
3237
3238    debug!(
3239        "prepare to announce service {service_fullname} on {}: {}",
3240        &intf.name,
3241        &intf.ip()
3242    );
3243
3244    let mut probing_count = 0;
3245    let mut out = DnsOutgoing::new(FLAGS_QR_RESPONSE | FLAGS_AA);
3246    let create_time = current_time_millis() + fastrand::u64(0..250);
3247
3248    out.add_answer_at_time(
3249        DnsPointer::new(
3250            info.get_type(),
3251            RRType::PTR,
3252            CLASS_IN,
3253            info.get_other_ttl(),
3254            service_fullname.to_string(),
3255        ),
3256        0,
3257    );
3258
3259    if let Some(sub) = info.get_subtype() {
3260        trace!("Adding subdomain {}", sub);
3261        out.add_answer_at_time(
3262            DnsPointer::new(
3263                sub,
3264                RRType::PTR,
3265                CLASS_IN,
3266                info.get_other_ttl(),
3267                service_fullname.to_string(),
3268            ),
3269            0,
3270        );
3271    }
3272
3273    // SRV records.
3274    let hostname = match dns_registry.name_changes.get(info.get_hostname()) {
3275        Some(new_name) => new_name.to_string(),
3276        None => info.get_hostname().to_string(),
3277    };
3278
3279    let mut srv = DnsSrv::new(
3280        info.get_fullname(),
3281        CLASS_IN | CLASS_CACHE_FLUSH,
3282        info.get_host_ttl(),
3283        info.get_priority(),
3284        info.get_weight(),
3285        info.get_port(),
3286        hostname,
3287    );
3288
3289    if let Some(new_name) = dns_registry.name_changes.get(info.get_fullname()) {
3290        srv.get_record_mut().set_new_name(new_name.to_string());
3291    }
3292
3293    if !info.requires_probe()
3294        || dns_registry.is_probing_done(&srv, info.get_fullname(), create_time)
3295    {
3296        out.add_answer_at_time(srv, 0);
3297    } else {
3298        probing_count += 1;
3299    }
3300
3301    // TXT records.
3302
3303    let mut txt = DnsTxt::new(
3304        info.get_fullname(),
3305        CLASS_IN | CLASS_CACHE_FLUSH,
3306        info.get_other_ttl(),
3307        info.generate_txt(),
3308    );
3309
3310    if let Some(new_name) = dns_registry.name_changes.get(info.get_fullname()) {
3311        txt.get_record_mut().set_new_name(new_name.to_string());
3312    }
3313
3314    if !info.requires_probe()
3315        || dns_registry.is_probing_done(&txt, info.get_fullname(), create_time)
3316    {
3317        out.add_answer_at_time(txt, 0);
3318    } else {
3319        probing_count += 1;
3320    }
3321
3322    // Address records. (A and AAAA)
3323
3324    let hostname = info.get_hostname();
3325    for address in intf_addrs {
3326        let mut dns_addr = DnsAddress::new(
3327            hostname,
3328            ip_address_rr_type(&address),
3329            CLASS_IN | CLASS_CACHE_FLUSH,
3330            info.get_host_ttl(),
3331            address,
3332        );
3333
3334        if let Some(new_name) = dns_registry.name_changes.get(hostname) {
3335            dns_addr.get_record_mut().set_new_name(new_name.to_string());
3336        }
3337
3338        if !info.requires_probe()
3339            || dns_registry.is_probing_done(&dns_addr, info.get_fullname(), create_time)
3340        {
3341            out.add_answer_at_time(dns_addr, 0);
3342        } else {
3343            probing_count += 1;
3344        }
3345    }
3346
3347    if probing_count > 0 {
3348        return None;
3349    }
3350
3351    Some(out)
3352}
3353
3354/// Send an unsolicited response for owned service via `intf` and `sock`.
3355/// Returns true if sent out successfully.
3356fn announce_service_on_intf(
3357    dns_registry: &mut DnsRegistry,
3358    info: &ServiceInfo,
3359    intf: &Interface,
3360    sock: &MioUdpSocket,
3361) -> bool {
3362    if let Some(out) = prepare_announce(info, intf, dns_registry) {
3363        send_dns_outgoing(&out, intf, sock);
3364        return true;
3365    }
3366    false
3367}
3368
/// Returns a new name based on the `original` to avoid conflicts.
///
/// Only the first label (the text before the first '.') is changed. If that
/// label ends with a number in parentheses, i.e. ` (<num>)`, the number is
/// incremented; otherwise ` (2)` is appended. If the number cannot be
/// incremented without overflowing `u32`, ` (2)` is appended as well instead
/// of panicking or wrapping around.
///
/// Examples:
/// - `foo.local.` becomes `foo (2).local.`
/// - `foo (2).local.` becomes `foo (3).local.`
/// - `foo (9)` becomes `foo (10)`
/// - `foo (abc)` becomes `foo (abc) (2)`
fn name_change(original: &str) -> String {
    // Separate the first label from the rest so the rest is kept verbatim.
    let (first_label, rest) = match original.split_once('.') {
        Some((first, rest)) => (first, Some(rest)),
        None => (original, None),
    };

    // Try to bump an existing ` (<num>)` suffix; fall back to appending ` (2)`.
    let bumped = first_label
        .strip_suffix(')')
        .and_then(|without_paren| without_paren.rsplit_once(" ("))
        .and_then(|(base, digits)| {
            let next = digits.parse::<u32>().ok()?.checked_add(1)?;
            Some(format!("{base} ({next})"))
        })
        .unwrap_or_else(|| format!("{first_label} (2)"));

    match rest {
        Some(rest) => format!("{bumped}.{rest}"),
        None => bumped,
    }
}
3404
/// Returns a new name based on the `original` to avoid conflicts.
///
/// Only the first label (the text before the first '.') is changed. If that
/// label ends with a hyphenated number, i.e. `-<num>`, the number is
/// incremented; otherwise `-2` is appended. If the number cannot be
/// incremented without overflowing `u32`, `-2` is appended as well instead of
/// panicking or wrapping around.
///
/// Examples:
/// - `foo.local.` becomes `foo-2.local.`
/// - `foo-2.local.` becomes `foo-3.local.`
/// - `foo` becomes `foo-2`
fn hostname_change(original: &str) -> String {
    // Separate the first label from the rest so the rest is kept verbatim.
    let (first_label, rest) = match original.split_once('.') {
        Some((first, rest)) => (first, Some(rest)),
        None => (original, None),
    };

    // Try to bump an existing `-<num>` suffix; fall back to appending `-2`.
    let bumped = first_label
        .rsplit_once('-')
        .and_then(|(base, digits)| {
            let next = digits.parse::<u32>().ok()?.checked_add(1)?;
            Some(format!("{base}-{next}"))
        })
        .unwrap_or_else(|| format!("{first_label}-2"));

    match rest {
        Some(rest) => format!("{bumped}.{rest}"),
        None => bumped,
    }
}
3432
3433fn add_answer_with_additionals(
3434    out: &mut DnsOutgoing,
3435    msg: &DnsIncoming,
3436    service: &ServiceInfo,
3437    intf: &Interface,
3438    dns_registry: &DnsRegistry,
3439) {
3440    let intf_addrs = service.get_addrs_on_intf(intf);
3441    if intf_addrs.is_empty() {
3442        trace!("No addrs on LAN of intf {:?}", intf);
3443        return;
3444    }
3445
3446    // check if we changed our name due to conflicts.
3447    let service_fullname = match dns_registry.name_changes.get(service.get_fullname()) {
3448        Some(new_name) => new_name,
3449        None => service.get_fullname(),
3450    };
3451
3452    let hostname = match dns_registry.name_changes.get(service.get_hostname()) {
3453        Some(new_name) => new_name,
3454        None => service.get_hostname(),
3455    };
3456
3457    let ptr_added = out.add_answer(
3458        msg,
3459        DnsPointer::new(
3460            service.get_type(),
3461            RRType::PTR,
3462            CLASS_IN,
3463            service.get_other_ttl(),
3464            service_fullname.to_string(),
3465        ),
3466    );
3467
3468    if !ptr_added {
3469        trace!("answer was not added for msg {:?}", msg);
3470        return;
3471    }
3472
3473    if let Some(sub) = service.get_subtype() {
3474        trace!("Adding subdomain {}", sub);
3475        out.add_additional_answer(DnsPointer::new(
3476            sub,
3477            RRType::PTR,
3478            CLASS_IN,
3479            service.get_other_ttl(),
3480            service_fullname.to_string(),
3481        ));
3482    }
3483
3484    // Add recommended additional answers according to
3485    // https://tools.ietf.org/html/rfc6763#section-12.1.
3486    out.add_additional_answer(DnsSrv::new(
3487        service_fullname,
3488        CLASS_IN | CLASS_CACHE_FLUSH,
3489        service.get_host_ttl(),
3490        service.get_priority(),
3491        service.get_weight(),
3492        service.get_port(),
3493        hostname.to_string(),
3494    ));
3495
3496    out.add_additional_answer(DnsTxt::new(
3497        service_fullname,
3498        CLASS_IN | CLASS_CACHE_FLUSH,
3499        service.get_host_ttl(),
3500        service.generate_txt(),
3501    ));
3502
3503    for address in intf_addrs {
3504        out.add_additional_answer(DnsAddress::new(
3505            hostname,
3506            ip_address_rr_type(&address),
3507            CLASS_IN | CLASS_CACHE_FLUSH,
3508            service.get_host_ttl(),
3509            address,
3510        ));
3511    }
3512}
3513
/// Unit tests for the helpers in this file, plus end-to-end tests that start
/// real daemons and exchange packets on the local network interfaces.
#[cfg(test)]
mod tests {
    use super::{
        check_domain_suffix, check_service_name_length, hostname_change, my_ip_interfaces,
        name_change, new_socket_bind, send_dns_outgoing, valid_instance_name,
        HostnameResolutionEvent, ServiceDaemon, ServiceEvent, ServiceInfo, GROUP_ADDR_V4,
        MDNS_PORT,
    };
    use crate::{
        dns_parser::{DnsOutgoing, DnsPointer, RRType, CLASS_IN, FLAGS_AA, FLAGS_QR_RESPONSE},
        service_daemon::check_hostname,
    };
    use std::{
        net::{SocketAddr, SocketAddrV4},
        time::Duration,
    };
    use test_log::test;

    /// The IPv4 group address and port must print as the well-known mDNS endpoint.
    #[test]
    fn test_socketaddr_print() {
        let addr = SocketAddr::from(SocketAddrV4::new(GROUP_ADDR_V4, MDNS_PORT));
        assert_eq!(addr.to_string(), "224.0.0.251:5353");
    }

    /// Instance names need an instance label in front of the service type.
    #[test]
    fn test_instance_name() {
        let accepted = [
            "my-laser._printer._tcp.local.",
            "my-laser.._printer._tcp.local.",
        ];
        for name in accepted.iter() {
            assert!(valid_instance_name(name));
        }

        assert!(!valid_instance_name("_printer._tcp.local."));
    }

    /// A string that is too short to hold a service name is rejected.
    #[test]
    fn test_check_service_name_length() {
        let result = check_service_name_length("_tcp", 100);
        assert!(result.is_err());
        println!("{}", result.unwrap_err());
    }

    /// Hostnames must end with ".local.", be non-empty and fit in 255 bytes.
    #[test]
    fn test_check_hostname() {
        let suffix_len = ".local.".len();
        let longest_valid = format!("{}.local.", "A".repeat(255 - suffix_len));
        let too_long = format!("{}.local.", "A".repeat(256 - suffix_len));

        // valid hostnames
        for hostname in ["my_host.local.", longest_valid.as_str()].iter() {
            let result = check_hostname(hostname);
            assert!(result.is_ok());
        }

        // erroneous hostnames
        for hostname in ["my_host.local", ".local.", too_long.as_str()].iter() {
            let result = check_hostname(hostname);
            assert!(result.is_err());
            println!("{}", result.unwrap_err());
        }
    }

    /// Only names ending with "._tcp.local." or "._udp.local." pass.
    #[test]
    fn test_check_domain_suffix() {
        let rejected = [
            "_missing_dot._tcp.local",
            "_missing_bar.tcp.local.",
            "_mis_spell._tpp.local.",
            "_mis_spell._upp.local.",
        ];
        for name in rejected.iter() {
            assert!(check_domain_suffix(name).is_err());
        }

        let accepted = ["_has_dot._tcp.local.", "_goodname._udp.local."];
        for name in accepted.iter() {
            assert!(check_domain_suffix(name).is_ok());
        }
    }

    /// A service must be resolvable again after its PTR record was invalidated
    /// (sent with TTL 0) while no browse was active.
    #[test]
    fn test_service_with_temporarily_invalidated_ptr() {
        // Create a daemon
        let d = ServiceDaemon::new().expect("Failed to create daemon");

        let service = "_test_inval_ptr._udp.local.";
        let host_name = "my_host_tmp_invalidated_ptr.local.";
        let intfs: Vec<_> = my_ip_interfaces(false);
        let mut intf_ips = Vec::with_capacity(intfs.len());
        for intf in &intfs {
            intf_ips.push(intf.ip());
        }
        let port = 5201;
        let my_service =
            ServiceInfo::new(service, "my_instance", host_name, &intf_ips[..], port, None)
                .expect("invalid service info")
                .enable_addr_auto();
        let result = d.register(my_service.clone());
        assert!(result.is_ok());

        // Browse for a service, until it is resolved or the channel goes quiet.
        let browse_chan = d.browse(service).unwrap();
        let timeout = Duration::from_secs(2);
        let resolved = std::iter::from_fn(|| browse_chan.recv_timeout(timeout).ok()).any(
            |event| match event {
                ServiceEvent::ServiceResolved(info) => {
                    println!("Resolved a service of {}", &info.get_fullname());
                    true
                }
                e => {
                    println!("Received event {:?}", e);
                    false
                }
            },
        );

        assert!(resolved);

        println!("Stopping browse of {}", service);
        // Pause browsing so restarting will cause a new immediate query.
        // Unregistering will not work here, it will invalidate all the records.
        d.stop_browse(service).unwrap();

        // Ensure the search is stopped.
        // Reduces the chance of receiving an answer adding the ptr back to the
        // cache causing the later browse to return directly from the cache.
        // (which invalidates what this test is trying to test for.)
        let stopped = std::iter::from_fn(|| browse_chan.recv_timeout(timeout).ok()).any(
            |event| match event {
                ServiceEvent::SearchStopped(_) => {
                    println!("Stopped browsing service");
                    true
                }
                // Other `ServiceResolved` messages may be received
                // here as they come from different interfaces.
                // That's fine for this test.
                e => {
                    println!("Received event {:?}", e);
                    false
                }
            },
        );

        assert!(stopped);

        // Invalidate the ptr from the service to the host.
        let invalidate_ptr_packet = DnsPointer::new(
            my_service.get_type(),
            RRType::PTR,
            CLASS_IN,
            0,
            my_service.get_fullname().to_string(),
        );

        let mut packet_buffer = DnsOutgoing::new(FLAGS_QR_RESPONSE | FLAGS_AA);
        packet_buffer.add_additional_answer(invalidate_ptr_packet);

        for intf in &intfs {
            let sock = new_socket_bind(intf, true).unwrap();
            send_dns_outgoing(&packet_buffer, intf, &sock);
        }

        println!(
            "Sent PTR record invalidation. Starting second browse for {}",
            service
        );

        // Restart the browse to force the sender to re-send the announcements.
        let browse_chan = d.browse(service).unwrap();

        let resolved = std::iter::from_fn(|| browse_chan.recv_timeout(timeout).ok()).any(
            |event| match event {
                ServiceEvent::ServiceResolved(info) => {
                    println!("Resolved a service of {}", &info.get_fullname());
                    true
                }
                e => {
                    println!("Received event {:?}", e);
                    false
                }
            },
        );

        assert!(resolved);
        d.shutdown().unwrap();
    }

    /// Resolves a service whose host TTL is short, shuts the server down and
    /// then waits for the removal event (or for the channel to go quiet).
    #[test]
    fn test_expired_srv() {
        // construct service info
        let service_type = "_expired-srv._udp.local.";
        let instance = "test_instance";
        let host_name = "expired_srv_host.local.";
        let mut my_service = ServiceInfo::new(service_type, instance, host_name, "", 5023, None)
            .unwrap()
            .enable_addr_auto();

        // set SRV to expire soon.
        let new_ttl = 3; // for testing only.
        my_service._set_host_ttl(new_ttl);

        // register my service
        let mdns_server = ServiceDaemon::new().expect("Failed to create mdns server");
        let result = mdns_server.register(my_service);
        assert!(result.is_ok());

        let mdns_client = ServiceDaemon::new().expect("Failed to create mdns client");
        let browse_chan = mdns_client.browse(service_type).unwrap();
        let timeout = Duration::from_secs(2);
        let resolved = std::iter::from_fn(|| browse_chan.recv_timeout(timeout).ok()).any(
            |event| match event {
                ServiceEvent::ServiceResolved(info) => {
                    println!("Resolved a service of {}", &info.get_fullname());
                    true
                }
                _ => false,
            },
        );

        assert!(resolved);

        // Exit the server so that no more responses.
        mdns_server.shutdown().unwrap();

        // SRV record in the client cache will expire.
        let expire_timeout = Duration::from_secs(new_ttl as u64);
        for event in std::iter::from_fn(|| browse_chan.recv_timeout(expire_timeout).ok()) {
            if let ServiceEvent::ServiceRemoved(service_type, full_name) = event {
                println!("Service removed: {}: {}", &service_type, &full_name);
                break;
            }
        }
    }

    /// After the server is gone, the client must report the resolved address
    /// of the hostname as removed once its record expires.
    #[test]
    fn test_hostname_resolution_address_removed() {
        // Create a mDNS server
        let server = ServiceDaemon::new().expect("Failed to create server");
        let hostname = "addr_remove_host._tcp.local.";
        let service_ip_addr = my_ip_interfaces(false)
            .into_iter()
            .map(|iface| iface.ip())
            .find(|ip| ip.is_ipv4())
            .unwrap();

        let mut my_service = ServiceInfo::new(
            "_host_res_test._tcp.local.",
            "my_instance",
            hostname,
            &service_ip_addr,
            1234,
            None,
        )
        .expect("invalid service info");

        // Set a short TTL for addresses for testing.
        let addr_ttl = 2;
        my_service._set_host_ttl(addr_ttl); // Expire soon

        server.register(my_service).unwrap();

        // Create a mDNS client for resolving the hostname.
        let client = ServiceDaemon::new().expect("Failed to create client");
        let event_receiver = client.resolve_hostname(hostname, None).unwrap();
        let resolved = loop {
            let Ok(event) = event_receiver.recv() else {
                break false;
            };
            match event {
                HostnameResolutionEvent::AddressesFound(found_hostname, addresses) => {
                    assert!(found_hostname == hostname);
                    assert!(addresses.contains(&service_ip_addr));
                    println!("address found: {:?}", &addresses);
                    break true;
                }
                HostnameResolutionEvent::SearchStopped(_) => break false,
                _event => {}
            }
        };

        assert!(resolved);

        // Shutdown the server so no more responses / refreshes for addresses.
        server.shutdown().unwrap();

        // Wait till hostname address record expires, with 1 second grace period.
        let timeout = Duration::from_secs(addr_ttl as u64 + 1);
        let removed = loop {
            let Ok(event) = event_receiver.recv_timeout(timeout) else {
                break false;
            };
            if let HostnameResolutionEvent::AddressesRemoved(removed_host, addresses) = event {
                assert!(removed_host == hostname);
                assert!(addresses.contains(&service_ip_addr));

                println!(
                    "address removed: hostname: {} addresses: {:?}",
                    &hostname, &addresses
                );
                break true;
            }
        };

        assert!(removed);

        client.shutdown().unwrap();
    }

    /// With a short "other" TTL, the client is expected to refresh the PTR
    /// record exactly once within the waiting period.
    #[test]
    fn test_refresh_ptr() {
        // construct service info
        let service_type = "_refresh-ptr._udp.local.";
        let instance = "test_instance";
        let host_name = "refresh_ptr_host.local.";
        let service_ip_addr = my_ip_interfaces(false)
            .into_iter()
            .map(|iface| iface.ip())
            .find(|ip| ip.is_ipv4())
            .unwrap();

        let mut my_service = ServiceInfo::new(
            service_type,
            instance,
            host_name,
            &service_ip_addr,
            5023,
            None,
        )
        .unwrap();

        let new_ttl = 3; // for testing only.
        my_service._set_other_ttl(new_ttl);

        // register my service
        let mdns_server = ServiceDaemon::new().expect("Failed to create mdns server");
        let result = mdns_server.register(my_service);
        assert!(result.is_ok());

        let mdns_client = ServiceDaemon::new().expect("Failed to create mdns client");
        let browse_chan = mdns_client.browse(service_type).unwrap();
        let timeout = Duration::from_millis(1500); // Give at least 1 second for the service probing.

        // resolve the service first.
        let resolved = std::iter::from_fn(|| browse_chan.recv_timeout(timeout).ok()).any(
            |event| match event {
                ServiceEvent::ServiceResolved(info) => {
                    println!("Resolved a service of {}", &info.get_fullname());
                    true
                }
                _ => false,
            },
        );

        assert!(resolved);

        // wait over 80% of TTL, and refresh PTR should be sent out.
        let timeout = Duration::from_millis(new_ttl as u64 * 1000 * 90 / 100);
        std::iter::from_fn(|| browse_chan.recv_timeout(timeout).ok())
            .for_each(|event| println!("event: {:?}", &event));

        // verify refresh counter.
        let metrics_chan = mdns_client.get_metrics().unwrap();
        let metrics = metrics_chan.recv_timeout(timeout).unwrap();
        assert_eq!(metrics["cache-refresh-ptr"], 1);

        // Exit the server so that no more responses.
        mdns_server.shutdown().unwrap();
        mdns_client.shutdown().unwrap();
    }

    /// `name_change` appends " (2)" or increments an existing " (<num>)".
    #[test]
    fn test_name_change() {
        let cases = [
            ("foo.local.", "foo (2).local."),
            ("foo (2).local.", "foo (3).local."),
            ("foo (9).local.", "foo (10).local."),
            ("foo", "foo (2)"),
            ("foo (2)", "foo (3)"),
            ("", " (2)"),
            // Additional edge cases
            ("foo (abc)", "foo (abc) (2)"),         // Invalid number
            ("foo (2", "foo (2 (2)"),               // Missing closing parenthesis
            ("foo (2) extra", "foo (2) extra (2)"), // Extra text after number
        ];

        for (original, expected) in cases.iter() {
            assert_eq!(name_change(original), *expected);
        }
    }

    /// `hostname_change` appends "-2" or increments an existing "-<num>".
    #[test]
    fn test_hostname_change() {
        let cases = [
            ("foo.local.", "foo-2.local."),
            ("foo", "foo-2"),
            ("foo-2.local.", "foo-3.local."),
            ("foo-9", "foo-10"),
            ("test-42.domain.", "test-43.domain."),
        ];

        for (original, expected) in cases.iter() {
            assert_eq!(hostname_change(original), *expected);
        }
    }
}