1#[cfg(feature = "logging")]
32use crate::log::{debug, trace};
33use crate::{
34 dns_cache::{current_time_millis, DnsCache},
35 dns_parser::{
36 ip_address_rr_type, DnsAddress, DnsEntryExt, DnsIncoming, DnsOutgoing, DnsPointer,
37 DnsRecordBox, DnsRecordExt, DnsSrv, DnsTxt, InterfaceId, RRType, ScopedIp,
38 CLASS_CACHE_FLUSH, CLASS_IN, FLAGS_AA, FLAGS_QR_QUERY, FLAGS_QR_RESPONSE, MAX_MSG_ABSOLUTE,
39 },
40 error::{e_fmt, Error, Result},
41 service_info::{split_sub_domain, DnsRegistry, MyIntf, Probe, ServiceInfo, ServiceStatus},
42 Receiver, ResolvedService, TxtProperties,
43};
44use flume::{bounded, Sender, TrySendError};
45use if_addrs::{IfAddr, Interface};
46use mio::{event::Source, net::UdpSocket as MioUdpSocket, Interest, Poll, Registry, Token};
47use socket2::Domain;
48use socket_pktinfo::PktInfoUdpSocket;
49use std::{
50 cmp::{self, Reverse},
51 collections::{hash_map::Entry, BinaryHeap, HashMap, HashSet},
52 fmt, io,
53 net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6, UdpSocket},
54 str, thread,
55 time::Duration,
56 vec,
57};
58
/// Default value of the daemon's `service_name_len_max` option.
///
/// NOTE(review): presumably the maximum accepted length of a service name
/// label; the check that consumes it is not visible here -- confirm.
pub const SERVICE_NAME_LEN_MAX_DEFAULT: u8 = 15;

/// Default interval, in seconds, between checks for local IP address changes.
/// `Zeroconf::new` multiplies it by 1000 to obtain milliseconds.
pub const IP_CHECK_INTERVAL_IN_SECS_DEFAULT: u32 = 5;

/// A 10-second default timeout.
///
/// NOTE(review): presumably meant as the default `timeout` argument of
/// `ServiceDaemon::verify` -- confirm against callers.
pub const VERIFY_TIMEOUT_DEFAULT: Duration = Duration::from_secs(10);

/// UDP port the daemon's IPv4 and IPv6 sockets are bound to.
const MDNS_PORT: u16 = 5353;
/// IPv4 multicast group (224.0.0.251) joined on every selected interface.
const GROUP_ADDR_V4: Ipv4Addr = Ipv4Addr::new(224, 0, 0, 251);
/// IPv6 multicast group (ff02::fb) joined on every selected interface.
const GROUP_ADDR_V6: Ipv6Addr = Ipv6Addr::new(0xff02, 0, 0, 0, 0, 0, 0, 0xfb);
/// IPv4 loopback address used for the daemon's internal signal socket.
const LOOPBACK_V4: Ipv4Addr = Ipv4Addr::new(127, 0, 0, 1);

/// NOTE(review): not referenced in the visible part of this file; presumably
/// a wait time in milliseconds used while resolving -- confirm.
const RESOLVE_WAIT_IN_MILLIS: u64 = 500;
77
/// Status delivered on the channel returned by `ServiceDaemon::unregister`.
#[derive(Debug)]
pub enum UnregisterStatus {
    /// NOTE(review): presumably the service was found and unregistered -- confirm
    /// against the daemon's unregister handler.
    OK,
    /// NOTE(review): presumably no registered service matched the given name -- confirm.
    NotFound,
}
86
/// Status of the daemon, as reported by `ServiceDaemon::status` and
/// `ServiceDaemon::shutdown`.
#[derive(Debug, PartialEq, Clone, Eq)]
#[non_exhaustive]
pub enum DaemonStatus {
    /// Initial status set by `Zeroconf::new`.
    Running,

    /// Set when the daemon receives an exit command, and reported by
    /// `ServiceDaemon::status` when the command channel is disconnected.
    Shutdown,
}
97
/// Identifiers of the daemon's internal counters.
///
/// The `Display` implementation below yields the textual name of each counter.
#[derive(Hash, Eq, PartialEq)]
enum Counter {
    Register,
    RegisterResend,
    Unregister,
    UnregisterResend,
    Browse,
    ResolveHostname,
    Respond,
    CacheRefreshPTR,
    CacheRefreshSrvTxt,
    CacheRefreshAddr,
    KnownAnswerSuppression,
    CachedPTR,
    CachedSRV,
    CachedAddr,
    CachedTxt,
    CachedNSec,
    CachedSubtype,
    DnsRegistryProbe,
    DnsRegistryActive,
    DnsRegistryTimer,
    DnsRegistryNameChange,
    Timer,
}

impl fmt::Display for Counter {
    /// Writes the kebab-case name of the counter.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        // Pick the label first, then emit it with a single write.
        let label = match self {
            Self::Register => "register",
            Self::RegisterResend => "register-resend",
            Self::Unregister => "unregister",
            Self::UnregisterResend => "unregister-resend",
            Self::Browse => "browse",
            Self::ResolveHostname => "resolve-hostname",
            Self::Respond => "respond",
            Self::CacheRefreshPTR => "cache-refresh-ptr",
            Self::CacheRefreshSrvTxt => "cache-refresh-srv-txt",
            Self::CacheRefreshAddr => "cache-refresh-addr",
            Self::KnownAnswerSuppression => "known-answer-suppression",
            Self::CachedPTR => "cached-ptr",
            Self::CachedSRV => "cached-srv",
            Self::CachedAddr => "cached-addr",
            Self::CachedTxt => "cached-txt",
            Self::CachedNSec => "cached-nsec",
            Self::CachedSubtype => "cached-subtype",
            Self::DnsRegistryProbe => "dns-registry-probe",
            Self::DnsRegistryActive => "dns-registry-active",
            Self::DnsRegistryTimer => "dns-registry-timer",
            Self::DnsRegistryNameChange => "dns-registry-name-change",
            Self::Timer => "timer",
        };
        f.write_str(label)
    }
}
154
155struct MyUdpSocket {
160 pktinfo: PktInfoUdpSocket,
163
164 mio: MioUdpSocket,
167}
168
169impl MyUdpSocket {
170 pub fn new(pktinfo: PktInfoUdpSocket) -> io::Result<Self> {
171 let std_sock = pktinfo.try_clone_std()?;
172 let mio = MioUdpSocket::from_std(std_sock);
173
174 Ok(Self { pktinfo, mio })
175 }
176}
177
178impl Source for MyUdpSocket {
180 fn register(
181 &mut self,
182 registry: &Registry,
183 token: Token,
184 interests: Interest,
185 ) -> io::Result<()> {
186 self.mio.register(registry, token, interests)
187 }
188
189 fn reregister(
190 &mut self,
191 registry: &Registry,
192 token: Token,
193 interests: Interest,
194 ) -> io::Result<()> {
195 self.mio.reregister(registry, token, interests)
196 }
197
198 fn deregister(&mut self, registry: &Registry) -> std::io::Result<()> {
199 self.mio.deregister(registry)
200 }
201}
202
/// Counter values keyed by name, as delivered by `ServiceDaemon::get_metrics`.
pub type Metrics = HashMap<String, i64>;

/// Poller token value used when registering the IPv4 socket.
const IPV4_SOCK_EVENT_KEY: usize = 4;
/// Poller token value used when registering the IPv6 socket.
const IPV6_SOCK_EVENT_KEY: usize = 6;
/// Poller token value used when registering the signal socket.
const SIGNAL_SOCK_EVENT_KEY: usize = usize::MAX - 1;

/// Client-side handle to the daemon thread.
///
/// Cloning the handle clones the command sender and the signal address.
#[derive(Clone)]
pub struct ServiceDaemon {
    /// Sending half of the command channel read by the daemon thread.
    sender: Sender<Command>,

    /// Address of the daemon's loopback signal socket; `send_cmd` sends a
    /// datagram here after queueing a command.
    signal_addr: SocketAddr,
}
226
227impl ServiceDaemon {
228 pub fn new() -> Result<Self> {
233 let signal_addr = SocketAddrV4::new(LOOPBACK_V4, 0);
236
237 let signal_sock = UdpSocket::bind(signal_addr)
238 .map_err(|e| e_fmt!("failed to create signal_sock for daemon: {}", e))?;
239
240 let signal_addr = signal_sock
242 .local_addr()
243 .map_err(|e| e_fmt!("failed to get signal sock addr: {}", e))?;
244
245 signal_sock
247 .set_nonblocking(true)
248 .map_err(|e| e_fmt!("failed to set nonblocking for signal socket: {}", e))?;
249
250 let poller = Poll::new().map_err(|e| e_fmt!("failed to create mio Poll: {e}"))?;
251
252 let (sender, receiver) = bounded(100);
253
254 let mio_sock = MioUdpSocket::from_std(signal_sock);
256 thread::Builder::new()
257 .name("mDNS_daemon".to_string())
258 .spawn(move || Self::daemon_thread(mio_sock, poller, receiver))
259 .map_err(|e| e_fmt!("thread builder failed to spawn: {}", e))?;
260
261 Ok(Self {
262 sender,
263 signal_addr,
264 })
265 }
266
267 fn send_cmd(&self, cmd: Command) -> Result<()> {
270 let cmd_name = cmd.to_string();
271
272 self.sender.try_send(cmd).map_err(|e| match e {
274 TrySendError::Full(_) => Error::Again,
275 e => e_fmt!("flume::channel::send failed: {}", e),
276 })?;
277
278 let addr = SocketAddrV4::new(LOOPBACK_V4, 0);
280 let socket = UdpSocket::bind(addr)
281 .map_err(|e| e_fmt!("Failed to create socket to send signal: {}", e))?;
282 socket
283 .send_to(cmd_name.as_bytes(), self.signal_addr)
284 .map_err(|e| {
285 e_fmt!(
286 "signal socket send_to {} ({}) failed: {}",
287 self.signal_addr,
288 cmd_name,
289 e
290 )
291 })?;
292
293 Ok(())
294 }
295
296 pub fn browse(&self, service_type: &str) -> Result<Receiver<ServiceEvent>> {
307 check_domain_suffix(service_type)?;
308
309 let (resp_s, resp_r) = bounded(10);
310 self.send_cmd(Command::Browse(service_type.to_string(), 1, resp_s))?;
311 Ok(resp_r)
312 }
313
314 pub fn stop_browse(&self, ty_domain: &str) -> Result<()> {
319 self.send_cmd(Command::StopBrowse(ty_domain.to_string()))
320 }
321
322 pub fn resolve_hostname(
330 &self,
331 hostname: &str,
332 timeout: Option<u64>,
333 ) -> Result<Receiver<HostnameResolutionEvent>> {
334 check_hostname(hostname)?;
335 let (resp_s, resp_r) = bounded(10);
336 self.send_cmd(Command::ResolveHostname(
337 hostname.to_string(),
338 1,
339 resp_s,
340 timeout,
341 ))?;
342 Ok(resp_r)
343 }
344
345 pub fn stop_resolve_hostname(&self, hostname: &str) -> Result<()> {
350 self.send_cmd(Command::StopResolveHostname(hostname.to_string()))
351 }
352
353 pub fn register(&self, service_info: ServiceInfo) -> Result<()> {
361 check_service_name(service_info.get_fullname())?;
362 check_hostname(service_info.get_hostname())?;
363
364 self.send_cmd(Command::Register(service_info))
365 }
366
367 pub fn unregister(&self, fullname: &str) -> Result<Receiver<UnregisterStatus>> {
375 let (resp_s, resp_r) = bounded(1);
376 self.send_cmd(Command::Unregister(fullname.to_lowercase(), resp_s))?;
377 Ok(resp_r)
378 }
379
380 pub fn monitor(&self) -> Result<Receiver<DaemonEvent>> {
384 let (resp_s, resp_r) = bounded(100);
385 self.send_cmd(Command::Monitor(resp_s))?;
386 Ok(resp_r)
387 }
388
389 pub fn shutdown(&self) -> Result<Receiver<DaemonStatus>> {
394 let (resp_s, resp_r) = bounded(1);
395 self.send_cmd(Command::Exit(resp_s))?;
396 Ok(resp_r)
397 }
398
399 pub fn status(&self) -> Result<Receiver<DaemonStatus>> {
405 let (resp_s, resp_r) = bounded(1);
406
407 if self.sender.is_disconnected() {
408 resp_s
409 .send(DaemonStatus::Shutdown)
410 .map_err(|e| e_fmt!("failed to send daemon status to the client: {}", e))?;
411 } else {
412 self.send_cmd(Command::GetStatus(resp_s))?;
413 }
414
415 Ok(resp_r)
416 }
417
418 pub fn get_metrics(&self) -> Result<Receiver<Metrics>> {
423 let (resp_s, resp_r) = bounded(1);
424 self.send_cmd(Command::GetMetrics(resp_s))?;
425 Ok(resp_r)
426 }
427
428 pub fn set_service_name_len_max(&self, len_max: u8) -> Result<()> {
435 const SERVICE_NAME_LEN_MAX_LIMIT: u8 = 30; if len_max > SERVICE_NAME_LEN_MAX_LIMIT {
438 return Err(Error::Msg(format!(
439 "service name length max {len_max} is too large"
440 )));
441 }
442
443 self.send_cmd(Command::SetOption(DaemonOption::ServiceNameLenMax(len_max)))
444 }
445
446 pub fn set_ip_check_interval(&self, interval_in_secs: u32) -> Result<()> {
452 let interval_in_millis = interval_in_secs as u64 * 1000;
453 self.send_cmd(Command::SetOption(DaemonOption::IpCheckInterval(
454 interval_in_millis,
455 )))
456 }
457
458 pub fn get_ip_check_interval(&self) -> Result<u32> {
460 let (resp_s, resp_r) = bounded(1);
461 self.send_cmd(Command::GetOption(resp_s))?;
462
463 let option = resp_r
464 .recv_timeout(Duration::from_secs(10))
465 .map_err(|e| e_fmt!("failed to receive ip check interval: {}", e))?;
466 let ip_check_interval_in_secs = option.ip_check_interval / 1000;
467 Ok(ip_check_interval_in_secs as u32)
468 }
469
470 pub fn enable_interface(&self, if_kind: impl IntoIfKindVec) -> Result<()> {
477 let if_kind_vec = if_kind.into_vec();
478 self.send_cmd(Command::SetOption(DaemonOption::EnableInterface(
479 if_kind_vec.kinds,
480 )))
481 }
482
483 pub fn disable_interface(&self, if_kind: impl IntoIfKindVec) -> Result<()> {
490 let if_kind_vec = if_kind.into_vec();
491 self.send_cmd(Command::SetOption(DaemonOption::DisableInterface(
492 if_kind_vec.kinds,
493 )))
494 }
495
496 #[cfg(test)]
497 pub fn test_down_interface(&self, ifname: &str) -> Result<()> {
498 self.send_cmd(Command::SetOption(DaemonOption::TestDownInterface(
499 ifname.to_string(),
500 )))
501 }
502
503 #[cfg(test)]
504 pub fn test_up_interface(&self, ifname: &str) -> Result<()> {
505 self.send_cmd(Command::SetOption(DaemonOption::TestUpInterface(
506 ifname.to_string(),
507 )))
508 }
509
510 pub fn set_multicast_loop_v4(&self, on: bool) -> Result<()> {
526 self.send_cmd(Command::SetOption(DaemonOption::MulticastLoopV4(on)))
527 }
528
529 pub fn set_multicast_loop_v6(&self, on: bool) -> Result<()> {
545 self.send_cmd(Command::SetOption(DaemonOption::MulticastLoopV6(on)))
546 }
547
548 pub fn use_service_data(&self, on: bool) -> Result<()> {
552 self.send_cmd(Command::SetOption(DaemonOption::UseServiceData(on)))
553 }
554
555 pub fn verify(&self, instance_fullname: String, timeout: Duration) -> Result<()> {
568 self.send_cmd(Command::Verify(instance_fullname, timeout))
569 }
570
571 fn daemon_thread(signal_sock: MioUdpSocket, poller: Poll, receiver: Receiver<Command>) {
572 let mut zc = Zeroconf::new(signal_sock, poller);
573
574 if let Some(cmd) = zc.run(receiver) {
575 match cmd {
576 Command::Exit(resp_s) => {
577 if let Err(e) = resp_s.send(DaemonStatus::Shutdown) {
580 debug!("exit: failed to send response of shutdown: {}", e);
581 }
582 }
583 _ => {
584 debug!("Unexpected command: {:?}", cmd);
585 }
586 }
587 }
588 }
589}
590
591fn _new_socket_bind(intf: &Interface, should_loop: bool) -> Result<MyUdpSocket> {
593 let intf_ip = &intf.ip();
596 match intf_ip {
597 IpAddr::V4(ip) => {
598 let addr = SocketAddrV4::new(Ipv4Addr::new(0, 0, 0, 0), MDNS_PORT);
599 let sock = new_socket(addr.into(), true)?;
600
601 sock.join_multicast_v4(&GROUP_ADDR_V4, ip)
603 .map_err(|e| e_fmt!("join multicast group on addr {}: {}", intf_ip, e))?;
604
605 sock.set_multicast_if_v4(ip)
607 .map_err(|e| e_fmt!("set multicast_if on addr {}: {}", ip, e))?;
608
609 sock.set_multicast_ttl_v4(255)
614 .map_err(|e| e_fmt!("set set_multicast_ttl_v4 on addr {}: {}", ip, e))?;
615
616 if !should_loop {
617 sock.set_multicast_loop_v4(false)
618 .map_err(|e| e_fmt!("failed to set multicast loop v4 for {ip}: {e}"))?;
619 }
620
621 let multicast_addr = SocketAddrV4::new(GROUP_ADDR_V4, MDNS_PORT).into();
623 let test_packets = DnsOutgoing::new(0).to_data_on_wire();
624 for packet in test_packets {
625 sock.send_to(&packet, &multicast_addr)
626 .map_err(|e| e_fmt!("send multicast packet on addr {}: {}", ip, e))?;
627 }
628 MyUdpSocket::new(sock)
629 .map_err(|e| e_fmt!("failed to create MySocket for interface {}: {e}", intf.name))
630 }
631 IpAddr::V6(ip) => {
632 let addr = SocketAddrV6::new(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 0), MDNS_PORT, 0, 0);
633 let sock = new_socket(addr.into(), true)?;
634
635 let if_index = intf.index.unwrap_or(0);
636
637 sock.join_multicast_v6(&GROUP_ADDR_V6, if_index)
639 .map_err(|e| e_fmt!("join multicast group on addr {}: {}", ip, e))?;
640
641 sock.set_multicast_if_v6(if_index)
643 .map_err(|e| e_fmt!("set multicast_if on addr {}: {}", ip, e))?;
644
645 MyUdpSocket::new(sock)
650 .map_err(|e| e_fmt!("failed to create MySocket for interface {}: {e}", intf.name))
651 }
652 }
653}
654
655fn new_socket(addr: SocketAddr, non_block: bool) -> Result<PktInfoUdpSocket> {
658 let domain = match addr {
659 SocketAddr::V4(_) => socket2::Domain::IPV4,
660 SocketAddr::V6(_) => socket2::Domain::IPV6,
661 };
662
663 let fd = PktInfoUdpSocket::new(domain).map_err(|e| e_fmt!("create socket failed: {}", e))?;
664
665 fd.set_reuse_address(true)
666 .map_err(|e| e_fmt!("set ReuseAddr failed: {}", e))?;
667 #[cfg(unix)] fd.set_reuse_port(true)
669 .map_err(|e| e_fmt!("set ReusePort failed: {}", e))?;
670
671 if non_block {
672 fd.set_nonblocking(true)
673 .map_err(|e| e_fmt!("set O_NONBLOCK: {}", e))?;
674 }
675
676 fd.bind(&addr.into())
677 .map_err(|e| e_fmt!("socket bind to {} failed: {}", &addr, e))?;
678
679 trace!("new socket bind to {}", &addr);
680 Ok(fd)
681}
682
/// A command scheduled to be executed again by the daemon loop.
struct ReRun {
    /// Time in milliseconds at or after which `Zeroconf::run` re-executes the command.
    next_time: u64,
    /// The command to execute again.
    command: Command,
}
689
/// Describes which network interfaces a selection applies to; see
/// `IfKind::matches` for the exact rule of each variant.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub enum IfKind {
    /// Matches every interface.
    All,

    /// Matches interfaces whose address is IPv4.
    IPv4,

    /// Matches interfaces whose address is IPv6.
    IPv6,

    /// Matches interfaces with exactly this name.
    Name(String),

    /// Matches interfaces with exactly this IP address.
    Addr(IpAddr),

    /// Matches loopback interfaces with an IPv4 address.
    LoopbackV4,

    /// Matches loopback interfaces with an IPv6 address.
    LoopbackV6,
}
720
721impl IfKind {
722 fn matches(&self, intf: &Interface) -> bool {
724 match self {
725 Self::All => true,
726 Self::IPv4 => intf.ip().is_ipv4(),
727 Self::IPv6 => intf.ip().is_ipv6(),
728 Self::Name(ifname) => ifname == &intf.name,
729 Self::Addr(addr) => addr == &intf.ip(),
730 Self::LoopbackV4 => intf.is_loopback() && intf.ip().is_ipv4(),
731 Self::LoopbackV6 => intf.is_loopback() && intf.ip().is_ipv6(),
732 }
733 }
734}
735
736impl From<&str> for IfKind {
739 fn from(val: &str) -> Self {
740 Self::Name(val.to_string())
741 }
742}
743
744impl From<&String> for IfKind {
745 fn from(val: &String) -> Self {
746 Self::Name(val.to_string())
747 }
748}
749
750impl From<IpAddr> for IfKind {
752 fn from(val: IpAddr) -> Self {
753 Self::Addr(val)
754 }
755}
756
/// A list of `IfKind` values, produced by `IntoIfKindVec::into_vec`.
pub struct IfKindVec {
    /// The interface kinds, in the order they were given.
    kinds: Vec<IfKind>,
}

/// Conversion into an `IfKindVec`; lets `enable_interface` and
/// `disable_interface` accept either a single kind or a `Vec` of kinds.
pub trait IntoIfKindVec {
    /// Consumes `self` and returns the equivalent `IfKindVec`.
    fn into_vec(self) -> IfKindVec;
}
766
767impl<T: Into<IfKind>> IntoIfKindVec for T {
768 fn into_vec(self) -> IfKindVec {
769 let if_kind: IfKind = self.into();
770 IfKindVec {
771 kinds: vec![if_kind],
772 }
773 }
774}
775
776impl<T: Into<IfKind>> IntoIfKindVec for Vec<T> {
777 fn into_vec(self) -> IfKindVec {
778 let kinds: Vec<IfKind> = self.into_iter().map(|x| x.into()).collect();
779 IfKindVec { kinds }
780 }
781}
782
/// One interface selection rule: which interfaces it applies to and whether
/// they are enabled or disabled.
struct IfSelection {
    /// The kind of interfaces this rule applies to.
    if_kind: IfKind,

    /// `true` when matching interfaces are enabled, `false` when disabled.
    selected: bool,
}
791
/// State owned by the daemon thread.
struct Zeroconf {
    /// Local interfaces currently in use, keyed by interface index.
    my_intfs: HashMap<u32, MyIntf>,

    /// Socket for IPv4, bound to the unspecified address on `MDNS_PORT`.
    ipv4_sock: MyUdpSocket,

    /// Socket for IPv6, bound to the unspecified address on `MDNS_PORT`.
    ipv6_sock: MyUdpSocket,

    /// Locally registered services.
    /// NOTE(review): presumably keyed by service fullname -- confirm.
    my_services: HashMap<String, ServiceInfo>,

    /// Cache of DNS records.
    cache: DnsCache,

    /// One DNS registry per interface, keyed by interface index.
    dns_registry_map: HashMap<u32, DnsRegistry>,

    /// Senders for service events.
    /// NOTE(review): presumably keyed by the service type being browsed -- confirm.
    service_queriers: HashMap<String, Sender<ServiceEvent>>,

    /// Active hostname resolutions keyed by hostname. The optional `u64` is a
    /// time in milliseconds; `run` stops the resolution once it is reached.
    hostname_resolvers: HashMap<String, (Sender<HostnameResolutionEvent>, Option<u64>)>,

    /// Commands waiting to be executed again at their `next_time`.
    retransmissions: Vec<ReRun>,

    /// Internal counters.
    counters: Metrics,

    /// The mio poller the sockets are registered with.
    poller: Poll,

    /// Channels to clients that asked for daemon events.
    monitors: Vec<Sender<DaemonEvent>>,

    /// Current value of the `ServiceNameLenMax` option.
    service_name_len_max: u8,

    /// Interval between IP change checks, in milliseconds.
    ip_check_interval: u64,

    /// Interface selection rules in the order they were added; a later rule
    /// overrides an earlier one for the interfaces it matches.
    if_selections: Vec<IfSelection>,

    /// Socket used by clients to signal the daemon.
    signal_sock: MioUdpSocket,

    /// Timer deadlines in milliseconds; `Reverse` makes the earliest one
    /// the top of the heap.
    timers: BinaryHeap<Reverse<u64>>,

    /// Current daemon status.
    status: DaemonStatus,

    /// NOTE(review): presumably names of service instances whose resolution is
    /// still in progress -- confirm.
    pending_resolves: HashSet<String>,

    /// NOTE(review): presumably names of service instances already resolved -- confirm.
    resolved: HashSet<String>,

    /// Last requested IPv4 multicast loop setting; starts as `true`.
    multicast_loop_v4: bool,

    /// Last requested IPv6 multicast loop setting; starts as `true`.
    multicast_loop_v6: bool,

    /// Value of the `UseServiceData` option; starts as `false`.
    use_service_data: bool,

    /// Test-only: names of interfaces that `check_ip_changes` treats as absent.
    #[cfg(test)]
    test_down_interfaces: HashSet<String>,
}
869
870fn join_multicast_group(my_sock: &PktInfoUdpSocket, intf: &Interface) -> Result<()> {
872 let intf_ip = &intf.ip();
873 match intf_ip {
874 IpAddr::V4(ip) => {
875 debug!("join multicast group V4 on addr {}", ip);
877 my_sock
878 .join_multicast_v4(&GROUP_ADDR_V4, ip)
879 .map_err(|e| e_fmt!("PKT join multicast group on addr {}: {}", intf_ip, e))?;
880 }
881 IpAddr::V6(ip) => {
882 let if_index = intf.index.unwrap_or(0);
883 debug!(
885 "join multicast group V6 on addr {} with index {}",
886 ip, if_index
887 );
888 my_sock
889 .join_multicast_v6(&GROUP_ADDR_V6, if_index)
890 .map_err(|e| e_fmt!("PKT join multicast group on addr {}: {}", ip, e))?;
891 }
892 }
893 Ok(())
894}
895
896impl Zeroconf {
897 fn new(signal_sock: MioUdpSocket, poller: Poll) -> Self {
898 let my_ifaddrs = my_ip_interfaces(false);
900
901 let mut my_intfs = HashMap::new();
905 let mut dns_registry_map = HashMap::new();
906
907 let addr = SocketAddrV4::new(Ipv4Addr::new(0, 0, 0, 0), MDNS_PORT);
910 let sock = new_socket(addr.into(), true).unwrap();
911
912 sock.set_multicast_ttl_v4(255)
917 .map_err(|e| e_fmt!("set set_multicast_ttl_v4 on addr: {}", e))
918 .unwrap();
919
920 let ipv4_sock = MyUdpSocket::new(sock).unwrap();
922
923 let addr = SocketAddrV6::new(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 0), MDNS_PORT, 0, 0);
924 let sock = new_socket(addr.into(), true).unwrap();
925
926 sock.set_multicast_hops_v6(255)
930 .map_err(|e| e_fmt!("set set_multicast_hops_v6: {}", e))
931 .unwrap();
932
933 let ipv6_sock = MyUdpSocket::new(sock).unwrap();
935
936 for intf in my_ifaddrs {
938 let sock = if intf.ip().is_ipv4() {
939 &ipv4_sock
940 } else {
941 &ipv6_sock
942 };
943
944 if let Err(e) = join_multicast_group(&sock.pktinfo, &intf) {
945 debug!(
946 "config socket to join multicast: {}: {e}. Skipped.",
947 &intf.ip()
948 );
949 }
950
951 let if_index = intf.index.unwrap_or(0);
952
953 dns_registry_map
955 .entry(if_index)
956 .or_insert_with(DnsRegistry::new);
957
958 my_intfs
959 .entry(if_index)
960 .and_modify(|v: &mut MyIntf| {
961 v.addrs.insert(intf.addr.clone());
962 })
963 .or_insert(MyIntf {
964 name: intf.name.clone(),
965 index: if_index,
966 addrs: HashSet::from([intf.addr]),
967 });
968 }
969
970 let monitors = Vec::new();
971 let service_name_len_max = SERVICE_NAME_LEN_MAX_DEFAULT;
972 let ip_check_interval = IP_CHECK_INTERVAL_IN_SECS_DEFAULT as u64 * 1000;
973
974 let timers = BinaryHeap::new();
975
976 let if_selections = vec![
978 IfSelection {
979 if_kind: IfKind::LoopbackV4,
980 selected: false,
981 },
982 IfSelection {
983 if_kind: IfKind::LoopbackV6,
984 selected: false,
985 },
986 ];
987
988 let status = DaemonStatus::Running;
989
990 Self {
991 my_intfs,
992 ipv4_sock,
993 ipv6_sock,
994 my_services: HashMap::new(),
996 cache: DnsCache::new(),
997 dns_registry_map,
998 hostname_resolvers: HashMap::new(),
999 service_queriers: HashMap::new(),
1000 retransmissions: Vec::new(),
1001 counters: HashMap::new(),
1002 poller,
1003 monitors,
1004 service_name_len_max,
1005 ip_check_interval,
1006 if_selections,
1007 signal_sock,
1008 timers,
1009 status,
1010 pending_resolves: HashSet::new(),
1011 resolved: HashSet::new(),
1012 multicast_loop_v4: true,
1013 multicast_loop_v6: true,
1014 use_service_data: false,
1015
1016 #[cfg(test)]
1017 test_down_interfaces: HashSet::new(),
1018 }
1019 }
1020
1021 fn run(&mut self, receiver: Receiver<Command>) -> Option<Command> {
1030 if let Err(e) = self.poller.registry().register(
1032 &mut self.signal_sock,
1033 mio::Token(SIGNAL_SOCK_EVENT_KEY),
1034 mio::Interest::READABLE,
1035 ) {
1036 debug!("failed to add signal socket to the poller: {}", e);
1037 return None;
1038 }
1039
1040 if let Err(e) = self.poller.registry().register(
1041 &mut self.ipv4_sock,
1042 mio::Token(IPV4_SOCK_EVENT_KEY),
1043 mio::Interest::READABLE,
1044 ) {
1045 debug!("failed to register ipv4 socket: {}", e);
1046 return None;
1047 }
1048
1049 if let Err(e) = self.poller.registry().register(
1050 &mut self.ipv6_sock,
1051 mio::Token(IPV6_SOCK_EVENT_KEY),
1052 mio::Interest::READABLE,
1053 ) {
1054 debug!("failed to register ipv6 socket: {}", e);
1055 return None;
1056 }
1057
1058 let mut next_ip_check = if self.ip_check_interval > 0 {
1060 current_time_millis() + self.ip_check_interval
1061 } else {
1062 0
1063 };
1064
1065 if next_ip_check > 0 {
1066 self.add_timer(next_ip_check);
1067 }
1068
1069 let mut events = mio::Events::with_capacity(1024);
1072 loop {
1073 let now = current_time_millis();
1074
1075 let earliest_timer = self.peek_earliest_timer();
1076 let timeout = earliest_timer.map(|timer| {
1077 let millis = if timer > now { timer - now } else { 1 };
1079 Duration::from_millis(millis)
1080 });
1081
1082 events.clear();
1084 match self.poller.poll(&mut events, timeout) {
1085 Ok(_) => self.handle_poller_events(&events),
1086 Err(e) => debug!("failed to select from sockets: {}", e),
1087 }
1088
1089 let now = current_time_millis();
1090
1091 self.pop_timers_till(now);
1093
1094 for hostname in self
1096 .hostname_resolvers
1097 .clone()
1098 .into_iter()
1099 .filter(|(_, (_, timeout))| timeout.map(|t| now >= t).unwrap_or(false))
1100 .map(|(hostname, _)| hostname)
1101 {
1102 trace!("hostname resolver timeout for {}", &hostname);
1103 call_hostname_resolution_listener(
1104 &self.hostname_resolvers,
1105 &hostname,
1106 HostnameResolutionEvent::SearchTimeout(hostname.to_owned()),
1107 );
1108 call_hostname_resolution_listener(
1109 &self.hostname_resolvers,
1110 &hostname,
1111 HostnameResolutionEvent::SearchStopped(hostname.to_owned()),
1112 );
1113 self.hostname_resolvers.remove(&hostname);
1114 }
1115
1116 while let Ok(command) = receiver.try_recv() {
1118 if matches!(command, Command::Exit(_)) {
1119 self.status = DaemonStatus::Shutdown;
1120 return Some(command);
1121 }
1122 self.exec_command(command, false);
1123 }
1124
1125 let mut i = 0;
1127 while i < self.retransmissions.len() {
1128 if now >= self.retransmissions[i].next_time {
1129 let rerun = self.retransmissions.remove(i);
1130 self.exec_command(rerun.command, true);
1131 } else {
1132 i += 1;
1133 }
1134 }
1135
1136 self.refresh_active_services();
1138
1139 let mut query_count = 0;
1141 for (hostname, _sender) in self.hostname_resolvers.iter() {
1142 for (hostname, ip_addr) in
1143 self.cache.refresh_due_hostname_resolutions(hostname).iter()
1144 {
1145 self.send_query(hostname, ip_address_rr_type(&ip_addr.to_ip_addr()));
1146 query_count += 1;
1147 }
1148 }
1149
1150 self.increase_counter(Counter::CacheRefreshAddr, query_count);
1151
1152 let now = current_time_millis();
1154
1155 let expired_services = self.cache.evict_expired_services(now);
1157 if !expired_services.is_empty() {
1158 debug!(
1159 "run: send {} service removal to listeners",
1160 expired_services.len()
1161 );
1162 self.notify_service_removal(expired_services);
1163 }
1164
1165 let expired_addrs = self.cache.evict_expired_addr(now);
1167 for (hostname, addrs) in expired_addrs {
1168 call_hostname_resolution_listener(
1169 &self.hostname_resolvers,
1170 &hostname,
1171 HostnameResolutionEvent::AddressesRemoved(hostname.clone(), addrs),
1172 );
1173 let instances = self.cache.get_instances_on_host(&hostname);
1174 let instance_set: HashSet<String> = instances.into_iter().collect();
1175 self.resolve_updated_instances(&instance_set);
1176 }
1177
1178 self.probing_handler();
1180
1181 if now >= next_ip_check && next_ip_check > 0 {
1183 next_ip_check = now + self.ip_check_interval;
1184 self.add_timer(next_ip_check);
1185
1186 self.check_ip_changes();
1187 }
1188 }
1189 }
1190
1191 fn process_set_option(&mut self, daemon_opt: DaemonOption) {
1192 match daemon_opt {
1193 DaemonOption::ServiceNameLenMax(length) => self.service_name_len_max = length,
1194 DaemonOption::IpCheckInterval(interval) => self.ip_check_interval = interval,
1195 DaemonOption::EnableInterface(if_kind) => self.enable_interface(if_kind),
1196 DaemonOption::DisableInterface(if_kind) => self.disable_interface(if_kind),
1197 DaemonOption::MulticastLoopV4(on) => self.set_multicast_loop_v4(on),
1198 DaemonOption::MulticastLoopV6(on) => self.set_multicast_loop_v6(on),
1199 DaemonOption::UseServiceData(on) => self.use_service_data = on,
1200 #[cfg(test)]
1201 DaemonOption::TestDownInterface(ifname) => {
1202 self.test_down_interfaces.insert(ifname);
1203 }
1204 #[cfg(test)]
1205 DaemonOption::TestUpInterface(ifname) => {
1206 self.test_down_interfaces.remove(&ifname);
1207 }
1208 }
1209 }
1210
1211 fn enable_interface(&mut self, kinds: Vec<IfKind>) {
1212 debug!("enable_interface: {:?}", kinds);
1213 for if_kind in kinds {
1214 self.if_selections.push(IfSelection {
1215 if_kind,
1216 selected: true,
1217 });
1218 }
1219
1220 self.apply_intf_selections(my_ip_interfaces(true));
1221 }
1222
1223 fn disable_interface(&mut self, kinds: Vec<IfKind>) {
1224 debug!("disable_interface: {:?}", kinds);
1225 for if_kind in kinds {
1226 self.if_selections.push(IfSelection {
1227 if_kind,
1228 selected: false,
1229 });
1230 }
1231
1232 self.apply_intf_selections(my_ip_interfaces(true));
1233 }
1234
1235 fn set_multicast_loop_v4(&mut self, on: bool) {
1236 self.multicast_loop_v4 = on;
1237 self.ipv4_sock
1238 .pktinfo
1239 .set_multicast_loop_v4(on)
1240 .map_err(|e| e_fmt!("failed to set multicast loop v4: {}", e))
1241 .unwrap();
1242 }
1243
1244 fn set_multicast_loop_v6(&mut self, on: bool) {
1245 self.multicast_loop_v6 = on;
1246 self.ipv6_sock
1247 .pktinfo
1248 .set_multicast_loop_v6(on)
1249 .map_err(|e| e_fmt!("failed to set multicast loop v6: {}", e))
1250 .unwrap();
1251 }
1252
1253 fn notify_monitors(&mut self, event: DaemonEvent) {
1254 self.monitors.retain(|sender| {
1256 if let Err(e) = sender.try_send(event.clone()) {
1257 debug!("notify_monitors: try_send: {}", &e);
1258 if matches!(e, TrySendError::Disconnected(_)) {
1259 return false; }
1261 }
1262 true
1263 });
1264 }
1265
1266 fn del_addr_in_my_services(&mut self, addr: &IpAddr) {
1268 for (_, service_info) in self.my_services.iter_mut() {
1269 if service_info.is_addr_auto() {
1270 service_info.remove_ipaddr(addr);
1271 }
1272 }
1273 }
1274
1275 fn add_timer(&mut self, next_time: u64) {
1276 self.timers.push(Reverse(next_time));
1277 }
1278
1279 fn peek_earliest_timer(&self) -> Option<u64> {
1280 self.timers.peek().map(|Reverse(v)| *v)
1281 }
1282
1283 fn _pop_earliest_timer(&mut self) -> Option<u64> {
1284 self.timers.pop().map(|Reverse(v)| v)
1285 }
1286
1287 fn pop_timers_till(&mut self, now: u64) {
1289 while let Some(Reverse(v)) = self.timers.peek() {
1290 if *v > now {
1291 break;
1292 }
1293 self.timers.pop();
1294 }
1295 }
1296
1297 fn selected_addrs(&self, interfaces: Vec<Interface>) -> HashSet<IpAddr> {
1299 let intf_count = interfaces.len();
1300 let mut intf_selections = vec![true; intf_count];
1301
1302 for selection in self.if_selections.iter() {
1304 for i in 0..intf_count {
1306 if selection.if_kind.matches(&interfaces[i]) {
1307 intf_selections[i] = selection.selected;
1308 }
1309 }
1310 }
1311
1312 let mut selected_addrs = HashSet::new();
1313 for i in 0..intf_count {
1314 if intf_selections[i] {
1315 selected_addrs.insert(interfaces[i].addr.ip());
1316 }
1317 }
1318
1319 selected_addrs
1320 }
1321
1322 fn apply_intf_selections(&mut self, interfaces: Vec<Interface>) {
1327 let intf_count = interfaces.len();
1329 let mut intf_selections = vec![true; intf_count];
1330
1331 for selection in self.if_selections.iter() {
1333 for i in 0..intf_count {
1335 if selection.if_kind.matches(&interfaces[i]) {
1336 intf_selections[i] = selection.selected;
1337 }
1338 }
1339 }
1340
1341 for (idx, intf) in interfaces.into_iter().enumerate() {
1343 if intf_selections[idx] {
1344 self.add_interface(intf);
1346 } else {
1347 self.del_interface(&intf);
1349 }
1350 }
1351 }
1352
    /// Handles a removed local IP address: drops it from the registered
    /// services (see `del_addr_in_my_services`) and then notifies the monitors
    /// with `DaemonEvent::IpDel`.
    fn del_ip(&mut self, ip: IpAddr) {
        self.del_addr_in_my_services(&ip);
        self.notify_monitors(DaemonEvent::IpDel(ip));
    }
1357
1358 fn check_ip_changes(&mut self) {
1360 let my_ifaddrs = my_ip_interfaces(true);
1362
1363 #[cfg(test)]
1364 let my_ifaddrs: Vec<_> = my_ifaddrs
1365 .into_iter()
1366 .filter(|intf| !self.test_down_interfaces.contains(&intf.name))
1367 .collect();
1368
1369 let ifaddrs_map: HashMap<u32, Vec<&IfAddr>> =
1370 my_ifaddrs.iter().fold(HashMap::new(), |mut acc, intf| {
1371 let if_index = intf.index.unwrap_or(0);
1372 acc.entry(if_index).or_default().push(&intf.addr);
1373 acc
1374 });
1375
1376 let mut deleted_intfs = Vec::new();
1377 let mut deleted_ips = Vec::new();
1378
1379 for (if_index, my_intf) in self.my_intfs.iter_mut() {
1380 let mut last_ipv4 = None;
1381 let mut last_ipv6 = None;
1382
1383 if let Some(current_addrs) = ifaddrs_map.get(if_index) {
1384 my_intf.addrs.retain(|addr| {
1385 if current_addrs.contains(&addr) {
1386 true
1387 } else {
1388 match addr.ip() {
1389 IpAddr::V4(ipv4) => last_ipv4 = Some(ipv4),
1390 IpAddr::V6(ipv6) => last_ipv6 = Some(ipv6),
1391 }
1392 deleted_ips.push(addr.ip());
1393 false
1394 }
1395 });
1396 if my_intf.addrs.is_empty() {
1397 deleted_intfs.push((*if_index, last_ipv4, last_ipv6))
1398 }
1399 } else {
1400 debug!(
1402 "check_ip_changes: interface {} ({}) no longer exists, removing",
1403 my_intf.name, if_index
1404 );
1405 for addr in my_intf.addrs.iter() {
1406 match addr.ip() {
1407 IpAddr::V4(ipv4) => last_ipv4 = Some(ipv4),
1408 IpAddr::V6(ipv6) => last_ipv6 = Some(ipv6),
1409 }
1410 deleted_ips.push(addr.ip())
1411 }
1412 deleted_intfs.push((*if_index, last_ipv4, last_ipv6));
1413 }
1414 }
1415
1416 if !deleted_ips.is_empty() || !deleted_intfs.is_empty() {
1417 debug!(
1418 "check_ip_changes: {} deleted ips {} deleted intfs",
1419 deleted_ips.len(),
1420 deleted_intfs.len()
1421 );
1422 }
1423
1424 for ip in deleted_ips {
1425 self.del_ip(ip);
1426 }
1427
1428 for (if_index, last_ipv4, last_ipv6) in deleted_intfs {
1429 let Some(my_intf) = self.my_intfs.remove(&if_index) else {
1430 continue;
1431 };
1432
1433 if let Some(ipv4) = last_ipv4 {
1434 debug!("leave multicast for {ipv4}");
1435 if let Err(e) = self
1436 .ipv4_sock
1437 .pktinfo
1438 .leave_multicast_v4(&GROUP_ADDR_V4, &ipv4)
1439 {
1440 debug!("leave multicast group for addr {ipv4}: {e}");
1441 }
1442 }
1443
1444 if let Some(ipv6) = last_ipv6 {
1445 debug!("leave multicast for {ipv6}");
1446 if let Err(e) = self
1447 .ipv6_sock
1448 .pktinfo
1449 .leave_multicast_v6(&GROUP_ADDR_V6, my_intf.index)
1450 {
1451 debug!("leave multicast group for IPv6: {ipv6}: {e}");
1452 }
1453 }
1454
1455 let intf_id = InterfaceId {
1457 name: my_intf.name.to_string(),
1458 index: my_intf.index,
1459 };
1460 let removed_instances = self.cache.remove_records_on_intf(intf_id);
1461 self.notify_service_removal(removed_instances);
1462 }
1463
1464 self.apply_intf_selections(my_ifaddrs);
1466 }
1467
1468 fn del_interface(&mut self, intf: &Interface) {
1469 let if_index = intf.index.unwrap_or(0);
1470 trace!(
1471 "del_interface: {} ({if_index}) addr {}",
1472 intf.name,
1473 intf.ip()
1474 );
1475
1476 let Some(my_intf) = self.my_intfs.get_mut(&if_index) else {
1477 debug!("del_interface: interface {} not found", intf.name);
1478 return;
1479 };
1480
1481 let mut ip_removed = false;
1482
1483 if my_intf.addrs.remove(&intf.addr) {
1484 ip_removed = true;
1485
1486 match intf.addr.ip() {
1487 IpAddr::V4(ipv4) => {
1488 if my_intf.next_ifaddr_v4().is_none() {
1489 if let Err(e) = self
1490 .ipv4_sock
1491 .pktinfo
1492 .leave_multicast_v4(&GROUP_ADDR_V4, &ipv4)
1493 {
1494 debug!("leave multicast group for addr {ipv4}: {e}");
1495 }
1496 }
1497 }
1498
1499 IpAddr::V6(ipv6) => {
1500 if my_intf.next_ifaddr_v6().is_none() {
1501 if let Err(e) = self
1502 .ipv6_sock
1503 .pktinfo
1504 .leave_multicast_v6(&GROUP_ADDR_V6, if_index)
1505 {
1506 debug!("leave multicast group for addr {ipv6}: {e}");
1507 }
1508 }
1509 }
1510 }
1511
1512 if my_intf.addrs.is_empty() {
1513 debug!("del_interface: removing interface {}", intf.name);
1515 self.my_intfs.remove(&if_index);
1516 self.dns_registry_map.remove(&if_index);
1517 self.cache.remove_addrs_on_disabled_intf(if_index);
1518 }
1519 }
1520
1521 if ip_removed {
1522 self.notify_monitors(DaemonEvent::IpDel(intf.ip()));
1524 self.del_addr_in_my_services(&intf.ip());
1526 }
1527 }
1528
1529 fn add_interface(&mut self, intf: Interface) {
1530 let sock = if intf.ip().is_ipv4() {
1531 &self.ipv4_sock
1532 } else {
1533 &self.ipv6_sock
1534 };
1535
1536 let if_index = intf.index.unwrap_or(0);
1537 let mut new_addr = false;
1538
1539 match self.my_intfs.entry(if_index) {
1540 Entry::Occupied(mut entry) => {
1541 let my_intf = entry.get_mut();
1543 if !my_intf.addrs.contains(&intf.addr) {
1544 if let Err(e) = join_multicast_group(&sock.pktinfo, &intf) {
1545 debug!("add_interface: socket_config {}: {e}", &intf.name);
1546 }
1547 my_intf.addrs.insert(intf.addr.clone());
1548 new_addr = true;
1549 }
1550 }
1551 Entry::Vacant(entry) => {
1552 if let Err(e) = join_multicast_group(&sock.pktinfo, &intf) {
1553 debug!("add_interface: socket_config {}: {e}. Skipped.", &intf.name);
1554 return;
1555 }
1556
1557 new_addr = true;
1558 let new_intf = MyIntf {
1559 name: intf.name.clone(),
1560 index: if_index,
1561 addrs: HashSet::from([intf.addr.clone()]),
1562 };
1563 entry.insert(new_intf);
1564 }
1565 }
1566
1567 if !new_addr {
1568 trace!("add_interface: interface {} already exists", &intf.name);
1569 return;
1570 }
1571
1572 debug!("add new interface {}: {}", intf.name, intf.ip());
1573
1574 let Some(my_intf) = self.my_intfs.get(&if_index) else {
1575 debug!("add_interface: cannot find if_index {if_index}");
1576 return;
1577 };
1578
1579 let dns_registry = match self.dns_registry_map.get_mut(&if_index) {
1580 Some(registry) => registry,
1581 None => self
1582 .dns_registry_map
1583 .entry(if_index)
1584 .or_insert_with(DnsRegistry::new),
1585 };
1586
1587 for (_, service_info) in self.my_services.iter_mut() {
1588 if service_info.is_addr_auto() {
1589 let new_ip = intf.ip();
1590 service_info.insert_ipaddr(new_ip);
1591
1592 if announce_service_on_intf(dns_registry, service_info, my_intf, &sock.pktinfo) {
1593 debug!(
1594 "Announce service {} on {}",
1595 service_info.get_fullname(),
1596 intf.ip()
1597 );
1598 service_info.set_status(if_index, ServiceStatus::Announced);
1599 } else {
1600 for timer in dns_registry.new_timers.drain(..) {
1601 self.timers.push(Reverse(timer));
1602 }
1603 service_info.set_status(if_index, ServiceStatus::Probing);
1604 }
1605 }
1606 }
1607
1608 let mut browse_reruns = Vec::new();
1610 let mut i = 0;
1611 while i < self.retransmissions.len() {
1612 if matches!(self.retransmissions[i].command, Command::Browse(..)) {
1613 browse_reruns.push(self.retransmissions.remove(i));
1614 } else {
1615 i += 1;
1616 }
1617 }
1618
1619 for rerun in browse_reruns {
1620 self.exec_command(rerun.command, true);
1621 }
1622
1623 self.notify_monitors(DaemonEvent::IpAdd(intf.ip()));
1625 }
1626
1627 fn register_service(&mut self, mut info: ServiceInfo) {
1636 if let Err(e) = check_service_name_length(info.get_type(), self.service_name_len_max) {
1638 debug!("check_service_name_length: {}", &e);
1639 self.notify_monitors(DaemonEvent::Error(e));
1640 return;
1641 }
1642
1643 if info.is_addr_auto() {
1644 let selected_addrs = self.selected_addrs(my_ip_interfaces(true));
1645 for addr in selected_addrs {
1646 info.insert_ipaddr(addr);
1647 }
1648 }
1649
1650 debug!("register service {:?}", &info);
1651
1652 let outgoing_addrs = self.send_unsolicited_response(&mut info);
1653 if !outgoing_addrs.is_empty() {
1654 self.notify_monitors(DaemonEvent::Announce(
1655 info.get_fullname().to_string(),
1656 format!("{:?}", &outgoing_addrs),
1657 ));
1658 }
1659
1660 let service_fullname = info.get_fullname().to_lowercase();
1663 self.my_services.insert(service_fullname, info);
1664 }
1665
1666 fn send_unsolicited_response(&mut self, info: &mut ServiceInfo) -> Vec<IpAddr> {
1669 let mut outgoing_addrs = Vec::new();
1670 let mut outgoing_intfs = HashSet::new();
1671
1672 for (if_index, intf) in self.my_intfs.iter() {
1673 let dns_registry = match self.dns_registry_map.get_mut(if_index) {
1674 Some(registry) => registry,
1675 None => self
1676 .dns_registry_map
1677 .entry(*if_index)
1678 .or_insert_with(DnsRegistry::new),
1679 };
1680
1681 let mut announced = false;
1682
1683 if announce_service_on_intf(dns_registry, info, intf, &self.ipv4_sock.pktinfo) {
1685 for addr in intf.addrs.iter().filter(|a| a.ip().is_ipv4()) {
1686 outgoing_addrs.push(addr.ip());
1687 }
1688 outgoing_intfs.insert(intf.index);
1689
1690 debug!(
1691 "Announce service IPv4 {} on {}",
1692 info.get_fullname(),
1693 intf.name
1694 );
1695 announced = true;
1696 }
1697
1698 if announce_service_on_intf(dns_registry, info, intf, &self.ipv6_sock.pktinfo) {
1699 for addr in intf.addrs.iter().filter(|a| a.ip().is_ipv6()) {
1700 outgoing_addrs.push(addr.ip());
1701 }
1702 outgoing_intfs.insert(intf.index);
1703
1704 debug!(
1705 "Announce service IPv6 {} on {}",
1706 info.get_fullname(),
1707 intf.name
1708 );
1709 announced = true;
1710 }
1711
1712 if announced {
1713 info.set_status(intf.index, ServiceStatus::Announced);
1714 } else {
1715 for timer in dns_registry.new_timers.drain(..) {
1716 self.timers.push(Reverse(timer));
1717 }
1718 info.set_status(*if_index, ServiceStatus::Probing);
1719 }
1720 }
1721
1722 let next_time = current_time_millis() + 1000;
1726 for if_index in outgoing_intfs {
1727 self.add_retransmission(
1728 next_time,
1729 Command::RegisterResend(info.get_fullname().to_string(), if_index),
1730 );
1731 }
1732
1733 outgoing_addrs
1734 }
1735
1736 fn probing_handler(&mut self) {
1738 let now = current_time_millis();
1739
1740 for (if_index, intf) in self.my_intfs.iter() {
1741 let Some(dns_registry) = self.dns_registry_map.get_mut(if_index) else {
1742 continue;
1743 };
1744
1745 let (out, expired_probes) = check_probing(dns_registry, &mut self.timers, now);
1746
1747 if !out.questions().is_empty() {
1749 trace!("sending out probing of questions: {:?}", out.questions());
1750 send_dns_outgoing(&out, intf, &self.ipv4_sock.pktinfo);
1751 send_dns_outgoing(&out, intf, &self.ipv6_sock.pktinfo);
1752 }
1753
1754 let waiting_services =
1756 handle_expired_probes(expired_probes, &intf.name, dns_registry, &mut self.monitors);
1757
1758 for service_name in waiting_services {
1759 if let Some(info) = self.my_services.get_mut(&service_name.to_lowercase()) {
1761 if info.get_status(*if_index) == ServiceStatus::Announced {
1762 debug!("service {} already announced", info.get_fullname());
1763 continue;
1764 }
1765
1766 let announced_v4 =
1767 announce_service_on_intf(dns_registry, info, intf, &self.ipv4_sock.pktinfo);
1768 let announced_v6 =
1769 announce_service_on_intf(dns_registry, info, intf, &self.ipv6_sock.pktinfo);
1770
1771 if announced_v4 || announced_v6 {
1772 let next_time = now + 1000;
1773 let command =
1774 Command::RegisterResend(info.get_fullname().to_string(), *if_index);
1775 self.retransmissions.push(ReRun { next_time, command });
1776 self.timers.push(Reverse(next_time));
1777
1778 let fullname = match dns_registry.name_changes.get(&service_name) {
1779 Some(new_name) => new_name.to_string(),
1780 None => service_name.to_string(),
1781 };
1782
1783 let mut hostname = info.get_hostname();
1784 if let Some(new_name) = dns_registry.name_changes.get(hostname) {
1785 hostname = new_name;
1786 }
1787
1788 debug!("wake up: announce service {} on {}", fullname, intf.name);
1789 notify_monitors(
1790 &mut self.monitors,
1791 DaemonEvent::Announce(fullname, format!("{}:{}", hostname, &intf.name)),
1792 );
1793
1794 info.set_status(*if_index, ServiceStatus::Announced);
1795 }
1796 }
1797 }
1798 }
1799 }
1800
1801 fn unregister_service(
1802 &self,
1803 info: &ServiceInfo,
1804 intf: &MyIntf,
1805 sock: &PktInfoUdpSocket,
1806 ) -> Vec<u8> {
1807 let is_ipv4 = sock.domain() == Domain::IPV4;
1808
1809 let mut out = DnsOutgoing::new(FLAGS_QR_RESPONSE | FLAGS_AA);
1810 out.add_answer_at_time(
1811 DnsPointer::new(
1812 info.get_type(),
1813 RRType::PTR,
1814 CLASS_IN,
1815 0,
1816 info.get_fullname().to_string(),
1817 ),
1818 0,
1819 );
1820
1821 if let Some(sub) = info.get_subtype() {
1822 trace!("Adding subdomain {}", sub);
1823 out.add_answer_at_time(
1824 DnsPointer::new(
1825 sub,
1826 RRType::PTR,
1827 CLASS_IN,
1828 0,
1829 info.get_fullname().to_string(),
1830 ),
1831 0,
1832 );
1833 }
1834
1835 out.add_answer_at_time(
1836 DnsSrv::new(
1837 info.get_fullname(),
1838 CLASS_IN | CLASS_CACHE_FLUSH,
1839 0,
1840 info.get_priority(),
1841 info.get_weight(),
1842 info.get_port(),
1843 info.get_hostname().to_string(),
1844 ),
1845 0,
1846 );
1847 out.add_answer_at_time(
1848 DnsTxt::new(
1849 info.get_fullname(),
1850 CLASS_IN | CLASS_CACHE_FLUSH,
1851 0,
1852 info.generate_txt(),
1853 ),
1854 0,
1855 );
1856
1857 let if_addrs = if is_ipv4 {
1858 info.get_addrs_on_my_intf_v4(intf)
1859 } else {
1860 info.get_addrs_on_my_intf_v6(intf)
1861 };
1862
1863 if if_addrs.is_empty() {
1864 return vec![];
1865 }
1866
1867 for address in if_addrs {
1868 out.add_answer_at_time(
1869 DnsAddress::new(
1870 info.get_hostname(),
1871 ip_address_rr_type(&address),
1872 CLASS_IN | CLASS_CACHE_FLUSH,
1873 0,
1874 address,
1875 intf.into(),
1876 ),
1877 0,
1878 );
1879 }
1880
1881 send_dns_outgoing(&out, intf, sock).remove(0)
1883 }
1884
1885 fn add_hostname_resolver(
1889 &mut self,
1890 hostname: String,
1891 listener: Sender<HostnameResolutionEvent>,
1892 timeout: Option<u64>,
1893 ) {
1894 let real_timeout = timeout.map(|t| current_time_millis() + t);
1895 self.hostname_resolvers
1896 .insert(hostname.to_lowercase(), (listener, real_timeout));
1897 if let Some(t) = real_timeout {
1898 self.add_timer(t);
1899 }
1900 }
1901
1902 fn send_query(&self, name: &str, qtype: RRType) {
1904 self.send_query_vec(&[(name, qtype)]);
1905 }
1906
1907 fn send_query_vec(&self, questions: &[(&str, RRType)]) {
1909 trace!("Sending query questions: {:?}", questions);
1910 let mut out = DnsOutgoing::new(FLAGS_QR_QUERY);
1911 let now = current_time_millis();
1912
1913 for (name, qtype) in questions {
1914 out.add_question(name, *qtype);
1915
1916 for record in self.cache.get_known_answers(name, *qtype, now) {
1917 trace!("add known answer: {:?}", record.record);
1925 let mut new_record = record.record.clone();
1926 new_record.get_record_mut().update_ttl(now);
1927 out.add_answer_box(new_record);
1928 }
1929 }
1930
1931 for (_, intf) in self.my_intfs.iter() {
1932 send_dns_outgoing(&out, intf, &self.ipv4_sock.pktinfo);
1933 send_dns_outgoing(&out, intf, &self.ipv6_sock.pktinfo);
1934 }
1935 }
1936
1937 fn handle_read(&mut self, event_key: usize) -> bool {
1942 let sock = match event_key {
1943 IPV4_SOCK_EVENT_KEY => &mut self.ipv4_sock,
1944 IPV6_SOCK_EVENT_KEY => &mut self.ipv6_sock,
1945 _ => {
1946 debug!("handle_read: unknown token {}", event_key);
1947 return false;
1948 }
1949 };
1950 let mut buf = vec![0u8; MAX_MSG_ABSOLUTE];
1951
1952 let (sz, pktinfo) = match sock.pktinfo.recv(&mut buf) {
1959 Ok(sz) => sz,
1960 Err(e) => {
1961 if e.kind() != std::io::ErrorKind::WouldBlock {
1962 debug!("listening socket read failed: {}", e);
1963 }
1964 return false;
1965 }
1966 };
1967
1968 let pkt_if_index = pktinfo.if_index as u32;
1970 let Some(my_intf) = self.my_intfs.get(&pkt_if_index) else {
1971 debug!(
1972 "handle_read: no interface found for pktinfo if_index: {}",
1973 pktinfo.if_index
1974 );
1975 return true; };
1977
1978 buf.truncate(sz); match DnsIncoming::new(buf, my_intf.into()) {
1981 Ok(msg) => {
1982 if msg.is_query() {
1983 self.handle_query(msg, pkt_if_index, event_key == IPV4_SOCK_EVENT_KEY);
1984 } else if msg.is_response() {
1985 self.handle_response(msg, pkt_if_index);
1986 } else {
1987 debug!("Invalid message: not query and not response");
1988 }
1989 }
1990 Err(e) => debug!("Invalid incoming DNS message: {}", e),
1991 }
1992
1993 true
1994 }
1995
1996 fn query_unresolved(&mut self, instance: &str) -> bool {
1998 if !valid_instance_name(instance) {
1999 trace!("instance name {} not valid", instance);
2000 return false;
2001 }
2002
2003 if let Some(records) = self.cache.get_srv(instance) {
2004 for record in records {
2005 if let Some(srv) = record.record.any().downcast_ref::<DnsSrv>() {
2006 if self.cache.get_addr(srv.host()).is_none() {
2007 self.send_query_vec(&[(srv.host(), RRType::A), (srv.host(), RRType::AAAA)]);
2008 return true;
2009 }
2010 }
2011 }
2012 } else {
2013 self.send_query(instance, RRType::ANY);
2014 return true;
2015 }
2016
2017 false
2018 }
2019
2020 fn query_cache_for_service(
2023 &mut self,
2024 ty_domain: &str,
2025 sender: &Sender<ServiceEvent>,
2026 now: u64,
2027 ) {
2028 let mut resolved: HashSet<String> = HashSet::new();
2029 let mut unresolved: HashSet<String> = HashSet::new();
2030
2031 if let Some(records) = self.cache.get_ptr(ty_domain) {
2032 for record in records.iter().filter(|r| !r.record.expires_soon(now)) {
2033 if let Some(ptr) = record.record.any().downcast_ref::<DnsPointer>() {
2034 let mut new_event = None;
2035 if self.use_service_data {
2036 match self.resolve_service_from_cache(ty_domain, ptr.alias()) {
2037 Ok(resolved_service) => {
2038 if resolved_service.is_valid() {
2039 debug!("Resolved service from cache: {}", ptr.alias());
2040 new_event =
2041 Some(ServiceEvent::ServiceData(Box::new(resolved_service)));
2042 } else {
2043 debug!("Resolved service is not valid: {}", ptr.alias());
2044 }
2045 }
2046 Err(err) => {
2047 debug!("Error while resolving service from cache: {}", err);
2048 continue;
2049 }
2050 }
2051 } else {
2052 match self.create_service_info_from_cache(ty_domain, ptr.alias()) {
2054 Ok(info) => {
2055 if info.is_ready() {
2056 new_event = Some(ServiceEvent::ServiceResolved(info));
2057 } else {
2058 debug!("Service info is not ready: {}", ptr.alias());
2059 }
2060 }
2061 Err(err) => {
2062 debug!("Error while creating service info from cache: {}", err);
2063 continue;
2064 }
2065 }
2066 }
2067
2068 match sender.send(ServiceEvent::ServiceFound(
2069 ty_domain.to_string(),
2070 ptr.alias().to_string(),
2071 )) {
2072 Ok(()) => debug!("send service found {}", ptr.alias()),
2073 Err(e) => {
2074 debug!("failed to send service found: {}", e);
2075 continue;
2076 }
2077 }
2078
2079 if let Some(event) = new_event {
2080 resolved.insert(ptr.alias().to_string());
2081 match sender.send(event) {
2082 Ok(()) => debug!("sent service resolved: {}", ptr.alias()),
2083 Err(e) => debug!("failed to send service resolved: {}", e),
2084 }
2085 } else {
2086 unresolved.insert(ptr.alias().to_string());
2087 }
2088 }
2089 }
2090 }
2091
2092 for instance in resolved.drain() {
2093 self.pending_resolves.remove(&instance);
2094 self.resolved.insert(instance);
2095 }
2096
2097 for instance in unresolved.drain() {
2098 self.add_pending_resolve(instance);
2099 }
2100 }
2101
2102 fn query_cache_for_hostname(
2105 &mut self,
2106 hostname: &str,
2107 sender: Sender<HostnameResolutionEvent>,
2108 ) {
2109 let addresses_map = self.cache.get_addresses_for_host(hostname);
2110 for (name, addresses) in addresses_map {
2111 match sender.send(HostnameResolutionEvent::AddressesFound(name, addresses)) {
2112 Ok(()) => trace!("sent hostname addresses found"),
2113 Err(e) => debug!("failed to send hostname addresses found: {}", e),
2114 }
2115 }
2116 }
2117
2118 fn add_pending_resolve(&mut self, instance: String) {
2119 if !self.pending_resolves.contains(&instance) {
2120 let next_time = current_time_millis() + RESOLVE_WAIT_IN_MILLIS;
2121 self.add_retransmission(next_time, Command::Resolve(instance.clone(), 1));
2122 self.pending_resolves.insert(instance);
2123 }
2124 }
2125
2126 fn create_service_info_from_cache(
2127 &self,
2128 ty_domain: &str,
2129 fullname: &str,
2130 ) -> Result<ServiceInfo> {
2131 let my_name = {
2132 let name = fullname.trim_end_matches(split_sub_domain(ty_domain).0);
2133 name.strip_suffix('.').unwrap_or(name).to_string()
2134 };
2135
2136 let now = current_time_millis();
2137 let mut info = ServiceInfo::new(ty_domain, &my_name, "", (), 0, None)?;
2138
2139 if let Some(subtype) = self.cache.get_subtype(fullname) {
2141 trace!(
2142 "ty_domain: {} found subtype {} for instance: {}",
2143 ty_domain,
2144 subtype,
2145 fullname
2146 );
2147 if info.get_subtype().is_none() {
2148 info.set_subtype(subtype.clone());
2149 }
2150 }
2151
2152 if let Some(records) = self.cache.get_srv(fullname) {
2154 if let Some(answer) = records.iter().find(|r| !r.record.expires_soon(now)) {
2155 if let Some(dns_srv) = answer.record.any().downcast_ref::<DnsSrv>() {
2156 info.set_hostname(dns_srv.host().to_string());
2157 info.set_port(dns_srv.port());
2158 }
2159 }
2160 }
2161
2162 if let Some(records) = self.cache.get_txt(fullname) {
2164 if let Some(record) = records.iter().find(|r| !r.record.expires_soon(now)) {
2165 if let Some(dns_txt) = record.record.any().downcast_ref::<DnsTxt>() {
2166 info.set_properties_from_txt(dns_txt.text());
2167 }
2168 }
2169 }
2170
2171 if let Some(records) = self.cache.get_addr(info.get_hostname()) {
2173 for answer in records.iter() {
2174 if let Some(dns_a) = answer.record.any().downcast_ref::<DnsAddress>() {
2175 if dns_a.expires_soon(now) {
2176 trace!(
2177 "Addr expired or expires soon: {}",
2178 dns_a.address().to_ip_addr()
2179 );
2180 } else {
2181 info.insert_ipaddr(dns_a.address().to_ip_addr());
2182 }
2183 }
2184 }
2185 }
2186
2187 Ok(info)
2188 }
2189
2190 fn resolve_service_from_cache(
2192 &self,
2193 ty_domain: &str,
2194 fullname: &str,
2195 ) -> Result<ResolvedService> {
2196 let now = current_time_millis();
2197 let mut resolved_service = ResolvedService {
2198 ty_domain: ty_domain.to_string(),
2199 sub_ty_domain: None,
2200 fullname: fullname.to_string(),
2201 host: String::new(),
2202 port: 0,
2203 addresses: HashSet::new(),
2204 txt_properties: TxtProperties::new(),
2205 };
2206
2207 if let Some(subtype) = self.cache.get_subtype(fullname) {
2209 trace!(
2210 "ty_domain: {} found subtype {} for instance: {}",
2211 ty_domain,
2212 subtype,
2213 fullname
2214 );
2215 if resolved_service.sub_ty_domain.is_none() {
2216 resolved_service.sub_ty_domain = Some(subtype.to_string());
2217 }
2218 }
2219
2220 if let Some(records) = self.cache.get_srv(fullname) {
2222 if let Some(answer) = records.iter().find(|r| !r.record.expires_soon(now)) {
2223 if let Some(dns_srv) = answer.record.any().downcast_ref::<DnsSrv>() {
2224 resolved_service.host = dns_srv.host().to_string();
2225 resolved_service.port = dns_srv.port();
2226 }
2227 }
2228 }
2229
2230 if let Some(records) = self.cache.get_txt(fullname) {
2232 if let Some(record) = records.iter().find(|r| !r.record.expires_soon(now)) {
2233 if let Some(dns_txt) = record.record.any().downcast_ref::<DnsTxt>() {
2234 resolved_service.txt_properties = dns_txt.text().into();
2235 }
2236 }
2237 }
2238
2239 if let Some(records) = self.cache.get_addr(&resolved_service.host) {
2241 for answer in records.iter() {
2242 if let Some(dns_a) = answer.record.any().downcast_ref::<DnsAddress>() {
2243 if dns_a.expires_soon(now) {
2244 trace!(
2245 "Addr expired or expires soon: {}",
2246 dns_a.address().to_ip_addr()
2247 );
2248 } else {
2249 resolved_service.addresses.insert(dns_a.address());
2250 }
2251 }
2252 }
2253 }
2254
2255 Ok(resolved_service)
2256 }
2257
2258 fn handle_poller_events(&mut self, events: &mio::Events) {
2259 for ev in events.iter() {
2260 trace!("event received with key {:?}", ev.token());
2261 if ev.token().0 == SIGNAL_SOCK_EVENT_KEY {
2262 self.signal_sock_drain();
2264
2265 if let Err(e) = self.poller.registry().reregister(
2266 &mut self.signal_sock,
2267 ev.token(),
2268 mio::Interest::READABLE,
2269 ) {
2270 debug!("failed to modify poller for signal socket: {}", e);
2271 }
2272 continue; }
2274
2275 while self.handle_read(ev.token().0) {}
2277
2278 if ev.token().0 == IPV4_SOCK_EVENT_KEY {
2280 if let Err(e) = self.poller.registry().reregister(
2282 &mut self.ipv4_sock,
2283 ev.token(),
2284 mio::Interest::READABLE,
2285 ) {
2286 debug!("modify poller for IPv4 socket: {}", e);
2287 }
2288 } else if ev.token().0 == IPV6_SOCK_EVENT_KEY {
2289 if let Err(e) = self.poller.registry().reregister(
2291 &mut self.ipv6_sock,
2292 ev.token(),
2293 mio::Interest::READABLE,
2294 ) {
2295 debug!("modify poller for IPv6 socket: {}", e);
2296 }
2297 }
2298 }
2299 }
2300
2301 fn handle_response(&mut self, mut msg: DnsIncoming, if_index: u32) {
2304 let now = current_time_millis();
2305
2306 let mut record_predicate = |record: &DnsRecordBox| {
2308 if !record.get_record().is_expired(now) {
2309 return true;
2310 }
2311
2312 debug!("record is expired, removing it from cache.");
2313 if self.cache.remove(record) {
2314 if let Some(dns_ptr) = record.any().downcast_ref::<DnsPointer>() {
2316 call_service_listener(
2317 &self.service_queriers,
2318 dns_ptr.get_name(),
2319 ServiceEvent::ServiceRemoved(
2320 dns_ptr.get_name().to_string(),
2321 dns_ptr.alias().to_string(),
2322 ),
2323 );
2324 }
2325 }
2326 false
2327 };
2328 msg.answers_mut().retain(&mut record_predicate);
2329 msg.authorities_mut().retain(&mut record_predicate);
2330 msg.additionals_mut().retain(&mut record_predicate);
2331
2332 self.conflict_handler(&msg, if_index);
2334
2335 let mut is_for_us = true; for answer in msg.answers() {
2342 if answer.get_type() == RRType::PTR {
2343 if self.service_queriers.contains_key(answer.get_name()) {
2344 is_for_us = true;
2345 break; } else {
2347 is_for_us = false;
2348 }
2349 } else if answer.get_type() == RRType::A || answer.get_type() == RRType::AAAA {
2350 let answer_lowercase = answer.get_name().to_lowercase();
2352 if self.hostname_resolvers.contains_key(&answer_lowercase) {
2353 is_for_us = true;
2354 break; }
2356 }
2357 }
2358
2359 struct InstanceChange {
2361 ty: RRType, name: String, }
2364
2365 let mut changes = Vec::new();
2373 let mut timers = Vec::new();
2374 let Some(my_intf) = self.my_intfs.get(&if_index) else {
2375 return;
2376 };
2377 for record in msg.all_records() {
2378 match self
2379 .cache
2380 .add_or_update(my_intf, record, &mut timers, is_for_us)
2381 {
2382 Some((dns_record, true)) => {
2383 timers.push(dns_record.record.get_record().get_expire_time());
2384 timers.push(dns_record.record.get_record().get_refresh_time());
2385
2386 let ty = dns_record.record.get_type();
2387 let name = dns_record.record.get_name();
2388
2389 if ty == RRType::PTR && dns_record.record.get_record().get_ttl() > 1 {
2391 if self.service_queriers.contains_key(name) {
2392 timers.push(dns_record.record.get_record().get_refresh_time());
2393 }
2394
2395 if let Some(dns_ptr) = dns_record.record.any().downcast_ref::<DnsPointer>()
2397 {
2398 call_service_listener(
2399 &self.service_queriers,
2400 name,
2401 ServiceEvent::ServiceFound(
2402 name.to_string(),
2403 dns_ptr.alias().to_string(),
2404 ),
2405 );
2406 changes.push(InstanceChange {
2407 ty,
2408 name: dns_ptr.alias().to_string(),
2409 });
2410 }
2411 } else {
2412 changes.push(InstanceChange {
2413 ty,
2414 name: name.to_string(),
2415 });
2416 }
2417 }
2418 Some((dns_record, false)) => {
2419 timers.push(dns_record.record.get_record().get_expire_time());
2420 timers.push(dns_record.record.get_record().get_refresh_time());
2421 }
2422 _ => {}
2423 }
2424 }
2425
2426 for t in timers {
2428 self.add_timer(t);
2429 }
2430
2431 for change in changes
2433 .iter()
2434 .filter(|change| change.ty == RRType::A || change.ty == RRType::AAAA)
2435 {
2436 let addr_map = self.cache.get_addresses_for_host(&change.name);
2437 for (name, addresses) in addr_map {
2438 call_hostname_resolution_listener(
2439 &self.hostname_resolvers,
2440 &change.name,
2441 HostnameResolutionEvent::AddressesFound(name, addresses),
2442 )
2443 }
2444 }
2445
2446 let mut updated_instances = HashSet::new();
2448 for update in changes {
2449 match update.ty {
2450 RRType::PTR | RRType::SRV | RRType::TXT => {
2451 updated_instances.insert(update.name);
2452 }
2453 RRType::A | RRType::AAAA => {
2454 let instances = self.cache.get_instances_on_host(&update.name);
2455 updated_instances.extend(instances);
2456 }
2457 _ => {}
2458 }
2459 }
2460
2461 self.resolve_updated_instances(&updated_instances);
2462 }
2463
2464 fn conflict_handler(&mut self, msg: &DnsIncoming, if_index: u32) {
2465 let Some(my_intf) = self.my_intfs.get(&if_index) else {
2466 debug!("handle_response: no intf found for index {if_index}");
2467 return;
2468 };
2469
2470 let Some(dns_registry) = self.dns_registry_map.get_mut(&if_index) else {
2471 return;
2472 };
2473
2474 for answer in msg.answers().iter() {
2475 let mut new_records = Vec::new();
2476
2477 let name = answer.get_name();
2478 let Some(probe) = dns_registry.probing.get_mut(name) else {
2479 continue;
2480 };
2481
2482 if answer.get_type() == RRType::A || answer.get_type() == RRType::AAAA {
2484 if let Some(answer_addr) = answer.any().downcast_ref::<DnsAddress>() {
2485 if answer_addr.interface_id.index != if_index {
2486 debug!(
2487 "conflict handler: answer addr {:?} not in the subnet of intf {}",
2488 answer_addr, my_intf.name
2489 );
2490 continue;
2491 }
2492 }
2493
2494 let any_match = probe.records.iter().any(|r| {
2497 r.get_type() == answer.get_type()
2498 && r.get_class() == answer.get_class()
2499 && r.rrdata_match(answer.as_ref())
2500 });
2501 if any_match {
2502 continue; }
2504 }
2505
2506 probe.records.retain(|record| {
2507 if record.get_type() == answer.get_type()
2508 && record.get_class() == answer.get_class()
2509 && !record.rrdata_match(answer.as_ref())
2510 {
2511 debug!(
2512 "found conflict name: '{name}' record: {}: {} PEER: {}",
2513 record.get_type(),
2514 record.rdata_print(),
2515 answer.rdata_print()
2516 );
2517
2518 let mut new_record = record.clone();
2521 let new_name = match record.get_type() {
2522 RRType::A => hostname_change(name),
2523 RRType::AAAA => hostname_change(name),
2524 _ => name_change(name),
2525 };
2526 new_record.get_record_mut().set_new_name(new_name);
2527 new_records.push(new_record);
2528 return false; }
2530
2531 true
2532 });
2533
2534 let create_time = current_time_millis() + fastrand::u64(0..250);
2541
2542 let waiting_services = probe.waiting_services.clone();
2543
2544 for record in new_records {
2545 if dns_registry.update_hostname(name, record.get_name(), create_time) {
2546 self.timers.push(Reverse(create_time));
2547 }
2548
2549 dns_registry.name_changes.insert(
2551 record.get_record().get_original_name().to_string(),
2552 record.get_name().to_string(),
2553 );
2554
2555 let new_probe = match dns_registry.probing.get_mut(record.get_name()) {
2556 Some(p) => p,
2557 None => {
2558 let new_probe = dns_registry
2559 .probing
2560 .entry(record.get_name().to_string())
2561 .or_insert_with(|| {
2562 debug!("conflict handler: new probe of {}", record.get_name());
2563 Probe::new(create_time)
2564 });
2565 self.timers.push(Reverse(new_probe.next_send));
2566 new_probe
2567 }
2568 };
2569
2570 debug!(
2571 "insert record with new name '{}' {} into probe",
2572 record.get_name(),
2573 record.get_type()
2574 );
2575 new_probe.insert_record(record);
2576
2577 new_probe.waiting_services.extend(waiting_services.clone());
2578 }
2579 }
2580 }
2581
2582 fn resolve_updated_instances(&mut self, updated_instances: &HashSet<String>) {
2589 let mut resolved: HashSet<String> = HashSet::new();
2590 let mut unresolved: HashSet<String> = HashSet::new();
2591 let mut removed_instances = HashMap::new();
2592
2593 let now = current_time_millis();
2594
2595 for (ty_domain, records) in self.cache.all_ptr().iter() {
2596 if !self.service_queriers.contains_key(ty_domain) {
2597 continue;
2599 }
2600
2601 for record in records.iter().filter(|r| !r.record.expires_soon(now)) {
2602 if let Some(dns_ptr) = record.record.any().downcast_ref::<DnsPointer>() {
2603 if updated_instances.contains(dns_ptr.alias()) {
2604 let mut instance_found = false;
2605 let mut new_event = None;
2606
2607 if self.use_service_data {
2608 if let Ok(resolved) =
2609 self.resolve_service_from_cache(ty_domain, dns_ptr.alias())
2610 {
2611 debug!(
2612 "resolve_updated_instances: from cache: {}",
2613 dns_ptr.alias()
2614 );
2615 instance_found = true;
2616 if resolved.is_valid() {
2617 new_event = Some(ServiceEvent::ServiceData(Box::new(resolved)));
2618 } else {
2619 debug!("Resolved service is not valid: {}", dns_ptr.alias());
2620 }
2621 }
2622 } else if let Ok(info) =
2623 self.create_service_info_from_cache(ty_domain, dns_ptr.alias())
2624 {
2625 instance_found = true;
2626 if info.is_ready() {
2627 new_event = Some(ServiceEvent::ServiceResolved(info));
2628 } else {
2629 debug!("Service info is not ready: {}", dns_ptr.alias());
2630 }
2631 }
2632
2633 if instance_found {
2634 if let Some(event) = new_event {
2635 debug!("call queriers to resolve {}", dns_ptr.alias());
2636 resolved.insert(dns_ptr.alias().to_string());
2637 call_service_listener(&self.service_queriers, ty_domain, event);
2638 } else {
2639 if self.resolved.remove(dns_ptr.alias()) {
2640 removed_instances
2641 .entry(ty_domain.to_string())
2642 .or_insert_with(HashSet::new)
2643 .insert(dns_ptr.alias().to_string());
2644 }
2645 unresolved.insert(dns_ptr.alias().to_string());
2646 }
2647 }
2648 }
2649 }
2650 }
2651 }
2652
2653 for instance in resolved.drain() {
2654 self.pending_resolves.remove(&instance);
2655 self.resolved.insert(instance);
2656 }
2657
2658 for instance in unresolved.drain() {
2659 self.add_pending_resolve(instance);
2660 }
2661
2662 if !removed_instances.is_empty() {
2663 debug!(
2664 "resolve_updated_instances: removed {}",
2665 &removed_instances.len()
2666 );
2667 self.notify_service_removal(removed_instances);
2668 }
2669 }
2670
2671 fn handle_query(&mut self, msg: DnsIncoming, if_index: u32, is_ipv4: bool) {
2673 let sock = if is_ipv4 {
2674 &self.ipv4_sock
2675 } else {
2676 &self.ipv6_sock
2677 };
2678 let mut out = DnsOutgoing::new(FLAGS_QR_RESPONSE | FLAGS_AA);
2679
2680 const META_QUERY: &str = "_services._dns-sd._udp.local.";
2683
2684 let Some(dns_registry) = self.dns_registry_map.get_mut(&if_index) else {
2685 debug!("missing dns registry for intf {}", if_index);
2686 return;
2687 };
2688
2689 let Some(intf) = self.my_intfs.get(&if_index) else {
2690 return;
2691 };
2692
2693 for question in msg.questions().iter() {
2694 let qtype = question.entry_type();
2695
2696 if qtype == RRType::PTR {
2697 for service in self.my_services.values() {
2698 if service.get_status(if_index) != ServiceStatus::Announced {
2699 continue;
2700 }
2701
2702 if question.entry_name() == service.get_type()
2703 || service
2704 .get_subtype()
2705 .as_ref()
2706 .is_some_and(|v| v == question.entry_name())
2707 {
2708 add_answer_with_additionals(
2709 &mut out,
2710 &msg,
2711 service,
2712 intf,
2713 dns_registry,
2714 is_ipv4,
2715 );
2716 } else if question.entry_name() == META_QUERY {
2717 let ptr_added = out.add_answer(
2718 &msg,
2719 DnsPointer::new(
2720 question.entry_name(),
2721 RRType::PTR,
2722 CLASS_IN,
2723 service.get_other_ttl(),
2724 service.get_type().to_string(),
2725 ),
2726 );
2727 if !ptr_added {
2728 trace!("answer was not added for meta-query {:?}", &question);
2729 }
2730 }
2731 }
2732 } else {
2733 if qtype == RRType::ANY && msg.num_authorities() > 0 {
2735 let probe_name = question.entry_name();
2736
2737 if let Some(probe) = dns_registry.probing.get_mut(probe_name) {
2738 let now = current_time_millis();
2739
2740 if probe.start_time < now {
2744 let incoming_records: Vec<_> = msg
2745 .authorities()
2746 .iter()
2747 .filter(|r| r.get_name() == probe_name)
2748 .collect();
2749
2750 probe.tiebreaking(&incoming_records, now, probe_name);
2751 }
2752 }
2753 }
2754
2755 if qtype == RRType::A || qtype == RRType::AAAA || qtype == RRType::ANY {
2756 for service in self.my_services.values() {
2757 if service.get_status(if_index) != ServiceStatus::Announced {
2758 continue;
2759 }
2760
2761 let service_hostname =
2762 match dns_registry.name_changes.get(service.get_hostname()) {
2763 Some(new_name) => new_name,
2764 None => service.get_hostname(),
2765 };
2766
2767 if service_hostname.to_lowercase() == question.entry_name().to_lowercase() {
2768 let intf_addrs = if is_ipv4 {
2769 service.get_addrs_on_my_intf_v4(intf)
2770 } else {
2771 service.get_addrs_on_my_intf_v6(intf)
2772 };
2773 if intf_addrs.is_empty()
2774 && (qtype == RRType::A || qtype == RRType::AAAA)
2775 {
2776 let t = match qtype {
2777 RRType::A => "TYPE_A",
2778 RRType::AAAA => "TYPE_AAAA",
2779 _ => "invalid_type",
2780 };
2781 trace!(
2782 "Cannot find valid addrs for {} response on intf {:?}",
2783 t,
2784 &intf
2785 );
2786 return;
2787 }
2788 for address in intf_addrs {
2789 out.add_answer(
2790 &msg,
2791 DnsAddress::new(
2792 service_hostname,
2793 ip_address_rr_type(&address),
2794 CLASS_IN | CLASS_CACHE_FLUSH,
2795 service.get_host_ttl(),
2796 address,
2797 intf.into(),
2798 ),
2799 );
2800 }
2801 }
2802 }
2803 }
2804
2805 let query_name = question.entry_name().to_lowercase();
2806 let service_opt = self
2807 .my_services
2808 .iter()
2809 .find(|(k, _v)| {
2810 let service_name = match dns_registry.name_changes.get(k.as_str()) {
2811 Some(new_name) => new_name,
2812 None => k,
2813 };
2814 service_name == &query_name
2815 })
2816 .map(|(_, v)| v);
2817
2818 let Some(service) = service_opt else {
2819 continue;
2820 };
2821
2822 if service.get_status(if_index) != ServiceStatus::Announced {
2823 continue;
2824 }
2825
2826 let intf_addrs = if is_ipv4 {
2827 service.get_addrs_on_my_intf_v4(intf)
2828 } else {
2829 service.get_addrs_on_my_intf_v6(intf)
2830 };
2831 if intf_addrs.is_empty() {
2832 debug!(
2833 "Cannot find valid addrs for TYPE_SRV response on intf {:?}",
2834 &intf
2835 );
2836 continue;
2837 }
2838
2839 add_answer_of_service(
2840 &mut out,
2841 &msg,
2842 question.entry_name(),
2843 service,
2844 qtype,
2845 intf_addrs,
2846 );
2847 }
2848 }
2849
2850 if !out.answers_count() > 0 {
2851 out.set_id(msg.id());
2852 send_dns_outgoing(&out, intf, &sock.pktinfo);
2853
2854 let if_name = intf.name.clone();
2855
2856 self.increase_counter(Counter::Respond, 1);
2857 self.notify_monitors(DaemonEvent::Respond(if_name));
2858 }
2859
2860 self.increase_counter(Counter::KnownAnswerSuppression, out.known_answer_count());
2861 }
2862
2863 fn increase_counter(&mut self, counter: Counter, count: i64) {
2865 let key = counter.to_string();
2866 match self.counters.get_mut(&key) {
2867 Some(v) => *v += count,
2868 None => {
2869 self.counters.insert(key, count);
2870 }
2871 }
2872 }
2873
2874 fn set_counter(&mut self, counter: Counter, count: i64) {
2876 let key = counter.to_string();
2877 self.counters.insert(key, count);
2878 }
2879
2880 fn signal_sock_drain(&self) {
2881 let mut signal_buf = [0; 1024];
2882
2883 while let Ok(sz) = self.signal_sock.recv(&mut signal_buf) {
2885 trace!(
2886 "signal socket recvd: {}",
2887 String::from_utf8_lossy(&signal_buf[0..sz])
2888 );
2889 }
2890 }
2891
2892 fn add_retransmission(&mut self, next_time: u64, command: Command) {
2893 self.retransmissions.push(ReRun { next_time, command });
2894 self.add_timer(next_time);
2895 }
2896
2897 fn notify_service_removal(&self, expired: HashMap<String, HashSet<String>>) {
2900 for (ty_domain, sender) in self.service_queriers.iter() {
2901 if let Some(instances) = expired.get(ty_domain) {
2902 for instance_name in instances {
2903 let event = ServiceEvent::ServiceRemoved(
2904 ty_domain.to_string(),
2905 instance_name.to_string(),
2906 );
2907 match sender.send(event) {
2908 Ok(()) => debug!("notify_service_removal: sent ServiceRemoved to listener of {ty_domain}: {instance_name}"),
2909 Err(e) => debug!("Failed to send event: {}", e),
2910 }
2911 }
2912 }
2913 }
2914 }
2915
2916 fn exec_command(&mut self, command: Command, repeating: bool) {
2920 trace!("exec_command: {:?} repeating: {}", &command, repeating);
2921 match command {
2922 Command::Browse(ty, next_delay, listener) => {
2923 self.exec_command_browse(repeating, ty, next_delay, listener);
2924 }
2925
2926 Command::ResolveHostname(hostname, next_delay, listener, timeout) => {
2927 self.exec_command_resolve_hostname(
2928 repeating, hostname, next_delay, listener, timeout,
2929 );
2930 }
2931
2932 Command::Register(service_info) => {
2933 self.register_service(service_info);
2934 self.increase_counter(Counter::Register, 1);
2935 }
2936
2937 Command::RegisterResend(fullname, intf) => {
2938 trace!("register-resend service: {fullname} on {}", &intf);
2939 self.exec_command_register_resend(fullname, intf);
2940 }
2941
2942 Command::Unregister(fullname, resp_s) => {
2943 trace!("unregister service {} repeat {}", &fullname, &repeating);
2944 self.exec_command_unregister(repeating, fullname, resp_s);
2945 }
2946
2947 Command::UnregisterResend(packet, if_index, is_ipv4) => {
2948 self.exec_command_unregister_resend(packet, if_index, is_ipv4);
2949 }
2950
2951 Command::StopBrowse(ty_domain) => self.exec_command_stop_browse(ty_domain),
2952
2953 Command::StopResolveHostname(hostname) => {
2954 self.exec_command_stop_resolve_hostname(hostname.to_lowercase())
2955 }
2956
2957 Command::Resolve(instance, try_count) => self.exec_command_resolve(instance, try_count),
2958
2959 Command::GetMetrics(resp_s) => self.exec_command_get_metrics(resp_s),
2960
2961 Command::GetStatus(resp_s) => match resp_s.send(self.status.clone()) {
2962 Ok(()) => trace!("Sent status to the client"),
2963 Err(e) => debug!("Failed to send status: {}", e),
2964 },
2965
2966 Command::Monitor(resp_s) => {
2967 self.monitors.push(resp_s);
2968 }
2969
2970 Command::SetOption(daemon_opt) => {
2971 self.process_set_option(daemon_opt);
2972 }
2973
2974 Command::GetOption(resp_s) => {
2975 let val = DaemonOptionVal {
2976 _service_name_len_max: self.service_name_len_max,
2977 ip_check_interval: self.ip_check_interval,
2978 };
2979 if let Err(e) = resp_s.send(val) {
2980 debug!("Failed to send options: {}", e);
2981 }
2982 }
2983
2984 Command::Verify(instance_fullname, timeout) => {
2985 self.exec_command_verify(instance_fullname, timeout, repeating);
2986 }
2987
2988 _ => {
2989 debug!("unexpected command: {:?}", &command);
2990 }
2991 }
2992 }
2993
2994 fn exec_command_get_metrics(&mut self, resp_s: Sender<HashMap<String, i64>>) {
2995 self.set_counter(Counter::CachedPTR, self.cache.ptr_count() as i64);
2996 self.set_counter(Counter::CachedSRV, self.cache.srv_count() as i64);
2997 self.set_counter(Counter::CachedAddr, self.cache.addr_count() as i64);
2998 self.set_counter(Counter::CachedTxt, self.cache.txt_count() as i64);
2999 self.set_counter(Counter::CachedNSec, self.cache.nsec_count() as i64);
3000 self.set_counter(Counter::CachedSubtype, self.cache.subtype_count() as i64);
3001 self.set_counter(Counter::Timer, self.timers.len() as i64);
3002
3003 let dns_registry_probe_count: usize = self
3004 .dns_registry_map
3005 .values()
3006 .map(|r| r.probing.len())
3007 .sum();
3008 self.set_counter(Counter::DnsRegistryProbe, dns_registry_probe_count as i64);
3009
3010 let dns_registry_active_count: usize = self
3011 .dns_registry_map
3012 .values()
3013 .map(|r| r.active.values().map(|a| a.len()).sum::<usize>())
3014 .sum();
3015 self.set_counter(Counter::DnsRegistryActive, dns_registry_active_count as i64);
3016
3017 let dns_registry_timer_count: usize = self
3018 .dns_registry_map
3019 .values()
3020 .map(|r| r.new_timers.len())
3021 .sum();
3022 self.set_counter(Counter::DnsRegistryTimer, dns_registry_timer_count as i64);
3023
3024 let dns_registry_name_change_count: usize = self
3025 .dns_registry_map
3026 .values()
3027 .map(|r| r.name_changes.len())
3028 .sum();
3029 self.set_counter(
3030 Counter::DnsRegistryNameChange,
3031 dns_registry_name_change_count as i64,
3032 );
3033
3034 if let Err(e) = resp_s.send(self.counters.clone()) {
3036 debug!("Failed to send metrics: {}", e);
3037 }
3038 }
3039
3040 fn exec_command_browse(
3041 &mut self,
3042 repeating: bool,
3043 ty: String,
3044 next_delay: u32,
3045 listener: Sender<ServiceEvent>,
3046 ) {
3047 let pretty_addrs: Vec<String> = self
3048 .my_intfs
3049 .iter()
3050 .map(|(if_index, itf)| format!("{} ({if_index})", itf.name))
3051 .collect();
3052
3053 if let Err(e) = listener.send(ServiceEvent::SearchStarted(format!(
3054 "{ty} on {} interfaces [{}]",
3055 pretty_addrs.len(),
3056 pretty_addrs.join(", ")
3057 ))) {
3058 debug!(
3059 "Failed to send SearchStarted({})(repeating:{}): {}",
3060 &ty, repeating, e
3061 );
3062 return;
3063 }
3064
3065 let now = current_time_millis();
3066 if !repeating {
3067 self.service_queriers.insert(ty.clone(), listener.clone());
3071
3072 self.query_cache_for_service(&ty, &listener, now);
3074 }
3075
3076 self.send_query(&ty, RRType::PTR);
3077 self.increase_counter(Counter::Browse, 1);
3078
3079 let next_time = now + (next_delay * 1000) as u64;
3080 let max_delay = 60 * 60;
3081 let delay = cmp::min(next_delay * 2, max_delay);
3082 self.add_retransmission(next_time, Command::Browse(ty, delay, listener));
3083 }
3084
3085 fn exec_command_resolve_hostname(
3086 &mut self,
3087 repeating: bool,
3088 hostname: String,
3089 next_delay: u32,
3090 listener: Sender<HostnameResolutionEvent>,
3091 timeout: Option<u64>,
3092 ) {
3093 let addr_list: Vec<_> = self.my_intfs.iter().collect();
3094 if let Err(e) = listener.send(HostnameResolutionEvent::SearchStarted(format!(
3095 "{} on addrs {:?}",
3096 &hostname, &addr_list
3097 ))) {
3098 debug!(
3099 "Failed to send ResolveStarted({})(repeating:{}): {}",
3100 &hostname, repeating, e
3101 );
3102 return;
3103 }
3104 if !repeating {
3105 self.add_hostname_resolver(hostname.to_owned(), listener.clone(), timeout);
3106 self.query_cache_for_hostname(&hostname, listener.clone());
3108 }
3109
3110 self.send_query_vec(&[(&hostname, RRType::A), (&hostname, RRType::AAAA)]);
3111 self.increase_counter(Counter::ResolveHostname, 1);
3112
3113 let now = current_time_millis();
3114 let next_time = now + u64::from(next_delay) * 1000;
3115 let max_delay = 60 * 60;
3116 let delay = cmp::min(next_delay * 2, max_delay);
3117
3118 if self
3120 .hostname_resolvers
3121 .get(&hostname)
3122 .and_then(|(_sender, timeout)| *timeout)
3123 .map(|timeout| next_time < timeout)
3124 .unwrap_or(true)
3125 {
3126 self.add_retransmission(
3127 next_time,
3128 Command::ResolveHostname(hostname, delay, listener, None),
3129 );
3130 }
3131 }
3132
3133 fn exec_command_resolve(&mut self, instance: String, try_count: u16) {
3134 let pending_query = self.query_unresolved(&instance);
3135 let max_try = 3;
3136 if pending_query && try_count < max_try {
3137 let next_time = current_time_millis() + RESOLVE_WAIT_IN_MILLIS;
3140 self.add_retransmission(next_time, Command::Resolve(instance, try_count + 1));
3141 }
3142 }
3143
3144 fn exec_command_unregister(
3145 &mut self,
3146 repeating: bool,
3147 fullname: String,
3148 resp_s: Sender<UnregisterStatus>,
3149 ) {
3150 let response = match self.my_services.remove_entry(&fullname) {
3151 None => {
3152 debug!("unregister: cannot find such service {}", &fullname);
3153 UnregisterStatus::NotFound
3154 }
3155 Some((_k, info)) => {
3156 let mut timers = Vec::new();
3157
3158 for (if_index, intf) in self.my_intfs.iter() {
3159 let packet = self.unregister_service(&info, intf, &self.ipv4_sock.pktinfo);
3160 if !repeating && !packet.is_empty() {
3162 let next_time = current_time_millis() + 120;
3163 self.retransmissions.push(ReRun {
3164 next_time,
3165 command: Command::UnregisterResend(packet, *if_index, true),
3166 });
3167 timers.push(next_time);
3168 }
3169
3170 let packet = self.unregister_service(&info, intf, &self.ipv6_sock.pktinfo);
3173 if !repeating && !packet.is_empty() {
3174 let next_time = current_time_millis() + 120;
3175 self.retransmissions.push(ReRun {
3176 next_time,
3177 command: Command::UnregisterResend(packet, *if_index, false),
3178 });
3179 timers.push(next_time);
3180 }
3181 }
3182
3183 for t in timers {
3184 self.add_timer(t);
3185 }
3186
3187 self.increase_counter(Counter::Unregister, 1);
3188 UnregisterStatus::OK
3189 }
3190 };
3191 if let Err(e) = resp_s.send(response) {
3192 debug!("unregister: failed to send response: {}", e);
3193 }
3194 }
3195
3196 fn exec_command_unregister_resend(&mut self, packet: Vec<u8>, if_index: u32, is_ipv4: bool) {
3197 let Some(intf) = self.my_intfs.get(&if_index) else {
3198 return;
3199 };
3200 let sock = if is_ipv4 {
3201 &self.ipv4_sock.pktinfo
3202 } else {
3203 &self.ipv6_sock.pktinfo
3204 };
3205
3206 let if_addr = if is_ipv4 {
3207 match intf.next_ifaddr_v4() {
3208 Some(addr) => addr,
3209 None => return,
3210 }
3211 } else {
3212 match intf.next_ifaddr_v6() {
3213 Some(addr) => addr,
3214 None => return,
3215 }
3216 };
3217
3218 debug!("UnregisterResend from {:?}", if_addr);
3219 multicast_on_intf(&packet[..], &intf.name, intf.index, if_addr, sock);
3220
3221 self.increase_counter(Counter::UnregisterResend, 1);
3222 }
3223
3224 fn exec_command_stop_browse(&mut self, ty_domain: String) {
3225 match self.service_queriers.remove_entry(&ty_domain) {
3226 None => debug!("StopBrowse: cannot find querier for {}", &ty_domain),
3227 Some((ty, sender)) => {
3228 trace!("StopBrowse: removed queryer for {}", &ty);
3230 let mut i = 0;
3231 while i < self.retransmissions.len() {
3232 if let Command::Browse(t, _, _) = &self.retransmissions[i].command {
3233 if t == &ty {
3234 self.retransmissions.remove(i);
3235 trace!("StopBrowse: removed retransmission for {}", &ty);
3236 continue;
3237 }
3238 }
3239 i += 1;
3240 }
3241
3242 self.cache.remove_service_type(&ty_domain);
3244
3245 match sender.send(ServiceEvent::SearchStopped(ty_domain)) {
3247 Ok(()) => trace!("Sent SearchStopped to the listener"),
3248 Err(e) => debug!("Failed to send SearchStopped: {}", e),
3249 }
3250 }
3251 }
3252 }
3253
3254 fn exec_command_stop_resolve_hostname(&mut self, hostname: String) {
3255 if let Some((host, (sender, _timeout))) = self.hostname_resolvers.remove_entry(&hostname) {
3256 trace!("StopResolve: removed queryer for {}", &host);
3258 let mut i = 0;
3259 while i < self.retransmissions.len() {
3260 if let Command::Resolve(t, _) = &self.retransmissions[i].command {
3261 if t == &host {
3262 self.retransmissions.remove(i);
3263 trace!("StopResolve: removed retransmission for {}", &host);
3264 continue;
3265 }
3266 }
3267 i += 1;
3268 }
3269
3270 match sender.send(HostnameResolutionEvent::SearchStopped(hostname)) {
3272 Ok(()) => trace!("Sent SearchStopped to the listener"),
3273 Err(e) => debug!("Failed to send SearchStopped: {}", e),
3274 }
3275 }
3276 }
3277
3278 fn exec_command_register_resend(&mut self, fullname: String, if_index: u32) {
3279 let Some(info) = self.my_services.get_mut(&fullname) else {
3280 trace!("announce: cannot find such service {}", &fullname);
3281 return;
3282 };
3283
3284 let Some(dns_registry) = self.dns_registry_map.get_mut(&if_index) else {
3285 return;
3286 };
3287
3288 let Some(intf) = self.my_intfs.get(&if_index) else {
3289 return;
3290 };
3291
3292 let announced_v4 =
3293 announce_service_on_intf(dns_registry, info, intf, &self.ipv4_sock.pktinfo);
3294 let announced_v6 =
3295 announce_service_on_intf(dns_registry, info, intf, &self.ipv6_sock.pktinfo);
3296
3297 if announced_v4 || announced_v6 {
3298 let mut hostname = info.get_hostname();
3299 if let Some(new_name) = dns_registry.name_changes.get(hostname) {
3300 hostname = new_name;
3301 }
3302 let service_name = match dns_registry.name_changes.get(&fullname) {
3303 Some(new_name) => new_name.to_string(),
3304 None => fullname,
3305 };
3306
3307 debug!("resend: announce service {service_name} on {}", intf.name);
3308
3309 notify_monitors(
3310 &mut self.monitors,
3311 DaemonEvent::Announce(service_name, format!("{}:{}", hostname, &intf.name)),
3312 );
3313 info.set_status(if_index, ServiceStatus::Announced);
3314 } else {
3315 debug!("register-resend should not fail");
3316 }
3317
3318 self.increase_counter(Counter::RegisterResend, 1);
3319 }
3320
3321 fn exec_command_verify(&mut self, instance: String, timeout: Duration, repeating: bool) {
3322 let now = current_time_millis();
3332 let expire_at = if repeating {
3333 None
3334 } else {
3335 Some(now + timeout.as_millis() as u64)
3336 };
3337
3338 let record_vec = self.cache.service_verify_queries(&instance, expire_at);
3340
3341 if !record_vec.is_empty() {
3342 let query_vec: Vec<(&str, RRType)> = record_vec
3343 .iter()
3344 .map(|(record, rr_type)| (record.as_str(), *rr_type))
3345 .collect();
3346 self.send_query_vec(&query_vec);
3347
3348 if let Some(new_expire) = expire_at {
3349 self.add_timer(new_expire); self.add_retransmission(now + 1000, Command::Verify(instance, timeout));
3353 }
3354 }
3355 }
3356
3357 fn refresh_active_services(&mut self) {
3359 let mut query_ptr_count = 0;
3360 let mut query_srv_count = 0;
3361 let mut new_timers = HashSet::new();
3362 let mut query_addr_count = 0;
3363
3364 for (ty_domain, _sender) in self.service_queriers.iter() {
3365 let refreshed_timers = self.cache.refresh_due_ptr(ty_domain);
3366 if !refreshed_timers.is_empty() {
3367 trace!("sending refresh query for PTR: {}", ty_domain);
3368 self.send_query(ty_domain, RRType::PTR);
3369 query_ptr_count += 1;
3370 new_timers.extend(refreshed_timers);
3371 }
3372
3373 let (instances, timers) = self.cache.refresh_due_srv_txt(ty_domain);
3374 for (instance, types) in instances {
3375 trace!("sending refresh query for: {}", &instance);
3376 let query_vec = types
3377 .into_iter()
3378 .map(|ty| (instance.as_str(), ty))
3379 .collect::<Vec<_>>();
3380 self.send_query_vec(&query_vec);
3381 query_srv_count += 1;
3382 }
3383 new_timers.extend(timers);
3384 let (hostnames, timers) = self.cache.refresh_due_hosts(ty_domain);
3385 for hostname in hostnames.iter() {
3386 trace!("sending refresh queries for A and AAAA: {}", hostname);
3387 self.send_query_vec(&[(hostname, RRType::A), (hostname, RRType::AAAA)]);
3388 query_addr_count += 2;
3389 }
3390 new_timers.extend(timers);
3391 }
3392
3393 for timer in new_timers {
3394 self.add_timer(timer);
3395 }
3396
3397 self.increase_counter(Counter::CacheRefreshPTR, query_ptr_count);
3398 self.increase_counter(Counter::CacheRefreshSrvTxt, query_srv_count);
3399 self.increase_counter(Counter::CacheRefreshAddr, query_addr_count);
3400 }
3401}
3402
3403fn add_answer_of_service(
3405 out: &mut DnsOutgoing,
3406 msg: &DnsIncoming,
3407 entry_name: &str,
3408 service: &ServiceInfo,
3409 qtype: RRType,
3410 intf_addrs: Vec<IpAddr>,
3411) {
3412 if qtype == RRType::SRV || qtype == RRType::ANY {
3413 out.add_answer(
3414 msg,
3415 DnsSrv::new(
3416 entry_name,
3417 CLASS_IN | CLASS_CACHE_FLUSH,
3418 service.get_host_ttl(),
3419 service.get_priority(),
3420 service.get_weight(),
3421 service.get_port(),
3422 service.get_hostname().to_string(),
3423 ),
3424 );
3425 }
3426
3427 if qtype == RRType::TXT || qtype == RRType::ANY {
3428 out.add_answer(
3429 msg,
3430 DnsTxt::new(
3431 entry_name,
3432 CLASS_IN | CLASS_CACHE_FLUSH,
3433 service.get_other_ttl(),
3434 service.generate_txt(),
3435 ),
3436 );
3437 }
3438
3439 if qtype == RRType::SRV {
3440 for address in intf_addrs {
3441 out.add_additional_answer(DnsAddress::new(
3442 service.get_hostname(),
3443 ip_address_rr_type(&address),
3444 CLASS_IN | CLASS_CACHE_FLUSH,
3445 service.get_host_ttl(),
3446 address,
3447 InterfaceId::default(),
3448 ));
3449 }
3450 }
3451}
3452
/// Events delivered to the listener of a service browse.
#[derive(Clone, Debug)]
#[non_exhaustive]
pub enum ServiceEvent {
    /// A browse round started. The payload is a description of the form
    /// `"<service type> on <n> interfaces [<name (index)>, ...]"`.
    SearchStarted(String),

    /// NOTE(review): presumably (service type, instance fullname) of a newly
    /// found instance — not constructed in this part of the file; confirm.
    ServiceFound(String, String),

    /// NOTE(review): presumably a fully resolved service — confirm against
    /// where it is sent.
    ServiceResolved(ServiceInfo),

    /// NOTE(review): presumably the `ResolvedService` counterpart of
    /// `ServiceResolved` — confirm against where it is sent.
    ServiceData(Box<ResolvedService>),

    /// An instance was removed: (service type domain, instance name).
    ServiceRemoved(String, String),

    /// Browsing was stopped for the given service type domain.
    SearchStopped(String),
}
3480
/// Events delivered to the listener of a hostname resolution.
#[derive(Clone, Debug)]
#[non_exhaustive]
pub enum HostnameResolutionEvent {
    /// A resolution round started. The payload is a description of the form
    /// `"<hostname> on addrs <interfaces>"`.
    SearchStarted(String),
    /// NOTE(review): presumably (hostname, addresses found) — not constructed
    /// in this part of the file; confirm.
    AddressesFound(String, HashSet<ScopedIp>),
    /// NOTE(review): presumably (hostname, addresses no longer valid) — confirm.
    AddressesRemoved(String, HashSet<ScopedIp>),
    /// NOTE(review): presumably sent with the hostname when the resolver's
    /// timeout elapses — confirm.
    SearchTimeout(String),
    /// Resolution was stopped for the given hostname.
    SearchStopped(String),
}
3497
/// Events delivered to the daemon's monitors.
#[derive(Clone, Debug)]
#[non_exhaustive]
pub enum DaemonEvent {
    /// A service was announced: (service name, `"<hostname>:<interface name>"`).
    Announce(String, String),

    /// NOTE(review): presumably an error raised inside the daemon — not
    /// constructed in this part of the file; confirm.
    Error(Error),

    /// NOTE(review): presumably a newly detected local IP address — confirm.
    IpAdd(IpAddr),

    /// NOTE(review): presumably a local IP address that went away — confirm.
    IpDel(IpAddr),

    /// NOTE(review): presumably a name change described by `DnsNameChange`
    /// — confirm against where it is sent.
    NameChange(DnsNameChange),

    /// A response to a query was sent; the payload is the interface name.
    Respond(String),
}
3522
/// Payload of `DaemonEvent::NameChange`.
///
/// NOTE(review): the field meanings below are inferred from the field names
/// only; nothing in this part of the file constructs this struct — confirm.
#[derive(Clone, Debug)]
pub struct DnsNameChange {
    /// Presumably the name before the change.
    pub original: String,

    /// Presumably the name after the change.
    pub new_name: String,

    /// Presumably the record type the renamed record has.
    pub rr_type: RRType,

    /// Presumably the name of the interface the change applies to.
    pub intf_name: String,
}
3547
/// Commands executed by the daemon; see `exec_command` for the dispatch.
#[derive(Debug)]
enum Command {
    /// Browse a service type: (type domain, next retransmission delay in
    /// seconds, listener).
    Browse(String, u32, Sender<ServiceEvent>),

    /// Resolve a hostname: (hostname, next retransmission delay in seconds,
    /// listener, optional timeout — presumably in milliseconds; confirm).
    ResolveHostname(String, u32, Sender<HostnameResolutionEvent>, Option<u64>),

    /// Register a service.
    Register(ServiceInfo),

    /// Unregister a service: (fullname, channel for the `UnregisterStatus`).
    Unregister(String, Sender<UnregisterStatus>),

    /// Announce a registered service again: (fullname, interface index).
    RegisterResend(String, u32),

    /// Send an unregister packet again: (packet, interface index, is_ipv4).
    UnregisterResend(Vec<u8>, u32, bool),

    /// Stop browsing the given service type domain.
    StopBrowse(String),

    /// Stop resolving the given hostname.
    StopResolveHostname(String),

    /// Query what is still unresolved for an instance: (instance, try count).
    Resolve(String, u16),

    /// Request a copy of the metrics counters.
    GetMetrics(Sender<Metrics>),

    /// Request the daemon status.
    GetStatus(Sender<DaemonStatus>),

    /// Add a monitor that receives `DaemonEvent`s.
    Monitor(Sender<DaemonEvent>),

    /// Apply a daemon option.
    SetOption(DaemonOption),

    /// Request the current option values.
    GetOption(Sender<DaemonOptionVal>),

    /// Verify the cached records of an instance: (instance fullname, timeout).
    Verify(String, Duration),

    /// NOTE(review): not handled by `exec_command` (it falls into the
    /// "unexpected command" arm); presumably processed elsewhere — confirm.
    Exit(Sender<DaemonStatus>),
}
3600
3601impl fmt::Display for Command {
3602 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
3603 match self {
3604 Self::Browse(_, _, _) => write!(f, "Command Browse"),
3605 Self::ResolveHostname(_, _, _, _) => write!(f, "Command ResolveHostname"),
3606 Self::Exit(_) => write!(f, "Command Exit"),
3607 Self::GetStatus(_) => write!(f, "Command GetStatus"),
3608 Self::GetMetrics(_) => write!(f, "Command GetMetrics"),
3609 Self::Monitor(_) => write!(f, "Command Monitor"),
3610 Self::Register(_) => write!(f, "Command Register"),
3611 Self::RegisterResend(_, _) => write!(f, "Command RegisterResend"),
3612 Self::SetOption(_) => write!(f, "Command SetOption"),
3613 Self::GetOption(_) => write!(f, "Command GetOption"),
3614 Self::StopBrowse(_) => write!(f, "Command StopBrowse"),
3615 Self::StopResolveHostname(_) => write!(f, "Command StopResolveHostname"),
3616 Self::Unregister(_, _) => write!(f, "Command Unregister"),
3617 Self::UnregisterResend(_, _, _) => write!(f, "Command UnregisterResend"),
3618 Self::Resolve(_, _) => write!(f, "Command Resolve"),
3619 Self::Verify(_, _) => write!(f, "Command VerifyResource"),
3620 }
3621 }
3622}
3623
/// Snapshot of daemon option values, sent in reply to `Command::GetOption`.
struct DaemonOptionVal {
    /// Copy of the daemon's `service_name_len_max`.
    _service_name_len_max: u8,
    /// Copy of the daemon's `ip_check_interval`.
    // NOTE(review): unit not visible here — confirm.
    ip_check_interval: u64,
}
3628
/// Options carried by `Command::SetOption`.
///
/// NOTE(review): the handler (`process_set_option`) is outside this part of
/// the file; the per-variant notes are inferred from the names — confirm.
#[derive(Debug)]
enum DaemonOption {
    /// Presumably the maximum allowed service name length.
    ServiceNameLenMax(u8),
    /// Presumably the interval between IP address checks.
    IpCheckInterval(u64),
    /// Presumably the interface kinds to enable.
    EnableInterface(Vec<IfKind>),
    /// Presumably the interface kinds to disable.
    DisableInterface(Vec<IfKind>),
    /// Presumably toggles multicast loopback on the IPv4 socket.
    MulticastLoopV4(bool),
    /// Presumably toggles multicast loopback on the IPv6 socket.
    MulticastLoopV6(bool),
    /// Presumably selects `ServiceEvent::ServiceData` for resolved services.
    UseServiceData(bool),
    /// Test-only variant; takes an interface name.
    #[cfg(test)]
    TestDownInterface(String),
    /// Test-only variant; takes an interface name.
    #[cfg(test)]
    TestUpInterface(String),
}
3643
/// Byte length of the `._tcp.local.` suffix (`._udp.local.` has the same length).
const DOMAIN_LEN: usize = "._tcp.local.".len();
3646
3647fn check_service_name_length(ty_domain: &str, limit: u8) -> Result<()> {
3649 if ty_domain.len() <= DOMAIN_LEN + 1 {
3650 return Err(e_fmt!("Service type name cannot be empty: {}", ty_domain));
3652 }
3653
3654 let service_name_len = ty_domain.len() - DOMAIN_LEN - 1; if service_name_len > limit as usize {
3656 return Err(e_fmt!("Service name length must be <= {} bytes", limit));
3657 }
3658 Ok(())
3659}
3660
3661fn check_domain_suffix(name: &str) -> Result<()> {
3663 if !(name.ends_with("._tcp.local.") || name.ends_with("._udp.local.")) {
3664 return Err(e_fmt!(
3665 "mDNS service {} must end with '._tcp.local.' or '._udp.local.'",
3666 name
3667 ));
3668 }
3669
3670 Ok(())
3671}
3672
3673fn check_service_name(fullname: &str) -> Result<()> {
3681 check_domain_suffix(fullname)?;
3682
3683 let remaining: Vec<&str> = fullname[..fullname.len() - DOMAIN_LEN].split('.').collect();
3684 let name = remaining.last().ok_or_else(|| e_fmt!("No service name"))?;
3685
3686 if &name[0..1] != "_" {
3687 return Err(e_fmt!("Service name must start with '_'"));
3688 }
3689
3690 let name = &name[1..];
3691
3692 if name.contains("--") {
3693 return Err(e_fmt!("Service name must not contain '--'"));
3694 }
3695
3696 if name.starts_with('-') || name.ends_with('-') {
3697 return Err(e_fmt!("Service name (%s) may not start or end with '-'"));
3698 }
3699
3700 let ascii_count = name.chars().filter(|c| c.is_ascii_alphabetic()).count();
3701 if ascii_count < 1 {
3702 return Err(e_fmt!(
3703 "Service name must contain at least one letter (eg: 'A-Za-z')"
3704 ));
3705 }
3706
3707 Ok(())
3708}
3709
3710fn check_hostname(hostname: &str) -> Result<()> {
3712 if !hostname.ends_with(".local.") {
3713 return Err(e_fmt!("Hostname must end with '.local.': {hostname}"));
3714 }
3715
3716 if hostname == ".local." {
3717 return Err(e_fmt!(
3718 "The part of the hostname before '.local.' cannot be empty"
3719 ));
3720 }
3721
3722 if hostname.len() > 255 {
3723 return Err(e_fmt!("Hostname length must be <= 255 bytes"));
3724 }
3725
3726 Ok(())
3727}
3728
3729fn call_service_listener(
3730 listeners_map: &HashMap<String, Sender<ServiceEvent>>,
3731 ty_domain: &str,
3732 event: ServiceEvent,
3733) {
3734 if let Some(listener) = listeners_map.get(ty_domain) {
3735 match listener.send(event) {
3736 Ok(()) => trace!("Sent event to listener successfully"),
3737 Err(e) => debug!("Failed to send event: {}", e),
3738 }
3739 }
3740}
3741
3742fn call_hostname_resolution_listener(
3743 listeners_map: &HashMap<String, (Sender<HostnameResolutionEvent>, Option<u64>)>,
3744 hostname: &str,
3745 event: HostnameResolutionEvent,
3746) {
3747 let hostname_lower = hostname.to_lowercase();
3748 if let Some(listener) = listeners_map.get(&hostname_lower).map(|(l, _)| l) {
3749 match listener.send(event) {
3750 Ok(()) => trace!("Sent event to listener successfully"),
3751 Err(e) => debug!("Failed to send event: {}", e),
3752 }
3753 }
3754}
3755
3756fn my_ip_interfaces(with_loopback: bool) -> Vec<Interface> {
3759 if_addrs::get_if_addrs()
3760 .unwrap_or_default()
3761 .into_iter()
3762 .filter(|i| i.is_oper_up() && (!i.is_loopback() || with_loopback))
3763 .collect()
3764}
3765
3766fn send_dns_outgoing(out: &DnsOutgoing, my_intf: &MyIntf, sock: &PktInfoUdpSocket) -> Vec<Vec<u8>> {
3767 let if_name = &my_intf.name;
3768
3769 let if_addr = if sock.domain() == Domain::IPV4 {
3770 match my_intf.next_ifaddr_v4() {
3771 Some(addr) => addr,
3772 None => return vec![],
3773 }
3774 } else {
3775 match my_intf.next_ifaddr_v6() {
3776 Some(addr) => addr,
3777 None => return vec![],
3778 }
3779 };
3780
3781 send_dns_outgoing_impl(out, if_name, my_intf.index, if_addr, sock)
3782}
3783
3784fn send_dns_outgoing_impl(
3786 out: &DnsOutgoing,
3787 if_name: &str,
3788 if_index: u32,
3789 if_addr: &IfAddr,
3790 sock: &PktInfoUdpSocket,
3791) -> Vec<Vec<u8>> {
3792 let qtype = if out.is_query() {
3793 "query"
3794 } else {
3795 if out.answers_count() == 0 && out.additionals().is_empty() {
3796 return vec![]; }
3798 "response"
3799 };
3800 trace!(
3801 "send {}: {} questions {} answers {} authorities {} additional",
3802 qtype,
3803 out.questions().len(),
3804 out.answers_count(),
3805 out.authorities().len(),
3806 out.additionals().len()
3807 );
3808
3809 match if_addr.ip() {
3810 IpAddr::V4(ipv4) => {
3811 if let Err(e) = sock.set_multicast_if_v4(&ipv4) {
3812 debug!(
3813 "send_dns_outgoing: failed to set multicast interface for IPv4 {}: {}",
3814 ipv4, e
3815 );
3816 return vec![]; }
3818 }
3819 IpAddr::V6(ipv6) => {
3820 if let Err(e) = sock.set_multicast_if_v6(if_index) {
3821 debug!(
3822 "send_dns_outgoing: failed to set multicast interface for IPv6 {}: {}",
3823 ipv6, e
3824 );
3825 return vec![]; }
3827 }
3828 }
3829
3830 let packet_list = out.to_data_on_wire();
3831 for packet in packet_list.iter() {
3832 multicast_on_intf(packet, if_name, if_index, if_addr, sock);
3833 }
3834 packet_list
3835}
3836
3837fn multicast_on_intf(
3839 packet: &[u8],
3840 if_name: &str,
3841 if_index: u32,
3842 if_addr: &IfAddr,
3843 socket: &PktInfoUdpSocket,
3844) {
3845 if packet.len() > MAX_MSG_ABSOLUTE {
3846 debug!("Drop over-sized packet ({})", packet.len());
3847 return;
3848 }
3849
3850 let addr: SocketAddr = match if_addr {
3851 if_addrs::IfAddr::V4(_) => SocketAddrV4::new(GROUP_ADDR_V4, MDNS_PORT).into(),
3852 if_addrs::IfAddr::V6(_) => {
3853 let mut sock = SocketAddrV6::new(GROUP_ADDR_V6, MDNS_PORT, 0, 0);
3854 sock.set_scope_id(if_index); sock.into()
3856 }
3857 };
3858
3859 let sock_addr = addr.into();
3861 match socket.send_to(packet, &sock_addr) {
3862 Ok(sz) => trace!(
3863 "sent out {} bytes on interface {} (idx {}) addr {}",
3864 sz,
3865 if_name,
3866 if_index,
3867 if_addr.ip()
3868 ),
3869 Err(e) => trace!("Failed to send to {} via {:?}: {}", addr, &if_name, e),
3870 }
3871}
3872
/// Returns `true` when `name` splits into at least five parts on `.`
/// (empty parts, such as the one after a trailing dot, count too).
fn valid_instance_name(name: &str) -> bool {
    // A fifth part exists exactly when there are five or more parts.
    name.split('.').nth(4).is_some()
}
3879
3880fn notify_monitors(monitors: &mut Vec<Sender<DaemonEvent>>, event: DaemonEvent) {
3881 monitors.retain(|sender| {
3882 if let Err(e) = sender.try_send(event.clone()) {
3883 debug!("notify_monitors: try_send: {}", &e);
3884 if matches!(e, TrySendError::Disconnected(_)) {
3885 return false; }
3887 }
3888 true
3889 });
3890}
3891
/// Builds the announcement packet for `info` on interface `intf`.
///
/// The packet holds a PTR for the service type (plus one for the subtype, if
/// any), the SRV and TXT records, and one address record per interface
/// address of the family selected by `is_ipv4`. Names recorded in
/// `dns_registry.name_changes` replace the original ones.
///
/// Returns `None` when the interface has no address of the requested family,
/// or when the service requires probing and `is_probing_done` reports that
/// probing is not finished for at least one of the SRV, TXT or address records.
fn prepare_announce(
    info: &ServiceInfo,
    intf: &MyIntf,
    dns_registry: &mut DnsRegistry,
    is_ipv4: bool,
) -> Option<DnsOutgoing> {
    let intf_addrs = if is_ipv4 {
        info.get_addrs_on_my_intf_v4(intf)
    } else {
        info.get_addrs_on_my_intf_v6(intf)
    };

    if intf_addrs.is_empty() {
        debug!(
            "prepare_announce: no valid addrs on interface {}",
            &intf.name
        );
        return None;
    }

    // Use the renamed fullname if a name change is recorded for this service.
    let service_fullname = match dns_registry.name_changes.get(info.get_fullname()) {
        Some(new_name) => new_name,
        None => info.get_fullname(),
    };

    debug!(
        "prepare to announce service {service_fullname} on {:?}",
        &intf_addrs
    );

    // Number of records that cannot be announced yet because probing is not done.
    let mut probing_count = 0;
    let mut out = DnsOutgoing::new(FLAGS_QR_RESPONSE | FLAGS_AA);
    // The current time plus a random offset below 250; handed to `is_probing_done`.
    let create_time = current_time_millis() + fastrand::u64(0..250);

    // PTR: service type -> instance fullname. Added without a probing check.
    out.add_answer_at_time(
        DnsPointer::new(
            info.get_type(),
            RRType::PTR,
            CLASS_IN,
            info.get_other_ttl(),
            service_fullname.to_string(),
        ),
        0,
    );

    // PTR for the subtype, if the service has one.
    if let Some(sub) = info.get_subtype() {
        trace!("Adding subdomain {}", sub);
        out.add_answer_at_time(
            DnsPointer::new(
                sub,
                RRType::PTR,
                CLASS_IN,
                info.get_other_ttl(),
                service_fullname.to_string(),
            ),
            0,
        );
    }

    // SRV: points at the (possibly renamed) hostname.
    let hostname = match dns_registry.name_changes.get(info.get_hostname()) {
        Some(new_name) => new_name.to_string(),
        None => info.get_hostname().to_string(),
    };

    let mut srv = DnsSrv::new(
        info.get_fullname(),
        CLASS_IN | CLASS_CACHE_FLUSH,
        info.get_host_ttl(),
        info.get_priority(),
        info.get_weight(),
        info.get_port(),
        hostname,
    );

    if let Some(new_name) = dns_registry.name_changes.get(info.get_fullname()) {
        srv.get_record_mut().set_new_name(new_name.to_string());
    }

    // `is_probing_done` is only consulted when the service requires probing.
    if !info.requires_probe()
        || dns_registry.is_probing_done(&srv, info.get_fullname(), create_time)
    {
        out.add_answer_at_time(srv, 0);
    } else {
        probing_count += 1;
    }

    // TXT: same renaming and probing treatment as the SRV record.
    let mut txt = DnsTxt::new(
        info.get_fullname(),
        CLASS_IN | CLASS_CACHE_FLUSH,
        info.get_other_ttl(),
        info.generate_txt(),
    );

    if let Some(new_name) = dns_registry.name_changes.get(info.get_fullname()) {
        txt.get_record_mut().set_new_name(new_name.to_string());
    }

    if !info.requires_probe()
        || dns_registry.is_probing_done(&txt, info.get_fullname(), create_time)
    {
        out.add_answer_at_time(txt, 0);
    } else {
        probing_count += 1;
    }

    // Address records: one per address of this interface.
    let hostname = info.get_hostname();
    for address in intf_addrs {
        let mut dns_addr = DnsAddress::new(
            hostname,
            ip_address_rr_type(&address),
            CLASS_IN | CLASS_CACHE_FLUSH,
            info.get_host_ttl(),
            address,
            intf.into(),
        );

        if let Some(new_name) = dns_registry.name_changes.get(hostname) {
            dns_addr.get_record_mut().set_new_name(new_name.to_string());
        }

        if !info.requires_probe()
            || dns_registry.is_probing_done(&dns_addr, info.get_fullname(), create_time)
        {
            out.add_answer_at_time(dns_addr, 0);
        } else {
            probing_count += 1;
        }
    }

    // Do not announce while any record is still waiting for its probe.
    if probing_count > 0 {
        return None;
    }

    Some(out)
}
4035
4036fn announce_service_on_intf(
4039 dns_registry: &mut DnsRegistry,
4040 info: &ServiceInfo,
4041 intf: &MyIntf,
4042 sock: &PktInfoUdpSocket,
4043) -> bool {
4044 let is_ipv4 = sock.domain() == Domain::IPV4;
4045 if let Some(out) = prepare_announce(info, intf, dns_registry, is_ipv4) {
4046 send_dns_outgoing(&out, intf, sock);
4047 return true;
4048 }
4049
4050 false
4051}
4052
/// Returns a replacement for `original` with a bumped conflict counter.
///
/// Only the first label (the text before the first `.`) is modified; the rest
/// of the name is kept as is:
/// - if the label ends with ` (N)` where `N` parses as a `u32`, the result
///   uses `N + 1`;
/// - otherwise ` (2)` is appended to the label.
///
/// If `N` is already `u32::MAX` it cannot be incremented, so the label is
/// handled like any other label without a usable counter and ` (2)` is
/// appended instead of overflowing.
///
/// For example `foo.local.` becomes `foo (2).local.` and `foo (2).local.`
/// becomes `foo (3).local.`.
fn name_change(original: &str) -> String {
    let (first_label, remainder) = match original.split_once('.') {
        Some((label, rest)) => (label, Some(rest)),
        None => (original, None),
    };

    // A counter is recognized only when the label ends with " (<u32>)".
    let incremented = first_label
        .strip_suffix(')')
        .and_then(|head| head.rsplit_once(" ("))
        .and_then(|(base, counter)| {
            let next = counter.parse::<u32>().ok()?.checked_add(1)?;
            Some(format!("{base} ({next})"))
        });
    let new_label = incremented.unwrap_or_else(|| format!("{first_label} (2)"));

    match remainder {
        Some(rest) => format!("{new_label}.{rest}"),
        None => new_label,
    }
}
4088
/// Returns a replacement for the hostname `original` with a bumped conflict
/// counter.
///
/// Only the first label (the text before the first `.`) is modified; the rest
/// of the name is kept as is:
/// - if the text after the label's last `-` parses as a `u32` `N`, the result
///   uses `N + 1`;
/// - otherwise `-2` is appended to the label.
///
/// If `N` is already `u32::MAX` it cannot be incremented, so the label is
/// handled like any other label without a usable counter and `-2` is appended
/// instead of overflowing.
///
/// For example `foo.local.` becomes `foo-2.local.` and `foo-2.local.` becomes
/// `foo-3.local.`.
fn hostname_change(original: &str) -> String {
    let (first_label, remainder) = match original.split_once('.') {
        Some((label, rest)) => (label, Some(rest)),
        None => (original, None),
    };

    let incremented = first_label.rsplit_once('-').and_then(|(base, counter)| {
        let next = counter.parse::<u32>().ok()?.checked_add(1)?;
        Some(format!("{base}-{next}"))
    });
    let new_label = incremented.unwrap_or_else(|| format!("{first_label}-2"));

    match remainder {
        Some(rest) => format!("{new_label}.{rest}"),
        None => new_label,
    }
}
4116
4117fn add_answer_with_additionals(
4118 out: &mut DnsOutgoing,
4119 msg: &DnsIncoming,
4120 service: &ServiceInfo,
4121 intf: &MyIntf,
4122 dns_registry: &DnsRegistry,
4123 is_ipv4: bool,
4124) {
4125 let intf_addrs = if is_ipv4 {
4126 service.get_addrs_on_my_intf_v4(intf)
4127 } else {
4128 service.get_addrs_on_my_intf_v6(intf)
4129 };
4130 if intf_addrs.is_empty() {
4131 trace!("No addrs on LAN of intf {:?}", intf);
4132 return;
4133 }
4134
4135 let service_fullname = match dns_registry.name_changes.get(service.get_fullname()) {
4137 Some(new_name) => new_name,
4138 None => service.get_fullname(),
4139 };
4140
4141 let hostname = match dns_registry.name_changes.get(service.get_hostname()) {
4142 Some(new_name) => new_name,
4143 None => service.get_hostname(),
4144 };
4145
4146 let ptr_added = out.add_answer(
4147 msg,
4148 DnsPointer::new(
4149 service.get_type(),
4150 RRType::PTR,
4151 CLASS_IN,
4152 service.get_other_ttl(),
4153 service_fullname.to_string(),
4154 ),
4155 );
4156
4157 if !ptr_added {
4158 trace!("answer was not added for msg {:?}", msg);
4159 return;
4160 }
4161
4162 if let Some(sub) = service.get_subtype() {
4163 trace!("Adding subdomain {}", sub);
4164 out.add_additional_answer(DnsPointer::new(
4165 sub,
4166 RRType::PTR,
4167 CLASS_IN,
4168 service.get_other_ttl(),
4169 service_fullname.to_string(),
4170 ));
4171 }
4172
4173 out.add_additional_answer(DnsSrv::new(
4176 service_fullname,
4177 CLASS_IN | CLASS_CACHE_FLUSH,
4178 service.get_host_ttl(),
4179 service.get_priority(),
4180 service.get_weight(),
4181 service.get_port(),
4182 hostname.to_string(),
4183 ));
4184
4185 out.add_additional_answer(DnsTxt::new(
4186 service_fullname,
4187 CLASS_IN | CLASS_CACHE_FLUSH,
4188 service.get_other_ttl(),
4189 service.generate_txt(),
4190 ));
4191
4192 for address in intf_addrs {
4193 out.add_additional_answer(DnsAddress::new(
4194 hostname,
4195 ip_address_rr_type(&address),
4196 CLASS_IN | CLASS_CACHE_FLUSH,
4197 service.get_host_ttl(),
4198 address,
4199 intf.into(),
4200 ));
4201 }
4202}
4203
4204fn check_probing(
4207 dns_registry: &mut DnsRegistry,
4208 timers: &mut BinaryHeap<Reverse<u64>>,
4209 now: u64,
4210) -> (DnsOutgoing, Vec<String>) {
4211 let mut expired_probes = Vec::new();
4212 let mut out = DnsOutgoing::new(FLAGS_QR_QUERY);
4213
4214 for (name, probe) in dns_registry.probing.iter_mut() {
4215 if now >= probe.next_send {
4216 if probe.expired(now) {
4217 expired_probes.push(name.clone());
4219 } else {
4220 out.add_question(name, RRType::ANY);
4221
4222 for record in probe.records.iter() {
4230 out.add_authority(record.clone());
4231 }
4232
4233 probe.update_next_send(now);
4234
4235 timers.push(Reverse(probe.next_send));
4237 }
4238 }
4239 }
4240
4241 (out, expired_probes)
4242}
4243
4244fn handle_expired_probes(
4249 expired_probes: Vec<String>,
4250 intf_name: &str,
4251 dns_registry: &mut DnsRegistry,
4252 monitors: &mut Vec<Sender<DaemonEvent>>,
4253) -> HashSet<String> {
4254 let mut waiting_services = HashSet::new();
4255
4256 for name in expired_probes {
4257 let Some(probe) = dns_registry.probing.remove(&name) else {
4258 continue;
4259 };
4260
4261 for record in probe.records.iter() {
4263 if let Some(new_name) = record.get_record().get_new_name() {
4264 dns_registry
4265 .name_changes
4266 .insert(name.clone(), new_name.to_string());
4267
4268 let event = DnsNameChange {
4269 original: record.get_record().get_original_name().to_string(),
4270 new_name: new_name.to_string(),
4271 rr_type: record.get_type(),
4272 intf_name: intf_name.to_string(),
4273 };
4274 debug!("Name change event: {:?}", &event);
4275 notify_monitors(monitors, DaemonEvent::NameChange(event));
4276 }
4277 }
4278
4279 debug!(
4281 "probe of '{name}' finished: move {} records to active. ({} waiting services)",
4282 probe.records.len(),
4283 probe.waiting_services.len(),
4284 );
4285
4286 if !probe.records.is_empty() {
4288 match dns_registry.active.get_mut(&name) {
4289 Some(records) => {
4290 records.extend(probe.records);
4291 }
4292 None => {
4293 dns_registry.active.insert(name, probe.records);
4294 }
4295 }
4296
4297 waiting_services.extend(probe.waiting_services);
4298 }
4299 }
4300
4301 waiting_services
4302}
4303
4304#[cfg(test)]
4305mod tests {
4306 use super::{
4307 _new_socket_bind, check_domain_suffix, check_service_name_length, hostname_change,
4308 my_ip_interfaces, name_change, send_dns_outgoing_impl, valid_instance_name,
4309 HostnameResolutionEvent, ServiceDaemon, ServiceEvent, ServiceInfo, GROUP_ADDR_V4,
4310 MDNS_PORT,
4311 };
4312 use crate::{
4313 dns_parser::{
4314 DnsIncoming, DnsOutgoing, DnsPointer, InterfaceId, RRType, ScopedIp, CLASS_IN,
4315 FLAGS_AA, FLAGS_QR_RESPONSE,
4316 },
4317 service_daemon::{add_answer_of_service, check_hostname},
4318 };
4319 use std::{
4320 net::{SocketAddr, SocketAddrV4},
4321 time::{Duration, SystemTime},
4322 };
4323 use test_log::test;
4324
4325 #[test]
4326 fn test_socketaddr_print() {
4327 let addr: SocketAddr = SocketAddrV4::new(GROUP_ADDR_V4, MDNS_PORT).into();
4328 let print = format!("{}", addr);
4329 assert_eq!(print, "224.0.0.251:5353");
4330 }
4331
4332 #[test]
4333 fn test_instance_name() {
4334 assert!(valid_instance_name("my-laser._printer._tcp.local."));
4335 assert!(valid_instance_name("my-laser.._printer._tcp.local."));
4336 assert!(!valid_instance_name("_printer._tcp.local."));
4337 }
4338
4339 #[test]
4340 fn test_check_service_name_length() {
4341 let result = check_service_name_length("_tcp", 100);
4342 assert!(result.is_err());
4343 if let Err(e) = result {
4344 println!("{}", e);
4345 }
4346 }
4347
4348 #[test]
4349 fn test_check_hostname() {
4350 for hostname in &[
4352 "my_host.local.",
4353 &("A".repeat(255 - ".local.".len()) + ".local."),
4354 ] {
4355 let result = check_hostname(hostname);
4356 assert!(result.is_ok());
4357 }
4358
4359 for hostname in &[
4361 "my_host.local",
4362 ".local.",
4363 &("A".repeat(256 - ".local.".len()) + ".local."),
4364 ] {
4365 let result = check_hostname(hostname);
4366 assert!(result.is_err());
4367 if let Err(e) = result {
4368 println!("{}", e);
4369 }
4370 }
4371 }
4372
4373 #[test]
4374 fn test_check_domain_suffix() {
4375 assert!(check_domain_suffix("_missing_dot._tcp.local").is_err());
4376 assert!(check_domain_suffix("_missing_bar.tcp.local.").is_err());
4377 assert!(check_domain_suffix("_mis_spell._tpp.local.").is_err());
4378 assert!(check_domain_suffix("_mis_spell._upp.local.").is_err());
4379 assert!(check_domain_suffix("_has_dot._tcp.local.").is_ok());
4380 assert!(check_domain_suffix("_goodname._udp.local.").is_ok());
4381 }
4382
4383 #[test]
4384 fn test_service_with_temporarily_invalidated_ptr() {
4385 let d = ServiceDaemon::new().expect("Failed to create daemon");
4387
4388 let service = "_test_inval_ptr._udp.local.";
4389 let host_name = "my_host_tmp_invalidated_ptr.local.";
4390 let intfs: Vec<_> = my_ip_interfaces(false);
4391 let intf_ips: Vec<_> = intfs.iter().map(|intf| intf.ip()).collect();
4392 let port = 5201;
4393 let my_service =
4394 ServiceInfo::new(service, "my_instance", host_name, &intf_ips[..], port, None)
4395 .expect("invalid service info")
4396 .enable_addr_auto();
4397 let result = d.register(my_service.clone());
4398 assert!(result.is_ok());
4399
4400 let browse_chan = d.browse(service).unwrap();
4402 let timeout = Duration::from_secs(2);
4403 let mut resolved = false;
4404
4405 while let Ok(event) = browse_chan.recv_timeout(timeout) {
4406 match event {
4407 ServiceEvent::ServiceResolved(info) => {
4408 resolved = true;
4409 println!("Resolved a service of {}", &info.get_fullname());
4410 break;
4411 }
4412 e => {
4413 println!("Received event {:?}", e);
4414 }
4415 }
4416 }
4417
4418 assert!(resolved);
4419
4420 println!("Stopping browse of {}", service);
4421 d.stop_browse(service).unwrap();
4424
4425 let mut stopped = false;
4430 while let Ok(event) = browse_chan.recv_timeout(timeout) {
4431 match event {
4432 ServiceEvent::SearchStopped(_) => {
4433 stopped = true;
4434 println!("Stopped browsing service");
4435 break;
4436 }
4437 e => {
4441 println!("Received event {:?}", e);
4442 }
4443 }
4444 }
4445
4446 assert!(stopped);
4447
4448 let invalidate_ptr_packet = DnsPointer::new(
4450 my_service.get_type(),
4451 RRType::PTR,
4452 CLASS_IN,
4453 0,
4454 my_service.get_fullname().to_string(),
4455 );
4456
4457 let mut packet_buffer = DnsOutgoing::new(FLAGS_QR_RESPONSE | FLAGS_AA);
4458 packet_buffer.add_additional_answer(invalidate_ptr_packet);
4459
4460 for intf in intfs {
4461 let sock = _new_socket_bind(&intf, true).unwrap();
4462 send_dns_outgoing_impl(
4463 &packet_buffer,
4464 &intf.name,
4465 intf.index.unwrap_or(0),
4466 &intf.addr,
4467 &sock.pktinfo,
4468 );
4469 }
4470
4471 println!(
4472 "Sent PTR record invalidation. Starting second browse for {}",
4473 service
4474 );
4475
4476 let browse_chan = d.browse(service).unwrap();
4478
4479 resolved = false;
4480 while let Ok(event) = browse_chan.recv_timeout(timeout) {
4481 match event {
4482 ServiceEvent::ServiceResolved(info) => {
4483 resolved = true;
4484 println!("Resolved a service of {}", &info.get_fullname());
4485 break;
4486 }
4487 e => {
4488 println!("Received event {:?}", e);
4489 }
4490 }
4491 }
4492
4493 assert!(resolved);
4494 d.shutdown().unwrap();
4495 }
4496
4497 #[test]
4498 fn test_expired_srv() {
4499 let service_type = "_expired-srv._udp.local.";
4501 let instance = "test_instance";
4502 let host_name = "expired_srv_host.local.";
4503 let mut my_service = ServiceInfo::new(service_type, instance, host_name, "", 5023, None)
4504 .unwrap()
4505 .enable_addr_auto();
4506 let new_ttl = 3; my_service._set_host_ttl(new_ttl);
4511
4512 let mdns_server = ServiceDaemon::new().expect("Failed to create mdns server");
4514 let result = mdns_server.register(my_service);
4515 assert!(result.is_ok());
4516
4517 let mdns_client = ServiceDaemon::new().expect("Failed to create mdns client");
4518 let browse_chan = mdns_client.browse(service_type).unwrap();
4519 let timeout = Duration::from_secs(2);
4520 let mut resolved = false;
4521
4522 while let Ok(event) = browse_chan.recv_timeout(timeout) {
4523 match event {
4524 ServiceEvent::ServiceResolved(info) => {
4525 resolved = true;
4526 println!("Resolved a service of {}", &info.get_fullname());
4527 break;
4528 }
4529 _ => {}
4530 }
4531 }
4532
4533 assert!(resolved);
4534
4535 mdns_server.shutdown().unwrap();
4537
4538 let expire_timeout = Duration::from_secs(new_ttl as u64);
4540 while let Ok(event) = browse_chan.recv_timeout(expire_timeout) {
4541 match event {
4542 ServiceEvent::ServiceRemoved(service_type, full_name) => {
4543 println!("Service removed: {}: {}", &service_type, &full_name);
4544 break;
4545 }
4546 _ => {}
4547 }
4548 }
4549 }
4550
4551 #[test]
4552 fn test_hostname_resolution_address_removed() {
4553 let server = ServiceDaemon::new().expect("Failed to create server");
4555 let hostname = "addr_remove_host._tcp.local.";
4556 let service_ip_addr: ScopedIp = my_ip_interfaces(false)
4557 .iter()
4558 .find(|iface| iface.ip().is_ipv4())
4559 .map(|iface| iface.ip().into())
4560 .unwrap();
4561
4562 let mut my_service = ServiceInfo::new(
4563 "_host_res_test._tcp.local.",
4564 "my_instance",
4565 hostname,
4566 &service_ip_addr.to_ip_addr(),
4567 1234,
4568 None,
4569 )
4570 .expect("invalid service info");
4571
4572 let addr_ttl = 2;
4574 my_service._set_host_ttl(addr_ttl); server.register(my_service).unwrap();
4577
4578 let client = ServiceDaemon::new().expect("Failed to create client");
4580 let event_receiver = client.resolve_hostname(hostname, None).unwrap();
4581 let resolved = loop {
4582 match event_receiver.recv() {
4583 Ok(HostnameResolutionEvent::AddressesFound(found_hostname, addresses)) => {
4584 assert!(found_hostname == hostname);
4585 assert!(addresses.contains(&service_ip_addr));
4586 println!("address found: {:?}", &addresses);
4587 break true;
4588 }
4589 Ok(HostnameResolutionEvent::SearchStopped(_)) => break false,
4590 Ok(_event) => {}
4591 Err(_) => break false,
4592 }
4593 };
4594
4595 assert!(resolved);
4596
4597 server.shutdown().unwrap();
4599
4600 let timeout = Duration::from_secs(addr_ttl as u64 + 1);
4602 let removed = loop {
4603 match event_receiver.recv_timeout(timeout) {
4604 Ok(HostnameResolutionEvent::AddressesRemoved(removed_host, addresses)) => {
4605 assert!(removed_host == hostname);
4606 assert!(addresses.contains(&service_ip_addr));
4607
4608 println!(
4609 "address removed: hostname: {} addresses: {:?}",
4610 &hostname, &addresses
4611 );
4612 break true;
4613 }
4614 Ok(_event) => {}
4615 Err(_) => {
4616 break false;
4617 }
4618 }
4619 };
4620
4621 assert!(removed);
4622
4623 client.shutdown().unwrap();
4624 }
4625
4626 #[test]
4627 fn test_refresh_ptr() {
4628 let service_type = "_refresh-ptr._udp.local.";
4630 let instance = "test_instance";
4631 let host_name = "refresh_ptr_host.local.";
4632 let service_ip_addr = my_ip_interfaces(false)
4633 .iter()
4634 .find(|iface| iface.ip().is_ipv4())
4635 .map(|iface| iface.ip())
4636 .unwrap();
4637
4638 let mut my_service = ServiceInfo::new(
4639 service_type,
4640 instance,
4641 host_name,
4642 &service_ip_addr,
4643 5023,
4644 None,
4645 )
4646 .unwrap();
4647
4648 let new_ttl = 3; my_service._set_other_ttl(new_ttl);
4650
4651 let mdns_server = ServiceDaemon::new().expect("Failed to create mdns server");
4653 let result = mdns_server.register(my_service);
4654 assert!(result.is_ok());
4655
4656 let mdns_client = ServiceDaemon::new().expect("Failed to create mdns client");
4657 let browse_chan = mdns_client.browse(service_type).unwrap();
4658 let timeout = Duration::from_millis(1500); let mut resolved = false;
4660
4661 while let Ok(event) = browse_chan.recv_timeout(timeout) {
4663 match event {
4664 ServiceEvent::ServiceResolved(info) => {
4665 resolved = true;
4666 println!("Resolved a service of {}", &info.get_fullname());
4667 break;
4668 }
4669 _ => {}
4670 }
4671 }
4672
4673 assert!(resolved);
4674
4675 let timeout = Duration::from_millis(new_ttl as u64 * 1000 * 90 / 100);
4677 while let Ok(event) = browse_chan.recv_timeout(timeout) {
4678 println!("event: {:?}", &event);
4679 }
4680
4681 let metrics_chan = mdns_client.get_metrics().unwrap();
4683 let metrics = metrics_chan.recv_timeout(timeout).unwrap();
4684 let ptr_refresh_counter = metrics["cache-refresh-ptr"];
4685 assert_eq!(ptr_refresh_counter, 1);
4686 let srvtxt_refresh_counter = metrics["cache-refresh-srv-txt"];
4687 assert_eq!(srvtxt_refresh_counter, 1);
4688
4689 mdns_server.shutdown().unwrap();
4691 mdns_client.shutdown().unwrap();
4692 }
4693
4694 #[test]
4695 fn test_name_change() {
4696 assert_eq!(name_change("foo.local."), "foo (2).local.");
4697 assert_eq!(name_change("foo (2).local."), "foo (3).local.");
4698 assert_eq!(name_change("foo (9).local."), "foo (10).local.");
4699 assert_eq!(name_change("foo"), "foo (2)");
4700 assert_eq!(name_change("foo (2)"), "foo (3)");
4701 assert_eq!(name_change(""), " (2)");
4702
4703 assert_eq!(name_change("foo (abc)"), "foo (abc) (2)"); assert_eq!(name_change("foo (2"), "foo (2 (2)"); assert_eq!(name_change("foo (2) extra"), "foo (2) extra (2)"); }
4708
4709 #[test]
4710 fn test_hostname_change() {
4711 assert_eq!(hostname_change("foo.local."), "foo-2.local.");
4712 assert_eq!(hostname_change("foo"), "foo-2");
4713 assert_eq!(hostname_change("foo-2.local."), "foo-3.local.");
4714 assert_eq!(hostname_change("foo-9"), "foo-10");
4715 assert_eq!(hostname_change("test-42.domain."), "test-43.domain.");
4716 }
4717
4718 #[test]
4719 fn test_add_answer_txt_ttl() {
4720 let service_type = "_test_add_answer._udp.local.";
4722 let instance = "test_instance";
4723 let host_name = "add_answer_host.local.";
4724 let service_intf = my_ip_interfaces(false)
4725 .into_iter()
4726 .find(|iface| iface.ip().is_ipv4())
4727 .unwrap();
4728 let service_ip_addr = service_intf.ip();
4729 let my_service = ServiceInfo::new(
4730 service_type,
4731 instance,
4732 host_name,
4733 &service_ip_addr,
4734 5023,
4735 None,
4736 )
4737 .unwrap();
4738
4739 let mut out = DnsOutgoing::new(FLAGS_QR_RESPONSE | FLAGS_AA);
4741
4742 let mut dummy_data = out.to_data_on_wire();
4744 let interface_id = InterfaceId::from(&service_intf);
4745 let incoming = DnsIncoming::new(dummy_data.pop().unwrap(), interface_id).unwrap();
4746
4747 let if_addrs = vec![service_intf.ip()];
4749 add_answer_of_service(
4750 &mut out,
4751 &incoming,
4752 instance,
4753 &my_service,
4754 RRType::TXT,
4755 if_addrs,
4756 );
4757
4758 assert!(
4760 out.answers_count() > 0,
4761 "No answers added to the outgoing message"
4762 );
4763
4764 let answer = out._answers().first().unwrap();
4766 assert_eq!(answer.0.get_type(), RRType::TXT);
4767
4768 assert_eq!(answer.0.get_record().get_ttl(), my_service.get_other_ttl());
4770 }
4771
4772 #[test]
4773 fn test_interface_flip() {
4774 let ty_domain = "_intf-flip._udp.local.";
4776 let host_name = "intf_flip.local.";
4777 let now = SystemTime::now()
4778 .duration_since(SystemTime::UNIX_EPOCH)
4779 .unwrap();
4780 let instance_name = now.as_micros().to_string(); let port = 5200;
4782
4783 let (ip_addr1, intf_name) = my_ip_interfaces(false)
4785 .iter()
4786 .find(|iface| iface.ip().is_ipv4())
4787 .map(|iface| (iface.ip(), iface.name.clone()))
4788 .unwrap();
4789
4790 println!("Using interface {} with IP {}", intf_name, ip_addr1);
4791
4792 let service1 =
4794 ServiceInfo::new(ty_domain, &instance_name, host_name, &ip_addr1, port, None)
4795 .expect("valid service info");
4796 let server1 = ServiceDaemon::new().expect("failed to start server");
4797 server1
4798 .register(service1)
4799 .expect("Failed to register service1");
4800
4801 std::thread::sleep(Duration::from_secs(2));
4803
4804 let client = ServiceDaemon::new().expect("failed to start client");
4806 client.use_service_data(true).unwrap();
4807
4808 let receiver = client.browse(ty_domain).unwrap();
4809
4810 let timeout = Duration::from_secs(3);
4811 let mut got_data = false;
4812
4813 while let Ok(event) = receiver.recv_timeout(timeout) {
4814 match event {
4815 ServiceEvent::ServiceData(_) => {
4816 println!("Received ServiceData event");
4817 got_data = true;
4818 break;
4819 }
4820 _ => {}
4821 }
4822 }
4823
4824 assert!(got_data, "Should receive ServiceData event");
4825
4826 client.set_ip_check_interval(1).unwrap();
4828
4829 println!("Shutting down interface {}", &intf_name);
4831 client.test_down_interface(&intf_name).unwrap();
4832
4833 let mut got_removed = false;
4834
4835 while let Ok(event) = receiver.recv_timeout(timeout) {
4836 match event {
4837 ServiceEvent::ServiceRemoved(ty_domain, instance) => {
4838 got_removed = true;
4839 println!("removed: {ty_domain} : {instance}");
4840 break;
4841 }
4842 _ => {}
4843 }
4844 }
4845 assert!(got_removed, "Should receive ServiceRemoved event");
4846
4847 println!("Bringing up interface {}", &intf_name);
4848 client.test_up_interface(&intf_name).unwrap();
4849 let mut got_data = false;
4850 while let Ok(event) = receiver.recv_timeout(timeout) {
4851 match event {
4852 ServiceEvent::ServiceData(resolved) => {
4853 got_data = true;
4854 println!("Received ServiceData: {:?}", resolved);
4855 break;
4856 }
4857 _ => {}
4858 }
4859 }
4860 assert!(
4861 got_data,
4862 "Should receive ServiceData event after interface is back up"
4863 );
4864
4865 server1.shutdown().unwrap();
4866 client.shutdown().unwrap();
4867 }
4868}