1#[cfg(feature = "logging")]
32use crate::log::{debug, trace};
33use crate::{
34 dns_cache::{current_time_millis, DnsCache},
35 dns_parser::{
36 ip_address_rr_type, DnsAddress, DnsEntryExt, DnsIncoming, DnsOutgoing, DnsPointer,
37 DnsRecordBox, DnsRecordExt, DnsSrv, DnsTxt, InterfaceId, RRType, ScopedIp,
38 CLASS_CACHE_FLUSH, CLASS_IN, FLAGS_AA, FLAGS_QR_QUERY, FLAGS_QR_RESPONSE, MAX_MSG_ABSOLUTE,
39 },
40 error::{e_fmt, Error, Result},
41 service_info::{DnsRegistry, MyIntf, Probe, ServiceInfo, ServiceStatus},
42 Receiver, ResolvedService, TxtProperties,
43};
44use flume::{bounded, Sender, TrySendError};
45use if_addrs::{IfAddr, Interface};
46use mio::{event::Source, net::UdpSocket as MioUdpSocket, Interest, Poll, Registry, Token};
47use socket2::Domain;
48use socket_pktinfo::PktInfoUdpSocket;
49use std::{
50 cmp::{self, Reverse},
51 collections::{hash_map::Entry, BinaryHeap, HashMap, HashSet},
52 fmt, io,
53 net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6, UdpSocket},
54 str, thread,
55 time::Duration,
56 vec,
57};
58
/// Default value of the daemon's `service_name_len_max` option.
pub const SERVICE_NAME_LEN_MAX_DEFAULT: u8 = 15;

/// Default interval, in seconds, between two checks for IP address changes.
pub const IP_CHECK_INTERVAL_IN_SECS_DEFAULT: u32 = 5;

/// Default timeout for a verify request.
// NOTE(review): not referenced in this part of the file; presumably intended for
// callers of `ServiceDaemon::verify` -- confirm.
pub const VERIFY_TIMEOUT_DEFAULT: Duration = Duration::from_secs(10);

/// UDP port the mDNS sockets are bound to and multicast packets are sent to.
const MDNS_PORT: u16 = 5353;
/// IPv4 multicast group joined by the mDNS sockets.
const GROUP_ADDR_V4: Ipv4Addr = Ipv4Addr::new(224, 0, 0, 251);
/// IPv6 multicast group joined by the mDNS sockets.
const GROUP_ADDR_V6: Ipv6Addr = Ipv6Addr::new(0xff02, 0, 0, 0, 0, 0, 0, 0xfb);
/// IPv4 loopback address (127.0.0.1) used for the daemon's signal socket.
const LOOPBACK_V4: Ipv4Addr = Ipv4Addr::LOCALHOST;

// NOTE(review): not referenced in this part of the file; the name suggests a wait
// time in milliseconds used while resolving -- confirm at the use site.
const RESOLVE_WAIT_IN_MILLIS: u64 = 500;
77
/// Status delivered on the channel returned by `ServiceDaemon::unregister`.
///
/// Besides `Debug`, the type derives `Clone`, `Copy`, `PartialEq` and `Eq` so that
/// callers can compare a received status with `==` and pass it around by value.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum UnregisterStatus {
    /// The unregister request succeeded.
    OK,
    /// No matching registration was found.
    NotFound,
}
86
/// State of the daemon as reported to clients.
#[non_exhaustive]
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum DaemonStatus {
    /// The daemon is running; this is the state a new `Zeroconf` starts in.
    Running,

    /// The daemon received an exit command, or its command channel is disconnected.
    Shutdown,
}
97
/// Names the internal counters maintained by the daemon.
///
/// The `Display` form of each variant is a lower-case, dash-separated label.
// NOTE(review): the label is presumably the key used in `Metrics` -- confirm at
// `increase_counter`, which is outside this part of the file.
#[derive(PartialEq, Eq, Hash)]
enum Counter {
    Register,
    RegisterResend,
    Unregister,
    UnregisterResend,
    Browse,
    ResolveHostname,
    Respond,
    CacheRefreshPTR,
    CacheRefreshSrvTxt,
    CacheRefreshAddr,
    KnownAnswerSuppression,
    CachedPTR,
    CachedSRV,
    CachedAddr,
    CachedTxt,
    CachedNSec,
    CachedSubtype,
    DnsRegistryProbe,
    DnsRegistryActive,
    DnsRegistryTimer,
    DnsRegistryNameChange,
    Timer,
}

impl fmt::Display for Counter {
    /// Writes the counter's label; formatter width/fill options are not applied.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let label = match self {
            Self::Register => "register",
            Self::RegisterResend => "register-resend",
            Self::Unregister => "unregister",
            Self::UnregisterResend => "unregister-resend",
            Self::Browse => "browse",
            Self::ResolveHostname => "resolve-hostname",
            Self::Respond => "respond",
            Self::CacheRefreshPTR => "cache-refresh-ptr",
            Self::CacheRefreshSrvTxt => "cache-refresh-srv-txt",
            Self::CacheRefreshAddr => "cache-refresh-addr",
            Self::KnownAnswerSuppression => "known-answer-suppression",
            Self::CachedPTR => "cached-ptr",
            Self::CachedSRV => "cached-srv",
            Self::CachedAddr => "cached-addr",
            Self::CachedTxt => "cached-txt",
            Self::CachedNSec => "cached-nsec",
            Self::CachedSubtype => "cached-subtype",
            Self::DnsRegistryProbe => "dns-registry-probe",
            Self::DnsRegistryActive => "dns-registry-active",
            Self::DnsRegistryTimer => "dns-registry-timer",
            Self::DnsRegistryNameChange => "dns-registry-name-change",
            Self::Timer => "timer",
        };
        f.write_str(label)
    }
}
154
155struct MyUdpSocket {
160 pktinfo: PktInfoUdpSocket,
163
164 mio: MioUdpSocket,
167}
168
169impl MyUdpSocket {
170 pub fn new(pktinfo: PktInfoUdpSocket) -> io::Result<Self> {
171 let std_sock = pktinfo.try_clone_std()?;
172 let mio = MioUdpSocket::from_std(std_sock);
173
174 Ok(Self { pktinfo, mio })
175 }
176}
177
178impl Source for MyUdpSocket {
180 fn register(
181 &mut self,
182 registry: &Registry,
183 token: Token,
184 interests: Interest,
185 ) -> io::Result<()> {
186 self.mio.register(registry, token, interests)
187 }
188
189 fn reregister(
190 &mut self,
191 registry: &Registry,
192 token: Token,
193 interests: Interest,
194 ) -> io::Result<()> {
195 self.mio.reregister(registry, token, interests)
196 }
197
198 fn deregister(&mut self, registry: &Registry) -> std::io::Result<()> {
199 self.mio.deregister(registry)
200 }
201}
202
/// Daemon metrics: a map from a counter name to its value.
pub type Metrics = HashMap<String, i64>;

// Poller tokens: each one identifies the socket that an event reported by the
// poller belongs to (they are passed to `mio::Token` in `Zeroconf::run`).
const IPV4_SOCK_EVENT_KEY: usize = 4;
const IPV6_SOCK_EVENT_KEY: usize = 6;
const SIGNAL_SOCK_EVENT_KEY: usize = usize::MAX - 1;

/// Client-side handle to the daemon thread.
///
/// The handle is `Clone`; a clone carries the same command sender and signal address.
#[derive(Clone)]
pub struct ServiceDaemon {
    /// Sending end of the bounded command channel consumed by the daemon thread.
    sender: Sender<Command>,

    /// Local address of the daemon's signal socket. `send_cmd` sends a datagram
    /// here after queueing a command.
    signal_addr: SocketAddr,
}
226
227impl ServiceDaemon {
228 pub fn new() -> Result<Self> {
233 let signal_addr = SocketAddrV4::new(LOOPBACK_V4, 0);
236
237 let signal_sock = UdpSocket::bind(signal_addr)
238 .map_err(|e| e_fmt!("failed to create signal_sock for daemon: {}", e))?;
239
240 let signal_addr = signal_sock
242 .local_addr()
243 .map_err(|e| e_fmt!("failed to get signal sock addr: {}", e))?;
244
245 signal_sock
247 .set_nonblocking(true)
248 .map_err(|e| e_fmt!("failed to set nonblocking for signal socket: {}", e))?;
249
250 let poller = Poll::new().map_err(|e| e_fmt!("failed to create mio Poll: {e}"))?;
251
252 let (sender, receiver) = bounded(100);
253
254 let mio_sock = MioUdpSocket::from_std(signal_sock);
256 thread::Builder::new()
257 .name("mDNS_daemon".to_string())
258 .spawn(move || Self::daemon_thread(mio_sock, poller, receiver))
259 .map_err(|e| e_fmt!("thread builder failed to spawn: {}", e))?;
260
261 Ok(Self {
262 sender,
263 signal_addr,
264 })
265 }
266
267 fn send_cmd(&self, cmd: Command) -> Result<()> {
270 let cmd_name = cmd.to_string();
271
272 self.sender.try_send(cmd).map_err(|e| match e {
274 TrySendError::Full(_) => Error::Again,
275 e => e_fmt!("flume::channel::send failed: {}", e),
276 })?;
277
278 let addr = SocketAddrV4::new(LOOPBACK_V4, 0);
280 let socket = UdpSocket::bind(addr)
281 .map_err(|e| e_fmt!("Failed to create socket to send signal: {}", e))?;
282 socket
283 .send_to(cmd_name.as_bytes(), self.signal_addr)
284 .map_err(|e| {
285 e_fmt!(
286 "signal socket send_to {} ({}) failed: {}",
287 self.signal_addr,
288 cmd_name,
289 e
290 )
291 })?;
292
293 Ok(())
294 }
295
296 pub fn browse(&self, service_type: &str) -> Result<Receiver<ServiceEvent>> {
307 check_domain_suffix(service_type)?;
308
309 let (resp_s, resp_r) = bounded(10);
310 self.send_cmd(Command::Browse(service_type.to_string(), 1, false, resp_s))?;
311 Ok(resp_r)
312 }
313
314 pub fn browse_cache(&self, service_type: &str) -> Result<Receiver<ServiceEvent>> {
323 check_domain_suffix(service_type)?;
324
325 let (resp_s, resp_r) = bounded(10);
326 self.send_cmd(Command::Browse(service_type.to_string(), 1, true, resp_s))?;
327 Ok(resp_r)
328 }
329
330 pub fn stop_browse(&self, ty_domain: &str) -> Result<()> {
335 self.send_cmd(Command::StopBrowse(ty_domain.to_string()))
336 }
337
338 pub fn resolve_hostname(
346 &self,
347 hostname: &str,
348 timeout: Option<u64>,
349 ) -> Result<Receiver<HostnameResolutionEvent>> {
350 check_hostname(hostname)?;
351 let (resp_s, resp_r) = bounded(10);
352 self.send_cmd(Command::ResolveHostname(
353 hostname.to_string(),
354 1,
355 resp_s,
356 timeout,
357 ))?;
358 Ok(resp_r)
359 }
360
361 pub fn stop_resolve_hostname(&self, hostname: &str) -> Result<()> {
366 self.send_cmd(Command::StopResolveHostname(hostname.to_string()))
367 }
368
369 pub fn register(&self, service_info: ServiceInfo) -> Result<()> {
377 check_service_name(service_info.get_fullname())?;
378 check_hostname(service_info.get_hostname())?;
379
380 self.send_cmd(Command::Register(service_info.into()))
381 }
382
383 pub fn unregister(&self, fullname: &str) -> Result<Receiver<UnregisterStatus>> {
391 let (resp_s, resp_r) = bounded(1);
392 self.send_cmd(Command::Unregister(fullname.to_lowercase(), resp_s))?;
393 Ok(resp_r)
394 }
395
396 pub fn monitor(&self) -> Result<Receiver<DaemonEvent>> {
400 let (resp_s, resp_r) = bounded(100);
401 self.send_cmd(Command::Monitor(resp_s))?;
402 Ok(resp_r)
403 }
404
405 pub fn shutdown(&self) -> Result<Receiver<DaemonStatus>> {
410 let (resp_s, resp_r) = bounded(1);
411 self.send_cmd(Command::Exit(resp_s))?;
412 Ok(resp_r)
413 }
414
415 pub fn status(&self) -> Result<Receiver<DaemonStatus>> {
421 let (resp_s, resp_r) = bounded(1);
422
423 if self.sender.is_disconnected() {
424 resp_s
425 .send(DaemonStatus::Shutdown)
426 .map_err(|e| e_fmt!("failed to send daemon status to the client: {}", e))?;
427 } else {
428 self.send_cmd(Command::GetStatus(resp_s))?;
429 }
430
431 Ok(resp_r)
432 }
433
434 pub fn get_metrics(&self) -> Result<Receiver<Metrics>> {
439 let (resp_s, resp_r) = bounded(1);
440 self.send_cmd(Command::GetMetrics(resp_s))?;
441 Ok(resp_r)
442 }
443
444 pub fn set_service_name_len_max(&self, len_max: u8) -> Result<()> {
451 const SERVICE_NAME_LEN_MAX_LIMIT: u8 = 30; if len_max > SERVICE_NAME_LEN_MAX_LIMIT {
454 return Err(Error::Msg(format!(
455 "service name length max {len_max} is too large"
456 )));
457 }
458
459 self.send_cmd(Command::SetOption(DaemonOption::ServiceNameLenMax(len_max)))
460 }
461
462 pub fn set_ip_check_interval(&self, interval_in_secs: u32) -> Result<()> {
468 let interval_in_millis = interval_in_secs as u64 * 1000;
469 self.send_cmd(Command::SetOption(DaemonOption::IpCheckInterval(
470 interval_in_millis,
471 )))
472 }
473
474 pub fn get_ip_check_interval(&self) -> Result<u32> {
476 let (resp_s, resp_r) = bounded(1);
477 self.send_cmd(Command::GetOption(resp_s))?;
478
479 let option = resp_r
480 .recv_timeout(Duration::from_secs(10))
481 .map_err(|e| e_fmt!("failed to receive ip check interval: {}", e))?;
482 let ip_check_interval_in_secs = option.ip_check_interval / 1000;
483 Ok(ip_check_interval_in_secs as u32)
484 }
485
486 pub fn enable_interface(&self, if_kind: impl IntoIfKindVec) -> Result<()> {
493 let if_kind_vec = if_kind.into_vec();
494 self.send_cmd(Command::SetOption(DaemonOption::EnableInterface(
495 if_kind_vec.kinds,
496 )))
497 }
498
499 pub fn disable_interface(&self, if_kind: impl IntoIfKindVec) -> Result<()> {
506 let if_kind_vec = if_kind.into_vec();
507 self.send_cmd(Command::SetOption(DaemonOption::DisableInterface(
508 if_kind_vec.kinds,
509 )))
510 }
511
512 pub fn accept_unsolicited(&self, accept: bool) -> Result<()> {
523 self.send_cmd(Command::SetOption(DaemonOption::AcceptUnsolicited(accept)))
524 }
525
526 #[cfg(test)]
527 pub fn test_down_interface(&self, ifname: &str) -> Result<()> {
528 self.send_cmd(Command::SetOption(DaemonOption::TestDownInterface(
529 ifname.to_string(),
530 )))
531 }
532
533 #[cfg(test)]
534 pub fn test_up_interface(&self, ifname: &str) -> Result<()> {
535 self.send_cmd(Command::SetOption(DaemonOption::TestUpInterface(
536 ifname.to_string(),
537 )))
538 }
539
540 pub fn set_multicast_loop_v4(&self, on: bool) -> Result<()> {
556 self.send_cmd(Command::SetOption(DaemonOption::MulticastLoopV4(on)))
557 }
558
559 pub fn set_multicast_loop_v6(&self, on: bool) -> Result<()> {
575 self.send_cmd(Command::SetOption(DaemonOption::MulticastLoopV6(on)))
576 }
577
578 pub fn verify(&self, instance_fullname: String, timeout: Duration) -> Result<()> {
591 self.send_cmd(Command::Verify(instance_fullname, timeout))
592 }
593
594 fn daemon_thread(signal_sock: MioUdpSocket, poller: Poll, receiver: Receiver<Command>) {
595 let mut zc = Zeroconf::new(signal_sock, poller);
596
597 if let Some(cmd) = zc.run(receiver) {
598 match cmd {
599 Command::Exit(resp_s) => {
600 if let Err(e) = resp_s.send(DaemonStatus::Shutdown) {
603 debug!("exit: failed to send response of shutdown: {}", e);
604 }
605 }
606 _ => {
607 debug!("Unexpected command: {:?}", cmd);
608 }
609 }
610 }
611 }
612}
613
614fn _new_socket_bind(intf: &Interface, should_loop: bool) -> Result<MyUdpSocket> {
616 let intf_ip = &intf.ip();
619 match intf_ip {
620 IpAddr::V4(ip) => {
621 let addr = SocketAddrV4::new(Ipv4Addr::new(0, 0, 0, 0), MDNS_PORT);
622 let sock = new_socket(addr.into(), true)?;
623
624 sock.join_multicast_v4(&GROUP_ADDR_V4, ip)
626 .map_err(|e| e_fmt!("join multicast group on addr {}: {}", intf_ip, e))?;
627
628 sock.set_multicast_if_v4(ip)
630 .map_err(|e| e_fmt!("set multicast_if on addr {}: {}", ip, e))?;
631
632 sock.set_multicast_ttl_v4(255)
637 .map_err(|e| e_fmt!("set set_multicast_ttl_v4 on addr {}: {}", ip, e))?;
638
639 if !should_loop {
640 sock.set_multicast_loop_v4(false)
641 .map_err(|e| e_fmt!("failed to set multicast loop v4 for {ip}: {e}"))?;
642 }
643
644 let multicast_addr = SocketAddrV4::new(GROUP_ADDR_V4, MDNS_PORT).into();
646 let test_packets = DnsOutgoing::new(0).to_data_on_wire();
647 for packet in test_packets {
648 sock.send_to(&packet, &multicast_addr)
649 .map_err(|e| e_fmt!("send multicast packet on addr {}: {}", ip, e))?;
650 }
651 MyUdpSocket::new(sock)
652 .map_err(|e| e_fmt!("failed to create MySocket for interface {}: {e}", intf.name))
653 }
654 IpAddr::V6(ip) => {
655 let addr = SocketAddrV6::new(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 0), MDNS_PORT, 0, 0);
656 let sock = new_socket(addr.into(), true)?;
657
658 let if_index = intf.index.unwrap_or(0);
659
660 sock.join_multicast_v6(&GROUP_ADDR_V6, if_index)
662 .map_err(|e| e_fmt!("join multicast group on addr {}: {}", ip, e))?;
663
664 sock.set_multicast_if_v6(if_index)
666 .map_err(|e| e_fmt!("set multicast_if on addr {}: {}", ip, e))?;
667
668 MyUdpSocket::new(sock)
673 .map_err(|e| e_fmt!("failed to create MySocket for interface {}: {e}", intf.name))
674 }
675 }
676}
677
678fn new_socket(addr: SocketAddr, non_block: bool) -> Result<PktInfoUdpSocket> {
681 let domain = match addr {
682 SocketAddr::V4(_) => socket2::Domain::IPV4,
683 SocketAddr::V6(_) => socket2::Domain::IPV6,
684 };
685
686 let fd = PktInfoUdpSocket::new(domain).map_err(|e| e_fmt!("create socket failed: {}", e))?;
687
688 fd.set_reuse_address(true)
689 .map_err(|e| e_fmt!("set ReuseAddr failed: {}", e))?;
690 #[cfg(unix)] fd.set_reuse_port(true)
692 .map_err(|e| e_fmt!("set ReusePort failed: {}", e))?;
693
694 if non_block {
695 fd.set_nonblocking(true)
696 .map_err(|e| e_fmt!("set O_NONBLOCK: {}", e))?;
697 }
698
699 fd.bind(&addr.into())
700 .map_err(|e| e_fmt!("socket bind to {} failed: {}", &addr, e))?;
701
702 trace!("new socket bind to {}", &addr);
703 Ok(fd)
704}
705
/// A command scheduled to be executed again later by the daemon loop.
struct ReRun {
    /// Time in milliseconds (same clock as `current_time_millis()`) at or after
    /// which the command is executed again.
    next_time: u64,
    /// The command to execute again.
    command: Command,
}
712
/// Describes which network interfaces a selection applies to.
///
/// `IfKind::matches` defines the exact rule for each variant.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub enum IfKind {
    /// Matches every interface.
    All,

    /// Matches interfaces whose address is IPv4.
    IPv4,

    /// Matches interfaces whose address is IPv6.
    IPv6,

    /// Matches interfaces with exactly this name.
    Name(String),

    /// Matches interfaces with exactly this IP address.
    Addr(IpAddr),

    /// Matches loopback interfaces whose address is IPv4.
    LoopbackV4,

    /// Matches loopback interfaces whose address is IPv6.
    LoopbackV6,
}
742
743impl IfKind {
744 fn matches(&self, intf: &Interface) -> bool {
746 match self {
747 Self::All => true,
748 Self::IPv4 => intf.ip().is_ipv4(),
749 Self::IPv6 => intf.ip().is_ipv6(),
750 Self::Name(ifname) => ifname == &intf.name,
751 Self::Addr(addr) => addr == &intf.ip(),
752 Self::LoopbackV4 => intf.is_loopback() && intf.ip().is_ipv4(),
753 Self::LoopbackV6 => intf.is_loopback() && intf.ip().is_ipv6(),
754 }
755 }
756}
757
758impl From<&str> for IfKind {
761 fn from(val: &str) -> Self {
762 Self::Name(val.to_string())
763 }
764}
765
766impl From<&String> for IfKind {
767 fn from(val: &String) -> Self {
768 Self::Name(val.to_string())
769 }
770}
771
772impl From<IpAddr> for IfKind {
774 fn from(val: IpAddr) -> Self {
775 Self::Addr(val)
776 }
777}
778
/// A list of `IfKind` values, produced by `IntoIfKindVec::into_vec`.
pub struct IfKindVec {
    /// The interface kinds, in the order they were given.
    kinds: Vec<IfKind>,
}

/// Conversion into an `IfKindVec`, used as the argument bound of
/// `ServiceDaemon::enable_interface` and `ServiceDaemon::disable_interface`.
pub trait IntoIfKindVec {
    /// Consumes `self` and returns the equivalent `IfKindVec`.
    fn into_vec(self) -> IfKindVec;
}
788
789impl<T: Into<IfKind>> IntoIfKindVec for T {
790 fn into_vec(self) -> IfKindVec {
791 let if_kind: IfKind = self.into();
792 IfKindVec {
793 kinds: vec![if_kind],
794 }
795 }
796}
797
798impl<T: Into<IfKind>> IntoIfKindVec for Vec<T> {
799 fn into_vec(self) -> IfKindVec {
800 let kinds: Vec<IfKind> = self.into_iter().map(|x| x.into()).collect();
801 IfKindVec { kinds }
802 }
803}
804
/// One enable/disable rule for network interfaces.
struct IfSelection {
    /// Which interfaces the rule applies to.
    if_kind: IfKind,

    /// `true` if matching interfaces are enabled, `false` if they are disabled.
    selected: bool,
}
813
/// State owned by the daemon thread.
struct Zeroconf {
    /// Known interfaces, keyed by interface index (0 when the index is unknown).
    my_intfs: HashMap<u32, MyIntf>,

    /// The IPv4 socket bound to the wildcard address on `MDNS_PORT`, if it could be created.
    ipv4_sock: Option<MyUdpSocket>,

    /// The IPv6 socket bound to the wildcard address on `MDNS_PORT`, if it could be created.
    ipv6_sock: Option<MyUdpSocket>,

    /// Local services. NOTE(review): presumably keyed by service fullname -- confirm.
    my_services: HashMap<String, ServiceInfo>,

    /// Cache of DNS records; expired services and addresses are evicted in `run`.
    cache: DnsCache,

    /// One `DnsRegistry` per interface index.
    dns_registry_map: HashMap<u32, DnsRegistry>,

    /// Senders for service events.
    /// NOTE(review): presumably keyed by the browsed service type -- confirm.
    service_queriers: HashMap<String, Sender<ServiceEvent>>,

    /// Per hostname: the sender for resolution events and an optional time in
    /// milliseconds at which the resolution times out (compared with
    /// `current_time_millis()` in `run`).
    hostname_resolvers: HashMap<String, (Sender<HostnameResolutionEvent>, Option<u64>)>,

    /// Commands waiting to be executed again once their `next_time` is reached.
    retransmissions: Vec<ReRun>,

    /// Metric counters.
    counters: Metrics,

    /// The mio poller watching the signal socket and the mDNS sockets.
    poller: Poll,

    /// Senders for daemon events; disconnected ones are dropped in `notify_monitors`.
    monitors: Vec<Sender<DaemonEvent>>,

    /// Current service-name length limit (starts as `SERVICE_NAME_LEN_MAX_DEFAULT`).
    service_name_len_max: u8,

    /// Interval between IP change checks, in milliseconds.
    ip_check_interval: u64,

    /// Interface enable/disable rules in the order they were added; when several
    /// rules match an interface, the last one wins.
    if_selections: Vec<IfSelection>,

    /// Socket that receives a datagram whenever a client queues a command.
    signal_sock: MioUdpSocket,

    /// Min-heap of wake-up times in milliseconds; the earliest one bounds the poll timeout.
    timers: BinaryHeap<Reverse<u64>>,

    /// Current daemon status.
    status: DaemonStatus,

    // NOTE(review): not used in this part of the file; the name suggests instances
    // whose resolution is still pending -- confirm.
    pending_resolves: HashSet<String>,

    // NOTE(review): not used in this part of the file; the name suggests instances
    // that have been resolved -- confirm.
    resolved: HashSet<String>,

    /// Last multicast-loop setting requested for the IPv4 socket (starts as `true`).
    multicast_loop_v4: bool,

    /// Last multicast-loop setting requested for the IPv6 socket (starts as `true`).
    multicast_loop_v6: bool,

    /// Value of the `AcceptUnsolicited` option (starts as `false`).
    accept_unsolicited: bool,

    /// Test-only: names of interfaces that `check_ip_changes` filters out.
    #[cfg(test)]
    test_down_interfaces: HashSet<String>,
}
890
891fn join_multicast_group(my_sock: &PktInfoUdpSocket, intf: &Interface) -> Result<()> {
893 let intf_ip = &intf.ip();
894 match intf_ip {
895 IpAddr::V4(ip) => {
896 debug!("join multicast group V4 on {} addr {ip}", intf.name);
898 my_sock
899 .join_multicast_v4(&GROUP_ADDR_V4, ip)
900 .map_err(|e| e_fmt!("PKT join multicast group on addr {}: {}", intf_ip, e))?;
901 }
902 IpAddr::V6(ip) => {
903 let if_index = intf.index.unwrap_or(0);
904 debug!(
906 "join multicast group V6 on {} addr {ip} with index {if_index}",
907 intf.name
908 );
909 my_sock
910 .join_multicast_v6(&GROUP_ADDR_V6, if_index)
911 .map_err(|e| e_fmt!("PKT join multicast group on addr {}: {}", ip, e))?;
912 }
913 }
914 Ok(())
915}
916
917impl Zeroconf {
918 fn new(signal_sock: MioUdpSocket, poller: Poll) -> Self {
919 let my_ifaddrs = my_ip_interfaces(true);
921
922 let mut my_intfs = HashMap::new();
926 let mut dns_registry_map = HashMap::new();
927
928 let mut ipv4_sock = None;
931 let addr = SocketAddrV4::new(Ipv4Addr::new(0, 0, 0, 0), MDNS_PORT);
932 match new_socket(addr.into(), true) {
933 Ok(sock) => {
934 sock.set_multicast_ttl_v4(255)
939 .map_err(|e| e_fmt!("set set_multicast_ttl_v4 on addr: {}", e))
940 .ok();
941
942 ipv4_sock = match MyUdpSocket::new(sock) {
944 Ok(s) => Some(s),
945 Err(e) => {
946 debug!("failed to create IPv4 MyUdpSocket: {e}");
947 None
948 }
949 };
950 }
951 Err(e) => debug!("failed to create IPv4 socket: {e}"),
953 }
954
955 let mut ipv6_sock = None;
956 let addr = SocketAddrV6::new(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 0), MDNS_PORT, 0, 0);
957 match new_socket(addr.into(), true) {
958 Ok(sock) => {
959 sock.set_multicast_hops_v6(255)
963 .map_err(|e| e_fmt!("set set_multicast_hops_v6: {}", e))
964 .ok();
965
966 ipv6_sock = match MyUdpSocket::new(sock) {
968 Ok(s) => Some(s),
969 Err(e) => {
970 debug!("failed to create IPv6 MyUdpSocket: {e}");
971 None
972 }
973 };
974 }
975 Err(e) => debug!("failed to create IPv6 socket: {e}"),
976 }
977
978 for intf in my_ifaddrs {
980 let sock_opt = if intf.ip().is_ipv4() {
981 &ipv4_sock
982 } else {
983 &ipv6_sock
984 };
985 let Some(sock) = sock_opt else {
986 debug!(
987 "no socket available for interface {} with addr {}. Skipped.",
988 intf.name,
989 intf.ip()
990 );
991 continue;
992 };
993
994 if let Err(e) = join_multicast_group(&sock.pktinfo, &intf) {
995 debug!("failed to join multicast: {}: {e}. Skipped.", &intf.ip());
996 }
997
998 let if_index = intf.index.unwrap_or(0);
999
1000 dns_registry_map
1002 .entry(if_index)
1003 .or_insert_with(DnsRegistry::new);
1004
1005 my_intfs
1006 .entry(if_index)
1007 .and_modify(|v: &mut MyIntf| {
1008 v.addrs.insert(intf.addr.clone());
1009 })
1010 .or_insert(MyIntf {
1011 name: intf.name.clone(),
1012 index: if_index,
1013 addrs: HashSet::from([intf.addr]),
1014 });
1015 }
1016
1017 let monitors = Vec::new();
1018 let service_name_len_max = SERVICE_NAME_LEN_MAX_DEFAULT;
1019 let ip_check_interval = IP_CHECK_INTERVAL_IN_SECS_DEFAULT as u64 * 1000;
1020
1021 let timers = BinaryHeap::new();
1022
1023 let if_selections = vec![];
1025
1026 let status = DaemonStatus::Running;
1027
1028 Self {
1029 my_intfs,
1030 ipv4_sock,
1031 ipv6_sock,
1032 my_services: HashMap::new(),
1033 cache: DnsCache::new(),
1034 dns_registry_map,
1035 hostname_resolvers: HashMap::new(),
1036 service_queriers: HashMap::new(),
1037 retransmissions: Vec::new(),
1038 counters: HashMap::new(),
1039 poller,
1040 monitors,
1041 service_name_len_max,
1042 ip_check_interval,
1043 if_selections,
1044 signal_sock,
1045 timers,
1046 status,
1047 pending_resolves: HashSet::new(),
1048 resolved: HashSet::new(),
1049 multicast_loop_v4: true,
1050 multicast_loop_v6: true,
1051 accept_unsolicited: false,
1052
1053 #[cfg(test)]
1054 test_down_interfaces: HashSet::new(),
1055 }
1056 }
1057
1058 fn run(&mut self, receiver: Receiver<Command>) -> Option<Command> {
1067 if let Err(e) = self.poller.registry().register(
1069 &mut self.signal_sock,
1070 mio::Token(SIGNAL_SOCK_EVENT_KEY),
1071 mio::Interest::READABLE,
1072 ) {
1073 debug!("failed to add signal socket to the poller: {}", e);
1074 return None;
1075 }
1076
1077 if let Some(sock) = self.ipv4_sock.as_mut() {
1078 if let Err(e) = self.poller.registry().register(
1079 sock,
1080 mio::Token(IPV4_SOCK_EVENT_KEY),
1081 mio::Interest::READABLE,
1082 ) {
1083 debug!("failed to register ipv4 socket: {}", e);
1084 return None;
1085 }
1086 }
1087
1088 if let Some(sock) = self.ipv6_sock.as_mut() {
1089 if let Err(e) = self.poller.registry().register(
1090 sock,
1091 mio::Token(IPV6_SOCK_EVENT_KEY),
1092 mio::Interest::READABLE,
1093 ) {
1094 debug!("failed to register ipv6 socket: {}", e);
1095 return None;
1096 }
1097 }
1098
1099 let mut next_ip_check = if self.ip_check_interval > 0 {
1101 current_time_millis() + self.ip_check_interval
1102 } else {
1103 0
1104 };
1105
1106 if next_ip_check > 0 {
1107 self.add_timer(next_ip_check);
1108 }
1109
1110 let mut events = mio::Events::with_capacity(1024);
1113 loop {
1114 let now = current_time_millis();
1115
1116 let earliest_timer = self.peek_earliest_timer();
1117 let timeout = earliest_timer.map(|timer| {
1118 let millis = if timer > now { timer - now } else { 1 };
1120 Duration::from_millis(millis)
1121 });
1122
1123 events.clear();
1125 match self.poller.poll(&mut events, timeout) {
1126 Ok(_) => self.handle_poller_events(&events),
1127 Err(e) => debug!("failed to select from sockets: {}", e),
1128 }
1129
1130 let now = current_time_millis();
1131
1132 self.pop_timers_till(now);
1134
1135 for hostname in self
1137 .hostname_resolvers
1138 .clone()
1139 .into_iter()
1140 .filter(|(_, (_, timeout))| timeout.map(|t| now >= t).unwrap_or(false))
1141 .map(|(hostname, _)| hostname)
1142 {
1143 trace!("hostname resolver timeout for {}", &hostname);
1144 call_hostname_resolution_listener(
1145 &self.hostname_resolvers,
1146 &hostname,
1147 HostnameResolutionEvent::SearchTimeout(hostname.to_owned()),
1148 );
1149 call_hostname_resolution_listener(
1150 &self.hostname_resolvers,
1151 &hostname,
1152 HostnameResolutionEvent::SearchStopped(hostname.to_owned()),
1153 );
1154 self.hostname_resolvers.remove(&hostname);
1155 }
1156
1157 while let Ok(command) = receiver.try_recv() {
1159 if matches!(command, Command::Exit(_)) {
1160 self.status = DaemonStatus::Shutdown;
1161 return Some(command);
1162 }
1163 self.exec_command(command, false);
1164 }
1165
1166 let mut i = 0;
1168 while i < self.retransmissions.len() {
1169 if now >= self.retransmissions[i].next_time {
1170 let rerun = self.retransmissions.remove(i);
1171 self.exec_command(rerun.command, true);
1172 } else {
1173 i += 1;
1174 }
1175 }
1176
1177 self.refresh_active_services();
1179
1180 let mut query_count = 0;
1182 for (hostname, _sender) in self.hostname_resolvers.iter() {
1183 for (hostname, ip_addr) in
1184 self.cache.refresh_due_hostname_resolutions(hostname).iter()
1185 {
1186 self.send_query(hostname, ip_address_rr_type(&ip_addr.to_ip_addr()));
1187 query_count += 1;
1188 }
1189 }
1190
1191 self.increase_counter(Counter::CacheRefreshAddr, query_count);
1192
1193 let now = current_time_millis();
1195
1196 let expired_services = self.cache.evict_expired_services(now);
1198 if !expired_services.is_empty() {
1199 debug!(
1200 "run: send {} service removal to listeners",
1201 expired_services.len()
1202 );
1203 self.notify_service_removal(expired_services);
1204 }
1205
1206 let expired_addrs = self.cache.evict_expired_addr(now);
1208 for (hostname, addrs) in expired_addrs {
1209 call_hostname_resolution_listener(
1210 &self.hostname_resolvers,
1211 &hostname,
1212 HostnameResolutionEvent::AddressesRemoved(hostname.clone(), addrs),
1213 );
1214 let instances = self.cache.get_instances_on_host(&hostname);
1215 let instance_set: HashSet<String> = instances.into_iter().collect();
1216 self.resolve_updated_instances(&instance_set);
1217 }
1218
1219 self.probing_handler();
1221
1222 if now >= next_ip_check && next_ip_check > 0 {
1224 next_ip_check = now + self.ip_check_interval;
1225 self.add_timer(next_ip_check);
1226
1227 self.check_ip_changes();
1228 }
1229 }
1230 }
1231
1232 fn process_set_option(&mut self, daemon_opt: DaemonOption) {
1233 match daemon_opt {
1234 DaemonOption::ServiceNameLenMax(length) => self.service_name_len_max = length,
1235 DaemonOption::IpCheckInterval(interval) => self.ip_check_interval = interval,
1236 DaemonOption::EnableInterface(if_kind) => self.enable_interface(if_kind),
1237 DaemonOption::DisableInterface(if_kind) => self.disable_interface(if_kind),
1238 DaemonOption::MulticastLoopV4(on) => self.set_multicast_loop_v4(on),
1239 DaemonOption::MulticastLoopV6(on) => self.set_multicast_loop_v6(on),
1240 DaemonOption::AcceptUnsolicited(accept) => self.set_accept_unsolicited(accept),
1241 #[cfg(test)]
1242 DaemonOption::TestDownInterface(ifname) => {
1243 self.test_down_interfaces.insert(ifname);
1244 }
1245 #[cfg(test)]
1246 DaemonOption::TestUpInterface(ifname) => {
1247 self.test_down_interfaces.remove(&ifname);
1248 }
1249 }
1250 }
1251
1252 fn enable_interface(&mut self, kinds: Vec<IfKind>) {
1253 debug!("enable_interface: {:?}", kinds);
1254 for if_kind in kinds {
1255 self.if_selections.push(IfSelection {
1256 if_kind,
1257 selected: true,
1258 });
1259 }
1260
1261 self.apply_intf_selections(my_ip_interfaces(true));
1262 }
1263
1264 fn disable_interface(&mut self, kinds: Vec<IfKind>) {
1265 debug!("disable_interface: {:?}", kinds);
1266 for if_kind in kinds {
1267 self.if_selections.push(IfSelection {
1268 if_kind,
1269 selected: false,
1270 });
1271 }
1272
1273 self.apply_intf_selections(my_ip_interfaces(true));
1274 }
1275
1276 fn set_multicast_loop_v4(&mut self, on: bool) {
1277 let Some(sock) = self.ipv4_sock.as_mut() else {
1278 return;
1279 };
1280 self.multicast_loop_v4 = on;
1281 sock.pktinfo
1282 .set_multicast_loop_v4(on)
1283 .map_err(|e| e_fmt!("failed to set multicast loop v4: {}", e))
1284 .unwrap();
1285 }
1286
1287 fn set_multicast_loop_v6(&mut self, on: bool) {
1288 let Some(sock) = self.ipv6_sock.as_mut() else {
1289 return;
1290 };
1291 self.multicast_loop_v6 = on;
1292 sock.pktinfo
1293 .set_multicast_loop_v6(on)
1294 .map_err(|e| e_fmt!("failed to set multicast loop v6: {}", e))
1295 .unwrap();
1296 }
1297
    /// Stores the `accept_unsolicited` flag.
    // NOTE(review): where the flag is consulted is outside this part of the file.
    fn set_accept_unsolicited(&mut self, accept: bool) {
        self.accept_unsolicited = accept;
    }
1301
1302 fn notify_monitors(&mut self, event: DaemonEvent) {
1303 self.monitors.retain(|sender| {
1305 if let Err(e) = sender.try_send(event.clone()) {
1306 debug!("notify_monitors: try_send: {}", &e);
1307 if matches!(e, TrySendError::Disconnected(_)) {
1308 return false; }
1310 }
1311 true
1312 });
1313 }
1314
1315 fn del_addr_in_my_services(&mut self, addr: &IpAddr) {
1317 for (_, service_info) in self.my_services.iter_mut() {
1318 if service_info.is_addr_auto() {
1319 service_info.remove_ipaddr(addr);
1320 }
1321 }
1322 }
1323
    /// Schedules a wake-up of the daemon loop at `next_time` (milliseconds, same
    /// clock as `current_time_millis()`).
    fn add_timer(&mut self, next_time: u64) {
        // `Reverse` turns the max-heap into a min-heap: the earliest time is on top.
        self.timers.push(Reverse(next_time));
    }
1327
1328 fn peek_earliest_timer(&self) -> Option<u64> {
1329 self.timers.peek().map(|Reverse(v)| *v)
1330 }
1331
1332 fn _pop_earliest_timer(&mut self) -> Option<u64> {
1333 self.timers.pop().map(|Reverse(v)| v)
1334 }
1335
1336 fn pop_timers_till(&mut self, now: u64) {
1338 while let Some(Reverse(v)) = self.timers.peek() {
1339 if *v > now {
1340 break;
1341 }
1342 self.timers.pop();
1343 }
1344 }
1345
1346 fn selected_intfs(&self, interfaces: Vec<Interface>) -> HashSet<Interface> {
1348 let intf_count = interfaces.len();
1349 let mut intf_selections = vec![true; intf_count];
1350
1351 for selection in self.if_selections.iter() {
1353 for i in 0..intf_count {
1355 if selection.if_kind.matches(&interfaces[i]) {
1356 intf_selections[i] = selection.selected;
1357 }
1358 }
1359 }
1360
1361 let mut selected_addrs = HashSet::new();
1362 for i in 0..intf_count {
1363 if intf_selections[i] {
1364 selected_addrs.insert(interfaces[i].clone());
1365 }
1366 }
1367
1368 selected_addrs
1369 }
1370
1371 fn apply_intf_selections(&mut self, interfaces: Vec<Interface>) {
1376 let intf_count = interfaces.len();
1378 let mut intf_selections = vec![true; intf_count];
1379
1380 for selection in self.if_selections.iter() {
1382 for i in 0..intf_count {
1384 if selection.if_kind.matches(&interfaces[i]) {
1385 intf_selections[i] = selection.selected;
1386 }
1387 }
1388 }
1389
1390 for (idx, intf) in interfaces.into_iter().enumerate() {
1392 if intf_selections[idx] {
1393 self.add_interface(intf);
1395 } else {
1396 self.del_interface(&intf);
1398 }
1399 }
1400 }
1401
    /// Handles the loss of a local IP address: removes it from the local services
    /// whose addresses are automatic, then notifies the monitors with `IpDel`.
    fn del_ip(&mut self, ip: IpAddr) {
        self.del_addr_in_my_services(&ip);
        self.notify_monitors(DaemonEvent::IpDel(ip));
    }
1406
    /// Compares the interfaces currently reported by the system with the known
    /// ones and reconciles the daemon state.
    ///
    /// Addresses that disappeared are removed and reported. Interfaces left without
    /// any address, or no longer present, are removed, their multicast memberships
    /// are dropped and their cached records are deleted. Finally the interface
    /// selection rules are applied to the current interfaces.
    fn check_ip_changes(&mut self) {
        let my_ifaddrs = my_ip_interfaces(true);

        // Test-only: hide the interfaces that a test marked as down.
        #[cfg(test)]
        let my_ifaddrs: Vec<_> = my_ifaddrs
            .into_iter()
            .filter(|intf| !self.test_down_interfaces.contains(&intf.name))
            .collect();

        // Current addresses grouped by interface index.
        let ifaddrs_map: HashMap<u32, Vec<&IfAddr>> =
            my_ifaddrs.iter().fold(HashMap::new(), |mut acc, intf| {
                let if_index = intf.index.unwrap_or(0);
                acc.entry(if_index).or_default().push(&intf.addr);
                acc
            });

        // Collected first and processed after the loop, because the loop holds a
        // mutable borrow of `self.my_intfs`.
        let mut deleted_intfs = Vec::new();
        let mut deleted_ips = Vec::new();

        for (if_index, my_intf) in self.my_intfs.iter_mut() {
            // The last removed address of each family, used below to leave the
            // multicast group.
            let mut last_ipv4 = None;
            let mut last_ipv6 = None;

            if let Some(current_addrs) = ifaddrs_map.get(if_index) {
                // The interface still exists: drop only the addresses that are gone.
                my_intf.addrs.retain(|addr| {
                    if current_addrs.contains(&addr) {
                        true
                    } else {
                        match addr.ip() {
                            IpAddr::V4(ipv4) => last_ipv4 = Some(ipv4),
                            IpAddr::V6(ipv6) => last_ipv6 = Some(ipv6),
                        }
                        deleted_ips.push(addr.ip());
                        false
                    }
                });
                if my_intf.addrs.is_empty() {
                    deleted_intfs.push((*if_index, last_ipv4, last_ipv6))
                }
            } else {
                // The interface is gone: all of its addresses are deleted.
                debug!(
                    "check_ip_changes: interface {} ({}) no longer exists, removing",
                    my_intf.name, if_index
                );
                for addr in my_intf.addrs.iter() {
                    match addr.ip() {
                        IpAddr::V4(ipv4) => last_ipv4 = Some(ipv4),
                        IpAddr::V6(ipv6) => last_ipv6 = Some(ipv6),
                    }
                    deleted_ips.push(addr.ip())
                }
                deleted_intfs.push((*if_index, last_ipv4, last_ipv6));
            }
        }

        if !deleted_ips.is_empty() || !deleted_intfs.is_empty() {
            debug!(
                "check_ip_changes: {} deleted ips {} deleted intfs",
                deleted_ips.len(),
                deleted_intfs.len()
            );
        }

        for ip in deleted_ips {
            self.del_ip(ip);
        }

        // NOTE(review): unlike `del_interface`, this path does not remove the
        // interface's entry from `dns_registry_map` -- confirm whether that is intended.
        for (if_index, last_ipv4, last_ipv6) in deleted_intfs {
            let Some(my_intf) = self.my_intfs.remove(&if_index) else {
                continue;
            };

            if let Some(ipv4) = last_ipv4 {
                debug!("leave multicast for {ipv4}");
                if let Some(sock) = self.ipv4_sock.as_mut() {
                    if let Err(e) = sock.pktinfo.leave_multicast_v4(&GROUP_ADDR_V4, &ipv4) {
                        debug!("leave multicast group for addr {ipv4}: {e}");
                    }
                }
            }

            if let Some(ipv6) = last_ipv6 {
                debug!("leave multicast for {ipv6}");
                if let Some(sock) = self.ipv6_sock.as_mut() {
                    if let Err(e) = sock
                        .pktinfo
                        .leave_multicast_v6(&GROUP_ADDR_V6, my_intf.index)
                    {
                        debug!("leave multicast group for IPv6: {ipv6}: {e}");
                    }
                }
            }

            // Delete the records cached on the removed interface and report the
            // service instances that went away with them.
            let intf_id = InterfaceId {
                name: my_intf.name.to_string(),
                index: my_intf.index,
            };
            let removed_instances = self.cache.remove_records_on_intf(intf_id);
            self.notify_service_removal(removed_instances);
        }

        // Add new interfaces and addresses (and drop deselected ones) according to
        // the selection rules.
        self.apply_intf_selections(my_ifaddrs);
    }
1515
1516 fn del_interface(&mut self, intf: &Interface) {
1517 let if_index = intf.index.unwrap_or(0);
1518 trace!(
1519 "del_interface: {} ({if_index}) addr {}",
1520 intf.name,
1521 intf.ip()
1522 );
1523
1524 let Some(my_intf) = self.my_intfs.get_mut(&if_index) else {
1525 debug!("del_interface: interface {} not found", intf.name);
1526 return;
1527 };
1528
1529 let mut ip_removed = false;
1530
1531 if my_intf.addrs.remove(&intf.addr) {
1532 ip_removed = true;
1533
1534 match intf.addr.ip() {
1535 IpAddr::V4(ipv4) => {
1536 if my_intf.next_ifaddr_v4().is_none() {
1537 if let Some(sock) = self.ipv4_sock.as_mut() {
1538 if let Err(e) = sock.pktinfo.leave_multicast_v4(&GROUP_ADDR_V4, &ipv4) {
1539 debug!("leave multicast group for addr {ipv4}: {e}");
1540 }
1541 }
1542 }
1543 }
1544
1545 IpAddr::V6(ipv6) => {
1546 if my_intf.next_ifaddr_v6().is_none() {
1547 if let Some(sock) = self.ipv6_sock.as_mut() {
1548 if let Err(e) =
1549 sock.pktinfo.leave_multicast_v6(&GROUP_ADDR_V6, if_index)
1550 {
1551 debug!("leave multicast group for addr {ipv6}: {e}");
1552 }
1553 }
1554 }
1555 }
1556 }
1557
1558 if my_intf.addrs.is_empty() {
1559 debug!("del_interface: removing interface {}", intf.name);
1561 self.my_intfs.remove(&if_index);
1562 self.dns_registry_map.remove(&if_index);
1563 self.cache.remove_addrs_on_disabled_intf(if_index);
1564 }
1565 }
1566
1567 if ip_removed {
1568 self.notify_monitors(DaemonEvent::IpDel(intf.ip()));
1570 self.del_addr_in_my_services(&intf.ip());
1572 }
1573 }
1574
1575 fn add_interface(&mut self, intf: Interface) {
1576 let sock_opt = if intf.ip().is_ipv4() {
1577 &self.ipv4_sock
1578 } else {
1579 &self.ipv6_sock
1580 };
1581
1582 let Some(sock) = sock_opt else {
1583 debug!(
1584 "add_interface: no socket available for interface {} with addr {}. Skipped.",
1585 intf.name,
1586 intf.ip()
1587 );
1588 return;
1589 };
1590
1591 let if_index = intf.index.unwrap_or(0);
1592 let mut new_addr = false;
1593
1594 match self.my_intfs.entry(if_index) {
1595 Entry::Occupied(mut entry) => {
1596 let my_intf = entry.get_mut();
1598 if !my_intf.addrs.contains(&intf.addr) {
1599 if let Err(e) = join_multicast_group(&sock.pktinfo, &intf) {
1600 debug!("add_interface: socket_config {}: {e}", &intf.name);
1601 }
1602 my_intf.addrs.insert(intf.addr.clone());
1603 new_addr = true;
1604 }
1605 }
1606 Entry::Vacant(entry) => {
1607 if let Err(e) = join_multicast_group(&sock.pktinfo, &intf) {
1608 debug!("add_interface: socket_config {}: {e}. Skipped.", &intf.name);
1609 return;
1610 }
1611
1612 new_addr = true;
1613 let new_intf = MyIntf {
1614 name: intf.name.clone(),
1615 index: if_index,
1616 addrs: HashSet::from([intf.addr.clone()]),
1617 };
1618 entry.insert(new_intf);
1619 }
1620 }
1621
1622 if !new_addr {
1623 trace!("add_interface: interface {} already exists", &intf.name);
1624 return;
1625 }
1626
1627 debug!("add new interface {}: {}", intf.name, intf.ip());
1628
1629 let Some(my_intf) = self.my_intfs.get(&if_index) else {
1630 debug!("add_interface: cannot find if_index {if_index}");
1631 return;
1632 };
1633
1634 let dns_registry = match self.dns_registry_map.get_mut(&if_index) {
1635 Some(registry) => registry,
1636 None => self
1637 .dns_registry_map
1638 .entry(if_index)
1639 .or_insert_with(DnsRegistry::new),
1640 };
1641
1642 for (_, service_info) in self.my_services.iter_mut() {
1643 if service_info.is_addr_auto() {
1644 service_info.insert_ipaddr(&intf);
1645
1646 if announce_service_on_intf(dns_registry, service_info, my_intf, &sock.pktinfo) {
1647 debug!(
1648 "Announce service {} on {}",
1649 service_info.get_fullname(),
1650 intf.ip()
1651 );
1652 service_info.set_status(if_index, ServiceStatus::Announced);
1653 } else {
1654 for timer in dns_registry.new_timers.drain(..) {
1655 self.timers.push(Reverse(timer));
1656 }
1657 service_info.set_status(if_index, ServiceStatus::Probing);
1658 }
1659 }
1660 }
1661
1662 let mut browse_reruns = Vec::new();
1664 let mut i = 0;
1665 while i < self.retransmissions.len() {
1666 if matches!(self.retransmissions[i].command, Command::Browse(..)) {
1667 browse_reruns.push(self.retransmissions.remove(i));
1668 } else {
1669 i += 1;
1670 }
1671 }
1672
1673 for rerun in browse_reruns {
1674 self.exec_command(rerun.command, true);
1675 }
1676
1677 self.notify_monitors(DaemonEvent::IpAdd(intf.ip()));
1679 }
1680
1681 fn register_service(&mut self, mut info: ServiceInfo) {
1690 if let Err(e) = check_service_name_length(info.get_type(), self.service_name_len_max) {
1692 debug!("check_service_name_length: {}", &e);
1693 self.notify_monitors(DaemonEvent::Error(e));
1694 return;
1695 }
1696
1697 if info.is_addr_auto() {
1698 let selected_intfs = self.selected_intfs(my_ip_interfaces(true));
1699 for intf in selected_intfs {
1700 info.insert_ipaddr(&intf);
1701 }
1702 }
1703
1704 debug!("register service {:?}", &info);
1705
1706 let outgoing_addrs = self.send_unsolicited_response(&mut info);
1707 if !outgoing_addrs.is_empty() {
1708 self.notify_monitors(DaemonEvent::Announce(
1709 info.get_fullname().to_string(),
1710 format!("{:?}", &outgoing_addrs),
1711 ));
1712 }
1713
1714 let service_fullname = info.get_fullname().to_lowercase();
1717 self.my_services.insert(service_fullname, info);
1718 }
1719
1720 fn send_unsolicited_response(&mut self, info: &mut ServiceInfo) -> Vec<IpAddr> {
1723 let mut outgoing_addrs = Vec::new();
1724 let mut outgoing_intfs = HashSet::new();
1725
1726 for (if_index, intf) in self.my_intfs.iter() {
1727 let dns_registry = match self.dns_registry_map.get_mut(if_index) {
1728 Some(registry) => registry,
1729 None => self
1730 .dns_registry_map
1731 .entry(*if_index)
1732 .or_insert_with(DnsRegistry::new),
1733 };
1734
1735 let mut announced = false;
1736
1737 if let Some(sock) = self.ipv4_sock.as_mut() {
1739 if announce_service_on_intf(dns_registry, info, intf, &sock.pktinfo) {
1740 for addr in intf.addrs.iter().filter(|a| a.ip().is_ipv4()) {
1741 outgoing_addrs.push(addr.ip());
1742 }
1743 outgoing_intfs.insert(intf.index);
1744
1745 debug!(
1746 "Announce service IPv4 {} on {}",
1747 info.get_fullname(),
1748 intf.name
1749 );
1750 announced = true;
1751 }
1752 }
1753
1754 if let Some(sock) = self.ipv6_sock.as_mut() {
1755 if announce_service_on_intf(dns_registry, info, intf, &sock.pktinfo) {
1756 for addr in intf.addrs.iter().filter(|a| a.ip().is_ipv6()) {
1757 outgoing_addrs.push(addr.ip());
1758 }
1759 outgoing_intfs.insert(intf.index);
1760
1761 debug!(
1762 "Announce service IPv6 {} on {}",
1763 info.get_fullname(),
1764 intf.name
1765 );
1766 announced = true;
1767 }
1768 }
1769
1770 if announced {
1771 info.set_status(intf.index, ServiceStatus::Announced);
1772 } else {
1773 for timer in dns_registry.new_timers.drain(..) {
1774 self.timers.push(Reverse(timer));
1775 }
1776 info.set_status(*if_index, ServiceStatus::Probing);
1777 }
1778 }
1779
1780 let next_time = current_time_millis() + 1000;
1784 for if_index in outgoing_intfs {
1785 self.add_retransmission(
1786 next_time,
1787 Command::RegisterResend(info.get_fullname().to_string(), if_index),
1788 );
1789 }
1790
1791 outgoing_addrs
1792 }
1793
1794 fn probing_handler(&mut self) {
1796 let now = current_time_millis();
1797
1798 for (if_index, intf) in self.my_intfs.iter() {
1799 let Some(dns_registry) = self.dns_registry_map.get_mut(if_index) else {
1800 continue;
1801 };
1802
1803 let (out, expired_probes) = check_probing(dns_registry, &mut self.timers, now);
1804
1805 if !out.questions().is_empty() {
1807 trace!("sending out probing of questions: {:?}", out.questions());
1808 if let Some(sock) = self.ipv4_sock.as_mut() {
1809 send_dns_outgoing(&out, intf, &sock.pktinfo);
1810 }
1811 if let Some(sock) = self.ipv6_sock.as_mut() {
1812 send_dns_outgoing(&out, intf, &sock.pktinfo);
1813 }
1814 }
1815
1816 let waiting_services =
1818 handle_expired_probes(expired_probes, &intf.name, dns_registry, &mut self.monitors);
1819
1820 for service_name in waiting_services {
1821 if let Some(info) = self.my_services.get_mut(&service_name.to_lowercase()) {
1823 if info.get_status(*if_index) == ServiceStatus::Announced {
1824 debug!("service {} already announced", info.get_fullname());
1825 continue;
1826 }
1827
1828 let announced_v4 = if let Some(sock) = self.ipv4_sock.as_mut() {
1829 announce_service_on_intf(dns_registry, info, intf, &sock.pktinfo)
1830 } else {
1831 false
1832 };
1833 let announced_v6 = if let Some(sock) = self.ipv6_sock.as_mut() {
1834 announce_service_on_intf(dns_registry, info, intf, &sock.pktinfo)
1835 } else {
1836 false
1837 };
1838
1839 if announced_v4 || announced_v6 {
1840 let next_time = now + 1000;
1841 let command =
1842 Command::RegisterResend(info.get_fullname().to_string(), *if_index);
1843 self.retransmissions.push(ReRun { next_time, command });
1844 self.timers.push(Reverse(next_time));
1845
1846 let fullname = match dns_registry.name_changes.get(&service_name) {
1847 Some(new_name) => new_name.to_string(),
1848 None => service_name.to_string(),
1849 };
1850
1851 let mut hostname = info.get_hostname();
1852 if let Some(new_name) = dns_registry.name_changes.get(hostname) {
1853 hostname = new_name;
1854 }
1855
1856 debug!("wake up: announce service {} on {}", fullname, intf.name);
1857 notify_monitors(
1858 &mut self.monitors,
1859 DaemonEvent::Announce(fullname, format!("{}:{}", hostname, &intf.name)),
1860 );
1861
1862 info.set_status(*if_index, ServiceStatus::Announced);
1863 }
1864 }
1865 }
1866 }
1867 }
1868
1869 fn unregister_service(
1870 &self,
1871 info: &ServiceInfo,
1872 intf: &MyIntf,
1873 sock: &PktInfoUdpSocket,
1874 ) -> Vec<u8> {
1875 let is_ipv4 = sock.domain() == Domain::IPV4;
1876
1877 let mut out = DnsOutgoing::new(FLAGS_QR_RESPONSE | FLAGS_AA);
1878 out.add_answer_at_time(
1879 DnsPointer::new(
1880 info.get_type(),
1881 RRType::PTR,
1882 CLASS_IN,
1883 0,
1884 info.get_fullname().to_string(),
1885 ),
1886 0,
1887 );
1888
1889 if let Some(sub) = info.get_subtype() {
1890 trace!("Adding subdomain {}", sub);
1891 out.add_answer_at_time(
1892 DnsPointer::new(
1893 sub,
1894 RRType::PTR,
1895 CLASS_IN,
1896 0,
1897 info.get_fullname().to_string(),
1898 ),
1899 0,
1900 );
1901 }
1902
1903 out.add_answer_at_time(
1904 DnsSrv::new(
1905 info.get_fullname(),
1906 CLASS_IN | CLASS_CACHE_FLUSH,
1907 0,
1908 info.get_priority(),
1909 info.get_weight(),
1910 info.get_port(),
1911 info.get_hostname().to_string(),
1912 ),
1913 0,
1914 );
1915 out.add_answer_at_time(
1916 DnsTxt::new(
1917 info.get_fullname(),
1918 CLASS_IN | CLASS_CACHE_FLUSH,
1919 0,
1920 info.generate_txt(),
1921 ),
1922 0,
1923 );
1924
1925 let if_addrs = if is_ipv4 {
1926 info.get_addrs_on_my_intf_v4(intf)
1927 } else {
1928 info.get_addrs_on_my_intf_v6(intf)
1929 };
1930
1931 if if_addrs.is_empty() {
1932 return vec![];
1933 }
1934
1935 for address in if_addrs {
1936 out.add_answer_at_time(
1937 DnsAddress::new(
1938 info.get_hostname(),
1939 ip_address_rr_type(&address),
1940 CLASS_IN | CLASS_CACHE_FLUSH,
1941 0,
1942 address,
1943 intf.into(),
1944 ),
1945 0,
1946 );
1947 }
1948
1949 send_dns_outgoing(&out, intf, sock).remove(0)
1951 }
1952
1953 fn add_hostname_resolver(
1957 &mut self,
1958 hostname: String,
1959 listener: Sender<HostnameResolutionEvent>,
1960 timeout: Option<u64>,
1961 ) {
1962 let real_timeout = timeout.map(|t| current_time_millis() + t);
1963 self.hostname_resolvers
1964 .insert(hostname.to_lowercase(), (listener, real_timeout));
1965 if let Some(t) = real_timeout {
1966 self.add_timer(t);
1967 }
1968 }
1969
1970 fn send_query(&self, name: &str, qtype: RRType) {
1972 self.send_query_vec(&[(name, qtype)]);
1973 }
1974
1975 fn send_query_vec(&self, questions: &[(&str, RRType)]) {
1977 let mut out = DnsOutgoing::new(FLAGS_QR_QUERY);
1978 let now = current_time_millis();
1979
1980 for (name, qtype) in questions {
1981 out.add_question(name, *qtype);
1982
1983 for record in self.cache.get_known_answers(name, *qtype, now) {
1984 trace!("add known answer: {:?}", record.record);
1992 let mut new_record = record.record.clone();
1993 new_record.get_record_mut().update_ttl(now);
1994 out.add_answer_box(new_record);
1995 }
1996 }
1997
1998 for (_, intf) in self.my_intfs.iter() {
1999 if let Some(sock) = self.ipv4_sock.as_ref() {
2000 send_dns_outgoing(&out, intf, &sock.pktinfo);
2001 }
2002 if let Some(sock) = self.ipv6_sock.as_ref() {
2003 send_dns_outgoing(&out, intf, &sock.pktinfo);
2004 }
2005 }
2006 }
2007
2008 fn handle_read(&mut self, event_key: usize) -> bool {
2013 let sock_opt = match event_key {
2014 IPV4_SOCK_EVENT_KEY => &mut self.ipv4_sock,
2015 IPV6_SOCK_EVENT_KEY => &mut self.ipv6_sock,
2016 _ => {
2017 debug!("handle_read: unknown token {}", event_key);
2018 return false;
2019 }
2020 };
2021 let Some(sock) = sock_opt.as_mut() else {
2022 debug!("handle_read: socket not available for token {}", event_key);
2023 return false;
2024 };
2025 let mut buf = vec![0u8; MAX_MSG_ABSOLUTE];
2026
2027 let (sz, pktinfo) = match sock.pktinfo.recv(&mut buf) {
2034 Ok(sz) => sz,
2035 Err(e) => {
2036 if e.kind() != std::io::ErrorKind::WouldBlock {
2037 debug!("listening socket read failed: {}", e);
2038 }
2039 return false;
2040 }
2041 };
2042
2043 let pkt_if_index = pktinfo.if_index as u32;
2045 let Some(my_intf) = self.my_intfs.get(&pkt_if_index) else {
2046 debug!(
2047 "handle_read: no interface found for pktinfo if_index: {}",
2048 pktinfo.if_index
2049 );
2050 return true; };
2052
2053 buf.truncate(sz); match DnsIncoming::new(buf, my_intf.into()) {
2056 Ok(msg) => {
2057 if msg.is_query() {
2058 self.handle_query(msg, pkt_if_index, event_key == IPV4_SOCK_EVENT_KEY);
2059 } else if msg.is_response() {
2060 self.handle_response(msg, pkt_if_index);
2061 } else {
2062 debug!("Invalid message: not query and not response");
2063 }
2064 }
2065 Err(e) => debug!("Invalid incoming DNS message: {}", e),
2066 }
2067
2068 true
2069 }
2070
2071 fn query_unresolved(&mut self, instance: &str) -> bool {
2073 if !valid_instance_name(instance) {
2074 trace!("instance name {} not valid", instance);
2075 return false;
2076 }
2077
2078 if let Some(records) = self.cache.get_srv(instance) {
2079 for record in records {
2080 if let Some(srv) = record.record.any().downcast_ref::<DnsSrv>() {
2081 if self.cache.get_addr(srv.host()).is_none() {
2082 self.send_query_vec(&[(srv.host(), RRType::A), (srv.host(), RRType::AAAA)]);
2083 return true;
2084 }
2085 }
2086 }
2087 } else {
2088 self.send_query(instance, RRType::ANY);
2089 return true;
2090 }
2091
2092 false
2093 }
2094
2095 fn query_cache_for_service(
2098 &mut self,
2099 ty_domain: &str,
2100 sender: &Sender<ServiceEvent>,
2101 now: u64,
2102 ) {
2103 let mut resolved: HashSet<String> = HashSet::new();
2104 let mut unresolved: HashSet<String> = HashSet::new();
2105
2106 if let Some(records) = self.cache.get_ptr(ty_domain) {
2107 for record in records.iter().filter(|r| !r.record.expires_soon(now)) {
2108 if let Some(ptr) = record.record.any().downcast_ref::<DnsPointer>() {
2109 let mut new_event = None;
2110 match self.resolve_service_from_cache(ty_domain, ptr.alias()) {
2111 Ok(resolved_service) => {
2112 if resolved_service.is_valid() {
2113 debug!("Resolved service from cache: {}", ptr.alias());
2114 new_event =
2115 Some(ServiceEvent::ServiceResolved(Box::new(resolved_service)));
2116 } else {
2117 debug!("Resolved service is not valid: {}", ptr.alias());
2118 }
2119 }
2120 Err(err) => {
2121 debug!("Error while resolving service from cache: {}", err);
2122 continue;
2123 }
2124 }
2125
2126 match sender.send(ServiceEvent::ServiceFound(
2127 ty_domain.to_string(),
2128 ptr.alias().to_string(),
2129 )) {
2130 Ok(()) => debug!("sent service found {}", ptr.alias()),
2131 Err(e) => {
2132 debug!("failed to send service found: {}", e);
2133 continue;
2134 }
2135 }
2136
2137 if let Some(event) = new_event {
2138 resolved.insert(ptr.alias().to_string());
2139 match sender.send(event) {
2140 Ok(()) => debug!("sent service resolved: {}", ptr.alias()),
2141 Err(e) => debug!("failed to send service resolved: {}", e),
2142 }
2143 } else {
2144 unresolved.insert(ptr.alias().to_string());
2145 }
2146 }
2147 }
2148 }
2149
2150 for instance in resolved.drain() {
2151 self.pending_resolves.remove(&instance);
2152 self.resolved.insert(instance);
2153 }
2154
2155 for instance in unresolved.drain() {
2156 self.add_pending_resolve(instance);
2157 }
2158 }
2159
2160 fn query_cache_for_hostname(
2163 &mut self,
2164 hostname: &str,
2165 sender: Sender<HostnameResolutionEvent>,
2166 ) {
2167 let addresses_map = self.cache.get_addresses_for_host(hostname);
2168 for (name, addresses) in addresses_map {
2169 match sender.send(HostnameResolutionEvent::AddressesFound(name, addresses)) {
2170 Ok(()) => trace!("sent hostname addresses found"),
2171 Err(e) => debug!("failed to send hostname addresses found: {}", e),
2172 }
2173 }
2174 }
2175
2176 fn add_pending_resolve(&mut self, instance: String) {
2177 if !self.pending_resolves.contains(&instance) {
2178 let next_time = current_time_millis() + RESOLVE_WAIT_IN_MILLIS;
2179 self.add_retransmission(next_time, Command::Resolve(instance.clone(), 1));
2180 self.pending_resolves.insert(instance);
2181 }
2182 }
2183
2184 fn resolve_service_from_cache(
2186 &self,
2187 ty_domain: &str,
2188 fullname: &str,
2189 ) -> Result<ResolvedService> {
2190 let now = current_time_millis();
2191 let mut resolved_service = ResolvedService {
2192 ty_domain: ty_domain.to_string(),
2193 sub_ty_domain: None,
2194 fullname: fullname.to_string(),
2195 host: String::new(),
2196 port: 0,
2197 addresses: HashSet::new(),
2198 txt_properties: TxtProperties::new(),
2199 };
2200
2201 if let Some(subtype) = self.cache.get_subtype(fullname) {
2203 trace!(
2204 "ty_domain: {} found subtype {} for instance: {}",
2205 ty_domain,
2206 subtype,
2207 fullname
2208 );
2209 if resolved_service.sub_ty_domain.is_none() {
2210 resolved_service.sub_ty_domain = Some(subtype.to_string());
2211 }
2212 }
2213
2214 if let Some(records) = self.cache.get_srv(fullname) {
2216 if let Some(answer) = records.iter().find(|r| !r.record.expires_soon(now)) {
2217 if let Some(dns_srv) = answer.record.any().downcast_ref::<DnsSrv>() {
2218 resolved_service.host = dns_srv.host().to_string();
2219 resolved_service.port = dns_srv.port();
2220 }
2221 }
2222 }
2223
2224 if let Some(records) = self.cache.get_txt(fullname) {
2226 if let Some(record) = records.iter().find(|r| !r.record.expires_soon(now)) {
2227 if let Some(dns_txt) = record.record.any().downcast_ref::<DnsTxt>() {
2228 resolved_service.txt_properties = dns_txt.text().into();
2229 }
2230 }
2231 }
2232
2233 if let Some(records) = self.cache.get_addr(&resolved_service.host) {
2235 for answer in records.iter() {
2236 if let Some(dns_a) = answer.record.any().downcast_ref::<DnsAddress>() {
2237 if dns_a.expires_soon(now) {
2238 trace!(
2239 "Addr expired or expires soon: {}",
2240 dns_a.address().to_ip_addr()
2241 );
2242 } else {
2243 resolved_service.addresses.insert(dns_a.address());
2244 }
2245 }
2246 }
2247 }
2248
2249 Ok(resolved_service)
2250 }
2251
2252 fn handle_poller_events(&mut self, events: &mio::Events) {
2253 for ev in events.iter() {
2254 trace!("event received with key {:?}", ev.token());
2255 if ev.token().0 == SIGNAL_SOCK_EVENT_KEY {
2256 self.signal_sock_drain();
2258
2259 if let Err(e) = self.poller.registry().reregister(
2260 &mut self.signal_sock,
2261 ev.token(),
2262 mio::Interest::READABLE,
2263 ) {
2264 debug!("failed to modify poller for signal socket: {}", e);
2265 }
2266 continue; }
2268
2269 while self.handle_read(ev.token().0) {}
2271
2272 if ev.token().0 == IPV4_SOCK_EVENT_KEY {
2274 if let Some(sock) = self.ipv4_sock.as_mut() {
2276 if let Err(e) =
2277 self.poller
2278 .registry()
2279 .reregister(sock, ev.token(), mio::Interest::READABLE)
2280 {
2281 debug!("modify poller for IPv4 socket: {}", e);
2282 }
2283 }
2284 } else if ev.token().0 == IPV6_SOCK_EVENT_KEY {
2285 if let Some(sock) = self.ipv6_sock.as_mut() {
2287 if let Err(e) =
2288 self.poller
2289 .registry()
2290 .reregister(sock, ev.token(), mio::Interest::READABLE)
2291 {
2292 debug!("modify poller for IPv6 socket: {}", e);
2293 }
2294 }
2295 }
2296 }
2297 }
2298
    /// Processes a DNS response `msg` that arrived on interface `if_index`.
    ///
    /// In order, this:
    /// 1. drops already-expired records from the message, removing them from
    ///    the cache and notifying listeners with `ServiceRemoved` for PTR records;
    /// 2. runs `conflict_handler` on the remaining message;
    /// 3. decides whether the response answers something we asked for (`is_for_us`);
    /// 4. feeds every record to `cache.add_or_update`, collecting timers and
    ///    the list of changed records;
    /// 5. notifies hostname resolvers about changed address records;
    /// 6. calls `resolve_updated_instances` for all instances touched by the changes.
    ///
    /// Returns early (after step 3) when `if_index` is not a tracked interface.
    fn handle_response(&mut self, mut msg: DnsIncoming, if_index: u32) {
        let now = current_time_millis();

        // Retain predicate: keeps records that are not expired at `now`.
        // An expired record is removed from the cache; if the cache actually
        // held it and it is a PTR record, listeners get `ServiceRemoved`.
        let mut record_predicate = |record: &DnsRecordBox| {
            if !record.get_record().is_expired(now) {
                return true;
            }

            debug!("record is expired, removing it from cache.");
            if self.cache.remove(record) {
                if let Some(dns_ptr) = record.any().downcast_ref::<DnsPointer>() {
                    call_service_listener(
                        &self.service_queriers,
                        dns_ptr.get_name(),
                        ServiceEvent::ServiceRemoved(
                            dns_ptr.get_name().to_string(),
                            dns_ptr.alias().to_string(),
                        ),
                    );
                }
            }
            false
        };
        msg.answers_mut().retain(&mut record_predicate);
        msg.authorities_mut().retain(&mut record_predicate);
        msg.additionals_mut().retain(&mut record_predicate);

        self.conflict_handler(&msg, if_index);

        // `is_for_us` starts as true and is decided by scanning the answers:
        // - a PTR answer for a name we are browsing, or an A/AAAA answer for a
        //   hostname we are resolving (compared lowercased), makes it true and
        //   stops the scan;
        // - a PTR answer for a name we are not browsing makes it false, unless
        //   a later answer turns it back to true.
        let mut is_for_us = true; for answer in msg.answers() {
            if answer.get_type() == RRType::PTR {
                if self.service_queriers.contains_key(answer.get_name()) {
                    is_for_us = true;
                    break; } else {
                    is_for_us = false;
                }
            } else if answer.get_type() == RRType::A || answer.get_type() == RRType::AAAA {
                let answer_lowercase = answer.get_name().to_lowercase();
                if self.hostname_resolvers.contains_key(&answer_lowercase) {
                    is_for_us = true;
                    break; }
            }
        }

        // Accepting unsolicited responses overrides the scan result.
        if self.accept_unsolicited {
            is_for_us = true;
        }

        // A record that changed in the cache: its type plus the name affected.
        // For PTR records `name` is the instance (alias); otherwise the record name.
        struct InstanceChange {
            ty: RRType, name: String, }

        let mut changes = Vec::new();
        let mut timers = Vec::new();
        let Some(my_intf) = self.my_intfs.get(&if_index) else {
            return;
        };
        for record in msg.all_records() {
            // NOTE(review): the boolean in the returned pair is treated here as
            // "this record is a change worth reporting" — confirm against
            // `DnsCache::add_or_update`, which is not visible in this chunk.
            match self
                .cache
                .add_or_update(my_intf, record, &mut timers, is_for_us)
            {
                Some((dns_record, true)) => {
                    timers.push(dns_record.record.get_record().get_expire_time());
                    timers.push(dns_record.record.get_record().get_refresh_time());

                    let ty = dns_record.record.get_type();
                    let name = dns_record.record.get_name();

                    // PTR records with TTL > 1 are reported to listeners as
                    // `ServiceFound`; every other change is recorded under the
                    // record's own name.
                    if ty == RRType::PTR && dns_record.record.get_record().get_ttl() > 1 {
                        if self.service_queriers.contains_key(name) {
                            timers.push(dns_record.record.get_record().get_refresh_time());
                        }

                        if let Some(dns_ptr) = dns_record.record.any().downcast_ref::<DnsPointer>()
                        {
                            debug!("calling listener with service found: {name}");
                            call_service_listener(
                                &self.service_queriers,
                                name,
                                ServiceEvent::ServiceFound(
                                    name.to_string(),
                                    dns_ptr.alias().to_string(),
                                ),
                            );
                            changes.push(InstanceChange {
                                ty,
                                name: dns_ptr.alias().to_string(),
                            });
                        }
                    } else {
                        changes.push(InstanceChange {
                            ty,
                            name: name.to_string(),
                        });
                    }
                }
                Some((dns_record, false)) => {
                    // Not reported as a change, but its timers are still scheduled.
                    timers.push(dns_record.record.get_record().get_expire_time());
                    timers.push(dns_record.record.get_record().get_refresh_time());
                }
                _ => {}
            }
        }

        // Schedule all timers collected above.
        for t in timers {
            self.add_timer(t);
        }

        // Tell hostname resolvers about every changed A/AAAA record.
        for change in changes
            .iter()
            .filter(|change| change.ty == RRType::A || change.ty == RRType::AAAA)
        {
            let addr_map = self.cache.get_addresses_for_host(&change.name);
            for (name, addresses) in addr_map {
                call_hostname_resolution_listener(
                    &self.hostname_resolvers,
                    &change.name,
                    HostnameResolutionEvent::AddressesFound(name, addresses),
                )
            }
        }

        // Collect the instances affected by the changes: directly for
        // PTR/SRV/TXT, and via the host-to-instances lookup for A/AAAA.
        let mut updated_instances = HashSet::new();
        for update in changes {
            match update.ty {
                RRType::PTR | RRType::SRV | RRType::TXT => {
                    updated_instances.insert(update.name);
                }
                RRType::A | RRType::AAAA => {
                    let instances = self.cache.get_instances_on_host(&update.name);
                    updated_instances.extend(instances);
                }
                _ => {}
            }
        }

        self.resolve_updated_instances(&updated_instances);
    }
2467
    /// Checks the answers of `msg` against the records we are currently
    /// probing on interface `if_index` and renames our records on conflict.
    ///
    /// Does nothing when the interface is not tracked or has no DNS registry.
    /// For every answer whose name has an active probe, each probed record
    /// with the same type and class but different rdata is treated as a
    /// conflict: it is removed from the probe, given a new name, and inserted
    /// into a probe for that new name (created if needed), which also inherits
    /// the waiting services of the original probe.
    fn conflict_handler(&mut self, msg: &DnsIncoming, if_index: u32) {
        let Some(my_intf) = self.my_intfs.get(&if_index) else {
            debug!("handle_response: no intf found for index {if_index}");
            return;
        };

        let Some(dns_registry) = self.dns_registry_map.get_mut(&if_index) else {
            return;
        };

        for answer in msg.answers().iter() {
            // Renamed copies of the conflicting records found for this answer.
            let mut new_records = Vec::new();

            // Only names that we are probing can conflict.
            let name = answer.get_name();
            let Some(probe) = dns_registry.probing.get_mut(name) else {
                continue;
            };

            if answer.get_type() == RRType::A || answer.get_type() == RRType::AAAA {
                // Address answers recorded for a different interface index are ignored.
                if let Some(answer_addr) = answer.any().downcast_ref::<DnsAddress>() {
                    if answer_addr.interface_id.index != if_index {
                        debug!(
                            "conflict handler: answer addr {:?} not in the subnet of intf {}",
                            answer_addr, my_intf.name
                        );
                        continue;
                    }
                }

                // If any probed record has the same type, class and rdata as
                // the answer, the answer is not treated as a conflict.
                let any_match = probe.records.iter().any(|r| {
                    r.get_type() == answer.get_type()
                        && r.get_class() == answer.get_class()
                        && r.rrdata_match(answer.as_ref())
                });
                if any_match {
                    continue; }
            }

            // Remove every conflicting record from the probe and keep a
            // renamed copy of it in `new_records`.
            probe.records.retain(|record| {
                if record.get_type() == answer.get_type()
                    && record.get_class() == answer.get_class()
                    && !record.rrdata_match(answer.as_ref())
                {
                    debug!(
                        "found conflict name: '{name}' record: {}: {} PEER: {}",
                        record.get_type(),
                        record.rdata_print(),
                        answer.rdata_print()
                    );

                    let mut new_record = record.clone();
                    // Address records are renamed with `hostname_change`,
                    // all other record types with `name_change`.
                    let new_name = match record.get_type() {
                        RRType::A => hostname_change(name),
                        RRType::AAAA => hostname_change(name),
                        _ => name_change(name),
                    };
                    new_record.get_record_mut().set_new_name(new_name);
                    new_records.push(new_record);
                    return false; }

                true
            });

            // New probes start at the current time plus a random 0..250 ms offset.
            let create_time = current_time_millis() + fastrand::u64(0..250);

            // Cloned so the borrow of `probe` ends before `dns_registry` is used again.
            let waiting_services = probe.waiting_services.clone();

            for record in new_records {
                if dns_registry.update_hostname(name, record.get_name(), create_time) {
                    self.timers.push(Reverse(create_time));
                }

                // Remember the rename: original name -> new name.
                dns_registry.name_changes.insert(
                    record.get_record().get_original_name().to_string(),
                    record.get_name().to_string(),
                );

                // Reuse the probe for the new name if one exists; otherwise
                // create it and schedule a timer for its `next_send`.
                let new_probe = match dns_registry.probing.get_mut(record.get_name()) {
                    Some(p) => p,
                    None => {
                        let new_probe = dns_registry
                            .probing
                            .entry(record.get_name().to_string())
                            .or_insert_with(|| {
                                debug!("conflict handler: new probe of {}", record.get_name());
                                Probe::new(create_time)
                            });
                        self.timers.push(Reverse(new_probe.next_send));
                        new_probe
                    }
                };

                debug!(
                    "insert record with new name '{}' {} into probe",
                    record.get_name(),
                    record.get_type()
                );
                new_probe.insert_record(record);

                // Services waiting on the old probe now wait on the new one too.
                new_probe.waiting_services.extend(waiting_services.clone());
            }
        }
    }
2585
2586 fn resolve_updated_instances(&mut self, updated_instances: &HashSet<String>) {
2593 let mut resolved: HashSet<String> = HashSet::new();
2594 let mut unresolved: HashSet<String> = HashSet::new();
2595 let mut removed_instances = HashMap::new();
2596
2597 let now = current_time_millis();
2598
2599 for (ty_domain, records) in self.cache.all_ptr().iter() {
2600 if !self.service_queriers.contains_key(ty_domain) {
2601 continue;
2603 }
2604
2605 for ptr in records.iter().filter(|r| !r.record.expires_soon(now)) {
2606 let Some(dns_ptr) = ptr.record.any().downcast_ref::<DnsPointer>() else {
2607 continue;
2608 };
2609
2610 let instance = dns_ptr.alias();
2611 if !updated_instances.contains(instance) {
2612 continue;
2613 }
2614
2615 let Ok(resolved_service) = self.resolve_service_from_cache(ty_domain, instance)
2616 else {
2617 continue;
2618 };
2619
2620 debug!("resolve_updated_instances: from cache: {instance}");
2621 if resolved_service.is_valid() {
2622 debug!("call queriers to resolve {instance}");
2623 resolved.insert(instance.to_string());
2624 let event = ServiceEvent::ServiceResolved(Box::new(resolved_service));
2625 call_service_listener(&self.service_queriers, ty_domain, event);
2626 } else {
2627 debug!("Resolved service is not valid: {instance}");
2628 if self.resolved.remove(dns_ptr.alias()) {
2629 removed_instances
2630 .entry(ty_domain.to_string())
2631 .or_insert_with(HashSet::new)
2632 .insert(instance.to_string());
2633 }
2634 unresolved.insert(instance.to_string());
2635 }
2636 }
2637 }
2638
2639 for instance in resolved.drain() {
2640 self.pending_resolves.remove(&instance);
2641 self.resolved.insert(instance);
2642 }
2643
2644 for instance in unresolved.drain() {
2645 self.add_pending_resolve(instance);
2646 }
2647
2648 if !removed_instances.is_empty() {
2649 debug!(
2650 "resolve_updated_instances: removed {}",
2651 &removed_instances.len()
2652 );
2653 self.notify_service_removal(removed_instances);
2654 }
2655 }
2656
2657 fn handle_query(&mut self, msg: DnsIncoming, if_index: u32, is_ipv4: bool) {
2659 let sock_opt = if is_ipv4 {
2660 &self.ipv4_sock
2661 } else {
2662 &self.ipv6_sock
2663 };
2664 let Some(sock) = sock_opt.as_ref() else {
2665 debug!("handle_query: socket not available for intf {}", if_index);
2666 return;
2667 };
2668 let mut out = DnsOutgoing::new(FLAGS_QR_RESPONSE | FLAGS_AA);
2669
2670 const META_QUERY: &str = "_services._dns-sd._udp.local.";
2673
2674 let Some(dns_registry) = self.dns_registry_map.get_mut(&if_index) else {
2675 debug!("missing dns registry for intf {}", if_index);
2676 return;
2677 };
2678
2679 let Some(intf) = self.my_intfs.get(&if_index) else {
2680 debug!("handle_query: no intf found for index {if_index}");
2681 return;
2682 };
2683
2684 for question in msg.questions().iter() {
2685 let qtype = question.entry_type();
2686
2687 if qtype == RRType::PTR {
2688 for service in self.my_services.values() {
2689 if service.get_status(if_index) != ServiceStatus::Announced {
2690 continue;
2691 }
2692
2693 if question.entry_name() == service.get_type()
2694 || service
2695 .get_subtype()
2696 .as_ref()
2697 .is_some_and(|v| v == question.entry_name())
2698 {
2699 add_answer_with_additionals(
2700 &mut out,
2701 &msg,
2702 service,
2703 intf,
2704 dns_registry,
2705 is_ipv4,
2706 );
2707 } else if question.entry_name() == META_QUERY {
2708 let ptr_added = out.add_answer(
2709 &msg,
2710 DnsPointer::new(
2711 question.entry_name(),
2712 RRType::PTR,
2713 CLASS_IN,
2714 service.get_other_ttl(),
2715 service.get_type().to_string(),
2716 ),
2717 );
2718 if !ptr_added {
2719 trace!("answer was not added for meta-query {:?}", &question);
2720 }
2721 }
2722 }
2723 } else {
2724 if qtype == RRType::ANY && msg.num_authorities() > 0 {
2726 let probe_name = question.entry_name();
2727
2728 if let Some(probe) = dns_registry.probing.get_mut(probe_name) {
2729 let now = current_time_millis();
2730
2731 if probe.start_time < now {
2735 let incoming_records: Vec<_> = msg
2736 .authorities()
2737 .iter()
2738 .filter(|r| r.get_name() == probe_name)
2739 .collect();
2740
2741 probe.tiebreaking(&incoming_records, now, probe_name);
2742 }
2743 }
2744 }
2745
2746 if qtype == RRType::A || qtype == RRType::AAAA || qtype == RRType::ANY {
2747 for service in self.my_services.values() {
2748 if service.get_status(if_index) != ServiceStatus::Announced {
2749 continue;
2750 }
2751
2752 let service_hostname =
2753 match dns_registry.name_changes.get(service.get_hostname()) {
2754 Some(new_name) => new_name,
2755 None => service.get_hostname(),
2756 };
2757
2758 if service_hostname.to_lowercase() == question.entry_name().to_lowercase() {
2759 let intf_addrs = if is_ipv4 {
2760 service.get_addrs_on_my_intf_v4(intf)
2761 } else {
2762 service.get_addrs_on_my_intf_v6(intf)
2763 };
2764 if intf_addrs.is_empty()
2765 && (qtype == RRType::A || qtype == RRType::AAAA)
2766 {
2767 let t = match qtype {
2768 RRType::A => "TYPE_A",
2769 RRType::AAAA => "TYPE_AAAA",
2770 _ => "invalid_type",
2771 };
2772 trace!(
2773 "Cannot find valid addrs for {} response on intf {:?}",
2774 t,
2775 &intf
2776 );
2777 return;
2778 }
2779 for address in intf_addrs {
2780 out.add_answer(
2781 &msg,
2782 DnsAddress::new(
2783 service_hostname,
2784 ip_address_rr_type(&address),
2785 CLASS_IN | CLASS_CACHE_FLUSH,
2786 service.get_host_ttl(),
2787 address,
2788 intf.into(),
2789 ),
2790 );
2791 }
2792 }
2793 }
2794 }
2795
2796 let query_name = question.entry_name().to_lowercase();
2797 let service_opt = self
2798 .my_services
2799 .iter()
2800 .find(|(k, _v)| {
2801 let service_name = match dns_registry.name_changes.get(k.as_str()) {
2802 Some(new_name) => new_name,
2803 None => k,
2804 };
2805 service_name == &query_name
2806 })
2807 .map(|(_, v)| v);
2808
2809 let Some(service) = service_opt else {
2810 continue;
2811 };
2812
2813 if service.get_status(if_index) != ServiceStatus::Announced {
2814 continue;
2815 }
2816
2817 let intf_addrs = if is_ipv4 {
2818 service.get_addrs_on_my_intf_v4(intf)
2819 } else {
2820 service.get_addrs_on_my_intf_v6(intf)
2821 };
2822 if intf_addrs.is_empty() {
2823 debug!(
2824 "Cannot find valid addrs for TYPE_SRV response on intf {:?}",
2825 &intf
2826 );
2827 continue;
2828 }
2829
2830 add_answer_of_service(
2831 &mut out,
2832 &msg,
2833 question.entry_name(),
2834 service,
2835 qtype,
2836 intf_addrs,
2837 );
2838 }
2839 }
2840
2841 if out.answers_count() > 0 {
2842 debug!("sending response on intf {}", &intf.name);
2843 out.set_id(msg.id());
2844 send_dns_outgoing(&out, intf, &sock.pktinfo);
2845
2846 let if_name = intf.name.clone();
2847
2848 self.increase_counter(Counter::Respond, 1);
2849 self.notify_monitors(DaemonEvent::Respond(if_name));
2850 }
2851
2852 self.increase_counter(Counter::KnownAnswerSuppression, out.known_answer_count());
2853 }
2854
2855 fn increase_counter(&mut self, counter: Counter, count: i64) {
2857 let key = counter.to_string();
2858 match self.counters.get_mut(&key) {
2859 Some(v) => *v += count,
2860 None => {
2861 self.counters.insert(key, count);
2862 }
2863 }
2864 }
2865
2866 fn set_counter(&mut self, counter: Counter, count: i64) {
2868 let key = counter.to_string();
2869 self.counters.insert(key, count);
2870 }
2871
2872 fn signal_sock_drain(&self) {
2873 let mut signal_buf = [0; 1024];
2874
2875 while let Ok(sz) = self.signal_sock.recv(&mut signal_buf) {
2877 trace!(
2878 "signal socket recvd: {}",
2879 String::from_utf8_lossy(&signal_buf[0..sz])
2880 );
2881 }
2882 }
2883
2884 fn add_retransmission(&mut self, next_time: u64, command: Command) {
2885 self.retransmissions.push(ReRun { next_time, command });
2886 self.add_timer(next_time);
2887 }
2888
2889 fn notify_service_removal(&self, expired: HashMap<String, HashSet<String>>) {
2892 for (ty_domain, sender) in self.service_queriers.iter() {
2893 if let Some(instances) = expired.get(ty_domain) {
2894 for instance_name in instances {
2895 let event = ServiceEvent::ServiceRemoved(
2896 ty_domain.to_string(),
2897 instance_name.to_string(),
2898 );
2899 match sender.send(event) {
2900 Ok(()) => debug!("notify_service_removal: sent ServiceRemoved to listener of {ty_domain}: {instance_name}"),
2901 Err(e) => debug!("Failed to send event: {}", e),
2902 }
2903 }
2904 }
2905 }
2906 }
2907
2908 fn exec_command(&mut self, command: Command, repeating: bool) {
2912 trace!("exec_command: {:?} repeating: {}", &command, repeating);
2913 match command {
2914 Command::Browse(ty, next_delay, cache_only, listener) => {
2915 self.exec_command_browse(repeating, ty, next_delay, cache_only, listener);
2916 }
2917
2918 Command::ResolveHostname(hostname, next_delay, listener, timeout) => {
2919 self.exec_command_resolve_hostname(
2920 repeating, hostname, next_delay, listener, timeout,
2921 );
2922 }
2923
2924 Command::Register(service_info) => {
2925 self.register_service(*service_info);
2926 self.increase_counter(Counter::Register, 1);
2927 }
2928
2929 Command::RegisterResend(fullname, intf) => {
2930 trace!("register-resend service: {fullname} on {}", &intf);
2931 self.exec_command_register_resend(fullname, intf);
2932 }
2933
2934 Command::Unregister(fullname, resp_s) => {
2935 trace!("unregister service {} repeat {}", &fullname, &repeating);
2936 self.exec_command_unregister(repeating, fullname, resp_s);
2937 }
2938
2939 Command::UnregisterResend(packet, if_index, is_ipv4) => {
2940 self.exec_command_unregister_resend(packet, if_index, is_ipv4);
2941 }
2942
2943 Command::StopBrowse(ty_domain) => self.exec_command_stop_browse(ty_domain),
2944
2945 Command::StopResolveHostname(hostname) => {
2946 self.exec_command_stop_resolve_hostname(hostname.to_lowercase())
2947 }
2948
2949 Command::Resolve(instance, try_count) => self.exec_command_resolve(instance, try_count),
2950
2951 Command::GetMetrics(resp_s) => self.exec_command_get_metrics(resp_s),
2952
2953 Command::GetStatus(resp_s) => match resp_s.send(self.status.clone()) {
2954 Ok(()) => trace!("Sent status to the client"),
2955 Err(e) => debug!("Failed to send status: {}", e),
2956 },
2957
2958 Command::Monitor(resp_s) => {
2959 self.monitors.push(resp_s);
2960 }
2961
2962 Command::SetOption(daemon_opt) => {
2963 self.process_set_option(daemon_opt);
2964 }
2965
2966 Command::GetOption(resp_s) => {
2967 let val = DaemonOptionVal {
2968 _service_name_len_max: self.service_name_len_max,
2969 ip_check_interval: self.ip_check_interval,
2970 };
2971 if let Err(e) = resp_s.send(val) {
2972 debug!("Failed to send options: {}", e);
2973 }
2974 }
2975
2976 Command::Verify(instance_fullname, timeout) => {
2977 self.exec_command_verify(instance_fullname, timeout, repeating);
2978 }
2979
2980 _ => {
2981 debug!("unexpected command: {:?}", &command);
2982 }
2983 }
2984 }
2985
2986 fn exec_command_get_metrics(&mut self, resp_s: Sender<HashMap<String, i64>>) {
2987 self.set_counter(Counter::CachedPTR, self.cache.ptr_count() as i64);
2988 self.set_counter(Counter::CachedSRV, self.cache.srv_count() as i64);
2989 self.set_counter(Counter::CachedAddr, self.cache.addr_count() as i64);
2990 self.set_counter(Counter::CachedTxt, self.cache.txt_count() as i64);
2991 self.set_counter(Counter::CachedNSec, self.cache.nsec_count() as i64);
2992 self.set_counter(Counter::CachedSubtype, self.cache.subtype_count() as i64);
2993 self.set_counter(Counter::Timer, self.timers.len() as i64);
2994
2995 let dns_registry_probe_count: usize = self
2996 .dns_registry_map
2997 .values()
2998 .map(|r| r.probing.len())
2999 .sum();
3000 self.set_counter(Counter::DnsRegistryProbe, dns_registry_probe_count as i64);
3001
3002 let dns_registry_active_count: usize = self
3003 .dns_registry_map
3004 .values()
3005 .map(|r| r.active.values().map(|a| a.len()).sum::<usize>())
3006 .sum();
3007 self.set_counter(Counter::DnsRegistryActive, dns_registry_active_count as i64);
3008
3009 let dns_registry_timer_count: usize = self
3010 .dns_registry_map
3011 .values()
3012 .map(|r| r.new_timers.len())
3013 .sum();
3014 self.set_counter(Counter::DnsRegistryTimer, dns_registry_timer_count as i64);
3015
3016 let dns_registry_name_change_count: usize = self
3017 .dns_registry_map
3018 .values()
3019 .map(|r| r.name_changes.len())
3020 .sum();
3021 self.set_counter(
3022 Counter::DnsRegistryNameChange,
3023 dns_registry_name_change_count as i64,
3024 );
3025
3026 if let Err(e) = resp_s.send(self.counters.clone()) {
3028 debug!("Failed to send metrics: {}", e);
3029 }
3030 }
3031
3032 fn exec_command_browse(
3033 &mut self,
3034 repeating: bool,
3035 ty: String,
3036 next_delay: u32,
3037 cache_only: bool,
3038 listener: Sender<ServiceEvent>,
3039 ) {
3040 let pretty_addrs: Vec<String> = self
3041 .my_intfs
3042 .iter()
3043 .map(|(if_index, itf)| format!("{} ({if_index})", itf.name))
3044 .collect();
3045
3046 if let Err(e) = listener.send(ServiceEvent::SearchStarted(format!(
3047 "{ty} on {} interfaces [{}]",
3048 pretty_addrs.len(),
3049 pretty_addrs.join(", ")
3050 ))) {
3051 debug!(
3052 "Failed to send SearchStarted({})(repeating:{}): {}",
3053 &ty, repeating, e
3054 );
3055 return;
3056 }
3057
3058 let now = current_time_millis();
3059 if !repeating {
3060 self.service_queriers.insert(ty.clone(), listener.clone());
3064
3065 self.query_cache_for_service(&ty, &listener, now);
3067 }
3068
3069 if cache_only {
3070 match listener.send(ServiceEvent::SearchStopped(ty.clone())) {
3072 Ok(()) => debug!("SearchStopped sent for {}", &ty),
3073 Err(e) => debug!("Failed to send SearchStopped: {}", e),
3074 }
3075 return;
3076 }
3077
3078 self.send_query(&ty, RRType::PTR);
3079 self.increase_counter(Counter::Browse, 1);
3080
3081 let next_time = now + (next_delay * 1000) as u64;
3082 let max_delay = 60 * 60;
3083 let delay = cmp::min(next_delay * 2, max_delay);
3084 self.add_retransmission(next_time, Command::Browse(ty, delay, cache_only, listener));
3085 }
3086
3087 fn exec_command_resolve_hostname(
3088 &mut self,
3089 repeating: bool,
3090 hostname: String,
3091 next_delay: u32,
3092 listener: Sender<HostnameResolutionEvent>,
3093 timeout: Option<u64>,
3094 ) {
3095 let addr_list: Vec<_> = self.my_intfs.iter().collect();
3096 if let Err(e) = listener.send(HostnameResolutionEvent::SearchStarted(format!(
3097 "{} on addrs {:?}",
3098 &hostname, &addr_list
3099 ))) {
3100 debug!(
3101 "Failed to send ResolveStarted({})(repeating:{}): {}",
3102 &hostname, repeating, e
3103 );
3104 return;
3105 }
3106 if !repeating {
3107 self.add_hostname_resolver(hostname.to_owned(), listener.clone(), timeout);
3108 self.query_cache_for_hostname(&hostname, listener.clone());
3110 }
3111
3112 self.send_query_vec(&[(&hostname, RRType::A), (&hostname, RRType::AAAA)]);
3113 self.increase_counter(Counter::ResolveHostname, 1);
3114
3115 let now = current_time_millis();
3116 let next_time = now + u64::from(next_delay) * 1000;
3117 let max_delay = 60 * 60;
3118 let delay = cmp::min(next_delay * 2, max_delay);
3119
3120 if self
3122 .hostname_resolvers
3123 .get(&hostname)
3124 .and_then(|(_sender, timeout)| *timeout)
3125 .map(|timeout| next_time < timeout)
3126 .unwrap_or(true)
3127 {
3128 self.add_retransmission(
3129 next_time,
3130 Command::ResolveHostname(hostname, delay, listener, None),
3131 );
3132 }
3133 }
3134
3135 fn exec_command_resolve(&mut self, instance: String, try_count: u16) {
3136 let pending_query = self.query_unresolved(&instance);
3137 let max_try = 3;
3138 if pending_query && try_count < max_try {
3139 let next_time = current_time_millis() + RESOLVE_WAIT_IN_MILLIS;
3142 self.add_retransmission(next_time, Command::Resolve(instance, try_count + 1));
3143 }
3144 }
3145
3146 fn exec_command_unregister(
3147 &mut self,
3148 repeating: bool,
3149 fullname: String,
3150 resp_s: Sender<UnregisterStatus>,
3151 ) {
3152 let response = match self.my_services.remove_entry(&fullname) {
3153 None => {
3154 debug!("unregister: cannot find such service {}", &fullname);
3155 UnregisterStatus::NotFound
3156 }
3157 Some((_k, info)) => {
3158 let mut timers = Vec::new();
3159
3160 for (if_index, intf) in self.my_intfs.iter() {
3161 if let Some(sock) = self.ipv4_sock.as_ref() {
3162 let packet = self.unregister_service(&info, intf, &sock.pktinfo);
3163 if !repeating && !packet.is_empty() {
3165 let next_time = current_time_millis() + 120;
3166 self.retransmissions.push(ReRun {
3167 next_time,
3168 command: Command::UnregisterResend(packet, *if_index, true),
3169 });
3170 timers.push(next_time);
3171 }
3172 }
3173
3174 if let Some(sock) = self.ipv6_sock.as_ref() {
3176 let packet = self.unregister_service(&info, intf, &sock.pktinfo);
3177 if !repeating && !packet.is_empty() {
3178 let next_time = current_time_millis() + 120;
3179 self.retransmissions.push(ReRun {
3180 next_time,
3181 command: Command::UnregisterResend(packet, *if_index, false),
3182 });
3183 timers.push(next_time);
3184 }
3185 }
3186 }
3187
3188 for t in timers {
3189 self.add_timer(t);
3190 }
3191
3192 self.increase_counter(Counter::Unregister, 1);
3193 UnregisterStatus::OK
3194 }
3195 };
3196 if let Err(e) = resp_s.send(response) {
3197 debug!("unregister: failed to send response: {}", e);
3198 }
3199 }
3200
3201 fn exec_command_unregister_resend(&mut self, packet: Vec<u8>, if_index: u32, is_ipv4: bool) {
3202 let Some(intf) = self.my_intfs.get(&if_index) else {
3203 return;
3204 };
3205 let sock_opt = if is_ipv4 {
3206 &self.ipv4_sock
3207 } else {
3208 &self.ipv6_sock
3209 };
3210 let Some(sock) = sock_opt else {
3211 return;
3212 };
3213
3214 let if_addr = if is_ipv4 {
3215 match intf.next_ifaddr_v4() {
3216 Some(addr) => addr,
3217 None => return,
3218 }
3219 } else {
3220 match intf.next_ifaddr_v6() {
3221 Some(addr) => addr,
3222 None => return,
3223 }
3224 };
3225
3226 debug!("UnregisterResend from {:?}", if_addr);
3227 multicast_on_intf(&packet[..], &intf.name, intf.index, if_addr, &sock.pktinfo);
3228
3229 self.increase_counter(Counter::UnregisterResend, 1);
3230 }
3231
3232 fn exec_command_stop_browse(&mut self, ty_domain: String) {
3233 match self.service_queriers.remove_entry(&ty_domain) {
3234 None => debug!("StopBrowse: cannot find querier for {}", &ty_domain),
3235 Some((ty, sender)) => {
3236 trace!("StopBrowse: removed queryer for {}", &ty);
3238 let mut i = 0;
3239 while i < self.retransmissions.len() {
3240 if let Command::Browse(t, _, _, _) = &self.retransmissions[i].command {
3241 if t == &ty {
3242 self.retransmissions.remove(i);
3243 trace!("StopBrowse: removed retransmission for {}", &ty);
3244 continue;
3245 }
3246 }
3247 i += 1;
3248 }
3249
3250 self.cache.remove_service_type(&ty_domain);
3252
3253 match sender.send(ServiceEvent::SearchStopped(ty_domain)) {
3255 Ok(()) => trace!("Sent SearchStopped to the listener"),
3256 Err(e) => debug!("Failed to send SearchStopped: {}", e),
3257 }
3258 }
3259 }
3260 }
3261
3262 fn exec_command_stop_resolve_hostname(&mut self, hostname: String) {
3263 if let Some((host, (sender, _timeout))) = self.hostname_resolvers.remove_entry(&hostname) {
3264 trace!("StopResolve: removed queryer for {}", &host);
3266 let mut i = 0;
3267 while i < self.retransmissions.len() {
3268 if let Command::Resolve(t, _) = &self.retransmissions[i].command {
3269 if t == &host {
3270 self.retransmissions.remove(i);
3271 trace!("StopResolve: removed retransmission for {}", &host);
3272 continue;
3273 }
3274 }
3275 i += 1;
3276 }
3277
3278 match sender.send(HostnameResolutionEvent::SearchStopped(hostname)) {
3280 Ok(()) => trace!("Sent SearchStopped to the listener"),
3281 Err(e) => debug!("Failed to send SearchStopped: {}", e),
3282 }
3283 }
3284 }
3285
3286 fn exec_command_register_resend(&mut self, fullname: String, if_index: u32) {
3287 let Some(info) = self.my_services.get_mut(&fullname) else {
3288 trace!("announce: cannot find such service {}", &fullname);
3289 return;
3290 };
3291
3292 let Some(dns_registry) = self.dns_registry_map.get_mut(&if_index) else {
3293 return;
3294 };
3295
3296 let Some(intf) = self.my_intfs.get(&if_index) else {
3297 return;
3298 };
3299
3300 let announced_v4 = if let Some(sock) = self.ipv4_sock.as_ref() {
3301 announce_service_on_intf(dns_registry, info, intf, &sock.pktinfo)
3302 } else {
3303 false
3304 };
3305 let announced_v6 = if let Some(sock) = self.ipv6_sock.as_ref() {
3306 announce_service_on_intf(dns_registry, info, intf, &sock.pktinfo)
3307 } else {
3308 false
3309 };
3310
3311 if announced_v4 || announced_v6 {
3312 let mut hostname = info.get_hostname();
3313 if let Some(new_name) = dns_registry.name_changes.get(hostname) {
3314 hostname = new_name;
3315 }
3316 let service_name = match dns_registry.name_changes.get(&fullname) {
3317 Some(new_name) => new_name.to_string(),
3318 None => fullname,
3319 };
3320
3321 debug!("resend: announce service {service_name} on {}", intf.name);
3322
3323 notify_monitors(
3324 &mut self.monitors,
3325 DaemonEvent::Announce(service_name, format!("{}:{}", hostname, &intf.name)),
3326 );
3327 info.set_status(if_index, ServiceStatus::Announced);
3328 } else {
3329 debug!("register-resend should not fail");
3330 }
3331
3332 self.increase_counter(Counter::RegisterResend, 1);
3333 }
3334
3335 fn exec_command_verify(&mut self, instance: String, timeout: Duration, repeating: bool) {
3336 let now = current_time_millis();
3346 let expire_at = if repeating {
3347 None
3348 } else {
3349 Some(now + timeout.as_millis() as u64)
3350 };
3351
3352 let record_vec = self.cache.service_verify_queries(&instance, expire_at);
3354
3355 if !record_vec.is_empty() {
3356 let query_vec: Vec<(&str, RRType)> = record_vec
3357 .iter()
3358 .map(|(record, rr_type)| (record.as_str(), *rr_type))
3359 .collect();
3360 self.send_query_vec(&query_vec);
3361
3362 if let Some(new_expire) = expire_at {
3363 self.add_timer(new_expire); self.add_retransmission(now + 1000, Command::Verify(instance, timeout));
3367 }
3368 }
3369 }
3370
3371 fn refresh_active_services(&mut self) {
3373 let mut query_ptr_count = 0;
3374 let mut query_srv_count = 0;
3375 let mut new_timers = HashSet::new();
3376 let mut query_addr_count = 0;
3377
3378 for (ty_domain, _sender) in self.service_queriers.iter() {
3379 let refreshed_timers = self.cache.refresh_due_ptr(ty_domain);
3380 if !refreshed_timers.is_empty() {
3381 trace!("sending refresh query for PTR: {}", ty_domain);
3382 self.send_query(ty_domain, RRType::PTR);
3383 query_ptr_count += 1;
3384 new_timers.extend(refreshed_timers);
3385 }
3386
3387 let (instances, timers) = self.cache.refresh_due_srv_txt(ty_domain);
3388 for (instance, types) in instances {
3389 trace!("sending refresh query for: {}", &instance);
3390 let query_vec = types
3391 .into_iter()
3392 .map(|ty| (instance.as_str(), ty))
3393 .collect::<Vec<_>>();
3394 self.send_query_vec(&query_vec);
3395 query_srv_count += 1;
3396 }
3397 new_timers.extend(timers);
3398 let (hostnames, timers) = self.cache.refresh_due_hosts(ty_domain);
3399 for hostname in hostnames.iter() {
3400 trace!("sending refresh queries for A and AAAA: {}", hostname);
3401 self.send_query_vec(&[(hostname, RRType::A), (hostname, RRType::AAAA)]);
3402 query_addr_count += 2;
3403 }
3404 new_timers.extend(timers);
3405 }
3406
3407 for timer in new_timers {
3408 self.add_timer(timer);
3409 }
3410
3411 self.increase_counter(Counter::CacheRefreshPTR, query_ptr_count);
3412 self.increase_counter(Counter::CacheRefreshSrvTxt, query_srv_count);
3413 self.increase_counter(Counter::CacheRefreshAddr, query_addr_count);
3414 }
3415}
3416
3417fn add_answer_of_service(
3419 out: &mut DnsOutgoing,
3420 msg: &DnsIncoming,
3421 entry_name: &str,
3422 service: &ServiceInfo,
3423 qtype: RRType,
3424 intf_addrs: Vec<IpAddr>,
3425) {
3426 if qtype == RRType::SRV || qtype == RRType::ANY {
3427 out.add_answer(
3428 msg,
3429 DnsSrv::new(
3430 entry_name,
3431 CLASS_IN | CLASS_CACHE_FLUSH,
3432 service.get_host_ttl(),
3433 service.get_priority(),
3434 service.get_weight(),
3435 service.get_port(),
3436 service.get_hostname().to_string(),
3437 ),
3438 );
3439 }
3440
3441 if qtype == RRType::TXT || qtype == RRType::ANY {
3442 out.add_answer(
3443 msg,
3444 DnsTxt::new(
3445 entry_name,
3446 CLASS_IN | CLASS_CACHE_FLUSH,
3447 service.get_other_ttl(),
3448 service.generate_txt(),
3449 ),
3450 );
3451 }
3452
3453 if qtype == RRType::SRV {
3454 for address in intf_addrs {
3455 out.add_additional_answer(DnsAddress::new(
3456 service.get_hostname(),
3457 ip_address_rr_type(&address),
3458 CLASS_IN | CLASS_CACHE_FLUSH,
3459 service.get_host_ttl(),
3460 address,
3461 InterfaceId::default(),
3462 ));
3463 }
3464 }
3465}
3466
3467#[derive(Clone, Debug)]
3470#[non_exhaustive]
3471pub enum ServiceEvent {
3472 SearchStarted(String),
3474
3475 ServiceFound(String, String),
3477
3478 ServiceResolved(Box<ResolvedService>),
3480
3481 ServiceRemoved(String, String),
3483
3484 SearchStopped(String),
3486}
3487
3488#[derive(Clone, Debug)]
3491#[non_exhaustive]
3492pub enum HostnameResolutionEvent {
3493 SearchStarted(String),
3495 AddressesFound(String, HashSet<ScopedIp>),
3497 AddressesRemoved(String, HashSet<ScopedIp>),
3499 SearchTimeout(String),
3501 SearchStopped(String),
3503}
3504
3505#[derive(Clone, Debug)]
3508#[non_exhaustive]
3509pub enum DaemonEvent {
3510 Announce(String, String),
3512
3513 Error(Error),
3515
3516 IpAdd(IpAddr),
3518
3519 IpDel(IpAddr),
3521
3522 NameChange(DnsNameChange),
3525
3526 Respond(String),
3528}
3529
3530#[derive(Clone, Debug)]
3533pub struct DnsNameChange {
3534 pub original: String,
3536
3537 pub new_name: String,
3547
3548 pub rr_type: RRType,
3550
3551 pub intf_name: String,
3553}
3554
3555#[derive(Debug)]
3557enum Command {
3558 Browse(String, u32, bool, Sender<ServiceEvent>),
3560
3561 ResolveHostname(String, u32, Sender<HostnameResolutionEvent>, Option<u64>), Register(Box<ServiceInfo>),
3566
3567 Unregister(String, Sender<UnregisterStatus>), RegisterResend(String, u32), UnregisterResend(Vec<u8>, u32, bool), StopBrowse(String), StopResolveHostname(String), Resolve(String, u16), GetMetrics(Sender<Metrics>),
3588
3589 GetStatus(Sender<DaemonStatus>),
3591
3592 Monitor(Sender<DaemonEvent>),
3594
3595 SetOption(DaemonOption),
3596
3597 GetOption(Sender<DaemonOptionVal>),
3598
3599 Verify(String, Duration),
3604
3605 Exit(Sender<DaemonStatus>),
3606}
3607
3608impl fmt::Display for Command {
3609 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
3610 match self {
3611 Self::Browse(_, _, _, _) => write!(f, "Command Browse"),
3612 Self::ResolveHostname(_, _, _, _) => write!(f, "Command ResolveHostname"),
3613 Self::Exit(_) => write!(f, "Command Exit"),
3614 Self::GetStatus(_) => write!(f, "Command GetStatus"),
3615 Self::GetMetrics(_) => write!(f, "Command GetMetrics"),
3616 Self::Monitor(_) => write!(f, "Command Monitor"),
3617 Self::Register(_) => write!(f, "Command Register"),
3618 Self::RegisterResend(_, _) => write!(f, "Command RegisterResend"),
3619 Self::SetOption(_) => write!(f, "Command SetOption"),
3620 Self::GetOption(_) => write!(f, "Command GetOption"),
3621 Self::StopBrowse(_) => write!(f, "Command StopBrowse"),
3622 Self::StopResolveHostname(_) => write!(f, "Command StopResolveHostname"),
3623 Self::Unregister(_, _) => write!(f, "Command Unregister"),
3624 Self::UnregisterResend(_, _, _) => write!(f, "Command UnregisterResend"),
3625 Self::Resolve(_, _) => write!(f, "Command Resolve"),
3626 Self::Verify(_, _) => write!(f, "Command VerifyResource"),
3627 }
3628 }
3629}
3630
/// Snapshot of daemon option values, sent in reply to `Command::GetOption`.
struct DaemonOptionVal {
    /// Current maximum service name length.
    _service_name_len_max: u8,
    /// Current IP check interval.
    ip_check_interval: u64,
}
3635
3636#[derive(Debug)]
3637enum DaemonOption {
3638 ServiceNameLenMax(u8),
3639 IpCheckInterval(u64),
3640 EnableInterface(Vec<IfKind>),
3641 DisableInterface(Vec<IfKind>),
3642 MulticastLoopV4(bool),
3643 MulticastLoopV6(bool),
3644 AcceptUnsolicited(bool),
3645 #[cfg(test)]
3646 TestDownInterface(String),
3647 #[cfg(test)]
3648 TestUpInterface(String),
3649}
3650
/// Byte length of the domain suffix `"._tcp.local."`; the `"._udp.local."`
/// suffix has the same length.
const DOMAIN_LEN: usize = "._tcp.local.".len();
3653
3654fn check_service_name_length(ty_domain: &str, limit: u8) -> Result<()> {
3656 if ty_domain.len() <= DOMAIN_LEN + 1 {
3657 return Err(e_fmt!("Service type name cannot be empty: {}", ty_domain));
3659 }
3660
3661 let service_name_len = ty_domain.len() - DOMAIN_LEN - 1; if service_name_len > limit as usize {
3663 return Err(e_fmt!("Service name length must be <= {} bytes", limit));
3664 }
3665 Ok(())
3666}
3667
3668fn check_domain_suffix(name: &str) -> Result<()> {
3670 if !(name.ends_with("._tcp.local.") || name.ends_with("._udp.local.")) {
3671 return Err(e_fmt!(
3672 "mDNS service {} must end with '._tcp.local.' or '._udp.local.'",
3673 name
3674 ));
3675 }
3676
3677 Ok(())
3678}
3679
3680fn check_service_name(fullname: &str) -> Result<()> {
3688 check_domain_suffix(fullname)?;
3689
3690 let remaining: Vec<&str> = fullname[..fullname.len() - DOMAIN_LEN].split('.').collect();
3691 let name = remaining.last().ok_or_else(|| e_fmt!("No service name"))?;
3692
3693 if &name[0..1] != "_" {
3694 return Err(e_fmt!("Service name must start with '_'"));
3695 }
3696
3697 let name = &name[1..];
3698
3699 if name.contains("--") {
3700 return Err(e_fmt!("Service name must not contain '--'"));
3701 }
3702
3703 if name.starts_with('-') || name.ends_with('-') {
3704 return Err(e_fmt!("Service name (%s) may not start or end with '-'"));
3705 }
3706
3707 let ascii_count = name.chars().filter(|c| c.is_ascii_alphabetic()).count();
3708 if ascii_count < 1 {
3709 return Err(e_fmt!(
3710 "Service name must contain at least one letter (eg: 'A-Za-z')"
3711 ));
3712 }
3713
3714 Ok(())
3715}
3716
3717fn check_hostname(hostname: &str) -> Result<()> {
3719 if !hostname.ends_with(".local.") {
3720 return Err(e_fmt!("Hostname must end with '.local.': {hostname}"));
3721 }
3722
3723 if hostname == ".local." {
3724 return Err(e_fmt!(
3725 "The part of the hostname before '.local.' cannot be empty"
3726 ));
3727 }
3728
3729 if hostname.len() > 255 {
3730 return Err(e_fmt!("Hostname length must be <= 255 bytes"));
3731 }
3732
3733 Ok(())
3734}
3735
3736fn call_service_listener(
3737 listeners_map: &HashMap<String, Sender<ServiceEvent>>,
3738 ty_domain: &str,
3739 event: ServiceEvent,
3740) {
3741 if let Some(listener) = listeners_map.get(ty_domain) {
3742 match listener.send(event) {
3743 Ok(()) => trace!("Sent event to listener successfully"),
3744 Err(e) => debug!("Failed to send event: {}", e),
3745 }
3746 }
3747}
3748
3749fn call_hostname_resolution_listener(
3750 listeners_map: &HashMap<String, (Sender<HostnameResolutionEvent>, Option<u64>)>,
3751 hostname: &str,
3752 event: HostnameResolutionEvent,
3753) {
3754 let hostname_lower = hostname.to_lowercase();
3755 if let Some(listener) = listeners_map.get(&hostname_lower).map(|(l, _)| l) {
3756 match listener.send(event) {
3757 Ok(()) => trace!("Sent event to listener successfully"),
3758 Err(e) => debug!("Failed to send event: {}", e),
3759 }
3760 }
3761}
3762
3763fn my_ip_interfaces(with_loopback: bool) -> Vec<Interface> {
3767 if_addrs::get_if_addrs()
3768 .unwrap_or_default()
3769 .into_iter()
3770 .filter(|i| i.is_oper_up() && (!i.is_loopback() || with_loopback))
3771 .collect()
3772}
3773
3774fn send_dns_outgoing(out: &DnsOutgoing, my_intf: &MyIntf, sock: &PktInfoUdpSocket) -> Vec<Vec<u8>> {
3775 let if_name = &my_intf.name;
3776
3777 let if_addr = if sock.domain() == Domain::IPV4 {
3778 match my_intf.next_ifaddr_v4() {
3779 Some(addr) => addr,
3780 None => return vec![],
3781 }
3782 } else {
3783 match my_intf.next_ifaddr_v6() {
3784 Some(addr) => addr,
3785 None => return vec![],
3786 }
3787 };
3788
3789 send_dns_outgoing_impl(out, if_name, my_intf.index, if_addr, sock)
3790}
3791
3792fn send_dns_outgoing_impl(
3794 out: &DnsOutgoing,
3795 if_name: &str,
3796 if_index: u32,
3797 if_addr: &IfAddr,
3798 sock: &PktInfoUdpSocket,
3799) -> Vec<Vec<u8>> {
3800 let qtype = if out.is_query() {
3801 "query"
3802 } else {
3803 if out.answers_count() == 0 && out.additionals().is_empty() {
3804 return vec![]; }
3806 "response"
3807 };
3808 trace!(
3809 "send {}: {} questions {} answers {} authorities {} additional",
3810 qtype,
3811 out.questions().len(),
3812 out.answers_count(),
3813 out.authorities().len(),
3814 out.additionals().len()
3815 );
3816
3817 match if_addr.ip() {
3818 IpAddr::V4(ipv4) => {
3819 if let Err(e) = sock.set_multicast_if_v4(&ipv4) {
3820 debug!(
3821 "send_dns_outgoing: failed to set multicast interface for IPv4 {}: {}",
3822 ipv4, e
3823 );
3824 return vec![]; }
3826 }
3827 IpAddr::V6(ipv6) => {
3828 if let Err(e) = sock.set_multicast_if_v6(if_index) {
3829 debug!(
3830 "send_dns_outgoing: failed to set multicast interface for IPv6 {}: {}",
3831 ipv6, e
3832 );
3833 return vec![]; }
3835 }
3836 }
3837
3838 let packet_list = out.to_data_on_wire();
3839 for packet in packet_list.iter() {
3840 multicast_on_intf(packet, if_name, if_index, if_addr, sock);
3841 }
3842 packet_list
3843}
3844
3845fn multicast_on_intf(
3847 packet: &[u8],
3848 if_name: &str,
3849 if_index: u32,
3850 if_addr: &IfAddr,
3851 socket: &PktInfoUdpSocket,
3852) {
3853 if packet.len() > MAX_MSG_ABSOLUTE {
3854 debug!("Drop over-sized packet ({})", packet.len());
3855 return;
3856 }
3857
3858 let addr: SocketAddr = match if_addr {
3859 if_addrs::IfAddr::V4(_) => SocketAddrV4::new(GROUP_ADDR_V4, MDNS_PORT).into(),
3860 if_addrs::IfAddr::V6(_) => {
3861 let mut sock = SocketAddrV6::new(GROUP_ADDR_V6, MDNS_PORT, 0, 0);
3862 sock.set_scope_id(if_index); sock.into()
3864 }
3865 };
3866
3867 let sock_addr = addr.into();
3869 match socket.send_to(packet, &sock_addr) {
3870 Ok(sz) => trace!(
3871 "sent out {} bytes on interface {} (idx {}) addr {}",
3872 sz,
3873 if_name,
3874 if_index,
3875 if_addr.ip()
3876 ),
3877 Err(e) => trace!("Failed to send to {} via {:?}: {}", addr, &if_name, e),
3878 }
3879}
3880
/// Returns `true` when `name` splits on `'.'` into at least five parts
/// (empty parts, such as the one after a trailing dot, count too).
fn valid_instance_name(name: &str) -> bool {
    // A fifth part exists exactly when there are five or more parts.
    name.split('.').nth(4).is_some()
}
3887
3888fn notify_monitors(monitors: &mut Vec<Sender<DaemonEvent>>, event: DaemonEvent) {
3889 monitors.retain(|sender| {
3890 if let Err(e) = sender.try_send(event.clone()) {
3891 debug!("notify_monitors: try_send: {}", &e);
3892 if matches!(e, TrySendError::Disconnected(_)) {
3893 return false; }
3895 }
3896 true
3897 });
3898}
3899
3900fn prepare_announce(
3903 info: &ServiceInfo,
3904 intf: &MyIntf,
3905 dns_registry: &mut DnsRegistry,
3906 is_ipv4: bool,
3907) -> Option<DnsOutgoing> {
3908 let intf_addrs = if is_ipv4 {
3909 info.get_addrs_on_my_intf_v4(intf)
3910 } else {
3911 info.get_addrs_on_my_intf_v6(intf)
3912 };
3913
3914 if intf_addrs.is_empty() {
3915 debug!(
3916 "prepare_announce (ipv4: {is_ipv4}): no valid addrs on interface {}",
3917 &intf.name
3918 );
3919 return None;
3920 }
3921
3922 let service_fullname = match dns_registry.name_changes.get(info.get_fullname()) {
3924 Some(new_name) => new_name,
3925 None => info.get_fullname(),
3926 };
3927
3928 debug!(
3929 "prepare to announce service {service_fullname} on {:?}",
3930 &intf_addrs
3931 );
3932
3933 let mut probing_count = 0;
3934 let mut out = DnsOutgoing::new(FLAGS_QR_RESPONSE | FLAGS_AA);
3935 let create_time = current_time_millis() + fastrand::u64(0..250);
3936
3937 out.add_answer_at_time(
3938 DnsPointer::new(
3939 info.get_type(),
3940 RRType::PTR,
3941 CLASS_IN,
3942 info.get_other_ttl(),
3943 service_fullname.to_string(),
3944 ),
3945 0,
3946 );
3947
3948 if let Some(sub) = info.get_subtype() {
3949 trace!("Adding subdomain {}", sub);
3950 out.add_answer_at_time(
3951 DnsPointer::new(
3952 sub,
3953 RRType::PTR,
3954 CLASS_IN,
3955 info.get_other_ttl(),
3956 service_fullname.to_string(),
3957 ),
3958 0,
3959 );
3960 }
3961
3962 let hostname = match dns_registry.name_changes.get(info.get_hostname()) {
3964 Some(new_name) => new_name.to_string(),
3965 None => info.get_hostname().to_string(),
3966 };
3967
3968 let mut srv = DnsSrv::new(
3969 info.get_fullname(),
3970 CLASS_IN | CLASS_CACHE_FLUSH,
3971 info.get_host_ttl(),
3972 info.get_priority(),
3973 info.get_weight(),
3974 info.get_port(),
3975 hostname,
3976 );
3977
3978 if let Some(new_name) = dns_registry.name_changes.get(info.get_fullname()) {
3979 srv.get_record_mut().set_new_name(new_name.to_string());
3980 }
3981
3982 if !info.requires_probe()
3983 || dns_registry.is_probing_done(&srv, info.get_fullname(), create_time)
3984 {
3985 out.add_answer_at_time(srv, 0);
3986 } else {
3987 probing_count += 1;
3988 }
3989
3990 let mut txt = DnsTxt::new(
3993 info.get_fullname(),
3994 CLASS_IN | CLASS_CACHE_FLUSH,
3995 info.get_other_ttl(),
3996 info.generate_txt(),
3997 );
3998
3999 if let Some(new_name) = dns_registry.name_changes.get(info.get_fullname()) {
4000 txt.get_record_mut().set_new_name(new_name.to_string());
4001 }
4002
4003 if !info.requires_probe()
4004 || dns_registry.is_probing_done(&txt, info.get_fullname(), create_time)
4005 {
4006 out.add_answer_at_time(txt, 0);
4007 } else {
4008 probing_count += 1;
4009 }
4010
4011 let hostname = info.get_hostname();
4014 for address in intf_addrs {
4015 let mut dns_addr = DnsAddress::new(
4016 hostname,
4017 ip_address_rr_type(&address),
4018 CLASS_IN | CLASS_CACHE_FLUSH,
4019 info.get_host_ttl(),
4020 address,
4021 intf.into(),
4022 );
4023
4024 if let Some(new_name) = dns_registry.name_changes.get(hostname) {
4025 dns_addr.get_record_mut().set_new_name(new_name.to_string());
4026 }
4027
4028 if !info.requires_probe()
4029 || dns_registry.is_probing_done(&dns_addr, info.get_fullname(), create_time)
4030 {
4031 out.add_answer_at_time(dns_addr, 0);
4032 } else {
4033 probing_count += 1;
4034 }
4035 }
4036
4037 if probing_count > 0 {
4038 return None;
4039 }
4040
4041 Some(out)
4042}
4043
4044fn announce_service_on_intf(
4047 dns_registry: &mut DnsRegistry,
4048 info: &ServiceInfo,
4049 intf: &MyIntf,
4050 sock: &PktInfoUdpSocket,
4051) -> bool {
4052 let is_ipv4 = sock.domain() == Domain::IPV4;
4053 if let Some(out) = prepare_announce(info, intf, dns_registry, is_ipv4) {
4054 send_dns_outgoing(&out, intf, sock);
4055 return true;
4056 }
4057
4058 false
4059}
4060
/// Returns a replacement for `original` to be used after a name conflict.
///
/// Only the first dot-separated label is modified:
///
/// - `foo.local.` becomes `foo (2).local.`
/// - `foo (2).local.` becomes `foo (3).local.`
///
/// A label is treated as already numbered only when it ends with
/// ` (<u32>)`. Every other label gets a fresh ` (2)` suffix appended; this
/// includes a label whose number is `u32::MAX`, which cannot be incremented.
fn name_change(original: &str) -> String {
    // Split off the first label; `remainder` is everything after the first dot.
    let (first_label, remainder) = match original.split_once('.') {
        Some((label, rest)) => (label, Some(rest)),
        None => (original, None),
    };

    let incremented = first_label.rfind(" (").and_then(|paren_pos| {
        // The text between the last " (" and a ')' that closes the label.
        let digits = first_label[paren_pos + 2..].strip_suffix(')')?;
        let number = digits.parse::<u32>().ok()?;
        // `checked_add` avoids an overflow panic (debug builds) or a silent
        // wrap-around to 0 (release builds) for a `(4294967295)` suffix.
        let next = number.checked_add(1)?;
        Some(format!("{} ({})", &first_label[..paren_pos], next))
    });

    let new_label = incremented.unwrap_or_else(|| format!("{first_label} (2)"));

    match remainder {
        Some(rest) => format!("{new_label}.{rest}"),
        None => new_label,
    }
}
4096
/// Returns a replacement for the hostname `original` after a name conflict.
///
/// Only the first dot-separated label is modified:
///
/// - `foo.local.` becomes `foo-2.local.`
/// - `foo-2.local.` becomes `foo-3.local.`
///
/// A label is treated as already numbered only when the text after its last
/// `-` parses as a `u32`. Every other label gets a fresh `-2` suffix
/// appended; this includes a label whose number is `u32::MAX`, which cannot
/// be incremented.
fn hostname_change(original: &str) -> String {
    // Split off the first label; `remainder` is everything after the first dot.
    let (first_label, remainder) = match original.split_once('.') {
        Some((label, rest)) => (label, Some(rest)),
        None => (original, None),
    };

    let incremented = first_label.rsplit_once('-').and_then(|(base, suffix)| {
        let number = suffix.parse::<u32>().ok()?;
        // `checked_add` avoids an overflow panic (debug builds) or a silent
        // wrap-around to 0 (release builds) for a `-4294967295` suffix.
        let next = number.checked_add(1)?;
        Some(format!("{base}-{next}"))
    });

    let new_label = incremented.unwrap_or_else(|| format!("{first_label}-2"));

    match remainder {
        Some(rest) => format!("{new_label}.{rest}"),
        None => new_label,
    }
}
4124
4125fn add_answer_with_additionals(
4126 out: &mut DnsOutgoing,
4127 msg: &DnsIncoming,
4128 service: &ServiceInfo,
4129 intf: &MyIntf,
4130 dns_registry: &DnsRegistry,
4131 is_ipv4: bool,
4132) {
4133 let intf_addrs = if is_ipv4 {
4134 service.get_addrs_on_my_intf_v4(intf)
4135 } else {
4136 service.get_addrs_on_my_intf_v6(intf)
4137 };
4138 if intf_addrs.is_empty() {
4139 trace!("No addrs on LAN of intf {:?}", intf);
4140 return;
4141 }
4142
4143 let service_fullname = match dns_registry.name_changes.get(service.get_fullname()) {
4145 Some(new_name) => new_name,
4146 None => service.get_fullname(),
4147 };
4148
4149 let hostname = match dns_registry.name_changes.get(service.get_hostname()) {
4150 Some(new_name) => new_name,
4151 None => service.get_hostname(),
4152 };
4153
4154 let ptr_added = out.add_answer(
4155 msg,
4156 DnsPointer::new(
4157 service.get_type(),
4158 RRType::PTR,
4159 CLASS_IN,
4160 service.get_other_ttl(),
4161 service_fullname.to_string(),
4162 ),
4163 );
4164
4165 if !ptr_added {
4166 trace!("answer was not added for msg {:?}", msg);
4167 return;
4168 }
4169
4170 if let Some(sub) = service.get_subtype() {
4171 trace!("Adding subdomain {}", sub);
4172 out.add_additional_answer(DnsPointer::new(
4173 sub,
4174 RRType::PTR,
4175 CLASS_IN,
4176 service.get_other_ttl(),
4177 service_fullname.to_string(),
4178 ));
4179 }
4180
4181 out.add_additional_answer(DnsSrv::new(
4184 service_fullname,
4185 CLASS_IN | CLASS_CACHE_FLUSH,
4186 service.get_host_ttl(),
4187 service.get_priority(),
4188 service.get_weight(),
4189 service.get_port(),
4190 hostname.to_string(),
4191 ));
4192
4193 out.add_additional_answer(DnsTxt::new(
4194 service_fullname,
4195 CLASS_IN | CLASS_CACHE_FLUSH,
4196 service.get_other_ttl(),
4197 service.generate_txt(),
4198 ));
4199
4200 for address in intf_addrs {
4201 out.add_additional_answer(DnsAddress::new(
4202 hostname,
4203 ip_address_rr_type(&address),
4204 CLASS_IN | CLASS_CACHE_FLUSH,
4205 service.get_host_ttl(),
4206 address,
4207 intf.into(),
4208 ));
4209 }
4210}
4211
4212fn check_probing(
4215 dns_registry: &mut DnsRegistry,
4216 timers: &mut BinaryHeap<Reverse<u64>>,
4217 now: u64,
4218) -> (DnsOutgoing, Vec<String>) {
4219 let mut expired_probes = Vec::new();
4220 let mut out = DnsOutgoing::new(FLAGS_QR_QUERY);
4221
4222 for (name, probe) in dns_registry.probing.iter_mut() {
4223 if now >= probe.next_send {
4224 if probe.expired(now) {
4225 expired_probes.push(name.clone());
4227 } else {
4228 out.add_question(name, RRType::ANY);
4229
4230 for record in probe.records.iter() {
4238 out.add_authority(record.clone());
4239 }
4240
4241 probe.update_next_send(now);
4242
4243 timers.push(Reverse(probe.next_send));
4245 }
4246 }
4247 }
4248
4249 (out, expired_probes)
4250}
4251
4252fn handle_expired_probes(
4257 expired_probes: Vec<String>,
4258 intf_name: &str,
4259 dns_registry: &mut DnsRegistry,
4260 monitors: &mut Vec<Sender<DaemonEvent>>,
4261) -> HashSet<String> {
4262 let mut waiting_services = HashSet::new();
4263
4264 for name in expired_probes {
4265 let Some(probe) = dns_registry.probing.remove(&name) else {
4266 continue;
4267 };
4268
4269 for record in probe.records.iter() {
4271 if let Some(new_name) = record.get_record().get_new_name() {
4272 dns_registry
4273 .name_changes
4274 .insert(name.clone(), new_name.to_string());
4275
4276 let event = DnsNameChange {
4277 original: record.get_record().get_original_name().to_string(),
4278 new_name: new_name.to_string(),
4279 rr_type: record.get_type(),
4280 intf_name: intf_name.to_string(),
4281 };
4282 debug!("Name change event: {:?}", &event);
4283 notify_monitors(monitors, DaemonEvent::NameChange(event));
4284 }
4285 }
4286
4287 debug!(
4289 "probe of '{name}' finished: move {} records to active. ({} waiting services)",
4290 probe.records.len(),
4291 probe.waiting_services.len(),
4292 );
4293
4294 if !probe.records.is_empty() {
4296 match dns_registry.active.get_mut(&name) {
4297 Some(records) => {
4298 records.extend(probe.records);
4299 }
4300 None => {
4301 dns_registry.active.insert(name, probe.records);
4302 }
4303 }
4304
4305 waiting_services.extend(probe.waiting_services);
4306 }
4307 }
4308
4309 waiting_services
4310}
4311
4312#[cfg(test)]
4313mod tests {
4314 use super::{
4315 _new_socket_bind, check_domain_suffix, check_service_name_length, hostname_change,
4316 my_ip_interfaces, name_change, send_dns_outgoing_impl, valid_instance_name,
4317 HostnameResolutionEvent, ServiceDaemon, ServiceEvent, ServiceInfo,
4318 };
4319 use crate::{
4320 dns_parser::{
4321 DnsIncoming, DnsOutgoing, DnsPointer, InterfaceId, RRType, ScopedIp, CLASS_IN,
4322 FLAGS_AA, FLAGS_QR_RESPONSE,
4323 },
4324 service_daemon::{add_answer_of_service, check_hostname},
4325 };
4326 use std::time::{Duration, SystemTime};
4327 use test_log::test;
4328
4329 #[test]
4330 fn test_instance_name() {
4331 assert!(valid_instance_name("my-laser._printer._tcp.local."));
4332 assert!(valid_instance_name("my-laser.._printer._tcp.local."));
4333 assert!(!valid_instance_name("_printer._tcp.local."));
4334 }
4335
4336 #[test]
4337 fn test_check_service_name_length() {
4338 let result = check_service_name_length("_tcp", 100);
4339 assert!(result.is_err());
4340 if let Err(e) = result {
4341 println!("{}", e);
4342 }
4343 }
4344
4345 #[test]
4346 fn test_check_hostname() {
4347 for hostname in &[
4349 "my_host.local.",
4350 &("A".repeat(255 - ".local.".len()) + ".local."),
4351 ] {
4352 let result = check_hostname(hostname);
4353 assert!(result.is_ok());
4354 }
4355
4356 for hostname in &[
4358 "my_host.local",
4359 ".local.",
4360 &("A".repeat(256 - ".local.".len()) + ".local."),
4361 ] {
4362 let result = check_hostname(hostname);
4363 assert!(result.is_err());
4364 if let Err(e) = result {
4365 println!("{}", e);
4366 }
4367 }
4368 }
4369
4370 #[test]
4371 fn test_check_domain_suffix() {
4372 assert!(check_domain_suffix("_missing_dot._tcp.local").is_err());
4373 assert!(check_domain_suffix("_missing_bar.tcp.local.").is_err());
4374 assert!(check_domain_suffix("_mis_spell._tpp.local.").is_err());
4375 assert!(check_domain_suffix("_mis_spell._upp.local.").is_err());
4376 assert!(check_domain_suffix("_has_dot._tcp.local.").is_ok());
4377 assert!(check_domain_suffix("_goodname._udp.local.").is_ok());
4378 }
4379
4380 #[test]
4381 fn test_service_with_temporarily_invalidated_ptr() {
4382 let d = ServiceDaemon::new().expect("Failed to create daemon");
4384
4385 let service = "_test_inval_ptr._udp.local.";
4386 let host_name = "my_host_tmp_invalidated_ptr.local.";
4387 let intfs: Vec<_> = my_ip_interfaces(false);
4388 let intf_ips: Vec<_> = intfs.iter().map(|intf| intf.ip()).collect();
4389 let port = 5201;
4390 let my_service =
4391 ServiceInfo::new(service, "my_instance", host_name, &intf_ips[..], port, None)
4392 .expect("invalid service info")
4393 .enable_addr_auto();
4394 let result = d.register(my_service.clone());
4395 assert!(result.is_ok());
4396
4397 let browse_chan = d.browse(service).unwrap();
4399 let timeout = Duration::from_secs(2);
4400 let mut resolved = false;
4401
4402 while let Ok(event) = browse_chan.recv_timeout(timeout) {
4403 match event {
4404 ServiceEvent::ServiceResolved(info) => {
4405 resolved = true;
4406 println!("Resolved a service of {}", &info.fullname);
4407 break;
4408 }
4409 e => {
4410 println!("Received event {:?}", e);
4411 }
4412 }
4413 }
4414
4415 assert!(resolved);
4416
4417 println!("Stopping browse of {}", service);
4418 d.stop_browse(service).unwrap();
4421
4422 let mut stopped = false;
4427 while let Ok(event) = browse_chan.recv_timeout(timeout) {
4428 match event {
4429 ServiceEvent::SearchStopped(_) => {
4430 stopped = true;
4431 println!("Stopped browsing service");
4432 break;
4433 }
4434 e => {
4438 println!("Received event {:?}", e);
4439 }
4440 }
4441 }
4442
4443 assert!(stopped);
4444
4445 let invalidate_ptr_packet = DnsPointer::new(
4447 my_service.get_type(),
4448 RRType::PTR,
4449 CLASS_IN,
4450 0,
4451 my_service.get_fullname().to_string(),
4452 );
4453
4454 let mut packet_buffer = DnsOutgoing::new(FLAGS_QR_RESPONSE | FLAGS_AA);
4455 packet_buffer.add_additional_answer(invalidate_ptr_packet);
4456
4457 for intf in intfs {
4458 let sock = _new_socket_bind(&intf, true).unwrap();
4459 send_dns_outgoing_impl(
4460 &packet_buffer,
4461 &intf.name,
4462 intf.index.unwrap_or(0),
4463 &intf.addr,
4464 &sock.pktinfo,
4465 );
4466 }
4467
4468 println!(
4469 "Sent PTR record invalidation. Starting second browse for {}",
4470 service
4471 );
4472
4473 let browse_chan = d.browse(service).unwrap();
4475
4476 resolved = false;
4477 while let Ok(event) = browse_chan.recv_timeout(timeout) {
4478 match event {
4479 ServiceEvent::ServiceResolved(info) => {
4480 resolved = true;
4481 println!("Resolved a service of {}", &info.fullname);
4482 break;
4483 }
4484 e => {
4485 println!("Received event {:?}", e);
4486 }
4487 }
4488 }
4489
4490 assert!(resolved);
4491 d.shutdown().unwrap();
4492 }
4493
4494 #[test]
4495 fn test_expired_srv() {
4496 let service_type = "_expired-srv._udp.local.";
4498 let instance = "test_instance";
4499 let host_name = "expired_srv_host.local.";
4500 let mut my_service = ServiceInfo::new(service_type, instance, host_name, "", 5023, None)
4501 .unwrap()
4502 .enable_addr_auto();
4503 let new_ttl = 3; my_service._set_host_ttl(new_ttl);
4508
4509 let mdns_server = ServiceDaemon::new().expect("Failed to create mdns server");
4511 let result = mdns_server.register(my_service);
4512 assert!(result.is_ok());
4513
4514 let mdns_client = ServiceDaemon::new().expect("Failed to create mdns client");
4515 let browse_chan = mdns_client.browse(service_type).unwrap();
4516 let timeout = Duration::from_secs(2);
4517 let mut resolved = false;
4518
4519 while let Ok(event) = browse_chan.recv_timeout(timeout) {
4520 if let ServiceEvent::ServiceResolved(info) = event {
4521 resolved = true;
4522 println!("Resolved a service of {}", &info.fullname);
4523 break;
4524 }
4525 }
4526
4527 assert!(resolved);
4528
4529 mdns_server.shutdown().unwrap();
4531
4532 let expire_timeout = Duration::from_secs(new_ttl as u64);
4534 while let Ok(event) = browse_chan.recv_timeout(expire_timeout) {
4535 if let ServiceEvent::ServiceRemoved(service_type, full_name) = event {
4536 println!("Service removed: {}: {}", &service_type, &full_name);
4537 break;
4538 }
4539 }
4540 }
4541
4542 #[test]
4543 fn test_hostname_resolution_address_removed() {
4544 let server = ServiceDaemon::new().expect("Failed to create server");
4546 let hostname = "addr_remove_host._tcp.local.";
4547 let service_ip_addr: ScopedIp = my_ip_interfaces(false)
4548 .iter()
4549 .find(|iface| iface.ip().is_ipv4())
4550 .map(|iface| iface.ip().into())
4551 .unwrap();
4552
4553 let mut my_service = ServiceInfo::new(
4554 "_host_res_test._tcp.local.",
4555 "my_instance",
4556 hostname,
4557 service_ip_addr.to_ip_addr(),
4558 1234,
4559 None,
4560 )
4561 .expect("invalid service info");
4562
4563 let addr_ttl = 2;
4565 my_service._set_host_ttl(addr_ttl); server.register(my_service).unwrap();
4568
4569 let client = ServiceDaemon::new().expect("Failed to create client");
4571 let event_receiver = client.resolve_hostname(hostname, None).unwrap();
4572 let resolved = loop {
4573 match event_receiver.recv() {
4574 Ok(HostnameResolutionEvent::AddressesFound(found_hostname, addresses)) => {
4575 assert!(found_hostname == hostname);
4576 assert!(addresses.contains(&service_ip_addr));
4577 println!("address found: {:?}", &addresses);
4578 break true;
4579 }
4580 Ok(HostnameResolutionEvent::SearchStopped(_)) => break false,
4581 Ok(_event) => {}
4582 Err(_) => break false,
4583 }
4584 };
4585
4586 assert!(resolved);
4587
4588 server.shutdown().unwrap();
4590
4591 let timeout = Duration::from_secs(addr_ttl as u64 + 1);
4593 let removed = loop {
4594 match event_receiver.recv_timeout(timeout) {
4595 Ok(HostnameResolutionEvent::AddressesRemoved(removed_host, addresses)) => {
4596 assert!(removed_host == hostname);
4597 assert!(addresses.contains(&service_ip_addr));
4598
4599 println!(
4600 "address removed: hostname: {} addresses: {:?}",
4601 &hostname, &addresses
4602 );
4603 break true;
4604 }
4605 Ok(_event) => {}
4606 Err(_) => {
4607 break false;
4608 }
4609 }
4610 };
4611
4612 assert!(removed);
4613
4614 client.shutdown().unwrap();
4615 }
4616
4617 #[test]
4618 fn test_refresh_ptr() {
4619 let service_type = "_refresh-ptr._udp.local.";
4621 let instance = "test_instance";
4622 let host_name = "refresh_ptr_host.local.";
4623 let service_ip_addr = my_ip_interfaces(false)
4624 .iter()
4625 .find(|iface| iface.ip().is_ipv4())
4626 .map(|iface| iface.ip())
4627 .unwrap();
4628
4629 let mut my_service = ServiceInfo::new(
4630 service_type,
4631 instance,
4632 host_name,
4633 service_ip_addr,
4634 5023,
4635 None,
4636 )
4637 .unwrap();
4638
4639 let new_ttl = 3; my_service._set_other_ttl(new_ttl);
4641
4642 let mdns_server = ServiceDaemon::new().expect("Failed to create mdns server");
4644 let result = mdns_server.register(my_service);
4645 assert!(result.is_ok());
4646
4647 let mdns_client = ServiceDaemon::new().expect("Failed to create mdns client");
4648 let browse_chan = mdns_client.browse(service_type).unwrap();
4649 let timeout = Duration::from_millis(1500); let mut resolved = false;
4651
4652 while let Ok(event) = browse_chan.recv_timeout(timeout) {
4654 if let ServiceEvent::ServiceResolved(info) = event {
4655 resolved = true;
4656 println!("Resolved a service of {}", &info.fullname);
4657 break;
4658 }
4659 }
4660
4661 assert!(resolved);
4662
4663 let timeout = Duration::from_millis(new_ttl as u64 * 1000 * 90 / 100);
4665 while let Ok(event) = browse_chan.recv_timeout(timeout) {
4666 println!("event: {:?}", &event);
4667 }
4668
4669 let metrics_chan = mdns_client.get_metrics().unwrap();
4671 let metrics = metrics_chan.recv_timeout(timeout).unwrap();
4672 let ptr_refresh_counter = metrics["cache-refresh-ptr"];
4673 assert_eq!(ptr_refresh_counter, 1);
4674 let srvtxt_refresh_counter = metrics["cache-refresh-srv-txt"];
4675 assert_eq!(srvtxt_refresh_counter, 1);
4676
4677 mdns_server.shutdown().unwrap();
4679 mdns_client.shutdown().unwrap();
4680 }
4681
4682 #[test]
4683 fn test_name_change() {
4684 assert_eq!(name_change("foo.local."), "foo (2).local.");
4685 assert_eq!(name_change("foo (2).local."), "foo (3).local.");
4686 assert_eq!(name_change("foo (9).local."), "foo (10).local.");
4687 assert_eq!(name_change("foo"), "foo (2)");
4688 assert_eq!(name_change("foo (2)"), "foo (3)");
4689 assert_eq!(name_change(""), " (2)");
4690
4691 assert_eq!(name_change("foo (abc)"), "foo (abc) (2)"); assert_eq!(name_change("foo (2"), "foo (2 (2)"); assert_eq!(name_change("foo (2) extra"), "foo (2) extra (2)"); }
4696
4697 #[test]
4698 fn test_hostname_change() {
4699 assert_eq!(hostname_change("foo.local."), "foo-2.local.");
4700 assert_eq!(hostname_change("foo"), "foo-2");
4701 assert_eq!(hostname_change("foo-2.local."), "foo-3.local.");
4702 assert_eq!(hostname_change("foo-9"), "foo-10");
4703 assert_eq!(hostname_change("test-42.domain."), "test-43.domain.");
4704 }
4705
4706 #[test]
4707 fn test_add_answer_txt_ttl() {
4708 let service_type = "_test_add_answer._udp.local.";
4710 let instance = "test_instance";
4711 let host_name = "add_answer_host.local.";
4712 let service_intf = my_ip_interfaces(false)
4713 .into_iter()
4714 .find(|iface| iface.ip().is_ipv4())
4715 .unwrap();
4716 let service_ip_addr = service_intf.ip();
4717 let my_service = ServiceInfo::new(
4718 service_type,
4719 instance,
4720 host_name,
4721 service_ip_addr,
4722 5023,
4723 None,
4724 )
4725 .unwrap();
4726
4727 let mut out = DnsOutgoing::new(FLAGS_QR_RESPONSE | FLAGS_AA);
4729
4730 let mut dummy_data = out.to_data_on_wire();
4732 let interface_id = InterfaceId::from(&service_intf);
4733 let incoming = DnsIncoming::new(dummy_data.pop().unwrap(), interface_id).unwrap();
4734
4735 let if_addrs = vec![service_intf.ip()];
4737 add_answer_of_service(
4738 &mut out,
4739 &incoming,
4740 instance,
4741 &my_service,
4742 RRType::TXT,
4743 if_addrs,
4744 );
4745
4746 assert!(
4748 out.answers_count() > 0,
4749 "No answers added to the outgoing message"
4750 );
4751
4752 let answer = out._answers().first().unwrap();
4754 assert_eq!(answer.0.get_type(), RRType::TXT);
4755
4756 assert_eq!(answer.0.get_record().get_ttl(), my_service.get_other_ttl());
4758 }
4759
4760 #[test]
4761 fn test_interface_flip() {
4762 let ty_domain = "_intf-flip._udp.local.";
4764 let host_name = "intf_flip.local.";
4765 let now = SystemTime::now()
4766 .duration_since(SystemTime::UNIX_EPOCH)
4767 .unwrap();
4768 let instance_name = now.as_micros().to_string(); let port = 5200;
4770
4771 let (ip_addr1, intf_name) = my_ip_interfaces(false)
4773 .iter()
4774 .find(|iface| iface.ip().is_ipv4())
4775 .map(|iface| (iface.ip(), iface.name.clone()))
4776 .unwrap();
4777
4778 println!("Using interface {} with IP {}", intf_name, ip_addr1);
4779
4780 let service1 = ServiceInfo::new(ty_domain, &instance_name, host_name, ip_addr1, port, None)
4782 .expect("valid service info");
4783 let server1 = ServiceDaemon::new().expect("failed to start server");
4784 server1
4785 .register(service1)
4786 .expect("Failed to register service1");
4787
4788 std::thread::sleep(Duration::from_secs(2));
4790
4791 let client = ServiceDaemon::new().expect("failed to start client");
4793
4794 let receiver = client.browse(ty_domain).unwrap();
4795
4796 let timeout = Duration::from_secs(3);
4797 let mut got_data = false;
4798
4799 while let Ok(event) = receiver.recv_timeout(timeout) {
4800 if let ServiceEvent::ServiceResolved(_) = event {
4801 println!("Received ServiceResolved event");
4802 got_data = true;
4803 break;
4804 }
4805 }
4806
4807 assert!(got_data, "Should receive ServiceResolved event");
4808
4809 client.set_ip_check_interval(1).unwrap();
4811
4812 println!("Shutting down interface {}", &intf_name);
4814 client.test_down_interface(&intf_name).unwrap();
4815
4816 let mut got_removed = false;
4817
4818 while let Ok(event) = receiver.recv_timeout(timeout) {
4819 if let ServiceEvent::ServiceRemoved(ty_domain, instance) = event {
4820 got_removed = true;
4821 println!("removed: {ty_domain} : {instance}");
4822 break;
4823 }
4824 }
4825 assert!(got_removed, "Should receive ServiceRemoved event");
4826
4827 println!("Bringing up interface {}", &intf_name);
4828 client.test_up_interface(&intf_name).unwrap();
4829 let mut got_data = false;
4830 while let Ok(event) = receiver.recv_timeout(timeout) {
4831 if let ServiceEvent::ServiceResolved(resolved) = event {
4832 got_data = true;
4833 println!("Received ServiceResolved: {:?}", resolved);
4834 break;
4835 }
4836 }
4837 assert!(
4838 got_data,
4839 "Should receive ServiceResolved event after interface is back up"
4840 );
4841
4842 server1.shutdown().unwrap();
4843 client.shutdown().unwrap();
4844 }
4845
4846 #[test]
4847 fn test_cache_only() {
4848 let service_type = "_cache_only._udp.local.";
4850 let instance = "test_instance";
4851 let host_name = "cache_only_host.local.";
4852 let service_ip_addr = my_ip_interfaces(false)
4853 .iter()
4854 .find(|iface| iface.ip().is_ipv4())
4855 .map(|iface| iface.ip())
4856 .unwrap();
4857
4858 let mut my_service = ServiceInfo::new(
4859 service_type,
4860 instance,
4861 host_name,
4862 service_ip_addr,
4863 5023,
4864 None,
4865 )
4866 .unwrap();
4867
4868 let new_ttl = 3; my_service._set_other_ttl(new_ttl);
4870
4871 let mdns_client = ServiceDaemon::new().expect("Failed to create mdns client");
4872
4873 let browse_chan = mdns_client.browse_cache(service_type).unwrap();
4876 std::thread::sleep(Duration::from_secs(2));
4877
4878 let mdns_server = ServiceDaemon::new().expect("Failed to create mdns server");
4880 let result = mdns_server.register(my_service);
4881 assert!(result.is_ok());
4882
4883 let timeout = Duration::from_millis(1500); let mut resolved = false;
4885
4886 while let Ok(event) = browse_chan.recv_timeout(timeout) {
4888 if let ServiceEvent::ServiceResolved(info) = event {
4889 resolved = true;
4890 println!("Resolved a service of {}", &info.get_fullname());
4891 break;
4892 }
4893 }
4894
4895 assert!(resolved);
4896
4897 mdns_server.shutdown().unwrap();
4899 mdns_client.shutdown().unwrap();
4900 }
4901
4902 #[test]
4903 fn test_cache_only_unsolicited() {
4904 let service_type = "_cache_only._udp.local.";
4906 let instance = "test_instance";
4907 let host_name = "cache_only_host.local.";
4908 let service_ip_addr = my_ip_interfaces(false)
4909 .iter()
4910 .find(|iface| iface.ip().is_ipv4())
4911 .map(|iface| iface.ip())
4912 .unwrap();
4913
4914 let mut my_service = ServiceInfo::new(
4915 service_type,
4916 instance,
4917 host_name,
4918 service_ip_addr,
4919 5023,
4920 None,
4921 )
4922 .unwrap();
4923
4924 let new_ttl = 3; my_service._set_other_ttl(new_ttl);
4926
4927 let mdns_server = ServiceDaemon::new().expect("Failed to create mdns server");
4929 let result = mdns_server.register(my_service);
4930 assert!(result.is_ok());
4931
4932 let mdns_client = ServiceDaemon::new().expect("Failed to create mdns client");
4933 mdns_client.accept_unsolicited(true).unwrap();
4934
4935 std::thread::sleep(Duration::from_secs(2));
4938 let browse_chan = mdns_client.browse_cache(service_type).unwrap();
4939 let timeout = Duration::from_millis(1500); let mut resolved = false;
4941
4942 while let Ok(event) = browse_chan.recv_timeout(timeout) {
4944 if let ServiceEvent::ServiceResolved(info) = event {
4945 resolved = true;
4946 println!("Resolved a service of {}", &info.get_fullname());
4947 break;
4948 }
4949 }
4950
4951 assert!(resolved);
4952
4953 mdns_server.shutdown().unwrap();
4955 mdns_client.shutdown().unwrap();
4956 }
4957}