1#[cfg(feature = "logging")]
32use crate::log::{debug, trace};
33use crate::{
34 dns_cache::{current_time_millis, DnsCache},
35 dns_parser::{
36 ip_address_rr_type, DnsAddress, DnsEntryExt, DnsIncoming, DnsOutgoing, DnsPointer,
37 DnsRecordBox, DnsRecordExt, DnsSrv, DnsTxt, RRType, CLASS_CACHE_FLUSH, CLASS_IN, FLAGS_AA,
38 FLAGS_QR_QUERY, FLAGS_QR_RESPONSE, MAX_MSG_ABSOLUTE,
39 },
40 error::{e_fmt, Error, Result},
41 service_info::{
42 split_sub_domain, valid_ip_on_intf, DnsRegistry, Probe, ServiceInfo, ServiceStatus,
43 },
44 Receiver,
45};
46use flume::{bounded, Sender, TrySendError};
47use if_addrs::{IfAddr, Interface};
48use mio::{net::UdpSocket as MioUdpSocket, Poll};
49use socket2::Socket;
50use std::{
51 cmp::{self, Reverse},
52 collections::{BinaryHeap, HashMap, HashSet},
53 fmt,
54 net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6, UdpSocket},
55 str, thread,
56 time::Duration,
57 vec,
58};
59
60pub const SERVICE_NAME_LEN_MAX_DEFAULT: u8 = 15;
64
65pub const IP_CHECK_INTERVAL_IN_SECS_DEFAULT: u32 = 5;
67
68pub const VERIFY_TIMEOUT_DEFAULT: Duration = Duration::from_secs(10);
71
72const MDNS_PORT: u16 = 5353;
73const GROUP_ADDR_V4: Ipv4Addr = Ipv4Addr::new(224, 0, 0, 251);
74const GROUP_ADDR_V6: Ipv6Addr = Ipv6Addr::new(0xff02, 0, 0, 0, 0, 0, 0, 0xfb);
75const LOOPBACK_V4: Ipv4Addr = Ipv4Addr::new(127, 0, 0, 1);
76
77const RESOLVE_WAIT_IN_MILLIS: u64 = 500;
78
79#[derive(Debug)]
81pub enum UnregisterStatus {
82 OK,
84 NotFound,
86}
87
88#[derive(Debug, PartialEq, Clone, Eq)]
90#[non_exhaustive]
91pub enum DaemonStatus {
92 Running,
94
95 Shutdown,
97}
98
99#[derive(Hash, Eq, PartialEq)]
102enum Counter {
103 Register,
104 RegisterResend,
105 Unregister,
106 UnregisterResend,
107 Browse,
108 ResolveHostname,
109 Respond,
110 CacheRefreshPTR,
111 CacheRefreshSrvTxt,
112 CacheRefreshAddr,
113 KnownAnswerSuppression,
114 CachedPTR,
115 CachedSRV,
116 CachedAddr,
117 CachedTxt,
118 CachedNSec,
119 CachedSubtype,
120 DnsRegistryProbe,
121 DnsRegistryActive,
122 DnsRegistryTimer,
123 DnsRegistryNameChange,
124 Timer,
125}
126
127impl fmt::Display for Counter {
128 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
129 match self {
130 Self::Register => write!(f, "register"),
131 Self::RegisterResend => write!(f, "register-resend"),
132 Self::Unregister => write!(f, "unregister"),
133 Self::UnregisterResend => write!(f, "unregister-resend"),
134 Self::Browse => write!(f, "browse"),
135 Self::ResolveHostname => write!(f, "resolve-hostname"),
136 Self::Respond => write!(f, "respond"),
137 Self::CacheRefreshPTR => write!(f, "cache-refresh-ptr"),
138 Self::CacheRefreshSrvTxt => write!(f, "cache-refresh-srv-txt"),
139 Self::CacheRefreshAddr => write!(f, "cache-refresh-addr"),
140 Self::KnownAnswerSuppression => write!(f, "known-answer-suppression"),
141 Self::CachedPTR => write!(f, "cached-ptr"),
142 Self::CachedSRV => write!(f, "cached-srv"),
143 Self::CachedAddr => write!(f, "cached-addr"),
144 Self::CachedTxt => write!(f, "cached-txt"),
145 Self::CachedNSec => write!(f, "cached-nsec"),
146 Self::CachedSubtype => write!(f, "cached-subtype"),
147 Self::DnsRegistryProbe => write!(f, "dns-registry-probe"),
148 Self::DnsRegistryActive => write!(f, "dns-registry-active"),
149 Self::DnsRegistryTimer => write!(f, "dns-registry-timer"),
150 Self::DnsRegistryNameChange => write!(f, "dns-registry-name-change"),
151 Self::Timer => write!(f, "timer"),
152 }
153 }
154}
155
156pub type Metrics = HashMap<String, i64>;
159
160const SIGNAL_SOCK_EVENT_KEY: usize = usize::MAX - 1; #[derive(Clone)]
166pub struct ServiceDaemon {
167 sender: Sender<Command>,
169
170 signal_addr: SocketAddr,
176}
177
178impl ServiceDaemon {
179 pub fn new() -> Result<Self> {
184 let signal_addr = SocketAddrV4::new(LOOPBACK_V4, 0);
187
188 let signal_sock = UdpSocket::bind(signal_addr)
189 .map_err(|e| e_fmt!("failed to create signal_sock for daemon: {}", e))?;
190
191 let signal_addr = signal_sock
193 .local_addr()
194 .map_err(|e| e_fmt!("failed to get signal sock addr: {}", e))?;
195
196 signal_sock
198 .set_nonblocking(true)
199 .map_err(|e| e_fmt!("failed to set nonblocking for signal socket: {}", e))?;
200
201 let poller = Poll::new().map_err(|e| e_fmt!("failed to create mio Poll: {e}"))?;
202
203 let (sender, receiver) = bounded(100);
204
205 let mio_sock = MioUdpSocket::from_std(signal_sock);
207 thread::Builder::new()
208 .name("mDNS_daemon".to_string())
209 .spawn(move || Self::daemon_thread(mio_sock, poller, receiver))
210 .map_err(|e| e_fmt!("thread builder failed to spawn: {}", e))?;
211
212 Ok(Self {
213 sender,
214 signal_addr,
215 })
216 }
217
218 fn send_cmd(&self, cmd: Command) -> Result<()> {
221 let cmd_name = cmd.to_string();
222
223 self.sender.try_send(cmd).map_err(|e| match e {
225 TrySendError::Full(_) => Error::Again,
226 e => e_fmt!("flume::channel::send failed: {}", e),
227 })?;
228
229 let addr = SocketAddrV4::new(LOOPBACK_V4, 0);
231 let socket = UdpSocket::bind(addr)
232 .map_err(|e| e_fmt!("Failed to create socket to send signal: {}", e))?;
233 socket
234 .send_to(cmd_name.as_bytes(), self.signal_addr)
235 .map_err(|e| {
236 e_fmt!(
237 "signal socket send_to {} ({}) failed: {}",
238 self.signal_addr,
239 cmd_name,
240 e
241 )
242 })?;
243
244 Ok(())
245 }
246
247 pub fn browse(&self, service_type: &str) -> Result<Receiver<ServiceEvent>> {
258 check_domain_suffix(service_type)?;
259
260 let (resp_s, resp_r) = bounded(10);
261 self.send_cmd(Command::Browse(service_type.to_string(), 1, resp_s))?;
262 Ok(resp_r)
263 }
264
265 pub fn stop_browse(&self, ty_domain: &str) -> Result<()> {
270 self.send_cmd(Command::StopBrowse(ty_domain.to_string()))
271 }
272
273 pub fn resolve_hostname(
281 &self,
282 hostname: &str,
283 timeout: Option<u64>,
284 ) -> Result<Receiver<HostnameResolutionEvent>> {
285 check_hostname(hostname)?;
286 let (resp_s, resp_r) = bounded(10);
287 self.send_cmd(Command::ResolveHostname(
288 hostname.to_string(),
289 1,
290 resp_s,
291 timeout,
292 ))?;
293 Ok(resp_r)
294 }
295
296 pub fn stop_resolve_hostname(&self, hostname: &str) -> Result<()> {
301 self.send_cmd(Command::StopResolveHostname(hostname.to_string()))
302 }
303
304 pub fn register(&self, service_info: ServiceInfo) -> Result<()> {
312 check_service_name(service_info.get_fullname())?;
313 check_hostname(service_info.get_hostname())?;
314
315 self.send_cmd(Command::Register(service_info))
316 }
317
318 pub fn unregister(&self, fullname: &str) -> Result<Receiver<UnregisterStatus>> {
326 let (resp_s, resp_r) = bounded(1);
327 self.send_cmd(Command::Unregister(fullname.to_lowercase(), resp_s))?;
328 Ok(resp_r)
329 }
330
331 pub fn monitor(&self) -> Result<Receiver<DaemonEvent>> {
335 let (resp_s, resp_r) = bounded(100);
336 self.send_cmd(Command::Monitor(resp_s))?;
337 Ok(resp_r)
338 }
339
340 pub fn shutdown(&self) -> Result<Receiver<DaemonStatus>> {
345 let (resp_s, resp_r) = bounded(1);
346 self.send_cmd(Command::Exit(resp_s))?;
347 Ok(resp_r)
348 }
349
350 pub fn status(&self) -> Result<Receiver<DaemonStatus>> {
356 let (resp_s, resp_r) = bounded(1);
357
358 if self.sender.is_disconnected() {
359 resp_s
360 .send(DaemonStatus::Shutdown)
361 .map_err(|e| e_fmt!("failed to send daemon status to the client: {}", e))?;
362 } else {
363 self.send_cmd(Command::GetStatus(resp_s))?;
364 }
365
366 Ok(resp_r)
367 }
368
369 pub fn get_metrics(&self) -> Result<Receiver<Metrics>> {
374 let (resp_s, resp_r) = bounded(1);
375 self.send_cmd(Command::GetMetrics(resp_s))?;
376 Ok(resp_r)
377 }
378
379 pub fn set_service_name_len_max(&self, len_max: u8) -> Result<()> {
386 const SERVICE_NAME_LEN_MAX_LIMIT: u8 = 30; if len_max > SERVICE_NAME_LEN_MAX_LIMIT {
389 return Err(Error::Msg(format!(
390 "service name length max {} is too large",
391 len_max
392 )));
393 }
394
395 self.send_cmd(Command::SetOption(DaemonOption::ServiceNameLenMax(len_max)))
396 }
397
398 pub fn set_ip_check_interval(&self, interval_in_secs: u32) -> Result<()> {
404 let interval_in_millis = interval_in_secs as u64 * 1000;
405 self.send_cmd(Command::SetOption(DaemonOption::IpCheckInterval(
406 interval_in_millis,
407 )))
408 }
409
410 pub fn get_ip_check_interval(&self) -> Result<u32> {
412 let (resp_s, resp_r) = bounded(1);
413 self.send_cmd(Command::GetOption(resp_s))?;
414
415 let option = resp_r
416 .recv_timeout(Duration::from_secs(10))
417 .map_err(|e| e_fmt!("failed to receive ip check interval: {}", e))?;
418 let ip_check_interval_in_secs = option.ip_check_interval / 1000;
419 Ok(ip_check_interval_in_secs as u32)
420 }
421
422 pub fn enable_interface(&self, if_kind: impl IntoIfKindVec) -> Result<()> {
429 let if_kind_vec = if_kind.into_vec();
430 self.send_cmd(Command::SetOption(DaemonOption::EnableInterface(
431 if_kind_vec.kinds,
432 )))
433 }
434
435 pub fn disable_interface(&self, if_kind: impl IntoIfKindVec) -> Result<()> {
442 let if_kind_vec = if_kind.into_vec();
443 self.send_cmd(Command::SetOption(DaemonOption::DisableInterface(
444 if_kind_vec.kinds,
445 )))
446 }
447
448 pub fn set_multicast_loop_v4(&self, on: bool) -> Result<()> {
464 self.send_cmd(Command::SetOption(DaemonOption::MulticastLoopV4(on)))
465 }
466
467 pub fn set_multicast_loop_v6(&self, on: bool) -> Result<()> {
483 self.send_cmd(Command::SetOption(DaemonOption::MulticastLoopV6(on)))
484 }
485
486 pub fn verify(&self, instance_fullname: String, timeout: Duration) -> Result<()> {
499 self.send_cmd(Command::Verify(instance_fullname, timeout))
500 }
501
502 fn daemon_thread(signal_sock: MioUdpSocket, poller: Poll, receiver: Receiver<Command>) {
503 let zc = Zeroconf::new(signal_sock, poller);
504
505 if let Some(cmd) = Self::run(zc, receiver) {
506 match cmd {
507 Command::Exit(resp_s) => {
508 if let Err(e) = resp_s.send(DaemonStatus::Shutdown) {
511 debug!("exit: failed to send response of shutdown: {}", e);
512 }
513 }
514 _ => {
515 debug!("Unexpected command: {:?}", cmd);
516 }
517 }
518 }
519 }
520
521 fn run(mut zc: Zeroconf, receiver: Receiver<Command>) -> Option<Command> {
530 if let Err(e) = zc.poller.registry().register(
532 &mut zc.signal_sock,
533 mio::Token(SIGNAL_SOCK_EVENT_KEY),
534 mio::Interest::READABLE,
535 ) {
536 debug!("failed to add signal socket to the poller: {}", e);
537 return None;
538 }
539
540 for (intf, sock) in zc.intf_socks.iter_mut() {
542 let key =
543 Zeroconf::add_poll_impl(&mut zc.poll_ids, &mut zc.poll_id_count, intf.clone());
544
545 if let Err(e) =
546 zc.poller
547 .registry()
548 .register(sock, mio::Token(key), mio::Interest::READABLE)
549 {
550 debug!("add socket of {:?} to poller: {e}", intf);
551 return None;
552 }
553 }
554
555 let mut next_ip_check = if zc.ip_check_interval > 0 {
557 current_time_millis() + zc.ip_check_interval
558 } else {
559 0
560 };
561
562 if next_ip_check > 0 {
563 zc.add_timer(next_ip_check);
564 }
565
566 let mut events = mio::Events::with_capacity(1024);
569 loop {
570 let now = current_time_millis();
571
572 let earliest_timer = zc.peek_earliest_timer();
573 let timeout = earliest_timer.map(|timer| {
574 let millis = if timer > now { timer - now } else { 1 };
576 Duration::from_millis(millis)
577 });
578
579 events.clear();
581 match zc.poller.poll(&mut events, timeout) {
582 Ok(_) => zc.handle_poller_events(&events),
583 Err(e) => debug!("failed to select from sockets: {}", e),
584 }
585
586 let now = current_time_millis();
587
588 zc.pop_timers_till(now);
590
591 for hostname in zc
593 .hostname_resolvers
594 .clone()
595 .into_iter()
596 .filter(|(_, (_, timeout))| timeout.map(|t| now >= t).unwrap_or(false))
597 .map(|(hostname, _)| hostname)
598 {
599 trace!("hostname resolver timeout for {}", &hostname);
600 call_hostname_resolution_listener(
601 &zc.hostname_resolvers,
602 &hostname,
603 HostnameResolutionEvent::SearchTimeout(hostname.to_owned()),
604 );
605 call_hostname_resolution_listener(
606 &zc.hostname_resolvers,
607 &hostname,
608 HostnameResolutionEvent::SearchStopped(hostname.to_owned()),
609 );
610 zc.hostname_resolvers.remove(&hostname);
611 }
612
613 while let Ok(command) = receiver.try_recv() {
615 if matches!(command, Command::Exit(_)) {
616 zc.status = DaemonStatus::Shutdown;
617 return Some(command);
618 }
619 zc.exec_command(command, false);
620 }
621
622 let mut i = 0;
624 while i < zc.retransmissions.len() {
625 if now >= zc.retransmissions[i].next_time {
626 let rerun = zc.retransmissions.remove(i);
627 zc.exec_command(rerun.command, true);
628 } else {
629 i += 1;
630 }
631 }
632
633 zc.refresh_active_services();
635
636 let mut query_count = 0;
638 for (hostname, _sender) in zc.hostname_resolvers.iter() {
639 for (hostname, ip_addr) in
640 zc.cache.refresh_due_hostname_resolutions(hostname).iter()
641 {
642 zc.send_query(hostname, ip_address_rr_type(ip_addr));
643 query_count += 1;
644 }
645 }
646
647 zc.increase_counter(Counter::CacheRefreshAddr, query_count);
648
649 let now = current_time_millis();
651
652 let expired_services = zc.cache.evict_expired_services(now);
654 zc.notify_service_removal(expired_services);
655
656 let expired_addrs = zc.cache.evict_expired_addr(now);
658 for (hostname, addrs) in expired_addrs {
659 call_hostname_resolution_listener(
660 &zc.hostname_resolvers,
661 &hostname,
662 HostnameResolutionEvent::AddressesRemoved(hostname.clone(), addrs),
663 );
664 let instances = zc.cache.get_instances_on_host(&hostname);
665 let instance_set: HashSet<String> = instances.into_iter().collect();
666 zc.resolve_updated_instances(&instance_set);
667 }
668
669 zc.probing_handler();
671
672 if now >= next_ip_check && next_ip_check > 0 {
674 next_ip_check = now + zc.ip_check_interval;
675 zc.add_timer(next_ip_check);
676
677 zc.check_ip_changes();
678 }
679 }
680 }
681}
682
683fn new_socket_bind(intf: &Interface, should_loop: bool) -> Result<MioUdpSocket> {
685 let intf_ip = &intf.ip();
688 match intf_ip {
689 IpAddr::V4(ip) => {
690 let addr = SocketAddrV4::new(Ipv4Addr::new(0, 0, 0, 0), MDNS_PORT);
691 let sock = new_socket(addr.into(), true)?;
692
693 sock.join_multicast_v4(&GROUP_ADDR_V4, ip)
695 .map_err(|e| e_fmt!("join multicast group on addr {}: {}", intf_ip, e))?;
696
697 sock.set_multicast_if_v4(ip)
699 .map_err(|e| e_fmt!("set multicast_if on addr {}: {}", ip, e))?;
700
701 if !should_loop {
702 sock.set_multicast_loop_v4(false)
703 .map_err(|e| e_fmt!("failed to set multicast loop v4 for {ip}: {e}"))?;
704 }
705
706 let multicast_addr = SocketAddrV4::new(GROUP_ADDR_V4, MDNS_PORT).into();
708 let test_packets = DnsOutgoing::new(0).to_data_on_wire();
709 for packet in test_packets {
710 sock.send_to(&packet, &multicast_addr)
711 .map_err(|e| e_fmt!("send multicast packet on addr {}: {}", ip, e))?;
712 }
713 Ok(MioUdpSocket::from_std(UdpSocket::from(sock)))
714 }
715 IpAddr::V6(ip) => {
716 let addr = SocketAddrV6::new(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 0), MDNS_PORT, 0, 0);
717 let sock = new_socket(addr.into(), true)?;
718
719 sock.join_multicast_v6(&GROUP_ADDR_V6, intf.index.unwrap_or(0))
721 .map_err(|e| e_fmt!("join multicast group on addr {}: {}", ip, e))?;
722
723 sock.set_multicast_if_v6(intf.index.unwrap_or(0))
725 .map_err(|e| e_fmt!("set multicast_if on addr {}: {}", ip, e))?;
726
727 Ok(MioUdpSocket::from_std(UdpSocket::from(sock)))
732 }
733 }
734}
735
736fn new_socket(addr: SocketAddr, non_block: bool) -> Result<Socket> {
739 let domain = match addr {
740 SocketAddr::V4(_) => socket2::Domain::IPV4,
741 SocketAddr::V6(_) => socket2::Domain::IPV6,
742 };
743
744 let fd = Socket::new(domain, socket2::Type::DGRAM, None)
745 .map_err(|e| e_fmt!("create socket failed: {}", e))?;
746
747 fd.set_reuse_address(true)
748 .map_err(|e| e_fmt!("set ReuseAddr failed: {}", e))?;
749 #[cfg(unix)] fd.set_reuse_port(true)
751 .map_err(|e| e_fmt!("set ReusePort failed: {}", e))?;
752
753 if non_block {
754 fd.set_nonblocking(true)
755 .map_err(|e| e_fmt!("set O_NONBLOCK: {}", e))?;
756 }
757
758 fd.bind(&addr.into())
759 .map_err(|e| e_fmt!("socket bind to {} failed: {}", &addr, e))?;
760
761 trace!("new socket bind to {}", &addr);
762 Ok(fd)
763}
764
765struct ReRun {
767 next_time: u64,
769 command: Command,
770}
771
772#[derive(Debug, Eq, Hash, PartialEq)]
774enum IpVersion {
775 V4,
776 V6,
777}
778
779#[derive(Debug, Eq, Hash, PartialEq)]
781struct MulticastSendTracker {
782 intf_index: u32,
783 ip_version: IpVersion,
784}
785
786fn multicast_send_tracker(intf: &Interface) -> Option<MulticastSendTracker> {
788 match intf.index {
789 Some(index) => {
790 let ip_ver = match intf.addr {
791 IfAddr::V4(_) => IpVersion::V4,
792 IfAddr::V6(_) => IpVersion::V6,
793 };
794 Some(MulticastSendTracker {
795 intf_index: index,
796 ip_version: ip_ver,
797 })
798 }
799 None => None,
800 }
801}
802
803#[derive(Debug, Clone)]
807#[non_exhaustive]
808pub enum IfKind {
809 All,
811
812 IPv4,
814
815 IPv6,
817
818 Name(String),
820
821 Addr(IpAddr),
823
824 LoopbackV4,
829
830 LoopbackV6,
832}
833
834impl IfKind {
835 fn matches(&self, intf: &Interface) -> bool {
837 match self {
838 Self::All => true,
839 Self::IPv4 => intf.ip().is_ipv4(),
840 Self::IPv6 => intf.ip().is_ipv6(),
841 Self::Name(ifname) => ifname == &intf.name,
842 Self::Addr(addr) => addr == &intf.ip(),
843 Self::LoopbackV4 => intf.is_loopback() && intf.ip().is_ipv4(),
844 Self::LoopbackV6 => intf.is_loopback() && intf.ip().is_ipv6(),
845 }
846 }
847}
848
849impl From<&str> for IfKind {
852 fn from(val: &str) -> Self {
853 Self::Name(val.to_string())
854 }
855}
856
857impl From<&String> for IfKind {
858 fn from(val: &String) -> Self {
859 Self::Name(val.to_string())
860 }
861}
862
863impl From<IpAddr> for IfKind {
865 fn from(val: IpAddr) -> Self {
866 Self::Addr(val)
867 }
868}
869
870pub struct IfKindVec {
872 kinds: Vec<IfKind>,
873}
874
875pub trait IntoIfKindVec {
877 fn into_vec(self) -> IfKindVec;
878}
879
880impl<T: Into<IfKind>> IntoIfKindVec for T {
881 fn into_vec(self) -> IfKindVec {
882 let if_kind: IfKind = self.into();
883 IfKindVec {
884 kinds: vec![if_kind],
885 }
886 }
887}
888
889impl<T: Into<IfKind>> IntoIfKindVec for Vec<T> {
890 fn into_vec(self) -> IfKindVec {
891 let kinds: Vec<IfKind> = self.into_iter().map(|x| x.into()).collect();
892 IfKindVec { kinds }
893 }
894}
895
896struct IfSelection {
898 if_kind: IfKind,
900
901 selected: bool,
903}
904
905struct Zeroconf {
907 intf_socks: HashMap<Interface, MioUdpSocket>,
909
910 poll_ids: HashMap<usize, Interface>,
912
913 poll_id_count: usize,
915
916 my_services: HashMap<String, ServiceInfo>,
918
919 cache: DnsCache,
921
922 dns_registry_map: HashMap<Interface, DnsRegistry>,
924
925 service_queriers: HashMap<String, Sender<ServiceEvent>>, hostname_resolvers: HashMap<String, (Sender<HostnameResolutionEvent>, Option<u64>)>, retransmissions: Vec<ReRun>,
936
937 counters: Metrics,
938
939 poller: Poll,
941
942 monitors: Vec<Sender<DaemonEvent>>,
944
945 service_name_len_max: u8,
947
948 ip_check_interval: u64,
950
951 if_selections: Vec<IfSelection>,
953
954 signal_sock: MioUdpSocket,
956
957 timers: BinaryHeap<Reverse<u64>>,
963
964 status: DaemonStatus,
965
966 pending_resolves: HashSet<String>,
968
969 resolved: HashSet<String>,
971
972 multicast_loop_v4: bool,
973
974 multicast_loop_v6: bool,
975}
976
977impl Zeroconf {
978 fn new(signal_sock: MioUdpSocket, poller: Poll) -> Self {
979 let my_ifaddrs = my_ip_interfaces(false);
981
982 let mut intf_socks = HashMap::new();
986 let mut dns_registry_map = HashMap::new();
987
988 for intf in my_ifaddrs {
989 let sock = match new_socket_bind(&intf, true) {
990 Ok(s) => s,
991 Err(e) => {
992 trace!("bind a socket to {}: {}. Skipped.", &intf.ip(), e);
993 continue;
994 }
995 };
996
997 dns_registry_map.insert(intf.clone(), DnsRegistry::new());
998
999 intf_socks.insert(intf, sock);
1000 }
1001
1002 let monitors = Vec::new();
1003 let service_name_len_max = SERVICE_NAME_LEN_MAX_DEFAULT;
1004 let ip_check_interval = IP_CHECK_INTERVAL_IN_SECS_DEFAULT as u64 * 1000;
1005
1006 let timers = BinaryHeap::new();
1007
1008 let if_selections = vec![
1010 IfSelection {
1011 if_kind: IfKind::LoopbackV4,
1012 selected: false,
1013 },
1014 IfSelection {
1015 if_kind: IfKind::LoopbackV6,
1016 selected: false,
1017 },
1018 ];
1019
1020 let status = DaemonStatus::Running;
1021
1022 Self {
1023 intf_socks,
1024 poll_ids: HashMap::new(),
1025 poll_id_count: 0,
1026 my_services: HashMap::new(),
1027 cache: DnsCache::new(),
1028 dns_registry_map,
1029 hostname_resolvers: HashMap::new(),
1030 service_queriers: HashMap::new(),
1031 retransmissions: Vec::new(),
1032 counters: HashMap::new(),
1033 poller,
1034 monitors,
1035 service_name_len_max,
1036 ip_check_interval,
1037 if_selections,
1038 signal_sock,
1039 timers,
1040 status,
1041 pending_resolves: HashSet::new(),
1042 resolved: HashSet::new(),
1043 multicast_loop_v4: true,
1044 multicast_loop_v6: true,
1045 }
1046 }
1047
1048 fn process_set_option(&mut self, daemon_opt: DaemonOption) {
1049 match daemon_opt {
1050 DaemonOption::ServiceNameLenMax(length) => self.service_name_len_max = length,
1051 DaemonOption::IpCheckInterval(interval) => self.ip_check_interval = interval,
1052 DaemonOption::EnableInterface(if_kind) => self.enable_interface(if_kind),
1053 DaemonOption::DisableInterface(if_kind) => self.disable_interface(if_kind),
1054 DaemonOption::MulticastLoopV4(on) => self.set_multicast_loop_v4(on),
1055 DaemonOption::MulticastLoopV6(on) => self.set_multicast_loop_v6(on),
1056 }
1057 }
1058
1059 fn enable_interface(&mut self, kinds: Vec<IfKind>) {
1060 for if_kind in kinds {
1061 self.if_selections.push(IfSelection {
1062 if_kind,
1063 selected: true,
1064 });
1065 }
1066
1067 self.apply_intf_selections(my_ip_interfaces(true));
1068 }
1069
1070 fn disable_interface(&mut self, kinds: Vec<IfKind>) {
1071 for if_kind in kinds {
1072 self.if_selections.push(IfSelection {
1073 if_kind,
1074 selected: false,
1075 });
1076 }
1077
1078 self.apply_intf_selections(my_ip_interfaces(true));
1079 }
1080
1081 fn set_multicast_loop_v4(&mut self, on: bool) {
1082 for (_, sock) in self.intf_socks.iter_mut() {
1083 if let Err(e) = sock.set_multicast_loop_v4(on) {
1084 debug!("failed to set multicast loop v4: {e}");
1085 }
1086 }
1087 }
1088
1089 fn set_multicast_loop_v6(&mut self, on: bool) {
1090 for (_, sock) in self.intf_socks.iter_mut() {
1091 if let Err(e) = sock.set_multicast_loop_v6(on) {
1092 debug!("failed to set multicast loop v6: {e}");
1093 }
1094 }
1095 }
1096
1097 fn notify_monitors(&mut self, event: DaemonEvent) {
1098 self.monitors.retain(|sender| {
1100 if let Err(e) = sender.try_send(event.clone()) {
1101 debug!("notify_monitors: try_send: {}", &e);
1102 if matches!(e, TrySendError::Disconnected(_)) {
1103 return false; }
1105 }
1106 true
1107 });
1108 }
1109
1110 fn del_addr_in_my_services(&mut self, addr: &IpAddr) {
1112 for (_, service_info) in self.my_services.iter_mut() {
1113 if service_info.is_addr_auto() {
1114 service_info.remove_ipaddr(addr);
1115 }
1116 }
1117 }
1118
1119 fn add_poll(&mut self, intf: Interface) -> usize {
1121 Self::add_poll_impl(&mut self.poll_ids, &mut self.poll_id_count, intf)
1122 }
1123
1124 fn add_poll_impl(
1128 poll_ids: &mut HashMap<usize, Interface>,
1129 poll_id_count: &mut usize,
1130 intf: Interface,
1131 ) -> usize {
1132 let key = *poll_id_count;
1133 *poll_id_count += 1;
1134 let _ = (*poll_ids).insert(key, intf);
1135 key
1136 }
1137
1138 fn add_timer(&mut self, next_time: u64) {
1139 self.timers.push(Reverse(next_time));
1140 }
1141
1142 fn peek_earliest_timer(&self) -> Option<u64> {
1143 self.timers.peek().map(|Reverse(v)| *v)
1144 }
1145
1146 fn _pop_earliest_timer(&mut self) -> Option<u64> {
1147 self.timers.pop().map(|Reverse(v)| v)
1148 }
1149
1150 fn pop_timers_till(&mut self, now: u64) {
1152 while let Some(Reverse(v)) = self.timers.peek() {
1153 if *v > now {
1154 break;
1155 }
1156 self.timers.pop();
1157 }
1158 }
1159
1160 fn selected_addrs(&self, interfaces: Vec<Interface>) -> HashSet<IpAddr> {
1162 let intf_count = interfaces.len();
1163 let mut intf_selections = vec![true; intf_count];
1164
1165 for selection in self.if_selections.iter() {
1167 for i in 0..intf_count {
1169 if selection.if_kind.matches(&interfaces[i]) {
1170 intf_selections[i] = selection.selected;
1171 }
1172 }
1173 }
1174
1175 let mut selected_addrs = HashSet::new();
1176 for i in 0..intf_count {
1177 if intf_selections[i] {
1178 selected_addrs.insert(interfaces[i].addr.ip());
1179 }
1180 }
1181
1182 selected_addrs
1183 }
1184
1185 fn apply_intf_selections(&mut self, interfaces: Vec<Interface>) {
1190 let intf_count = interfaces.len();
1192 let mut intf_selections = vec![true; intf_count];
1193
1194 for selection in self.if_selections.iter() {
1196 for i in 0..intf_count {
1198 if selection.if_kind.matches(&interfaces[i]) {
1199 intf_selections[i] = selection.selected;
1200 }
1201 }
1202 }
1203
1204 for (idx, intf) in interfaces.into_iter().enumerate() {
1206 if intf_selections[idx] {
1207 if !self.intf_socks.contains_key(&intf) {
1209 debug!("apply_intf_selections: add {:?}", &intf.ip());
1210 self.add_new_interface(intf);
1211 }
1212 } else {
1213 if let Some(mut sock) = self.intf_socks.remove(&intf) {
1215 match self.poller.registry().deregister(&mut sock) {
1216 Ok(()) => debug!("apply_intf_selections: deregister {:?}", &intf.ip()),
1217 Err(e) => debug!("apply_intf_selections: poller.delete {:?}: {}", &intf, e),
1218 }
1219
1220 self.poll_ids.retain(|_, v| v != &intf);
1222
1223 self.cache.remove_addrs_on_disabled_intf(&intf);
1225 }
1226 }
1227 }
1228 }
1229
1230 fn check_ip_changes(&mut self) {
1232 let my_ifaddrs = my_ip_interfaces(true);
1234
1235 let poll_ids = &mut self.poll_ids;
1236 let poller = &mut self.poller;
1237 let deleted_addrs = self
1239 .intf_socks
1240 .iter_mut()
1241 .filter_map(|(intf, sock)| {
1242 if !my_ifaddrs.contains(intf) {
1243 if let Err(e) = poller.registry().deregister(sock) {
1244 debug!("check_ip_changes: poller.delete {:?}: {}", intf, e);
1245 }
1246 poll_ids.retain(|_, v| v != intf);
1248 Some(intf.ip())
1249 } else {
1250 None
1251 }
1252 })
1253 .collect::<Vec<IpAddr>>();
1254
1255 for ip in deleted_addrs.iter() {
1257 self.del_addr_in_my_services(ip);
1258 self.notify_monitors(DaemonEvent::IpDel(*ip));
1259 }
1260
1261 self.intf_socks.retain(|intf, _| my_ifaddrs.contains(intf));
1263
1264 self.apply_intf_selections(my_ifaddrs);
1266 }
1267
1268 fn add_new_interface(&mut self, intf: Interface) {
1269 let new_ip = intf.ip();
1271 let should_loop = if new_ip.is_ipv4() {
1272 self.multicast_loop_v4
1273 } else {
1274 self.multicast_loop_v6
1275 };
1276 let mut sock = match new_socket_bind(&intf, should_loop) {
1277 Ok(s) => s,
1278 Err(e) => {
1279 debug!("bind a socket to {}: {}. Skipped.", &intf.ip(), e);
1280 return;
1281 }
1282 };
1283
1284 let key = self.add_poll(intf.clone());
1286 if let Err(e) =
1287 self.poller
1288 .registry()
1289 .register(&mut sock, mio::Token(key), mio::Interest::READABLE)
1290 {
1291 debug!("check_ip_changes: poller add ip {}: {}", new_ip, e);
1292 return;
1293 }
1294
1295 debug!("add new interface {}: {new_ip}", intf.name);
1296 let dns_registry = match self.dns_registry_map.get_mut(&intf) {
1297 Some(registry) => registry,
1298 None => self
1299 .dns_registry_map
1300 .entry(intf.clone())
1301 .or_insert_with(DnsRegistry::new),
1302 };
1303
1304 for (_, service_info) in self.my_services.iter_mut() {
1305 if service_info.is_addr_auto() {
1306 service_info.insert_ipaddr(new_ip);
1307
1308 if announce_service_on_intf(dns_registry, service_info, &intf, &sock) {
1309 debug!(
1310 "Announce service {} on {}",
1311 service_info.get_fullname(),
1312 intf.ip()
1313 );
1314 service_info.set_status(&intf, ServiceStatus::Announced);
1315 } else {
1316 for timer in dns_registry.new_timers.drain(..) {
1317 self.timers.push(Reverse(timer));
1318 }
1319 service_info.set_status(&intf, ServiceStatus::Probing);
1320 }
1321 }
1322 }
1323
1324 self.intf_socks.insert(intf, sock);
1325
1326 self.notify_monitors(DaemonEvent::IpAdd(new_ip));
1328 }
1329
1330 fn register_service(&mut self, mut info: ServiceInfo) {
1339 if let Err(e) = check_service_name_length(info.get_type(), self.service_name_len_max) {
1341 debug!("check_service_name_length: {}", &e);
1342 self.notify_monitors(DaemonEvent::Error(e));
1343 return;
1344 }
1345
1346 if info.is_addr_auto() {
1347 let selected_addrs = self.selected_addrs(my_ip_interfaces(true));
1348 for addr in selected_addrs {
1349 info.insert_ipaddr(addr);
1350 }
1351 }
1352
1353 debug!("register service {:?}", &info);
1354
1355 let outgoing_addrs = self.send_unsolicited_response(&mut info);
1356 if !outgoing_addrs.is_empty() {
1357 self.notify_monitors(DaemonEvent::Announce(
1358 info.get_fullname().to_string(),
1359 format!("{:?}", &outgoing_addrs),
1360 ));
1361 }
1362
1363 let service_fullname = info.get_fullname().to_lowercase();
1366 self.my_services.insert(service_fullname, info);
1367 }
1368
1369 fn send_unsolicited_response(&mut self, info: &mut ServiceInfo) -> Vec<IpAddr> {
1372 let mut outgoing_addrs = Vec::new();
1373 let mut multicast_sent_trackers = HashSet::new();
1375
1376 let mut outgoing_intfs = Vec::new();
1377
1378 for (intf, sock) in self.intf_socks.iter() {
1379 if let Some(tracker) = multicast_send_tracker(intf) {
1380 if multicast_sent_trackers.contains(&tracker) {
1381 continue; }
1383 }
1384
1385 let dns_registry = match self.dns_registry_map.get_mut(intf) {
1386 Some(registry) => registry,
1387 None => self
1388 .dns_registry_map
1389 .entry(intf.clone())
1390 .or_insert_with(DnsRegistry::new),
1391 };
1392
1393 if announce_service_on_intf(dns_registry, info, intf, sock) {
1394 if let Some(tracker) = multicast_send_tracker(intf) {
1395 multicast_sent_trackers.insert(tracker);
1396 }
1397 outgoing_addrs.push(intf.ip());
1398 outgoing_intfs.push(intf.clone());
1399
1400 debug!("Announce service {} on {}", info.get_fullname(), intf.ip());
1401
1402 info.set_status(intf, ServiceStatus::Announced);
1403 } else {
1404 for timer in dns_registry.new_timers.drain(..) {
1405 self.timers.push(Reverse(timer));
1406 }
1407 info.set_status(intf, ServiceStatus::Probing);
1408 }
1409 }
1410
1411 let next_time = current_time_millis() + 1000;
1415 for intf in outgoing_intfs {
1416 self.add_retransmission(
1417 next_time,
1418 Command::RegisterResend(info.get_fullname().to_string(), intf),
1419 );
1420 }
1421
1422 outgoing_addrs
1423 }
1424
1425 fn probing_handler(&mut self) {
1427 let now = current_time_millis();
1428
1429 for (intf, sock) in self.intf_socks.iter() {
1430 let Some(dns_registry) = self.dns_registry_map.get_mut(intf) else {
1431 continue;
1432 };
1433
1434 let mut expired_probe_names = Vec::new();
1435 let mut out = DnsOutgoing::new(FLAGS_QR_QUERY);
1436
1437 for (name, probe) in dns_registry.probing.iter_mut() {
1438 if now >= probe.next_send {
1439 if probe.expired(now) {
1440 expired_probe_names.push(name.clone());
1442 } else {
1443 out.add_question(name, RRType::ANY);
1444
1445 for record in probe.records.iter() {
1453 out.add_authority(record.clone());
1454 }
1455
1456 probe.update_next_send(now);
1457
1458 self.timers.push(Reverse(probe.next_send));
1460 }
1461 }
1462 }
1463
1464 if !out.questions().is_empty() {
1466 debug!("sending out probing of {} questions", out.questions().len());
1467 send_dns_outgoing(&out, intf, sock);
1468 }
1469
1470 let mut waiting_services = HashSet::new();
1471
1472 for name in expired_probe_names {
1473 let Some(probe) = dns_registry.probing.remove(&name) else {
1474 continue;
1475 };
1476
1477 for record in probe.records.iter() {
1479 if let Some(new_name) = record.get_record().get_new_name() {
1480 dns_registry
1481 .name_changes
1482 .insert(name.clone(), new_name.to_string());
1483
1484 let event = DnsNameChange {
1485 original: record.get_record().get_original_name().to_string(),
1486 new_name: new_name.to_string(),
1487 rr_type: record.get_type(),
1488 intf_name: intf.name.to_string(),
1489 };
1490 notify_monitors(&mut self.monitors, DaemonEvent::NameChange(event));
1491 }
1492 }
1493
1494 debug!(
1496 "probe of '{name}' finished: move {} records to active. ({} waiting services)",
1497 probe.records.len(),
1498 probe.waiting_services.len(),
1499 );
1500
1501 if !probe.records.is_empty() {
1503 match dns_registry.active.get_mut(&name) {
1504 Some(records) => {
1505 records.extend(probe.records);
1506 }
1507 None => {
1508 dns_registry.active.insert(name, probe.records);
1509 }
1510 }
1511
1512 waiting_services.extend(probe.waiting_services);
1513 }
1514 }
1515
1516 for service_name in waiting_services {
1518 debug!(
1519 "try to announce service {service_name} on intf {}",
1520 intf.ip()
1521 );
1522 if let Some(info) = self.my_services.get_mut(&service_name.to_lowercase()) {
1524 if info.get_status(intf) == ServiceStatus::Announced {
1525 debug!("service {} already announced", info.get_fullname());
1526 continue;
1527 }
1528
1529 if announce_service_on_intf(dns_registry, info, intf, sock) {
1530 let next_time = now + 1000;
1531 let command =
1532 Command::RegisterResend(info.get_fullname().to_string(), intf.clone());
1533 self.retransmissions.push(ReRun { next_time, command });
1534 self.timers.push(Reverse(next_time));
1535
1536 let fullname = match dns_registry.name_changes.get(&service_name) {
1537 Some(new_name) => new_name.to_string(),
1538 None => service_name.to_string(),
1539 };
1540
1541 let mut hostname = info.get_hostname();
1542 if let Some(new_name) = dns_registry.name_changes.get(hostname) {
1543 hostname = new_name;
1544 }
1545
1546 debug!("wake up: announce service {} on {}", fullname, intf.ip());
1547 notify_monitors(
1548 &mut self.monitors,
1549 DaemonEvent::Announce(fullname, format!("{}:{}", hostname, &intf.ip())),
1550 );
1551
1552 info.set_status(intf, ServiceStatus::Announced);
1553 }
1554 }
1555 }
1556 }
1557 }
1558
1559 fn unregister_service(
1560 &self,
1561 info: &ServiceInfo,
1562 intf: &Interface,
1563 sock: &MioUdpSocket,
1564 ) -> Vec<u8> {
1565 let mut out = DnsOutgoing::new(FLAGS_QR_RESPONSE | FLAGS_AA);
1566 out.add_answer_at_time(
1567 DnsPointer::new(
1568 info.get_type(),
1569 RRType::PTR,
1570 CLASS_IN,
1571 0,
1572 info.get_fullname().to_string(),
1573 ),
1574 0,
1575 );
1576
1577 if let Some(sub) = info.get_subtype() {
1578 trace!("Adding subdomain {}", sub);
1579 out.add_answer_at_time(
1580 DnsPointer::new(
1581 sub,
1582 RRType::PTR,
1583 CLASS_IN,
1584 0,
1585 info.get_fullname().to_string(),
1586 ),
1587 0,
1588 );
1589 }
1590
1591 out.add_answer_at_time(
1592 DnsSrv::new(
1593 info.get_fullname(),
1594 CLASS_IN | CLASS_CACHE_FLUSH,
1595 0,
1596 info.get_priority(),
1597 info.get_weight(),
1598 info.get_port(),
1599 info.get_hostname().to_string(),
1600 ),
1601 0,
1602 );
1603 out.add_answer_at_time(
1604 DnsTxt::new(
1605 info.get_fullname(),
1606 CLASS_IN | CLASS_CACHE_FLUSH,
1607 0,
1608 info.generate_txt(),
1609 ),
1610 0,
1611 );
1612
1613 for address in info.get_addrs_on_intf(intf) {
1614 out.add_answer_at_time(
1615 DnsAddress::new(
1616 info.get_hostname(),
1617 ip_address_rr_type(&address),
1618 CLASS_IN | CLASS_CACHE_FLUSH,
1619 0,
1620 address,
1621 intf.into(),
1622 ),
1623 0,
1624 );
1625 }
1626
1627 send_dns_outgoing(&out, intf, sock).remove(0)
1629 }
1630
1631 fn add_hostname_resolver(
1635 &mut self,
1636 hostname: String,
1637 listener: Sender<HostnameResolutionEvent>,
1638 timeout: Option<u64>,
1639 ) {
1640 let real_timeout = timeout.map(|t| current_time_millis() + t);
1641 self.hostname_resolvers
1642 .insert(hostname.to_lowercase(), (listener, real_timeout));
1643 if let Some(t) = real_timeout {
1644 self.add_timer(t);
1645 }
1646 }
1647
1648 fn send_query(&self, name: &str, qtype: RRType) {
1650 self.send_query_vec(&[(name, qtype)]);
1651 }
1652
1653 fn send_query_vec(&self, questions: &[(&str, RRType)]) {
1655 trace!("Sending query questions: {:?}", questions);
1656 let mut out = DnsOutgoing::new(FLAGS_QR_QUERY);
1657 let now = current_time_millis();
1658
1659 for (name, qtype) in questions {
1660 out.add_question(name, *qtype);
1661
1662 for record in self.cache.get_known_answers(name, *qtype, now) {
1663 trace!("add known answer: {:?}", record);
1671 let mut new_record = record.clone();
1672 new_record.get_record_mut().update_ttl(now);
1673 out.add_answer_box(new_record);
1674 }
1675 }
1676
1677 let mut multicast_sent_trackers = HashSet::new();
1679 for (intf, sock) in self.intf_socks.iter() {
1680 if let Some(tracker) = multicast_send_tracker(intf) {
1681 if multicast_sent_trackers.contains(&tracker) {
1682 continue; }
1684 multicast_sent_trackers.insert(tracker);
1685 }
1686 send_dns_outgoing(&out, intf, sock);
1687 }
1688 }
1689
1690 fn handle_read(&mut self, intf: &Interface) -> bool {
1695 let sock = match self.intf_socks.get_mut(intf) {
1696 Some(if_sock) => if_sock,
1697 None => return false,
1698 };
1699 let mut buf = vec![0u8; MAX_MSG_ABSOLUTE];
1700
1701 let sz = match sock.recv(&mut buf) {
1708 Ok(sz) => sz,
1709 Err(e) => {
1710 if e.kind() != std::io::ErrorKind::WouldBlock {
1711 debug!("listening socket read failed: {}", e);
1712 }
1713 return false;
1714 }
1715 };
1716
1717 trace!("received {} bytes at IP: {}", sz, intf.ip());
1718
1719 if sz == 0 {
1721 debug!("socket {:?} was likely shutdown", &sock);
1722 if let Err(e) = self.poller.registry().deregister(sock) {
1723 debug!("failed to remove sock {:?} from poller: {}", sock, &e);
1724 }
1725
1726 let should_loop = if intf.ip().is_ipv4() {
1728 self.multicast_loop_v4
1729 } else {
1730 self.multicast_loop_v6
1731 };
1732 match new_socket_bind(intf, should_loop) {
1733 Ok(new_sock) => {
1734 trace!("reset socket for IP {}", intf.ip());
1735 self.intf_socks.insert(intf.clone(), new_sock);
1736 }
1737 Err(e) => debug!("re-bind a socket to {:?}: {}", intf, e),
1738 }
1739
1740 return false;
1741 }
1742
1743 buf.truncate(sz); match DnsIncoming::new(buf, intf.into()) {
1746 Ok(msg) => {
1747 if msg.is_query() {
1748 self.handle_query(msg, intf);
1749 } else if msg.is_response() {
1750 self.handle_response(msg, intf);
1751 } else {
1752 debug!("Invalid message: not query and not response");
1753 }
1754 }
1755 Err(e) => debug!("Invalid incoming DNS message: {}", e),
1756 }
1757
1758 true
1759 }
1760
1761 fn query_unresolved(&mut self, instance: &str) -> bool {
1763 if !valid_instance_name(instance) {
1764 trace!("instance name {} not valid", instance);
1765 return false;
1766 }
1767
1768 if let Some(records) = self.cache.get_srv(instance) {
1769 for record in records {
1770 if let Some(srv) = record.any().downcast_ref::<DnsSrv>() {
1771 if self.cache.get_addr(srv.host()).is_none() {
1772 self.send_query_vec(&[(srv.host(), RRType::A), (srv.host(), RRType::AAAA)]);
1773 return true;
1774 }
1775 }
1776 }
1777 } else {
1778 self.send_query(instance, RRType::ANY);
1779 return true;
1780 }
1781
1782 false
1783 }
1784
1785 fn query_cache_for_service(&mut self, ty_domain: &str, sender: &Sender<ServiceEvent>) {
1788 let mut resolved: HashSet<String> = HashSet::new();
1789 let mut unresolved: HashSet<String> = HashSet::new();
1790
1791 if let Some(records) = self.cache.get_ptr(ty_domain) {
1792 for record in records.iter() {
1793 if let Some(ptr) = record.any().downcast_ref::<DnsPointer>() {
1794 let info = match self.create_service_info_from_cache(ty_domain, ptr.alias()) {
1795 Ok(ok) => ok,
1796 Err(err) => {
1797 debug!("Error while creating service info from cache: {}", err);
1798 continue;
1799 }
1800 };
1801
1802 match sender.send(ServiceEvent::ServiceFound(
1803 ty_domain.to_string(),
1804 ptr.alias().to_string(),
1805 )) {
1806 Ok(()) => debug!("send service found {}", ptr.alias()),
1807 Err(e) => {
1808 debug!("failed to send service found: {}", e);
1809 continue;
1810 }
1811 }
1812
1813 if info.is_ready() {
1814 resolved.insert(ptr.alias().to_string());
1815 match sender.send(ServiceEvent::ServiceResolved(info)) {
1816 Ok(()) => debug!("sent service resolved: {}", ptr.alias()),
1817 Err(e) => debug!("failed to send service resolved: {}", e),
1818 }
1819 } else {
1820 unresolved.insert(ptr.alias().to_string());
1821 }
1822 }
1823 }
1824 }
1825
1826 for instance in resolved.drain() {
1827 self.pending_resolves.remove(&instance);
1828 self.resolved.insert(instance);
1829 }
1830
1831 for instance in unresolved.drain() {
1832 self.add_pending_resolve(instance);
1833 }
1834 }
1835
1836 fn query_cache_for_hostname(
1839 &mut self,
1840 hostname: &str,
1841 sender: Sender<HostnameResolutionEvent>,
1842 ) {
1843 let addresses_map = self.cache.get_addresses_for_host(hostname);
1844 for (name, addresses) in addresses_map {
1845 match sender.send(HostnameResolutionEvent::AddressesFound(name, addresses)) {
1846 Ok(()) => trace!("sent hostname addresses found"),
1847 Err(e) => debug!("failed to send hostname addresses found: {}", e),
1848 }
1849 }
1850 }
1851
1852 fn add_pending_resolve(&mut self, instance: String) {
1853 if !self.pending_resolves.contains(&instance) {
1854 let next_time = current_time_millis() + RESOLVE_WAIT_IN_MILLIS;
1855 self.add_retransmission(next_time, Command::Resolve(instance.clone(), 1));
1856 self.pending_resolves.insert(instance);
1857 }
1858 }
1859
1860 fn create_service_info_from_cache(
1861 &self,
1862 ty_domain: &str,
1863 fullname: &str,
1864 ) -> Result<ServiceInfo> {
1865 let my_name = {
1866 let name = fullname.trim_end_matches(split_sub_domain(ty_domain).0);
1867 name.strip_suffix('.').unwrap_or(name).to_string()
1868 };
1869
1870 let now = current_time_millis();
1871 let mut info = ServiceInfo::new(ty_domain, &my_name, "", (), 0, None)?;
1872
1873 if let Some(subtype) = self.cache.get_subtype(fullname) {
1875 trace!(
1876 "ty_domain: {} found subtype {} for instance: {}",
1877 ty_domain,
1878 subtype,
1879 fullname
1880 );
1881 if info.get_subtype().is_none() {
1882 info.set_subtype(subtype.clone());
1883 }
1884 }
1885
1886 if let Some(records) = self.cache.get_srv(fullname) {
1888 if let Some(answer) = records.first() {
1889 if let Some(dns_srv) = answer.any().downcast_ref::<DnsSrv>() {
1890 info.set_hostname(dns_srv.host().to_string());
1891 info.set_port(dns_srv.port());
1892 }
1893 }
1894 }
1895
1896 if let Some(records) = self.cache.get_txt(fullname) {
1898 if let Some(record) = records.first() {
1899 if let Some(dns_txt) = record.any().downcast_ref::<DnsTxt>() {
1900 info.set_properties_from_txt(dns_txt.text());
1901 }
1902 }
1903 }
1904
1905 if let Some(records) = self.cache.get_addr(info.get_hostname()) {
1907 for answer in records.iter() {
1908 if let Some(dns_a) = answer.any().downcast_ref::<DnsAddress>() {
1909 if dns_a.get_record().is_expired(now) {
1910 trace!("Addr expired: {}", dns_a.address());
1911 } else {
1912 info.insert_ipaddr(dns_a.address());
1913 }
1914 }
1915 }
1916 }
1917
1918 Ok(info)
1919 }
1920
1921 fn handle_poller_events(&mut self, events: &mio::Events) {
1922 for ev in events.iter() {
1923 trace!("event received with key {:?}", ev.token());
1924 if ev.token().0 == SIGNAL_SOCK_EVENT_KEY {
1925 self.signal_sock_drain();
1927
1928 if let Err(e) = self.poller.registry().reregister(
1929 &mut self.signal_sock,
1930 ev.token(),
1931 mio::Interest::READABLE,
1932 ) {
1933 debug!("failed to modify poller for signal socket: {}", e);
1934 }
1935 continue; }
1937
1938 let intf = match self.poll_ids.get(&ev.token().0) {
1940 Some(interface) => interface.clone(),
1941 None => {
1942 debug!("Ip for event key {} not found", ev.token().0);
1943 break;
1944 }
1945 };
1946 while self.handle_read(&intf) {}
1947
1948 if let Some(sock) = self.intf_socks.get_mut(&intf) {
1950 if let Err(e) =
1951 self.poller
1952 .registry()
1953 .reregister(sock, ev.token(), mio::Interest::READABLE)
1954 {
1955 debug!("modify poller for interface {:?}: {}", &intf, e);
1956 break;
1957 }
1958 }
1959 }
1960 }
1961
1962 fn handle_response(&mut self, mut msg: DnsIncoming, intf: &Interface) {
1965 trace!(
1966 "handle_response: {} answers {} authorities {} additionals",
1967 msg.answers().len(),
1968 &msg.authorities().len(),
1969 &msg.num_additionals()
1970 );
1971 let now = current_time_millis();
1972
1973 let mut record_predicate = |record: &DnsRecordBox| {
1975 if !record.get_record().is_expired(now) {
1976 return true;
1977 }
1978
1979 debug!("record is expired, removing it from cache.");
1980 if self.cache.remove(record) {
1981 if let Some(dns_ptr) = record.any().downcast_ref::<DnsPointer>() {
1983 call_service_listener(
1984 &self.service_queriers,
1985 dns_ptr.get_name(),
1986 ServiceEvent::ServiceRemoved(
1987 dns_ptr.get_name().to_string(),
1988 dns_ptr.alias().to_string(),
1989 ),
1990 );
1991 }
1992 }
1993 false
1994 };
1995 msg.answers_mut().retain(&mut record_predicate);
1996 msg.authorities_mut().retain(&mut record_predicate);
1997 msg.additionals_mut().retain(&mut record_predicate);
1998
1999 self.conflict_handler(&msg, intf);
2001
2002 let mut is_for_us = true; for answer in msg.answers() {
2009 if answer.get_type() == RRType::PTR {
2010 if self.service_queriers.contains_key(answer.get_name()) {
2011 is_for_us = true;
2012 break; } else {
2014 is_for_us = false;
2015 }
2016 } else if answer.get_type() == RRType::A || answer.get_type() == RRType::AAAA {
2017 let answer_lowercase = answer.get_name().to_lowercase();
2019 if self.hostname_resolvers.contains_key(&answer_lowercase) {
2020 is_for_us = true;
2021 break; }
2023 }
2024 }
2025
2026 struct InstanceChange {
2028 ty: RRType, name: String, }
2031
2032 let mut changes = Vec::new();
2040 let mut timers = Vec::new();
2041 for record in msg.all_records() {
2042 match self
2043 .cache
2044 .add_or_update(intf, record, &mut timers, is_for_us)
2045 {
2046 Some((dns_record, true)) => {
2047 timers.push(dns_record.get_record().get_expire_time());
2048 timers.push(dns_record.get_record().get_refresh_time());
2049
2050 let ty = dns_record.get_type();
2051 let name = dns_record.get_name();
2052 if ty == RRType::PTR {
2053 if self.service_queriers.contains_key(name) {
2054 timers.push(dns_record.get_record().get_refresh_time());
2055 }
2056
2057 if let Some(dns_ptr) = dns_record.any().downcast_ref::<DnsPointer>() {
2059 call_service_listener(
2060 &self.service_queriers,
2061 name,
2062 ServiceEvent::ServiceFound(
2063 name.to_string(),
2064 dns_ptr.alias().to_string(),
2065 ),
2066 );
2067 changes.push(InstanceChange {
2068 ty,
2069 name: dns_ptr.alias().to_string(),
2070 });
2071 }
2072 } else {
2073 changes.push(InstanceChange {
2074 ty,
2075 name: name.to_string(),
2076 });
2077 }
2078 }
2079 Some((dns_record, false)) => {
2080 timers.push(dns_record.get_record().get_expire_time());
2081 timers.push(dns_record.get_record().get_refresh_time());
2082 }
2083 _ => {}
2084 }
2085 }
2086
2087 for t in timers {
2089 self.add_timer(t);
2090 }
2091
2092 for change in changes
2094 .iter()
2095 .filter(|change| change.ty == RRType::A || change.ty == RRType::AAAA)
2096 {
2097 let addr_map = self.cache.get_addresses_for_host(&change.name);
2098 for (name, addresses) in addr_map {
2099 call_hostname_resolution_listener(
2100 &self.hostname_resolvers,
2101 &change.name,
2102 HostnameResolutionEvent::AddressesFound(name, addresses),
2103 )
2104 }
2105 }
2106
2107 let mut updated_instances = HashSet::new();
2109 for update in changes {
2110 match update.ty {
2111 RRType::PTR | RRType::SRV | RRType::TXT => {
2112 updated_instances.insert(update.name);
2113 }
2114 RRType::A | RRType::AAAA => {
2115 let instances = self.cache.get_instances_on_host(&update.name);
2116 updated_instances.extend(instances);
2117 }
2118 _ => {}
2119 }
2120 }
2121
2122 self.resolve_updated_instances(&updated_instances);
2123 }
2124
2125 fn conflict_handler(&mut self, msg: &DnsIncoming, intf: &Interface) {
2126 let Some(dns_registry) = self.dns_registry_map.get_mut(intf) else {
2127 return;
2128 };
2129
2130 for answer in msg.answers().iter() {
2131 let mut new_records = Vec::new();
2132
2133 let name = answer.get_name();
2134 let Some(probe) = dns_registry.probing.get_mut(name) else {
2135 continue;
2136 };
2137
2138 if answer.get_type() == RRType::A || answer.get_type() == RRType::AAAA {
2140 if let Some(answer_addr) = answer.any().downcast_ref::<DnsAddress>() {
2141 if !valid_ip_on_intf(&answer_addr.address(), intf) {
2142 debug!(
2143 "conflict handler: answer addr {:?} not in the subnet of {:?}",
2144 answer_addr, intf
2145 );
2146 continue;
2147 }
2148 }
2149
2150 let any_match = probe.records.iter().any(|r| {
2153 r.get_type() == answer.get_type()
2154 && r.get_class() == answer.get_class()
2155 && r.rrdata_match(answer.as_ref())
2156 });
2157 if any_match {
2158 continue; }
2160 }
2161
2162 probe.records.retain(|record| {
2163 if record.get_type() == answer.get_type()
2164 && record.get_class() == answer.get_class()
2165 && !record.rrdata_match(answer.as_ref())
2166 {
2167 debug!(
2168 "found conflict name: '{name}' record: {}: {} PEER: {}",
2169 record.get_type(),
2170 record.rdata_print(),
2171 answer.rdata_print()
2172 );
2173
2174 let mut new_record = record.clone();
2177 let new_name = match record.get_type() {
2178 RRType::A => hostname_change(name),
2179 RRType::AAAA => hostname_change(name),
2180 _ => name_change(name),
2181 };
2182 new_record.get_record_mut().set_new_name(new_name);
2183 new_records.push(new_record);
2184 return false; }
2186
2187 true
2188 });
2189
2190 let create_time = current_time_millis() + fastrand::u64(0..250);
2197
2198 let waiting_services = probe.waiting_services.clone();
2199
2200 for record in new_records {
2201 if dns_registry.update_hostname(name, record.get_name(), create_time) {
2202 self.timers.push(Reverse(create_time));
2203 }
2204
2205 dns_registry.name_changes.insert(
2207 record.get_record().get_original_name().to_string(),
2208 record.get_name().to_string(),
2209 );
2210
2211 let new_probe = match dns_registry.probing.get_mut(record.get_name()) {
2212 Some(p) => p,
2213 None => {
2214 let new_probe = dns_registry
2215 .probing
2216 .entry(record.get_name().to_string())
2217 .or_insert_with(|| {
2218 debug!("conflict handler: new probe of {}", record.get_name());
2219 Probe::new(create_time)
2220 });
2221 self.timers.push(Reverse(new_probe.next_send));
2222 new_probe
2223 }
2224 };
2225
2226 debug!(
2227 "insert record with new name '{}' {} into probe",
2228 record.get_name(),
2229 record.get_type()
2230 );
2231 new_probe.insert_record(record);
2232
2233 new_probe.waiting_services.extend(waiting_services.clone());
2234 }
2235 }
2236 }
2237
2238 fn resolve_updated_instances(&mut self, updated_instances: &HashSet<String>) {
2245 let mut resolved: HashSet<String> = HashSet::new();
2246 let mut unresolved: HashSet<String> = HashSet::new();
2247 let mut removed_instances = HashMap::new();
2248
2249 for (ty_domain, records) in self.cache.all_ptr().iter() {
2250 if !self.service_queriers.contains_key(ty_domain) {
2251 continue;
2253 }
2254
2255 for record in records.iter() {
2256 if let Some(dns_ptr) = record.any().downcast_ref::<DnsPointer>() {
2257 if updated_instances.contains(dns_ptr.alias()) {
2258 if let Ok(info) =
2259 self.create_service_info_from_cache(ty_domain, dns_ptr.alias())
2260 {
2261 if info.is_ready() {
2262 debug!("call queriers to resolve {}", dns_ptr.alias());
2263 resolved.insert(dns_ptr.alias().to_string());
2264 call_service_listener(
2265 &self.service_queriers,
2266 ty_domain,
2267 ServiceEvent::ServiceResolved(info),
2268 );
2269 } else {
2270 if self.resolved.remove(dns_ptr.alias()) {
2271 removed_instances
2272 .entry(ty_domain.to_string())
2273 .or_insert_with(HashSet::new)
2274 .insert(dns_ptr.alias().to_string());
2275 }
2276 unresolved.insert(dns_ptr.alias().to_string());
2277 }
2278 }
2279 }
2280 }
2281 }
2282 }
2283
2284 for instance in resolved.drain() {
2285 self.pending_resolves.remove(&instance);
2286 self.resolved.insert(instance);
2287 }
2288
2289 for instance in unresolved.drain() {
2290 self.add_pending_resolve(instance);
2291 }
2292
2293 self.notify_service_removal(removed_instances);
2294 }
2295
2296 fn handle_query(&mut self, msg: DnsIncoming, intf: &Interface) {
2298 let sock = match self.intf_socks.get(intf) {
2299 Some(sock) => sock,
2300 None => return,
2301 };
2302 let mut out = DnsOutgoing::new(FLAGS_QR_RESPONSE | FLAGS_AA);
2303
2304 const META_QUERY: &str = "_services._dns-sd._udp.local.";
2307
2308 let Some(dns_registry) = self.dns_registry_map.get_mut(intf) else {
2309 debug!("missing dns registry for intf {}", intf.ip());
2310 return;
2311 };
2312
2313 for question in msg.questions().iter() {
2314 trace!("query question: {:?}", &question);
2315
2316 let qtype = question.entry_type();
2317
2318 if qtype == RRType::PTR {
2319 for service in self.my_services.values() {
2320 if service.get_status(intf) != ServiceStatus::Announced {
2321 continue;
2322 }
2323
2324 if question.entry_name() == service.get_type()
2325 || service
2326 .get_subtype()
2327 .as_ref()
2328 .is_some_and(|v| v == question.entry_name())
2329 {
2330 add_answer_with_additionals(&mut out, &msg, service, intf, dns_registry);
2331 } else if question.entry_name() == META_QUERY {
2332 let ptr_added = out.add_answer(
2333 &msg,
2334 DnsPointer::new(
2335 question.entry_name(),
2336 RRType::PTR,
2337 CLASS_IN,
2338 service.get_other_ttl(),
2339 service.get_type().to_string(),
2340 ),
2341 );
2342 if !ptr_added {
2343 trace!("answer was not added for meta-query {:?}", &question);
2344 }
2345 }
2346 }
2347 } else {
2348 if qtype == RRType::ANY && msg.num_authorities() > 0 {
2350 let probe_name = question.entry_name();
2351
2352 if let Some(probe) = dns_registry.probing.get_mut(probe_name) {
2353 let now = current_time_millis();
2354
2355 if probe.start_time < now {
2359 let incoming_records: Vec<_> = msg
2360 .authorities()
2361 .iter()
2362 .filter(|r| r.get_name() == probe_name)
2363 .collect();
2364
2365 probe.tiebreaking(&incoming_records, now, probe_name);
2366 }
2367 }
2368 }
2369
2370 if qtype == RRType::A || qtype == RRType::AAAA || qtype == RRType::ANY {
2371 for service in self.my_services.values() {
2372 if service.get_status(intf) != ServiceStatus::Announced {
2373 continue;
2374 }
2375
2376 let service_hostname =
2377 match dns_registry.name_changes.get(service.get_hostname()) {
2378 Some(new_name) => new_name,
2379 None => service.get_hostname(),
2380 };
2381
2382 if service_hostname.to_lowercase() == question.entry_name().to_lowercase() {
2383 let intf_addrs = service.get_addrs_on_intf(intf);
2384 if intf_addrs.is_empty()
2385 && (qtype == RRType::A || qtype == RRType::AAAA)
2386 {
2387 let t = match qtype {
2388 RRType::A => "TYPE_A",
2389 RRType::AAAA => "TYPE_AAAA",
2390 _ => "invalid_type",
2391 };
2392 trace!(
2393 "Cannot find valid addrs for {} response on intf {:?}",
2394 t,
2395 &intf
2396 );
2397 return;
2398 }
2399 for address in intf_addrs {
2400 out.add_answer(
2401 &msg,
2402 DnsAddress::new(
2403 service_hostname,
2404 ip_address_rr_type(&address),
2405 CLASS_IN | CLASS_CACHE_FLUSH,
2406 service.get_host_ttl(),
2407 address,
2408 intf.into(),
2409 ),
2410 );
2411 }
2412 }
2413 }
2414 }
2415
2416 let query_name = question.entry_name().to_lowercase();
2417 let service_opt = self
2418 .my_services
2419 .iter()
2420 .find(|(k, _v)| {
2421 let service_name = match dns_registry.name_changes.get(k.as_str()) {
2422 Some(new_name) => new_name,
2423 None => k,
2424 };
2425 service_name == &query_name
2426 })
2427 .map(|(_, v)| v);
2428
2429 let Some(service) = service_opt else {
2430 continue;
2431 };
2432
2433 if service.get_status(intf) != ServiceStatus::Announced {
2434 continue;
2435 }
2436
2437 add_answer_of_service(&mut out, &msg, question.entry_name(), service, qtype, intf);
2438 }
2439 }
2440
2441 if !out.answers_count() > 0 {
2442 out.set_id(msg.id());
2443 send_dns_outgoing(&out, intf, sock);
2444
2445 self.increase_counter(Counter::Respond, 1);
2446 self.notify_monitors(DaemonEvent::Respond(intf.ip()));
2447 }
2448
2449 self.increase_counter(Counter::KnownAnswerSuppression, out.known_answer_count());
2450 }
2451
2452 fn increase_counter(&mut self, counter: Counter, count: i64) {
2454 let key = counter.to_string();
2455 match self.counters.get_mut(&key) {
2456 Some(v) => *v += count,
2457 None => {
2458 self.counters.insert(key, count);
2459 }
2460 }
2461 }
2462
2463 fn set_counter(&mut self, counter: Counter, count: i64) {
2465 let key = counter.to_string();
2466 self.counters.insert(key, count);
2467 }
2468
2469 fn signal_sock_drain(&self) {
2470 let mut signal_buf = [0; 1024];
2471
2472 while let Ok(sz) = self.signal_sock.recv(&mut signal_buf) {
2474 trace!(
2475 "signal socket recvd: {}",
2476 String::from_utf8_lossy(&signal_buf[0..sz])
2477 );
2478 }
2479 }
2480
2481 fn add_retransmission(&mut self, next_time: u64, command: Command) {
2482 self.retransmissions.push(ReRun { next_time, command });
2483 self.add_timer(next_time);
2484 }
2485
2486 fn notify_service_removal(&self, expired: HashMap<String, HashSet<String>>) {
2488 for (ty_domain, sender) in self.service_queriers.iter() {
2489 if let Some(instances) = expired.get(ty_domain) {
2490 for instance_name in instances {
2491 let event = ServiceEvent::ServiceRemoved(
2492 ty_domain.to_string(),
2493 instance_name.to_string(),
2494 );
2495 match sender.send(event) {
2496 Ok(()) => debug!("notify_service_removal: sent ServiceRemoved to listener of {ty_domain}: {instance_name}"),
2497 Err(e) => debug!("Failed to send event: {}", e),
2498 }
2499 }
2500 }
2501 }
2502 }
2503
2504 fn exec_command(&mut self, command: Command, repeating: bool) {
2508 match command {
2509 Command::Browse(ty, next_delay, listener) => {
2510 self.exec_command_browse(repeating, ty, next_delay, listener);
2511 }
2512
2513 Command::ResolveHostname(hostname, next_delay, listener, timeout) => {
2514 self.exec_command_resolve_hostname(
2515 repeating, hostname, next_delay, listener, timeout,
2516 );
2517 }
2518
2519 Command::Register(service_info) => {
2520 self.register_service(service_info);
2521 self.increase_counter(Counter::Register, 1);
2522 }
2523
2524 Command::RegisterResend(fullname, intf) => {
2525 trace!("register-resend service: {fullname} on {:?}", &intf.addr);
2526 self.exec_command_register_resend(fullname, intf);
2527 }
2528
2529 Command::Unregister(fullname, resp_s) => {
2530 trace!("unregister service {} repeat {}", &fullname, &repeating);
2531 self.exec_command_unregister(repeating, fullname, resp_s);
2532 }
2533
2534 Command::UnregisterResend(packet, ip) => {
2535 self.exec_command_unregister_resend(packet, ip);
2536 }
2537
2538 Command::StopBrowse(ty_domain) => self.exec_command_stop_browse(ty_domain),
2539
2540 Command::StopResolveHostname(hostname) => {
2541 self.exec_command_stop_resolve_hostname(hostname.to_lowercase())
2542 }
2543
2544 Command::Resolve(instance, try_count) => self.exec_command_resolve(instance, try_count),
2545
2546 Command::GetMetrics(resp_s) => self.exec_command_get_metrics(resp_s),
2547
2548 Command::GetStatus(resp_s) => match resp_s.send(self.status.clone()) {
2549 Ok(()) => trace!("Sent status to the client"),
2550 Err(e) => debug!("Failed to send status: {}", e),
2551 },
2552
2553 Command::Monitor(resp_s) => {
2554 self.monitors.push(resp_s);
2555 }
2556
2557 Command::SetOption(daemon_opt) => {
2558 self.process_set_option(daemon_opt);
2559 }
2560
2561 Command::GetOption(resp_s) => {
2562 let val = DaemonOptionVal {
2563 _service_name_len_max: self.service_name_len_max,
2564 ip_check_interval: self.ip_check_interval,
2565 };
2566 if let Err(e) = resp_s.send(val) {
2567 debug!("Failed to send options: {}", e);
2568 }
2569 }
2570
2571 Command::Verify(instance_fullname, timeout) => {
2572 self.exec_command_verify(instance_fullname, timeout, repeating);
2573 }
2574
2575 _ => {
2576 debug!("unexpected command: {:?}", &command);
2577 }
2578 }
2579 }
2580
2581 fn exec_command_get_metrics(&mut self, resp_s: Sender<HashMap<String, i64>>) {
2582 self.set_counter(Counter::CachedPTR, self.cache.ptr_count() as i64);
2583 self.set_counter(Counter::CachedSRV, self.cache.srv_count() as i64);
2584 self.set_counter(Counter::CachedAddr, self.cache.addr_count() as i64);
2585 self.set_counter(Counter::CachedTxt, self.cache.txt_count() as i64);
2586 self.set_counter(Counter::CachedNSec, self.cache.nsec_count() as i64);
2587 self.set_counter(Counter::CachedSubtype, self.cache.subtype_count() as i64);
2588 self.set_counter(Counter::Timer, self.timers.len() as i64);
2589
2590 let dns_registry_probe_count: usize = self
2591 .dns_registry_map
2592 .values()
2593 .map(|r| r.probing.len())
2594 .sum();
2595 self.set_counter(Counter::DnsRegistryProbe, dns_registry_probe_count as i64);
2596
2597 let dns_registry_active_count: usize = self
2598 .dns_registry_map
2599 .values()
2600 .map(|r| r.active.values().map(|a| a.len()).sum::<usize>())
2601 .sum();
2602 self.set_counter(Counter::DnsRegistryActive, dns_registry_active_count as i64);
2603
2604 let dns_registry_timer_count: usize = self
2605 .dns_registry_map
2606 .values()
2607 .map(|r| r.new_timers.len())
2608 .sum();
2609 self.set_counter(Counter::DnsRegistryTimer, dns_registry_timer_count as i64);
2610
2611 let dns_registry_name_change_count: usize = self
2612 .dns_registry_map
2613 .values()
2614 .map(|r| r.name_changes.len())
2615 .sum();
2616 self.set_counter(
2617 Counter::DnsRegistryNameChange,
2618 dns_registry_name_change_count as i64,
2619 );
2620
2621 if let Err(e) = resp_s.send(self.counters.clone()) {
2623 debug!("Failed to send metrics: {}", e);
2624 }
2625 }
2626
2627 fn exec_command_browse(
2628 &mut self,
2629 repeating: bool,
2630 ty: String,
2631 next_delay: u32,
2632 listener: Sender<ServiceEvent>,
2633 ) {
2634 let pretty_addrs: Vec<String> = self
2635 .intf_socks
2636 .keys()
2637 .map(|itf| format!("{} ({})", itf.ip(), itf.name))
2638 .collect();
2639
2640 if let Err(e) = listener.send(ServiceEvent::SearchStarted(format!(
2641 "{ty} on {} interfaces [{}]",
2642 pretty_addrs.len(),
2643 pretty_addrs.join(", ")
2644 ))) {
2645 debug!(
2646 "Failed to send SearchStarted({})(repeating:{}): {}",
2647 &ty, repeating, e
2648 );
2649 return;
2650 }
2651 if !repeating {
2652 self.service_queriers.insert(ty.clone(), listener.clone());
2656
2657 self.query_cache_for_service(&ty, &listener);
2659 }
2660
2661 self.send_query(&ty, RRType::PTR);
2662 self.increase_counter(Counter::Browse, 1);
2663
2664 let next_time = current_time_millis() + (next_delay * 1000) as u64;
2665 let max_delay = 60 * 60;
2666 let delay = cmp::min(next_delay * 2, max_delay);
2667 self.add_retransmission(next_time, Command::Browse(ty, delay, listener));
2668 }
2669
2670 fn exec_command_resolve_hostname(
2671 &mut self,
2672 repeating: bool,
2673 hostname: String,
2674 next_delay: u32,
2675 listener: Sender<HostnameResolutionEvent>,
2676 timeout: Option<u64>,
2677 ) {
2678 let addr_list: Vec<_> = self.intf_socks.keys().collect();
2679 if let Err(e) = listener.send(HostnameResolutionEvent::SearchStarted(format!(
2680 "{} on addrs {:?}",
2681 &hostname, &addr_list
2682 ))) {
2683 debug!(
2684 "Failed to send ResolveStarted({})(repeating:{}): {}",
2685 &hostname, repeating, e
2686 );
2687 return;
2688 }
2689 if !repeating {
2690 self.add_hostname_resolver(hostname.to_owned(), listener.clone(), timeout);
2691 self.query_cache_for_hostname(&hostname, listener.clone());
2693 }
2694
2695 self.send_query_vec(&[(&hostname, RRType::A), (&hostname, RRType::AAAA)]);
2696 self.increase_counter(Counter::ResolveHostname, 1);
2697
2698 let now = current_time_millis();
2699 let next_time = now + u64::from(next_delay) * 1000;
2700 let max_delay = 60 * 60;
2701 let delay = cmp::min(next_delay * 2, max_delay);
2702
2703 if self
2705 .hostname_resolvers
2706 .get(&hostname)
2707 .and_then(|(_sender, timeout)| *timeout)
2708 .map(|timeout| next_time < timeout)
2709 .unwrap_or(true)
2710 {
2711 self.add_retransmission(
2712 next_time,
2713 Command::ResolveHostname(hostname, delay, listener, None),
2714 );
2715 }
2716 }
2717
2718 fn exec_command_resolve(&mut self, instance: String, try_count: u16) {
2719 let pending_query = self.query_unresolved(&instance);
2720 let max_try = 3;
2721 if pending_query && try_count < max_try {
2722 let next_time = current_time_millis() + RESOLVE_WAIT_IN_MILLIS;
2725 self.add_retransmission(next_time, Command::Resolve(instance, try_count + 1));
2726 }
2727 }
2728
2729 fn exec_command_unregister(
2730 &mut self,
2731 repeating: bool,
2732 fullname: String,
2733 resp_s: Sender<UnregisterStatus>,
2734 ) {
2735 let response = match self.my_services.remove_entry(&fullname) {
2736 None => {
2737 debug!("unregister: cannot find such service {}", &fullname);
2738 UnregisterStatus::NotFound
2739 }
2740 Some((_k, info)) => {
2741 let mut timers = Vec::new();
2742 let mut multicast_sent_trackers = HashSet::new();
2744
2745 for (intf, sock) in self.intf_socks.iter() {
2746 if let Some(tracker) = multicast_send_tracker(intf) {
2747 if multicast_sent_trackers.contains(&tracker) {
2748 continue; }
2750 multicast_sent_trackers.insert(tracker);
2751 }
2752 let packet = self.unregister_service(&info, intf, sock);
2753 if !repeating && !packet.is_empty() {
2755 let next_time = current_time_millis() + 120;
2756 self.retransmissions.push(ReRun {
2757 next_time,
2758 command: Command::UnregisterResend(packet, intf.clone()),
2759 });
2760 timers.push(next_time);
2761 }
2762 }
2763
2764 for t in timers {
2765 self.add_timer(t);
2766 }
2767
2768 self.increase_counter(Counter::Unregister, 1);
2769 UnregisterStatus::OK
2770 }
2771 };
2772 if let Err(e) = resp_s.send(response) {
2773 debug!("unregister: failed to send response: {}", e);
2774 }
2775 }
2776
2777 fn exec_command_unregister_resend(&mut self, packet: Vec<u8>, intf: Interface) {
2778 if let Some(sock) = self.intf_socks.get(&intf) {
2779 debug!("UnregisterResend from {}", &intf.ip());
2780 multicast_on_intf(&packet[..], &intf, sock);
2781 self.increase_counter(Counter::UnregisterResend, 1);
2782 }
2783 }
2784
2785 fn exec_command_stop_browse(&mut self, ty_domain: String) {
2786 match self.service_queriers.remove_entry(&ty_domain) {
2787 None => debug!("StopBrowse: cannot find querier for {}", &ty_domain),
2788 Some((ty, sender)) => {
2789 trace!("StopBrowse: removed queryer for {}", &ty);
2791 let mut i = 0;
2792 while i < self.retransmissions.len() {
2793 if let Command::Browse(t, _, _) = &self.retransmissions[i].command {
2794 if t == &ty {
2795 self.retransmissions.remove(i);
2796 trace!("StopBrowse: removed retransmission for {}", &ty);
2797 continue;
2798 }
2799 }
2800 i += 1;
2801 }
2802
2803 self.cache.remove_service_type(&ty_domain);
2805
2806 match sender.send(ServiceEvent::SearchStopped(ty_domain)) {
2808 Ok(()) => trace!("Sent SearchStopped to the listener"),
2809 Err(e) => debug!("Failed to send SearchStopped: {}", e),
2810 }
2811 }
2812 }
2813 }
2814
2815 fn exec_command_stop_resolve_hostname(&mut self, hostname: String) {
2816 if let Some((host, (sender, _timeout))) = self.hostname_resolvers.remove_entry(&hostname) {
2817 trace!("StopResolve: removed queryer for {}", &host);
2819 let mut i = 0;
2820 while i < self.retransmissions.len() {
2821 if let Command::Resolve(t, _) = &self.retransmissions[i].command {
2822 if t == &host {
2823 self.retransmissions.remove(i);
2824 trace!("StopResolve: removed retransmission for {}", &host);
2825 continue;
2826 }
2827 }
2828 i += 1;
2829 }
2830
2831 match sender.send(HostnameResolutionEvent::SearchStopped(hostname)) {
2833 Ok(()) => trace!("Sent SearchStopped to the listener"),
2834 Err(e) => debug!("Failed to send SearchStopped: {}", e),
2835 }
2836 }
2837 }
2838
2839 fn exec_command_register_resend(&mut self, fullname: String, intf: Interface) {
2840 let Some(info) = self.my_services.get_mut(&fullname) else {
2841 trace!("announce: cannot find such service {}", &fullname);
2842 return;
2843 };
2844
2845 let Some(dns_registry) = self.dns_registry_map.get_mut(&intf) else {
2846 return;
2847 };
2848
2849 let Some(sock) = self.intf_socks.get(&intf) else {
2850 return;
2851 };
2852
2853 if announce_service_on_intf(dns_registry, info, &intf, sock) {
2854 let mut hostname = info.get_hostname();
2855 if let Some(new_name) = dns_registry.name_changes.get(hostname) {
2856 hostname = new_name;
2857 }
2858 let service_name = match dns_registry.name_changes.get(&fullname) {
2859 Some(new_name) => new_name.to_string(),
2860 None => fullname,
2861 };
2862
2863 debug!("resend: announce service {} on {}", service_name, intf.ip());
2864
2865 notify_monitors(
2866 &mut self.monitors,
2867 DaemonEvent::Announce(service_name, format!("{}:{}", hostname, &intf.ip())),
2868 );
2869 info.set_status(&intf, ServiceStatus::Announced);
2870 } else {
2871 debug!("register-resend should not fail");
2872 }
2873
2874 self.increase_counter(Counter::RegisterResend, 1);
2875 }
2876
2877 fn exec_command_verify(&mut self, instance: String, timeout: Duration, repeating: bool) {
2878 let now = current_time_millis();
2888 let expire_at = if repeating {
2889 None
2890 } else {
2891 Some(now + timeout.as_millis() as u64)
2892 };
2893
2894 let record_vec = self.cache.service_verify_queries(&instance, expire_at);
2896
2897 if !record_vec.is_empty() {
2898 let query_vec: Vec<(&str, RRType)> = record_vec
2899 .iter()
2900 .map(|(record, rr_type)| (record.as_str(), *rr_type))
2901 .collect();
2902 self.send_query_vec(&query_vec);
2903
2904 if let Some(new_expire) = expire_at {
2905 self.add_timer(new_expire); self.add_retransmission(now + 1000, Command::Verify(instance, timeout));
2909 }
2910 }
2911 }
2912
2913 fn refresh_active_services(&mut self) {
2915 let mut query_ptr_count = 0;
2916 let mut query_srv_count = 0;
2917 let mut new_timers = HashSet::new();
2918 let mut query_addr_count = 0;
2919
2920 for (ty_domain, _sender) in self.service_queriers.iter() {
2921 let refreshed_timers = self.cache.refresh_due_ptr(ty_domain);
2922 if !refreshed_timers.is_empty() {
2923 trace!("sending refresh query for PTR: {}", ty_domain);
2924 self.send_query(ty_domain, RRType::PTR);
2925 query_ptr_count += 1;
2926 new_timers.extend(refreshed_timers);
2927 }
2928
2929 let (instances, timers) = self.cache.refresh_due_srv_txt(ty_domain);
2930 for (instance, types) in instances {
2931 trace!("sending refresh query for: {}", &instance);
2932 let query_vec = types
2933 .into_iter()
2934 .map(|ty| (instance.as_str(), ty))
2935 .collect::<Vec<_>>();
2936 self.send_query_vec(&query_vec);
2937 query_srv_count += 1;
2938 }
2939 new_timers.extend(timers);
2940 let (hostnames, timers) = self.cache.refresh_due_hosts(ty_domain);
2941 for hostname in hostnames.iter() {
2942 trace!("sending refresh queries for A and AAAA: {}", hostname);
2943 self.send_query_vec(&[(hostname, RRType::A), (hostname, RRType::AAAA)]);
2944 query_addr_count += 2;
2945 }
2946 new_timers.extend(timers);
2947 }
2948
2949 for timer in new_timers {
2950 self.add_timer(timer);
2951 }
2952
2953 self.increase_counter(Counter::CacheRefreshPTR, query_ptr_count);
2954 self.increase_counter(Counter::CacheRefreshSrvTxt, query_srv_count);
2955 self.increase_counter(Counter::CacheRefreshAddr, query_addr_count);
2956 }
2957}
2958
2959fn add_answer_of_service(
2961 out: &mut DnsOutgoing,
2962 msg: &DnsIncoming,
2963 entry_name: &str,
2964 service: &ServiceInfo,
2965 qtype: RRType,
2966 intf: &Interface,
2967) {
2968 if qtype == RRType::SRV || qtype == RRType::ANY {
2969 out.add_answer(
2970 msg,
2971 DnsSrv::new(
2972 entry_name,
2973 CLASS_IN | CLASS_CACHE_FLUSH,
2974 service.get_host_ttl(),
2975 service.get_priority(),
2976 service.get_weight(),
2977 service.get_port(),
2978 service.get_hostname().to_string(),
2979 ),
2980 );
2981 }
2982
2983 if qtype == RRType::TXT || qtype == RRType::ANY {
2984 out.add_answer(
2985 msg,
2986 DnsTxt::new(
2987 entry_name,
2988 CLASS_IN | CLASS_CACHE_FLUSH,
2989 service.get_other_ttl(),
2990 service.generate_txt(),
2991 ),
2992 );
2993 }
2994
2995 if qtype == RRType::SRV {
2996 let intf_addrs = service.get_addrs_on_intf(intf);
2997 if intf_addrs.is_empty() {
2998 debug!(
2999 "Cannot find valid addrs for TYPE_SRV response on intf {:?}",
3000 &intf
3001 );
3002 return;
3003 }
3004 for address in intf_addrs {
3005 out.add_additional_answer(DnsAddress::new(
3006 service.get_hostname(),
3007 ip_address_rr_type(&address),
3008 CLASS_IN | CLASS_CACHE_FLUSH,
3009 service.get_host_ttl(),
3010 address,
3011 intf.into(),
3012 ));
3013 }
3014 }
3015}
3016
3017#[derive(Debug)]
3020pub enum ServiceEvent {
3021 SearchStarted(String),
3023
3024 ServiceFound(String, String),
3026
3027 ServiceResolved(ServiceInfo),
3029
3030 ServiceRemoved(String, String),
3032
3033 SearchStopped(String),
3035}
3036
3037#[derive(Debug)]
3040#[non_exhaustive]
3041pub enum HostnameResolutionEvent {
3042 SearchStarted(String),
3044 AddressesFound(String, HashSet<IpAddr>),
3046 AddressesRemoved(String, HashSet<IpAddr>),
3048 SearchTimeout(String),
3050 SearchStopped(String),
3052}
3053
3054#[derive(Clone, Debug)]
3057#[non_exhaustive]
3058pub enum DaemonEvent {
3059 Announce(String, String),
3061
3062 Error(Error),
3064
3065 IpAdd(IpAddr),
3067
3068 IpDel(IpAddr),
3070
3071 NameChange(DnsNameChange),
3074
3075 Respond(IpAddr),
3077}
3078
3079#[derive(Clone, Debug)]
3082pub struct DnsNameChange {
3083 pub original: String,
3085
3086 pub new_name: String,
3096
3097 pub rr_type: RRType,
3099
3100 pub intf_name: String,
3102}
3103
3104#[derive(Debug)]
3106enum Command {
3107 Browse(String, u32, Sender<ServiceEvent>),
3109
3110 ResolveHostname(String, u32, Sender<HostnameResolutionEvent>, Option<u64>), Register(ServiceInfo),
3115
3116 Unregister(String, Sender<UnregisterStatus>), RegisterResend(String, Interface), UnregisterResend(Vec<u8>, Interface), StopBrowse(String), StopResolveHostname(String), Resolve(String, u16), GetMetrics(Sender<Metrics>),
3137
3138 GetStatus(Sender<DaemonStatus>),
3140
3141 Monitor(Sender<DaemonEvent>),
3143
3144 SetOption(DaemonOption),
3145
3146 GetOption(Sender<DaemonOptionVal>),
3147
3148 Verify(String, Duration),
3153
3154 Exit(Sender<DaemonStatus>),
3155}
3156
3157impl fmt::Display for Command {
3158 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
3159 match self {
3160 Self::Browse(_, _, _) => write!(f, "Command Browse"),
3161 Self::ResolveHostname(_, _, _, _) => write!(f, "Command ResolveHostname"),
3162 Self::Exit(_) => write!(f, "Command Exit"),
3163 Self::GetStatus(_) => write!(f, "Command GetStatus"),
3164 Self::GetMetrics(_) => write!(f, "Command GetMetrics"),
3165 Self::Monitor(_) => write!(f, "Command Monitor"),
3166 Self::Register(_) => write!(f, "Command Register"),
3167 Self::RegisterResend(_, _) => write!(f, "Command RegisterResend"),
3168 Self::SetOption(_) => write!(f, "Command SetOption"),
3169 Self::GetOption(_) => write!(f, "Command GetOption"),
3170 Self::StopBrowse(_) => write!(f, "Command StopBrowse"),
3171 Self::StopResolveHostname(_) => write!(f, "Command StopResolveHostname"),
3172 Self::Unregister(_, _) => write!(f, "Command Unregister"),
3173 Self::UnregisterResend(_, _) => write!(f, "Command UnregisterResend"),
3174 Self::Resolve(_, _) => write!(f, "Command Resolve"),
3175 Self::Verify(_, _) => write!(f, "Command VerifyResource"),
3176 }
3177 }
3178}
3179
3180struct DaemonOptionVal {
3181 _service_name_len_max: u8,
3182 ip_check_interval: u64,
3183}
3184
3185#[derive(Debug)]
3186enum DaemonOption {
3187 ServiceNameLenMax(u8),
3188 IpCheckInterval(u64),
3189 EnableInterface(Vec<IfKind>),
3190 DisableInterface(Vec<IfKind>),
3191 MulticastLoopV4(bool),
3192 MulticastLoopV6(bool),
3193}
3194
3195const DOMAIN_LEN: usize = "._tcp.local.".len();
3197
3198fn check_service_name_length(ty_domain: &str, limit: u8) -> Result<()> {
3200 if ty_domain.len() <= DOMAIN_LEN + 1 {
3201 return Err(e_fmt!("Service type name cannot be empty: {}", ty_domain));
3203 }
3204
3205 let service_name_len = ty_domain.len() - DOMAIN_LEN - 1; if service_name_len > limit as usize {
3207 return Err(e_fmt!("Service name length must be <= {} bytes", limit));
3208 }
3209 Ok(())
3210}
3211
3212fn check_domain_suffix(name: &str) -> Result<()> {
3214 if !(name.ends_with("._tcp.local.") || name.ends_with("._udp.local.")) {
3215 return Err(e_fmt!(
3216 "mDNS service {} must end with '._tcp.local.' or '._udp.local.'",
3217 name
3218 ));
3219 }
3220
3221 Ok(())
3222}
3223
3224fn check_service_name(fullname: &str) -> Result<()> {
3232 check_domain_suffix(fullname)?;
3233
3234 let remaining: Vec<&str> = fullname[..fullname.len() - DOMAIN_LEN].split('.').collect();
3235 let name = remaining.last().ok_or_else(|| e_fmt!("No service name"))?;
3236
3237 if &name[0..1] != "_" {
3238 return Err(e_fmt!("Service name must start with '_'"));
3239 }
3240
3241 let name = &name[1..];
3242
3243 if name.contains("--") {
3244 return Err(e_fmt!("Service name must not contain '--'"));
3245 }
3246
3247 if name.starts_with('-') || name.ends_with('-') {
3248 return Err(e_fmt!("Service name (%s) may not start or end with '-'"));
3249 }
3250
3251 let ascii_count = name.chars().filter(|c| c.is_ascii_alphabetic()).count();
3252 if ascii_count < 1 {
3253 return Err(e_fmt!(
3254 "Service name must contain at least one letter (eg: 'A-Za-z')"
3255 ));
3256 }
3257
3258 Ok(())
3259}
3260
3261fn check_hostname(hostname: &str) -> Result<()> {
3263 if !hostname.ends_with(".local.") {
3264 return Err(e_fmt!("Hostname must end with '.local.': {hostname}"));
3265 }
3266
3267 if hostname == ".local." {
3268 return Err(e_fmt!(
3269 "The part of the hostname before '.local.' cannot be empty"
3270 ));
3271 }
3272
3273 if hostname.len() > 255 {
3274 return Err(e_fmt!("Hostname length must be <= 255 bytes"));
3275 }
3276
3277 Ok(())
3278}
3279
3280fn call_service_listener(
3281 listeners_map: &HashMap<String, Sender<ServiceEvent>>,
3282 ty_domain: &str,
3283 event: ServiceEvent,
3284) {
3285 if let Some(listener) = listeners_map.get(ty_domain) {
3286 match listener.send(event) {
3287 Ok(()) => trace!("Sent event to listener successfully"),
3288 Err(e) => debug!("Failed to send event: {}", e),
3289 }
3290 }
3291}
3292
3293fn call_hostname_resolution_listener(
3294 listeners_map: &HashMap<String, (Sender<HostnameResolutionEvent>, Option<u64>)>,
3295 hostname: &str,
3296 event: HostnameResolutionEvent,
3297) {
3298 let hostname_lower = hostname.to_lowercase();
3299 if let Some(listener) = listeners_map.get(&hostname_lower).map(|(l, _)| l) {
3300 match listener.send(event) {
3301 Ok(()) => trace!("Sent event to listener successfully"),
3302 Err(e) => debug!("Failed to send event: {}", e),
3303 }
3304 }
3305}
3306
3307fn my_ip_interfaces(with_loopback: bool) -> Vec<Interface> {
3310 if_addrs::get_if_addrs()
3311 .unwrap_or_default()
3312 .into_iter()
3313 .filter(|i| !i.is_loopback() || with_loopback)
3314 .collect()
3315}
3316
3317fn send_dns_outgoing(out: &DnsOutgoing, intf: &Interface, sock: &MioUdpSocket) -> Vec<Vec<u8>> {
3319 let qtype = if out.is_query() { "query" } else { "response" };
3320 trace!(
3321 "send outgoing {}: {} questions {} answers {} authorities {} additional",
3322 qtype,
3323 out.questions().len(),
3324 out.answers_count(),
3325 out.authorities().len(),
3326 out.additionals().len()
3327 );
3328 let packet_list = out.to_data_on_wire();
3329 for packet in packet_list.iter() {
3330 multicast_on_intf(packet, intf, sock);
3331 }
3332 packet_list
3333}
3334
3335fn multicast_on_intf(packet: &[u8], intf: &Interface, socket: &MioUdpSocket) {
3337 if packet.len() > MAX_MSG_ABSOLUTE {
3338 debug!("Drop over-sized packet ({})", packet.len());
3339 return;
3340 }
3341
3342 let addr: SocketAddr = match intf.addr {
3343 if_addrs::IfAddr::V4(_) => SocketAddrV4::new(GROUP_ADDR_V4, MDNS_PORT).into(),
3344 if_addrs::IfAddr::V6(_) => {
3345 let mut sock = SocketAddrV6::new(GROUP_ADDR_V6, MDNS_PORT, 0, 0);
3346 sock.set_scope_id(intf.index.unwrap_or(0)); sock.into()
3348 }
3349 };
3350
3351 send_packet(packet, addr, intf, socket);
3352}
3353
3354fn send_packet(packet: &[u8], addr: SocketAddr, intf: &Interface, sock: &MioUdpSocket) {
3356 match sock.send_to(packet, addr) {
3357 Ok(sz) => trace!("sent out {} bytes on interface {:?}", sz, intf),
3358 Err(e) => debug!("Failed to send to {} via {:?}: {}", addr, &intf, e),
3359 }
3360}
3361
3362fn valid_instance_name(name: &str) -> bool {
3366 name.split('.').count() >= 5
3367}
3368
3369fn notify_monitors(monitors: &mut Vec<Sender<DaemonEvent>>, event: DaemonEvent) {
3370 monitors.retain(|sender| {
3371 if let Err(e) = sender.try_send(event.clone()) {
3372 debug!("notify_monitors: try_send: {}", &e);
3373 if matches!(e, TrySendError::Disconnected(_)) {
3374 return false; }
3376 }
3377 true
3378 });
3379}
3380
3381fn prepare_announce(
3384 info: &ServiceInfo,
3385 intf: &Interface,
3386 dns_registry: &mut DnsRegistry,
3387) -> Option<DnsOutgoing> {
3388 let intf_addrs = info.get_addrs_on_intf(intf);
3389 if intf_addrs.is_empty() {
3390 trace!("No valid addrs to add on intf {:?}", &intf);
3391 return None;
3392 }
3393
3394 let service_fullname = match dns_registry.name_changes.get(info.get_fullname()) {
3396 Some(new_name) => new_name,
3397 None => info.get_fullname(),
3398 };
3399
3400 debug!(
3401 "prepare to announce service {service_fullname} on {}: {}",
3402 &intf.name,
3403 &intf.ip()
3404 );
3405
3406 let mut probing_count = 0;
3407 let mut out = DnsOutgoing::new(FLAGS_QR_RESPONSE | FLAGS_AA);
3408 let create_time = current_time_millis() + fastrand::u64(0..250);
3409
3410 out.add_answer_at_time(
3411 DnsPointer::new(
3412 info.get_type(),
3413 RRType::PTR,
3414 CLASS_IN,
3415 info.get_other_ttl(),
3416 service_fullname.to_string(),
3417 ),
3418 0,
3419 );
3420
3421 if let Some(sub) = info.get_subtype() {
3422 trace!("Adding subdomain {}", sub);
3423 out.add_answer_at_time(
3424 DnsPointer::new(
3425 sub,
3426 RRType::PTR,
3427 CLASS_IN,
3428 info.get_other_ttl(),
3429 service_fullname.to_string(),
3430 ),
3431 0,
3432 );
3433 }
3434
3435 let hostname = match dns_registry.name_changes.get(info.get_hostname()) {
3437 Some(new_name) => new_name.to_string(),
3438 None => info.get_hostname().to_string(),
3439 };
3440
3441 let mut srv = DnsSrv::new(
3442 info.get_fullname(),
3443 CLASS_IN | CLASS_CACHE_FLUSH,
3444 info.get_host_ttl(),
3445 info.get_priority(),
3446 info.get_weight(),
3447 info.get_port(),
3448 hostname,
3449 );
3450
3451 if let Some(new_name) = dns_registry.name_changes.get(info.get_fullname()) {
3452 srv.get_record_mut().set_new_name(new_name.to_string());
3453 }
3454
3455 if !info.requires_probe()
3456 || dns_registry.is_probing_done(&srv, info.get_fullname(), create_time)
3457 {
3458 out.add_answer_at_time(srv, 0);
3459 } else {
3460 probing_count += 1;
3461 }
3462
3463 let mut txt = DnsTxt::new(
3466 info.get_fullname(),
3467 CLASS_IN | CLASS_CACHE_FLUSH,
3468 info.get_other_ttl(),
3469 info.generate_txt(),
3470 );
3471
3472 if let Some(new_name) = dns_registry.name_changes.get(info.get_fullname()) {
3473 txt.get_record_mut().set_new_name(new_name.to_string());
3474 }
3475
3476 if !info.requires_probe()
3477 || dns_registry.is_probing_done(&txt, info.get_fullname(), create_time)
3478 {
3479 out.add_answer_at_time(txt, 0);
3480 } else {
3481 probing_count += 1;
3482 }
3483
3484 let hostname = info.get_hostname();
3487 for address in intf_addrs {
3488 let mut dns_addr = DnsAddress::new(
3489 hostname,
3490 ip_address_rr_type(&address),
3491 CLASS_IN | CLASS_CACHE_FLUSH,
3492 info.get_host_ttl(),
3493 address,
3494 intf.into(),
3495 );
3496
3497 if let Some(new_name) = dns_registry.name_changes.get(hostname) {
3498 dns_addr.get_record_mut().set_new_name(new_name.to_string());
3499 }
3500
3501 if !info.requires_probe()
3502 || dns_registry.is_probing_done(&dns_addr, info.get_fullname(), create_time)
3503 {
3504 out.add_answer_at_time(dns_addr, 0);
3505 } else {
3506 probing_count += 1;
3507 }
3508 }
3509
3510 if probing_count > 0 {
3511 return None;
3512 }
3513
3514 Some(out)
3515}
3516
3517fn announce_service_on_intf(
3520 dns_registry: &mut DnsRegistry,
3521 info: &ServiceInfo,
3522 intf: &Interface,
3523 sock: &MioUdpSocket,
3524) -> bool {
3525 if let Some(out) = prepare_announce(info, intf, dns_registry) {
3526 send_dns_outgoing(&out, intf, sock);
3527 return true;
3528 }
3529 false
3530}
3531
3532fn name_change(original: &str) -> String {
3540 let mut parts: Vec<_> = original.split('.').collect();
3541 let Some(first_part) = parts.get_mut(0) else {
3542 return format!("{original} (2)");
3543 };
3544
3545 let mut new_name = format!("{} (2)", first_part);
3546
3547 if let Some(paren_pos) = first_part.rfind(" (") {
3549 if let Some(end_paren) = first_part[paren_pos..].find(')') {
3551 let absolute_end_pos = paren_pos + end_paren;
3552 if absolute_end_pos == first_part.len() - 1 {
3554 let num_start = paren_pos + 2; if let Ok(number) = first_part[num_start..absolute_end_pos].parse::<u32>() {
3557 let base_name = &first_part[..paren_pos];
3558 new_name = format!("{} ({})", base_name, number + 1)
3559 }
3560 }
3561 }
3562 }
3563
3564 *first_part = &new_name;
3565 parts.join(".")
3566}
3567
3568fn hostname_change(original: &str) -> String {
3576 let mut parts: Vec<_> = original.split('.').collect();
3577 let Some(first_part) = parts.get_mut(0) else {
3578 return format!("{original}-2");
3579 };
3580
3581 let mut new_name = format!("{}-2", first_part);
3582
3583 if let Some(hyphen_pos) = first_part.rfind('-') {
3585 if let Ok(number) = first_part[hyphen_pos + 1..].parse::<u32>() {
3587 let base_name = &first_part[..hyphen_pos];
3588 new_name = format!("{}-{}", base_name, number + 1);
3589 }
3590 }
3591
3592 *first_part = &new_name;
3593 parts.join(".")
3594}
3595
3596fn add_answer_with_additionals(
3597 out: &mut DnsOutgoing,
3598 msg: &DnsIncoming,
3599 service: &ServiceInfo,
3600 intf: &Interface,
3601 dns_registry: &DnsRegistry,
3602) {
3603 let intf_addrs = service.get_addrs_on_intf(intf);
3604 if intf_addrs.is_empty() {
3605 trace!("No addrs on LAN of intf {:?}", intf);
3606 return;
3607 }
3608
3609 let service_fullname = match dns_registry.name_changes.get(service.get_fullname()) {
3611 Some(new_name) => new_name,
3612 None => service.get_fullname(),
3613 };
3614
3615 let hostname = match dns_registry.name_changes.get(service.get_hostname()) {
3616 Some(new_name) => new_name,
3617 None => service.get_hostname(),
3618 };
3619
3620 let ptr_added = out.add_answer(
3621 msg,
3622 DnsPointer::new(
3623 service.get_type(),
3624 RRType::PTR,
3625 CLASS_IN,
3626 service.get_other_ttl(),
3627 service_fullname.to_string(),
3628 ),
3629 );
3630
3631 if !ptr_added {
3632 trace!("answer was not added for msg {:?}", msg);
3633 return;
3634 }
3635
3636 if let Some(sub) = service.get_subtype() {
3637 trace!("Adding subdomain {}", sub);
3638 out.add_additional_answer(DnsPointer::new(
3639 sub,
3640 RRType::PTR,
3641 CLASS_IN,
3642 service.get_other_ttl(),
3643 service_fullname.to_string(),
3644 ));
3645 }
3646
3647 out.add_additional_answer(DnsSrv::new(
3650 service_fullname,
3651 CLASS_IN | CLASS_CACHE_FLUSH,
3652 service.get_host_ttl(),
3653 service.get_priority(),
3654 service.get_weight(),
3655 service.get_port(),
3656 hostname.to_string(),
3657 ));
3658
3659 out.add_additional_answer(DnsTxt::new(
3660 service_fullname,
3661 CLASS_IN | CLASS_CACHE_FLUSH,
3662 service.get_other_ttl(),
3663 service.generate_txt(),
3664 ));
3665
3666 for address in intf_addrs {
3667 out.add_additional_answer(DnsAddress::new(
3668 hostname,
3669 ip_address_rr_type(&address),
3670 CLASS_IN | CLASS_CACHE_FLUSH,
3671 service.get_host_ttl(),
3672 address,
3673 intf.into(),
3674 ));
3675 }
3676}
3677
3678#[cfg(test)]
3679mod tests {
3680 use super::{
3681 check_domain_suffix, check_service_name_length, hostname_change, my_ip_interfaces,
3682 name_change, new_socket_bind, send_dns_outgoing, valid_instance_name,
3683 HostnameResolutionEvent, ServiceDaemon, ServiceEvent, ServiceInfo, GROUP_ADDR_V4,
3684 MDNS_PORT,
3685 };
3686 use crate::{
3687 dns_parser::{
3688 DnsIncoming, DnsOutgoing, DnsPointer, InterfaceId, RRType, CLASS_IN, FLAGS_AA,
3689 FLAGS_QR_RESPONSE,
3690 },
3691 service_daemon::{add_answer_of_service, check_hostname},
3692 };
3693 use std::{
3694 net::{SocketAddr, SocketAddrV4},
3695 time::Duration,
3696 };
3697 use test_log::test;
3698
3699 #[test]
3700 fn test_socketaddr_print() {
3701 let addr: SocketAddr = SocketAddrV4::new(GROUP_ADDR_V4, MDNS_PORT).into();
3702 let print = format!("{}", addr);
3703 assert_eq!(print, "224.0.0.251:5353");
3704 }
3705
3706 #[test]
3707 fn test_instance_name() {
3708 assert!(valid_instance_name("my-laser._printer._tcp.local."));
3709 assert!(valid_instance_name("my-laser.._printer._tcp.local."));
3710 assert!(!valid_instance_name("_printer._tcp.local."));
3711 }
3712
3713 #[test]
3714 fn test_check_service_name_length() {
3715 let result = check_service_name_length("_tcp", 100);
3716 assert!(result.is_err());
3717 if let Err(e) = result {
3718 println!("{}", e);
3719 }
3720 }
3721
3722 #[test]
3723 fn test_check_hostname() {
3724 for hostname in &[
3726 "my_host.local.",
3727 &("A".repeat(255 - ".local.".len()) + ".local."),
3728 ] {
3729 let result = check_hostname(hostname);
3730 assert!(result.is_ok());
3731 }
3732
3733 for hostname in &[
3735 "my_host.local",
3736 ".local.",
3737 &("A".repeat(256 - ".local.".len()) + ".local."),
3738 ] {
3739 let result = check_hostname(hostname);
3740 assert!(result.is_err());
3741 if let Err(e) = result {
3742 println!("{}", e);
3743 }
3744 }
3745 }
3746
3747 #[test]
3748 fn test_check_domain_suffix() {
3749 assert!(check_domain_suffix("_missing_dot._tcp.local").is_err());
3750 assert!(check_domain_suffix("_missing_bar.tcp.local.").is_err());
3751 assert!(check_domain_suffix("_mis_spell._tpp.local.").is_err());
3752 assert!(check_domain_suffix("_mis_spell._upp.local.").is_err());
3753 assert!(check_domain_suffix("_has_dot._tcp.local.").is_ok());
3754 assert!(check_domain_suffix("_goodname._udp.local.").is_ok());
3755 }
3756
3757 #[test]
3758 fn test_service_with_temporarily_invalidated_ptr() {
3759 let d = ServiceDaemon::new().expect("Failed to create daemon");
3761
3762 let service = "_test_inval_ptr._udp.local.";
3763 let host_name = "my_host_tmp_invalidated_ptr.local.";
3764 let intfs: Vec<_> = my_ip_interfaces(false);
3765 let intf_ips: Vec<_> = intfs.iter().map(|intf| intf.ip()).collect();
3766 let port = 5201;
3767 let my_service =
3768 ServiceInfo::new(service, "my_instance", host_name, &intf_ips[..], port, None)
3769 .expect("invalid service info")
3770 .enable_addr_auto();
3771 let result = d.register(my_service.clone());
3772 assert!(result.is_ok());
3773
3774 let browse_chan = d.browse(service).unwrap();
3776 let timeout = Duration::from_secs(2);
3777 let mut resolved = false;
3778
3779 while let Ok(event) = browse_chan.recv_timeout(timeout) {
3780 match event {
3781 ServiceEvent::ServiceResolved(info) => {
3782 resolved = true;
3783 println!("Resolved a service of {}", &info.get_fullname());
3784 break;
3785 }
3786 e => {
3787 println!("Received event {:?}", e);
3788 }
3789 }
3790 }
3791
3792 assert!(resolved);
3793
3794 println!("Stopping browse of {}", service);
3795 d.stop_browse(service).unwrap();
3798
3799 let mut stopped = false;
3804 while let Ok(event) = browse_chan.recv_timeout(timeout) {
3805 match event {
3806 ServiceEvent::SearchStopped(_) => {
3807 stopped = true;
3808 println!("Stopped browsing service");
3809 break;
3810 }
3811 e => {
3815 println!("Received event {:?}", e);
3816 }
3817 }
3818 }
3819
3820 assert!(stopped);
3821
3822 let invalidate_ptr_packet = DnsPointer::new(
3824 my_service.get_type(),
3825 RRType::PTR,
3826 CLASS_IN,
3827 0,
3828 my_service.get_fullname().to_string(),
3829 );
3830
3831 let mut packet_buffer = DnsOutgoing::new(FLAGS_QR_RESPONSE | FLAGS_AA);
3832 packet_buffer.add_additional_answer(invalidate_ptr_packet);
3833
3834 for intf in intfs {
3835 let sock = new_socket_bind(&intf, true).unwrap();
3836 send_dns_outgoing(&packet_buffer, &intf, &sock);
3837 }
3838
3839 println!(
3840 "Sent PTR record invalidation. Starting second browse for {}",
3841 service
3842 );
3843
3844 let browse_chan = d.browse(service).unwrap();
3846
3847 resolved = false;
3848 while let Ok(event) = browse_chan.recv_timeout(timeout) {
3849 match event {
3850 ServiceEvent::ServiceResolved(info) => {
3851 resolved = true;
3852 println!("Resolved a service of {}", &info.get_fullname());
3853 break;
3854 }
3855 e => {
3856 println!("Received event {:?}", e);
3857 }
3858 }
3859 }
3860
3861 assert!(resolved);
3862 d.shutdown().unwrap();
3863 }
3864
3865 #[test]
3866 fn test_expired_srv() {
3867 let service_type = "_expired-srv._udp.local.";
3869 let instance = "test_instance";
3870 let host_name = "expired_srv_host.local.";
3871 let mut my_service = ServiceInfo::new(service_type, instance, host_name, "", 5023, None)
3872 .unwrap()
3873 .enable_addr_auto();
3874 let new_ttl = 3; my_service._set_host_ttl(new_ttl);
3879
3880 let mdns_server = ServiceDaemon::new().expect("Failed to create mdns server");
3882 let result = mdns_server.register(my_service);
3883 assert!(result.is_ok());
3884
3885 let mdns_client = ServiceDaemon::new().expect("Failed to create mdns client");
3886 let browse_chan = mdns_client.browse(service_type).unwrap();
3887 let timeout = Duration::from_secs(2);
3888 let mut resolved = false;
3889
3890 while let Ok(event) = browse_chan.recv_timeout(timeout) {
3891 match event {
3892 ServiceEvent::ServiceResolved(info) => {
3893 resolved = true;
3894 println!("Resolved a service of {}", &info.get_fullname());
3895 break;
3896 }
3897 _ => {}
3898 }
3899 }
3900
3901 assert!(resolved);
3902
3903 mdns_server.shutdown().unwrap();
3905
3906 let expire_timeout = Duration::from_secs(new_ttl as u64);
3908 while let Ok(event) = browse_chan.recv_timeout(expire_timeout) {
3909 match event {
3910 ServiceEvent::ServiceRemoved(service_type, full_name) => {
3911 println!("Service removed: {}: {}", &service_type, &full_name);
3912 break;
3913 }
3914 _ => {}
3915 }
3916 }
3917 }
3918
3919 #[test]
3920 fn test_hostname_resolution_address_removed() {
3921 let server = ServiceDaemon::new().expect("Failed to create server");
3923 let hostname = "addr_remove_host._tcp.local.";
3924 let service_ip_addr = my_ip_interfaces(false)
3925 .iter()
3926 .find(|iface| iface.ip().is_ipv4())
3927 .map(|iface| iface.ip())
3928 .unwrap();
3929
3930 let mut my_service = ServiceInfo::new(
3931 "_host_res_test._tcp.local.",
3932 "my_instance",
3933 hostname,
3934 &service_ip_addr,
3935 1234,
3936 None,
3937 )
3938 .expect("invalid service info");
3939
3940 let addr_ttl = 2;
3942 my_service._set_host_ttl(addr_ttl); server.register(my_service).unwrap();
3945
3946 let client = ServiceDaemon::new().expect("Failed to create client");
3948 let event_receiver = client.resolve_hostname(hostname, None).unwrap();
3949 let resolved = loop {
3950 match event_receiver.recv() {
3951 Ok(HostnameResolutionEvent::AddressesFound(found_hostname, addresses)) => {
3952 assert!(found_hostname == hostname);
3953 assert!(addresses.contains(&service_ip_addr));
3954 println!("address found: {:?}", &addresses);
3955 break true;
3956 }
3957 Ok(HostnameResolutionEvent::SearchStopped(_)) => break false,
3958 Ok(_event) => {}
3959 Err(_) => break false,
3960 }
3961 };
3962
3963 assert!(resolved);
3964
3965 server.shutdown().unwrap();
3967
3968 let timeout = Duration::from_secs(addr_ttl as u64 + 1);
3970 let removed = loop {
3971 match event_receiver.recv_timeout(timeout) {
3972 Ok(HostnameResolutionEvent::AddressesRemoved(removed_host, addresses)) => {
3973 assert!(removed_host == hostname);
3974 assert!(addresses.contains(&service_ip_addr));
3975
3976 println!(
3977 "address removed: hostname: {} addresses: {:?}",
3978 &hostname, &addresses
3979 );
3980 break true;
3981 }
3982 Ok(_event) => {}
3983 Err(_) => {
3984 break false;
3985 }
3986 }
3987 };
3988
3989 assert!(removed);
3990
3991 client.shutdown().unwrap();
3992 }
3993
3994 #[test]
3995 fn test_refresh_ptr() {
3996 let service_type = "_refresh-ptr._udp.local.";
3998 let instance = "test_instance";
3999 let host_name = "refresh_ptr_host.local.";
4000 let service_ip_addr = my_ip_interfaces(false)
4001 .iter()
4002 .find(|iface| iface.ip().is_ipv4())
4003 .map(|iface| iface.ip())
4004 .unwrap();
4005
4006 let mut my_service = ServiceInfo::new(
4007 service_type,
4008 instance,
4009 host_name,
4010 &service_ip_addr,
4011 5023,
4012 None,
4013 )
4014 .unwrap();
4015
4016 let new_ttl = 3; my_service._set_other_ttl(new_ttl);
4018
4019 let mdns_server = ServiceDaemon::new().expect("Failed to create mdns server");
4021 let result = mdns_server.register(my_service);
4022 assert!(result.is_ok());
4023
4024 let mdns_client = ServiceDaemon::new().expect("Failed to create mdns client");
4025 let browse_chan = mdns_client.browse(service_type).unwrap();
4026 let timeout = Duration::from_millis(1500); let mut resolved = false;
4028
4029 while let Ok(event) = browse_chan.recv_timeout(timeout) {
4031 match event {
4032 ServiceEvent::ServiceResolved(info) => {
4033 resolved = true;
4034 println!("Resolved a service of {}", &info.get_fullname());
4035 break;
4036 }
4037 _ => {}
4038 }
4039 }
4040
4041 assert!(resolved);
4042
4043 let timeout = Duration::from_millis(new_ttl as u64 * 1000 * 90 / 100);
4045 while let Ok(event) = browse_chan.recv_timeout(timeout) {
4046 println!("event: {:?}", &event);
4047 }
4048
4049 let metrics_chan = mdns_client.get_metrics().unwrap();
4051 let metrics = metrics_chan.recv_timeout(timeout).unwrap();
4052 let ptr_refresh_counter = metrics["cache-refresh-ptr"];
4053 assert_eq!(ptr_refresh_counter, 1);
4054 let srvtxt_refresh_counter = metrics["cache-refresh-srv-txt"];
4055 assert_eq!(srvtxt_refresh_counter, 1);
4056
4057 mdns_server.shutdown().unwrap();
4059 mdns_client.shutdown().unwrap();
4060 }
4061
4062 #[test]
4063 fn test_name_change() {
4064 assert_eq!(name_change("foo.local."), "foo (2).local.");
4065 assert_eq!(name_change("foo (2).local."), "foo (3).local.");
4066 assert_eq!(name_change("foo (9).local."), "foo (10).local.");
4067 assert_eq!(name_change("foo"), "foo (2)");
4068 assert_eq!(name_change("foo (2)"), "foo (3)");
4069 assert_eq!(name_change(""), " (2)");
4070
4071 assert_eq!(name_change("foo (abc)"), "foo (abc) (2)"); assert_eq!(name_change("foo (2"), "foo (2 (2)"); assert_eq!(name_change("foo (2) extra"), "foo (2) extra (2)"); }
4076
4077 #[test]
4078 fn test_hostname_change() {
4079 assert_eq!(hostname_change("foo.local."), "foo-2.local.");
4080 assert_eq!(hostname_change("foo"), "foo-2");
4081 assert_eq!(hostname_change("foo-2.local."), "foo-3.local.");
4082 assert_eq!(hostname_change("foo-9"), "foo-10");
4083 assert_eq!(hostname_change("test-42.domain."), "test-43.domain.");
4084 }
4085
4086 #[test]
4087 fn test_add_answer_txt_ttl() {
4088 let service_type = "_test_add_answer._udp.local.";
4090 let instance = "test_instance";
4091 let host_name = "add_answer_host.local.";
4092 let service_intf = my_ip_interfaces(false)
4093 .into_iter()
4094 .find(|iface| iface.ip().is_ipv4())
4095 .unwrap();
4096 let service_ip_addr = service_intf.ip();
4097 let my_service = ServiceInfo::new(
4098 service_type,
4099 instance,
4100 host_name,
4101 &service_ip_addr,
4102 5023,
4103 None,
4104 )
4105 .unwrap();
4106
4107 let mut out = DnsOutgoing::new(FLAGS_QR_RESPONSE | FLAGS_AA);
4109
4110 let mut dummy_data = out.to_data_on_wire();
4112 let interface_id = InterfaceId::from(&service_intf);
4113 let incoming = DnsIncoming::new(dummy_data.pop().unwrap(), interface_id).unwrap();
4114
4115 add_answer_of_service(
4117 &mut out,
4118 &incoming,
4119 instance,
4120 &my_service,
4121 RRType::TXT,
4122 &service_intf,
4123 );
4124
4125 assert!(
4127 out.answers_count() > 0,
4128 "No answers added to the outgoing message"
4129 );
4130
4131 let answer = out._answers().first().unwrap();
4133 assert_eq!(answer.0.get_type(), RRType::TXT);
4134
4135 assert_eq!(answer.0.get_record().get_ttl(), my_service.get_other_ttl());
4137 }
4138}