1#[cfg(feature = "logging")]
32use crate::log::{debug, trace};
33use crate::{
34 dns_cache::{current_time_millis, DnsCache},
35 dns_parser::{
36 ip_address_rr_type, DnsAddress, DnsEntryExt, DnsIncoming, DnsOutgoing, DnsPointer,
37 DnsRecordBox, DnsRecordExt, DnsSrv, DnsTxt, RRType, CLASS_CACHE_FLUSH, CLASS_IN, FLAGS_AA,
38 FLAGS_QR_QUERY, FLAGS_QR_RESPONSE, MAX_MSG_ABSOLUTE,
39 },
40 error::{e_fmt, Error, Result},
41 service_info::{
42 split_sub_domain, valid_ip_on_intf, DnsRegistry, Probe, ServiceInfo, ServiceStatus,
43 },
44 Receiver,
45};
46use flume::{bounded, Sender, TrySendError};
47use if_addrs::{IfAddr, Interface};
48use mio::{net::UdpSocket as MioUdpSocket, Poll};
49use socket2::Socket;
50use std::{
51 cmp::{self, Reverse},
52 collections::{BinaryHeap, HashMap, HashSet},
53 fmt,
54 net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6, UdpSocket},
55 str, thread,
56 time::Duration,
57 vec,
58};
59
/// Default upper bound for the length of a service name.
///
/// Used as the initial value of the daemon's `service_name_len_max` option.
pub const SERVICE_NAME_LEN_MAX_DEFAULT: u8 = 15;

/// Default timeout for verifying a service instance.
// NOTE(review): not referenced in the visible part of this file; presumably
// meant as the `timeout` argument of `ServiceDaemon::verify` — confirm.
pub const VERIFY_TIMEOUT_DEFAULT: Duration = Duration::from_secs(10);

/// UDP port that the interface sockets bind to and send multicast packets to.
const MDNS_PORT: u16 = 5353;

/// IPv4 multicast group joined by every IPv4 interface socket.
const GROUP_ADDR_V4: Ipv4Addr = Ipv4Addr::new(224, 0, 0, 251);

/// IPv6 multicast group joined by every IPv6 interface socket.
const GROUP_ADDR_V6: Ipv6Addr = Ipv6Addr::new(0xff02, 0, 0, 0, 0, 0, 0, 0x00fb);

/// IPv4 loopback address; the daemon's signal socket is bound to it.
const LOOPBACK_V4: Ipv4Addr = Ipv4Addr::LOCALHOST;

// NOTE(review): not referenced in the visible part of this file; presumably a
// wait time (in milliseconds) used when resolving instances — confirm.
const RESOLVE_WAIT_IN_MILLIS: u64 = 500;
75
/// Status delivered on the channel returned by `ServiceDaemon::unregister`.
///
/// The type is a plain field-less enum, so it also implements `Clone`, `Copy`,
/// `PartialEq` and `Eq`; callers can compare a received status directly
/// (e.g. `status == UnregisterStatus::OK`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum UnregisterStatus {
    /// The unregister request succeeded.
    // NOTE(review): exact semantics are decided by the command handler, which
    // is outside the visible part of this file.
    OK,

    /// No matching service was found.
    NotFound,
}
84
/// Life-cycle state of the daemon, as reported by `ServiceDaemon::status`
/// and `ServiceDaemon::shutdown`.
#[non_exhaustive]
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum DaemonStatus {
    /// The daemon is running; this is the state a new daemon starts in.
    Running,

    /// The daemon received an exit command, or its command channel is
    /// already disconnected.
    Shutdown,
}
95
/// Kinds of events the daemon keeps counts of.
///
/// Each variant has a stable textual name through its `Display` impl.
// NOTE(review): presumably that name is the key used in `Metrics` — confirm
// against `increase_counter`, which is outside the visible part of this file.
#[derive(PartialEq, Eq, Hash)]
enum Counter {
    Register,
    RegisterResend,
    Unregister,
    UnregisterResend,
    Browse,
    ResolveHostname,
    Respond,
    CacheRefreshPTR,
    CacheRefreshSRV,
    CacheRefreshAddr,
    KnownAnswerSuppression,
}
112
113impl fmt::Display for Counter {
114 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
115 match self {
116 Self::Register => write!(f, "register"),
117 Self::RegisterResend => write!(f, "register-resend"),
118 Self::Unregister => write!(f, "unregister"),
119 Self::UnregisterResend => write!(f, "unregister-resend"),
120 Self::Browse => write!(f, "browse"),
121 Self::ResolveHostname => write!(f, "resolve-hostname"),
122 Self::Respond => write!(f, "respond"),
123 Self::CacheRefreshPTR => write!(f, "cache-refresh-ptr"),
124 Self::CacheRefreshSRV => write!(f, "cache-refresh-srv"),
125 Self::CacheRefreshAddr => write!(f, "cache-refresh-addr"),
126 Self::KnownAnswerSuppression => write!(f, "known-answer-suppression"),
127 }
128 }
129}
130
131pub type Metrics = HashMap<String, i64>;
134
135const SIGNAL_SOCK_EVENT_KEY: usize = usize::MAX - 1; #[derive(Clone)]
141pub struct ServiceDaemon {
142 sender: Sender<Command>,
144
145 signal_addr: SocketAddr,
151}
152
153impl ServiceDaemon {
154 pub fn new() -> Result<Self> {
159 let signal_addr = SocketAddrV4::new(LOOPBACK_V4, 0);
162
163 let signal_sock = UdpSocket::bind(signal_addr)
164 .map_err(|e| e_fmt!("failed to create signal_sock for daemon: {}", e))?;
165
166 let signal_addr = signal_sock
168 .local_addr()
169 .map_err(|e| e_fmt!("failed to get signal sock addr: {}", e))?;
170
171 signal_sock
173 .set_nonblocking(true)
174 .map_err(|e| e_fmt!("failed to set nonblocking for signal socket: {}", e))?;
175
176 let poller = Poll::new().map_err(|e| e_fmt!("failed to create mio Poll: {e}"))?;
177
178 let (sender, receiver) = bounded(100);
179
180 let mio_sock = MioUdpSocket::from_std(signal_sock);
182 thread::Builder::new()
183 .name("mDNS_daemon".to_string())
184 .spawn(move || Self::daemon_thread(mio_sock, poller, receiver))
185 .map_err(|e| e_fmt!("thread builder failed to spawn: {}", e))?;
186
187 Ok(Self {
188 sender,
189 signal_addr,
190 })
191 }
192
193 fn send_cmd(&self, cmd: Command) -> Result<()> {
196 let cmd_name = cmd.to_string();
197
198 self.sender.try_send(cmd).map_err(|e| match e {
200 TrySendError::Full(_) => Error::Again,
201 e => e_fmt!("flume::channel::send failed: {}", e),
202 })?;
203
204 let addr = SocketAddrV4::new(LOOPBACK_V4, 0);
206 let socket = UdpSocket::bind(addr)
207 .map_err(|e| e_fmt!("Failed to create socket to send signal: {}", e))?;
208 socket
209 .send_to(cmd_name.as_bytes(), self.signal_addr)
210 .map_err(|e| {
211 e_fmt!(
212 "signal socket send_to {} ({}) failed: {}",
213 self.signal_addr,
214 cmd_name,
215 e
216 )
217 })?;
218
219 Ok(())
220 }
221
222 pub fn browse(&self, service_type: &str) -> Result<Receiver<ServiceEvent>> {
233 check_domain_suffix(service_type)?;
234
235 let (resp_s, resp_r) = bounded(10);
236 self.send_cmd(Command::Browse(service_type.to_string(), 1, resp_s))?;
237 Ok(resp_r)
238 }
239
240 pub fn stop_browse(&self, ty_domain: &str) -> Result<()> {
245 self.send_cmd(Command::StopBrowse(ty_domain.to_string()))
246 }
247
248 pub fn resolve_hostname(
256 &self,
257 hostname: &str,
258 timeout: Option<u64>,
259 ) -> Result<Receiver<HostnameResolutionEvent>> {
260 check_hostname(hostname)?;
261 let (resp_s, resp_r) = bounded(10);
262 self.send_cmd(Command::ResolveHostname(
263 hostname.to_string(),
264 1,
265 resp_s,
266 timeout,
267 ))?;
268 Ok(resp_r)
269 }
270
271 pub fn stop_resolve_hostname(&self, hostname: &str) -> Result<()> {
276 self.send_cmd(Command::StopResolveHostname(hostname.to_string()))
277 }
278
279 pub fn register(&self, service_info: ServiceInfo) -> Result<()> {
287 check_service_name(service_info.get_fullname())?;
288 check_hostname(service_info.get_hostname())?;
289
290 self.send_cmd(Command::Register(service_info))
291 }
292
293 pub fn unregister(&self, fullname: &str) -> Result<Receiver<UnregisterStatus>> {
301 let (resp_s, resp_r) = bounded(1);
302 self.send_cmd(Command::Unregister(fullname.to_lowercase(), resp_s))?;
303 Ok(resp_r)
304 }
305
306 pub fn monitor(&self) -> Result<Receiver<DaemonEvent>> {
310 let (resp_s, resp_r) = bounded(100);
311 self.send_cmd(Command::Monitor(resp_s))?;
312 Ok(resp_r)
313 }
314
315 pub fn shutdown(&self) -> Result<Receiver<DaemonStatus>> {
320 let (resp_s, resp_r) = bounded(1);
321 self.send_cmd(Command::Exit(resp_s))?;
322 Ok(resp_r)
323 }
324
325 pub fn status(&self) -> Result<Receiver<DaemonStatus>> {
331 let (resp_s, resp_r) = bounded(1);
332
333 if self.sender.is_disconnected() {
334 resp_s
335 .send(DaemonStatus::Shutdown)
336 .map_err(|e| e_fmt!("failed to send daemon status to the client: {}", e))?;
337 } else {
338 self.send_cmd(Command::GetStatus(resp_s))?;
339 }
340
341 Ok(resp_r)
342 }
343
344 pub fn get_metrics(&self) -> Result<Receiver<Metrics>> {
349 let (resp_s, resp_r) = bounded(1);
350 self.send_cmd(Command::GetMetrics(resp_s))?;
351 Ok(resp_r)
352 }
353
354 pub fn set_service_name_len_max(&self, len_max: u8) -> Result<()> {
361 const SERVICE_NAME_LEN_MAX_LIMIT: u8 = 30; if len_max > SERVICE_NAME_LEN_MAX_LIMIT {
364 return Err(Error::Msg(format!(
365 "service name length max {} is too large",
366 len_max
367 )));
368 }
369
370 self.send_cmd(Command::SetOption(DaemonOption::ServiceNameLenMax(len_max)))
371 }
372
373 pub fn enable_interface(&self, if_kind: impl IntoIfKindVec) -> Result<()> {
380 let if_kind_vec = if_kind.into_vec();
381 self.send_cmd(Command::SetOption(DaemonOption::EnableInterface(
382 if_kind_vec.kinds,
383 )))
384 }
385
386 pub fn disable_interface(&self, if_kind: impl IntoIfKindVec) -> Result<()> {
393 let if_kind_vec = if_kind.into_vec();
394 self.send_cmd(Command::SetOption(DaemonOption::DisableInterface(
395 if_kind_vec.kinds,
396 )))
397 }
398
399 pub fn set_multicast_loop_v4(&self, on: bool) -> Result<()> {
415 self.send_cmd(Command::SetOption(DaemonOption::MulticastLoopV4(on)))
416 }
417
418 pub fn set_multicast_loop_v6(&self, on: bool) -> Result<()> {
434 self.send_cmd(Command::SetOption(DaemonOption::MulticastLoopV6(on)))
435 }
436
437 pub fn verify(&self, instance_fullname: String, timeout: Duration) -> Result<()> {
450 self.send_cmd(Command::Verify(instance_fullname, timeout))
451 }
452
453 fn daemon_thread(signal_sock: MioUdpSocket, poller: Poll, receiver: Receiver<Command>) {
454 let zc = Zeroconf::new(signal_sock, poller);
455
456 if let Some(cmd) = Self::run(zc, receiver) {
457 match cmd {
458 Command::Exit(resp_s) => {
459 if let Err(e) = resp_s.send(DaemonStatus::Shutdown) {
462 debug!("exit: failed to send response of shutdown: {}", e);
463 }
464 }
465 _ => {
466 debug!("Unexpected command: {:?}", cmd);
467 }
468 }
469 }
470 }
471
472 fn run(mut zc: Zeroconf, receiver: Receiver<Command>) -> Option<Command> {
481 if let Err(e) = zc.poller.registry().register(
483 &mut zc.signal_sock,
484 mio::Token(SIGNAL_SOCK_EVENT_KEY),
485 mio::Interest::READABLE,
486 ) {
487 debug!("failed to add signal socket to the poller: {}", e);
488 return None;
489 }
490
491 for (intf, sock) in zc.intf_socks.iter_mut() {
493 let key =
494 Zeroconf::add_poll_impl(&mut zc.poll_ids, &mut zc.poll_id_count, intf.clone());
495
496 if let Err(e) =
497 zc.poller
498 .registry()
499 .register(sock, mio::Token(key), mio::Interest::READABLE)
500 {
501 debug!("add socket of {:?} to poller: {e}", intf);
502 return None;
503 }
504 }
505
506 const IP_CHECK_INTERVAL_MILLIS: u64 = 30_000;
508 let mut next_ip_check = current_time_millis() + IP_CHECK_INTERVAL_MILLIS;
509 zc.add_timer(next_ip_check);
510
511 let mut events = mio::Events::with_capacity(1024);
514 loop {
515 let now = current_time_millis();
516
517 let earliest_timer = zc.peek_earliest_timer();
518 let timeout = earliest_timer.map(|timer| {
519 let millis = if timer > now { timer - now } else { 1 };
521 Duration::from_millis(millis)
522 });
523
524 events.clear();
526 match zc.poller.poll(&mut events, timeout) {
527 Ok(_) => zc.handle_poller_events(&events),
528 Err(e) => debug!("failed to select from sockets: {}", e),
529 }
530
531 let now = current_time_millis();
532
533 if let Some(timer) = earliest_timer {
535 if now >= timer {
536 zc.pop_earliest_timer();
537 }
538 }
539
540 for hostname in zc
542 .hostname_resolvers
543 .clone()
544 .into_iter()
545 .filter(|(_, (_, timeout))| timeout.map(|t| now >= t).unwrap_or(false))
546 .map(|(hostname, _)| hostname)
547 {
548 trace!("hostname resolver timeout for {}", &hostname);
549 call_hostname_resolution_listener(
550 &zc.hostname_resolvers,
551 &hostname,
552 HostnameResolutionEvent::SearchTimeout(hostname.to_owned()),
553 );
554 call_hostname_resolution_listener(
555 &zc.hostname_resolvers,
556 &hostname,
557 HostnameResolutionEvent::SearchStopped(hostname.to_owned()),
558 );
559 zc.hostname_resolvers.remove(&hostname);
560 }
561
562 while let Ok(command) = receiver.try_recv() {
564 if matches!(command, Command::Exit(_)) {
565 zc.status = DaemonStatus::Shutdown;
566 return Some(command);
567 }
568 zc.exec_command(command, false);
569 }
570
571 let mut i = 0;
573 while i < zc.retransmissions.len() {
574 if now >= zc.retransmissions[i].next_time {
575 let rerun = zc.retransmissions.remove(i);
576 zc.exec_command(rerun.command, true);
577 } else {
578 i += 1;
579 }
580 }
581
582 zc.refresh_active_services();
584
585 let mut query_count = 0;
587 for (hostname, _sender) in zc.hostname_resolvers.iter() {
588 for (hostname, ip_addr) in
589 zc.cache.refresh_due_hostname_resolutions(hostname).iter()
590 {
591 zc.send_query(hostname, ip_address_rr_type(ip_addr));
592 query_count += 1;
593 }
594 }
595
596 zc.increase_counter(Counter::CacheRefreshAddr, query_count);
597
598 let now = current_time_millis();
600
601 let expired_services = zc.cache.evict_expired_services(now);
603 zc.notify_service_removal(expired_services);
604
605 let expired_addrs = zc.cache.evict_expired_addr(now);
607 for (hostname, addrs) in expired_addrs {
608 call_hostname_resolution_listener(
609 &zc.hostname_resolvers,
610 &hostname,
611 HostnameResolutionEvent::AddressesRemoved(hostname.clone(), addrs),
612 );
613 let instances = zc.cache.get_instances_on_host(&hostname);
614 let instance_set: HashSet<String> = instances.into_iter().collect();
615 zc.resolve_updated_instances(&instance_set);
616 }
617
618 zc.probing_handler();
620
621 if now > next_ip_check {
623 next_ip_check = now + IP_CHECK_INTERVAL_MILLIS;
624 zc.check_ip_changes();
625 zc.add_timer(next_ip_check);
626 }
627 }
628 }
629}
630
631fn new_socket_bind(intf: &Interface, should_loop: bool) -> Result<MioUdpSocket> {
633 let intf_ip = &intf.ip();
636 match intf_ip {
637 IpAddr::V4(ip) => {
638 let addr = SocketAddrV4::new(Ipv4Addr::new(0, 0, 0, 0), MDNS_PORT);
639 let sock = new_socket(addr.into(), true)?;
640
641 sock.join_multicast_v4(&GROUP_ADDR_V4, ip)
643 .map_err(|e| e_fmt!("join multicast group on addr {}: {}", intf_ip, e))?;
644
645 sock.set_multicast_if_v4(ip)
647 .map_err(|e| e_fmt!("set multicast_if on addr {}: {}", ip, e))?;
648
649 if !should_loop {
650 sock.set_multicast_loop_v4(false)
651 .map_err(|e| e_fmt!("failed to set multicast loop v4 for {ip}: {e}"))?;
652 }
653
654 let multicast_addr = SocketAddrV4::new(GROUP_ADDR_V4, MDNS_PORT).into();
656 let test_packets = DnsOutgoing::new(0).to_data_on_wire();
657 for packet in test_packets {
658 sock.send_to(&packet, &multicast_addr)
659 .map_err(|e| e_fmt!("send multicast packet on addr {}: {}", ip, e))?;
660 }
661 Ok(MioUdpSocket::from_std(UdpSocket::from(sock)))
662 }
663 IpAddr::V6(ip) => {
664 let addr = SocketAddrV6::new(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 0), MDNS_PORT, 0, 0);
665 let sock = new_socket(addr.into(), true)?;
666
667 sock.join_multicast_v6(&GROUP_ADDR_V6, intf.index.unwrap_or(0))
669 .map_err(|e| e_fmt!("join multicast group on addr {}: {}", ip, e))?;
670
671 sock.set_multicast_if_v6(intf.index.unwrap_or(0))
673 .map_err(|e| e_fmt!("set multicast_if on addr {}: {}", ip, e))?;
674
675 Ok(MioUdpSocket::from_std(UdpSocket::from(sock)))
680 }
681 }
682}
683
684fn new_socket(addr: SocketAddr, non_block: bool) -> Result<Socket> {
687 let domain = match addr {
688 SocketAddr::V4(_) => socket2::Domain::IPV4,
689 SocketAddr::V6(_) => socket2::Domain::IPV6,
690 };
691
692 let fd = Socket::new(domain, socket2::Type::DGRAM, None)
693 .map_err(|e| e_fmt!("create socket failed: {}", e))?;
694
695 fd.set_reuse_address(true)
696 .map_err(|e| e_fmt!("set ReuseAddr failed: {}", e))?;
697 #[cfg(unix)] fd.set_reuse_port(true)
699 .map_err(|e| e_fmt!("set ReusePort failed: {}", e))?;
700
701 if non_block {
702 fd.set_nonblocking(true)
703 .map_err(|e| e_fmt!("set O_NONBLOCK: {}", e))?;
704 }
705
706 fd.bind(&addr.into())
707 .map_err(|e| e_fmt!("socket bind to {} failed: {}", &addr, e))?;
708
709 trace!("new socket bind to {}", &addr);
710 Ok(fd)
711}
712
713struct ReRun {
715 next_time: u64,
717 command: Command,
718}
719
/// IP protocol version of an interface address.
#[derive(Debug, PartialEq, Eq, Hash)]
enum IpVersion {
    V4,
    V6,
}

/// Identifies an (interface index, IP version) pair.
///
/// Used to avoid announcing the same thing twice on one interface when that
/// interface has several addresses of the same IP version.
#[derive(Debug, PartialEq, Eq, Hash)]
struct MulticastSendTracker {
    /// Index of the interface.
    intf_index: u32,

    /// IP version of the address the packet was sent from.
    ip_version: IpVersion,
}
733
734fn multicast_send_tracker(intf: &Interface) -> Option<MulticastSendTracker> {
736 match intf.index {
737 Some(index) => {
738 let ip_ver = match intf.addr {
739 IfAddr::V4(_) => IpVersion::V4,
740 IfAddr::V6(_) => IpVersion::V6,
741 };
742 Some(MulticastSendTracker {
743 intf_index: index,
744 ip_version: ip_ver,
745 })
746 }
747 None => None,
748 }
749}
750
/// Describes a set of network interfaces, for enabling or disabling them.
#[non_exhaustive]
#[derive(Clone, Debug)]
pub enum IfKind {
    /// Every interface.
    All,

    /// Interfaces with an IPv4 address.
    IPv4,

    /// Interfaces with an IPv6 address.
    IPv6,

    /// The interface with exactly this name.
    Name(String),

    /// The interface with exactly this IP address.
    Addr(IpAddr),

    /// Loopback interfaces with an IPv4 address. The daemon starts with these
    /// deselected.
    LoopbackV4,

    /// Loopback interfaces with an IPv6 address. The daemon starts with these
    /// deselected.
    LoopbackV6,
}
781
782impl IfKind {
783 fn matches(&self, intf: &Interface) -> bool {
785 match self {
786 Self::All => true,
787 Self::IPv4 => intf.ip().is_ipv4(),
788 Self::IPv6 => intf.ip().is_ipv6(),
789 Self::Name(ifname) => ifname == &intf.name,
790 Self::Addr(addr) => addr == &intf.ip(),
791 Self::LoopbackV4 => intf.is_loopback() && intf.ip().is_ipv4(),
792 Self::LoopbackV6 => intf.is_loopback() && intf.ip().is_ipv6(),
793 }
794 }
795}
796
797impl From<&str> for IfKind {
800 fn from(val: &str) -> Self {
801 Self::Name(val.to_string())
802 }
803}
804
805impl From<&String> for IfKind {
806 fn from(val: &String) -> Self {
807 Self::Name(val.to_string())
808 }
809}
810
811impl From<IpAddr> for IfKind {
813 fn from(val: IpAddr) -> Self {
814 Self::Addr(val)
815 }
816}
817
818pub struct IfKindVec {
820 kinds: Vec<IfKind>,
821}
822
823pub trait IntoIfKindVec {
825 fn into_vec(self) -> IfKindVec;
826}
827
828impl<T: Into<IfKind>> IntoIfKindVec for T {
829 fn into_vec(self) -> IfKindVec {
830 let if_kind: IfKind = self.into();
831 IfKindVec {
832 kinds: vec![if_kind],
833 }
834 }
835}
836
837impl<T: Into<IfKind>> IntoIfKindVec for Vec<T> {
838 fn into_vec(self) -> IfKindVec {
839 let kinds: Vec<IfKind> = self.into_iter().map(|x| x.into()).collect();
840 IfKindVec { kinds }
841 }
842}
843
844struct IfSelection {
846 if_kind: IfKind,
848
849 selected: bool,
851}
852
853struct Zeroconf {
855 intf_socks: HashMap<Interface, MioUdpSocket>,
857
858 poll_ids: HashMap<usize, Interface>,
860
861 poll_id_count: usize,
863
864 my_services: HashMap<String, ServiceInfo>,
866
867 cache: DnsCache,
869
870 dns_registry_map: HashMap<Interface, DnsRegistry>,
872
873 service_queriers: HashMap<String, Sender<ServiceEvent>>, hostname_resolvers: HashMap<String, (Sender<HostnameResolutionEvent>, Option<u64>)>, retransmissions: Vec<ReRun>,
883
884 counters: Metrics,
885
886 poller: Poll,
888
889 monitors: Vec<Sender<DaemonEvent>>,
891
892 service_name_len_max: u8,
894
895 if_selections: Vec<IfSelection>,
897
898 signal_sock: MioUdpSocket,
900
901 timers: BinaryHeap<Reverse<u64>>,
907
908 status: DaemonStatus,
909
910 pending_resolves: HashSet<String>,
912
913 resolved: HashSet<String>,
915
916 multicast_loop_v4: bool,
917
918 multicast_loop_v6: bool,
919}
920
921impl Zeroconf {
922 fn new(signal_sock: MioUdpSocket, poller: Poll) -> Self {
923 let my_ifaddrs = my_ip_interfaces(false);
925
926 let mut intf_socks = HashMap::new();
930 let mut dns_registry_map = HashMap::new();
931
932 for intf in my_ifaddrs {
933 let sock = match new_socket_bind(&intf, true) {
934 Ok(s) => s,
935 Err(e) => {
936 trace!("bind a socket to {}: {}. Skipped.", &intf.ip(), e);
937 continue;
938 }
939 };
940
941 dns_registry_map.insert(intf.clone(), DnsRegistry::new());
942
943 intf_socks.insert(intf, sock);
944 }
945
946 let monitors = Vec::new();
947 let service_name_len_max = SERVICE_NAME_LEN_MAX_DEFAULT;
948
949 let timers = BinaryHeap::new();
950
951 let if_selections = vec![
953 IfSelection {
954 if_kind: IfKind::LoopbackV4,
955 selected: false,
956 },
957 IfSelection {
958 if_kind: IfKind::LoopbackV6,
959 selected: false,
960 },
961 ];
962
963 let status = DaemonStatus::Running;
964
965 Self {
966 intf_socks,
967 poll_ids: HashMap::new(),
968 poll_id_count: 0,
969 my_services: HashMap::new(),
970 cache: DnsCache::new(),
971 dns_registry_map,
972 hostname_resolvers: HashMap::new(),
973 service_queriers: HashMap::new(),
974 retransmissions: Vec::new(),
975 counters: HashMap::new(),
976 poller,
977 monitors,
978 service_name_len_max,
979 if_selections,
980 signal_sock,
981 timers,
982 status,
983 pending_resolves: HashSet::new(),
984 resolved: HashSet::new(),
985 multicast_loop_v4: true,
986 multicast_loop_v6: true,
987 }
988 }
989
990 fn process_set_option(&mut self, daemon_opt: DaemonOption) {
991 match daemon_opt {
992 DaemonOption::ServiceNameLenMax(length) => self.service_name_len_max = length,
993 DaemonOption::EnableInterface(if_kind) => self.enable_interface(if_kind),
994 DaemonOption::DisableInterface(if_kind) => self.disable_interface(if_kind),
995 DaemonOption::MulticastLoopV4(on) => self.set_multicast_loop_v4(on),
996 DaemonOption::MulticastLoopV6(on) => self.set_multicast_loop_v6(on),
997 }
998 }
999
1000 fn enable_interface(&mut self, kinds: Vec<IfKind>) {
1001 for if_kind in kinds {
1002 self.if_selections.push(IfSelection {
1003 if_kind,
1004 selected: true,
1005 });
1006 }
1007
1008 self.apply_intf_selections(my_ip_interfaces(true));
1009 }
1010
1011 fn disable_interface(&mut self, kinds: Vec<IfKind>) {
1012 for if_kind in kinds {
1013 self.if_selections.push(IfSelection {
1014 if_kind,
1015 selected: false,
1016 });
1017 }
1018
1019 self.apply_intf_selections(my_ip_interfaces(true));
1020 }
1021
1022 fn set_multicast_loop_v4(&mut self, on: bool) {
1023 for (_, sock) in self.intf_socks.iter_mut() {
1024 if let Err(e) = sock.set_multicast_loop_v4(on) {
1025 debug!("failed to set multicast loop v4: {e}");
1026 }
1027 }
1028 }
1029
1030 fn set_multicast_loop_v6(&mut self, on: bool) {
1031 for (_, sock) in self.intf_socks.iter_mut() {
1032 if let Err(e) = sock.set_multicast_loop_v6(on) {
1033 debug!("failed to set multicast loop v6: {e}");
1034 }
1035 }
1036 }
1037
1038 fn notify_monitors(&mut self, event: DaemonEvent) {
1039 self.monitors.retain(|sender| {
1041 if let Err(e) = sender.try_send(event.clone()) {
1042 debug!("notify_monitors: try_send: {}", &e);
1043 if matches!(e, TrySendError::Disconnected(_)) {
1044 return false; }
1046 }
1047 true
1048 });
1049 }
1050
1051 fn del_addr_in_my_services(&mut self, addr: &IpAddr) {
1053 for (_, service_info) in self.my_services.iter_mut() {
1054 if service_info.is_addr_auto() {
1055 service_info.remove_ipaddr(addr);
1056 }
1057 }
1058 }
1059
1060 fn add_poll(&mut self, intf: Interface) -> usize {
1062 Self::add_poll_impl(&mut self.poll_ids, &mut self.poll_id_count, intf)
1063 }
1064
1065 fn add_poll_impl(
1069 poll_ids: &mut HashMap<usize, Interface>,
1070 poll_id_count: &mut usize,
1071 intf: Interface,
1072 ) -> usize {
1073 let key = *poll_id_count;
1074 *poll_id_count += 1;
1075 let _ = (*poll_ids).insert(key, intf);
1076 key
1077 }
1078
1079 fn add_timer(&mut self, next_time: u64) {
1080 self.timers.push(Reverse(next_time));
1081 }
1082
1083 fn peek_earliest_timer(&self) -> Option<u64> {
1084 self.timers.peek().map(|Reverse(v)| *v)
1085 }
1086
1087 fn pop_earliest_timer(&mut self) -> Option<u64> {
1088 self.timers.pop().map(|Reverse(v)| v)
1089 }
1090
1091 fn selected_addrs(&self, interfaces: Vec<Interface>) -> HashSet<IpAddr> {
1093 let intf_count = interfaces.len();
1094 let mut intf_selections = vec![true; intf_count];
1095
1096 for selection in self.if_selections.iter() {
1098 for i in 0..intf_count {
1100 if selection.if_kind.matches(&interfaces[i]) {
1101 intf_selections[i] = selection.selected;
1102 }
1103 }
1104 }
1105
1106 let mut selected_addrs = HashSet::new();
1107 for i in 0..intf_count {
1108 if intf_selections[i] {
1109 selected_addrs.insert(interfaces[i].addr.ip());
1110 }
1111 }
1112
1113 selected_addrs
1114 }
1115
1116 fn apply_intf_selections(&mut self, interfaces: Vec<Interface>) {
1121 let intf_count = interfaces.len();
1123 let mut intf_selections = vec![true; intf_count];
1124
1125 for selection in self.if_selections.iter() {
1127 for i in 0..intf_count {
1129 if selection.if_kind.matches(&interfaces[i]) {
1130 intf_selections[i] = selection.selected;
1131 }
1132 }
1133 }
1134
1135 for (idx, intf) in interfaces.into_iter().enumerate() {
1137 if intf_selections[idx] {
1138 if !self.intf_socks.contains_key(&intf) {
1140 debug!("apply_intf_selections: add {:?}", &intf.ip());
1141 self.add_new_interface(intf);
1142 }
1143 } else {
1144 if let Some(mut sock) = self.intf_socks.remove(&intf) {
1146 match self.poller.registry().deregister(&mut sock) {
1147 Ok(()) => debug!("apply_intf_selections: deregister {:?}", &intf.ip()),
1148 Err(e) => debug!("apply_intf_selections: poller.delete {:?}: {}", &intf, e),
1149 }
1150
1151 self.poll_ids.retain(|_, v| v != &intf);
1153
1154 self.cache.remove_addrs_on_disabled_intf(&intf);
1156 }
1157 }
1158 }
1159 }
1160
1161 fn check_ip_changes(&mut self) {
1163 let my_ifaddrs = my_ip_interfaces(true);
1165
1166 let poll_ids = &mut self.poll_ids;
1167 let poller = &mut self.poller;
1168 let deleted_addrs = self
1170 .intf_socks
1171 .iter_mut()
1172 .filter_map(|(intf, sock)| {
1173 if !my_ifaddrs.contains(intf) {
1174 if let Err(e) = poller.registry().deregister(sock) {
1175 debug!("check_ip_changes: poller.delete {:?}: {}", intf, e);
1176 }
1177 poll_ids.retain(|_, v| v != intf);
1179 Some(intf.ip())
1180 } else {
1181 None
1182 }
1183 })
1184 .collect::<Vec<IpAddr>>();
1185
1186 for ip in deleted_addrs.iter() {
1188 self.del_addr_in_my_services(ip);
1189 self.notify_monitors(DaemonEvent::IpDel(*ip));
1190 }
1191
1192 self.intf_socks.retain(|intf, _| my_ifaddrs.contains(intf));
1194
1195 self.apply_intf_selections(my_ifaddrs);
1197 }
1198
1199 fn add_new_interface(&mut self, intf: Interface) {
1200 let new_ip = intf.ip();
1202 let should_loop = if new_ip.is_ipv4() {
1203 self.multicast_loop_v4
1204 } else {
1205 self.multicast_loop_v6
1206 };
1207 let mut sock = match new_socket_bind(&intf, should_loop) {
1208 Ok(s) => s,
1209 Err(e) => {
1210 debug!("bind a socket to {}: {}. Skipped.", &intf.ip(), e);
1211 return;
1212 }
1213 };
1214
1215 let key = self.add_poll(intf.clone());
1217 if let Err(e) =
1218 self.poller
1219 .registry()
1220 .register(&mut sock, mio::Token(key), mio::Interest::READABLE)
1221 {
1222 debug!("check_ip_changes: poller add ip {}: {}", new_ip, e);
1223 return;
1224 }
1225
1226 debug!("add new interface {}: {new_ip}", intf.name);
1227 let dns_registry = match self.dns_registry_map.get_mut(&intf) {
1228 Some(registry) => registry,
1229 None => self
1230 .dns_registry_map
1231 .entry(intf.clone())
1232 .or_insert_with(DnsRegistry::new),
1233 };
1234
1235 for (_, service_info) in self.my_services.iter_mut() {
1236 if service_info.is_addr_auto() {
1237 service_info.insert_ipaddr(new_ip);
1238
1239 if announce_service_on_intf(dns_registry, service_info, &intf, &sock) {
1240 debug!(
1241 "Announce service {} on {}",
1242 service_info.get_fullname(),
1243 intf.ip()
1244 );
1245 service_info.set_status(&intf, ServiceStatus::Announced);
1246 } else {
1247 for timer in dns_registry.new_timers.drain(..) {
1248 self.timers.push(Reverse(timer));
1249 }
1250 service_info.set_status(&intf, ServiceStatus::Probing);
1251 }
1252 }
1253 }
1254
1255 self.intf_socks.insert(intf, sock);
1256
1257 self.notify_monitors(DaemonEvent::IpAdd(new_ip));
1259 }
1260
1261 fn register_service(&mut self, mut info: ServiceInfo) {
1270 if let Err(e) = check_service_name_length(info.get_type(), self.service_name_len_max) {
1272 debug!("check_service_name_length: {}", &e);
1273 self.notify_monitors(DaemonEvent::Error(e));
1274 return;
1275 }
1276
1277 if info.is_addr_auto() {
1278 let selected_addrs = self.selected_addrs(my_ip_interfaces(true));
1279 for addr in selected_addrs {
1280 info.insert_ipaddr(addr);
1281 }
1282 }
1283
1284 debug!("register service {:?}", &info);
1285
1286 let outgoing_addrs = self.send_unsolicited_response(&mut info);
1287 if !outgoing_addrs.is_empty() {
1288 self.notify_monitors(DaemonEvent::Announce(
1289 info.get_fullname().to_string(),
1290 format!("{:?}", &outgoing_addrs),
1291 ));
1292 }
1293
1294 let service_fullname = info.get_fullname().to_lowercase();
1297 self.my_services.insert(service_fullname, info);
1298 }
1299
1300 fn send_unsolicited_response(&mut self, info: &mut ServiceInfo) -> Vec<IpAddr> {
1303 let mut outgoing_addrs = Vec::new();
1304 let mut multicast_sent_trackers = HashSet::new();
1306
1307 let mut outgoing_intfs = Vec::new();
1308
1309 for (intf, sock) in self.intf_socks.iter() {
1310 if let Some(tracker) = multicast_send_tracker(intf) {
1311 if multicast_sent_trackers.contains(&tracker) {
1312 continue; }
1314 }
1315
1316 let dns_registry = match self.dns_registry_map.get_mut(intf) {
1317 Some(registry) => registry,
1318 None => self
1319 .dns_registry_map
1320 .entry(intf.clone())
1321 .or_insert_with(DnsRegistry::new),
1322 };
1323
1324 if announce_service_on_intf(dns_registry, info, intf, sock) {
1325 if let Some(tracker) = multicast_send_tracker(intf) {
1326 multicast_sent_trackers.insert(tracker);
1327 }
1328 outgoing_addrs.push(intf.ip());
1329 outgoing_intfs.push(intf.clone());
1330
1331 debug!("Announce service {} on {}", info.get_fullname(), intf.ip());
1332
1333 info.set_status(intf, ServiceStatus::Announced);
1334 } else {
1335 for timer in dns_registry.new_timers.drain(..) {
1336 self.timers.push(Reverse(timer));
1337 }
1338 info.set_status(intf, ServiceStatus::Probing);
1339 }
1340 }
1341
1342 let next_time = current_time_millis() + 1000;
1346 for intf in outgoing_intfs {
1347 self.add_retransmission(
1348 next_time,
1349 Command::RegisterResend(info.get_fullname().to_string(), intf),
1350 );
1351 }
1352
1353 outgoing_addrs
1354 }
1355
1356 fn probing_handler(&mut self) {
1358 let now = current_time_millis();
1359
1360 for (intf, sock) in self.intf_socks.iter() {
1361 let Some(dns_registry) = self.dns_registry_map.get_mut(intf) else {
1362 continue;
1363 };
1364
1365 let mut expired_probe_names = Vec::new();
1366 let mut out = DnsOutgoing::new(FLAGS_QR_QUERY);
1367
1368 for (name, probe) in dns_registry.probing.iter_mut() {
1369 if now >= probe.next_send {
1370 if probe.expired(now) {
1371 expired_probe_names.push(name.clone());
1373 } else {
1374 out.add_question(name, RRType::ANY);
1375
1376 for record in probe.records.iter() {
1384 out.add_authority(record.clone());
1385 }
1386
1387 probe.update_next_send(now);
1388
1389 self.timers.push(Reverse(probe.next_send));
1391 }
1392 }
1393 }
1394
1395 if !out.questions().is_empty() {
1397 debug!("sending out probing of {} questions", out.questions().len());
1398 send_dns_outgoing(&out, intf, sock);
1399 }
1400
1401 let mut waiting_services = HashSet::new();
1402
1403 for name in expired_probe_names {
1404 let Some(probe) = dns_registry.probing.remove(&name) else {
1405 continue;
1406 };
1407
1408 for record in probe.records.iter() {
1410 if let Some(new_name) = record.get_record().get_new_name() {
1411 dns_registry
1412 .name_changes
1413 .insert(name.clone(), new_name.to_string());
1414
1415 let event = DnsNameChange {
1416 original: record.get_record().get_original_name().to_string(),
1417 new_name: new_name.to_string(),
1418 rr_type: record.get_type(),
1419 intf_name: intf.name.to_string(),
1420 };
1421 notify_monitors(&mut self.monitors, DaemonEvent::NameChange(event));
1422 }
1423 }
1424
1425 debug!(
1427 "probe of '{name}' finished: move {} records to active. ({} waiting services)",
1428 probe.records.len(),
1429 probe.waiting_services.len(),
1430 );
1431
1432 if !probe.records.is_empty() {
1434 match dns_registry.active.get_mut(&name) {
1435 Some(records) => {
1436 records.extend(probe.records);
1437 }
1438 None => {
1439 dns_registry.active.insert(name, probe.records);
1440 }
1441 }
1442
1443 waiting_services.extend(probe.waiting_services);
1444 }
1445 }
1446
1447 for service_name in waiting_services {
1449 debug!(
1450 "try to announce service {service_name} on intf {}",
1451 intf.ip()
1452 );
1453 if let Some(info) = self.my_services.get_mut(&service_name.to_lowercase()) {
1455 if info.get_status(intf) == ServiceStatus::Announced {
1456 debug!("service {} already announced", info.get_fullname());
1457 continue;
1458 }
1459
1460 if announce_service_on_intf(dns_registry, info, intf, sock) {
1461 let next_time = now + 1000;
1462 let command =
1463 Command::RegisterResend(info.get_fullname().to_string(), intf.clone());
1464 self.retransmissions.push(ReRun { next_time, command });
1465 self.timers.push(Reverse(next_time));
1466
1467 let fullname = match dns_registry.name_changes.get(&service_name) {
1468 Some(new_name) => new_name.to_string(),
1469 None => service_name.to_string(),
1470 };
1471
1472 let mut hostname = info.get_hostname();
1473 if let Some(new_name) = dns_registry.name_changes.get(hostname) {
1474 hostname = new_name;
1475 }
1476
1477 debug!("wake up: announce service {} on {}", fullname, intf.ip());
1478 notify_monitors(
1479 &mut self.monitors,
1480 DaemonEvent::Announce(fullname, format!("{}:{}", hostname, &intf.ip())),
1481 );
1482
1483 info.set_status(intf, ServiceStatus::Announced);
1484 }
1485 }
1486 }
1487 }
1488 }
1489
1490 fn unregister_service(
1491 &self,
1492 info: &ServiceInfo,
1493 intf: &Interface,
1494 sock: &MioUdpSocket,
1495 ) -> Vec<u8> {
1496 let mut out = DnsOutgoing::new(FLAGS_QR_RESPONSE | FLAGS_AA);
1497 out.add_answer_at_time(
1498 DnsPointer::new(
1499 info.get_type(),
1500 RRType::PTR,
1501 CLASS_IN,
1502 0,
1503 info.get_fullname().to_string(),
1504 ),
1505 0,
1506 );
1507
1508 if let Some(sub) = info.get_subtype() {
1509 trace!("Adding subdomain {}", sub);
1510 out.add_answer_at_time(
1511 DnsPointer::new(
1512 sub,
1513 RRType::PTR,
1514 CLASS_IN,
1515 0,
1516 info.get_fullname().to_string(),
1517 ),
1518 0,
1519 );
1520 }
1521
1522 out.add_answer_at_time(
1523 DnsSrv::new(
1524 info.get_fullname(),
1525 CLASS_IN | CLASS_CACHE_FLUSH,
1526 0,
1527 info.get_priority(),
1528 info.get_weight(),
1529 info.get_port(),
1530 info.get_hostname().to_string(),
1531 ),
1532 0,
1533 );
1534 out.add_answer_at_time(
1535 DnsTxt::new(
1536 info.get_fullname(),
1537 CLASS_IN | CLASS_CACHE_FLUSH,
1538 0,
1539 info.generate_txt(),
1540 ),
1541 0,
1542 );
1543
1544 for address in info.get_addrs_on_intf(intf) {
1545 out.add_answer_at_time(
1546 DnsAddress::new(
1547 info.get_hostname(),
1548 ip_address_rr_type(&address),
1549 CLASS_IN | CLASS_CACHE_FLUSH,
1550 0,
1551 address,
1552 ),
1553 0,
1554 );
1555 }
1556
1557 send_dns_outgoing(&out, intf, sock).remove(0)
1559 }
1560
1561 fn add_hostname_resolver(
1565 &mut self,
1566 hostname: String,
1567 listener: Sender<HostnameResolutionEvent>,
1568 timeout: Option<u64>,
1569 ) {
1570 let real_timeout = timeout.map(|t| current_time_millis() + t);
1571 self.hostname_resolvers
1572 .insert(hostname, (listener, real_timeout));
1573 if let Some(t) = real_timeout {
1574 self.add_timer(t);
1575 }
1576 }
1577
1578 fn send_query(&self, name: &str, qtype: RRType) {
1580 self.send_query_vec(&[(name, qtype)]);
1581 }
1582
1583 fn send_query_vec(&self, questions: &[(&str, RRType)]) {
1585 trace!("Sending query questions: {:?}", questions);
1586 let mut out = DnsOutgoing::new(FLAGS_QR_QUERY);
1587 let now = current_time_millis();
1588
1589 for (name, qtype) in questions {
1590 out.add_question(name, *qtype);
1591
1592 for record in self.cache.get_known_answers(name, *qtype, now) {
1593 trace!("add known answer: {:?}", record);
1601 let mut new_record = record.clone();
1602 new_record.get_record_mut().update_ttl(now);
1603 out.add_answer_box(new_record);
1604 }
1605 }
1606
1607 let mut multicast_sent_trackers = HashSet::new();
1609 for (intf, sock) in self.intf_socks.iter() {
1610 if let Some(tracker) = multicast_send_tracker(intf) {
1611 if multicast_sent_trackers.contains(&tracker) {
1612 continue; }
1614 multicast_sent_trackers.insert(tracker);
1615 }
1616 send_dns_outgoing(&out, intf, sock);
1617 }
1618 }
1619
1620 fn handle_read(&mut self, intf: &Interface) -> bool {
1625 let sock = match self.intf_socks.get_mut(intf) {
1626 Some(if_sock) => if_sock,
1627 None => return false,
1628 };
1629 let mut buf = vec![0u8; MAX_MSG_ABSOLUTE];
1630
1631 let sz = match sock.recv(&mut buf) {
1638 Ok(sz) => sz,
1639 Err(e) => {
1640 if e.kind() != std::io::ErrorKind::WouldBlock {
1641 debug!("listening socket read failed: {}", e);
1642 }
1643 return false;
1644 }
1645 };
1646
1647 trace!("received {} bytes at IP: {}", sz, intf.ip());
1648
1649 if sz == 0 {
1651 debug!("socket {:?} was likely shutdown", &sock);
1652 if let Err(e) = self.poller.registry().deregister(sock) {
1653 debug!("failed to remove sock {:?} from poller: {}", sock, &e);
1654 }
1655
1656 let should_loop = if intf.ip().is_ipv4() {
1658 self.multicast_loop_v4
1659 } else {
1660 self.multicast_loop_v6
1661 };
1662 match new_socket_bind(intf, should_loop) {
1663 Ok(new_sock) => {
1664 trace!("reset socket for IP {}", intf.ip());
1665 self.intf_socks.insert(intf.clone(), new_sock);
1666 }
1667 Err(e) => debug!("re-bind a socket to {:?}: {}", intf, e),
1668 }
1669
1670 return false;
1671 }
1672
1673 buf.truncate(sz); match DnsIncoming::new(buf) {
1676 Ok(msg) => {
1677 if msg.is_query() {
1678 self.handle_query(msg, intf);
1679 } else if msg.is_response() {
1680 self.handle_response(msg, intf);
1681 } else {
1682 debug!("Invalid message: not query and not response");
1683 }
1684 }
1685 Err(e) => debug!("Invalid incoming DNS message: {}", e),
1686 }
1687
1688 true
1689 }
1690
1691 fn query_unresolved(&mut self, instance: &str) -> bool {
1693 if !valid_instance_name(instance) {
1694 trace!("instance name {} not valid", instance);
1695 return false;
1696 }
1697
1698 if let Some(records) = self.cache.get_srv(instance) {
1699 for record in records {
1700 if let Some(srv) = record.any().downcast_ref::<DnsSrv>() {
1701 if self.cache.get_addr(srv.host()).is_none() {
1702 self.send_query_vec(&[(srv.host(), RRType::A), (srv.host(), RRType::AAAA)]);
1703 return true;
1704 }
1705 }
1706 }
1707 } else {
1708 self.send_query(instance, RRType::ANY);
1709 return true;
1710 }
1711
1712 false
1713 }
1714
1715 fn query_cache_for_service(&mut self, ty_domain: &str, sender: &Sender<ServiceEvent>) {
1718 let mut resolved: HashSet<String> = HashSet::new();
1719 let mut unresolved: HashSet<String> = HashSet::new();
1720
1721 if let Some(records) = self.cache.get_ptr(ty_domain) {
1722 for record in records.iter() {
1723 if let Some(ptr) = record.any().downcast_ref::<DnsPointer>() {
1724 let info = match self.create_service_info_from_cache(ty_domain, ptr.alias()) {
1725 Ok(ok) => ok,
1726 Err(err) => {
1727 debug!("Error while creating service info from cache: {}", err);
1728 continue;
1729 }
1730 };
1731
1732 match sender.send(ServiceEvent::ServiceFound(
1733 ty_domain.to_string(),
1734 ptr.alias().to_string(),
1735 )) {
1736 Ok(()) => debug!("send service found {}", ptr.alias()),
1737 Err(e) => {
1738 debug!("failed to send service found: {}", e);
1739 continue;
1740 }
1741 }
1742
1743 if info.is_ready() {
1744 resolved.insert(ptr.alias().to_string());
1745 match sender.send(ServiceEvent::ServiceResolved(info)) {
1746 Ok(()) => debug!("sent service resolved: {}", ptr.alias()),
1747 Err(e) => debug!("failed to send service resolved: {}", e),
1748 }
1749 } else {
1750 unresolved.insert(ptr.alias().to_string());
1751 }
1752 }
1753 }
1754 }
1755
1756 for instance in resolved.drain() {
1757 self.pending_resolves.remove(&instance);
1758 self.resolved.insert(instance);
1759 }
1760
1761 for instance in unresolved.drain() {
1762 self.add_pending_resolve(instance);
1763 }
1764 }
1765
1766 fn query_cache_for_hostname(
1769 &mut self,
1770 hostname: &str,
1771 sender: Sender<HostnameResolutionEvent>,
1772 ) {
1773 let addresses = self.cache.get_addresses_for_host(hostname);
1774 if !addresses.is_empty() {
1775 match sender.send(HostnameResolutionEvent::AddressesFound(
1776 hostname.to_string(),
1777 addresses,
1778 )) {
1779 Ok(()) => trace!("sent hostname addresses found"),
1780 Err(e) => debug!("failed to send hostname addresses found: {}", e),
1781 }
1782 }
1783 }
1784
1785 fn add_pending_resolve(&mut self, instance: String) {
1786 if !self.pending_resolves.contains(&instance) {
1787 let next_time = current_time_millis() + RESOLVE_WAIT_IN_MILLIS;
1788 self.add_retransmission(next_time, Command::Resolve(instance.clone(), 1));
1789 self.pending_resolves.insert(instance);
1790 }
1791 }
1792
1793 fn create_service_info_from_cache(
1794 &self,
1795 ty_domain: &str,
1796 fullname: &str,
1797 ) -> Result<ServiceInfo> {
1798 let my_name = {
1799 let name = fullname.trim_end_matches(split_sub_domain(ty_domain).0);
1800 name.strip_suffix('.').unwrap_or(name).to_string()
1801 };
1802
1803 let now = current_time_millis();
1804 let mut info = ServiceInfo::new(ty_domain, &my_name, "", (), 0, None)?;
1805
1806 if let Some(subtype) = self.cache.get_subtype(fullname) {
1808 trace!(
1809 "ty_domain: {} found subtype {} for instance: {}",
1810 ty_domain,
1811 subtype,
1812 fullname
1813 );
1814 if info.get_subtype().is_none() {
1815 info.set_subtype(subtype.clone());
1816 }
1817 }
1818
1819 if let Some(records) = self.cache.get_srv(fullname) {
1821 if let Some(answer) = records.first() {
1822 if let Some(dns_srv) = answer.any().downcast_ref::<DnsSrv>() {
1823 info.set_hostname(dns_srv.host().to_string());
1824 info.set_port(dns_srv.port());
1825 }
1826 }
1827 }
1828
1829 if let Some(records) = self.cache.get_txt(fullname) {
1831 if let Some(record) = records.first() {
1832 if let Some(dns_txt) = record.any().downcast_ref::<DnsTxt>() {
1833 info.set_properties_from_txt(dns_txt.text());
1834 }
1835 }
1836 }
1837
1838 if let Some(records) = self.cache.get_addr(info.get_hostname()) {
1840 for answer in records.iter() {
1841 if let Some(dns_a) = answer.any().downcast_ref::<DnsAddress>() {
1842 if dns_a.get_record().is_expired(now) {
1843 trace!("Addr expired: {}", dns_a.address());
1844 } else {
1845 info.insert_ipaddr(dns_a.address());
1846 }
1847 }
1848 }
1849 }
1850
1851 Ok(info)
1852 }
1853
1854 fn handle_poller_events(&mut self, events: &mio::Events) {
1855 for ev in events.iter() {
1856 trace!("event received with key {:?}", ev.token());
1857 if ev.token().0 == SIGNAL_SOCK_EVENT_KEY {
1858 self.signal_sock_drain();
1860
1861 if let Err(e) = self.poller.registry().reregister(
1862 &mut self.signal_sock,
1863 ev.token(),
1864 mio::Interest::READABLE,
1865 ) {
1866 debug!("failed to modify poller for signal socket: {}", e);
1867 }
1868 continue; }
1870
1871 let intf = match self.poll_ids.get(&ev.token().0) {
1873 Some(interface) => interface.clone(),
1874 None => {
1875 debug!("Ip for event key {} not found", ev.token().0);
1876 break;
1877 }
1878 };
1879 while self.handle_read(&intf) {}
1880
1881 if let Some(sock) = self.intf_socks.get_mut(&intf) {
1883 if let Err(e) =
1884 self.poller
1885 .registry()
1886 .reregister(sock, ev.token(), mio::Interest::READABLE)
1887 {
1888 debug!("modify poller for interface {:?}: {}", &intf, e);
1889 break;
1890 }
1891 }
1892 }
1893 }
1894
/// Processes an incoming DNS response `msg` received on `intf`.
///
/// In order:
/// 1. drops expired records from the message, removing them from the cache,
/// 2. runs `conflict_handler` on the remaining message,
/// 3. merges every record into the cache and collects timers,
/// 4. notifies hostname resolvers about hosts whose address records changed,
/// 5. calls `resolve_updated_instances` for the affected service instances.
fn handle_response(&mut self, mut msg: DnsIncoming, intf: &Interface) {
    trace!(
        "handle_response: {} answers {} authorities {} additionals",
        msg.answers().len(),
        &msg.authorities().len(),
        &msg.num_additionals()
    );
    let now = current_time_millis();

    // `retain` predicate: keep records that are not expired. An expired record
    // is removed from the cache; when `cache.remove` returns true and the
    // record is a PTR, the service listeners get a `ServiceRemoved` event.
    let mut record_predicate = |record: &DnsRecordBox| {
        if !record.get_record().is_expired(now) {
            return true;
        }

        debug!("record is expired, removing it from cache.");
        if self.cache.remove(record) {
            if let Some(dns_ptr) = record.any().downcast_ref::<DnsPointer>() {
                call_service_listener(
                    &self.service_queriers,
                    dns_ptr.get_name(),
                    ServiceEvent::ServiceRemoved(
                        dns_ptr.get_name().to_string(),
                        dns_ptr.alias().to_string(),
                    ),
                );
            }
        }
        false
    };
    // Apply the same filter to all three record sections of the message.
    msg.answers_mut().retain(&mut record_predicate);
    msg.authorities_mut().retain(&mut record_predicate);
    msg.additionals_mut().retain(&mut record_predicate);

    // Check the (filtered) message for name conflicts with our own probes.
    self.conflict_handler(&msg, intf);

    // One changed DNS record: its type and the name it concerns. For PTR
    // records `name` is the alias (the instance); otherwise the record name.
    struct InstanceChange {
        ty: RRType,
        name: String,
    }

    // Changes are only collected here and processed after the loop.
    let mut changes = Vec::new();
    let mut timers = Vec::new();
    for record in msg.all_records() {
        match self.cache.add_or_update(intf, record, &mut timers) {
            // NOTE(review): the boolean presumably flags a new or changed
            // record — confirm against `DnsCache::add_or_update`.
            Some((dns_record, true)) => {
                timers.push(dns_record.get_record().get_expire_time());
                timers.push(dns_record.get_record().get_refresh_time());

                let ty = dns_record.get_type();
                let name = dns_record.get_name();
                if ty == RRType::PTR {
                    // Pushes the refresh time again when this type is being browsed.
                    if self.service_queriers.contains_key(name) {
                        timers.push(dns_record.get_record().get_refresh_time());
                    }

                    // PTR records trigger `ServiceFound` right away.
                    if let Some(dns_ptr) = dns_record.any().downcast_ref::<DnsPointer>() {
                        call_service_listener(
                            &self.service_queriers,
                            name,
                            ServiceEvent::ServiceFound(
                                name.to_string(),
                                dns_ptr.alias().to_string(),
                            ),
                        );
                        changes.push(InstanceChange {
                            ty,
                            name: dns_ptr.alias().to_string(),
                        });
                    }
                } else {
                    changes.push(InstanceChange {
                        ty,
                        name: name.to_string(),
                    });
                }
            }
            // Flag is false: only the timers are tracked, no change is recorded.
            Some((dns_record, false)) => {
                timers.push(dns_record.get_record().get_expire_time());
                timers.push(dns_record.get_record().get_refresh_time());
            }
            _ => {}
        }
    }

    // Timers are added after the loop, once the cache borrow has ended.
    for t in timers {
        self.add_timer(t);
    }

    // Notify hostname resolvers once per host whose A/AAAA records changed,
    // with the full address list currently in the cache.
    changes
        .iter()
        .filter(|change| change.ty == RRType::A || change.ty == RRType::AAAA)
        .map(|change| change.name.clone())
        .collect::<HashSet<String>>()
        .iter()
        .map(|hostname| (hostname, self.cache.get_addresses_for_host(hostname)))
        .for_each(|(hostname, addresses)| {
            call_hostname_resolution_listener(
                &self.hostname_resolvers,
                hostname,
                HostnameResolutionEvent::AddressesFound(hostname.to_string(), addresses),
            )
        });

    // Map the changes to the set of service instances they affect.
    let mut updated_instances = HashSet::new();
    for update in changes {
        match update.ty {
            RRType::PTR | RRType::SRV | RRType::TXT => {
                updated_instances.insert(update.name);
            }
            // An address change affects every instance hosted on that host.
            RRType::A | RRType::AAAA => {
                let instances = self.cache.get_instances_on_host(&update.name);
                updated_instances.extend(instances);
            }
            _ => {}
        }
    }

    self.resolve_updated_instances(&updated_instances);
}
2031
/// Checks the answers of `msg` against the names being probed on `intf` and
/// renames our probing records that conflict with an answer.
///
/// A probing record conflicts with an answer for the same name when type and
/// class are equal but the rdata does not match. Each conflicting record is
/// removed from its probe, given a new name, and inserted into a probe for
/// that new name. The rename is remembered in `name_changes`.
/// Does nothing if there is no DNS registry for `intf`.
fn conflict_handler(&mut self, msg: &DnsIncoming, intf: &Interface) {
    let Some(dns_registry) = self.dns_registry_map.get_mut(intf) else {
        return;
    };

    for answer in msg.answers().iter() {
        // Renamed copies of the records that conflict with this answer.
        let mut new_records = Vec::new();

        // Only names that are currently being probed can conflict here.
        let name = answer.get_name();
        let Some(probe) = dns_registry.probing.get_mut(name) else {
            continue;
        };

        if answer.get_type() == RRType::A || answer.get_type() == RRType::AAAA {
            // Ignore address answers that are not valid on this interface.
            if let Some(answer_addr) = answer.any().downcast_ref::<DnsAddress>() {
                if !valid_ip_on_intf(&answer_addr.address(), intf) {
                    debug!(
                        "conflict handler: answer addr {:?} not in the subnet of {:?}",
                        answer_addr, intf
                    );
                    continue;
                }
            }

            // A name can have several address records: if any of ours matches
            // the answer's rdata, this answer is not a conflict.
            let any_match = probe.records.iter().any(|r| {
                r.get_type() == answer.get_type()
                    && r.get_class() == answer.get_class()
                    && r.rrdata_match(answer.as_ref())
            });
            if any_match {
                continue;
            }
        }

        // Drop conflicting records from the probe, keeping a renamed clone of each.
        probe.records.retain(|record| {
            if record.get_type() == answer.get_type()
                && record.get_class() == answer.get_class()
                && !record.rrdata_match(answer.as_ref())
            {
                debug!(
                    "found conflict name: '{name}' record: {}: {} PEER: {}",
                    record.get_type(),
                    record.rdata_print(),
                    answer.rdata_print()
                );

                let mut new_record = record.clone();
                // Address records get a new hostname; other records a new name.
                let new_name = match record.get_type() {
                    RRType::A => hostname_change(name),
                    RRType::AAAA => hostname_change(name),
                    _ => name_change(name),
                };
                new_record.get_record_mut().set_new_name(new_name);
                new_records.push(new_record);
                return false;
            }

            true
        });

        // Start time for probing the new names, with a random offset below 250 ms.
        let create_time = current_time_millis() + fastrand::u64(0..250);

        // Cloned so `dns_registry.probing` can be borrowed mutably again below.
        let waiting_services = probe.waiting_services.clone();

        for record in new_records {
            if dns_registry.update_hostname(name, record.get_name(), create_time) {
                self.timers.push(Reverse(create_time));
            }

            // Remember the rename, keyed by the record's original name.
            dns_registry.name_changes.insert(
                record.get_record().get_original_name().to_string(),
                record.get_name().to_string(),
            );

            // Reuse the probe of the new name, or create one and add a timer
            // for its next send.
            let new_probe = match dns_registry.probing.get_mut(record.get_name()) {
                Some(p) => p,
                None => {
                    let new_probe = dns_registry
                        .probing
                        .entry(record.get_name().to_string())
                        .or_insert_with(|| {
                            debug!("conflict handler: new probe of {}", record.get_name());
                            Probe::new(create_time)
                        });
                    self.timers.push(Reverse(new_probe.next_send));
                    new_probe
                }
            };

            debug!(
                "insert record with new name '{}' {} into probe",
                record.get_name(),
                record.get_type()
            );
            new_probe.insert_record(record);

            // Services waiting on the old probe now wait on the new one too.
            new_probe.waiting_services.extend(waiting_services.clone());
        }
    }
}
2144
2145 fn resolve_updated_instances(&mut self, updated_instances: &HashSet<String>) {
2152 let mut resolved: HashSet<String> = HashSet::new();
2153 let mut unresolved: HashSet<String> = HashSet::new();
2154 let mut removed_instances = HashMap::new();
2155
2156 for (ty_domain, records) in self.cache.all_ptr().iter() {
2157 if !self.service_queriers.contains_key(ty_domain) {
2158 continue;
2160 }
2161
2162 for record in records.iter() {
2163 if let Some(dns_ptr) = record.any().downcast_ref::<DnsPointer>() {
2164 if updated_instances.contains(dns_ptr.alias()) {
2165 if let Ok(info) =
2166 self.create_service_info_from_cache(ty_domain, dns_ptr.alias())
2167 {
2168 if info.is_ready() {
2169 debug!("call queriers to resolve {}", dns_ptr.alias());
2170 resolved.insert(dns_ptr.alias().to_string());
2171 call_service_listener(
2172 &self.service_queriers,
2173 ty_domain,
2174 ServiceEvent::ServiceResolved(info),
2175 );
2176 } else {
2177 if self.resolved.remove(dns_ptr.alias()) {
2178 removed_instances
2179 .entry(ty_domain.to_string())
2180 .or_insert_with(HashSet::new)
2181 .insert(dns_ptr.alias().to_string());
2182 }
2183 unresolved.insert(dns_ptr.alias().to_string());
2184 }
2185 }
2186 }
2187 }
2188 }
2189 }
2190
2191 for instance in resolved.drain() {
2192 self.pending_resolves.remove(&instance);
2193 self.resolved.insert(instance);
2194 }
2195
2196 for instance in unresolved.drain() {
2197 self.add_pending_resolve(instance);
2198 }
2199
2200 self.notify_service_removal(removed_instances);
2201 }
2202
2203 fn handle_query(&mut self, msg: DnsIncoming, intf: &Interface) {
2205 let sock = match self.intf_socks.get(intf) {
2206 Some(sock) => sock,
2207 None => return,
2208 };
2209 let mut out = DnsOutgoing::new(FLAGS_QR_RESPONSE | FLAGS_AA);
2210
2211 const META_QUERY: &str = "_services._dns-sd._udp.local.";
2214
2215 let Some(dns_registry) = self.dns_registry_map.get_mut(intf) else {
2216 debug!("missing dns registry for intf {}", intf.ip());
2217 return;
2218 };
2219
2220 for question in msg.questions().iter() {
2221 trace!("query question: {:?}", &question);
2222
2223 let qtype = question.entry_type();
2224
2225 if qtype == RRType::PTR {
2226 for service in self.my_services.values() {
2227 if service.get_status(intf) != ServiceStatus::Announced {
2228 continue;
2229 }
2230
2231 if question.entry_name() == service.get_type()
2232 || service
2233 .get_subtype()
2234 .as_ref()
2235 .is_some_and(|v| v == question.entry_name())
2236 {
2237 add_answer_with_additionals(&mut out, &msg, service, intf, dns_registry);
2238 } else if question.entry_name() == META_QUERY {
2239 let ptr_added = out.add_answer(
2240 &msg,
2241 DnsPointer::new(
2242 question.entry_name(),
2243 RRType::PTR,
2244 CLASS_IN,
2245 service.get_other_ttl(),
2246 service.get_type().to_string(),
2247 ),
2248 );
2249 if !ptr_added {
2250 trace!("answer was not added for meta-query {:?}", &question);
2251 }
2252 }
2253 }
2254 } else {
2255 if qtype == RRType::ANY && msg.num_authorities() > 0 {
2257 let probe_name = question.entry_name();
2258
2259 if let Some(probe) = dns_registry.probing.get_mut(probe_name) {
2260 let now = current_time_millis();
2261
2262 if probe.start_time < now {
2266 let incoming_records: Vec<_> = msg
2267 .authorities()
2268 .iter()
2269 .filter(|r| r.get_name() == probe_name)
2270 .collect();
2271
2272 match probe.tiebreaking(&incoming_records) {
2282 cmp::Ordering::Less => {
2283 debug!(
2284 "tiebreaking '{}': LOST, will wait for one second",
2285 probe_name
2286 );
2287 probe.start_time = now + 1000; probe.next_send = now + 1000;
2289 }
2290 ordering => {
2291 debug!("tiebreaking '{}': {:?}", probe_name, ordering);
2292 }
2293 }
2294 }
2295 }
2296 }
2297
2298 if qtype == RRType::A || qtype == RRType::AAAA || qtype == RRType::ANY {
2299 for service in self.my_services.values() {
2300 if service.get_status(intf) != ServiceStatus::Announced {
2301 continue;
2302 }
2303
2304 let service_hostname =
2305 match dns_registry.name_changes.get(service.get_hostname()) {
2306 Some(new_name) => new_name,
2307 None => service.get_hostname(),
2308 };
2309
2310 if service_hostname.to_lowercase() == question.entry_name().to_lowercase() {
2311 let intf_addrs = service.get_addrs_on_intf(intf);
2312 if intf_addrs.is_empty()
2313 && (qtype == RRType::A || qtype == RRType::AAAA)
2314 {
2315 let t = match qtype {
2316 RRType::A => "TYPE_A",
2317 RRType::AAAA => "TYPE_AAAA",
2318 _ => "invalid_type",
2319 };
2320 trace!(
2321 "Cannot find valid addrs for {} response on intf {:?}",
2322 t,
2323 &intf
2324 );
2325 return;
2326 }
2327 for address in intf_addrs {
2328 out.add_answer(
2329 &msg,
2330 DnsAddress::new(
2331 question.entry_name(),
2332 ip_address_rr_type(&address),
2333 CLASS_IN | CLASS_CACHE_FLUSH,
2334 service.get_host_ttl(),
2335 address,
2336 ),
2337 );
2338 }
2339 }
2340 }
2341 }
2342
2343 let query_name = question.entry_name().to_lowercase();
2344 let service_opt = self
2345 .my_services
2346 .iter()
2347 .find(|(k, _v)| {
2348 let service_name = match dns_registry.name_changes.get(k.as_str()) {
2349 Some(new_name) => new_name,
2350 None => k,
2351 };
2352 service_name == &query_name
2353 })
2354 .map(|(_, v)| v);
2355
2356 let Some(service) = service_opt else {
2357 continue;
2358 };
2359
2360 if service.get_status(intf) != ServiceStatus::Announced {
2361 continue;
2362 }
2363
2364 if qtype == RRType::SRV || qtype == RRType::ANY {
2365 out.add_answer(
2366 &msg,
2367 DnsSrv::new(
2368 question.entry_name(),
2369 CLASS_IN | CLASS_CACHE_FLUSH,
2370 service.get_host_ttl(),
2371 service.get_priority(),
2372 service.get_weight(),
2373 service.get_port(),
2374 service.get_hostname().to_string(),
2375 ),
2376 );
2377 }
2378
2379 if qtype == RRType::TXT || qtype == RRType::ANY {
2380 out.add_answer(
2381 &msg,
2382 DnsTxt::new(
2383 question.entry_name(),
2384 CLASS_IN | CLASS_CACHE_FLUSH,
2385 service.get_host_ttl(),
2386 service.generate_txt(),
2387 ),
2388 );
2389 }
2390
2391 if qtype == RRType::SRV {
2392 let intf_addrs = service.get_addrs_on_intf(intf);
2393 if intf_addrs.is_empty() {
2394 debug!(
2395 "Cannot find valid addrs for TYPE_SRV response on intf {:?}",
2396 &intf
2397 );
2398 return;
2399 }
2400 for address in intf_addrs {
2401 out.add_additional_answer(DnsAddress::new(
2402 service.get_hostname(),
2403 ip_address_rr_type(&address),
2404 CLASS_IN | CLASS_CACHE_FLUSH,
2405 service.get_host_ttl(),
2406 address,
2407 ));
2408 }
2409 }
2410 }
2411 }
2412
2413 if !out.answers_count() > 0 {
2414 out.set_id(msg.id());
2415 send_dns_outgoing(&out, intf, sock);
2416
2417 self.increase_counter(Counter::Respond, 1);
2418 self.notify_monitors(DaemonEvent::Respond(intf.ip()));
2419 }
2420
2421 self.increase_counter(Counter::KnownAnswerSuppression, out.known_answer_count());
2422 }
2423
2424 fn increase_counter(&mut self, counter: Counter, count: i64) {
2426 let key = counter.to_string();
2427 match self.counters.get_mut(&key) {
2428 Some(v) => *v += count,
2429 None => {
2430 self.counters.insert(key, count);
2431 }
2432 }
2433 }
2434
2435 fn signal_sock_drain(&self) {
2436 let mut signal_buf = [0; 1024];
2437
2438 while let Ok(sz) = self.signal_sock.recv(&mut signal_buf) {
2440 trace!(
2441 "signal socket recvd: {}",
2442 String::from_utf8_lossy(&signal_buf[0..sz])
2443 );
2444 }
2445 }
2446
2447 fn add_retransmission(&mut self, next_time: u64, command: Command) {
2448 self.retransmissions.push(ReRun { next_time, command });
2449 self.add_timer(next_time);
2450 }
2451
2452 fn notify_service_removal(&self, expired: HashMap<String, HashSet<String>>) {
2454 for (ty_domain, sender) in self.service_queriers.iter() {
2455 if let Some(instances) = expired.get(ty_domain) {
2456 for instance_name in instances {
2457 let event = ServiceEvent::ServiceRemoved(
2458 ty_domain.to_string(),
2459 instance_name.to_string(),
2460 );
2461 match sender.send(event) {
2462 Ok(()) => debug!("notify_service_removal: sent ServiceRemoved to listener of {ty_domain}: {instance_name}"),
2463 Err(e) => debug!("Failed to send event: {}", e),
2464 }
2465 }
2466 }
2467 }
2468 }
2469
2470 fn exec_command(&mut self, command: Command, repeating: bool) {
2474 match command {
2475 Command::Browse(ty, next_delay, listener) => {
2476 self.exec_command_browse(repeating, ty, next_delay, listener);
2477 }
2478
2479 Command::ResolveHostname(hostname, next_delay, listener, timeout) => {
2480 self.exec_command_resolve_hostname(
2481 repeating, hostname, next_delay, listener, timeout,
2482 );
2483 }
2484
2485 Command::Register(service_info) => {
2486 self.register_service(service_info);
2487 self.increase_counter(Counter::Register, 1);
2488 }
2489
2490 Command::RegisterResend(fullname, intf) => {
2491 trace!("register-resend service: {fullname} on {:?}", &intf.addr);
2492 self.exec_command_register_resend(fullname, intf);
2493 }
2494
2495 Command::Unregister(fullname, resp_s) => {
2496 trace!("unregister service {} repeat {}", &fullname, &repeating);
2497 self.exec_command_unregister(repeating, fullname, resp_s);
2498 }
2499
2500 Command::UnregisterResend(packet, ip) => {
2501 self.exec_command_unregister_resend(packet, ip);
2502 }
2503
2504 Command::StopBrowse(ty_domain) => self.exec_command_stop_browse(ty_domain),
2505
2506 Command::StopResolveHostname(hostname) => {
2507 self.exec_command_stop_resolve_hostname(hostname)
2508 }
2509
2510 Command::Resolve(instance, try_count) => self.exec_command_resolve(instance, try_count),
2511
2512 Command::GetMetrics(resp_s) => match resp_s.send(self.counters.clone()) {
2513 Ok(()) => trace!("Sent metrics to the client"),
2514 Err(e) => debug!("Failed to send metrics: {}", e),
2515 },
2516
2517 Command::GetStatus(resp_s) => match resp_s.send(self.status.clone()) {
2518 Ok(()) => trace!("Sent status to the client"),
2519 Err(e) => debug!("Failed to send status: {}", e),
2520 },
2521
2522 Command::Monitor(resp_s) => {
2523 self.monitors.push(resp_s);
2524 }
2525
2526 Command::SetOption(daemon_opt) => {
2527 self.process_set_option(daemon_opt);
2528 }
2529
2530 Command::Verify(instance_fullname, timeout) => {
2531 self.exec_command_verify(instance_fullname, timeout, repeating);
2532 }
2533
2534 _ => {
2535 debug!("unexpected command: {:?}", &command);
2536 }
2537 }
2538 }
2539
2540 fn exec_command_browse(
2541 &mut self,
2542 repeating: bool,
2543 ty: String,
2544 next_delay: u32,
2545 listener: Sender<ServiceEvent>,
2546 ) {
2547 let pretty_addrs: Vec<String> = self
2548 .intf_socks
2549 .keys()
2550 .map(|itf| format!("{} ({})", itf.ip(), itf.name))
2551 .collect();
2552
2553 if let Err(e) = listener.send(ServiceEvent::SearchStarted(format!(
2554 "{ty} on {} interfaces [{}]",
2555 pretty_addrs.len(),
2556 pretty_addrs.join(", ")
2557 ))) {
2558 debug!(
2559 "Failed to send SearchStarted({})(repeating:{}): {}",
2560 &ty, repeating, e
2561 );
2562 return;
2563 }
2564 if !repeating {
2565 self.service_queriers.insert(ty.clone(), listener.clone());
2569
2570 self.query_cache_for_service(&ty, &listener);
2572 }
2573
2574 self.send_query(&ty, RRType::PTR);
2575 self.increase_counter(Counter::Browse, 1);
2576
2577 let next_time = current_time_millis() + (next_delay * 1000) as u64;
2578 let max_delay = 60 * 60;
2579 let delay = cmp::min(next_delay * 2, max_delay);
2580 self.add_retransmission(next_time, Command::Browse(ty, delay, listener));
2581 }
2582
2583 fn exec_command_resolve_hostname(
2584 &mut self,
2585 repeating: bool,
2586 hostname: String,
2587 next_delay: u32,
2588 listener: Sender<HostnameResolutionEvent>,
2589 timeout: Option<u64>,
2590 ) {
2591 let addr_list: Vec<_> = self.intf_socks.keys().collect();
2592 if let Err(e) = listener.send(HostnameResolutionEvent::SearchStarted(format!(
2593 "{} on addrs {:?}",
2594 &hostname, &addr_list
2595 ))) {
2596 debug!(
2597 "Failed to send ResolveStarted({})(repeating:{}): {}",
2598 &hostname, repeating, e
2599 );
2600 return;
2601 }
2602 if !repeating {
2603 self.add_hostname_resolver(hostname.to_owned(), listener.clone(), timeout);
2604 self.query_cache_for_hostname(&hostname, listener.clone());
2606 }
2607
2608 self.send_query_vec(&[(&hostname, RRType::A), (&hostname, RRType::AAAA)]);
2609 self.increase_counter(Counter::ResolveHostname, 1);
2610
2611 let now = current_time_millis();
2612 let next_time = now + u64::from(next_delay) * 1000;
2613 let max_delay = 60 * 60;
2614 let delay = cmp::min(next_delay * 2, max_delay);
2615
2616 if self
2618 .hostname_resolvers
2619 .get(&hostname)
2620 .and_then(|(_sender, timeout)| *timeout)
2621 .map(|timeout| next_time < timeout)
2622 .unwrap_or(true)
2623 {
2624 self.add_retransmission(
2625 next_time,
2626 Command::ResolveHostname(hostname, delay, listener, None),
2627 );
2628 }
2629 }
2630
2631 fn exec_command_resolve(&mut self, instance: String, try_count: u16) {
2632 let pending_query = self.query_unresolved(&instance);
2633 let max_try = 3;
2634 if pending_query && try_count < max_try {
2635 let next_time = current_time_millis() + RESOLVE_WAIT_IN_MILLIS;
2638 self.add_retransmission(next_time, Command::Resolve(instance, try_count + 1));
2639 }
2640 }
2641
2642 fn exec_command_unregister(
2643 &mut self,
2644 repeating: bool,
2645 fullname: String,
2646 resp_s: Sender<UnregisterStatus>,
2647 ) {
2648 let response = match self.my_services.remove_entry(&fullname) {
2649 None => {
2650 debug!("unregister: cannot find such service {}", &fullname);
2651 UnregisterStatus::NotFound
2652 }
2653 Some((_k, info)) => {
2654 let mut timers = Vec::new();
2655 let mut multicast_sent_trackers = HashSet::new();
2657
2658 for (intf, sock) in self.intf_socks.iter() {
2659 if let Some(tracker) = multicast_send_tracker(intf) {
2660 if multicast_sent_trackers.contains(&tracker) {
2661 continue; }
2663 multicast_sent_trackers.insert(tracker);
2664 }
2665 let packet = self.unregister_service(&info, intf, sock);
2666 if !repeating && !packet.is_empty() {
2668 let next_time = current_time_millis() + 120;
2669 self.retransmissions.push(ReRun {
2670 next_time,
2671 command: Command::UnregisterResend(packet, intf.clone()),
2672 });
2673 timers.push(next_time);
2674 }
2675 }
2676
2677 for t in timers {
2678 self.add_timer(t);
2679 }
2680
2681 self.increase_counter(Counter::Unregister, 1);
2682 UnregisterStatus::OK
2683 }
2684 };
2685 if let Err(e) = resp_s.send(response) {
2686 debug!("unregister: failed to send response: {}", e);
2687 }
2688 }
2689
2690 fn exec_command_unregister_resend(&mut self, packet: Vec<u8>, intf: Interface) {
2691 if let Some(sock) = self.intf_socks.get(&intf) {
2692 debug!("UnregisterResend from {}", &intf.ip());
2693 multicast_on_intf(&packet[..], &intf, sock);
2694 self.increase_counter(Counter::UnregisterResend, 1);
2695 }
2696 }
2697
2698 fn exec_command_stop_browse(&mut self, ty_domain: String) {
2699 match self.service_queriers.remove_entry(&ty_domain) {
2700 None => debug!("StopBrowse: cannot find querier for {}", &ty_domain),
2701 Some((ty, sender)) => {
2702 trace!("StopBrowse: removed queryer for {}", &ty);
2704 let mut i = 0;
2705 while i < self.retransmissions.len() {
2706 if let Command::Browse(t, _, _) = &self.retransmissions[i].command {
2707 if t == &ty {
2708 self.retransmissions.remove(i);
2709 trace!("StopBrowse: removed retransmission for {}", &ty);
2710 continue;
2711 }
2712 }
2713 i += 1;
2714 }
2715
2716 match sender.send(ServiceEvent::SearchStopped(ty_domain)) {
2718 Ok(()) => trace!("Sent SearchStopped to the listener"),
2719 Err(e) => debug!("Failed to send SearchStopped: {}", e),
2720 }
2721 }
2722 }
2723 }
2724
2725 fn exec_command_stop_resolve_hostname(&mut self, hostname: String) {
2726 if let Some((host, (sender, _timeout))) = self.hostname_resolvers.remove_entry(&hostname) {
2727 trace!("StopResolve: removed queryer for {}", &host);
2729 let mut i = 0;
2730 while i < self.retransmissions.len() {
2731 if let Command::Resolve(t, _) = &self.retransmissions[i].command {
2732 if t == &host {
2733 self.retransmissions.remove(i);
2734 trace!("StopResolve: removed retransmission for {}", &host);
2735 continue;
2736 }
2737 }
2738 i += 1;
2739 }
2740
2741 match sender.send(HostnameResolutionEvent::SearchStopped(hostname)) {
2743 Ok(()) => trace!("Sent SearchStopped to the listener"),
2744 Err(e) => debug!("Failed to send SearchStopped: {}", e),
2745 }
2746 }
2747 }
2748
2749 fn exec_command_register_resend(&mut self, fullname: String, intf: Interface) {
2750 let Some(info) = self.my_services.get_mut(&fullname) else {
2751 trace!("announce: cannot find such service {}", &fullname);
2752 return;
2753 };
2754
2755 let Some(dns_registry) = self.dns_registry_map.get_mut(&intf) else {
2756 return;
2757 };
2758
2759 let Some(sock) = self.intf_socks.get(&intf) else {
2760 return;
2761 };
2762
2763 if announce_service_on_intf(dns_registry, info, &intf, sock) {
2764 let mut hostname = info.get_hostname();
2765 if let Some(new_name) = dns_registry.name_changes.get(hostname) {
2766 hostname = new_name;
2767 }
2768 let service_name = match dns_registry.name_changes.get(&fullname) {
2769 Some(new_name) => new_name.to_string(),
2770 None => fullname,
2771 };
2772
2773 debug!("resend: announce service {} on {}", service_name, intf.ip());
2774
2775 notify_monitors(
2776 &mut self.monitors,
2777 DaemonEvent::Announce(service_name, format!("{}:{}", hostname, &intf.ip())),
2778 );
2779 info.set_status(&intf, ServiceStatus::Announced);
2780 } else {
2781 debug!("register-resend should not fail");
2782 }
2783
2784 self.increase_counter(Counter::RegisterResend, 1);
2785 }
2786
2787 fn exec_command_verify(&mut self, instance: String, timeout: Duration, repeating: bool) {
2788 let now = current_time_millis();
2798 let expire_at = if repeating {
2799 None
2800 } else {
2801 Some(now + timeout.as_millis() as u64)
2802 };
2803
2804 let record_vec = self.cache.service_verify_queries(&instance, expire_at);
2806
2807 if !record_vec.is_empty() {
2808 let query_vec: Vec<(&str, RRType)> = record_vec
2809 .iter()
2810 .map(|(record, rr_type)| (record.as_str(), *rr_type))
2811 .collect();
2812 self.send_query_vec(&query_vec);
2813
2814 if let Some(new_expire) = expire_at {
2815 self.add_timer(new_expire); self.add_retransmission(now + 1000, Command::Verify(instance, timeout));
2819 }
2820 }
2821 }
2822
2823 fn refresh_active_services(&mut self) {
2825 let mut query_ptr_count = 0;
2826 let mut query_srv_count = 0;
2827 let mut new_timers = HashSet::new();
2828 let mut query_addr_count = 0;
2829
2830 for (ty_domain, _sender) in self.service_queriers.iter() {
2831 let refreshed_timers = self.cache.refresh_due_ptr(ty_domain);
2832 if !refreshed_timers.is_empty() {
2833 trace!("sending refresh query for PTR: {}", ty_domain);
2834 self.send_query(ty_domain, RRType::PTR);
2835 query_ptr_count += 1;
2836 new_timers.extend(refreshed_timers);
2837 }
2838
2839 let (instances, timers) = self.cache.refresh_due_srv(ty_domain);
2840 for instance in instances.iter() {
2841 trace!("sending refresh query for SRV: {}", instance);
2842 self.send_query(instance, RRType::SRV);
2843 query_srv_count += 1;
2844 }
2845 new_timers.extend(timers);
2846 let (hostnames, timers) = self.cache.refresh_due_hosts(ty_domain);
2847 for hostname in hostnames.iter() {
2848 trace!("sending refresh queries for A and AAAA: {}", hostname);
2849 self.send_query_vec(&[(hostname, RRType::A), (hostname, RRType::AAAA)]);
2850 query_addr_count += 2;
2851 }
2852 new_timers.extend(timers);
2853 }
2854
2855 for timer in new_timers {
2856 self.add_timer(timer);
2857 }
2858
2859 self.increase_counter(Counter::CacheRefreshPTR, query_ptr_count);
2860 self.increase_counter(Counter::CacheRefreshSRV, query_srv_count);
2861 self.increase_counter(Counter::CacheRefreshAddr, query_addr_count);
2862 }
2863}
2864
/// Events sent to the listener of a service browse.
#[derive(Debug)]
pub enum ServiceEvent {
    /// A search was started; presumably carries the service type being
    /// browsed (TODO confirm against the sender).
    SearchStarted(String),

    /// A service instance was found; presumably `(service_type, fullname)`,
    /// mirroring `ServiceRemoved` (TODO confirm).
    ServiceFound(String, String),

    /// A service instance was resolved; carries its details.
    ServiceResolved(ServiceInfo),

    /// A service instance was removed: `(service_type, fullname)`.
    ServiceRemoved(String, String),

    /// Browsing was stopped; carries the service type domain that was browsed.
    SearchStopped(String),
}
2884
/// Events sent to the listener of a hostname resolution.
#[derive(Debug)]
#[non_exhaustive]
pub enum HostnameResolutionEvent {
    /// Resolution was started; presumably carries the hostname (TODO confirm).
    SearchStarted(String),
    /// Addresses were found for a hostname: `(hostname, addresses)`.
    AddressesFound(String, HashSet<IpAddr>),
    /// Addresses of a hostname were removed: `(hostname, addresses)`.
    AddressesRemoved(String, HashSet<IpAddr>),
    /// Resolution timed out; presumably carries the hostname (TODO confirm).
    SearchTimeout(String),
    /// Resolution was stopped; carries the hostname that was being resolved.
    SearchStopped(String),
}
2901
/// Events delivered to daemon monitors (see `notify_monitors`).
#[derive(Clone, Debug)]
#[non_exhaustive]
pub enum DaemonEvent {
    /// A service was announced: `(service_name, "hostname:ip")`.
    Announce(String, String),

    /// An error reported by the daemon.
    Error(Error),

    /// Presumably: an IP address was added on the host (TODO confirm).
    IpAdd(IpAddr),

    /// Presumably: an IP address was removed from the host (TODO confirm).
    IpDel(IpAddr),

    /// A DNS name was changed; see `DnsNameChange` for the details carried.
    NameChange(DnsNameChange),

    /// Presumably: the daemon responded to a query; the meaning of the
    /// address is not visible here (TODO confirm).
    Respond(IpAddr),
}
2926
/// Describes a change of a DNS record name, carried by `DaemonEvent::NameChange`.
#[derive(Clone, Debug)]
pub struct DnsNameChange {
    /// The name before the change.
    pub original: String,

    /// The name after the change.
    pub new_name: String,

    /// The resource record type the change applies to.
    pub rr_type: RRType,

    /// Presumably the name of the interface where the change happened
    /// (TODO confirm against the code that builds this struct).
    pub intf_name: String,
}
2951
/// Commands executed by the daemon; some of them are also queued as
/// retransmissions (see `self.retransmissions`).
#[derive(Debug)]
enum Command {
    /// Browse a service type: `(ty_domain, <u32>, listener)`.
    /// NOTE(review): the `u32` looks like a retry delay — confirm.
    Browse(String, u32, Sender<ServiceEvent>),

    /// Resolve a hostname: `(hostname, <u32>, listener, <Option<u64>>)`.
    /// NOTE(review): the numeric fields look like a retry delay and an
    /// optional timeout — confirm.
    ResolveHostname(String, u32, Sender<HostnameResolutionEvent>, Option<u64>),

    /// Register a service.
    Register(ServiceInfo),

    /// Unregister a service; the status is reported through the sender.
    /// The `String` is presumably the service fullname (TODO confirm).
    Unregister(String, Sender<UnregisterStatus>),

    /// Announce an already registered service again: `(fullname, interface)`.
    RegisterResend(String, Interface),

    /// Send an unregister packet again: `(packet, interface)`.
    UnregisterResend(Vec<u8>, Interface),

    /// Stop browsing a service type: `(ty_domain)`.
    StopBrowse(String),

    /// Stop resolving a hostname: `(hostname)`.
    StopResolveHostname(String),

    /// Resolve a name; NOTE(review): fields look like `(instance, try_count)` — confirm.
    Resolve(String, u16),

    /// Request the daemon metrics through the sender.
    GetMetrics(Sender<Metrics>),

    /// Request the daemon status through the sender.
    GetStatus(Sender<DaemonStatus>),

    /// Register a monitor that receives `DaemonEvent`s.
    Monitor(Sender<DaemonEvent>),

    /// Change a daemon option.
    SetOption(DaemonOption),

    /// Verify the cached records of a service instance: `(instance, timeout)`.
    Verify(String, Duration),

    /// Ask the daemon to exit; the sender presumably receives the final
    /// status (TODO confirm).
    Exit(Sender<DaemonStatus>),
}
3002
3003impl fmt::Display for Command {
3004 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
3005 match self {
3006 Self::Browse(_, _, _) => write!(f, "Command Browse"),
3007 Self::ResolveHostname(_, _, _, _) => write!(f, "Command ResolveHostname"),
3008 Self::Exit(_) => write!(f, "Command Exit"),
3009 Self::GetStatus(_) => write!(f, "Command GetStatus"),
3010 Self::GetMetrics(_) => write!(f, "Command GetMetrics"),
3011 Self::Monitor(_) => write!(f, "Command Monitor"),
3012 Self::Register(_) => write!(f, "Command Register"),
3013 Self::RegisterResend(_, _) => write!(f, "Command RegisterResend"),
3014 Self::SetOption(_) => write!(f, "Command SetOption"),
3015 Self::StopBrowse(_) => write!(f, "Command StopBrowse"),
3016 Self::StopResolveHostname(_) => write!(f, "Command StopResolveHostname"),
3017 Self::Unregister(_, _) => write!(f, "Command Unregister"),
3018 Self::UnregisterResend(_, _) => write!(f, "Command UnregisterResend"),
3019 Self::Resolve(_, _) => write!(f, "Command Resolve"),
3020 Self::Verify(_, _) => write!(f, "Command VerifyResource"),
3021 }
3022 }
3023}
3024
/// Options that can be changed on a running daemon via `Command::SetOption`.
#[derive(Debug)]
enum DaemonOption {
    /// Maximum length of a service name (see `check_service_name_length`).
    ServiceNameLenMax(u8),
    /// Interfaces to enable, selected by `IfKind`.
    EnableInterface(Vec<IfKind>),
    /// Interfaces to disable, selected by `IfKind`.
    DisableInterface(Vec<IfKind>),
    /// Presumably toggles multicast loopback for IPv4 (TODO confirm).
    MulticastLoopV4(bool),
    /// Presumably toggles multicast loopback for IPv6 (TODO confirm).
    MulticastLoopV6(bool),
}
3033
/// Byte length of the domain suffix "._tcp.local." ("._udp.local." has the
/// same length).
const DOMAIN_LEN: usize = "._tcp.local.".len();
3036
3037fn check_service_name_length(ty_domain: &str, limit: u8) -> Result<()> {
3039 if ty_domain.len() <= DOMAIN_LEN + 1 {
3040 return Err(e_fmt!("Service type name cannot be empty: {}", ty_domain));
3042 }
3043
3044 let service_name_len = ty_domain.len() - DOMAIN_LEN - 1; if service_name_len > limit as usize {
3046 return Err(e_fmt!("Service name length must be <= {} bytes", limit));
3047 }
3048 Ok(())
3049}
3050
3051fn check_domain_suffix(name: &str) -> Result<()> {
3053 if !(name.ends_with("._tcp.local.") || name.ends_with("._udp.local.")) {
3054 return Err(e_fmt!(
3055 "mDNS service {} must end with '._tcp.local.' or '._udp.local.'",
3056 name
3057 ));
3058 }
3059
3060 Ok(())
3061}
3062
3063fn check_service_name(fullname: &str) -> Result<()> {
3071 check_domain_suffix(fullname)?;
3072
3073 let remaining: Vec<&str> = fullname[..fullname.len() - DOMAIN_LEN].split('.').collect();
3074 let name = remaining.last().ok_or_else(|| e_fmt!("No service name"))?;
3075
3076 if &name[0..1] != "_" {
3077 return Err(e_fmt!("Service name must start with '_'"));
3078 }
3079
3080 let name = &name[1..];
3081
3082 if name.contains("--") {
3083 return Err(e_fmt!("Service name must not contain '--'"));
3084 }
3085
3086 if name.starts_with('-') || name.ends_with('-') {
3087 return Err(e_fmt!("Service name (%s) may not start or end with '-'"));
3088 }
3089
3090 let ascii_count = name.chars().filter(|c| c.is_ascii_alphabetic()).count();
3091 if ascii_count < 1 {
3092 return Err(e_fmt!(
3093 "Service name must contain at least one letter (eg: 'A-Za-z')"
3094 ));
3095 }
3096
3097 Ok(())
3098}
3099
3100fn check_hostname(hostname: &str) -> Result<()> {
3102 if !hostname.ends_with(".local.") {
3103 return Err(e_fmt!("Hostname must end with '.local.': {hostname}"));
3104 }
3105
3106 if hostname == ".local." {
3107 return Err(e_fmt!(
3108 "The part of the hostname before '.local.' cannot be empty"
3109 ));
3110 }
3111
3112 if hostname.len() > 255 {
3113 return Err(e_fmt!("Hostname length must be <= 255 bytes"));
3114 }
3115
3116 Ok(())
3117}
3118
3119fn call_service_listener(
3120 listeners_map: &HashMap<String, Sender<ServiceEvent>>,
3121 ty_domain: &str,
3122 event: ServiceEvent,
3123) {
3124 if let Some(listener) = listeners_map.get(ty_domain) {
3125 match listener.send(event) {
3126 Ok(()) => trace!("Sent event to listener successfully"),
3127 Err(e) => debug!("Failed to send event: {}", e),
3128 }
3129 }
3130}
3131
3132fn call_hostname_resolution_listener(
3133 listeners_map: &HashMap<String, (Sender<HostnameResolutionEvent>, Option<u64>)>,
3134 hostname: &str,
3135 event: HostnameResolutionEvent,
3136) {
3137 if let Some(listener) = listeners_map.get(hostname).map(|(l, _)| l) {
3138 match listener.send(event) {
3139 Ok(()) => trace!("Sent event to listener successfully"),
3140 Err(e) => debug!("Failed to send event: {}", e),
3141 }
3142 }
3143}
3144
3145fn my_ip_interfaces(with_loopback: bool) -> Vec<Interface> {
3148 if_addrs::get_if_addrs()
3149 .unwrap_or_default()
3150 .into_iter()
3151 .filter(|i| !i.is_loopback() || with_loopback)
3152 .collect()
3153}
3154
3155fn send_dns_outgoing(out: &DnsOutgoing, intf: &Interface, sock: &MioUdpSocket) -> Vec<Vec<u8>> {
3157 let qtype = if out.is_query() { "query" } else { "response" };
3158 trace!(
3159 "send outgoing {}: {} questions {} answers {} authorities {} additional",
3160 qtype,
3161 out.questions().len(),
3162 out.answers_count(),
3163 out.authorities().len(),
3164 out.additionals().len()
3165 );
3166 let packet_list = out.to_data_on_wire();
3167 for packet in packet_list.iter() {
3168 multicast_on_intf(packet, intf, sock);
3169 }
3170 packet_list
3171}
3172
3173fn multicast_on_intf(packet: &[u8], intf: &Interface, socket: &MioUdpSocket) {
3175 if packet.len() > MAX_MSG_ABSOLUTE {
3176 debug!("Drop over-sized packet ({})", packet.len());
3177 return;
3178 }
3179
3180 let addr: SocketAddr = match intf.addr {
3181 if_addrs::IfAddr::V4(_) => SocketAddrV4::new(GROUP_ADDR_V4, MDNS_PORT).into(),
3182 if_addrs::IfAddr::V6(_) => {
3183 let mut sock = SocketAddrV6::new(GROUP_ADDR_V6, MDNS_PORT, 0, 0);
3184 sock.set_scope_id(intf.index.unwrap_or(0)); sock.into()
3186 }
3187 };
3188
3189 send_packet(packet, addr, intf, socket);
3190}
3191
3192fn send_packet(packet: &[u8], addr: SocketAddr, intf: &Interface, sock: &MioUdpSocket) {
3194 match sock.send_to(packet, addr) {
3195 Ok(sz) => trace!("sent out {} bytes on interface {:?}", sz, intf),
3196 Err(e) => debug!("Failed to send to {} via {:?}: {}", addr, &intf, e),
3197 }
3198}
3199
/// Returns `true` when `name` has at least five dot-separated parts, as in
/// "my-laser._printer._tcp.local." (the trailing dot yields an empty fifth part).
fn valid_instance_name(name: &str) -> bool {
    // A fifth part exists exactly when splitting yields five or more parts.
    name.split('.').nth(4).is_some()
}
3206
3207fn notify_monitors(monitors: &mut Vec<Sender<DaemonEvent>>, event: DaemonEvent) {
3208 monitors.retain(|sender| {
3209 if let Err(e) = sender.try_send(event.clone()) {
3210 debug!("notify_monitors: try_send: {}", &e);
3211 if matches!(e, TrySendError::Disconnected(_)) {
3212 return false; }
3214 }
3215 true
3216 });
3217}
3218
/// Builds the announcement packet for `info` on interface `intf`.
///
/// The packet is an authoritative response holding the PTR record(s), the SRV
/// record, the TXT record and one address record per address of the service
/// on `intf`. Names recorded in `dns_registry.name_changes` replace the
/// original service and host names.
///
/// Returns `None` when the service has no address on `intf`, or when at least
/// one of the SRV / TXT / address records is not yet cleared by
/// `dns_registry.is_probing_done` (only consulted if `info.requires_probe()`).
fn prepare_announce(
    info: &ServiceInfo,
    intf: &Interface,
    dns_registry: &mut DnsRegistry,
) -> Option<DnsOutgoing> {
    let intf_addrs = info.get_addrs_on_intf(intf);
    if intf_addrs.is_empty() {
        trace!("No valid addrs to add on intf {:?}", &intf);
        return None;
    }

    // Use the renamed service fullname if a name change was recorded for it.
    let service_fullname = match dns_registry.name_changes.get(info.get_fullname()) {
        Some(new_name) => new_name,
        None => info.get_fullname(),
    };

    debug!(
        "prepare to announce service {service_fullname} on {}: {}",
        &intf.name,
        &intf.ip()
    );

    // Number of records for which `is_probing_done` returned false.
    let mut probing_count = 0;
    let mut out = DnsOutgoing::new(FLAGS_QR_RESPONSE | FLAGS_AA);
    // Current time plus a random offset in 0..250 ms, handed to `is_probing_done`.
    let create_time = current_time_millis() + fastrand::u64(0..250);

    // PTR: service type -> (possibly renamed) service fullname.
    out.add_answer_at_time(
        DnsPointer::new(
            info.get_type(),
            RRType::PTR,
            CLASS_IN,
            info.get_other_ttl(),
            service_fullname.to_string(),
        ),
        0,
    );

    // PTR for the subtype, if the service has one.
    if let Some(sub) = info.get_subtype() {
        trace!("Adding subdomain {}", sub);
        out.add_answer_at_time(
            DnsPointer::new(
                sub,
                RRType::PTR,
                CLASS_IN,
                info.get_other_ttl(),
                service_fullname.to_string(),
            ),
            0,
        );
    }

    // Use the renamed hostname as the SRV target if a name change was recorded.
    let hostname = match dns_registry.name_changes.get(info.get_hostname()) {
        Some(new_name) => new_name.to_string(),
        None => info.get_hostname().to_string(),
    };

    // SRV is created under the original fullname; the new name, if any, is
    // attached to the record afterwards.
    let mut srv = DnsSrv::new(
        info.get_fullname(),
        CLASS_IN | CLASS_CACHE_FLUSH,
        info.get_host_ttl(),
        info.get_priority(),
        info.get_weight(),
        info.get_port(),
        hostname,
    );

    if let Some(new_name) = dns_registry.name_changes.get(info.get_fullname()) {
        srv.get_record_mut().set_new_name(new_name.to_string());
    }

    // The record goes into the packet only if no probe is required or the
    // registry reports probing as done.
    if !info.requires_probe()
        || dns_registry.is_probing_done(&srv, info.get_fullname(), create_time)
    {
        out.add_answer_at_time(srv, 0);
    } else {
        probing_count += 1;
    }

    // TXT: same scheme as SRV (original name first, then optional rename).
    let mut txt = DnsTxt::new(
        info.get_fullname(),
        CLASS_IN | CLASS_CACHE_FLUSH,
        info.get_other_ttl(),
        info.generate_txt(),
    );

    if let Some(new_name) = dns_registry.name_changes.get(info.get_fullname()) {
        txt.get_record_mut().set_new_name(new_name.to_string());
    }

    if !info.requires_probe()
        || dns_registry.is_probing_done(&txt, info.get_fullname(), create_time)
    {
        out.add_answer_at_time(txt, 0);
    } else {
        probing_count += 1;
    }

    // Address records are created under the original hostname and renamed
    // afterwards if a name change was recorded.
    let hostname = info.get_hostname();
    for address in intf_addrs {
        let mut dns_addr = DnsAddress::new(
            hostname,
            ip_address_rr_type(&address),
            CLASS_IN | CLASS_CACHE_FLUSH,
            info.get_host_ttl(),
            address,
        );

        if let Some(new_name) = dns_registry.name_changes.get(hostname) {
            dns_addr.get_record_mut().set_new_name(new_name.to_string());
        }

        if !info.requires_probe()
            || dns_registry.is_probing_done(&dns_addr, info.get_fullname(), create_time)
        {
            out.add_answer_at_time(dns_addr, 0);
        } else {
            probing_count += 1;
        }
    }

    // Nothing is announced while any record is still waiting on probing.
    if probing_count > 0 {
        return None;
    }

    Some(out)
}
3353
3354fn announce_service_on_intf(
3357 dns_registry: &mut DnsRegistry,
3358 info: &ServiceInfo,
3359 intf: &Interface,
3360 sock: &MioUdpSocket,
3361) -> bool {
3362 if let Some(out) = prepare_announce(info, intf, dns_registry) {
3363 send_dns_outgoing(&out, intf, sock);
3364 return true;
3365 }
3366 false
3367}
3368
/// Produces a new name from `original` by changing its first label.
///
/// The first label is the text before the first '.'. If it ends with
/// " (N)" where N parses as a `u32`, N is incremented; otherwise " (2)" is
/// appended. The rest of the name is kept unchanged.
///
/// Examples: "foo.local." becomes "foo (2).local.", and "foo (2).local."
/// becomes "foo (3).local.". If N is `u32::MAX` the increment cannot be
/// represented, so " (2)" is appended instead of overflowing.
fn name_change(original: &str) -> String {
    let (first_label, rest) = match original.split_once('.') {
        Some((first, rest)) => (first, Some(rest)),
        None => (original, None),
    };

    // Try to bump an existing trailing " (N)" counter.
    let bumped = first_label
        .strip_suffix(')')
        .and_then(|inner| inner.rsplit_once(" ("))
        .and_then(|(base, digits)| {
            let next = digits.parse::<u32>().ok()?.checked_add(1)?;
            Some(format!("{base} ({next})"))
        });
    let new_label = bumped.unwrap_or_else(|| format!("{first_label} (2)"));

    match rest {
        Some(rest) => format!("{new_label}.{rest}"),
        None => new_label,
    }
}
3404
/// Produces a new hostname from `original` by changing its first label.
///
/// The first label is the text before the first '.'. If it ends with "-N"
/// where N parses as a `u32`, N is incremented; otherwise "-2" is appended.
/// The rest of the name is kept unchanged.
///
/// Examples: "foo.local." becomes "foo-2.local.", and "foo-2.local." becomes
/// "foo-3.local.". If N is `u32::MAX` the increment cannot be represented, so
/// "-2" is appended instead of overflowing.
fn hostname_change(original: &str) -> String {
    let (first_label, rest) = match original.split_once('.') {
        Some((first, rest)) => (first, Some(rest)),
        None => (original, None),
    };

    // Try to bump an existing trailing "-N" counter.
    let bumped = first_label.rsplit_once('-').and_then(|(base, digits)| {
        let next = digits.parse::<u32>().ok()?.checked_add(1)?;
        Some(format!("{base}-{next}"))
    });
    let new_label = bumped.unwrap_or_else(|| format!("{first_label}-2"));

    match rest {
        Some(rest) => format!("{new_label}.{rest}"),
        None => new_label,
    }
}
3432
3433fn add_answer_with_additionals(
3434 out: &mut DnsOutgoing,
3435 msg: &DnsIncoming,
3436 service: &ServiceInfo,
3437 intf: &Interface,
3438 dns_registry: &DnsRegistry,
3439) {
3440 let intf_addrs = service.get_addrs_on_intf(intf);
3441 if intf_addrs.is_empty() {
3442 trace!("No addrs on LAN of intf {:?}", intf);
3443 return;
3444 }
3445
3446 let service_fullname = match dns_registry.name_changes.get(service.get_fullname()) {
3448 Some(new_name) => new_name,
3449 None => service.get_fullname(),
3450 };
3451
3452 let hostname = match dns_registry.name_changes.get(service.get_hostname()) {
3453 Some(new_name) => new_name,
3454 None => service.get_hostname(),
3455 };
3456
3457 let ptr_added = out.add_answer(
3458 msg,
3459 DnsPointer::new(
3460 service.get_type(),
3461 RRType::PTR,
3462 CLASS_IN,
3463 service.get_other_ttl(),
3464 service_fullname.to_string(),
3465 ),
3466 );
3467
3468 if !ptr_added {
3469 trace!("answer was not added for msg {:?}", msg);
3470 return;
3471 }
3472
3473 if let Some(sub) = service.get_subtype() {
3474 trace!("Adding subdomain {}", sub);
3475 out.add_additional_answer(DnsPointer::new(
3476 sub,
3477 RRType::PTR,
3478 CLASS_IN,
3479 service.get_other_ttl(),
3480 service_fullname.to_string(),
3481 ));
3482 }
3483
3484 out.add_additional_answer(DnsSrv::new(
3487 service_fullname,
3488 CLASS_IN | CLASS_CACHE_FLUSH,
3489 service.get_host_ttl(),
3490 service.get_priority(),
3491 service.get_weight(),
3492 service.get_port(),
3493 hostname.to_string(),
3494 ));
3495
3496 out.add_additional_answer(DnsTxt::new(
3497 service_fullname,
3498 CLASS_IN | CLASS_CACHE_FLUSH,
3499 service.get_host_ttl(),
3500 service.generate_txt(),
3501 ));
3502
3503 for address in intf_addrs {
3504 out.add_additional_answer(DnsAddress::new(
3505 hostname,
3506 ip_address_rr_type(&address),
3507 CLASS_IN | CLASS_CACHE_FLUSH,
3508 service.get_host_ttl(),
3509 address,
3510 ));
3511 }
3512}
3513
3514#[cfg(test)]
3515mod tests {
3516 use super::{
3517 check_domain_suffix, check_service_name_length, hostname_change, my_ip_interfaces,
3518 name_change, new_socket_bind, send_dns_outgoing, valid_instance_name,
3519 HostnameResolutionEvent, ServiceDaemon, ServiceEvent, ServiceInfo, GROUP_ADDR_V4,
3520 MDNS_PORT,
3521 };
3522 use crate::{
3523 dns_parser::{DnsOutgoing, DnsPointer, RRType, CLASS_IN, FLAGS_AA, FLAGS_QR_RESPONSE},
3524 service_daemon::check_hostname,
3525 };
3526 use std::{
3527 net::{SocketAddr, SocketAddrV4},
3528 time::Duration,
3529 };
3530 use test_log::test;
3531
// The mDNS IPv4 group address and port print as "ip:port".
#[test]
fn test_socketaddr_print() {
    let addr = SocketAddr::V4(SocketAddrV4::new(GROUP_ADDR_V4, MDNS_PORT));
    assert_eq!(addr.to_string(), "224.0.0.251:5353");
}
3538
// Instance names need at least five dot-separated parts.
#[test]
fn test_instance_name() {
    let cases = [
        ("my-laser._printer._tcp.local.", true),
        ("my-laser.._printer._tcp.local.", true),
        ("_printer._tcp.local.", false),
    ];

    for (name, expected) in cases {
        assert_eq!(valid_instance_name(name), expected);
    }
}
3545
// A type domain that is too short to hold a service name is rejected.
#[test]
fn test_check_service_name_length() {
    let error = check_service_name_length("_tcp", 100).err();
    assert!(error.is_some());
    if let Some(e) = error {
        println!("{}", e);
    }
}
3554
// Accepts well-formed hostnames up to 255 bytes and rejects a missing
// trailing dot, an empty name part, and a 256-byte hostname.
#[test]
fn test_check_hostname() {
    let suffix = ".local.";
    let longest_valid = "A".repeat(255 - suffix.len()) + suffix;
    let too_long = "A".repeat(256 - suffix.len()) + suffix;

    // Valid hostnames.
    for hostname in &["my_host.local.", longest_valid.as_str()] {
        assert!(check_hostname(hostname).is_ok());
    }

    // Invalid hostnames.
    for hostname in &["my_host.local", ".local.", too_long.as_str()] {
        let result = check_hostname(hostname);
        assert!(result.is_err());
        if let Err(e) = result {
            println!("{}", e);
        }
    }
}
3579
// Only names ending in "._tcp.local." or "._udp.local." are accepted.
#[test]
fn test_check_domain_suffix() {
    let rejected = [
        "_missing_dot._tcp.local",
        "_missing_bar.tcp.local.",
        "_mis_spell._tpp.local.",
        "_mis_spell._upp.local.",
    ];
    for name in rejected {
        assert!(check_domain_suffix(name).is_err());
    }

    let accepted = ["_has_dot._tcp.local.", "_goodname._udp.local."];
    for name in accepted {
        assert!(check_domain_suffix(name).is_ok());
    }
}
3589
// Registers a service, browses and resolves it, stops browsing, multicasts a
// PTR record with TTL 0 for the service, and then checks that a second browse
// still resolves the service.
#[test]
fn test_service_with_temporarily_invalidated_ptr() {
    // Create a daemon.
    let d = ServiceDaemon::new().expect("Failed to create daemon");

    let service = "_test_inval_ptr._udp.local.";
    let host_name = "my_host_tmp_invalidated_ptr.local.";
    let intfs: Vec<_> = my_ip_interfaces(false);
    let intf_ips: Vec<_> = intfs.iter().map(|intf| intf.ip()).collect();
    let port = 5201;
    let my_service =
        ServiceInfo::new(service, "my_instance", host_name, &intf_ips[..], port, None)
            .expect("invalid service info")
            .enable_addr_auto();
    let result = d.register(my_service.clone());
    assert!(result.is_ok());

    // First browse: wait until the service is resolved.
    let browse_chan = d.browse(service).unwrap();
    let timeout = Duration::from_secs(2);
    let mut resolved = false;

    while let Ok(event) = browse_chan.recv_timeout(timeout) {
        match event {
            ServiceEvent::ServiceResolved(info) => {
                resolved = true;
                println!("Resolved a service of {}", &info.get_fullname());
                break;
            }
            e => {
                println!("Received event {:?}", e);
            }
        }
    }

    assert!(resolved);

    println!("Stopping browse of {}", service);
    // Stop browsing and wait for the SearchStopped event on the same channel.
    d.stop_browse(service).unwrap();

    let mut stopped = false;
    while let Ok(event) = browse_chan.recv_timeout(timeout) {
        match event {
            ServiceEvent::SearchStopped(_) => {
                stopped = true;
                println!("Stopped browsing service");
                break;
            }
            // Other events may still arrive before SearchStopped.
            e => {
                println!("Received event {:?}", e);
            }
        }
    }

    assert!(stopped);

    // Build a PTR record for the service with TTL 0.
    let invalidate_ptr_packet = DnsPointer::new(
        my_service.get_type(),
        RRType::PTR,
        CLASS_IN,
        0,
        my_service.get_fullname().to_string(),
    );

    let mut packet_buffer = DnsOutgoing::new(FLAGS_QR_RESPONSE | FLAGS_AA);
    packet_buffer.add_additional_answer(invalidate_ptr_packet);

    // Multicast the record on every non-loopback interface.
    for intf in intfs {
        let sock = new_socket_bind(&intf, true).unwrap();
        send_dns_outgoing(&packet_buffer, &intf, &sock);
    }

    println!(
        "Sent PTR record invalidation. Starting second browse for {}",
        service
    );

    // Second browse: the service must be resolved again.
    let browse_chan = d.browse(service).unwrap();

    resolved = false;
    while let Ok(event) = browse_chan.recv_timeout(timeout) {
        match event {
            ServiceEvent::ServiceResolved(info) => {
                resolved = true;
                println!("Resolved a service of {}", &info.get_fullname());
                break;
            }
            e => {
                println!("Received event {:?}", e);
            }
        }
    }

    assert!(resolved);
    d.shutdown().unwrap();
}
3697
// Registers a service with a short host TTL, resolves it from a second
// daemon, shuts the server down and then waits for a ServiceRemoved event.
// The wait does not assert that the event arrives.
#[test]
fn test_expired_srv() {
    // Service with a short host TTL.
    let service_type = "_expired-srv._udp.local.";
    let instance = "test_instance";
    let host_name = "expired_srv_host.local.";
    let mut my_service = ServiceInfo::new(service_type, instance, host_name, "", 5023, None)
        .unwrap()
        .enable_addr_auto();
    let new_ttl = 3;
    my_service._set_host_ttl(new_ttl);

    // Server side: register the service.
    let mdns_server = ServiceDaemon::new().expect("Failed to create mdns server");
    assert!(mdns_server.register(my_service).is_ok());

    // Client side: browse until the service is resolved.
    let mdns_client = ServiceDaemon::new().expect("Failed to create mdns client");
    let browse_chan = mdns_client.browse(service_type).unwrap();
    let timeout = Duration::from_secs(2);
    let mut resolved = false;

    while let Ok(event) = browse_chan.recv_timeout(timeout) {
        if let ServiceEvent::ServiceResolved(info) = event {
            resolved = true;
            println!("Resolved a service of {}", &info.get_fullname());
            break;
        }
    }

    assert!(resolved);

    mdns_server.shutdown().unwrap();

    // Wait up to one TTL between events for the removal notification.
    let expire_timeout = Duration::from_secs(new_ttl as u64);
    while let Ok(event) = browse_chan.recv_timeout(expire_timeout) {
        if let ServiceEvent::ServiceRemoved(service_type, full_name) = event {
            println!("Service removed: {}: {}", &service_type, &full_name);
            break;
        }
    }
}
3751
// Registers a service whose host TTL is short, resolves its hostname from a
// second daemon, shuts the server down and checks that the client reports the
// address as removed.
#[test]
fn test_hostname_resolution_address_removed() {
    // Create a server daemon.
    let server = ServiceDaemon::new().expect("Failed to create server");
    let hostname = "addr_remove_host._tcp.local.";
    // Use the first IPv4 address of a non-loopback interface.
    let service_ip_addr = my_ip_interfaces(false)
        .iter()
        .find(|iface| iface.ip().is_ipv4())
        .map(|iface| iface.ip())
        .unwrap();

    let mut my_service = ServiceInfo::new(
        "_host_res_test._tcp.local.",
        "my_instance",
        hostname,
        &service_ip_addr,
        1234,
        None,
    )
    .expect("invalid service info");

    // Short host TTL so that the test does not have to wait long.
    let addr_ttl = 2;
    my_service._set_host_ttl(addr_ttl);
    server.register(my_service).unwrap();

    // Create a client daemon and resolve the hostname (no timeout given).
    let client = ServiceDaemon::new().expect("Failed to create client");
    let event_receiver = client.resolve_hostname(hostname, None).unwrap();
    let resolved = loop {
        match event_receiver.recv() {
            Ok(HostnameResolutionEvent::AddressesFound(found_hostname, addresses)) => {
                assert!(found_hostname == hostname);
                assert!(addresses.contains(&service_ip_addr));
                println!("address found: {:?}", &addresses);
                break true;
            }
            Ok(HostnameResolutionEvent::SearchStopped(_)) => break false,
            Ok(_event) => {}
            Err(_) => break false,
        }
    };

    assert!(resolved);

    // Shut the server down so that it stops answering for the hostname.
    server.shutdown().unwrap();

    // Wait for the removal; each receive waits at most one TTL plus a second.
    let timeout = Duration::from_secs(addr_ttl as u64 + 1);
    let removed = loop {
        match event_receiver.recv_timeout(timeout) {
            Ok(HostnameResolutionEvent::AddressesRemoved(removed_host, addresses)) => {
                assert!(removed_host == hostname);
                assert!(addresses.contains(&service_ip_addr));

                println!(
                    "address removed: hostname: {} addresses: {:?}",
                    &hostname, &addresses
                );
                break true;
            }
            Ok(_event) => {}
            Err(_) => {
                break false;
            }
        }
    };

    assert!(removed);

    client.shutdown().unwrap();
}
3826
// Registers a service with a short "other" TTL, resolves it from a second
// daemon, keeps listening for 90% of the TTL and then expects the client's
// "cache-refresh-ptr" metric to be exactly 1.
#[test]
fn test_refresh_ptr() {
    // Service with a short "other" TTL.
    let service_type = "_refresh-ptr._udp.local.";
    let instance = "test_instance";
    let host_name = "refresh_ptr_host.local.";
    let service_ip_addr = my_ip_interfaces(false)
        .iter()
        .find(|iface| iface.ip().is_ipv4())
        .map(|iface| iface.ip())
        .unwrap();

    let mut my_service = ServiceInfo::new(
        service_type,
        instance,
        host_name,
        &service_ip_addr,
        5023,
        None,
    )
    .unwrap();

    let new_ttl = 3;
    my_service._set_other_ttl(new_ttl);

    // Server side: register the service.
    let mdns_server = ServiceDaemon::new().expect("Failed to create mdns server");
    assert!(mdns_server.register(my_service).is_ok());

    // Client side: browse until the service is resolved.
    let mdns_client = ServiceDaemon::new().expect("Failed to create mdns client");
    let browse_chan = mdns_client.browse(service_type).unwrap();
    let timeout = Duration::from_millis(1500);
    let mut resolved = false;

    while let Ok(event) = browse_chan.recv_timeout(timeout) {
        if let ServiceEvent::ServiceResolved(info) = event {
            resolved = true;
            println!("Resolved a service of {}", &info.get_fullname());
            break;
        }
    }

    assert!(resolved);

    // Drain events until none arrives within 90% of the TTL.
    let timeout = Duration::from_millis(new_ttl as u64 * 1000 * 90 / 100);
    while let Ok(event) = browse_chan.recv_timeout(timeout) {
        println!("event: {:?}", &event);
    }

    // Exactly one PTR refresh must have been counted by the client.
    let metrics_chan = mdns_client.get_metrics().unwrap();
    let metrics = metrics_chan.recv_timeout(timeout).unwrap();
    let refresh_counter = metrics["cache-refresh-ptr"];
    assert_eq!(refresh_counter, 1);

    mdns_server.shutdown().unwrap();
    mdns_client.shutdown().unwrap();
}
3892
// `name_change` bumps a trailing " (N)" on the first label or appends " (2)".
#[test]
fn test_name_change() {
    let cases = [
        ("foo.local.", "foo (2).local."),
        ("foo (2).local.", "foo (3).local."),
        ("foo (9).local.", "foo (10).local."),
        ("foo", "foo (2)"),
        ("foo (2)", "foo (3)"),
        ("", " (2)"),
        // Inputs without a well-formed trailing counter get " (2)" appended.
        ("foo (abc)", "foo (abc) (2)"),
        ("foo (2", "foo (2 (2)"),
        ("foo (2) extra", "foo (2) extra (2)"),
    ];

    for (original, expected) in cases {
        assert_eq!(name_change(original), expected);
    }
}
3907
// `hostname_change` bumps a trailing "-N" on the first label or appends "-2".
#[test]
fn test_hostname_change() {
    let cases = [
        ("foo.local.", "foo-2.local."),
        ("foo", "foo-2"),
        ("foo-2.local.", "foo-3.local."),
        ("foo-9", "foo-10"),
        ("test-42.domain.", "test-43.domain."),
    ];

    for (original, expected) in cases {
        assert_eq!(hostname_change(original), expected);
    }
}
3916}