1#[cfg(feature = "logging")]
32use crate::log::{debug, trace};
33use crate::{
34 dns_cache::{current_time_millis, DnsCache},
35 dns_parser::{
36 ip_address_rr_type, DnsAddress, DnsEntryExt, DnsIncoming, DnsOutgoing, DnsPointer,
37 DnsRecordBox, DnsRecordExt, DnsSrv, DnsTxt, InterfaceId, RRType, ScopedIp,
38 CLASS_CACHE_FLUSH, CLASS_IN, FLAGS_AA, FLAGS_QR_QUERY, FLAGS_QR_RESPONSE, MAX_MSG_ABSOLUTE,
39 },
40 error::{e_fmt, Error, Result},
41 service_info::{DnsRegistry, MyIntf, Probe, ServiceInfo, ServiceStatus},
42 Receiver, ResolvedService, TxtProperties,
43};
44use flume::{bounded, Sender, TrySendError};
45use if_addrs::{IfAddr, Interface};
46use mio::{event::Source, net::UdpSocket as MioUdpSocket, Interest, Poll, Registry, Token};
47use socket2::Domain;
48use socket_pktinfo::PktInfoUdpSocket;
49use std::{
50 cmp::{self, Reverse},
51 collections::{hash_map::Entry, BinaryHeap, HashMap, HashSet},
52 fmt, io,
53 net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6, UdpSocket},
54 str, thread,
55 time::Duration,
56 vec,
57};
58
59pub const SERVICE_NAME_LEN_MAX_DEFAULT: u8 = 15;
63
64pub const IP_CHECK_INTERVAL_IN_SECS_DEFAULT: u32 = 5;
66
67pub const VERIFY_TIMEOUT_DEFAULT: Duration = Duration::from_secs(10);
70
71const MDNS_PORT: u16 = 5353;
72const GROUP_ADDR_V4: Ipv4Addr = Ipv4Addr::new(224, 0, 0, 251);
73const GROUP_ADDR_V6: Ipv6Addr = Ipv6Addr::new(0xff02, 0, 0, 0, 0, 0, 0, 0xfb);
74const LOOPBACK_V4: Ipv4Addr = Ipv4Addr::new(127, 0, 0, 1);
75
76const RESOLVE_WAIT_IN_MILLIS: u64 = 500;
77
78#[derive(Debug)]
80pub enum UnregisterStatus {
81 OK,
83 NotFound,
85}
86
87#[derive(Debug, PartialEq, Clone, Eq)]
89#[non_exhaustive]
90pub enum DaemonStatus {
91 Running,
93
94 Shutdown,
96}
97
98#[derive(Hash, Eq, PartialEq)]
101enum Counter {
102 Register,
103 RegisterResend,
104 Unregister,
105 UnregisterResend,
106 Browse,
107 ResolveHostname,
108 Respond,
109 CacheRefreshPTR,
110 CacheRefreshSrvTxt,
111 CacheRefreshAddr,
112 KnownAnswerSuppression,
113 CachedPTR,
114 CachedSRV,
115 CachedAddr,
116 CachedTxt,
117 CachedNSec,
118 CachedSubtype,
119 DnsRegistryProbe,
120 DnsRegistryActive,
121 DnsRegistryTimer,
122 DnsRegistryNameChange,
123 Timer,
124}
125
126impl fmt::Display for Counter {
127 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
128 match self {
129 Self::Register => write!(f, "register"),
130 Self::RegisterResend => write!(f, "register-resend"),
131 Self::Unregister => write!(f, "unregister"),
132 Self::UnregisterResend => write!(f, "unregister-resend"),
133 Self::Browse => write!(f, "browse"),
134 Self::ResolveHostname => write!(f, "resolve-hostname"),
135 Self::Respond => write!(f, "respond"),
136 Self::CacheRefreshPTR => write!(f, "cache-refresh-ptr"),
137 Self::CacheRefreshSrvTxt => write!(f, "cache-refresh-srv-txt"),
138 Self::CacheRefreshAddr => write!(f, "cache-refresh-addr"),
139 Self::KnownAnswerSuppression => write!(f, "known-answer-suppression"),
140 Self::CachedPTR => write!(f, "cached-ptr"),
141 Self::CachedSRV => write!(f, "cached-srv"),
142 Self::CachedAddr => write!(f, "cached-addr"),
143 Self::CachedTxt => write!(f, "cached-txt"),
144 Self::CachedNSec => write!(f, "cached-nsec"),
145 Self::CachedSubtype => write!(f, "cached-subtype"),
146 Self::DnsRegistryProbe => write!(f, "dns-registry-probe"),
147 Self::DnsRegistryActive => write!(f, "dns-registry-active"),
148 Self::DnsRegistryTimer => write!(f, "dns-registry-timer"),
149 Self::DnsRegistryNameChange => write!(f, "dns-registry-name-change"),
150 Self::Timer => write!(f, "timer"),
151 }
152 }
153}
154
155struct MyUdpSocket {
160 pktinfo: PktInfoUdpSocket,
163
164 mio: MioUdpSocket,
167}
168
169impl MyUdpSocket {
170 pub fn new(pktinfo: PktInfoUdpSocket) -> io::Result<Self> {
171 let std_sock = pktinfo.try_clone_std()?;
172 let mio = MioUdpSocket::from_std(std_sock);
173
174 Ok(Self { pktinfo, mio })
175 }
176}
177
178impl Source for MyUdpSocket {
180 fn register(
181 &mut self,
182 registry: &Registry,
183 token: Token,
184 interests: Interest,
185 ) -> io::Result<()> {
186 self.mio.register(registry, token, interests)
187 }
188
189 fn reregister(
190 &mut self,
191 registry: &Registry,
192 token: Token,
193 interests: Interest,
194 ) -> io::Result<()> {
195 self.mio.reregister(registry, token, interests)
196 }
197
198 fn deregister(&mut self, registry: &Registry) -> std::io::Result<()> {
199 self.mio.deregister(registry)
200 }
201}
202
203pub type Metrics = HashMap<String, i64>;
206
207const IPV4_SOCK_EVENT_KEY: usize = 4; const IPV6_SOCK_EVENT_KEY: usize = 6; const SIGNAL_SOCK_EVENT_KEY: usize = usize::MAX - 1; #[derive(Clone)]
215pub struct ServiceDaemon {
216 sender: Sender<Command>,
218
219 signal_addr: SocketAddr,
225}
226
227impl ServiceDaemon {
228 pub fn new() -> Result<Self> {
233 let signal_addr = SocketAddrV4::new(LOOPBACK_V4, 0);
236
237 let signal_sock = UdpSocket::bind(signal_addr)
238 .map_err(|e| e_fmt!("failed to create signal_sock for daemon: {}", e))?;
239
240 let signal_addr = signal_sock
242 .local_addr()
243 .map_err(|e| e_fmt!("failed to get signal sock addr: {}", e))?;
244
245 signal_sock
247 .set_nonblocking(true)
248 .map_err(|e| e_fmt!("failed to set nonblocking for signal socket: {}", e))?;
249
250 let poller = Poll::new().map_err(|e| e_fmt!("failed to create mio Poll: {e}"))?;
251
252 let (sender, receiver) = bounded(100);
253
254 let mio_sock = MioUdpSocket::from_std(signal_sock);
256 thread::Builder::new()
257 .name("mDNS_daemon".to_string())
258 .spawn(move || Self::daemon_thread(mio_sock, poller, receiver))
259 .map_err(|e| e_fmt!("thread builder failed to spawn: {}", e))?;
260
261 Ok(Self {
262 sender,
263 signal_addr,
264 })
265 }
266
267 fn send_cmd(&self, cmd: Command) -> Result<()> {
270 let cmd_name = cmd.to_string();
271
272 self.sender.try_send(cmd).map_err(|e| match e {
274 TrySendError::Full(_) => Error::Again,
275 e => e_fmt!("flume::channel::send failed: {}", e),
276 })?;
277
278 let addr = SocketAddrV4::new(LOOPBACK_V4, 0);
280 let socket = UdpSocket::bind(addr)
281 .map_err(|e| e_fmt!("Failed to create socket to send signal: {}", e))?;
282 socket
283 .send_to(cmd_name.as_bytes(), self.signal_addr)
284 .map_err(|e| {
285 e_fmt!(
286 "signal socket send_to {} ({}) failed: {}",
287 self.signal_addr,
288 cmd_name,
289 e
290 )
291 })?;
292
293 Ok(())
294 }
295
296 pub fn browse(&self, service_type: &str) -> Result<Receiver<ServiceEvent>> {
307 check_domain_suffix(service_type)?;
308
309 let (resp_s, resp_r) = bounded(10);
310 self.send_cmd(Command::Browse(service_type.to_string(), 1, resp_s))?;
311 Ok(resp_r)
312 }
313
314 pub fn stop_browse(&self, ty_domain: &str) -> Result<()> {
319 self.send_cmd(Command::StopBrowse(ty_domain.to_string()))
320 }
321
322 pub fn resolve_hostname(
330 &self,
331 hostname: &str,
332 timeout: Option<u64>,
333 ) -> Result<Receiver<HostnameResolutionEvent>> {
334 check_hostname(hostname)?;
335 let (resp_s, resp_r) = bounded(10);
336 self.send_cmd(Command::ResolveHostname(
337 hostname.to_string(),
338 1,
339 resp_s,
340 timeout,
341 ))?;
342 Ok(resp_r)
343 }
344
345 pub fn stop_resolve_hostname(&self, hostname: &str) -> Result<()> {
350 self.send_cmd(Command::StopResolveHostname(hostname.to_string()))
351 }
352
353 pub fn register(&self, service_info: ServiceInfo) -> Result<()> {
361 check_service_name(service_info.get_fullname())?;
362 check_hostname(service_info.get_hostname())?;
363
364 self.send_cmd(Command::Register(service_info))
365 }
366
367 pub fn unregister(&self, fullname: &str) -> Result<Receiver<UnregisterStatus>> {
375 let (resp_s, resp_r) = bounded(1);
376 self.send_cmd(Command::Unregister(fullname.to_lowercase(), resp_s))?;
377 Ok(resp_r)
378 }
379
380 pub fn monitor(&self) -> Result<Receiver<DaemonEvent>> {
384 let (resp_s, resp_r) = bounded(100);
385 self.send_cmd(Command::Monitor(resp_s))?;
386 Ok(resp_r)
387 }
388
389 pub fn shutdown(&self) -> Result<Receiver<DaemonStatus>> {
394 let (resp_s, resp_r) = bounded(1);
395 self.send_cmd(Command::Exit(resp_s))?;
396 Ok(resp_r)
397 }
398
399 pub fn status(&self) -> Result<Receiver<DaemonStatus>> {
405 let (resp_s, resp_r) = bounded(1);
406
407 if self.sender.is_disconnected() {
408 resp_s
409 .send(DaemonStatus::Shutdown)
410 .map_err(|e| e_fmt!("failed to send daemon status to the client: {}", e))?;
411 } else {
412 self.send_cmd(Command::GetStatus(resp_s))?;
413 }
414
415 Ok(resp_r)
416 }
417
418 pub fn get_metrics(&self) -> Result<Receiver<Metrics>> {
423 let (resp_s, resp_r) = bounded(1);
424 self.send_cmd(Command::GetMetrics(resp_s))?;
425 Ok(resp_r)
426 }
427
428 pub fn set_service_name_len_max(&self, len_max: u8) -> Result<()> {
435 const SERVICE_NAME_LEN_MAX_LIMIT: u8 = 30; if len_max > SERVICE_NAME_LEN_MAX_LIMIT {
438 return Err(Error::Msg(format!(
439 "service name length max {len_max} is too large"
440 )));
441 }
442
443 self.send_cmd(Command::SetOption(DaemonOption::ServiceNameLenMax(len_max)))
444 }
445
446 pub fn set_ip_check_interval(&self, interval_in_secs: u32) -> Result<()> {
452 let interval_in_millis = interval_in_secs as u64 * 1000;
453 self.send_cmd(Command::SetOption(DaemonOption::IpCheckInterval(
454 interval_in_millis,
455 )))
456 }
457
458 pub fn get_ip_check_interval(&self) -> Result<u32> {
460 let (resp_s, resp_r) = bounded(1);
461 self.send_cmd(Command::GetOption(resp_s))?;
462
463 let option = resp_r
464 .recv_timeout(Duration::from_secs(10))
465 .map_err(|e| e_fmt!("failed to receive ip check interval: {}", e))?;
466 let ip_check_interval_in_secs = option.ip_check_interval / 1000;
467 Ok(ip_check_interval_in_secs as u32)
468 }
469
470 pub fn enable_interface(&self, if_kind: impl IntoIfKindVec) -> Result<()> {
477 let if_kind_vec = if_kind.into_vec();
478 self.send_cmd(Command::SetOption(DaemonOption::EnableInterface(
479 if_kind_vec.kinds,
480 )))
481 }
482
483 pub fn disable_interface(&self, if_kind: impl IntoIfKindVec) -> Result<()> {
490 let if_kind_vec = if_kind.into_vec();
491 self.send_cmd(Command::SetOption(DaemonOption::DisableInterface(
492 if_kind_vec.kinds,
493 )))
494 }
495
496 #[cfg(test)]
497 pub fn test_down_interface(&self, ifname: &str) -> Result<()> {
498 self.send_cmd(Command::SetOption(DaemonOption::TestDownInterface(
499 ifname.to_string(),
500 )))
501 }
502
503 #[cfg(test)]
504 pub fn test_up_interface(&self, ifname: &str) -> Result<()> {
505 self.send_cmd(Command::SetOption(DaemonOption::TestUpInterface(
506 ifname.to_string(),
507 )))
508 }
509
510 pub fn set_multicast_loop_v4(&self, on: bool) -> Result<()> {
526 self.send_cmd(Command::SetOption(DaemonOption::MulticastLoopV4(on)))
527 }
528
529 pub fn set_multicast_loop_v6(&self, on: bool) -> Result<()> {
545 self.send_cmd(Command::SetOption(DaemonOption::MulticastLoopV6(on)))
546 }
547
548 pub fn verify(&self, instance_fullname: String, timeout: Duration) -> Result<()> {
561 self.send_cmd(Command::Verify(instance_fullname, timeout))
562 }
563
564 fn daemon_thread(signal_sock: MioUdpSocket, poller: Poll, receiver: Receiver<Command>) {
565 let mut zc = Zeroconf::new(signal_sock, poller);
566
567 if let Some(cmd) = zc.run(receiver) {
568 match cmd {
569 Command::Exit(resp_s) => {
570 if let Err(e) = resp_s.send(DaemonStatus::Shutdown) {
573 debug!("exit: failed to send response of shutdown: {}", e);
574 }
575 }
576 _ => {
577 debug!("Unexpected command: {:?}", cmd);
578 }
579 }
580 }
581 }
582}
583
584fn _new_socket_bind(intf: &Interface, should_loop: bool) -> Result<MyUdpSocket> {
586 let intf_ip = &intf.ip();
589 match intf_ip {
590 IpAddr::V4(ip) => {
591 let addr = SocketAddrV4::new(Ipv4Addr::new(0, 0, 0, 0), MDNS_PORT);
592 let sock = new_socket(addr.into(), true)?;
593
594 sock.join_multicast_v4(&GROUP_ADDR_V4, ip)
596 .map_err(|e| e_fmt!("join multicast group on addr {}: {}", intf_ip, e))?;
597
598 sock.set_multicast_if_v4(ip)
600 .map_err(|e| e_fmt!("set multicast_if on addr {}: {}", ip, e))?;
601
602 sock.set_multicast_ttl_v4(255)
607 .map_err(|e| e_fmt!("set set_multicast_ttl_v4 on addr {}: {}", ip, e))?;
608
609 if !should_loop {
610 sock.set_multicast_loop_v4(false)
611 .map_err(|e| e_fmt!("failed to set multicast loop v4 for {ip}: {e}"))?;
612 }
613
614 let multicast_addr = SocketAddrV4::new(GROUP_ADDR_V4, MDNS_PORT).into();
616 let test_packets = DnsOutgoing::new(0).to_data_on_wire();
617 for packet in test_packets {
618 sock.send_to(&packet, &multicast_addr)
619 .map_err(|e| e_fmt!("send multicast packet on addr {}: {}", ip, e))?;
620 }
621 MyUdpSocket::new(sock)
622 .map_err(|e| e_fmt!("failed to create MySocket for interface {}: {e}", intf.name))
623 }
624 IpAddr::V6(ip) => {
625 let addr = SocketAddrV6::new(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 0), MDNS_PORT, 0, 0);
626 let sock = new_socket(addr.into(), true)?;
627
628 let if_index = intf.index.unwrap_or(0);
629
630 sock.join_multicast_v6(&GROUP_ADDR_V6, if_index)
632 .map_err(|e| e_fmt!("join multicast group on addr {}: {}", ip, e))?;
633
634 sock.set_multicast_if_v6(if_index)
636 .map_err(|e| e_fmt!("set multicast_if on addr {}: {}", ip, e))?;
637
638 MyUdpSocket::new(sock)
643 .map_err(|e| e_fmt!("failed to create MySocket for interface {}: {e}", intf.name))
644 }
645 }
646}
647
648fn new_socket(addr: SocketAddr, non_block: bool) -> Result<PktInfoUdpSocket> {
651 let domain = match addr {
652 SocketAddr::V4(_) => socket2::Domain::IPV4,
653 SocketAddr::V6(_) => socket2::Domain::IPV6,
654 };
655
656 let fd = PktInfoUdpSocket::new(domain).map_err(|e| e_fmt!("create socket failed: {}", e))?;
657
658 fd.set_reuse_address(true)
659 .map_err(|e| e_fmt!("set ReuseAddr failed: {}", e))?;
660 #[cfg(unix)] fd.set_reuse_port(true)
662 .map_err(|e| e_fmt!("set ReusePort failed: {}", e))?;
663
664 if non_block {
665 fd.set_nonblocking(true)
666 .map_err(|e| e_fmt!("set O_NONBLOCK: {}", e))?;
667 }
668
669 fd.bind(&addr.into())
670 .map_err(|e| e_fmt!("socket bind to {} failed: {}", &addr, e))?;
671
672 trace!("new socket bind to {}", &addr);
673 Ok(fd)
674}
675
676struct ReRun {
678 next_time: u64,
680 command: Command,
681}
682
683#[derive(Debug, Clone)]
687#[non_exhaustive]
688pub enum IfKind {
689 All,
691
692 IPv4,
694
695 IPv6,
697
698 Name(String),
700
701 Addr(IpAddr),
703
704 LoopbackV4,
709
710 LoopbackV6,
712}
713
714impl IfKind {
715 fn matches(&self, intf: &Interface) -> bool {
717 match self {
718 Self::All => true,
719 Self::IPv4 => intf.ip().is_ipv4(),
720 Self::IPv6 => intf.ip().is_ipv6(),
721 Self::Name(ifname) => ifname == &intf.name,
722 Self::Addr(addr) => addr == &intf.ip(),
723 Self::LoopbackV4 => intf.is_loopback() && intf.ip().is_ipv4(),
724 Self::LoopbackV6 => intf.is_loopback() && intf.ip().is_ipv6(),
725 }
726 }
727}
728
729impl From<&str> for IfKind {
732 fn from(val: &str) -> Self {
733 Self::Name(val.to_string())
734 }
735}
736
737impl From<&String> for IfKind {
738 fn from(val: &String) -> Self {
739 Self::Name(val.to_string())
740 }
741}
742
743impl From<IpAddr> for IfKind {
745 fn from(val: IpAddr) -> Self {
746 Self::Addr(val)
747 }
748}
749
750pub struct IfKindVec {
752 kinds: Vec<IfKind>,
753}
754
755pub trait IntoIfKindVec {
757 fn into_vec(self) -> IfKindVec;
758}
759
760impl<T: Into<IfKind>> IntoIfKindVec for T {
761 fn into_vec(self) -> IfKindVec {
762 let if_kind: IfKind = self.into();
763 IfKindVec {
764 kinds: vec![if_kind],
765 }
766 }
767}
768
769impl<T: Into<IfKind>> IntoIfKindVec for Vec<T> {
770 fn into_vec(self) -> IfKindVec {
771 let kinds: Vec<IfKind> = self.into_iter().map(|x| x.into()).collect();
772 IfKindVec { kinds }
773 }
774}
775
776struct IfSelection {
778 if_kind: IfKind,
780
781 selected: bool,
783}
784
785struct Zeroconf {
787 my_intfs: HashMap<u32, MyIntf>,
789
790 ipv4_sock: MyUdpSocket,
792
793 ipv6_sock: MyUdpSocket,
795
796 my_services: HashMap<String, ServiceInfo>,
798
799 cache: DnsCache,
801
802 dns_registry_map: HashMap<u32, DnsRegistry>,
804
805 service_queriers: HashMap<String, Sender<ServiceEvent>>, hostname_resolvers: HashMap<String, (Sender<HostnameResolutionEvent>, Option<u64>)>, retransmissions: Vec<ReRun>,
816
817 counters: Metrics,
818
819 poller: Poll,
821
822 monitors: Vec<Sender<DaemonEvent>>,
824
825 service_name_len_max: u8,
827
828 ip_check_interval: u64,
830
831 if_selections: Vec<IfSelection>,
833
834 signal_sock: MioUdpSocket,
836
837 timers: BinaryHeap<Reverse<u64>>,
843
844 status: DaemonStatus,
845
846 pending_resolves: HashSet<String>,
848
849 resolved: HashSet<String>,
851
852 multicast_loop_v4: bool,
853
854 multicast_loop_v6: bool,
855
856 #[cfg(test)]
857 test_down_interfaces: HashSet<String>,
858}
859
860fn join_multicast_group(my_sock: &PktInfoUdpSocket, intf: &Interface) -> Result<()> {
862 let intf_ip = &intf.ip();
863 match intf_ip {
864 IpAddr::V4(ip) => {
865 debug!("join multicast group V4 on addr {}", ip);
867 my_sock
868 .join_multicast_v4(&GROUP_ADDR_V4, ip)
869 .map_err(|e| e_fmt!("PKT join multicast group on addr {}: {}", intf_ip, e))?;
870 }
871 IpAddr::V6(ip) => {
872 let if_index = intf.index.unwrap_or(0);
873 debug!(
875 "join multicast group V6 on addr {} with index {}",
876 ip, if_index
877 );
878 my_sock
879 .join_multicast_v6(&GROUP_ADDR_V6, if_index)
880 .map_err(|e| e_fmt!("PKT join multicast group on addr {}: {}", ip, e))?;
881 }
882 }
883 Ok(())
884}
885
886impl Zeroconf {
887 fn new(signal_sock: MioUdpSocket, poller: Poll) -> Self {
888 let my_ifaddrs = my_ip_interfaces(false);
890
891 let mut my_intfs = HashMap::new();
895 let mut dns_registry_map = HashMap::new();
896
897 let addr = SocketAddrV4::new(Ipv4Addr::new(0, 0, 0, 0), MDNS_PORT);
900 let sock = new_socket(addr.into(), true).unwrap();
901
902 sock.set_multicast_ttl_v4(255)
907 .map_err(|e| e_fmt!("set set_multicast_ttl_v4 on addr: {}", e))
908 .unwrap();
909
910 let ipv4_sock = MyUdpSocket::new(sock).unwrap();
912
913 let addr = SocketAddrV6::new(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 0), MDNS_PORT, 0, 0);
914 let sock = new_socket(addr.into(), true).unwrap();
915
916 sock.set_multicast_hops_v6(255)
920 .map_err(|e| e_fmt!("set set_multicast_hops_v6: {}", e))
921 .unwrap();
922
923 let ipv6_sock = MyUdpSocket::new(sock).unwrap();
925
926 for intf in my_ifaddrs {
928 let sock = if intf.ip().is_ipv4() {
929 &ipv4_sock
930 } else {
931 &ipv6_sock
932 };
933
934 if let Err(e) = join_multicast_group(&sock.pktinfo, &intf) {
935 debug!(
936 "config socket to join multicast: {}: {e}. Skipped.",
937 &intf.ip()
938 );
939 }
940
941 let if_index = intf.index.unwrap_or(0);
942
943 dns_registry_map
945 .entry(if_index)
946 .or_insert_with(DnsRegistry::new);
947
948 my_intfs
949 .entry(if_index)
950 .and_modify(|v: &mut MyIntf| {
951 v.addrs.insert(intf.addr.clone());
952 })
953 .or_insert(MyIntf {
954 name: intf.name.clone(),
955 index: if_index,
956 addrs: HashSet::from([intf.addr]),
957 });
958 }
959
960 let monitors = Vec::new();
961 let service_name_len_max = SERVICE_NAME_LEN_MAX_DEFAULT;
962 let ip_check_interval = IP_CHECK_INTERVAL_IN_SECS_DEFAULT as u64 * 1000;
963
964 let timers = BinaryHeap::new();
965
966 let if_selections = vec![
968 IfSelection {
969 if_kind: IfKind::LoopbackV4,
970 selected: false,
971 },
972 IfSelection {
973 if_kind: IfKind::LoopbackV6,
974 selected: false,
975 },
976 ];
977
978 let status = DaemonStatus::Running;
979
980 Self {
981 my_intfs,
982 ipv4_sock,
983 ipv6_sock,
984 my_services: HashMap::new(),
986 cache: DnsCache::new(),
987 dns_registry_map,
988 hostname_resolvers: HashMap::new(),
989 service_queriers: HashMap::new(),
990 retransmissions: Vec::new(),
991 counters: HashMap::new(),
992 poller,
993 monitors,
994 service_name_len_max,
995 ip_check_interval,
996 if_selections,
997 signal_sock,
998 timers,
999 status,
1000 pending_resolves: HashSet::new(),
1001 resolved: HashSet::new(),
1002 multicast_loop_v4: true,
1003 multicast_loop_v6: true,
1004
1005 #[cfg(test)]
1006 test_down_interfaces: HashSet::new(),
1007 }
1008 }
1009
1010 fn run(&mut self, receiver: Receiver<Command>) -> Option<Command> {
1019 if let Err(e) = self.poller.registry().register(
1021 &mut self.signal_sock,
1022 mio::Token(SIGNAL_SOCK_EVENT_KEY),
1023 mio::Interest::READABLE,
1024 ) {
1025 debug!("failed to add signal socket to the poller: {}", e);
1026 return None;
1027 }
1028
1029 if let Err(e) = self.poller.registry().register(
1030 &mut self.ipv4_sock,
1031 mio::Token(IPV4_SOCK_EVENT_KEY),
1032 mio::Interest::READABLE,
1033 ) {
1034 debug!("failed to register ipv4 socket: {}", e);
1035 return None;
1036 }
1037
1038 if let Err(e) = self.poller.registry().register(
1039 &mut self.ipv6_sock,
1040 mio::Token(IPV6_SOCK_EVENT_KEY),
1041 mio::Interest::READABLE,
1042 ) {
1043 debug!("failed to register ipv6 socket: {}", e);
1044 return None;
1045 }
1046
1047 let mut next_ip_check = if self.ip_check_interval > 0 {
1049 current_time_millis() + self.ip_check_interval
1050 } else {
1051 0
1052 };
1053
1054 if next_ip_check > 0 {
1055 self.add_timer(next_ip_check);
1056 }
1057
1058 let mut events = mio::Events::with_capacity(1024);
1061 loop {
1062 let now = current_time_millis();
1063
1064 let earliest_timer = self.peek_earliest_timer();
1065 let timeout = earliest_timer.map(|timer| {
1066 let millis = if timer > now { timer - now } else { 1 };
1068 Duration::from_millis(millis)
1069 });
1070
1071 events.clear();
1073 match self.poller.poll(&mut events, timeout) {
1074 Ok(_) => self.handle_poller_events(&events),
1075 Err(e) => debug!("failed to select from sockets: {}", e),
1076 }
1077
1078 let now = current_time_millis();
1079
1080 self.pop_timers_till(now);
1082
1083 for hostname in self
1085 .hostname_resolvers
1086 .clone()
1087 .into_iter()
1088 .filter(|(_, (_, timeout))| timeout.map(|t| now >= t).unwrap_or(false))
1089 .map(|(hostname, _)| hostname)
1090 {
1091 trace!("hostname resolver timeout for {}", &hostname);
1092 call_hostname_resolution_listener(
1093 &self.hostname_resolvers,
1094 &hostname,
1095 HostnameResolutionEvent::SearchTimeout(hostname.to_owned()),
1096 );
1097 call_hostname_resolution_listener(
1098 &self.hostname_resolvers,
1099 &hostname,
1100 HostnameResolutionEvent::SearchStopped(hostname.to_owned()),
1101 );
1102 self.hostname_resolvers.remove(&hostname);
1103 }
1104
1105 while let Ok(command) = receiver.try_recv() {
1107 if matches!(command, Command::Exit(_)) {
1108 self.status = DaemonStatus::Shutdown;
1109 return Some(command);
1110 }
1111 self.exec_command(command, false);
1112 }
1113
1114 let mut i = 0;
1116 while i < self.retransmissions.len() {
1117 if now >= self.retransmissions[i].next_time {
1118 let rerun = self.retransmissions.remove(i);
1119 self.exec_command(rerun.command, true);
1120 } else {
1121 i += 1;
1122 }
1123 }
1124
1125 self.refresh_active_services();
1127
1128 let mut query_count = 0;
1130 for (hostname, _sender) in self.hostname_resolvers.iter() {
1131 for (hostname, ip_addr) in
1132 self.cache.refresh_due_hostname_resolutions(hostname).iter()
1133 {
1134 self.send_query(hostname, ip_address_rr_type(&ip_addr.to_ip_addr()));
1135 query_count += 1;
1136 }
1137 }
1138
1139 self.increase_counter(Counter::CacheRefreshAddr, query_count);
1140
1141 let now = current_time_millis();
1143
1144 let expired_services = self.cache.evict_expired_services(now);
1146 if !expired_services.is_empty() {
1147 debug!(
1148 "run: send {} service removal to listeners",
1149 expired_services.len()
1150 );
1151 self.notify_service_removal(expired_services);
1152 }
1153
1154 let expired_addrs = self.cache.evict_expired_addr(now);
1156 for (hostname, addrs) in expired_addrs {
1157 call_hostname_resolution_listener(
1158 &self.hostname_resolvers,
1159 &hostname,
1160 HostnameResolutionEvent::AddressesRemoved(hostname.clone(), addrs),
1161 );
1162 let instances = self.cache.get_instances_on_host(&hostname);
1163 let instance_set: HashSet<String> = instances.into_iter().collect();
1164 self.resolve_updated_instances(&instance_set);
1165 }
1166
1167 self.probing_handler();
1169
1170 if now >= next_ip_check && next_ip_check > 0 {
1172 next_ip_check = now + self.ip_check_interval;
1173 self.add_timer(next_ip_check);
1174
1175 self.check_ip_changes();
1176 }
1177 }
1178 }
1179
1180 fn process_set_option(&mut self, daemon_opt: DaemonOption) {
1181 match daemon_opt {
1182 DaemonOption::ServiceNameLenMax(length) => self.service_name_len_max = length,
1183 DaemonOption::IpCheckInterval(interval) => self.ip_check_interval = interval,
1184 DaemonOption::EnableInterface(if_kind) => self.enable_interface(if_kind),
1185 DaemonOption::DisableInterface(if_kind) => self.disable_interface(if_kind),
1186 DaemonOption::MulticastLoopV4(on) => self.set_multicast_loop_v4(on),
1187 DaemonOption::MulticastLoopV6(on) => self.set_multicast_loop_v6(on),
1188 #[cfg(test)]
1189 DaemonOption::TestDownInterface(ifname) => {
1190 self.test_down_interfaces.insert(ifname);
1191 }
1192 #[cfg(test)]
1193 DaemonOption::TestUpInterface(ifname) => {
1194 self.test_down_interfaces.remove(&ifname);
1195 }
1196 }
1197 }
1198
1199 fn enable_interface(&mut self, kinds: Vec<IfKind>) {
1200 debug!("enable_interface: {:?}", kinds);
1201 for if_kind in kinds {
1202 self.if_selections.push(IfSelection {
1203 if_kind,
1204 selected: true,
1205 });
1206 }
1207
1208 self.apply_intf_selections(my_ip_interfaces(true));
1209 }
1210
1211 fn disable_interface(&mut self, kinds: Vec<IfKind>) {
1212 debug!("disable_interface: {:?}", kinds);
1213 for if_kind in kinds {
1214 self.if_selections.push(IfSelection {
1215 if_kind,
1216 selected: false,
1217 });
1218 }
1219
1220 self.apply_intf_selections(my_ip_interfaces(true));
1221 }
1222
1223 fn set_multicast_loop_v4(&mut self, on: bool) {
1224 self.multicast_loop_v4 = on;
1225 self.ipv4_sock
1226 .pktinfo
1227 .set_multicast_loop_v4(on)
1228 .map_err(|e| e_fmt!("failed to set multicast loop v4: {}", e))
1229 .unwrap();
1230 }
1231
1232 fn set_multicast_loop_v6(&mut self, on: bool) {
1233 self.multicast_loop_v6 = on;
1234 self.ipv6_sock
1235 .pktinfo
1236 .set_multicast_loop_v6(on)
1237 .map_err(|e| e_fmt!("failed to set multicast loop v6: {}", e))
1238 .unwrap();
1239 }
1240
1241 fn notify_monitors(&mut self, event: DaemonEvent) {
1242 self.monitors.retain(|sender| {
1244 if let Err(e) = sender.try_send(event.clone()) {
1245 debug!("notify_monitors: try_send: {}", &e);
1246 if matches!(e, TrySendError::Disconnected(_)) {
1247 return false; }
1249 }
1250 true
1251 });
1252 }
1253
1254 fn del_addr_in_my_services(&mut self, addr: &IpAddr) {
1256 for (_, service_info) in self.my_services.iter_mut() {
1257 if service_info.is_addr_auto() {
1258 service_info.remove_ipaddr(addr);
1259 }
1260 }
1261 }
1262
1263 fn add_timer(&mut self, next_time: u64) {
1264 self.timers.push(Reverse(next_time));
1265 }
1266
1267 fn peek_earliest_timer(&self) -> Option<u64> {
1268 self.timers.peek().map(|Reverse(v)| *v)
1269 }
1270
1271 fn _pop_earliest_timer(&mut self) -> Option<u64> {
1272 self.timers.pop().map(|Reverse(v)| v)
1273 }
1274
1275 fn pop_timers_till(&mut self, now: u64) {
1277 while let Some(Reverse(v)) = self.timers.peek() {
1278 if *v > now {
1279 break;
1280 }
1281 self.timers.pop();
1282 }
1283 }
1284
1285 fn selected_addrs(&self, interfaces: Vec<Interface>) -> HashSet<IpAddr> {
1287 let intf_count = interfaces.len();
1288 let mut intf_selections = vec![true; intf_count];
1289
1290 for selection in self.if_selections.iter() {
1292 for i in 0..intf_count {
1294 if selection.if_kind.matches(&interfaces[i]) {
1295 intf_selections[i] = selection.selected;
1296 }
1297 }
1298 }
1299
1300 let mut selected_addrs = HashSet::new();
1301 for i in 0..intf_count {
1302 if intf_selections[i] {
1303 selected_addrs.insert(interfaces[i].addr.ip());
1304 }
1305 }
1306
1307 selected_addrs
1308 }
1309
1310 fn apply_intf_selections(&mut self, interfaces: Vec<Interface>) {
1315 let intf_count = interfaces.len();
1317 let mut intf_selections = vec![true; intf_count];
1318
1319 for selection in self.if_selections.iter() {
1321 for i in 0..intf_count {
1323 if selection.if_kind.matches(&interfaces[i]) {
1324 intf_selections[i] = selection.selected;
1325 }
1326 }
1327 }
1328
1329 for (idx, intf) in interfaces.into_iter().enumerate() {
1331 if intf_selections[idx] {
1332 self.add_interface(intf);
1334 } else {
1335 self.del_interface(&intf);
1337 }
1338 }
1339 }
1340
1341 fn del_ip(&mut self, ip: IpAddr) {
1342 self.del_addr_in_my_services(&ip);
1343 self.notify_monitors(DaemonEvent::IpDel(ip));
1344 }
1345
1346 fn check_ip_changes(&mut self) {
1348 let my_ifaddrs = my_ip_interfaces(true);
1350
1351 #[cfg(test)]
1352 let my_ifaddrs: Vec<_> = my_ifaddrs
1353 .into_iter()
1354 .filter(|intf| !self.test_down_interfaces.contains(&intf.name))
1355 .collect();
1356
1357 let ifaddrs_map: HashMap<u32, Vec<&IfAddr>> =
1358 my_ifaddrs.iter().fold(HashMap::new(), |mut acc, intf| {
1359 let if_index = intf.index.unwrap_or(0);
1360 acc.entry(if_index).or_default().push(&intf.addr);
1361 acc
1362 });
1363
1364 let mut deleted_intfs = Vec::new();
1365 let mut deleted_ips = Vec::new();
1366
1367 for (if_index, my_intf) in self.my_intfs.iter_mut() {
1368 let mut last_ipv4 = None;
1369 let mut last_ipv6 = None;
1370
1371 if let Some(current_addrs) = ifaddrs_map.get(if_index) {
1372 my_intf.addrs.retain(|addr| {
1373 if current_addrs.contains(&addr) {
1374 true
1375 } else {
1376 match addr.ip() {
1377 IpAddr::V4(ipv4) => last_ipv4 = Some(ipv4),
1378 IpAddr::V6(ipv6) => last_ipv6 = Some(ipv6),
1379 }
1380 deleted_ips.push(addr.ip());
1381 false
1382 }
1383 });
1384 if my_intf.addrs.is_empty() {
1385 deleted_intfs.push((*if_index, last_ipv4, last_ipv6))
1386 }
1387 } else {
1388 debug!(
1390 "check_ip_changes: interface {} ({}) no longer exists, removing",
1391 my_intf.name, if_index
1392 );
1393 for addr in my_intf.addrs.iter() {
1394 match addr.ip() {
1395 IpAddr::V4(ipv4) => last_ipv4 = Some(ipv4),
1396 IpAddr::V6(ipv6) => last_ipv6 = Some(ipv6),
1397 }
1398 deleted_ips.push(addr.ip())
1399 }
1400 deleted_intfs.push((*if_index, last_ipv4, last_ipv6));
1401 }
1402 }
1403
1404 if !deleted_ips.is_empty() || !deleted_intfs.is_empty() {
1405 debug!(
1406 "check_ip_changes: {} deleted ips {} deleted intfs",
1407 deleted_ips.len(),
1408 deleted_intfs.len()
1409 );
1410 }
1411
1412 for ip in deleted_ips {
1413 self.del_ip(ip);
1414 }
1415
1416 for (if_index, last_ipv4, last_ipv6) in deleted_intfs {
1417 let Some(my_intf) = self.my_intfs.remove(&if_index) else {
1418 continue;
1419 };
1420
1421 if let Some(ipv4) = last_ipv4 {
1422 debug!("leave multicast for {ipv4}");
1423 if let Err(e) = self
1424 .ipv4_sock
1425 .pktinfo
1426 .leave_multicast_v4(&GROUP_ADDR_V4, &ipv4)
1427 {
1428 debug!("leave multicast group for addr {ipv4}: {e}");
1429 }
1430 }
1431
1432 if let Some(ipv6) = last_ipv6 {
1433 debug!("leave multicast for {ipv6}");
1434 if let Err(e) = self
1435 .ipv6_sock
1436 .pktinfo
1437 .leave_multicast_v6(&GROUP_ADDR_V6, my_intf.index)
1438 {
1439 debug!("leave multicast group for IPv6: {ipv6}: {e}");
1440 }
1441 }
1442
1443 let intf_id = InterfaceId {
1445 name: my_intf.name.to_string(),
1446 index: my_intf.index,
1447 };
1448 let removed_instances = self.cache.remove_records_on_intf(intf_id);
1449 self.notify_service_removal(removed_instances);
1450 }
1451
1452 self.apply_intf_selections(my_ifaddrs);
1454 }
1455
1456 fn del_interface(&mut self, intf: &Interface) {
1457 let if_index = intf.index.unwrap_or(0);
1458 trace!(
1459 "del_interface: {} ({if_index}) addr {}",
1460 intf.name,
1461 intf.ip()
1462 );
1463
1464 let Some(my_intf) = self.my_intfs.get_mut(&if_index) else {
1465 debug!("del_interface: interface {} not found", intf.name);
1466 return;
1467 };
1468
1469 let mut ip_removed = false;
1470
1471 if my_intf.addrs.remove(&intf.addr) {
1472 ip_removed = true;
1473
1474 match intf.addr.ip() {
1475 IpAddr::V4(ipv4) => {
1476 if my_intf.next_ifaddr_v4().is_none() {
1477 if let Err(e) = self
1478 .ipv4_sock
1479 .pktinfo
1480 .leave_multicast_v4(&GROUP_ADDR_V4, &ipv4)
1481 {
1482 debug!("leave multicast group for addr {ipv4}: {e}");
1483 }
1484 }
1485 }
1486
1487 IpAddr::V6(ipv6) => {
1488 if my_intf.next_ifaddr_v6().is_none() {
1489 if let Err(e) = self
1490 .ipv6_sock
1491 .pktinfo
1492 .leave_multicast_v6(&GROUP_ADDR_V6, if_index)
1493 {
1494 debug!("leave multicast group for addr {ipv6}: {e}");
1495 }
1496 }
1497 }
1498 }
1499
1500 if my_intf.addrs.is_empty() {
1501 debug!("del_interface: removing interface {}", intf.name);
1503 self.my_intfs.remove(&if_index);
1504 self.dns_registry_map.remove(&if_index);
1505 self.cache.remove_addrs_on_disabled_intf(if_index);
1506 }
1507 }
1508
1509 if ip_removed {
1510 self.notify_monitors(DaemonEvent::IpDel(intf.ip()));
1512 self.del_addr_in_my_services(&intf.ip());
1514 }
1515 }
1516
1517 fn add_interface(&mut self, intf: Interface) {
1518 let sock = if intf.ip().is_ipv4() {
1519 &self.ipv4_sock
1520 } else {
1521 &self.ipv6_sock
1522 };
1523
1524 let if_index = intf.index.unwrap_or(0);
1525 let mut new_addr = false;
1526
1527 match self.my_intfs.entry(if_index) {
1528 Entry::Occupied(mut entry) => {
1529 let my_intf = entry.get_mut();
1531 if !my_intf.addrs.contains(&intf.addr) {
1532 if let Err(e) = join_multicast_group(&sock.pktinfo, &intf) {
1533 debug!("add_interface: socket_config {}: {e}", &intf.name);
1534 }
1535 my_intf.addrs.insert(intf.addr.clone());
1536 new_addr = true;
1537 }
1538 }
1539 Entry::Vacant(entry) => {
1540 if let Err(e) = join_multicast_group(&sock.pktinfo, &intf) {
1541 debug!("add_interface: socket_config {}: {e}. Skipped.", &intf.name);
1542 return;
1543 }
1544
1545 new_addr = true;
1546 let new_intf = MyIntf {
1547 name: intf.name.clone(),
1548 index: if_index,
1549 addrs: HashSet::from([intf.addr.clone()]),
1550 };
1551 entry.insert(new_intf);
1552 }
1553 }
1554
1555 if !new_addr {
1556 trace!("add_interface: interface {} already exists", &intf.name);
1557 return;
1558 }
1559
1560 debug!("add new interface {}: {}", intf.name, intf.ip());
1561
1562 let Some(my_intf) = self.my_intfs.get(&if_index) else {
1563 debug!("add_interface: cannot find if_index {if_index}");
1564 return;
1565 };
1566
1567 let dns_registry = match self.dns_registry_map.get_mut(&if_index) {
1568 Some(registry) => registry,
1569 None => self
1570 .dns_registry_map
1571 .entry(if_index)
1572 .or_insert_with(DnsRegistry::new),
1573 };
1574
1575 for (_, service_info) in self.my_services.iter_mut() {
1576 if service_info.is_addr_auto() {
1577 let new_ip = intf.ip();
1578 service_info.insert_ipaddr(new_ip);
1579
1580 if announce_service_on_intf(dns_registry, service_info, my_intf, &sock.pktinfo) {
1581 debug!(
1582 "Announce service {} on {}",
1583 service_info.get_fullname(),
1584 intf.ip()
1585 );
1586 service_info.set_status(if_index, ServiceStatus::Announced);
1587 } else {
1588 for timer in dns_registry.new_timers.drain(..) {
1589 self.timers.push(Reverse(timer));
1590 }
1591 service_info.set_status(if_index, ServiceStatus::Probing);
1592 }
1593 }
1594 }
1595
1596 let mut browse_reruns = Vec::new();
1598 let mut i = 0;
1599 while i < self.retransmissions.len() {
1600 if matches!(self.retransmissions[i].command, Command::Browse(..)) {
1601 browse_reruns.push(self.retransmissions.remove(i));
1602 } else {
1603 i += 1;
1604 }
1605 }
1606
1607 for rerun in browse_reruns {
1608 self.exec_command(rerun.command, true);
1609 }
1610
1611 self.notify_monitors(DaemonEvent::IpAdd(intf.ip()));
1613 }
1614
1615 fn register_service(&mut self, mut info: ServiceInfo) {
1624 if let Err(e) = check_service_name_length(info.get_type(), self.service_name_len_max) {
1626 debug!("check_service_name_length: {}", &e);
1627 self.notify_monitors(DaemonEvent::Error(e));
1628 return;
1629 }
1630
1631 if info.is_addr_auto() {
1632 let selected_addrs = self.selected_addrs(my_ip_interfaces(true));
1633 for addr in selected_addrs {
1634 info.insert_ipaddr(addr);
1635 }
1636 }
1637
1638 debug!("register service {:?}", &info);
1639
1640 let outgoing_addrs = self.send_unsolicited_response(&mut info);
1641 if !outgoing_addrs.is_empty() {
1642 self.notify_monitors(DaemonEvent::Announce(
1643 info.get_fullname().to_string(),
1644 format!("{:?}", &outgoing_addrs),
1645 ));
1646 }
1647
1648 let service_fullname = info.get_fullname().to_lowercase();
1651 self.my_services.insert(service_fullname, info);
1652 }
1653
1654 fn send_unsolicited_response(&mut self, info: &mut ServiceInfo) -> Vec<IpAddr> {
1657 let mut outgoing_addrs = Vec::new();
1658 let mut outgoing_intfs = HashSet::new();
1659
1660 for (if_index, intf) in self.my_intfs.iter() {
1661 let dns_registry = match self.dns_registry_map.get_mut(if_index) {
1662 Some(registry) => registry,
1663 None => self
1664 .dns_registry_map
1665 .entry(*if_index)
1666 .or_insert_with(DnsRegistry::new),
1667 };
1668
1669 let mut announced = false;
1670
1671 if announce_service_on_intf(dns_registry, info, intf, &self.ipv4_sock.pktinfo) {
1673 for addr in intf.addrs.iter().filter(|a| a.ip().is_ipv4()) {
1674 outgoing_addrs.push(addr.ip());
1675 }
1676 outgoing_intfs.insert(intf.index);
1677
1678 debug!(
1679 "Announce service IPv4 {} on {}",
1680 info.get_fullname(),
1681 intf.name
1682 );
1683 announced = true;
1684 }
1685
1686 if announce_service_on_intf(dns_registry, info, intf, &self.ipv6_sock.pktinfo) {
1687 for addr in intf.addrs.iter().filter(|a| a.ip().is_ipv6()) {
1688 outgoing_addrs.push(addr.ip());
1689 }
1690 outgoing_intfs.insert(intf.index);
1691
1692 debug!(
1693 "Announce service IPv6 {} on {}",
1694 info.get_fullname(),
1695 intf.name
1696 );
1697 announced = true;
1698 }
1699
1700 if announced {
1701 info.set_status(intf.index, ServiceStatus::Announced);
1702 } else {
1703 for timer in dns_registry.new_timers.drain(..) {
1704 self.timers.push(Reverse(timer));
1705 }
1706 info.set_status(*if_index, ServiceStatus::Probing);
1707 }
1708 }
1709
1710 let next_time = current_time_millis() + 1000;
1714 for if_index in outgoing_intfs {
1715 self.add_retransmission(
1716 next_time,
1717 Command::RegisterResend(info.get_fullname().to_string(), if_index),
1718 );
1719 }
1720
1721 outgoing_addrs
1722 }
1723
1724 fn probing_handler(&mut self) {
1726 let now = current_time_millis();
1727
1728 for (if_index, intf) in self.my_intfs.iter() {
1729 let Some(dns_registry) = self.dns_registry_map.get_mut(if_index) else {
1730 continue;
1731 };
1732
1733 let (out, expired_probes) = check_probing(dns_registry, &mut self.timers, now);
1734
1735 if !out.questions().is_empty() {
1737 trace!("sending out probing of questions: {:?}", out.questions());
1738 send_dns_outgoing(&out, intf, &self.ipv4_sock.pktinfo);
1739 send_dns_outgoing(&out, intf, &self.ipv6_sock.pktinfo);
1740 }
1741
1742 let waiting_services =
1744 handle_expired_probes(expired_probes, &intf.name, dns_registry, &mut self.monitors);
1745
1746 for service_name in waiting_services {
1747 if let Some(info) = self.my_services.get_mut(&service_name.to_lowercase()) {
1749 if info.get_status(*if_index) == ServiceStatus::Announced {
1750 debug!("service {} already announced", info.get_fullname());
1751 continue;
1752 }
1753
1754 let announced_v4 =
1755 announce_service_on_intf(dns_registry, info, intf, &self.ipv4_sock.pktinfo);
1756 let announced_v6 =
1757 announce_service_on_intf(dns_registry, info, intf, &self.ipv6_sock.pktinfo);
1758
1759 if announced_v4 || announced_v6 {
1760 let next_time = now + 1000;
1761 let command =
1762 Command::RegisterResend(info.get_fullname().to_string(), *if_index);
1763 self.retransmissions.push(ReRun { next_time, command });
1764 self.timers.push(Reverse(next_time));
1765
1766 let fullname = match dns_registry.name_changes.get(&service_name) {
1767 Some(new_name) => new_name.to_string(),
1768 None => service_name.to_string(),
1769 };
1770
1771 let mut hostname = info.get_hostname();
1772 if let Some(new_name) = dns_registry.name_changes.get(hostname) {
1773 hostname = new_name;
1774 }
1775
1776 debug!("wake up: announce service {} on {}", fullname, intf.name);
1777 notify_monitors(
1778 &mut self.monitors,
1779 DaemonEvent::Announce(fullname, format!("{}:{}", hostname, &intf.name)),
1780 );
1781
1782 info.set_status(*if_index, ServiceStatus::Announced);
1783 }
1784 }
1785 }
1786 }
1787 }
1788
1789 fn unregister_service(
1790 &self,
1791 info: &ServiceInfo,
1792 intf: &MyIntf,
1793 sock: &PktInfoUdpSocket,
1794 ) -> Vec<u8> {
1795 let is_ipv4 = sock.domain() == Domain::IPV4;
1796
1797 let mut out = DnsOutgoing::new(FLAGS_QR_RESPONSE | FLAGS_AA);
1798 out.add_answer_at_time(
1799 DnsPointer::new(
1800 info.get_type(),
1801 RRType::PTR,
1802 CLASS_IN,
1803 0,
1804 info.get_fullname().to_string(),
1805 ),
1806 0,
1807 );
1808
1809 if let Some(sub) = info.get_subtype() {
1810 trace!("Adding subdomain {}", sub);
1811 out.add_answer_at_time(
1812 DnsPointer::new(
1813 sub,
1814 RRType::PTR,
1815 CLASS_IN,
1816 0,
1817 info.get_fullname().to_string(),
1818 ),
1819 0,
1820 );
1821 }
1822
1823 out.add_answer_at_time(
1824 DnsSrv::new(
1825 info.get_fullname(),
1826 CLASS_IN | CLASS_CACHE_FLUSH,
1827 0,
1828 info.get_priority(),
1829 info.get_weight(),
1830 info.get_port(),
1831 info.get_hostname().to_string(),
1832 ),
1833 0,
1834 );
1835 out.add_answer_at_time(
1836 DnsTxt::new(
1837 info.get_fullname(),
1838 CLASS_IN | CLASS_CACHE_FLUSH,
1839 0,
1840 info.generate_txt(),
1841 ),
1842 0,
1843 );
1844
1845 let if_addrs = if is_ipv4 {
1846 info.get_addrs_on_my_intf_v4(intf)
1847 } else {
1848 info.get_addrs_on_my_intf_v6(intf)
1849 };
1850
1851 if if_addrs.is_empty() {
1852 return vec![];
1853 }
1854
1855 for address in if_addrs {
1856 out.add_answer_at_time(
1857 DnsAddress::new(
1858 info.get_hostname(),
1859 ip_address_rr_type(&address),
1860 CLASS_IN | CLASS_CACHE_FLUSH,
1861 0,
1862 address,
1863 intf.into(),
1864 ),
1865 0,
1866 );
1867 }
1868
1869 send_dns_outgoing(&out, intf, sock).remove(0)
1871 }
1872
1873 fn add_hostname_resolver(
1877 &mut self,
1878 hostname: String,
1879 listener: Sender<HostnameResolutionEvent>,
1880 timeout: Option<u64>,
1881 ) {
1882 let real_timeout = timeout.map(|t| current_time_millis() + t);
1883 self.hostname_resolvers
1884 .insert(hostname.to_lowercase(), (listener, real_timeout));
1885 if let Some(t) = real_timeout {
1886 self.add_timer(t);
1887 }
1888 }
1889
1890 fn send_query(&self, name: &str, qtype: RRType) {
1892 self.send_query_vec(&[(name, qtype)]);
1893 }
1894
1895 fn send_query_vec(&self, questions: &[(&str, RRType)]) {
1897 trace!("Sending query questions: {:?}", questions);
1898 let mut out = DnsOutgoing::new(FLAGS_QR_QUERY);
1899 let now = current_time_millis();
1900
1901 for (name, qtype) in questions {
1902 out.add_question(name, *qtype);
1903
1904 for record in self.cache.get_known_answers(name, *qtype, now) {
1905 trace!("add known answer: {:?}", record.record);
1913 let mut new_record = record.record.clone();
1914 new_record.get_record_mut().update_ttl(now);
1915 out.add_answer_box(new_record);
1916 }
1917 }
1918
1919 for (_, intf) in self.my_intfs.iter() {
1920 send_dns_outgoing(&out, intf, &self.ipv4_sock.pktinfo);
1921 send_dns_outgoing(&out, intf, &self.ipv6_sock.pktinfo);
1922 }
1923 }
1924
1925 fn handle_read(&mut self, event_key: usize) -> bool {
1930 let sock = match event_key {
1931 IPV4_SOCK_EVENT_KEY => &mut self.ipv4_sock,
1932 IPV6_SOCK_EVENT_KEY => &mut self.ipv6_sock,
1933 _ => {
1934 debug!("handle_read: unknown token {}", event_key);
1935 return false;
1936 }
1937 };
1938 let mut buf = vec![0u8; MAX_MSG_ABSOLUTE];
1939
1940 let (sz, pktinfo) = match sock.pktinfo.recv(&mut buf) {
1947 Ok(sz) => sz,
1948 Err(e) => {
1949 if e.kind() != std::io::ErrorKind::WouldBlock {
1950 debug!("listening socket read failed: {}", e);
1951 }
1952 return false;
1953 }
1954 };
1955
1956 let pkt_if_index = pktinfo.if_index as u32;
1958 let Some(my_intf) = self.my_intfs.get(&pkt_if_index) else {
1959 debug!(
1960 "handle_read: no interface found for pktinfo if_index: {}",
1961 pktinfo.if_index
1962 );
1963 return true; };
1965
1966 buf.truncate(sz); match DnsIncoming::new(buf, my_intf.into()) {
1969 Ok(msg) => {
1970 if msg.is_query() {
1971 self.handle_query(msg, pkt_if_index, event_key == IPV4_SOCK_EVENT_KEY);
1972 } else if msg.is_response() {
1973 self.handle_response(msg, pkt_if_index);
1974 } else {
1975 debug!("Invalid message: not query and not response");
1976 }
1977 }
1978 Err(e) => debug!("Invalid incoming DNS message: {}", e),
1979 }
1980
1981 true
1982 }
1983
1984 fn query_unresolved(&mut self, instance: &str) -> bool {
1986 if !valid_instance_name(instance) {
1987 trace!("instance name {} not valid", instance);
1988 return false;
1989 }
1990
1991 if let Some(records) = self.cache.get_srv(instance) {
1992 for record in records {
1993 if let Some(srv) = record.record.any().downcast_ref::<DnsSrv>() {
1994 if self.cache.get_addr(srv.host()).is_none() {
1995 self.send_query_vec(&[(srv.host(), RRType::A), (srv.host(), RRType::AAAA)]);
1996 return true;
1997 }
1998 }
1999 }
2000 } else {
2001 self.send_query(instance, RRType::ANY);
2002 return true;
2003 }
2004
2005 false
2006 }
2007
2008 fn query_cache_for_service(
2011 &mut self,
2012 ty_domain: &str,
2013 sender: &Sender<ServiceEvent>,
2014 now: u64,
2015 ) {
2016 let mut resolved: HashSet<String> = HashSet::new();
2017 let mut unresolved: HashSet<String> = HashSet::new();
2018
2019 if let Some(records) = self.cache.get_ptr(ty_domain) {
2020 for record in records.iter().filter(|r| !r.record.expires_soon(now)) {
2021 if let Some(ptr) = record.record.any().downcast_ref::<DnsPointer>() {
2022 let mut new_event = None;
2023 match self.resolve_service_from_cache(ty_domain, ptr.alias()) {
2024 Ok(resolved_service) => {
2025 if resolved_service.is_valid() {
2026 debug!("Resolved service from cache: {}", ptr.alias());
2027 new_event =
2028 Some(ServiceEvent::ServiceResolved(Box::new(resolved_service)));
2029 } else {
2030 debug!("Resolved service is not valid: {}", ptr.alias());
2031 }
2032 }
2033 Err(err) => {
2034 debug!("Error while resolving service from cache: {}", err);
2035 continue;
2036 }
2037 }
2038
2039 match sender.send(ServiceEvent::ServiceFound(
2040 ty_domain.to_string(),
2041 ptr.alias().to_string(),
2042 )) {
2043 Ok(()) => debug!("sent service found {}", ptr.alias()),
2044 Err(e) => {
2045 debug!("failed to send service found: {}", e);
2046 continue;
2047 }
2048 }
2049
2050 if let Some(event) = new_event {
2051 resolved.insert(ptr.alias().to_string());
2052 match sender.send(event) {
2053 Ok(()) => debug!("sent service resolved: {}", ptr.alias()),
2054 Err(e) => debug!("failed to send service resolved: {}", e),
2055 }
2056 } else {
2057 unresolved.insert(ptr.alias().to_string());
2058 }
2059 }
2060 }
2061 }
2062
2063 for instance in resolved.drain() {
2064 self.pending_resolves.remove(&instance);
2065 self.resolved.insert(instance);
2066 }
2067
2068 for instance in unresolved.drain() {
2069 self.add_pending_resolve(instance);
2070 }
2071 }
2072
2073 fn query_cache_for_hostname(
2076 &mut self,
2077 hostname: &str,
2078 sender: Sender<HostnameResolutionEvent>,
2079 ) {
2080 let addresses_map = self.cache.get_addresses_for_host(hostname);
2081 for (name, addresses) in addresses_map {
2082 match sender.send(HostnameResolutionEvent::AddressesFound(name, addresses)) {
2083 Ok(()) => trace!("sent hostname addresses found"),
2084 Err(e) => debug!("failed to send hostname addresses found: {}", e),
2085 }
2086 }
2087 }
2088
2089 fn add_pending_resolve(&mut self, instance: String) {
2090 if !self.pending_resolves.contains(&instance) {
2091 let next_time = current_time_millis() + RESOLVE_WAIT_IN_MILLIS;
2092 self.add_retransmission(next_time, Command::Resolve(instance.clone(), 1));
2093 self.pending_resolves.insert(instance);
2094 }
2095 }
2096
2097 fn resolve_service_from_cache(
2099 &self,
2100 ty_domain: &str,
2101 fullname: &str,
2102 ) -> Result<ResolvedService> {
2103 let now = current_time_millis();
2104 let mut resolved_service = ResolvedService {
2105 ty_domain: ty_domain.to_string(),
2106 sub_ty_domain: None,
2107 fullname: fullname.to_string(),
2108 host: String::new(),
2109 port: 0,
2110 addresses: HashSet::new(),
2111 txt_properties: TxtProperties::new(),
2112 };
2113
2114 if let Some(subtype) = self.cache.get_subtype(fullname) {
2116 trace!(
2117 "ty_domain: {} found subtype {} for instance: {}",
2118 ty_domain,
2119 subtype,
2120 fullname
2121 );
2122 if resolved_service.sub_ty_domain.is_none() {
2123 resolved_service.sub_ty_domain = Some(subtype.to_string());
2124 }
2125 }
2126
2127 if let Some(records) = self.cache.get_srv(fullname) {
2129 if let Some(answer) = records.iter().find(|r| !r.record.expires_soon(now)) {
2130 if let Some(dns_srv) = answer.record.any().downcast_ref::<DnsSrv>() {
2131 resolved_service.host = dns_srv.host().to_string();
2132 resolved_service.port = dns_srv.port();
2133 }
2134 }
2135 }
2136
2137 if let Some(records) = self.cache.get_txt(fullname) {
2139 if let Some(record) = records.iter().find(|r| !r.record.expires_soon(now)) {
2140 if let Some(dns_txt) = record.record.any().downcast_ref::<DnsTxt>() {
2141 resolved_service.txt_properties = dns_txt.text().into();
2142 }
2143 }
2144 }
2145
2146 if let Some(records) = self.cache.get_addr(&resolved_service.host) {
2148 for answer in records.iter() {
2149 if let Some(dns_a) = answer.record.any().downcast_ref::<DnsAddress>() {
2150 if dns_a.expires_soon(now) {
2151 trace!(
2152 "Addr expired or expires soon: {}",
2153 dns_a.address().to_ip_addr()
2154 );
2155 } else {
2156 resolved_service.addresses.insert(dns_a.address());
2157 }
2158 }
2159 }
2160 }
2161
2162 Ok(resolved_service)
2163 }
2164
2165 fn handle_poller_events(&mut self, events: &mio::Events) {
2166 for ev in events.iter() {
2167 trace!("event received with key {:?}", ev.token());
2168 if ev.token().0 == SIGNAL_SOCK_EVENT_KEY {
2169 self.signal_sock_drain();
2171
2172 if let Err(e) = self.poller.registry().reregister(
2173 &mut self.signal_sock,
2174 ev.token(),
2175 mio::Interest::READABLE,
2176 ) {
2177 debug!("failed to modify poller for signal socket: {}", e);
2178 }
2179 continue; }
2181
2182 while self.handle_read(ev.token().0) {}
2184
2185 if ev.token().0 == IPV4_SOCK_EVENT_KEY {
2187 if let Err(e) = self.poller.registry().reregister(
2189 &mut self.ipv4_sock,
2190 ev.token(),
2191 mio::Interest::READABLE,
2192 ) {
2193 debug!("modify poller for IPv4 socket: {}", e);
2194 }
2195 } else if ev.token().0 == IPV6_SOCK_EVENT_KEY {
2196 if let Err(e) = self.poller.registry().reregister(
2198 &mut self.ipv6_sock,
2199 ev.token(),
2200 mio::Interest::READABLE,
2201 ) {
2202 debug!("modify poller for IPv6 socket: {}", e);
2203 }
2204 }
2205 }
2206 }
2207
2208 fn handle_response(&mut self, mut msg: DnsIncoming, if_index: u32) {
2211 let now = current_time_millis();
2212
2213 let mut record_predicate = |record: &DnsRecordBox| {
2215 if !record.get_record().is_expired(now) {
2216 return true;
2217 }
2218
2219 debug!("record is expired, removing it from cache.");
2220 if self.cache.remove(record) {
2221 if let Some(dns_ptr) = record.any().downcast_ref::<DnsPointer>() {
2223 call_service_listener(
2224 &self.service_queriers,
2225 dns_ptr.get_name(),
2226 ServiceEvent::ServiceRemoved(
2227 dns_ptr.get_name().to_string(),
2228 dns_ptr.alias().to_string(),
2229 ),
2230 );
2231 }
2232 }
2233 false
2234 };
2235 msg.answers_mut().retain(&mut record_predicate);
2236 msg.authorities_mut().retain(&mut record_predicate);
2237 msg.additionals_mut().retain(&mut record_predicate);
2238
2239 self.conflict_handler(&msg, if_index);
2241
2242 let mut is_for_us = true; for answer in msg.answers() {
2249 if answer.get_type() == RRType::PTR {
2250 if self.service_queriers.contains_key(answer.get_name()) {
2251 is_for_us = true;
2252 break; } else {
2254 is_for_us = false;
2255 }
2256 } else if answer.get_type() == RRType::A || answer.get_type() == RRType::AAAA {
2257 let answer_lowercase = answer.get_name().to_lowercase();
2259 if self.hostname_resolvers.contains_key(&answer_lowercase) {
2260 is_for_us = true;
2261 break; }
2263 }
2264 }
2265
2266 struct InstanceChange {
2268 ty: RRType, name: String, }
2271
2272 let mut changes = Vec::new();
2280 let mut timers = Vec::new();
2281 let Some(my_intf) = self.my_intfs.get(&if_index) else {
2282 return;
2283 };
2284 for record in msg.all_records() {
2285 match self
2286 .cache
2287 .add_or_update(my_intf, record, &mut timers, is_for_us)
2288 {
2289 Some((dns_record, true)) => {
2290 timers.push(dns_record.record.get_record().get_expire_time());
2291 timers.push(dns_record.record.get_record().get_refresh_time());
2292
2293 let ty = dns_record.record.get_type();
2294 let name = dns_record.record.get_name();
2295
2296 if ty == RRType::PTR && dns_record.record.get_record().get_ttl() > 1 {
2298 if self.service_queriers.contains_key(name) {
2299 timers.push(dns_record.record.get_record().get_refresh_time());
2300 }
2301
2302 if let Some(dns_ptr) = dns_record.record.any().downcast_ref::<DnsPointer>()
2304 {
2305 debug!("calling listener with service found: {name}");
2306 call_service_listener(
2307 &self.service_queriers,
2308 name,
2309 ServiceEvent::ServiceFound(
2310 name.to_string(),
2311 dns_ptr.alias().to_string(),
2312 ),
2313 );
2314 changes.push(InstanceChange {
2315 ty,
2316 name: dns_ptr.alias().to_string(),
2317 });
2318 }
2319 } else {
2320 changes.push(InstanceChange {
2321 ty,
2322 name: name.to_string(),
2323 });
2324 }
2325 }
2326 Some((dns_record, false)) => {
2327 timers.push(dns_record.record.get_record().get_expire_time());
2328 timers.push(dns_record.record.get_record().get_refresh_time());
2329 }
2330 _ => {}
2331 }
2332 }
2333
2334 for t in timers {
2336 self.add_timer(t);
2337 }
2338
2339 for change in changes
2341 .iter()
2342 .filter(|change| change.ty == RRType::A || change.ty == RRType::AAAA)
2343 {
2344 let addr_map = self.cache.get_addresses_for_host(&change.name);
2345 for (name, addresses) in addr_map {
2346 call_hostname_resolution_listener(
2347 &self.hostname_resolvers,
2348 &change.name,
2349 HostnameResolutionEvent::AddressesFound(name, addresses),
2350 )
2351 }
2352 }
2353
2354 let mut updated_instances = HashSet::new();
2356 for update in changes {
2357 match update.ty {
2358 RRType::PTR | RRType::SRV | RRType::TXT => {
2359 updated_instances.insert(update.name);
2360 }
2361 RRType::A | RRType::AAAA => {
2362 let instances = self.cache.get_instances_on_host(&update.name);
2363 updated_instances.extend(instances);
2364 }
2365 _ => {}
2366 }
2367 }
2368
2369 self.resolve_updated_instances(&updated_instances);
2370 }
2371
2372 fn conflict_handler(&mut self, msg: &DnsIncoming, if_index: u32) {
2373 let Some(my_intf) = self.my_intfs.get(&if_index) else {
2374 debug!("handle_response: no intf found for index {if_index}");
2375 return;
2376 };
2377
2378 let Some(dns_registry) = self.dns_registry_map.get_mut(&if_index) else {
2379 return;
2380 };
2381
2382 for answer in msg.answers().iter() {
2383 let mut new_records = Vec::new();
2384
2385 let name = answer.get_name();
2386 let Some(probe) = dns_registry.probing.get_mut(name) else {
2387 continue;
2388 };
2389
2390 if answer.get_type() == RRType::A || answer.get_type() == RRType::AAAA {
2392 if let Some(answer_addr) = answer.any().downcast_ref::<DnsAddress>() {
2393 if answer_addr.interface_id.index != if_index {
2394 debug!(
2395 "conflict handler: answer addr {:?} not in the subnet of intf {}",
2396 answer_addr, my_intf.name
2397 );
2398 continue;
2399 }
2400 }
2401
2402 let any_match = probe.records.iter().any(|r| {
2405 r.get_type() == answer.get_type()
2406 && r.get_class() == answer.get_class()
2407 && r.rrdata_match(answer.as_ref())
2408 });
2409 if any_match {
2410 continue; }
2412 }
2413
2414 probe.records.retain(|record| {
2415 if record.get_type() == answer.get_type()
2416 && record.get_class() == answer.get_class()
2417 && !record.rrdata_match(answer.as_ref())
2418 {
2419 debug!(
2420 "found conflict name: '{name}' record: {}: {} PEER: {}",
2421 record.get_type(),
2422 record.rdata_print(),
2423 answer.rdata_print()
2424 );
2425
2426 let mut new_record = record.clone();
2429 let new_name = match record.get_type() {
2430 RRType::A => hostname_change(name),
2431 RRType::AAAA => hostname_change(name),
2432 _ => name_change(name),
2433 };
2434 new_record.get_record_mut().set_new_name(new_name);
2435 new_records.push(new_record);
2436 return false; }
2438
2439 true
2440 });
2441
2442 let create_time = current_time_millis() + fastrand::u64(0..250);
2449
2450 let waiting_services = probe.waiting_services.clone();
2451
2452 for record in new_records {
2453 if dns_registry.update_hostname(name, record.get_name(), create_time) {
2454 self.timers.push(Reverse(create_time));
2455 }
2456
2457 dns_registry.name_changes.insert(
2459 record.get_record().get_original_name().to_string(),
2460 record.get_name().to_string(),
2461 );
2462
2463 let new_probe = match dns_registry.probing.get_mut(record.get_name()) {
2464 Some(p) => p,
2465 None => {
2466 let new_probe = dns_registry
2467 .probing
2468 .entry(record.get_name().to_string())
2469 .or_insert_with(|| {
2470 debug!("conflict handler: new probe of {}", record.get_name());
2471 Probe::new(create_time)
2472 });
2473 self.timers.push(Reverse(new_probe.next_send));
2474 new_probe
2475 }
2476 };
2477
2478 debug!(
2479 "insert record with new name '{}' {} into probe",
2480 record.get_name(),
2481 record.get_type()
2482 );
2483 new_probe.insert_record(record);
2484
2485 new_probe.waiting_services.extend(waiting_services.clone());
2486 }
2487 }
2488 }
2489
2490 fn resolve_updated_instances(&mut self, updated_instances: &HashSet<String>) {
2497 let mut resolved: HashSet<String> = HashSet::new();
2498 let mut unresolved: HashSet<String> = HashSet::new();
2499 let mut removed_instances = HashMap::new();
2500
2501 let now = current_time_millis();
2502
2503 for (ty_domain, records) in self.cache.all_ptr().iter() {
2504 if !self.service_queriers.contains_key(ty_domain) {
2505 continue;
2507 }
2508
2509 for record in records.iter().filter(|r| !r.record.expires_soon(now)) {
2510 if let Some(dns_ptr) = record.record.any().downcast_ref::<DnsPointer>() {
2511 if updated_instances.contains(dns_ptr.alias()) {
2512 let mut instance_found = false;
2513 let mut new_event = None;
2514
2515 if let Ok(resolved) =
2516 self.resolve_service_from_cache(ty_domain, dns_ptr.alias())
2517 {
2518 debug!("resolve_updated_instances: from cache: {}", dns_ptr.alias());
2519 instance_found = true;
2520 if resolved.is_valid() {
2521 new_event = Some(ServiceEvent::ServiceResolved(Box::new(resolved)));
2522 } else {
2523 debug!("Resolved service is not valid: {}", dns_ptr.alias());
2524 }
2525 }
2526
2527 if instance_found {
2528 if let Some(event) = new_event {
2529 debug!("call queriers to resolve {}", dns_ptr.alias());
2530 resolved.insert(dns_ptr.alias().to_string());
2531 call_service_listener(&self.service_queriers, ty_domain, event);
2532 } else {
2533 if self.resolved.remove(dns_ptr.alias()) {
2534 removed_instances
2535 .entry(ty_domain.to_string())
2536 .or_insert_with(HashSet::new)
2537 .insert(dns_ptr.alias().to_string());
2538 }
2539 unresolved.insert(dns_ptr.alias().to_string());
2540 }
2541 }
2542 }
2543 }
2544 }
2545 }
2546
2547 for instance in resolved.drain() {
2548 self.pending_resolves.remove(&instance);
2549 self.resolved.insert(instance);
2550 }
2551
2552 for instance in unresolved.drain() {
2553 self.add_pending_resolve(instance);
2554 }
2555
2556 if !removed_instances.is_empty() {
2557 debug!(
2558 "resolve_updated_instances: removed {}",
2559 &removed_instances.len()
2560 );
2561 self.notify_service_removal(removed_instances);
2562 }
2563 }
2564
2565 fn handle_query(&mut self, msg: DnsIncoming, if_index: u32, is_ipv4: bool) {
2567 let sock = if is_ipv4 {
2568 &self.ipv4_sock
2569 } else {
2570 &self.ipv6_sock
2571 };
2572 let mut out = DnsOutgoing::new(FLAGS_QR_RESPONSE | FLAGS_AA);
2573
2574 const META_QUERY: &str = "_services._dns-sd._udp.local.";
2577
2578 let Some(dns_registry) = self.dns_registry_map.get_mut(&if_index) else {
2579 debug!("missing dns registry for intf {}", if_index);
2580 return;
2581 };
2582
2583 let Some(intf) = self.my_intfs.get(&if_index) else {
2584 return;
2585 };
2586
2587 for question in msg.questions().iter() {
2588 let qtype = question.entry_type();
2589
2590 if qtype == RRType::PTR {
2591 for service in self.my_services.values() {
2592 if service.get_status(if_index) != ServiceStatus::Announced {
2593 continue;
2594 }
2595
2596 if question.entry_name() == service.get_type()
2597 || service
2598 .get_subtype()
2599 .as_ref()
2600 .is_some_and(|v| v == question.entry_name())
2601 {
2602 add_answer_with_additionals(
2603 &mut out,
2604 &msg,
2605 service,
2606 intf,
2607 dns_registry,
2608 is_ipv4,
2609 );
2610 } else if question.entry_name() == META_QUERY {
2611 let ptr_added = out.add_answer(
2612 &msg,
2613 DnsPointer::new(
2614 question.entry_name(),
2615 RRType::PTR,
2616 CLASS_IN,
2617 service.get_other_ttl(),
2618 service.get_type().to_string(),
2619 ),
2620 );
2621 if !ptr_added {
2622 trace!("answer was not added for meta-query {:?}", &question);
2623 }
2624 }
2625 }
2626 } else {
2627 if qtype == RRType::ANY && msg.num_authorities() > 0 {
2629 let probe_name = question.entry_name();
2630
2631 if let Some(probe) = dns_registry.probing.get_mut(probe_name) {
2632 let now = current_time_millis();
2633
2634 if probe.start_time < now {
2638 let incoming_records: Vec<_> = msg
2639 .authorities()
2640 .iter()
2641 .filter(|r| r.get_name() == probe_name)
2642 .collect();
2643
2644 probe.tiebreaking(&incoming_records, now, probe_name);
2645 }
2646 }
2647 }
2648
2649 if qtype == RRType::A || qtype == RRType::AAAA || qtype == RRType::ANY {
2650 for service in self.my_services.values() {
2651 if service.get_status(if_index) != ServiceStatus::Announced {
2652 continue;
2653 }
2654
2655 let service_hostname =
2656 match dns_registry.name_changes.get(service.get_hostname()) {
2657 Some(new_name) => new_name,
2658 None => service.get_hostname(),
2659 };
2660
2661 if service_hostname.to_lowercase() == question.entry_name().to_lowercase() {
2662 let intf_addrs = if is_ipv4 {
2663 service.get_addrs_on_my_intf_v4(intf)
2664 } else {
2665 service.get_addrs_on_my_intf_v6(intf)
2666 };
2667 if intf_addrs.is_empty()
2668 && (qtype == RRType::A || qtype == RRType::AAAA)
2669 {
2670 let t = match qtype {
2671 RRType::A => "TYPE_A",
2672 RRType::AAAA => "TYPE_AAAA",
2673 _ => "invalid_type",
2674 };
2675 trace!(
2676 "Cannot find valid addrs for {} response on intf {:?}",
2677 t,
2678 &intf
2679 );
2680 return;
2681 }
2682 for address in intf_addrs {
2683 out.add_answer(
2684 &msg,
2685 DnsAddress::new(
2686 service_hostname,
2687 ip_address_rr_type(&address),
2688 CLASS_IN | CLASS_CACHE_FLUSH,
2689 service.get_host_ttl(),
2690 address,
2691 intf.into(),
2692 ),
2693 );
2694 }
2695 }
2696 }
2697 }
2698
2699 let query_name = question.entry_name().to_lowercase();
2700 let service_opt = self
2701 .my_services
2702 .iter()
2703 .find(|(k, _v)| {
2704 let service_name = match dns_registry.name_changes.get(k.as_str()) {
2705 Some(new_name) => new_name,
2706 None => k,
2707 };
2708 service_name == &query_name
2709 })
2710 .map(|(_, v)| v);
2711
2712 let Some(service) = service_opt else {
2713 continue;
2714 };
2715
2716 if service.get_status(if_index) != ServiceStatus::Announced {
2717 continue;
2718 }
2719
2720 let intf_addrs = if is_ipv4 {
2721 service.get_addrs_on_my_intf_v4(intf)
2722 } else {
2723 service.get_addrs_on_my_intf_v6(intf)
2724 };
2725 if intf_addrs.is_empty() {
2726 debug!(
2727 "Cannot find valid addrs for TYPE_SRV response on intf {:?}",
2728 &intf
2729 );
2730 continue;
2731 }
2732
2733 add_answer_of_service(
2734 &mut out,
2735 &msg,
2736 question.entry_name(),
2737 service,
2738 qtype,
2739 intf_addrs,
2740 );
2741 }
2742 }
2743
2744 if !out.answers_count() > 0 {
2745 out.set_id(msg.id());
2746 send_dns_outgoing(&out, intf, &sock.pktinfo);
2747
2748 let if_name = intf.name.clone();
2749
2750 self.increase_counter(Counter::Respond, 1);
2751 self.notify_monitors(DaemonEvent::Respond(if_name));
2752 }
2753
2754 self.increase_counter(Counter::KnownAnswerSuppression, out.known_answer_count());
2755 }
2756
2757 fn increase_counter(&mut self, counter: Counter, count: i64) {
2759 let key = counter.to_string();
2760 match self.counters.get_mut(&key) {
2761 Some(v) => *v += count,
2762 None => {
2763 self.counters.insert(key, count);
2764 }
2765 }
2766 }
2767
2768 fn set_counter(&mut self, counter: Counter, count: i64) {
2770 let key = counter.to_string();
2771 self.counters.insert(key, count);
2772 }
2773
2774 fn signal_sock_drain(&self) {
2775 let mut signal_buf = [0; 1024];
2776
2777 while let Ok(sz) = self.signal_sock.recv(&mut signal_buf) {
2779 trace!(
2780 "signal socket recvd: {}",
2781 String::from_utf8_lossy(&signal_buf[0..sz])
2782 );
2783 }
2784 }
2785
2786 fn add_retransmission(&mut self, next_time: u64, command: Command) {
2787 self.retransmissions.push(ReRun { next_time, command });
2788 self.add_timer(next_time);
2789 }
2790
2791 fn notify_service_removal(&self, expired: HashMap<String, HashSet<String>>) {
2794 for (ty_domain, sender) in self.service_queriers.iter() {
2795 if let Some(instances) = expired.get(ty_domain) {
2796 for instance_name in instances {
2797 let event = ServiceEvent::ServiceRemoved(
2798 ty_domain.to_string(),
2799 instance_name.to_string(),
2800 );
2801 match sender.send(event) {
2802 Ok(()) => debug!("notify_service_removal: sent ServiceRemoved to listener of {ty_domain}: {instance_name}"),
2803 Err(e) => debug!("Failed to send event: {}", e),
2804 }
2805 }
2806 }
2807 }
2808 }
2809
2810 fn exec_command(&mut self, command: Command, repeating: bool) {
2814 trace!("exec_command: {:?} repeating: {}", &command, repeating);
2815 match command {
2816 Command::Browse(ty, next_delay, listener) => {
2817 self.exec_command_browse(repeating, ty, next_delay, listener);
2818 }
2819
2820 Command::ResolveHostname(hostname, next_delay, listener, timeout) => {
2821 self.exec_command_resolve_hostname(
2822 repeating, hostname, next_delay, listener, timeout,
2823 );
2824 }
2825
2826 Command::Register(service_info) => {
2827 self.register_service(service_info);
2828 self.increase_counter(Counter::Register, 1);
2829 }
2830
2831 Command::RegisterResend(fullname, intf) => {
2832 trace!("register-resend service: {fullname} on {}", &intf);
2833 self.exec_command_register_resend(fullname, intf);
2834 }
2835
2836 Command::Unregister(fullname, resp_s) => {
2837 trace!("unregister service {} repeat {}", &fullname, &repeating);
2838 self.exec_command_unregister(repeating, fullname, resp_s);
2839 }
2840
2841 Command::UnregisterResend(packet, if_index, is_ipv4) => {
2842 self.exec_command_unregister_resend(packet, if_index, is_ipv4);
2843 }
2844
2845 Command::StopBrowse(ty_domain) => self.exec_command_stop_browse(ty_domain),
2846
2847 Command::StopResolveHostname(hostname) => {
2848 self.exec_command_stop_resolve_hostname(hostname.to_lowercase())
2849 }
2850
2851 Command::Resolve(instance, try_count) => self.exec_command_resolve(instance, try_count),
2852
2853 Command::GetMetrics(resp_s) => self.exec_command_get_metrics(resp_s),
2854
2855 Command::GetStatus(resp_s) => match resp_s.send(self.status.clone()) {
2856 Ok(()) => trace!("Sent status to the client"),
2857 Err(e) => debug!("Failed to send status: {}", e),
2858 },
2859
2860 Command::Monitor(resp_s) => {
2861 self.monitors.push(resp_s);
2862 }
2863
2864 Command::SetOption(daemon_opt) => {
2865 self.process_set_option(daemon_opt);
2866 }
2867
2868 Command::GetOption(resp_s) => {
2869 let val = DaemonOptionVal {
2870 _service_name_len_max: self.service_name_len_max,
2871 ip_check_interval: self.ip_check_interval,
2872 };
2873 if let Err(e) = resp_s.send(val) {
2874 debug!("Failed to send options: {}", e);
2875 }
2876 }
2877
2878 Command::Verify(instance_fullname, timeout) => {
2879 self.exec_command_verify(instance_fullname, timeout, repeating);
2880 }
2881
2882 _ => {
2883 debug!("unexpected command: {:?}", &command);
2884 }
2885 }
2886 }
2887
2888 fn exec_command_get_metrics(&mut self, resp_s: Sender<HashMap<String, i64>>) {
2889 self.set_counter(Counter::CachedPTR, self.cache.ptr_count() as i64);
2890 self.set_counter(Counter::CachedSRV, self.cache.srv_count() as i64);
2891 self.set_counter(Counter::CachedAddr, self.cache.addr_count() as i64);
2892 self.set_counter(Counter::CachedTxt, self.cache.txt_count() as i64);
2893 self.set_counter(Counter::CachedNSec, self.cache.nsec_count() as i64);
2894 self.set_counter(Counter::CachedSubtype, self.cache.subtype_count() as i64);
2895 self.set_counter(Counter::Timer, self.timers.len() as i64);
2896
2897 let dns_registry_probe_count: usize = self
2898 .dns_registry_map
2899 .values()
2900 .map(|r| r.probing.len())
2901 .sum();
2902 self.set_counter(Counter::DnsRegistryProbe, dns_registry_probe_count as i64);
2903
2904 let dns_registry_active_count: usize = self
2905 .dns_registry_map
2906 .values()
2907 .map(|r| r.active.values().map(|a| a.len()).sum::<usize>())
2908 .sum();
2909 self.set_counter(Counter::DnsRegistryActive, dns_registry_active_count as i64);
2910
2911 let dns_registry_timer_count: usize = self
2912 .dns_registry_map
2913 .values()
2914 .map(|r| r.new_timers.len())
2915 .sum();
2916 self.set_counter(Counter::DnsRegistryTimer, dns_registry_timer_count as i64);
2917
2918 let dns_registry_name_change_count: usize = self
2919 .dns_registry_map
2920 .values()
2921 .map(|r| r.name_changes.len())
2922 .sum();
2923 self.set_counter(
2924 Counter::DnsRegistryNameChange,
2925 dns_registry_name_change_count as i64,
2926 );
2927
2928 if let Err(e) = resp_s.send(self.counters.clone()) {
2930 debug!("Failed to send metrics: {}", e);
2931 }
2932 }
2933
2934 fn exec_command_browse(
2935 &mut self,
2936 repeating: bool,
2937 ty: String,
2938 next_delay: u32,
2939 listener: Sender<ServiceEvent>,
2940 ) {
2941 let pretty_addrs: Vec<String> = self
2942 .my_intfs
2943 .iter()
2944 .map(|(if_index, itf)| format!("{} ({if_index})", itf.name))
2945 .collect();
2946
2947 if let Err(e) = listener.send(ServiceEvent::SearchStarted(format!(
2948 "{ty} on {} interfaces [{}]",
2949 pretty_addrs.len(),
2950 pretty_addrs.join(", ")
2951 ))) {
2952 debug!(
2953 "Failed to send SearchStarted({})(repeating:{}): {}",
2954 &ty, repeating, e
2955 );
2956 return;
2957 }
2958
2959 let now = current_time_millis();
2960 if !repeating {
2961 self.service_queriers.insert(ty.clone(), listener.clone());
2965
2966 self.query_cache_for_service(&ty, &listener, now);
2968 }
2969
2970 self.send_query(&ty, RRType::PTR);
2971 self.increase_counter(Counter::Browse, 1);
2972
2973 let next_time = now + (next_delay * 1000) as u64;
2974 let max_delay = 60 * 60;
2975 let delay = cmp::min(next_delay * 2, max_delay);
2976 self.add_retransmission(next_time, Command::Browse(ty, delay, listener));
2977 }
2978
2979 fn exec_command_resolve_hostname(
2980 &mut self,
2981 repeating: bool,
2982 hostname: String,
2983 next_delay: u32,
2984 listener: Sender<HostnameResolutionEvent>,
2985 timeout: Option<u64>,
2986 ) {
2987 let addr_list: Vec<_> = self.my_intfs.iter().collect();
2988 if let Err(e) = listener.send(HostnameResolutionEvent::SearchStarted(format!(
2989 "{} on addrs {:?}",
2990 &hostname, &addr_list
2991 ))) {
2992 debug!(
2993 "Failed to send ResolveStarted({})(repeating:{}): {}",
2994 &hostname, repeating, e
2995 );
2996 return;
2997 }
2998 if !repeating {
2999 self.add_hostname_resolver(hostname.to_owned(), listener.clone(), timeout);
3000 self.query_cache_for_hostname(&hostname, listener.clone());
3002 }
3003
3004 self.send_query_vec(&[(&hostname, RRType::A), (&hostname, RRType::AAAA)]);
3005 self.increase_counter(Counter::ResolveHostname, 1);
3006
3007 let now = current_time_millis();
3008 let next_time = now + u64::from(next_delay) * 1000;
3009 let max_delay = 60 * 60;
3010 let delay = cmp::min(next_delay * 2, max_delay);
3011
3012 if self
3014 .hostname_resolvers
3015 .get(&hostname)
3016 .and_then(|(_sender, timeout)| *timeout)
3017 .map(|timeout| next_time < timeout)
3018 .unwrap_or(true)
3019 {
3020 self.add_retransmission(
3021 next_time,
3022 Command::ResolveHostname(hostname, delay, listener, None),
3023 );
3024 }
3025 }
3026
3027 fn exec_command_resolve(&mut self, instance: String, try_count: u16) {
3028 let pending_query = self.query_unresolved(&instance);
3029 let max_try = 3;
3030 if pending_query && try_count < max_try {
3031 let next_time = current_time_millis() + RESOLVE_WAIT_IN_MILLIS;
3034 self.add_retransmission(next_time, Command::Resolve(instance, try_count + 1));
3035 }
3036 }
3037
3038 fn exec_command_unregister(
3039 &mut self,
3040 repeating: bool,
3041 fullname: String,
3042 resp_s: Sender<UnregisterStatus>,
3043 ) {
3044 let response = match self.my_services.remove_entry(&fullname) {
3045 None => {
3046 debug!("unregister: cannot find such service {}", &fullname);
3047 UnregisterStatus::NotFound
3048 }
3049 Some((_k, info)) => {
3050 let mut timers = Vec::new();
3051
3052 for (if_index, intf) in self.my_intfs.iter() {
3053 let packet = self.unregister_service(&info, intf, &self.ipv4_sock.pktinfo);
3054 if !repeating && !packet.is_empty() {
3056 let next_time = current_time_millis() + 120;
3057 self.retransmissions.push(ReRun {
3058 next_time,
3059 command: Command::UnregisterResend(packet, *if_index, true),
3060 });
3061 timers.push(next_time);
3062 }
3063
3064 let packet = self.unregister_service(&info, intf, &self.ipv6_sock.pktinfo);
3067 if !repeating && !packet.is_empty() {
3068 let next_time = current_time_millis() + 120;
3069 self.retransmissions.push(ReRun {
3070 next_time,
3071 command: Command::UnregisterResend(packet, *if_index, false),
3072 });
3073 timers.push(next_time);
3074 }
3075 }
3076
3077 for t in timers {
3078 self.add_timer(t);
3079 }
3080
3081 self.increase_counter(Counter::Unregister, 1);
3082 UnregisterStatus::OK
3083 }
3084 };
3085 if let Err(e) = resp_s.send(response) {
3086 debug!("unregister: failed to send response: {}", e);
3087 }
3088 }
3089
3090 fn exec_command_unregister_resend(&mut self, packet: Vec<u8>, if_index: u32, is_ipv4: bool) {
3091 let Some(intf) = self.my_intfs.get(&if_index) else {
3092 return;
3093 };
3094 let sock = if is_ipv4 {
3095 &self.ipv4_sock.pktinfo
3096 } else {
3097 &self.ipv6_sock.pktinfo
3098 };
3099
3100 let if_addr = if is_ipv4 {
3101 match intf.next_ifaddr_v4() {
3102 Some(addr) => addr,
3103 None => return,
3104 }
3105 } else {
3106 match intf.next_ifaddr_v6() {
3107 Some(addr) => addr,
3108 None => return,
3109 }
3110 };
3111
3112 debug!("UnregisterResend from {:?}", if_addr);
3113 multicast_on_intf(&packet[..], &intf.name, intf.index, if_addr, sock);
3114
3115 self.increase_counter(Counter::UnregisterResend, 1);
3116 }
3117
3118 fn exec_command_stop_browse(&mut self, ty_domain: String) {
3119 match self.service_queriers.remove_entry(&ty_domain) {
3120 None => debug!("StopBrowse: cannot find querier for {}", &ty_domain),
3121 Some((ty, sender)) => {
3122 trace!("StopBrowse: removed queryer for {}", &ty);
3124 let mut i = 0;
3125 while i < self.retransmissions.len() {
3126 if let Command::Browse(t, _, _) = &self.retransmissions[i].command {
3127 if t == &ty {
3128 self.retransmissions.remove(i);
3129 trace!("StopBrowse: removed retransmission for {}", &ty);
3130 continue;
3131 }
3132 }
3133 i += 1;
3134 }
3135
3136 self.cache.remove_service_type(&ty_domain);
3138
3139 match sender.send(ServiceEvent::SearchStopped(ty_domain)) {
3141 Ok(()) => trace!("Sent SearchStopped to the listener"),
3142 Err(e) => debug!("Failed to send SearchStopped: {}", e),
3143 }
3144 }
3145 }
3146 }
3147
3148 fn exec_command_stop_resolve_hostname(&mut self, hostname: String) {
3149 if let Some((host, (sender, _timeout))) = self.hostname_resolvers.remove_entry(&hostname) {
3150 trace!("StopResolve: removed queryer for {}", &host);
3152 let mut i = 0;
3153 while i < self.retransmissions.len() {
3154 if let Command::Resolve(t, _) = &self.retransmissions[i].command {
3155 if t == &host {
3156 self.retransmissions.remove(i);
3157 trace!("StopResolve: removed retransmission for {}", &host);
3158 continue;
3159 }
3160 }
3161 i += 1;
3162 }
3163
3164 match sender.send(HostnameResolutionEvent::SearchStopped(hostname)) {
3166 Ok(()) => trace!("Sent SearchStopped to the listener"),
3167 Err(e) => debug!("Failed to send SearchStopped: {}", e),
3168 }
3169 }
3170 }
3171
3172 fn exec_command_register_resend(&mut self, fullname: String, if_index: u32) {
3173 let Some(info) = self.my_services.get_mut(&fullname) else {
3174 trace!("announce: cannot find such service {}", &fullname);
3175 return;
3176 };
3177
3178 let Some(dns_registry) = self.dns_registry_map.get_mut(&if_index) else {
3179 return;
3180 };
3181
3182 let Some(intf) = self.my_intfs.get(&if_index) else {
3183 return;
3184 };
3185
3186 let announced_v4 =
3187 announce_service_on_intf(dns_registry, info, intf, &self.ipv4_sock.pktinfo);
3188 let announced_v6 =
3189 announce_service_on_intf(dns_registry, info, intf, &self.ipv6_sock.pktinfo);
3190
3191 if announced_v4 || announced_v6 {
3192 let mut hostname = info.get_hostname();
3193 if let Some(new_name) = dns_registry.name_changes.get(hostname) {
3194 hostname = new_name;
3195 }
3196 let service_name = match dns_registry.name_changes.get(&fullname) {
3197 Some(new_name) => new_name.to_string(),
3198 None => fullname,
3199 };
3200
3201 debug!("resend: announce service {service_name} on {}", intf.name);
3202
3203 notify_monitors(
3204 &mut self.monitors,
3205 DaemonEvent::Announce(service_name, format!("{}:{}", hostname, &intf.name)),
3206 );
3207 info.set_status(if_index, ServiceStatus::Announced);
3208 } else {
3209 debug!("register-resend should not fail");
3210 }
3211
3212 self.increase_counter(Counter::RegisterResend, 1);
3213 }
3214
3215 fn exec_command_verify(&mut self, instance: String, timeout: Duration, repeating: bool) {
3216 let now = current_time_millis();
3226 let expire_at = if repeating {
3227 None
3228 } else {
3229 Some(now + timeout.as_millis() as u64)
3230 };
3231
3232 let record_vec = self.cache.service_verify_queries(&instance, expire_at);
3234
3235 if !record_vec.is_empty() {
3236 let query_vec: Vec<(&str, RRType)> = record_vec
3237 .iter()
3238 .map(|(record, rr_type)| (record.as_str(), *rr_type))
3239 .collect();
3240 self.send_query_vec(&query_vec);
3241
3242 if let Some(new_expire) = expire_at {
3243 self.add_timer(new_expire); self.add_retransmission(now + 1000, Command::Verify(instance, timeout));
3247 }
3248 }
3249 }
3250
3251 fn refresh_active_services(&mut self) {
3253 let mut query_ptr_count = 0;
3254 let mut query_srv_count = 0;
3255 let mut new_timers = HashSet::new();
3256 let mut query_addr_count = 0;
3257
3258 for (ty_domain, _sender) in self.service_queriers.iter() {
3259 let refreshed_timers = self.cache.refresh_due_ptr(ty_domain);
3260 if !refreshed_timers.is_empty() {
3261 trace!("sending refresh query for PTR: {}", ty_domain);
3262 self.send_query(ty_domain, RRType::PTR);
3263 query_ptr_count += 1;
3264 new_timers.extend(refreshed_timers);
3265 }
3266
3267 let (instances, timers) = self.cache.refresh_due_srv_txt(ty_domain);
3268 for (instance, types) in instances {
3269 trace!("sending refresh query for: {}", &instance);
3270 let query_vec = types
3271 .into_iter()
3272 .map(|ty| (instance.as_str(), ty))
3273 .collect::<Vec<_>>();
3274 self.send_query_vec(&query_vec);
3275 query_srv_count += 1;
3276 }
3277 new_timers.extend(timers);
3278 let (hostnames, timers) = self.cache.refresh_due_hosts(ty_domain);
3279 for hostname in hostnames.iter() {
3280 trace!("sending refresh queries for A and AAAA: {}", hostname);
3281 self.send_query_vec(&[(hostname, RRType::A), (hostname, RRType::AAAA)]);
3282 query_addr_count += 2;
3283 }
3284 new_timers.extend(timers);
3285 }
3286
3287 for timer in new_timers {
3288 self.add_timer(timer);
3289 }
3290
3291 self.increase_counter(Counter::CacheRefreshPTR, query_ptr_count);
3292 self.increase_counter(Counter::CacheRefreshSrvTxt, query_srv_count);
3293 self.increase_counter(Counter::CacheRefreshAddr, query_addr_count);
3294 }
3295}
3296
3297fn add_answer_of_service(
3299 out: &mut DnsOutgoing,
3300 msg: &DnsIncoming,
3301 entry_name: &str,
3302 service: &ServiceInfo,
3303 qtype: RRType,
3304 intf_addrs: Vec<IpAddr>,
3305) {
3306 if qtype == RRType::SRV || qtype == RRType::ANY {
3307 out.add_answer(
3308 msg,
3309 DnsSrv::new(
3310 entry_name,
3311 CLASS_IN | CLASS_CACHE_FLUSH,
3312 service.get_host_ttl(),
3313 service.get_priority(),
3314 service.get_weight(),
3315 service.get_port(),
3316 service.get_hostname().to_string(),
3317 ),
3318 );
3319 }
3320
3321 if qtype == RRType::TXT || qtype == RRType::ANY {
3322 out.add_answer(
3323 msg,
3324 DnsTxt::new(
3325 entry_name,
3326 CLASS_IN | CLASS_CACHE_FLUSH,
3327 service.get_other_ttl(),
3328 service.generate_txt(),
3329 ),
3330 );
3331 }
3332
3333 if qtype == RRType::SRV {
3334 for address in intf_addrs {
3335 out.add_additional_answer(DnsAddress::new(
3336 service.get_hostname(),
3337 ip_address_rr_type(&address),
3338 CLASS_IN | CLASS_CACHE_FLUSH,
3339 service.get_host_ttl(),
3340 address,
3341 InterfaceId::default(),
3342 ));
3343 }
3344 }
3345}
3346
3347#[derive(Clone, Debug)]
3350#[non_exhaustive]
3351pub enum ServiceEvent {
3352 SearchStarted(String),
3354
3355 ServiceFound(String, String),
3357
3358 ServiceResolved(Box<ResolvedService>),
3360
3361 ServiceRemoved(String, String),
3363
3364 SearchStopped(String),
3366}
3367
3368#[derive(Clone, Debug)]
3371#[non_exhaustive]
3372pub enum HostnameResolutionEvent {
3373 SearchStarted(String),
3375 AddressesFound(String, HashSet<ScopedIp>),
3377 AddressesRemoved(String, HashSet<ScopedIp>),
3379 SearchTimeout(String),
3381 SearchStopped(String),
3383}
3384
3385#[derive(Clone, Debug)]
3388#[non_exhaustive]
3389pub enum DaemonEvent {
3390 Announce(String, String),
3392
3393 Error(Error),
3395
3396 IpAdd(IpAddr),
3398
3399 IpDel(IpAddr),
3401
3402 NameChange(DnsNameChange),
3405
3406 Respond(String),
3408}
3409
3410#[derive(Clone, Debug)]
3413pub struct DnsNameChange {
3414 pub original: String,
3416
3417 pub new_name: String,
3427
3428 pub rr_type: RRType,
3430
3431 pub intf_name: String,
3433}
3434
3435#[derive(Debug)]
3437enum Command {
3438 Browse(String, u32, Sender<ServiceEvent>),
3440
3441 ResolveHostname(String, u32, Sender<HostnameResolutionEvent>, Option<u64>), Register(ServiceInfo),
3446
3447 Unregister(String, Sender<UnregisterStatus>), RegisterResend(String, u32), UnregisterResend(Vec<u8>, u32, bool), StopBrowse(String), StopResolveHostname(String), Resolve(String, u16), GetMetrics(Sender<Metrics>),
3468
3469 GetStatus(Sender<DaemonStatus>),
3471
3472 Monitor(Sender<DaemonEvent>),
3474
3475 SetOption(DaemonOption),
3476
3477 GetOption(Sender<DaemonOptionVal>),
3478
3479 Verify(String, Duration),
3484
3485 Exit(Sender<DaemonStatus>),
3486}
3487
3488impl fmt::Display for Command {
3489 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
3490 match self {
3491 Self::Browse(_, _, _) => write!(f, "Command Browse"),
3492 Self::ResolveHostname(_, _, _, _) => write!(f, "Command ResolveHostname"),
3493 Self::Exit(_) => write!(f, "Command Exit"),
3494 Self::GetStatus(_) => write!(f, "Command GetStatus"),
3495 Self::GetMetrics(_) => write!(f, "Command GetMetrics"),
3496 Self::Monitor(_) => write!(f, "Command Monitor"),
3497 Self::Register(_) => write!(f, "Command Register"),
3498 Self::RegisterResend(_, _) => write!(f, "Command RegisterResend"),
3499 Self::SetOption(_) => write!(f, "Command SetOption"),
3500 Self::GetOption(_) => write!(f, "Command GetOption"),
3501 Self::StopBrowse(_) => write!(f, "Command StopBrowse"),
3502 Self::StopResolveHostname(_) => write!(f, "Command StopResolveHostname"),
3503 Self::Unregister(_, _) => write!(f, "Command Unregister"),
3504 Self::UnregisterResend(_, _, _) => write!(f, "Command UnregisterResend"),
3505 Self::Resolve(_, _) => write!(f, "Command Resolve"),
3506 Self::Verify(_, _) => write!(f, "Command VerifyResource"),
3507 }
3508 }
3509}
3510
3511struct DaemonOptionVal {
3512 _service_name_len_max: u8,
3513 ip_check_interval: u64,
3514}
3515
3516#[derive(Debug)]
3517enum DaemonOption {
3518 ServiceNameLenMax(u8),
3519 IpCheckInterval(u64),
3520 EnableInterface(Vec<IfKind>),
3521 DisableInterface(Vec<IfKind>),
3522 MulticastLoopV4(bool),
3523 MulticastLoopV6(bool),
3524 #[cfg(test)]
3525 TestDownInterface(String),
3526 #[cfg(test)]
3527 TestUpInterface(String),
3528}
3529
3530const DOMAIN_LEN: usize = "._tcp.local.".len();
3532
3533fn check_service_name_length(ty_domain: &str, limit: u8) -> Result<()> {
3535 if ty_domain.len() <= DOMAIN_LEN + 1 {
3536 return Err(e_fmt!("Service type name cannot be empty: {}", ty_domain));
3538 }
3539
3540 let service_name_len = ty_domain.len() - DOMAIN_LEN - 1; if service_name_len > limit as usize {
3542 return Err(e_fmt!("Service name length must be <= {} bytes", limit));
3543 }
3544 Ok(())
3545}
3546
3547fn check_domain_suffix(name: &str) -> Result<()> {
3549 if !(name.ends_with("._tcp.local.") || name.ends_with("._udp.local.")) {
3550 return Err(e_fmt!(
3551 "mDNS service {} must end with '._tcp.local.' or '._udp.local.'",
3552 name
3553 ));
3554 }
3555
3556 Ok(())
3557}
3558
3559fn check_service_name(fullname: &str) -> Result<()> {
3567 check_domain_suffix(fullname)?;
3568
3569 let remaining: Vec<&str> = fullname[..fullname.len() - DOMAIN_LEN].split('.').collect();
3570 let name = remaining.last().ok_or_else(|| e_fmt!("No service name"))?;
3571
3572 if &name[0..1] != "_" {
3573 return Err(e_fmt!("Service name must start with '_'"));
3574 }
3575
3576 let name = &name[1..];
3577
3578 if name.contains("--") {
3579 return Err(e_fmt!("Service name must not contain '--'"));
3580 }
3581
3582 if name.starts_with('-') || name.ends_with('-') {
3583 return Err(e_fmt!("Service name (%s) may not start or end with '-'"));
3584 }
3585
3586 let ascii_count = name.chars().filter(|c| c.is_ascii_alphabetic()).count();
3587 if ascii_count < 1 {
3588 return Err(e_fmt!(
3589 "Service name must contain at least one letter (eg: 'A-Za-z')"
3590 ));
3591 }
3592
3593 Ok(())
3594}
3595
3596fn check_hostname(hostname: &str) -> Result<()> {
3598 if !hostname.ends_with(".local.") {
3599 return Err(e_fmt!("Hostname must end with '.local.': {hostname}"));
3600 }
3601
3602 if hostname == ".local." {
3603 return Err(e_fmt!(
3604 "The part of the hostname before '.local.' cannot be empty"
3605 ));
3606 }
3607
3608 if hostname.len() > 255 {
3609 return Err(e_fmt!("Hostname length must be <= 255 bytes"));
3610 }
3611
3612 Ok(())
3613}
3614
3615fn call_service_listener(
3616 listeners_map: &HashMap<String, Sender<ServiceEvent>>,
3617 ty_domain: &str,
3618 event: ServiceEvent,
3619) {
3620 if let Some(listener) = listeners_map.get(ty_domain) {
3621 match listener.send(event) {
3622 Ok(()) => trace!("Sent event to listener successfully"),
3623 Err(e) => debug!("Failed to send event: {}", e),
3624 }
3625 }
3626}
3627
3628fn call_hostname_resolution_listener(
3629 listeners_map: &HashMap<String, (Sender<HostnameResolutionEvent>, Option<u64>)>,
3630 hostname: &str,
3631 event: HostnameResolutionEvent,
3632) {
3633 let hostname_lower = hostname.to_lowercase();
3634 if let Some(listener) = listeners_map.get(&hostname_lower).map(|(l, _)| l) {
3635 match listener.send(event) {
3636 Ok(()) => trace!("Sent event to listener successfully"),
3637 Err(e) => debug!("Failed to send event: {}", e),
3638 }
3639 }
3640}
3641
3642fn my_ip_interfaces(with_loopback: bool) -> Vec<Interface> {
3646 if_addrs::get_if_addrs()
3647 .unwrap_or_default()
3648 .into_iter()
3649 .filter(|i| i.is_oper_up() && (!i.is_loopback() || with_loopback))
3650 .collect()
3651}
3652
3653fn send_dns_outgoing(out: &DnsOutgoing, my_intf: &MyIntf, sock: &PktInfoUdpSocket) -> Vec<Vec<u8>> {
3654 let if_name = &my_intf.name;
3655
3656 let if_addr = if sock.domain() == Domain::IPV4 {
3657 match my_intf.next_ifaddr_v4() {
3658 Some(addr) => addr,
3659 None => return vec![],
3660 }
3661 } else {
3662 match my_intf.next_ifaddr_v6() {
3663 Some(addr) => addr,
3664 None => return vec![],
3665 }
3666 };
3667
3668 send_dns_outgoing_impl(out, if_name, my_intf.index, if_addr, sock)
3669}
3670
3671fn send_dns_outgoing_impl(
3673 out: &DnsOutgoing,
3674 if_name: &str,
3675 if_index: u32,
3676 if_addr: &IfAddr,
3677 sock: &PktInfoUdpSocket,
3678) -> Vec<Vec<u8>> {
3679 let qtype = if out.is_query() {
3680 "query"
3681 } else {
3682 if out.answers_count() == 0 && out.additionals().is_empty() {
3683 return vec![]; }
3685 "response"
3686 };
3687 trace!(
3688 "send {}: {} questions {} answers {} authorities {} additional",
3689 qtype,
3690 out.questions().len(),
3691 out.answers_count(),
3692 out.authorities().len(),
3693 out.additionals().len()
3694 );
3695
3696 match if_addr.ip() {
3697 IpAddr::V4(ipv4) => {
3698 if let Err(e) = sock.set_multicast_if_v4(&ipv4) {
3699 debug!(
3700 "send_dns_outgoing: failed to set multicast interface for IPv4 {}: {}",
3701 ipv4, e
3702 );
3703 return vec![]; }
3705 }
3706 IpAddr::V6(ipv6) => {
3707 if let Err(e) = sock.set_multicast_if_v6(if_index) {
3708 debug!(
3709 "send_dns_outgoing: failed to set multicast interface for IPv6 {}: {}",
3710 ipv6, e
3711 );
3712 return vec![]; }
3714 }
3715 }
3716
3717 let packet_list = out.to_data_on_wire();
3718 for packet in packet_list.iter() {
3719 multicast_on_intf(packet, if_name, if_index, if_addr, sock);
3720 }
3721 packet_list
3722}
3723
3724fn multicast_on_intf(
3726 packet: &[u8],
3727 if_name: &str,
3728 if_index: u32,
3729 if_addr: &IfAddr,
3730 socket: &PktInfoUdpSocket,
3731) {
3732 if packet.len() > MAX_MSG_ABSOLUTE {
3733 debug!("Drop over-sized packet ({})", packet.len());
3734 return;
3735 }
3736
3737 let addr: SocketAddr = match if_addr {
3738 if_addrs::IfAddr::V4(_) => SocketAddrV4::new(GROUP_ADDR_V4, MDNS_PORT).into(),
3739 if_addrs::IfAddr::V6(_) => {
3740 let mut sock = SocketAddrV6::new(GROUP_ADDR_V6, MDNS_PORT, 0, 0);
3741 sock.set_scope_id(if_index); sock.into()
3743 }
3744 };
3745
3746 let sock_addr = addr.into();
3748 match socket.send_to(packet, &sock_addr) {
3749 Ok(sz) => trace!(
3750 "sent out {} bytes on interface {} (idx {}) addr {}",
3751 sz,
3752 if_name,
3753 if_index,
3754 if_addr.ip()
3755 ),
3756 Err(e) => trace!("Failed to send to {} via {:?}: {}", addr, &if_name, e),
3757 }
3758}
3759
3760fn valid_instance_name(name: &str) -> bool {
3764 name.split('.').count() >= 5
3765}
3766
3767fn notify_monitors(monitors: &mut Vec<Sender<DaemonEvent>>, event: DaemonEvent) {
3768 monitors.retain(|sender| {
3769 if let Err(e) = sender.try_send(event.clone()) {
3770 debug!("notify_monitors: try_send: {}", &e);
3771 if matches!(e, TrySendError::Disconnected(_)) {
3772 return false; }
3774 }
3775 true
3776 });
3777}
3778
3779fn prepare_announce(
3782 info: &ServiceInfo,
3783 intf: &MyIntf,
3784 dns_registry: &mut DnsRegistry,
3785 is_ipv4: bool,
3786) -> Option<DnsOutgoing> {
3787 let intf_addrs = if is_ipv4 {
3788 info.get_addrs_on_my_intf_v4(intf)
3789 } else {
3790 info.get_addrs_on_my_intf_v6(intf)
3791 };
3792
3793 if intf_addrs.is_empty() {
3794 debug!(
3795 "prepare_announce (ipv4: {is_ipv4}): no valid addrs on interface {}",
3796 &intf.name
3797 );
3798 return None;
3799 }
3800
3801 let service_fullname = match dns_registry.name_changes.get(info.get_fullname()) {
3803 Some(new_name) => new_name,
3804 None => info.get_fullname(),
3805 };
3806
3807 debug!(
3808 "prepare to announce service {service_fullname} on {:?}",
3809 &intf_addrs
3810 );
3811
3812 let mut probing_count = 0;
3813 let mut out = DnsOutgoing::new(FLAGS_QR_RESPONSE | FLAGS_AA);
3814 let create_time = current_time_millis() + fastrand::u64(0..250);
3815
3816 out.add_answer_at_time(
3817 DnsPointer::new(
3818 info.get_type(),
3819 RRType::PTR,
3820 CLASS_IN,
3821 info.get_other_ttl(),
3822 service_fullname.to_string(),
3823 ),
3824 0,
3825 );
3826
3827 if let Some(sub) = info.get_subtype() {
3828 trace!("Adding subdomain {}", sub);
3829 out.add_answer_at_time(
3830 DnsPointer::new(
3831 sub,
3832 RRType::PTR,
3833 CLASS_IN,
3834 info.get_other_ttl(),
3835 service_fullname.to_string(),
3836 ),
3837 0,
3838 );
3839 }
3840
3841 let hostname = match dns_registry.name_changes.get(info.get_hostname()) {
3843 Some(new_name) => new_name.to_string(),
3844 None => info.get_hostname().to_string(),
3845 };
3846
3847 let mut srv = DnsSrv::new(
3848 info.get_fullname(),
3849 CLASS_IN | CLASS_CACHE_FLUSH,
3850 info.get_host_ttl(),
3851 info.get_priority(),
3852 info.get_weight(),
3853 info.get_port(),
3854 hostname,
3855 );
3856
3857 if let Some(new_name) = dns_registry.name_changes.get(info.get_fullname()) {
3858 srv.get_record_mut().set_new_name(new_name.to_string());
3859 }
3860
3861 if !info.requires_probe()
3862 || dns_registry.is_probing_done(&srv, info.get_fullname(), create_time)
3863 {
3864 out.add_answer_at_time(srv, 0);
3865 } else {
3866 probing_count += 1;
3867 }
3868
3869 let mut txt = DnsTxt::new(
3872 info.get_fullname(),
3873 CLASS_IN | CLASS_CACHE_FLUSH,
3874 info.get_other_ttl(),
3875 info.generate_txt(),
3876 );
3877
3878 if let Some(new_name) = dns_registry.name_changes.get(info.get_fullname()) {
3879 txt.get_record_mut().set_new_name(new_name.to_string());
3880 }
3881
3882 if !info.requires_probe()
3883 || dns_registry.is_probing_done(&txt, info.get_fullname(), create_time)
3884 {
3885 out.add_answer_at_time(txt, 0);
3886 } else {
3887 probing_count += 1;
3888 }
3889
3890 let hostname = info.get_hostname();
3893 for address in intf_addrs {
3894 let mut dns_addr = DnsAddress::new(
3895 hostname,
3896 ip_address_rr_type(&address),
3897 CLASS_IN | CLASS_CACHE_FLUSH,
3898 info.get_host_ttl(),
3899 address,
3900 intf.into(),
3901 );
3902
3903 if let Some(new_name) = dns_registry.name_changes.get(hostname) {
3904 dns_addr.get_record_mut().set_new_name(new_name.to_string());
3905 }
3906
3907 if !info.requires_probe()
3908 || dns_registry.is_probing_done(&dns_addr, info.get_fullname(), create_time)
3909 {
3910 out.add_answer_at_time(dns_addr, 0);
3911 } else {
3912 probing_count += 1;
3913 }
3914 }
3915
3916 if probing_count > 0 {
3917 return None;
3918 }
3919
3920 Some(out)
3921}
3922
3923fn announce_service_on_intf(
3926 dns_registry: &mut DnsRegistry,
3927 info: &ServiceInfo,
3928 intf: &MyIntf,
3929 sock: &PktInfoUdpSocket,
3930) -> bool {
3931 let is_ipv4 = sock.domain() == Domain::IPV4;
3932 if let Some(out) = prepare_announce(info, intf, dns_registry, is_ipv4) {
3933 send_dns_outgoing(&out, intf, sock);
3934 return true;
3935 }
3936
3937 false
3938}
3939
3940fn name_change(original: &str) -> String {
3948 let mut parts: Vec<_> = original.split('.').collect();
3949 let Some(first_part) = parts.get_mut(0) else {
3950 return format!("{original} (2)");
3951 };
3952
3953 let mut new_name = format!("{first_part} (2)");
3954
3955 if let Some(paren_pos) = first_part.rfind(" (") {
3957 if let Some(end_paren) = first_part[paren_pos..].find(')') {
3959 let absolute_end_pos = paren_pos + end_paren;
3960 if absolute_end_pos == first_part.len() - 1 {
3962 let num_start = paren_pos + 2; if let Ok(number) = first_part[num_start..absolute_end_pos].parse::<u32>() {
3965 let base_name = &first_part[..paren_pos];
3966 new_name = format!("{} ({})", base_name, number + 1)
3967 }
3968 }
3969 }
3970 }
3971
3972 *first_part = &new_name;
3973 parts.join(".")
3974}
3975
3976fn hostname_change(original: &str) -> String {
3984 let mut parts: Vec<_> = original.split('.').collect();
3985 let Some(first_part) = parts.get_mut(0) else {
3986 return format!("{original}-2");
3987 };
3988
3989 let mut new_name = format!("{first_part}-2");
3990
3991 if let Some(hyphen_pos) = first_part.rfind('-') {
3993 if let Ok(number) = first_part[hyphen_pos + 1..].parse::<u32>() {
3995 let base_name = &first_part[..hyphen_pos];
3996 new_name = format!("{}-{}", base_name, number + 1);
3997 }
3998 }
3999
4000 *first_part = &new_name;
4001 parts.join(".")
4002}
4003
4004fn add_answer_with_additionals(
4005 out: &mut DnsOutgoing,
4006 msg: &DnsIncoming,
4007 service: &ServiceInfo,
4008 intf: &MyIntf,
4009 dns_registry: &DnsRegistry,
4010 is_ipv4: bool,
4011) {
4012 let intf_addrs = if is_ipv4 {
4013 service.get_addrs_on_my_intf_v4(intf)
4014 } else {
4015 service.get_addrs_on_my_intf_v6(intf)
4016 };
4017 if intf_addrs.is_empty() {
4018 trace!("No addrs on LAN of intf {:?}", intf);
4019 return;
4020 }
4021
4022 let service_fullname = match dns_registry.name_changes.get(service.get_fullname()) {
4024 Some(new_name) => new_name,
4025 None => service.get_fullname(),
4026 };
4027
4028 let hostname = match dns_registry.name_changes.get(service.get_hostname()) {
4029 Some(new_name) => new_name,
4030 None => service.get_hostname(),
4031 };
4032
4033 let ptr_added = out.add_answer(
4034 msg,
4035 DnsPointer::new(
4036 service.get_type(),
4037 RRType::PTR,
4038 CLASS_IN,
4039 service.get_other_ttl(),
4040 service_fullname.to_string(),
4041 ),
4042 );
4043
4044 if !ptr_added {
4045 trace!("answer was not added for msg {:?}", msg);
4046 return;
4047 }
4048
4049 if let Some(sub) = service.get_subtype() {
4050 trace!("Adding subdomain {}", sub);
4051 out.add_additional_answer(DnsPointer::new(
4052 sub,
4053 RRType::PTR,
4054 CLASS_IN,
4055 service.get_other_ttl(),
4056 service_fullname.to_string(),
4057 ));
4058 }
4059
4060 out.add_additional_answer(DnsSrv::new(
4063 service_fullname,
4064 CLASS_IN | CLASS_CACHE_FLUSH,
4065 service.get_host_ttl(),
4066 service.get_priority(),
4067 service.get_weight(),
4068 service.get_port(),
4069 hostname.to_string(),
4070 ));
4071
4072 out.add_additional_answer(DnsTxt::new(
4073 service_fullname,
4074 CLASS_IN | CLASS_CACHE_FLUSH,
4075 service.get_other_ttl(),
4076 service.generate_txt(),
4077 ));
4078
4079 for address in intf_addrs {
4080 out.add_additional_answer(DnsAddress::new(
4081 hostname,
4082 ip_address_rr_type(&address),
4083 CLASS_IN | CLASS_CACHE_FLUSH,
4084 service.get_host_ttl(),
4085 address,
4086 intf.into(),
4087 ));
4088 }
4089}
4090
4091fn check_probing(
4094 dns_registry: &mut DnsRegistry,
4095 timers: &mut BinaryHeap<Reverse<u64>>,
4096 now: u64,
4097) -> (DnsOutgoing, Vec<String>) {
4098 let mut expired_probes = Vec::new();
4099 let mut out = DnsOutgoing::new(FLAGS_QR_QUERY);
4100
4101 for (name, probe) in dns_registry.probing.iter_mut() {
4102 if now >= probe.next_send {
4103 if probe.expired(now) {
4104 expired_probes.push(name.clone());
4106 } else {
4107 out.add_question(name, RRType::ANY);
4108
4109 for record in probe.records.iter() {
4117 out.add_authority(record.clone());
4118 }
4119
4120 probe.update_next_send(now);
4121
4122 timers.push(Reverse(probe.next_send));
4124 }
4125 }
4126 }
4127
4128 (out, expired_probes)
4129}
4130
4131fn handle_expired_probes(
4136 expired_probes: Vec<String>,
4137 intf_name: &str,
4138 dns_registry: &mut DnsRegistry,
4139 monitors: &mut Vec<Sender<DaemonEvent>>,
4140) -> HashSet<String> {
4141 let mut waiting_services = HashSet::new();
4142
4143 for name in expired_probes {
4144 let Some(probe) = dns_registry.probing.remove(&name) else {
4145 continue;
4146 };
4147
4148 for record in probe.records.iter() {
4150 if let Some(new_name) = record.get_record().get_new_name() {
4151 dns_registry
4152 .name_changes
4153 .insert(name.clone(), new_name.to_string());
4154
4155 let event = DnsNameChange {
4156 original: record.get_record().get_original_name().to_string(),
4157 new_name: new_name.to_string(),
4158 rr_type: record.get_type(),
4159 intf_name: intf_name.to_string(),
4160 };
4161 debug!("Name change event: {:?}", &event);
4162 notify_monitors(monitors, DaemonEvent::NameChange(event));
4163 }
4164 }
4165
4166 debug!(
4168 "probe of '{name}' finished: move {} records to active. ({} waiting services)",
4169 probe.records.len(),
4170 probe.waiting_services.len(),
4171 );
4172
4173 if !probe.records.is_empty() {
4175 match dns_registry.active.get_mut(&name) {
4176 Some(records) => {
4177 records.extend(probe.records);
4178 }
4179 None => {
4180 dns_registry.active.insert(name, probe.records);
4181 }
4182 }
4183
4184 waiting_services.extend(probe.waiting_services);
4185 }
4186 }
4187
4188 waiting_services
4189}
4190
4191#[cfg(test)]
4192mod tests {
4193 use super::{
4194 _new_socket_bind, check_domain_suffix, check_service_name_length, hostname_change,
4195 my_ip_interfaces, name_change, send_dns_outgoing_impl, valid_instance_name,
4196 HostnameResolutionEvent, ServiceDaemon, ServiceEvent, ServiceInfo, GROUP_ADDR_V4,
4197 MDNS_PORT,
4198 };
4199 use crate::{
4200 dns_parser::{
4201 DnsIncoming, DnsOutgoing, DnsPointer, InterfaceId, RRType, ScopedIp, CLASS_IN,
4202 FLAGS_AA, FLAGS_QR_RESPONSE,
4203 },
4204 service_daemon::{add_answer_of_service, check_hostname},
4205 };
4206 use std::{
4207 net::{SocketAddr, SocketAddrV4},
4208 time::{Duration, SystemTime},
4209 };
4210 use test_log::test;
4211
4212 #[test]
4213 fn test_socketaddr_print() {
4214 let addr: SocketAddr = SocketAddrV4::new(GROUP_ADDR_V4, MDNS_PORT).into();
4215 let print = format!("{}", addr);
4216 assert_eq!(print, "224.0.0.251:5353");
4217 }
4218
4219 #[test]
4220 fn test_instance_name() {
4221 assert!(valid_instance_name("my-laser._printer._tcp.local."));
4222 assert!(valid_instance_name("my-laser.._printer._tcp.local."));
4223 assert!(!valid_instance_name("_printer._tcp.local."));
4224 }
4225
4226 #[test]
4227 fn test_check_service_name_length() {
4228 let result = check_service_name_length("_tcp", 100);
4229 assert!(result.is_err());
4230 if let Err(e) = result {
4231 println!("{}", e);
4232 }
4233 }
4234
4235 #[test]
4236 fn test_check_hostname() {
4237 for hostname in &[
4239 "my_host.local.",
4240 &("A".repeat(255 - ".local.".len()) + ".local."),
4241 ] {
4242 let result = check_hostname(hostname);
4243 assert!(result.is_ok());
4244 }
4245
4246 for hostname in &[
4248 "my_host.local",
4249 ".local.",
4250 &("A".repeat(256 - ".local.".len()) + ".local."),
4251 ] {
4252 let result = check_hostname(hostname);
4253 assert!(result.is_err());
4254 if let Err(e) = result {
4255 println!("{}", e);
4256 }
4257 }
4258 }
4259
4260 #[test]
4261 fn test_check_domain_suffix() {
4262 assert!(check_domain_suffix("_missing_dot._tcp.local").is_err());
4263 assert!(check_domain_suffix("_missing_bar.tcp.local.").is_err());
4264 assert!(check_domain_suffix("_mis_spell._tpp.local.").is_err());
4265 assert!(check_domain_suffix("_mis_spell._upp.local.").is_err());
4266 assert!(check_domain_suffix("_has_dot._tcp.local.").is_ok());
4267 assert!(check_domain_suffix("_goodname._udp.local.").is_ok());
4268 }
4269
4270 #[test]
4271 fn test_service_with_temporarily_invalidated_ptr() {
4272 let d = ServiceDaemon::new().expect("Failed to create daemon");
4274
4275 let service = "_test_inval_ptr._udp.local.";
4276 let host_name = "my_host_tmp_invalidated_ptr.local.";
4277 let intfs: Vec<_> = my_ip_interfaces(false);
4278 let intf_ips: Vec<_> = intfs.iter().map(|intf| intf.ip()).collect();
4279 let port = 5201;
4280 let my_service =
4281 ServiceInfo::new(service, "my_instance", host_name, &intf_ips[..], port, None)
4282 .expect("invalid service info")
4283 .enable_addr_auto();
4284 let result = d.register(my_service.clone());
4285 assert!(result.is_ok());
4286
4287 let browse_chan = d.browse(service).unwrap();
4289 let timeout = Duration::from_secs(2);
4290 let mut resolved = false;
4291
4292 while let Ok(event) = browse_chan.recv_timeout(timeout) {
4293 match event {
4294 ServiceEvent::ServiceResolved(info) => {
4295 resolved = true;
4296 println!("Resolved a service of {}", &info.fullname);
4297 break;
4298 }
4299 e => {
4300 println!("Received event {:?}", e);
4301 }
4302 }
4303 }
4304
4305 assert!(resolved);
4306
4307 println!("Stopping browse of {}", service);
4308 d.stop_browse(service).unwrap();
4311
4312 let mut stopped = false;
4317 while let Ok(event) = browse_chan.recv_timeout(timeout) {
4318 match event {
4319 ServiceEvent::SearchStopped(_) => {
4320 stopped = true;
4321 println!("Stopped browsing service");
4322 break;
4323 }
4324 e => {
4328 println!("Received event {:?}", e);
4329 }
4330 }
4331 }
4332
4333 assert!(stopped);
4334
4335 let invalidate_ptr_packet = DnsPointer::new(
4337 my_service.get_type(),
4338 RRType::PTR,
4339 CLASS_IN,
4340 0,
4341 my_service.get_fullname().to_string(),
4342 );
4343
4344 let mut packet_buffer = DnsOutgoing::new(FLAGS_QR_RESPONSE | FLAGS_AA);
4345 packet_buffer.add_additional_answer(invalidate_ptr_packet);
4346
4347 for intf in intfs {
4348 let sock = _new_socket_bind(&intf, true).unwrap();
4349 send_dns_outgoing_impl(
4350 &packet_buffer,
4351 &intf.name,
4352 intf.index.unwrap_or(0),
4353 &intf.addr,
4354 &sock.pktinfo,
4355 );
4356 }
4357
4358 println!(
4359 "Sent PTR record invalidation. Starting second browse for {}",
4360 service
4361 );
4362
4363 let browse_chan = d.browse(service).unwrap();
4365
4366 resolved = false;
4367 while let Ok(event) = browse_chan.recv_timeout(timeout) {
4368 match event {
4369 ServiceEvent::ServiceResolved(info) => {
4370 resolved = true;
4371 println!("Resolved a service of {}", &info.fullname);
4372 break;
4373 }
4374 e => {
4375 println!("Received event {:?}", e);
4376 }
4377 }
4378 }
4379
4380 assert!(resolved);
4381 d.shutdown().unwrap();
4382 }
4383
4384 #[test]
4385 fn test_expired_srv() {
4386 let service_type = "_expired-srv._udp.local.";
4388 let instance = "test_instance";
4389 let host_name = "expired_srv_host.local.";
4390 let mut my_service = ServiceInfo::new(service_type, instance, host_name, "", 5023, None)
4391 .unwrap()
4392 .enable_addr_auto();
4393 let new_ttl = 3; my_service._set_host_ttl(new_ttl);
4398
4399 let mdns_server = ServiceDaemon::new().expect("Failed to create mdns server");
4401 let result = mdns_server.register(my_service);
4402 assert!(result.is_ok());
4403
4404 let mdns_client = ServiceDaemon::new().expect("Failed to create mdns client");
4405 let browse_chan = mdns_client.browse(service_type).unwrap();
4406 let timeout = Duration::from_secs(2);
4407 let mut resolved = false;
4408
4409 while let Ok(event) = browse_chan.recv_timeout(timeout) {
4410 match event {
4411 ServiceEvent::ServiceResolved(info) => {
4412 resolved = true;
4413 println!("Resolved a service of {}", &info.fullname);
4414 break;
4415 }
4416 _ => {}
4417 }
4418 }
4419
4420 assert!(resolved);
4421
4422 mdns_server.shutdown().unwrap();
4424
4425 let expire_timeout = Duration::from_secs(new_ttl as u64);
4427 while let Ok(event) = browse_chan.recv_timeout(expire_timeout) {
4428 match event {
4429 ServiceEvent::ServiceRemoved(service_type, full_name) => {
4430 println!("Service removed: {}: {}", &service_type, &full_name);
4431 break;
4432 }
4433 _ => {}
4434 }
4435 }
4436 }
4437
4438 #[test]
4439 fn test_hostname_resolution_address_removed() {
4440 let server = ServiceDaemon::new().expect("Failed to create server");
4442 let hostname = "addr_remove_host._tcp.local.";
4443 let service_ip_addr: ScopedIp = my_ip_interfaces(false)
4444 .iter()
4445 .find(|iface| iface.ip().is_ipv4())
4446 .map(|iface| iface.ip().into())
4447 .unwrap();
4448
4449 let mut my_service = ServiceInfo::new(
4450 "_host_res_test._tcp.local.",
4451 "my_instance",
4452 hostname,
4453 &service_ip_addr.to_ip_addr(),
4454 1234,
4455 None,
4456 )
4457 .expect("invalid service info");
4458
4459 let addr_ttl = 2;
4461 my_service._set_host_ttl(addr_ttl); server.register(my_service).unwrap();
4464
4465 let client = ServiceDaemon::new().expect("Failed to create client");
4467 let event_receiver = client.resolve_hostname(hostname, None).unwrap();
4468 let resolved = loop {
4469 match event_receiver.recv() {
4470 Ok(HostnameResolutionEvent::AddressesFound(found_hostname, addresses)) => {
4471 assert!(found_hostname == hostname);
4472 assert!(addresses.contains(&service_ip_addr));
4473 println!("address found: {:?}", &addresses);
4474 break true;
4475 }
4476 Ok(HostnameResolutionEvent::SearchStopped(_)) => break false,
4477 Ok(_event) => {}
4478 Err(_) => break false,
4479 }
4480 };
4481
4482 assert!(resolved);
4483
4484 server.shutdown().unwrap();
4486
4487 let timeout = Duration::from_secs(addr_ttl as u64 + 1);
4489 let removed = loop {
4490 match event_receiver.recv_timeout(timeout) {
4491 Ok(HostnameResolutionEvent::AddressesRemoved(removed_host, addresses)) => {
4492 assert!(removed_host == hostname);
4493 assert!(addresses.contains(&service_ip_addr));
4494
4495 println!(
4496 "address removed: hostname: {} addresses: {:?}",
4497 &hostname, &addresses
4498 );
4499 break true;
4500 }
4501 Ok(_event) => {}
4502 Err(_) => {
4503 break false;
4504 }
4505 }
4506 };
4507
4508 assert!(removed);
4509
4510 client.shutdown().unwrap();
4511 }
4512
4513 #[test]
4514 fn test_refresh_ptr() {
4515 let service_type = "_refresh-ptr._udp.local.";
4517 let instance = "test_instance";
4518 let host_name = "refresh_ptr_host.local.";
4519 let service_ip_addr = my_ip_interfaces(false)
4520 .iter()
4521 .find(|iface| iface.ip().is_ipv4())
4522 .map(|iface| iface.ip())
4523 .unwrap();
4524
4525 let mut my_service = ServiceInfo::new(
4526 service_type,
4527 instance,
4528 host_name,
4529 &service_ip_addr,
4530 5023,
4531 None,
4532 )
4533 .unwrap();
4534
4535 let new_ttl = 3; my_service._set_other_ttl(new_ttl);
4537
4538 let mdns_server = ServiceDaemon::new().expect("Failed to create mdns server");
4540 let result = mdns_server.register(my_service);
4541 assert!(result.is_ok());
4542
4543 let mdns_client = ServiceDaemon::new().expect("Failed to create mdns client");
4544 let browse_chan = mdns_client.browse(service_type).unwrap();
4545 let timeout = Duration::from_millis(1500); let mut resolved = false;
4547
4548 while let Ok(event) = browse_chan.recv_timeout(timeout) {
4550 match event {
4551 ServiceEvent::ServiceResolved(info) => {
4552 resolved = true;
4553 println!("Resolved a service of {}", &info.fullname);
4554 break;
4555 }
4556 _ => {}
4557 }
4558 }
4559
4560 assert!(resolved);
4561
4562 let timeout = Duration::from_millis(new_ttl as u64 * 1000 * 90 / 100);
4564 while let Ok(event) = browse_chan.recv_timeout(timeout) {
4565 println!("event: {:?}", &event);
4566 }
4567
4568 let metrics_chan = mdns_client.get_metrics().unwrap();
4570 let metrics = metrics_chan.recv_timeout(timeout).unwrap();
4571 let ptr_refresh_counter = metrics["cache-refresh-ptr"];
4572 assert_eq!(ptr_refresh_counter, 1);
4573 let srvtxt_refresh_counter = metrics["cache-refresh-srv-txt"];
4574 assert_eq!(srvtxt_refresh_counter, 1);
4575
4576 mdns_server.shutdown().unwrap();
4578 mdns_client.shutdown().unwrap();
4579 }
4580
4581 #[test]
4582 fn test_name_change() {
4583 assert_eq!(name_change("foo.local."), "foo (2).local.");
4584 assert_eq!(name_change("foo (2).local."), "foo (3).local.");
4585 assert_eq!(name_change("foo (9).local."), "foo (10).local.");
4586 assert_eq!(name_change("foo"), "foo (2)");
4587 assert_eq!(name_change("foo (2)"), "foo (3)");
4588 assert_eq!(name_change(""), " (2)");
4589
4590 assert_eq!(name_change("foo (abc)"), "foo (abc) (2)"); assert_eq!(name_change("foo (2"), "foo (2 (2)"); assert_eq!(name_change("foo (2) extra"), "foo (2) extra (2)"); }
4595
4596 #[test]
4597 fn test_hostname_change() {
4598 assert_eq!(hostname_change("foo.local."), "foo-2.local.");
4599 assert_eq!(hostname_change("foo"), "foo-2");
4600 assert_eq!(hostname_change("foo-2.local."), "foo-3.local.");
4601 assert_eq!(hostname_change("foo-9"), "foo-10");
4602 assert_eq!(hostname_change("test-42.domain."), "test-43.domain.");
4603 }
4604
4605 #[test]
4606 fn test_add_answer_txt_ttl() {
4607 let service_type = "_test_add_answer._udp.local.";
4609 let instance = "test_instance";
4610 let host_name = "add_answer_host.local.";
4611 let service_intf = my_ip_interfaces(false)
4612 .into_iter()
4613 .find(|iface| iface.ip().is_ipv4())
4614 .unwrap();
4615 let service_ip_addr = service_intf.ip();
4616 let my_service = ServiceInfo::new(
4617 service_type,
4618 instance,
4619 host_name,
4620 &service_ip_addr,
4621 5023,
4622 None,
4623 )
4624 .unwrap();
4625
4626 let mut out = DnsOutgoing::new(FLAGS_QR_RESPONSE | FLAGS_AA);
4628
4629 let mut dummy_data = out.to_data_on_wire();
4631 let interface_id = InterfaceId::from(&service_intf);
4632 let incoming = DnsIncoming::new(dummy_data.pop().unwrap(), interface_id).unwrap();
4633
4634 let if_addrs = vec![service_intf.ip()];
4636 add_answer_of_service(
4637 &mut out,
4638 &incoming,
4639 instance,
4640 &my_service,
4641 RRType::TXT,
4642 if_addrs,
4643 );
4644
4645 assert!(
4647 out.answers_count() > 0,
4648 "No answers added to the outgoing message"
4649 );
4650
4651 let answer = out._answers().first().unwrap();
4653 assert_eq!(answer.0.get_type(), RRType::TXT);
4654
4655 assert_eq!(answer.0.get_record().get_ttl(), my_service.get_other_ttl());
4657 }
4658
4659 #[test]
4660 fn test_interface_flip() {
4661 let ty_domain = "_intf-flip._udp.local.";
4663 let host_name = "intf_flip.local.";
4664 let now = SystemTime::now()
4665 .duration_since(SystemTime::UNIX_EPOCH)
4666 .unwrap();
4667 let instance_name = now.as_micros().to_string(); let port = 5200;
4669
4670 let (ip_addr1, intf_name) = my_ip_interfaces(false)
4672 .iter()
4673 .find(|iface| iface.ip().is_ipv4())
4674 .map(|iface| (iface.ip(), iface.name.clone()))
4675 .unwrap();
4676
4677 println!("Using interface {} with IP {}", intf_name, ip_addr1);
4678
4679 let service1 =
4681 ServiceInfo::new(ty_domain, &instance_name, host_name, &ip_addr1, port, None)
4682 .expect("valid service info");
4683 let server1 = ServiceDaemon::new().expect("failed to start server");
4684 server1
4685 .register(service1)
4686 .expect("Failed to register service1");
4687
4688 std::thread::sleep(Duration::from_secs(2));
4690
4691 let client = ServiceDaemon::new().expect("failed to start client");
4693
4694 let receiver = client.browse(ty_domain).unwrap();
4695
4696 let timeout = Duration::from_secs(3);
4697 let mut got_data = false;
4698
4699 while let Ok(event) = receiver.recv_timeout(timeout) {
4700 match event {
4701 ServiceEvent::ServiceResolved(_) => {
4702 println!("Received ServiceResolved event");
4703 got_data = true;
4704 break;
4705 }
4706 _ => {}
4707 }
4708 }
4709
4710 assert!(got_data, "Should receive ServiceResolved event");
4711
4712 client.set_ip_check_interval(1).unwrap();
4714
4715 println!("Shutting down interface {}", &intf_name);
4717 client.test_down_interface(&intf_name).unwrap();
4718
4719 let mut got_removed = false;
4720
4721 while let Ok(event) = receiver.recv_timeout(timeout) {
4722 match event {
4723 ServiceEvent::ServiceRemoved(ty_domain, instance) => {
4724 got_removed = true;
4725 println!("removed: {ty_domain} : {instance}");
4726 break;
4727 }
4728 _ => {}
4729 }
4730 }
4731 assert!(got_removed, "Should receive ServiceRemoved event");
4732
4733 println!("Bringing up interface {}", &intf_name);
4734 client.test_up_interface(&intf_name).unwrap();
4735 let mut got_data = false;
4736 while let Ok(event) = receiver.recv_timeout(timeout) {
4737 match event {
4738 ServiceEvent::ServiceResolved(resolved) => {
4739 got_data = true;
4740 println!("Received ServiceResolved: {:?}", resolved);
4741 break;
4742 }
4743 _ => {}
4744 }
4745 }
4746 assert!(
4747 got_data,
4748 "Should receive ServiceResolved event after interface is back up"
4749 );
4750
4751 server1.shutdown().unwrap();
4752 client.shutdown().unwrap();
4753 }
4754}