mdns_sd/
service_daemon.rs

1//! Service daemon for mDNS Service Discovery.
2
3// How DNS-based Service Discovery works in a nutshell:
4//
5// (excerpt from RFC 6763)
6// .... that a particular service instance can be
7//    described using a DNS SRV [RFC2782] and DNS TXT [RFC1035] record.
8//    The SRV record has a name of the form "<Instance>.<Service>.<Domain>"
9//    and gives the target host and port where the service instance can be
10//    reached.  The DNS TXT record of the same name gives additional
11//    information about this instance, in a structured form using key/value
12//    pairs, described in Section 6.  A client discovers the list of
13//    available instances of a given service type using a query for a DNS
14//    PTR [RFC1035] record with a name of the form "<Service>.<Domain>",
15//    which returns a set of zero or more names, which are the names of the
16//    aforementioned DNS SRV/TXT record pairs.
17//
18// Some naming conventions in this source code:
19//
20// `ty_domain` refers to service type together with domain name, i.e. <service>.<domain>.
21// Every <service> consists of two labels: service itself and "_udp." or "_tcp".
22// See RFC 6763 section 7 Service Names.
23//     for example: `_my-service._udp.local.`
24//
25// `fullname` refers to a full Service Instance Name, i.e. <instance>.<service>.<domain>
26//     for example: `my_home._my-service._udp.local.`
27//
28// In mDNS and DNS, the basic data structure is "Resource Record" (RR), where
29// in Service Discovery, the basic data structure is "Service Info". One Service Info
30// corresponds to a set of DNS Resource Records.
31#[cfg(feature = "logging")]
32use crate::log::{debug, trace};
33use crate::{
34    dns_cache::{current_time_millis, DnsCache},
35    dns_parser::{
36        ip_address_rr_type, DnsAddress, DnsEntryExt, DnsIncoming, DnsOutgoing, DnsPointer,
37        DnsRecordBox, DnsRecordExt, DnsSrv, DnsTxt, RRType, CLASS_CACHE_FLUSH, CLASS_IN, FLAGS_AA,
38        FLAGS_QR_QUERY, FLAGS_QR_RESPONSE, MAX_MSG_ABSOLUTE,
39    },
40    error::{e_fmt, Error, Result},
41    service_info::{
42        split_sub_domain, valid_ip_on_intf, DnsRegistry, Probe, ServiceInfo, ServiceStatus,
43    },
44    Receiver,
45};
46use flume::{bounded, Sender, TrySendError};
47use if_addrs::{IfAddr, Interface};
48use mio::{net::UdpSocket as MioUdpSocket, Poll};
49use socket2::Socket;
50use std::{
51    cmp::{self, Reverse},
52    collections::{BinaryHeap, HashMap, HashSet},
53    fmt,
54    net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6, UdpSocket},
55    str, thread,
56    time::Duration,
57    vec,
58};
59
60/// The default max length of the service name without domain, not including the
61/// leading underscore (`_`). It is set to 15 per
62/// [RFC 6763 section 7.2](https://www.rfc-editor.org/rfc/rfc6763#section-7.2).
63pub const SERVICE_NAME_LEN_MAX_DEFAULT: u8 = 15;
64
65/// The default interval for checking IP changes automatically.
66pub const IP_CHECK_INTERVAL_IN_SECS_DEFAULT: u32 = 5;
67
68/// The default time out for [ServiceDaemon::verify] is 10 seconds, per
69/// [RFC 6762 section 10.4](https://datatracker.ietf.org/doc/html/rfc6762#section-10.4)
70pub const VERIFY_TIMEOUT_DEFAULT: Duration = Duration::from_secs(10);
71
72const MDNS_PORT: u16 = 5353;
73const GROUP_ADDR_V4: Ipv4Addr = Ipv4Addr::new(224, 0, 0, 251);
74const GROUP_ADDR_V6: Ipv6Addr = Ipv6Addr::new(0xff02, 0, 0, 0, 0, 0, 0, 0xfb);
75const LOOPBACK_V4: Ipv4Addr = Ipv4Addr::new(127, 0, 0, 1);
76
77const RESOLVE_WAIT_IN_MILLIS: u64 = 500;
78
79/// Response status code for the service `unregister` call.
80#[derive(Debug)]
81pub enum UnregisterStatus {
82    /// Unregister was successful.
83    OK,
84    /// The service was not found in the registration.
85    NotFound,
86}
87
88/// Status code for the service daemon.
89#[derive(Debug, PartialEq, Clone, Eq)]
90#[non_exhaustive]
91pub enum DaemonStatus {
92    /// The daemon is running as normal.
93    Running,
94
95    /// The daemon has been shutdown.
96    Shutdown,
97}
98
99/// Different counters included in the metrics.
100/// Currently all counters are for outgoing packets.
101#[derive(Hash, Eq, PartialEq)]
102enum Counter {
103    Register,
104    RegisterResend,
105    Unregister,
106    UnregisterResend,
107    Browse,
108    ResolveHostname,
109    Respond,
110    CacheRefreshPTR,
111    CacheRefreshSrvTxt,
112    CacheRefreshAddr,
113    KnownAnswerSuppression,
114    CachedPTR,
115    CachedSRV,
116    CachedAddr,
117    CachedTxt,
118    CachedNSec,
119    CachedSubtype,
120    DnsRegistryProbe,
121    DnsRegistryActive,
122    DnsRegistryTimer,
123    DnsRegistryNameChange,
124    Timer,
125}
126
127impl fmt::Display for Counter {
128    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
129        match self {
130            Self::Register => write!(f, "register"),
131            Self::RegisterResend => write!(f, "register-resend"),
132            Self::Unregister => write!(f, "unregister"),
133            Self::UnregisterResend => write!(f, "unregister-resend"),
134            Self::Browse => write!(f, "browse"),
135            Self::ResolveHostname => write!(f, "resolve-hostname"),
136            Self::Respond => write!(f, "respond"),
137            Self::CacheRefreshPTR => write!(f, "cache-refresh-ptr"),
138            Self::CacheRefreshSrvTxt => write!(f, "cache-refresh-srv-txt"),
139            Self::CacheRefreshAddr => write!(f, "cache-refresh-addr"),
140            Self::KnownAnswerSuppression => write!(f, "known-answer-suppression"),
141            Self::CachedPTR => write!(f, "cached-ptr"),
142            Self::CachedSRV => write!(f, "cached-srv"),
143            Self::CachedAddr => write!(f, "cached-addr"),
144            Self::CachedTxt => write!(f, "cached-txt"),
145            Self::CachedNSec => write!(f, "cached-nsec"),
146            Self::CachedSubtype => write!(f, "cached-subtype"),
147            Self::DnsRegistryProbe => write!(f, "dns-registry-probe"),
148            Self::DnsRegistryActive => write!(f, "dns-registry-active"),
149            Self::DnsRegistryTimer => write!(f, "dns-registry-timer"),
150            Self::DnsRegistryNameChange => write!(f, "dns-registry-name-change"),
151            Self::Timer => write!(f, "timer"),
152        }
153    }
154}
155
156/// The metrics is a HashMap of (name_key, i64_value).
157/// The main purpose is to help monitoring the mDNS packet traffic.
158pub type Metrics = HashMap<String, i64>;
159
160const SIGNAL_SOCK_EVENT_KEY: usize = usize::MAX - 1; // avoid to overlap with zc.poll_ids
161
162/// A daemon thread for mDNS
163///
164/// This struct provides a handle and an API to the daemon. It is cloneable.
165#[derive(Clone)]
166pub struct ServiceDaemon {
167    /// Sender handle of the channel to the daemon.
168    sender: Sender<Command>,
169
170    /// Send to this addr to signal that a `Command` is coming.
171    ///
172    /// The daemon listens on this addr together with other mDNS sockets,
173    /// to avoid busy polling the flume channel. If there is a way to poll
174    /// the channel and mDNS sockets together, then this can be removed.
175    signal_addr: SocketAddr,
176}
177
178impl ServiceDaemon {
179    /// Creates a new daemon and spawns a thread to run the daemon.
180    ///
181    /// The daemon (re)uses the default mDNS port 5353. To keep it simple, we don't
182    /// ask callers to set the port.
183    pub fn new() -> Result<Self> {
184        // Use port 0 to allow the system assign a random available port,
185        // no need for a pre-defined port number.
186        let signal_addr = SocketAddrV4::new(LOOPBACK_V4, 0);
187
188        let signal_sock = UdpSocket::bind(signal_addr)
189            .map_err(|e| e_fmt!("failed to create signal_sock for daemon: {}", e))?;
190
191        // Get the socket with the OS chosen port
192        let signal_addr = signal_sock
193            .local_addr()
194            .map_err(|e| e_fmt!("failed to get signal sock addr: {}", e))?;
195
196        // Must be nonblocking so we can listen to it together with mDNS sockets.
197        signal_sock
198            .set_nonblocking(true)
199            .map_err(|e| e_fmt!("failed to set nonblocking for signal socket: {}", e))?;
200
201        let poller = Poll::new().map_err(|e| e_fmt!("failed to create mio Poll: {e}"))?;
202
203        let (sender, receiver) = bounded(100);
204
205        // Spawn the daemon thread
206        let mio_sock = MioUdpSocket::from_std(signal_sock);
207        thread::Builder::new()
208            .name("mDNS_daemon".to_string())
209            .spawn(move || Self::daemon_thread(mio_sock, poller, receiver))
210            .map_err(|e| e_fmt!("thread builder failed to spawn: {}", e))?;
211
212        Ok(Self {
213            sender,
214            signal_addr,
215        })
216    }
217
218    /// Sends `cmd` to the daemon via its channel, and sends a signal
219    /// to its sock addr to notify.
220    fn send_cmd(&self, cmd: Command) -> Result<()> {
221        let cmd_name = cmd.to_string();
222
223        // First, send to the flume channel.
224        self.sender.try_send(cmd).map_err(|e| match e {
225            TrySendError::Full(_) => Error::Again,
226            e => e_fmt!("flume::channel::send failed: {}", e),
227        })?;
228
229        // Second, send a signal to notify the daemon.
230        let addr = SocketAddrV4::new(LOOPBACK_V4, 0);
231        let socket = UdpSocket::bind(addr)
232            .map_err(|e| e_fmt!("Failed to create socket to send signal: {}", e))?;
233        socket
234            .send_to(cmd_name.as_bytes(), self.signal_addr)
235            .map_err(|e| {
236                e_fmt!(
237                    "signal socket send_to {} ({}) failed: {}",
238                    self.signal_addr,
239                    cmd_name,
240                    e
241                )
242            })?;
243
244        Ok(())
245    }
246
247    /// Starts browsing for a specific service type.
248    ///
249    /// `service_type` must end with a valid mDNS domain: '._tcp.local.' or '._udp.local.'
250    ///
251    /// Returns a channel `Receiver` to receive events about the service. The caller
252    /// can call `.recv_async().await` on this receiver to handle events in an
253    /// async environment or call `.recv()` in a sync environment.
254    ///
255    /// When a new instance is found, the daemon automatically tries to resolve, i.e.
256    /// finding more details, i.e. SRV records and TXT records.
257    pub fn browse(&self, service_type: &str) -> Result<Receiver<ServiceEvent>> {
258        check_domain_suffix(service_type)?;
259
260        let (resp_s, resp_r) = bounded(10);
261        self.send_cmd(Command::Browse(service_type.to_string(), 1, resp_s))?;
262        Ok(resp_r)
263    }
264
265    /// Stops searching for a specific service type.
266    ///
267    /// When an error is returned, the caller should retry only when
268    /// the error is `Error::Again`, otherwise should log and move on.
269    pub fn stop_browse(&self, ty_domain: &str) -> Result<()> {
270        self.send_cmd(Command::StopBrowse(ty_domain.to_string()))
271    }
272
273    /// Starts querying for the ip addresses of a hostname.
274    ///
275    /// Returns a channel `Receiver` to receive events about the hostname.
276    /// The caller can call `.recv_async().await` on this receiver to handle events in an
277    /// async environment or call `.recv()` in a sync environment.
278    ///
279    /// The `timeout` is specified in milliseconds.
280    pub fn resolve_hostname(
281        &self,
282        hostname: &str,
283        timeout: Option<u64>,
284    ) -> Result<Receiver<HostnameResolutionEvent>> {
285        check_hostname(hostname)?;
286        let (resp_s, resp_r) = bounded(10);
287        self.send_cmd(Command::ResolveHostname(
288            hostname.to_string(),
289            1,
290            resp_s,
291            timeout,
292        ))?;
293        Ok(resp_r)
294    }
295
296    /// Stops querying for the ip addresses of a hostname.
297    ///
298    /// When an error is returned, the caller should retry only when
299    /// the error is `Error::Again`, otherwise should log and move on.
300    pub fn stop_resolve_hostname(&self, hostname: &str) -> Result<()> {
301        self.send_cmd(Command::StopResolveHostname(hostname.to_string()))
302    }
303
304    /// Registers a service provided by this host.
305    ///
306    /// If `service_info` has no addresses yet and its `addr_auto` is enabled,
307    /// this method will automatically fill in addresses from the host.
308    ///
309    /// To re-announce a service with an updated `service_info`, just call
310    /// this `register` function again. No need to call `unregister` first.
311    pub fn register(&self, service_info: ServiceInfo) -> Result<()> {
312        check_service_name(service_info.get_fullname())?;
313        check_hostname(service_info.get_hostname())?;
314
315        self.send_cmd(Command::Register(service_info))
316    }
317
318    /// Unregisters a service. This is a graceful shutdown of a service.
319    ///
320    /// Returns a channel receiver that is used to receive the status code
321    /// of the unregister.
322    ///
323    /// When an error is returned, the caller should retry only when
324    /// the error is `Error::Again`, otherwise should log and move on.
325    pub fn unregister(&self, fullname: &str) -> Result<Receiver<UnregisterStatus>> {
326        let (resp_s, resp_r) = bounded(1);
327        self.send_cmd(Command::Unregister(fullname.to_lowercase(), resp_s))?;
328        Ok(resp_r)
329    }
330
331    /// Starts to monitor events from the daemon.
332    ///
333    /// Returns a channel [`Receiver`] of [`DaemonEvent`].
334    pub fn monitor(&self) -> Result<Receiver<DaemonEvent>> {
335        let (resp_s, resp_r) = bounded(100);
336        self.send_cmd(Command::Monitor(resp_s))?;
337        Ok(resp_r)
338    }
339
340    /// Shuts down the daemon thread and returns a channel to receive the status.
341    ///
342    /// When an error is returned, the caller should retry only when
343    /// the error is `Error::Again`, otherwise should log and move on.
344    pub fn shutdown(&self) -> Result<Receiver<DaemonStatus>> {
345        let (resp_s, resp_r) = bounded(1);
346        self.send_cmd(Command::Exit(resp_s))?;
347        Ok(resp_r)
348    }
349
350    /// Returns the status of the daemon.
351    ///
352    /// When an error is returned, the caller should retry only when
353    /// the error is `Error::Again`, otherwise should consider the daemon
354    /// stopped working and move on.
355    pub fn status(&self) -> Result<Receiver<DaemonStatus>> {
356        let (resp_s, resp_r) = bounded(1);
357
358        if self.sender.is_disconnected() {
359            resp_s
360                .send(DaemonStatus::Shutdown)
361                .map_err(|e| e_fmt!("failed to send daemon status to the client: {}", e))?;
362        } else {
363            self.send_cmd(Command::GetStatus(resp_s))?;
364        }
365
366        Ok(resp_r)
367    }
368
369    /// Returns a channel receiver for the metrics, e.g. input/output counters.
370    ///
371    /// The metrics returned is a snapshot. Hence the caller should call
372    /// this method repeatedly if they want to monitor the metrics continuously.
373    pub fn get_metrics(&self) -> Result<Receiver<Metrics>> {
374        let (resp_s, resp_r) = bounded(1);
375        self.send_cmd(Command::GetMetrics(resp_s))?;
376        Ok(resp_r)
377    }
378
379    /// Change the max length allowed for a service name.
380    ///
381    /// As RFC 6763 defines a length max for a service name, a user should not call
382    /// this method unless they have to. See [`SERVICE_NAME_LEN_MAX_DEFAULT`].
383    ///
384    /// `len_max` is capped at an internal limit, which is currently 30.
385    pub fn set_service_name_len_max(&self, len_max: u8) -> Result<()> {
386        const SERVICE_NAME_LEN_MAX_LIMIT: u8 = 30; // Double the default length max.
387
388        if len_max > SERVICE_NAME_LEN_MAX_LIMIT {
389            return Err(Error::Msg(format!(
390                "service name length max {} is too large",
391                len_max
392            )));
393        }
394
395        self.send_cmd(Command::SetOption(DaemonOption::ServiceNameLenMax(len_max)))
396    }
397
398    /// Change the interval for checking IP changes automatically.
399    ///
400    /// Setting the interval to 0 disables the IP check.
401    ///
402    /// See [`IP_CHECK_INTERVAL_IN_SECS_DEFAULT`] for the default interval.
403    pub fn set_ip_check_interval(&self, interval_in_secs: u32) -> Result<()> {
404        let interval_in_millis = interval_in_secs as u64 * 1000;
405        self.send_cmd(Command::SetOption(DaemonOption::IpCheckInterval(
406            interval_in_millis,
407        )))
408    }
409
410    /// Get the current interval in seconds for checking IP changes automatically.
411    pub fn get_ip_check_interval(&self) -> Result<u32> {
412        let (resp_s, resp_r) = bounded(1);
413        self.send_cmd(Command::GetOption(resp_s))?;
414
415        let option = resp_r
416            .recv_timeout(Duration::from_secs(10))
417            .map_err(|e| e_fmt!("failed to receive ip check interval: {}", e))?;
418        let ip_check_interval_in_secs = option.ip_check_interval / 1000;
419        Ok(ip_check_interval_in_secs as u32)
420    }
421
422    /// Include interfaces that match `if_kind` for this service daemon.
423    ///
424    /// For example:
425    /// ```ignore
426    ///     daemon.enable_interface("en0")?;
427    /// ```
428    pub fn enable_interface(&self, if_kind: impl IntoIfKindVec) -> Result<()> {
429        let if_kind_vec = if_kind.into_vec();
430        self.send_cmd(Command::SetOption(DaemonOption::EnableInterface(
431            if_kind_vec.kinds,
432        )))
433    }
434
435    /// Ignore/exclude interfaces that match `if_kind` for this daemon.
436    ///
437    /// For example:
438    /// ```ignore
439    ///     daemon.disable_interface(IfKind::IPv6)?;
440    /// ```
441    pub fn disable_interface(&self, if_kind: impl IntoIfKindVec) -> Result<()> {
442        let if_kind_vec = if_kind.into_vec();
443        self.send_cmd(Command::SetOption(DaemonOption::DisableInterface(
444            if_kind_vec.kinds,
445        )))
446    }
447
448    /// Enable or disable the loopback for locally sent multicast packets in IPv4.
449    ///
450    /// By default, multicast loop is enabled for IPv4. When disabled, a querier will not
451    /// receive announcements from a responder on the same host.
452    ///
453    /// Reference: <https://learn.microsoft.com/en-us/windows/win32/winsock/ip-multicast-2>
454    ///
455    /// "The Winsock version of the IP_MULTICAST_LOOP option is semantically different than
456    /// the UNIX version of the IP_MULTICAST_LOOP option:
457    ///
458    /// In Winsock, the IP_MULTICAST_LOOP option applies only to the receive path.
459    /// In the UNIX version, the IP_MULTICAST_LOOP option applies to the send path."
460    ///
461    /// Which means, in order NOT to receive localhost announcements, you want to call
462    /// this API on the querier side on Windows, but on the responder side on Unix.
463    pub fn set_multicast_loop_v4(&self, on: bool) -> Result<()> {
464        self.send_cmd(Command::SetOption(DaemonOption::MulticastLoopV4(on)))
465    }
466
467    /// Enable or disable the loopback for locally sent multicast packets in IPv6.
468    ///
469    /// By default, multicast loop is enabled for IPv6. When disabled, a querier will not
470    /// receive announcements from a responder on the same host.
471    ///
472    /// Reference: <https://learn.microsoft.com/en-us/windows/win32/winsock/ip-multicast-2>
473    ///
474    /// "The Winsock version of the IP_MULTICAST_LOOP option is semantically different than
475    /// the UNIX version of the IP_MULTICAST_LOOP option:
476    ///
477    /// In Winsock, the IP_MULTICAST_LOOP option applies only to the receive path.
478    /// In the UNIX version, the IP_MULTICAST_LOOP option applies to the send path."
479    ///
480    /// Which means, in order NOT to receive localhost announcements, you want to call
481    /// this API on the querier side on Windows, but on the responder side on Unix.
482    pub fn set_multicast_loop_v6(&self, on: bool) -> Result<()> {
483        self.send_cmd(Command::SetOption(DaemonOption::MulticastLoopV6(on)))
484    }
485
486    /// Proactively confirms whether a service instance still valid.
487    ///
488    /// This call will issue queries for a service instance's SRV record and Address records.
489    ///
490    /// For `timeout`, most users should use [VERIFY_TIMEOUT_DEFAULT]
491    /// unless there is a reason not to follow RFC.
492    ///
493    /// If no response is received within `timeout`, the current resource
494    /// records will be flushed, and if needed, `ServiceRemoved` event will be
495    /// sent to active queriers.
496    ///
497    /// Reference: [RFC 6762](https://datatracker.ietf.org/doc/html/rfc6762#section-10.4)
498    pub fn verify(&self, instance_fullname: String, timeout: Duration) -> Result<()> {
499        self.send_cmd(Command::Verify(instance_fullname, timeout))
500    }
501
502    fn daemon_thread(signal_sock: MioUdpSocket, poller: Poll, receiver: Receiver<Command>) {
503        let zc = Zeroconf::new(signal_sock, poller);
504
505        if let Some(cmd) = Self::run(zc, receiver) {
506            match cmd {
507                Command::Exit(resp_s) => {
508                    // It is guaranteed that the receiver already dropped,
509                    // i.e. the daemon command channel closed.
510                    if let Err(e) = resp_s.send(DaemonStatus::Shutdown) {
511                        debug!("exit: failed to send response of shutdown: {}", e);
512                    }
513                }
514                _ => {
515                    debug!("Unexpected command: {:?}", cmd);
516                }
517            }
518        }
519    }
520
521    /// The main event loop of the daemon thread
522    ///
523    /// In each round, it will:
524    /// 1. select the listening sockets with a timeout.
525    /// 2. process the incoming packets if any.
526    /// 3. try_recv on its channel and execute commands.
527    /// 4. announce its registered services.
528    /// 5. process retransmissions if any.
529    fn run(mut zc: Zeroconf, receiver: Receiver<Command>) -> Option<Command> {
530        // Add the daemon's signal socket to the poller.
531        if let Err(e) = zc.poller.registry().register(
532            &mut zc.signal_sock,
533            mio::Token(SIGNAL_SOCK_EVENT_KEY),
534            mio::Interest::READABLE,
535        ) {
536            debug!("failed to add signal socket to the poller: {}", e);
537            return None;
538        }
539
540        // Add mDNS sockets to the poller.
541        for (intf, sock) in zc.intf_socks.iter_mut() {
542            let key =
543                Zeroconf::add_poll_impl(&mut zc.poll_ids, &mut zc.poll_id_count, intf.clone());
544
545            if let Err(e) =
546                zc.poller
547                    .registry()
548                    .register(sock, mio::Token(key), mio::Interest::READABLE)
549            {
550                debug!("add socket of {:?} to poller: {e}", intf);
551                return None;
552            }
553        }
554
555        // Setup timer for IP checks.
556        let mut next_ip_check = if zc.ip_check_interval > 0 {
557            current_time_millis() + zc.ip_check_interval
558        } else {
559            0
560        };
561
562        if next_ip_check > 0 {
563            zc.add_timer(next_ip_check);
564        }
565
566        // Start the run loop.
567
568        let mut events = mio::Events::with_capacity(1024);
569        loop {
570            let now = current_time_millis();
571
572            let earliest_timer = zc.peek_earliest_timer();
573            let timeout = earliest_timer.map(|timer| {
574                // If `timer` already passed, set `timeout` to be 1ms.
575                let millis = if timer > now { timer - now } else { 1 };
576                Duration::from_millis(millis)
577            });
578
579            // Process incoming packets, command events and optional timeout.
580            events.clear();
581            match zc.poller.poll(&mut events, timeout) {
582                Ok(_) => zc.handle_poller_events(&events),
583                Err(e) => debug!("failed to select from sockets: {}", e),
584            }
585
586            let now = current_time_millis();
587
588            // Remove the timers if already passed.
589            zc.pop_timers_till(now);
590
591            // Remove hostname resolvers with expired timeouts.
592            for hostname in zc
593                .hostname_resolvers
594                .clone()
595                .into_iter()
596                .filter(|(_, (_, timeout))| timeout.map(|t| now >= t).unwrap_or(false))
597                .map(|(hostname, _)| hostname)
598            {
599                trace!("hostname resolver timeout for {}", &hostname);
600                call_hostname_resolution_listener(
601                    &zc.hostname_resolvers,
602                    &hostname,
603                    HostnameResolutionEvent::SearchTimeout(hostname.to_owned()),
604                );
605                call_hostname_resolution_listener(
606                    &zc.hostname_resolvers,
607                    &hostname,
608                    HostnameResolutionEvent::SearchStopped(hostname.to_owned()),
609                );
610                zc.hostname_resolvers.remove(&hostname);
611            }
612
613            // process commands from the command channel
614            while let Ok(command) = receiver.try_recv() {
615                if matches!(command, Command::Exit(_)) {
616                    zc.status = DaemonStatus::Shutdown;
617                    return Some(command);
618                }
619                zc.exec_command(command, false);
620            }
621
622            // check for repeated commands and run them if their time is up.
623            let mut i = 0;
624            while i < zc.retransmissions.len() {
625                if now >= zc.retransmissions[i].next_time {
626                    let rerun = zc.retransmissions.remove(i);
627                    zc.exec_command(rerun.command, true);
628                } else {
629                    i += 1;
630                }
631            }
632
633            // Refresh cached service records with active queriers
634            zc.refresh_active_services();
635
636            // Refresh cached A/AAAA records with active queriers
637            let mut query_count = 0;
638            for (hostname, _sender) in zc.hostname_resolvers.iter() {
639                for (hostname, ip_addr) in
640                    zc.cache.refresh_due_hostname_resolutions(hostname).iter()
641                {
642                    zc.send_query(hostname, ip_address_rr_type(ip_addr));
643                    query_count += 1;
644                }
645            }
646
647            zc.increase_counter(Counter::CacheRefreshAddr, query_count);
648
649            // check and evict expired records in our cache
650            let now = current_time_millis();
651
652            // Notify service listeners about the expired records.
653            let expired_services = zc.cache.evict_expired_services(now);
654            zc.notify_service_removal(expired_services);
655
656            // Notify hostname listeners about the expired records.
657            let expired_addrs = zc.cache.evict_expired_addr(now);
658            for (hostname, addrs) in expired_addrs {
659                call_hostname_resolution_listener(
660                    &zc.hostname_resolvers,
661                    &hostname,
662                    HostnameResolutionEvent::AddressesRemoved(hostname.clone(), addrs),
663                );
664                let instances = zc.cache.get_instances_on_host(&hostname);
665                let instance_set: HashSet<String> = instances.into_iter().collect();
666                zc.resolve_updated_instances(&instance_set);
667            }
668
669            // Send out probing queries.
670            zc.probing_handler();
671
672            // check IP changes if next_ip_check is reached.
673            if now >= next_ip_check && next_ip_check > 0 {
674                next_ip_check = now + zc.ip_check_interval;
675                zc.add_timer(next_ip_check);
676
677                zc.check_ip_changes();
678            }
679        }
680    }
681}
682
683/// Creates a new UDP socket that uses `intf` to send and recv multicast.
684fn new_socket_bind(intf: &Interface, should_loop: bool) -> Result<MioUdpSocket> {
685    // Use the same socket for receiving and sending multicast packets.
686    // Such socket has to bind to INADDR_ANY or IN6ADDR_ANY.
687    let intf_ip = &intf.ip();
688    match intf_ip {
689        IpAddr::V4(ip) => {
690            let addr = SocketAddrV4::new(Ipv4Addr::new(0, 0, 0, 0), MDNS_PORT);
691            let sock = new_socket(addr.into(), true)?;
692
693            // Join mDNS group to receive packets.
694            sock.join_multicast_v4(&GROUP_ADDR_V4, ip)
695                .map_err(|e| e_fmt!("join multicast group on addr {}: {}", intf_ip, e))?;
696
697            // Set IP_MULTICAST_IF to send packets.
698            sock.set_multicast_if_v4(ip)
699                .map_err(|e| e_fmt!("set multicast_if on addr {}: {}", ip, e))?;
700
701            if !should_loop {
702                sock.set_multicast_loop_v4(false)
703                    .map_err(|e| e_fmt!("failed to set multicast loop v4 for {ip}: {e}"))?;
704            }
705
706            // Test if we can send packets successfully.
707            let multicast_addr = SocketAddrV4::new(GROUP_ADDR_V4, MDNS_PORT).into();
708            let test_packets = DnsOutgoing::new(0).to_data_on_wire();
709            for packet in test_packets {
710                sock.send_to(&packet, &multicast_addr)
711                    .map_err(|e| e_fmt!("send multicast packet on addr {}: {}", ip, e))?;
712            }
713            Ok(MioUdpSocket::from_std(UdpSocket::from(sock)))
714        }
715        IpAddr::V6(ip) => {
716            let addr = SocketAddrV6::new(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 0), MDNS_PORT, 0, 0);
717            let sock = new_socket(addr.into(), true)?;
718
719            // Join mDNS group to receive packets.
720            sock.join_multicast_v6(&GROUP_ADDR_V6, intf.index.unwrap_or(0))
721                .map_err(|e| e_fmt!("join multicast group on addr {}: {}", ip, e))?;
722
723            // Set IPV6_MULTICAST_IF to send packets.
724            sock.set_multicast_if_v6(intf.index.unwrap_or(0))
725                .map_err(|e| e_fmt!("set multicast_if on addr {}: {}", ip, e))?;
726
727            // We are not sending multicast packets to test this socket as there might
728            // be many IPv6 interfaces on a host and could cause such send error:
729            // "No buffer space available (os error 55)".
730
731            Ok(MioUdpSocket::from_std(UdpSocket::from(sock)))
732        }
733    }
734}
735
736/// Creates a new UDP socket to bind to `port` with REUSEPORT option.
737/// `non_block` indicates whether to set O_NONBLOCK for the socket.
738fn new_socket(addr: SocketAddr, non_block: bool) -> Result<Socket> {
739    let domain = match addr {
740        SocketAddr::V4(_) => socket2::Domain::IPV4,
741        SocketAddr::V6(_) => socket2::Domain::IPV6,
742    };
743
744    let fd = Socket::new(domain, socket2::Type::DGRAM, None)
745        .map_err(|e| e_fmt!("create socket failed: {}", e))?;
746
747    fd.set_reuse_address(true)
748        .map_err(|e| e_fmt!("set ReuseAddr failed: {}", e))?;
749    #[cfg(unix)] // this is currently restricted to Unix's in socket2
750    fd.set_reuse_port(true)
751        .map_err(|e| e_fmt!("set ReusePort failed: {}", e))?;
752
753    if non_block {
754        fd.set_nonblocking(true)
755            .map_err(|e| e_fmt!("set O_NONBLOCK: {}", e))?;
756    }
757
758    fd.bind(&addr.into())
759        .map_err(|e| e_fmt!("socket bind to {} failed: {}", &addr, e))?;
760
761    trace!("new socket bind to {}", &addr);
762    Ok(fd)
763}
764
765/// Specify a UNIX timestamp in millis to run `command` for the next time.
766struct ReRun {
767    /// UNIX timestamp in millis.
768    next_time: u64,
769    command: Command,
770}
771
772/// Enum to represent the IP version.
773#[derive(Debug, Eq, Hash, PartialEq)]
774enum IpVersion {
775    V4,
776    V6,
777}
778
779/// A struct to track multicast send status for a network interface.
780#[derive(Debug, Eq, Hash, PartialEq)]
781struct MulticastSendTracker {
782    intf_index: u32,
783    ip_version: IpVersion,
784}
785
786/// Returns the multicast send tracker if the interface index is valid
787fn multicast_send_tracker(intf: &Interface) -> Option<MulticastSendTracker> {
788    match intf.index {
789        Some(index) => {
790            let ip_ver = match intf.addr {
791                IfAddr::V4(_) => IpVersion::V4,
792                IfAddr::V6(_) => IpVersion::V6,
793            };
794            Some(MulticastSendTracker {
795                intf_index: index,
796                ip_version: ip_ver,
797            })
798        }
799        None => None,
800    }
801}
802
803/// Specify kinds of interfaces. It is used to enable or to disable interfaces in the daemon.
804///
805/// Note that for ergonomic reasons, `From<&str>` and `From<IpAddr>` are implemented.
806#[derive(Debug, Clone)]
807#[non_exhaustive]
808pub enum IfKind {
809    /// All interfaces.
810    All,
811
812    /// All IPv4 interfaces.
813    IPv4,
814
815    /// All IPv6 interfaces.
816    IPv6,
817
818    /// By the interface name, for example "en0"
819    Name(String),
820
821    /// By an IPv4 or IPv6 address.
822    Addr(IpAddr),
823
824    /// 127.0.0.1 (or anything in 127.0.0.0/8), disabled by default.
825    ///
826    /// Use [ServiceDaemon::enable_interface] to support registering services on loopback interfaces,
827    /// which is required by some use cases (e.g., OSCQuery) that publish via mDNS.
828    LoopbackV4,
829
830    /// ::1/128, disabled by default.
831    LoopbackV6,
832}
833
834impl IfKind {
835    /// Checks if `intf` matches with this interface kind.
836    fn matches(&self, intf: &Interface) -> bool {
837        match self {
838            Self::All => true,
839            Self::IPv4 => intf.ip().is_ipv4(),
840            Self::IPv6 => intf.ip().is_ipv6(),
841            Self::Name(ifname) => ifname == &intf.name,
842            Self::Addr(addr) => addr == &intf.ip(),
843            Self::LoopbackV4 => intf.is_loopback() && intf.ip().is_ipv4(),
844            Self::LoopbackV6 => intf.is_loopback() && intf.ip().is_ipv6(),
845        }
846    }
847}
848
849/// The first use case of specifying an interface was to
850/// use an interface name. Hence adding this for ergonomic reasons.
851impl From<&str> for IfKind {
852    fn from(val: &str) -> Self {
853        Self::Name(val.to_string())
854    }
855}
856
857impl From<&String> for IfKind {
858    fn from(val: &String) -> Self {
859        Self::Name(val.to_string())
860    }
861}
862
863/// Still for ergonomic reasons.
864impl From<IpAddr> for IfKind {
865    fn from(val: IpAddr) -> Self {
866        Self::Addr(val)
867    }
868}
869
870/// A list of `IfKind` that can be used to match interfaces.
871pub struct IfKindVec {
872    kinds: Vec<IfKind>,
873}
874
875/// A trait that converts a type into a Vec of `IfKind`.
876pub trait IntoIfKindVec {
877    fn into_vec(self) -> IfKindVec;
878}
879
880impl<T: Into<IfKind>> IntoIfKindVec for T {
881    fn into_vec(self) -> IfKindVec {
882        let if_kind: IfKind = self.into();
883        IfKindVec {
884            kinds: vec![if_kind],
885        }
886    }
887}
888
889impl<T: Into<IfKind>> IntoIfKindVec for Vec<T> {
890    fn into_vec(self) -> IfKindVec {
891        let kinds: Vec<IfKind> = self.into_iter().map(|x| x.into()).collect();
892        IfKindVec { kinds }
893    }
894}
895
896/// Selection of interfaces.
897struct IfSelection {
898    /// The interfaces to be selected.
899    if_kind: IfKind,
900
901    /// Whether the `if_kind` should be enabled or not.
902    selected: bool,
903}
904
905/// A struct holding the state. It was inspired by `zeroconf` package in Python.
906struct Zeroconf {
907    /// Local interfaces with sockets to recv/send on these interfaces.
908    intf_socks: HashMap<Interface, MioUdpSocket>,
909
910    /// Map poll id to Interface.
911    poll_ids: HashMap<usize, Interface>,
912
913    /// Next poll id value
914    poll_id_count: usize,
915
916    /// Local registered services, keyed by service full names.
917    my_services: HashMap<String, ServiceInfo>,
918
919    /// Received DNS records.
920    cache: DnsCache,
921
922    /// Registered service records.
923    dns_registry_map: HashMap<Interface, DnsRegistry>,
924
925    /// Active "Browse" commands.
926    service_queriers: HashMap<String, Sender<ServiceEvent>>, // <ty_domain, channel::sender>
927
928    /// Active "ResolveHostname" commands.
929    ///
930    /// The timestamps are set at the future timestamp when the command should timeout.
931    /// `hostname` is case-insensitive and stored in lowercase.
932    hostname_resolvers: HashMap<String, (Sender<HostnameResolutionEvent>, Option<u64>)>, // <hostname, (channel::sender, UNIX timestamp in millis)>
933
934    /// All repeating transmissions.
935    retransmissions: Vec<ReRun>,
936
937    counters: Metrics,
938
939    /// Waits for incoming packets.
940    poller: Poll,
941
942    /// Channels to notify events.
943    monitors: Vec<Sender<DaemonEvent>>,
944
945    /// Options
946    service_name_len_max: u8,
947
948    /// Interval in millis to check IP address changes.
949    ip_check_interval: u64,
950
951    /// All interface selections called to the daemon.
952    if_selections: Vec<IfSelection>,
953
954    /// Socket for signaling.
955    signal_sock: MioUdpSocket,
956
957    /// Timestamps marking where we need another iteration of the run loop,
958    /// to react to events like retransmissions, cache refreshes, interface IP address changes, etc.
959    ///
960    /// When the run loop goes through a single iteration, it will
961    /// set its timeout to the earliest timer in this list.
962    timers: BinaryHeap<Reverse<u64>>,
963
964    status: DaemonStatus,
965
966    /// Service instances that are pending for resolving SRV and TXT.
967    pending_resolves: HashSet<String>,
968
969    /// Service instances that are already resolved.
970    resolved: HashSet<String>,
971
972    multicast_loop_v4: bool,
973
974    multicast_loop_v6: bool,
975}
976
977impl Zeroconf {
978    fn new(signal_sock: MioUdpSocket, poller: Poll) -> Self {
979        // Get interfaces.
980        let my_ifaddrs = my_ip_interfaces(false);
981
982        // Create a socket for every IP addr.
983        // Note: it is possible that `my_ifaddrs` contains the same IP addr with different interface names,
984        // or the same interface name with different IP addrs.
985        let mut intf_socks = HashMap::new();
986        let mut dns_registry_map = HashMap::new();
987
988        for intf in my_ifaddrs {
989            let sock = match new_socket_bind(&intf, true) {
990                Ok(s) => s,
991                Err(e) => {
992                    trace!("bind a socket to {}: {}. Skipped.", &intf.ip(), e);
993                    continue;
994                }
995            };
996
997            dns_registry_map.insert(intf.clone(), DnsRegistry::new());
998
999            intf_socks.insert(intf, sock);
1000        }
1001
1002        let monitors = Vec::new();
1003        let service_name_len_max = SERVICE_NAME_LEN_MAX_DEFAULT;
1004        let ip_check_interval = IP_CHECK_INTERVAL_IN_SECS_DEFAULT as u64 * 1000;
1005
1006        let timers = BinaryHeap::new();
1007
1008        // Disable loopback by default.
1009        let if_selections = vec![
1010            IfSelection {
1011                if_kind: IfKind::LoopbackV4,
1012                selected: false,
1013            },
1014            IfSelection {
1015                if_kind: IfKind::LoopbackV6,
1016                selected: false,
1017            },
1018        ];
1019
1020        let status = DaemonStatus::Running;
1021
1022        Self {
1023            intf_socks,
1024            poll_ids: HashMap::new(),
1025            poll_id_count: 0,
1026            my_services: HashMap::new(),
1027            cache: DnsCache::new(),
1028            dns_registry_map,
1029            hostname_resolvers: HashMap::new(),
1030            service_queriers: HashMap::new(),
1031            retransmissions: Vec::new(),
1032            counters: HashMap::new(),
1033            poller,
1034            monitors,
1035            service_name_len_max,
1036            ip_check_interval,
1037            if_selections,
1038            signal_sock,
1039            timers,
1040            status,
1041            pending_resolves: HashSet::new(),
1042            resolved: HashSet::new(),
1043            multicast_loop_v4: true,
1044            multicast_loop_v6: true,
1045        }
1046    }
1047
1048    fn process_set_option(&mut self, daemon_opt: DaemonOption) {
1049        match daemon_opt {
1050            DaemonOption::ServiceNameLenMax(length) => self.service_name_len_max = length,
1051            DaemonOption::IpCheckInterval(interval) => self.ip_check_interval = interval,
1052            DaemonOption::EnableInterface(if_kind) => self.enable_interface(if_kind),
1053            DaemonOption::DisableInterface(if_kind) => self.disable_interface(if_kind),
1054            DaemonOption::MulticastLoopV4(on) => self.set_multicast_loop_v4(on),
1055            DaemonOption::MulticastLoopV6(on) => self.set_multicast_loop_v6(on),
1056        }
1057    }
1058
1059    fn enable_interface(&mut self, kinds: Vec<IfKind>) {
1060        for if_kind in kinds {
1061            self.if_selections.push(IfSelection {
1062                if_kind,
1063                selected: true,
1064            });
1065        }
1066
1067        self.apply_intf_selections(my_ip_interfaces(true));
1068    }
1069
1070    fn disable_interface(&mut self, kinds: Vec<IfKind>) {
1071        for if_kind in kinds {
1072            self.if_selections.push(IfSelection {
1073                if_kind,
1074                selected: false,
1075            });
1076        }
1077
1078        self.apply_intf_selections(my_ip_interfaces(true));
1079    }
1080
1081    fn set_multicast_loop_v4(&mut self, on: bool) {
1082        for (_, sock) in self.intf_socks.iter_mut() {
1083            if let Err(e) = sock.set_multicast_loop_v4(on) {
1084                debug!("failed to set multicast loop v4: {e}");
1085            }
1086        }
1087    }
1088
1089    fn set_multicast_loop_v6(&mut self, on: bool) {
1090        for (_, sock) in self.intf_socks.iter_mut() {
1091            if let Err(e) = sock.set_multicast_loop_v6(on) {
1092                debug!("failed to set multicast loop v6: {e}");
1093            }
1094        }
1095    }
1096
1097    fn notify_monitors(&mut self, event: DaemonEvent) {
1098        // Only retain the monitors that are still connected.
1099        self.monitors.retain(|sender| {
1100            if let Err(e) = sender.try_send(event.clone()) {
1101                debug!("notify_monitors: try_send: {}", &e);
1102                if matches!(e, TrySendError::Disconnected(_)) {
1103                    return false; // This monitor is dropped.
1104                }
1105            }
1106            true
1107        });
1108    }
1109
1110    /// Remove `addr` in my services that enabled `addr_auto`.
1111    fn del_addr_in_my_services(&mut self, addr: &IpAddr) {
1112        for (_, service_info) in self.my_services.iter_mut() {
1113            if service_info.is_addr_auto() {
1114                service_info.remove_ipaddr(addr);
1115            }
1116        }
1117    }
1118
1119    /// Insert a new interface into the poll map and return key
1120    fn add_poll(&mut self, intf: Interface) -> usize {
1121        Self::add_poll_impl(&mut self.poll_ids, &mut self.poll_id_count, intf)
1122    }
1123
1124    /// Insert a new interface into the poll map and return its key.
1125    ///
1126    /// This exists to satisfy the borrow checker
1127    fn add_poll_impl(
1128        poll_ids: &mut HashMap<usize, Interface>,
1129        poll_id_count: &mut usize,
1130        intf: Interface,
1131    ) -> usize {
1132        let key = *poll_id_count;
1133        *poll_id_count += 1;
1134        let _ = (*poll_ids).insert(key, intf);
1135        key
1136    }
1137
1138    fn add_timer(&mut self, next_time: u64) {
1139        self.timers.push(Reverse(next_time));
1140    }
1141
1142    fn peek_earliest_timer(&self) -> Option<u64> {
1143        self.timers.peek().map(|Reverse(v)| *v)
1144    }
1145
1146    fn _pop_earliest_timer(&mut self) -> Option<u64> {
1147        self.timers.pop().map(|Reverse(v)| v)
1148    }
1149
1150    /// Pop all timers that are already passed till `now`.
1151    fn pop_timers_till(&mut self, now: u64) {
1152        while let Some(Reverse(v)) = self.timers.peek() {
1153            if *v > now {
1154                break;
1155            }
1156            self.timers.pop();
1157        }
1158    }
1159
1160    /// Apply all selections to `interfaces` and return the selected addresses.
1161    fn selected_addrs(&self, interfaces: Vec<Interface>) -> HashSet<IpAddr> {
1162        let intf_count = interfaces.len();
1163        let mut intf_selections = vec![true; intf_count];
1164
1165        // apply if_selections
1166        for selection in self.if_selections.iter() {
1167            // Mark the interfaces for this selection.
1168            for i in 0..intf_count {
1169                if selection.if_kind.matches(&interfaces[i]) {
1170                    intf_selections[i] = selection.selected;
1171                }
1172            }
1173        }
1174
1175        let mut selected_addrs = HashSet::new();
1176        for i in 0..intf_count {
1177            if intf_selections[i] {
1178                selected_addrs.insert(interfaces[i].addr.ip());
1179            }
1180        }
1181
1182        selected_addrs
1183    }
1184
1185    /// Apply all selections to `interfaces`.
1186    ///
1187    /// For any interface, add it if selected but not bound yet,
1188    /// delete it if not selected but still bound.
1189    fn apply_intf_selections(&mut self, interfaces: Vec<Interface>) {
1190        // By default, we enable all interfaces.
1191        let intf_count = interfaces.len();
1192        let mut intf_selections = vec![true; intf_count];
1193
1194        // apply if_selections
1195        for selection in self.if_selections.iter() {
1196            // Mark the interfaces for this selection.
1197            for i in 0..intf_count {
1198                if selection.if_kind.matches(&interfaces[i]) {
1199                    intf_selections[i] = selection.selected;
1200                }
1201            }
1202        }
1203
1204        // Update `intf_socks` based on the selections.
1205        for (idx, intf) in interfaces.into_iter().enumerate() {
1206            if intf_selections[idx] {
1207                // Add the interface
1208                if !self.intf_socks.contains_key(&intf) {
1209                    debug!("apply_intf_selections: add {:?}", &intf.ip());
1210                    self.add_new_interface(intf);
1211                }
1212            } else {
1213                // Remove the interface
1214                if let Some(mut sock) = self.intf_socks.remove(&intf) {
1215                    match self.poller.registry().deregister(&mut sock) {
1216                        Ok(()) => debug!("apply_intf_selections: deregister {:?}", &intf.ip()),
1217                        Err(e) => debug!("apply_intf_selections: poller.delete {:?}: {}", &intf, e),
1218                    }
1219
1220                    // Remove from poll_ids
1221                    self.poll_ids.retain(|_, v| v != &intf);
1222
1223                    // Remove cache records for this interface.
1224                    self.cache.remove_addrs_on_disabled_intf(&intf);
1225                }
1226            }
1227        }
1228    }
1229
1230    /// Check for IP changes and update intf_socks as needed.
1231    fn check_ip_changes(&mut self) {
1232        // Get the current interfaces.
1233        let my_ifaddrs = my_ip_interfaces(true);
1234
1235        let poll_ids = &mut self.poll_ids;
1236        let poller = &mut self.poller;
1237        // Remove unused sockets in the poller.
1238        let deleted_addrs = self
1239            .intf_socks
1240            .iter_mut()
1241            .filter_map(|(intf, sock)| {
1242                if !my_ifaddrs.contains(intf) {
1243                    if let Err(e) = poller.registry().deregister(sock) {
1244                        debug!("check_ip_changes: poller.delete {:?}: {}", intf, e);
1245                    }
1246                    // Remove from poll_ids
1247                    poll_ids.retain(|_, v| v != intf);
1248                    Some(intf.ip())
1249                } else {
1250                    None
1251                }
1252            })
1253            .collect::<Vec<IpAddr>>();
1254
1255        // Remove deleted addrs from my services that enabled `addr_auto`.
1256        for ip in deleted_addrs.iter() {
1257            self.del_addr_in_my_services(ip);
1258            self.notify_monitors(DaemonEvent::IpDel(*ip));
1259        }
1260
1261        // Keep the interfaces only if they still exist.
1262        self.intf_socks.retain(|intf, _| my_ifaddrs.contains(intf));
1263
1264        // Add newly found interfaces only if in our selections.
1265        self.apply_intf_selections(my_ifaddrs);
1266    }
1267
1268    fn add_new_interface(&mut self, intf: Interface) {
1269        // Bind the new interface.
1270        let new_ip = intf.ip();
1271        let should_loop = if new_ip.is_ipv4() {
1272            self.multicast_loop_v4
1273        } else {
1274            self.multicast_loop_v6
1275        };
1276        let mut sock = match new_socket_bind(&intf, should_loop) {
1277            Ok(s) => s,
1278            Err(e) => {
1279                debug!("bind a socket to {}: {}. Skipped.", &intf.ip(), e);
1280                return;
1281            }
1282        };
1283
1284        // Add the new interface into the poller.
1285        let key = self.add_poll(intf.clone());
1286        if let Err(e) =
1287            self.poller
1288                .registry()
1289                .register(&mut sock, mio::Token(key), mio::Interest::READABLE)
1290        {
1291            debug!("check_ip_changes: poller add ip {}: {}", new_ip, e);
1292            return;
1293        }
1294
1295        debug!("add new interface {}: {new_ip}", intf.name);
1296        let dns_registry = match self.dns_registry_map.get_mut(&intf) {
1297            Some(registry) => registry,
1298            None => self
1299                .dns_registry_map
1300                .entry(intf.clone())
1301                .or_insert_with(DnsRegistry::new),
1302        };
1303
1304        for (_, service_info) in self.my_services.iter_mut() {
1305            if service_info.is_addr_auto() {
1306                service_info.insert_ipaddr(new_ip);
1307
1308                if announce_service_on_intf(dns_registry, service_info, &intf, &sock) {
1309                    debug!(
1310                        "Announce service {} on {}",
1311                        service_info.get_fullname(),
1312                        intf.ip()
1313                    );
1314                    service_info.set_status(&intf, ServiceStatus::Announced);
1315                } else {
1316                    for timer in dns_registry.new_timers.drain(..) {
1317                        self.timers.push(Reverse(timer));
1318                    }
1319                    service_info.set_status(&intf, ServiceStatus::Probing);
1320                }
1321            }
1322        }
1323
1324        self.intf_socks.insert(intf, sock);
1325
1326        // Notify the monitors.
1327        self.notify_monitors(DaemonEvent::IpAdd(new_ip));
1328    }
1329
1330    /// Registers a service.
1331    ///
1332    /// RFC 6762 section 8.3.
1333    /// ...the Multicast DNS responder MUST send
1334    ///    an unsolicited Multicast DNS response containing, in the Answer
1335    ///    Section, all of its newly registered resource records
1336    ///
1337    /// Zeroconf will then respond to requests for information about this service.
1338    fn register_service(&mut self, mut info: ServiceInfo) {
1339        // Check the service name length.
1340        if let Err(e) = check_service_name_length(info.get_type(), self.service_name_len_max) {
1341            debug!("check_service_name_length: {}", &e);
1342            self.notify_monitors(DaemonEvent::Error(e));
1343            return;
1344        }
1345
1346        if info.is_addr_auto() {
1347            let selected_addrs = self.selected_addrs(my_ip_interfaces(true));
1348            for addr in selected_addrs {
1349                info.insert_ipaddr(addr);
1350            }
1351        }
1352
1353        debug!("register service {:?}", &info);
1354
1355        let outgoing_addrs = self.send_unsolicited_response(&mut info);
1356        if !outgoing_addrs.is_empty() {
1357            self.notify_monitors(DaemonEvent::Announce(
1358                info.get_fullname().to_string(),
1359                format!("{:?}", &outgoing_addrs),
1360            ));
1361        }
1362
1363        // The key has to be lower case letter as DNS record name is case insensitive.
1364        // The info will have the original name.
1365        let service_fullname = info.get_fullname().to_lowercase();
1366        self.my_services.insert(service_fullname, info);
1367    }
1368
1369    /// Sends out announcement of `info` on every valid interface.
1370    /// Returns the list of interface IPs that sent out the announcement.
1371    fn send_unsolicited_response(&mut self, info: &mut ServiceInfo) -> Vec<IpAddr> {
1372        let mut outgoing_addrs = Vec::new();
1373        // Send the announcement on one interface per ip version.
1374        let mut multicast_sent_trackers = HashSet::new();
1375
1376        let mut outgoing_intfs = Vec::new();
1377
1378        for (intf, sock) in self.intf_socks.iter() {
1379            if let Some(tracker) = multicast_send_tracker(intf) {
1380                if multicast_sent_trackers.contains(&tracker) {
1381                    continue; // No need to send again on the same interface with same ip version.
1382                }
1383            }
1384
1385            let dns_registry = match self.dns_registry_map.get_mut(intf) {
1386                Some(registry) => registry,
1387                None => self
1388                    .dns_registry_map
1389                    .entry(intf.clone())
1390                    .or_insert_with(DnsRegistry::new),
1391            };
1392
1393            if announce_service_on_intf(dns_registry, info, intf, sock) {
1394                if let Some(tracker) = multicast_send_tracker(intf) {
1395                    multicast_sent_trackers.insert(tracker);
1396                }
1397                outgoing_addrs.push(intf.ip());
1398                outgoing_intfs.push(intf.clone());
1399
1400                debug!("Announce service {} on {}", info.get_fullname(), intf.ip());
1401
1402                info.set_status(intf, ServiceStatus::Announced);
1403            } else {
1404                for timer in dns_registry.new_timers.drain(..) {
1405                    self.timers.push(Reverse(timer));
1406                }
1407                info.set_status(intf, ServiceStatus::Probing);
1408            }
1409        }
1410
1411        // RFC 6762 section 8.3.
1412        // ..The Multicast DNS responder MUST send at least two unsolicited
1413        //    responses, one second apart.
1414        let next_time = current_time_millis() + 1000;
1415        for intf in outgoing_intfs {
1416            self.add_retransmission(
1417                next_time,
1418                Command::RegisterResend(info.get_fullname().to_string(), intf),
1419            );
1420        }
1421
1422        outgoing_addrs
1423    }
1424
1425    /// Send probings or finish them if expired. Notify waiting services.
1426    fn probing_handler(&mut self) {
1427        let now = current_time_millis();
1428
1429        for (intf, sock) in self.intf_socks.iter() {
1430            let Some(dns_registry) = self.dns_registry_map.get_mut(intf) else {
1431                continue;
1432            };
1433
1434            let mut expired_probe_names = Vec::new();
1435            let mut out = DnsOutgoing::new(FLAGS_QR_QUERY);
1436
1437            for (name, probe) in dns_registry.probing.iter_mut() {
1438                if now >= probe.next_send {
1439                    if probe.expired(now) {
1440                        // move the record to active
1441                        expired_probe_names.push(name.clone());
1442                    } else {
1443                        out.add_question(name, RRType::ANY);
1444
1445                        /*
1446                        RFC 6762 section 8.2: https://datatracker.ietf.org/doc/html/rfc6762#section-8.2
1447                        ...
1448                        for tiebreaking to work correctly in all
1449                        cases, the Authority Section must contain *all* the records and
1450                        proposed rdata being probed for uniqueness.
1451                         */
1452                        for record in probe.records.iter() {
1453                            out.add_authority(record.clone());
1454                        }
1455
1456                        probe.update_next_send(now);
1457
1458                        // add timer
1459                        self.timers.push(Reverse(probe.next_send));
1460                    }
1461                }
1462            }
1463
1464            // send probing.
1465            if !out.questions().is_empty() {
1466                debug!("sending out probing of {} questions", out.questions().len());
1467                send_dns_outgoing(&out, intf, sock);
1468            }
1469
1470            let mut waiting_services = HashSet::new();
1471
1472            for name in expired_probe_names {
1473                let Some(probe) = dns_registry.probing.remove(&name) else {
1474                    continue;
1475                };
1476
1477                // send notifications about name changes
1478                for record in probe.records.iter() {
1479                    if let Some(new_name) = record.get_record().get_new_name() {
1480                        dns_registry
1481                            .name_changes
1482                            .insert(name.clone(), new_name.to_string());
1483
1484                        let event = DnsNameChange {
1485                            original: record.get_record().get_original_name().to_string(),
1486                            new_name: new_name.to_string(),
1487                            rr_type: record.get_type(),
1488                            intf_name: intf.name.to_string(),
1489                        };
1490                        notify_monitors(&mut self.monitors, DaemonEvent::NameChange(event));
1491                    }
1492                }
1493
1494                // move RR from probe to active.
1495                debug!(
1496                    "probe of '{name}' finished: move {} records to active. ({} waiting services)",
1497                    probe.records.len(),
1498                    probe.waiting_services.len(),
1499                );
1500
1501                // Move records to active and plan to wake up services if records are not empty.
1502                if !probe.records.is_empty() {
1503                    match dns_registry.active.get_mut(&name) {
1504                        Some(records) => {
1505                            records.extend(probe.records);
1506                        }
1507                        None => {
1508                            dns_registry.active.insert(name, probe.records);
1509                        }
1510                    }
1511
1512                    waiting_services.extend(probe.waiting_services);
1513                }
1514            }
1515
1516            // wake up services waiting.
1517            for service_name in waiting_services {
1518                debug!(
1519                    "try to announce service {service_name} on intf {}",
1520                    intf.ip()
1521                );
1522                // service names are lowercase
1523                if let Some(info) = self.my_services.get_mut(&service_name.to_lowercase()) {
1524                    if info.get_status(intf) == ServiceStatus::Announced {
1525                        debug!("service {} already announced", info.get_fullname());
1526                        continue;
1527                    }
1528
1529                    if announce_service_on_intf(dns_registry, info, intf, sock) {
1530                        let next_time = now + 1000;
1531                        let command =
1532                            Command::RegisterResend(info.get_fullname().to_string(), intf.clone());
1533                        self.retransmissions.push(ReRun { next_time, command });
1534                        self.timers.push(Reverse(next_time));
1535
1536                        let fullname = match dns_registry.name_changes.get(&service_name) {
1537                            Some(new_name) => new_name.to_string(),
1538                            None => service_name.to_string(),
1539                        };
1540
1541                        let mut hostname = info.get_hostname();
1542                        if let Some(new_name) = dns_registry.name_changes.get(hostname) {
1543                            hostname = new_name;
1544                        }
1545
1546                        debug!("wake up: announce service {} on {}", fullname, intf.ip());
1547                        notify_monitors(
1548                            &mut self.monitors,
1549                            DaemonEvent::Announce(fullname, format!("{}:{}", hostname, &intf.ip())),
1550                        );
1551
1552                        info.set_status(intf, ServiceStatus::Announced);
1553                    }
1554                }
1555            }
1556        }
1557    }
1558
1559    fn unregister_service(
1560        &self,
1561        info: &ServiceInfo,
1562        intf: &Interface,
1563        sock: &MioUdpSocket,
1564    ) -> Vec<u8> {
1565        let mut out = DnsOutgoing::new(FLAGS_QR_RESPONSE | FLAGS_AA);
1566        out.add_answer_at_time(
1567            DnsPointer::new(
1568                info.get_type(),
1569                RRType::PTR,
1570                CLASS_IN,
1571                0,
1572                info.get_fullname().to_string(),
1573            ),
1574            0,
1575        );
1576
1577        if let Some(sub) = info.get_subtype() {
1578            trace!("Adding subdomain {}", sub);
1579            out.add_answer_at_time(
1580                DnsPointer::new(
1581                    sub,
1582                    RRType::PTR,
1583                    CLASS_IN,
1584                    0,
1585                    info.get_fullname().to_string(),
1586                ),
1587                0,
1588            );
1589        }
1590
1591        out.add_answer_at_time(
1592            DnsSrv::new(
1593                info.get_fullname(),
1594                CLASS_IN | CLASS_CACHE_FLUSH,
1595                0,
1596                info.get_priority(),
1597                info.get_weight(),
1598                info.get_port(),
1599                info.get_hostname().to_string(),
1600            ),
1601            0,
1602        );
1603        out.add_answer_at_time(
1604            DnsTxt::new(
1605                info.get_fullname(),
1606                CLASS_IN | CLASS_CACHE_FLUSH,
1607                0,
1608                info.generate_txt(),
1609            ),
1610            0,
1611        );
1612
1613        for address in info.get_addrs_on_intf(intf) {
1614            out.add_answer_at_time(
1615                DnsAddress::new(
1616                    info.get_hostname(),
1617                    ip_address_rr_type(&address),
1618                    CLASS_IN | CLASS_CACHE_FLUSH,
1619                    0,
1620                    address,
1621                    intf.into(),
1622                ),
1623                0,
1624            );
1625        }
1626
1627        // `out` data is non-empty, hence we can do this.
1628        send_dns_outgoing(&out, intf, sock).remove(0)
1629    }
1630
1631    /// Binds a channel `listener` to querying mDNS hostnames.
1632    ///
1633    /// If there is already a `listener`, it will be updated, i.e. overwritten.
1634    fn add_hostname_resolver(
1635        &mut self,
1636        hostname: String,
1637        listener: Sender<HostnameResolutionEvent>,
1638        timeout: Option<u64>,
1639    ) {
1640        let real_timeout = timeout.map(|t| current_time_millis() + t);
1641        self.hostname_resolvers
1642            .insert(hostname.to_lowercase(), (listener, real_timeout));
1643        if let Some(t) = real_timeout {
1644            self.add_timer(t);
1645        }
1646    }
1647
1648    /// Sends a multicast query for `name` with `qtype`.
1649    fn send_query(&self, name: &str, qtype: RRType) {
1650        self.send_query_vec(&[(name, qtype)]);
1651    }
1652
1653    /// Sends out a list of `questions` (i.e. DNS questions) via multicast.
1654    fn send_query_vec(&self, questions: &[(&str, RRType)]) {
1655        trace!("Sending query questions: {:?}", questions);
1656        let mut out = DnsOutgoing::new(FLAGS_QR_QUERY);
1657        let now = current_time_millis();
1658
1659        for (name, qtype) in questions {
1660            out.add_question(name, *qtype);
1661
1662            for record in self.cache.get_known_answers(name, *qtype, now) {
1663                /*
1664                RFC 6762 section 7.1: https://datatracker.ietf.org/doc/html/rfc6762#section-7.1
1665                ...
1666                    When a Multicast DNS querier sends a query to which it already knows
1667                    some answers, it populates the Answer Section of the DNS query
1668                    message with those answers.
1669                 */
1670                trace!("add known answer: {:?}", record);
1671                let mut new_record = record.clone();
1672                new_record.get_record_mut().update_ttl(now);
1673                out.add_answer_box(new_record);
1674            }
1675        }
1676
1677        // Send the query on one interface per ip version.
1678        let mut multicast_sent_trackers = HashSet::new();
1679        for (intf, sock) in self.intf_socks.iter() {
1680            if let Some(tracker) = multicast_send_tracker(intf) {
1681                if multicast_sent_trackers.contains(&tracker) {
1682                    continue; // no need to send query the same interface with same ip version.
1683                }
1684                multicast_sent_trackers.insert(tracker);
1685            }
1686            send_dns_outgoing(&out, intf, sock);
1687        }
1688    }
1689
1690    /// Reads one UDP datagram from the socket of `intf`.
1691    ///
1692    /// Returns false if failed to receive a packet,
1693    /// otherwise returns true.
1694    fn handle_read(&mut self, intf: &Interface) -> bool {
1695        let sock = match self.intf_socks.get_mut(intf) {
1696            Some(if_sock) => if_sock,
1697            None => return false,
1698        };
1699        let mut buf = vec![0u8; MAX_MSG_ABSOLUTE];
1700
1701        // Read the next mDNS UDP datagram.
1702        //
1703        // If the datagram is larger than `buf`, excess bytes may or may not
1704        // be truncated by the socket layer depending on the platform's libc.
1705        // In any case, such large datagram will not be decoded properly and
1706        // this function should return false but should not crash.
1707        let sz = match sock.recv(&mut buf) {
1708            Ok(sz) => sz,
1709            Err(e) => {
1710                if e.kind() != std::io::ErrorKind::WouldBlock {
1711                    debug!("listening socket read failed: {}", e);
1712                }
1713                return false;
1714            }
1715        };
1716
1717        trace!("received {} bytes at IP: {}", sz, intf.ip());
1718
1719        // If sz is 0, it means sock reached End-of-File.
1720        if sz == 0 {
1721            debug!("socket {:?} was likely shutdown", &sock);
1722            if let Err(e) = self.poller.registry().deregister(sock) {
1723                debug!("failed to remove sock {:?} from poller: {}", sock, &e);
1724            }
1725
1726            // Replace the closed socket with a new one.
1727            let should_loop = if intf.ip().is_ipv4() {
1728                self.multicast_loop_v4
1729            } else {
1730                self.multicast_loop_v6
1731            };
1732            match new_socket_bind(intf, should_loop) {
1733                Ok(new_sock) => {
1734                    trace!("reset socket for IP {}", intf.ip());
1735                    self.intf_socks.insert(intf.clone(), new_sock);
1736                }
1737                Err(e) => debug!("re-bind a socket to {:?}: {}", intf, e),
1738            }
1739
1740            return false;
1741        }
1742
1743        buf.truncate(sz); // reduce potential processing errors
1744
1745        match DnsIncoming::new(buf, intf.into()) {
1746            Ok(msg) => {
1747                if msg.is_query() {
1748                    self.handle_query(msg, intf);
1749                } else if msg.is_response() {
1750                    self.handle_response(msg, intf);
1751                } else {
1752                    debug!("Invalid message: not query and not response");
1753                }
1754            }
1755            Err(e) => debug!("Invalid incoming DNS message: {}", e),
1756        }
1757
1758        true
1759    }
1760
1761    /// Returns true, if sent query. Returns false if SRV already exists.
1762    fn query_unresolved(&mut self, instance: &str) -> bool {
1763        if !valid_instance_name(instance) {
1764            trace!("instance name {} not valid", instance);
1765            return false;
1766        }
1767
1768        if let Some(records) = self.cache.get_srv(instance) {
1769            for record in records {
1770                if let Some(srv) = record.any().downcast_ref::<DnsSrv>() {
1771                    if self.cache.get_addr(srv.host()).is_none() {
1772                        self.send_query_vec(&[(srv.host(), RRType::A), (srv.host(), RRType::AAAA)]);
1773                        return true;
1774                    }
1775                }
1776            }
1777        } else {
1778            self.send_query(instance, RRType::ANY);
1779            return true;
1780        }
1781
1782        false
1783    }
1784
1785    /// Checks if `ty_domain` has records in the cache. If yes, sends the
1786    /// cached records via `sender`.
1787    fn query_cache_for_service(&mut self, ty_domain: &str, sender: &Sender<ServiceEvent>) {
1788        let mut resolved: HashSet<String> = HashSet::new();
1789        let mut unresolved: HashSet<String> = HashSet::new();
1790
1791        if let Some(records) = self.cache.get_ptr(ty_domain) {
1792            for record in records.iter() {
1793                if let Some(ptr) = record.any().downcast_ref::<DnsPointer>() {
1794                    let info = match self.create_service_info_from_cache(ty_domain, ptr.alias()) {
1795                        Ok(ok) => ok,
1796                        Err(err) => {
1797                            debug!("Error while creating service info from cache: {}", err);
1798                            continue;
1799                        }
1800                    };
1801
1802                    match sender.send(ServiceEvent::ServiceFound(
1803                        ty_domain.to_string(),
1804                        ptr.alias().to_string(),
1805                    )) {
1806                        Ok(()) => debug!("send service found {}", ptr.alias()),
1807                        Err(e) => {
1808                            debug!("failed to send service found: {}", e);
1809                            continue;
1810                        }
1811                    }
1812
1813                    if info.is_ready() {
1814                        resolved.insert(ptr.alias().to_string());
1815                        match sender.send(ServiceEvent::ServiceResolved(info)) {
1816                            Ok(()) => debug!("sent service resolved: {}", ptr.alias()),
1817                            Err(e) => debug!("failed to send service resolved: {}", e),
1818                        }
1819                    } else {
1820                        unresolved.insert(ptr.alias().to_string());
1821                    }
1822                }
1823            }
1824        }
1825
1826        for instance in resolved.drain() {
1827            self.pending_resolves.remove(&instance);
1828            self.resolved.insert(instance);
1829        }
1830
1831        for instance in unresolved.drain() {
1832            self.add_pending_resolve(instance);
1833        }
1834    }
1835
1836    /// Checks if `hostname` has records in the cache. If yes, sends the
1837    /// cached records via `sender`.
1838    fn query_cache_for_hostname(
1839        &mut self,
1840        hostname: &str,
1841        sender: Sender<HostnameResolutionEvent>,
1842    ) {
1843        let addresses_map = self.cache.get_addresses_for_host(hostname);
1844        for (name, addresses) in addresses_map {
1845            match sender.send(HostnameResolutionEvent::AddressesFound(name, addresses)) {
1846                Ok(()) => trace!("sent hostname addresses found"),
1847                Err(e) => debug!("failed to send hostname addresses found: {}", e),
1848            }
1849        }
1850    }
1851
1852    fn add_pending_resolve(&mut self, instance: String) {
1853        if !self.pending_resolves.contains(&instance) {
1854            let next_time = current_time_millis() + RESOLVE_WAIT_IN_MILLIS;
1855            self.add_retransmission(next_time, Command::Resolve(instance.clone(), 1));
1856            self.pending_resolves.insert(instance);
1857        }
1858    }
1859
1860    fn create_service_info_from_cache(
1861        &self,
1862        ty_domain: &str,
1863        fullname: &str,
1864    ) -> Result<ServiceInfo> {
1865        let my_name = {
1866            let name = fullname.trim_end_matches(split_sub_domain(ty_domain).0);
1867            name.strip_suffix('.').unwrap_or(name).to_string()
1868        };
1869
1870        let now = current_time_millis();
1871        let mut info = ServiceInfo::new(ty_domain, &my_name, "", (), 0, None)?;
1872
1873        // Be sure setting `subtype` if available even when querying for the parent domain.
1874        if let Some(subtype) = self.cache.get_subtype(fullname) {
1875            trace!(
1876                "ty_domain: {} found subtype {} for instance: {}",
1877                ty_domain,
1878                subtype,
1879                fullname
1880            );
1881            if info.get_subtype().is_none() {
1882                info.set_subtype(subtype.clone());
1883            }
1884        }
1885
1886        // resolve SRV record
1887        if let Some(records) = self.cache.get_srv(fullname) {
1888            if let Some(answer) = records.first() {
1889                if let Some(dns_srv) = answer.any().downcast_ref::<DnsSrv>() {
1890                    info.set_hostname(dns_srv.host().to_string());
1891                    info.set_port(dns_srv.port());
1892                }
1893            }
1894        }
1895
1896        // resolve TXT record
1897        if let Some(records) = self.cache.get_txt(fullname) {
1898            if let Some(record) = records.first() {
1899                if let Some(dns_txt) = record.any().downcast_ref::<DnsTxt>() {
1900                    info.set_properties_from_txt(dns_txt.text());
1901                }
1902            }
1903        }
1904
1905        // resolve A and AAAA records
1906        if let Some(records) = self.cache.get_addr(info.get_hostname()) {
1907            for answer in records.iter() {
1908                if let Some(dns_a) = answer.any().downcast_ref::<DnsAddress>() {
1909                    if dns_a.get_record().is_expired(now) {
1910                        trace!("Addr expired: {}", dns_a.address());
1911                    } else {
1912                        info.insert_ipaddr(dns_a.address());
1913                    }
1914                }
1915            }
1916        }
1917
1918        Ok(info)
1919    }
1920
1921    fn handle_poller_events(&mut self, events: &mio::Events) {
1922        for ev in events.iter() {
1923            trace!("event received with key {:?}", ev.token());
1924            if ev.token().0 == SIGNAL_SOCK_EVENT_KEY {
1925                // Drain signals as we will drain commands as well.
1926                self.signal_sock_drain();
1927
1928                if let Err(e) = self.poller.registry().reregister(
1929                    &mut self.signal_sock,
1930                    ev.token(),
1931                    mio::Interest::READABLE,
1932                ) {
1933                    debug!("failed to modify poller for signal socket: {}", e);
1934                }
1935                continue; // Next event.
1936            }
1937
1938            // Read until no more packets available.
1939            let intf = match self.poll_ids.get(&ev.token().0) {
1940                Some(interface) => interface.clone(),
1941                None => {
1942                    debug!("Ip for event key {} not found", ev.token().0);
1943                    break;
1944                }
1945            };
1946            while self.handle_read(&intf) {}
1947
1948            // we continue to monitor this socket.
1949            if let Some(sock) = self.intf_socks.get_mut(&intf) {
1950                if let Err(e) =
1951                    self.poller
1952                        .registry()
1953                        .reregister(sock, ev.token(), mio::Interest::READABLE)
1954                {
1955                    debug!("modify poller for interface {:?}: {}", &intf, e);
1956                    break;
1957                }
1958            }
1959        }
1960    }
1961
1962    /// Deal with incoming response packets.  All answers
1963    /// are held in the cache, and listeners are notified.
1964    fn handle_response(&mut self, mut msg: DnsIncoming, intf: &Interface) {
1965        trace!(
1966            "handle_response: {} answers {} authorities {} additionals",
1967            msg.answers().len(),
1968            &msg.authorities().len(),
1969            &msg.num_additionals()
1970        );
1971        let now = current_time_millis();
1972
1973        // remove records that are expired.
1974        let mut record_predicate = |record: &DnsRecordBox| {
1975            if !record.get_record().is_expired(now) {
1976                return true;
1977            }
1978
1979            debug!("record is expired, removing it from cache.");
1980            if self.cache.remove(record) {
1981                // for PTR records, send event to listeners
1982                if let Some(dns_ptr) = record.any().downcast_ref::<DnsPointer>() {
1983                    call_service_listener(
1984                        &self.service_queriers,
1985                        dns_ptr.get_name(),
1986                        ServiceEvent::ServiceRemoved(
1987                            dns_ptr.get_name().to_string(),
1988                            dns_ptr.alias().to_string(),
1989                        ),
1990                    );
1991                }
1992            }
1993            false
1994        };
1995        msg.answers_mut().retain(&mut record_predicate);
1996        msg.authorities_mut().retain(&mut record_predicate);
1997        msg.additionals_mut().retain(&mut record_predicate);
1998
1999        // check possible conflicts and handle them.
2000        self.conflict_handler(&msg, intf);
2001
2002        // check if the message is for us.
2003        let mut is_for_us = true; // assume it is for us.
2004
2005        // If there are any PTR records in the answers, there should be
2006        // at least one PTR for us. Otherwise, the message is not for us.
2007        // If there are no PTR records at all, assume this message is for us.
2008        for answer in msg.answers() {
2009            if answer.get_type() == RRType::PTR {
2010                if self.service_queriers.contains_key(answer.get_name()) {
2011                    is_for_us = true;
2012                    break; // OK to break: at least one PTR for us.
2013                } else {
2014                    is_for_us = false;
2015                }
2016            } else if answer.get_type() == RRType::A || answer.get_type() == RRType::AAAA {
2017                // If there is a hostname querier for this address, then it is for us.
2018                let answer_lowercase = answer.get_name().to_lowercase();
2019                if self.hostname_resolvers.contains_key(&answer_lowercase) {
2020                    is_for_us = true;
2021                    break; // OK to break: at least one hostname for us.
2022                }
2023            }
2024        }
2025
2026        /// Represents a DNS record change that involves one service instance.
2027        struct InstanceChange {
2028            ty: RRType,   // The type of DNS record for the instance.
2029            name: String, // The name of the record.
2030        }
2031
2032        // Go through all answers to get the new and updated records.
2033        // For new PTR records, send out ServiceFound immediately. For others,
2034        // collect them into `changes`.
2035        //
2036        // Note: we don't try to identify the update instances based on
2037        // each record immediately as the answers are likely related to each
2038        // other.
2039        let mut changes = Vec::new();
2040        let mut timers = Vec::new();
2041        for record in msg.all_records() {
2042            match self
2043                .cache
2044                .add_or_update(intf, record, &mut timers, is_for_us)
2045            {
2046                Some((dns_record, true)) => {
2047                    timers.push(dns_record.get_record().get_expire_time());
2048                    timers.push(dns_record.get_record().get_refresh_time());
2049
2050                    let ty = dns_record.get_type();
2051                    let name = dns_record.get_name();
2052                    if ty == RRType::PTR {
2053                        if self.service_queriers.contains_key(name) {
2054                            timers.push(dns_record.get_record().get_refresh_time());
2055                        }
2056
2057                        // send ServiceFound
2058                        if let Some(dns_ptr) = dns_record.any().downcast_ref::<DnsPointer>() {
2059                            call_service_listener(
2060                                &self.service_queriers,
2061                                name,
2062                                ServiceEvent::ServiceFound(
2063                                    name.to_string(),
2064                                    dns_ptr.alias().to_string(),
2065                                ),
2066                            );
2067                            changes.push(InstanceChange {
2068                                ty,
2069                                name: dns_ptr.alias().to_string(),
2070                            });
2071                        }
2072                    } else {
2073                        changes.push(InstanceChange {
2074                            ty,
2075                            name: name.to_string(),
2076                        });
2077                    }
2078                }
2079                Some((dns_record, false)) => {
2080                    timers.push(dns_record.get_record().get_expire_time());
2081                    timers.push(dns_record.get_record().get_refresh_time());
2082                }
2083                _ => {}
2084            }
2085        }
2086
2087        // Add timers for the new records.
2088        for t in timers {
2089            self.add_timer(t);
2090        }
2091
2092        // Go through remaining changes to see if any hostname resolutions were found or updated.
2093        for change in changes
2094            .iter()
2095            .filter(|change| change.ty == RRType::A || change.ty == RRType::AAAA)
2096        {
2097            let addr_map = self.cache.get_addresses_for_host(&change.name);
2098            for (name, addresses) in addr_map {
2099                call_hostname_resolution_listener(
2100                    &self.hostname_resolvers,
2101                    &change.name,
2102                    HostnameResolutionEvent::AddressesFound(name, addresses),
2103                )
2104            }
2105        }
2106
2107        // Identify the instances that need to be "resolved".
2108        let mut updated_instances = HashSet::new();
2109        for update in changes {
2110            match update.ty {
2111                RRType::PTR | RRType::SRV | RRType::TXT => {
2112                    updated_instances.insert(update.name);
2113                }
2114                RRType::A | RRType::AAAA => {
2115                    let instances = self.cache.get_instances_on_host(&update.name);
2116                    updated_instances.extend(instances);
2117                }
2118                _ => {}
2119            }
2120        }
2121
2122        self.resolve_updated_instances(&updated_instances);
2123    }
2124
2125    fn conflict_handler(&mut self, msg: &DnsIncoming, intf: &Interface) {
2126        let Some(dns_registry) = self.dns_registry_map.get_mut(intf) else {
2127            return;
2128        };
2129
2130        for answer in msg.answers().iter() {
2131            let mut new_records = Vec::new();
2132
2133            let name = answer.get_name();
2134            let Some(probe) = dns_registry.probing.get_mut(name) else {
2135                continue;
2136            };
2137
2138            // check against possible multicast forwarding
2139            if answer.get_type() == RRType::A || answer.get_type() == RRType::AAAA {
2140                if let Some(answer_addr) = answer.any().downcast_ref::<DnsAddress>() {
2141                    if !valid_ip_on_intf(&answer_addr.address(), intf) {
2142                        debug!(
2143                            "conflict handler: answer addr {:?} not in the subnet of {:?}",
2144                            answer_addr, intf
2145                        );
2146                        continue;
2147                    }
2148                }
2149
2150                // double check if any other address record matches rrdata,
2151                // as there could be multiple addresses for the same name.
2152                let any_match = probe.records.iter().any(|r| {
2153                    r.get_type() == answer.get_type()
2154                        && r.get_class() == answer.get_class()
2155                        && r.rrdata_match(answer.as_ref())
2156                });
2157                if any_match {
2158                    continue; // no conflict for this answer.
2159                }
2160            }
2161
2162            probe.records.retain(|record| {
2163                if record.get_type() == answer.get_type()
2164                    && record.get_class() == answer.get_class()
2165                    && !record.rrdata_match(answer.as_ref())
2166                {
2167                    debug!(
2168                        "found conflict name: '{name}' record: {}: {} PEER: {}",
2169                        record.get_type(),
2170                        record.rdata_print(),
2171                        answer.rdata_print()
2172                    );
2173
2174                    // create a new name for this record
2175                    // then remove the old record in probing.
2176                    let mut new_record = record.clone();
2177                    let new_name = match record.get_type() {
2178                        RRType::A => hostname_change(name),
2179                        RRType::AAAA => hostname_change(name),
2180                        _ => name_change(name),
2181                    };
2182                    new_record.get_record_mut().set_new_name(new_name);
2183                    new_records.push(new_record);
2184                    return false; // old record is dropped from the probe.
2185                }
2186
2187                true
2188            });
2189
2190            // ?????
2191            // if probe.records.is_empty() {
2192            //     dns_registry.probing.remove(name);
2193            // }
2194
2195            // Probing again with the new names.
2196            let create_time = current_time_millis() + fastrand::u64(0..250);
2197
2198            let waiting_services = probe.waiting_services.clone();
2199
2200            for record in new_records {
2201                if dns_registry.update_hostname(name, record.get_name(), create_time) {
2202                    self.timers.push(Reverse(create_time));
2203                }
2204
2205                // remember the name changes (note: `name` might not be the original, it could be already changed once.)
2206                dns_registry.name_changes.insert(
2207                    record.get_record().get_original_name().to_string(),
2208                    record.get_name().to_string(),
2209                );
2210
2211                let new_probe = match dns_registry.probing.get_mut(record.get_name()) {
2212                    Some(p) => p,
2213                    None => {
2214                        let new_probe = dns_registry
2215                            .probing
2216                            .entry(record.get_name().to_string())
2217                            .or_insert_with(|| {
2218                                debug!("conflict handler: new probe of {}", record.get_name());
2219                                Probe::new(create_time)
2220                            });
2221                        self.timers.push(Reverse(new_probe.next_send));
2222                        new_probe
2223                    }
2224                };
2225
2226                debug!(
2227                    "insert record with new name '{}' {} into probe",
2228                    record.get_name(),
2229                    record.get_type()
2230                );
2231                new_probe.insert_record(record);
2232
2233                new_probe.waiting_services.extend(waiting_services.clone());
2234            }
2235        }
2236    }
2237
2238    /// Resolve the updated (including new) instances.
2239    ///
2240    /// Note: it is possible that more than 1 PTR pointing to the same
2241    /// instance. For example, a regular service type PTR and a sub-type
2242    /// service type PTR can both point to the same service instance.
2243    /// This loop automatically handles the sub-type PTRs.
2244    fn resolve_updated_instances(&mut self, updated_instances: &HashSet<String>) {
2245        let mut resolved: HashSet<String> = HashSet::new();
2246        let mut unresolved: HashSet<String> = HashSet::new();
2247        let mut removed_instances = HashMap::new();
2248
2249        for (ty_domain, records) in self.cache.all_ptr().iter() {
2250            if !self.service_queriers.contains_key(ty_domain) {
2251                // No need to resolve if not in our queries.
2252                continue;
2253            }
2254
2255            for record in records.iter() {
2256                if let Some(dns_ptr) = record.any().downcast_ref::<DnsPointer>() {
2257                    if updated_instances.contains(dns_ptr.alias()) {
2258                        if let Ok(info) =
2259                            self.create_service_info_from_cache(ty_domain, dns_ptr.alias())
2260                        {
2261                            if info.is_ready() {
2262                                debug!("call queriers to resolve {}", dns_ptr.alias());
2263                                resolved.insert(dns_ptr.alias().to_string());
2264                                call_service_listener(
2265                                    &self.service_queriers,
2266                                    ty_domain,
2267                                    ServiceEvent::ServiceResolved(info),
2268                                );
2269                            } else {
2270                                if self.resolved.remove(dns_ptr.alias()) {
2271                                    removed_instances
2272                                        .entry(ty_domain.to_string())
2273                                        .or_insert_with(HashSet::new)
2274                                        .insert(dns_ptr.alias().to_string());
2275                                }
2276                                unresolved.insert(dns_ptr.alias().to_string());
2277                            }
2278                        }
2279                    }
2280                }
2281            }
2282        }
2283
2284        for instance in resolved.drain() {
2285            self.pending_resolves.remove(&instance);
2286            self.resolved.insert(instance);
2287        }
2288
2289        for instance in unresolved.drain() {
2290            self.add_pending_resolve(instance);
2291        }
2292
2293        self.notify_service_removal(removed_instances);
2294    }
2295
2296    /// Handle incoming query packets, figure out whether and what to respond.
2297    fn handle_query(&mut self, msg: DnsIncoming, intf: &Interface) {
2298        let sock = match self.intf_socks.get(intf) {
2299            Some(sock) => sock,
2300            None => return,
2301        };
2302        let mut out = DnsOutgoing::new(FLAGS_QR_RESPONSE | FLAGS_AA);
2303
2304        // Special meta-query "_services._dns-sd._udp.<Domain>".
2305        // See https://datatracker.ietf.org/doc/html/rfc6763#section-9
2306        const META_QUERY: &str = "_services._dns-sd._udp.local.";
2307
2308        let Some(dns_registry) = self.dns_registry_map.get_mut(intf) else {
2309            debug!("missing dns registry for intf {}", intf.ip());
2310            return;
2311        };
2312
2313        for question in msg.questions().iter() {
2314            trace!("query question: {:?}", &question);
2315
2316            let qtype = question.entry_type();
2317
2318            if qtype == RRType::PTR {
2319                for service in self.my_services.values() {
2320                    if service.get_status(intf) != ServiceStatus::Announced {
2321                        continue;
2322                    }
2323
2324                    if question.entry_name() == service.get_type()
2325                        || service
2326                            .get_subtype()
2327                            .as_ref()
2328                            .is_some_and(|v| v == question.entry_name())
2329                    {
2330                        add_answer_with_additionals(&mut out, &msg, service, intf, dns_registry);
2331                    } else if question.entry_name() == META_QUERY {
2332                        let ptr_added = out.add_answer(
2333                            &msg,
2334                            DnsPointer::new(
2335                                question.entry_name(),
2336                                RRType::PTR,
2337                                CLASS_IN,
2338                                service.get_other_ttl(),
2339                                service.get_type().to_string(),
2340                            ),
2341                        );
2342                        if !ptr_added {
2343                            trace!("answer was not added for meta-query {:?}", &question);
2344                        }
2345                    }
2346                }
2347            } else {
2348                // Simultaneous Probe Tiebreaking (RFC 6762 section 8.2)
2349                if qtype == RRType::ANY && msg.num_authorities() > 0 {
2350                    let probe_name = question.entry_name();
2351
2352                    if let Some(probe) = dns_registry.probing.get_mut(probe_name) {
2353                        let now = current_time_millis();
2354
2355                        // Only do tiebreaking if probe already started.
2356                        // This check also helps avoid redo tiebreaking if start time
2357                        // was postponed.
2358                        if probe.start_time < now {
2359                            let incoming_records: Vec<_> = msg
2360                                .authorities()
2361                                .iter()
2362                                .filter(|r| r.get_name() == probe_name)
2363                                .collect();
2364
2365                            probe.tiebreaking(&incoming_records, now, probe_name);
2366                        }
2367                    }
2368                }
2369
2370                if qtype == RRType::A || qtype == RRType::AAAA || qtype == RRType::ANY {
2371                    for service in self.my_services.values() {
2372                        if service.get_status(intf) != ServiceStatus::Announced {
2373                            continue;
2374                        }
2375
2376                        let service_hostname =
2377                            match dns_registry.name_changes.get(service.get_hostname()) {
2378                                Some(new_name) => new_name,
2379                                None => service.get_hostname(),
2380                            };
2381
2382                        if service_hostname.to_lowercase() == question.entry_name().to_lowercase() {
2383                            let intf_addrs = service.get_addrs_on_intf(intf);
2384                            if intf_addrs.is_empty()
2385                                && (qtype == RRType::A || qtype == RRType::AAAA)
2386                            {
2387                                let t = match qtype {
2388                                    RRType::A => "TYPE_A",
2389                                    RRType::AAAA => "TYPE_AAAA",
2390                                    _ => "invalid_type",
2391                                };
2392                                trace!(
2393                                    "Cannot find valid addrs for {} response on intf {:?}",
2394                                    t,
2395                                    &intf
2396                                );
2397                                return;
2398                            }
2399                            for address in intf_addrs {
2400                                out.add_answer(
2401                                    &msg,
2402                                    DnsAddress::new(
2403                                        service_hostname,
2404                                        ip_address_rr_type(&address),
2405                                        CLASS_IN | CLASS_CACHE_FLUSH,
2406                                        service.get_host_ttl(),
2407                                        address,
2408                                        intf.into(),
2409                                    ),
2410                                );
2411                            }
2412                        }
2413                    }
2414                }
2415
2416                let query_name = question.entry_name().to_lowercase();
2417                let service_opt = self
2418                    .my_services
2419                    .iter()
2420                    .find(|(k, _v)| {
2421                        let service_name = match dns_registry.name_changes.get(k.as_str()) {
2422                            Some(new_name) => new_name,
2423                            None => k,
2424                        };
2425                        service_name == &query_name
2426                    })
2427                    .map(|(_, v)| v);
2428
2429                let Some(service) = service_opt else {
2430                    continue;
2431                };
2432
2433                if service.get_status(intf) != ServiceStatus::Announced {
2434                    continue;
2435                }
2436
2437                add_answer_of_service(&mut out, &msg, question.entry_name(), service, qtype, intf);
2438            }
2439        }
2440
2441        if !out.answers_count() > 0 {
2442            out.set_id(msg.id());
2443            send_dns_outgoing(&out, intf, sock);
2444
2445            self.increase_counter(Counter::Respond, 1);
2446            self.notify_monitors(DaemonEvent::Respond(intf.ip()));
2447        }
2448
2449        self.increase_counter(Counter::KnownAnswerSuppression, out.known_answer_count());
2450    }
2451
2452    /// Increases the value of `counter` by `count`.
2453    fn increase_counter(&mut self, counter: Counter, count: i64) {
2454        let key = counter.to_string();
2455        match self.counters.get_mut(&key) {
2456            Some(v) => *v += count,
2457            None => {
2458                self.counters.insert(key, count);
2459            }
2460        }
2461    }
2462
2463    /// Sets the value of `counter` to `count`.
2464    fn set_counter(&mut self, counter: Counter, count: i64) {
2465        let key = counter.to_string();
2466        self.counters.insert(key, count);
2467    }
2468
2469    fn signal_sock_drain(&self) {
2470        let mut signal_buf = [0; 1024];
2471
2472        // This recv is non-blocking as the socket is non-blocking.
2473        while let Ok(sz) = self.signal_sock.recv(&mut signal_buf) {
2474            trace!(
2475                "signal socket recvd: {}",
2476                String::from_utf8_lossy(&signal_buf[0..sz])
2477            );
2478        }
2479    }
2480
2481    fn add_retransmission(&mut self, next_time: u64, command: Command) {
2482        self.retransmissions.push(ReRun { next_time, command });
2483        self.add_timer(next_time);
2484    }
2485
2486    /// Sends service removal event to listeners for expired service records.
2487    fn notify_service_removal(&self, expired: HashMap<String, HashSet<String>>) {
2488        for (ty_domain, sender) in self.service_queriers.iter() {
2489            if let Some(instances) = expired.get(ty_domain) {
2490                for instance_name in instances {
2491                    let event = ServiceEvent::ServiceRemoved(
2492                        ty_domain.to_string(),
2493                        instance_name.to_string(),
2494                    );
2495                    match sender.send(event) {
2496                        Ok(()) => debug!("notify_service_removal: sent ServiceRemoved to listener of {ty_domain}: {instance_name}"),
2497                        Err(e) => debug!("Failed to send event: {}", e),
2498                    }
2499                }
2500            }
2501        }
2502    }
2503
2504    /// The entry point that executes all commands received by the daemon.
2505    ///
2506    /// `repeating`: whether this is a retransmission.
2507    fn exec_command(&mut self, command: Command, repeating: bool) {
2508        match command {
2509            Command::Browse(ty, next_delay, listener) => {
2510                self.exec_command_browse(repeating, ty, next_delay, listener);
2511            }
2512
2513            Command::ResolveHostname(hostname, next_delay, listener, timeout) => {
2514                self.exec_command_resolve_hostname(
2515                    repeating, hostname, next_delay, listener, timeout,
2516                );
2517            }
2518
2519            Command::Register(service_info) => {
2520                self.register_service(service_info);
2521                self.increase_counter(Counter::Register, 1);
2522            }
2523
2524            Command::RegisterResend(fullname, intf) => {
2525                trace!("register-resend service: {fullname} on {:?}", &intf.addr);
2526                self.exec_command_register_resend(fullname, intf);
2527            }
2528
2529            Command::Unregister(fullname, resp_s) => {
2530                trace!("unregister service {} repeat {}", &fullname, &repeating);
2531                self.exec_command_unregister(repeating, fullname, resp_s);
2532            }
2533
2534            Command::UnregisterResend(packet, ip) => {
2535                self.exec_command_unregister_resend(packet, ip);
2536            }
2537
2538            Command::StopBrowse(ty_domain) => self.exec_command_stop_browse(ty_domain),
2539
2540            Command::StopResolveHostname(hostname) => {
2541                self.exec_command_stop_resolve_hostname(hostname.to_lowercase())
2542            }
2543
2544            Command::Resolve(instance, try_count) => self.exec_command_resolve(instance, try_count),
2545
2546            Command::GetMetrics(resp_s) => self.exec_command_get_metrics(resp_s),
2547
2548            Command::GetStatus(resp_s) => match resp_s.send(self.status.clone()) {
2549                Ok(()) => trace!("Sent status to the client"),
2550                Err(e) => debug!("Failed to send status: {}", e),
2551            },
2552
2553            Command::Monitor(resp_s) => {
2554                self.monitors.push(resp_s);
2555            }
2556
2557            Command::SetOption(daemon_opt) => {
2558                self.process_set_option(daemon_opt);
2559            }
2560
2561            Command::GetOption(resp_s) => {
2562                let val = DaemonOptionVal {
2563                    _service_name_len_max: self.service_name_len_max,
2564                    ip_check_interval: self.ip_check_interval,
2565                };
2566                if let Err(e) = resp_s.send(val) {
2567                    debug!("Failed to send options: {}", e);
2568                }
2569            }
2570
2571            Command::Verify(instance_fullname, timeout) => {
2572                self.exec_command_verify(instance_fullname, timeout, repeating);
2573            }
2574
2575            _ => {
2576                debug!("unexpected command: {:?}", &command);
2577            }
2578        }
2579    }
2580
2581    fn exec_command_get_metrics(&mut self, resp_s: Sender<HashMap<String, i64>>) {
2582        self.set_counter(Counter::CachedPTR, self.cache.ptr_count() as i64);
2583        self.set_counter(Counter::CachedSRV, self.cache.srv_count() as i64);
2584        self.set_counter(Counter::CachedAddr, self.cache.addr_count() as i64);
2585        self.set_counter(Counter::CachedTxt, self.cache.txt_count() as i64);
2586        self.set_counter(Counter::CachedNSec, self.cache.nsec_count() as i64);
2587        self.set_counter(Counter::CachedSubtype, self.cache.subtype_count() as i64);
2588        self.set_counter(Counter::Timer, self.timers.len() as i64);
2589
2590        let dns_registry_probe_count: usize = self
2591            .dns_registry_map
2592            .values()
2593            .map(|r| r.probing.len())
2594            .sum();
2595        self.set_counter(Counter::DnsRegistryProbe, dns_registry_probe_count as i64);
2596
2597        let dns_registry_active_count: usize = self
2598            .dns_registry_map
2599            .values()
2600            .map(|r| r.active.values().map(|a| a.len()).sum::<usize>())
2601            .sum();
2602        self.set_counter(Counter::DnsRegistryActive, dns_registry_active_count as i64);
2603
2604        let dns_registry_timer_count: usize = self
2605            .dns_registry_map
2606            .values()
2607            .map(|r| r.new_timers.len())
2608            .sum();
2609        self.set_counter(Counter::DnsRegistryTimer, dns_registry_timer_count as i64);
2610
2611        let dns_registry_name_change_count: usize = self
2612            .dns_registry_map
2613            .values()
2614            .map(|r| r.name_changes.len())
2615            .sum();
2616        self.set_counter(
2617            Counter::DnsRegistryNameChange,
2618            dns_registry_name_change_count as i64,
2619        );
2620
2621        // Send the metrics to the client.
2622        if let Err(e) = resp_s.send(self.counters.clone()) {
2623            debug!("Failed to send metrics: {}", e);
2624        }
2625    }
2626
2627    fn exec_command_browse(
2628        &mut self,
2629        repeating: bool,
2630        ty: String,
2631        next_delay: u32,
2632        listener: Sender<ServiceEvent>,
2633    ) {
2634        let pretty_addrs: Vec<String> = self
2635            .intf_socks
2636            .keys()
2637            .map(|itf| format!("{} ({})", itf.ip(), itf.name))
2638            .collect();
2639
2640        if let Err(e) = listener.send(ServiceEvent::SearchStarted(format!(
2641            "{ty} on {} interfaces [{}]",
2642            pretty_addrs.len(),
2643            pretty_addrs.join(", ")
2644        ))) {
2645            debug!(
2646                "Failed to send SearchStarted({})(repeating:{}): {}",
2647                &ty, repeating, e
2648            );
2649            return;
2650        }
2651        if !repeating {
2652            // Binds a `listener` to querying mDNS domain type `ty`.
2653            //
2654            // If there is already a `listener`, it will be updated, i.e. overwritten.
2655            self.service_queriers.insert(ty.clone(), listener.clone());
2656
2657            // if we already have the records in our cache, just send them
2658            self.query_cache_for_service(&ty, &listener);
2659        }
2660
2661        self.send_query(&ty, RRType::PTR);
2662        self.increase_counter(Counter::Browse, 1);
2663
2664        let next_time = current_time_millis() + (next_delay * 1000) as u64;
2665        let max_delay = 60 * 60;
2666        let delay = cmp::min(next_delay * 2, max_delay);
2667        self.add_retransmission(next_time, Command::Browse(ty, delay, listener));
2668    }
2669
2670    fn exec_command_resolve_hostname(
2671        &mut self,
2672        repeating: bool,
2673        hostname: String,
2674        next_delay: u32,
2675        listener: Sender<HostnameResolutionEvent>,
2676        timeout: Option<u64>,
2677    ) {
2678        let addr_list: Vec<_> = self.intf_socks.keys().collect();
2679        if let Err(e) = listener.send(HostnameResolutionEvent::SearchStarted(format!(
2680            "{} on addrs {:?}",
2681            &hostname, &addr_list
2682        ))) {
2683            debug!(
2684                "Failed to send ResolveStarted({})(repeating:{}): {}",
2685                &hostname, repeating, e
2686            );
2687            return;
2688        }
2689        if !repeating {
2690            self.add_hostname_resolver(hostname.to_owned(), listener.clone(), timeout);
2691            // if we already have the records in our cache, just send them
2692            self.query_cache_for_hostname(&hostname, listener.clone());
2693        }
2694
2695        self.send_query_vec(&[(&hostname, RRType::A), (&hostname, RRType::AAAA)]);
2696        self.increase_counter(Counter::ResolveHostname, 1);
2697
2698        let now = current_time_millis();
2699        let next_time = now + u64::from(next_delay) * 1000;
2700        let max_delay = 60 * 60;
2701        let delay = cmp::min(next_delay * 2, max_delay);
2702
2703        // Only add retransmission if it does not exceed the hostname resolver timeout, if any.
2704        if self
2705            .hostname_resolvers
2706            .get(&hostname)
2707            .and_then(|(_sender, timeout)| *timeout)
2708            .map(|timeout| next_time < timeout)
2709            .unwrap_or(true)
2710        {
2711            self.add_retransmission(
2712                next_time,
2713                Command::ResolveHostname(hostname, delay, listener, None),
2714            );
2715        }
2716    }
2717
2718    fn exec_command_resolve(&mut self, instance: String, try_count: u16) {
2719        let pending_query = self.query_unresolved(&instance);
2720        let max_try = 3;
2721        if pending_query && try_count < max_try {
2722            // Note that if the current try already succeeds, the next retransmission
2723            // will be no-op as the cache has been updated.
2724            let next_time = current_time_millis() + RESOLVE_WAIT_IN_MILLIS;
2725            self.add_retransmission(next_time, Command::Resolve(instance, try_count + 1));
2726        }
2727    }
2728
2729    fn exec_command_unregister(
2730        &mut self,
2731        repeating: bool,
2732        fullname: String,
2733        resp_s: Sender<UnregisterStatus>,
2734    ) {
2735        let response = match self.my_services.remove_entry(&fullname) {
2736            None => {
2737                debug!("unregister: cannot find such service {}", &fullname);
2738                UnregisterStatus::NotFound
2739            }
2740            Some((_k, info)) => {
2741                let mut timers = Vec::new();
2742                // Send one unregister per interface and ip version
2743                let mut multicast_sent_trackers = HashSet::new();
2744
2745                for (intf, sock) in self.intf_socks.iter() {
2746                    if let Some(tracker) = multicast_send_tracker(intf) {
2747                        if multicast_sent_trackers.contains(&tracker) {
2748                            continue; // no need to send unregister the same interface with same ip version.
2749                        }
2750                        multicast_sent_trackers.insert(tracker);
2751                    }
2752                    let packet = self.unregister_service(&info, intf, sock);
2753                    // repeat for one time just in case some peers miss the message
2754                    if !repeating && !packet.is_empty() {
2755                        let next_time = current_time_millis() + 120;
2756                        self.retransmissions.push(ReRun {
2757                            next_time,
2758                            command: Command::UnregisterResend(packet, intf.clone()),
2759                        });
2760                        timers.push(next_time);
2761                    }
2762                }
2763
2764                for t in timers {
2765                    self.add_timer(t);
2766                }
2767
2768                self.increase_counter(Counter::Unregister, 1);
2769                UnregisterStatus::OK
2770            }
2771        };
2772        if let Err(e) = resp_s.send(response) {
2773            debug!("unregister: failed to send response: {}", e);
2774        }
2775    }
2776
2777    fn exec_command_unregister_resend(&mut self, packet: Vec<u8>, intf: Interface) {
2778        if let Some(sock) = self.intf_socks.get(&intf) {
2779            debug!("UnregisterResend from {}", &intf.ip());
2780            multicast_on_intf(&packet[..], &intf, sock);
2781            self.increase_counter(Counter::UnregisterResend, 1);
2782        }
2783    }
2784
2785    fn exec_command_stop_browse(&mut self, ty_domain: String) {
2786        match self.service_queriers.remove_entry(&ty_domain) {
2787            None => debug!("StopBrowse: cannot find querier for {}", &ty_domain),
2788            Some((ty, sender)) => {
2789                // Remove pending browse commands in the reruns.
2790                trace!("StopBrowse: removed queryer for {}", &ty);
2791                let mut i = 0;
2792                while i < self.retransmissions.len() {
2793                    if let Command::Browse(t, _, _) = &self.retransmissions[i].command {
2794                        if t == &ty {
2795                            self.retransmissions.remove(i);
2796                            trace!("StopBrowse: removed retransmission for {}", &ty);
2797                            continue;
2798                        }
2799                    }
2800                    i += 1;
2801                }
2802
2803                // Remove cache entries.
2804                self.cache.remove_service_type(&ty_domain);
2805
2806                // Notify the client.
2807                match sender.send(ServiceEvent::SearchStopped(ty_domain)) {
2808                    Ok(()) => trace!("Sent SearchStopped to the listener"),
2809                    Err(e) => debug!("Failed to send SearchStopped: {}", e),
2810                }
2811            }
2812        }
2813    }
2814
2815    fn exec_command_stop_resolve_hostname(&mut self, hostname: String) {
2816        if let Some((host, (sender, _timeout))) = self.hostname_resolvers.remove_entry(&hostname) {
2817            // Remove pending resolve commands in the reruns.
2818            trace!("StopResolve: removed queryer for {}", &host);
2819            let mut i = 0;
2820            while i < self.retransmissions.len() {
2821                if let Command::Resolve(t, _) = &self.retransmissions[i].command {
2822                    if t == &host {
2823                        self.retransmissions.remove(i);
2824                        trace!("StopResolve: removed retransmission for {}", &host);
2825                        continue;
2826                    }
2827                }
2828                i += 1;
2829            }
2830
2831            // Notify the client.
2832            match sender.send(HostnameResolutionEvent::SearchStopped(hostname)) {
2833                Ok(()) => trace!("Sent SearchStopped to the listener"),
2834                Err(e) => debug!("Failed to send SearchStopped: {}", e),
2835            }
2836        }
2837    }
2838
2839    fn exec_command_register_resend(&mut self, fullname: String, intf: Interface) {
2840        let Some(info) = self.my_services.get_mut(&fullname) else {
2841            trace!("announce: cannot find such service {}", &fullname);
2842            return;
2843        };
2844
2845        let Some(dns_registry) = self.dns_registry_map.get_mut(&intf) else {
2846            return;
2847        };
2848
2849        let Some(sock) = self.intf_socks.get(&intf) else {
2850            return;
2851        };
2852
2853        if announce_service_on_intf(dns_registry, info, &intf, sock) {
2854            let mut hostname = info.get_hostname();
2855            if let Some(new_name) = dns_registry.name_changes.get(hostname) {
2856                hostname = new_name;
2857            }
2858            let service_name = match dns_registry.name_changes.get(&fullname) {
2859                Some(new_name) => new_name.to_string(),
2860                None => fullname,
2861            };
2862
2863            debug!("resend: announce service {} on {}", service_name, intf.ip());
2864
2865            notify_monitors(
2866                &mut self.monitors,
2867                DaemonEvent::Announce(service_name, format!("{}:{}", hostname, &intf.ip())),
2868            );
2869            info.set_status(&intf, ServiceStatus::Announced);
2870        } else {
2871            debug!("register-resend should not fail");
2872        }
2873
2874        self.increase_counter(Counter::RegisterResend, 1);
2875    }
2876
2877    fn exec_command_verify(&mut self, instance: String, timeout: Duration, repeating: bool) {
2878        /*
2879        RFC 6762 section 10.4:
2880        ...
2881        When the cache receives this hint that it should reconfirm some
2882        record, it MUST issue two or more queries for the resource record in
2883        dispute.  If no response is received within ten seconds, then, even
2884        though its TTL may indicate that it is not yet due to expire, that
2885        record SHOULD be promptly flushed from the cache.
2886        */
2887        let now = current_time_millis();
2888        let expire_at = if repeating {
2889            None
2890        } else {
2891            Some(now + timeout.as_millis() as u64)
2892        };
2893
2894        // send query for the resource records.
2895        let record_vec = self.cache.service_verify_queries(&instance, expire_at);
2896
2897        if !record_vec.is_empty() {
2898            let query_vec: Vec<(&str, RRType)> = record_vec
2899                .iter()
2900                .map(|(record, rr_type)| (record.as_str(), *rr_type))
2901                .collect();
2902            self.send_query_vec(&query_vec);
2903
2904            if let Some(new_expire) = expire_at {
2905                self.add_timer(new_expire); // ensure a check for the new expire time.
2906
2907                // schedule a resend 1 second later
2908                self.add_retransmission(now + 1000, Command::Verify(instance, timeout));
2909            }
2910        }
2911    }
2912
2913    /// Refresh cached service records with active queriers
2914    fn refresh_active_services(&mut self) {
2915        let mut query_ptr_count = 0;
2916        let mut query_srv_count = 0;
2917        let mut new_timers = HashSet::new();
2918        let mut query_addr_count = 0;
2919
2920        for (ty_domain, _sender) in self.service_queriers.iter() {
2921            let refreshed_timers = self.cache.refresh_due_ptr(ty_domain);
2922            if !refreshed_timers.is_empty() {
2923                trace!("sending refresh query for PTR: {}", ty_domain);
2924                self.send_query(ty_domain, RRType::PTR);
2925                query_ptr_count += 1;
2926                new_timers.extend(refreshed_timers);
2927            }
2928
2929            let (instances, timers) = self.cache.refresh_due_srv_txt(ty_domain);
2930            for (instance, types) in instances {
2931                trace!("sending refresh query for: {}", &instance);
2932                let query_vec = types
2933                    .into_iter()
2934                    .map(|ty| (instance.as_str(), ty))
2935                    .collect::<Vec<_>>();
2936                self.send_query_vec(&query_vec);
2937                query_srv_count += 1;
2938            }
2939            new_timers.extend(timers);
2940            let (hostnames, timers) = self.cache.refresh_due_hosts(ty_domain);
2941            for hostname in hostnames.iter() {
2942                trace!("sending refresh queries for A and AAAA:  {}", hostname);
2943                self.send_query_vec(&[(hostname, RRType::A), (hostname, RRType::AAAA)]);
2944                query_addr_count += 2;
2945            }
2946            new_timers.extend(timers);
2947        }
2948
2949        for timer in new_timers {
2950            self.add_timer(timer);
2951        }
2952
2953        self.increase_counter(Counter::CacheRefreshPTR, query_ptr_count);
2954        self.increase_counter(Counter::CacheRefreshSrvTxt, query_srv_count);
2955        self.increase_counter(Counter::CacheRefreshAddr, query_addr_count);
2956    }
2957}
2958
2959/// Adds one or more answers of a service for incoming msg and RR entry name.
2960fn add_answer_of_service(
2961    out: &mut DnsOutgoing,
2962    msg: &DnsIncoming,
2963    entry_name: &str,
2964    service: &ServiceInfo,
2965    qtype: RRType,
2966    intf: &Interface,
2967) {
2968    if qtype == RRType::SRV || qtype == RRType::ANY {
2969        out.add_answer(
2970            msg,
2971            DnsSrv::new(
2972                entry_name,
2973                CLASS_IN | CLASS_CACHE_FLUSH,
2974                service.get_host_ttl(),
2975                service.get_priority(),
2976                service.get_weight(),
2977                service.get_port(),
2978                service.get_hostname().to_string(),
2979            ),
2980        );
2981    }
2982
2983    if qtype == RRType::TXT || qtype == RRType::ANY {
2984        out.add_answer(
2985            msg,
2986            DnsTxt::new(
2987                entry_name,
2988                CLASS_IN | CLASS_CACHE_FLUSH,
2989                service.get_other_ttl(),
2990                service.generate_txt(),
2991            ),
2992        );
2993    }
2994
2995    if qtype == RRType::SRV {
2996        let intf_addrs = service.get_addrs_on_intf(intf);
2997        if intf_addrs.is_empty() {
2998            debug!(
2999                "Cannot find valid addrs for TYPE_SRV response on intf {:?}",
3000                &intf
3001            );
3002            return;
3003        }
3004        for address in intf_addrs {
3005            out.add_additional_answer(DnsAddress::new(
3006                service.get_hostname(),
3007                ip_address_rr_type(&address),
3008                CLASS_IN | CLASS_CACHE_FLUSH,
3009                service.get_host_ttl(),
3010                address,
3011                intf.into(),
3012            ));
3013        }
3014    }
3015}
3016
3017/// All possible events sent to the client from the daemon
3018/// regarding service discovery.
3019#[derive(Debug)]
3020pub enum ServiceEvent {
3021    /// Started searching for a service type.
3022    SearchStarted(String),
3023
3024    /// Found a specific (service_type, fullname).
3025    ServiceFound(String, String),
3026
3027    /// Resolved a service instance with detailed info.
3028    ServiceResolved(ServiceInfo),
3029
3030    /// A service instance (service_type, fullname) was removed.
3031    ServiceRemoved(String, String),
3032
3033    /// Stopped searching for a service type.
3034    SearchStopped(String),
3035}
3036
3037/// All possible events sent to the client from the daemon
3038/// regarding host resolution.
3039#[derive(Debug)]
3040#[non_exhaustive]
3041pub enum HostnameResolutionEvent {
3042    /// Started searching for the ip address of a hostname.
3043    SearchStarted(String),
3044    /// One or more addresses for a hostname has been found.
3045    AddressesFound(String, HashSet<IpAddr>),
3046    /// One or more addresses for a hostname has been removed.
3047    AddressesRemoved(String, HashSet<IpAddr>),
3048    /// The search for the ip address of a hostname has timed out.
3049    SearchTimeout(String),
3050    /// Stopped searching for the ip address of a hostname.
3051    SearchStopped(String),
3052}
3053
3054/// Some notable events from the daemon besides [`ServiceEvent`].
3055/// These events are expected to happen infrequently.
3056#[derive(Clone, Debug)]
3057#[non_exhaustive]
3058pub enum DaemonEvent {
3059    /// Daemon unsolicitly announced a service from an interface.
3060    Announce(String, String),
3061
3062    /// Daemon encountered an error.
3063    Error(Error),
3064
3065    /// Daemon detected a new IP address from the host.
3066    IpAdd(IpAddr),
3067
3068    /// Daemon detected a IP address removed from the host.
3069    IpDel(IpAddr),
3070
3071    /// Daemon resolved a name conflict by changing one of its names.
3072    /// see [DnsNameChange] for more details.
3073    NameChange(DnsNameChange),
3074
3075    /// Send out a multicast response via an IP address.
3076    Respond(IpAddr),
3077}
3078
3079/// Represents a name change due to a name conflict resolution.
3080/// See [RFC 6762 section 9](https://datatracker.ietf.org/doc/html/rfc6762#section-9)
3081#[derive(Clone, Debug)]
3082pub struct DnsNameChange {
3083    /// The original name set in `ServiceInfo` by the user.
3084    pub original: String,
3085
3086    /// A new name is created by appending a suffix after the original name.
3087    ///
3088    /// - for a service instance name, the suffix is `(N)`, where N starts at 2.
3089    /// - for a host name, the suffix is `-N`, where N starts at 2.
3090    ///
3091    /// For example:
3092    ///
3093    /// - Service name `foo._service-type._udp` becomes `foo (2)._service-type._udp`
3094    /// - Host name `foo.local.` becomes `foo-2.local.`
3095    pub new_name: String,
3096
3097    /// The resource record type
3098    pub rr_type: RRType,
3099
3100    /// The interface where the name conflict and its change happened.
3101    pub intf_name: String,
3102}
3103
3104/// Commands supported by the daemon
3105#[derive(Debug)]
3106enum Command {
3107    /// Browsing for a service type (ty_domain, next_time_delay_in_seconds, channel::sender)
3108    Browse(String, u32, Sender<ServiceEvent>),
3109
3110    /// Resolve a hostname to IP addresses.
3111    ResolveHostname(String, u32, Sender<HostnameResolutionEvent>, Option<u64>), // (hostname, next_time_delay_in_seconds, sender, timeout_in_milliseconds)
3112
3113    /// Register a service
3114    Register(ServiceInfo),
3115
3116    /// Unregister a service
3117    Unregister(String, Sender<UnregisterStatus>), // (fullname)
3118
3119    /// Announce again a service to local network
3120    RegisterResend(String, Interface), // (fullname)
3121
3122    /// Resend unregister packet.
3123    UnregisterResend(Vec<u8>, Interface), // (packet content)
3124
3125    /// Stop browsing a service type
3126    StopBrowse(String), // (ty_domain)
3127
3128    /// Stop resolving a hostname
3129    StopResolveHostname(String), // (hostname)
3130
3131    /// Send query to resolve a service instance.
3132    /// This is used when a PTR record exists but SRV & TXT records are missing.
3133    Resolve(String, u16), // (service_instance_fullname, try_count)
3134
3135    /// Read the current values of the counters
3136    GetMetrics(Sender<Metrics>),
3137
3138    /// Get the current status of the daemon.
3139    GetStatus(Sender<DaemonStatus>),
3140
3141    /// Monitor noticeable events in the daemon.
3142    Monitor(Sender<DaemonEvent>),
3143
3144    SetOption(DaemonOption),
3145
3146    GetOption(Sender<DaemonOptionVal>),
3147
3148    /// Proactively confirm a DNS resource record.
3149    ///
3150    /// The intention is to check if a service name or IP address still valid
3151    /// before its TTL expires.
3152    Verify(String, Duration),
3153
3154    Exit(Sender<DaemonStatus>),
3155}
3156
3157impl fmt::Display for Command {
3158    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
3159        match self {
3160            Self::Browse(_, _, _) => write!(f, "Command Browse"),
3161            Self::ResolveHostname(_, _, _, _) => write!(f, "Command ResolveHostname"),
3162            Self::Exit(_) => write!(f, "Command Exit"),
3163            Self::GetStatus(_) => write!(f, "Command GetStatus"),
3164            Self::GetMetrics(_) => write!(f, "Command GetMetrics"),
3165            Self::Monitor(_) => write!(f, "Command Monitor"),
3166            Self::Register(_) => write!(f, "Command Register"),
3167            Self::RegisterResend(_, _) => write!(f, "Command RegisterResend"),
3168            Self::SetOption(_) => write!(f, "Command SetOption"),
3169            Self::GetOption(_) => write!(f, "Command GetOption"),
3170            Self::StopBrowse(_) => write!(f, "Command StopBrowse"),
3171            Self::StopResolveHostname(_) => write!(f, "Command StopResolveHostname"),
3172            Self::Unregister(_, _) => write!(f, "Command Unregister"),
3173            Self::UnregisterResend(_, _) => write!(f, "Command UnregisterResend"),
3174            Self::Resolve(_, _) => write!(f, "Command Resolve"),
3175            Self::Verify(_, _) => write!(f, "Command VerifyResource"),
3176        }
3177    }
3178}
3179
3180struct DaemonOptionVal {
3181    _service_name_len_max: u8,
3182    ip_check_interval: u64,
3183}
3184
3185#[derive(Debug)]
3186enum DaemonOption {
3187    ServiceNameLenMax(u8),
3188    IpCheckInterval(u64),
3189    EnableInterface(Vec<IfKind>),
3190    DisableInterface(Vec<IfKind>),
3191    MulticastLoopV4(bool),
3192    MulticastLoopV6(bool),
3193}
3194
3195/// The length of Service Domain name supported in this lib.
3196const DOMAIN_LEN: usize = "._tcp.local.".len();
3197
3198/// Validate the length of "service_name" in a "_<service_name>.<domain_name>." string.
3199fn check_service_name_length(ty_domain: &str, limit: u8) -> Result<()> {
3200    if ty_domain.len() <= DOMAIN_LEN + 1 {
3201        // service name cannot be empty or only '_'.
3202        return Err(e_fmt!("Service type name cannot be empty: {}", ty_domain));
3203    }
3204
3205    let service_name_len = ty_domain.len() - DOMAIN_LEN - 1; // exclude the leading `_`
3206    if service_name_len > limit as usize {
3207        return Err(e_fmt!("Service name length must be <= {} bytes", limit));
3208    }
3209    Ok(())
3210}
3211
3212/// Checks if `name` ends with a valid domain: '._tcp.local.' or '._udp.local.'
3213fn check_domain_suffix(name: &str) -> Result<()> {
3214    if !(name.ends_with("._tcp.local.") || name.ends_with("._udp.local.")) {
3215        return Err(e_fmt!(
3216            "mDNS service {} must end with '._tcp.local.' or '._udp.local.'",
3217            name
3218        ));
3219    }
3220
3221    Ok(())
3222}
3223
3224/// Validate the service name in a fully qualified name.
3225///
3226/// A Full Name = <Instance>.<Service>.<Domain>
3227/// The only `<Domain>` supported are "._tcp.local." and "._udp.local.".
3228///
3229/// Note: this function does not check for the length of the service name.
3230/// Instead, `register_service` method will check the length.
3231fn check_service_name(fullname: &str) -> Result<()> {
3232    check_domain_suffix(fullname)?;
3233
3234    let remaining: Vec<&str> = fullname[..fullname.len() - DOMAIN_LEN].split('.').collect();
3235    let name = remaining.last().ok_or_else(|| e_fmt!("No service name"))?;
3236
3237    if &name[0..1] != "_" {
3238        return Err(e_fmt!("Service name must start with '_'"));
3239    }
3240
3241    let name = &name[1..];
3242
3243    if name.contains("--") {
3244        return Err(e_fmt!("Service name must not contain '--'"));
3245    }
3246
3247    if name.starts_with('-') || name.ends_with('-') {
3248        return Err(e_fmt!("Service name (%s) may not start or end with '-'"));
3249    }
3250
3251    let ascii_count = name.chars().filter(|c| c.is_ascii_alphabetic()).count();
3252    if ascii_count < 1 {
3253        return Err(e_fmt!(
3254            "Service name must contain at least one letter (eg: 'A-Za-z')"
3255        ));
3256    }
3257
3258    Ok(())
3259}
3260
3261/// Validate a hostname.
3262fn check_hostname(hostname: &str) -> Result<()> {
3263    if !hostname.ends_with(".local.") {
3264        return Err(e_fmt!("Hostname must end with '.local.': {hostname}"));
3265    }
3266
3267    if hostname == ".local." {
3268        return Err(e_fmt!(
3269            "The part of the hostname before '.local.' cannot be empty"
3270        ));
3271    }
3272
3273    if hostname.len() > 255 {
3274        return Err(e_fmt!("Hostname length must be <= 255 bytes"));
3275    }
3276
3277    Ok(())
3278}
3279
3280fn call_service_listener(
3281    listeners_map: &HashMap<String, Sender<ServiceEvent>>,
3282    ty_domain: &str,
3283    event: ServiceEvent,
3284) {
3285    if let Some(listener) = listeners_map.get(ty_domain) {
3286        match listener.send(event) {
3287            Ok(()) => trace!("Sent event to listener successfully"),
3288            Err(e) => debug!("Failed to send event: {}", e),
3289        }
3290    }
3291}
3292
3293fn call_hostname_resolution_listener(
3294    listeners_map: &HashMap<String, (Sender<HostnameResolutionEvent>, Option<u64>)>,
3295    hostname: &str,
3296    event: HostnameResolutionEvent,
3297) {
3298    let hostname_lower = hostname.to_lowercase();
3299    if let Some(listener) = listeners_map.get(&hostname_lower).map(|(l, _)| l) {
3300        match listener.send(event) {
3301            Ok(()) => trace!("Sent event to listener successfully"),
3302            Err(e) => debug!("Failed to send event: {}", e),
3303        }
3304    }
3305}
3306
3307/// Returns valid network interfaces in the host system.
3308/// Loopback interfaces are excluded.
3309fn my_ip_interfaces(with_loopback: bool) -> Vec<Interface> {
3310    if_addrs::get_if_addrs()
3311        .unwrap_or_default()
3312        .into_iter()
3313        .filter(|i| !i.is_loopback() || with_loopback)
3314        .collect()
3315}
3316
3317/// Send an outgoing mDNS query or response, and returns the packet bytes.
3318fn send_dns_outgoing(out: &DnsOutgoing, intf: &Interface, sock: &MioUdpSocket) -> Vec<Vec<u8>> {
3319    let qtype = if out.is_query() { "query" } else { "response" };
3320    trace!(
3321        "send outgoing {}: {} questions {} answers {} authorities {} additional",
3322        qtype,
3323        out.questions().len(),
3324        out.answers_count(),
3325        out.authorities().len(),
3326        out.additionals().len()
3327    );
3328    let packet_list = out.to_data_on_wire();
3329    for packet in packet_list.iter() {
3330        multicast_on_intf(packet, intf, sock);
3331    }
3332    packet_list
3333}
3334
3335/// Sends a multicast packet, and returns the packet bytes.
3336fn multicast_on_intf(packet: &[u8], intf: &Interface, socket: &MioUdpSocket) {
3337    if packet.len() > MAX_MSG_ABSOLUTE {
3338        debug!("Drop over-sized packet ({})", packet.len());
3339        return;
3340    }
3341
3342    let addr: SocketAddr = match intf.addr {
3343        if_addrs::IfAddr::V4(_) => SocketAddrV4::new(GROUP_ADDR_V4, MDNS_PORT).into(),
3344        if_addrs::IfAddr::V6(_) => {
3345            let mut sock = SocketAddrV6::new(GROUP_ADDR_V6, MDNS_PORT, 0, 0);
3346            sock.set_scope_id(intf.index.unwrap_or(0)); // Choose iface for multicast
3347            sock.into()
3348        }
3349    };
3350
3351    send_packet(packet, addr, intf, socket);
3352}
3353
3354/// Sends out `packet` to `addr` on the socket in `intf_sock`.
3355fn send_packet(packet: &[u8], addr: SocketAddr, intf: &Interface, sock: &MioUdpSocket) {
3356    match sock.send_to(packet, addr) {
3357        Ok(sz) => trace!("sent out {} bytes on interface {:?}", sz, intf),
3358        Err(e) => debug!("Failed to send to {} via {:?}: {}", addr, &intf, e),
3359    }
3360}
3361
3362/// Returns true if `name` is a valid instance name of format:
3363/// <instance>.<service_type>.<_udp|_tcp>.local.
3364/// Note: <instance> could contain '.' as well.
3365fn valid_instance_name(name: &str) -> bool {
3366    name.split('.').count() >= 5
3367}
3368
3369fn notify_monitors(monitors: &mut Vec<Sender<DaemonEvent>>, event: DaemonEvent) {
3370    monitors.retain(|sender| {
3371        if let Err(e) = sender.try_send(event.clone()) {
3372            debug!("notify_monitors: try_send: {}", &e);
3373            if matches!(e, TrySendError::Disconnected(_)) {
3374                return false; // This monitor is dropped.
3375            }
3376        }
3377        true
3378    });
3379}
3380
3381/// Check if all unique records passed "probing", and if yes, create a packet
3382/// to announce the service.
3383fn prepare_announce(
3384    info: &ServiceInfo,
3385    intf: &Interface,
3386    dns_registry: &mut DnsRegistry,
3387) -> Option<DnsOutgoing> {
3388    let intf_addrs = info.get_addrs_on_intf(intf);
3389    if intf_addrs.is_empty() {
3390        trace!("No valid addrs to add on intf {:?}", &intf);
3391        return None;
3392    }
3393
3394    // check if we changed our name due to conflicts.
3395    let service_fullname = match dns_registry.name_changes.get(info.get_fullname()) {
3396        Some(new_name) => new_name,
3397        None => info.get_fullname(),
3398    };
3399
3400    debug!(
3401        "prepare to announce service {service_fullname} on {}: {}",
3402        &intf.name,
3403        &intf.ip()
3404    );
3405
3406    let mut probing_count = 0;
3407    let mut out = DnsOutgoing::new(FLAGS_QR_RESPONSE | FLAGS_AA);
3408    let create_time = current_time_millis() + fastrand::u64(0..250);
3409
3410    out.add_answer_at_time(
3411        DnsPointer::new(
3412            info.get_type(),
3413            RRType::PTR,
3414            CLASS_IN,
3415            info.get_other_ttl(),
3416            service_fullname.to_string(),
3417        ),
3418        0,
3419    );
3420
3421    if let Some(sub) = info.get_subtype() {
3422        trace!("Adding subdomain {}", sub);
3423        out.add_answer_at_time(
3424            DnsPointer::new(
3425                sub,
3426                RRType::PTR,
3427                CLASS_IN,
3428                info.get_other_ttl(),
3429                service_fullname.to_string(),
3430            ),
3431            0,
3432        );
3433    }
3434
3435    // SRV records.
3436    let hostname = match dns_registry.name_changes.get(info.get_hostname()) {
3437        Some(new_name) => new_name.to_string(),
3438        None => info.get_hostname().to_string(),
3439    };
3440
3441    let mut srv = DnsSrv::new(
3442        info.get_fullname(),
3443        CLASS_IN | CLASS_CACHE_FLUSH,
3444        info.get_host_ttl(),
3445        info.get_priority(),
3446        info.get_weight(),
3447        info.get_port(),
3448        hostname,
3449    );
3450
3451    if let Some(new_name) = dns_registry.name_changes.get(info.get_fullname()) {
3452        srv.get_record_mut().set_new_name(new_name.to_string());
3453    }
3454
3455    if !info.requires_probe()
3456        || dns_registry.is_probing_done(&srv, info.get_fullname(), create_time)
3457    {
3458        out.add_answer_at_time(srv, 0);
3459    } else {
3460        probing_count += 1;
3461    }
3462
3463    // TXT records.
3464
3465    let mut txt = DnsTxt::new(
3466        info.get_fullname(),
3467        CLASS_IN | CLASS_CACHE_FLUSH,
3468        info.get_other_ttl(),
3469        info.generate_txt(),
3470    );
3471
3472    if let Some(new_name) = dns_registry.name_changes.get(info.get_fullname()) {
3473        txt.get_record_mut().set_new_name(new_name.to_string());
3474    }
3475
3476    if !info.requires_probe()
3477        || dns_registry.is_probing_done(&txt, info.get_fullname(), create_time)
3478    {
3479        out.add_answer_at_time(txt, 0);
3480    } else {
3481        probing_count += 1;
3482    }
3483
3484    // Address records. (A and AAAA)
3485
3486    let hostname = info.get_hostname();
3487    for address in intf_addrs {
3488        let mut dns_addr = DnsAddress::new(
3489            hostname,
3490            ip_address_rr_type(&address),
3491            CLASS_IN | CLASS_CACHE_FLUSH,
3492            info.get_host_ttl(),
3493            address,
3494            intf.into(),
3495        );
3496
3497        if let Some(new_name) = dns_registry.name_changes.get(hostname) {
3498            dns_addr.get_record_mut().set_new_name(new_name.to_string());
3499        }
3500
3501        if !info.requires_probe()
3502            || dns_registry.is_probing_done(&dns_addr, info.get_fullname(), create_time)
3503        {
3504            out.add_answer_at_time(dns_addr, 0);
3505        } else {
3506            probing_count += 1;
3507        }
3508    }
3509
3510    if probing_count > 0 {
3511        return None;
3512    }
3513
3514    Some(out)
3515}
3516
3517/// Send an unsolicited response for owned service via `intf` and `sock`.
3518/// Returns true if sent out successfully.
3519fn announce_service_on_intf(
3520    dns_registry: &mut DnsRegistry,
3521    info: &ServiceInfo,
3522    intf: &Interface,
3523    sock: &MioUdpSocket,
3524) -> bool {
3525    if let Some(out) = prepare_announce(info, intf, dns_registry) {
3526        send_dns_outgoing(&out, intf, sock);
3527        return true;
3528    }
3529    false
3530}
3531
3532/// Returns a new name based on the `original` to avoid conflicts.
3533/// If the name already contains a number in parentheses, increments that number.
3534///
3535/// Examples:
3536/// - `foo.local.` becomes `foo (2).local.`
3537/// - `foo (2).local.` becomes `foo (3).local.`
3538/// - `foo (9)` becomes `foo (10)`
3539fn name_change(original: &str) -> String {
3540    let mut parts: Vec<_> = original.split('.').collect();
3541    let Some(first_part) = parts.get_mut(0) else {
3542        return format!("{original} (2)");
3543    };
3544
3545    let mut new_name = format!("{} (2)", first_part);
3546
3547    // check if there is already has `(<num>)` suffix.
3548    if let Some(paren_pos) = first_part.rfind(" (") {
3549        // Check if there's a closing parenthesis
3550        if let Some(end_paren) = first_part[paren_pos..].find(')') {
3551            let absolute_end_pos = paren_pos + end_paren;
3552            // Only process if the closing parenthesis is the last character
3553            if absolute_end_pos == first_part.len() - 1 {
3554                let num_start = paren_pos + 2; // Skip " ("
3555                                               // Try to parse the number between parentheses
3556                if let Ok(number) = first_part[num_start..absolute_end_pos].parse::<u32>() {
3557                    let base_name = &first_part[..paren_pos];
3558                    new_name = format!("{} ({})", base_name, number + 1)
3559                }
3560            }
3561        }
3562    }
3563
3564    *first_part = &new_name;
3565    parts.join(".")
3566}
3567
3568/// Returns a new name based on the `original` to avoid conflicts.
3569/// If the name already contains a hyphenated number, increments that number.
3570///
3571/// Examples:
3572/// - `foo.local.` becomes `foo-2.local.`
3573/// - `foo-2.local.` becomes `foo-3.local.`
3574/// - `foo` becomes `foo-2`
3575fn hostname_change(original: &str) -> String {
3576    let mut parts: Vec<_> = original.split('.').collect();
3577    let Some(first_part) = parts.get_mut(0) else {
3578        return format!("{original}-2");
3579    };
3580
3581    let mut new_name = format!("{}-2", first_part);
3582
3583    // check if there is already a `-<num>` suffix
3584    if let Some(hyphen_pos) = first_part.rfind('-') {
3585        // Try to parse everything after the hyphen as a number
3586        if let Ok(number) = first_part[hyphen_pos + 1..].parse::<u32>() {
3587            let base_name = &first_part[..hyphen_pos];
3588            new_name = format!("{}-{}", base_name, number + 1);
3589        }
3590    }
3591
3592    *first_part = &new_name;
3593    parts.join(".")
3594}
3595
3596fn add_answer_with_additionals(
3597    out: &mut DnsOutgoing,
3598    msg: &DnsIncoming,
3599    service: &ServiceInfo,
3600    intf: &Interface,
3601    dns_registry: &DnsRegistry,
3602) {
3603    let intf_addrs = service.get_addrs_on_intf(intf);
3604    if intf_addrs.is_empty() {
3605        trace!("No addrs on LAN of intf {:?}", intf);
3606        return;
3607    }
3608
3609    // check if we changed our name due to conflicts.
3610    let service_fullname = match dns_registry.name_changes.get(service.get_fullname()) {
3611        Some(new_name) => new_name,
3612        None => service.get_fullname(),
3613    };
3614
3615    let hostname = match dns_registry.name_changes.get(service.get_hostname()) {
3616        Some(new_name) => new_name,
3617        None => service.get_hostname(),
3618    };
3619
3620    let ptr_added = out.add_answer(
3621        msg,
3622        DnsPointer::new(
3623            service.get_type(),
3624            RRType::PTR,
3625            CLASS_IN,
3626            service.get_other_ttl(),
3627            service_fullname.to_string(),
3628        ),
3629    );
3630
3631    if !ptr_added {
3632        trace!("answer was not added for msg {:?}", msg);
3633        return;
3634    }
3635
3636    if let Some(sub) = service.get_subtype() {
3637        trace!("Adding subdomain {}", sub);
3638        out.add_additional_answer(DnsPointer::new(
3639            sub,
3640            RRType::PTR,
3641            CLASS_IN,
3642            service.get_other_ttl(),
3643            service_fullname.to_string(),
3644        ));
3645    }
3646
3647    // Add recommended additional answers according to
3648    // https://tools.ietf.org/html/rfc6763#section-12.1.
3649    out.add_additional_answer(DnsSrv::new(
3650        service_fullname,
3651        CLASS_IN | CLASS_CACHE_FLUSH,
3652        service.get_host_ttl(),
3653        service.get_priority(),
3654        service.get_weight(),
3655        service.get_port(),
3656        hostname.to_string(),
3657    ));
3658
3659    out.add_additional_answer(DnsTxt::new(
3660        service_fullname,
3661        CLASS_IN | CLASS_CACHE_FLUSH,
3662        service.get_other_ttl(),
3663        service.generate_txt(),
3664    ));
3665
3666    for address in intf_addrs {
3667        out.add_additional_answer(DnsAddress::new(
3668            hostname,
3669            ip_address_rr_type(&address),
3670            CLASS_IN | CLASS_CACHE_FLUSH,
3671            service.get_host_ttl(),
3672            address,
3673            intf.into(),
3674        ));
3675    }
3676}
3677
3678#[cfg(test)]
3679mod tests {
3680    use super::{
3681        check_domain_suffix, check_service_name_length, hostname_change, my_ip_interfaces,
3682        name_change, new_socket_bind, send_dns_outgoing, valid_instance_name,
3683        HostnameResolutionEvent, ServiceDaemon, ServiceEvent, ServiceInfo, GROUP_ADDR_V4,
3684        MDNS_PORT,
3685    };
3686    use crate::{
3687        dns_parser::{
3688            DnsIncoming, DnsOutgoing, DnsPointer, InterfaceId, RRType, CLASS_IN, FLAGS_AA,
3689            FLAGS_QR_RESPONSE,
3690        },
3691        service_daemon::{add_answer_of_service, check_hostname},
3692    };
3693    use std::{
3694        net::{SocketAddr, SocketAddrV4},
3695        time::Duration,
3696    };
3697    use test_log::test;
3698
3699    #[test]
3700    fn test_socketaddr_print() {
3701        let addr: SocketAddr = SocketAddrV4::new(GROUP_ADDR_V4, MDNS_PORT).into();
3702        let print = format!("{}", addr);
3703        assert_eq!(print, "224.0.0.251:5353");
3704    }
3705
3706    #[test]
3707    fn test_instance_name() {
3708        assert!(valid_instance_name("my-laser._printer._tcp.local."));
3709        assert!(valid_instance_name("my-laser.._printer._tcp.local."));
3710        assert!(!valid_instance_name("_printer._tcp.local."));
3711    }
3712
3713    #[test]
3714    fn test_check_service_name_length() {
3715        let result = check_service_name_length("_tcp", 100);
3716        assert!(result.is_err());
3717        if let Err(e) = result {
3718            println!("{}", e);
3719        }
3720    }
3721
3722    #[test]
3723    fn test_check_hostname() {
3724        // valid hostnames
3725        for hostname in &[
3726            "my_host.local.",
3727            &("A".repeat(255 - ".local.".len()) + ".local."),
3728        ] {
3729            let result = check_hostname(hostname);
3730            assert!(result.is_ok());
3731        }
3732
3733        // erroneous hostnames
3734        for hostname in &[
3735            "my_host.local",
3736            ".local.",
3737            &("A".repeat(256 - ".local.".len()) + ".local."),
3738        ] {
3739            let result = check_hostname(hostname);
3740            assert!(result.is_err());
3741            if let Err(e) = result {
3742                println!("{}", e);
3743            }
3744        }
3745    }
3746
3747    #[test]
3748    fn test_check_domain_suffix() {
3749        assert!(check_domain_suffix("_missing_dot._tcp.local").is_err());
3750        assert!(check_domain_suffix("_missing_bar.tcp.local.").is_err());
3751        assert!(check_domain_suffix("_mis_spell._tpp.local.").is_err());
3752        assert!(check_domain_suffix("_mis_spell._upp.local.").is_err());
3753        assert!(check_domain_suffix("_has_dot._tcp.local.").is_ok());
3754        assert!(check_domain_suffix("_goodname._udp.local.").is_ok());
3755    }
3756
3757    #[test]
3758    fn test_service_with_temporarily_invalidated_ptr() {
3759        // Create a daemon
3760        let d = ServiceDaemon::new().expect("Failed to create daemon");
3761
3762        let service = "_test_inval_ptr._udp.local.";
3763        let host_name = "my_host_tmp_invalidated_ptr.local.";
3764        let intfs: Vec<_> = my_ip_interfaces(false);
3765        let intf_ips: Vec<_> = intfs.iter().map(|intf| intf.ip()).collect();
3766        let port = 5201;
3767        let my_service =
3768            ServiceInfo::new(service, "my_instance", host_name, &intf_ips[..], port, None)
3769                .expect("invalid service info")
3770                .enable_addr_auto();
3771        let result = d.register(my_service.clone());
3772        assert!(result.is_ok());
3773
3774        // Browse for a service
3775        let browse_chan = d.browse(service).unwrap();
3776        let timeout = Duration::from_secs(2);
3777        let mut resolved = false;
3778
3779        while let Ok(event) = browse_chan.recv_timeout(timeout) {
3780            match event {
3781                ServiceEvent::ServiceResolved(info) => {
3782                    resolved = true;
3783                    println!("Resolved a service of {}", &info.get_fullname());
3784                    break;
3785                }
3786                e => {
3787                    println!("Received event {:?}", e);
3788                }
3789            }
3790        }
3791
3792        assert!(resolved);
3793
3794        println!("Stopping browse of {}", service);
3795        // Pause browsing so restarting will cause a new immediate query.
3796        // Unregistering will not work here, it will invalidate all the records.
3797        d.stop_browse(service).unwrap();
3798
3799        // Ensure the search is stopped.
3800        // Reduces the chance of receiving an answer adding the ptr back to the
3801        // cache causing the later browse to return directly from the cache.
3802        // (which invalidates what this test is trying to test for.)
3803        let mut stopped = false;
3804        while let Ok(event) = browse_chan.recv_timeout(timeout) {
3805            match event {
3806                ServiceEvent::SearchStopped(_) => {
3807                    stopped = true;
3808                    println!("Stopped browsing service");
3809                    break;
3810                }
3811                // Other `ServiceResolved` messages may be received
3812                // here as they come from different interfaces.
3813                // That's fine for this test.
3814                e => {
3815                    println!("Received event {:?}", e);
3816                }
3817            }
3818        }
3819
3820        assert!(stopped);
3821
3822        // Invalidate the ptr from the service to the host.
3823        let invalidate_ptr_packet = DnsPointer::new(
3824            my_service.get_type(),
3825            RRType::PTR,
3826            CLASS_IN,
3827            0,
3828            my_service.get_fullname().to_string(),
3829        );
3830
3831        let mut packet_buffer = DnsOutgoing::new(FLAGS_QR_RESPONSE | FLAGS_AA);
3832        packet_buffer.add_additional_answer(invalidate_ptr_packet);
3833
3834        for intf in intfs {
3835            let sock = new_socket_bind(&intf, true).unwrap();
3836            send_dns_outgoing(&packet_buffer, &intf, &sock);
3837        }
3838
3839        println!(
3840            "Sent PTR record invalidation. Starting second browse for {}",
3841            service
3842        );
3843
3844        // Restart the browse to force the sender to re-send the announcements.
3845        let browse_chan = d.browse(service).unwrap();
3846
3847        resolved = false;
3848        while let Ok(event) = browse_chan.recv_timeout(timeout) {
3849            match event {
3850                ServiceEvent::ServiceResolved(info) => {
3851                    resolved = true;
3852                    println!("Resolved a service of {}", &info.get_fullname());
3853                    break;
3854                }
3855                e => {
3856                    println!("Received event {:?}", e);
3857                }
3858            }
3859        }
3860
3861        assert!(resolved);
3862        d.shutdown().unwrap();
3863    }
3864
3865    #[test]
3866    fn test_expired_srv() {
3867        // construct service info
3868        let service_type = "_expired-srv._udp.local.";
3869        let instance = "test_instance";
3870        let host_name = "expired_srv_host.local.";
3871        let mut my_service = ServiceInfo::new(service_type, instance, host_name, "", 5023, None)
3872            .unwrap()
3873            .enable_addr_auto();
3874        // let fullname = my_service.get_fullname().to_string();
3875
3876        // set SRV to expire soon.
3877        let new_ttl = 3; // for testing only.
3878        my_service._set_host_ttl(new_ttl);
3879
3880        // register my service
3881        let mdns_server = ServiceDaemon::new().expect("Failed to create mdns server");
3882        let result = mdns_server.register(my_service);
3883        assert!(result.is_ok());
3884
3885        let mdns_client = ServiceDaemon::new().expect("Failed to create mdns client");
3886        let browse_chan = mdns_client.browse(service_type).unwrap();
3887        let timeout = Duration::from_secs(2);
3888        let mut resolved = false;
3889
3890        while let Ok(event) = browse_chan.recv_timeout(timeout) {
3891            match event {
3892                ServiceEvent::ServiceResolved(info) => {
3893                    resolved = true;
3894                    println!("Resolved a service of {}", &info.get_fullname());
3895                    break;
3896                }
3897                _ => {}
3898            }
3899        }
3900
3901        assert!(resolved);
3902
3903        // Exit the server so that no more responses.
3904        mdns_server.shutdown().unwrap();
3905
3906        // SRV record in the client cache will expire.
3907        let expire_timeout = Duration::from_secs(new_ttl as u64);
3908        while let Ok(event) = browse_chan.recv_timeout(expire_timeout) {
3909            match event {
3910                ServiceEvent::ServiceRemoved(service_type, full_name) => {
3911                    println!("Service removed: {}: {}", &service_type, &full_name);
3912                    break;
3913                }
3914                _ => {}
3915            }
3916        }
3917    }
3918
3919    #[test]
3920    fn test_hostname_resolution_address_removed() {
3921        // Create a mDNS server
3922        let server = ServiceDaemon::new().expect("Failed to create server");
3923        let hostname = "addr_remove_host._tcp.local.";
3924        let service_ip_addr = my_ip_interfaces(false)
3925            .iter()
3926            .find(|iface| iface.ip().is_ipv4())
3927            .map(|iface| iface.ip())
3928            .unwrap();
3929
3930        let mut my_service = ServiceInfo::new(
3931            "_host_res_test._tcp.local.",
3932            "my_instance",
3933            hostname,
3934            &service_ip_addr,
3935            1234,
3936            None,
3937        )
3938        .expect("invalid service info");
3939
3940        // Set a short TTL for addresses for testing.
3941        let addr_ttl = 2;
3942        my_service._set_host_ttl(addr_ttl); // Expire soon
3943
3944        server.register(my_service).unwrap();
3945
3946        // Create a mDNS client for resolving the hostname.
3947        let client = ServiceDaemon::new().expect("Failed to create client");
3948        let event_receiver = client.resolve_hostname(hostname, None).unwrap();
3949        let resolved = loop {
3950            match event_receiver.recv() {
3951                Ok(HostnameResolutionEvent::AddressesFound(found_hostname, addresses)) => {
3952                    assert!(found_hostname == hostname);
3953                    assert!(addresses.contains(&service_ip_addr));
3954                    println!("address found: {:?}", &addresses);
3955                    break true;
3956                }
3957                Ok(HostnameResolutionEvent::SearchStopped(_)) => break false,
3958                Ok(_event) => {}
3959                Err(_) => break false,
3960            }
3961        };
3962
3963        assert!(resolved);
3964
3965        // Shutdown the server so no more responses / refreshes for addresses.
3966        server.shutdown().unwrap();
3967
3968        // Wait till hostname address record expires, with 1 second grace period.
3969        let timeout = Duration::from_secs(addr_ttl as u64 + 1);
3970        let removed = loop {
3971            match event_receiver.recv_timeout(timeout) {
3972                Ok(HostnameResolutionEvent::AddressesRemoved(removed_host, addresses)) => {
3973                    assert!(removed_host == hostname);
3974                    assert!(addresses.contains(&service_ip_addr));
3975
3976                    println!(
3977                        "address removed: hostname: {} addresses: {:?}",
3978                        &hostname, &addresses
3979                    );
3980                    break true;
3981                }
3982                Ok(_event) => {}
3983                Err(_) => {
3984                    break false;
3985                }
3986            }
3987        };
3988
3989        assert!(removed);
3990
3991        client.shutdown().unwrap();
3992    }
3993
3994    #[test]
3995    fn test_refresh_ptr() {
3996        // construct service info
3997        let service_type = "_refresh-ptr._udp.local.";
3998        let instance = "test_instance";
3999        let host_name = "refresh_ptr_host.local.";
4000        let service_ip_addr = my_ip_interfaces(false)
4001            .iter()
4002            .find(|iface| iface.ip().is_ipv4())
4003            .map(|iface| iface.ip())
4004            .unwrap();
4005
4006        let mut my_service = ServiceInfo::new(
4007            service_type,
4008            instance,
4009            host_name,
4010            &service_ip_addr,
4011            5023,
4012            None,
4013        )
4014        .unwrap();
4015
4016        let new_ttl = 3; // for testing only.
4017        my_service._set_other_ttl(new_ttl);
4018
4019        // register my service
4020        let mdns_server = ServiceDaemon::new().expect("Failed to create mdns server");
4021        let result = mdns_server.register(my_service);
4022        assert!(result.is_ok());
4023
4024        let mdns_client = ServiceDaemon::new().expect("Failed to create mdns client");
4025        let browse_chan = mdns_client.browse(service_type).unwrap();
4026        let timeout = Duration::from_millis(1500); // Give at least 1 second for the service probing.
4027        let mut resolved = false;
4028
4029        // resolve the service first.
4030        while let Ok(event) = browse_chan.recv_timeout(timeout) {
4031            match event {
4032                ServiceEvent::ServiceResolved(info) => {
4033                    resolved = true;
4034                    println!("Resolved a service of {}", &info.get_fullname());
4035                    break;
4036                }
4037                _ => {}
4038            }
4039        }
4040
4041        assert!(resolved);
4042
4043        // wait over 80% of TTL, and refresh PTR should be sent out.
4044        let timeout = Duration::from_millis(new_ttl as u64 * 1000 * 90 / 100);
4045        while let Ok(event) = browse_chan.recv_timeout(timeout) {
4046            println!("event: {:?}", &event);
4047        }
4048
4049        // verify refresh counter.
4050        let metrics_chan = mdns_client.get_metrics().unwrap();
4051        let metrics = metrics_chan.recv_timeout(timeout).unwrap();
4052        let ptr_refresh_counter = metrics["cache-refresh-ptr"];
4053        assert_eq!(ptr_refresh_counter, 1);
4054        let srvtxt_refresh_counter = metrics["cache-refresh-srv-txt"];
4055        assert_eq!(srvtxt_refresh_counter, 1);
4056
4057        // Exit the server so that no more responses.
4058        mdns_server.shutdown().unwrap();
4059        mdns_client.shutdown().unwrap();
4060    }
4061
4062    #[test]
4063    fn test_name_change() {
4064        assert_eq!(name_change("foo.local."), "foo (2).local.");
4065        assert_eq!(name_change("foo (2).local."), "foo (3).local.");
4066        assert_eq!(name_change("foo (9).local."), "foo (10).local.");
4067        assert_eq!(name_change("foo"), "foo (2)");
4068        assert_eq!(name_change("foo (2)"), "foo (3)");
4069        assert_eq!(name_change(""), " (2)");
4070
4071        // Additional edge cases
4072        assert_eq!(name_change("foo (abc)"), "foo (abc) (2)"); // Invalid number
4073        assert_eq!(name_change("foo (2"), "foo (2 (2)"); // Missing closing parenthesis
4074        assert_eq!(name_change("foo (2) extra"), "foo (2) extra (2)"); // Extra text after number
4075    }
4076
4077    #[test]
4078    fn test_hostname_change() {
4079        assert_eq!(hostname_change("foo.local."), "foo-2.local.");
4080        assert_eq!(hostname_change("foo"), "foo-2");
4081        assert_eq!(hostname_change("foo-2.local."), "foo-3.local.");
4082        assert_eq!(hostname_change("foo-9"), "foo-10");
4083        assert_eq!(hostname_change("test-42.domain."), "test-43.domain.");
4084    }
4085
4086    #[test]
4087    fn test_add_answer_txt_ttl() {
4088        // construct a simple service info
4089        let service_type = "_test_add_answer._udp.local.";
4090        let instance = "test_instance";
4091        let host_name = "add_answer_host.local.";
4092        let service_intf = my_ip_interfaces(false)
4093            .into_iter()
4094            .find(|iface| iface.ip().is_ipv4())
4095            .unwrap();
4096        let service_ip_addr = service_intf.ip();
4097        let my_service = ServiceInfo::new(
4098            service_type,
4099            instance,
4100            host_name,
4101            &service_ip_addr,
4102            5023,
4103            None,
4104        )
4105        .unwrap();
4106
4107        // construct a DnsOutgoing message
4108        let mut out = DnsOutgoing::new(FLAGS_QR_RESPONSE | FLAGS_AA);
4109
4110        // Construct a dummy DnsIncoming message
4111        let mut dummy_data = out.to_data_on_wire();
4112        let interface_id = InterfaceId::from(&service_intf);
4113        let incoming = DnsIncoming::new(dummy_data.pop().unwrap(), interface_id).unwrap();
4114
4115        // Add an answer of TXT type for the service.
4116        add_answer_of_service(
4117            &mut out,
4118            &incoming,
4119            instance,
4120            &my_service,
4121            RRType::TXT,
4122            &service_intf,
4123        );
4124
4125        // Check if the answer was added correctly
4126        assert!(
4127            out.answers_count() > 0,
4128            "No answers added to the outgoing message"
4129        );
4130
4131        // Check if the first answer is of type TXT
4132        let answer = out._answers().first().unwrap();
4133        assert_eq!(answer.0.get_type(), RRType::TXT);
4134
4135        // Check TTL is set properly for the TXT record
4136        assert_eq!(answer.0.get_record().get_ttl(), my_service.get_other_ttl());
4137    }
4138}