1#[cfg(feature = "logging")]
32use crate::log::{debug, trace};
33use crate::{
34 dns_cache::{current_time_millis, DnsCache},
35 dns_parser::{
36 ip_address_rr_type, DnsAddress, DnsEntryExt, DnsIncoming, DnsOutgoing, DnsPointer,
37 DnsRecordBox, DnsRecordExt, DnsSrv, DnsTxt, InterfaceId, RRType, ScopedIp,
38 CLASS_CACHE_FLUSH, CLASS_IN, FLAGS_AA, FLAGS_QR_QUERY, FLAGS_QR_RESPONSE, MAX_MSG_ABSOLUTE,
39 },
40 error::{e_fmt, Error, Result},
41 service_info::{DnsRegistry, MyIntf, Probe, ServiceInfo, ServiceStatus},
42 Receiver, ResolvedService, TxtProperties,
43};
44use flume::{bounded, Sender, TrySendError};
45use if_addrs::{IfAddr, Interface};
46use mio::{event::Source, net::UdpSocket as MioUdpSocket, Interest, Poll, Registry, Token};
47use socket2::Domain;
48use socket_pktinfo::PktInfoUdpSocket;
49use std::{
50 cmp::{self, Reverse},
51 collections::{hash_map::Entry, BinaryHeap, HashMap, HashSet},
52 fmt, io,
53 net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6, UdpSocket},
54 str, thread,
55 time::Duration,
56 vec,
57};
58
/// Default maximum length of a service name; the daemon starts with this
/// value as its `service_name_len_max` option.
pub const SERVICE_NAME_LEN_MAX_DEFAULT: u8 = 15;

/// Default interval, in seconds, between checks for IP address changes.
/// The daemon converts it to milliseconds when it starts.
pub const IP_CHECK_INTERVAL_IN_SECS_DEFAULT: u32 = 5;

/// A 10-second timeout.
/// NOTE(review): presumably the suggested timeout for `ServiceDaemon::verify`;
/// its use is not visible in this part of the file — confirm.
pub const VERIFY_TIMEOUT_DEFAULT: Duration = Duration::from_secs(10);

/// UDP port that the mDNS sockets bind to.
const MDNS_PORT: u16 = 5353;
/// IPv4 multicast group joined by the IPv4 socket.
const GROUP_ADDR_V4: Ipv4Addr = Ipv4Addr::new(224, 0, 0, 251);
/// IPv6 multicast group joined by the IPv6 socket.
const GROUP_ADDR_V6: Ipv6Addr = Ipv6Addr::new(0xff02, 0, 0, 0, 0, 0, 0, 0xfb);
/// IPv4 loopback address used for the daemon's signal socket.
const LOOPBACK_V4: Ipv4Addr = Ipv4Addr::new(127, 0, 0, 1);

/// A wait time in milliseconds.
/// NOTE(review): presumably used when resolving services; the usage is outside
/// this part of the file — confirm.
const RESOLVE_WAIT_IN_MILLIS: u64 = 500;
77
/// Response delivered on the channel returned by [`ServiceDaemon::unregister`].
#[derive(Debug)]
pub enum UnregisterStatus {
    /// The unregister request succeeded.
    OK,
    /// NOTE(review): presumably the service to unregister was not found — the
    /// code producing this value is outside this part of the file.
    NotFound,
}
86
/// Status of the daemon, as reported by [`ServiceDaemon::status`] and
/// [`ServiceDaemon::shutdown`].
#[derive(Debug, PartialEq, Clone, Eq)]
#[non_exhaustive]
pub enum DaemonStatus {
    /// The daemon is running; this is the status it is created with.
    Running,

    /// The daemon received an exit command, or its command channel is disconnected.
    Shutdown,
}
97
/// Identifiers of the internal metrics counters.
///
/// The `Display` implementation yields the textual name of each counter.
#[derive(Hash, Eq, PartialEq)]
enum Counter {
    Register,
    RegisterResend,
    Unregister,
    UnregisterResend,
    Browse,
    ResolveHostname,
    Respond,
    CacheRefreshPTR,
    CacheRefreshSrvTxt,
    CacheRefreshAddr,
    KnownAnswerSuppression,
    CachedPTR,
    CachedSRV,
    CachedAddr,
    CachedTxt,
    CachedNSec,
    CachedSubtype,
    DnsRegistryProbe,
    DnsRegistryActive,
    DnsRegistryTimer,
    DnsRegistryNameChange,
    Timer,
}

impl fmt::Display for Counter {
    /// Writes the kebab-case name of the counter.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        // Resolve the label first, then emit it with a single write.
        let label = match self {
            Self::Register => "register",
            Self::RegisterResend => "register-resend",
            Self::Unregister => "unregister",
            Self::UnregisterResend => "unregister-resend",
            Self::Browse => "browse",
            Self::ResolveHostname => "resolve-hostname",
            Self::Respond => "respond",
            Self::CacheRefreshPTR => "cache-refresh-ptr",
            Self::CacheRefreshSrvTxt => "cache-refresh-srv-txt",
            Self::CacheRefreshAddr => "cache-refresh-addr",
            Self::KnownAnswerSuppression => "known-answer-suppression",
            Self::CachedPTR => "cached-ptr",
            Self::CachedSRV => "cached-srv",
            Self::CachedAddr => "cached-addr",
            Self::CachedTxt => "cached-txt",
            Self::CachedNSec => "cached-nsec",
            Self::CachedSubtype => "cached-subtype",
            Self::DnsRegistryProbe => "dns-registry-probe",
            Self::DnsRegistryActive => "dns-registry-active",
            Self::DnsRegistryTimer => "dns-registry-timer",
            Self::DnsRegistryNameChange => "dns-registry-name-change",
            Self::Timer => "timer",
        };
        f.write_str(label)
    }
}
154
/// A UDP socket held in two forms that refer to the same underlying socket.
struct MyUdpSocket {
    /// The socket used for socket options and multicast group membership.
    pktinfo: PktInfoUdpSocket,

    /// A mio socket created from a std clone of `pktinfo`; it is the handle
    /// registered with the poller through the `Source` implementation.
    mio: MioUdpSocket,
}
168
169impl MyUdpSocket {
170 pub fn new(pktinfo: PktInfoUdpSocket) -> io::Result<Self> {
171 let std_sock = pktinfo.try_clone_std()?;
172 let mio = MioUdpSocket::from_std(std_sock);
173
174 Ok(Self { pktinfo, mio })
175 }
176}
177
178impl Source for MyUdpSocket {
180 fn register(
181 &mut self,
182 registry: &Registry,
183 token: Token,
184 interests: Interest,
185 ) -> io::Result<()> {
186 self.mio.register(registry, token, interests)
187 }
188
189 fn reregister(
190 &mut self,
191 registry: &Registry,
192 token: Token,
193 interests: Interest,
194 ) -> io::Result<()> {
195 self.mio.reregister(registry, token, interests)
196 }
197
198 fn deregister(&mut self, registry: &Registry) -> std::io::Result<()> {
199 self.mio.deregister(registry)
200 }
201}
202
/// Metrics reported by the daemon: a map from a metric name to its value.
pub type Metrics = HashMap<String, i64>;
206
/// Poller token for the shared IPv4 mDNS socket.
const IPV4_SOCK_EVENT_KEY: usize = 4;
/// Poller token for the shared IPv6 mDNS socket.
const IPV6_SOCK_EVENT_KEY: usize = 6;
/// Poller token for the signal socket that wakes up the daemon thread.
const SIGNAL_SOCK_EVENT_KEY: usize = usize::MAX - 1;

/// A cloneable handle to the daemon thread.
///
/// Commands are passed over a channel, and a datagram sent to `signal_addr`
/// follows each command.
#[derive(Clone)]
pub struct ServiceDaemon {
    /// Sending end of the command channel to the daemon thread.
    sender: Sender<Command>,

    /// Local address of the daemon's signal socket (loopback, OS-assigned port).
    signal_addr: SocketAddr,
}
226
227impl ServiceDaemon {
228 pub fn new() -> Result<Self> {
233 let signal_addr = SocketAddrV4::new(LOOPBACK_V4, 0);
236
237 let signal_sock = UdpSocket::bind(signal_addr)
238 .map_err(|e| e_fmt!("failed to create signal_sock for daemon: {}", e))?;
239
240 let signal_addr = signal_sock
242 .local_addr()
243 .map_err(|e| e_fmt!("failed to get signal sock addr: {}", e))?;
244
245 signal_sock
247 .set_nonblocking(true)
248 .map_err(|e| e_fmt!("failed to set nonblocking for signal socket: {}", e))?;
249
250 let poller = Poll::new().map_err(|e| e_fmt!("failed to create mio Poll: {e}"))?;
251
252 let (sender, receiver) = bounded(100);
253
254 let mio_sock = MioUdpSocket::from_std(signal_sock);
256 thread::Builder::new()
257 .name("mDNS_daemon".to_string())
258 .spawn(move || Self::daemon_thread(mio_sock, poller, receiver))
259 .map_err(|e| e_fmt!("thread builder failed to spawn: {}", e))?;
260
261 Ok(Self {
262 sender,
263 signal_addr,
264 })
265 }
266
267 fn send_cmd(&self, cmd: Command) -> Result<()> {
270 let cmd_name = cmd.to_string();
271
272 self.sender.try_send(cmd).map_err(|e| match e {
274 TrySendError::Full(_) => Error::Again,
275 e => e_fmt!("flume::channel::send failed: {}", e),
276 })?;
277
278 let addr = SocketAddrV4::new(LOOPBACK_V4, 0);
280 let socket = UdpSocket::bind(addr)
281 .map_err(|e| e_fmt!("Failed to create socket to send signal: {}", e))?;
282 socket
283 .send_to(cmd_name.as_bytes(), self.signal_addr)
284 .map_err(|e| {
285 e_fmt!(
286 "signal socket send_to {} ({}) failed: {}",
287 self.signal_addr,
288 cmd_name,
289 e
290 )
291 })?;
292
293 Ok(())
294 }
295
296 pub fn browse(&self, service_type: &str) -> Result<Receiver<ServiceEvent>> {
307 check_domain_suffix(service_type)?;
308
309 let (resp_s, resp_r) = bounded(10);
310 self.send_cmd(Command::Browse(service_type.to_string(), 1, false, resp_s))?;
311 Ok(resp_r)
312 }
313
314 pub fn browse_cache(&self, service_type: &str) -> Result<Receiver<ServiceEvent>> {
323 check_domain_suffix(service_type)?;
324
325 let (resp_s, resp_r) = bounded(10);
326 self.send_cmd(Command::Browse(service_type.to_string(), 1, true, resp_s))?;
327 Ok(resp_r)
328 }
329
330 pub fn stop_browse(&self, ty_domain: &str) -> Result<()> {
335 self.send_cmd(Command::StopBrowse(ty_domain.to_string()))
336 }
337
338 pub fn resolve_hostname(
346 &self,
347 hostname: &str,
348 timeout: Option<u64>,
349 ) -> Result<Receiver<HostnameResolutionEvent>> {
350 check_hostname(hostname)?;
351 let (resp_s, resp_r) = bounded(10);
352 self.send_cmd(Command::ResolveHostname(
353 hostname.to_string(),
354 1,
355 resp_s,
356 timeout,
357 ))?;
358 Ok(resp_r)
359 }
360
361 pub fn stop_resolve_hostname(&self, hostname: &str) -> Result<()> {
366 self.send_cmd(Command::StopResolveHostname(hostname.to_string()))
367 }
368
369 pub fn register(&self, service_info: ServiceInfo) -> Result<()> {
377 check_service_name(service_info.get_fullname())?;
378 check_hostname(service_info.get_hostname())?;
379
380 self.send_cmd(Command::Register(service_info))
381 }
382
383 pub fn unregister(&self, fullname: &str) -> Result<Receiver<UnregisterStatus>> {
391 let (resp_s, resp_r) = bounded(1);
392 self.send_cmd(Command::Unregister(fullname.to_lowercase(), resp_s))?;
393 Ok(resp_r)
394 }
395
396 pub fn monitor(&self) -> Result<Receiver<DaemonEvent>> {
400 let (resp_s, resp_r) = bounded(100);
401 self.send_cmd(Command::Monitor(resp_s))?;
402 Ok(resp_r)
403 }
404
405 pub fn shutdown(&self) -> Result<Receiver<DaemonStatus>> {
410 let (resp_s, resp_r) = bounded(1);
411 self.send_cmd(Command::Exit(resp_s))?;
412 Ok(resp_r)
413 }
414
415 pub fn status(&self) -> Result<Receiver<DaemonStatus>> {
421 let (resp_s, resp_r) = bounded(1);
422
423 if self.sender.is_disconnected() {
424 resp_s
425 .send(DaemonStatus::Shutdown)
426 .map_err(|e| e_fmt!("failed to send daemon status to the client: {}", e))?;
427 } else {
428 self.send_cmd(Command::GetStatus(resp_s))?;
429 }
430
431 Ok(resp_r)
432 }
433
434 pub fn get_metrics(&self) -> Result<Receiver<Metrics>> {
439 let (resp_s, resp_r) = bounded(1);
440 self.send_cmd(Command::GetMetrics(resp_s))?;
441 Ok(resp_r)
442 }
443
444 pub fn set_service_name_len_max(&self, len_max: u8) -> Result<()> {
451 const SERVICE_NAME_LEN_MAX_LIMIT: u8 = 30; if len_max > SERVICE_NAME_LEN_MAX_LIMIT {
454 return Err(Error::Msg(format!(
455 "service name length max {len_max} is too large"
456 )));
457 }
458
459 self.send_cmd(Command::SetOption(DaemonOption::ServiceNameLenMax(len_max)))
460 }
461
462 pub fn set_ip_check_interval(&self, interval_in_secs: u32) -> Result<()> {
468 let interval_in_millis = interval_in_secs as u64 * 1000;
469 self.send_cmd(Command::SetOption(DaemonOption::IpCheckInterval(
470 interval_in_millis,
471 )))
472 }
473
474 pub fn get_ip_check_interval(&self) -> Result<u32> {
476 let (resp_s, resp_r) = bounded(1);
477 self.send_cmd(Command::GetOption(resp_s))?;
478
479 let option = resp_r
480 .recv_timeout(Duration::from_secs(10))
481 .map_err(|e| e_fmt!("failed to receive ip check interval: {}", e))?;
482 let ip_check_interval_in_secs = option.ip_check_interval / 1000;
483 Ok(ip_check_interval_in_secs as u32)
484 }
485
486 pub fn enable_interface(&self, if_kind: impl IntoIfKindVec) -> Result<()> {
493 let if_kind_vec = if_kind.into_vec();
494 self.send_cmd(Command::SetOption(DaemonOption::EnableInterface(
495 if_kind_vec.kinds,
496 )))
497 }
498
499 pub fn disable_interface(&self, if_kind: impl IntoIfKindVec) -> Result<()> {
506 let if_kind_vec = if_kind.into_vec();
507 self.send_cmd(Command::SetOption(DaemonOption::DisableInterface(
508 if_kind_vec.kinds,
509 )))
510 }
511
512 pub fn accept_unsolicited(&self, accept: bool) -> Result<()> {
523 self.send_cmd(Command::SetOption(DaemonOption::AcceptUnsolicited(accept)))
524 }
525
526 #[cfg(test)]
527 pub fn test_down_interface(&self, ifname: &str) -> Result<()> {
528 self.send_cmd(Command::SetOption(DaemonOption::TestDownInterface(
529 ifname.to_string(),
530 )))
531 }
532
533 #[cfg(test)]
534 pub fn test_up_interface(&self, ifname: &str) -> Result<()> {
535 self.send_cmd(Command::SetOption(DaemonOption::TestUpInterface(
536 ifname.to_string(),
537 )))
538 }
539
540 pub fn set_multicast_loop_v4(&self, on: bool) -> Result<()> {
556 self.send_cmd(Command::SetOption(DaemonOption::MulticastLoopV4(on)))
557 }
558
559 pub fn set_multicast_loop_v6(&self, on: bool) -> Result<()> {
575 self.send_cmd(Command::SetOption(DaemonOption::MulticastLoopV6(on)))
576 }
577
578 pub fn verify(&self, instance_fullname: String, timeout: Duration) -> Result<()> {
591 self.send_cmd(Command::Verify(instance_fullname, timeout))
592 }
593
594 fn daemon_thread(signal_sock: MioUdpSocket, poller: Poll, receiver: Receiver<Command>) {
595 let mut zc = Zeroconf::new(signal_sock, poller);
596
597 if let Some(cmd) = zc.run(receiver) {
598 match cmd {
599 Command::Exit(resp_s) => {
600 if let Err(e) = resp_s.send(DaemonStatus::Shutdown) {
603 debug!("exit: failed to send response of shutdown: {}", e);
604 }
605 }
606 _ => {
607 debug!("Unexpected command: {:?}", cmd);
608 }
609 }
610 }
611 }
612}
613
614fn _new_socket_bind(intf: &Interface, should_loop: bool) -> Result<MyUdpSocket> {
616 let intf_ip = &intf.ip();
619 match intf_ip {
620 IpAddr::V4(ip) => {
621 let addr = SocketAddrV4::new(Ipv4Addr::new(0, 0, 0, 0), MDNS_PORT);
622 let sock = new_socket(addr.into(), true)?;
623
624 sock.join_multicast_v4(&GROUP_ADDR_V4, ip)
626 .map_err(|e| e_fmt!("join multicast group on addr {}: {}", intf_ip, e))?;
627
628 sock.set_multicast_if_v4(ip)
630 .map_err(|e| e_fmt!("set multicast_if on addr {}: {}", ip, e))?;
631
632 sock.set_multicast_ttl_v4(255)
637 .map_err(|e| e_fmt!("set set_multicast_ttl_v4 on addr {}: {}", ip, e))?;
638
639 if !should_loop {
640 sock.set_multicast_loop_v4(false)
641 .map_err(|e| e_fmt!("failed to set multicast loop v4 for {ip}: {e}"))?;
642 }
643
644 let multicast_addr = SocketAddrV4::new(GROUP_ADDR_V4, MDNS_PORT).into();
646 let test_packets = DnsOutgoing::new(0).to_data_on_wire();
647 for packet in test_packets {
648 sock.send_to(&packet, &multicast_addr)
649 .map_err(|e| e_fmt!("send multicast packet on addr {}: {}", ip, e))?;
650 }
651 MyUdpSocket::new(sock)
652 .map_err(|e| e_fmt!("failed to create MySocket for interface {}: {e}", intf.name))
653 }
654 IpAddr::V6(ip) => {
655 let addr = SocketAddrV6::new(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 0), MDNS_PORT, 0, 0);
656 let sock = new_socket(addr.into(), true)?;
657
658 let if_index = intf.index.unwrap_or(0);
659
660 sock.join_multicast_v6(&GROUP_ADDR_V6, if_index)
662 .map_err(|e| e_fmt!("join multicast group on addr {}: {}", ip, e))?;
663
664 sock.set_multicast_if_v6(if_index)
666 .map_err(|e| e_fmt!("set multicast_if on addr {}: {}", ip, e))?;
667
668 MyUdpSocket::new(sock)
673 .map_err(|e| e_fmt!("failed to create MySocket for interface {}: {e}", intf.name))
674 }
675 }
676}
677
678fn new_socket(addr: SocketAddr, non_block: bool) -> Result<PktInfoUdpSocket> {
681 let domain = match addr {
682 SocketAddr::V4(_) => socket2::Domain::IPV4,
683 SocketAddr::V6(_) => socket2::Domain::IPV6,
684 };
685
686 let fd = PktInfoUdpSocket::new(domain).map_err(|e| e_fmt!("create socket failed: {}", e))?;
687
688 fd.set_reuse_address(true)
689 .map_err(|e| e_fmt!("set ReuseAddr failed: {}", e))?;
690 #[cfg(unix)] fd.set_reuse_port(true)
692 .map_err(|e| e_fmt!("set ReusePort failed: {}", e))?;
693
694 if non_block {
695 fd.set_nonblocking(true)
696 .map_err(|e| e_fmt!("set O_NONBLOCK: {}", e))?;
697 }
698
699 fd.bind(&addr.into())
700 .map_err(|e| e_fmt!("socket bind to {} failed: {}", &addr, e))?;
701
702 trace!("new socket bind to {}", &addr);
703 Ok(fd)
704}
705
/// A command scheduled to be executed again by the daemon loop.
struct ReRun {
    /// Time in milliseconds at or after which the command is re-executed.
    next_time: u64,
    /// The command to execute again.
    command: Command,
}
712
/// Selector describing which network interfaces an operation applies to.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub enum IfKind {
    /// Every interface.
    All,

    /// Interfaces whose address is IPv4.
    IPv4,

    /// Interfaces whose address is IPv6.
    IPv6,

    /// Interfaces with exactly this name.
    Name(String),

    /// Interfaces with exactly this IP address.
    Addr(IpAddr),

    /// Loopback interfaces with an IPv4 address.
    LoopbackV4,

    /// Loopback interfaces with an IPv6 address.
    LoopbackV6,
}
743
744impl IfKind {
745 fn matches(&self, intf: &Interface) -> bool {
747 match self {
748 Self::All => true,
749 Self::IPv4 => intf.ip().is_ipv4(),
750 Self::IPv6 => intf.ip().is_ipv6(),
751 Self::Name(ifname) => ifname == &intf.name,
752 Self::Addr(addr) => addr == &intf.ip(),
753 Self::LoopbackV4 => intf.is_loopback() && intf.ip().is_ipv4(),
754 Self::LoopbackV6 => intf.is_loopback() && intf.ip().is_ipv6(),
755 }
756 }
757}
758
759impl From<&str> for IfKind {
762 fn from(val: &str) -> Self {
763 Self::Name(val.to_string())
764 }
765}
766
767impl From<&String> for IfKind {
768 fn from(val: &String) -> Self {
769 Self::Name(val.to_string())
770 }
771}
772
773impl From<IpAddr> for IfKind {
775 fn from(val: IpAddr) -> Self {
776 Self::Addr(val)
777 }
778}
779
/// A list of interface selectors, produced through [`IntoIfKindVec`].
pub struct IfKindVec {
    /// The selectors, in the order they were given.
    kinds: Vec<IfKind>,
}
784
/// Conversion into an [`IfKindVec`], implemented for a single value or a `Vec`
/// of values that convert into [`IfKind`].
pub trait IntoIfKindVec {
    /// Consumes `self` and returns the equivalent list of selectors.
    fn into_vec(self) -> IfKindVec;
}
789
790impl<T: Into<IfKind>> IntoIfKindVec for T {
791 fn into_vec(self) -> IfKindVec {
792 let if_kind: IfKind = self.into();
793 IfKindVec {
794 kinds: vec![if_kind],
795 }
796 }
797}
798
799impl<T: Into<IfKind>> IntoIfKindVec for Vec<T> {
800 fn into_vec(self) -> IfKindVec {
801 let kinds: Vec<IfKind> = self.into_iter().map(|x| x.into()).collect();
802 IfKindVec { kinds }
803 }
804}
805
/// One enable/disable rule for network interfaces.
struct IfSelection {
    /// Which interfaces this rule applies to.
    if_kind: IfKind,

    /// `true` if matching interfaces are enabled, `false` if disabled.
    selected: bool,
}
814
/// State owned by the daemon thread.
struct Zeroconf {
    /// Interfaces in use, keyed by interface index.
    my_intfs: HashMap<u32, MyIntf>,

    /// The shared IPv4 mDNS socket.
    ipv4_sock: MyUdpSocket,

    /// The shared IPv6 mDNS socket.
    ipv6_sock: MyUdpSocket,

    /// Services held by this daemon.
    /// NOTE(review): presumably keyed by service fullname — confirm.
    my_services: HashMap<String, ServiceInfo>,

    /// Cache of DNS records.
    cache: DnsCache,

    /// One DNS registry per interface, keyed by interface index.
    dns_registry_map: HashMap<u32, DnsRegistry>,

    /// Event senders for active browse requests.
    /// NOTE(review): presumably keyed by service type — confirm.
    service_queriers: HashMap<String, Sender<ServiceEvent>>,

    /// Active hostname resolutions keyed by hostname; the value holds the event
    /// sender and an optional timestamp in milliseconds at which it times out.
    hostname_resolvers: HashMap<String, (Sender<HostnameResolutionEvent>, Option<u64>)>,

    /// Commands scheduled to be executed again.
    retransmissions: Vec<ReRun>,

    /// Metrics counters.
    counters: Metrics,

    /// Poller waiting on the signal socket and the two mDNS sockets.
    poller: Poll,

    /// Senders for clients monitoring daemon events.
    monitors: Vec<Sender<DaemonEvent>>,

    /// Current maximum length of a service name.
    service_name_len_max: u8,

    /// Interval between IP change checks, in milliseconds.
    ip_check_interval: u64,

    /// Interface enable/disable rules, applied in order so later rules override
    /// earlier ones.
    if_selections: Vec<IfSelection>,

    /// Socket on which clients signal that a command has been queued.
    signal_sock: MioUdpSocket,

    /// Pending timer timestamps in milliseconds; `Reverse` makes the earliest
    /// one the top of the heap.
    timers: BinaryHeap<Reverse<u64>>,

    /// Current status of the daemon.
    status: DaemonStatus,

    /// NOTE(review): presumably names of instances with a resolve in progress —
    /// usage is outside this part of the file.
    pending_resolves: HashSet<String>,

    /// NOTE(review): presumably names of instances already resolved — usage is
    /// outside this part of the file.
    resolved: HashSet<String>,

    /// Last value requested for IPv4 multicast loopback.
    multicast_loop_v4: bool,

    /// Last value requested for IPv6 multicast loopback.
    multicast_loop_v6: bool,

    /// Value of the `AcceptUnsolicited` option.
    accept_unsolicited: bool,

    /// Test only: names of interfaces hidden from the IP change check.
    #[cfg(test)]
    test_down_interfaces: HashSet<String>,
}
891
892fn join_multicast_group(my_sock: &PktInfoUdpSocket, intf: &Interface) -> Result<()> {
894 let intf_ip = &intf.ip();
895 match intf_ip {
896 IpAddr::V4(ip) => {
897 debug!("join multicast group V4 on addr {}", ip);
899 my_sock
900 .join_multicast_v4(&GROUP_ADDR_V4, ip)
901 .map_err(|e| e_fmt!("PKT join multicast group on addr {}: {}", intf_ip, e))?;
902 }
903 IpAddr::V6(ip) => {
904 let if_index = intf.index.unwrap_or(0);
905 debug!(
907 "join multicast group V6 on addr {} with index {}",
908 ip, if_index
909 );
910 my_sock
911 .join_multicast_v6(&GROUP_ADDR_V6, if_index)
912 .map_err(|e| e_fmt!("PKT join multicast group on addr {}: {}", ip, e))?;
913 }
914 }
915 Ok(())
916}
917
918impl Zeroconf {
    /// Builds the daemon state.
    ///
    /// Creates one IPv4 and one IPv6 socket bound to the mDNS port, joins the
    /// multicast group on every interface returned by `my_ip_interfaces(false)`,
    /// and initializes all options to their defaults.
    ///
    /// # Panics
    ///
    /// Panics if either socket cannot be created or configured.
    fn new(signal_sock: MioUdpSocket, poller: Poll) -> Self {
        let my_ifaddrs = my_ip_interfaces(false);

        let mut my_intfs = HashMap::new();
        let mut dns_registry_map = HashMap::new();

        // A single IPv4 socket, bound to the unspecified address, serves all
        // IPv4 interfaces.
        let addr = SocketAddrV4::new(Ipv4Addr::new(0, 0, 0, 0), MDNS_PORT);
        let sock = new_socket(addr.into(), true).unwrap();

        sock.set_multicast_ttl_v4(255)
            .map_err(|e| e_fmt!("set set_multicast_ttl_v4 on addr: {}", e))
            .unwrap();

        let ipv4_sock = MyUdpSocket::new(sock).unwrap();

        // Likewise a single IPv6 socket serves all IPv6 interfaces.
        let addr = SocketAddrV6::new(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 0), MDNS_PORT, 0, 0);
        let sock = new_socket(addr.into(), true).unwrap();

        sock.set_multicast_hops_v6(255)
            .map_err(|e| e_fmt!("set set_multicast_hops_v6: {}", e))
            .unwrap();

        let ipv6_sock = MyUdpSocket::new(sock).unwrap();

        for intf in my_ifaddrs {
            let sock = if intf.ip().is_ipv4() {
                &ipv4_sock
            } else {
                &ipv6_sock
            };

            // A failed join is only logged; the interface is still recorded below.
            if let Err(e) = join_multicast_group(&sock.pktinfo, &intf) {
                debug!(
                    "config socket to join multicast: {}: {e}. Skipped.",
                    &intf.ip()
                );
            }

            let if_index = intf.index.unwrap_or(0);

            // One registry per interface index.
            dns_registry_map
                .entry(if_index)
                .or_insert_with(DnsRegistry::new);

            // Addresses sharing an interface index are grouped into one `MyIntf`.
            my_intfs
                .entry(if_index)
                .and_modify(|v: &mut MyIntf| {
                    v.addrs.insert(intf.addr.clone());
                })
                .or_insert(MyIntf {
                    name: intf.name.clone(),
                    index: if_index,
                    addrs: HashSet::from([intf.addr]),
                });
        }

        let monitors = Vec::new();
        let service_name_len_max = SERVICE_NAME_LEN_MAX_DEFAULT;
        // Stored in milliseconds.
        let ip_check_interval = IP_CHECK_INTERVAL_IN_SECS_DEFAULT as u64 * 1000;

        let timers = BinaryHeap::new();

        // Loopback interfaces are deselected by default.
        let if_selections = vec![
            IfSelection {
                if_kind: IfKind::LoopbackV4,
                selected: false,
            },
            IfSelection {
                if_kind: IfKind::LoopbackV6,
                selected: false,
            },
        ];

        let status = DaemonStatus::Running;

        Self {
            my_intfs,
            ipv4_sock,
            ipv6_sock,
            my_services: HashMap::new(),
            cache: DnsCache::new(),
            dns_registry_map,
            hostname_resolvers: HashMap::new(),
            service_queriers: HashMap::new(),
            retransmissions: Vec::new(),
            counters: HashMap::new(),
            poller,
            monitors,
            service_name_len_max,
            ip_check_interval,
            if_selections,
            signal_sock,
            timers,
            status,
            pending_resolves: HashSet::new(),
            resolved: HashSet::new(),
            multicast_loop_v4: true,
            multicast_loop_v6: true,
            accept_unsolicited: false,

            #[cfg(test)]
            test_down_interfaces: HashSet::new(),
        }
    }
1042
1043 fn run(&mut self, receiver: Receiver<Command>) -> Option<Command> {
1052 if let Err(e) = self.poller.registry().register(
1054 &mut self.signal_sock,
1055 mio::Token(SIGNAL_SOCK_EVENT_KEY),
1056 mio::Interest::READABLE,
1057 ) {
1058 debug!("failed to add signal socket to the poller: {}", e);
1059 return None;
1060 }
1061
1062 if let Err(e) = self.poller.registry().register(
1063 &mut self.ipv4_sock,
1064 mio::Token(IPV4_SOCK_EVENT_KEY),
1065 mio::Interest::READABLE,
1066 ) {
1067 debug!("failed to register ipv4 socket: {}", e);
1068 return None;
1069 }
1070
1071 if let Err(e) = self.poller.registry().register(
1072 &mut self.ipv6_sock,
1073 mio::Token(IPV6_SOCK_EVENT_KEY),
1074 mio::Interest::READABLE,
1075 ) {
1076 debug!("failed to register ipv6 socket: {}", e);
1077 return None;
1078 }
1079
1080 let mut next_ip_check = if self.ip_check_interval > 0 {
1082 current_time_millis() + self.ip_check_interval
1083 } else {
1084 0
1085 };
1086
1087 if next_ip_check > 0 {
1088 self.add_timer(next_ip_check);
1089 }
1090
1091 let mut events = mio::Events::with_capacity(1024);
1094 loop {
1095 let now = current_time_millis();
1096
1097 let earliest_timer = self.peek_earliest_timer();
1098 let timeout = earliest_timer.map(|timer| {
1099 let millis = if timer > now { timer - now } else { 1 };
1101 Duration::from_millis(millis)
1102 });
1103
1104 events.clear();
1106 match self.poller.poll(&mut events, timeout) {
1107 Ok(_) => self.handle_poller_events(&events),
1108 Err(e) => debug!("failed to select from sockets: {}", e),
1109 }
1110
1111 let now = current_time_millis();
1112
1113 self.pop_timers_till(now);
1115
1116 for hostname in self
1118 .hostname_resolvers
1119 .clone()
1120 .into_iter()
1121 .filter(|(_, (_, timeout))| timeout.map(|t| now >= t).unwrap_or(false))
1122 .map(|(hostname, _)| hostname)
1123 {
1124 trace!("hostname resolver timeout for {}", &hostname);
1125 call_hostname_resolution_listener(
1126 &self.hostname_resolvers,
1127 &hostname,
1128 HostnameResolutionEvent::SearchTimeout(hostname.to_owned()),
1129 );
1130 call_hostname_resolution_listener(
1131 &self.hostname_resolvers,
1132 &hostname,
1133 HostnameResolutionEvent::SearchStopped(hostname.to_owned()),
1134 );
1135 self.hostname_resolvers.remove(&hostname);
1136 }
1137
1138 while let Ok(command) = receiver.try_recv() {
1140 if matches!(command, Command::Exit(_)) {
1141 self.status = DaemonStatus::Shutdown;
1142 return Some(command);
1143 }
1144 self.exec_command(command, false);
1145 }
1146
1147 let mut i = 0;
1149 while i < self.retransmissions.len() {
1150 if now >= self.retransmissions[i].next_time {
1151 let rerun = self.retransmissions.remove(i);
1152 self.exec_command(rerun.command, true);
1153 } else {
1154 i += 1;
1155 }
1156 }
1157
1158 self.refresh_active_services();
1160
1161 let mut query_count = 0;
1163 for (hostname, _sender) in self.hostname_resolvers.iter() {
1164 for (hostname, ip_addr) in
1165 self.cache.refresh_due_hostname_resolutions(hostname).iter()
1166 {
1167 self.send_query(hostname, ip_address_rr_type(&ip_addr.to_ip_addr()));
1168 query_count += 1;
1169 }
1170 }
1171
1172 self.increase_counter(Counter::CacheRefreshAddr, query_count);
1173
1174 let now = current_time_millis();
1176
1177 let expired_services = self.cache.evict_expired_services(now);
1179 if !expired_services.is_empty() {
1180 debug!(
1181 "run: send {} service removal to listeners",
1182 expired_services.len()
1183 );
1184 self.notify_service_removal(expired_services);
1185 }
1186
1187 let expired_addrs = self.cache.evict_expired_addr(now);
1189 for (hostname, addrs) in expired_addrs {
1190 call_hostname_resolution_listener(
1191 &self.hostname_resolvers,
1192 &hostname,
1193 HostnameResolutionEvent::AddressesRemoved(hostname.clone(), addrs),
1194 );
1195 let instances = self.cache.get_instances_on_host(&hostname);
1196 let instance_set: HashSet<String> = instances.into_iter().collect();
1197 self.resolve_updated_instances(&instance_set);
1198 }
1199
1200 self.probing_handler();
1202
1203 if now >= next_ip_check && next_ip_check > 0 {
1205 next_ip_check = now + self.ip_check_interval;
1206 self.add_timer(next_ip_check);
1207
1208 self.check_ip_changes();
1209 }
1210 }
1211 }
1212
1213 fn process_set_option(&mut self, daemon_opt: DaemonOption) {
1214 match daemon_opt {
1215 DaemonOption::ServiceNameLenMax(length) => self.service_name_len_max = length,
1216 DaemonOption::IpCheckInterval(interval) => self.ip_check_interval = interval,
1217 DaemonOption::EnableInterface(if_kind) => self.enable_interface(if_kind),
1218 DaemonOption::DisableInterface(if_kind) => self.disable_interface(if_kind),
1219 DaemonOption::MulticastLoopV4(on) => self.set_multicast_loop_v4(on),
1220 DaemonOption::MulticastLoopV6(on) => self.set_multicast_loop_v6(on),
1221 DaemonOption::AcceptUnsolicited(accept) => self.set_accept_unsolicited(accept),
1222 #[cfg(test)]
1223 DaemonOption::TestDownInterface(ifname) => {
1224 self.test_down_interfaces.insert(ifname);
1225 }
1226 #[cfg(test)]
1227 DaemonOption::TestUpInterface(ifname) => {
1228 self.test_down_interfaces.remove(&ifname);
1229 }
1230 }
1231 }
1232
1233 fn enable_interface(&mut self, kinds: Vec<IfKind>) {
1234 debug!("enable_interface: {:?}", kinds);
1235 for if_kind in kinds {
1236 self.if_selections.push(IfSelection {
1237 if_kind,
1238 selected: true,
1239 });
1240 }
1241
1242 self.apply_intf_selections(my_ip_interfaces(true));
1243 }
1244
1245 fn disable_interface(&mut self, kinds: Vec<IfKind>) {
1246 debug!("disable_interface: {:?}", kinds);
1247 for if_kind in kinds {
1248 self.if_selections.push(IfSelection {
1249 if_kind,
1250 selected: false,
1251 });
1252 }
1253
1254 self.apply_intf_selections(my_ip_interfaces(true));
1255 }
1256
1257 fn set_multicast_loop_v4(&mut self, on: bool) {
1258 self.multicast_loop_v4 = on;
1259 self.ipv4_sock
1260 .pktinfo
1261 .set_multicast_loop_v4(on)
1262 .map_err(|e| e_fmt!("failed to set multicast loop v4: {}", e))
1263 .unwrap();
1264 }
1265
1266 fn set_multicast_loop_v6(&mut self, on: bool) {
1267 self.multicast_loop_v6 = on;
1268 self.ipv6_sock
1269 .pktinfo
1270 .set_multicast_loop_v6(on)
1271 .map_err(|e| e_fmt!("failed to set multicast loop v6: {}", e))
1272 .unwrap();
1273 }
1274
    /// Stores the value of the `AcceptUnsolicited` option.
    fn set_accept_unsolicited(&mut self, accept: bool) {
        self.accept_unsolicited = accept;
    }
1278
1279 fn notify_monitors(&mut self, event: DaemonEvent) {
1280 self.monitors.retain(|sender| {
1282 if let Err(e) = sender.try_send(event.clone()) {
1283 debug!("notify_monitors: try_send: {}", &e);
1284 if matches!(e, TrySendError::Disconnected(_)) {
1285 return false; }
1287 }
1288 true
1289 });
1290 }
1291
1292 fn del_addr_in_my_services(&mut self, addr: &IpAddr) {
1294 for (_, service_info) in self.my_services.iter_mut() {
1295 if service_info.is_addr_auto() {
1296 service_info.remove_ipaddr(addr);
1297 }
1298 }
1299 }
1300
1301 fn add_timer(&mut self, next_time: u64) {
1302 self.timers.push(Reverse(next_time));
1303 }
1304
1305 fn peek_earliest_timer(&self) -> Option<u64> {
1306 self.timers.peek().map(|Reverse(v)| *v)
1307 }
1308
1309 fn _pop_earliest_timer(&mut self) -> Option<u64> {
1310 self.timers.pop().map(|Reverse(v)| v)
1311 }
1312
1313 fn pop_timers_till(&mut self, now: u64) {
1315 while let Some(Reverse(v)) = self.timers.peek() {
1316 if *v > now {
1317 break;
1318 }
1319 self.timers.pop();
1320 }
1321 }
1322
1323 fn selected_addrs(&self, interfaces: Vec<Interface>) -> HashSet<IpAddr> {
1325 let intf_count = interfaces.len();
1326 let mut intf_selections = vec![true; intf_count];
1327
1328 for selection in self.if_selections.iter() {
1330 for i in 0..intf_count {
1332 if selection.if_kind.matches(&interfaces[i]) {
1333 intf_selections[i] = selection.selected;
1334 }
1335 }
1336 }
1337
1338 let mut selected_addrs = HashSet::new();
1339 for i in 0..intf_count {
1340 if intf_selections[i] {
1341 selected_addrs.insert(interfaces[i].addr.ip());
1342 }
1343 }
1344
1345 selected_addrs
1346 }
1347
1348 fn apply_intf_selections(&mut self, interfaces: Vec<Interface>) {
1353 let intf_count = interfaces.len();
1355 let mut intf_selections = vec![true; intf_count];
1356
1357 for selection in self.if_selections.iter() {
1359 for i in 0..intf_count {
1361 if selection.if_kind.matches(&interfaces[i]) {
1362 intf_selections[i] = selection.selected;
1363 }
1364 }
1365 }
1366
1367 for (idx, intf) in interfaces.into_iter().enumerate() {
1369 if intf_selections[idx] {
1370 self.add_interface(intf);
1372 } else {
1373 self.del_interface(&intf);
1375 }
1376 }
1377 }
1378
    /// Handles the removal of the address `ip`: removes it from this daemon's
    /// services, then notifies the monitors with `DaemonEvent::IpDel`.
    fn del_ip(&mut self, ip: IpAddr) {
        self.del_addr_in_my_services(&ip);
        self.notify_monitors(DaemonEvent::IpDel(ip));
    }
1383
    /// Compares the current system interfaces with `self.my_intfs` and handles
    /// the differences.
    ///
    /// Addresses that disappeared are removed and reported through `del_ip`.
    /// Interfaces left without addresses, or that no longer exist, are dropped:
    /// their multicast memberships are left and their cache records removed.
    /// Finally the interface rules are re-applied to the current interfaces.
    fn check_ip_changes(&mut self) {
        let my_ifaddrs = my_ip_interfaces(true);

        // In tests, interfaces marked as down are treated as absent.
        #[cfg(test)]
        let my_ifaddrs: Vec<_> = my_ifaddrs
            .into_iter()
            .filter(|intf| !self.test_down_interfaces.contains(&intf.name))
            .collect();

        // Current addresses grouped by interface index.
        let ifaddrs_map: HashMap<u32, Vec<&IfAddr>> =
            my_ifaddrs.iter().fold(HashMap::new(), |mut acc, intf| {
                let if_index = intf.index.unwrap_or(0);
                acc.entry(if_index).or_default().push(&intf.addr);
                acc
            });

        let mut deleted_intfs = Vec::new();
        let mut deleted_ips = Vec::new();

        for (if_index, my_intf) in self.my_intfs.iter_mut() {
            // The last removed address of each family is remembered so that
            // its multicast membership can be left below.
            let mut last_ipv4 = None;
            let mut last_ipv6 = None;

            if let Some(current_addrs) = ifaddrs_map.get(if_index) {
                // The interface still exists: keep only the addresses it still has.
                my_intf.addrs.retain(|addr| {
                    if current_addrs.contains(&addr) {
                        true
                    } else {
                        match addr.ip() {
                            IpAddr::V4(ipv4) => last_ipv4 = Some(ipv4),
                            IpAddr::V6(ipv6) => last_ipv6 = Some(ipv6),
                        }
                        deleted_ips.push(addr.ip());
                        false
                    }
                });
                if my_intf.addrs.is_empty() {
                    deleted_intfs.push((*if_index, last_ipv4, last_ipv6))
                }
            } else {
                // The interface is gone: all of its addresses are deleted.
                debug!(
                    "check_ip_changes: interface {} ({}) no longer exists, removing",
                    my_intf.name, if_index
                );
                for addr in my_intf.addrs.iter() {
                    match addr.ip() {
                        IpAddr::V4(ipv4) => last_ipv4 = Some(ipv4),
                        IpAddr::V6(ipv6) => last_ipv6 = Some(ipv6),
                    }
                    deleted_ips.push(addr.ip())
                }
                deleted_intfs.push((*if_index, last_ipv4, last_ipv6));
            }
        }

        if !deleted_ips.is_empty() || !deleted_intfs.is_empty() {
            debug!(
                "check_ip_changes: {} deleted ips {} deleted intfs",
                deleted_ips.len(),
                deleted_intfs.len()
            );
        }

        for ip in deleted_ips {
            self.del_ip(ip);
        }

        for (if_index, last_ipv4, last_ipv6) in deleted_intfs {
            let Some(my_intf) = self.my_intfs.remove(&if_index) else {
                continue;
            };

            // Leaving the groups is best effort: failures are only logged.
            if let Some(ipv4) = last_ipv4 {
                debug!("leave multicast for {ipv4}");
                if let Err(e) = self
                    .ipv4_sock
                    .pktinfo
                    .leave_multicast_v4(&GROUP_ADDR_V4, &ipv4)
                {
                    debug!("leave multicast group for addr {ipv4}: {e}");
                }
            }

            if let Some(ipv6) = last_ipv6 {
                debug!("leave multicast for {ipv6}");
                if let Err(e) = self
                    .ipv6_sock
                    .pktinfo
                    .leave_multicast_v6(&GROUP_ADDR_V6, my_intf.index)
                {
                    debug!("leave multicast group for IPv6: {ipv6}: {e}");
                }
            }

            // Remove the cache records of the interface and tell the listeners
            // which instances went away with them.
            let intf_id = InterfaceId {
                name: my_intf.name.to_string(),
                index: my_intf.index,
            };
            let removed_instances = self.cache.remove_records_on_intf(intf_id);
            self.notify_service_removal(removed_instances);
        }

        // Add new interfaces and addresses according to the interface rules.
        self.apply_intf_selections(my_ifaddrs);
    }
1493
    /// Removes the address of `intf` from the interface it belongs to.
    ///
    /// If the address was present: the multicast group of its family is left
    /// when `next_ifaddr_v4()` / `next_ifaddr_v6()` reports no address, the
    /// interface itself is dropped once it has no addresses at all, the
    /// monitors are notified, and the address is removed from this daemon's
    /// services. Does nothing if the interface or the address is not known.
    fn del_interface(&mut self, intf: &Interface) {
        let if_index = intf.index.unwrap_or(0);
        trace!(
            "del_interface: {} ({if_index}) addr {}",
            intf.name,
            intf.ip()
        );

        let Some(my_intf) = self.my_intfs.get_mut(&if_index) else {
            debug!("del_interface: interface {} not found", intf.name);
            return;
        };

        let mut ip_removed = false;

        if my_intf.addrs.remove(&intf.addr) {
            ip_removed = true;

            match intf.addr.ip() {
                IpAddr::V4(ipv4) => {
                    // Leave the group only when no IPv4 address is reported
                    // for the interface any more.
                    if my_intf.next_ifaddr_v4().is_none() {
                        if let Err(e) = self
                            .ipv4_sock
                            .pktinfo
                            .leave_multicast_v4(&GROUP_ADDR_V4, &ipv4)
                        {
                            debug!("leave multicast group for addr {ipv4}: {e}");
                        }
                    }
                }

                IpAddr::V6(ipv6) => {
                    // Same for IPv6, where the membership is per interface index.
                    if my_intf.next_ifaddr_v6().is_none() {
                        if let Err(e) = self
                            .ipv6_sock
                            .pktinfo
                            .leave_multicast_v6(&GROUP_ADDR_V6, if_index)
                        {
                            debug!("leave multicast group for addr {ipv6}: {e}");
                        }
                    }
                }
            }

            // With no address left, drop the interface and its per-interface state.
            if my_intf.addrs.is_empty() {
                debug!("del_interface: removing interface {}", intf.name);
                self.my_intfs.remove(&if_index);
                self.dns_registry_map.remove(&if_index);
                self.cache.remove_addrs_on_disabled_intf(if_index);
            }
        }

        // Done after the borrow of `my_intf` has ended.
        if ip_removed {
            self.notify_monitors(DaemonEvent::IpDel(intf.ip()));
            self.del_addr_in_my_services(&intf.ip());
        }
    }
1554
1555 fn add_interface(&mut self, intf: Interface) {
1556 let sock = if intf.ip().is_ipv4() {
1557 &self.ipv4_sock
1558 } else {
1559 &self.ipv6_sock
1560 };
1561
1562 let if_index = intf.index.unwrap_or(0);
1563 let mut new_addr = false;
1564
1565 match self.my_intfs.entry(if_index) {
1566 Entry::Occupied(mut entry) => {
1567 let my_intf = entry.get_mut();
1569 if !my_intf.addrs.contains(&intf.addr) {
1570 if let Err(e) = join_multicast_group(&sock.pktinfo, &intf) {
1571 debug!("add_interface: socket_config {}: {e}", &intf.name);
1572 }
1573 my_intf.addrs.insert(intf.addr.clone());
1574 new_addr = true;
1575 }
1576 }
1577 Entry::Vacant(entry) => {
1578 if let Err(e) = join_multicast_group(&sock.pktinfo, &intf) {
1579 debug!("add_interface: socket_config {}: {e}. Skipped.", &intf.name);
1580 return;
1581 }
1582
1583 new_addr = true;
1584 let new_intf = MyIntf {
1585 name: intf.name.clone(),
1586 index: if_index,
1587 addrs: HashSet::from([intf.addr.clone()]),
1588 };
1589 entry.insert(new_intf);
1590 }
1591 }
1592
1593 if !new_addr {
1594 trace!("add_interface: interface {} already exists", &intf.name);
1595 return;
1596 }
1597
1598 debug!("add new interface {}: {}", intf.name, intf.ip());
1599
1600 let Some(my_intf) = self.my_intfs.get(&if_index) else {
1601 debug!("add_interface: cannot find if_index {if_index}");
1602 return;
1603 };
1604
1605 let dns_registry = match self.dns_registry_map.get_mut(&if_index) {
1606 Some(registry) => registry,
1607 None => self
1608 .dns_registry_map
1609 .entry(if_index)
1610 .or_insert_with(DnsRegistry::new),
1611 };
1612
1613 for (_, service_info) in self.my_services.iter_mut() {
1614 if service_info.is_addr_auto() {
1615 let new_ip = intf.ip();
1616 service_info.insert_ipaddr(new_ip);
1617
1618 if announce_service_on_intf(dns_registry, service_info, my_intf, &sock.pktinfo) {
1619 debug!(
1620 "Announce service {} on {}",
1621 service_info.get_fullname(),
1622 intf.ip()
1623 );
1624 service_info.set_status(if_index, ServiceStatus::Announced);
1625 } else {
1626 for timer in dns_registry.new_timers.drain(..) {
1627 self.timers.push(Reverse(timer));
1628 }
1629 service_info.set_status(if_index, ServiceStatus::Probing);
1630 }
1631 }
1632 }
1633
1634 let mut browse_reruns = Vec::new();
1636 let mut i = 0;
1637 while i < self.retransmissions.len() {
1638 if matches!(self.retransmissions[i].command, Command::Browse(..)) {
1639 browse_reruns.push(self.retransmissions.remove(i));
1640 } else {
1641 i += 1;
1642 }
1643 }
1644
1645 for rerun in browse_reruns {
1646 self.exec_command(rerun.command, true);
1647 }
1648
1649 self.notify_monitors(DaemonEvent::IpAdd(intf.ip()));
1651 }
1652
1653 fn register_service(&mut self, mut info: ServiceInfo) {
1662 if let Err(e) = check_service_name_length(info.get_type(), self.service_name_len_max) {
1664 debug!("check_service_name_length: {}", &e);
1665 self.notify_monitors(DaemonEvent::Error(e));
1666 return;
1667 }
1668
1669 if info.is_addr_auto() {
1670 let selected_addrs = self.selected_addrs(my_ip_interfaces(true));
1671 for addr in selected_addrs {
1672 info.insert_ipaddr(addr);
1673 }
1674 }
1675
1676 debug!("register service {:?}", &info);
1677
1678 let outgoing_addrs = self.send_unsolicited_response(&mut info);
1679 if !outgoing_addrs.is_empty() {
1680 self.notify_monitors(DaemonEvent::Announce(
1681 info.get_fullname().to_string(),
1682 format!("{:?}", &outgoing_addrs),
1683 ));
1684 }
1685
1686 let service_fullname = info.get_fullname().to_lowercase();
1689 self.my_services.insert(service_fullname, info);
1690 }
1691
1692 fn send_unsolicited_response(&mut self, info: &mut ServiceInfo) -> Vec<IpAddr> {
1695 let mut outgoing_addrs = Vec::new();
1696 let mut outgoing_intfs = HashSet::new();
1697
1698 for (if_index, intf) in self.my_intfs.iter() {
1699 let dns_registry = match self.dns_registry_map.get_mut(if_index) {
1700 Some(registry) => registry,
1701 None => self
1702 .dns_registry_map
1703 .entry(*if_index)
1704 .or_insert_with(DnsRegistry::new),
1705 };
1706
1707 let mut announced = false;
1708
1709 if announce_service_on_intf(dns_registry, info, intf, &self.ipv4_sock.pktinfo) {
1711 for addr in intf.addrs.iter().filter(|a| a.ip().is_ipv4()) {
1712 outgoing_addrs.push(addr.ip());
1713 }
1714 outgoing_intfs.insert(intf.index);
1715
1716 debug!(
1717 "Announce service IPv4 {} on {}",
1718 info.get_fullname(),
1719 intf.name
1720 );
1721 announced = true;
1722 }
1723
1724 if announce_service_on_intf(dns_registry, info, intf, &self.ipv6_sock.pktinfo) {
1725 for addr in intf.addrs.iter().filter(|a| a.ip().is_ipv6()) {
1726 outgoing_addrs.push(addr.ip());
1727 }
1728 outgoing_intfs.insert(intf.index);
1729
1730 debug!(
1731 "Announce service IPv6 {} on {}",
1732 info.get_fullname(),
1733 intf.name
1734 );
1735 announced = true;
1736 }
1737
1738 if announced {
1739 info.set_status(intf.index, ServiceStatus::Announced);
1740 } else {
1741 for timer in dns_registry.new_timers.drain(..) {
1742 self.timers.push(Reverse(timer));
1743 }
1744 info.set_status(*if_index, ServiceStatus::Probing);
1745 }
1746 }
1747
1748 let next_time = current_time_millis() + 1000;
1752 for if_index in outgoing_intfs {
1753 self.add_retransmission(
1754 next_time,
1755 Command::RegisterResend(info.get_fullname().to_string(), if_index),
1756 );
1757 }
1758
1759 outgoing_addrs
1760 }
1761
/// Runs one probing round on every tracked interface that has a DNS registry.
///
/// For each such interface: the questions produced by `check_probing` are sent
/// over both sockets, then every service name returned by
/// `handle_expired_probes` is looked up in `my_services` and, unless already
/// `Announced` on that interface, announced over both sockets. A successful
/// announce schedules a `RegisterResend` one second later, notifies monitors
/// with `DaemonEvent::Announce` and marks the service `Announced`.
fn probing_handler(&mut self) {
    let now = current_time_millis();

    for (if_index, intf) in self.my_intfs.iter() {
        // Interfaces without a registry have nothing to probe.
        let Some(dns_registry) = self.dns_registry_map.get_mut(if_index) else {
            continue;
        };

        let (out, expired_probes) = check_probing(dns_registry, &mut self.timers, now);

        // Send the probe questions, if any, on both IP families.
        if !out.questions().is_empty() {
            trace!("sending out probing of questions: {:?}", out.questions());
            send_dns_outgoing(&out, intf, &self.ipv4_sock.pktinfo);
            send_dns_outgoing(&out, intf, &self.ipv6_sock.pktinfo);
        }

        // Names of services reported by `handle_expired_probes` for this interface.
        let waiting_services =
            handle_expired_probes(expired_probes, &intf.name, dns_registry, &mut self.monitors);

        for service_name in waiting_services {
            // `my_services` is keyed by lowercased full name.
            if let Some(info) = self.my_services.get_mut(&service_name.to_lowercase()) {
                if info.get_status(*if_index) == ServiceStatus::Announced {
                    debug!("service {} already announced", info.get_fullname());
                    continue;
                }

                // Both announces are always attempted; either one succeeding counts.
                let announced_v4 =
                    announce_service_on_intf(dns_registry, info, intf, &self.ipv4_sock.pktinfo);
                let announced_v6 =
                    announce_service_on_intf(dns_registry, info, intf, &self.ipv6_sock.pktinfo);

                if announced_v4 || announced_v6 {
                    // Schedule the follow-up announcement 1000 ms from `now`.
                    let next_time = now + 1000;
                    let command =
                        Command::RegisterResend(info.get_fullname().to_string(), *if_index);
                    self.retransmissions.push(ReRun { next_time, command });
                    self.timers.push(Reverse(next_time));

                    // Report the renamed service / host name when the registry
                    // recorded a name change for it.
                    let fullname = match dns_registry.name_changes.get(&service_name) {
                        Some(new_name) => new_name.to_string(),
                        None => service_name.to_string(),
                    };

                    let mut hostname = info.get_hostname();
                    if let Some(new_name) = dns_registry.name_changes.get(hostname) {
                        hostname = new_name;
                    }

                    debug!("wake up: announce service {} on {}", fullname, intf.name);
                    notify_monitors(
                        &mut self.monitors,
                        DaemonEvent::Announce(fullname, format!("{}:{}", hostname, &intf.name)),
                    );

                    info.set_status(*if_index, ServiceStatus::Announced);
                }
            }
        }
    }
}
1826
1827 fn unregister_service(
1828 &self,
1829 info: &ServiceInfo,
1830 intf: &MyIntf,
1831 sock: &PktInfoUdpSocket,
1832 ) -> Vec<u8> {
1833 let is_ipv4 = sock.domain() == Domain::IPV4;
1834
1835 let mut out = DnsOutgoing::new(FLAGS_QR_RESPONSE | FLAGS_AA);
1836 out.add_answer_at_time(
1837 DnsPointer::new(
1838 info.get_type(),
1839 RRType::PTR,
1840 CLASS_IN,
1841 0,
1842 info.get_fullname().to_string(),
1843 ),
1844 0,
1845 );
1846
1847 if let Some(sub) = info.get_subtype() {
1848 trace!("Adding subdomain {}", sub);
1849 out.add_answer_at_time(
1850 DnsPointer::new(
1851 sub,
1852 RRType::PTR,
1853 CLASS_IN,
1854 0,
1855 info.get_fullname().to_string(),
1856 ),
1857 0,
1858 );
1859 }
1860
1861 out.add_answer_at_time(
1862 DnsSrv::new(
1863 info.get_fullname(),
1864 CLASS_IN | CLASS_CACHE_FLUSH,
1865 0,
1866 info.get_priority(),
1867 info.get_weight(),
1868 info.get_port(),
1869 info.get_hostname().to_string(),
1870 ),
1871 0,
1872 );
1873 out.add_answer_at_time(
1874 DnsTxt::new(
1875 info.get_fullname(),
1876 CLASS_IN | CLASS_CACHE_FLUSH,
1877 0,
1878 info.generate_txt(),
1879 ),
1880 0,
1881 );
1882
1883 let if_addrs = if is_ipv4 {
1884 info.get_addrs_on_my_intf_v4(intf)
1885 } else {
1886 info.get_addrs_on_my_intf_v6(intf)
1887 };
1888
1889 if if_addrs.is_empty() {
1890 return vec![];
1891 }
1892
1893 for address in if_addrs {
1894 out.add_answer_at_time(
1895 DnsAddress::new(
1896 info.get_hostname(),
1897 ip_address_rr_type(&address),
1898 CLASS_IN | CLASS_CACHE_FLUSH,
1899 0,
1900 address,
1901 intf.into(),
1902 ),
1903 0,
1904 );
1905 }
1906
1907 send_dns_outgoing(&out, intf, sock).remove(0)
1909 }
1910
1911 fn add_hostname_resolver(
1915 &mut self,
1916 hostname: String,
1917 listener: Sender<HostnameResolutionEvent>,
1918 timeout: Option<u64>,
1919 ) {
1920 let real_timeout = timeout.map(|t| current_time_millis() + t);
1921 self.hostname_resolvers
1922 .insert(hostname.to_lowercase(), (listener, real_timeout));
1923 if let Some(t) = real_timeout {
1924 self.add_timer(t);
1925 }
1926 }
1927
1928 fn send_query(&self, name: &str, qtype: RRType) {
1930 self.send_query_vec(&[(name, qtype)]);
1931 }
1932
1933 fn send_query_vec(&self, questions: &[(&str, RRType)]) {
1935 trace!("Sending query questions: {:?}", questions);
1936 let mut out = DnsOutgoing::new(FLAGS_QR_QUERY);
1937 let now = current_time_millis();
1938
1939 for (name, qtype) in questions {
1940 out.add_question(name, *qtype);
1941
1942 for record in self.cache.get_known_answers(name, *qtype, now) {
1943 trace!("add known answer: {:?}", record.record);
1951 let mut new_record = record.record.clone();
1952 new_record.get_record_mut().update_ttl(now);
1953 out.add_answer_box(new_record);
1954 }
1955 }
1956
1957 for (_, intf) in self.my_intfs.iter() {
1958 send_dns_outgoing(&out, intf, &self.ipv4_sock.pktinfo);
1959 send_dns_outgoing(&out, intf, &self.ipv6_sock.pktinfo);
1960 }
1961 }
1962
1963 fn handle_read(&mut self, event_key: usize) -> bool {
1968 let sock = match event_key {
1969 IPV4_SOCK_EVENT_KEY => &mut self.ipv4_sock,
1970 IPV6_SOCK_EVENT_KEY => &mut self.ipv6_sock,
1971 _ => {
1972 debug!("handle_read: unknown token {}", event_key);
1973 return false;
1974 }
1975 };
1976 let mut buf = vec![0u8; MAX_MSG_ABSOLUTE];
1977
1978 let (sz, pktinfo) = match sock.pktinfo.recv(&mut buf) {
1985 Ok(sz) => sz,
1986 Err(e) => {
1987 if e.kind() != std::io::ErrorKind::WouldBlock {
1988 debug!("listening socket read failed: {}", e);
1989 }
1990 return false;
1991 }
1992 };
1993
1994 let pkt_if_index = pktinfo.if_index as u32;
1996 let Some(my_intf) = self.my_intfs.get(&pkt_if_index) else {
1997 debug!(
1998 "handle_read: no interface found for pktinfo if_index: {}",
1999 pktinfo.if_index
2000 );
2001 return true; };
2003
2004 buf.truncate(sz); match DnsIncoming::new(buf, my_intf.into()) {
2007 Ok(msg) => {
2008 if msg.is_query() {
2009 self.handle_query(msg, pkt_if_index, event_key == IPV4_SOCK_EVENT_KEY);
2010 } else if msg.is_response() {
2011 self.handle_response(msg, pkt_if_index);
2012 } else {
2013 debug!("Invalid message: not query and not response");
2014 }
2015 }
2016 Err(e) => debug!("Invalid incoming DNS message: {}", e),
2017 }
2018
2019 true
2020 }
2021
2022 fn query_unresolved(&mut self, instance: &str) -> bool {
2024 if !valid_instance_name(instance) {
2025 trace!("instance name {} not valid", instance);
2026 return false;
2027 }
2028
2029 if let Some(records) = self.cache.get_srv(instance) {
2030 for record in records {
2031 if let Some(srv) = record.record.any().downcast_ref::<DnsSrv>() {
2032 if self.cache.get_addr(srv.host()).is_none() {
2033 self.send_query_vec(&[(srv.host(), RRType::A), (srv.host(), RRType::AAAA)]);
2034 return true;
2035 }
2036 }
2037 }
2038 } else {
2039 self.send_query(instance, RRType::ANY);
2040 return true;
2041 }
2042
2043 false
2044 }
2045
2046 fn query_cache_for_service(
2049 &mut self,
2050 ty_domain: &str,
2051 sender: &Sender<ServiceEvent>,
2052 now: u64,
2053 ) {
2054 let mut resolved: HashSet<String> = HashSet::new();
2055 let mut unresolved: HashSet<String> = HashSet::new();
2056
2057 if let Some(records) = self.cache.get_ptr(ty_domain) {
2058 for record in records.iter().filter(|r| !r.record.expires_soon(now)) {
2059 if let Some(ptr) = record.record.any().downcast_ref::<DnsPointer>() {
2060 let mut new_event = None;
2061 match self.resolve_service_from_cache(ty_domain, ptr.alias()) {
2062 Ok(resolved_service) => {
2063 if resolved_service.is_valid() {
2064 debug!("Resolved service from cache: {}", ptr.alias());
2065 new_event =
2066 Some(ServiceEvent::ServiceResolved(Box::new(resolved_service)));
2067 } else {
2068 debug!("Resolved service is not valid: {}", ptr.alias());
2069 }
2070 }
2071 Err(err) => {
2072 debug!("Error while resolving service from cache: {}", err);
2073 continue;
2074 }
2075 }
2076
2077 match sender.send(ServiceEvent::ServiceFound(
2078 ty_domain.to_string(),
2079 ptr.alias().to_string(),
2080 )) {
2081 Ok(()) => debug!("sent service found {}", ptr.alias()),
2082 Err(e) => {
2083 debug!("failed to send service found: {}", e);
2084 continue;
2085 }
2086 }
2087
2088 if let Some(event) = new_event {
2089 resolved.insert(ptr.alias().to_string());
2090 match sender.send(event) {
2091 Ok(()) => debug!("sent service resolved: {}", ptr.alias()),
2092 Err(e) => debug!("failed to send service resolved: {}", e),
2093 }
2094 } else {
2095 unresolved.insert(ptr.alias().to_string());
2096 }
2097 }
2098 }
2099 }
2100
2101 for instance in resolved.drain() {
2102 self.pending_resolves.remove(&instance);
2103 self.resolved.insert(instance);
2104 }
2105
2106 for instance in unresolved.drain() {
2107 self.add_pending_resolve(instance);
2108 }
2109 }
2110
2111 fn query_cache_for_hostname(
2114 &mut self,
2115 hostname: &str,
2116 sender: Sender<HostnameResolutionEvent>,
2117 ) {
2118 let addresses_map = self.cache.get_addresses_for_host(hostname);
2119 for (name, addresses) in addresses_map {
2120 match sender.send(HostnameResolutionEvent::AddressesFound(name, addresses)) {
2121 Ok(()) => trace!("sent hostname addresses found"),
2122 Err(e) => debug!("failed to send hostname addresses found: {}", e),
2123 }
2124 }
2125 }
2126
2127 fn add_pending_resolve(&mut self, instance: String) {
2128 if !self.pending_resolves.contains(&instance) {
2129 let next_time = current_time_millis() + RESOLVE_WAIT_IN_MILLIS;
2130 self.add_retransmission(next_time, Command::Resolve(instance.clone(), 1));
2131 self.pending_resolves.insert(instance);
2132 }
2133 }
2134
2135 fn resolve_service_from_cache(
2137 &self,
2138 ty_domain: &str,
2139 fullname: &str,
2140 ) -> Result<ResolvedService> {
2141 let now = current_time_millis();
2142 let mut resolved_service = ResolvedService {
2143 ty_domain: ty_domain.to_string(),
2144 sub_ty_domain: None,
2145 fullname: fullname.to_string(),
2146 host: String::new(),
2147 port: 0,
2148 addresses: HashSet::new(),
2149 txt_properties: TxtProperties::new(),
2150 };
2151
2152 if let Some(subtype) = self.cache.get_subtype(fullname) {
2154 trace!(
2155 "ty_domain: {} found subtype {} for instance: {}",
2156 ty_domain,
2157 subtype,
2158 fullname
2159 );
2160 if resolved_service.sub_ty_domain.is_none() {
2161 resolved_service.sub_ty_domain = Some(subtype.to_string());
2162 }
2163 }
2164
2165 if let Some(records) = self.cache.get_srv(fullname) {
2167 if let Some(answer) = records.iter().find(|r| !r.record.expires_soon(now)) {
2168 if let Some(dns_srv) = answer.record.any().downcast_ref::<DnsSrv>() {
2169 resolved_service.host = dns_srv.host().to_string();
2170 resolved_service.port = dns_srv.port();
2171 }
2172 }
2173 }
2174
2175 if let Some(records) = self.cache.get_txt(fullname) {
2177 if let Some(record) = records.iter().find(|r| !r.record.expires_soon(now)) {
2178 if let Some(dns_txt) = record.record.any().downcast_ref::<DnsTxt>() {
2179 resolved_service.txt_properties = dns_txt.text().into();
2180 }
2181 }
2182 }
2183
2184 if let Some(records) = self.cache.get_addr(&resolved_service.host) {
2186 for answer in records.iter() {
2187 if let Some(dns_a) = answer.record.any().downcast_ref::<DnsAddress>() {
2188 if dns_a.expires_soon(now) {
2189 trace!(
2190 "Addr expired or expires soon: {}",
2191 dns_a.address().to_ip_addr()
2192 );
2193 } else {
2194 resolved_service.addresses.insert(dns_a.address());
2195 }
2196 }
2197 }
2198 }
2199
2200 Ok(resolved_service)
2201 }
2202
2203 fn handle_poller_events(&mut self, events: &mio::Events) {
2204 for ev in events.iter() {
2205 trace!("event received with key {:?}", ev.token());
2206 if ev.token().0 == SIGNAL_SOCK_EVENT_KEY {
2207 self.signal_sock_drain();
2209
2210 if let Err(e) = self.poller.registry().reregister(
2211 &mut self.signal_sock,
2212 ev.token(),
2213 mio::Interest::READABLE,
2214 ) {
2215 debug!("failed to modify poller for signal socket: {}", e);
2216 }
2217 continue; }
2219
2220 while self.handle_read(ev.token().0) {}
2222
2223 if ev.token().0 == IPV4_SOCK_EVENT_KEY {
2225 if let Err(e) = self.poller.registry().reregister(
2227 &mut self.ipv4_sock,
2228 ev.token(),
2229 mio::Interest::READABLE,
2230 ) {
2231 debug!("modify poller for IPv4 socket: {}", e);
2232 }
2233 } else if ev.token().0 == IPV6_SOCK_EVENT_KEY {
2234 if let Err(e) = self.poller.registry().reregister(
2236 &mut self.ipv6_sock,
2237 ev.token(),
2238 mio::Interest::READABLE,
2239 ) {
2240 debug!("modify poller for IPv6 socket: {}", e);
2241 }
2242 }
2243 }
2244 }
2245
/// Processes a response message received on interface `if_index`.
///
/// Steps, in order:
/// 1. Expired records are dropped from the message and removed from the
///    cache; listeners get `ServiceRemoved` for removed PTR records.
/// 2. `conflict_handler` is run on the remaining message.
/// 3. The message is classified as "for us" or not, and every record is
///    passed to `cache.add_or_update` with that flag.
/// 4. Timers are added for the expire/refresh times of the records the cache
///    returned, listeners are told about newly found services and host
///    addresses, and the affected instances are handed to
///    `resolve_updated_instances`.
///
/// Returns early, after step 2, when `if_index` is not a tracked interface.
fn handle_response(&mut self, mut msg: DnsIncoming, if_index: u32) {
    let now = current_time_millis();

    // Keeps records that are not expired; expired ones are removed from the
    // cache and, for PTR records, reported to the service listeners.
    let mut record_predicate = |record: &DnsRecordBox| {
        if !record.get_record().is_expired(now) {
            return true;
        }

        debug!("record is expired, removing it from cache.");
        if self.cache.remove(record) {
            if let Some(dns_ptr) = record.any().downcast_ref::<DnsPointer>() {
                call_service_listener(
                    &self.service_queriers,
                    dns_ptr.get_name(),
                    ServiceEvent::ServiceRemoved(
                        dns_ptr.get_name().to_string(),
                        dns_ptr.alias().to_string(),
                    ),
                );
            }
        }
        false
    };
    msg.answers_mut().retain(&mut record_predicate);
    msg.authorities_mut().retain(&mut record_predicate);
    msg.additionals_mut().retain(&mut record_predicate);

    self.conflict_handler(&msg, if_index);

    // Assume the message is for us. A PTR answer for a type nobody browses
    // flips this to false; a PTR for a browsed type or an address record for
    // a hostname being resolved sets it to true and stops the scan.
    let mut is_for_us = true;
    for answer in msg.answers() {
        if answer.get_type() == RRType::PTR {
            if self.service_queriers.contains_key(answer.get_name()) {
                is_for_us = true;
                break;
            } else {
                is_for_us = false;
            }
        } else if answer.get_type() == RRType::A || answer.get_type() == RRType::AAAA {
            // `hostname_resolvers` is keyed by lowercased hostname.
            let answer_lowercase = answer.get_name().to_lowercase();
            if self.hostname_resolvers.contains_key(&answer_lowercase) {
                is_for_us = true;
                break;
            }
        }
    }

    // When unsolicited responses are accepted, everything counts as for us.
    if self.accept_unsolicited {
        is_for_us = true;
    }

    /// A record type together with the instance or host name it affects.
    struct InstanceChange {
        ty: RRType,
        name: String,
    }

    let mut changes = Vec::new();
    let mut timers = Vec::new();
    let Some(my_intf) = self.my_intfs.get(&if_index) else {
        return;
    };
    for record in msg.all_records() {
        match self
            .cache
            .add_or_update(my_intf, record, &mut timers, is_for_us)
        {
            // Second tuple element `true`: the record is tracked as a change
            // below (presumably a new or updated record — confirm in
            // `add_or_update`).
            Some((dns_record, true)) => {
                timers.push(dns_record.record.get_record().get_expire_time());
                timers.push(dns_record.record.get_record().get_refresh_time());

                let ty = dns_record.record.get_type();
                let name = dns_record.record.get_name();

                // PTR records with a TTL above 1 announce a service instance.
                if ty == RRType::PTR && dns_record.record.get_record().get_ttl() > 1 {
                    if self.service_queriers.contains_key(name) {
                        timers.push(dns_record.record.get_record().get_refresh_time());
                    }

                    if let Some(dns_ptr) = dns_record.record.any().downcast_ref::<DnsPointer>()
                    {
                        debug!("calling listener with service found: {name}");
                        call_service_listener(
                            &self.service_queriers,
                            name,
                            ServiceEvent::ServiceFound(
                                name.to_string(),
                                dns_ptr.alias().to_string(),
                            ),
                        );
                        // For PTR the change is recorded under the instance name.
                        changes.push(InstanceChange {
                            ty,
                            name: dns_ptr.alias().to_string(),
                        });
                    }
                } else {
                    changes.push(InstanceChange {
                        ty,
                        name: name.to_string(),
                    });
                }
            }
            // Second tuple element `false`: only the timers are scheduled.
            Some((dns_record, false)) => {
                timers.push(dns_record.record.get_record().get_expire_time());
                timers.push(dns_record.record.get_record().get_refresh_time());
            }
            _ => {}
        }
    }

    for t in timers {
        self.add_timer(t);
    }

    // Report the current cached addresses for every host whose address
    // records changed.
    for change in changes
        .iter()
        .filter(|change| change.ty == RRType::A || change.ty == RRType::AAAA)
    {
        let addr_map = self.cache.get_addresses_for_host(&change.name);
        for (name, addresses) in addr_map {
            call_hostname_resolution_listener(
                &self.hostname_resolvers,
                &change.name,
                HostnameResolutionEvent::AddressesFound(name, addresses),
            )
        }
    }

    // Collect the instances affected by the changes: directly for
    // PTR/SRV/TXT, via the host name for address records.
    let mut updated_instances = HashSet::new();
    for update in changes {
        match update.ty {
            RRType::PTR | RRType::SRV | RRType::TXT => {
                updated_instances.insert(update.name);
            }
            RRType::A | RRType::AAAA => {
                let instances = self.cache.get_instances_on_host(&update.name);
                updated_instances.extend(instances);
            }
            _ => {}
        }
    }

    self.resolve_updated_instances(&updated_instances);
}
2414
/// Checks the answers of `msg` against the names currently being probed on
/// interface `if_index` and renames our conflicting records.
///
/// For each answer whose name has a probe in the registry, probe records of
/// the same type and class but different rdata are removed from the probe,
/// given a new name and inserted into a probe for that new name. The name
/// change is recorded in `name_changes` and the services waiting on the old
/// probe are carried over to the new one.
///
/// Does nothing when the interface or its DNS registry is unknown.
fn conflict_handler(&mut self, msg: &DnsIncoming, if_index: u32) {
    let Some(my_intf) = self.my_intfs.get(&if_index) else {
        debug!("handle_response: no intf found for index {if_index}");
        return;
    };

    let Some(dns_registry) = self.dns_registry_map.get_mut(&if_index) else {
        return;
    };

    for answer in msg.answers().iter() {
        // Renamed copies of our records that conflict with this answer.
        let mut new_records = Vec::new();

        // Only names we are currently probing can be in conflict.
        let name = answer.get_name();
        let Some(probe) = dns_registry.probing.get_mut(name) else {
            continue;
        };

        if answer.get_type() == RRType::A || answer.get_type() == RRType::AAAA {
            // Skip address answers tagged with a different interface index.
            if let Some(answer_addr) = answer.any().downcast_ref::<DnsAddress>() {
                if answer_addr.interface_id.index != if_index {
                    debug!(
                        "conflict handler: answer addr {:?} not in the subnet of intf {}",
                        answer_addr, my_intf.name
                    );
                    continue;
                }
            }

            // An address answer that equals any of our probe records is not
            // a conflict.
            let any_match = probe.records.iter().any(|r| {
                r.get_type() == answer.get_type()
                    && r.get_class() == answer.get_class()
                    && r.rrdata_match(answer.as_ref())
            });
            if any_match {
                continue;
            }
        }

        // Drop every probe record that has the same type and class as the
        // answer but different rdata, keeping a renamed copy of each.
        probe.records.retain(|record| {
            if record.get_type() == answer.get_type()
                && record.get_class() == answer.get_class()
                && !record.rrdata_match(answer.as_ref())
            {
                debug!(
                    "found conflict name: '{name}' record: {}: {} PEER: {}",
                    record.get_type(),
                    record.rdata_print(),
                    answer.rdata_print()
                );

                let mut new_record = record.clone();
                // Address records use `hostname_change`, all others `name_change`.
                let new_name = match record.get_type() {
                    RRType::A => hostname_change(name),
                    RRType::AAAA => hostname_change(name),
                    _ => name_change(name),
                };
                new_record.get_record_mut().set_new_name(new_name);
                new_records.push(new_record);
                return false;
            }

            true
        });

        // Start time for the new probes: now plus a random 0..250 ms delay.
        let create_time = current_time_millis() + fastrand::u64(0..250);

        let waiting_services = probe.waiting_services.clone();

        for record in new_records {
            if dns_registry.update_hostname(name, record.get_name(), create_time) {
                self.timers.push(Reverse(create_time));
            }

            // Remember the mapping from the original name to the new name.
            dns_registry.name_changes.insert(
                record.get_record().get_original_name().to_string(),
                record.get_name().to_string(),
            );

            // Reuse the probe for the new name, or create one and schedule
            // its first send.
            let new_probe = match dns_registry.probing.get_mut(record.get_name()) {
                Some(p) => p,
                None => {
                    let new_probe = dns_registry
                        .probing
                        .entry(record.get_name().to_string())
                        .or_insert_with(|| {
                            debug!("conflict handler: new probe of {}", record.get_name());
                            Probe::new(create_time)
                        });
                    self.timers.push(Reverse(new_probe.next_send));
                    new_probe
                }
            };

            debug!(
                "insert record with new name '{}' {} into probe",
                record.get_name(),
                record.get_type()
            );
            new_probe.insert_record(record);

            // Services waiting on the old name now wait on the new probe.
            new_probe.waiting_services.extend(waiting_services.clone());
        }
    }
}
2532
2533 fn resolve_updated_instances(&mut self, updated_instances: &HashSet<String>) {
2540 let mut resolved: HashSet<String> = HashSet::new();
2541 let mut unresolved: HashSet<String> = HashSet::new();
2542 let mut removed_instances = HashMap::new();
2543
2544 let now = current_time_millis();
2545
2546 for (ty_domain, records) in self.cache.all_ptr().iter() {
2547 if !self.service_queriers.contains_key(ty_domain) {
2548 continue;
2550 }
2551
2552 for record in records.iter().filter(|r| !r.record.expires_soon(now)) {
2553 if let Some(dns_ptr) = record.record.any().downcast_ref::<DnsPointer>() {
2554 if updated_instances.contains(dns_ptr.alias()) {
2555 let mut instance_found = false;
2556 let mut new_event = None;
2557
2558 if let Ok(resolved) =
2559 self.resolve_service_from_cache(ty_domain, dns_ptr.alias())
2560 {
2561 debug!("resolve_updated_instances: from cache: {}", dns_ptr.alias());
2562 instance_found = true;
2563 if resolved.is_valid() {
2564 new_event = Some(ServiceEvent::ServiceResolved(Box::new(resolved)));
2565 } else {
2566 debug!("Resolved service is not valid: {}", dns_ptr.alias());
2567 }
2568 }
2569
2570 if instance_found {
2571 if let Some(event) = new_event {
2572 debug!("call queriers to resolve {}", dns_ptr.alias());
2573 resolved.insert(dns_ptr.alias().to_string());
2574 call_service_listener(&self.service_queriers, ty_domain, event);
2575 } else {
2576 if self.resolved.remove(dns_ptr.alias()) {
2577 removed_instances
2578 .entry(ty_domain.to_string())
2579 .or_insert_with(HashSet::new)
2580 .insert(dns_ptr.alias().to_string());
2581 }
2582 unresolved.insert(dns_ptr.alias().to_string());
2583 }
2584 }
2585 }
2586 }
2587 }
2588 }
2589
2590 for instance in resolved.drain() {
2591 self.pending_resolves.remove(&instance);
2592 self.resolved.insert(instance);
2593 }
2594
2595 for instance in unresolved.drain() {
2596 self.add_pending_resolve(instance);
2597 }
2598
2599 if !removed_instances.is_empty() {
2600 debug!(
2601 "resolve_updated_instances: removed {}",
2602 &removed_instances.len()
2603 );
2604 self.notify_service_removal(removed_instances);
2605 }
2606 }
2607
2608 fn handle_query(&mut self, msg: DnsIncoming, if_index: u32, is_ipv4: bool) {
2610 let sock = if is_ipv4 {
2611 &self.ipv4_sock
2612 } else {
2613 &self.ipv6_sock
2614 };
2615 let mut out = DnsOutgoing::new(FLAGS_QR_RESPONSE | FLAGS_AA);
2616
2617 const META_QUERY: &str = "_services._dns-sd._udp.local.";
2620
2621 let Some(dns_registry) = self.dns_registry_map.get_mut(&if_index) else {
2622 debug!("missing dns registry for intf {}", if_index);
2623 return;
2624 };
2625
2626 let Some(intf) = self.my_intfs.get(&if_index) else {
2627 return;
2628 };
2629
2630 for question in msg.questions().iter() {
2631 let qtype = question.entry_type();
2632
2633 if qtype == RRType::PTR {
2634 for service in self.my_services.values() {
2635 if service.get_status(if_index) != ServiceStatus::Announced {
2636 continue;
2637 }
2638
2639 if question.entry_name() == service.get_type()
2640 || service
2641 .get_subtype()
2642 .as_ref()
2643 .is_some_and(|v| v == question.entry_name())
2644 {
2645 add_answer_with_additionals(
2646 &mut out,
2647 &msg,
2648 service,
2649 intf,
2650 dns_registry,
2651 is_ipv4,
2652 );
2653 } else if question.entry_name() == META_QUERY {
2654 let ptr_added = out.add_answer(
2655 &msg,
2656 DnsPointer::new(
2657 question.entry_name(),
2658 RRType::PTR,
2659 CLASS_IN,
2660 service.get_other_ttl(),
2661 service.get_type().to_string(),
2662 ),
2663 );
2664 if !ptr_added {
2665 trace!("answer was not added for meta-query {:?}", &question);
2666 }
2667 }
2668 }
2669 } else {
2670 if qtype == RRType::ANY && msg.num_authorities() > 0 {
2672 let probe_name = question.entry_name();
2673
2674 if let Some(probe) = dns_registry.probing.get_mut(probe_name) {
2675 let now = current_time_millis();
2676
2677 if probe.start_time < now {
2681 let incoming_records: Vec<_> = msg
2682 .authorities()
2683 .iter()
2684 .filter(|r| r.get_name() == probe_name)
2685 .collect();
2686
2687 probe.tiebreaking(&incoming_records, now, probe_name);
2688 }
2689 }
2690 }
2691
2692 if qtype == RRType::A || qtype == RRType::AAAA || qtype == RRType::ANY {
2693 for service in self.my_services.values() {
2694 if service.get_status(if_index) != ServiceStatus::Announced {
2695 continue;
2696 }
2697
2698 let service_hostname =
2699 match dns_registry.name_changes.get(service.get_hostname()) {
2700 Some(new_name) => new_name,
2701 None => service.get_hostname(),
2702 };
2703
2704 if service_hostname.to_lowercase() == question.entry_name().to_lowercase() {
2705 let intf_addrs = if is_ipv4 {
2706 service.get_addrs_on_my_intf_v4(intf)
2707 } else {
2708 service.get_addrs_on_my_intf_v6(intf)
2709 };
2710 if intf_addrs.is_empty()
2711 && (qtype == RRType::A || qtype == RRType::AAAA)
2712 {
2713 let t = match qtype {
2714 RRType::A => "TYPE_A",
2715 RRType::AAAA => "TYPE_AAAA",
2716 _ => "invalid_type",
2717 };
2718 trace!(
2719 "Cannot find valid addrs for {} response on intf {:?}",
2720 t,
2721 &intf
2722 );
2723 return;
2724 }
2725 for address in intf_addrs {
2726 out.add_answer(
2727 &msg,
2728 DnsAddress::new(
2729 service_hostname,
2730 ip_address_rr_type(&address),
2731 CLASS_IN | CLASS_CACHE_FLUSH,
2732 service.get_host_ttl(),
2733 address,
2734 intf.into(),
2735 ),
2736 );
2737 }
2738 }
2739 }
2740 }
2741
2742 let query_name = question.entry_name().to_lowercase();
2743 let service_opt = self
2744 .my_services
2745 .iter()
2746 .find(|(k, _v)| {
2747 let service_name = match dns_registry.name_changes.get(k.as_str()) {
2748 Some(new_name) => new_name,
2749 None => k,
2750 };
2751 service_name == &query_name
2752 })
2753 .map(|(_, v)| v);
2754
2755 let Some(service) = service_opt else {
2756 continue;
2757 };
2758
2759 if service.get_status(if_index) != ServiceStatus::Announced {
2760 continue;
2761 }
2762
2763 let intf_addrs = if is_ipv4 {
2764 service.get_addrs_on_my_intf_v4(intf)
2765 } else {
2766 service.get_addrs_on_my_intf_v6(intf)
2767 };
2768 if intf_addrs.is_empty() {
2769 debug!(
2770 "Cannot find valid addrs for TYPE_SRV response on intf {:?}",
2771 &intf
2772 );
2773 continue;
2774 }
2775
2776 add_answer_of_service(
2777 &mut out,
2778 &msg,
2779 question.entry_name(),
2780 service,
2781 qtype,
2782 intf_addrs,
2783 );
2784 }
2785 }
2786
2787 if !out.answers_count() > 0 {
2788 out.set_id(msg.id());
2789 send_dns_outgoing(&out, intf, &sock.pktinfo);
2790
2791 let if_name = intf.name.clone();
2792
2793 self.increase_counter(Counter::Respond, 1);
2794 self.notify_monitors(DaemonEvent::Respond(if_name));
2795 }
2796
2797 self.increase_counter(Counter::KnownAnswerSuppression, out.known_answer_count());
2798 }
2799
2800 fn increase_counter(&mut self, counter: Counter, count: i64) {
2802 let key = counter.to_string();
2803 match self.counters.get_mut(&key) {
2804 Some(v) => *v += count,
2805 None => {
2806 self.counters.insert(key, count);
2807 }
2808 }
2809 }
2810
2811 fn set_counter(&mut self, counter: Counter, count: i64) {
2813 let key = counter.to_string();
2814 self.counters.insert(key, count);
2815 }
2816
2817 fn signal_sock_drain(&self) {
2818 let mut signal_buf = [0; 1024];
2819
2820 while let Ok(sz) = self.signal_sock.recv(&mut signal_buf) {
2822 trace!(
2823 "signal socket recvd: {}",
2824 String::from_utf8_lossy(&signal_buf[0..sz])
2825 );
2826 }
2827 }
2828
2829 fn add_retransmission(&mut self, next_time: u64, command: Command) {
2830 self.retransmissions.push(ReRun { next_time, command });
2831 self.add_timer(next_time);
2832 }
2833
2834 fn notify_service_removal(&self, expired: HashMap<String, HashSet<String>>) {
2837 for (ty_domain, sender) in self.service_queriers.iter() {
2838 if let Some(instances) = expired.get(ty_domain) {
2839 for instance_name in instances {
2840 let event = ServiceEvent::ServiceRemoved(
2841 ty_domain.to_string(),
2842 instance_name.to_string(),
2843 );
2844 match sender.send(event) {
2845 Ok(()) => debug!("notify_service_removal: sent ServiceRemoved to listener of {ty_domain}: {instance_name}"),
2846 Err(e) => debug!("Failed to send event: {}", e),
2847 }
2848 }
2849 }
2850 }
2851 }
2852
2853 fn exec_command(&mut self, command: Command, repeating: bool) {
2857 trace!("exec_command: {:?} repeating: {}", &command, repeating);
2858 match command {
2859 Command::Browse(ty, next_delay, cache_only, listener) => {
2860 self.exec_command_browse(repeating, ty, next_delay, cache_only, listener);
2861 }
2862
2863 Command::ResolveHostname(hostname, next_delay, listener, timeout) => {
2864 self.exec_command_resolve_hostname(
2865 repeating, hostname, next_delay, listener, timeout,
2866 );
2867 }
2868
2869 Command::Register(service_info) => {
2870 self.register_service(service_info);
2871 self.increase_counter(Counter::Register, 1);
2872 }
2873
2874 Command::RegisterResend(fullname, intf) => {
2875 trace!("register-resend service: {fullname} on {}", &intf);
2876 self.exec_command_register_resend(fullname, intf);
2877 }
2878
2879 Command::Unregister(fullname, resp_s) => {
2880 trace!("unregister service {} repeat {}", &fullname, &repeating);
2881 self.exec_command_unregister(repeating, fullname, resp_s);
2882 }
2883
2884 Command::UnregisterResend(packet, if_index, is_ipv4) => {
2885 self.exec_command_unregister_resend(packet, if_index, is_ipv4);
2886 }
2887
2888 Command::StopBrowse(ty_domain) => self.exec_command_stop_browse(ty_domain),
2889
2890 Command::StopResolveHostname(hostname) => {
2891 self.exec_command_stop_resolve_hostname(hostname.to_lowercase())
2892 }
2893
2894 Command::Resolve(instance, try_count) => self.exec_command_resolve(instance, try_count),
2895
2896 Command::GetMetrics(resp_s) => self.exec_command_get_metrics(resp_s),
2897
2898 Command::GetStatus(resp_s) => match resp_s.send(self.status.clone()) {
2899 Ok(()) => trace!("Sent status to the client"),
2900 Err(e) => debug!("Failed to send status: {}", e),
2901 },
2902
2903 Command::Monitor(resp_s) => {
2904 self.monitors.push(resp_s);
2905 }
2906
2907 Command::SetOption(daemon_opt) => {
2908 self.process_set_option(daemon_opt);
2909 }
2910
2911 Command::GetOption(resp_s) => {
2912 let val = DaemonOptionVal {
2913 _service_name_len_max: self.service_name_len_max,
2914 ip_check_interval: self.ip_check_interval,
2915 };
2916 if let Err(e) = resp_s.send(val) {
2917 debug!("Failed to send options: {}", e);
2918 }
2919 }
2920
2921 Command::Verify(instance_fullname, timeout) => {
2922 self.exec_command_verify(instance_fullname, timeout, repeating);
2923 }
2924
2925 _ => {
2926 debug!("unexpected command: {:?}", &command);
2927 }
2928 }
2929 }
2930
2931 fn exec_command_get_metrics(&mut self, resp_s: Sender<HashMap<String, i64>>) {
2932 self.set_counter(Counter::CachedPTR, self.cache.ptr_count() as i64);
2933 self.set_counter(Counter::CachedSRV, self.cache.srv_count() as i64);
2934 self.set_counter(Counter::CachedAddr, self.cache.addr_count() as i64);
2935 self.set_counter(Counter::CachedTxt, self.cache.txt_count() as i64);
2936 self.set_counter(Counter::CachedNSec, self.cache.nsec_count() as i64);
2937 self.set_counter(Counter::CachedSubtype, self.cache.subtype_count() as i64);
2938 self.set_counter(Counter::Timer, self.timers.len() as i64);
2939
2940 let dns_registry_probe_count: usize = self
2941 .dns_registry_map
2942 .values()
2943 .map(|r| r.probing.len())
2944 .sum();
2945 self.set_counter(Counter::DnsRegistryProbe, dns_registry_probe_count as i64);
2946
2947 let dns_registry_active_count: usize = self
2948 .dns_registry_map
2949 .values()
2950 .map(|r| r.active.values().map(|a| a.len()).sum::<usize>())
2951 .sum();
2952 self.set_counter(Counter::DnsRegistryActive, dns_registry_active_count as i64);
2953
2954 let dns_registry_timer_count: usize = self
2955 .dns_registry_map
2956 .values()
2957 .map(|r| r.new_timers.len())
2958 .sum();
2959 self.set_counter(Counter::DnsRegistryTimer, dns_registry_timer_count as i64);
2960
2961 let dns_registry_name_change_count: usize = self
2962 .dns_registry_map
2963 .values()
2964 .map(|r| r.name_changes.len())
2965 .sum();
2966 self.set_counter(
2967 Counter::DnsRegistryNameChange,
2968 dns_registry_name_change_count as i64,
2969 );
2970
2971 if let Err(e) = resp_s.send(self.counters.clone()) {
2973 debug!("Failed to send metrics: {}", e);
2974 }
2975 }
2976
2977 fn exec_command_browse(
2978 &mut self,
2979 repeating: bool,
2980 ty: String,
2981 next_delay: u32,
2982 cache_only: bool,
2983 listener: Sender<ServiceEvent>,
2984 ) {
2985 let pretty_addrs: Vec<String> = self
2986 .my_intfs
2987 .iter()
2988 .map(|(if_index, itf)| format!("{} ({if_index})", itf.name))
2989 .collect();
2990
2991 if let Err(e) = listener.send(ServiceEvent::SearchStarted(format!(
2992 "{ty} on {} interfaces [{}]",
2993 pretty_addrs.len(),
2994 pretty_addrs.join(", ")
2995 ))) {
2996 debug!(
2997 "Failed to send SearchStarted({})(repeating:{}): {}",
2998 &ty, repeating, e
2999 );
3000 return;
3001 }
3002
3003 let now = current_time_millis();
3004 if !repeating {
3005 self.service_queriers.insert(ty.clone(), listener.clone());
3009
3010 self.query_cache_for_service(&ty, &listener, now);
3012 }
3013
3014 if cache_only {
3015 match listener.send(ServiceEvent::SearchStopped(ty.clone())) {
3017 Ok(()) => debug!("SearchStopped sent for {}", &ty),
3018 Err(e) => debug!("Failed to send SearchStopped: {}", e),
3019 }
3020 return;
3021 }
3022
3023 self.send_query(&ty, RRType::PTR);
3024 self.increase_counter(Counter::Browse, 1);
3025
3026 let next_time = now + (next_delay * 1000) as u64;
3027 let max_delay = 60 * 60;
3028 let delay = cmp::min(next_delay * 2, max_delay);
3029 self.add_retransmission(next_time, Command::Browse(ty, delay, cache_only, listener));
3030 }
3031
3032 fn exec_command_resolve_hostname(
3033 &mut self,
3034 repeating: bool,
3035 hostname: String,
3036 next_delay: u32,
3037 listener: Sender<HostnameResolutionEvent>,
3038 timeout: Option<u64>,
3039 ) {
3040 let addr_list: Vec<_> = self.my_intfs.iter().collect();
3041 if let Err(e) = listener.send(HostnameResolutionEvent::SearchStarted(format!(
3042 "{} on addrs {:?}",
3043 &hostname, &addr_list
3044 ))) {
3045 debug!(
3046 "Failed to send ResolveStarted({})(repeating:{}): {}",
3047 &hostname, repeating, e
3048 );
3049 return;
3050 }
3051 if !repeating {
3052 self.add_hostname_resolver(hostname.to_owned(), listener.clone(), timeout);
3053 self.query_cache_for_hostname(&hostname, listener.clone());
3055 }
3056
3057 self.send_query_vec(&[(&hostname, RRType::A), (&hostname, RRType::AAAA)]);
3058 self.increase_counter(Counter::ResolveHostname, 1);
3059
3060 let now = current_time_millis();
3061 let next_time = now + u64::from(next_delay) * 1000;
3062 let max_delay = 60 * 60;
3063 let delay = cmp::min(next_delay * 2, max_delay);
3064
3065 if self
3067 .hostname_resolvers
3068 .get(&hostname)
3069 .and_then(|(_sender, timeout)| *timeout)
3070 .map(|timeout| next_time < timeout)
3071 .unwrap_or(true)
3072 {
3073 self.add_retransmission(
3074 next_time,
3075 Command::ResolveHostname(hostname, delay, listener, None),
3076 );
3077 }
3078 }
3079
3080 fn exec_command_resolve(&mut self, instance: String, try_count: u16) {
3081 let pending_query = self.query_unresolved(&instance);
3082 let max_try = 3;
3083 if pending_query && try_count < max_try {
3084 let next_time = current_time_millis() + RESOLVE_WAIT_IN_MILLIS;
3087 self.add_retransmission(next_time, Command::Resolve(instance, try_count + 1));
3088 }
3089 }
3090
3091 fn exec_command_unregister(
3092 &mut self,
3093 repeating: bool,
3094 fullname: String,
3095 resp_s: Sender<UnregisterStatus>,
3096 ) {
3097 let response = match self.my_services.remove_entry(&fullname) {
3098 None => {
3099 debug!("unregister: cannot find such service {}", &fullname);
3100 UnregisterStatus::NotFound
3101 }
3102 Some((_k, info)) => {
3103 let mut timers = Vec::new();
3104
3105 for (if_index, intf) in self.my_intfs.iter() {
3106 let packet = self.unregister_service(&info, intf, &self.ipv4_sock.pktinfo);
3107 if !repeating && !packet.is_empty() {
3109 let next_time = current_time_millis() + 120;
3110 self.retransmissions.push(ReRun {
3111 next_time,
3112 command: Command::UnregisterResend(packet, *if_index, true),
3113 });
3114 timers.push(next_time);
3115 }
3116
3117 let packet = self.unregister_service(&info, intf, &self.ipv6_sock.pktinfo);
3120 if !repeating && !packet.is_empty() {
3121 let next_time = current_time_millis() + 120;
3122 self.retransmissions.push(ReRun {
3123 next_time,
3124 command: Command::UnregisterResend(packet, *if_index, false),
3125 });
3126 timers.push(next_time);
3127 }
3128 }
3129
3130 for t in timers {
3131 self.add_timer(t);
3132 }
3133
3134 self.increase_counter(Counter::Unregister, 1);
3135 UnregisterStatus::OK
3136 }
3137 };
3138 if let Err(e) = resp_s.send(response) {
3139 debug!("unregister: failed to send response: {}", e);
3140 }
3141 }
3142
3143 fn exec_command_unregister_resend(&mut self, packet: Vec<u8>, if_index: u32, is_ipv4: bool) {
3144 let Some(intf) = self.my_intfs.get(&if_index) else {
3145 return;
3146 };
3147 let sock = if is_ipv4 {
3148 &self.ipv4_sock.pktinfo
3149 } else {
3150 &self.ipv6_sock.pktinfo
3151 };
3152
3153 let if_addr = if is_ipv4 {
3154 match intf.next_ifaddr_v4() {
3155 Some(addr) => addr,
3156 None => return,
3157 }
3158 } else {
3159 match intf.next_ifaddr_v6() {
3160 Some(addr) => addr,
3161 None => return,
3162 }
3163 };
3164
3165 debug!("UnregisterResend from {:?}", if_addr);
3166 multicast_on_intf(&packet[..], &intf.name, intf.index, if_addr, sock);
3167
3168 self.increase_counter(Counter::UnregisterResend, 1);
3169 }
3170
3171 fn exec_command_stop_browse(&mut self, ty_domain: String) {
3172 match self.service_queriers.remove_entry(&ty_domain) {
3173 None => debug!("StopBrowse: cannot find querier for {}", &ty_domain),
3174 Some((ty, sender)) => {
3175 trace!("StopBrowse: removed queryer for {}", &ty);
3177 let mut i = 0;
3178 while i < self.retransmissions.len() {
3179 if let Command::Browse(t, _, _, _) = &self.retransmissions[i].command {
3180 if t == &ty {
3181 self.retransmissions.remove(i);
3182 trace!("StopBrowse: removed retransmission for {}", &ty);
3183 continue;
3184 }
3185 }
3186 i += 1;
3187 }
3188
3189 self.cache.remove_service_type(&ty_domain);
3191
3192 match sender.send(ServiceEvent::SearchStopped(ty_domain)) {
3194 Ok(()) => trace!("Sent SearchStopped to the listener"),
3195 Err(e) => debug!("Failed to send SearchStopped: {}", e),
3196 }
3197 }
3198 }
3199 }
3200
3201 fn exec_command_stop_resolve_hostname(&mut self, hostname: String) {
3202 if let Some((host, (sender, _timeout))) = self.hostname_resolvers.remove_entry(&hostname) {
3203 trace!("StopResolve: removed queryer for {}", &host);
3205 let mut i = 0;
3206 while i < self.retransmissions.len() {
3207 if let Command::Resolve(t, _) = &self.retransmissions[i].command {
3208 if t == &host {
3209 self.retransmissions.remove(i);
3210 trace!("StopResolve: removed retransmission for {}", &host);
3211 continue;
3212 }
3213 }
3214 i += 1;
3215 }
3216
3217 match sender.send(HostnameResolutionEvent::SearchStopped(hostname)) {
3219 Ok(()) => trace!("Sent SearchStopped to the listener"),
3220 Err(e) => debug!("Failed to send SearchStopped: {}", e),
3221 }
3222 }
3223 }
3224
3225 fn exec_command_register_resend(&mut self, fullname: String, if_index: u32) {
3226 let Some(info) = self.my_services.get_mut(&fullname) else {
3227 trace!("announce: cannot find such service {}", &fullname);
3228 return;
3229 };
3230
3231 let Some(dns_registry) = self.dns_registry_map.get_mut(&if_index) else {
3232 return;
3233 };
3234
3235 let Some(intf) = self.my_intfs.get(&if_index) else {
3236 return;
3237 };
3238
3239 let announced_v4 =
3240 announce_service_on_intf(dns_registry, info, intf, &self.ipv4_sock.pktinfo);
3241 let announced_v6 =
3242 announce_service_on_intf(dns_registry, info, intf, &self.ipv6_sock.pktinfo);
3243
3244 if announced_v4 || announced_v6 {
3245 let mut hostname = info.get_hostname();
3246 if let Some(new_name) = dns_registry.name_changes.get(hostname) {
3247 hostname = new_name;
3248 }
3249 let service_name = match dns_registry.name_changes.get(&fullname) {
3250 Some(new_name) => new_name.to_string(),
3251 None => fullname,
3252 };
3253
3254 debug!("resend: announce service {service_name} on {}", intf.name);
3255
3256 notify_monitors(
3257 &mut self.monitors,
3258 DaemonEvent::Announce(service_name, format!("{}:{}", hostname, &intf.name)),
3259 );
3260 info.set_status(if_index, ServiceStatus::Announced);
3261 } else {
3262 debug!("register-resend should not fail");
3263 }
3264
3265 self.increase_counter(Counter::RegisterResend, 1);
3266 }
3267
3268 fn exec_command_verify(&mut self, instance: String, timeout: Duration, repeating: bool) {
3269 let now = current_time_millis();
3279 let expire_at = if repeating {
3280 None
3281 } else {
3282 Some(now + timeout.as_millis() as u64)
3283 };
3284
3285 let record_vec = self.cache.service_verify_queries(&instance, expire_at);
3287
3288 if !record_vec.is_empty() {
3289 let query_vec: Vec<(&str, RRType)> = record_vec
3290 .iter()
3291 .map(|(record, rr_type)| (record.as_str(), *rr_type))
3292 .collect();
3293 self.send_query_vec(&query_vec);
3294
3295 if let Some(new_expire) = expire_at {
3296 self.add_timer(new_expire); self.add_retransmission(now + 1000, Command::Verify(instance, timeout));
3300 }
3301 }
3302 }
3303
3304 fn refresh_active_services(&mut self) {
3306 let mut query_ptr_count = 0;
3307 let mut query_srv_count = 0;
3308 let mut new_timers = HashSet::new();
3309 let mut query_addr_count = 0;
3310
3311 for (ty_domain, _sender) in self.service_queriers.iter() {
3312 let refreshed_timers = self.cache.refresh_due_ptr(ty_domain);
3313 if !refreshed_timers.is_empty() {
3314 trace!("sending refresh query for PTR: {}", ty_domain);
3315 self.send_query(ty_domain, RRType::PTR);
3316 query_ptr_count += 1;
3317 new_timers.extend(refreshed_timers);
3318 }
3319
3320 let (instances, timers) = self.cache.refresh_due_srv_txt(ty_domain);
3321 for (instance, types) in instances {
3322 trace!("sending refresh query for: {}", &instance);
3323 let query_vec = types
3324 .into_iter()
3325 .map(|ty| (instance.as_str(), ty))
3326 .collect::<Vec<_>>();
3327 self.send_query_vec(&query_vec);
3328 query_srv_count += 1;
3329 }
3330 new_timers.extend(timers);
3331 let (hostnames, timers) = self.cache.refresh_due_hosts(ty_domain);
3332 for hostname in hostnames.iter() {
3333 trace!("sending refresh queries for A and AAAA: {}", hostname);
3334 self.send_query_vec(&[(hostname, RRType::A), (hostname, RRType::AAAA)]);
3335 query_addr_count += 2;
3336 }
3337 new_timers.extend(timers);
3338 }
3339
3340 for timer in new_timers {
3341 self.add_timer(timer);
3342 }
3343
3344 self.increase_counter(Counter::CacheRefreshPTR, query_ptr_count);
3345 self.increase_counter(Counter::CacheRefreshSrvTxt, query_srv_count);
3346 self.increase_counter(Counter::CacheRefreshAddr, query_addr_count);
3347 }
3348}
3349
3350fn add_answer_of_service(
3352 out: &mut DnsOutgoing,
3353 msg: &DnsIncoming,
3354 entry_name: &str,
3355 service: &ServiceInfo,
3356 qtype: RRType,
3357 intf_addrs: Vec<IpAddr>,
3358) {
3359 if qtype == RRType::SRV || qtype == RRType::ANY {
3360 out.add_answer(
3361 msg,
3362 DnsSrv::new(
3363 entry_name,
3364 CLASS_IN | CLASS_CACHE_FLUSH,
3365 service.get_host_ttl(),
3366 service.get_priority(),
3367 service.get_weight(),
3368 service.get_port(),
3369 service.get_hostname().to_string(),
3370 ),
3371 );
3372 }
3373
3374 if qtype == RRType::TXT || qtype == RRType::ANY {
3375 out.add_answer(
3376 msg,
3377 DnsTxt::new(
3378 entry_name,
3379 CLASS_IN | CLASS_CACHE_FLUSH,
3380 service.get_other_ttl(),
3381 service.generate_txt(),
3382 ),
3383 );
3384 }
3385
3386 if qtype == RRType::SRV {
3387 for address in intf_addrs {
3388 out.add_additional_answer(DnsAddress::new(
3389 service.get_hostname(),
3390 ip_address_rr_type(&address),
3391 CLASS_IN | CLASS_CACHE_FLUSH,
3392 service.get_host_ttl(),
3393 address,
3394 InterfaceId::default(),
3395 ));
3396 }
3397 }
3398}
3399
3400#[derive(Clone, Debug)]
3403#[non_exhaustive]
3404pub enum ServiceEvent {
3405 SearchStarted(String),
3407
3408 ServiceFound(String, String),
3410
3411 ServiceResolved(Box<ResolvedService>),
3413
3414 ServiceRemoved(String, String),
3416
3417 SearchStopped(String),
3419}
3420
3421#[derive(Clone, Debug)]
3424#[non_exhaustive]
3425pub enum HostnameResolutionEvent {
3426 SearchStarted(String),
3428 AddressesFound(String, HashSet<ScopedIp>),
3430 AddressesRemoved(String, HashSet<ScopedIp>),
3432 SearchTimeout(String),
3434 SearchStopped(String),
3436}
3437
3438#[derive(Clone, Debug)]
3441#[non_exhaustive]
3442pub enum DaemonEvent {
3443 Announce(String, String),
3445
3446 Error(Error),
3448
3449 IpAdd(IpAddr),
3451
3452 IpDel(IpAddr),
3454
3455 NameChange(DnsNameChange),
3458
3459 Respond(String),
3461}
3462
3463#[derive(Clone, Debug)]
3466pub struct DnsNameChange {
3467 pub original: String,
3469
3470 pub new_name: String,
3480
3481 pub rr_type: RRType,
3483
3484 pub intf_name: String,
3486}
3487
3488#[derive(Debug)]
3490enum Command {
3491 Browse(String, u32, bool, Sender<ServiceEvent>),
3493
3494 ResolveHostname(String, u32, Sender<HostnameResolutionEvent>, Option<u64>), Register(ServiceInfo),
3499
3500 Unregister(String, Sender<UnregisterStatus>), RegisterResend(String, u32), UnregisterResend(Vec<u8>, u32, bool), StopBrowse(String), StopResolveHostname(String), Resolve(String, u16), GetMetrics(Sender<Metrics>),
3521
3522 GetStatus(Sender<DaemonStatus>),
3524
3525 Monitor(Sender<DaemonEvent>),
3527
3528 SetOption(DaemonOption),
3529
3530 GetOption(Sender<DaemonOptionVal>),
3531
3532 Verify(String, Duration),
3537
3538 Exit(Sender<DaemonStatus>),
3539}
3540
3541impl fmt::Display for Command {
3542 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
3543 match self {
3544 Self::Browse(_, _, _, _) => write!(f, "Command Browse"),
3545 Self::ResolveHostname(_, _, _, _) => write!(f, "Command ResolveHostname"),
3546 Self::Exit(_) => write!(f, "Command Exit"),
3547 Self::GetStatus(_) => write!(f, "Command GetStatus"),
3548 Self::GetMetrics(_) => write!(f, "Command GetMetrics"),
3549 Self::Monitor(_) => write!(f, "Command Monitor"),
3550 Self::Register(_) => write!(f, "Command Register"),
3551 Self::RegisterResend(_, _) => write!(f, "Command RegisterResend"),
3552 Self::SetOption(_) => write!(f, "Command SetOption"),
3553 Self::GetOption(_) => write!(f, "Command GetOption"),
3554 Self::StopBrowse(_) => write!(f, "Command StopBrowse"),
3555 Self::StopResolveHostname(_) => write!(f, "Command StopResolveHostname"),
3556 Self::Unregister(_, _) => write!(f, "Command Unregister"),
3557 Self::UnregisterResend(_, _, _) => write!(f, "Command UnregisterResend"),
3558 Self::Resolve(_, _) => write!(f, "Command Resolve"),
3559 Self::Verify(_, _) => write!(f, "Command VerifyResource"),
3560 }
3561 }
3562}
3563
/// Snapshot of daemon option values, sent in reply to `Command::GetOption`.
struct DaemonOptionVal {
    /// The daemon's current service name length limit.
    _service_name_len_max: u8,
    /// The daemon's current IP check interval.
    /// NOTE(review): unit not visible here — confirm against the setter.
    ip_check_interval: u64,
}
3568
3569#[derive(Debug)]
3570enum DaemonOption {
3571 ServiceNameLenMax(u8),
3572 IpCheckInterval(u64),
3573 EnableInterface(Vec<IfKind>),
3574 DisableInterface(Vec<IfKind>),
3575 MulticastLoopV4(bool),
3576 MulticastLoopV6(bool),
3577 AcceptUnsolicited(bool),
3578 #[cfg(test)]
3579 TestDownInterface(String),
3580 #[cfg(test)]
3581 TestUpInterface(String),
3582}
3583
/// Byte length of the `._tcp.local.` suffix. `._udp.local.` has the same
/// length, so the constant serves both.
const DOMAIN_LEN: usize = "._tcp.local.".len();
3586
3587fn check_service_name_length(ty_domain: &str, limit: u8) -> Result<()> {
3589 if ty_domain.len() <= DOMAIN_LEN + 1 {
3590 return Err(e_fmt!("Service type name cannot be empty: {}", ty_domain));
3592 }
3593
3594 let service_name_len = ty_domain.len() - DOMAIN_LEN - 1; if service_name_len > limit as usize {
3596 return Err(e_fmt!("Service name length must be <= {} bytes", limit));
3597 }
3598 Ok(())
3599}
3600
3601fn check_domain_suffix(name: &str) -> Result<()> {
3603 if !(name.ends_with("._tcp.local.") || name.ends_with("._udp.local.")) {
3604 return Err(e_fmt!(
3605 "mDNS service {} must end with '._tcp.local.' or '._udp.local.'",
3606 name
3607 ));
3608 }
3609
3610 Ok(())
3611}
3612
3613fn check_service_name(fullname: &str) -> Result<()> {
3621 check_domain_suffix(fullname)?;
3622
3623 let remaining: Vec<&str> = fullname[..fullname.len() - DOMAIN_LEN].split('.').collect();
3624 let name = remaining.last().ok_or_else(|| e_fmt!("No service name"))?;
3625
3626 if &name[0..1] != "_" {
3627 return Err(e_fmt!("Service name must start with '_'"));
3628 }
3629
3630 let name = &name[1..];
3631
3632 if name.contains("--") {
3633 return Err(e_fmt!("Service name must not contain '--'"));
3634 }
3635
3636 if name.starts_with('-') || name.ends_with('-') {
3637 return Err(e_fmt!("Service name (%s) may not start or end with '-'"));
3638 }
3639
3640 let ascii_count = name.chars().filter(|c| c.is_ascii_alphabetic()).count();
3641 if ascii_count < 1 {
3642 return Err(e_fmt!(
3643 "Service name must contain at least one letter (eg: 'A-Za-z')"
3644 ));
3645 }
3646
3647 Ok(())
3648}
3649
3650fn check_hostname(hostname: &str) -> Result<()> {
3652 if !hostname.ends_with(".local.") {
3653 return Err(e_fmt!("Hostname must end with '.local.': {hostname}"));
3654 }
3655
3656 if hostname == ".local." {
3657 return Err(e_fmt!(
3658 "The part of the hostname before '.local.' cannot be empty"
3659 ));
3660 }
3661
3662 if hostname.len() > 255 {
3663 return Err(e_fmt!("Hostname length must be <= 255 bytes"));
3664 }
3665
3666 Ok(())
3667}
3668
3669fn call_service_listener(
3670 listeners_map: &HashMap<String, Sender<ServiceEvent>>,
3671 ty_domain: &str,
3672 event: ServiceEvent,
3673) {
3674 if let Some(listener) = listeners_map.get(ty_domain) {
3675 match listener.send(event) {
3676 Ok(()) => trace!("Sent event to listener successfully"),
3677 Err(e) => debug!("Failed to send event: {}", e),
3678 }
3679 }
3680}
3681
3682fn call_hostname_resolution_listener(
3683 listeners_map: &HashMap<String, (Sender<HostnameResolutionEvent>, Option<u64>)>,
3684 hostname: &str,
3685 event: HostnameResolutionEvent,
3686) {
3687 let hostname_lower = hostname.to_lowercase();
3688 if let Some(listener) = listeners_map.get(&hostname_lower).map(|(l, _)| l) {
3689 match listener.send(event) {
3690 Ok(()) => trace!("Sent event to listener successfully"),
3691 Err(e) => debug!("Failed to send event: {}", e),
3692 }
3693 }
3694}
3695
3696fn my_ip_interfaces(with_loopback: bool) -> Vec<Interface> {
3700 if_addrs::get_if_addrs()
3701 .unwrap_or_default()
3702 .into_iter()
3703 .filter(|i| i.is_oper_up() && (!i.is_loopback() || with_loopback))
3704 .collect()
3705}
3706
3707fn send_dns_outgoing(out: &DnsOutgoing, my_intf: &MyIntf, sock: &PktInfoUdpSocket) -> Vec<Vec<u8>> {
3708 let if_name = &my_intf.name;
3709
3710 let if_addr = if sock.domain() == Domain::IPV4 {
3711 match my_intf.next_ifaddr_v4() {
3712 Some(addr) => addr,
3713 None => return vec![],
3714 }
3715 } else {
3716 match my_intf.next_ifaddr_v6() {
3717 Some(addr) => addr,
3718 None => return vec![],
3719 }
3720 };
3721
3722 send_dns_outgoing_impl(out, if_name, my_intf.index, if_addr, sock)
3723}
3724
3725fn send_dns_outgoing_impl(
3727 out: &DnsOutgoing,
3728 if_name: &str,
3729 if_index: u32,
3730 if_addr: &IfAddr,
3731 sock: &PktInfoUdpSocket,
3732) -> Vec<Vec<u8>> {
3733 let qtype = if out.is_query() {
3734 "query"
3735 } else {
3736 if out.answers_count() == 0 && out.additionals().is_empty() {
3737 return vec![]; }
3739 "response"
3740 };
3741 trace!(
3742 "send {}: {} questions {} answers {} authorities {} additional",
3743 qtype,
3744 out.questions().len(),
3745 out.answers_count(),
3746 out.authorities().len(),
3747 out.additionals().len()
3748 );
3749
3750 match if_addr.ip() {
3751 IpAddr::V4(ipv4) => {
3752 if let Err(e) = sock.set_multicast_if_v4(&ipv4) {
3753 debug!(
3754 "send_dns_outgoing: failed to set multicast interface for IPv4 {}: {}",
3755 ipv4, e
3756 );
3757 return vec![]; }
3759 }
3760 IpAddr::V6(ipv6) => {
3761 if let Err(e) = sock.set_multicast_if_v6(if_index) {
3762 debug!(
3763 "send_dns_outgoing: failed to set multicast interface for IPv6 {}: {}",
3764 ipv6, e
3765 );
3766 return vec![]; }
3768 }
3769 }
3770
3771 let packet_list = out.to_data_on_wire();
3772 for packet in packet_list.iter() {
3773 multicast_on_intf(packet, if_name, if_index, if_addr, sock);
3774 }
3775 packet_list
3776}
3777
3778fn multicast_on_intf(
3780 packet: &[u8],
3781 if_name: &str,
3782 if_index: u32,
3783 if_addr: &IfAddr,
3784 socket: &PktInfoUdpSocket,
3785) {
3786 if packet.len() > MAX_MSG_ABSOLUTE {
3787 debug!("Drop over-sized packet ({})", packet.len());
3788 return;
3789 }
3790
3791 let addr: SocketAddr = match if_addr {
3792 if_addrs::IfAddr::V4(_) => SocketAddrV4::new(GROUP_ADDR_V4, MDNS_PORT).into(),
3793 if_addrs::IfAddr::V6(_) => {
3794 let mut sock = SocketAddrV6::new(GROUP_ADDR_V6, MDNS_PORT, 0, 0);
3795 sock.set_scope_id(if_index); sock.into()
3797 }
3798 };
3799
3800 let sock_addr = addr.into();
3802 match socket.send_to(packet, &sock_addr) {
3803 Ok(sz) => trace!(
3804 "sent out {} bytes on interface {} (idx {}) addr {}",
3805 sz,
3806 if_name,
3807 if_index,
3808 if_addr.ip()
3809 ),
3810 Err(e) => trace!("Failed to send to {} via {:?}: {}", addr, &if_name, e),
3811 }
3812}
3813
/// Returns `true` if `name` splits into at least five dot-separated parts
/// (the empty part after a trailing dot counts as one).
fn valid_instance_name(name: &str) -> bool {
    // A fifth part exists exactly when there are five or more parts.
    name.split('.').nth(4).is_some()
}
3820
3821fn notify_monitors(monitors: &mut Vec<Sender<DaemonEvent>>, event: DaemonEvent) {
3822 monitors.retain(|sender| {
3823 if let Err(e) = sender.try_send(event.clone()) {
3824 debug!("notify_monitors: try_send: {}", &e);
3825 if matches!(e, TrySendError::Disconnected(_)) {
3826 return false; }
3828 }
3829 true
3830 });
3831}
3832
3833fn prepare_announce(
3836 info: &ServiceInfo,
3837 intf: &MyIntf,
3838 dns_registry: &mut DnsRegistry,
3839 is_ipv4: bool,
3840) -> Option<DnsOutgoing> {
3841 let intf_addrs = if is_ipv4 {
3842 info.get_addrs_on_my_intf_v4(intf)
3843 } else {
3844 info.get_addrs_on_my_intf_v6(intf)
3845 };
3846
3847 if intf_addrs.is_empty() {
3848 debug!(
3849 "prepare_announce (ipv4: {is_ipv4}): no valid addrs on interface {}",
3850 &intf.name
3851 );
3852 return None;
3853 }
3854
3855 let service_fullname = match dns_registry.name_changes.get(info.get_fullname()) {
3857 Some(new_name) => new_name,
3858 None => info.get_fullname(),
3859 };
3860
3861 debug!(
3862 "prepare to announce service {service_fullname} on {:?}",
3863 &intf_addrs
3864 );
3865
3866 let mut probing_count = 0;
3867 let mut out = DnsOutgoing::new(FLAGS_QR_RESPONSE | FLAGS_AA);
3868 let create_time = current_time_millis() + fastrand::u64(0..250);
3869
3870 out.add_answer_at_time(
3871 DnsPointer::new(
3872 info.get_type(),
3873 RRType::PTR,
3874 CLASS_IN,
3875 info.get_other_ttl(),
3876 service_fullname.to_string(),
3877 ),
3878 0,
3879 );
3880
3881 if let Some(sub) = info.get_subtype() {
3882 trace!("Adding subdomain {}", sub);
3883 out.add_answer_at_time(
3884 DnsPointer::new(
3885 sub,
3886 RRType::PTR,
3887 CLASS_IN,
3888 info.get_other_ttl(),
3889 service_fullname.to_string(),
3890 ),
3891 0,
3892 );
3893 }
3894
3895 let hostname = match dns_registry.name_changes.get(info.get_hostname()) {
3897 Some(new_name) => new_name.to_string(),
3898 None => info.get_hostname().to_string(),
3899 };
3900
3901 let mut srv = DnsSrv::new(
3902 info.get_fullname(),
3903 CLASS_IN | CLASS_CACHE_FLUSH,
3904 info.get_host_ttl(),
3905 info.get_priority(),
3906 info.get_weight(),
3907 info.get_port(),
3908 hostname,
3909 );
3910
3911 if let Some(new_name) = dns_registry.name_changes.get(info.get_fullname()) {
3912 srv.get_record_mut().set_new_name(new_name.to_string());
3913 }
3914
3915 if !info.requires_probe()
3916 || dns_registry.is_probing_done(&srv, info.get_fullname(), create_time)
3917 {
3918 out.add_answer_at_time(srv, 0);
3919 } else {
3920 probing_count += 1;
3921 }
3922
3923 let mut txt = DnsTxt::new(
3926 info.get_fullname(),
3927 CLASS_IN | CLASS_CACHE_FLUSH,
3928 info.get_other_ttl(),
3929 info.generate_txt(),
3930 );
3931
3932 if let Some(new_name) = dns_registry.name_changes.get(info.get_fullname()) {
3933 txt.get_record_mut().set_new_name(new_name.to_string());
3934 }
3935
3936 if !info.requires_probe()
3937 || dns_registry.is_probing_done(&txt, info.get_fullname(), create_time)
3938 {
3939 out.add_answer_at_time(txt, 0);
3940 } else {
3941 probing_count += 1;
3942 }
3943
3944 let hostname = info.get_hostname();
3947 for address in intf_addrs {
3948 let mut dns_addr = DnsAddress::new(
3949 hostname,
3950 ip_address_rr_type(&address),
3951 CLASS_IN | CLASS_CACHE_FLUSH,
3952 info.get_host_ttl(),
3953 address,
3954 intf.into(),
3955 );
3956
3957 if let Some(new_name) = dns_registry.name_changes.get(hostname) {
3958 dns_addr.get_record_mut().set_new_name(new_name.to_string());
3959 }
3960
3961 if !info.requires_probe()
3962 || dns_registry.is_probing_done(&dns_addr, info.get_fullname(), create_time)
3963 {
3964 out.add_answer_at_time(dns_addr, 0);
3965 } else {
3966 probing_count += 1;
3967 }
3968 }
3969
3970 if probing_count > 0 {
3971 return None;
3972 }
3973
3974 Some(out)
3975}
3976
3977fn announce_service_on_intf(
3980 dns_registry: &mut DnsRegistry,
3981 info: &ServiceInfo,
3982 intf: &MyIntf,
3983 sock: &PktInfoUdpSocket,
3984) -> bool {
3985 let is_ipv4 = sock.domain() == Domain::IPV4;
3986 if let Some(out) = prepare_announce(info, intf, dns_registry, is_ipv4) {
3987 send_dns_outgoing(&out, intf, sock);
3988 return true;
3989 }
3990
3991 false
3992}
3993
/// Produces the next candidate for a name by changing its first
/// dot-separated label.
///
/// * If the label ends with ` (<n>)`, where `<n>` parses as a `u32`, the
///   number is incremented: `foo (2).local.` becomes `foo (3).local.`.
/// * Otherwise ` (2)` is appended to the label: `foo.local.` becomes
///   `foo (2).local.`. This also applies when `<n>` is already `u32::MAX`,
///   so the increment can never overflow.
///
/// Everything after the first label is kept unchanged.
fn name_change(original: &str) -> String {
    let (first_part, rest) = match original.split_once('.') {
        Some((first, rest)) => (first, Some(rest)),
        None => (original, None),
    };

    let new_first = first_part
        .strip_suffix(')')
        .and_then(|stem| stem.rsplit_once(" ("))
        .and_then(|(base_name, digits)| {
            // `checked_add` yields `None` at `u32::MAX`, which falls back to
            // appending a fresh " (2)" suffix below.
            let next = digits.parse::<u32>().ok()?.checked_add(1)?;
            Some(format!("{base_name} ({next})"))
        })
        .unwrap_or_else(|| format!("{first_part} (2)"));

    match rest {
        Some(rest) => format!("{new_first}.{rest}"),
        None => new_first,
    }
}
4029
/// Derives a replacement hostname from `original` by bumping a numeric suffix
/// on its first dot-separated label.
///
/// * If the first label ends with `-<n>` where `<n>` parses as a `u32`, the
///   counter is incremented: `"foo-2.local."` -> `"foo-3.local."`.
/// * Otherwise `-2` is appended to the first label:
///   `"foo.local."` -> `"foo-2.local."`.
///
/// Everything after the first `.` is kept unchanged. If the counter is
/// already `u32::MAX`, `-2` is appended instead of overflowing.
fn hostname_change(original: &str) -> String {
    // Only the first label is rewritten; the rest (if any) is re-attached verbatim.
    let (first_label, remainder) = match original.split_once('.') {
        Some((label, rest)) => (label, Some(rest)),
        None => (original, None),
    };

    let renamed = first_label
        // Whatever follows the last hyphen is the candidate counter.
        .rsplit_once('-')
        .and_then(|(base, digits)| {
            // `checked_add` avoids the overflow panic / wrap-around at u32::MAX.
            let next = digits.parse::<u32>().ok()?.checked_add(1)?;
            Some(format!("{base}-{next}"))
        })
        .unwrap_or_else(|| format!("{first_label}-2"));

    match remainder {
        Some(rest) => format!("{renamed}.{rest}"),
        None => renamed,
    }
}
4057
4058fn add_answer_with_additionals(
4059 out: &mut DnsOutgoing,
4060 msg: &DnsIncoming,
4061 service: &ServiceInfo,
4062 intf: &MyIntf,
4063 dns_registry: &DnsRegistry,
4064 is_ipv4: bool,
4065) {
4066 let intf_addrs = if is_ipv4 {
4067 service.get_addrs_on_my_intf_v4(intf)
4068 } else {
4069 service.get_addrs_on_my_intf_v6(intf)
4070 };
4071 if intf_addrs.is_empty() {
4072 trace!("No addrs on LAN of intf {:?}", intf);
4073 return;
4074 }
4075
4076 let service_fullname = match dns_registry.name_changes.get(service.get_fullname()) {
4078 Some(new_name) => new_name,
4079 None => service.get_fullname(),
4080 };
4081
4082 let hostname = match dns_registry.name_changes.get(service.get_hostname()) {
4083 Some(new_name) => new_name,
4084 None => service.get_hostname(),
4085 };
4086
4087 let ptr_added = out.add_answer(
4088 msg,
4089 DnsPointer::new(
4090 service.get_type(),
4091 RRType::PTR,
4092 CLASS_IN,
4093 service.get_other_ttl(),
4094 service_fullname.to_string(),
4095 ),
4096 );
4097
4098 if !ptr_added {
4099 trace!("answer was not added for msg {:?}", msg);
4100 return;
4101 }
4102
4103 if let Some(sub) = service.get_subtype() {
4104 trace!("Adding subdomain {}", sub);
4105 out.add_additional_answer(DnsPointer::new(
4106 sub,
4107 RRType::PTR,
4108 CLASS_IN,
4109 service.get_other_ttl(),
4110 service_fullname.to_string(),
4111 ));
4112 }
4113
4114 out.add_additional_answer(DnsSrv::new(
4117 service_fullname,
4118 CLASS_IN | CLASS_CACHE_FLUSH,
4119 service.get_host_ttl(),
4120 service.get_priority(),
4121 service.get_weight(),
4122 service.get_port(),
4123 hostname.to_string(),
4124 ));
4125
4126 out.add_additional_answer(DnsTxt::new(
4127 service_fullname,
4128 CLASS_IN | CLASS_CACHE_FLUSH,
4129 service.get_other_ttl(),
4130 service.generate_txt(),
4131 ));
4132
4133 for address in intf_addrs {
4134 out.add_additional_answer(DnsAddress::new(
4135 hostname,
4136 ip_address_rr_type(&address),
4137 CLASS_IN | CLASS_CACHE_FLUSH,
4138 service.get_host_ttl(),
4139 address,
4140 intf.into(),
4141 ));
4142 }
4143}
4144
4145fn check_probing(
4148 dns_registry: &mut DnsRegistry,
4149 timers: &mut BinaryHeap<Reverse<u64>>,
4150 now: u64,
4151) -> (DnsOutgoing, Vec<String>) {
4152 let mut expired_probes = Vec::new();
4153 let mut out = DnsOutgoing::new(FLAGS_QR_QUERY);
4154
4155 for (name, probe) in dns_registry.probing.iter_mut() {
4156 if now >= probe.next_send {
4157 if probe.expired(now) {
4158 expired_probes.push(name.clone());
4160 } else {
4161 out.add_question(name, RRType::ANY);
4162
4163 for record in probe.records.iter() {
4171 out.add_authority(record.clone());
4172 }
4173
4174 probe.update_next_send(now);
4175
4176 timers.push(Reverse(probe.next_send));
4178 }
4179 }
4180 }
4181
4182 (out, expired_probes)
4183}
4184
4185fn handle_expired_probes(
4190 expired_probes: Vec<String>,
4191 intf_name: &str,
4192 dns_registry: &mut DnsRegistry,
4193 monitors: &mut Vec<Sender<DaemonEvent>>,
4194) -> HashSet<String> {
4195 let mut waiting_services = HashSet::new();
4196
4197 for name in expired_probes {
4198 let Some(probe) = dns_registry.probing.remove(&name) else {
4199 continue;
4200 };
4201
4202 for record in probe.records.iter() {
4204 if let Some(new_name) = record.get_record().get_new_name() {
4205 dns_registry
4206 .name_changes
4207 .insert(name.clone(), new_name.to_string());
4208
4209 let event = DnsNameChange {
4210 original: record.get_record().get_original_name().to_string(),
4211 new_name: new_name.to_string(),
4212 rr_type: record.get_type(),
4213 intf_name: intf_name.to_string(),
4214 };
4215 debug!("Name change event: {:?}", &event);
4216 notify_monitors(monitors, DaemonEvent::NameChange(event));
4217 }
4218 }
4219
4220 debug!(
4222 "probe of '{name}' finished: move {} records to active. ({} waiting services)",
4223 probe.records.len(),
4224 probe.waiting_services.len(),
4225 );
4226
4227 if !probe.records.is_empty() {
4229 match dns_registry.active.get_mut(&name) {
4230 Some(records) => {
4231 records.extend(probe.records);
4232 }
4233 None => {
4234 dns_registry.active.insert(name, probe.records);
4235 }
4236 }
4237
4238 waiting_services.extend(probe.waiting_services);
4239 }
4240 }
4241
4242 waiting_services
4243}
4244
4245#[cfg(test)]
4246mod tests {
4247 use super::{
4248 _new_socket_bind, check_domain_suffix, check_service_name_length, hostname_change,
4249 my_ip_interfaces, name_change, send_dns_outgoing_impl, valid_instance_name,
4250 HostnameResolutionEvent, ServiceDaemon, ServiceEvent, ServiceInfo, GROUP_ADDR_V4,
4251 MDNS_PORT,
4252 };
4253 use crate::{
4254 dns_parser::{
4255 DnsIncoming, DnsOutgoing, DnsPointer, InterfaceId, RRType, ScopedIp, CLASS_IN,
4256 FLAGS_AA, FLAGS_QR_RESPONSE,
4257 },
4258 service_daemon::{add_answer_of_service, check_hostname},
4259 };
4260 use std::{
4261 net::{SocketAddr, SocketAddrV4},
4262 time::{Duration, SystemTime},
4263 };
4264 use test_log::test;
4265
#[test]
fn test_socketaddr_print() {
    // The IPv4 group address and port must display as "<ip>:<port>".
    let group = SocketAddr::from(SocketAddrV4::new(GROUP_ADDR_V4, MDNS_PORT));
    assert_eq!(group.to_string(), "224.0.0.251:5353");
}
4272
#[test]
fn test_instance_name() {
    // Names with an instance label in front of the service type are accepted,
    // including one with an empty label after the instance.
    for accepted in [
        "my-laser._printer._tcp.local.",
        "my-laser.._printer._tcp.local.",
    ] {
        assert!(valid_instance_name(accepted));
    }

    // A bare service type is not a valid instance name.
    assert!(!valid_instance_name("_printer._tcp.local."));
}
4279
#[test]
fn test_check_service_name_length() {
    // This call is expected to be rejected; print the error for inspection.
    let outcome = check_service_name_length("_tcp", 100);
    assert!(outcome.is_err());

    if let Some(err) = outcome.err() {
        println!("{}", err);
    }
}
4288
#[test]
fn test_check_hostname() {
    let suffix = ".local.";
    // 255 bytes in total is accepted; 256 bytes is rejected.
    let longest_valid = "A".repeat(255 - suffix.len()) + suffix;
    let too_long = "A".repeat(256 - suffix.len()) + suffix;

    for hostname in ["my_host.local.", longest_valid.as_str()] {
        assert!(check_hostname(hostname).is_ok());
    }

    // Missing trailing dot, empty host label, and over-long name.
    for hostname in ["my_host.local", ".local.", too_long.as_str()] {
        let outcome = check_hostname(hostname);
        assert!(outcome.is_err());
        if let Some(err) = outcome.err() {
            println!("{}", err);
        }
    }
}
4313
#[test]
fn test_check_domain_suffix() {
    let rejected = [
        "_missing_dot._tcp.local",
        "_missing_bar.tcp.local.",
        "_mis_spell._tpp.local.",
        "_mis_spell._upp.local.",
    ];
    for ty_domain in rejected {
        assert!(check_domain_suffix(ty_domain).is_err());
    }

    let accepted = ["_has_dot._tcp.local.", "_goodname._udp.local."];
    for ty_domain in accepted {
        assert!(check_domain_suffix(ty_domain).is_ok());
    }
}
4323
#[test]
fn test_service_with_temporarily_invalidated_ptr() {
    // One daemon both registers the service and browses for it.
    let d = ServiceDaemon::new().expect("Failed to create daemon");

    let service = "_test_inval_ptr._udp.local.";
    let host_name = "my_host_tmp_invalidated_ptr.local.";
    let intfs: Vec<_> = my_ip_interfaces(false);
    let intf_ips: Vec<_> = intfs.iter().map(|intf| intf.ip()).collect();
    let port = 5201;
    let my_service =
        ServiceInfo::new(service, "my_instance", host_name, &intf_ips[..], port, None)
            .expect("invalid service info")
            .enable_addr_auto();
    assert!(d.register(my_service.clone()).is_ok());

    // First browse: the service must resolve.
    let browse_chan = d.browse(service).unwrap();
    let timeout = Duration::from_secs(2);
    let mut resolved = false;

    while let Ok(event) = browse_chan.recv_timeout(timeout) {
        if let ServiceEvent::ServiceResolved(info) = &event {
            resolved = true;
            println!("Resolved a service of {}", &info.fullname);
            break;
        }
        println!("Received event {:?}", event);
    }

    assert!(resolved);

    println!("Stopping browse of {}", service);
    d.stop_browse(service).unwrap();

    // Wait until the daemon confirms that the search has stopped.
    let mut stopped = false;
    while let Ok(event) = browse_chan.recv_timeout(timeout) {
        if let ServiceEvent::SearchStopped(_) = &event {
            stopped = true;
            println!("Stopped browsing service");
            break;
        }
        println!("Received event {:?}", event);
    }

    assert!(stopped);

    // Build a response carrying the service's PTR record with TTL 0 and send
    // it on every interface.
    let invalidate_ptr_packet = DnsPointer::new(
        my_service.get_type(),
        RRType::PTR,
        CLASS_IN,
        0,
        my_service.get_fullname().to_string(),
    );

    let mut packet_buffer = DnsOutgoing::new(FLAGS_QR_RESPONSE | FLAGS_AA);
    packet_buffer.add_additional_answer(invalidate_ptr_packet);

    for intf in &intfs {
        let sock = _new_socket_bind(intf, true).unwrap();
        send_dns_outgoing_impl(
            &packet_buffer,
            &intf.name,
            intf.index.unwrap_or(0),
            &intf.addr,
            &sock.pktinfo,
        );
    }

    println!(
        "Sent PTR record invalidation. Starting second browse for {}",
        service
    );

    // Second browse: the service must resolve again.
    let browse_chan = d.browse(service).unwrap();

    resolved = false;
    while let Ok(event) = browse_chan.recv_timeout(timeout) {
        if let ServiceEvent::ServiceResolved(info) = &event {
            resolved = true;
            println!("Resolved a service of {}", &info.fullname);
            break;
        }
        println!("Received event {:?}", event);
    }

    assert!(resolved);
    d.shutdown().unwrap();
}
4437
#[test]
fn test_expired_srv() {
    // Register a service whose host TTL is only a few seconds.
    let service_type = "_expired-srv._udp.local.";
    let instance = "test_instance";
    let host_name = "expired_srv_host.local.";
    let mut my_service = ServiceInfo::new(service_type, instance, host_name, "", 5023, None)
        .unwrap()
        .enable_addr_auto();
    let new_ttl = 3;
    my_service._set_host_ttl(new_ttl);

    let mdns_server = ServiceDaemon::new().expect("Failed to create mdns server");
    let result = mdns_server.register(my_service);
    assert!(result.is_ok());

    // A separate client daemon browses for the service until it resolves.
    let mdns_client = ServiceDaemon::new().expect("Failed to create mdns client");
    let browse_chan = mdns_client.browse(service_type).unwrap();
    let timeout = Duration::from_secs(2);
    let mut resolved = false;

    while let Ok(event) = browse_chan.recv_timeout(timeout) {
        if let ServiceEvent::ServiceResolved(info) = event {
            resolved = true;
            println!("Resolved a service of {}", &info.fullname);
            break;
        }
    }

    assert!(resolved);

    // Take the server away so nothing keeps the client's records alive.
    mdns_server.shutdown().unwrap();

    // Watch for a removal event for up to one TTL per received event.
    // NOTE(review): the removal is only logged, not asserted; asserting it
    // may be timing-sensitive - confirm before tightening.
    let expire_timeout = Duration::from_secs(new_ttl as u64);
    while let Ok(event) = browse_chan.recv_timeout(expire_timeout) {
        if let ServiceEvent::ServiceRemoved(service_type, full_name) = event {
            println!("Service removed: {}: {}", &service_type, &full_name);
            break;
        }
    }

    // Stop the client daemon too, so it does not keep running for the rest
    // of the test process.
    mdns_client.shutdown().unwrap();
}
4491
#[test]
fn test_hostname_resolution_address_removed() {
    // Server side: a service on the first IPv4 interface with a short host TTL.
    let server = ServiceDaemon::new().expect("Failed to create server");
    let hostname = "addr_remove_host._tcp.local.";
    let service_ip_addr: ScopedIp = my_ip_interfaces(false)
        .iter()
        .find(|iface| iface.ip().is_ipv4())
        .map(|iface| iface.ip().into())
        .unwrap();

    let mut my_service = ServiceInfo::new(
        "_host_res_test._tcp.local.",
        "my_instance",
        hostname,
        &service_ip_addr.to_ip_addr(),
        1234,
        None,
    )
    .expect("invalid service info");

    let addr_ttl = 2;
    my_service._set_host_ttl(addr_ttl);
    server.register(my_service).unwrap();

    // Client side: resolve the hostname and wait for its address.
    let client = ServiceDaemon::new().expect("Failed to create client");
    let event_receiver = client.resolve_hostname(hostname, None).unwrap();

    let mut resolved = false;
    while let Ok(event) = event_receiver.recv() {
        match event {
            HostnameResolutionEvent::AddressesFound(found_hostname, addresses) => {
                assert!(found_hostname == hostname);
                assert!(addresses.contains(&service_ip_addr));
                println!("address found: {:?}", &addresses);
                resolved = true;
                break;
            }
            HostnameResolutionEvent::SearchStopped(_) => break,
            _ => {}
        }
    }

    assert!(resolved);

    // Stop the server, then wait (TTL + 1 second per event) for the client
    // to report the address as removed.
    server.shutdown().unwrap();

    let timeout = Duration::from_secs(addr_ttl as u64 + 1);
    let mut removed = false;
    while let Ok(event) = event_receiver.recv_timeout(timeout) {
        if let HostnameResolutionEvent::AddressesRemoved(removed_host, addresses) = event {
            assert!(removed_host == hostname);
            assert!(addresses.contains(&service_ip_addr));

            println!(
                "address removed: hostname: {} addresses: {:?}",
                &hostname, &addresses
            );
            removed = true;
            break;
        }
    }

    assert!(removed);

    client.shutdown().unwrap();
}
4566
#[test]
fn test_refresh_ptr() {
    // Register a service on the first IPv4 interface with a short "other" TTL.
    let service_type = "_refresh-ptr._udp.local.";
    let instance = "test_instance";
    let host_name = "refresh_ptr_host.local.";
    let service_ip_addr = my_ip_interfaces(false)
        .iter()
        .find(|iface| iface.ip().is_ipv4())
        .map(|iface| iface.ip())
        .unwrap();

    let mut my_service = ServiceInfo::new(
        service_type,
        instance,
        host_name,
        &service_ip_addr,
        5023,
        None,
    )
    .unwrap();

    let new_ttl = 3;
    my_service._set_other_ttl(new_ttl);

    let mdns_server = ServiceDaemon::new().expect("Failed to create mdns server");
    assert!(mdns_server.register(my_service).is_ok());

    // Browse from a second daemon until the service resolves.
    let mdns_client = ServiceDaemon::new().expect("Failed to create mdns client");
    let browse_chan = mdns_client.browse(service_type).unwrap();
    let timeout = Duration::from_millis(1500);
    let mut resolved = false;

    while let Ok(event) = browse_chan.recv_timeout(timeout) {
        if let ServiceEvent::ServiceResolved(info) = event {
            resolved = true;
            println!("Resolved a service of {}", &info.fullname);
            break;
        }
    }

    assert!(resolved);

    // Keep draining events until the channel stays quiet for 90% of the TTL.
    let timeout = Duration::from_millis(new_ttl as u64 * 1000 * 90 / 100);
    while let Ok(event) = browse_chan.recv_timeout(timeout) {
        println!("event: {:?}", &event);
    }

    // Both refresh counters are expected to be exactly 1 at this point.
    let metrics_chan = mdns_client.get_metrics().unwrap();
    let metrics = metrics_chan.recv_timeout(timeout).unwrap();
    let ptr_refresh_counter = metrics["cache-refresh-ptr"];
    assert_eq!(ptr_refresh_counter, 1);
    let srvtxt_refresh_counter = metrics["cache-refresh-srv-txt"];
    assert_eq!(srvtxt_refresh_counter, 1);

    mdns_server.shutdown().unwrap();
    mdns_client.shutdown().unwrap();
}
4634
#[test]
fn test_name_change() {
    // (input, expected) pairs.
    let cases = [
        // No suffix yet, or a valid counter suffix to bump.
        ("foo.local.", "foo (2).local."),
        ("foo (2).local.", "foo (3).local."),
        ("foo (9).local.", "foo (10).local."),
        ("foo", "foo (2)"),
        ("foo (2)", "foo (3)"),
        ("", " (2)"),
        // Suffixes that are not a well-formed trailing counter.
        ("foo (abc)", "foo (abc) (2)"),
        ("foo (2", "foo (2 (2)"),
        ("foo (2) extra", "foo (2) extra (2)"),
    ];

    for (original, expected) in cases {
        assert_eq!(name_change(original), expected);
    }
}
4649
#[test]
fn test_hostname_change() {
    // (input, expected) pairs: append "-2" or bump an existing "-<n>" suffix.
    let cases = [
        ("foo.local.", "foo-2.local."),
        ("foo", "foo-2"),
        ("foo-2.local.", "foo-3.local."),
        ("foo-9", "foo-10"),
        ("test-42.domain.", "test-43.domain."),
    ];

    for (original, expected) in cases {
        assert_eq!(hostname_change(original), expected);
    }
}
4658
#[test]
fn test_add_answer_txt_ttl() {
    // Build a service on the first IPv4 interface.
    let service_type = "_test_add_answer._udp.local.";
    let instance = "test_instance";
    let host_name = "add_answer_host.local.";
    let service_intf = my_ip_interfaces(false)
        .into_iter()
        .find(|iface| iface.ip().is_ipv4())
        .unwrap();
    let service_ip_addr = service_intf.ip();
    let my_service = ServiceInfo::new(
        service_type,
        instance,
        host_name,
        &service_ip_addr,
        5023,
        None,
    )
    .unwrap();

    let mut out = DnsOutgoing::new(FLAGS_QR_RESPONSE | FLAGS_AA);

    // Serialize the still-empty outgoing message and parse it back to get a
    // `DnsIncoming` value to pass along.
    let mut wire_packets = out.to_data_on_wire();
    let interface_id = InterfaceId::from(&service_intf);
    let incoming = DnsIncoming::new(wire_packets.pop().unwrap(), interface_id).unwrap();

    // Ask for the TXT answer of the service.
    let if_addrs = vec![service_intf.ip()];
    add_answer_of_service(
        &mut out,
        &incoming,
        instance,
        &my_service,
        RRType::TXT,
        if_addrs,
    );

    assert!(
        out.answers_count() > 0,
        "No answers added to the outgoing message"
    );

    // The first answer must be a TXT record carrying the service's "other" TTL.
    let answer = out._answers().first().unwrap();
    assert_eq!(answer.0.get_type(), RRType::TXT);
    assert_eq!(answer.0.get_record().get_ttl(), my_service.get_other_ttl());
}
4712
#[test]
fn test_interface_flip() {
    let ty_domain = "_intf-flip._udp.local.";
    let host_name = "intf_flip.local.";
    // The instance name is the current time in microseconds since the epoch.
    let since_epoch = SystemTime::now()
        .duration_since(SystemTime::UNIX_EPOCH)
        .unwrap();
    let instance_name = since_epoch.as_micros().to_string();
    let port = 5200;

    // Pick the first IPv4 interface and remember its name.
    let (ip_addr1, intf_name) = my_ip_interfaces(false)
        .iter()
        .find(|iface| iface.ip().is_ipv4())
        .map(|iface| (iface.ip(), iface.name.clone()))
        .unwrap();

    println!("Using interface {} with IP {}", intf_name, ip_addr1);

    // Register the service on a server daemon.
    let service1 =
        ServiceInfo::new(ty_domain, &instance_name, host_name, &ip_addr1, port, None)
            .expect("valid service info");
    let server1 = ServiceDaemon::new().expect("failed to start server");
    server1
        .register(service1)
        .expect("Failed to register service1");

    std::thread::sleep(Duration::from_secs(2));

    // Browse from a client daemon and wait for the service to resolve.
    let client = ServiceDaemon::new().expect("failed to start client");
    let receiver = client.browse(ty_domain).unwrap();

    let timeout = Duration::from_secs(3);
    let mut got_data = false;

    while let Ok(event) = receiver.recv_timeout(timeout) {
        if let ServiceEvent::ServiceResolved(_) = event {
            println!("Received ServiceResolved event");
            got_data = true;
            break;
        }
    }

    assert!(got_data, "Should receive ServiceResolved event");

    client.set_ip_check_interval(1).unwrap();

    // Take the interface down on the client: the service must be removed.
    println!("Shutting down interface {}", &intf_name);
    client.test_down_interface(&intf_name).unwrap();

    let mut got_removed = false;

    while let Ok(event) = receiver.recv_timeout(timeout) {
        if let ServiceEvent::ServiceRemoved(ty_domain, instance) = event {
            got_removed = true;
            println!("removed: {ty_domain} : {instance}");
            break;
        }
    }
    assert!(got_removed, "Should receive ServiceRemoved event");

    // Bring the interface back: the service must resolve again.
    println!("Bringing up interface {}", &intf_name);
    client.test_up_interface(&intf_name).unwrap();

    let mut got_data = false;
    while let Ok(event) = receiver.recv_timeout(timeout) {
        if let ServiceEvent::ServiceResolved(resolved) = event {
            got_data = true;
            println!("Received ServiceResolved: {:?}", resolved);
            break;
        }
    }
    assert!(
        got_data,
        "Should receive ServiceResolved event after interface is back up"
    );

    server1.shutdown().unwrap();
    client.shutdown().unwrap();
}
4808
#[test]
fn test_cache_only() {
    let service_type = "_cache_only._udp.local.";
    let instance = "test_instance";
    let host_name = "cache_only_host.local.";
    let service_ip_addr = my_ip_interfaces(false)
        .iter()
        .find(|iface| iface.ip().is_ipv4())
        .map(|iface| iface.ip())
        .unwrap();

    let mut my_service = ServiceInfo::new(
        service_type,
        instance,
        host_name,
        &service_ip_addr,
        5023,
        None,
    )
    .unwrap();

    let new_ttl = 3;
    my_service._set_other_ttl(new_ttl);

    // Start the cache browse on the client before the server exists.
    let mdns_client = ServiceDaemon::new().expect("Failed to create mdns client");
    let browse_chan = mdns_client.browse_cache(service_type).unwrap();
    std::thread::sleep(Duration::from_secs(2));

    // Now bring up the server and register the service.
    let mdns_server = ServiceDaemon::new().expect("Failed to create mdns server");
    assert!(mdns_server.register(my_service).is_ok());

    // The client is expected to report the service as resolved.
    let timeout = Duration::from_millis(1500);
    let mut resolved = false;

    while let Ok(event) = browse_chan.recv_timeout(timeout) {
        if let ServiceEvent::ServiceResolved(info) = event {
            resolved = true;
            println!("Resolved a service of {}", &info.get_fullname());
            break;
        }
    }

    assert!(resolved);

    mdns_server.shutdown().unwrap();
    mdns_client.shutdown().unwrap();
}
4867
#[test]
fn test_cache_only_unsolicited() {
    // Use a service type and host name that no other test in this module
    // uses, so a server started by a different test cannot satisfy this
    // test's assertion when tests run together.
    let service_type = "_cache_unsol._udp.local.";
    let instance = "test_instance";
    let host_name = "cache_unsol_host.local.";
    let service_ip_addr = my_ip_interfaces(false)
        .iter()
        .find(|iface| iface.ip().is_ipv4())
        .map(|iface| iface.ip())
        .unwrap();

    let mut my_service = ServiceInfo::new(
        service_type,
        instance,
        host_name,
        &service_ip_addr,
        5023,
        None,
    )
    .unwrap();

    let new_ttl = 3;
    my_service._set_other_ttl(new_ttl);

    // Register the service first.
    let mdns_server = ServiceDaemon::new().expect("Failed to create mdns server");
    let result = mdns_server.register(my_service);
    assert!(result.is_ok());

    // The client opts in to unsolicited responses and only starts its cache
    // browse after a delay.
    let mdns_client = ServiceDaemon::new().expect("Failed to create mdns client");
    mdns_client.accept_unsolicited(true).unwrap();

    std::thread::sleep(Duration::from_secs(2));
    let browse_chan = mdns_client.browse_cache(service_type).unwrap();
    let timeout = Duration::from_millis(1500);
    let mut resolved = false;

    while let Ok(event) = browse_chan.recv_timeout(timeout) {
        if let ServiceEvent::ServiceResolved(info) = event {
            resolved = true;
            println!("Resolved a service of {}", &info.get_fullname());
            break;
        }
    }

    assert!(resolved);

    mdns_server.shutdown().unwrap();
    mdns_client.shutdown().unwrap();
}
4926}