1#[cfg(feature = "logging")]
32use crate::log::{debug, trace};
33use crate::{
34 dns_cache::{current_time_millis, DnsCache},
35 dns_parser::{
36 ip_address_rr_type, DnsAddress, DnsEntryExt, DnsIncoming, DnsOutgoing, DnsPointer,
37 DnsRecordBox, DnsRecordExt, DnsSrv, DnsTxt, RRType, CLASS_CACHE_FLUSH, CLASS_IN, FLAGS_AA,
38 FLAGS_QR_QUERY, FLAGS_QR_RESPONSE, MAX_MSG_ABSOLUTE,
39 },
40 error::{e_fmt, Error, Result},
41 service_info::{
42 split_sub_domain, valid_ip_on_intf, DnsRegistry, Probe, ServiceInfo, ServiceStatus,
43 },
44 Receiver,
45};
46use flume::{bounded, Sender, TrySendError};
47use if_addrs::{IfAddr, Interface};
48use mio::{net::UdpSocket as MioUdpSocket, Poll};
49use socket2::Socket;
50use std::{
51 cmp::{self, Reverse},
52 collections::{BinaryHeap, HashMap, HashSet},
53 fmt,
54 net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6, UdpSocket},
55 str, thread,
56 time::Duration,
57 vec,
58};
59
/// Default upper bound for the length of a service name.
///
/// `Zeroconf::new` uses it as the initial `service_name_len_max`, which
/// `register_service` passes to `check_service_name_length`.
pub const SERVICE_NAME_LEN_MAX_DEFAULT: u8 = 15;

/// Default interval, in seconds, between checks for IP changes.
///
/// `Zeroconf::new` converts it to milliseconds for its `ip_check_interval`.
pub const IP_CHECK_INTERVAL_IN_SECS_DEFAULT: u32 = 5;

/// A default timeout of 10 seconds.
///
/// NOTE(review): presumably the default `timeout` for `ServiceDaemon::verify`; it is
/// not referenced in this part of the file -- confirm.
pub const VERIFY_TIMEOUT_DEFAULT: Duration = Duration::from_secs(10);

/// UDP port the per-interface sockets bind to and send multicast packets to.
const MDNS_PORT: u16 = 5353;
/// IPv4 multicast group joined by the IPv4 sockets.
const GROUP_ADDR_V4: Ipv4Addr = Ipv4Addr::new(224, 0, 0, 251);
/// IPv6 multicast group (`ff02::fb`) joined by the IPv6 sockets.
const GROUP_ADDR_V6: Ipv6Addr = Ipv6Addr::new(0xff02, 0, 0, 0, 0, 0, 0, 0xfb);
/// IPv4 loopback address used for the daemon's signal socket.
const LOOPBACK_V4: Ipv4Addr = Ipv4Addr::new(127, 0, 0, 1);

/// NOTE(review): presumably a wait time in milliseconds used when resolving; it is not
/// referenced in this part of the file -- confirm.
const RESOLVE_WAIT_IN_MILLIS: u64 = 500;
78
/// Status delivered on the channel returned by [`ServiceDaemon::unregister`].
#[derive(Debug)]
pub enum UnregisterStatus {
    /// NOTE(review): presumably the service was found and unregistered -- confirm
    /// against the code that handles `Command::Unregister`.
    OK,
    /// NOTE(review): presumably no registered service matched the given fullname --
    /// confirm against the code that handles `Command::Unregister`.
    NotFound,
}
87
/// Status of the daemon, as reported by [`ServiceDaemon::status`] and
/// [`ServiceDaemon::shutdown`].
#[derive(Debug, PartialEq, Clone, Eq)]
#[non_exhaustive]
pub enum DaemonStatus {
    /// Initial status of a newly created daemon.
    Running,

    /// The daemon received an exit command, or its command channel is disconnected.
    Shutdown,
}
98
/// Identifies one of the daemon's internal counters.
///
/// The `Display` form of each variant is a fixed kebab-case label.
#[derive(Hash, Eq, PartialEq)]
enum Counter {
    Register,
    RegisterResend,
    Unregister,
    UnregisterResend,
    Browse,
    ResolveHostname,
    Respond,
    CacheRefreshPTR,
    CacheRefreshSRV,
    CacheRefreshAddr,
    KnownAnswerSuppression,
    CachedPTR,
    CachedSRV,
    CachedAddr,
    CachedTxt,
    CachedNSec,
    CachedSubtype,
    DnsRegistryProbe,
    DnsRegistryActive,
    DnsRegistryTimer,
    DnsRegistryNameChange,
    Timer,
}

impl fmt::Display for Counter {
    /// Writes the kebab-case label of the counter.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let label = match self {
            Self::Register => "register",
            Self::RegisterResend => "register-resend",
            Self::Unregister => "unregister",
            Self::UnregisterResend => "unregister-resend",
            Self::Browse => "browse",
            Self::ResolveHostname => "resolve-hostname",
            Self::Respond => "respond",
            Self::CacheRefreshPTR => "cache-refresh-ptr",
            Self::CacheRefreshSRV => "cache-refresh-srv",
            Self::CacheRefreshAddr => "cache-refresh-addr",
            Self::KnownAnswerSuppression => "known-answer-suppression",
            Self::CachedPTR => "cached-ptr",
            Self::CachedSRV => "cached-srv",
            Self::CachedAddr => "cached-addr",
            Self::CachedTxt => "cached-txt",
            Self::CachedNSec => "cached-nsec",
            Self::CachedSubtype => "cached-subtype",
            Self::DnsRegistryProbe => "dns-registry-probe",
            Self::DnsRegistryActive => "dns-registry-active",
            Self::DnsRegistryTimer => "dns-registry-timer",
            Self::DnsRegistryNameChange => "dns-registry-name-change",
            Self::Timer => "timer",
        };
        // Written as-is: width/alignment flags of the formatter are not applied.
        f.write_str(label)
    }
}
155
/// Named integer counters, as delivered by [`ServiceDaemon::get_metrics`].
pub type Metrics = HashMap<String, i64>;

/// Poller token under which the signal socket is registered in `ServiceDaemon::run`.
const SIGNAL_SOCK_EVENT_KEY: usize = usize::MAX - 1;

/// Client-side handle of the mDNS daemon.
///
/// The handle is `Clone`; every method sends a `Command` to the daemon thread that
/// [`ServiceDaemon::new`] spawned.
#[derive(Clone)]
pub struct ServiceDaemon {
    /// Sending side of the command channel to the daemon thread.
    sender: Sender<Command>,

    /// Local address of the daemon's signal socket. `send_cmd` sends a datagram to it
    /// after queueing a command.
    signal_addr: SocketAddr,
}
177
178impl ServiceDaemon {
179 pub fn new() -> Result<Self> {
184 let signal_addr = SocketAddrV4::new(LOOPBACK_V4, 0);
187
188 let signal_sock = UdpSocket::bind(signal_addr)
189 .map_err(|e| e_fmt!("failed to create signal_sock for daemon: {}", e))?;
190
191 let signal_addr = signal_sock
193 .local_addr()
194 .map_err(|e| e_fmt!("failed to get signal sock addr: {}", e))?;
195
196 signal_sock
198 .set_nonblocking(true)
199 .map_err(|e| e_fmt!("failed to set nonblocking for signal socket: {}", e))?;
200
201 let poller = Poll::new().map_err(|e| e_fmt!("failed to create mio Poll: {e}"))?;
202
203 let (sender, receiver) = bounded(100);
204
205 let mio_sock = MioUdpSocket::from_std(signal_sock);
207 thread::Builder::new()
208 .name("mDNS_daemon".to_string())
209 .spawn(move || Self::daemon_thread(mio_sock, poller, receiver))
210 .map_err(|e| e_fmt!("thread builder failed to spawn: {}", e))?;
211
212 Ok(Self {
213 sender,
214 signal_addr,
215 })
216 }
217
218 fn send_cmd(&self, cmd: Command) -> Result<()> {
221 let cmd_name = cmd.to_string();
222
223 self.sender.try_send(cmd).map_err(|e| match e {
225 TrySendError::Full(_) => Error::Again,
226 e => e_fmt!("flume::channel::send failed: {}", e),
227 })?;
228
229 let addr = SocketAddrV4::new(LOOPBACK_V4, 0);
231 let socket = UdpSocket::bind(addr)
232 .map_err(|e| e_fmt!("Failed to create socket to send signal: {}", e))?;
233 socket
234 .send_to(cmd_name.as_bytes(), self.signal_addr)
235 .map_err(|e| {
236 e_fmt!(
237 "signal socket send_to {} ({}) failed: {}",
238 self.signal_addr,
239 cmd_name,
240 e
241 )
242 })?;
243
244 Ok(())
245 }
246
247 pub fn browse(&self, service_type: &str) -> Result<Receiver<ServiceEvent>> {
258 check_domain_suffix(service_type)?;
259
260 let (resp_s, resp_r) = bounded(10);
261 self.send_cmd(Command::Browse(service_type.to_string(), 1, resp_s))?;
262 Ok(resp_r)
263 }
264
265 pub fn stop_browse(&self, ty_domain: &str) -> Result<()> {
270 self.send_cmd(Command::StopBrowse(ty_domain.to_string()))
271 }
272
273 pub fn resolve_hostname(
281 &self,
282 hostname: &str,
283 timeout: Option<u64>,
284 ) -> Result<Receiver<HostnameResolutionEvent>> {
285 check_hostname(hostname)?;
286 let (resp_s, resp_r) = bounded(10);
287 self.send_cmd(Command::ResolveHostname(
288 hostname.to_string(),
289 1,
290 resp_s,
291 timeout,
292 ))?;
293 Ok(resp_r)
294 }
295
296 pub fn stop_resolve_hostname(&self, hostname: &str) -> Result<()> {
301 self.send_cmd(Command::StopResolveHostname(hostname.to_string()))
302 }
303
304 pub fn register(&self, service_info: ServiceInfo) -> Result<()> {
312 check_service_name(service_info.get_fullname())?;
313 check_hostname(service_info.get_hostname())?;
314
315 self.send_cmd(Command::Register(service_info))
316 }
317
318 pub fn unregister(&self, fullname: &str) -> Result<Receiver<UnregisterStatus>> {
326 let (resp_s, resp_r) = bounded(1);
327 self.send_cmd(Command::Unregister(fullname.to_lowercase(), resp_s))?;
328 Ok(resp_r)
329 }
330
331 pub fn monitor(&self) -> Result<Receiver<DaemonEvent>> {
335 let (resp_s, resp_r) = bounded(100);
336 self.send_cmd(Command::Monitor(resp_s))?;
337 Ok(resp_r)
338 }
339
340 pub fn shutdown(&self) -> Result<Receiver<DaemonStatus>> {
345 let (resp_s, resp_r) = bounded(1);
346 self.send_cmd(Command::Exit(resp_s))?;
347 Ok(resp_r)
348 }
349
350 pub fn status(&self) -> Result<Receiver<DaemonStatus>> {
356 let (resp_s, resp_r) = bounded(1);
357
358 if self.sender.is_disconnected() {
359 resp_s
360 .send(DaemonStatus::Shutdown)
361 .map_err(|e| e_fmt!("failed to send daemon status to the client: {}", e))?;
362 } else {
363 self.send_cmd(Command::GetStatus(resp_s))?;
364 }
365
366 Ok(resp_r)
367 }
368
369 pub fn get_metrics(&self) -> Result<Receiver<Metrics>> {
374 let (resp_s, resp_r) = bounded(1);
375 self.send_cmd(Command::GetMetrics(resp_s))?;
376 Ok(resp_r)
377 }
378
379 pub fn set_service_name_len_max(&self, len_max: u8) -> Result<()> {
386 const SERVICE_NAME_LEN_MAX_LIMIT: u8 = 30; if len_max > SERVICE_NAME_LEN_MAX_LIMIT {
389 return Err(Error::Msg(format!(
390 "service name length max {} is too large",
391 len_max
392 )));
393 }
394
395 self.send_cmd(Command::SetOption(DaemonOption::ServiceNameLenMax(len_max)))
396 }
397
398 pub fn set_ip_check_interval(&self, interval_in_secs: u32) -> Result<()> {
404 let interval_in_millis = interval_in_secs as u64 * 1000;
405 self.send_cmd(Command::SetOption(DaemonOption::IpCheckInterval(
406 interval_in_millis,
407 )))
408 }
409
410 pub fn get_ip_check_interval(&self) -> Result<u32> {
412 let (resp_s, resp_r) = bounded(1);
413 self.send_cmd(Command::GetOption(resp_s))?;
414
415 let option = resp_r
416 .recv_timeout(Duration::from_secs(10))
417 .map_err(|e| e_fmt!("failed to receive ip check interval: {}", e))?;
418 let ip_check_interval_in_secs = option.ip_check_interval / 1000;
419 Ok(ip_check_interval_in_secs as u32)
420 }
421
422 pub fn enable_interface(&self, if_kind: impl IntoIfKindVec) -> Result<()> {
429 let if_kind_vec = if_kind.into_vec();
430 self.send_cmd(Command::SetOption(DaemonOption::EnableInterface(
431 if_kind_vec.kinds,
432 )))
433 }
434
435 pub fn disable_interface(&self, if_kind: impl IntoIfKindVec) -> Result<()> {
442 let if_kind_vec = if_kind.into_vec();
443 self.send_cmd(Command::SetOption(DaemonOption::DisableInterface(
444 if_kind_vec.kinds,
445 )))
446 }
447
448 pub fn set_multicast_loop_v4(&self, on: bool) -> Result<()> {
464 self.send_cmd(Command::SetOption(DaemonOption::MulticastLoopV4(on)))
465 }
466
467 pub fn set_multicast_loop_v6(&self, on: bool) -> Result<()> {
483 self.send_cmd(Command::SetOption(DaemonOption::MulticastLoopV6(on)))
484 }
485
486 pub fn verify(&self, instance_fullname: String, timeout: Duration) -> Result<()> {
499 self.send_cmd(Command::Verify(instance_fullname, timeout))
500 }
501
502 fn daemon_thread(signal_sock: MioUdpSocket, poller: Poll, receiver: Receiver<Command>) {
503 let zc = Zeroconf::new(signal_sock, poller);
504
505 if let Some(cmd) = Self::run(zc, receiver) {
506 match cmd {
507 Command::Exit(resp_s) => {
508 if let Err(e) = resp_s.send(DaemonStatus::Shutdown) {
511 debug!("exit: failed to send response of shutdown: {}", e);
512 }
513 }
514 _ => {
515 debug!("Unexpected command: {:?}", cmd);
516 }
517 }
518 }
519 }
520
/// The main loop of the daemon thread.
///
/// Registers the signal socket and all interface sockets with the poller, then loops
/// forever: poll (with a timeout derived from the earliest timer), handle socket
/// events, time out hostname resolvers, execute queued commands and due
/// retransmissions, refresh and evict cache entries, run the probing handler and
/// periodically check for IP changes.
///
/// Returns `Some(command)` with the `Command::Exit` that ended the loop, or `None` if
/// a socket could not be registered with the poller.
fn run(mut zc: Zeroconf, receiver: Receiver<Command>) -> Option<Command> {
    // Register the signal socket under its reserved token.
    if let Err(e) = zc.poller.registry().register(
        &mut zc.signal_sock,
        mio::Token(SIGNAL_SOCK_EVENT_KEY),
        mio::Interest::READABLE,
    ) {
        debug!("failed to add signal socket to the poller: {}", e);
        return None;
    }

    // Register the socket of every interface, each under a newly allocated key.
    for (intf, sock) in zc.intf_socks.iter_mut() {
        let key =
            Zeroconf::add_poll_impl(&mut zc.poll_ids, &mut zc.poll_id_count, intf.clone());

        if let Err(e) =
            zc.poller
                .registry()
                .register(sock, mio::Token(key), mio::Interest::READABLE)
        {
            debug!("add socket of {:?} to poller: {e}", intf);
            return None;
        }
    }

    // Time of the next IP check; 0 means the periodic check is off.
    // NOTE(review): this local is only re-armed at the bottom of the loop. If the
    // interval starts at 0, a later non-zero setting is not picked up here; and if the
    // interval is later set to 0, the next re-arm uses `now`, so the check then runs on
    // every iteration -- confirm that this is intended.
    let mut next_ip_check = if zc.ip_check_interval > 0 {
        current_time_millis() + zc.ip_check_interval
    } else {
        0
    };

    if next_ip_check > 0 {
        zc.add_timer(next_ip_check);
    }

    // Event buffer reused across iterations.
    let mut events = mio::Events::with_capacity(1024);
    loop {
        let now = current_time_millis();

        // Wait no longer than until the earliest timer; without timers the timeout
        // is `None`.
        let earliest_timer = zc.peek_earliest_timer();
        let timeout = earliest_timer.map(|timer| {
            // A timer that is already due still gets a 1 ms timeout.
            let millis = if timer > now { timer - now } else { 1 };
            Duration::from_millis(millis)
        });

        events.clear();
        match zc.poller.poll(&mut events, timeout) {
            Ok(_) => zc.handle_poller_events(&events),
            Err(e) => debug!("failed to select from sockets: {}", e),
        }

        // Polling may have taken a while: take a fresh timestamp.
        let now = current_time_millis();

        // Drop the timers that have fired.
        zc.pop_timers_till(now);

        // Time out hostname resolvers whose deadline has passed. The map is cloned so
        // that entries can be removed from the original while iterating.
        for hostname in zc
            .hostname_resolvers
            .clone()
            .into_iter()
            .filter(|(_, (_, timeout))| timeout.map(|t| now >= t).unwrap_or(false))
            .map(|(hostname, _)| hostname)
        {
            trace!("hostname resolver timeout for {}", &hostname);
            call_hostname_resolution_listener(
                &zc.hostname_resolvers,
                &hostname,
                HostnameResolutionEvent::SearchTimeout(hostname.to_owned()),
            );
            call_hostname_resolution_listener(
                &zc.hostname_resolvers,
                &hostname,
                HostnameResolutionEvent::SearchStopped(hostname.to_owned()),
            );
            zc.hostname_resolvers.remove(&hostname);
        }

        // Execute all queued commands; an exit command ends the loop.
        while let Ok(command) = receiver.try_recv() {
            if matches!(command, Command::Exit(_)) {
                zc.status = DaemonStatus::Shutdown;
                return Some(command);
            }
            zc.exec_command(command, false);
        }

        // Re-run the retransmissions that are due. `i` only advances when nothing was
        // removed, because `remove(i)` shifts the following element into slot `i`.
        let mut i = 0;
        while i < zc.retransmissions.len() {
            if now >= zc.retransmissions[i].next_time {
                let rerun = zc.retransmissions.remove(i);
                zc.exec_command(rerun.command, true);
            } else {
                i += 1;
            }
        }

        zc.refresh_active_services();

        // Send a query for every hostname resolution the cache reports as due for
        // refresh, and count the queries.
        let mut query_count = 0;
        for (hostname, _sender) in zc.hostname_resolvers.iter() {
            for (hostname, ip_addr) in
                zc.cache.refresh_due_hostname_resolutions(hostname).iter()
            {
                zc.send_query(hostname, ip_address_rr_type(ip_addr));
                query_count += 1;
            }
        }

        zc.increase_counter(Counter::CacheRefreshAddr, query_count);

        let now = current_time_millis();

        // Evict expired services from the cache and notify about their removal.
        let expired_services = zc.cache.evict_expired_services(now);
        zc.notify_service_removal(expired_services);

        // Evict expired addresses, notify the hostname resolvers and re-resolve the
        // instances on the affected hosts.
        let expired_addrs = zc.cache.evict_expired_addr(now);
        for (hostname, addrs) in expired_addrs {
            call_hostname_resolution_listener(
                &zc.hostname_resolvers,
                &hostname,
                HostnameResolutionEvent::AddressesRemoved(hostname.clone(), addrs),
            );
            let instances = zc.cache.get_instances_on_host(&hostname);
            let instance_set: HashSet<String> = instances.into_iter().collect();
            zc.resolve_updated_instances(&instance_set);
        }

        zc.probing_handler();

        // Periodic IP check: re-arm the timer first, then look for changes.
        if now >= next_ip_check && next_ip_check > 0 {
            next_ip_check = now + zc.ip_check_interval;
            zc.add_timer(next_ip_check);

            zc.check_ip_changes();
        }
    }
}
681}
682
683fn new_socket_bind(intf: &Interface, should_loop: bool) -> Result<MioUdpSocket> {
685 let intf_ip = &intf.ip();
688 match intf_ip {
689 IpAddr::V4(ip) => {
690 let addr = SocketAddrV4::new(Ipv4Addr::new(0, 0, 0, 0), MDNS_PORT);
691 let sock = new_socket(addr.into(), true)?;
692
693 sock.join_multicast_v4(&GROUP_ADDR_V4, ip)
695 .map_err(|e| e_fmt!("join multicast group on addr {}: {}", intf_ip, e))?;
696
697 sock.set_multicast_if_v4(ip)
699 .map_err(|e| e_fmt!("set multicast_if on addr {}: {}", ip, e))?;
700
701 if !should_loop {
702 sock.set_multicast_loop_v4(false)
703 .map_err(|e| e_fmt!("failed to set multicast loop v4 for {ip}: {e}"))?;
704 }
705
706 let multicast_addr = SocketAddrV4::new(GROUP_ADDR_V4, MDNS_PORT).into();
708 let test_packets = DnsOutgoing::new(0).to_data_on_wire();
709 for packet in test_packets {
710 sock.send_to(&packet, &multicast_addr)
711 .map_err(|e| e_fmt!("send multicast packet on addr {}: {}", ip, e))?;
712 }
713 Ok(MioUdpSocket::from_std(UdpSocket::from(sock)))
714 }
715 IpAddr::V6(ip) => {
716 let addr = SocketAddrV6::new(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 0), MDNS_PORT, 0, 0);
717 let sock = new_socket(addr.into(), true)?;
718
719 sock.join_multicast_v6(&GROUP_ADDR_V6, intf.index.unwrap_or(0))
721 .map_err(|e| e_fmt!("join multicast group on addr {}: {}", ip, e))?;
722
723 sock.set_multicast_if_v6(intf.index.unwrap_or(0))
725 .map_err(|e| e_fmt!("set multicast_if on addr {}: {}", ip, e))?;
726
727 Ok(MioUdpSocket::from_std(UdpSocket::from(sock)))
732 }
733 }
734}
735
736fn new_socket(addr: SocketAddr, non_block: bool) -> Result<Socket> {
739 let domain = match addr {
740 SocketAddr::V4(_) => socket2::Domain::IPV4,
741 SocketAddr::V6(_) => socket2::Domain::IPV6,
742 };
743
744 let fd = Socket::new(domain, socket2::Type::DGRAM, None)
745 .map_err(|e| e_fmt!("create socket failed: {}", e))?;
746
747 fd.set_reuse_address(true)
748 .map_err(|e| e_fmt!("set ReuseAddr failed: {}", e))?;
749 #[cfg(unix)] fd.set_reuse_port(true)
751 .map_err(|e| e_fmt!("set ReusePort failed: {}", e))?;
752
753 if non_block {
754 fd.set_nonblocking(true)
755 .map_err(|e| e_fmt!("set O_NONBLOCK: {}", e))?;
756 }
757
758 fd.bind(&addr.into())
759 .map_err(|e| e_fmt!("socket bind to {} failed: {}", &addr, e))?;
760
761 trace!("new socket bind to {}", &addr);
762 Ok(fd)
763}
764
/// A command scheduled to be executed again by the daemon's main loop.
struct ReRun {
    /// Earliest time, in the units of `current_time_millis`, at which to re-run.
    next_time: u64,
    /// The command to execute again.
    command: Command,
}
771
772#[derive(Debug, Eq, Hash, PartialEq)]
774enum IpVersion {
775 V4,
776 V6,
777}
778
779#[derive(Debug, Eq, Hash, PartialEq)]
781struct MulticastSendTracker {
782 intf_index: u32,
783 ip_version: IpVersion,
784}
785
786fn multicast_send_tracker(intf: &Interface) -> Option<MulticastSendTracker> {
788 match intf.index {
789 Some(index) => {
790 let ip_ver = match intf.addr {
791 IfAddr::V4(_) => IpVersion::V4,
792 IfAddr::V6(_) => IpVersion::V6,
793 };
794 Some(MulticastSendTracker {
795 intf_index: index,
796 ip_version: ip_ver,
797 })
798 }
799 None => None,
800 }
801}
802
803#[derive(Debug, Clone)]
807#[non_exhaustive]
808pub enum IfKind {
809 All,
811
812 IPv4,
814
815 IPv6,
817
818 Name(String),
820
821 Addr(IpAddr),
823
824 LoopbackV4,
829
830 LoopbackV6,
832}
833
834impl IfKind {
835 fn matches(&self, intf: &Interface) -> bool {
837 match self {
838 Self::All => true,
839 Self::IPv4 => intf.ip().is_ipv4(),
840 Self::IPv6 => intf.ip().is_ipv6(),
841 Self::Name(ifname) => ifname == &intf.name,
842 Self::Addr(addr) => addr == &intf.ip(),
843 Self::LoopbackV4 => intf.is_loopback() && intf.ip().is_ipv4(),
844 Self::LoopbackV6 => intf.is_loopback() && intf.ip().is_ipv6(),
845 }
846 }
847}
848
849impl From<&str> for IfKind {
852 fn from(val: &str) -> Self {
853 Self::Name(val.to_string())
854 }
855}
856
857impl From<&String> for IfKind {
858 fn from(val: &String) -> Self {
859 Self::Name(val.to_string())
860 }
861}
862
863impl From<IpAddr> for IfKind {
865 fn from(val: IpAddr) -> Self {
866 Self::Addr(val)
867 }
868}
869
870pub struct IfKindVec {
872 kinds: Vec<IfKind>,
873}
874
875pub trait IntoIfKindVec {
877 fn into_vec(self) -> IfKindVec;
878}
879
880impl<T: Into<IfKind>> IntoIfKindVec for T {
881 fn into_vec(self) -> IfKindVec {
882 let if_kind: IfKind = self.into();
883 IfKindVec {
884 kinds: vec![if_kind],
885 }
886 }
887}
888
889impl<T: Into<IfKind>> IntoIfKindVec for Vec<T> {
890 fn into_vec(self) -> IfKindVec {
891 let kinds: Vec<IfKind> = self.into_iter().map(|x| x.into()).collect();
892 IfKindVec { kinds }
893 }
894}
895
/// One interface selection: a kind of interfaces and whether it is enabled.
struct IfSelection {
    /// The interfaces this selection applies to.
    if_kind: IfKind,

    /// `true` to enable the matching interfaces, `false` to disable them.
    selected: bool,
}
904
/// State of the daemon, owned by the daemon thread.
struct Zeroconf {
    /// The socket of every interface in use.
    intf_socks: HashMap<Interface, MioUdpSocket>,

    /// Maps a poller token key to the interface whose socket is registered under it.
    poll_ids: HashMap<usize, Interface>,

    /// The next poller token key to hand out.
    poll_id_count: usize,

    /// Registered services, keyed by their lowercased fullname.
    my_services: HashMap<String, ServiceInfo>,

    /// The DNS cache.
    cache: DnsCache,

    /// One DNS registry per interface.
    dns_registry_map: HashMap<Interface, DnsRegistry>,

    /// NOTE(review): presumably the listeners of active browses, keyed by service
    /// type -- confirm against the browse handling code.
    service_queriers: HashMap<String, Sender<ServiceEvent>>,

    /// Active hostname resolvers, keyed by hostname. The optional `u64` is compared
    /// against `current_time_millis()` in the main loop to time the resolver out.
    hostname_resolvers: HashMap<String, (Sender<HostnameResolutionEvent>, Option<u64>)>,

    /// Commands waiting to be executed again.
    retransmissions: Vec<ReRun>,

    /// The daemon's counters.
    counters: Metrics,

    /// Poller waiting on the signal socket and the interface sockets.
    poller: Poll,

    /// Channels of the clients that monitor daemon events.
    monitors: Vec<Sender<DaemonEvent>>,

    /// Maximum service name length checked when a service is registered.
    service_name_len_max: u8,

    /// Interval between IP checks, in milliseconds.
    ip_check_interval: u64,

    /// Interface selections, applied in order: a later entry overrides an earlier one.
    if_selections: Vec<IfSelection>,

    /// Socket on which clients signal that a command has been queued.
    signal_sock: MioUdpSocket,

    /// Wake-up times of the main loop; `Reverse` puts the earliest time on top.
    timers: BinaryHeap<Reverse<u64>>,

    /// Current status of the daemon.
    status: DaemonStatus,

    /// NOTE(review): presumably service instances with a resolve in progress -- not
    /// used in this part of the file, confirm.
    pending_resolves: HashSet<String>,

    /// NOTE(review): presumably service instances already resolved -- not used in this
    /// part of the file, confirm.
    resolved: HashSet<String>,

    /// Multicast loop setting read when binding the socket of a new IPv4 interface.
    multicast_loop_v4: bool,

    /// Multicast loop setting read when binding the socket of a new IPv6 interface.
    multicast_loop_v6: bool,
}
976
977impl Zeroconf {
978 fn new(signal_sock: MioUdpSocket, poller: Poll) -> Self {
979 let my_ifaddrs = my_ip_interfaces(false);
981
982 let mut intf_socks = HashMap::new();
986 let mut dns_registry_map = HashMap::new();
987
988 for intf in my_ifaddrs {
989 let sock = match new_socket_bind(&intf, true) {
990 Ok(s) => s,
991 Err(e) => {
992 trace!("bind a socket to {}: {}. Skipped.", &intf.ip(), e);
993 continue;
994 }
995 };
996
997 dns_registry_map.insert(intf.clone(), DnsRegistry::new());
998
999 intf_socks.insert(intf, sock);
1000 }
1001
1002 let monitors = Vec::new();
1003 let service_name_len_max = SERVICE_NAME_LEN_MAX_DEFAULT;
1004 let ip_check_interval = IP_CHECK_INTERVAL_IN_SECS_DEFAULT as u64 * 1000;
1005
1006 let timers = BinaryHeap::new();
1007
1008 let if_selections = vec![
1010 IfSelection {
1011 if_kind: IfKind::LoopbackV4,
1012 selected: false,
1013 },
1014 IfSelection {
1015 if_kind: IfKind::LoopbackV6,
1016 selected: false,
1017 },
1018 ];
1019
1020 let status = DaemonStatus::Running;
1021
1022 Self {
1023 intf_socks,
1024 poll_ids: HashMap::new(),
1025 poll_id_count: 0,
1026 my_services: HashMap::new(),
1027 cache: DnsCache::new(),
1028 dns_registry_map,
1029 hostname_resolvers: HashMap::new(),
1030 service_queriers: HashMap::new(),
1031 retransmissions: Vec::new(),
1032 counters: HashMap::new(),
1033 poller,
1034 monitors,
1035 service_name_len_max,
1036 ip_check_interval,
1037 if_selections,
1038 signal_sock,
1039 timers,
1040 status,
1041 pending_resolves: HashSet::new(),
1042 resolved: HashSet::new(),
1043 multicast_loop_v4: true,
1044 multicast_loop_v6: true,
1045 }
1046 }
1047
1048 fn process_set_option(&mut self, daemon_opt: DaemonOption) {
1049 match daemon_opt {
1050 DaemonOption::ServiceNameLenMax(length) => self.service_name_len_max = length,
1051 DaemonOption::IpCheckInterval(interval) => self.ip_check_interval = interval,
1052 DaemonOption::EnableInterface(if_kind) => self.enable_interface(if_kind),
1053 DaemonOption::DisableInterface(if_kind) => self.disable_interface(if_kind),
1054 DaemonOption::MulticastLoopV4(on) => self.set_multicast_loop_v4(on),
1055 DaemonOption::MulticastLoopV6(on) => self.set_multicast_loop_v6(on),
1056 }
1057 }
1058
1059 fn enable_interface(&mut self, kinds: Vec<IfKind>) {
1060 for if_kind in kinds {
1061 self.if_selections.push(IfSelection {
1062 if_kind,
1063 selected: true,
1064 });
1065 }
1066
1067 self.apply_intf_selections(my_ip_interfaces(true));
1068 }
1069
1070 fn disable_interface(&mut self, kinds: Vec<IfKind>) {
1071 for if_kind in kinds {
1072 self.if_selections.push(IfSelection {
1073 if_kind,
1074 selected: false,
1075 });
1076 }
1077
1078 self.apply_intf_selections(my_ip_interfaces(true));
1079 }
1080
1081 fn set_multicast_loop_v4(&mut self, on: bool) {
1082 for (_, sock) in self.intf_socks.iter_mut() {
1083 if let Err(e) = sock.set_multicast_loop_v4(on) {
1084 debug!("failed to set multicast loop v4: {e}");
1085 }
1086 }
1087 }
1088
1089 fn set_multicast_loop_v6(&mut self, on: bool) {
1090 for (_, sock) in self.intf_socks.iter_mut() {
1091 if let Err(e) = sock.set_multicast_loop_v6(on) {
1092 debug!("failed to set multicast loop v6: {e}");
1093 }
1094 }
1095 }
1096
1097 fn notify_monitors(&mut self, event: DaemonEvent) {
1098 self.monitors.retain(|sender| {
1100 if let Err(e) = sender.try_send(event.clone()) {
1101 debug!("notify_monitors: try_send: {}", &e);
1102 if matches!(e, TrySendError::Disconnected(_)) {
1103 return false; }
1105 }
1106 true
1107 });
1108 }
1109
1110 fn del_addr_in_my_services(&mut self, addr: &IpAddr) {
1112 for (_, service_info) in self.my_services.iter_mut() {
1113 if service_info.is_addr_auto() {
1114 service_info.remove_ipaddr(addr);
1115 }
1116 }
1117 }
1118
1119 fn add_poll(&mut self, intf: Interface) -> usize {
1121 Self::add_poll_impl(&mut self.poll_ids, &mut self.poll_id_count, intf)
1122 }
1123
1124 fn add_poll_impl(
1128 poll_ids: &mut HashMap<usize, Interface>,
1129 poll_id_count: &mut usize,
1130 intf: Interface,
1131 ) -> usize {
1132 let key = *poll_id_count;
1133 *poll_id_count += 1;
1134 let _ = (*poll_ids).insert(key, intf);
1135 key
1136 }
1137
1138 fn add_timer(&mut self, next_time: u64) {
1139 self.timers.push(Reverse(next_time));
1140 }
1141
1142 fn peek_earliest_timer(&self) -> Option<u64> {
1143 self.timers.peek().map(|Reverse(v)| *v)
1144 }
1145
1146 fn _pop_earliest_timer(&mut self) -> Option<u64> {
1147 self.timers.pop().map(|Reverse(v)| v)
1148 }
1149
1150 fn pop_timers_till(&mut self, now: u64) {
1152 while let Some(Reverse(v)) = self.timers.peek() {
1153 if *v > now {
1154 break;
1155 }
1156 self.timers.pop();
1157 }
1158 }
1159
1160 fn selected_addrs(&self, interfaces: Vec<Interface>) -> HashSet<IpAddr> {
1162 let intf_count = interfaces.len();
1163 let mut intf_selections = vec![true; intf_count];
1164
1165 for selection in self.if_selections.iter() {
1167 for i in 0..intf_count {
1169 if selection.if_kind.matches(&interfaces[i]) {
1170 intf_selections[i] = selection.selected;
1171 }
1172 }
1173 }
1174
1175 let mut selected_addrs = HashSet::new();
1176 for i in 0..intf_count {
1177 if intf_selections[i] {
1178 selected_addrs.insert(interfaces[i].addr.ip());
1179 }
1180 }
1181
1182 selected_addrs
1183 }
1184
1185 fn apply_intf_selections(&mut self, interfaces: Vec<Interface>) {
1190 let intf_count = interfaces.len();
1192 let mut intf_selections = vec![true; intf_count];
1193
1194 for selection in self.if_selections.iter() {
1196 for i in 0..intf_count {
1198 if selection.if_kind.matches(&interfaces[i]) {
1199 intf_selections[i] = selection.selected;
1200 }
1201 }
1202 }
1203
1204 for (idx, intf) in interfaces.into_iter().enumerate() {
1206 if intf_selections[idx] {
1207 if !self.intf_socks.contains_key(&intf) {
1209 debug!("apply_intf_selections: add {:?}", &intf.ip());
1210 self.add_new_interface(intf);
1211 }
1212 } else {
1213 if let Some(mut sock) = self.intf_socks.remove(&intf) {
1215 match self.poller.registry().deregister(&mut sock) {
1216 Ok(()) => debug!("apply_intf_selections: deregister {:?}", &intf.ip()),
1217 Err(e) => debug!("apply_intf_selections: poller.delete {:?}: {}", &intf, e),
1218 }
1219
1220 self.poll_ids.retain(|_, v| v != &intf);
1222
1223 self.cache.remove_addrs_on_disabled_intf(&intf);
1225 }
1226 }
1227 }
1228 }
1229
1230 fn check_ip_changes(&mut self) {
1232 let my_ifaddrs = my_ip_interfaces(true);
1234
1235 let poll_ids = &mut self.poll_ids;
1236 let poller = &mut self.poller;
1237 let deleted_addrs = self
1239 .intf_socks
1240 .iter_mut()
1241 .filter_map(|(intf, sock)| {
1242 if !my_ifaddrs.contains(intf) {
1243 if let Err(e) = poller.registry().deregister(sock) {
1244 debug!("check_ip_changes: poller.delete {:?}: {}", intf, e);
1245 }
1246 poll_ids.retain(|_, v| v != intf);
1248 Some(intf.ip())
1249 } else {
1250 None
1251 }
1252 })
1253 .collect::<Vec<IpAddr>>();
1254
1255 for ip in deleted_addrs.iter() {
1257 self.del_addr_in_my_services(ip);
1258 self.notify_monitors(DaemonEvent::IpDel(*ip));
1259 }
1260
1261 self.intf_socks.retain(|intf, _| my_ifaddrs.contains(intf));
1263
1264 self.apply_intf_selections(my_ifaddrs);
1266 }
1267
1268 fn add_new_interface(&mut self, intf: Interface) {
1269 let new_ip = intf.ip();
1271 let should_loop = if new_ip.is_ipv4() {
1272 self.multicast_loop_v4
1273 } else {
1274 self.multicast_loop_v6
1275 };
1276 let mut sock = match new_socket_bind(&intf, should_loop) {
1277 Ok(s) => s,
1278 Err(e) => {
1279 debug!("bind a socket to {}: {}. Skipped.", &intf.ip(), e);
1280 return;
1281 }
1282 };
1283
1284 let key = self.add_poll(intf.clone());
1286 if let Err(e) =
1287 self.poller
1288 .registry()
1289 .register(&mut sock, mio::Token(key), mio::Interest::READABLE)
1290 {
1291 debug!("check_ip_changes: poller add ip {}: {}", new_ip, e);
1292 return;
1293 }
1294
1295 debug!("add new interface {}: {new_ip}", intf.name);
1296 let dns_registry = match self.dns_registry_map.get_mut(&intf) {
1297 Some(registry) => registry,
1298 None => self
1299 .dns_registry_map
1300 .entry(intf.clone())
1301 .or_insert_with(DnsRegistry::new),
1302 };
1303
1304 for (_, service_info) in self.my_services.iter_mut() {
1305 if service_info.is_addr_auto() {
1306 service_info.insert_ipaddr(new_ip);
1307
1308 if announce_service_on_intf(dns_registry, service_info, &intf, &sock) {
1309 debug!(
1310 "Announce service {} on {}",
1311 service_info.get_fullname(),
1312 intf.ip()
1313 );
1314 service_info.set_status(&intf, ServiceStatus::Announced);
1315 } else {
1316 for timer in dns_registry.new_timers.drain(..) {
1317 self.timers.push(Reverse(timer));
1318 }
1319 service_info.set_status(&intf, ServiceStatus::Probing);
1320 }
1321 }
1322 }
1323
1324 self.intf_socks.insert(intf, sock);
1325
1326 self.notify_monitors(DaemonEvent::IpAdd(new_ip));
1328 }
1329
1330 fn register_service(&mut self, mut info: ServiceInfo) {
1339 if let Err(e) = check_service_name_length(info.get_type(), self.service_name_len_max) {
1341 debug!("check_service_name_length: {}", &e);
1342 self.notify_monitors(DaemonEvent::Error(e));
1343 return;
1344 }
1345
1346 if info.is_addr_auto() {
1347 let selected_addrs = self.selected_addrs(my_ip_interfaces(true));
1348 for addr in selected_addrs {
1349 info.insert_ipaddr(addr);
1350 }
1351 }
1352
1353 debug!("register service {:?}", &info);
1354
1355 let outgoing_addrs = self.send_unsolicited_response(&mut info);
1356 if !outgoing_addrs.is_empty() {
1357 self.notify_monitors(DaemonEvent::Announce(
1358 info.get_fullname().to_string(),
1359 format!("{:?}", &outgoing_addrs),
1360 ));
1361 }
1362
1363 let service_fullname = info.get_fullname().to_lowercase();
1366 self.my_services.insert(service_fullname, info);
1367 }
1368
/// Announces the service `info` on the interfaces in use.
///
/// At most one announcement is sent per (interface index, IP family) pair. An
/// interface on which the service could be announced is marked `Announced` and gets a
/// `RegisterResend` scheduled about one second later; on the others the service is
/// marked `Probing`.
///
/// Returns the addresses of the interfaces the service was announced on.
fn send_unsolicited_response(&mut self, info: &mut ServiceInfo) -> Vec<IpAddr> {
    let mut outgoing_addrs = Vec::new();
    // The (interface index, IP family) pairs that have been announced on already.
    let mut multicast_sent_trackers = HashSet::new();

    let mut outgoing_intfs = Vec::new();

    for (intf, sock) in self.intf_socks.iter() {
        // Skip this interface if its pair has been announced on already.
        if let Some(tracker) = multicast_send_tracker(intf) {
            if multicast_sent_trackers.contains(&tracker) {
                continue;
            }
        }

        // Get the registry of this interface, creating it on first use.
        let dns_registry = match self.dns_registry_map.get_mut(intf) {
            Some(registry) => registry,
            None => self
                .dns_registry_map
                .entry(intf.clone())
                .or_insert_with(DnsRegistry::new),
        };

        if announce_service_on_intf(dns_registry, info, intf, sock) {
            if let Some(tracker) = multicast_send_tracker(intf) {
                multicast_sent_trackers.insert(tracker);
            }
            outgoing_addrs.push(intf.ip());
            outgoing_intfs.push(intf.clone());

            debug!("Announce service {} on {}", info.get_fullname(), intf.ip());

            info.set_status(intf, ServiceStatus::Announced);
        } else {
            // Not announced: schedule the timers the registry created.
            for timer in dns_registry.new_timers.drain(..) {
                self.timers.push(Reverse(timer));
            }
            info.set_status(intf, ServiceStatus::Probing);
        }
    }

    // Schedule a resend, 1000 ms from now, on every interface that was announced on.
    let next_time = current_time_millis() + 1000;
    for intf in outgoing_intfs {
        self.add_retransmission(
            next_time,
            Command::RegisterResend(info.get_fullname().to_string(), intf),
        );
    }

    outgoing_addrs
}
1424
/// Drives the probing of names on every interface in use.
///
/// For each interface with a DNS registry:
/// 1. probes that are due are either collected as expired or added to one outgoing
///    query, which is sent if it has any question;
/// 2. the records of expired probes are moved to the active records, and name changes
///    are recorded and reported to the monitors;
/// 3. the services that were waiting for those probes are announced.
fn probing_handler(&mut self) {
    let now = current_time_millis();

    for (intf, sock) in self.intf_socks.iter() {
        let Some(dns_registry) = self.dns_registry_map.get_mut(intf) else {
            continue;
        };

        let mut expired_probe_names = Vec::new();
        let mut out = DnsOutgoing::new(FLAGS_QR_QUERY);

        // Step 1: go through the probes that are due.
        for (name, probe) in dns_registry.probing.iter_mut() {
            if now >= probe.next_send {
                if probe.expired(now) {
                    // Expired probes are handled in step 2.
                    expired_probe_names.push(name.clone());
                } else {
                    out.add_question(name, RRType::ANY);

                    // The probed records go into the authority section of the query.
                    for record in probe.records.iter() {
                        out.add_authority(record.clone());
                    }

                    probe.update_next_send(now);

                    // Wake the main loop up when this probe is due again.
                    self.timers.push(Reverse(probe.next_send));
                }
            }
        }

        // Send the probing query if there is anything to ask.
        if !out.questions().is_empty() {
            debug!("sending out probing of {} questions", out.questions().len());
            send_dns_outgoing(&out, intf, sock);
        }

        let mut waiting_services = HashSet::new();

        // Step 2: finish the expired probes.
        for name in expired_probe_names {
            let Some(probe) = dns_registry.probing.remove(&name) else {
                continue;
            };

            // Record and report the records that carry a new name.
            for record in probe.records.iter() {
                if let Some(new_name) = record.get_record().get_new_name() {
                    dns_registry
                        .name_changes
                        .insert(name.clone(), new_name.to_string());

                    let event = DnsNameChange {
                        original: record.get_record().get_original_name().to_string(),
                        new_name: new_name.to_string(),
                        rr_type: record.get_type(),
                        intf_name: intf.name.to_string(),
                    };
                    notify_monitors(&mut self.monitors, DaemonEvent::NameChange(event));
                }
            }

            debug!(
                "probe of '{name}' finished: move {} records to active. ({} waiting services)",
                probe.records.len(),
                probe.waiting_services.len(),
            );

            // Move the records to the active ones. A probe without records does not
            // release its waiting services.
            if !probe.records.is_empty() {
                match dns_registry.active.get_mut(&name) {
                    Some(records) => {
                        records.extend(probe.records);
                    }
                    None => {
                        dns_registry.active.insert(name, probe.records);
                    }
                }

                waiting_services.extend(probe.waiting_services);
            }
        }

        // Step 3: announce the services that were waiting for the finished probes.
        for service_name in waiting_services {
            debug!(
                "try to announce service {service_name} on intf {}",
                intf.ip()
            );
            // `my_services` is keyed by lowercased fullname.
            if let Some(info) = self.my_services.get_mut(&service_name.to_lowercase()) {
                if info.get_status(intf) == ServiceStatus::Announced {
                    debug!("service {} already announced", info.get_fullname());
                    continue;
                }

                if announce_service_on_intf(dns_registry, info, intf, sock) {
                    // Schedule a resend 1000 ms later and a timer to wake up for it.
                    let next_time = now + 1000;
                    let command =
                        Command::RegisterResend(info.get_fullname().to_string(), intf.clone());
                    self.retransmissions.push(ReRun { next_time, command });
                    self.timers.push(Reverse(next_time));

                    // Report the changed name if the service name was changed.
                    let fullname = match dns_registry.name_changes.get(&service_name) {
                        Some(new_name) => new_name.to_string(),
                        None => service_name.to_string(),
                    };

                    // Likewise for the hostname.
                    let mut hostname = info.get_hostname();
                    if let Some(new_name) = dns_registry.name_changes.get(hostname) {
                        hostname = new_name;
                    }

                    debug!("wake up: announce service {} on {}", fullname, intf.ip());
                    notify_monitors(
                        &mut self.monitors,
                        DaemonEvent::Announce(fullname, format!("{}:{}", hostname, &intf.ip())),
                    );

                    info.set_status(intf, ServiceStatus::Announced);
                }
            }
        }
    }
}
1558
1559 fn unregister_service(
1560 &self,
1561 info: &ServiceInfo,
1562 intf: &Interface,
1563 sock: &MioUdpSocket,
1564 ) -> Vec<u8> {
1565 let mut out = DnsOutgoing::new(FLAGS_QR_RESPONSE | FLAGS_AA);
1566 out.add_answer_at_time(
1567 DnsPointer::new(
1568 info.get_type(),
1569 RRType::PTR,
1570 CLASS_IN,
1571 0,
1572 info.get_fullname().to_string(),
1573 ),
1574 0,
1575 );
1576
1577 if let Some(sub) = info.get_subtype() {
1578 trace!("Adding subdomain {}", sub);
1579 out.add_answer_at_time(
1580 DnsPointer::new(
1581 sub,
1582 RRType::PTR,
1583 CLASS_IN,
1584 0,
1585 info.get_fullname().to_string(),
1586 ),
1587 0,
1588 );
1589 }
1590
1591 out.add_answer_at_time(
1592 DnsSrv::new(
1593 info.get_fullname(),
1594 CLASS_IN | CLASS_CACHE_FLUSH,
1595 0,
1596 info.get_priority(),
1597 info.get_weight(),
1598 info.get_port(),
1599 info.get_hostname().to_string(),
1600 ),
1601 0,
1602 );
1603 out.add_answer_at_time(
1604 DnsTxt::new(
1605 info.get_fullname(),
1606 CLASS_IN | CLASS_CACHE_FLUSH,
1607 0,
1608 info.generate_txt(),
1609 ),
1610 0,
1611 );
1612
1613 for address in info.get_addrs_on_intf(intf) {
1614 out.add_answer_at_time(
1615 DnsAddress::new(
1616 info.get_hostname(),
1617 ip_address_rr_type(&address),
1618 CLASS_IN | CLASS_CACHE_FLUSH,
1619 0,
1620 address,
1621 ),
1622 0,
1623 );
1624 }
1625
1626 send_dns_outgoing(&out, intf, sock).remove(0)
1628 }
1629
1630 fn add_hostname_resolver(
1634 &mut self,
1635 hostname: String,
1636 listener: Sender<HostnameResolutionEvent>,
1637 timeout: Option<u64>,
1638 ) {
1639 let real_timeout = timeout.map(|t| current_time_millis() + t);
1640 self.hostname_resolvers
1641 .insert(hostname.to_lowercase(), (listener, real_timeout));
1642 if let Some(t) = real_timeout {
1643 self.add_timer(t);
1644 }
1645 }
1646
1647 fn send_query(&self, name: &str, qtype: RRType) {
1649 self.send_query_vec(&[(name, qtype)]);
1650 }
1651
1652 fn send_query_vec(&self, questions: &[(&str, RRType)]) {
1654 trace!("Sending query questions: {:?}", questions);
1655 let mut out = DnsOutgoing::new(FLAGS_QR_QUERY);
1656 let now = current_time_millis();
1657
1658 for (name, qtype) in questions {
1659 out.add_question(name, *qtype);
1660
1661 for record in self.cache.get_known_answers(name, *qtype, now) {
1662 trace!("add known answer: {:?}", record);
1670 let mut new_record = record.clone();
1671 new_record.get_record_mut().update_ttl(now);
1672 out.add_answer_box(new_record);
1673 }
1674 }
1675
1676 let mut multicast_sent_trackers = HashSet::new();
1678 for (intf, sock) in self.intf_socks.iter() {
1679 if let Some(tracker) = multicast_send_tracker(intf) {
1680 if multicast_sent_trackers.contains(&tracker) {
1681 continue; }
1683 multicast_sent_trackers.insert(tracker);
1684 }
1685 send_dns_outgoing(&out, intf, sock);
1686 }
1687 }
1688
1689 fn handle_read(&mut self, intf: &Interface) -> bool {
1694 let sock = match self.intf_socks.get_mut(intf) {
1695 Some(if_sock) => if_sock,
1696 None => return false,
1697 };
1698 let mut buf = vec![0u8; MAX_MSG_ABSOLUTE];
1699
1700 let sz = match sock.recv(&mut buf) {
1707 Ok(sz) => sz,
1708 Err(e) => {
1709 if e.kind() != std::io::ErrorKind::WouldBlock {
1710 debug!("listening socket read failed: {}", e);
1711 }
1712 return false;
1713 }
1714 };
1715
1716 trace!("received {} bytes at IP: {}", sz, intf.ip());
1717
1718 if sz == 0 {
1720 debug!("socket {:?} was likely shutdown", &sock);
1721 if let Err(e) = self.poller.registry().deregister(sock) {
1722 debug!("failed to remove sock {:?} from poller: {}", sock, &e);
1723 }
1724
1725 let should_loop = if intf.ip().is_ipv4() {
1727 self.multicast_loop_v4
1728 } else {
1729 self.multicast_loop_v6
1730 };
1731 match new_socket_bind(intf, should_loop) {
1732 Ok(new_sock) => {
1733 trace!("reset socket for IP {}", intf.ip());
1734 self.intf_socks.insert(intf.clone(), new_sock);
1735 }
1736 Err(e) => debug!("re-bind a socket to {:?}: {}", intf, e),
1737 }
1738
1739 return false;
1740 }
1741
1742 buf.truncate(sz); match DnsIncoming::new(buf) {
1745 Ok(msg) => {
1746 if msg.is_query() {
1747 self.handle_query(msg, intf);
1748 } else if msg.is_response() {
1749 self.handle_response(msg, intf);
1750 } else {
1751 debug!("Invalid message: not query and not response");
1752 }
1753 }
1754 Err(e) => debug!("Invalid incoming DNS message: {}", e),
1755 }
1756
1757 true
1758 }
1759
1760 fn query_unresolved(&mut self, instance: &str) -> bool {
1762 if !valid_instance_name(instance) {
1763 trace!("instance name {} not valid", instance);
1764 return false;
1765 }
1766
1767 if let Some(records) = self.cache.get_srv(instance) {
1768 for record in records {
1769 if let Some(srv) = record.any().downcast_ref::<DnsSrv>() {
1770 if self.cache.get_addr(srv.host()).is_none() {
1771 self.send_query_vec(&[(srv.host(), RRType::A), (srv.host(), RRType::AAAA)]);
1772 return true;
1773 }
1774 }
1775 }
1776 } else {
1777 self.send_query(instance, RRType::ANY);
1778 return true;
1779 }
1780
1781 false
1782 }
1783
1784 fn query_cache_for_service(&mut self, ty_domain: &str, sender: &Sender<ServiceEvent>) {
1787 let mut resolved: HashSet<String> = HashSet::new();
1788 let mut unresolved: HashSet<String> = HashSet::new();
1789
1790 if let Some(records) = self.cache.get_ptr(ty_domain) {
1791 for record in records.iter() {
1792 if let Some(ptr) = record.any().downcast_ref::<DnsPointer>() {
1793 let info = match self.create_service_info_from_cache(ty_domain, ptr.alias()) {
1794 Ok(ok) => ok,
1795 Err(err) => {
1796 debug!("Error while creating service info from cache: {}", err);
1797 continue;
1798 }
1799 };
1800
1801 match sender.send(ServiceEvent::ServiceFound(
1802 ty_domain.to_string(),
1803 ptr.alias().to_string(),
1804 )) {
1805 Ok(()) => debug!("send service found {}", ptr.alias()),
1806 Err(e) => {
1807 debug!("failed to send service found: {}", e);
1808 continue;
1809 }
1810 }
1811
1812 if info.is_ready() {
1813 resolved.insert(ptr.alias().to_string());
1814 match sender.send(ServiceEvent::ServiceResolved(info)) {
1815 Ok(()) => debug!("sent service resolved: {}", ptr.alias()),
1816 Err(e) => debug!("failed to send service resolved: {}", e),
1817 }
1818 } else {
1819 unresolved.insert(ptr.alias().to_string());
1820 }
1821 }
1822 }
1823 }
1824
1825 for instance in resolved.drain() {
1826 self.pending_resolves.remove(&instance);
1827 self.resolved.insert(instance);
1828 }
1829
1830 for instance in unresolved.drain() {
1831 self.add_pending_resolve(instance);
1832 }
1833 }
1834
1835 fn query_cache_for_hostname(
1838 &mut self,
1839 hostname: &str,
1840 sender: Sender<HostnameResolutionEvent>,
1841 ) {
1842 let addresses_map = self.cache.get_addresses_for_host(hostname);
1843 for (name, addresses) in addresses_map {
1844 match sender.send(HostnameResolutionEvent::AddressesFound(name, addresses)) {
1845 Ok(()) => trace!("sent hostname addresses found"),
1846 Err(e) => debug!("failed to send hostname addresses found: {}", e),
1847 }
1848 }
1849 }
1850
1851 fn add_pending_resolve(&mut self, instance: String) {
1852 if !self.pending_resolves.contains(&instance) {
1853 let next_time = current_time_millis() + RESOLVE_WAIT_IN_MILLIS;
1854 self.add_retransmission(next_time, Command::Resolve(instance.clone(), 1));
1855 self.pending_resolves.insert(instance);
1856 }
1857 }
1858
1859 fn create_service_info_from_cache(
1860 &self,
1861 ty_domain: &str,
1862 fullname: &str,
1863 ) -> Result<ServiceInfo> {
1864 let my_name = {
1865 let name = fullname.trim_end_matches(split_sub_domain(ty_domain).0);
1866 name.strip_suffix('.').unwrap_or(name).to_string()
1867 };
1868
1869 let now = current_time_millis();
1870 let mut info = ServiceInfo::new(ty_domain, &my_name, "", (), 0, None)?;
1871
1872 if let Some(subtype) = self.cache.get_subtype(fullname) {
1874 trace!(
1875 "ty_domain: {} found subtype {} for instance: {}",
1876 ty_domain,
1877 subtype,
1878 fullname
1879 );
1880 if info.get_subtype().is_none() {
1881 info.set_subtype(subtype.clone());
1882 }
1883 }
1884
1885 if let Some(records) = self.cache.get_srv(fullname) {
1887 if let Some(answer) = records.first() {
1888 if let Some(dns_srv) = answer.any().downcast_ref::<DnsSrv>() {
1889 info.set_hostname(dns_srv.host().to_string());
1890 info.set_port(dns_srv.port());
1891 }
1892 }
1893 }
1894
1895 if let Some(records) = self.cache.get_txt(fullname) {
1897 if let Some(record) = records.first() {
1898 if let Some(dns_txt) = record.any().downcast_ref::<DnsTxt>() {
1899 info.set_properties_from_txt(dns_txt.text());
1900 }
1901 }
1902 }
1903
1904 if let Some(records) = self.cache.get_addr(info.get_hostname()) {
1906 for answer in records.iter() {
1907 if let Some(dns_a) = answer.any().downcast_ref::<DnsAddress>() {
1908 if dns_a.get_record().is_expired(now) {
1909 trace!("Addr expired: {}", dns_a.address());
1910 } else {
1911 info.insert_ipaddr(dns_a.address());
1912 }
1913 }
1914 }
1915 }
1916
1917 Ok(info)
1918 }
1919
1920 fn handle_poller_events(&mut self, events: &mio::Events) {
1921 for ev in events.iter() {
1922 trace!("event received with key {:?}", ev.token());
1923 if ev.token().0 == SIGNAL_SOCK_EVENT_KEY {
1924 self.signal_sock_drain();
1926
1927 if let Err(e) = self.poller.registry().reregister(
1928 &mut self.signal_sock,
1929 ev.token(),
1930 mio::Interest::READABLE,
1931 ) {
1932 debug!("failed to modify poller for signal socket: {}", e);
1933 }
1934 continue; }
1936
1937 let intf = match self.poll_ids.get(&ev.token().0) {
1939 Some(interface) => interface.clone(),
1940 None => {
1941 debug!("Ip for event key {} not found", ev.token().0);
1942 break;
1943 }
1944 };
1945 while self.handle_read(&intf) {}
1946
1947 if let Some(sock) = self.intf_socks.get_mut(&intf) {
1949 if let Err(e) =
1950 self.poller
1951 .registry()
1952 .reregister(sock, ev.token(), mio::Interest::READABLE)
1953 {
1954 debug!("modify poller for interface {:?}: {}", &intf, e);
1955 break;
1956 }
1957 }
1958 }
1959 }
1960
/// Processes an incoming response `msg` received on `intf`.
///
/// Steps, in order:
/// 1. drop expired records from the message and from the cache (sending
///    `ServiceRemoved` for removed PTR records),
/// 2. run `conflict_handler` on the remaining records,
/// 3. add or update every record in the cache and collect timers,
/// 4. send `ServiceFound` for new PTR records and `AddressesFound` for
///    new address records,
/// 5. resolve the instances touched by the new records.
fn handle_response(&mut self, mut msg: DnsIncoming, intf: &Interface) {
    trace!(
        "handle_response: {} answers {} authorities {} additionals",
        msg.answers().len(),
        &msg.authorities().len(),
        &msg.num_additionals()
    );
    let now = current_time_millis();

    // `retain` predicate: keeps unexpired records; expired ones are also removed
    // from the cache, and listeners are told about removed PTR records.
    let mut record_predicate = |record: &DnsRecordBox| {
        if !record.get_record().is_expired(now) {
            return true;
        }

        debug!("record is expired, removing it from cache.");
        if self.cache.remove(record) {
            if let Some(dns_ptr) = record.any().downcast_ref::<DnsPointer>() {
                call_service_listener(
                    &self.service_queriers,
                    dns_ptr.get_name(),
                    ServiceEvent::ServiceRemoved(
                        dns_ptr.get_name().to_string(),
                        dns_ptr.alias().to_string(),
                    ),
                );
            }
        }
        false
    };
    msg.answers_mut().retain(&mut record_predicate);
    msg.authorities_mut().retain(&mut record_predicate);
    msg.additionals_mut().retain(&mut record_predicate);

    // Check the remaining records against names this daemon is probing.
    self.conflict_handler(&msg, intf);

    // Decide whether the message is "for us": true when there is no PTR answer
    // at all, or when at least one PTR answer is for a type being browsed.
    let mut is_for_us = true;
    for answer in msg.answers() {
        if answer.get_type() == RRType::PTR {
            if self.service_queriers.contains_key(answer.get_name()) {
                is_for_us = true;
                break;
            } else {
                is_for_us = false;
            }
        }
    }

    // A new record, reduced to what is needed to find the affected instance.
    struct InstanceChange {
        // Type of the DNS record.
        ty: RRType,
        // For PTR records the alias (instance name), otherwise the record name.
        name: String,
    }

    let mut changes = Vec::new();
    let mut timers = Vec::new();
    for record in msg.all_records() {
        match self
            .cache
            .add_or_update(intf, record, &mut timers, is_for_us)
        {
            // `true` branch: the record is reported as a change to listeners.
            Some((dns_record, true)) => {
                timers.push(dns_record.get_record().get_expire_time());
                timers.push(dns_record.get_record().get_refresh_time());

                let ty = dns_record.get_type();
                let name = dns_record.get_name();
                if ty == RRType::PTR {
                    // NOTE(review): the refresh time was already pushed just above;
                    // this second push looks redundant — confirm.
                    if self.service_queriers.contains_key(name) {
                        timers.push(dns_record.get_record().get_refresh_time());
                    }

                    if let Some(dns_ptr) = dns_record.any().downcast_ref::<DnsPointer>() {
                        // New PTR: tell the listener right away.
                        call_service_listener(
                            &self.service_queriers,
                            name,
                            ServiceEvent::ServiceFound(
                                name.to_string(),
                                dns_ptr.alias().to_string(),
                            ),
                        );
                        changes.push(InstanceChange {
                            ty,
                            name: dns_ptr.alias().to_string(),
                        });
                    }
                } else {
                    changes.push(InstanceChange {
                        ty,
                        name: name.to_string(),
                    });
                }
            }
            // `false` branch: only the timers are scheduled, no change is recorded.
            Some((dns_record, false)) => {
                timers.push(dns_record.get_record().get_expire_time());
                timers.push(dns_record.get_record().get_refresh_time());
            }
            _ => {}
        }
    }

    // Timers are added after the loop, once the cache is no longer borrowed.
    for t in timers {
        self.add_timer(t);
    }

    // Address changes: notify hostname resolvers with the cached addresses.
    for change in changes
        .iter()
        .filter(|change| change.ty == RRType::A || change.ty == RRType::AAAA)
    {
        let addr_map = self.cache.get_addresses_for_host(&change.name);
        for (name, addresses) in addr_map {
            call_hostname_resolution_listener(
                &self.hostname_resolvers,
                &change.name,
                HostnameResolutionEvent::AddressesFound(name, addresses),
            )
        }
    }

    // Map every change to the instance names it affects.
    let mut updated_instances = HashSet::new();
    for update in changes {
        match update.ty {
            RRType::PTR | RRType::SRV | RRType::TXT => {
                updated_instances.insert(update.name);
            }
            RRType::A | RRType::AAAA => {
                // An address change affects every instance hosted on that name.
                let instances = self.cache.get_instances_on_host(&update.name);
                updated_instances.extend(instances);
            }
            _ => {}
        }
    }

    self.resolve_updated_instances(&updated_instances);
}
2116
/// Checks the answers of `msg` against the names being probed on `intf`
/// and renames the probed records that conflict with them.
///
/// A probed record conflicts with an answer when name, type and class match
/// but the rdata differs. Conflicting records are removed from their probe,
/// given a new name and inserted into a probe for that new name, together
/// with the services that were waiting on the old probe.
fn conflict_handler(&mut self, msg: &DnsIncoming, intf: &Interface) {
    let Some(dns_registry) = self.dns_registry_map.get_mut(intf) else {
        return;
    };

    for answer in msg.answers().iter() {
        // Renamed copies of the records that conflict with this answer.
        let mut new_records = Vec::new();

        // Only answers for a name currently being probed can conflict.
        let name = answer.get_name();
        let Some(probe) = dns_registry.probing.get_mut(name) else {
            continue;
        };

        if answer.get_type() == RRType::A || answer.get_type() == RRType::AAAA {
            // Ignore address answers that are not valid on this interface.
            if let Some(answer_addr) = answer.any().downcast_ref::<DnsAddress>() {
                if !valid_ip_on_intf(&answer_addr.address(), intf) {
                    debug!(
                        "conflict handler: answer addr {:?} not in the subnet of {:?}",
                        answer_addr, intf
                    );
                    continue;
                }
            }

            // If any probed record has the same type, class and rdata as the
            // answer, the answer is not treated as a conflict.
            let any_match = probe.records.iter().any(|r| {
                r.get_type() == answer.get_type()
                    && r.get_class() == answer.get_class()
                    && r.rrdata_match(answer.as_ref())
            });
            if any_match {
                continue;
            }
        }

        probe.records.retain(|record| {
            if record.get_type() == answer.get_type()
                && record.get_class() == answer.get_class()
                && !record.rrdata_match(answer.as_ref())
            {
                debug!(
                    "found conflict name: '{name}' record: {}: {} PEER: {}",
                    record.get_type(),
                    record.rdata_print(),
                    answer.rdata_print()
                );

                // Address records get a new hostname, other records a new name.
                let mut new_record = record.clone();
                let new_name = match record.get_type() {
                    RRType::A => hostname_change(name),
                    RRType::AAAA => hostname_change(name),
                    _ => name_change(name),
                };
                new_record.get_record_mut().set_new_name(new_name);
                new_records.push(new_record);
                // Drop the conflicting record from the current probe.
                return false;
            }

            true
        });

        // The new probes start at a random offset of up to 250 ms from now.
        let create_time = current_time_millis() + fastrand::u64(0..250);

        // Cloned so that `probe` is no longer borrowed while the map is modified below.
        let waiting_services = probe.waiting_services.clone();

        for record in new_records {
            if dns_registry.update_hostname(name, record.get_name(), create_time) {
                self.timers.push(Reverse(create_time));
            }

            // Remember the rename, keyed by the record's original name.
            dns_registry.name_changes.insert(
                record.get_record().get_original_name().to_string(),
                record.get_name().to_string(),
            );

            // Reuse the probe of the new name if there is one, otherwise create it
            // and add a timer for its first send.
            let new_probe = match dns_registry.probing.get_mut(record.get_name()) {
                Some(p) => p,
                None => {
                    let new_probe = dns_registry
                        .probing
                        .entry(record.get_name().to_string())
                        .or_insert_with(|| {
                            debug!("conflict handler: new probe of {}", record.get_name());
                            Probe::new(create_time)
                        });
                    self.timers.push(Reverse(new_probe.next_send));
                    new_probe
                }
            };

            debug!(
                "insert record with new name '{}' {} into probe",
                record.get_name(),
                record.get_type()
            );
            new_probe.insert_record(record);

            // Services waiting on the old probe now also wait on the new one.
            new_probe.waiting_services.extend(waiting_services.clone());
        }
    }
}
2229
2230 fn resolve_updated_instances(&mut self, updated_instances: &HashSet<String>) {
2237 let mut resolved: HashSet<String> = HashSet::new();
2238 let mut unresolved: HashSet<String> = HashSet::new();
2239 let mut removed_instances = HashMap::new();
2240
2241 for (ty_domain, records) in self.cache.all_ptr().iter() {
2242 if !self.service_queriers.contains_key(ty_domain) {
2243 continue;
2245 }
2246
2247 for record in records.iter() {
2248 if let Some(dns_ptr) = record.any().downcast_ref::<DnsPointer>() {
2249 if updated_instances.contains(dns_ptr.alias()) {
2250 if let Ok(info) =
2251 self.create_service_info_from_cache(ty_domain, dns_ptr.alias())
2252 {
2253 if info.is_ready() {
2254 debug!("call queriers to resolve {}", dns_ptr.alias());
2255 resolved.insert(dns_ptr.alias().to_string());
2256 call_service_listener(
2257 &self.service_queriers,
2258 ty_domain,
2259 ServiceEvent::ServiceResolved(info),
2260 );
2261 } else {
2262 if self.resolved.remove(dns_ptr.alias()) {
2263 removed_instances
2264 .entry(ty_domain.to_string())
2265 .or_insert_with(HashSet::new)
2266 .insert(dns_ptr.alias().to_string());
2267 }
2268 unresolved.insert(dns_ptr.alias().to_string());
2269 }
2270 }
2271 }
2272 }
2273 }
2274 }
2275
2276 for instance in resolved.drain() {
2277 self.pending_resolves.remove(&instance);
2278 self.resolved.insert(instance);
2279 }
2280
2281 for instance in unresolved.drain() {
2282 self.add_pending_resolve(instance);
2283 }
2284
2285 self.notify_service_removal(removed_instances);
2286 }
2287
2288 fn handle_query(&mut self, msg: DnsIncoming, intf: &Interface) {
2290 let sock = match self.intf_socks.get(intf) {
2291 Some(sock) => sock,
2292 None => return,
2293 };
2294 let mut out = DnsOutgoing::new(FLAGS_QR_RESPONSE | FLAGS_AA);
2295
2296 const META_QUERY: &str = "_services._dns-sd._udp.local.";
2299
2300 let Some(dns_registry) = self.dns_registry_map.get_mut(intf) else {
2301 debug!("missing dns registry for intf {}", intf.ip());
2302 return;
2303 };
2304
2305 for question in msg.questions().iter() {
2306 trace!("query question: {:?}", &question);
2307
2308 let qtype = question.entry_type();
2309
2310 if qtype == RRType::PTR {
2311 for service in self.my_services.values() {
2312 if service.get_status(intf) != ServiceStatus::Announced {
2313 continue;
2314 }
2315
2316 if question.entry_name() == service.get_type()
2317 || service
2318 .get_subtype()
2319 .as_ref()
2320 .is_some_and(|v| v == question.entry_name())
2321 {
2322 add_answer_with_additionals(&mut out, &msg, service, intf, dns_registry);
2323 } else if question.entry_name() == META_QUERY {
2324 let ptr_added = out.add_answer(
2325 &msg,
2326 DnsPointer::new(
2327 question.entry_name(),
2328 RRType::PTR,
2329 CLASS_IN,
2330 service.get_other_ttl(),
2331 service.get_type().to_string(),
2332 ),
2333 );
2334 if !ptr_added {
2335 trace!("answer was not added for meta-query {:?}", &question);
2336 }
2337 }
2338 }
2339 } else {
2340 if qtype == RRType::ANY && msg.num_authorities() > 0 {
2342 let probe_name = question.entry_name();
2343
2344 if let Some(probe) = dns_registry.probing.get_mut(probe_name) {
2345 let now = current_time_millis();
2346
2347 if probe.start_time < now {
2351 let incoming_records: Vec<_> = msg
2352 .authorities()
2353 .iter()
2354 .filter(|r| r.get_name() == probe_name)
2355 .collect();
2356
2357 probe.tiebreaking(&incoming_records, now, probe_name);
2358 }
2359 }
2360 }
2361
2362 if qtype == RRType::A || qtype == RRType::AAAA || qtype == RRType::ANY {
2363 for service in self.my_services.values() {
2364 if service.get_status(intf) != ServiceStatus::Announced {
2365 continue;
2366 }
2367
2368 let service_hostname =
2369 match dns_registry.name_changes.get(service.get_hostname()) {
2370 Some(new_name) => new_name,
2371 None => service.get_hostname(),
2372 };
2373
2374 if service_hostname.to_lowercase() == question.entry_name().to_lowercase() {
2375 let intf_addrs = service.get_addrs_on_intf(intf);
2376 if intf_addrs.is_empty()
2377 && (qtype == RRType::A || qtype == RRType::AAAA)
2378 {
2379 let t = match qtype {
2380 RRType::A => "TYPE_A",
2381 RRType::AAAA => "TYPE_AAAA",
2382 _ => "invalid_type",
2383 };
2384 trace!(
2385 "Cannot find valid addrs for {} response on intf {:?}",
2386 t,
2387 &intf
2388 );
2389 return;
2390 }
2391 for address in intf_addrs {
2392 out.add_answer(
2393 &msg,
2394 DnsAddress::new(
2395 service_hostname,
2396 ip_address_rr_type(&address),
2397 CLASS_IN | CLASS_CACHE_FLUSH,
2398 service.get_host_ttl(),
2399 address,
2400 ),
2401 );
2402 }
2403 }
2404 }
2405 }
2406
2407 let query_name = question.entry_name().to_lowercase();
2408 let service_opt = self
2409 .my_services
2410 .iter()
2411 .find(|(k, _v)| {
2412 let service_name = match dns_registry.name_changes.get(k.as_str()) {
2413 Some(new_name) => new_name,
2414 None => k,
2415 };
2416 service_name == &query_name
2417 })
2418 .map(|(_, v)| v);
2419
2420 let Some(service) = service_opt else {
2421 continue;
2422 };
2423
2424 if service.get_status(intf) != ServiceStatus::Announced {
2425 continue;
2426 }
2427
2428 if qtype == RRType::SRV || qtype == RRType::ANY {
2429 out.add_answer(
2430 &msg,
2431 DnsSrv::new(
2432 question.entry_name(),
2433 CLASS_IN | CLASS_CACHE_FLUSH,
2434 service.get_host_ttl(),
2435 service.get_priority(),
2436 service.get_weight(),
2437 service.get_port(),
2438 service.get_hostname().to_string(),
2439 ),
2440 );
2441 }
2442
2443 if qtype == RRType::TXT || qtype == RRType::ANY {
2444 out.add_answer(
2445 &msg,
2446 DnsTxt::new(
2447 question.entry_name(),
2448 CLASS_IN | CLASS_CACHE_FLUSH,
2449 service.get_host_ttl(),
2450 service.generate_txt(),
2451 ),
2452 );
2453 }
2454
2455 if qtype == RRType::SRV {
2456 let intf_addrs = service.get_addrs_on_intf(intf);
2457 if intf_addrs.is_empty() {
2458 debug!(
2459 "Cannot find valid addrs for TYPE_SRV response on intf {:?}",
2460 &intf
2461 );
2462 return;
2463 }
2464 for address in intf_addrs {
2465 out.add_additional_answer(DnsAddress::new(
2466 service.get_hostname(),
2467 ip_address_rr_type(&address),
2468 CLASS_IN | CLASS_CACHE_FLUSH,
2469 service.get_host_ttl(),
2470 address,
2471 ));
2472 }
2473 }
2474 }
2475 }
2476
2477 if !out.answers_count() > 0 {
2478 out.set_id(msg.id());
2479 send_dns_outgoing(&out, intf, sock);
2480
2481 self.increase_counter(Counter::Respond, 1);
2482 self.notify_monitors(DaemonEvent::Respond(intf.ip()));
2483 }
2484
2485 self.increase_counter(Counter::KnownAnswerSuppression, out.known_answer_count());
2486 }
2487
2488 fn increase_counter(&mut self, counter: Counter, count: i64) {
2490 let key = counter.to_string();
2491 match self.counters.get_mut(&key) {
2492 Some(v) => *v += count,
2493 None => {
2494 self.counters.insert(key, count);
2495 }
2496 }
2497 }
2498
2499 fn set_counter(&mut self, counter: Counter, count: i64) {
2501 let key = counter.to_string();
2502 self.counters.insert(key, count);
2503 }
2504
2505 fn signal_sock_drain(&self) {
2506 let mut signal_buf = [0; 1024];
2507
2508 while let Ok(sz) = self.signal_sock.recv(&mut signal_buf) {
2510 trace!(
2511 "signal socket recvd: {}",
2512 String::from_utf8_lossy(&signal_buf[0..sz])
2513 );
2514 }
2515 }
2516
2517 fn add_retransmission(&mut self, next_time: u64, command: Command) {
2518 self.retransmissions.push(ReRun { next_time, command });
2519 self.add_timer(next_time);
2520 }
2521
2522 fn notify_service_removal(&self, expired: HashMap<String, HashSet<String>>) {
2524 for (ty_domain, sender) in self.service_queriers.iter() {
2525 if let Some(instances) = expired.get(ty_domain) {
2526 for instance_name in instances {
2527 let event = ServiceEvent::ServiceRemoved(
2528 ty_domain.to_string(),
2529 instance_name.to_string(),
2530 );
2531 match sender.send(event) {
2532 Ok(()) => debug!("notify_service_removal: sent ServiceRemoved to listener of {ty_domain}: {instance_name}"),
2533 Err(e) => debug!("Failed to send event: {}", e),
2534 }
2535 }
2536 }
2537 }
2538 }
2539
2540 fn exec_command(&mut self, command: Command, repeating: bool) {
2544 match command {
2545 Command::Browse(ty, next_delay, listener) => {
2546 self.exec_command_browse(repeating, ty, next_delay, listener);
2547 }
2548
2549 Command::ResolveHostname(hostname, next_delay, listener, timeout) => {
2550 self.exec_command_resolve_hostname(
2551 repeating, hostname, next_delay, listener, timeout,
2552 );
2553 }
2554
2555 Command::Register(service_info) => {
2556 self.register_service(service_info);
2557 self.increase_counter(Counter::Register, 1);
2558 }
2559
2560 Command::RegisterResend(fullname, intf) => {
2561 trace!("register-resend service: {fullname} on {:?}", &intf.addr);
2562 self.exec_command_register_resend(fullname, intf);
2563 }
2564
2565 Command::Unregister(fullname, resp_s) => {
2566 trace!("unregister service {} repeat {}", &fullname, &repeating);
2567 self.exec_command_unregister(repeating, fullname, resp_s);
2568 }
2569
2570 Command::UnregisterResend(packet, ip) => {
2571 self.exec_command_unregister_resend(packet, ip);
2572 }
2573
2574 Command::StopBrowse(ty_domain) => self.exec_command_stop_browse(ty_domain),
2575
2576 Command::StopResolveHostname(hostname) => {
2577 self.exec_command_stop_resolve_hostname(hostname.to_lowercase())
2578 }
2579
2580 Command::Resolve(instance, try_count) => self.exec_command_resolve(instance, try_count),
2581
2582 Command::GetMetrics(resp_s) => self.exec_command_get_metrics(resp_s),
2583
2584 Command::GetStatus(resp_s) => match resp_s.send(self.status.clone()) {
2585 Ok(()) => trace!("Sent status to the client"),
2586 Err(e) => debug!("Failed to send status: {}", e),
2587 },
2588
2589 Command::Monitor(resp_s) => {
2590 self.monitors.push(resp_s);
2591 }
2592
2593 Command::SetOption(daemon_opt) => {
2594 self.process_set_option(daemon_opt);
2595 }
2596
2597 Command::GetOption(resp_s) => {
2598 let val = DaemonOptionVal {
2599 _service_name_len_max: self.service_name_len_max,
2600 ip_check_interval: self.ip_check_interval,
2601 };
2602 if let Err(e) = resp_s.send(val) {
2603 debug!("Failed to send options: {}", e);
2604 }
2605 }
2606
2607 Command::Verify(instance_fullname, timeout) => {
2608 self.exec_command_verify(instance_fullname, timeout, repeating);
2609 }
2610
2611 _ => {
2612 debug!("unexpected command: {:?}", &command);
2613 }
2614 }
2615 }
2616
2617 fn exec_command_get_metrics(&mut self, resp_s: Sender<HashMap<String, i64>>) {
2618 self.set_counter(Counter::CachedPTR, self.cache.ptr_count() as i64);
2619 self.set_counter(Counter::CachedSRV, self.cache.srv_count() as i64);
2620 self.set_counter(Counter::CachedAddr, self.cache.addr_count() as i64);
2621 self.set_counter(Counter::CachedTxt, self.cache.txt_count() as i64);
2622 self.set_counter(Counter::CachedNSec, self.cache.nsec_count() as i64);
2623 self.set_counter(Counter::CachedSubtype, self.cache.subtype_count() as i64);
2624 self.set_counter(Counter::Timer, self.timers.len() as i64);
2625
2626 let dns_registry_probe_count: usize = self
2627 .dns_registry_map
2628 .values()
2629 .map(|r| r.probing.len())
2630 .sum();
2631 self.set_counter(Counter::DnsRegistryProbe, dns_registry_probe_count as i64);
2632
2633 let dns_registry_active_count: usize = self
2634 .dns_registry_map
2635 .values()
2636 .map(|r| r.active.values().map(|a| a.len()).sum::<usize>())
2637 .sum();
2638 self.set_counter(Counter::DnsRegistryActive, dns_registry_active_count as i64);
2639
2640 let dns_registry_timer_count: usize = self
2641 .dns_registry_map
2642 .values()
2643 .map(|r| r.new_timers.len())
2644 .sum();
2645 self.set_counter(Counter::DnsRegistryTimer, dns_registry_timer_count as i64);
2646
2647 let dns_registry_name_change_count: usize = self
2648 .dns_registry_map
2649 .values()
2650 .map(|r| r.name_changes.len())
2651 .sum();
2652 self.set_counter(
2653 Counter::DnsRegistryNameChange,
2654 dns_registry_name_change_count as i64,
2655 );
2656
2657 if let Err(e) = resp_s.send(self.counters.clone()) {
2659 debug!("Failed to send metrics: {}", e);
2660 }
2661 }
2662
2663 fn exec_command_browse(
2664 &mut self,
2665 repeating: bool,
2666 ty: String,
2667 next_delay: u32,
2668 listener: Sender<ServiceEvent>,
2669 ) {
2670 let pretty_addrs: Vec<String> = self
2671 .intf_socks
2672 .keys()
2673 .map(|itf| format!("{} ({})", itf.ip(), itf.name))
2674 .collect();
2675
2676 if let Err(e) = listener.send(ServiceEvent::SearchStarted(format!(
2677 "{ty} on {} interfaces [{}]",
2678 pretty_addrs.len(),
2679 pretty_addrs.join(", ")
2680 ))) {
2681 debug!(
2682 "Failed to send SearchStarted({})(repeating:{}): {}",
2683 &ty, repeating, e
2684 );
2685 return;
2686 }
2687 if !repeating {
2688 self.service_queriers.insert(ty.clone(), listener.clone());
2692
2693 self.query_cache_for_service(&ty, &listener);
2695 }
2696
2697 self.send_query(&ty, RRType::PTR);
2698 self.increase_counter(Counter::Browse, 1);
2699
2700 let next_time = current_time_millis() + (next_delay * 1000) as u64;
2701 let max_delay = 60 * 60;
2702 let delay = cmp::min(next_delay * 2, max_delay);
2703 self.add_retransmission(next_time, Command::Browse(ty, delay, listener));
2704 }
2705
2706 fn exec_command_resolve_hostname(
2707 &mut self,
2708 repeating: bool,
2709 hostname: String,
2710 next_delay: u32,
2711 listener: Sender<HostnameResolutionEvent>,
2712 timeout: Option<u64>,
2713 ) {
2714 let addr_list: Vec<_> = self.intf_socks.keys().collect();
2715 if let Err(e) = listener.send(HostnameResolutionEvent::SearchStarted(format!(
2716 "{} on addrs {:?}",
2717 &hostname, &addr_list
2718 ))) {
2719 debug!(
2720 "Failed to send ResolveStarted({})(repeating:{}): {}",
2721 &hostname, repeating, e
2722 );
2723 return;
2724 }
2725 if !repeating {
2726 self.add_hostname_resolver(hostname.to_owned(), listener.clone(), timeout);
2727 self.query_cache_for_hostname(&hostname, listener.clone());
2729 }
2730
2731 self.send_query_vec(&[(&hostname, RRType::A), (&hostname, RRType::AAAA)]);
2732 self.increase_counter(Counter::ResolveHostname, 1);
2733
2734 let now = current_time_millis();
2735 let next_time = now + u64::from(next_delay) * 1000;
2736 let max_delay = 60 * 60;
2737 let delay = cmp::min(next_delay * 2, max_delay);
2738
2739 if self
2741 .hostname_resolvers
2742 .get(&hostname)
2743 .and_then(|(_sender, timeout)| *timeout)
2744 .map(|timeout| next_time < timeout)
2745 .unwrap_or(true)
2746 {
2747 self.add_retransmission(
2748 next_time,
2749 Command::ResolveHostname(hostname, delay, listener, None),
2750 );
2751 }
2752 }
2753
2754 fn exec_command_resolve(&mut self, instance: String, try_count: u16) {
2755 let pending_query = self.query_unresolved(&instance);
2756 let max_try = 3;
2757 if pending_query && try_count < max_try {
2758 let next_time = current_time_millis() + RESOLVE_WAIT_IN_MILLIS;
2761 self.add_retransmission(next_time, Command::Resolve(instance, try_count + 1));
2762 }
2763 }
2764
2765 fn exec_command_unregister(
2766 &mut self,
2767 repeating: bool,
2768 fullname: String,
2769 resp_s: Sender<UnregisterStatus>,
2770 ) {
2771 let response = match self.my_services.remove_entry(&fullname) {
2772 None => {
2773 debug!("unregister: cannot find such service {}", &fullname);
2774 UnregisterStatus::NotFound
2775 }
2776 Some((_k, info)) => {
2777 let mut timers = Vec::new();
2778 let mut multicast_sent_trackers = HashSet::new();
2780
2781 for (intf, sock) in self.intf_socks.iter() {
2782 if let Some(tracker) = multicast_send_tracker(intf) {
2783 if multicast_sent_trackers.contains(&tracker) {
2784 continue; }
2786 multicast_sent_trackers.insert(tracker);
2787 }
2788 let packet = self.unregister_service(&info, intf, sock);
2789 if !repeating && !packet.is_empty() {
2791 let next_time = current_time_millis() + 120;
2792 self.retransmissions.push(ReRun {
2793 next_time,
2794 command: Command::UnregisterResend(packet, intf.clone()),
2795 });
2796 timers.push(next_time);
2797 }
2798 }
2799
2800 for t in timers {
2801 self.add_timer(t);
2802 }
2803
2804 self.increase_counter(Counter::Unregister, 1);
2805 UnregisterStatus::OK
2806 }
2807 };
2808 if let Err(e) = resp_s.send(response) {
2809 debug!("unregister: failed to send response: {}", e);
2810 }
2811 }
2812
2813 fn exec_command_unregister_resend(&mut self, packet: Vec<u8>, intf: Interface) {
2814 if let Some(sock) = self.intf_socks.get(&intf) {
2815 debug!("UnregisterResend from {}", &intf.ip());
2816 multicast_on_intf(&packet[..], &intf, sock);
2817 self.increase_counter(Counter::UnregisterResend, 1);
2818 }
2819 }
2820
2821 fn exec_command_stop_browse(&mut self, ty_domain: String) {
2822 match self.service_queriers.remove_entry(&ty_domain) {
2823 None => debug!("StopBrowse: cannot find querier for {}", &ty_domain),
2824 Some((ty, sender)) => {
2825 trace!("StopBrowse: removed queryer for {}", &ty);
2827 let mut i = 0;
2828 while i < self.retransmissions.len() {
2829 if let Command::Browse(t, _, _) = &self.retransmissions[i].command {
2830 if t == &ty {
2831 self.retransmissions.remove(i);
2832 trace!("StopBrowse: removed retransmission for {}", &ty);
2833 continue;
2834 }
2835 }
2836 i += 1;
2837 }
2838
2839 self.cache.remove_service_type(&ty_domain);
2841
2842 match sender.send(ServiceEvent::SearchStopped(ty_domain)) {
2844 Ok(()) => trace!("Sent SearchStopped to the listener"),
2845 Err(e) => debug!("Failed to send SearchStopped: {}", e),
2846 }
2847 }
2848 }
2849 }
2850
2851 fn exec_command_stop_resolve_hostname(&mut self, hostname: String) {
2852 if let Some((host, (sender, _timeout))) = self.hostname_resolvers.remove_entry(&hostname) {
2853 trace!("StopResolve: removed queryer for {}", &host);
2855 let mut i = 0;
2856 while i < self.retransmissions.len() {
2857 if let Command::Resolve(t, _) = &self.retransmissions[i].command {
2858 if t == &host {
2859 self.retransmissions.remove(i);
2860 trace!("StopResolve: removed retransmission for {}", &host);
2861 continue;
2862 }
2863 }
2864 i += 1;
2865 }
2866
2867 match sender.send(HostnameResolutionEvent::SearchStopped(hostname)) {
2869 Ok(()) => trace!("Sent SearchStopped to the listener"),
2870 Err(e) => debug!("Failed to send SearchStopped: {}", e),
2871 }
2872 }
2873 }
2874
2875 fn exec_command_register_resend(&mut self, fullname: String, intf: Interface) {
2876 let Some(info) = self.my_services.get_mut(&fullname) else {
2877 trace!("announce: cannot find such service {}", &fullname);
2878 return;
2879 };
2880
2881 let Some(dns_registry) = self.dns_registry_map.get_mut(&intf) else {
2882 return;
2883 };
2884
2885 let Some(sock) = self.intf_socks.get(&intf) else {
2886 return;
2887 };
2888
2889 if announce_service_on_intf(dns_registry, info, &intf, sock) {
2890 let mut hostname = info.get_hostname();
2891 if let Some(new_name) = dns_registry.name_changes.get(hostname) {
2892 hostname = new_name;
2893 }
2894 let service_name = match dns_registry.name_changes.get(&fullname) {
2895 Some(new_name) => new_name.to_string(),
2896 None => fullname,
2897 };
2898
2899 debug!("resend: announce service {} on {}", service_name, intf.ip());
2900
2901 notify_monitors(
2902 &mut self.monitors,
2903 DaemonEvent::Announce(service_name, format!("{}:{}", hostname, &intf.ip())),
2904 );
2905 info.set_status(&intf, ServiceStatus::Announced);
2906 } else {
2907 debug!("register-resend should not fail");
2908 }
2909
2910 self.increase_counter(Counter::RegisterResend, 1);
2911 }
2912
2913 fn exec_command_verify(&mut self, instance: String, timeout: Duration, repeating: bool) {
2914 let now = current_time_millis();
2924 let expire_at = if repeating {
2925 None
2926 } else {
2927 Some(now + timeout.as_millis() as u64)
2928 };
2929
2930 let record_vec = self.cache.service_verify_queries(&instance, expire_at);
2932
2933 if !record_vec.is_empty() {
2934 let query_vec: Vec<(&str, RRType)> = record_vec
2935 .iter()
2936 .map(|(record, rr_type)| (record.as_str(), *rr_type))
2937 .collect();
2938 self.send_query_vec(&query_vec);
2939
2940 if let Some(new_expire) = expire_at {
2941 self.add_timer(new_expire); self.add_retransmission(now + 1000, Command::Verify(instance, timeout));
2945 }
2946 }
2947 }
2948
2949 fn refresh_active_services(&mut self) {
2951 let mut query_ptr_count = 0;
2952 let mut query_srv_count = 0;
2953 let mut new_timers = HashSet::new();
2954 let mut query_addr_count = 0;
2955
2956 for (ty_domain, _sender) in self.service_queriers.iter() {
2957 let refreshed_timers = self.cache.refresh_due_ptr(ty_domain);
2958 if !refreshed_timers.is_empty() {
2959 trace!("sending refresh query for PTR: {}", ty_domain);
2960 self.send_query(ty_domain, RRType::PTR);
2961 query_ptr_count += 1;
2962 new_timers.extend(refreshed_timers);
2963 }
2964
2965 let (instances, timers) = self.cache.refresh_due_srv(ty_domain);
2966 for instance in instances.iter() {
2967 trace!("sending refresh query for SRV: {}", instance);
2968 self.send_query(instance, RRType::SRV);
2969 query_srv_count += 1;
2970 }
2971 new_timers.extend(timers);
2972 let (hostnames, timers) = self.cache.refresh_due_hosts(ty_domain);
2973 for hostname in hostnames.iter() {
2974 trace!("sending refresh queries for A and AAAA: {}", hostname);
2975 self.send_query_vec(&[(hostname, RRType::A), (hostname, RRType::AAAA)]);
2976 query_addr_count += 2;
2977 }
2978 new_timers.extend(timers);
2979 }
2980
2981 for timer in new_timers {
2982 self.add_timer(timer);
2983 }
2984
2985 self.increase_counter(Counter::CacheRefreshPTR, query_ptr_count);
2986 self.increase_counter(Counter::CacheRefreshSRV, query_srv_count);
2987 self.increase_counter(Counter::CacheRefreshAddr, query_addr_count);
2988 }
2989}
2990
/// Events delivered to the listener of a service browse.
#[derive(Debug)]
pub enum ServiceEvent {
    /// A browse query round was started. The string is a human-readable description; the
    /// browse command formats it as the service type plus the interfaces in use.
    SearchStarted(String),

    /// A service instance was found.
    /// NOTE(review): presumably (service type, instance fullname) — confirm against the
    /// code that emits this event.
    ServiceFound(String, String),

    /// A service was resolved; carries its `ServiceInfo`.
    ServiceResolved(ServiceInfo),

    /// A service was removed. The tests in this file bind the two strings as
    /// (service type, full name).
    ServiceRemoved(String, String),

    /// The browse was stopped; carries the service type domain given to the stop command.
    SearchStopped(String),
}
3010
/// Events delivered to the listener of a hostname resolution.
#[derive(Debug)]
#[non_exhaustive]
pub enum HostnameResolutionEvent {
    /// A query round was started. The string is a human-readable description; the resolve
    /// command formats it as the hostname plus the interfaces in use.
    SearchStarted(String),
    /// Addresses were found for a hostname: (hostname, addresses).
    AddressesFound(String, HashSet<IpAddr>),
    /// Addresses of a hostname were removed: (hostname, addresses).
    AddressesRemoved(String, HashSet<IpAddr>),
    /// The resolution timed out.
    /// NOTE(review): the string is presumably the hostname — confirm against the emitter.
    SearchTimeout(String),
    /// The resolution was stopped; carries the hostname given to the stop command.
    SearchStopped(String),
}
3027
/// Events sent to the daemon monitors (see `notify_monitors`).
#[derive(Clone, Debug)]
#[non_exhaustive]
pub enum DaemonEvent {
    /// A service was announced. On a register-resend the fields are the (possibly renamed)
    /// service name and a `"<hostname>:<interface ip>"` string.
    Announce(String, String),

    /// An error reported by the daemon.
    Error(Error),

    /// NOTE(review): presumably an IP address that was added on the host — confirm against
    /// the emitter.
    IpAdd(IpAddr),

    /// NOTE(review): presumably an IP address that was removed from the host — confirm
    /// against the emitter.
    IpDel(IpAddr),

    /// A DNS name was changed; details are in `DnsNameChange`.
    NameChange(DnsNameChange),

    /// NOTE(review): presumably the daemon responded to a query involving this address —
    /// confirm against the emitter.
    Respond(IpAddr),
}
3052
/// Payload of `DaemonEvent::NameChange`, describing one renamed DNS record name.
#[derive(Clone, Debug)]
pub struct DnsNameChange {
    /// The name before the change.
    pub original: String,

    /// The name after the change.
    pub new_name: String,

    /// The resource record type concerned by the change.
    pub rr_type: RRType,

    /// The name of a network interface.
    /// NOTE(review): presumably the interface on which the change happened — confirm.
    pub intf_name: String,
}
3077
/// Commands processed by the daemon. Some of them are also queued as delayed re-runs
/// (see the `add_retransmission` call sites).
#[derive(Debug)]
enum Command {
    /// Browse a service type: (service type domain, next delay in seconds, listener).
    Browse(String, u32, Sender<ServiceEvent>),

    /// Resolve a hostname: (hostname, next delay in seconds, listener, optional timeout).
    /// The timeout is compared against `current_time_millis()`-based times.
    ResolveHostname(String, u32, Sender<HostnameResolutionEvent>, Option<u64>),

    /// Register a service.
    Register(ServiceInfo),

    /// Unregister a service: (service fullname, channel for the resulting status).
    Unregister(String, Sender<UnregisterStatus>),

    /// Announce a registered service again: (service fullname, interface).
    RegisterResend(String, Interface),

    /// Send an encoded unregister packet again: (packet bytes, interface).
    UnregisterResend(Vec<u8>, Interface),

    /// Stop browsing a service type domain.
    StopBrowse(String),

    /// Stop resolving a hostname.
    StopResolveHostname(String),

    /// Query the unresolved parts of a service instance: (instance, attempts so far).
    Resolve(String, u16),

    /// Request the metrics; the reply goes to the given channel.
    GetMetrics(Sender<Metrics>),

    /// Request the daemon status; the reply goes to the given channel.
    GetStatus(Sender<DaemonStatus>),

    /// Register a channel that receives `DaemonEvent`s.
    Monitor(Sender<DaemonEvent>),

    /// Set a daemon option.
    SetOption(DaemonOption),

    /// Request the current option values; the reply goes to the given channel.
    GetOption(Sender<DaemonOptionVal>),

    /// Verify a service instance: (instance, timeout).
    Verify(String, Duration),

    /// Shut the daemon down; the resulting status goes to the given channel.
    /// NOTE(review): inferred from the variant name and payload — confirm against the handler.
    Exit(Sender<DaemonStatus>),
}
3130
3131impl fmt::Display for Command {
3132 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
3133 match self {
3134 Self::Browse(_, _, _) => write!(f, "Command Browse"),
3135 Self::ResolveHostname(_, _, _, _) => write!(f, "Command ResolveHostname"),
3136 Self::Exit(_) => write!(f, "Command Exit"),
3137 Self::GetStatus(_) => write!(f, "Command GetStatus"),
3138 Self::GetMetrics(_) => write!(f, "Command GetMetrics"),
3139 Self::Monitor(_) => write!(f, "Command Monitor"),
3140 Self::Register(_) => write!(f, "Command Register"),
3141 Self::RegisterResend(_, _) => write!(f, "Command RegisterResend"),
3142 Self::SetOption(_) => write!(f, "Command SetOption"),
3143 Self::GetOption(_) => write!(f, "Command GetOption"),
3144 Self::StopBrowse(_) => write!(f, "Command StopBrowse"),
3145 Self::StopResolveHostname(_) => write!(f, "Command StopResolveHostname"),
3146 Self::Unregister(_, _) => write!(f, "Command Unregister"),
3147 Self::UnregisterResend(_, _) => write!(f, "Command UnregisterResend"),
3148 Self::Resolve(_, _) => write!(f, "Command Resolve"),
3149 Self::Verify(_, _) => write!(f, "Command VerifyResource"),
3150 }
3151 }
3152}
3153
/// Current option values, sent back in reply to `Command::GetOption`.
struct DaemonOptionVal {
    /// Maximum service name length.
    /// NOTE(review): the leading underscore suggests the field is not read anywhere — confirm.
    _service_name_len_max: u8,
    /// IP check interval.
    /// NOTE(review): the unit is not visible here — confirm against where it is set.
    ip_check_interval: u64,
}
3158
/// Options that can be changed on a running daemon through `Command::SetOption`.
#[derive(Debug)]
enum DaemonOption {
    /// Maximum allowed service name length.
    ServiceNameLenMax(u8),
    /// Interval of the IP check.
    /// NOTE(review): the unit is not visible here — confirm against the handler.
    IpCheckInterval(u64),
    /// Interface kinds to enable.
    EnableInterface(Vec<IfKind>),
    /// Interface kinds to disable.
    DisableInterface(Vec<IfKind>),
    /// Multicast loop setting for IPv4.
    MulticastLoopV4(bool),
    /// Multicast loop setting for IPv6.
    MulticastLoopV6(bool),
}
3168
/// Byte length of the `"._tcp.local."` suffix. `"._udp.local."` has the same length, so the
/// constant is used for both protocol suffixes.
const DOMAIN_LEN: usize = "._tcp.local.".len();
3171
3172fn check_service_name_length(ty_domain: &str, limit: u8) -> Result<()> {
3174 if ty_domain.len() <= DOMAIN_LEN + 1 {
3175 return Err(e_fmt!("Service type name cannot be empty: {}", ty_domain));
3177 }
3178
3179 let service_name_len = ty_domain.len() - DOMAIN_LEN - 1; if service_name_len > limit as usize {
3181 return Err(e_fmt!("Service name length must be <= {} bytes", limit));
3182 }
3183 Ok(())
3184}
3185
3186fn check_domain_suffix(name: &str) -> Result<()> {
3188 if !(name.ends_with("._tcp.local.") || name.ends_with("._udp.local.")) {
3189 return Err(e_fmt!(
3190 "mDNS service {} must end with '._tcp.local.' or '._udp.local.'",
3191 name
3192 ));
3193 }
3194
3195 Ok(())
3196}
3197
3198fn check_service_name(fullname: &str) -> Result<()> {
3206 check_domain_suffix(fullname)?;
3207
3208 let remaining: Vec<&str> = fullname[..fullname.len() - DOMAIN_LEN].split('.').collect();
3209 let name = remaining.last().ok_or_else(|| e_fmt!("No service name"))?;
3210
3211 if &name[0..1] != "_" {
3212 return Err(e_fmt!("Service name must start with '_'"));
3213 }
3214
3215 let name = &name[1..];
3216
3217 if name.contains("--") {
3218 return Err(e_fmt!("Service name must not contain '--'"));
3219 }
3220
3221 if name.starts_with('-') || name.ends_with('-') {
3222 return Err(e_fmt!("Service name (%s) may not start or end with '-'"));
3223 }
3224
3225 let ascii_count = name.chars().filter(|c| c.is_ascii_alphabetic()).count();
3226 if ascii_count < 1 {
3227 return Err(e_fmt!(
3228 "Service name must contain at least one letter (eg: 'A-Za-z')"
3229 ));
3230 }
3231
3232 Ok(())
3233}
3234
3235fn check_hostname(hostname: &str) -> Result<()> {
3237 if !hostname.ends_with(".local.") {
3238 return Err(e_fmt!("Hostname must end with '.local.': {hostname}"));
3239 }
3240
3241 if hostname == ".local." {
3242 return Err(e_fmt!(
3243 "The part of the hostname before '.local.' cannot be empty"
3244 ));
3245 }
3246
3247 if hostname.len() > 255 {
3248 return Err(e_fmt!("Hostname length must be <= 255 bytes"));
3249 }
3250
3251 Ok(())
3252}
3253
3254fn call_service_listener(
3255 listeners_map: &HashMap<String, Sender<ServiceEvent>>,
3256 ty_domain: &str,
3257 event: ServiceEvent,
3258) {
3259 if let Some(listener) = listeners_map.get(ty_domain) {
3260 match listener.send(event) {
3261 Ok(()) => trace!("Sent event to listener successfully"),
3262 Err(e) => debug!("Failed to send event: {}", e),
3263 }
3264 }
3265}
3266
3267fn call_hostname_resolution_listener(
3268 listeners_map: &HashMap<String, (Sender<HostnameResolutionEvent>, Option<u64>)>,
3269 hostname: &str,
3270 event: HostnameResolutionEvent,
3271) {
3272 let hostname_lower = hostname.to_lowercase();
3273 if let Some(listener) = listeners_map.get(&hostname_lower).map(|(l, _)| l) {
3274 match listener.send(event) {
3275 Ok(()) => trace!("Sent event to listener successfully"),
3276 Err(e) => debug!("Failed to send event: {}", e),
3277 }
3278 }
3279}
3280
3281fn my_ip_interfaces(with_loopback: bool) -> Vec<Interface> {
3284 if_addrs::get_if_addrs()
3285 .unwrap_or_default()
3286 .into_iter()
3287 .filter(|i| !i.is_loopback() || with_loopback)
3288 .collect()
3289}
3290
3291fn send_dns_outgoing(out: &DnsOutgoing, intf: &Interface, sock: &MioUdpSocket) -> Vec<Vec<u8>> {
3293 let qtype = if out.is_query() { "query" } else { "response" };
3294 trace!(
3295 "send outgoing {}: {} questions {} answers {} authorities {} additional",
3296 qtype,
3297 out.questions().len(),
3298 out.answers_count(),
3299 out.authorities().len(),
3300 out.additionals().len()
3301 );
3302 let packet_list = out.to_data_on_wire();
3303 for packet in packet_list.iter() {
3304 multicast_on_intf(packet, intf, sock);
3305 }
3306 packet_list
3307}
3308
3309fn multicast_on_intf(packet: &[u8], intf: &Interface, socket: &MioUdpSocket) {
3311 if packet.len() > MAX_MSG_ABSOLUTE {
3312 debug!("Drop over-sized packet ({})", packet.len());
3313 return;
3314 }
3315
3316 let addr: SocketAddr = match intf.addr {
3317 if_addrs::IfAddr::V4(_) => SocketAddrV4::new(GROUP_ADDR_V4, MDNS_PORT).into(),
3318 if_addrs::IfAddr::V6(_) => {
3319 let mut sock = SocketAddrV6::new(GROUP_ADDR_V6, MDNS_PORT, 0, 0);
3320 sock.set_scope_id(intf.index.unwrap_or(0)); sock.into()
3322 }
3323 };
3324
3325 send_packet(packet, addr, intf, socket);
3326}
3327
3328fn send_packet(packet: &[u8], addr: SocketAddr, intf: &Interface, sock: &MioUdpSocket) {
3330 match sock.send_to(packet, addr) {
3331 Ok(sz) => trace!("sent out {} bytes on interface {:?}", sz, intf),
3332 Err(e) => debug!("Failed to send to {} via {:?}: {}", addr, &intf, e),
3333 }
3334}
3335
/// Returns `true` when `name` splits into at least five dot-separated parts, i.e. when it
/// contains at least four dots (`"my-laser._printer._tcp.local."` qualifies,
/// `"_printer._tcp.local."` does not).
fn valid_instance_name(name: &str) -> bool {
    // N dots always produce N + 1 parts, so counting dots is enough.
    const MIN_DOTS: usize = 4;
    name.matches('.').count() >= MIN_DOTS
}
3342
3343fn notify_monitors(monitors: &mut Vec<Sender<DaemonEvent>>, event: DaemonEvent) {
3344 monitors.retain(|sender| {
3345 if let Err(e) = sender.try_send(event.clone()) {
3346 debug!("notify_monitors: try_send: {}", &e);
3347 if matches!(e, TrySendError::Disconnected(_)) {
3348 return false; }
3350 }
3351 true
3352 });
3353}
3354
/// Builds the announcement packet for `info` on `intf`.
///
/// The packet is a response with the AA flag set and holds the PTR record(s), the SRV and
/// TXT records and one address record per address of the service on `intf`. Names for
/// which `dns_registry.name_changes` holds an entry are replaced by the new name.
///
/// Returns `None` when the service has no address on `intf`, or when the service requires
/// probing and `is_probing_done` returned `false` for at least one of the SRV, TXT or
/// address records.
fn prepare_announce(
    info: &ServiceInfo,
    intf: &Interface,
    dns_registry: &mut DnsRegistry,
) -> Option<DnsOutgoing> {
    let intf_addrs = info.get_addrs_on_intf(intf);
    if intf_addrs.is_empty() {
        trace!("No valid addrs to add on intf {:?}", &intf);
        return None;
    }

    // Use the renamed service fullname if a name change is recorded for it.
    let service_fullname = match dns_registry.name_changes.get(info.get_fullname()) {
        Some(new_name) => new_name,
        None => info.get_fullname(),
    };

    debug!(
        "prepare to announce service {service_fullname} on {}: {}",
        &intf.name,
        &intf.ip()
    );

    // Number of records for which `is_probing_done` returned false.
    let mut probing_count = 0;
    let mut out = DnsOutgoing::new(FLAGS_QR_RESPONSE | FLAGS_AA);
    // The time handed to `is_probing_done`: now plus a random 0..250 ms offset.
    let create_time = current_time_millis() + fastrand::u64(0..250);

    // PTR: service type -> service fullname.
    out.add_answer_at_time(
        DnsPointer::new(
            info.get_type(),
            RRType::PTR,
            CLASS_IN,
            info.get_other_ttl(),
            service_fullname.to_string(),
        ),
        0,
    );

    // PTR for the subtype, when the service has one.
    if let Some(sub) = info.get_subtype() {
        trace!("Adding subdomain {}", sub);
        out.add_answer_at_time(
            DnsPointer::new(
                sub,
                RRType::PTR,
                CLASS_IN,
                info.get_other_ttl(),
                service_fullname.to_string(),
            ),
            0,
        );
    }

    // Use the renamed hostname as the SRV target if a name change is recorded for it.
    let hostname = match dns_registry.name_changes.get(info.get_hostname()) {
        Some(new_name) => new_name.to_string(),
        None => info.get_hostname().to_string(),
    };

    // The SRV record is created under the original fullname; a recorded name change is
    // applied right after with `set_new_name`.
    let mut srv = DnsSrv::new(
        info.get_fullname(),
        CLASS_IN | CLASS_CACHE_FLUSH,
        info.get_host_ttl(),
        info.get_priority(),
        info.get_weight(),
        info.get_port(),
        hostname,
    );

    if let Some(new_name) = dns_registry.name_changes.get(info.get_fullname()) {
        srv.get_record_mut().set_new_name(new_name.to_string());
    }

    // `is_probing_done` is only consulted when the service requires probing.
    if !info.requires_probe()
        || dns_registry.is_probing_done(&srv, info.get_fullname(), create_time)
    {
        out.add_answer_at_time(srv, 0);
    } else {
        probing_count += 1;
    }

    // TXT record, handled the same way as the SRV record.
    let mut txt = DnsTxt::new(
        info.get_fullname(),
        CLASS_IN | CLASS_CACHE_FLUSH,
        info.get_other_ttl(),
        info.generate_txt(),
    );

    if let Some(new_name) = dns_registry.name_changes.get(info.get_fullname()) {
        txt.get_record_mut().set_new_name(new_name.to_string());
    }

    if !info.requires_probe()
        || dns_registry.is_probing_done(&txt, info.get_fullname(), create_time)
    {
        out.add_answer_at_time(txt, 0);
    } else {
        probing_count += 1;
    }

    // Address records are created under the original hostname and renamed afterwards if a
    // name change is recorded.
    let hostname = info.get_hostname();
    for address in intf_addrs {
        let mut dns_addr = DnsAddress::new(
            hostname,
            ip_address_rr_type(&address),
            CLASS_IN | CLASS_CACHE_FLUSH,
            info.get_host_ttl(),
            address,
        );

        if let Some(new_name) = dns_registry.name_changes.get(hostname) {
            dns_addr.get_record_mut().set_new_name(new_name.to_string());
        }

        if !info.requires_probe()
            || dns_registry.is_probing_done(&dns_addr, info.get_fullname(), create_time)
        {
            out.add_answer_at_time(dns_addr, 0);
        } else {
            probing_count += 1;
        }
    }

    // Nothing is announced while any record is still being probed.
    if probing_count > 0 {
        return None;
    }

    Some(out)
}
3489
3490fn announce_service_on_intf(
3493 dns_registry: &mut DnsRegistry,
3494 info: &ServiceInfo,
3495 intf: &Interface,
3496 sock: &MioUdpSocket,
3497) -> bool {
3498 if let Some(out) = prepare_announce(info, intf, dns_registry) {
3499 send_dns_outgoing(&out, intf, sock);
3500 return true;
3501 }
3502 false
3503}
3504
/// Produces the next candidate name for `original` by changing its first dot-separated
/// label.
///
/// A label ending in `" (N)"`, with `N` parsing as a `u32`, gets its number incremented:
/// `"foo (2).local."` becomes `"foo (3).local."`. Any other label gets `" (2)"` appended:
/// `"foo.local."` becomes `"foo (2).local."`. A label whose number is already `u32::MAX`
/// cannot be incremented and is treated like a label without a number.
fn name_change(original: &str) -> String {
    // Only the first label changes; everything after the first dot is kept verbatim.
    let (first_label, rest) = match original.split_once('.') {
        Some((label, rest)) => (label, Some(rest)),
        None => (original, None),
    };

    let new_label = first_label
        .strip_suffix(')')
        .and_then(|without_paren| without_paren.rsplit_once(" ("))
        .and_then(|(base, digits)| {
            // `checked_add` avoids the overflow of a plain `+ 1` on `u32::MAX`.
            let next = digits.parse::<u32>().ok()?.checked_add(1)?;
            Some(format!("{base} ({next})"))
        })
        .unwrap_or_else(|| format!("{first_label} (2)"));

    match rest {
        Some(rest) => format!("{new_label}.{rest}"),
        None => new_label,
    }
}
3540
/// Produces the next candidate hostname for `original` by changing its first
/// dot-separated label.
///
/// A label ending in `"-N"`, with `N` parsing as a `u32`, gets its number incremented:
/// `"foo-2.local."` becomes `"foo-3.local."`. Any other label gets `"-2"` appended:
/// `"foo.local."` becomes `"foo-2.local."`. A label whose number is already `u32::MAX`
/// cannot be incremented and is treated like a label without a number.
fn hostname_change(original: &str) -> String {
    // Only the first label changes; everything after the first dot is kept verbatim.
    let (first_label, rest) = match original.split_once('.') {
        Some((label, rest)) => (label, Some(rest)),
        None => (original, None),
    };

    let new_label = first_label
        .rsplit_once('-')
        .and_then(|(base, digits)| {
            // `checked_add` avoids the overflow of a plain `+ 1` on `u32::MAX`.
            let next = digits.parse::<u32>().ok()?.checked_add(1)?;
            Some(format!("{base}-{next}"))
        })
        .unwrap_or_else(|| format!("{first_label}-2"));

    match rest {
        Some(rest) => format!("{new_label}.{rest}"),
        None => new_label,
    }
}
3568
3569fn add_answer_with_additionals(
3570 out: &mut DnsOutgoing,
3571 msg: &DnsIncoming,
3572 service: &ServiceInfo,
3573 intf: &Interface,
3574 dns_registry: &DnsRegistry,
3575) {
3576 let intf_addrs = service.get_addrs_on_intf(intf);
3577 if intf_addrs.is_empty() {
3578 trace!("No addrs on LAN of intf {:?}", intf);
3579 return;
3580 }
3581
3582 let service_fullname = match dns_registry.name_changes.get(service.get_fullname()) {
3584 Some(new_name) => new_name,
3585 None => service.get_fullname(),
3586 };
3587
3588 let hostname = match dns_registry.name_changes.get(service.get_hostname()) {
3589 Some(new_name) => new_name,
3590 None => service.get_hostname(),
3591 };
3592
3593 let ptr_added = out.add_answer(
3594 msg,
3595 DnsPointer::new(
3596 service.get_type(),
3597 RRType::PTR,
3598 CLASS_IN,
3599 service.get_other_ttl(),
3600 service_fullname.to_string(),
3601 ),
3602 );
3603
3604 if !ptr_added {
3605 trace!("answer was not added for msg {:?}", msg);
3606 return;
3607 }
3608
3609 if let Some(sub) = service.get_subtype() {
3610 trace!("Adding subdomain {}", sub);
3611 out.add_additional_answer(DnsPointer::new(
3612 sub,
3613 RRType::PTR,
3614 CLASS_IN,
3615 service.get_other_ttl(),
3616 service_fullname.to_string(),
3617 ));
3618 }
3619
3620 out.add_additional_answer(DnsSrv::new(
3623 service_fullname,
3624 CLASS_IN | CLASS_CACHE_FLUSH,
3625 service.get_host_ttl(),
3626 service.get_priority(),
3627 service.get_weight(),
3628 service.get_port(),
3629 hostname.to_string(),
3630 ));
3631
3632 out.add_additional_answer(DnsTxt::new(
3633 service_fullname,
3634 CLASS_IN | CLASS_CACHE_FLUSH,
3635 service.get_host_ttl(),
3636 service.generate_txt(),
3637 ));
3638
3639 for address in intf_addrs {
3640 out.add_additional_answer(DnsAddress::new(
3641 hostname,
3642 ip_address_rr_type(&address),
3643 CLASS_IN | CLASS_CACHE_FLUSH,
3644 service.get_host_ttl(),
3645 address,
3646 ));
3647 }
3648}
3649
/// Unit tests for the helper functions of this file, plus integration tests that run real
/// `ServiceDaemon` instances against the host's network interfaces.
#[cfg(test)]
mod tests {
    use super::{
        check_domain_suffix, check_service_name_length, hostname_change, my_ip_interfaces,
        name_change, new_socket_bind, send_dns_outgoing, valid_instance_name,
        HostnameResolutionEvent, ServiceDaemon, ServiceEvent, ServiceInfo, GROUP_ADDR_V4,
        MDNS_PORT,
    };
    use crate::{
        dns_parser::{DnsOutgoing, DnsPointer, RRType, CLASS_IN, FLAGS_AA, FLAGS_QR_RESPONSE},
        service_daemon::check_hostname,
    };
    use std::{
        net::{SocketAddr, SocketAddrV4},
        time::Duration,
    };
    use test_log::test;

    /// The IPv4 multicast group and port are displayed as `224.0.0.251:5353`.
    #[test]
    fn test_socketaddr_print() {
        let addr: SocketAddr = SocketAddrV4::new(GROUP_ADDR_V4, MDNS_PORT).into();
        let print = format!("{}", addr);
        assert_eq!(print, "224.0.0.251:5353");
    }

    /// An instance name needs at least five dot-separated parts.
    #[test]
    fn test_instance_name() {
        assert!(valid_instance_name("my-laser._printer._tcp.local."));
        assert!(valid_instance_name("my-laser.._printer._tcp.local."));
        assert!(!valid_instance_name("_printer._tcp.local."));
    }

    /// A type domain too short to hold a service name is rejected.
    #[test]
    fn test_check_service_name_length() {
        let result = check_service_name_length("_tcp", 100);
        assert!(result.is_err());
        if let Err(e) = result {
            println!("{}", e);
        }
    }

    /// Checks accepted and rejected hostnames, including the 255-byte boundary.
    #[test]
    fn test_check_hostname() {
        // Valid hostnames; the second one is exactly 255 bytes long.
        for hostname in &[
            "my_host.local.",
            &("A".repeat(255 - ".local.".len()) + ".local."),
        ] {
            let result = check_hostname(hostname);
            assert!(result.is_ok());
        }

        // Invalid hostnames: missing trailing dot, empty host part, 256 bytes long.
        for hostname in &[
            "my_host.local",
            ".local.",
            &("A".repeat(256 - ".local.".len()) + ".local."),
        ] {
            let result = check_hostname(hostname);
            assert!(result.is_err());
            if let Err(e) = result {
                println!("{}", e);
            }
        }
    }

    /// Only names ending with `._tcp.local.` or `._udp.local.` pass the suffix check.
    #[test]
    fn test_check_domain_suffix() {
        assert!(check_domain_suffix("_missing_dot._tcp.local").is_err());
        assert!(check_domain_suffix("_missing_bar.tcp.local.").is_err());
        assert!(check_domain_suffix("_mis_spell._tpp.local.").is_err());
        assert!(check_domain_suffix("_mis_spell._upp.local.").is_err());
        assert!(check_domain_suffix("_has_dot._tcp.local.").is_ok());
        assert!(check_domain_suffix("_goodname._udp.local.").is_ok());
    }

    /// A service must be resolvable again by a second browse after a PTR record with TTL 0
    /// was sent for it between the two browses.
    #[test]
    fn test_service_with_temporarily_invalidated_ptr() {
        // Create a daemon and register a service on all non-loopback interfaces.
        let d = ServiceDaemon::new().expect("Failed to create daemon");

        let service = "_test_inval_ptr._udp.local.";
        let host_name = "my_host_tmp_invalidated_ptr.local.";
        let intfs: Vec<_> = my_ip_interfaces(false);
        let intf_ips: Vec<_> = intfs.iter().map(|intf| intf.ip()).collect();
        let port = 5201;
        let my_service =
            ServiceInfo::new(service, "my_instance", host_name, &intf_ips[..], port, None)
                .expect("invalid service info")
                .enable_addr_auto();
        let result = d.register(my_service.clone());
        assert!(result.is_ok());

        // First browse: wait until the service is resolved.
        let browse_chan = d.browse(service).unwrap();
        let timeout = Duration::from_secs(2);
        let mut resolved = false;

        while let Ok(event) = browse_chan.recv_timeout(timeout) {
            match event {
                ServiceEvent::ServiceResolved(info) => {
                    resolved = true;
                    println!("Resolved a service of {}", &info.get_fullname());
                    break;
                }
                e => {
                    println!("Received event {:?}", e);
                }
            }
        }

        assert!(resolved);

        println!("Stopping browse of {}", service);
        d.stop_browse(service).unwrap();

        // Wait for the SearchStopped confirmation on the first browse channel.
        let mut stopped = false;
        while let Ok(event) = browse_chan.recv_timeout(timeout) {
            match event {
                ServiceEvent::SearchStopped(_) => {
                    stopped = true;
                    println!("Stopped browsing service");
                    break;
                }
                e => {
                    println!("Received event {:?}", e);
                }
            }
        }

        assert!(stopped);

        // Build a PTR record with TTL 0 for the registered service.
        let invalidate_ptr_packet = DnsPointer::new(
            my_service.get_type(),
            RRType::PTR,
            CLASS_IN,
            0,
            my_service.get_fullname().to_string(),
        );

        let mut packet_buffer = DnsOutgoing::new(FLAGS_QR_RESPONSE | FLAGS_AA);
        packet_buffer.add_additional_answer(invalidate_ptr_packet);

        // Send it on every interface through a freshly bound socket.
        for intf in intfs {
            let sock = new_socket_bind(&intf, true).unwrap();
            send_dns_outgoing(&packet_buffer, &intf, &sock);
        }

        println!(
            "Sent PTR record invalidation. Starting second browse for {}",
            service
        );

        // Second browse: the service must be resolved again.
        let browse_chan = d.browse(service).unwrap();

        resolved = false;
        while let Ok(event) = browse_chan.recv_timeout(timeout) {
            match event {
                ServiceEvent::ServiceResolved(info) => {
                    resolved = true;
                    println!("Resolved a service of {}", &info.get_fullname());
                    break;
                }
                e => {
                    println!("Received event {:?}", e);
                }
            }
        }

        assert!(resolved);
        d.shutdown().unwrap();
    }

    /// Registers a service with a short host TTL, resolves it from a second daemon, shuts
    /// the server down and then waits for a `ServiceRemoved` event.
    /// NOTE(review): the final loop does not assert that `ServiceRemoved` was received.
    #[test]
    fn test_expired_srv() {
        // Service with a host TTL of only a few seconds.
        let service_type = "_expired-srv._udp.local.";
        let instance = "test_instance";
        let host_name = "expired_srv_host.local.";
        let mut my_service = ServiceInfo::new(service_type, instance, host_name, "", 5023, None)
            .unwrap()
            .enable_addr_auto();
        let new_ttl = 3;
        my_service._set_host_ttl(new_ttl);

        // Server side: register the service.
        let mdns_server = ServiceDaemon::new().expect("Failed to create mdns server");
        let result = mdns_server.register(my_service);
        assert!(result.is_ok());

        // Client side: browse until the service is resolved.
        let mdns_client = ServiceDaemon::new().expect("Failed to create mdns client");
        let browse_chan = mdns_client.browse(service_type).unwrap();
        let timeout = Duration::from_secs(2);
        let mut resolved = false;

        while let Ok(event) = browse_chan.recv_timeout(timeout) {
            match event {
                ServiceEvent::ServiceResolved(info) => {
                    resolved = true;
                    println!("Resolved a service of {}", &info.get_fullname());
                    break;
                }
                _ => {}
            }
        }

        assert!(resolved);

        // Shut the server down so that it no longer answers.
        mdns_server.shutdown().unwrap();

        // Wait up to the host TTL between events for the removal.
        let expire_timeout = Duration::from_secs(new_ttl as u64);
        while let Ok(event) = browse_chan.recv_timeout(expire_timeout) {
            match event {
                ServiceEvent::ServiceRemoved(service_type, full_name) => {
                    println!("Service removed: {}: {}", &service_type, &full_name);
                    break;
                }
                _ => {}
            }
        }
    }

    /// Resolves a hostname from a second daemon, shuts the server down and expects an
    /// `AddressesRemoved` event for the previously found address.
    #[test]
    fn test_hostname_resolution_address_removed() {
        // Server side: a service on the first IPv4 interface with a short host TTL.
        let server = ServiceDaemon::new().expect("Failed to create server");
        let hostname = "addr_remove_host._tcp.local.";
        let service_ip_addr = my_ip_interfaces(false)
            .iter()
            .find(|iface| iface.ip().is_ipv4())
            .map(|iface| iface.ip())
            .unwrap();

        let mut my_service = ServiceInfo::new(
            "_host_res_test._tcp.local.",
            "my_instance",
            hostname,
            &service_ip_addr,
            1234,
            None,
        )
        .expect("invalid service info");

        let addr_ttl = 2;
        my_service._set_host_ttl(addr_ttl);
        server.register(my_service).unwrap();

        // Client side: resolve the hostname without a timeout.
        let client = ServiceDaemon::new().expect("Failed to create client");
        let event_receiver = client.resolve_hostname(hostname, None).unwrap();
        let resolved = loop {
            match event_receiver.recv() {
                Ok(HostnameResolutionEvent::AddressesFound(found_hostname, addresses)) => {
                    assert!(found_hostname == hostname);
                    assert!(addresses.contains(&service_ip_addr));
                    println!("address found: {:?}", &addresses);
                    break true;
                }
                Ok(HostnameResolutionEvent::SearchStopped(_)) => break false,
                Ok(_event) => {}
                Err(_) => break false,
            }
        };

        assert!(resolved);

        // Shut the server down so that the address record is no longer refreshed.
        server.shutdown().unwrap();

        // Allow one second more than the address TTL between events.
        let timeout = Duration::from_secs(addr_ttl as u64 + 1);
        let removed = loop {
            match event_receiver.recv_timeout(timeout) {
                Ok(HostnameResolutionEvent::AddressesRemoved(removed_host, addresses)) => {
                    assert!(removed_host == hostname);
                    assert!(addresses.contains(&service_ip_addr));

                    println!(
                        "address removed: hostname: {} addresses: {:?}",
                        &hostname, &addresses
                    );
                    break true;
                }
                Ok(_event) => {}
                Err(_) => {
                    break false;
                }
            }
        };

        assert!(removed);

        client.shutdown().unwrap();
    }

    /// With a short "other" TTL on the service, the browsing client is expected to have
    /// sent exactly one PTR cache-refresh query by the time its metrics are read.
    #[test]
    fn test_refresh_ptr() {
        // Service on the first IPv4 interface with a short "other" TTL.
        let service_type = "_refresh-ptr._udp.local.";
        let instance = "test_instance";
        let host_name = "refresh_ptr_host.local.";
        let service_ip_addr = my_ip_interfaces(false)
            .iter()
            .find(|iface| iface.ip().is_ipv4())
            .map(|iface| iface.ip())
            .unwrap();

        let mut my_service = ServiceInfo::new(
            service_type,
            instance,
            host_name,
            &service_ip_addr,
            5023,
            None,
        )
        .unwrap();

        let new_ttl = 3;
        my_service._set_other_ttl(new_ttl);

        // Server side: register the service.
        let mdns_server = ServiceDaemon::new().expect("Failed to create mdns server");
        let result = mdns_server.register(my_service);
        assert!(result.is_ok());

        // Client side: browse until the service is resolved.
        let mdns_client = ServiceDaemon::new().expect("Failed to create mdns client");
        let browse_chan = mdns_client.browse(service_type).unwrap();
        let timeout = Duration::from_millis(1500);
        let mut resolved = false;

        while let Ok(event) = browse_chan.recv_timeout(timeout) {
            match event {
                ServiceEvent::ServiceResolved(info) => {
                    resolved = true;
                    println!("Resolved a service of {}", &info.get_fullname());
                    break;
                }
                _ => {}
            }
        }

        assert!(resolved);

        // Drain events until none arrives within 90% of the TTL.
        let timeout = Duration::from_millis(new_ttl as u64 * 1000 * 90 / 100);
        while let Ok(event) = browse_chan.recv_timeout(timeout) {
            println!("event: {:?}", &event);
        }

        // Read the client's metrics and check the PTR refresh counter.
        let metrics_chan = mdns_client.get_metrics().unwrap();
        let metrics = metrics_chan.recv_timeout(timeout).unwrap();
        let refresh_counter = metrics["cache-refresh-ptr"];
        assert_eq!(refresh_counter, 1);

        mdns_server.shutdown().unwrap();
        mdns_client.shutdown().unwrap();
    }

    /// `name_change` increments a trailing ` (N)` on the first label or appends ` (2)`.
    #[test]
    fn test_name_change() {
        assert_eq!(name_change("foo.local."), "foo (2).local.");
        assert_eq!(name_change("foo (2).local."), "foo (3).local.");
        assert_eq!(name_change("foo (9).local."), "foo (10).local.");
        assert_eq!(name_change("foo"), "foo (2)");
        assert_eq!(name_change("foo (2)"), "foo (3)");
        assert_eq!(name_change(""), " (2)");

        // Labels that do not end in a well-formed ` (N)` get ` (2)` appended.
        assert_eq!(name_change("foo (abc)"), "foo (abc) (2)");
        assert_eq!(name_change("foo (2"), "foo (2 (2)");
        assert_eq!(name_change("foo (2) extra"), "foo (2) extra (2)");
    }

    /// `hostname_change` increments a trailing `-N` on the first label or appends `-2`.
    #[test]
    fn test_hostname_change() {
        assert_eq!(hostname_change("foo.local."), "foo-2.local.");
        assert_eq!(hostname_change("foo"), "foo-2");
        assert_eq!(hostname_change("foo-2.local."), "foo-3.local.");
        assert_eq!(hostname_change("foo-9"), "foo-10");
        assert_eq!(hostname_change("test-42.domain."), "test-43.domain.");
    }
}