1#[cfg(feature = "logging")]
32use crate::log::{debug, trace};
33use crate::{
34 dns_cache::{current_time_millis, DnsCache},
35 dns_parser::{
36 ip_address_rr_type, DnsAddress, DnsEntryExt, DnsIncoming, DnsOutgoing, DnsPointer,
37 DnsRecordBox, DnsRecordExt, DnsSrv, DnsTxt, InterfaceId, RRType, ScopedIp,
38 CLASS_CACHE_FLUSH, CLASS_IN, FLAGS_AA, FLAGS_QR_QUERY, FLAGS_QR_RESPONSE, MAX_MSG_ABSOLUTE,
39 },
40 error::{e_fmt, Error, Result},
41 service_info::{DnsRegistry, MyIntf, Probe, ServiceInfo, ServiceStatus},
42 Receiver, ResolvedService, TxtProperties,
43};
44use flume::{bounded, Sender, TrySendError};
45use if_addrs::{IfAddr, Interface};
46use mio::{event::Source, net::UdpSocket as MioUdpSocket, Interest, Poll, Registry, Token};
47use socket2::Domain;
48use socket_pktinfo::PktInfoUdpSocket;
49use std::{
50 cmp::{self, Reverse},
51 collections::{hash_map::Entry, BinaryHeap, HashMap, HashSet},
52 fmt, io,
53 net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6, UdpSocket},
54 str, thread,
55 time::Duration,
56 vec,
57};
58
59pub const SERVICE_NAME_LEN_MAX_DEFAULT: u8 = 15;
63
64pub const IP_CHECK_INTERVAL_IN_SECS_DEFAULT: u32 = 5;
66
67pub const VERIFY_TIMEOUT_DEFAULT: Duration = Duration::from_secs(10);
70
71const MDNS_PORT: u16 = 5353;
72const GROUP_ADDR_V4: Ipv4Addr = Ipv4Addr::new(224, 0, 0, 251);
73const GROUP_ADDR_V6: Ipv6Addr = Ipv6Addr::new(0xff02, 0, 0, 0, 0, 0, 0, 0xfb);
74const LOOPBACK_V4: Ipv4Addr = Ipv4Addr::new(127, 0, 0, 1);
75
76const RESOLVE_WAIT_IN_MILLIS: u64 = 500;
77
/// Status reported for an unregister request.
///
/// Values of this type are delivered through the receiver returned by
/// `ServiceDaemon::unregister`.
///
/// `Clone`, `PartialEq` and `Eq` are derived (matching `DaemonStatus`) so
/// callers can compare a received status directly, e.g. with `assert_eq!`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum UnregisterStatus {
    /// The unregister request succeeded.
    OK,
    /// No matching registered service was found.
    NotFound,
}
86
/// Status of the daemon as reported to clients.
///
/// Marked `#[non_exhaustive]`, so code outside this crate must include a
/// wildcard arm when matching on it.
#[derive(Debug, PartialEq, Clone, Eq)]
#[non_exhaustive]
pub enum DaemonStatus {
    /// The daemon is running; this is the status a new daemon starts with.
    Running,

    /// The daemon has received an exit command (or its command channel is
    /// disconnected) and is no longer processing commands.
    Shutdown,
}
97
/// Identifies one of the daemon's internal counters.
///
/// The `Display` implementation below yields the kebab-case name of each
/// counter.
#[derive(Hash, Eq, PartialEq)]
enum Counter {
    Register,
    RegisterResend,
    Unregister,
    UnregisterResend,
    Browse,
    ResolveHostname,
    Respond,
    CacheRefreshPTR,
    CacheRefreshSrvTxt,
    CacheRefreshAddr,
    KnownAnswerSuppression,
    CachedPTR,
    CachedSRV,
    CachedAddr,
    CachedTxt,
    CachedNSec,
    CachedSubtype,
    DnsRegistryProbe,
    DnsRegistryActive,
    DnsRegistryTimer,
    DnsRegistryNameChange,
    Timer,
}

impl fmt::Display for Counter {
    /// Writes the counter's kebab-case name. Formatter width / alignment
    /// flags are not applied.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        // Pick the static label first, then emit it with a single write.
        let label = match self {
            Self::Register => "register",
            Self::RegisterResend => "register-resend",
            Self::Unregister => "unregister",
            Self::UnregisterResend => "unregister-resend",
            Self::Browse => "browse",
            Self::ResolveHostname => "resolve-hostname",
            Self::Respond => "respond",
            Self::CacheRefreshPTR => "cache-refresh-ptr",
            Self::CacheRefreshSrvTxt => "cache-refresh-srv-txt",
            Self::CacheRefreshAddr => "cache-refresh-addr",
            Self::KnownAnswerSuppression => "known-answer-suppression",
            Self::CachedPTR => "cached-ptr",
            Self::CachedSRV => "cached-srv",
            Self::CachedAddr => "cached-addr",
            Self::CachedTxt => "cached-txt",
            Self::CachedNSec => "cached-nsec",
            Self::CachedSubtype => "cached-subtype",
            Self::DnsRegistryProbe => "dns-registry-probe",
            Self::DnsRegistryActive => "dns-registry-active",
            Self::DnsRegistryTimer => "dns-registry-timer",
            Self::DnsRegistryNameChange => "dns-registry-name-change",
            Self::Timer => "timer",
        };
        f.write_str(label)
    }
}
154
/// Pairs two handles onto one UDP socket: a `PktInfoUdpSocket` and a mio
/// socket built from a clone of it (see `MyUdpSocket::new`).
struct MyUdpSocket {
    /// The packet-info socket; socket options (multicast join/leave,
    /// loopback) are applied through this handle.
    pktinfo: PktInfoUdpSocket,

    /// mio view of the same socket; the `Source` impl delegates poller
    /// registration to it.
    mio: MioUdpSocket,
}
168
169impl MyUdpSocket {
170 pub fn new(pktinfo: PktInfoUdpSocket) -> io::Result<Self> {
171 let std_sock = pktinfo.try_clone_std()?;
172 let mio = MioUdpSocket::from_std(std_sock);
173
174 Ok(Self { pktinfo, mio })
175 }
176}
177
178impl Source for MyUdpSocket {
180 fn register(
181 &mut self,
182 registry: &Registry,
183 token: Token,
184 interests: Interest,
185 ) -> io::Result<()> {
186 self.mio.register(registry, token, interests)
187 }
188
189 fn reregister(
190 &mut self,
191 registry: &Registry,
192 token: Token,
193 interests: Interest,
194 ) -> io::Result<()> {
195 self.mio.reregister(registry, token, interests)
196 }
197
198 fn deregister(&mut self, registry: &Registry) -> std::io::Result<()> {
199 self.mio.deregister(registry)
200 }
201}
202
/// Daemon metrics: a map from a metric name to its current `i64` value.
///
/// This is the type returned (through a channel) by
/// `ServiceDaemon::get_metrics` and used for the daemon's internal counters.
pub type Metrics = HashMap<String, i64>;
206
/// mio token value under which the IPv4 mDNS socket is registered.
const IPV4_SOCK_EVENT_KEY: usize = 4;
/// mio token value under which the IPv6 mDNS socket is registered.
const IPV6_SOCK_EVENT_KEY: usize = 6;
/// mio token value under which the loopback signal socket is registered.
const SIGNAL_SOCK_EVENT_KEY: usize = usize::MAX - 1;

/// Client-side handle to the daemon thread.
///
/// The handle is `Clone`; each clone holds its own copy of the command
/// sender and of the signal address.
#[derive(Clone)]
pub struct ServiceDaemon {
    /// Sending half of the command channel read by the daemon thread.
    sender: Sender<Command>,

    /// Loopback address of the daemon's signal socket; a datagram is sent
    /// here after each queued command (see `send_cmd`).
    signal_addr: SocketAddr,
}
226
227impl ServiceDaemon {
228 pub fn new() -> Result<Self> {
233 let signal_addr = SocketAddrV4::new(LOOPBACK_V4, 0);
236
237 let signal_sock = UdpSocket::bind(signal_addr)
238 .map_err(|e| e_fmt!("failed to create signal_sock for daemon: {}", e))?;
239
240 let signal_addr = signal_sock
242 .local_addr()
243 .map_err(|e| e_fmt!("failed to get signal sock addr: {}", e))?;
244
245 signal_sock
247 .set_nonblocking(true)
248 .map_err(|e| e_fmt!("failed to set nonblocking for signal socket: {}", e))?;
249
250 let poller = Poll::new().map_err(|e| e_fmt!("failed to create mio Poll: {e}"))?;
251
252 let (sender, receiver) = bounded(100);
253
254 let mio_sock = MioUdpSocket::from_std(signal_sock);
256 thread::Builder::new()
257 .name("mDNS_daemon".to_string())
258 .spawn(move || Self::daemon_thread(mio_sock, poller, receiver))
259 .map_err(|e| e_fmt!("thread builder failed to spawn: {}", e))?;
260
261 Ok(Self {
262 sender,
263 signal_addr,
264 })
265 }
266
267 fn send_cmd(&self, cmd: Command) -> Result<()> {
270 let cmd_name = cmd.to_string();
271
272 self.sender.try_send(cmd).map_err(|e| match e {
274 TrySendError::Full(_) => Error::Again,
275 e => e_fmt!("flume::channel::send failed: {}", e),
276 })?;
277
278 let addr = SocketAddrV4::new(LOOPBACK_V4, 0);
280 let socket = UdpSocket::bind(addr)
281 .map_err(|e| e_fmt!("Failed to create socket to send signal: {}", e))?;
282 socket
283 .send_to(cmd_name.as_bytes(), self.signal_addr)
284 .map_err(|e| {
285 e_fmt!(
286 "signal socket send_to {} ({}) failed: {}",
287 self.signal_addr,
288 cmd_name,
289 e
290 )
291 })?;
292
293 Ok(())
294 }
295
296 pub fn browse(&self, service_type: &str) -> Result<Receiver<ServiceEvent>> {
307 check_domain_suffix(service_type)?;
308
309 let (resp_s, resp_r) = bounded(10);
310 self.send_cmd(Command::Browse(service_type.to_string(), 1, false, resp_s))?;
311 Ok(resp_r)
312 }
313
314 pub fn browse_cache(&self, service_type: &str) -> Result<Receiver<ServiceEvent>> {
323 check_domain_suffix(service_type)?;
324
325 let (resp_s, resp_r) = bounded(10);
326 self.send_cmd(Command::Browse(service_type.to_string(), 1, true, resp_s))?;
327 Ok(resp_r)
328 }
329
330 pub fn stop_browse(&self, ty_domain: &str) -> Result<()> {
335 self.send_cmd(Command::StopBrowse(ty_domain.to_string()))
336 }
337
338 pub fn resolve_hostname(
346 &self,
347 hostname: &str,
348 timeout: Option<u64>,
349 ) -> Result<Receiver<HostnameResolutionEvent>> {
350 check_hostname(hostname)?;
351 let (resp_s, resp_r) = bounded(10);
352 self.send_cmd(Command::ResolveHostname(
353 hostname.to_string(),
354 1,
355 resp_s,
356 timeout,
357 ))?;
358 Ok(resp_r)
359 }
360
361 pub fn stop_resolve_hostname(&self, hostname: &str) -> Result<()> {
366 self.send_cmd(Command::StopResolveHostname(hostname.to_string()))
367 }
368
369 pub fn register(&self, service_info: ServiceInfo) -> Result<()> {
377 check_service_name(service_info.get_fullname())?;
378 check_hostname(service_info.get_hostname())?;
379
380 self.send_cmd(Command::Register(service_info))
381 }
382
383 pub fn unregister(&self, fullname: &str) -> Result<Receiver<UnregisterStatus>> {
391 let (resp_s, resp_r) = bounded(1);
392 self.send_cmd(Command::Unregister(fullname.to_lowercase(), resp_s))?;
393 Ok(resp_r)
394 }
395
396 pub fn monitor(&self) -> Result<Receiver<DaemonEvent>> {
400 let (resp_s, resp_r) = bounded(100);
401 self.send_cmd(Command::Monitor(resp_s))?;
402 Ok(resp_r)
403 }
404
405 pub fn shutdown(&self) -> Result<Receiver<DaemonStatus>> {
410 let (resp_s, resp_r) = bounded(1);
411 self.send_cmd(Command::Exit(resp_s))?;
412 Ok(resp_r)
413 }
414
415 pub fn status(&self) -> Result<Receiver<DaemonStatus>> {
421 let (resp_s, resp_r) = bounded(1);
422
423 if self.sender.is_disconnected() {
424 resp_s
425 .send(DaemonStatus::Shutdown)
426 .map_err(|e| e_fmt!("failed to send daemon status to the client: {}", e))?;
427 } else {
428 self.send_cmd(Command::GetStatus(resp_s))?;
429 }
430
431 Ok(resp_r)
432 }
433
434 pub fn get_metrics(&self) -> Result<Receiver<Metrics>> {
439 let (resp_s, resp_r) = bounded(1);
440 self.send_cmd(Command::GetMetrics(resp_s))?;
441 Ok(resp_r)
442 }
443
444 pub fn set_service_name_len_max(&self, len_max: u8) -> Result<()> {
451 const SERVICE_NAME_LEN_MAX_LIMIT: u8 = 30; if len_max > SERVICE_NAME_LEN_MAX_LIMIT {
454 return Err(Error::Msg(format!(
455 "service name length max {len_max} is too large"
456 )));
457 }
458
459 self.send_cmd(Command::SetOption(DaemonOption::ServiceNameLenMax(len_max)))
460 }
461
462 pub fn set_ip_check_interval(&self, interval_in_secs: u32) -> Result<()> {
468 let interval_in_millis = interval_in_secs as u64 * 1000;
469 self.send_cmd(Command::SetOption(DaemonOption::IpCheckInterval(
470 interval_in_millis,
471 )))
472 }
473
474 pub fn get_ip_check_interval(&self) -> Result<u32> {
476 let (resp_s, resp_r) = bounded(1);
477 self.send_cmd(Command::GetOption(resp_s))?;
478
479 let option = resp_r
480 .recv_timeout(Duration::from_secs(10))
481 .map_err(|e| e_fmt!("failed to receive ip check interval: {}", e))?;
482 let ip_check_interval_in_secs = option.ip_check_interval / 1000;
483 Ok(ip_check_interval_in_secs as u32)
484 }
485
486 pub fn enable_interface(&self, if_kind: impl IntoIfKindVec) -> Result<()> {
493 let if_kind_vec = if_kind.into_vec();
494 self.send_cmd(Command::SetOption(DaemonOption::EnableInterface(
495 if_kind_vec.kinds,
496 )))
497 }
498
499 pub fn disable_interface(&self, if_kind: impl IntoIfKindVec) -> Result<()> {
506 let if_kind_vec = if_kind.into_vec();
507 self.send_cmd(Command::SetOption(DaemonOption::DisableInterface(
508 if_kind_vec.kinds,
509 )))
510 }
511
512 pub fn accept_unsolicited(&self, accept: bool) -> Result<()> {
523 self.send_cmd(Command::SetOption(DaemonOption::AcceptUnsolicited(accept)))
524 }
525
526 #[cfg(test)]
527 pub fn test_down_interface(&self, ifname: &str) -> Result<()> {
528 self.send_cmd(Command::SetOption(DaemonOption::TestDownInterface(
529 ifname.to_string(),
530 )))
531 }
532
533 #[cfg(test)]
534 pub fn test_up_interface(&self, ifname: &str) -> Result<()> {
535 self.send_cmd(Command::SetOption(DaemonOption::TestUpInterface(
536 ifname.to_string(),
537 )))
538 }
539
540 pub fn set_multicast_loop_v4(&self, on: bool) -> Result<()> {
556 self.send_cmd(Command::SetOption(DaemonOption::MulticastLoopV4(on)))
557 }
558
559 pub fn set_multicast_loop_v6(&self, on: bool) -> Result<()> {
575 self.send_cmd(Command::SetOption(DaemonOption::MulticastLoopV6(on)))
576 }
577
578 pub fn verify(&self, instance_fullname: String, timeout: Duration) -> Result<()> {
591 self.send_cmd(Command::Verify(instance_fullname, timeout))
592 }
593
594 fn daemon_thread(signal_sock: MioUdpSocket, poller: Poll, receiver: Receiver<Command>) {
595 let mut zc = Zeroconf::new(signal_sock, poller);
596
597 if let Some(cmd) = zc.run(receiver) {
598 match cmd {
599 Command::Exit(resp_s) => {
600 if let Err(e) = resp_s.send(DaemonStatus::Shutdown) {
603 debug!("exit: failed to send response of shutdown: {}", e);
604 }
605 }
606 _ => {
607 debug!("Unexpected command: {:?}", cmd);
608 }
609 }
610 }
611 }
612}
613
614fn _new_socket_bind(intf: &Interface, should_loop: bool) -> Result<MyUdpSocket> {
616 let intf_ip = &intf.ip();
619 match intf_ip {
620 IpAddr::V4(ip) => {
621 let addr = SocketAddrV4::new(Ipv4Addr::new(0, 0, 0, 0), MDNS_PORT);
622 let sock = new_socket(addr.into(), true)?;
623
624 sock.join_multicast_v4(&GROUP_ADDR_V4, ip)
626 .map_err(|e| e_fmt!("join multicast group on addr {}: {}", intf_ip, e))?;
627
628 sock.set_multicast_if_v4(ip)
630 .map_err(|e| e_fmt!("set multicast_if on addr {}: {}", ip, e))?;
631
632 sock.set_multicast_ttl_v4(255)
637 .map_err(|e| e_fmt!("set set_multicast_ttl_v4 on addr {}: {}", ip, e))?;
638
639 if !should_loop {
640 sock.set_multicast_loop_v4(false)
641 .map_err(|e| e_fmt!("failed to set multicast loop v4 for {ip}: {e}"))?;
642 }
643
644 let multicast_addr = SocketAddrV4::new(GROUP_ADDR_V4, MDNS_PORT).into();
646 let test_packets = DnsOutgoing::new(0).to_data_on_wire();
647 for packet in test_packets {
648 sock.send_to(&packet, &multicast_addr)
649 .map_err(|e| e_fmt!("send multicast packet on addr {}: {}", ip, e))?;
650 }
651 MyUdpSocket::new(sock)
652 .map_err(|e| e_fmt!("failed to create MySocket for interface {}: {e}", intf.name))
653 }
654 IpAddr::V6(ip) => {
655 let addr = SocketAddrV6::new(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 0), MDNS_PORT, 0, 0);
656 let sock = new_socket(addr.into(), true)?;
657
658 let if_index = intf.index.unwrap_or(0);
659
660 sock.join_multicast_v6(&GROUP_ADDR_V6, if_index)
662 .map_err(|e| e_fmt!("join multicast group on addr {}: {}", ip, e))?;
663
664 sock.set_multicast_if_v6(if_index)
666 .map_err(|e| e_fmt!("set multicast_if on addr {}: {}", ip, e))?;
667
668 MyUdpSocket::new(sock)
673 .map_err(|e| e_fmt!("failed to create MySocket for interface {}: {e}", intf.name))
674 }
675 }
676}
677
678fn new_socket(addr: SocketAddr, non_block: bool) -> Result<PktInfoUdpSocket> {
681 let domain = match addr {
682 SocketAddr::V4(_) => socket2::Domain::IPV4,
683 SocketAddr::V6(_) => socket2::Domain::IPV6,
684 };
685
686 let fd = PktInfoUdpSocket::new(domain).map_err(|e| e_fmt!("create socket failed: {}", e))?;
687
688 fd.set_reuse_address(true)
689 .map_err(|e| e_fmt!("set ReuseAddr failed: {}", e))?;
690 #[cfg(unix)] fd.set_reuse_port(true)
692 .map_err(|e| e_fmt!("set ReusePort failed: {}", e))?;
693
694 if non_block {
695 fd.set_nonblocking(true)
696 .map_err(|e| e_fmt!("set O_NONBLOCK: {}", e))?;
697 }
698
699 fd.bind(&addr.into())
700 .map_err(|e| e_fmt!("socket bind to {} failed: {}", &addr, e))?;
701
702 trace!("new socket bind to {}", &addr);
703 Ok(fd)
704}
705
/// A command scheduled to be executed again later by the daemon loop.
struct ReRun {
    /// Time at or after which the command is re-executed; compared against
    /// `current_time_millis()` in `Zeroconf::run`.
    next_time: u64,
    /// The command to execute again.
    command: Command,
}
712
/// Selector describing which network interfaces a setting applies to.
///
/// See `IfKind::matches` for the exact matching rules.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub enum IfKind {
    /// Matches every interface.
    All,

    /// Matches interfaces whose address is IPv4.
    IPv4,

    /// Matches interfaces whose address is IPv6.
    IPv6,

    /// Matches interfaces with exactly this name.
    Name(String),

    /// Matches interfaces with exactly this IP address.
    Addr(IpAddr),

    /// Matches loopback interfaces whose address is IPv4.
    LoopbackV4,

    /// Matches loopback interfaces whose address is IPv6.
    LoopbackV6,
}
743
744impl IfKind {
745 fn matches(&self, intf: &Interface) -> bool {
747 match self {
748 Self::All => true,
749 Self::IPv4 => intf.ip().is_ipv4(),
750 Self::IPv6 => intf.ip().is_ipv6(),
751 Self::Name(ifname) => ifname == &intf.name,
752 Self::Addr(addr) => addr == &intf.ip(),
753 Self::LoopbackV4 => intf.is_loopback() && intf.ip().is_ipv4(),
754 Self::LoopbackV6 => intf.is_loopback() && intf.ip().is_ipv6(),
755 }
756 }
757}
758
759impl From<&str> for IfKind {
762 fn from(val: &str) -> Self {
763 Self::Name(val.to_string())
764 }
765}
766
767impl From<&String> for IfKind {
768 fn from(val: &String) -> Self {
769 Self::Name(val.to_string())
770 }
771}
772
773impl From<IpAddr> for IfKind {
775 fn from(val: IpAddr) -> Self {
776 Self::Addr(val)
777 }
778}
779
/// A list of `IfKind` selectors, produced by `IntoIfKindVec::into_vec`.
pub struct IfKindVec {
    /// The selectors, in the order they were supplied.
    kinds: Vec<IfKind>,
}
784
/// Conversion into an `IfKindVec`.
///
/// Implemented in this file for any single value convertible into `IfKind`
/// and for a `Vec` of such values.
pub trait IntoIfKindVec {
    /// Consumes `self` and returns the equivalent list of selectors.
    fn into_vec(self) -> IfKindVec;
}
789
790impl<T: Into<IfKind>> IntoIfKindVec for T {
791 fn into_vec(self) -> IfKindVec {
792 let if_kind: IfKind = self.into();
793 IfKindVec {
794 kinds: vec![if_kind],
795 }
796 }
797}
798
799impl<T: Into<IfKind>> IntoIfKindVec for Vec<T> {
800 fn into_vec(self) -> IfKindVec {
801 let kinds: Vec<IfKind> = self.into_iter().map(|x| x.into()).collect();
802 IfKindVec { kinds }
803 }
804}
805
/// One interface selection rule: a selector plus whether matching
/// interfaces are selected or deselected.
///
/// Rules are applied in order; the last rule matching an interface decides
/// (see `selected_addrs` / `apply_intf_selections`).
struct IfSelection {
    /// Which interfaces this rule applies to.
    if_kind: IfKind,

    /// `true` to select (enable) matching interfaces, `false` to deselect.
    selected: bool,
}
814
/// State owned by the daemon thread.
struct Zeroconf {
    /// Known local interfaces, keyed by interface index.
    my_intfs: HashMap<u32, MyIntf>,

    /// The IPv4 mDNS socket.
    ipv4_sock: MyUdpSocket,

    /// The IPv6 mDNS socket.
    ipv6_sock: MyUdpSocket,

    /// Local registered services, keyed by a `String`.
    /// NOTE(review): presumably the lower-cased service full name — confirm.
    my_services: HashMap<String, ServiceInfo>,

    /// Cache of DNS records.
    cache: DnsCache,

    /// One `DnsRegistry` per interface, keyed by interface index.
    dns_registry_map: HashMap<u32, DnsRegistry>,

    /// Senders for `ServiceEvent`s of active browses, keyed by a `String`.
    /// NOTE(review): presumably the service type being browsed — confirm.
    service_queriers: HashMap<String, Sender<ServiceEvent>>,

    /// Active hostname resolvers keyed by hostname. The optional `u64` is a
    /// deadline compared against `current_time_millis()` in `run`.
    hostname_resolvers: HashMap<String, (Sender<HostnameResolutionEvent>, Option<u64>)>,

    /// Commands scheduled to be executed again at a later time.
    retransmissions: Vec<ReRun>,

    /// Daemon counters.
    counters: Metrics,

    /// The mio poller all sockets are registered with.
    poller: Poll,

    /// Channels of clients monitoring `DaemonEvent`s.
    monitors: Vec<Sender<DaemonEvent>>,

    /// Maximum service name length; set via `DaemonOption::ServiceNameLenMax`.
    service_name_len_max: u8,

    /// Interval between IP-change checks, in milliseconds.
    ip_check_interval: u64,

    /// Interface selection rules, applied in order.
    if_selections: Vec<IfSelection>,

    /// Loopback socket registered with the poller under
    /// `SIGNAL_SOCK_EVENT_KEY`; clients send it a datagram per command.
    signal_sock: MioUdpSocket,

    /// Min-heap (via `Reverse`) of timer deadlines in milliseconds; the
    /// earliest deadline bounds the poll timeout in `run`.
    timers: BinaryHeap<Reverse<u64>>,

    /// Current daemon status.
    status: DaemonStatus,

    /// NOTE(review): usage is not visible in this part of the file;
    /// presumably instances with a resolve in progress — confirm.
    pending_resolves: HashSet<String>,

    /// NOTE(review): usage is not visible in this part of the file;
    /// presumably instances already reported as resolved — confirm.
    resolved: HashSet<String>,

    /// Multicast loopback setting for the IPv4 socket; starts as `true`.
    multicast_loop_v4: bool,

    /// Multicast loopback setting for the IPv6 socket; starts as `true`.
    multicast_loop_v6: bool,

    /// Value of the `AcceptUnsolicited` option; starts as `false`.
    accept_unsolicited: bool,

    /// Test-only: names of interfaces to treat as absent in
    /// `check_ip_changes`.
    #[cfg(test)]
    test_down_interfaces: HashSet<String>,
}
891
892fn join_multicast_group(my_sock: &PktInfoUdpSocket, intf: &Interface) -> Result<()> {
894 let intf_ip = &intf.ip();
895 match intf_ip {
896 IpAddr::V4(ip) => {
897 debug!("join multicast group V4 on addr {}", ip);
899 my_sock
900 .join_multicast_v4(&GROUP_ADDR_V4, ip)
901 .map_err(|e| e_fmt!("PKT join multicast group on addr {}: {}", intf_ip, e))?;
902 }
903 IpAddr::V6(ip) => {
904 let if_index = intf.index.unwrap_or(0);
905 debug!(
907 "join multicast group V6 on addr {} with index {}",
908 ip, if_index
909 );
910 my_sock
911 .join_multicast_v6(&GROUP_ADDR_V6, if_index)
912 .map_err(|e| e_fmt!("PKT join multicast group on addr {}: {}", ip, e))?;
913 }
914 }
915 Ok(())
916}
917
918impl Zeroconf {
/// Builds the daemon state: creates the IPv4 and IPv6 mDNS sockets, joins
/// the multicast group on every interface returned by
/// `my_ip_interfaces(false)`, and initializes all options to their defaults.
///
/// # Panics
///
/// Panics if either socket cannot be created, configured (multicast
/// TTL / hop limit of 255) or wrapped into a `MyUdpSocket`. A failure to
/// join the multicast group on an individual interface is only logged.
fn new(signal_sock: MioUdpSocket, poller: Poll) -> Self {
    let my_ifaddrs = my_ip_interfaces(false);

    let mut my_intfs = HashMap::new();
    let mut dns_registry_map = HashMap::new();

    // One IPv4 socket, bound to the unspecified address on the mDNS port,
    // is shared by all IPv4 interfaces.
    let addr = SocketAddrV4::new(Ipv4Addr::new(0, 0, 0, 0), MDNS_PORT);
    let sock = new_socket(addr.into(), true).unwrap();

    sock.set_multicast_ttl_v4(255)
        .map_err(|e| e_fmt!("set set_multicast_ttl_v4 on addr: {}", e))
        .unwrap();

    let ipv4_sock = MyUdpSocket::new(sock).unwrap();

    // Likewise one IPv6 socket shared by all IPv6 interfaces.
    let addr = SocketAddrV6::new(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 0), MDNS_PORT, 0, 0);
    let sock = new_socket(addr.into(), true).unwrap();

    sock.set_multicast_hops_v6(255)
        .map_err(|e| e_fmt!("set set_multicast_hops_v6: {}", e))
        .unwrap();

    let ipv6_sock = MyUdpSocket::new(sock).unwrap();

    for intf in my_ifaddrs {
        // Pick the socket of the interface address's family.
        let sock = if intf.ip().is_ipv4() {
            &ipv4_sock
        } else {
            &ipv6_sock
        };

        // A failed join is logged, and the interface is still recorded below.
        if let Err(e) = join_multicast_group(&sock.pktinfo, &intf) {
            debug!(
                "config socket to join multicast: {}: {e}. Skipped.",
                &intf.ip()
            );
        }

        // Interfaces without an index are all filed under index 0.
        let if_index = intf.index.unwrap_or(0);

        dns_registry_map
            .entry(if_index)
            .or_insert_with(DnsRegistry::new);

        // Several addresses may share one interface index: extend the
        // existing entry or create a new one.
        my_intfs
            .entry(if_index)
            .and_modify(|v: &mut MyIntf| {
                v.addrs.insert(intf.addr.clone());
            })
            .or_insert(MyIntf {
                name: intf.name.clone(),
                index: if_index,
                addrs: HashSet::from([intf.addr]),
            });
    }

    let monitors = Vec::new();
    let service_name_len_max = SERVICE_NAME_LEN_MAX_DEFAULT;
    // Stored in milliseconds.
    let ip_check_interval = IP_CHECK_INTERVAL_IN_SECS_DEFAULT as u64 * 1000;

    let timers = BinaryHeap::new();

    // Default rules: loopback interfaces of both families are deselected.
    let if_selections = vec![
        IfSelection {
            if_kind: IfKind::LoopbackV4,
            selected: false,
        },
        IfSelection {
            if_kind: IfKind::LoopbackV6,
            selected: false,
        },
    ];

    let status = DaemonStatus::Running;

    Self {
        my_intfs,
        ipv4_sock,
        ipv6_sock,
        my_services: HashMap::new(),
        cache: DnsCache::new(),
        dns_registry_map,
        hostname_resolvers: HashMap::new(),
        service_queriers: HashMap::new(),
        retransmissions: Vec::new(),
        counters: HashMap::new(),
        poller,
        monitors,
        service_name_len_max,
        ip_check_interval,
        if_selections,
        signal_sock,
        timers,
        status,
        pending_resolves: HashSet::new(),
        resolved: HashSet::new(),
        multicast_loop_v4: true,
        multicast_loop_v6: true,
        accept_unsolicited: false,

        #[cfg(test)]
        test_down_interfaces: HashSet::new(),
    }
}
1042
1043 fn run(&mut self, receiver: Receiver<Command>) -> Option<Command> {
1052 if let Err(e) = self.poller.registry().register(
1054 &mut self.signal_sock,
1055 mio::Token(SIGNAL_SOCK_EVENT_KEY),
1056 mio::Interest::READABLE,
1057 ) {
1058 debug!("failed to add signal socket to the poller: {}", e);
1059 return None;
1060 }
1061
1062 if let Err(e) = self.poller.registry().register(
1063 &mut self.ipv4_sock,
1064 mio::Token(IPV4_SOCK_EVENT_KEY),
1065 mio::Interest::READABLE,
1066 ) {
1067 debug!("failed to register ipv4 socket: {}", e);
1068 return None;
1069 }
1070
1071 if let Err(e) = self.poller.registry().register(
1072 &mut self.ipv6_sock,
1073 mio::Token(IPV6_SOCK_EVENT_KEY),
1074 mio::Interest::READABLE,
1075 ) {
1076 debug!("failed to register ipv6 socket: {}", e);
1077 return None;
1078 }
1079
1080 let mut next_ip_check = if self.ip_check_interval > 0 {
1082 current_time_millis() + self.ip_check_interval
1083 } else {
1084 0
1085 };
1086
1087 if next_ip_check > 0 {
1088 self.add_timer(next_ip_check);
1089 }
1090
1091 let mut events = mio::Events::with_capacity(1024);
1094 loop {
1095 let now = current_time_millis();
1096
1097 let earliest_timer = self.peek_earliest_timer();
1098 let timeout = earliest_timer.map(|timer| {
1099 let millis = if timer > now { timer - now } else { 1 };
1101 Duration::from_millis(millis)
1102 });
1103
1104 events.clear();
1106 match self.poller.poll(&mut events, timeout) {
1107 Ok(_) => self.handle_poller_events(&events),
1108 Err(e) => debug!("failed to select from sockets: {}", e),
1109 }
1110
1111 let now = current_time_millis();
1112
1113 self.pop_timers_till(now);
1115
1116 for hostname in self
1118 .hostname_resolvers
1119 .clone()
1120 .into_iter()
1121 .filter(|(_, (_, timeout))| timeout.map(|t| now >= t).unwrap_or(false))
1122 .map(|(hostname, _)| hostname)
1123 {
1124 trace!("hostname resolver timeout for {}", &hostname);
1125 call_hostname_resolution_listener(
1126 &self.hostname_resolvers,
1127 &hostname,
1128 HostnameResolutionEvent::SearchTimeout(hostname.to_owned()),
1129 );
1130 call_hostname_resolution_listener(
1131 &self.hostname_resolvers,
1132 &hostname,
1133 HostnameResolutionEvent::SearchStopped(hostname.to_owned()),
1134 );
1135 self.hostname_resolvers.remove(&hostname);
1136 }
1137
1138 while let Ok(command) = receiver.try_recv() {
1140 if matches!(command, Command::Exit(_)) {
1141 self.status = DaemonStatus::Shutdown;
1142 return Some(command);
1143 }
1144 self.exec_command(command, false);
1145 }
1146
1147 let mut i = 0;
1149 while i < self.retransmissions.len() {
1150 if now >= self.retransmissions[i].next_time {
1151 let rerun = self.retransmissions.remove(i);
1152 self.exec_command(rerun.command, true);
1153 } else {
1154 i += 1;
1155 }
1156 }
1157
1158 self.refresh_active_services();
1160
1161 let mut query_count = 0;
1163 for (hostname, _sender) in self.hostname_resolvers.iter() {
1164 for (hostname, ip_addr) in
1165 self.cache.refresh_due_hostname_resolutions(hostname).iter()
1166 {
1167 self.send_query(hostname, ip_address_rr_type(&ip_addr.to_ip_addr()));
1168 query_count += 1;
1169 }
1170 }
1171
1172 self.increase_counter(Counter::CacheRefreshAddr, query_count);
1173
1174 let now = current_time_millis();
1176
1177 let expired_services = self.cache.evict_expired_services(now);
1179 if !expired_services.is_empty() {
1180 debug!(
1181 "run: send {} service removal to listeners",
1182 expired_services.len()
1183 );
1184 self.notify_service_removal(expired_services);
1185 }
1186
1187 let expired_addrs = self.cache.evict_expired_addr(now);
1189 for (hostname, addrs) in expired_addrs {
1190 call_hostname_resolution_listener(
1191 &self.hostname_resolvers,
1192 &hostname,
1193 HostnameResolutionEvent::AddressesRemoved(hostname.clone(), addrs),
1194 );
1195 let instances = self.cache.get_instances_on_host(&hostname);
1196 let instance_set: HashSet<String> = instances.into_iter().collect();
1197 self.resolve_updated_instances(&instance_set);
1198 }
1199
1200 self.probing_handler();
1202
1203 if now >= next_ip_check && next_ip_check > 0 {
1205 next_ip_check = now + self.ip_check_interval;
1206 self.add_timer(next_ip_check);
1207
1208 self.check_ip_changes();
1209 }
1210 }
1211 }
1212
1213 fn process_set_option(&mut self, daemon_opt: DaemonOption) {
1214 match daemon_opt {
1215 DaemonOption::ServiceNameLenMax(length) => self.service_name_len_max = length,
1216 DaemonOption::IpCheckInterval(interval) => self.ip_check_interval = interval,
1217 DaemonOption::EnableInterface(if_kind) => self.enable_interface(if_kind),
1218 DaemonOption::DisableInterface(if_kind) => self.disable_interface(if_kind),
1219 DaemonOption::MulticastLoopV4(on) => self.set_multicast_loop_v4(on),
1220 DaemonOption::MulticastLoopV6(on) => self.set_multicast_loop_v6(on),
1221 DaemonOption::AcceptUnsolicited(accept) => self.set_accept_unsolicited(accept),
1222 #[cfg(test)]
1223 DaemonOption::TestDownInterface(ifname) => {
1224 self.test_down_interfaces.insert(ifname);
1225 }
1226 #[cfg(test)]
1227 DaemonOption::TestUpInterface(ifname) => {
1228 self.test_down_interfaces.remove(&ifname);
1229 }
1230 }
1231 }
1232
1233 fn enable_interface(&mut self, kinds: Vec<IfKind>) {
1234 debug!("enable_interface: {:?}", kinds);
1235 for if_kind in kinds {
1236 self.if_selections.push(IfSelection {
1237 if_kind,
1238 selected: true,
1239 });
1240 }
1241
1242 self.apply_intf_selections(my_ip_interfaces(true));
1243 }
1244
1245 fn disable_interface(&mut self, kinds: Vec<IfKind>) {
1246 debug!("disable_interface: {:?}", kinds);
1247 for if_kind in kinds {
1248 self.if_selections.push(IfSelection {
1249 if_kind,
1250 selected: false,
1251 });
1252 }
1253
1254 self.apply_intf_selections(my_ip_interfaces(true));
1255 }
1256
1257 fn set_multicast_loop_v4(&mut self, on: bool) {
1258 self.multicast_loop_v4 = on;
1259 self.ipv4_sock
1260 .pktinfo
1261 .set_multicast_loop_v4(on)
1262 .map_err(|e| e_fmt!("failed to set multicast loop v4: {}", e))
1263 .unwrap();
1264 }
1265
1266 fn set_multicast_loop_v6(&mut self, on: bool) {
1267 self.multicast_loop_v6 = on;
1268 self.ipv6_sock
1269 .pktinfo
1270 .set_multicast_loop_v6(on)
1271 .map_err(|e| e_fmt!("failed to set multicast loop v6: {}", e))
1272 .unwrap();
1273 }
1274
/// Stores the new value of the `accept_unsolicited` option.
fn set_accept_unsolicited(&mut self, accept: bool) {
    self.accept_unsolicited = accept;
}
1278
1279 fn notify_monitors(&mut self, event: DaemonEvent) {
1280 self.monitors.retain(|sender| {
1282 if let Err(e) = sender.try_send(event.clone()) {
1283 debug!("notify_monitors: try_send: {}", &e);
1284 if matches!(e, TrySendError::Disconnected(_)) {
1285 return false; }
1287 }
1288 true
1289 });
1290 }
1291
1292 fn del_addr_in_my_services(&mut self, addr: &IpAddr) {
1294 for (_, service_info) in self.my_services.iter_mut() {
1295 if service_info.is_addr_auto() {
1296 service_info.remove_ipaddr(addr);
1297 }
1298 }
1299 }
1300
/// Schedules a timer at `next_time`.
///
/// Wrapping in `Reverse` turns the max-heap into a min-heap, so the earliest
/// deadline is always at the top.
fn add_timer(&mut self, next_time: u64) {
    self.timers.push(Reverse(next_time));
}
1304
1305 fn peek_earliest_timer(&self) -> Option<u64> {
1306 self.timers.peek().map(|Reverse(v)| *v)
1307 }
1308
1309 fn _pop_earliest_timer(&mut self) -> Option<u64> {
1310 self.timers.pop().map(|Reverse(v)| v)
1311 }
1312
1313 fn pop_timers_till(&mut self, now: u64) {
1315 while let Some(Reverse(v)) = self.timers.peek() {
1316 if *v > now {
1317 break;
1318 }
1319 self.timers.pop();
1320 }
1321 }
1322
1323 fn selected_addrs(&self, interfaces: Vec<Interface>) -> HashSet<IpAddr> {
1325 let intf_count = interfaces.len();
1326 let mut intf_selections = vec![true; intf_count];
1327
1328 for selection in self.if_selections.iter() {
1330 for i in 0..intf_count {
1332 if selection.if_kind.matches(&interfaces[i]) {
1333 intf_selections[i] = selection.selected;
1334 }
1335 }
1336 }
1337
1338 let mut selected_addrs = HashSet::new();
1339 for i in 0..intf_count {
1340 if intf_selections[i] {
1341 selected_addrs.insert(interfaces[i].addr.ip());
1342 }
1343 }
1344
1345 selected_addrs
1346 }
1347
1348 fn apply_intf_selections(&mut self, interfaces: Vec<Interface>) {
1353 let intf_count = interfaces.len();
1355 let mut intf_selections = vec![true; intf_count];
1356
1357 for selection in self.if_selections.iter() {
1359 for i in 0..intf_count {
1361 if selection.if_kind.matches(&interfaces[i]) {
1362 intf_selections[i] = selection.selected;
1363 }
1364 }
1365 }
1366
1367 for (idx, intf) in interfaces.into_iter().enumerate() {
1369 if intf_selections[idx] {
1370 self.add_interface(intf);
1372 } else {
1373 self.del_interface(&intf);
1375 }
1376 }
1377 }
1378
/// Handles the removal of a local IP address: drops it from the registered
/// services that track addresses automatically, then sends
/// `DaemonEvent::IpDel` to the monitors.
fn del_ip(&mut self, ip: IpAddr) {
    self.del_addr_in_my_services(&ip);
    self.notify_monitors(DaemonEvent::IpDel(ip));
}
1383
/// Compares the interfaces known to the daemon with the ones currently
/// reported by `my_ip_interfaces(true)` and reconciles the differences.
///
/// Addresses that disappeared are removed and announced via `del_ip`.
/// Interfaces left without any address (or no longer reported at all) are
/// dropped from `my_intfs`: the multicast group is left, their cache records
/// are removed and the affected services are reported as removed. Finally
/// the selection rules are re-applied to the current interface list.
fn check_ip_changes(&mut self) {
    let my_ifaddrs = my_ip_interfaces(true);

    // Test-only: hide interfaces that a test marked as "down".
    #[cfg(test)]
    let my_ifaddrs: Vec<_> = my_ifaddrs
        .into_iter()
        .filter(|intf| !self.test_down_interfaces.contains(&intf.name))
        .collect();

    // Group the current addresses by interface index (0 when unknown).
    let ifaddrs_map: HashMap<u32, Vec<&IfAddr>> =
        my_ifaddrs.iter().fold(HashMap::new(), |mut acc, intf| {
            let if_index = intf.index.unwrap_or(0);
            acc.entry(if_index).or_default().push(&intf.addr);
            acc
        });

    // Collected first and processed after the loop, because the loop holds
    // a mutable borrow of `self.my_intfs`.
    let mut deleted_intfs = Vec::new();
    let mut deleted_ips = Vec::new();

    for (if_index, my_intf) in self.my_intfs.iter_mut() {
        // Last removed address of each family on this interface.
        let mut last_ipv4 = None;
        let mut last_ipv6 = None;

        if let Some(current_addrs) = ifaddrs_map.get(if_index) {
            // Keep only the addresses that are still present.
            my_intf.addrs.retain(|addr| {
                if current_addrs.contains(&addr) {
                    true
                } else {
                    match addr.ip() {
                        IpAddr::V4(ipv4) => last_ipv4 = Some(ipv4),
                        IpAddr::V6(ipv6) => last_ipv6 = Some(ipv6),
                    }
                    deleted_ips.push(addr.ip());
                    false
                }
            });
            if my_intf.addrs.is_empty() {
                deleted_intfs.push((*if_index, last_ipv4, last_ipv6))
            }
        } else {
            // The whole interface is gone: all its addresses are deleted.
            debug!(
                "check_ip_changes: interface {} ({}) no longer exists, removing",
                my_intf.name, if_index
            );
            for addr in my_intf.addrs.iter() {
                match addr.ip() {
                    IpAddr::V4(ipv4) => last_ipv4 = Some(ipv4),
                    IpAddr::V6(ipv6) => last_ipv6 = Some(ipv6),
                }
                deleted_ips.push(addr.ip())
            }
            deleted_intfs.push((*if_index, last_ipv4, last_ipv6));
        }
    }

    if !deleted_ips.is_empty() || !deleted_intfs.is_empty() {
        debug!(
            "check_ip_changes: {} deleted ips {} deleted intfs",
            deleted_ips.len(),
            deleted_intfs.len()
        );
    }

    for ip in deleted_ips {
        self.del_ip(ip);
    }

    for (if_index, last_ipv4, last_ipv6) in deleted_intfs {
        let Some(my_intf) = self.my_intfs.remove(&if_index) else {
            continue;
        };

        // Leaving the group is best-effort: failures are only logged.
        if let Some(ipv4) = last_ipv4 {
            debug!("leave multicast for {ipv4}");
            if let Err(e) = self
                .ipv4_sock
                .pktinfo
                .leave_multicast_v4(&GROUP_ADDR_V4, &ipv4)
            {
                debug!("leave multicast group for addr {ipv4}: {e}");
            }
        }

        if let Some(ipv6) = last_ipv6 {
            debug!("leave multicast for {ipv6}");
            if let Err(e) = self
                .ipv6_sock
                .pktinfo
                .leave_multicast_v6(&GROUP_ADDR_V6, my_intf.index)
            {
                debug!("leave multicast group for IPv6: {ipv6}: {e}");
            }
        }

        // Drop the cache records of this interface and report the
        // instances that were removed as a result.
        let intf_id = InterfaceId {
            name: my_intf.name.to_string(),
            index: my_intf.index,
        };
        let removed_instances = self.cache.remove_records_on_intf(intf_id);
        self.notify_service_removal(removed_instances);
    }

    // Adds new interfaces / addresses and drops deselected ones.
    self.apply_intf_selections(my_ifaddrs);
}
1493
/// Removes the address of `intf` from the daemon's view of that interface.
///
/// If the address was known, the multicast group of its family is left when
/// the interface has no other address of that family, the interface entry
/// (with its DNS registry and cached addresses) is dropped when no address
/// remains, monitors receive `DaemonEvent::IpDel`, and the address is
/// removed from the registered services. Unknown interfaces or addresses
/// are ignored.
fn del_interface(&mut self, intf: &Interface) {
    let if_index = intf.index.unwrap_or(0);
    trace!(
        "del_interface: {} ({if_index}) addr {}",
        intf.name,
        intf.ip()
    );

    let Some(my_intf) = self.my_intfs.get_mut(&if_index) else {
        debug!("del_interface: interface {} not found", intf.name);
        return;
    };

    // Recorded so the notifications can run after the borrow of `my_intf`
    // has ended.
    let mut ip_removed = false;

    if my_intf.addrs.remove(&intf.addr) {
        ip_removed = true;

        match intf.addr.ip() {
            IpAddr::V4(ipv4) => {
                // Leave the group only if no other IPv4 address remains.
                if my_intf.next_ifaddr_v4().is_none() {
                    if let Err(e) = self
                        .ipv4_sock
                        .pktinfo
                        .leave_multicast_v4(&GROUP_ADDR_V4, &ipv4)
                    {
                        debug!("leave multicast group for addr {ipv4}: {e}");
                    }
                }
            }

            IpAddr::V6(ipv6) => {
                // Leave the group only if no other IPv6 address remains.
                if my_intf.next_ifaddr_v6().is_none() {
                    if let Err(e) = self
                        .ipv6_sock
                        .pktinfo
                        .leave_multicast_v6(&GROUP_ADDR_V6, if_index)
                    {
                        debug!("leave multicast group for addr {ipv6}: {e}");
                    }
                }
            }
        }

        // No address left: forget the interface entirely.
        if my_intf.addrs.is_empty() {
            debug!("del_interface: removing interface {}", intf.name);
            self.my_intfs.remove(&if_index);
            self.dns_registry_map.remove(&if_index);
            self.cache.remove_addrs_on_disabled_intf(if_index);
        }
    }

    if ip_removed {
        self.notify_monitors(DaemonEvent::IpDel(intf.ip()));
        self.del_addr_in_my_services(&intf.ip());
    }
}
1554
1555 fn add_interface(&mut self, intf: Interface) {
1556 let sock = if intf.ip().is_ipv4() {
1557 &self.ipv4_sock
1558 } else {
1559 &self.ipv6_sock
1560 };
1561
1562 let if_index = intf.index.unwrap_or(0);
1563 let mut new_addr = false;
1564
1565 match self.my_intfs.entry(if_index) {
1566 Entry::Occupied(mut entry) => {
1567 let my_intf = entry.get_mut();
1569 if !my_intf.addrs.contains(&intf.addr) {
1570 if let Err(e) = join_multicast_group(&sock.pktinfo, &intf) {
1571 debug!("add_interface: socket_config {}: {e}", &intf.name);
1572 }
1573 my_intf.addrs.insert(intf.addr.clone());
1574 new_addr = true;
1575 }
1576 }
1577 Entry::Vacant(entry) => {
1578 if let Err(e) = join_multicast_group(&sock.pktinfo, &intf) {
1579 debug!("add_interface: socket_config {}: {e}. Skipped.", &intf.name);
1580 return;
1581 }
1582
1583 new_addr = true;
1584 let new_intf = MyIntf {
1585 name: intf.name.clone(),
1586 index: if_index,
1587 addrs: HashSet::from([intf.addr.clone()]),
1588 };
1589 entry.insert(new_intf);
1590 }
1591 }
1592
1593 if !new_addr {
1594 trace!("add_interface: interface {} already exists", &intf.name);
1595 return;
1596 }
1597
1598 debug!("add new interface {}: {}", intf.name, intf.ip());
1599
1600 let Some(my_intf) = self.my_intfs.get(&if_index) else {
1601 debug!("add_interface: cannot find if_index {if_index}");
1602 return;
1603 };
1604
1605 let dns_registry = match self.dns_registry_map.get_mut(&if_index) {
1606 Some(registry) => registry,
1607 None => self
1608 .dns_registry_map
1609 .entry(if_index)
1610 .or_insert_with(DnsRegistry::new),
1611 };
1612
1613 for (_, service_info) in self.my_services.iter_mut() {
1614 if service_info.is_addr_auto() {
1615 let new_ip = intf.ip();
1616 service_info.insert_ipaddr(new_ip);
1617
1618 if announce_service_on_intf(dns_registry, service_info, my_intf, &sock.pktinfo) {
1619 debug!(
1620 "Announce service {} on {}",
1621 service_info.get_fullname(),
1622 intf.ip()
1623 );
1624 service_info.set_status(if_index, ServiceStatus::Announced);
1625 } else {
1626 for timer in dns_registry.new_timers.drain(..) {
1627 self.timers.push(Reverse(timer));
1628 }
1629 service_info.set_status(if_index, ServiceStatus::Probing);
1630 }
1631 }
1632 }
1633
1634 let mut browse_reruns = Vec::new();
1636 let mut i = 0;
1637 while i < self.retransmissions.len() {
1638 if matches!(self.retransmissions[i].command, Command::Browse(..)) {
1639 browse_reruns.push(self.retransmissions.remove(i));
1640 } else {
1641 i += 1;
1642 }
1643 }
1644
1645 for rerun in browse_reruns {
1646 self.exec_command(rerun.command, true);
1647 }
1648
1649 self.notify_monitors(DaemonEvent::IpAdd(intf.ip()));
1651 }
1652
1653 fn register_service(&mut self, mut info: ServiceInfo) {
1662 if let Err(e) = check_service_name_length(info.get_type(), self.service_name_len_max) {
1664 debug!("check_service_name_length: {}", &e);
1665 self.notify_monitors(DaemonEvent::Error(e));
1666 return;
1667 }
1668
1669 if info.is_addr_auto() {
1670 let selected_addrs = self.selected_addrs(my_ip_interfaces(true));
1671 for addr in selected_addrs {
1672 info.insert_ipaddr(addr);
1673 }
1674 }
1675
1676 debug!("register service {:?}", &info);
1677
1678 let outgoing_addrs = self.send_unsolicited_response(&mut info);
1679 if !outgoing_addrs.is_empty() {
1680 self.notify_monitors(DaemonEvent::Announce(
1681 info.get_fullname().to_string(),
1682 format!("{:?}", &outgoing_addrs),
1683 ));
1684 }
1685
1686 let service_fullname = info.get_fullname().to_lowercase();
1689 self.my_services.insert(service_fullname, info);
1690 }
1691
1692 fn send_unsolicited_response(&mut self, info: &mut ServiceInfo) -> Vec<IpAddr> {
1695 let mut outgoing_addrs = Vec::new();
1696 let mut outgoing_intfs = HashSet::new();
1697
1698 for (if_index, intf) in self.my_intfs.iter() {
1699 let dns_registry = match self.dns_registry_map.get_mut(if_index) {
1700 Some(registry) => registry,
1701 None => self
1702 .dns_registry_map
1703 .entry(*if_index)
1704 .or_insert_with(DnsRegistry::new),
1705 };
1706
1707 let mut announced = false;
1708
1709 if announce_service_on_intf(dns_registry, info, intf, &self.ipv4_sock.pktinfo) {
1711 for addr in intf.addrs.iter().filter(|a| a.ip().is_ipv4()) {
1712 outgoing_addrs.push(addr.ip());
1713 }
1714 outgoing_intfs.insert(intf.index);
1715
1716 debug!(
1717 "Announce service IPv4 {} on {}",
1718 info.get_fullname(),
1719 intf.name
1720 );
1721 announced = true;
1722 }
1723
1724 if announce_service_on_intf(dns_registry, info, intf, &self.ipv6_sock.pktinfo) {
1725 for addr in intf.addrs.iter().filter(|a| a.ip().is_ipv6()) {
1726 outgoing_addrs.push(addr.ip());
1727 }
1728 outgoing_intfs.insert(intf.index);
1729
1730 debug!(
1731 "Announce service IPv6 {} on {}",
1732 info.get_fullname(),
1733 intf.name
1734 );
1735 announced = true;
1736 }
1737
1738 if announced {
1739 info.set_status(intf.index, ServiceStatus::Announced);
1740 } else {
1741 for timer in dns_registry.new_timers.drain(..) {
1742 self.timers.push(Reverse(timer));
1743 }
1744 info.set_status(*if_index, ServiceStatus::Probing);
1745 }
1746 }
1747
1748 let next_time = current_time_millis() + 1000;
1752 for if_index in outgoing_intfs {
1753 self.add_retransmission(
1754 next_time,
1755 Command::RegisterResend(info.get_fullname().to_string(), if_index),
1756 );
1757 }
1758
1759 outgoing_addrs
1760 }
1761
1762 fn probing_handler(&mut self) {
1764 let now = current_time_millis();
1765
1766 for (if_index, intf) in self.my_intfs.iter() {
1767 let Some(dns_registry) = self.dns_registry_map.get_mut(if_index) else {
1768 continue;
1769 };
1770
1771 let (out, expired_probes) = check_probing(dns_registry, &mut self.timers, now);
1772
1773 if !out.questions().is_empty() {
1775 trace!("sending out probing of questions: {:?}", out.questions());
1776 send_dns_outgoing(&out, intf, &self.ipv4_sock.pktinfo);
1777 send_dns_outgoing(&out, intf, &self.ipv6_sock.pktinfo);
1778 }
1779
1780 let waiting_services =
1782 handle_expired_probes(expired_probes, &intf.name, dns_registry, &mut self.monitors);
1783
1784 for service_name in waiting_services {
1785 if let Some(info) = self.my_services.get_mut(&service_name.to_lowercase()) {
1787 if info.get_status(*if_index) == ServiceStatus::Announced {
1788 debug!("service {} already announced", info.get_fullname());
1789 continue;
1790 }
1791
1792 let announced_v4 =
1793 announce_service_on_intf(dns_registry, info, intf, &self.ipv4_sock.pktinfo);
1794 let announced_v6 =
1795 announce_service_on_intf(dns_registry, info, intf, &self.ipv6_sock.pktinfo);
1796
1797 if announced_v4 || announced_v6 {
1798 let next_time = now + 1000;
1799 let command =
1800 Command::RegisterResend(info.get_fullname().to_string(), *if_index);
1801 self.retransmissions.push(ReRun { next_time, command });
1802 self.timers.push(Reverse(next_time));
1803
1804 let fullname = match dns_registry.name_changes.get(&service_name) {
1805 Some(new_name) => new_name.to_string(),
1806 None => service_name.to_string(),
1807 };
1808
1809 let mut hostname = info.get_hostname();
1810 if let Some(new_name) = dns_registry.name_changes.get(hostname) {
1811 hostname = new_name;
1812 }
1813
1814 debug!("wake up: announce service {} on {}", fullname, intf.name);
1815 notify_monitors(
1816 &mut self.monitors,
1817 DaemonEvent::Announce(fullname, format!("{}:{}", hostname, &intf.name)),
1818 );
1819
1820 info.set_status(*if_index, ServiceStatus::Announced);
1821 }
1822 }
1823 }
1824 }
1825 }
1826
1827 fn unregister_service(
1828 &self,
1829 info: &ServiceInfo,
1830 intf: &MyIntf,
1831 sock: &PktInfoUdpSocket,
1832 ) -> Vec<u8> {
1833 let is_ipv4 = sock.domain() == Domain::IPV4;
1834
1835 let mut out = DnsOutgoing::new(FLAGS_QR_RESPONSE | FLAGS_AA);
1836 out.add_answer_at_time(
1837 DnsPointer::new(
1838 info.get_type(),
1839 RRType::PTR,
1840 CLASS_IN,
1841 0,
1842 info.get_fullname().to_string(),
1843 ),
1844 0,
1845 );
1846
1847 if let Some(sub) = info.get_subtype() {
1848 trace!("Adding subdomain {}", sub);
1849 out.add_answer_at_time(
1850 DnsPointer::new(
1851 sub,
1852 RRType::PTR,
1853 CLASS_IN,
1854 0,
1855 info.get_fullname().to_string(),
1856 ),
1857 0,
1858 );
1859 }
1860
1861 out.add_answer_at_time(
1862 DnsSrv::new(
1863 info.get_fullname(),
1864 CLASS_IN | CLASS_CACHE_FLUSH,
1865 0,
1866 info.get_priority(),
1867 info.get_weight(),
1868 info.get_port(),
1869 info.get_hostname().to_string(),
1870 ),
1871 0,
1872 );
1873 out.add_answer_at_time(
1874 DnsTxt::new(
1875 info.get_fullname(),
1876 CLASS_IN | CLASS_CACHE_FLUSH,
1877 0,
1878 info.generate_txt(),
1879 ),
1880 0,
1881 );
1882
1883 let if_addrs = if is_ipv4 {
1884 info.get_addrs_on_my_intf_v4(intf)
1885 } else {
1886 info.get_addrs_on_my_intf_v6(intf)
1887 };
1888
1889 if if_addrs.is_empty() {
1890 return vec![];
1891 }
1892
1893 for address in if_addrs {
1894 out.add_answer_at_time(
1895 DnsAddress::new(
1896 info.get_hostname(),
1897 ip_address_rr_type(&address),
1898 CLASS_IN | CLASS_CACHE_FLUSH,
1899 0,
1900 address,
1901 intf.into(),
1902 ),
1903 0,
1904 );
1905 }
1906
1907 send_dns_outgoing(&out, intf, sock)
1909 .into_iter()
1910 .next()
1911 .unwrap_or_default()
1912 }
1913
1914 fn add_hostname_resolver(
1918 &mut self,
1919 hostname: String,
1920 listener: Sender<HostnameResolutionEvent>,
1921 timeout: Option<u64>,
1922 ) {
1923 let real_timeout = timeout.map(|t| current_time_millis() + t);
1924 self.hostname_resolvers
1925 .insert(hostname.to_lowercase(), (listener, real_timeout));
1926 if let Some(t) = real_timeout {
1927 self.add_timer(t);
1928 }
1929 }
1930
1931 fn send_query(&self, name: &str, qtype: RRType) {
1933 self.send_query_vec(&[(name, qtype)]);
1934 }
1935
1936 fn send_query_vec(&self, questions: &[(&str, RRType)]) {
1938 trace!("Sending query questions: {:?}", questions);
1939 let mut out = DnsOutgoing::new(FLAGS_QR_QUERY);
1940 let now = current_time_millis();
1941
1942 for (name, qtype) in questions {
1943 out.add_question(name, *qtype);
1944
1945 for record in self.cache.get_known_answers(name, *qtype, now) {
1946 trace!("add known answer: {:?}", record.record);
1954 let mut new_record = record.record.clone();
1955 new_record.get_record_mut().update_ttl(now);
1956 out.add_answer_box(new_record);
1957 }
1958 }
1959
1960 for (_, intf) in self.my_intfs.iter() {
1961 send_dns_outgoing(&out, intf, &self.ipv4_sock.pktinfo);
1962 send_dns_outgoing(&out, intf, &self.ipv6_sock.pktinfo);
1963 }
1964 }
1965
1966 fn handle_read(&mut self, event_key: usize) -> bool {
1971 let sock = match event_key {
1972 IPV4_SOCK_EVENT_KEY => &mut self.ipv4_sock,
1973 IPV6_SOCK_EVENT_KEY => &mut self.ipv6_sock,
1974 _ => {
1975 debug!("handle_read: unknown token {}", event_key);
1976 return false;
1977 }
1978 };
1979 let mut buf = vec![0u8; MAX_MSG_ABSOLUTE];
1980
1981 let (sz, pktinfo) = match sock.pktinfo.recv(&mut buf) {
1988 Ok(sz) => sz,
1989 Err(e) => {
1990 if e.kind() != std::io::ErrorKind::WouldBlock {
1991 debug!("listening socket read failed: {}", e);
1992 }
1993 return false;
1994 }
1995 };
1996
1997 let pkt_if_index = pktinfo.if_index as u32;
1999 let Some(my_intf) = self.my_intfs.get(&pkt_if_index) else {
2000 debug!(
2001 "handle_read: no interface found for pktinfo if_index: {}",
2002 pktinfo.if_index
2003 );
2004 return true; };
2006
2007 buf.truncate(sz); match DnsIncoming::new(buf, my_intf.into()) {
2010 Ok(msg) => {
2011 if msg.is_query() {
2012 self.handle_query(msg, pkt_if_index, event_key == IPV4_SOCK_EVENT_KEY);
2013 } else if msg.is_response() {
2014 self.handle_response(msg, pkt_if_index);
2015 } else {
2016 debug!("Invalid message: not query and not response");
2017 }
2018 }
2019 Err(e) => debug!("Invalid incoming DNS message: {}", e),
2020 }
2021
2022 true
2023 }
2024
2025 fn query_unresolved(&mut self, instance: &str) -> bool {
2027 if !valid_instance_name(instance) {
2028 trace!("instance name {} not valid", instance);
2029 return false;
2030 }
2031
2032 if let Some(records) = self.cache.get_srv(instance) {
2033 for record in records {
2034 if let Some(srv) = record.record.any().downcast_ref::<DnsSrv>() {
2035 if self.cache.get_addr(srv.host()).is_none() {
2036 self.send_query_vec(&[(srv.host(), RRType::A), (srv.host(), RRType::AAAA)]);
2037 return true;
2038 }
2039 }
2040 }
2041 } else {
2042 self.send_query(instance, RRType::ANY);
2043 return true;
2044 }
2045
2046 false
2047 }
2048
2049 fn query_cache_for_service(
2052 &mut self,
2053 ty_domain: &str,
2054 sender: &Sender<ServiceEvent>,
2055 now: u64,
2056 ) {
2057 let mut resolved: HashSet<String> = HashSet::new();
2058 let mut unresolved: HashSet<String> = HashSet::new();
2059
2060 if let Some(records) = self.cache.get_ptr(ty_domain) {
2061 for record in records.iter().filter(|r| !r.record.expires_soon(now)) {
2062 if let Some(ptr) = record.record.any().downcast_ref::<DnsPointer>() {
2063 let mut new_event = None;
2064 match self.resolve_service_from_cache(ty_domain, ptr.alias()) {
2065 Ok(resolved_service) => {
2066 if resolved_service.is_valid() {
2067 debug!("Resolved service from cache: {}", ptr.alias());
2068 new_event =
2069 Some(ServiceEvent::ServiceResolved(Box::new(resolved_service)));
2070 } else {
2071 debug!("Resolved service is not valid: {}", ptr.alias());
2072 }
2073 }
2074 Err(err) => {
2075 debug!("Error while resolving service from cache: {}", err);
2076 continue;
2077 }
2078 }
2079
2080 match sender.send(ServiceEvent::ServiceFound(
2081 ty_domain.to_string(),
2082 ptr.alias().to_string(),
2083 )) {
2084 Ok(()) => debug!("sent service found {}", ptr.alias()),
2085 Err(e) => {
2086 debug!("failed to send service found: {}", e);
2087 continue;
2088 }
2089 }
2090
2091 if let Some(event) = new_event {
2092 resolved.insert(ptr.alias().to_string());
2093 match sender.send(event) {
2094 Ok(()) => debug!("sent service resolved: {}", ptr.alias()),
2095 Err(e) => debug!("failed to send service resolved: {}", e),
2096 }
2097 } else {
2098 unresolved.insert(ptr.alias().to_string());
2099 }
2100 }
2101 }
2102 }
2103
2104 for instance in resolved.drain() {
2105 self.pending_resolves.remove(&instance);
2106 self.resolved.insert(instance);
2107 }
2108
2109 for instance in unresolved.drain() {
2110 self.add_pending_resolve(instance);
2111 }
2112 }
2113
2114 fn query_cache_for_hostname(
2117 &mut self,
2118 hostname: &str,
2119 sender: Sender<HostnameResolutionEvent>,
2120 ) {
2121 let addresses_map = self.cache.get_addresses_for_host(hostname);
2122 for (name, addresses) in addresses_map {
2123 match sender.send(HostnameResolutionEvent::AddressesFound(name, addresses)) {
2124 Ok(()) => trace!("sent hostname addresses found"),
2125 Err(e) => debug!("failed to send hostname addresses found: {}", e),
2126 }
2127 }
2128 }
2129
2130 fn add_pending_resolve(&mut self, instance: String) {
2131 if !self.pending_resolves.contains(&instance) {
2132 let next_time = current_time_millis() + RESOLVE_WAIT_IN_MILLIS;
2133 self.add_retransmission(next_time, Command::Resolve(instance.clone(), 1));
2134 self.pending_resolves.insert(instance);
2135 }
2136 }
2137
2138 fn resolve_service_from_cache(
2140 &self,
2141 ty_domain: &str,
2142 fullname: &str,
2143 ) -> Result<ResolvedService> {
2144 let now = current_time_millis();
2145 let mut resolved_service = ResolvedService {
2146 ty_domain: ty_domain.to_string(),
2147 sub_ty_domain: None,
2148 fullname: fullname.to_string(),
2149 host: String::new(),
2150 port: 0,
2151 addresses: HashSet::new(),
2152 txt_properties: TxtProperties::new(),
2153 };
2154
2155 if let Some(subtype) = self.cache.get_subtype(fullname) {
2157 trace!(
2158 "ty_domain: {} found subtype {} for instance: {}",
2159 ty_domain,
2160 subtype,
2161 fullname
2162 );
2163 if resolved_service.sub_ty_domain.is_none() {
2164 resolved_service.sub_ty_domain = Some(subtype.to_string());
2165 }
2166 }
2167
2168 if let Some(records) = self.cache.get_srv(fullname) {
2170 if let Some(answer) = records.iter().find(|r| !r.record.expires_soon(now)) {
2171 if let Some(dns_srv) = answer.record.any().downcast_ref::<DnsSrv>() {
2172 resolved_service.host = dns_srv.host().to_string();
2173 resolved_service.port = dns_srv.port();
2174 }
2175 }
2176 }
2177
2178 if let Some(records) = self.cache.get_txt(fullname) {
2180 if let Some(record) = records.iter().find(|r| !r.record.expires_soon(now)) {
2181 if let Some(dns_txt) = record.record.any().downcast_ref::<DnsTxt>() {
2182 resolved_service.txt_properties = dns_txt.text().into();
2183 }
2184 }
2185 }
2186
2187 if let Some(records) = self.cache.get_addr(&resolved_service.host) {
2189 for answer in records.iter() {
2190 if let Some(dns_a) = answer.record.any().downcast_ref::<DnsAddress>() {
2191 if dns_a.expires_soon(now) {
2192 trace!(
2193 "Addr expired or expires soon: {}",
2194 dns_a.address().to_ip_addr()
2195 );
2196 } else {
2197 resolved_service.addresses.insert(dns_a.address());
2198 }
2199 }
2200 }
2201 }
2202
2203 Ok(resolved_service)
2204 }
2205
2206 fn handle_poller_events(&mut self, events: &mio::Events) {
2207 for ev in events.iter() {
2208 trace!("event received with key {:?}", ev.token());
2209 if ev.token().0 == SIGNAL_SOCK_EVENT_KEY {
2210 self.signal_sock_drain();
2212
2213 if let Err(e) = self.poller.registry().reregister(
2214 &mut self.signal_sock,
2215 ev.token(),
2216 mio::Interest::READABLE,
2217 ) {
2218 debug!("failed to modify poller for signal socket: {}", e);
2219 }
2220 continue; }
2222
2223 while self.handle_read(ev.token().0) {}
2225
2226 if ev.token().0 == IPV4_SOCK_EVENT_KEY {
2228 if let Err(e) = self.poller.registry().reregister(
2230 &mut self.ipv4_sock,
2231 ev.token(),
2232 mio::Interest::READABLE,
2233 ) {
2234 debug!("modify poller for IPv4 socket: {}", e);
2235 }
2236 } else if ev.token().0 == IPV6_SOCK_EVENT_KEY {
2237 if let Err(e) = self.poller.registry().reregister(
2239 &mut self.ipv6_sock,
2240 ev.token(),
2241 mio::Interest::READABLE,
2242 ) {
2243 debug!("modify poller for IPv6 socket: {}", e);
2244 }
2245 }
2246 }
2247 }
2248
/// Processes an incoming response message received on interface `if_index`.
///
/// Steps, in order:
/// 1. Records that are already expired are dropped from the message and
///    removed from the cache; a removed PTR record triggers `ServiceRemoved`.
/// 2. The message is checked for conflicts with our probes.
/// 3. It is decided whether the message is "for us" (see below).
/// 4. Every record is added to / updated in the cache, timers are scheduled,
///    and listeners are told about new services and host addresses.
/// 5. Instances touched by the changes are re-resolved.
fn handle_response(&mut self, mut msg: DnsIncoming, if_index: u32) {
    let now = current_time_millis();

    // Keeps records that are not expired. Expired ones are also removed from
    // the cache, with a `ServiceRemoved` event for PTR records.
    let mut record_predicate = |record: &DnsRecordBox| {
        if !record.get_record().is_expired(now) {
            return true;
        }

        debug!("record is expired, removing it from cache.");
        if self.cache.remove(record) {
            if let Some(dns_ptr) = record.any().downcast_ref::<DnsPointer>() {
                call_service_listener(
                    &self.service_queriers,
                    dns_ptr.get_name(),
                    ServiceEvent::ServiceRemoved(
                        dns_ptr.get_name().to_string(),
                        dns_ptr.alias().to_string(),
                    ),
                );
            }
        }
        false
    };
    msg.answers_mut().retain(&mut record_predicate);
    msg.authorities_mut().retain(&mut record_predicate);
    msg.additionals_mut().retain(&mut record_predicate);

    self.conflict_handler(&msg, if_index);

    // Assume the message is for us. A PTR answer for a type nobody is
    // browsing clears the flag; a PTR answer for a browsed type or an address
    // answer for a hostname being resolved sets it and stops the scan.
    let mut is_for_us = true;
    for answer in msg.answers() {
        if answer.get_type() == RRType::PTR {
            if self.service_queriers.contains_key(answer.get_name()) {
                is_for_us = true;
                break;
            } else {
                is_for_us = false;
            }
        } else if answer.get_type() == RRType::A || answer.get_type() == RRType::AAAA {
            // Hostname resolvers are keyed by lowercased hostname.
            let answer_lowercase = answer.get_name().to_lowercase();
            if self.hostname_resolvers.contains_key(&answer_lowercase) {
                is_for_us = true;
                break;
            }
        }
    }

    // With `accept_unsolicited` set, every message is treated as for us.
    if self.accept_unsolicited {
        is_for_us = true;
    }

    // A cache change: the record type and the instance or host name affected.
    struct InstanceChange {
        ty: RRType,
        name: String,
    }

    let mut changes = Vec::new();
    let mut timers = Vec::new();
    let Some(my_intf) = self.my_intfs.get(&if_index) else {
        return;
    };
    for record in msg.all_records() {
        match self
            .cache
            .add_or_update(my_intf, record, &mut timers, is_for_us)
        {
            // Second tuple element `true`: this arm records a change.
            Some((dns_record, true)) => {
                timers.push(dns_record.record.get_record().get_expire_time());
                timers.push(dns_record.record.get_record().get_refresh_time());

                let ty = dns_record.record.get_type();
                let name = dns_record.record.get_name();

                // PTR records are only announced to listeners when TTL > 1.
                if ty == RRType::PTR && dns_record.record.get_record().get_ttl() > 1 {
                    if self.service_queriers.contains_key(name) {
                        timers.push(dns_record.record.get_record().get_refresh_time());
                    }

                    if let Some(dns_ptr) = dns_record.record.any().downcast_ref::<DnsPointer>()
                    {
                        debug!("calling listener with service found: {name}");
                        call_service_listener(
                            &self.service_queriers,
                            name,
                            ServiceEvent::ServiceFound(
                                name.to_string(),
                                dns_ptr.alias().to_string(),
                            ),
                        );
                        // For PTR the changed instance is the alias, not the type name.
                        changes.push(InstanceChange {
                            ty,
                            name: dns_ptr.alias().to_string(),
                        });
                    }
                } else {
                    changes.push(InstanceChange {
                        ty,
                        name: name.to_string(),
                    });
                }
            }
            // Second tuple element `false`: only timers are scheduled.
            Some((dns_record, false)) => {
                timers.push(dns_record.record.get_record().get_expire_time());
                timers.push(dns_record.record.get_record().get_refresh_time());
            }
            _ => {}
        }
    }

    for t in timers {
        self.add_timer(t);
    }

    // Tell hostname resolvers about hosts whose address records changed.
    for change in changes
        .iter()
        .filter(|change| change.ty == RRType::A || change.ty == RRType::AAAA)
    {
        let addr_map = self.cache.get_addresses_for_host(&change.name);
        for (name, addresses) in addr_map {
            call_hostname_resolution_listener(
                &self.hostname_resolvers,
                &change.name,
                HostnameResolutionEvent::AddressesFound(name, addresses),
            )
        }
    }

    // Collect the instances affected by the changes: directly for
    // PTR/SRV/TXT, via the host name for address records.
    let mut updated_instances = HashSet::new();
    for update in changes {
        match update.ty {
            RRType::PTR | RRType::SRV | RRType::TXT => {
                updated_instances.insert(update.name);
            }
            RRType::A | RRType::AAAA => {
                let instances = self.cache.get_instances_on_host(&update.name);
                updated_instances.extend(instances);
            }
            _ => {}
        }
    }

    self.resolve_updated_instances(&updated_instances);
}
2417
/// Checks the answers of `msg` against the probes of interface `if_index`
/// and renames our records that conflict with a peer's answer.
///
/// A probe record conflicts with an answer when both have the same name,
/// type and class but different rdata. Conflicting records are removed from
/// their probe, renamed, and inserted into a probe under the new name, which
/// inherits the waiting services of the old probe. Does nothing when the
/// interface or its DNS registry is unknown.
fn conflict_handler(&mut self, msg: &DnsIncoming, if_index: u32) {
    let Some(my_intf) = self.my_intfs.get(&if_index) else {
        debug!("handle_response: no intf found for index {if_index}");
        return;
    };

    let Some(dns_registry) = self.dns_registry_map.get_mut(&if_index) else {
        return;
    };

    for answer in msg.answers().iter() {
        // Renamed copies of the records that conflict with this answer.
        let mut new_records = Vec::new();

        // Only answers whose name is currently being probed matter.
        let name = answer.get_name();
        let Some(probe) = dns_registry.probing.get_mut(name) else {
            continue;
        };

        if answer.get_type() == RRType::A || answer.get_type() == RRType::AAAA {
            // Ignore address answers tagged with a different interface index.
            if let Some(answer_addr) = answer.any().downcast_ref::<DnsAddress>() {
                if answer_addr.interface_id.index != if_index {
                    debug!(
                        "conflict handler: answer addr {:?} not in the subnet of intf {}",
                        answer_addr, my_intf.name
                    );
                    continue;
                }
            }

            // An address answer identical to any of our probe records is not a
            // conflict, even if other records under the same name differ.
            let any_match = probe.records.iter().any(|r| {
                r.get_type() == answer.get_type()
                    && r.get_class() == answer.get_class()
                    && r.rrdata_match(answer.as_ref())
            });
            if any_match {
                continue;
            }
        }

        // Drop conflicting records from the probe, keeping a renamed copy of each.
        probe.records.retain(|record| {
            if record.get_type() == answer.get_type()
                && record.get_class() == answer.get_class()
                && !record.rrdata_match(answer.as_ref())
            {
                debug!(
                    "found conflict name: '{name}' record: {}: {} PEER: {}",
                    record.get_type(),
                    record.rdata_print(),
                    answer.rdata_print()
                );

                let mut new_record = record.clone();
                // Address records are renamed with `hostname_change`, others with `name_change`.
                let new_name = match record.get_type() {
                    RRType::A => hostname_change(name),
                    RRType::AAAA => hostname_change(name),
                    _ => name_change(name),
                };
                new_record.get_record_mut().set_new_name(new_name);
                new_records.push(new_record);
                return false;
            }

            true
        });

        // New probes start after a random delay of 0..250 ms.
        let create_time = current_time_millis() + fastrand::u64(0..250);

        let waiting_services = probe.waiting_services.clone();

        for record in new_records {
            if dns_registry.update_hostname(name, record.get_name(), create_time) {
                self.timers.push(Reverse(create_time));
            }

            // Remember the mapping from the original name to the new name.
            dns_registry.name_changes.insert(
                record.get_record().get_original_name().to_string(),
                record.get_name().to_string(),
            );

            // Reuse the probe for the new name if present, otherwise create it
            // and schedule a timer for its next send.
            let new_probe = match dns_registry.probing.get_mut(record.get_name()) {
                Some(p) => p,
                None => {
                    let new_probe = dns_registry
                        .probing
                        .entry(record.get_name().to_string())
                        .or_insert_with(|| {
                            debug!("conflict handler: new probe of {}", record.get_name());
                            Probe::new(create_time)
                        });
                    self.timers.push(Reverse(new_probe.next_send));
                    new_probe
                }
            };

            debug!(
                "insert record with new name '{}' {} into probe",
                record.get_name(),
                record.get_type()
            );
            new_probe.insert_record(record);

            // Services waiting on the old probe now wait on the new one as well.
            new_probe.waiting_services.extend(waiting_services.clone());
        }
    }
}
2535
2536 fn resolve_updated_instances(&mut self, updated_instances: &HashSet<String>) {
2543 let mut resolved: HashSet<String> = HashSet::new();
2544 let mut unresolved: HashSet<String> = HashSet::new();
2545 let mut removed_instances = HashMap::new();
2546
2547 let now = current_time_millis();
2548
2549 for (ty_domain, records) in self.cache.all_ptr().iter() {
2550 if !self.service_queriers.contains_key(ty_domain) {
2551 continue;
2553 }
2554
2555 for record in records.iter().filter(|r| !r.record.expires_soon(now)) {
2556 if let Some(dns_ptr) = record.record.any().downcast_ref::<DnsPointer>() {
2557 if updated_instances.contains(dns_ptr.alias()) {
2558 let mut instance_found = false;
2559 let mut new_event = None;
2560
2561 if let Ok(resolved) =
2562 self.resolve_service_from_cache(ty_domain, dns_ptr.alias())
2563 {
2564 debug!("resolve_updated_instances: from cache: {}", dns_ptr.alias());
2565 instance_found = true;
2566 if resolved.is_valid() {
2567 new_event = Some(ServiceEvent::ServiceResolved(Box::new(resolved)));
2568 } else {
2569 debug!("Resolved service is not valid: {}", dns_ptr.alias());
2570 }
2571 }
2572
2573 if instance_found {
2574 if let Some(event) = new_event {
2575 debug!("call queriers to resolve {}", dns_ptr.alias());
2576 resolved.insert(dns_ptr.alias().to_string());
2577 call_service_listener(&self.service_queriers, ty_domain, event);
2578 } else {
2579 if self.resolved.remove(dns_ptr.alias()) {
2580 removed_instances
2581 .entry(ty_domain.to_string())
2582 .or_insert_with(HashSet::new)
2583 .insert(dns_ptr.alias().to_string());
2584 }
2585 unresolved.insert(dns_ptr.alias().to_string());
2586 }
2587 }
2588 }
2589 }
2590 }
2591 }
2592
2593 for instance in resolved.drain() {
2594 self.pending_resolves.remove(&instance);
2595 self.resolved.insert(instance);
2596 }
2597
2598 for instance in unresolved.drain() {
2599 self.add_pending_resolve(instance);
2600 }
2601
2602 if !removed_instances.is_empty() {
2603 debug!(
2604 "resolve_updated_instances: removed {}",
2605 &removed_instances.len()
2606 );
2607 self.notify_service_removal(removed_instances);
2608 }
2609 }
2610
2611 fn handle_query(&mut self, msg: DnsIncoming, if_index: u32, is_ipv4: bool) {
2613 let sock = if is_ipv4 {
2614 &self.ipv4_sock
2615 } else {
2616 &self.ipv6_sock
2617 };
2618 let mut out = DnsOutgoing::new(FLAGS_QR_RESPONSE | FLAGS_AA);
2619
2620 const META_QUERY: &str = "_services._dns-sd._udp.local.";
2623
2624 let Some(dns_registry) = self.dns_registry_map.get_mut(&if_index) else {
2625 debug!("missing dns registry for intf {}", if_index);
2626 return;
2627 };
2628
2629 let Some(intf) = self.my_intfs.get(&if_index) else {
2630 return;
2631 };
2632
2633 for question in msg.questions().iter() {
2634 let qtype = question.entry_type();
2635
2636 if qtype == RRType::PTR {
2637 for service in self.my_services.values() {
2638 if service.get_status(if_index) != ServiceStatus::Announced {
2639 continue;
2640 }
2641
2642 if question.entry_name() == service.get_type()
2643 || service
2644 .get_subtype()
2645 .as_ref()
2646 .is_some_and(|v| v == question.entry_name())
2647 {
2648 add_answer_with_additionals(
2649 &mut out,
2650 &msg,
2651 service,
2652 intf,
2653 dns_registry,
2654 is_ipv4,
2655 );
2656 } else if question.entry_name() == META_QUERY {
2657 let ptr_added = out.add_answer(
2658 &msg,
2659 DnsPointer::new(
2660 question.entry_name(),
2661 RRType::PTR,
2662 CLASS_IN,
2663 service.get_other_ttl(),
2664 service.get_type().to_string(),
2665 ),
2666 );
2667 if !ptr_added {
2668 trace!("answer was not added for meta-query {:?}", &question);
2669 }
2670 }
2671 }
2672 } else {
2673 if qtype == RRType::ANY && msg.num_authorities() > 0 {
2675 let probe_name = question.entry_name();
2676
2677 if let Some(probe) = dns_registry.probing.get_mut(probe_name) {
2678 let now = current_time_millis();
2679
2680 if probe.start_time < now {
2684 let incoming_records: Vec<_> = msg
2685 .authorities()
2686 .iter()
2687 .filter(|r| r.get_name() == probe_name)
2688 .collect();
2689
2690 probe.tiebreaking(&incoming_records, now, probe_name);
2691 }
2692 }
2693 }
2694
2695 if qtype == RRType::A || qtype == RRType::AAAA || qtype == RRType::ANY {
2696 for service in self.my_services.values() {
2697 if service.get_status(if_index) != ServiceStatus::Announced {
2698 continue;
2699 }
2700
2701 let service_hostname =
2702 match dns_registry.name_changes.get(service.get_hostname()) {
2703 Some(new_name) => new_name,
2704 None => service.get_hostname(),
2705 };
2706
2707 if service_hostname.to_lowercase() == question.entry_name().to_lowercase() {
2708 let intf_addrs = if is_ipv4 {
2709 service.get_addrs_on_my_intf_v4(intf)
2710 } else {
2711 service.get_addrs_on_my_intf_v6(intf)
2712 };
2713 if intf_addrs.is_empty()
2714 && (qtype == RRType::A || qtype == RRType::AAAA)
2715 {
2716 let t = match qtype {
2717 RRType::A => "TYPE_A",
2718 RRType::AAAA => "TYPE_AAAA",
2719 _ => "invalid_type",
2720 };
2721 trace!(
2722 "Cannot find valid addrs for {} response on intf {:?}",
2723 t,
2724 &intf
2725 );
2726 return;
2727 }
2728 for address in intf_addrs {
2729 out.add_answer(
2730 &msg,
2731 DnsAddress::new(
2732 service_hostname,
2733 ip_address_rr_type(&address),
2734 CLASS_IN | CLASS_CACHE_FLUSH,
2735 service.get_host_ttl(),
2736 address,
2737 intf.into(),
2738 ),
2739 );
2740 }
2741 }
2742 }
2743 }
2744
2745 let query_name = question.entry_name().to_lowercase();
2746 let service_opt = self
2747 .my_services
2748 .iter()
2749 .find(|(k, _v)| {
2750 let service_name = match dns_registry.name_changes.get(k.as_str()) {
2751 Some(new_name) => new_name,
2752 None => k,
2753 };
2754 service_name == &query_name
2755 })
2756 .map(|(_, v)| v);
2757
2758 let Some(service) = service_opt else {
2759 continue;
2760 };
2761
2762 if service.get_status(if_index) != ServiceStatus::Announced {
2763 continue;
2764 }
2765
2766 let intf_addrs = if is_ipv4 {
2767 service.get_addrs_on_my_intf_v4(intf)
2768 } else {
2769 service.get_addrs_on_my_intf_v6(intf)
2770 };
2771 if intf_addrs.is_empty() {
2772 debug!(
2773 "Cannot find valid addrs for TYPE_SRV response on intf {:?}",
2774 &intf
2775 );
2776 continue;
2777 }
2778
2779 add_answer_of_service(
2780 &mut out,
2781 &msg,
2782 question.entry_name(),
2783 service,
2784 qtype,
2785 intf_addrs,
2786 );
2787 }
2788 }
2789
2790 if !out.answers_count() > 0 {
2791 out.set_id(msg.id());
2792 send_dns_outgoing(&out, intf, &sock.pktinfo);
2793
2794 let if_name = intf.name.clone();
2795
2796 self.increase_counter(Counter::Respond, 1);
2797 self.notify_monitors(DaemonEvent::Respond(if_name));
2798 }
2799
2800 self.increase_counter(Counter::KnownAnswerSuppression, out.known_answer_count());
2801 }
2802
2803 fn increase_counter(&mut self, counter: Counter, count: i64) {
2805 let key = counter.to_string();
2806 match self.counters.get_mut(&key) {
2807 Some(v) => *v += count,
2808 None => {
2809 self.counters.insert(key, count);
2810 }
2811 }
2812 }
2813
2814 fn set_counter(&mut self, counter: Counter, count: i64) {
2816 let key = counter.to_string();
2817 self.counters.insert(key, count);
2818 }
2819
2820 fn signal_sock_drain(&self) {
2821 let mut signal_buf = [0; 1024];
2822
2823 while let Ok(sz) = self.signal_sock.recv(&mut signal_buf) {
2825 trace!(
2826 "signal socket recvd: {}",
2827 String::from_utf8_lossy(&signal_buf[0..sz])
2828 );
2829 }
2830 }
2831
2832 fn add_retransmission(&mut self, next_time: u64, command: Command) {
2833 self.retransmissions.push(ReRun { next_time, command });
2834 self.add_timer(next_time);
2835 }
2836
2837 fn notify_service_removal(&self, expired: HashMap<String, HashSet<String>>) {
2840 for (ty_domain, sender) in self.service_queriers.iter() {
2841 if let Some(instances) = expired.get(ty_domain) {
2842 for instance_name in instances {
2843 let event = ServiceEvent::ServiceRemoved(
2844 ty_domain.to_string(),
2845 instance_name.to_string(),
2846 );
2847 match sender.send(event) {
2848 Ok(()) => debug!("notify_service_removal: sent ServiceRemoved to listener of {ty_domain}: {instance_name}"),
2849 Err(e) => debug!("Failed to send event: {}", e),
2850 }
2851 }
2852 }
2853 }
2854 }
2855
2856 fn exec_command(&mut self, command: Command, repeating: bool) {
2860 trace!("exec_command: {:?} repeating: {}", &command, repeating);
2861 match command {
2862 Command::Browse(ty, next_delay, cache_only, listener) => {
2863 self.exec_command_browse(repeating, ty, next_delay, cache_only, listener);
2864 }
2865
2866 Command::ResolveHostname(hostname, next_delay, listener, timeout) => {
2867 self.exec_command_resolve_hostname(
2868 repeating, hostname, next_delay, listener, timeout,
2869 );
2870 }
2871
2872 Command::Register(service_info) => {
2873 self.register_service(service_info);
2874 self.increase_counter(Counter::Register, 1);
2875 }
2876
2877 Command::RegisterResend(fullname, intf) => {
2878 trace!("register-resend service: {fullname} on {}", &intf);
2879 self.exec_command_register_resend(fullname, intf);
2880 }
2881
2882 Command::Unregister(fullname, resp_s) => {
2883 trace!("unregister service {} repeat {}", &fullname, &repeating);
2884 self.exec_command_unregister(repeating, fullname, resp_s);
2885 }
2886
2887 Command::UnregisterResend(packet, if_index, is_ipv4) => {
2888 self.exec_command_unregister_resend(packet, if_index, is_ipv4);
2889 }
2890
2891 Command::StopBrowse(ty_domain) => self.exec_command_stop_browse(ty_domain),
2892
2893 Command::StopResolveHostname(hostname) => {
2894 self.exec_command_stop_resolve_hostname(hostname.to_lowercase())
2895 }
2896
2897 Command::Resolve(instance, try_count) => self.exec_command_resolve(instance, try_count),
2898
2899 Command::GetMetrics(resp_s) => self.exec_command_get_metrics(resp_s),
2900
2901 Command::GetStatus(resp_s) => match resp_s.send(self.status.clone()) {
2902 Ok(()) => trace!("Sent status to the client"),
2903 Err(e) => debug!("Failed to send status: {}", e),
2904 },
2905
2906 Command::Monitor(resp_s) => {
2907 self.monitors.push(resp_s);
2908 }
2909
2910 Command::SetOption(daemon_opt) => {
2911 self.process_set_option(daemon_opt);
2912 }
2913
2914 Command::GetOption(resp_s) => {
2915 let val = DaemonOptionVal {
2916 _service_name_len_max: self.service_name_len_max,
2917 ip_check_interval: self.ip_check_interval,
2918 };
2919 if let Err(e) = resp_s.send(val) {
2920 debug!("Failed to send options: {}", e);
2921 }
2922 }
2923
2924 Command::Verify(instance_fullname, timeout) => {
2925 self.exec_command_verify(instance_fullname, timeout, repeating);
2926 }
2927
2928 _ => {
2929 debug!("unexpected command: {:?}", &command);
2930 }
2931 }
2932 }
2933
2934 fn exec_command_get_metrics(&mut self, resp_s: Sender<HashMap<String, i64>>) {
2935 self.set_counter(Counter::CachedPTR, self.cache.ptr_count() as i64);
2936 self.set_counter(Counter::CachedSRV, self.cache.srv_count() as i64);
2937 self.set_counter(Counter::CachedAddr, self.cache.addr_count() as i64);
2938 self.set_counter(Counter::CachedTxt, self.cache.txt_count() as i64);
2939 self.set_counter(Counter::CachedNSec, self.cache.nsec_count() as i64);
2940 self.set_counter(Counter::CachedSubtype, self.cache.subtype_count() as i64);
2941 self.set_counter(Counter::Timer, self.timers.len() as i64);
2942
2943 let dns_registry_probe_count: usize = self
2944 .dns_registry_map
2945 .values()
2946 .map(|r| r.probing.len())
2947 .sum();
2948 self.set_counter(Counter::DnsRegistryProbe, dns_registry_probe_count as i64);
2949
2950 let dns_registry_active_count: usize = self
2951 .dns_registry_map
2952 .values()
2953 .map(|r| r.active.values().map(|a| a.len()).sum::<usize>())
2954 .sum();
2955 self.set_counter(Counter::DnsRegistryActive, dns_registry_active_count as i64);
2956
2957 let dns_registry_timer_count: usize = self
2958 .dns_registry_map
2959 .values()
2960 .map(|r| r.new_timers.len())
2961 .sum();
2962 self.set_counter(Counter::DnsRegistryTimer, dns_registry_timer_count as i64);
2963
2964 let dns_registry_name_change_count: usize = self
2965 .dns_registry_map
2966 .values()
2967 .map(|r| r.name_changes.len())
2968 .sum();
2969 self.set_counter(
2970 Counter::DnsRegistryNameChange,
2971 dns_registry_name_change_count as i64,
2972 );
2973
2974 if let Err(e) = resp_s.send(self.counters.clone()) {
2976 debug!("Failed to send metrics: {}", e);
2977 }
2978 }
2979
2980 fn exec_command_browse(
2981 &mut self,
2982 repeating: bool,
2983 ty: String,
2984 next_delay: u32,
2985 cache_only: bool,
2986 listener: Sender<ServiceEvent>,
2987 ) {
2988 let pretty_addrs: Vec<String> = self
2989 .my_intfs
2990 .iter()
2991 .map(|(if_index, itf)| format!("{} ({if_index})", itf.name))
2992 .collect();
2993
2994 if let Err(e) = listener.send(ServiceEvent::SearchStarted(format!(
2995 "{ty} on {} interfaces [{}]",
2996 pretty_addrs.len(),
2997 pretty_addrs.join(", ")
2998 ))) {
2999 debug!(
3000 "Failed to send SearchStarted({})(repeating:{}): {}",
3001 &ty, repeating, e
3002 );
3003 return;
3004 }
3005
3006 let now = current_time_millis();
3007 if !repeating {
3008 self.service_queriers.insert(ty.clone(), listener.clone());
3012
3013 self.query_cache_for_service(&ty, &listener, now);
3015 }
3016
3017 if cache_only {
3018 match listener.send(ServiceEvent::SearchStopped(ty.clone())) {
3020 Ok(()) => debug!("SearchStopped sent for {}", &ty),
3021 Err(e) => debug!("Failed to send SearchStopped: {}", e),
3022 }
3023 return;
3024 }
3025
3026 self.send_query(&ty, RRType::PTR);
3027 self.increase_counter(Counter::Browse, 1);
3028
3029 let next_time = now + (next_delay * 1000) as u64;
3030 let max_delay = 60 * 60;
3031 let delay = cmp::min(next_delay * 2, max_delay);
3032 self.add_retransmission(next_time, Command::Browse(ty, delay, cache_only, listener));
3033 }
3034
3035 fn exec_command_resolve_hostname(
3036 &mut self,
3037 repeating: bool,
3038 hostname: String,
3039 next_delay: u32,
3040 listener: Sender<HostnameResolutionEvent>,
3041 timeout: Option<u64>,
3042 ) {
3043 let addr_list: Vec<_> = self.my_intfs.iter().collect();
3044 if let Err(e) = listener.send(HostnameResolutionEvent::SearchStarted(format!(
3045 "{} on addrs {:?}",
3046 &hostname, &addr_list
3047 ))) {
3048 debug!(
3049 "Failed to send ResolveStarted({})(repeating:{}): {}",
3050 &hostname, repeating, e
3051 );
3052 return;
3053 }
3054 if !repeating {
3055 self.add_hostname_resolver(hostname.to_owned(), listener.clone(), timeout);
3056 self.query_cache_for_hostname(&hostname, listener.clone());
3058 }
3059
3060 self.send_query_vec(&[(&hostname, RRType::A), (&hostname, RRType::AAAA)]);
3061 self.increase_counter(Counter::ResolveHostname, 1);
3062
3063 let now = current_time_millis();
3064 let next_time = now + u64::from(next_delay) * 1000;
3065 let max_delay = 60 * 60;
3066 let delay = cmp::min(next_delay * 2, max_delay);
3067
3068 if self
3070 .hostname_resolvers
3071 .get(&hostname)
3072 .and_then(|(_sender, timeout)| *timeout)
3073 .map(|timeout| next_time < timeout)
3074 .unwrap_or(true)
3075 {
3076 self.add_retransmission(
3077 next_time,
3078 Command::ResolveHostname(hostname, delay, listener, None),
3079 );
3080 }
3081 }
3082
3083 fn exec_command_resolve(&mut self, instance: String, try_count: u16) {
3084 let pending_query = self.query_unresolved(&instance);
3085 let max_try = 3;
3086 if pending_query && try_count < max_try {
3087 let next_time = current_time_millis() + RESOLVE_WAIT_IN_MILLIS;
3090 self.add_retransmission(next_time, Command::Resolve(instance, try_count + 1));
3091 }
3092 }
3093
3094 fn exec_command_unregister(
3095 &mut self,
3096 repeating: bool,
3097 fullname: String,
3098 resp_s: Sender<UnregisterStatus>,
3099 ) {
3100 let response = match self.my_services.remove_entry(&fullname) {
3101 None => {
3102 debug!("unregister: cannot find such service {}", &fullname);
3103 UnregisterStatus::NotFound
3104 }
3105 Some((_k, info)) => {
3106 let mut timers = Vec::new();
3107
3108 for (if_index, intf) in self.my_intfs.iter() {
3109 let packet = self.unregister_service(&info, intf, &self.ipv4_sock.pktinfo);
3110 if !repeating && !packet.is_empty() {
3112 let next_time = current_time_millis() + 120;
3113 self.retransmissions.push(ReRun {
3114 next_time,
3115 command: Command::UnregisterResend(packet, *if_index, true),
3116 });
3117 timers.push(next_time);
3118 }
3119
3120 let packet = self.unregister_service(&info, intf, &self.ipv6_sock.pktinfo);
3123 if !repeating && !packet.is_empty() {
3124 let next_time = current_time_millis() + 120;
3125 self.retransmissions.push(ReRun {
3126 next_time,
3127 command: Command::UnregisterResend(packet, *if_index, false),
3128 });
3129 timers.push(next_time);
3130 }
3131 }
3132
3133 for t in timers {
3134 self.add_timer(t);
3135 }
3136
3137 self.increase_counter(Counter::Unregister, 1);
3138 UnregisterStatus::OK
3139 }
3140 };
3141 if let Err(e) = resp_s.send(response) {
3142 debug!("unregister: failed to send response: {}", e);
3143 }
3144 }
3145
3146 fn exec_command_unregister_resend(&mut self, packet: Vec<u8>, if_index: u32, is_ipv4: bool) {
3147 let Some(intf) = self.my_intfs.get(&if_index) else {
3148 return;
3149 };
3150 let sock = if is_ipv4 {
3151 &self.ipv4_sock.pktinfo
3152 } else {
3153 &self.ipv6_sock.pktinfo
3154 };
3155
3156 let if_addr = if is_ipv4 {
3157 match intf.next_ifaddr_v4() {
3158 Some(addr) => addr,
3159 None => return,
3160 }
3161 } else {
3162 match intf.next_ifaddr_v6() {
3163 Some(addr) => addr,
3164 None => return,
3165 }
3166 };
3167
3168 debug!("UnregisterResend from {:?}", if_addr);
3169 multicast_on_intf(&packet[..], &intf.name, intf.index, if_addr, sock);
3170
3171 self.increase_counter(Counter::UnregisterResend, 1);
3172 }
3173
3174 fn exec_command_stop_browse(&mut self, ty_domain: String) {
3175 match self.service_queriers.remove_entry(&ty_domain) {
3176 None => debug!("StopBrowse: cannot find querier for {}", &ty_domain),
3177 Some((ty, sender)) => {
3178 trace!("StopBrowse: removed queryer for {}", &ty);
3180 let mut i = 0;
3181 while i < self.retransmissions.len() {
3182 if let Command::Browse(t, _, _, _) = &self.retransmissions[i].command {
3183 if t == &ty {
3184 self.retransmissions.remove(i);
3185 trace!("StopBrowse: removed retransmission for {}", &ty);
3186 continue;
3187 }
3188 }
3189 i += 1;
3190 }
3191
3192 self.cache.remove_service_type(&ty_domain);
3194
3195 match sender.send(ServiceEvent::SearchStopped(ty_domain)) {
3197 Ok(()) => trace!("Sent SearchStopped to the listener"),
3198 Err(e) => debug!("Failed to send SearchStopped: {}", e),
3199 }
3200 }
3201 }
3202 }
3203
3204 fn exec_command_stop_resolve_hostname(&mut self, hostname: String) {
3205 if let Some((host, (sender, _timeout))) = self.hostname_resolvers.remove_entry(&hostname) {
3206 trace!("StopResolve: removed queryer for {}", &host);
3208 let mut i = 0;
3209 while i < self.retransmissions.len() {
3210 if let Command::Resolve(t, _) = &self.retransmissions[i].command {
3211 if t == &host {
3212 self.retransmissions.remove(i);
3213 trace!("StopResolve: removed retransmission for {}", &host);
3214 continue;
3215 }
3216 }
3217 i += 1;
3218 }
3219
3220 match sender.send(HostnameResolutionEvent::SearchStopped(hostname)) {
3222 Ok(()) => trace!("Sent SearchStopped to the listener"),
3223 Err(e) => debug!("Failed to send SearchStopped: {}", e),
3224 }
3225 }
3226 }
3227
3228 fn exec_command_register_resend(&mut self, fullname: String, if_index: u32) {
3229 let Some(info) = self.my_services.get_mut(&fullname) else {
3230 trace!("announce: cannot find such service {}", &fullname);
3231 return;
3232 };
3233
3234 let Some(dns_registry) = self.dns_registry_map.get_mut(&if_index) else {
3235 return;
3236 };
3237
3238 let Some(intf) = self.my_intfs.get(&if_index) else {
3239 return;
3240 };
3241
3242 let announced_v4 =
3243 announce_service_on_intf(dns_registry, info, intf, &self.ipv4_sock.pktinfo);
3244 let announced_v6 =
3245 announce_service_on_intf(dns_registry, info, intf, &self.ipv6_sock.pktinfo);
3246
3247 if announced_v4 || announced_v6 {
3248 let mut hostname = info.get_hostname();
3249 if let Some(new_name) = dns_registry.name_changes.get(hostname) {
3250 hostname = new_name;
3251 }
3252 let service_name = match dns_registry.name_changes.get(&fullname) {
3253 Some(new_name) => new_name.to_string(),
3254 None => fullname,
3255 };
3256
3257 debug!("resend: announce service {service_name} on {}", intf.name);
3258
3259 notify_monitors(
3260 &mut self.monitors,
3261 DaemonEvent::Announce(service_name, format!("{}:{}", hostname, &intf.name)),
3262 );
3263 info.set_status(if_index, ServiceStatus::Announced);
3264 } else {
3265 debug!("register-resend should not fail");
3266 }
3267
3268 self.increase_counter(Counter::RegisterResend, 1);
3269 }
3270
3271 fn exec_command_verify(&mut self, instance: String, timeout: Duration, repeating: bool) {
3272 let now = current_time_millis();
3282 let expire_at = if repeating {
3283 None
3284 } else {
3285 Some(now + timeout.as_millis() as u64)
3286 };
3287
3288 let record_vec = self.cache.service_verify_queries(&instance, expire_at);
3290
3291 if !record_vec.is_empty() {
3292 let query_vec: Vec<(&str, RRType)> = record_vec
3293 .iter()
3294 .map(|(record, rr_type)| (record.as_str(), *rr_type))
3295 .collect();
3296 self.send_query_vec(&query_vec);
3297
3298 if let Some(new_expire) = expire_at {
3299 self.add_timer(new_expire); self.add_retransmission(now + 1000, Command::Verify(instance, timeout));
3303 }
3304 }
3305 }
3306
3307 fn refresh_active_services(&mut self) {
3309 let mut query_ptr_count = 0;
3310 let mut query_srv_count = 0;
3311 let mut new_timers = HashSet::new();
3312 let mut query_addr_count = 0;
3313
3314 for (ty_domain, _sender) in self.service_queriers.iter() {
3315 let refreshed_timers = self.cache.refresh_due_ptr(ty_domain);
3316 if !refreshed_timers.is_empty() {
3317 trace!("sending refresh query for PTR: {}", ty_domain);
3318 self.send_query(ty_domain, RRType::PTR);
3319 query_ptr_count += 1;
3320 new_timers.extend(refreshed_timers);
3321 }
3322
3323 let (instances, timers) = self.cache.refresh_due_srv_txt(ty_domain);
3324 for (instance, types) in instances {
3325 trace!("sending refresh query for: {}", &instance);
3326 let query_vec = types
3327 .into_iter()
3328 .map(|ty| (instance.as_str(), ty))
3329 .collect::<Vec<_>>();
3330 self.send_query_vec(&query_vec);
3331 query_srv_count += 1;
3332 }
3333 new_timers.extend(timers);
3334 let (hostnames, timers) = self.cache.refresh_due_hosts(ty_domain);
3335 for hostname in hostnames.iter() {
3336 trace!("sending refresh queries for A and AAAA: {}", hostname);
3337 self.send_query_vec(&[(hostname, RRType::A), (hostname, RRType::AAAA)]);
3338 query_addr_count += 2;
3339 }
3340 new_timers.extend(timers);
3341 }
3342
3343 for timer in new_timers {
3344 self.add_timer(timer);
3345 }
3346
3347 self.increase_counter(Counter::CacheRefreshPTR, query_ptr_count);
3348 self.increase_counter(Counter::CacheRefreshSrvTxt, query_srv_count);
3349 self.increase_counter(Counter::CacheRefreshAddr, query_addr_count);
3350 }
3351}
3352
3353fn add_answer_of_service(
3355 out: &mut DnsOutgoing,
3356 msg: &DnsIncoming,
3357 entry_name: &str,
3358 service: &ServiceInfo,
3359 qtype: RRType,
3360 intf_addrs: Vec<IpAddr>,
3361) {
3362 if qtype == RRType::SRV || qtype == RRType::ANY {
3363 out.add_answer(
3364 msg,
3365 DnsSrv::new(
3366 entry_name,
3367 CLASS_IN | CLASS_CACHE_FLUSH,
3368 service.get_host_ttl(),
3369 service.get_priority(),
3370 service.get_weight(),
3371 service.get_port(),
3372 service.get_hostname().to_string(),
3373 ),
3374 );
3375 }
3376
3377 if qtype == RRType::TXT || qtype == RRType::ANY {
3378 out.add_answer(
3379 msg,
3380 DnsTxt::new(
3381 entry_name,
3382 CLASS_IN | CLASS_CACHE_FLUSH,
3383 service.get_other_ttl(),
3384 service.generate_txt(),
3385 ),
3386 );
3387 }
3388
3389 if qtype == RRType::SRV {
3390 for address in intf_addrs {
3391 out.add_additional_answer(DnsAddress::new(
3392 service.get_hostname(),
3393 ip_address_rr_type(&address),
3394 CLASS_IN | CLASS_CACHE_FLUSH,
3395 service.get_host_ttl(),
3396 address,
3397 InterfaceId::default(),
3398 ));
3399 }
3400 }
3401}
3402
3403#[derive(Clone, Debug)]
3406#[non_exhaustive]
3407pub enum ServiceEvent {
3408 SearchStarted(String),
3410
3411 ServiceFound(String, String),
3413
3414 ServiceResolved(Box<ResolvedService>),
3416
3417 ServiceRemoved(String, String),
3419
3420 SearchStopped(String),
3422}
3423
3424#[derive(Clone, Debug)]
3427#[non_exhaustive]
3428pub enum HostnameResolutionEvent {
3429 SearchStarted(String),
3431 AddressesFound(String, HashSet<ScopedIp>),
3433 AddressesRemoved(String, HashSet<ScopedIp>),
3435 SearchTimeout(String),
3437 SearchStopped(String),
3439}
3440
3441#[derive(Clone, Debug)]
3444#[non_exhaustive]
3445pub enum DaemonEvent {
3446 Announce(String, String),
3448
3449 Error(Error),
3451
3452 IpAdd(IpAddr),
3454
3455 IpDel(IpAddr),
3457
3458 NameChange(DnsNameChange),
3461
3462 Respond(String),
3464}
3465
3466#[derive(Clone, Debug)]
3469pub struct DnsNameChange {
3470 pub original: String,
3472
3473 pub new_name: String,
3483
3484 pub rr_type: RRType,
3486
3487 pub intf_name: String,
3489}
3490
3491#[derive(Debug)]
3493enum Command {
3494 Browse(String, u32, bool, Sender<ServiceEvent>),
3496
3497 ResolveHostname(String, u32, Sender<HostnameResolutionEvent>, Option<u64>), Register(ServiceInfo),
3502
3503 Unregister(String, Sender<UnregisterStatus>), RegisterResend(String, u32), UnregisterResend(Vec<u8>, u32, bool), StopBrowse(String), StopResolveHostname(String), Resolve(String, u16), GetMetrics(Sender<Metrics>),
3524
3525 GetStatus(Sender<DaemonStatus>),
3527
3528 Monitor(Sender<DaemonEvent>),
3530
3531 SetOption(DaemonOption),
3532
3533 GetOption(Sender<DaemonOptionVal>),
3534
3535 Verify(String, Duration),
3540
3541 Exit(Sender<DaemonStatus>),
3542}
3543
3544impl fmt::Display for Command {
3545 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
3546 match self {
3547 Self::Browse(_, _, _, _) => write!(f, "Command Browse"),
3548 Self::ResolveHostname(_, _, _, _) => write!(f, "Command ResolveHostname"),
3549 Self::Exit(_) => write!(f, "Command Exit"),
3550 Self::GetStatus(_) => write!(f, "Command GetStatus"),
3551 Self::GetMetrics(_) => write!(f, "Command GetMetrics"),
3552 Self::Monitor(_) => write!(f, "Command Monitor"),
3553 Self::Register(_) => write!(f, "Command Register"),
3554 Self::RegisterResend(_, _) => write!(f, "Command RegisterResend"),
3555 Self::SetOption(_) => write!(f, "Command SetOption"),
3556 Self::GetOption(_) => write!(f, "Command GetOption"),
3557 Self::StopBrowse(_) => write!(f, "Command StopBrowse"),
3558 Self::StopResolveHostname(_) => write!(f, "Command StopResolveHostname"),
3559 Self::Unregister(_, _) => write!(f, "Command Unregister"),
3560 Self::UnregisterResend(_, _, _) => write!(f, "Command UnregisterResend"),
3561 Self::Resolve(_, _) => write!(f, "Command Resolve"),
3562 Self::Verify(_, _) => write!(f, "Command VerifyResource"),
3563 }
3564 }
3565}
3566
/// Snapshot of daemon options, sent in reply to `Command::GetOption`.
struct DaemonOptionVal {
    /// The daemon's `service_name_len_max` setting at the time of the
    /// request.
    _service_name_len_max: u8,

    /// The daemon's `ip_check_interval` setting at the time of the request.
    ip_check_interval: u64,
}
3571
3572#[derive(Debug)]
3573enum DaemonOption {
3574 ServiceNameLenMax(u8),
3575 IpCheckInterval(u64),
3576 EnableInterface(Vec<IfKind>),
3577 DisableInterface(Vec<IfKind>),
3578 MulticastLoopV4(bool),
3579 MulticastLoopV6(bool),
3580 AcceptUnsolicited(bool),
3581 #[cfg(test)]
3582 TestDownInterface(String),
3583 #[cfg(test)]
3584 TestUpInterface(String),
3585}
3586
/// Byte length of the domain suffix of a service type, e.g. "._tcp.local."
/// ("._udp.local." has the same length).
const DOMAIN_LEN: usize = "._tcp.local.".len();
3589
3590fn check_service_name_length(ty_domain: &str, limit: u8) -> Result<()> {
3592 if ty_domain.len() <= DOMAIN_LEN + 1 {
3593 return Err(e_fmt!("Service type name cannot be empty: {}", ty_domain));
3595 }
3596
3597 let service_name_len = ty_domain.len() - DOMAIN_LEN - 1; if service_name_len > limit as usize {
3599 return Err(e_fmt!("Service name length must be <= {} bytes", limit));
3600 }
3601 Ok(())
3602}
3603
3604fn check_domain_suffix(name: &str) -> Result<()> {
3606 if !(name.ends_with("._tcp.local.") || name.ends_with("._udp.local.")) {
3607 return Err(e_fmt!(
3608 "mDNS service {} must end with '._tcp.local.' or '._udp.local.'",
3609 name
3610 ));
3611 }
3612
3613 Ok(())
3614}
3615
3616fn check_service_name(fullname: &str) -> Result<()> {
3624 check_domain_suffix(fullname)?;
3625
3626 let remaining: Vec<&str> = fullname[..fullname.len() - DOMAIN_LEN].split('.').collect();
3627 let name = remaining.last().ok_or_else(|| e_fmt!("No service name"))?;
3628
3629 if &name[0..1] != "_" {
3630 return Err(e_fmt!("Service name must start with '_'"));
3631 }
3632
3633 let name = &name[1..];
3634
3635 if name.contains("--") {
3636 return Err(e_fmt!("Service name must not contain '--'"));
3637 }
3638
3639 if name.starts_with('-') || name.ends_with('-') {
3640 return Err(e_fmt!("Service name (%s) may not start or end with '-'"));
3641 }
3642
3643 let ascii_count = name.chars().filter(|c| c.is_ascii_alphabetic()).count();
3644 if ascii_count < 1 {
3645 return Err(e_fmt!(
3646 "Service name must contain at least one letter (eg: 'A-Za-z')"
3647 ));
3648 }
3649
3650 Ok(())
3651}
3652
3653fn check_hostname(hostname: &str) -> Result<()> {
3655 if !hostname.ends_with(".local.") {
3656 return Err(e_fmt!("Hostname must end with '.local.': {hostname}"));
3657 }
3658
3659 if hostname == ".local." {
3660 return Err(e_fmt!(
3661 "The part of the hostname before '.local.' cannot be empty"
3662 ));
3663 }
3664
3665 if hostname.len() > 255 {
3666 return Err(e_fmt!("Hostname length must be <= 255 bytes"));
3667 }
3668
3669 Ok(())
3670}
3671
3672fn call_service_listener(
3673 listeners_map: &HashMap<String, Sender<ServiceEvent>>,
3674 ty_domain: &str,
3675 event: ServiceEvent,
3676) {
3677 if let Some(listener) = listeners_map.get(ty_domain) {
3678 match listener.send(event) {
3679 Ok(()) => trace!("Sent event to listener successfully"),
3680 Err(e) => debug!("Failed to send event: {}", e),
3681 }
3682 }
3683}
3684
3685fn call_hostname_resolution_listener(
3686 listeners_map: &HashMap<String, (Sender<HostnameResolutionEvent>, Option<u64>)>,
3687 hostname: &str,
3688 event: HostnameResolutionEvent,
3689) {
3690 let hostname_lower = hostname.to_lowercase();
3691 if let Some(listener) = listeners_map.get(&hostname_lower).map(|(l, _)| l) {
3692 match listener.send(event) {
3693 Ok(()) => trace!("Sent event to listener successfully"),
3694 Err(e) => debug!("Failed to send event: {}", e),
3695 }
3696 }
3697}
3698
3699fn my_ip_interfaces(with_loopback: bool) -> Vec<Interface> {
3703 if_addrs::get_if_addrs()
3704 .unwrap_or_default()
3705 .into_iter()
3706 .filter(|i| i.is_oper_up() && (!i.is_loopback() || with_loopback))
3707 .collect()
3708}
3709
3710fn send_dns_outgoing(out: &DnsOutgoing, my_intf: &MyIntf, sock: &PktInfoUdpSocket) -> Vec<Vec<u8>> {
3713 let if_name = &my_intf.name;
3714
3715 let if_addr = if sock.domain() == Domain::IPV4 {
3716 match my_intf.next_ifaddr_v4() {
3717 Some(addr) => addr,
3718 None => return vec![],
3719 }
3720 } else {
3721 match my_intf.next_ifaddr_v6() {
3722 Some(addr) => addr,
3723 None => return vec![],
3724 }
3725 };
3726
3727 send_dns_outgoing_impl(out, if_name, my_intf.index, if_addr, sock)
3728}
3729
3730fn send_dns_outgoing_impl(
3732 out: &DnsOutgoing,
3733 if_name: &str,
3734 if_index: u32,
3735 if_addr: &IfAddr,
3736 sock: &PktInfoUdpSocket,
3737) -> Vec<Vec<u8>> {
3738 let qtype = if out.is_query() {
3739 "query"
3740 } else {
3741 if out.answers_count() == 0 && out.additionals().is_empty() {
3742 return vec![]; }
3744 "response"
3745 };
3746 trace!(
3747 "send {}: {} questions {} answers {} authorities {} additional",
3748 qtype,
3749 out.questions().len(),
3750 out.answers_count(),
3751 out.authorities().len(),
3752 out.additionals().len()
3753 );
3754
3755 match if_addr.ip() {
3756 IpAddr::V4(ipv4) => {
3757 if let Err(e) = sock.set_multicast_if_v4(&ipv4) {
3758 debug!(
3759 "send_dns_outgoing: failed to set multicast interface for IPv4 {}: {}",
3760 ipv4, e
3761 );
3762 return vec![]; }
3764 }
3765 IpAddr::V6(ipv6) => {
3766 if let Err(e) = sock.set_multicast_if_v6(if_index) {
3767 debug!(
3768 "send_dns_outgoing: failed to set multicast interface for IPv6 {}: {}",
3769 ipv6, e
3770 );
3771 return vec![]; }
3773 }
3774 }
3775
3776 let packet_list = out.to_data_on_wire();
3777 for packet in packet_list.iter() {
3778 multicast_on_intf(packet, if_name, if_index, if_addr, sock);
3779 }
3780 packet_list
3781}
3782
3783fn multicast_on_intf(
3785 packet: &[u8],
3786 if_name: &str,
3787 if_index: u32,
3788 if_addr: &IfAddr,
3789 socket: &PktInfoUdpSocket,
3790) {
3791 if packet.len() > MAX_MSG_ABSOLUTE {
3792 debug!("Drop over-sized packet ({})", packet.len());
3793 return;
3794 }
3795
3796 let addr: SocketAddr = match if_addr {
3797 if_addrs::IfAddr::V4(_) => SocketAddrV4::new(GROUP_ADDR_V4, MDNS_PORT).into(),
3798 if_addrs::IfAddr::V6(_) => {
3799 let mut sock = SocketAddrV6::new(GROUP_ADDR_V6, MDNS_PORT, 0, 0);
3800 sock.set_scope_id(if_index); sock.into()
3802 }
3803 };
3804
3805 let sock_addr = addr.into();
3807 match socket.send_to(packet, &sock_addr) {
3808 Ok(sz) => trace!(
3809 "sent out {} bytes on interface {} (idx {}) addr {}",
3810 sz,
3811 if_name,
3812 if_index,
3813 if_addr.ip()
3814 ),
3815 Err(e) => trace!("Failed to send to {} via {:?}: {}", addr, &if_name, e),
3816 }
3817}
3818
/// Returns true when `name` splits on '.' into at least five parts
/// (empty parts count), e.g. "my._http._tcp.local.".
fn valid_instance_name(name: &str) -> bool {
    // A fifth part exists exactly when there are at least five parts.
    name.split('.').nth(4).is_some()
}
3825
3826fn notify_monitors(monitors: &mut Vec<Sender<DaemonEvent>>, event: DaemonEvent) {
3827 monitors.retain(|sender| {
3828 if let Err(e) = sender.try_send(event.clone()) {
3829 debug!("notify_monitors: try_send: {}", &e);
3830 if matches!(e, TrySendError::Disconnected(_)) {
3831 return false; }
3833 }
3834 true
3835 });
3836}
3837
3838fn prepare_announce(
3841 info: &ServiceInfo,
3842 intf: &MyIntf,
3843 dns_registry: &mut DnsRegistry,
3844 is_ipv4: bool,
3845) -> Option<DnsOutgoing> {
3846 let intf_addrs = if is_ipv4 {
3847 info.get_addrs_on_my_intf_v4(intf)
3848 } else {
3849 info.get_addrs_on_my_intf_v6(intf)
3850 };
3851
3852 if intf_addrs.is_empty() {
3853 debug!(
3854 "prepare_announce (ipv4: {is_ipv4}): no valid addrs on interface {}",
3855 &intf.name
3856 );
3857 return None;
3858 }
3859
3860 let service_fullname = match dns_registry.name_changes.get(info.get_fullname()) {
3862 Some(new_name) => new_name,
3863 None => info.get_fullname(),
3864 };
3865
3866 debug!(
3867 "prepare to announce service {service_fullname} on {:?}",
3868 &intf_addrs
3869 );
3870
3871 let mut probing_count = 0;
3872 let mut out = DnsOutgoing::new(FLAGS_QR_RESPONSE | FLAGS_AA);
3873 let create_time = current_time_millis() + fastrand::u64(0..250);
3874
3875 out.add_answer_at_time(
3876 DnsPointer::new(
3877 info.get_type(),
3878 RRType::PTR,
3879 CLASS_IN,
3880 info.get_other_ttl(),
3881 service_fullname.to_string(),
3882 ),
3883 0,
3884 );
3885
3886 if let Some(sub) = info.get_subtype() {
3887 trace!("Adding subdomain {}", sub);
3888 out.add_answer_at_time(
3889 DnsPointer::new(
3890 sub,
3891 RRType::PTR,
3892 CLASS_IN,
3893 info.get_other_ttl(),
3894 service_fullname.to_string(),
3895 ),
3896 0,
3897 );
3898 }
3899
3900 let hostname = match dns_registry.name_changes.get(info.get_hostname()) {
3902 Some(new_name) => new_name.to_string(),
3903 None => info.get_hostname().to_string(),
3904 };
3905
3906 let mut srv = DnsSrv::new(
3907 info.get_fullname(),
3908 CLASS_IN | CLASS_CACHE_FLUSH,
3909 info.get_host_ttl(),
3910 info.get_priority(),
3911 info.get_weight(),
3912 info.get_port(),
3913 hostname,
3914 );
3915
3916 if let Some(new_name) = dns_registry.name_changes.get(info.get_fullname()) {
3917 srv.get_record_mut().set_new_name(new_name.to_string());
3918 }
3919
3920 if !info.requires_probe()
3921 || dns_registry.is_probing_done(&srv, info.get_fullname(), create_time)
3922 {
3923 out.add_answer_at_time(srv, 0);
3924 } else {
3925 probing_count += 1;
3926 }
3927
3928 let mut txt = DnsTxt::new(
3931 info.get_fullname(),
3932 CLASS_IN | CLASS_CACHE_FLUSH,
3933 info.get_other_ttl(),
3934 info.generate_txt(),
3935 );
3936
3937 if let Some(new_name) = dns_registry.name_changes.get(info.get_fullname()) {
3938 txt.get_record_mut().set_new_name(new_name.to_string());
3939 }
3940
3941 if !info.requires_probe()
3942 || dns_registry.is_probing_done(&txt, info.get_fullname(), create_time)
3943 {
3944 out.add_answer_at_time(txt, 0);
3945 } else {
3946 probing_count += 1;
3947 }
3948
3949 let hostname = info.get_hostname();
3952 for address in intf_addrs {
3953 let mut dns_addr = DnsAddress::new(
3954 hostname,
3955 ip_address_rr_type(&address),
3956 CLASS_IN | CLASS_CACHE_FLUSH,
3957 info.get_host_ttl(),
3958 address,
3959 intf.into(),
3960 );
3961
3962 if let Some(new_name) = dns_registry.name_changes.get(hostname) {
3963 dns_addr.get_record_mut().set_new_name(new_name.to_string());
3964 }
3965
3966 if !info.requires_probe()
3967 || dns_registry.is_probing_done(&dns_addr, info.get_fullname(), create_time)
3968 {
3969 out.add_answer_at_time(dns_addr, 0);
3970 } else {
3971 probing_count += 1;
3972 }
3973 }
3974
3975 if probing_count > 0 {
3976 return None;
3977 }
3978
3979 Some(out)
3980}
3981
3982fn announce_service_on_intf(
3985 dns_registry: &mut DnsRegistry,
3986 info: &ServiceInfo,
3987 intf: &MyIntf,
3988 sock: &PktInfoUdpSocket,
3989) -> bool {
3990 let is_ipv4 = sock.domain() == Domain::IPV4;
3991 if let Some(out) = prepare_announce(info, intf, dns_registry, is_ipv4) {
3992 send_dns_outgoing(&out, intf, sock);
3993 return true;
3994 }
3995
3996 false
3997}
3998
/// Derives a new name from `original` by changing its first dot-separated
/// label; the remaining labels are kept as they are.
///
/// * A label ending in " (N)", where N parses as a `u32`, becomes
///   " (N+1)": "foo (2).local." -> "foo (3).local.".
/// * Any other label gets " (2)" appended: "foo.local." -> "foo (2).local.".
///
/// If N + 1 does not fit in a `u32`, " (2)" is appended instead of
/// overflowing.
fn name_change(original: &str) -> String {
    let mut parts: Vec<_> = original.split('.').collect();
    let Some(first_part) = parts.get_mut(0) else {
        return format!("{original} (2)");
    };

    // Accept only "<base> (<N>)" where the closing parenthesis is the very
    // last character and N is a number that can still be incremented.
    let incremented = first_part
        .strip_suffix(')')
        .and_then(|head| head.rsplit_once(" ("))
        .and_then(|(base_name, digits)| {
            let number = digits.parse::<u32>().ok()?;
            let next = number.checked_add(1)?;
            Some(format!("{} ({})", base_name, next))
        });

    let new_name = incremented.unwrap_or_else(|| format!("{first_part} (2)"));

    *first_part = &new_name;
    parts.join(".")
}
4034
/// Returns a new hostname derived from `original`, for use after a name conflict.
///
/// Only the first dot-separated label is modified:
/// - if the text after the label's last `-` parses as a `u32`, the number is
///   incremented: `"foo-2.local."` becomes `"foo-3.local."`;
/// - otherwise `"-2"` is appended: `"foo.local."` becomes `"foo-2.local."`.
///
/// If incrementing the number would overflow `u32`, the label is treated as
/// not numbered and `"-2"` is appended instead of panicking or wrapping.
fn hostname_change(original: &str) -> String {
    // Splitting always yields a first label, possibly empty.
    let (first_label, remainder) = match original.split_once('.') {
        Some((label, rest)) => (label, Some(rest)),
        None => (original, None),
    };

    let bumped = first_label
        .rsplit_once('-')
        .and_then(|(base, digits)| {
            // `checked_add` guards against `u32::MAX` in the existing suffix.
            let next = digits.parse::<u32>().ok()?.checked_add(1)?;
            Some(format!("{base}-{next}"))
        })
        .unwrap_or_else(|| format!("{first_label}-2"));

    match remainder {
        Some(rest) => format!("{bumped}.{rest}"),
        None => bumped,
    }
}
4062
4063fn add_answer_with_additionals(
4064 out: &mut DnsOutgoing,
4065 msg: &DnsIncoming,
4066 service: &ServiceInfo,
4067 intf: &MyIntf,
4068 dns_registry: &DnsRegistry,
4069 is_ipv4: bool,
4070) {
4071 let intf_addrs = if is_ipv4 {
4072 service.get_addrs_on_my_intf_v4(intf)
4073 } else {
4074 service.get_addrs_on_my_intf_v6(intf)
4075 };
4076 if intf_addrs.is_empty() {
4077 trace!("No addrs on LAN of intf {:?}", intf);
4078 return;
4079 }
4080
4081 let service_fullname = match dns_registry.name_changes.get(service.get_fullname()) {
4083 Some(new_name) => new_name,
4084 None => service.get_fullname(),
4085 };
4086
4087 let hostname = match dns_registry.name_changes.get(service.get_hostname()) {
4088 Some(new_name) => new_name,
4089 None => service.get_hostname(),
4090 };
4091
4092 let ptr_added = out.add_answer(
4093 msg,
4094 DnsPointer::new(
4095 service.get_type(),
4096 RRType::PTR,
4097 CLASS_IN,
4098 service.get_other_ttl(),
4099 service_fullname.to_string(),
4100 ),
4101 );
4102
4103 if !ptr_added {
4104 trace!("answer was not added for msg {:?}", msg);
4105 return;
4106 }
4107
4108 if let Some(sub) = service.get_subtype() {
4109 trace!("Adding subdomain {}", sub);
4110 out.add_additional_answer(DnsPointer::new(
4111 sub,
4112 RRType::PTR,
4113 CLASS_IN,
4114 service.get_other_ttl(),
4115 service_fullname.to_string(),
4116 ));
4117 }
4118
4119 out.add_additional_answer(DnsSrv::new(
4122 service_fullname,
4123 CLASS_IN | CLASS_CACHE_FLUSH,
4124 service.get_host_ttl(),
4125 service.get_priority(),
4126 service.get_weight(),
4127 service.get_port(),
4128 hostname.to_string(),
4129 ));
4130
4131 out.add_additional_answer(DnsTxt::new(
4132 service_fullname,
4133 CLASS_IN | CLASS_CACHE_FLUSH,
4134 service.get_other_ttl(),
4135 service.generate_txt(),
4136 ));
4137
4138 for address in intf_addrs {
4139 out.add_additional_answer(DnsAddress::new(
4140 hostname,
4141 ip_address_rr_type(&address),
4142 CLASS_IN | CLASS_CACHE_FLUSH,
4143 service.get_host_ttl(),
4144 address,
4145 intf.into(),
4146 ));
4147 }
4148}
4149
4150fn check_probing(
4153 dns_registry: &mut DnsRegistry,
4154 timers: &mut BinaryHeap<Reverse<u64>>,
4155 now: u64,
4156) -> (DnsOutgoing, Vec<String>) {
4157 let mut expired_probes = Vec::new();
4158 let mut out = DnsOutgoing::new(FLAGS_QR_QUERY);
4159
4160 for (name, probe) in dns_registry.probing.iter_mut() {
4161 if now >= probe.next_send {
4162 if probe.expired(now) {
4163 expired_probes.push(name.clone());
4165 } else {
4166 out.add_question(name, RRType::ANY);
4167
4168 for record in probe.records.iter() {
4176 out.add_authority(record.clone());
4177 }
4178
4179 probe.update_next_send(now);
4180
4181 timers.push(Reverse(probe.next_send));
4183 }
4184 }
4185 }
4186
4187 (out, expired_probes)
4188}
4189
4190fn handle_expired_probes(
4195 expired_probes: Vec<String>,
4196 intf_name: &str,
4197 dns_registry: &mut DnsRegistry,
4198 monitors: &mut Vec<Sender<DaemonEvent>>,
4199) -> HashSet<String> {
4200 let mut waiting_services = HashSet::new();
4201
4202 for name in expired_probes {
4203 let Some(probe) = dns_registry.probing.remove(&name) else {
4204 continue;
4205 };
4206
4207 for record in probe.records.iter() {
4209 if let Some(new_name) = record.get_record().get_new_name() {
4210 dns_registry
4211 .name_changes
4212 .insert(name.clone(), new_name.to_string());
4213
4214 let event = DnsNameChange {
4215 original: record.get_record().get_original_name().to_string(),
4216 new_name: new_name.to_string(),
4217 rr_type: record.get_type(),
4218 intf_name: intf_name.to_string(),
4219 };
4220 debug!("Name change event: {:?}", &event);
4221 notify_monitors(monitors, DaemonEvent::NameChange(event));
4222 }
4223 }
4224
4225 debug!(
4227 "probe of '{name}' finished: move {} records to active. ({} waiting services)",
4228 probe.records.len(),
4229 probe.waiting_services.len(),
4230 );
4231
4232 if !probe.records.is_empty() {
4234 match dns_registry.active.get_mut(&name) {
4235 Some(records) => {
4236 records.extend(probe.records);
4237 }
4238 None => {
4239 dns_registry.active.insert(name, probe.records);
4240 }
4241 }
4242
4243 waiting_services.extend(probe.waiting_services);
4244 }
4245 }
4246
4247 waiting_services
4248}
4249
4250#[cfg(test)]
4251mod tests {
4252 use super::{
4253 _new_socket_bind, check_domain_suffix, check_service_name_length, hostname_change,
4254 my_ip_interfaces, name_change, send_dns_outgoing_impl, valid_instance_name,
4255 HostnameResolutionEvent, ServiceDaemon, ServiceEvent, ServiceInfo, GROUP_ADDR_V4,
4256 MDNS_PORT,
4257 };
4258 use crate::{
4259 dns_parser::{
4260 DnsIncoming, DnsOutgoing, DnsPointer, InterfaceId, RRType, ScopedIp, CLASS_IN,
4261 FLAGS_AA, FLAGS_QR_RESPONSE,
4262 },
4263 service_daemon::{add_answer_of_service, check_hostname},
4264 };
4265 use std::{
4266 net::{SocketAddr, SocketAddrV4},
4267 time::{Duration, SystemTime},
4268 };
4269 use test_log::test;
4270
#[test]
fn test_socketaddr_print() {
    // The mDNS IPv4 group address and port must display as "ip:port".
    let addr = SocketAddr::from(SocketAddrV4::new(GROUP_ADDR_V4, MDNS_PORT));
    assert_eq!(addr.to_string(), "224.0.0.251:5353");
}
4277
#[test]
fn test_instance_name() {
    // Names with an instance label in front of the service type are accepted.
    let accepted = [
        "my-laser._printer._tcp.local.",
        "my-laser.._printer._tcp.local.",
    ];
    for &name in &accepted {
        assert!(valid_instance_name(name));
    }

    // A bare service type has no instance label.
    assert!(!valid_instance_name("_printer._tcp.local."));
}
4284
#[test]
fn test_check_service_name_length() {
    // The call must fail; print the error for inspection.
    match check_service_name_length("_tcp", 100) {
        Err(e) => println!("{}", e),
        Ok(_) => panic!("assertion failed: result.is_err()"),
    }
}
4293
#[test]
fn test_check_hostname() {
    // A 255-byte name (including the ".local." suffix) is still accepted.
    let longest_valid = "A".repeat(255 - ".local.".len()) + ".local.";
    let accepted = ["my_host.local.", longest_valid.as_str()];
    for &hostname in &accepted {
        assert!(check_hostname(hostname).is_ok());
    }

    // Missing trailing dot, nothing before ".local.", and a 256-byte name are rejected.
    let too_long = "A".repeat(256 - ".local.".len()) + ".local.";
    let rejected = ["my_host.local", ".local.", too_long.as_str()];
    for &hostname in &rejected {
        let result = check_hostname(hostname);
        assert!(result.is_err());
        if let Err(e) = result {
            println!("{}", e);
        }
    }
}
4318
#[test]
fn test_check_domain_suffix() {
    let rejected = [
        "_missing_dot._tcp.local",
        "_missing_bar.tcp.local.",
        "_mis_spell._tpp.local.",
        "_mis_spell._upp.local.",
    ];
    for &domain in &rejected {
        assert!(check_domain_suffix(domain).is_err());
    }

    let accepted = ["_has_dot._tcp.local.", "_goodname._udp.local."];
    for &domain in &accepted {
        assert!(check_domain_suffix(domain).is_ok());
    }
}
4328
#[test]
fn test_service_with_temporarily_invalidated_ptr() {
    let daemon = ServiceDaemon::new().expect("Failed to create daemon");

    // Register a service on all interfaces of this host.
    let service = "_test_inval_ptr._udp.local.";
    let host_name = "my_host_tmp_invalidated_ptr.local.";
    let intfs: Vec<_> = my_ip_interfaces(false);
    let intf_ips: Vec<_> = intfs.iter().map(|intf| intf.ip()).collect();
    let port = 5201;
    let my_service =
        ServiceInfo::new(service, "my_instance", host_name, &intf_ips[..], port, None)
            .expect("invalid service info")
            .enable_addr_auto();
    assert!(daemon.register(my_service.clone()).is_ok());

    // First browse: the service must be resolved.
    let browse_chan = daemon.browse(service).unwrap();
    let timeout = Duration::from_secs(2);
    let resolved = loop {
        match browse_chan.recv_timeout(timeout) {
            Ok(ServiceEvent::ServiceResolved(info)) => {
                println!("Resolved a service of {}", &info.fullname);
                break true;
            }
            Ok(e) => println!("Received event {:?}", e),
            Err(_) => break false,
        }
    };
    assert!(resolved);

    // Stop the browse and wait until the daemon confirms it.
    println!("Stopping browse of {}", service);
    daemon.stop_browse(service).unwrap();
    let stopped = loop {
        match browse_chan.recv_timeout(timeout) {
            Ok(ServiceEvent::SearchStopped(_)) => {
                println!("Stopped browsing service");
                break true;
            }
            Ok(e) => println!("Received event {:?}", e),
            Err(_) => break false,
        }
    };
    assert!(stopped);

    // Send a PTR record for the service with TTL 0 on every interface.
    let mut packet_buffer = DnsOutgoing::new(FLAGS_QR_RESPONSE | FLAGS_AA);
    packet_buffer.add_additional_answer(DnsPointer::new(
        my_service.get_type(),
        RRType::PTR,
        CLASS_IN,
        0,
        my_service.get_fullname().to_string(),
    ));

    for intf in intfs {
        let sock = _new_socket_bind(&intf, true).unwrap();
        send_dns_outgoing_impl(
            &packet_buffer,
            &intf.name,
            intf.index.unwrap_or(0),
            &intf.addr,
            &sock.pktinfo,
        );
    }

    println!(
        "Sent PTR record invalidation. Starting second browse for {}",
        service
    );

    // Second browse: the service must be resolved again.
    let browse_chan = daemon.browse(service).unwrap();
    let resolved = loop {
        match browse_chan.recv_timeout(timeout) {
            Ok(ServiceEvent::ServiceResolved(info)) => {
                println!("Resolved a service of {}", &info.fullname);
                break true;
            }
            Ok(e) => println!("Received event {:?}", e),
            Err(_) => break false,
        }
    };
    assert!(resolved);

    daemon.shutdown().unwrap();
}
4442
#[test]
fn test_expired_srv() {
    // Register a service whose host TTL is only a few seconds.
    let service_type = "_expired-srv._udp.local.";
    let instance = "test_instance";
    let host_name = "expired_srv_host.local.";
    let mut my_service = ServiceInfo::new(service_type, instance, host_name, "", 5023, None)
        .unwrap()
        .enable_addr_auto();
    let new_ttl = 3;
    my_service._set_host_ttl(new_ttl);

    let mdns_server = ServiceDaemon::new().expect("Failed to create mdns server");
    assert!(mdns_server.register(my_service).is_ok());

    // A separate client must resolve the service.
    let mdns_client = ServiceDaemon::new().expect("Failed to create mdns client");
    let browse_chan = mdns_client.browse(service_type).unwrap();
    let timeout = Duration::from_secs(2);
    let resolved = loop {
        match browse_chan.recv_timeout(timeout) {
            Ok(ServiceEvent::ServiceResolved(info)) => {
                println!("Resolved a service of {}", &info.fullname);
                break true;
            }
            Ok(_) => {}
            Err(_) => break false,
        }
    };
    assert!(resolved);

    mdns_server.shutdown().unwrap();

    // Wait for a removal event; stop at the first one or when the channel goes quiet.
    let expire_timeout = Duration::from_secs(new_ttl as u64);
    while let Ok(event) = browse_chan.recv_timeout(expire_timeout) {
        if let ServiceEvent::ServiceRemoved(service_type, full_name) = event {
            println!("Service removed: {}: {}", &service_type, &full_name);
            break;
        }
    }
}
4496
#[test]
fn test_hostname_resolution_address_removed() {
    // Server side: a service bound to the first IPv4 interface address.
    let server = ServiceDaemon::new().expect("Failed to create server");
    let hostname = "addr_remove_host._tcp.local.";
    let service_ip_addr: ScopedIp = my_ip_interfaces(false)
        .iter()
        .map(|iface| iface.ip())
        .find(|ip| ip.is_ipv4())
        .unwrap()
        .into();

    let mut my_service = ServiceInfo::new(
        "_host_res_test._tcp.local.",
        "my_instance",
        hostname,
        &service_ip_addr.to_ip_addr(),
        1234,
        None,
    )
    .expect("invalid service info");

    // Short host TTL so the test does not have to wait long after shutdown.
    let addr_ttl = 2;
    my_service._set_host_ttl(addr_ttl);
    server.register(my_service).unwrap();

    // Client side: resolve the hostname and expect our address.
    let client = ServiceDaemon::new().expect("Failed to create client");
    let event_receiver = client.resolve_hostname(hostname, None).unwrap();
    let mut resolved = false;
    while let Ok(event) = event_receiver.recv() {
        match event {
            HostnameResolutionEvent::AddressesFound(found_hostname, addresses) => {
                assert!(found_hostname == hostname);
                assert!(addresses.contains(&service_ip_addr));
                println!("address found: {:?}", &addresses);
                resolved = true;
                break;
            }
            HostnameResolutionEvent::SearchStopped(_) => break,
            _ => {}
        }
    }
    assert!(resolved);

    server.shutdown().unwrap();

    // After the server is gone, the address must be reported as removed.
    let timeout = Duration::from_secs(addr_ttl as u64 + 1);
    let mut removed = false;
    while let Ok(event) = event_receiver.recv_timeout(timeout) {
        if let HostnameResolutionEvent::AddressesRemoved(removed_host, addresses) = event {
            assert!(removed_host == hostname);
            assert!(addresses.contains(&service_ip_addr));

            println!(
                "address removed: hostname: {} addresses: {:?}",
                &hostname, &addresses
            );
            removed = true;
            break;
        }
    }
    assert!(removed);

    client.shutdown().unwrap();
}
4571
#[test]
fn test_refresh_ptr() {
    let service_type = "_refresh-ptr._udp.local.";
    let instance = "test_instance";
    let host_name = "refresh_ptr_host.local.";
    let service_ip_addr = my_ip_interfaces(false)
        .iter()
        .map(|iface| iface.ip())
        .find(|ip| ip.is_ipv4())
        .unwrap();

    let mut my_service = ServiceInfo::new(
        service_type,
        instance,
        host_name,
        &service_ip_addr,
        5023,
        None,
    )
    .unwrap();

    // Short TTL for the non-host records of the service.
    let new_ttl = 3;
    my_service._set_other_ttl(new_ttl);

    let mdns_server = ServiceDaemon::new().expect("Failed to create mdns server");
    assert!(mdns_server.register(my_service).is_ok());

    let mdns_client = ServiceDaemon::new().expect("Failed to create mdns client");
    let browse_chan = mdns_client.browse(service_type).unwrap();
    let timeout = Duration::from_millis(1500);
    let resolved = loop {
        match browse_chan.recv_timeout(timeout) {
            Ok(ServiceEvent::ServiceResolved(info)) => {
                println!("Resolved a service of {}", &info.fullname);
                break true;
            }
            Ok(_) => {}
            Err(_) => break false,
        }
    };
    assert!(resolved);

    // Keep receiving until no event arrives within 90% of the TTL.
    let timeout = Duration::from_millis(new_ttl as u64 * 1000 * 90 / 100);
    while let Ok(event) = browse_chan.recv_timeout(timeout) {
        println!("event: {:?}", &event);
    }

    // Exactly one refresh of each kind is expected in the client's metrics.
    let metrics = mdns_client
        .get_metrics()
        .unwrap()
        .recv_timeout(timeout)
        .unwrap();
    assert_eq!(metrics["cache-refresh-ptr"], 1);
    assert_eq!(metrics["cache-refresh-srv-txt"], 1);

    mdns_server.shutdown().unwrap();
    mdns_client.shutdown().unwrap();
}
4639
#[test]
fn test_name_change() {
    // (input, expected) pairs.
    let cases = [
        ("foo.local.", "foo (2).local."),
        ("foo (2).local.", "foo (3).local."),
        ("foo (9).local.", "foo (10).local."),
        ("foo", "foo (2)"),
        ("foo (2)", "foo (3)"),
        ("", " (2)"),
        // Suffixes that are not a well-formed trailing " (<number>)" are left alone.
        ("foo (abc)", "foo (abc) (2)"),
        ("foo (2", "foo (2 (2)"),
        ("foo (2) extra", "foo (2) extra (2)"),
    ];

    for &(input, expected) in &cases {
        assert_eq!(name_change(input), expected);
    }
}
4654
#[test]
fn test_hostname_change() {
    // (input, expected) pairs.
    let cases = [
        ("foo.local.", "foo-2.local."),
        ("foo", "foo-2"),
        ("foo-2.local.", "foo-3.local."),
        ("foo-9", "foo-10"),
        ("test-42.domain.", "test-43.domain."),
    ];

    for &(input, expected) in &cases {
        assert_eq!(hostname_change(input), expected);
    }
}
4663
#[test]
fn test_add_answer_txt_ttl() {
    let service_type = "_test_add_answer._udp.local.";
    let instance = "test_instance";
    let host_name = "add_answer_host.local.";
    let service_intf = my_ip_interfaces(false)
        .into_iter()
        .find(|iface| iface.ip().is_ipv4())
        .unwrap();
    let service_ip_addr = service_intf.ip();
    let my_service = ServiceInfo::new(
        service_type,
        instance,
        host_name,
        &service_ip_addr,
        5023,
        None,
    )
    .unwrap();

    let mut out = DnsOutgoing::new(FLAGS_QR_RESPONSE | FLAGS_AA);

    // Build an incoming message from the wire form of the still-empty outgoing message.
    let packet = out.to_data_on_wire().pop().unwrap();
    let incoming = DnsIncoming::new(packet, InterfaceId::from(&service_intf)).unwrap();

    add_answer_of_service(
        &mut out,
        &incoming,
        instance,
        &my_service,
        RRType::TXT,
        vec![service_ip_addr],
    );

    assert!(
        out.answers_count() > 0,
        "No answers added to the outgoing message"
    );

    // The first answer must be the TXT record, carrying the service's "other" TTL.
    let first = out._answers().first().unwrap();
    assert_eq!(first.0.get_type(), RRType::TXT);
    assert_eq!(first.0.get_record().get_ttl(), my_service.get_other_ttl());
}
4717
#[test]
fn test_interface_flip() {
    let ty_domain = "_intf-flip._udp.local.";
    let host_name = "intf_flip.local.";
    // The instance name is the current time in microseconds since the Unix epoch.
    let instance_name = SystemTime::now()
        .duration_since(SystemTime::UNIX_EPOCH)
        .unwrap()
        .as_micros()
        .to_string();
    let port = 5200;

    // Pick the first IPv4 interface.
    let intf = my_ip_interfaces(false)
        .into_iter()
        .find(|iface| iface.ip().is_ipv4())
        .unwrap();
    let ip_addr1 = intf.ip();
    let intf_name = intf.name.clone();

    println!("Using interface {} with IP {}", intf_name, ip_addr1);

    let service1 =
        ServiceInfo::new(ty_domain, &instance_name, host_name, &ip_addr1, port, None)
            .expect("valid service info");
    let server1 = ServiceDaemon::new().expect("failed to start server");
    server1
        .register(service1)
        .expect("Failed to register service1");

    // Give the server some time before the client starts browsing.
    std::thread::sleep(Duration::from_secs(2));

    let client = ServiceDaemon::new().expect("failed to start client");
    let receiver = client.browse(ty_domain).unwrap();
    let timeout = Duration::from_secs(3);

    let got_data = loop {
        match receiver.recv_timeout(timeout) {
            Ok(ServiceEvent::ServiceResolved(_)) => {
                println!("Received ServiceResolved event");
                break true;
            }
            Ok(_) => {}
            Err(_) => break false,
        }
    };
    assert!(got_data, "Should receive ServiceResolved event");

    client.set_ip_check_interval(1).unwrap();

    // Take the interface down on the client: the service must be removed.
    println!("Shutting down interface {}", &intf_name);
    client.test_down_interface(&intf_name).unwrap();

    let got_removed = loop {
        match receiver.recv_timeout(timeout) {
            Ok(ServiceEvent::ServiceRemoved(ty_domain, instance)) => {
                println!("removed: {ty_domain} : {instance}");
                break true;
            }
            Ok(_) => {}
            Err(_) => break false,
        }
    };
    assert!(got_removed, "Should receive ServiceRemoved event");

    // Bring the interface back: the service must be resolved again.
    println!("Bringing up interface {}", &intf_name);
    client.test_up_interface(&intf_name).unwrap();

    let got_data = loop {
        match receiver.recv_timeout(timeout) {
            Ok(ServiceEvent::ServiceResolved(resolved)) => {
                println!("Received ServiceResolved: {:?}", resolved);
                break true;
            }
            Ok(_) => {}
            Err(_) => break false,
        }
    };
    assert!(
        got_data,
        "Should receive ServiceResolved event after interface is back up"
    );

    server1.shutdown().unwrap();
    client.shutdown().unwrap();
}
4813
#[test]
fn test_cache_only() {
    let service_type = "_cache_only._udp.local.";
    let instance = "test_instance";
    let host_name = "cache_only_host.local.";
    let service_ip_addr = my_ip_interfaces(false)
        .iter()
        .map(|iface| iface.ip())
        .find(|ip| ip.is_ipv4())
        .unwrap();

    let mut my_service = ServiceInfo::new(
        service_type,
        instance,
        host_name,
        &service_ip_addr,
        5023,
        None,
    )
    .unwrap();
    my_service._set_other_ttl(3);

    // The client starts its cache browse before the server exists.
    let mdns_client = ServiceDaemon::new().expect("Failed to create mdns client");
    let browse_chan = mdns_client.browse_cache(service_type).unwrap();
    std::thread::sleep(Duration::from_secs(2));

    // Only now is the service registered.
    let mdns_server = ServiceDaemon::new().expect("Failed to create mdns server");
    assert!(mdns_server.register(my_service).is_ok());

    let timeout = Duration::from_millis(1500);
    let resolved = loop {
        match browse_chan.recv_timeout(timeout) {
            Ok(ServiceEvent::ServiceResolved(info)) => {
                println!("Resolved a service of {}", &info.get_fullname());
                break true;
            }
            Ok(_) => {}
            Err(_) => break false,
        }
    };
    assert!(resolved);

    mdns_server.shutdown().unwrap();
    mdns_client.shutdown().unwrap();
}
4872
#[test]
fn test_cache_only_unsolicited() {
    let service_type = "_cache_only._udp.local.";
    let instance = "test_instance";
    let host_name = "cache_only_host.local.";
    let service_ip_addr = my_ip_interfaces(false)
        .iter()
        .map(|iface| iface.ip())
        .find(|ip| ip.is_ipv4())
        .unwrap();

    let mut my_service = ServiceInfo::new(
        service_type,
        instance,
        host_name,
        &service_ip_addr,
        5023,
        None,
    )
    .unwrap();
    my_service._set_other_ttl(3);

    // Here the service is registered before the client starts.
    let mdns_server = ServiceDaemon::new().expect("Failed to create mdns server");
    assert!(mdns_server.register(my_service).is_ok());

    // The client opts in to unsolicited responses, then waits before browsing its cache.
    let mdns_client = ServiceDaemon::new().expect("Failed to create mdns client");
    mdns_client.accept_unsolicited(true).unwrap();
    std::thread::sleep(Duration::from_secs(2));

    let browse_chan = mdns_client.browse_cache(service_type).unwrap();
    let timeout = Duration::from_millis(1500);
    let resolved = loop {
        match browse_chan.recv_timeout(timeout) {
            Ok(ServiceEvent::ServiceResolved(info)) => {
                println!("Resolved a service of {}", &info.get_fullname());
                break true;
            }
            Ok(_) => {}
            Err(_) => break false,
        }
    };
    assert!(resolved);

    mdns_server.shutdown().unwrap();
    mdns_client.shutdown().unwrap();
}
4931}