1#[cfg(feature = "logging")]
32use crate::log::{debug, error, trace};
33use crate::{
34 current_time_millis,
35 dns_cache::{DnsCache, IpType},
36 dns_parser::{
37 ip_address_rr_type, DnsAddress, DnsEntryExt, DnsIncoming, DnsOutgoing, DnsPointer,
38 DnsRecordBox, DnsRecordExt, DnsSrv, DnsTxt, InterfaceId, RRType, ScopedIp,
39 CLASS_CACHE_FLUSH, CLASS_IN, FLAGS_AA, FLAGS_QR_QUERY, FLAGS_QR_RESPONSE, MAX_MSG_ABSOLUTE,
40 },
41 error::{e_fmt, Error, Result},
42 service_info::{valid_ip_on_intf, DnsRegistry, MyIntf, Probe, ServiceInfo, ServiceStatus},
43 Receiver, ResolvedService, TxtProperties,
44};
45use flume::{bounded, Sender, TrySendError};
46use if_addrs::{IfAddr, Interface};
47use mio::{event::Source, net::UdpSocket as MioUdpSocket, Interest, Poll, Registry, Token};
48use socket2::Domain;
49use socket_pktinfo::PktInfoUdpSocket;
50use std::{
51 cmp::{self, Reverse},
52 collections::{hash_map::Entry, BinaryHeap, HashMap, HashSet},
53 fmt, io,
54 net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6, UdpSocket},
55 str, thread,
56 time::Duration,
57 vec,
58};
59
/// Default maximum service name length.
///
/// `Zeroconf::new` uses this as the initial `service_name_len_max`.
pub const SERVICE_NAME_LEN_MAX_DEFAULT: u8 = 15;

/// Default interval, in seconds, between checks for IP changes.
///
/// `Zeroconf::new` converts this to milliseconds for `ip_check_interval`.
pub const IP_CHECK_INTERVAL_IN_SECS_DEFAULT: u32 = 5;

/// A default timeout of 10 seconds.
// NOTE(review): presumably the suggested timeout for `ServiceDaemon::verify`;
// no use is visible in this part of the file.
pub const VERIFY_TIMEOUT_DEFAULT: Duration = Duration::from_secs(10);

/// The UDP port `ServiceDaemon::new` passes to `ServiceDaemon::new_with_port`.
pub const MDNS_PORT: u16 = 5353;

/// IPv4 multicast group joined by the daemon's IPv4 socket.
const GROUP_ADDR_V4: Ipv4Addr = Ipv4Addr::new(224, 0, 0, 251);
/// IPv6 multicast group joined by the daemon's IPv6 socket.
const GROUP_ADDR_V6: Ipv6Addr = Ipv6Addr::new(0xff02, 0, 0, 0, 0, 0, 0, 0xfb);
/// IPv4 loopback address (127.0.0.1) that the signal sockets bind to.
const LOOPBACK_V4: Ipv4Addr = Ipv4Addr::LOCALHOST;

// NOTE(review): presumably a wait, in milliseconds, applied before resolving;
// its use is outside this part of the file.
const RESOLVE_WAIT_IN_MILLIS: u64 = 500;
80
/// Result delivered on the channel returned by `ServiceDaemon::unregister`.
#[derive(Debug)]
pub enum UnregisterStatus {
    /// The unregister request succeeded.
    OK,
    /// No matching service was found.
    // NOTE(review): inferred from the variant name; the sender side is not
    // visible in this part of the file.
    NotFound,
}
89
/// Lifecycle state of the daemon.
///
/// A new daemon starts as `Running`; the event loop switches to `Shutdown`
/// once it has handled an exit command.
#[derive(Clone, Debug, Eq, PartialEq)]
#[non_exhaustive]
pub enum DaemonStatus {
    /// The daemon's event loop is active.
    Running,

    /// The daemon has shut down.
    Shutdown,
}
100
/// Names of the internal counters tracked by the daemon.
///
/// The `Display` impl provides the textual label for each variant.
#[derive(PartialEq, Eq, Hash)]
enum Counter {
    // Commands.
    Register,
    RegisterResend,
    Unregister,
    UnregisterResend,
    Browse,
    ResolveHostname,
    Respond,

    // Cache refresh queries.
    CacheRefreshPTR,
    CacheRefreshSrvTxt,
    CacheRefreshAddr,
    KnownAnswerSuppression,

    // Cached records by type.
    CachedPTR,
    CachedSRV,
    CachedAddr,
    CachedTxt,
    CachedNSec,
    CachedSubtype,

    // DNS registry.
    DnsRegistryProbe,
    DnsRegistryActive,
    DnsRegistryTimer,
    DnsRegistryNameChange,

    // Pending timers.
    Timer,
}
128
129impl fmt::Display for Counter {
130 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
131 match self {
132 Self::Register => write!(f, "register"),
133 Self::RegisterResend => write!(f, "register-resend"),
134 Self::Unregister => write!(f, "unregister"),
135 Self::UnregisterResend => write!(f, "unregister-resend"),
136 Self::Browse => write!(f, "browse"),
137 Self::ResolveHostname => write!(f, "resolve-hostname"),
138 Self::Respond => write!(f, "respond"),
139 Self::CacheRefreshPTR => write!(f, "cache-refresh-ptr"),
140 Self::CacheRefreshSrvTxt => write!(f, "cache-refresh-srv-txt"),
141 Self::CacheRefreshAddr => write!(f, "cache-refresh-addr"),
142 Self::KnownAnswerSuppression => write!(f, "known-answer-suppression"),
143 Self::CachedPTR => write!(f, "cached-ptr"),
144 Self::CachedSRV => write!(f, "cached-srv"),
145 Self::CachedAddr => write!(f, "cached-addr"),
146 Self::CachedTxt => write!(f, "cached-txt"),
147 Self::CachedNSec => write!(f, "cached-nsec"),
148 Self::CachedSubtype => write!(f, "cached-subtype"),
149 Self::DnsRegistryProbe => write!(f, "dns-registry-probe"),
150 Self::DnsRegistryActive => write!(f, "dns-registry-active"),
151 Self::DnsRegistryTimer => write!(f, "dns-registry-timer"),
152 Self::DnsRegistryNameChange => write!(f, "dns-registry-name-change"),
153 Self::Timer => write!(f, "timer"),
154 }
155 }
156}
157
158#[derive(Debug)]
159enum InternalError {
160 IntfAddrInvalid(Interface),
161}
162
163impl fmt::Display for InternalError {
164 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
165 match self {
166 InternalError::IntfAddrInvalid(iface) => write!(f, "interface addr invalid: {iface:?}"),
167 }
168 }
169}
170
171type MyResult<T> = core::result::Result<T, InternalError>;
172
173struct MyUdpSocket {
178 pktinfo: PktInfoUdpSocket,
181
182 mio: MioUdpSocket,
185}
186
187impl MyUdpSocket {
188 pub fn new(pktinfo: PktInfoUdpSocket) -> io::Result<Self> {
189 let std_sock = pktinfo.try_clone_std()?;
190 let mio = MioUdpSocket::from_std(std_sock);
191
192 Ok(Self { pktinfo, mio })
193 }
194}
195
196impl Source for MyUdpSocket {
198 fn register(
199 &mut self,
200 registry: &Registry,
201 token: Token,
202 interests: Interest,
203 ) -> io::Result<()> {
204 self.mio.register(registry, token, interests)
205 }
206
207 fn reregister(
208 &mut self,
209 registry: &Registry,
210 token: Token,
211 interests: Interest,
212 ) -> io::Result<()> {
213 self.mio.reregister(registry, token, interests)
214 }
215
216 fn deregister(&mut self, registry: &Registry) -> std::io::Result<()> {
217 self.mio.deregister(registry)
218 }
219}
220
221pub type Metrics = HashMap<String, i64>;
224
225const IPV4_SOCK_EVENT_KEY: usize = 4; const IPV6_SOCK_EVENT_KEY: usize = 6; const SIGNAL_SOCK_EVENT_KEY: usize = usize::MAX - 1; #[derive(Clone)]
233pub struct ServiceDaemon {
234 sender: Sender<Command>,
236
237 signal_addr: SocketAddr,
243}
244
245impl ServiceDaemon {
246 pub fn new() -> Result<Self> {
262 Self::new_with_port(MDNS_PORT)
263 }
264
265 pub fn new_with_port(port: u16) -> Result<Self> {
297 let signal_addr = SocketAddrV4::new(LOOPBACK_V4, 0);
300
301 let signal_sock = UdpSocket::bind(signal_addr)
302 .map_err(|e| e_fmt!("failed to create signal_sock for daemon: {}", e))?;
303
304 let signal_addr = signal_sock
306 .local_addr()
307 .map_err(|e| e_fmt!("failed to get signal sock addr: {}", e))?;
308
309 signal_sock
311 .set_nonblocking(true)
312 .map_err(|e| e_fmt!("failed to set nonblocking for signal socket: {}", e))?;
313
314 let poller = Poll::new().map_err(|e| e_fmt!("failed to create mio Poll: {e}"))?;
315
316 let (sender, receiver) = bounded(100);
317
318 let mio_sock = MioUdpSocket::from_std(signal_sock);
320 let cmd_sender = sender.clone();
321 thread::Builder::new()
322 .name("mDNS_daemon".to_string())
323 .spawn(move || {
324 Self::daemon_thread(mio_sock, poller, receiver, port, cmd_sender, signal_addr)
325 })
326 .map_err(|e| e_fmt!("thread builder failed to spawn: {}", e))?;
327
328 Ok(Self {
329 sender,
330 signal_addr,
331 })
332 }
333
334 fn send_cmd(&self, cmd: Command) -> Result<()> {
337 let cmd_name = cmd.to_string();
338
339 self.sender.try_send(cmd).map_err(|e| match e {
341 TrySendError::Full(_) => Error::Again,
342 TrySendError::Disconnected(_) => Error::DaemonShutdown,
343 })?;
344
345 let addr = SocketAddrV4::new(LOOPBACK_V4, 0);
347 let socket = UdpSocket::bind(addr)
348 .map_err(|e| e_fmt!("Failed to create socket to send signal: {}", e))?;
349 socket
350 .send_to(cmd_name.as_bytes(), self.signal_addr)
351 .map_err(|e| {
352 e_fmt!(
353 "signal socket send_to {} ({}) failed: {}",
354 self.signal_addr,
355 cmd_name,
356 e
357 )
358 })?;
359
360 Ok(())
361 }
362
363 pub fn browse(&self, service_type: &str) -> Result<Receiver<ServiceEvent>> {
383 check_domain_suffix(service_type)?;
384
385 let (resp_s, resp_r) = bounded(10);
386 self.send_cmd(Command::Browse(service_type.to_string(), 1, false, resp_s))?;
387 Ok(resp_r)
388 }
389
390 pub fn browse_cache(&self, service_type: &str) -> Result<Receiver<ServiceEvent>> {
403 check_domain_suffix(service_type)?;
404
405 let (resp_s, resp_r) = bounded(10);
406 self.send_cmd(Command::Browse(service_type.to_string(), 1, true, resp_s))?;
407 Ok(resp_r)
408 }
409
410 pub fn stop_browse(&self, ty_domain: &str) -> Result<()> {
418 self.send_cmd(Command::StopBrowse(ty_domain.to_string()))
419 }
420
421 pub fn resolve_hostname(
441 &self,
442 hostname: &str,
443 timeout: Option<u64>,
444 ) -> Result<Receiver<HostnameResolutionEvent>> {
445 check_hostname(hostname)?;
446 let (resp_s, resp_r) = bounded(10);
447 self.send_cmd(Command::ResolveHostname(
448 hostname.to_string(),
449 1,
450 resp_s,
451 timeout,
452 ))?;
453 Ok(resp_r)
454 }
455
456 pub fn stop_resolve_hostname(&self, hostname: &str) -> Result<()> {
462 self.send_cmd(Command::StopResolveHostname(hostname.to_string()))
463 }
464
465 pub fn register(&self, service_info: ServiceInfo) -> Result<()> {
485 check_service_name(service_info.get_fullname())?;
486 check_hostname(service_info.get_hostname())?;
487
488 self.send_cmd(Command::Register(service_info.into()))
489 }
490
491 pub fn unregister(&self, fullname: &str) -> Result<Receiver<UnregisterStatus>> {
502 let (resp_s, resp_r) = bounded(1);
503 self.send_cmd(Command::Unregister(fullname.to_lowercase(), resp_s))?;
504 Ok(resp_r)
505 }
506
507 pub fn monitor(&self) -> Result<Receiver<DaemonEvent>> {
517 let (resp_s, resp_r) = bounded(100);
518 self.send_cmd(Command::Monitor(resp_s))?;
519 Ok(resp_r)
520 }
521
522 pub fn shutdown(&self) -> Result<Receiver<DaemonStatus>> {
530 let (resp_s, resp_r) = bounded(1);
531 self.send_cmd(Command::Exit(resp_s))?;
532 Ok(resp_r)
533 }
534
535 pub fn status(&self) -> Result<Receiver<DaemonStatus>> {
543 let (resp_s, resp_r) = bounded(1);
544
545 if self.sender.is_disconnected() {
546 resp_s
547 .send(DaemonStatus::Shutdown)
548 .map_err(|e| e_fmt!("failed to send daemon status to the client: {}", e))?;
549 } else {
550 self.send_cmd(Command::GetStatus(resp_s))?;
551 }
552
553 Ok(resp_r)
554 }
555
556 pub fn get_metrics(&self) -> Result<Receiver<Metrics>> {
567 let (resp_s, resp_r) = bounded(1);
568 self.send_cmd(Command::GetMetrics(resp_s))?;
569 Ok(resp_r)
570 }
571
572 pub fn set_service_name_len_max(&self, len_max: u8) -> Result<()> {
587 const SERVICE_NAME_LEN_MAX_LIMIT: u8 = 30; if len_max > SERVICE_NAME_LEN_MAX_LIMIT {
590 return Err(Error::Msg(format!(
591 "service name length max {len_max} is too large"
592 )));
593 }
594
595 self.send_cmd(Command::SetOption(DaemonOption::ServiceNameLenMax(len_max)))
596 }
597
598 pub fn set_ip_check_interval(&self, interval_in_secs: u32) -> Result<()> {
604 let interval_in_millis = interval_in_secs as u64 * 1000;
605 self.send_cmd(Command::SetOption(DaemonOption::IpCheckInterval(
606 interval_in_millis,
607 )))
608 }
609
610 pub fn get_ip_check_interval(&self) -> Result<u32> {
612 let (resp_s, resp_r) = bounded(1);
613 self.send_cmd(Command::GetOption(resp_s))?;
614
615 let option = resp_r
616 .recv_timeout(Duration::from_secs(10))
617 .map_err(|e| e_fmt!("failed to receive ip check interval: {}", e))?;
618 let ip_check_interval_in_secs = option.ip_check_interval / 1000;
619 Ok(ip_check_interval_in_secs as u32)
620 }
621
622 pub fn enable_interface(&self, if_kind: impl IntoIfKindVec) -> Result<()> {
629 let if_kind_vec = if_kind.into_vec();
630 self.send_cmd(Command::SetOption(DaemonOption::EnableInterface(
631 if_kind_vec.kinds,
632 )))
633 }
634
635 pub fn disable_interface(&self, if_kind: impl IntoIfKindVec) -> Result<()> {
642 let if_kind_vec = if_kind.into_vec();
643 self.send_cmd(Command::SetOption(DaemonOption::DisableInterface(
644 if_kind_vec.kinds,
645 )))
646 }
647
648 pub fn accept_unsolicited(&self, accept: bool) -> Result<()> {
659 self.send_cmd(Command::SetOption(DaemonOption::AcceptUnsolicited(accept)))
660 }
661
662 pub fn include_apple_p2p(&self, include: bool) -> Result<()> {
665 self.send_cmd(Command::SetOption(DaemonOption::IncludeAppleP2P(include)))
666 }
667
668 #[cfg(test)]
669 pub fn test_down_interface(&self, ifname: &str) -> Result<()> {
670 self.send_cmd(Command::SetOption(DaemonOption::TestDownInterface(
671 ifname.to_string(),
672 )))
673 }
674
675 #[cfg(test)]
676 pub fn test_up_interface(&self, ifname: &str) -> Result<()> {
677 self.send_cmd(Command::SetOption(DaemonOption::TestUpInterface(
678 ifname.to_string(),
679 )))
680 }
681
682 pub fn set_multicast_loop_v4(&self, on: bool) -> Result<()> {
698 self.send_cmd(Command::SetOption(DaemonOption::MulticastLoopV4(on)))
699 }
700
701 pub fn set_multicast_loop_v6(&self, on: bool) -> Result<()> {
717 self.send_cmd(Command::SetOption(DaemonOption::MulticastLoopV6(on)))
718 }
719
720 pub fn verify(&self, instance_fullname: String, timeout: Duration) -> Result<()> {
739 self.send_cmd(Command::Verify(instance_fullname, timeout))
740 }
741
742 fn daemon_thread(
743 signal_sock: MioUdpSocket,
744 poller: Poll,
745 receiver: Receiver<Command>,
746 port: u16,
747 cmd_sender: Sender<Command>,
748 signal_addr: SocketAddr,
749 ) {
750 let mut zc = Zeroconf::new(signal_sock, poller, port, cmd_sender, signal_addr);
751
752 if let Some(cmd) = zc.run(receiver) {
753 match cmd {
754 Command::Exit(resp_s) => {
755 if let Err(e) = resp_s.send(DaemonStatus::Shutdown) {
758 debug!("exit: failed to send response of shutdown: {}", e);
759 }
760 }
761 _ => {
762 debug!("Unexpected command: {:?}", cmd);
763 }
764 }
765 }
766 }
767}
768
769fn _new_socket_bind(intf: &Interface, should_loop: bool) -> Result<MyUdpSocket> {
771 let intf_ip = &intf.ip();
774 match intf_ip {
775 IpAddr::V4(ip) => {
776 let addr = SocketAddrV4::new(Ipv4Addr::new(0, 0, 0, 0), MDNS_PORT);
777 let sock = new_socket(addr.into(), true)?;
778
779 sock.join_multicast_v4(&GROUP_ADDR_V4, ip)
781 .map_err(|e| e_fmt!("join multicast group on addr {}: {}", intf_ip, e))?;
782
783 sock.set_multicast_if_v4(ip)
785 .map_err(|e| e_fmt!("set multicast_if on addr {}: {}", ip, e))?;
786
787 sock.set_multicast_ttl_v4(255)
792 .map_err(|e| e_fmt!("set set_multicast_ttl_v4 on addr {}: {}", ip, e))?;
793
794 if !should_loop {
795 sock.set_multicast_loop_v4(false)
796 .map_err(|e| e_fmt!("failed to set multicast loop v4 for {ip}: {e}"))?;
797 }
798
799 let multicast_addr = SocketAddrV4::new(GROUP_ADDR_V4, MDNS_PORT).into();
801 let test_packets = DnsOutgoing::new(0).to_data_on_wire();
802 for packet in test_packets {
803 sock.send_to(&packet, &multicast_addr)
804 .map_err(|e| e_fmt!("send multicast packet on addr {}: {}", ip, e))?;
805 }
806 MyUdpSocket::new(sock)
807 .map_err(|e| e_fmt!("failed to create MySocket for interface {}: {e}", intf.name))
808 }
809 IpAddr::V6(ip) => {
810 let addr = SocketAddrV6::new(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 0), MDNS_PORT, 0, 0);
811 let sock = new_socket(addr.into(), true)?;
812
813 let if_index = intf.index.unwrap_or(0);
814
815 sock.join_multicast_v6(&GROUP_ADDR_V6, if_index)
817 .map_err(|e| e_fmt!("join multicast group on addr {}: {}", ip, e))?;
818
819 sock.set_multicast_if_v6(if_index)
821 .map_err(|e| e_fmt!("set multicast_if on addr {}: {}", ip, e))?;
822
823 MyUdpSocket::new(sock)
828 .map_err(|e| e_fmt!("failed to create MySocket for interface {}: {e}", intf.name))
829 }
830 }
831}
832
833fn new_socket(addr: SocketAddr, non_block: bool) -> Result<PktInfoUdpSocket> {
836 let domain = match addr {
837 SocketAddr::V4(_) => socket2::Domain::IPV4,
838 SocketAddr::V6(_) => socket2::Domain::IPV6,
839 };
840
841 let fd = PktInfoUdpSocket::new(domain).map_err(|e| e_fmt!("create socket failed: {}", e))?;
842
843 fd.set_reuse_address(true)
844 .map_err(|e| e_fmt!("set ReuseAddr failed: {}", e))?;
845 #[cfg(unix)]
846 if let Err(e) = fd.set_reuse_port(true) {
847 debug!(
848 "SO_REUSEPORT is not supported, continuing without it: {}",
849 e
850 );
851 }
852
853 if non_block {
854 fd.set_nonblocking(true)
855 .map_err(|e| e_fmt!("set O_NONBLOCK: {}", e))?;
856 }
857
858 fd.bind(&addr.into())
859 .map_err(|e| e_fmt!("socket bind to {} failed: {}", &addr, e))?;
860
861 trace!("new socket bind to {}", &addr);
862 Ok(fd)
863}
864
865struct ReRun {
867 next_time: u64,
869 command: Command,
870}
871
872#[derive(Debug, Clone)]
876#[non_exhaustive]
877pub enum IfKind {
878 All,
880
881 IPv4,
883
884 IPv6,
886
887 Name(String),
889
890 Addr(IpAddr),
894
895 LoopbackV4,
899
900 LoopbackV6,
902
903 IndexV4(u32),
905
906 IndexV6(u32),
908
909 Predicate(IfPredicate),
911}
912
913impl IfKind {
914 pub(crate) fn matches(&self, intf: &Interface) -> bool {
916 match self {
917 Self::All => true,
918 Self::IPv4 => intf.ip().is_ipv4(),
919 Self::IPv6 => intf.ip().is_ipv6(),
920 Self::Name(ifname) => ifname == &intf.name,
921 Self::Addr(addr) => addr == &intf.ip(),
922 Self::LoopbackV4 => intf.is_loopback() && intf.ip().is_ipv4(),
923 Self::LoopbackV6 => intf.is_loopback() && intf.ip().is_ipv6(),
924 Self::IndexV4(idx) => intf.index == Some(*idx) && intf.ip().is_ipv4(),
925 Self::IndexV6(idx) => intf.index == Some(*idx) && intf.ip().is_ipv6(),
926 Self::Predicate(p) => p.matches(intf),
927 }
928 }
929}
930
931impl From<&str> for IfKind {
934 fn from(val: &str) -> Self {
935 Self::Name(val.to_string())
936 }
937}
938
939impl From<&String> for IfKind {
940 fn from(val: &String) -> Self {
941 Self::Name(val.to_string())
942 }
943}
944
945impl From<IpAddr> for IfKind {
947 fn from(val: IpAddr) -> Self {
948 Self::Addr(val)
949 }
950}
951
952pub struct IfKindVec {
954 kinds: Vec<IfKind>,
955}
956
957pub trait IntoIfKindVec {
959 fn into_vec(self) -> IfKindVec;
960}
961
962impl<T: Into<IfKind>> IntoIfKindVec for T {
963 fn into_vec(self) -> IfKindVec {
964 let if_kind: IfKind = self.into();
965 IfKindVec {
966 kinds: vec![if_kind],
967 }
968 }
969}
970
971impl<T: Into<IfKind>> IntoIfKindVec for Vec<T> {
972 fn into_vec(self) -> IfKindVec {
973 let kinds: Vec<IfKind> = self.into_iter().map(|x| x.into()).collect();
974 IfKindVec { kinds }
975 }
976}
977
978#[derive(Clone)]
980pub struct IfPredicate(std::sync::Arc<dyn Fn(&Interface) -> bool + Send + Sync>);
981
982impl IfPredicate {
983 pub fn new(predicate: impl Fn(&Interface) -> bool + Send + Sync + 'static) -> Self {
994 Self(std::sync::Arc::new(predicate))
995 }
996
997 pub(crate) fn matches(&self, intf: &Interface) -> bool {
998 self.0(intf)
999 }
1000}
1001
1002impl std::fmt::Debug for IfPredicate {
1003 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1004 write!(f, "IfPredicate(...)")
1005 }
1006}
1007
1008struct IfSelection {
1010 if_kind: IfKind,
1012
1013 selected: bool,
1015}
1016
1017struct Zeroconf {
1019 port: u16,
1022
1023 my_intfs: HashMap<u32, MyIntf>,
1025
1026 ipv4_sock: Option<MyUdpSocket>,
1028
1029 ipv6_sock: Option<MyUdpSocket>,
1031
1032 my_services: HashMap<String, ServiceInfo>,
1034
1035 cache: DnsCache,
1037
1038 dns_registry_map: HashMap<u32, DnsRegistry>,
1040
1041 service_queriers: HashMap<String, Sender<ServiceEvent>>, hostname_resolvers: HashMap<String, (Sender<HostnameResolutionEvent>, Option<u64>)>, retransmissions: Vec<ReRun>,
1052
1053 counters: Metrics,
1054
1055 poller: Poll,
1057
1058 monitors: Vec<Sender<DaemonEvent>>,
1060
1061 service_name_len_max: u8,
1063
1064 ip_check_interval: u64,
1066
1067 if_selections: Vec<IfSelection>,
1069
1070 signal_sock: MioUdpSocket,
1072
1073 timers: BinaryHeap<Reverse<u64>>,
1079
1080 status: DaemonStatus,
1081
1082 pending_resolves: HashSet<String>,
1084
1085 resolved: HashSet<String>,
1087
1088 multicast_loop_v4: bool,
1089
1090 multicast_loop_v6: bool,
1091
1092 accept_unsolicited: bool,
1093
1094 include_apple_p2p: bool,
1095
1096 cmd_sender: Sender<Command>,
1097
1098 signal_addr: SocketAddr,
1099
1100 #[cfg(test)]
1101 test_down_interfaces: HashSet<String>,
1102}
1103
1104fn join_multicast_group(my_sock: &PktInfoUdpSocket, intf: &Interface) -> Result<()> {
1106 let intf_ip = &intf.ip();
1107 match intf_ip {
1108 IpAddr::V4(ip) => {
1109 debug!("join multicast group V4 on {} addr {ip}", intf.name);
1111 my_sock
1112 .join_multicast_v4(&GROUP_ADDR_V4, ip)
1113 .map_err(|e| e_fmt!("PKT join multicast group on addr {}: {}", intf_ip, e))?;
1114 }
1115 IpAddr::V6(ip) => {
1116 let if_index = intf.index.unwrap_or(0);
1117 debug!(
1119 "join multicast group V6 on {} addr {ip} with index {if_index}",
1120 intf.name
1121 );
1122 my_sock
1123 .join_multicast_v6(&GROUP_ADDR_V6, if_index)
1124 .map_err(|e| e_fmt!("PKT join multicast group on addr {}: {}", ip, e))?;
1125 }
1126 }
1127 Ok(())
1128}
1129
1130impl Zeroconf {
1131 fn new(
1132 signal_sock: MioUdpSocket,
1133 poller: Poll,
1134 port: u16,
1135 cmd_sender: Sender<Command>,
1136 signal_addr: SocketAddr,
1137 ) -> Self {
1138 let my_ifaddrs = my_ip_interfaces(true);
1140
1141 let mut my_intfs = HashMap::new();
1145 let mut dns_registry_map = HashMap::new();
1146
1147 let mut ipv4_sock = None;
1150 let addr = SocketAddrV4::new(Ipv4Addr::new(0, 0, 0, 0), port);
1151 match new_socket(addr.into(), true) {
1152 Ok(sock) => {
1153 sock.set_multicast_ttl_v4(255)
1158 .map_err(|e| e_fmt!("set set_multicast_ttl_v4 on addr: {}", e))
1159 .ok();
1160
1161 ipv4_sock = match MyUdpSocket::new(sock) {
1163 Ok(s) => Some(s),
1164 Err(e) => {
1165 debug!("failed to create IPv4 MyUdpSocket: {e}");
1166 None
1167 }
1168 };
1169 }
1170 Err(e) => debug!("failed to create IPv4 socket: {e}"),
1172 }
1173
1174 let mut ipv6_sock = None;
1175 let addr = SocketAddrV6::new(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 0), port, 0, 0);
1176 match new_socket(addr.into(), true) {
1177 Ok(sock) => {
1178 sock.set_multicast_hops_v6(255)
1182 .map_err(|e| e_fmt!("set set_multicast_hops_v6: {}", e))
1183 .ok();
1184
1185 ipv6_sock = match MyUdpSocket::new(sock) {
1187 Ok(s) => Some(s),
1188 Err(e) => {
1189 debug!("failed to create IPv6 MyUdpSocket: {e}");
1190 None
1191 }
1192 };
1193 }
1194 Err(e) => debug!("failed to create IPv6 socket: {e}"),
1195 }
1196
1197 for intf in my_ifaddrs {
1199 let sock_opt = if intf.ip().is_ipv4() {
1200 &ipv4_sock
1201 } else {
1202 &ipv6_sock
1203 };
1204 let Some(sock) = sock_opt else {
1205 debug!(
1206 "no socket available for interface {} with addr {}. Skipped.",
1207 intf.name,
1208 intf.ip()
1209 );
1210 continue;
1211 };
1212
1213 if let Err(e) = join_multicast_group(&sock.pktinfo, &intf) {
1214 debug!("failed to join multicast: {}: {e}. Skipped.", &intf.ip());
1215 }
1216
1217 let if_index = intf.index.unwrap_or(0);
1218
1219 dns_registry_map
1221 .entry(if_index)
1222 .or_insert_with(DnsRegistry::new);
1223
1224 my_intfs
1225 .entry(if_index)
1226 .and_modify(|v: &mut MyIntf| {
1227 v.addrs.insert(intf.addr.clone());
1228 })
1229 .or_insert(MyIntf {
1230 name: intf.name.clone(),
1231 index: if_index,
1232 addrs: HashSet::from([intf.addr]),
1233 });
1234 }
1235
1236 let monitors = Vec::new();
1237 let service_name_len_max = SERVICE_NAME_LEN_MAX_DEFAULT;
1238 let ip_check_interval = IP_CHECK_INTERVAL_IN_SECS_DEFAULT as u64 * 1000;
1239
1240 let timers = BinaryHeap::new();
1241
1242 let if_selections = vec![];
1244
1245 let status = DaemonStatus::Running;
1246
1247 Self {
1248 port,
1249 my_intfs,
1250 ipv4_sock,
1251 ipv6_sock,
1252 my_services: HashMap::new(),
1253 cache: DnsCache::new(),
1254 dns_registry_map,
1255 hostname_resolvers: HashMap::new(),
1256 service_queriers: HashMap::new(),
1257 retransmissions: Vec::new(),
1258 counters: HashMap::new(),
1259 poller,
1260 monitors,
1261 service_name_len_max,
1262 ip_check_interval,
1263 if_selections,
1264 signal_sock,
1265 timers,
1266 status,
1267 pending_resolves: HashSet::new(),
1268 resolved: HashSet::new(),
1269 multicast_loop_v4: true,
1270 multicast_loop_v6: true,
1271 accept_unsolicited: false,
1272 include_apple_p2p: false,
1273 cmd_sender,
1274 signal_addr,
1275
1276 #[cfg(test)]
1277 test_down_interfaces: HashSet::new(),
1278 }
1279 }
1280
1281 fn send_cmd_to_self(&self, cmd: Command) -> Result<()> {
1283 let cmd_name = cmd.to_string();
1284
1285 self.cmd_sender.try_send(cmd).map_err(|e| match e {
1286 TrySendError::Full(_) => Error::Again,
1287 TrySendError::Disconnected(_) => Error::DaemonShutdown,
1288 })?;
1289
1290 let addr = SocketAddrV4::new(LOOPBACK_V4, 0);
1291 let socket = UdpSocket::bind(addr)
1292 .map_err(|e| e_fmt!("Failed to create socket to send signal: {}", e))?;
1293 socket
1294 .send_to(cmd_name.as_bytes(), self.signal_addr)
1295 .map_err(|e| {
1296 e_fmt!(
1297 "signal socket send_to {} ({}) failed: {}",
1298 self.signal_addr,
1299 cmd_name,
1300 e
1301 )
1302 })?;
1303
1304 Ok(())
1305 }
1306
1307 fn cleanup(&mut self) {
1315 debug!("Starting cleanup for shutdown");
1316
1317 let service_names: Vec<String> = self.my_services.keys().cloned().collect();
1319 for fullname in service_names {
1320 if let Some(info) = self.my_services.get(&fullname) {
1321 debug!("Unregistering service during shutdown: {}", &fullname);
1322
1323 for intf in self.my_intfs.values() {
1324 if let Some(sock) = self.ipv4_sock.as_ref() {
1325 self.unregister_service(info, intf, &sock.pktinfo);
1326 }
1327
1328 if let Some(sock) = self.ipv6_sock.as_ref() {
1329 self.unregister_service(info, intf, &sock.pktinfo);
1330 }
1331 }
1332 }
1333 }
1334 self.my_services.clear();
1335
1336 let browse_types: Vec<String> = self.service_queriers.keys().cloned().collect();
1338 for ty_domain in browse_types {
1339 debug!("Stopping browse during shutdown: {}", &ty_domain);
1340 if let Some(sender) = self.service_queriers.remove(&ty_domain) {
1341 if let Err(e) = sender.send(ServiceEvent::SearchStopped(ty_domain.clone())) {
1343 debug!("Failed to send SearchStopped during shutdown: {}", e);
1344 }
1345 }
1346 }
1347
1348 let hostnames: Vec<String> = self.hostname_resolvers.keys().cloned().collect();
1350 for hostname in hostnames {
1351 debug!(
1352 "Stopping hostname resolution during shutdown: {}",
1353 &hostname
1354 );
1355 if let Some((sender, _timeout)) = self.hostname_resolvers.remove(&hostname) {
1356 if let Err(e) =
1358 sender.send(HostnameResolutionEvent::SearchStopped(hostname.clone()))
1359 {
1360 debug!(
1361 "Failed to send HostnameResolutionEvent::SearchStopped during shutdown: {}",
1362 e
1363 );
1364 }
1365 }
1366 }
1367
1368 self.retransmissions.clear();
1370
1371 debug!("Cleanup completed");
1372 }
1373
1374 fn run(&mut self, receiver: Receiver<Command>) -> Option<Command> {
1383 if let Err(e) = self.poller.registry().register(
1385 &mut self.signal_sock,
1386 mio::Token(SIGNAL_SOCK_EVENT_KEY),
1387 mio::Interest::READABLE,
1388 ) {
1389 debug!("failed to add signal socket to the poller: {}", e);
1390 return None;
1391 }
1392
1393 if let Some(sock) = self.ipv4_sock.as_mut() {
1394 if let Err(e) = self.poller.registry().register(
1395 sock,
1396 mio::Token(IPV4_SOCK_EVENT_KEY),
1397 mio::Interest::READABLE,
1398 ) {
1399 debug!("failed to register ipv4 socket: {}", e);
1400 return None;
1401 }
1402 }
1403
1404 if let Some(sock) = self.ipv6_sock.as_mut() {
1405 if let Err(e) = self.poller.registry().register(
1406 sock,
1407 mio::Token(IPV6_SOCK_EVENT_KEY),
1408 mio::Interest::READABLE,
1409 ) {
1410 debug!("failed to register ipv6 socket: {}", e);
1411 return None;
1412 }
1413 }
1414
1415 let mut next_ip_check = if self.ip_check_interval > 0 {
1417 current_time_millis() + self.ip_check_interval
1418 } else {
1419 0
1420 };
1421
1422 if next_ip_check > 0 {
1423 self.add_timer(next_ip_check);
1424 }
1425
1426 let mut events = mio::Events::with_capacity(1024);
1429 loop {
1430 let now = current_time_millis();
1431
1432 let earliest_timer = self.peek_earliest_timer();
1433 let timeout = earliest_timer.map(|timer| {
1434 let millis = if timer > now { timer - now } else { 1 };
1436 Duration::from_millis(millis)
1437 });
1438
1439 events.clear();
1441 match self.poller.poll(&mut events, timeout) {
1442 Ok(_) => self.handle_poller_events(&events),
1443 Err(e) => debug!("failed to select from sockets: {}", e),
1444 }
1445
1446 let now = current_time_millis();
1447
1448 self.pop_timers_till(now);
1450
1451 for hostname in self
1453 .hostname_resolvers
1454 .clone()
1455 .into_iter()
1456 .filter(|(_, (_, timeout))| timeout.map(|t| now >= t).unwrap_or(false))
1457 .map(|(hostname, _)| hostname)
1458 {
1459 trace!("hostname resolver timeout for {}", &hostname);
1460 call_hostname_resolution_listener(
1461 &self.hostname_resolvers,
1462 &hostname,
1463 HostnameResolutionEvent::SearchTimeout(hostname.to_owned()),
1464 );
1465 call_hostname_resolution_listener(
1466 &self.hostname_resolvers,
1467 &hostname,
1468 HostnameResolutionEvent::SearchStopped(hostname.to_owned()),
1469 );
1470 self.hostname_resolvers.remove(&hostname);
1471 }
1472
1473 while let Ok(command) = receiver.try_recv() {
1475 if matches!(command, Command::Exit(_)) {
1476 debug!("Exit command received, performing cleanup");
1477 self.cleanup();
1478 self.status = DaemonStatus::Shutdown;
1479 return Some(command);
1480 }
1481 self.exec_command(command, false);
1482 }
1483
1484 let mut i = 0;
1486 while i < self.retransmissions.len() {
1487 if now >= self.retransmissions[i].next_time {
1488 let rerun = self.retransmissions.remove(i);
1489 self.exec_command(rerun.command, true);
1490 } else {
1491 i += 1;
1492 }
1493 }
1494
1495 self.refresh_active_services();
1497
1498 let mut query_count = 0;
1500 for (hostname, _sender) in self.hostname_resolvers.iter() {
1501 for (hostname, ip_addr) in
1502 self.cache.refresh_due_hostname_resolutions(hostname).iter()
1503 {
1504 self.send_query(hostname, ip_address_rr_type(&ip_addr.to_ip_addr()));
1505 query_count += 1;
1506 }
1507 }
1508
1509 self.increase_counter(Counter::CacheRefreshAddr, query_count);
1510
1511 let now = current_time_millis();
1513
1514 let expired_services = self.cache.evict_expired_services(now);
1516 if !expired_services.is_empty() {
1517 debug!(
1518 "run: send {} service removal to listeners",
1519 expired_services.len()
1520 );
1521 self.notify_service_removal(expired_services);
1522 }
1523
1524 let expired_addrs = self.cache.evict_expired_addr(now);
1526 for (hostname, addrs) in expired_addrs {
1527 call_hostname_resolution_listener(
1528 &self.hostname_resolvers,
1529 &hostname,
1530 HostnameResolutionEvent::AddressesRemoved(hostname.clone(), addrs),
1531 );
1532 let instances = self.cache.get_instances_on_host(&hostname);
1533 let instance_set: HashSet<String> = instances.into_iter().collect();
1534 self.resolve_updated_instances(&instance_set);
1535 }
1536
1537 self.probing_handler();
1539
1540 if now >= next_ip_check && next_ip_check > 0 {
1542 next_ip_check = now + self.ip_check_interval;
1543 self.add_timer(next_ip_check);
1544
1545 self.check_ip_changes();
1546 }
1547 }
1548 }
1549
1550 fn process_set_option(&mut self, daemon_opt: DaemonOption) {
1551 match daemon_opt {
1552 DaemonOption::ServiceNameLenMax(length) => self.service_name_len_max = length,
1553 DaemonOption::IpCheckInterval(interval) => self.ip_check_interval = interval,
1554 DaemonOption::EnableInterface(if_kind) => self.enable_interface(if_kind),
1555 DaemonOption::DisableInterface(if_kind) => self.disable_interface(if_kind),
1556 DaemonOption::MulticastLoopV4(on) => self.set_multicast_loop_v4(on),
1557 DaemonOption::MulticastLoopV6(on) => self.set_multicast_loop_v6(on),
1558 DaemonOption::AcceptUnsolicited(accept) => self.set_accept_unsolicited(accept),
1559 DaemonOption::IncludeAppleP2P(enable) => self.set_apple_p2p(enable),
1560 #[cfg(test)]
1561 DaemonOption::TestDownInterface(ifname) => {
1562 self.test_down_interfaces.insert(ifname);
1563 }
1564 #[cfg(test)]
1565 DaemonOption::TestUpInterface(ifname) => {
1566 self.test_down_interfaces.remove(&ifname);
1567 }
1568 }
1569 }
1570
1571 fn enable_interface(&mut self, kinds: Vec<IfKind>) {
1572 debug!("enable_interface: {:?}", kinds);
1573 let interfaces = my_ip_interfaces_inner(true, self.include_apple_p2p);
1574
1575 for if_kind in kinds {
1576 self.if_selections.push(IfSelection {
1577 if_kind: resolve_addr_to_index(if_kind, &interfaces),
1578 selected: true,
1579 });
1580 }
1581
1582 self.apply_intf_selections(interfaces);
1583 }
1584
1585 fn disable_interface(&mut self, kinds: Vec<IfKind>) {
1586 debug!("disable_interface: {:?}", kinds);
1587 let interfaces = my_ip_interfaces_inner(true, self.include_apple_p2p);
1588
1589 for if_kind in kinds {
1590 self.if_selections.push(IfSelection {
1591 if_kind: resolve_addr_to_index(if_kind, &interfaces),
1592 selected: false,
1593 });
1594 }
1595
1596 self.apply_intf_selections(interfaces);
1597 }
1598
1599 fn set_multicast_loop_v4(&mut self, on: bool) {
1600 let Some(sock) = self.ipv4_sock.as_mut() else {
1601 return;
1602 };
1603 self.multicast_loop_v4 = on;
1604 sock.pktinfo
1605 .set_multicast_loop_v4(on)
1606 .map_err(|e| e_fmt!("failed to set multicast loop v4: {}", e))
1607 .unwrap();
1608 }
1609
1610 fn set_multicast_loop_v6(&mut self, on: bool) {
1611 let Some(sock) = self.ipv6_sock.as_mut() else {
1612 return;
1613 };
1614 self.multicast_loop_v6 = on;
1615 sock.pktinfo
1616 .set_multicast_loop_v6(on)
1617 .map_err(|e| e_fmt!("failed to set multicast loop v6: {}", e))
1618 .unwrap();
1619 }
1620
1621 fn set_accept_unsolicited(&mut self, accept: bool) {
1622 self.accept_unsolicited = accept;
1623 }
1624
1625 fn set_apple_p2p(&mut self, include: bool) {
1626 if self.include_apple_p2p != include {
1627 self.include_apple_p2p = include;
1628 self.apply_intf_selections(my_ip_interfaces_inner(true, self.include_apple_p2p));
1629 }
1630 }
1631
1632 fn notify_monitors(&mut self, event: DaemonEvent) {
1633 self.monitors.retain(|sender| {
1635 if let Err(e) = sender.try_send(event.clone()) {
1636 debug!("notify_monitors: try_send: {}", &e);
1637 if matches!(e, TrySendError::Disconnected(_)) {
1638 return false; }
1640 }
1641 true
1642 });
1643 }
1644
1645 fn del_addr_in_my_services(&mut self, addr: &IpAddr) {
1647 for (_, service_info) in self.my_services.iter_mut() {
1648 if service_info.is_addr_auto() {
1649 service_info.remove_ipaddr(addr);
1650 }
1651 }
1652 }
1653
1654 fn add_timer(&mut self, next_time: u64) {
1655 self.timers.push(Reverse(next_time));
1656 }
1657
1658 fn peek_earliest_timer(&self) -> Option<u64> {
1659 self.timers.peek().map(|Reverse(v)| *v)
1660 }
1661
1662 fn _pop_earliest_timer(&mut self) -> Option<u64> {
1663 self.timers.pop().map(|Reverse(v)| v)
1664 }
1665
1666 fn pop_timers_till(&mut self, now: u64) {
1668 while let Some(Reverse(v)) = self.timers.peek() {
1669 if *v > now {
1670 break;
1671 }
1672 self.timers.pop();
1673 }
1674 }
1675
1676 fn selected_intfs(&self, interfaces: Vec<Interface>) -> HashSet<Interface> {
1678 let intf_count = interfaces.len();
1679 let mut intf_selections = vec![true; intf_count];
1680
1681 for selection in self.if_selections.iter() {
1683 for i in 0..intf_count {
1685 if selection.if_kind.matches(&interfaces[i]) {
1686 intf_selections[i] = selection.selected;
1687 }
1688 }
1689 }
1690
1691 let mut selected_addrs = HashSet::new();
1692 for i in 0..intf_count {
1693 if intf_selections[i] {
1694 selected_addrs.insert(interfaces[i].clone());
1695 }
1696 }
1697
1698 selected_addrs
1699 }
1700
1701 fn apply_intf_selections(&mut self, interfaces: Vec<Interface>) {
1706 let intf_count = interfaces.len();
1708 let mut intf_selections = vec![true; intf_count];
1709
1710 for selection in self.if_selections.iter() {
1712 for i in 0..intf_count {
1714 if selection.if_kind.matches(&interfaces[i]) {
1715 intf_selections[i] = selection.selected;
1716 }
1717 }
1718 }
1719
1720 for (idx, intf) in interfaces.into_iter().enumerate() {
1722 if intf_selections[idx] {
1723 self.add_interface(intf);
1725 } else {
1726 self.del_interface_addr(&intf);
1728 }
1729 }
1730 }
1731
1732 fn del_ip(&mut self, ip: IpAddr) {
1733 self.del_addr_in_my_services(&ip);
1734 self.notify_monitors(DaemonEvent::IpDel(ip));
1735 }
1736
1737 fn check_ip_changes(&mut self) {
1739 let my_ifaddrs = my_ip_interfaces_inner(true, self.include_apple_p2p);
1741
1742 #[cfg(test)]
1743 let my_ifaddrs: Vec<_> = my_ifaddrs
1744 .into_iter()
1745 .filter(|intf| !self.test_down_interfaces.contains(&intf.name))
1746 .collect();
1747
1748 let ifaddrs_map: HashMap<u32, Vec<&IfAddr>> =
1749 my_ifaddrs.iter().fold(HashMap::new(), |mut acc, intf| {
1750 let if_index = intf.index.unwrap_or(0);
1751 acc.entry(if_index).or_default().push(&intf.addr);
1752 acc
1753 });
1754
1755 let mut deleted_intfs = Vec::new();
1756 let mut deleted_ips = Vec::new();
1757
1758 for (if_index, my_intf) in self.my_intfs.iter_mut() {
1759 let mut last_ipv4 = None;
1760 let mut last_ipv6 = None;
1761
1762 if let Some(current_addrs) = ifaddrs_map.get(if_index) {
1763 my_intf.addrs.retain(|addr| {
1764 if current_addrs.contains(&addr) {
1765 true
1766 } else {
1767 match addr.ip() {
1768 IpAddr::V4(ipv4) => last_ipv4 = Some(ipv4),
1769 IpAddr::V6(ipv6) => last_ipv6 = Some(ipv6),
1770 }
1771 deleted_ips.push(addr.ip());
1772 false
1773 }
1774 });
1775 if my_intf.addrs.is_empty() {
1776 deleted_intfs.push((*if_index, last_ipv4, last_ipv6))
1777 }
1778 } else {
1779 debug!(
1781 "check_ip_changes: interface {} ({}) no longer exists, removing",
1782 my_intf.name, if_index
1783 );
1784 for addr in my_intf.addrs.iter() {
1785 match addr.ip() {
1786 IpAddr::V4(ipv4) => last_ipv4 = Some(ipv4),
1787 IpAddr::V6(ipv6) => last_ipv6 = Some(ipv6),
1788 }
1789 deleted_ips.push(addr.ip())
1790 }
1791 deleted_intfs.push((*if_index, last_ipv4, last_ipv6));
1792 }
1793 }
1794
1795 if !deleted_ips.is_empty() || !deleted_intfs.is_empty() {
1796 debug!(
1797 "check_ip_changes: {} deleted ips {} deleted intfs",
1798 deleted_ips.len(),
1799 deleted_intfs.len()
1800 );
1801 }
1802
1803 for ip in deleted_ips {
1804 self.del_ip(ip);
1805 }
1806
1807 for (if_index, last_ipv4, last_ipv6) in deleted_intfs {
1808 let Some(my_intf) = self.my_intfs.remove(&if_index) else {
1809 continue;
1810 };
1811
1812 if let Some(ipv4) = last_ipv4 {
1813 debug!("leave multicast for {ipv4}");
1814 if let Some(sock) = self.ipv4_sock.as_mut() {
1815 if let Err(e) = sock.pktinfo.leave_multicast_v4(&GROUP_ADDR_V4, &ipv4) {
1816 debug!("leave multicast group for addr {ipv4}: {e}");
1817 }
1818 }
1819 }
1820
1821 if let Some(ipv6) = last_ipv6 {
1822 debug!("leave multicast for {ipv6}");
1823 if let Some(sock) = self.ipv6_sock.as_mut() {
1824 if let Err(e) = sock
1825 .pktinfo
1826 .leave_multicast_v6(&GROUP_ADDR_V6, my_intf.index)
1827 {
1828 debug!("leave multicast group for IPv6: {ipv6}: {e}");
1829 }
1830 }
1831 }
1832
1833 let intf_id = InterfaceId {
1835 name: my_intf.name.to_string(),
1836 index: my_intf.index,
1837 };
1838 let result = self.cache.remove_records_on_intf(intf_id);
1839 self.notify_service_removal(result.removed_instances);
1840 self.resolve_updated_instances(&result.modified_instances);
1841 }
1842
1843 self.apply_intf_selections(my_ifaddrs);
1845 }
1846
1847 fn del_interface_addr(&mut self, intf: &Interface) {
1850 let if_index = intf.index.unwrap_or(0);
1851 debug!(
1852 "del_interface_addr: {} ({if_index}) addr {}",
1853 intf.name,
1854 intf.ip()
1855 );
1856
1857 let Some(my_intf) = self.my_intfs.get_mut(&if_index) else {
1858 debug!("del_interface_addr: interface {} not found", intf.name);
1859 return;
1860 };
1861
1862 let mut ip_removed = false;
1863
1864 if my_intf.addrs.remove(&intf.addr) {
1865 ip_removed = true;
1866
1867 match intf.addr.ip() {
1868 IpAddr::V4(ipv4) => {
1869 if my_intf.next_ifaddr_v4().is_none() {
1870 if let Some(sock) = self.ipv4_sock.as_mut() {
1871 if let Err(e) = sock.pktinfo.leave_multicast_v4(&GROUP_ADDR_V4, &ipv4) {
1872 debug!("leave multicast group for addr {ipv4}: {e}");
1873 } else {
1874 debug!("leave multicast for {ipv4}");
1875 }
1876 }
1877 }
1878 }
1879
1880 IpAddr::V6(ipv6) => {
1881 if my_intf.next_ifaddr_v6().is_none() {
1882 if let Some(sock) = self.ipv6_sock.as_mut() {
1883 if let Err(e) =
1884 sock.pktinfo.leave_multicast_v6(&GROUP_ADDR_V6, if_index)
1885 {
1886 debug!("leave multicast group for addr {ipv6}: {e}");
1887 }
1888 }
1889 }
1890 }
1891 }
1892
1893 if my_intf.addrs.is_empty() {
1894 debug!("del_interface_addr: removing interface {}", intf.name);
1896 self.my_intfs.remove(&if_index);
1897 self.dns_registry_map.remove(&if_index);
1898 self.cache
1899 .remove_addrs_on_disabled_intf(if_index, IpType::BOTH);
1900 } else {
1901 let is_v4 = intf.addr.ip().is_ipv4();
1905 let version_gone = if is_v4 {
1906 my_intf.next_ifaddr_v4().is_none()
1907 } else {
1908 my_intf.next_ifaddr_v6().is_none()
1909 };
1910 if version_gone {
1911 let ip_type = if is_v4 { IpType::V4 } else { IpType::V6 };
1912 self.cache.remove_addrs_on_disabled_intf(if_index, ip_type);
1913 }
1914 }
1915 }
1916
1917 if ip_removed {
1918 self.notify_monitors(DaemonEvent::IpDel(intf.ip()));
1920 self.del_addr_in_my_services(&intf.ip());
1922 }
1923 }
1924
1925 fn add_interface(&mut self, intf: Interface) {
1926 let sock_opt = if intf.ip().is_ipv4() {
1927 &self.ipv4_sock
1928 } else {
1929 &self.ipv6_sock
1930 };
1931
1932 let Some(sock) = sock_opt else {
1933 debug!(
1934 "add_interface: no socket available for interface {} with addr {}. Skipped.",
1935 intf.name,
1936 intf.ip()
1937 );
1938 return;
1939 };
1940
1941 let if_index = intf.index.unwrap_or(0);
1942 let mut new_addr = false;
1943
1944 match self.my_intfs.entry(if_index) {
1945 Entry::Occupied(mut entry) => {
1946 let my_intf = entry.get_mut();
1948 if !my_intf.addrs.contains(&intf.addr) {
1949 if let Err(e) = join_multicast_group(&sock.pktinfo, &intf) {
1950 debug!("add_interface: socket_config {}: {e}", &intf.name);
1951 }
1952 my_intf.addrs.insert(intf.addr.clone());
1953 new_addr = true;
1954 }
1955 }
1956 Entry::Vacant(entry) => {
1957 if let Err(e) = join_multicast_group(&sock.pktinfo, &intf) {
1958 debug!("add_interface: socket_config {}: {e}. Skipped.", &intf.name);
1959 return;
1960 }
1961
1962 new_addr = true;
1963 let new_intf = MyIntf {
1964 name: intf.name.clone(),
1965 index: if_index,
1966 addrs: HashSet::from([intf.addr.clone()]),
1967 };
1968 entry.insert(new_intf);
1969 }
1970 }
1971
1972 if !new_addr {
1973 trace!("add_interface: interface {} already exists", &intf.name);
1974 return;
1975 }
1976
1977 debug!("add new interface {}: {}", intf.name, intf.ip());
1978
1979 let Some(my_intf) = self.my_intfs.get(&if_index) else {
1980 debug!("add_interface: cannot find if_index {if_index}");
1981 return;
1982 };
1983
1984 let dns_registry = match self.dns_registry_map.get_mut(&if_index) {
1985 Some(registry) => registry,
1986 None => self
1987 .dns_registry_map
1988 .entry(if_index)
1989 .or_insert_with(DnsRegistry::new),
1990 };
1991
1992 for (_, service_info) in self.my_services.iter_mut() {
1993 if service_info.is_addr_auto() {
1994 service_info.insert_ipaddr(&intf);
1995
1996 if let Ok(true) = announce_service_on_intf(
1997 dns_registry,
1998 service_info,
1999 my_intf,
2000 &sock.pktinfo,
2001 self.port,
2002 ) {
2003 debug!(
2004 "Announce service {} on {}",
2005 service_info.get_fullname(),
2006 intf.ip()
2007 );
2008 service_info.set_status(if_index, ServiceStatus::Announced);
2009 } else {
2010 for timer in dns_registry.new_timers.drain(..) {
2011 self.timers.push(Reverse(timer));
2012 }
2013 service_info.set_status(if_index, ServiceStatus::Probing);
2014 }
2015 }
2016 }
2017
2018 if let Some(my_intf) = self.my_intfs.get(&if_index) {
2023 for ty in self.service_queriers.keys() {
2024 self.send_query_on_intf(ty, RRType::PTR, my_intf);
2025 }
2026 }
2027
2028 self.notify_monitors(DaemonEvent::IpAdd(intf.ip()));
2030 }
2031
2032 fn register_service(&mut self, mut info: ServiceInfo) {
2041 if let Err(e) = check_service_name_length(info.get_type(), self.service_name_len_max) {
2043 error!("check_service_name_length: {}", &e);
2044 self.notify_monitors(DaemonEvent::Error(e));
2045 return;
2046 }
2047
2048 if info.is_addr_auto() {
2049 let selected_intfs =
2050 self.selected_intfs(my_ip_interfaces_inner(true, self.include_apple_p2p));
2051 for intf in selected_intfs {
2052 info.insert_ipaddr(&intf);
2053 }
2054 }
2055
2056 debug!("register service {:?}", &info);
2057
2058 let outgoing_addrs = self.send_unsolicited_response(&mut info);
2059 if !outgoing_addrs.is_empty() {
2060 self.notify_monitors(DaemonEvent::Announce(
2061 info.get_fullname().to_string(),
2062 format!("{:?}", &outgoing_addrs),
2063 ));
2064 }
2065
2066 let service_fullname = info.get_fullname().to_lowercase();
2069 self.my_services.insert(service_fullname, info);
2070 }
2071
2072 fn send_unsolicited_response(&mut self, info: &mut ServiceInfo) -> Vec<IpAddr> {
2075 let mut outgoing_addrs = Vec::new();
2076 let mut outgoing_intfs = HashSet::new();
2077
2078 let mut invalid_intf_addrs = HashSet::new();
2079
2080 for (if_index, intf) in self.my_intfs.iter() {
2081 let dns_registry = match self.dns_registry_map.get_mut(if_index) {
2082 Some(registry) => registry,
2083 None => self
2084 .dns_registry_map
2085 .entry(*if_index)
2086 .or_insert_with(DnsRegistry::new),
2087 };
2088
2089 let mut announced = false;
2090
2091 if let Some(sock) = self.ipv4_sock.as_mut() {
2093 match announce_service_on_intf(dns_registry, info, intf, &sock.pktinfo, self.port) {
2094 Ok(true) => {
2095 for addr in intf.addrs.iter().filter(|a| a.ip().is_ipv4()) {
2096 outgoing_addrs.push(addr.ip());
2097 }
2098 outgoing_intfs.insert(intf.index);
2099
2100 debug!(
2101 "Announce service IPv4 {} on {}",
2102 info.get_fullname(),
2103 intf.name
2104 );
2105 announced = true;
2106 }
2107 Ok(false) => {}
2108 Err(InternalError::IntfAddrInvalid(intf_addr)) => {
2109 invalid_intf_addrs.insert(intf_addr);
2110 }
2111 }
2112 }
2113
2114 if let Some(sock) = self.ipv6_sock.as_mut() {
2115 match announce_service_on_intf(dns_registry, info, intf, &sock.pktinfo, self.port) {
2116 Ok(true) => {
2117 for addr in intf.addrs.iter().filter(|a| a.ip().is_ipv6()) {
2118 outgoing_addrs.push(addr.ip());
2119 }
2120 outgoing_intfs.insert(intf.index);
2121
2122 debug!(
2123 "Announce service IPv6 {} on {}",
2124 info.get_fullname(),
2125 intf.name
2126 );
2127 announced = true;
2128 }
2129 Ok(false) => {}
2130 Err(InternalError::IntfAddrInvalid(intf_addr)) => {
2131 invalid_intf_addrs.insert(intf_addr);
2132 }
2133 }
2134 }
2135
2136 if announced {
2137 info.set_status(intf.index, ServiceStatus::Announced);
2138 } else {
2139 for timer in dns_registry.new_timers.drain(..) {
2140 self.timers.push(Reverse(timer));
2141 }
2142 info.set_status(*if_index, ServiceStatus::Probing);
2143 }
2144 }
2145
2146 if !invalid_intf_addrs.is_empty() {
2147 let _ = self.send_cmd_to_self(Command::InvalidIntfAddrs(invalid_intf_addrs));
2148 }
2149
2150 let next_time = current_time_millis() + 1000;
2154 for if_index in outgoing_intfs {
2155 self.add_retransmission(
2156 next_time,
2157 Command::RegisterResend(info.get_fullname().to_string(), if_index),
2158 );
2159 }
2160
2161 outgoing_addrs
2162 }
2163
2164 fn probing_handler(&mut self) {
2166 let now = current_time_millis();
2167 let mut invalid_intf_addrs = HashSet::new();
2168
2169 for (if_index, intf) in self.my_intfs.iter() {
2170 let Some(dns_registry) = self.dns_registry_map.get_mut(if_index) else {
2171 continue;
2172 };
2173
2174 let (out, expired_probes) = check_probing(dns_registry, &mut self.timers, now);
2175
2176 if !out.questions().is_empty() {
2178 trace!("sending out probing of questions: {:?}", out.questions());
2179 if let Some(sock) = self.ipv4_sock.as_mut() {
2180 if let Err(InternalError::IntfAddrInvalid(intf_addr)) =
2181 send_dns_outgoing(&out, intf, &sock.pktinfo, self.port, None, None)
2182 {
2183 invalid_intf_addrs.insert(intf_addr);
2184 }
2185 }
2186 if let Some(sock) = self.ipv6_sock.as_mut() {
2187 if let Err(InternalError::IntfAddrInvalid(intf_addr)) =
2188 send_dns_outgoing(&out, intf, &sock.pktinfo, self.port, None, None)
2189 {
2190 invalid_intf_addrs.insert(intf_addr);
2191 }
2192 }
2193 }
2194
2195 let waiting_services =
2197 handle_expired_probes(expired_probes, &intf.name, dns_registry, &mut self.monitors);
2198
2199 for service_name in waiting_services {
2200 if let Some(info) = self.my_services.get_mut(&service_name.to_lowercase()) {
2202 if info.get_status(*if_index) == ServiceStatus::Announced {
2203 debug!("service {} already announced", info.get_fullname());
2204 continue;
2205 }
2206
2207 let announced_v4 = if let Some(sock) = self.ipv4_sock.as_mut() {
2208 match announce_service_on_intf(
2209 dns_registry,
2210 info,
2211 intf,
2212 &sock.pktinfo,
2213 self.port,
2214 ) {
2215 Ok(announced) => announced,
2216 Err(InternalError::IntfAddrInvalid(intf_addr)) => {
2217 invalid_intf_addrs.insert(intf_addr);
2218 false
2219 }
2220 }
2221 } else {
2222 false
2223 };
2224 let announced_v6 = if let Some(sock) = self.ipv6_sock.as_mut() {
2225 match announce_service_on_intf(
2226 dns_registry,
2227 info,
2228 intf,
2229 &sock.pktinfo,
2230 self.port,
2231 ) {
2232 Ok(announced) => announced,
2233 Err(InternalError::IntfAddrInvalid(intf_addr)) => {
2234 invalid_intf_addrs.insert(intf_addr);
2235 false
2236 }
2237 }
2238 } else {
2239 false
2240 };
2241
2242 if announced_v4 || announced_v6 {
2243 let next_time = now + 1000;
2244 let command =
2245 Command::RegisterResend(info.get_fullname().to_string(), *if_index);
2246 self.retransmissions.push(ReRun { next_time, command });
2247 self.timers.push(Reverse(next_time));
2248
2249 let fullname = dns_registry.resolve_name(&service_name).to_string();
2250
2251 let hostname = dns_registry.resolve_name(info.get_hostname());
2252
2253 debug!("wake up: announce service {} on {}", fullname, intf.name);
2254 notify_monitors(
2255 &mut self.monitors,
2256 DaemonEvent::Announce(fullname, format!("{}:{}", hostname, &intf.name)),
2257 );
2258
2259 info.set_status(*if_index, ServiceStatus::Announced);
2260 }
2261 }
2262 }
2263 }
2264
2265 if !invalid_intf_addrs.is_empty() {
2266 let _ = self.send_cmd_to_self(Command::InvalidIntfAddrs(invalid_intf_addrs));
2267 }
2268 }
2269
2270 fn unregister_service(
2271 &self,
2272 info: &ServiceInfo,
2273 intf: &MyIntf,
2274 sock: &PktInfoUdpSocket,
2275 ) -> Vec<u8> {
2276 let is_ipv4 = sock.domain() == Domain::IPV4;
2277
2278 let mut out = DnsOutgoing::new(FLAGS_QR_RESPONSE | FLAGS_AA);
2279 out.add_answer_at_time(
2280 DnsPointer::new(
2281 info.get_type(),
2282 RRType::PTR,
2283 CLASS_IN,
2284 0,
2285 info.get_fullname().to_string(),
2286 ),
2287 0,
2288 );
2289
2290 if let Some(sub) = info.get_subtype() {
2291 trace!("Adding subdomain {}", sub);
2292 out.add_answer_at_time(
2293 DnsPointer::new(
2294 sub,
2295 RRType::PTR,
2296 CLASS_IN,
2297 0,
2298 info.get_fullname().to_string(),
2299 ),
2300 0,
2301 );
2302 }
2303
2304 out.add_answer_at_time(
2305 DnsSrv::new(
2306 info.get_fullname(),
2307 CLASS_IN | CLASS_CACHE_FLUSH,
2308 0,
2309 info.get_priority(),
2310 info.get_weight(),
2311 info.get_port(),
2312 info.get_hostname().to_string(),
2313 ),
2314 0,
2315 );
2316 out.add_answer_at_time(
2317 DnsTxt::new(
2318 info.get_fullname(),
2319 CLASS_IN | CLASS_CACHE_FLUSH,
2320 0,
2321 info.generate_txt(),
2322 ),
2323 0,
2324 );
2325
2326 let if_addrs = if is_ipv4 {
2327 info.get_addrs_on_my_intf_v4(intf)
2328 } else {
2329 info.get_addrs_on_my_intf_v6(intf)
2330 };
2331
2332 if if_addrs.is_empty() {
2333 return vec![];
2334 }
2335
2336 for address in if_addrs {
2337 out.add_answer_at_time(
2338 DnsAddress::new(
2339 info.get_hostname(),
2340 ip_address_rr_type(&address),
2341 CLASS_IN | CLASS_CACHE_FLUSH,
2342 0,
2343 address,
2344 intf.into(),
2345 ),
2346 0,
2347 );
2348 }
2349
2350 let sent_vec = match send_dns_outgoing(&out, intf, sock, self.port, None, None) {
2352 Ok(sent_vec) => sent_vec,
2353 Err(InternalError::IntfAddrInvalid(intf_addr)) => {
2354 let invalid_intf_addrs = HashSet::from([intf_addr]);
2355 let _ = self.send_cmd_to_self(Command::InvalidIntfAddrs(invalid_intf_addrs));
2356 vec![]
2357 }
2358 };
2359 sent_vec.into_iter().next().unwrap_or_default()
2360 }
2361
2362 fn add_hostname_resolver(
2366 &mut self,
2367 hostname: String,
2368 listener: Sender<HostnameResolutionEvent>,
2369 timeout: Option<u64>,
2370 ) {
2371 let real_timeout = timeout.map(|t| current_time_millis() + t);
2372 self.hostname_resolvers
2373 .insert(hostname.to_lowercase(), (listener, real_timeout));
2374 if let Some(t) = real_timeout {
2375 self.add_timer(t);
2376 }
2377 }
2378
2379 fn send_query(&self, name: &str, qtype: RRType) {
2381 self.send_query_vec(&[(name, qtype)]);
2382 }
2383
2384 fn send_query_on_intf(&self, name: &str, qtype: RRType, intf: &MyIntf) {
2389 let mut out = DnsOutgoing::new(FLAGS_QR_QUERY);
2390 out.add_question(name, qtype);
2391
2392 let mut invalid_intf_addrs = HashSet::new();
2393 if let Some(sock) = self.ipv4_sock.as_ref() {
2394 if let Err(InternalError::IntfAddrInvalid(intf_addr)) =
2395 send_dns_outgoing(&out, intf, &sock.pktinfo, self.port, None, None)
2396 {
2397 invalid_intf_addrs.insert(intf_addr);
2398 }
2399 }
2400 if let Some(sock) = self.ipv6_sock.as_ref() {
2401 if let Err(InternalError::IntfAddrInvalid(intf_addr)) =
2402 send_dns_outgoing(&out, intf, &sock.pktinfo, self.port, None, None)
2403 {
2404 invalid_intf_addrs.insert(intf_addr);
2405 }
2406 }
2407 if !invalid_intf_addrs.is_empty() {
2408 let _ = self.send_cmd_to_self(Command::InvalidIntfAddrs(invalid_intf_addrs));
2409 }
2410 }
2411
2412 fn send_query_vec(&self, questions: &[(&str, RRType)]) {
2414 let mut out = DnsOutgoing::new(FLAGS_QR_QUERY);
2415 let now = current_time_millis();
2416
2417 for (name, qtype) in questions {
2418 out.add_question(name, *qtype);
2419
2420 for record in self.cache.get_known_answers(name, *qtype, now) {
2421 trace!("add known answer: {:?}", record.record);
2429 let mut new_record = record.record.clone();
2430 new_record.get_record_mut().update_ttl(now);
2431 out.add_answer_box(new_record);
2432 }
2433 }
2434
2435 let mut invalid_intf_addrs = HashSet::new();
2436 for (_, intf) in self.my_intfs.iter() {
2437 if let Some(sock) = self.ipv4_sock.as_ref() {
2438 if let Err(InternalError::IntfAddrInvalid(intf_addr)) =
2439 send_dns_outgoing(&out, intf, &sock.pktinfo, self.port, None, None)
2440 {
2441 invalid_intf_addrs.insert(intf_addr);
2442 }
2443 }
2444 if let Some(sock) = self.ipv6_sock.as_ref() {
2445 if let Err(InternalError::IntfAddrInvalid(intf_addr)) =
2446 send_dns_outgoing(&out, intf, &sock.pktinfo, self.port, None, None)
2447 {
2448 invalid_intf_addrs.insert(intf_addr);
2449 }
2450 }
2451 }
2452
2453 if !invalid_intf_addrs.is_empty() {
2454 let _ = self.send_cmd_to_self(Command::InvalidIntfAddrs(invalid_intf_addrs));
2455 }
2456 }
2457
2458 fn handle_read(&mut self, event_key: usize) -> bool {
2463 let sock_opt = match event_key {
2464 IPV4_SOCK_EVENT_KEY => &mut self.ipv4_sock,
2465 IPV6_SOCK_EVENT_KEY => &mut self.ipv6_sock,
2466 _ => {
2467 debug!("handle_read: unknown token {}", event_key);
2468 return false;
2469 }
2470 };
2471 let Some(sock) = sock_opt.as_mut() else {
2472 debug!("handle_read: socket not available for token {}", event_key);
2473 return false;
2474 };
2475 let mut buf = vec![0u8; MAX_MSG_ABSOLUTE];
2476
2477 let (sz, pktinfo) = match sock.pktinfo.recv(&mut buf) {
2484 Ok(sz) => sz,
2485 Err(e) => {
2486 if e.kind() != std::io::ErrorKind::WouldBlock {
2487 debug!("listening socket read failed: {}", e);
2488 }
2489 return false;
2490 }
2491 };
2492
2493 let pkt_if_index = pktinfo.if_index as u32;
2495 let Some(my_intf) = self.my_intfs.get(&pkt_if_index) else {
2496 debug!(
2497 "handle_read: no interface found for pktinfo if_index: {}",
2498 pktinfo.if_index
2499 );
2500 return true; };
2502
2503 let is_ipv4 = event_key == IPV4_SOCK_EVENT_KEY;
2508 if (is_ipv4 && my_intf.next_ifaddr_v4().is_none())
2509 || (!is_ipv4 && my_intf.next_ifaddr_v6().is_none())
2510 {
2511 debug!(
2512 "handle_read: dropping {} packet on intf {} (disabled)",
2513 if is_ipv4 { "IPv4" } else { "IPv6" },
2514 my_intf.name
2515 );
2516 return true;
2517 }
2518
2519 buf.truncate(sz); match DnsIncoming::new(buf, my_intf.into()) {
2522 Ok(msg) => {
2523 if msg.is_query() {
2524 let querier_addr = pktinfo.addr_src;
2525 self.handle_query(msg, pkt_if_index, querier_addr);
2526 } else if msg.is_response() {
2527 self.handle_response(msg, pkt_if_index);
2528 } else {
2529 debug!("Invalid message: not query and not response");
2530 }
2531 }
2532 Err(e) => debug!("Invalid incoming DNS message: {}", e),
2533 }
2534
2535 true
2536 }
2537
2538 fn query_unresolved(&mut self, instance: &str) -> bool {
2540 if !valid_instance_name(instance) {
2541 trace!("instance name {} not valid", instance);
2542 return false;
2543 }
2544
2545 if let Some(records) = self.cache.get_srv(instance) {
2546 for record in records {
2547 if let Some(srv) = record.record.any().downcast_ref::<DnsSrv>() {
2548 if self.cache.get_addr(srv.host()).is_none() {
2549 self.send_query_vec(&[(srv.host(), RRType::A), (srv.host(), RRType::AAAA)]);
2550 return true;
2551 }
2552 }
2553 }
2554 } else {
2555 self.send_query(instance, RRType::ANY);
2556 return true;
2557 }
2558
2559 false
2560 }
2561
2562 fn query_cache_for_service(
2565 &mut self,
2566 ty_domain: &str,
2567 sender: &Sender<ServiceEvent>,
2568 now: u64,
2569 ) {
2570 let mut resolved: HashSet<String> = HashSet::new();
2571 let mut unresolved: HashSet<String> = HashSet::new();
2572
2573 if let Some(records) = self.cache.get_ptr(ty_domain) {
2574 for record in records.iter().filter(|r| !r.record.expires_soon(now)) {
2575 if let Some(ptr) = record.record.any().downcast_ref::<DnsPointer>() {
2576 let mut new_event = None;
2577 match self.resolve_service_from_cache(ty_domain, ptr.alias()) {
2578 Ok(resolved_service) => {
2579 if resolved_service.is_valid() {
2580 debug!("Resolved service from cache: {}", ptr.alias());
2581 new_event =
2582 Some(ServiceEvent::ServiceResolved(Box::new(resolved_service)));
2583 } else {
2584 debug!("Resolved service is not valid: {}", ptr.alias());
2585 }
2586 }
2587 Err(err) => {
2588 debug!("Error while resolving service from cache: {}", err);
2589 continue;
2590 }
2591 }
2592
2593 match sender.send(ServiceEvent::ServiceFound(
2594 ty_domain.to_string(),
2595 ptr.alias().to_string(),
2596 )) {
2597 Ok(()) => debug!("sent service found {}", ptr.alias()),
2598 Err(e) => {
2599 debug!("failed to send service found: {}", e);
2600 continue;
2601 }
2602 }
2603
2604 if let Some(event) = new_event {
2605 resolved.insert(ptr.alias().to_string());
2606 match sender.send(event) {
2607 Ok(()) => debug!("sent service resolved: {}", ptr.alias()),
2608 Err(e) => debug!("failed to send service resolved: {}", e),
2609 }
2610 } else {
2611 unresolved.insert(ptr.alias().to_string());
2612 }
2613 }
2614 }
2615 }
2616
2617 for instance in resolved.drain() {
2618 self.pending_resolves.remove(&instance);
2619 self.resolved.insert(instance);
2620 }
2621
2622 for instance in unresolved.drain() {
2623 self.add_pending_resolve(instance);
2624 }
2625 }
2626
2627 fn query_cache_for_hostname(
2630 &mut self,
2631 hostname: &str,
2632 sender: Sender<HostnameResolutionEvent>,
2633 ) {
2634 let addresses_map = self.cache.get_addresses_for_host(hostname);
2635 for (name, addresses) in addresses_map {
2636 match sender.send(HostnameResolutionEvent::AddressesFound(name, addresses)) {
2637 Ok(()) => trace!("sent hostname addresses found"),
2638 Err(e) => debug!("failed to send hostname addresses found: {}", e),
2639 }
2640 }
2641 }
2642
2643 fn add_pending_resolve(&mut self, instance: String) {
2644 if !self.pending_resolves.contains(&instance) {
2645 let next_time = current_time_millis() + RESOLVE_WAIT_IN_MILLIS;
2646 self.add_retransmission(next_time, Command::Resolve(instance.clone(), 1));
2647 self.pending_resolves.insert(instance);
2648 }
2649 }
2650
2651 fn resolve_service_from_cache(
2653 &self,
2654 ty_domain: &str,
2655 fullname: &str,
2656 ) -> Result<ResolvedService> {
2657 let now = current_time_millis();
2658 let mut resolved_service = ResolvedService {
2659 ty_domain: ty_domain.to_string(),
2660 sub_ty_domain: None,
2661 fullname: fullname.to_string(),
2662 host: String::new(),
2663 port: 0,
2664 addresses: HashSet::new(),
2665 txt_properties: TxtProperties::new(),
2666 };
2667
2668 if let Some(subtype) = self.cache.get_subtype(fullname) {
2670 trace!(
2671 "ty_domain: {} found subtype {} for instance: {}",
2672 ty_domain,
2673 subtype,
2674 fullname
2675 );
2676 if resolved_service.sub_ty_domain.is_none() {
2677 resolved_service.sub_ty_domain = Some(subtype.to_string());
2678 }
2679 }
2680
2681 if let Some(records) = self.cache.get_srv(fullname) {
2683 if let Some(answer) = records.iter().find(|r| !r.record.expires_soon(now)) {
2684 if let Some(dns_srv) = answer.record.any().downcast_ref::<DnsSrv>() {
2685 resolved_service.host = dns_srv.host().to_string();
2686 resolved_service.port = dns_srv.port();
2687 }
2688 }
2689 }
2690
2691 if let Some(records) = self.cache.get_txt(fullname) {
2693 if let Some(record) = records.iter().find(|r| !r.record.expires_soon(now)) {
2694 if let Some(dns_txt) = record.record.any().downcast_ref::<DnsTxt>() {
2695 resolved_service.txt_properties = dns_txt.text().into();
2696 }
2697 }
2698 }
2699
2700 if let Some(records) = self.cache.get_addr(&resolved_service.host) {
2702 for answer in records.iter() {
2703 if let Some(dns_a) = answer.record.any().downcast_ref::<DnsAddress>() {
2704 if dns_a.expires_soon(now) {
2705 trace!(
2706 "Addr expired or expires soon: {}",
2707 dns_a.address().to_ip_addr()
2708 );
2709 } else {
2710 let scoped = dns_a.address();
2711 if let ScopedIp::V4(v4) = &scoped {
2712 let existing = resolved_service
2715 .addresses
2716 .iter()
2717 .find(|a| a.to_ip_addr() == IpAddr::V4(*v4.addr()))
2718 .cloned();
2719 if let Some(mut existing) = existing {
2720 resolved_service.addresses.remove(&existing);
2721 if let ScopedIp::V4(existing_v4) = &mut existing {
2722 for id in v4.interface_ids() {
2723 existing_v4.add_interface_id(id.clone());
2724 }
2725 }
2726 resolved_service.addresses.insert(existing);
2727 } else {
2728 resolved_service.addresses.insert(scoped);
2729 }
2730 } else {
2731 resolved_service.addresses.insert(scoped);
2732 }
2733 }
2734 }
2735 }
2736 }
2737
2738 Ok(resolved_service)
2739 }
2740
2741 fn handle_poller_events(&mut self, events: &mio::Events) {
2742 for ev in events.iter() {
2743 trace!("event received with key {:?}", ev.token());
2744 if ev.token().0 == SIGNAL_SOCK_EVENT_KEY {
2745 self.signal_sock_drain();
2747
2748 if let Err(e) = self.poller.registry().reregister(
2749 &mut self.signal_sock,
2750 ev.token(),
2751 mio::Interest::READABLE,
2752 ) {
2753 debug!("failed to modify poller for signal socket: {}", e);
2754 }
2755 continue; }
2757
2758 while self.handle_read(ev.token().0) {}
2760
2761 if ev.token().0 == IPV4_SOCK_EVENT_KEY {
2763 if let Some(sock) = self.ipv4_sock.as_mut() {
2765 if let Err(e) =
2766 self.poller
2767 .registry()
2768 .reregister(sock, ev.token(), mio::Interest::READABLE)
2769 {
2770 debug!("modify poller for IPv4 socket: {}", e);
2771 }
2772 }
2773 } else if ev.token().0 == IPV6_SOCK_EVENT_KEY {
2774 if let Some(sock) = self.ipv6_sock.as_mut() {
2776 if let Err(e) =
2777 self.poller
2778 .registry()
2779 .reregister(sock, ev.token(), mio::Interest::READABLE)
2780 {
2781 debug!("modify poller for IPv6 socket: {}", e);
2782 }
2783 }
2784 }
2785 }
2786 }
2787
2788 fn handle_response(&mut self, mut msg: DnsIncoming, if_index: u32) {
2791 let now = current_time_millis();
2792
2793 let mut record_predicate = |record: &DnsRecordBox| {
2795 if !record.get_record().is_expired(now) {
2796 return true;
2797 }
2798
2799 debug!("record is expired, removing it from cache.");
2800 if self.cache.remove(record) {
2801 if let Some(dns_ptr) = record.any().downcast_ref::<DnsPointer>() {
2803 call_service_listener(
2804 &self.service_queriers,
2805 dns_ptr.get_name(),
2806 ServiceEvent::ServiceRemoved(
2807 dns_ptr.get_name().to_string(),
2808 dns_ptr.alias().to_string(),
2809 ),
2810 );
2811 }
2812 }
2813 false
2814 };
2815 msg.answers_mut().retain(&mut record_predicate);
2816 msg.authorities_mut().retain(&mut record_predicate);
2817 msg.additionals_mut().retain(&mut record_predicate);
2818
2819 self.conflict_handler(&msg, if_index);
2821
2822 let mut is_for_us = true; for answer in msg.answers() {
2829 if answer.get_type() == RRType::PTR {
2830 if self.service_queriers.contains_key(answer.get_name()) {
2831 is_for_us = true;
2832 break; } else {
2834 is_for_us = false;
2835 }
2836 } else if answer.get_type() == RRType::A || answer.get_type() == RRType::AAAA {
2837 let answer_lowercase = answer.get_name().to_lowercase();
2839 if self.hostname_resolvers.contains_key(&answer_lowercase) {
2840 is_for_us = true;
2841 break; }
2843 }
2844 }
2845
2846 if self.accept_unsolicited {
2848 is_for_us = true;
2849 }
2850
2851 struct InstanceChange {
2853 ty: RRType, name: String, }
2856
2857 let mut changes = Vec::new();
2865 let mut timers = Vec::new();
2866 let Some(my_intf) = self.my_intfs.get(&if_index) else {
2867 return;
2868 };
2869 for record in msg.all_records() {
2870 match self
2871 .cache
2872 .add_or_update(my_intf, record, &mut timers, is_for_us)
2873 {
2874 Some((dns_record, true)) => {
2875 timers.push(dns_record.record.get_record().get_expire_time());
2876 timers.push(dns_record.record.get_record().get_refresh_time());
2877
2878 let ty = dns_record.record.get_type();
2879 let name = dns_record.record.get_name();
2880
2881 if ty == RRType::PTR && dns_record.record.get_record().get_ttl() > 1 {
2883 if self.service_queriers.contains_key(name) {
2884 timers.push(dns_record.record.get_record().get_refresh_time());
2885 }
2886
2887 if let Some(dns_ptr) = dns_record.record.any().downcast_ref::<DnsPointer>()
2889 {
2890 debug!("calling listener with service found: {name}");
2891 call_service_listener(
2892 &self.service_queriers,
2893 name,
2894 ServiceEvent::ServiceFound(
2895 name.to_string(),
2896 dns_ptr.alias().to_string(),
2897 ),
2898 );
2899 changes.push(InstanceChange {
2900 ty,
2901 name: dns_ptr.alias().to_string(),
2902 });
2903 }
2904 } else {
2905 changes.push(InstanceChange {
2906 ty,
2907 name: name.to_string(),
2908 });
2909 }
2910 }
2911 Some((dns_record, false)) => {
2912 timers.push(dns_record.record.get_record().get_expire_time());
2913 timers.push(dns_record.record.get_record().get_refresh_time());
2914 }
2915 _ => {}
2916 }
2917 }
2918
2919 for t in timers {
2921 self.add_timer(t);
2922 }
2923
2924 for change in changes
2926 .iter()
2927 .filter(|change| change.ty == RRType::A || change.ty == RRType::AAAA)
2928 {
2929 let addr_map = self.cache.get_addresses_for_host(&change.name);
2930 for (name, addresses) in addr_map {
2931 call_hostname_resolution_listener(
2932 &self.hostname_resolvers,
2933 &change.name,
2934 HostnameResolutionEvent::AddressesFound(name, addresses),
2935 )
2936 }
2937 }
2938
2939 let mut updated_instances = HashSet::new();
2941 for update in changes {
2942 match update.ty {
2943 RRType::PTR | RRType::SRV | RRType::TXT => {
2944 updated_instances.insert(update.name);
2945 }
2946 RRType::A | RRType::AAAA => {
2947 let instances = self.cache.get_instances_on_host(&update.name);
2948 updated_instances.extend(instances);
2949 }
2950 _ => {}
2951 }
2952 }
2953
2954 self.resolve_updated_instances(&updated_instances);
2955 }
2956
2957 fn conflict_handler(&mut self, msg: &DnsIncoming, if_index: u32) {
2958 let Some(my_intf) = self.my_intfs.get(&if_index) else {
2959 debug!("handle_response: no intf found for index {if_index}");
2960 return;
2961 };
2962
2963 let Some(dns_registry) = self.dns_registry_map.get_mut(&if_index) else {
2964 return;
2965 };
2966
2967 for answer in msg.answers().iter() {
2968 let mut new_records = Vec::new();
2969
2970 let name = answer.get_name();
2971 let Some(probe) = dns_registry.probing.get_mut(name) else {
2972 continue;
2973 };
2974
2975 if answer.get_type() == RRType::A || answer.get_type() == RRType::AAAA {
2977 if let Some(answer_addr) = answer.any().downcast_ref::<DnsAddress>() {
2978 if answer_addr.interface_id.index != if_index {
2979 debug!(
2980 "conflict handler: answer addr {:?} not in the subnet of intf {}",
2981 answer_addr, my_intf.name
2982 );
2983 continue;
2984 }
2985 }
2986
2987 let any_match = probe.records.iter().any(|r| {
2990 r.get_type() == answer.get_type()
2991 && r.get_class() == answer.get_class()
2992 && r.rrdata_match(answer.as_ref())
2993 });
2994 if any_match {
2995 continue; }
2997 }
2998
2999 probe.records.retain(|record| {
3000 if record.get_type() == answer.get_type()
3001 && record.get_class() == answer.get_class()
3002 && !record.rrdata_match(answer.as_ref())
3003 {
3004 debug!(
3005 "found conflict name: '{name}' record: {}: {} PEER: {}",
3006 record.get_type(),
3007 record.rdata_print(),
3008 answer.rdata_print()
3009 );
3010
3011 let mut new_record = record.clone();
3014 let new_name = match record.get_type() {
3015 RRType::A => hostname_change(name),
3016 RRType::AAAA => hostname_change(name),
3017 _ => name_change(name),
3018 };
3019 new_record.get_record_mut().set_new_name(new_name);
3020 new_records.push(new_record);
3021 return false; }
3023
3024 true
3025 });
3026
3027 let create_time = current_time_millis() + fastrand::u64(0..250);
3034
3035 let waiting_services = probe.waiting_services.clone();
3036
3037 for record in new_records {
3038 if dns_registry.update_hostname(name, record.get_name(), create_time) {
3039 self.timers.push(Reverse(create_time));
3040 }
3041
3042 dns_registry.name_changes.insert(
3044 record.get_record().get_original_name().to_string(),
3045 record.get_name().to_string(),
3046 );
3047
3048 let new_probe = match dns_registry.probing.get_mut(record.get_name()) {
3049 Some(p) => p,
3050 None => {
3051 let new_probe = dns_registry
3052 .probing
3053 .entry(record.get_name().to_string())
3054 .or_insert_with(|| {
3055 debug!("conflict handler: new probe of {}", record.get_name());
3056 Probe::new(create_time)
3057 });
3058 self.timers.push(Reverse(new_probe.next_send));
3059 new_probe
3060 }
3061 };
3062
3063 debug!(
3064 "insert record with new name '{}' {} into probe",
3065 record.get_name(),
3066 record.get_type()
3067 );
3068 new_probe.insert_record(record);
3069
3070 new_probe.waiting_services.extend(waiting_services.clone());
3071 }
3072 }
3073 }
3074
3075 fn resolve_updated_instances(&mut self, updated_instances: &HashSet<String>) {
3082 if updated_instances.is_empty() {
3083 return;
3084 }
3085
3086 let mut resolved: HashSet<String> = HashSet::new();
3087 let mut unresolved: HashSet<String> = HashSet::new();
3088 let mut removed_instances = HashMap::new();
3089
3090 let now = current_time_millis();
3091
3092 for (ty_domain, records) in self.cache.all_ptr().iter() {
3093 if !self.service_queriers.contains_key(ty_domain) {
3094 continue;
3096 }
3097
3098 for ptr in records.iter().filter(|r| !r.record.expires_soon(now)) {
3099 let Some(dns_ptr) = ptr.record.any().downcast_ref::<DnsPointer>() else {
3100 continue;
3101 };
3102
3103 let instance = dns_ptr.alias();
3104 if !updated_instances.contains(instance) {
3105 continue;
3106 }
3107
3108 let Ok(resolved_service) = self.resolve_service_from_cache(ty_domain, instance)
3109 else {
3110 continue;
3111 };
3112
3113 debug!("resolve_updated_instances: from cache: {instance}");
3114 if resolved_service.is_valid() {
3115 debug!("call queriers to resolve {instance}");
3116 resolved.insert(instance.to_string());
3117 let event = ServiceEvent::ServiceResolved(Box::new(resolved_service));
3118 call_service_listener(&self.service_queriers, ty_domain, event);
3119 } else {
3120 debug!("Resolved service is not valid: {instance}");
3121 if self.resolved.remove(dns_ptr.alias()) {
3122 removed_instances
3123 .entry(ty_domain.to_string())
3124 .or_insert_with(HashSet::new)
3125 .insert(instance.to_string());
3126 }
3127 unresolved.insert(instance.to_string());
3128 }
3129 }
3130 }
3131
3132 for instance in resolved.drain() {
3133 self.pending_resolves.remove(&instance);
3134 self.resolved.insert(instance);
3135 }
3136
3137 for instance in unresolved.drain() {
3138 self.add_pending_resolve(instance);
3139 }
3140
3141 if !removed_instances.is_empty() {
3142 debug!(
3143 "resolve_updated_instances: removed {}",
3144 &removed_instances.len()
3145 );
3146 self.notify_service_removal(removed_instances);
3147 }
3148 }
3149
3150 fn handle_query(&mut self, msg: DnsIncoming, if_index: u32, querier_addr: SocketAddr) {
3152 let querier_ip = querier_addr.ip();
3153 let is_ipv4 = querier_ip.is_ipv4();
3154 let sock_opt = if is_ipv4 {
3155 &self.ipv4_sock
3156 } else {
3157 &self.ipv6_sock
3158 };
3159 let Some(sock) = sock_opt.as_ref() else {
3160 debug!("handle_query: socket not available for intf {}", if_index);
3161 return;
3162 };
3163 let mut out = DnsOutgoing::new(FLAGS_QR_RESPONSE | FLAGS_AA);
3164
3165 const META_QUERY: &str = "_services._dns-sd._udp.local.";
3168
3169 let Some(dns_registry) = self.dns_registry_map.get_mut(&if_index) else {
3170 debug!("missing dns registry for intf {}", if_index);
3171 return;
3172 };
3173
3174 let Some(intf) = self.my_intfs.get(&if_index) else {
3175 debug!("handle_query: no intf found for index {if_index}");
3176 return;
3177 };
3178
3179 for question in msg.questions().iter() {
3180 let qtype = question.entry_type();
3181 let q_name = question.entry_name();
3182
3183 if qtype == RRType::PTR {
3184 for service in self.my_services.values() {
3185 if service.get_status(if_index) != ServiceStatus::Announced {
3186 continue;
3187 }
3188
3189 if service.matches_type_or_subtype(q_name) {
3190 out.add_answer_with_additionals(&msg, service, intf, dns_registry, is_ipv4);
3191 } else if q_name == META_QUERY {
3192 let ttl = service.get_other_ttl();
3193 let alias = service.get_type().to_string();
3194 let ptr = DnsPointer::new(q_name, RRType::PTR, CLASS_IN, ttl, alias);
3195 if !out.add_answer(&msg, ptr) {
3196 trace!("answer was not added for meta-query {:?}", &question);
3197 }
3198 }
3199 }
3200 } else {
3201 if qtype == RRType::ANY && msg.num_authorities() > 0 {
3203 if let Some(probe) = dns_registry.probing.get_mut(q_name) {
3204 probe.tiebreaking(&msg, q_name);
3205 }
3206 }
3207
3208 if qtype == RRType::A || qtype == RRType::AAAA || qtype == RRType::ANY {
3209 for service in self.my_services.values() {
3210 if service.get_status(if_index) != ServiceStatus::Announced {
3211 continue;
3212 }
3213
3214 let service_hostname = dns_registry.resolve_name(service.get_hostname());
3215
3216 if service_hostname.to_lowercase() == question.entry_name().to_lowercase() {
3217 let mut intf_addrs: Vec<IpAddr> = Vec::new();
3224 if qtype == RRType::A || qtype == RRType::ANY {
3225 intf_addrs.extend(service.get_addrs_on_my_intf_v4(intf));
3226 }
3227 if qtype == RRType::AAAA || qtype == RRType::ANY {
3228 intf_addrs.extend(service.get_addrs_on_my_intf_v6(intf));
3229 }
3230 if intf_addrs.is_empty()
3231 && (qtype == RRType::A || qtype == RRType::AAAA)
3232 {
3233 let t = match qtype {
3234 RRType::A => "TYPE_A",
3235 RRType::AAAA => "TYPE_AAAA",
3236 _ => "invalid_type",
3237 };
3238 trace!(
3239 "Cannot find valid addrs for {} response on intf {:?}",
3240 t,
3241 &intf
3242 );
3243 continue;
3244 }
3245 for address in intf_addrs {
3246 out.add_answer(
3247 &msg,
3248 DnsAddress::new(
3249 service_hostname,
3250 ip_address_rr_type(&address),
3251 CLASS_IN | CLASS_CACHE_FLUSH,
3252 service.get_host_ttl(),
3253 address,
3254 intf.into(),
3255 ),
3256 );
3257 }
3258 }
3259 }
3260 }
3261
3262 let query_name = q_name.to_lowercase();
3263 let service_opt = self
3264 .my_services
3265 .iter()
3266 .find(|(k, _v)| dns_registry.resolve_name(k.as_str()) == query_name)
3267 .map(|(_, v)| v);
3268
3269 let Some(service) = service_opt else {
3270 continue;
3271 };
3272
3273 if service.get_status(if_index) != ServiceStatus::Announced {
3274 continue;
3275 }
3276
3277 let intf_addrs = if is_ipv4 {
3278 service.get_addrs_on_my_intf_v4(intf)
3279 } else {
3280 service.get_addrs_on_my_intf_v6(intf)
3281 };
3282 if intf_addrs.is_empty() {
3283 debug!(
3284 "Cannot find valid addrs for TYPE_SRV response on intf {:?}",
3285 &intf
3286 );
3287 continue;
3288 }
3289
3290 add_answer_of_service(
3291 &mut out,
3292 &msg,
3293 question.entry_name(),
3294 service,
3295 qtype,
3296 intf_addrs,
3297 );
3298 }
3299 }
3300
3301 if out.answers_count() > 0 {
3302 debug!("sending response on intf {}", &intf.name);
3303 out.set_id(msg.id());
3304
3305 let matched_source = intf
3308 .addrs
3309 .iter()
3310 .find(|if_addr| valid_ip_on_intf(&querier_ip, if_addr));
3311
3312 let unicast_dest = if querier_addr.port() != MDNS_PORT {
3320 Some(querier_addr)
3321 } else {
3322 None
3323 };
3324
3325 if unicast_dest.is_some() {
3326 for q in msg.questions() {
3327 out.add_question(q.entry_name(), q.entry_type());
3328 }
3329 out.clear_cache_flush_bits();
3330 }
3331
3332 if let Err(InternalError::IntfAddrInvalid(intf_addr)) = send_dns_outgoing(
3333 &out,
3334 intf,
3335 &sock.pktinfo,
3336 self.port,
3337 matched_source,
3338 unicast_dest,
3339 ) {
3340 let invalid_intf_addr = HashSet::from([intf_addr]);
3341 let _ = self.send_cmd_to_self(Command::InvalidIntfAddrs(invalid_intf_addr));
3342 }
3343
3344 let if_name = intf.name.clone();
3345
3346 self.increase_counter(Counter::Respond, 1);
3347 self.notify_monitors(DaemonEvent::Respond(if_name));
3348 }
3349
3350 self.increase_counter(Counter::KnownAnswerSuppression, out.known_answer_count());
3351 }
3352
3353 fn increase_counter(&mut self, counter: Counter, count: i64) {
3355 let key = counter.to_string();
3356 match self.counters.get_mut(&key) {
3357 Some(v) => *v += count,
3358 None => {
3359 self.counters.insert(key, count);
3360 }
3361 }
3362 }
3363
3364 fn set_counter(&mut self, counter: Counter, count: i64) {
3366 let key = counter.to_string();
3367 self.counters.insert(key, count);
3368 }
3369
3370 fn signal_sock_drain(&self) {
3371 let mut signal_buf = [0; 1024];
3372
3373 while let Ok(sz) = self.signal_sock.recv(&mut signal_buf) {
3375 trace!(
3376 "signal socket recvd: {}",
3377 String::from_utf8_lossy(&signal_buf[0..sz])
3378 );
3379 }
3380 }
3381
3382 fn add_retransmission(&mut self, next_time: u64, command: Command) {
3383 self.retransmissions.push(ReRun { next_time, command });
3384 self.add_timer(next_time);
3385 }
3386
3387 fn notify_service_removal(&self, expired: HashMap<String, HashSet<String>>) {
3390 for (ty_domain, sender) in self.service_queriers.iter() {
3391 if let Some(instances) = expired.get(ty_domain) {
3392 for instance_name in instances {
3393 let event = ServiceEvent::ServiceRemoved(
3394 ty_domain.to_string(),
3395 instance_name.to_string(),
3396 );
3397 match sender.send(event) {
3398 Ok(()) => debug!("notify_service_removal: sent ServiceRemoved to listener of {ty_domain}: {instance_name}"),
3399 Err(e) => debug!("Failed to send event: {}", e),
3400 }
3401 }
3402 }
3403 }
3404 }
3405
3406 fn exec_command(&mut self, command: Command, repeating: bool) {
3410 trace!("exec_command: {:?} repeating: {}", &command, repeating);
3411 match command {
3412 Command::Browse(ty, next_delay, cache_only, listener) => {
3413 self.exec_command_browse(repeating, ty, next_delay, cache_only, listener);
3414 }
3415
3416 Command::ResolveHostname(hostname, next_delay, listener, timeout) => {
3417 self.exec_command_resolve_hostname(
3418 repeating, hostname, next_delay, listener, timeout,
3419 );
3420 }
3421
3422 Command::Register(service_info) => {
3423 self.register_service(*service_info);
3424 self.increase_counter(Counter::Register, 1);
3425 }
3426
3427 Command::RegisterResend(fullname, intf) => {
3428 trace!("register-resend service: {fullname} on {}", &intf);
3429 if let Err(InternalError::IntfAddrInvalid(intf_addr)) =
3430 self.exec_command_register_resend(fullname, intf)
3431 {
3432 let invalid_intf_addr = HashSet::from([intf_addr]);
3433 let _ = self.send_cmd_to_self(Command::InvalidIntfAddrs(invalid_intf_addr));
3434 }
3435 }
3436
3437 Command::Unregister(fullname, resp_s) => {
3438 trace!("unregister service {} repeat {}", &fullname, &repeating);
3439 self.exec_command_unregister(repeating, fullname, resp_s);
3440 }
3441
3442 Command::UnregisterResend(packet, if_index, is_ipv4) => {
3443 self.exec_command_unregister_resend(packet, if_index, is_ipv4);
3444 }
3445
3446 Command::StopBrowse(ty_domain) => self.exec_command_stop_browse(ty_domain),
3447
3448 Command::StopResolveHostname(hostname) => {
3449 self.exec_command_stop_resolve_hostname(hostname.to_lowercase())
3450 }
3451
3452 Command::Resolve(instance, try_count) => self.exec_command_resolve(instance, try_count),
3453
3454 Command::GetMetrics(resp_s) => self.exec_command_get_metrics(resp_s),
3455
3456 Command::GetStatus(resp_s) => match resp_s.send(self.status.clone()) {
3457 Ok(()) => trace!("Sent status to the client"),
3458 Err(e) => debug!("Failed to send status: {}", e),
3459 },
3460
3461 Command::Monitor(resp_s) => {
3462 self.monitors.push(resp_s);
3463 }
3464
3465 Command::SetOption(daemon_opt) => {
3466 self.process_set_option(daemon_opt);
3467 }
3468
3469 Command::GetOption(resp_s) => {
3470 let val = DaemonOptionVal {
3471 _service_name_len_max: self.service_name_len_max,
3472 ip_check_interval: self.ip_check_interval,
3473 };
3474 if let Err(e) = resp_s.send(val) {
3475 debug!("Failed to send options: {}", e);
3476 }
3477 }
3478
3479 Command::Verify(instance_fullname, timeout) => {
3480 self.exec_command_verify(instance_fullname, timeout, repeating);
3481 }
3482
3483 Command::InvalidIntfAddrs(invalid_intf_addrs) => {
3484 for intf_addr in invalid_intf_addrs {
3485 self.del_interface_addr(&intf_addr);
3486 }
3487
3488 self.check_ip_changes();
3489 }
3490
3491 _ => {
3492 debug!("unexpected command: {:?}", &command);
3493 }
3494 }
3495 }
3496
3497 fn exec_command_get_metrics(&mut self, resp_s: Sender<HashMap<String, i64>>) {
3498 self.set_counter(Counter::CachedPTR, self.cache.ptr_count() as i64);
3499 self.set_counter(Counter::CachedSRV, self.cache.srv_count() as i64);
3500 self.set_counter(Counter::CachedAddr, self.cache.addr_count() as i64);
3501 self.set_counter(Counter::CachedTxt, self.cache.txt_count() as i64);
3502 self.set_counter(Counter::CachedNSec, self.cache.nsec_count() as i64);
3503 self.set_counter(Counter::CachedSubtype, self.cache.subtype_count() as i64);
3504 self.set_counter(Counter::Timer, self.timers.len() as i64);
3505
3506 let dns_registry_probe_count: usize = self
3507 .dns_registry_map
3508 .values()
3509 .map(|r| r.probing.len())
3510 .sum();
3511 self.set_counter(Counter::DnsRegistryProbe, dns_registry_probe_count as i64);
3512
3513 let dns_registry_active_count: usize = self
3514 .dns_registry_map
3515 .values()
3516 .map(|r| r.active.values().map(|a| a.len()).sum::<usize>())
3517 .sum();
3518 self.set_counter(Counter::DnsRegistryActive, dns_registry_active_count as i64);
3519
3520 let dns_registry_timer_count: usize = self
3521 .dns_registry_map
3522 .values()
3523 .map(|r| r.new_timers.len())
3524 .sum();
3525 self.set_counter(Counter::DnsRegistryTimer, dns_registry_timer_count as i64);
3526
3527 let dns_registry_name_change_count: usize = self
3528 .dns_registry_map
3529 .values()
3530 .map(|r| r.name_changes.len())
3531 .sum();
3532 self.set_counter(
3533 Counter::DnsRegistryNameChange,
3534 dns_registry_name_change_count as i64,
3535 );
3536
3537 if let Err(e) = resp_s.send(self.counters.clone()) {
3539 debug!("Failed to send metrics: {}", e);
3540 }
3541 }
3542
3543 fn exec_command_browse(
3544 &mut self,
3545 repeating: bool,
3546 ty: String,
3547 next_delay: u32,
3548 cache_only: bool,
3549 listener: Sender<ServiceEvent>,
3550 ) {
3551 let pretty_addrs: Vec<String> = self
3552 .my_intfs
3553 .iter()
3554 .map(|(if_index, itf)| format!("{} ({if_index})", itf.name))
3555 .collect();
3556
3557 if let Err(e) = listener.send(ServiceEvent::SearchStarted(format!(
3558 "{ty} on {} interfaces [{}]",
3559 pretty_addrs.len(),
3560 pretty_addrs.join(", ")
3561 ))) {
3562 debug!(
3563 "Failed to send SearchStarted({})(repeating:{}): {}",
3564 &ty, repeating, e
3565 );
3566 return;
3567 }
3568
3569 let now = current_time_millis();
3570 if !repeating {
3571 self.service_queriers.insert(ty.clone(), listener.clone());
3575
3576 self.query_cache_for_service(&ty, &listener, now);
3578 }
3579
3580 if cache_only {
3581 match listener.send(ServiceEvent::SearchStopped(ty.clone())) {
3583 Ok(()) => debug!("SearchStopped sent for {}", &ty),
3584 Err(e) => debug!("Failed to send SearchStopped: {}", e),
3585 }
3586 return;
3587 }
3588
3589 self.send_query(&ty, RRType::PTR);
3590 self.increase_counter(Counter::Browse, 1);
3591
3592 let next_time = now + (next_delay * 1000) as u64;
3593 let max_delay = 60 * 60;
3594 let delay = cmp::min(next_delay * 2, max_delay);
3595 self.add_retransmission(next_time, Command::Browse(ty, delay, cache_only, listener));
3596 }
3597
3598 fn exec_command_resolve_hostname(
3599 &mut self,
3600 repeating: bool,
3601 hostname: String,
3602 next_delay: u32,
3603 listener: Sender<HostnameResolutionEvent>,
3604 timeout: Option<u64>,
3605 ) {
3606 let addr_list: Vec<_> = self.my_intfs.iter().collect();
3607 if let Err(e) = listener.send(HostnameResolutionEvent::SearchStarted(format!(
3608 "{} on addrs {:?}",
3609 &hostname, &addr_list
3610 ))) {
3611 debug!(
3612 "Failed to send ResolveStarted({})(repeating:{}): {}",
3613 &hostname, repeating, e
3614 );
3615 return;
3616 }
3617 if !repeating {
3618 self.add_hostname_resolver(hostname.to_owned(), listener.clone(), timeout);
3619 self.query_cache_for_hostname(&hostname, listener.clone());
3621 }
3622
3623 self.send_query_vec(&[(&hostname, RRType::A), (&hostname, RRType::AAAA)]);
3624 self.increase_counter(Counter::ResolveHostname, 1);
3625
3626 let now = current_time_millis();
3627 let next_time = now + u64::from(next_delay) * 1000;
3628 let max_delay = 60 * 60;
3629 let delay = cmp::min(next_delay * 2, max_delay);
3630
3631 if self
3633 .hostname_resolvers
3634 .get(&hostname)
3635 .and_then(|(_sender, timeout)| *timeout)
3636 .map(|timeout| next_time < timeout)
3637 .unwrap_or(true)
3638 {
3639 self.add_retransmission(
3640 next_time,
3641 Command::ResolveHostname(hostname, delay, listener, None),
3642 );
3643 }
3644 }
3645
3646 fn exec_command_resolve(&mut self, instance: String, try_count: u16) {
3647 let pending_query = self.query_unresolved(&instance);
3648 let max_try = 3;
3649 if pending_query && try_count < max_try {
3650 let next_time = current_time_millis() + RESOLVE_WAIT_IN_MILLIS;
3653 self.add_retransmission(next_time, Command::Resolve(instance, try_count + 1));
3654 }
3655 }
3656
3657 fn exec_command_unregister(
3658 &mut self,
3659 repeating: bool,
3660 fullname: String,
3661 resp_s: Sender<UnregisterStatus>,
3662 ) {
3663 let response = match self.my_services.remove_entry(&fullname) {
3664 None => {
3665 debug!("unregister: cannot find such service {}", &fullname);
3666 UnregisterStatus::NotFound
3667 }
3668 Some((_k, info)) => {
3669 let mut timers = Vec::new();
3670
3671 for (if_index, intf) in self.my_intfs.iter() {
3672 if let Some(sock) = self.ipv4_sock.as_ref() {
3673 let packet = self.unregister_service(&info, intf, &sock.pktinfo);
3674 if !repeating && !packet.is_empty() {
3676 let next_time = current_time_millis() + 120;
3677 self.retransmissions.push(ReRun {
3678 next_time,
3679 command: Command::UnregisterResend(packet, *if_index, true),
3680 });
3681 timers.push(next_time);
3682 }
3683 }
3684
3685 if let Some(sock) = self.ipv6_sock.as_ref() {
3687 let packet = self.unregister_service(&info, intf, &sock.pktinfo);
3688 if !repeating && !packet.is_empty() {
3689 let next_time = current_time_millis() + 120;
3690 self.retransmissions.push(ReRun {
3691 next_time,
3692 command: Command::UnregisterResend(packet, *if_index, false),
3693 });
3694 timers.push(next_time);
3695 }
3696 }
3697 }
3698
3699 for t in timers {
3700 self.add_timer(t);
3701 }
3702
3703 self.increase_counter(Counter::Unregister, 1);
3704 UnregisterStatus::OK
3705 }
3706 };
3707 if let Err(e) = resp_s.send(response) {
3708 debug!("unregister: failed to send response: {}", e);
3709 }
3710 }
3711
3712 fn exec_command_unregister_resend(&mut self, packet: Vec<u8>, if_index: u32, is_ipv4: bool) {
3713 let Some(intf) = self.my_intfs.get(&if_index) else {
3714 return;
3715 };
3716 let sock_opt = if is_ipv4 {
3717 &self.ipv4_sock
3718 } else {
3719 &self.ipv6_sock
3720 };
3721 let Some(sock) = sock_opt else {
3722 return;
3723 };
3724
3725 let if_addr = if is_ipv4 {
3726 match intf.next_ifaddr_v4() {
3727 Some(addr) => addr,
3728 None => return,
3729 }
3730 } else {
3731 match intf.next_ifaddr_v6() {
3732 Some(addr) => addr,
3733 None => return,
3734 }
3735 };
3736
3737 debug!("UnregisterResend from {:?}", if_addr);
3738 multicast_on_intf(
3739 &packet[..],
3740 &intf.name,
3741 intf.index,
3742 if_addr,
3743 &sock.pktinfo,
3744 self.port,
3745 );
3746
3747 self.increase_counter(Counter::UnregisterResend, 1);
3748 }
3749
3750 fn exec_command_stop_browse(&mut self, ty_domain: String) {
3751 match self.service_queriers.remove_entry(&ty_domain) {
3752 None => debug!("StopBrowse: cannot find querier for {}", &ty_domain),
3753 Some((ty, sender)) => {
3754 trace!("StopBrowse: removed queryer for {}", &ty);
3756 let mut i = 0;
3757 while i < self.retransmissions.len() {
3758 if let Command::Browse(t, _, _, _) = &self.retransmissions[i].command {
3759 if t == &ty {
3760 self.retransmissions.remove(i);
3761 trace!("StopBrowse: removed retransmission for {}", &ty);
3762 continue;
3763 }
3764 }
3765 i += 1;
3766 }
3767
3768 self.cache.remove_service_type(&ty_domain);
3770
3771 match sender.send(ServiceEvent::SearchStopped(ty_domain)) {
3773 Ok(()) => trace!("Sent SearchStopped to the listener"),
3774 Err(e) => debug!("Failed to send SearchStopped: {}", e),
3775 }
3776 }
3777 }
3778 }
3779
3780 fn exec_command_stop_resolve_hostname(&mut self, hostname: String) {
3781 if let Some((host, (sender, _timeout))) = self.hostname_resolvers.remove_entry(&hostname) {
3782 trace!("StopResolve: removed queryer for {}", &host);
3784 let mut i = 0;
3785 while i < self.retransmissions.len() {
3786 if let Command::Resolve(t, _) = &self.retransmissions[i].command {
3787 if t == &host {
3788 self.retransmissions.remove(i);
3789 trace!("StopResolve: removed retransmission for {}", &host);
3790 continue;
3791 }
3792 }
3793 i += 1;
3794 }
3795
3796 match sender.send(HostnameResolutionEvent::SearchStopped(hostname)) {
3798 Ok(()) => trace!("Sent SearchStopped to the listener"),
3799 Err(e) => debug!("Failed to send SearchStopped: {}", e),
3800 }
3801 }
3802 }
3803
3804 fn exec_command_register_resend(&mut self, fullname: String, if_index: u32) -> MyResult<()> {
3805 let Some(info) = self.my_services.get_mut(&fullname) else {
3806 trace!("announce: cannot find such service {}", &fullname);
3807 return Ok(());
3808 };
3809
3810 let Some(dns_registry) = self.dns_registry_map.get_mut(&if_index) else {
3811 return Ok(());
3812 };
3813
3814 let Some(intf) = self.my_intfs.get(&if_index) else {
3815 return Ok(());
3816 };
3817
3818 let announced_v4 = if let Some(sock) = self.ipv4_sock.as_ref() {
3819 announce_service_on_intf(dns_registry, info, intf, &sock.pktinfo, self.port)?
3820 } else {
3821 false
3822 };
3823 let announced_v6 = if let Some(sock) = self.ipv6_sock.as_ref() {
3824 announce_service_on_intf(dns_registry, info, intf, &sock.pktinfo, self.port)?
3825 } else {
3826 false
3827 };
3828
3829 if announced_v4 || announced_v6 {
3830 let hostname = dns_registry.resolve_name(info.get_hostname());
3831 let service_name = dns_registry.resolve_name(&fullname).to_string();
3832
3833 debug!("resend: announce service {service_name} on {}", intf.name);
3834
3835 notify_monitors(
3836 &mut self.monitors,
3837 DaemonEvent::Announce(service_name, format!("{}:{}", hostname, &intf.name)),
3838 );
3839 info.set_status(if_index, ServiceStatus::Announced);
3840 } else {
3841 debug!("register-resend should not fail");
3842 }
3843
3844 self.increase_counter(Counter::RegisterResend, 1);
3845 Ok(())
3846 }
3847
3848 fn exec_command_verify(&mut self, instance: String, timeout: Duration, repeating: bool) {
3849 let now = current_time_millis();
3859 let expire_at = if repeating {
3860 None
3861 } else {
3862 Some(now + timeout.as_millis() as u64)
3863 };
3864
3865 let record_vec = self.cache.service_verify_queries(&instance, expire_at);
3867
3868 if !record_vec.is_empty() {
3869 let query_vec: Vec<(&str, RRType)> = record_vec
3870 .iter()
3871 .map(|(record, rr_type)| (record.as_str(), *rr_type))
3872 .collect();
3873 self.send_query_vec(&query_vec);
3874
3875 if let Some(new_expire) = expire_at {
3876 self.add_timer(new_expire); self.add_retransmission(now + 1000, Command::Verify(instance, timeout));
3880 }
3881 }
3882 }
3883
3884 fn refresh_active_services(&mut self) {
3886 let mut query_ptr_count = 0;
3887 let mut query_srv_count = 0;
3888 let mut new_timers = HashSet::new();
3889 let mut query_addr_count = 0;
3890
3891 for (ty_domain, _sender) in self.service_queriers.iter() {
3892 let refreshed_timers = self.cache.refresh_due_ptr(ty_domain);
3893 if !refreshed_timers.is_empty() {
3894 trace!("sending refresh query for PTR: {}", ty_domain);
3895 self.send_query(ty_domain, RRType::PTR);
3896 query_ptr_count += 1;
3897 new_timers.extend(refreshed_timers);
3898 }
3899
3900 let (instances, timers) = self.cache.refresh_due_srv_txt(ty_domain);
3901 for (instance, types) in instances {
3902 trace!("sending refresh query for: {}", &instance);
3903 let query_vec = types
3904 .into_iter()
3905 .map(|ty| (instance.as_str(), ty))
3906 .collect::<Vec<_>>();
3907 self.send_query_vec(&query_vec);
3908 query_srv_count += 1;
3909 }
3910 new_timers.extend(timers);
3911 let (hostnames, timers) = self.cache.refresh_due_hosts(ty_domain);
3912 for hostname in hostnames.iter() {
3913 trace!("sending refresh queries for A and AAAA: {}", hostname);
3914 self.send_query_vec(&[(hostname, RRType::A), (hostname, RRType::AAAA)]);
3915 query_addr_count += 2;
3916 }
3917 new_timers.extend(timers);
3918 }
3919
3920 for timer in new_timers {
3921 self.add_timer(timer);
3922 }
3923
3924 self.increase_counter(Counter::CacheRefreshPTR, query_ptr_count);
3925 self.increase_counter(Counter::CacheRefreshSrvTxt, query_srv_count);
3926 self.increase_counter(Counter::CacheRefreshAddr, query_addr_count);
3927 }
3928}
3929
3930fn add_answer_of_service(
3932 out: &mut DnsOutgoing,
3933 msg: &DnsIncoming,
3934 entry_name: &str,
3935 service: &ServiceInfo,
3936 qtype: RRType,
3937 intf_addrs: Vec<IpAddr>,
3938) {
3939 if qtype == RRType::SRV || qtype == RRType::ANY {
3940 out.add_answer(
3941 msg,
3942 DnsSrv::new(
3943 entry_name,
3944 CLASS_IN | CLASS_CACHE_FLUSH,
3945 service.get_host_ttl(),
3946 service.get_priority(),
3947 service.get_weight(),
3948 service.get_port(),
3949 service.get_hostname().to_string(),
3950 ),
3951 );
3952 }
3953
3954 if qtype == RRType::TXT || qtype == RRType::ANY {
3955 out.add_answer(
3956 msg,
3957 DnsTxt::new(
3958 entry_name,
3959 CLASS_IN | CLASS_CACHE_FLUSH,
3960 service.get_other_ttl(),
3961 service.generate_txt(),
3962 ),
3963 );
3964 }
3965
3966 if qtype == RRType::SRV {
3967 for address in intf_addrs {
3968 out.add_additional_answer(DnsAddress::new(
3969 service.get_hostname(),
3970 ip_address_rr_type(&address),
3971 CLASS_IN | CLASS_CACHE_FLUSH,
3972 service.get_host_ttl(),
3973 address,
3974 InterfaceId::default(),
3975 ));
3976 }
3977 }
3978}
3979
3980#[derive(Clone, Debug)]
3983#[non_exhaustive]
3984pub enum ServiceEvent {
3985 SearchStarted(String),
3987
3988 ServiceFound(String, String),
3990
3991 ServiceResolved(Box<ResolvedService>),
3993
3994 ServiceRemoved(String, String),
3996
3997 SearchStopped(String),
3999}
4000
4001#[derive(Clone, Debug)]
4004#[non_exhaustive]
4005pub enum HostnameResolutionEvent {
4006 SearchStarted(String),
4008 AddressesFound(String, HashSet<ScopedIp>),
4010 AddressesRemoved(String, HashSet<ScopedIp>),
4012 SearchTimeout(String),
4014 SearchStopped(String),
4016}
4017
4018#[derive(Clone, Debug)]
4021#[non_exhaustive]
4022pub enum DaemonEvent {
4023 Announce(String, String),
4025
4026 Error(Error),
4028
4029 IpAdd(IpAddr),
4031
4032 IpDel(IpAddr),
4034
4035 NameChange(DnsNameChange),
4038
4039 Respond(String),
4041}
4042
4043#[derive(Clone, Debug)]
4046pub struct DnsNameChange {
4047 pub original: String,
4049
4050 pub new_name: String,
4060
4061 pub rr_type: RRType,
4063
4064 pub intf_name: String,
4066}
4067
/// Commands carried to the daemon; each variant bundles the arguments of one request.
///
/// Variants holding a `Sender<_>` carry the channel on which the result or the
/// follow-up events are reported.
/// NOTE(review): the meaning of the bare `u32`/`u16`/`bool` payloads is not
/// visible in this part of the file; the notes below are assumptions to confirm.
#[derive(Debug)]
enum Command {
    /// Browse for a service type; events go to the given `ServiceEvent` sender.
    /// NOTE(review): the `u32` is presumably a retry delay and the `bool` a cache-only flag — confirm.
    Browse(String, u32, bool, Sender<ServiceEvent>),

    /// Resolve a hostname; events go to the given sender.
    /// NOTE(review): the `Option<u64>` is presumably a timeout — confirm.
    ResolveHostname(String, u32, Sender<HostnameResolutionEvent>, Option<u64>),

    /// Register a service.
    Register(Box<ServiceInfo>),

    /// Unregister a service by name; the outcome is reported as an `UnregisterStatus`.
    Unregister(String, Sender<UnregisterStatus>),

    /// NOTE(review): presumably re-sends the announcement of a registered service — confirm.
    RegisterResend(String, u32),

    /// NOTE(review): presumably re-sends a previously built unregister packet (the `Vec<u8>`) — confirm.
    UnregisterResend(Vec<u8>, u32, bool),

    /// Stop browsing a service type.
    StopBrowse(String),

    /// Stop resolving a hostname.
    StopResolveHostname(String),

    /// NOTE(review): presumably resolves a service instance; the `u16` meaning is not visible here.
    Resolve(String, u16),

    /// Request the daemon metrics.
    GetMetrics(Sender<Metrics>),

    /// Request the daemon status.
    GetStatus(Sender<DaemonStatus>),

    /// Register a monitor that receives `DaemonEvent`s.
    Monitor(Sender<DaemonEvent>),

    /// Change a daemon option.
    SetOption(DaemonOption),

    /// Request the current daemon option values.
    GetOption(Sender<DaemonOptionVal>),

    /// NOTE(review): presumably verifies a resource (displayed as "Command VerifyResource")
    /// within the given duration — confirm.
    Verify(String, Duration),

    /// Report interface addresses that turned out to be invalid.
    InvalidIntfAddrs(HashSet<Interface>),

    /// Ask the daemon to exit; the resulting status is sent back.
    Exit(Sender<DaemonStatus>),
}
4123
4124impl fmt::Display for Command {
4125 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
4126 match self {
4127 Self::Browse(_, _, _, _) => write!(f, "Command Browse"),
4128 Self::ResolveHostname(_, _, _, _) => write!(f, "Command ResolveHostname"),
4129 Self::Exit(_) => write!(f, "Command Exit"),
4130 Self::GetStatus(_) => write!(f, "Command GetStatus"),
4131 Self::GetMetrics(_) => write!(f, "Command GetMetrics"),
4132 Self::Monitor(_) => write!(f, "Command Monitor"),
4133 Self::Register(_) => write!(f, "Command Register"),
4134 Self::RegisterResend(_, _) => write!(f, "Command RegisterResend"),
4135 Self::SetOption(_) => write!(f, "Command SetOption"),
4136 Self::GetOption(_) => write!(f, "Command GetOption"),
4137 Self::StopBrowse(_) => write!(f, "Command StopBrowse"),
4138 Self::StopResolveHostname(_) => write!(f, "Command StopResolveHostname"),
4139 Self::Unregister(_, _) => write!(f, "Command Unregister"),
4140 Self::UnregisterResend(_, _, _) => write!(f, "Command UnregisterResend"),
4141 Self::Resolve(_, _) => write!(f, "Command Resolve"),
4142 Self::Verify(_, _) => write!(f, "Command VerifyResource"),
4143 Self::InvalidIntfAddrs(_) => write!(f, "Command InvalidIntfAddrs"),
4144 }
4145 }
4146}
4147
/// Snapshot of daemon option values, sent back in reply to `Command::GetOption`.
struct DaemonOptionVal {
    /// Maximum service name length. The leading underscore marks the field as
    /// intentionally unused by readers in this crate.
    _service_name_len_max: u8,
    /// IP check interval.
    /// NOTE(review): the unit is not visible here (presumably seconds, going by
    /// `IP_CHECK_INTERVAL_IN_SECS_DEFAULT`) — confirm.
    ip_check_interval: u64,
}
4152
/// Options that can be changed on a running daemon via `Command::SetOption`.
#[derive(Debug)]
enum DaemonOption {
    /// Set the maximum accepted service name length.
    ServiceNameLenMax(u8),
    /// Set the interval of the IP check.
    /// NOTE(review): unit presumably seconds — confirm.
    IpCheckInterval(u64),
    /// Enable the listed interface kinds.
    EnableInterface(Vec<IfKind>),
    /// Disable the listed interface kinds.
    DisableInterface(Vec<IfKind>),
    /// Turn IPv4 multicast loopback on or off.
    MulticastLoopV4(bool),
    /// Turn IPv6 multicast loopback on or off.
    MulticastLoopV6(bool),
    /// NOTE(review): presumably controls whether unsolicited responses are accepted — confirm.
    AcceptUnsolicited(bool),
    /// NOTE(review): presumably controls whether Apple P2P interfaces
    /// (see `is_apple_p2p_by_name`) are included — confirm.
    IncludeAppleP2P(bool),
    /// Test-only option taking an interface name.
    /// NOTE(review): presumably simulates that interface going down — confirm.
    #[cfg(test)]
    TestDownInterface(String),
    /// Test-only option taking an interface name.
    /// NOTE(review): presumably simulates that interface coming back up — confirm.
    #[cfg(test)]
    TestUpInterface(String),
}
4168
/// Byte length of the domain suffix `._tcp.local.`; the alternative suffix
/// `._udp.local.` accepted by `check_domain_suffix` has the same length.
const DOMAIN_LEN: usize = "._tcp.local.".len();
4171
4172fn check_service_name_length(ty_domain: &str, limit: u8) -> Result<()> {
4174 if ty_domain.len() <= DOMAIN_LEN + 1 {
4175 return Err(e_fmt!("Service type name cannot be empty: {}", ty_domain));
4177 }
4178
4179 let service_name_len = ty_domain.len() - DOMAIN_LEN - 1; if service_name_len > limit as usize {
4181 return Err(e_fmt!("Service name length must be <= {} bytes", limit));
4182 }
4183 Ok(())
4184}
4185
4186fn check_domain_suffix(name: &str) -> Result<()> {
4188 if !(name.ends_with("._tcp.local.") || name.ends_with("._udp.local.")) {
4189 return Err(e_fmt!(
4190 "mDNS service {} must end with '._tcp.local.' or '._udp.local.'",
4191 name
4192 ));
4193 }
4194
4195 Ok(())
4196}
4197
4198fn check_service_name(fullname: &str) -> Result<()> {
4206 check_domain_suffix(fullname)?;
4207
4208 let remaining: Vec<&str> = fullname[..fullname.len() - DOMAIN_LEN].split('.').collect();
4209 let name = remaining.last().ok_or_else(|| e_fmt!("No service name"))?;
4210
4211 if &name[0..1] != "_" {
4212 return Err(e_fmt!("Service name must start with '_'"));
4213 }
4214
4215 let name = &name[1..];
4216
4217 if name.contains("--") {
4218 return Err(e_fmt!("Service name must not contain '--'"));
4219 }
4220
4221 if name.starts_with('-') || name.ends_with('-') {
4222 return Err(e_fmt!("Service name (%s) may not start or end with '-'"));
4223 }
4224
4225 let ascii_count = name.chars().filter(|c| c.is_ascii_alphabetic()).count();
4226 if ascii_count < 1 {
4227 return Err(e_fmt!(
4228 "Service name must contain at least one letter (eg: 'A-Za-z')"
4229 ));
4230 }
4231
4232 Ok(())
4233}
4234
4235fn check_hostname(hostname: &str) -> Result<()> {
4237 if !hostname.ends_with(".local.") {
4238 return Err(e_fmt!("Hostname must end with '.local.': {hostname}"));
4239 }
4240
4241 if hostname == ".local." {
4242 return Err(e_fmt!(
4243 "The part of the hostname before '.local.' cannot be empty"
4244 ));
4245 }
4246
4247 if hostname.len() > 255 {
4248 return Err(e_fmt!("Hostname length must be <= 255 bytes"));
4249 }
4250
4251 Ok(())
4252}
4253
4254fn call_service_listener(
4255 listeners_map: &HashMap<String, Sender<ServiceEvent>>,
4256 ty_domain: &str,
4257 event: ServiceEvent,
4258) {
4259 if let Some(listener) = listeners_map.get(ty_domain) {
4260 match listener.send(event) {
4261 Ok(()) => trace!("Sent event to listener successfully"),
4262 Err(e) => debug!("Failed to send event: {}", e),
4263 }
4264 }
4265}
4266
4267fn call_hostname_resolution_listener(
4268 listeners_map: &HashMap<String, (Sender<HostnameResolutionEvent>, Option<u64>)>,
4269 hostname: &str,
4270 event: HostnameResolutionEvent,
4271) {
4272 let hostname_lower = hostname.to_lowercase();
4273 if let Some(listener) = listeners_map.get(&hostname_lower).map(|(l, _)| l) {
4274 match listener.send(event) {
4275 Ok(()) => trace!("Sent event to listener successfully"),
4276 Err(e) => debug!("Failed to send event: {}", e),
4277 }
4278 }
4279}
4280
/// Returns the usable IP interfaces of this host, excluding Apple P2P
/// interfaces (`with_apple_p2p` is passed as `false`).
///
/// `with_loopback` controls whether loopback interfaces are kept.
fn my_ip_interfaces(with_loopback: bool) -> Vec<Interface> {
    my_ip_interfaces_inner(with_loopback, false)
}
4287
4288fn my_ip_interfaces_inner(with_loopback: bool, with_apple_p2p: bool) -> Vec<Interface> {
4289 if_addrs::get_if_addrs()
4290 .unwrap_or_default()
4291 .into_iter()
4292 .filter(|i| {
4293 i.is_oper_up()
4294 && !i.is_p2p()
4295 && (!i.is_loopback() || with_loopback)
4296 && (with_apple_p2p || !is_apple_p2p_by_name(&i.name))
4297 })
4298 .collect()
4299}
4300
/// Returns `true` when the interface `name` starts with one of the prefixes
/// treated as Apple P2P interfaces (`awdl`, `llw`).
fn is_apple_p2p_by_name(name: &str) -> bool {
    const P2P_PREFIXES: [&str; 2] = ["awdl", "llw"];

    for prefix in P2P_PREFIXES {
        if name.starts_with(prefix) {
            return true;
        }
    }
    false
}
4307
4308fn send_dns_outgoing(
4311 out: &DnsOutgoing,
4312 my_intf: &MyIntf,
4313 sock: &PktInfoUdpSocket,
4314 port: u16,
4315 source: Option<&IfAddr>,
4316 unicast_dest: Option<SocketAddr>,
4317) -> MyResult<Vec<Vec<u8>>> {
4318 let if_name = &my_intf.name;
4319
4320 let if_addr = match source {
4321 Some(addr) => addr,
4322 None => {
4323 if sock.domain() == Domain::IPV4 {
4324 match my_intf.next_ifaddr_v4() {
4325 Some(addr) => addr,
4326 None => return Ok(vec![]),
4327 }
4328 } else {
4329 match my_intf.next_ifaddr_v6() {
4330 Some(addr) => addr,
4331 None => return Ok(vec![]),
4332 }
4333 }
4334 }
4335 };
4336
4337 send_dns_outgoing_impl(
4338 out,
4339 if_name,
4340 my_intf.index,
4341 if_addr,
4342 sock,
4343 port,
4344 unicast_dest,
4345 )
4346}
4347
4348fn send_dns_outgoing_impl(
4350 out: &DnsOutgoing,
4351 if_name: &str,
4352 if_index: u32,
4353 if_addr: &IfAddr,
4354 sock: &PktInfoUdpSocket,
4355 port: u16,
4356 unicast_dest: Option<SocketAddr>,
4357) -> MyResult<Vec<Vec<u8>>> {
4358 let qtype = if out.is_query() {
4359 "query"
4360 } else {
4361 if out.answers_count() == 0 && out.additionals().is_empty() {
4362 return Ok(vec![]); }
4364 "response"
4365 };
4366 trace!(
4367 "send {}: {} questions {} answers {} authorities {} additional",
4368 qtype,
4369 out.questions().len(),
4370 out.answers_count(),
4371 out.authorities().len(),
4372 out.additionals().len()
4373 );
4374
4375 match if_addr.ip() {
4376 IpAddr::V4(ipv4) => {
4377 if let Err(e) = sock.set_multicast_if_v4(&ipv4) {
4378 debug!(
4379 "send_dns_outgoing: failed to set multicast interface for IPv4 {}: {}",
4380 ipv4, e
4381 );
4382 if e.kind() == std::io::ErrorKind::AddrNotAvailable {
4384 let intf_addr = Interface {
4385 name: if_name.to_string(),
4386 addr: if_addr.clone(),
4387 index: Some(if_index),
4388 oper_status: if_addrs::IfOperStatus::Down,
4389 is_p2p: false,
4390 #[cfg(windows)]
4391 adapter_name: String::new(),
4392 };
4393 return Err(InternalError::IntfAddrInvalid(intf_addr));
4394 }
4395 return Ok(vec![]); }
4397 }
4398 IpAddr::V6(ipv6) => {
4399 if let Err(e) = sock.set_multicast_if_v6(if_index) {
4400 debug!(
4401 "send_dns_outgoing: failed to set multicast interface for IPv6 {}: {}",
4402 ipv6, e
4403 );
4404 if e.kind() == std::io::ErrorKind::AddrNotAvailable {
4406 let intf_addr = Interface {
4407 name: if_name.to_string(),
4408 addr: if_addr.clone(),
4409 index: Some(if_index),
4410 oper_status: if_addrs::IfOperStatus::Down,
4411 is_p2p: false,
4412 #[cfg(windows)]
4413 adapter_name: String::new(),
4414 };
4415 return Err(InternalError::IntfAddrInvalid(intf_addr));
4416 }
4417 return Ok(vec![]); }
4419 }
4420 }
4421
4422 let packet_list = out.to_data_on_wire();
4423 for packet in packet_list.iter() {
4424 match unicast_dest {
4425 Some(dest) => unicast_on_intf(packet, if_name, dest, sock),
4426 None => multicast_on_intf(packet, if_name, if_index, if_addr, sock, port),
4427 }
4428 }
4429 Ok(packet_list)
4430}
4431
4432fn unicast_on_intf(packet: &[u8], if_name: &str, dest: SocketAddr, socket: &PktInfoUdpSocket) {
4435 if packet.len() > MAX_MSG_ABSOLUTE {
4436 debug!("Drop over-sized packet ({})", packet.len());
4437 return;
4438 }
4439
4440 let sock_addr = dest.into();
4441 match socket.send_to(packet, &sock_addr) {
4442 Ok(sz) => trace!(
4443 "sent unicast {} bytes on interface {} to {}",
4444 sz,
4445 if_name,
4446 dest
4447 ),
4448 Err(e) => trace!(
4449 "Failed to send unicast to {} via {:?}: {}",
4450 dest,
4451 &if_name,
4452 e
4453 ),
4454 }
4455}
4456
4457fn multicast_on_intf(
4459 packet: &[u8],
4460 if_name: &str,
4461 if_index: u32,
4462 if_addr: &IfAddr,
4463 socket: &PktInfoUdpSocket,
4464 port: u16,
4465) {
4466 if packet.len() > MAX_MSG_ABSOLUTE {
4467 debug!("Drop over-sized packet ({})", packet.len());
4468 return;
4469 }
4470
4471 let addr: SocketAddr = match if_addr {
4472 if_addrs::IfAddr::V4(_) => SocketAddrV4::new(GROUP_ADDR_V4, port).into(),
4473 if_addrs::IfAddr::V6(_) => {
4474 let mut sock = SocketAddrV6::new(GROUP_ADDR_V6, port, 0, 0);
4475 sock.set_scope_id(if_index); sock.into()
4477 }
4478 };
4479
4480 let sock_addr = addr.into();
4482 match socket.send_to(packet, &sock_addr) {
4483 Ok(sz) => trace!(
4484 "sent out {} bytes on interface {} (idx {}) addr {}",
4485 sz,
4486 if_name,
4487 if_index,
4488 if_addr.ip()
4489 ),
4490 Err(e) => trace!("Failed to send to {} via {:?}: {}", addr, &if_name, e),
4491 }
4492}
4493
/// Returns `true` when `name` consists of at least five dot-separated parts,
/// i.e. contains at least four `.` characters (empty parts count).
fn valid_instance_name(name: &str) -> bool {
    let dot_count = name.matches('.').count();
    dot_count >= 4
}
4500
4501fn notify_monitors(monitors: &mut Vec<Sender<DaemonEvent>>, event: DaemonEvent) {
4502 monitors.retain(|sender| {
4503 if let Err(e) = sender.try_send(event.clone()) {
4504 debug!("notify_monitors: try_send: {}", &e);
4505 if matches!(e, TrySendError::Disconnected(_)) {
4506 return false; }
4508 }
4509 true
4510 });
4511}
4512
/// Builds the announcement packet for service `info` on interface `intf`.
///
/// The packet is an authoritative response holding a PTR record for the
/// service type (plus one for the subtype, if any), the SRV and TXT records,
/// and one address record per service address valid on `intf` for the
/// requested IP version.
///
/// Returns `None` when the service has no address on `intf` for that IP
/// version, or when the service requires probing and at least one of its
/// SRV/TXT/address records is not yet reported done by
/// `dns_registry.is_probing_done`.
fn prepare_announce(
    info: &ServiceInfo,
    intf: &MyIntf,
    dns_registry: &mut DnsRegistry,
    is_ipv4: bool,
) -> Option<DnsOutgoing> {
    let intf_addrs = if is_ipv4 {
        info.get_addrs_on_my_intf_v4(intf)
    } else {
        info.get_addrs_on_my_intf_v6(intf)
    };

    if intf_addrs.is_empty() {
        debug!(
            "prepare_announce (ipv4: {is_ipv4}): no valid addrs on interface {}",
            &intf.name
        );
        return None;
    }

    // The registry decides which name the PTR records point to.
    // NOTE(review): presumably this is the renamed fullname after a conflict — confirm.
    let service_fullname = dns_registry.resolve_name(info.get_fullname());

    debug!(
        "prepare to announce service {service_fullname} on {:?}",
        &intf_addrs
    );

    // Counts records that still have to finish probing.
    let mut probing_count = 0;
    let mut out = DnsOutgoing::new(FLAGS_QR_RESPONSE | FLAGS_AA);
    // Timestamp handed to `is_probing_done`, offset by a random 0..250 ms.
    let create_time = current_time_millis() + fastrand::u64(0..250);

    // PTR records are added without the cache-flush bit and are never probed.
    out.add_answer_at_time(
        DnsPointer::new(
            info.get_type(),
            RRType::PTR,
            CLASS_IN,
            info.get_other_ttl(),
            service_fullname.to_string(),
        ),
        0,
    );

    if let Some(sub) = info.get_subtype() {
        trace!("Adding subdomain {}", sub);
        out.add_answer_at_time(
            DnsPointer::new(
                sub,
                RRType::PTR,
                CLASS_IN,
                info.get_other_ttl(),
                service_fullname.to_string(),
            ),
            0,
        );
    }

    // Host name the SRV record points to, as resolved by the registry.
    let hostname = dns_registry.resolve_name(info.get_hostname()).to_string();

    // SRV is created under the original fullname; a recorded name change is
    // attached to the record below.
    let mut srv = DnsSrv::new(
        info.get_fullname(),
        CLASS_IN | CLASS_CACHE_FLUSH,
        info.get_host_ttl(),
        info.get_priority(),
        info.get_weight(),
        info.get_port(),
        hostname,
    );

    if let Some(new_name) = dns_registry.name_changes.get(info.get_fullname()) {
        srv.get_record_mut().set_new_name(new_name.to_string());
    }

    if !info.requires_probe()
        || dns_registry.is_probing_done(&srv, info.get_fullname(), create_time)
    {
        out.add_answer_at_time(srv, 0);
    } else {
        probing_count += 1;
    }

    // TXT record: same naming and probing treatment as SRV.
    let mut txt = DnsTxt::new(
        info.get_fullname(),
        CLASS_IN | CLASS_CACHE_FLUSH,
        info.get_other_ttl(),
        info.generate_txt(),
    );

    if let Some(new_name) = dns_registry.name_changes.get(info.get_fullname()) {
        txt.get_record_mut().set_new_name(new_name.to_string());
    }

    if !info.requires_probe()
        || dns_registry.is_probing_done(&txt, info.get_fullname(), create_time)
    {
        out.add_answer_at_time(txt, 0);
    } else {
        probing_count += 1;
    }

    // Address records use the original hostname; a recorded name change for
    // the hostname is attached to each record.
    let hostname = info.get_hostname();
    for address in intf_addrs {
        let mut dns_addr = DnsAddress::new(
            hostname,
            ip_address_rr_type(&address),
            CLASS_IN | CLASS_CACHE_FLUSH,
            info.get_host_ttl(),
            address,
            intf.into(),
        );

        if let Some(new_name) = dns_registry.name_changes.get(hostname) {
            dns_addr.get_record_mut().set_new_name(new_name.to_string());
        }

        if !info.requires_probe()
            || dns_registry.is_probing_done(&dns_addr, info.get_fullname(), create_time)
        {
            out.add_answer_at_time(dns_addr, 0);
        } else {
            probing_count += 1;
        }
    }

    // Do not announce while any record is still being probed.
    if probing_count > 0 {
        return None;
    }

    Some(out)
}
4650
4651fn announce_service_on_intf(
4654 dns_registry: &mut DnsRegistry,
4655 info: &ServiceInfo,
4656 intf: &MyIntf,
4657 sock: &PktInfoUdpSocket,
4658 port: u16,
4659) -> MyResult<bool> {
4660 let is_ipv4 = sock.domain() == Domain::IPV4;
4661 if let Some(out) = prepare_announce(info, intf, dns_registry, is_ipv4) {
4662 let _ = send_dns_outgoing(&out, intf, sock, port, None, None)?;
4663 return Ok(true);
4664 }
4665
4666 Ok(false)
4667}
4668
/// Produces the next candidate name for `original` after a naming conflict.
///
/// Only the first dot-separated label is changed:
/// - a label ending in ` (N)`, with `N` parsing as `u32`, becomes ` (N+1)`;
/// - any other label gets ` (2)` appended.
///
/// If `N + 1` would overflow `u32`, the label is treated like any other label
/// and ` (2)` is appended, instead of panicking or wrapping around to `(0)`.
///
/// Examples: `foo.local.` -> `foo (2).local.`, `foo (2).local.` -> `foo (3).local.`.
fn name_change(original: &str) -> String {
    let mut parts: Vec<_> = original.split('.').collect();
    let Some(first_part) = parts.get_mut(0) else {
        return format!("{original} (2)");
    };

    // Recognise a trailing " (N)" and bump N; `None` when the label has no
    // such suffix or the increment would overflow.
    let bumped = first_part
        .strip_suffix(')')
        .and_then(|head| head.rsplit_once(" ("))
        .and_then(|(base_name, digits)| {
            let next = digits.parse::<u32>().ok()?.checked_add(1)?;
            Some(format!("{base_name} ({next})"))
        });

    let new_name = bumped.unwrap_or_else(|| format!("{first_part} (2)"));

    *first_part = &new_name;
    parts.join(".")
}
4704
/// Produces the next candidate hostname for `original` after a naming conflict.
///
/// Only the first dot-separated label is changed:
/// - a label ending in `-N`, with `N` parsing as `u32`, becomes `-(N+1)`;
/// - any other label gets `-2` appended.
///
/// If `N + 1` would overflow `u32`, the label is treated like any other label
/// and `-2` is appended, instead of panicking or wrapping around to `-0`.
///
/// Examples: `foo.local.` -> `foo-2.local.`, `foo-2.local.` -> `foo-3.local.`.
fn hostname_change(original: &str) -> String {
    let mut parts: Vec<_> = original.split('.').collect();
    let Some(first_part) = parts.get_mut(0) else {
        return format!("{original}-2");
    };

    // Recognise a trailing "-N" and bump N; `None` when the label has no such
    // suffix or the increment would overflow.
    let bumped = first_part.rsplit_once('-').and_then(|(base_name, digits)| {
        let next = digits.parse::<u32>().ok()?.checked_add(1)?;
        Some(format!("{base_name}-{next}"))
    });

    let new_name = bumped.unwrap_or_else(|| format!("{first_part}-2"));

    *first_part = &new_name;
    parts.join(".")
}
4732
4733fn check_probing(
4736 dns_registry: &mut DnsRegistry,
4737 timers: &mut BinaryHeap<Reverse<u64>>,
4738 now: u64,
4739) -> (DnsOutgoing, Vec<String>) {
4740 let mut expired_probes = Vec::new();
4741 let mut out = DnsOutgoing::new(FLAGS_QR_QUERY);
4742
4743 for (name, probe) in dns_registry.probing.iter_mut() {
4744 if now >= probe.next_send {
4745 if probe.expired(now) {
4746 expired_probes.push(name.clone());
4748 } else {
4749 out.add_question(name, RRType::ANY);
4750
4751 for record in probe.records.iter() {
4759 out.add_authority(record.clone());
4760 }
4761
4762 probe.update_next_send(now);
4763
4764 timers.push(Reverse(probe.next_send));
4766 }
4767 }
4768 }
4769
4770 (out, expired_probes)
4771}
4772
4773fn handle_expired_probes(
4778 expired_probes: Vec<String>,
4779 intf_name: &str,
4780 dns_registry: &mut DnsRegistry,
4781 monitors: &mut Vec<Sender<DaemonEvent>>,
4782) -> HashSet<String> {
4783 let mut waiting_services = HashSet::new();
4784
4785 for name in expired_probes {
4786 let Some(probe) = dns_registry.probing.remove(&name) else {
4787 continue;
4788 };
4789
4790 for record in probe.records.iter() {
4792 if let Some(new_name) = record.get_record().get_new_name() {
4793 dns_registry
4794 .name_changes
4795 .insert(name.clone(), new_name.to_string());
4796
4797 let event = DnsNameChange {
4798 original: record.get_record().get_original_name().to_string(),
4799 new_name: new_name.to_string(),
4800 rr_type: record.get_type(),
4801 intf_name: intf_name.to_string(),
4802 };
4803 debug!("Name change event: {:?}", &event);
4804 notify_monitors(monitors, DaemonEvent::NameChange(event));
4805 }
4806 }
4807
4808 debug!(
4810 "probe of '{name}' finished: move {} records to active. ({} waiting services)",
4811 probe.records.len(),
4812 probe.waiting_services.len(),
4813 );
4814
4815 if !probe.records.is_empty() {
4817 match dns_registry.active.get_mut(&name) {
4818 Some(records) => {
4819 records.extend(probe.records);
4820 }
4821 None => {
4822 dns_registry.active.insert(name, probe.records);
4823 }
4824 }
4825
4826 waiting_services.extend(probe.waiting_services);
4827 }
4828 }
4829
4830 waiting_services
4831}
4832
4833fn resolve_addr_to_index(if_kind: IfKind, interfaces: &[Interface]) -> IfKind {
4835 if let IfKind::Addr(addr) = &if_kind {
4836 if let Some(intf) = interfaces.iter().find(|intf| &intf.ip() == addr) {
4837 let if_index = intf.index.unwrap_or(0);
4838 return if addr.is_ipv4() {
4839 IfKind::IndexV4(if_index)
4840 } else {
4841 IfKind::IndexV6(if_index)
4842 };
4843 }
4844 }
4845 if_kind
4846}
4847
4848#[cfg(test)]
4849mod tests {
4850 use super::{
4851 _new_socket_bind, check_domain_suffix, check_service_name_length, hostname_change,
4852 my_ip_interfaces, name_change, send_dns_outgoing_impl, valid_instance_name,
4853 valid_ip_on_intf, HostnameResolutionEvent, MyIntf, ServiceDaemon, ServiceEvent,
4854 ServiceInfo, GROUP_ADDR_V4, MDNS_PORT,
4855 };
4856 use crate::{
4857 dns_parser::{
4858 DnsEntryExt, DnsIncoming, DnsOutgoing, DnsPointer, InterfaceId, RRType, ScopedIp,
4859 CLASS_IN, FLAGS_AA, FLAGS_QR_QUERY, FLAGS_QR_RESPONSE,
4860 },
4861 service_daemon::{add_answer_of_service, check_hostname},
4862 };
4863 use if_addrs::{IfAddr, Ifv4Addr};
4864 use std::{
4865 collections::HashSet,
4866 net::{IpAddr, Ipv4Addr, UdpSocket},
4867 time::{Duration, Instant, SystemTime},
4868 };
4869 use test_log::test;
4870
    /// Checks that, for an interface carrying two IPv4 addresses in different
    /// /24 subnets, `valid_ip_on_intf` selects the interface address that
    /// matches the querier's address, and selects none for an unrelated querier.
    #[test]
    fn test_response_source_ifaddr_match() {
        let ifaddr_a = IfAddr::V4(Ifv4Addr {
            ip: Ipv4Addr::new(192, 168, 1, 148),
            netmask: Ipv4Addr::new(255, 255, 255, 0),
            broadcast: None,
            prefixlen: 24,
        });
        let ifaddr_b = IfAddr::V4(Ifv4Addr {
            ip: Ipv4Addr::new(10, 238, 0, 51),
            netmask: Ipv4Addr::new(255, 255, 255, 0),
            broadcast: None,
            prefixlen: 24,
        });

        // One interface holding both addresses.
        let intf = MyIntf {
            name: "dummy0".to_string(),
            index: 1,
            addrs: HashSet::from([ifaddr_a.clone(), ifaddr_b.clone()]),
        };

        // Picks the first interface address that `valid_ip_on_intf` accepts
        // for the given querier address.
        let pick = |querier: IpAddr| -> Option<IfAddr> {
            intf.addrs
                .iter()
                .find(|a| valid_ip_on_intf(&querier, a))
                .cloned()
        };

        assert_eq!(
            pick(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 2))),
            Some(ifaddr_a)
        );
        assert_eq!(
            pick(IpAddr::V4(Ipv4Addr::new(10, 238, 0, 99))),
            Some(ifaddr_b)
        );
        // A querier outside both subnets matches no address.
        assert_eq!(pick(IpAddr::V4(Ipv4Addr::new(172, 16, 0, 1))), None);
    }
4913
4914 #[test]
4915 fn test_instance_name() {
4916 assert!(valid_instance_name("my-laser._printer._tcp.local."));
4917 assert!(valid_instance_name("my-laser.._printer._tcp.local."));
4918 assert!(!valid_instance_name("_printer._tcp.local."));
4919 }
4920
    /// Sends a query from an ephemeral (non-5353) UDP port and expects the
    /// daemon to answer it by unicast: the reply must come from the mDNS port,
    /// echo the question, contain an A record for the hostname, and have the
    /// cache-flush bit cleared. The test is skipped when the host has no IPv4
    /// interface.
    #[test]
    fn test_legacy_unicast_response() {
        // Pick the first IPv4 address among the non-loopback interfaces.
        let intf_ip = match my_ip_interfaces(false)
            .into_iter()
            .find_map(|intf| match intf.ip() {
                IpAddr::V4(ip) => Some(ip),
                IpAddr::V6(_) => None,
            }) {
            Some(ip) => ip,
            None => {
                println!("No IPv4 interface available; skipping test.");
                return;
            }
        };

        let daemon = ServiceDaemon::new().expect("Failed to create daemon");
        // Timestamp-based hostname so each run uses a distinct name.
        let unique = SystemTime::now()
            .duration_since(SystemTime::UNIX_EPOCH)
            .unwrap()
            .as_micros();
        let hostname = format!("legacy-unicast-test-{unique}.local.");
        let service_info = ServiceInfo::new(
            "_legacy-uni._udp.local.",
            "test_instance",
            &hostname,
            &[IpAddr::V4(intf_ip)] as &[IpAddr],
            5353,
            None,
        )
        .expect("invalid service info");
        daemon.register(service_info).expect("register service");

        // Plain UDP socket on an ephemeral port, acting as the querier.
        let querier = UdpSocket::bind((intf_ip, 0)).expect("bind querier socket");
        querier
            .set_multicast_loop_v4(true)
            .expect("enable multicast loopback");
        querier
            .set_read_timeout(Some(Duration::from_millis(500)))
            .expect("set read timeout");
        assert_ne!(
            querier.local_addr().unwrap().port(),
            MDNS_PORT,
            "querier must use an ephemeral (non-5353) source port"
        );

        // Build an A query for the registered hostname.
        let mut query = DnsOutgoing::new(FLAGS_QR_QUERY);
        query.add_question(&hostname, RRType::A);
        let query_packet = query
            .to_data_on_wire()
            .pop()
            .expect("query serialized to one packet");

        // Interface id used only for parsing incoming packets in this test.
        let if_id = InterfaceId {
            name: "test".to_string(),
            index: 0,
        };

        // Re-send the query until a matching response arrives or 8 seconds pass.
        let deadline = Instant::now() + Duration::from_secs(8);
        let mut response = None;
        'outer: while Instant::now() < deadline {
            querier
                .send_to(&query_packet, (GROUP_ADDR_V4, MDNS_PORT))
                .expect("send query");

            let mut buf = [0u8; 1500];
            // Drain packets until the 500 ms read timeout fires.
            while let Ok((len, from)) = querier.recv_from(&mut buf) {
                let Ok(msg) = DnsIncoming::new(buf[..len].to_vec(), if_id.clone()) else {
                    continue;
                };
                if msg.is_response()
                    && msg
                        .answers()
                        .iter()
                        .any(|a| a.get_name().eq_ignore_ascii_case(&hostname))
                {
                    response = Some((msg, from));
                    break 'outer;
                }
            }
        }

        let (msg, from) = response.expect(
            "expected a unicast response to the legacy query; \
             a multicast-only reply would never reach this un-joined socket",
        );

        assert_eq!(
            from.port(),
            MDNS_PORT,
            "response should originate from the mDNS port"
        );

        assert!(
            msg.questions()
                .iter()
                .any(|q| q.entry_name().eq_ignore_ascii_case(&hostname)),
            "legacy unicast response must echo the question section"
        );

        let answer = msg
            .answers()
            .iter()
            .find(|a| a.get_name().eq_ignore_ascii_case(&hostname))
            .expect("response contains an answer for our hostname");
        assert_eq!(
            answer.get_type(),
            RRType::A,
            "an A query should be answered with an A record"
        );
        assert!(
            !answer.get_cache_flush(),
            "legacy unicast responses must clear the cache-flush bit"
        );

        daemon.shutdown().unwrap();
    }
5060
5061 #[test]
5062 fn test_check_service_name_length() {
5063 let result = check_service_name_length("_tcp", 100);
5064 assert!(result.is_err());
5065 if let Err(e) = result {
5066 println!("{}", e);
5067 }
5068 }
5069
5070 #[test]
5071 fn test_check_hostname() {
5072 for hostname in &[
5074 "my_host.local.",
5075 &("A".repeat(255 - ".local.".len()) + ".local."),
5076 ] {
5077 let result = check_hostname(hostname);
5078 assert!(result.is_ok());
5079 }
5080
5081 for hostname in &[
5083 "my_host.local",
5084 ".local.",
5085 &("A".repeat(256 - ".local.".len()) + ".local."),
5086 ] {
5087 let result = check_hostname(hostname);
5088 assert!(result.is_err());
5089 if let Err(e) = result {
5090 println!("{}", e);
5091 }
5092 }
5093 }
5094
5095 #[test]
5096 fn test_check_domain_suffix() {
5097 assert!(check_domain_suffix("_missing_dot._tcp.local").is_err());
5098 assert!(check_domain_suffix("_missing_bar.tcp.local.").is_err());
5099 assert!(check_domain_suffix("_mis_spell._tpp.local.").is_err());
5100 assert!(check_domain_suffix("_mis_spell._upp.local.").is_err());
5101 assert!(check_domain_suffix("_has_dot._tcp.local.").is_ok());
5102 assert!(check_domain_suffix("_goodname._udp.local.").is_ok());
5103 }
5104
    /// Registers a service, resolves it via browse, stops the browse, then
    /// sends a PTR record with TTL 0 for that service on every interface and
    /// checks that a second browse still resolves the service.
    #[test]
    fn test_service_with_temporarily_invalidated_ptr() {
        let d = ServiceDaemon::new().expect("Failed to create daemon");

        let service = "_test_inval_ptr._udp.local.";
        let host_name = "my_host_tmp_invalidated_ptr.local.";
        let intfs: Vec<_> = my_ip_interfaces(false);
        let intf_ips: Vec<_> = intfs.iter().map(|intf| intf.ip()).collect();
        let port = 5201;
        let my_service =
            ServiceInfo::new(service, "my_instance", host_name, &intf_ips[..], port, None)
                .expect("invalid service info")
                .enable_addr_auto();
        let result = d.register(my_service.clone());
        assert!(result.is_ok());

        // First browse: wait until the service is resolved.
        let browse_chan = d.browse(service).unwrap();
        let timeout = Duration::from_secs(2);
        let mut resolved = false;

        while let Ok(event) = browse_chan.recv_timeout(timeout) {
            match event {
                ServiceEvent::ServiceResolved(info) => {
                    resolved = true;
                    println!("Resolved a service of {}", &info.fullname);
                    break;
                }
                e => {
                    println!("Received event {:?}", e);
                }
            }
        }

        assert!(resolved);

        println!("Stopping browse of {}", service);
        d.stop_browse(service).unwrap();

        // Wait for the browse to be reported as stopped.
        let mut stopped = false;
        while let Ok(event) = browse_chan.recv_timeout(timeout) {
            match event {
                ServiceEvent::SearchStopped(_) => {
                    stopped = true;
                    println!("Stopped browsing service");
                    break;
                }
                e => {
                    println!("Received event {:?}", e);
                }
            }
        }

        assert!(stopped);

        // PTR record for the service with TTL 0.
        let invalidate_ptr_packet = DnsPointer::new(
            my_service.get_type(),
            RRType::PTR,
            CLASS_IN,
            0,
            my_service.get_fullname().to_string(),
        );

        let mut packet_buffer = DnsOutgoing::new(FLAGS_QR_RESPONSE | FLAGS_AA);
        packet_buffer.add_additional_answer(invalidate_ptr_packet);

        // Multicast the TTL-0 PTR on every interface.
        for intf in intfs {
            let sock = _new_socket_bind(&intf, true).unwrap();
            send_dns_outgoing_impl(
                &packet_buffer,
                &intf.name,
                intf.index.unwrap_or(0),
                &intf.addr,
                &sock.pktinfo,
                MDNS_PORT,
                None,
            )
            .unwrap();
        }

        println!(
            "Sent PTR record invalidation. Starting second browse for {}",
            service
        );

        // Second browse: the service must resolve again.
        let browse_chan = d.browse(service).unwrap();

        resolved = false;
        while let Ok(event) = browse_chan.recv_timeout(timeout) {
            match event {
                ServiceEvent::ServiceResolved(info) => {
                    resolved = true;
                    println!("Resolved a service of {}", &info.fullname);
                    break;
                }
                e => {
                    println!("Received event {:?}", e);
                }
            }
        }

        assert!(resolved);
        d.shutdown().unwrap();
    }
5221
    /// Registers a service with a 3-second host TTL, resolves it from a second
    /// daemon, shuts the server down and then waits for a `ServiceRemoved` event.
    ///
    /// NOTE(review): the final loop only prints the removal and stops; nothing
    /// asserts that a `ServiceRemoved` event actually arrived — confirm whether
    /// that is intentional.
    #[test]
    fn test_expired_srv() {
        let service_type = "_expired-srv._udp.local.";
        let instance = "test_instance";
        let host_name = "expired_srv_host.local.";
        let mut my_service = ServiceInfo::new(service_type, instance, host_name, "", 5023, None)
            .unwrap()
            .enable_addr_auto();
        // Short host TTL so the records expire quickly once the server is gone.
        let new_ttl = 3;
        my_service._set_host_ttl(new_ttl);

        let mdns_server = ServiceDaemon::new().expect("Failed to create mdns server");
        let result = mdns_server.register(my_service);
        assert!(result.is_ok());

        let mdns_client = ServiceDaemon::new().expect("Failed to create mdns client");
        let browse_chan = mdns_client.browse(service_type).unwrap();
        let timeout = Duration::from_secs(2);
        let mut resolved = false;

        while let Ok(event) = browse_chan.recv_timeout(timeout) {
            if let ServiceEvent::ServiceResolved(info) = event {
                resolved = true;
                println!("Resolved a service of {}", &info.fullname);
                break;
            }
        }

        assert!(resolved);

        mdns_server.shutdown().unwrap();

        // Each receive waits at most one TTL for the next event.
        let expire_timeout = Duration::from_secs(new_ttl as u64);
        while let Ok(event) = browse_chan.recv_timeout(expire_timeout) {
            if let ServiceEvent::ServiceRemoved(service_type, full_name) = event {
                println!("Service removed: {}: {}", &service_type, &full_name);
                break;
            }
        }
    }
5269
    /// Registers a service whose host TTL is 2 seconds, resolves its hostname
    /// from a second daemon, shuts the server down and expects an
    /// `AddressesRemoved` event for the same hostname and address.
    ///
    /// Requires at least one IPv4 interface (the lookup is unwrapped).
    #[test]
    fn test_hostname_resolution_address_removed() {
        let server = ServiceDaemon::new().expect("Failed to create server");
        let hostname = "addr_remove_host._tcp.local.";
        let service_ip_addr: ScopedIp = my_ip_interfaces(false)
            .iter()
            .find(|iface| iface.ip().is_ipv4())
            .map(|iface| iface.into())
            .unwrap();

        let mut my_service = ServiceInfo::new(
            "_host_res_test._tcp.local.",
            "my_instance",
            hostname,
            service_ip_addr.to_ip_addr(),
            1234,
            None,
        )
        .expect("invalid service info");

        // Short address TTL so removal happens soon after the server stops.
        let addr_ttl = 2;
        my_service._set_host_ttl(addr_ttl);
        server.register(my_service).unwrap();

        let client = ServiceDaemon::new().expect("Failed to create client");
        let event_receiver = client.resolve_hostname(hostname, None).unwrap();
        // Wait (without timeout) for the address to be found.
        let resolved = loop {
            match event_receiver.recv() {
                Ok(HostnameResolutionEvent::AddressesFound(found_hostname, addresses)) => {
                    assert!(found_hostname == hostname);
                    assert!(addresses.contains(&service_ip_addr));
                    println!("address found: {:?}", &addresses);
                    break true;
                }
                Ok(HostnameResolutionEvent::SearchStopped(_)) => break false,
                Ok(_event) => {}
                Err(_) => break false,
            }
        };

        assert!(resolved);

        server.shutdown().unwrap();

        // Allow one second beyond the TTL for the removal event.
        let timeout = Duration::from_secs(addr_ttl as u64 + 1);
        let removed = loop {
            match event_receiver.recv_timeout(timeout) {
                Ok(HostnameResolutionEvent::AddressesRemoved(removed_host, addresses)) => {
                    assert!(removed_host == hostname);
                    assert!(addresses.contains(&service_ip_addr));

                    println!(
                        "address removed: hostname: {} addresses: {:?}",
                        &hostname, &addresses
                    );
                    break true;
                }
                Ok(_event) => {}
                Err(_) => {
                    break false;
                }
            }
        };

        assert!(removed);

        client.shutdown().unwrap();
    }
5344
    /// Registers a service whose "other" TTL is 3 seconds, resolves it from a
    /// second daemon, keeps receiving events until none arrives within 90% of
    /// that TTL, and then expects the client's `cache-refresh-ptr` and
    /// `cache-refresh-srv-txt` metrics to both equal 1.
    ///
    /// Requires at least one IPv4 interface (the lookup is unwrapped).
    #[test]
    fn test_refresh_ptr() {
        let service_type = "_refresh-ptr._udp.local.";
        let instance = "test_instance";
        let host_name = "refresh_ptr_host.local.";
        let service_ip_addr = my_ip_interfaces(false)
            .iter()
            .find(|iface| iface.ip().is_ipv4())
            .map(|iface| iface.ip())
            .unwrap();

        let mut my_service = ServiceInfo::new(
            service_type,
            instance,
            host_name,
            service_ip_addr,
            5023,
            None,
        )
        .unwrap();

        // Short TTL so a cache refresh becomes due within the test's runtime.
        let new_ttl = 3;
        my_service._set_other_ttl(new_ttl);

        let mdns_server = ServiceDaemon::new().expect("Failed to create mdns server");
        let result = mdns_server.register(my_service);
        assert!(result.is_ok());

        let mdns_client = ServiceDaemon::new().expect("Failed to create mdns client");
        let browse_chan = mdns_client.browse(service_type).unwrap();
        let timeout = Duration::from_millis(1500);
        let mut resolved = false;

        while let Ok(event) = browse_chan.recv_timeout(timeout) {
            if let ServiceEvent::ServiceResolved(info) = event {
                resolved = true;
                println!("Resolved a service of {}", &info.fullname);
                break;
            }
        }

        assert!(resolved);

        // Drain events; the loop ends once no event arrives within 90% of the TTL.
        let timeout = Duration::from_millis(new_ttl as u64 * 1000 * 90 / 100);
        while let Ok(event) = browse_chan.recv_timeout(timeout) {
            println!("event: {:?}", &event);
        }

        // Both refresh counters are expected to be exactly 1 at this point.
        let metrics_chan = mdns_client.get_metrics().unwrap();
        let metrics = metrics_chan.recv_timeout(timeout).unwrap();
        let ptr_refresh_counter = metrics["cache-refresh-ptr"];
        assert_eq!(ptr_refresh_counter, 1);
        let srvtxt_refresh_counter = metrics["cache-refresh-srv-txt"];
        assert_eq!(srvtxt_refresh_counter, 1);

        mdns_server.shutdown().unwrap();
        mdns_client.shutdown().unwrap();
    }
5409
/// Table-driven check of `name_change`: each input must map to the listed
/// output.
#[test]
fn test_name_change() {
    let cases = [
        // Names without a numeric suffix get " (2)" appended (before ".local.").
        ("foo.local.", "foo (2).local."),
        // An existing "(N)" suffix is incremented.
        ("foo (2).local.", "foo (3).local."),
        ("foo (9).local.", "foo (10).local."),
        ("foo", "foo (2)"),
        ("foo (2)", "foo (3)"),
        ("", " (2)"),
        // Things that only look like a numeric suffix are left alone and a
        // fresh " (2)" is appended.
        ("foo (abc)", "foo (abc) (2)"),
        ("foo (2", "foo (2 (2)"),
        ("foo (2) extra", "foo (2) extra (2)"),
    ];

    for (original, expected) in cases {
        assert_eq!(name_change(original), expected);
    }
}
5424
/// Table-driven check of `hostname_change`: a "-N" suffix is added to the
/// host label, or incremented when one is already present.
#[test]
fn test_hostname_change() {
    let cases = [
        ("foo.local.", "foo-2.local."),
        ("foo", "foo-2"),
        ("foo-2.local.", "foo-3.local."),
        ("foo-9", "foo-10"),
        ("test-42.domain.", "test-43.domain."),
    ];

    for (original, expected) in cases {
        assert_eq!(hostname_change(original), expected);
    }
}
5433
/// Checks that `add_answer_of_service` for a TXT question adds an answer whose
/// TTL equals the service's `get_other_ttl()`.
#[test]
fn test_add_answer_txt_ttl() {
    let service_type = "_test_add_answer._udp.local.";
    let instance = "test_instance";
    let host_name = "add_answer_host.local.";

    // First local interface that carries an IPv4 address.
    let service_intf = my_ip_interfaces(false)
        .into_iter()
        .find(|intf| intf.ip().is_ipv4())
        .unwrap();
    let service_ip_addr = service_intf.ip();

    let my_service = ServiceInfo::new(
        service_type,
        instance,
        host_name,
        service_ip_addr,
        5023,
        None,
    )
    .unwrap();

    let mut out = DnsOutgoing::new(FLAGS_QR_RESPONSE | FLAGS_AA);

    // Build a placeholder incoming message by serialising the still-empty
    // outgoing message and parsing its last packet back in.
    let packet = out.to_data_on_wire().pop().unwrap();
    let incoming = DnsIncoming::new(packet, InterfaceId::from(&service_intf)).unwrap();

    add_answer_of_service(
        &mut out,
        &incoming,
        instance,
        &my_service,
        RRType::TXT,
        vec![service_ip_addr],
    );

    assert!(
        out.answers_count() > 0,
        "No answers added to the outgoing message"
    );

    // The first answer must be the TXT record carrying the "other" TTL.
    let txt_record = &out._answers().first().unwrap().0;
    assert_eq!(txt_record.get_type(), RRType::TXT);
    assert_eq!(
        txt_record.get_record().get_ttl(),
        my_service.get_other_ttl()
    );
}
5487
/// Resolves a service, takes the client's interface down via
/// `test_down_interface` and expects `ServiceRemoved`, then brings it back up
/// via `test_up_interface` and expects the service to resolve again.
#[test]
fn test_interface_flip() {
    let ty_domain = "_intf-flip._udp.local.";
    let host_name = "intf_flip.local.";
    let port = 5200;

    // Instance name derived from the current time in microseconds.
    let instance_name = SystemTime::now()
        .duration_since(SystemTime::UNIX_EPOCH)
        .unwrap()
        .as_micros()
        .to_string();

    // First IPv4 interface: keep both its address and its name.
    let (ip_addr1, intf_name) = my_ip_interfaces(false)
        .into_iter()
        .find(|intf| intf.ip().is_ipv4())
        .map(|intf| (intf.ip(), intf.name.clone()))
        .unwrap();

    println!("Using interface {} with IP {}", intf_name, ip_addr1);

    let service1 = ServiceInfo::new(ty_domain, &instance_name, host_name, ip_addr1, port, None)
        .expect("valid service info");
    let server1 = ServiceDaemon::new().expect("failed to start server");
    server1
        .register(service1)
        .expect("Failed to register service1");

    // Pause before starting the client.
    std::thread::sleep(Duration::from_secs(2));

    let client = ServiceDaemon::new().expect("failed to start client");
    let receiver = client.browse(ty_domain).unwrap();
    let timeout = Duration::from_secs(3);

    // Phase 1: the service must resolve while the interface is up.
    let got_data = loop {
        match receiver.recv_timeout(timeout) {
            Ok(ServiceEvent::ServiceResolved(_)) => {
                println!("Received ServiceResolved event");
                break true;
            }
            Ok(_) => {}
            Err(_) => break false,
        }
    };
    assert!(got_data, "Should receive ServiceResolved event");

    // Set the IP check interval to 1 before flipping the interface.
    client.set_ip_check_interval(1).unwrap();

    // Phase 2: take the interface down and wait for the removal event.
    println!("Shutting down interface {}", &intf_name);
    client.test_down_interface(&intf_name).unwrap();

    let got_removed = loop {
        match receiver.recv_timeout(timeout) {
            Ok(ServiceEvent::ServiceRemoved(ty_domain, instance)) => {
                println!("removed: {ty_domain} : {instance}");
                break true;
            }
            Ok(_) => {}
            Err(_) => break false,
        }
    };
    assert!(got_removed, "Should receive ServiceRemoved event");

    // Phase 3: bring the interface back and wait for a fresh resolution.
    println!("Bringing up interface {}", &intf_name);
    client.test_up_interface(&intf_name).unwrap();

    let got_data = loop {
        match receiver.recv_timeout(timeout) {
            Ok(ServiceEvent::ServiceResolved(resolved)) => {
                println!("Received ServiceResolved: {:?}", resolved);
                break true;
            }
            Ok(_) => {}
            Err(_) => break false,
        }
    };
    assert!(
        got_data,
        "Should receive ServiceResolved event after interface is back up"
    );

    server1.shutdown().unwrap();
    client.shutdown().unwrap();
}
5573
/// Starts a `browse_cache` listener before any server exists, then registers
/// a service on a second daemon and expects the listener to report it as
/// resolved.
#[test]
fn test_cache_only() {
    let service_type = "_cache_only._udp.local.";
    let instance = "test_instance";
    let host_name = "cache_only_host.local.";

    // First IPv4 address reported by the local interfaces.
    let service_ip_addr = my_ip_interfaces(false)
        .into_iter()
        .map(|intf| intf.ip())
        .find(|ip| ip.is_ipv4())
        .unwrap();

    let mut my_service = ServiceInfo::new(
        service_type,
        instance,
        host_name,
        service_ip_addr,
        5023,
        None,
    )
    .unwrap();

    let new_ttl = 3;
    my_service._set_other_ttl(new_ttl);

    // The client starts its cache-only browse first ...
    let mdns_client = ServiceDaemon::new().expect("Failed to create mdns client");
    let browse_chan = mdns_client.browse_cache(service_type).unwrap();
    std::thread::sleep(Duration::from_secs(2));

    // ... and only afterwards does the server come up and register.
    let mdns_server = ServiceDaemon::new().expect("Failed to create mdns server");
    assert!(mdns_server.register(my_service).is_ok());

    // Consume events until the service resolves or the channel times out.
    let timeout = Duration::from_millis(1500);
    let resolved = loop {
        match browse_chan.recv_timeout(timeout) {
            Ok(ServiceEvent::ServiceResolved(info)) => {
                println!("Resolved a service of {}", &info.get_fullname());
                break true;
            }
            Ok(_) => {}
            Err(_) => break false,
        }
    };
    assert!(resolved);

    mdns_server.shutdown().unwrap();
    mdns_client.shutdown().unwrap();
}
5629
/// Registers a service first, then starts a client with
/// `accept_unsolicited(true)`, waits, and expects a later `browse_cache` call
/// to report the service as resolved.
#[test]
fn test_cache_only_unsolicited() {
    let service_type = "_c_unsolicit._udp.local.";
    let instance = "test_instance";
    let host_name = "c_unsolicit_host.local.";

    // First IPv4 address reported by the local interfaces.
    let service_ip_addr = my_ip_interfaces(false)
        .into_iter()
        .map(|intf| intf.ip())
        .find(|ip| ip.is_ipv4())
        .unwrap();

    let my_service = ServiceInfo::new(
        service_type,
        instance,
        host_name,
        service_ip_addr,
        5023,
        None,
    )
    .unwrap();

    let mdns_server = ServiceDaemon::new().expect("Failed to create mdns server");
    assert!(mdns_server.register(my_service).is_ok());

    let mdns_client = ServiceDaemon::new().expect("Failed to create mdns client");
    mdns_client.accept_unsolicited(true).unwrap();

    // Give the client some time before the cache-only browse starts.
    std::thread::sleep(Duration::from_secs(2));
    let browse_chan = mdns_client.browse_cache(service_type).unwrap();

    // Consume events until the service resolves or the channel times out.
    let timeout = Duration::from_millis(1500);
    let resolved = loop {
        match browse_chan.recv_timeout(timeout) {
            Ok(ServiceEvent::ServiceResolved(info)) => {
                println!("Resolved a service of {}", &info.get_fullname());
                break true;
            }
            Ok(_) => {}
            Err(_) => break false,
        }
    };
    assert!(resolved);

    mdns_server.shutdown().unwrap();
    mdns_client.shutdown().unwrap();
}
5681
/// Checks that daemons created with `new_with_port(custom_port)` and daemons
/// created with `new()` do not resolve each other's services.
///
/// One service is registered on a custom-port daemon and one on a default
/// daemon. A client created on each side browses the same service type and
/// must only see the instance registered on its own side.
#[test]
fn test_custom_port_isolation() {
    // Service ports passed to `ServiceInfo::new` and checked against
    // `get_port()` below; named so the two places cannot drift apart.
    const CUSTOM_SERVICE_PORT: u16 = 8080;
    const DEFAULT_SERVICE_PORT: u16 = 8081;

    let service_type = "_custom_port._udp.local.";
    let instance_custom = "custom_port_instance";
    let instance_default = "default_port_instance";
    let host_name = "custom_port_host.local.";

    let service_ip_addr = my_ip_interfaces(false)
        .iter()
        .find(|iface| iface.ip().is_ipv4())
        .map(|iface| iface.ip())
        .expect("Test requires an IPv4 interface");

    let service_custom = ServiceInfo::new(
        service_type,
        instance_custom,
        host_name,
        service_ip_addr,
        CUSTOM_SERVICE_PORT,
        None,
    )
    .unwrap();

    let service_default = ServiceInfo::new(
        service_type,
        instance_default,
        host_name,
        service_ip_addr,
        DEFAULT_SERVICE_PORT,
        None,
    )
    .unwrap();

    // Server and client on the custom side are created with the same port.
    let custom_port = 5454u16;
    let server_custom =
        ServiceDaemon::new_with_port(custom_port).expect("Failed to create custom port server");
    let client_custom =
        ServiceDaemon::new_with_port(custom_port).expect("Failed to create custom port client");

    let server_default = ServiceDaemon::new().expect("Failed to create default port server");

    // The service values are not needed after registration, so they are moved
    // into `register` rather than cloned.
    server_custom
        .register(service_custom)
        .expect("Failed to register custom port service");

    server_default
        .register(service_default)
        .expect("Failed to register default port service");

    let browse_custom = client_custom
        .browse(service_type)
        .expect("Failed to browse on custom port");

    let timeout = Duration::from_secs(3);
    let mut found_custom = false;
    let mut found_default_on_custom = false;

    // Deliberately no `break`: keep listening until the channel stays silent
    // for `timeout`, so a late event for the wrong instance is still caught.
    while let Ok(event) = browse_custom.recv_timeout(timeout) {
        if let ServiceEvent::ServiceResolved(info) = event {
            println!(
                "Custom port client resolved: {} on port {}",
                info.get_fullname(),
                info.get_port()
            );
            if info.get_fullname().starts_with(instance_custom) {
                found_custom = true;
                assert_eq!(info.get_port(), CUSTOM_SERVICE_PORT);
            }
            if info.get_fullname().starts_with(instance_default) {
                found_default_on_custom = true;
            }
        }
    }

    assert!(
        found_custom,
        "Custom port client should find service on custom port"
    );
    assert!(
        !found_default_on_custom,
        "Custom port client should NOT find service on default port"
    );

    // Same check from the other side, with a client on the default port.
    let client_default = ServiceDaemon::new().expect("Failed to create default port client");
    let browse_default = client_default
        .browse(service_type)
        .expect("Failed to browse on default port");

    let mut found_default = false;
    let mut found_custom_on_default = false;

    while let Ok(event) = browse_default.recv_timeout(timeout) {
        if let ServiceEvent::ServiceResolved(info) = event {
            println!(
                "Default port client resolved: {} on port {}",
                info.get_fullname(),
                info.get_port()
            );
            if info.get_fullname().starts_with(instance_default) {
                found_default = true;
                assert_eq!(info.get_port(), DEFAULT_SERVICE_PORT);
            }
            if info.get_fullname().starts_with(instance_custom) {
                found_custom_on_default = true;
            }
        }
    }

    assert!(
        found_default,
        "Default port client should find service on default port"
    );
    assert!(
        !found_custom_on_default,
        "Default port client should NOT find service on custom port"
    );

    server_custom.shutdown().unwrap();
    client_custom.shutdown().unwrap();
    server_default.shutdown().unwrap();
    client_default.shutdown().unwrap();
}
5819}