1#[cfg(feature = "logging")]
32use crate::log::{debug, trace};
33use crate::{
34 dns_cache::{current_time_millis, DnsCache},
35 dns_parser::{
36 ip_address_rr_type, DnsAddress, DnsEntryExt, DnsIncoming, DnsOutgoing, DnsPointer,
37 DnsRecordBox, DnsRecordExt, DnsSrv, DnsTxt, RRType, CLASS_CACHE_FLUSH, CLASS_IN, FLAGS_AA,
38 FLAGS_QR_QUERY, FLAGS_QR_RESPONSE, MAX_MSG_ABSOLUTE,
39 },
40 error::{e_fmt, Error, Result},
41 service_info::{
42 split_sub_domain, valid_ip_on_intf, DnsRegistry, Probe, ServiceInfo, ServiceStatus,
43 },
44 Receiver,
45};
46use flume::{bounded, Sender, TrySendError};
47use if_addrs::{IfAddr, Interface};
48use mio::{net::UdpSocket as MioUdpSocket, Poll};
49use socket2::Socket;
50use std::{
51 cmp::{self, Reverse},
52 collections::{BinaryHeap, HashMap, HashSet},
53 fmt,
54 net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6, UdpSocket},
55 str, thread,
56 time::Duration,
57 vec,
58};
59
60pub const SERVICE_NAME_LEN_MAX_DEFAULT: u8 = 15;
64
65pub const IP_CHECK_INTERVAL_IN_SECS_DEFAULT: u32 = 5;
67
68pub const VERIFY_TIMEOUT_DEFAULT: Duration = Duration::from_secs(10);
71
72const MDNS_PORT: u16 = 5353;
73const GROUP_ADDR_V4: Ipv4Addr = Ipv4Addr::new(224, 0, 0, 251);
74const GROUP_ADDR_V6: Ipv6Addr = Ipv6Addr::new(0xff02, 0, 0, 0, 0, 0, 0, 0xfb);
75const LOOPBACK_V4: Ipv4Addr = Ipv4Addr::new(127, 0, 0, 1);
76
77const RESOLVE_WAIT_IN_MILLIS: u64 = 500;
78
79#[derive(Debug)]
81pub enum UnregisterStatus {
82 OK,
84 NotFound,
86}
87
88#[derive(Debug, PartialEq, Clone, Eq)]
90#[non_exhaustive]
91pub enum DaemonStatus {
92 Running,
94
95 Shutdown,
97}
98
99#[derive(Hash, Eq, PartialEq)]
102enum Counter {
103 Register,
104 RegisterResend,
105 Unregister,
106 UnregisterResend,
107 Browse,
108 ResolveHostname,
109 Respond,
110 CacheRefreshPTR,
111 CacheRefreshSRV,
112 CacheRefreshAddr,
113 KnownAnswerSuppression,
114}
115
116impl fmt::Display for Counter {
117 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
118 match self {
119 Self::Register => write!(f, "register"),
120 Self::RegisterResend => write!(f, "register-resend"),
121 Self::Unregister => write!(f, "unregister"),
122 Self::UnregisterResend => write!(f, "unregister-resend"),
123 Self::Browse => write!(f, "browse"),
124 Self::ResolveHostname => write!(f, "resolve-hostname"),
125 Self::Respond => write!(f, "respond"),
126 Self::CacheRefreshPTR => write!(f, "cache-refresh-ptr"),
127 Self::CacheRefreshSRV => write!(f, "cache-refresh-srv"),
128 Self::CacheRefreshAddr => write!(f, "cache-refresh-addr"),
129 Self::KnownAnswerSuppression => write!(f, "known-answer-suppression"),
130 }
131 }
132}
133
134pub type Metrics = HashMap<String, i64>;
137
138const SIGNAL_SOCK_EVENT_KEY: usize = usize::MAX - 1; #[derive(Clone)]
144pub struct ServiceDaemon {
145 sender: Sender<Command>,
147
148 signal_addr: SocketAddr,
154}
155
156impl ServiceDaemon {
157 pub fn new() -> Result<Self> {
162 let signal_addr = SocketAddrV4::new(LOOPBACK_V4, 0);
165
166 let signal_sock = UdpSocket::bind(signal_addr)
167 .map_err(|e| e_fmt!("failed to create signal_sock for daemon: {}", e))?;
168
169 let signal_addr = signal_sock
171 .local_addr()
172 .map_err(|e| e_fmt!("failed to get signal sock addr: {}", e))?;
173
174 signal_sock
176 .set_nonblocking(true)
177 .map_err(|e| e_fmt!("failed to set nonblocking for signal socket: {}", e))?;
178
179 let poller = Poll::new().map_err(|e| e_fmt!("failed to create mio Poll: {e}"))?;
180
181 let (sender, receiver) = bounded(100);
182
183 let mio_sock = MioUdpSocket::from_std(signal_sock);
185 thread::Builder::new()
186 .name("mDNS_daemon".to_string())
187 .spawn(move || Self::daemon_thread(mio_sock, poller, receiver))
188 .map_err(|e| e_fmt!("thread builder failed to spawn: {}", e))?;
189
190 Ok(Self {
191 sender,
192 signal_addr,
193 })
194 }
195
196 fn send_cmd(&self, cmd: Command) -> Result<()> {
199 let cmd_name = cmd.to_string();
200
201 self.sender.try_send(cmd).map_err(|e| match e {
203 TrySendError::Full(_) => Error::Again,
204 e => e_fmt!("flume::channel::send failed: {}", e),
205 })?;
206
207 let addr = SocketAddrV4::new(LOOPBACK_V4, 0);
209 let socket = UdpSocket::bind(addr)
210 .map_err(|e| e_fmt!("Failed to create socket to send signal: {}", e))?;
211 socket
212 .send_to(cmd_name.as_bytes(), self.signal_addr)
213 .map_err(|e| {
214 e_fmt!(
215 "signal socket send_to {} ({}) failed: {}",
216 self.signal_addr,
217 cmd_name,
218 e
219 )
220 })?;
221
222 Ok(())
223 }
224
225 pub fn browse(&self, service_type: &str) -> Result<Receiver<ServiceEvent>> {
236 check_domain_suffix(service_type)?;
237
238 let (resp_s, resp_r) = bounded(10);
239 self.send_cmd(Command::Browse(service_type.to_string(), 1, resp_s))?;
240 Ok(resp_r)
241 }
242
243 pub fn stop_browse(&self, ty_domain: &str) -> Result<()> {
248 self.send_cmd(Command::StopBrowse(ty_domain.to_string()))
249 }
250
251 pub fn resolve_hostname(
259 &self,
260 hostname: &str,
261 timeout: Option<u64>,
262 ) -> Result<Receiver<HostnameResolutionEvent>> {
263 check_hostname(hostname)?;
264 let (resp_s, resp_r) = bounded(10);
265 self.send_cmd(Command::ResolveHostname(
266 hostname.to_string(),
267 1,
268 resp_s,
269 timeout,
270 ))?;
271 Ok(resp_r)
272 }
273
274 pub fn stop_resolve_hostname(&self, hostname: &str) -> Result<()> {
279 self.send_cmd(Command::StopResolveHostname(hostname.to_string()))
280 }
281
282 pub fn register(&self, service_info: ServiceInfo) -> Result<()> {
290 check_service_name(service_info.get_fullname())?;
291 check_hostname(service_info.get_hostname())?;
292
293 self.send_cmd(Command::Register(service_info))
294 }
295
296 pub fn unregister(&self, fullname: &str) -> Result<Receiver<UnregisterStatus>> {
304 let (resp_s, resp_r) = bounded(1);
305 self.send_cmd(Command::Unregister(fullname.to_lowercase(), resp_s))?;
306 Ok(resp_r)
307 }
308
309 pub fn monitor(&self) -> Result<Receiver<DaemonEvent>> {
313 let (resp_s, resp_r) = bounded(100);
314 self.send_cmd(Command::Monitor(resp_s))?;
315 Ok(resp_r)
316 }
317
318 pub fn shutdown(&self) -> Result<Receiver<DaemonStatus>> {
323 let (resp_s, resp_r) = bounded(1);
324 self.send_cmd(Command::Exit(resp_s))?;
325 Ok(resp_r)
326 }
327
328 pub fn status(&self) -> Result<Receiver<DaemonStatus>> {
334 let (resp_s, resp_r) = bounded(1);
335
336 if self.sender.is_disconnected() {
337 resp_s
338 .send(DaemonStatus::Shutdown)
339 .map_err(|e| e_fmt!("failed to send daemon status to the client: {}", e))?;
340 } else {
341 self.send_cmd(Command::GetStatus(resp_s))?;
342 }
343
344 Ok(resp_r)
345 }
346
347 pub fn get_metrics(&self) -> Result<Receiver<Metrics>> {
352 let (resp_s, resp_r) = bounded(1);
353 self.send_cmd(Command::GetMetrics(resp_s))?;
354 Ok(resp_r)
355 }
356
357 pub fn set_service_name_len_max(&self, len_max: u8) -> Result<()> {
364 const SERVICE_NAME_LEN_MAX_LIMIT: u8 = 30; if len_max > SERVICE_NAME_LEN_MAX_LIMIT {
367 return Err(Error::Msg(format!(
368 "service name length max {} is too large",
369 len_max
370 )));
371 }
372
373 self.send_cmd(Command::SetOption(DaemonOption::ServiceNameLenMax(len_max)))
374 }
375
376 pub fn set_ip_check_interval(&self, interval_in_secs: u32) -> Result<()> {
382 let interval_in_millis = interval_in_secs as u64 * 1000;
383 self.send_cmd(Command::SetOption(DaemonOption::IpCheckInterval(
384 interval_in_millis,
385 )))
386 }
387
388 pub fn get_ip_check_interval(&self) -> Result<u32> {
390 let (resp_s, resp_r) = bounded(1);
391 self.send_cmd(Command::GetOption(resp_s))?;
392
393 let option = resp_r
394 .recv_timeout(Duration::from_secs(10))
395 .map_err(|e| e_fmt!("failed to receive ip check interval: {}", e))?;
396 let ip_check_interval_in_secs = option.ip_check_interval / 1000;
397 Ok(ip_check_interval_in_secs as u32)
398 }
399
400 pub fn enable_interface(&self, if_kind: impl IntoIfKindVec) -> Result<()> {
407 let if_kind_vec = if_kind.into_vec();
408 self.send_cmd(Command::SetOption(DaemonOption::EnableInterface(
409 if_kind_vec.kinds,
410 )))
411 }
412
413 pub fn disable_interface(&self, if_kind: impl IntoIfKindVec) -> Result<()> {
420 let if_kind_vec = if_kind.into_vec();
421 self.send_cmd(Command::SetOption(DaemonOption::DisableInterface(
422 if_kind_vec.kinds,
423 )))
424 }
425
426 pub fn set_multicast_loop_v4(&self, on: bool) -> Result<()> {
442 self.send_cmd(Command::SetOption(DaemonOption::MulticastLoopV4(on)))
443 }
444
445 pub fn set_multicast_loop_v6(&self, on: bool) -> Result<()> {
461 self.send_cmd(Command::SetOption(DaemonOption::MulticastLoopV6(on)))
462 }
463
464 pub fn verify(&self, instance_fullname: String, timeout: Duration) -> Result<()> {
477 self.send_cmd(Command::Verify(instance_fullname, timeout))
478 }
479
480 fn daemon_thread(signal_sock: MioUdpSocket, poller: Poll, receiver: Receiver<Command>) {
481 let zc = Zeroconf::new(signal_sock, poller);
482
483 if let Some(cmd) = Self::run(zc, receiver) {
484 match cmd {
485 Command::Exit(resp_s) => {
486 if let Err(e) = resp_s.send(DaemonStatus::Shutdown) {
489 debug!("exit: failed to send response of shutdown: {}", e);
490 }
491 }
492 _ => {
493 debug!("Unexpected command: {:?}", cmd);
494 }
495 }
496 }
497 }
498
499 fn run(mut zc: Zeroconf, receiver: Receiver<Command>) -> Option<Command> {
508 if let Err(e) = zc.poller.registry().register(
510 &mut zc.signal_sock,
511 mio::Token(SIGNAL_SOCK_EVENT_KEY),
512 mio::Interest::READABLE,
513 ) {
514 debug!("failed to add signal socket to the poller: {}", e);
515 return None;
516 }
517
518 for (intf, sock) in zc.intf_socks.iter_mut() {
520 let key =
521 Zeroconf::add_poll_impl(&mut zc.poll_ids, &mut zc.poll_id_count, intf.clone());
522
523 if let Err(e) =
524 zc.poller
525 .registry()
526 .register(sock, mio::Token(key), mio::Interest::READABLE)
527 {
528 debug!("add socket of {:?} to poller: {e}", intf);
529 return None;
530 }
531 }
532
533 let mut next_ip_check = if zc.ip_check_interval > 0 {
535 current_time_millis() + zc.ip_check_interval
536 } else {
537 0
538 };
539
540 if next_ip_check > 0 {
541 zc.add_timer(next_ip_check);
542 }
543
544 let mut events = mio::Events::with_capacity(1024);
547 loop {
548 let now = current_time_millis();
549
550 let earliest_timer = zc.peek_earliest_timer();
551 let timeout = earliest_timer.map(|timer| {
552 let millis = if timer > now { timer - now } else { 1 };
554 Duration::from_millis(millis)
555 });
556
557 events.clear();
559 match zc.poller.poll(&mut events, timeout) {
560 Ok(_) => zc.handle_poller_events(&events),
561 Err(e) => debug!("failed to select from sockets: {}", e),
562 }
563
564 let now = current_time_millis();
565
566 if let Some(timer) = earliest_timer {
568 if now >= timer {
569 zc.pop_earliest_timer();
570 }
571 }
572
573 for hostname in zc
575 .hostname_resolvers
576 .clone()
577 .into_iter()
578 .filter(|(_, (_, timeout))| timeout.map(|t| now >= t).unwrap_or(false))
579 .map(|(hostname, _)| hostname)
580 {
581 trace!("hostname resolver timeout for {}", &hostname);
582 call_hostname_resolution_listener(
583 &zc.hostname_resolvers,
584 &hostname,
585 HostnameResolutionEvent::SearchTimeout(hostname.to_owned()),
586 );
587 call_hostname_resolution_listener(
588 &zc.hostname_resolvers,
589 &hostname,
590 HostnameResolutionEvent::SearchStopped(hostname.to_owned()),
591 );
592 zc.hostname_resolvers.remove(&hostname);
593 }
594
595 while let Ok(command) = receiver.try_recv() {
597 if matches!(command, Command::Exit(_)) {
598 zc.status = DaemonStatus::Shutdown;
599 return Some(command);
600 }
601 zc.exec_command(command, false);
602 }
603
604 let mut i = 0;
606 while i < zc.retransmissions.len() {
607 if now >= zc.retransmissions[i].next_time {
608 let rerun = zc.retransmissions.remove(i);
609 zc.exec_command(rerun.command, true);
610 } else {
611 i += 1;
612 }
613 }
614
615 zc.refresh_active_services();
617
618 let mut query_count = 0;
620 for (hostname, _sender) in zc.hostname_resolvers.iter() {
621 for (hostname, ip_addr) in
622 zc.cache.refresh_due_hostname_resolutions(hostname).iter()
623 {
624 zc.send_query(hostname, ip_address_rr_type(ip_addr));
625 query_count += 1;
626 }
627 }
628
629 zc.increase_counter(Counter::CacheRefreshAddr, query_count);
630
631 let now = current_time_millis();
633
634 let expired_services = zc.cache.evict_expired_services(now);
636 zc.notify_service_removal(expired_services);
637
638 let expired_addrs = zc.cache.evict_expired_addr(now);
640 for (hostname, addrs) in expired_addrs {
641 call_hostname_resolution_listener(
642 &zc.hostname_resolvers,
643 &hostname,
644 HostnameResolutionEvent::AddressesRemoved(hostname.clone(), addrs),
645 );
646 let instances = zc.cache.get_instances_on_host(&hostname);
647 let instance_set: HashSet<String> = instances.into_iter().collect();
648 zc.resolve_updated_instances(&instance_set);
649 }
650
651 zc.probing_handler();
653
654 if now >= next_ip_check && next_ip_check > 0 {
656 next_ip_check = now + zc.ip_check_interval;
657 zc.add_timer(next_ip_check);
658
659 zc.check_ip_changes();
660 }
661 }
662 }
663}
664
665fn new_socket_bind(intf: &Interface, should_loop: bool) -> Result<MioUdpSocket> {
667 let intf_ip = &intf.ip();
670 match intf_ip {
671 IpAddr::V4(ip) => {
672 let addr = SocketAddrV4::new(Ipv4Addr::new(0, 0, 0, 0), MDNS_PORT);
673 let sock = new_socket(addr.into(), true)?;
674
675 sock.join_multicast_v4(&GROUP_ADDR_V4, ip)
677 .map_err(|e| e_fmt!("join multicast group on addr {}: {}", intf_ip, e))?;
678
679 sock.set_multicast_if_v4(ip)
681 .map_err(|e| e_fmt!("set multicast_if on addr {}: {}", ip, e))?;
682
683 if !should_loop {
684 sock.set_multicast_loop_v4(false)
685 .map_err(|e| e_fmt!("failed to set multicast loop v4 for {ip}: {e}"))?;
686 }
687
688 let multicast_addr = SocketAddrV4::new(GROUP_ADDR_V4, MDNS_PORT).into();
690 let test_packets = DnsOutgoing::new(0).to_data_on_wire();
691 for packet in test_packets {
692 sock.send_to(&packet, &multicast_addr)
693 .map_err(|e| e_fmt!("send multicast packet on addr {}: {}", ip, e))?;
694 }
695 Ok(MioUdpSocket::from_std(UdpSocket::from(sock)))
696 }
697 IpAddr::V6(ip) => {
698 let addr = SocketAddrV6::new(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 0), MDNS_PORT, 0, 0);
699 let sock = new_socket(addr.into(), true)?;
700
701 sock.join_multicast_v6(&GROUP_ADDR_V6, intf.index.unwrap_or(0))
703 .map_err(|e| e_fmt!("join multicast group on addr {}: {}", ip, e))?;
704
705 sock.set_multicast_if_v6(intf.index.unwrap_or(0))
707 .map_err(|e| e_fmt!("set multicast_if on addr {}: {}", ip, e))?;
708
709 Ok(MioUdpSocket::from_std(UdpSocket::from(sock)))
714 }
715 }
716}
717
718fn new_socket(addr: SocketAddr, non_block: bool) -> Result<Socket> {
721 let domain = match addr {
722 SocketAddr::V4(_) => socket2::Domain::IPV4,
723 SocketAddr::V6(_) => socket2::Domain::IPV6,
724 };
725
726 let fd = Socket::new(domain, socket2::Type::DGRAM, None)
727 .map_err(|e| e_fmt!("create socket failed: {}", e))?;
728
729 fd.set_reuse_address(true)
730 .map_err(|e| e_fmt!("set ReuseAddr failed: {}", e))?;
731 #[cfg(unix)] fd.set_reuse_port(true)
733 .map_err(|e| e_fmt!("set ReusePort failed: {}", e))?;
734
735 if non_block {
736 fd.set_nonblocking(true)
737 .map_err(|e| e_fmt!("set O_NONBLOCK: {}", e))?;
738 }
739
740 fd.bind(&addr.into())
741 .map_err(|e| e_fmt!("socket bind to {} failed: {}", &addr, e))?;
742
743 trace!("new socket bind to {}", &addr);
744 Ok(fd)
745}
746
747struct ReRun {
749 next_time: u64,
751 command: Command,
752}
753
754#[derive(Debug, Eq, Hash, PartialEq)]
756enum IpVersion {
757 V4,
758 V6,
759}
760
761#[derive(Debug, Eq, Hash, PartialEq)]
763struct MulticastSendTracker {
764 intf_index: u32,
765 ip_version: IpVersion,
766}
767
768fn multicast_send_tracker(intf: &Interface) -> Option<MulticastSendTracker> {
770 match intf.index {
771 Some(index) => {
772 let ip_ver = match intf.addr {
773 IfAddr::V4(_) => IpVersion::V4,
774 IfAddr::V6(_) => IpVersion::V6,
775 };
776 Some(MulticastSendTracker {
777 intf_index: index,
778 ip_version: ip_ver,
779 })
780 }
781 None => None,
782 }
783}
784
785#[derive(Debug, Clone)]
789#[non_exhaustive]
790pub enum IfKind {
791 All,
793
794 IPv4,
796
797 IPv6,
799
800 Name(String),
802
803 Addr(IpAddr),
805
806 LoopbackV4,
811
812 LoopbackV6,
814}
815
816impl IfKind {
817 fn matches(&self, intf: &Interface) -> bool {
819 match self {
820 Self::All => true,
821 Self::IPv4 => intf.ip().is_ipv4(),
822 Self::IPv6 => intf.ip().is_ipv6(),
823 Self::Name(ifname) => ifname == &intf.name,
824 Self::Addr(addr) => addr == &intf.ip(),
825 Self::LoopbackV4 => intf.is_loopback() && intf.ip().is_ipv4(),
826 Self::LoopbackV6 => intf.is_loopback() && intf.ip().is_ipv6(),
827 }
828 }
829}
830
831impl From<&str> for IfKind {
834 fn from(val: &str) -> Self {
835 Self::Name(val.to_string())
836 }
837}
838
839impl From<&String> for IfKind {
840 fn from(val: &String) -> Self {
841 Self::Name(val.to_string())
842 }
843}
844
845impl From<IpAddr> for IfKind {
847 fn from(val: IpAddr) -> Self {
848 Self::Addr(val)
849 }
850}
851
852pub struct IfKindVec {
854 kinds: Vec<IfKind>,
855}
856
857pub trait IntoIfKindVec {
859 fn into_vec(self) -> IfKindVec;
860}
861
862impl<T: Into<IfKind>> IntoIfKindVec for T {
863 fn into_vec(self) -> IfKindVec {
864 let if_kind: IfKind = self.into();
865 IfKindVec {
866 kinds: vec![if_kind],
867 }
868 }
869}
870
871impl<T: Into<IfKind>> IntoIfKindVec for Vec<T> {
872 fn into_vec(self) -> IfKindVec {
873 let kinds: Vec<IfKind> = self.into_iter().map(|x| x.into()).collect();
874 IfKindVec { kinds }
875 }
876}
877
878struct IfSelection {
880 if_kind: IfKind,
882
883 selected: bool,
885}
886
887struct Zeroconf {
889 intf_socks: HashMap<Interface, MioUdpSocket>,
891
892 poll_ids: HashMap<usize, Interface>,
894
895 poll_id_count: usize,
897
898 my_services: HashMap<String, ServiceInfo>,
900
901 cache: DnsCache,
903
904 dns_registry_map: HashMap<Interface, DnsRegistry>,
906
907 service_queriers: HashMap<String, Sender<ServiceEvent>>, hostname_resolvers: HashMap<String, (Sender<HostnameResolutionEvent>, Option<u64>)>, retransmissions: Vec<ReRun>,
918
919 counters: Metrics,
920
921 poller: Poll,
923
924 monitors: Vec<Sender<DaemonEvent>>,
926
927 service_name_len_max: u8,
929
930 ip_check_interval: u64,
932
933 if_selections: Vec<IfSelection>,
935
936 signal_sock: MioUdpSocket,
938
939 timers: BinaryHeap<Reverse<u64>>,
945
946 status: DaemonStatus,
947
948 pending_resolves: HashSet<String>,
950
951 resolved: HashSet<String>,
953
954 multicast_loop_v4: bool,
955
956 multicast_loop_v6: bool,
957}
958
959impl Zeroconf {
960 fn new(signal_sock: MioUdpSocket, poller: Poll) -> Self {
961 let my_ifaddrs = my_ip_interfaces(false);
963
964 let mut intf_socks = HashMap::new();
968 let mut dns_registry_map = HashMap::new();
969
970 for intf in my_ifaddrs {
971 let sock = match new_socket_bind(&intf, true) {
972 Ok(s) => s,
973 Err(e) => {
974 trace!("bind a socket to {}: {}. Skipped.", &intf.ip(), e);
975 continue;
976 }
977 };
978
979 dns_registry_map.insert(intf.clone(), DnsRegistry::new());
980
981 intf_socks.insert(intf, sock);
982 }
983
984 let monitors = Vec::new();
985 let service_name_len_max = SERVICE_NAME_LEN_MAX_DEFAULT;
986 let ip_check_interval = IP_CHECK_INTERVAL_IN_SECS_DEFAULT as u64 * 1000;
987
988 let timers = BinaryHeap::new();
989
990 let if_selections = vec![
992 IfSelection {
993 if_kind: IfKind::LoopbackV4,
994 selected: false,
995 },
996 IfSelection {
997 if_kind: IfKind::LoopbackV6,
998 selected: false,
999 },
1000 ];
1001
1002 let status = DaemonStatus::Running;
1003
1004 Self {
1005 intf_socks,
1006 poll_ids: HashMap::new(),
1007 poll_id_count: 0,
1008 my_services: HashMap::new(),
1009 cache: DnsCache::new(),
1010 dns_registry_map,
1011 hostname_resolvers: HashMap::new(),
1012 service_queriers: HashMap::new(),
1013 retransmissions: Vec::new(),
1014 counters: HashMap::new(),
1015 poller,
1016 monitors,
1017 service_name_len_max,
1018 ip_check_interval,
1019 if_selections,
1020 signal_sock,
1021 timers,
1022 status,
1023 pending_resolves: HashSet::new(),
1024 resolved: HashSet::new(),
1025 multicast_loop_v4: true,
1026 multicast_loop_v6: true,
1027 }
1028 }
1029
1030 fn process_set_option(&mut self, daemon_opt: DaemonOption) {
1031 match daemon_opt {
1032 DaemonOption::ServiceNameLenMax(length) => self.service_name_len_max = length,
1033 DaemonOption::IpCheckInterval(interval) => self.ip_check_interval = interval,
1034 DaemonOption::EnableInterface(if_kind) => self.enable_interface(if_kind),
1035 DaemonOption::DisableInterface(if_kind) => self.disable_interface(if_kind),
1036 DaemonOption::MulticastLoopV4(on) => self.set_multicast_loop_v4(on),
1037 DaemonOption::MulticastLoopV6(on) => self.set_multicast_loop_v6(on),
1038 }
1039 }
1040
1041 fn enable_interface(&mut self, kinds: Vec<IfKind>) {
1042 for if_kind in kinds {
1043 self.if_selections.push(IfSelection {
1044 if_kind,
1045 selected: true,
1046 });
1047 }
1048
1049 self.apply_intf_selections(my_ip_interfaces(true));
1050 }
1051
1052 fn disable_interface(&mut self, kinds: Vec<IfKind>) {
1053 for if_kind in kinds {
1054 self.if_selections.push(IfSelection {
1055 if_kind,
1056 selected: false,
1057 });
1058 }
1059
1060 self.apply_intf_selections(my_ip_interfaces(true));
1061 }
1062
1063 fn set_multicast_loop_v4(&mut self, on: bool) {
1064 for (_, sock) in self.intf_socks.iter_mut() {
1065 if let Err(e) = sock.set_multicast_loop_v4(on) {
1066 debug!("failed to set multicast loop v4: {e}");
1067 }
1068 }
1069 }
1070
1071 fn set_multicast_loop_v6(&mut self, on: bool) {
1072 for (_, sock) in self.intf_socks.iter_mut() {
1073 if let Err(e) = sock.set_multicast_loop_v6(on) {
1074 debug!("failed to set multicast loop v6: {e}");
1075 }
1076 }
1077 }
1078
1079 fn notify_monitors(&mut self, event: DaemonEvent) {
1080 self.monitors.retain(|sender| {
1082 if let Err(e) = sender.try_send(event.clone()) {
1083 debug!("notify_monitors: try_send: {}", &e);
1084 if matches!(e, TrySendError::Disconnected(_)) {
1085 return false; }
1087 }
1088 true
1089 });
1090 }
1091
1092 fn del_addr_in_my_services(&mut self, addr: &IpAddr) {
1094 for (_, service_info) in self.my_services.iter_mut() {
1095 if service_info.is_addr_auto() {
1096 service_info.remove_ipaddr(addr);
1097 }
1098 }
1099 }
1100
1101 fn add_poll(&mut self, intf: Interface) -> usize {
1103 Self::add_poll_impl(&mut self.poll_ids, &mut self.poll_id_count, intf)
1104 }
1105
1106 fn add_poll_impl(
1110 poll_ids: &mut HashMap<usize, Interface>,
1111 poll_id_count: &mut usize,
1112 intf: Interface,
1113 ) -> usize {
1114 let key = *poll_id_count;
1115 *poll_id_count += 1;
1116 let _ = (*poll_ids).insert(key, intf);
1117 key
1118 }
1119
1120 fn add_timer(&mut self, next_time: u64) {
1121 self.timers.push(Reverse(next_time));
1122 }
1123
1124 fn peek_earliest_timer(&self) -> Option<u64> {
1125 self.timers.peek().map(|Reverse(v)| *v)
1126 }
1127
1128 fn pop_earliest_timer(&mut self) -> Option<u64> {
1129 self.timers.pop().map(|Reverse(v)| v)
1130 }
1131
1132 fn selected_addrs(&self, interfaces: Vec<Interface>) -> HashSet<IpAddr> {
1134 let intf_count = interfaces.len();
1135 let mut intf_selections = vec![true; intf_count];
1136
1137 for selection in self.if_selections.iter() {
1139 for i in 0..intf_count {
1141 if selection.if_kind.matches(&interfaces[i]) {
1142 intf_selections[i] = selection.selected;
1143 }
1144 }
1145 }
1146
1147 let mut selected_addrs = HashSet::new();
1148 for i in 0..intf_count {
1149 if intf_selections[i] {
1150 selected_addrs.insert(interfaces[i].addr.ip());
1151 }
1152 }
1153
1154 selected_addrs
1155 }
1156
1157 fn apply_intf_selections(&mut self, interfaces: Vec<Interface>) {
1162 let intf_count = interfaces.len();
1164 let mut intf_selections = vec![true; intf_count];
1165
1166 for selection in self.if_selections.iter() {
1168 for i in 0..intf_count {
1170 if selection.if_kind.matches(&interfaces[i]) {
1171 intf_selections[i] = selection.selected;
1172 }
1173 }
1174 }
1175
1176 for (idx, intf) in interfaces.into_iter().enumerate() {
1178 if intf_selections[idx] {
1179 if !self.intf_socks.contains_key(&intf) {
1181 debug!("apply_intf_selections: add {:?}", &intf.ip());
1182 self.add_new_interface(intf);
1183 }
1184 } else {
1185 if let Some(mut sock) = self.intf_socks.remove(&intf) {
1187 match self.poller.registry().deregister(&mut sock) {
1188 Ok(()) => debug!("apply_intf_selections: deregister {:?}", &intf.ip()),
1189 Err(e) => debug!("apply_intf_selections: poller.delete {:?}: {}", &intf, e),
1190 }
1191
1192 self.poll_ids.retain(|_, v| v != &intf);
1194
1195 self.cache.remove_addrs_on_disabled_intf(&intf);
1197 }
1198 }
1199 }
1200 }
1201
1202 fn check_ip_changes(&mut self) {
1204 let my_ifaddrs = my_ip_interfaces(true);
1206
1207 let poll_ids = &mut self.poll_ids;
1208 let poller = &mut self.poller;
1209 let deleted_addrs = self
1211 .intf_socks
1212 .iter_mut()
1213 .filter_map(|(intf, sock)| {
1214 if !my_ifaddrs.contains(intf) {
1215 if let Err(e) = poller.registry().deregister(sock) {
1216 debug!("check_ip_changes: poller.delete {:?}: {}", intf, e);
1217 }
1218 poll_ids.retain(|_, v| v != intf);
1220 Some(intf.ip())
1221 } else {
1222 None
1223 }
1224 })
1225 .collect::<Vec<IpAddr>>();
1226
1227 for ip in deleted_addrs.iter() {
1229 self.del_addr_in_my_services(ip);
1230 self.notify_monitors(DaemonEvent::IpDel(*ip));
1231 }
1232
1233 self.intf_socks.retain(|intf, _| my_ifaddrs.contains(intf));
1235
1236 self.apply_intf_selections(my_ifaddrs);
1238 }
1239
1240 fn add_new_interface(&mut self, intf: Interface) {
1241 let new_ip = intf.ip();
1243 let should_loop = if new_ip.is_ipv4() {
1244 self.multicast_loop_v4
1245 } else {
1246 self.multicast_loop_v6
1247 };
1248 let mut sock = match new_socket_bind(&intf, should_loop) {
1249 Ok(s) => s,
1250 Err(e) => {
1251 debug!("bind a socket to {}: {}. Skipped.", &intf.ip(), e);
1252 return;
1253 }
1254 };
1255
1256 let key = self.add_poll(intf.clone());
1258 if let Err(e) =
1259 self.poller
1260 .registry()
1261 .register(&mut sock, mio::Token(key), mio::Interest::READABLE)
1262 {
1263 debug!("check_ip_changes: poller add ip {}: {}", new_ip, e);
1264 return;
1265 }
1266
1267 debug!("add new interface {}: {new_ip}", intf.name);
1268 let dns_registry = match self.dns_registry_map.get_mut(&intf) {
1269 Some(registry) => registry,
1270 None => self
1271 .dns_registry_map
1272 .entry(intf.clone())
1273 .or_insert_with(DnsRegistry::new),
1274 };
1275
1276 for (_, service_info) in self.my_services.iter_mut() {
1277 if service_info.is_addr_auto() {
1278 service_info.insert_ipaddr(new_ip);
1279
1280 if announce_service_on_intf(dns_registry, service_info, &intf, &sock) {
1281 debug!(
1282 "Announce service {} on {}",
1283 service_info.get_fullname(),
1284 intf.ip()
1285 );
1286 service_info.set_status(&intf, ServiceStatus::Announced);
1287 } else {
1288 for timer in dns_registry.new_timers.drain(..) {
1289 self.timers.push(Reverse(timer));
1290 }
1291 service_info.set_status(&intf, ServiceStatus::Probing);
1292 }
1293 }
1294 }
1295
1296 self.intf_socks.insert(intf, sock);
1297
1298 self.notify_monitors(DaemonEvent::IpAdd(new_ip));
1300 }
1301
1302 fn register_service(&mut self, mut info: ServiceInfo) {
1311 if let Err(e) = check_service_name_length(info.get_type(), self.service_name_len_max) {
1313 debug!("check_service_name_length: {}", &e);
1314 self.notify_monitors(DaemonEvent::Error(e));
1315 return;
1316 }
1317
1318 if info.is_addr_auto() {
1319 let selected_addrs = self.selected_addrs(my_ip_interfaces(true));
1320 for addr in selected_addrs {
1321 info.insert_ipaddr(addr);
1322 }
1323 }
1324
1325 debug!("register service {:?}", &info);
1326
1327 let outgoing_addrs = self.send_unsolicited_response(&mut info);
1328 if !outgoing_addrs.is_empty() {
1329 self.notify_monitors(DaemonEvent::Announce(
1330 info.get_fullname().to_string(),
1331 format!("{:?}", &outgoing_addrs),
1332 ));
1333 }
1334
1335 let service_fullname = info.get_fullname().to_lowercase();
1338 self.my_services.insert(service_fullname, info);
1339 }
1340
1341 fn send_unsolicited_response(&mut self, info: &mut ServiceInfo) -> Vec<IpAddr> {
1344 let mut outgoing_addrs = Vec::new();
1345 let mut multicast_sent_trackers = HashSet::new();
1347
1348 let mut outgoing_intfs = Vec::new();
1349
1350 for (intf, sock) in self.intf_socks.iter() {
1351 if let Some(tracker) = multicast_send_tracker(intf) {
1352 if multicast_sent_trackers.contains(&tracker) {
1353 continue; }
1355 }
1356
1357 let dns_registry = match self.dns_registry_map.get_mut(intf) {
1358 Some(registry) => registry,
1359 None => self
1360 .dns_registry_map
1361 .entry(intf.clone())
1362 .or_insert_with(DnsRegistry::new),
1363 };
1364
1365 if announce_service_on_intf(dns_registry, info, intf, sock) {
1366 if let Some(tracker) = multicast_send_tracker(intf) {
1367 multicast_sent_trackers.insert(tracker);
1368 }
1369 outgoing_addrs.push(intf.ip());
1370 outgoing_intfs.push(intf.clone());
1371
1372 debug!("Announce service {} on {}", info.get_fullname(), intf.ip());
1373
1374 info.set_status(intf, ServiceStatus::Announced);
1375 } else {
1376 for timer in dns_registry.new_timers.drain(..) {
1377 self.timers.push(Reverse(timer));
1378 }
1379 info.set_status(intf, ServiceStatus::Probing);
1380 }
1381 }
1382
1383 let next_time = current_time_millis() + 1000;
1387 for intf in outgoing_intfs {
1388 self.add_retransmission(
1389 next_time,
1390 Command::RegisterResend(info.get_fullname().to_string(), intf),
1391 );
1392 }
1393
1394 outgoing_addrs
1395 }
1396
1397 fn probing_handler(&mut self) {
1399 let now = current_time_millis();
1400
1401 for (intf, sock) in self.intf_socks.iter() {
1402 let Some(dns_registry) = self.dns_registry_map.get_mut(intf) else {
1403 continue;
1404 };
1405
1406 let mut expired_probe_names = Vec::new();
1407 let mut out = DnsOutgoing::new(FLAGS_QR_QUERY);
1408
1409 for (name, probe) in dns_registry.probing.iter_mut() {
1410 if now >= probe.next_send {
1411 if probe.expired(now) {
1412 expired_probe_names.push(name.clone());
1414 } else {
1415 out.add_question(name, RRType::ANY);
1416
1417 for record in probe.records.iter() {
1425 out.add_authority(record.clone());
1426 }
1427
1428 probe.update_next_send(now);
1429
1430 self.timers.push(Reverse(probe.next_send));
1432 }
1433 }
1434 }
1435
1436 if !out.questions().is_empty() {
1438 debug!("sending out probing of {} questions", out.questions().len());
1439 send_dns_outgoing(&out, intf, sock);
1440 }
1441
1442 let mut waiting_services = HashSet::new();
1443
1444 for name in expired_probe_names {
1445 let Some(probe) = dns_registry.probing.remove(&name) else {
1446 continue;
1447 };
1448
1449 for record in probe.records.iter() {
1451 if let Some(new_name) = record.get_record().get_new_name() {
1452 dns_registry
1453 .name_changes
1454 .insert(name.clone(), new_name.to_string());
1455
1456 let event = DnsNameChange {
1457 original: record.get_record().get_original_name().to_string(),
1458 new_name: new_name.to_string(),
1459 rr_type: record.get_type(),
1460 intf_name: intf.name.to_string(),
1461 };
1462 notify_monitors(&mut self.monitors, DaemonEvent::NameChange(event));
1463 }
1464 }
1465
1466 debug!(
1468 "probe of '{name}' finished: move {} records to active. ({} waiting services)",
1469 probe.records.len(),
1470 probe.waiting_services.len(),
1471 );
1472
1473 if !probe.records.is_empty() {
1475 match dns_registry.active.get_mut(&name) {
1476 Some(records) => {
1477 records.extend(probe.records);
1478 }
1479 None => {
1480 dns_registry.active.insert(name, probe.records);
1481 }
1482 }
1483
1484 waiting_services.extend(probe.waiting_services);
1485 }
1486 }
1487
1488 for service_name in waiting_services {
1490 debug!(
1491 "try to announce service {service_name} on intf {}",
1492 intf.ip()
1493 );
1494 if let Some(info) = self.my_services.get_mut(&service_name.to_lowercase()) {
1496 if info.get_status(intf) == ServiceStatus::Announced {
1497 debug!("service {} already announced", info.get_fullname());
1498 continue;
1499 }
1500
1501 if announce_service_on_intf(dns_registry, info, intf, sock) {
1502 let next_time = now + 1000;
1503 let command =
1504 Command::RegisterResend(info.get_fullname().to_string(), intf.clone());
1505 self.retransmissions.push(ReRun { next_time, command });
1506 self.timers.push(Reverse(next_time));
1507
1508 let fullname = match dns_registry.name_changes.get(&service_name) {
1509 Some(new_name) => new_name.to_string(),
1510 None => service_name.to_string(),
1511 };
1512
1513 let mut hostname = info.get_hostname();
1514 if let Some(new_name) = dns_registry.name_changes.get(hostname) {
1515 hostname = new_name;
1516 }
1517
1518 debug!("wake up: announce service {} on {}", fullname, intf.ip());
1519 notify_monitors(
1520 &mut self.monitors,
1521 DaemonEvent::Announce(fullname, format!("{}:{}", hostname, &intf.ip())),
1522 );
1523
1524 info.set_status(intf, ServiceStatus::Announced);
1525 }
1526 }
1527 }
1528 }
1529 }
1530
1531 fn unregister_service(
1532 &self,
1533 info: &ServiceInfo,
1534 intf: &Interface,
1535 sock: &MioUdpSocket,
1536 ) -> Vec<u8> {
1537 let mut out = DnsOutgoing::new(FLAGS_QR_RESPONSE | FLAGS_AA);
1538 out.add_answer_at_time(
1539 DnsPointer::new(
1540 info.get_type(),
1541 RRType::PTR,
1542 CLASS_IN,
1543 0,
1544 info.get_fullname().to_string(),
1545 ),
1546 0,
1547 );
1548
1549 if let Some(sub) = info.get_subtype() {
1550 trace!("Adding subdomain {}", sub);
1551 out.add_answer_at_time(
1552 DnsPointer::new(
1553 sub,
1554 RRType::PTR,
1555 CLASS_IN,
1556 0,
1557 info.get_fullname().to_string(),
1558 ),
1559 0,
1560 );
1561 }
1562
1563 out.add_answer_at_time(
1564 DnsSrv::new(
1565 info.get_fullname(),
1566 CLASS_IN | CLASS_CACHE_FLUSH,
1567 0,
1568 info.get_priority(),
1569 info.get_weight(),
1570 info.get_port(),
1571 info.get_hostname().to_string(),
1572 ),
1573 0,
1574 );
1575 out.add_answer_at_time(
1576 DnsTxt::new(
1577 info.get_fullname(),
1578 CLASS_IN | CLASS_CACHE_FLUSH,
1579 0,
1580 info.generate_txt(),
1581 ),
1582 0,
1583 );
1584
1585 for address in info.get_addrs_on_intf(intf) {
1586 out.add_answer_at_time(
1587 DnsAddress::new(
1588 info.get_hostname(),
1589 ip_address_rr_type(&address),
1590 CLASS_IN | CLASS_CACHE_FLUSH,
1591 0,
1592 address,
1593 ),
1594 0,
1595 );
1596 }
1597
1598 send_dns_outgoing(&out, intf, sock).remove(0)
1600 }
1601
1602 fn add_hostname_resolver(
1606 &mut self,
1607 hostname: String,
1608 listener: Sender<HostnameResolutionEvent>,
1609 timeout: Option<u64>,
1610 ) {
1611 let real_timeout = timeout.map(|t| current_time_millis() + t);
1612 self.hostname_resolvers
1613 .insert(hostname.to_lowercase(), (listener, real_timeout));
1614 if let Some(t) = real_timeout {
1615 self.add_timer(t);
1616 }
1617 }
1618
1619 fn send_query(&self, name: &str, qtype: RRType) {
1621 self.send_query_vec(&[(name, qtype)]);
1622 }
1623
1624 fn send_query_vec(&self, questions: &[(&str, RRType)]) {
1626 trace!("Sending query questions: {:?}", questions);
1627 let mut out = DnsOutgoing::new(FLAGS_QR_QUERY);
1628 let now = current_time_millis();
1629
1630 for (name, qtype) in questions {
1631 out.add_question(name, *qtype);
1632
1633 for record in self.cache.get_known_answers(name, *qtype, now) {
1634 trace!("add known answer: {:?}", record);
1642 let mut new_record = record.clone();
1643 new_record.get_record_mut().update_ttl(now);
1644 out.add_answer_box(new_record);
1645 }
1646 }
1647
1648 let mut multicast_sent_trackers = HashSet::new();
1650 for (intf, sock) in self.intf_socks.iter() {
1651 if let Some(tracker) = multicast_send_tracker(intf) {
1652 if multicast_sent_trackers.contains(&tracker) {
1653 continue; }
1655 multicast_sent_trackers.insert(tracker);
1656 }
1657 send_dns_outgoing(&out, intf, sock);
1658 }
1659 }
1660
1661 fn handle_read(&mut self, intf: &Interface) -> bool {
1666 let sock = match self.intf_socks.get_mut(intf) {
1667 Some(if_sock) => if_sock,
1668 None => return false,
1669 };
1670 let mut buf = vec![0u8; MAX_MSG_ABSOLUTE];
1671
1672 let sz = match sock.recv(&mut buf) {
1679 Ok(sz) => sz,
1680 Err(e) => {
1681 if e.kind() != std::io::ErrorKind::WouldBlock {
1682 debug!("listening socket read failed: {}", e);
1683 }
1684 return false;
1685 }
1686 };
1687
1688 trace!("received {} bytes at IP: {}", sz, intf.ip());
1689
1690 if sz == 0 {
1692 debug!("socket {:?} was likely shutdown", &sock);
1693 if let Err(e) = self.poller.registry().deregister(sock) {
1694 debug!("failed to remove sock {:?} from poller: {}", sock, &e);
1695 }
1696
1697 let should_loop = if intf.ip().is_ipv4() {
1699 self.multicast_loop_v4
1700 } else {
1701 self.multicast_loop_v6
1702 };
1703 match new_socket_bind(intf, should_loop) {
1704 Ok(new_sock) => {
1705 trace!("reset socket for IP {}", intf.ip());
1706 self.intf_socks.insert(intf.clone(), new_sock);
1707 }
1708 Err(e) => debug!("re-bind a socket to {:?}: {}", intf, e),
1709 }
1710
1711 return false;
1712 }
1713
1714 buf.truncate(sz); match DnsIncoming::new(buf) {
1717 Ok(msg) => {
1718 if msg.is_query() {
1719 self.handle_query(msg, intf);
1720 } else if msg.is_response() {
1721 self.handle_response(msg, intf);
1722 } else {
1723 debug!("Invalid message: not query and not response");
1724 }
1725 }
1726 Err(e) => debug!("Invalid incoming DNS message: {}", e),
1727 }
1728
1729 true
1730 }
1731
1732 fn query_unresolved(&mut self, instance: &str) -> bool {
1734 if !valid_instance_name(instance) {
1735 trace!("instance name {} not valid", instance);
1736 return false;
1737 }
1738
1739 if let Some(records) = self.cache.get_srv(instance) {
1740 for record in records {
1741 if let Some(srv) = record.any().downcast_ref::<DnsSrv>() {
1742 if self.cache.get_addr(srv.host()).is_none() {
1743 self.send_query_vec(&[(srv.host(), RRType::A), (srv.host(), RRType::AAAA)]);
1744 return true;
1745 }
1746 }
1747 }
1748 } else {
1749 self.send_query(instance, RRType::ANY);
1750 return true;
1751 }
1752
1753 false
1754 }
1755
1756 fn query_cache_for_service(&mut self, ty_domain: &str, sender: &Sender<ServiceEvent>) {
1759 let mut resolved: HashSet<String> = HashSet::new();
1760 let mut unresolved: HashSet<String> = HashSet::new();
1761
1762 if let Some(records) = self.cache.get_ptr(ty_domain) {
1763 for record in records.iter() {
1764 if let Some(ptr) = record.any().downcast_ref::<DnsPointer>() {
1765 let info = match self.create_service_info_from_cache(ty_domain, ptr.alias()) {
1766 Ok(ok) => ok,
1767 Err(err) => {
1768 debug!("Error while creating service info from cache: {}", err);
1769 continue;
1770 }
1771 };
1772
1773 match sender.send(ServiceEvent::ServiceFound(
1774 ty_domain.to_string(),
1775 ptr.alias().to_string(),
1776 )) {
1777 Ok(()) => debug!("send service found {}", ptr.alias()),
1778 Err(e) => {
1779 debug!("failed to send service found: {}", e);
1780 continue;
1781 }
1782 }
1783
1784 if info.is_ready() {
1785 resolved.insert(ptr.alias().to_string());
1786 match sender.send(ServiceEvent::ServiceResolved(info)) {
1787 Ok(()) => debug!("sent service resolved: {}", ptr.alias()),
1788 Err(e) => debug!("failed to send service resolved: {}", e),
1789 }
1790 } else {
1791 unresolved.insert(ptr.alias().to_string());
1792 }
1793 }
1794 }
1795 }
1796
1797 for instance in resolved.drain() {
1798 self.pending_resolves.remove(&instance);
1799 self.resolved.insert(instance);
1800 }
1801
1802 for instance in unresolved.drain() {
1803 self.add_pending_resolve(instance);
1804 }
1805 }
1806
1807 fn query_cache_for_hostname(
1810 &mut self,
1811 hostname: &str,
1812 sender: Sender<HostnameResolutionEvent>,
1813 ) {
1814 let addresses_map = self.cache.get_addresses_for_host(hostname);
1815 for (name, addresses) in addresses_map {
1816 match sender.send(HostnameResolutionEvent::AddressesFound(name, addresses)) {
1817 Ok(()) => trace!("sent hostname addresses found"),
1818 Err(e) => debug!("failed to send hostname addresses found: {}", e),
1819 }
1820 }
1821 }
1822
1823 fn add_pending_resolve(&mut self, instance: String) {
1824 if !self.pending_resolves.contains(&instance) {
1825 let next_time = current_time_millis() + RESOLVE_WAIT_IN_MILLIS;
1826 self.add_retransmission(next_time, Command::Resolve(instance.clone(), 1));
1827 self.pending_resolves.insert(instance);
1828 }
1829 }
1830
1831 fn create_service_info_from_cache(
1832 &self,
1833 ty_domain: &str,
1834 fullname: &str,
1835 ) -> Result<ServiceInfo> {
1836 let my_name = {
1837 let name = fullname.trim_end_matches(split_sub_domain(ty_domain).0);
1838 name.strip_suffix('.').unwrap_or(name).to_string()
1839 };
1840
1841 let now = current_time_millis();
1842 let mut info = ServiceInfo::new(ty_domain, &my_name, "", (), 0, None)?;
1843
1844 if let Some(subtype) = self.cache.get_subtype(fullname) {
1846 trace!(
1847 "ty_domain: {} found subtype {} for instance: {}",
1848 ty_domain,
1849 subtype,
1850 fullname
1851 );
1852 if info.get_subtype().is_none() {
1853 info.set_subtype(subtype.clone());
1854 }
1855 }
1856
1857 if let Some(records) = self.cache.get_srv(fullname) {
1859 if let Some(answer) = records.first() {
1860 if let Some(dns_srv) = answer.any().downcast_ref::<DnsSrv>() {
1861 info.set_hostname(dns_srv.host().to_string());
1862 info.set_port(dns_srv.port());
1863 }
1864 }
1865 }
1866
1867 if let Some(records) = self.cache.get_txt(fullname) {
1869 if let Some(record) = records.first() {
1870 if let Some(dns_txt) = record.any().downcast_ref::<DnsTxt>() {
1871 info.set_properties_from_txt(dns_txt.text());
1872 }
1873 }
1874 }
1875
1876 if let Some(records) = self.cache.get_addr(info.get_hostname()) {
1878 for answer in records.iter() {
1879 if let Some(dns_a) = answer.any().downcast_ref::<DnsAddress>() {
1880 if dns_a.get_record().is_expired(now) {
1881 trace!("Addr expired: {}", dns_a.address());
1882 } else {
1883 info.insert_ipaddr(dns_a.address());
1884 }
1885 }
1886 }
1887 }
1888
1889 Ok(info)
1890 }
1891
1892 fn handle_poller_events(&mut self, events: &mio::Events) {
1893 for ev in events.iter() {
1894 trace!("event received with key {:?}", ev.token());
1895 if ev.token().0 == SIGNAL_SOCK_EVENT_KEY {
1896 self.signal_sock_drain();
1898
1899 if let Err(e) = self.poller.registry().reregister(
1900 &mut self.signal_sock,
1901 ev.token(),
1902 mio::Interest::READABLE,
1903 ) {
1904 debug!("failed to modify poller for signal socket: {}", e);
1905 }
1906 continue; }
1908
1909 let intf = match self.poll_ids.get(&ev.token().0) {
1911 Some(interface) => interface.clone(),
1912 None => {
1913 debug!("Ip for event key {} not found", ev.token().0);
1914 break;
1915 }
1916 };
1917 while self.handle_read(&intf) {}
1918
1919 if let Some(sock) = self.intf_socks.get_mut(&intf) {
1921 if let Err(e) =
1922 self.poller
1923 .registry()
1924 .reregister(sock, ev.token(), mio::Interest::READABLE)
1925 {
1926 debug!("modify poller for interface {:?}: {}", &intf, e);
1927 break;
1928 }
1929 }
1930 }
1931 }
1932
1933 fn handle_response(&mut self, mut msg: DnsIncoming, intf: &Interface) {
1936 trace!(
1937 "handle_response: {} answers {} authorities {} additionals",
1938 msg.answers().len(),
1939 &msg.authorities().len(),
1940 &msg.num_additionals()
1941 );
1942 let now = current_time_millis();
1943
1944 let mut record_predicate = |record: &DnsRecordBox| {
1946 if !record.get_record().is_expired(now) {
1947 return true;
1948 }
1949
1950 debug!("record is expired, removing it from cache.");
1951 if self.cache.remove(record) {
1952 if let Some(dns_ptr) = record.any().downcast_ref::<DnsPointer>() {
1954 call_service_listener(
1955 &self.service_queriers,
1956 dns_ptr.get_name(),
1957 ServiceEvent::ServiceRemoved(
1958 dns_ptr.get_name().to_string(),
1959 dns_ptr.alias().to_string(),
1960 ),
1961 );
1962 }
1963 }
1964 false
1965 };
1966 msg.answers_mut().retain(&mut record_predicate);
1967 msg.authorities_mut().retain(&mut record_predicate);
1968 msg.additionals_mut().retain(&mut record_predicate);
1969
1970 self.conflict_handler(&msg, intf);
1972
1973 struct InstanceChange {
1975 ty: RRType, name: String, }
1978
1979 let mut changes = Vec::new();
1987 let mut timers = Vec::new();
1988 for record in msg.all_records() {
1989 match self.cache.add_or_update(intf, record, &mut timers) {
1990 Some((dns_record, true)) => {
1991 timers.push(dns_record.get_record().get_expire_time());
1992 timers.push(dns_record.get_record().get_refresh_time());
1993
1994 let ty = dns_record.get_type();
1995 let name = dns_record.get_name();
1996 if ty == RRType::PTR {
1997 if self.service_queriers.contains_key(name) {
1998 timers.push(dns_record.get_record().get_refresh_time());
1999 }
2000
2001 if let Some(dns_ptr) = dns_record.any().downcast_ref::<DnsPointer>() {
2003 call_service_listener(
2004 &self.service_queriers,
2005 name,
2006 ServiceEvent::ServiceFound(
2007 name.to_string(),
2008 dns_ptr.alias().to_string(),
2009 ),
2010 );
2011 changes.push(InstanceChange {
2012 ty,
2013 name: dns_ptr.alias().to_string(),
2014 });
2015 }
2016 } else {
2017 changes.push(InstanceChange {
2018 ty,
2019 name: name.to_string(),
2020 });
2021 }
2022 }
2023 Some((dns_record, false)) => {
2024 timers.push(dns_record.get_record().get_expire_time());
2025 timers.push(dns_record.get_record().get_refresh_time());
2026 }
2027 _ => {}
2028 }
2029 }
2030
2031 for t in timers {
2033 self.add_timer(t);
2034 }
2035
2036 for change in changes
2038 .iter()
2039 .filter(|change| change.ty == RRType::A || change.ty == RRType::AAAA)
2040 {
2041 let addr_map = self.cache.get_addresses_for_host(&change.name);
2042 for (name, addresses) in addr_map {
2043 call_hostname_resolution_listener(
2044 &self.hostname_resolvers,
2045 &change.name,
2046 HostnameResolutionEvent::AddressesFound(name, addresses),
2047 )
2048 }
2049 }
2050
2051 let mut updated_instances = HashSet::new();
2053 for update in changes {
2054 match update.ty {
2055 RRType::PTR | RRType::SRV | RRType::TXT => {
2056 updated_instances.insert(update.name);
2057 }
2058 RRType::A | RRType::AAAA => {
2059 let instances = self.cache.get_instances_on_host(&update.name);
2060 updated_instances.extend(instances);
2061 }
2062 _ => {}
2063 }
2064 }
2065
2066 self.resolve_updated_instances(&updated_instances);
2067 }
2068
2069 fn conflict_handler(&mut self, msg: &DnsIncoming, intf: &Interface) {
2070 let Some(dns_registry) = self.dns_registry_map.get_mut(intf) else {
2071 return;
2072 };
2073
2074 for answer in msg.answers().iter() {
2075 let mut new_records = Vec::new();
2076
2077 let name = answer.get_name();
2078 let Some(probe) = dns_registry.probing.get_mut(name) else {
2079 continue;
2080 };
2081
2082 if answer.get_type() == RRType::A || answer.get_type() == RRType::AAAA {
2084 if let Some(answer_addr) = answer.any().downcast_ref::<DnsAddress>() {
2085 if !valid_ip_on_intf(&answer_addr.address(), intf) {
2086 debug!(
2087 "conflict handler: answer addr {:?} not in the subnet of {:?}",
2088 answer_addr, intf
2089 );
2090 continue;
2091 }
2092 }
2093
2094 let any_match = probe.records.iter().any(|r| {
2097 r.get_type() == answer.get_type()
2098 && r.get_class() == answer.get_class()
2099 && r.rrdata_match(answer.as_ref())
2100 });
2101 if any_match {
2102 continue; }
2104 }
2105
2106 probe.records.retain(|record| {
2107 if record.get_type() == answer.get_type()
2108 && record.get_class() == answer.get_class()
2109 && !record.rrdata_match(answer.as_ref())
2110 {
2111 debug!(
2112 "found conflict name: '{name}' record: {}: {} PEER: {}",
2113 record.get_type(),
2114 record.rdata_print(),
2115 answer.rdata_print()
2116 );
2117
2118 let mut new_record = record.clone();
2121 let new_name = match record.get_type() {
2122 RRType::A => hostname_change(name),
2123 RRType::AAAA => hostname_change(name),
2124 _ => name_change(name),
2125 };
2126 new_record.get_record_mut().set_new_name(new_name);
2127 new_records.push(new_record);
2128 return false; }
2130
2131 true
2132 });
2133
2134 let create_time = current_time_millis() + fastrand::u64(0..250);
2141
2142 let waiting_services = probe.waiting_services.clone();
2143
2144 for record in new_records {
2145 if dns_registry.update_hostname(name, record.get_name(), create_time) {
2146 self.timers.push(Reverse(create_time));
2147 }
2148
2149 dns_registry.name_changes.insert(
2151 record.get_record().get_original_name().to_string(),
2152 record.get_name().to_string(),
2153 );
2154
2155 let new_probe = match dns_registry.probing.get_mut(record.get_name()) {
2156 Some(p) => p,
2157 None => {
2158 let new_probe = dns_registry
2159 .probing
2160 .entry(record.get_name().to_string())
2161 .or_insert_with(|| {
2162 debug!("conflict handler: new probe of {}", record.get_name());
2163 Probe::new(create_time)
2164 });
2165 self.timers.push(Reverse(new_probe.next_send));
2166 new_probe
2167 }
2168 };
2169
2170 debug!(
2171 "insert record with new name '{}' {} into probe",
2172 record.get_name(),
2173 record.get_type()
2174 );
2175 new_probe.insert_record(record);
2176
2177 new_probe.waiting_services.extend(waiting_services.clone());
2178 }
2179 }
2180 }
2181
2182 fn resolve_updated_instances(&mut self, updated_instances: &HashSet<String>) {
2189 let mut resolved: HashSet<String> = HashSet::new();
2190 let mut unresolved: HashSet<String> = HashSet::new();
2191 let mut removed_instances = HashMap::new();
2192
2193 for (ty_domain, records) in self.cache.all_ptr().iter() {
2194 if !self.service_queriers.contains_key(ty_domain) {
2195 continue;
2197 }
2198
2199 for record in records.iter() {
2200 if let Some(dns_ptr) = record.any().downcast_ref::<DnsPointer>() {
2201 if updated_instances.contains(dns_ptr.alias()) {
2202 if let Ok(info) =
2203 self.create_service_info_from_cache(ty_domain, dns_ptr.alias())
2204 {
2205 if info.is_ready() {
2206 debug!("call queriers to resolve {}", dns_ptr.alias());
2207 resolved.insert(dns_ptr.alias().to_string());
2208 call_service_listener(
2209 &self.service_queriers,
2210 ty_domain,
2211 ServiceEvent::ServiceResolved(info),
2212 );
2213 } else {
2214 if self.resolved.remove(dns_ptr.alias()) {
2215 removed_instances
2216 .entry(ty_domain.to_string())
2217 .or_insert_with(HashSet::new)
2218 .insert(dns_ptr.alias().to_string());
2219 }
2220 unresolved.insert(dns_ptr.alias().to_string());
2221 }
2222 }
2223 }
2224 }
2225 }
2226 }
2227
2228 for instance in resolved.drain() {
2229 self.pending_resolves.remove(&instance);
2230 self.resolved.insert(instance);
2231 }
2232
2233 for instance in unresolved.drain() {
2234 self.add_pending_resolve(instance);
2235 }
2236
2237 self.notify_service_removal(removed_instances);
2238 }
2239
2240 fn handle_query(&mut self, msg: DnsIncoming, intf: &Interface) {
2242 let sock = match self.intf_socks.get(intf) {
2243 Some(sock) => sock,
2244 None => return,
2245 };
2246 let mut out = DnsOutgoing::new(FLAGS_QR_RESPONSE | FLAGS_AA);
2247
2248 const META_QUERY: &str = "_services._dns-sd._udp.local.";
2251
2252 let Some(dns_registry) = self.dns_registry_map.get_mut(intf) else {
2253 debug!("missing dns registry for intf {}", intf.ip());
2254 return;
2255 };
2256
2257 for question in msg.questions().iter() {
2258 trace!("query question: {:?}", &question);
2259
2260 let qtype = question.entry_type();
2261
2262 if qtype == RRType::PTR {
2263 for service in self.my_services.values() {
2264 if service.get_status(intf) != ServiceStatus::Announced {
2265 continue;
2266 }
2267
2268 if question.entry_name() == service.get_type()
2269 || service
2270 .get_subtype()
2271 .as_ref()
2272 .is_some_and(|v| v == question.entry_name())
2273 {
2274 add_answer_with_additionals(&mut out, &msg, service, intf, dns_registry);
2275 } else if question.entry_name() == META_QUERY {
2276 let ptr_added = out.add_answer(
2277 &msg,
2278 DnsPointer::new(
2279 question.entry_name(),
2280 RRType::PTR,
2281 CLASS_IN,
2282 service.get_other_ttl(),
2283 service.get_type().to_string(),
2284 ),
2285 );
2286 if !ptr_added {
2287 trace!("answer was not added for meta-query {:?}", &question);
2288 }
2289 }
2290 }
2291 } else {
2292 if qtype == RRType::ANY && msg.num_authorities() > 0 {
2294 let probe_name = question.entry_name();
2295
2296 if let Some(probe) = dns_registry.probing.get_mut(probe_name) {
2297 let now = current_time_millis();
2298
2299 if probe.start_time < now {
2303 let incoming_records: Vec<_> = msg
2304 .authorities()
2305 .iter()
2306 .filter(|r| r.get_name() == probe_name)
2307 .collect();
2308
2309 match probe.tiebreaking(&incoming_records) {
2319 cmp::Ordering::Less => {
2320 debug!(
2321 "tiebreaking '{}': LOST, will wait for one second",
2322 probe_name
2323 );
2324 probe.start_time = now + 1000; probe.next_send = now + 1000;
2326 }
2327 ordering => {
2328 debug!("tiebreaking '{}': {:?}", probe_name, ordering);
2329 }
2330 }
2331 }
2332 }
2333 }
2334
2335 if qtype == RRType::A || qtype == RRType::AAAA || qtype == RRType::ANY {
2336 for service in self.my_services.values() {
2337 if service.get_status(intf) != ServiceStatus::Announced {
2338 continue;
2339 }
2340
2341 let service_hostname =
2342 match dns_registry.name_changes.get(service.get_hostname()) {
2343 Some(new_name) => new_name,
2344 None => service.get_hostname(),
2345 };
2346
2347 if service_hostname.to_lowercase() == question.entry_name().to_lowercase() {
2348 let intf_addrs = service.get_addrs_on_intf(intf);
2349 if intf_addrs.is_empty()
2350 && (qtype == RRType::A || qtype == RRType::AAAA)
2351 {
2352 let t = match qtype {
2353 RRType::A => "TYPE_A",
2354 RRType::AAAA => "TYPE_AAAA",
2355 _ => "invalid_type",
2356 };
2357 trace!(
2358 "Cannot find valid addrs for {} response on intf {:?}",
2359 t,
2360 &intf
2361 );
2362 return;
2363 }
2364 for address in intf_addrs {
2365 out.add_answer(
2366 &msg,
2367 DnsAddress::new(
2368 service_hostname,
2369 ip_address_rr_type(&address),
2370 CLASS_IN | CLASS_CACHE_FLUSH,
2371 service.get_host_ttl(),
2372 address,
2373 ),
2374 );
2375 }
2376 }
2377 }
2378 }
2379
2380 let query_name = question.entry_name().to_lowercase();
2381 let service_opt = self
2382 .my_services
2383 .iter()
2384 .find(|(k, _v)| {
2385 let service_name = match dns_registry.name_changes.get(k.as_str()) {
2386 Some(new_name) => new_name,
2387 None => k,
2388 };
2389 service_name == &query_name
2390 })
2391 .map(|(_, v)| v);
2392
2393 let Some(service) = service_opt else {
2394 continue;
2395 };
2396
2397 if service.get_status(intf) != ServiceStatus::Announced {
2398 continue;
2399 }
2400
2401 if qtype == RRType::SRV || qtype == RRType::ANY {
2402 out.add_answer(
2403 &msg,
2404 DnsSrv::new(
2405 question.entry_name(),
2406 CLASS_IN | CLASS_CACHE_FLUSH,
2407 service.get_host_ttl(),
2408 service.get_priority(),
2409 service.get_weight(),
2410 service.get_port(),
2411 service.get_hostname().to_string(),
2412 ),
2413 );
2414 }
2415
2416 if qtype == RRType::TXT || qtype == RRType::ANY {
2417 out.add_answer(
2418 &msg,
2419 DnsTxt::new(
2420 question.entry_name(),
2421 CLASS_IN | CLASS_CACHE_FLUSH,
2422 service.get_host_ttl(),
2423 service.generate_txt(),
2424 ),
2425 );
2426 }
2427
2428 if qtype == RRType::SRV {
2429 let intf_addrs = service.get_addrs_on_intf(intf);
2430 if intf_addrs.is_empty() {
2431 debug!(
2432 "Cannot find valid addrs for TYPE_SRV response on intf {:?}",
2433 &intf
2434 );
2435 return;
2436 }
2437 for address in intf_addrs {
2438 out.add_additional_answer(DnsAddress::new(
2439 service.get_hostname(),
2440 ip_address_rr_type(&address),
2441 CLASS_IN | CLASS_CACHE_FLUSH,
2442 service.get_host_ttl(),
2443 address,
2444 ));
2445 }
2446 }
2447 }
2448 }
2449
2450 if !out.answers_count() > 0 {
2451 out.set_id(msg.id());
2452 send_dns_outgoing(&out, intf, sock);
2453
2454 self.increase_counter(Counter::Respond, 1);
2455 self.notify_monitors(DaemonEvent::Respond(intf.ip()));
2456 }
2457
2458 self.increase_counter(Counter::KnownAnswerSuppression, out.known_answer_count());
2459 }
2460
2461 fn increase_counter(&mut self, counter: Counter, count: i64) {
2463 let key = counter.to_string();
2464 match self.counters.get_mut(&key) {
2465 Some(v) => *v += count,
2466 None => {
2467 self.counters.insert(key, count);
2468 }
2469 }
2470 }
2471
2472 fn signal_sock_drain(&self) {
2473 let mut signal_buf = [0; 1024];
2474
2475 while let Ok(sz) = self.signal_sock.recv(&mut signal_buf) {
2477 trace!(
2478 "signal socket recvd: {}",
2479 String::from_utf8_lossy(&signal_buf[0..sz])
2480 );
2481 }
2482 }
2483
2484 fn add_retransmission(&mut self, next_time: u64, command: Command) {
2485 self.retransmissions.push(ReRun { next_time, command });
2486 self.add_timer(next_time);
2487 }
2488
2489 fn notify_service_removal(&self, expired: HashMap<String, HashSet<String>>) {
2491 for (ty_domain, sender) in self.service_queriers.iter() {
2492 if let Some(instances) = expired.get(ty_domain) {
2493 for instance_name in instances {
2494 let event = ServiceEvent::ServiceRemoved(
2495 ty_domain.to_string(),
2496 instance_name.to_string(),
2497 );
2498 match sender.send(event) {
2499 Ok(()) => debug!("notify_service_removal: sent ServiceRemoved to listener of {ty_domain}: {instance_name}"),
2500 Err(e) => debug!("Failed to send event: {}", e),
2501 }
2502 }
2503 }
2504 }
2505 }
2506
2507 fn exec_command(&mut self, command: Command, repeating: bool) {
2511 match command {
2512 Command::Browse(ty, next_delay, listener) => {
2513 self.exec_command_browse(repeating, ty, next_delay, listener);
2514 }
2515
2516 Command::ResolveHostname(hostname, next_delay, listener, timeout) => {
2517 self.exec_command_resolve_hostname(
2518 repeating, hostname, next_delay, listener, timeout,
2519 );
2520 }
2521
2522 Command::Register(service_info) => {
2523 self.register_service(service_info);
2524 self.increase_counter(Counter::Register, 1);
2525 }
2526
2527 Command::RegisterResend(fullname, intf) => {
2528 trace!("register-resend service: {fullname} on {:?}", &intf.addr);
2529 self.exec_command_register_resend(fullname, intf);
2530 }
2531
2532 Command::Unregister(fullname, resp_s) => {
2533 trace!("unregister service {} repeat {}", &fullname, &repeating);
2534 self.exec_command_unregister(repeating, fullname, resp_s);
2535 }
2536
2537 Command::UnregisterResend(packet, ip) => {
2538 self.exec_command_unregister_resend(packet, ip);
2539 }
2540
2541 Command::StopBrowse(ty_domain) => self.exec_command_stop_browse(ty_domain),
2542
2543 Command::StopResolveHostname(hostname) => {
2544 self.exec_command_stop_resolve_hostname(hostname.to_lowercase())
2545 }
2546
2547 Command::Resolve(instance, try_count) => self.exec_command_resolve(instance, try_count),
2548
2549 Command::GetMetrics(resp_s) => match resp_s.send(self.counters.clone()) {
2550 Ok(()) => trace!("Sent metrics to the client"),
2551 Err(e) => debug!("Failed to send metrics: {}", e),
2552 },
2553
2554 Command::GetStatus(resp_s) => match resp_s.send(self.status.clone()) {
2555 Ok(()) => trace!("Sent status to the client"),
2556 Err(e) => debug!("Failed to send status: {}", e),
2557 },
2558
2559 Command::Monitor(resp_s) => {
2560 self.monitors.push(resp_s);
2561 }
2562
2563 Command::SetOption(daemon_opt) => {
2564 self.process_set_option(daemon_opt);
2565 }
2566
2567 Command::GetOption(resp_s) => {
2568 let val = DaemonOptionVal {
2569 _service_name_len_max: self.service_name_len_max,
2570 ip_check_interval: self.ip_check_interval,
2571 };
2572 if let Err(e) = resp_s.send(val) {
2573 debug!("Failed to send options: {}", e);
2574 }
2575 }
2576
2577 Command::Verify(instance_fullname, timeout) => {
2578 self.exec_command_verify(instance_fullname, timeout, repeating);
2579 }
2580
2581 _ => {
2582 debug!("unexpected command: {:?}", &command);
2583 }
2584 }
2585 }
2586
2587 fn exec_command_browse(
2588 &mut self,
2589 repeating: bool,
2590 ty: String,
2591 next_delay: u32,
2592 listener: Sender<ServiceEvent>,
2593 ) {
2594 let pretty_addrs: Vec<String> = self
2595 .intf_socks
2596 .keys()
2597 .map(|itf| format!("{} ({})", itf.ip(), itf.name))
2598 .collect();
2599
2600 if let Err(e) = listener.send(ServiceEvent::SearchStarted(format!(
2601 "{ty} on {} interfaces [{}]",
2602 pretty_addrs.len(),
2603 pretty_addrs.join(", ")
2604 ))) {
2605 debug!(
2606 "Failed to send SearchStarted({})(repeating:{}): {}",
2607 &ty, repeating, e
2608 );
2609 return;
2610 }
2611 if !repeating {
2612 self.service_queriers.insert(ty.clone(), listener.clone());
2616
2617 self.query_cache_for_service(&ty, &listener);
2619 }
2620
2621 self.send_query(&ty, RRType::PTR);
2622 self.increase_counter(Counter::Browse, 1);
2623
2624 let next_time = current_time_millis() + (next_delay * 1000) as u64;
2625 let max_delay = 60 * 60;
2626 let delay = cmp::min(next_delay * 2, max_delay);
2627 self.add_retransmission(next_time, Command::Browse(ty, delay, listener));
2628 }
2629
2630 fn exec_command_resolve_hostname(
2631 &mut self,
2632 repeating: bool,
2633 hostname: String,
2634 next_delay: u32,
2635 listener: Sender<HostnameResolutionEvent>,
2636 timeout: Option<u64>,
2637 ) {
2638 let addr_list: Vec<_> = self.intf_socks.keys().collect();
2639 if let Err(e) = listener.send(HostnameResolutionEvent::SearchStarted(format!(
2640 "{} on addrs {:?}",
2641 &hostname, &addr_list
2642 ))) {
2643 debug!(
2644 "Failed to send ResolveStarted({})(repeating:{}): {}",
2645 &hostname, repeating, e
2646 );
2647 return;
2648 }
2649 if !repeating {
2650 self.add_hostname_resolver(hostname.to_owned(), listener.clone(), timeout);
2651 self.query_cache_for_hostname(&hostname, listener.clone());
2653 }
2654
2655 self.send_query_vec(&[(&hostname, RRType::A), (&hostname, RRType::AAAA)]);
2656 self.increase_counter(Counter::ResolveHostname, 1);
2657
2658 let now = current_time_millis();
2659 let next_time = now + u64::from(next_delay) * 1000;
2660 let max_delay = 60 * 60;
2661 let delay = cmp::min(next_delay * 2, max_delay);
2662
2663 if self
2665 .hostname_resolvers
2666 .get(&hostname)
2667 .and_then(|(_sender, timeout)| *timeout)
2668 .map(|timeout| next_time < timeout)
2669 .unwrap_or(true)
2670 {
2671 self.add_retransmission(
2672 next_time,
2673 Command::ResolveHostname(hostname, delay, listener, None),
2674 );
2675 }
2676 }
2677
2678 fn exec_command_resolve(&mut self, instance: String, try_count: u16) {
2679 let pending_query = self.query_unresolved(&instance);
2680 let max_try = 3;
2681 if pending_query && try_count < max_try {
2682 let next_time = current_time_millis() + RESOLVE_WAIT_IN_MILLIS;
2685 self.add_retransmission(next_time, Command::Resolve(instance, try_count + 1));
2686 }
2687 }
2688
2689 fn exec_command_unregister(
2690 &mut self,
2691 repeating: bool,
2692 fullname: String,
2693 resp_s: Sender<UnregisterStatus>,
2694 ) {
2695 let response = match self.my_services.remove_entry(&fullname) {
2696 None => {
2697 debug!("unregister: cannot find such service {}", &fullname);
2698 UnregisterStatus::NotFound
2699 }
2700 Some((_k, info)) => {
2701 let mut timers = Vec::new();
2702 let mut multicast_sent_trackers = HashSet::new();
2704
2705 for (intf, sock) in self.intf_socks.iter() {
2706 if let Some(tracker) = multicast_send_tracker(intf) {
2707 if multicast_sent_trackers.contains(&tracker) {
2708 continue; }
2710 multicast_sent_trackers.insert(tracker);
2711 }
2712 let packet = self.unregister_service(&info, intf, sock);
2713 if !repeating && !packet.is_empty() {
2715 let next_time = current_time_millis() + 120;
2716 self.retransmissions.push(ReRun {
2717 next_time,
2718 command: Command::UnregisterResend(packet, intf.clone()),
2719 });
2720 timers.push(next_time);
2721 }
2722 }
2723
2724 for t in timers {
2725 self.add_timer(t);
2726 }
2727
2728 self.increase_counter(Counter::Unregister, 1);
2729 UnregisterStatus::OK
2730 }
2731 };
2732 if let Err(e) = resp_s.send(response) {
2733 debug!("unregister: failed to send response: {}", e);
2734 }
2735 }
2736
2737 fn exec_command_unregister_resend(&mut self, packet: Vec<u8>, intf: Interface) {
2738 if let Some(sock) = self.intf_socks.get(&intf) {
2739 debug!("UnregisterResend from {}", &intf.ip());
2740 multicast_on_intf(&packet[..], &intf, sock);
2741 self.increase_counter(Counter::UnregisterResend, 1);
2742 }
2743 }
2744
2745 fn exec_command_stop_browse(&mut self, ty_domain: String) {
2746 match self.service_queriers.remove_entry(&ty_domain) {
2747 None => debug!("StopBrowse: cannot find querier for {}", &ty_domain),
2748 Some((ty, sender)) => {
2749 trace!("StopBrowse: removed queryer for {}", &ty);
2751 let mut i = 0;
2752 while i < self.retransmissions.len() {
2753 if let Command::Browse(t, _, _) = &self.retransmissions[i].command {
2754 if t == &ty {
2755 self.retransmissions.remove(i);
2756 trace!("StopBrowse: removed retransmission for {}", &ty);
2757 continue;
2758 }
2759 }
2760 i += 1;
2761 }
2762
2763 match sender.send(ServiceEvent::SearchStopped(ty_domain)) {
2765 Ok(()) => trace!("Sent SearchStopped to the listener"),
2766 Err(e) => debug!("Failed to send SearchStopped: {}", e),
2767 }
2768 }
2769 }
2770 }
2771
2772 fn exec_command_stop_resolve_hostname(&mut self, hostname: String) {
2773 if let Some((host, (sender, _timeout))) = self.hostname_resolvers.remove_entry(&hostname) {
2774 trace!("StopResolve: removed queryer for {}", &host);
2776 let mut i = 0;
2777 while i < self.retransmissions.len() {
2778 if let Command::Resolve(t, _) = &self.retransmissions[i].command {
2779 if t == &host {
2780 self.retransmissions.remove(i);
2781 trace!("StopResolve: removed retransmission for {}", &host);
2782 continue;
2783 }
2784 }
2785 i += 1;
2786 }
2787
2788 match sender.send(HostnameResolutionEvent::SearchStopped(hostname)) {
2790 Ok(()) => trace!("Sent SearchStopped to the listener"),
2791 Err(e) => debug!("Failed to send SearchStopped: {}", e),
2792 }
2793 }
2794 }
2795
2796 fn exec_command_register_resend(&mut self, fullname: String, intf: Interface) {
2797 let Some(info) = self.my_services.get_mut(&fullname) else {
2798 trace!("announce: cannot find such service {}", &fullname);
2799 return;
2800 };
2801
2802 let Some(dns_registry) = self.dns_registry_map.get_mut(&intf) else {
2803 return;
2804 };
2805
2806 let Some(sock) = self.intf_socks.get(&intf) else {
2807 return;
2808 };
2809
2810 if announce_service_on_intf(dns_registry, info, &intf, sock) {
2811 let mut hostname = info.get_hostname();
2812 if let Some(new_name) = dns_registry.name_changes.get(hostname) {
2813 hostname = new_name;
2814 }
2815 let service_name = match dns_registry.name_changes.get(&fullname) {
2816 Some(new_name) => new_name.to_string(),
2817 None => fullname,
2818 };
2819
2820 debug!("resend: announce service {} on {}", service_name, intf.ip());
2821
2822 notify_monitors(
2823 &mut self.monitors,
2824 DaemonEvent::Announce(service_name, format!("{}:{}", hostname, &intf.ip())),
2825 );
2826 info.set_status(&intf, ServiceStatus::Announced);
2827 } else {
2828 debug!("register-resend should not fail");
2829 }
2830
2831 self.increase_counter(Counter::RegisterResend, 1);
2832 }
2833
2834 fn exec_command_verify(&mut self, instance: String, timeout: Duration, repeating: bool) {
2835 let now = current_time_millis();
2845 let expire_at = if repeating {
2846 None
2847 } else {
2848 Some(now + timeout.as_millis() as u64)
2849 };
2850
2851 let record_vec = self.cache.service_verify_queries(&instance, expire_at);
2853
2854 if !record_vec.is_empty() {
2855 let query_vec: Vec<(&str, RRType)> = record_vec
2856 .iter()
2857 .map(|(record, rr_type)| (record.as_str(), *rr_type))
2858 .collect();
2859 self.send_query_vec(&query_vec);
2860
2861 if let Some(new_expire) = expire_at {
2862 self.add_timer(new_expire); self.add_retransmission(now + 1000, Command::Verify(instance, timeout));
2866 }
2867 }
2868 }
2869
2870 fn refresh_active_services(&mut self) {
2872 let mut query_ptr_count = 0;
2873 let mut query_srv_count = 0;
2874 let mut new_timers = HashSet::new();
2875 let mut query_addr_count = 0;
2876
2877 for (ty_domain, _sender) in self.service_queriers.iter() {
2878 let refreshed_timers = self.cache.refresh_due_ptr(ty_domain);
2879 if !refreshed_timers.is_empty() {
2880 trace!("sending refresh query for PTR: {}", ty_domain);
2881 self.send_query(ty_domain, RRType::PTR);
2882 query_ptr_count += 1;
2883 new_timers.extend(refreshed_timers);
2884 }
2885
2886 let (instances, timers) = self.cache.refresh_due_srv(ty_domain);
2887 for instance in instances.iter() {
2888 trace!("sending refresh query for SRV: {}", instance);
2889 self.send_query(instance, RRType::SRV);
2890 query_srv_count += 1;
2891 }
2892 new_timers.extend(timers);
2893 let (hostnames, timers) = self.cache.refresh_due_hosts(ty_domain);
2894 for hostname in hostnames.iter() {
2895 trace!("sending refresh queries for A and AAAA: {}", hostname);
2896 self.send_query_vec(&[(hostname, RRType::A), (hostname, RRType::AAAA)]);
2897 query_addr_count += 2;
2898 }
2899 new_timers.extend(timers);
2900 }
2901
2902 for timer in new_timers {
2903 self.add_timer(timer);
2904 }
2905
2906 self.increase_counter(Counter::CacheRefreshPTR, query_ptr_count);
2907 self.increase_counter(Counter::CacheRefreshSRV, query_srv_count);
2908 self.increase_counter(Counter::CacheRefreshAddr, query_addr_count);
2909 }
2910}
2911
2912#[derive(Debug)]
2915pub enum ServiceEvent {
2916 SearchStarted(String),
2918
2919 ServiceFound(String, String),
2921
2922 ServiceResolved(ServiceInfo),
2924
2925 ServiceRemoved(String, String),
2927
2928 SearchStopped(String),
2930}
2931
2932#[derive(Debug)]
2935#[non_exhaustive]
2936pub enum HostnameResolutionEvent {
2937 SearchStarted(String),
2939 AddressesFound(String, HashSet<IpAddr>),
2941 AddressesRemoved(String, HashSet<IpAddr>),
2943 SearchTimeout(String),
2945 SearchStopped(String),
2947}
2948
2949#[derive(Clone, Debug)]
2952#[non_exhaustive]
2953pub enum DaemonEvent {
2954 Announce(String, String),
2956
2957 Error(Error),
2959
2960 IpAdd(IpAddr),
2962
2963 IpDel(IpAddr),
2965
2966 NameChange(DnsNameChange),
2969
2970 Respond(IpAddr),
2972}
2973
2974#[derive(Clone, Debug)]
2977pub struct DnsNameChange {
2978 pub original: String,
2980
2981 pub new_name: String,
2991
2992 pub rr_type: RRType,
2994
2995 pub intf_name: String,
2997}
2998
2999#[derive(Debug)]
3001enum Command {
3002 Browse(String, u32, Sender<ServiceEvent>),
3004
3005 ResolveHostname(String, u32, Sender<HostnameResolutionEvent>, Option<u64>), Register(ServiceInfo),
3010
3011 Unregister(String, Sender<UnregisterStatus>), RegisterResend(String, Interface), UnregisterResend(Vec<u8>, Interface), StopBrowse(String), StopResolveHostname(String), Resolve(String, u16), GetMetrics(Sender<Metrics>),
3032
3033 GetStatus(Sender<DaemonStatus>),
3035
3036 Monitor(Sender<DaemonEvent>),
3038
3039 SetOption(DaemonOption),
3040
3041 GetOption(Sender<DaemonOptionVal>),
3042
3043 Verify(String, Duration),
3048
3049 Exit(Sender<DaemonStatus>),
3050}
3051
3052impl fmt::Display for Command {
3053 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
3054 match self {
3055 Self::Browse(_, _, _) => write!(f, "Command Browse"),
3056 Self::ResolveHostname(_, _, _, _) => write!(f, "Command ResolveHostname"),
3057 Self::Exit(_) => write!(f, "Command Exit"),
3058 Self::GetStatus(_) => write!(f, "Command GetStatus"),
3059 Self::GetMetrics(_) => write!(f, "Command GetMetrics"),
3060 Self::Monitor(_) => write!(f, "Command Monitor"),
3061 Self::Register(_) => write!(f, "Command Register"),
3062 Self::RegisterResend(_, _) => write!(f, "Command RegisterResend"),
3063 Self::SetOption(_) => write!(f, "Command SetOption"),
3064 Self::GetOption(_) => write!(f, "Command GetOption"),
3065 Self::StopBrowse(_) => write!(f, "Command StopBrowse"),
3066 Self::StopResolveHostname(_) => write!(f, "Command StopResolveHostname"),
3067 Self::Unregister(_, _) => write!(f, "Command Unregister"),
3068 Self::UnregisterResend(_, _) => write!(f, "Command UnregisterResend"),
3069 Self::Resolve(_, _) => write!(f, "Command Resolve"),
3070 Self::Verify(_, _) => write!(f, "Command VerifyResource"),
3071 }
3072 }
3073}
3074
3075struct DaemonOptionVal {
3076 _service_name_len_max: u8,
3077 ip_check_interval: u64,
3078}
3079
3080#[derive(Debug)]
3081enum DaemonOption {
3082 ServiceNameLenMax(u8),
3083 IpCheckInterval(u64),
3084 EnableInterface(Vec<IfKind>),
3085 DisableInterface(Vec<IfKind>),
3086 MulticastLoopV4(bool),
3087 MulticastLoopV6(bool),
3088}
3089
3090const DOMAIN_LEN: usize = "._tcp.local.".len();
3092
3093fn check_service_name_length(ty_domain: &str, limit: u8) -> Result<()> {
3095 if ty_domain.len() <= DOMAIN_LEN + 1 {
3096 return Err(e_fmt!("Service type name cannot be empty: {}", ty_domain));
3098 }
3099
3100 let service_name_len = ty_domain.len() - DOMAIN_LEN - 1; if service_name_len > limit as usize {
3102 return Err(e_fmt!("Service name length must be <= {} bytes", limit));
3103 }
3104 Ok(())
3105}
3106
3107fn check_domain_suffix(name: &str) -> Result<()> {
3109 if !(name.ends_with("._tcp.local.") || name.ends_with("._udp.local.")) {
3110 return Err(e_fmt!(
3111 "mDNS service {} must end with '._tcp.local.' or '._udp.local.'",
3112 name
3113 ));
3114 }
3115
3116 Ok(())
3117}
3118
3119fn check_service_name(fullname: &str) -> Result<()> {
3127 check_domain_suffix(fullname)?;
3128
3129 let remaining: Vec<&str> = fullname[..fullname.len() - DOMAIN_LEN].split('.').collect();
3130 let name = remaining.last().ok_or_else(|| e_fmt!("No service name"))?;
3131
3132 if &name[0..1] != "_" {
3133 return Err(e_fmt!("Service name must start with '_'"));
3134 }
3135
3136 let name = &name[1..];
3137
3138 if name.contains("--") {
3139 return Err(e_fmt!("Service name must not contain '--'"));
3140 }
3141
3142 if name.starts_with('-') || name.ends_with('-') {
3143 return Err(e_fmt!("Service name (%s) may not start or end with '-'"));
3144 }
3145
3146 let ascii_count = name.chars().filter(|c| c.is_ascii_alphabetic()).count();
3147 if ascii_count < 1 {
3148 return Err(e_fmt!(
3149 "Service name must contain at least one letter (eg: 'A-Za-z')"
3150 ));
3151 }
3152
3153 Ok(())
3154}
3155
3156fn check_hostname(hostname: &str) -> Result<()> {
3158 if !hostname.ends_with(".local.") {
3159 return Err(e_fmt!("Hostname must end with '.local.': {hostname}"));
3160 }
3161
3162 if hostname == ".local." {
3163 return Err(e_fmt!(
3164 "The part of the hostname before '.local.' cannot be empty"
3165 ));
3166 }
3167
3168 if hostname.len() > 255 {
3169 return Err(e_fmt!("Hostname length must be <= 255 bytes"));
3170 }
3171
3172 Ok(())
3173}
3174
3175fn call_service_listener(
3176 listeners_map: &HashMap<String, Sender<ServiceEvent>>,
3177 ty_domain: &str,
3178 event: ServiceEvent,
3179) {
3180 if let Some(listener) = listeners_map.get(ty_domain) {
3181 match listener.send(event) {
3182 Ok(()) => trace!("Sent event to listener successfully"),
3183 Err(e) => debug!("Failed to send event: {}", e),
3184 }
3185 }
3186}
3187
3188fn call_hostname_resolution_listener(
3189 listeners_map: &HashMap<String, (Sender<HostnameResolutionEvent>, Option<u64>)>,
3190 hostname: &str,
3191 event: HostnameResolutionEvent,
3192) {
3193 let hostname_lower = hostname.to_lowercase();
3194 if let Some(listener) = listeners_map.get(&hostname_lower).map(|(l, _)| l) {
3195 match listener.send(event) {
3196 Ok(()) => trace!("Sent event to listener successfully"),
3197 Err(e) => debug!("Failed to send event: {}", e),
3198 }
3199 }
3200}
3201
3202fn my_ip_interfaces(with_loopback: bool) -> Vec<Interface> {
3205 if_addrs::get_if_addrs()
3206 .unwrap_or_default()
3207 .into_iter()
3208 .filter(|i| !i.is_loopback() || with_loopback)
3209 .collect()
3210}
3211
3212fn send_dns_outgoing(out: &DnsOutgoing, intf: &Interface, sock: &MioUdpSocket) -> Vec<Vec<u8>> {
3214 let qtype = if out.is_query() { "query" } else { "response" };
3215 trace!(
3216 "send outgoing {}: {} questions {} answers {} authorities {} additional",
3217 qtype,
3218 out.questions().len(),
3219 out.answers_count(),
3220 out.authorities().len(),
3221 out.additionals().len()
3222 );
3223 let packet_list = out.to_data_on_wire();
3224 for packet in packet_list.iter() {
3225 multicast_on_intf(packet, intf, sock);
3226 }
3227 packet_list
3228}
3229
3230fn multicast_on_intf(packet: &[u8], intf: &Interface, socket: &MioUdpSocket) {
3232 if packet.len() > MAX_MSG_ABSOLUTE {
3233 debug!("Drop over-sized packet ({})", packet.len());
3234 return;
3235 }
3236
3237 let addr: SocketAddr = match intf.addr {
3238 if_addrs::IfAddr::V4(_) => SocketAddrV4::new(GROUP_ADDR_V4, MDNS_PORT).into(),
3239 if_addrs::IfAddr::V6(_) => {
3240 let mut sock = SocketAddrV6::new(GROUP_ADDR_V6, MDNS_PORT, 0, 0);
3241 sock.set_scope_id(intf.index.unwrap_or(0)); sock.into()
3243 }
3244 };
3245
3246 send_packet(packet, addr, intf, socket);
3247}
3248
3249fn send_packet(packet: &[u8], addr: SocketAddr, intf: &Interface, sock: &MioUdpSocket) {
3251 match sock.send_to(packet, addr) {
3252 Ok(sz) => trace!("sent out {} bytes on interface {:?}", sz, intf),
3253 Err(e) => debug!("Failed to send to {} via {:?}: {}", addr, &intf, e),
3254 }
3255}
3256
3257fn valid_instance_name(name: &str) -> bool {
3261 name.split('.').count() >= 5
3262}
3263
3264fn notify_monitors(monitors: &mut Vec<Sender<DaemonEvent>>, event: DaemonEvent) {
3265 monitors.retain(|sender| {
3266 if let Err(e) = sender.try_send(event.clone()) {
3267 debug!("notify_monitors: try_send: {}", &e);
3268 if matches!(e, TrySendError::Disconnected(_)) {
3269 return false; }
3271 }
3272 true
3273 });
3274}
3275
3276fn prepare_announce(
3279 info: &ServiceInfo,
3280 intf: &Interface,
3281 dns_registry: &mut DnsRegistry,
3282) -> Option<DnsOutgoing> {
3283 let intf_addrs = info.get_addrs_on_intf(intf);
3284 if intf_addrs.is_empty() {
3285 trace!("No valid addrs to add on intf {:?}", &intf);
3286 return None;
3287 }
3288
3289 let service_fullname = match dns_registry.name_changes.get(info.get_fullname()) {
3291 Some(new_name) => new_name,
3292 None => info.get_fullname(),
3293 };
3294
3295 debug!(
3296 "prepare to announce service {service_fullname} on {}: {}",
3297 &intf.name,
3298 &intf.ip()
3299 );
3300
3301 let mut probing_count = 0;
3302 let mut out = DnsOutgoing::new(FLAGS_QR_RESPONSE | FLAGS_AA);
3303 let create_time = current_time_millis() + fastrand::u64(0..250);
3304
3305 out.add_answer_at_time(
3306 DnsPointer::new(
3307 info.get_type(),
3308 RRType::PTR,
3309 CLASS_IN,
3310 info.get_other_ttl(),
3311 service_fullname.to_string(),
3312 ),
3313 0,
3314 );
3315
3316 if let Some(sub) = info.get_subtype() {
3317 trace!("Adding subdomain {}", sub);
3318 out.add_answer_at_time(
3319 DnsPointer::new(
3320 sub,
3321 RRType::PTR,
3322 CLASS_IN,
3323 info.get_other_ttl(),
3324 service_fullname.to_string(),
3325 ),
3326 0,
3327 );
3328 }
3329
3330 let hostname = match dns_registry.name_changes.get(info.get_hostname()) {
3332 Some(new_name) => new_name.to_string(),
3333 None => info.get_hostname().to_string(),
3334 };
3335
3336 let mut srv = DnsSrv::new(
3337 info.get_fullname(),
3338 CLASS_IN | CLASS_CACHE_FLUSH,
3339 info.get_host_ttl(),
3340 info.get_priority(),
3341 info.get_weight(),
3342 info.get_port(),
3343 hostname,
3344 );
3345
3346 if let Some(new_name) = dns_registry.name_changes.get(info.get_fullname()) {
3347 srv.get_record_mut().set_new_name(new_name.to_string());
3348 }
3349
3350 if !info.requires_probe()
3351 || dns_registry.is_probing_done(&srv, info.get_fullname(), create_time)
3352 {
3353 out.add_answer_at_time(srv, 0);
3354 } else {
3355 probing_count += 1;
3356 }
3357
3358 let mut txt = DnsTxt::new(
3361 info.get_fullname(),
3362 CLASS_IN | CLASS_CACHE_FLUSH,
3363 info.get_other_ttl(),
3364 info.generate_txt(),
3365 );
3366
3367 if let Some(new_name) = dns_registry.name_changes.get(info.get_fullname()) {
3368 txt.get_record_mut().set_new_name(new_name.to_string());
3369 }
3370
3371 if !info.requires_probe()
3372 || dns_registry.is_probing_done(&txt, info.get_fullname(), create_time)
3373 {
3374 out.add_answer_at_time(txt, 0);
3375 } else {
3376 probing_count += 1;
3377 }
3378
3379 let hostname = info.get_hostname();
3382 for address in intf_addrs {
3383 let mut dns_addr = DnsAddress::new(
3384 hostname,
3385 ip_address_rr_type(&address),
3386 CLASS_IN | CLASS_CACHE_FLUSH,
3387 info.get_host_ttl(),
3388 address,
3389 );
3390
3391 if let Some(new_name) = dns_registry.name_changes.get(hostname) {
3392 dns_addr.get_record_mut().set_new_name(new_name.to_string());
3393 }
3394
3395 if !info.requires_probe()
3396 || dns_registry.is_probing_done(&dns_addr, info.get_fullname(), create_time)
3397 {
3398 out.add_answer_at_time(dns_addr, 0);
3399 } else {
3400 probing_count += 1;
3401 }
3402 }
3403
3404 if probing_count > 0 {
3405 return None;
3406 }
3407
3408 Some(out)
3409}
3410
3411fn announce_service_on_intf(
3414 dns_registry: &mut DnsRegistry,
3415 info: &ServiceInfo,
3416 intf: &Interface,
3417 sock: &MioUdpSocket,
3418) -> bool {
3419 if let Some(out) = prepare_announce(info, intf, dns_registry) {
3420 send_dns_outgoing(&out, intf, sock);
3421 return true;
3422 }
3423 false
3424}
3425
3426fn name_change(original: &str) -> String {
3434 let mut parts: Vec<_> = original.split('.').collect();
3435 let Some(first_part) = parts.get_mut(0) else {
3436 return format!("{original} (2)");
3437 };
3438
3439 let mut new_name = format!("{} (2)", first_part);
3440
3441 if let Some(paren_pos) = first_part.rfind(" (") {
3443 if let Some(end_paren) = first_part[paren_pos..].find(')') {
3445 let absolute_end_pos = paren_pos + end_paren;
3446 if absolute_end_pos == first_part.len() - 1 {
3448 let num_start = paren_pos + 2; if let Ok(number) = first_part[num_start..absolute_end_pos].parse::<u32>() {
3451 let base_name = &first_part[..paren_pos];
3452 new_name = format!("{} ({})", base_name, number + 1)
3453 }
3454 }
3455 }
3456 }
3457
3458 *first_part = &new_name;
3459 parts.join(".")
3460}
3461
3462fn hostname_change(original: &str) -> String {
3470 let mut parts: Vec<_> = original.split('.').collect();
3471 let Some(first_part) = parts.get_mut(0) else {
3472 return format!("{original}-2");
3473 };
3474
3475 let mut new_name = format!("{}-2", first_part);
3476
3477 if let Some(hyphen_pos) = first_part.rfind('-') {
3479 if let Ok(number) = first_part[hyphen_pos + 1..].parse::<u32>() {
3481 let base_name = &first_part[..hyphen_pos];
3482 new_name = format!("{}-{}", base_name, number + 1);
3483 }
3484 }
3485
3486 *first_part = &new_name;
3487 parts.join(".")
3488}
3489
3490fn add_answer_with_additionals(
3491 out: &mut DnsOutgoing,
3492 msg: &DnsIncoming,
3493 service: &ServiceInfo,
3494 intf: &Interface,
3495 dns_registry: &DnsRegistry,
3496) {
3497 let intf_addrs = service.get_addrs_on_intf(intf);
3498 if intf_addrs.is_empty() {
3499 trace!("No addrs on LAN of intf {:?}", intf);
3500 return;
3501 }
3502
3503 let service_fullname = match dns_registry.name_changes.get(service.get_fullname()) {
3505 Some(new_name) => new_name,
3506 None => service.get_fullname(),
3507 };
3508
3509 let hostname = match dns_registry.name_changes.get(service.get_hostname()) {
3510 Some(new_name) => new_name,
3511 None => service.get_hostname(),
3512 };
3513
3514 let ptr_added = out.add_answer(
3515 msg,
3516 DnsPointer::new(
3517 service.get_type(),
3518 RRType::PTR,
3519 CLASS_IN,
3520 service.get_other_ttl(),
3521 service_fullname.to_string(),
3522 ),
3523 );
3524
3525 if !ptr_added {
3526 trace!("answer was not added for msg {:?}", msg);
3527 return;
3528 }
3529
3530 if let Some(sub) = service.get_subtype() {
3531 trace!("Adding subdomain {}", sub);
3532 out.add_additional_answer(DnsPointer::new(
3533 sub,
3534 RRType::PTR,
3535 CLASS_IN,
3536 service.get_other_ttl(),
3537 service_fullname.to_string(),
3538 ));
3539 }
3540
3541 out.add_additional_answer(DnsSrv::new(
3544 service_fullname,
3545 CLASS_IN | CLASS_CACHE_FLUSH,
3546 service.get_host_ttl(),
3547 service.get_priority(),
3548 service.get_weight(),
3549 service.get_port(),
3550 hostname.to_string(),
3551 ));
3552
3553 out.add_additional_answer(DnsTxt::new(
3554 service_fullname,
3555 CLASS_IN | CLASS_CACHE_FLUSH,
3556 service.get_host_ttl(),
3557 service.generate_txt(),
3558 ));
3559
3560 for address in intf_addrs {
3561 out.add_additional_answer(DnsAddress::new(
3562 hostname,
3563 ip_address_rr_type(&address),
3564 CLASS_IN | CLASS_CACHE_FLUSH,
3565 service.get_host_ttl(),
3566 address,
3567 ));
3568 }
3569}
3570
3571#[cfg(test)]
3572mod tests {
3573 use super::{
3574 check_domain_suffix, check_service_name_length, hostname_change, my_ip_interfaces,
3575 name_change, new_socket_bind, send_dns_outgoing, valid_instance_name,
3576 HostnameResolutionEvent, ServiceDaemon, ServiceEvent, ServiceInfo, GROUP_ADDR_V4,
3577 MDNS_PORT,
3578 };
3579 use crate::{
3580 dns_parser::{DnsOutgoing, DnsPointer, RRType, CLASS_IN, FLAGS_AA, FLAGS_QR_RESPONSE},
3581 service_daemon::check_hostname,
3582 };
3583 use std::{
3584 net::{SocketAddr, SocketAddrV4},
3585 time::Duration,
3586 };
3587 use test_log::test;
3588
3589 #[test]
3590 fn test_socketaddr_print() {
3591 let addr: SocketAddr = SocketAddrV4::new(GROUP_ADDR_V4, MDNS_PORT).into();
3592 let print = format!("{}", addr);
3593 assert_eq!(print, "224.0.0.251:5353");
3594 }
3595
3596 #[test]
3597 fn test_instance_name() {
3598 assert!(valid_instance_name("my-laser._printer._tcp.local."));
3599 assert!(valid_instance_name("my-laser.._printer._tcp.local."));
3600 assert!(!valid_instance_name("_printer._tcp.local."));
3601 }
3602
3603 #[test]
3604 fn test_check_service_name_length() {
3605 let result = check_service_name_length("_tcp", 100);
3606 assert!(result.is_err());
3607 if let Err(e) = result {
3608 println!("{}", e);
3609 }
3610 }
3611
3612 #[test]
3613 fn test_check_hostname() {
3614 for hostname in &[
3616 "my_host.local.",
3617 &("A".repeat(255 - ".local.".len()) + ".local."),
3618 ] {
3619 let result = check_hostname(hostname);
3620 assert!(result.is_ok());
3621 }
3622
3623 for hostname in &[
3625 "my_host.local",
3626 ".local.",
3627 &("A".repeat(256 - ".local.".len()) + ".local."),
3628 ] {
3629 let result = check_hostname(hostname);
3630 assert!(result.is_err());
3631 if let Err(e) = result {
3632 println!("{}", e);
3633 }
3634 }
3635 }
3636
3637 #[test]
3638 fn test_check_domain_suffix() {
3639 assert!(check_domain_suffix("_missing_dot._tcp.local").is_err());
3640 assert!(check_domain_suffix("_missing_bar.tcp.local.").is_err());
3641 assert!(check_domain_suffix("_mis_spell._tpp.local.").is_err());
3642 assert!(check_domain_suffix("_mis_spell._upp.local.").is_err());
3643 assert!(check_domain_suffix("_has_dot._tcp.local.").is_ok());
3644 assert!(check_domain_suffix("_goodname._udp.local.").is_ok());
3645 }
3646
3647 #[test]
3648 fn test_service_with_temporarily_invalidated_ptr() {
3649 let d = ServiceDaemon::new().expect("Failed to create daemon");
3651
3652 let service = "_test_inval_ptr._udp.local.";
3653 let host_name = "my_host_tmp_invalidated_ptr.local.";
3654 let intfs: Vec<_> = my_ip_interfaces(false);
3655 let intf_ips: Vec<_> = intfs.iter().map(|intf| intf.ip()).collect();
3656 let port = 5201;
3657 let my_service =
3658 ServiceInfo::new(service, "my_instance", host_name, &intf_ips[..], port, None)
3659 .expect("invalid service info")
3660 .enable_addr_auto();
3661 let result = d.register(my_service.clone());
3662 assert!(result.is_ok());
3663
3664 let browse_chan = d.browse(service).unwrap();
3666 let timeout = Duration::from_secs(2);
3667 let mut resolved = false;
3668
3669 while let Ok(event) = browse_chan.recv_timeout(timeout) {
3670 match event {
3671 ServiceEvent::ServiceResolved(info) => {
3672 resolved = true;
3673 println!("Resolved a service of {}", &info.get_fullname());
3674 break;
3675 }
3676 e => {
3677 println!("Received event {:?}", e);
3678 }
3679 }
3680 }
3681
3682 assert!(resolved);
3683
3684 println!("Stopping browse of {}", service);
3685 d.stop_browse(service).unwrap();
3688
3689 let mut stopped = false;
3694 while let Ok(event) = browse_chan.recv_timeout(timeout) {
3695 match event {
3696 ServiceEvent::SearchStopped(_) => {
3697 stopped = true;
3698 println!("Stopped browsing service");
3699 break;
3700 }
3701 e => {
3705 println!("Received event {:?}", e);
3706 }
3707 }
3708 }
3709
3710 assert!(stopped);
3711
3712 let invalidate_ptr_packet = DnsPointer::new(
3714 my_service.get_type(),
3715 RRType::PTR,
3716 CLASS_IN,
3717 0,
3718 my_service.get_fullname().to_string(),
3719 );
3720
3721 let mut packet_buffer = DnsOutgoing::new(FLAGS_QR_RESPONSE | FLAGS_AA);
3722 packet_buffer.add_additional_answer(invalidate_ptr_packet);
3723
3724 for intf in intfs {
3725 let sock = new_socket_bind(&intf, true).unwrap();
3726 send_dns_outgoing(&packet_buffer, &intf, &sock);
3727 }
3728
3729 println!(
3730 "Sent PTR record invalidation. Starting second browse for {}",
3731 service
3732 );
3733
3734 let browse_chan = d.browse(service).unwrap();
3736
3737 resolved = false;
3738 while let Ok(event) = browse_chan.recv_timeout(timeout) {
3739 match event {
3740 ServiceEvent::ServiceResolved(info) => {
3741 resolved = true;
3742 println!("Resolved a service of {}", &info.get_fullname());
3743 break;
3744 }
3745 e => {
3746 println!("Received event {:?}", e);
3747 }
3748 }
3749 }
3750
3751 assert!(resolved);
3752 d.shutdown().unwrap();
3753 }
3754
3755 #[test]
3756 fn test_expired_srv() {
3757 let service_type = "_expired-srv._udp.local.";
3759 let instance = "test_instance";
3760 let host_name = "expired_srv_host.local.";
3761 let mut my_service = ServiceInfo::new(service_type, instance, host_name, "", 5023, None)
3762 .unwrap()
3763 .enable_addr_auto();
3764 let new_ttl = 3; my_service._set_host_ttl(new_ttl);
3769
3770 let mdns_server = ServiceDaemon::new().expect("Failed to create mdns server");
3772 let result = mdns_server.register(my_service);
3773 assert!(result.is_ok());
3774
3775 let mdns_client = ServiceDaemon::new().expect("Failed to create mdns client");
3776 let browse_chan = mdns_client.browse(service_type).unwrap();
3777 let timeout = Duration::from_secs(2);
3778 let mut resolved = false;
3779
3780 while let Ok(event) = browse_chan.recv_timeout(timeout) {
3781 match event {
3782 ServiceEvent::ServiceResolved(info) => {
3783 resolved = true;
3784 println!("Resolved a service of {}", &info.get_fullname());
3785 break;
3786 }
3787 _ => {}
3788 }
3789 }
3790
3791 assert!(resolved);
3792
3793 mdns_server.shutdown().unwrap();
3795
3796 let expire_timeout = Duration::from_secs(new_ttl as u64);
3798 while let Ok(event) = browse_chan.recv_timeout(expire_timeout) {
3799 match event {
3800 ServiceEvent::ServiceRemoved(service_type, full_name) => {
3801 println!("Service removed: {}: {}", &service_type, &full_name);
3802 break;
3803 }
3804 _ => {}
3805 }
3806 }
3807 }
3808
3809 #[test]
3810 fn test_hostname_resolution_address_removed() {
3811 let server = ServiceDaemon::new().expect("Failed to create server");
3813 let hostname = "addr_remove_host._tcp.local.";
3814 let service_ip_addr = my_ip_interfaces(false)
3815 .iter()
3816 .find(|iface| iface.ip().is_ipv4())
3817 .map(|iface| iface.ip())
3818 .unwrap();
3819
3820 let mut my_service = ServiceInfo::new(
3821 "_host_res_test._tcp.local.",
3822 "my_instance",
3823 hostname,
3824 &service_ip_addr,
3825 1234,
3826 None,
3827 )
3828 .expect("invalid service info");
3829
3830 let addr_ttl = 2;
3832 my_service._set_host_ttl(addr_ttl); server.register(my_service).unwrap();
3835
3836 let client = ServiceDaemon::new().expect("Failed to create client");
3838 let event_receiver = client.resolve_hostname(hostname, None).unwrap();
3839 let resolved = loop {
3840 match event_receiver.recv() {
3841 Ok(HostnameResolutionEvent::AddressesFound(found_hostname, addresses)) => {
3842 assert!(found_hostname == hostname);
3843 assert!(addresses.contains(&service_ip_addr));
3844 println!("address found: {:?}", &addresses);
3845 break true;
3846 }
3847 Ok(HostnameResolutionEvent::SearchStopped(_)) => break false,
3848 Ok(_event) => {}
3849 Err(_) => break false,
3850 }
3851 };
3852
3853 assert!(resolved);
3854
3855 server.shutdown().unwrap();
3857
3858 let timeout = Duration::from_secs(addr_ttl as u64 + 1);
3860 let removed = loop {
3861 match event_receiver.recv_timeout(timeout) {
3862 Ok(HostnameResolutionEvent::AddressesRemoved(removed_host, addresses)) => {
3863 assert!(removed_host == hostname);
3864 assert!(addresses.contains(&service_ip_addr));
3865
3866 println!(
3867 "address removed: hostname: {} addresses: {:?}",
3868 &hostname, &addresses
3869 );
3870 break true;
3871 }
3872 Ok(_event) => {}
3873 Err(_) => {
3874 break false;
3875 }
3876 }
3877 };
3878
3879 assert!(removed);
3880
3881 client.shutdown().unwrap();
3882 }
3883
3884 #[test]
3885 fn test_refresh_ptr() {
3886 let service_type = "_refresh-ptr._udp.local.";
3888 let instance = "test_instance";
3889 let host_name = "refresh_ptr_host.local.";
3890 let service_ip_addr = my_ip_interfaces(false)
3891 .iter()
3892 .find(|iface| iface.ip().is_ipv4())
3893 .map(|iface| iface.ip())
3894 .unwrap();
3895
3896 let mut my_service = ServiceInfo::new(
3897 service_type,
3898 instance,
3899 host_name,
3900 &service_ip_addr,
3901 5023,
3902 None,
3903 )
3904 .unwrap();
3905
3906 let new_ttl = 3; my_service._set_other_ttl(new_ttl);
3908
3909 let mdns_server = ServiceDaemon::new().expect("Failed to create mdns server");
3911 let result = mdns_server.register(my_service);
3912 assert!(result.is_ok());
3913
3914 let mdns_client = ServiceDaemon::new().expect("Failed to create mdns client");
3915 let browse_chan = mdns_client.browse(service_type).unwrap();
3916 let timeout = Duration::from_millis(1500); let mut resolved = false;
3918
3919 while let Ok(event) = browse_chan.recv_timeout(timeout) {
3921 match event {
3922 ServiceEvent::ServiceResolved(info) => {
3923 resolved = true;
3924 println!("Resolved a service of {}", &info.get_fullname());
3925 break;
3926 }
3927 _ => {}
3928 }
3929 }
3930
3931 assert!(resolved);
3932
3933 let timeout = Duration::from_millis(new_ttl as u64 * 1000 * 90 / 100);
3935 while let Ok(event) = browse_chan.recv_timeout(timeout) {
3936 println!("event: {:?}", &event);
3937 }
3938
3939 let metrics_chan = mdns_client.get_metrics().unwrap();
3941 let metrics = metrics_chan.recv_timeout(timeout).unwrap();
3942 let refresh_counter = metrics["cache-refresh-ptr"];
3943 assert_eq!(refresh_counter, 1);
3944
3945 mdns_server.shutdown().unwrap();
3947 mdns_client.shutdown().unwrap();
3948 }
3949
3950 #[test]
3951 fn test_name_change() {
3952 assert_eq!(name_change("foo.local."), "foo (2).local.");
3953 assert_eq!(name_change("foo (2).local."), "foo (3).local.");
3954 assert_eq!(name_change("foo (9).local."), "foo (10).local.");
3955 assert_eq!(name_change("foo"), "foo (2)");
3956 assert_eq!(name_change("foo (2)"), "foo (3)");
3957 assert_eq!(name_change(""), " (2)");
3958
3959 assert_eq!(name_change("foo (abc)"), "foo (abc) (2)"); assert_eq!(name_change("foo (2"), "foo (2 (2)"); assert_eq!(name_change("foo (2) extra"), "foo (2) extra (2)"); }
3964
3965 #[test]
3966 fn test_hostname_change() {
3967 assert_eq!(hostname_change("foo.local."), "foo-2.local.");
3968 assert_eq!(hostname_change("foo"), "foo-2");
3969 assert_eq!(hostname_change("foo-2.local."), "foo-3.local.");
3970 assert_eq!(hostname_change("foo-9"), "foo-10");
3971 assert_eq!(hostname_change("test-42.domain."), "test-43.domain.");
3972 }
3973}