1#[cfg(feature = "logging")]
32use crate::log::{debug, trace};
33use crate::{
34 dns_cache::{current_time_millis, DnsCache},
35 dns_parser::{
36 ip_address_rr_type, DnsAddress, DnsEntryExt, DnsIncoming, DnsOutgoing, DnsPointer,
37 DnsRecordBox, DnsRecordExt, DnsSrv, DnsTxt, InterfaceId, RRType, ScopedIp,
38 CLASS_CACHE_FLUSH, CLASS_IN, FLAGS_AA, FLAGS_QR_QUERY, FLAGS_QR_RESPONSE, MAX_MSG_ABSOLUTE,
39 },
40 error::{e_fmt, Error, Result},
41 service_info::{DnsRegistry, MyIntf, Probe, ServiceInfo, ServiceStatus},
42 Receiver, ResolvedService, TxtProperties,
43};
44use flume::{bounded, Sender, TrySendError};
45use if_addrs::{IfAddr, Interface};
46use mio::{event::Source, net::UdpSocket as MioUdpSocket, Interest, Poll, Registry, Token};
47use socket2::Domain;
48use socket_pktinfo::PktInfoUdpSocket;
49use std::{
50 cmp::{self, Reverse},
51 collections::{hash_map::Entry, BinaryHeap, HashMap, HashSet},
52 fmt, io,
53 net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6, UdpSocket},
54 str, thread,
55 time::Duration,
56 vec,
57};
58
/// Default value for the daemon's maximum service-name length setting.
pub const SERVICE_NAME_LEN_MAX_DEFAULT: u8 = 15;

/// Default interval, in seconds, between two checks for IP address changes.
pub const IP_CHECK_INTERVAL_IN_SECS_DEFAULT: u32 = 5;

/// Default timeout for a verification request.
// NOTE(review): not referenced in the visible part of this file; presumably
// intended as the `timeout` argument of `ServiceDaemon::verify` -- confirm.
pub const VERIFY_TIMEOUT_DEFAULT: Duration = Duration::from_secs(10);

/// UDP port every mDNS socket in this file binds to and sends to.
const MDNS_PORT: u16 = 5353;
/// IPv4 multicast group joined by the daemon.
const GROUP_ADDR_V4: Ipv4Addr = Ipv4Addr::new(224, 0, 0, 251);
/// IPv6 multicast group joined by the daemon (`ff02::fb`).
const GROUP_ADDR_V6: Ipv6Addr = Ipv6Addr::new(0xff02, 0, 0, 0, 0, 0, 0, 0xfb);
/// IPv4 loopback address, used for the daemon's local signal socket.
const LOOPBACK_V4: Ipv4Addr = Ipv4Addr::LOCALHOST;

// NOTE(review): not used in the visible part of this file; the name suggests
// a wait time in milliseconds related to resolving -- confirm at use sites.
const RESOLVE_WAIT_IN_MILLIS: u64 = 500;
77
/// Outcome of an unregister request, delivered on the channel returned by
/// `ServiceDaemon::unregister`.
///
/// The type is a plain field-less enum, so it is cheap to copy and can be
/// compared directly (`status == UnregisterStatus::OK`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum UnregisterStatus {
    /// The unregister request succeeded.
    // NOTE(review): exact conditions are decided by the daemon code outside
    // this view; the variant name is kept as-is for API compatibility.
    OK,
    /// No matching registration was found.
    NotFound,
}
86
/// Life-cycle state of the daemon as reported to clients.
#[non_exhaustive]
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum DaemonStatus {
    /// The daemon is running; this is the state a new daemon starts in.
    Running,

    /// The daemon has received an exit command, or its command channel is
    /// disconnected.
    Shutdown,
}
97
/// Names of the internal counters kept by the daemon.
///
/// The `Display` form is a stable, kebab-case label for each counter.
// NOTE(review): the label is presumably the key used in `Metrics`; the code
// that fills the map is outside this view -- confirm.
#[derive(Hash, Eq, PartialEq)]
enum Counter {
    Register,
    RegisterResend,
    Unregister,
    UnregisterResend,
    Browse,
    ResolveHostname,
    Respond,
    CacheRefreshPTR,
    CacheRefreshSrvTxt,
    CacheRefreshAddr,
    KnownAnswerSuppression,
    CachedPTR,
    CachedSRV,
    CachedAddr,
    CachedTxt,
    CachedNSec,
    CachedSubtype,
    DnsRegistryProbe,
    DnsRegistryActive,
    DnsRegistryTimer,
    DnsRegistryNameChange,
    Timer,
}

impl fmt::Display for Counter {
    /// Writes the counter's textual label.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        // Pick the label first, then emit it with a single write.
        let label = match self {
            Self::Register => "register",
            Self::RegisterResend => "register-resend",
            Self::Unregister => "unregister",
            Self::UnregisterResend => "unregister-resend",
            Self::Browse => "browse",
            Self::ResolveHostname => "resolve-hostname",
            Self::Respond => "respond",
            Self::CacheRefreshPTR => "cache-refresh-ptr",
            Self::CacheRefreshSrvTxt => "cache-refresh-srv-txt",
            Self::CacheRefreshAddr => "cache-refresh-addr",
            Self::KnownAnswerSuppression => "known-answer-suppression",
            Self::CachedPTR => "cached-ptr",
            Self::CachedSRV => "cached-srv",
            Self::CachedAddr => "cached-addr",
            Self::CachedTxt => "cached-txt",
            Self::CachedNSec => "cached-nsec",
            Self::CachedSubtype => "cached-subtype",
            Self::DnsRegistryProbe => "dns-registry-probe",
            Self::DnsRegistryActive => "dns-registry-active",
            Self::DnsRegistryTimer => "dns-registry-timer",
            Self::DnsRegistryNameChange => "dns-registry-name-change",
            Self::Timer => "timer",
        };
        f.write_str(label)
    }
}
154
155struct MyUdpSocket {
160 pktinfo: PktInfoUdpSocket,
163
164 mio: MioUdpSocket,
167}
168
169impl MyUdpSocket {
170 pub fn new(pktinfo: PktInfoUdpSocket) -> io::Result<Self> {
171 let std_sock = pktinfo.try_clone_std()?;
172 let mio = MioUdpSocket::from_std(std_sock);
173
174 Ok(Self { pktinfo, mio })
175 }
176}
177
178impl Source for MyUdpSocket {
180 fn register(
181 &mut self,
182 registry: &Registry,
183 token: Token,
184 interests: Interest,
185 ) -> io::Result<()> {
186 self.mio.register(registry, token, interests)
187 }
188
189 fn reregister(
190 &mut self,
191 registry: &Registry,
192 token: Token,
193 interests: Interest,
194 ) -> io::Result<()> {
195 self.mio.reregister(registry, token, interests)
196 }
197
198 fn deregister(&mut self, registry: &Registry) -> std::io::Result<()> {
199 self.mio.deregister(registry)
200 }
201}
202
203pub type Metrics = HashMap<String, i64>;
206
207const IPV4_SOCK_EVENT_KEY: usize = 4; const IPV6_SOCK_EVENT_KEY: usize = 6; const SIGNAL_SOCK_EVENT_KEY: usize = usize::MAX - 1; #[derive(Clone)]
215pub struct ServiceDaemon {
216 sender: Sender<Command>,
218
219 signal_addr: SocketAddr,
225}
226
227impl ServiceDaemon {
228 pub fn new() -> Result<Self> {
233 let signal_addr = SocketAddrV4::new(LOOPBACK_V4, 0);
236
237 let signal_sock = UdpSocket::bind(signal_addr)
238 .map_err(|e| e_fmt!("failed to create signal_sock for daemon: {}", e))?;
239
240 let signal_addr = signal_sock
242 .local_addr()
243 .map_err(|e| e_fmt!("failed to get signal sock addr: {}", e))?;
244
245 signal_sock
247 .set_nonblocking(true)
248 .map_err(|e| e_fmt!("failed to set nonblocking for signal socket: {}", e))?;
249
250 let poller = Poll::new().map_err(|e| e_fmt!("failed to create mio Poll: {e}"))?;
251
252 let (sender, receiver) = bounded(100);
253
254 let mio_sock = MioUdpSocket::from_std(signal_sock);
256 thread::Builder::new()
257 .name("mDNS_daemon".to_string())
258 .spawn(move || Self::daemon_thread(mio_sock, poller, receiver))
259 .map_err(|e| e_fmt!("thread builder failed to spawn: {}", e))?;
260
261 Ok(Self {
262 sender,
263 signal_addr,
264 })
265 }
266
267 fn send_cmd(&self, cmd: Command) -> Result<()> {
270 let cmd_name = cmd.to_string();
271
272 self.sender.try_send(cmd).map_err(|e| match e {
274 TrySendError::Full(_) => Error::Again,
275 e => e_fmt!("flume::channel::send failed: {}", e),
276 })?;
277
278 let addr = SocketAddrV4::new(LOOPBACK_V4, 0);
280 let socket = UdpSocket::bind(addr)
281 .map_err(|e| e_fmt!("Failed to create socket to send signal: {}", e))?;
282 socket
283 .send_to(cmd_name.as_bytes(), self.signal_addr)
284 .map_err(|e| {
285 e_fmt!(
286 "signal socket send_to {} ({}) failed: {}",
287 self.signal_addr,
288 cmd_name,
289 e
290 )
291 })?;
292
293 Ok(())
294 }
295
296 pub fn browse(&self, service_type: &str) -> Result<Receiver<ServiceEvent>> {
307 check_domain_suffix(service_type)?;
308
309 let (resp_s, resp_r) = bounded(10);
310 self.send_cmd(Command::Browse(service_type.to_string(), 1, false, resp_s))?;
311 Ok(resp_r)
312 }
313
314 pub fn browse_cache(&self, service_type: &str) -> Result<Receiver<ServiceEvent>> {
323 check_domain_suffix(service_type)?;
324
325 let (resp_s, resp_r) = bounded(10);
326 self.send_cmd(Command::Browse(service_type.to_string(), 1, true, resp_s))?;
327 Ok(resp_r)
328 }
329
330 pub fn stop_browse(&self, ty_domain: &str) -> Result<()> {
335 self.send_cmd(Command::StopBrowse(ty_domain.to_string()))
336 }
337
338 pub fn resolve_hostname(
346 &self,
347 hostname: &str,
348 timeout: Option<u64>,
349 ) -> Result<Receiver<HostnameResolutionEvent>> {
350 check_hostname(hostname)?;
351 let (resp_s, resp_r) = bounded(10);
352 self.send_cmd(Command::ResolveHostname(
353 hostname.to_string(),
354 1,
355 resp_s,
356 timeout,
357 ))?;
358 Ok(resp_r)
359 }
360
361 pub fn stop_resolve_hostname(&self, hostname: &str) -> Result<()> {
366 self.send_cmd(Command::StopResolveHostname(hostname.to_string()))
367 }
368
369 pub fn register(&self, service_info: ServiceInfo) -> Result<()> {
377 check_service_name(service_info.get_fullname())?;
378 check_hostname(service_info.get_hostname())?;
379
380 self.send_cmd(Command::Register(service_info))
381 }
382
383 pub fn unregister(&self, fullname: &str) -> Result<Receiver<UnregisterStatus>> {
391 let (resp_s, resp_r) = bounded(1);
392 self.send_cmd(Command::Unregister(fullname.to_lowercase(), resp_s))?;
393 Ok(resp_r)
394 }
395
396 pub fn monitor(&self) -> Result<Receiver<DaemonEvent>> {
400 let (resp_s, resp_r) = bounded(100);
401 self.send_cmd(Command::Monitor(resp_s))?;
402 Ok(resp_r)
403 }
404
405 pub fn shutdown(&self) -> Result<Receiver<DaemonStatus>> {
410 let (resp_s, resp_r) = bounded(1);
411 self.send_cmd(Command::Exit(resp_s))?;
412 Ok(resp_r)
413 }
414
415 pub fn status(&self) -> Result<Receiver<DaemonStatus>> {
421 let (resp_s, resp_r) = bounded(1);
422
423 if self.sender.is_disconnected() {
424 resp_s
425 .send(DaemonStatus::Shutdown)
426 .map_err(|e| e_fmt!("failed to send daemon status to the client: {}", e))?;
427 } else {
428 self.send_cmd(Command::GetStatus(resp_s))?;
429 }
430
431 Ok(resp_r)
432 }
433
434 pub fn get_metrics(&self) -> Result<Receiver<Metrics>> {
439 let (resp_s, resp_r) = bounded(1);
440 self.send_cmd(Command::GetMetrics(resp_s))?;
441 Ok(resp_r)
442 }
443
444 pub fn set_service_name_len_max(&self, len_max: u8) -> Result<()> {
451 const SERVICE_NAME_LEN_MAX_LIMIT: u8 = 30; if len_max > SERVICE_NAME_LEN_MAX_LIMIT {
454 return Err(Error::Msg(format!(
455 "service name length max {len_max} is too large"
456 )));
457 }
458
459 self.send_cmd(Command::SetOption(DaemonOption::ServiceNameLenMax(len_max)))
460 }
461
462 pub fn set_ip_check_interval(&self, interval_in_secs: u32) -> Result<()> {
468 let interval_in_millis = interval_in_secs as u64 * 1000;
469 self.send_cmd(Command::SetOption(DaemonOption::IpCheckInterval(
470 interval_in_millis,
471 )))
472 }
473
474 pub fn get_ip_check_interval(&self) -> Result<u32> {
476 let (resp_s, resp_r) = bounded(1);
477 self.send_cmd(Command::GetOption(resp_s))?;
478
479 let option = resp_r
480 .recv_timeout(Duration::from_secs(10))
481 .map_err(|e| e_fmt!("failed to receive ip check interval: {}", e))?;
482 let ip_check_interval_in_secs = option.ip_check_interval / 1000;
483 Ok(ip_check_interval_in_secs as u32)
484 }
485
486 pub fn enable_interface(&self, if_kind: impl IntoIfKindVec) -> Result<()> {
493 let if_kind_vec = if_kind.into_vec();
494 self.send_cmd(Command::SetOption(DaemonOption::EnableInterface(
495 if_kind_vec.kinds,
496 )))
497 }
498
499 pub fn disable_interface(&self, if_kind: impl IntoIfKindVec) -> Result<()> {
506 let if_kind_vec = if_kind.into_vec();
507 self.send_cmd(Command::SetOption(DaemonOption::DisableInterface(
508 if_kind_vec.kinds,
509 )))
510 }
511
512 pub fn accept_unsolicited(&self, accept: bool) -> Result<()> {
523 self.send_cmd(Command::SetOption(DaemonOption::AcceptUnsolicited(accept)))
524 }
525
526 #[cfg(test)]
527 pub fn test_down_interface(&self, ifname: &str) -> Result<()> {
528 self.send_cmd(Command::SetOption(DaemonOption::TestDownInterface(
529 ifname.to_string(),
530 )))
531 }
532
533 #[cfg(test)]
534 pub fn test_up_interface(&self, ifname: &str) -> Result<()> {
535 self.send_cmd(Command::SetOption(DaemonOption::TestUpInterface(
536 ifname.to_string(),
537 )))
538 }
539
540 pub fn set_multicast_loop_v4(&self, on: bool) -> Result<()> {
556 self.send_cmd(Command::SetOption(DaemonOption::MulticastLoopV4(on)))
557 }
558
559 pub fn set_multicast_loop_v6(&self, on: bool) -> Result<()> {
575 self.send_cmd(Command::SetOption(DaemonOption::MulticastLoopV6(on)))
576 }
577
578 pub fn verify(&self, instance_fullname: String, timeout: Duration) -> Result<()> {
591 self.send_cmd(Command::Verify(instance_fullname, timeout))
592 }
593
594 fn daemon_thread(signal_sock: MioUdpSocket, poller: Poll, receiver: Receiver<Command>) {
595 let mut zc = Zeroconf::new(signal_sock, poller);
596
597 if let Some(cmd) = zc.run(receiver) {
598 match cmd {
599 Command::Exit(resp_s) => {
600 if let Err(e) = resp_s.send(DaemonStatus::Shutdown) {
603 debug!("exit: failed to send response of shutdown: {}", e);
604 }
605 }
606 _ => {
607 debug!("Unexpected command: {:?}", cmd);
608 }
609 }
610 }
611 }
612}
613
614fn _new_socket_bind(intf: &Interface, should_loop: bool) -> Result<MyUdpSocket> {
616 let intf_ip = &intf.ip();
619 match intf_ip {
620 IpAddr::V4(ip) => {
621 let addr = SocketAddrV4::new(Ipv4Addr::new(0, 0, 0, 0), MDNS_PORT);
622 let sock = new_socket(addr.into(), true)?;
623
624 sock.join_multicast_v4(&GROUP_ADDR_V4, ip)
626 .map_err(|e| e_fmt!("join multicast group on addr {}: {}", intf_ip, e))?;
627
628 sock.set_multicast_if_v4(ip)
630 .map_err(|e| e_fmt!("set multicast_if on addr {}: {}", ip, e))?;
631
632 sock.set_multicast_ttl_v4(255)
637 .map_err(|e| e_fmt!("set set_multicast_ttl_v4 on addr {}: {}", ip, e))?;
638
639 if !should_loop {
640 sock.set_multicast_loop_v4(false)
641 .map_err(|e| e_fmt!("failed to set multicast loop v4 for {ip}: {e}"))?;
642 }
643
644 let multicast_addr = SocketAddrV4::new(GROUP_ADDR_V4, MDNS_PORT).into();
646 let test_packets = DnsOutgoing::new(0).to_data_on_wire();
647 for packet in test_packets {
648 sock.send_to(&packet, &multicast_addr)
649 .map_err(|e| e_fmt!("send multicast packet on addr {}: {}", ip, e))?;
650 }
651 MyUdpSocket::new(sock)
652 .map_err(|e| e_fmt!("failed to create MySocket for interface {}: {e}", intf.name))
653 }
654 IpAddr::V6(ip) => {
655 let addr = SocketAddrV6::new(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 0), MDNS_PORT, 0, 0);
656 let sock = new_socket(addr.into(), true)?;
657
658 let if_index = intf.index.unwrap_or(0);
659
660 sock.join_multicast_v6(&GROUP_ADDR_V6, if_index)
662 .map_err(|e| e_fmt!("join multicast group on addr {}: {}", ip, e))?;
663
664 sock.set_multicast_if_v6(if_index)
666 .map_err(|e| e_fmt!("set multicast_if on addr {}: {}", ip, e))?;
667
668 MyUdpSocket::new(sock)
673 .map_err(|e| e_fmt!("failed to create MySocket for interface {}: {e}", intf.name))
674 }
675 }
676}
677
678fn new_socket(addr: SocketAddr, non_block: bool) -> Result<PktInfoUdpSocket> {
681 let domain = match addr {
682 SocketAddr::V4(_) => socket2::Domain::IPV4,
683 SocketAddr::V6(_) => socket2::Domain::IPV6,
684 };
685
686 let fd = PktInfoUdpSocket::new(domain).map_err(|e| e_fmt!("create socket failed: {}", e))?;
687
688 fd.set_reuse_address(true)
689 .map_err(|e| e_fmt!("set ReuseAddr failed: {}", e))?;
690 #[cfg(unix)] fd.set_reuse_port(true)
692 .map_err(|e| e_fmt!("set ReusePort failed: {}", e))?;
693
694 if non_block {
695 fd.set_nonblocking(true)
696 .map_err(|e| e_fmt!("set O_NONBLOCK: {}", e))?;
697 }
698
699 fd.bind(&addr.into())
700 .map_err(|e| e_fmt!("socket bind to {} failed: {}", &addr, e))?;
701
702 trace!("new socket bind to {}", &addr);
703 Ok(fd)
704}
705
706struct ReRun {
708 next_time: u64,
710 command: Command,
711}
712
713#[derive(Debug, Clone)]
717#[non_exhaustive]
718pub enum IfKind {
719 All,
721
722 IPv4,
724
725 IPv6,
727
728 Name(String),
730
731 Addr(IpAddr),
733
734 LoopbackV4,
739
740 LoopbackV6,
742}
743
744impl IfKind {
745 fn matches(&self, intf: &Interface) -> bool {
747 match self {
748 Self::All => true,
749 Self::IPv4 => intf.ip().is_ipv4(),
750 Self::IPv6 => intf.ip().is_ipv6(),
751 Self::Name(ifname) => ifname == &intf.name,
752 Self::Addr(addr) => addr == &intf.ip(),
753 Self::LoopbackV4 => intf.is_loopback() && intf.ip().is_ipv4(),
754 Self::LoopbackV6 => intf.is_loopback() && intf.ip().is_ipv6(),
755 }
756 }
757}
758
759impl From<&str> for IfKind {
762 fn from(val: &str) -> Self {
763 Self::Name(val.to_string())
764 }
765}
766
767impl From<&String> for IfKind {
768 fn from(val: &String) -> Self {
769 Self::Name(val.to_string())
770 }
771}
772
773impl From<IpAddr> for IfKind {
775 fn from(val: IpAddr) -> Self {
776 Self::Addr(val)
777 }
778}
779
780pub struct IfKindVec {
782 kinds: Vec<IfKind>,
783}
784
785pub trait IntoIfKindVec {
787 fn into_vec(self) -> IfKindVec;
788}
789
790impl<T: Into<IfKind>> IntoIfKindVec for T {
791 fn into_vec(self) -> IfKindVec {
792 let if_kind: IfKind = self.into();
793 IfKindVec {
794 kinds: vec![if_kind],
795 }
796 }
797}
798
799impl<T: Into<IfKind>> IntoIfKindVec for Vec<T> {
800 fn into_vec(self) -> IfKindVec {
801 let kinds: Vec<IfKind> = self.into_iter().map(|x| x.into()).collect();
802 IfKindVec { kinds }
803 }
804}
805
806struct IfSelection {
808 if_kind: IfKind,
810
811 selected: bool,
813}
814
815struct Zeroconf {
817 my_intfs: HashMap<u32, MyIntf>,
819
820 ipv4_sock: Option<MyUdpSocket>,
822
823 ipv6_sock: Option<MyUdpSocket>,
825
826 my_services: HashMap<String, ServiceInfo>,
828
829 cache: DnsCache,
831
832 dns_registry_map: HashMap<u32, DnsRegistry>,
834
835 service_queriers: HashMap<String, Sender<ServiceEvent>>, hostname_resolvers: HashMap<String, (Sender<HostnameResolutionEvent>, Option<u64>)>, retransmissions: Vec<ReRun>,
846
847 counters: Metrics,
848
849 poller: Poll,
851
852 monitors: Vec<Sender<DaemonEvent>>,
854
855 service_name_len_max: u8,
857
858 ip_check_interval: u64,
860
861 if_selections: Vec<IfSelection>,
863
864 signal_sock: MioUdpSocket,
866
867 timers: BinaryHeap<Reverse<u64>>,
873
874 status: DaemonStatus,
875
876 pending_resolves: HashSet<String>,
878
879 resolved: HashSet<String>,
881
882 multicast_loop_v4: bool,
883
884 multicast_loop_v6: bool,
885
886 accept_unsolicited: bool,
887
888 #[cfg(test)]
889 test_down_interfaces: HashSet<String>,
890}
891
892fn join_multicast_group(my_sock: &PktInfoUdpSocket, intf: &Interface) -> Result<()> {
894 let intf_ip = &intf.ip();
895 match intf_ip {
896 IpAddr::V4(ip) => {
897 debug!("join multicast group V4 on {} addr {ip}", intf.name);
899 my_sock
900 .join_multicast_v4(&GROUP_ADDR_V4, ip)
901 .map_err(|e| e_fmt!("PKT join multicast group on addr {}: {}", intf_ip, e))?;
902 }
903 IpAddr::V6(ip) => {
904 let if_index = intf.index.unwrap_or(0);
905 debug!(
907 "join multicast group V6 on {} addr {ip} with index {if_index}",
908 intf.name
909 );
910 my_sock
911 .join_multicast_v6(&GROUP_ADDR_V6, if_index)
912 .map_err(|e| e_fmt!("PKT join multicast group on addr {}: {}", ip, e))?;
913 }
914 }
915 Ok(())
916}
917
918impl Zeroconf {
919 fn new(signal_sock: MioUdpSocket, poller: Poll) -> Self {
920 let my_ifaddrs = my_ip_interfaces(false);
922
923 let mut my_intfs = HashMap::new();
927 let mut dns_registry_map = HashMap::new();
928
929 let mut ipv4_sock = None;
932 let addr = SocketAddrV4::new(Ipv4Addr::new(0, 0, 0, 0), MDNS_PORT);
933 match new_socket(addr.into(), true) {
934 Ok(sock) => {
935 sock.set_multicast_ttl_v4(255)
940 .map_err(|e| e_fmt!("set set_multicast_ttl_v4 on addr: {}", e))
941 .ok();
942
943 ipv4_sock = match MyUdpSocket::new(sock) {
945 Ok(s) => Some(s),
946 Err(e) => {
947 debug!("failed to create IPv4 MyUdpSocket: {e}");
948 None
949 }
950 };
951 }
952 Err(e) => debug!("failed to create IPv4 socket: {e}"),
954 }
955
956 let mut ipv6_sock = None;
957 let addr = SocketAddrV6::new(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 0), MDNS_PORT, 0, 0);
958 match new_socket(addr.into(), true) {
959 Ok(sock) => {
960 sock.set_multicast_hops_v6(255)
964 .map_err(|e| e_fmt!("set set_multicast_hops_v6: {}", e))
965 .ok();
966
967 ipv6_sock = match MyUdpSocket::new(sock) {
969 Ok(s) => Some(s),
970 Err(e) => {
971 debug!("failed to create IPv6 MyUdpSocket: {e}");
972 None
973 }
974 };
975 }
976 Err(e) => debug!("failed to create IPv6 socket: {e}"),
977 }
978
979 for intf in my_ifaddrs {
981 let sock_opt = if intf.ip().is_ipv4() {
982 &ipv4_sock
983 } else {
984 &ipv6_sock
985 };
986 let Some(sock) = sock_opt else {
987 debug!(
988 "no socket available for interface {} with addr {}. Skipped.",
989 intf.name,
990 intf.ip()
991 );
992 continue;
993 };
994
995 if let Err(e) = join_multicast_group(&sock.pktinfo, &intf) {
996 debug!(
997 "config socket to join multicast: {}: {e}. Skipped.",
998 &intf.ip()
999 );
1000 }
1001
1002 let if_index = intf.index.unwrap_or(0);
1003
1004 dns_registry_map
1006 .entry(if_index)
1007 .or_insert_with(DnsRegistry::new);
1008
1009 my_intfs
1010 .entry(if_index)
1011 .and_modify(|v: &mut MyIntf| {
1012 v.addrs.insert(intf.addr.clone());
1013 })
1014 .or_insert(MyIntf {
1015 name: intf.name.clone(),
1016 index: if_index,
1017 addrs: HashSet::from([intf.addr]),
1018 });
1019 }
1020
1021 let monitors = Vec::new();
1022 let service_name_len_max = SERVICE_NAME_LEN_MAX_DEFAULT;
1023 let ip_check_interval = IP_CHECK_INTERVAL_IN_SECS_DEFAULT as u64 * 1000;
1024
1025 let timers = BinaryHeap::new();
1026
1027 let if_selections = vec![
1029 IfSelection {
1030 if_kind: IfKind::LoopbackV4,
1031 selected: false,
1032 },
1033 IfSelection {
1034 if_kind: IfKind::LoopbackV6,
1035 selected: false,
1036 },
1037 ];
1038
1039 let status = DaemonStatus::Running;
1040
1041 Self {
1042 my_intfs,
1043 ipv4_sock,
1044 ipv6_sock,
1045 my_services: HashMap::new(),
1047 cache: DnsCache::new(),
1048 dns_registry_map,
1049 hostname_resolvers: HashMap::new(),
1050 service_queriers: HashMap::new(),
1051 retransmissions: Vec::new(),
1052 counters: HashMap::new(),
1053 poller,
1054 monitors,
1055 service_name_len_max,
1056 ip_check_interval,
1057 if_selections,
1058 signal_sock,
1059 timers,
1060 status,
1061 pending_resolves: HashSet::new(),
1062 resolved: HashSet::new(),
1063 multicast_loop_v4: true,
1064 multicast_loop_v6: true,
1065 accept_unsolicited: false,
1066
1067 #[cfg(test)]
1068 test_down_interfaces: HashSet::new(),
1069 }
1070 }
1071
1072 fn run(&mut self, receiver: Receiver<Command>) -> Option<Command> {
1081 if let Err(e) = self.poller.registry().register(
1083 &mut self.signal_sock,
1084 mio::Token(SIGNAL_SOCK_EVENT_KEY),
1085 mio::Interest::READABLE,
1086 ) {
1087 debug!("failed to add signal socket to the poller: {}", e);
1088 return None;
1089 }
1090
1091 if let Some(sock) = self.ipv4_sock.as_mut() {
1092 if let Err(e) = self.poller.registry().register(
1093 sock,
1094 mio::Token(IPV4_SOCK_EVENT_KEY),
1095 mio::Interest::READABLE,
1096 ) {
1097 debug!("failed to register ipv4 socket: {}", e);
1098 return None;
1099 }
1100 }
1101
1102 if let Some(sock) = self.ipv6_sock.as_mut() {
1103 if let Err(e) = self.poller.registry().register(
1104 sock,
1105 mio::Token(IPV6_SOCK_EVENT_KEY),
1106 mio::Interest::READABLE,
1107 ) {
1108 debug!("failed to register ipv6 socket: {}", e);
1109 return None;
1110 }
1111 }
1112
1113 let mut next_ip_check = if self.ip_check_interval > 0 {
1115 current_time_millis() + self.ip_check_interval
1116 } else {
1117 0
1118 };
1119
1120 if next_ip_check > 0 {
1121 self.add_timer(next_ip_check);
1122 }
1123
1124 let mut events = mio::Events::with_capacity(1024);
1127 loop {
1128 let now = current_time_millis();
1129
1130 let earliest_timer = self.peek_earliest_timer();
1131 let timeout = earliest_timer.map(|timer| {
1132 let millis = if timer > now { timer - now } else { 1 };
1134 Duration::from_millis(millis)
1135 });
1136
1137 events.clear();
1139 match self.poller.poll(&mut events, timeout) {
1140 Ok(_) => self.handle_poller_events(&events),
1141 Err(e) => debug!("failed to select from sockets: {}", e),
1142 }
1143
1144 let now = current_time_millis();
1145
1146 self.pop_timers_till(now);
1148
1149 for hostname in self
1151 .hostname_resolvers
1152 .clone()
1153 .into_iter()
1154 .filter(|(_, (_, timeout))| timeout.map(|t| now >= t).unwrap_or(false))
1155 .map(|(hostname, _)| hostname)
1156 {
1157 trace!("hostname resolver timeout for {}", &hostname);
1158 call_hostname_resolution_listener(
1159 &self.hostname_resolvers,
1160 &hostname,
1161 HostnameResolutionEvent::SearchTimeout(hostname.to_owned()),
1162 );
1163 call_hostname_resolution_listener(
1164 &self.hostname_resolvers,
1165 &hostname,
1166 HostnameResolutionEvent::SearchStopped(hostname.to_owned()),
1167 );
1168 self.hostname_resolvers.remove(&hostname);
1169 }
1170
1171 while let Ok(command) = receiver.try_recv() {
1173 if matches!(command, Command::Exit(_)) {
1174 self.status = DaemonStatus::Shutdown;
1175 return Some(command);
1176 }
1177 self.exec_command(command, false);
1178 }
1179
1180 let mut i = 0;
1182 while i < self.retransmissions.len() {
1183 if now >= self.retransmissions[i].next_time {
1184 let rerun = self.retransmissions.remove(i);
1185 self.exec_command(rerun.command, true);
1186 } else {
1187 i += 1;
1188 }
1189 }
1190
1191 self.refresh_active_services();
1193
1194 let mut query_count = 0;
1196 for (hostname, _sender) in self.hostname_resolvers.iter() {
1197 for (hostname, ip_addr) in
1198 self.cache.refresh_due_hostname_resolutions(hostname).iter()
1199 {
1200 self.send_query(hostname, ip_address_rr_type(&ip_addr.to_ip_addr()));
1201 query_count += 1;
1202 }
1203 }
1204
1205 self.increase_counter(Counter::CacheRefreshAddr, query_count);
1206
1207 let now = current_time_millis();
1209
1210 let expired_services = self.cache.evict_expired_services(now);
1212 if !expired_services.is_empty() {
1213 debug!(
1214 "run: send {} service removal to listeners",
1215 expired_services.len()
1216 );
1217 self.notify_service_removal(expired_services);
1218 }
1219
1220 let expired_addrs = self.cache.evict_expired_addr(now);
1222 for (hostname, addrs) in expired_addrs {
1223 call_hostname_resolution_listener(
1224 &self.hostname_resolvers,
1225 &hostname,
1226 HostnameResolutionEvent::AddressesRemoved(hostname.clone(), addrs),
1227 );
1228 let instances = self.cache.get_instances_on_host(&hostname);
1229 let instance_set: HashSet<String> = instances.into_iter().collect();
1230 self.resolve_updated_instances(&instance_set);
1231 }
1232
1233 self.probing_handler();
1235
1236 if now >= next_ip_check && next_ip_check > 0 {
1238 next_ip_check = now + self.ip_check_interval;
1239 self.add_timer(next_ip_check);
1240
1241 self.check_ip_changes();
1242 }
1243 }
1244 }
1245
1246 fn process_set_option(&mut self, daemon_opt: DaemonOption) {
1247 match daemon_opt {
1248 DaemonOption::ServiceNameLenMax(length) => self.service_name_len_max = length,
1249 DaemonOption::IpCheckInterval(interval) => self.ip_check_interval = interval,
1250 DaemonOption::EnableInterface(if_kind) => self.enable_interface(if_kind),
1251 DaemonOption::DisableInterface(if_kind) => self.disable_interface(if_kind),
1252 DaemonOption::MulticastLoopV4(on) => self.set_multicast_loop_v4(on),
1253 DaemonOption::MulticastLoopV6(on) => self.set_multicast_loop_v6(on),
1254 DaemonOption::AcceptUnsolicited(accept) => self.set_accept_unsolicited(accept),
1255 #[cfg(test)]
1256 DaemonOption::TestDownInterface(ifname) => {
1257 self.test_down_interfaces.insert(ifname);
1258 }
1259 #[cfg(test)]
1260 DaemonOption::TestUpInterface(ifname) => {
1261 self.test_down_interfaces.remove(&ifname);
1262 }
1263 }
1264 }
1265
1266 fn enable_interface(&mut self, kinds: Vec<IfKind>) {
1267 debug!("enable_interface: {:?}", kinds);
1268 for if_kind in kinds {
1269 self.if_selections.push(IfSelection {
1270 if_kind,
1271 selected: true,
1272 });
1273 }
1274
1275 self.apply_intf_selections(my_ip_interfaces(true));
1276 }
1277
1278 fn disable_interface(&mut self, kinds: Vec<IfKind>) {
1279 debug!("disable_interface: {:?}", kinds);
1280 for if_kind in kinds {
1281 self.if_selections.push(IfSelection {
1282 if_kind,
1283 selected: false,
1284 });
1285 }
1286
1287 self.apply_intf_selections(my_ip_interfaces(true));
1288 }
1289
1290 fn set_multicast_loop_v4(&mut self, on: bool) {
1291 let Some(sock) = self.ipv4_sock.as_mut() else {
1292 return;
1293 };
1294 self.multicast_loop_v4 = on;
1295 sock.pktinfo
1296 .set_multicast_loop_v4(on)
1297 .map_err(|e| e_fmt!("failed to set multicast loop v4: {}", e))
1298 .unwrap();
1299 }
1300
1301 fn set_multicast_loop_v6(&mut self, on: bool) {
1302 let Some(sock) = self.ipv6_sock.as_mut() else {
1303 return;
1304 };
1305 self.multicast_loop_v6 = on;
1306 sock.pktinfo
1307 .set_multicast_loop_v6(on)
1308 .map_err(|e| e_fmt!("failed to set multicast loop v6: {}", e))
1309 .unwrap();
1310 }
1311
1312 fn set_accept_unsolicited(&mut self, accept: bool) {
1313 self.accept_unsolicited = accept;
1314 }
1315
1316 fn notify_monitors(&mut self, event: DaemonEvent) {
1317 self.monitors.retain(|sender| {
1319 if let Err(e) = sender.try_send(event.clone()) {
1320 debug!("notify_monitors: try_send: {}", &e);
1321 if matches!(e, TrySendError::Disconnected(_)) {
1322 return false; }
1324 }
1325 true
1326 });
1327 }
1328
1329 fn del_addr_in_my_services(&mut self, addr: &IpAddr) {
1331 for (_, service_info) in self.my_services.iter_mut() {
1332 if service_info.is_addr_auto() {
1333 service_info.remove_ipaddr(addr);
1334 }
1335 }
1336 }
1337
1338 fn add_timer(&mut self, next_time: u64) {
1339 self.timers.push(Reverse(next_time));
1340 }
1341
1342 fn peek_earliest_timer(&self) -> Option<u64> {
1343 self.timers.peek().map(|Reverse(v)| *v)
1344 }
1345
1346 fn _pop_earliest_timer(&mut self) -> Option<u64> {
1347 self.timers.pop().map(|Reverse(v)| v)
1348 }
1349
1350 fn pop_timers_till(&mut self, now: u64) {
1352 while let Some(Reverse(v)) = self.timers.peek() {
1353 if *v > now {
1354 break;
1355 }
1356 self.timers.pop();
1357 }
1358 }
1359
1360 fn selected_addrs(&self, interfaces: Vec<Interface>) -> HashSet<IpAddr> {
1362 let intf_count = interfaces.len();
1363 let mut intf_selections = vec![true; intf_count];
1364
1365 for selection in self.if_selections.iter() {
1367 for i in 0..intf_count {
1369 if selection.if_kind.matches(&interfaces[i]) {
1370 intf_selections[i] = selection.selected;
1371 }
1372 }
1373 }
1374
1375 let mut selected_addrs = HashSet::new();
1376 for i in 0..intf_count {
1377 if intf_selections[i] {
1378 selected_addrs.insert(interfaces[i].addr.ip());
1379 }
1380 }
1381
1382 selected_addrs
1383 }
1384
1385 fn apply_intf_selections(&mut self, interfaces: Vec<Interface>) {
1390 let intf_count = interfaces.len();
1392 let mut intf_selections = vec![true; intf_count];
1393
1394 for selection in self.if_selections.iter() {
1396 for i in 0..intf_count {
1398 if selection.if_kind.matches(&interfaces[i]) {
1399 intf_selections[i] = selection.selected;
1400 }
1401 }
1402 }
1403
1404 for (idx, intf) in interfaces.into_iter().enumerate() {
1406 if intf_selections[idx] {
1407 self.add_interface(intf);
1409 } else {
1410 self.del_interface(&intf);
1412 }
1413 }
1414 }
1415
1416 fn del_ip(&mut self, ip: IpAddr) {
1417 self.del_addr_in_my_services(&ip);
1418 self.notify_monitors(DaemonEvent::IpDel(ip));
1419 }
1420
1421 fn check_ip_changes(&mut self) {
1423 let my_ifaddrs = my_ip_interfaces(true);
1425
1426 #[cfg(test)]
1427 let my_ifaddrs: Vec<_> = my_ifaddrs
1428 .into_iter()
1429 .filter(|intf| !self.test_down_interfaces.contains(&intf.name))
1430 .collect();
1431
1432 let ifaddrs_map: HashMap<u32, Vec<&IfAddr>> =
1433 my_ifaddrs.iter().fold(HashMap::new(), |mut acc, intf| {
1434 let if_index = intf.index.unwrap_or(0);
1435 acc.entry(if_index).or_default().push(&intf.addr);
1436 acc
1437 });
1438
1439 let mut deleted_intfs = Vec::new();
1440 let mut deleted_ips = Vec::new();
1441
1442 for (if_index, my_intf) in self.my_intfs.iter_mut() {
1443 let mut last_ipv4 = None;
1444 let mut last_ipv6 = None;
1445
1446 if let Some(current_addrs) = ifaddrs_map.get(if_index) {
1447 my_intf.addrs.retain(|addr| {
1448 if current_addrs.contains(&addr) {
1449 true
1450 } else {
1451 match addr.ip() {
1452 IpAddr::V4(ipv4) => last_ipv4 = Some(ipv4),
1453 IpAddr::V6(ipv6) => last_ipv6 = Some(ipv6),
1454 }
1455 deleted_ips.push(addr.ip());
1456 false
1457 }
1458 });
1459 if my_intf.addrs.is_empty() {
1460 deleted_intfs.push((*if_index, last_ipv4, last_ipv6))
1461 }
1462 } else {
1463 debug!(
1465 "check_ip_changes: interface {} ({}) no longer exists, removing",
1466 my_intf.name, if_index
1467 );
1468 for addr in my_intf.addrs.iter() {
1469 match addr.ip() {
1470 IpAddr::V4(ipv4) => last_ipv4 = Some(ipv4),
1471 IpAddr::V6(ipv6) => last_ipv6 = Some(ipv6),
1472 }
1473 deleted_ips.push(addr.ip())
1474 }
1475 deleted_intfs.push((*if_index, last_ipv4, last_ipv6));
1476 }
1477 }
1478
1479 if !deleted_ips.is_empty() || !deleted_intfs.is_empty() {
1480 debug!(
1481 "check_ip_changes: {} deleted ips {} deleted intfs",
1482 deleted_ips.len(),
1483 deleted_intfs.len()
1484 );
1485 }
1486
1487 for ip in deleted_ips {
1488 self.del_ip(ip);
1489 }
1490
1491 for (if_index, last_ipv4, last_ipv6) in deleted_intfs {
1492 let Some(my_intf) = self.my_intfs.remove(&if_index) else {
1493 continue;
1494 };
1495
1496 if let Some(ipv4) = last_ipv4 {
1497 debug!("leave multicast for {ipv4}");
1498 if let Some(sock) = self.ipv4_sock.as_mut() {
1499 if let Err(e) = sock.pktinfo.leave_multicast_v4(&GROUP_ADDR_V4, &ipv4) {
1500 debug!("leave multicast group for addr {ipv4}: {e}");
1501 }
1502 }
1503 }
1504
1505 if let Some(ipv6) = last_ipv6 {
1506 debug!("leave multicast for {ipv6}");
1507 if let Some(sock) = self.ipv6_sock.as_mut() {
1508 if let Err(e) = sock
1509 .pktinfo
1510 .leave_multicast_v6(&GROUP_ADDR_V6, my_intf.index)
1511 {
1512 debug!("leave multicast group for IPv6: {ipv6}: {e}");
1513 }
1514 }
1515 }
1516
1517 let intf_id = InterfaceId {
1519 name: my_intf.name.to_string(),
1520 index: my_intf.index,
1521 };
1522 let removed_instances = self.cache.remove_records_on_intf(intf_id);
1523 self.notify_service_removal(removed_instances);
1524 }
1525
1526 self.apply_intf_selections(my_ifaddrs);
1528 }
1529
1530 fn del_interface(&mut self, intf: &Interface) {
1531 let if_index = intf.index.unwrap_or(0);
1532 trace!(
1533 "del_interface: {} ({if_index}) addr {}",
1534 intf.name,
1535 intf.ip()
1536 );
1537
1538 let Some(my_intf) = self.my_intfs.get_mut(&if_index) else {
1539 debug!("del_interface: interface {} not found", intf.name);
1540 return;
1541 };
1542
1543 let mut ip_removed = false;
1544
1545 if my_intf.addrs.remove(&intf.addr) {
1546 ip_removed = true;
1547
1548 match intf.addr.ip() {
1549 IpAddr::V4(ipv4) => {
1550 if my_intf.next_ifaddr_v4().is_none() {
1551 if let Some(sock) = self.ipv4_sock.as_mut() {
1552 if let Err(e) = sock.pktinfo.leave_multicast_v4(&GROUP_ADDR_V4, &ipv4) {
1553 debug!("leave multicast group for addr {ipv4}: {e}");
1554 }
1555 }
1556 }
1557 }
1558
1559 IpAddr::V6(ipv6) => {
1560 if my_intf.next_ifaddr_v6().is_none() {
1561 if let Some(sock) = self.ipv6_sock.as_mut() {
1562 if let Err(e) =
1563 sock.pktinfo.leave_multicast_v6(&GROUP_ADDR_V6, if_index)
1564 {
1565 debug!("leave multicast group for addr {ipv6}: {e}");
1566 }
1567 }
1568 }
1569 }
1570 }
1571
1572 if my_intf.addrs.is_empty() {
1573 debug!("del_interface: removing interface {}", intf.name);
1575 self.my_intfs.remove(&if_index);
1576 self.dns_registry_map.remove(&if_index);
1577 self.cache.remove_addrs_on_disabled_intf(if_index);
1578 }
1579 }
1580
1581 if ip_removed {
1582 self.notify_monitors(DaemonEvent::IpDel(intf.ip()));
1584 self.del_addr_in_my_services(&intf.ip());
1586 }
1587 }
1588
1589 fn add_interface(&mut self, intf: Interface) {
1590 let sock_opt = if intf.ip().is_ipv4() {
1591 &self.ipv4_sock
1592 } else {
1593 &self.ipv6_sock
1594 };
1595
1596 let Some(sock) = sock_opt else {
1597 debug!(
1598 "add_interface: no socket available for interface {} with addr {}. Skipped.",
1599 intf.name,
1600 intf.ip()
1601 );
1602 return;
1603 };
1604
1605 let if_index = intf.index.unwrap_or(0);
1606 let mut new_addr = false;
1607
1608 match self.my_intfs.entry(if_index) {
1609 Entry::Occupied(mut entry) => {
1610 let my_intf = entry.get_mut();
1612 if !my_intf.addrs.contains(&intf.addr) {
1613 if let Err(e) = join_multicast_group(&sock.pktinfo, &intf) {
1614 debug!("add_interface: socket_config {}: {e}", &intf.name);
1615 }
1616 my_intf.addrs.insert(intf.addr.clone());
1617 new_addr = true;
1618 }
1619 }
1620 Entry::Vacant(entry) => {
1621 if let Err(e) = join_multicast_group(&sock.pktinfo, &intf) {
1622 debug!("add_interface: socket_config {}: {e}. Skipped.", &intf.name);
1623 return;
1624 }
1625
1626 new_addr = true;
1627 let new_intf = MyIntf {
1628 name: intf.name.clone(),
1629 index: if_index,
1630 addrs: HashSet::from([intf.addr.clone()]),
1631 };
1632 entry.insert(new_intf);
1633 }
1634 }
1635
1636 if !new_addr {
1637 trace!("add_interface: interface {} already exists", &intf.name);
1638 return;
1639 }
1640
1641 debug!("add new interface {}: {}", intf.name, intf.ip());
1642
1643 let Some(my_intf) = self.my_intfs.get(&if_index) else {
1644 debug!("add_interface: cannot find if_index {if_index}");
1645 return;
1646 };
1647
1648 let dns_registry = match self.dns_registry_map.get_mut(&if_index) {
1649 Some(registry) => registry,
1650 None => self
1651 .dns_registry_map
1652 .entry(if_index)
1653 .or_insert_with(DnsRegistry::new),
1654 };
1655
1656 for (_, service_info) in self.my_services.iter_mut() {
1657 if service_info.is_addr_auto() {
1658 let new_ip = intf.ip();
1659 service_info.insert_ipaddr(new_ip);
1660
1661 if announce_service_on_intf(dns_registry, service_info, my_intf, &sock.pktinfo) {
1662 debug!(
1663 "Announce service {} on {}",
1664 service_info.get_fullname(),
1665 intf.ip()
1666 );
1667 service_info.set_status(if_index, ServiceStatus::Announced);
1668 } else {
1669 for timer in dns_registry.new_timers.drain(..) {
1670 self.timers.push(Reverse(timer));
1671 }
1672 service_info.set_status(if_index, ServiceStatus::Probing);
1673 }
1674 }
1675 }
1676
1677 let mut browse_reruns = Vec::new();
1679 let mut i = 0;
1680 while i < self.retransmissions.len() {
1681 if matches!(self.retransmissions[i].command, Command::Browse(..)) {
1682 browse_reruns.push(self.retransmissions.remove(i));
1683 } else {
1684 i += 1;
1685 }
1686 }
1687
1688 for rerun in browse_reruns {
1689 self.exec_command(rerun.command, true);
1690 }
1691
1692 self.notify_monitors(DaemonEvent::IpAdd(intf.ip()));
1694 }
1695
1696 fn register_service(&mut self, mut info: ServiceInfo) {
1705 if let Err(e) = check_service_name_length(info.get_type(), self.service_name_len_max) {
1707 debug!("check_service_name_length: {}", &e);
1708 self.notify_monitors(DaemonEvent::Error(e));
1709 return;
1710 }
1711
1712 if info.is_addr_auto() {
1713 let selected_addrs = self.selected_addrs(my_ip_interfaces(true));
1714 for addr in selected_addrs {
1715 info.insert_ipaddr(addr);
1716 }
1717 }
1718
1719 debug!("register service {:?}", &info);
1720
1721 let outgoing_addrs = self.send_unsolicited_response(&mut info);
1722 if !outgoing_addrs.is_empty() {
1723 self.notify_monitors(DaemonEvent::Announce(
1724 info.get_fullname().to_string(),
1725 format!("{:?}", &outgoing_addrs),
1726 ));
1727 }
1728
1729 let service_fullname = info.get_fullname().to_lowercase();
1732 self.my_services.insert(service_fullname, info);
1733 }
1734
1735 fn send_unsolicited_response(&mut self, info: &mut ServiceInfo) -> Vec<IpAddr> {
1738 let mut outgoing_addrs = Vec::new();
1739 let mut outgoing_intfs = HashSet::new();
1740
1741 for (if_index, intf) in self.my_intfs.iter() {
1742 let dns_registry = match self.dns_registry_map.get_mut(if_index) {
1743 Some(registry) => registry,
1744 None => self
1745 .dns_registry_map
1746 .entry(*if_index)
1747 .or_insert_with(DnsRegistry::new),
1748 };
1749
1750 let mut announced = false;
1751
1752 if let Some(sock) = self.ipv4_sock.as_mut() {
1754 if announce_service_on_intf(dns_registry, info, intf, &sock.pktinfo) {
1755 for addr in intf.addrs.iter().filter(|a| a.ip().is_ipv4()) {
1756 outgoing_addrs.push(addr.ip());
1757 }
1758 outgoing_intfs.insert(intf.index);
1759
1760 debug!(
1761 "Announce service IPv4 {} on {}",
1762 info.get_fullname(),
1763 intf.name
1764 );
1765 announced = true;
1766 }
1767 }
1768
1769 if let Some(sock) = self.ipv6_sock.as_mut() {
1770 if announce_service_on_intf(dns_registry, info, intf, &sock.pktinfo) {
1771 for addr in intf.addrs.iter().filter(|a| a.ip().is_ipv6()) {
1772 outgoing_addrs.push(addr.ip());
1773 }
1774 outgoing_intfs.insert(intf.index);
1775
1776 debug!(
1777 "Announce service IPv6 {} on {}",
1778 info.get_fullname(),
1779 intf.name
1780 );
1781 announced = true;
1782 }
1783 }
1784
1785 if announced {
1786 info.set_status(intf.index, ServiceStatus::Announced);
1787 } else {
1788 for timer in dns_registry.new_timers.drain(..) {
1789 self.timers.push(Reverse(timer));
1790 }
1791 info.set_status(*if_index, ServiceStatus::Probing);
1792 }
1793 }
1794
1795 let next_time = current_time_millis() + 1000;
1799 for if_index in outgoing_intfs {
1800 self.add_retransmission(
1801 next_time,
1802 Command::RegisterResend(info.get_fullname().to_string(), if_index),
1803 );
1804 }
1805
1806 outgoing_addrs
1807 }
1808
1809 fn probing_handler(&mut self) {
1811 let now = current_time_millis();
1812
1813 for (if_index, intf) in self.my_intfs.iter() {
1814 let Some(dns_registry) = self.dns_registry_map.get_mut(if_index) else {
1815 continue;
1816 };
1817
1818 let (out, expired_probes) = check_probing(dns_registry, &mut self.timers, now);
1819
1820 if !out.questions().is_empty() {
1822 trace!("sending out probing of questions: {:?}", out.questions());
1823 if let Some(sock) = self.ipv4_sock.as_mut() {
1824 send_dns_outgoing(&out, intf, &sock.pktinfo);
1825 }
1826 if let Some(sock) = self.ipv6_sock.as_mut() {
1827 send_dns_outgoing(&out, intf, &sock.pktinfo);
1828 }
1829 }
1830
1831 let waiting_services =
1833 handle_expired_probes(expired_probes, &intf.name, dns_registry, &mut self.monitors);
1834
1835 for service_name in waiting_services {
1836 if let Some(info) = self.my_services.get_mut(&service_name.to_lowercase()) {
1838 if info.get_status(*if_index) == ServiceStatus::Announced {
1839 debug!("service {} already announced", info.get_fullname());
1840 continue;
1841 }
1842
1843 let announced_v4 = if let Some(sock) = self.ipv4_sock.as_mut() {
1844 announce_service_on_intf(dns_registry, info, intf, &sock.pktinfo)
1845 } else {
1846 false
1847 };
1848 let announced_v6 = if let Some(sock) = self.ipv6_sock.as_mut() {
1849 announce_service_on_intf(dns_registry, info, intf, &sock.pktinfo)
1850 } else {
1851 false
1852 };
1853
1854 if announced_v4 || announced_v6 {
1855 let next_time = now + 1000;
1856 let command =
1857 Command::RegisterResend(info.get_fullname().to_string(), *if_index);
1858 self.retransmissions.push(ReRun { next_time, command });
1859 self.timers.push(Reverse(next_time));
1860
1861 let fullname = match dns_registry.name_changes.get(&service_name) {
1862 Some(new_name) => new_name.to_string(),
1863 None => service_name.to_string(),
1864 };
1865
1866 let mut hostname = info.get_hostname();
1867 if let Some(new_name) = dns_registry.name_changes.get(hostname) {
1868 hostname = new_name;
1869 }
1870
1871 debug!("wake up: announce service {} on {}", fullname, intf.name);
1872 notify_monitors(
1873 &mut self.monitors,
1874 DaemonEvent::Announce(fullname, format!("{}:{}", hostname, &intf.name)),
1875 );
1876
1877 info.set_status(*if_index, ServiceStatus::Announced);
1878 }
1879 }
1880 }
1881 }
1882 }
1883
1884 fn unregister_service(
1885 &self,
1886 info: &ServiceInfo,
1887 intf: &MyIntf,
1888 sock: &PktInfoUdpSocket,
1889 ) -> Vec<u8> {
1890 let is_ipv4 = sock.domain() == Domain::IPV4;
1891
1892 let mut out = DnsOutgoing::new(FLAGS_QR_RESPONSE | FLAGS_AA);
1893 out.add_answer_at_time(
1894 DnsPointer::new(
1895 info.get_type(),
1896 RRType::PTR,
1897 CLASS_IN,
1898 0,
1899 info.get_fullname().to_string(),
1900 ),
1901 0,
1902 );
1903
1904 if let Some(sub) = info.get_subtype() {
1905 trace!("Adding subdomain {}", sub);
1906 out.add_answer_at_time(
1907 DnsPointer::new(
1908 sub,
1909 RRType::PTR,
1910 CLASS_IN,
1911 0,
1912 info.get_fullname().to_string(),
1913 ),
1914 0,
1915 );
1916 }
1917
1918 out.add_answer_at_time(
1919 DnsSrv::new(
1920 info.get_fullname(),
1921 CLASS_IN | CLASS_CACHE_FLUSH,
1922 0,
1923 info.get_priority(),
1924 info.get_weight(),
1925 info.get_port(),
1926 info.get_hostname().to_string(),
1927 ),
1928 0,
1929 );
1930 out.add_answer_at_time(
1931 DnsTxt::new(
1932 info.get_fullname(),
1933 CLASS_IN | CLASS_CACHE_FLUSH,
1934 0,
1935 info.generate_txt(),
1936 ),
1937 0,
1938 );
1939
1940 let if_addrs = if is_ipv4 {
1941 info.get_addrs_on_my_intf_v4(intf)
1942 } else {
1943 info.get_addrs_on_my_intf_v6(intf)
1944 };
1945
1946 if if_addrs.is_empty() {
1947 return vec![];
1948 }
1949
1950 for address in if_addrs {
1951 out.add_answer_at_time(
1952 DnsAddress::new(
1953 info.get_hostname(),
1954 ip_address_rr_type(&address),
1955 CLASS_IN | CLASS_CACHE_FLUSH,
1956 0,
1957 address,
1958 intf.into(),
1959 ),
1960 0,
1961 );
1962 }
1963
1964 send_dns_outgoing(&out, intf, sock).remove(0)
1966 }
1967
1968 fn add_hostname_resolver(
1972 &mut self,
1973 hostname: String,
1974 listener: Sender<HostnameResolutionEvent>,
1975 timeout: Option<u64>,
1976 ) {
1977 let real_timeout = timeout.map(|t| current_time_millis() + t);
1978 self.hostname_resolvers
1979 .insert(hostname.to_lowercase(), (listener, real_timeout));
1980 if let Some(t) = real_timeout {
1981 self.add_timer(t);
1982 }
1983 }
1984
1985 fn send_query(&self, name: &str, qtype: RRType) {
1987 self.send_query_vec(&[(name, qtype)]);
1988 }
1989
1990 fn send_query_vec(&self, questions: &[(&str, RRType)]) {
1992 trace!("Sending query questions: {:?}", questions);
1993 let mut out = DnsOutgoing::new(FLAGS_QR_QUERY);
1994 let now = current_time_millis();
1995
1996 for (name, qtype) in questions {
1997 out.add_question(name, *qtype);
1998
1999 for record in self.cache.get_known_answers(name, *qtype, now) {
2000 trace!("add known answer: {:?}", record.record);
2008 let mut new_record = record.record.clone();
2009 new_record.get_record_mut().update_ttl(now);
2010 out.add_answer_box(new_record);
2011 }
2012 }
2013
2014 for (_, intf) in self.my_intfs.iter() {
2015 if let Some(sock) = self.ipv4_sock.as_ref() {
2016 send_dns_outgoing(&out, intf, &sock.pktinfo);
2017 }
2018 if let Some(sock) = self.ipv6_sock.as_ref() {
2019 send_dns_outgoing(&out, intf, &sock.pktinfo);
2020 }
2021 }
2022 }
2023
2024 fn handle_read(&mut self, event_key: usize) -> bool {
2029 let sock_opt = match event_key {
2030 IPV4_SOCK_EVENT_KEY => &mut self.ipv4_sock,
2031 IPV6_SOCK_EVENT_KEY => &mut self.ipv6_sock,
2032 _ => {
2033 debug!("handle_read: unknown token {}", event_key);
2034 return false;
2035 }
2036 };
2037 let Some(sock) = sock_opt.as_mut() else {
2038 debug!("handle_read: socket not available for token {}", event_key);
2039 return false;
2040 };
2041 let mut buf = vec![0u8; MAX_MSG_ABSOLUTE];
2042
2043 let (sz, pktinfo) = match sock.pktinfo.recv(&mut buf) {
2050 Ok(sz) => sz,
2051 Err(e) => {
2052 if e.kind() != std::io::ErrorKind::WouldBlock {
2053 debug!("listening socket read failed: {}", e);
2054 }
2055 return false;
2056 }
2057 };
2058
2059 let pkt_if_index = pktinfo.if_index as u32;
2061 let Some(my_intf) = self.my_intfs.get(&pkt_if_index) else {
2062 debug!(
2063 "handle_read: no interface found for pktinfo if_index: {}",
2064 pktinfo.if_index
2065 );
2066 return true; };
2068
2069 buf.truncate(sz); match DnsIncoming::new(buf, my_intf.into()) {
2072 Ok(msg) => {
2073 if msg.is_query() {
2074 self.handle_query(msg, pkt_if_index, event_key == IPV4_SOCK_EVENT_KEY);
2075 } else if msg.is_response() {
2076 self.handle_response(msg, pkt_if_index);
2077 } else {
2078 debug!("Invalid message: not query and not response");
2079 }
2080 }
2081 Err(e) => debug!("Invalid incoming DNS message: {}", e),
2082 }
2083
2084 true
2085 }
2086
2087 fn query_unresolved(&mut self, instance: &str) -> bool {
2089 if !valid_instance_name(instance) {
2090 trace!("instance name {} not valid", instance);
2091 return false;
2092 }
2093
2094 if let Some(records) = self.cache.get_srv(instance) {
2095 for record in records {
2096 if let Some(srv) = record.record.any().downcast_ref::<DnsSrv>() {
2097 if self.cache.get_addr(srv.host()).is_none() {
2098 self.send_query_vec(&[(srv.host(), RRType::A), (srv.host(), RRType::AAAA)]);
2099 return true;
2100 }
2101 }
2102 }
2103 } else {
2104 self.send_query(instance, RRType::ANY);
2105 return true;
2106 }
2107
2108 false
2109 }
2110
2111 fn query_cache_for_service(
2114 &mut self,
2115 ty_domain: &str,
2116 sender: &Sender<ServiceEvent>,
2117 now: u64,
2118 ) {
2119 let mut resolved: HashSet<String> = HashSet::new();
2120 let mut unresolved: HashSet<String> = HashSet::new();
2121
2122 if let Some(records) = self.cache.get_ptr(ty_domain) {
2123 for record in records.iter().filter(|r| !r.record.expires_soon(now)) {
2124 if let Some(ptr) = record.record.any().downcast_ref::<DnsPointer>() {
2125 let mut new_event = None;
2126 match self.resolve_service_from_cache(ty_domain, ptr.alias()) {
2127 Ok(resolved_service) => {
2128 if resolved_service.is_valid() {
2129 debug!("Resolved service from cache: {}", ptr.alias());
2130 new_event =
2131 Some(ServiceEvent::ServiceResolved(Box::new(resolved_service)));
2132 } else {
2133 debug!("Resolved service is not valid: {}", ptr.alias());
2134 }
2135 }
2136 Err(err) => {
2137 debug!("Error while resolving service from cache: {}", err);
2138 continue;
2139 }
2140 }
2141
2142 match sender.send(ServiceEvent::ServiceFound(
2143 ty_domain.to_string(),
2144 ptr.alias().to_string(),
2145 )) {
2146 Ok(()) => debug!("sent service found {}", ptr.alias()),
2147 Err(e) => {
2148 debug!("failed to send service found: {}", e);
2149 continue;
2150 }
2151 }
2152
2153 if let Some(event) = new_event {
2154 resolved.insert(ptr.alias().to_string());
2155 match sender.send(event) {
2156 Ok(()) => debug!("sent service resolved: {}", ptr.alias()),
2157 Err(e) => debug!("failed to send service resolved: {}", e),
2158 }
2159 } else {
2160 unresolved.insert(ptr.alias().to_string());
2161 }
2162 }
2163 }
2164 }
2165
2166 for instance in resolved.drain() {
2167 self.pending_resolves.remove(&instance);
2168 self.resolved.insert(instance);
2169 }
2170
2171 for instance in unresolved.drain() {
2172 self.add_pending_resolve(instance);
2173 }
2174 }
2175
2176 fn query_cache_for_hostname(
2179 &mut self,
2180 hostname: &str,
2181 sender: Sender<HostnameResolutionEvent>,
2182 ) {
2183 let addresses_map = self.cache.get_addresses_for_host(hostname);
2184 for (name, addresses) in addresses_map {
2185 match sender.send(HostnameResolutionEvent::AddressesFound(name, addresses)) {
2186 Ok(()) => trace!("sent hostname addresses found"),
2187 Err(e) => debug!("failed to send hostname addresses found: {}", e),
2188 }
2189 }
2190 }
2191
2192 fn add_pending_resolve(&mut self, instance: String) {
2193 if !self.pending_resolves.contains(&instance) {
2194 let next_time = current_time_millis() + RESOLVE_WAIT_IN_MILLIS;
2195 self.add_retransmission(next_time, Command::Resolve(instance.clone(), 1));
2196 self.pending_resolves.insert(instance);
2197 }
2198 }
2199
2200 fn resolve_service_from_cache(
2202 &self,
2203 ty_domain: &str,
2204 fullname: &str,
2205 ) -> Result<ResolvedService> {
2206 let now = current_time_millis();
2207 let mut resolved_service = ResolvedService {
2208 ty_domain: ty_domain.to_string(),
2209 sub_ty_domain: None,
2210 fullname: fullname.to_string(),
2211 host: String::new(),
2212 port: 0,
2213 addresses: HashSet::new(),
2214 txt_properties: TxtProperties::new(),
2215 };
2216
2217 if let Some(subtype) = self.cache.get_subtype(fullname) {
2219 trace!(
2220 "ty_domain: {} found subtype {} for instance: {}",
2221 ty_domain,
2222 subtype,
2223 fullname
2224 );
2225 if resolved_service.sub_ty_domain.is_none() {
2226 resolved_service.sub_ty_domain = Some(subtype.to_string());
2227 }
2228 }
2229
2230 if let Some(records) = self.cache.get_srv(fullname) {
2232 if let Some(answer) = records.iter().find(|r| !r.record.expires_soon(now)) {
2233 if let Some(dns_srv) = answer.record.any().downcast_ref::<DnsSrv>() {
2234 resolved_service.host = dns_srv.host().to_string();
2235 resolved_service.port = dns_srv.port();
2236 }
2237 }
2238 }
2239
2240 if let Some(records) = self.cache.get_txt(fullname) {
2242 if let Some(record) = records.iter().find(|r| !r.record.expires_soon(now)) {
2243 if let Some(dns_txt) = record.record.any().downcast_ref::<DnsTxt>() {
2244 resolved_service.txt_properties = dns_txt.text().into();
2245 }
2246 }
2247 }
2248
2249 if let Some(records) = self.cache.get_addr(&resolved_service.host) {
2251 for answer in records.iter() {
2252 if let Some(dns_a) = answer.record.any().downcast_ref::<DnsAddress>() {
2253 if dns_a.expires_soon(now) {
2254 trace!(
2255 "Addr expired or expires soon: {}",
2256 dns_a.address().to_ip_addr()
2257 );
2258 } else {
2259 resolved_service.addresses.insert(dns_a.address());
2260 }
2261 }
2262 }
2263 }
2264
2265 Ok(resolved_service)
2266 }
2267
2268 fn handle_poller_events(&mut self, events: &mio::Events) {
2269 for ev in events.iter() {
2270 trace!("event received with key {:?}", ev.token());
2271 if ev.token().0 == SIGNAL_SOCK_EVENT_KEY {
2272 self.signal_sock_drain();
2274
2275 if let Err(e) = self.poller.registry().reregister(
2276 &mut self.signal_sock,
2277 ev.token(),
2278 mio::Interest::READABLE,
2279 ) {
2280 debug!("failed to modify poller for signal socket: {}", e);
2281 }
2282 continue; }
2284
2285 while self.handle_read(ev.token().0) {}
2287
2288 if ev.token().0 == IPV4_SOCK_EVENT_KEY {
2290 if let Some(sock) = self.ipv4_sock.as_mut() {
2292 if let Err(e) =
2293 self.poller
2294 .registry()
2295 .reregister(sock, ev.token(), mio::Interest::READABLE)
2296 {
2297 debug!("modify poller for IPv4 socket: {}", e);
2298 }
2299 }
2300 } else if ev.token().0 == IPV6_SOCK_EVENT_KEY {
2301 if let Some(sock) = self.ipv6_sock.as_mut() {
2303 if let Err(e) =
2304 self.poller
2305 .registry()
2306 .reregister(sock, ev.token(), mio::Interest::READABLE)
2307 {
2308 debug!("modify poller for IPv6 socket: {}", e);
2309 }
2310 }
2311 }
2312 }
2313 }
2314
2315 fn handle_response(&mut self, mut msg: DnsIncoming, if_index: u32) {
2318 let now = current_time_millis();
2319
2320 let mut record_predicate = |record: &DnsRecordBox| {
2322 if !record.get_record().is_expired(now) {
2323 return true;
2324 }
2325
2326 debug!("record is expired, removing it from cache.");
2327 if self.cache.remove(record) {
2328 if let Some(dns_ptr) = record.any().downcast_ref::<DnsPointer>() {
2330 call_service_listener(
2331 &self.service_queriers,
2332 dns_ptr.get_name(),
2333 ServiceEvent::ServiceRemoved(
2334 dns_ptr.get_name().to_string(),
2335 dns_ptr.alias().to_string(),
2336 ),
2337 );
2338 }
2339 }
2340 false
2341 };
2342 msg.answers_mut().retain(&mut record_predicate);
2343 msg.authorities_mut().retain(&mut record_predicate);
2344 msg.additionals_mut().retain(&mut record_predicate);
2345
2346 self.conflict_handler(&msg, if_index);
2348
2349 let mut is_for_us = true; for answer in msg.answers() {
2356 if answer.get_type() == RRType::PTR {
2357 if self.service_queriers.contains_key(answer.get_name()) {
2358 is_for_us = true;
2359 break; } else {
2361 is_for_us = false;
2362 }
2363 } else if answer.get_type() == RRType::A || answer.get_type() == RRType::AAAA {
2364 let answer_lowercase = answer.get_name().to_lowercase();
2366 if self.hostname_resolvers.contains_key(&answer_lowercase) {
2367 is_for_us = true;
2368 break; }
2370 }
2371 }
2372
2373 if self.accept_unsolicited {
2375 is_for_us = true;
2376 }
2377
2378 struct InstanceChange {
2380 ty: RRType, name: String, }
2383
2384 let mut changes = Vec::new();
2392 let mut timers = Vec::new();
2393 let Some(my_intf) = self.my_intfs.get(&if_index) else {
2394 return;
2395 };
2396 for record in msg.all_records() {
2397 match self
2398 .cache
2399 .add_or_update(my_intf, record, &mut timers, is_for_us)
2400 {
2401 Some((dns_record, true)) => {
2402 timers.push(dns_record.record.get_record().get_expire_time());
2403 timers.push(dns_record.record.get_record().get_refresh_time());
2404
2405 let ty = dns_record.record.get_type();
2406 let name = dns_record.record.get_name();
2407
2408 if ty == RRType::PTR && dns_record.record.get_record().get_ttl() > 1 {
2410 if self.service_queriers.contains_key(name) {
2411 timers.push(dns_record.record.get_record().get_refresh_time());
2412 }
2413
2414 if let Some(dns_ptr) = dns_record.record.any().downcast_ref::<DnsPointer>()
2416 {
2417 debug!("calling listener with service found: {name}");
2418 call_service_listener(
2419 &self.service_queriers,
2420 name,
2421 ServiceEvent::ServiceFound(
2422 name.to_string(),
2423 dns_ptr.alias().to_string(),
2424 ),
2425 );
2426 changes.push(InstanceChange {
2427 ty,
2428 name: dns_ptr.alias().to_string(),
2429 });
2430 }
2431 } else {
2432 changes.push(InstanceChange {
2433 ty,
2434 name: name.to_string(),
2435 });
2436 }
2437 }
2438 Some((dns_record, false)) => {
2439 timers.push(dns_record.record.get_record().get_expire_time());
2440 timers.push(dns_record.record.get_record().get_refresh_time());
2441 }
2442 _ => {}
2443 }
2444 }
2445
2446 for t in timers {
2448 self.add_timer(t);
2449 }
2450
2451 for change in changes
2453 .iter()
2454 .filter(|change| change.ty == RRType::A || change.ty == RRType::AAAA)
2455 {
2456 let addr_map = self.cache.get_addresses_for_host(&change.name);
2457 for (name, addresses) in addr_map {
2458 call_hostname_resolution_listener(
2459 &self.hostname_resolvers,
2460 &change.name,
2461 HostnameResolutionEvent::AddressesFound(name, addresses),
2462 )
2463 }
2464 }
2465
2466 let mut updated_instances = HashSet::new();
2468 for update in changes {
2469 match update.ty {
2470 RRType::PTR | RRType::SRV | RRType::TXT => {
2471 updated_instances.insert(update.name);
2472 }
2473 RRType::A | RRType::AAAA => {
2474 let instances = self.cache.get_instances_on_host(&update.name);
2475 updated_instances.extend(instances);
2476 }
2477 _ => {}
2478 }
2479 }
2480
2481 self.resolve_updated_instances(&updated_instances);
2482 }
2483
2484 fn conflict_handler(&mut self, msg: &DnsIncoming, if_index: u32) {
2485 let Some(my_intf) = self.my_intfs.get(&if_index) else {
2486 debug!("handle_response: no intf found for index {if_index}");
2487 return;
2488 };
2489
2490 let Some(dns_registry) = self.dns_registry_map.get_mut(&if_index) else {
2491 return;
2492 };
2493
2494 for answer in msg.answers().iter() {
2495 let mut new_records = Vec::new();
2496
2497 let name = answer.get_name();
2498 let Some(probe) = dns_registry.probing.get_mut(name) else {
2499 continue;
2500 };
2501
2502 if answer.get_type() == RRType::A || answer.get_type() == RRType::AAAA {
2504 if let Some(answer_addr) = answer.any().downcast_ref::<DnsAddress>() {
2505 if answer_addr.interface_id.index != if_index {
2506 debug!(
2507 "conflict handler: answer addr {:?} not in the subnet of intf {}",
2508 answer_addr, my_intf.name
2509 );
2510 continue;
2511 }
2512 }
2513
2514 let any_match = probe.records.iter().any(|r| {
2517 r.get_type() == answer.get_type()
2518 && r.get_class() == answer.get_class()
2519 && r.rrdata_match(answer.as_ref())
2520 });
2521 if any_match {
2522 continue; }
2524 }
2525
2526 probe.records.retain(|record| {
2527 if record.get_type() == answer.get_type()
2528 && record.get_class() == answer.get_class()
2529 && !record.rrdata_match(answer.as_ref())
2530 {
2531 debug!(
2532 "found conflict name: '{name}' record: {}: {} PEER: {}",
2533 record.get_type(),
2534 record.rdata_print(),
2535 answer.rdata_print()
2536 );
2537
2538 let mut new_record = record.clone();
2541 let new_name = match record.get_type() {
2542 RRType::A => hostname_change(name),
2543 RRType::AAAA => hostname_change(name),
2544 _ => name_change(name),
2545 };
2546 new_record.get_record_mut().set_new_name(new_name);
2547 new_records.push(new_record);
2548 return false; }
2550
2551 true
2552 });
2553
2554 let create_time = current_time_millis() + fastrand::u64(0..250);
2561
2562 let waiting_services = probe.waiting_services.clone();
2563
2564 for record in new_records {
2565 if dns_registry.update_hostname(name, record.get_name(), create_time) {
2566 self.timers.push(Reverse(create_time));
2567 }
2568
2569 dns_registry.name_changes.insert(
2571 record.get_record().get_original_name().to_string(),
2572 record.get_name().to_string(),
2573 );
2574
2575 let new_probe = match dns_registry.probing.get_mut(record.get_name()) {
2576 Some(p) => p,
2577 None => {
2578 let new_probe = dns_registry
2579 .probing
2580 .entry(record.get_name().to_string())
2581 .or_insert_with(|| {
2582 debug!("conflict handler: new probe of {}", record.get_name());
2583 Probe::new(create_time)
2584 });
2585 self.timers.push(Reverse(new_probe.next_send));
2586 new_probe
2587 }
2588 };
2589
2590 debug!(
2591 "insert record with new name '{}' {} into probe",
2592 record.get_name(),
2593 record.get_type()
2594 );
2595 new_probe.insert_record(record);
2596
2597 new_probe.waiting_services.extend(waiting_services.clone());
2598 }
2599 }
2600 }
2601
2602 fn resolve_updated_instances(&mut self, updated_instances: &HashSet<String>) {
2609 let mut resolved: HashSet<String> = HashSet::new();
2610 let mut unresolved: HashSet<String> = HashSet::new();
2611 let mut removed_instances = HashMap::new();
2612
2613 let now = current_time_millis();
2614
2615 for (ty_domain, records) in self.cache.all_ptr().iter() {
2616 if !self.service_queriers.contains_key(ty_domain) {
2617 continue;
2619 }
2620
2621 for ptr in records.iter().filter(|r| !r.record.expires_soon(now)) {
2622 let Some(dns_ptr) = ptr.record.any().downcast_ref::<DnsPointer>() else {
2623 continue;
2624 };
2625
2626 let instance = dns_ptr.alias();
2627 if !updated_instances.contains(instance) {
2628 continue;
2629 }
2630
2631 let Ok(resolved_service) = self.resolve_service_from_cache(ty_domain, instance)
2632 else {
2633 continue;
2634 };
2635
2636 debug!("resolve_updated_instances: from cache: {instance}");
2637 if resolved_service.is_valid() {
2638 debug!("call queriers to resolve {instance}");
2639 resolved.insert(instance.to_string());
2640 let event = ServiceEvent::ServiceResolved(Box::new(resolved_service));
2641 call_service_listener(&self.service_queriers, ty_domain, event);
2642 } else {
2643 debug!("Resolved service is not valid: {instance}");
2644 if self.resolved.remove(dns_ptr.alias()) {
2645 removed_instances
2646 .entry(ty_domain.to_string())
2647 .or_insert_with(HashSet::new)
2648 .insert(instance.to_string());
2649 }
2650 unresolved.insert(instance.to_string());
2651 }
2652 }
2653 }
2654
2655 for instance in resolved.drain() {
2656 self.pending_resolves.remove(&instance);
2657 self.resolved.insert(instance);
2658 }
2659
2660 for instance in unresolved.drain() {
2661 self.add_pending_resolve(instance);
2662 }
2663
2664 if !removed_instances.is_empty() {
2665 debug!(
2666 "resolve_updated_instances: removed {}",
2667 &removed_instances.len()
2668 );
2669 self.notify_service_removal(removed_instances);
2670 }
2671 }
2672
2673 fn handle_query(&mut self, msg: DnsIncoming, if_index: u32, is_ipv4: bool) {
2675 let sock_opt = if is_ipv4 {
2676 &self.ipv4_sock
2677 } else {
2678 &self.ipv6_sock
2679 };
2680 let Some(sock) = sock_opt.as_ref() else {
2681 debug!("handle_query: socket not available for intf {}", if_index);
2682 return;
2683 };
2684 let mut out = DnsOutgoing::new(FLAGS_QR_RESPONSE | FLAGS_AA);
2685
2686 const META_QUERY: &str = "_services._dns-sd._udp.local.";
2689
2690 let Some(dns_registry) = self.dns_registry_map.get_mut(&if_index) else {
2691 debug!("missing dns registry for intf {}", if_index);
2692 return;
2693 };
2694
2695 let Some(intf) = self.my_intfs.get(&if_index) else {
2696 return;
2697 };
2698
2699 for question in msg.questions().iter() {
2700 let qtype = question.entry_type();
2701
2702 if qtype == RRType::PTR {
2703 for service in self.my_services.values() {
2704 if service.get_status(if_index) != ServiceStatus::Announced {
2705 continue;
2706 }
2707
2708 if question.entry_name() == service.get_type()
2709 || service
2710 .get_subtype()
2711 .as_ref()
2712 .is_some_and(|v| v == question.entry_name())
2713 {
2714 add_answer_with_additionals(
2715 &mut out,
2716 &msg,
2717 service,
2718 intf,
2719 dns_registry,
2720 is_ipv4,
2721 );
2722 } else if question.entry_name() == META_QUERY {
2723 let ptr_added = out.add_answer(
2724 &msg,
2725 DnsPointer::new(
2726 question.entry_name(),
2727 RRType::PTR,
2728 CLASS_IN,
2729 service.get_other_ttl(),
2730 service.get_type().to_string(),
2731 ),
2732 );
2733 if !ptr_added {
2734 trace!("answer was not added for meta-query {:?}", &question);
2735 }
2736 }
2737 }
2738 } else {
2739 if qtype == RRType::ANY && msg.num_authorities() > 0 {
2741 let probe_name = question.entry_name();
2742
2743 if let Some(probe) = dns_registry.probing.get_mut(probe_name) {
2744 let now = current_time_millis();
2745
2746 if probe.start_time < now {
2750 let incoming_records: Vec<_> = msg
2751 .authorities()
2752 .iter()
2753 .filter(|r| r.get_name() == probe_name)
2754 .collect();
2755
2756 probe.tiebreaking(&incoming_records, now, probe_name);
2757 }
2758 }
2759 }
2760
2761 if qtype == RRType::A || qtype == RRType::AAAA || qtype == RRType::ANY {
2762 for service in self.my_services.values() {
2763 if service.get_status(if_index) != ServiceStatus::Announced {
2764 continue;
2765 }
2766
2767 let service_hostname =
2768 match dns_registry.name_changes.get(service.get_hostname()) {
2769 Some(new_name) => new_name,
2770 None => service.get_hostname(),
2771 };
2772
2773 if service_hostname.to_lowercase() == question.entry_name().to_lowercase() {
2774 let intf_addrs = if is_ipv4 {
2775 service.get_addrs_on_my_intf_v4(intf)
2776 } else {
2777 service.get_addrs_on_my_intf_v6(intf)
2778 };
2779 if intf_addrs.is_empty()
2780 && (qtype == RRType::A || qtype == RRType::AAAA)
2781 {
2782 let t = match qtype {
2783 RRType::A => "TYPE_A",
2784 RRType::AAAA => "TYPE_AAAA",
2785 _ => "invalid_type",
2786 };
2787 trace!(
2788 "Cannot find valid addrs for {} response on intf {:?}",
2789 t,
2790 &intf
2791 );
2792 return;
2793 }
2794 for address in intf_addrs {
2795 out.add_answer(
2796 &msg,
2797 DnsAddress::new(
2798 service_hostname,
2799 ip_address_rr_type(&address),
2800 CLASS_IN | CLASS_CACHE_FLUSH,
2801 service.get_host_ttl(),
2802 address,
2803 intf.into(),
2804 ),
2805 );
2806 }
2807 }
2808 }
2809 }
2810
2811 let query_name = question.entry_name().to_lowercase();
2812 let service_opt = self
2813 .my_services
2814 .iter()
2815 .find(|(k, _v)| {
2816 let service_name = match dns_registry.name_changes.get(k.as_str()) {
2817 Some(new_name) => new_name,
2818 None => k,
2819 };
2820 service_name == &query_name
2821 })
2822 .map(|(_, v)| v);
2823
2824 let Some(service) = service_opt else {
2825 continue;
2826 };
2827
2828 if service.get_status(if_index) != ServiceStatus::Announced {
2829 continue;
2830 }
2831
2832 let intf_addrs = if is_ipv4 {
2833 service.get_addrs_on_my_intf_v4(intf)
2834 } else {
2835 service.get_addrs_on_my_intf_v6(intf)
2836 };
2837 if intf_addrs.is_empty() {
2838 debug!(
2839 "Cannot find valid addrs for TYPE_SRV response on intf {:?}",
2840 &intf
2841 );
2842 continue;
2843 }
2844
2845 add_answer_of_service(
2846 &mut out,
2847 &msg,
2848 question.entry_name(),
2849 service,
2850 qtype,
2851 intf_addrs,
2852 );
2853 }
2854 }
2855
2856 if !out.answers_count() > 0 {
2857 out.set_id(msg.id());
2858 send_dns_outgoing(&out, intf, &sock.pktinfo);
2859
2860 let if_name = intf.name.clone();
2861
2862 self.increase_counter(Counter::Respond, 1);
2863 self.notify_monitors(DaemonEvent::Respond(if_name));
2864 }
2865
2866 self.increase_counter(Counter::KnownAnswerSuppression, out.known_answer_count());
2867 }
2868
2869 fn increase_counter(&mut self, counter: Counter, count: i64) {
2871 let key = counter.to_string();
2872 match self.counters.get_mut(&key) {
2873 Some(v) => *v += count,
2874 None => {
2875 self.counters.insert(key, count);
2876 }
2877 }
2878 }
2879
2880 fn set_counter(&mut self, counter: Counter, count: i64) {
2882 let key = counter.to_string();
2883 self.counters.insert(key, count);
2884 }
2885
2886 fn signal_sock_drain(&self) {
2887 let mut signal_buf = [0; 1024];
2888
2889 while let Ok(sz) = self.signal_sock.recv(&mut signal_buf) {
2891 trace!(
2892 "signal socket recvd: {}",
2893 String::from_utf8_lossy(&signal_buf[0..sz])
2894 );
2895 }
2896 }
2897
2898 fn add_retransmission(&mut self, next_time: u64, command: Command) {
2899 self.retransmissions.push(ReRun { next_time, command });
2900 self.add_timer(next_time);
2901 }
2902
2903 fn notify_service_removal(&self, expired: HashMap<String, HashSet<String>>) {
2906 for (ty_domain, sender) in self.service_queriers.iter() {
2907 if let Some(instances) = expired.get(ty_domain) {
2908 for instance_name in instances {
2909 let event = ServiceEvent::ServiceRemoved(
2910 ty_domain.to_string(),
2911 instance_name.to_string(),
2912 );
2913 match sender.send(event) {
2914 Ok(()) => debug!("notify_service_removal: sent ServiceRemoved to listener of {ty_domain}: {instance_name}"),
2915 Err(e) => debug!("Failed to send event: {}", e),
2916 }
2917 }
2918 }
2919 }
2920 }
2921
    /// Dispatches a single `Command` to its handler.
    ///
    /// `repeating` is forwarded to the handlers that accept it (browse,
    /// resolve-hostname, unregister and verify).
    /// NOTE(review): it presumably marks a command re-run from the
    /// retransmission queue rather than a fresh client request -- confirm
    /// against the caller.
    ///
    /// Variants without a dedicated arm (e.g. `Command::Exit`) fall through to
    /// the catch-all arm and are only logged.
    fn exec_command(&mut self, command: Command, repeating: bool) {
        trace!("exec_command: {:?} repeating: {}", &command, repeating);
        match command {
            Command::Browse(ty, next_delay, cache_only, listener) => {
                self.exec_command_browse(repeating, ty, next_delay, cache_only, listener);
            }

            Command::ResolveHostname(hostname, next_delay, listener, timeout) => {
                self.exec_command_resolve_hostname(
                    repeating, hostname, next_delay, listener, timeout,
                );
            }

            Command::Register(service_info) => {
                self.register_service(service_info);
                self.increase_counter(Counter::Register, 1);
            }

            Command::RegisterResend(fullname, intf) => {
                trace!("register-resend service: {fullname} on {}", &intf);
                self.exec_command_register_resend(fullname, intf);
            }

            Command::Unregister(fullname, resp_s) => {
                trace!("unregister service {} repeat {}", &fullname, &repeating);
                self.exec_command_unregister(repeating, fullname, resp_s);
            }

            Command::UnregisterResend(packet, if_index, is_ipv4) => {
                self.exec_command_unregister_resend(packet, if_index, is_ipv4);
            }

            Command::StopBrowse(ty_domain) => self.exec_command_stop_browse(ty_domain),

            // The hostname is lower-cased before the resolver lookup.
            Command::StopResolveHostname(hostname) => {
                self.exec_command_stop_resolve_hostname(hostname.to_lowercase())
            }

            Command::Resolve(instance, try_count) => self.exec_command_resolve(instance, try_count),

            Command::GetMetrics(resp_s) => self.exec_command_get_metrics(resp_s),

            // Replies with a clone of the current daemon status.
            Command::GetStatus(resp_s) => match resp_s.send(self.status.clone()) {
                Ok(()) => trace!("Sent status to the client"),
                Err(e) => debug!("Failed to send status: {}", e),
            },

            // Registers a new monitor channel for daemon events.
            Command::Monitor(resp_s) => {
                self.monitors.push(resp_s);
            }

            Command::SetOption(daemon_opt) => {
                self.process_set_option(daemon_opt);
            }

            // Replies with a snapshot of the current option values.
            Command::GetOption(resp_s) => {
                let val = DaemonOptionVal {
                    _service_name_len_max: self.service_name_len_max,
                    ip_check_interval: self.ip_check_interval,
                };
                if let Err(e) = resp_s.send(val) {
                    debug!("Failed to send options: {}", e);
                }
            }

            Command::Verify(instance_fullname, timeout) => {
                self.exec_command_verify(instance_fullname, timeout, repeating);
            }

            // Nothing was moved out of `command` on this path, so it can
            // still be borrowed for logging.
            _ => {
                debug!("unexpected command: {:?}", &command);
            }
        }
    }
2999
3000 fn exec_command_get_metrics(&mut self, resp_s: Sender<HashMap<String, i64>>) {
3001 self.set_counter(Counter::CachedPTR, self.cache.ptr_count() as i64);
3002 self.set_counter(Counter::CachedSRV, self.cache.srv_count() as i64);
3003 self.set_counter(Counter::CachedAddr, self.cache.addr_count() as i64);
3004 self.set_counter(Counter::CachedTxt, self.cache.txt_count() as i64);
3005 self.set_counter(Counter::CachedNSec, self.cache.nsec_count() as i64);
3006 self.set_counter(Counter::CachedSubtype, self.cache.subtype_count() as i64);
3007 self.set_counter(Counter::Timer, self.timers.len() as i64);
3008
3009 let dns_registry_probe_count: usize = self
3010 .dns_registry_map
3011 .values()
3012 .map(|r| r.probing.len())
3013 .sum();
3014 self.set_counter(Counter::DnsRegistryProbe, dns_registry_probe_count as i64);
3015
3016 let dns_registry_active_count: usize = self
3017 .dns_registry_map
3018 .values()
3019 .map(|r| r.active.values().map(|a| a.len()).sum::<usize>())
3020 .sum();
3021 self.set_counter(Counter::DnsRegistryActive, dns_registry_active_count as i64);
3022
3023 let dns_registry_timer_count: usize = self
3024 .dns_registry_map
3025 .values()
3026 .map(|r| r.new_timers.len())
3027 .sum();
3028 self.set_counter(Counter::DnsRegistryTimer, dns_registry_timer_count as i64);
3029
3030 let dns_registry_name_change_count: usize = self
3031 .dns_registry_map
3032 .values()
3033 .map(|r| r.name_changes.len())
3034 .sum();
3035 self.set_counter(
3036 Counter::DnsRegistryNameChange,
3037 dns_registry_name_change_count as i64,
3038 );
3039
3040 if let Err(e) = resp_s.send(self.counters.clone()) {
3042 debug!("Failed to send metrics: {}", e);
3043 }
3044 }
3045
3046 fn exec_command_browse(
3047 &mut self,
3048 repeating: bool,
3049 ty: String,
3050 next_delay: u32,
3051 cache_only: bool,
3052 listener: Sender<ServiceEvent>,
3053 ) {
3054 let pretty_addrs: Vec<String> = self
3055 .my_intfs
3056 .iter()
3057 .map(|(if_index, itf)| format!("{} ({if_index})", itf.name))
3058 .collect();
3059
3060 if let Err(e) = listener.send(ServiceEvent::SearchStarted(format!(
3061 "{ty} on {} interfaces [{}]",
3062 pretty_addrs.len(),
3063 pretty_addrs.join(", ")
3064 ))) {
3065 debug!(
3066 "Failed to send SearchStarted({})(repeating:{}): {}",
3067 &ty, repeating, e
3068 );
3069 return;
3070 }
3071
3072 let now = current_time_millis();
3073 if !repeating {
3074 self.service_queriers.insert(ty.clone(), listener.clone());
3078
3079 self.query_cache_for_service(&ty, &listener, now);
3081 }
3082
3083 if cache_only {
3084 match listener.send(ServiceEvent::SearchStopped(ty.clone())) {
3086 Ok(()) => debug!("SearchStopped sent for {}", &ty),
3087 Err(e) => debug!("Failed to send SearchStopped: {}", e),
3088 }
3089 return;
3090 }
3091
3092 self.send_query(&ty, RRType::PTR);
3093 self.increase_counter(Counter::Browse, 1);
3094
3095 let next_time = now + (next_delay * 1000) as u64;
3096 let max_delay = 60 * 60;
3097 let delay = cmp::min(next_delay * 2, max_delay);
3098 self.add_retransmission(next_time, Command::Browse(ty, delay, cache_only, listener));
3099 }
3100
3101 fn exec_command_resolve_hostname(
3102 &mut self,
3103 repeating: bool,
3104 hostname: String,
3105 next_delay: u32,
3106 listener: Sender<HostnameResolutionEvent>,
3107 timeout: Option<u64>,
3108 ) {
3109 let addr_list: Vec<_> = self.my_intfs.iter().collect();
3110 if let Err(e) = listener.send(HostnameResolutionEvent::SearchStarted(format!(
3111 "{} on addrs {:?}",
3112 &hostname, &addr_list
3113 ))) {
3114 debug!(
3115 "Failed to send ResolveStarted({})(repeating:{}): {}",
3116 &hostname, repeating, e
3117 );
3118 return;
3119 }
3120 if !repeating {
3121 self.add_hostname_resolver(hostname.to_owned(), listener.clone(), timeout);
3122 self.query_cache_for_hostname(&hostname, listener.clone());
3124 }
3125
3126 self.send_query_vec(&[(&hostname, RRType::A), (&hostname, RRType::AAAA)]);
3127 self.increase_counter(Counter::ResolveHostname, 1);
3128
3129 let now = current_time_millis();
3130 let next_time = now + u64::from(next_delay) * 1000;
3131 let max_delay = 60 * 60;
3132 let delay = cmp::min(next_delay * 2, max_delay);
3133
3134 if self
3136 .hostname_resolvers
3137 .get(&hostname)
3138 .and_then(|(_sender, timeout)| *timeout)
3139 .map(|timeout| next_time < timeout)
3140 .unwrap_or(true)
3141 {
3142 self.add_retransmission(
3143 next_time,
3144 Command::ResolveHostname(hostname, delay, listener, None),
3145 );
3146 }
3147 }
3148
3149 fn exec_command_resolve(&mut self, instance: String, try_count: u16) {
3150 let pending_query = self.query_unresolved(&instance);
3151 let max_try = 3;
3152 if pending_query && try_count < max_try {
3153 let next_time = current_time_millis() + RESOLVE_WAIT_IN_MILLIS;
3156 self.add_retransmission(next_time, Command::Resolve(instance, try_count + 1));
3157 }
3158 }
3159
3160 fn exec_command_unregister(
3161 &mut self,
3162 repeating: bool,
3163 fullname: String,
3164 resp_s: Sender<UnregisterStatus>,
3165 ) {
3166 let response = match self.my_services.remove_entry(&fullname) {
3167 None => {
3168 debug!("unregister: cannot find such service {}", &fullname);
3169 UnregisterStatus::NotFound
3170 }
3171 Some((_k, info)) => {
3172 let mut timers = Vec::new();
3173
3174 for (if_index, intf) in self.my_intfs.iter() {
3175 if let Some(sock) = self.ipv4_sock.as_ref() {
3176 let packet = self.unregister_service(&info, intf, &sock.pktinfo);
3177 if !repeating && !packet.is_empty() {
3179 let next_time = current_time_millis() + 120;
3180 self.retransmissions.push(ReRun {
3181 next_time,
3182 command: Command::UnregisterResend(packet, *if_index, true),
3183 });
3184 timers.push(next_time);
3185 }
3186 }
3187
3188 if let Some(sock) = self.ipv6_sock.as_ref() {
3190 let packet = self.unregister_service(&info, intf, &sock.pktinfo);
3191 if !repeating && !packet.is_empty() {
3192 let next_time = current_time_millis() + 120;
3193 self.retransmissions.push(ReRun {
3194 next_time,
3195 command: Command::UnregisterResend(packet, *if_index, false),
3196 });
3197 timers.push(next_time);
3198 }
3199 }
3200 }
3201
3202 for t in timers {
3203 self.add_timer(t);
3204 }
3205
3206 self.increase_counter(Counter::Unregister, 1);
3207 UnregisterStatus::OK
3208 }
3209 };
3210 if let Err(e) = resp_s.send(response) {
3211 debug!("unregister: failed to send response: {}", e);
3212 }
3213 }
3214
3215 fn exec_command_unregister_resend(&mut self, packet: Vec<u8>, if_index: u32, is_ipv4: bool) {
3216 let Some(intf) = self.my_intfs.get(&if_index) else {
3217 return;
3218 };
3219 let sock_opt = if is_ipv4 {
3220 &self.ipv4_sock
3221 } else {
3222 &self.ipv6_sock
3223 };
3224 let Some(sock) = sock_opt else {
3225 return;
3226 };
3227
3228 let if_addr = if is_ipv4 {
3229 match intf.next_ifaddr_v4() {
3230 Some(addr) => addr,
3231 None => return,
3232 }
3233 } else {
3234 match intf.next_ifaddr_v6() {
3235 Some(addr) => addr,
3236 None => return,
3237 }
3238 };
3239
3240 debug!("UnregisterResend from {:?}", if_addr);
3241 multicast_on_intf(&packet[..], &intf.name, intf.index, if_addr, &sock.pktinfo);
3242
3243 self.increase_counter(Counter::UnregisterResend, 1);
3244 }
3245
3246 fn exec_command_stop_browse(&mut self, ty_domain: String) {
3247 match self.service_queriers.remove_entry(&ty_domain) {
3248 None => debug!("StopBrowse: cannot find querier for {}", &ty_domain),
3249 Some((ty, sender)) => {
3250 trace!("StopBrowse: removed queryer for {}", &ty);
3252 let mut i = 0;
3253 while i < self.retransmissions.len() {
3254 if let Command::Browse(t, _, _, _) = &self.retransmissions[i].command {
3255 if t == &ty {
3256 self.retransmissions.remove(i);
3257 trace!("StopBrowse: removed retransmission for {}", &ty);
3258 continue;
3259 }
3260 }
3261 i += 1;
3262 }
3263
3264 self.cache.remove_service_type(&ty_domain);
3266
3267 match sender.send(ServiceEvent::SearchStopped(ty_domain)) {
3269 Ok(()) => trace!("Sent SearchStopped to the listener"),
3270 Err(e) => debug!("Failed to send SearchStopped: {}", e),
3271 }
3272 }
3273 }
3274 }
3275
3276 fn exec_command_stop_resolve_hostname(&mut self, hostname: String) {
3277 if let Some((host, (sender, _timeout))) = self.hostname_resolvers.remove_entry(&hostname) {
3278 trace!("StopResolve: removed queryer for {}", &host);
3280 let mut i = 0;
3281 while i < self.retransmissions.len() {
3282 if let Command::Resolve(t, _) = &self.retransmissions[i].command {
3283 if t == &host {
3284 self.retransmissions.remove(i);
3285 trace!("StopResolve: removed retransmission for {}", &host);
3286 continue;
3287 }
3288 }
3289 i += 1;
3290 }
3291
3292 match sender.send(HostnameResolutionEvent::SearchStopped(hostname)) {
3294 Ok(()) => trace!("Sent SearchStopped to the listener"),
3295 Err(e) => debug!("Failed to send SearchStopped: {}", e),
3296 }
3297 }
3298 }
3299
3300 fn exec_command_register_resend(&mut self, fullname: String, if_index: u32) {
3301 let Some(info) = self.my_services.get_mut(&fullname) else {
3302 trace!("announce: cannot find such service {}", &fullname);
3303 return;
3304 };
3305
3306 let Some(dns_registry) = self.dns_registry_map.get_mut(&if_index) else {
3307 return;
3308 };
3309
3310 let Some(intf) = self.my_intfs.get(&if_index) else {
3311 return;
3312 };
3313
3314 let announced_v4 = if let Some(sock) = self.ipv4_sock.as_ref() {
3315 announce_service_on_intf(dns_registry, info, intf, &sock.pktinfo)
3316 } else {
3317 false
3318 };
3319 let announced_v6 = if let Some(sock) = self.ipv6_sock.as_ref() {
3320 announce_service_on_intf(dns_registry, info, intf, &sock.pktinfo)
3321 } else {
3322 false
3323 };
3324
3325 if announced_v4 || announced_v6 {
3326 let mut hostname = info.get_hostname();
3327 if let Some(new_name) = dns_registry.name_changes.get(hostname) {
3328 hostname = new_name;
3329 }
3330 let service_name = match dns_registry.name_changes.get(&fullname) {
3331 Some(new_name) => new_name.to_string(),
3332 None => fullname,
3333 };
3334
3335 debug!("resend: announce service {service_name} on {}", intf.name);
3336
3337 notify_monitors(
3338 &mut self.monitors,
3339 DaemonEvent::Announce(service_name, format!("{}:{}", hostname, &intf.name)),
3340 );
3341 info.set_status(if_index, ServiceStatus::Announced);
3342 } else {
3343 debug!("register-resend should not fail");
3344 }
3345
3346 self.increase_counter(Counter::RegisterResend, 1);
3347 }
3348
3349 fn exec_command_verify(&mut self, instance: String, timeout: Duration, repeating: bool) {
3350 let now = current_time_millis();
3360 let expire_at = if repeating {
3361 None
3362 } else {
3363 Some(now + timeout.as_millis() as u64)
3364 };
3365
3366 let record_vec = self.cache.service_verify_queries(&instance, expire_at);
3368
3369 if !record_vec.is_empty() {
3370 let query_vec: Vec<(&str, RRType)> = record_vec
3371 .iter()
3372 .map(|(record, rr_type)| (record.as_str(), *rr_type))
3373 .collect();
3374 self.send_query_vec(&query_vec);
3375
3376 if let Some(new_expire) = expire_at {
3377 self.add_timer(new_expire); self.add_retransmission(now + 1000, Command::Verify(instance, timeout));
3381 }
3382 }
3383 }
3384
3385 fn refresh_active_services(&mut self) {
3387 let mut query_ptr_count = 0;
3388 let mut query_srv_count = 0;
3389 let mut new_timers = HashSet::new();
3390 let mut query_addr_count = 0;
3391
3392 for (ty_domain, _sender) in self.service_queriers.iter() {
3393 let refreshed_timers = self.cache.refresh_due_ptr(ty_domain);
3394 if !refreshed_timers.is_empty() {
3395 trace!("sending refresh query for PTR: {}", ty_domain);
3396 self.send_query(ty_domain, RRType::PTR);
3397 query_ptr_count += 1;
3398 new_timers.extend(refreshed_timers);
3399 }
3400
3401 let (instances, timers) = self.cache.refresh_due_srv_txt(ty_domain);
3402 for (instance, types) in instances {
3403 trace!("sending refresh query for: {}", &instance);
3404 let query_vec = types
3405 .into_iter()
3406 .map(|ty| (instance.as_str(), ty))
3407 .collect::<Vec<_>>();
3408 self.send_query_vec(&query_vec);
3409 query_srv_count += 1;
3410 }
3411 new_timers.extend(timers);
3412 let (hostnames, timers) = self.cache.refresh_due_hosts(ty_domain);
3413 for hostname in hostnames.iter() {
3414 trace!("sending refresh queries for A and AAAA: {}", hostname);
3415 self.send_query_vec(&[(hostname, RRType::A), (hostname, RRType::AAAA)]);
3416 query_addr_count += 2;
3417 }
3418 new_timers.extend(timers);
3419 }
3420
3421 for timer in new_timers {
3422 self.add_timer(timer);
3423 }
3424
3425 self.increase_counter(Counter::CacheRefreshPTR, query_ptr_count);
3426 self.increase_counter(Counter::CacheRefreshSrvTxt, query_srv_count);
3427 self.increase_counter(Counter::CacheRefreshAddr, query_addr_count);
3428 }
3429}
3430
3431fn add_answer_of_service(
3433 out: &mut DnsOutgoing,
3434 msg: &DnsIncoming,
3435 entry_name: &str,
3436 service: &ServiceInfo,
3437 qtype: RRType,
3438 intf_addrs: Vec<IpAddr>,
3439) {
3440 if qtype == RRType::SRV || qtype == RRType::ANY {
3441 out.add_answer(
3442 msg,
3443 DnsSrv::new(
3444 entry_name,
3445 CLASS_IN | CLASS_CACHE_FLUSH,
3446 service.get_host_ttl(),
3447 service.get_priority(),
3448 service.get_weight(),
3449 service.get_port(),
3450 service.get_hostname().to_string(),
3451 ),
3452 );
3453 }
3454
3455 if qtype == RRType::TXT || qtype == RRType::ANY {
3456 out.add_answer(
3457 msg,
3458 DnsTxt::new(
3459 entry_name,
3460 CLASS_IN | CLASS_CACHE_FLUSH,
3461 service.get_other_ttl(),
3462 service.generate_txt(),
3463 ),
3464 );
3465 }
3466
3467 if qtype == RRType::SRV {
3468 for address in intf_addrs {
3469 out.add_additional_answer(DnsAddress::new(
3470 service.get_hostname(),
3471 ip_address_rr_type(&address),
3472 CLASS_IN | CLASS_CACHE_FLUSH,
3473 service.get_host_ttl(),
3474 address,
3475 InterfaceId::default(),
3476 ));
3477 }
3478 }
3479}
3480
/// Events delivered to the listener of a service browse.
#[derive(Clone, Debug)]
#[non_exhaustive]
pub enum ServiceEvent {
    /// A browse was started (or re-run). The string is a human-readable
    /// description: the service type plus the interfaces being searched.
    SearchStarted(String),

    /// NOTE(review): presumably (service type, instance fullname) of a newly
    /// discovered instance -- the sender is outside this section, confirm.
    ServiceFound(String, String),

    /// Carries a boxed `ResolvedService`.
    ServiceResolved(Box<ResolvedService>),

    /// An instance was removed: (service type domain, instance name).
    ServiceRemoved(String, String),

    /// The browse for the given service type was stopped.
    SearchStopped(String),
}
3501
/// Events delivered to the listener of a hostname resolution.
#[derive(Clone, Debug)]
#[non_exhaustive]
pub enum HostnameResolutionEvent {
    /// Resolution was started (or re-run). The string is a human-readable
    /// description: the hostname plus the interfaces being searched.
    SearchStarted(String),
    /// NOTE(review): presumably (hostname, addresses found) -- the sender is
    /// outside this section, confirm.
    AddressesFound(String, HashSet<ScopedIp>),
    /// NOTE(review): presumably (hostname, addresses no longer valid) --
    /// confirm against the sender.
    AddressesRemoved(String, HashSet<ScopedIp>),
    /// NOTE(review): presumably sent with the hostname when the resolver's
    /// timeout elapsed -- confirm against the sender.
    SearchTimeout(String),
    /// Resolution of the given hostname was stopped.
    SearchStopped(String),
}
3518
/// Events delivered to daemon monitors.
#[derive(Clone, Debug)]
#[non_exhaustive]
pub enum DaemonEvent {
    /// A service was announced: (service name, "<hostname>:<interface name>").
    Announce(String, String),

    /// Carries an `Error` value.
    /// NOTE(review): the emitting site is outside this section.
    Error(Error),

    /// NOTE(review): presumably an IP address that appeared on an interface --
    /// confirm against the emitting site.
    IpAdd(IpAddr),

    /// NOTE(review): presumably an IP address that disappeared from an
    /// interface -- confirm against the emitting site.
    IpDel(IpAddr),

    /// Carries the details of a DNS name change, see `DnsNameChange`.
    NameChange(DnsNameChange),

    /// A response was sent; the string is the name of the interface used.
    Respond(String),
}
3543
/// Payload of `DaemonEvent::NameChange`.
///
/// NOTE(review): the field descriptions below are inferred from the field
/// names only; the code that fills this struct is outside this section.
#[derive(Clone, Debug)]
pub struct DnsNameChange {
    /// Presumably the name before the change.
    pub original: String,

    /// Presumably the name after the change.
    pub new_name: String,

    /// Presumably the type of the resource record that was renamed.
    pub rr_type: RRType,

    /// Presumably the name of the interface on which the change happened.
    pub intf_name: String,
}
3568
/// Commands executed by the daemon; most are dispatched by `exec_command`.
#[derive(Debug)]
enum Command {
    /// Browse a service type: (type, delay in seconds before the next re-run,
    /// cache-only flag, listener for the resulting events).
    Browse(String, u32, bool, Sender<ServiceEvent>),

    // ResolveHostname: (hostname, delay in seconds before the next re-run,
    //   listener, optional timeout).
    // Register: register the given service.
    ResolveHostname(String, u32, Sender<HostnameResolutionEvent>, Option<u64>), Register(ServiceInfo),

    // Unregister: (service fullname, channel for the resulting status).
    // RegisterResend: (service fullname, interface index) -- announce again.
    // UnregisterResend: (packet bytes, interface index, is_ipv4) -- multicast
    //   the packet again.
    // StopBrowse: stop browsing the given service type.
    // StopResolveHostname: stop resolving the given hostname.
    // Resolve: (instance name, try count) -- query unresolved records.
    // GetMetrics: reply with the current counters.
    Unregister(String, Sender<UnregisterStatus>), RegisterResend(String, u32), UnregisterResend(Vec<u8>, u32, bool), StopBrowse(String), StopResolveHostname(String), Resolve(String, u16), GetMetrics(Sender<Metrics>),

    /// Reply with the current daemon status.
    GetStatus(Sender<DaemonStatus>),

    /// Register a channel that receives `DaemonEvent`s.
    Monitor(Sender<DaemonEvent>),

    /// Apply a daemon option.
    SetOption(DaemonOption),

    /// Reply with the current option values.
    GetOption(Sender<DaemonOptionVal>),

    /// Verify a service instance: (instance fullname, timeout).
    Verify(String, Duration),

    /// NOTE(review): not handled by `exec_command` (it would hit the
    /// "unexpected command" arm); presumably processed by the caller to shut
    /// the daemon down -- confirm.
    Exit(Sender<DaemonStatus>),
}
3621
3622impl fmt::Display for Command {
3623 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
3624 match self {
3625 Self::Browse(_, _, _, _) => write!(f, "Command Browse"),
3626 Self::ResolveHostname(_, _, _, _) => write!(f, "Command ResolveHostname"),
3627 Self::Exit(_) => write!(f, "Command Exit"),
3628 Self::GetStatus(_) => write!(f, "Command GetStatus"),
3629 Self::GetMetrics(_) => write!(f, "Command GetMetrics"),
3630 Self::Monitor(_) => write!(f, "Command Monitor"),
3631 Self::Register(_) => write!(f, "Command Register"),
3632 Self::RegisterResend(_, _) => write!(f, "Command RegisterResend"),
3633 Self::SetOption(_) => write!(f, "Command SetOption"),
3634 Self::GetOption(_) => write!(f, "Command GetOption"),
3635 Self::StopBrowse(_) => write!(f, "Command StopBrowse"),
3636 Self::StopResolveHostname(_) => write!(f, "Command StopResolveHostname"),
3637 Self::Unregister(_, _) => write!(f, "Command Unregister"),
3638 Self::UnregisterResend(_, _, _) => write!(f, "Command UnregisterResend"),
3639 Self::Resolve(_, _) => write!(f, "Command Resolve"),
3640 Self::Verify(_, _) => write!(f, "Command VerifyResource"),
3641 }
3642 }
3643}
3644
/// Snapshot of daemon option values, sent in reply to `Command::GetOption`.
struct DaemonOptionVal {
    // Copied from the daemon's `service_name_len_max`.
    _service_name_len_max: u8,
    // Copied from the daemon's `ip_check_interval`.
    ip_check_interval: u64,
}
3649
/// Options carried by `Command::SetOption` and applied by
/// `process_set_option`.
///
/// NOTE(review): the per-variant notes are inferred from the variant names;
/// the code applying them is outside this section.
#[derive(Debug)]
enum DaemonOption {
    // Presumably the maximum accepted service name length.
    ServiceNameLenMax(u8),
    // Presumably the interval between IP address checks.
    IpCheckInterval(u64),
    // Presumably interfaces to enable.
    EnableInterface(Vec<IfKind>),
    // Presumably interfaces to disable.
    DisableInterface(Vec<IfKind>),
    // Presumably toggles multicast loopback on the IPv4 socket.
    MulticastLoopV4(bool),
    // Presumably toggles multicast loopback on the IPv6 socket.
    MulticastLoopV6(bool),
    // Presumably toggles acceptance of unsolicited responses.
    AcceptUnsolicited(bool),
    // Test-only variants.
    #[cfg(test)]
    TestDownInterface(String),
    #[cfg(test)]
    TestUpInterface(String),
}
3664
/// Byte length of the `"._tcp.local."` suffix (the `"._udp.local."` suffix
/// has the same length).
const DOMAIN_LEN: usize = "._tcp.local.".len();
3667
3668fn check_service_name_length(ty_domain: &str, limit: u8) -> Result<()> {
3670 if ty_domain.len() <= DOMAIN_LEN + 1 {
3671 return Err(e_fmt!("Service type name cannot be empty: {}", ty_domain));
3673 }
3674
3675 let service_name_len = ty_domain.len() - DOMAIN_LEN - 1; if service_name_len > limit as usize {
3677 return Err(e_fmt!("Service name length must be <= {} bytes", limit));
3678 }
3679 Ok(())
3680}
3681
3682fn check_domain_suffix(name: &str) -> Result<()> {
3684 if !(name.ends_with("._tcp.local.") || name.ends_with("._udp.local.")) {
3685 return Err(e_fmt!(
3686 "mDNS service {} must end with '._tcp.local.' or '._udp.local.'",
3687 name
3688 ));
3689 }
3690
3691 Ok(())
3692}
3693
3694fn check_service_name(fullname: &str) -> Result<()> {
3702 check_domain_suffix(fullname)?;
3703
3704 let remaining: Vec<&str> = fullname[..fullname.len() - DOMAIN_LEN].split('.').collect();
3705 let name = remaining.last().ok_or_else(|| e_fmt!("No service name"))?;
3706
3707 if &name[0..1] != "_" {
3708 return Err(e_fmt!("Service name must start with '_'"));
3709 }
3710
3711 let name = &name[1..];
3712
3713 if name.contains("--") {
3714 return Err(e_fmt!("Service name must not contain '--'"));
3715 }
3716
3717 if name.starts_with('-') || name.ends_with('-') {
3718 return Err(e_fmt!("Service name (%s) may not start or end with '-'"));
3719 }
3720
3721 let ascii_count = name.chars().filter(|c| c.is_ascii_alphabetic()).count();
3722 if ascii_count < 1 {
3723 return Err(e_fmt!(
3724 "Service name must contain at least one letter (eg: 'A-Za-z')"
3725 ));
3726 }
3727
3728 Ok(())
3729}
3730
3731fn check_hostname(hostname: &str) -> Result<()> {
3733 if !hostname.ends_with(".local.") {
3734 return Err(e_fmt!("Hostname must end with '.local.': {hostname}"));
3735 }
3736
3737 if hostname == ".local." {
3738 return Err(e_fmt!(
3739 "The part of the hostname before '.local.' cannot be empty"
3740 ));
3741 }
3742
3743 if hostname.len() > 255 {
3744 return Err(e_fmt!("Hostname length must be <= 255 bytes"));
3745 }
3746
3747 Ok(())
3748}
3749
3750fn call_service_listener(
3751 listeners_map: &HashMap<String, Sender<ServiceEvent>>,
3752 ty_domain: &str,
3753 event: ServiceEvent,
3754) {
3755 if let Some(listener) = listeners_map.get(ty_domain) {
3756 match listener.send(event) {
3757 Ok(()) => trace!("Sent event to listener successfully"),
3758 Err(e) => debug!("Failed to send event: {}", e),
3759 }
3760 }
3761}
3762
3763fn call_hostname_resolution_listener(
3764 listeners_map: &HashMap<String, (Sender<HostnameResolutionEvent>, Option<u64>)>,
3765 hostname: &str,
3766 event: HostnameResolutionEvent,
3767) {
3768 let hostname_lower = hostname.to_lowercase();
3769 if let Some(listener) = listeners_map.get(&hostname_lower).map(|(l, _)| l) {
3770 match listener.send(event) {
3771 Ok(()) => trace!("Sent event to listener successfully"),
3772 Err(e) => debug!("Failed to send event: {}", e),
3773 }
3774 }
3775}
3776
3777fn my_ip_interfaces(with_loopback: bool) -> Vec<Interface> {
3781 if_addrs::get_if_addrs()
3782 .unwrap_or_default()
3783 .into_iter()
3784 .filter(|i| i.is_oper_up() && (!i.is_loopback() || with_loopback))
3785 .collect()
3786}
3787
3788fn send_dns_outgoing(out: &DnsOutgoing, my_intf: &MyIntf, sock: &PktInfoUdpSocket) -> Vec<Vec<u8>> {
3789 let if_name = &my_intf.name;
3790
3791 let if_addr = if sock.domain() == Domain::IPV4 {
3792 match my_intf.next_ifaddr_v4() {
3793 Some(addr) => addr,
3794 None => return vec![],
3795 }
3796 } else {
3797 match my_intf.next_ifaddr_v6() {
3798 Some(addr) => addr,
3799 None => return vec![],
3800 }
3801 };
3802
3803 send_dns_outgoing_impl(out, if_name, my_intf.index, if_addr, sock)
3804}
3805
3806fn send_dns_outgoing_impl(
3808 out: &DnsOutgoing,
3809 if_name: &str,
3810 if_index: u32,
3811 if_addr: &IfAddr,
3812 sock: &PktInfoUdpSocket,
3813) -> Vec<Vec<u8>> {
3814 let qtype = if out.is_query() {
3815 "query"
3816 } else {
3817 if out.answers_count() == 0 && out.additionals().is_empty() {
3818 return vec![]; }
3820 "response"
3821 };
3822 trace!(
3823 "send {}: {} questions {} answers {} authorities {} additional",
3824 qtype,
3825 out.questions().len(),
3826 out.answers_count(),
3827 out.authorities().len(),
3828 out.additionals().len()
3829 );
3830
3831 match if_addr.ip() {
3832 IpAddr::V4(ipv4) => {
3833 if let Err(e) = sock.set_multicast_if_v4(&ipv4) {
3834 debug!(
3835 "send_dns_outgoing: failed to set multicast interface for IPv4 {}: {}",
3836 ipv4, e
3837 );
3838 return vec![]; }
3840 }
3841 IpAddr::V6(ipv6) => {
3842 if let Err(e) = sock.set_multicast_if_v6(if_index) {
3843 debug!(
3844 "send_dns_outgoing: failed to set multicast interface for IPv6 {}: {}",
3845 ipv6, e
3846 );
3847 return vec![]; }
3849 }
3850 }
3851
3852 let packet_list = out.to_data_on_wire();
3853 for packet in packet_list.iter() {
3854 multicast_on_intf(packet, if_name, if_index, if_addr, sock);
3855 }
3856 packet_list
3857}
3858
3859fn multicast_on_intf(
3861 packet: &[u8],
3862 if_name: &str,
3863 if_index: u32,
3864 if_addr: &IfAddr,
3865 socket: &PktInfoUdpSocket,
3866) {
3867 if packet.len() > MAX_MSG_ABSOLUTE {
3868 debug!("Drop over-sized packet ({})", packet.len());
3869 return;
3870 }
3871
3872 let addr: SocketAddr = match if_addr {
3873 if_addrs::IfAddr::V4(_) => SocketAddrV4::new(GROUP_ADDR_V4, MDNS_PORT).into(),
3874 if_addrs::IfAddr::V6(_) => {
3875 let mut sock = SocketAddrV6::new(GROUP_ADDR_V6, MDNS_PORT, 0, 0);
3876 sock.set_scope_id(if_index); sock.into()
3878 }
3879 };
3880
3881 let sock_addr = addr.into();
3883 match socket.send_to(packet, &sock_addr) {
3884 Ok(sz) => trace!(
3885 "sent out {} bytes on interface {} (idx {}) addr {}",
3886 sz,
3887 if_name,
3888 if_index,
3889 if_addr.ip()
3890 ),
3891 Err(e) => trace!("Failed to send to {} via {:?}: {}", addr, &if_name, e),
3892 }
3893}
3894
/// Returns true when `name` consists of at least five dot-separated parts,
/// i.e. contains at least four `.` characters (a trailing dot yields an
/// empty last part, which counts).
fn valid_instance_name(name: &str) -> bool {
    let dot_count = name.matches('.').count();
    dot_count >= 4
}
3901
3902fn notify_monitors(monitors: &mut Vec<Sender<DaemonEvent>>, event: DaemonEvent) {
3903 monitors.retain(|sender| {
3904 if let Err(e) = sender.try_send(event.clone()) {
3905 debug!("notify_monitors: try_send: {}", &e);
3906 if matches!(e, TrySendError::Disconnected(_)) {
3907 return false; }
3909 }
3910 true
3911 });
3912}
3913
/// Builds the announcement message for service `info` on interface `intf`.
///
/// The message is a response (`FLAGS_QR_RESPONSE | FLAGS_AA`) containing the
/// service PTR record, an optional subtype PTR record, and the SRV, TXT and
/// address records. `is_ipv4` selects which address family of the service is
/// announced. Names recorded in `dns_registry.name_changes` replace the
/// original service and host names.
///
/// Returns `None` when the service has no address of the requested family on
/// `intf`, or when the service requires probing and the registry does not yet
/// report probing as done for at least one of the SRV/TXT/address records.
fn prepare_announce(
    info: &ServiceInfo,
    intf: &MyIntf,
    dns_registry: &mut DnsRegistry,
    is_ipv4: bool,
) -> Option<DnsOutgoing> {
    let intf_addrs = if is_ipv4 {
        info.get_addrs_on_my_intf_v4(intf)
    } else {
        info.get_addrs_on_my_intf_v6(intf)
    };

    if intf_addrs.is_empty() {
        debug!(
            "prepare_announce (ipv4: {is_ipv4}): no valid addrs on interface {}",
            &intf.name
        );
        return None;
    }

    // Use the renamed service fullname if the registry recorded one.
    let service_fullname = match dns_registry.name_changes.get(info.get_fullname()) {
        Some(new_name) => new_name,
        None => info.get_fullname(),
    };

    debug!(
        "prepare to announce service {service_fullname} on {:?}",
        &intf_addrs
    );

    // Number of records for which probing is not done yet.
    let mut probing_count = 0;
    let mut out = DnsOutgoing::new(FLAGS_QR_RESPONSE | FLAGS_AA);
    // The time passed to `is_probing_done` gets a random offset of 0..250 ms.
    // NOTE(review): presumably the randomized probe start delay -- confirm.
    let create_time = current_time_millis() + fastrand::u64(0..250);

    // PTR: service type -> (possibly renamed) instance fullname.
    out.add_answer_at_time(
        DnsPointer::new(
            info.get_type(),
            RRType::PTR,
            CLASS_IN,
            info.get_other_ttl(),
            service_fullname.to_string(),
        ),
        0,
    );

    // PTR: subtype -> instance fullname, when the service has a subtype.
    if let Some(sub) = info.get_subtype() {
        trace!("Adding subdomain {}", sub);
        out.add_answer_at_time(
            DnsPointer::new(
                sub,
                RRType::PTR,
                CLASS_IN,
                info.get_other_ttl(),
                service_fullname.to_string(),
            ),
            0,
        );
    }

    // The SRV target is the renamed hostname if the registry recorded one.
    let hostname = match dns_registry.name_changes.get(info.get_hostname()) {
        Some(new_name) => new_name.to_string(),
        None => info.get_hostname().to_string(),
    };

    let mut srv = DnsSrv::new(
        info.get_fullname(),
        CLASS_IN | CLASS_CACHE_FLUSH,
        info.get_host_ttl(),
        info.get_priority(),
        info.get_weight(),
        info.get_port(),
        hostname,
    );

    if let Some(new_name) = dns_registry.name_changes.get(info.get_fullname()) {
        srv.get_record_mut().set_new_name(new_name.to_string());
    }

    // SRV goes out only if no probe is required or probing is done.
    if !info.requires_probe()
        || dns_registry.is_probing_done(&srv, info.get_fullname(), create_time)
    {
        out.add_answer_at_time(srv, 0);
    } else {
        probing_count += 1;
    }

    let mut txt = DnsTxt::new(
        info.get_fullname(),
        CLASS_IN | CLASS_CACHE_FLUSH,
        info.get_other_ttl(),
        info.generate_txt(),
    );

    if let Some(new_name) = dns_registry.name_changes.get(info.get_fullname()) {
        txt.get_record_mut().set_new_name(new_name.to_string());
    }

    // Same gating for TXT.
    if !info.requires_probe()
        || dns_registry.is_probing_done(&txt, info.get_fullname(), create_time)
    {
        out.add_answer_at_time(txt, 0);
    } else {
        probing_count += 1;
    }

    // Address records are created with the original hostname; a recorded
    // rename is applied through `set_new_name`.
    let hostname = info.get_hostname();
    for address in intf_addrs {
        let mut dns_addr = DnsAddress::new(
            hostname,
            ip_address_rr_type(&address),
            CLASS_IN | CLASS_CACHE_FLUSH,
            info.get_host_ttl(),
            address,
            intf.into(),
        );

        if let Some(new_name) = dns_registry.name_changes.get(hostname) {
            dns_addr.get_record_mut().set_new_name(new_name.to_string());
        }

        // Same gating for each address record.
        if !info.requires_probe()
            || dns_registry.is_probing_done(&dns_addr, info.get_fullname(), create_time)
        {
            out.add_answer_at_time(dns_addr, 0);
        } else {
            probing_count += 1;
        }
    }

    // Nothing is announced while any record is still being probed.
    if probing_count > 0 {
        return None;
    }

    Some(out)
}
4057
4058fn announce_service_on_intf(
4061 dns_registry: &mut DnsRegistry,
4062 info: &ServiceInfo,
4063 intf: &MyIntf,
4064 sock: &PktInfoUdpSocket,
4065) -> bool {
4066 let is_ipv4 = sock.domain() == Domain::IPV4;
4067 if let Some(out) = prepare_announce(info, intf, dns_registry, is_ipv4) {
4068 send_dns_outgoing(&out, intf, sock);
4069 return true;
4070 }
4071
4072 false
4073}
4074
/// Derives a replacement name by bumping a ` (N)` suffix on the first dot-separated
/// label of `original`.
///
/// * If the first label ends in a well-formed ` (N)` suffix (`N` parses as `u32`),
///   the result uses ` (N+1)`.
/// * Otherwise ` (2)` is appended to the first label. This also covers a malformed
///   suffix such as `foo (abc)` or `foo (2`, and a counter already at `u32::MAX`,
///   which cannot be incremented without overflowing.
///
/// The remaining labels are preserved unchanged.
fn name_change(original: &str) -> String {
    let mut parts: Vec<_> = original.split('.').collect();
    let Some(first_part) = parts.get_mut(0) else {
        return format!("{original} (2)");
    };

    // Try to bump an existing counter. `checked_add` turns the `u32::MAX` case into
    // `None`, so it takes the "append a new suffix" path instead of overflowing.
    let bumped = first_part
        .strip_suffix(')')
        .and_then(|head| head.rsplit_once(" ("))
        .and_then(|(base_name, digits)| {
            let number = digits.parse::<u32>().ok()?;
            Some(format!("{base_name} ({})", number.checked_add(1)?))
        });

    let new_name = bumped.unwrap_or_else(|| format!("{first_part} (2)"));

    *first_part = &new_name;
    parts.join(".")
}
4110
/// Derives a replacement host name by bumping a `-N` suffix on the first
/// dot-separated label of `original`.
///
/// * If the text after the last `-` in the first label parses as `u32`, the result
///   uses `-(N+1)`.
/// * Otherwise `-2` is appended to the first label. This also covers a counter
///   already at `u32::MAX`, which cannot be incremented without overflowing.
///
/// The remaining labels are preserved unchanged.
fn hostname_change(original: &str) -> String {
    let mut parts: Vec<_> = original.split('.').collect();
    let Some(first_part) = parts.get_mut(0) else {
        return format!("{original}-2");
    };

    // `checked_add` turns the `u32::MAX` case into `None`, so it takes the
    // "append a new suffix" path instead of overflowing.
    let bumped = first_part.rsplit_once('-').and_then(|(base_name, digits)| {
        let number = digits.parse::<u32>().ok()?;
        Some(format!("{base_name}-{}", number.checked_add(1)?))
    });

    let new_name = bumped.unwrap_or_else(|| format!("{first_part}-2"));

    *first_part = &new_name;
    parts.join(".")
}
4138
4139fn add_answer_with_additionals(
4140 out: &mut DnsOutgoing,
4141 msg: &DnsIncoming,
4142 service: &ServiceInfo,
4143 intf: &MyIntf,
4144 dns_registry: &DnsRegistry,
4145 is_ipv4: bool,
4146) {
4147 let intf_addrs = if is_ipv4 {
4148 service.get_addrs_on_my_intf_v4(intf)
4149 } else {
4150 service.get_addrs_on_my_intf_v6(intf)
4151 };
4152 if intf_addrs.is_empty() {
4153 trace!("No addrs on LAN of intf {:?}", intf);
4154 return;
4155 }
4156
4157 let service_fullname = match dns_registry.name_changes.get(service.get_fullname()) {
4159 Some(new_name) => new_name,
4160 None => service.get_fullname(),
4161 };
4162
4163 let hostname = match dns_registry.name_changes.get(service.get_hostname()) {
4164 Some(new_name) => new_name,
4165 None => service.get_hostname(),
4166 };
4167
4168 let ptr_added = out.add_answer(
4169 msg,
4170 DnsPointer::new(
4171 service.get_type(),
4172 RRType::PTR,
4173 CLASS_IN,
4174 service.get_other_ttl(),
4175 service_fullname.to_string(),
4176 ),
4177 );
4178
4179 if !ptr_added {
4180 trace!("answer was not added for msg {:?}", msg);
4181 return;
4182 }
4183
4184 if let Some(sub) = service.get_subtype() {
4185 trace!("Adding subdomain {}", sub);
4186 out.add_additional_answer(DnsPointer::new(
4187 sub,
4188 RRType::PTR,
4189 CLASS_IN,
4190 service.get_other_ttl(),
4191 service_fullname.to_string(),
4192 ));
4193 }
4194
4195 out.add_additional_answer(DnsSrv::new(
4198 service_fullname,
4199 CLASS_IN | CLASS_CACHE_FLUSH,
4200 service.get_host_ttl(),
4201 service.get_priority(),
4202 service.get_weight(),
4203 service.get_port(),
4204 hostname.to_string(),
4205 ));
4206
4207 out.add_additional_answer(DnsTxt::new(
4208 service_fullname,
4209 CLASS_IN | CLASS_CACHE_FLUSH,
4210 service.get_other_ttl(),
4211 service.generate_txt(),
4212 ));
4213
4214 for address in intf_addrs {
4215 out.add_additional_answer(DnsAddress::new(
4216 hostname,
4217 ip_address_rr_type(&address),
4218 CLASS_IN | CLASS_CACHE_FLUSH,
4219 service.get_host_ttl(),
4220 address,
4221 intf.into(),
4222 ));
4223 }
4224}
4225
4226fn check_probing(
4229 dns_registry: &mut DnsRegistry,
4230 timers: &mut BinaryHeap<Reverse<u64>>,
4231 now: u64,
4232) -> (DnsOutgoing, Vec<String>) {
4233 let mut expired_probes = Vec::new();
4234 let mut out = DnsOutgoing::new(FLAGS_QR_QUERY);
4235
4236 for (name, probe) in dns_registry.probing.iter_mut() {
4237 if now >= probe.next_send {
4238 if probe.expired(now) {
4239 expired_probes.push(name.clone());
4241 } else {
4242 out.add_question(name, RRType::ANY);
4243
4244 for record in probe.records.iter() {
4252 out.add_authority(record.clone());
4253 }
4254
4255 probe.update_next_send(now);
4256
4257 timers.push(Reverse(probe.next_send));
4259 }
4260 }
4261 }
4262
4263 (out, expired_probes)
4264}
4265
4266fn handle_expired_probes(
4271 expired_probes: Vec<String>,
4272 intf_name: &str,
4273 dns_registry: &mut DnsRegistry,
4274 monitors: &mut Vec<Sender<DaemonEvent>>,
4275) -> HashSet<String> {
4276 let mut waiting_services = HashSet::new();
4277
4278 for name in expired_probes {
4279 let Some(probe) = dns_registry.probing.remove(&name) else {
4280 continue;
4281 };
4282
4283 for record in probe.records.iter() {
4285 if let Some(new_name) = record.get_record().get_new_name() {
4286 dns_registry
4287 .name_changes
4288 .insert(name.clone(), new_name.to_string());
4289
4290 let event = DnsNameChange {
4291 original: record.get_record().get_original_name().to_string(),
4292 new_name: new_name.to_string(),
4293 rr_type: record.get_type(),
4294 intf_name: intf_name.to_string(),
4295 };
4296 debug!("Name change event: {:?}", &event);
4297 notify_monitors(monitors, DaemonEvent::NameChange(event));
4298 }
4299 }
4300
4301 debug!(
4303 "probe of '{name}' finished: move {} records to active. ({} waiting services)",
4304 probe.records.len(),
4305 probe.waiting_services.len(),
4306 );
4307
4308 if !probe.records.is_empty() {
4310 match dns_registry.active.get_mut(&name) {
4311 Some(records) => {
4312 records.extend(probe.records);
4313 }
4314 None => {
4315 dns_registry.active.insert(name, probe.records);
4316 }
4317 }
4318
4319 waiting_services.extend(probe.waiting_services);
4320 }
4321 }
4322
4323 waiting_services
4324}
4325
4326#[cfg(test)]
4327mod tests {
4328 use super::{
4329 _new_socket_bind, check_domain_suffix, check_service_name_length, hostname_change,
4330 my_ip_interfaces, name_change, send_dns_outgoing_impl, valid_instance_name,
4331 HostnameResolutionEvent, ServiceDaemon, ServiceEvent, ServiceInfo, GROUP_ADDR_V4,
4332 MDNS_PORT,
4333 };
4334 use crate::{
4335 dns_parser::{
4336 DnsIncoming, DnsOutgoing, DnsPointer, InterfaceId, RRType, ScopedIp, CLASS_IN,
4337 FLAGS_AA, FLAGS_QR_RESPONSE,
4338 },
4339 service_daemon::{add_answer_of_service, check_hostname},
4340 };
4341 use std::{
4342 net::{SocketAddr, SocketAddrV4},
4343 time::{Duration, SystemTime},
4344 };
4345 use test_log::test;
4346
4347 #[test]
4348 fn test_socketaddr_print() {
4349 let addr: SocketAddr = SocketAddrV4::new(GROUP_ADDR_V4, MDNS_PORT).into();
4350 let print = format!("{}", addr);
4351 assert_eq!(print, "224.0.0.251:5353");
4352 }
4353
4354 #[test]
4355 fn test_instance_name() {
4356 assert!(valid_instance_name("my-laser._printer._tcp.local."));
4357 assert!(valid_instance_name("my-laser.._printer._tcp.local."));
4358 assert!(!valid_instance_name("_printer._tcp.local."));
4359 }
4360
4361 #[test]
4362 fn test_check_service_name_length() {
4363 let result = check_service_name_length("_tcp", 100);
4364 assert!(result.is_err());
4365 if let Err(e) = result {
4366 println!("{}", e);
4367 }
4368 }
4369
4370 #[test]
4371 fn test_check_hostname() {
4372 for hostname in &[
4374 "my_host.local.",
4375 &("A".repeat(255 - ".local.".len()) + ".local."),
4376 ] {
4377 let result = check_hostname(hostname);
4378 assert!(result.is_ok());
4379 }
4380
4381 for hostname in &[
4383 "my_host.local",
4384 ".local.",
4385 &("A".repeat(256 - ".local.".len()) + ".local."),
4386 ] {
4387 let result = check_hostname(hostname);
4388 assert!(result.is_err());
4389 if let Err(e) = result {
4390 println!("{}", e);
4391 }
4392 }
4393 }
4394
4395 #[test]
4396 fn test_check_domain_suffix() {
4397 assert!(check_domain_suffix("_missing_dot._tcp.local").is_err());
4398 assert!(check_domain_suffix("_missing_bar.tcp.local.").is_err());
4399 assert!(check_domain_suffix("_mis_spell._tpp.local.").is_err());
4400 assert!(check_domain_suffix("_mis_spell._upp.local.").is_err());
4401 assert!(check_domain_suffix("_has_dot._tcp.local.").is_ok());
4402 assert!(check_domain_suffix("_goodname._udp.local.").is_ok());
4403 }
4404
4405 #[test]
4406 fn test_service_with_temporarily_invalidated_ptr() {
4407 let d = ServiceDaemon::new().expect("Failed to create daemon");
4409
4410 let service = "_test_inval_ptr._udp.local.";
4411 let host_name = "my_host_tmp_invalidated_ptr.local.";
4412 let intfs: Vec<_> = my_ip_interfaces(false);
4413 let intf_ips: Vec<_> = intfs.iter().map(|intf| intf.ip()).collect();
4414 let port = 5201;
4415 let my_service =
4416 ServiceInfo::new(service, "my_instance", host_name, &intf_ips[..], port, None)
4417 .expect("invalid service info")
4418 .enable_addr_auto();
4419 let result = d.register(my_service.clone());
4420 assert!(result.is_ok());
4421
4422 let browse_chan = d.browse(service).unwrap();
4424 let timeout = Duration::from_secs(2);
4425 let mut resolved = false;
4426
4427 while let Ok(event) = browse_chan.recv_timeout(timeout) {
4428 match event {
4429 ServiceEvent::ServiceResolved(info) => {
4430 resolved = true;
4431 println!("Resolved a service of {}", &info.fullname);
4432 break;
4433 }
4434 e => {
4435 println!("Received event {:?}", e);
4436 }
4437 }
4438 }
4439
4440 assert!(resolved);
4441
4442 println!("Stopping browse of {}", service);
4443 d.stop_browse(service).unwrap();
4446
4447 let mut stopped = false;
4452 while let Ok(event) = browse_chan.recv_timeout(timeout) {
4453 match event {
4454 ServiceEvent::SearchStopped(_) => {
4455 stopped = true;
4456 println!("Stopped browsing service");
4457 break;
4458 }
4459 e => {
4463 println!("Received event {:?}", e);
4464 }
4465 }
4466 }
4467
4468 assert!(stopped);
4469
4470 let invalidate_ptr_packet = DnsPointer::new(
4472 my_service.get_type(),
4473 RRType::PTR,
4474 CLASS_IN,
4475 0,
4476 my_service.get_fullname().to_string(),
4477 );
4478
4479 let mut packet_buffer = DnsOutgoing::new(FLAGS_QR_RESPONSE | FLAGS_AA);
4480 packet_buffer.add_additional_answer(invalidate_ptr_packet);
4481
4482 for intf in intfs {
4483 let sock = _new_socket_bind(&intf, true).unwrap();
4484 send_dns_outgoing_impl(
4485 &packet_buffer,
4486 &intf.name,
4487 intf.index.unwrap_or(0),
4488 &intf.addr,
4489 &sock.pktinfo,
4490 );
4491 }
4492
4493 println!(
4494 "Sent PTR record invalidation. Starting second browse for {}",
4495 service
4496 );
4497
4498 let browse_chan = d.browse(service).unwrap();
4500
4501 resolved = false;
4502 while let Ok(event) = browse_chan.recv_timeout(timeout) {
4503 match event {
4504 ServiceEvent::ServiceResolved(info) => {
4505 resolved = true;
4506 println!("Resolved a service of {}", &info.fullname);
4507 break;
4508 }
4509 e => {
4510 println!("Received event {:?}", e);
4511 }
4512 }
4513 }
4514
4515 assert!(resolved);
4516 d.shutdown().unwrap();
4517 }
4518
4519 #[test]
4520 fn test_expired_srv() {
4521 let service_type = "_expired-srv._udp.local.";
4523 let instance = "test_instance";
4524 let host_name = "expired_srv_host.local.";
4525 let mut my_service = ServiceInfo::new(service_type, instance, host_name, "", 5023, None)
4526 .unwrap()
4527 .enable_addr_auto();
4528 let new_ttl = 3; my_service._set_host_ttl(new_ttl);
4533
4534 let mdns_server = ServiceDaemon::new().expect("Failed to create mdns server");
4536 let result = mdns_server.register(my_service);
4537 assert!(result.is_ok());
4538
4539 let mdns_client = ServiceDaemon::new().expect("Failed to create mdns client");
4540 let browse_chan = mdns_client.browse(service_type).unwrap();
4541 let timeout = Duration::from_secs(2);
4542 let mut resolved = false;
4543
4544 while let Ok(event) = browse_chan.recv_timeout(timeout) {
4545 match event {
4546 ServiceEvent::ServiceResolved(info) => {
4547 resolved = true;
4548 println!("Resolved a service of {}", &info.fullname);
4549 break;
4550 }
4551 _ => {}
4552 }
4553 }
4554
4555 assert!(resolved);
4556
4557 mdns_server.shutdown().unwrap();
4559
4560 let expire_timeout = Duration::from_secs(new_ttl as u64);
4562 while let Ok(event) = browse_chan.recv_timeout(expire_timeout) {
4563 match event {
4564 ServiceEvent::ServiceRemoved(service_type, full_name) => {
4565 println!("Service removed: {}: {}", &service_type, &full_name);
4566 break;
4567 }
4568 _ => {}
4569 }
4570 }
4571 }
4572
4573 #[test]
4574 fn test_hostname_resolution_address_removed() {
4575 let server = ServiceDaemon::new().expect("Failed to create server");
4577 let hostname = "addr_remove_host._tcp.local.";
4578 let service_ip_addr: ScopedIp = my_ip_interfaces(false)
4579 .iter()
4580 .find(|iface| iface.ip().is_ipv4())
4581 .map(|iface| iface.ip().into())
4582 .unwrap();
4583
4584 let mut my_service = ServiceInfo::new(
4585 "_host_res_test._tcp.local.",
4586 "my_instance",
4587 hostname,
4588 &service_ip_addr.to_ip_addr(),
4589 1234,
4590 None,
4591 )
4592 .expect("invalid service info");
4593
4594 let addr_ttl = 2;
4596 my_service._set_host_ttl(addr_ttl); server.register(my_service).unwrap();
4599
4600 let client = ServiceDaemon::new().expect("Failed to create client");
4602 let event_receiver = client.resolve_hostname(hostname, None).unwrap();
4603 let resolved = loop {
4604 match event_receiver.recv() {
4605 Ok(HostnameResolutionEvent::AddressesFound(found_hostname, addresses)) => {
4606 assert!(found_hostname == hostname);
4607 assert!(addresses.contains(&service_ip_addr));
4608 println!("address found: {:?}", &addresses);
4609 break true;
4610 }
4611 Ok(HostnameResolutionEvent::SearchStopped(_)) => break false,
4612 Ok(_event) => {}
4613 Err(_) => break false,
4614 }
4615 };
4616
4617 assert!(resolved);
4618
4619 server.shutdown().unwrap();
4621
4622 let timeout = Duration::from_secs(addr_ttl as u64 + 1);
4624 let removed = loop {
4625 match event_receiver.recv_timeout(timeout) {
4626 Ok(HostnameResolutionEvent::AddressesRemoved(removed_host, addresses)) => {
4627 assert!(removed_host == hostname);
4628 assert!(addresses.contains(&service_ip_addr));
4629
4630 println!(
4631 "address removed: hostname: {} addresses: {:?}",
4632 &hostname, &addresses
4633 );
4634 break true;
4635 }
4636 Ok(_event) => {}
4637 Err(_) => {
4638 break false;
4639 }
4640 }
4641 };
4642
4643 assert!(removed);
4644
4645 client.shutdown().unwrap();
4646 }
4647
4648 #[test]
4649 fn test_refresh_ptr() {
4650 let service_type = "_refresh-ptr._udp.local.";
4652 let instance = "test_instance";
4653 let host_name = "refresh_ptr_host.local.";
4654 let service_ip_addr = my_ip_interfaces(false)
4655 .iter()
4656 .find(|iface| iface.ip().is_ipv4())
4657 .map(|iface| iface.ip())
4658 .unwrap();
4659
4660 let mut my_service = ServiceInfo::new(
4661 service_type,
4662 instance,
4663 host_name,
4664 &service_ip_addr,
4665 5023,
4666 None,
4667 )
4668 .unwrap();
4669
4670 let new_ttl = 3; my_service._set_other_ttl(new_ttl);
4672
4673 let mdns_server = ServiceDaemon::new().expect("Failed to create mdns server");
4675 let result = mdns_server.register(my_service);
4676 assert!(result.is_ok());
4677
4678 let mdns_client = ServiceDaemon::new().expect("Failed to create mdns client");
4679 let browse_chan = mdns_client.browse(service_type).unwrap();
4680 let timeout = Duration::from_millis(1500); let mut resolved = false;
4682
4683 while let Ok(event) = browse_chan.recv_timeout(timeout) {
4685 match event {
4686 ServiceEvent::ServiceResolved(info) => {
4687 resolved = true;
4688 println!("Resolved a service of {}", &info.fullname);
4689 break;
4690 }
4691 _ => {}
4692 }
4693 }
4694
4695 assert!(resolved);
4696
4697 let timeout = Duration::from_millis(new_ttl as u64 * 1000 * 90 / 100);
4699 while let Ok(event) = browse_chan.recv_timeout(timeout) {
4700 println!("event: {:?}", &event);
4701 }
4702
4703 let metrics_chan = mdns_client.get_metrics().unwrap();
4705 let metrics = metrics_chan.recv_timeout(timeout).unwrap();
4706 let ptr_refresh_counter = metrics["cache-refresh-ptr"];
4707 assert_eq!(ptr_refresh_counter, 1);
4708 let srvtxt_refresh_counter = metrics["cache-refresh-srv-txt"];
4709 assert_eq!(srvtxt_refresh_counter, 1);
4710
4711 mdns_server.shutdown().unwrap();
4713 mdns_client.shutdown().unwrap();
4714 }
4715
4716 #[test]
4717 fn test_name_change() {
4718 assert_eq!(name_change("foo.local."), "foo (2).local.");
4719 assert_eq!(name_change("foo (2).local."), "foo (3).local.");
4720 assert_eq!(name_change("foo (9).local."), "foo (10).local.");
4721 assert_eq!(name_change("foo"), "foo (2)");
4722 assert_eq!(name_change("foo (2)"), "foo (3)");
4723 assert_eq!(name_change(""), " (2)");
4724
4725 assert_eq!(name_change("foo (abc)"), "foo (abc) (2)"); assert_eq!(name_change("foo (2"), "foo (2 (2)"); assert_eq!(name_change("foo (2) extra"), "foo (2) extra (2)"); }
4730
4731 #[test]
4732 fn test_hostname_change() {
4733 assert_eq!(hostname_change("foo.local."), "foo-2.local.");
4734 assert_eq!(hostname_change("foo"), "foo-2");
4735 assert_eq!(hostname_change("foo-2.local."), "foo-3.local.");
4736 assert_eq!(hostname_change("foo-9"), "foo-10");
4737 assert_eq!(hostname_change("test-42.domain."), "test-43.domain.");
4738 }
4739
4740 #[test]
4741 fn test_add_answer_txt_ttl() {
4742 let service_type = "_test_add_answer._udp.local.";
4744 let instance = "test_instance";
4745 let host_name = "add_answer_host.local.";
4746 let service_intf = my_ip_interfaces(false)
4747 .into_iter()
4748 .find(|iface| iface.ip().is_ipv4())
4749 .unwrap();
4750 let service_ip_addr = service_intf.ip();
4751 let my_service = ServiceInfo::new(
4752 service_type,
4753 instance,
4754 host_name,
4755 &service_ip_addr,
4756 5023,
4757 None,
4758 )
4759 .unwrap();
4760
4761 let mut out = DnsOutgoing::new(FLAGS_QR_RESPONSE | FLAGS_AA);
4763
4764 let mut dummy_data = out.to_data_on_wire();
4766 let interface_id = InterfaceId::from(&service_intf);
4767 let incoming = DnsIncoming::new(dummy_data.pop().unwrap(), interface_id).unwrap();
4768
4769 let if_addrs = vec![service_intf.ip()];
4771 add_answer_of_service(
4772 &mut out,
4773 &incoming,
4774 instance,
4775 &my_service,
4776 RRType::TXT,
4777 if_addrs,
4778 );
4779
4780 assert!(
4782 out.answers_count() > 0,
4783 "No answers added to the outgoing message"
4784 );
4785
4786 let answer = out._answers().first().unwrap();
4788 assert_eq!(answer.0.get_type(), RRType::TXT);
4789
4790 assert_eq!(answer.0.get_record().get_ttl(), my_service.get_other_ttl());
4792 }
4793
4794 #[test]
4795 fn test_interface_flip() {
4796 let ty_domain = "_intf-flip._udp.local.";
4798 let host_name = "intf_flip.local.";
4799 let now = SystemTime::now()
4800 .duration_since(SystemTime::UNIX_EPOCH)
4801 .unwrap();
4802 let instance_name = now.as_micros().to_string(); let port = 5200;
4804
4805 let (ip_addr1, intf_name) = my_ip_interfaces(false)
4807 .iter()
4808 .find(|iface| iface.ip().is_ipv4())
4809 .map(|iface| (iface.ip(), iface.name.clone()))
4810 .unwrap();
4811
4812 println!("Using interface {} with IP {}", intf_name, ip_addr1);
4813
4814 let service1 =
4816 ServiceInfo::new(ty_domain, &instance_name, host_name, &ip_addr1, port, None)
4817 .expect("valid service info");
4818 let server1 = ServiceDaemon::new().expect("failed to start server");
4819 server1
4820 .register(service1)
4821 .expect("Failed to register service1");
4822
4823 std::thread::sleep(Duration::from_secs(2));
4825
4826 let client = ServiceDaemon::new().expect("failed to start client");
4828
4829 let receiver = client.browse(ty_domain).unwrap();
4830
4831 let timeout = Duration::from_secs(3);
4832 let mut got_data = false;
4833
4834 while let Ok(event) = receiver.recv_timeout(timeout) {
4835 match event {
4836 ServiceEvent::ServiceResolved(_) => {
4837 println!("Received ServiceResolved event");
4838 got_data = true;
4839 break;
4840 }
4841 _ => {}
4842 }
4843 }
4844
4845 assert!(got_data, "Should receive ServiceResolved event");
4846
4847 client.set_ip_check_interval(1).unwrap();
4849
4850 println!("Shutting down interface {}", &intf_name);
4852 client.test_down_interface(&intf_name).unwrap();
4853
4854 let mut got_removed = false;
4855
4856 while let Ok(event) = receiver.recv_timeout(timeout) {
4857 match event {
4858 ServiceEvent::ServiceRemoved(ty_domain, instance) => {
4859 got_removed = true;
4860 println!("removed: {ty_domain} : {instance}");
4861 break;
4862 }
4863 _ => {}
4864 }
4865 }
4866 assert!(got_removed, "Should receive ServiceRemoved event");
4867
4868 println!("Bringing up interface {}", &intf_name);
4869 client.test_up_interface(&intf_name).unwrap();
4870 let mut got_data = false;
4871 while let Ok(event) = receiver.recv_timeout(timeout) {
4872 match event {
4873 ServiceEvent::ServiceResolved(resolved) => {
4874 got_data = true;
4875 println!("Received ServiceResolved: {:?}", resolved);
4876 break;
4877 }
4878 _ => {}
4879 }
4880 }
4881 assert!(
4882 got_data,
4883 "Should receive ServiceResolved event after interface is back up"
4884 );
4885
4886 server1.shutdown().unwrap();
4887 client.shutdown().unwrap();
4888 }
4889
4890 #[test]
4891 fn test_cache_only() {
4892 let service_type = "_cache_only._udp.local.";
4894 let instance = "test_instance";
4895 let host_name = "cache_only_host.local.";
4896 let service_ip_addr = my_ip_interfaces(false)
4897 .iter()
4898 .find(|iface| iface.ip().is_ipv4())
4899 .map(|iface| iface.ip())
4900 .unwrap();
4901
4902 let mut my_service = ServiceInfo::new(
4903 service_type,
4904 instance,
4905 host_name,
4906 &service_ip_addr,
4907 5023,
4908 None,
4909 )
4910 .unwrap();
4911
4912 let new_ttl = 3; my_service._set_other_ttl(new_ttl);
4914
4915 let mdns_client = ServiceDaemon::new().expect("Failed to create mdns client");
4916
4917 let browse_chan = mdns_client.browse_cache(service_type).unwrap();
4920 std::thread::sleep(Duration::from_secs(2));
4921
4922 let mdns_server = ServiceDaemon::new().expect("Failed to create mdns server");
4924 let result = mdns_server.register(my_service);
4925 assert!(result.is_ok());
4926
4927 let timeout = Duration::from_millis(1500); let mut resolved = false;
4929
4930 while let Ok(event) = browse_chan.recv_timeout(timeout) {
4932 match event {
4933 ServiceEvent::ServiceResolved(info) => {
4934 resolved = true;
4935 println!("Resolved a service of {}", &info.get_fullname());
4936 break;
4937 }
4938 _ => {}
4939 }
4940 }
4941
4942 assert!(resolved);
4943
4944 mdns_server.shutdown().unwrap();
4946 mdns_client.shutdown().unwrap();
4947 }
4948
4949 #[test]
4950 fn test_cache_only_unsolicited() {
4951 let service_type = "_cache_only._udp.local.";
4953 let instance = "test_instance";
4954 let host_name = "cache_only_host.local.";
4955 let service_ip_addr = my_ip_interfaces(false)
4956 .iter()
4957 .find(|iface| iface.ip().is_ipv4())
4958 .map(|iface| iface.ip())
4959 .unwrap();
4960
4961 let mut my_service = ServiceInfo::new(
4962 service_type,
4963 instance,
4964 host_name,
4965 &service_ip_addr,
4966 5023,
4967 None,
4968 )
4969 .unwrap();
4970
4971 let new_ttl = 3; my_service._set_other_ttl(new_ttl);
4973
4974 let mdns_server = ServiceDaemon::new().expect("Failed to create mdns server");
4976 let result = mdns_server.register(my_service);
4977 assert!(result.is_ok());
4978
4979 let mdns_client = ServiceDaemon::new().expect("Failed to create mdns client");
4980 mdns_client.accept_unsolicited(true).unwrap();
4981
4982 std::thread::sleep(Duration::from_secs(2));
4985 let browse_chan = mdns_client.browse_cache(service_type).unwrap();
4986 let timeout = Duration::from_millis(1500); let mut resolved = false;
4988
4989 while let Ok(event) = browse_chan.recv_timeout(timeout) {
4991 match event {
4992 ServiceEvent::ServiceResolved(info) => {
4993 resolved = true;
4994 println!("Resolved a service of {}", &info.get_fullname());
4995 break;
4996 }
4997 _ => {}
4998 }
4999 }
5000
5001 assert!(resolved);
5002
5003 mdns_server.shutdown().unwrap();
5005 mdns_client.shutdown().unwrap();
5006 }
5007}