1#[cfg(feature = "logging")]
32use crate::log::{debug, error, trace};
33use crate::{
34 dns_cache::{current_time_millis, DnsCache},
35 dns_parser::{
36 ip_address_rr_type, DnsAddress, DnsEntryExt, DnsIncoming, DnsOutgoing, DnsPointer,
37 DnsRecordBox, DnsRecordExt, DnsSrv, DnsTxt, InterfaceId, RRType, ScopedIp,
38 CLASS_CACHE_FLUSH, CLASS_IN, FLAGS_AA, FLAGS_QR_QUERY, FLAGS_QR_RESPONSE, MAX_MSG_ABSOLUTE,
39 },
40 error::{e_fmt, Error, Result},
41 service_info::{DnsRegistry, MyIntf, Probe, ServiceInfo, ServiceStatus},
42 Receiver, ResolvedService, TxtProperties,
43};
44use flume::{bounded, Sender, TrySendError};
45use if_addrs::{IfAddr, Interface};
46use mio::{event::Source, net::UdpSocket as MioUdpSocket, Interest, Poll, Registry, Token};
47use socket2::Domain;
48use socket_pktinfo::PktInfoUdpSocket;
49use std::{
50 cmp::{self, Reverse},
51 collections::{hash_map::Entry, BinaryHeap, HashMap, HashSet},
52 fmt, io,
53 net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6, UdpSocket},
54 str, thread,
55 time::Duration,
56 vec,
57};
58
59pub const SERVICE_NAME_LEN_MAX_DEFAULT: u8 = 15;
63
64pub const IP_CHECK_INTERVAL_IN_SECS_DEFAULT: u32 = 5;
66
67pub const VERIFY_TIMEOUT_DEFAULT: Duration = Duration::from_secs(10);
70
71pub const MDNS_PORT: u16 = 5353;
73
74const GROUP_ADDR_V4: Ipv4Addr = Ipv4Addr::new(224, 0, 0, 251);
75const GROUP_ADDR_V6: Ipv6Addr = Ipv6Addr::new(0xff02, 0, 0, 0, 0, 0, 0, 0xfb);
76const LOOPBACK_V4: Ipv4Addr = Ipv4Addr::new(127, 0, 0, 1);
77
78const RESOLVE_WAIT_IN_MILLIS: u64 = 500;
79
80#[derive(Debug)]
82pub enum UnregisterStatus {
83 OK,
85 NotFound,
87}
88
89#[derive(Debug, PartialEq, Clone, Eq)]
91#[non_exhaustive]
92pub enum DaemonStatus {
93 Running,
95
96 Shutdown,
98}
99
100#[derive(Hash, Eq, PartialEq)]
103enum Counter {
104 Register,
105 RegisterResend,
106 Unregister,
107 UnregisterResend,
108 Browse,
109 ResolveHostname,
110 Respond,
111 CacheRefreshPTR,
112 CacheRefreshSrvTxt,
113 CacheRefreshAddr,
114 KnownAnswerSuppression,
115 CachedPTR,
116 CachedSRV,
117 CachedAddr,
118 CachedTxt,
119 CachedNSec,
120 CachedSubtype,
121 DnsRegistryProbe,
122 DnsRegistryActive,
123 DnsRegistryTimer,
124 DnsRegistryNameChange,
125 Timer,
126}
127
128impl fmt::Display for Counter {
129 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
130 match self {
131 Self::Register => write!(f, "register"),
132 Self::RegisterResend => write!(f, "register-resend"),
133 Self::Unregister => write!(f, "unregister"),
134 Self::UnregisterResend => write!(f, "unregister-resend"),
135 Self::Browse => write!(f, "browse"),
136 Self::ResolveHostname => write!(f, "resolve-hostname"),
137 Self::Respond => write!(f, "respond"),
138 Self::CacheRefreshPTR => write!(f, "cache-refresh-ptr"),
139 Self::CacheRefreshSrvTxt => write!(f, "cache-refresh-srv-txt"),
140 Self::CacheRefreshAddr => write!(f, "cache-refresh-addr"),
141 Self::KnownAnswerSuppression => write!(f, "known-answer-suppression"),
142 Self::CachedPTR => write!(f, "cached-ptr"),
143 Self::CachedSRV => write!(f, "cached-srv"),
144 Self::CachedAddr => write!(f, "cached-addr"),
145 Self::CachedTxt => write!(f, "cached-txt"),
146 Self::CachedNSec => write!(f, "cached-nsec"),
147 Self::CachedSubtype => write!(f, "cached-subtype"),
148 Self::DnsRegistryProbe => write!(f, "dns-registry-probe"),
149 Self::DnsRegistryActive => write!(f, "dns-registry-active"),
150 Self::DnsRegistryTimer => write!(f, "dns-registry-timer"),
151 Self::DnsRegistryNameChange => write!(f, "dns-registry-name-change"),
152 Self::Timer => write!(f, "timer"),
153 }
154 }
155}
156
157#[derive(Debug)]
158enum InternalError {
159 IntfAddrInvalid(Interface),
160}
161
162impl fmt::Display for InternalError {
163 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
164 match self {
165 InternalError::IntfAddrInvalid(iface) => write!(f, "interface addr invalid: {iface:?}"),
166 }
167 }
168}
169
170type MyResult<T> = core::result::Result<T, InternalError>;
171
172struct MyUdpSocket {
177 pktinfo: PktInfoUdpSocket,
180
181 mio: MioUdpSocket,
184}
185
186impl MyUdpSocket {
187 pub fn new(pktinfo: PktInfoUdpSocket) -> io::Result<Self> {
188 let std_sock = pktinfo.try_clone_std()?;
189 let mio = MioUdpSocket::from_std(std_sock);
190
191 Ok(Self { pktinfo, mio })
192 }
193}
194
195impl Source for MyUdpSocket {
197 fn register(
198 &mut self,
199 registry: &Registry,
200 token: Token,
201 interests: Interest,
202 ) -> io::Result<()> {
203 self.mio.register(registry, token, interests)
204 }
205
206 fn reregister(
207 &mut self,
208 registry: &Registry,
209 token: Token,
210 interests: Interest,
211 ) -> io::Result<()> {
212 self.mio.reregister(registry, token, interests)
213 }
214
215 fn deregister(&mut self, registry: &Registry) -> std::io::Result<()> {
216 self.mio.deregister(registry)
217 }
218}
219
220pub type Metrics = HashMap<String, i64>;
223
224const IPV4_SOCK_EVENT_KEY: usize = 4; const IPV6_SOCK_EVENT_KEY: usize = 6; const SIGNAL_SOCK_EVENT_KEY: usize = usize::MAX - 1; #[derive(Clone)]
232pub struct ServiceDaemon {
233 sender: Sender<Command>,
235
236 signal_addr: SocketAddr,
242}
243
244impl ServiceDaemon {
245 pub fn new() -> Result<Self> {
251 Self::new_with_port(MDNS_PORT)
252 }
253
254 pub fn new_with_port(port: u16) -> Result<Self> {
277 let signal_addr = SocketAddrV4::new(LOOPBACK_V4, 0);
280
281 let signal_sock = UdpSocket::bind(signal_addr)
282 .map_err(|e| e_fmt!("failed to create signal_sock for daemon: {}", e))?;
283
284 let signal_addr = signal_sock
286 .local_addr()
287 .map_err(|e| e_fmt!("failed to get signal sock addr: {}", e))?;
288
289 signal_sock
291 .set_nonblocking(true)
292 .map_err(|e| e_fmt!("failed to set nonblocking for signal socket: {}", e))?;
293
294 let poller = Poll::new().map_err(|e| e_fmt!("failed to create mio Poll: {e}"))?;
295
296 let (sender, receiver) = bounded(100);
297
298 let mio_sock = MioUdpSocket::from_std(signal_sock);
300 let cmd_sender = sender.clone();
301 thread::Builder::new()
302 .name("mDNS_daemon".to_string())
303 .spawn(move || {
304 Self::daemon_thread(mio_sock, poller, receiver, port, cmd_sender, signal_addr)
305 })
306 .map_err(|e| e_fmt!("thread builder failed to spawn: {}", e))?;
307
308 Ok(Self {
309 sender,
310 signal_addr,
311 })
312 }
313
314 fn send_cmd(&self, cmd: Command) -> Result<()> {
317 let cmd_name = cmd.to_string();
318
319 self.sender.try_send(cmd).map_err(|e| match e {
321 TrySendError::Full(_) => Error::Again,
322 e => e_fmt!("flume::channel::send failed: {}", e),
323 })?;
324
325 let addr = SocketAddrV4::new(LOOPBACK_V4, 0);
327 let socket = UdpSocket::bind(addr)
328 .map_err(|e| e_fmt!("Failed to create socket to send signal: {}", e))?;
329 socket
330 .send_to(cmd_name.as_bytes(), self.signal_addr)
331 .map_err(|e| {
332 e_fmt!(
333 "signal socket send_to {} ({}) failed: {}",
334 self.signal_addr,
335 cmd_name,
336 e
337 )
338 })?;
339
340 Ok(())
341 }
342
343 pub fn browse(&self, service_type: &str) -> Result<Receiver<ServiceEvent>> {
354 check_domain_suffix(service_type)?;
355
356 let (resp_s, resp_r) = bounded(10);
357 self.send_cmd(Command::Browse(service_type.to_string(), 1, false, resp_s))?;
358 Ok(resp_r)
359 }
360
361 pub fn browse_cache(&self, service_type: &str) -> Result<Receiver<ServiceEvent>> {
370 check_domain_suffix(service_type)?;
371
372 let (resp_s, resp_r) = bounded(10);
373 self.send_cmd(Command::Browse(service_type.to_string(), 1, true, resp_s))?;
374 Ok(resp_r)
375 }
376
377 pub fn stop_browse(&self, ty_domain: &str) -> Result<()> {
382 self.send_cmd(Command::StopBrowse(ty_domain.to_string()))
383 }
384
385 pub fn resolve_hostname(
393 &self,
394 hostname: &str,
395 timeout: Option<u64>,
396 ) -> Result<Receiver<HostnameResolutionEvent>> {
397 check_hostname(hostname)?;
398 let (resp_s, resp_r) = bounded(10);
399 self.send_cmd(Command::ResolveHostname(
400 hostname.to_string(),
401 1,
402 resp_s,
403 timeout,
404 ))?;
405 Ok(resp_r)
406 }
407
408 pub fn stop_resolve_hostname(&self, hostname: &str) -> Result<()> {
413 self.send_cmd(Command::StopResolveHostname(hostname.to_string()))
414 }
415
416 pub fn register(&self, service_info: ServiceInfo) -> Result<()> {
424 check_service_name(service_info.get_fullname())?;
425 check_hostname(service_info.get_hostname())?;
426
427 self.send_cmd(Command::Register(service_info.into()))
428 }
429
430 pub fn unregister(&self, fullname: &str) -> Result<Receiver<UnregisterStatus>> {
438 let (resp_s, resp_r) = bounded(1);
439 self.send_cmd(Command::Unregister(fullname.to_lowercase(), resp_s))?;
440 Ok(resp_r)
441 }
442
443 pub fn monitor(&self) -> Result<Receiver<DaemonEvent>> {
447 let (resp_s, resp_r) = bounded(100);
448 self.send_cmd(Command::Monitor(resp_s))?;
449 Ok(resp_r)
450 }
451
452 pub fn shutdown(&self) -> Result<Receiver<DaemonStatus>> {
457 let (resp_s, resp_r) = bounded(1);
458 self.send_cmd(Command::Exit(resp_s))?;
459 Ok(resp_r)
460 }
461
462 pub fn status(&self) -> Result<Receiver<DaemonStatus>> {
468 let (resp_s, resp_r) = bounded(1);
469
470 if self.sender.is_disconnected() {
471 resp_s
472 .send(DaemonStatus::Shutdown)
473 .map_err(|e| e_fmt!("failed to send daemon status to the client: {}", e))?;
474 } else {
475 self.send_cmd(Command::GetStatus(resp_s))?;
476 }
477
478 Ok(resp_r)
479 }
480
481 pub fn get_metrics(&self) -> Result<Receiver<Metrics>> {
486 let (resp_s, resp_r) = bounded(1);
487 self.send_cmd(Command::GetMetrics(resp_s))?;
488 Ok(resp_r)
489 }
490
491 pub fn set_service_name_len_max(&self, len_max: u8) -> Result<()> {
498 const SERVICE_NAME_LEN_MAX_LIMIT: u8 = 30; if len_max > SERVICE_NAME_LEN_MAX_LIMIT {
501 return Err(Error::Msg(format!(
502 "service name length max {len_max} is too large"
503 )));
504 }
505
506 self.send_cmd(Command::SetOption(DaemonOption::ServiceNameLenMax(len_max)))
507 }
508
509 pub fn set_ip_check_interval(&self, interval_in_secs: u32) -> Result<()> {
515 let interval_in_millis = interval_in_secs as u64 * 1000;
516 self.send_cmd(Command::SetOption(DaemonOption::IpCheckInterval(
517 interval_in_millis,
518 )))
519 }
520
521 pub fn get_ip_check_interval(&self) -> Result<u32> {
523 let (resp_s, resp_r) = bounded(1);
524 self.send_cmd(Command::GetOption(resp_s))?;
525
526 let option = resp_r
527 .recv_timeout(Duration::from_secs(10))
528 .map_err(|e| e_fmt!("failed to receive ip check interval: {}", e))?;
529 let ip_check_interval_in_secs = option.ip_check_interval / 1000;
530 Ok(ip_check_interval_in_secs as u32)
531 }
532
533 pub fn enable_interface(&self, if_kind: impl IntoIfKindVec) -> Result<()> {
540 let if_kind_vec = if_kind.into_vec();
541 self.send_cmd(Command::SetOption(DaemonOption::EnableInterface(
542 if_kind_vec.kinds,
543 )))
544 }
545
546 pub fn disable_interface(&self, if_kind: impl IntoIfKindVec) -> Result<()> {
553 let if_kind_vec = if_kind.into_vec();
554 self.send_cmd(Command::SetOption(DaemonOption::DisableInterface(
555 if_kind_vec.kinds,
556 )))
557 }
558
559 pub fn accept_unsolicited(&self, accept: bool) -> Result<()> {
570 self.send_cmd(Command::SetOption(DaemonOption::AcceptUnsolicited(accept)))
571 }
572
573 pub fn include_apple_p2p(&self, include: bool) -> Result<()> {
576 self.send_cmd(Command::SetOption(DaemonOption::IncludeAppleP2P(include)))
577 }
578
579 #[cfg(test)]
580 pub fn test_down_interface(&self, ifname: &str) -> Result<()> {
581 self.send_cmd(Command::SetOption(DaemonOption::TestDownInterface(
582 ifname.to_string(),
583 )))
584 }
585
586 #[cfg(test)]
587 pub fn test_up_interface(&self, ifname: &str) -> Result<()> {
588 self.send_cmd(Command::SetOption(DaemonOption::TestUpInterface(
589 ifname.to_string(),
590 )))
591 }
592
593 pub fn set_multicast_loop_v4(&self, on: bool) -> Result<()> {
609 self.send_cmd(Command::SetOption(DaemonOption::MulticastLoopV4(on)))
610 }
611
612 pub fn set_multicast_loop_v6(&self, on: bool) -> Result<()> {
628 self.send_cmd(Command::SetOption(DaemonOption::MulticastLoopV6(on)))
629 }
630
631 pub fn verify(&self, instance_fullname: String, timeout: Duration) -> Result<()> {
644 self.send_cmd(Command::Verify(instance_fullname, timeout))
645 }
646
647 fn daemon_thread(
648 signal_sock: MioUdpSocket,
649 poller: Poll,
650 receiver: Receiver<Command>,
651 port: u16,
652 cmd_sender: Sender<Command>,
653 signal_addr: SocketAddr,
654 ) {
655 let mut zc = Zeroconf::new(signal_sock, poller, port, cmd_sender, signal_addr);
656
657 if let Some(cmd) = zc.run(receiver) {
658 match cmd {
659 Command::Exit(resp_s) => {
660 if let Err(e) = resp_s.send(DaemonStatus::Shutdown) {
663 debug!("exit: failed to send response of shutdown: {}", e);
664 }
665 }
666 _ => {
667 debug!("Unexpected command: {:?}", cmd);
668 }
669 }
670 }
671 }
672}
673
674fn _new_socket_bind(intf: &Interface, should_loop: bool) -> Result<MyUdpSocket> {
676 let intf_ip = &intf.ip();
679 match intf_ip {
680 IpAddr::V4(ip) => {
681 let addr = SocketAddrV4::new(Ipv4Addr::new(0, 0, 0, 0), MDNS_PORT);
682 let sock = new_socket(addr.into(), true)?;
683
684 sock.join_multicast_v4(&GROUP_ADDR_V4, ip)
686 .map_err(|e| e_fmt!("join multicast group on addr {}: {}", intf_ip, e))?;
687
688 sock.set_multicast_if_v4(ip)
690 .map_err(|e| e_fmt!("set multicast_if on addr {}: {}", ip, e))?;
691
692 sock.set_multicast_ttl_v4(255)
697 .map_err(|e| e_fmt!("set set_multicast_ttl_v4 on addr {}: {}", ip, e))?;
698
699 if !should_loop {
700 sock.set_multicast_loop_v4(false)
701 .map_err(|e| e_fmt!("failed to set multicast loop v4 for {ip}: {e}"))?;
702 }
703
704 let multicast_addr = SocketAddrV4::new(GROUP_ADDR_V4, MDNS_PORT).into();
706 let test_packets = DnsOutgoing::new(0).to_data_on_wire();
707 for packet in test_packets {
708 sock.send_to(&packet, &multicast_addr)
709 .map_err(|e| e_fmt!("send multicast packet on addr {}: {}", ip, e))?;
710 }
711 MyUdpSocket::new(sock)
712 .map_err(|e| e_fmt!("failed to create MySocket for interface {}: {e}", intf.name))
713 }
714 IpAddr::V6(ip) => {
715 let addr = SocketAddrV6::new(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 0), MDNS_PORT, 0, 0);
716 let sock = new_socket(addr.into(), true)?;
717
718 let if_index = intf.index.unwrap_or(0);
719
720 sock.join_multicast_v6(&GROUP_ADDR_V6, if_index)
722 .map_err(|e| e_fmt!("join multicast group on addr {}: {}", ip, e))?;
723
724 sock.set_multicast_if_v6(if_index)
726 .map_err(|e| e_fmt!("set multicast_if on addr {}: {}", ip, e))?;
727
728 MyUdpSocket::new(sock)
733 .map_err(|e| e_fmt!("failed to create MySocket for interface {}: {e}", intf.name))
734 }
735 }
736}
737
738fn new_socket(addr: SocketAddr, non_block: bool) -> Result<PktInfoUdpSocket> {
741 let domain = match addr {
742 SocketAddr::V4(_) => socket2::Domain::IPV4,
743 SocketAddr::V6(_) => socket2::Domain::IPV6,
744 };
745
746 let fd = PktInfoUdpSocket::new(domain).map_err(|e| e_fmt!("create socket failed: {}", e))?;
747
748 fd.set_reuse_address(true)
749 .map_err(|e| e_fmt!("set ReuseAddr failed: {}", e))?;
750 #[cfg(unix)]
751 if let Err(e) = fd.set_reuse_port(true) {
752 debug!(
753 "SO_REUSEPORT is not supported, continuing without it: {}",
754 e
755 );
756 }
757
758 if non_block {
759 fd.set_nonblocking(true)
760 .map_err(|e| e_fmt!("set O_NONBLOCK: {}", e))?;
761 }
762
763 fd.bind(&addr.into())
764 .map_err(|e| e_fmt!("socket bind to {} failed: {}", &addr, e))?;
765
766 trace!("new socket bind to {}", &addr);
767 Ok(fd)
768}
769
770struct ReRun {
772 next_time: u64,
774 command: Command,
775}
776
777#[derive(Debug, Clone)]
781#[non_exhaustive]
782pub enum IfKind {
783 All,
785
786 IPv4,
788
789 IPv6,
791
792 Name(String),
794
795 Addr(IpAddr),
797
798 LoopbackV4,
802
803 LoopbackV6,
805}
806
807impl IfKind {
808 fn matches(&self, intf: &Interface) -> bool {
810 match self {
811 Self::All => true,
812 Self::IPv4 => intf.ip().is_ipv4(),
813 Self::IPv6 => intf.ip().is_ipv6(),
814 Self::Name(ifname) => ifname == &intf.name,
815 Self::Addr(addr) => addr == &intf.ip(),
816 Self::LoopbackV4 => intf.is_loopback() && intf.ip().is_ipv4(),
817 Self::LoopbackV6 => intf.is_loopback() && intf.ip().is_ipv6(),
818 }
819 }
820}
821
822impl From<&str> for IfKind {
825 fn from(val: &str) -> Self {
826 Self::Name(val.to_string())
827 }
828}
829
830impl From<&String> for IfKind {
831 fn from(val: &String) -> Self {
832 Self::Name(val.to_string())
833 }
834}
835
836impl From<IpAddr> for IfKind {
838 fn from(val: IpAddr) -> Self {
839 Self::Addr(val)
840 }
841}
842
843pub struct IfKindVec {
845 kinds: Vec<IfKind>,
846}
847
848pub trait IntoIfKindVec {
850 fn into_vec(self) -> IfKindVec;
851}
852
853impl<T: Into<IfKind>> IntoIfKindVec for T {
854 fn into_vec(self) -> IfKindVec {
855 let if_kind: IfKind = self.into();
856 IfKindVec {
857 kinds: vec![if_kind],
858 }
859 }
860}
861
862impl<T: Into<IfKind>> IntoIfKindVec for Vec<T> {
863 fn into_vec(self) -> IfKindVec {
864 let kinds: Vec<IfKind> = self.into_iter().map(|x| x.into()).collect();
865 IfKindVec { kinds }
866 }
867}
868
869struct IfSelection {
871 if_kind: IfKind,
873
874 selected: bool,
876}
877
878struct Zeroconf {
880 port: u16,
883
884 my_intfs: HashMap<u32, MyIntf>,
886
887 ipv4_sock: Option<MyUdpSocket>,
889
890 ipv6_sock: Option<MyUdpSocket>,
892
893 my_services: HashMap<String, ServiceInfo>,
895
896 cache: DnsCache,
898
899 dns_registry_map: HashMap<u32, DnsRegistry>,
901
902 service_queriers: HashMap<String, Sender<ServiceEvent>>, hostname_resolvers: HashMap<String, (Sender<HostnameResolutionEvent>, Option<u64>)>, retransmissions: Vec<ReRun>,
913
914 counters: Metrics,
915
916 poller: Poll,
918
919 monitors: Vec<Sender<DaemonEvent>>,
921
922 service_name_len_max: u8,
924
925 ip_check_interval: u64,
927
928 if_selections: Vec<IfSelection>,
930
931 signal_sock: MioUdpSocket,
933
934 timers: BinaryHeap<Reverse<u64>>,
940
941 status: DaemonStatus,
942
943 pending_resolves: HashSet<String>,
945
946 resolved: HashSet<String>,
948
949 multicast_loop_v4: bool,
950
951 multicast_loop_v6: bool,
952
953 accept_unsolicited: bool,
954
955 include_apple_p2p: bool,
956
957 cmd_sender: Sender<Command>,
958
959 signal_addr: SocketAddr,
960
961 #[cfg(test)]
962 test_down_interfaces: HashSet<String>,
963}
964
965fn join_multicast_group(my_sock: &PktInfoUdpSocket, intf: &Interface) -> Result<()> {
967 let intf_ip = &intf.ip();
968 match intf_ip {
969 IpAddr::V4(ip) => {
970 debug!("join multicast group V4 on {} addr {ip}", intf.name);
972 my_sock
973 .join_multicast_v4(&GROUP_ADDR_V4, ip)
974 .map_err(|e| e_fmt!("PKT join multicast group on addr {}: {}", intf_ip, e))?;
975 }
976 IpAddr::V6(ip) => {
977 let if_index = intf.index.unwrap_or(0);
978 debug!(
980 "join multicast group V6 on {} addr {ip} with index {if_index}",
981 intf.name
982 );
983 my_sock
984 .join_multicast_v6(&GROUP_ADDR_V6, if_index)
985 .map_err(|e| e_fmt!("PKT join multicast group on addr {}: {}", ip, e))?;
986 }
987 }
988 Ok(())
989}
990
991impl Zeroconf {
992 fn new(
993 signal_sock: MioUdpSocket,
994 poller: Poll,
995 port: u16,
996 cmd_sender: Sender<Command>,
997 signal_addr: SocketAddr,
998 ) -> Self {
999 let my_ifaddrs = my_ip_interfaces(true);
1001
1002 let mut my_intfs = HashMap::new();
1006 let mut dns_registry_map = HashMap::new();
1007
1008 let mut ipv4_sock = None;
1011 let addr = SocketAddrV4::new(Ipv4Addr::new(0, 0, 0, 0), port);
1012 match new_socket(addr.into(), true) {
1013 Ok(sock) => {
1014 sock.set_multicast_ttl_v4(255)
1019 .map_err(|e| e_fmt!("set set_multicast_ttl_v4 on addr: {}", e))
1020 .ok();
1021
1022 ipv4_sock = match MyUdpSocket::new(sock) {
1024 Ok(s) => Some(s),
1025 Err(e) => {
1026 debug!("failed to create IPv4 MyUdpSocket: {e}");
1027 None
1028 }
1029 };
1030 }
1031 Err(e) => debug!("failed to create IPv4 socket: {e}"),
1033 }
1034
1035 let mut ipv6_sock = None;
1036 let addr = SocketAddrV6::new(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 0), port, 0, 0);
1037 match new_socket(addr.into(), true) {
1038 Ok(sock) => {
1039 sock.set_multicast_hops_v6(255)
1043 .map_err(|e| e_fmt!("set set_multicast_hops_v6: {}", e))
1044 .ok();
1045
1046 ipv6_sock = match MyUdpSocket::new(sock) {
1048 Ok(s) => Some(s),
1049 Err(e) => {
1050 debug!("failed to create IPv6 MyUdpSocket: {e}");
1051 None
1052 }
1053 };
1054 }
1055 Err(e) => debug!("failed to create IPv6 socket: {e}"),
1056 }
1057
1058 for intf in my_ifaddrs {
1060 let sock_opt = if intf.ip().is_ipv4() {
1061 &ipv4_sock
1062 } else {
1063 &ipv6_sock
1064 };
1065 let Some(sock) = sock_opt else {
1066 debug!(
1067 "no socket available for interface {} with addr {}. Skipped.",
1068 intf.name,
1069 intf.ip()
1070 );
1071 continue;
1072 };
1073
1074 if let Err(e) = join_multicast_group(&sock.pktinfo, &intf) {
1075 debug!("failed to join multicast: {}: {e}. Skipped.", &intf.ip());
1076 }
1077
1078 let if_index = intf.index.unwrap_or(0);
1079
1080 dns_registry_map
1082 .entry(if_index)
1083 .or_insert_with(DnsRegistry::new);
1084
1085 my_intfs
1086 .entry(if_index)
1087 .and_modify(|v: &mut MyIntf| {
1088 v.addrs.insert(intf.addr.clone());
1089 })
1090 .or_insert(MyIntf {
1091 name: intf.name.clone(),
1092 index: if_index,
1093 addrs: HashSet::from([intf.addr]),
1094 });
1095 }
1096
1097 let monitors = Vec::new();
1098 let service_name_len_max = SERVICE_NAME_LEN_MAX_DEFAULT;
1099 let ip_check_interval = IP_CHECK_INTERVAL_IN_SECS_DEFAULT as u64 * 1000;
1100
1101 let timers = BinaryHeap::new();
1102
1103 let if_selections = vec![];
1105
1106 let status = DaemonStatus::Running;
1107
1108 Self {
1109 port,
1110 my_intfs,
1111 ipv4_sock,
1112 ipv6_sock,
1113 my_services: HashMap::new(),
1114 cache: DnsCache::new(),
1115 dns_registry_map,
1116 hostname_resolvers: HashMap::new(),
1117 service_queriers: HashMap::new(),
1118 retransmissions: Vec::new(),
1119 counters: HashMap::new(),
1120 poller,
1121 monitors,
1122 service_name_len_max,
1123 ip_check_interval,
1124 if_selections,
1125 signal_sock,
1126 timers,
1127 status,
1128 pending_resolves: HashSet::new(),
1129 resolved: HashSet::new(),
1130 multicast_loop_v4: true,
1131 multicast_loop_v6: true,
1132 accept_unsolicited: false,
1133 include_apple_p2p: false,
1134 cmd_sender,
1135 signal_addr,
1136
1137 #[cfg(test)]
1138 test_down_interfaces: HashSet::new(),
1139 }
1140 }
1141
1142 fn send_cmd_to_self(&self, cmd: Command) -> Result<()> {
1144 let cmd_name = cmd.to_string();
1145
1146 self.cmd_sender.try_send(cmd).map_err(|e| match e {
1147 TrySendError::Full(_) => Error::Again,
1148 e => e_fmt!("flume::channel::send failed: {}", e),
1149 })?;
1150
1151 let addr = SocketAddrV4::new(LOOPBACK_V4, 0);
1152 let socket = UdpSocket::bind(addr)
1153 .map_err(|e| e_fmt!("Failed to create socket to send signal: {}", e))?;
1154 socket
1155 .send_to(cmd_name.as_bytes(), self.signal_addr)
1156 .map_err(|e| {
1157 e_fmt!(
1158 "signal socket send_to {} ({}) failed: {}",
1159 self.signal_addr,
1160 cmd_name,
1161 e
1162 )
1163 })?;
1164
1165 Ok(())
1166 }
1167
1168 fn cleanup(&mut self) {
1176 debug!("Starting cleanup for shutdown");
1177
1178 let service_names: Vec<String> = self.my_services.keys().cloned().collect();
1180 for fullname in service_names {
1181 if let Some(info) = self.my_services.get(&fullname) {
1182 debug!("Unregistering service during shutdown: {}", &fullname);
1183
1184 for intf in self.my_intfs.values() {
1185 if let Some(sock) = self.ipv4_sock.as_ref() {
1186 self.unregister_service(info, intf, &sock.pktinfo);
1187 }
1188
1189 if let Some(sock) = self.ipv6_sock.as_ref() {
1190 self.unregister_service(info, intf, &sock.pktinfo);
1191 }
1192 }
1193 }
1194 }
1195 self.my_services.clear();
1196
1197 let browse_types: Vec<String> = self.service_queriers.keys().cloned().collect();
1199 for ty_domain in browse_types {
1200 debug!("Stopping browse during shutdown: {}", &ty_domain);
1201 if let Some(sender) = self.service_queriers.remove(&ty_domain) {
1202 if let Err(e) = sender.send(ServiceEvent::SearchStopped(ty_domain.clone())) {
1204 debug!("Failed to send SearchStopped during shutdown: {}", e);
1205 }
1206 }
1207 }
1208
1209 let hostnames: Vec<String> = self.hostname_resolvers.keys().cloned().collect();
1211 for hostname in hostnames {
1212 debug!(
1213 "Stopping hostname resolution during shutdown: {}",
1214 &hostname
1215 );
1216 if let Some((sender, _timeout)) = self.hostname_resolvers.remove(&hostname) {
1217 if let Err(e) =
1219 sender.send(HostnameResolutionEvent::SearchStopped(hostname.clone()))
1220 {
1221 debug!(
1222 "Failed to send HostnameResolutionEvent::SearchStopped during shutdown: {}",
1223 e
1224 );
1225 }
1226 }
1227 }
1228
1229 self.retransmissions.clear();
1231
1232 debug!("Cleanup completed");
1233 }
1234
1235 fn run(&mut self, receiver: Receiver<Command>) -> Option<Command> {
1244 if let Err(e) = self.poller.registry().register(
1246 &mut self.signal_sock,
1247 mio::Token(SIGNAL_SOCK_EVENT_KEY),
1248 mio::Interest::READABLE,
1249 ) {
1250 debug!("failed to add signal socket to the poller: {}", e);
1251 return None;
1252 }
1253
1254 if let Some(sock) = self.ipv4_sock.as_mut() {
1255 if let Err(e) = self.poller.registry().register(
1256 sock,
1257 mio::Token(IPV4_SOCK_EVENT_KEY),
1258 mio::Interest::READABLE,
1259 ) {
1260 debug!("failed to register ipv4 socket: {}", e);
1261 return None;
1262 }
1263 }
1264
1265 if let Some(sock) = self.ipv6_sock.as_mut() {
1266 if let Err(e) = self.poller.registry().register(
1267 sock,
1268 mio::Token(IPV6_SOCK_EVENT_KEY),
1269 mio::Interest::READABLE,
1270 ) {
1271 debug!("failed to register ipv6 socket: {}", e);
1272 return None;
1273 }
1274 }
1275
1276 let mut next_ip_check = if self.ip_check_interval > 0 {
1278 current_time_millis() + self.ip_check_interval
1279 } else {
1280 0
1281 };
1282
1283 if next_ip_check > 0 {
1284 self.add_timer(next_ip_check);
1285 }
1286
1287 let mut events = mio::Events::with_capacity(1024);
1290 loop {
1291 let now = current_time_millis();
1292
1293 let earliest_timer = self.peek_earliest_timer();
1294 let timeout = earliest_timer.map(|timer| {
1295 let millis = if timer > now { timer - now } else { 1 };
1297 Duration::from_millis(millis)
1298 });
1299
1300 events.clear();
1302 match self.poller.poll(&mut events, timeout) {
1303 Ok(_) => self.handle_poller_events(&events),
1304 Err(e) => debug!("failed to select from sockets: {}", e),
1305 }
1306
1307 let now = current_time_millis();
1308
1309 self.pop_timers_till(now);
1311
1312 for hostname in self
1314 .hostname_resolvers
1315 .clone()
1316 .into_iter()
1317 .filter(|(_, (_, timeout))| timeout.map(|t| now >= t).unwrap_or(false))
1318 .map(|(hostname, _)| hostname)
1319 {
1320 trace!("hostname resolver timeout for {}", &hostname);
1321 call_hostname_resolution_listener(
1322 &self.hostname_resolvers,
1323 &hostname,
1324 HostnameResolutionEvent::SearchTimeout(hostname.to_owned()),
1325 );
1326 call_hostname_resolution_listener(
1327 &self.hostname_resolvers,
1328 &hostname,
1329 HostnameResolutionEvent::SearchStopped(hostname.to_owned()),
1330 );
1331 self.hostname_resolvers.remove(&hostname);
1332 }
1333
1334 while let Ok(command) = receiver.try_recv() {
1336 if matches!(command, Command::Exit(_)) {
1337 debug!("Exit command received, performing cleanup");
1338 self.cleanup();
1339 self.status = DaemonStatus::Shutdown;
1340 return Some(command);
1341 }
1342 self.exec_command(command, false);
1343 }
1344
1345 let mut i = 0;
1347 while i < self.retransmissions.len() {
1348 if now >= self.retransmissions[i].next_time {
1349 let rerun = self.retransmissions.remove(i);
1350 self.exec_command(rerun.command, true);
1351 } else {
1352 i += 1;
1353 }
1354 }
1355
1356 self.refresh_active_services();
1358
1359 let mut query_count = 0;
1361 for (hostname, _sender) in self.hostname_resolvers.iter() {
1362 for (hostname, ip_addr) in
1363 self.cache.refresh_due_hostname_resolutions(hostname).iter()
1364 {
1365 self.send_query(hostname, ip_address_rr_type(&ip_addr.to_ip_addr()));
1366 query_count += 1;
1367 }
1368 }
1369
1370 self.increase_counter(Counter::CacheRefreshAddr, query_count);
1371
1372 let now = current_time_millis();
1374
1375 let expired_services = self.cache.evict_expired_services(now);
1377 if !expired_services.is_empty() {
1378 debug!(
1379 "run: send {} service removal to listeners",
1380 expired_services.len()
1381 );
1382 self.notify_service_removal(expired_services);
1383 }
1384
1385 let expired_addrs = self.cache.evict_expired_addr(now);
1387 for (hostname, addrs) in expired_addrs {
1388 call_hostname_resolution_listener(
1389 &self.hostname_resolvers,
1390 &hostname,
1391 HostnameResolutionEvent::AddressesRemoved(hostname.clone(), addrs),
1392 );
1393 let instances = self.cache.get_instances_on_host(&hostname);
1394 let instance_set: HashSet<String> = instances.into_iter().collect();
1395 self.resolve_updated_instances(&instance_set);
1396 }
1397
1398 self.probing_handler();
1400
1401 if now >= next_ip_check && next_ip_check > 0 {
1403 next_ip_check = now + self.ip_check_interval;
1404 self.add_timer(next_ip_check);
1405
1406 self.check_ip_changes();
1407 }
1408 }
1409 }
1410
1411 fn process_set_option(&mut self, daemon_opt: DaemonOption) {
1412 match daemon_opt {
1413 DaemonOption::ServiceNameLenMax(length) => self.service_name_len_max = length,
1414 DaemonOption::IpCheckInterval(interval) => self.ip_check_interval = interval,
1415 DaemonOption::EnableInterface(if_kind) => self.enable_interface(if_kind),
1416 DaemonOption::DisableInterface(if_kind) => self.disable_interface(if_kind),
1417 DaemonOption::MulticastLoopV4(on) => self.set_multicast_loop_v4(on),
1418 DaemonOption::MulticastLoopV6(on) => self.set_multicast_loop_v6(on),
1419 DaemonOption::AcceptUnsolicited(accept) => self.set_accept_unsolicited(accept),
1420 DaemonOption::IncludeAppleP2P(enable) => self.set_apple_p2p(enable),
1421 #[cfg(test)]
1422 DaemonOption::TestDownInterface(ifname) => {
1423 self.test_down_interfaces.insert(ifname);
1424 }
1425 #[cfg(test)]
1426 DaemonOption::TestUpInterface(ifname) => {
1427 self.test_down_interfaces.remove(&ifname);
1428 }
1429 }
1430 }
1431
1432 fn enable_interface(&mut self, kinds: Vec<IfKind>) {
1433 debug!("enable_interface: {:?}", kinds);
1434 for if_kind in kinds {
1435 self.if_selections.push(IfSelection {
1436 if_kind,
1437 selected: true,
1438 });
1439 }
1440
1441 self.apply_intf_selections(my_ip_interfaces_inner(true, self.include_apple_p2p));
1442 }
1443
1444 fn disable_interface(&mut self, kinds: Vec<IfKind>) {
1445 debug!("disable_interface: {:?}", kinds);
1446 for if_kind in kinds {
1447 self.if_selections.push(IfSelection {
1448 if_kind,
1449 selected: false,
1450 });
1451 }
1452
1453 self.apply_intf_selections(my_ip_interfaces_inner(true, self.include_apple_p2p));
1454 }
1455
1456 fn set_multicast_loop_v4(&mut self, on: bool) {
1457 let Some(sock) = self.ipv4_sock.as_mut() else {
1458 return;
1459 };
1460 self.multicast_loop_v4 = on;
1461 sock.pktinfo
1462 .set_multicast_loop_v4(on)
1463 .map_err(|e| e_fmt!("failed to set multicast loop v4: {}", e))
1464 .unwrap();
1465 }
1466
1467 fn set_multicast_loop_v6(&mut self, on: bool) {
1468 let Some(sock) = self.ipv6_sock.as_mut() else {
1469 return;
1470 };
1471 self.multicast_loop_v6 = on;
1472 sock.pktinfo
1473 .set_multicast_loop_v6(on)
1474 .map_err(|e| e_fmt!("failed to set multicast loop v6: {}", e))
1475 .unwrap();
1476 }
1477
1478 fn set_accept_unsolicited(&mut self, accept: bool) {
1479 self.accept_unsolicited = accept;
1480 }
1481
1482 fn set_apple_p2p(&mut self, include: bool) {
1483 if self.include_apple_p2p != include {
1484 self.include_apple_p2p = include;
1485 self.apply_intf_selections(my_ip_interfaces_inner(true, self.include_apple_p2p));
1486 }
1487 }
1488
1489 fn notify_monitors(&mut self, event: DaemonEvent) {
1490 self.monitors.retain(|sender| {
1492 if let Err(e) = sender.try_send(event.clone()) {
1493 debug!("notify_monitors: try_send: {}", &e);
1494 if matches!(e, TrySendError::Disconnected(_)) {
1495 return false; }
1497 }
1498 true
1499 });
1500 }
1501
1502 fn del_addr_in_my_services(&mut self, addr: &IpAddr) {
1504 for (_, service_info) in self.my_services.iter_mut() {
1505 if service_info.is_addr_auto() {
1506 service_info.remove_ipaddr(addr);
1507 }
1508 }
1509 }
1510
1511 fn add_timer(&mut self, next_time: u64) {
1512 self.timers.push(Reverse(next_time));
1513 }
1514
1515 fn peek_earliest_timer(&self) -> Option<u64> {
1516 self.timers.peek().map(|Reverse(v)| *v)
1517 }
1518
1519 fn _pop_earliest_timer(&mut self) -> Option<u64> {
1520 self.timers.pop().map(|Reverse(v)| v)
1521 }
1522
1523 fn pop_timers_till(&mut self, now: u64) {
1525 while let Some(Reverse(v)) = self.timers.peek() {
1526 if *v > now {
1527 break;
1528 }
1529 self.timers.pop();
1530 }
1531 }
1532
1533 fn selected_intfs(&self, interfaces: Vec<Interface>) -> HashSet<Interface> {
1535 let intf_count = interfaces.len();
1536 let mut intf_selections = vec![true; intf_count];
1537
1538 for selection in self.if_selections.iter() {
1540 for i in 0..intf_count {
1542 if selection.if_kind.matches(&interfaces[i]) {
1543 intf_selections[i] = selection.selected;
1544 }
1545 }
1546 }
1547
1548 let mut selected_addrs = HashSet::new();
1549 for i in 0..intf_count {
1550 if intf_selections[i] {
1551 selected_addrs.insert(interfaces[i].clone());
1552 }
1553 }
1554
1555 selected_addrs
1556 }
1557
1558 fn apply_intf_selections(&mut self, interfaces: Vec<Interface>) {
1563 let intf_count = interfaces.len();
1565 let mut intf_selections = vec![true; intf_count];
1566
1567 for selection in self.if_selections.iter() {
1569 for i in 0..intf_count {
1571 if selection.if_kind.matches(&interfaces[i]) {
1572 intf_selections[i] = selection.selected;
1573 }
1574 }
1575 }
1576
1577 for (idx, intf) in interfaces.into_iter().enumerate() {
1579 if intf_selections[idx] {
1580 self.add_interface(intf);
1582 } else {
1583 self.del_interface_addr(&intf);
1585 }
1586 }
1587 }
1588
1589 fn del_ip(&mut self, ip: IpAddr) {
1590 self.del_addr_in_my_services(&ip);
1591 self.notify_monitors(DaemonEvent::IpDel(ip));
1592 }
1593
1594 fn check_ip_changes(&mut self) {
1596 let my_ifaddrs = my_ip_interfaces_inner(true, self.include_apple_p2p);
1598
1599 #[cfg(test)]
1600 let my_ifaddrs: Vec<_> = my_ifaddrs
1601 .into_iter()
1602 .filter(|intf| !self.test_down_interfaces.contains(&intf.name))
1603 .collect();
1604
1605 let ifaddrs_map: HashMap<u32, Vec<&IfAddr>> =
1606 my_ifaddrs.iter().fold(HashMap::new(), |mut acc, intf| {
1607 let if_index = intf.index.unwrap_or(0);
1608 acc.entry(if_index).or_default().push(&intf.addr);
1609 acc
1610 });
1611
1612 let mut deleted_intfs = Vec::new();
1613 let mut deleted_ips = Vec::new();
1614
1615 for (if_index, my_intf) in self.my_intfs.iter_mut() {
1616 let mut last_ipv4 = None;
1617 let mut last_ipv6 = None;
1618
1619 if let Some(current_addrs) = ifaddrs_map.get(if_index) {
1620 my_intf.addrs.retain(|addr| {
1621 if current_addrs.contains(&addr) {
1622 true
1623 } else {
1624 match addr.ip() {
1625 IpAddr::V4(ipv4) => last_ipv4 = Some(ipv4),
1626 IpAddr::V6(ipv6) => last_ipv6 = Some(ipv6),
1627 }
1628 deleted_ips.push(addr.ip());
1629 false
1630 }
1631 });
1632 if my_intf.addrs.is_empty() {
1633 deleted_intfs.push((*if_index, last_ipv4, last_ipv6))
1634 }
1635 } else {
1636 debug!(
1638 "check_ip_changes: interface {} ({}) no longer exists, removing",
1639 my_intf.name, if_index
1640 );
1641 for addr in my_intf.addrs.iter() {
1642 match addr.ip() {
1643 IpAddr::V4(ipv4) => last_ipv4 = Some(ipv4),
1644 IpAddr::V6(ipv6) => last_ipv6 = Some(ipv6),
1645 }
1646 deleted_ips.push(addr.ip())
1647 }
1648 deleted_intfs.push((*if_index, last_ipv4, last_ipv6));
1649 }
1650 }
1651
1652 if !deleted_ips.is_empty() || !deleted_intfs.is_empty() {
1653 debug!(
1654 "check_ip_changes: {} deleted ips {} deleted intfs",
1655 deleted_ips.len(),
1656 deleted_intfs.len()
1657 );
1658 }
1659
1660 for ip in deleted_ips {
1661 self.del_ip(ip);
1662 }
1663
1664 for (if_index, last_ipv4, last_ipv6) in deleted_intfs {
1665 let Some(my_intf) = self.my_intfs.remove(&if_index) else {
1666 continue;
1667 };
1668
1669 if let Some(ipv4) = last_ipv4 {
1670 debug!("leave multicast for {ipv4}");
1671 if let Some(sock) = self.ipv4_sock.as_mut() {
1672 if let Err(e) = sock.pktinfo.leave_multicast_v4(&GROUP_ADDR_V4, &ipv4) {
1673 debug!("leave multicast group for addr {ipv4}: {e}");
1674 }
1675 }
1676 }
1677
1678 if let Some(ipv6) = last_ipv6 {
1679 debug!("leave multicast for {ipv6}");
1680 if let Some(sock) = self.ipv6_sock.as_mut() {
1681 if let Err(e) = sock
1682 .pktinfo
1683 .leave_multicast_v6(&GROUP_ADDR_V6, my_intf.index)
1684 {
1685 debug!("leave multicast group for IPv6: {ipv6}: {e}");
1686 }
1687 }
1688 }
1689
1690 let intf_id = InterfaceId {
1692 name: my_intf.name.to_string(),
1693 index: my_intf.index,
1694 };
1695 let removed_instances = self.cache.remove_records_on_intf(intf_id);
1696 self.notify_service_removal(removed_instances);
1697 }
1698
1699 self.apply_intf_selections(my_ifaddrs);
1701 }
1702
1703 fn del_interface_addr(&mut self, intf: &Interface) {
1706 let if_index = intf.index.unwrap_or(0);
1707 trace!(
1708 "del_interface_addr: {} ({if_index}) addr {}",
1709 intf.name,
1710 intf.ip()
1711 );
1712
1713 let Some(my_intf) = self.my_intfs.get_mut(&if_index) else {
1714 debug!("del_interface_addr: interface {} not found", intf.name);
1715 return;
1716 };
1717
1718 let mut ip_removed = false;
1719
1720 if my_intf.addrs.remove(&intf.addr) {
1721 ip_removed = true;
1722
1723 match intf.addr.ip() {
1724 IpAddr::V4(ipv4) => {
1725 if my_intf.next_ifaddr_v4().is_none() {
1726 if let Some(sock) = self.ipv4_sock.as_mut() {
1727 if let Err(e) = sock.pktinfo.leave_multicast_v4(&GROUP_ADDR_V4, &ipv4) {
1728 debug!("leave multicast group for addr {ipv4}: {e}");
1729 }
1730 }
1731 }
1732 }
1733
1734 IpAddr::V6(ipv6) => {
1735 if my_intf.next_ifaddr_v6().is_none() {
1736 if let Some(sock) = self.ipv6_sock.as_mut() {
1737 if let Err(e) =
1738 sock.pktinfo.leave_multicast_v6(&GROUP_ADDR_V6, if_index)
1739 {
1740 debug!("leave multicast group for addr {ipv6}: {e}");
1741 }
1742 }
1743 }
1744 }
1745 }
1746
1747 if my_intf.addrs.is_empty() {
1748 debug!("del_interface_addr: removing interface {}", intf.name);
1750 self.my_intfs.remove(&if_index);
1751 self.dns_registry_map.remove(&if_index);
1752 self.cache.remove_addrs_on_disabled_intf(if_index);
1753 }
1754 }
1755
1756 if ip_removed {
1757 self.notify_monitors(DaemonEvent::IpDel(intf.ip()));
1759 self.del_addr_in_my_services(&intf.ip());
1761 }
1762 }
1763
1764 fn add_interface(&mut self, intf: Interface) {
1765 let sock_opt = if intf.ip().is_ipv4() {
1766 &self.ipv4_sock
1767 } else {
1768 &self.ipv6_sock
1769 };
1770
1771 let Some(sock) = sock_opt else {
1772 debug!(
1773 "add_interface: no socket available for interface {} with addr {}. Skipped.",
1774 intf.name,
1775 intf.ip()
1776 );
1777 return;
1778 };
1779
1780 let if_index = intf.index.unwrap_or(0);
1781 let mut new_addr = false;
1782
1783 match self.my_intfs.entry(if_index) {
1784 Entry::Occupied(mut entry) => {
1785 let my_intf = entry.get_mut();
1787 if !my_intf.addrs.contains(&intf.addr) {
1788 if let Err(e) = join_multicast_group(&sock.pktinfo, &intf) {
1789 debug!("add_interface: socket_config {}: {e}", &intf.name);
1790 }
1791 my_intf.addrs.insert(intf.addr.clone());
1792 new_addr = true;
1793 }
1794 }
1795 Entry::Vacant(entry) => {
1796 if let Err(e) = join_multicast_group(&sock.pktinfo, &intf) {
1797 debug!("add_interface: socket_config {}: {e}. Skipped.", &intf.name);
1798 return;
1799 }
1800
1801 new_addr = true;
1802 let new_intf = MyIntf {
1803 name: intf.name.clone(),
1804 index: if_index,
1805 addrs: HashSet::from([intf.addr.clone()]),
1806 };
1807 entry.insert(new_intf);
1808 }
1809 }
1810
1811 if !new_addr {
1812 trace!("add_interface: interface {} already exists", &intf.name);
1813 return;
1814 }
1815
1816 debug!("add new interface {}: {}", intf.name, intf.ip());
1817
1818 let Some(my_intf) = self.my_intfs.get(&if_index) else {
1819 debug!("add_interface: cannot find if_index {if_index}");
1820 return;
1821 };
1822
1823 let dns_registry = match self.dns_registry_map.get_mut(&if_index) {
1824 Some(registry) => registry,
1825 None => self
1826 .dns_registry_map
1827 .entry(if_index)
1828 .or_insert_with(DnsRegistry::new),
1829 };
1830
1831 for (_, service_info) in self.my_services.iter_mut() {
1832 if service_info.is_addr_auto() {
1833 service_info.insert_ipaddr(&intf);
1834
1835 if let Ok(true) = announce_service_on_intf(
1836 dns_registry,
1837 service_info,
1838 my_intf,
1839 &sock.pktinfo,
1840 self.port,
1841 ) {
1842 debug!(
1843 "Announce service {} on {}",
1844 service_info.get_fullname(),
1845 intf.ip()
1846 );
1847 service_info.set_status(if_index, ServiceStatus::Announced);
1848 } else {
1849 for timer in dns_registry.new_timers.drain(..) {
1850 self.timers.push(Reverse(timer));
1851 }
1852 service_info.set_status(if_index, ServiceStatus::Probing);
1853 }
1854 }
1855 }
1856
1857 let mut browse_reruns = Vec::new();
1859 let mut i = 0;
1860 while i < self.retransmissions.len() {
1861 if matches!(self.retransmissions[i].command, Command::Browse(..)) {
1862 browse_reruns.push(self.retransmissions.remove(i));
1863 } else {
1864 i += 1;
1865 }
1866 }
1867
1868 for rerun in browse_reruns {
1869 self.exec_command(rerun.command, true);
1870 }
1871
1872 self.notify_monitors(DaemonEvent::IpAdd(intf.ip()));
1874 }
1875
1876 fn register_service(&mut self, mut info: ServiceInfo) {
1885 if let Err(e) = check_service_name_length(info.get_type(), self.service_name_len_max) {
1887 error!("check_service_name_length: {}", &e);
1888 self.notify_monitors(DaemonEvent::Error(e));
1889 return;
1890 }
1891
1892 if info.is_addr_auto() {
1893 let selected_intfs =
1894 self.selected_intfs(my_ip_interfaces_inner(true, self.include_apple_p2p));
1895 for intf in selected_intfs {
1896 info.insert_ipaddr(&intf);
1897 }
1898 }
1899
1900 debug!("register service {:?}", &info);
1901
1902 let outgoing_addrs = self.send_unsolicited_response(&mut info);
1903 if !outgoing_addrs.is_empty() {
1904 self.notify_monitors(DaemonEvent::Announce(
1905 info.get_fullname().to_string(),
1906 format!("{:?}", &outgoing_addrs),
1907 ));
1908 }
1909
1910 let service_fullname = info.get_fullname().to_lowercase();
1913 self.my_services.insert(service_fullname, info);
1914 }
1915
1916 fn send_unsolicited_response(&mut self, info: &mut ServiceInfo) -> Vec<IpAddr> {
1919 let mut outgoing_addrs = Vec::new();
1920 let mut outgoing_intfs = HashSet::new();
1921
1922 let mut invalid_intf_addrs = HashSet::new();
1923
1924 for (if_index, intf) in self.my_intfs.iter() {
1925 let dns_registry = match self.dns_registry_map.get_mut(if_index) {
1926 Some(registry) => registry,
1927 None => self
1928 .dns_registry_map
1929 .entry(*if_index)
1930 .or_insert_with(DnsRegistry::new),
1931 };
1932
1933 let mut announced = false;
1934
1935 if let Some(sock) = self.ipv4_sock.as_mut() {
1937 match announce_service_on_intf(dns_registry, info, intf, &sock.pktinfo, self.port) {
1938 Ok(true) => {
1939 for addr in intf.addrs.iter().filter(|a| a.ip().is_ipv4()) {
1940 outgoing_addrs.push(addr.ip());
1941 }
1942 outgoing_intfs.insert(intf.index);
1943
1944 debug!(
1945 "Announce service IPv4 {} on {}",
1946 info.get_fullname(),
1947 intf.name
1948 );
1949 announced = true;
1950 }
1951 Ok(false) => {}
1952 Err(InternalError::IntfAddrInvalid(intf_addr)) => {
1953 invalid_intf_addrs.insert(intf_addr);
1954 }
1955 }
1956 }
1957
1958 if let Some(sock) = self.ipv6_sock.as_mut() {
1959 match announce_service_on_intf(dns_registry, info, intf, &sock.pktinfo, self.port) {
1960 Ok(true) => {
1961 for addr in intf.addrs.iter().filter(|a| a.ip().is_ipv6()) {
1962 outgoing_addrs.push(addr.ip());
1963 }
1964 outgoing_intfs.insert(intf.index);
1965
1966 debug!(
1967 "Announce service IPv6 {} on {}",
1968 info.get_fullname(),
1969 intf.name
1970 );
1971 announced = true;
1972 }
1973 Ok(false) => {}
1974 Err(InternalError::IntfAddrInvalid(intf_addr)) => {
1975 invalid_intf_addrs.insert(intf_addr);
1976 }
1977 }
1978 }
1979
1980 if announced {
1981 info.set_status(intf.index, ServiceStatus::Announced);
1982 } else {
1983 for timer in dns_registry.new_timers.drain(..) {
1984 self.timers.push(Reverse(timer));
1985 }
1986 info.set_status(*if_index, ServiceStatus::Probing);
1987 }
1988 }
1989
1990 if !invalid_intf_addrs.is_empty() {
1991 let _ = self.send_cmd_to_self(Command::InvalidIntfAddrs(invalid_intf_addrs));
1992 }
1993
1994 let next_time = current_time_millis() + 1000;
1998 for if_index in outgoing_intfs {
1999 self.add_retransmission(
2000 next_time,
2001 Command::RegisterResend(info.get_fullname().to_string(), if_index),
2002 );
2003 }
2004
2005 outgoing_addrs
2006 }
2007
2008 fn probing_handler(&mut self) {
2010 let now = current_time_millis();
2011 let mut invalid_intf_addrs = HashSet::new();
2012
2013 for (if_index, intf) in self.my_intfs.iter() {
2014 let Some(dns_registry) = self.dns_registry_map.get_mut(if_index) else {
2015 continue;
2016 };
2017
2018 let (out, expired_probes) = check_probing(dns_registry, &mut self.timers, now);
2019
2020 if !out.questions().is_empty() {
2022 trace!("sending out probing of questions: {:?}", out.questions());
2023 if let Some(sock) = self.ipv4_sock.as_mut() {
2024 if let Err(InternalError::IntfAddrInvalid(intf_addr)) =
2025 send_dns_outgoing(&out, intf, &sock.pktinfo, self.port)
2026 {
2027 invalid_intf_addrs.insert(intf_addr);
2028 }
2029 }
2030 if let Some(sock) = self.ipv6_sock.as_mut() {
2031 if let Err(InternalError::IntfAddrInvalid(intf_addr)) =
2032 send_dns_outgoing(&out, intf, &sock.pktinfo, self.port)
2033 {
2034 invalid_intf_addrs.insert(intf_addr);
2035 }
2036 }
2037 }
2038
2039 let waiting_services =
2041 handle_expired_probes(expired_probes, &intf.name, dns_registry, &mut self.monitors);
2042
2043 for service_name in waiting_services {
2044 if let Some(info) = self.my_services.get_mut(&service_name.to_lowercase()) {
2046 if info.get_status(*if_index) == ServiceStatus::Announced {
2047 debug!("service {} already announced", info.get_fullname());
2048 continue;
2049 }
2050
2051 let announced_v4 = if let Some(sock) = self.ipv4_sock.as_mut() {
2052 match announce_service_on_intf(
2053 dns_registry,
2054 info,
2055 intf,
2056 &sock.pktinfo,
2057 self.port,
2058 ) {
2059 Ok(announced) => announced,
2060 Err(InternalError::IntfAddrInvalid(intf_addr)) => {
2061 invalid_intf_addrs.insert(intf_addr);
2062 false
2063 }
2064 }
2065 } else {
2066 false
2067 };
2068 let announced_v6 = if let Some(sock) = self.ipv6_sock.as_mut() {
2069 match announce_service_on_intf(
2070 dns_registry,
2071 info,
2072 intf,
2073 &sock.pktinfo,
2074 self.port,
2075 ) {
2076 Ok(announced) => announced,
2077 Err(InternalError::IntfAddrInvalid(intf_addr)) => {
2078 invalid_intf_addrs.insert(intf_addr);
2079 false
2080 }
2081 }
2082 } else {
2083 false
2084 };
2085
2086 if announced_v4 || announced_v6 {
2087 let next_time = now + 1000;
2088 let command =
2089 Command::RegisterResend(info.get_fullname().to_string(), *if_index);
2090 self.retransmissions.push(ReRun { next_time, command });
2091 self.timers.push(Reverse(next_time));
2092
2093 let fullname = match dns_registry.name_changes.get(&service_name) {
2094 Some(new_name) => new_name.to_string(),
2095 None => service_name.to_string(),
2096 };
2097
2098 let mut hostname = info.get_hostname();
2099 if let Some(new_name) = dns_registry.name_changes.get(hostname) {
2100 hostname = new_name;
2101 }
2102
2103 debug!("wake up: announce service {} on {}", fullname, intf.name);
2104 notify_monitors(
2105 &mut self.monitors,
2106 DaemonEvent::Announce(fullname, format!("{}:{}", hostname, &intf.name)),
2107 );
2108
2109 info.set_status(*if_index, ServiceStatus::Announced);
2110 }
2111 }
2112 }
2113 }
2114
2115 if !invalid_intf_addrs.is_empty() {
2116 let _ = self.send_cmd_to_self(Command::InvalidIntfAddrs(invalid_intf_addrs));
2117 }
2118 }
2119
2120 fn unregister_service(
2121 &self,
2122 info: &ServiceInfo,
2123 intf: &MyIntf,
2124 sock: &PktInfoUdpSocket,
2125 ) -> Vec<u8> {
2126 let is_ipv4 = sock.domain() == Domain::IPV4;
2127
2128 let mut out = DnsOutgoing::new(FLAGS_QR_RESPONSE | FLAGS_AA);
2129 out.add_answer_at_time(
2130 DnsPointer::new(
2131 info.get_type(),
2132 RRType::PTR,
2133 CLASS_IN,
2134 0,
2135 info.get_fullname().to_string(),
2136 ),
2137 0,
2138 );
2139
2140 if let Some(sub) = info.get_subtype() {
2141 trace!("Adding subdomain {}", sub);
2142 out.add_answer_at_time(
2143 DnsPointer::new(
2144 sub,
2145 RRType::PTR,
2146 CLASS_IN,
2147 0,
2148 info.get_fullname().to_string(),
2149 ),
2150 0,
2151 );
2152 }
2153
2154 out.add_answer_at_time(
2155 DnsSrv::new(
2156 info.get_fullname(),
2157 CLASS_IN | CLASS_CACHE_FLUSH,
2158 0,
2159 info.get_priority(),
2160 info.get_weight(),
2161 info.get_port(),
2162 info.get_hostname().to_string(),
2163 ),
2164 0,
2165 );
2166 out.add_answer_at_time(
2167 DnsTxt::new(
2168 info.get_fullname(),
2169 CLASS_IN | CLASS_CACHE_FLUSH,
2170 0,
2171 info.generate_txt(),
2172 ),
2173 0,
2174 );
2175
2176 let if_addrs = if is_ipv4 {
2177 info.get_addrs_on_my_intf_v4(intf)
2178 } else {
2179 info.get_addrs_on_my_intf_v6(intf)
2180 };
2181
2182 if if_addrs.is_empty() {
2183 return vec![];
2184 }
2185
2186 for address in if_addrs {
2187 out.add_answer_at_time(
2188 DnsAddress::new(
2189 info.get_hostname(),
2190 ip_address_rr_type(&address),
2191 CLASS_IN | CLASS_CACHE_FLUSH,
2192 0,
2193 address,
2194 intf.into(),
2195 ),
2196 0,
2197 );
2198 }
2199
2200 let sent_vec = match send_dns_outgoing(&out, intf, sock, self.port) {
2202 Ok(sent_vec) => sent_vec,
2203 Err(InternalError::IntfAddrInvalid(intf_addr)) => {
2204 let invalid_intf_addrs = HashSet::from([intf_addr]);
2205 let _ = self.send_cmd_to_self(Command::InvalidIntfAddrs(invalid_intf_addrs));
2206 vec![]
2207 }
2208 };
2209 sent_vec.into_iter().next().unwrap_or_default()
2210 }
2211
2212 fn add_hostname_resolver(
2216 &mut self,
2217 hostname: String,
2218 listener: Sender<HostnameResolutionEvent>,
2219 timeout: Option<u64>,
2220 ) {
2221 let real_timeout = timeout.map(|t| current_time_millis() + t);
2222 self.hostname_resolvers
2223 .insert(hostname.to_lowercase(), (listener, real_timeout));
2224 if let Some(t) = real_timeout {
2225 self.add_timer(t);
2226 }
2227 }
2228
2229 fn send_query(&self, name: &str, qtype: RRType) {
2231 self.send_query_vec(&[(name, qtype)]);
2232 }
2233
2234 fn send_query_vec(&self, questions: &[(&str, RRType)]) {
2236 let mut out = DnsOutgoing::new(FLAGS_QR_QUERY);
2237 let now = current_time_millis();
2238
2239 for (name, qtype) in questions {
2240 out.add_question(name, *qtype);
2241
2242 for record in self.cache.get_known_answers(name, *qtype, now) {
2243 trace!("add known answer: {:?}", record.record);
2251 let mut new_record = record.record.clone();
2252 new_record.get_record_mut().update_ttl(now);
2253 out.add_answer_box(new_record);
2254 }
2255 }
2256
2257 let mut invalid_intf_addrs = HashSet::new();
2258 for (_, intf) in self.my_intfs.iter() {
2259 if let Some(sock) = self.ipv4_sock.as_ref() {
2260 if let Err(InternalError::IntfAddrInvalid(intf_addr)) =
2261 send_dns_outgoing(&out, intf, &sock.pktinfo, self.port)
2262 {
2263 invalid_intf_addrs.insert(intf_addr);
2264 }
2265 }
2266 if let Some(sock) = self.ipv6_sock.as_ref() {
2267 if let Err(InternalError::IntfAddrInvalid(intf_addr)) =
2268 send_dns_outgoing(&out, intf, &sock.pktinfo, self.port)
2269 {
2270 invalid_intf_addrs.insert(intf_addr);
2271 }
2272 }
2273 }
2274
2275 if !invalid_intf_addrs.is_empty() {
2276 let _ = self.send_cmd_to_self(Command::InvalidIntfAddrs(invalid_intf_addrs));
2277 }
2278 }
2279
2280 fn handle_read(&mut self, event_key: usize) -> bool {
2285 let sock_opt = match event_key {
2286 IPV4_SOCK_EVENT_KEY => &mut self.ipv4_sock,
2287 IPV6_SOCK_EVENT_KEY => &mut self.ipv6_sock,
2288 _ => {
2289 debug!("handle_read: unknown token {}", event_key);
2290 return false;
2291 }
2292 };
2293 let Some(sock) = sock_opt.as_mut() else {
2294 debug!("handle_read: socket not available for token {}", event_key);
2295 return false;
2296 };
2297 let mut buf = vec![0u8; MAX_MSG_ABSOLUTE];
2298
2299 let (sz, pktinfo) = match sock.pktinfo.recv(&mut buf) {
2306 Ok(sz) => sz,
2307 Err(e) => {
2308 if e.kind() != std::io::ErrorKind::WouldBlock {
2309 debug!("listening socket read failed: {}", e);
2310 }
2311 return false;
2312 }
2313 };
2314
2315 let pkt_if_index = pktinfo.if_index as u32;
2317 let Some(my_intf) = self.my_intfs.get(&pkt_if_index) else {
2318 debug!(
2319 "handle_read: no interface found for pktinfo if_index: {}",
2320 pktinfo.if_index
2321 );
2322 return true; };
2324
2325 buf.truncate(sz); match DnsIncoming::new(buf, my_intf.into()) {
2328 Ok(msg) => {
2329 if msg.is_query() {
2330 self.handle_query(msg, pkt_if_index, event_key == IPV4_SOCK_EVENT_KEY);
2331 } else if msg.is_response() {
2332 self.handle_response(msg, pkt_if_index);
2333 } else {
2334 debug!("Invalid message: not query and not response");
2335 }
2336 }
2337 Err(e) => debug!("Invalid incoming DNS message: {}", e),
2338 }
2339
2340 true
2341 }
2342
2343 fn query_unresolved(&mut self, instance: &str) -> bool {
2345 if !valid_instance_name(instance) {
2346 trace!("instance name {} not valid", instance);
2347 return false;
2348 }
2349
2350 if let Some(records) = self.cache.get_srv(instance) {
2351 for record in records {
2352 if let Some(srv) = record.record.any().downcast_ref::<DnsSrv>() {
2353 if self.cache.get_addr(srv.host()).is_none() {
2354 self.send_query_vec(&[(srv.host(), RRType::A), (srv.host(), RRType::AAAA)]);
2355 return true;
2356 }
2357 }
2358 }
2359 } else {
2360 self.send_query(instance, RRType::ANY);
2361 return true;
2362 }
2363
2364 false
2365 }
2366
2367 fn query_cache_for_service(
2370 &mut self,
2371 ty_domain: &str,
2372 sender: &Sender<ServiceEvent>,
2373 now: u64,
2374 ) {
2375 let mut resolved: HashSet<String> = HashSet::new();
2376 let mut unresolved: HashSet<String> = HashSet::new();
2377
2378 if let Some(records) = self.cache.get_ptr(ty_domain) {
2379 for record in records.iter().filter(|r| !r.record.expires_soon(now)) {
2380 if let Some(ptr) = record.record.any().downcast_ref::<DnsPointer>() {
2381 let mut new_event = None;
2382 match self.resolve_service_from_cache(ty_domain, ptr.alias()) {
2383 Ok(resolved_service) => {
2384 if resolved_service.is_valid() {
2385 debug!("Resolved service from cache: {}", ptr.alias());
2386 new_event =
2387 Some(ServiceEvent::ServiceResolved(Box::new(resolved_service)));
2388 } else {
2389 debug!("Resolved service is not valid: {}", ptr.alias());
2390 }
2391 }
2392 Err(err) => {
2393 debug!("Error while resolving service from cache: {}", err);
2394 continue;
2395 }
2396 }
2397
2398 match sender.send(ServiceEvent::ServiceFound(
2399 ty_domain.to_string(),
2400 ptr.alias().to_string(),
2401 )) {
2402 Ok(()) => debug!("sent service found {}", ptr.alias()),
2403 Err(e) => {
2404 debug!("failed to send service found: {}", e);
2405 continue;
2406 }
2407 }
2408
2409 if let Some(event) = new_event {
2410 resolved.insert(ptr.alias().to_string());
2411 match sender.send(event) {
2412 Ok(()) => debug!("sent service resolved: {}", ptr.alias()),
2413 Err(e) => debug!("failed to send service resolved: {}", e),
2414 }
2415 } else {
2416 unresolved.insert(ptr.alias().to_string());
2417 }
2418 }
2419 }
2420 }
2421
2422 for instance in resolved.drain() {
2423 self.pending_resolves.remove(&instance);
2424 self.resolved.insert(instance);
2425 }
2426
2427 for instance in unresolved.drain() {
2428 self.add_pending_resolve(instance);
2429 }
2430 }
2431
2432 fn query_cache_for_hostname(
2435 &mut self,
2436 hostname: &str,
2437 sender: Sender<HostnameResolutionEvent>,
2438 ) {
2439 let addresses_map = self.cache.get_addresses_for_host(hostname);
2440 for (name, addresses) in addresses_map {
2441 match sender.send(HostnameResolutionEvent::AddressesFound(name, addresses)) {
2442 Ok(()) => trace!("sent hostname addresses found"),
2443 Err(e) => debug!("failed to send hostname addresses found: {}", e),
2444 }
2445 }
2446 }
2447
2448 fn add_pending_resolve(&mut self, instance: String) {
2449 if !self.pending_resolves.contains(&instance) {
2450 let next_time = current_time_millis() + RESOLVE_WAIT_IN_MILLIS;
2451 self.add_retransmission(next_time, Command::Resolve(instance.clone(), 1));
2452 self.pending_resolves.insert(instance);
2453 }
2454 }
2455
2456 fn resolve_service_from_cache(
2458 &self,
2459 ty_domain: &str,
2460 fullname: &str,
2461 ) -> Result<ResolvedService> {
2462 let now = current_time_millis();
2463 let mut resolved_service = ResolvedService {
2464 ty_domain: ty_domain.to_string(),
2465 sub_ty_domain: None,
2466 fullname: fullname.to_string(),
2467 host: String::new(),
2468 port: 0,
2469 addresses: HashSet::new(),
2470 txt_properties: TxtProperties::new(),
2471 };
2472
2473 if let Some(subtype) = self.cache.get_subtype(fullname) {
2475 trace!(
2476 "ty_domain: {} found subtype {} for instance: {}",
2477 ty_domain,
2478 subtype,
2479 fullname
2480 );
2481 if resolved_service.sub_ty_domain.is_none() {
2482 resolved_service.sub_ty_domain = Some(subtype.to_string());
2483 }
2484 }
2485
2486 if let Some(records) = self.cache.get_srv(fullname) {
2488 if let Some(answer) = records.iter().find(|r| !r.record.expires_soon(now)) {
2489 if let Some(dns_srv) = answer.record.any().downcast_ref::<DnsSrv>() {
2490 resolved_service.host = dns_srv.host().to_string();
2491 resolved_service.port = dns_srv.port();
2492 }
2493 }
2494 }
2495
2496 if let Some(records) = self.cache.get_txt(fullname) {
2498 if let Some(record) = records.iter().find(|r| !r.record.expires_soon(now)) {
2499 if let Some(dns_txt) = record.record.any().downcast_ref::<DnsTxt>() {
2500 resolved_service.txt_properties = dns_txt.text().into();
2501 }
2502 }
2503 }
2504
2505 if let Some(records) = self.cache.get_addr(&resolved_service.host) {
2507 for answer in records.iter() {
2508 if let Some(dns_a) = answer.record.any().downcast_ref::<DnsAddress>() {
2509 if dns_a.expires_soon(now) {
2510 trace!(
2511 "Addr expired or expires soon: {}",
2512 dns_a.address().to_ip_addr()
2513 );
2514 } else {
2515 resolved_service.addresses.insert(dns_a.address());
2516 }
2517 }
2518 }
2519 }
2520
2521 Ok(resolved_service)
2522 }
2523
2524 fn handle_poller_events(&mut self, events: &mio::Events) {
2525 for ev in events.iter() {
2526 trace!("event received with key {:?}", ev.token());
2527 if ev.token().0 == SIGNAL_SOCK_EVENT_KEY {
2528 self.signal_sock_drain();
2530
2531 if let Err(e) = self.poller.registry().reregister(
2532 &mut self.signal_sock,
2533 ev.token(),
2534 mio::Interest::READABLE,
2535 ) {
2536 debug!("failed to modify poller for signal socket: {}", e);
2537 }
2538 continue; }
2540
2541 while self.handle_read(ev.token().0) {}
2543
2544 if ev.token().0 == IPV4_SOCK_EVENT_KEY {
2546 if let Some(sock) = self.ipv4_sock.as_mut() {
2548 if let Err(e) =
2549 self.poller
2550 .registry()
2551 .reregister(sock, ev.token(), mio::Interest::READABLE)
2552 {
2553 debug!("modify poller for IPv4 socket: {}", e);
2554 }
2555 }
2556 } else if ev.token().0 == IPV6_SOCK_EVENT_KEY {
2557 if let Some(sock) = self.ipv6_sock.as_mut() {
2559 if let Err(e) =
2560 self.poller
2561 .registry()
2562 .reregister(sock, ev.token(), mio::Interest::READABLE)
2563 {
2564 debug!("modify poller for IPv6 socket: {}", e);
2565 }
2566 }
2567 }
2568 }
2569 }
2570
2571 fn handle_response(&mut self, mut msg: DnsIncoming, if_index: u32) {
2574 let now = current_time_millis();
2575
2576 let mut record_predicate = |record: &DnsRecordBox| {
2578 if !record.get_record().is_expired(now) {
2579 return true;
2580 }
2581
2582 debug!("record is expired, removing it from cache.");
2583 if self.cache.remove(record) {
2584 if let Some(dns_ptr) = record.any().downcast_ref::<DnsPointer>() {
2586 call_service_listener(
2587 &self.service_queriers,
2588 dns_ptr.get_name(),
2589 ServiceEvent::ServiceRemoved(
2590 dns_ptr.get_name().to_string(),
2591 dns_ptr.alias().to_string(),
2592 ),
2593 );
2594 }
2595 }
2596 false
2597 };
2598 msg.answers_mut().retain(&mut record_predicate);
2599 msg.authorities_mut().retain(&mut record_predicate);
2600 msg.additionals_mut().retain(&mut record_predicate);
2601
2602 self.conflict_handler(&msg, if_index);
2604
2605 let mut is_for_us = true; for answer in msg.answers() {
2612 if answer.get_type() == RRType::PTR {
2613 if self.service_queriers.contains_key(answer.get_name()) {
2614 is_for_us = true;
2615 break; } else {
2617 is_for_us = false;
2618 }
2619 } else if answer.get_type() == RRType::A || answer.get_type() == RRType::AAAA {
2620 let answer_lowercase = answer.get_name().to_lowercase();
2622 if self.hostname_resolvers.contains_key(&answer_lowercase) {
2623 is_for_us = true;
2624 break; }
2626 }
2627 }
2628
2629 if self.accept_unsolicited {
2631 is_for_us = true;
2632 }
2633
2634 struct InstanceChange {
2636 ty: RRType, name: String, }
2639
2640 let mut changes = Vec::new();
2648 let mut timers = Vec::new();
2649 let Some(my_intf) = self.my_intfs.get(&if_index) else {
2650 return;
2651 };
2652 for record in msg.all_records() {
2653 match self
2654 .cache
2655 .add_or_update(my_intf, record, &mut timers, is_for_us)
2656 {
2657 Some((dns_record, true)) => {
2658 timers.push(dns_record.record.get_record().get_expire_time());
2659 timers.push(dns_record.record.get_record().get_refresh_time());
2660
2661 let ty = dns_record.record.get_type();
2662 let name = dns_record.record.get_name();
2663
2664 if ty == RRType::PTR && dns_record.record.get_record().get_ttl() > 1 {
2666 if self.service_queriers.contains_key(name) {
2667 timers.push(dns_record.record.get_record().get_refresh_time());
2668 }
2669
2670 if let Some(dns_ptr) = dns_record.record.any().downcast_ref::<DnsPointer>()
2672 {
2673 debug!("calling listener with service found: {name}");
2674 call_service_listener(
2675 &self.service_queriers,
2676 name,
2677 ServiceEvent::ServiceFound(
2678 name.to_string(),
2679 dns_ptr.alias().to_string(),
2680 ),
2681 );
2682 changes.push(InstanceChange {
2683 ty,
2684 name: dns_ptr.alias().to_string(),
2685 });
2686 }
2687 } else {
2688 changes.push(InstanceChange {
2689 ty,
2690 name: name.to_string(),
2691 });
2692 }
2693 }
2694 Some((dns_record, false)) => {
2695 timers.push(dns_record.record.get_record().get_expire_time());
2696 timers.push(dns_record.record.get_record().get_refresh_time());
2697 }
2698 _ => {}
2699 }
2700 }
2701
2702 for t in timers {
2704 self.add_timer(t);
2705 }
2706
2707 for change in changes
2709 .iter()
2710 .filter(|change| change.ty == RRType::A || change.ty == RRType::AAAA)
2711 {
2712 let addr_map = self.cache.get_addresses_for_host(&change.name);
2713 for (name, addresses) in addr_map {
2714 call_hostname_resolution_listener(
2715 &self.hostname_resolvers,
2716 &change.name,
2717 HostnameResolutionEvent::AddressesFound(name, addresses),
2718 )
2719 }
2720 }
2721
2722 let mut updated_instances = HashSet::new();
2724 for update in changes {
2725 match update.ty {
2726 RRType::PTR | RRType::SRV | RRType::TXT => {
2727 updated_instances.insert(update.name);
2728 }
2729 RRType::A | RRType::AAAA => {
2730 let instances = self.cache.get_instances_on_host(&update.name);
2731 updated_instances.extend(instances);
2732 }
2733 _ => {}
2734 }
2735 }
2736
2737 self.resolve_updated_instances(&updated_instances);
2738 }
2739
2740 fn conflict_handler(&mut self, msg: &DnsIncoming, if_index: u32) {
2741 let Some(my_intf) = self.my_intfs.get(&if_index) else {
2742 debug!("handle_response: no intf found for index {if_index}");
2743 return;
2744 };
2745
2746 let Some(dns_registry) = self.dns_registry_map.get_mut(&if_index) else {
2747 return;
2748 };
2749
2750 for answer in msg.answers().iter() {
2751 let mut new_records = Vec::new();
2752
2753 let name = answer.get_name();
2754 let Some(probe) = dns_registry.probing.get_mut(name) else {
2755 continue;
2756 };
2757
2758 if answer.get_type() == RRType::A || answer.get_type() == RRType::AAAA {
2760 if let Some(answer_addr) = answer.any().downcast_ref::<DnsAddress>() {
2761 if answer_addr.interface_id.index != if_index {
2762 debug!(
2763 "conflict handler: answer addr {:?} not in the subnet of intf {}",
2764 answer_addr, my_intf.name
2765 );
2766 continue;
2767 }
2768 }
2769
2770 let any_match = probe.records.iter().any(|r| {
2773 r.get_type() == answer.get_type()
2774 && r.get_class() == answer.get_class()
2775 && r.rrdata_match(answer.as_ref())
2776 });
2777 if any_match {
2778 continue; }
2780 }
2781
2782 probe.records.retain(|record| {
2783 if record.get_type() == answer.get_type()
2784 && record.get_class() == answer.get_class()
2785 && !record.rrdata_match(answer.as_ref())
2786 {
2787 debug!(
2788 "found conflict name: '{name}' record: {}: {} PEER: {}",
2789 record.get_type(),
2790 record.rdata_print(),
2791 answer.rdata_print()
2792 );
2793
2794 let mut new_record = record.clone();
2797 let new_name = match record.get_type() {
2798 RRType::A => hostname_change(name),
2799 RRType::AAAA => hostname_change(name),
2800 _ => name_change(name),
2801 };
2802 new_record.get_record_mut().set_new_name(new_name);
2803 new_records.push(new_record);
2804 return false; }
2806
2807 true
2808 });
2809
2810 let create_time = current_time_millis() + fastrand::u64(0..250);
2817
2818 let waiting_services = probe.waiting_services.clone();
2819
2820 for record in new_records {
2821 if dns_registry.update_hostname(name, record.get_name(), create_time) {
2822 self.timers.push(Reverse(create_time));
2823 }
2824
2825 dns_registry.name_changes.insert(
2827 record.get_record().get_original_name().to_string(),
2828 record.get_name().to_string(),
2829 );
2830
2831 let new_probe = match dns_registry.probing.get_mut(record.get_name()) {
2832 Some(p) => p,
2833 None => {
2834 let new_probe = dns_registry
2835 .probing
2836 .entry(record.get_name().to_string())
2837 .or_insert_with(|| {
2838 debug!("conflict handler: new probe of {}", record.get_name());
2839 Probe::new(create_time)
2840 });
2841 self.timers.push(Reverse(new_probe.next_send));
2842 new_probe
2843 }
2844 };
2845
2846 debug!(
2847 "insert record with new name '{}' {} into probe",
2848 record.get_name(),
2849 record.get_type()
2850 );
2851 new_probe.insert_record(record);
2852
2853 new_probe.waiting_services.extend(waiting_services.clone());
2854 }
2855 }
2856 }
2857
2858 fn resolve_updated_instances(&mut self, updated_instances: &HashSet<String>) {
2865 let mut resolved: HashSet<String> = HashSet::new();
2866 let mut unresolved: HashSet<String> = HashSet::new();
2867 let mut removed_instances = HashMap::new();
2868
2869 let now = current_time_millis();
2870
2871 for (ty_domain, records) in self.cache.all_ptr().iter() {
2872 if !self.service_queriers.contains_key(ty_domain) {
2873 continue;
2875 }
2876
2877 for ptr in records.iter().filter(|r| !r.record.expires_soon(now)) {
2878 let Some(dns_ptr) = ptr.record.any().downcast_ref::<DnsPointer>() else {
2879 continue;
2880 };
2881
2882 let instance = dns_ptr.alias();
2883 if !updated_instances.contains(instance) {
2884 continue;
2885 }
2886
2887 let Ok(resolved_service) = self.resolve_service_from_cache(ty_domain, instance)
2888 else {
2889 continue;
2890 };
2891
2892 debug!("resolve_updated_instances: from cache: {instance}");
2893 if resolved_service.is_valid() {
2894 debug!("call queriers to resolve {instance}");
2895 resolved.insert(instance.to_string());
2896 let event = ServiceEvent::ServiceResolved(Box::new(resolved_service));
2897 call_service_listener(&self.service_queriers, ty_domain, event);
2898 } else {
2899 debug!("Resolved service is not valid: {instance}");
2900 if self.resolved.remove(dns_ptr.alias()) {
2901 removed_instances
2902 .entry(ty_domain.to_string())
2903 .or_insert_with(HashSet::new)
2904 .insert(instance.to_string());
2905 }
2906 unresolved.insert(instance.to_string());
2907 }
2908 }
2909 }
2910
2911 for instance in resolved.drain() {
2912 self.pending_resolves.remove(&instance);
2913 self.resolved.insert(instance);
2914 }
2915
2916 for instance in unresolved.drain() {
2917 self.add_pending_resolve(instance);
2918 }
2919
2920 if !removed_instances.is_empty() {
2921 debug!(
2922 "resolve_updated_instances: removed {}",
2923 &removed_instances.len()
2924 );
2925 self.notify_service_removal(removed_instances);
2926 }
2927 }
2928
2929 fn handle_query(&mut self, msg: DnsIncoming, if_index: u32, is_ipv4: bool) {
2931 let sock_opt = if is_ipv4 {
2932 &self.ipv4_sock
2933 } else {
2934 &self.ipv6_sock
2935 };
2936 let Some(sock) = sock_opt.as_ref() else {
2937 debug!("handle_query: socket not available for intf {}", if_index);
2938 return;
2939 };
2940 let mut out = DnsOutgoing::new(FLAGS_QR_RESPONSE | FLAGS_AA);
2941
2942 const META_QUERY: &str = "_services._dns-sd._udp.local.";
2945
2946 let Some(dns_registry) = self.dns_registry_map.get_mut(&if_index) else {
2947 debug!("missing dns registry for intf {}", if_index);
2948 return;
2949 };
2950
2951 let Some(intf) = self.my_intfs.get(&if_index) else {
2952 debug!("handle_query: no intf found for index {if_index}");
2953 return;
2954 };
2955
2956 for question in msg.questions().iter() {
2957 let qtype = question.entry_type();
2958
2959 if qtype == RRType::PTR {
2960 for service in self.my_services.values() {
2961 if service.get_status(if_index) != ServiceStatus::Announced {
2962 continue;
2963 }
2964
2965 if question.entry_name() == service.get_type()
2966 || service
2967 .get_subtype()
2968 .as_ref()
2969 .is_some_and(|v| v == question.entry_name())
2970 {
2971 add_answer_with_additionals(
2972 &mut out,
2973 &msg,
2974 service,
2975 intf,
2976 dns_registry,
2977 is_ipv4,
2978 );
2979 } else if question.entry_name() == META_QUERY {
2980 let ptr_added = out.add_answer(
2981 &msg,
2982 DnsPointer::new(
2983 question.entry_name(),
2984 RRType::PTR,
2985 CLASS_IN,
2986 service.get_other_ttl(),
2987 service.get_type().to_string(),
2988 ),
2989 );
2990 if !ptr_added {
2991 trace!("answer was not added for meta-query {:?}", &question);
2992 }
2993 }
2994 }
2995 } else {
2996 if qtype == RRType::ANY && msg.num_authorities() > 0 {
2998 let probe_name = question.entry_name();
2999
3000 if let Some(probe) = dns_registry.probing.get_mut(probe_name) {
3001 let now = current_time_millis();
3002
3003 if probe.start_time < now {
3007 let incoming_records: Vec<_> = msg
3008 .authorities()
3009 .iter()
3010 .filter(|r| r.get_name() == probe_name)
3011 .collect();
3012
3013 probe.tiebreaking(&incoming_records, now, probe_name);
3014 }
3015 }
3016 }
3017
3018 if qtype == RRType::A || qtype == RRType::AAAA || qtype == RRType::ANY {
3019 for service in self.my_services.values() {
3020 if service.get_status(if_index) != ServiceStatus::Announced {
3021 continue;
3022 }
3023
3024 let service_hostname =
3025 match dns_registry.name_changes.get(service.get_hostname()) {
3026 Some(new_name) => new_name,
3027 None => service.get_hostname(),
3028 };
3029
3030 if service_hostname.to_lowercase() == question.entry_name().to_lowercase() {
3031 let intf_addrs = if is_ipv4 {
3032 service.get_addrs_on_my_intf_v4(intf)
3033 } else {
3034 service.get_addrs_on_my_intf_v6(intf)
3035 };
3036 if intf_addrs.is_empty()
3037 && (qtype == RRType::A || qtype == RRType::AAAA)
3038 {
3039 let t = match qtype {
3040 RRType::A => "TYPE_A",
3041 RRType::AAAA => "TYPE_AAAA",
3042 _ => "invalid_type",
3043 };
3044 trace!(
3045 "Cannot find valid addrs for {} response on intf {:?}",
3046 t,
3047 &intf
3048 );
3049 return;
3050 }
3051 for address in intf_addrs {
3052 out.add_answer(
3053 &msg,
3054 DnsAddress::new(
3055 service_hostname,
3056 ip_address_rr_type(&address),
3057 CLASS_IN | CLASS_CACHE_FLUSH,
3058 service.get_host_ttl(),
3059 address,
3060 intf.into(),
3061 ),
3062 );
3063 }
3064 }
3065 }
3066 }
3067
3068 let query_name = question.entry_name().to_lowercase();
3069 let service_opt = self
3070 .my_services
3071 .iter()
3072 .find(|(k, _v)| {
3073 let service_name = match dns_registry.name_changes.get(k.as_str()) {
3074 Some(new_name) => new_name,
3075 None => k,
3076 };
3077 service_name == &query_name
3078 })
3079 .map(|(_, v)| v);
3080
3081 let Some(service) = service_opt else {
3082 continue;
3083 };
3084
3085 if service.get_status(if_index) != ServiceStatus::Announced {
3086 continue;
3087 }
3088
3089 let intf_addrs = if is_ipv4 {
3090 service.get_addrs_on_my_intf_v4(intf)
3091 } else {
3092 service.get_addrs_on_my_intf_v6(intf)
3093 };
3094 if intf_addrs.is_empty() {
3095 debug!(
3096 "Cannot find valid addrs for TYPE_SRV response on intf {:?}",
3097 &intf
3098 );
3099 continue;
3100 }
3101
3102 add_answer_of_service(
3103 &mut out,
3104 &msg,
3105 question.entry_name(),
3106 service,
3107 qtype,
3108 intf_addrs,
3109 );
3110 }
3111 }
3112
3113 if out.answers_count() > 0 {
3114 debug!("sending response on intf {}", &intf.name);
3115 out.set_id(msg.id());
3116 if let Err(InternalError::IntfAddrInvalid(intf_addr)) =
3117 send_dns_outgoing(&out, intf, &sock.pktinfo, self.port)
3118 {
3119 let invalid_intf_addr = HashSet::from([intf_addr]);
3120 let _ = self.send_cmd_to_self(Command::InvalidIntfAddrs(invalid_intf_addr));
3121 }
3122
3123 let if_name = intf.name.clone();
3124
3125 self.increase_counter(Counter::Respond, 1);
3126 self.notify_monitors(DaemonEvent::Respond(if_name));
3127 }
3128
3129 self.increase_counter(Counter::KnownAnswerSuppression, out.known_answer_count());
3130 }
3131
3132 fn increase_counter(&mut self, counter: Counter, count: i64) {
3134 let key = counter.to_string();
3135 match self.counters.get_mut(&key) {
3136 Some(v) => *v += count,
3137 None => {
3138 self.counters.insert(key, count);
3139 }
3140 }
3141 }
3142
3143 fn set_counter(&mut self, counter: Counter, count: i64) {
3145 let key = counter.to_string();
3146 self.counters.insert(key, count);
3147 }
3148
3149 fn signal_sock_drain(&self) {
3150 let mut signal_buf = [0; 1024];
3151
3152 while let Ok(sz) = self.signal_sock.recv(&mut signal_buf) {
3154 trace!(
3155 "signal socket recvd: {}",
3156 String::from_utf8_lossy(&signal_buf[0..sz])
3157 );
3158 }
3159 }
3160
3161 fn add_retransmission(&mut self, next_time: u64, command: Command) {
3162 self.retransmissions.push(ReRun { next_time, command });
3163 self.add_timer(next_time);
3164 }
3165
3166 fn notify_service_removal(&self, expired: HashMap<String, HashSet<String>>) {
3169 for (ty_domain, sender) in self.service_queriers.iter() {
3170 if let Some(instances) = expired.get(ty_domain) {
3171 for instance_name in instances {
3172 let event = ServiceEvent::ServiceRemoved(
3173 ty_domain.to_string(),
3174 instance_name.to_string(),
3175 );
3176 match sender.send(event) {
3177 Ok(()) => debug!("notify_service_removal: sent ServiceRemoved to listener of {ty_domain}: {instance_name}"),
3178 Err(e) => debug!("Failed to send event: {}", e),
3179 }
3180 }
3181 }
3182 }
3183 }
3184
3185 fn exec_command(&mut self, command: Command, repeating: bool) {
3189 trace!("exec_command: {:?} repeating: {}", &command, repeating);
3190 match command {
3191 Command::Browse(ty, next_delay, cache_only, listener) => {
3192 self.exec_command_browse(repeating, ty, next_delay, cache_only, listener);
3193 }
3194
3195 Command::ResolveHostname(hostname, next_delay, listener, timeout) => {
3196 self.exec_command_resolve_hostname(
3197 repeating, hostname, next_delay, listener, timeout,
3198 );
3199 }
3200
3201 Command::Register(service_info) => {
3202 self.register_service(*service_info);
3203 self.increase_counter(Counter::Register, 1);
3204 }
3205
3206 Command::RegisterResend(fullname, intf) => {
3207 trace!("register-resend service: {fullname} on {}", &intf);
3208 if let Err(InternalError::IntfAddrInvalid(intf_addr)) =
3209 self.exec_command_register_resend(fullname, intf)
3210 {
3211 let invalid_intf_addr = HashSet::from([intf_addr]);
3212 let _ = self.send_cmd_to_self(Command::InvalidIntfAddrs(invalid_intf_addr));
3213 }
3214 }
3215
3216 Command::Unregister(fullname, resp_s) => {
3217 trace!("unregister service {} repeat {}", &fullname, &repeating);
3218 self.exec_command_unregister(repeating, fullname, resp_s);
3219 }
3220
3221 Command::UnregisterResend(packet, if_index, is_ipv4) => {
3222 self.exec_command_unregister_resend(packet, if_index, is_ipv4);
3223 }
3224
3225 Command::StopBrowse(ty_domain) => self.exec_command_stop_browse(ty_domain),
3226
3227 Command::StopResolveHostname(hostname) => {
3228 self.exec_command_stop_resolve_hostname(hostname.to_lowercase())
3229 }
3230
3231 Command::Resolve(instance, try_count) => self.exec_command_resolve(instance, try_count),
3232
3233 Command::GetMetrics(resp_s) => self.exec_command_get_metrics(resp_s),
3234
3235 Command::GetStatus(resp_s) => match resp_s.send(self.status.clone()) {
3236 Ok(()) => trace!("Sent status to the client"),
3237 Err(e) => debug!("Failed to send status: {}", e),
3238 },
3239
3240 Command::Monitor(resp_s) => {
3241 self.monitors.push(resp_s);
3242 }
3243
3244 Command::SetOption(daemon_opt) => {
3245 self.process_set_option(daemon_opt);
3246 }
3247
3248 Command::GetOption(resp_s) => {
3249 let val = DaemonOptionVal {
3250 _service_name_len_max: self.service_name_len_max,
3251 ip_check_interval: self.ip_check_interval,
3252 };
3253 if let Err(e) = resp_s.send(val) {
3254 debug!("Failed to send options: {}", e);
3255 }
3256 }
3257
3258 Command::Verify(instance_fullname, timeout) => {
3259 self.exec_command_verify(instance_fullname, timeout, repeating);
3260 }
3261
3262 Command::InvalidIntfAddrs(invalid_intf_addrs) => {
3263 for intf_addr in invalid_intf_addrs {
3264 self.del_interface_addr(&intf_addr);
3265 }
3266
3267 self.check_ip_changes();
3268 }
3269
3270 _ => {
3271 debug!("unexpected command: {:?}", &command);
3272 }
3273 }
3274 }
3275
3276 fn exec_command_get_metrics(&mut self, resp_s: Sender<HashMap<String, i64>>) {
3277 self.set_counter(Counter::CachedPTR, self.cache.ptr_count() as i64);
3278 self.set_counter(Counter::CachedSRV, self.cache.srv_count() as i64);
3279 self.set_counter(Counter::CachedAddr, self.cache.addr_count() as i64);
3280 self.set_counter(Counter::CachedTxt, self.cache.txt_count() as i64);
3281 self.set_counter(Counter::CachedNSec, self.cache.nsec_count() as i64);
3282 self.set_counter(Counter::CachedSubtype, self.cache.subtype_count() as i64);
3283 self.set_counter(Counter::Timer, self.timers.len() as i64);
3284
3285 let dns_registry_probe_count: usize = self
3286 .dns_registry_map
3287 .values()
3288 .map(|r| r.probing.len())
3289 .sum();
3290 self.set_counter(Counter::DnsRegistryProbe, dns_registry_probe_count as i64);
3291
3292 let dns_registry_active_count: usize = self
3293 .dns_registry_map
3294 .values()
3295 .map(|r| r.active.values().map(|a| a.len()).sum::<usize>())
3296 .sum();
3297 self.set_counter(Counter::DnsRegistryActive, dns_registry_active_count as i64);
3298
3299 let dns_registry_timer_count: usize = self
3300 .dns_registry_map
3301 .values()
3302 .map(|r| r.new_timers.len())
3303 .sum();
3304 self.set_counter(Counter::DnsRegistryTimer, dns_registry_timer_count as i64);
3305
3306 let dns_registry_name_change_count: usize = self
3307 .dns_registry_map
3308 .values()
3309 .map(|r| r.name_changes.len())
3310 .sum();
3311 self.set_counter(
3312 Counter::DnsRegistryNameChange,
3313 dns_registry_name_change_count as i64,
3314 );
3315
3316 if let Err(e) = resp_s.send(self.counters.clone()) {
3318 debug!("Failed to send metrics: {}", e);
3319 }
3320 }
3321
3322 fn exec_command_browse(
3323 &mut self,
3324 repeating: bool,
3325 ty: String,
3326 next_delay: u32,
3327 cache_only: bool,
3328 listener: Sender<ServiceEvent>,
3329 ) {
3330 let pretty_addrs: Vec<String> = self
3331 .my_intfs
3332 .iter()
3333 .map(|(if_index, itf)| format!("{} ({if_index})", itf.name))
3334 .collect();
3335
3336 if let Err(e) = listener.send(ServiceEvent::SearchStarted(format!(
3337 "{ty} on {} interfaces [{}]",
3338 pretty_addrs.len(),
3339 pretty_addrs.join(", ")
3340 ))) {
3341 debug!(
3342 "Failed to send SearchStarted({})(repeating:{}): {}",
3343 &ty, repeating, e
3344 );
3345 return;
3346 }
3347
3348 let now = current_time_millis();
3349 if !repeating {
3350 self.service_queriers.insert(ty.clone(), listener.clone());
3354
3355 self.query_cache_for_service(&ty, &listener, now);
3357 }
3358
3359 if cache_only {
3360 match listener.send(ServiceEvent::SearchStopped(ty.clone())) {
3362 Ok(()) => debug!("SearchStopped sent for {}", &ty),
3363 Err(e) => debug!("Failed to send SearchStopped: {}", e),
3364 }
3365 return;
3366 }
3367
3368 self.send_query(&ty, RRType::PTR);
3369 self.increase_counter(Counter::Browse, 1);
3370
3371 let next_time = now + (next_delay * 1000) as u64;
3372 let max_delay = 60 * 60;
3373 let delay = cmp::min(next_delay * 2, max_delay);
3374 self.add_retransmission(next_time, Command::Browse(ty, delay, cache_only, listener));
3375 }
3376
3377 fn exec_command_resolve_hostname(
3378 &mut self,
3379 repeating: bool,
3380 hostname: String,
3381 next_delay: u32,
3382 listener: Sender<HostnameResolutionEvent>,
3383 timeout: Option<u64>,
3384 ) {
3385 let addr_list: Vec<_> = self.my_intfs.iter().collect();
3386 if let Err(e) = listener.send(HostnameResolutionEvent::SearchStarted(format!(
3387 "{} on addrs {:?}",
3388 &hostname, &addr_list
3389 ))) {
3390 debug!(
3391 "Failed to send ResolveStarted({})(repeating:{}): {}",
3392 &hostname, repeating, e
3393 );
3394 return;
3395 }
3396 if !repeating {
3397 self.add_hostname_resolver(hostname.to_owned(), listener.clone(), timeout);
3398 self.query_cache_for_hostname(&hostname, listener.clone());
3400 }
3401
3402 self.send_query_vec(&[(&hostname, RRType::A), (&hostname, RRType::AAAA)]);
3403 self.increase_counter(Counter::ResolveHostname, 1);
3404
3405 let now = current_time_millis();
3406 let next_time = now + u64::from(next_delay) * 1000;
3407 let max_delay = 60 * 60;
3408 let delay = cmp::min(next_delay * 2, max_delay);
3409
3410 if self
3412 .hostname_resolvers
3413 .get(&hostname)
3414 .and_then(|(_sender, timeout)| *timeout)
3415 .map(|timeout| next_time < timeout)
3416 .unwrap_or(true)
3417 {
3418 self.add_retransmission(
3419 next_time,
3420 Command::ResolveHostname(hostname, delay, listener, None),
3421 );
3422 }
3423 }
3424
3425 fn exec_command_resolve(&mut self, instance: String, try_count: u16) {
3426 let pending_query = self.query_unresolved(&instance);
3427 let max_try = 3;
3428 if pending_query && try_count < max_try {
3429 let next_time = current_time_millis() + RESOLVE_WAIT_IN_MILLIS;
3432 self.add_retransmission(next_time, Command::Resolve(instance, try_count + 1));
3433 }
3434 }
3435
3436 fn exec_command_unregister(
3437 &mut self,
3438 repeating: bool,
3439 fullname: String,
3440 resp_s: Sender<UnregisterStatus>,
3441 ) {
3442 let response = match self.my_services.remove_entry(&fullname) {
3443 None => {
3444 debug!("unregister: cannot find such service {}", &fullname);
3445 UnregisterStatus::NotFound
3446 }
3447 Some((_k, info)) => {
3448 let mut timers = Vec::new();
3449
3450 for (if_index, intf) in self.my_intfs.iter() {
3451 if let Some(sock) = self.ipv4_sock.as_ref() {
3452 let packet = self.unregister_service(&info, intf, &sock.pktinfo);
3453 if !repeating && !packet.is_empty() {
3455 let next_time = current_time_millis() + 120;
3456 self.retransmissions.push(ReRun {
3457 next_time,
3458 command: Command::UnregisterResend(packet, *if_index, true),
3459 });
3460 timers.push(next_time);
3461 }
3462 }
3463
3464 if let Some(sock) = self.ipv6_sock.as_ref() {
3466 let packet = self.unregister_service(&info, intf, &sock.pktinfo);
3467 if !repeating && !packet.is_empty() {
3468 let next_time = current_time_millis() + 120;
3469 self.retransmissions.push(ReRun {
3470 next_time,
3471 command: Command::UnregisterResend(packet, *if_index, false),
3472 });
3473 timers.push(next_time);
3474 }
3475 }
3476 }
3477
3478 for t in timers {
3479 self.add_timer(t);
3480 }
3481
3482 self.increase_counter(Counter::Unregister, 1);
3483 UnregisterStatus::OK
3484 }
3485 };
3486 if let Err(e) = resp_s.send(response) {
3487 debug!("unregister: failed to send response: {}", e);
3488 }
3489 }
3490
3491 fn exec_command_unregister_resend(&mut self, packet: Vec<u8>, if_index: u32, is_ipv4: bool) {
3492 let Some(intf) = self.my_intfs.get(&if_index) else {
3493 return;
3494 };
3495 let sock_opt = if is_ipv4 {
3496 &self.ipv4_sock
3497 } else {
3498 &self.ipv6_sock
3499 };
3500 let Some(sock) = sock_opt else {
3501 return;
3502 };
3503
3504 let if_addr = if is_ipv4 {
3505 match intf.next_ifaddr_v4() {
3506 Some(addr) => addr,
3507 None => return,
3508 }
3509 } else {
3510 match intf.next_ifaddr_v6() {
3511 Some(addr) => addr,
3512 None => return,
3513 }
3514 };
3515
3516 debug!("UnregisterResend from {:?}", if_addr);
3517 multicast_on_intf(
3518 &packet[..],
3519 &intf.name,
3520 intf.index,
3521 if_addr,
3522 &sock.pktinfo,
3523 self.port,
3524 );
3525
3526 self.increase_counter(Counter::UnregisterResend, 1);
3527 }
3528
3529 fn exec_command_stop_browse(&mut self, ty_domain: String) {
3530 match self.service_queriers.remove_entry(&ty_domain) {
3531 None => debug!("StopBrowse: cannot find querier for {}", &ty_domain),
3532 Some((ty, sender)) => {
3533 trace!("StopBrowse: removed queryer for {}", &ty);
3535 let mut i = 0;
3536 while i < self.retransmissions.len() {
3537 if let Command::Browse(t, _, _, _) = &self.retransmissions[i].command {
3538 if t == &ty {
3539 self.retransmissions.remove(i);
3540 trace!("StopBrowse: removed retransmission for {}", &ty);
3541 continue;
3542 }
3543 }
3544 i += 1;
3545 }
3546
3547 self.cache.remove_service_type(&ty_domain);
3549
3550 match sender.send(ServiceEvent::SearchStopped(ty_domain)) {
3552 Ok(()) => trace!("Sent SearchStopped to the listener"),
3553 Err(e) => debug!("Failed to send SearchStopped: {}", e),
3554 }
3555 }
3556 }
3557 }
3558
3559 fn exec_command_stop_resolve_hostname(&mut self, hostname: String) {
3560 if let Some((host, (sender, _timeout))) = self.hostname_resolvers.remove_entry(&hostname) {
3561 trace!("StopResolve: removed queryer for {}", &host);
3563 let mut i = 0;
3564 while i < self.retransmissions.len() {
3565 if let Command::Resolve(t, _) = &self.retransmissions[i].command {
3566 if t == &host {
3567 self.retransmissions.remove(i);
3568 trace!("StopResolve: removed retransmission for {}", &host);
3569 continue;
3570 }
3571 }
3572 i += 1;
3573 }
3574
3575 match sender.send(HostnameResolutionEvent::SearchStopped(hostname)) {
3577 Ok(()) => trace!("Sent SearchStopped to the listener"),
3578 Err(e) => debug!("Failed to send SearchStopped: {}", e),
3579 }
3580 }
3581 }
3582
3583 fn exec_command_register_resend(&mut self, fullname: String, if_index: u32) -> MyResult<()> {
3584 let Some(info) = self.my_services.get_mut(&fullname) else {
3585 trace!("announce: cannot find such service {}", &fullname);
3586 return Ok(());
3587 };
3588
3589 let Some(dns_registry) = self.dns_registry_map.get_mut(&if_index) else {
3590 return Ok(());
3591 };
3592
3593 let Some(intf) = self.my_intfs.get(&if_index) else {
3594 return Ok(());
3595 };
3596
3597 let announced_v4 = if let Some(sock) = self.ipv4_sock.as_ref() {
3598 announce_service_on_intf(dns_registry, info, intf, &sock.pktinfo, self.port)?
3599 } else {
3600 false
3601 };
3602 let announced_v6 = if let Some(sock) = self.ipv6_sock.as_ref() {
3603 announce_service_on_intf(dns_registry, info, intf, &sock.pktinfo, self.port)?
3604 } else {
3605 false
3606 };
3607
3608 if announced_v4 || announced_v6 {
3609 let mut hostname = info.get_hostname();
3610 if let Some(new_name) = dns_registry.name_changes.get(hostname) {
3611 hostname = new_name;
3612 }
3613 let service_name = match dns_registry.name_changes.get(&fullname) {
3614 Some(new_name) => new_name.to_string(),
3615 None => fullname,
3616 };
3617
3618 debug!("resend: announce service {service_name} on {}", intf.name);
3619
3620 notify_monitors(
3621 &mut self.monitors,
3622 DaemonEvent::Announce(service_name, format!("{}:{}", hostname, &intf.name)),
3623 );
3624 info.set_status(if_index, ServiceStatus::Announced);
3625 } else {
3626 debug!("register-resend should not fail");
3627 }
3628
3629 self.increase_counter(Counter::RegisterResend, 1);
3630 Ok(())
3631 }
3632
3633 fn exec_command_verify(&mut self, instance: String, timeout: Duration, repeating: bool) {
3634 let now = current_time_millis();
3644 let expire_at = if repeating {
3645 None
3646 } else {
3647 Some(now + timeout.as_millis() as u64)
3648 };
3649
3650 let record_vec = self.cache.service_verify_queries(&instance, expire_at);
3652
3653 if !record_vec.is_empty() {
3654 let query_vec: Vec<(&str, RRType)> = record_vec
3655 .iter()
3656 .map(|(record, rr_type)| (record.as_str(), *rr_type))
3657 .collect();
3658 self.send_query_vec(&query_vec);
3659
3660 if let Some(new_expire) = expire_at {
3661 self.add_timer(new_expire); self.add_retransmission(now + 1000, Command::Verify(instance, timeout));
3665 }
3666 }
3667 }
3668
3669 fn refresh_active_services(&mut self) {
3671 let mut query_ptr_count = 0;
3672 let mut query_srv_count = 0;
3673 let mut new_timers = HashSet::new();
3674 let mut query_addr_count = 0;
3675
3676 for (ty_domain, _sender) in self.service_queriers.iter() {
3677 let refreshed_timers = self.cache.refresh_due_ptr(ty_domain);
3678 if !refreshed_timers.is_empty() {
3679 trace!("sending refresh query for PTR: {}", ty_domain);
3680 self.send_query(ty_domain, RRType::PTR);
3681 query_ptr_count += 1;
3682 new_timers.extend(refreshed_timers);
3683 }
3684
3685 let (instances, timers) = self.cache.refresh_due_srv_txt(ty_domain);
3686 for (instance, types) in instances {
3687 trace!("sending refresh query for: {}", &instance);
3688 let query_vec = types
3689 .into_iter()
3690 .map(|ty| (instance.as_str(), ty))
3691 .collect::<Vec<_>>();
3692 self.send_query_vec(&query_vec);
3693 query_srv_count += 1;
3694 }
3695 new_timers.extend(timers);
3696 let (hostnames, timers) = self.cache.refresh_due_hosts(ty_domain);
3697 for hostname in hostnames.iter() {
3698 trace!("sending refresh queries for A and AAAA: {}", hostname);
3699 self.send_query_vec(&[(hostname, RRType::A), (hostname, RRType::AAAA)]);
3700 query_addr_count += 2;
3701 }
3702 new_timers.extend(timers);
3703 }
3704
3705 for timer in new_timers {
3706 self.add_timer(timer);
3707 }
3708
3709 self.increase_counter(Counter::CacheRefreshPTR, query_ptr_count);
3710 self.increase_counter(Counter::CacheRefreshSrvTxt, query_srv_count);
3711 self.increase_counter(Counter::CacheRefreshAddr, query_addr_count);
3712 }
3713}
3714
3715fn add_answer_of_service(
3717 out: &mut DnsOutgoing,
3718 msg: &DnsIncoming,
3719 entry_name: &str,
3720 service: &ServiceInfo,
3721 qtype: RRType,
3722 intf_addrs: Vec<IpAddr>,
3723) {
3724 if qtype == RRType::SRV || qtype == RRType::ANY {
3725 out.add_answer(
3726 msg,
3727 DnsSrv::new(
3728 entry_name,
3729 CLASS_IN | CLASS_CACHE_FLUSH,
3730 service.get_host_ttl(),
3731 service.get_priority(),
3732 service.get_weight(),
3733 service.get_port(),
3734 service.get_hostname().to_string(),
3735 ),
3736 );
3737 }
3738
3739 if qtype == RRType::TXT || qtype == RRType::ANY {
3740 out.add_answer(
3741 msg,
3742 DnsTxt::new(
3743 entry_name,
3744 CLASS_IN | CLASS_CACHE_FLUSH,
3745 service.get_other_ttl(),
3746 service.generate_txt(),
3747 ),
3748 );
3749 }
3750
3751 if qtype == RRType::SRV {
3752 for address in intf_addrs {
3753 out.add_additional_answer(DnsAddress::new(
3754 service.get_hostname(),
3755 ip_address_rr_type(&address),
3756 CLASS_IN | CLASS_CACHE_FLUSH,
3757 service.get_host_ttl(),
3758 address,
3759 InterfaceId::default(),
3760 ));
3761 }
3762 }
3763}
3764
3765#[derive(Clone, Debug)]
3768#[non_exhaustive]
3769pub enum ServiceEvent {
3770 SearchStarted(String),
3772
3773 ServiceFound(String, String),
3775
3776 ServiceResolved(Box<ResolvedService>),
3778
3779 ServiceRemoved(String, String),
3781
3782 SearchStopped(String),
3784}
3785
3786#[derive(Clone, Debug)]
3789#[non_exhaustive]
3790pub enum HostnameResolutionEvent {
3791 SearchStarted(String),
3793 AddressesFound(String, HashSet<ScopedIp>),
3795 AddressesRemoved(String, HashSet<ScopedIp>),
3797 SearchTimeout(String),
3799 SearchStopped(String),
3801}
3802
3803#[derive(Clone, Debug)]
3806#[non_exhaustive]
3807pub enum DaemonEvent {
3808 Announce(String, String),
3810
3811 Error(Error),
3813
3814 IpAdd(IpAddr),
3816
3817 IpDel(IpAddr),
3819
3820 NameChange(DnsNameChange),
3823
3824 Respond(String),
3826}
3827
3828#[derive(Clone, Debug)]
3831pub struct DnsNameChange {
3832 pub original: String,
3834
3835 pub new_name: String,
3845
3846 pub rr_type: RRType,
3848
3849 pub intf_name: String,
3851}
3852
3853#[derive(Debug)]
3855enum Command {
3856 Browse(String, u32, bool, Sender<ServiceEvent>),
3858
3859 ResolveHostname(String, u32, Sender<HostnameResolutionEvent>, Option<u64>), Register(Box<ServiceInfo>),
3864
3865 Unregister(String, Sender<UnregisterStatus>), RegisterResend(String, u32), UnregisterResend(Vec<u8>, u32, bool), StopBrowse(String), StopResolveHostname(String), Resolve(String, u16), GetMetrics(Sender<Metrics>),
3886
3887 GetStatus(Sender<DaemonStatus>),
3889
3890 Monitor(Sender<DaemonEvent>),
3892
3893 SetOption(DaemonOption),
3894
3895 GetOption(Sender<DaemonOptionVal>),
3896
3897 Verify(String, Duration),
3902
3903 InvalidIntfAddrs(HashSet<Interface>),
3905
3906 Exit(Sender<DaemonStatus>),
3907}
3908
3909impl fmt::Display for Command {
3910 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
3911 match self {
3912 Self::Browse(_, _, _, _) => write!(f, "Command Browse"),
3913 Self::ResolveHostname(_, _, _, _) => write!(f, "Command ResolveHostname"),
3914 Self::Exit(_) => write!(f, "Command Exit"),
3915 Self::GetStatus(_) => write!(f, "Command GetStatus"),
3916 Self::GetMetrics(_) => write!(f, "Command GetMetrics"),
3917 Self::Monitor(_) => write!(f, "Command Monitor"),
3918 Self::Register(_) => write!(f, "Command Register"),
3919 Self::RegisterResend(_, _) => write!(f, "Command RegisterResend"),
3920 Self::SetOption(_) => write!(f, "Command SetOption"),
3921 Self::GetOption(_) => write!(f, "Command GetOption"),
3922 Self::StopBrowse(_) => write!(f, "Command StopBrowse"),
3923 Self::StopResolveHostname(_) => write!(f, "Command StopResolveHostname"),
3924 Self::Unregister(_, _) => write!(f, "Command Unregister"),
3925 Self::UnregisterResend(_, _, _) => write!(f, "Command UnregisterResend"),
3926 Self::Resolve(_, _) => write!(f, "Command Resolve"),
3927 Self::Verify(_, _) => write!(f, "Command VerifyResource"),
3928 Self::InvalidIntfAddrs(_) => write!(f, "Command InvalidIntfAddrs"),
3929 }
3930 }
3931}
3932
3933struct DaemonOptionVal {
3934 _service_name_len_max: u8,
3935 ip_check_interval: u64,
3936}
3937
3938#[derive(Debug)]
3939enum DaemonOption {
3940 ServiceNameLenMax(u8),
3941 IpCheckInterval(u64),
3942 EnableInterface(Vec<IfKind>),
3943 DisableInterface(Vec<IfKind>),
3944 MulticastLoopV4(bool),
3945 MulticastLoopV6(bool),
3946 AcceptUnsolicited(bool),
3947 IncludeAppleP2P(bool),
3948 #[cfg(test)]
3949 TestDownInterface(String),
3950 #[cfg(test)]
3951 TestUpInterface(String),
3952}
3953
3954const DOMAIN_LEN: usize = "._tcp.local.".len();
3956
3957fn check_service_name_length(ty_domain: &str, limit: u8) -> Result<()> {
3959 if ty_domain.len() <= DOMAIN_LEN + 1 {
3960 return Err(e_fmt!("Service type name cannot be empty: {}", ty_domain));
3962 }
3963
3964 let service_name_len = ty_domain.len() - DOMAIN_LEN - 1; if service_name_len > limit as usize {
3966 return Err(e_fmt!("Service name length must be <= {} bytes", limit));
3967 }
3968 Ok(())
3969}
3970
3971fn check_domain_suffix(name: &str) -> Result<()> {
3973 if !(name.ends_with("._tcp.local.") || name.ends_with("._udp.local.")) {
3974 return Err(e_fmt!(
3975 "mDNS service {} must end with '._tcp.local.' or '._udp.local.'",
3976 name
3977 ));
3978 }
3979
3980 Ok(())
3981}
3982
3983fn check_service_name(fullname: &str) -> Result<()> {
3991 check_domain_suffix(fullname)?;
3992
3993 let remaining: Vec<&str> = fullname[..fullname.len() - DOMAIN_LEN].split('.').collect();
3994 let name = remaining.last().ok_or_else(|| e_fmt!("No service name"))?;
3995
3996 if &name[0..1] != "_" {
3997 return Err(e_fmt!("Service name must start with '_'"));
3998 }
3999
4000 let name = &name[1..];
4001
4002 if name.contains("--") {
4003 return Err(e_fmt!("Service name must not contain '--'"));
4004 }
4005
4006 if name.starts_with('-') || name.ends_with('-') {
4007 return Err(e_fmt!("Service name (%s) may not start or end with '-'"));
4008 }
4009
4010 let ascii_count = name.chars().filter(|c| c.is_ascii_alphabetic()).count();
4011 if ascii_count < 1 {
4012 return Err(e_fmt!(
4013 "Service name must contain at least one letter (eg: 'A-Za-z')"
4014 ));
4015 }
4016
4017 Ok(())
4018}
4019
4020fn check_hostname(hostname: &str) -> Result<()> {
4022 if !hostname.ends_with(".local.") {
4023 return Err(e_fmt!("Hostname must end with '.local.': {hostname}"));
4024 }
4025
4026 if hostname == ".local." {
4027 return Err(e_fmt!(
4028 "The part of the hostname before '.local.' cannot be empty"
4029 ));
4030 }
4031
4032 if hostname.len() > 255 {
4033 return Err(e_fmt!("Hostname length must be <= 255 bytes"));
4034 }
4035
4036 Ok(())
4037}
4038
4039fn call_service_listener(
4040 listeners_map: &HashMap<String, Sender<ServiceEvent>>,
4041 ty_domain: &str,
4042 event: ServiceEvent,
4043) {
4044 if let Some(listener) = listeners_map.get(ty_domain) {
4045 match listener.send(event) {
4046 Ok(()) => trace!("Sent event to listener successfully"),
4047 Err(e) => debug!("Failed to send event: {}", e),
4048 }
4049 }
4050}
4051
4052fn call_hostname_resolution_listener(
4053 listeners_map: &HashMap<String, (Sender<HostnameResolutionEvent>, Option<u64>)>,
4054 hostname: &str,
4055 event: HostnameResolutionEvent,
4056) {
4057 let hostname_lower = hostname.to_lowercase();
4058 if let Some(listener) = listeners_map.get(&hostname_lower).map(|(l, _)| l) {
4059 match listener.send(event) {
4060 Ok(()) => trace!("Sent event to listener successfully"),
4061 Err(e) => debug!("Failed to send event: {}", e),
4062 }
4063 }
4064}
4065
4066fn my_ip_interfaces(with_loopback: bool) -> Vec<Interface> {
4070 my_ip_interfaces_inner(with_loopback, false)
4071}
4072
4073fn my_ip_interfaces_inner(with_loopback: bool, with_apple_p2p: bool) -> Vec<Interface> {
4074 if_addrs::get_if_addrs()
4075 .unwrap_or_default()
4076 .into_iter()
4077 .filter(|i| {
4078 i.is_oper_up()
4079 && !i.is_p2p()
4080 && (!i.is_loopback() || with_loopback)
4081 && (with_apple_p2p || !is_apple_p2p_by_name(&i.name))
4082 })
4083 .collect()
4084}
4085
4086fn is_apple_p2p_by_name(name: &str) -> bool {
4089 let p2p_prefixes = ["awdl", "llw"];
4090 p2p_prefixes.iter().any(|prefix| name.starts_with(prefix))
4091}
4092
4093fn send_dns_outgoing(
4096 out: &DnsOutgoing,
4097 my_intf: &MyIntf,
4098 sock: &PktInfoUdpSocket,
4099 port: u16,
4100) -> MyResult<Vec<Vec<u8>>> {
4101 let if_name = &my_intf.name;
4102
4103 let if_addr = if sock.domain() == Domain::IPV4 {
4104 match my_intf.next_ifaddr_v4() {
4105 Some(addr) => addr,
4106 None => return Ok(vec![]),
4107 }
4108 } else {
4109 match my_intf.next_ifaddr_v6() {
4110 Some(addr) => addr,
4111 None => return Ok(vec![]),
4112 }
4113 };
4114
4115 send_dns_outgoing_impl(out, if_name, my_intf.index, if_addr, sock, port)
4116}
4117
4118fn send_dns_outgoing_impl(
4120 out: &DnsOutgoing,
4121 if_name: &str,
4122 if_index: u32,
4123 if_addr: &IfAddr,
4124 sock: &PktInfoUdpSocket,
4125 port: u16,
4126) -> MyResult<Vec<Vec<u8>>> {
4127 let qtype = if out.is_query() {
4128 "query"
4129 } else {
4130 if out.answers_count() == 0 && out.additionals().is_empty() {
4131 return Ok(vec![]); }
4133 "response"
4134 };
4135 trace!(
4136 "send {}: {} questions {} answers {} authorities {} additional",
4137 qtype,
4138 out.questions().len(),
4139 out.answers_count(),
4140 out.authorities().len(),
4141 out.additionals().len()
4142 );
4143
4144 match if_addr.ip() {
4145 IpAddr::V4(ipv4) => {
4146 if let Err(e) = sock.set_multicast_if_v4(&ipv4) {
4147 debug!(
4148 "send_dns_outgoing: failed to set multicast interface for IPv4 {}: {}",
4149 ipv4, e
4150 );
4151 if e.kind() == std::io::ErrorKind::AddrNotAvailable {
4153 let intf_addr = Interface {
4154 name: if_name.to_string(),
4155 addr: if_addr.clone(),
4156 index: Some(if_index),
4157 oper_status: if_addrs::IfOperStatus::Down,
4158 is_p2p: false,
4159 #[cfg(windows)]
4160 adapter_name: String::new(),
4161 };
4162 return Err(InternalError::IntfAddrInvalid(intf_addr));
4163 }
4164 return Ok(vec![]); }
4166 }
4167 IpAddr::V6(ipv6) => {
4168 if let Err(e) = sock.set_multicast_if_v6(if_index) {
4169 debug!(
4170 "send_dns_outgoing: failed to set multicast interface for IPv6 {}: {}",
4171 ipv6, e
4172 );
4173 if e.kind() == std::io::ErrorKind::AddrNotAvailable {
4175 let intf_addr = Interface {
4176 name: if_name.to_string(),
4177 addr: if_addr.clone(),
4178 index: Some(if_index),
4179 oper_status: if_addrs::IfOperStatus::Down,
4180 is_p2p: false,
4181 #[cfg(windows)]
4182 adapter_name: String::new(),
4183 };
4184 return Err(InternalError::IntfAddrInvalid(intf_addr));
4185 }
4186 return Ok(vec![]); }
4188 }
4189 }
4190
4191 let packet_list = out.to_data_on_wire();
4192 for packet in packet_list.iter() {
4193 multicast_on_intf(packet, if_name, if_index, if_addr, sock, port);
4194 }
4195 Ok(packet_list)
4196}
4197
4198fn multicast_on_intf(
4200 packet: &[u8],
4201 if_name: &str,
4202 if_index: u32,
4203 if_addr: &IfAddr,
4204 socket: &PktInfoUdpSocket,
4205 port: u16,
4206) {
4207 if packet.len() > MAX_MSG_ABSOLUTE {
4208 debug!("Drop over-sized packet ({})", packet.len());
4209 return;
4210 }
4211
4212 let addr: SocketAddr = match if_addr {
4213 if_addrs::IfAddr::V4(_) => SocketAddrV4::new(GROUP_ADDR_V4, port).into(),
4214 if_addrs::IfAddr::V6(_) => {
4215 let mut sock = SocketAddrV6::new(GROUP_ADDR_V6, port, 0, 0);
4216 sock.set_scope_id(if_index); sock.into()
4218 }
4219 };
4220
4221 let sock_addr = addr.into();
4223 match socket.send_to(packet, &sock_addr) {
4224 Ok(sz) => trace!(
4225 "sent out {} bytes on interface {} (idx {}) addr {}",
4226 sz,
4227 if_name,
4228 if_index,
4229 if_addr.ip()
4230 ),
4231 Err(e) => trace!("Failed to send to {} via {:?}: {}", addr, &if_name, e),
4232 }
4233}
4234
4235fn valid_instance_name(name: &str) -> bool {
4239 name.split('.').count() >= 5
4240}
4241
4242fn notify_monitors(monitors: &mut Vec<Sender<DaemonEvent>>, event: DaemonEvent) {
4243 monitors.retain(|sender| {
4244 if let Err(e) = sender.try_send(event.clone()) {
4245 debug!("notify_monitors: try_send: {}", &e);
4246 if matches!(e, TrySendError::Disconnected(_)) {
4247 return false; }
4249 }
4250 true
4251 });
4252}
4253
4254fn prepare_announce(
4257 info: &ServiceInfo,
4258 intf: &MyIntf,
4259 dns_registry: &mut DnsRegistry,
4260 is_ipv4: bool,
4261) -> Option<DnsOutgoing> {
4262 let intf_addrs = if is_ipv4 {
4263 info.get_addrs_on_my_intf_v4(intf)
4264 } else {
4265 info.get_addrs_on_my_intf_v6(intf)
4266 };
4267
4268 if intf_addrs.is_empty() {
4269 debug!(
4270 "prepare_announce (ipv4: {is_ipv4}): no valid addrs on interface {}",
4271 &intf.name
4272 );
4273 return None;
4274 }
4275
4276 let service_fullname = match dns_registry.name_changes.get(info.get_fullname()) {
4278 Some(new_name) => new_name,
4279 None => info.get_fullname(),
4280 };
4281
4282 debug!(
4283 "prepare to announce service {service_fullname} on {:?}",
4284 &intf_addrs
4285 );
4286
4287 let mut probing_count = 0;
4288 let mut out = DnsOutgoing::new(FLAGS_QR_RESPONSE | FLAGS_AA);
4289 let create_time = current_time_millis() + fastrand::u64(0..250);
4290
4291 out.add_answer_at_time(
4292 DnsPointer::new(
4293 info.get_type(),
4294 RRType::PTR,
4295 CLASS_IN,
4296 info.get_other_ttl(),
4297 service_fullname.to_string(),
4298 ),
4299 0,
4300 );
4301
4302 if let Some(sub) = info.get_subtype() {
4303 trace!("Adding subdomain {}", sub);
4304 out.add_answer_at_time(
4305 DnsPointer::new(
4306 sub,
4307 RRType::PTR,
4308 CLASS_IN,
4309 info.get_other_ttl(),
4310 service_fullname.to_string(),
4311 ),
4312 0,
4313 );
4314 }
4315
4316 let hostname = match dns_registry.name_changes.get(info.get_hostname()) {
4318 Some(new_name) => new_name.to_string(),
4319 None => info.get_hostname().to_string(),
4320 };
4321
4322 let mut srv = DnsSrv::new(
4323 info.get_fullname(),
4324 CLASS_IN | CLASS_CACHE_FLUSH,
4325 info.get_host_ttl(),
4326 info.get_priority(),
4327 info.get_weight(),
4328 info.get_port(),
4329 hostname,
4330 );
4331
4332 if let Some(new_name) = dns_registry.name_changes.get(info.get_fullname()) {
4333 srv.get_record_mut().set_new_name(new_name.to_string());
4334 }
4335
4336 if !info.requires_probe()
4337 || dns_registry.is_probing_done(&srv, info.get_fullname(), create_time)
4338 {
4339 out.add_answer_at_time(srv, 0);
4340 } else {
4341 probing_count += 1;
4342 }
4343
4344 let mut txt = DnsTxt::new(
4347 info.get_fullname(),
4348 CLASS_IN | CLASS_CACHE_FLUSH,
4349 info.get_other_ttl(),
4350 info.generate_txt(),
4351 );
4352
4353 if let Some(new_name) = dns_registry.name_changes.get(info.get_fullname()) {
4354 txt.get_record_mut().set_new_name(new_name.to_string());
4355 }
4356
4357 if !info.requires_probe()
4358 || dns_registry.is_probing_done(&txt, info.get_fullname(), create_time)
4359 {
4360 out.add_answer_at_time(txt, 0);
4361 } else {
4362 probing_count += 1;
4363 }
4364
4365 let hostname = info.get_hostname();
4368 for address in intf_addrs {
4369 let mut dns_addr = DnsAddress::new(
4370 hostname,
4371 ip_address_rr_type(&address),
4372 CLASS_IN | CLASS_CACHE_FLUSH,
4373 info.get_host_ttl(),
4374 address,
4375 intf.into(),
4376 );
4377
4378 if let Some(new_name) = dns_registry.name_changes.get(hostname) {
4379 dns_addr.get_record_mut().set_new_name(new_name.to_string());
4380 }
4381
4382 if !info.requires_probe()
4383 || dns_registry.is_probing_done(&dns_addr, info.get_fullname(), create_time)
4384 {
4385 out.add_answer_at_time(dns_addr, 0);
4386 } else {
4387 probing_count += 1;
4388 }
4389 }
4390
4391 if probing_count > 0 {
4392 return None;
4393 }
4394
4395 Some(out)
4396}
4397
4398fn announce_service_on_intf(
4401 dns_registry: &mut DnsRegistry,
4402 info: &ServiceInfo,
4403 intf: &MyIntf,
4404 sock: &PktInfoUdpSocket,
4405 port: u16,
4406) -> MyResult<bool> {
4407 let is_ipv4 = sock.domain() == Domain::IPV4;
4408 if let Some(out) = prepare_announce(info, intf, dns_registry, is_ipv4) {
4409 let _ = send_dns_outgoing(&out, intf, sock, port)?;
4410 return Ok(true);
4411 }
4412
4413 Ok(false)
4414}
4415
4416fn name_change(original: &str) -> String {
4424 let mut parts: Vec<_> = original.split('.').collect();
4425 let Some(first_part) = parts.get_mut(0) else {
4426 return format!("{original} (2)");
4427 };
4428
4429 let mut new_name = format!("{first_part} (2)");
4430
4431 if let Some(paren_pos) = first_part.rfind(" (") {
4433 if let Some(end_paren) = first_part[paren_pos..].find(')') {
4435 let absolute_end_pos = paren_pos + end_paren;
4436 if absolute_end_pos == first_part.len() - 1 {
4438 let num_start = paren_pos + 2; if let Ok(number) = first_part[num_start..absolute_end_pos].parse::<u32>() {
4441 let base_name = &first_part[..paren_pos];
4442 new_name = format!("{} ({})", base_name, number + 1)
4443 }
4444 }
4445 }
4446 }
4447
4448 *first_part = &new_name;
4449 parts.join(".")
4450}
4451
4452fn hostname_change(original: &str) -> String {
4460 let mut parts: Vec<_> = original.split('.').collect();
4461 let Some(first_part) = parts.get_mut(0) else {
4462 return format!("{original}-2");
4463 };
4464
4465 let mut new_name = format!("{first_part}-2");
4466
4467 if let Some(hyphen_pos) = first_part.rfind('-') {
4469 if let Ok(number) = first_part[hyphen_pos + 1..].parse::<u32>() {
4471 let base_name = &first_part[..hyphen_pos];
4472 new_name = format!("{}-{}", base_name, number + 1);
4473 }
4474 }
4475
4476 *first_part = &new_name;
4477 parts.join(".")
4478}
4479
4480fn add_answer_with_additionals(
4481 out: &mut DnsOutgoing,
4482 msg: &DnsIncoming,
4483 service: &ServiceInfo,
4484 intf: &MyIntf,
4485 dns_registry: &DnsRegistry,
4486 is_ipv4: bool,
4487) {
4488 let intf_addrs = if is_ipv4 {
4489 service.get_addrs_on_my_intf_v4(intf)
4490 } else {
4491 service.get_addrs_on_my_intf_v6(intf)
4492 };
4493 if intf_addrs.is_empty() {
4494 trace!("No addrs on LAN of intf {:?}", intf);
4495 return;
4496 }
4497
4498 let service_fullname = match dns_registry.name_changes.get(service.get_fullname()) {
4500 Some(new_name) => new_name,
4501 None => service.get_fullname(),
4502 };
4503
4504 let hostname = match dns_registry.name_changes.get(service.get_hostname()) {
4505 Some(new_name) => new_name,
4506 None => service.get_hostname(),
4507 };
4508
4509 let ptr_added = out.add_answer(
4510 msg,
4511 DnsPointer::new(
4512 service.get_type(),
4513 RRType::PTR,
4514 CLASS_IN,
4515 service.get_other_ttl(),
4516 service_fullname.to_string(),
4517 ),
4518 );
4519
4520 if !ptr_added {
4521 trace!("answer was not added for msg {:?}", msg);
4522 return;
4523 }
4524
4525 if let Some(sub) = service.get_subtype() {
4526 trace!("Adding subdomain {}", sub);
4527 out.add_additional_answer(DnsPointer::new(
4528 sub,
4529 RRType::PTR,
4530 CLASS_IN,
4531 service.get_other_ttl(),
4532 service_fullname.to_string(),
4533 ));
4534 }
4535
4536 out.add_additional_answer(DnsSrv::new(
4539 service_fullname,
4540 CLASS_IN | CLASS_CACHE_FLUSH,
4541 service.get_host_ttl(),
4542 service.get_priority(),
4543 service.get_weight(),
4544 service.get_port(),
4545 hostname.to_string(),
4546 ));
4547
4548 out.add_additional_answer(DnsTxt::new(
4549 service_fullname,
4550 CLASS_IN | CLASS_CACHE_FLUSH,
4551 service.get_other_ttl(),
4552 service.generate_txt(),
4553 ));
4554
4555 for address in intf_addrs {
4556 out.add_additional_answer(DnsAddress::new(
4557 hostname,
4558 ip_address_rr_type(&address),
4559 CLASS_IN | CLASS_CACHE_FLUSH,
4560 service.get_host_ttl(),
4561 address,
4562 intf.into(),
4563 ));
4564 }
4565}
4566
4567fn check_probing(
4570 dns_registry: &mut DnsRegistry,
4571 timers: &mut BinaryHeap<Reverse<u64>>,
4572 now: u64,
4573) -> (DnsOutgoing, Vec<String>) {
4574 let mut expired_probes = Vec::new();
4575 let mut out = DnsOutgoing::new(FLAGS_QR_QUERY);
4576
4577 for (name, probe) in dns_registry.probing.iter_mut() {
4578 if now >= probe.next_send {
4579 if probe.expired(now) {
4580 expired_probes.push(name.clone());
4582 } else {
4583 out.add_question(name, RRType::ANY);
4584
4585 for record in probe.records.iter() {
4593 out.add_authority(record.clone());
4594 }
4595
4596 probe.update_next_send(now);
4597
4598 timers.push(Reverse(probe.next_send));
4600 }
4601 }
4602 }
4603
4604 (out, expired_probes)
4605}
4606
4607fn handle_expired_probes(
4612 expired_probes: Vec<String>,
4613 intf_name: &str,
4614 dns_registry: &mut DnsRegistry,
4615 monitors: &mut Vec<Sender<DaemonEvent>>,
4616) -> HashSet<String> {
4617 let mut waiting_services = HashSet::new();
4618
4619 for name in expired_probes {
4620 let Some(probe) = dns_registry.probing.remove(&name) else {
4621 continue;
4622 };
4623
4624 for record in probe.records.iter() {
4626 if let Some(new_name) = record.get_record().get_new_name() {
4627 dns_registry
4628 .name_changes
4629 .insert(name.clone(), new_name.to_string());
4630
4631 let event = DnsNameChange {
4632 original: record.get_record().get_original_name().to_string(),
4633 new_name: new_name.to_string(),
4634 rr_type: record.get_type(),
4635 intf_name: intf_name.to_string(),
4636 };
4637 debug!("Name change event: {:?}", &event);
4638 notify_monitors(monitors, DaemonEvent::NameChange(event));
4639 }
4640 }
4641
4642 debug!(
4644 "probe of '{name}' finished: move {} records to active. ({} waiting services)",
4645 probe.records.len(),
4646 probe.waiting_services.len(),
4647 );
4648
4649 if !probe.records.is_empty() {
4651 match dns_registry.active.get_mut(&name) {
4652 Some(records) => {
4653 records.extend(probe.records);
4654 }
4655 None => {
4656 dns_registry.active.insert(name, probe.records);
4657 }
4658 }
4659
4660 waiting_services.extend(probe.waiting_services);
4661 }
4662 }
4663
4664 waiting_services
4665}
4666
4667#[cfg(test)]
4668mod tests {
4669 use super::{
4670 _new_socket_bind, check_domain_suffix, check_service_name_length, hostname_change,
4671 my_ip_interfaces, name_change, send_dns_outgoing_impl, valid_instance_name,
4672 HostnameResolutionEvent, ServiceDaemon, ServiceEvent, ServiceInfo, MDNS_PORT,
4673 };
4674 use crate::{
4675 dns_parser::{
4676 DnsIncoming, DnsOutgoing, DnsPointer, InterfaceId, RRType, ScopedIp, CLASS_IN,
4677 FLAGS_AA, FLAGS_QR_RESPONSE,
4678 },
4679 service_daemon::{add_answer_of_service, check_hostname},
4680 };
4681 use std::time::{Duration, SystemTime};
4682 use test_log::test;
4683
4684 #[test]
4685 fn test_instance_name() {
4686 assert!(valid_instance_name("my-laser._printer._tcp.local."));
4687 assert!(valid_instance_name("my-laser.._printer._tcp.local."));
4688 assert!(!valid_instance_name("_printer._tcp.local."));
4689 }
4690
4691 #[test]
4692 fn test_check_service_name_length() {
4693 let result = check_service_name_length("_tcp", 100);
4694 assert!(result.is_err());
4695 if let Err(e) = result {
4696 println!("{}", e);
4697 }
4698 }
4699
4700 #[test]
4701 fn test_check_hostname() {
4702 for hostname in &[
4704 "my_host.local.",
4705 &("A".repeat(255 - ".local.".len()) + ".local."),
4706 ] {
4707 let result = check_hostname(hostname);
4708 assert!(result.is_ok());
4709 }
4710
4711 for hostname in &[
4713 "my_host.local",
4714 ".local.",
4715 &("A".repeat(256 - ".local.".len()) + ".local."),
4716 ] {
4717 let result = check_hostname(hostname);
4718 assert!(result.is_err());
4719 if let Err(e) = result {
4720 println!("{}", e);
4721 }
4722 }
4723 }
4724
4725 #[test]
4726 fn test_check_domain_suffix() {
4727 assert!(check_domain_suffix("_missing_dot._tcp.local").is_err());
4728 assert!(check_domain_suffix("_missing_bar.tcp.local.").is_err());
4729 assert!(check_domain_suffix("_mis_spell._tpp.local.").is_err());
4730 assert!(check_domain_suffix("_mis_spell._upp.local.").is_err());
4731 assert!(check_domain_suffix("_has_dot._tcp.local.").is_ok());
4732 assert!(check_domain_suffix("_goodname._udp.local.").is_ok());
4733 }
4734
4735 #[test]
4736 fn test_service_with_temporarily_invalidated_ptr() {
4737 let d = ServiceDaemon::new().expect("Failed to create daemon");
4739
4740 let service = "_test_inval_ptr._udp.local.";
4741 let host_name = "my_host_tmp_invalidated_ptr.local.";
4742 let intfs: Vec<_> = my_ip_interfaces(false);
4743 let intf_ips: Vec<_> = intfs.iter().map(|intf| intf.ip()).collect();
4744 let port = 5201;
4745 let my_service =
4746 ServiceInfo::new(service, "my_instance", host_name, &intf_ips[..], port, None)
4747 .expect("invalid service info")
4748 .enable_addr_auto();
4749 let result = d.register(my_service.clone());
4750 assert!(result.is_ok());
4751
4752 let browse_chan = d.browse(service).unwrap();
4754 let timeout = Duration::from_secs(2);
4755 let mut resolved = false;
4756
4757 while let Ok(event) = browse_chan.recv_timeout(timeout) {
4758 match event {
4759 ServiceEvent::ServiceResolved(info) => {
4760 resolved = true;
4761 println!("Resolved a service of {}", &info.fullname);
4762 break;
4763 }
4764 e => {
4765 println!("Received event {:?}", e);
4766 }
4767 }
4768 }
4769
4770 assert!(resolved);
4771
4772 println!("Stopping browse of {}", service);
4773 d.stop_browse(service).unwrap();
4776
4777 let mut stopped = false;
4782 while let Ok(event) = browse_chan.recv_timeout(timeout) {
4783 match event {
4784 ServiceEvent::SearchStopped(_) => {
4785 stopped = true;
4786 println!("Stopped browsing service");
4787 break;
4788 }
4789 e => {
4793 println!("Received event {:?}", e);
4794 }
4795 }
4796 }
4797
4798 assert!(stopped);
4799
4800 let invalidate_ptr_packet = DnsPointer::new(
4802 my_service.get_type(),
4803 RRType::PTR,
4804 CLASS_IN,
4805 0,
4806 my_service.get_fullname().to_string(),
4807 );
4808
4809 let mut packet_buffer = DnsOutgoing::new(FLAGS_QR_RESPONSE | FLAGS_AA);
4810 packet_buffer.add_additional_answer(invalidate_ptr_packet);
4811
4812 for intf in intfs {
4813 let sock = _new_socket_bind(&intf, true).unwrap();
4814 send_dns_outgoing_impl(
4815 &packet_buffer,
4816 &intf.name,
4817 intf.index.unwrap_or(0),
4818 &intf.addr,
4819 &sock.pktinfo,
4820 MDNS_PORT,
4821 )
4822 .unwrap();
4823 }
4824
4825 println!(
4826 "Sent PTR record invalidation. Starting second browse for {}",
4827 service
4828 );
4829
4830 let browse_chan = d.browse(service).unwrap();
4832
4833 resolved = false;
4834 while let Ok(event) = browse_chan.recv_timeout(timeout) {
4835 match event {
4836 ServiceEvent::ServiceResolved(info) => {
4837 resolved = true;
4838 println!("Resolved a service of {}", &info.fullname);
4839 break;
4840 }
4841 e => {
4842 println!("Received event {:?}", e);
4843 }
4844 }
4845 }
4846
4847 assert!(resolved);
4848 d.shutdown().unwrap();
4849 }
4850
4851 #[test]
4852 fn test_expired_srv() {
4853 let service_type = "_expired-srv._udp.local.";
4855 let instance = "test_instance";
4856 let host_name = "expired_srv_host.local.";
4857 let mut my_service = ServiceInfo::new(service_type, instance, host_name, "", 5023, None)
4858 .unwrap()
4859 .enable_addr_auto();
4860 let new_ttl = 3; my_service._set_host_ttl(new_ttl);
4865
4866 let mdns_server = ServiceDaemon::new().expect("Failed to create mdns server");
4868 let result = mdns_server.register(my_service);
4869 assert!(result.is_ok());
4870
4871 let mdns_client = ServiceDaemon::new().expect("Failed to create mdns client");
4872 let browse_chan = mdns_client.browse(service_type).unwrap();
4873 let timeout = Duration::from_secs(2);
4874 let mut resolved = false;
4875
4876 while let Ok(event) = browse_chan.recv_timeout(timeout) {
4877 if let ServiceEvent::ServiceResolved(info) = event {
4878 resolved = true;
4879 println!("Resolved a service of {}", &info.fullname);
4880 break;
4881 }
4882 }
4883
4884 assert!(resolved);
4885
4886 mdns_server.shutdown().unwrap();
4888
4889 let expire_timeout = Duration::from_secs(new_ttl as u64);
4891 while let Ok(event) = browse_chan.recv_timeout(expire_timeout) {
4892 if let ServiceEvent::ServiceRemoved(service_type, full_name) = event {
4893 println!("Service removed: {}: {}", &service_type, &full_name);
4894 break;
4895 }
4896 }
4897 }
4898
4899 #[test]
4900 fn test_hostname_resolution_address_removed() {
4901 let server = ServiceDaemon::new().expect("Failed to create server");
4903 let hostname = "addr_remove_host._tcp.local.";
4904 let service_ip_addr: ScopedIp = my_ip_interfaces(false)
4905 .iter()
4906 .find(|iface| iface.ip().is_ipv4())
4907 .map(|iface| iface.ip().into())
4908 .unwrap();
4909
4910 let mut my_service = ServiceInfo::new(
4911 "_host_res_test._tcp.local.",
4912 "my_instance",
4913 hostname,
4914 service_ip_addr.to_ip_addr(),
4915 1234,
4916 None,
4917 )
4918 .expect("invalid service info");
4919
4920 let addr_ttl = 2;
4922 my_service._set_host_ttl(addr_ttl); server.register(my_service).unwrap();
4925
4926 let client = ServiceDaemon::new().expect("Failed to create client");
4928 let event_receiver = client.resolve_hostname(hostname, None).unwrap();
4929 let resolved = loop {
4930 match event_receiver.recv() {
4931 Ok(HostnameResolutionEvent::AddressesFound(found_hostname, addresses)) => {
4932 assert!(found_hostname == hostname);
4933 assert!(addresses.contains(&service_ip_addr));
4934 println!("address found: {:?}", &addresses);
4935 break true;
4936 }
4937 Ok(HostnameResolutionEvent::SearchStopped(_)) => break false,
4938 Ok(_event) => {}
4939 Err(_) => break false,
4940 }
4941 };
4942
4943 assert!(resolved);
4944
4945 server.shutdown().unwrap();
4947
4948 let timeout = Duration::from_secs(addr_ttl as u64 + 1);
4950 let removed = loop {
4951 match event_receiver.recv_timeout(timeout) {
4952 Ok(HostnameResolutionEvent::AddressesRemoved(removed_host, addresses)) => {
4953 assert!(removed_host == hostname);
4954 assert!(addresses.contains(&service_ip_addr));
4955
4956 println!(
4957 "address removed: hostname: {} addresses: {:?}",
4958 &hostname, &addresses
4959 );
4960 break true;
4961 }
4962 Ok(_event) => {}
4963 Err(_) => {
4964 break false;
4965 }
4966 }
4967 };
4968
4969 assert!(removed);
4970
4971 client.shutdown().unwrap();
4972 }
4973
4974 #[test]
4975 fn test_refresh_ptr() {
4976 let service_type = "_refresh-ptr._udp.local.";
4978 let instance = "test_instance";
4979 let host_name = "refresh_ptr_host.local.";
4980 let service_ip_addr = my_ip_interfaces(false)
4981 .iter()
4982 .find(|iface| iface.ip().is_ipv4())
4983 .map(|iface| iface.ip())
4984 .unwrap();
4985
4986 let mut my_service = ServiceInfo::new(
4987 service_type,
4988 instance,
4989 host_name,
4990 service_ip_addr,
4991 5023,
4992 None,
4993 )
4994 .unwrap();
4995
4996 let new_ttl = 3; my_service._set_other_ttl(new_ttl);
4998
4999 let mdns_server = ServiceDaemon::new().expect("Failed to create mdns server");
5001 let result = mdns_server.register(my_service);
5002 assert!(result.is_ok());
5003
5004 let mdns_client = ServiceDaemon::new().expect("Failed to create mdns client");
5005 let browse_chan = mdns_client.browse(service_type).unwrap();
5006 let timeout = Duration::from_millis(1500); let mut resolved = false;
5008
5009 while let Ok(event) = browse_chan.recv_timeout(timeout) {
5011 if let ServiceEvent::ServiceResolved(info) = event {
5012 resolved = true;
5013 println!("Resolved a service of {}", &info.fullname);
5014 break;
5015 }
5016 }
5017
5018 assert!(resolved);
5019
5020 let timeout = Duration::from_millis(new_ttl as u64 * 1000 * 90 / 100);
5022 while let Ok(event) = browse_chan.recv_timeout(timeout) {
5023 println!("event: {:?}", &event);
5024 }
5025
5026 let metrics_chan = mdns_client.get_metrics().unwrap();
5028 let metrics = metrics_chan.recv_timeout(timeout).unwrap();
5029 let ptr_refresh_counter = metrics["cache-refresh-ptr"];
5030 assert_eq!(ptr_refresh_counter, 1);
5031 let srvtxt_refresh_counter = metrics["cache-refresh-srv-txt"];
5032 assert_eq!(srvtxt_refresh_counter, 1);
5033
5034 mdns_server.shutdown().unwrap();
5036 mdns_client.shutdown().unwrap();
5037 }
5038
5039 #[test]
5040 fn test_name_change() {
5041 assert_eq!(name_change("foo.local."), "foo (2).local.");
5042 assert_eq!(name_change("foo (2).local."), "foo (3).local.");
5043 assert_eq!(name_change("foo (9).local."), "foo (10).local.");
5044 assert_eq!(name_change("foo"), "foo (2)");
5045 assert_eq!(name_change("foo (2)"), "foo (3)");
5046 assert_eq!(name_change(""), " (2)");
5047
5048 assert_eq!(name_change("foo (abc)"), "foo (abc) (2)"); assert_eq!(name_change("foo (2"), "foo (2 (2)"); assert_eq!(name_change("foo (2) extra"), "foo (2) extra (2)"); }
5053
5054 #[test]
5055 fn test_hostname_change() {
5056 assert_eq!(hostname_change("foo.local."), "foo-2.local.");
5057 assert_eq!(hostname_change("foo"), "foo-2");
5058 assert_eq!(hostname_change("foo-2.local."), "foo-3.local.");
5059 assert_eq!(hostname_change("foo-9"), "foo-10");
5060 assert_eq!(hostname_change("test-42.domain."), "test-43.domain.");
5061 }
5062
5063 #[test]
5064 fn test_add_answer_txt_ttl() {
5065 let service_type = "_test_add_answer._udp.local.";
5067 let instance = "test_instance";
5068 let host_name = "add_answer_host.local.";
5069 let service_intf = my_ip_interfaces(false)
5070 .into_iter()
5071 .find(|iface| iface.ip().is_ipv4())
5072 .unwrap();
5073 let service_ip_addr = service_intf.ip();
5074 let my_service = ServiceInfo::new(
5075 service_type,
5076 instance,
5077 host_name,
5078 service_ip_addr,
5079 5023,
5080 None,
5081 )
5082 .unwrap();
5083
5084 let mut out = DnsOutgoing::new(FLAGS_QR_RESPONSE | FLAGS_AA);
5086
5087 let mut dummy_data = out.to_data_on_wire();
5089 let interface_id = InterfaceId::from(&service_intf);
5090 let incoming = DnsIncoming::new(dummy_data.pop().unwrap(), interface_id).unwrap();
5091
5092 let if_addrs = vec![service_intf.ip()];
5094 add_answer_of_service(
5095 &mut out,
5096 &incoming,
5097 instance,
5098 &my_service,
5099 RRType::TXT,
5100 if_addrs,
5101 );
5102
5103 assert!(
5105 out.answers_count() > 0,
5106 "No answers added to the outgoing message"
5107 );
5108
5109 let answer = out._answers().first().unwrap();
5111 assert_eq!(answer.0.get_type(), RRType::TXT);
5112
5113 assert_eq!(answer.0.get_record().get_ttl(), my_service.get_other_ttl());
5115 }
5116
5117 #[test]
5118 fn test_interface_flip() {
5119 let ty_domain = "_intf-flip._udp.local.";
5121 let host_name = "intf_flip.local.";
5122 let now = SystemTime::now()
5123 .duration_since(SystemTime::UNIX_EPOCH)
5124 .unwrap();
5125 let instance_name = now.as_micros().to_string(); let port = 5200;
5127
5128 let (ip_addr1, intf_name) = my_ip_interfaces(false)
5130 .iter()
5131 .find(|iface| iface.ip().is_ipv4())
5132 .map(|iface| (iface.ip(), iface.name.clone()))
5133 .unwrap();
5134
5135 println!("Using interface {} with IP {}", intf_name, ip_addr1);
5136
5137 let service1 = ServiceInfo::new(ty_domain, &instance_name, host_name, ip_addr1, port, None)
5139 .expect("valid service info");
5140 let server1 = ServiceDaemon::new().expect("failed to start server");
5141 server1
5142 .register(service1)
5143 .expect("Failed to register service1");
5144
5145 std::thread::sleep(Duration::from_secs(2));
5147
5148 let client = ServiceDaemon::new().expect("failed to start client");
5150
5151 let receiver = client.browse(ty_domain).unwrap();
5152
5153 let timeout = Duration::from_secs(3);
5154 let mut got_data = false;
5155
5156 while let Ok(event) = receiver.recv_timeout(timeout) {
5157 if let ServiceEvent::ServiceResolved(_) = event {
5158 println!("Received ServiceResolved event");
5159 got_data = true;
5160 break;
5161 }
5162 }
5163
5164 assert!(got_data, "Should receive ServiceResolved event");
5165
5166 client.set_ip_check_interval(1).unwrap();
5168
5169 println!("Shutting down interface {}", &intf_name);
5171 client.test_down_interface(&intf_name).unwrap();
5172
5173 let mut got_removed = false;
5174
5175 while let Ok(event) = receiver.recv_timeout(timeout) {
5176 if let ServiceEvent::ServiceRemoved(ty_domain, instance) = event {
5177 got_removed = true;
5178 println!("removed: {ty_domain} : {instance}");
5179 break;
5180 }
5181 }
5182 assert!(got_removed, "Should receive ServiceRemoved event");
5183
5184 println!("Bringing up interface {}", &intf_name);
5185 client.test_up_interface(&intf_name).unwrap();
5186 let mut got_data = false;
5187 while let Ok(event) = receiver.recv_timeout(timeout) {
5188 if let ServiceEvent::ServiceResolved(resolved) = event {
5189 got_data = true;
5190 println!("Received ServiceResolved: {:?}", resolved);
5191 break;
5192 }
5193 }
5194 assert!(
5195 got_data,
5196 "Should receive ServiceResolved event after interface is back up"
5197 );
5198
5199 server1.shutdown().unwrap();
5200 client.shutdown().unwrap();
5201 }
5202
5203 #[test]
5204 fn test_cache_only() {
5205 let service_type = "_cache_only._udp.local.";
5207 let instance = "test_instance";
5208 let host_name = "cache_only_host.local.";
5209 let service_ip_addr = my_ip_interfaces(false)
5210 .iter()
5211 .find(|iface| iface.ip().is_ipv4())
5212 .map(|iface| iface.ip())
5213 .unwrap();
5214
5215 let mut my_service = ServiceInfo::new(
5216 service_type,
5217 instance,
5218 host_name,
5219 service_ip_addr,
5220 5023,
5221 None,
5222 )
5223 .unwrap();
5224
5225 let new_ttl = 3; my_service._set_other_ttl(new_ttl);
5227
5228 let mdns_client = ServiceDaemon::new().expect("Failed to create mdns client");
5229
5230 let browse_chan = mdns_client.browse_cache(service_type).unwrap();
5233 std::thread::sleep(Duration::from_secs(2));
5234
5235 let mdns_server = ServiceDaemon::new().expect("Failed to create mdns server");
5237 let result = mdns_server.register(my_service);
5238 assert!(result.is_ok());
5239
5240 let timeout = Duration::from_millis(1500); let mut resolved = false;
5242
5243 while let Ok(event) = browse_chan.recv_timeout(timeout) {
5245 if let ServiceEvent::ServiceResolved(info) = event {
5246 resolved = true;
5247 println!("Resolved a service of {}", &info.get_fullname());
5248 break;
5249 }
5250 }
5251
5252 assert!(resolved);
5253
5254 mdns_server.shutdown().unwrap();
5256 mdns_client.shutdown().unwrap();
5257 }
5258
5259 #[test]
5260 fn test_cache_only_unsolicited() {
5261 let service_type = "_c_unsolicit._udp.local.";
5262 let instance = "test_instance";
5263 let host_name = "c_unsolicit_host.local.";
5264 let service_ip_addr = my_ip_interfaces(false)
5265 .iter()
5266 .find(|iface| iface.ip().is_ipv4())
5267 .map(|iface| iface.ip())
5268 .unwrap();
5269
5270 let my_service = ServiceInfo::new(
5271 service_type,
5272 instance,
5273 host_name,
5274 service_ip_addr,
5275 5023,
5276 None,
5277 )
5278 .unwrap();
5279
5280 let mdns_server = ServiceDaemon::new().expect("Failed to create mdns server");
5282 let result = mdns_server.register(my_service);
5283 assert!(result.is_ok());
5284
5285 let mdns_client = ServiceDaemon::new().expect("Failed to create mdns client");
5286 mdns_client.accept_unsolicited(true).unwrap();
5287
5288 std::thread::sleep(Duration::from_secs(2));
5291 let browse_chan = mdns_client.browse_cache(service_type).unwrap();
5292 let timeout = Duration::from_millis(1500); let mut resolved = false;
5294
5295 while let Ok(event) = browse_chan.recv_timeout(timeout) {
5297 if let ServiceEvent::ServiceResolved(info) = event {
5298 resolved = true;
5299 println!("Resolved a service of {}", &info.get_fullname());
5300 break;
5301 }
5302 }
5303
5304 assert!(resolved);
5305
5306 mdns_server.shutdown().unwrap();
5308 mdns_client.shutdown().unwrap();
5309 }
5310
5311 #[test]
5312 fn test_custom_port_isolation() {
5313 let service_type = "_custom_port._udp.local.";
5318 let instance_custom = "custom_port_instance";
5319 let instance_default = "default_port_instance";
5320 let host_name = "custom_port_host.local.";
5321
5322 let service_ip_addr = my_ip_interfaces(false)
5323 .iter()
5324 .find(|iface| iface.ip().is_ipv4())
5325 .map(|iface| iface.ip())
5326 .expect("Test requires an IPv4 interface");
5327
5328 let service_custom = ServiceInfo::new(
5330 service_type,
5331 instance_custom,
5332 host_name,
5333 service_ip_addr,
5334 8080,
5335 None,
5336 )
5337 .unwrap();
5338
5339 let service_default = ServiceInfo::new(
5341 service_type,
5342 instance_default,
5343 host_name,
5344 service_ip_addr,
5345 8081,
5346 None,
5347 )
5348 .unwrap();
5349
5350 let custom_port = 5454u16;
5352 let server_custom =
5353 ServiceDaemon::new_with_port(custom_port).expect("Failed to create custom port server");
5354 let client_custom =
5355 ServiceDaemon::new_with_port(custom_port).expect("Failed to create custom port client");
5356
5357 let server_default = ServiceDaemon::new().expect("Failed to create default port server");
5359
5360 server_custom
5362 .register(service_custom.clone())
5363 .expect("Failed to register custom port service");
5364
5365 server_default
5367 .register(service_default.clone())
5368 .expect("Failed to register default port service");
5369
5370 let browse_custom = client_custom
5372 .browse(service_type)
5373 .expect("Failed to browse on custom port");
5374
5375 let timeout = Duration::from_secs(3);
5376 let mut found_custom = false;
5377 let mut found_default_on_custom = false;
5378
5379 while let Ok(event) = browse_custom.recv_timeout(timeout) {
5381 if let ServiceEvent::ServiceResolved(info) = event {
5382 println!(
5383 "Custom port client resolved: {} on port {}",
5384 info.get_fullname(),
5385 info.get_port()
5386 );
5387 if info.get_fullname().starts_with(instance_custom) {
5388 found_custom = true;
5389 assert_eq!(info.get_port(), 8080);
5390 }
5391 if info.get_fullname().starts_with(instance_default) {
5392 found_default_on_custom = true;
5393 }
5394 }
5395 }
5396
5397 assert!(
5398 found_custom,
5399 "Custom port client should find service on custom port"
5400 );
5401 assert!(
5402 !found_default_on_custom,
5403 "Custom port client should NOT find service on default port"
5404 );
5405
5406 let client_default = ServiceDaemon::new().expect("Failed to create default port client");
5409 let browse_default = client_default
5410 .browse(service_type)
5411 .expect("Failed to browse on default port");
5412
5413 let mut found_default = false;
5414 let mut found_custom_on_default = false;
5415
5416 while let Ok(event) = browse_default.recv_timeout(timeout) {
5417 if let ServiceEvent::ServiceResolved(info) = event {
5418 println!(
5419 "Default port client resolved: {} on port {}",
5420 info.get_fullname(),
5421 info.get_port()
5422 );
5423 if info.get_fullname().starts_with(instance_default) {
5424 found_default = true;
5425 assert_eq!(info.get_port(), 8081);
5426 }
5427 if info.get_fullname().starts_with(instance_custom) {
5428 found_custom_on_default = true;
5429 }
5430 }
5431 }
5432
5433 assert!(
5434 found_default,
5435 "Default port client should find service on default port"
5436 );
5437 assert!(
5438 !found_custom_on_default,
5439 "Default port client should NOT find service on custom port"
5440 );
5441
5442 server_custom.shutdown().unwrap();
5444 client_custom.shutdown().unwrap();
5445 server_default.shutdown().unwrap();
5446 client_default.shutdown().unwrap();
5447 }
5448}