1#[cfg(feature = "logging")]
32use crate::log::{debug, trace};
33use crate::{
34 dns_cache::{current_time_millis, DnsCache},
35 dns_parser::{
36 ip_address_rr_type, DnsAddress, DnsEntryExt, DnsIncoming, DnsOutgoing, DnsPointer,
37 DnsRecordBox, DnsRecordExt, DnsSrv, DnsTxt, RRType, CLASS_CACHE_FLUSH, CLASS_IN, FLAGS_AA,
38 FLAGS_QR_QUERY, FLAGS_QR_RESPONSE, MAX_MSG_ABSOLUTE,
39 },
40 error::{e_fmt, Error, Result},
41 service_info::{
42 split_sub_domain, valid_ip_on_intf, DnsRegistry, Probe, ServiceInfo, ServiceStatus,
43 },
44 Receiver,
45};
46use flume::{bounded, Sender, TrySendError};
47use if_addrs::{IfAddr, Interface};
48use mio::{net::UdpSocket as MioUdpSocket, Poll};
49use socket2::Socket;
50use std::{
51 cmp::{self, Reverse},
52 collections::{BinaryHeap, HashMap, HashSet},
53 fmt,
54 net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6, UdpSocket},
55 str, thread,
56 time::Duration,
57 vec,
58};
59
60pub const SERVICE_NAME_LEN_MAX_DEFAULT: u8 = 15;
64
65pub const IP_CHECK_INTERVAL_IN_SECS_DEFAULT: u32 = 5;
67
68pub const VERIFY_TIMEOUT_DEFAULT: Duration = Duration::from_secs(10);
71
72const MDNS_PORT: u16 = 5353;
73const GROUP_ADDR_V4: Ipv4Addr = Ipv4Addr::new(224, 0, 0, 251);
74const GROUP_ADDR_V6: Ipv6Addr = Ipv6Addr::new(0xff02, 0, 0, 0, 0, 0, 0, 0xfb);
75const LOOPBACK_V4: Ipv4Addr = Ipv4Addr::new(127, 0, 0, 1);
76
77const RESOLVE_WAIT_IN_MILLIS: u64 = 500;
78
79#[derive(Debug)]
81pub enum UnregisterStatus {
82 OK,
84 NotFound,
86}
87
88#[derive(Debug, PartialEq, Clone, Eq)]
90#[non_exhaustive]
91pub enum DaemonStatus {
92 Running,
94
95 Shutdown,
97}
98
99#[derive(Hash, Eq, PartialEq)]
102enum Counter {
103 Register,
104 RegisterResend,
105 Unregister,
106 UnregisterResend,
107 Browse,
108 ResolveHostname,
109 Respond,
110 CacheRefreshPTR,
111 CacheRefreshSRV,
112 CacheRefreshAddr,
113 KnownAnswerSuppression,
114 CachedPTR,
115 CachedSRV,
116 CachedAddr,
117 CachedTxt,
118 CachedNSec,
119 CachedSubtype,
120 DnsRegistryProbe,
121 DnsRegistryActive,
122 DnsRegistryTimer,
123 DnsRegistryNameChange,
124}
125
126impl fmt::Display for Counter {
127 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
128 match self {
129 Self::Register => write!(f, "register"),
130 Self::RegisterResend => write!(f, "register-resend"),
131 Self::Unregister => write!(f, "unregister"),
132 Self::UnregisterResend => write!(f, "unregister-resend"),
133 Self::Browse => write!(f, "browse"),
134 Self::ResolveHostname => write!(f, "resolve-hostname"),
135 Self::Respond => write!(f, "respond"),
136 Self::CacheRefreshPTR => write!(f, "cache-refresh-ptr"),
137 Self::CacheRefreshSRV => write!(f, "cache-refresh-srv"),
138 Self::CacheRefreshAddr => write!(f, "cache-refresh-addr"),
139 Self::KnownAnswerSuppression => write!(f, "known-answer-suppression"),
140 Self::CachedPTR => write!(f, "cached-ptr"),
141 Self::CachedSRV => write!(f, "cached-srv"),
142 Self::CachedAddr => write!(f, "cached-addr"),
143 Self::CachedTxt => write!(f, "cached-txt"),
144 Self::CachedNSec => write!(f, "cached-nsec"),
145 Self::CachedSubtype => write!(f, "cached-subtype"),
146 Self::DnsRegistryProbe => write!(f, "dns-registry-probe"),
147 Self::DnsRegistryActive => write!(f, "dns-registry-active"),
148 Self::DnsRegistryTimer => write!(f, "dns-registry-timer"),
149 Self::DnsRegistryNameChange => write!(f, "dns-registry-name-change"),
150 }
151 }
152}
153
154pub type Metrics = HashMap<String, i64>;
157
158const SIGNAL_SOCK_EVENT_KEY: usize = usize::MAX - 1; #[derive(Clone)]
164pub struct ServiceDaemon {
165 sender: Sender<Command>,
167
168 signal_addr: SocketAddr,
174}
175
176impl ServiceDaemon {
177 pub fn new() -> Result<Self> {
182 let signal_addr = SocketAddrV4::new(LOOPBACK_V4, 0);
185
186 let signal_sock = UdpSocket::bind(signal_addr)
187 .map_err(|e| e_fmt!("failed to create signal_sock for daemon: {}", e))?;
188
189 let signal_addr = signal_sock
191 .local_addr()
192 .map_err(|e| e_fmt!("failed to get signal sock addr: {}", e))?;
193
194 signal_sock
196 .set_nonblocking(true)
197 .map_err(|e| e_fmt!("failed to set nonblocking for signal socket: {}", e))?;
198
199 let poller = Poll::new().map_err(|e| e_fmt!("failed to create mio Poll: {e}"))?;
200
201 let (sender, receiver) = bounded(100);
202
203 let mio_sock = MioUdpSocket::from_std(signal_sock);
205 thread::Builder::new()
206 .name("mDNS_daemon".to_string())
207 .spawn(move || Self::daemon_thread(mio_sock, poller, receiver))
208 .map_err(|e| e_fmt!("thread builder failed to spawn: {}", e))?;
209
210 Ok(Self {
211 sender,
212 signal_addr,
213 })
214 }
215
216 fn send_cmd(&self, cmd: Command) -> Result<()> {
219 let cmd_name = cmd.to_string();
220
221 self.sender.try_send(cmd).map_err(|e| match e {
223 TrySendError::Full(_) => Error::Again,
224 e => e_fmt!("flume::channel::send failed: {}", e),
225 })?;
226
227 let addr = SocketAddrV4::new(LOOPBACK_V4, 0);
229 let socket = UdpSocket::bind(addr)
230 .map_err(|e| e_fmt!("Failed to create socket to send signal: {}", e))?;
231 socket
232 .send_to(cmd_name.as_bytes(), self.signal_addr)
233 .map_err(|e| {
234 e_fmt!(
235 "signal socket send_to {} ({}) failed: {}",
236 self.signal_addr,
237 cmd_name,
238 e
239 )
240 })?;
241
242 Ok(())
243 }
244
245 pub fn browse(&self, service_type: &str) -> Result<Receiver<ServiceEvent>> {
256 check_domain_suffix(service_type)?;
257
258 let (resp_s, resp_r) = bounded(10);
259 self.send_cmd(Command::Browse(service_type.to_string(), 1, resp_s))?;
260 Ok(resp_r)
261 }
262
263 pub fn stop_browse(&self, ty_domain: &str) -> Result<()> {
268 self.send_cmd(Command::StopBrowse(ty_domain.to_string()))
269 }
270
271 pub fn resolve_hostname(
279 &self,
280 hostname: &str,
281 timeout: Option<u64>,
282 ) -> Result<Receiver<HostnameResolutionEvent>> {
283 check_hostname(hostname)?;
284 let (resp_s, resp_r) = bounded(10);
285 self.send_cmd(Command::ResolveHostname(
286 hostname.to_string(),
287 1,
288 resp_s,
289 timeout,
290 ))?;
291 Ok(resp_r)
292 }
293
294 pub fn stop_resolve_hostname(&self, hostname: &str) -> Result<()> {
299 self.send_cmd(Command::StopResolveHostname(hostname.to_string()))
300 }
301
302 pub fn register(&self, service_info: ServiceInfo) -> Result<()> {
310 check_service_name(service_info.get_fullname())?;
311 check_hostname(service_info.get_hostname())?;
312
313 self.send_cmd(Command::Register(service_info))
314 }
315
316 pub fn unregister(&self, fullname: &str) -> Result<Receiver<UnregisterStatus>> {
324 let (resp_s, resp_r) = bounded(1);
325 self.send_cmd(Command::Unregister(fullname.to_lowercase(), resp_s))?;
326 Ok(resp_r)
327 }
328
329 pub fn monitor(&self) -> Result<Receiver<DaemonEvent>> {
333 let (resp_s, resp_r) = bounded(100);
334 self.send_cmd(Command::Monitor(resp_s))?;
335 Ok(resp_r)
336 }
337
338 pub fn shutdown(&self) -> Result<Receiver<DaemonStatus>> {
343 let (resp_s, resp_r) = bounded(1);
344 self.send_cmd(Command::Exit(resp_s))?;
345 Ok(resp_r)
346 }
347
348 pub fn status(&self) -> Result<Receiver<DaemonStatus>> {
354 let (resp_s, resp_r) = bounded(1);
355
356 if self.sender.is_disconnected() {
357 resp_s
358 .send(DaemonStatus::Shutdown)
359 .map_err(|e| e_fmt!("failed to send daemon status to the client: {}", e))?;
360 } else {
361 self.send_cmd(Command::GetStatus(resp_s))?;
362 }
363
364 Ok(resp_r)
365 }
366
367 pub fn get_metrics(&self) -> Result<Receiver<Metrics>> {
372 let (resp_s, resp_r) = bounded(1);
373 self.send_cmd(Command::GetMetrics(resp_s))?;
374 Ok(resp_r)
375 }
376
377 pub fn set_service_name_len_max(&self, len_max: u8) -> Result<()> {
384 const SERVICE_NAME_LEN_MAX_LIMIT: u8 = 30; if len_max > SERVICE_NAME_LEN_MAX_LIMIT {
387 return Err(Error::Msg(format!(
388 "service name length max {} is too large",
389 len_max
390 )));
391 }
392
393 self.send_cmd(Command::SetOption(DaemonOption::ServiceNameLenMax(len_max)))
394 }
395
396 pub fn set_ip_check_interval(&self, interval_in_secs: u32) -> Result<()> {
402 let interval_in_millis = interval_in_secs as u64 * 1000;
403 self.send_cmd(Command::SetOption(DaemonOption::IpCheckInterval(
404 interval_in_millis,
405 )))
406 }
407
408 pub fn get_ip_check_interval(&self) -> Result<u32> {
410 let (resp_s, resp_r) = bounded(1);
411 self.send_cmd(Command::GetOption(resp_s))?;
412
413 let option = resp_r
414 .recv_timeout(Duration::from_secs(10))
415 .map_err(|e| e_fmt!("failed to receive ip check interval: {}", e))?;
416 let ip_check_interval_in_secs = option.ip_check_interval / 1000;
417 Ok(ip_check_interval_in_secs as u32)
418 }
419
420 pub fn enable_interface(&self, if_kind: impl IntoIfKindVec) -> Result<()> {
427 let if_kind_vec = if_kind.into_vec();
428 self.send_cmd(Command::SetOption(DaemonOption::EnableInterface(
429 if_kind_vec.kinds,
430 )))
431 }
432
433 pub fn disable_interface(&self, if_kind: impl IntoIfKindVec) -> Result<()> {
440 let if_kind_vec = if_kind.into_vec();
441 self.send_cmd(Command::SetOption(DaemonOption::DisableInterface(
442 if_kind_vec.kinds,
443 )))
444 }
445
446 pub fn set_multicast_loop_v4(&self, on: bool) -> Result<()> {
462 self.send_cmd(Command::SetOption(DaemonOption::MulticastLoopV4(on)))
463 }
464
465 pub fn set_multicast_loop_v6(&self, on: bool) -> Result<()> {
481 self.send_cmd(Command::SetOption(DaemonOption::MulticastLoopV6(on)))
482 }
483
484 pub fn verify(&self, instance_fullname: String, timeout: Duration) -> Result<()> {
497 self.send_cmd(Command::Verify(instance_fullname, timeout))
498 }
499
500 fn daemon_thread(signal_sock: MioUdpSocket, poller: Poll, receiver: Receiver<Command>) {
501 let zc = Zeroconf::new(signal_sock, poller);
502
503 if let Some(cmd) = Self::run(zc, receiver) {
504 match cmd {
505 Command::Exit(resp_s) => {
506 if let Err(e) = resp_s.send(DaemonStatus::Shutdown) {
509 debug!("exit: failed to send response of shutdown: {}", e);
510 }
511 }
512 _ => {
513 debug!("Unexpected command: {:?}", cmd);
514 }
515 }
516 }
517 }
518
519 fn run(mut zc: Zeroconf, receiver: Receiver<Command>) -> Option<Command> {
528 if let Err(e) = zc.poller.registry().register(
530 &mut zc.signal_sock,
531 mio::Token(SIGNAL_SOCK_EVENT_KEY),
532 mio::Interest::READABLE,
533 ) {
534 debug!("failed to add signal socket to the poller: {}", e);
535 return None;
536 }
537
538 for (intf, sock) in zc.intf_socks.iter_mut() {
540 let key =
541 Zeroconf::add_poll_impl(&mut zc.poll_ids, &mut zc.poll_id_count, intf.clone());
542
543 if let Err(e) =
544 zc.poller
545 .registry()
546 .register(sock, mio::Token(key), mio::Interest::READABLE)
547 {
548 debug!("add socket of {:?} to poller: {e}", intf);
549 return None;
550 }
551 }
552
553 let mut next_ip_check = if zc.ip_check_interval > 0 {
555 current_time_millis() + zc.ip_check_interval
556 } else {
557 0
558 };
559
560 if next_ip_check > 0 {
561 zc.add_timer(next_ip_check);
562 }
563
564 let mut events = mio::Events::with_capacity(1024);
567 loop {
568 let now = current_time_millis();
569
570 let earliest_timer = zc.peek_earliest_timer();
571 let timeout = earliest_timer.map(|timer| {
572 let millis = if timer > now { timer - now } else { 1 };
574 Duration::from_millis(millis)
575 });
576
577 events.clear();
579 match zc.poller.poll(&mut events, timeout) {
580 Ok(_) => zc.handle_poller_events(&events),
581 Err(e) => debug!("failed to select from sockets: {}", e),
582 }
583
584 let now = current_time_millis();
585
586 zc.pop_timers_till(now);
588
589 for hostname in zc
591 .hostname_resolvers
592 .clone()
593 .into_iter()
594 .filter(|(_, (_, timeout))| timeout.map(|t| now >= t).unwrap_or(false))
595 .map(|(hostname, _)| hostname)
596 {
597 trace!("hostname resolver timeout for {}", &hostname);
598 call_hostname_resolution_listener(
599 &zc.hostname_resolvers,
600 &hostname,
601 HostnameResolutionEvent::SearchTimeout(hostname.to_owned()),
602 );
603 call_hostname_resolution_listener(
604 &zc.hostname_resolvers,
605 &hostname,
606 HostnameResolutionEvent::SearchStopped(hostname.to_owned()),
607 );
608 zc.hostname_resolvers.remove(&hostname);
609 }
610
611 while let Ok(command) = receiver.try_recv() {
613 if matches!(command, Command::Exit(_)) {
614 zc.status = DaemonStatus::Shutdown;
615 return Some(command);
616 }
617 zc.exec_command(command, false);
618 }
619
620 let mut i = 0;
622 while i < zc.retransmissions.len() {
623 if now >= zc.retransmissions[i].next_time {
624 let rerun = zc.retransmissions.remove(i);
625 zc.exec_command(rerun.command, true);
626 } else {
627 i += 1;
628 }
629 }
630
631 zc.refresh_active_services();
633
634 let mut query_count = 0;
636 for (hostname, _sender) in zc.hostname_resolvers.iter() {
637 for (hostname, ip_addr) in
638 zc.cache.refresh_due_hostname_resolutions(hostname).iter()
639 {
640 zc.send_query(hostname, ip_address_rr_type(ip_addr));
641 query_count += 1;
642 }
643 }
644
645 zc.increase_counter(Counter::CacheRefreshAddr, query_count);
646
647 let now = current_time_millis();
649
650 let expired_services = zc.cache.evict_expired_services(now);
652 zc.notify_service_removal(expired_services);
653
654 let expired_addrs = zc.cache.evict_expired_addr(now);
656 for (hostname, addrs) in expired_addrs {
657 call_hostname_resolution_listener(
658 &zc.hostname_resolvers,
659 &hostname,
660 HostnameResolutionEvent::AddressesRemoved(hostname.clone(), addrs),
661 );
662 let instances = zc.cache.get_instances_on_host(&hostname);
663 let instance_set: HashSet<String> = instances.into_iter().collect();
664 zc.resolve_updated_instances(&instance_set);
665 }
666
667 zc.probing_handler();
669
670 if now >= next_ip_check && next_ip_check > 0 {
672 next_ip_check = now + zc.ip_check_interval;
673 zc.add_timer(next_ip_check);
674
675 zc.check_ip_changes();
676 }
677 }
678 }
679}
680
681fn new_socket_bind(intf: &Interface, should_loop: bool) -> Result<MioUdpSocket> {
683 let intf_ip = &intf.ip();
686 match intf_ip {
687 IpAddr::V4(ip) => {
688 let addr = SocketAddrV4::new(Ipv4Addr::new(0, 0, 0, 0), MDNS_PORT);
689 let sock = new_socket(addr.into(), true)?;
690
691 sock.join_multicast_v4(&GROUP_ADDR_V4, ip)
693 .map_err(|e| e_fmt!("join multicast group on addr {}: {}", intf_ip, e))?;
694
695 sock.set_multicast_if_v4(ip)
697 .map_err(|e| e_fmt!("set multicast_if on addr {}: {}", ip, e))?;
698
699 if !should_loop {
700 sock.set_multicast_loop_v4(false)
701 .map_err(|e| e_fmt!("failed to set multicast loop v4 for {ip}: {e}"))?;
702 }
703
704 let multicast_addr = SocketAddrV4::new(GROUP_ADDR_V4, MDNS_PORT).into();
706 let test_packets = DnsOutgoing::new(0).to_data_on_wire();
707 for packet in test_packets {
708 sock.send_to(&packet, &multicast_addr)
709 .map_err(|e| e_fmt!("send multicast packet on addr {}: {}", ip, e))?;
710 }
711 Ok(MioUdpSocket::from_std(UdpSocket::from(sock)))
712 }
713 IpAddr::V6(ip) => {
714 let addr = SocketAddrV6::new(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 0), MDNS_PORT, 0, 0);
715 let sock = new_socket(addr.into(), true)?;
716
717 sock.join_multicast_v6(&GROUP_ADDR_V6, intf.index.unwrap_or(0))
719 .map_err(|e| e_fmt!("join multicast group on addr {}: {}", ip, e))?;
720
721 sock.set_multicast_if_v6(intf.index.unwrap_or(0))
723 .map_err(|e| e_fmt!("set multicast_if on addr {}: {}", ip, e))?;
724
725 Ok(MioUdpSocket::from_std(UdpSocket::from(sock)))
730 }
731 }
732}
733
734fn new_socket(addr: SocketAddr, non_block: bool) -> Result<Socket> {
737 let domain = match addr {
738 SocketAddr::V4(_) => socket2::Domain::IPV4,
739 SocketAddr::V6(_) => socket2::Domain::IPV6,
740 };
741
742 let fd = Socket::new(domain, socket2::Type::DGRAM, None)
743 .map_err(|e| e_fmt!("create socket failed: {}", e))?;
744
745 fd.set_reuse_address(true)
746 .map_err(|e| e_fmt!("set ReuseAddr failed: {}", e))?;
747 #[cfg(unix)] fd.set_reuse_port(true)
749 .map_err(|e| e_fmt!("set ReusePort failed: {}", e))?;
750
751 if non_block {
752 fd.set_nonblocking(true)
753 .map_err(|e| e_fmt!("set O_NONBLOCK: {}", e))?;
754 }
755
756 fd.bind(&addr.into())
757 .map_err(|e| e_fmt!("socket bind to {} failed: {}", &addr, e))?;
758
759 trace!("new socket bind to {}", &addr);
760 Ok(fd)
761}
762
763struct ReRun {
765 next_time: u64,
767 command: Command,
768}
769
770#[derive(Debug, Eq, Hash, PartialEq)]
772enum IpVersion {
773 V4,
774 V6,
775}
776
777#[derive(Debug, Eq, Hash, PartialEq)]
779struct MulticastSendTracker {
780 intf_index: u32,
781 ip_version: IpVersion,
782}
783
784fn multicast_send_tracker(intf: &Interface) -> Option<MulticastSendTracker> {
786 match intf.index {
787 Some(index) => {
788 let ip_ver = match intf.addr {
789 IfAddr::V4(_) => IpVersion::V4,
790 IfAddr::V6(_) => IpVersion::V6,
791 };
792 Some(MulticastSendTracker {
793 intf_index: index,
794 ip_version: ip_ver,
795 })
796 }
797 None => None,
798 }
799}
800
801#[derive(Debug, Clone)]
805#[non_exhaustive]
806pub enum IfKind {
807 All,
809
810 IPv4,
812
813 IPv6,
815
816 Name(String),
818
819 Addr(IpAddr),
821
822 LoopbackV4,
827
828 LoopbackV6,
830}
831
832impl IfKind {
833 fn matches(&self, intf: &Interface) -> bool {
835 match self {
836 Self::All => true,
837 Self::IPv4 => intf.ip().is_ipv4(),
838 Self::IPv6 => intf.ip().is_ipv6(),
839 Self::Name(ifname) => ifname == &intf.name,
840 Self::Addr(addr) => addr == &intf.ip(),
841 Self::LoopbackV4 => intf.is_loopback() && intf.ip().is_ipv4(),
842 Self::LoopbackV6 => intf.is_loopback() && intf.ip().is_ipv6(),
843 }
844 }
845}
846
847impl From<&str> for IfKind {
850 fn from(val: &str) -> Self {
851 Self::Name(val.to_string())
852 }
853}
854
855impl From<&String> for IfKind {
856 fn from(val: &String) -> Self {
857 Self::Name(val.to_string())
858 }
859}
860
861impl From<IpAddr> for IfKind {
863 fn from(val: IpAddr) -> Self {
864 Self::Addr(val)
865 }
866}
867
868pub struct IfKindVec {
870 kinds: Vec<IfKind>,
871}
872
873pub trait IntoIfKindVec {
875 fn into_vec(self) -> IfKindVec;
876}
877
878impl<T: Into<IfKind>> IntoIfKindVec for T {
879 fn into_vec(self) -> IfKindVec {
880 let if_kind: IfKind = self.into();
881 IfKindVec {
882 kinds: vec![if_kind],
883 }
884 }
885}
886
887impl<T: Into<IfKind>> IntoIfKindVec for Vec<T> {
888 fn into_vec(self) -> IfKindVec {
889 let kinds: Vec<IfKind> = self.into_iter().map(|x| x.into()).collect();
890 IfKindVec { kinds }
891 }
892}
893
894struct IfSelection {
896 if_kind: IfKind,
898
899 selected: bool,
901}
902
903struct Zeroconf {
905 intf_socks: HashMap<Interface, MioUdpSocket>,
907
908 poll_ids: HashMap<usize, Interface>,
910
911 poll_id_count: usize,
913
914 my_services: HashMap<String, ServiceInfo>,
916
917 cache: DnsCache,
919
920 dns_registry_map: HashMap<Interface, DnsRegistry>,
922
923 service_queriers: HashMap<String, Sender<ServiceEvent>>, hostname_resolvers: HashMap<String, (Sender<HostnameResolutionEvent>, Option<u64>)>, retransmissions: Vec<ReRun>,
934
935 counters: Metrics,
936
937 poller: Poll,
939
940 monitors: Vec<Sender<DaemonEvent>>,
942
943 service_name_len_max: u8,
945
946 ip_check_interval: u64,
948
949 if_selections: Vec<IfSelection>,
951
952 signal_sock: MioUdpSocket,
954
955 timers: BinaryHeap<Reverse<u64>>,
961
962 status: DaemonStatus,
963
964 pending_resolves: HashSet<String>,
966
967 resolved: HashSet<String>,
969
970 multicast_loop_v4: bool,
971
972 multicast_loop_v6: bool,
973}
974
975impl Zeroconf {
976 fn new(signal_sock: MioUdpSocket, poller: Poll) -> Self {
977 let my_ifaddrs = my_ip_interfaces(false);
979
980 let mut intf_socks = HashMap::new();
984 let mut dns_registry_map = HashMap::new();
985
986 for intf in my_ifaddrs {
987 let sock = match new_socket_bind(&intf, true) {
988 Ok(s) => s,
989 Err(e) => {
990 trace!("bind a socket to {}: {}. Skipped.", &intf.ip(), e);
991 continue;
992 }
993 };
994
995 dns_registry_map.insert(intf.clone(), DnsRegistry::new());
996
997 intf_socks.insert(intf, sock);
998 }
999
1000 let monitors = Vec::new();
1001 let service_name_len_max = SERVICE_NAME_LEN_MAX_DEFAULT;
1002 let ip_check_interval = IP_CHECK_INTERVAL_IN_SECS_DEFAULT as u64 * 1000;
1003
1004 let timers = BinaryHeap::new();
1005
1006 let if_selections = vec![
1008 IfSelection {
1009 if_kind: IfKind::LoopbackV4,
1010 selected: false,
1011 },
1012 IfSelection {
1013 if_kind: IfKind::LoopbackV6,
1014 selected: false,
1015 },
1016 ];
1017
1018 let status = DaemonStatus::Running;
1019
1020 Self {
1021 intf_socks,
1022 poll_ids: HashMap::new(),
1023 poll_id_count: 0,
1024 my_services: HashMap::new(),
1025 cache: DnsCache::new(),
1026 dns_registry_map,
1027 hostname_resolvers: HashMap::new(),
1028 service_queriers: HashMap::new(),
1029 retransmissions: Vec::new(),
1030 counters: HashMap::new(),
1031 poller,
1032 monitors,
1033 service_name_len_max,
1034 ip_check_interval,
1035 if_selections,
1036 signal_sock,
1037 timers,
1038 status,
1039 pending_resolves: HashSet::new(),
1040 resolved: HashSet::new(),
1041 multicast_loop_v4: true,
1042 multicast_loop_v6: true,
1043 }
1044 }
1045
1046 fn process_set_option(&mut self, daemon_opt: DaemonOption) {
1047 match daemon_opt {
1048 DaemonOption::ServiceNameLenMax(length) => self.service_name_len_max = length,
1049 DaemonOption::IpCheckInterval(interval) => self.ip_check_interval = interval,
1050 DaemonOption::EnableInterface(if_kind) => self.enable_interface(if_kind),
1051 DaemonOption::DisableInterface(if_kind) => self.disable_interface(if_kind),
1052 DaemonOption::MulticastLoopV4(on) => self.set_multicast_loop_v4(on),
1053 DaemonOption::MulticastLoopV6(on) => self.set_multicast_loop_v6(on),
1054 }
1055 }
1056
1057 fn enable_interface(&mut self, kinds: Vec<IfKind>) {
1058 for if_kind in kinds {
1059 self.if_selections.push(IfSelection {
1060 if_kind,
1061 selected: true,
1062 });
1063 }
1064
1065 self.apply_intf_selections(my_ip_interfaces(true));
1066 }
1067
1068 fn disable_interface(&mut self, kinds: Vec<IfKind>) {
1069 for if_kind in kinds {
1070 self.if_selections.push(IfSelection {
1071 if_kind,
1072 selected: false,
1073 });
1074 }
1075
1076 self.apply_intf_selections(my_ip_interfaces(true));
1077 }
1078
1079 fn set_multicast_loop_v4(&mut self, on: bool) {
1080 for (_, sock) in self.intf_socks.iter_mut() {
1081 if let Err(e) = sock.set_multicast_loop_v4(on) {
1082 debug!("failed to set multicast loop v4: {e}");
1083 }
1084 }
1085 }
1086
1087 fn set_multicast_loop_v6(&mut self, on: bool) {
1088 for (_, sock) in self.intf_socks.iter_mut() {
1089 if let Err(e) = sock.set_multicast_loop_v6(on) {
1090 debug!("failed to set multicast loop v6: {e}");
1091 }
1092 }
1093 }
1094
1095 fn notify_monitors(&mut self, event: DaemonEvent) {
1096 self.monitors.retain(|sender| {
1098 if let Err(e) = sender.try_send(event.clone()) {
1099 debug!("notify_monitors: try_send: {}", &e);
1100 if matches!(e, TrySendError::Disconnected(_)) {
1101 return false; }
1103 }
1104 true
1105 });
1106 }
1107
1108 fn del_addr_in_my_services(&mut self, addr: &IpAddr) {
1110 for (_, service_info) in self.my_services.iter_mut() {
1111 if service_info.is_addr_auto() {
1112 service_info.remove_ipaddr(addr);
1113 }
1114 }
1115 }
1116
1117 fn add_poll(&mut self, intf: Interface) -> usize {
1119 Self::add_poll_impl(&mut self.poll_ids, &mut self.poll_id_count, intf)
1120 }
1121
1122 fn add_poll_impl(
1126 poll_ids: &mut HashMap<usize, Interface>,
1127 poll_id_count: &mut usize,
1128 intf: Interface,
1129 ) -> usize {
1130 let key = *poll_id_count;
1131 *poll_id_count += 1;
1132 let _ = (*poll_ids).insert(key, intf);
1133 key
1134 }
1135
1136 fn add_timer(&mut self, next_time: u64) {
1137 self.timers.push(Reverse(next_time));
1138 }
1139
1140 fn peek_earliest_timer(&self) -> Option<u64> {
1141 self.timers.peek().map(|Reverse(v)| *v)
1142 }
1143
1144 fn _pop_earliest_timer(&mut self) -> Option<u64> {
1145 self.timers.pop().map(|Reverse(v)| v)
1146 }
1147
1148 fn pop_timers_till(&mut self, now: u64) {
1150 while let Some(Reverse(v)) = self.timers.peek() {
1151 if *v > now {
1152 break;
1153 }
1154 self.timers.pop();
1155 }
1156 }
1157
1158 fn selected_addrs(&self, interfaces: Vec<Interface>) -> HashSet<IpAddr> {
1160 let intf_count = interfaces.len();
1161 let mut intf_selections = vec![true; intf_count];
1162
1163 for selection in self.if_selections.iter() {
1165 for i in 0..intf_count {
1167 if selection.if_kind.matches(&interfaces[i]) {
1168 intf_selections[i] = selection.selected;
1169 }
1170 }
1171 }
1172
1173 let mut selected_addrs = HashSet::new();
1174 for i in 0..intf_count {
1175 if intf_selections[i] {
1176 selected_addrs.insert(interfaces[i].addr.ip());
1177 }
1178 }
1179
1180 selected_addrs
1181 }
1182
1183 fn apply_intf_selections(&mut self, interfaces: Vec<Interface>) {
1188 let intf_count = interfaces.len();
1190 let mut intf_selections = vec![true; intf_count];
1191
1192 for selection in self.if_selections.iter() {
1194 for i in 0..intf_count {
1196 if selection.if_kind.matches(&interfaces[i]) {
1197 intf_selections[i] = selection.selected;
1198 }
1199 }
1200 }
1201
1202 for (idx, intf) in interfaces.into_iter().enumerate() {
1204 if intf_selections[idx] {
1205 if !self.intf_socks.contains_key(&intf) {
1207 debug!("apply_intf_selections: add {:?}", &intf.ip());
1208 self.add_new_interface(intf);
1209 }
1210 } else {
1211 if let Some(mut sock) = self.intf_socks.remove(&intf) {
1213 match self.poller.registry().deregister(&mut sock) {
1214 Ok(()) => debug!("apply_intf_selections: deregister {:?}", &intf.ip()),
1215 Err(e) => debug!("apply_intf_selections: poller.delete {:?}: {}", &intf, e),
1216 }
1217
1218 self.poll_ids.retain(|_, v| v != &intf);
1220
1221 self.cache.remove_addrs_on_disabled_intf(&intf);
1223 }
1224 }
1225 }
1226 }
1227
1228 fn check_ip_changes(&mut self) {
1230 let my_ifaddrs = my_ip_interfaces(true);
1232
1233 let poll_ids = &mut self.poll_ids;
1234 let poller = &mut self.poller;
1235 let deleted_addrs = self
1237 .intf_socks
1238 .iter_mut()
1239 .filter_map(|(intf, sock)| {
1240 if !my_ifaddrs.contains(intf) {
1241 if let Err(e) = poller.registry().deregister(sock) {
1242 debug!("check_ip_changes: poller.delete {:?}: {}", intf, e);
1243 }
1244 poll_ids.retain(|_, v| v != intf);
1246 Some(intf.ip())
1247 } else {
1248 None
1249 }
1250 })
1251 .collect::<Vec<IpAddr>>();
1252
1253 for ip in deleted_addrs.iter() {
1255 self.del_addr_in_my_services(ip);
1256 self.notify_monitors(DaemonEvent::IpDel(*ip));
1257 }
1258
1259 self.intf_socks.retain(|intf, _| my_ifaddrs.contains(intf));
1261
1262 self.apply_intf_selections(my_ifaddrs);
1264 }
1265
1266 fn add_new_interface(&mut self, intf: Interface) {
1267 let new_ip = intf.ip();
1269 let should_loop = if new_ip.is_ipv4() {
1270 self.multicast_loop_v4
1271 } else {
1272 self.multicast_loop_v6
1273 };
1274 let mut sock = match new_socket_bind(&intf, should_loop) {
1275 Ok(s) => s,
1276 Err(e) => {
1277 debug!("bind a socket to {}: {}. Skipped.", &intf.ip(), e);
1278 return;
1279 }
1280 };
1281
1282 let key = self.add_poll(intf.clone());
1284 if let Err(e) =
1285 self.poller
1286 .registry()
1287 .register(&mut sock, mio::Token(key), mio::Interest::READABLE)
1288 {
1289 debug!("check_ip_changes: poller add ip {}: {}", new_ip, e);
1290 return;
1291 }
1292
1293 debug!("add new interface {}: {new_ip}", intf.name);
1294 let dns_registry = match self.dns_registry_map.get_mut(&intf) {
1295 Some(registry) => registry,
1296 None => self
1297 .dns_registry_map
1298 .entry(intf.clone())
1299 .or_insert_with(DnsRegistry::new),
1300 };
1301
1302 for (_, service_info) in self.my_services.iter_mut() {
1303 if service_info.is_addr_auto() {
1304 service_info.insert_ipaddr(new_ip);
1305
1306 if announce_service_on_intf(dns_registry, service_info, &intf, &sock) {
1307 debug!(
1308 "Announce service {} on {}",
1309 service_info.get_fullname(),
1310 intf.ip()
1311 );
1312 service_info.set_status(&intf, ServiceStatus::Announced);
1313 } else {
1314 for timer in dns_registry.new_timers.drain(..) {
1315 self.timers.push(Reverse(timer));
1316 }
1317 service_info.set_status(&intf, ServiceStatus::Probing);
1318 }
1319 }
1320 }
1321
1322 self.intf_socks.insert(intf, sock);
1323
1324 self.notify_monitors(DaemonEvent::IpAdd(new_ip));
1326 }
1327
1328 fn register_service(&mut self, mut info: ServiceInfo) {
1337 if let Err(e) = check_service_name_length(info.get_type(), self.service_name_len_max) {
1339 debug!("check_service_name_length: {}", &e);
1340 self.notify_monitors(DaemonEvent::Error(e));
1341 return;
1342 }
1343
1344 if info.is_addr_auto() {
1345 let selected_addrs = self.selected_addrs(my_ip_interfaces(true));
1346 for addr in selected_addrs {
1347 info.insert_ipaddr(addr);
1348 }
1349 }
1350
1351 debug!("register service {:?}", &info);
1352
1353 let outgoing_addrs = self.send_unsolicited_response(&mut info);
1354 if !outgoing_addrs.is_empty() {
1355 self.notify_monitors(DaemonEvent::Announce(
1356 info.get_fullname().to_string(),
1357 format!("{:?}", &outgoing_addrs),
1358 ));
1359 }
1360
1361 let service_fullname = info.get_fullname().to_lowercase();
1364 self.my_services.insert(service_fullname, info);
1365 }
1366
1367 fn send_unsolicited_response(&mut self, info: &mut ServiceInfo) -> Vec<IpAddr> {
1370 let mut outgoing_addrs = Vec::new();
1371 let mut multicast_sent_trackers = HashSet::new();
1373
1374 let mut outgoing_intfs = Vec::new();
1375
1376 for (intf, sock) in self.intf_socks.iter() {
1377 if let Some(tracker) = multicast_send_tracker(intf) {
1378 if multicast_sent_trackers.contains(&tracker) {
1379 continue; }
1381 }
1382
1383 let dns_registry = match self.dns_registry_map.get_mut(intf) {
1384 Some(registry) => registry,
1385 None => self
1386 .dns_registry_map
1387 .entry(intf.clone())
1388 .or_insert_with(DnsRegistry::new),
1389 };
1390
1391 if announce_service_on_intf(dns_registry, info, intf, sock) {
1392 if let Some(tracker) = multicast_send_tracker(intf) {
1393 multicast_sent_trackers.insert(tracker);
1394 }
1395 outgoing_addrs.push(intf.ip());
1396 outgoing_intfs.push(intf.clone());
1397
1398 debug!("Announce service {} on {}", info.get_fullname(), intf.ip());
1399
1400 info.set_status(intf, ServiceStatus::Announced);
1401 } else {
1402 for timer in dns_registry.new_timers.drain(..) {
1403 self.timers.push(Reverse(timer));
1404 }
1405 info.set_status(intf, ServiceStatus::Probing);
1406 }
1407 }
1408
1409 let next_time = current_time_millis() + 1000;
1413 for intf in outgoing_intfs {
1414 self.add_retransmission(
1415 next_time,
1416 Command::RegisterResend(info.get_fullname().to_string(), intf),
1417 );
1418 }
1419
1420 outgoing_addrs
1421 }
1422
1423 fn probing_handler(&mut self) {
1425 let now = current_time_millis();
1426
1427 for (intf, sock) in self.intf_socks.iter() {
1428 let Some(dns_registry) = self.dns_registry_map.get_mut(intf) else {
1429 continue;
1430 };
1431
1432 let mut expired_probe_names = Vec::new();
1433 let mut out = DnsOutgoing::new(FLAGS_QR_QUERY);
1434
1435 for (name, probe) in dns_registry.probing.iter_mut() {
1436 if now >= probe.next_send {
1437 if probe.expired(now) {
1438 expired_probe_names.push(name.clone());
1440 } else {
1441 out.add_question(name, RRType::ANY);
1442
1443 for record in probe.records.iter() {
1451 out.add_authority(record.clone());
1452 }
1453
1454 probe.update_next_send(now);
1455
1456 self.timers.push(Reverse(probe.next_send));
1458 }
1459 }
1460 }
1461
1462 if !out.questions().is_empty() {
1464 debug!("sending out probing of {} questions", out.questions().len());
1465 send_dns_outgoing(&out, intf, sock);
1466 }
1467
1468 let mut waiting_services = HashSet::new();
1469
1470 for name in expired_probe_names {
1471 let Some(probe) = dns_registry.probing.remove(&name) else {
1472 continue;
1473 };
1474
1475 for record in probe.records.iter() {
1477 if let Some(new_name) = record.get_record().get_new_name() {
1478 dns_registry
1479 .name_changes
1480 .insert(name.clone(), new_name.to_string());
1481
1482 let event = DnsNameChange {
1483 original: record.get_record().get_original_name().to_string(),
1484 new_name: new_name.to_string(),
1485 rr_type: record.get_type(),
1486 intf_name: intf.name.to_string(),
1487 };
1488 notify_monitors(&mut self.monitors, DaemonEvent::NameChange(event));
1489 }
1490 }
1491
1492 debug!(
1494 "probe of '{name}' finished: move {} records to active. ({} waiting services)",
1495 probe.records.len(),
1496 probe.waiting_services.len(),
1497 );
1498
1499 if !probe.records.is_empty() {
1501 match dns_registry.active.get_mut(&name) {
1502 Some(records) => {
1503 records.extend(probe.records);
1504 }
1505 None => {
1506 dns_registry.active.insert(name, probe.records);
1507 }
1508 }
1509
1510 waiting_services.extend(probe.waiting_services);
1511 }
1512 }
1513
1514 for service_name in waiting_services {
1516 debug!(
1517 "try to announce service {service_name} on intf {}",
1518 intf.ip()
1519 );
1520 if let Some(info) = self.my_services.get_mut(&service_name.to_lowercase()) {
1522 if info.get_status(intf) == ServiceStatus::Announced {
1523 debug!("service {} already announced", info.get_fullname());
1524 continue;
1525 }
1526
1527 if announce_service_on_intf(dns_registry, info, intf, sock) {
1528 let next_time = now + 1000;
1529 let command =
1530 Command::RegisterResend(info.get_fullname().to_string(), intf.clone());
1531 self.retransmissions.push(ReRun { next_time, command });
1532 self.timers.push(Reverse(next_time));
1533
1534 let fullname = match dns_registry.name_changes.get(&service_name) {
1535 Some(new_name) => new_name.to_string(),
1536 None => service_name.to_string(),
1537 };
1538
1539 let mut hostname = info.get_hostname();
1540 if let Some(new_name) = dns_registry.name_changes.get(hostname) {
1541 hostname = new_name;
1542 }
1543
1544 debug!("wake up: announce service {} on {}", fullname, intf.ip());
1545 notify_monitors(
1546 &mut self.monitors,
1547 DaemonEvent::Announce(fullname, format!("{}:{}", hostname, &intf.ip())),
1548 );
1549
1550 info.set_status(intf, ServiceStatus::Announced);
1551 }
1552 }
1553 }
1554 }
1555 }
1556
1557 fn unregister_service(
1558 &self,
1559 info: &ServiceInfo,
1560 intf: &Interface,
1561 sock: &MioUdpSocket,
1562 ) -> Vec<u8> {
1563 let mut out = DnsOutgoing::new(FLAGS_QR_RESPONSE | FLAGS_AA);
1564 out.add_answer_at_time(
1565 DnsPointer::new(
1566 info.get_type(),
1567 RRType::PTR,
1568 CLASS_IN,
1569 0,
1570 info.get_fullname().to_string(),
1571 ),
1572 0,
1573 );
1574
1575 if let Some(sub) = info.get_subtype() {
1576 trace!("Adding subdomain {}", sub);
1577 out.add_answer_at_time(
1578 DnsPointer::new(
1579 sub,
1580 RRType::PTR,
1581 CLASS_IN,
1582 0,
1583 info.get_fullname().to_string(),
1584 ),
1585 0,
1586 );
1587 }
1588
1589 out.add_answer_at_time(
1590 DnsSrv::new(
1591 info.get_fullname(),
1592 CLASS_IN | CLASS_CACHE_FLUSH,
1593 0,
1594 info.get_priority(),
1595 info.get_weight(),
1596 info.get_port(),
1597 info.get_hostname().to_string(),
1598 ),
1599 0,
1600 );
1601 out.add_answer_at_time(
1602 DnsTxt::new(
1603 info.get_fullname(),
1604 CLASS_IN | CLASS_CACHE_FLUSH,
1605 0,
1606 info.generate_txt(),
1607 ),
1608 0,
1609 );
1610
1611 for address in info.get_addrs_on_intf(intf) {
1612 out.add_answer_at_time(
1613 DnsAddress::new(
1614 info.get_hostname(),
1615 ip_address_rr_type(&address),
1616 CLASS_IN | CLASS_CACHE_FLUSH,
1617 0,
1618 address,
1619 ),
1620 0,
1621 );
1622 }
1623
1624 send_dns_outgoing(&out, intf, sock).remove(0)
1626 }
1627
1628 fn add_hostname_resolver(
1632 &mut self,
1633 hostname: String,
1634 listener: Sender<HostnameResolutionEvent>,
1635 timeout: Option<u64>,
1636 ) {
1637 let real_timeout = timeout.map(|t| current_time_millis() + t);
1638 self.hostname_resolvers
1639 .insert(hostname.to_lowercase(), (listener, real_timeout));
1640 if let Some(t) = real_timeout {
1641 self.add_timer(t);
1642 }
1643 }
1644
1645 fn send_query(&self, name: &str, qtype: RRType) {
1647 self.send_query_vec(&[(name, qtype)]);
1648 }
1649
1650 fn send_query_vec(&self, questions: &[(&str, RRType)]) {
1652 trace!("Sending query questions: {:?}", questions);
1653 let mut out = DnsOutgoing::new(FLAGS_QR_QUERY);
1654 let now = current_time_millis();
1655
1656 for (name, qtype) in questions {
1657 out.add_question(name, *qtype);
1658
1659 for record in self.cache.get_known_answers(name, *qtype, now) {
1660 trace!("add known answer: {:?}", record);
1668 let mut new_record = record.clone();
1669 new_record.get_record_mut().update_ttl(now);
1670 out.add_answer_box(new_record);
1671 }
1672 }
1673
1674 let mut multicast_sent_trackers = HashSet::new();
1676 for (intf, sock) in self.intf_socks.iter() {
1677 if let Some(tracker) = multicast_send_tracker(intf) {
1678 if multicast_sent_trackers.contains(&tracker) {
1679 continue; }
1681 multicast_sent_trackers.insert(tracker);
1682 }
1683 send_dns_outgoing(&out, intf, sock);
1684 }
1685 }
1686
1687 fn handle_read(&mut self, intf: &Interface) -> bool {
1692 let sock = match self.intf_socks.get_mut(intf) {
1693 Some(if_sock) => if_sock,
1694 None => return false,
1695 };
1696 let mut buf = vec![0u8; MAX_MSG_ABSOLUTE];
1697
1698 let sz = match sock.recv(&mut buf) {
1705 Ok(sz) => sz,
1706 Err(e) => {
1707 if e.kind() != std::io::ErrorKind::WouldBlock {
1708 debug!("listening socket read failed: {}", e);
1709 }
1710 return false;
1711 }
1712 };
1713
1714 trace!("received {} bytes at IP: {}", sz, intf.ip());
1715
1716 if sz == 0 {
1718 debug!("socket {:?} was likely shutdown", &sock);
1719 if let Err(e) = self.poller.registry().deregister(sock) {
1720 debug!("failed to remove sock {:?} from poller: {}", sock, &e);
1721 }
1722
1723 let should_loop = if intf.ip().is_ipv4() {
1725 self.multicast_loop_v4
1726 } else {
1727 self.multicast_loop_v6
1728 };
1729 match new_socket_bind(intf, should_loop) {
1730 Ok(new_sock) => {
1731 trace!("reset socket for IP {}", intf.ip());
1732 self.intf_socks.insert(intf.clone(), new_sock);
1733 }
1734 Err(e) => debug!("re-bind a socket to {:?}: {}", intf, e),
1735 }
1736
1737 return false;
1738 }
1739
1740 buf.truncate(sz); match DnsIncoming::new(buf) {
1743 Ok(msg) => {
1744 if msg.is_query() {
1745 self.handle_query(msg, intf);
1746 } else if msg.is_response() {
1747 self.handle_response(msg, intf);
1748 } else {
1749 debug!("Invalid message: not query and not response");
1750 }
1751 }
1752 Err(e) => debug!("Invalid incoming DNS message: {}", e),
1753 }
1754
1755 true
1756 }
1757
1758 fn query_unresolved(&mut self, instance: &str) -> bool {
1760 if !valid_instance_name(instance) {
1761 trace!("instance name {} not valid", instance);
1762 return false;
1763 }
1764
1765 if let Some(records) = self.cache.get_srv(instance) {
1766 for record in records {
1767 if let Some(srv) = record.any().downcast_ref::<DnsSrv>() {
1768 if self.cache.get_addr(srv.host()).is_none() {
1769 self.send_query_vec(&[(srv.host(), RRType::A), (srv.host(), RRType::AAAA)]);
1770 return true;
1771 }
1772 }
1773 }
1774 } else {
1775 self.send_query(instance, RRType::ANY);
1776 return true;
1777 }
1778
1779 false
1780 }
1781
1782 fn query_cache_for_service(&mut self, ty_domain: &str, sender: &Sender<ServiceEvent>) {
1785 let mut resolved: HashSet<String> = HashSet::new();
1786 let mut unresolved: HashSet<String> = HashSet::new();
1787
1788 if let Some(records) = self.cache.get_ptr(ty_domain) {
1789 for record in records.iter() {
1790 if let Some(ptr) = record.any().downcast_ref::<DnsPointer>() {
1791 let info = match self.create_service_info_from_cache(ty_domain, ptr.alias()) {
1792 Ok(ok) => ok,
1793 Err(err) => {
1794 debug!("Error while creating service info from cache: {}", err);
1795 continue;
1796 }
1797 };
1798
1799 match sender.send(ServiceEvent::ServiceFound(
1800 ty_domain.to_string(),
1801 ptr.alias().to_string(),
1802 )) {
1803 Ok(()) => debug!("send service found {}", ptr.alias()),
1804 Err(e) => {
1805 debug!("failed to send service found: {}", e);
1806 continue;
1807 }
1808 }
1809
1810 if info.is_ready() {
1811 resolved.insert(ptr.alias().to_string());
1812 match sender.send(ServiceEvent::ServiceResolved(info)) {
1813 Ok(()) => debug!("sent service resolved: {}", ptr.alias()),
1814 Err(e) => debug!("failed to send service resolved: {}", e),
1815 }
1816 } else {
1817 unresolved.insert(ptr.alias().to_string());
1818 }
1819 }
1820 }
1821 }
1822
1823 for instance in resolved.drain() {
1824 self.pending_resolves.remove(&instance);
1825 self.resolved.insert(instance);
1826 }
1827
1828 for instance in unresolved.drain() {
1829 self.add_pending_resolve(instance);
1830 }
1831 }
1832
1833 fn query_cache_for_hostname(
1836 &mut self,
1837 hostname: &str,
1838 sender: Sender<HostnameResolutionEvent>,
1839 ) {
1840 let addresses_map = self.cache.get_addresses_for_host(hostname);
1841 for (name, addresses) in addresses_map {
1842 match sender.send(HostnameResolutionEvent::AddressesFound(name, addresses)) {
1843 Ok(()) => trace!("sent hostname addresses found"),
1844 Err(e) => debug!("failed to send hostname addresses found: {}", e),
1845 }
1846 }
1847 }
1848
1849 fn add_pending_resolve(&mut self, instance: String) {
1850 if !self.pending_resolves.contains(&instance) {
1851 let next_time = current_time_millis() + RESOLVE_WAIT_IN_MILLIS;
1852 self.add_retransmission(next_time, Command::Resolve(instance.clone(), 1));
1853 self.pending_resolves.insert(instance);
1854 }
1855 }
1856
1857 fn create_service_info_from_cache(
1858 &self,
1859 ty_domain: &str,
1860 fullname: &str,
1861 ) -> Result<ServiceInfo> {
1862 let my_name = {
1863 let name = fullname.trim_end_matches(split_sub_domain(ty_domain).0);
1864 name.strip_suffix('.').unwrap_or(name).to_string()
1865 };
1866
1867 let now = current_time_millis();
1868 let mut info = ServiceInfo::new(ty_domain, &my_name, "", (), 0, None)?;
1869
1870 if let Some(subtype) = self.cache.get_subtype(fullname) {
1872 trace!(
1873 "ty_domain: {} found subtype {} for instance: {}",
1874 ty_domain,
1875 subtype,
1876 fullname
1877 );
1878 if info.get_subtype().is_none() {
1879 info.set_subtype(subtype.clone());
1880 }
1881 }
1882
1883 if let Some(records) = self.cache.get_srv(fullname) {
1885 if let Some(answer) = records.first() {
1886 if let Some(dns_srv) = answer.any().downcast_ref::<DnsSrv>() {
1887 info.set_hostname(dns_srv.host().to_string());
1888 info.set_port(dns_srv.port());
1889 }
1890 }
1891 }
1892
1893 if let Some(records) = self.cache.get_txt(fullname) {
1895 if let Some(record) = records.first() {
1896 if let Some(dns_txt) = record.any().downcast_ref::<DnsTxt>() {
1897 info.set_properties_from_txt(dns_txt.text());
1898 }
1899 }
1900 }
1901
1902 if let Some(records) = self.cache.get_addr(info.get_hostname()) {
1904 for answer in records.iter() {
1905 if let Some(dns_a) = answer.any().downcast_ref::<DnsAddress>() {
1906 if dns_a.get_record().is_expired(now) {
1907 trace!("Addr expired: {}", dns_a.address());
1908 } else {
1909 info.insert_ipaddr(dns_a.address());
1910 }
1911 }
1912 }
1913 }
1914
1915 Ok(info)
1916 }
1917
1918 fn handle_poller_events(&mut self, events: &mio::Events) {
1919 for ev in events.iter() {
1920 trace!("event received with key {:?}", ev.token());
1921 if ev.token().0 == SIGNAL_SOCK_EVENT_KEY {
1922 self.signal_sock_drain();
1924
1925 if let Err(e) = self.poller.registry().reregister(
1926 &mut self.signal_sock,
1927 ev.token(),
1928 mio::Interest::READABLE,
1929 ) {
1930 debug!("failed to modify poller for signal socket: {}", e);
1931 }
1932 continue; }
1934
1935 let intf = match self.poll_ids.get(&ev.token().0) {
1937 Some(interface) => interface.clone(),
1938 None => {
1939 debug!("Ip for event key {} not found", ev.token().0);
1940 break;
1941 }
1942 };
1943 while self.handle_read(&intf) {}
1944
1945 if let Some(sock) = self.intf_socks.get_mut(&intf) {
1947 if let Err(e) =
1948 self.poller
1949 .registry()
1950 .reregister(sock, ev.token(), mio::Interest::READABLE)
1951 {
1952 debug!("modify poller for interface {:?}: {}", &intf, e);
1953 break;
1954 }
1955 }
1956 }
1957 }
1958
1959 fn handle_response(&mut self, mut msg: DnsIncoming, intf: &Interface) {
1962 trace!(
1963 "handle_response: {} answers {} authorities {} additionals",
1964 msg.answers().len(),
1965 &msg.authorities().len(),
1966 &msg.num_additionals()
1967 );
1968 let now = current_time_millis();
1969
1970 let mut record_predicate = |record: &DnsRecordBox| {
1972 if !record.get_record().is_expired(now) {
1973 return true;
1974 }
1975
1976 debug!("record is expired, removing it from cache.");
1977 if self.cache.remove(record) {
1978 if let Some(dns_ptr) = record.any().downcast_ref::<DnsPointer>() {
1980 call_service_listener(
1981 &self.service_queriers,
1982 dns_ptr.get_name(),
1983 ServiceEvent::ServiceRemoved(
1984 dns_ptr.get_name().to_string(),
1985 dns_ptr.alias().to_string(),
1986 ),
1987 );
1988 }
1989 }
1990 false
1991 };
1992 msg.answers_mut().retain(&mut record_predicate);
1993 msg.authorities_mut().retain(&mut record_predicate);
1994 msg.additionals_mut().retain(&mut record_predicate);
1995
1996 self.conflict_handler(&msg, intf);
1998
1999 let mut is_for_us = true; for answer in msg.answers() {
2006 if answer.get_type() == RRType::PTR {
2007 if self.service_queriers.contains_key(answer.get_name()) {
2008 is_for_us = true;
2009 break; } else {
2011 is_for_us = false;
2012 }
2013 }
2014 }
2015
2016 struct InstanceChange {
2018 ty: RRType, name: String, }
2021
2022 let mut changes = Vec::new();
2030 let mut timers = Vec::new();
2031 for record in msg.all_records() {
2032 match self
2033 .cache
2034 .add_or_update(intf, record, &mut timers, is_for_us)
2035 {
2036 Some((dns_record, true)) => {
2037 timers.push(dns_record.get_record().get_expire_time());
2038 timers.push(dns_record.get_record().get_refresh_time());
2039
2040 let ty = dns_record.get_type();
2041 let name = dns_record.get_name();
2042 if ty == RRType::PTR {
2043 if self.service_queriers.contains_key(name) {
2044 timers.push(dns_record.get_record().get_refresh_time());
2045 }
2046
2047 if let Some(dns_ptr) = dns_record.any().downcast_ref::<DnsPointer>() {
2049 call_service_listener(
2050 &self.service_queriers,
2051 name,
2052 ServiceEvent::ServiceFound(
2053 name.to_string(),
2054 dns_ptr.alias().to_string(),
2055 ),
2056 );
2057 changes.push(InstanceChange {
2058 ty,
2059 name: dns_ptr.alias().to_string(),
2060 });
2061 }
2062 } else {
2063 changes.push(InstanceChange {
2064 ty,
2065 name: name.to_string(),
2066 });
2067 }
2068 }
2069 Some((dns_record, false)) => {
2070 timers.push(dns_record.get_record().get_expire_time());
2071 timers.push(dns_record.get_record().get_refresh_time());
2072 }
2073 _ => {}
2074 }
2075 }
2076
2077 for t in timers {
2079 self.add_timer(t);
2080 }
2081
2082 for change in changes
2084 .iter()
2085 .filter(|change| change.ty == RRType::A || change.ty == RRType::AAAA)
2086 {
2087 let addr_map = self.cache.get_addresses_for_host(&change.name);
2088 for (name, addresses) in addr_map {
2089 call_hostname_resolution_listener(
2090 &self.hostname_resolvers,
2091 &change.name,
2092 HostnameResolutionEvent::AddressesFound(name, addresses),
2093 )
2094 }
2095 }
2096
2097 let mut updated_instances = HashSet::new();
2099 for update in changes {
2100 match update.ty {
2101 RRType::PTR | RRType::SRV | RRType::TXT => {
2102 updated_instances.insert(update.name);
2103 }
2104 RRType::A | RRType::AAAA => {
2105 let instances = self.cache.get_instances_on_host(&update.name);
2106 updated_instances.extend(instances);
2107 }
2108 _ => {}
2109 }
2110 }
2111
2112 self.resolve_updated_instances(&updated_instances);
2113 }
2114
2115 fn conflict_handler(&mut self, msg: &DnsIncoming, intf: &Interface) {
2116 let Some(dns_registry) = self.dns_registry_map.get_mut(intf) else {
2117 return;
2118 };
2119
2120 for answer in msg.answers().iter() {
2121 let mut new_records = Vec::new();
2122
2123 let name = answer.get_name();
2124 let Some(probe) = dns_registry.probing.get_mut(name) else {
2125 continue;
2126 };
2127
2128 if answer.get_type() == RRType::A || answer.get_type() == RRType::AAAA {
2130 if let Some(answer_addr) = answer.any().downcast_ref::<DnsAddress>() {
2131 if !valid_ip_on_intf(&answer_addr.address(), intf) {
2132 debug!(
2133 "conflict handler: answer addr {:?} not in the subnet of {:?}",
2134 answer_addr, intf
2135 );
2136 continue;
2137 }
2138 }
2139
2140 let any_match = probe.records.iter().any(|r| {
2143 r.get_type() == answer.get_type()
2144 && r.get_class() == answer.get_class()
2145 && r.rrdata_match(answer.as_ref())
2146 });
2147 if any_match {
2148 continue; }
2150 }
2151
2152 probe.records.retain(|record| {
2153 if record.get_type() == answer.get_type()
2154 && record.get_class() == answer.get_class()
2155 && !record.rrdata_match(answer.as_ref())
2156 {
2157 debug!(
2158 "found conflict name: '{name}' record: {}: {} PEER: {}",
2159 record.get_type(),
2160 record.rdata_print(),
2161 answer.rdata_print()
2162 );
2163
2164 let mut new_record = record.clone();
2167 let new_name = match record.get_type() {
2168 RRType::A => hostname_change(name),
2169 RRType::AAAA => hostname_change(name),
2170 _ => name_change(name),
2171 };
2172 new_record.get_record_mut().set_new_name(new_name);
2173 new_records.push(new_record);
2174 return false; }
2176
2177 true
2178 });
2179
2180 let create_time = current_time_millis() + fastrand::u64(0..250);
2187
2188 let waiting_services = probe.waiting_services.clone();
2189
2190 for record in new_records {
2191 if dns_registry.update_hostname(name, record.get_name(), create_time) {
2192 self.timers.push(Reverse(create_time));
2193 }
2194
2195 dns_registry.name_changes.insert(
2197 record.get_record().get_original_name().to_string(),
2198 record.get_name().to_string(),
2199 );
2200
2201 let new_probe = match dns_registry.probing.get_mut(record.get_name()) {
2202 Some(p) => p,
2203 None => {
2204 let new_probe = dns_registry
2205 .probing
2206 .entry(record.get_name().to_string())
2207 .or_insert_with(|| {
2208 debug!("conflict handler: new probe of {}", record.get_name());
2209 Probe::new(create_time)
2210 });
2211 self.timers.push(Reverse(new_probe.next_send));
2212 new_probe
2213 }
2214 };
2215
2216 debug!(
2217 "insert record with new name '{}' {} into probe",
2218 record.get_name(),
2219 record.get_type()
2220 );
2221 new_probe.insert_record(record);
2222
2223 new_probe.waiting_services.extend(waiting_services.clone());
2224 }
2225 }
2226 }
2227
2228 fn resolve_updated_instances(&mut self, updated_instances: &HashSet<String>) {
2235 let mut resolved: HashSet<String> = HashSet::new();
2236 let mut unresolved: HashSet<String> = HashSet::new();
2237 let mut removed_instances = HashMap::new();
2238
2239 for (ty_domain, records) in self.cache.all_ptr().iter() {
2240 if !self.service_queriers.contains_key(ty_domain) {
2241 continue;
2243 }
2244
2245 for record in records.iter() {
2246 if let Some(dns_ptr) = record.any().downcast_ref::<DnsPointer>() {
2247 if updated_instances.contains(dns_ptr.alias()) {
2248 if let Ok(info) =
2249 self.create_service_info_from_cache(ty_domain, dns_ptr.alias())
2250 {
2251 if info.is_ready() {
2252 debug!("call queriers to resolve {}", dns_ptr.alias());
2253 resolved.insert(dns_ptr.alias().to_string());
2254 call_service_listener(
2255 &self.service_queriers,
2256 ty_domain,
2257 ServiceEvent::ServiceResolved(info),
2258 );
2259 } else {
2260 if self.resolved.remove(dns_ptr.alias()) {
2261 removed_instances
2262 .entry(ty_domain.to_string())
2263 .or_insert_with(HashSet::new)
2264 .insert(dns_ptr.alias().to_string());
2265 }
2266 unresolved.insert(dns_ptr.alias().to_string());
2267 }
2268 }
2269 }
2270 }
2271 }
2272 }
2273
2274 for instance in resolved.drain() {
2275 self.pending_resolves.remove(&instance);
2276 self.resolved.insert(instance);
2277 }
2278
2279 for instance in unresolved.drain() {
2280 self.add_pending_resolve(instance);
2281 }
2282
2283 self.notify_service_removal(removed_instances);
2284 }
2285
2286 fn handle_query(&mut self, msg: DnsIncoming, intf: &Interface) {
2288 let sock = match self.intf_socks.get(intf) {
2289 Some(sock) => sock,
2290 None => return,
2291 };
2292 let mut out = DnsOutgoing::new(FLAGS_QR_RESPONSE | FLAGS_AA);
2293
2294 const META_QUERY: &str = "_services._dns-sd._udp.local.";
2297
2298 let Some(dns_registry) = self.dns_registry_map.get_mut(intf) else {
2299 debug!("missing dns registry for intf {}", intf.ip());
2300 return;
2301 };
2302
2303 for question in msg.questions().iter() {
2304 trace!("query question: {:?}", &question);
2305
2306 let qtype = question.entry_type();
2307
2308 if qtype == RRType::PTR {
2309 for service in self.my_services.values() {
2310 if service.get_status(intf) != ServiceStatus::Announced {
2311 continue;
2312 }
2313
2314 if question.entry_name() == service.get_type()
2315 || service
2316 .get_subtype()
2317 .as_ref()
2318 .is_some_and(|v| v == question.entry_name())
2319 {
2320 add_answer_with_additionals(&mut out, &msg, service, intf, dns_registry);
2321 } else if question.entry_name() == META_QUERY {
2322 let ptr_added = out.add_answer(
2323 &msg,
2324 DnsPointer::new(
2325 question.entry_name(),
2326 RRType::PTR,
2327 CLASS_IN,
2328 service.get_other_ttl(),
2329 service.get_type().to_string(),
2330 ),
2331 );
2332 if !ptr_added {
2333 trace!("answer was not added for meta-query {:?}", &question);
2334 }
2335 }
2336 }
2337 } else {
2338 if qtype == RRType::ANY && msg.num_authorities() > 0 {
2340 let probe_name = question.entry_name();
2341
2342 if let Some(probe) = dns_registry.probing.get_mut(probe_name) {
2343 let now = current_time_millis();
2344
2345 if probe.start_time < now {
2349 let incoming_records: Vec<_> = msg
2350 .authorities()
2351 .iter()
2352 .filter(|r| r.get_name() == probe_name)
2353 .collect();
2354
2355 probe.tiebreaking(&incoming_records, now, probe_name);
2356 }
2357 }
2358 }
2359
2360 if qtype == RRType::A || qtype == RRType::AAAA || qtype == RRType::ANY {
2361 for service in self.my_services.values() {
2362 if service.get_status(intf) != ServiceStatus::Announced {
2363 continue;
2364 }
2365
2366 let service_hostname =
2367 match dns_registry.name_changes.get(service.get_hostname()) {
2368 Some(new_name) => new_name,
2369 None => service.get_hostname(),
2370 };
2371
2372 if service_hostname.to_lowercase() == question.entry_name().to_lowercase() {
2373 let intf_addrs = service.get_addrs_on_intf(intf);
2374 if intf_addrs.is_empty()
2375 && (qtype == RRType::A || qtype == RRType::AAAA)
2376 {
2377 let t = match qtype {
2378 RRType::A => "TYPE_A",
2379 RRType::AAAA => "TYPE_AAAA",
2380 _ => "invalid_type",
2381 };
2382 trace!(
2383 "Cannot find valid addrs for {} response on intf {:?}",
2384 t,
2385 &intf
2386 );
2387 return;
2388 }
2389 for address in intf_addrs {
2390 out.add_answer(
2391 &msg,
2392 DnsAddress::new(
2393 service_hostname,
2394 ip_address_rr_type(&address),
2395 CLASS_IN | CLASS_CACHE_FLUSH,
2396 service.get_host_ttl(),
2397 address,
2398 ),
2399 );
2400 }
2401 }
2402 }
2403 }
2404
2405 let query_name = question.entry_name().to_lowercase();
2406 let service_opt = self
2407 .my_services
2408 .iter()
2409 .find(|(k, _v)| {
2410 let service_name = match dns_registry.name_changes.get(k.as_str()) {
2411 Some(new_name) => new_name,
2412 None => k,
2413 };
2414 service_name == &query_name
2415 })
2416 .map(|(_, v)| v);
2417
2418 let Some(service) = service_opt else {
2419 continue;
2420 };
2421
2422 if service.get_status(intf) != ServiceStatus::Announced {
2423 continue;
2424 }
2425
2426 if qtype == RRType::SRV || qtype == RRType::ANY {
2427 out.add_answer(
2428 &msg,
2429 DnsSrv::new(
2430 question.entry_name(),
2431 CLASS_IN | CLASS_CACHE_FLUSH,
2432 service.get_host_ttl(),
2433 service.get_priority(),
2434 service.get_weight(),
2435 service.get_port(),
2436 service.get_hostname().to_string(),
2437 ),
2438 );
2439 }
2440
2441 if qtype == RRType::TXT || qtype == RRType::ANY {
2442 out.add_answer(
2443 &msg,
2444 DnsTxt::new(
2445 question.entry_name(),
2446 CLASS_IN | CLASS_CACHE_FLUSH,
2447 service.get_host_ttl(),
2448 service.generate_txt(),
2449 ),
2450 );
2451 }
2452
2453 if qtype == RRType::SRV {
2454 let intf_addrs = service.get_addrs_on_intf(intf);
2455 if intf_addrs.is_empty() {
2456 debug!(
2457 "Cannot find valid addrs for TYPE_SRV response on intf {:?}",
2458 &intf
2459 );
2460 return;
2461 }
2462 for address in intf_addrs {
2463 out.add_additional_answer(DnsAddress::new(
2464 service.get_hostname(),
2465 ip_address_rr_type(&address),
2466 CLASS_IN | CLASS_CACHE_FLUSH,
2467 service.get_host_ttl(),
2468 address,
2469 ));
2470 }
2471 }
2472 }
2473 }
2474
2475 if !out.answers_count() > 0 {
2476 out.set_id(msg.id());
2477 send_dns_outgoing(&out, intf, sock);
2478
2479 self.increase_counter(Counter::Respond, 1);
2480 self.notify_monitors(DaemonEvent::Respond(intf.ip()));
2481 }
2482
2483 self.increase_counter(Counter::KnownAnswerSuppression, out.known_answer_count());
2484 }
2485
2486 fn increase_counter(&mut self, counter: Counter, count: i64) {
2488 let key = counter.to_string();
2489 match self.counters.get_mut(&key) {
2490 Some(v) => *v += count,
2491 None => {
2492 self.counters.insert(key, count);
2493 }
2494 }
2495 }
2496
2497 fn set_counter(&mut self, counter: Counter, count: i64) {
2499 let key = counter.to_string();
2500 self.counters.insert(key, count);
2501 }
2502
2503 fn signal_sock_drain(&self) {
2504 let mut signal_buf = [0; 1024];
2505
2506 while let Ok(sz) = self.signal_sock.recv(&mut signal_buf) {
2508 trace!(
2509 "signal socket recvd: {}",
2510 String::from_utf8_lossy(&signal_buf[0..sz])
2511 );
2512 }
2513 }
2514
2515 fn add_retransmission(&mut self, next_time: u64, command: Command) {
2516 self.retransmissions.push(ReRun { next_time, command });
2517 self.add_timer(next_time);
2518 }
2519
2520 fn notify_service_removal(&self, expired: HashMap<String, HashSet<String>>) {
2522 for (ty_domain, sender) in self.service_queriers.iter() {
2523 if let Some(instances) = expired.get(ty_domain) {
2524 for instance_name in instances {
2525 let event = ServiceEvent::ServiceRemoved(
2526 ty_domain.to_string(),
2527 instance_name.to_string(),
2528 );
2529 match sender.send(event) {
2530 Ok(()) => debug!("notify_service_removal: sent ServiceRemoved to listener of {ty_domain}: {instance_name}"),
2531 Err(e) => debug!("Failed to send event: {}", e),
2532 }
2533 }
2534 }
2535 }
2536 }
2537
2538 fn exec_command(&mut self, command: Command, repeating: bool) {
2542 match command {
2543 Command::Browse(ty, next_delay, listener) => {
2544 self.exec_command_browse(repeating, ty, next_delay, listener);
2545 }
2546
2547 Command::ResolveHostname(hostname, next_delay, listener, timeout) => {
2548 self.exec_command_resolve_hostname(
2549 repeating, hostname, next_delay, listener, timeout,
2550 );
2551 }
2552
2553 Command::Register(service_info) => {
2554 self.register_service(service_info);
2555 self.increase_counter(Counter::Register, 1);
2556 }
2557
2558 Command::RegisterResend(fullname, intf) => {
2559 trace!("register-resend service: {fullname} on {:?}", &intf.addr);
2560 self.exec_command_register_resend(fullname, intf);
2561 }
2562
2563 Command::Unregister(fullname, resp_s) => {
2564 trace!("unregister service {} repeat {}", &fullname, &repeating);
2565 self.exec_command_unregister(repeating, fullname, resp_s);
2566 }
2567
2568 Command::UnregisterResend(packet, ip) => {
2569 self.exec_command_unregister_resend(packet, ip);
2570 }
2571
2572 Command::StopBrowse(ty_domain) => self.exec_command_stop_browse(ty_domain),
2573
2574 Command::StopResolveHostname(hostname) => {
2575 self.exec_command_stop_resolve_hostname(hostname.to_lowercase())
2576 }
2577
2578 Command::Resolve(instance, try_count) => self.exec_command_resolve(instance, try_count),
2579
2580 Command::GetMetrics(resp_s) => self.exec_command_get_metrics(resp_s),
2581
2582 Command::GetStatus(resp_s) => match resp_s.send(self.status.clone()) {
2583 Ok(()) => trace!("Sent status to the client"),
2584 Err(e) => debug!("Failed to send status: {}", e),
2585 },
2586
2587 Command::Monitor(resp_s) => {
2588 self.monitors.push(resp_s);
2589 }
2590
2591 Command::SetOption(daemon_opt) => {
2592 self.process_set_option(daemon_opt);
2593 }
2594
2595 Command::GetOption(resp_s) => {
2596 let val = DaemonOptionVal {
2597 _service_name_len_max: self.service_name_len_max,
2598 ip_check_interval: self.ip_check_interval,
2599 };
2600 if let Err(e) = resp_s.send(val) {
2601 debug!("Failed to send options: {}", e);
2602 }
2603 }
2604
2605 Command::Verify(instance_fullname, timeout) => {
2606 self.exec_command_verify(instance_fullname, timeout, repeating);
2607 }
2608
2609 _ => {
2610 debug!("unexpected command: {:?}", &command);
2611 }
2612 }
2613 }
2614
2615 fn exec_command_get_metrics(&mut self, resp_s: Sender<HashMap<String, i64>>) {
2616 self.set_counter(Counter::CachedPTR, self.cache.ptr_count() as i64);
2617 self.set_counter(Counter::CachedSRV, self.cache.srv_count() as i64);
2618 self.set_counter(Counter::CachedAddr, self.cache.addr_count() as i64);
2619 self.set_counter(Counter::CachedTxt, self.cache.txt_count() as i64);
2620 self.set_counter(Counter::CachedNSec, self.cache.nsec_count() as i64);
2621 self.set_counter(Counter::CachedSubtype, self.cache.subtype_count() as i64);
2622
2623 let dns_registry_probe_count: usize = self
2624 .dns_registry_map
2625 .values()
2626 .map(|r| r.probing.len())
2627 .sum();
2628 self.set_counter(Counter::DnsRegistryProbe, dns_registry_probe_count as i64);
2629
2630 let dns_registry_active_count: usize = self
2631 .dns_registry_map
2632 .values()
2633 .map(|r| r.active.values().map(|a| a.len()).sum::<usize>())
2634 .sum();
2635 self.set_counter(Counter::DnsRegistryActive, dns_registry_active_count as i64);
2636
2637 let dns_registry_timer_count: usize = self
2638 .dns_registry_map
2639 .values()
2640 .map(|r| r.new_timers.len())
2641 .sum();
2642 self.set_counter(Counter::DnsRegistryTimer, dns_registry_timer_count as i64);
2643
2644 let dns_registry_name_change_count: usize = self
2645 .dns_registry_map
2646 .values()
2647 .map(|r| r.name_changes.len())
2648 .sum();
2649 self.set_counter(
2650 Counter::DnsRegistryNameChange,
2651 dns_registry_name_change_count as i64,
2652 );
2653
2654 if let Err(e) = resp_s.send(self.counters.clone()) {
2656 debug!("Failed to send metrics: {}", e);
2657 }
2658 }
2659
2660 fn exec_command_browse(
2661 &mut self,
2662 repeating: bool,
2663 ty: String,
2664 next_delay: u32,
2665 listener: Sender<ServiceEvent>,
2666 ) {
2667 let pretty_addrs: Vec<String> = self
2668 .intf_socks
2669 .keys()
2670 .map(|itf| format!("{} ({})", itf.ip(), itf.name))
2671 .collect();
2672
2673 if let Err(e) = listener.send(ServiceEvent::SearchStarted(format!(
2674 "{ty} on {} interfaces [{}]",
2675 pretty_addrs.len(),
2676 pretty_addrs.join(", ")
2677 ))) {
2678 debug!(
2679 "Failed to send SearchStarted({})(repeating:{}): {}",
2680 &ty, repeating, e
2681 );
2682 return;
2683 }
2684 if !repeating {
2685 self.service_queriers.insert(ty.clone(), listener.clone());
2689
2690 self.query_cache_for_service(&ty, &listener);
2692 }
2693
2694 self.send_query(&ty, RRType::PTR);
2695 self.increase_counter(Counter::Browse, 1);
2696
2697 let next_time = current_time_millis() + (next_delay * 1000) as u64;
2698 let max_delay = 60 * 60;
2699 let delay = cmp::min(next_delay * 2, max_delay);
2700 self.add_retransmission(next_time, Command::Browse(ty, delay, listener));
2701 }
2702
2703 fn exec_command_resolve_hostname(
2704 &mut self,
2705 repeating: bool,
2706 hostname: String,
2707 next_delay: u32,
2708 listener: Sender<HostnameResolutionEvent>,
2709 timeout: Option<u64>,
2710 ) {
2711 let addr_list: Vec<_> = self.intf_socks.keys().collect();
2712 if let Err(e) = listener.send(HostnameResolutionEvent::SearchStarted(format!(
2713 "{} on addrs {:?}",
2714 &hostname, &addr_list
2715 ))) {
2716 debug!(
2717 "Failed to send ResolveStarted({})(repeating:{}): {}",
2718 &hostname, repeating, e
2719 );
2720 return;
2721 }
2722 if !repeating {
2723 self.add_hostname_resolver(hostname.to_owned(), listener.clone(), timeout);
2724 self.query_cache_for_hostname(&hostname, listener.clone());
2726 }
2727
2728 self.send_query_vec(&[(&hostname, RRType::A), (&hostname, RRType::AAAA)]);
2729 self.increase_counter(Counter::ResolveHostname, 1);
2730
2731 let now = current_time_millis();
2732 let next_time = now + u64::from(next_delay) * 1000;
2733 let max_delay = 60 * 60;
2734 let delay = cmp::min(next_delay * 2, max_delay);
2735
2736 if self
2738 .hostname_resolvers
2739 .get(&hostname)
2740 .and_then(|(_sender, timeout)| *timeout)
2741 .map(|timeout| next_time < timeout)
2742 .unwrap_or(true)
2743 {
2744 self.add_retransmission(
2745 next_time,
2746 Command::ResolveHostname(hostname, delay, listener, None),
2747 );
2748 }
2749 }
2750
2751 fn exec_command_resolve(&mut self, instance: String, try_count: u16) {
2752 let pending_query = self.query_unresolved(&instance);
2753 let max_try = 3;
2754 if pending_query && try_count < max_try {
2755 let next_time = current_time_millis() + RESOLVE_WAIT_IN_MILLIS;
2758 self.add_retransmission(next_time, Command::Resolve(instance, try_count + 1));
2759 }
2760 }
2761
2762 fn exec_command_unregister(
2763 &mut self,
2764 repeating: bool,
2765 fullname: String,
2766 resp_s: Sender<UnregisterStatus>,
2767 ) {
2768 let response = match self.my_services.remove_entry(&fullname) {
2769 None => {
2770 debug!("unregister: cannot find such service {}", &fullname);
2771 UnregisterStatus::NotFound
2772 }
2773 Some((_k, info)) => {
2774 let mut timers = Vec::new();
2775 let mut multicast_sent_trackers = HashSet::new();
2777
2778 for (intf, sock) in self.intf_socks.iter() {
2779 if let Some(tracker) = multicast_send_tracker(intf) {
2780 if multicast_sent_trackers.contains(&tracker) {
2781 continue; }
2783 multicast_sent_trackers.insert(tracker);
2784 }
2785 let packet = self.unregister_service(&info, intf, sock);
2786 if !repeating && !packet.is_empty() {
2788 let next_time = current_time_millis() + 120;
2789 self.retransmissions.push(ReRun {
2790 next_time,
2791 command: Command::UnregisterResend(packet, intf.clone()),
2792 });
2793 timers.push(next_time);
2794 }
2795 }
2796
2797 for t in timers {
2798 self.add_timer(t);
2799 }
2800
2801 self.increase_counter(Counter::Unregister, 1);
2802 UnregisterStatus::OK
2803 }
2804 };
2805 if let Err(e) = resp_s.send(response) {
2806 debug!("unregister: failed to send response: {}", e);
2807 }
2808 }
2809
2810 fn exec_command_unregister_resend(&mut self, packet: Vec<u8>, intf: Interface) {
2811 if let Some(sock) = self.intf_socks.get(&intf) {
2812 debug!("UnregisterResend from {}", &intf.ip());
2813 multicast_on_intf(&packet[..], &intf, sock);
2814 self.increase_counter(Counter::UnregisterResend, 1);
2815 }
2816 }
2817
2818 fn exec_command_stop_browse(&mut self, ty_domain: String) {
2819 match self.service_queriers.remove_entry(&ty_domain) {
2820 None => debug!("StopBrowse: cannot find querier for {}", &ty_domain),
2821 Some((ty, sender)) => {
2822 trace!("StopBrowse: removed queryer for {}", &ty);
2824 let mut i = 0;
2825 while i < self.retransmissions.len() {
2826 if let Command::Browse(t, _, _) = &self.retransmissions[i].command {
2827 if t == &ty {
2828 self.retransmissions.remove(i);
2829 trace!("StopBrowse: removed retransmission for {}", &ty);
2830 continue;
2831 }
2832 }
2833 i += 1;
2834 }
2835
2836 match sender.send(ServiceEvent::SearchStopped(ty_domain)) {
2838 Ok(()) => trace!("Sent SearchStopped to the listener"),
2839 Err(e) => debug!("Failed to send SearchStopped: {}", e),
2840 }
2841 }
2842 }
2843 }
2844
2845 fn exec_command_stop_resolve_hostname(&mut self, hostname: String) {
2846 if let Some((host, (sender, _timeout))) = self.hostname_resolvers.remove_entry(&hostname) {
2847 trace!("StopResolve: removed queryer for {}", &host);
2849 let mut i = 0;
2850 while i < self.retransmissions.len() {
2851 if let Command::Resolve(t, _) = &self.retransmissions[i].command {
2852 if t == &host {
2853 self.retransmissions.remove(i);
2854 trace!("StopResolve: removed retransmission for {}", &host);
2855 continue;
2856 }
2857 }
2858 i += 1;
2859 }
2860
2861 match sender.send(HostnameResolutionEvent::SearchStopped(hostname)) {
2863 Ok(()) => trace!("Sent SearchStopped to the listener"),
2864 Err(e) => debug!("Failed to send SearchStopped: {}", e),
2865 }
2866 }
2867 }
2868
2869 fn exec_command_register_resend(&mut self, fullname: String, intf: Interface) {
2870 let Some(info) = self.my_services.get_mut(&fullname) else {
2871 trace!("announce: cannot find such service {}", &fullname);
2872 return;
2873 };
2874
2875 let Some(dns_registry) = self.dns_registry_map.get_mut(&intf) else {
2876 return;
2877 };
2878
2879 let Some(sock) = self.intf_socks.get(&intf) else {
2880 return;
2881 };
2882
2883 if announce_service_on_intf(dns_registry, info, &intf, sock) {
2884 let mut hostname = info.get_hostname();
2885 if let Some(new_name) = dns_registry.name_changes.get(hostname) {
2886 hostname = new_name;
2887 }
2888 let service_name = match dns_registry.name_changes.get(&fullname) {
2889 Some(new_name) => new_name.to_string(),
2890 None => fullname,
2891 };
2892
2893 debug!("resend: announce service {} on {}", service_name, intf.ip());
2894
2895 notify_monitors(
2896 &mut self.monitors,
2897 DaemonEvent::Announce(service_name, format!("{}:{}", hostname, &intf.ip())),
2898 );
2899 info.set_status(&intf, ServiceStatus::Announced);
2900 } else {
2901 debug!("register-resend should not fail");
2902 }
2903
2904 self.increase_counter(Counter::RegisterResend, 1);
2905 }
2906
2907 fn exec_command_verify(&mut self, instance: String, timeout: Duration, repeating: bool) {
2908 let now = current_time_millis();
2918 let expire_at = if repeating {
2919 None
2920 } else {
2921 Some(now + timeout.as_millis() as u64)
2922 };
2923
2924 let record_vec = self.cache.service_verify_queries(&instance, expire_at);
2926
2927 if !record_vec.is_empty() {
2928 let query_vec: Vec<(&str, RRType)> = record_vec
2929 .iter()
2930 .map(|(record, rr_type)| (record.as_str(), *rr_type))
2931 .collect();
2932 self.send_query_vec(&query_vec);
2933
2934 if let Some(new_expire) = expire_at {
2935 self.add_timer(new_expire); self.add_retransmission(now + 1000, Command::Verify(instance, timeout));
2939 }
2940 }
2941 }
2942
2943 fn refresh_active_services(&mut self) {
2945 let mut query_ptr_count = 0;
2946 let mut query_srv_count = 0;
2947 let mut new_timers = HashSet::new();
2948 let mut query_addr_count = 0;
2949
2950 for (ty_domain, _sender) in self.service_queriers.iter() {
2951 let refreshed_timers = self.cache.refresh_due_ptr(ty_domain);
2952 if !refreshed_timers.is_empty() {
2953 trace!("sending refresh query for PTR: {}", ty_domain);
2954 self.send_query(ty_domain, RRType::PTR);
2955 query_ptr_count += 1;
2956 new_timers.extend(refreshed_timers);
2957 }
2958
2959 let (instances, timers) = self.cache.refresh_due_srv(ty_domain);
2960 for instance in instances.iter() {
2961 trace!("sending refresh query for SRV: {}", instance);
2962 self.send_query(instance, RRType::SRV);
2963 query_srv_count += 1;
2964 }
2965 new_timers.extend(timers);
2966 let (hostnames, timers) = self.cache.refresh_due_hosts(ty_domain);
2967 for hostname in hostnames.iter() {
2968 trace!("sending refresh queries for A and AAAA: {}", hostname);
2969 self.send_query_vec(&[(hostname, RRType::A), (hostname, RRType::AAAA)]);
2970 query_addr_count += 2;
2971 }
2972 new_timers.extend(timers);
2973 }
2974
2975 for timer in new_timers {
2976 self.add_timer(timer);
2977 }
2978
2979 self.increase_counter(Counter::CacheRefreshPTR, query_ptr_count);
2980 self.increase_counter(Counter::CacheRefreshSRV, query_srv_count);
2981 self.increase_counter(Counter::CacheRefreshAddr, query_addr_count);
2982 }
2983}
2984
2985#[derive(Debug)]
2988pub enum ServiceEvent {
2989 SearchStarted(String),
2991
2992 ServiceFound(String, String),
2994
2995 ServiceResolved(ServiceInfo),
2997
2998 ServiceRemoved(String, String),
3000
3001 SearchStopped(String),
3003}
3004
3005#[derive(Debug)]
3008#[non_exhaustive]
3009pub enum HostnameResolutionEvent {
3010 SearchStarted(String),
3012 AddressesFound(String, HashSet<IpAddr>),
3014 AddressesRemoved(String, HashSet<IpAddr>),
3016 SearchTimeout(String),
3018 SearchStopped(String),
3020}
3021
3022#[derive(Clone, Debug)]
3025#[non_exhaustive]
3026pub enum DaemonEvent {
3027 Announce(String, String),
3029
3030 Error(Error),
3032
3033 IpAdd(IpAddr),
3035
3036 IpDel(IpAddr),
3038
3039 NameChange(DnsNameChange),
3042
3043 Respond(IpAddr),
3045}
3046
3047#[derive(Clone, Debug)]
3050pub struct DnsNameChange {
3051 pub original: String,
3053
3054 pub new_name: String,
3064
3065 pub rr_type: RRType,
3067
3068 pub intf_name: String,
3070}
3071
3072#[derive(Debug)]
3074enum Command {
3075 Browse(String, u32, Sender<ServiceEvent>),
3077
3078 ResolveHostname(String, u32, Sender<HostnameResolutionEvent>, Option<u64>), Register(ServiceInfo),
3083
3084 Unregister(String, Sender<UnregisterStatus>), RegisterResend(String, Interface), UnregisterResend(Vec<u8>, Interface), StopBrowse(String), StopResolveHostname(String), Resolve(String, u16), GetMetrics(Sender<Metrics>),
3105
3106 GetStatus(Sender<DaemonStatus>),
3108
3109 Monitor(Sender<DaemonEvent>),
3111
3112 SetOption(DaemonOption),
3113
3114 GetOption(Sender<DaemonOptionVal>),
3115
3116 Verify(String, Duration),
3121
3122 Exit(Sender<DaemonStatus>),
3123}
3124
3125impl fmt::Display for Command {
3126 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
3127 match self {
3128 Self::Browse(_, _, _) => write!(f, "Command Browse"),
3129 Self::ResolveHostname(_, _, _, _) => write!(f, "Command ResolveHostname"),
3130 Self::Exit(_) => write!(f, "Command Exit"),
3131 Self::GetStatus(_) => write!(f, "Command GetStatus"),
3132 Self::GetMetrics(_) => write!(f, "Command GetMetrics"),
3133 Self::Monitor(_) => write!(f, "Command Monitor"),
3134 Self::Register(_) => write!(f, "Command Register"),
3135 Self::RegisterResend(_, _) => write!(f, "Command RegisterResend"),
3136 Self::SetOption(_) => write!(f, "Command SetOption"),
3137 Self::GetOption(_) => write!(f, "Command GetOption"),
3138 Self::StopBrowse(_) => write!(f, "Command StopBrowse"),
3139 Self::StopResolveHostname(_) => write!(f, "Command StopResolveHostname"),
3140 Self::Unregister(_, _) => write!(f, "Command Unregister"),
3141 Self::UnregisterResend(_, _) => write!(f, "Command UnregisterResend"),
3142 Self::Resolve(_, _) => write!(f, "Command Resolve"),
3143 Self::Verify(_, _) => write!(f, "Command VerifyResource"),
3144 }
3145 }
3146}
3147
3148struct DaemonOptionVal {
3149 _service_name_len_max: u8,
3150 ip_check_interval: u64,
3151}
3152
3153#[derive(Debug)]
3154enum DaemonOption {
3155 ServiceNameLenMax(u8),
3156 IpCheckInterval(u64),
3157 EnableInterface(Vec<IfKind>),
3158 DisableInterface(Vec<IfKind>),
3159 MulticastLoopV4(bool),
3160 MulticastLoopV6(bool),
3161}
3162
3163const DOMAIN_LEN: usize = "._tcp.local.".len();
3165
3166fn check_service_name_length(ty_domain: &str, limit: u8) -> Result<()> {
3168 if ty_domain.len() <= DOMAIN_LEN + 1 {
3169 return Err(e_fmt!("Service type name cannot be empty: {}", ty_domain));
3171 }
3172
3173 let service_name_len = ty_domain.len() - DOMAIN_LEN - 1; if service_name_len > limit as usize {
3175 return Err(e_fmt!("Service name length must be <= {} bytes", limit));
3176 }
3177 Ok(())
3178}
3179
3180fn check_domain_suffix(name: &str) -> Result<()> {
3182 if !(name.ends_with("._tcp.local.") || name.ends_with("._udp.local.")) {
3183 return Err(e_fmt!(
3184 "mDNS service {} must end with '._tcp.local.' or '._udp.local.'",
3185 name
3186 ));
3187 }
3188
3189 Ok(())
3190}
3191
3192fn check_service_name(fullname: &str) -> Result<()> {
3200 check_domain_suffix(fullname)?;
3201
3202 let remaining: Vec<&str> = fullname[..fullname.len() - DOMAIN_LEN].split('.').collect();
3203 let name = remaining.last().ok_or_else(|| e_fmt!("No service name"))?;
3204
3205 if &name[0..1] != "_" {
3206 return Err(e_fmt!("Service name must start with '_'"));
3207 }
3208
3209 let name = &name[1..];
3210
3211 if name.contains("--") {
3212 return Err(e_fmt!("Service name must not contain '--'"));
3213 }
3214
3215 if name.starts_with('-') || name.ends_with('-') {
3216 return Err(e_fmt!("Service name (%s) may not start or end with '-'"));
3217 }
3218
3219 let ascii_count = name.chars().filter(|c| c.is_ascii_alphabetic()).count();
3220 if ascii_count < 1 {
3221 return Err(e_fmt!(
3222 "Service name must contain at least one letter (eg: 'A-Za-z')"
3223 ));
3224 }
3225
3226 Ok(())
3227}
3228
3229fn check_hostname(hostname: &str) -> Result<()> {
3231 if !hostname.ends_with(".local.") {
3232 return Err(e_fmt!("Hostname must end with '.local.': {hostname}"));
3233 }
3234
3235 if hostname == ".local." {
3236 return Err(e_fmt!(
3237 "The part of the hostname before '.local.' cannot be empty"
3238 ));
3239 }
3240
3241 if hostname.len() > 255 {
3242 return Err(e_fmt!("Hostname length must be <= 255 bytes"));
3243 }
3244
3245 Ok(())
3246}
3247
3248fn call_service_listener(
3249 listeners_map: &HashMap<String, Sender<ServiceEvent>>,
3250 ty_domain: &str,
3251 event: ServiceEvent,
3252) {
3253 if let Some(listener) = listeners_map.get(ty_domain) {
3254 match listener.send(event) {
3255 Ok(()) => trace!("Sent event to listener successfully"),
3256 Err(e) => debug!("Failed to send event: {}", e),
3257 }
3258 }
3259}
3260
3261fn call_hostname_resolution_listener(
3262 listeners_map: &HashMap<String, (Sender<HostnameResolutionEvent>, Option<u64>)>,
3263 hostname: &str,
3264 event: HostnameResolutionEvent,
3265) {
3266 let hostname_lower = hostname.to_lowercase();
3267 if let Some(listener) = listeners_map.get(&hostname_lower).map(|(l, _)| l) {
3268 match listener.send(event) {
3269 Ok(()) => trace!("Sent event to listener successfully"),
3270 Err(e) => debug!("Failed to send event: {}", e),
3271 }
3272 }
3273}
3274
3275fn my_ip_interfaces(with_loopback: bool) -> Vec<Interface> {
3278 if_addrs::get_if_addrs()
3279 .unwrap_or_default()
3280 .into_iter()
3281 .filter(|i| !i.is_loopback() || with_loopback)
3282 .collect()
3283}
3284
3285fn send_dns_outgoing(out: &DnsOutgoing, intf: &Interface, sock: &MioUdpSocket) -> Vec<Vec<u8>> {
3287 let qtype = if out.is_query() { "query" } else { "response" };
3288 trace!(
3289 "send outgoing {}: {} questions {} answers {} authorities {} additional",
3290 qtype,
3291 out.questions().len(),
3292 out.answers_count(),
3293 out.authorities().len(),
3294 out.additionals().len()
3295 );
3296 let packet_list = out.to_data_on_wire();
3297 for packet in packet_list.iter() {
3298 multicast_on_intf(packet, intf, sock);
3299 }
3300 packet_list
3301}
3302
3303fn multicast_on_intf(packet: &[u8], intf: &Interface, socket: &MioUdpSocket) {
3305 if packet.len() > MAX_MSG_ABSOLUTE {
3306 debug!("Drop over-sized packet ({})", packet.len());
3307 return;
3308 }
3309
3310 let addr: SocketAddr = match intf.addr {
3311 if_addrs::IfAddr::V4(_) => SocketAddrV4::new(GROUP_ADDR_V4, MDNS_PORT).into(),
3312 if_addrs::IfAddr::V6(_) => {
3313 let mut sock = SocketAddrV6::new(GROUP_ADDR_V6, MDNS_PORT, 0, 0);
3314 sock.set_scope_id(intf.index.unwrap_or(0)); sock.into()
3316 }
3317 };
3318
3319 send_packet(packet, addr, intf, socket);
3320}
3321
3322fn send_packet(packet: &[u8], addr: SocketAddr, intf: &Interface, sock: &MioUdpSocket) {
3324 match sock.send_to(packet, addr) {
3325 Ok(sz) => trace!("sent out {} bytes on interface {:?}", sz, intf),
3326 Err(e) => debug!("Failed to send to {} via {:?}: {}", addr, &intf, e),
3327 }
3328}
3329
3330fn valid_instance_name(name: &str) -> bool {
3334 name.split('.').count() >= 5
3335}
3336
3337fn notify_monitors(monitors: &mut Vec<Sender<DaemonEvent>>, event: DaemonEvent) {
3338 monitors.retain(|sender| {
3339 if let Err(e) = sender.try_send(event.clone()) {
3340 debug!("notify_monitors: try_send: {}", &e);
3341 if matches!(e, TrySendError::Disconnected(_)) {
3342 return false; }
3344 }
3345 true
3346 });
3347}
3348
3349fn prepare_announce(
3352 info: &ServiceInfo,
3353 intf: &Interface,
3354 dns_registry: &mut DnsRegistry,
3355) -> Option<DnsOutgoing> {
3356 let intf_addrs = info.get_addrs_on_intf(intf);
3357 if intf_addrs.is_empty() {
3358 trace!("No valid addrs to add on intf {:?}", &intf);
3359 return None;
3360 }
3361
3362 let service_fullname = match dns_registry.name_changes.get(info.get_fullname()) {
3364 Some(new_name) => new_name,
3365 None => info.get_fullname(),
3366 };
3367
3368 debug!(
3369 "prepare to announce service {service_fullname} on {}: {}",
3370 &intf.name,
3371 &intf.ip()
3372 );
3373
3374 let mut probing_count = 0;
3375 let mut out = DnsOutgoing::new(FLAGS_QR_RESPONSE | FLAGS_AA);
3376 let create_time = current_time_millis() + fastrand::u64(0..250);
3377
3378 out.add_answer_at_time(
3379 DnsPointer::new(
3380 info.get_type(),
3381 RRType::PTR,
3382 CLASS_IN,
3383 info.get_other_ttl(),
3384 service_fullname.to_string(),
3385 ),
3386 0,
3387 );
3388
3389 if let Some(sub) = info.get_subtype() {
3390 trace!("Adding subdomain {}", sub);
3391 out.add_answer_at_time(
3392 DnsPointer::new(
3393 sub,
3394 RRType::PTR,
3395 CLASS_IN,
3396 info.get_other_ttl(),
3397 service_fullname.to_string(),
3398 ),
3399 0,
3400 );
3401 }
3402
3403 let hostname = match dns_registry.name_changes.get(info.get_hostname()) {
3405 Some(new_name) => new_name.to_string(),
3406 None => info.get_hostname().to_string(),
3407 };
3408
3409 let mut srv = DnsSrv::new(
3410 info.get_fullname(),
3411 CLASS_IN | CLASS_CACHE_FLUSH,
3412 info.get_host_ttl(),
3413 info.get_priority(),
3414 info.get_weight(),
3415 info.get_port(),
3416 hostname,
3417 );
3418
3419 if let Some(new_name) = dns_registry.name_changes.get(info.get_fullname()) {
3420 srv.get_record_mut().set_new_name(new_name.to_string());
3421 }
3422
3423 if !info.requires_probe()
3424 || dns_registry.is_probing_done(&srv, info.get_fullname(), create_time)
3425 {
3426 out.add_answer_at_time(srv, 0);
3427 } else {
3428 probing_count += 1;
3429 }
3430
3431 let mut txt = DnsTxt::new(
3434 info.get_fullname(),
3435 CLASS_IN | CLASS_CACHE_FLUSH,
3436 info.get_other_ttl(),
3437 info.generate_txt(),
3438 );
3439
3440 if let Some(new_name) = dns_registry.name_changes.get(info.get_fullname()) {
3441 txt.get_record_mut().set_new_name(new_name.to_string());
3442 }
3443
3444 if !info.requires_probe()
3445 || dns_registry.is_probing_done(&txt, info.get_fullname(), create_time)
3446 {
3447 out.add_answer_at_time(txt, 0);
3448 } else {
3449 probing_count += 1;
3450 }
3451
3452 let hostname = info.get_hostname();
3455 for address in intf_addrs {
3456 let mut dns_addr = DnsAddress::new(
3457 hostname,
3458 ip_address_rr_type(&address),
3459 CLASS_IN | CLASS_CACHE_FLUSH,
3460 info.get_host_ttl(),
3461 address,
3462 );
3463
3464 if let Some(new_name) = dns_registry.name_changes.get(hostname) {
3465 dns_addr.get_record_mut().set_new_name(new_name.to_string());
3466 }
3467
3468 if !info.requires_probe()
3469 || dns_registry.is_probing_done(&dns_addr, info.get_fullname(), create_time)
3470 {
3471 out.add_answer_at_time(dns_addr, 0);
3472 } else {
3473 probing_count += 1;
3474 }
3475 }
3476
3477 if probing_count > 0 {
3478 return None;
3479 }
3480
3481 Some(out)
3482}
3483
3484fn announce_service_on_intf(
3487 dns_registry: &mut DnsRegistry,
3488 info: &ServiceInfo,
3489 intf: &Interface,
3490 sock: &MioUdpSocket,
3491) -> bool {
3492 if let Some(out) = prepare_announce(info, intf, dns_registry) {
3493 send_dns_outgoing(&out, intf, sock);
3494 return true;
3495 }
3496 false
3497}
3498
3499fn name_change(original: &str) -> String {
3507 let mut parts: Vec<_> = original.split('.').collect();
3508 let Some(first_part) = parts.get_mut(0) else {
3509 return format!("{original} (2)");
3510 };
3511
3512 let mut new_name = format!("{} (2)", first_part);
3513
3514 if let Some(paren_pos) = first_part.rfind(" (") {
3516 if let Some(end_paren) = first_part[paren_pos..].find(')') {
3518 let absolute_end_pos = paren_pos + end_paren;
3519 if absolute_end_pos == first_part.len() - 1 {
3521 let num_start = paren_pos + 2; if let Ok(number) = first_part[num_start..absolute_end_pos].parse::<u32>() {
3524 let base_name = &first_part[..paren_pos];
3525 new_name = format!("{} ({})", base_name, number + 1)
3526 }
3527 }
3528 }
3529 }
3530
3531 *first_part = &new_name;
3532 parts.join(".")
3533}
3534
3535fn hostname_change(original: &str) -> String {
3543 let mut parts: Vec<_> = original.split('.').collect();
3544 let Some(first_part) = parts.get_mut(0) else {
3545 return format!("{original}-2");
3546 };
3547
3548 let mut new_name = format!("{}-2", first_part);
3549
3550 if let Some(hyphen_pos) = first_part.rfind('-') {
3552 if let Ok(number) = first_part[hyphen_pos + 1..].parse::<u32>() {
3554 let base_name = &first_part[..hyphen_pos];
3555 new_name = format!("{}-{}", base_name, number + 1);
3556 }
3557 }
3558
3559 *first_part = &new_name;
3560 parts.join(".")
3561}
3562
3563fn add_answer_with_additionals(
3564 out: &mut DnsOutgoing,
3565 msg: &DnsIncoming,
3566 service: &ServiceInfo,
3567 intf: &Interface,
3568 dns_registry: &DnsRegistry,
3569) {
3570 let intf_addrs = service.get_addrs_on_intf(intf);
3571 if intf_addrs.is_empty() {
3572 trace!("No addrs on LAN of intf {:?}", intf);
3573 return;
3574 }
3575
3576 let service_fullname = match dns_registry.name_changes.get(service.get_fullname()) {
3578 Some(new_name) => new_name,
3579 None => service.get_fullname(),
3580 };
3581
3582 let hostname = match dns_registry.name_changes.get(service.get_hostname()) {
3583 Some(new_name) => new_name,
3584 None => service.get_hostname(),
3585 };
3586
3587 let ptr_added = out.add_answer(
3588 msg,
3589 DnsPointer::new(
3590 service.get_type(),
3591 RRType::PTR,
3592 CLASS_IN,
3593 service.get_other_ttl(),
3594 service_fullname.to_string(),
3595 ),
3596 );
3597
3598 if !ptr_added {
3599 trace!("answer was not added for msg {:?}", msg);
3600 return;
3601 }
3602
3603 if let Some(sub) = service.get_subtype() {
3604 trace!("Adding subdomain {}", sub);
3605 out.add_additional_answer(DnsPointer::new(
3606 sub,
3607 RRType::PTR,
3608 CLASS_IN,
3609 service.get_other_ttl(),
3610 service_fullname.to_string(),
3611 ));
3612 }
3613
3614 out.add_additional_answer(DnsSrv::new(
3617 service_fullname,
3618 CLASS_IN | CLASS_CACHE_FLUSH,
3619 service.get_host_ttl(),
3620 service.get_priority(),
3621 service.get_weight(),
3622 service.get_port(),
3623 hostname.to_string(),
3624 ));
3625
3626 out.add_additional_answer(DnsTxt::new(
3627 service_fullname,
3628 CLASS_IN | CLASS_CACHE_FLUSH,
3629 service.get_host_ttl(),
3630 service.generate_txt(),
3631 ));
3632
3633 for address in intf_addrs {
3634 out.add_additional_answer(DnsAddress::new(
3635 hostname,
3636 ip_address_rr_type(&address),
3637 CLASS_IN | CLASS_CACHE_FLUSH,
3638 service.get_host_ttl(),
3639 address,
3640 ));
3641 }
3642}
3643
3644#[cfg(test)]
3645mod tests {
3646 use super::{
3647 check_domain_suffix, check_service_name_length, hostname_change, my_ip_interfaces,
3648 name_change, new_socket_bind, send_dns_outgoing, valid_instance_name,
3649 HostnameResolutionEvent, ServiceDaemon, ServiceEvent, ServiceInfo, GROUP_ADDR_V4,
3650 MDNS_PORT,
3651 };
3652 use crate::{
3653 dns_parser::{DnsOutgoing, DnsPointer, RRType, CLASS_IN, FLAGS_AA, FLAGS_QR_RESPONSE},
3654 service_daemon::check_hostname,
3655 };
3656 use std::{
3657 net::{SocketAddr, SocketAddrV4},
3658 time::Duration,
3659 };
3660 use test_log::test;
3661
3662 #[test]
3663 fn test_socketaddr_print() {
3664 let addr: SocketAddr = SocketAddrV4::new(GROUP_ADDR_V4, MDNS_PORT).into();
3665 let print = format!("{}", addr);
3666 assert_eq!(print, "224.0.0.251:5353");
3667 }
3668
3669 #[test]
3670 fn test_instance_name() {
3671 assert!(valid_instance_name("my-laser._printer._tcp.local."));
3672 assert!(valid_instance_name("my-laser.._printer._tcp.local."));
3673 assert!(!valid_instance_name("_printer._tcp.local."));
3674 }
3675
3676 #[test]
3677 fn test_check_service_name_length() {
3678 let result = check_service_name_length("_tcp", 100);
3679 assert!(result.is_err());
3680 if let Err(e) = result {
3681 println!("{}", e);
3682 }
3683 }
3684
3685 #[test]
3686 fn test_check_hostname() {
3687 for hostname in &[
3689 "my_host.local.",
3690 &("A".repeat(255 - ".local.".len()) + ".local."),
3691 ] {
3692 let result = check_hostname(hostname);
3693 assert!(result.is_ok());
3694 }
3695
3696 for hostname in &[
3698 "my_host.local",
3699 ".local.",
3700 &("A".repeat(256 - ".local.".len()) + ".local."),
3701 ] {
3702 let result = check_hostname(hostname);
3703 assert!(result.is_err());
3704 if let Err(e) = result {
3705 println!("{}", e);
3706 }
3707 }
3708 }
3709
3710 #[test]
3711 fn test_check_domain_suffix() {
3712 assert!(check_domain_suffix("_missing_dot._tcp.local").is_err());
3713 assert!(check_domain_suffix("_missing_bar.tcp.local.").is_err());
3714 assert!(check_domain_suffix("_mis_spell._tpp.local.").is_err());
3715 assert!(check_domain_suffix("_mis_spell._upp.local.").is_err());
3716 assert!(check_domain_suffix("_has_dot._tcp.local.").is_ok());
3717 assert!(check_domain_suffix("_goodname._udp.local.").is_ok());
3718 }
3719
3720 #[test]
3721 fn test_service_with_temporarily_invalidated_ptr() {
3722 let d = ServiceDaemon::new().expect("Failed to create daemon");
3724
3725 let service = "_test_inval_ptr._udp.local.";
3726 let host_name = "my_host_tmp_invalidated_ptr.local.";
3727 let intfs: Vec<_> = my_ip_interfaces(false);
3728 let intf_ips: Vec<_> = intfs.iter().map(|intf| intf.ip()).collect();
3729 let port = 5201;
3730 let my_service =
3731 ServiceInfo::new(service, "my_instance", host_name, &intf_ips[..], port, None)
3732 .expect("invalid service info")
3733 .enable_addr_auto();
3734 let result = d.register(my_service.clone());
3735 assert!(result.is_ok());
3736
3737 let browse_chan = d.browse(service).unwrap();
3739 let timeout = Duration::from_secs(2);
3740 let mut resolved = false;
3741
3742 while let Ok(event) = browse_chan.recv_timeout(timeout) {
3743 match event {
3744 ServiceEvent::ServiceResolved(info) => {
3745 resolved = true;
3746 println!("Resolved a service of {}", &info.get_fullname());
3747 break;
3748 }
3749 e => {
3750 println!("Received event {:?}", e);
3751 }
3752 }
3753 }
3754
3755 assert!(resolved);
3756
3757 println!("Stopping browse of {}", service);
3758 d.stop_browse(service).unwrap();
3761
3762 let mut stopped = false;
3767 while let Ok(event) = browse_chan.recv_timeout(timeout) {
3768 match event {
3769 ServiceEvent::SearchStopped(_) => {
3770 stopped = true;
3771 println!("Stopped browsing service");
3772 break;
3773 }
3774 e => {
3778 println!("Received event {:?}", e);
3779 }
3780 }
3781 }
3782
3783 assert!(stopped);
3784
3785 let invalidate_ptr_packet = DnsPointer::new(
3787 my_service.get_type(),
3788 RRType::PTR,
3789 CLASS_IN,
3790 0,
3791 my_service.get_fullname().to_string(),
3792 );
3793
3794 let mut packet_buffer = DnsOutgoing::new(FLAGS_QR_RESPONSE | FLAGS_AA);
3795 packet_buffer.add_additional_answer(invalidate_ptr_packet);
3796
3797 for intf in intfs {
3798 let sock = new_socket_bind(&intf, true).unwrap();
3799 send_dns_outgoing(&packet_buffer, &intf, &sock);
3800 }
3801
3802 println!(
3803 "Sent PTR record invalidation. Starting second browse for {}",
3804 service
3805 );
3806
3807 let browse_chan = d.browse(service).unwrap();
3809
3810 resolved = false;
3811 while let Ok(event) = browse_chan.recv_timeout(timeout) {
3812 match event {
3813 ServiceEvent::ServiceResolved(info) => {
3814 resolved = true;
3815 println!("Resolved a service of {}", &info.get_fullname());
3816 break;
3817 }
3818 e => {
3819 println!("Received event {:?}", e);
3820 }
3821 }
3822 }
3823
3824 assert!(resolved);
3825 d.shutdown().unwrap();
3826 }
3827
3828 #[test]
3829 fn test_expired_srv() {
3830 let service_type = "_expired-srv._udp.local.";
3832 let instance = "test_instance";
3833 let host_name = "expired_srv_host.local.";
3834 let mut my_service = ServiceInfo::new(service_type, instance, host_name, "", 5023, None)
3835 .unwrap()
3836 .enable_addr_auto();
3837 let new_ttl = 3; my_service._set_host_ttl(new_ttl);
3842
3843 let mdns_server = ServiceDaemon::new().expect("Failed to create mdns server");
3845 let result = mdns_server.register(my_service);
3846 assert!(result.is_ok());
3847
3848 let mdns_client = ServiceDaemon::new().expect("Failed to create mdns client");
3849 let browse_chan = mdns_client.browse(service_type).unwrap();
3850 let timeout = Duration::from_secs(2);
3851 let mut resolved = false;
3852
3853 while let Ok(event) = browse_chan.recv_timeout(timeout) {
3854 match event {
3855 ServiceEvent::ServiceResolved(info) => {
3856 resolved = true;
3857 println!("Resolved a service of {}", &info.get_fullname());
3858 break;
3859 }
3860 _ => {}
3861 }
3862 }
3863
3864 assert!(resolved);
3865
3866 mdns_server.shutdown().unwrap();
3868
3869 let expire_timeout = Duration::from_secs(new_ttl as u64);
3871 while let Ok(event) = browse_chan.recv_timeout(expire_timeout) {
3872 match event {
3873 ServiceEvent::ServiceRemoved(service_type, full_name) => {
3874 println!("Service removed: {}: {}", &service_type, &full_name);
3875 break;
3876 }
3877 _ => {}
3878 }
3879 }
3880 }
3881
3882 #[test]
3883 fn test_hostname_resolution_address_removed() {
3884 let server = ServiceDaemon::new().expect("Failed to create server");
3886 let hostname = "addr_remove_host._tcp.local.";
3887 let service_ip_addr = my_ip_interfaces(false)
3888 .iter()
3889 .find(|iface| iface.ip().is_ipv4())
3890 .map(|iface| iface.ip())
3891 .unwrap();
3892
3893 let mut my_service = ServiceInfo::new(
3894 "_host_res_test._tcp.local.",
3895 "my_instance",
3896 hostname,
3897 &service_ip_addr,
3898 1234,
3899 None,
3900 )
3901 .expect("invalid service info");
3902
3903 let addr_ttl = 2;
3905 my_service._set_host_ttl(addr_ttl); server.register(my_service).unwrap();
3908
3909 let client = ServiceDaemon::new().expect("Failed to create client");
3911 let event_receiver = client.resolve_hostname(hostname, None).unwrap();
3912 let resolved = loop {
3913 match event_receiver.recv() {
3914 Ok(HostnameResolutionEvent::AddressesFound(found_hostname, addresses)) => {
3915 assert!(found_hostname == hostname);
3916 assert!(addresses.contains(&service_ip_addr));
3917 println!("address found: {:?}", &addresses);
3918 break true;
3919 }
3920 Ok(HostnameResolutionEvent::SearchStopped(_)) => break false,
3921 Ok(_event) => {}
3922 Err(_) => break false,
3923 }
3924 };
3925
3926 assert!(resolved);
3927
3928 server.shutdown().unwrap();
3930
3931 let timeout = Duration::from_secs(addr_ttl as u64 + 1);
3933 let removed = loop {
3934 match event_receiver.recv_timeout(timeout) {
3935 Ok(HostnameResolutionEvent::AddressesRemoved(removed_host, addresses)) => {
3936 assert!(removed_host == hostname);
3937 assert!(addresses.contains(&service_ip_addr));
3938
3939 println!(
3940 "address removed: hostname: {} addresses: {:?}",
3941 &hostname, &addresses
3942 );
3943 break true;
3944 }
3945 Ok(_event) => {}
3946 Err(_) => {
3947 break false;
3948 }
3949 }
3950 };
3951
3952 assert!(removed);
3953
3954 client.shutdown().unwrap();
3955 }
3956
3957 #[test]
3958 fn test_refresh_ptr() {
3959 let service_type = "_refresh-ptr._udp.local.";
3961 let instance = "test_instance";
3962 let host_name = "refresh_ptr_host.local.";
3963 let service_ip_addr = my_ip_interfaces(false)
3964 .iter()
3965 .find(|iface| iface.ip().is_ipv4())
3966 .map(|iface| iface.ip())
3967 .unwrap();
3968
3969 let mut my_service = ServiceInfo::new(
3970 service_type,
3971 instance,
3972 host_name,
3973 &service_ip_addr,
3974 5023,
3975 None,
3976 )
3977 .unwrap();
3978
3979 let new_ttl = 3; my_service._set_other_ttl(new_ttl);
3981
3982 let mdns_server = ServiceDaemon::new().expect("Failed to create mdns server");
3984 let result = mdns_server.register(my_service);
3985 assert!(result.is_ok());
3986
3987 let mdns_client = ServiceDaemon::new().expect("Failed to create mdns client");
3988 let browse_chan = mdns_client.browse(service_type).unwrap();
3989 let timeout = Duration::from_millis(1500); let mut resolved = false;
3991
3992 while let Ok(event) = browse_chan.recv_timeout(timeout) {
3994 match event {
3995 ServiceEvent::ServiceResolved(info) => {
3996 resolved = true;
3997 println!("Resolved a service of {}", &info.get_fullname());
3998 break;
3999 }
4000 _ => {}
4001 }
4002 }
4003
4004 assert!(resolved);
4005
4006 let timeout = Duration::from_millis(new_ttl as u64 * 1000 * 90 / 100);
4008 while let Ok(event) = browse_chan.recv_timeout(timeout) {
4009 println!("event: {:?}", &event);
4010 }
4011
4012 let metrics_chan = mdns_client.get_metrics().unwrap();
4014 let metrics = metrics_chan.recv_timeout(timeout).unwrap();
4015 let refresh_counter = metrics["cache-refresh-ptr"];
4016 assert_eq!(refresh_counter, 1);
4017
4018 mdns_server.shutdown().unwrap();
4020 mdns_client.shutdown().unwrap();
4021 }
4022
4023 #[test]
4024 fn test_name_change() {
4025 assert_eq!(name_change("foo.local."), "foo (2).local.");
4026 assert_eq!(name_change("foo (2).local."), "foo (3).local.");
4027 assert_eq!(name_change("foo (9).local."), "foo (10).local.");
4028 assert_eq!(name_change("foo"), "foo (2)");
4029 assert_eq!(name_change("foo (2)"), "foo (3)");
4030 assert_eq!(name_change(""), " (2)");
4031
4032 assert_eq!(name_change("foo (abc)"), "foo (abc) (2)"); assert_eq!(name_change("foo (2"), "foo (2 (2)"); assert_eq!(name_change("foo (2) extra"), "foo (2) extra (2)"); }
4037
4038 #[test]
4039 fn test_hostname_change() {
4040 assert_eq!(hostname_change("foo.local."), "foo-2.local.");
4041 assert_eq!(hostname_change("foo"), "foo-2");
4042 assert_eq!(hostname_change("foo-2.local."), "foo-3.local.");
4043 assert_eq!(hostname_change("foo-9"), "foo-10");
4044 assert_eq!(hostname_change("test-42.domain."), "test-43.domain.");
4045 }
4046}