1#[cfg(feature = "logging")]
32use crate::log::{debug, trace};
33use crate::{
34 dns_cache::{current_time_millis, DnsCache},
35 dns_parser::{
36 ip_address_rr_type, DnsAddress, DnsEntryExt, DnsIncoming, DnsOutgoing, DnsPointer,
37 DnsRecordBox, DnsRecordExt, DnsSrv, DnsTxt, RRType, CLASS_CACHE_FLUSH, CLASS_IN, FLAGS_AA,
38 FLAGS_QR_QUERY, FLAGS_QR_RESPONSE, MAX_MSG_ABSOLUTE,
39 },
40 error::{e_fmt, Error, Result},
41 service_info::{
42 split_sub_domain, valid_ip_on_intf, DnsRegistry, Probe, ServiceInfo, ServiceStatus,
43 },
44 Receiver,
45};
46use flume::{bounded, Sender, TrySendError};
47use if_addrs::{IfAddr, Interface};
48use mio::{net::UdpSocket as MioUdpSocket, Poll};
49use socket2::Socket;
50use std::{
51 cmp::{self, Reverse},
52 collections::{BinaryHeap, HashMap, HashSet},
53 fmt,
54 net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6, UdpSocket},
55 str, thread,
56 time::Duration,
57 vec,
58};
59
60pub const SERVICE_NAME_LEN_MAX_DEFAULT: u8 = 15;
64
65pub const IP_CHECK_INTERVAL_IN_SECS_DEFAULT: u32 = 5;
67
68pub const VERIFY_TIMEOUT_DEFAULT: Duration = Duration::from_secs(10);
71
72const MDNS_PORT: u16 = 5353;
73const GROUP_ADDR_V4: Ipv4Addr = Ipv4Addr::new(224, 0, 0, 251);
74const GROUP_ADDR_V6: Ipv6Addr = Ipv6Addr::new(0xff02, 0, 0, 0, 0, 0, 0, 0xfb);
75const LOOPBACK_V4: Ipv4Addr = Ipv4Addr::new(127, 0, 0, 1);
76
77const RESOLVE_WAIT_IN_MILLIS: u64 = 500;
78
79#[derive(Debug)]
81pub enum UnregisterStatus {
82 OK,
84 NotFound,
86}
87
88#[derive(Debug, PartialEq, Clone, Eq)]
90#[non_exhaustive]
91pub enum DaemonStatus {
92 Running,
94
95 Shutdown,
97}
98
99#[derive(Hash, Eq, PartialEq)]
102enum Counter {
103 Register,
104 RegisterResend,
105 Unregister,
106 UnregisterResend,
107 Browse,
108 ResolveHostname,
109 Respond,
110 CacheRefreshPTR,
111 CacheRefreshSRV,
112 CacheRefreshAddr,
113 KnownAnswerSuppression,
114 CachedPTR,
115 CachedSRV,
116 CachedAddr,
117 CachedTxt,
118 CachedNSec,
119 CachedSubtype,
120 DnsRegistryProbe,
121 DnsRegistryActive,
122 DnsRegistryTimer,
123 DnsRegistryNameChange,
124 Timer,
125}
126
127impl fmt::Display for Counter {
128 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
129 match self {
130 Self::Register => write!(f, "register"),
131 Self::RegisterResend => write!(f, "register-resend"),
132 Self::Unregister => write!(f, "unregister"),
133 Self::UnregisterResend => write!(f, "unregister-resend"),
134 Self::Browse => write!(f, "browse"),
135 Self::ResolveHostname => write!(f, "resolve-hostname"),
136 Self::Respond => write!(f, "respond"),
137 Self::CacheRefreshPTR => write!(f, "cache-refresh-ptr"),
138 Self::CacheRefreshSRV => write!(f, "cache-refresh-srv"),
139 Self::CacheRefreshAddr => write!(f, "cache-refresh-addr"),
140 Self::KnownAnswerSuppression => write!(f, "known-answer-suppression"),
141 Self::CachedPTR => write!(f, "cached-ptr"),
142 Self::CachedSRV => write!(f, "cached-srv"),
143 Self::CachedAddr => write!(f, "cached-addr"),
144 Self::CachedTxt => write!(f, "cached-txt"),
145 Self::CachedNSec => write!(f, "cached-nsec"),
146 Self::CachedSubtype => write!(f, "cached-subtype"),
147 Self::DnsRegistryProbe => write!(f, "dns-registry-probe"),
148 Self::DnsRegistryActive => write!(f, "dns-registry-active"),
149 Self::DnsRegistryTimer => write!(f, "dns-registry-timer"),
150 Self::DnsRegistryNameChange => write!(f, "dns-registry-name-change"),
151 Self::Timer => write!(f, "timer"),
152 }
153 }
154}
155
156pub type Metrics = HashMap<String, i64>;
159
160const SIGNAL_SOCK_EVENT_KEY: usize = usize::MAX - 1; #[derive(Clone)]
166pub struct ServiceDaemon {
167 sender: Sender<Command>,
169
170 signal_addr: SocketAddr,
176}
177
178impl ServiceDaemon {
179 pub fn new() -> Result<Self> {
184 let signal_addr = SocketAddrV4::new(LOOPBACK_V4, 0);
187
188 let signal_sock = UdpSocket::bind(signal_addr)
189 .map_err(|e| e_fmt!("failed to create signal_sock for daemon: {}", e))?;
190
191 let signal_addr = signal_sock
193 .local_addr()
194 .map_err(|e| e_fmt!("failed to get signal sock addr: {}", e))?;
195
196 signal_sock
198 .set_nonblocking(true)
199 .map_err(|e| e_fmt!("failed to set nonblocking for signal socket: {}", e))?;
200
201 let poller = Poll::new().map_err(|e| e_fmt!("failed to create mio Poll: {e}"))?;
202
203 let (sender, receiver) = bounded(100);
204
205 let mio_sock = MioUdpSocket::from_std(signal_sock);
207 thread::Builder::new()
208 .name("mDNS_daemon".to_string())
209 .spawn(move || Self::daemon_thread(mio_sock, poller, receiver))
210 .map_err(|e| e_fmt!("thread builder failed to spawn: {}", e))?;
211
212 Ok(Self {
213 sender,
214 signal_addr,
215 })
216 }
217
218 fn send_cmd(&self, cmd: Command) -> Result<()> {
221 let cmd_name = cmd.to_string();
222
223 self.sender.try_send(cmd).map_err(|e| match e {
225 TrySendError::Full(_) => Error::Again,
226 e => e_fmt!("flume::channel::send failed: {}", e),
227 })?;
228
229 let addr = SocketAddrV4::new(LOOPBACK_V4, 0);
231 let socket = UdpSocket::bind(addr)
232 .map_err(|e| e_fmt!("Failed to create socket to send signal: {}", e))?;
233 socket
234 .send_to(cmd_name.as_bytes(), self.signal_addr)
235 .map_err(|e| {
236 e_fmt!(
237 "signal socket send_to {} ({}) failed: {}",
238 self.signal_addr,
239 cmd_name,
240 e
241 )
242 })?;
243
244 Ok(())
245 }
246
247 pub fn browse(&self, service_type: &str) -> Result<Receiver<ServiceEvent>> {
258 check_domain_suffix(service_type)?;
259
260 let (resp_s, resp_r) = bounded(10);
261 self.send_cmd(Command::Browse(service_type.to_string(), 1, resp_s))?;
262 Ok(resp_r)
263 }
264
265 pub fn stop_browse(&self, ty_domain: &str) -> Result<()> {
270 self.send_cmd(Command::StopBrowse(ty_domain.to_string()))
271 }
272
273 pub fn resolve_hostname(
281 &self,
282 hostname: &str,
283 timeout: Option<u64>,
284 ) -> Result<Receiver<HostnameResolutionEvent>> {
285 check_hostname(hostname)?;
286 let (resp_s, resp_r) = bounded(10);
287 self.send_cmd(Command::ResolveHostname(
288 hostname.to_string(),
289 1,
290 resp_s,
291 timeout,
292 ))?;
293 Ok(resp_r)
294 }
295
296 pub fn stop_resolve_hostname(&self, hostname: &str) -> Result<()> {
301 self.send_cmd(Command::StopResolveHostname(hostname.to_string()))
302 }
303
304 pub fn register(&self, service_info: ServiceInfo) -> Result<()> {
312 check_service_name(service_info.get_fullname())?;
313 check_hostname(service_info.get_hostname())?;
314
315 self.send_cmd(Command::Register(service_info))
316 }
317
318 pub fn unregister(&self, fullname: &str) -> Result<Receiver<UnregisterStatus>> {
326 let (resp_s, resp_r) = bounded(1);
327 self.send_cmd(Command::Unregister(fullname.to_lowercase(), resp_s))?;
328 Ok(resp_r)
329 }
330
331 pub fn monitor(&self) -> Result<Receiver<DaemonEvent>> {
335 let (resp_s, resp_r) = bounded(100);
336 self.send_cmd(Command::Monitor(resp_s))?;
337 Ok(resp_r)
338 }
339
340 pub fn shutdown(&self) -> Result<Receiver<DaemonStatus>> {
345 let (resp_s, resp_r) = bounded(1);
346 self.send_cmd(Command::Exit(resp_s))?;
347 Ok(resp_r)
348 }
349
350 pub fn status(&self) -> Result<Receiver<DaemonStatus>> {
356 let (resp_s, resp_r) = bounded(1);
357
358 if self.sender.is_disconnected() {
359 resp_s
360 .send(DaemonStatus::Shutdown)
361 .map_err(|e| e_fmt!("failed to send daemon status to the client: {}", e))?;
362 } else {
363 self.send_cmd(Command::GetStatus(resp_s))?;
364 }
365
366 Ok(resp_r)
367 }
368
369 pub fn get_metrics(&self) -> Result<Receiver<Metrics>> {
374 let (resp_s, resp_r) = bounded(1);
375 self.send_cmd(Command::GetMetrics(resp_s))?;
376 Ok(resp_r)
377 }
378
379 pub fn set_service_name_len_max(&self, len_max: u8) -> Result<()> {
386 const SERVICE_NAME_LEN_MAX_LIMIT: u8 = 30; if len_max > SERVICE_NAME_LEN_MAX_LIMIT {
389 return Err(Error::Msg(format!(
390 "service name length max {} is too large",
391 len_max
392 )));
393 }
394
395 self.send_cmd(Command::SetOption(DaemonOption::ServiceNameLenMax(len_max)))
396 }
397
398 pub fn set_ip_check_interval(&self, interval_in_secs: u32) -> Result<()> {
404 let interval_in_millis = interval_in_secs as u64 * 1000;
405 self.send_cmd(Command::SetOption(DaemonOption::IpCheckInterval(
406 interval_in_millis,
407 )))
408 }
409
410 pub fn get_ip_check_interval(&self) -> Result<u32> {
412 let (resp_s, resp_r) = bounded(1);
413 self.send_cmd(Command::GetOption(resp_s))?;
414
415 let option = resp_r
416 .recv_timeout(Duration::from_secs(10))
417 .map_err(|e| e_fmt!("failed to receive ip check interval: {}", e))?;
418 let ip_check_interval_in_secs = option.ip_check_interval / 1000;
419 Ok(ip_check_interval_in_secs as u32)
420 }
421
422 pub fn enable_interface(&self, if_kind: impl IntoIfKindVec) -> Result<()> {
429 let if_kind_vec = if_kind.into_vec();
430 self.send_cmd(Command::SetOption(DaemonOption::EnableInterface(
431 if_kind_vec.kinds,
432 )))
433 }
434
435 pub fn disable_interface(&self, if_kind: impl IntoIfKindVec) -> Result<()> {
442 let if_kind_vec = if_kind.into_vec();
443 self.send_cmd(Command::SetOption(DaemonOption::DisableInterface(
444 if_kind_vec.kinds,
445 )))
446 }
447
448 pub fn set_multicast_loop_v4(&self, on: bool) -> Result<()> {
464 self.send_cmd(Command::SetOption(DaemonOption::MulticastLoopV4(on)))
465 }
466
467 pub fn set_multicast_loop_v6(&self, on: bool) -> Result<()> {
483 self.send_cmd(Command::SetOption(DaemonOption::MulticastLoopV6(on)))
484 }
485
486 pub fn verify(&self, instance_fullname: String, timeout: Duration) -> Result<()> {
499 self.send_cmd(Command::Verify(instance_fullname, timeout))
500 }
501
502 fn daemon_thread(signal_sock: MioUdpSocket, poller: Poll, receiver: Receiver<Command>) {
503 let zc = Zeroconf::new(signal_sock, poller);
504
505 if let Some(cmd) = Self::run(zc, receiver) {
506 match cmd {
507 Command::Exit(resp_s) => {
508 if let Err(e) = resp_s.send(DaemonStatus::Shutdown) {
511 debug!("exit: failed to send response of shutdown: {}", e);
512 }
513 }
514 _ => {
515 debug!("Unexpected command: {:?}", cmd);
516 }
517 }
518 }
519 }
520
521 fn run(mut zc: Zeroconf, receiver: Receiver<Command>) -> Option<Command> {
530 if let Err(e) = zc.poller.registry().register(
532 &mut zc.signal_sock,
533 mio::Token(SIGNAL_SOCK_EVENT_KEY),
534 mio::Interest::READABLE,
535 ) {
536 debug!("failed to add signal socket to the poller: {}", e);
537 return None;
538 }
539
540 for (intf, sock) in zc.intf_socks.iter_mut() {
542 let key =
543 Zeroconf::add_poll_impl(&mut zc.poll_ids, &mut zc.poll_id_count, intf.clone());
544
545 if let Err(e) =
546 zc.poller
547 .registry()
548 .register(sock, mio::Token(key), mio::Interest::READABLE)
549 {
550 debug!("add socket of {:?} to poller: {e}", intf);
551 return None;
552 }
553 }
554
555 let mut next_ip_check = if zc.ip_check_interval > 0 {
557 current_time_millis() + zc.ip_check_interval
558 } else {
559 0
560 };
561
562 if next_ip_check > 0 {
563 zc.add_timer(next_ip_check);
564 }
565
566 let mut events = mio::Events::with_capacity(1024);
569 loop {
570 let now = current_time_millis();
571
572 let earliest_timer = zc.peek_earliest_timer();
573 let timeout = earliest_timer.map(|timer| {
574 let millis = if timer > now { timer - now } else { 1 };
576 Duration::from_millis(millis)
577 });
578
579 events.clear();
581 match zc.poller.poll(&mut events, timeout) {
582 Ok(_) => zc.handle_poller_events(&events),
583 Err(e) => debug!("failed to select from sockets: {}", e),
584 }
585
586 let now = current_time_millis();
587
588 zc.pop_timers_till(now);
590
591 for hostname in zc
593 .hostname_resolvers
594 .clone()
595 .into_iter()
596 .filter(|(_, (_, timeout))| timeout.map(|t| now >= t).unwrap_or(false))
597 .map(|(hostname, _)| hostname)
598 {
599 trace!("hostname resolver timeout for {}", &hostname);
600 call_hostname_resolution_listener(
601 &zc.hostname_resolvers,
602 &hostname,
603 HostnameResolutionEvent::SearchTimeout(hostname.to_owned()),
604 );
605 call_hostname_resolution_listener(
606 &zc.hostname_resolvers,
607 &hostname,
608 HostnameResolutionEvent::SearchStopped(hostname.to_owned()),
609 );
610 zc.hostname_resolvers.remove(&hostname);
611 }
612
613 while let Ok(command) = receiver.try_recv() {
615 if matches!(command, Command::Exit(_)) {
616 zc.status = DaemonStatus::Shutdown;
617 return Some(command);
618 }
619 zc.exec_command(command, false);
620 }
621
622 let mut i = 0;
624 while i < zc.retransmissions.len() {
625 if now >= zc.retransmissions[i].next_time {
626 let rerun = zc.retransmissions.remove(i);
627 zc.exec_command(rerun.command, true);
628 } else {
629 i += 1;
630 }
631 }
632
633 zc.refresh_active_services();
635
636 let mut query_count = 0;
638 for (hostname, _sender) in zc.hostname_resolvers.iter() {
639 for (hostname, ip_addr) in
640 zc.cache.refresh_due_hostname_resolutions(hostname).iter()
641 {
642 zc.send_query(hostname, ip_address_rr_type(ip_addr));
643 query_count += 1;
644 }
645 }
646
647 zc.increase_counter(Counter::CacheRefreshAddr, query_count);
648
649 let now = current_time_millis();
651
652 let expired_services = zc.cache.evict_expired_services(now);
654 zc.notify_service_removal(expired_services);
655
656 let expired_addrs = zc.cache.evict_expired_addr(now);
658 for (hostname, addrs) in expired_addrs {
659 call_hostname_resolution_listener(
660 &zc.hostname_resolvers,
661 &hostname,
662 HostnameResolutionEvent::AddressesRemoved(hostname.clone(), addrs),
663 );
664 let instances = zc.cache.get_instances_on_host(&hostname);
665 let instance_set: HashSet<String> = instances.into_iter().collect();
666 zc.resolve_updated_instances(&instance_set);
667 }
668
669 zc.probing_handler();
671
672 if now >= next_ip_check && next_ip_check > 0 {
674 next_ip_check = now + zc.ip_check_interval;
675 zc.add_timer(next_ip_check);
676
677 zc.check_ip_changes();
678 }
679 }
680 }
681}
682
683fn new_socket_bind(intf: &Interface, should_loop: bool) -> Result<MioUdpSocket> {
685 let intf_ip = &intf.ip();
688 match intf_ip {
689 IpAddr::V4(ip) => {
690 let addr = SocketAddrV4::new(Ipv4Addr::new(0, 0, 0, 0), MDNS_PORT);
691 let sock = new_socket(addr.into(), true)?;
692
693 sock.join_multicast_v4(&GROUP_ADDR_V4, ip)
695 .map_err(|e| e_fmt!("join multicast group on addr {}: {}", intf_ip, e))?;
696
697 sock.set_multicast_if_v4(ip)
699 .map_err(|e| e_fmt!("set multicast_if on addr {}: {}", ip, e))?;
700
701 if !should_loop {
702 sock.set_multicast_loop_v4(false)
703 .map_err(|e| e_fmt!("failed to set multicast loop v4 for {ip}: {e}"))?;
704 }
705
706 let multicast_addr = SocketAddrV4::new(GROUP_ADDR_V4, MDNS_PORT).into();
708 let test_packets = DnsOutgoing::new(0).to_data_on_wire();
709 for packet in test_packets {
710 sock.send_to(&packet, &multicast_addr)
711 .map_err(|e| e_fmt!("send multicast packet on addr {}: {}", ip, e))?;
712 }
713 Ok(MioUdpSocket::from_std(UdpSocket::from(sock)))
714 }
715 IpAddr::V6(ip) => {
716 let addr = SocketAddrV6::new(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 0), MDNS_PORT, 0, 0);
717 let sock = new_socket(addr.into(), true)?;
718
719 sock.join_multicast_v6(&GROUP_ADDR_V6, intf.index.unwrap_or(0))
721 .map_err(|e| e_fmt!("join multicast group on addr {}: {}", ip, e))?;
722
723 sock.set_multicast_if_v6(intf.index.unwrap_or(0))
725 .map_err(|e| e_fmt!("set multicast_if on addr {}: {}", ip, e))?;
726
727 Ok(MioUdpSocket::from_std(UdpSocket::from(sock)))
732 }
733 }
734}
735
736fn new_socket(addr: SocketAddr, non_block: bool) -> Result<Socket> {
739 let domain = match addr {
740 SocketAddr::V4(_) => socket2::Domain::IPV4,
741 SocketAddr::V6(_) => socket2::Domain::IPV6,
742 };
743
744 let fd = Socket::new(domain, socket2::Type::DGRAM, None)
745 .map_err(|e| e_fmt!("create socket failed: {}", e))?;
746
747 fd.set_reuse_address(true)
748 .map_err(|e| e_fmt!("set ReuseAddr failed: {}", e))?;
749 #[cfg(unix)] fd.set_reuse_port(true)
751 .map_err(|e| e_fmt!("set ReusePort failed: {}", e))?;
752
753 if non_block {
754 fd.set_nonblocking(true)
755 .map_err(|e| e_fmt!("set O_NONBLOCK: {}", e))?;
756 }
757
758 fd.bind(&addr.into())
759 .map_err(|e| e_fmt!("socket bind to {} failed: {}", &addr, e))?;
760
761 trace!("new socket bind to {}", &addr);
762 Ok(fd)
763}
764
765struct ReRun {
767 next_time: u64,
769 command: Command,
770}
771
772#[derive(Debug, Eq, Hash, PartialEq)]
774enum IpVersion {
775 V4,
776 V6,
777}
778
779#[derive(Debug, Eq, Hash, PartialEq)]
781struct MulticastSendTracker {
782 intf_index: u32,
783 ip_version: IpVersion,
784}
785
786fn multicast_send_tracker(intf: &Interface) -> Option<MulticastSendTracker> {
788 match intf.index {
789 Some(index) => {
790 let ip_ver = match intf.addr {
791 IfAddr::V4(_) => IpVersion::V4,
792 IfAddr::V6(_) => IpVersion::V6,
793 };
794 Some(MulticastSendTracker {
795 intf_index: index,
796 ip_version: ip_ver,
797 })
798 }
799 None => None,
800 }
801}
802
803#[derive(Debug, Clone)]
807#[non_exhaustive]
808pub enum IfKind {
809 All,
811
812 IPv4,
814
815 IPv6,
817
818 Name(String),
820
821 Addr(IpAddr),
823
824 LoopbackV4,
829
830 LoopbackV6,
832}
833
834impl IfKind {
835 fn matches(&self, intf: &Interface) -> bool {
837 match self {
838 Self::All => true,
839 Self::IPv4 => intf.ip().is_ipv4(),
840 Self::IPv6 => intf.ip().is_ipv6(),
841 Self::Name(ifname) => ifname == &intf.name,
842 Self::Addr(addr) => addr == &intf.ip(),
843 Self::LoopbackV4 => intf.is_loopback() && intf.ip().is_ipv4(),
844 Self::LoopbackV6 => intf.is_loopback() && intf.ip().is_ipv6(),
845 }
846 }
847}
848
849impl From<&str> for IfKind {
852 fn from(val: &str) -> Self {
853 Self::Name(val.to_string())
854 }
855}
856
857impl From<&String> for IfKind {
858 fn from(val: &String) -> Self {
859 Self::Name(val.to_string())
860 }
861}
862
863impl From<IpAddr> for IfKind {
865 fn from(val: IpAddr) -> Self {
866 Self::Addr(val)
867 }
868}
869
870pub struct IfKindVec {
872 kinds: Vec<IfKind>,
873}
874
875pub trait IntoIfKindVec {
877 fn into_vec(self) -> IfKindVec;
878}
879
880impl<T: Into<IfKind>> IntoIfKindVec for T {
881 fn into_vec(self) -> IfKindVec {
882 let if_kind: IfKind = self.into();
883 IfKindVec {
884 kinds: vec![if_kind],
885 }
886 }
887}
888
889impl<T: Into<IfKind>> IntoIfKindVec for Vec<T> {
890 fn into_vec(self) -> IfKindVec {
891 let kinds: Vec<IfKind> = self.into_iter().map(|x| x.into()).collect();
892 IfKindVec { kinds }
893 }
894}
895
896struct IfSelection {
898 if_kind: IfKind,
900
901 selected: bool,
903}
904
905struct Zeroconf {
907 intf_socks: HashMap<Interface, MioUdpSocket>,
909
910 poll_ids: HashMap<usize, Interface>,
912
913 poll_id_count: usize,
915
916 my_services: HashMap<String, ServiceInfo>,
918
919 cache: DnsCache,
921
922 dns_registry_map: HashMap<Interface, DnsRegistry>,
924
925 service_queriers: HashMap<String, Sender<ServiceEvent>>, hostname_resolvers: HashMap<String, (Sender<HostnameResolutionEvent>, Option<u64>)>, retransmissions: Vec<ReRun>,
936
937 counters: Metrics,
938
939 poller: Poll,
941
942 monitors: Vec<Sender<DaemonEvent>>,
944
945 service_name_len_max: u8,
947
948 ip_check_interval: u64,
950
951 if_selections: Vec<IfSelection>,
953
954 signal_sock: MioUdpSocket,
956
957 timers: BinaryHeap<Reverse<u64>>,
963
964 status: DaemonStatus,
965
966 pending_resolves: HashSet<String>,
968
969 resolved: HashSet<String>,
971
972 multicast_loop_v4: bool,
973
974 multicast_loop_v6: bool,
975}
976
977impl Zeroconf {
978 fn new(signal_sock: MioUdpSocket, poller: Poll) -> Self {
979 let my_ifaddrs = my_ip_interfaces(false);
981
982 let mut intf_socks = HashMap::new();
986 let mut dns_registry_map = HashMap::new();
987
988 for intf in my_ifaddrs {
989 let sock = match new_socket_bind(&intf, true) {
990 Ok(s) => s,
991 Err(e) => {
992 trace!("bind a socket to {}: {}. Skipped.", &intf.ip(), e);
993 continue;
994 }
995 };
996
997 dns_registry_map.insert(intf.clone(), DnsRegistry::new());
998
999 intf_socks.insert(intf, sock);
1000 }
1001
1002 let monitors = Vec::new();
1003 let service_name_len_max = SERVICE_NAME_LEN_MAX_DEFAULT;
1004 let ip_check_interval = IP_CHECK_INTERVAL_IN_SECS_DEFAULT as u64 * 1000;
1005
1006 let timers = BinaryHeap::new();
1007
1008 let if_selections = vec![
1010 IfSelection {
1011 if_kind: IfKind::LoopbackV4,
1012 selected: false,
1013 },
1014 IfSelection {
1015 if_kind: IfKind::LoopbackV6,
1016 selected: false,
1017 },
1018 ];
1019
1020 let status = DaemonStatus::Running;
1021
1022 Self {
1023 intf_socks,
1024 poll_ids: HashMap::new(),
1025 poll_id_count: 0,
1026 my_services: HashMap::new(),
1027 cache: DnsCache::new(),
1028 dns_registry_map,
1029 hostname_resolvers: HashMap::new(),
1030 service_queriers: HashMap::new(),
1031 retransmissions: Vec::new(),
1032 counters: HashMap::new(),
1033 poller,
1034 monitors,
1035 service_name_len_max,
1036 ip_check_interval,
1037 if_selections,
1038 signal_sock,
1039 timers,
1040 status,
1041 pending_resolves: HashSet::new(),
1042 resolved: HashSet::new(),
1043 multicast_loop_v4: true,
1044 multicast_loop_v6: true,
1045 }
1046 }
1047
1048 fn process_set_option(&mut self, daemon_opt: DaemonOption) {
1049 match daemon_opt {
1050 DaemonOption::ServiceNameLenMax(length) => self.service_name_len_max = length,
1051 DaemonOption::IpCheckInterval(interval) => self.ip_check_interval = interval,
1052 DaemonOption::EnableInterface(if_kind) => self.enable_interface(if_kind),
1053 DaemonOption::DisableInterface(if_kind) => self.disable_interface(if_kind),
1054 DaemonOption::MulticastLoopV4(on) => self.set_multicast_loop_v4(on),
1055 DaemonOption::MulticastLoopV6(on) => self.set_multicast_loop_v6(on),
1056 }
1057 }
1058
1059 fn enable_interface(&mut self, kinds: Vec<IfKind>) {
1060 for if_kind in kinds {
1061 self.if_selections.push(IfSelection {
1062 if_kind,
1063 selected: true,
1064 });
1065 }
1066
1067 self.apply_intf_selections(my_ip_interfaces(true));
1068 }
1069
1070 fn disable_interface(&mut self, kinds: Vec<IfKind>) {
1071 for if_kind in kinds {
1072 self.if_selections.push(IfSelection {
1073 if_kind,
1074 selected: false,
1075 });
1076 }
1077
1078 self.apply_intf_selections(my_ip_interfaces(true));
1079 }
1080
1081 fn set_multicast_loop_v4(&mut self, on: bool) {
1082 for (_, sock) in self.intf_socks.iter_mut() {
1083 if let Err(e) = sock.set_multicast_loop_v4(on) {
1084 debug!("failed to set multicast loop v4: {e}");
1085 }
1086 }
1087 }
1088
1089 fn set_multicast_loop_v6(&mut self, on: bool) {
1090 for (_, sock) in self.intf_socks.iter_mut() {
1091 if let Err(e) = sock.set_multicast_loop_v6(on) {
1092 debug!("failed to set multicast loop v6: {e}");
1093 }
1094 }
1095 }
1096
1097 fn notify_monitors(&mut self, event: DaemonEvent) {
1098 self.monitors.retain(|sender| {
1100 if let Err(e) = sender.try_send(event.clone()) {
1101 debug!("notify_monitors: try_send: {}", &e);
1102 if matches!(e, TrySendError::Disconnected(_)) {
1103 return false; }
1105 }
1106 true
1107 });
1108 }
1109
1110 fn del_addr_in_my_services(&mut self, addr: &IpAddr) {
1112 for (_, service_info) in self.my_services.iter_mut() {
1113 if service_info.is_addr_auto() {
1114 service_info.remove_ipaddr(addr);
1115 }
1116 }
1117 }
1118
1119 fn add_poll(&mut self, intf: Interface) -> usize {
1121 Self::add_poll_impl(&mut self.poll_ids, &mut self.poll_id_count, intf)
1122 }
1123
1124 fn add_poll_impl(
1128 poll_ids: &mut HashMap<usize, Interface>,
1129 poll_id_count: &mut usize,
1130 intf: Interface,
1131 ) -> usize {
1132 let key = *poll_id_count;
1133 *poll_id_count += 1;
1134 let _ = (*poll_ids).insert(key, intf);
1135 key
1136 }
1137
1138 fn add_timer(&mut self, next_time: u64) {
1139 self.timers.push(Reverse(next_time));
1140 }
1141
1142 fn peek_earliest_timer(&self) -> Option<u64> {
1143 self.timers.peek().map(|Reverse(v)| *v)
1144 }
1145
1146 fn _pop_earliest_timer(&mut self) -> Option<u64> {
1147 self.timers.pop().map(|Reverse(v)| v)
1148 }
1149
1150 fn pop_timers_till(&mut self, now: u64) {
1152 while let Some(Reverse(v)) = self.timers.peek() {
1153 if *v > now {
1154 break;
1155 }
1156 self.timers.pop();
1157 }
1158 }
1159
1160 fn selected_addrs(&self, interfaces: Vec<Interface>) -> HashSet<IpAddr> {
1162 let intf_count = interfaces.len();
1163 let mut intf_selections = vec![true; intf_count];
1164
1165 for selection in self.if_selections.iter() {
1167 for i in 0..intf_count {
1169 if selection.if_kind.matches(&interfaces[i]) {
1170 intf_selections[i] = selection.selected;
1171 }
1172 }
1173 }
1174
1175 let mut selected_addrs = HashSet::new();
1176 for i in 0..intf_count {
1177 if intf_selections[i] {
1178 selected_addrs.insert(interfaces[i].addr.ip());
1179 }
1180 }
1181
1182 selected_addrs
1183 }
1184
1185 fn apply_intf_selections(&mut self, interfaces: Vec<Interface>) {
1190 let intf_count = interfaces.len();
1192 let mut intf_selections = vec![true; intf_count];
1193
1194 for selection in self.if_selections.iter() {
1196 for i in 0..intf_count {
1198 if selection.if_kind.matches(&interfaces[i]) {
1199 intf_selections[i] = selection.selected;
1200 }
1201 }
1202 }
1203
1204 for (idx, intf) in interfaces.into_iter().enumerate() {
1206 if intf_selections[idx] {
1207 if !self.intf_socks.contains_key(&intf) {
1209 debug!("apply_intf_selections: add {:?}", &intf.ip());
1210 self.add_new_interface(intf);
1211 }
1212 } else {
1213 if let Some(mut sock) = self.intf_socks.remove(&intf) {
1215 match self.poller.registry().deregister(&mut sock) {
1216 Ok(()) => debug!("apply_intf_selections: deregister {:?}", &intf.ip()),
1217 Err(e) => debug!("apply_intf_selections: poller.delete {:?}: {}", &intf, e),
1218 }
1219
1220 self.poll_ids.retain(|_, v| v != &intf);
1222
1223 self.cache.remove_addrs_on_disabled_intf(&intf);
1225 }
1226 }
1227 }
1228 }
1229
1230 fn check_ip_changes(&mut self) {
1232 let my_ifaddrs = my_ip_interfaces(true);
1234
1235 let poll_ids = &mut self.poll_ids;
1236 let poller = &mut self.poller;
1237 let deleted_addrs = self
1239 .intf_socks
1240 .iter_mut()
1241 .filter_map(|(intf, sock)| {
1242 if !my_ifaddrs.contains(intf) {
1243 if let Err(e) = poller.registry().deregister(sock) {
1244 debug!("check_ip_changes: poller.delete {:?}: {}", intf, e);
1245 }
1246 poll_ids.retain(|_, v| v != intf);
1248 Some(intf.ip())
1249 } else {
1250 None
1251 }
1252 })
1253 .collect::<Vec<IpAddr>>();
1254
1255 for ip in deleted_addrs.iter() {
1257 self.del_addr_in_my_services(ip);
1258 self.notify_monitors(DaemonEvent::IpDel(*ip));
1259 }
1260
1261 self.intf_socks.retain(|intf, _| my_ifaddrs.contains(intf));
1263
1264 self.apply_intf_selections(my_ifaddrs);
1266 }
1267
1268 fn add_new_interface(&mut self, intf: Interface) {
1269 let new_ip = intf.ip();
1271 let should_loop = if new_ip.is_ipv4() {
1272 self.multicast_loop_v4
1273 } else {
1274 self.multicast_loop_v6
1275 };
1276 let mut sock = match new_socket_bind(&intf, should_loop) {
1277 Ok(s) => s,
1278 Err(e) => {
1279 debug!("bind a socket to {}: {}. Skipped.", &intf.ip(), e);
1280 return;
1281 }
1282 };
1283
1284 let key = self.add_poll(intf.clone());
1286 if let Err(e) =
1287 self.poller
1288 .registry()
1289 .register(&mut sock, mio::Token(key), mio::Interest::READABLE)
1290 {
1291 debug!("check_ip_changes: poller add ip {}: {}", new_ip, e);
1292 return;
1293 }
1294
1295 debug!("add new interface {}: {new_ip}", intf.name);
1296 let dns_registry = match self.dns_registry_map.get_mut(&intf) {
1297 Some(registry) => registry,
1298 None => self
1299 .dns_registry_map
1300 .entry(intf.clone())
1301 .or_insert_with(DnsRegistry::new),
1302 };
1303
1304 for (_, service_info) in self.my_services.iter_mut() {
1305 if service_info.is_addr_auto() {
1306 service_info.insert_ipaddr(new_ip);
1307
1308 if announce_service_on_intf(dns_registry, service_info, &intf, &sock) {
1309 debug!(
1310 "Announce service {} on {}",
1311 service_info.get_fullname(),
1312 intf.ip()
1313 );
1314 service_info.set_status(&intf, ServiceStatus::Announced);
1315 } else {
1316 for timer in dns_registry.new_timers.drain(..) {
1317 self.timers.push(Reverse(timer));
1318 }
1319 service_info.set_status(&intf, ServiceStatus::Probing);
1320 }
1321 }
1322 }
1323
1324 self.intf_socks.insert(intf, sock);
1325
1326 self.notify_monitors(DaemonEvent::IpAdd(new_ip));
1328 }
1329
1330 fn register_service(&mut self, mut info: ServiceInfo) {
1339 if let Err(e) = check_service_name_length(info.get_type(), self.service_name_len_max) {
1341 debug!("check_service_name_length: {}", &e);
1342 self.notify_monitors(DaemonEvent::Error(e));
1343 return;
1344 }
1345
1346 if info.is_addr_auto() {
1347 let selected_addrs = self.selected_addrs(my_ip_interfaces(true));
1348 for addr in selected_addrs {
1349 info.insert_ipaddr(addr);
1350 }
1351 }
1352
1353 debug!("register service {:?}", &info);
1354
1355 let outgoing_addrs = self.send_unsolicited_response(&mut info);
1356 if !outgoing_addrs.is_empty() {
1357 self.notify_monitors(DaemonEvent::Announce(
1358 info.get_fullname().to_string(),
1359 format!("{:?}", &outgoing_addrs),
1360 ));
1361 }
1362
1363 let service_fullname = info.get_fullname().to_lowercase();
1366 self.my_services.insert(service_fullname, info);
1367 }
1368
1369 fn send_unsolicited_response(&mut self, info: &mut ServiceInfo) -> Vec<IpAddr> {
1372 let mut outgoing_addrs = Vec::new();
1373 let mut multicast_sent_trackers = HashSet::new();
1375
1376 let mut outgoing_intfs = Vec::new();
1377
1378 for (intf, sock) in self.intf_socks.iter() {
1379 if let Some(tracker) = multicast_send_tracker(intf) {
1380 if multicast_sent_trackers.contains(&tracker) {
1381 continue; }
1383 }
1384
1385 let dns_registry = match self.dns_registry_map.get_mut(intf) {
1386 Some(registry) => registry,
1387 None => self
1388 .dns_registry_map
1389 .entry(intf.clone())
1390 .or_insert_with(DnsRegistry::new),
1391 };
1392
1393 if announce_service_on_intf(dns_registry, info, intf, sock) {
1394 if let Some(tracker) = multicast_send_tracker(intf) {
1395 multicast_sent_trackers.insert(tracker);
1396 }
1397 outgoing_addrs.push(intf.ip());
1398 outgoing_intfs.push(intf.clone());
1399
1400 debug!("Announce service {} on {}", info.get_fullname(), intf.ip());
1401
1402 info.set_status(intf, ServiceStatus::Announced);
1403 } else {
1404 for timer in dns_registry.new_timers.drain(..) {
1405 self.timers.push(Reverse(timer));
1406 }
1407 info.set_status(intf, ServiceStatus::Probing);
1408 }
1409 }
1410
1411 let next_time = current_time_millis() + 1000;
1415 for intf in outgoing_intfs {
1416 self.add_retransmission(
1417 next_time,
1418 Command::RegisterResend(info.get_fullname().to_string(), intf),
1419 );
1420 }
1421
1422 outgoing_addrs
1423 }
1424
1425 fn probing_handler(&mut self) {
1427 let now = current_time_millis();
1428
1429 for (intf, sock) in self.intf_socks.iter() {
1430 let Some(dns_registry) = self.dns_registry_map.get_mut(intf) else {
1431 continue;
1432 };
1433
1434 let mut expired_probe_names = Vec::new();
1435 let mut out = DnsOutgoing::new(FLAGS_QR_QUERY);
1436
1437 for (name, probe) in dns_registry.probing.iter_mut() {
1438 if now >= probe.next_send {
1439 if probe.expired(now) {
1440 expired_probe_names.push(name.clone());
1442 } else {
1443 out.add_question(name, RRType::ANY);
1444
1445 for record in probe.records.iter() {
1453 out.add_authority(record.clone());
1454 }
1455
1456 probe.update_next_send(now);
1457
1458 self.timers.push(Reverse(probe.next_send));
1460 }
1461 }
1462 }
1463
1464 if !out.questions().is_empty() {
1466 debug!("sending out probing of {} questions", out.questions().len());
1467 send_dns_outgoing(&out, intf, sock);
1468 }
1469
1470 let mut waiting_services = HashSet::new();
1471
1472 for name in expired_probe_names {
1473 let Some(probe) = dns_registry.probing.remove(&name) else {
1474 continue;
1475 };
1476
1477 for record in probe.records.iter() {
1479 if let Some(new_name) = record.get_record().get_new_name() {
1480 dns_registry
1481 .name_changes
1482 .insert(name.clone(), new_name.to_string());
1483
1484 let event = DnsNameChange {
1485 original: record.get_record().get_original_name().to_string(),
1486 new_name: new_name.to_string(),
1487 rr_type: record.get_type(),
1488 intf_name: intf.name.to_string(),
1489 };
1490 notify_monitors(&mut self.monitors, DaemonEvent::NameChange(event));
1491 }
1492 }
1493
1494 debug!(
1496 "probe of '{name}' finished: move {} records to active. ({} waiting services)",
1497 probe.records.len(),
1498 probe.waiting_services.len(),
1499 );
1500
1501 if !probe.records.is_empty() {
1503 match dns_registry.active.get_mut(&name) {
1504 Some(records) => {
1505 records.extend(probe.records);
1506 }
1507 None => {
1508 dns_registry.active.insert(name, probe.records);
1509 }
1510 }
1511
1512 waiting_services.extend(probe.waiting_services);
1513 }
1514 }
1515
1516 for service_name in waiting_services {
1518 debug!(
1519 "try to announce service {service_name} on intf {}",
1520 intf.ip()
1521 );
1522 if let Some(info) = self.my_services.get_mut(&service_name.to_lowercase()) {
1524 if info.get_status(intf) == ServiceStatus::Announced {
1525 debug!("service {} already announced", info.get_fullname());
1526 continue;
1527 }
1528
1529 if announce_service_on_intf(dns_registry, info, intf, sock) {
1530 let next_time = now + 1000;
1531 let command =
1532 Command::RegisterResend(info.get_fullname().to_string(), intf.clone());
1533 self.retransmissions.push(ReRun { next_time, command });
1534 self.timers.push(Reverse(next_time));
1535
1536 let fullname = match dns_registry.name_changes.get(&service_name) {
1537 Some(new_name) => new_name.to_string(),
1538 None => service_name.to_string(),
1539 };
1540
1541 let mut hostname = info.get_hostname();
1542 if let Some(new_name) = dns_registry.name_changes.get(hostname) {
1543 hostname = new_name;
1544 }
1545
1546 debug!("wake up: announce service {} on {}", fullname, intf.ip());
1547 notify_monitors(
1548 &mut self.monitors,
1549 DaemonEvent::Announce(fullname, format!("{}:{}", hostname, &intf.ip())),
1550 );
1551
1552 info.set_status(intf, ServiceStatus::Announced);
1553 }
1554 }
1555 }
1556 }
1557 }
1558
1559 fn unregister_service(
1560 &self,
1561 info: &ServiceInfo,
1562 intf: &Interface,
1563 sock: &MioUdpSocket,
1564 ) -> Vec<u8> {
1565 let mut out = DnsOutgoing::new(FLAGS_QR_RESPONSE | FLAGS_AA);
1566 out.add_answer_at_time(
1567 DnsPointer::new(
1568 info.get_type(),
1569 RRType::PTR,
1570 CLASS_IN,
1571 0,
1572 info.get_fullname().to_string(),
1573 ),
1574 0,
1575 );
1576
1577 if let Some(sub) = info.get_subtype() {
1578 trace!("Adding subdomain {}", sub);
1579 out.add_answer_at_time(
1580 DnsPointer::new(
1581 sub,
1582 RRType::PTR,
1583 CLASS_IN,
1584 0,
1585 info.get_fullname().to_string(),
1586 ),
1587 0,
1588 );
1589 }
1590
1591 out.add_answer_at_time(
1592 DnsSrv::new(
1593 info.get_fullname(),
1594 CLASS_IN | CLASS_CACHE_FLUSH,
1595 0,
1596 info.get_priority(),
1597 info.get_weight(),
1598 info.get_port(),
1599 info.get_hostname().to_string(),
1600 ),
1601 0,
1602 );
1603 out.add_answer_at_time(
1604 DnsTxt::new(
1605 info.get_fullname(),
1606 CLASS_IN | CLASS_CACHE_FLUSH,
1607 0,
1608 info.generate_txt(),
1609 ),
1610 0,
1611 );
1612
1613 for address in info.get_addrs_on_intf(intf) {
1614 out.add_answer_at_time(
1615 DnsAddress::new(
1616 info.get_hostname(),
1617 ip_address_rr_type(&address),
1618 CLASS_IN | CLASS_CACHE_FLUSH,
1619 0,
1620 address,
1621 intf.into(),
1622 ),
1623 0,
1624 );
1625 }
1626
1627 send_dns_outgoing(&out, intf, sock).remove(0)
1629 }
1630
1631 fn add_hostname_resolver(
1635 &mut self,
1636 hostname: String,
1637 listener: Sender<HostnameResolutionEvent>,
1638 timeout: Option<u64>,
1639 ) {
1640 let real_timeout = timeout.map(|t| current_time_millis() + t);
1641 self.hostname_resolvers
1642 .insert(hostname.to_lowercase(), (listener, real_timeout));
1643 if let Some(t) = real_timeout {
1644 self.add_timer(t);
1645 }
1646 }
1647
1648 fn send_query(&self, name: &str, qtype: RRType) {
1650 self.send_query_vec(&[(name, qtype)]);
1651 }
1652
1653 fn send_query_vec(&self, questions: &[(&str, RRType)]) {
1655 trace!("Sending query questions: {:?}", questions);
1656 let mut out = DnsOutgoing::new(FLAGS_QR_QUERY);
1657 let now = current_time_millis();
1658
1659 for (name, qtype) in questions {
1660 out.add_question(name, *qtype);
1661
1662 for record in self.cache.get_known_answers(name, *qtype, now) {
1663 trace!("add known answer: {:?}", record);
1671 let mut new_record = record.clone();
1672 new_record.get_record_mut().update_ttl(now);
1673 out.add_answer_box(new_record);
1674 }
1675 }
1676
1677 let mut multicast_sent_trackers = HashSet::new();
1679 for (intf, sock) in self.intf_socks.iter() {
1680 if let Some(tracker) = multicast_send_tracker(intf) {
1681 if multicast_sent_trackers.contains(&tracker) {
1682 continue; }
1684 multicast_sent_trackers.insert(tracker);
1685 }
1686 send_dns_outgoing(&out, intf, sock);
1687 }
1688 }
1689
1690 fn handle_read(&mut self, intf: &Interface) -> bool {
1695 let sock = match self.intf_socks.get_mut(intf) {
1696 Some(if_sock) => if_sock,
1697 None => return false,
1698 };
1699 let mut buf = vec![0u8; MAX_MSG_ABSOLUTE];
1700
1701 let sz = match sock.recv(&mut buf) {
1708 Ok(sz) => sz,
1709 Err(e) => {
1710 if e.kind() != std::io::ErrorKind::WouldBlock {
1711 debug!("listening socket read failed: {}", e);
1712 }
1713 return false;
1714 }
1715 };
1716
1717 trace!("received {} bytes at IP: {}", sz, intf.ip());
1718
1719 if sz == 0 {
1721 debug!("socket {:?} was likely shutdown", &sock);
1722 if let Err(e) = self.poller.registry().deregister(sock) {
1723 debug!("failed to remove sock {:?} from poller: {}", sock, &e);
1724 }
1725
1726 let should_loop = if intf.ip().is_ipv4() {
1728 self.multicast_loop_v4
1729 } else {
1730 self.multicast_loop_v6
1731 };
1732 match new_socket_bind(intf, should_loop) {
1733 Ok(new_sock) => {
1734 trace!("reset socket for IP {}", intf.ip());
1735 self.intf_socks.insert(intf.clone(), new_sock);
1736 }
1737 Err(e) => debug!("re-bind a socket to {:?}: {}", intf, e),
1738 }
1739
1740 return false;
1741 }
1742
1743 buf.truncate(sz); match DnsIncoming::new(buf, intf.into()) {
1746 Ok(msg) => {
1747 if msg.is_query() {
1748 self.handle_query(msg, intf);
1749 } else if msg.is_response() {
1750 self.handle_response(msg, intf);
1751 } else {
1752 debug!("Invalid message: not query and not response");
1753 }
1754 }
1755 Err(e) => debug!("Invalid incoming DNS message: {}", e),
1756 }
1757
1758 true
1759 }
1760
1761 fn query_unresolved(&mut self, instance: &str) -> bool {
1763 if !valid_instance_name(instance) {
1764 trace!("instance name {} not valid", instance);
1765 return false;
1766 }
1767
1768 if let Some(records) = self.cache.get_srv(instance) {
1769 for record in records {
1770 if let Some(srv) = record.any().downcast_ref::<DnsSrv>() {
1771 if self.cache.get_addr(srv.host()).is_none() {
1772 self.send_query_vec(&[(srv.host(), RRType::A), (srv.host(), RRType::AAAA)]);
1773 return true;
1774 }
1775 }
1776 }
1777 } else {
1778 self.send_query(instance, RRType::ANY);
1779 return true;
1780 }
1781
1782 false
1783 }
1784
1785 fn query_cache_for_service(&mut self, ty_domain: &str, sender: &Sender<ServiceEvent>) {
1788 let mut resolved: HashSet<String> = HashSet::new();
1789 let mut unresolved: HashSet<String> = HashSet::new();
1790
1791 if let Some(records) = self.cache.get_ptr(ty_domain) {
1792 for record in records.iter() {
1793 if let Some(ptr) = record.any().downcast_ref::<DnsPointer>() {
1794 let info = match self.create_service_info_from_cache(ty_domain, ptr.alias()) {
1795 Ok(ok) => ok,
1796 Err(err) => {
1797 debug!("Error while creating service info from cache: {}", err);
1798 continue;
1799 }
1800 };
1801
1802 match sender.send(ServiceEvent::ServiceFound(
1803 ty_domain.to_string(),
1804 ptr.alias().to_string(),
1805 )) {
1806 Ok(()) => debug!("send service found {}", ptr.alias()),
1807 Err(e) => {
1808 debug!("failed to send service found: {}", e);
1809 continue;
1810 }
1811 }
1812
1813 if info.is_ready() {
1814 resolved.insert(ptr.alias().to_string());
1815 match sender.send(ServiceEvent::ServiceResolved(info)) {
1816 Ok(()) => debug!("sent service resolved: {}", ptr.alias()),
1817 Err(e) => debug!("failed to send service resolved: {}", e),
1818 }
1819 } else {
1820 unresolved.insert(ptr.alias().to_string());
1821 }
1822 }
1823 }
1824 }
1825
1826 for instance in resolved.drain() {
1827 self.pending_resolves.remove(&instance);
1828 self.resolved.insert(instance);
1829 }
1830
1831 for instance in unresolved.drain() {
1832 self.add_pending_resolve(instance);
1833 }
1834 }
1835
1836 fn query_cache_for_hostname(
1839 &mut self,
1840 hostname: &str,
1841 sender: Sender<HostnameResolutionEvent>,
1842 ) {
1843 let addresses_map = self.cache.get_addresses_for_host(hostname);
1844 for (name, addresses) in addresses_map {
1845 match sender.send(HostnameResolutionEvent::AddressesFound(name, addresses)) {
1846 Ok(()) => trace!("sent hostname addresses found"),
1847 Err(e) => debug!("failed to send hostname addresses found: {}", e),
1848 }
1849 }
1850 }
1851
1852 fn add_pending_resolve(&mut self, instance: String) {
1853 if !self.pending_resolves.contains(&instance) {
1854 let next_time = current_time_millis() + RESOLVE_WAIT_IN_MILLIS;
1855 self.add_retransmission(next_time, Command::Resolve(instance.clone(), 1));
1856 self.pending_resolves.insert(instance);
1857 }
1858 }
1859
1860 fn create_service_info_from_cache(
1861 &self,
1862 ty_domain: &str,
1863 fullname: &str,
1864 ) -> Result<ServiceInfo> {
1865 let my_name = {
1866 let name = fullname.trim_end_matches(split_sub_domain(ty_domain).0);
1867 name.strip_suffix('.').unwrap_or(name).to_string()
1868 };
1869
1870 let now = current_time_millis();
1871 let mut info = ServiceInfo::new(ty_domain, &my_name, "", (), 0, None)?;
1872
1873 if let Some(subtype) = self.cache.get_subtype(fullname) {
1875 trace!(
1876 "ty_domain: {} found subtype {} for instance: {}",
1877 ty_domain,
1878 subtype,
1879 fullname
1880 );
1881 if info.get_subtype().is_none() {
1882 info.set_subtype(subtype.clone());
1883 }
1884 }
1885
1886 if let Some(records) = self.cache.get_srv(fullname) {
1888 if let Some(answer) = records.first() {
1889 if let Some(dns_srv) = answer.any().downcast_ref::<DnsSrv>() {
1890 info.set_hostname(dns_srv.host().to_string());
1891 info.set_port(dns_srv.port());
1892 }
1893 }
1894 }
1895
1896 if let Some(records) = self.cache.get_txt(fullname) {
1898 if let Some(record) = records.first() {
1899 if let Some(dns_txt) = record.any().downcast_ref::<DnsTxt>() {
1900 info.set_properties_from_txt(dns_txt.text());
1901 }
1902 }
1903 }
1904
1905 if let Some(records) = self.cache.get_addr(info.get_hostname()) {
1907 for answer in records.iter() {
1908 if let Some(dns_a) = answer.any().downcast_ref::<DnsAddress>() {
1909 if dns_a.get_record().is_expired(now) {
1910 trace!("Addr expired: {}", dns_a.address());
1911 } else {
1912 info.insert_ipaddr(dns_a.address());
1913 }
1914 }
1915 }
1916 }
1917
1918 Ok(info)
1919 }
1920
1921 fn handle_poller_events(&mut self, events: &mio::Events) {
1922 for ev in events.iter() {
1923 trace!("event received with key {:?}", ev.token());
1924 if ev.token().0 == SIGNAL_SOCK_EVENT_KEY {
1925 self.signal_sock_drain();
1927
1928 if let Err(e) = self.poller.registry().reregister(
1929 &mut self.signal_sock,
1930 ev.token(),
1931 mio::Interest::READABLE,
1932 ) {
1933 debug!("failed to modify poller for signal socket: {}", e);
1934 }
1935 continue; }
1937
1938 let intf = match self.poll_ids.get(&ev.token().0) {
1940 Some(interface) => interface.clone(),
1941 None => {
1942 debug!("Ip for event key {} not found", ev.token().0);
1943 break;
1944 }
1945 };
1946 while self.handle_read(&intf) {}
1947
1948 if let Some(sock) = self.intf_socks.get_mut(&intf) {
1950 if let Err(e) =
1951 self.poller
1952 .registry()
1953 .reregister(sock, ev.token(), mio::Interest::READABLE)
1954 {
1955 debug!("modify poller for interface {:?}: {}", &intf, e);
1956 break;
1957 }
1958 }
1959 }
1960 }
1961
1962 fn handle_response(&mut self, mut msg: DnsIncoming, intf: &Interface) {
1965 trace!(
1966 "handle_response: {} answers {} authorities {} additionals",
1967 msg.answers().len(),
1968 &msg.authorities().len(),
1969 &msg.num_additionals()
1970 );
1971 let now = current_time_millis();
1972
1973 let mut record_predicate = |record: &DnsRecordBox| {
1975 if !record.get_record().is_expired(now) {
1976 return true;
1977 }
1978
1979 debug!("record is expired, removing it from cache.");
1980 if self.cache.remove(record) {
1981 if let Some(dns_ptr) = record.any().downcast_ref::<DnsPointer>() {
1983 call_service_listener(
1984 &self.service_queriers,
1985 dns_ptr.get_name(),
1986 ServiceEvent::ServiceRemoved(
1987 dns_ptr.get_name().to_string(),
1988 dns_ptr.alias().to_string(),
1989 ),
1990 );
1991 }
1992 }
1993 false
1994 };
1995 msg.answers_mut().retain(&mut record_predicate);
1996 msg.authorities_mut().retain(&mut record_predicate);
1997 msg.additionals_mut().retain(&mut record_predicate);
1998
1999 self.conflict_handler(&msg, intf);
2001
2002 let mut is_for_us = true; for answer in msg.answers() {
2009 if answer.get_type() == RRType::PTR {
2010 if self.service_queriers.contains_key(answer.get_name()) {
2011 is_for_us = true;
2012 break; } else {
2014 is_for_us = false;
2015 }
2016 } else if answer.get_type() == RRType::A || answer.get_type() == RRType::AAAA {
2017 let answer_lowercase = answer.get_name().to_lowercase();
2019 if self.hostname_resolvers.contains_key(&answer_lowercase) {
2020 is_for_us = true;
2021 break; }
2023 }
2024 }
2025
2026 struct InstanceChange {
2028 ty: RRType, name: String, }
2031
2032 let mut changes = Vec::new();
2040 let mut timers = Vec::new();
2041 for record in msg.all_records() {
2042 match self
2043 .cache
2044 .add_or_update(intf, record, &mut timers, is_for_us)
2045 {
2046 Some((dns_record, true)) => {
2047 timers.push(dns_record.get_record().get_expire_time());
2048 timers.push(dns_record.get_record().get_refresh_time());
2049
2050 let ty = dns_record.get_type();
2051 let name = dns_record.get_name();
2052 if ty == RRType::PTR {
2053 if self.service_queriers.contains_key(name) {
2054 timers.push(dns_record.get_record().get_refresh_time());
2055 }
2056
2057 if let Some(dns_ptr) = dns_record.any().downcast_ref::<DnsPointer>() {
2059 call_service_listener(
2060 &self.service_queriers,
2061 name,
2062 ServiceEvent::ServiceFound(
2063 name.to_string(),
2064 dns_ptr.alias().to_string(),
2065 ),
2066 );
2067 changes.push(InstanceChange {
2068 ty,
2069 name: dns_ptr.alias().to_string(),
2070 });
2071 }
2072 } else {
2073 changes.push(InstanceChange {
2074 ty,
2075 name: name.to_string(),
2076 });
2077 }
2078 }
2079 Some((dns_record, false)) => {
2080 timers.push(dns_record.get_record().get_expire_time());
2081 timers.push(dns_record.get_record().get_refresh_time());
2082 }
2083 _ => {}
2084 }
2085 }
2086
2087 for t in timers {
2089 self.add_timer(t);
2090 }
2091
2092 for change in changes
2094 .iter()
2095 .filter(|change| change.ty == RRType::A || change.ty == RRType::AAAA)
2096 {
2097 let addr_map = self.cache.get_addresses_for_host(&change.name);
2098 for (name, addresses) in addr_map {
2099 call_hostname_resolution_listener(
2100 &self.hostname_resolvers,
2101 &change.name,
2102 HostnameResolutionEvent::AddressesFound(name, addresses),
2103 )
2104 }
2105 }
2106
2107 let mut updated_instances = HashSet::new();
2109 for update in changes {
2110 match update.ty {
2111 RRType::PTR | RRType::SRV | RRType::TXT => {
2112 updated_instances.insert(update.name);
2113 }
2114 RRType::A | RRType::AAAA => {
2115 let instances = self.cache.get_instances_on_host(&update.name);
2116 updated_instances.extend(instances);
2117 }
2118 _ => {}
2119 }
2120 }
2121
2122 self.resolve_updated_instances(&updated_instances);
2123 }
2124
2125 fn conflict_handler(&mut self, msg: &DnsIncoming, intf: &Interface) {
2126 let Some(dns_registry) = self.dns_registry_map.get_mut(intf) else {
2127 return;
2128 };
2129
2130 for answer in msg.answers().iter() {
2131 let mut new_records = Vec::new();
2132
2133 let name = answer.get_name();
2134 let Some(probe) = dns_registry.probing.get_mut(name) else {
2135 continue;
2136 };
2137
2138 if answer.get_type() == RRType::A || answer.get_type() == RRType::AAAA {
2140 if let Some(answer_addr) = answer.any().downcast_ref::<DnsAddress>() {
2141 if !valid_ip_on_intf(&answer_addr.address(), intf) {
2142 debug!(
2143 "conflict handler: answer addr {:?} not in the subnet of {:?}",
2144 answer_addr, intf
2145 );
2146 continue;
2147 }
2148 }
2149
2150 let any_match = probe.records.iter().any(|r| {
2153 r.get_type() == answer.get_type()
2154 && r.get_class() == answer.get_class()
2155 && r.rrdata_match(answer.as_ref())
2156 });
2157 if any_match {
2158 continue; }
2160 }
2161
2162 probe.records.retain(|record| {
2163 if record.get_type() == answer.get_type()
2164 && record.get_class() == answer.get_class()
2165 && !record.rrdata_match(answer.as_ref())
2166 {
2167 debug!(
2168 "found conflict name: '{name}' record: {}: {} PEER: {}",
2169 record.get_type(),
2170 record.rdata_print(),
2171 answer.rdata_print()
2172 );
2173
2174 let mut new_record = record.clone();
2177 let new_name = match record.get_type() {
2178 RRType::A => hostname_change(name),
2179 RRType::AAAA => hostname_change(name),
2180 _ => name_change(name),
2181 };
2182 new_record.get_record_mut().set_new_name(new_name);
2183 new_records.push(new_record);
2184 return false; }
2186
2187 true
2188 });
2189
2190 let create_time = current_time_millis() + fastrand::u64(0..250);
2197
2198 let waiting_services = probe.waiting_services.clone();
2199
2200 for record in new_records {
2201 if dns_registry.update_hostname(name, record.get_name(), create_time) {
2202 self.timers.push(Reverse(create_time));
2203 }
2204
2205 dns_registry.name_changes.insert(
2207 record.get_record().get_original_name().to_string(),
2208 record.get_name().to_string(),
2209 );
2210
2211 let new_probe = match dns_registry.probing.get_mut(record.get_name()) {
2212 Some(p) => p,
2213 None => {
2214 let new_probe = dns_registry
2215 .probing
2216 .entry(record.get_name().to_string())
2217 .or_insert_with(|| {
2218 debug!("conflict handler: new probe of {}", record.get_name());
2219 Probe::new(create_time)
2220 });
2221 self.timers.push(Reverse(new_probe.next_send));
2222 new_probe
2223 }
2224 };
2225
2226 debug!(
2227 "insert record with new name '{}' {} into probe",
2228 record.get_name(),
2229 record.get_type()
2230 );
2231 new_probe.insert_record(record);
2232
2233 new_probe.waiting_services.extend(waiting_services.clone());
2234 }
2235 }
2236 }
2237
2238 fn resolve_updated_instances(&mut self, updated_instances: &HashSet<String>) {
2245 let mut resolved: HashSet<String> = HashSet::new();
2246 let mut unresolved: HashSet<String> = HashSet::new();
2247 let mut removed_instances = HashMap::new();
2248
2249 for (ty_domain, records) in self.cache.all_ptr().iter() {
2250 if !self.service_queriers.contains_key(ty_domain) {
2251 continue;
2253 }
2254
2255 for record in records.iter() {
2256 if let Some(dns_ptr) = record.any().downcast_ref::<DnsPointer>() {
2257 if updated_instances.contains(dns_ptr.alias()) {
2258 if let Ok(info) =
2259 self.create_service_info_from_cache(ty_domain, dns_ptr.alias())
2260 {
2261 if info.is_ready() {
2262 debug!("call queriers to resolve {}", dns_ptr.alias());
2263 resolved.insert(dns_ptr.alias().to_string());
2264 call_service_listener(
2265 &self.service_queriers,
2266 ty_domain,
2267 ServiceEvent::ServiceResolved(info),
2268 );
2269 } else {
2270 if self.resolved.remove(dns_ptr.alias()) {
2271 removed_instances
2272 .entry(ty_domain.to_string())
2273 .or_insert_with(HashSet::new)
2274 .insert(dns_ptr.alias().to_string());
2275 }
2276 unresolved.insert(dns_ptr.alias().to_string());
2277 }
2278 }
2279 }
2280 }
2281 }
2282 }
2283
2284 for instance in resolved.drain() {
2285 self.pending_resolves.remove(&instance);
2286 self.resolved.insert(instance);
2287 }
2288
2289 for instance in unresolved.drain() {
2290 self.add_pending_resolve(instance);
2291 }
2292
2293 self.notify_service_removal(removed_instances);
2294 }
2295
2296 fn handle_query(&mut self, msg: DnsIncoming, intf: &Interface) {
2298 let sock = match self.intf_socks.get(intf) {
2299 Some(sock) => sock,
2300 None => return,
2301 };
2302 let mut out = DnsOutgoing::new(FLAGS_QR_RESPONSE | FLAGS_AA);
2303
2304 const META_QUERY: &str = "_services._dns-sd._udp.local.";
2307
2308 let Some(dns_registry) = self.dns_registry_map.get_mut(intf) else {
2309 debug!("missing dns registry for intf {}", intf.ip());
2310 return;
2311 };
2312
2313 for question in msg.questions().iter() {
2314 trace!("query question: {:?}", &question);
2315
2316 let qtype = question.entry_type();
2317
2318 if qtype == RRType::PTR {
2319 for service in self.my_services.values() {
2320 if service.get_status(intf) != ServiceStatus::Announced {
2321 continue;
2322 }
2323
2324 if question.entry_name() == service.get_type()
2325 || service
2326 .get_subtype()
2327 .as_ref()
2328 .is_some_and(|v| v == question.entry_name())
2329 {
2330 add_answer_with_additionals(&mut out, &msg, service, intf, dns_registry);
2331 } else if question.entry_name() == META_QUERY {
2332 let ptr_added = out.add_answer(
2333 &msg,
2334 DnsPointer::new(
2335 question.entry_name(),
2336 RRType::PTR,
2337 CLASS_IN,
2338 service.get_other_ttl(),
2339 service.get_type().to_string(),
2340 ),
2341 );
2342 if !ptr_added {
2343 trace!("answer was not added for meta-query {:?}", &question);
2344 }
2345 }
2346 }
2347 } else {
2348 if qtype == RRType::ANY && msg.num_authorities() > 0 {
2350 let probe_name = question.entry_name();
2351
2352 if let Some(probe) = dns_registry.probing.get_mut(probe_name) {
2353 let now = current_time_millis();
2354
2355 if probe.start_time < now {
2359 let incoming_records: Vec<_> = msg
2360 .authorities()
2361 .iter()
2362 .filter(|r| r.get_name() == probe_name)
2363 .collect();
2364
2365 probe.tiebreaking(&incoming_records, now, probe_name);
2366 }
2367 }
2368 }
2369
2370 if qtype == RRType::A || qtype == RRType::AAAA || qtype == RRType::ANY {
2371 for service in self.my_services.values() {
2372 if service.get_status(intf) != ServiceStatus::Announced {
2373 continue;
2374 }
2375
2376 let service_hostname =
2377 match dns_registry.name_changes.get(service.get_hostname()) {
2378 Some(new_name) => new_name,
2379 None => service.get_hostname(),
2380 };
2381
2382 if service_hostname.to_lowercase() == question.entry_name().to_lowercase() {
2383 let intf_addrs = service.get_addrs_on_intf(intf);
2384 if intf_addrs.is_empty()
2385 && (qtype == RRType::A || qtype == RRType::AAAA)
2386 {
2387 let t = match qtype {
2388 RRType::A => "TYPE_A",
2389 RRType::AAAA => "TYPE_AAAA",
2390 _ => "invalid_type",
2391 };
2392 trace!(
2393 "Cannot find valid addrs for {} response on intf {:?}",
2394 t,
2395 &intf
2396 );
2397 return;
2398 }
2399 for address in intf_addrs {
2400 out.add_answer(
2401 &msg,
2402 DnsAddress::new(
2403 service_hostname,
2404 ip_address_rr_type(&address),
2405 CLASS_IN | CLASS_CACHE_FLUSH,
2406 service.get_host_ttl(),
2407 address,
2408 intf.into(),
2409 ),
2410 );
2411 }
2412 }
2413 }
2414 }
2415
2416 let query_name = question.entry_name().to_lowercase();
2417 let service_opt = self
2418 .my_services
2419 .iter()
2420 .find(|(k, _v)| {
2421 let service_name = match dns_registry.name_changes.get(k.as_str()) {
2422 Some(new_name) => new_name,
2423 None => k,
2424 };
2425 service_name == &query_name
2426 })
2427 .map(|(_, v)| v);
2428
2429 let Some(service) = service_opt else {
2430 continue;
2431 };
2432
2433 if service.get_status(intf) != ServiceStatus::Announced {
2434 continue;
2435 }
2436
2437 if qtype == RRType::SRV || qtype == RRType::ANY {
2438 out.add_answer(
2439 &msg,
2440 DnsSrv::new(
2441 question.entry_name(),
2442 CLASS_IN | CLASS_CACHE_FLUSH,
2443 service.get_host_ttl(),
2444 service.get_priority(),
2445 service.get_weight(),
2446 service.get_port(),
2447 service.get_hostname().to_string(),
2448 ),
2449 );
2450 }
2451
2452 if qtype == RRType::TXT || qtype == RRType::ANY {
2453 out.add_answer(
2454 &msg,
2455 DnsTxt::new(
2456 question.entry_name(),
2457 CLASS_IN | CLASS_CACHE_FLUSH,
2458 service.get_host_ttl(),
2459 service.generate_txt(),
2460 ),
2461 );
2462 }
2463
2464 if qtype == RRType::SRV {
2465 let intf_addrs = service.get_addrs_on_intf(intf);
2466 if intf_addrs.is_empty() {
2467 debug!(
2468 "Cannot find valid addrs for TYPE_SRV response on intf {:?}",
2469 &intf
2470 );
2471 return;
2472 }
2473 for address in intf_addrs {
2474 out.add_additional_answer(DnsAddress::new(
2475 service.get_hostname(),
2476 ip_address_rr_type(&address),
2477 CLASS_IN | CLASS_CACHE_FLUSH,
2478 service.get_host_ttl(),
2479 address,
2480 intf.into(),
2481 ));
2482 }
2483 }
2484 }
2485 }
2486
2487 if !out.answers_count() > 0 {
2488 out.set_id(msg.id());
2489 send_dns_outgoing(&out, intf, sock);
2490
2491 self.increase_counter(Counter::Respond, 1);
2492 self.notify_monitors(DaemonEvent::Respond(intf.ip()));
2493 }
2494
2495 self.increase_counter(Counter::KnownAnswerSuppression, out.known_answer_count());
2496 }
2497
2498 fn increase_counter(&mut self, counter: Counter, count: i64) {
2500 let key = counter.to_string();
2501 match self.counters.get_mut(&key) {
2502 Some(v) => *v += count,
2503 None => {
2504 self.counters.insert(key, count);
2505 }
2506 }
2507 }
2508
2509 fn set_counter(&mut self, counter: Counter, count: i64) {
2511 let key = counter.to_string();
2512 self.counters.insert(key, count);
2513 }
2514
2515 fn signal_sock_drain(&self) {
2516 let mut signal_buf = [0; 1024];
2517
2518 while let Ok(sz) = self.signal_sock.recv(&mut signal_buf) {
2520 trace!(
2521 "signal socket recvd: {}",
2522 String::from_utf8_lossy(&signal_buf[0..sz])
2523 );
2524 }
2525 }
2526
2527 fn add_retransmission(&mut self, next_time: u64, command: Command) {
2528 self.retransmissions.push(ReRun { next_time, command });
2529 self.add_timer(next_time);
2530 }
2531
2532 fn notify_service_removal(&self, expired: HashMap<String, HashSet<String>>) {
2534 for (ty_domain, sender) in self.service_queriers.iter() {
2535 if let Some(instances) = expired.get(ty_domain) {
2536 for instance_name in instances {
2537 let event = ServiceEvent::ServiceRemoved(
2538 ty_domain.to_string(),
2539 instance_name.to_string(),
2540 );
2541 match sender.send(event) {
2542 Ok(()) => debug!("notify_service_removal: sent ServiceRemoved to listener of {ty_domain}: {instance_name}"),
2543 Err(e) => debug!("Failed to send event: {}", e),
2544 }
2545 }
2546 }
2547 }
2548 }
2549
2550 fn exec_command(&mut self, command: Command, repeating: bool) {
2554 match command {
2555 Command::Browse(ty, next_delay, listener) => {
2556 self.exec_command_browse(repeating, ty, next_delay, listener);
2557 }
2558
2559 Command::ResolveHostname(hostname, next_delay, listener, timeout) => {
2560 self.exec_command_resolve_hostname(
2561 repeating, hostname, next_delay, listener, timeout,
2562 );
2563 }
2564
2565 Command::Register(service_info) => {
2566 self.register_service(service_info);
2567 self.increase_counter(Counter::Register, 1);
2568 }
2569
2570 Command::RegisterResend(fullname, intf) => {
2571 trace!("register-resend service: {fullname} on {:?}", &intf.addr);
2572 self.exec_command_register_resend(fullname, intf);
2573 }
2574
2575 Command::Unregister(fullname, resp_s) => {
2576 trace!("unregister service {} repeat {}", &fullname, &repeating);
2577 self.exec_command_unregister(repeating, fullname, resp_s);
2578 }
2579
2580 Command::UnregisterResend(packet, ip) => {
2581 self.exec_command_unregister_resend(packet, ip);
2582 }
2583
2584 Command::StopBrowse(ty_domain) => self.exec_command_stop_browse(ty_domain),
2585
2586 Command::StopResolveHostname(hostname) => {
2587 self.exec_command_stop_resolve_hostname(hostname.to_lowercase())
2588 }
2589
2590 Command::Resolve(instance, try_count) => self.exec_command_resolve(instance, try_count),
2591
2592 Command::GetMetrics(resp_s) => self.exec_command_get_metrics(resp_s),
2593
2594 Command::GetStatus(resp_s) => match resp_s.send(self.status.clone()) {
2595 Ok(()) => trace!("Sent status to the client"),
2596 Err(e) => debug!("Failed to send status: {}", e),
2597 },
2598
2599 Command::Monitor(resp_s) => {
2600 self.monitors.push(resp_s);
2601 }
2602
2603 Command::SetOption(daemon_opt) => {
2604 self.process_set_option(daemon_opt);
2605 }
2606
2607 Command::GetOption(resp_s) => {
2608 let val = DaemonOptionVal {
2609 _service_name_len_max: self.service_name_len_max,
2610 ip_check_interval: self.ip_check_interval,
2611 };
2612 if let Err(e) = resp_s.send(val) {
2613 debug!("Failed to send options: {}", e);
2614 }
2615 }
2616
2617 Command::Verify(instance_fullname, timeout) => {
2618 self.exec_command_verify(instance_fullname, timeout, repeating);
2619 }
2620
2621 _ => {
2622 debug!("unexpected command: {:?}", &command);
2623 }
2624 }
2625 }
2626
2627 fn exec_command_get_metrics(&mut self, resp_s: Sender<HashMap<String, i64>>) {
2628 self.set_counter(Counter::CachedPTR, self.cache.ptr_count() as i64);
2629 self.set_counter(Counter::CachedSRV, self.cache.srv_count() as i64);
2630 self.set_counter(Counter::CachedAddr, self.cache.addr_count() as i64);
2631 self.set_counter(Counter::CachedTxt, self.cache.txt_count() as i64);
2632 self.set_counter(Counter::CachedNSec, self.cache.nsec_count() as i64);
2633 self.set_counter(Counter::CachedSubtype, self.cache.subtype_count() as i64);
2634 self.set_counter(Counter::Timer, self.timers.len() as i64);
2635
2636 let dns_registry_probe_count: usize = self
2637 .dns_registry_map
2638 .values()
2639 .map(|r| r.probing.len())
2640 .sum();
2641 self.set_counter(Counter::DnsRegistryProbe, dns_registry_probe_count as i64);
2642
2643 let dns_registry_active_count: usize = self
2644 .dns_registry_map
2645 .values()
2646 .map(|r| r.active.values().map(|a| a.len()).sum::<usize>())
2647 .sum();
2648 self.set_counter(Counter::DnsRegistryActive, dns_registry_active_count as i64);
2649
2650 let dns_registry_timer_count: usize = self
2651 .dns_registry_map
2652 .values()
2653 .map(|r| r.new_timers.len())
2654 .sum();
2655 self.set_counter(Counter::DnsRegistryTimer, dns_registry_timer_count as i64);
2656
2657 let dns_registry_name_change_count: usize = self
2658 .dns_registry_map
2659 .values()
2660 .map(|r| r.name_changes.len())
2661 .sum();
2662 self.set_counter(
2663 Counter::DnsRegistryNameChange,
2664 dns_registry_name_change_count as i64,
2665 );
2666
2667 if let Err(e) = resp_s.send(self.counters.clone()) {
2669 debug!("Failed to send metrics: {}", e);
2670 }
2671 }
2672
2673 fn exec_command_browse(
2674 &mut self,
2675 repeating: bool,
2676 ty: String,
2677 next_delay: u32,
2678 listener: Sender<ServiceEvent>,
2679 ) {
2680 let pretty_addrs: Vec<String> = self
2681 .intf_socks
2682 .keys()
2683 .map(|itf| format!("{} ({})", itf.ip(), itf.name))
2684 .collect();
2685
2686 if let Err(e) = listener.send(ServiceEvent::SearchStarted(format!(
2687 "{ty} on {} interfaces [{}]",
2688 pretty_addrs.len(),
2689 pretty_addrs.join(", ")
2690 ))) {
2691 debug!(
2692 "Failed to send SearchStarted({})(repeating:{}): {}",
2693 &ty, repeating, e
2694 );
2695 return;
2696 }
2697 if !repeating {
2698 self.service_queriers.insert(ty.clone(), listener.clone());
2702
2703 self.query_cache_for_service(&ty, &listener);
2705 }
2706
2707 self.send_query(&ty, RRType::PTR);
2708 self.increase_counter(Counter::Browse, 1);
2709
2710 let next_time = current_time_millis() + (next_delay * 1000) as u64;
2711 let max_delay = 60 * 60;
2712 let delay = cmp::min(next_delay * 2, max_delay);
2713 self.add_retransmission(next_time, Command::Browse(ty, delay, listener));
2714 }
2715
2716 fn exec_command_resolve_hostname(
2717 &mut self,
2718 repeating: bool,
2719 hostname: String,
2720 next_delay: u32,
2721 listener: Sender<HostnameResolutionEvent>,
2722 timeout: Option<u64>,
2723 ) {
2724 let addr_list: Vec<_> = self.intf_socks.keys().collect();
2725 if let Err(e) = listener.send(HostnameResolutionEvent::SearchStarted(format!(
2726 "{} on addrs {:?}",
2727 &hostname, &addr_list
2728 ))) {
2729 debug!(
2730 "Failed to send ResolveStarted({})(repeating:{}): {}",
2731 &hostname, repeating, e
2732 );
2733 return;
2734 }
2735 if !repeating {
2736 self.add_hostname_resolver(hostname.to_owned(), listener.clone(), timeout);
2737 self.query_cache_for_hostname(&hostname, listener.clone());
2739 }
2740
2741 self.send_query_vec(&[(&hostname, RRType::A), (&hostname, RRType::AAAA)]);
2742 self.increase_counter(Counter::ResolveHostname, 1);
2743
2744 let now = current_time_millis();
2745 let next_time = now + u64::from(next_delay) * 1000;
2746 let max_delay = 60 * 60;
2747 let delay = cmp::min(next_delay * 2, max_delay);
2748
2749 if self
2751 .hostname_resolvers
2752 .get(&hostname)
2753 .and_then(|(_sender, timeout)| *timeout)
2754 .map(|timeout| next_time < timeout)
2755 .unwrap_or(true)
2756 {
2757 self.add_retransmission(
2758 next_time,
2759 Command::ResolveHostname(hostname, delay, listener, None),
2760 );
2761 }
2762 }
2763
2764 fn exec_command_resolve(&mut self, instance: String, try_count: u16) {
2765 let pending_query = self.query_unresolved(&instance);
2766 let max_try = 3;
2767 if pending_query && try_count < max_try {
2768 let next_time = current_time_millis() + RESOLVE_WAIT_IN_MILLIS;
2771 self.add_retransmission(next_time, Command::Resolve(instance, try_count + 1));
2772 }
2773 }
2774
2775 fn exec_command_unregister(
2776 &mut self,
2777 repeating: bool,
2778 fullname: String,
2779 resp_s: Sender<UnregisterStatus>,
2780 ) {
2781 let response = match self.my_services.remove_entry(&fullname) {
2782 None => {
2783 debug!("unregister: cannot find such service {}", &fullname);
2784 UnregisterStatus::NotFound
2785 }
2786 Some((_k, info)) => {
2787 let mut timers = Vec::new();
2788 let mut multicast_sent_trackers = HashSet::new();
2790
2791 for (intf, sock) in self.intf_socks.iter() {
2792 if let Some(tracker) = multicast_send_tracker(intf) {
2793 if multicast_sent_trackers.contains(&tracker) {
2794 continue; }
2796 multicast_sent_trackers.insert(tracker);
2797 }
2798 let packet = self.unregister_service(&info, intf, sock);
2799 if !repeating && !packet.is_empty() {
2801 let next_time = current_time_millis() + 120;
2802 self.retransmissions.push(ReRun {
2803 next_time,
2804 command: Command::UnregisterResend(packet, intf.clone()),
2805 });
2806 timers.push(next_time);
2807 }
2808 }
2809
2810 for t in timers {
2811 self.add_timer(t);
2812 }
2813
2814 self.increase_counter(Counter::Unregister, 1);
2815 UnregisterStatus::OK
2816 }
2817 };
2818 if let Err(e) = resp_s.send(response) {
2819 debug!("unregister: failed to send response: {}", e);
2820 }
2821 }
2822
2823 fn exec_command_unregister_resend(&mut self, packet: Vec<u8>, intf: Interface) {
2824 if let Some(sock) = self.intf_socks.get(&intf) {
2825 debug!("UnregisterResend from {}", &intf.ip());
2826 multicast_on_intf(&packet[..], &intf, sock);
2827 self.increase_counter(Counter::UnregisterResend, 1);
2828 }
2829 }
2830
2831 fn exec_command_stop_browse(&mut self, ty_domain: String) {
2832 match self.service_queriers.remove_entry(&ty_domain) {
2833 None => debug!("StopBrowse: cannot find querier for {}", &ty_domain),
2834 Some((ty, sender)) => {
2835 trace!("StopBrowse: removed queryer for {}", &ty);
2837 let mut i = 0;
2838 while i < self.retransmissions.len() {
2839 if let Command::Browse(t, _, _) = &self.retransmissions[i].command {
2840 if t == &ty {
2841 self.retransmissions.remove(i);
2842 trace!("StopBrowse: removed retransmission for {}", &ty);
2843 continue;
2844 }
2845 }
2846 i += 1;
2847 }
2848
2849 self.cache.remove_service_type(&ty_domain);
2851
2852 match sender.send(ServiceEvent::SearchStopped(ty_domain)) {
2854 Ok(()) => trace!("Sent SearchStopped to the listener"),
2855 Err(e) => debug!("Failed to send SearchStopped: {}", e),
2856 }
2857 }
2858 }
2859 }
2860
2861 fn exec_command_stop_resolve_hostname(&mut self, hostname: String) {
2862 if let Some((host, (sender, _timeout))) = self.hostname_resolvers.remove_entry(&hostname) {
2863 trace!("StopResolve: removed queryer for {}", &host);
2865 let mut i = 0;
2866 while i < self.retransmissions.len() {
2867 if let Command::Resolve(t, _) = &self.retransmissions[i].command {
2868 if t == &host {
2869 self.retransmissions.remove(i);
2870 trace!("StopResolve: removed retransmission for {}", &host);
2871 continue;
2872 }
2873 }
2874 i += 1;
2875 }
2876
2877 match sender.send(HostnameResolutionEvent::SearchStopped(hostname)) {
2879 Ok(()) => trace!("Sent SearchStopped to the listener"),
2880 Err(e) => debug!("Failed to send SearchStopped: {}", e),
2881 }
2882 }
2883 }
2884
2885 fn exec_command_register_resend(&mut self, fullname: String, intf: Interface) {
2886 let Some(info) = self.my_services.get_mut(&fullname) else {
2887 trace!("announce: cannot find such service {}", &fullname);
2888 return;
2889 };
2890
2891 let Some(dns_registry) = self.dns_registry_map.get_mut(&intf) else {
2892 return;
2893 };
2894
2895 let Some(sock) = self.intf_socks.get(&intf) else {
2896 return;
2897 };
2898
2899 if announce_service_on_intf(dns_registry, info, &intf, sock) {
2900 let mut hostname = info.get_hostname();
2901 if let Some(new_name) = dns_registry.name_changes.get(hostname) {
2902 hostname = new_name;
2903 }
2904 let service_name = match dns_registry.name_changes.get(&fullname) {
2905 Some(new_name) => new_name.to_string(),
2906 None => fullname,
2907 };
2908
2909 debug!("resend: announce service {} on {}", service_name, intf.ip());
2910
2911 notify_monitors(
2912 &mut self.monitors,
2913 DaemonEvent::Announce(service_name, format!("{}:{}", hostname, &intf.ip())),
2914 );
2915 info.set_status(&intf, ServiceStatus::Announced);
2916 } else {
2917 debug!("register-resend should not fail");
2918 }
2919
2920 self.increase_counter(Counter::RegisterResend, 1);
2921 }
2922
2923 fn exec_command_verify(&mut self, instance: String, timeout: Duration, repeating: bool) {
2924 let now = current_time_millis();
2934 let expire_at = if repeating {
2935 None
2936 } else {
2937 Some(now + timeout.as_millis() as u64)
2938 };
2939
2940 let record_vec = self.cache.service_verify_queries(&instance, expire_at);
2942
2943 if !record_vec.is_empty() {
2944 let query_vec: Vec<(&str, RRType)> = record_vec
2945 .iter()
2946 .map(|(record, rr_type)| (record.as_str(), *rr_type))
2947 .collect();
2948 self.send_query_vec(&query_vec);
2949
2950 if let Some(new_expire) = expire_at {
2951 self.add_timer(new_expire); self.add_retransmission(now + 1000, Command::Verify(instance, timeout));
2955 }
2956 }
2957 }
2958
2959 fn refresh_active_services(&mut self) {
2961 let mut query_ptr_count = 0;
2962 let mut query_srv_count = 0;
2963 let mut new_timers = HashSet::new();
2964 let mut query_addr_count = 0;
2965
2966 for (ty_domain, _sender) in self.service_queriers.iter() {
2967 let refreshed_timers = self.cache.refresh_due_ptr(ty_domain);
2968 if !refreshed_timers.is_empty() {
2969 trace!("sending refresh query for PTR: {}", ty_domain);
2970 self.send_query(ty_domain, RRType::PTR);
2971 query_ptr_count += 1;
2972 new_timers.extend(refreshed_timers);
2973 }
2974
2975 let (instances, timers) = self.cache.refresh_due_srv(ty_domain);
2976 for instance in instances.iter() {
2977 trace!("sending refresh query for SRV: {}", instance);
2978 self.send_query(instance, RRType::SRV);
2979 query_srv_count += 1;
2980 }
2981 new_timers.extend(timers);
2982 let (hostnames, timers) = self.cache.refresh_due_hosts(ty_domain);
2983 for hostname in hostnames.iter() {
2984 trace!("sending refresh queries for A and AAAA: {}", hostname);
2985 self.send_query_vec(&[(hostname, RRType::A), (hostname, RRType::AAAA)]);
2986 query_addr_count += 2;
2987 }
2988 new_timers.extend(timers);
2989 }
2990
2991 for timer in new_timers {
2992 self.add_timer(timer);
2993 }
2994
2995 self.increase_counter(Counter::CacheRefreshPTR, query_ptr_count);
2996 self.increase_counter(Counter::CacheRefreshSRV, query_srv_count);
2997 self.increase_counter(Counter::CacheRefreshAddr, query_addr_count);
2998 }
2999}
3000
3001#[derive(Debug)]
3004pub enum ServiceEvent {
3005 SearchStarted(String),
3007
3008 ServiceFound(String, String),
3010
3011 ServiceResolved(ServiceInfo),
3013
3014 ServiceRemoved(String, String),
3016
3017 SearchStopped(String),
3019}
3020
3021#[derive(Debug)]
3024#[non_exhaustive]
3025pub enum HostnameResolutionEvent {
3026 SearchStarted(String),
3028 AddressesFound(String, HashSet<IpAddr>),
3030 AddressesRemoved(String, HashSet<IpAddr>),
3032 SearchTimeout(String),
3034 SearchStopped(String),
3036}
3037
3038#[derive(Clone, Debug)]
3041#[non_exhaustive]
3042pub enum DaemonEvent {
3043 Announce(String, String),
3045
3046 Error(Error),
3048
3049 IpAdd(IpAddr),
3051
3052 IpDel(IpAddr),
3054
3055 NameChange(DnsNameChange),
3058
3059 Respond(IpAddr),
3061}
3062
3063#[derive(Clone, Debug)]
3066pub struct DnsNameChange {
3067 pub original: String,
3069
3070 pub new_name: String,
3080
3081 pub rr_type: RRType,
3083
3084 pub intf_name: String,
3086}
3087
3088#[derive(Debug)]
3090enum Command {
3091 Browse(String, u32, Sender<ServiceEvent>),
3093
3094 ResolveHostname(String, u32, Sender<HostnameResolutionEvent>, Option<u64>), Register(ServiceInfo),
3099
3100 Unregister(String, Sender<UnregisterStatus>), RegisterResend(String, Interface), UnregisterResend(Vec<u8>, Interface), StopBrowse(String), StopResolveHostname(String), Resolve(String, u16), GetMetrics(Sender<Metrics>),
3121
3122 GetStatus(Sender<DaemonStatus>),
3124
3125 Monitor(Sender<DaemonEvent>),
3127
3128 SetOption(DaemonOption),
3129
3130 GetOption(Sender<DaemonOptionVal>),
3131
3132 Verify(String, Duration),
3137
3138 Exit(Sender<DaemonStatus>),
3139}
3140
3141impl fmt::Display for Command {
3142 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
3143 match self {
3144 Self::Browse(_, _, _) => write!(f, "Command Browse"),
3145 Self::ResolveHostname(_, _, _, _) => write!(f, "Command ResolveHostname"),
3146 Self::Exit(_) => write!(f, "Command Exit"),
3147 Self::GetStatus(_) => write!(f, "Command GetStatus"),
3148 Self::GetMetrics(_) => write!(f, "Command GetMetrics"),
3149 Self::Monitor(_) => write!(f, "Command Monitor"),
3150 Self::Register(_) => write!(f, "Command Register"),
3151 Self::RegisterResend(_, _) => write!(f, "Command RegisterResend"),
3152 Self::SetOption(_) => write!(f, "Command SetOption"),
3153 Self::GetOption(_) => write!(f, "Command GetOption"),
3154 Self::StopBrowse(_) => write!(f, "Command StopBrowse"),
3155 Self::StopResolveHostname(_) => write!(f, "Command StopResolveHostname"),
3156 Self::Unregister(_, _) => write!(f, "Command Unregister"),
3157 Self::UnregisterResend(_, _) => write!(f, "Command UnregisterResend"),
3158 Self::Resolve(_, _) => write!(f, "Command Resolve"),
3159 Self::Verify(_, _) => write!(f, "Command VerifyResource"),
3160 }
3161 }
3162}
3163
3164struct DaemonOptionVal {
3165 _service_name_len_max: u8,
3166 ip_check_interval: u64,
3167}
3168
3169#[derive(Debug)]
3170enum DaemonOption {
3171 ServiceNameLenMax(u8),
3172 IpCheckInterval(u64),
3173 EnableInterface(Vec<IfKind>),
3174 DisableInterface(Vec<IfKind>),
3175 MulticastLoopV4(bool),
3176 MulticastLoopV6(bool),
3177}
3178
3179const DOMAIN_LEN: usize = "._tcp.local.".len();
3181
3182fn check_service_name_length(ty_domain: &str, limit: u8) -> Result<()> {
3184 if ty_domain.len() <= DOMAIN_LEN + 1 {
3185 return Err(e_fmt!("Service type name cannot be empty: {}", ty_domain));
3187 }
3188
3189 let service_name_len = ty_domain.len() - DOMAIN_LEN - 1; if service_name_len > limit as usize {
3191 return Err(e_fmt!("Service name length must be <= {} bytes", limit));
3192 }
3193 Ok(())
3194}
3195
3196fn check_domain_suffix(name: &str) -> Result<()> {
3198 if !(name.ends_with("._tcp.local.") || name.ends_with("._udp.local.")) {
3199 return Err(e_fmt!(
3200 "mDNS service {} must end with '._tcp.local.' or '._udp.local.'",
3201 name
3202 ));
3203 }
3204
3205 Ok(())
3206}
3207
3208fn check_service_name(fullname: &str) -> Result<()> {
3216 check_domain_suffix(fullname)?;
3217
3218 let remaining: Vec<&str> = fullname[..fullname.len() - DOMAIN_LEN].split('.').collect();
3219 let name = remaining.last().ok_or_else(|| e_fmt!("No service name"))?;
3220
3221 if &name[0..1] != "_" {
3222 return Err(e_fmt!("Service name must start with '_'"));
3223 }
3224
3225 let name = &name[1..];
3226
3227 if name.contains("--") {
3228 return Err(e_fmt!("Service name must not contain '--'"));
3229 }
3230
3231 if name.starts_with('-') || name.ends_with('-') {
3232 return Err(e_fmt!("Service name (%s) may not start or end with '-'"));
3233 }
3234
3235 let ascii_count = name.chars().filter(|c| c.is_ascii_alphabetic()).count();
3236 if ascii_count < 1 {
3237 return Err(e_fmt!(
3238 "Service name must contain at least one letter (eg: 'A-Za-z')"
3239 ));
3240 }
3241
3242 Ok(())
3243}
3244
3245fn check_hostname(hostname: &str) -> Result<()> {
3247 if !hostname.ends_with(".local.") {
3248 return Err(e_fmt!("Hostname must end with '.local.': {hostname}"));
3249 }
3250
3251 if hostname == ".local." {
3252 return Err(e_fmt!(
3253 "The part of the hostname before '.local.' cannot be empty"
3254 ));
3255 }
3256
3257 if hostname.len() > 255 {
3258 return Err(e_fmt!("Hostname length must be <= 255 bytes"));
3259 }
3260
3261 Ok(())
3262}
3263
3264fn call_service_listener(
3265 listeners_map: &HashMap<String, Sender<ServiceEvent>>,
3266 ty_domain: &str,
3267 event: ServiceEvent,
3268) {
3269 if let Some(listener) = listeners_map.get(ty_domain) {
3270 match listener.send(event) {
3271 Ok(()) => trace!("Sent event to listener successfully"),
3272 Err(e) => debug!("Failed to send event: {}", e),
3273 }
3274 }
3275}
3276
3277fn call_hostname_resolution_listener(
3278 listeners_map: &HashMap<String, (Sender<HostnameResolutionEvent>, Option<u64>)>,
3279 hostname: &str,
3280 event: HostnameResolutionEvent,
3281) {
3282 let hostname_lower = hostname.to_lowercase();
3283 if let Some(listener) = listeners_map.get(&hostname_lower).map(|(l, _)| l) {
3284 match listener.send(event) {
3285 Ok(()) => trace!("Sent event to listener successfully"),
3286 Err(e) => debug!("Failed to send event: {}", e),
3287 }
3288 }
3289}
3290
3291fn my_ip_interfaces(with_loopback: bool) -> Vec<Interface> {
3294 if_addrs::get_if_addrs()
3295 .unwrap_or_default()
3296 .into_iter()
3297 .filter(|i| !i.is_loopback() || with_loopback)
3298 .collect()
3299}
3300
3301fn send_dns_outgoing(out: &DnsOutgoing, intf: &Interface, sock: &MioUdpSocket) -> Vec<Vec<u8>> {
3303 let qtype = if out.is_query() { "query" } else { "response" };
3304 trace!(
3305 "send outgoing {}: {} questions {} answers {} authorities {} additional",
3306 qtype,
3307 out.questions().len(),
3308 out.answers_count(),
3309 out.authorities().len(),
3310 out.additionals().len()
3311 );
3312 let packet_list = out.to_data_on_wire();
3313 for packet in packet_list.iter() {
3314 multicast_on_intf(packet, intf, sock);
3315 }
3316 packet_list
3317}
3318
3319fn multicast_on_intf(packet: &[u8], intf: &Interface, socket: &MioUdpSocket) {
3321 if packet.len() > MAX_MSG_ABSOLUTE {
3322 debug!("Drop over-sized packet ({})", packet.len());
3323 return;
3324 }
3325
3326 let addr: SocketAddr = match intf.addr {
3327 if_addrs::IfAddr::V4(_) => SocketAddrV4::new(GROUP_ADDR_V4, MDNS_PORT).into(),
3328 if_addrs::IfAddr::V6(_) => {
3329 let mut sock = SocketAddrV6::new(GROUP_ADDR_V6, MDNS_PORT, 0, 0);
3330 sock.set_scope_id(intf.index.unwrap_or(0)); sock.into()
3332 }
3333 };
3334
3335 send_packet(packet, addr, intf, socket);
3336}
3337
3338fn send_packet(packet: &[u8], addr: SocketAddr, intf: &Interface, sock: &MioUdpSocket) {
3340 match sock.send_to(packet, addr) {
3341 Ok(sz) => trace!("sent out {} bytes on interface {:?}", sz, intf),
3342 Err(e) => debug!("Failed to send to {} via {:?}: {}", addr, &intf, e),
3343 }
3344}
3345
3346fn valid_instance_name(name: &str) -> bool {
3350 name.split('.').count() >= 5
3351}
3352
3353fn notify_monitors(monitors: &mut Vec<Sender<DaemonEvent>>, event: DaemonEvent) {
3354 monitors.retain(|sender| {
3355 if let Err(e) = sender.try_send(event.clone()) {
3356 debug!("notify_monitors: try_send: {}", &e);
3357 if matches!(e, TrySendError::Disconnected(_)) {
3358 return false; }
3360 }
3361 true
3362 });
3363}
3364
3365fn prepare_announce(
3368 info: &ServiceInfo,
3369 intf: &Interface,
3370 dns_registry: &mut DnsRegistry,
3371) -> Option<DnsOutgoing> {
3372 let intf_addrs = info.get_addrs_on_intf(intf);
3373 if intf_addrs.is_empty() {
3374 trace!("No valid addrs to add on intf {:?}", &intf);
3375 return None;
3376 }
3377
3378 let service_fullname = match dns_registry.name_changes.get(info.get_fullname()) {
3380 Some(new_name) => new_name,
3381 None => info.get_fullname(),
3382 };
3383
3384 debug!(
3385 "prepare to announce service {service_fullname} on {}: {}",
3386 &intf.name,
3387 &intf.ip()
3388 );
3389
3390 let mut probing_count = 0;
3391 let mut out = DnsOutgoing::new(FLAGS_QR_RESPONSE | FLAGS_AA);
3392 let create_time = current_time_millis() + fastrand::u64(0..250);
3393
3394 out.add_answer_at_time(
3395 DnsPointer::new(
3396 info.get_type(),
3397 RRType::PTR,
3398 CLASS_IN,
3399 info.get_other_ttl(),
3400 service_fullname.to_string(),
3401 ),
3402 0,
3403 );
3404
3405 if let Some(sub) = info.get_subtype() {
3406 trace!("Adding subdomain {}", sub);
3407 out.add_answer_at_time(
3408 DnsPointer::new(
3409 sub,
3410 RRType::PTR,
3411 CLASS_IN,
3412 info.get_other_ttl(),
3413 service_fullname.to_string(),
3414 ),
3415 0,
3416 );
3417 }
3418
3419 let hostname = match dns_registry.name_changes.get(info.get_hostname()) {
3421 Some(new_name) => new_name.to_string(),
3422 None => info.get_hostname().to_string(),
3423 };
3424
3425 let mut srv = DnsSrv::new(
3426 info.get_fullname(),
3427 CLASS_IN | CLASS_CACHE_FLUSH,
3428 info.get_host_ttl(),
3429 info.get_priority(),
3430 info.get_weight(),
3431 info.get_port(),
3432 hostname,
3433 );
3434
3435 if let Some(new_name) = dns_registry.name_changes.get(info.get_fullname()) {
3436 srv.get_record_mut().set_new_name(new_name.to_string());
3437 }
3438
3439 if !info.requires_probe()
3440 || dns_registry.is_probing_done(&srv, info.get_fullname(), create_time)
3441 {
3442 out.add_answer_at_time(srv, 0);
3443 } else {
3444 probing_count += 1;
3445 }
3446
3447 let mut txt = DnsTxt::new(
3450 info.get_fullname(),
3451 CLASS_IN | CLASS_CACHE_FLUSH,
3452 info.get_other_ttl(),
3453 info.generate_txt(),
3454 );
3455
3456 if let Some(new_name) = dns_registry.name_changes.get(info.get_fullname()) {
3457 txt.get_record_mut().set_new_name(new_name.to_string());
3458 }
3459
3460 if !info.requires_probe()
3461 || dns_registry.is_probing_done(&txt, info.get_fullname(), create_time)
3462 {
3463 out.add_answer_at_time(txt, 0);
3464 } else {
3465 probing_count += 1;
3466 }
3467
3468 let hostname = info.get_hostname();
3471 for address in intf_addrs {
3472 let mut dns_addr = DnsAddress::new(
3473 hostname,
3474 ip_address_rr_type(&address),
3475 CLASS_IN | CLASS_CACHE_FLUSH,
3476 info.get_host_ttl(),
3477 address,
3478 intf.into(),
3479 );
3480
3481 if let Some(new_name) = dns_registry.name_changes.get(hostname) {
3482 dns_addr.get_record_mut().set_new_name(new_name.to_string());
3483 }
3484
3485 if !info.requires_probe()
3486 || dns_registry.is_probing_done(&dns_addr, info.get_fullname(), create_time)
3487 {
3488 out.add_answer_at_time(dns_addr, 0);
3489 } else {
3490 probing_count += 1;
3491 }
3492 }
3493
3494 if probing_count > 0 {
3495 return None;
3496 }
3497
3498 Some(out)
3499}
3500
3501fn announce_service_on_intf(
3504 dns_registry: &mut DnsRegistry,
3505 info: &ServiceInfo,
3506 intf: &Interface,
3507 sock: &MioUdpSocket,
3508) -> bool {
3509 if let Some(out) = prepare_announce(info, intf, dns_registry) {
3510 send_dns_outgoing(&out, intf, sock);
3511 return true;
3512 }
3513 false
3514}
3515
3516fn name_change(original: &str) -> String {
3524 let mut parts: Vec<_> = original.split('.').collect();
3525 let Some(first_part) = parts.get_mut(0) else {
3526 return format!("{original} (2)");
3527 };
3528
3529 let mut new_name = format!("{} (2)", first_part);
3530
3531 if let Some(paren_pos) = first_part.rfind(" (") {
3533 if let Some(end_paren) = first_part[paren_pos..].find(')') {
3535 let absolute_end_pos = paren_pos + end_paren;
3536 if absolute_end_pos == first_part.len() - 1 {
3538 let num_start = paren_pos + 2; if let Ok(number) = first_part[num_start..absolute_end_pos].parse::<u32>() {
3541 let base_name = &first_part[..paren_pos];
3542 new_name = format!("{} ({})", base_name, number + 1)
3543 }
3544 }
3545 }
3546 }
3547
3548 *first_part = &new_name;
3549 parts.join(".")
3550}
3551
3552fn hostname_change(original: &str) -> String {
3560 let mut parts: Vec<_> = original.split('.').collect();
3561 let Some(first_part) = parts.get_mut(0) else {
3562 return format!("{original}-2");
3563 };
3564
3565 let mut new_name = format!("{}-2", first_part);
3566
3567 if let Some(hyphen_pos) = first_part.rfind('-') {
3569 if let Ok(number) = first_part[hyphen_pos + 1..].parse::<u32>() {
3571 let base_name = &first_part[..hyphen_pos];
3572 new_name = format!("{}-{}", base_name, number + 1);
3573 }
3574 }
3575
3576 *first_part = &new_name;
3577 parts.join(".")
3578}
3579
3580fn add_answer_with_additionals(
3581 out: &mut DnsOutgoing,
3582 msg: &DnsIncoming,
3583 service: &ServiceInfo,
3584 intf: &Interface,
3585 dns_registry: &DnsRegistry,
3586) {
3587 let intf_addrs = service.get_addrs_on_intf(intf);
3588 if intf_addrs.is_empty() {
3589 trace!("No addrs on LAN of intf {:?}", intf);
3590 return;
3591 }
3592
3593 let service_fullname = match dns_registry.name_changes.get(service.get_fullname()) {
3595 Some(new_name) => new_name,
3596 None => service.get_fullname(),
3597 };
3598
3599 let hostname = match dns_registry.name_changes.get(service.get_hostname()) {
3600 Some(new_name) => new_name,
3601 None => service.get_hostname(),
3602 };
3603
3604 let ptr_added = out.add_answer(
3605 msg,
3606 DnsPointer::new(
3607 service.get_type(),
3608 RRType::PTR,
3609 CLASS_IN,
3610 service.get_other_ttl(),
3611 service_fullname.to_string(),
3612 ),
3613 );
3614
3615 if !ptr_added {
3616 trace!("answer was not added for msg {:?}", msg);
3617 return;
3618 }
3619
3620 if let Some(sub) = service.get_subtype() {
3621 trace!("Adding subdomain {}", sub);
3622 out.add_additional_answer(DnsPointer::new(
3623 sub,
3624 RRType::PTR,
3625 CLASS_IN,
3626 service.get_other_ttl(),
3627 service_fullname.to_string(),
3628 ));
3629 }
3630
3631 out.add_additional_answer(DnsSrv::new(
3634 service_fullname,
3635 CLASS_IN | CLASS_CACHE_FLUSH,
3636 service.get_host_ttl(),
3637 service.get_priority(),
3638 service.get_weight(),
3639 service.get_port(),
3640 hostname.to_string(),
3641 ));
3642
3643 out.add_additional_answer(DnsTxt::new(
3644 service_fullname,
3645 CLASS_IN | CLASS_CACHE_FLUSH,
3646 service.get_host_ttl(),
3647 service.generate_txt(),
3648 ));
3649
3650 for address in intf_addrs {
3651 out.add_additional_answer(DnsAddress::new(
3652 hostname,
3653 ip_address_rr_type(&address),
3654 CLASS_IN | CLASS_CACHE_FLUSH,
3655 service.get_host_ttl(),
3656 address,
3657 intf.into(),
3658 ));
3659 }
3660}
3661
3662#[cfg(test)]
3663mod tests {
3664 use super::{
3665 check_domain_suffix, check_service_name_length, hostname_change, my_ip_interfaces,
3666 name_change, new_socket_bind, send_dns_outgoing, valid_instance_name,
3667 HostnameResolutionEvent, ServiceDaemon, ServiceEvent, ServiceInfo, GROUP_ADDR_V4,
3668 MDNS_PORT,
3669 };
3670 use crate::{
3671 dns_parser::{DnsOutgoing, DnsPointer, RRType, CLASS_IN, FLAGS_AA, FLAGS_QR_RESPONSE},
3672 service_daemon::check_hostname,
3673 };
3674 use std::{
3675 net::{SocketAddr, SocketAddrV4},
3676 time::Duration,
3677 };
3678 use test_log::test;
3679
3680 #[test]
3681 fn test_socketaddr_print() {
3682 let addr: SocketAddr = SocketAddrV4::new(GROUP_ADDR_V4, MDNS_PORT).into();
3683 let print = format!("{}", addr);
3684 assert_eq!(print, "224.0.0.251:5353");
3685 }
3686
3687 #[test]
3688 fn test_instance_name() {
3689 assert!(valid_instance_name("my-laser._printer._tcp.local."));
3690 assert!(valid_instance_name("my-laser.._printer._tcp.local."));
3691 assert!(!valid_instance_name("_printer._tcp.local."));
3692 }
3693
3694 #[test]
3695 fn test_check_service_name_length() {
3696 let result = check_service_name_length("_tcp", 100);
3697 assert!(result.is_err());
3698 if let Err(e) = result {
3699 println!("{}", e);
3700 }
3701 }
3702
3703 #[test]
3704 fn test_check_hostname() {
3705 for hostname in &[
3707 "my_host.local.",
3708 &("A".repeat(255 - ".local.".len()) + ".local."),
3709 ] {
3710 let result = check_hostname(hostname);
3711 assert!(result.is_ok());
3712 }
3713
3714 for hostname in &[
3716 "my_host.local",
3717 ".local.",
3718 &("A".repeat(256 - ".local.".len()) + ".local."),
3719 ] {
3720 let result = check_hostname(hostname);
3721 assert!(result.is_err());
3722 if let Err(e) = result {
3723 println!("{}", e);
3724 }
3725 }
3726 }
3727
3728 #[test]
3729 fn test_check_domain_suffix() {
3730 assert!(check_domain_suffix("_missing_dot._tcp.local").is_err());
3731 assert!(check_domain_suffix("_missing_bar.tcp.local.").is_err());
3732 assert!(check_domain_suffix("_mis_spell._tpp.local.").is_err());
3733 assert!(check_domain_suffix("_mis_spell._upp.local.").is_err());
3734 assert!(check_domain_suffix("_has_dot._tcp.local.").is_ok());
3735 assert!(check_domain_suffix("_goodname._udp.local.").is_ok());
3736 }
3737
3738 #[test]
3739 fn test_service_with_temporarily_invalidated_ptr() {
3740 let d = ServiceDaemon::new().expect("Failed to create daemon");
3742
3743 let service = "_test_inval_ptr._udp.local.";
3744 let host_name = "my_host_tmp_invalidated_ptr.local.";
3745 let intfs: Vec<_> = my_ip_interfaces(false);
3746 let intf_ips: Vec<_> = intfs.iter().map(|intf| intf.ip()).collect();
3747 let port = 5201;
3748 let my_service =
3749 ServiceInfo::new(service, "my_instance", host_name, &intf_ips[..], port, None)
3750 .expect("invalid service info")
3751 .enable_addr_auto();
3752 let result = d.register(my_service.clone());
3753 assert!(result.is_ok());
3754
3755 let browse_chan = d.browse(service).unwrap();
3757 let timeout = Duration::from_secs(2);
3758 let mut resolved = false;
3759
3760 while let Ok(event) = browse_chan.recv_timeout(timeout) {
3761 match event {
3762 ServiceEvent::ServiceResolved(info) => {
3763 resolved = true;
3764 println!("Resolved a service of {}", &info.get_fullname());
3765 break;
3766 }
3767 e => {
3768 println!("Received event {:?}", e);
3769 }
3770 }
3771 }
3772
3773 assert!(resolved);
3774
3775 println!("Stopping browse of {}", service);
3776 d.stop_browse(service).unwrap();
3779
3780 let mut stopped = false;
3785 while let Ok(event) = browse_chan.recv_timeout(timeout) {
3786 match event {
3787 ServiceEvent::SearchStopped(_) => {
3788 stopped = true;
3789 println!("Stopped browsing service");
3790 break;
3791 }
3792 e => {
3796 println!("Received event {:?}", e);
3797 }
3798 }
3799 }
3800
3801 assert!(stopped);
3802
3803 let invalidate_ptr_packet = DnsPointer::new(
3805 my_service.get_type(),
3806 RRType::PTR,
3807 CLASS_IN,
3808 0,
3809 my_service.get_fullname().to_string(),
3810 );
3811
3812 let mut packet_buffer = DnsOutgoing::new(FLAGS_QR_RESPONSE | FLAGS_AA);
3813 packet_buffer.add_additional_answer(invalidate_ptr_packet);
3814
3815 for intf in intfs {
3816 let sock = new_socket_bind(&intf, true).unwrap();
3817 send_dns_outgoing(&packet_buffer, &intf, &sock);
3818 }
3819
3820 println!(
3821 "Sent PTR record invalidation. Starting second browse for {}",
3822 service
3823 );
3824
3825 let browse_chan = d.browse(service).unwrap();
3827
3828 resolved = false;
3829 while let Ok(event) = browse_chan.recv_timeout(timeout) {
3830 match event {
3831 ServiceEvent::ServiceResolved(info) => {
3832 resolved = true;
3833 println!("Resolved a service of {}", &info.get_fullname());
3834 break;
3835 }
3836 e => {
3837 println!("Received event {:?}", e);
3838 }
3839 }
3840 }
3841
3842 assert!(resolved);
3843 d.shutdown().unwrap();
3844 }
3845
3846 #[test]
3847 fn test_expired_srv() {
3848 let service_type = "_expired-srv._udp.local.";
3850 let instance = "test_instance";
3851 let host_name = "expired_srv_host.local.";
3852 let mut my_service = ServiceInfo::new(service_type, instance, host_name, "", 5023, None)
3853 .unwrap()
3854 .enable_addr_auto();
3855 let new_ttl = 3; my_service._set_host_ttl(new_ttl);
3860
3861 let mdns_server = ServiceDaemon::new().expect("Failed to create mdns server");
3863 let result = mdns_server.register(my_service);
3864 assert!(result.is_ok());
3865
3866 let mdns_client = ServiceDaemon::new().expect("Failed to create mdns client");
3867 let browse_chan = mdns_client.browse(service_type).unwrap();
3868 let timeout = Duration::from_secs(2);
3869 let mut resolved = false;
3870
3871 while let Ok(event) = browse_chan.recv_timeout(timeout) {
3872 match event {
3873 ServiceEvent::ServiceResolved(info) => {
3874 resolved = true;
3875 println!("Resolved a service of {}", &info.get_fullname());
3876 break;
3877 }
3878 _ => {}
3879 }
3880 }
3881
3882 assert!(resolved);
3883
3884 mdns_server.shutdown().unwrap();
3886
3887 let expire_timeout = Duration::from_secs(new_ttl as u64);
3889 while let Ok(event) = browse_chan.recv_timeout(expire_timeout) {
3890 match event {
3891 ServiceEvent::ServiceRemoved(service_type, full_name) => {
3892 println!("Service removed: {}: {}", &service_type, &full_name);
3893 break;
3894 }
3895 _ => {}
3896 }
3897 }
3898 }
3899
3900 #[test]
3901 fn test_hostname_resolution_address_removed() {
3902 let server = ServiceDaemon::new().expect("Failed to create server");
3904 let hostname = "addr_remove_host._tcp.local.";
3905 let service_ip_addr = my_ip_interfaces(false)
3906 .iter()
3907 .find(|iface| iface.ip().is_ipv4())
3908 .map(|iface| iface.ip())
3909 .unwrap();
3910
3911 let mut my_service = ServiceInfo::new(
3912 "_host_res_test._tcp.local.",
3913 "my_instance",
3914 hostname,
3915 &service_ip_addr,
3916 1234,
3917 None,
3918 )
3919 .expect("invalid service info");
3920
3921 let addr_ttl = 2;
3923 my_service._set_host_ttl(addr_ttl); server.register(my_service).unwrap();
3926
3927 let client = ServiceDaemon::new().expect("Failed to create client");
3929 let event_receiver = client.resolve_hostname(hostname, None).unwrap();
3930 let resolved = loop {
3931 match event_receiver.recv() {
3932 Ok(HostnameResolutionEvent::AddressesFound(found_hostname, addresses)) => {
3933 assert!(found_hostname == hostname);
3934 assert!(addresses.contains(&service_ip_addr));
3935 println!("address found: {:?}", &addresses);
3936 break true;
3937 }
3938 Ok(HostnameResolutionEvent::SearchStopped(_)) => break false,
3939 Ok(_event) => {}
3940 Err(_) => break false,
3941 }
3942 };
3943
3944 assert!(resolved);
3945
3946 server.shutdown().unwrap();
3948
3949 let timeout = Duration::from_secs(addr_ttl as u64 + 1);
3951 let removed = loop {
3952 match event_receiver.recv_timeout(timeout) {
3953 Ok(HostnameResolutionEvent::AddressesRemoved(removed_host, addresses)) => {
3954 assert!(removed_host == hostname);
3955 assert!(addresses.contains(&service_ip_addr));
3956
3957 println!(
3958 "address removed: hostname: {} addresses: {:?}",
3959 &hostname, &addresses
3960 );
3961 break true;
3962 }
3963 Ok(_event) => {}
3964 Err(_) => {
3965 break false;
3966 }
3967 }
3968 };
3969
3970 assert!(removed);
3971
3972 client.shutdown().unwrap();
3973 }
3974
3975 #[test]
3976 fn test_refresh_ptr() {
3977 let service_type = "_refresh-ptr._udp.local.";
3979 let instance = "test_instance";
3980 let host_name = "refresh_ptr_host.local.";
3981 let service_ip_addr = my_ip_interfaces(false)
3982 .iter()
3983 .find(|iface| iface.ip().is_ipv4())
3984 .map(|iface| iface.ip())
3985 .unwrap();
3986
3987 let mut my_service = ServiceInfo::new(
3988 service_type,
3989 instance,
3990 host_name,
3991 &service_ip_addr,
3992 5023,
3993 None,
3994 )
3995 .unwrap();
3996
3997 let new_ttl = 3; my_service._set_other_ttl(new_ttl);
3999
4000 let mdns_server = ServiceDaemon::new().expect("Failed to create mdns server");
4002 let result = mdns_server.register(my_service);
4003 assert!(result.is_ok());
4004
4005 let mdns_client = ServiceDaemon::new().expect("Failed to create mdns client");
4006 let browse_chan = mdns_client.browse(service_type).unwrap();
4007 let timeout = Duration::from_millis(1500); let mut resolved = false;
4009
4010 while let Ok(event) = browse_chan.recv_timeout(timeout) {
4012 match event {
4013 ServiceEvent::ServiceResolved(info) => {
4014 resolved = true;
4015 println!("Resolved a service of {}", &info.get_fullname());
4016 break;
4017 }
4018 _ => {}
4019 }
4020 }
4021
4022 assert!(resolved);
4023
4024 let timeout = Duration::from_millis(new_ttl as u64 * 1000 * 90 / 100);
4026 while let Ok(event) = browse_chan.recv_timeout(timeout) {
4027 println!("event: {:?}", &event);
4028 }
4029
4030 let metrics_chan = mdns_client.get_metrics().unwrap();
4032 let metrics = metrics_chan.recv_timeout(timeout).unwrap();
4033 let refresh_counter = metrics["cache-refresh-ptr"];
4034 assert_eq!(refresh_counter, 1);
4035
4036 mdns_server.shutdown().unwrap();
4038 mdns_client.shutdown().unwrap();
4039 }
4040
4041 #[test]
4042 fn test_name_change() {
4043 assert_eq!(name_change("foo.local."), "foo (2).local.");
4044 assert_eq!(name_change("foo (2).local."), "foo (3).local.");
4045 assert_eq!(name_change("foo (9).local."), "foo (10).local.");
4046 assert_eq!(name_change("foo"), "foo (2)");
4047 assert_eq!(name_change("foo (2)"), "foo (3)");
4048 assert_eq!(name_change(""), " (2)");
4049
4050 assert_eq!(name_change("foo (abc)"), "foo (abc) (2)"); assert_eq!(name_change("foo (2"), "foo (2 (2)"); assert_eq!(name_change("foo (2) extra"), "foo (2) extra (2)"); }
4055
4056 #[test]
4057 fn test_hostname_change() {
4058 assert_eq!(hostname_change("foo.local."), "foo-2.local.");
4059 assert_eq!(hostname_change("foo"), "foo-2");
4060 assert_eq!(hostname_change("foo-2.local."), "foo-3.local.");
4061 assert_eq!(hostname_change("foo-9"), "foo-10");
4062 assert_eq!(hostname_change("test-42.domain."), "test-43.domain.");
4063 }
4064}