1#[cfg(feature = "logging")]
32use crate::log::{debug, error, trace};
33use crate::{
34 dns_cache::{current_time_millis, DnsCache, IpType},
35 dns_parser::{
36 ip_address_rr_type, DnsAddress, DnsEntryExt, DnsIncoming, DnsOutgoing, DnsPointer,
37 DnsRecordBox, DnsRecordExt, DnsSrv, DnsTxt, InterfaceId, RRType, ScopedIp,
38 CLASS_CACHE_FLUSH, CLASS_IN, FLAGS_AA, FLAGS_QR_QUERY, FLAGS_QR_RESPONSE, MAX_MSG_ABSOLUTE,
39 },
40 error::{e_fmt, Error, Result},
41 service_info::{DnsRegistry, MyIntf, Probe, ServiceInfo, ServiceStatus},
42 Receiver, ResolvedService, TxtProperties,
43};
44use flume::{bounded, Sender, TrySendError};
45use if_addrs::{IfAddr, Interface};
46use mio::{event::Source, net::UdpSocket as MioUdpSocket, Interest, Poll, Registry, Token};
47use socket2::Domain;
48use socket_pktinfo::PktInfoUdpSocket;
49use std::{
50 cmp::{self, Reverse},
51 collections::{hash_map::Entry, BinaryHeap, HashMap, HashSet},
52 fmt, io,
53 net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6, UdpSocket},
54 str, thread,
55 time::Duration,
56 vec,
57};
58
59pub const SERVICE_NAME_LEN_MAX_DEFAULT: u8 = 15;
63
64pub const IP_CHECK_INTERVAL_IN_SECS_DEFAULT: u32 = 5;
66
67pub const VERIFY_TIMEOUT_DEFAULT: Duration = Duration::from_secs(10);
70
71pub const MDNS_PORT: u16 = 5353;
73
74const GROUP_ADDR_V4: Ipv4Addr = Ipv4Addr::new(224, 0, 0, 251);
75const GROUP_ADDR_V6: Ipv6Addr = Ipv6Addr::new(0xff02, 0, 0, 0, 0, 0, 0, 0xfb);
76const LOOPBACK_V4: Ipv4Addr = Ipv4Addr::new(127, 0, 0, 1);
77
78const RESOLVE_WAIT_IN_MILLIS: u64 = 500;
79
80#[derive(Debug)]
82pub enum UnregisterStatus {
83 OK,
85 NotFound,
87}
88
89#[derive(Debug, PartialEq, Clone, Eq)]
91#[non_exhaustive]
92pub enum DaemonStatus {
93 Running,
95
96 Shutdown,
98}
99
100#[derive(Hash, Eq, PartialEq)]
103enum Counter {
104 Register,
105 RegisterResend,
106 Unregister,
107 UnregisterResend,
108 Browse,
109 ResolveHostname,
110 Respond,
111 CacheRefreshPTR,
112 CacheRefreshSrvTxt,
113 CacheRefreshAddr,
114 KnownAnswerSuppression,
115 CachedPTR,
116 CachedSRV,
117 CachedAddr,
118 CachedTxt,
119 CachedNSec,
120 CachedSubtype,
121 DnsRegistryProbe,
122 DnsRegistryActive,
123 DnsRegistryTimer,
124 DnsRegistryNameChange,
125 Timer,
126}
127
128impl fmt::Display for Counter {
129 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
130 match self {
131 Self::Register => write!(f, "register"),
132 Self::RegisterResend => write!(f, "register-resend"),
133 Self::Unregister => write!(f, "unregister"),
134 Self::UnregisterResend => write!(f, "unregister-resend"),
135 Self::Browse => write!(f, "browse"),
136 Self::ResolveHostname => write!(f, "resolve-hostname"),
137 Self::Respond => write!(f, "respond"),
138 Self::CacheRefreshPTR => write!(f, "cache-refresh-ptr"),
139 Self::CacheRefreshSrvTxt => write!(f, "cache-refresh-srv-txt"),
140 Self::CacheRefreshAddr => write!(f, "cache-refresh-addr"),
141 Self::KnownAnswerSuppression => write!(f, "known-answer-suppression"),
142 Self::CachedPTR => write!(f, "cached-ptr"),
143 Self::CachedSRV => write!(f, "cached-srv"),
144 Self::CachedAddr => write!(f, "cached-addr"),
145 Self::CachedTxt => write!(f, "cached-txt"),
146 Self::CachedNSec => write!(f, "cached-nsec"),
147 Self::CachedSubtype => write!(f, "cached-subtype"),
148 Self::DnsRegistryProbe => write!(f, "dns-registry-probe"),
149 Self::DnsRegistryActive => write!(f, "dns-registry-active"),
150 Self::DnsRegistryTimer => write!(f, "dns-registry-timer"),
151 Self::DnsRegistryNameChange => write!(f, "dns-registry-name-change"),
152 Self::Timer => write!(f, "timer"),
153 }
154 }
155}
156
157#[derive(Debug)]
158enum InternalError {
159 IntfAddrInvalid(Interface),
160}
161
162impl fmt::Display for InternalError {
163 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
164 match self {
165 InternalError::IntfAddrInvalid(iface) => write!(f, "interface addr invalid: {iface:?}"),
166 }
167 }
168}
169
170type MyResult<T> = core::result::Result<T, InternalError>;
171
172struct MyUdpSocket {
177 pktinfo: PktInfoUdpSocket,
180
181 mio: MioUdpSocket,
184}
185
186impl MyUdpSocket {
187 pub fn new(pktinfo: PktInfoUdpSocket) -> io::Result<Self> {
188 let std_sock = pktinfo.try_clone_std()?;
189 let mio = MioUdpSocket::from_std(std_sock);
190
191 Ok(Self { pktinfo, mio })
192 }
193}
194
195impl Source for MyUdpSocket {
197 fn register(
198 &mut self,
199 registry: &Registry,
200 token: Token,
201 interests: Interest,
202 ) -> io::Result<()> {
203 self.mio.register(registry, token, interests)
204 }
205
206 fn reregister(
207 &mut self,
208 registry: &Registry,
209 token: Token,
210 interests: Interest,
211 ) -> io::Result<()> {
212 self.mio.reregister(registry, token, interests)
213 }
214
215 fn deregister(&mut self, registry: &Registry) -> std::io::Result<()> {
216 self.mio.deregister(registry)
217 }
218}
219
220pub type Metrics = HashMap<String, i64>;
223
224const IPV4_SOCK_EVENT_KEY: usize = 4; const IPV6_SOCK_EVENT_KEY: usize = 6; const SIGNAL_SOCK_EVENT_KEY: usize = usize::MAX - 1; #[derive(Clone)]
232pub struct ServiceDaemon {
233 sender: Sender<Command>,
235
236 signal_addr: SocketAddr,
242}
243
244impl ServiceDaemon {
245 pub fn new() -> Result<Self> {
251 Self::new_with_port(MDNS_PORT)
252 }
253
254 pub fn new_with_port(port: u16) -> Result<Self> {
277 let signal_addr = SocketAddrV4::new(LOOPBACK_V4, 0);
280
281 let signal_sock = UdpSocket::bind(signal_addr)
282 .map_err(|e| e_fmt!("failed to create signal_sock for daemon: {}", e))?;
283
284 let signal_addr = signal_sock
286 .local_addr()
287 .map_err(|e| e_fmt!("failed to get signal sock addr: {}", e))?;
288
289 signal_sock
291 .set_nonblocking(true)
292 .map_err(|e| e_fmt!("failed to set nonblocking for signal socket: {}", e))?;
293
294 let poller = Poll::new().map_err(|e| e_fmt!("failed to create mio Poll: {e}"))?;
295
296 let (sender, receiver) = bounded(100);
297
298 let mio_sock = MioUdpSocket::from_std(signal_sock);
300 let cmd_sender = sender.clone();
301 thread::Builder::new()
302 .name("mDNS_daemon".to_string())
303 .spawn(move || {
304 Self::daemon_thread(mio_sock, poller, receiver, port, cmd_sender, signal_addr)
305 })
306 .map_err(|e| e_fmt!("thread builder failed to spawn: {}", e))?;
307
308 Ok(Self {
309 sender,
310 signal_addr,
311 })
312 }
313
314 fn send_cmd(&self, cmd: Command) -> Result<()> {
317 let cmd_name = cmd.to_string();
318
319 self.sender.try_send(cmd).map_err(|e| match e {
321 TrySendError::Full(_) => Error::Again,
322 e => e_fmt!("flume::channel::send failed: {}", e),
323 })?;
324
325 let addr = SocketAddrV4::new(LOOPBACK_V4, 0);
327 let socket = UdpSocket::bind(addr)
328 .map_err(|e| e_fmt!("Failed to create socket to send signal: {}", e))?;
329 socket
330 .send_to(cmd_name.as_bytes(), self.signal_addr)
331 .map_err(|e| {
332 e_fmt!(
333 "signal socket send_to {} ({}) failed: {}",
334 self.signal_addr,
335 cmd_name,
336 e
337 )
338 })?;
339
340 Ok(())
341 }
342
343 pub fn browse(&self, service_type: &str) -> Result<Receiver<ServiceEvent>> {
354 check_domain_suffix(service_type)?;
355
356 let (resp_s, resp_r) = bounded(10);
357 self.send_cmd(Command::Browse(service_type.to_string(), 1, false, resp_s))?;
358 Ok(resp_r)
359 }
360
361 pub fn browse_cache(&self, service_type: &str) -> Result<Receiver<ServiceEvent>> {
370 check_domain_suffix(service_type)?;
371
372 let (resp_s, resp_r) = bounded(10);
373 self.send_cmd(Command::Browse(service_type.to_string(), 1, true, resp_s))?;
374 Ok(resp_r)
375 }
376
377 pub fn stop_browse(&self, ty_domain: &str) -> Result<()> {
382 self.send_cmd(Command::StopBrowse(ty_domain.to_string()))
383 }
384
385 pub fn resolve_hostname(
393 &self,
394 hostname: &str,
395 timeout: Option<u64>,
396 ) -> Result<Receiver<HostnameResolutionEvent>> {
397 check_hostname(hostname)?;
398 let (resp_s, resp_r) = bounded(10);
399 self.send_cmd(Command::ResolveHostname(
400 hostname.to_string(),
401 1,
402 resp_s,
403 timeout,
404 ))?;
405 Ok(resp_r)
406 }
407
408 pub fn stop_resolve_hostname(&self, hostname: &str) -> Result<()> {
413 self.send_cmd(Command::StopResolveHostname(hostname.to_string()))
414 }
415
416 pub fn register(&self, service_info: ServiceInfo) -> Result<()> {
424 check_service_name(service_info.get_fullname())?;
425 check_hostname(service_info.get_hostname())?;
426
427 self.send_cmd(Command::Register(service_info.into()))
428 }
429
430 pub fn unregister(&self, fullname: &str) -> Result<Receiver<UnregisterStatus>> {
438 let (resp_s, resp_r) = bounded(1);
439 self.send_cmd(Command::Unregister(fullname.to_lowercase(), resp_s))?;
440 Ok(resp_r)
441 }
442
443 pub fn monitor(&self) -> Result<Receiver<DaemonEvent>> {
447 let (resp_s, resp_r) = bounded(100);
448 self.send_cmd(Command::Monitor(resp_s))?;
449 Ok(resp_r)
450 }
451
452 pub fn shutdown(&self) -> Result<Receiver<DaemonStatus>> {
457 let (resp_s, resp_r) = bounded(1);
458 self.send_cmd(Command::Exit(resp_s))?;
459 Ok(resp_r)
460 }
461
462 pub fn status(&self) -> Result<Receiver<DaemonStatus>> {
468 let (resp_s, resp_r) = bounded(1);
469
470 if self.sender.is_disconnected() {
471 resp_s
472 .send(DaemonStatus::Shutdown)
473 .map_err(|e| e_fmt!("failed to send daemon status to the client: {}", e))?;
474 } else {
475 self.send_cmd(Command::GetStatus(resp_s))?;
476 }
477
478 Ok(resp_r)
479 }
480
481 pub fn get_metrics(&self) -> Result<Receiver<Metrics>> {
486 let (resp_s, resp_r) = bounded(1);
487 self.send_cmd(Command::GetMetrics(resp_s))?;
488 Ok(resp_r)
489 }
490
491 pub fn set_service_name_len_max(&self, len_max: u8) -> Result<()> {
498 const SERVICE_NAME_LEN_MAX_LIMIT: u8 = 30; if len_max > SERVICE_NAME_LEN_MAX_LIMIT {
501 return Err(Error::Msg(format!(
502 "service name length max {len_max} is too large"
503 )));
504 }
505
506 self.send_cmd(Command::SetOption(DaemonOption::ServiceNameLenMax(len_max)))
507 }
508
509 pub fn set_ip_check_interval(&self, interval_in_secs: u32) -> Result<()> {
515 let interval_in_millis = interval_in_secs as u64 * 1000;
516 self.send_cmd(Command::SetOption(DaemonOption::IpCheckInterval(
517 interval_in_millis,
518 )))
519 }
520
521 pub fn get_ip_check_interval(&self) -> Result<u32> {
523 let (resp_s, resp_r) = bounded(1);
524 self.send_cmd(Command::GetOption(resp_s))?;
525
526 let option = resp_r
527 .recv_timeout(Duration::from_secs(10))
528 .map_err(|e| e_fmt!("failed to receive ip check interval: {}", e))?;
529 let ip_check_interval_in_secs = option.ip_check_interval / 1000;
530 Ok(ip_check_interval_in_secs as u32)
531 }
532
533 pub fn enable_interface(&self, if_kind: impl IntoIfKindVec) -> Result<()> {
540 let if_kind_vec = if_kind.into_vec();
541 self.send_cmd(Command::SetOption(DaemonOption::EnableInterface(
542 if_kind_vec.kinds,
543 )))
544 }
545
546 pub fn disable_interface(&self, if_kind: impl IntoIfKindVec) -> Result<()> {
553 let if_kind_vec = if_kind.into_vec();
554 self.send_cmd(Command::SetOption(DaemonOption::DisableInterface(
555 if_kind_vec.kinds,
556 )))
557 }
558
559 pub fn accept_unsolicited(&self, accept: bool) -> Result<()> {
570 self.send_cmd(Command::SetOption(DaemonOption::AcceptUnsolicited(accept)))
571 }
572
573 pub fn include_apple_p2p(&self, include: bool) -> Result<()> {
576 self.send_cmd(Command::SetOption(DaemonOption::IncludeAppleP2P(include)))
577 }
578
579 #[cfg(test)]
580 pub fn test_down_interface(&self, ifname: &str) -> Result<()> {
581 self.send_cmd(Command::SetOption(DaemonOption::TestDownInterface(
582 ifname.to_string(),
583 )))
584 }
585
586 #[cfg(test)]
587 pub fn test_up_interface(&self, ifname: &str) -> Result<()> {
588 self.send_cmd(Command::SetOption(DaemonOption::TestUpInterface(
589 ifname.to_string(),
590 )))
591 }
592
593 pub fn set_multicast_loop_v4(&self, on: bool) -> Result<()> {
609 self.send_cmd(Command::SetOption(DaemonOption::MulticastLoopV4(on)))
610 }
611
612 pub fn set_multicast_loop_v6(&self, on: bool) -> Result<()> {
628 self.send_cmd(Command::SetOption(DaemonOption::MulticastLoopV6(on)))
629 }
630
631 pub fn verify(&self, instance_fullname: String, timeout: Duration) -> Result<()> {
644 self.send_cmd(Command::Verify(instance_fullname, timeout))
645 }
646
647 fn daemon_thread(
648 signal_sock: MioUdpSocket,
649 poller: Poll,
650 receiver: Receiver<Command>,
651 port: u16,
652 cmd_sender: Sender<Command>,
653 signal_addr: SocketAddr,
654 ) {
655 let mut zc = Zeroconf::new(signal_sock, poller, port, cmd_sender, signal_addr);
656
657 if let Some(cmd) = zc.run(receiver) {
658 match cmd {
659 Command::Exit(resp_s) => {
660 if let Err(e) = resp_s.send(DaemonStatus::Shutdown) {
663 debug!("exit: failed to send response of shutdown: {}", e);
664 }
665 }
666 _ => {
667 debug!("Unexpected command: {:?}", cmd);
668 }
669 }
670 }
671 }
672}
673
674fn _new_socket_bind(intf: &Interface, should_loop: bool) -> Result<MyUdpSocket> {
676 let intf_ip = &intf.ip();
679 match intf_ip {
680 IpAddr::V4(ip) => {
681 let addr = SocketAddrV4::new(Ipv4Addr::new(0, 0, 0, 0), MDNS_PORT);
682 let sock = new_socket(addr.into(), true)?;
683
684 sock.join_multicast_v4(&GROUP_ADDR_V4, ip)
686 .map_err(|e| e_fmt!("join multicast group on addr {}: {}", intf_ip, e))?;
687
688 sock.set_multicast_if_v4(ip)
690 .map_err(|e| e_fmt!("set multicast_if on addr {}: {}", ip, e))?;
691
692 sock.set_multicast_ttl_v4(255)
697 .map_err(|e| e_fmt!("set set_multicast_ttl_v4 on addr {}: {}", ip, e))?;
698
699 if !should_loop {
700 sock.set_multicast_loop_v4(false)
701 .map_err(|e| e_fmt!("failed to set multicast loop v4 for {ip}: {e}"))?;
702 }
703
704 let multicast_addr = SocketAddrV4::new(GROUP_ADDR_V4, MDNS_PORT).into();
706 let test_packets = DnsOutgoing::new(0).to_data_on_wire();
707 for packet in test_packets {
708 sock.send_to(&packet, &multicast_addr)
709 .map_err(|e| e_fmt!("send multicast packet on addr {}: {}", ip, e))?;
710 }
711 MyUdpSocket::new(sock)
712 .map_err(|e| e_fmt!("failed to create MySocket for interface {}: {e}", intf.name))
713 }
714 IpAddr::V6(ip) => {
715 let addr = SocketAddrV6::new(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 0), MDNS_PORT, 0, 0);
716 let sock = new_socket(addr.into(), true)?;
717
718 let if_index = intf.index.unwrap_or(0);
719
720 sock.join_multicast_v6(&GROUP_ADDR_V6, if_index)
722 .map_err(|e| e_fmt!("join multicast group on addr {}: {}", ip, e))?;
723
724 sock.set_multicast_if_v6(if_index)
726 .map_err(|e| e_fmt!("set multicast_if on addr {}: {}", ip, e))?;
727
728 MyUdpSocket::new(sock)
733 .map_err(|e| e_fmt!("failed to create MySocket for interface {}: {e}", intf.name))
734 }
735 }
736}
737
738fn new_socket(addr: SocketAddr, non_block: bool) -> Result<PktInfoUdpSocket> {
741 let domain = match addr {
742 SocketAddr::V4(_) => socket2::Domain::IPV4,
743 SocketAddr::V6(_) => socket2::Domain::IPV6,
744 };
745
746 let fd = PktInfoUdpSocket::new(domain).map_err(|e| e_fmt!("create socket failed: {}", e))?;
747
748 fd.set_reuse_address(true)
749 .map_err(|e| e_fmt!("set ReuseAddr failed: {}", e))?;
750 #[cfg(unix)]
751 if let Err(e) = fd.set_reuse_port(true) {
752 debug!(
753 "SO_REUSEPORT is not supported, continuing without it: {}",
754 e
755 );
756 }
757
758 if non_block {
759 fd.set_nonblocking(true)
760 .map_err(|e| e_fmt!("set O_NONBLOCK: {}", e))?;
761 }
762
763 fd.bind(&addr.into())
764 .map_err(|e| e_fmt!("socket bind to {} failed: {}", &addr, e))?;
765
766 trace!("new socket bind to {}", &addr);
767 Ok(fd)
768}
769
770struct ReRun {
772 next_time: u64,
774 command: Command,
775}
776
777#[derive(Debug, Clone)]
781#[non_exhaustive]
782pub enum IfKind {
783 All,
785
786 IPv4,
788
789 IPv6,
791
792 Name(String),
794
795 Addr(IpAddr),
799
800 LoopbackV4,
804
805 LoopbackV6,
807
808 IndexV4(u32),
810
811 IndexV6(u32),
813}
814
815impl IfKind {
816 fn matches(&self, intf: &Interface) -> bool {
818 match self {
819 Self::All => true,
820 Self::IPv4 => intf.ip().is_ipv4(),
821 Self::IPv6 => intf.ip().is_ipv6(),
822 Self::Name(ifname) => ifname == &intf.name,
823 Self::Addr(addr) => addr == &intf.ip(),
824 Self::LoopbackV4 => intf.is_loopback() && intf.ip().is_ipv4(),
825 Self::LoopbackV6 => intf.is_loopback() && intf.ip().is_ipv6(),
826 Self::IndexV4(idx) => intf.index == Some(*idx) && intf.ip().is_ipv4(),
827 Self::IndexV6(idx) => intf.index == Some(*idx) && intf.ip().is_ipv6(),
828 }
829 }
830}
831
832impl From<&str> for IfKind {
835 fn from(val: &str) -> Self {
836 Self::Name(val.to_string())
837 }
838}
839
840impl From<&String> for IfKind {
841 fn from(val: &String) -> Self {
842 Self::Name(val.to_string())
843 }
844}
845
846impl From<IpAddr> for IfKind {
848 fn from(val: IpAddr) -> Self {
849 Self::Addr(val)
850 }
851}
852
853pub struct IfKindVec {
855 kinds: Vec<IfKind>,
856}
857
858pub trait IntoIfKindVec {
860 fn into_vec(self) -> IfKindVec;
861}
862
863impl<T: Into<IfKind>> IntoIfKindVec for T {
864 fn into_vec(self) -> IfKindVec {
865 let if_kind: IfKind = self.into();
866 IfKindVec {
867 kinds: vec![if_kind],
868 }
869 }
870}
871
872impl<T: Into<IfKind>> IntoIfKindVec for Vec<T> {
873 fn into_vec(self) -> IfKindVec {
874 let kinds: Vec<IfKind> = self.into_iter().map(|x| x.into()).collect();
875 IfKindVec { kinds }
876 }
877}
878
879struct IfSelection {
881 if_kind: IfKind,
883
884 selected: bool,
886}
887
888struct Zeroconf {
890 port: u16,
893
894 my_intfs: HashMap<u32, MyIntf>,
896
897 ipv4_sock: Option<MyUdpSocket>,
899
900 ipv6_sock: Option<MyUdpSocket>,
902
903 my_services: HashMap<String, ServiceInfo>,
905
906 cache: DnsCache,
908
909 dns_registry_map: HashMap<u32, DnsRegistry>,
911
912 service_queriers: HashMap<String, Sender<ServiceEvent>>, hostname_resolvers: HashMap<String, (Sender<HostnameResolutionEvent>, Option<u64>)>, retransmissions: Vec<ReRun>,
923
924 counters: Metrics,
925
926 poller: Poll,
928
929 monitors: Vec<Sender<DaemonEvent>>,
931
932 service_name_len_max: u8,
934
935 ip_check_interval: u64,
937
938 if_selections: Vec<IfSelection>,
940
941 signal_sock: MioUdpSocket,
943
944 timers: BinaryHeap<Reverse<u64>>,
950
951 status: DaemonStatus,
952
953 pending_resolves: HashSet<String>,
955
956 resolved: HashSet<String>,
958
959 multicast_loop_v4: bool,
960
961 multicast_loop_v6: bool,
962
963 accept_unsolicited: bool,
964
965 include_apple_p2p: bool,
966
967 cmd_sender: Sender<Command>,
968
969 signal_addr: SocketAddr,
970
971 #[cfg(test)]
972 test_down_interfaces: HashSet<String>,
973}
974
975fn join_multicast_group(my_sock: &PktInfoUdpSocket, intf: &Interface) -> Result<()> {
977 let intf_ip = &intf.ip();
978 match intf_ip {
979 IpAddr::V4(ip) => {
980 debug!("join multicast group V4 on {} addr {ip}", intf.name);
982 my_sock
983 .join_multicast_v4(&GROUP_ADDR_V4, ip)
984 .map_err(|e| e_fmt!("PKT join multicast group on addr {}: {}", intf_ip, e))?;
985 }
986 IpAddr::V6(ip) => {
987 let if_index = intf.index.unwrap_or(0);
988 debug!(
990 "join multicast group V6 on {} addr {ip} with index {if_index}",
991 intf.name
992 );
993 my_sock
994 .join_multicast_v6(&GROUP_ADDR_V6, if_index)
995 .map_err(|e| e_fmt!("PKT join multicast group on addr {}: {}", ip, e))?;
996 }
997 }
998 Ok(())
999}
1000
1001impl Zeroconf {
1002 fn new(
1003 signal_sock: MioUdpSocket,
1004 poller: Poll,
1005 port: u16,
1006 cmd_sender: Sender<Command>,
1007 signal_addr: SocketAddr,
1008 ) -> Self {
1009 let my_ifaddrs = my_ip_interfaces(true);
1011
1012 let mut my_intfs = HashMap::new();
1016 let mut dns_registry_map = HashMap::new();
1017
1018 let mut ipv4_sock = None;
1021 let addr = SocketAddrV4::new(Ipv4Addr::new(0, 0, 0, 0), port);
1022 match new_socket(addr.into(), true) {
1023 Ok(sock) => {
1024 sock.set_multicast_ttl_v4(255)
1029 .map_err(|e| e_fmt!("set set_multicast_ttl_v4 on addr: {}", e))
1030 .ok();
1031
1032 ipv4_sock = match MyUdpSocket::new(sock) {
1034 Ok(s) => Some(s),
1035 Err(e) => {
1036 debug!("failed to create IPv4 MyUdpSocket: {e}");
1037 None
1038 }
1039 };
1040 }
1041 Err(e) => debug!("failed to create IPv4 socket: {e}"),
1043 }
1044
1045 let mut ipv6_sock = None;
1046 let addr = SocketAddrV6::new(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 0), port, 0, 0);
1047 match new_socket(addr.into(), true) {
1048 Ok(sock) => {
1049 sock.set_multicast_hops_v6(255)
1053 .map_err(|e| e_fmt!("set set_multicast_hops_v6: {}", e))
1054 .ok();
1055
1056 ipv6_sock = match MyUdpSocket::new(sock) {
1058 Ok(s) => Some(s),
1059 Err(e) => {
1060 debug!("failed to create IPv6 MyUdpSocket: {e}");
1061 None
1062 }
1063 };
1064 }
1065 Err(e) => debug!("failed to create IPv6 socket: {e}"),
1066 }
1067
1068 for intf in my_ifaddrs {
1070 let sock_opt = if intf.ip().is_ipv4() {
1071 &ipv4_sock
1072 } else {
1073 &ipv6_sock
1074 };
1075 let Some(sock) = sock_opt else {
1076 debug!(
1077 "no socket available for interface {} with addr {}. Skipped.",
1078 intf.name,
1079 intf.ip()
1080 );
1081 continue;
1082 };
1083
1084 if let Err(e) = join_multicast_group(&sock.pktinfo, &intf) {
1085 debug!("failed to join multicast: {}: {e}. Skipped.", &intf.ip());
1086 }
1087
1088 let if_index = intf.index.unwrap_or(0);
1089
1090 dns_registry_map
1092 .entry(if_index)
1093 .or_insert_with(DnsRegistry::new);
1094
1095 my_intfs
1096 .entry(if_index)
1097 .and_modify(|v: &mut MyIntf| {
1098 v.addrs.insert(intf.addr.clone());
1099 })
1100 .or_insert(MyIntf {
1101 name: intf.name.clone(),
1102 index: if_index,
1103 addrs: HashSet::from([intf.addr]),
1104 });
1105 }
1106
1107 let monitors = Vec::new();
1108 let service_name_len_max = SERVICE_NAME_LEN_MAX_DEFAULT;
1109 let ip_check_interval = IP_CHECK_INTERVAL_IN_SECS_DEFAULT as u64 * 1000;
1110
1111 let timers = BinaryHeap::new();
1112
1113 let if_selections = vec![];
1115
1116 let status = DaemonStatus::Running;
1117
1118 Self {
1119 port,
1120 my_intfs,
1121 ipv4_sock,
1122 ipv6_sock,
1123 my_services: HashMap::new(),
1124 cache: DnsCache::new(),
1125 dns_registry_map,
1126 hostname_resolvers: HashMap::new(),
1127 service_queriers: HashMap::new(),
1128 retransmissions: Vec::new(),
1129 counters: HashMap::new(),
1130 poller,
1131 monitors,
1132 service_name_len_max,
1133 ip_check_interval,
1134 if_selections,
1135 signal_sock,
1136 timers,
1137 status,
1138 pending_resolves: HashSet::new(),
1139 resolved: HashSet::new(),
1140 multicast_loop_v4: true,
1141 multicast_loop_v6: true,
1142 accept_unsolicited: false,
1143 include_apple_p2p: false,
1144 cmd_sender,
1145 signal_addr,
1146
1147 #[cfg(test)]
1148 test_down_interfaces: HashSet::new(),
1149 }
1150 }
1151
1152 fn send_cmd_to_self(&self, cmd: Command) -> Result<()> {
1154 let cmd_name = cmd.to_string();
1155
1156 self.cmd_sender.try_send(cmd).map_err(|e| match e {
1157 TrySendError::Full(_) => Error::Again,
1158 e => e_fmt!("flume::channel::send failed: {}", e),
1159 })?;
1160
1161 let addr = SocketAddrV4::new(LOOPBACK_V4, 0);
1162 let socket = UdpSocket::bind(addr)
1163 .map_err(|e| e_fmt!("Failed to create socket to send signal: {}", e))?;
1164 socket
1165 .send_to(cmd_name.as_bytes(), self.signal_addr)
1166 .map_err(|e| {
1167 e_fmt!(
1168 "signal socket send_to {} ({}) failed: {}",
1169 self.signal_addr,
1170 cmd_name,
1171 e
1172 )
1173 })?;
1174
1175 Ok(())
1176 }
1177
1178 fn cleanup(&mut self) {
1186 debug!("Starting cleanup for shutdown");
1187
1188 let service_names: Vec<String> = self.my_services.keys().cloned().collect();
1190 for fullname in service_names {
1191 if let Some(info) = self.my_services.get(&fullname) {
1192 debug!("Unregistering service during shutdown: {}", &fullname);
1193
1194 for intf in self.my_intfs.values() {
1195 if let Some(sock) = self.ipv4_sock.as_ref() {
1196 self.unregister_service(info, intf, &sock.pktinfo);
1197 }
1198
1199 if let Some(sock) = self.ipv6_sock.as_ref() {
1200 self.unregister_service(info, intf, &sock.pktinfo);
1201 }
1202 }
1203 }
1204 }
1205 self.my_services.clear();
1206
1207 let browse_types: Vec<String> = self.service_queriers.keys().cloned().collect();
1209 for ty_domain in browse_types {
1210 debug!("Stopping browse during shutdown: {}", &ty_domain);
1211 if let Some(sender) = self.service_queriers.remove(&ty_domain) {
1212 if let Err(e) = sender.send(ServiceEvent::SearchStopped(ty_domain.clone())) {
1214 debug!("Failed to send SearchStopped during shutdown: {}", e);
1215 }
1216 }
1217 }
1218
1219 let hostnames: Vec<String> = self.hostname_resolvers.keys().cloned().collect();
1221 for hostname in hostnames {
1222 debug!(
1223 "Stopping hostname resolution during shutdown: {}",
1224 &hostname
1225 );
1226 if let Some((sender, _timeout)) = self.hostname_resolvers.remove(&hostname) {
1227 if let Err(e) =
1229 sender.send(HostnameResolutionEvent::SearchStopped(hostname.clone()))
1230 {
1231 debug!(
1232 "Failed to send HostnameResolutionEvent::SearchStopped during shutdown: {}",
1233 e
1234 );
1235 }
1236 }
1237 }
1238
1239 self.retransmissions.clear();
1241
1242 debug!("Cleanup completed");
1243 }
1244
1245 fn run(&mut self, receiver: Receiver<Command>) -> Option<Command> {
1254 if let Err(e) = self.poller.registry().register(
1256 &mut self.signal_sock,
1257 mio::Token(SIGNAL_SOCK_EVENT_KEY),
1258 mio::Interest::READABLE,
1259 ) {
1260 debug!("failed to add signal socket to the poller: {}", e);
1261 return None;
1262 }
1263
1264 if let Some(sock) = self.ipv4_sock.as_mut() {
1265 if let Err(e) = self.poller.registry().register(
1266 sock,
1267 mio::Token(IPV4_SOCK_EVENT_KEY),
1268 mio::Interest::READABLE,
1269 ) {
1270 debug!("failed to register ipv4 socket: {}", e);
1271 return None;
1272 }
1273 }
1274
1275 if let Some(sock) = self.ipv6_sock.as_mut() {
1276 if let Err(e) = self.poller.registry().register(
1277 sock,
1278 mio::Token(IPV6_SOCK_EVENT_KEY),
1279 mio::Interest::READABLE,
1280 ) {
1281 debug!("failed to register ipv6 socket: {}", e);
1282 return None;
1283 }
1284 }
1285
1286 let mut next_ip_check = if self.ip_check_interval > 0 {
1288 current_time_millis() + self.ip_check_interval
1289 } else {
1290 0
1291 };
1292
1293 if next_ip_check > 0 {
1294 self.add_timer(next_ip_check);
1295 }
1296
1297 let mut events = mio::Events::with_capacity(1024);
1300 loop {
1301 let now = current_time_millis();
1302
1303 let earliest_timer = self.peek_earliest_timer();
1304 let timeout = earliest_timer.map(|timer| {
1305 let millis = if timer > now { timer - now } else { 1 };
1307 Duration::from_millis(millis)
1308 });
1309
1310 events.clear();
1312 match self.poller.poll(&mut events, timeout) {
1313 Ok(_) => self.handle_poller_events(&events),
1314 Err(e) => debug!("failed to select from sockets: {}", e),
1315 }
1316
1317 let now = current_time_millis();
1318
1319 self.pop_timers_till(now);
1321
1322 for hostname in self
1324 .hostname_resolvers
1325 .clone()
1326 .into_iter()
1327 .filter(|(_, (_, timeout))| timeout.map(|t| now >= t).unwrap_or(false))
1328 .map(|(hostname, _)| hostname)
1329 {
1330 trace!("hostname resolver timeout for {}", &hostname);
1331 call_hostname_resolution_listener(
1332 &self.hostname_resolvers,
1333 &hostname,
1334 HostnameResolutionEvent::SearchTimeout(hostname.to_owned()),
1335 );
1336 call_hostname_resolution_listener(
1337 &self.hostname_resolvers,
1338 &hostname,
1339 HostnameResolutionEvent::SearchStopped(hostname.to_owned()),
1340 );
1341 self.hostname_resolvers.remove(&hostname);
1342 }
1343
1344 while let Ok(command) = receiver.try_recv() {
1346 if matches!(command, Command::Exit(_)) {
1347 debug!("Exit command received, performing cleanup");
1348 self.cleanup();
1349 self.status = DaemonStatus::Shutdown;
1350 return Some(command);
1351 }
1352 self.exec_command(command, false);
1353 }
1354
1355 let mut i = 0;
1357 while i < self.retransmissions.len() {
1358 if now >= self.retransmissions[i].next_time {
1359 let rerun = self.retransmissions.remove(i);
1360 self.exec_command(rerun.command, true);
1361 } else {
1362 i += 1;
1363 }
1364 }
1365
1366 self.refresh_active_services();
1368
1369 let mut query_count = 0;
1371 for (hostname, _sender) in self.hostname_resolvers.iter() {
1372 for (hostname, ip_addr) in
1373 self.cache.refresh_due_hostname_resolutions(hostname).iter()
1374 {
1375 self.send_query(hostname, ip_address_rr_type(&ip_addr.to_ip_addr()));
1376 query_count += 1;
1377 }
1378 }
1379
1380 self.increase_counter(Counter::CacheRefreshAddr, query_count);
1381
1382 let now = current_time_millis();
1384
1385 let expired_services = self.cache.evict_expired_services(now);
1387 if !expired_services.is_empty() {
1388 debug!(
1389 "run: send {} service removal to listeners",
1390 expired_services.len()
1391 );
1392 self.notify_service_removal(expired_services);
1393 }
1394
1395 let expired_addrs = self.cache.evict_expired_addr(now);
1397 for (hostname, addrs) in expired_addrs {
1398 call_hostname_resolution_listener(
1399 &self.hostname_resolvers,
1400 &hostname,
1401 HostnameResolutionEvent::AddressesRemoved(hostname.clone(), addrs),
1402 );
1403 let instances = self.cache.get_instances_on_host(&hostname);
1404 let instance_set: HashSet<String> = instances.into_iter().collect();
1405 self.resolve_updated_instances(&instance_set);
1406 }
1407
1408 self.probing_handler();
1410
1411 if now >= next_ip_check && next_ip_check > 0 {
1413 next_ip_check = now + self.ip_check_interval;
1414 self.add_timer(next_ip_check);
1415
1416 self.check_ip_changes();
1417 }
1418 }
1419 }
1420
1421 fn process_set_option(&mut self, daemon_opt: DaemonOption) {
1422 match daemon_opt {
1423 DaemonOption::ServiceNameLenMax(length) => self.service_name_len_max = length,
1424 DaemonOption::IpCheckInterval(interval) => self.ip_check_interval = interval,
1425 DaemonOption::EnableInterface(if_kind) => self.enable_interface(if_kind),
1426 DaemonOption::DisableInterface(if_kind) => self.disable_interface(if_kind),
1427 DaemonOption::MulticastLoopV4(on) => self.set_multicast_loop_v4(on),
1428 DaemonOption::MulticastLoopV6(on) => self.set_multicast_loop_v6(on),
1429 DaemonOption::AcceptUnsolicited(accept) => self.set_accept_unsolicited(accept),
1430 DaemonOption::IncludeAppleP2P(enable) => self.set_apple_p2p(enable),
1431 #[cfg(test)]
1432 DaemonOption::TestDownInterface(ifname) => {
1433 self.test_down_interfaces.insert(ifname);
1434 }
1435 #[cfg(test)]
1436 DaemonOption::TestUpInterface(ifname) => {
1437 self.test_down_interfaces.remove(&ifname);
1438 }
1439 }
1440 }
1441
1442 fn enable_interface(&mut self, kinds: Vec<IfKind>) {
1443 debug!("enable_interface: {:?}", kinds);
1444 let interfaces = my_ip_interfaces_inner(true, self.include_apple_p2p);
1445
1446 for if_kind in kinds {
1447 self.if_selections.push(IfSelection {
1448 if_kind: resolve_addr_to_index(if_kind, &interfaces),
1449 selected: true,
1450 });
1451 }
1452
1453 self.apply_intf_selections(interfaces);
1454 }
1455
1456 fn disable_interface(&mut self, kinds: Vec<IfKind>) {
1457 debug!("disable_interface: {:?}", kinds);
1458 let interfaces = my_ip_interfaces_inner(true, self.include_apple_p2p);
1459
1460 for if_kind in kinds {
1461 self.if_selections.push(IfSelection {
1462 if_kind: resolve_addr_to_index(if_kind, &interfaces),
1463 selected: false,
1464 });
1465 }
1466
1467 self.apply_intf_selections(interfaces);
1468 }
1469
1470 fn set_multicast_loop_v4(&mut self, on: bool) {
1471 let Some(sock) = self.ipv4_sock.as_mut() else {
1472 return;
1473 };
1474 self.multicast_loop_v4 = on;
1475 sock.pktinfo
1476 .set_multicast_loop_v4(on)
1477 .map_err(|e| e_fmt!("failed to set multicast loop v4: {}", e))
1478 .unwrap();
1479 }
1480
1481 fn set_multicast_loop_v6(&mut self, on: bool) {
1482 let Some(sock) = self.ipv6_sock.as_mut() else {
1483 return;
1484 };
1485 self.multicast_loop_v6 = on;
1486 sock.pktinfo
1487 .set_multicast_loop_v6(on)
1488 .map_err(|e| e_fmt!("failed to set multicast loop v6: {}", e))
1489 .unwrap();
1490 }
1491
1492 fn set_accept_unsolicited(&mut self, accept: bool) {
1493 self.accept_unsolicited = accept;
1494 }
1495
1496 fn set_apple_p2p(&mut self, include: bool) {
1497 if self.include_apple_p2p != include {
1498 self.include_apple_p2p = include;
1499 self.apply_intf_selections(my_ip_interfaces_inner(true, self.include_apple_p2p));
1500 }
1501 }
1502
1503 fn notify_monitors(&mut self, event: DaemonEvent) {
1504 self.monitors.retain(|sender| {
1506 if let Err(e) = sender.try_send(event.clone()) {
1507 debug!("notify_monitors: try_send: {}", &e);
1508 if matches!(e, TrySendError::Disconnected(_)) {
1509 return false; }
1511 }
1512 true
1513 });
1514 }
1515
1516 fn del_addr_in_my_services(&mut self, addr: &IpAddr) {
1518 for (_, service_info) in self.my_services.iter_mut() {
1519 if service_info.is_addr_auto() {
1520 service_info.remove_ipaddr(addr);
1521 }
1522 }
1523 }
1524
1525 fn add_timer(&mut self, next_time: u64) {
1526 self.timers.push(Reverse(next_time));
1527 }
1528
1529 fn peek_earliest_timer(&self) -> Option<u64> {
1530 self.timers.peek().map(|Reverse(v)| *v)
1531 }
1532
1533 fn _pop_earliest_timer(&mut self) -> Option<u64> {
1534 self.timers.pop().map(|Reverse(v)| v)
1535 }
1536
1537 fn pop_timers_till(&mut self, now: u64) {
1539 while let Some(Reverse(v)) = self.timers.peek() {
1540 if *v > now {
1541 break;
1542 }
1543 self.timers.pop();
1544 }
1545 }
1546
1547 fn selected_intfs(&self, interfaces: Vec<Interface>) -> HashSet<Interface> {
1549 let intf_count = interfaces.len();
1550 let mut intf_selections = vec![true; intf_count];
1551
1552 for selection in self.if_selections.iter() {
1554 for i in 0..intf_count {
1556 if selection.if_kind.matches(&interfaces[i]) {
1557 intf_selections[i] = selection.selected;
1558 }
1559 }
1560 }
1561
1562 let mut selected_addrs = HashSet::new();
1563 for i in 0..intf_count {
1564 if intf_selections[i] {
1565 selected_addrs.insert(interfaces[i].clone());
1566 }
1567 }
1568
1569 selected_addrs
1570 }
1571
1572 fn apply_intf_selections(&mut self, interfaces: Vec<Interface>) {
1577 let intf_count = interfaces.len();
1579 let mut intf_selections = vec![true; intf_count];
1580
1581 for selection in self.if_selections.iter() {
1583 for i in 0..intf_count {
1585 if selection.if_kind.matches(&interfaces[i]) {
1586 intf_selections[i] = selection.selected;
1587 }
1588 }
1589 }
1590
1591 for (idx, intf) in interfaces.into_iter().enumerate() {
1593 if intf_selections[idx] {
1594 self.add_interface(intf);
1596 } else {
1597 self.del_interface_addr(&intf);
1599 }
1600 }
1601 }
1602
1603 fn del_ip(&mut self, ip: IpAddr) {
1604 self.del_addr_in_my_services(&ip);
1605 self.notify_monitors(DaemonEvent::IpDel(ip));
1606 }
1607
1608 fn check_ip_changes(&mut self) {
1610 let my_ifaddrs = my_ip_interfaces_inner(true, self.include_apple_p2p);
1612
1613 #[cfg(test)]
1614 let my_ifaddrs: Vec<_> = my_ifaddrs
1615 .into_iter()
1616 .filter(|intf| !self.test_down_interfaces.contains(&intf.name))
1617 .collect();
1618
1619 let ifaddrs_map: HashMap<u32, Vec<&IfAddr>> =
1620 my_ifaddrs.iter().fold(HashMap::new(), |mut acc, intf| {
1621 let if_index = intf.index.unwrap_or(0);
1622 acc.entry(if_index).or_default().push(&intf.addr);
1623 acc
1624 });
1625
1626 let mut deleted_intfs = Vec::new();
1627 let mut deleted_ips = Vec::new();
1628
1629 for (if_index, my_intf) in self.my_intfs.iter_mut() {
1630 let mut last_ipv4 = None;
1631 let mut last_ipv6 = None;
1632
1633 if let Some(current_addrs) = ifaddrs_map.get(if_index) {
1634 my_intf.addrs.retain(|addr| {
1635 if current_addrs.contains(&addr) {
1636 true
1637 } else {
1638 match addr.ip() {
1639 IpAddr::V4(ipv4) => last_ipv4 = Some(ipv4),
1640 IpAddr::V6(ipv6) => last_ipv6 = Some(ipv6),
1641 }
1642 deleted_ips.push(addr.ip());
1643 false
1644 }
1645 });
1646 if my_intf.addrs.is_empty() {
1647 deleted_intfs.push((*if_index, last_ipv4, last_ipv6))
1648 }
1649 } else {
1650 debug!(
1652 "check_ip_changes: interface {} ({}) no longer exists, removing",
1653 my_intf.name, if_index
1654 );
1655 for addr in my_intf.addrs.iter() {
1656 match addr.ip() {
1657 IpAddr::V4(ipv4) => last_ipv4 = Some(ipv4),
1658 IpAddr::V6(ipv6) => last_ipv6 = Some(ipv6),
1659 }
1660 deleted_ips.push(addr.ip())
1661 }
1662 deleted_intfs.push((*if_index, last_ipv4, last_ipv6));
1663 }
1664 }
1665
1666 if !deleted_ips.is_empty() || !deleted_intfs.is_empty() {
1667 debug!(
1668 "check_ip_changes: {} deleted ips {} deleted intfs",
1669 deleted_ips.len(),
1670 deleted_intfs.len()
1671 );
1672 }
1673
1674 for ip in deleted_ips {
1675 self.del_ip(ip);
1676 }
1677
1678 for (if_index, last_ipv4, last_ipv6) in deleted_intfs {
1679 let Some(my_intf) = self.my_intfs.remove(&if_index) else {
1680 continue;
1681 };
1682
1683 if let Some(ipv4) = last_ipv4 {
1684 debug!("leave multicast for {ipv4}");
1685 if let Some(sock) = self.ipv4_sock.as_mut() {
1686 if let Err(e) = sock.pktinfo.leave_multicast_v4(&GROUP_ADDR_V4, &ipv4) {
1687 debug!("leave multicast group for addr {ipv4}: {e}");
1688 }
1689 }
1690 }
1691
1692 if let Some(ipv6) = last_ipv6 {
1693 debug!("leave multicast for {ipv6}");
1694 if let Some(sock) = self.ipv6_sock.as_mut() {
1695 if let Err(e) = sock
1696 .pktinfo
1697 .leave_multicast_v6(&GROUP_ADDR_V6, my_intf.index)
1698 {
1699 debug!("leave multicast group for IPv6: {ipv6}: {e}");
1700 }
1701 }
1702 }
1703
1704 let intf_id = InterfaceId {
1706 name: my_intf.name.to_string(),
1707 index: my_intf.index,
1708 };
1709 let removed_instances = self.cache.remove_records_on_intf(intf_id);
1710 self.notify_service_removal(removed_instances);
1711 }
1712
1713 self.apply_intf_selections(my_ifaddrs);
1715 }
1716
1717 fn del_interface_addr(&mut self, intf: &Interface) {
1720 let if_index = intf.index.unwrap_or(0);
1721 debug!(
1722 "del_interface_addr: {} ({if_index}) addr {}",
1723 intf.name,
1724 intf.ip()
1725 );
1726
1727 let Some(my_intf) = self.my_intfs.get_mut(&if_index) else {
1728 debug!("del_interface_addr: interface {} not found", intf.name);
1729 return;
1730 };
1731
1732 let mut ip_removed = false;
1733
1734 if my_intf.addrs.remove(&intf.addr) {
1735 ip_removed = true;
1736
1737 match intf.addr.ip() {
1738 IpAddr::V4(ipv4) => {
1739 if my_intf.next_ifaddr_v4().is_none() {
1740 if let Some(sock) = self.ipv4_sock.as_mut() {
1741 if let Err(e) = sock.pktinfo.leave_multicast_v4(&GROUP_ADDR_V4, &ipv4) {
1742 debug!("leave multicast group for addr {ipv4}: {e}");
1743 } else {
1744 debug!("leave multicast for {ipv4}");
1745 }
1746 }
1747 }
1748 }
1749
1750 IpAddr::V6(ipv6) => {
1751 if my_intf.next_ifaddr_v6().is_none() {
1752 if let Some(sock) = self.ipv6_sock.as_mut() {
1753 if let Err(e) =
1754 sock.pktinfo.leave_multicast_v6(&GROUP_ADDR_V6, if_index)
1755 {
1756 debug!("leave multicast group for addr {ipv6}: {e}");
1757 }
1758 }
1759 }
1760 }
1761 }
1762
1763 if my_intf.addrs.is_empty() {
1764 debug!("del_interface_addr: removing interface {}", intf.name);
1766 self.my_intfs.remove(&if_index);
1767 self.dns_registry_map.remove(&if_index);
1768 self.cache
1769 .remove_addrs_on_disabled_intf(if_index, IpType::BOTH);
1770 } else {
1771 let is_v4 = intf.addr.ip().is_ipv4();
1775 let version_gone = if is_v4 {
1776 my_intf.next_ifaddr_v4().is_none()
1777 } else {
1778 my_intf.next_ifaddr_v6().is_none()
1779 };
1780 if version_gone {
1781 let ip_type = if is_v4 { IpType::V4 } else { IpType::V6 };
1782 self.cache.remove_addrs_on_disabled_intf(if_index, ip_type);
1783 }
1784 }
1785 }
1786
1787 if ip_removed {
1788 self.notify_monitors(DaemonEvent::IpDel(intf.ip()));
1790 self.del_addr_in_my_services(&intf.ip());
1792 }
1793 }
1794
1795 fn add_interface(&mut self, intf: Interface) {
1796 let sock_opt = if intf.ip().is_ipv4() {
1797 &self.ipv4_sock
1798 } else {
1799 &self.ipv6_sock
1800 };
1801
1802 let Some(sock) = sock_opt else {
1803 debug!(
1804 "add_interface: no socket available for interface {} with addr {}. Skipped.",
1805 intf.name,
1806 intf.ip()
1807 );
1808 return;
1809 };
1810
1811 let if_index = intf.index.unwrap_or(0);
1812 let mut new_addr = false;
1813
1814 match self.my_intfs.entry(if_index) {
1815 Entry::Occupied(mut entry) => {
1816 let my_intf = entry.get_mut();
1818 if !my_intf.addrs.contains(&intf.addr) {
1819 if let Err(e) = join_multicast_group(&sock.pktinfo, &intf) {
1820 debug!("add_interface: socket_config {}: {e}", &intf.name);
1821 }
1822 my_intf.addrs.insert(intf.addr.clone());
1823 new_addr = true;
1824 }
1825 }
1826 Entry::Vacant(entry) => {
1827 if let Err(e) = join_multicast_group(&sock.pktinfo, &intf) {
1828 debug!("add_interface: socket_config {}: {e}. Skipped.", &intf.name);
1829 return;
1830 }
1831
1832 new_addr = true;
1833 let new_intf = MyIntf {
1834 name: intf.name.clone(),
1835 index: if_index,
1836 addrs: HashSet::from([intf.addr.clone()]),
1837 };
1838 entry.insert(new_intf);
1839 }
1840 }
1841
1842 if !new_addr {
1843 trace!("add_interface: interface {} already exists", &intf.name);
1844 return;
1845 }
1846
1847 debug!("add new interface {}: {}", intf.name, intf.ip());
1848
1849 let Some(my_intf) = self.my_intfs.get(&if_index) else {
1850 debug!("add_interface: cannot find if_index {if_index}");
1851 return;
1852 };
1853
1854 let dns_registry = match self.dns_registry_map.get_mut(&if_index) {
1855 Some(registry) => registry,
1856 None => self
1857 .dns_registry_map
1858 .entry(if_index)
1859 .or_insert_with(DnsRegistry::new),
1860 };
1861
1862 for (_, service_info) in self.my_services.iter_mut() {
1863 if service_info.is_addr_auto() {
1864 service_info.insert_ipaddr(&intf);
1865
1866 if let Ok(true) = announce_service_on_intf(
1867 dns_registry,
1868 service_info,
1869 my_intf,
1870 &sock.pktinfo,
1871 self.port,
1872 ) {
1873 debug!(
1874 "Announce service {} on {}",
1875 service_info.get_fullname(),
1876 intf.ip()
1877 );
1878 service_info.set_status(if_index, ServiceStatus::Announced);
1879 } else {
1880 for timer in dns_registry.new_timers.drain(..) {
1881 self.timers.push(Reverse(timer));
1882 }
1883 service_info.set_status(if_index, ServiceStatus::Probing);
1884 }
1885 }
1886 }
1887
1888 let mut browse_reruns = Vec::new();
1890 let mut i = 0;
1891 while i < self.retransmissions.len() {
1892 if matches!(self.retransmissions[i].command, Command::Browse(..)) {
1893 browse_reruns.push(self.retransmissions.remove(i));
1894 } else {
1895 i += 1;
1896 }
1897 }
1898
1899 for rerun in browse_reruns {
1900 self.exec_command(rerun.command, true);
1901 }
1902
1903 self.notify_monitors(DaemonEvent::IpAdd(intf.ip()));
1905 }
1906
1907 fn register_service(&mut self, mut info: ServiceInfo) {
1916 if let Err(e) = check_service_name_length(info.get_type(), self.service_name_len_max) {
1918 error!("check_service_name_length: {}", &e);
1919 self.notify_monitors(DaemonEvent::Error(e));
1920 return;
1921 }
1922
1923 if info.is_addr_auto() {
1924 let selected_intfs =
1925 self.selected_intfs(my_ip_interfaces_inner(true, self.include_apple_p2p));
1926 for intf in selected_intfs {
1927 info.insert_ipaddr(&intf);
1928 }
1929 }
1930
1931 debug!("register service {:?}", &info);
1932
1933 let outgoing_addrs = self.send_unsolicited_response(&mut info);
1934 if !outgoing_addrs.is_empty() {
1935 self.notify_monitors(DaemonEvent::Announce(
1936 info.get_fullname().to_string(),
1937 format!("{:?}", &outgoing_addrs),
1938 ));
1939 }
1940
1941 let service_fullname = info.get_fullname().to_lowercase();
1944 self.my_services.insert(service_fullname, info);
1945 }
1946
1947 fn send_unsolicited_response(&mut self, info: &mut ServiceInfo) -> Vec<IpAddr> {
1950 let mut outgoing_addrs = Vec::new();
1951 let mut outgoing_intfs = HashSet::new();
1952
1953 let mut invalid_intf_addrs = HashSet::new();
1954
1955 for (if_index, intf) in self.my_intfs.iter() {
1956 let dns_registry = match self.dns_registry_map.get_mut(if_index) {
1957 Some(registry) => registry,
1958 None => self
1959 .dns_registry_map
1960 .entry(*if_index)
1961 .or_insert_with(DnsRegistry::new),
1962 };
1963
1964 let mut announced = false;
1965
1966 if let Some(sock) = self.ipv4_sock.as_mut() {
1968 match announce_service_on_intf(dns_registry, info, intf, &sock.pktinfo, self.port) {
1969 Ok(true) => {
1970 for addr in intf.addrs.iter().filter(|a| a.ip().is_ipv4()) {
1971 outgoing_addrs.push(addr.ip());
1972 }
1973 outgoing_intfs.insert(intf.index);
1974
1975 debug!(
1976 "Announce service IPv4 {} on {}",
1977 info.get_fullname(),
1978 intf.name
1979 );
1980 announced = true;
1981 }
1982 Ok(false) => {}
1983 Err(InternalError::IntfAddrInvalid(intf_addr)) => {
1984 invalid_intf_addrs.insert(intf_addr);
1985 }
1986 }
1987 }
1988
1989 if let Some(sock) = self.ipv6_sock.as_mut() {
1990 match announce_service_on_intf(dns_registry, info, intf, &sock.pktinfo, self.port) {
1991 Ok(true) => {
1992 for addr in intf.addrs.iter().filter(|a| a.ip().is_ipv6()) {
1993 outgoing_addrs.push(addr.ip());
1994 }
1995 outgoing_intfs.insert(intf.index);
1996
1997 debug!(
1998 "Announce service IPv6 {} on {}",
1999 info.get_fullname(),
2000 intf.name
2001 );
2002 announced = true;
2003 }
2004 Ok(false) => {}
2005 Err(InternalError::IntfAddrInvalid(intf_addr)) => {
2006 invalid_intf_addrs.insert(intf_addr);
2007 }
2008 }
2009 }
2010
2011 if announced {
2012 info.set_status(intf.index, ServiceStatus::Announced);
2013 } else {
2014 for timer in dns_registry.new_timers.drain(..) {
2015 self.timers.push(Reverse(timer));
2016 }
2017 info.set_status(*if_index, ServiceStatus::Probing);
2018 }
2019 }
2020
2021 if !invalid_intf_addrs.is_empty() {
2022 let _ = self.send_cmd_to_self(Command::InvalidIntfAddrs(invalid_intf_addrs));
2023 }
2024
2025 let next_time = current_time_millis() + 1000;
2029 for if_index in outgoing_intfs {
2030 self.add_retransmission(
2031 next_time,
2032 Command::RegisterResend(info.get_fullname().to_string(), if_index),
2033 );
2034 }
2035
2036 outgoing_addrs
2037 }
2038
2039 fn probing_handler(&mut self) {
2041 let now = current_time_millis();
2042 let mut invalid_intf_addrs = HashSet::new();
2043
2044 for (if_index, intf) in self.my_intfs.iter() {
2045 let Some(dns_registry) = self.dns_registry_map.get_mut(if_index) else {
2046 continue;
2047 };
2048
2049 let (out, expired_probes) = check_probing(dns_registry, &mut self.timers, now);
2050
2051 if !out.questions().is_empty() {
2053 trace!("sending out probing of questions: {:?}", out.questions());
2054 if let Some(sock) = self.ipv4_sock.as_mut() {
2055 if let Err(InternalError::IntfAddrInvalid(intf_addr)) =
2056 send_dns_outgoing(&out, intf, &sock.pktinfo, self.port)
2057 {
2058 invalid_intf_addrs.insert(intf_addr);
2059 }
2060 }
2061 if let Some(sock) = self.ipv6_sock.as_mut() {
2062 if let Err(InternalError::IntfAddrInvalid(intf_addr)) =
2063 send_dns_outgoing(&out, intf, &sock.pktinfo, self.port)
2064 {
2065 invalid_intf_addrs.insert(intf_addr);
2066 }
2067 }
2068 }
2069
2070 let waiting_services =
2072 handle_expired_probes(expired_probes, &intf.name, dns_registry, &mut self.monitors);
2073
2074 for service_name in waiting_services {
2075 if let Some(info) = self.my_services.get_mut(&service_name.to_lowercase()) {
2077 if info.get_status(*if_index) == ServiceStatus::Announced {
2078 debug!("service {} already announced", info.get_fullname());
2079 continue;
2080 }
2081
2082 let announced_v4 = if let Some(sock) = self.ipv4_sock.as_mut() {
2083 match announce_service_on_intf(
2084 dns_registry,
2085 info,
2086 intf,
2087 &sock.pktinfo,
2088 self.port,
2089 ) {
2090 Ok(announced) => announced,
2091 Err(InternalError::IntfAddrInvalid(intf_addr)) => {
2092 invalid_intf_addrs.insert(intf_addr);
2093 false
2094 }
2095 }
2096 } else {
2097 false
2098 };
2099 let announced_v6 = if let Some(sock) = self.ipv6_sock.as_mut() {
2100 match announce_service_on_intf(
2101 dns_registry,
2102 info,
2103 intf,
2104 &sock.pktinfo,
2105 self.port,
2106 ) {
2107 Ok(announced) => announced,
2108 Err(InternalError::IntfAddrInvalid(intf_addr)) => {
2109 invalid_intf_addrs.insert(intf_addr);
2110 false
2111 }
2112 }
2113 } else {
2114 false
2115 };
2116
2117 if announced_v4 || announced_v6 {
2118 let next_time = now + 1000;
2119 let command =
2120 Command::RegisterResend(info.get_fullname().to_string(), *if_index);
2121 self.retransmissions.push(ReRun { next_time, command });
2122 self.timers.push(Reverse(next_time));
2123
2124 let fullname = match dns_registry.name_changes.get(&service_name) {
2125 Some(new_name) => new_name.to_string(),
2126 None => service_name.to_string(),
2127 };
2128
2129 let mut hostname = info.get_hostname();
2130 if let Some(new_name) = dns_registry.name_changes.get(hostname) {
2131 hostname = new_name;
2132 }
2133
2134 debug!("wake up: announce service {} on {}", fullname, intf.name);
2135 notify_monitors(
2136 &mut self.monitors,
2137 DaemonEvent::Announce(fullname, format!("{}:{}", hostname, &intf.name)),
2138 );
2139
2140 info.set_status(*if_index, ServiceStatus::Announced);
2141 }
2142 }
2143 }
2144 }
2145
2146 if !invalid_intf_addrs.is_empty() {
2147 let _ = self.send_cmd_to_self(Command::InvalidIntfAddrs(invalid_intf_addrs));
2148 }
2149 }
2150
2151 fn unregister_service(
2152 &self,
2153 info: &ServiceInfo,
2154 intf: &MyIntf,
2155 sock: &PktInfoUdpSocket,
2156 ) -> Vec<u8> {
2157 let is_ipv4 = sock.domain() == Domain::IPV4;
2158
2159 let mut out = DnsOutgoing::new(FLAGS_QR_RESPONSE | FLAGS_AA);
2160 out.add_answer_at_time(
2161 DnsPointer::new(
2162 info.get_type(),
2163 RRType::PTR,
2164 CLASS_IN,
2165 0,
2166 info.get_fullname().to_string(),
2167 ),
2168 0,
2169 );
2170
2171 if let Some(sub) = info.get_subtype() {
2172 trace!("Adding subdomain {}", sub);
2173 out.add_answer_at_time(
2174 DnsPointer::new(
2175 sub,
2176 RRType::PTR,
2177 CLASS_IN,
2178 0,
2179 info.get_fullname().to_string(),
2180 ),
2181 0,
2182 );
2183 }
2184
2185 out.add_answer_at_time(
2186 DnsSrv::new(
2187 info.get_fullname(),
2188 CLASS_IN | CLASS_CACHE_FLUSH,
2189 0,
2190 info.get_priority(),
2191 info.get_weight(),
2192 info.get_port(),
2193 info.get_hostname().to_string(),
2194 ),
2195 0,
2196 );
2197 out.add_answer_at_time(
2198 DnsTxt::new(
2199 info.get_fullname(),
2200 CLASS_IN | CLASS_CACHE_FLUSH,
2201 0,
2202 info.generate_txt(),
2203 ),
2204 0,
2205 );
2206
2207 let if_addrs = if is_ipv4 {
2208 info.get_addrs_on_my_intf_v4(intf)
2209 } else {
2210 info.get_addrs_on_my_intf_v6(intf)
2211 };
2212
2213 if if_addrs.is_empty() {
2214 return vec![];
2215 }
2216
2217 for address in if_addrs {
2218 out.add_answer_at_time(
2219 DnsAddress::new(
2220 info.get_hostname(),
2221 ip_address_rr_type(&address),
2222 CLASS_IN | CLASS_CACHE_FLUSH,
2223 0,
2224 address,
2225 intf.into(),
2226 ),
2227 0,
2228 );
2229 }
2230
2231 let sent_vec = match send_dns_outgoing(&out, intf, sock, self.port) {
2233 Ok(sent_vec) => sent_vec,
2234 Err(InternalError::IntfAddrInvalid(intf_addr)) => {
2235 let invalid_intf_addrs = HashSet::from([intf_addr]);
2236 let _ = self.send_cmd_to_self(Command::InvalidIntfAddrs(invalid_intf_addrs));
2237 vec![]
2238 }
2239 };
2240 sent_vec.into_iter().next().unwrap_or_default()
2241 }
2242
2243 fn add_hostname_resolver(
2247 &mut self,
2248 hostname: String,
2249 listener: Sender<HostnameResolutionEvent>,
2250 timeout: Option<u64>,
2251 ) {
2252 let real_timeout = timeout.map(|t| current_time_millis() + t);
2253 self.hostname_resolvers
2254 .insert(hostname.to_lowercase(), (listener, real_timeout));
2255 if let Some(t) = real_timeout {
2256 self.add_timer(t);
2257 }
2258 }
2259
2260 fn send_query(&self, name: &str, qtype: RRType) {
2262 self.send_query_vec(&[(name, qtype)]);
2263 }
2264
2265 fn send_query_vec(&self, questions: &[(&str, RRType)]) {
2267 let mut out = DnsOutgoing::new(FLAGS_QR_QUERY);
2268 let now = current_time_millis();
2269
2270 for (name, qtype) in questions {
2271 out.add_question(name, *qtype);
2272
2273 for record in self.cache.get_known_answers(name, *qtype, now) {
2274 trace!("add known answer: {:?}", record.record);
2282 let mut new_record = record.record.clone();
2283 new_record.get_record_mut().update_ttl(now);
2284 out.add_answer_box(new_record);
2285 }
2286 }
2287
2288 let mut invalid_intf_addrs = HashSet::new();
2289 for (_, intf) in self.my_intfs.iter() {
2290 if let Some(sock) = self.ipv4_sock.as_ref() {
2291 if let Err(InternalError::IntfAddrInvalid(intf_addr)) =
2292 send_dns_outgoing(&out, intf, &sock.pktinfo, self.port)
2293 {
2294 invalid_intf_addrs.insert(intf_addr);
2295 }
2296 }
2297 if let Some(sock) = self.ipv6_sock.as_ref() {
2298 if let Err(InternalError::IntfAddrInvalid(intf_addr)) =
2299 send_dns_outgoing(&out, intf, &sock.pktinfo, self.port)
2300 {
2301 invalid_intf_addrs.insert(intf_addr);
2302 }
2303 }
2304 }
2305
2306 if !invalid_intf_addrs.is_empty() {
2307 let _ = self.send_cmd_to_self(Command::InvalidIntfAddrs(invalid_intf_addrs));
2308 }
2309 }
2310
2311 fn handle_read(&mut self, event_key: usize) -> bool {
2316 let sock_opt = match event_key {
2317 IPV4_SOCK_EVENT_KEY => &mut self.ipv4_sock,
2318 IPV6_SOCK_EVENT_KEY => &mut self.ipv6_sock,
2319 _ => {
2320 debug!("handle_read: unknown token {}", event_key);
2321 return false;
2322 }
2323 };
2324 let Some(sock) = sock_opt.as_mut() else {
2325 debug!("handle_read: socket not available for token {}", event_key);
2326 return false;
2327 };
2328 let mut buf = vec![0u8; MAX_MSG_ABSOLUTE];
2329
2330 let (sz, pktinfo) = match sock.pktinfo.recv(&mut buf) {
2337 Ok(sz) => sz,
2338 Err(e) => {
2339 if e.kind() != std::io::ErrorKind::WouldBlock {
2340 debug!("listening socket read failed: {}", e);
2341 }
2342 return false;
2343 }
2344 };
2345
2346 let pkt_if_index = pktinfo.if_index as u32;
2348 let Some(my_intf) = self.my_intfs.get(&pkt_if_index) else {
2349 debug!(
2350 "handle_read: no interface found for pktinfo if_index: {}",
2351 pktinfo.if_index
2352 );
2353 return true; };
2355
2356 let is_ipv4 = event_key == IPV4_SOCK_EVENT_KEY;
2361 if (is_ipv4 && my_intf.next_ifaddr_v4().is_none())
2362 || (!is_ipv4 && my_intf.next_ifaddr_v6().is_none())
2363 {
2364 debug!(
2365 "handle_read: dropping {} packet on intf {} (disabled)",
2366 if is_ipv4 { "IPv4" } else { "IPv6" },
2367 my_intf.name
2368 );
2369 return true;
2370 }
2371
2372 buf.truncate(sz); match DnsIncoming::new(buf, my_intf.into()) {
2375 Ok(msg) => {
2376 if msg.is_query() {
2377 self.handle_query(msg, pkt_if_index, event_key == IPV4_SOCK_EVENT_KEY);
2378 } else if msg.is_response() {
2379 self.handle_response(msg, pkt_if_index);
2380 } else {
2381 debug!("Invalid message: not query and not response");
2382 }
2383 }
2384 Err(e) => debug!("Invalid incoming DNS message: {}", e),
2385 }
2386
2387 true
2388 }
2389
2390 fn query_unresolved(&mut self, instance: &str) -> bool {
2392 if !valid_instance_name(instance) {
2393 trace!("instance name {} not valid", instance);
2394 return false;
2395 }
2396
2397 if let Some(records) = self.cache.get_srv(instance) {
2398 for record in records {
2399 if let Some(srv) = record.record.any().downcast_ref::<DnsSrv>() {
2400 if self.cache.get_addr(srv.host()).is_none() {
2401 self.send_query_vec(&[(srv.host(), RRType::A), (srv.host(), RRType::AAAA)]);
2402 return true;
2403 }
2404 }
2405 }
2406 } else {
2407 self.send_query(instance, RRType::ANY);
2408 return true;
2409 }
2410
2411 false
2412 }
2413
2414 fn query_cache_for_service(
2417 &mut self,
2418 ty_domain: &str,
2419 sender: &Sender<ServiceEvent>,
2420 now: u64,
2421 ) {
2422 let mut resolved: HashSet<String> = HashSet::new();
2423 let mut unresolved: HashSet<String> = HashSet::new();
2424
2425 if let Some(records) = self.cache.get_ptr(ty_domain) {
2426 for record in records.iter().filter(|r| !r.record.expires_soon(now)) {
2427 if let Some(ptr) = record.record.any().downcast_ref::<DnsPointer>() {
2428 let mut new_event = None;
2429 match self.resolve_service_from_cache(ty_domain, ptr.alias()) {
2430 Ok(resolved_service) => {
2431 if resolved_service.is_valid() {
2432 debug!("Resolved service from cache: {}", ptr.alias());
2433 new_event =
2434 Some(ServiceEvent::ServiceResolved(Box::new(resolved_service)));
2435 } else {
2436 debug!("Resolved service is not valid: {}", ptr.alias());
2437 }
2438 }
2439 Err(err) => {
2440 debug!("Error while resolving service from cache: {}", err);
2441 continue;
2442 }
2443 }
2444
2445 match sender.send(ServiceEvent::ServiceFound(
2446 ty_domain.to_string(),
2447 ptr.alias().to_string(),
2448 )) {
2449 Ok(()) => debug!("sent service found {}", ptr.alias()),
2450 Err(e) => {
2451 debug!("failed to send service found: {}", e);
2452 continue;
2453 }
2454 }
2455
2456 if let Some(event) = new_event {
2457 resolved.insert(ptr.alias().to_string());
2458 match sender.send(event) {
2459 Ok(()) => debug!("sent service resolved: {}", ptr.alias()),
2460 Err(e) => debug!("failed to send service resolved: {}", e),
2461 }
2462 } else {
2463 unresolved.insert(ptr.alias().to_string());
2464 }
2465 }
2466 }
2467 }
2468
2469 for instance in resolved.drain() {
2470 self.pending_resolves.remove(&instance);
2471 self.resolved.insert(instance);
2472 }
2473
2474 for instance in unresolved.drain() {
2475 self.add_pending_resolve(instance);
2476 }
2477 }
2478
2479 fn query_cache_for_hostname(
2482 &mut self,
2483 hostname: &str,
2484 sender: Sender<HostnameResolutionEvent>,
2485 ) {
2486 let addresses_map = self.cache.get_addresses_for_host(hostname);
2487 for (name, addresses) in addresses_map {
2488 match sender.send(HostnameResolutionEvent::AddressesFound(name, addresses)) {
2489 Ok(()) => trace!("sent hostname addresses found"),
2490 Err(e) => debug!("failed to send hostname addresses found: {}", e),
2491 }
2492 }
2493 }
2494
2495 fn add_pending_resolve(&mut self, instance: String) {
2496 if !self.pending_resolves.contains(&instance) {
2497 let next_time = current_time_millis() + RESOLVE_WAIT_IN_MILLIS;
2498 self.add_retransmission(next_time, Command::Resolve(instance.clone(), 1));
2499 self.pending_resolves.insert(instance);
2500 }
2501 }
2502
2503 fn resolve_service_from_cache(
2505 &self,
2506 ty_domain: &str,
2507 fullname: &str,
2508 ) -> Result<ResolvedService> {
2509 let now = current_time_millis();
2510 let mut resolved_service = ResolvedService {
2511 ty_domain: ty_domain.to_string(),
2512 sub_ty_domain: None,
2513 fullname: fullname.to_string(),
2514 host: String::new(),
2515 port: 0,
2516 addresses: HashSet::new(),
2517 txt_properties: TxtProperties::new(),
2518 };
2519
2520 if let Some(subtype) = self.cache.get_subtype(fullname) {
2522 trace!(
2523 "ty_domain: {} found subtype {} for instance: {}",
2524 ty_domain,
2525 subtype,
2526 fullname
2527 );
2528 if resolved_service.sub_ty_domain.is_none() {
2529 resolved_service.sub_ty_domain = Some(subtype.to_string());
2530 }
2531 }
2532
2533 if let Some(records) = self.cache.get_srv(fullname) {
2535 if let Some(answer) = records.iter().find(|r| !r.record.expires_soon(now)) {
2536 if let Some(dns_srv) = answer.record.any().downcast_ref::<DnsSrv>() {
2537 resolved_service.host = dns_srv.host().to_string();
2538 resolved_service.port = dns_srv.port();
2539 }
2540 }
2541 }
2542
2543 if let Some(records) = self.cache.get_txt(fullname) {
2545 if let Some(record) = records.iter().find(|r| !r.record.expires_soon(now)) {
2546 if let Some(dns_txt) = record.record.any().downcast_ref::<DnsTxt>() {
2547 resolved_service.txt_properties = dns_txt.text().into();
2548 }
2549 }
2550 }
2551
2552 if let Some(records) = self.cache.get_addr(&resolved_service.host) {
2554 for answer in records.iter() {
2555 if let Some(dns_a) = answer.record.any().downcast_ref::<DnsAddress>() {
2556 if dns_a.expires_soon(now) {
2557 trace!(
2558 "Addr expired or expires soon: {}",
2559 dns_a.address().to_ip_addr()
2560 );
2561 } else {
2562 resolved_service.addresses.insert(dns_a.address());
2563 }
2564 }
2565 }
2566 }
2567
2568 Ok(resolved_service)
2569 }
2570
2571 fn handle_poller_events(&mut self, events: &mio::Events) {
2572 for ev in events.iter() {
2573 trace!("event received with key {:?}", ev.token());
2574 if ev.token().0 == SIGNAL_SOCK_EVENT_KEY {
2575 self.signal_sock_drain();
2577
2578 if let Err(e) = self.poller.registry().reregister(
2579 &mut self.signal_sock,
2580 ev.token(),
2581 mio::Interest::READABLE,
2582 ) {
2583 debug!("failed to modify poller for signal socket: {}", e);
2584 }
2585 continue; }
2587
2588 while self.handle_read(ev.token().0) {}
2590
2591 if ev.token().0 == IPV4_SOCK_EVENT_KEY {
2593 if let Some(sock) = self.ipv4_sock.as_mut() {
2595 if let Err(e) =
2596 self.poller
2597 .registry()
2598 .reregister(sock, ev.token(), mio::Interest::READABLE)
2599 {
2600 debug!("modify poller for IPv4 socket: {}", e);
2601 }
2602 }
2603 } else if ev.token().0 == IPV6_SOCK_EVENT_KEY {
2604 if let Some(sock) = self.ipv6_sock.as_mut() {
2606 if let Err(e) =
2607 self.poller
2608 .registry()
2609 .reregister(sock, ev.token(), mio::Interest::READABLE)
2610 {
2611 debug!("modify poller for IPv6 socket: {}", e);
2612 }
2613 }
2614 }
2615 }
2616 }
2617
2618 fn handle_response(&mut self, mut msg: DnsIncoming, if_index: u32) {
2621 let now = current_time_millis();
2622
2623 let mut record_predicate = |record: &DnsRecordBox| {
2625 if !record.get_record().is_expired(now) {
2626 return true;
2627 }
2628
2629 debug!("record is expired, removing it from cache.");
2630 if self.cache.remove(record) {
2631 if let Some(dns_ptr) = record.any().downcast_ref::<DnsPointer>() {
2633 call_service_listener(
2634 &self.service_queriers,
2635 dns_ptr.get_name(),
2636 ServiceEvent::ServiceRemoved(
2637 dns_ptr.get_name().to_string(),
2638 dns_ptr.alias().to_string(),
2639 ),
2640 );
2641 }
2642 }
2643 false
2644 };
2645 msg.answers_mut().retain(&mut record_predicate);
2646 msg.authorities_mut().retain(&mut record_predicate);
2647 msg.additionals_mut().retain(&mut record_predicate);
2648
2649 self.conflict_handler(&msg, if_index);
2651
2652 let mut is_for_us = true; for answer in msg.answers() {
2659 if answer.get_type() == RRType::PTR {
2660 if self.service_queriers.contains_key(answer.get_name()) {
2661 is_for_us = true;
2662 break; } else {
2664 is_for_us = false;
2665 }
2666 } else if answer.get_type() == RRType::A || answer.get_type() == RRType::AAAA {
2667 let answer_lowercase = answer.get_name().to_lowercase();
2669 if self.hostname_resolvers.contains_key(&answer_lowercase) {
2670 is_for_us = true;
2671 break; }
2673 }
2674 }
2675
2676 if self.accept_unsolicited {
2678 is_for_us = true;
2679 }
2680
2681 struct InstanceChange {
2683 ty: RRType, name: String, }
2686
2687 let mut changes = Vec::new();
2695 let mut timers = Vec::new();
2696 let Some(my_intf) = self.my_intfs.get(&if_index) else {
2697 return;
2698 };
2699 for record in msg.all_records() {
2700 match self
2701 .cache
2702 .add_or_update(my_intf, record, &mut timers, is_for_us)
2703 {
2704 Some((dns_record, true)) => {
2705 timers.push(dns_record.record.get_record().get_expire_time());
2706 timers.push(dns_record.record.get_record().get_refresh_time());
2707
2708 let ty = dns_record.record.get_type();
2709 let name = dns_record.record.get_name();
2710
2711 if ty == RRType::PTR && dns_record.record.get_record().get_ttl() > 1 {
2713 if self.service_queriers.contains_key(name) {
2714 timers.push(dns_record.record.get_record().get_refresh_time());
2715 }
2716
2717 if let Some(dns_ptr) = dns_record.record.any().downcast_ref::<DnsPointer>()
2719 {
2720 debug!("calling listener with service found: {name}");
2721 call_service_listener(
2722 &self.service_queriers,
2723 name,
2724 ServiceEvent::ServiceFound(
2725 name.to_string(),
2726 dns_ptr.alias().to_string(),
2727 ),
2728 );
2729 changes.push(InstanceChange {
2730 ty,
2731 name: dns_ptr.alias().to_string(),
2732 });
2733 }
2734 } else {
2735 changes.push(InstanceChange {
2736 ty,
2737 name: name.to_string(),
2738 });
2739 }
2740 }
2741 Some((dns_record, false)) => {
2742 timers.push(dns_record.record.get_record().get_expire_time());
2743 timers.push(dns_record.record.get_record().get_refresh_time());
2744 }
2745 _ => {}
2746 }
2747 }
2748
2749 for t in timers {
2751 self.add_timer(t);
2752 }
2753
2754 for change in changes
2756 .iter()
2757 .filter(|change| change.ty == RRType::A || change.ty == RRType::AAAA)
2758 {
2759 let addr_map = self.cache.get_addresses_for_host(&change.name);
2760 for (name, addresses) in addr_map {
2761 call_hostname_resolution_listener(
2762 &self.hostname_resolvers,
2763 &change.name,
2764 HostnameResolutionEvent::AddressesFound(name, addresses),
2765 )
2766 }
2767 }
2768
2769 let mut updated_instances = HashSet::new();
2771 for update in changes {
2772 match update.ty {
2773 RRType::PTR | RRType::SRV | RRType::TXT => {
2774 updated_instances.insert(update.name);
2775 }
2776 RRType::A | RRType::AAAA => {
2777 let instances = self.cache.get_instances_on_host(&update.name);
2778 updated_instances.extend(instances);
2779 }
2780 _ => {}
2781 }
2782 }
2783
2784 self.resolve_updated_instances(&updated_instances);
2785 }
2786
2787 fn conflict_handler(&mut self, msg: &DnsIncoming, if_index: u32) {
2788 let Some(my_intf) = self.my_intfs.get(&if_index) else {
2789 debug!("handle_response: no intf found for index {if_index}");
2790 return;
2791 };
2792
2793 let Some(dns_registry) = self.dns_registry_map.get_mut(&if_index) else {
2794 return;
2795 };
2796
2797 for answer in msg.answers().iter() {
2798 let mut new_records = Vec::new();
2799
2800 let name = answer.get_name();
2801 let Some(probe) = dns_registry.probing.get_mut(name) else {
2802 continue;
2803 };
2804
2805 if answer.get_type() == RRType::A || answer.get_type() == RRType::AAAA {
2807 if let Some(answer_addr) = answer.any().downcast_ref::<DnsAddress>() {
2808 if answer_addr.interface_id.index != if_index {
2809 debug!(
2810 "conflict handler: answer addr {:?} not in the subnet of intf {}",
2811 answer_addr, my_intf.name
2812 );
2813 continue;
2814 }
2815 }
2816
2817 let any_match = probe.records.iter().any(|r| {
2820 r.get_type() == answer.get_type()
2821 && r.get_class() == answer.get_class()
2822 && r.rrdata_match(answer.as_ref())
2823 });
2824 if any_match {
2825 continue; }
2827 }
2828
2829 probe.records.retain(|record| {
2830 if record.get_type() == answer.get_type()
2831 && record.get_class() == answer.get_class()
2832 && !record.rrdata_match(answer.as_ref())
2833 {
2834 debug!(
2835 "found conflict name: '{name}' record: {}: {} PEER: {}",
2836 record.get_type(),
2837 record.rdata_print(),
2838 answer.rdata_print()
2839 );
2840
2841 let mut new_record = record.clone();
2844 let new_name = match record.get_type() {
2845 RRType::A => hostname_change(name),
2846 RRType::AAAA => hostname_change(name),
2847 _ => name_change(name),
2848 };
2849 new_record.get_record_mut().set_new_name(new_name);
2850 new_records.push(new_record);
2851 return false; }
2853
2854 true
2855 });
2856
2857 let create_time = current_time_millis() + fastrand::u64(0..250);
2864
2865 let waiting_services = probe.waiting_services.clone();
2866
2867 for record in new_records {
2868 if dns_registry.update_hostname(name, record.get_name(), create_time) {
2869 self.timers.push(Reverse(create_time));
2870 }
2871
2872 dns_registry.name_changes.insert(
2874 record.get_record().get_original_name().to_string(),
2875 record.get_name().to_string(),
2876 );
2877
2878 let new_probe = match dns_registry.probing.get_mut(record.get_name()) {
2879 Some(p) => p,
2880 None => {
2881 let new_probe = dns_registry
2882 .probing
2883 .entry(record.get_name().to_string())
2884 .or_insert_with(|| {
2885 debug!("conflict handler: new probe of {}", record.get_name());
2886 Probe::new(create_time)
2887 });
2888 self.timers.push(Reverse(new_probe.next_send));
2889 new_probe
2890 }
2891 };
2892
2893 debug!(
2894 "insert record with new name '{}' {} into probe",
2895 record.get_name(),
2896 record.get_type()
2897 );
2898 new_probe.insert_record(record);
2899
2900 new_probe.waiting_services.extend(waiting_services.clone());
2901 }
2902 }
2903 }
2904
2905 fn resolve_updated_instances(&mut self, updated_instances: &HashSet<String>) {
2912 let mut resolved: HashSet<String> = HashSet::new();
2913 let mut unresolved: HashSet<String> = HashSet::new();
2914 let mut removed_instances = HashMap::new();
2915
2916 let now = current_time_millis();
2917
2918 for (ty_domain, records) in self.cache.all_ptr().iter() {
2919 if !self.service_queriers.contains_key(ty_domain) {
2920 continue;
2922 }
2923
2924 for ptr in records.iter().filter(|r| !r.record.expires_soon(now)) {
2925 let Some(dns_ptr) = ptr.record.any().downcast_ref::<DnsPointer>() else {
2926 continue;
2927 };
2928
2929 let instance = dns_ptr.alias();
2930 if !updated_instances.contains(instance) {
2931 continue;
2932 }
2933
2934 let Ok(resolved_service) = self.resolve_service_from_cache(ty_domain, instance)
2935 else {
2936 continue;
2937 };
2938
2939 debug!("resolve_updated_instances: from cache: {instance}");
2940 if resolved_service.is_valid() {
2941 debug!("call queriers to resolve {instance}");
2942 resolved.insert(instance.to_string());
2943 let event = ServiceEvent::ServiceResolved(Box::new(resolved_service));
2944 call_service_listener(&self.service_queriers, ty_domain, event);
2945 } else {
2946 debug!("Resolved service is not valid: {instance}");
2947 if self.resolved.remove(dns_ptr.alias()) {
2948 removed_instances
2949 .entry(ty_domain.to_string())
2950 .or_insert_with(HashSet::new)
2951 .insert(instance.to_string());
2952 }
2953 unresolved.insert(instance.to_string());
2954 }
2955 }
2956 }
2957
2958 for instance in resolved.drain() {
2959 self.pending_resolves.remove(&instance);
2960 self.resolved.insert(instance);
2961 }
2962
2963 for instance in unresolved.drain() {
2964 self.add_pending_resolve(instance);
2965 }
2966
2967 if !removed_instances.is_empty() {
2968 debug!(
2969 "resolve_updated_instances: removed {}",
2970 &removed_instances.len()
2971 );
2972 self.notify_service_removal(removed_instances);
2973 }
2974 }
2975
2976 fn handle_query(&mut self, msg: DnsIncoming, if_index: u32, is_ipv4: bool) {
2978 let sock_opt = if is_ipv4 {
2979 &self.ipv4_sock
2980 } else {
2981 &self.ipv6_sock
2982 };
2983 let Some(sock) = sock_opt.as_ref() else {
2984 debug!("handle_query: socket not available for intf {}", if_index);
2985 return;
2986 };
2987 let mut out = DnsOutgoing::new(FLAGS_QR_RESPONSE | FLAGS_AA);
2988
2989 const META_QUERY: &str = "_services._dns-sd._udp.local.";
2992
2993 let Some(dns_registry) = self.dns_registry_map.get_mut(&if_index) else {
2994 debug!("missing dns registry for intf {}", if_index);
2995 return;
2996 };
2997
2998 let Some(intf) = self.my_intfs.get(&if_index) else {
2999 debug!("handle_query: no intf found for index {if_index}");
3000 return;
3001 };
3002
3003 for question in msg.questions().iter() {
3004 let qtype = question.entry_type();
3005
3006 if qtype == RRType::PTR {
3007 for service in self.my_services.values() {
3008 if service.get_status(if_index) != ServiceStatus::Announced {
3009 continue;
3010 }
3011
3012 if question.entry_name() == service.get_type()
3013 || service
3014 .get_subtype()
3015 .as_ref()
3016 .is_some_and(|v| v == question.entry_name())
3017 {
3018 add_answer_with_additionals(
3019 &mut out,
3020 &msg,
3021 service,
3022 intf,
3023 dns_registry,
3024 is_ipv4,
3025 );
3026 } else if question.entry_name() == META_QUERY {
3027 let ptr_added = out.add_answer(
3028 &msg,
3029 DnsPointer::new(
3030 question.entry_name(),
3031 RRType::PTR,
3032 CLASS_IN,
3033 service.get_other_ttl(),
3034 service.get_type().to_string(),
3035 ),
3036 );
3037 if !ptr_added {
3038 trace!("answer was not added for meta-query {:?}", &question);
3039 }
3040 }
3041 }
3042 } else {
3043 if qtype == RRType::ANY && msg.num_authorities() > 0 {
3045 let probe_name = question.entry_name();
3046
3047 if let Some(probe) = dns_registry.probing.get_mut(probe_name) {
3048 let now = current_time_millis();
3049
3050 if probe.start_time < now {
3054 let incoming_records: Vec<_> = msg
3055 .authorities()
3056 .iter()
3057 .filter(|r| r.get_name() == probe_name)
3058 .collect();
3059
3060 probe.tiebreaking(&incoming_records, now, probe_name);
3061 }
3062 }
3063 }
3064
3065 if qtype == RRType::A || qtype == RRType::AAAA || qtype == RRType::ANY {
3066 for service in self.my_services.values() {
3067 if service.get_status(if_index) != ServiceStatus::Announced {
3068 continue;
3069 }
3070
3071 let service_hostname =
3072 match dns_registry.name_changes.get(service.get_hostname()) {
3073 Some(new_name) => new_name,
3074 None => service.get_hostname(),
3075 };
3076
3077 if service_hostname.to_lowercase() == question.entry_name().to_lowercase() {
3078 let intf_addrs = if is_ipv4 {
3079 service.get_addrs_on_my_intf_v4(intf)
3080 } else {
3081 service.get_addrs_on_my_intf_v6(intf)
3082 };
3083 if intf_addrs.is_empty()
3084 && (qtype == RRType::A || qtype == RRType::AAAA)
3085 {
3086 let t = match qtype {
3087 RRType::A => "TYPE_A",
3088 RRType::AAAA => "TYPE_AAAA",
3089 _ => "invalid_type",
3090 };
3091 trace!(
3092 "Cannot find valid addrs for {} response on intf {:?}",
3093 t,
3094 &intf
3095 );
3096 return;
3097 }
3098 for address in intf_addrs {
3099 out.add_answer(
3100 &msg,
3101 DnsAddress::new(
3102 service_hostname,
3103 ip_address_rr_type(&address),
3104 CLASS_IN | CLASS_CACHE_FLUSH,
3105 service.get_host_ttl(),
3106 address,
3107 intf.into(),
3108 ),
3109 );
3110 }
3111 }
3112 }
3113 }
3114
3115 let query_name = question.entry_name().to_lowercase();
3116 let service_opt = self
3117 .my_services
3118 .iter()
3119 .find(|(k, _v)| {
3120 let service_name = match dns_registry.name_changes.get(k.as_str()) {
3121 Some(new_name) => new_name,
3122 None => k,
3123 };
3124 service_name == &query_name
3125 })
3126 .map(|(_, v)| v);
3127
3128 let Some(service) = service_opt else {
3129 continue;
3130 };
3131
3132 if service.get_status(if_index) != ServiceStatus::Announced {
3133 continue;
3134 }
3135
3136 let intf_addrs = if is_ipv4 {
3137 service.get_addrs_on_my_intf_v4(intf)
3138 } else {
3139 service.get_addrs_on_my_intf_v6(intf)
3140 };
3141 if intf_addrs.is_empty() {
3142 debug!(
3143 "Cannot find valid addrs for TYPE_SRV response on intf {:?}",
3144 &intf
3145 );
3146 continue;
3147 }
3148
3149 add_answer_of_service(
3150 &mut out,
3151 &msg,
3152 question.entry_name(),
3153 service,
3154 qtype,
3155 intf_addrs,
3156 );
3157 }
3158 }
3159
3160 if out.answers_count() > 0 {
3161 debug!("sending response on intf {}", &intf.name);
3162 out.set_id(msg.id());
3163 if let Err(InternalError::IntfAddrInvalid(intf_addr)) =
3164 send_dns_outgoing(&out, intf, &sock.pktinfo, self.port)
3165 {
3166 let invalid_intf_addr = HashSet::from([intf_addr]);
3167 let _ = self.send_cmd_to_self(Command::InvalidIntfAddrs(invalid_intf_addr));
3168 }
3169
3170 let if_name = intf.name.clone();
3171
3172 self.increase_counter(Counter::Respond, 1);
3173 self.notify_monitors(DaemonEvent::Respond(if_name));
3174 }
3175
3176 self.increase_counter(Counter::KnownAnswerSuppression, out.known_answer_count());
3177 }
3178
3179 fn increase_counter(&mut self, counter: Counter, count: i64) {
3181 let key = counter.to_string();
3182 match self.counters.get_mut(&key) {
3183 Some(v) => *v += count,
3184 None => {
3185 self.counters.insert(key, count);
3186 }
3187 }
3188 }
3189
3190 fn set_counter(&mut self, counter: Counter, count: i64) {
3192 let key = counter.to_string();
3193 self.counters.insert(key, count);
3194 }
3195
3196 fn signal_sock_drain(&self) {
3197 let mut signal_buf = [0; 1024];
3198
3199 while let Ok(sz) = self.signal_sock.recv(&mut signal_buf) {
3201 trace!(
3202 "signal socket recvd: {}",
3203 String::from_utf8_lossy(&signal_buf[0..sz])
3204 );
3205 }
3206 }
3207
3208 fn add_retransmission(&mut self, next_time: u64, command: Command) {
3209 self.retransmissions.push(ReRun { next_time, command });
3210 self.add_timer(next_time);
3211 }
3212
3213 fn notify_service_removal(&self, expired: HashMap<String, HashSet<String>>) {
3216 for (ty_domain, sender) in self.service_queriers.iter() {
3217 if let Some(instances) = expired.get(ty_domain) {
3218 for instance_name in instances {
3219 let event = ServiceEvent::ServiceRemoved(
3220 ty_domain.to_string(),
3221 instance_name.to_string(),
3222 );
3223 match sender.send(event) {
3224 Ok(()) => debug!("notify_service_removal: sent ServiceRemoved to listener of {ty_domain}: {instance_name}"),
3225 Err(e) => debug!("Failed to send event: {}", e),
3226 }
3227 }
3228 }
3229 }
3230 }
3231
3232 fn exec_command(&mut self, command: Command, repeating: bool) {
3236 trace!("exec_command: {:?} repeating: {}", &command, repeating);
3237 match command {
3238 Command::Browse(ty, next_delay, cache_only, listener) => {
3239 self.exec_command_browse(repeating, ty, next_delay, cache_only, listener);
3240 }
3241
3242 Command::ResolveHostname(hostname, next_delay, listener, timeout) => {
3243 self.exec_command_resolve_hostname(
3244 repeating, hostname, next_delay, listener, timeout,
3245 );
3246 }
3247
3248 Command::Register(service_info) => {
3249 self.register_service(*service_info);
3250 self.increase_counter(Counter::Register, 1);
3251 }
3252
3253 Command::RegisterResend(fullname, intf) => {
3254 trace!("register-resend service: {fullname} on {}", &intf);
3255 if let Err(InternalError::IntfAddrInvalid(intf_addr)) =
3256 self.exec_command_register_resend(fullname, intf)
3257 {
3258 let invalid_intf_addr = HashSet::from([intf_addr]);
3259 let _ = self.send_cmd_to_self(Command::InvalidIntfAddrs(invalid_intf_addr));
3260 }
3261 }
3262
3263 Command::Unregister(fullname, resp_s) => {
3264 trace!("unregister service {} repeat {}", &fullname, &repeating);
3265 self.exec_command_unregister(repeating, fullname, resp_s);
3266 }
3267
3268 Command::UnregisterResend(packet, if_index, is_ipv4) => {
3269 self.exec_command_unregister_resend(packet, if_index, is_ipv4);
3270 }
3271
3272 Command::StopBrowse(ty_domain) => self.exec_command_stop_browse(ty_domain),
3273
3274 Command::StopResolveHostname(hostname) => {
3275 self.exec_command_stop_resolve_hostname(hostname.to_lowercase())
3276 }
3277
3278 Command::Resolve(instance, try_count) => self.exec_command_resolve(instance, try_count),
3279
3280 Command::GetMetrics(resp_s) => self.exec_command_get_metrics(resp_s),
3281
3282 Command::GetStatus(resp_s) => match resp_s.send(self.status.clone()) {
3283 Ok(()) => trace!("Sent status to the client"),
3284 Err(e) => debug!("Failed to send status: {}", e),
3285 },
3286
3287 Command::Monitor(resp_s) => {
3288 self.monitors.push(resp_s);
3289 }
3290
3291 Command::SetOption(daemon_opt) => {
3292 self.process_set_option(daemon_opt);
3293 }
3294
3295 Command::GetOption(resp_s) => {
3296 let val = DaemonOptionVal {
3297 _service_name_len_max: self.service_name_len_max,
3298 ip_check_interval: self.ip_check_interval,
3299 };
3300 if let Err(e) = resp_s.send(val) {
3301 debug!("Failed to send options: {}", e);
3302 }
3303 }
3304
3305 Command::Verify(instance_fullname, timeout) => {
3306 self.exec_command_verify(instance_fullname, timeout, repeating);
3307 }
3308
3309 Command::InvalidIntfAddrs(invalid_intf_addrs) => {
3310 for intf_addr in invalid_intf_addrs {
3311 self.del_interface_addr(&intf_addr);
3312 }
3313
3314 self.check_ip_changes();
3315 }
3316
3317 _ => {
3318 debug!("unexpected command: {:?}", &command);
3319 }
3320 }
3321 }
3322
3323 fn exec_command_get_metrics(&mut self, resp_s: Sender<HashMap<String, i64>>) {
3324 self.set_counter(Counter::CachedPTR, self.cache.ptr_count() as i64);
3325 self.set_counter(Counter::CachedSRV, self.cache.srv_count() as i64);
3326 self.set_counter(Counter::CachedAddr, self.cache.addr_count() as i64);
3327 self.set_counter(Counter::CachedTxt, self.cache.txt_count() as i64);
3328 self.set_counter(Counter::CachedNSec, self.cache.nsec_count() as i64);
3329 self.set_counter(Counter::CachedSubtype, self.cache.subtype_count() as i64);
3330 self.set_counter(Counter::Timer, self.timers.len() as i64);
3331
3332 let dns_registry_probe_count: usize = self
3333 .dns_registry_map
3334 .values()
3335 .map(|r| r.probing.len())
3336 .sum();
3337 self.set_counter(Counter::DnsRegistryProbe, dns_registry_probe_count as i64);
3338
3339 let dns_registry_active_count: usize = self
3340 .dns_registry_map
3341 .values()
3342 .map(|r| r.active.values().map(|a| a.len()).sum::<usize>())
3343 .sum();
3344 self.set_counter(Counter::DnsRegistryActive, dns_registry_active_count as i64);
3345
3346 let dns_registry_timer_count: usize = self
3347 .dns_registry_map
3348 .values()
3349 .map(|r| r.new_timers.len())
3350 .sum();
3351 self.set_counter(Counter::DnsRegistryTimer, dns_registry_timer_count as i64);
3352
3353 let dns_registry_name_change_count: usize = self
3354 .dns_registry_map
3355 .values()
3356 .map(|r| r.name_changes.len())
3357 .sum();
3358 self.set_counter(
3359 Counter::DnsRegistryNameChange,
3360 dns_registry_name_change_count as i64,
3361 );
3362
3363 if let Err(e) = resp_s.send(self.counters.clone()) {
3365 debug!("Failed to send metrics: {}", e);
3366 }
3367 }
3368
3369 fn exec_command_browse(
3370 &mut self,
3371 repeating: bool,
3372 ty: String,
3373 next_delay: u32,
3374 cache_only: bool,
3375 listener: Sender<ServiceEvent>,
3376 ) {
3377 let pretty_addrs: Vec<String> = self
3378 .my_intfs
3379 .iter()
3380 .map(|(if_index, itf)| format!("{} ({if_index})", itf.name))
3381 .collect();
3382
3383 if let Err(e) = listener.send(ServiceEvent::SearchStarted(format!(
3384 "{ty} on {} interfaces [{}]",
3385 pretty_addrs.len(),
3386 pretty_addrs.join(", ")
3387 ))) {
3388 debug!(
3389 "Failed to send SearchStarted({})(repeating:{}): {}",
3390 &ty, repeating, e
3391 );
3392 return;
3393 }
3394
3395 let now = current_time_millis();
3396 if !repeating {
3397 self.service_queriers.insert(ty.clone(), listener.clone());
3401
3402 self.query_cache_for_service(&ty, &listener, now);
3404 }
3405
3406 if cache_only {
3407 match listener.send(ServiceEvent::SearchStopped(ty.clone())) {
3409 Ok(()) => debug!("SearchStopped sent for {}", &ty),
3410 Err(e) => debug!("Failed to send SearchStopped: {}", e),
3411 }
3412 return;
3413 }
3414
3415 self.send_query(&ty, RRType::PTR);
3416 self.increase_counter(Counter::Browse, 1);
3417
3418 let next_time = now + (next_delay * 1000) as u64;
3419 let max_delay = 60 * 60;
3420 let delay = cmp::min(next_delay * 2, max_delay);
3421 self.add_retransmission(next_time, Command::Browse(ty, delay, cache_only, listener));
3422 }
3423
3424 fn exec_command_resolve_hostname(
3425 &mut self,
3426 repeating: bool,
3427 hostname: String,
3428 next_delay: u32,
3429 listener: Sender<HostnameResolutionEvent>,
3430 timeout: Option<u64>,
3431 ) {
3432 let addr_list: Vec<_> = self.my_intfs.iter().collect();
3433 if let Err(e) = listener.send(HostnameResolutionEvent::SearchStarted(format!(
3434 "{} on addrs {:?}",
3435 &hostname, &addr_list
3436 ))) {
3437 debug!(
3438 "Failed to send ResolveStarted({})(repeating:{}): {}",
3439 &hostname, repeating, e
3440 );
3441 return;
3442 }
3443 if !repeating {
3444 self.add_hostname_resolver(hostname.to_owned(), listener.clone(), timeout);
3445 self.query_cache_for_hostname(&hostname, listener.clone());
3447 }
3448
3449 self.send_query_vec(&[(&hostname, RRType::A), (&hostname, RRType::AAAA)]);
3450 self.increase_counter(Counter::ResolveHostname, 1);
3451
3452 let now = current_time_millis();
3453 let next_time = now + u64::from(next_delay) * 1000;
3454 let max_delay = 60 * 60;
3455 let delay = cmp::min(next_delay * 2, max_delay);
3456
3457 if self
3459 .hostname_resolvers
3460 .get(&hostname)
3461 .and_then(|(_sender, timeout)| *timeout)
3462 .map(|timeout| next_time < timeout)
3463 .unwrap_or(true)
3464 {
3465 self.add_retransmission(
3466 next_time,
3467 Command::ResolveHostname(hostname, delay, listener, None),
3468 );
3469 }
3470 }
3471
3472 fn exec_command_resolve(&mut self, instance: String, try_count: u16) {
3473 let pending_query = self.query_unresolved(&instance);
3474 let max_try = 3;
3475 if pending_query && try_count < max_try {
3476 let next_time = current_time_millis() + RESOLVE_WAIT_IN_MILLIS;
3479 self.add_retransmission(next_time, Command::Resolve(instance, try_count + 1));
3480 }
3481 }
3482
3483 fn exec_command_unregister(
3484 &mut self,
3485 repeating: bool,
3486 fullname: String,
3487 resp_s: Sender<UnregisterStatus>,
3488 ) {
3489 let response = match self.my_services.remove_entry(&fullname) {
3490 None => {
3491 debug!("unregister: cannot find such service {}", &fullname);
3492 UnregisterStatus::NotFound
3493 }
3494 Some((_k, info)) => {
3495 let mut timers = Vec::new();
3496
3497 for (if_index, intf) in self.my_intfs.iter() {
3498 if let Some(sock) = self.ipv4_sock.as_ref() {
3499 let packet = self.unregister_service(&info, intf, &sock.pktinfo);
3500 if !repeating && !packet.is_empty() {
3502 let next_time = current_time_millis() + 120;
3503 self.retransmissions.push(ReRun {
3504 next_time,
3505 command: Command::UnregisterResend(packet, *if_index, true),
3506 });
3507 timers.push(next_time);
3508 }
3509 }
3510
3511 if let Some(sock) = self.ipv6_sock.as_ref() {
3513 let packet = self.unregister_service(&info, intf, &sock.pktinfo);
3514 if !repeating && !packet.is_empty() {
3515 let next_time = current_time_millis() + 120;
3516 self.retransmissions.push(ReRun {
3517 next_time,
3518 command: Command::UnregisterResend(packet, *if_index, false),
3519 });
3520 timers.push(next_time);
3521 }
3522 }
3523 }
3524
3525 for t in timers {
3526 self.add_timer(t);
3527 }
3528
3529 self.increase_counter(Counter::Unregister, 1);
3530 UnregisterStatus::OK
3531 }
3532 };
3533 if let Err(e) = resp_s.send(response) {
3534 debug!("unregister: failed to send response: {}", e);
3535 }
3536 }
3537
3538 fn exec_command_unregister_resend(&mut self, packet: Vec<u8>, if_index: u32, is_ipv4: bool) {
3539 let Some(intf) = self.my_intfs.get(&if_index) else {
3540 return;
3541 };
3542 let sock_opt = if is_ipv4 {
3543 &self.ipv4_sock
3544 } else {
3545 &self.ipv6_sock
3546 };
3547 let Some(sock) = sock_opt else {
3548 return;
3549 };
3550
3551 let if_addr = if is_ipv4 {
3552 match intf.next_ifaddr_v4() {
3553 Some(addr) => addr,
3554 None => return,
3555 }
3556 } else {
3557 match intf.next_ifaddr_v6() {
3558 Some(addr) => addr,
3559 None => return,
3560 }
3561 };
3562
3563 debug!("UnregisterResend from {:?}", if_addr);
3564 multicast_on_intf(
3565 &packet[..],
3566 &intf.name,
3567 intf.index,
3568 if_addr,
3569 &sock.pktinfo,
3570 self.port,
3571 );
3572
3573 self.increase_counter(Counter::UnregisterResend, 1);
3574 }
3575
3576 fn exec_command_stop_browse(&mut self, ty_domain: String) {
3577 match self.service_queriers.remove_entry(&ty_domain) {
3578 None => debug!("StopBrowse: cannot find querier for {}", &ty_domain),
3579 Some((ty, sender)) => {
3580 trace!("StopBrowse: removed queryer for {}", &ty);
3582 let mut i = 0;
3583 while i < self.retransmissions.len() {
3584 if let Command::Browse(t, _, _, _) = &self.retransmissions[i].command {
3585 if t == &ty {
3586 self.retransmissions.remove(i);
3587 trace!("StopBrowse: removed retransmission for {}", &ty);
3588 continue;
3589 }
3590 }
3591 i += 1;
3592 }
3593
3594 self.cache.remove_service_type(&ty_domain);
3596
3597 match sender.send(ServiceEvent::SearchStopped(ty_domain)) {
3599 Ok(()) => trace!("Sent SearchStopped to the listener"),
3600 Err(e) => debug!("Failed to send SearchStopped: {}", e),
3601 }
3602 }
3603 }
3604 }
3605
3606 fn exec_command_stop_resolve_hostname(&mut self, hostname: String) {
3607 if let Some((host, (sender, _timeout))) = self.hostname_resolvers.remove_entry(&hostname) {
3608 trace!("StopResolve: removed queryer for {}", &host);
3610 let mut i = 0;
3611 while i < self.retransmissions.len() {
3612 if let Command::Resolve(t, _) = &self.retransmissions[i].command {
3613 if t == &host {
3614 self.retransmissions.remove(i);
3615 trace!("StopResolve: removed retransmission for {}", &host);
3616 continue;
3617 }
3618 }
3619 i += 1;
3620 }
3621
3622 match sender.send(HostnameResolutionEvent::SearchStopped(hostname)) {
3624 Ok(()) => trace!("Sent SearchStopped to the listener"),
3625 Err(e) => debug!("Failed to send SearchStopped: {}", e),
3626 }
3627 }
3628 }
3629
3630 fn exec_command_register_resend(&mut self, fullname: String, if_index: u32) -> MyResult<()> {
3631 let Some(info) = self.my_services.get_mut(&fullname) else {
3632 trace!("announce: cannot find such service {}", &fullname);
3633 return Ok(());
3634 };
3635
3636 let Some(dns_registry) = self.dns_registry_map.get_mut(&if_index) else {
3637 return Ok(());
3638 };
3639
3640 let Some(intf) = self.my_intfs.get(&if_index) else {
3641 return Ok(());
3642 };
3643
3644 let announced_v4 = if let Some(sock) = self.ipv4_sock.as_ref() {
3645 announce_service_on_intf(dns_registry, info, intf, &sock.pktinfo, self.port)?
3646 } else {
3647 false
3648 };
3649 let announced_v6 = if let Some(sock) = self.ipv6_sock.as_ref() {
3650 announce_service_on_intf(dns_registry, info, intf, &sock.pktinfo, self.port)?
3651 } else {
3652 false
3653 };
3654
3655 if announced_v4 || announced_v6 {
3656 let mut hostname = info.get_hostname();
3657 if let Some(new_name) = dns_registry.name_changes.get(hostname) {
3658 hostname = new_name;
3659 }
3660 let service_name = match dns_registry.name_changes.get(&fullname) {
3661 Some(new_name) => new_name.to_string(),
3662 None => fullname,
3663 };
3664
3665 debug!("resend: announce service {service_name} on {}", intf.name);
3666
3667 notify_monitors(
3668 &mut self.monitors,
3669 DaemonEvent::Announce(service_name, format!("{}:{}", hostname, &intf.name)),
3670 );
3671 info.set_status(if_index, ServiceStatus::Announced);
3672 } else {
3673 debug!("register-resend should not fail");
3674 }
3675
3676 self.increase_counter(Counter::RegisterResend, 1);
3677 Ok(())
3678 }
3679
3680 fn exec_command_verify(&mut self, instance: String, timeout: Duration, repeating: bool) {
3681 let now = current_time_millis();
3691 let expire_at = if repeating {
3692 None
3693 } else {
3694 Some(now + timeout.as_millis() as u64)
3695 };
3696
3697 let record_vec = self.cache.service_verify_queries(&instance, expire_at);
3699
3700 if !record_vec.is_empty() {
3701 let query_vec: Vec<(&str, RRType)> = record_vec
3702 .iter()
3703 .map(|(record, rr_type)| (record.as_str(), *rr_type))
3704 .collect();
3705 self.send_query_vec(&query_vec);
3706
3707 if let Some(new_expire) = expire_at {
3708 self.add_timer(new_expire); self.add_retransmission(now + 1000, Command::Verify(instance, timeout));
3712 }
3713 }
3714 }
3715
3716 fn refresh_active_services(&mut self) {
3718 let mut query_ptr_count = 0;
3719 let mut query_srv_count = 0;
3720 let mut new_timers = HashSet::new();
3721 let mut query_addr_count = 0;
3722
3723 for (ty_domain, _sender) in self.service_queriers.iter() {
3724 let refreshed_timers = self.cache.refresh_due_ptr(ty_domain);
3725 if !refreshed_timers.is_empty() {
3726 trace!("sending refresh query for PTR: {}", ty_domain);
3727 self.send_query(ty_domain, RRType::PTR);
3728 query_ptr_count += 1;
3729 new_timers.extend(refreshed_timers);
3730 }
3731
3732 let (instances, timers) = self.cache.refresh_due_srv_txt(ty_domain);
3733 for (instance, types) in instances {
3734 trace!("sending refresh query for: {}", &instance);
3735 let query_vec = types
3736 .into_iter()
3737 .map(|ty| (instance.as_str(), ty))
3738 .collect::<Vec<_>>();
3739 self.send_query_vec(&query_vec);
3740 query_srv_count += 1;
3741 }
3742 new_timers.extend(timers);
3743 let (hostnames, timers) = self.cache.refresh_due_hosts(ty_domain);
3744 for hostname in hostnames.iter() {
3745 trace!("sending refresh queries for A and AAAA: {}", hostname);
3746 self.send_query_vec(&[(hostname, RRType::A), (hostname, RRType::AAAA)]);
3747 query_addr_count += 2;
3748 }
3749 new_timers.extend(timers);
3750 }
3751
3752 for timer in new_timers {
3753 self.add_timer(timer);
3754 }
3755
3756 self.increase_counter(Counter::CacheRefreshPTR, query_ptr_count);
3757 self.increase_counter(Counter::CacheRefreshSrvTxt, query_srv_count);
3758 self.increase_counter(Counter::CacheRefreshAddr, query_addr_count);
3759 }
3760}
3761
3762fn add_answer_of_service(
3764 out: &mut DnsOutgoing,
3765 msg: &DnsIncoming,
3766 entry_name: &str,
3767 service: &ServiceInfo,
3768 qtype: RRType,
3769 intf_addrs: Vec<IpAddr>,
3770) {
3771 if qtype == RRType::SRV || qtype == RRType::ANY {
3772 out.add_answer(
3773 msg,
3774 DnsSrv::new(
3775 entry_name,
3776 CLASS_IN | CLASS_CACHE_FLUSH,
3777 service.get_host_ttl(),
3778 service.get_priority(),
3779 service.get_weight(),
3780 service.get_port(),
3781 service.get_hostname().to_string(),
3782 ),
3783 );
3784 }
3785
3786 if qtype == RRType::TXT || qtype == RRType::ANY {
3787 out.add_answer(
3788 msg,
3789 DnsTxt::new(
3790 entry_name,
3791 CLASS_IN | CLASS_CACHE_FLUSH,
3792 service.get_other_ttl(),
3793 service.generate_txt(),
3794 ),
3795 );
3796 }
3797
3798 if qtype == RRType::SRV {
3799 for address in intf_addrs {
3800 out.add_additional_answer(DnsAddress::new(
3801 service.get_hostname(),
3802 ip_address_rr_type(&address),
3803 CLASS_IN | CLASS_CACHE_FLUSH,
3804 service.get_host_ttl(),
3805 address,
3806 InterfaceId::default(),
3807 ));
3808 }
3809 }
3810}
3811
3812#[derive(Clone, Debug)]
3815#[non_exhaustive]
3816pub enum ServiceEvent {
3817 SearchStarted(String),
3819
3820 ServiceFound(String, String),
3822
3823 ServiceResolved(Box<ResolvedService>),
3825
3826 ServiceRemoved(String, String),
3828
3829 SearchStopped(String),
3831}
3832
3833#[derive(Clone, Debug)]
3836#[non_exhaustive]
3837pub enum HostnameResolutionEvent {
3838 SearchStarted(String),
3840 AddressesFound(String, HashSet<ScopedIp>),
3842 AddressesRemoved(String, HashSet<ScopedIp>),
3844 SearchTimeout(String),
3846 SearchStopped(String),
3848}
3849
3850#[derive(Clone, Debug)]
3853#[non_exhaustive]
3854pub enum DaemonEvent {
3855 Announce(String, String),
3857
3858 Error(Error),
3860
3861 IpAdd(IpAddr),
3863
3864 IpDel(IpAddr),
3866
3867 NameChange(DnsNameChange),
3870
3871 Respond(String),
3873}
3874
3875#[derive(Clone, Debug)]
3878pub struct DnsNameChange {
3879 pub original: String,
3881
3882 pub new_name: String,
3892
3893 pub rr_type: RRType,
3895
3896 pub intf_name: String,
3898}
3899
3900#[derive(Debug)]
3902enum Command {
3903 Browse(String, u32, bool, Sender<ServiceEvent>),
3905
3906 ResolveHostname(String, u32, Sender<HostnameResolutionEvent>, Option<u64>), Register(Box<ServiceInfo>),
3911
3912 Unregister(String, Sender<UnregisterStatus>), RegisterResend(String, u32), UnregisterResend(Vec<u8>, u32, bool), StopBrowse(String), StopResolveHostname(String), Resolve(String, u16), GetMetrics(Sender<Metrics>),
3933
3934 GetStatus(Sender<DaemonStatus>),
3936
3937 Monitor(Sender<DaemonEvent>),
3939
3940 SetOption(DaemonOption),
3941
3942 GetOption(Sender<DaemonOptionVal>),
3943
3944 Verify(String, Duration),
3949
3950 InvalidIntfAddrs(HashSet<Interface>),
3952
3953 Exit(Sender<DaemonStatus>),
3954}
3955
3956impl fmt::Display for Command {
3957 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
3958 match self {
3959 Self::Browse(_, _, _, _) => write!(f, "Command Browse"),
3960 Self::ResolveHostname(_, _, _, _) => write!(f, "Command ResolveHostname"),
3961 Self::Exit(_) => write!(f, "Command Exit"),
3962 Self::GetStatus(_) => write!(f, "Command GetStatus"),
3963 Self::GetMetrics(_) => write!(f, "Command GetMetrics"),
3964 Self::Monitor(_) => write!(f, "Command Monitor"),
3965 Self::Register(_) => write!(f, "Command Register"),
3966 Self::RegisterResend(_, _) => write!(f, "Command RegisterResend"),
3967 Self::SetOption(_) => write!(f, "Command SetOption"),
3968 Self::GetOption(_) => write!(f, "Command GetOption"),
3969 Self::StopBrowse(_) => write!(f, "Command StopBrowse"),
3970 Self::StopResolveHostname(_) => write!(f, "Command StopResolveHostname"),
3971 Self::Unregister(_, _) => write!(f, "Command Unregister"),
3972 Self::UnregisterResend(_, _, _) => write!(f, "Command UnregisterResend"),
3973 Self::Resolve(_, _) => write!(f, "Command Resolve"),
3974 Self::Verify(_, _) => write!(f, "Command VerifyResource"),
3975 Self::InvalidIntfAddrs(_) => write!(f, "Command InvalidIntfAddrs"),
3976 }
3977 }
3978}
3979
3980struct DaemonOptionVal {
3981 _service_name_len_max: u8,
3982 ip_check_interval: u64,
3983}
3984
3985#[derive(Debug)]
3986enum DaemonOption {
3987 ServiceNameLenMax(u8),
3988 IpCheckInterval(u64),
3989 EnableInterface(Vec<IfKind>),
3990 DisableInterface(Vec<IfKind>),
3991 MulticastLoopV4(bool),
3992 MulticastLoopV6(bool),
3993 AcceptUnsolicited(bool),
3994 IncludeAppleP2P(bool),
3995 #[cfg(test)]
3996 TestDownInterface(String),
3997 #[cfg(test)]
3998 TestUpInterface(String),
3999}
4000
4001const DOMAIN_LEN: usize = "._tcp.local.".len();
4003
4004fn check_service_name_length(ty_domain: &str, limit: u8) -> Result<()> {
4006 if ty_domain.len() <= DOMAIN_LEN + 1 {
4007 return Err(e_fmt!("Service type name cannot be empty: {}", ty_domain));
4009 }
4010
4011 let service_name_len = ty_domain.len() - DOMAIN_LEN - 1; if service_name_len > limit as usize {
4013 return Err(e_fmt!("Service name length must be <= {} bytes", limit));
4014 }
4015 Ok(())
4016}
4017
4018fn check_domain_suffix(name: &str) -> Result<()> {
4020 if !(name.ends_with("._tcp.local.") || name.ends_with("._udp.local.")) {
4021 return Err(e_fmt!(
4022 "mDNS service {} must end with '._tcp.local.' or '._udp.local.'",
4023 name
4024 ));
4025 }
4026
4027 Ok(())
4028}
4029
4030fn check_service_name(fullname: &str) -> Result<()> {
4038 check_domain_suffix(fullname)?;
4039
4040 let remaining: Vec<&str> = fullname[..fullname.len() - DOMAIN_LEN].split('.').collect();
4041 let name = remaining.last().ok_or_else(|| e_fmt!("No service name"))?;
4042
4043 if &name[0..1] != "_" {
4044 return Err(e_fmt!("Service name must start with '_'"));
4045 }
4046
4047 let name = &name[1..];
4048
4049 if name.contains("--") {
4050 return Err(e_fmt!("Service name must not contain '--'"));
4051 }
4052
4053 if name.starts_with('-') || name.ends_with('-') {
4054 return Err(e_fmt!("Service name (%s) may not start or end with '-'"));
4055 }
4056
4057 let ascii_count = name.chars().filter(|c| c.is_ascii_alphabetic()).count();
4058 if ascii_count < 1 {
4059 return Err(e_fmt!(
4060 "Service name must contain at least one letter (eg: 'A-Za-z')"
4061 ));
4062 }
4063
4064 Ok(())
4065}
4066
4067fn check_hostname(hostname: &str) -> Result<()> {
4069 if !hostname.ends_with(".local.") {
4070 return Err(e_fmt!("Hostname must end with '.local.': {hostname}"));
4071 }
4072
4073 if hostname == ".local." {
4074 return Err(e_fmt!(
4075 "The part of the hostname before '.local.' cannot be empty"
4076 ));
4077 }
4078
4079 if hostname.len() > 255 {
4080 return Err(e_fmt!("Hostname length must be <= 255 bytes"));
4081 }
4082
4083 Ok(())
4084}
4085
4086fn call_service_listener(
4087 listeners_map: &HashMap<String, Sender<ServiceEvent>>,
4088 ty_domain: &str,
4089 event: ServiceEvent,
4090) {
4091 if let Some(listener) = listeners_map.get(ty_domain) {
4092 match listener.send(event) {
4093 Ok(()) => trace!("Sent event to listener successfully"),
4094 Err(e) => debug!("Failed to send event: {}", e),
4095 }
4096 }
4097}
4098
4099fn call_hostname_resolution_listener(
4100 listeners_map: &HashMap<String, (Sender<HostnameResolutionEvent>, Option<u64>)>,
4101 hostname: &str,
4102 event: HostnameResolutionEvent,
4103) {
4104 let hostname_lower = hostname.to_lowercase();
4105 if let Some(listener) = listeners_map.get(&hostname_lower).map(|(l, _)| l) {
4106 match listener.send(event) {
4107 Ok(()) => trace!("Sent event to listener successfully"),
4108 Err(e) => debug!("Failed to send event: {}", e),
4109 }
4110 }
4111}
4112
4113fn my_ip_interfaces(with_loopback: bool) -> Vec<Interface> {
4117 my_ip_interfaces_inner(with_loopback, false)
4118}
4119
4120fn my_ip_interfaces_inner(with_loopback: bool, with_apple_p2p: bool) -> Vec<Interface> {
4121 if_addrs::get_if_addrs()
4122 .unwrap_or_default()
4123 .into_iter()
4124 .filter(|i| {
4125 i.is_oper_up()
4126 && !i.is_p2p()
4127 && (!i.is_loopback() || with_loopback)
4128 && (with_apple_p2p || !is_apple_p2p_by_name(&i.name))
4129 })
4130 .collect()
4131}
4132
4133fn is_apple_p2p_by_name(name: &str) -> bool {
4136 let p2p_prefixes = ["awdl", "llw"];
4137 p2p_prefixes.iter().any(|prefix| name.starts_with(prefix))
4138}
4139
4140fn send_dns_outgoing(
4143 out: &DnsOutgoing,
4144 my_intf: &MyIntf,
4145 sock: &PktInfoUdpSocket,
4146 port: u16,
4147) -> MyResult<Vec<Vec<u8>>> {
4148 let if_name = &my_intf.name;
4149
4150 let if_addr = if sock.domain() == Domain::IPV4 {
4151 match my_intf.next_ifaddr_v4() {
4152 Some(addr) => addr,
4153 None => return Ok(vec![]),
4154 }
4155 } else {
4156 match my_intf.next_ifaddr_v6() {
4157 Some(addr) => addr,
4158 None => return Ok(vec![]),
4159 }
4160 };
4161
4162 send_dns_outgoing_impl(out, if_name, my_intf.index, if_addr, sock, port)
4163}
4164
4165fn send_dns_outgoing_impl(
4167 out: &DnsOutgoing,
4168 if_name: &str,
4169 if_index: u32,
4170 if_addr: &IfAddr,
4171 sock: &PktInfoUdpSocket,
4172 port: u16,
4173) -> MyResult<Vec<Vec<u8>>> {
4174 let qtype = if out.is_query() {
4175 "query"
4176 } else {
4177 if out.answers_count() == 0 && out.additionals().is_empty() {
4178 return Ok(vec![]); }
4180 "response"
4181 };
4182 trace!(
4183 "send {}: {} questions {} answers {} authorities {} additional",
4184 qtype,
4185 out.questions().len(),
4186 out.answers_count(),
4187 out.authorities().len(),
4188 out.additionals().len()
4189 );
4190
4191 match if_addr.ip() {
4192 IpAddr::V4(ipv4) => {
4193 if let Err(e) = sock.set_multicast_if_v4(&ipv4) {
4194 debug!(
4195 "send_dns_outgoing: failed to set multicast interface for IPv4 {}: {}",
4196 ipv4, e
4197 );
4198 if e.kind() == std::io::ErrorKind::AddrNotAvailable {
4200 let intf_addr = Interface {
4201 name: if_name.to_string(),
4202 addr: if_addr.clone(),
4203 index: Some(if_index),
4204 oper_status: if_addrs::IfOperStatus::Down,
4205 is_p2p: false,
4206 #[cfg(windows)]
4207 adapter_name: String::new(),
4208 };
4209 return Err(InternalError::IntfAddrInvalid(intf_addr));
4210 }
4211 return Ok(vec![]); }
4213 }
4214 IpAddr::V6(ipv6) => {
4215 if let Err(e) = sock.set_multicast_if_v6(if_index) {
4216 debug!(
4217 "send_dns_outgoing: failed to set multicast interface for IPv6 {}: {}",
4218 ipv6, e
4219 );
4220 if e.kind() == std::io::ErrorKind::AddrNotAvailable {
4222 let intf_addr = Interface {
4223 name: if_name.to_string(),
4224 addr: if_addr.clone(),
4225 index: Some(if_index),
4226 oper_status: if_addrs::IfOperStatus::Down,
4227 is_p2p: false,
4228 #[cfg(windows)]
4229 adapter_name: String::new(),
4230 };
4231 return Err(InternalError::IntfAddrInvalid(intf_addr));
4232 }
4233 return Ok(vec![]); }
4235 }
4236 }
4237
4238 let packet_list = out.to_data_on_wire();
4239 for packet in packet_list.iter() {
4240 multicast_on_intf(packet, if_name, if_index, if_addr, sock, port);
4241 }
4242 Ok(packet_list)
4243}
4244
4245fn multicast_on_intf(
4247 packet: &[u8],
4248 if_name: &str,
4249 if_index: u32,
4250 if_addr: &IfAddr,
4251 socket: &PktInfoUdpSocket,
4252 port: u16,
4253) {
4254 if packet.len() > MAX_MSG_ABSOLUTE {
4255 debug!("Drop over-sized packet ({})", packet.len());
4256 return;
4257 }
4258
4259 let addr: SocketAddr = match if_addr {
4260 if_addrs::IfAddr::V4(_) => SocketAddrV4::new(GROUP_ADDR_V4, port).into(),
4261 if_addrs::IfAddr::V6(_) => {
4262 let mut sock = SocketAddrV6::new(GROUP_ADDR_V6, port, 0, 0);
4263 sock.set_scope_id(if_index); sock.into()
4265 }
4266 };
4267
4268 let sock_addr = addr.into();
4270 match socket.send_to(packet, &sock_addr) {
4271 Ok(sz) => trace!(
4272 "sent out {} bytes on interface {} (idx {}) addr {}",
4273 sz,
4274 if_name,
4275 if_index,
4276 if_addr.ip()
4277 ),
4278 Err(e) => trace!("Failed to send to {} via {:?}: {}", addr, &if_name, e),
4279 }
4280}
4281
4282fn valid_instance_name(name: &str) -> bool {
4286 name.split('.').count() >= 5
4287}
4288
4289fn notify_monitors(monitors: &mut Vec<Sender<DaemonEvent>>, event: DaemonEvent) {
4290 monitors.retain(|sender| {
4291 if let Err(e) = sender.try_send(event.clone()) {
4292 debug!("notify_monitors: try_send: {}", &e);
4293 if matches!(e, TrySendError::Disconnected(_)) {
4294 return false; }
4296 }
4297 true
4298 });
4299}
4300
4301fn prepare_announce(
4304 info: &ServiceInfo,
4305 intf: &MyIntf,
4306 dns_registry: &mut DnsRegistry,
4307 is_ipv4: bool,
4308) -> Option<DnsOutgoing> {
4309 let intf_addrs = if is_ipv4 {
4310 info.get_addrs_on_my_intf_v4(intf)
4311 } else {
4312 info.get_addrs_on_my_intf_v6(intf)
4313 };
4314
4315 if intf_addrs.is_empty() {
4316 debug!(
4317 "prepare_announce (ipv4: {is_ipv4}): no valid addrs on interface {}",
4318 &intf.name
4319 );
4320 return None;
4321 }
4322
4323 let service_fullname = match dns_registry.name_changes.get(info.get_fullname()) {
4325 Some(new_name) => new_name,
4326 None => info.get_fullname(),
4327 };
4328
4329 debug!(
4330 "prepare to announce service {service_fullname} on {:?}",
4331 &intf_addrs
4332 );
4333
4334 let mut probing_count = 0;
4335 let mut out = DnsOutgoing::new(FLAGS_QR_RESPONSE | FLAGS_AA);
4336 let create_time = current_time_millis() + fastrand::u64(0..250);
4337
4338 out.add_answer_at_time(
4339 DnsPointer::new(
4340 info.get_type(),
4341 RRType::PTR,
4342 CLASS_IN,
4343 info.get_other_ttl(),
4344 service_fullname.to_string(),
4345 ),
4346 0,
4347 );
4348
4349 if let Some(sub) = info.get_subtype() {
4350 trace!("Adding subdomain {}", sub);
4351 out.add_answer_at_time(
4352 DnsPointer::new(
4353 sub,
4354 RRType::PTR,
4355 CLASS_IN,
4356 info.get_other_ttl(),
4357 service_fullname.to_string(),
4358 ),
4359 0,
4360 );
4361 }
4362
4363 let hostname = match dns_registry.name_changes.get(info.get_hostname()) {
4365 Some(new_name) => new_name.to_string(),
4366 None => info.get_hostname().to_string(),
4367 };
4368
4369 let mut srv = DnsSrv::new(
4370 info.get_fullname(),
4371 CLASS_IN | CLASS_CACHE_FLUSH,
4372 info.get_host_ttl(),
4373 info.get_priority(),
4374 info.get_weight(),
4375 info.get_port(),
4376 hostname,
4377 );
4378
4379 if let Some(new_name) = dns_registry.name_changes.get(info.get_fullname()) {
4380 srv.get_record_mut().set_new_name(new_name.to_string());
4381 }
4382
4383 if !info.requires_probe()
4384 || dns_registry.is_probing_done(&srv, info.get_fullname(), create_time)
4385 {
4386 out.add_answer_at_time(srv, 0);
4387 } else {
4388 probing_count += 1;
4389 }
4390
4391 let mut txt = DnsTxt::new(
4394 info.get_fullname(),
4395 CLASS_IN | CLASS_CACHE_FLUSH,
4396 info.get_other_ttl(),
4397 info.generate_txt(),
4398 );
4399
4400 if let Some(new_name) = dns_registry.name_changes.get(info.get_fullname()) {
4401 txt.get_record_mut().set_new_name(new_name.to_string());
4402 }
4403
4404 if !info.requires_probe()
4405 || dns_registry.is_probing_done(&txt, info.get_fullname(), create_time)
4406 {
4407 out.add_answer_at_time(txt, 0);
4408 } else {
4409 probing_count += 1;
4410 }
4411
4412 let hostname = info.get_hostname();
4415 for address in intf_addrs {
4416 let mut dns_addr = DnsAddress::new(
4417 hostname,
4418 ip_address_rr_type(&address),
4419 CLASS_IN | CLASS_CACHE_FLUSH,
4420 info.get_host_ttl(),
4421 address,
4422 intf.into(),
4423 );
4424
4425 if let Some(new_name) = dns_registry.name_changes.get(hostname) {
4426 dns_addr.get_record_mut().set_new_name(new_name.to_string());
4427 }
4428
4429 if !info.requires_probe()
4430 || dns_registry.is_probing_done(&dns_addr, info.get_fullname(), create_time)
4431 {
4432 out.add_answer_at_time(dns_addr, 0);
4433 } else {
4434 probing_count += 1;
4435 }
4436 }
4437
4438 if probing_count > 0 {
4439 return None;
4440 }
4441
4442 Some(out)
4443}
4444
4445fn announce_service_on_intf(
4448 dns_registry: &mut DnsRegistry,
4449 info: &ServiceInfo,
4450 intf: &MyIntf,
4451 sock: &PktInfoUdpSocket,
4452 port: u16,
4453) -> MyResult<bool> {
4454 let is_ipv4 = sock.domain() == Domain::IPV4;
4455 if let Some(out) = prepare_announce(info, intf, dns_registry, is_ipv4) {
4456 let _ = send_dns_outgoing(&out, intf, sock, port)?;
4457 return Ok(true);
4458 }
4459
4460 Ok(false)
4461}
4462
4463fn name_change(original: &str) -> String {
4471 let mut parts: Vec<_> = original.split('.').collect();
4472 let Some(first_part) = parts.get_mut(0) else {
4473 return format!("{original} (2)");
4474 };
4475
4476 let mut new_name = format!("{first_part} (2)");
4477
4478 if let Some(paren_pos) = first_part.rfind(" (") {
4480 if let Some(end_paren) = first_part[paren_pos..].find(')') {
4482 let absolute_end_pos = paren_pos + end_paren;
4483 if absolute_end_pos == first_part.len() - 1 {
4485 let num_start = paren_pos + 2; if let Ok(number) = first_part[num_start..absolute_end_pos].parse::<u32>() {
4488 let base_name = &first_part[..paren_pos];
4489 new_name = format!("{} ({})", base_name, number + 1)
4490 }
4491 }
4492 }
4493 }
4494
4495 *first_part = &new_name;
4496 parts.join(".")
4497}
4498
4499fn hostname_change(original: &str) -> String {
4507 let mut parts: Vec<_> = original.split('.').collect();
4508 let Some(first_part) = parts.get_mut(0) else {
4509 return format!("{original}-2");
4510 };
4511
4512 let mut new_name = format!("{first_part}-2");
4513
4514 if let Some(hyphen_pos) = first_part.rfind('-') {
4516 if let Ok(number) = first_part[hyphen_pos + 1..].parse::<u32>() {
4518 let base_name = &first_part[..hyphen_pos];
4519 new_name = format!("{}-{}", base_name, number + 1);
4520 }
4521 }
4522
4523 *first_part = &new_name;
4524 parts.join(".")
4525}
4526
4527fn add_answer_with_additionals(
4528 out: &mut DnsOutgoing,
4529 msg: &DnsIncoming,
4530 service: &ServiceInfo,
4531 intf: &MyIntf,
4532 dns_registry: &DnsRegistry,
4533 is_ipv4: bool,
4534) {
4535 let intf_addrs = if is_ipv4 {
4536 service.get_addrs_on_my_intf_v4(intf)
4537 } else {
4538 service.get_addrs_on_my_intf_v6(intf)
4539 };
4540 if intf_addrs.is_empty() {
4541 trace!("No addrs on LAN of intf {:?}", intf);
4542 return;
4543 }
4544
4545 let service_fullname = match dns_registry.name_changes.get(service.get_fullname()) {
4547 Some(new_name) => new_name,
4548 None => service.get_fullname(),
4549 };
4550
4551 let hostname = match dns_registry.name_changes.get(service.get_hostname()) {
4552 Some(new_name) => new_name,
4553 None => service.get_hostname(),
4554 };
4555
4556 let ptr_added = out.add_answer(
4557 msg,
4558 DnsPointer::new(
4559 service.get_type(),
4560 RRType::PTR,
4561 CLASS_IN,
4562 service.get_other_ttl(),
4563 service_fullname.to_string(),
4564 ),
4565 );
4566
4567 if !ptr_added {
4568 trace!("answer was not added for msg {:?}", msg);
4569 return;
4570 }
4571
4572 if let Some(sub) = service.get_subtype() {
4573 trace!("Adding subdomain {}", sub);
4574 out.add_additional_answer(DnsPointer::new(
4575 sub,
4576 RRType::PTR,
4577 CLASS_IN,
4578 service.get_other_ttl(),
4579 service_fullname.to_string(),
4580 ));
4581 }
4582
4583 out.add_additional_answer(DnsSrv::new(
4586 service_fullname,
4587 CLASS_IN | CLASS_CACHE_FLUSH,
4588 service.get_host_ttl(),
4589 service.get_priority(),
4590 service.get_weight(),
4591 service.get_port(),
4592 hostname.to_string(),
4593 ));
4594
4595 out.add_additional_answer(DnsTxt::new(
4596 service_fullname,
4597 CLASS_IN | CLASS_CACHE_FLUSH,
4598 service.get_other_ttl(),
4599 service.generate_txt(),
4600 ));
4601
4602 for address in intf_addrs {
4603 out.add_additional_answer(DnsAddress::new(
4604 hostname,
4605 ip_address_rr_type(&address),
4606 CLASS_IN | CLASS_CACHE_FLUSH,
4607 service.get_host_ttl(),
4608 address,
4609 intf.into(),
4610 ));
4611 }
4612}
4613
4614fn check_probing(
4617 dns_registry: &mut DnsRegistry,
4618 timers: &mut BinaryHeap<Reverse<u64>>,
4619 now: u64,
4620) -> (DnsOutgoing, Vec<String>) {
4621 let mut expired_probes = Vec::new();
4622 let mut out = DnsOutgoing::new(FLAGS_QR_QUERY);
4623
4624 for (name, probe) in dns_registry.probing.iter_mut() {
4625 if now >= probe.next_send {
4626 if probe.expired(now) {
4627 expired_probes.push(name.clone());
4629 } else {
4630 out.add_question(name, RRType::ANY);
4631
4632 for record in probe.records.iter() {
4640 out.add_authority(record.clone());
4641 }
4642
4643 probe.update_next_send(now);
4644
4645 timers.push(Reverse(probe.next_send));
4647 }
4648 }
4649 }
4650
4651 (out, expired_probes)
4652}
4653
4654fn handle_expired_probes(
4659 expired_probes: Vec<String>,
4660 intf_name: &str,
4661 dns_registry: &mut DnsRegistry,
4662 monitors: &mut Vec<Sender<DaemonEvent>>,
4663) -> HashSet<String> {
4664 let mut waiting_services = HashSet::new();
4665
4666 for name in expired_probes {
4667 let Some(probe) = dns_registry.probing.remove(&name) else {
4668 continue;
4669 };
4670
4671 for record in probe.records.iter() {
4673 if let Some(new_name) = record.get_record().get_new_name() {
4674 dns_registry
4675 .name_changes
4676 .insert(name.clone(), new_name.to_string());
4677
4678 let event = DnsNameChange {
4679 original: record.get_record().get_original_name().to_string(),
4680 new_name: new_name.to_string(),
4681 rr_type: record.get_type(),
4682 intf_name: intf_name.to_string(),
4683 };
4684 debug!("Name change event: {:?}", &event);
4685 notify_monitors(monitors, DaemonEvent::NameChange(event));
4686 }
4687 }
4688
4689 debug!(
4691 "probe of '{name}' finished: move {} records to active. ({} waiting services)",
4692 probe.records.len(),
4693 probe.waiting_services.len(),
4694 );
4695
4696 if !probe.records.is_empty() {
4698 match dns_registry.active.get_mut(&name) {
4699 Some(records) => {
4700 records.extend(probe.records);
4701 }
4702 None => {
4703 dns_registry.active.insert(name, probe.records);
4704 }
4705 }
4706
4707 waiting_services.extend(probe.waiting_services);
4708 }
4709 }
4710
4711 waiting_services
4712}
4713
4714fn resolve_addr_to_index(if_kind: IfKind, interfaces: &[Interface]) -> IfKind {
4716 if let IfKind::Addr(addr) = &if_kind {
4717 if let Some(intf) = interfaces.iter().find(|intf| &intf.ip() == addr) {
4718 let if_index = intf.index.unwrap_or(0);
4719 return if addr.is_ipv4() {
4720 IfKind::IndexV4(if_index)
4721 } else {
4722 IfKind::IndexV6(if_index)
4723 };
4724 }
4725 }
4726 if_kind
4727}
4728
4729#[cfg(test)]
4730mod tests {
4731 use super::{
4732 _new_socket_bind, check_domain_suffix, check_service_name_length, hostname_change,
4733 my_ip_interfaces, name_change, send_dns_outgoing_impl, valid_instance_name,
4734 HostnameResolutionEvent, ServiceDaemon, ServiceEvent, ServiceInfo, MDNS_PORT,
4735 };
4736 use crate::{
4737 dns_parser::{
4738 DnsIncoming, DnsOutgoing, DnsPointer, InterfaceId, RRType, ScopedIp, CLASS_IN,
4739 FLAGS_AA, FLAGS_QR_RESPONSE,
4740 },
4741 service_daemon::{add_answer_of_service, check_hostname},
4742 };
4743 use std::time::{Duration, SystemTime};
4744 use test_log::test;
4745
4746 #[test]
4747 fn test_instance_name() {
4748 assert!(valid_instance_name("my-laser._printer._tcp.local."));
4749 assert!(valid_instance_name("my-laser.._printer._tcp.local."));
4750 assert!(!valid_instance_name("_printer._tcp.local."));
4751 }
4752
4753 #[test]
4754 fn test_check_service_name_length() {
4755 let result = check_service_name_length("_tcp", 100);
4756 assert!(result.is_err());
4757 if let Err(e) = result {
4758 println!("{}", e);
4759 }
4760 }
4761
4762 #[test]
4763 fn test_check_hostname() {
4764 for hostname in &[
4766 "my_host.local.",
4767 &("A".repeat(255 - ".local.".len()) + ".local."),
4768 ] {
4769 let result = check_hostname(hostname);
4770 assert!(result.is_ok());
4771 }
4772
4773 for hostname in &[
4775 "my_host.local",
4776 ".local.",
4777 &("A".repeat(256 - ".local.".len()) + ".local."),
4778 ] {
4779 let result = check_hostname(hostname);
4780 assert!(result.is_err());
4781 if let Err(e) = result {
4782 println!("{}", e);
4783 }
4784 }
4785 }
4786
4787 #[test]
4788 fn test_check_domain_suffix() {
4789 assert!(check_domain_suffix("_missing_dot._tcp.local").is_err());
4790 assert!(check_domain_suffix("_missing_bar.tcp.local.").is_err());
4791 assert!(check_domain_suffix("_mis_spell._tpp.local.").is_err());
4792 assert!(check_domain_suffix("_mis_spell._upp.local.").is_err());
4793 assert!(check_domain_suffix("_has_dot._tcp.local.").is_ok());
4794 assert!(check_domain_suffix("_goodname._udp.local.").is_ok());
4795 }
4796
4797 #[test]
4798 fn test_service_with_temporarily_invalidated_ptr() {
4799 let d = ServiceDaemon::new().expect("Failed to create daemon");
4801
4802 let service = "_test_inval_ptr._udp.local.";
4803 let host_name = "my_host_tmp_invalidated_ptr.local.";
4804 let intfs: Vec<_> = my_ip_interfaces(false);
4805 let intf_ips: Vec<_> = intfs.iter().map(|intf| intf.ip()).collect();
4806 let port = 5201;
4807 let my_service =
4808 ServiceInfo::new(service, "my_instance", host_name, &intf_ips[..], port, None)
4809 .expect("invalid service info")
4810 .enable_addr_auto();
4811 let result = d.register(my_service.clone());
4812 assert!(result.is_ok());
4813
4814 let browse_chan = d.browse(service).unwrap();
4816 let timeout = Duration::from_secs(2);
4817 let mut resolved = false;
4818
4819 while let Ok(event) = browse_chan.recv_timeout(timeout) {
4820 match event {
4821 ServiceEvent::ServiceResolved(info) => {
4822 resolved = true;
4823 println!("Resolved a service of {}", &info.fullname);
4824 break;
4825 }
4826 e => {
4827 println!("Received event {:?}", e);
4828 }
4829 }
4830 }
4831
4832 assert!(resolved);
4833
4834 println!("Stopping browse of {}", service);
4835 d.stop_browse(service).unwrap();
4838
4839 let mut stopped = false;
4844 while let Ok(event) = browse_chan.recv_timeout(timeout) {
4845 match event {
4846 ServiceEvent::SearchStopped(_) => {
4847 stopped = true;
4848 println!("Stopped browsing service");
4849 break;
4850 }
4851 e => {
4855 println!("Received event {:?}", e);
4856 }
4857 }
4858 }
4859
4860 assert!(stopped);
4861
4862 let invalidate_ptr_packet = DnsPointer::new(
4864 my_service.get_type(),
4865 RRType::PTR,
4866 CLASS_IN,
4867 0,
4868 my_service.get_fullname().to_string(),
4869 );
4870
4871 let mut packet_buffer = DnsOutgoing::new(FLAGS_QR_RESPONSE | FLAGS_AA);
4872 packet_buffer.add_additional_answer(invalidate_ptr_packet);
4873
4874 for intf in intfs {
4875 let sock = _new_socket_bind(&intf, true).unwrap();
4876 send_dns_outgoing_impl(
4877 &packet_buffer,
4878 &intf.name,
4879 intf.index.unwrap_or(0),
4880 &intf.addr,
4881 &sock.pktinfo,
4882 MDNS_PORT,
4883 )
4884 .unwrap();
4885 }
4886
4887 println!(
4888 "Sent PTR record invalidation. Starting second browse for {}",
4889 service
4890 );
4891
4892 let browse_chan = d.browse(service).unwrap();
4894
4895 resolved = false;
4896 while let Ok(event) = browse_chan.recv_timeout(timeout) {
4897 match event {
4898 ServiceEvent::ServiceResolved(info) => {
4899 resolved = true;
4900 println!("Resolved a service of {}", &info.fullname);
4901 break;
4902 }
4903 e => {
4904 println!("Received event {:?}", e);
4905 }
4906 }
4907 }
4908
4909 assert!(resolved);
4910 d.shutdown().unwrap();
4911 }
4912
4913 #[test]
4914 fn test_expired_srv() {
4915 let service_type = "_expired-srv._udp.local.";
4917 let instance = "test_instance";
4918 let host_name = "expired_srv_host.local.";
4919 let mut my_service = ServiceInfo::new(service_type, instance, host_name, "", 5023, None)
4920 .unwrap()
4921 .enable_addr_auto();
4922 let new_ttl = 3; my_service._set_host_ttl(new_ttl);
4927
4928 let mdns_server = ServiceDaemon::new().expect("Failed to create mdns server");
4930 let result = mdns_server.register(my_service);
4931 assert!(result.is_ok());
4932
4933 let mdns_client = ServiceDaemon::new().expect("Failed to create mdns client");
4934 let browse_chan = mdns_client.browse(service_type).unwrap();
4935 let timeout = Duration::from_secs(2);
4936 let mut resolved = false;
4937
4938 while let Ok(event) = browse_chan.recv_timeout(timeout) {
4939 if let ServiceEvent::ServiceResolved(info) = event {
4940 resolved = true;
4941 println!("Resolved a service of {}", &info.fullname);
4942 break;
4943 }
4944 }
4945
4946 assert!(resolved);
4947
4948 mdns_server.shutdown().unwrap();
4950
4951 let expire_timeout = Duration::from_secs(new_ttl as u64);
4953 while let Ok(event) = browse_chan.recv_timeout(expire_timeout) {
4954 if let ServiceEvent::ServiceRemoved(service_type, full_name) = event {
4955 println!("Service removed: {}: {}", &service_type, &full_name);
4956 break;
4957 }
4958 }
4959 }
4960
4961 #[test]
4962 fn test_hostname_resolution_address_removed() {
4963 let server = ServiceDaemon::new().expect("Failed to create server");
4965 let hostname = "addr_remove_host._tcp.local.";
4966 let service_ip_addr: ScopedIp = my_ip_interfaces(false)
4967 .iter()
4968 .find(|iface| iface.ip().is_ipv4())
4969 .map(|iface| iface.ip().into())
4970 .unwrap();
4971
4972 let mut my_service = ServiceInfo::new(
4973 "_host_res_test._tcp.local.",
4974 "my_instance",
4975 hostname,
4976 service_ip_addr.to_ip_addr(),
4977 1234,
4978 None,
4979 )
4980 .expect("invalid service info");
4981
4982 let addr_ttl = 2;
4984 my_service._set_host_ttl(addr_ttl); server.register(my_service).unwrap();
4987
4988 let client = ServiceDaemon::new().expect("Failed to create client");
4990 let event_receiver = client.resolve_hostname(hostname, None).unwrap();
4991 let resolved = loop {
4992 match event_receiver.recv() {
4993 Ok(HostnameResolutionEvent::AddressesFound(found_hostname, addresses)) => {
4994 assert!(found_hostname == hostname);
4995 assert!(addresses.contains(&service_ip_addr));
4996 println!("address found: {:?}", &addresses);
4997 break true;
4998 }
4999 Ok(HostnameResolutionEvent::SearchStopped(_)) => break false,
5000 Ok(_event) => {}
5001 Err(_) => break false,
5002 }
5003 };
5004
5005 assert!(resolved);
5006
5007 server.shutdown().unwrap();
5009
5010 let timeout = Duration::from_secs(addr_ttl as u64 + 1);
5012 let removed = loop {
5013 match event_receiver.recv_timeout(timeout) {
5014 Ok(HostnameResolutionEvent::AddressesRemoved(removed_host, addresses)) => {
5015 assert!(removed_host == hostname);
5016 assert!(addresses.contains(&service_ip_addr));
5017
5018 println!(
5019 "address removed: hostname: {} addresses: {:?}",
5020 &hostname, &addresses
5021 );
5022 break true;
5023 }
5024 Ok(_event) => {}
5025 Err(_) => {
5026 break false;
5027 }
5028 }
5029 };
5030
5031 assert!(removed);
5032
5033 client.shutdown().unwrap();
5034 }
5035
5036 #[test]
5037 fn test_refresh_ptr() {
5038 let service_type = "_refresh-ptr._udp.local.";
5040 let instance = "test_instance";
5041 let host_name = "refresh_ptr_host.local.";
5042 let service_ip_addr = my_ip_interfaces(false)
5043 .iter()
5044 .find(|iface| iface.ip().is_ipv4())
5045 .map(|iface| iface.ip())
5046 .unwrap();
5047
5048 let mut my_service = ServiceInfo::new(
5049 service_type,
5050 instance,
5051 host_name,
5052 service_ip_addr,
5053 5023,
5054 None,
5055 )
5056 .unwrap();
5057
5058 let new_ttl = 3; my_service._set_other_ttl(new_ttl);
5060
5061 let mdns_server = ServiceDaemon::new().expect("Failed to create mdns server");
5063 let result = mdns_server.register(my_service);
5064 assert!(result.is_ok());
5065
5066 let mdns_client = ServiceDaemon::new().expect("Failed to create mdns client");
5067 let browse_chan = mdns_client.browse(service_type).unwrap();
5068 let timeout = Duration::from_millis(1500); let mut resolved = false;
5070
5071 while let Ok(event) = browse_chan.recv_timeout(timeout) {
5073 if let ServiceEvent::ServiceResolved(info) = event {
5074 resolved = true;
5075 println!("Resolved a service of {}", &info.fullname);
5076 break;
5077 }
5078 }
5079
5080 assert!(resolved);
5081
5082 let timeout = Duration::from_millis(new_ttl as u64 * 1000 * 90 / 100);
5084 while let Ok(event) = browse_chan.recv_timeout(timeout) {
5085 println!("event: {:?}", &event);
5086 }
5087
5088 let metrics_chan = mdns_client.get_metrics().unwrap();
5090 let metrics = metrics_chan.recv_timeout(timeout).unwrap();
5091 let ptr_refresh_counter = metrics["cache-refresh-ptr"];
5092 assert_eq!(ptr_refresh_counter, 1);
5093 let srvtxt_refresh_counter = metrics["cache-refresh-srv-txt"];
5094 assert_eq!(srvtxt_refresh_counter, 1);
5095
5096 mdns_server.shutdown().unwrap();
5098 mdns_client.shutdown().unwrap();
5099 }
5100
5101 #[test]
5102 fn test_name_change() {
5103 assert_eq!(name_change("foo.local."), "foo (2).local.");
5104 assert_eq!(name_change("foo (2).local."), "foo (3).local.");
5105 assert_eq!(name_change("foo (9).local."), "foo (10).local.");
5106 assert_eq!(name_change("foo"), "foo (2)");
5107 assert_eq!(name_change("foo (2)"), "foo (3)");
5108 assert_eq!(name_change(""), " (2)");
5109
5110 assert_eq!(name_change("foo (abc)"), "foo (abc) (2)"); assert_eq!(name_change("foo (2"), "foo (2 (2)"); assert_eq!(name_change("foo (2) extra"), "foo (2) extra (2)"); }
5115
5116 #[test]
5117 fn test_hostname_change() {
5118 assert_eq!(hostname_change("foo.local."), "foo-2.local.");
5119 assert_eq!(hostname_change("foo"), "foo-2");
5120 assert_eq!(hostname_change("foo-2.local."), "foo-3.local.");
5121 assert_eq!(hostname_change("foo-9"), "foo-10");
5122 assert_eq!(hostname_change("test-42.domain."), "test-43.domain.");
5123 }
5124
5125 #[test]
5126 fn test_add_answer_txt_ttl() {
5127 let service_type = "_test_add_answer._udp.local.";
5129 let instance = "test_instance";
5130 let host_name = "add_answer_host.local.";
5131 let service_intf = my_ip_interfaces(false)
5132 .into_iter()
5133 .find(|iface| iface.ip().is_ipv4())
5134 .unwrap();
5135 let service_ip_addr = service_intf.ip();
5136 let my_service = ServiceInfo::new(
5137 service_type,
5138 instance,
5139 host_name,
5140 service_ip_addr,
5141 5023,
5142 None,
5143 )
5144 .unwrap();
5145
5146 let mut out = DnsOutgoing::new(FLAGS_QR_RESPONSE | FLAGS_AA);
5148
5149 let mut dummy_data = out.to_data_on_wire();
5151 let interface_id = InterfaceId::from(&service_intf);
5152 let incoming = DnsIncoming::new(dummy_data.pop().unwrap(), interface_id).unwrap();
5153
5154 let if_addrs = vec![service_intf.ip()];
5156 add_answer_of_service(
5157 &mut out,
5158 &incoming,
5159 instance,
5160 &my_service,
5161 RRType::TXT,
5162 if_addrs,
5163 );
5164
5165 assert!(
5167 out.answers_count() > 0,
5168 "No answers added to the outgoing message"
5169 );
5170
5171 let answer = out._answers().first().unwrap();
5173 assert_eq!(answer.0.get_type(), RRType::TXT);
5174
5175 assert_eq!(answer.0.get_record().get_ttl(), my_service.get_other_ttl());
5177 }
5178
5179 #[test]
5180 fn test_interface_flip() {
5181 let ty_domain = "_intf-flip._udp.local.";
5183 let host_name = "intf_flip.local.";
5184 let now = SystemTime::now()
5185 .duration_since(SystemTime::UNIX_EPOCH)
5186 .unwrap();
5187 let instance_name = now.as_micros().to_string(); let port = 5200;
5189
5190 let (ip_addr1, intf_name) = my_ip_interfaces(false)
5192 .iter()
5193 .find(|iface| iface.ip().is_ipv4())
5194 .map(|iface| (iface.ip(), iface.name.clone()))
5195 .unwrap();
5196
5197 println!("Using interface {} with IP {}", intf_name, ip_addr1);
5198
5199 let service1 = ServiceInfo::new(ty_domain, &instance_name, host_name, ip_addr1, port, None)
5201 .expect("valid service info");
5202 let server1 = ServiceDaemon::new().expect("failed to start server");
5203 server1
5204 .register(service1)
5205 .expect("Failed to register service1");
5206
5207 std::thread::sleep(Duration::from_secs(2));
5209
5210 let client = ServiceDaemon::new().expect("failed to start client");
5212
5213 let receiver = client.browse(ty_domain).unwrap();
5214
5215 let timeout = Duration::from_secs(3);
5216 let mut got_data = false;
5217
5218 while let Ok(event) = receiver.recv_timeout(timeout) {
5219 if let ServiceEvent::ServiceResolved(_) = event {
5220 println!("Received ServiceResolved event");
5221 got_data = true;
5222 break;
5223 }
5224 }
5225
5226 assert!(got_data, "Should receive ServiceResolved event");
5227
5228 client.set_ip_check_interval(1).unwrap();
5230
5231 println!("Shutting down interface {}", &intf_name);
5233 client.test_down_interface(&intf_name).unwrap();
5234
5235 let mut got_removed = false;
5236
5237 while let Ok(event) = receiver.recv_timeout(timeout) {
5238 if let ServiceEvent::ServiceRemoved(ty_domain, instance) = event {
5239 got_removed = true;
5240 println!("removed: {ty_domain} : {instance}");
5241 break;
5242 }
5243 }
5244 assert!(got_removed, "Should receive ServiceRemoved event");
5245
5246 println!("Bringing up interface {}", &intf_name);
5247 client.test_up_interface(&intf_name).unwrap();
5248 let mut got_data = false;
5249 while let Ok(event) = receiver.recv_timeout(timeout) {
5250 if let ServiceEvent::ServiceResolved(resolved) = event {
5251 got_data = true;
5252 println!("Received ServiceResolved: {:?}", resolved);
5253 break;
5254 }
5255 }
5256 assert!(
5257 got_data,
5258 "Should receive ServiceResolved event after interface is back up"
5259 );
5260
5261 server1.shutdown().unwrap();
5262 client.shutdown().unwrap();
5263 }
5264
5265 #[test]
5266 fn test_cache_only() {
5267 let service_type = "_cache_only._udp.local.";
5269 let instance = "test_instance";
5270 let host_name = "cache_only_host.local.";
5271 let service_ip_addr = my_ip_interfaces(false)
5272 .iter()
5273 .find(|iface| iface.ip().is_ipv4())
5274 .map(|iface| iface.ip())
5275 .unwrap();
5276
5277 let mut my_service = ServiceInfo::new(
5278 service_type,
5279 instance,
5280 host_name,
5281 service_ip_addr,
5282 5023,
5283 None,
5284 )
5285 .unwrap();
5286
5287 let new_ttl = 3; my_service._set_other_ttl(new_ttl);
5289
5290 let mdns_client = ServiceDaemon::new().expect("Failed to create mdns client");
5291
5292 let browse_chan = mdns_client.browse_cache(service_type).unwrap();
5295 std::thread::sleep(Duration::from_secs(2));
5296
5297 let mdns_server = ServiceDaemon::new().expect("Failed to create mdns server");
5299 let result = mdns_server.register(my_service);
5300 assert!(result.is_ok());
5301
5302 let timeout = Duration::from_millis(1500); let mut resolved = false;
5304
5305 while let Ok(event) = browse_chan.recv_timeout(timeout) {
5307 if let ServiceEvent::ServiceResolved(info) = event {
5308 resolved = true;
5309 println!("Resolved a service of {}", &info.get_fullname());
5310 break;
5311 }
5312 }
5313
5314 assert!(resolved);
5315
5316 mdns_server.shutdown().unwrap();
5318 mdns_client.shutdown().unwrap();
5319 }
5320
5321 #[test]
5322 fn test_cache_only_unsolicited() {
5323 let service_type = "_c_unsolicit._udp.local.";
5324 let instance = "test_instance";
5325 let host_name = "c_unsolicit_host.local.";
5326 let service_ip_addr = my_ip_interfaces(false)
5327 .iter()
5328 .find(|iface| iface.ip().is_ipv4())
5329 .map(|iface| iface.ip())
5330 .unwrap();
5331
5332 let my_service = ServiceInfo::new(
5333 service_type,
5334 instance,
5335 host_name,
5336 service_ip_addr,
5337 5023,
5338 None,
5339 )
5340 .unwrap();
5341
5342 let mdns_server = ServiceDaemon::new().expect("Failed to create mdns server");
5344 let result = mdns_server.register(my_service);
5345 assert!(result.is_ok());
5346
5347 let mdns_client = ServiceDaemon::new().expect("Failed to create mdns client");
5348 mdns_client.accept_unsolicited(true).unwrap();
5349
5350 std::thread::sleep(Duration::from_secs(2));
5353 let browse_chan = mdns_client.browse_cache(service_type).unwrap();
5354 let timeout = Duration::from_millis(1500); let mut resolved = false;
5356
5357 while let Ok(event) = browse_chan.recv_timeout(timeout) {
5359 if let ServiceEvent::ServiceResolved(info) = event {
5360 resolved = true;
5361 println!("Resolved a service of {}", &info.get_fullname());
5362 break;
5363 }
5364 }
5365
5366 assert!(resolved);
5367
5368 mdns_server.shutdown().unwrap();
5370 mdns_client.shutdown().unwrap();
5371 }
5372
5373 #[test]
5374 fn test_custom_port_isolation() {
5375 let service_type = "_custom_port._udp.local.";
5380 let instance_custom = "custom_port_instance";
5381 let instance_default = "default_port_instance";
5382 let host_name = "custom_port_host.local.";
5383
5384 let service_ip_addr = my_ip_interfaces(false)
5385 .iter()
5386 .find(|iface| iface.ip().is_ipv4())
5387 .map(|iface| iface.ip())
5388 .expect("Test requires an IPv4 interface");
5389
5390 let service_custom = ServiceInfo::new(
5392 service_type,
5393 instance_custom,
5394 host_name,
5395 service_ip_addr,
5396 8080,
5397 None,
5398 )
5399 .unwrap();
5400
5401 let service_default = ServiceInfo::new(
5403 service_type,
5404 instance_default,
5405 host_name,
5406 service_ip_addr,
5407 8081,
5408 None,
5409 )
5410 .unwrap();
5411
5412 let custom_port = 5454u16;
5414 let server_custom =
5415 ServiceDaemon::new_with_port(custom_port).expect("Failed to create custom port server");
5416 let client_custom =
5417 ServiceDaemon::new_with_port(custom_port).expect("Failed to create custom port client");
5418
5419 let server_default = ServiceDaemon::new().expect("Failed to create default port server");
5421
5422 server_custom
5424 .register(service_custom.clone())
5425 .expect("Failed to register custom port service");
5426
5427 server_default
5429 .register(service_default.clone())
5430 .expect("Failed to register default port service");
5431
5432 let browse_custom = client_custom
5434 .browse(service_type)
5435 .expect("Failed to browse on custom port");
5436
5437 let timeout = Duration::from_secs(3);
5438 let mut found_custom = false;
5439 let mut found_default_on_custom = false;
5440
5441 while let Ok(event) = browse_custom.recv_timeout(timeout) {
5443 if let ServiceEvent::ServiceResolved(info) = event {
5444 println!(
5445 "Custom port client resolved: {} on port {}",
5446 info.get_fullname(),
5447 info.get_port()
5448 );
5449 if info.get_fullname().starts_with(instance_custom) {
5450 found_custom = true;
5451 assert_eq!(info.get_port(), 8080);
5452 }
5453 if info.get_fullname().starts_with(instance_default) {
5454 found_default_on_custom = true;
5455 }
5456 }
5457 }
5458
5459 assert!(
5460 found_custom,
5461 "Custom port client should find service on custom port"
5462 );
5463 assert!(
5464 !found_default_on_custom,
5465 "Custom port client should NOT find service on default port"
5466 );
5467
5468 let client_default = ServiceDaemon::new().expect("Failed to create default port client");
5471 let browse_default = client_default
5472 .browse(service_type)
5473 .expect("Failed to browse on default port");
5474
5475 let mut found_default = false;
5476 let mut found_custom_on_default = false;
5477
5478 while let Ok(event) = browse_default.recv_timeout(timeout) {
5479 if let ServiceEvent::ServiceResolved(info) = event {
5480 println!(
5481 "Default port client resolved: {} on port {}",
5482 info.get_fullname(),
5483 info.get_port()
5484 );
5485 if info.get_fullname().starts_with(instance_default) {
5486 found_default = true;
5487 assert_eq!(info.get_port(), 8081);
5488 }
5489 if info.get_fullname().starts_with(instance_custom) {
5490 found_custom_on_default = true;
5491 }
5492 }
5493 }
5494
5495 assert!(
5496 found_default,
5497 "Default port client should find service on default port"
5498 );
5499 assert!(
5500 !found_custom_on_default,
5501 "Default port client should NOT find service on custom port"
5502 );
5503
5504 server_custom.shutdown().unwrap();
5506 client_custom.shutdown().unwrap();
5507 server_default.shutdown().unwrap();
5508 client_default.shutdown().unwrap();
5509 }
5510}