1#[cfg(feature = "logging")]
32use crate::log::{debug, error, trace};
33use crate::{
34 current_time_millis,
35 dns_cache::{DnsCache, IpType},
36 dns_parser::{
37 ip_address_rr_type, DnsAddress, DnsEntryExt, DnsIncoming, DnsOutgoing, DnsPointer,
38 DnsRecordBox, DnsRecordExt, DnsSrv, DnsTxt, InterfaceId, RRType, ScopedIp,
39 CLASS_CACHE_FLUSH, CLASS_IN, FLAGS_AA, FLAGS_QR_QUERY, FLAGS_QR_RESPONSE, MAX_MSG_ABSOLUTE,
40 },
41 error::{e_fmt, Error, Result},
42 service_info::{valid_ip_on_intf, DnsRegistry, MyIntf, Probe, ServiceInfo, ServiceStatus},
43 Receiver, ResolvedService, TxtProperties,
44};
45use flume::{bounded, Sender, TrySendError};
46use if_addrs::{IfAddr, Interface};
47use mio::{event::Source, net::UdpSocket as MioUdpSocket, Interest, Poll, Registry, Token};
48use socket2::Domain;
49use socket_pktinfo::PktInfoUdpSocket;
50use std::{
51 cmp::{self, Reverse},
52 collections::{hash_map::Entry, BinaryHeap, HashMap, HashSet},
53 fmt, io,
54 net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6, UdpSocket},
55 str, thread,
56 time::Duration,
57 vec,
58};
59
60pub const SERVICE_NAME_LEN_MAX_DEFAULT: u8 = 15;
64
65pub const IP_CHECK_INTERVAL_IN_SECS_DEFAULT: u32 = 5;
67
68pub const VERIFY_TIMEOUT_DEFAULT: Duration = Duration::from_secs(10);
71
72pub const MDNS_PORT: u16 = 5353;
74
75const GROUP_ADDR_V4: Ipv4Addr = Ipv4Addr::new(224, 0, 0, 251);
76const GROUP_ADDR_V6: Ipv6Addr = Ipv6Addr::new(0xff02, 0, 0, 0, 0, 0, 0, 0xfb);
77const LOOPBACK_V4: Ipv4Addr = Ipv4Addr::new(127, 0, 0, 1);
78
79const RESOLVE_WAIT_IN_MILLIS: u64 = 500;
80
81#[derive(Debug)]
83pub enum UnregisterStatus {
84 OK,
86 NotFound,
88}
89
90#[derive(Debug, PartialEq, Clone, Eq)]
92#[non_exhaustive]
93pub enum DaemonStatus {
94 Running,
96
97 Shutdown,
99}
100
101#[derive(Hash, Eq, PartialEq)]
104enum Counter {
105 Register,
106 RegisterResend,
107 Unregister,
108 UnregisterResend,
109 Browse,
110 ResolveHostname,
111 Respond,
112 CacheRefreshPTR,
113 CacheRefreshSrvTxt,
114 CacheRefreshAddr,
115 KnownAnswerSuppression,
116 CachedPTR,
117 CachedSRV,
118 CachedAddr,
119 CachedTxt,
120 CachedNSec,
121 CachedSubtype,
122 DnsRegistryProbe,
123 DnsRegistryActive,
124 DnsRegistryTimer,
125 DnsRegistryNameChange,
126 Timer,
127}
128
129impl fmt::Display for Counter {
130 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
131 match self {
132 Self::Register => write!(f, "register"),
133 Self::RegisterResend => write!(f, "register-resend"),
134 Self::Unregister => write!(f, "unregister"),
135 Self::UnregisterResend => write!(f, "unregister-resend"),
136 Self::Browse => write!(f, "browse"),
137 Self::ResolveHostname => write!(f, "resolve-hostname"),
138 Self::Respond => write!(f, "respond"),
139 Self::CacheRefreshPTR => write!(f, "cache-refresh-ptr"),
140 Self::CacheRefreshSrvTxt => write!(f, "cache-refresh-srv-txt"),
141 Self::CacheRefreshAddr => write!(f, "cache-refresh-addr"),
142 Self::KnownAnswerSuppression => write!(f, "known-answer-suppression"),
143 Self::CachedPTR => write!(f, "cached-ptr"),
144 Self::CachedSRV => write!(f, "cached-srv"),
145 Self::CachedAddr => write!(f, "cached-addr"),
146 Self::CachedTxt => write!(f, "cached-txt"),
147 Self::CachedNSec => write!(f, "cached-nsec"),
148 Self::CachedSubtype => write!(f, "cached-subtype"),
149 Self::DnsRegistryProbe => write!(f, "dns-registry-probe"),
150 Self::DnsRegistryActive => write!(f, "dns-registry-active"),
151 Self::DnsRegistryTimer => write!(f, "dns-registry-timer"),
152 Self::DnsRegistryNameChange => write!(f, "dns-registry-name-change"),
153 Self::Timer => write!(f, "timer"),
154 }
155 }
156}
157
158#[derive(Debug)]
159enum InternalError {
160 IntfAddrInvalid(Interface),
161}
162
163impl fmt::Display for InternalError {
164 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
165 match self {
166 InternalError::IntfAddrInvalid(iface) => write!(f, "interface addr invalid: {iface:?}"),
167 }
168 }
169}
170
171type MyResult<T> = core::result::Result<T, InternalError>;
172
173struct MyUdpSocket {
178 pktinfo: PktInfoUdpSocket,
181
182 mio: MioUdpSocket,
185}
186
187impl MyUdpSocket {
188 pub fn new(pktinfo: PktInfoUdpSocket) -> io::Result<Self> {
189 let std_sock = pktinfo.try_clone_std()?;
190 let mio = MioUdpSocket::from_std(std_sock);
191
192 Ok(Self { pktinfo, mio })
193 }
194}
195
196impl Source for MyUdpSocket {
198 fn register(
199 &mut self,
200 registry: &Registry,
201 token: Token,
202 interests: Interest,
203 ) -> io::Result<()> {
204 self.mio.register(registry, token, interests)
205 }
206
207 fn reregister(
208 &mut self,
209 registry: &Registry,
210 token: Token,
211 interests: Interest,
212 ) -> io::Result<()> {
213 self.mio.reregister(registry, token, interests)
214 }
215
216 fn deregister(&mut self, registry: &Registry) -> std::io::Result<()> {
217 self.mio.deregister(registry)
218 }
219}
220
221pub type Metrics = HashMap<String, i64>;
224
225const IPV4_SOCK_EVENT_KEY: usize = 4; const IPV6_SOCK_EVENT_KEY: usize = 6; const SIGNAL_SOCK_EVENT_KEY: usize = usize::MAX - 1; #[derive(Clone)]
233pub struct ServiceDaemon {
234 sender: Sender<Command>,
236
237 signal_addr: SocketAddr,
243}
244
245impl ServiceDaemon {
246 pub fn new() -> Result<Self> {
262 Self::new_with_port(MDNS_PORT)
263 }
264
265 pub fn new_with_port(port: u16) -> Result<Self> {
297 let signal_addr = SocketAddrV4::new(LOOPBACK_V4, 0);
300
301 let signal_sock = UdpSocket::bind(signal_addr)
302 .map_err(|e| e_fmt!("failed to create signal_sock for daemon: {}", e))?;
303
304 let signal_addr = signal_sock
306 .local_addr()
307 .map_err(|e| e_fmt!("failed to get signal sock addr: {}", e))?;
308
309 signal_sock
311 .set_nonblocking(true)
312 .map_err(|e| e_fmt!("failed to set nonblocking for signal socket: {}", e))?;
313
314 let poller = Poll::new().map_err(|e| e_fmt!("failed to create mio Poll: {e}"))?;
315
316 let (sender, receiver) = bounded(100);
317
318 let mio_sock = MioUdpSocket::from_std(signal_sock);
320 let cmd_sender = sender.clone();
321 thread::Builder::new()
322 .name("mDNS_daemon".to_string())
323 .spawn(move || {
324 Self::daemon_thread(mio_sock, poller, receiver, port, cmd_sender, signal_addr)
325 })
326 .map_err(|e| e_fmt!("thread builder failed to spawn: {}", e))?;
327
328 Ok(Self {
329 sender,
330 signal_addr,
331 })
332 }
333
334 fn send_cmd(&self, cmd: Command) -> Result<()> {
337 let cmd_name = cmd.to_string();
338
339 self.sender.try_send(cmd).map_err(|e| match e {
341 TrySendError::Full(_) => Error::Again,
342 TrySendError::Disconnected(_) => Error::DaemonShutdown,
343 })?;
344
345 let addr = SocketAddrV4::new(LOOPBACK_V4, 0);
347 let socket = UdpSocket::bind(addr)
348 .map_err(|e| e_fmt!("Failed to create socket to send signal: {}", e))?;
349 socket
350 .send_to(cmd_name.as_bytes(), self.signal_addr)
351 .map_err(|e| {
352 e_fmt!(
353 "signal socket send_to {} ({}) failed: {}",
354 self.signal_addr,
355 cmd_name,
356 e
357 )
358 })?;
359
360 Ok(())
361 }
362
363 pub fn browse(&self, service_type: &str) -> Result<Receiver<ServiceEvent>> {
383 check_domain_suffix(service_type)?;
384
385 let (resp_s, resp_r) = bounded(10);
386 self.send_cmd(Command::Browse(service_type.to_string(), 1, false, resp_s))?;
387 Ok(resp_r)
388 }
389
390 pub fn browse_cache(&self, service_type: &str) -> Result<Receiver<ServiceEvent>> {
403 check_domain_suffix(service_type)?;
404
405 let (resp_s, resp_r) = bounded(10);
406 self.send_cmd(Command::Browse(service_type.to_string(), 1, true, resp_s))?;
407 Ok(resp_r)
408 }
409
410 pub fn stop_browse(&self, ty_domain: &str) -> Result<()> {
418 self.send_cmd(Command::StopBrowse(ty_domain.to_string()))
419 }
420
421 pub fn resolve_hostname(
441 &self,
442 hostname: &str,
443 timeout: Option<u64>,
444 ) -> Result<Receiver<HostnameResolutionEvent>> {
445 check_hostname(hostname)?;
446 let (resp_s, resp_r) = bounded(10);
447 self.send_cmd(Command::ResolveHostname(
448 hostname.to_string(),
449 1,
450 resp_s,
451 timeout,
452 ))?;
453 Ok(resp_r)
454 }
455
456 pub fn stop_resolve_hostname(&self, hostname: &str) -> Result<()> {
462 self.send_cmd(Command::StopResolveHostname(hostname.to_string()))
463 }
464
465 pub fn register(&self, service_info: ServiceInfo) -> Result<()> {
485 check_service_name(service_info.get_fullname())?;
486 check_hostname(service_info.get_hostname())?;
487
488 self.send_cmd(Command::Register(service_info.into()))
489 }
490
491 pub fn unregister(&self, fullname: &str) -> Result<Receiver<UnregisterStatus>> {
502 let (resp_s, resp_r) = bounded(1);
503 self.send_cmd(Command::Unregister(fullname.to_lowercase(), resp_s))?;
504 Ok(resp_r)
505 }
506
507 pub fn monitor(&self) -> Result<Receiver<DaemonEvent>> {
517 let (resp_s, resp_r) = bounded(100);
518 self.send_cmd(Command::Monitor(resp_s))?;
519 Ok(resp_r)
520 }
521
522 pub fn shutdown(&self) -> Result<Receiver<DaemonStatus>> {
530 let (resp_s, resp_r) = bounded(1);
531 self.send_cmd(Command::Exit(resp_s))?;
532 Ok(resp_r)
533 }
534
535 pub fn status(&self) -> Result<Receiver<DaemonStatus>> {
543 let (resp_s, resp_r) = bounded(1);
544
545 if self.sender.is_disconnected() {
546 resp_s
547 .send(DaemonStatus::Shutdown)
548 .map_err(|e| e_fmt!("failed to send daemon status to the client: {}", e))?;
549 } else {
550 self.send_cmd(Command::GetStatus(resp_s))?;
551 }
552
553 Ok(resp_r)
554 }
555
556 pub fn get_metrics(&self) -> Result<Receiver<Metrics>> {
567 let (resp_s, resp_r) = bounded(1);
568 self.send_cmd(Command::GetMetrics(resp_s))?;
569 Ok(resp_r)
570 }
571
572 pub fn set_service_name_len_max(&self, len_max: u8) -> Result<()> {
587 const SERVICE_NAME_LEN_MAX_LIMIT: u8 = 30; if len_max > SERVICE_NAME_LEN_MAX_LIMIT {
590 return Err(Error::Msg(format!(
591 "service name length max {len_max} is too large"
592 )));
593 }
594
595 self.send_cmd(Command::SetOption(DaemonOption::ServiceNameLenMax(len_max)))
596 }
597
598 pub fn set_ip_check_interval(&self, interval_in_secs: u32) -> Result<()> {
604 let interval_in_millis = interval_in_secs as u64 * 1000;
605 self.send_cmd(Command::SetOption(DaemonOption::IpCheckInterval(
606 interval_in_millis,
607 )))
608 }
609
610 pub fn get_ip_check_interval(&self) -> Result<u32> {
612 let (resp_s, resp_r) = bounded(1);
613 self.send_cmd(Command::GetOption(resp_s))?;
614
615 let option = resp_r
616 .recv_timeout(Duration::from_secs(10))
617 .map_err(|e| e_fmt!("failed to receive ip check interval: {}", e))?;
618 let ip_check_interval_in_secs = option.ip_check_interval / 1000;
619 Ok(ip_check_interval_in_secs as u32)
620 }
621
622 pub fn enable_interface(&self, if_kind: impl IntoIfKindVec) -> Result<()> {
629 let if_kind_vec = if_kind.into_vec();
630 self.send_cmd(Command::SetOption(DaemonOption::EnableInterface(
631 if_kind_vec.kinds,
632 )))
633 }
634
635 pub fn disable_interface(&self, if_kind: impl IntoIfKindVec) -> Result<()> {
642 let if_kind_vec = if_kind.into_vec();
643 self.send_cmd(Command::SetOption(DaemonOption::DisableInterface(
644 if_kind_vec.kinds,
645 )))
646 }
647
648 pub fn accept_unsolicited(&self, accept: bool) -> Result<()> {
659 self.send_cmd(Command::SetOption(DaemonOption::AcceptUnsolicited(accept)))
660 }
661
662 pub fn include_apple_p2p(&self, include: bool) -> Result<()> {
665 self.send_cmd(Command::SetOption(DaemonOption::IncludeAppleP2P(include)))
666 }
667
668 #[cfg(test)]
669 pub fn test_down_interface(&self, ifname: &str) -> Result<()> {
670 self.send_cmd(Command::SetOption(DaemonOption::TestDownInterface(
671 ifname.to_string(),
672 )))
673 }
674
675 #[cfg(test)]
676 pub fn test_up_interface(&self, ifname: &str) -> Result<()> {
677 self.send_cmd(Command::SetOption(DaemonOption::TestUpInterface(
678 ifname.to_string(),
679 )))
680 }
681
682 pub fn set_multicast_loop_v4(&self, on: bool) -> Result<()> {
698 self.send_cmd(Command::SetOption(DaemonOption::MulticastLoopV4(on)))
699 }
700
701 pub fn set_multicast_loop_v6(&self, on: bool) -> Result<()> {
717 self.send_cmd(Command::SetOption(DaemonOption::MulticastLoopV6(on)))
718 }
719
720 pub fn verify(&self, instance_fullname: String, timeout: Duration) -> Result<()> {
739 self.send_cmd(Command::Verify(instance_fullname, timeout))
740 }
741
742 fn daemon_thread(
743 signal_sock: MioUdpSocket,
744 poller: Poll,
745 receiver: Receiver<Command>,
746 port: u16,
747 cmd_sender: Sender<Command>,
748 signal_addr: SocketAddr,
749 ) {
750 let mut zc = Zeroconf::new(signal_sock, poller, port, cmd_sender, signal_addr);
751
752 if let Some(cmd) = zc.run(receiver) {
753 match cmd {
754 Command::Exit(resp_s) => {
755 if let Err(e) = resp_s.send(DaemonStatus::Shutdown) {
758 debug!("exit: failed to send response of shutdown: {}", e);
759 }
760 }
761 _ => {
762 debug!("Unexpected command: {:?}", cmd);
763 }
764 }
765 }
766 }
767}
768
769fn _new_socket_bind(intf: &Interface, should_loop: bool) -> Result<MyUdpSocket> {
771 let intf_ip = &intf.ip();
774 match intf_ip {
775 IpAddr::V4(ip) => {
776 let addr = SocketAddrV4::new(Ipv4Addr::new(0, 0, 0, 0), MDNS_PORT);
777 let sock = new_socket(addr.into(), true)?;
778
779 sock.join_multicast_v4(&GROUP_ADDR_V4, ip)
781 .map_err(|e| e_fmt!("join multicast group on addr {}: {}", intf_ip, e))?;
782
783 sock.set_multicast_if_v4(ip)
785 .map_err(|e| e_fmt!("set multicast_if on addr {}: {}", ip, e))?;
786
787 sock.set_multicast_ttl_v4(255)
792 .map_err(|e| e_fmt!("set set_multicast_ttl_v4 on addr {}: {}", ip, e))?;
793
794 if !should_loop {
795 sock.set_multicast_loop_v4(false)
796 .map_err(|e| e_fmt!("failed to set multicast loop v4 for {ip}: {e}"))?;
797 }
798
799 let multicast_addr = SocketAddrV4::new(GROUP_ADDR_V4, MDNS_PORT).into();
801 let test_packets = DnsOutgoing::new(0).to_data_on_wire();
802 for packet in test_packets {
803 sock.send_to(&packet, &multicast_addr)
804 .map_err(|e| e_fmt!("send multicast packet on addr {}: {}", ip, e))?;
805 }
806 MyUdpSocket::new(sock)
807 .map_err(|e| e_fmt!("failed to create MySocket for interface {}: {e}", intf.name))
808 }
809 IpAddr::V6(ip) => {
810 let addr = SocketAddrV6::new(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 0), MDNS_PORT, 0, 0);
811 let sock = new_socket(addr.into(), true)?;
812
813 let if_index = intf.index.unwrap_or(0);
814
815 sock.join_multicast_v6(&GROUP_ADDR_V6, if_index)
817 .map_err(|e| e_fmt!("join multicast group on addr {}: {}", ip, e))?;
818
819 sock.set_multicast_if_v6(if_index)
821 .map_err(|e| e_fmt!("set multicast_if on addr {}: {}", ip, e))?;
822
823 MyUdpSocket::new(sock)
828 .map_err(|e| e_fmt!("failed to create MySocket for interface {}: {e}", intf.name))
829 }
830 }
831}
832
833fn new_socket(addr: SocketAddr, non_block: bool) -> Result<PktInfoUdpSocket> {
836 let domain = match addr {
837 SocketAddr::V4(_) => socket2::Domain::IPV4,
838 SocketAddr::V6(_) => socket2::Domain::IPV6,
839 };
840
841 let fd = PktInfoUdpSocket::new(domain).map_err(|e| e_fmt!("create socket failed: {}", e))?;
842
843 fd.set_reuse_address(true)
844 .map_err(|e| e_fmt!("set ReuseAddr failed: {}", e))?;
845 #[cfg(unix)]
846 if let Err(e) = fd.set_reuse_port(true) {
847 debug!(
848 "SO_REUSEPORT is not supported, continuing without it: {}",
849 e
850 );
851 }
852
853 if non_block {
854 fd.set_nonblocking(true)
855 .map_err(|e| e_fmt!("set O_NONBLOCK: {}", e))?;
856 }
857
858 fd.bind(&addr.into())
859 .map_err(|e| e_fmt!("socket bind to {} failed: {}", &addr, e))?;
860
861 trace!("new socket bind to {}", &addr);
862 Ok(fd)
863}
864
865struct ReRun {
867 next_time: u64,
869 command: Command,
870}
871
872#[derive(Debug, Clone)]
876#[non_exhaustive]
877pub enum IfKind {
878 All,
880
881 IPv4,
883
884 IPv6,
886
887 Name(String),
889
890 Addr(IpAddr),
894
895 LoopbackV4,
899
900 LoopbackV6,
902
903 IndexV4(u32),
905
906 IndexV6(u32),
908}
909
910impl IfKind {
911 fn matches(&self, intf: &Interface) -> bool {
913 match self {
914 Self::All => true,
915 Self::IPv4 => intf.ip().is_ipv4(),
916 Self::IPv6 => intf.ip().is_ipv6(),
917 Self::Name(ifname) => ifname == &intf.name,
918 Self::Addr(addr) => addr == &intf.ip(),
919 Self::LoopbackV4 => intf.is_loopback() && intf.ip().is_ipv4(),
920 Self::LoopbackV6 => intf.is_loopback() && intf.ip().is_ipv6(),
921 Self::IndexV4(idx) => intf.index == Some(*idx) && intf.ip().is_ipv4(),
922 Self::IndexV6(idx) => intf.index == Some(*idx) && intf.ip().is_ipv6(),
923 }
924 }
925}
926
927impl From<&str> for IfKind {
930 fn from(val: &str) -> Self {
931 Self::Name(val.to_string())
932 }
933}
934
935impl From<&String> for IfKind {
936 fn from(val: &String) -> Self {
937 Self::Name(val.to_string())
938 }
939}
940
941impl From<IpAddr> for IfKind {
943 fn from(val: IpAddr) -> Self {
944 Self::Addr(val)
945 }
946}
947
948pub struct IfKindVec {
950 kinds: Vec<IfKind>,
951}
952
953pub trait IntoIfKindVec {
955 fn into_vec(self) -> IfKindVec;
956}
957
958impl<T: Into<IfKind>> IntoIfKindVec for T {
959 fn into_vec(self) -> IfKindVec {
960 let if_kind: IfKind = self.into();
961 IfKindVec {
962 kinds: vec![if_kind],
963 }
964 }
965}
966
967impl<T: Into<IfKind>> IntoIfKindVec for Vec<T> {
968 fn into_vec(self) -> IfKindVec {
969 let kinds: Vec<IfKind> = self.into_iter().map(|x| x.into()).collect();
970 IfKindVec { kinds }
971 }
972}
973
974struct IfSelection {
976 if_kind: IfKind,
978
979 selected: bool,
981}
982
983struct Zeroconf {
985 port: u16,
988
989 my_intfs: HashMap<u32, MyIntf>,
991
992 ipv4_sock: Option<MyUdpSocket>,
994
995 ipv6_sock: Option<MyUdpSocket>,
997
998 my_services: HashMap<String, ServiceInfo>,
1000
1001 cache: DnsCache,
1003
1004 dns_registry_map: HashMap<u32, DnsRegistry>,
1006
1007 service_queriers: HashMap<String, Sender<ServiceEvent>>, hostname_resolvers: HashMap<String, (Sender<HostnameResolutionEvent>, Option<u64>)>, retransmissions: Vec<ReRun>,
1018
1019 counters: Metrics,
1020
1021 poller: Poll,
1023
1024 monitors: Vec<Sender<DaemonEvent>>,
1026
1027 service_name_len_max: u8,
1029
1030 ip_check_interval: u64,
1032
1033 if_selections: Vec<IfSelection>,
1035
1036 signal_sock: MioUdpSocket,
1038
1039 timers: BinaryHeap<Reverse<u64>>,
1045
1046 status: DaemonStatus,
1047
1048 pending_resolves: HashSet<String>,
1050
1051 resolved: HashSet<String>,
1053
1054 multicast_loop_v4: bool,
1055
1056 multicast_loop_v6: bool,
1057
1058 accept_unsolicited: bool,
1059
1060 include_apple_p2p: bool,
1061
1062 cmd_sender: Sender<Command>,
1063
1064 signal_addr: SocketAddr,
1065
1066 #[cfg(test)]
1067 test_down_interfaces: HashSet<String>,
1068}
1069
1070fn join_multicast_group(my_sock: &PktInfoUdpSocket, intf: &Interface) -> Result<()> {
1072 let intf_ip = &intf.ip();
1073 match intf_ip {
1074 IpAddr::V4(ip) => {
1075 debug!("join multicast group V4 on {} addr {ip}", intf.name);
1077 my_sock
1078 .join_multicast_v4(&GROUP_ADDR_V4, ip)
1079 .map_err(|e| e_fmt!("PKT join multicast group on addr {}: {}", intf_ip, e))?;
1080 }
1081 IpAddr::V6(ip) => {
1082 let if_index = intf.index.unwrap_or(0);
1083 debug!(
1085 "join multicast group V6 on {} addr {ip} with index {if_index}",
1086 intf.name
1087 );
1088 my_sock
1089 .join_multicast_v6(&GROUP_ADDR_V6, if_index)
1090 .map_err(|e| e_fmt!("PKT join multicast group on addr {}: {}", ip, e))?;
1091 }
1092 }
1093 Ok(())
1094}
1095
1096impl Zeroconf {
1097 fn new(
1098 signal_sock: MioUdpSocket,
1099 poller: Poll,
1100 port: u16,
1101 cmd_sender: Sender<Command>,
1102 signal_addr: SocketAddr,
1103 ) -> Self {
1104 let my_ifaddrs = my_ip_interfaces(true);
1106
1107 let mut my_intfs = HashMap::new();
1111 let mut dns_registry_map = HashMap::new();
1112
1113 let mut ipv4_sock = None;
1116 let addr = SocketAddrV4::new(Ipv4Addr::new(0, 0, 0, 0), port);
1117 match new_socket(addr.into(), true) {
1118 Ok(sock) => {
1119 sock.set_multicast_ttl_v4(255)
1124 .map_err(|e| e_fmt!("set set_multicast_ttl_v4 on addr: {}", e))
1125 .ok();
1126
1127 ipv4_sock = match MyUdpSocket::new(sock) {
1129 Ok(s) => Some(s),
1130 Err(e) => {
1131 debug!("failed to create IPv4 MyUdpSocket: {e}");
1132 None
1133 }
1134 };
1135 }
1136 Err(e) => debug!("failed to create IPv4 socket: {e}"),
1138 }
1139
1140 let mut ipv6_sock = None;
1141 let addr = SocketAddrV6::new(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 0), port, 0, 0);
1142 match new_socket(addr.into(), true) {
1143 Ok(sock) => {
1144 sock.set_multicast_hops_v6(255)
1148 .map_err(|e| e_fmt!("set set_multicast_hops_v6: {}", e))
1149 .ok();
1150
1151 ipv6_sock = match MyUdpSocket::new(sock) {
1153 Ok(s) => Some(s),
1154 Err(e) => {
1155 debug!("failed to create IPv6 MyUdpSocket: {e}");
1156 None
1157 }
1158 };
1159 }
1160 Err(e) => debug!("failed to create IPv6 socket: {e}"),
1161 }
1162
1163 for intf in my_ifaddrs {
1165 let sock_opt = if intf.ip().is_ipv4() {
1166 &ipv4_sock
1167 } else {
1168 &ipv6_sock
1169 };
1170 let Some(sock) = sock_opt else {
1171 debug!(
1172 "no socket available for interface {} with addr {}. Skipped.",
1173 intf.name,
1174 intf.ip()
1175 );
1176 continue;
1177 };
1178
1179 if let Err(e) = join_multicast_group(&sock.pktinfo, &intf) {
1180 debug!("failed to join multicast: {}: {e}. Skipped.", &intf.ip());
1181 }
1182
1183 let if_index = intf.index.unwrap_or(0);
1184
1185 dns_registry_map
1187 .entry(if_index)
1188 .or_insert_with(DnsRegistry::new);
1189
1190 my_intfs
1191 .entry(if_index)
1192 .and_modify(|v: &mut MyIntf| {
1193 v.addrs.insert(intf.addr.clone());
1194 })
1195 .or_insert(MyIntf {
1196 name: intf.name.clone(),
1197 index: if_index,
1198 addrs: HashSet::from([intf.addr]),
1199 });
1200 }
1201
1202 let monitors = Vec::new();
1203 let service_name_len_max = SERVICE_NAME_LEN_MAX_DEFAULT;
1204 let ip_check_interval = IP_CHECK_INTERVAL_IN_SECS_DEFAULT as u64 * 1000;
1205
1206 let timers = BinaryHeap::new();
1207
1208 let if_selections = vec![];
1210
1211 let status = DaemonStatus::Running;
1212
1213 Self {
1214 port,
1215 my_intfs,
1216 ipv4_sock,
1217 ipv6_sock,
1218 my_services: HashMap::new(),
1219 cache: DnsCache::new(),
1220 dns_registry_map,
1221 hostname_resolvers: HashMap::new(),
1222 service_queriers: HashMap::new(),
1223 retransmissions: Vec::new(),
1224 counters: HashMap::new(),
1225 poller,
1226 monitors,
1227 service_name_len_max,
1228 ip_check_interval,
1229 if_selections,
1230 signal_sock,
1231 timers,
1232 status,
1233 pending_resolves: HashSet::new(),
1234 resolved: HashSet::new(),
1235 multicast_loop_v4: true,
1236 multicast_loop_v6: true,
1237 accept_unsolicited: false,
1238 include_apple_p2p: false,
1239 cmd_sender,
1240 signal_addr,
1241
1242 #[cfg(test)]
1243 test_down_interfaces: HashSet::new(),
1244 }
1245 }
1246
1247 fn send_cmd_to_self(&self, cmd: Command) -> Result<()> {
1249 let cmd_name = cmd.to_string();
1250
1251 self.cmd_sender.try_send(cmd).map_err(|e| match e {
1252 TrySendError::Full(_) => Error::Again,
1253 TrySendError::Disconnected(_) => Error::DaemonShutdown,
1254 })?;
1255
1256 let addr = SocketAddrV4::new(LOOPBACK_V4, 0);
1257 let socket = UdpSocket::bind(addr)
1258 .map_err(|e| e_fmt!("Failed to create socket to send signal: {}", e))?;
1259 socket
1260 .send_to(cmd_name.as_bytes(), self.signal_addr)
1261 .map_err(|e| {
1262 e_fmt!(
1263 "signal socket send_to {} ({}) failed: {}",
1264 self.signal_addr,
1265 cmd_name,
1266 e
1267 )
1268 })?;
1269
1270 Ok(())
1271 }
1272
1273 fn cleanup(&mut self) {
1281 debug!("Starting cleanup for shutdown");
1282
1283 let service_names: Vec<String> = self.my_services.keys().cloned().collect();
1285 for fullname in service_names {
1286 if let Some(info) = self.my_services.get(&fullname) {
1287 debug!("Unregistering service during shutdown: {}", &fullname);
1288
1289 for intf in self.my_intfs.values() {
1290 if let Some(sock) = self.ipv4_sock.as_ref() {
1291 self.unregister_service(info, intf, &sock.pktinfo);
1292 }
1293
1294 if let Some(sock) = self.ipv6_sock.as_ref() {
1295 self.unregister_service(info, intf, &sock.pktinfo);
1296 }
1297 }
1298 }
1299 }
1300 self.my_services.clear();
1301
1302 let browse_types: Vec<String> = self.service_queriers.keys().cloned().collect();
1304 for ty_domain in browse_types {
1305 debug!("Stopping browse during shutdown: {}", &ty_domain);
1306 if let Some(sender) = self.service_queriers.remove(&ty_domain) {
1307 if let Err(e) = sender.send(ServiceEvent::SearchStopped(ty_domain.clone())) {
1309 debug!("Failed to send SearchStopped during shutdown: {}", e);
1310 }
1311 }
1312 }
1313
1314 let hostnames: Vec<String> = self.hostname_resolvers.keys().cloned().collect();
1316 for hostname in hostnames {
1317 debug!(
1318 "Stopping hostname resolution during shutdown: {}",
1319 &hostname
1320 );
1321 if let Some((sender, _timeout)) = self.hostname_resolvers.remove(&hostname) {
1322 if let Err(e) =
1324 sender.send(HostnameResolutionEvent::SearchStopped(hostname.clone()))
1325 {
1326 debug!(
1327 "Failed to send HostnameResolutionEvent::SearchStopped during shutdown: {}",
1328 e
1329 );
1330 }
1331 }
1332 }
1333
1334 self.retransmissions.clear();
1336
1337 debug!("Cleanup completed");
1338 }
1339
1340 fn run(&mut self, receiver: Receiver<Command>) -> Option<Command> {
1349 if let Err(e) = self.poller.registry().register(
1351 &mut self.signal_sock,
1352 mio::Token(SIGNAL_SOCK_EVENT_KEY),
1353 mio::Interest::READABLE,
1354 ) {
1355 debug!("failed to add signal socket to the poller: {}", e);
1356 return None;
1357 }
1358
1359 if let Some(sock) = self.ipv4_sock.as_mut() {
1360 if let Err(e) = self.poller.registry().register(
1361 sock,
1362 mio::Token(IPV4_SOCK_EVENT_KEY),
1363 mio::Interest::READABLE,
1364 ) {
1365 debug!("failed to register ipv4 socket: {}", e);
1366 return None;
1367 }
1368 }
1369
1370 if let Some(sock) = self.ipv6_sock.as_mut() {
1371 if let Err(e) = self.poller.registry().register(
1372 sock,
1373 mio::Token(IPV6_SOCK_EVENT_KEY),
1374 mio::Interest::READABLE,
1375 ) {
1376 debug!("failed to register ipv6 socket: {}", e);
1377 return None;
1378 }
1379 }
1380
1381 let mut next_ip_check = if self.ip_check_interval > 0 {
1383 current_time_millis() + self.ip_check_interval
1384 } else {
1385 0
1386 };
1387
1388 if next_ip_check > 0 {
1389 self.add_timer(next_ip_check);
1390 }
1391
1392 let mut events = mio::Events::with_capacity(1024);
1395 loop {
1396 let now = current_time_millis();
1397
1398 let earliest_timer = self.peek_earliest_timer();
1399 let timeout = earliest_timer.map(|timer| {
1400 let millis = if timer > now { timer - now } else { 1 };
1402 Duration::from_millis(millis)
1403 });
1404
1405 events.clear();
1407 match self.poller.poll(&mut events, timeout) {
1408 Ok(_) => self.handle_poller_events(&events),
1409 Err(e) => debug!("failed to select from sockets: {}", e),
1410 }
1411
1412 let now = current_time_millis();
1413
1414 self.pop_timers_till(now);
1416
1417 for hostname in self
1419 .hostname_resolvers
1420 .clone()
1421 .into_iter()
1422 .filter(|(_, (_, timeout))| timeout.map(|t| now >= t).unwrap_or(false))
1423 .map(|(hostname, _)| hostname)
1424 {
1425 trace!("hostname resolver timeout for {}", &hostname);
1426 call_hostname_resolution_listener(
1427 &self.hostname_resolvers,
1428 &hostname,
1429 HostnameResolutionEvent::SearchTimeout(hostname.to_owned()),
1430 );
1431 call_hostname_resolution_listener(
1432 &self.hostname_resolvers,
1433 &hostname,
1434 HostnameResolutionEvent::SearchStopped(hostname.to_owned()),
1435 );
1436 self.hostname_resolvers.remove(&hostname);
1437 }
1438
1439 while let Ok(command) = receiver.try_recv() {
1441 if matches!(command, Command::Exit(_)) {
1442 debug!("Exit command received, performing cleanup");
1443 self.cleanup();
1444 self.status = DaemonStatus::Shutdown;
1445 return Some(command);
1446 }
1447 self.exec_command(command, false);
1448 }
1449
1450 let mut i = 0;
1452 while i < self.retransmissions.len() {
1453 if now >= self.retransmissions[i].next_time {
1454 let rerun = self.retransmissions.remove(i);
1455 self.exec_command(rerun.command, true);
1456 } else {
1457 i += 1;
1458 }
1459 }
1460
1461 self.refresh_active_services();
1463
1464 let mut query_count = 0;
1466 for (hostname, _sender) in self.hostname_resolvers.iter() {
1467 for (hostname, ip_addr) in
1468 self.cache.refresh_due_hostname_resolutions(hostname).iter()
1469 {
1470 self.send_query(hostname, ip_address_rr_type(&ip_addr.to_ip_addr()));
1471 query_count += 1;
1472 }
1473 }
1474
1475 self.increase_counter(Counter::CacheRefreshAddr, query_count);
1476
1477 let now = current_time_millis();
1479
1480 let expired_services = self.cache.evict_expired_services(now);
1482 if !expired_services.is_empty() {
1483 debug!(
1484 "run: send {} service removal to listeners",
1485 expired_services.len()
1486 );
1487 self.notify_service_removal(expired_services);
1488 }
1489
1490 let expired_addrs = self.cache.evict_expired_addr(now);
1492 for (hostname, addrs) in expired_addrs {
1493 call_hostname_resolution_listener(
1494 &self.hostname_resolvers,
1495 &hostname,
1496 HostnameResolutionEvent::AddressesRemoved(hostname.clone(), addrs),
1497 );
1498 let instances = self.cache.get_instances_on_host(&hostname);
1499 let instance_set: HashSet<String> = instances.into_iter().collect();
1500 self.resolve_updated_instances(&instance_set);
1501 }
1502
1503 self.probing_handler();
1505
1506 if now >= next_ip_check && next_ip_check > 0 {
1508 next_ip_check = now + self.ip_check_interval;
1509 self.add_timer(next_ip_check);
1510
1511 self.check_ip_changes();
1512 }
1513 }
1514 }
1515
1516 fn process_set_option(&mut self, daemon_opt: DaemonOption) {
1517 match daemon_opt {
1518 DaemonOption::ServiceNameLenMax(length) => self.service_name_len_max = length,
1519 DaemonOption::IpCheckInterval(interval) => self.ip_check_interval = interval,
1520 DaemonOption::EnableInterface(if_kind) => self.enable_interface(if_kind),
1521 DaemonOption::DisableInterface(if_kind) => self.disable_interface(if_kind),
1522 DaemonOption::MulticastLoopV4(on) => self.set_multicast_loop_v4(on),
1523 DaemonOption::MulticastLoopV6(on) => self.set_multicast_loop_v6(on),
1524 DaemonOption::AcceptUnsolicited(accept) => self.set_accept_unsolicited(accept),
1525 DaemonOption::IncludeAppleP2P(enable) => self.set_apple_p2p(enable),
1526 #[cfg(test)]
1527 DaemonOption::TestDownInterface(ifname) => {
1528 self.test_down_interfaces.insert(ifname);
1529 }
1530 #[cfg(test)]
1531 DaemonOption::TestUpInterface(ifname) => {
1532 self.test_down_interfaces.remove(&ifname);
1533 }
1534 }
1535 }
1536
1537 fn enable_interface(&mut self, kinds: Vec<IfKind>) {
1538 debug!("enable_interface: {:?}", kinds);
1539 let interfaces = my_ip_interfaces_inner(true, self.include_apple_p2p);
1540
1541 for if_kind in kinds {
1542 self.if_selections.push(IfSelection {
1543 if_kind: resolve_addr_to_index(if_kind, &interfaces),
1544 selected: true,
1545 });
1546 }
1547
1548 self.apply_intf_selections(interfaces);
1549 }
1550
1551 fn disable_interface(&mut self, kinds: Vec<IfKind>) {
1552 debug!("disable_interface: {:?}", kinds);
1553 let interfaces = my_ip_interfaces_inner(true, self.include_apple_p2p);
1554
1555 for if_kind in kinds {
1556 self.if_selections.push(IfSelection {
1557 if_kind: resolve_addr_to_index(if_kind, &interfaces),
1558 selected: false,
1559 });
1560 }
1561
1562 self.apply_intf_selections(interfaces);
1563 }
1564
1565 fn set_multicast_loop_v4(&mut self, on: bool) {
1566 let Some(sock) = self.ipv4_sock.as_mut() else {
1567 return;
1568 };
1569 self.multicast_loop_v4 = on;
1570 sock.pktinfo
1571 .set_multicast_loop_v4(on)
1572 .map_err(|e| e_fmt!("failed to set multicast loop v4: {}", e))
1573 .unwrap();
1574 }
1575
1576 fn set_multicast_loop_v6(&mut self, on: bool) {
1577 let Some(sock) = self.ipv6_sock.as_mut() else {
1578 return;
1579 };
1580 self.multicast_loop_v6 = on;
1581 sock.pktinfo
1582 .set_multicast_loop_v6(on)
1583 .map_err(|e| e_fmt!("failed to set multicast loop v6: {}", e))
1584 .unwrap();
1585 }
1586
1587 fn set_accept_unsolicited(&mut self, accept: bool) {
1588 self.accept_unsolicited = accept;
1589 }
1590
1591 fn set_apple_p2p(&mut self, include: bool) {
1592 if self.include_apple_p2p != include {
1593 self.include_apple_p2p = include;
1594 self.apply_intf_selections(my_ip_interfaces_inner(true, self.include_apple_p2p));
1595 }
1596 }
1597
1598 fn notify_monitors(&mut self, event: DaemonEvent) {
1599 self.monitors.retain(|sender| {
1601 if let Err(e) = sender.try_send(event.clone()) {
1602 debug!("notify_monitors: try_send: {}", &e);
1603 if matches!(e, TrySendError::Disconnected(_)) {
1604 return false; }
1606 }
1607 true
1608 });
1609 }
1610
1611 fn del_addr_in_my_services(&mut self, addr: &IpAddr) {
1613 for (_, service_info) in self.my_services.iter_mut() {
1614 if service_info.is_addr_auto() {
1615 service_info.remove_ipaddr(addr);
1616 }
1617 }
1618 }
1619
1620 fn add_timer(&mut self, next_time: u64) {
1621 self.timers.push(Reverse(next_time));
1622 }
1623
1624 fn peek_earliest_timer(&self) -> Option<u64> {
1625 self.timers.peek().map(|Reverse(v)| *v)
1626 }
1627
1628 fn _pop_earliest_timer(&mut self) -> Option<u64> {
1629 self.timers.pop().map(|Reverse(v)| v)
1630 }
1631
1632 fn pop_timers_till(&mut self, now: u64) {
1634 while let Some(Reverse(v)) = self.timers.peek() {
1635 if *v > now {
1636 break;
1637 }
1638 self.timers.pop();
1639 }
1640 }
1641
1642 fn selected_intfs(&self, interfaces: Vec<Interface>) -> HashSet<Interface> {
1644 let intf_count = interfaces.len();
1645 let mut intf_selections = vec![true; intf_count];
1646
1647 for selection in self.if_selections.iter() {
1649 for i in 0..intf_count {
1651 if selection.if_kind.matches(&interfaces[i]) {
1652 intf_selections[i] = selection.selected;
1653 }
1654 }
1655 }
1656
1657 let mut selected_addrs = HashSet::new();
1658 for i in 0..intf_count {
1659 if intf_selections[i] {
1660 selected_addrs.insert(interfaces[i].clone());
1661 }
1662 }
1663
1664 selected_addrs
1665 }
1666
1667 fn apply_intf_selections(&mut self, interfaces: Vec<Interface>) {
1672 let intf_count = interfaces.len();
1674 let mut intf_selections = vec![true; intf_count];
1675
1676 for selection in self.if_selections.iter() {
1678 for i in 0..intf_count {
1680 if selection.if_kind.matches(&interfaces[i]) {
1681 intf_selections[i] = selection.selected;
1682 }
1683 }
1684 }
1685
1686 for (idx, intf) in interfaces.into_iter().enumerate() {
1688 if intf_selections[idx] {
1689 self.add_interface(intf);
1691 } else {
1692 self.del_interface_addr(&intf);
1694 }
1695 }
1696 }
1697
1698 fn del_ip(&mut self, ip: IpAddr) {
1699 self.del_addr_in_my_services(&ip);
1700 self.notify_monitors(DaemonEvent::IpDel(ip));
1701 }
1702
1703 fn check_ip_changes(&mut self) {
1705 let my_ifaddrs = my_ip_interfaces_inner(true, self.include_apple_p2p);
1707
1708 #[cfg(test)]
1709 let my_ifaddrs: Vec<_> = my_ifaddrs
1710 .into_iter()
1711 .filter(|intf| !self.test_down_interfaces.contains(&intf.name))
1712 .collect();
1713
1714 let ifaddrs_map: HashMap<u32, Vec<&IfAddr>> =
1715 my_ifaddrs.iter().fold(HashMap::new(), |mut acc, intf| {
1716 let if_index = intf.index.unwrap_or(0);
1717 acc.entry(if_index).or_default().push(&intf.addr);
1718 acc
1719 });
1720
1721 let mut deleted_intfs = Vec::new();
1722 let mut deleted_ips = Vec::new();
1723
1724 for (if_index, my_intf) in self.my_intfs.iter_mut() {
1725 let mut last_ipv4 = None;
1726 let mut last_ipv6 = None;
1727
1728 if let Some(current_addrs) = ifaddrs_map.get(if_index) {
1729 my_intf.addrs.retain(|addr| {
1730 if current_addrs.contains(&addr) {
1731 true
1732 } else {
1733 match addr.ip() {
1734 IpAddr::V4(ipv4) => last_ipv4 = Some(ipv4),
1735 IpAddr::V6(ipv6) => last_ipv6 = Some(ipv6),
1736 }
1737 deleted_ips.push(addr.ip());
1738 false
1739 }
1740 });
1741 if my_intf.addrs.is_empty() {
1742 deleted_intfs.push((*if_index, last_ipv4, last_ipv6))
1743 }
1744 } else {
1745 debug!(
1747 "check_ip_changes: interface {} ({}) no longer exists, removing",
1748 my_intf.name, if_index
1749 );
1750 for addr in my_intf.addrs.iter() {
1751 match addr.ip() {
1752 IpAddr::V4(ipv4) => last_ipv4 = Some(ipv4),
1753 IpAddr::V6(ipv6) => last_ipv6 = Some(ipv6),
1754 }
1755 deleted_ips.push(addr.ip())
1756 }
1757 deleted_intfs.push((*if_index, last_ipv4, last_ipv6));
1758 }
1759 }
1760
1761 if !deleted_ips.is_empty() || !deleted_intfs.is_empty() {
1762 debug!(
1763 "check_ip_changes: {} deleted ips {} deleted intfs",
1764 deleted_ips.len(),
1765 deleted_intfs.len()
1766 );
1767 }
1768
1769 for ip in deleted_ips {
1770 self.del_ip(ip);
1771 }
1772
1773 for (if_index, last_ipv4, last_ipv6) in deleted_intfs {
1774 let Some(my_intf) = self.my_intfs.remove(&if_index) else {
1775 continue;
1776 };
1777
1778 if let Some(ipv4) = last_ipv4 {
1779 debug!("leave multicast for {ipv4}");
1780 if let Some(sock) = self.ipv4_sock.as_mut() {
1781 if let Err(e) = sock.pktinfo.leave_multicast_v4(&GROUP_ADDR_V4, &ipv4) {
1782 debug!("leave multicast group for addr {ipv4}: {e}");
1783 }
1784 }
1785 }
1786
1787 if let Some(ipv6) = last_ipv6 {
1788 debug!("leave multicast for {ipv6}");
1789 if let Some(sock) = self.ipv6_sock.as_mut() {
1790 if let Err(e) = sock
1791 .pktinfo
1792 .leave_multicast_v6(&GROUP_ADDR_V6, my_intf.index)
1793 {
1794 debug!("leave multicast group for IPv6: {ipv6}: {e}");
1795 }
1796 }
1797 }
1798
1799 let intf_id = InterfaceId {
1801 name: my_intf.name.to_string(),
1802 index: my_intf.index,
1803 };
1804 let result = self.cache.remove_records_on_intf(intf_id);
1805 self.notify_service_removal(result.removed_instances);
1806 self.resolve_updated_instances(&result.modified_instances);
1807 }
1808
1809 self.apply_intf_selections(my_ifaddrs);
1811 }
1812
1813 fn del_interface_addr(&mut self, intf: &Interface) {
1816 let if_index = intf.index.unwrap_or(0);
1817 debug!(
1818 "del_interface_addr: {} ({if_index}) addr {}",
1819 intf.name,
1820 intf.ip()
1821 );
1822
1823 let Some(my_intf) = self.my_intfs.get_mut(&if_index) else {
1824 debug!("del_interface_addr: interface {} not found", intf.name);
1825 return;
1826 };
1827
1828 let mut ip_removed = false;
1829
1830 if my_intf.addrs.remove(&intf.addr) {
1831 ip_removed = true;
1832
1833 match intf.addr.ip() {
1834 IpAddr::V4(ipv4) => {
1835 if my_intf.next_ifaddr_v4().is_none() {
1836 if let Some(sock) = self.ipv4_sock.as_mut() {
1837 if let Err(e) = sock.pktinfo.leave_multicast_v4(&GROUP_ADDR_V4, &ipv4) {
1838 debug!("leave multicast group for addr {ipv4}: {e}");
1839 } else {
1840 debug!("leave multicast for {ipv4}");
1841 }
1842 }
1843 }
1844 }
1845
1846 IpAddr::V6(ipv6) => {
1847 if my_intf.next_ifaddr_v6().is_none() {
1848 if let Some(sock) = self.ipv6_sock.as_mut() {
1849 if let Err(e) =
1850 sock.pktinfo.leave_multicast_v6(&GROUP_ADDR_V6, if_index)
1851 {
1852 debug!("leave multicast group for addr {ipv6}: {e}");
1853 }
1854 }
1855 }
1856 }
1857 }
1858
1859 if my_intf.addrs.is_empty() {
1860 debug!("del_interface_addr: removing interface {}", intf.name);
1862 self.my_intfs.remove(&if_index);
1863 self.dns_registry_map.remove(&if_index);
1864 self.cache
1865 .remove_addrs_on_disabled_intf(if_index, IpType::BOTH);
1866 } else {
1867 let is_v4 = intf.addr.ip().is_ipv4();
1871 let version_gone = if is_v4 {
1872 my_intf.next_ifaddr_v4().is_none()
1873 } else {
1874 my_intf.next_ifaddr_v6().is_none()
1875 };
1876 if version_gone {
1877 let ip_type = if is_v4 { IpType::V4 } else { IpType::V6 };
1878 self.cache.remove_addrs_on_disabled_intf(if_index, ip_type);
1879 }
1880 }
1881 }
1882
1883 if ip_removed {
1884 self.notify_monitors(DaemonEvent::IpDel(intf.ip()));
1886 self.del_addr_in_my_services(&intf.ip());
1888 }
1889 }
1890
1891 fn add_interface(&mut self, intf: Interface) {
1892 let sock_opt = if intf.ip().is_ipv4() {
1893 &self.ipv4_sock
1894 } else {
1895 &self.ipv6_sock
1896 };
1897
1898 let Some(sock) = sock_opt else {
1899 debug!(
1900 "add_interface: no socket available for interface {} with addr {}. Skipped.",
1901 intf.name,
1902 intf.ip()
1903 );
1904 return;
1905 };
1906
1907 let if_index = intf.index.unwrap_or(0);
1908 let mut new_addr = false;
1909
1910 match self.my_intfs.entry(if_index) {
1911 Entry::Occupied(mut entry) => {
1912 let my_intf = entry.get_mut();
1914 if !my_intf.addrs.contains(&intf.addr) {
1915 if let Err(e) = join_multicast_group(&sock.pktinfo, &intf) {
1916 debug!("add_interface: socket_config {}: {e}", &intf.name);
1917 }
1918 my_intf.addrs.insert(intf.addr.clone());
1919 new_addr = true;
1920 }
1921 }
1922 Entry::Vacant(entry) => {
1923 if let Err(e) = join_multicast_group(&sock.pktinfo, &intf) {
1924 debug!("add_interface: socket_config {}: {e}. Skipped.", &intf.name);
1925 return;
1926 }
1927
1928 new_addr = true;
1929 let new_intf = MyIntf {
1930 name: intf.name.clone(),
1931 index: if_index,
1932 addrs: HashSet::from([intf.addr.clone()]),
1933 };
1934 entry.insert(new_intf);
1935 }
1936 }
1937
1938 if !new_addr {
1939 trace!("add_interface: interface {} already exists", &intf.name);
1940 return;
1941 }
1942
1943 debug!("add new interface {}: {}", intf.name, intf.ip());
1944
1945 let Some(my_intf) = self.my_intfs.get(&if_index) else {
1946 debug!("add_interface: cannot find if_index {if_index}");
1947 return;
1948 };
1949
1950 let dns_registry = match self.dns_registry_map.get_mut(&if_index) {
1951 Some(registry) => registry,
1952 None => self
1953 .dns_registry_map
1954 .entry(if_index)
1955 .or_insert_with(DnsRegistry::new),
1956 };
1957
1958 for (_, service_info) in self.my_services.iter_mut() {
1959 if service_info.is_addr_auto() {
1960 service_info.insert_ipaddr(&intf);
1961
1962 if let Ok(true) = announce_service_on_intf(
1963 dns_registry,
1964 service_info,
1965 my_intf,
1966 &sock.pktinfo,
1967 self.port,
1968 ) {
1969 debug!(
1970 "Announce service {} on {}",
1971 service_info.get_fullname(),
1972 intf.ip()
1973 );
1974 service_info.set_status(if_index, ServiceStatus::Announced);
1975 } else {
1976 for timer in dns_registry.new_timers.drain(..) {
1977 self.timers.push(Reverse(timer));
1978 }
1979 service_info.set_status(if_index, ServiceStatus::Probing);
1980 }
1981 }
1982 }
1983
1984 if let Some(my_intf) = self.my_intfs.get(&if_index) {
1989 for ty in self.service_queriers.keys() {
1990 self.send_query_on_intf(ty, RRType::PTR, my_intf);
1991 }
1992 }
1993
1994 self.notify_monitors(DaemonEvent::IpAdd(intf.ip()));
1996 }
1997
1998 fn register_service(&mut self, mut info: ServiceInfo) {
2007 if let Err(e) = check_service_name_length(info.get_type(), self.service_name_len_max) {
2009 error!("check_service_name_length: {}", &e);
2010 self.notify_monitors(DaemonEvent::Error(e));
2011 return;
2012 }
2013
2014 if info.is_addr_auto() {
2015 let selected_intfs =
2016 self.selected_intfs(my_ip_interfaces_inner(true, self.include_apple_p2p));
2017 for intf in selected_intfs {
2018 info.insert_ipaddr(&intf);
2019 }
2020 }
2021
2022 debug!("register service {:?}", &info);
2023
2024 let outgoing_addrs = self.send_unsolicited_response(&mut info);
2025 if !outgoing_addrs.is_empty() {
2026 self.notify_monitors(DaemonEvent::Announce(
2027 info.get_fullname().to_string(),
2028 format!("{:?}", &outgoing_addrs),
2029 ));
2030 }
2031
2032 let service_fullname = info.get_fullname().to_lowercase();
2035 self.my_services.insert(service_fullname, info);
2036 }
2037
2038 fn send_unsolicited_response(&mut self, info: &mut ServiceInfo) -> Vec<IpAddr> {
2041 let mut outgoing_addrs = Vec::new();
2042 let mut outgoing_intfs = HashSet::new();
2043
2044 let mut invalid_intf_addrs = HashSet::new();
2045
2046 for (if_index, intf) in self.my_intfs.iter() {
2047 let dns_registry = match self.dns_registry_map.get_mut(if_index) {
2048 Some(registry) => registry,
2049 None => self
2050 .dns_registry_map
2051 .entry(*if_index)
2052 .or_insert_with(DnsRegistry::new),
2053 };
2054
2055 let mut announced = false;
2056
2057 if let Some(sock) = self.ipv4_sock.as_mut() {
2059 match announce_service_on_intf(dns_registry, info, intf, &sock.pktinfo, self.port) {
2060 Ok(true) => {
2061 for addr in intf.addrs.iter().filter(|a| a.ip().is_ipv4()) {
2062 outgoing_addrs.push(addr.ip());
2063 }
2064 outgoing_intfs.insert(intf.index);
2065
2066 debug!(
2067 "Announce service IPv4 {} on {}",
2068 info.get_fullname(),
2069 intf.name
2070 );
2071 announced = true;
2072 }
2073 Ok(false) => {}
2074 Err(InternalError::IntfAddrInvalid(intf_addr)) => {
2075 invalid_intf_addrs.insert(intf_addr);
2076 }
2077 }
2078 }
2079
2080 if let Some(sock) = self.ipv6_sock.as_mut() {
2081 match announce_service_on_intf(dns_registry, info, intf, &sock.pktinfo, self.port) {
2082 Ok(true) => {
2083 for addr in intf.addrs.iter().filter(|a| a.ip().is_ipv6()) {
2084 outgoing_addrs.push(addr.ip());
2085 }
2086 outgoing_intfs.insert(intf.index);
2087
2088 debug!(
2089 "Announce service IPv6 {} on {}",
2090 info.get_fullname(),
2091 intf.name
2092 );
2093 announced = true;
2094 }
2095 Ok(false) => {}
2096 Err(InternalError::IntfAddrInvalid(intf_addr)) => {
2097 invalid_intf_addrs.insert(intf_addr);
2098 }
2099 }
2100 }
2101
2102 if announced {
2103 info.set_status(intf.index, ServiceStatus::Announced);
2104 } else {
2105 for timer in dns_registry.new_timers.drain(..) {
2106 self.timers.push(Reverse(timer));
2107 }
2108 info.set_status(*if_index, ServiceStatus::Probing);
2109 }
2110 }
2111
2112 if !invalid_intf_addrs.is_empty() {
2113 let _ = self.send_cmd_to_self(Command::InvalidIntfAddrs(invalid_intf_addrs));
2114 }
2115
2116 let next_time = current_time_millis() + 1000;
2120 for if_index in outgoing_intfs {
2121 self.add_retransmission(
2122 next_time,
2123 Command::RegisterResend(info.get_fullname().to_string(), if_index),
2124 );
2125 }
2126
2127 outgoing_addrs
2128 }
2129
2130 fn probing_handler(&mut self) {
2132 let now = current_time_millis();
2133 let mut invalid_intf_addrs = HashSet::new();
2134
2135 for (if_index, intf) in self.my_intfs.iter() {
2136 let Some(dns_registry) = self.dns_registry_map.get_mut(if_index) else {
2137 continue;
2138 };
2139
2140 let (out, expired_probes) = check_probing(dns_registry, &mut self.timers, now);
2141
2142 if !out.questions().is_empty() {
2144 trace!("sending out probing of questions: {:?}", out.questions());
2145 if let Some(sock) = self.ipv4_sock.as_mut() {
2146 if let Err(InternalError::IntfAddrInvalid(intf_addr)) =
2147 send_dns_outgoing(&out, intf, &sock.pktinfo, self.port, None, None)
2148 {
2149 invalid_intf_addrs.insert(intf_addr);
2150 }
2151 }
2152 if let Some(sock) = self.ipv6_sock.as_mut() {
2153 if let Err(InternalError::IntfAddrInvalid(intf_addr)) =
2154 send_dns_outgoing(&out, intf, &sock.pktinfo, self.port, None, None)
2155 {
2156 invalid_intf_addrs.insert(intf_addr);
2157 }
2158 }
2159 }
2160
2161 let waiting_services =
2163 handle_expired_probes(expired_probes, &intf.name, dns_registry, &mut self.monitors);
2164
2165 for service_name in waiting_services {
2166 if let Some(info) = self.my_services.get_mut(&service_name.to_lowercase()) {
2168 if info.get_status(*if_index) == ServiceStatus::Announced {
2169 debug!("service {} already announced", info.get_fullname());
2170 continue;
2171 }
2172
2173 let announced_v4 = if let Some(sock) = self.ipv4_sock.as_mut() {
2174 match announce_service_on_intf(
2175 dns_registry,
2176 info,
2177 intf,
2178 &sock.pktinfo,
2179 self.port,
2180 ) {
2181 Ok(announced) => announced,
2182 Err(InternalError::IntfAddrInvalid(intf_addr)) => {
2183 invalid_intf_addrs.insert(intf_addr);
2184 false
2185 }
2186 }
2187 } else {
2188 false
2189 };
2190 let announced_v6 = if let Some(sock) = self.ipv6_sock.as_mut() {
2191 match announce_service_on_intf(
2192 dns_registry,
2193 info,
2194 intf,
2195 &sock.pktinfo,
2196 self.port,
2197 ) {
2198 Ok(announced) => announced,
2199 Err(InternalError::IntfAddrInvalid(intf_addr)) => {
2200 invalid_intf_addrs.insert(intf_addr);
2201 false
2202 }
2203 }
2204 } else {
2205 false
2206 };
2207
2208 if announced_v4 || announced_v6 {
2209 let next_time = now + 1000;
2210 let command =
2211 Command::RegisterResend(info.get_fullname().to_string(), *if_index);
2212 self.retransmissions.push(ReRun { next_time, command });
2213 self.timers.push(Reverse(next_time));
2214
2215 let fullname = dns_registry.resolve_name(&service_name).to_string();
2216
2217 let hostname = dns_registry.resolve_name(info.get_hostname());
2218
2219 debug!("wake up: announce service {} on {}", fullname, intf.name);
2220 notify_monitors(
2221 &mut self.monitors,
2222 DaemonEvent::Announce(fullname, format!("{}:{}", hostname, &intf.name)),
2223 );
2224
2225 info.set_status(*if_index, ServiceStatus::Announced);
2226 }
2227 }
2228 }
2229 }
2230
2231 if !invalid_intf_addrs.is_empty() {
2232 let _ = self.send_cmd_to_self(Command::InvalidIntfAddrs(invalid_intf_addrs));
2233 }
2234 }
2235
2236 fn unregister_service(
2237 &self,
2238 info: &ServiceInfo,
2239 intf: &MyIntf,
2240 sock: &PktInfoUdpSocket,
2241 ) -> Vec<u8> {
2242 let is_ipv4 = sock.domain() == Domain::IPV4;
2243
2244 let mut out = DnsOutgoing::new(FLAGS_QR_RESPONSE | FLAGS_AA);
2245 out.add_answer_at_time(
2246 DnsPointer::new(
2247 info.get_type(),
2248 RRType::PTR,
2249 CLASS_IN,
2250 0,
2251 info.get_fullname().to_string(),
2252 ),
2253 0,
2254 );
2255
2256 if let Some(sub) = info.get_subtype() {
2257 trace!("Adding subdomain {}", sub);
2258 out.add_answer_at_time(
2259 DnsPointer::new(
2260 sub,
2261 RRType::PTR,
2262 CLASS_IN,
2263 0,
2264 info.get_fullname().to_string(),
2265 ),
2266 0,
2267 );
2268 }
2269
2270 out.add_answer_at_time(
2271 DnsSrv::new(
2272 info.get_fullname(),
2273 CLASS_IN | CLASS_CACHE_FLUSH,
2274 0,
2275 info.get_priority(),
2276 info.get_weight(),
2277 info.get_port(),
2278 info.get_hostname().to_string(),
2279 ),
2280 0,
2281 );
2282 out.add_answer_at_time(
2283 DnsTxt::new(
2284 info.get_fullname(),
2285 CLASS_IN | CLASS_CACHE_FLUSH,
2286 0,
2287 info.generate_txt(),
2288 ),
2289 0,
2290 );
2291
2292 let if_addrs = if is_ipv4 {
2293 info.get_addrs_on_my_intf_v4(intf)
2294 } else {
2295 info.get_addrs_on_my_intf_v6(intf)
2296 };
2297
2298 if if_addrs.is_empty() {
2299 return vec![];
2300 }
2301
2302 for address in if_addrs {
2303 out.add_answer_at_time(
2304 DnsAddress::new(
2305 info.get_hostname(),
2306 ip_address_rr_type(&address),
2307 CLASS_IN | CLASS_CACHE_FLUSH,
2308 0,
2309 address,
2310 intf.into(),
2311 ),
2312 0,
2313 );
2314 }
2315
2316 let sent_vec = match send_dns_outgoing(&out, intf, sock, self.port, None, None) {
2318 Ok(sent_vec) => sent_vec,
2319 Err(InternalError::IntfAddrInvalid(intf_addr)) => {
2320 let invalid_intf_addrs = HashSet::from([intf_addr]);
2321 let _ = self.send_cmd_to_self(Command::InvalidIntfAddrs(invalid_intf_addrs));
2322 vec![]
2323 }
2324 };
2325 sent_vec.into_iter().next().unwrap_or_default()
2326 }
2327
2328 fn add_hostname_resolver(
2332 &mut self,
2333 hostname: String,
2334 listener: Sender<HostnameResolutionEvent>,
2335 timeout: Option<u64>,
2336 ) {
2337 let real_timeout = timeout.map(|t| current_time_millis() + t);
2338 self.hostname_resolvers
2339 .insert(hostname.to_lowercase(), (listener, real_timeout));
2340 if let Some(t) = real_timeout {
2341 self.add_timer(t);
2342 }
2343 }
2344
2345 fn send_query(&self, name: &str, qtype: RRType) {
2347 self.send_query_vec(&[(name, qtype)]);
2348 }
2349
2350 fn send_query_on_intf(&self, name: &str, qtype: RRType, intf: &MyIntf) {
2355 let mut out = DnsOutgoing::new(FLAGS_QR_QUERY);
2356 out.add_question(name, qtype);
2357
2358 let mut invalid_intf_addrs = HashSet::new();
2359 if let Some(sock) = self.ipv4_sock.as_ref() {
2360 if let Err(InternalError::IntfAddrInvalid(intf_addr)) =
2361 send_dns_outgoing(&out, intf, &sock.pktinfo, self.port, None, None)
2362 {
2363 invalid_intf_addrs.insert(intf_addr);
2364 }
2365 }
2366 if let Some(sock) = self.ipv6_sock.as_ref() {
2367 if let Err(InternalError::IntfAddrInvalid(intf_addr)) =
2368 send_dns_outgoing(&out, intf, &sock.pktinfo, self.port, None, None)
2369 {
2370 invalid_intf_addrs.insert(intf_addr);
2371 }
2372 }
2373 if !invalid_intf_addrs.is_empty() {
2374 let _ = self.send_cmd_to_self(Command::InvalidIntfAddrs(invalid_intf_addrs));
2375 }
2376 }
2377
2378 fn send_query_vec(&self, questions: &[(&str, RRType)]) {
2380 let mut out = DnsOutgoing::new(FLAGS_QR_QUERY);
2381 let now = current_time_millis();
2382
2383 for (name, qtype) in questions {
2384 out.add_question(name, *qtype);
2385
2386 for record in self.cache.get_known_answers(name, *qtype, now) {
2387 trace!("add known answer: {:?}", record.record);
2395 let mut new_record = record.record.clone();
2396 new_record.get_record_mut().update_ttl(now);
2397 out.add_answer_box(new_record);
2398 }
2399 }
2400
2401 let mut invalid_intf_addrs = HashSet::new();
2402 for (_, intf) in self.my_intfs.iter() {
2403 if let Some(sock) = self.ipv4_sock.as_ref() {
2404 if let Err(InternalError::IntfAddrInvalid(intf_addr)) =
2405 send_dns_outgoing(&out, intf, &sock.pktinfo, self.port, None, None)
2406 {
2407 invalid_intf_addrs.insert(intf_addr);
2408 }
2409 }
2410 if let Some(sock) = self.ipv6_sock.as_ref() {
2411 if let Err(InternalError::IntfAddrInvalid(intf_addr)) =
2412 send_dns_outgoing(&out, intf, &sock.pktinfo, self.port, None, None)
2413 {
2414 invalid_intf_addrs.insert(intf_addr);
2415 }
2416 }
2417 }
2418
2419 if !invalid_intf_addrs.is_empty() {
2420 let _ = self.send_cmd_to_self(Command::InvalidIntfAddrs(invalid_intf_addrs));
2421 }
2422 }
2423
2424 fn handle_read(&mut self, event_key: usize) -> bool {
2429 let sock_opt = match event_key {
2430 IPV4_SOCK_EVENT_KEY => &mut self.ipv4_sock,
2431 IPV6_SOCK_EVENT_KEY => &mut self.ipv6_sock,
2432 _ => {
2433 debug!("handle_read: unknown token {}", event_key);
2434 return false;
2435 }
2436 };
2437 let Some(sock) = sock_opt.as_mut() else {
2438 debug!("handle_read: socket not available for token {}", event_key);
2439 return false;
2440 };
2441 let mut buf = vec![0u8; MAX_MSG_ABSOLUTE];
2442
2443 let (sz, pktinfo) = match sock.pktinfo.recv(&mut buf) {
2450 Ok(sz) => sz,
2451 Err(e) => {
2452 if e.kind() != std::io::ErrorKind::WouldBlock {
2453 debug!("listening socket read failed: {}", e);
2454 }
2455 return false;
2456 }
2457 };
2458
2459 let pkt_if_index = pktinfo.if_index as u32;
2461 let Some(my_intf) = self.my_intfs.get(&pkt_if_index) else {
2462 debug!(
2463 "handle_read: no interface found for pktinfo if_index: {}",
2464 pktinfo.if_index
2465 );
2466 return true; };
2468
2469 let is_ipv4 = event_key == IPV4_SOCK_EVENT_KEY;
2474 if (is_ipv4 && my_intf.next_ifaddr_v4().is_none())
2475 || (!is_ipv4 && my_intf.next_ifaddr_v6().is_none())
2476 {
2477 debug!(
2478 "handle_read: dropping {} packet on intf {} (disabled)",
2479 if is_ipv4 { "IPv4" } else { "IPv6" },
2480 my_intf.name
2481 );
2482 return true;
2483 }
2484
2485 buf.truncate(sz); match DnsIncoming::new(buf, my_intf.into()) {
2488 Ok(msg) => {
2489 if msg.is_query() {
2490 let querier_addr = pktinfo.addr_src;
2491 self.handle_query(msg, pkt_if_index, querier_addr);
2492 } else if msg.is_response() {
2493 self.handle_response(msg, pkt_if_index);
2494 } else {
2495 debug!("Invalid message: not query and not response");
2496 }
2497 }
2498 Err(e) => debug!("Invalid incoming DNS message: {}", e),
2499 }
2500
2501 true
2502 }
2503
2504 fn query_unresolved(&mut self, instance: &str) -> bool {
2506 if !valid_instance_name(instance) {
2507 trace!("instance name {} not valid", instance);
2508 return false;
2509 }
2510
2511 if let Some(records) = self.cache.get_srv(instance) {
2512 for record in records {
2513 if let Some(srv) = record.record.any().downcast_ref::<DnsSrv>() {
2514 if self.cache.get_addr(srv.host()).is_none() {
2515 self.send_query_vec(&[(srv.host(), RRType::A), (srv.host(), RRType::AAAA)]);
2516 return true;
2517 }
2518 }
2519 }
2520 } else {
2521 self.send_query(instance, RRType::ANY);
2522 return true;
2523 }
2524
2525 false
2526 }
2527
2528 fn query_cache_for_service(
2531 &mut self,
2532 ty_domain: &str,
2533 sender: &Sender<ServiceEvent>,
2534 now: u64,
2535 ) {
2536 let mut resolved: HashSet<String> = HashSet::new();
2537 let mut unresolved: HashSet<String> = HashSet::new();
2538
2539 if let Some(records) = self.cache.get_ptr(ty_domain) {
2540 for record in records.iter().filter(|r| !r.record.expires_soon(now)) {
2541 if let Some(ptr) = record.record.any().downcast_ref::<DnsPointer>() {
2542 let mut new_event = None;
2543 match self.resolve_service_from_cache(ty_domain, ptr.alias()) {
2544 Ok(resolved_service) => {
2545 if resolved_service.is_valid() {
2546 debug!("Resolved service from cache: {}", ptr.alias());
2547 new_event =
2548 Some(ServiceEvent::ServiceResolved(Box::new(resolved_service)));
2549 } else {
2550 debug!("Resolved service is not valid: {}", ptr.alias());
2551 }
2552 }
2553 Err(err) => {
2554 debug!("Error while resolving service from cache: {}", err);
2555 continue;
2556 }
2557 }
2558
2559 match sender.send(ServiceEvent::ServiceFound(
2560 ty_domain.to_string(),
2561 ptr.alias().to_string(),
2562 )) {
2563 Ok(()) => debug!("sent service found {}", ptr.alias()),
2564 Err(e) => {
2565 debug!("failed to send service found: {}", e);
2566 continue;
2567 }
2568 }
2569
2570 if let Some(event) = new_event {
2571 resolved.insert(ptr.alias().to_string());
2572 match sender.send(event) {
2573 Ok(()) => debug!("sent service resolved: {}", ptr.alias()),
2574 Err(e) => debug!("failed to send service resolved: {}", e),
2575 }
2576 } else {
2577 unresolved.insert(ptr.alias().to_string());
2578 }
2579 }
2580 }
2581 }
2582
2583 for instance in resolved.drain() {
2584 self.pending_resolves.remove(&instance);
2585 self.resolved.insert(instance);
2586 }
2587
2588 for instance in unresolved.drain() {
2589 self.add_pending_resolve(instance);
2590 }
2591 }
2592
2593 fn query_cache_for_hostname(
2596 &mut self,
2597 hostname: &str,
2598 sender: Sender<HostnameResolutionEvent>,
2599 ) {
2600 let addresses_map = self.cache.get_addresses_for_host(hostname);
2601 for (name, addresses) in addresses_map {
2602 match sender.send(HostnameResolutionEvent::AddressesFound(name, addresses)) {
2603 Ok(()) => trace!("sent hostname addresses found"),
2604 Err(e) => debug!("failed to send hostname addresses found: {}", e),
2605 }
2606 }
2607 }
2608
2609 fn add_pending_resolve(&mut self, instance: String) {
2610 if !self.pending_resolves.contains(&instance) {
2611 let next_time = current_time_millis() + RESOLVE_WAIT_IN_MILLIS;
2612 self.add_retransmission(next_time, Command::Resolve(instance.clone(), 1));
2613 self.pending_resolves.insert(instance);
2614 }
2615 }
2616
2617 fn resolve_service_from_cache(
2619 &self,
2620 ty_domain: &str,
2621 fullname: &str,
2622 ) -> Result<ResolvedService> {
2623 let now = current_time_millis();
2624 let mut resolved_service = ResolvedService {
2625 ty_domain: ty_domain.to_string(),
2626 sub_ty_domain: None,
2627 fullname: fullname.to_string(),
2628 host: String::new(),
2629 port: 0,
2630 addresses: HashSet::new(),
2631 txt_properties: TxtProperties::new(),
2632 };
2633
2634 if let Some(subtype) = self.cache.get_subtype(fullname) {
2636 trace!(
2637 "ty_domain: {} found subtype {} for instance: {}",
2638 ty_domain,
2639 subtype,
2640 fullname
2641 );
2642 if resolved_service.sub_ty_domain.is_none() {
2643 resolved_service.sub_ty_domain = Some(subtype.to_string());
2644 }
2645 }
2646
2647 if let Some(records) = self.cache.get_srv(fullname) {
2649 if let Some(answer) = records.iter().find(|r| !r.record.expires_soon(now)) {
2650 if let Some(dns_srv) = answer.record.any().downcast_ref::<DnsSrv>() {
2651 resolved_service.host = dns_srv.host().to_string();
2652 resolved_service.port = dns_srv.port();
2653 }
2654 }
2655 }
2656
2657 if let Some(records) = self.cache.get_txt(fullname) {
2659 if let Some(record) = records.iter().find(|r| !r.record.expires_soon(now)) {
2660 if let Some(dns_txt) = record.record.any().downcast_ref::<DnsTxt>() {
2661 resolved_service.txt_properties = dns_txt.text().into();
2662 }
2663 }
2664 }
2665
2666 if let Some(records) = self.cache.get_addr(&resolved_service.host) {
2668 for answer in records.iter() {
2669 if let Some(dns_a) = answer.record.any().downcast_ref::<DnsAddress>() {
2670 if dns_a.expires_soon(now) {
2671 trace!(
2672 "Addr expired or expires soon: {}",
2673 dns_a.address().to_ip_addr()
2674 );
2675 } else {
2676 let scoped = dns_a.address();
2677 if let ScopedIp::V4(v4) = &scoped {
2678 let existing = resolved_service
2681 .addresses
2682 .iter()
2683 .find(|a| a.to_ip_addr() == IpAddr::V4(*v4.addr()))
2684 .cloned();
2685 if let Some(mut existing) = existing {
2686 resolved_service.addresses.remove(&existing);
2687 if let ScopedIp::V4(existing_v4) = &mut existing {
2688 for id in v4.interface_ids() {
2689 existing_v4.add_interface_id(id.clone());
2690 }
2691 }
2692 resolved_service.addresses.insert(existing);
2693 } else {
2694 resolved_service.addresses.insert(scoped);
2695 }
2696 } else {
2697 resolved_service.addresses.insert(scoped);
2698 }
2699 }
2700 }
2701 }
2702 }
2703
2704 Ok(resolved_service)
2705 }
2706
2707 fn handle_poller_events(&mut self, events: &mio::Events) {
2708 for ev in events.iter() {
2709 trace!("event received with key {:?}", ev.token());
2710 if ev.token().0 == SIGNAL_SOCK_EVENT_KEY {
2711 self.signal_sock_drain();
2713
2714 if let Err(e) = self.poller.registry().reregister(
2715 &mut self.signal_sock,
2716 ev.token(),
2717 mio::Interest::READABLE,
2718 ) {
2719 debug!("failed to modify poller for signal socket: {}", e);
2720 }
2721 continue; }
2723
2724 while self.handle_read(ev.token().0) {}
2726
2727 if ev.token().0 == IPV4_SOCK_EVENT_KEY {
2729 if let Some(sock) = self.ipv4_sock.as_mut() {
2731 if let Err(e) =
2732 self.poller
2733 .registry()
2734 .reregister(sock, ev.token(), mio::Interest::READABLE)
2735 {
2736 debug!("modify poller for IPv4 socket: {}", e);
2737 }
2738 }
2739 } else if ev.token().0 == IPV6_SOCK_EVENT_KEY {
2740 if let Some(sock) = self.ipv6_sock.as_mut() {
2742 if let Err(e) =
2743 self.poller
2744 .registry()
2745 .reregister(sock, ev.token(), mio::Interest::READABLE)
2746 {
2747 debug!("modify poller for IPv6 socket: {}", e);
2748 }
2749 }
2750 }
2751 }
2752 }
2753
2754 fn handle_response(&mut self, mut msg: DnsIncoming, if_index: u32) {
2757 let now = current_time_millis();
2758
2759 let mut record_predicate = |record: &DnsRecordBox| {
2761 if !record.get_record().is_expired(now) {
2762 return true;
2763 }
2764
2765 debug!("record is expired, removing it from cache.");
2766 if self.cache.remove(record) {
2767 if let Some(dns_ptr) = record.any().downcast_ref::<DnsPointer>() {
2769 call_service_listener(
2770 &self.service_queriers,
2771 dns_ptr.get_name(),
2772 ServiceEvent::ServiceRemoved(
2773 dns_ptr.get_name().to_string(),
2774 dns_ptr.alias().to_string(),
2775 ),
2776 );
2777 }
2778 }
2779 false
2780 };
2781 msg.answers_mut().retain(&mut record_predicate);
2782 msg.authorities_mut().retain(&mut record_predicate);
2783 msg.additionals_mut().retain(&mut record_predicate);
2784
2785 self.conflict_handler(&msg, if_index);
2787
2788 let mut is_for_us = true; for answer in msg.answers() {
2795 if answer.get_type() == RRType::PTR {
2796 if self.service_queriers.contains_key(answer.get_name()) {
2797 is_for_us = true;
2798 break; } else {
2800 is_for_us = false;
2801 }
2802 } else if answer.get_type() == RRType::A || answer.get_type() == RRType::AAAA {
2803 let answer_lowercase = answer.get_name().to_lowercase();
2805 if self.hostname_resolvers.contains_key(&answer_lowercase) {
2806 is_for_us = true;
2807 break; }
2809 }
2810 }
2811
2812 if self.accept_unsolicited {
2814 is_for_us = true;
2815 }
2816
2817 struct InstanceChange {
2819 ty: RRType, name: String, }
2822
2823 let mut changes = Vec::new();
2831 let mut timers = Vec::new();
2832 let Some(my_intf) = self.my_intfs.get(&if_index) else {
2833 return;
2834 };
2835 for record in msg.all_records() {
2836 match self
2837 .cache
2838 .add_or_update(my_intf, record, &mut timers, is_for_us)
2839 {
2840 Some((dns_record, true)) => {
2841 timers.push(dns_record.record.get_record().get_expire_time());
2842 timers.push(dns_record.record.get_record().get_refresh_time());
2843
2844 let ty = dns_record.record.get_type();
2845 let name = dns_record.record.get_name();
2846
2847 if ty == RRType::PTR && dns_record.record.get_record().get_ttl() > 1 {
2849 if self.service_queriers.contains_key(name) {
2850 timers.push(dns_record.record.get_record().get_refresh_time());
2851 }
2852
2853 if let Some(dns_ptr) = dns_record.record.any().downcast_ref::<DnsPointer>()
2855 {
2856 debug!("calling listener with service found: {name}");
2857 call_service_listener(
2858 &self.service_queriers,
2859 name,
2860 ServiceEvent::ServiceFound(
2861 name.to_string(),
2862 dns_ptr.alias().to_string(),
2863 ),
2864 );
2865 changes.push(InstanceChange {
2866 ty,
2867 name: dns_ptr.alias().to_string(),
2868 });
2869 }
2870 } else {
2871 changes.push(InstanceChange {
2872 ty,
2873 name: name.to_string(),
2874 });
2875 }
2876 }
2877 Some((dns_record, false)) => {
2878 timers.push(dns_record.record.get_record().get_expire_time());
2879 timers.push(dns_record.record.get_record().get_refresh_time());
2880 }
2881 _ => {}
2882 }
2883 }
2884
2885 for t in timers {
2887 self.add_timer(t);
2888 }
2889
2890 for change in changes
2892 .iter()
2893 .filter(|change| change.ty == RRType::A || change.ty == RRType::AAAA)
2894 {
2895 let addr_map = self.cache.get_addresses_for_host(&change.name);
2896 for (name, addresses) in addr_map {
2897 call_hostname_resolution_listener(
2898 &self.hostname_resolvers,
2899 &change.name,
2900 HostnameResolutionEvent::AddressesFound(name, addresses),
2901 )
2902 }
2903 }
2904
2905 let mut updated_instances = HashSet::new();
2907 for update in changes {
2908 match update.ty {
2909 RRType::PTR | RRType::SRV | RRType::TXT => {
2910 updated_instances.insert(update.name);
2911 }
2912 RRType::A | RRType::AAAA => {
2913 let instances = self.cache.get_instances_on_host(&update.name);
2914 updated_instances.extend(instances);
2915 }
2916 _ => {}
2917 }
2918 }
2919
2920 self.resolve_updated_instances(&updated_instances);
2921 }
2922
2923 fn conflict_handler(&mut self, msg: &DnsIncoming, if_index: u32) {
2924 let Some(my_intf) = self.my_intfs.get(&if_index) else {
2925 debug!("handle_response: no intf found for index {if_index}");
2926 return;
2927 };
2928
2929 let Some(dns_registry) = self.dns_registry_map.get_mut(&if_index) else {
2930 return;
2931 };
2932
2933 for answer in msg.answers().iter() {
2934 let mut new_records = Vec::new();
2935
2936 let name = answer.get_name();
2937 let Some(probe) = dns_registry.probing.get_mut(name) else {
2938 continue;
2939 };
2940
2941 if answer.get_type() == RRType::A || answer.get_type() == RRType::AAAA {
2943 if let Some(answer_addr) = answer.any().downcast_ref::<DnsAddress>() {
2944 if answer_addr.interface_id.index != if_index {
2945 debug!(
2946 "conflict handler: answer addr {:?} not in the subnet of intf {}",
2947 answer_addr, my_intf.name
2948 );
2949 continue;
2950 }
2951 }
2952
2953 let any_match = probe.records.iter().any(|r| {
2956 r.get_type() == answer.get_type()
2957 && r.get_class() == answer.get_class()
2958 && r.rrdata_match(answer.as_ref())
2959 });
2960 if any_match {
2961 continue; }
2963 }
2964
2965 probe.records.retain(|record| {
2966 if record.get_type() == answer.get_type()
2967 && record.get_class() == answer.get_class()
2968 && !record.rrdata_match(answer.as_ref())
2969 {
2970 debug!(
2971 "found conflict name: '{name}' record: {}: {} PEER: {}",
2972 record.get_type(),
2973 record.rdata_print(),
2974 answer.rdata_print()
2975 );
2976
2977 let mut new_record = record.clone();
2980 let new_name = match record.get_type() {
2981 RRType::A => hostname_change(name),
2982 RRType::AAAA => hostname_change(name),
2983 _ => name_change(name),
2984 };
2985 new_record.get_record_mut().set_new_name(new_name);
2986 new_records.push(new_record);
2987 return false; }
2989
2990 true
2991 });
2992
2993 let create_time = current_time_millis() + fastrand::u64(0..250);
3000
3001 let waiting_services = probe.waiting_services.clone();
3002
3003 for record in new_records {
3004 if dns_registry.update_hostname(name, record.get_name(), create_time) {
3005 self.timers.push(Reverse(create_time));
3006 }
3007
3008 dns_registry.name_changes.insert(
3010 record.get_record().get_original_name().to_string(),
3011 record.get_name().to_string(),
3012 );
3013
3014 let new_probe = match dns_registry.probing.get_mut(record.get_name()) {
3015 Some(p) => p,
3016 None => {
3017 let new_probe = dns_registry
3018 .probing
3019 .entry(record.get_name().to_string())
3020 .or_insert_with(|| {
3021 debug!("conflict handler: new probe of {}", record.get_name());
3022 Probe::new(create_time)
3023 });
3024 self.timers.push(Reverse(new_probe.next_send));
3025 new_probe
3026 }
3027 };
3028
3029 debug!(
3030 "insert record with new name '{}' {} into probe",
3031 record.get_name(),
3032 record.get_type()
3033 );
3034 new_probe.insert_record(record);
3035
3036 new_probe.waiting_services.extend(waiting_services.clone());
3037 }
3038 }
3039 }
3040
3041 fn resolve_updated_instances(&mut self, updated_instances: &HashSet<String>) {
3048 if updated_instances.is_empty() {
3049 return;
3050 }
3051
3052 let mut resolved: HashSet<String> = HashSet::new();
3053 let mut unresolved: HashSet<String> = HashSet::new();
3054 let mut removed_instances = HashMap::new();
3055
3056 let now = current_time_millis();
3057
3058 for (ty_domain, records) in self.cache.all_ptr().iter() {
3059 if !self.service_queriers.contains_key(ty_domain) {
3060 continue;
3062 }
3063
3064 for ptr in records.iter().filter(|r| !r.record.expires_soon(now)) {
3065 let Some(dns_ptr) = ptr.record.any().downcast_ref::<DnsPointer>() else {
3066 continue;
3067 };
3068
3069 let instance = dns_ptr.alias();
3070 if !updated_instances.contains(instance) {
3071 continue;
3072 }
3073
3074 let Ok(resolved_service) = self.resolve_service_from_cache(ty_domain, instance)
3075 else {
3076 continue;
3077 };
3078
3079 debug!("resolve_updated_instances: from cache: {instance}");
3080 if resolved_service.is_valid() {
3081 debug!("call queriers to resolve {instance}");
3082 resolved.insert(instance.to_string());
3083 let event = ServiceEvent::ServiceResolved(Box::new(resolved_service));
3084 call_service_listener(&self.service_queriers, ty_domain, event);
3085 } else {
3086 debug!("Resolved service is not valid: {instance}");
3087 if self.resolved.remove(dns_ptr.alias()) {
3088 removed_instances
3089 .entry(ty_domain.to_string())
3090 .or_insert_with(HashSet::new)
3091 .insert(instance.to_string());
3092 }
3093 unresolved.insert(instance.to_string());
3094 }
3095 }
3096 }
3097
3098 for instance in resolved.drain() {
3099 self.pending_resolves.remove(&instance);
3100 self.resolved.insert(instance);
3101 }
3102
3103 for instance in unresolved.drain() {
3104 self.add_pending_resolve(instance);
3105 }
3106
3107 if !removed_instances.is_empty() {
3108 debug!(
3109 "resolve_updated_instances: removed {}",
3110 &removed_instances.len()
3111 );
3112 self.notify_service_removal(removed_instances);
3113 }
3114 }
3115
3116 fn handle_query(&mut self, msg: DnsIncoming, if_index: u32, querier_addr: SocketAddr) {
3118 let querier_ip = querier_addr.ip();
3119 let is_ipv4 = querier_ip.is_ipv4();
3120 let sock_opt = if is_ipv4 {
3121 &self.ipv4_sock
3122 } else {
3123 &self.ipv6_sock
3124 };
3125 let Some(sock) = sock_opt.as_ref() else {
3126 debug!("handle_query: socket not available for intf {}", if_index);
3127 return;
3128 };
3129 let mut out = DnsOutgoing::new(FLAGS_QR_RESPONSE | FLAGS_AA);
3130
3131 const META_QUERY: &str = "_services._dns-sd._udp.local.";
3134
3135 let Some(dns_registry) = self.dns_registry_map.get_mut(&if_index) else {
3136 debug!("missing dns registry for intf {}", if_index);
3137 return;
3138 };
3139
3140 let Some(intf) = self.my_intfs.get(&if_index) else {
3141 debug!("handle_query: no intf found for index {if_index}");
3142 return;
3143 };
3144
3145 for question in msg.questions().iter() {
3146 let qtype = question.entry_type();
3147 let q_name = question.entry_name();
3148
3149 if qtype == RRType::PTR {
3150 for service in self.my_services.values() {
3151 if service.get_status(if_index) != ServiceStatus::Announced {
3152 continue;
3153 }
3154
3155 if service.matches_type_or_subtype(q_name) {
3156 out.add_answer_with_additionals(&msg, service, intf, dns_registry, is_ipv4);
3157 } else if q_name == META_QUERY {
3158 let ttl = service.get_other_ttl();
3159 let alias = service.get_type().to_string();
3160 let ptr = DnsPointer::new(q_name, RRType::PTR, CLASS_IN, ttl, alias);
3161 if !out.add_answer(&msg, ptr) {
3162 trace!("answer was not added for meta-query {:?}", &question);
3163 }
3164 }
3165 }
3166 } else {
3167 if qtype == RRType::ANY && msg.num_authorities() > 0 {
3169 if let Some(probe) = dns_registry.probing.get_mut(q_name) {
3170 probe.tiebreaking(&msg, q_name);
3171 }
3172 }
3173
3174 if qtype == RRType::A || qtype == RRType::AAAA || qtype == RRType::ANY {
3175 for service in self.my_services.values() {
3176 if service.get_status(if_index) != ServiceStatus::Announced {
3177 continue;
3178 }
3179
3180 let service_hostname = dns_registry.resolve_name(service.get_hostname());
3181
3182 if service_hostname.to_lowercase() == question.entry_name().to_lowercase() {
3183 let mut intf_addrs: Vec<IpAddr> = Vec::new();
3190 if qtype == RRType::A || qtype == RRType::ANY {
3191 intf_addrs.extend(service.get_addrs_on_my_intf_v4(intf));
3192 }
3193 if qtype == RRType::AAAA || qtype == RRType::ANY {
3194 intf_addrs.extend(service.get_addrs_on_my_intf_v6(intf));
3195 }
3196 if intf_addrs.is_empty()
3197 && (qtype == RRType::A || qtype == RRType::AAAA)
3198 {
3199 let t = match qtype {
3200 RRType::A => "TYPE_A",
3201 RRType::AAAA => "TYPE_AAAA",
3202 _ => "invalid_type",
3203 };
3204 trace!(
3205 "Cannot find valid addrs for {} response on intf {:?}",
3206 t,
3207 &intf
3208 );
3209 continue;
3210 }
3211 for address in intf_addrs {
3212 out.add_answer(
3213 &msg,
3214 DnsAddress::new(
3215 service_hostname,
3216 ip_address_rr_type(&address),
3217 CLASS_IN | CLASS_CACHE_FLUSH,
3218 service.get_host_ttl(),
3219 address,
3220 intf.into(),
3221 ),
3222 );
3223 }
3224 }
3225 }
3226 }
3227
3228 let query_name = q_name.to_lowercase();
3229 let service_opt = self
3230 .my_services
3231 .iter()
3232 .find(|(k, _v)| dns_registry.resolve_name(k.as_str()) == query_name)
3233 .map(|(_, v)| v);
3234
3235 let Some(service) = service_opt else {
3236 continue;
3237 };
3238
3239 if service.get_status(if_index) != ServiceStatus::Announced {
3240 continue;
3241 }
3242
3243 let intf_addrs = if is_ipv4 {
3244 service.get_addrs_on_my_intf_v4(intf)
3245 } else {
3246 service.get_addrs_on_my_intf_v6(intf)
3247 };
3248 if intf_addrs.is_empty() {
3249 debug!(
3250 "Cannot find valid addrs for TYPE_SRV response on intf {:?}",
3251 &intf
3252 );
3253 continue;
3254 }
3255
3256 add_answer_of_service(
3257 &mut out,
3258 &msg,
3259 question.entry_name(),
3260 service,
3261 qtype,
3262 intf_addrs,
3263 );
3264 }
3265 }
3266
3267 if out.answers_count() > 0 {
3268 debug!("sending response on intf {}", &intf.name);
3269 out.set_id(msg.id());
3270
3271 let matched_source = intf
3274 .addrs
3275 .iter()
3276 .find(|if_addr| valid_ip_on_intf(&querier_ip, if_addr));
3277
3278 let unicast_dest = if querier_addr.port() != MDNS_PORT {
3286 Some(querier_addr)
3287 } else {
3288 None
3289 };
3290
3291 if unicast_dest.is_some() {
3292 for q in msg.questions() {
3293 out.add_question(q.entry_name(), q.entry_type());
3294 }
3295 out.clear_cache_flush_bits();
3296 }
3297
3298 if let Err(InternalError::IntfAddrInvalid(intf_addr)) = send_dns_outgoing(
3299 &out,
3300 intf,
3301 &sock.pktinfo,
3302 self.port,
3303 matched_source,
3304 unicast_dest,
3305 ) {
3306 let invalid_intf_addr = HashSet::from([intf_addr]);
3307 let _ = self.send_cmd_to_self(Command::InvalidIntfAddrs(invalid_intf_addr));
3308 }
3309
3310 let if_name = intf.name.clone();
3311
3312 self.increase_counter(Counter::Respond, 1);
3313 self.notify_monitors(DaemonEvent::Respond(if_name));
3314 }
3315
3316 self.increase_counter(Counter::KnownAnswerSuppression, out.known_answer_count());
3317 }
3318
3319 fn increase_counter(&mut self, counter: Counter, count: i64) {
3321 let key = counter.to_string();
3322 match self.counters.get_mut(&key) {
3323 Some(v) => *v += count,
3324 None => {
3325 self.counters.insert(key, count);
3326 }
3327 }
3328 }
3329
3330 fn set_counter(&mut self, counter: Counter, count: i64) {
3332 let key = counter.to_string();
3333 self.counters.insert(key, count);
3334 }
3335
3336 fn signal_sock_drain(&self) {
3337 let mut signal_buf = [0; 1024];
3338
3339 while let Ok(sz) = self.signal_sock.recv(&mut signal_buf) {
3341 trace!(
3342 "signal socket recvd: {}",
3343 String::from_utf8_lossy(&signal_buf[0..sz])
3344 );
3345 }
3346 }
3347
3348 fn add_retransmission(&mut self, next_time: u64, command: Command) {
3349 self.retransmissions.push(ReRun { next_time, command });
3350 self.add_timer(next_time);
3351 }
3352
3353 fn notify_service_removal(&self, expired: HashMap<String, HashSet<String>>) {
3356 for (ty_domain, sender) in self.service_queriers.iter() {
3357 if let Some(instances) = expired.get(ty_domain) {
3358 for instance_name in instances {
3359 let event = ServiceEvent::ServiceRemoved(
3360 ty_domain.to_string(),
3361 instance_name.to_string(),
3362 );
3363 match sender.send(event) {
3364 Ok(()) => debug!("notify_service_removal: sent ServiceRemoved to listener of {ty_domain}: {instance_name}"),
3365 Err(e) => debug!("Failed to send event: {}", e),
3366 }
3367 }
3368 }
3369 }
3370 }
3371
3372 fn exec_command(&mut self, command: Command, repeating: bool) {
3376 trace!("exec_command: {:?} repeating: {}", &command, repeating);
3377 match command {
3378 Command::Browse(ty, next_delay, cache_only, listener) => {
3379 self.exec_command_browse(repeating, ty, next_delay, cache_only, listener);
3380 }
3381
3382 Command::ResolveHostname(hostname, next_delay, listener, timeout) => {
3383 self.exec_command_resolve_hostname(
3384 repeating, hostname, next_delay, listener, timeout,
3385 );
3386 }
3387
3388 Command::Register(service_info) => {
3389 self.register_service(*service_info);
3390 self.increase_counter(Counter::Register, 1);
3391 }
3392
3393 Command::RegisterResend(fullname, intf) => {
3394 trace!("register-resend service: {fullname} on {}", &intf);
3395 if let Err(InternalError::IntfAddrInvalid(intf_addr)) =
3396 self.exec_command_register_resend(fullname, intf)
3397 {
3398 let invalid_intf_addr = HashSet::from([intf_addr]);
3399 let _ = self.send_cmd_to_self(Command::InvalidIntfAddrs(invalid_intf_addr));
3400 }
3401 }
3402
3403 Command::Unregister(fullname, resp_s) => {
3404 trace!("unregister service {} repeat {}", &fullname, &repeating);
3405 self.exec_command_unregister(repeating, fullname, resp_s);
3406 }
3407
3408 Command::UnregisterResend(packet, if_index, is_ipv4) => {
3409 self.exec_command_unregister_resend(packet, if_index, is_ipv4);
3410 }
3411
3412 Command::StopBrowse(ty_domain) => self.exec_command_stop_browse(ty_domain),
3413
3414 Command::StopResolveHostname(hostname) => {
3415 self.exec_command_stop_resolve_hostname(hostname.to_lowercase())
3416 }
3417
3418 Command::Resolve(instance, try_count) => self.exec_command_resolve(instance, try_count),
3419
3420 Command::GetMetrics(resp_s) => self.exec_command_get_metrics(resp_s),
3421
3422 Command::GetStatus(resp_s) => match resp_s.send(self.status.clone()) {
3423 Ok(()) => trace!("Sent status to the client"),
3424 Err(e) => debug!("Failed to send status: {}", e),
3425 },
3426
3427 Command::Monitor(resp_s) => {
3428 self.monitors.push(resp_s);
3429 }
3430
3431 Command::SetOption(daemon_opt) => {
3432 self.process_set_option(daemon_opt);
3433 }
3434
3435 Command::GetOption(resp_s) => {
3436 let val = DaemonOptionVal {
3437 _service_name_len_max: self.service_name_len_max,
3438 ip_check_interval: self.ip_check_interval,
3439 };
3440 if let Err(e) = resp_s.send(val) {
3441 debug!("Failed to send options: {}", e);
3442 }
3443 }
3444
3445 Command::Verify(instance_fullname, timeout) => {
3446 self.exec_command_verify(instance_fullname, timeout, repeating);
3447 }
3448
3449 Command::InvalidIntfAddrs(invalid_intf_addrs) => {
3450 for intf_addr in invalid_intf_addrs {
3451 self.del_interface_addr(&intf_addr);
3452 }
3453
3454 self.check_ip_changes();
3455 }
3456
3457 _ => {
3458 debug!("unexpected command: {:?}", &command);
3459 }
3460 }
3461 }
3462
3463 fn exec_command_get_metrics(&mut self, resp_s: Sender<HashMap<String, i64>>) {
3464 self.set_counter(Counter::CachedPTR, self.cache.ptr_count() as i64);
3465 self.set_counter(Counter::CachedSRV, self.cache.srv_count() as i64);
3466 self.set_counter(Counter::CachedAddr, self.cache.addr_count() as i64);
3467 self.set_counter(Counter::CachedTxt, self.cache.txt_count() as i64);
3468 self.set_counter(Counter::CachedNSec, self.cache.nsec_count() as i64);
3469 self.set_counter(Counter::CachedSubtype, self.cache.subtype_count() as i64);
3470 self.set_counter(Counter::Timer, self.timers.len() as i64);
3471
3472 let dns_registry_probe_count: usize = self
3473 .dns_registry_map
3474 .values()
3475 .map(|r| r.probing.len())
3476 .sum();
3477 self.set_counter(Counter::DnsRegistryProbe, dns_registry_probe_count as i64);
3478
3479 let dns_registry_active_count: usize = self
3480 .dns_registry_map
3481 .values()
3482 .map(|r| r.active.values().map(|a| a.len()).sum::<usize>())
3483 .sum();
3484 self.set_counter(Counter::DnsRegistryActive, dns_registry_active_count as i64);
3485
3486 let dns_registry_timer_count: usize = self
3487 .dns_registry_map
3488 .values()
3489 .map(|r| r.new_timers.len())
3490 .sum();
3491 self.set_counter(Counter::DnsRegistryTimer, dns_registry_timer_count as i64);
3492
3493 let dns_registry_name_change_count: usize = self
3494 .dns_registry_map
3495 .values()
3496 .map(|r| r.name_changes.len())
3497 .sum();
3498 self.set_counter(
3499 Counter::DnsRegistryNameChange,
3500 dns_registry_name_change_count as i64,
3501 );
3502
3503 if let Err(e) = resp_s.send(self.counters.clone()) {
3505 debug!("Failed to send metrics: {}", e);
3506 }
3507 }
3508
3509 fn exec_command_browse(
3510 &mut self,
3511 repeating: bool,
3512 ty: String,
3513 next_delay: u32,
3514 cache_only: bool,
3515 listener: Sender<ServiceEvent>,
3516 ) {
3517 let pretty_addrs: Vec<String> = self
3518 .my_intfs
3519 .iter()
3520 .map(|(if_index, itf)| format!("{} ({if_index})", itf.name))
3521 .collect();
3522
3523 if let Err(e) = listener.send(ServiceEvent::SearchStarted(format!(
3524 "{ty} on {} interfaces [{}]",
3525 pretty_addrs.len(),
3526 pretty_addrs.join(", ")
3527 ))) {
3528 debug!(
3529 "Failed to send SearchStarted({})(repeating:{}): {}",
3530 &ty, repeating, e
3531 );
3532 return;
3533 }
3534
3535 let now = current_time_millis();
3536 if !repeating {
3537 self.service_queriers.insert(ty.clone(), listener.clone());
3541
3542 self.query_cache_for_service(&ty, &listener, now);
3544 }
3545
3546 if cache_only {
3547 match listener.send(ServiceEvent::SearchStopped(ty.clone())) {
3549 Ok(()) => debug!("SearchStopped sent for {}", &ty),
3550 Err(e) => debug!("Failed to send SearchStopped: {}", e),
3551 }
3552 return;
3553 }
3554
3555 self.send_query(&ty, RRType::PTR);
3556 self.increase_counter(Counter::Browse, 1);
3557
3558 let next_time = now + (next_delay * 1000) as u64;
3559 let max_delay = 60 * 60;
3560 let delay = cmp::min(next_delay * 2, max_delay);
3561 self.add_retransmission(next_time, Command::Browse(ty, delay, cache_only, listener));
3562 }
3563
3564 fn exec_command_resolve_hostname(
3565 &mut self,
3566 repeating: bool,
3567 hostname: String,
3568 next_delay: u32,
3569 listener: Sender<HostnameResolutionEvent>,
3570 timeout: Option<u64>,
3571 ) {
3572 let addr_list: Vec<_> = self.my_intfs.iter().collect();
3573 if let Err(e) = listener.send(HostnameResolutionEvent::SearchStarted(format!(
3574 "{} on addrs {:?}",
3575 &hostname, &addr_list
3576 ))) {
3577 debug!(
3578 "Failed to send ResolveStarted({})(repeating:{}): {}",
3579 &hostname, repeating, e
3580 );
3581 return;
3582 }
3583 if !repeating {
3584 self.add_hostname_resolver(hostname.to_owned(), listener.clone(), timeout);
3585 self.query_cache_for_hostname(&hostname, listener.clone());
3587 }
3588
3589 self.send_query_vec(&[(&hostname, RRType::A), (&hostname, RRType::AAAA)]);
3590 self.increase_counter(Counter::ResolveHostname, 1);
3591
3592 let now = current_time_millis();
3593 let next_time = now + u64::from(next_delay) * 1000;
3594 let max_delay = 60 * 60;
3595 let delay = cmp::min(next_delay * 2, max_delay);
3596
3597 if self
3599 .hostname_resolvers
3600 .get(&hostname)
3601 .and_then(|(_sender, timeout)| *timeout)
3602 .map(|timeout| next_time < timeout)
3603 .unwrap_or(true)
3604 {
3605 self.add_retransmission(
3606 next_time,
3607 Command::ResolveHostname(hostname, delay, listener, None),
3608 );
3609 }
3610 }
3611
3612 fn exec_command_resolve(&mut self, instance: String, try_count: u16) {
3613 let pending_query = self.query_unresolved(&instance);
3614 let max_try = 3;
3615 if pending_query && try_count < max_try {
3616 let next_time = current_time_millis() + RESOLVE_WAIT_IN_MILLIS;
3619 self.add_retransmission(next_time, Command::Resolve(instance, try_count + 1));
3620 }
3621 }
3622
3623 fn exec_command_unregister(
3624 &mut self,
3625 repeating: bool,
3626 fullname: String,
3627 resp_s: Sender<UnregisterStatus>,
3628 ) {
3629 let response = match self.my_services.remove_entry(&fullname) {
3630 None => {
3631 debug!("unregister: cannot find such service {}", &fullname);
3632 UnregisterStatus::NotFound
3633 }
3634 Some((_k, info)) => {
3635 let mut timers = Vec::new();
3636
3637 for (if_index, intf) in self.my_intfs.iter() {
3638 if let Some(sock) = self.ipv4_sock.as_ref() {
3639 let packet = self.unregister_service(&info, intf, &sock.pktinfo);
3640 if !repeating && !packet.is_empty() {
3642 let next_time = current_time_millis() + 120;
3643 self.retransmissions.push(ReRun {
3644 next_time,
3645 command: Command::UnregisterResend(packet, *if_index, true),
3646 });
3647 timers.push(next_time);
3648 }
3649 }
3650
3651 if let Some(sock) = self.ipv6_sock.as_ref() {
3653 let packet = self.unregister_service(&info, intf, &sock.pktinfo);
3654 if !repeating && !packet.is_empty() {
3655 let next_time = current_time_millis() + 120;
3656 self.retransmissions.push(ReRun {
3657 next_time,
3658 command: Command::UnregisterResend(packet, *if_index, false),
3659 });
3660 timers.push(next_time);
3661 }
3662 }
3663 }
3664
3665 for t in timers {
3666 self.add_timer(t);
3667 }
3668
3669 self.increase_counter(Counter::Unregister, 1);
3670 UnregisterStatus::OK
3671 }
3672 };
3673 if let Err(e) = resp_s.send(response) {
3674 debug!("unregister: failed to send response: {}", e);
3675 }
3676 }
3677
3678 fn exec_command_unregister_resend(&mut self, packet: Vec<u8>, if_index: u32, is_ipv4: bool) {
3679 let Some(intf) = self.my_intfs.get(&if_index) else {
3680 return;
3681 };
3682 let sock_opt = if is_ipv4 {
3683 &self.ipv4_sock
3684 } else {
3685 &self.ipv6_sock
3686 };
3687 let Some(sock) = sock_opt else {
3688 return;
3689 };
3690
3691 let if_addr = if is_ipv4 {
3692 match intf.next_ifaddr_v4() {
3693 Some(addr) => addr,
3694 None => return,
3695 }
3696 } else {
3697 match intf.next_ifaddr_v6() {
3698 Some(addr) => addr,
3699 None => return,
3700 }
3701 };
3702
3703 debug!("UnregisterResend from {:?}", if_addr);
3704 multicast_on_intf(
3705 &packet[..],
3706 &intf.name,
3707 intf.index,
3708 if_addr,
3709 &sock.pktinfo,
3710 self.port,
3711 );
3712
3713 self.increase_counter(Counter::UnregisterResend, 1);
3714 }
3715
3716 fn exec_command_stop_browse(&mut self, ty_domain: String) {
3717 match self.service_queriers.remove_entry(&ty_domain) {
3718 None => debug!("StopBrowse: cannot find querier for {}", &ty_domain),
3719 Some((ty, sender)) => {
3720 trace!("StopBrowse: removed queryer for {}", &ty);
3722 let mut i = 0;
3723 while i < self.retransmissions.len() {
3724 if let Command::Browse(t, _, _, _) = &self.retransmissions[i].command {
3725 if t == &ty {
3726 self.retransmissions.remove(i);
3727 trace!("StopBrowse: removed retransmission for {}", &ty);
3728 continue;
3729 }
3730 }
3731 i += 1;
3732 }
3733
3734 self.cache.remove_service_type(&ty_domain);
3736
3737 match sender.send(ServiceEvent::SearchStopped(ty_domain)) {
3739 Ok(()) => trace!("Sent SearchStopped to the listener"),
3740 Err(e) => debug!("Failed to send SearchStopped: {}", e),
3741 }
3742 }
3743 }
3744 }
3745
3746 fn exec_command_stop_resolve_hostname(&mut self, hostname: String) {
3747 if let Some((host, (sender, _timeout))) = self.hostname_resolvers.remove_entry(&hostname) {
3748 trace!("StopResolve: removed queryer for {}", &host);
3750 let mut i = 0;
3751 while i < self.retransmissions.len() {
3752 if let Command::Resolve(t, _) = &self.retransmissions[i].command {
3753 if t == &host {
3754 self.retransmissions.remove(i);
3755 trace!("StopResolve: removed retransmission for {}", &host);
3756 continue;
3757 }
3758 }
3759 i += 1;
3760 }
3761
3762 match sender.send(HostnameResolutionEvent::SearchStopped(hostname)) {
3764 Ok(()) => trace!("Sent SearchStopped to the listener"),
3765 Err(e) => debug!("Failed to send SearchStopped: {}", e),
3766 }
3767 }
3768 }
3769
3770 fn exec_command_register_resend(&mut self, fullname: String, if_index: u32) -> MyResult<()> {
3771 let Some(info) = self.my_services.get_mut(&fullname) else {
3772 trace!("announce: cannot find such service {}", &fullname);
3773 return Ok(());
3774 };
3775
3776 let Some(dns_registry) = self.dns_registry_map.get_mut(&if_index) else {
3777 return Ok(());
3778 };
3779
3780 let Some(intf) = self.my_intfs.get(&if_index) else {
3781 return Ok(());
3782 };
3783
3784 let announced_v4 = if let Some(sock) = self.ipv4_sock.as_ref() {
3785 announce_service_on_intf(dns_registry, info, intf, &sock.pktinfo, self.port)?
3786 } else {
3787 false
3788 };
3789 let announced_v6 = if let Some(sock) = self.ipv6_sock.as_ref() {
3790 announce_service_on_intf(dns_registry, info, intf, &sock.pktinfo, self.port)?
3791 } else {
3792 false
3793 };
3794
3795 if announced_v4 || announced_v6 {
3796 let hostname = dns_registry.resolve_name(info.get_hostname());
3797 let service_name = dns_registry.resolve_name(&fullname).to_string();
3798
3799 debug!("resend: announce service {service_name} on {}", intf.name);
3800
3801 notify_monitors(
3802 &mut self.monitors,
3803 DaemonEvent::Announce(service_name, format!("{}:{}", hostname, &intf.name)),
3804 );
3805 info.set_status(if_index, ServiceStatus::Announced);
3806 } else {
3807 debug!("register-resend should not fail");
3808 }
3809
3810 self.increase_counter(Counter::RegisterResend, 1);
3811 Ok(())
3812 }
3813
3814 fn exec_command_verify(&mut self, instance: String, timeout: Duration, repeating: bool) {
3815 let now = current_time_millis();
3825 let expire_at = if repeating {
3826 None
3827 } else {
3828 Some(now + timeout.as_millis() as u64)
3829 };
3830
3831 let record_vec = self.cache.service_verify_queries(&instance, expire_at);
3833
3834 if !record_vec.is_empty() {
3835 let query_vec: Vec<(&str, RRType)> = record_vec
3836 .iter()
3837 .map(|(record, rr_type)| (record.as_str(), *rr_type))
3838 .collect();
3839 self.send_query_vec(&query_vec);
3840
3841 if let Some(new_expire) = expire_at {
3842 self.add_timer(new_expire); self.add_retransmission(now + 1000, Command::Verify(instance, timeout));
3846 }
3847 }
3848 }
3849
3850 fn refresh_active_services(&mut self) {
3852 let mut query_ptr_count = 0;
3853 let mut query_srv_count = 0;
3854 let mut new_timers = HashSet::new();
3855 let mut query_addr_count = 0;
3856
3857 for (ty_domain, _sender) in self.service_queriers.iter() {
3858 let refreshed_timers = self.cache.refresh_due_ptr(ty_domain);
3859 if !refreshed_timers.is_empty() {
3860 trace!("sending refresh query for PTR: {}", ty_domain);
3861 self.send_query(ty_domain, RRType::PTR);
3862 query_ptr_count += 1;
3863 new_timers.extend(refreshed_timers);
3864 }
3865
3866 let (instances, timers) = self.cache.refresh_due_srv_txt(ty_domain);
3867 for (instance, types) in instances {
3868 trace!("sending refresh query for: {}", &instance);
3869 let query_vec = types
3870 .into_iter()
3871 .map(|ty| (instance.as_str(), ty))
3872 .collect::<Vec<_>>();
3873 self.send_query_vec(&query_vec);
3874 query_srv_count += 1;
3875 }
3876 new_timers.extend(timers);
3877 let (hostnames, timers) = self.cache.refresh_due_hosts(ty_domain);
3878 for hostname in hostnames.iter() {
3879 trace!("sending refresh queries for A and AAAA: {}", hostname);
3880 self.send_query_vec(&[(hostname, RRType::A), (hostname, RRType::AAAA)]);
3881 query_addr_count += 2;
3882 }
3883 new_timers.extend(timers);
3884 }
3885
3886 for timer in new_timers {
3887 self.add_timer(timer);
3888 }
3889
3890 self.increase_counter(Counter::CacheRefreshPTR, query_ptr_count);
3891 self.increase_counter(Counter::CacheRefreshSrvTxt, query_srv_count);
3892 self.increase_counter(Counter::CacheRefreshAddr, query_addr_count);
3893 }
3894}
3895
3896fn add_answer_of_service(
3898 out: &mut DnsOutgoing,
3899 msg: &DnsIncoming,
3900 entry_name: &str,
3901 service: &ServiceInfo,
3902 qtype: RRType,
3903 intf_addrs: Vec<IpAddr>,
3904) {
3905 if qtype == RRType::SRV || qtype == RRType::ANY {
3906 out.add_answer(
3907 msg,
3908 DnsSrv::new(
3909 entry_name,
3910 CLASS_IN | CLASS_CACHE_FLUSH,
3911 service.get_host_ttl(),
3912 service.get_priority(),
3913 service.get_weight(),
3914 service.get_port(),
3915 service.get_hostname().to_string(),
3916 ),
3917 );
3918 }
3919
3920 if qtype == RRType::TXT || qtype == RRType::ANY {
3921 out.add_answer(
3922 msg,
3923 DnsTxt::new(
3924 entry_name,
3925 CLASS_IN | CLASS_CACHE_FLUSH,
3926 service.get_other_ttl(),
3927 service.generate_txt(),
3928 ),
3929 );
3930 }
3931
3932 if qtype == RRType::SRV {
3933 for address in intf_addrs {
3934 out.add_additional_answer(DnsAddress::new(
3935 service.get_hostname(),
3936 ip_address_rr_type(&address),
3937 CLASS_IN | CLASS_CACHE_FLUSH,
3938 service.get_host_ttl(),
3939 address,
3940 InterfaceId::default(),
3941 ));
3942 }
3943 }
3944}
3945
3946#[derive(Clone, Debug)]
3949#[non_exhaustive]
3950pub enum ServiceEvent {
3951 SearchStarted(String),
3953
3954 ServiceFound(String, String),
3956
3957 ServiceResolved(Box<ResolvedService>),
3959
3960 ServiceRemoved(String, String),
3962
3963 SearchStopped(String),
3965}
3966
3967#[derive(Clone, Debug)]
3970#[non_exhaustive]
3971pub enum HostnameResolutionEvent {
3972 SearchStarted(String),
3974 AddressesFound(String, HashSet<ScopedIp>),
3976 AddressesRemoved(String, HashSet<ScopedIp>),
3978 SearchTimeout(String),
3980 SearchStopped(String),
3982}
3983
3984#[derive(Clone, Debug)]
3987#[non_exhaustive]
3988pub enum DaemonEvent {
3989 Announce(String, String),
3991
3992 Error(Error),
3994
3995 IpAdd(IpAddr),
3997
3998 IpDel(IpAddr),
4000
4001 NameChange(DnsNameChange),
4004
4005 Respond(String),
4007}
4008
4009#[derive(Clone, Debug)]
4012pub struct DnsNameChange {
4013 pub original: String,
4015
4016 pub new_name: String,
4026
4027 pub rr_type: RRType,
4029
4030 pub intf_name: String,
4032}
4033
4034#[derive(Debug)]
4036enum Command {
4037 Browse(String, u32, bool, Sender<ServiceEvent>),
4039
4040 ResolveHostname(String, u32, Sender<HostnameResolutionEvent>, Option<u64>), Register(Box<ServiceInfo>),
4045
4046 Unregister(String, Sender<UnregisterStatus>), RegisterResend(String, u32), UnregisterResend(Vec<u8>, u32, bool), StopBrowse(String), StopResolveHostname(String), Resolve(String, u16), GetMetrics(Sender<Metrics>),
4067
4068 GetStatus(Sender<DaemonStatus>),
4070
4071 Monitor(Sender<DaemonEvent>),
4073
4074 SetOption(DaemonOption),
4075
4076 GetOption(Sender<DaemonOptionVal>),
4077
4078 Verify(String, Duration),
4083
4084 InvalidIntfAddrs(HashSet<Interface>),
4086
4087 Exit(Sender<DaemonStatus>),
4088}
4089
4090impl fmt::Display for Command {
4091 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
4092 match self {
4093 Self::Browse(_, _, _, _) => write!(f, "Command Browse"),
4094 Self::ResolveHostname(_, _, _, _) => write!(f, "Command ResolveHostname"),
4095 Self::Exit(_) => write!(f, "Command Exit"),
4096 Self::GetStatus(_) => write!(f, "Command GetStatus"),
4097 Self::GetMetrics(_) => write!(f, "Command GetMetrics"),
4098 Self::Monitor(_) => write!(f, "Command Monitor"),
4099 Self::Register(_) => write!(f, "Command Register"),
4100 Self::RegisterResend(_, _) => write!(f, "Command RegisterResend"),
4101 Self::SetOption(_) => write!(f, "Command SetOption"),
4102 Self::GetOption(_) => write!(f, "Command GetOption"),
4103 Self::StopBrowse(_) => write!(f, "Command StopBrowse"),
4104 Self::StopResolveHostname(_) => write!(f, "Command StopResolveHostname"),
4105 Self::Unregister(_, _) => write!(f, "Command Unregister"),
4106 Self::UnregisterResend(_, _, _) => write!(f, "Command UnregisterResend"),
4107 Self::Resolve(_, _) => write!(f, "Command Resolve"),
4108 Self::Verify(_, _) => write!(f, "Command VerifyResource"),
4109 Self::InvalidIntfAddrs(_) => write!(f, "Command InvalidIntfAddrs"),
4110 }
4111 }
4112}
4113
4114struct DaemonOptionVal {
4115 _service_name_len_max: u8,
4116 ip_check_interval: u64,
4117}
4118
4119#[derive(Debug)]
4120enum DaemonOption {
4121 ServiceNameLenMax(u8),
4122 IpCheckInterval(u64),
4123 EnableInterface(Vec<IfKind>),
4124 DisableInterface(Vec<IfKind>),
4125 MulticastLoopV4(bool),
4126 MulticastLoopV6(bool),
4127 AcceptUnsolicited(bool),
4128 IncludeAppleP2P(bool),
4129 #[cfg(test)]
4130 TestDownInterface(String),
4131 #[cfg(test)]
4132 TestUpInterface(String),
4133}
4134
4135const DOMAIN_LEN: usize = "._tcp.local.".len();
4137
4138fn check_service_name_length(ty_domain: &str, limit: u8) -> Result<()> {
4140 if ty_domain.len() <= DOMAIN_LEN + 1 {
4141 return Err(e_fmt!("Service type name cannot be empty: {}", ty_domain));
4143 }
4144
4145 let service_name_len = ty_domain.len() - DOMAIN_LEN - 1; if service_name_len > limit as usize {
4147 return Err(e_fmt!("Service name length must be <= {} bytes", limit));
4148 }
4149 Ok(())
4150}
4151
4152fn check_domain_suffix(name: &str) -> Result<()> {
4154 if !(name.ends_with("._tcp.local.") || name.ends_with("._udp.local.")) {
4155 return Err(e_fmt!(
4156 "mDNS service {} must end with '._tcp.local.' or '._udp.local.'",
4157 name
4158 ));
4159 }
4160
4161 Ok(())
4162}
4163
4164fn check_service_name(fullname: &str) -> Result<()> {
4172 check_domain_suffix(fullname)?;
4173
4174 let remaining: Vec<&str> = fullname[..fullname.len() - DOMAIN_LEN].split('.').collect();
4175 let name = remaining.last().ok_or_else(|| e_fmt!("No service name"))?;
4176
4177 if &name[0..1] != "_" {
4178 return Err(e_fmt!("Service name must start with '_'"));
4179 }
4180
4181 let name = &name[1..];
4182
4183 if name.contains("--") {
4184 return Err(e_fmt!("Service name must not contain '--'"));
4185 }
4186
4187 if name.starts_with('-') || name.ends_with('-') {
4188 return Err(e_fmt!("Service name (%s) may not start or end with '-'"));
4189 }
4190
4191 let ascii_count = name.chars().filter(|c| c.is_ascii_alphabetic()).count();
4192 if ascii_count < 1 {
4193 return Err(e_fmt!(
4194 "Service name must contain at least one letter (eg: 'A-Za-z')"
4195 ));
4196 }
4197
4198 Ok(())
4199}
4200
4201fn check_hostname(hostname: &str) -> Result<()> {
4203 if !hostname.ends_with(".local.") {
4204 return Err(e_fmt!("Hostname must end with '.local.': {hostname}"));
4205 }
4206
4207 if hostname == ".local." {
4208 return Err(e_fmt!(
4209 "The part of the hostname before '.local.' cannot be empty"
4210 ));
4211 }
4212
4213 if hostname.len() > 255 {
4214 return Err(e_fmt!("Hostname length must be <= 255 bytes"));
4215 }
4216
4217 Ok(())
4218}
4219
4220fn call_service_listener(
4221 listeners_map: &HashMap<String, Sender<ServiceEvent>>,
4222 ty_domain: &str,
4223 event: ServiceEvent,
4224) {
4225 if let Some(listener) = listeners_map.get(ty_domain) {
4226 match listener.send(event) {
4227 Ok(()) => trace!("Sent event to listener successfully"),
4228 Err(e) => debug!("Failed to send event: {}", e),
4229 }
4230 }
4231}
4232
4233fn call_hostname_resolution_listener(
4234 listeners_map: &HashMap<String, (Sender<HostnameResolutionEvent>, Option<u64>)>,
4235 hostname: &str,
4236 event: HostnameResolutionEvent,
4237) {
4238 let hostname_lower = hostname.to_lowercase();
4239 if let Some(listener) = listeners_map.get(&hostname_lower).map(|(l, _)| l) {
4240 match listener.send(event) {
4241 Ok(()) => trace!("Sent event to listener successfully"),
4242 Err(e) => debug!("Failed to send event: {}", e),
4243 }
4244 }
4245}
4246
4247fn my_ip_interfaces(with_loopback: bool) -> Vec<Interface> {
4251 my_ip_interfaces_inner(with_loopback, false)
4252}
4253
4254fn my_ip_interfaces_inner(with_loopback: bool, with_apple_p2p: bool) -> Vec<Interface> {
4255 if_addrs::get_if_addrs()
4256 .unwrap_or_default()
4257 .into_iter()
4258 .filter(|i| {
4259 i.is_oper_up()
4260 && !i.is_p2p()
4261 && (!i.is_loopback() || with_loopback)
4262 && (with_apple_p2p || !is_apple_p2p_by_name(&i.name))
4263 })
4264 .collect()
4265}
4266
4267fn is_apple_p2p_by_name(name: &str) -> bool {
4270 let p2p_prefixes = ["awdl", "llw"];
4271 p2p_prefixes.iter().any(|prefix| name.starts_with(prefix))
4272}
4273
4274fn send_dns_outgoing(
4277 out: &DnsOutgoing,
4278 my_intf: &MyIntf,
4279 sock: &PktInfoUdpSocket,
4280 port: u16,
4281 source: Option<&IfAddr>,
4282 unicast_dest: Option<SocketAddr>,
4283) -> MyResult<Vec<Vec<u8>>> {
4284 let if_name = &my_intf.name;
4285
4286 let if_addr = match source {
4287 Some(addr) => addr,
4288 None => {
4289 if sock.domain() == Domain::IPV4 {
4290 match my_intf.next_ifaddr_v4() {
4291 Some(addr) => addr,
4292 None => return Ok(vec![]),
4293 }
4294 } else {
4295 match my_intf.next_ifaddr_v6() {
4296 Some(addr) => addr,
4297 None => return Ok(vec![]),
4298 }
4299 }
4300 }
4301 };
4302
4303 send_dns_outgoing_impl(
4304 out,
4305 if_name,
4306 my_intf.index,
4307 if_addr,
4308 sock,
4309 port,
4310 unicast_dest,
4311 )
4312}
4313
4314fn send_dns_outgoing_impl(
4316 out: &DnsOutgoing,
4317 if_name: &str,
4318 if_index: u32,
4319 if_addr: &IfAddr,
4320 sock: &PktInfoUdpSocket,
4321 port: u16,
4322 unicast_dest: Option<SocketAddr>,
4323) -> MyResult<Vec<Vec<u8>>> {
4324 let qtype = if out.is_query() {
4325 "query"
4326 } else {
4327 if out.answers_count() == 0 && out.additionals().is_empty() {
4328 return Ok(vec![]); }
4330 "response"
4331 };
4332 trace!(
4333 "send {}: {} questions {} answers {} authorities {} additional",
4334 qtype,
4335 out.questions().len(),
4336 out.answers_count(),
4337 out.authorities().len(),
4338 out.additionals().len()
4339 );
4340
4341 match if_addr.ip() {
4342 IpAddr::V4(ipv4) => {
4343 if let Err(e) = sock.set_multicast_if_v4(&ipv4) {
4344 debug!(
4345 "send_dns_outgoing: failed to set multicast interface for IPv4 {}: {}",
4346 ipv4, e
4347 );
4348 if e.kind() == std::io::ErrorKind::AddrNotAvailable {
4350 let intf_addr = Interface {
4351 name: if_name.to_string(),
4352 addr: if_addr.clone(),
4353 index: Some(if_index),
4354 oper_status: if_addrs::IfOperStatus::Down,
4355 is_p2p: false,
4356 #[cfg(windows)]
4357 adapter_name: String::new(),
4358 };
4359 return Err(InternalError::IntfAddrInvalid(intf_addr));
4360 }
4361 return Ok(vec![]); }
4363 }
4364 IpAddr::V6(ipv6) => {
4365 if let Err(e) = sock.set_multicast_if_v6(if_index) {
4366 debug!(
4367 "send_dns_outgoing: failed to set multicast interface for IPv6 {}: {}",
4368 ipv6, e
4369 );
4370 if e.kind() == std::io::ErrorKind::AddrNotAvailable {
4372 let intf_addr = Interface {
4373 name: if_name.to_string(),
4374 addr: if_addr.clone(),
4375 index: Some(if_index),
4376 oper_status: if_addrs::IfOperStatus::Down,
4377 is_p2p: false,
4378 #[cfg(windows)]
4379 adapter_name: String::new(),
4380 };
4381 return Err(InternalError::IntfAddrInvalid(intf_addr));
4382 }
4383 return Ok(vec![]); }
4385 }
4386 }
4387
4388 let packet_list = out.to_data_on_wire();
4389 for packet in packet_list.iter() {
4390 match unicast_dest {
4391 Some(dest) => unicast_on_intf(packet, if_name, dest, sock),
4392 None => multicast_on_intf(packet, if_name, if_index, if_addr, sock, port),
4393 }
4394 }
4395 Ok(packet_list)
4396}
4397
4398fn unicast_on_intf(packet: &[u8], if_name: &str, dest: SocketAddr, socket: &PktInfoUdpSocket) {
4401 if packet.len() > MAX_MSG_ABSOLUTE {
4402 debug!("Drop over-sized packet ({})", packet.len());
4403 return;
4404 }
4405
4406 let sock_addr = dest.into();
4407 match socket.send_to(packet, &sock_addr) {
4408 Ok(sz) => trace!(
4409 "sent unicast {} bytes on interface {} to {}",
4410 sz,
4411 if_name,
4412 dest
4413 ),
4414 Err(e) => trace!(
4415 "Failed to send unicast to {} via {:?}: {}",
4416 dest,
4417 &if_name,
4418 e
4419 ),
4420 }
4421}
4422
4423fn multicast_on_intf(
4425 packet: &[u8],
4426 if_name: &str,
4427 if_index: u32,
4428 if_addr: &IfAddr,
4429 socket: &PktInfoUdpSocket,
4430 port: u16,
4431) {
4432 if packet.len() > MAX_MSG_ABSOLUTE {
4433 debug!("Drop over-sized packet ({})", packet.len());
4434 return;
4435 }
4436
4437 let addr: SocketAddr = match if_addr {
4438 if_addrs::IfAddr::V4(_) => SocketAddrV4::new(GROUP_ADDR_V4, port).into(),
4439 if_addrs::IfAddr::V6(_) => {
4440 let mut sock = SocketAddrV6::new(GROUP_ADDR_V6, port, 0, 0);
4441 sock.set_scope_id(if_index); sock.into()
4443 }
4444 };
4445
4446 let sock_addr = addr.into();
4448 match socket.send_to(packet, &sock_addr) {
4449 Ok(sz) => trace!(
4450 "sent out {} bytes on interface {} (idx {}) addr {}",
4451 sz,
4452 if_name,
4453 if_index,
4454 if_addr.ip()
4455 ),
4456 Err(e) => trace!("Failed to send to {} via {:?}: {}", addr, &if_name, e),
4457 }
4458}
4459
4460fn valid_instance_name(name: &str) -> bool {
4464 name.split('.').count() >= 5
4465}
4466
4467fn notify_monitors(monitors: &mut Vec<Sender<DaemonEvent>>, event: DaemonEvent) {
4468 monitors.retain(|sender| {
4469 if let Err(e) = sender.try_send(event.clone()) {
4470 debug!("notify_monitors: try_send: {}", &e);
4471 if matches!(e, TrySendError::Disconnected(_)) {
4472 return false; }
4474 }
4475 true
4476 });
4477}
4478
4479fn prepare_announce(
4482 info: &ServiceInfo,
4483 intf: &MyIntf,
4484 dns_registry: &mut DnsRegistry,
4485 is_ipv4: bool,
4486) -> Option<DnsOutgoing> {
4487 let intf_addrs = if is_ipv4 {
4488 info.get_addrs_on_my_intf_v4(intf)
4489 } else {
4490 info.get_addrs_on_my_intf_v6(intf)
4491 };
4492
4493 if intf_addrs.is_empty() {
4494 debug!(
4495 "prepare_announce (ipv4: {is_ipv4}): no valid addrs on interface {}",
4496 &intf.name
4497 );
4498 return None;
4499 }
4500
4501 let service_fullname = dns_registry.resolve_name(info.get_fullname());
4503
4504 debug!(
4505 "prepare to announce service {service_fullname} on {:?}",
4506 &intf_addrs
4507 );
4508
4509 let mut probing_count = 0;
4510 let mut out = DnsOutgoing::new(FLAGS_QR_RESPONSE | FLAGS_AA);
4511 let create_time = current_time_millis() + fastrand::u64(0..250);
4512
4513 out.add_answer_at_time(
4514 DnsPointer::new(
4515 info.get_type(),
4516 RRType::PTR,
4517 CLASS_IN,
4518 info.get_other_ttl(),
4519 service_fullname.to_string(),
4520 ),
4521 0,
4522 );
4523
4524 if let Some(sub) = info.get_subtype() {
4525 trace!("Adding subdomain {}", sub);
4526 out.add_answer_at_time(
4527 DnsPointer::new(
4528 sub,
4529 RRType::PTR,
4530 CLASS_IN,
4531 info.get_other_ttl(),
4532 service_fullname.to_string(),
4533 ),
4534 0,
4535 );
4536 }
4537
4538 let hostname = dns_registry.resolve_name(info.get_hostname()).to_string();
4540
4541 let mut srv = DnsSrv::new(
4542 info.get_fullname(),
4543 CLASS_IN | CLASS_CACHE_FLUSH,
4544 info.get_host_ttl(),
4545 info.get_priority(),
4546 info.get_weight(),
4547 info.get_port(),
4548 hostname,
4549 );
4550
4551 if let Some(new_name) = dns_registry.name_changes.get(info.get_fullname()) {
4552 srv.get_record_mut().set_new_name(new_name.to_string());
4553 }
4554
4555 if !info.requires_probe()
4556 || dns_registry.is_probing_done(&srv, info.get_fullname(), create_time)
4557 {
4558 out.add_answer_at_time(srv, 0);
4559 } else {
4560 probing_count += 1;
4561 }
4562
4563 let mut txt = DnsTxt::new(
4566 info.get_fullname(),
4567 CLASS_IN | CLASS_CACHE_FLUSH,
4568 info.get_other_ttl(),
4569 info.generate_txt(),
4570 );
4571
4572 if let Some(new_name) = dns_registry.name_changes.get(info.get_fullname()) {
4573 txt.get_record_mut().set_new_name(new_name.to_string());
4574 }
4575
4576 if !info.requires_probe()
4577 || dns_registry.is_probing_done(&txt, info.get_fullname(), create_time)
4578 {
4579 out.add_answer_at_time(txt, 0);
4580 } else {
4581 probing_count += 1;
4582 }
4583
4584 let hostname = info.get_hostname();
4587 for address in intf_addrs {
4588 let mut dns_addr = DnsAddress::new(
4589 hostname,
4590 ip_address_rr_type(&address),
4591 CLASS_IN | CLASS_CACHE_FLUSH,
4592 info.get_host_ttl(),
4593 address,
4594 intf.into(),
4595 );
4596
4597 if let Some(new_name) = dns_registry.name_changes.get(hostname) {
4598 dns_addr.get_record_mut().set_new_name(new_name.to_string());
4599 }
4600
4601 if !info.requires_probe()
4602 || dns_registry.is_probing_done(&dns_addr, info.get_fullname(), create_time)
4603 {
4604 out.add_answer_at_time(dns_addr, 0);
4605 } else {
4606 probing_count += 1;
4607 }
4608 }
4609
4610 if probing_count > 0 {
4611 return None;
4612 }
4613
4614 Some(out)
4615}
4616
4617fn announce_service_on_intf(
4620 dns_registry: &mut DnsRegistry,
4621 info: &ServiceInfo,
4622 intf: &MyIntf,
4623 sock: &PktInfoUdpSocket,
4624 port: u16,
4625) -> MyResult<bool> {
4626 let is_ipv4 = sock.domain() == Domain::IPV4;
4627 if let Some(out) = prepare_announce(info, intf, dns_registry, is_ipv4) {
4628 let _ = send_dns_outgoing(&out, intf, sock, port, None, None)?;
4629 return Ok(true);
4630 }
4631
4632 Ok(false)
4633}
4634
4635fn name_change(original: &str) -> String {
4643 let mut parts: Vec<_> = original.split('.').collect();
4644 let Some(first_part) = parts.get_mut(0) else {
4645 return format!("{original} (2)");
4646 };
4647
4648 let mut new_name = format!("{first_part} (2)");
4649
4650 if let Some(paren_pos) = first_part.rfind(" (") {
4652 if let Some(end_paren) = first_part[paren_pos..].find(')') {
4654 let absolute_end_pos = paren_pos + end_paren;
4655 if absolute_end_pos == first_part.len() - 1 {
4657 let num_start = paren_pos + 2; if let Ok(number) = first_part[num_start..absolute_end_pos].parse::<u32>() {
4660 let base_name = &first_part[..paren_pos];
4661 new_name = format!("{} ({})", base_name, number + 1)
4662 }
4663 }
4664 }
4665 }
4666
4667 *first_part = &new_name;
4668 parts.join(".")
4669}
4670
4671fn hostname_change(original: &str) -> String {
4679 let mut parts: Vec<_> = original.split('.').collect();
4680 let Some(first_part) = parts.get_mut(0) else {
4681 return format!("{original}-2");
4682 };
4683
4684 let mut new_name = format!("{first_part}-2");
4685
4686 if let Some(hyphen_pos) = first_part.rfind('-') {
4688 if let Ok(number) = first_part[hyphen_pos + 1..].parse::<u32>() {
4690 let base_name = &first_part[..hyphen_pos];
4691 new_name = format!("{}-{}", base_name, number + 1);
4692 }
4693 }
4694
4695 *first_part = &new_name;
4696 parts.join(".")
4697}
4698
4699fn check_probing(
4702 dns_registry: &mut DnsRegistry,
4703 timers: &mut BinaryHeap<Reverse<u64>>,
4704 now: u64,
4705) -> (DnsOutgoing, Vec<String>) {
4706 let mut expired_probes = Vec::new();
4707 let mut out = DnsOutgoing::new(FLAGS_QR_QUERY);
4708
4709 for (name, probe) in dns_registry.probing.iter_mut() {
4710 if now >= probe.next_send {
4711 if probe.expired(now) {
4712 expired_probes.push(name.clone());
4714 } else {
4715 out.add_question(name, RRType::ANY);
4716
4717 for record in probe.records.iter() {
4725 out.add_authority(record.clone());
4726 }
4727
4728 probe.update_next_send(now);
4729
4730 timers.push(Reverse(probe.next_send));
4732 }
4733 }
4734 }
4735
4736 (out, expired_probes)
4737}
4738
4739fn handle_expired_probes(
4744 expired_probes: Vec<String>,
4745 intf_name: &str,
4746 dns_registry: &mut DnsRegistry,
4747 monitors: &mut Vec<Sender<DaemonEvent>>,
4748) -> HashSet<String> {
4749 let mut waiting_services = HashSet::new();
4750
4751 for name in expired_probes {
4752 let Some(probe) = dns_registry.probing.remove(&name) else {
4753 continue;
4754 };
4755
4756 for record in probe.records.iter() {
4758 if let Some(new_name) = record.get_record().get_new_name() {
4759 dns_registry
4760 .name_changes
4761 .insert(name.clone(), new_name.to_string());
4762
4763 let event = DnsNameChange {
4764 original: record.get_record().get_original_name().to_string(),
4765 new_name: new_name.to_string(),
4766 rr_type: record.get_type(),
4767 intf_name: intf_name.to_string(),
4768 };
4769 debug!("Name change event: {:?}", &event);
4770 notify_monitors(monitors, DaemonEvent::NameChange(event));
4771 }
4772 }
4773
4774 debug!(
4776 "probe of '{name}' finished: move {} records to active. ({} waiting services)",
4777 probe.records.len(),
4778 probe.waiting_services.len(),
4779 );
4780
4781 if !probe.records.is_empty() {
4783 match dns_registry.active.get_mut(&name) {
4784 Some(records) => {
4785 records.extend(probe.records);
4786 }
4787 None => {
4788 dns_registry.active.insert(name, probe.records);
4789 }
4790 }
4791
4792 waiting_services.extend(probe.waiting_services);
4793 }
4794 }
4795
4796 waiting_services
4797}
4798
4799fn resolve_addr_to_index(if_kind: IfKind, interfaces: &[Interface]) -> IfKind {
4801 if let IfKind::Addr(addr) = &if_kind {
4802 if let Some(intf) = interfaces.iter().find(|intf| &intf.ip() == addr) {
4803 let if_index = intf.index.unwrap_or(0);
4804 return if addr.is_ipv4() {
4805 IfKind::IndexV4(if_index)
4806 } else {
4807 IfKind::IndexV6(if_index)
4808 };
4809 }
4810 }
4811 if_kind
4812}
4813
4814#[cfg(test)]
4815mod tests {
4816 use super::{
4817 _new_socket_bind, check_domain_suffix, check_service_name_length, hostname_change,
4818 my_ip_interfaces, name_change, send_dns_outgoing_impl, valid_instance_name,
4819 valid_ip_on_intf, HostnameResolutionEvent, MyIntf, ServiceDaemon, ServiceEvent,
4820 ServiceInfo, GROUP_ADDR_V4, MDNS_PORT,
4821 };
4822 use crate::{
4823 dns_parser::{
4824 DnsEntryExt, DnsIncoming, DnsOutgoing, DnsPointer, InterfaceId, RRType, ScopedIp,
4825 CLASS_IN, FLAGS_AA, FLAGS_QR_QUERY, FLAGS_QR_RESPONSE,
4826 },
4827 service_daemon::{add_answer_of_service, check_hostname},
4828 };
4829 use if_addrs::{IfAddr, Ifv4Addr};
4830 use std::{
4831 collections::HashSet,
4832 net::{IpAddr, Ipv4Addr, UdpSocket},
4833 time::{Duration, Instant, SystemTime},
4834 };
4835 use test_log::test;
4836
4837 #[test]
4838 fn test_response_source_ifaddr_match() {
4839 let ifaddr_a = IfAddr::V4(Ifv4Addr {
4843 ip: Ipv4Addr::new(192, 168, 1, 148),
4844 netmask: Ipv4Addr::new(255, 255, 255, 0),
4845 broadcast: None,
4846 prefixlen: 24,
4847 });
4848 let ifaddr_b = IfAddr::V4(Ifv4Addr {
4849 ip: Ipv4Addr::new(10, 238, 0, 51),
4850 netmask: Ipv4Addr::new(255, 255, 255, 0),
4851 broadcast: None,
4852 prefixlen: 24,
4853 });
4854
4855 let intf = MyIntf {
4856 name: "dummy0".to_string(),
4857 index: 1,
4858 addrs: HashSet::from([ifaddr_a.clone(), ifaddr_b.clone()]),
4859 };
4860
4861 let pick = |querier: IpAddr| -> Option<IfAddr> {
4862 intf.addrs
4863 .iter()
4864 .find(|a| valid_ip_on_intf(&querier, a))
4865 .cloned()
4866 };
4867
4868 assert_eq!(
4869 pick(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 2))),
4870 Some(ifaddr_a)
4871 );
4872 assert_eq!(
4873 pick(IpAddr::V4(Ipv4Addr::new(10, 238, 0, 99))),
4874 Some(ifaddr_b)
4875 );
4876 assert_eq!(pick(IpAddr::V4(Ipv4Addr::new(172, 16, 0, 1))), None);
4878 }
4879
4880 #[test]
4881 fn test_instance_name() {
4882 assert!(valid_instance_name("my-laser._printer._tcp.local."));
4883 assert!(valid_instance_name("my-laser.._printer._tcp.local."));
4884 assert!(!valid_instance_name("_printer._tcp.local."));
4885 }
4886
4887 #[test]
4888 fn test_legacy_unicast_response() {
4889 let intf_ip = match my_ip_interfaces(false)
4900 .into_iter()
4901 .find_map(|intf| match intf.ip() {
4902 IpAddr::V4(ip) => Some(ip),
4903 IpAddr::V6(_) => None,
4904 }) {
4905 Some(ip) => ip,
4906 None => {
4907 println!("No IPv4 interface available; skipping test.");
4908 return;
4909 }
4910 };
4911
4912 let daemon = ServiceDaemon::new().expect("Failed to create daemon");
4914 let unique = SystemTime::now()
4915 .duration_since(SystemTime::UNIX_EPOCH)
4916 .unwrap()
4917 .as_micros();
4918 let hostname = format!("legacy-unicast-test-{unique}.local.");
4919 let service_info = ServiceInfo::new(
4920 "_legacy-uni._udp.local.",
4921 "test_instance",
4922 &hostname,
4923 &[IpAddr::V4(intf_ip)] as &[IpAddr],
4924 5353, None,
4926 )
4927 .expect("invalid service info");
4928 daemon.register(service_info).expect("register service");
4929
4930 let querier = UdpSocket::bind((intf_ip, 0)).expect("bind querier socket");
4934 querier
4935 .set_multicast_loop_v4(true)
4936 .expect("enable multicast loopback");
4937 querier
4938 .set_read_timeout(Some(Duration::from_millis(500)))
4939 .expect("set read timeout");
4940 assert_ne!(
4941 querier.local_addr().unwrap().port(),
4942 MDNS_PORT,
4943 "querier must use an ephemeral (non-5353) source port"
4944 );
4945
4946 let mut query = DnsOutgoing::new(FLAGS_QR_QUERY);
4948 query.add_question(&hostname, RRType::A);
4949 let query_packet = query
4950 .to_data_on_wire()
4951 .pop()
4952 .expect("query serialized to one packet");
4953
4954 let if_id = InterfaceId {
4955 name: "test".to_string(),
4956 index: 0,
4957 };
4958
4959 let deadline = Instant::now() + Duration::from_secs(8);
4962 let mut response = None;
4963 'outer: while Instant::now() < deadline {
4964 querier
4965 .send_to(&query_packet, (GROUP_ADDR_V4, MDNS_PORT))
4966 .expect("send query");
4967
4968 let mut buf = [0u8; 1500];
4971 while let Ok((len, from)) = querier.recv_from(&mut buf) {
4972 let Ok(msg) = DnsIncoming::new(buf[..len].to_vec(), if_id.clone()) else {
4973 continue;
4974 };
4975 if msg.is_response()
4976 && msg
4977 .answers()
4978 .iter()
4979 .any(|a| a.get_name().eq_ignore_ascii_case(&hostname))
4980 {
4981 response = Some((msg, from));
4982 break 'outer;
4983 }
4984 }
4985 }
4986
4987 let (msg, from) = response.expect(
4988 "expected a unicast response to the legacy query; \
4989 a multicast-only reply would never reach this un-joined socket",
4990 );
4991
4992 assert_eq!(
4994 from.port(),
4995 MDNS_PORT,
4996 "response should originate from the mDNS port"
4997 );
4998
4999 assert!(
5001 msg.questions()
5002 .iter()
5003 .any(|q| q.entry_name().eq_ignore_ascii_case(&hostname)),
5004 "legacy unicast response must echo the question section"
5005 );
5006
5007 let answer = msg
5010 .answers()
5011 .iter()
5012 .find(|a| a.get_name().eq_ignore_ascii_case(&hostname))
5013 .expect("response contains an answer for our hostname");
5014 assert_eq!(
5015 answer.get_type(),
5016 RRType::A,
5017 "an A query should be answered with an A record"
5018 );
5019 assert!(
5020 !answer.get_cache_flush(),
5021 "legacy unicast responses must clear the cache-flush bit"
5022 );
5023
5024 daemon.shutdown().unwrap();
5025 }
5026
5027 #[test]
5028 fn test_check_service_name_length() {
5029 let result = check_service_name_length("_tcp", 100);
5030 assert!(result.is_err());
5031 if let Err(e) = result {
5032 println!("{}", e);
5033 }
5034 }
5035
5036 #[test]
5037 fn test_check_hostname() {
5038 for hostname in &[
5040 "my_host.local.",
5041 &("A".repeat(255 - ".local.".len()) + ".local."),
5042 ] {
5043 let result = check_hostname(hostname);
5044 assert!(result.is_ok());
5045 }
5046
5047 for hostname in &[
5049 "my_host.local",
5050 ".local.",
5051 &("A".repeat(256 - ".local.".len()) + ".local."),
5052 ] {
5053 let result = check_hostname(hostname);
5054 assert!(result.is_err());
5055 if let Err(e) = result {
5056 println!("{}", e);
5057 }
5058 }
5059 }
5060
5061 #[test]
5062 fn test_check_domain_suffix() {
5063 assert!(check_domain_suffix("_missing_dot._tcp.local").is_err());
5064 assert!(check_domain_suffix("_missing_bar.tcp.local.").is_err());
5065 assert!(check_domain_suffix("_mis_spell._tpp.local.").is_err());
5066 assert!(check_domain_suffix("_mis_spell._upp.local.").is_err());
5067 assert!(check_domain_suffix("_has_dot._tcp.local.").is_ok());
5068 assert!(check_domain_suffix("_goodname._udp.local.").is_ok());
5069 }
5070
5071 #[test]
5072 fn test_service_with_temporarily_invalidated_ptr() {
5073 let d = ServiceDaemon::new().expect("Failed to create daemon");
5075
5076 let service = "_test_inval_ptr._udp.local.";
5077 let host_name = "my_host_tmp_invalidated_ptr.local.";
5078 let intfs: Vec<_> = my_ip_interfaces(false);
5079 let intf_ips: Vec<_> = intfs.iter().map(|intf| intf.ip()).collect();
5080 let port = 5201;
5081 let my_service =
5082 ServiceInfo::new(service, "my_instance", host_name, &intf_ips[..], port, None)
5083 .expect("invalid service info")
5084 .enable_addr_auto();
5085 let result = d.register(my_service.clone());
5086 assert!(result.is_ok());
5087
5088 let browse_chan = d.browse(service).unwrap();
5090 let timeout = Duration::from_secs(2);
5091 let mut resolved = false;
5092
5093 while let Ok(event) = browse_chan.recv_timeout(timeout) {
5094 match event {
5095 ServiceEvent::ServiceResolved(info) => {
5096 resolved = true;
5097 println!("Resolved a service of {}", &info.fullname);
5098 break;
5099 }
5100 e => {
5101 println!("Received event {:?}", e);
5102 }
5103 }
5104 }
5105
5106 assert!(resolved);
5107
5108 println!("Stopping browse of {}", service);
5109 d.stop_browse(service).unwrap();
5112
5113 let mut stopped = false;
5118 while let Ok(event) = browse_chan.recv_timeout(timeout) {
5119 match event {
5120 ServiceEvent::SearchStopped(_) => {
5121 stopped = true;
5122 println!("Stopped browsing service");
5123 break;
5124 }
5125 e => {
5129 println!("Received event {:?}", e);
5130 }
5131 }
5132 }
5133
5134 assert!(stopped);
5135
5136 let invalidate_ptr_packet = DnsPointer::new(
5138 my_service.get_type(),
5139 RRType::PTR,
5140 CLASS_IN,
5141 0,
5142 my_service.get_fullname().to_string(),
5143 );
5144
5145 let mut packet_buffer = DnsOutgoing::new(FLAGS_QR_RESPONSE | FLAGS_AA);
5146 packet_buffer.add_additional_answer(invalidate_ptr_packet);
5147
5148 for intf in intfs {
5149 let sock = _new_socket_bind(&intf, true).unwrap();
5150 send_dns_outgoing_impl(
5151 &packet_buffer,
5152 &intf.name,
5153 intf.index.unwrap_or(0),
5154 &intf.addr,
5155 &sock.pktinfo,
5156 MDNS_PORT,
5157 None,
5158 )
5159 .unwrap();
5160 }
5161
5162 println!(
5163 "Sent PTR record invalidation. Starting second browse for {}",
5164 service
5165 );
5166
5167 let browse_chan = d.browse(service).unwrap();
5169
5170 resolved = false;
5171 while let Ok(event) = browse_chan.recv_timeout(timeout) {
5172 match event {
5173 ServiceEvent::ServiceResolved(info) => {
5174 resolved = true;
5175 println!("Resolved a service of {}", &info.fullname);
5176 break;
5177 }
5178 e => {
5179 println!("Received event {:?}", e);
5180 }
5181 }
5182 }
5183
5184 assert!(resolved);
5185 d.shutdown().unwrap();
5186 }
5187
5188 #[test]
5189 fn test_expired_srv() {
5190 let service_type = "_expired-srv._udp.local.";
5192 let instance = "test_instance";
5193 let host_name = "expired_srv_host.local.";
5194 let mut my_service = ServiceInfo::new(service_type, instance, host_name, "", 5023, None)
5195 .unwrap()
5196 .enable_addr_auto();
5197 let new_ttl = 3; my_service._set_host_ttl(new_ttl);
5202
5203 let mdns_server = ServiceDaemon::new().expect("Failed to create mdns server");
5205 let result = mdns_server.register(my_service);
5206 assert!(result.is_ok());
5207
5208 let mdns_client = ServiceDaemon::new().expect("Failed to create mdns client");
5209 let browse_chan = mdns_client.browse(service_type).unwrap();
5210 let timeout = Duration::from_secs(2);
5211 let mut resolved = false;
5212
5213 while let Ok(event) = browse_chan.recv_timeout(timeout) {
5214 if let ServiceEvent::ServiceResolved(info) = event {
5215 resolved = true;
5216 println!("Resolved a service of {}", &info.fullname);
5217 break;
5218 }
5219 }
5220
5221 assert!(resolved);
5222
5223 mdns_server.shutdown().unwrap();
5225
5226 let expire_timeout = Duration::from_secs(new_ttl as u64);
5228 while let Ok(event) = browse_chan.recv_timeout(expire_timeout) {
5229 if let ServiceEvent::ServiceRemoved(service_type, full_name) = event {
5230 println!("Service removed: {}: {}", &service_type, &full_name);
5231 break;
5232 }
5233 }
5234 }
5235
5236 #[test]
5237 fn test_hostname_resolution_address_removed() {
5238 let server = ServiceDaemon::new().expect("Failed to create server");
5240 let hostname = "addr_remove_host._tcp.local.";
5241 let service_ip_addr: ScopedIp = my_ip_interfaces(false)
5242 .iter()
5243 .find(|iface| iface.ip().is_ipv4())
5244 .map(|iface| iface.into())
5245 .unwrap();
5246
5247 let mut my_service = ServiceInfo::new(
5248 "_host_res_test._tcp.local.",
5249 "my_instance",
5250 hostname,
5251 service_ip_addr.to_ip_addr(),
5252 1234,
5253 None,
5254 )
5255 .expect("invalid service info");
5256
5257 let addr_ttl = 2;
5259 my_service._set_host_ttl(addr_ttl); server.register(my_service).unwrap();
5262
5263 let client = ServiceDaemon::new().expect("Failed to create client");
5265 let event_receiver = client.resolve_hostname(hostname, None).unwrap();
5266 let resolved = loop {
5267 match event_receiver.recv() {
5268 Ok(HostnameResolutionEvent::AddressesFound(found_hostname, addresses)) => {
5269 assert!(found_hostname == hostname);
5270 assert!(addresses.contains(&service_ip_addr));
5271 println!("address found: {:?}", &addresses);
5272 break true;
5273 }
5274 Ok(HostnameResolutionEvent::SearchStopped(_)) => break false,
5275 Ok(_event) => {}
5276 Err(_) => break false,
5277 }
5278 };
5279
5280 assert!(resolved);
5281
5282 server.shutdown().unwrap();
5284
5285 let timeout = Duration::from_secs(addr_ttl as u64 + 1);
5287 let removed = loop {
5288 match event_receiver.recv_timeout(timeout) {
5289 Ok(HostnameResolutionEvent::AddressesRemoved(removed_host, addresses)) => {
5290 assert!(removed_host == hostname);
5291 assert!(addresses.contains(&service_ip_addr));
5292
5293 println!(
5294 "address removed: hostname: {} addresses: {:?}",
5295 &hostname, &addresses
5296 );
5297 break true;
5298 }
5299 Ok(_event) => {}
5300 Err(_) => {
5301 break false;
5302 }
5303 }
5304 };
5305
5306 assert!(removed);
5307
5308 client.shutdown().unwrap();
5309 }
5310
5311 #[test]
5312 fn test_refresh_ptr() {
5313 let service_type = "_refresh-ptr._udp.local.";
5315 let instance = "test_instance";
5316 let host_name = "refresh_ptr_host.local.";
5317 let service_ip_addr = my_ip_interfaces(false)
5318 .iter()
5319 .find(|iface| iface.ip().is_ipv4())
5320 .map(|iface| iface.ip())
5321 .unwrap();
5322
5323 let mut my_service = ServiceInfo::new(
5324 service_type,
5325 instance,
5326 host_name,
5327 service_ip_addr,
5328 5023,
5329 None,
5330 )
5331 .unwrap();
5332
5333 let new_ttl = 3; my_service._set_other_ttl(new_ttl);
5335
5336 let mdns_server = ServiceDaemon::new().expect("Failed to create mdns server");
5338 let result = mdns_server.register(my_service);
5339 assert!(result.is_ok());
5340
5341 let mdns_client = ServiceDaemon::new().expect("Failed to create mdns client");
5342 let browse_chan = mdns_client.browse(service_type).unwrap();
5343 let timeout = Duration::from_millis(1500); let mut resolved = false;
5345
5346 while let Ok(event) = browse_chan.recv_timeout(timeout) {
5348 if let ServiceEvent::ServiceResolved(info) = event {
5349 resolved = true;
5350 println!("Resolved a service of {}", &info.fullname);
5351 break;
5352 }
5353 }
5354
5355 assert!(resolved);
5356
5357 let timeout = Duration::from_millis(new_ttl as u64 * 1000 * 90 / 100);
5359 while let Ok(event) = browse_chan.recv_timeout(timeout) {
5360 println!("event: {:?}", &event);
5361 }
5362
5363 let metrics_chan = mdns_client.get_metrics().unwrap();
5365 let metrics = metrics_chan.recv_timeout(timeout).unwrap();
5366 let ptr_refresh_counter = metrics["cache-refresh-ptr"];
5367 assert_eq!(ptr_refresh_counter, 1);
5368 let srvtxt_refresh_counter = metrics["cache-refresh-srv-txt"];
5369 assert_eq!(srvtxt_refresh_counter, 1);
5370
5371 mdns_server.shutdown().unwrap();
5373 mdns_client.shutdown().unwrap();
5374 }
5375
5376 #[test]
5377 fn test_name_change() {
5378 assert_eq!(name_change("foo.local."), "foo (2).local.");
5379 assert_eq!(name_change("foo (2).local."), "foo (3).local.");
5380 assert_eq!(name_change("foo (9).local."), "foo (10).local.");
5381 assert_eq!(name_change("foo"), "foo (2)");
5382 assert_eq!(name_change("foo (2)"), "foo (3)");
5383 assert_eq!(name_change(""), " (2)");
5384
5385 assert_eq!(name_change("foo (abc)"), "foo (abc) (2)"); assert_eq!(name_change("foo (2"), "foo (2 (2)"); assert_eq!(name_change("foo (2) extra"), "foo (2) extra (2)"); }
5390
5391 #[test]
5392 fn test_hostname_change() {
5393 assert_eq!(hostname_change("foo.local."), "foo-2.local.");
5394 assert_eq!(hostname_change("foo"), "foo-2");
5395 assert_eq!(hostname_change("foo-2.local."), "foo-3.local.");
5396 assert_eq!(hostname_change("foo-9"), "foo-10");
5397 assert_eq!(hostname_change("test-42.domain."), "test-43.domain.");
5398 }
5399
5400 #[test]
5401 fn test_add_answer_txt_ttl() {
5402 let service_type = "_test_add_answer._udp.local.";
5404 let instance = "test_instance";
5405 let host_name = "add_answer_host.local.";
5406 let service_intf = my_ip_interfaces(false)
5407 .into_iter()
5408 .find(|iface| iface.ip().is_ipv4())
5409 .unwrap();
5410 let service_ip_addr = service_intf.ip();
5411 let my_service = ServiceInfo::new(
5412 service_type,
5413 instance,
5414 host_name,
5415 service_ip_addr,
5416 5023,
5417 None,
5418 )
5419 .unwrap();
5420
5421 let mut out = DnsOutgoing::new(FLAGS_QR_RESPONSE | FLAGS_AA);
5423
5424 let mut dummy_data = out.to_data_on_wire();
5426 let interface_id = InterfaceId::from(&service_intf);
5427 let incoming = DnsIncoming::new(dummy_data.pop().unwrap(), interface_id).unwrap();
5428
5429 let if_addrs = vec![service_intf.ip()];
5431 add_answer_of_service(
5432 &mut out,
5433 &incoming,
5434 instance,
5435 &my_service,
5436 RRType::TXT,
5437 if_addrs,
5438 );
5439
5440 assert!(
5442 out.answers_count() > 0,
5443 "No answers added to the outgoing message"
5444 );
5445
5446 let answer = out._answers().first().unwrap();
5448 assert_eq!(answer.0.get_type(), RRType::TXT);
5449
5450 assert_eq!(answer.0.get_record().get_ttl(), my_service.get_other_ttl());
5452 }
5453
5454 #[test]
5455 fn test_interface_flip() {
5456 let ty_domain = "_intf-flip._udp.local.";
5458 let host_name = "intf_flip.local.";
5459 let now = SystemTime::now()
5460 .duration_since(SystemTime::UNIX_EPOCH)
5461 .unwrap();
5462 let instance_name = now.as_micros().to_string(); let port = 5200;
5464
5465 let (ip_addr1, intf_name) = my_ip_interfaces(false)
5467 .iter()
5468 .find(|iface| iface.ip().is_ipv4())
5469 .map(|iface| (iface.ip(), iface.name.clone()))
5470 .unwrap();
5471
5472 println!("Using interface {} with IP {}", intf_name, ip_addr1);
5473
5474 let service1 = ServiceInfo::new(ty_domain, &instance_name, host_name, ip_addr1, port, None)
5476 .expect("valid service info");
5477 let server1 = ServiceDaemon::new().expect("failed to start server");
5478 server1
5479 .register(service1)
5480 .expect("Failed to register service1");
5481
5482 std::thread::sleep(Duration::from_secs(2));
5484
5485 let client = ServiceDaemon::new().expect("failed to start client");
5487
5488 let receiver = client.browse(ty_domain).unwrap();
5489
5490 let timeout = Duration::from_secs(3);
5491 let mut got_data = false;
5492
5493 while let Ok(event) = receiver.recv_timeout(timeout) {
5494 if let ServiceEvent::ServiceResolved(_) = event {
5495 println!("Received ServiceResolved event");
5496 got_data = true;
5497 break;
5498 }
5499 }
5500
5501 assert!(got_data, "Should receive ServiceResolved event");
5502
5503 client.set_ip_check_interval(1).unwrap();
5505
5506 println!("Shutting down interface {}", &intf_name);
5508 client.test_down_interface(&intf_name).unwrap();
5509
5510 let mut got_removed = false;
5511
5512 while let Ok(event) = receiver.recv_timeout(timeout) {
5513 if let ServiceEvent::ServiceRemoved(ty_domain, instance) = event {
5514 got_removed = true;
5515 println!("removed: {ty_domain} : {instance}");
5516 break;
5517 }
5518 }
5519 assert!(got_removed, "Should receive ServiceRemoved event");
5520
5521 println!("Bringing up interface {}", &intf_name);
5522 client.test_up_interface(&intf_name).unwrap();
5523 let mut got_data = false;
5524 while let Ok(event) = receiver.recv_timeout(timeout) {
5525 if let ServiceEvent::ServiceResolved(resolved) = event {
5526 got_data = true;
5527 println!("Received ServiceResolved: {:?}", resolved);
5528 break;
5529 }
5530 }
5531 assert!(
5532 got_data,
5533 "Should receive ServiceResolved event after interface is back up"
5534 );
5535
5536 server1.shutdown().unwrap();
5537 client.shutdown().unwrap();
5538 }
5539
5540 #[test]
5541 fn test_cache_only() {
5542 let service_type = "_cache_only._udp.local.";
5544 let instance = "test_instance";
5545 let host_name = "cache_only_host.local.";
5546 let service_ip_addr = my_ip_interfaces(false)
5547 .iter()
5548 .find(|iface| iface.ip().is_ipv4())
5549 .map(|iface| iface.ip())
5550 .unwrap();
5551
5552 let mut my_service = ServiceInfo::new(
5553 service_type,
5554 instance,
5555 host_name,
5556 service_ip_addr,
5557 5023,
5558 None,
5559 )
5560 .unwrap();
5561
5562 let new_ttl = 3; my_service._set_other_ttl(new_ttl);
5564
5565 let mdns_client = ServiceDaemon::new().expect("Failed to create mdns client");
5566
5567 let browse_chan = mdns_client.browse_cache(service_type).unwrap();
5570 std::thread::sleep(Duration::from_secs(2));
5571
5572 let mdns_server = ServiceDaemon::new().expect("Failed to create mdns server");
5574 let result = mdns_server.register(my_service);
5575 assert!(result.is_ok());
5576
5577 let timeout = Duration::from_millis(1500); let mut resolved = false;
5579
5580 while let Ok(event) = browse_chan.recv_timeout(timeout) {
5582 if let ServiceEvent::ServiceResolved(info) = event {
5583 resolved = true;
5584 println!("Resolved a service of {}", &info.get_fullname());
5585 break;
5586 }
5587 }
5588
5589 assert!(resolved);
5590
5591 mdns_server.shutdown().unwrap();
5593 mdns_client.shutdown().unwrap();
5594 }
5595
5596 #[test]
5597 fn test_cache_only_unsolicited() {
5598 let service_type = "_c_unsolicit._udp.local.";
5599 let instance = "test_instance";
5600 let host_name = "c_unsolicit_host.local.";
5601 let service_ip_addr = my_ip_interfaces(false)
5602 .iter()
5603 .find(|iface| iface.ip().is_ipv4())
5604 .map(|iface| iface.ip())
5605 .unwrap();
5606
5607 let my_service = ServiceInfo::new(
5608 service_type,
5609 instance,
5610 host_name,
5611 service_ip_addr,
5612 5023,
5613 None,
5614 )
5615 .unwrap();
5616
5617 let mdns_server = ServiceDaemon::new().expect("Failed to create mdns server");
5619 let result = mdns_server.register(my_service);
5620 assert!(result.is_ok());
5621
5622 let mdns_client = ServiceDaemon::new().expect("Failed to create mdns client");
5623 mdns_client.accept_unsolicited(true).unwrap();
5624
5625 std::thread::sleep(Duration::from_secs(2));
5628 let browse_chan = mdns_client.browse_cache(service_type).unwrap();
5629 let timeout = Duration::from_millis(1500); let mut resolved = false;
5631
5632 while let Ok(event) = browse_chan.recv_timeout(timeout) {
5634 if let ServiceEvent::ServiceResolved(info) = event {
5635 resolved = true;
5636 println!("Resolved a service of {}", &info.get_fullname());
5637 break;
5638 }
5639 }
5640
5641 assert!(resolved);
5642
5643 mdns_server.shutdown().unwrap();
5645 mdns_client.shutdown().unwrap();
5646 }
5647
5648 #[test]
5649 fn test_custom_port_isolation() {
5650 let service_type = "_custom_port._udp.local.";
5655 let instance_custom = "custom_port_instance";
5656 let instance_default = "default_port_instance";
5657 let host_name = "custom_port_host.local.";
5658
5659 let service_ip_addr = my_ip_interfaces(false)
5660 .iter()
5661 .find(|iface| iface.ip().is_ipv4())
5662 .map(|iface| iface.ip())
5663 .expect("Test requires an IPv4 interface");
5664
5665 let service_custom = ServiceInfo::new(
5667 service_type,
5668 instance_custom,
5669 host_name,
5670 service_ip_addr,
5671 8080,
5672 None,
5673 )
5674 .unwrap();
5675
5676 let service_default = ServiceInfo::new(
5678 service_type,
5679 instance_default,
5680 host_name,
5681 service_ip_addr,
5682 8081,
5683 None,
5684 )
5685 .unwrap();
5686
5687 let custom_port = 5454u16;
5689 let server_custom =
5690 ServiceDaemon::new_with_port(custom_port).expect("Failed to create custom port server");
5691 let client_custom =
5692 ServiceDaemon::new_with_port(custom_port).expect("Failed to create custom port client");
5693
5694 let server_default = ServiceDaemon::new().expect("Failed to create default port server");
5696
5697 server_custom
5699 .register(service_custom.clone())
5700 .expect("Failed to register custom port service");
5701
5702 server_default
5704 .register(service_default.clone())
5705 .expect("Failed to register default port service");
5706
5707 let browse_custom = client_custom
5709 .browse(service_type)
5710 .expect("Failed to browse on custom port");
5711
5712 let timeout = Duration::from_secs(3);
5713 let mut found_custom = false;
5714 let mut found_default_on_custom = false;
5715
5716 while let Ok(event) = browse_custom.recv_timeout(timeout) {
5718 if let ServiceEvent::ServiceResolved(info) = event {
5719 println!(
5720 "Custom port client resolved: {} on port {}",
5721 info.get_fullname(),
5722 info.get_port()
5723 );
5724 if info.get_fullname().starts_with(instance_custom) {
5725 found_custom = true;
5726 assert_eq!(info.get_port(), 8080);
5727 }
5728 if info.get_fullname().starts_with(instance_default) {
5729 found_default_on_custom = true;
5730 }
5731 }
5732 }
5733
5734 assert!(
5735 found_custom,
5736 "Custom port client should find service on custom port"
5737 );
5738 assert!(
5739 !found_default_on_custom,
5740 "Custom port client should NOT find service on default port"
5741 );
5742
5743 let client_default = ServiceDaemon::new().expect("Failed to create default port client");
5746 let browse_default = client_default
5747 .browse(service_type)
5748 .expect("Failed to browse on default port");
5749
5750 let mut found_default = false;
5751 let mut found_custom_on_default = false;
5752
5753 while let Ok(event) = browse_default.recv_timeout(timeout) {
5754 if let ServiceEvent::ServiceResolved(info) = event {
5755 println!(
5756 "Default port client resolved: {} on port {}",
5757 info.get_fullname(),
5758 info.get_port()
5759 );
5760 if info.get_fullname().starts_with(instance_default) {
5761 found_default = true;
5762 assert_eq!(info.get_port(), 8081);
5763 }
5764 if info.get_fullname().starts_with(instance_custom) {
5765 found_custom_on_default = true;
5766 }
5767 }
5768 }
5769
5770 assert!(
5771 found_default,
5772 "Default port client should find service on default port"
5773 );
5774 assert!(
5775 !found_custom_on_default,
5776 "Default port client should NOT find service on custom port"
5777 );
5778
5779 server_custom.shutdown().unwrap();
5781 client_custom.shutdown().unwrap();
5782 server_default.shutdown().unwrap();
5783 client_default.shutdown().unwrap();
5784 }
5785}