1#[cfg(feature = "logging")]
32use crate::log::{debug, trace};
33use crate::{
34 dns_cache::{current_time_millis, DnsCache},
35 dns_parser::{
36 ip_address_rr_type, DnsAddress, DnsEntryExt, DnsIncoming, DnsOutgoing, DnsPointer,
37 DnsRecordBox, DnsRecordExt, DnsSrv, DnsTxt, RRType, CLASS_CACHE_FLUSH, CLASS_IN, FLAGS_AA,
38 FLAGS_QR_QUERY, FLAGS_QR_RESPONSE, MAX_MSG_ABSOLUTE,
39 },
40 error::{e_fmt, Error, Result},
41 service_info::{
42 split_sub_domain, valid_ip_on_intf, DnsRegistry, Probe, ServiceInfo, ServiceStatus,
43 },
44 Receiver,
45};
46use flume::{bounded, Sender, TrySendError};
47use if_addrs::{IfAddr, Interface};
48use mio::{net::UdpSocket as MioUdpSocket, Poll};
49use socket2::Socket;
50use std::{
51 cmp::{self, Reverse},
52 collections::{BinaryHeap, HashMap, HashSet},
53 fmt,
54 net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6, UdpSocket},
55 str, thread,
56 time::Duration,
57 vec,
58};
59
60pub const SERVICE_NAME_LEN_MAX_DEFAULT: u8 = 15;
64
65pub const IP_CHECK_INTERVAL_IN_SECS_DEFAULT: u32 = 5;
67
68pub const VERIFY_TIMEOUT_DEFAULT: Duration = Duration::from_secs(10);
71
72const MDNS_PORT: u16 = 5353;
73const GROUP_ADDR_V4: Ipv4Addr = Ipv4Addr::new(224, 0, 0, 251);
74const GROUP_ADDR_V6: Ipv6Addr = Ipv6Addr::new(0xff02, 0, 0, 0, 0, 0, 0, 0xfb);
75const LOOPBACK_V4: Ipv4Addr = Ipv4Addr::new(127, 0, 0, 1);
76
77const RESOLVE_WAIT_IN_MILLIS: u64 = 500;
78
79#[derive(Debug)]
81pub enum UnregisterStatus {
82 OK,
84 NotFound,
86}
87
88#[derive(Debug, PartialEq, Clone, Eq)]
90#[non_exhaustive]
91pub enum DaemonStatus {
92 Running,
94
95 Shutdown,
97}
98
99#[derive(Hash, Eq, PartialEq)]
102enum Counter {
103 Register,
104 RegisterResend,
105 Unregister,
106 UnregisterResend,
107 Browse,
108 ResolveHostname,
109 Respond,
110 CacheRefreshPTR,
111 CacheRefreshSrvTxt,
112 CacheRefreshAddr,
113 KnownAnswerSuppression,
114 CachedPTR,
115 CachedSRV,
116 CachedAddr,
117 CachedTxt,
118 CachedNSec,
119 CachedSubtype,
120 DnsRegistryProbe,
121 DnsRegistryActive,
122 DnsRegistryTimer,
123 DnsRegistryNameChange,
124 Timer,
125}
126
127impl fmt::Display for Counter {
128 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
129 match self {
130 Self::Register => write!(f, "register"),
131 Self::RegisterResend => write!(f, "register-resend"),
132 Self::Unregister => write!(f, "unregister"),
133 Self::UnregisterResend => write!(f, "unregister-resend"),
134 Self::Browse => write!(f, "browse"),
135 Self::ResolveHostname => write!(f, "resolve-hostname"),
136 Self::Respond => write!(f, "respond"),
137 Self::CacheRefreshPTR => write!(f, "cache-refresh-ptr"),
138 Self::CacheRefreshSrvTxt => write!(f, "cache-refresh-srv-txt"),
139 Self::CacheRefreshAddr => write!(f, "cache-refresh-addr"),
140 Self::KnownAnswerSuppression => write!(f, "known-answer-suppression"),
141 Self::CachedPTR => write!(f, "cached-ptr"),
142 Self::CachedSRV => write!(f, "cached-srv"),
143 Self::CachedAddr => write!(f, "cached-addr"),
144 Self::CachedTxt => write!(f, "cached-txt"),
145 Self::CachedNSec => write!(f, "cached-nsec"),
146 Self::CachedSubtype => write!(f, "cached-subtype"),
147 Self::DnsRegistryProbe => write!(f, "dns-registry-probe"),
148 Self::DnsRegistryActive => write!(f, "dns-registry-active"),
149 Self::DnsRegistryTimer => write!(f, "dns-registry-timer"),
150 Self::DnsRegistryNameChange => write!(f, "dns-registry-name-change"),
151 Self::Timer => write!(f, "timer"),
152 }
153 }
154}
155
156pub type Metrics = HashMap<String, i64>;
159
160const SIGNAL_SOCK_EVENT_KEY: usize = usize::MAX - 1; #[derive(Clone)]
166pub struct ServiceDaemon {
167 sender: Sender<Command>,
169
170 signal_addr: SocketAddr,
176}
177
178impl ServiceDaemon {
179 pub fn new() -> Result<Self> {
184 let signal_addr = SocketAddrV4::new(LOOPBACK_V4, 0);
187
188 let signal_sock = UdpSocket::bind(signal_addr)
189 .map_err(|e| e_fmt!("failed to create signal_sock for daemon: {}", e))?;
190
191 let signal_addr = signal_sock
193 .local_addr()
194 .map_err(|e| e_fmt!("failed to get signal sock addr: {}", e))?;
195
196 signal_sock
198 .set_nonblocking(true)
199 .map_err(|e| e_fmt!("failed to set nonblocking for signal socket: {}", e))?;
200
201 let poller = Poll::new().map_err(|e| e_fmt!("failed to create mio Poll: {e}"))?;
202
203 let (sender, receiver) = bounded(100);
204
205 let mio_sock = MioUdpSocket::from_std(signal_sock);
207 thread::Builder::new()
208 .name("mDNS_daemon".to_string())
209 .spawn(move || Self::daemon_thread(mio_sock, poller, receiver))
210 .map_err(|e| e_fmt!("thread builder failed to spawn: {}", e))?;
211
212 Ok(Self {
213 sender,
214 signal_addr,
215 })
216 }
217
218 fn send_cmd(&self, cmd: Command) -> Result<()> {
221 let cmd_name = cmd.to_string();
222
223 self.sender.try_send(cmd).map_err(|e| match e {
225 TrySendError::Full(_) => Error::Again,
226 e => e_fmt!("flume::channel::send failed: {}", e),
227 })?;
228
229 let addr = SocketAddrV4::new(LOOPBACK_V4, 0);
231 let socket = UdpSocket::bind(addr)
232 .map_err(|e| e_fmt!("Failed to create socket to send signal: {}", e))?;
233 socket
234 .send_to(cmd_name.as_bytes(), self.signal_addr)
235 .map_err(|e| {
236 e_fmt!(
237 "signal socket send_to {} ({}) failed: {}",
238 self.signal_addr,
239 cmd_name,
240 e
241 )
242 })?;
243
244 Ok(())
245 }
246
247 pub fn browse(&self, service_type: &str) -> Result<Receiver<ServiceEvent>> {
258 check_domain_suffix(service_type)?;
259
260 let (resp_s, resp_r) = bounded(10);
261 self.send_cmd(Command::Browse(service_type.to_string(), 1, resp_s))?;
262 Ok(resp_r)
263 }
264
265 pub fn stop_browse(&self, ty_domain: &str) -> Result<()> {
270 self.send_cmd(Command::StopBrowse(ty_domain.to_string()))
271 }
272
273 pub fn resolve_hostname(
281 &self,
282 hostname: &str,
283 timeout: Option<u64>,
284 ) -> Result<Receiver<HostnameResolutionEvent>> {
285 check_hostname(hostname)?;
286 let (resp_s, resp_r) = bounded(10);
287 self.send_cmd(Command::ResolveHostname(
288 hostname.to_string(),
289 1,
290 resp_s,
291 timeout,
292 ))?;
293 Ok(resp_r)
294 }
295
296 pub fn stop_resolve_hostname(&self, hostname: &str) -> Result<()> {
301 self.send_cmd(Command::StopResolveHostname(hostname.to_string()))
302 }
303
304 pub fn register(&self, service_info: ServiceInfo) -> Result<()> {
312 check_service_name(service_info.get_fullname())?;
313 check_hostname(service_info.get_hostname())?;
314
315 self.send_cmd(Command::Register(service_info))
316 }
317
318 pub fn unregister(&self, fullname: &str) -> Result<Receiver<UnregisterStatus>> {
326 let (resp_s, resp_r) = bounded(1);
327 self.send_cmd(Command::Unregister(fullname.to_lowercase(), resp_s))?;
328 Ok(resp_r)
329 }
330
331 pub fn monitor(&self) -> Result<Receiver<DaemonEvent>> {
335 let (resp_s, resp_r) = bounded(100);
336 self.send_cmd(Command::Monitor(resp_s))?;
337 Ok(resp_r)
338 }
339
340 pub fn shutdown(&self) -> Result<Receiver<DaemonStatus>> {
345 let (resp_s, resp_r) = bounded(1);
346 self.send_cmd(Command::Exit(resp_s))?;
347 Ok(resp_r)
348 }
349
350 pub fn status(&self) -> Result<Receiver<DaemonStatus>> {
356 let (resp_s, resp_r) = bounded(1);
357
358 if self.sender.is_disconnected() {
359 resp_s
360 .send(DaemonStatus::Shutdown)
361 .map_err(|e| e_fmt!("failed to send daemon status to the client: {}", e))?;
362 } else {
363 self.send_cmd(Command::GetStatus(resp_s))?;
364 }
365
366 Ok(resp_r)
367 }
368
369 pub fn get_metrics(&self) -> Result<Receiver<Metrics>> {
374 let (resp_s, resp_r) = bounded(1);
375 self.send_cmd(Command::GetMetrics(resp_s))?;
376 Ok(resp_r)
377 }
378
379 pub fn set_service_name_len_max(&self, len_max: u8) -> Result<()> {
386 const SERVICE_NAME_LEN_MAX_LIMIT: u8 = 30; if len_max > SERVICE_NAME_LEN_MAX_LIMIT {
389 return Err(Error::Msg(format!(
390 "service name length max {} is too large",
391 len_max
392 )));
393 }
394
395 self.send_cmd(Command::SetOption(DaemonOption::ServiceNameLenMax(len_max)))
396 }
397
398 pub fn set_ip_check_interval(&self, interval_in_secs: u32) -> Result<()> {
404 let interval_in_millis = interval_in_secs as u64 * 1000;
405 self.send_cmd(Command::SetOption(DaemonOption::IpCheckInterval(
406 interval_in_millis,
407 )))
408 }
409
410 pub fn get_ip_check_interval(&self) -> Result<u32> {
412 let (resp_s, resp_r) = bounded(1);
413 self.send_cmd(Command::GetOption(resp_s))?;
414
415 let option = resp_r
416 .recv_timeout(Duration::from_secs(10))
417 .map_err(|e| e_fmt!("failed to receive ip check interval: {}", e))?;
418 let ip_check_interval_in_secs = option.ip_check_interval / 1000;
419 Ok(ip_check_interval_in_secs as u32)
420 }
421
422 pub fn enable_interface(&self, if_kind: impl IntoIfKindVec) -> Result<()> {
429 let if_kind_vec = if_kind.into_vec();
430 self.send_cmd(Command::SetOption(DaemonOption::EnableInterface(
431 if_kind_vec.kinds,
432 )))
433 }
434
435 pub fn disable_interface(&self, if_kind: impl IntoIfKindVec) -> Result<()> {
442 let if_kind_vec = if_kind.into_vec();
443 self.send_cmd(Command::SetOption(DaemonOption::DisableInterface(
444 if_kind_vec.kinds,
445 )))
446 }
447
448 pub fn set_multicast_loop_v4(&self, on: bool) -> Result<()> {
464 self.send_cmd(Command::SetOption(DaemonOption::MulticastLoopV4(on)))
465 }
466
467 pub fn set_multicast_loop_v6(&self, on: bool) -> Result<()> {
483 self.send_cmd(Command::SetOption(DaemonOption::MulticastLoopV6(on)))
484 }
485
486 pub fn verify(&self, instance_fullname: String, timeout: Duration) -> Result<()> {
499 self.send_cmd(Command::Verify(instance_fullname, timeout))
500 }
501
502 fn daemon_thread(signal_sock: MioUdpSocket, poller: Poll, receiver: Receiver<Command>) {
503 let zc = Zeroconf::new(signal_sock, poller);
504
505 if let Some(cmd) = Self::run(zc, receiver) {
506 match cmd {
507 Command::Exit(resp_s) => {
508 if let Err(e) = resp_s.send(DaemonStatus::Shutdown) {
511 debug!("exit: failed to send response of shutdown: {}", e);
512 }
513 }
514 _ => {
515 debug!("Unexpected command: {:?}", cmd);
516 }
517 }
518 }
519 }
520
521 fn run(mut zc: Zeroconf, receiver: Receiver<Command>) -> Option<Command> {
530 if let Err(e) = zc.poller.registry().register(
532 &mut zc.signal_sock,
533 mio::Token(SIGNAL_SOCK_EVENT_KEY),
534 mio::Interest::READABLE,
535 ) {
536 debug!("failed to add signal socket to the poller: {}", e);
537 return None;
538 }
539
540 for (intf, sock) in zc.intf_socks.iter_mut() {
542 let key =
543 Zeroconf::add_poll_impl(&mut zc.poll_ids, &mut zc.poll_id_count, intf.clone());
544
545 if let Err(e) =
546 zc.poller
547 .registry()
548 .register(sock, mio::Token(key), mio::Interest::READABLE)
549 {
550 debug!("add socket of {:?} to poller: {e}", intf);
551 return None;
552 }
553 }
554
555 let mut next_ip_check = if zc.ip_check_interval > 0 {
557 current_time_millis() + zc.ip_check_interval
558 } else {
559 0
560 };
561
562 if next_ip_check > 0 {
563 zc.add_timer(next_ip_check);
564 }
565
566 let mut events = mio::Events::with_capacity(1024);
569 loop {
570 let now = current_time_millis();
571
572 let earliest_timer = zc.peek_earliest_timer();
573 let timeout = earliest_timer.map(|timer| {
574 let millis = if timer > now { timer - now } else { 1 };
576 Duration::from_millis(millis)
577 });
578
579 events.clear();
581 match zc.poller.poll(&mut events, timeout) {
582 Ok(_) => zc.handle_poller_events(&events),
583 Err(e) => debug!("failed to select from sockets: {}", e),
584 }
585
586 let now = current_time_millis();
587
588 zc.pop_timers_till(now);
590
591 for hostname in zc
593 .hostname_resolvers
594 .clone()
595 .into_iter()
596 .filter(|(_, (_, timeout))| timeout.map(|t| now >= t).unwrap_or(false))
597 .map(|(hostname, _)| hostname)
598 {
599 trace!("hostname resolver timeout for {}", &hostname);
600 call_hostname_resolution_listener(
601 &zc.hostname_resolvers,
602 &hostname,
603 HostnameResolutionEvent::SearchTimeout(hostname.to_owned()),
604 );
605 call_hostname_resolution_listener(
606 &zc.hostname_resolvers,
607 &hostname,
608 HostnameResolutionEvent::SearchStopped(hostname.to_owned()),
609 );
610 zc.hostname_resolvers.remove(&hostname);
611 }
612
613 while let Ok(command) = receiver.try_recv() {
615 if matches!(command, Command::Exit(_)) {
616 zc.status = DaemonStatus::Shutdown;
617 return Some(command);
618 }
619 zc.exec_command(command, false);
620 }
621
622 let mut i = 0;
624 while i < zc.retransmissions.len() {
625 if now >= zc.retransmissions[i].next_time {
626 let rerun = zc.retransmissions.remove(i);
627 zc.exec_command(rerun.command, true);
628 } else {
629 i += 1;
630 }
631 }
632
633 zc.refresh_active_services();
635
636 let mut query_count = 0;
638 for (hostname, _sender) in zc.hostname_resolvers.iter() {
639 for (hostname, ip_addr) in
640 zc.cache.refresh_due_hostname_resolutions(hostname).iter()
641 {
642 zc.send_query(hostname, ip_address_rr_type(ip_addr));
643 query_count += 1;
644 }
645 }
646
647 zc.increase_counter(Counter::CacheRefreshAddr, query_count);
648
649 let now = current_time_millis();
651
652 let expired_services = zc.cache.evict_expired_services(now);
654 zc.notify_service_removal(expired_services);
655
656 let expired_addrs = zc.cache.evict_expired_addr(now);
658 for (hostname, addrs) in expired_addrs {
659 call_hostname_resolution_listener(
660 &zc.hostname_resolvers,
661 &hostname,
662 HostnameResolutionEvent::AddressesRemoved(hostname.clone(), addrs),
663 );
664 let instances = zc.cache.get_instances_on_host(&hostname);
665 let instance_set: HashSet<String> = instances.into_iter().collect();
666 zc.resolve_updated_instances(&instance_set);
667 }
668
669 zc.probing_handler();
671
672 if now >= next_ip_check && next_ip_check > 0 {
674 next_ip_check = now + zc.ip_check_interval;
675 zc.add_timer(next_ip_check);
676
677 zc.check_ip_changes();
678 }
679 }
680 }
681}
682
683fn new_socket_bind(intf: &Interface, should_loop: bool) -> Result<MioUdpSocket> {
685 let intf_ip = &intf.ip();
688 match intf_ip {
689 IpAddr::V4(ip) => {
690 let addr = SocketAddrV4::new(Ipv4Addr::new(0, 0, 0, 0), MDNS_PORT);
691 let sock = new_socket(addr.into(), true)?;
692
693 sock.join_multicast_v4(&GROUP_ADDR_V4, ip)
695 .map_err(|e| e_fmt!("join multicast group on addr {}: {}", intf_ip, e))?;
696
697 sock.set_multicast_if_v4(ip)
699 .map_err(|e| e_fmt!("set multicast_if on addr {}: {}", ip, e))?;
700
701 sock.set_multicast_ttl_v4(255)
706 .map_err(|e| e_fmt!("set set_multicast_ttl_v4 on addr {}: {}", ip, e))?;
707
708 if !should_loop {
709 sock.set_multicast_loop_v4(false)
710 .map_err(|e| e_fmt!("failed to set multicast loop v4 for {ip}: {e}"))?;
711 }
712
713 let multicast_addr = SocketAddrV4::new(GROUP_ADDR_V4, MDNS_PORT).into();
715 let test_packets = DnsOutgoing::new(0).to_data_on_wire();
716 for packet in test_packets {
717 sock.send_to(&packet, &multicast_addr)
718 .map_err(|e| e_fmt!("send multicast packet on addr {}: {}", ip, e))?;
719 }
720 Ok(MioUdpSocket::from_std(UdpSocket::from(sock)))
721 }
722 IpAddr::V6(ip) => {
723 let addr = SocketAddrV6::new(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 0), MDNS_PORT, 0, 0);
724 let sock = new_socket(addr.into(), true)?;
725
726 sock.join_multicast_v6(&GROUP_ADDR_V6, intf.index.unwrap_or(0))
728 .map_err(|e| e_fmt!("join multicast group on addr {}: {}", ip, e))?;
729
730 sock.set_multicast_if_v6(intf.index.unwrap_or(0))
732 .map_err(|e| e_fmt!("set multicast_if on addr {}: {}", ip, e))?;
733
734 sock.set_multicast_hops_v6(255)
738 .map_err(|e| e_fmt!("set set_multicast_hops_v6 on addr {}: {}", ip, e))?;
739
740 Ok(MioUdpSocket::from_std(UdpSocket::from(sock)))
745 }
746 }
747}
748
749fn new_socket(addr: SocketAddr, non_block: bool) -> Result<Socket> {
752 let domain = match addr {
753 SocketAddr::V4(_) => socket2::Domain::IPV4,
754 SocketAddr::V6(_) => socket2::Domain::IPV6,
755 };
756
757 let fd = Socket::new(domain, socket2::Type::DGRAM, None)
758 .map_err(|e| e_fmt!("create socket failed: {}", e))?;
759
760 fd.set_reuse_address(true)
761 .map_err(|e| e_fmt!("set ReuseAddr failed: {}", e))?;
762 #[cfg(unix)] fd.set_reuse_port(true)
764 .map_err(|e| e_fmt!("set ReusePort failed: {}", e))?;
765
766 if non_block {
767 fd.set_nonblocking(true)
768 .map_err(|e| e_fmt!("set O_NONBLOCK: {}", e))?;
769 }
770
771 fd.bind(&addr.into())
772 .map_err(|e| e_fmt!("socket bind to {} failed: {}", &addr, e))?;
773
774 trace!("new socket bind to {}", &addr);
775 Ok(fd)
776}
777
778struct ReRun {
780 next_time: u64,
782 command: Command,
783}
784
785#[derive(Debug, Eq, Hash, PartialEq)]
787enum IpVersion {
788 V4,
789 V6,
790}
791
792#[derive(Debug, Eq, Hash, PartialEq)]
794struct MulticastSendTracker {
795 intf_index: u32,
796 ip_version: IpVersion,
797}
798
799fn multicast_send_tracker(intf: &Interface) -> Option<MulticastSendTracker> {
801 match intf.index {
802 Some(index) => {
803 let ip_ver = match intf.addr {
804 IfAddr::V4(_) => IpVersion::V4,
805 IfAddr::V6(_) => IpVersion::V6,
806 };
807 Some(MulticastSendTracker {
808 intf_index: index,
809 ip_version: ip_ver,
810 })
811 }
812 None => None,
813 }
814}
815
816#[derive(Debug, Clone)]
820#[non_exhaustive]
821pub enum IfKind {
822 All,
824
825 IPv4,
827
828 IPv6,
830
831 Name(String),
833
834 Addr(IpAddr),
836
837 LoopbackV4,
842
843 LoopbackV6,
845}
846
847impl IfKind {
848 fn matches(&self, intf: &Interface) -> bool {
850 match self {
851 Self::All => true,
852 Self::IPv4 => intf.ip().is_ipv4(),
853 Self::IPv6 => intf.ip().is_ipv6(),
854 Self::Name(ifname) => ifname == &intf.name,
855 Self::Addr(addr) => addr == &intf.ip(),
856 Self::LoopbackV4 => intf.is_loopback() && intf.ip().is_ipv4(),
857 Self::LoopbackV6 => intf.is_loopback() && intf.ip().is_ipv6(),
858 }
859 }
860}
861
862impl From<&str> for IfKind {
865 fn from(val: &str) -> Self {
866 Self::Name(val.to_string())
867 }
868}
869
870impl From<&String> for IfKind {
871 fn from(val: &String) -> Self {
872 Self::Name(val.to_string())
873 }
874}
875
876impl From<IpAddr> for IfKind {
878 fn from(val: IpAddr) -> Self {
879 Self::Addr(val)
880 }
881}
882
883pub struct IfKindVec {
885 kinds: Vec<IfKind>,
886}
887
888pub trait IntoIfKindVec {
890 fn into_vec(self) -> IfKindVec;
891}
892
893impl<T: Into<IfKind>> IntoIfKindVec for T {
894 fn into_vec(self) -> IfKindVec {
895 let if_kind: IfKind = self.into();
896 IfKindVec {
897 kinds: vec![if_kind],
898 }
899 }
900}
901
902impl<T: Into<IfKind>> IntoIfKindVec for Vec<T> {
903 fn into_vec(self) -> IfKindVec {
904 let kinds: Vec<IfKind> = self.into_iter().map(|x| x.into()).collect();
905 IfKindVec { kinds }
906 }
907}
908
909struct IfSelection {
911 if_kind: IfKind,
913
914 selected: bool,
916}
917
918struct Zeroconf {
920 intf_socks: HashMap<Interface, MioUdpSocket>,
922
923 poll_ids: HashMap<usize, Interface>,
925
926 poll_id_count: usize,
928
929 my_services: HashMap<String, ServiceInfo>,
931
932 cache: DnsCache,
934
935 dns_registry_map: HashMap<Interface, DnsRegistry>,
937
938 service_queriers: HashMap<String, Sender<ServiceEvent>>, hostname_resolvers: HashMap<String, (Sender<HostnameResolutionEvent>, Option<u64>)>, retransmissions: Vec<ReRun>,
949
950 counters: Metrics,
951
952 poller: Poll,
954
955 monitors: Vec<Sender<DaemonEvent>>,
957
958 service_name_len_max: u8,
960
961 ip_check_interval: u64,
963
964 if_selections: Vec<IfSelection>,
966
967 signal_sock: MioUdpSocket,
969
970 timers: BinaryHeap<Reverse<u64>>,
976
977 status: DaemonStatus,
978
979 pending_resolves: HashSet<String>,
981
982 resolved: HashSet<String>,
984
985 multicast_loop_v4: bool,
986
987 multicast_loop_v6: bool,
988}
989
990impl Zeroconf {
991 fn new(signal_sock: MioUdpSocket, poller: Poll) -> Self {
992 let my_ifaddrs = my_ip_interfaces(false);
994
995 let mut intf_socks = HashMap::new();
999 let mut dns_registry_map = HashMap::new();
1000
1001 for intf in my_ifaddrs {
1002 let sock = match new_socket_bind(&intf, true) {
1003 Ok(s) => s,
1004 Err(e) => {
1005 trace!("bind a socket to {}: {}. Skipped.", &intf.ip(), e);
1006 continue;
1007 }
1008 };
1009
1010 dns_registry_map.insert(intf.clone(), DnsRegistry::new());
1011
1012 intf_socks.insert(intf, sock);
1013 }
1014
1015 let monitors = Vec::new();
1016 let service_name_len_max = SERVICE_NAME_LEN_MAX_DEFAULT;
1017 let ip_check_interval = IP_CHECK_INTERVAL_IN_SECS_DEFAULT as u64 * 1000;
1018
1019 let timers = BinaryHeap::new();
1020
1021 let if_selections = vec![
1023 IfSelection {
1024 if_kind: IfKind::LoopbackV4,
1025 selected: false,
1026 },
1027 IfSelection {
1028 if_kind: IfKind::LoopbackV6,
1029 selected: false,
1030 },
1031 ];
1032
1033 let status = DaemonStatus::Running;
1034
1035 Self {
1036 intf_socks,
1037 poll_ids: HashMap::new(),
1038 poll_id_count: 0,
1039 my_services: HashMap::new(),
1040 cache: DnsCache::new(),
1041 dns_registry_map,
1042 hostname_resolvers: HashMap::new(),
1043 service_queriers: HashMap::new(),
1044 retransmissions: Vec::new(),
1045 counters: HashMap::new(),
1046 poller,
1047 monitors,
1048 service_name_len_max,
1049 ip_check_interval,
1050 if_selections,
1051 signal_sock,
1052 timers,
1053 status,
1054 pending_resolves: HashSet::new(),
1055 resolved: HashSet::new(),
1056 multicast_loop_v4: true,
1057 multicast_loop_v6: true,
1058 }
1059 }
1060
1061 fn process_set_option(&mut self, daemon_opt: DaemonOption) {
1062 match daemon_opt {
1063 DaemonOption::ServiceNameLenMax(length) => self.service_name_len_max = length,
1064 DaemonOption::IpCheckInterval(interval) => self.ip_check_interval = interval,
1065 DaemonOption::EnableInterface(if_kind) => self.enable_interface(if_kind),
1066 DaemonOption::DisableInterface(if_kind) => self.disable_interface(if_kind),
1067 DaemonOption::MulticastLoopV4(on) => self.set_multicast_loop_v4(on),
1068 DaemonOption::MulticastLoopV6(on) => self.set_multicast_loop_v6(on),
1069 }
1070 }
1071
1072 fn enable_interface(&mut self, kinds: Vec<IfKind>) {
1073 for if_kind in kinds {
1074 self.if_selections.push(IfSelection {
1075 if_kind,
1076 selected: true,
1077 });
1078 }
1079
1080 self.apply_intf_selections(my_ip_interfaces(true));
1081 }
1082
1083 fn disable_interface(&mut self, kinds: Vec<IfKind>) {
1084 for if_kind in kinds {
1085 self.if_selections.push(IfSelection {
1086 if_kind,
1087 selected: false,
1088 });
1089 }
1090
1091 self.apply_intf_selections(my_ip_interfaces(true));
1092 }
1093
1094 fn set_multicast_loop_v4(&mut self, on: bool) {
1095 for (_, sock) in self.intf_socks.iter_mut() {
1096 if let Err(e) = sock.set_multicast_loop_v4(on) {
1097 debug!("failed to set multicast loop v4: {e}");
1098 }
1099 }
1100 }
1101
1102 fn set_multicast_loop_v6(&mut self, on: bool) {
1103 for (_, sock) in self.intf_socks.iter_mut() {
1104 if let Err(e) = sock.set_multicast_loop_v6(on) {
1105 debug!("failed to set multicast loop v6: {e}");
1106 }
1107 }
1108 }
1109
1110 fn notify_monitors(&mut self, event: DaemonEvent) {
1111 self.monitors.retain(|sender| {
1113 if let Err(e) = sender.try_send(event.clone()) {
1114 debug!("notify_monitors: try_send: {}", &e);
1115 if matches!(e, TrySendError::Disconnected(_)) {
1116 return false; }
1118 }
1119 true
1120 });
1121 }
1122
1123 fn del_addr_in_my_services(&mut self, addr: &IpAddr) {
1125 for (_, service_info) in self.my_services.iter_mut() {
1126 if service_info.is_addr_auto() {
1127 service_info.remove_ipaddr(addr);
1128 }
1129 }
1130 }
1131
1132 fn add_poll(&mut self, intf: Interface) -> usize {
1134 Self::add_poll_impl(&mut self.poll_ids, &mut self.poll_id_count, intf)
1135 }
1136
1137 fn add_poll_impl(
1141 poll_ids: &mut HashMap<usize, Interface>,
1142 poll_id_count: &mut usize,
1143 intf: Interface,
1144 ) -> usize {
1145 let key = *poll_id_count;
1146 *poll_id_count += 1;
1147 let _ = (*poll_ids).insert(key, intf);
1148 key
1149 }
1150
1151 fn add_timer(&mut self, next_time: u64) {
1152 self.timers.push(Reverse(next_time));
1153 }
1154
1155 fn peek_earliest_timer(&self) -> Option<u64> {
1156 self.timers.peek().map(|Reverse(v)| *v)
1157 }
1158
1159 fn _pop_earliest_timer(&mut self) -> Option<u64> {
1160 self.timers.pop().map(|Reverse(v)| v)
1161 }
1162
1163 fn pop_timers_till(&mut self, now: u64) {
1165 while let Some(Reverse(v)) = self.timers.peek() {
1166 if *v > now {
1167 break;
1168 }
1169 self.timers.pop();
1170 }
1171 }
1172
1173 fn selected_addrs(&self, interfaces: Vec<Interface>) -> HashSet<IpAddr> {
1175 let intf_count = interfaces.len();
1176 let mut intf_selections = vec![true; intf_count];
1177
1178 for selection in self.if_selections.iter() {
1180 for i in 0..intf_count {
1182 if selection.if_kind.matches(&interfaces[i]) {
1183 intf_selections[i] = selection.selected;
1184 }
1185 }
1186 }
1187
1188 let mut selected_addrs = HashSet::new();
1189 for i in 0..intf_count {
1190 if intf_selections[i] {
1191 selected_addrs.insert(interfaces[i].addr.ip());
1192 }
1193 }
1194
1195 selected_addrs
1196 }
1197
1198 fn apply_intf_selections(&mut self, interfaces: Vec<Interface>) {
1203 let intf_count = interfaces.len();
1205 let mut intf_selections = vec![true; intf_count];
1206
1207 for selection in self.if_selections.iter() {
1209 for i in 0..intf_count {
1211 if selection.if_kind.matches(&interfaces[i]) {
1212 intf_selections[i] = selection.selected;
1213 }
1214 }
1215 }
1216
1217 for (idx, intf) in interfaces.into_iter().enumerate() {
1219 if intf_selections[idx] {
1220 if !self.intf_socks.contains_key(&intf) {
1222 debug!("apply_intf_selections: add {:?}", &intf.ip());
1223 self.add_new_interface(intf);
1224 }
1225 } else {
1226 if let Some(mut sock) = self.intf_socks.remove(&intf) {
1228 match self.poller.registry().deregister(&mut sock) {
1229 Ok(()) => debug!("apply_intf_selections: deregister {:?}", &intf.ip()),
1230 Err(e) => debug!("apply_intf_selections: poller.delete {:?}: {}", &intf, e),
1231 }
1232
1233 self.poll_ids.retain(|_, v| v != &intf);
1235
1236 self.cache.remove_addrs_on_disabled_intf(&intf);
1238 }
1239 }
1240 }
1241 }
1242
1243 fn check_ip_changes(&mut self) {
1245 let my_ifaddrs = my_ip_interfaces(true);
1247
1248 let poll_ids = &mut self.poll_ids;
1249 let poller = &mut self.poller;
1250 let deleted_addrs = self
1252 .intf_socks
1253 .iter_mut()
1254 .filter_map(|(intf, sock)| {
1255 if !my_ifaddrs.contains(intf) {
1256 if let Err(e) = poller.registry().deregister(sock) {
1257 debug!("check_ip_changes: poller.delete {:?}: {}", intf, e);
1258 }
1259 poll_ids.retain(|_, v| v != intf);
1261 Some(intf.ip())
1262 } else {
1263 None
1264 }
1265 })
1266 .collect::<Vec<IpAddr>>();
1267
1268 for ip in deleted_addrs.iter() {
1270 self.del_addr_in_my_services(ip);
1271 self.notify_monitors(DaemonEvent::IpDel(*ip));
1272 }
1273
1274 self.intf_socks.retain(|intf, _| my_ifaddrs.contains(intf));
1276
1277 self.apply_intf_selections(my_ifaddrs);
1279 }
1280
1281 fn add_new_interface(&mut self, intf: Interface) {
1282 let new_ip = intf.ip();
1284 let should_loop = if new_ip.is_ipv4() {
1285 self.multicast_loop_v4
1286 } else {
1287 self.multicast_loop_v6
1288 };
1289 let mut sock = match new_socket_bind(&intf, should_loop) {
1290 Ok(s) => s,
1291 Err(e) => {
1292 debug!("bind a socket to {}: {}. Skipped.", &intf.ip(), e);
1293 return;
1294 }
1295 };
1296
1297 let key = self.add_poll(intf.clone());
1299 if let Err(e) =
1300 self.poller
1301 .registry()
1302 .register(&mut sock, mio::Token(key), mio::Interest::READABLE)
1303 {
1304 debug!("check_ip_changes: poller add ip {}: {}", new_ip, e);
1305 return;
1306 }
1307
1308 debug!("add new interface {}: {new_ip}", intf.name);
1309 let dns_registry = match self.dns_registry_map.get_mut(&intf) {
1310 Some(registry) => registry,
1311 None => self
1312 .dns_registry_map
1313 .entry(intf.clone())
1314 .or_insert_with(DnsRegistry::new),
1315 };
1316
1317 for (_, service_info) in self.my_services.iter_mut() {
1318 if service_info.is_addr_auto() {
1319 service_info.insert_ipaddr(new_ip);
1320
1321 if announce_service_on_intf(dns_registry, service_info, &intf, &sock) {
1322 debug!(
1323 "Announce service {} on {}",
1324 service_info.get_fullname(),
1325 intf.ip()
1326 );
1327 service_info.set_status(&intf, ServiceStatus::Announced);
1328 } else {
1329 for timer in dns_registry.new_timers.drain(..) {
1330 self.timers.push(Reverse(timer));
1331 }
1332 service_info.set_status(&intf, ServiceStatus::Probing);
1333 }
1334 }
1335 }
1336
1337 self.intf_socks.insert(intf, sock);
1338
1339 self.notify_monitors(DaemonEvent::IpAdd(new_ip));
1341 }
1342
1343 fn register_service(&mut self, mut info: ServiceInfo) {
1352 if let Err(e) = check_service_name_length(info.get_type(), self.service_name_len_max) {
1354 debug!("check_service_name_length: {}", &e);
1355 self.notify_monitors(DaemonEvent::Error(e));
1356 return;
1357 }
1358
1359 if info.is_addr_auto() {
1360 let selected_addrs = self.selected_addrs(my_ip_interfaces(true));
1361 for addr in selected_addrs {
1362 info.insert_ipaddr(addr);
1363 }
1364 }
1365
1366 debug!("register service {:?}", &info);
1367
1368 let outgoing_addrs = self.send_unsolicited_response(&mut info);
1369 if !outgoing_addrs.is_empty() {
1370 self.notify_monitors(DaemonEvent::Announce(
1371 info.get_fullname().to_string(),
1372 format!("{:?}", &outgoing_addrs),
1373 ));
1374 }
1375
1376 let service_fullname = info.get_fullname().to_lowercase();
1379 self.my_services.insert(service_fullname, info);
1380 }
1381
1382 fn send_unsolicited_response(&mut self, info: &mut ServiceInfo) -> Vec<IpAddr> {
1385 let mut outgoing_addrs = Vec::new();
1386 let mut multicast_sent_trackers = HashSet::new();
1388
1389 let mut outgoing_intfs = Vec::new();
1390
1391 for (intf, sock) in self.intf_socks.iter() {
1392 if let Some(tracker) = multicast_send_tracker(intf) {
1393 if multicast_sent_trackers.contains(&tracker) {
1394 continue; }
1396 }
1397
1398 let dns_registry = match self.dns_registry_map.get_mut(intf) {
1399 Some(registry) => registry,
1400 None => self
1401 .dns_registry_map
1402 .entry(intf.clone())
1403 .or_insert_with(DnsRegistry::new),
1404 };
1405
1406 if announce_service_on_intf(dns_registry, info, intf, sock) {
1407 if let Some(tracker) = multicast_send_tracker(intf) {
1408 multicast_sent_trackers.insert(tracker);
1409 }
1410 outgoing_addrs.push(intf.ip());
1411 outgoing_intfs.push(intf.clone());
1412
1413 debug!("Announce service {} on {}", info.get_fullname(), intf.ip());
1414
1415 info.set_status(intf, ServiceStatus::Announced);
1416 } else {
1417 for timer in dns_registry.new_timers.drain(..) {
1418 self.timers.push(Reverse(timer));
1419 }
1420 info.set_status(intf, ServiceStatus::Probing);
1421 }
1422 }
1423
1424 let next_time = current_time_millis() + 1000;
1428 for intf in outgoing_intfs {
1429 self.add_retransmission(
1430 next_time,
1431 Command::RegisterResend(info.get_fullname().to_string(), intf),
1432 );
1433 }
1434
1435 outgoing_addrs
1436 }
1437
1438 fn probing_handler(&mut self) {
1440 let now = current_time_millis();
1441
1442 for (intf, sock) in self.intf_socks.iter() {
1443 let Some(dns_registry) = self.dns_registry_map.get_mut(intf) else {
1444 continue;
1445 };
1446
1447 let (out, expired_probes) = check_probing(dns_registry, &mut self.timers, now);
1448
1449 if !out.questions().is_empty() {
1451 debug!("sending out probing of {} questions", out.questions().len());
1452 send_dns_outgoing(&out, intf, sock);
1453 }
1454
1455 let waiting_services =
1457 handle_expired_probes(expired_probes, &intf.name, dns_registry, &mut self.monitors);
1458
1459 for service_name in waiting_services {
1460 debug!(
1461 "try to announce service {service_name} on intf {}",
1462 intf.ip()
1463 );
1464 if let Some(info) = self.my_services.get_mut(&service_name.to_lowercase()) {
1466 if info.get_status(intf) == ServiceStatus::Announced {
1467 debug!("service {} already announced", info.get_fullname());
1468 continue;
1469 }
1470
1471 if announce_service_on_intf(dns_registry, info, intf, sock) {
1472 let next_time = now + 1000;
1473 let command =
1474 Command::RegisterResend(info.get_fullname().to_string(), intf.clone());
1475 self.retransmissions.push(ReRun { next_time, command });
1476 self.timers.push(Reverse(next_time));
1477
1478 let fullname = match dns_registry.name_changes.get(&service_name) {
1479 Some(new_name) => new_name.to_string(),
1480 None => service_name.to_string(),
1481 };
1482
1483 let mut hostname = info.get_hostname();
1484 if let Some(new_name) = dns_registry.name_changes.get(hostname) {
1485 hostname = new_name;
1486 }
1487
1488 debug!("wake up: announce service {} on {}", fullname, intf.ip());
1489 notify_monitors(
1490 &mut self.monitors,
1491 DaemonEvent::Announce(fullname, format!("{}:{}", hostname, &intf.ip())),
1492 );
1493
1494 info.set_status(intf, ServiceStatus::Announced);
1495 }
1496 }
1497 }
1498 }
1499 }
1500
1501 fn unregister_service(
1502 &self,
1503 info: &ServiceInfo,
1504 intf: &Interface,
1505 sock: &MioUdpSocket,
1506 ) -> Vec<u8> {
1507 let mut out = DnsOutgoing::new(FLAGS_QR_RESPONSE | FLAGS_AA);
1508 out.add_answer_at_time(
1509 DnsPointer::new(
1510 info.get_type(),
1511 RRType::PTR,
1512 CLASS_IN,
1513 0,
1514 info.get_fullname().to_string(),
1515 ),
1516 0,
1517 );
1518
1519 if let Some(sub) = info.get_subtype() {
1520 trace!("Adding subdomain {}", sub);
1521 out.add_answer_at_time(
1522 DnsPointer::new(
1523 sub,
1524 RRType::PTR,
1525 CLASS_IN,
1526 0,
1527 info.get_fullname().to_string(),
1528 ),
1529 0,
1530 );
1531 }
1532
1533 out.add_answer_at_time(
1534 DnsSrv::new(
1535 info.get_fullname(),
1536 CLASS_IN | CLASS_CACHE_FLUSH,
1537 0,
1538 info.get_priority(),
1539 info.get_weight(),
1540 info.get_port(),
1541 info.get_hostname().to_string(),
1542 ),
1543 0,
1544 );
1545 out.add_answer_at_time(
1546 DnsTxt::new(
1547 info.get_fullname(),
1548 CLASS_IN | CLASS_CACHE_FLUSH,
1549 0,
1550 info.generate_txt(),
1551 ),
1552 0,
1553 );
1554
1555 for address in info.get_addrs_on_intf(intf) {
1556 out.add_answer_at_time(
1557 DnsAddress::new(
1558 info.get_hostname(),
1559 ip_address_rr_type(&address),
1560 CLASS_IN | CLASS_CACHE_FLUSH,
1561 0,
1562 address,
1563 intf.into(),
1564 ),
1565 0,
1566 );
1567 }
1568
1569 send_dns_outgoing(&out, intf, sock).remove(0)
1571 }
1572
1573 fn add_hostname_resolver(
1577 &mut self,
1578 hostname: String,
1579 listener: Sender<HostnameResolutionEvent>,
1580 timeout: Option<u64>,
1581 ) {
1582 let real_timeout = timeout.map(|t| current_time_millis() + t);
1583 self.hostname_resolvers
1584 .insert(hostname.to_lowercase(), (listener, real_timeout));
1585 if let Some(t) = real_timeout {
1586 self.add_timer(t);
1587 }
1588 }
1589
1590 fn send_query(&self, name: &str, qtype: RRType) {
1592 self.send_query_vec(&[(name, qtype)]);
1593 }
1594
1595 fn send_query_vec(&self, questions: &[(&str, RRType)]) {
1597 trace!("Sending query questions: {:?}", questions);
1598 let mut out = DnsOutgoing::new(FLAGS_QR_QUERY);
1599 let now = current_time_millis();
1600
1601 for (name, qtype) in questions {
1602 out.add_question(name, *qtype);
1603
1604 for record in self.cache.get_known_answers(name, *qtype, now) {
1605 trace!("add known answer: {:?}", record);
1613 let mut new_record = record.clone();
1614 new_record.get_record_mut().update_ttl(now);
1615 out.add_answer_box(new_record);
1616 }
1617 }
1618
1619 let mut multicast_sent_trackers = HashSet::new();
1621 for (intf, sock) in self.intf_socks.iter() {
1622 if let Some(tracker) = multicast_send_tracker(intf) {
1623 if multicast_sent_trackers.contains(&tracker) {
1624 continue; }
1626 multicast_sent_trackers.insert(tracker);
1627 }
1628 send_dns_outgoing(&out, intf, sock);
1629 }
1630 }
1631
1632 fn handle_read(&mut self, intf: &Interface) -> bool {
1637 let sock = match self.intf_socks.get_mut(intf) {
1638 Some(if_sock) => if_sock,
1639 None => return false,
1640 };
1641 let mut buf = vec![0u8; MAX_MSG_ABSOLUTE];
1642
1643 let sz = match sock.recv(&mut buf) {
1650 Ok(sz) => sz,
1651 Err(e) => {
1652 if e.kind() != std::io::ErrorKind::WouldBlock {
1653 debug!("listening socket read failed: {}", e);
1654 }
1655 return false;
1656 }
1657 };
1658
1659 trace!("received {} bytes at IP: {}", sz, intf.ip());
1660
1661 if sz == 0 {
1663 debug!("socket {:?} was likely shutdown", &sock);
1664 if let Err(e) = self.poller.registry().deregister(sock) {
1665 debug!("failed to remove sock {:?} from poller: {}", sock, &e);
1666 }
1667
1668 let should_loop = if intf.ip().is_ipv4() {
1670 self.multicast_loop_v4
1671 } else {
1672 self.multicast_loop_v6
1673 };
1674 match new_socket_bind(intf, should_loop) {
1675 Ok(new_sock) => {
1676 trace!("reset socket for IP {}", intf.ip());
1677 self.intf_socks.insert(intf.clone(), new_sock);
1678 }
1679 Err(e) => debug!("re-bind a socket to {:?}: {}", intf, e),
1680 }
1681
1682 return false;
1683 }
1684
1685 buf.truncate(sz); match DnsIncoming::new(buf, intf.into()) {
1688 Ok(msg) => {
1689 if msg.is_query() {
1690 self.handle_query(msg, intf);
1691 } else if msg.is_response() {
1692 self.handle_response(msg, intf);
1693 } else {
1694 debug!("Invalid message: not query and not response");
1695 }
1696 }
1697 Err(e) => debug!("Invalid incoming DNS message: {}", e),
1698 }
1699
1700 true
1701 }
1702
1703 fn query_unresolved(&mut self, instance: &str) -> bool {
1705 if !valid_instance_name(instance) {
1706 trace!("instance name {} not valid", instance);
1707 return false;
1708 }
1709
1710 if let Some(records) = self.cache.get_srv(instance) {
1711 for record in records {
1712 if let Some(srv) = record.any().downcast_ref::<DnsSrv>() {
1713 if self.cache.get_addr(srv.host()).is_none() {
1714 self.send_query_vec(&[(srv.host(), RRType::A), (srv.host(), RRType::AAAA)]);
1715 return true;
1716 }
1717 }
1718 }
1719 } else {
1720 self.send_query(instance, RRType::ANY);
1721 return true;
1722 }
1723
1724 false
1725 }
1726
1727 fn query_cache_for_service(
1730 &mut self,
1731 ty_domain: &str,
1732 sender: &Sender<ServiceEvent>,
1733 now: u64,
1734 ) {
1735 let mut resolved: HashSet<String> = HashSet::new();
1736 let mut unresolved: HashSet<String> = HashSet::new();
1737
1738 if let Some(records) = self.cache.get_ptr(ty_domain) {
1739 for record in records.iter().filter(|r| !r.expires_soon(now)) {
1740 if let Some(ptr) = record.any().downcast_ref::<DnsPointer>() {
1741 let info = match self.create_service_info_from_cache(ty_domain, ptr.alias()) {
1742 Ok(ok) => ok,
1743 Err(err) => {
1744 debug!("Error while creating service info from cache: {}", err);
1745 continue;
1746 }
1747 };
1748
1749 match sender.send(ServiceEvent::ServiceFound(
1750 ty_domain.to_string(),
1751 ptr.alias().to_string(),
1752 )) {
1753 Ok(()) => debug!("send service found {}", ptr.alias()),
1754 Err(e) => {
1755 debug!("failed to send service found: {}", e);
1756 continue;
1757 }
1758 }
1759
1760 if info.is_ready() {
1761 resolved.insert(ptr.alias().to_string());
1762 match sender.send(ServiceEvent::ServiceResolved(info)) {
1763 Ok(()) => debug!("sent service resolved: {}", ptr.alias()),
1764 Err(e) => debug!("failed to send service resolved: {}", e),
1765 }
1766 } else {
1767 unresolved.insert(ptr.alias().to_string());
1768 }
1769 }
1770 }
1771 }
1772
1773 for instance in resolved.drain() {
1774 self.pending_resolves.remove(&instance);
1775 self.resolved.insert(instance);
1776 }
1777
1778 for instance in unresolved.drain() {
1779 self.add_pending_resolve(instance);
1780 }
1781 }
1782
1783 fn query_cache_for_hostname(
1786 &mut self,
1787 hostname: &str,
1788 sender: Sender<HostnameResolutionEvent>,
1789 ) {
1790 let addresses_map = self.cache.get_addresses_for_host(hostname);
1791 for (name, addresses) in addresses_map {
1792 match sender.send(HostnameResolutionEvent::AddressesFound(name, addresses)) {
1793 Ok(()) => trace!("sent hostname addresses found"),
1794 Err(e) => debug!("failed to send hostname addresses found: {}", e),
1795 }
1796 }
1797 }
1798
1799 fn add_pending_resolve(&mut self, instance: String) {
1800 if !self.pending_resolves.contains(&instance) {
1801 let next_time = current_time_millis() + RESOLVE_WAIT_IN_MILLIS;
1802 self.add_retransmission(next_time, Command::Resolve(instance.clone(), 1));
1803 self.pending_resolves.insert(instance);
1804 }
1805 }
1806
1807 fn create_service_info_from_cache(
1808 &self,
1809 ty_domain: &str,
1810 fullname: &str,
1811 ) -> Result<ServiceInfo> {
1812 let my_name = {
1813 let name = fullname.trim_end_matches(split_sub_domain(ty_domain).0);
1814 name.strip_suffix('.').unwrap_or(name).to_string()
1815 };
1816
1817 let now = current_time_millis();
1818 let mut info = ServiceInfo::new(ty_domain, &my_name, "", (), 0, None)?;
1819
1820 if let Some(subtype) = self.cache.get_subtype(fullname) {
1822 trace!(
1823 "ty_domain: {} found subtype {} for instance: {}",
1824 ty_domain,
1825 subtype,
1826 fullname
1827 );
1828 if info.get_subtype().is_none() {
1829 info.set_subtype(subtype.clone());
1830 }
1831 }
1832
1833 if let Some(records) = self.cache.get_srv(fullname) {
1835 if let Some(answer) = records.iter().find(|r| !r.expires_soon(now)) {
1836 if let Some(dns_srv) = answer.any().downcast_ref::<DnsSrv>() {
1837 info.set_hostname(dns_srv.host().to_string());
1838 info.set_port(dns_srv.port());
1839 }
1840 }
1841 }
1842
1843 if let Some(records) = self.cache.get_txt(fullname) {
1845 if let Some(record) = records.iter().find(|r| !r.expires_soon(now)) {
1846 if let Some(dns_txt) = record.any().downcast_ref::<DnsTxt>() {
1847 info.set_properties_from_txt(dns_txt.text());
1848 }
1849 }
1850 }
1851
1852 if let Some(records) = self.cache.get_addr(info.get_hostname()) {
1854 for answer in records.iter() {
1855 if let Some(dns_a) = answer.any().downcast_ref::<DnsAddress>() {
1856 if dns_a.expires_soon(now) {
1857 trace!("Addr expired or expires soon: {}", dns_a.address());
1858 } else {
1859 info.insert_ipaddr(dns_a.address());
1860 }
1861 }
1862 }
1863 }
1864
1865 Ok(info)
1866 }
1867
1868 fn handle_poller_events(&mut self, events: &mio::Events) {
1869 for ev in events.iter() {
1870 trace!("event received with key {:?}", ev.token());
1871 if ev.token().0 == SIGNAL_SOCK_EVENT_KEY {
1872 self.signal_sock_drain();
1874
1875 if let Err(e) = self.poller.registry().reregister(
1876 &mut self.signal_sock,
1877 ev.token(),
1878 mio::Interest::READABLE,
1879 ) {
1880 debug!("failed to modify poller for signal socket: {}", e);
1881 }
1882 continue; }
1884
1885 let intf = match self.poll_ids.get(&ev.token().0) {
1887 Some(interface) => interface.clone(),
1888 None => {
1889 debug!("Ip for event key {} not found", ev.token().0);
1890 break;
1891 }
1892 };
1893 while self.handle_read(&intf) {}
1894
1895 if let Some(sock) = self.intf_socks.get_mut(&intf) {
1897 if let Err(e) =
1898 self.poller
1899 .registry()
1900 .reregister(sock, ev.token(), mio::Interest::READABLE)
1901 {
1902 debug!("modify poller for interface {:?}: {}", &intf, e);
1903 break;
1904 }
1905 }
1906 }
1907 }
1908
1909 fn handle_response(&mut self, mut msg: DnsIncoming, intf: &Interface) {
1912 trace!(
1913 "handle_response: {} answers {} authorities {} additionals",
1914 msg.answers().len(),
1915 &msg.authorities().len(),
1916 &msg.num_additionals()
1917 );
1918 let now = current_time_millis();
1919
1920 let mut record_predicate = |record: &DnsRecordBox| {
1922 if !record.get_record().is_expired(now) {
1923 return true;
1924 }
1925
1926 debug!("record is expired, removing it from cache.");
1927 if self.cache.remove(record) {
1928 if let Some(dns_ptr) = record.any().downcast_ref::<DnsPointer>() {
1930 call_service_listener(
1931 &self.service_queriers,
1932 dns_ptr.get_name(),
1933 ServiceEvent::ServiceRemoved(
1934 dns_ptr.get_name().to_string(),
1935 dns_ptr.alias().to_string(),
1936 ),
1937 );
1938 }
1939 }
1940 false
1941 };
1942 msg.answers_mut().retain(&mut record_predicate);
1943 msg.authorities_mut().retain(&mut record_predicate);
1944 msg.additionals_mut().retain(&mut record_predicate);
1945
1946 self.conflict_handler(&msg, intf);
1948
1949 let mut is_for_us = true; for answer in msg.answers() {
1956 if answer.get_type() == RRType::PTR {
1957 if self.service_queriers.contains_key(answer.get_name()) {
1958 is_for_us = true;
1959 break; } else {
1961 is_for_us = false;
1962 }
1963 } else if answer.get_type() == RRType::A || answer.get_type() == RRType::AAAA {
1964 let answer_lowercase = answer.get_name().to_lowercase();
1966 if self.hostname_resolvers.contains_key(&answer_lowercase) {
1967 is_for_us = true;
1968 break; }
1970 }
1971 }
1972
1973 struct InstanceChange {
1975 ty: RRType, name: String, }
1978
1979 let mut changes = Vec::new();
1987 let mut timers = Vec::new();
1988 for record in msg.all_records() {
1989 match self
1990 .cache
1991 .add_or_update(intf, record, &mut timers, is_for_us)
1992 {
1993 Some((dns_record, true)) => {
1994 timers.push(dns_record.get_record().get_expire_time());
1995 timers.push(dns_record.get_record().get_refresh_time());
1996
1997 let ty = dns_record.get_type();
1998 let name = dns_record.get_name();
1999
2000 if ty == RRType::PTR && dns_record.get_record().get_ttl() > 1 {
2002 if self.service_queriers.contains_key(name) {
2003 timers.push(dns_record.get_record().get_refresh_time());
2004 }
2005
2006 if let Some(dns_ptr) = dns_record.any().downcast_ref::<DnsPointer>() {
2008 call_service_listener(
2009 &self.service_queriers,
2010 name,
2011 ServiceEvent::ServiceFound(
2012 name.to_string(),
2013 dns_ptr.alias().to_string(),
2014 ),
2015 );
2016 changes.push(InstanceChange {
2017 ty,
2018 name: dns_ptr.alias().to_string(),
2019 });
2020 }
2021 } else {
2022 changes.push(InstanceChange {
2023 ty,
2024 name: name.to_string(),
2025 });
2026 }
2027 }
2028 Some((dns_record, false)) => {
2029 timers.push(dns_record.get_record().get_expire_time());
2030 timers.push(dns_record.get_record().get_refresh_time());
2031 }
2032 _ => {}
2033 }
2034 }
2035
2036 for t in timers {
2038 self.add_timer(t);
2039 }
2040
2041 for change in changes
2043 .iter()
2044 .filter(|change| change.ty == RRType::A || change.ty == RRType::AAAA)
2045 {
2046 let addr_map = self.cache.get_addresses_for_host(&change.name);
2047 for (name, addresses) in addr_map {
2048 call_hostname_resolution_listener(
2049 &self.hostname_resolvers,
2050 &change.name,
2051 HostnameResolutionEvent::AddressesFound(name, addresses),
2052 )
2053 }
2054 }
2055
2056 let mut updated_instances = HashSet::new();
2058 for update in changes {
2059 match update.ty {
2060 RRType::PTR | RRType::SRV | RRType::TXT => {
2061 updated_instances.insert(update.name);
2062 }
2063 RRType::A | RRType::AAAA => {
2064 let instances = self.cache.get_instances_on_host(&update.name);
2065 updated_instances.extend(instances);
2066 }
2067 _ => {}
2068 }
2069 }
2070
2071 self.resolve_updated_instances(&updated_instances);
2072 }
2073
2074 fn conflict_handler(&mut self, msg: &DnsIncoming, intf: &Interface) {
2075 let Some(dns_registry) = self.dns_registry_map.get_mut(intf) else {
2076 return;
2077 };
2078
2079 for answer in msg.answers().iter() {
2080 let mut new_records = Vec::new();
2081
2082 let name = answer.get_name();
2083 let Some(probe) = dns_registry.probing.get_mut(name) else {
2084 continue;
2085 };
2086
2087 if answer.get_type() == RRType::A || answer.get_type() == RRType::AAAA {
2089 if let Some(answer_addr) = answer.any().downcast_ref::<DnsAddress>() {
2090 if !valid_ip_on_intf(&answer_addr.address(), intf) {
2091 debug!(
2092 "conflict handler: answer addr {:?} not in the subnet of {:?}",
2093 answer_addr, intf
2094 );
2095 continue;
2096 }
2097 }
2098
2099 let any_match = probe.records.iter().any(|r| {
2102 r.get_type() == answer.get_type()
2103 && r.get_class() == answer.get_class()
2104 && r.rrdata_match(answer.as_ref())
2105 });
2106 if any_match {
2107 continue; }
2109 }
2110
2111 probe.records.retain(|record| {
2112 if record.get_type() == answer.get_type()
2113 && record.get_class() == answer.get_class()
2114 && !record.rrdata_match(answer.as_ref())
2115 {
2116 debug!(
2117 "found conflict name: '{name}' record: {}: {} PEER: {}",
2118 record.get_type(),
2119 record.rdata_print(),
2120 answer.rdata_print()
2121 );
2122
2123 let mut new_record = record.clone();
2126 let new_name = match record.get_type() {
2127 RRType::A => hostname_change(name),
2128 RRType::AAAA => hostname_change(name),
2129 _ => name_change(name),
2130 };
2131 new_record.get_record_mut().set_new_name(new_name);
2132 new_records.push(new_record);
2133 return false; }
2135
2136 true
2137 });
2138
2139 let create_time = current_time_millis() + fastrand::u64(0..250);
2146
2147 let waiting_services = probe.waiting_services.clone();
2148
2149 for record in new_records {
2150 if dns_registry.update_hostname(name, record.get_name(), create_time) {
2151 self.timers.push(Reverse(create_time));
2152 }
2153
2154 dns_registry.name_changes.insert(
2156 record.get_record().get_original_name().to_string(),
2157 record.get_name().to_string(),
2158 );
2159
2160 let new_probe = match dns_registry.probing.get_mut(record.get_name()) {
2161 Some(p) => p,
2162 None => {
2163 let new_probe = dns_registry
2164 .probing
2165 .entry(record.get_name().to_string())
2166 .or_insert_with(|| {
2167 debug!("conflict handler: new probe of {}", record.get_name());
2168 Probe::new(create_time)
2169 });
2170 self.timers.push(Reverse(new_probe.next_send));
2171 new_probe
2172 }
2173 };
2174
2175 debug!(
2176 "insert record with new name '{}' {} into probe",
2177 record.get_name(),
2178 record.get_type()
2179 );
2180 new_probe.insert_record(record);
2181
2182 new_probe.waiting_services.extend(waiting_services.clone());
2183 }
2184 }
2185 }
2186
2187 fn resolve_updated_instances(&mut self, updated_instances: &HashSet<String>) {
2194 let mut resolved: HashSet<String> = HashSet::new();
2195 let mut unresolved: HashSet<String> = HashSet::new();
2196 let mut removed_instances = HashMap::new();
2197
2198 let now = current_time_millis();
2199
2200 for (ty_domain, records) in self.cache.all_ptr().iter() {
2201 if !self.service_queriers.contains_key(ty_domain) {
2202 continue;
2204 }
2205
2206 for record in records.iter().filter(|r| !r.expires_soon(now)) {
2207 if let Some(dns_ptr) = record.any().downcast_ref::<DnsPointer>() {
2208 if updated_instances.contains(dns_ptr.alias()) {
2209 if let Ok(info) =
2210 self.create_service_info_from_cache(ty_domain, dns_ptr.alias())
2211 {
2212 if info.is_ready() {
2213 debug!("call queriers to resolve {}", dns_ptr.alias());
2214 resolved.insert(dns_ptr.alias().to_string());
2215 call_service_listener(
2216 &self.service_queriers,
2217 ty_domain,
2218 ServiceEvent::ServiceResolved(info),
2219 );
2220 } else {
2221 if self.resolved.remove(dns_ptr.alias()) {
2222 removed_instances
2223 .entry(ty_domain.to_string())
2224 .or_insert_with(HashSet::new)
2225 .insert(dns_ptr.alias().to_string());
2226 }
2227 unresolved.insert(dns_ptr.alias().to_string());
2228 }
2229 }
2230 }
2231 }
2232 }
2233 }
2234
2235 for instance in resolved.drain() {
2236 self.pending_resolves.remove(&instance);
2237 self.resolved.insert(instance);
2238 }
2239
2240 for instance in unresolved.drain() {
2241 self.add_pending_resolve(instance);
2242 }
2243
2244 self.notify_service_removal(removed_instances);
2245 }
2246
2247 fn handle_query(&mut self, msg: DnsIncoming, intf: &Interface) {
2249 let sock = match self.intf_socks.get(intf) {
2250 Some(sock) => sock,
2251 None => return,
2252 };
2253 let mut out = DnsOutgoing::new(FLAGS_QR_RESPONSE | FLAGS_AA);
2254
2255 const META_QUERY: &str = "_services._dns-sd._udp.local.";
2258
2259 let Some(dns_registry) = self.dns_registry_map.get_mut(intf) else {
2260 debug!("missing dns registry for intf {}", intf.ip());
2261 return;
2262 };
2263
2264 for question in msg.questions().iter() {
2265 trace!("query question: {:?}", &question);
2266
2267 let qtype = question.entry_type();
2268
2269 if qtype == RRType::PTR {
2270 for service in self.my_services.values() {
2271 if service.get_status(intf) != ServiceStatus::Announced {
2272 continue;
2273 }
2274
2275 if question.entry_name() == service.get_type()
2276 || service
2277 .get_subtype()
2278 .as_ref()
2279 .is_some_and(|v| v == question.entry_name())
2280 {
2281 add_answer_with_additionals(&mut out, &msg, service, intf, dns_registry);
2282 } else if question.entry_name() == META_QUERY {
2283 let ptr_added = out.add_answer(
2284 &msg,
2285 DnsPointer::new(
2286 question.entry_name(),
2287 RRType::PTR,
2288 CLASS_IN,
2289 service.get_other_ttl(),
2290 service.get_type().to_string(),
2291 ),
2292 );
2293 if !ptr_added {
2294 trace!("answer was not added for meta-query {:?}", &question);
2295 }
2296 }
2297 }
2298 } else {
2299 if qtype == RRType::ANY && msg.num_authorities() > 0 {
2301 let probe_name = question.entry_name();
2302
2303 if let Some(probe) = dns_registry.probing.get_mut(probe_name) {
2304 let now = current_time_millis();
2305
2306 if probe.start_time < now {
2310 let incoming_records: Vec<_> = msg
2311 .authorities()
2312 .iter()
2313 .filter(|r| r.get_name() == probe_name)
2314 .collect();
2315
2316 probe.tiebreaking(&incoming_records, now, probe_name);
2317 }
2318 }
2319 }
2320
2321 if qtype == RRType::A || qtype == RRType::AAAA || qtype == RRType::ANY {
2322 for service in self.my_services.values() {
2323 if service.get_status(intf) != ServiceStatus::Announced {
2324 continue;
2325 }
2326
2327 let service_hostname =
2328 match dns_registry.name_changes.get(service.get_hostname()) {
2329 Some(new_name) => new_name,
2330 None => service.get_hostname(),
2331 };
2332
2333 if service_hostname.to_lowercase() == question.entry_name().to_lowercase() {
2334 let intf_addrs = service.get_addrs_on_intf(intf);
2335 if intf_addrs.is_empty()
2336 && (qtype == RRType::A || qtype == RRType::AAAA)
2337 {
2338 let t = match qtype {
2339 RRType::A => "TYPE_A",
2340 RRType::AAAA => "TYPE_AAAA",
2341 _ => "invalid_type",
2342 };
2343 trace!(
2344 "Cannot find valid addrs for {} response on intf {:?}",
2345 t,
2346 &intf
2347 );
2348 return;
2349 }
2350 for address in intf_addrs {
2351 out.add_answer(
2352 &msg,
2353 DnsAddress::new(
2354 service_hostname,
2355 ip_address_rr_type(&address),
2356 CLASS_IN | CLASS_CACHE_FLUSH,
2357 service.get_host_ttl(),
2358 address,
2359 intf.into(),
2360 ),
2361 );
2362 }
2363 }
2364 }
2365 }
2366
2367 let query_name = question.entry_name().to_lowercase();
2368 let service_opt = self
2369 .my_services
2370 .iter()
2371 .find(|(k, _v)| {
2372 let service_name = match dns_registry.name_changes.get(k.as_str()) {
2373 Some(new_name) => new_name,
2374 None => k,
2375 };
2376 service_name == &query_name
2377 })
2378 .map(|(_, v)| v);
2379
2380 let Some(service) = service_opt else {
2381 continue;
2382 };
2383
2384 if service.get_status(intf) != ServiceStatus::Announced {
2385 continue;
2386 }
2387
2388 add_answer_of_service(&mut out, &msg, question.entry_name(), service, qtype, intf);
2389 }
2390 }
2391
2392 if !out.answers_count() > 0 {
2393 out.set_id(msg.id());
2394 send_dns_outgoing(&out, intf, sock);
2395
2396 self.increase_counter(Counter::Respond, 1);
2397 self.notify_monitors(DaemonEvent::Respond(intf.ip()));
2398 }
2399
2400 self.increase_counter(Counter::KnownAnswerSuppression, out.known_answer_count());
2401 }
2402
2403 fn increase_counter(&mut self, counter: Counter, count: i64) {
2405 let key = counter.to_string();
2406 match self.counters.get_mut(&key) {
2407 Some(v) => *v += count,
2408 None => {
2409 self.counters.insert(key, count);
2410 }
2411 }
2412 }
2413
2414 fn set_counter(&mut self, counter: Counter, count: i64) {
2416 let key = counter.to_string();
2417 self.counters.insert(key, count);
2418 }
2419
2420 fn signal_sock_drain(&self) {
2421 let mut signal_buf = [0; 1024];
2422
2423 while let Ok(sz) = self.signal_sock.recv(&mut signal_buf) {
2425 trace!(
2426 "signal socket recvd: {}",
2427 String::from_utf8_lossy(&signal_buf[0..sz])
2428 );
2429 }
2430 }
2431
2432 fn add_retransmission(&mut self, next_time: u64, command: Command) {
2433 self.retransmissions.push(ReRun { next_time, command });
2434 self.add_timer(next_time);
2435 }
2436
2437 fn notify_service_removal(&self, expired: HashMap<String, HashSet<String>>) {
2439 for (ty_domain, sender) in self.service_queriers.iter() {
2440 if let Some(instances) = expired.get(ty_domain) {
2441 for instance_name in instances {
2442 let event = ServiceEvent::ServiceRemoved(
2443 ty_domain.to_string(),
2444 instance_name.to_string(),
2445 );
2446 match sender.send(event) {
2447 Ok(()) => debug!("notify_service_removal: sent ServiceRemoved to listener of {ty_domain}: {instance_name}"),
2448 Err(e) => debug!("Failed to send event: {}", e),
2449 }
2450 }
2451 }
2452 }
2453 }
2454
2455 fn exec_command(&mut self, command: Command, repeating: bool) {
2459 match command {
2460 Command::Browse(ty, next_delay, listener) => {
2461 self.exec_command_browse(repeating, ty, next_delay, listener);
2462 }
2463
2464 Command::ResolveHostname(hostname, next_delay, listener, timeout) => {
2465 self.exec_command_resolve_hostname(
2466 repeating, hostname, next_delay, listener, timeout,
2467 );
2468 }
2469
2470 Command::Register(service_info) => {
2471 self.register_service(service_info);
2472 self.increase_counter(Counter::Register, 1);
2473 }
2474
2475 Command::RegisterResend(fullname, intf) => {
2476 trace!("register-resend service: {fullname} on {:?}", &intf.addr);
2477 self.exec_command_register_resend(fullname, intf);
2478 }
2479
2480 Command::Unregister(fullname, resp_s) => {
2481 trace!("unregister service {} repeat {}", &fullname, &repeating);
2482 self.exec_command_unregister(repeating, fullname, resp_s);
2483 }
2484
2485 Command::UnregisterResend(packet, ip) => {
2486 self.exec_command_unregister_resend(packet, ip);
2487 }
2488
2489 Command::StopBrowse(ty_domain) => self.exec_command_stop_browse(ty_domain),
2490
2491 Command::StopResolveHostname(hostname) => {
2492 self.exec_command_stop_resolve_hostname(hostname.to_lowercase())
2493 }
2494
2495 Command::Resolve(instance, try_count) => self.exec_command_resolve(instance, try_count),
2496
2497 Command::GetMetrics(resp_s) => self.exec_command_get_metrics(resp_s),
2498
2499 Command::GetStatus(resp_s) => match resp_s.send(self.status.clone()) {
2500 Ok(()) => trace!("Sent status to the client"),
2501 Err(e) => debug!("Failed to send status: {}", e),
2502 },
2503
2504 Command::Monitor(resp_s) => {
2505 self.monitors.push(resp_s);
2506 }
2507
2508 Command::SetOption(daemon_opt) => {
2509 self.process_set_option(daemon_opt);
2510 }
2511
2512 Command::GetOption(resp_s) => {
2513 let val = DaemonOptionVal {
2514 _service_name_len_max: self.service_name_len_max,
2515 ip_check_interval: self.ip_check_interval,
2516 };
2517 if let Err(e) = resp_s.send(val) {
2518 debug!("Failed to send options: {}", e);
2519 }
2520 }
2521
2522 Command::Verify(instance_fullname, timeout) => {
2523 self.exec_command_verify(instance_fullname, timeout, repeating);
2524 }
2525
2526 _ => {
2527 debug!("unexpected command: {:?}", &command);
2528 }
2529 }
2530 }
2531
2532 fn exec_command_get_metrics(&mut self, resp_s: Sender<HashMap<String, i64>>) {
2533 self.set_counter(Counter::CachedPTR, self.cache.ptr_count() as i64);
2534 self.set_counter(Counter::CachedSRV, self.cache.srv_count() as i64);
2535 self.set_counter(Counter::CachedAddr, self.cache.addr_count() as i64);
2536 self.set_counter(Counter::CachedTxt, self.cache.txt_count() as i64);
2537 self.set_counter(Counter::CachedNSec, self.cache.nsec_count() as i64);
2538 self.set_counter(Counter::CachedSubtype, self.cache.subtype_count() as i64);
2539 self.set_counter(Counter::Timer, self.timers.len() as i64);
2540
2541 let dns_registry_probe_count: usize = self
2542 .dns_registry_map
2543 .values()
2544 .map(|r| r.probing.len())
2545 .sum();
2546 self.set_counter(Counter::DnsRegistryProbe, dns_registry_probe_count as i64);
2547
2548 let dns_registry_active_count: usize = self
2549 .dns_registry_map
2550 .values()
2551 .map(|r| r.active.values().map(|a| a.len()).sum::<usize>())
2552 .sum();
2553 self.set_counter(Counter::DnsRegistryActive, dns_registry_active_count as i64);
2554
2555 let dns_registry_timer_count: usize = self
2556 .dns_registry_map
2557 .values()
2558 .map(|r| r.new_timers.len())
2559 .sum();
2560 self.set_counter(Counter::DnsRegistryTimer, dns_registry_timer_count as i64);
2561
2562 let dns_registry_name_change_count: usize = self
2563 .dns_registry_map
2564 .values()
2565 .map(|r| r.name_changes.len())
2566 .sum();
2567 self.set_counter(
2568 Counter::DnsRegistryNameChange,
2569 dns_registry_name_change_count as i64,
2570 );
2571
2572 if let Err(e) = resp_s.send(self.counters.clone()) {
2574 debug!("Failed to send metrics: {}", e);
2575 }
2576 }
2577
2578 fn exec_command_browse(
2579 &mut self,
2580 repeating: bool,
2581 ty: String,
2582 next_delay: u32,
2583 listener: Sender<ServiceEvent>,
2584 ) {
2585 let pretty_addrs: Vec<String> = self
2586 .intf_socks
2587 .keys()
2588 .map(|itf| format!("{} ({})", itf.ip(), itf.name))
2589 .collect();
2590
2591 if let Err(e) = listener.send(ServiceEvent::SearchStarted(format!(
2592 "{ty} on {} interfaces [{}]",
2593 pretty_addrs.len(),
2594 pretty_addrs.join(", ")
2595 ))) {
2596 debug!(
2597 "Failed to send SearchStarted({})(repeating:{}): {}",
2598 &ty, repeating, e
2599 );
2600 return;
2601 }
2602
2603 let now = current_time_millis();
2604 if !repeating {
2605 self.service_queriers.insert(ty.clone(), listener.clone());
2609
2610 self.query_cache_for_service(&ty, &listener, now);
2612 }
2613
2614 self.send_query(&ty, RRType::PTR);
2615 self.increase_counter(Counter::Browse, 1);
2616
2617 let next_time = now + (next_delay * 1000) as u64;
2618 let max_delay = 60 * 60;
2619 let delay = cmp::min(next_delay * 2, max_delay);
2620 self.add_retransmission(next_time, Command::Browse(ty, delay, listener));
2621 }
2622
2623 fn exec_command_resolve_hostname(
2624 &mut self,
2625 repeating: bool,
2626 hostname: String,
2627 next_delay: u32,
2628 listener: Sender<HostnameResolutionEvent>,
2629 timeout: Option<u64>,
2630 ) {
2631 let addr_list: Vec<_> = self.intf_socks.keys().collect();
2632 if let Err(e) = listener.send(HostnameResolutionEvent::SearchStarted(format!(
2633 "{} on addrs {:?}",
2634 &hostname, &addr_list
2635 ))) {
2636 debug!(
2637 "Failed to send ResolveStarted({})(repeating:{}): {}",
2638 &hostname, repeating, e
2639 );
2640 return;
2641 }
2642 if !repeating {
2643 self.add_hostname_resolver(hostname.to_owned(), listener.clone(), timeout);
2644 self.query_cache_for_hostname(&hostname, listener.clone());
2646 }
2647
2648 self.send_query_vec(&[(&hostname, RRType::A), (&hostname, RRType::AAAA)]);
2649 self.increase_counter(Counter::ResolveHostname, 1);
2650
2651 let now = current_time_millis();
2652 let next_time = now + u64::from(next_delay) * 1000;
2653 let max_delay = 60 * 60;
2654 let delay = cmp::min(next_delay * 2, max_delay);
2655
2656 if self
2658 .hostname_resolvers
2659 .get(&hostname)
2660 .and_then(|(_sender, timeout)| *timeout)
2661 .map(|timeout| next_time < timeout)
2662 .unwrap_or(true)
2663 {
2664 self.add_retransmission(
2665 next_time,
2666 Command::ResolveHostname(hostname, delay, listener, None),
2667 );
2668 }
2669 }
2670
2671 fn exec_command_resolve(&mut self, instance: String, try_count: u16) {
2672 let pending_query = self.query_unresolved(&instance);
2673 let max_try = 3;
2674 if pending_query && try_count < max_try {
2675 let next_time = current_time_millis() + RESOLVE_WAIT_IN_MILLIS;
2678 self.add_retransmission(next_time, Command::Resolve(instance, try_count + 1));
2679 }
2680 }
2681
2682 fn exec_command_unregister(
2683 &mut self,
2684 repeating: bool,
2685 fullname: String,
2686 resp_s: Sender<UnregisterStatus>,
2687 ) {
2688 let response = match self.my_services.remove_entry(&fullname) {
2689 None => {
2690 debug!("unregister: cannot find such service {}", &fullname);
2691 UnregisterStatus::NotFound
2692 }
2693 Some((_k, info)) => {
2694 let mut timers = Vec::new();
2695 let mut multicast_sent_trackers = HashSet::new();
2697
2698 for (intf, sock) in self.intf_socks.iter() {
2699 if let Some(tracker) = multicast_send_tracker(intf) {
2700 if multicast_sent_trackers.contains(&tracker) {
2701 continue; }
2703 multicast_sent_trackers.insert(tracker);
2704 }
2705 let packet = self.unregister_service(&info, intf, sock);
2706 if !repeating && !packet.is_empty() {
2708 let next_time = current_time_millis() + 120;
2709 self.retransmissions.push(ReRun {
2710 next_time,
2711 command: Command::UnregisterResend(packet, intf.clone()),
2712 });
2713 timers.push(next_time);
2714 }
2715 }
2716
2717 for t in timers {
2718 self.add_timer(t);
2719 }
2720
2721 self.increase_counter(Counter::Unregister, 1);
2722 UnregisterStatus::OK
2723 }
2724 };
2725 if let Err(e) = resp_s.send(response) {
2726 debug!("unregister: failed to send response: {}", e);
2727 }
2728 }
2729
2730 fn exec_command_unregister_resend(&mut self, packet: Vec<u8>, intf: Interface) {
2731 if let Some(sock) = self.intf_socks.get(&intf) {
2732 debug!("UnregisterResend from {}", &intf.ip());
2733 multicast_on_intf(&packet[..], &intf, sock);
2734 self.increase_counter(Counter::UnregisterResend, 1);
2735 }
2736 }
2737
2738 fn exec_command_stop_browse(&mut self, ty_domain: String) {
2739 match self.service_queriers.remove_entry(&ty_domain) {
2740 None => debug!("StopBrowse: cannot find querier for {}", &ty_domain),
2741 Some((ty, sender)) => {
2742 trace!("StopBrowse: removed queryer for {}", &ty);
2744 let mut i = 0;
2745 while i < self.retransmissions.len() {
2746 if let Command::Browse(t, _, _) = &self.retransmissions[i].command {
2747 if t == &ty {
2748 self.retransmissions.remove(i);
2749 trace!("StopBrowse: removed retransmission for {}", &ty);
2750 continue;
2751 }
2752 }
2753 i += 1;
2754 }
2755
2756 self.cache.remove_service_type(&ty_domain);
2758
2759 match sender.send(ServiceEvent::SearchStopped(ty_domain)) {
2761 Ok(()) => trace!("Sent SearchStopped to the listener"),
2762 Err(e) => debug!("Failed to send SearchStopped: {}", e),
2763 }
2764 }
2765 }
2766 }
2767
2768 fn exec_command_stop_resolve_hostname(&mut self, hostname: String) {
2769 if let Some((host, (sender, _timeout))) = self.hostname_resolvers.remove_entry(&hostname) {
2770 trace!("StopResolve: removed queryer for {}", &host);
2772 let mut i = 0;
2773 while i < self.retransmissions.len() {
2774 if let Command::Resolve(t, _) = &self.retransmissions[i].command {
2775 if t == &host {
2776 self.retransmissions.remove(i);
2777 trace!("StopResolve: removed retransmission for {}", &host);
2778 continue;
2779 }
2780 }
2781 i += 1;
2782 }
2783
2784 match sender.send(HostnameResolutionEvent::SearchStopped(hostname)) {
2786 Ok(()) => trace!("Sent SearchStopped to the listener"),
2787 Err(e) => debug!("Failed to send SearchStopped: {}", e),
2788 }
2789 }
2790 }
2791
2792 fn exec_command_register_resend(&mut self, fullname: String, intf: Interface) {
2793 let Some(info) = self.my_services.get_mut(&fullname) else {
2794 trace!("announce: cannot find such service {}", &fullname);
2795 return;
2796 };
2797
2798 let Some(dns_registry) = self.dns_registry_map.get_mut(&intf) else {
2799 return;
2800 };
2801
2802 let Some(sock) = self.intf_socks.get(&intf) else {
2803 return;
2804 };
2805
2806 if announce_service_on_intf(dns_registry, info, &intf, sock) {
2807 let mut hostname = info.get_hostname();
2808 if let Some(new_name) = dns_registry.name_changes.get(hostname) {
2809 hostname = new_name;
2810 }
2811 let service_name = match dns_registry.name_changes.get(&fullname) {
2812 Some(new_name) => new_name.to_string(),
2813 None => fullname,
2814 };
2815
2816 debug!("resend: announce service {} on {}", service_name, intf.ip());
2817
2818 notify_monitors(
2819 &mut self.monitors,
2820 DaemonEvent::Announce(service_name, format!("{}:{}", hostname, &intf.ip())),
2821 );
2822 info.set_status(&intf, ServiceStatus::Announced);
2823 } else {
2824 debug!("register-resend should not fail");
2825 }
2826
2827 self.increase_counter(Counter::RegisterResend, 1);
2828 }
2829
2830 fn exec_command_verify(&mut self, instance: String, timeout: Duration, repeating: bool) {
2831 let now = current_time_millis();
2841 let expire_at = if repeating {
2842 None
2843 } else {
2844 Some(now + timeout.as_millis() as u64)
2845 };
2846
2847 let record_vec = self.cache.service_verify_queries(&instance, expire_at);
2849
2850 if !record_vec.is_empty() {
2851 let query_vec: Vec<(&str, RRType)> = record_vec
2852 .iter()
2853 .map(|(record, rr_type)| (record.as_str(), *rr_type))
2854 .collect();
2855 self.send_query_vec(&query_vec);
2856
2857 if let Some(new_expire) = expire_at {
2858 self.add_timer(new_expire); self.add_retransmission(now + 1000, Command::Verify(instance, timeout));
2862 }
2863 }
2864 }
2865
2866 fn refresh_active_services(&mut self) {
2868 let mut query_ptr_count = 0;
2869 let mut query_srv_count = 0;
2870 let mut new_timers = HashSet::new();
2871 let mut query_addr_count = 0;
2872
2873 for (ty_domain, _sender) in self.service_queriers.iter() {
2874 let refreshed_timers = self.cache.refresh_due_ptr(ty_domain);
2875 if !refreshed_timers.is_empty() {
2876 trace!("sending refresh query for PTR: {}", ty_domain);
2877 self.send_query(ty_domain, RRType::PTR);
2878 query_ptr_count += 1;
2879 new_timers.extend(refreshed_timers);
2880 }
2881
2882 let (instances, timers) = self.cache.refresh_due_srv_txt(ty_domain);
2883 for (instance, types) in instances {
2884 trace!("sending refresh query for: {}", &instance);
2885 let query_vec = types
2886 .into_iter()
2887 .map(|ty| (instance.as_str(), ty))
2888 .collect::<Vec<_>>();
2889 self.send_query_vec(&query_vec);
2890 query_srv_count += 1;
2891 }
2892 new_timers.extend(timers);
2893 let (hostnames, timers) = self.cache.refresh_due_hosts(ty_domain);
2894 for hostname in hostnames.iter() {
2895 trace!("sending refresh queries for A and AAAA: {}", hostname);
2896 self.send_query_vec(&[(hostname, RRType::A), (hostname, RRType::AAAA)]);
2897 query_addr_count += 2;
2898 }
2899 new_timers.extend(timers);
2900 }
2901
2902 for timer in new_timers {
2903 self.add_timer(timer);
2904 }
2905
2906 self.increase_counter(Counter::CacheRefreshPTR, query_ptr_count);
2907 self.increase_counter(Counter::CacheRefreshSrvTxt, query_srv_count);
2908 self.increase_counter(Counter::CacheRefreshAddr, query_addr_count);
2909 }
2910}
2911
2912fn add_answer_of_service(
2914 out: &mut DnsOutgoing,
2915 msg: &DnsIncoming,
2916 entry_name: &str,
2917 service: &ServiceInfo,
2918 qtype: RRType,
2919 intf: &Interface,
2920) {
2921 if qtype == RRType::SRV || qtype == RRType::ANY {
2922 out.add_answer(
2923 msg,
2924 DnsSrv::new(
2925 entry_name,
2926 CLASS_IN | CLASS_CACHE_FLUSH,
2927 service.get_host_ttl(),
2928 service.get_priority(),
2929 service.get_weight(),
2930 service.get_port(),
2931 service.get_hostname().to_string(),
2932 ),
2933 );
2934 }
2935
2936 if qtype == RRType::TXT || qtype == RRType::ANY {
2937 out.add_answer(
2938 msg,
2939 DnsTxt::new(
2940 entry_name,
2941 CLASS_IN | CLASS_CACHE_FLUSH,
2942 service.get_other_ttl(),
2943 service.generate_txt(),
2944 ),
2945 );
2946 }
2947
2948 if qtype == RRType::SRV {
2949 let intf_addrs = service.get_addrs_on_intf(intf);
2950 if intf_addrs.is_empty() {
2951 debug!(
2952 "Cannot find valid addrs for TYPE_SRV response on intf {:?}",
2953 &intf
2954 );
2955 return;
2956 }
2957 for address in intf_addrs {
2958 out.add_additional_answer(DnsAddress::new(
2959 service.get_hostname(),
2960 ip_address_rr_type(&address),
2961 CLASS_IN | CLASS_CACHE_FLUSH,
2962 service.get_host_ttl(),
2963 address,
2964 intf.into(),
2965 ));
2966 }
2967 }
2968}
2969
2970#[derive(Clone, Debug)]
2973pub enum ServiceEvent {
2974 SearchStarted(String),
2976
2977 ServiceFound(String, String),
2979
2980 ServiceResolved(ServiceInfo),
2982
2983 ServiceRemoved(String, String),
2985
2986 SearchStopped(String),
2988}
2989
2990#[derive(Clone, Debug)]
2993#[non_exhaustive]
2994pub enum HostnameResolutionEvent {
2995 SearchStarted(String),
2997 AddressesFound(String, HashSet<IpAddr>),
2999 AddressesRemoved(String, HashSet<IpAddr>),
3001 SearchTimeout(String),
3003 SearchStopped(String),
3005}
3006
3007#[derive(Clone, Debug)]
3010#[non_exhaustive]
3011pub enum DaemonEvent {
3012 Announce(String, String),
3014
3015 Error(Error),
3017
3018 IpAdd(IpAddr),
3020
3021 IpDel(IpAddr),
3023
3024 NameChange(DnsNameChange),
3027
3028 Respond(IpAddr),
3030}
3031
3032#[derive(Clone, Debug)]
3035pub struct DnsNameChange {
3036 pub original: String,
3038
3039 pub new_name: String,
3049
3050 pub rr_type: RRType,
3052
3053 pub intf_name: String,
3055}
3056
3057#[derive(Debug)]
3059enum Command {
3060 Browse(String, u32, Sender<ServiceEvent>),
3062
3063 ResolveHostname(String, u32, Sender<HostnameResolutionEvent>, Option<u64>), Register(ServiceInfo),
3068
3069 Unregister(String, Sender<UnregisterStatus>), RegisterResend(String, Interface), UnregisterResend(Vec<u8>, Interface), StopBrowse(String), StopResolveHostname(String), Resolve(String, u16), GetMetrics(Sender<Metrics>),
3090
3091 GetStatus(Sender<DaemonStatus>),
3093
3094 Monitor(Sender<DaemonEvent>),
3096
3097 SetOption(DaemonOption),
3098
3099 GetOption(Sender<DaemonOptionVal>),
3100
3101 Verify(String, Duration),
3106
3107 Exit(Sender<DaemonStatus>),
3108}
3109
3110impl fmt::Display for Command {
3111 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
3112 match self {
3113 Self::Browse(_, _, _) => write!(f, "Command Browse"),
3114 Self::ResolveHostname(_, _, _, _) => write!(f, "Command ResolveHostname"),
3115 Self::Exit(_) => write!(f, "Command Exit"),
3116 Self::GetStatus(_) => write!(f, "Command GetStatus"),
3117 Self::GetMetrics(_) => write!(f, "Command GetMetrics"),
3118 Self::Monitor(_) => write!(f, "Command Monitor"),
3119 Self::Register(_) => write!(f, "Command Register"),
3120 Self::RegisterResend(_, _) => write!(f, "Command RegisterResend"),
3121 Self::SetOption(_) => write!(f, "Command SetOption"),
3122 Self::GetOption(_) => write!(f, "Command GetOption"),
3123 Self::StopBrowse(_) => write!(f, "Command StopBrowse"),
3124 Self::StopResolveHostname(_) => write!(f, "Command StopResolveHostname"),
3125 Self::Unregister(_, _) => write!(f, "Command Unregister"),
3126 Self::UnregisterResend(_, _) => write!(f, "Command UnregisterResend"),
3127 Self::Resolve(_, _) => write!(f, "Command Resolve"),
3128 Self::Verify(_, _) => write!(f, "Command VerifyResource"),
3129 }
3130 }
3131}
3132
3133struct DaemonOptionVal {
3134 _service_name_len_max: u8,
3135 ip_check_interval: u64,
3136}
3137
3138#[derive(Debug)]
3139enum DaemonOption {
3140 ServiceNameLenMax(u8),
3141 IpCheckInterval(u64),
3142 EnableInterface(Vec<IfKind>),
3143 DisableInterface(Vec<IfKind>),
3144 MulticastLoopV4(bool),
3145 MulticastLoopV6(bool),
3146}
3147
3148const DOMAIN_LEN: usize = "._tcp.local.".len();
3150
3151fn check_service_name_length(ty_domain: &str, limit: u8) -> Result<()> {
3153 if ty_domain.len() <= DOMAIN_LEN + 1 {
3154 return Err(e_fmt!("Service type name cannot be empty: {}", ty_domain));
3156 }
3157
3158 let service_name_len = ty_domain.len() - DOMAIN_LEN - 1; if service_name_len > limit as usize {
3160 return Err(e_fmt!("Service name length must be <= {} bytes", limit));
3161 }
3162 Ok(())
3163}
3164
3165fn check_domain_suffix(name: &str) -> Result<()> {
3167 if !(name.ends_with("._tcp.local.") || name.ends_with("._udp.local.")) {
3168 return Err(e_fmt!(
3169 "mDNS service {} must end with '._tcp.local.' or '._udp.local.'",
3170 name
3171 ));
3172 }
3173
3174 Ok(())
3175}
3176
3177fn check_service_name(fullname: &str) -> Result<()> {
3185 check_domain_suffix(fullname)?;
3186
3187 let remaining: Vec<&str> = fullname[..fullname.len() - DOMAIN_LEN].split('.').collect();
3188 let name = remaining.last().ok_or_else(|| e_fmt!("No service name"))?;
3189
3190 if &name[0..1] != "_" {
3191 return Err(e_fmt!("Service name must start with '_'"));
3192 }
3193
3194 let name = &name[1..];
3195
3196 if name.contains("--") {
3197 return Err(e_fmt!("Service name must not contain '--'"));
3198 }
3199
3200 if name.starts_with('-') || name.ends_with('-') {
3201 return Err(e_fmt!("Service name (%s) may not start or end with '-'"));
3202 }
3203
3204 let ascii_count = name.chars().filter(|c| c.is_ascii_alphabetic()).count();
3205 if ascii_count < 1 {
3206 return Err(e_fmt!(
3207 "Service name must contain at least one letter (eg: 'A-Za-z')"
3208 ));
3209 }
3210
3211 Ok(())
3212}
3213
3214fn check_hostname(hostname: &str) -> Result<()> {
3216 if !hostname.ends_with(".local.") {
3217 return Err(e_fmt!("Hostname must end with '.local.': {hostname}"));
3218 }
3219
3220 if hostname == ".local." {
3221 return Err(e_fmt!(
3222 "The part of the hostname before '.local.' cannot be empty"
3223 ));
3224 }
3225
3226 if hostname.len() > 255 {
3227 return Err(e_fmt!("Hostname length must be <= 255 bytes"));
3228 }
3229
3230 Ok(())
3231}
3232
3233fn call_service_listener(
3234 listeners_map: &HashMap<String, Sender<ServiceEvent>>,
3235 ty_domain: &str,
3236 event: ServiceEvent,
3237) {
3238 if let Some(listener) = listeners_map.get(ty_domain) {
3239 match listener.send(event) {
3240 Ok(()) => trace!("Sent event to listener successfully"),
3241 Err(e) => debug!("Failed to send event: {}", e),
3242 }
3243 }
3244}
3245
3246fn call_hostname_resolution_listener(
3247 listeners_map: &HashMap<String, (Sender<HostnameResolutionEvent>, Option<u64>)>,
3248 hostname: &str,
3249 event: HostnameResolutionEvent,
3250) {
3251 let hostname_lower = hostname.to_lowercase();
3252 if let Some(listener) = listeners_map.get(&hostname_lower).map(|(l, _)| l) {
3253 match listener.send(event) {
3254 Ok(()) => trace!("Sent event to listener successfully"),
3255 Err(e) => debug!("Failed to send event: {}", e),
3256 }
3257 }
3258}
3259
3260fn my_ip_interfaces(with_loopback: bool) -> Vec<Interface> {
3263 if_addrs::get_if_addrs()
3264 .unwrap_or_default()
3265 .into_iter()
3266 .filter(|i| !i.is_loopback() || with_loopback)
3267 .collect()
3268}
3269
3270fn send_dns_outgoing(out: &DnsOutgoing, intf: &Interface, sock: &MioUdpSocket) -> Vec<Vec<u8>> {
3272 let qtype = if out.is_query() { "query" } else { "response" };
3273 trace!(
3274 "send outgoing {}: {} questions {} answers {} authorities {} additional",
3275 qtype,
3276 out.questions().len(),
3277 out.answers_count(),
3278 out.authorities().len(),
3279 out.additionals().len()
3280 );
3281 let packet_list = out.to_data_on_wire();
3282 for packet in packet_list.iter() {
3283 multicast_on_intf(packet, intf, sock);
3284 }
3285 packet_list
3286}
3287
3288fn multicast_on_intf(packet: &[u8], intf: &Interface, socket: &MioUdpSocket) {
3290 if packet.len() > MAX_MSG_ABSOLUTE {
3291 debug!("Drop over-sized packet ({})", packet.len());
3292 return;
3293 }
3294
3295 let addr: SocketAddr = match intf.addr {
3296 if_addrs::IfAddr::V4(_) => SocketAddrV4::new(GROUP_ADDR_V4, MDNS_PORT).into(),
3297 if_addrs::IfAddr::V6(_) => {
3298 let mut sock = SocketAddrV6::new(GROUP_ADDR_V6, MDNS_PORT, 0, 0);
3299 sock.set_scope_id(intf.index.unwrap_or(0)); sock.into()
3301 }
3302 };
3303
3304 send_packet(packet, addr, intf, socket);
3305}
3306
3307fn send_packet(packet: &[u8], addr: SocketAddr, intf: &Interface, sock: &MioUdpSocket) {
3309 match sock.send_to(packet, addr) {
3310 Ok(sz) => trace!("sent out {} bytes on interface {:?}", sz, intf),
3311 Err(e) => trace!("Failed to send to {} via {:?}: {}", addr, &intf, e),
3312 }
3313}
3314
3315fn valid_instance_name(name: &str) -> bool {
3319 name.split('.').count() >= 5
3320}
3321
3322fn notify_monitors(monitors: &mut Vec<Sender<DaemonEvent>>, event: DaemonEvent) {
3323 monitors.retain(|sender| {
3324 if let Err(e) = sender.try_send(event.clone()) {
3325 debug!("notify_monitors: try_send: {}", &e);
3326 if matches!(e, TrySendError::Disconnected(_)) {
3327 return false; }
3329 }
3330 true
3331 });
3332}
3333
3334fn prepare_announce(
3337 info: &ServiceInfo,
3338 intf: &Interface,
3339 dns_registry: &mut DnsRegistry,
3340) -> Option<DnsOutgoing> {
3341 let intf_addrs = info.get_addrs_on_intf(intf);
3342 if intf_addrs.is_empty() {
3343 trace!("No valid addrs to add on intf {:?}", &intf);
3344 return None;
3345 }
3346
3347 let service_fullname = match dns_registry.name_changes.get(info.get_fullname()) {
3349 Some(new_name) => new_name,
3350 None => info.get_fullname(),
3351 };
3352
3353 debug!(
3354 "prepare to announce service {service_fullname} on {}: {}",
3355 &intf.name,
3356 &intf.ip()
3357 );
3358
3359 let mut probing_count = 0;
3360 let mut out = DnsOutgoing::new(FLAGS_QR_RESPONSE | FLAGS_AA);
3361 let create_time = current_time_millis() + fastrand::u64(0..250);
3362
3363 out.add_answer_at_time(
3364 DnsPointer::new(
3365 info.get_type(),
3366 RRType::PTR,
3367 CLASS_IN,
3368 info.get_other_ttl(),
3369 service_fullname.to_string(),
3370 ),
3371 0,
3372 );
3373
3374 if let Some(sub) = info.get_subtype() {
3375 trace!("Adding subdomain {}", sub);
3376 out.add_answer_at_time(
3377 DnsPointer::new(
3378 sub,
3379 RRType::PTR,
3380 CLASS_IN,
3381 info.get_other_ttl(),
3382 service_fullname.to_string(),
3383 ),
3384 0,
3385 );
3386 }
3387
3388 let hostname = match dns_registry.name_changes.get(info.get_hostname()) {
3390 Some(new_name) => new_name.to_string(),
3391 None => info.get_hostname().to_string(),
3392 };
3393
3394 let mut srv = DnsSrv::new(
3395 info.get_fullname(),
3396 CLASS_IN | CLASS_CACHE_FLUSH,
3397 info.get_host_ttl(),
3398 info.get_priority(),
3399 info.get_weight(),
3400 info.get_port(),
3401 hostname,
3402 );
3403
3404 if let Some(new_name) = dns_registry.name_changes.get(info.get_fullname()) {
3405 srv.get_record_mut().set_new_name(new_name.to_string());
3406 }
3407
3408 if !info.requires_probe()
3409 || dns_registry.is_probing_done(&srv, info.get_fullname(), create_time)
3410 {
3411 out.add_answer_at_time(srv, 0);
3412 } else {
3413 probing_count += 1;
3414 }
3415
3416 let mut txt = DnsTxt::new(
3419 info.get_fullname(),
3420 CLASS_IN | CLASS_CACHE_FLUSH,
3421 info.get_other_ttl(),
3422 info.generate_txt(),
3423 );
3424
3425 if let Some(new_name) = dns_registry.name_changes.get(info.get_fullname()) {
3426 txt.get_record_mut().set_new_name(new_name.to_string());
3427 }
3428
3429 if !info.requires_probe()
3430 || dns_registry.is_probing_done(&txt, info.get_fullname(), create_time)
3431 {
3432 out.add_answer_at_time(txt, 0);
3433 } else {
3434 probing_count += 1;
3435 }
3436
3437 let hostname = info.get_hostname();
3440 for address in intf_addrs {
3441 let mut dns_addr = DnsAddress::new(
3442 hostname,
3443 ip_address_rr_type(&address),
3444 CLASS_IN | CLASS_CACHE_FLUSH,
3445 info.get_host_ttl(),
3446 address,
3447 intf.into(),
3448 );
3449
3450 if let Some(new_name) = dns_registry.name_changes.get(hostname) {
3451 dns_addr.get_record_mut().set_new_name(new_name.to_string());
3452 }
3453
3454 if !info.requires_probe()
3455 || dns_registry.is_probing_done(&dns_addr, info.get_fullname(), create_time)
3456 {
3457 out.add_answer_at_time(dns_addr, 0);
3458 } else {
3459 probing_count += 1;
3460 }
3461 }
3462
3463 if probing_count > 0 {
3464 return None;
3465 }
3466
3467 Some(out)
3468}
3469
3470fn announce_service_on_intf(
3473 dns_registry: &mut DnsRegistry,
3474 info: &ServiceInfo,
3475 intf: &Interface,
3476 sock: &MioUdpSocket,
3477) -> bool {
3478 if let Some(out) = prepare_announce(info, intf, dns_registry) {
3479 send_dns_outgoing(&out, intf, sock);
3480 return true;
3481 }
3482 false
3483}
3484
3485fn name_change(original: &str) -> String {
3493 let mut parts: Vec<_> = original.split('.').collect();
3494 let Some(first_part) = parts.get_mut(0) else {
3495 return format!("{original} (2)");
3496 };
3497
3498 let mut new_name = format!("{} (2)", first_part);
3499
3500 if let Some(paren_pos) = first_part.rfind(" (") {
3502 if let Some(end_paren) = first_part[paren_pos..].find(')') {
3504 let absolute_end_pos = paren_pos + end_paren;
3505 if absolute_end_pos == first_part.len() - 1 {
3507 let num_start = paren_pos + 2; if let Ok(number) = first_part[num_start..absolute_end_pos].parse::<u32>() {
3510 let base_name = &first_part[..paren_pos];
3511 new_name = format!("{} ({})", base_name, number + 1)
3512 }
3513 }
3514 }
3515 }
3516
3517 *first_part = &new_name;
3518 parts.join(".")
3519}
3520
3521fn hostname_change(original: &str) -> String {
3529 let mut parts: Vec<_> = original.split('.').collect();
3530 let Some(first_part) = parts.get_mut(0) else {
3531 return format!("{original}-2");
3532 };
3533
3534 let mut new_name = format!("{}-2", first_part);
3535
3536 if let Some(hyphen_pos) = first_part.rfind('-') {
3538 if let Ok(number) = first_part[hyphen_pos + 1..].parse::<u32>() {
3540 let base_name = &first_part[..hyphen_pos];
3541 new_name = format!("{}-{}", base_name, number + 1);
3542 }
3543 }
3544
3545 *first_part = &new_name;
3546 parts.join(".")
3547}
3548
3549fn add_answer_with_additionals(
3550 out: &mut DnsOutgoing,
3551 msg: &DnsIncoming,
3552 service: &ServiceInfo,
3553 intf: &Interface,
3554 dns_registry: &DnsRegistry,
3555) {
3556 let intf_addrs = service.get_addrs_on_intf(intf);
3557 if intf_addrs.is_empty() {
3558 trace!("No addrs on LAN of intf {:?}", intf);
3559 return;
3560 }
3561
3562 let service_fullname = match dns_registry.name_changes.get(service.get_fullname()) {
3564 Some(new_name) => new_name,
3565 None => service.get_fullname(),
3566 };
3567
3568 let hostname = match dns_registry.name_changes.get(service.get_hostname()) {
3569 Some(new_name) => new_name,
3570 None => service.get_hostname(),
3571 };
3572
3573 let ptr_added = out.add_answer(
3574 msg,
3575 DnsPointer::new(
3576 service.get_type(),
3577 RRType::PTR,
3578 CLASS_IN,
3579 service.get_other_ttl(),
3580 service_fullname.to_string(),
3581 ),
3582 );
3583
3584 if !ptr_added {
3585 trace!("answer was not added for msg {:?}", msg);
3586 return;
3587 }
3588
3589 if let Some(sub) = service.get_subtype() {
3590 trace!("Adding subdomain {}", sub);
3591 out.add_additional_answer(DnsPointer::new(
3592 sub,
3593 RRType::PTR,
3594 CLASS_IN,
3595 service.get_other_ttl(),
3596 service_fullname.to_string(),
3597 ));
3598 }
3599
3600 out.add_additional_answer(DnsSrv::new(
3603 service_fullname,
3604 CLASS_IN | CLASS_CACHE_FLUSH,
3605 service.get_host_ttl(),
3606 service.get_priority(),
3607 service.get_weight(),
3608 service.get_port(),
3609 hostname.to_string(),
3610 ));
3611
3612 out.add_additional_answer(DnsTxt::new(
3613 service_fullname,
3614 CLASS_IN | CLASS_CACHE_FLUSH,
3615 service.get_other_ttl(),
3616 service.generate_txt(),
3617 ));
3618
3619 for address in intf_addrs {
3620 out.add_additional_answer(DnsAddress::new(
3621 hostname,
3622 ip_address_rr_type(&address),
3623 CLASS_IN | CLASS_CACHE_FLUSH,
3624 service.get_host_ttl(),
3625 address,
3626 intf.into(),
3627 ));
3628 }
3629}
3630
3631fn check_probing(
3634 dns_registry: &mut DnsRegistry,
3635 timers: &mut BinaryHeap<Reverse<u64>>,
3636 now: u64,
3637) -> (DnsOutgoing, Vec<String>) {
3638 let mut expired_probes = Vec::new();
3639 let mut out = DnsOutgoing::new(FLAGS_QR_QUERY);
3640
3641 for (name, probe) in dns_registry.probing.iter_mut() {
3642 if now >= probe.next_send {
3643 if probe.expired(now) {
3644 expired_probes.push(name.clone());
3646 } else {
3647 out.add_question(name, RRType::ANY);
3648
3649 for record in probe.records.iter() {
3657 out.add_authority(record.clone());
3658 }
3659
3660 probe.update_next_send(now);
3661
3662 timers.push(Reverse(probe.next_send));
3664 }
3665 }
3666 }
3667
3668 (out, expired_probes)
3669}
3670
3671fn handle_expired_probes(
3676 expired_probes: Vec<String>,
3677 intf_name: &str,
3678 dns_registry: &mut DnsRegistry,
3679 monitors: &mut Vec<Sender<DaemonEvent>>,
3680) -> HashSet<String> {
3681 let mut waiting_services = HashSet::new();
3682
3683 for name in expired_probes {
3684 let Some(probe) = dns_registry.probing.remove(&name) else {
3685 continue;
3686 };
3687
3688 for record in probe.records.iter() {
3690 if let Some(new_name) = record.get_record().get_new_name() {
3691 dns_registry
3692 .name_changes
3693 .insert(name.clone(), new_name.to_string());
3694
3695 let event = DnsNameChange {
3696 original: record.get_record().get_original_name().to_string(),
3697 new_name: new_name.to_string(),
3698 rr_type: record.get_type(),
3699 intf_name: intf_name.to_string(),
3700 };
3701 notify_monitors(monitors, DaemonEvent::NameChange(event));
3702 }
3703 }
3704
3705 debug!(
3707 "probe of '{name}' finished: move {} records to active. ({} waiting services)",
3708 probe.records.len(),
3709 probe.waiting_services.len(),
3710 );
3711
3712 if !probe.records.is_empty() {
3714 match dns_registry.active.get_mut(&name) {
3715 Some(records) => {
3716 records.extend(probe.records);
3717 }
3718 None => {
3719 dns_registry.active.insert(name, probe.records);
3720 }
3721 }
3722
3723 waiting_services.extend(probe.waiting_services);
3724 }
3725 }
3726
3727 waiting_services
3728}
3729
3730#[cfg(test)]
3731mod tests {
3732 use super::{
3733 check_domain_suffix, check_service_name_length, hostname_change, my_ip_interfaces,
3734 name_change, new_socket_bind, send_dns_outgoing, valid_instance_name,
3735 HostnameResolutionEvent, ServiceDaemon, ServiceEvent, ServiceInfo, GROUP_ADDR_V4,
3736 MDNS_PORT,
3737 };
3738 use crate::{
3739 dns_parser::{
3740 DnsIncoming, DnsOutgoing, DnsPointer, InterfaceId, RRType, CLASS_IN, FLAGS_AA,
3741 FLAGS_QR_RESPONSE,
3742 },
3743 service_daemon::{add_answer_of_service, check_hostname},
3744 };
3745 use std::{
3746 net::{SocketAddr, SocketAddrV4},
3747 time::Duration,
3748 };
3749 use test_log::test;
3750
3751 #[test]
3752 fn test_socketaddr_print() {
3753 let addr: SocketAddr = SocketAddrV4::new(GROUP_ADDR_V4, MDNS_PORT).into();
3754 let print = format!("{}", addr);
3755 assert_eq!(print, "224.0.0.251:5353");
3756 }
3757
3758 #[test]
3759 fn test_instance_name() {
3760 assert!(valid_instance_name("my-laser._printer._tcp.local."));
3761 assert!(valid_instance_name("my-laser.._printer._tcp.local."));
3762 assert!(!valid_instance_name("_printer._tcp.local."));
3763 }
3764
3765 #[test]
3766 fn test_check_service_name_length() {
3767 let result = check_service_name_length("_tcp", 100);
3768 assert!(result.is_err());
3769 if let Err(e) = result {
3770 println!("{}", e);
3771 }
3772 }
3773
3774 #[test]
3775 fn test_check_hostname() {
3776 for hostname in &[
3778 "my_host.local.",
3779 &("A".repeat(255 - ".local.".len()) + ".local."),
3780 ] {
3781 let result = check_hostname(hostname);
3782 assert!(result.is_ok());
3783 }
3784
3785 for hostname in &[
3787 "my_host.local",
3788 ".local.",
3789 &("A".repeat(256 - ".local.".len()) + ".local."),
3790 ] {
3791 let result = check_hostname(hostname);
3792 assert!(result.is_err());
3793 if let Err(e) = result {
3794 println!("{}", e);
3795 }
3796 }
3797 }
3798
3799 #[test]
3800 fn test_check_domain_suffix() {
3801 assert!(check_domain_suffix("_missing_dot._tcp.local").is_err());
3802 assert!(check_domain_suffix("_missing_bar.tcp.local.").is_err());
3803 assert!(check_domain_suffix("_mis_spell._tpp.local.").is_err());
3804 assert!(check_domain_suffix("_mis_spell._upp.local.").is_err());
3805 assert!(check_domain_suffix("_has_dot._tcp.local.").is_ok());
3806 assert!(check_domain_suffix("_goodname._udp.local.").is_ok());
3807 }
3808
3809 #[test]
3810 fn test_service_with_temporarily_invalidated_ptr() {
3811 let d = ServiceDaemon::new().expect("Failed to create daemon");
3813
3814 let service = "_test_inval_ptr._udp.local.";
3815 let host_name = "my_host_tmp_invalidated_ptr.local.";
3816 let intfs: Vec<_> = my_ip_interfaces(false);
3817 let intf_ips: Vec<_> = intfs.iter().map(|intf| intf.ip()).collect();
3818 let port = 5201;
3819 let my_service =
3820 ServiceInfo::new(service, "my_instance", host_name, &intf_ips[..], port, None)
3821 .expect("invalid service info")
3822 .enable_addr_auto();
3823 let result = d.register(my_service.clone());
3824 assert!(result.is_ok());
3825
3826 let browse_chan = d.browse(service).unwrap();
3828 let timeout = Duration::from_secs(2);
3829 let mut resolved = false;
3830
3831 while let Ok(event) = browse_chan.recv_timeout(timeout) {
3832 match event {
3833 ServiceEvent::ServiceResolved(info) => {
3834 resolved = true;
3835 println!("Resolved a service of {}", &info.get_fullname());
3836 break;
3837 }
3838 e => {
3839 println!("Received event {:?}", e);
3840 }
3841 }
3842 }
3843
3844 assert!(resolved);
3845
3846 println!("Stopping browse of {}", service);
3847 d.stop_browse(service).unwrap();
3850
3851 let mut stopped = false;
3856 while let Ok(event) = browse_chan.recv_timeout(timeout) {
3857 match event {
3858 ServiceEvent::SearchStopped(_) => {
3859 stopped = true;
3860 println!("Stopped browsing service");
3861 break;
3862 }
3863 e => {
3867 println!("Received event {:?}", e);
3868 }
3869 }
3870 }
3871
3872 assert!(stopped);
3873
3874 let invalidate_ptr_packet = DnsPointer::new(
3876 my_service.get_type(),
3877 RRType::PTR,
3878 CLASS_IN,
3879 0,
3880 my_service.get_fullname().to_string(),
3881 );
3882
3883 let mut packet_buffer = DnsOutgoing::new(FLAGS_QR_RESPONSE | FLAGS_AA);
3884 packet_buffer.add_additional_answer(invalidate_ptr_packet);
3885
3886 for intf in intfs {
3887 let sock = new_socket_bind(&intf, true).unwrap();
3888 send_dns_outgoing(&packet_buffer, &intf, &sock);
3889 }
3890
3891 println!(
3892 "Sent PTR record invalidation. Starting second browse for {}",
3893 service
3894 );
3895
3896 let browse_chan = d.browse(service).unwrap();
3898
3899 resolved = false;
3900 while let Ok(event) = browse_chan.recv_timeout(timeout) {
3901 match event {
3902 ServiceEvent::ServiceResolved(info) => {
3903 resolved = true;
3904 println!("Resolved a service of {}", &info.get_fullname());
3905 break;
3906 }
3907 e => {
3908 println!("Received event {:?}", e);
3909 }
3910 }
3911 }
3912
3913 assert!(resolved);
3914 d.shutdown().unwrap();
3915 }
3916
3917 #[test]
3918 fn test_expired_srv() {
3919 let service_type = "_expired-srv._udp.local.";
3921 let instance = "test_instance";
3922 let host_name = "expired_srv_host.local.";
3923 let mut my_service = ServiceInfo::new(service_type, instance, host_name, "", 5023, None)
3924 .unwrap()
3925 .enable_addr_auto();
3926 let new_ttl = 3; my_service._set_host_ttl(new_ttl);
3931
3932 let mdns_server = ServiceDaemon::new().expect("Failed to create mdns server");
3934 let result = mdns_server.register(my_service);
3935 assert!(result.is_ok());
3936
3937 let mdns_client = ServiceDaemon::new().expect("Failed to create mdns client");
3938 let browse_chan = mdns_client.browse(service_type).unwrap();
3939 let timeout = Duration::from_secs(2);
3940 let mut resolved = false;
3941
3942 while let Ok(event) = browse_chan.recv_timeout(timeout) {
3943 match event {
3944 ServiceEvent::ServiceResolved(info) => {
3945 resolved = true;
3946 println!("Resolved a service of {}", &info.get_fullname());
3947 break;
3948 }
3949 _ => {}
3950 }
3951 }
3952
3953 assert!(resolved);
3954
3955 mdns_server.shutdown().unwrap();
3957
3958 let expire_timeout = Duration::from_secs(new_ttl as u64);
3960 while let Ok(event) = browse_chan.recv_timeout(expire_timeout) {
3961 match event {
3962 ServiceEvent::ServiceRemoved(service_type, full_name) => {
3963 println!("Service removed: {}: {}", &service_type, &full_name);
3964 break;
3965 }
3966 _ => {}
3967 }
3968 }
3969 }
3970
3971 #[test]
3972 fn test_hostname_resolution_address_removed() {
3973 let server = ServiceDaemon::new().expect("Failed to create server");
3975 let hostname = "addr_remove_host._tcp.local.";
3976 let service_ip_addr = my_ip_interfaces(false)
3977 .iter()
3978 .find(|iface| iface.ip().is_ipv4())
3979 .map(|iface| iface.ip())
3980 .unwrap();
3981
3982 let mut my_service = ServiceInfo::new(
3983 "_host_res_test._tcp.local.",
3984 "my_instance",
3985 hostname,
3986 &service_ip_addr,
3987 1234,
3988 None,
3989 )
3990 .expect("invalid service info");
3991
3992 let addr_ttl = 2;
3994 my_service._set_host_ttl(addr_ttl); server.register(my_service).unwrap();
3997
3998 let client = ServiceDaemon::new().expect("Failed to create client");
4000 let event_receiver = client.resolve_hostname(hostname, None).unwrap();
4001 let resolved = loop {
4002 match event_receiver.recv() {
4003 Ok(HostnameResolutionEvent::AddressesFound(found_hostname, addresses)) => {
4004 assert!(found_hostname == hostname);
4005 assert!(addresses.contains(&service_ip_addr));
4006 println!("address found: {:?}", &addresses);
4007 break true;
4008 }
4009 Ok(HostnameResolutionEvent::SearchStopped(_)) => break false,
4010 Ok(_event) => {}
4011 Err(_) => break false,
4012 }
4013 };
4014
4015 assert!(resolved);
4016
4017 server.shutdown().unwrap();
4019
4020 let timeout = Duration::from_secs(addr_ttl as u64 + 1);
4022 let removed = loop {
4023 match event_receiver.recv_timeout(timeout) {
4024 Ok(HostnameResolutionEvent::AddressesRemoved(removed_host, addresses)) => {
4025 assert!(removed_host == hostname);
4026 assert!(addresses.contains(&service_ip_addr));
4027
4028 println!(
4029 "address removed: hostname: {} addresses: {:?}",
4030 &hostname, &addresses
4031 );
4032 break true;
4033 }
4034 Ok(_event) => {}
4035 Err(_) => {
4036 break false;
4037 }
4038 }
4039 };
4040
4041 assert!(removed);
4042
4043 client.shutdown().unwrap();
4044 }
4045
4046 #[test]
4047 fn test_refresh_ptr() {
4048 let service_type = "_refresh-ptr._udp.local.";
4050 let instance = "test_instance";
4051 let host_name = "refresh_ptr_host.local.";
4052 let service_ip_addr = my_ip_interfaces(false)
4053 .iter()
4054 .find(|iface| iface.ip().is_ipv4())
4055 .map(|iface| iface.ip())
4056 .unwrap();
4057
4058 let mut my_service = ServiceInfo::new(
4059 service_type,
4060 instance,
4061 host_name,
4062 &service_ip_addr,
4063 5023,
4064 None,
4065 )
4066 .unwrap();
4067
4068 let new_ttl = 3; my_service._set_other_ttl(new_ttl);
4070
4071 let mdns_server = ServiceDaemon::new().expect("Failed to create mdns server");
4073 let result = mdns_server.register(my_service);
4074 assert!(result.is_ok());
4075
4076 let mdns_client = ServiceDaemon::new().expect("Failed to create mdns client");
4077 let browse_chan = mdns_client.browse(service_type).unwrap();
4078 let timeout = Duration::from_millis(1500); let mut resolved = false;
4080
4081 while let Ok(event) = browse_chan.recv_timeout(timeout) {
4083 match event {
4084 ServiceEvent::ServiceResolved(info) => {
4085 resolved = true;
4086 println!("Resolved a service of {}", &info.get_fullname());
4087 break;
4088 }
4089 _ => {}
4090 }
4091 }
4092
4093 assert!(resolved);
4094
4095 let timeout = Duration::from_millis(new_ttl as u64 * 1000 * 90 / 100);
4097 while let Ok(event) = browse_chan.recv_timeout(timeout) {
4098 println!("event: {:?}", &event);
4099 }
4100
4101 let metrics_chan = mdns_client.get_metrics().unwrap();
4103 let metrics = metrics_chan.recv_timeout(timeout).unwrap();
4104 let ptr_refresh_counter = metrics["cache-refresh-ptr"];
4105 assert_eq!(ptr_refresh_counter, 1);
4106 let srvtxt_refresh_counter = metrics["cache-refresh-srv-txt"];
4107 assert_eq!(srvtxt_refresh_counter, 1);
4108
4109 mdns_server.shutdown().unwrap();
4111 mdns_client.shutdown().unwrap();
4112 }
4113
4114 #[test]
4115 fn test_name_change() {
4116 assert_eq!(name_change("foo.local."), "foo (2).local.");
4117 assert_eq!(name_change("foo (2).local."), "foo (3).local.");
4118 assert_eq!(name_change("foo (9).local."), "foo (10).local.");
4119 assert_eq!(name_change("foo"), "foo (2)");
4120 assert_eq!(name_change("foo (2)"), "foo (3)");
4121 assert_eq!(name_change(""), " (2)");
4122
4123 assert_eq!(name_change("foo (abc)"), "foo (abc) (2)"); assert_eq!(name_change("foo (2"), "foo (2 (2)"); assert_eq!(name_change("foo (2) extra"), "foo (2) extra (2)"); }
4128
4129 #[test]
4130 fn test_hostname_change() {
4131 assert_eq!(hostname_change("foo.local."), "foo-2.local.");
4132 assert_eq!(hostname_change("foo"), "foo-2");
4133 assert_eq!(hostname_change("foo-2.local."), "foo-3.local.");
4134 assert_eq!(hostname_change("foo-9"), "foo-10");
4135 assert_eq!(hostname_change("test-42.domain."), "test-43.domain.");
4136 }
4137
4138 #[test]
4139 fn test_add_answer_txt_ttl() {
4140 let service_type = "_test_add_answer._udp.local.";
4142 let instance = "test_instance";
4143 let host_name = "add_answer_host.local.";
4144 let service_intf = my_ip_interfaces(false)
4145 .into_iter()
4146 .find(|iface| iface.ip().is_ipv4())
4147 .unwrap();
4148 let service_ip_addr = service_intf.ip();
4149 let my_service = ServiceInfo::new(
4150 service_type,
4151 instance,
4152 host_name,
4153 &service_ip_addr,
4154 5023,
4155 None,
4156 )
4157 .unwrap();
4158
4159 let mut out = DnsOutgoing::new(FLAGS_QR_RESPONSE | FLAGS_AA);
4161
4162 let mut dummy_data = out.to_data_on_wire();
4164 let interface_id = InterfaceId::from(&service_intf);
4165 let incoming = DnsIncoming::new(dummy_data.pop().unwrap(), interface_id).unwrap();
4166
4167 add_answer_of_service(
4169 &mut out,
4170 &incoming,
4171 instance,
4172 &my_service,
4173 RRType::TXT,
4174 &service_intf,
4175 );
4176
4177 assert!(
4179 out.answers_count() > 0,
4180 "No answers added to the outgoing message"
4181 );
4182
4183 let answer = out._answers().first().unwrap();
4185 assert_eq!(answer.0.get_type(), RRType::TXT);
4186
4187 assert_eq!(answer.0.get_record().get_ttl(), my_service.get_other_ttl());
4189 }
4190}