1#[cfg(feature = "logging")]
32use crate::log::{debug, error, trace};
33use crate::{
34 current_time_millis,
35 dns_cache::{DnsCache, IpType},
36 dns_parser::{
37 ip_address_rr_type, DnsAddress, DnsEntryExt, DnsIncoming, DnsOutgoing, DnsPointer,
38 DnsRecordBox, DnsRecordExt, DnsSrv, DnsTxt, InterfaceId, RRType, ScopedIp,
39 CLASS_CACHE_FLUSH, CLASS_IN, FLAGS_AA, FLAGS_QR_QUERY, FLAGS_QR_RESPONSE, MAX_MSG_ABSOLUTE,
40 },
41 error::{e_fmt, Error, Result},
42 service_info::{DnsRegistry, MyIntf, Probe, ServiceInfo, ServiceStatus},
43 Receiver, ResolvedService, TxtProperties,
44};
45use flume::{bounded, Sender, TrySendError};
46use if_addrs::{IfAddr, Interface};
47use mio::{event::Source, net::UdpSocket as MioUdpSocket, Interest, Poll, Registry, Token};
48use socket2::Domain;
49use socket_pktinfo::PktInfoUdpSocket;
50use std::{
51 cmp::{self, Reverse},
52 collections::{hash_map::Entry, BinaryHeap, HashMap, HashSet},
53 fmt, io,
54 net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6, UdpSocket},
55 str, thread,
56 time::Duration,
57 vec,
58};
59
60pub const SERVICE_NAME_LEN_MAX_DEFAULT: u8 = 15;
64
65pub const IP_CHECK_INTERVAL_IN_SECS_DEFAULT: u32 = 5;
67
68pub const VERIFY_TIMEOUT_DEFAULT: Duration = Duration::from_secs(10);
71
72pub const MDNS_PORT: u16 = 5353;
74
75const GROUP_ADDR_V4: Ipv4Addr = Ipv4Addr::new(224, 0, 0, 251);
76const GROUP_ADDR_V6: Ipv6Addr = Ipv6Addr::new(0xff02, 0, 0, 0, 0, 0, 0, 0xfb);
77const LOOPBACK_V4: Ipv4Addr = Ipv4Addr::new(127, 0, 0, 1);
78
79const RESOLVE_WAIT_IN_MILLIS: u64 = 500;
80
81#[derive(Debug)]
83pub enum UnregisterStatus {
84 OK,
86 NotFound,
88}
89
90#[derive(Debug, PartialEq, Clone, Eq)]
92#[non_exhaustive]
93pub enum DaemonStatus {
94 Running,
96
97 Shutdown,
99}
100
101#[derive(Hash, Eq, PartialEq)]
104enum Counter {
105 Register,
106 RegisterResend,
107 Unregister,
108 UnregisterResend,
109 Browse,
110 ResolveHostname,
111 Respond,
112 CacheRefreshPTR,
113 CacheRefreshSrvTxt,
114 CacheRefreshAddr,
115 KnownAnswerSuppression,
116 CachedPTR,
117 CachedSRV,
118 CachedAddr,
119 CachedTxt,
120 CachedNSec,
121 CachedSubtype,
122 DnsRegistryProbe,
123 DnsRegistryActive,
124 DnsRegistryTimer,
125 DnsRegistryNameChange,
126 Timer,
127}
128
129impl fmt::Display for Counter {
130 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
131 match self {
132 Self::Register => write!(f, "register"),
133 Self::RegisterResend => write!(f, "register-resend"),
134 Self::Unregister => write!(f, "unregister"),
135 Self::UnregisterResend => write!(f, "unregister-resend"),
136 Self::Browse => write!(f, "browse"),
137 Self::ResolveHostname => write!(f, "resolve-hostname"),
138 Self::Respond => write!(f, "respond"),
139 Self::CacheRefreshPTR => write!(f, "cache-refresh-ptr"),
140 Self::CacheRefreshSrvTxt => write!(f, "cache-refresh-srv-txt"),
141 Self::CacheRefreshAddr => write!(f, "cache-refresh-addr"),
142 Self::KnownAnswerSuppression => write!(f, "known-answer-suppression"),
143 Self::CachedPTR => write!(f, "cached-ptr"),
144 Self::CachedSRV => write!(f, "cached-srv"),
145 Self::CachedAddr => write!(f, "cached-addr"),
146 Self::CachedTxt => write!(f, "cached-txt"),
147 Self::CachedNSec => write!(f, "cached-nsec"),
148 Self::CachedSubtype => write!(f, "cached-subtype"),
149 Self::DnsRegistryProbe => write!(f, "dns-registry-probe"),
150 Self::DnsRegistryActive => write!(f, "dns-registry-active"),
151 Self::DnsRegistryTimer => write!(f, "dns-registry-timer"),
152 Self::DnsRegistryNameChange => write!(f, "dns-registry-name-change"),
153 Self::Timer => write!(f, "timer"),
154 }
155 }
156}
157
158#[derive(Debug)]
159enum InternalError {
160 IntfAddrInvalid(Interface),
161}
162
163impl fmt::Display for InternalError {
164 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
165 match self {
166 InternalError::IntfAddrInvalid(iface) => write!(f, "interface addr invalid: {iface:?}"),
167 }
168 }
169}
170
171type MyResult<T> = core::result::Result<T, InternalError>;
172
173struct MyUdpSocket {
178 pktinfo: PktInfoUdpSocket,
181
182 mio: MioUdpSocket,
185}
186
187impl MyUdpSocket {
188 pub fn new(pktinfo: PktInfoUdpSocket) -> io::Result<Self> {
189 let std_sock = pktinfo.try_clone_std()?;
190 let mio = MioUdpSocket::from_std(std_sock);
191
192 Ok(Self { pktinfo, mio })
193 }
194}
195
196impl Source for MyUdpSocket {
198 fn register(
199 &mut self,
200 registry: &Registry,
201 token: Token,
202 interests: Interest,
203 ) -> io::Result<()> {
204 self.mio.register(registry, token, interests)
205 }
206
207 fn reregister(
208 &mut self,
209 registry: &Registry,
210 token: Token,
211 interests: Interest,
212 ) -> io::Result<()> {
213 self.mio.reregister(registry, token, interests)
214 }
215
216 fn deregister(&mut self, registry: &Registry) -> std::io::Result<()> {
217 self.mio.deregister(registry)
218 }
219}
220
221pub type Metrics = HashMap<String, i64>;
224
225const IPV4_SOCK_EVENT_KEY: usize = 4; const IPV6_SOCK_EVENT_KEY: usize = 6; const SIGNAL_SOCK_EVENT_KEY: usize = usize::MAX - 1; #[derive(Clone)]
233pub struct ServiceDaemon {
234 sender: Sender<Command>,
236
237 signal_addr: SocketAddr,
243}
244
245impl ServiceDaemon {
246 pub fn new() -> Result<Self> {
252 Self::new_with_port(MDNS_PORT)
253 }
254
255 pub fn new_with_port(port: u16) -> Result<Self> {
278 let signal_addr = SocketAddrV4::new(LOOPBACK_V4, 0);
281
282 let signal_sock = UdpSocket::bind(signal_addr)
283 .map_err(|e| e_fmt!("failed to create signal_sock for daemon: {}", e))?;
284
285 let signal_addr = signal_sock
287 .local_addr()
288 .map_err(|e| e_fmt!("failed to get signal sock addr: {}", e))?;
289
290 signal_sock
292 .set_nonblocking(true)
293 .map_err(|e| e_fmt!("failed to set nonblocking for signal socket: {}", e))?;
294
295 let poller = Poll::new().map_err(|e| e_fmt!("failed to create mio Poll: {e}"))?;
296
297 let (sender, receiver) = bounded(100);
298
299 let mio_sock = MioUdpSocket::from_std(signal_sock);
301 let cmd_sender = sender.clone();
302 thread::Builder::new()
303 .name("mDNS_daemon".to_string())
304 .spawn(move || {
305 Self::daemon_thread(mio_sock, poller, receiver, port, cmd_sender, signal_addr)
306 })
307 .map_err(|e| e_fmt!("thread builder failed to spawn: {}", e))?;
308
309 Ok(Self {
310 sender,
311 signal_addr,
312 })
313 }
314
315 fn send_cmd(&self, cmd: Command) -> Result<()> {
318 let cmd_name = cmd.to_string();
319
320 self.sender.try_send(cmd).map_err(|e| match e {
322 TrySendError::Full(_) => Error::Again,
323 e => e_fmt!("flume::channel::send failed: {}", e),
324 })?;
325
326 let addr = SocketAddrV4::new(LOOPBACK_V4, 0);
328 let socket = UdpSocket::bind(addr)
329 .map_err(|e| e_fmt!("Failed to create socket to send signal: {}", e))?;
330 socket
331 .send_to(cmd_name.as_bytes(), self.signal_addr)
332 .map_err(|e| {
333 e_fmt!(
334 "signal socket send_to {} ({}) failed: {}",
335 self.signal_addr,
336 cmd_name,
337 e
338 )
339 })?;
340
341 Ok(())
342 }
343
344 pub fn browse(&self, service_type: &str) -> Result<Receiver<ServiceEvent>> {
355 check_domain_suffix(service_type)?;
356
357 let (resp_s, resp_r) = bounded(10);
358 self.send_cmd(Command::Browse(service_type.to_string(), 1, false, resp_s))?;
359 Ok(resp_r)
360 }
361
362 pub fn browse_cache(&self, service_type: &str) -> Result<Receiver<ServiceEvent>> {
371 check_domain_suffix(service_type)?;
372
373 let (resp_s, resp_r) = bounded(10);
374 self.send_cmd(Command::Browse(service_type.to_string(), 1, true, resp_s))?;
375 Ok(resp_r)
376 }
377
378 pub fn stop_browse(&self, ty_domain: &str) -> Result<()> {
383 self.send_cmd(Command::StopBrowse(ty_domain.to_string()))
384 }
385
386 pub fn resolve_hostname(
394 &self,
395 hostname: &str,
396 timeout: Option<u64>,
397 ) -> Result<Receiver<HostnameResolutionEvent>> {
398 check_hostname(hostname)?;
399 let (resp_s, resp_r) = bounded(10);
400 self.send_cmd(Command::ResolveHostname(
401 hostname.to_string(),
402 1,
403 resp_s,
404 timeout,
405 ))?;
406 Ok(resp_r)
407 }
408
409 pub fn stop_resolve_hostname(&self, hostname: &str) -> Result<()> {
414 self.send_cmd(Command::StopResolveHostname(hostname.to_string()))
415 }
416
417 pub fn register(&self, service_info: ServiceInfo) -> Result<()> {
425 check_service_name(service_info.get_fullname())?;
426 check_hostname(service_info.get_hostname())?;
427
428 self.send_cmd(Command::Register(service_info.into()))
429 }
430
431 pub fn unregister(&self, fullname: &str) -> Result<Receiver<UnregisterStatus>> {
439 let (resp_s, resp_r) = bounded(1);
440 self.send_cmd(Command::Unregister(fullname.to_lowercase(), resp_s))?;
441 Ok(resp_r)
442 }
443
444 pub fn monitor(&self) -> Result<Receiver<DaemonEvent>> {
448 let (resp_s, resp_r) = bounded(100);
449 self.send_cmd(Command::Monitor(resp_s))?;
450 Ok(resp_r)
451 }
452
453 pub fn shutdown(&self) -> Result<Receiver<DaemonStatus>> {
458 let (resp_s, resp_r) = bounded(1);
459 self.send_cmd(Command::Exit(resp_s))?;
460 Ok(resp_r)
461 }
462
463 pub fn status(&self) -> Result<Receiver<DaemonStatus>> {
469 let (resp_s, resp_r) = bounded(1);
470
471 if self.sender.is_disconnected() {
472 resp_s
473 .send(DaemonStatus::Shutdown)
474 .map_err(|e| e_fmt!("failed to send daemon status to the client: {}", e))?;
475 } else {
476 self.send_cmd(Command::GetStatus(resp_s))?;
477 }
478
479 Ok(resp_r)
480 }
481
482 pub fn get_metrics(&self) -> Result<Receiver<Metrics>> {
487 let (resp_s, resp_r) = bounded(1);
488 self.send_cmd(Command::GetMetrics(resp_s))?;
489 Ok(resp_r)
490 }
491
492 pub fn set_service_name_len_max(&self, len_max: u8) -> Result<()> {
499 const SERVICE_NAME_LEN_MAX_LIMIT: u8 = 30; if len_max > SERVICE_NAME_LEN_MAX_LIMIT {
502 return Err(Error::Msg(format!(
503 "service name length max {len_max} is too large"
504 )));
505 }
506
507 self.send_cmd(Command::SetOption(DaemonOption::ServiceNameLenMax(len_max)))
508 }
509
510 pub fn set_ip_check_interval(&self, interval_in_secs: u32) -> Result<()> {
516 let interval_in_millis = interval_in_secs as u64 * 1000;
517 self.send_cmd(Command::SetOption(DaemonOption::IpCheckInterval(
518 interval_in_millis,
519 )))
520 }
521
522 pub fn get_ip_check_interval(&self) -> Result<u32> {
524 let (resp_s, resp_r) = bounded(1);
525 self.send_cmd(Command::GetOption(resp_s))?;
526
527 let option = resp_r
528 .recv_timeout(Duration::from_secs(10))
529 .map_err(|e| e_fmt!("failed to receive ip check interval: {}", e))?;
530 let ip_check_interval_in_secs = option.ip_check_interval / 1000;
531 Ok(ip_check_interval_in_secs as u32)
532 }
533
534 pub fn enable_interface(&self, if_kind: impl IntoIfKindVec) -> Result<()> {
541 let if_kind_vec = if_kind.into_vec();
542 self.send_cmd(Command::SetOption(DaemonOption::EnableInterface(
543 if_kind_vec.kinds,
544 )))
545 }
546
547 pub fn disable_interface(&self, if_kind: impl IntoIfKindVec) -> Result<()> {
554 let if_kind_vec = if_kind.into_vec();
555 self.send_cmd(Command::SetOption(DaemonOption::DisableInterface(
556 if_kind_vec.kinds,
557 )))
558 }
559
560 pub fn accept_unsolicited(&self, accept: bool) -> Result<()> {
571 self.send_cmd(Command::SetOption(DaemonOption::AcceptUnsolicited(accept)))
572 }
573
574 pub fn include_apple_p2p(&self, include: bool) -> Result<()> {
577 self.send_cmd(Command::SetOption(DaemonOption::IncludeAppleP2P(include)))
578 }
579
580 #[cfg(test)]
581 pub fn test_down_interface(&self, ifname: &str) -> Result<()> {
582 self.send_cmd(Command::SetOption(DaemonOption::TestDownInterface(
583 ifname.to_string(),
584 )))
585 }
586
587 #[cfg(test)]
588 pub fn test_up_interface(&self, ifname: &str) -> Result<()> {
589 self.send_cmd(Command::SetOption(DaemonOption::TestUpInterface(
590 ifname.to_string(),
591 )))
592 }
593
594 pub fn set_multicast_loop_v4(&self, on: bool) -> Result<()> {
610 self.send_cmd(Command::SetOption(DaemonOption::MulticastLoopV4(on)))
611 }
612
613 pub fn set_multicast_loop_v6(&self, on: bool) -> Result<()> {
629 self.send_cmd(Command::SetOption(DaemonOption::MulticastLoopV6(on)))
630 }
631
632 pub fn verify(&self, instance_fullname: String, timeout: Duration) -> Result<()> {
645 self.send_cmd(Command::Verify(instance_fullname, timeout))
646 }
647
648 fn daemon_thread(
649 signal_sock: MioUdpSocket,
650 poller: Poll,
651 receiver: Receiver<Command>,
652 port: u16,
653 cmd_sender: Sender<Command>,
654 signal_addr: SocketAddr,
655 ) {
656 let mut zc = Zeroconf::new(signal_sock, poller, port, cmd_sender, signal_addr);
657
658 if let Some(cmd) = zc.run(receiver) {
659 match cmd {
660 Command::Exit(resp_s) => {
661 if let Err(e) = resp_s.send(DaemonStatus::Shutdown) {
664 debug!("exit: failed to send response of shutdown: {}", e);
665 }
666 }
667 _ => {
668 debug!("Unexpected command: {:?}", cmd);
669 }
670 }
671 }
672 }
673}
674
675fn _new_socket_bind(intf: &Interface, should_loop: bool) -> Result<MyUdpSocket> {
677 let intf_ip = &intf.ip();
680 match intf_ip {
681 IpAddr::V4(ip) => {
682 let addr = SocketAddrV4::new(Ipv4Addr::new(0, 0, 0, 0), MDNS_PORT);
683 let sock = new_socket(addr.into(), true)?;
684
685 sock.join_multicast_v4(&GROUP_ADDR_V4, ip)
687 .map_err(|e| e_fmt!("join multicast group on addr {}: {}", intf_ip, e))?;
688
689 sock.set_multicast_if_v4(ip)
691 .map_err(|e| e_fmt!("set multicast_if on addr {}: {}", ip, e))?;
692
693 sock.set_multicast_ttl_v4(255)
698 .map_err(|e| e_fmt!("set set_multicast_ttl_v4 on addr {}: {}", ip, e))?;
699
700 if !should_loop {
701 sock.set_multicast_loop_v4(false)
702 .map_err(|e| e_fmt!("failed to set multicast loop v4 for {ip}: {e}"))?;
703 }
704
705 let multicast_addr = SocketAddrV4::new(GROUP_ADDR_V4, MDNS_PORT).into();
707 let test_packets = DnsOutgoing::new(0).to_data_on_wire();
708 for packet in test_packets {
709 sock.send_to(&packet, &multicast_addr)
710 .map_err(|e| e_fmt!("send multicast packet on addr {}: {}", ip, e))?;
711 }
712 MyUdpSocket::new(sock)
713 .map_err(|e| e_fmt!("failed to create MySocket for interface {}: {e}", intf.name))
714 }
715 IpAddr::V6(ip) => {
716 let addr = SocketAddrV6::new(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 0), MDNS_PORT, 0, 0);
717 let sock = new_socket(addr.into(), true)?;
718
719 let if_index = intf.index.unwrap_or(0);
720
721 sock.join_multicast_v6(&GROUP_ADDR_V6, if_index)
723 .map_err(|e| e_fmt!("join multicast group on addr {}: {}", ip, e))?;
724
725 sock.set_multicast_if_v6(if_index)
727 .map_err(|e| e_fmt!("set multicast_if on addr {}: {}", ip, e))?;
728
729 MyUdpSocket::new(sock)
734 .map_err(|e| e_fmt!("failed to create MySocket for interface {}: {e}", intf.name))
735 }
736 }
737}
738
739fn new_socket(addr: SocketAddr, non_block: bool) -> Result<PktInfoUdpSocket> {
742 let domain = match addr {
743 SocketAddr::V4(_) => socket2::Domain::IPV4,
744 SocketAddr::V6(_) => socket2::Domain::IPV6,
745 };
746
747 let fd = PktInfoUdpSocket::new(domain).map_err(|e| e_fmt!("create socket failed: {}", e))?;
748
749 fd.set_reuse_address(true)
750 .map_err(|e| e_fmt!("set ReuseAddr failed: {}", e))?;
751 #[cfg(unix)]
752 if let Err(e) = fd.set_reuse_port(true) {
753 debug!(
754 "SO_REUSEPORT is not supported, continuing without it: {}",
755 e
756 );
757 }
758
759 if non_block {
760 fd.set_nonblocking(true)
761 .map_err(|e| e_fmt!("set O_NONBLOCK: {}", e))?;
762 }
763
764 fd.bind(&addr.into())
765 .map_err(|e| e_fmt!("socket bind to {} failed: {}", &addr, e))?;
766
767 trace!("new socket bind to {}", &addr);
768 Ok(fd)
769}
770
771struct ReRun {
773 next_time: u64,
775 command: Command,
776}
777
778#[derive(Debug, Clone)]
782#[non_exhaustive]
783pub enum IfKind {
784 All,
786
787 IPv4,
789
790 IPv6,
792
793 Name(String),
795
796 Addr(IpAddr),
800
801 LoopbackV4,
805
806 LoopbackV6,
808
809 IndexV4(u32),
811
812 IndexV6(u32),
814}
815
816impl IfKind {
817 fn matches(&self, intf: &Interface) -> bool {
819 match self {
820 Self::All => true,
821 Self::IPv4 => intf.ip().is_ipv4(),
822 Self::IPv6 => intf.ip().is_ipv6(),
823 Self::Name(ifname) => ifname == &intf.name,
824 Self::Addr(addr) => addr == &intf.ip(),
825 Self::LoopbackV4 => intf.is_loopback() && intf.ip().is_ipv4(),
826 Self::LoopbackV6 => intf.is_loopback() && intf.ip().is_ipv6(),
827 Self::IndexV4(idx) => intf.index == Some(*idx) && intf.ip().is_ipv4(),
828 Self::IndexV6(idx) => intf.index == Some(*idx) && intf.ip().is_ipv6(),
829 }
830 }
831}
832
833impl From<&str> for IfKind {
836 fn from(val: &str) -> Self {
837 Self::Name(val.to_string())
838 }
839}
840
841impl From<&String> for IfKind {
842 fn from(val: &String) -> Self {
843 Self::Name(val.to_string())
844 }
845}
846
847impl From<IpAddr> for IfKind {
849 fn from(val: IpAddr) -> Self {
850 Self::Addr(val)
851 }
852}
853
854pub struct IfKindVec {
856 kinds: Vec<IfKind>,
857}
858
859pub trait IntoIfKindVec {
861 fn into_vec(self) -> IfKindVec;
862}
863
864impl<T: Into<IfKind>> IntoIfKindVec for T {
865 fn into_vec(self) -> IfKindVec {
866 let if_kind: IfKind = self.into();
867 IfKindVec {
868 kinds: vec![if_kind],
869 }
870 }
871}
872
873impl<T: Into<IfKind>> IntoIfKindVec for Vec<T> {
874 fn into_vec(self) -> IfKindVec {
875 let kinds: Vec<IfKind> = self.into_iter().map(|x| x.into()).collect();
876 IfKindVec { kinds }
877 }
878}
879
880struct IfSelection {
882 if_kind: IfKind,
884
885 selected: bool,
887}
888
889struct Zeroconf {
891 port: u16,
894
895 my_intfs: HashMap<u32, MyIntf>,
897
898 ipv4_sock: Option<MyUdpSocket>,
900
901 ipv6_sock: Option<MyUdpSocket>,
903
904 my_services: HashMap<String, ServiceInfo>,
906
907 cache: DnsCache,
909
910 dns_registry_map: HashMap<u32, DnsRegistry>,
912
913 service_queriers: HashMap<String, Sender<ServiceEvent>>, hostname_resolvers: HashMap<String, (Sender<HostnameResolutionEvent>, Option<u64>)>, retransmissions: Vec<ReRun>,
924
925 counters: Metrics,
926
927 poller: Poll,
929
930 monitors: Vec<Sender<DaemonEvent>>,
932
933 service_name_len_max: u8,
935
936 ip_check_interval: u64,
938
939 if_selections: Vec<IfSelection>,
941
942 signal_sock: MioUdpSocket,
944
945 timers: BinaryHeap<Reverse<u64>>,
951
952 status: DaemonStatus,
953
954 pending_resolves: HashSet<String>,
956
957 resolved: HashSet<String>,
959
960 multicast_loop_v4: bool,
961
962 multicast_loop_v6: bool,
963
964 accept_unsolicited: bool,
965
966 include_apple_p2p: bool,
967
968 cmd_sender: Sender<Command>,
969
970 signal_addr: SocketAddr,
971
972 #[cfg(test)]
973 test_down_interfaces: HashSet<String>,
974}
975
976fn join_multicast_group(my_sock: &PktInfoUdpSocket, intf: &Interface) -> Result<()> {
978 let intf_ip = &intf.ip();
979 match intf_ip {
980 IpAddr::V4(ip) => {
981 debug!("join multicast group V4 on {} addr {ip}", intf.name);
983 my_sock
984 .join_multicast_v4(&GROUP_ADDR_V4, ip)
985 .map_err(|e| e_fmt!("PKT join multicast group on addr {}: {}", intf_ip, e))?;
986 }
987 IpAddr::V6(ip) => {
988 let if_index = intf.index.unwrap_or(0);
989 debug!(
991 "join multicast group V6 on {} addr {ip} with index {if_index}",
992 intf.name
993 );
994 my_sock
995 .join_multicast_v6(&GROUP_ADDR_V6, if_index)
996 .map_err(|e| e_fmt!("PKT join multicast group on addr {}: {}", ip, e))?;
997 }
998 }
999 Ok(())
1000}
1001
1002impl Zeroconf {
1003 fn new(
1004 signal_sock: MioUdpSocket,
1005 poller: Poll,
1006 port: u16,
1007 cmd_sender: Sender<Command>,
1008 signal_addr: SocketAddr,
1009 ) -> Self {
1010 let my_ifaddrs = my_ip_interfaces(true);
1012
1013 let mut my_intfs = HashMap::new();
1017 let mut dns_registry_map = HashMap::new();
1018
1019 let mut ipv4_sock = None;
1022 let addr = SocketAddrV4::new(Ipv4Addr::new(0, 0, 0, 0), port);
1023 match new_socket(addr.into(), true) {
1024 Ok(sock) => {
1025 sock.set_multicast_ttl_v4(255)
1030 .map_err(|e| e_fmt!("set set_multicast_ttl_v4 on addr: {}", e))
1031 .ok();
1032
1033 ipv4_sock = match MyUdpSocket::new(sock) {
1035 Ok(s) => Some(s),
1036 Err(e) => {
1037 debug!("failed to create IPv4 MyUdpSocket: {e}");
1038 None
1039 }
1040 };
1041 }
1042 Err(e) => debug!("failed to create IPv4 socket: {e}"),
1044 }
1045
1046 let mut ipv6_sock = None;
1047 let addr = SocketAddrV6::new(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 0), port, 0, 0);
1048 match new_socket(addr.into(), true) {
1049 Ok(sock) => {
1050 sock.set_multicast_hops_v6(255)
1054 .map_err(|e| e_fmt!("set set_multicast_hops_v6: {}", e))
1055 .ok();
1056
1057 ipv6_sock = match MyUdpSocket::new(sock) {
1059 Ok(s) => Some(s),
1060 Err(e) => {
1061 debug!("failed to create IPv6 MyUdpSocket: {e}");
1062 None
1063 }
1064 };
1065 }
1066 Err(e) => debug!("failed to create IPv6 socket: {e}"),
1067 }
1068
1069 for intf in my_ifaddrs {
1071 let sock_opt = if intf.ip().is_ipv4() {
1072 &ipv4_sock
1073 } else {
1074 &ipv6_sock
1075 };
1076 let Some(sock) = sock_opt else {
1077 debug!(
1078 "no socket available for interface {} with addr {}. Skipped.",
1079 intf.name,
1080 intf.ip()
1081 );
1082 continue;
1083 };
1084
1085 if let Err(e) = join_multicast_group(&sock.pktinfo, &intf) {
1086 debug!("failed to join multicast: {}: {e}. Skipped.", &intf.ip());
1087 }
1088
1089 let if_index = intf.index.unwrap_or(0);
1090
1091 dns_registry_map
1093 .entry(if_index)
1094 .or_insert_with(DnsRegistry::new);
1095
1096 my_intfs
1097 .entry(if_index)
1098 .and_modify(|v: &mut MyIntf| {
1099 v.addrs.insert(intf.addr.clone());
1100 })
1101 .or_insert(MyIntf {
1102 name: intf.name.clone(),
1103 index: if_index,
1104 addrs: HashSet::from([intf.addr]),
1105 });
1106 }
1107
1108 let monitors = Vec::new();
1109 let service_name_len_max = SERVICE_NAME_LEN_MAX_DEFAULT;
1110 let ip_check_interval = IP_CHECK_INTERVAL_IN_SECS_DEFAULT as u64 * 1000;
1111
1112 let timers = BinaryHeap::new();
1113
1114 let if_selections = vec![];
1116
1117 let status = DaemonStatus::Running;
1118
1119 Self {
1120 port,
1121 my_intfs,
1122 ipv4_sock,
1123 ipv6_sock,
1124 my_services: HashMap::new(),
1125 cache: DnsCache::new(),
1126 dns_registry_map,
1127 hostname_resolvers: HashMap::new(),
1128 service_queriers: HashMap::new(),
1129 retransmissions: Vec::new(),
1130 counters: HashMap::new(),
1131 poller,
1132 monitors,
1133 service_name_len_max,
1134 ip_check_interval,
1135 if_selections,
1136 signal_sock,
1137 timers,
1138 status,
1139 pending_resolves: HashSet::new(),
1140 resolved: HashSet::new(),
1141 multicast_loop_v4: true,
1142 multicast_loop_v6: true,
1143 accept_unsolicited: false,
1144 include_apple_p2p: false,
1145 cmd_sender,
1146 signal_addr,
1147
1148 #[cfg(test)]
1149 test_down_interfaces: HashSet::new(),
1150 }
1151 }
1152
1153 fn send_cmd_to_self(&self, cmd: Command) -> Result<()> {
1155 let cmd_name = cmd.to_string();
1156
1157 self.cmd_sender.try_send(cmd).map_err(|e| match e {
1158 TrySendError::Full(_) => Error::Again,
1159 e => e_fmt!("flume::channel::send failed: {}", e),
1160 })?;
1161
1162 let addr = SocketAddrV4::new(LOOPBACK_V4, 0);
1163 let socket = UdpSocket::bind(addr)
1164 .map_err(|e| e_fmt!("Failed to create socket to send signal: {}", e))?;
1165 socket
1166 .send_to(cmd_name.as_bytes(), self.signal_addr)
1167 .map_err(|e| {
1168 e_fmt!(
1169 "signal socket send_to {} ({}) failed: {}",
1170 self.signal_addr,
1171 cmd_name,
1172 e
1173 )
1174 })?;
1175
1176 Ok(())
1177 }
1178
1179 fn cleanup(&mut self) {
1187 debug!("Starting cleanup for shutdown");
1188
1189 let service_names: Vec<String> = self.my_services.keys().cloned().collect();
1191 for fullname in service_names {
1192 if let Some(info) = self.my_services.get(&fullname) {
1193 debug!("Unregistering service during shutdown: {}", &fullname);
1194
1195 for intf in self.my_intfs.values() {
1196 if let Some(sock) = self.ipv4_sock.as_ref() {
1197 self.unregister_service(info, intf, &sock.pktinfo);
1198 }
1199
1200 if let Some(sock) = self.ipv6_sock.as_ref() {
1201 self.unregister_service(info, intf, &sock.pktinfo);
1202 }
1203 }
1204 }
1205 }
1206 self.my_services.clear();
1207
1208 let browse_types: Vec<String> = self.service_queriers.keys().cloned().collect();
1210 for ty_domain in browse_types {
1211 debug!("Stopping browse during shutdown: {}", &ty_domain);
1212 if let Some(sender) = self.service_queriers.remove(&ty_domain) {
1213 if let Err(e) = sender.send(ServiceEvent::SearchStopped(ty_domain.clone())) {
1215 debug!("Failed to send SearchStopped during shutdown: {}", e);
1216 }
1217 }
1218 }
1219
1220 let hostnames: Vec<String> = self.hostname_resolvers.keys().cloned().collect();
1222 for hostname in hostnames {
1223 debug!(
1224 "Stopping hostname resolution during shutdown: {}",
1225 &hostname
1226 );
1227 if let Some((sender, _timeout)) = self.hostname_resolvers.remove(&hostname) {
1228 if let Err(e) =
1230 sender.send(HostnameResolutionEvent::SearchStopped(hostname.clone()))
1231 {
1232 debug!(
1233 "Failed to send HostnameResolutionEvent::SearchStopped during shutdown: {}",
1234 e
1235 );
1236 }
1237 }
1238 }
1239
1240 self.retransmissions.clear();
1242
1243 debug!("Cleanup completed");
1244 }
1245
1246 fn run(&mut self, receiver: Receiver<Command>) -> Option<Command> {
1255 if let Err(e) = self.poller.registry().register(
1257 &mut self.signal_sock,
1258 mio::Token(SIGNAL_SOCK_EVENT_KEY),
1259 mio::Interest::READABLE,
1260 ) {
1261 debug!("failed to add signal socket to the poller: {}", e);
1262 return None;
1263 }
1264
1265 if let Some(sock) = self.ipv4_sock.as_mut() {
1266 if let Err(e) = self.poller.registry().register(
1267 sock,
1268 mio::Token(IPV4_SOCK_EVENT_KEY),
1269 mio::Interest::READABLE,
1270 ) {
1271 debug!("failed to register ipv4 socket: {}", e);
1272 return None;
1273 }
1274 }
1275
1276 if let Some(sock) = self.ipv6_sock.as_mut() {
1277 if let Err(e) = self.poller.registry().register(
1278 sock,
1279 mio::Token(IPV6_SOCK_EVENT_KEY),
1280 mio::Interest::READABLE,
1281 ) {
1282 debug!("failed to register ipv6 socket: {}", e);
1283 return None;
1284 }
1285 }
1286
1287 let mut next_ip_check = if self.ip_check_interval > 0 {
1289 current_time_millis() + self.ip_check_interval
1290 } else {
1291 0
1292 };
1293
1294 if next_ip_check > 0 {
1295 self.add_timer(next_ip_check);
1296 }
1297
1298 let mut events = mio::Events::with_capacity(1024);
1301 loop {
1302 let now = current_time_millis();
1303
1304 let earliest_timer = self.peek_earliest_timer();
1305 let timeout = earliest_timer.map(|timer| {
1306 let millis = if timer > now { timer - now } else { 1 };
1308 Duration::from_millis(millis)
1309 });
1310
1311 events.clear();
1313 match self.poller.poll(&mut events, timeout) {
1314 Ok(_) => self.handle_poller_events(&events),
1315 Err(e) => debug!("failed to select from sockets: {}", e),
1316 }
1317
1318 let now = current_time_millis();
1319
1320 self.pop_timers_till(now);
1322
1323 for hostname in self
1325 .hostname_resolvers
1326 .clone()
1327 .into_iter()
1328 .filter(|(_, (_, timeout))| timeout.map(|t| now >= t).unwrap_or(false))
1329 .map(|(hostname, _)| hostname)
1330 {
1331 trace!("hostname resolver timeout for {}", &hostname);
1332 call_hostname_resolution_listener(
1333 &self.hostname_resolvers,
1334 &hostname,
1335 HostnameResolutionEvent::SearchTimeout(hostname.to_owned()),
1336 );
1337 call_hostname_resolution_listener(
1338 &self.hostname_resolvers,
1339 &hostname,
1340 HostnameResolutionEvent::SearchStopped(hostname.to_owned()),
1341 );
1342 self.hostname_resolvers.remove(&hostname);
1343 }
1344
1345 while let Ok(command) = receiver.try_recv() {
1347 if matches!(command, Command::Exit(_)) {
1348 debug!("Exit command received, performing cleanup");
1349 self.cleanup();
1350 self.status = DaemonStatus::Shutdown;
1351 return Some(command);
1352 }
1353 self.exec_command(command, false);
1354 }
1355
1356 let mut i = 0;
1358 while i < self.retransmissions.len() {
1359 if now >= self.retransmissions[i].next_time {
1360 let rerun = self.retransmissions.remove(i);
1361 self.exec_command(rerun.command, true);
1362 } else {
1363 i += 1;
1364 }
1365 }
1366
1367 self.refresh_active_services();
1369
1370 let mut query_count = 0;
1372 for (hostname, _sender) in self.hostname_resolvers.iter() {
1373 for (hostname, ip_addr) in
1374 self.cache.refresh_due_hostname_resolutions(hostname).iter()
1375 {
1376 self.send_query(hostname, ip_address_rr_type(&ip_addr.to_ip_addr()));
1377 query_count += 1;
1378 }
1379 }
1380
1381 self.increase_counter(Counter::CacheRefreshAddr, query_count);
1382
1383 let now = current_time_millis();
1385
1386 let expired_services = self.cache.evict_expired_services(now);
1388 if !expired_services.is_empty() {
1389 debug!(
1390 "run: send {} service removal to listeners",
1391 expired_services.len()
1392 );
1393 self.notify_service_removal(expired_services);
1394 }
1395
1396 let expired_addrs = self.cache.evict_expired_addr(now);
1398 for (hostname, addrs) in expired_addrs {
1399 call_hostname_resolution_listener(
1400 &self.hostname_resolvers,
1401 &hostname,
1402 HostnameResolutionEvent::AddressesRemoved(hostname.clone(), addrs),
1403 );
1404 let instances = self.cache.get_instances_on_host(&hostname);
1405 let instance_set: HashSet<String> = instances.into_iter().collect();
1406 self.resolve_updated_instances(&instance_set);
1407 }
1408
1409 self.probing_handler();
1411
1412 if now >= next_ip_check && next_ip_check > 0 {
1414 next_ip_check = now + self.ip_check_interval;
1415 self.add_timer(next_ip_check);
1416
1417 self.check_ip_changes();
1418 }
1419 }
1420 }
1421
1422 fn process_set_option(&mut self, daemon_opt: DaemonOption) {
1423 match daemon_opt {
1424 DaemonOption::ServiceNameLenMax(length) => self.service_name_len_max = length,
1425 DaemonOption::IpCheckInterval(interval) => self.ip_check_interval = interval,
1426 DaemonOption::EnableInterface(if_kind) => self.enable_interface(if_kind),
1427 DaemonOption::DisableInterface(if_kind) => self.disable_interface(if_kind),
1428 DaemonOption::MulticastLoopV4(on) => self.set_multicast_loop_v4(on),
1429 DaemonOption::MulticastLoopV6(on) => self.set_multicast_loop_v6(on),
1430 DaemonOption::AcceptUnsolicited(accept) => self.set_accept_unsolicited(accept),
1431 DaemonOption::IncludeAppleP2P(enable) => self.set_apple_p2p(enable),
1432 #[cfg(test)]
1433 DaemonOption::TestDownInterface(ifname) => {
1434 self.test_down_interfaces.insert(ifname);
1435 }
1436 #[cfg(test)]
1437 DaemonOption::TestUpInterface(ifname) => {
1438 self.test_down_interfaces.remove(&ifname);
1439 }
1440 }
1441 }
1442
1443 fn enable_interface(&mut self, kinds: Vec<IfKind>) {
1444 debug!("enable_interface: {:?}", kinds);
1445 let interfaces = my_ip_interfaces_inner(true, self.include_apple_p2p);
1446
1447 for if_kind in kinds {
1448 self.if_selections.push(IfSelection {
1449 if_kind: resolve_addr_to_index(if_kind, &interfaces),
1450 selected: true,
1451 });
1452 }
1453
1454 self.apply_intf_selections(interfaces);
1455 }
1456
1457 fn disable_interface(&mut self, kinds: Vec<IfKind>) {
1458 debug!("disable_interface: {:?}", kinds);
1459 let interfaces = my_ip_interfaces_inner(true, self.include_apple_p2p);
1460
1461 for if_kind in kinds {
1462 self.if_selections.push(IfSelection {
1463 if_kind: resolve_addr_to_index(if_kind, &interfaces),
1464 selected: false,
1465 });
1466 }
1467
1468 self.apply_intf_selections(interfaces);
1469 }
1470
1471 fn set_multicast_loop_v4(&mut self, on: bool) {
1472 let Some(sock) = self.ipv4_sock.as_mut() else {
1473 return;
1474 };
1475 self.multicast_loop_v4 = on;
1476 sock.pktinfo
1477 .set_multicast_loop_v4(on)
1478 .map_err(|e| e_fmt!("failed to set multicast loop v4: {}", e))
1479 .unwrap();
1480 }
1481
1482 fn set_multicast_loop_v6(&mut self, on: bool) {
1483 let Some(sock) = self.ipv6_sock.as_mut() else {
1484 return;
1485 };
1486 self.multicast_loop_v6 = on;
1487 sock.pktinfo
1488 .set_multicast_loop_v6(on)
1489 .map_err(|e| e_fmt!("failed to set multicast loop v6: {}", e))
1490 .unwrap();
1491 }
1492
1493 fn set_accept_unsolicited(&mut self, accept: bool) {
1494 self.accept_unsolicited = accept;
1495 }
1496
1497 fn set_apple_p2p(&mut self, include: bool) {
1498 if self.include_apple_p2p != include {
1499 self.include_apple_p2p = include;
1500 self.apply_intf_selections(my_ip_interfaces_inner(true, self.include_apple_p2p));
1501 }
1502 }
1503
1504 fn notify_monitors(&mut self, event: DaemonEvent) {
1505 self.monitors.retain(|sender| {
1507 if let Err(e) = sender.try_send(event.clone()) {
1508 debug!("notify_monitors: try_send: {}", &e);
1509 if matches!(e, TrySendError::Disconnected(_)) {
1510 return false; }
1512 }
1513 true
1514 });
1515 }
1516
1517 fn del_addr_in_my_services(&mut self, addr: &IpAddr) {
1519 for (_, service_info) in self.my_services.iter_mut() {
1520 if service_info.is_addr_auto() {
1521 service_info.remove_ipaddr(addr);
1522 }
1523 }
1524 }
1525
1526 fn add_timer(&mut self, next_time: u64) {
1527 self.timers.push(Reverse(next_time));
1528 }
1529
1530 fn peek_earliest_timer(&self) -> Option<u64> {
1531 self.timers.peek().map(|Reverse(v)| *v)
1532 }
1533
1534 fn _pop_earliest_timer(&mut self) -> Option<u64> {
1535 self.timers.pop().map(|Reverse(v)| v)
1536 }
1537
1538 fn pop_timers_till(&mut self, now: u64) {
1540 while let Some(Reverse(v)) = self.timers.peek() {
1541 if *v > now {
1542 break;
1543 }
1544 self.timers.pop();
1545 }
1546 }
1547
1548 fn selected_intfs(&self, interfaces: Vec<Interface>) -> HashSet<Interface> {
1550 let intf_count = interfaces.len();
1551 let mut intf_selections = vec![true; intf_count];
1552
1553 for selection in self.if_selections.iter() {
1555 for i in 0..intf_count {
1557 if selection.if_kind.matches(&interfaces[i]) {
1558 intf_selections[i] = selection.selected;
1559 }
1560 }
1561 }
1562
1563 let mut selected_addrs = HashSet::new();
1564 for i in 0..intf_count {
1565 if intf_selections[i] {
1566 selected_addrs.insert(interfaces[i].clone());
1567 }
1568 }
1569
1570 selected_addrs
1571 }
1572
1573 fn apply_intf_selections(&mut self, interfaces: Vec<Interface>) {
1578 let intf_count = interfaces.len();
1580 let mut intf_selections = vec![true; intf_count];
1581
1582 for selection in self.if_selections.iter() {
1584 for i in 0..intf_count {
1586 if selection.if_kind.matches(&interfaces[i]) {
1587 intf_selections[i] = selection.selected;
1588 }
1589 }
1590 }
1591
1592 for (idx, intf) in interfaces.into_iter().enumerate() {
1594 if intf_selections[idx] {
1595 self.add_interface(intf);
1597 } else {
1598 self.del_interface_addr(&intf);
1600 }
1601 }
1602 }
1603
1604 fn del_ip(&mut self, ip: IpAddr) {
1605 self.del_addr_in_my_services(&ip);
1606 self.notify_monitors(DaemonEvent::IpDel(ip));
1607 }
1608
1609 fn check_ip_changes(&mut self) {
1611 let my_ifaddrs = my_ip_interfaces_inner(true, self.include_apple_p2p);
1613
1614 #[cfg(test)]
1615 let my_ifaddrs: Vec<_> = my_ifaddrs
1616 .into_iter()
1617 .filter(|intf| !self.test_down_interfaces.contains(&intf.name))
1618 .collect();
1619
1620 let ifaddrs_map: HashMap<u32, Vec<&IfAddr>> =
1621 my_ifaddrs.iter().fold(HashMap::new(), |mut acc, intf| {
1622 let if_index = intf.index.unwrap_or(0);
1623 acc.entry(if_index).or_default().push(&intf.addr);
1624 acc
1625 });
1626
1627 let mut deleted_intfs = Vec::new();
1628 let mut deleted_ips = Vec::new();
1629
1630 for (if_index, my_intf) in self.my_intfs.iter_mut() {
1631 let mut last_ipv4 = None;
1632 let mut last_ipv6 = None;
1633
1634 if let Some(current_addrs) = ifaddrs_map.get(if_index) {
1635 my_intf.addrs.retain(|addr| {
1636 if current_addrs.contains(&addr) {
1637 true
1638 } else {
1639 match addr.ip() {
1640 IpAddr::V4(ipv4) => last_ipv4 = Some(ipv4),
1641 IpAddr::V6(ipv6) => last_ipv6 = Some(ipv6),
1642 }
1643 deleted_ips.push(addr.ip());
1644 false
1645 }
1646 });
1647 if my_intf.addrs.is_empty() {
1648 deleted_intfs.push((*if_index, last_ipv4, last_ipv6))
1649 }
1650 } else {
1651 debug!(
1653 "check_ip_changes: interface {} ({}) no longer exists, removing",
1654 my_intf.name, if_index
1655 );
1656 for addr in my_intf.addrs.iter() {
1657 match addr.ip() {
1658 IpAddr::V4(ipv4) => last_ipv4 = Some(ipv4),
1659 IpAddr::V6(ipv6) => last_ipv6 = Some(ipv6),
1660 }
1661 deleted_ips.push(addr.ip())
1662 }
1663 deleted_intfs.push((*if_index, last_ipv4, last_ipv6));
1664 }
1665 }
1666
1667 if !deleted_ips.is_empty() || !deleted_intfs.is_empty() {
1668 debug!(
1669 "check_ip_changes: {} deleted ips {} deleted intfs",
1670 deleted_ips.len(),
1671 deleted_intfs.len()
1672 );
1673 }
1674
1675 for ip in deleted_ips {
1676 self.del_ip(ip);
1677 }
1678
1679 for (if_index, last_ipv4, last_ipv6) in deleted_intfs {
1680 let Some(my_intf) = self.my_intfs.remove(&if_index) else {
1681 continue;
1682 };
1683
1684 if let Some(ipv4) = last_ipv4 {
1685 debug!("leave multicast for {ipv4}");
1686 if let Some(sock) = self.ipv4_sock.as_mut() {
1687 if let Err(e) = sock.pktinfo.leave_multicast_v4(&GROUP_ADDR_V4, &ipv4) {
1688 debug!("leave multicast group for addr {ipv4}: {e}");
1689 }
1690 }
1691 }
1692
1693 if let Some(ipv6) = last_ipv6 {
1694 debug!("leave multicast for {ipv6}");
1695 if let Some(sock) = self.ipv6_sock.as_mut() {
1696 if let Err(e) = sock
1697 .pktinfo
1698 .leave_multicast_v6(&GROUP_ADDR_V6, my_intf.index)
1699 {
1700 debug!("leave multicast group for IPv6: {ipv6}: {e}");
1701 }
1702 }
1703 }
1704
1705 let intf_id = InterfaceId {
1707 name: my_intf.name.to_string(),
1708 index: my_intf.index,
1709 };
1710 let result = self.cache.remove_records_on_intf(intf_id);
1711 self.notify_service_removal(result.removed_instances);
1712 self.resolve_updated_instances(&result.modified_instances);
1713 }
1714
1715 self.apply_intf_selections(my_ifaddrs);
1717 }
1718
1719 fn del_interface_addr(&mut self, intf: &Interface) {
1722 let if_index = intf.index.unwrap_or(0);
1723 debug!(
1724 "del_interface_addr: {} ({if_index}) addr {}",
1725 intf.name,
1726 intf.ip()
1727 );
1728
1729 let Some(my_intf) = self.my_intfs.get_mut(&if_index) else {
1730 debug!("del_interface_addr: interface {} not found", intf.name);
1731 return;
1732 };
1733
1734 let mut ip_removed = false;
1735
1736 if my_intf.addrs.remove(&intf.addr) {
1737 ip_removed = true;
1738
1739 match intf.addr.ip() {
1740 IpAddr::V4(ipv4) => {
1741 if my_intf.next_ifaddr_v4().is_none() {
1742 if let Some(sock) = self.ipv4_sock.as_mut() {
1743 if let Err(e) = sock.pktinfo.leave_multicast_v4(&GROUP_ADDR_V4, &ipv4) {
1744 debug!("leave multicast group for addr {ipv4}: {e}");
1745 } else {
1746 debug!("leave multicast for {ipv4}");
1747 }
1748 }
1749 }
1750 }
1751
1752 IpAddr::V6(ipv6) => {
1753 if my_intf.next_ifaddr_v6().is_none() {
1754 if let Some(sock) = self.ipv6_sock.as_mut() {
1755 if let Err(e) =
1756 sock.pktinfo.leave_multicast_v6(&GROUP_ADDR_V6, if_index)
1757 {
1758 debug!("leave multicast group for addr {ipv6}: {e}");
1759 }
1760 }
1761 }
1762 }
1763 }
1764
1765 if my_intf.addrs.is_empty() {
1766 debug!("del_interface_addr: removing interface {}", intf.name);
1768 self.my_intfs.remove(&if_index);
1769 self.dns_registry_map.remove(&if_index);
1770 self.cache
1771 .remove_addrs_on_disabled_intf(if_index, IpType::BOTH);
1772 } else {
1773 let is_v4 = intf.addr.ip().is_ipv4();
1777 let version_gone = if is_v4 {
1778 my_intf.next_ifaddr_v4().is_none()
1779 } else {
1780 my_intf.next_ifaddr_v6().is_none()
1781 };
1782 if version_gone {
1783 let ip_type = if is_v4 { IpType::V4 } else { IpType::V6 };
1784 self.cache.remove_addrs_on_disabled_intf(if_index, ip_type);
1785 }
1786 }
1787 }
1788
1789 if ip_removed {
1790 self.notify_monitors(DaemonEvent::IpDel(intf.ip()));
1792 self.del_addr_in_my_services(&intf.ip());
1794 }
1795 }
1796
1797 fn add_interface(&mut self, intf: Interface) {
1798 let sock_opt = if intf.ip().is_ipv4() {
1799 &self.ipv4_sock
1800 } else {
1801 &self.ipv6_sock
1802 };
1803
1804 let Some(sock) = sock_opt else {
1805 debug!(
1806 "add_interface: no socket available for interface {} with addr {}. Skipped.",
1807 intf.name,
1808 intf.ip()
1809 );
1810 return;
1811 };
1812
1813 let if_index = intf.index.unwrap_or(0);
1814 let mut new_addr = false;
1815
1816 match self.my_intfs.entry(if_index) {
1817 Entry::Occupied(mut entry) => {
1818 let my_intf = entry.get_mut();
1820 if !my_intf.addrs.contains(&intf.addr) {
1821 if let Err(e) = join_multicast_group(&sock.pktinfo, &intf) {
1822 debug!("add_interface: socket_config {}: {e}", &intf.name);
1823 }
1824 my_intf.addrs.insert(intf.addr.clone());
1825 new_addr = true;
1826 }
1827 }
1828 Entry::Vacant(entry) => {
1829 if let Err(e) = join_multicast_group(&sock.pktinfo, &intf) {
1830 debug!("add_interface: socket_config {}: {e}. Skipped.", &intf.name);
1831 return;
1832 }
1833
1834 new_addr = true;
1835 let new_intf = MyIntf {
1836 name: intf.name.clone(),
1837 index: if_index,
1838 addrs: HashSet::from([intf.addr.clone()]),
1839 };
1840 entry.insert(new_intf);
1841 }
1842 }
1843
1844 if !new_addr {
1845 trace!("add_interface: interface {} already exists", &intf.name);
1846 return;
1847 }
1848
1849 debug!("add new interface {}: {}", intf.name, intf.ip());
1850
1851 let Some(my_intf) = self.my_intfs.get(&if_index) else {
1852 debug!("add_interface: cannot find if_index {if_index}");
1853 return;
1854 };
1855
1856 let dns_registry = match self.dns_registry_map.get_mut(&if_index) {
1857 Some(registry) => registry,
1858 None => self
1859 .dns_registry_map
1860 .entry(if_index)
1861 .or_insert_with(DnsRegistry::new),
1862 };
1863
1864 for (_, service_info) in self.my_services.iter_mut() {
1865 if service_info.is_addr_auto() {
1866 service_info.insert_ipaddr(&intf);
1867
1868 if let Ok(true) = announce_service_on_intf(
1869 dns_registry,
1870 service_info,
1871 my_intf,
1872 &sock.pktinfo,
1873 self.port,
1874 ) {
1875 debug!(
1876 "Announce service {} on {}",
1877 service_info.get_fullname(),
1878 intf.ip()
1879 );
1880 service_info.set_status(if_index, ServiceStatus::Announced);
1881 } else {
1882 for timer in dns_registry.new_timers.drain(..) {
1883 self.timers.push(Reverse(timer));
1884 }
1885 service_info.set_status(if_index, ServiceStatus::Probing);
1886 }
1887 }
1888 }
1889
1890 if let Some(my_intf) = self.my_intfs.get(&if_index) {
1895 for ty in self.service_queriers.keys() {
1896 self.send_query_on_intf(ty, RRType::PTR, my_intf);
1897 }
1898 }
1899
1900 self.notify_monitors(DaemonEvent::IpAdd(intf.ip()));
1902 }
1903
1904 fn register_service(&mut self, mut info: ServiceInfo) {
1913 if let Err(e) = check_service_name_length(info.get_type(), self.service_name_len_max) {
1915 error!("check_service_name_length: {}", &e);
1916 self.notify_monitors(DaemonEvent::Error(e));
1917 return;
1918 }
1919
1920 if info.is_addr_auto() {
1921 let selected_intfs =
1922 self.selected_intfs(my_ip_interfaces_inner(true, self.include_apple_p2p));
1923 for intf in selected_intfs {
1924 info.insert_ipaddr(&intf);
1925 }
1926 }
1927
1928 debug!("register service {:?}", &info);
1929
1930 let outgoing_addrs = self.send_unsolicited_response(&mut info);
1931 if !outgoing_addrs.is_empty() {
1932 self.notify_monitors(DaemonEvent::Announce(
1933 info.get_fullname().to_string(),
1934 format!("{:?}", &outgoing_addrs),
1935 ));
1936 }
1937
1938 let service_fullname = info.get_fullname().to_lowercase();
1941 self.my_services.insert(service_fullname, info);
1942 }
1943
1944 fn send_unsolicited_response(&mut self, info: &mut ServiceInfo) -> Vec<IpAddr> {
1947 let mut outgoing_addrs = Vec::new();
1948 let mut outgoing_intfs = HashSet::new();
1949
1950 let mut invalid_intf_addrs = HashSet::new();
1951
1952 for (if_index, intf) in self.my_intfs.iter() {
1953 let dns_registry = match self.dns_registry_map.get_mut(if_index) {
1954 Some(registry) => registry,
1955 None => self
1956 .dns_registry_map
1957 .entry(*if_index)
1958 .or_insert_with(DnsRegistry::new),
1959 };
1960
1961 let mut announced = false;
1962
1963 if let Some(sock) = self.ipv4_sock.as_mut() {
1965 match announce_service_on_intf(dns_registry, info, intf, &sock.pktinfo, self.port) {
1966 Ok(true) => {
1967 for addr in intf.addrs.iter().filter(|a| a.ip().is_ipv4()) {
1968 outgoing_addrs.push(addr.ip());
1969 }
1970 outgoing_intfs.insert(intf.index);
1971
1972 debug!(
1973 "Announce service IPv4 {} on {}",
1974 info.get_fullname(),
1975 intf.name
1976 );
1977 announced = true;
1978 }
1979 Ok(false) => {}
1980 Err(InternalError::IntfAddrInvalid(intf_addr)) => {
1981 invalid_intf_addrs.insert(intf_addr);
1982 }
1983 }
1984 }
1985
1986 if let Some(sock) = self.ipv6_sock.as_mut() {
1987 match announce_service_on_intf(dns_registry, info, intf, &sock.pktinfo, self.port) {
1988 Ok(true) => {
1989 for addr in intf.addrs.iter().filter(|a| a.ip().is_ipv6()) {
1990 outgoing_addrs.push(addr.ip());
1991 }
1992 outgoing_intfs.insert(intf.index);
1993
1994 debug!(
1995 "Announce service IPv6 {} on {}",
1996 info.get_fullname(),
1997 intf.name
1998 );
1999 announced = true;
2000 }
2001 Ok(false) => {}
2002 Err(InternalError::IntfAddrInvalid(intf_addr)) => {
2003 invalid_intf_addrs.insert(intf_addr);
2004 }
2005 }
2006 }
2007
2008 if announced {
2009 info.set_status(intf.index, ServiceStatus::Announced);
2010 } else {
2011 for timer in dns_registry.new_timers.drain(..) {
2012 self.timers.push(Reverse(timer));
2013 }
2014 info.set_status(*if_index, ServiceStatus::Probing);
2015 }
2016 }
2017
2018 if !invalid_intf_addrs.is_empty() {
2019 let _ = self.send_cmd_to_self(Command::InvalidIntfAddrs(invalid_intf_addrs));
2020 }
2021
2022 let next_time = current_time_millis() + 1000;
2026 for if_index in outgoing_intfs {
2027 self.add_retransmission(
2028 next_time,
2029 Command::RegisterResend(info.get_fullname().to_string(), if_index),
2030 );
2031 }
2032
2033 outgoing_addrs
2034 }
2035
2036 fn probing_handler(&mut self) {
2038 let now = current_time_millis();
2039 let mut invalid_intf_addrs = HashSet::new();
2040
2041 for (if_index, intf) in self.my_intfs.iter() {
2042 let Some(dns_registry) = self.dns_registry_map.get_mut(if_index) else {
2043 continue;
2044 };
2045
2046 let (out, expired_probes) = check_probing(dns_registry, &mut self.timers, now);
2047
2048 if !out.questions().is_empty() {
2050 trace!("sending out probing of questions: {:?}", out.questions());
2051 if let Some(sock) = self.ipv4_sock.as_mut() {
2052 if let Err(InternalError::IntfAddrInvalid(intf_addr)) =
2053 send_dns_outgoing(&out, intf, &sock.pktinfo, self.port)
2054 {
2055 invalid_intf_addrs.insert(intf_addr);
2056 }
2057 }
2058 if let Some(sock) = self.ipv6_sock.as_mut() {
2059 if let Err(InternalError::IntfAddrInvalid(intf_addr)) =
2060 send_dns_outgoing(&out, intf, &sock.pktinfo, self.port)
2061 {
2062 invalid_intf_addrs.insert(intf_addr);
2063 }
2064 }
2065 }
2066
2067 let waiting_services =
2069 handle_expired_probes(expired_probes, &intf.name, dns_registry, &mut self.monitors);
2070
2071 for service_name in waiting_services {
2072 if let Some(info) = self.my_services.get_mut(&service_name.to_lowercase()) {
2074 if info.get_status(*if_index) == ServiceStatus::Announced {
2075 debug!("service {} already announced", info.get_fullname());
2076 continue;
2077 }
2078
2079 let announced_v4 = if let Some(sock) = self.ipv4_sock.as_mut() {
2080 match announce_service_on_intf(
2081 dns_registry,
2082 info,
2083 intf,
2084 &sock.pktinfo,
2085 self.port,
2086 ) {
2087 Ok(announced) => announced,
2088 Err(InternalError::IntfAddrInvalid(intf_addr)) => {
2089 invalid_intf_addrs.insert(intf_addr);
2090 false
2091 }
2092 }
2093 } else {
2094 false
2095 };
2096 let announced_v6 = if let Some(sock) = self.ipv6_sock.as_mut() {
2097 match announce_service_on_intf(
2098 dns_registry,
2099 info,
2100 intf,
2101 &sock.pktinfo,
2102 self.port,
2103 ) {
2104 Ok(announced) => announced,
2105 Err(InternalError::IntfAddrInvalid(intf_addr)) => {
2106 invalid_intf_addrs.insert(intf_addr);
2107 false
2108 }
2109 }
2110 } else {
2111 false
2112 };
2113
2114 if announced_v4 || announced_v6 {
2115 let next_time = now + 1000;
2116 let command =
2117 Command::RegisterResend(info.get_fullname().to_string(), *if_index);
2118 self.retransmissions.push(ReRun { next_time, command });
2119 self.timers.push(Reverse(next_time));
2120
2121 let fullname = dns_registry.resolve_name(&service_name).to_string();
2122
2123 let hostname = dns_registry.resolve_name(info.get_hostname());
2124
2125 debug!("wake up: announce service {} on {}", fullname, intf.name);
2126 notify_monitors(
2127 &mut self.monitors,
2128 DaemonEvent::Announce(fullname, format!("{}:{}", hostname, &intf.name)),
2129 );
2130
2131 info.set_status(*if_index, ServiceStatus::Announced);
2132 }
2133 }
2134 }
2135 }
2136
2137 if !invalid_intf_addrs.is_empty() {
2138 let _ = self.send_cmd_to_self(Command::InvalidIntfAddrs(invalid_intf_addrs));
2139 }
2140 }
2141
2142 fn unregister_service(
2143 &self,
2144 info: &ServiceInfo,
2145 intf: &MyIntf,
2146 sock: &PktInfoUdpSocket,
2147 ) -> Vec<u8> {
2148 let is_ipv4 = sock.domain() == Domain::IPV4;
2149
2150 let mut out = DnsOutgoing::new(FLAGS_QR_RESPONSE | FLAGS_AA);
2151 out.add_answer_at_time(
2152 DnsPointer::new(
2153 info.get_type(),
2154 RRType::PTR,
2155 CLASS_IN,
2156 0,
2157 info.get_fullname().to_string(),
2158 ),
2159 0,
2160 );
2161
2162 if let Some(sub) = info.get_subtype() {
2163 trace!("Adding subdomain {}", sub);
2164 out.add_answer_at_time(
2165 DnsPointer::new(
2166 sub,
2167 RRType::PTR,
2168 CLASS_IN,
2169 0,
2170 info.get_fullname().to_string(),
2171 ),
2172 0,
2173 );
2174 }
2175
2176 out.add_answer_at_time(
2177 DnsSrv::new(
2178 info.get_fullname(),
2179 CLASS_IN | CLASS_CACHE_FLUSH,
2180 0,
2181 info.get_priority(),
2182 info.get_weight(),
2183 info.get_port(),
2184 info.get_hostname().to_string(),
2185 ),
2186 0,
2187 );
2188 out.add_answer_at_time(
2189 DnsTxt::new(
2190 info.get_fullname(),
2191 CLASS_IN | CLASS_CACHE_FLUSH,
2192 0,
2193 info.generate_txt(),
2194 ),
2195 0,
2196 );
2197
2198 let if_addrs = if is_ipv4 {
2199 info.get_addrs_on_my_intf_v4(intf)
2200 } else {
2201 info.get_addrs_on_my_intf_v6(intf)
2202 };
2203
2204 if if_addrs.is_empty() {
2205 return vec![];
2206 }
2207
2208 for address in if_addrs {
2209 out.add_answer_at_time(
2210 DnsAddress::new(
2211 info.get_hostname(),
2212 ip_address_rr_type(&address),
2213 CLASS_IN | CLASS_CACHE_FLUSH,
2214 0,
2215 address,
2216 intf.into(),
2217 ),
2218 0,
2219 );
2220 }
2221
2222 let sent_vec = match send_dns_outgoing(&out, intf, sock, self.port) {
2224 Ok(sent_vec) => sent_vec,
2225 Err(InternalError::IntfAddrInvalid(intf_addr)) => {
2226 let invalid_intf_addrs = HashSet::from([intf_addr]);
2227 let _ = self.send_cmd_to_self(Command::InvalidIntfAddrs(invalid_intf_addrs));
2228 vec![]
2229 }
2230 };
2231 sent_vec.into_iter().next().unwrap_or_default()
2232 }
2233
2234 fn add_hostname_resolver(
2238 &mut self,
2239 hostname: String,
2240 listener: Sender<HostnameResolutionEvent>,
2241 timeout: Option<u64>,
2242 ) {
2243 let real_timeout = timeout.map(|t| current_time_millis() + t);
2244 self.hostname_resolvers
2245 .insert(hostname.to_lowercase(), (listener, real_timeout));
2246 if let Some(t) = real_timeout {
2247 self.add_timer(t);
2248 }
2249 }
2250
2251 fn send_query(&self, name: &str, qtype: RRType) {
2253 self.send_query_vec(&[(name, qtype)]);
2254 }
2255
2256 fn send_query_on_intf(&self, name: &str, qtype: RRType, intf: &MyIntf) {
2261 let mut out = DnsOutgoing::new(FLAGS_QR_QUERY);
2262 out.add_question(name, qtype);
2263
2264 let mut invalid_intf_addrs = HashSet::new();
2265 if let Some(sock) = self.ipv4_sock.as_ref() {
2266 if let Err(InternalError::IntfAddrInvalid(intf_addr)) =
2267 send_dns_outgoing(&out, intf, &sock.pktinfo, self.port)
2268 {
2269 invalid_intf_addrs.insert(intf_addr);
2270 }
2271 }
2272 if let Some(sock) = self.ipv6_sock.as_ref() {
2273 if let Err(InternalError::IntfAddrInvalid(intf_addr)) =
2274 send_dns_outgoing(&out, intf, &sock.pktinfo, self.port)
2275 {
2276 invalid_intf_addrs.insert(intf_addr);
2277 }
2278 }
2279 if !invalid_intf_addrs.is_empty() {
2280 let _ = self.send_cmd_to_self(Command::InvalidIntfAddrs(invalid_intf_addrs));
2281 }
2282 }
2283
2284 fn send_query_vec(&self, questions: &[(&str, RRType)]) {
2286 let mut out = DnsOutgoing::new(FLAGS_QR_QUERY);
2287 let now = current_time_millis();
2288
2289 for (name, qtype) in questions {
2290 out.add_question(name, *qtype);
2291
2292 for record in self.cache.get_known_answers(name, *qtype, now) {
2293 trace!("add known answer: {:?}", record.record);
2301 let mut new_record = record.record.clone();
2302 new_record.get_record_mut().update_ttl(now);
2303 out.add_answer_box(new_record);
2304 }
2305 }
2306
2307 let mut invalid_intf_addrs = HashSet::new();
2308 for (_, intf) in self.my_intfs.iter() {
2309 if let Some(sock) = self.ipv4_sock.as_ref() {
2310 if let Err(InternalError::IntfAddrInvalid(intf_addr)) =
2311 send_dns_outgoing(&out, intf, &sock.pktinfo, self.port)
2312 {
2313 invalid_intf_addrs.insert(intf_addr);
2314 }
2315 }
2316 if let Some(sock) = self.ipv6_sock.as_ref() {
2317 if let Err(InternalError::IntfAddrInvalid(intf_addr)) =
2318 send_dns_outgoing(&out, intf, &sock.pktinfo, self.port)
2319 {
2320 invalid_intf_addrs.insert(intf_addr);
2321 }
2322 }
2323 }
2324
2325 if !invalid_intf_addrs.is_empty() {
2326 let _ = self.send_cmd_to_self(Command::InvalidIntfAddrs(invalid_intf_addrs));
2327 }
2328 }
2329
2330 fn handle_read(&mut self, event_key: usize) -> bool {
2335 let sock_opt = match event_key {
2336 IPV4_SOCK_EVENT_KEY => &mut self.ipv4_sock,
2337 IPV6_SOCK_EVENT_KEY => &mut self.ipv6_sock,
2338 _ => {
2339 debug!("handle_read: unknown token {}", event_key);
2340 return false;
2341 }
2342 };
2343 let Some(sock) = sock_opt.as_mut() else {
2344 debug!("handle_read: socket not available for token {}", event_key);
2345 return false;
2346 };
2347 let mut buf = vec![0u8; MAX_MSG_ABSOLUTE];
2348
2349 let (sz, pktinfo) = match sock.pktinfo.recv(&mut buf) {
2356 Ok(sz) => sz,
2357 Err(e) => {
2358 if e.kind() != std::io::ErrorKind::WouldBlock {
2359 debug!("listening socket read failed: {}", e);
2360 }
2361 return false;
2362 }
2363 };
2364
2365 let pkt_if_index = pktinfo.if_index as u32;
2367 let Some(my_intf) = self.my_intfs.get(&pkt_if_index) else {
2368 debug!(
2369 "handle_read: no interface found for pktinfo if_index: {}",
2370 pktinfo.if_index
2371 );
2372 return true; };
2374
2375 let is_ipv4 = event_key == IPV4_SOCK_EVENT_KEY;
2380 if (is_ipv4 && my_intf.next_ifaddr_v4().is_none())
2381 || (!is_ipv4 && my_intf.next_ifaddr_v6().is_none())
2382 {
2383 debug!(
2384 "handle_read: dropping {} packet on intf {} (disabled)",
2385 if is_ipv4 { "IPv4" } else { "IPv6" },
2386 my_intf.name
2387 );
2388 return true;
2389 }
2390
2391 buf.truncate(sz); match DnsIncoming::new(buf, my_intf.into()) {
2394 Ok(msg) => {
2395 if msg.is_query() {
2396 self.handle_query(msg, pkt_if_index, event_key == IPV4_SOCK_EVENT_KEY);
2397 } else if msg.is_response() {
2398 self.handle_response(msg, pkt_if_index);
2399 } else {
2400 debug!("Invalid message: not query and not response");
2401 }
2402 }
2403 Err(e) => debug!("Invalid incoming DNS message: {}", e),
2404 }
2405
2406 true
2407 }
2408
2409 fn query_unresolved(&mut self, instance: &str) -> bool {
2411 if !valid_instance_name(instance) {
2412 trace!("instance name {} not valid", instance);
2413 return false;
2414 }
2415
2416 if let Some(records) = self.cache.get_srv(instance) {
2417 for record in records {
2418 if let Some(srv) = record.record.any().downcast_ref::<DnsSrv>() {
2419 if self.cache.get_addr(srv.host()).is_none() {
2420 self.send_query_vec(&[(srv.host(), RRType::A), (srv.host(), RRType::AAAA)]);
2421 return true;
2422 }
2423 }
2424 }
2425 } else {
2426 self.send_query(instance, RRType::ANY);
2427 return true;
2428 }
2429
2430 false
2431 }
2432
2433 fn query_cache_for_service(
2436 &mut self,
2437 ty_domain: &str,
2438 sender: &Sender<ServiceEvent>,
2439 now: u64,
2440 ) {
2441 let mut resolved: HashSet<String> = HashSet::new();
2442 let mut unresolved: HashSet<String> = HashSet::new();
2443
2444 if let Some(records) = self.cache.get_ptr(ty_domain) {
2445 for record in records.iter().filter(|r| !r.record.expires_soon(now)) {
2446 if let Some(ptr) = record.record.any().downcast_ref::<DnsPointer>() {
2447 let mut new_event = None;
2448 match self.resolve_service_from_cache(ty_domain, ptr.alias()) {
2449 Ok(resolved_service) => {
2450 if resolved_service.is_valid() {
2451 debug!("Resolved service from cache: {}", ptr.alias());
2452 new_event =
2453 Some(ServiceEvent::ServiceResolved(Box::new(resolved_service)));
2454 } else {
2455 debug!("Resolved service is not valid: {}", ptr.alias());
2456 }
2457 }
2458 Err(err) => {
2459 debug!("Error while resolving service from cache: {}", err);
2460 continue;
2461 }
2462 }
2463
2464 match sender.send(ServiceEvent::ServiceFound(
2465 ty_domain.to_string(),
2466 ptr.alias().to_string(),
2467 )) {
2468 Ok(()) => debug!("sent service found {}", ptr.alias()),
2469 Err(e) => {
2470 debug!("failed to send service found: {}", e);
2471 continue;
2472 }
2473 }
2474
2475 if let Some(event) = new_event {
2476 resolved.insert(ptr.alias().to_string());
2477 match sender.send(event) {
2478 Ok(()) => debug!("sent service resolved: {}", ptr.alias()),
2479 Err(e) => debug!("failed to send service resolved: {}", e),
2480 }
2481 } else {
2482 unresolved.insert(ptr.alias().to_string());
2483 }
2484 }
2485 }
2486 }
2487
2488 for instance in resolved.drain() {
2489 self.pending_resolves.remove(&instance);
2490 self.resolved.insert(instance);
2491 }
2492
2493 for instance in unresolved.drain() {
2494 self.add_pending_resolve(instance);
2495 }
2496 }
2497
2498 fn query_cache_for_hostname(
2501 &mut self,
2502 hostname: &str,
2503 sender: Sender<HostnameResolutionEvent>,
2504 ) {
2505 let addresses_map = self.cache.get_addresses_for_host(hostname);
2506 for (name, addresses) in addresses_map {
2507 match sender.send(HostnameResolutionEvent::AddressesFound(name, addresses)) {
2508 Ok(()) => trace!("sent hostname addresses found"),
2509 Err(e) => debug!("failed to send hostname addresses found: {}", e),
2510 }
2511 }
2512 }
2513
2514 fn add_pending_resolve(&mut self, instance: String) {
2515 if !self.pending_resolves.contains(&instance) {
2516 let next_time = current_time_millis() + RESOLVE_WAIT_IN_MILLIS;
2517 self.add_retransmission(next_time, Command::Resolve(instance.clone(), 1));
2518 self.pending_resolves.insert(instance);
2519 }
2520 }
2521
2522 fn resolve_service_from_cache(
2524 &self,
2525 ty_domain: &str,
2526 fullname: &str,
2527 ) -> Result<ResolvedService> {
2528 let now = current_time_millis();
2529 let mut resolved_service = ResolvedService {
2530 ty_domain: ty_domain.to_string(),
2531 sub_ty_domain: None,
2532 fullname: fullname.to_string(),
2533 host: String::new(),
2534 port: 0,
2535 addresses: HashSet::new(),
2536 txt_properties: TxtProperties::new(),
2537 };
2538
2539 if let Some(subtype) = self.cache.get_subtype(fullname) {
2541 trace!(
2542 "ty_domain: {} found subtype {} for instance: {}",
2543 ty_domain,
2544 subtype,
2545 fullname
2546 );
2547 if resolved_service.sub_ty_domain.is_none() {
2548 resolved_service.sub_ty_domain = Some(subtype.to_string());
2549 }
2550 }
2551
2552 if let Some(records) = self.cache.get_srv(fullname) {
2554 if let Some(answer) = records.iter().find(|r| !r.record.expires_soon(now)) {
2555 if let Some(dns_srv) = answer.record.any().downcast_ref::<DnsSrv>() {
2556 resolved_service.host = dns_srv.host().to_string();
2557 resolved_service.port = dns_srv.port();
2558 }
2559 }
2560 }
2561
2562 if let Some(records) = self.cache.get_txt(fullname) {
2564 if let Some(record) = records.iter().find(|r| !r.record.expires_soon(now)) {
2565 if let Some(dns_txt) = record.record.any().downcast_ref::<DnsTxt>() {
2566 resolved_service.txt_properties = dns_txt.text().into();
2567 }
2568 }
2569 }
2570
2571 if let Some(records) = self.cache.get_addr(&resolved_service.host) {
2573 for answer in records.iter() {
2574 if let Some(dns_a) = answer.record.any().downcast_ref::<DnsAddress>() {
2575 if dns_a.expires_soon(now) {
2576 trace!(
2577 "Addr expired or expires soon: {}",
2578 dns_a.address().to_ip_addr()
2579 );
2580 } else {
2581 let scoped = dns_a.address();
2582 if let ScopedIp::V4(v4) = &scoped {
2583 let existing = resolved_service
2586 .addresses
2587 .iter()
2588 .find(|a| a.to_ip_addr() == IpAddr::V4(*v4.addr()))
2589 .cloned();
2590 if let Some(mut existing) = existing {
2591 resolved_service.addresses.remove(&existing);
2592 if let ScopedIp::V4(existing_v4) = &mut existing {
2593 for id in v4.interface_ids() {
2594 existing_v4.add_interface_id(id.clone());
2595 }
2596 }
2597 resolved_service.addresses.insert(existing);
2598 } else {
2599 resolved_service.addresses.insert(scoped);
2600 }
2601 } else {
2602 resolved_service.addresses.insert(scoped);
2603 }
2604 }
2605 }
2606 }
2607 }
2608
2609 Ok(resolved_service)
2610 }
2611
2612 fn handle_poller_events(&mut self, events: &mio::Events) {
2613 for ev in events.iter() {
2614 trace!("event received with key {:?}", ev.token());
2615 if ev.token().0 == SIGNAL_SOCK_EVENT_KEY {
2616 self.signal_sock_drain();
2618
2619 if let Err(e) = self.poller.registry().reregister(
2620 &mut self.signal_sock,
2621 ev.token(),
2622 mio::Interest::READABLE,
2623 ) {
2624 debug!("failed to modify poller for signal socket: {}", e);
2625 }
2626 continue; }
2628
2629 while self.handle_read(ev.token().0) {}
2631
2632 if ev.token().0 == IPV4_SOCK_EVENT_KEY {
2634 if let Some(sock) = self.ipv4_sock.as_mut() {
2636 if let Err(e) =
2637 self.poller
2638 .registry()
2639 .reregister(sock, ev.token(), mio::Interest::READABLE)
2640 {
2641 debug!("modify poller for IPv4 socket: {}", e);
2642 }
2643 }
2644 } else if ev.token().0 == IPV6_SOCK_EVENT_KEY {
2645 if let Some(sock) = self.ipv6_sock.as_mut() {
2647 if let Err(e) =
2648 self.poller
2649 .registry()
2650 .reregister(sock, ev.token(), mio::Interest::READABLE)
2651 {
2652 debug!("modify poller for IPv6 socket: {}", e);
2653 }
2654 }
2655 }
2656 }
2657 }
2658
2659 fn handle_response(&mut self, mut msg: DnsIncoming, if_index: u32) {
2662 let now = current_time_millis();
2663
2664 let mut record_predicate = |record: &DnsRecordBox| {
2666 if !record.get_record().is_expired(now) {
2667 return true;
2668 }
2669
2670 debug!("record is expired, removing it from cache.");
2671 if self.cache.remove(record) {
2672 if let Some(dns_ptr) = record.any().downcast_ref::<DnsPointer>() {
2674 call_service_listener(
2675 &self.service_queriers,
2676 dns_ptr.get_name(),
2677 ServiceEvent::ServiceRemoved(
2678 dns_ptr.get_name().to_string(),
2679 dns_ptr.alias().to_string(),
2680 ),
2681 );
2682 }
2683 }
2684 false
2685 };
2686 msg.answers_mut().retain(&mut record_predicate);
2687 msg.authorities_mut().retain(&mut record_predicate);
2688 msg.additionals_mut().retain(&mut record_predicate);
2689
2690 self.conflict_handler(&msg, if_index);
2692
2693 let mut is_for_us = true; for answer in msg.answers() {
2700 if answer.get_type() == RRType::PTR {
2701 if self.service_queriers.contains_key(answer.get_name()) {
2702 is_for_us = true;
2703 break; } else {
2705 is_for_us = false;
2706 }
2707 } else if answer.get_type() == RRType::A || answer.get_type() == RRType::AAAA {
2708 let answer_lowercase = answer.get_name().to_lowercase();
2710 if self.hostname_resolvers.contains_key(&answer_lowercase) {
2711 is_for_us = true;
2712 break; }
2714 }
2715 }
2716
2717 if self.accept_unsolicited {
2719 is_for_us = true;
2720 }
2721
2722 struct InstanceChange {
2724 ty: RRType, name: String, }
2727
2728 let mut changes = Vec::new();
2736 let mut timers = Vec::new();
2737 let Some(my_intf) = self.my_intfs.get(&if_index) else {
2738 return;
2739 };
2740 for record in msg.all_records() {
2741 match self
2742 .cache
2743 .add_or_update(my_intf, record, &mut timers, is_for_us)
2744 {
2745 Some((dns_record, true)) => {
2746 timers.push(dns_record.record.get_record().get_expire_time());
2747 timers.push(dns_record.record.get_record().get_refresh_time());
2748
2749 let ty = dns_record.record.get_type();
2750 let name = dns_record.record.get_name();
2751
2752 if ty == RRType::PTR && dns_record.record.get_record().get_ttl() > 1 {
2754 if self.service_queriers.contains_key(name) {
2755 timers.push(dns_record.record.get_record().get_refresh_time());
2756 }
2757
2758 if let Some(dns_ptr) = dns_record.record.any().downcast_ref::<DnsPointer>()
2760 {
2761 debug!("calling listener with service found: {name}");
2762 call_service_listener(
2763 &self.service_queriers,
2764 name,
2765 ServiceEvent::ServiceFound(
2766 name.to_string(),
2767 dns_ptr.alias().to_string(),
2768 ),
2769 );
2770 changes.push(InstanceChange {
2771 ty,
2772 name: dns_ptr.alias().to_string(),
2773 });
2774 }
2775 } else {
2776 changes.push(InstanceChange {
2777 ty,
2778 name: name.to_string(),
2779 });
2780 }
2781 }
2782 Some((dns_record, false)) => {
2783 timers.push(dns_record.record.get_record().get_expire_time());
2784 timers.push(dns_record.record.get_record().get_refresh_time());
2785 }
2786 _ => {}
2787 }
2788 }
2789
2790 for t in timers {
2792 self.add_timer(t);
2793 }
2794
2795 for change in changes
2797 .iter()
2798 .filter(|change| change.ty == RRType::A || change.ty == RRType::AAAA)
2799 {
2800 let addr_map = self.cache.get_addresses_for_host(&change.name);
2801 for (name, addresses) in addr_map {
2802 call_hostname_resolution_listener(
2803 &self.hostname_resolvers,
2804 &change.name,
2805 HostnameResolutionEvent::AddressesFound(name, addresses),
2806 )
2807 }
2808 }
2809
2810 let mut updated_instances = HashSet::new();
2812 for update in changes {
2813 match update.ty {
2814 RRType::PTR | RRType::SRV | RRType::TXT => {
2815 updated_instances.insert(update.name);
2816 }
2817 RRType::A | RRType::AAAA => {
2818 let instances = self.cache.get_instances_on_host(&update.name);
2819 updated_instances.extend(instances);
2820 }
2821 _ => {}
2822 }
2823 }
2824
2825 self.resolve_updated_instances(&updated_instances);
2826 }
2827
2828 fn conflict_handler(&mut self, msg: &DnsIncoming, if_index: u32) {
2829 let Some(my_intf) = self.my_intfs.get(&if_index) else {
2830 debug!("handle_response: no intf found for index {if_index}");
2831 return;
2832 };
2833
2834 let Some(dns_registry) = self.dns_registry_map.get_mut(&if_index) else {
2835 return;
2836 };
2837
2838 for answer in msg.answers().iter() {
2839 let mut new_records = Vec::new();
2840
2841 let name = answer.get_name();
2842 let Some(probe) = dns_registry.probing.get_mut(name) else {
2843 continue;
2844 };
2845
2846 if answer.get_type() == RRType::A || answer.get_type() == RRType::AAAA {
2848 if let Some(answer_addr) = answer.any().downcast_ref::<DnsAddress>() {
2849 if answer_addr.interface_id.index != if_index {
2850 debug!(
2851 "conflict handler: answer addr {:?} not in the subnet of intf {}",
2852 answer_addr, my_intf.name
2853 );
2854 continue;
2855 }
2856 }
2857
2858 let any_match = probe.records.iter().any(|r| {
2861 r.get_type() == answer.get_type()
2862 && r.get_class() == answer.get_class()
2863 && r.rrdata_match(answer.as_ref())
2864 });
2865 if any_match {
2866 continue; }
2868 }
2869
2870 probe.records.retain(|record| {
2871 if record.get_type() == answer.get_type()
2872 && record.get_class() == answer.get_class()
2873 && !record.rrdata_match(answer.as_ref())
2874 {
2875 debug!(
2876 "found conflict name: '{name}' record: {}: {} PEER: {}",
2877 record.get_type(),
2878 record.rdata_print(),
2879 answer.rdata_print()
2880 );
2881
2882 let mut new_record = record.clone();
2885 let new_name = match record.get_type() {
2886 RRType::A => hostname_change(name),
2887 RRType::AAAA => hostname_change(name),
2888 _ => name_change(name),
2889 };
2890 new_record.get_record_mut().set_new_name(new_name);
2891 new_records.push(new_record);
2892 return false; }
2894
2895 true
2896 });
2897
2898 let create_time = current_time_millis() + fastrand::u64(0..250);
2905
2906 let waiting_services = probe.waiting_services.clone();
2907
2908 for record in new_records {
2909 if dns_registry.update_hostname(name, record.get_name(), create_time) {
2910 self.timers.push(Reverse(create_time));
2911 }
2912
2913 dns_registry.name_changes.insert(
2915 record.get_record().get_original_name().to_string(),
2916 record.get_name().to_string(),
2917 );
2918
2919 let new_probe = match dns_registry.probing.get_mut(record.get_name()) {
2920 Some(p) => p,
2921 None => {
2922 let new_probe = dns_registry
2923 .probing
2924 .entry(record.get_name().to_string())
2925 .or_insert_with(|| {
2926 debug!("conflict handler: new probe of {}", record.get_name());
2927 Probe::new(create_time)
2928 });
2929 self.timers.push(Reverse(new_probe.next_send));
2930 new_probe
2931 }
2932 };
2933
2934 debug!(
2935 "insert record with new name '{}' {} into probe",
2936 record.get_name(),
2937 record.get_type()
2938 );
2939 new_probe.insert_record(record);
2940
2941 new_probe.waiting_services.extend(waiting_services.clone());
2942 }
2943 }
2944 }
2945
2946 fn resolve_updated_instances(&mut self, updated_instances: &HashSet<String>) {
2953 if updated_instances.is_empty() {
2954 return;
2955 }
2956
2957 let mut resolved: HashSet<String> = HashSet::new();
2958 let mut unresolved: HashSet<String> = HashSet::new();
2959 let mut removed_instances = HashMap::new();
2960
2961 let now = current_time_millis();
2962
2963 for (ty_domain, records) in self.cache.all_ptr().iter() {
2964 if !self.service_queriers.contains_key(ty_domain) {
2965 continue;
2967 }
2968
2969 for ptr in records.iter().filter(|r| !r.record.expires_soon(now)) {
2970 let Some(dns_ptr) = ptr.record.any().downcast_ref::<DnsPointer>() else {
2971 continue;
2972 };
2973
2974 let instance = dns_ptr.alias();
2975 if !updated_instances.contains(instance) {
2976 continue;
2977 }
2978
2979 let Ok(resolved_service) = self.resolve_service_from_cache(ty_domain, instance)
2980 else {
2981 continue;
2982 };
2983
2984 debug!("resolve_updated_instances: from cache: {instance}");
2985 if resolved_service.is_valid() {
2986 debug!("call queriers to resolve {instance}");
2987 resolved.insert(instance.to_string());
2988 let event = ServiceEvent::ServiceResolved(Box::new(resolved_service));
2989 call_service_listener(&self.service_queriers, ty_domain, event);
2990 } else {
2991 debug!("Resolved service is not valid: {instance}");
2992 if self.resolved.remove(dns_ptr.alias()) {
2993 removed_instances
2994 .entry(ty_domain.to_string())
2995 .or_insert_with(HashSet::new)
2996 .insert(instance.to_string());
2997 }
2998 unresolved.insert(instance.to_string());
2999 }
3000 }
3001 }
3002
3003 for instance in resolved.drain() {
3004 self.pending_resolves.remove(&instance);
3005 self.resolved.insert(instance);
3006 }
3007
3008 for instance in unresolved.drain() {
3009 self.add_pending_resolve(instance);
3010 }
3011
3012 if !removed_instances.is_empty() {
3013 debug!(
3014 "resolve_updated_instances: removed {}",
3015 &removed_instances.len()
3016 );
3017 self.notify_service_removal(removed_instances);
3018 }
3019 }
3020
3021 fn handle_query(&mut self, msg: DnsIncoming, if_index: u32, is_ipv4: bool) {
3023 let sock_opt = if is_ipv4 {
3024 &self.ipv4_sock
3025 } else {
3026 &self.ipv6_sock
3027 };
3028 let Some(sock) = sock_opt.as_ref() else {
3029 debug!("handle_query: socket not available for intf {}", if_index);
3030 return;
3031 };
3032 let mut out = DnsOutgoing::new(FLAGS_QR_RESPONSE | FLAGS_AA);
3033
3034 const META_QUERY: &str = "_services._dns-sd._udp.local.";
3037
3038 let Some(dns_registry) = self.dns_registry_map.get_mut(&if_index) else {
3039 debug!("missing dns registry for intf {}", if_index);
3040 return;
3041 };
3042
3043 let Some(intf) = self.my_intfs.get(&if_index) else {
3044 debug!("handle_query: no intf found for index {if_index}");
3045 return;
3046 };
3047
3048 for question in msg.questions().iter() {
3049 let qtype = question.entry_type();
3050 let q_name = question.entry_name();
3051
3052 if qtype == RRType::PTR {
3053 for service in self.my_services.values() {
3054 if service.get_status(if_index) != ServiceStatus::Announced {
3055 continue;
3056 }
3057
3058 if service.matches_type_or_subtype(q_name) {
3059 out.add_answer_with_additionals(&msg, service, intf, dns_registry, is_ipv4);
3060 } else if q_name == META_QUERY {
3061 let ttl = service.get_other_ttl();
3062 let alias = service.get_type().to_string();
3063 let ptr = DnsPointer::new(q_name, RRType::PTR, CLASS_IN, ttl, alias);
3064 if !out.add_answer(&msg, ptr) {
3065 trace!("answer was not added for meta-query {:?}", &question);
3066 }
3067 }
3068 }
3069 } else {
3070 if qtype == RRType::ANY && msg.num_authorities() > 0 {
3072 if let Some(probe) = dns_registry.probing.get_mut(q_name) {
3073 probe.tiebreaking(&msg, q_name);
3074 }
3075 }
3076
3077 if qtype == RRType::A || qtype == RRType::AAAA || qtype == RRType::ANY {
3078 for service in self.my_services.values() {
3079 if service.get_status(if_index) != ServiceStatus::Announced {
3080 continue;
3081 }
3082
3083 let service_hostname = dns_registry.resolve_name(service.get_hostname());
3084
3085 if service_hostname.to_lowercase() == question.entry_name().to_lowercase() {
3086 let intf_addrs = if is_ipv4 {
3087 service.get_addrs_on_my_intf_v4(intf)
3088 } else {
3089 service.get_addrs_on_my_intf_v6(intf)
3090 };
3091 if intf_addrs.is_empty()
3092 && (qtype == RRType::A || qtype == RRType::AAAA)
3093 {
3094 let t = match qtype {
3095 RRType::A => "TYPE_A",
3096 RRType::AAAA => "TYPE_AAAA",
3097 _ => "invalid_type",
3098 };
3099 trace!(
3100 "Cannot find valid addrs for {} response on intf {:?}",
3101 t,
3102 &intf
3103 );
3104 return;
3105 }
3106 for address in intf_addrs {
3107 out.add_answer(
3108 &msg,
3109 DnsAddress::new(
3110 service_hostname,
3111 ip_address_rr_type(&address),
3112 CLASS_IN | CLASS_CACHE_FLUSH,
3113 service.get_host_ttl(),
3114 address,
3115 intf.into(),
3116 ),
3117 );
3118 }
3119 }
3120 }
3121 }
3122
3123 let query_name = q_name.to_lowercase();
3124 let service_opt = self
3125 .my_services
3126 .iter()
3127 .find(|(k, _v)| dns_registry.resolve_name(k.as_str()) == query_name)
3128 .map(|(_, v)| v);
3129
3130 let Some(service) = service_opt else {
3131 continue;
3132 };
3133
3134 if service.get_status(if_index) != ServiceStatus::Announced {
3135 continue;
3136 }
3137
3138 let intf_addrs = if is_ipv4 {
3139 service.get_addrs_on_my_intf_v4(intf)
3140 } else {
3141 service.get_addrs_on_my_intf_v6(intf)
3142 };
3143 if intf_addrs.is_empty() {
3144 debug!(
3145 "Cannot find valid addrs for TYPE_SRV response on intf {:?}",
3146 &intf
3147 );
3148 continue;
3149 }
3150
3151 add_answer_of_service(
3152 &mut out,
3153 &msg,
3154 question.entry_name(),
3155 service,
3156 qtype,
3157 intf_addrs,
3158 );
3159 }
3160 }
3161
3162 if out.answers_count() > 0 {
3163 debug!("sending response on intf {}", &intf.name);
3164 out.set_id(msg.id());
3165 if let Err(InternalError::IntfAddrInvalid(intf_addr)) =
3166 send_dns_outgoing(&out, intf, &sock.pktinfo, self.port)
3167 {
3168 let invalid_intf_addr = HashSet::from([intf_addr]);
3169 let _ = self.send_cmd_to_self(Command::InvalidIntfAddrs(invalid_intf_addr));
3170 }
3171
3172 let if_name = intf.name.clone();
3173
3174 self.increase_counter(Counter::Respond, 1);
3175 self.notify_monitors(DaemonEvent::Respond(if_name));
3176 }
3177
3178 self.increase_counter(Counter::KnownAnswerSuppression, out.known_answer_count());
3179 }
3180
3181 fn increase_counter(&mut self, counter: Counter, count: i64) {
3183 let key = counter.to_string();
3184 match self.counters.get_mut(&key) {
3185 Some(v) => *v += count,
3186 None => {
3187 self.counters.insert(key, count);
3188 }
3189 }
3190 }
3191
3192 fn set_counter(&mut self, counter: Counter, count: i64) {
3194 let key = counter.to_string();
3195 self.counters.insert(key, count);
3196 }
3197
3198 fn signal_sock_drain(&self) {
3199 let mut signal_buf = [0; 1024];
3200
3201 while let Ok(sz) = self.signal_sock.recv(&mut signal_buf) {
3203 trace!(
3204 "signal socket recvd: {}",
3205 String::from_utf8_lossy(&signal_buf[0..sz])
3206 );
3207 }
3208 }
3209
3210 fn add_retransmission(&mut self, next_time: u64, command: Command) {
3211 self.retransmissions.push(ReRun { next_time, command });
3212 self.add_timer(next_time);
3213 }
3214
3215 fn notify_service_removal(&self, expired: HashMap<String, HashSet<String>>) {
3218 for (ty_domain, sender) in self.service_queriers.iter() {
3219 if let Some(instances) = expired.get(ty_domain) {
3220 for instance_name in instances {
3221 let event = ServiceEvent::ServiceRemoved(
3222 ty_domain.to_string(),
3223 instance_name.to_string(),
3224 );
3225 match sender.send(event) {
3226 Ok(()) => debug!("notify_service_removal: sent ServiceRemoved to listener of {ty_domain}: {instance_name}"),
3227 Err(e) => debug!("Failed to send event: {}", e),
3228 }
3229 }
3230 }
3231 }
3232 }
3233
3234 fn exec_command(&mut self, command: Command, repeating: bool) {
3238 trace!("exec_command: {:?} repeating: {}", &command, repeating);
3239 match command {
3240 Command::Browse(ty, next_delay, cache_only, listener) => {
3241 self.exec_command_browse(repeating, ty, next_delay, cache_only, listener);
3242 }
3243
3244 Command::ResolveHostname(hostname, next_delay, listener, timeout) => {
3245 self.exec_command_resolve_hostname(
3246 repeating, hostname, next_delay, listener, timeout,
3247 );
3248 }
3249
3250 Command::Register(service_info) => {
3251 self.register_service(*service_info);
3252 self.increase_counter(Counter::Register, 1);
3253 }
3254
3255 Command::RegisterResend(fullname, intf) => {
3256 trace!("register-resend service: {fullname} on {}", &intf);
3257 if let Err(InternalError::IntfAddrInvalid(intf_addr)) =
3258 self.exec_command_register_resend(fullname, intf)
3259 {
3260 let invalid_intf_addr = HashSet::from([intf_addr]);
3261 let _ = self.send_cmd_to_self(Command::InvalidIntfAddrs(invalid_intf_addr));
3262 }
3263 }
3264
3265 Command::Unregister(fullname, resp_s) => {
3266 trace!("unregister service {} repeat {}", &fullname, &repeating);
3267 self.exec_command_unregister(repeating, fullname, resp_s);
3268 }
3269
3270 Command::UnregisterResend(packet, if_index, is_ipv4) => {
3271 self.exec_command_unregister_resend(packet, if_index, is_ipv4);
3272 }
3273
3274 Command::StopBrowse(ty_domain) => self.exec_command_stop_browse(ty_domain),
3275
3276 Command::StopResolveHostname(hostname) => {
3277 self.exec_command_stop_resolve_hostname(hostname.to_lowercase())
3278 }
3279
3280 Command::Resolve(instance, try_count) => self.exec_command_resolve(instance, try_count),
3281
3282 Command::GetMetrics(resp_s) => self.exec_command_get_metrics(resp_s),
3283
3284 Command::GetStatus(resp_s) => match resp_s.send(self.status.clone()) {
3285 Ok(()) => trace!("Sent status to the client"),
3286 Err(e) => debug!("Failed to send status: {}", e),
3287 },
3288
3289 Command::Monitor(resp_s) => {
3290 self.monitors.push(resp_s);
3291 }
3292
3293 Command::SetOption(daemon_opt) => {
3294 self.process_set_option(daemon_opt);
3295 }
3296
3297 Command::GetOption(resp_s) => {
3298 let val = DaemonOptionVal {
3299 _service_name_len_max: self.service_name_len_max,
3300 ip_check_interval: self.ip_check_interval,
3301 };
3302 if let Err(e) = resp_s.send(val) {
3303 debug!("Failed to send options: {}", e);
3304 }
3305 }
3306
3307 Command::Verify(instance_fullname, timeout) => {
3308 self.exec_command_verify(instance_fullname, timeout, repeating);
3309 }
3310
3311 Command::InvalidIntfAddrs(invalid_intf_addrs) => {
3312 for intf_addr in invalid_intf_addrs {
3313 self.del_interface_addr(&intf_addr);
3314 }
3315
3316 self.check_ip_changes();
3317 }
3318
3319 _ => {
3320 debug!("unexpected command: {:?}", &command);
3321 }
3322 }
3323 }
3324
3325 fn exec_command_get_metrics(&mut self, resp_s: Sender<HashMap<String, i64>>) {
3326 self.set_counter(Counter::CachedPTR, self.cache.ptr_count() as i64);
3327 self.set_counter(Counter::CachedSRV, self.cache.srv_count() as i64);
3328 self.set_counter(Counter::CachedAddr, self.cache.addr_count() as i64);
3329 self.set_counter(Counter::CachedTxt, self.cache.txt_count() as i64);
3330 self.set_counter(Counter::CachedNSec, self.cache.nsec_count() as i64);
3331 self.set_counter(Counter::CachedSubtype, self.cache.subtype_count() as i64);
3332 self.set_counter(Counter::Timer, self.timers.len() as i64);
3333
3334 let dns_registry_probe_count: usize = self
3335 .dns_registry_map
3336 .values()
3337 .map(|r| r.probing.len())
3338 .sum();
3339 self.set_counter(Counter::DnsRegistryProbe, dns_registry_probe_count as i64);
3340
3341 let dns_registry_active_count: usize = self
3342 .dns_registry_map
3343 .values()
3344 .map(|r| r.active.values().map(|a| a.len()).sum::<usize>())
3345 .sum();
3346 self.set_counter(Counter::DnsRegistryActive, dns_registry_active_count as i64);
3347
3348 let dns_registry_timer_count: usize = self
3349 .dns_registry_map
3350 .values()
3351 .map(|r| r.new_timers.len())
3352 .sum();
3353 self.set_counter(Counter::DnsRegistryTimer, dns_registry_timer_count as i64);
3354
3355 let dns_registry_name_change_count: usize = self
3356 .dns_registry_map
3357 .values()
3358 .map(|r| r.name_changes.len())
3359 .sum();
3360 self.set_counter(
3361 Counter::DnsRegistryNameChange,
3362 dns_registry_name_change_count as i64,
3363 );
3364
3365 if let Err(e) = resp_s.send(self.counters.clone()) {
3367 debug!("Failed to send metrics: {}", e);
3368 }
3369 }
3370
3371 fn exec_command_browse(
3372 &mut self,
3373 repeating: bool,
3374 ty: String,
3375 next_delay: u32,
3376 cache_only: bool,
3377 listener: Sender<ServiceEvent>,
3378 ) {
3379 let pretty_addrs: Vec<String> = self
3380 .my_intfs
3381 .iter()
3382 .map(|(if_index, itf)| format!("{} ({if_index})", itf.name))
3383 .collect();
3384
3385 if let Err(e) = listener.send(ServiceEvent::SearchStarted(format!(
3386 "{ty} on {} interfaces [{}]",
3387 pretty_addrs.len(),
3388 pretty_addrs.join(", ")
3389 ))) {
3390 debug!(
3391 "Failed to send SearchStarted({})(repeating:{}): {}",
3392 &ty, repeating, e
3393 );
3394 return;
3395 }
3396
3397 let now = current_time_millis();
3398 if !repeating {
3399 self.service_queriers.insert(ty.clone(), listener.clone());
3403
3404 self.query_cache_for_service(&ty, &listener, now);
3406 }
3407
3408 if cache_only {
3409 match listener.send(ServiceEvent::SearchStopped(ty.clone())) {
3411 Ok(()) => debug!("SearchStopped sent for {}", &ty),
3412 Err(e) => debug!("Failed to send SearchStopped: {}", e),
3413 }
3414 return;
3415 }
3416
3417 self.send_query(&ty, RRType::PTR);
3418 self.increase_counter(Counter::Browse, 1);
3419
3420 let next_time = now + (next_delay * 1000) as u64;
3421 let max_delay = 60 * 60;
3422 let delay = cmp::min(next_delay * 2, max_delay);
3423 self.add_retransmission(next_time, Command::Browse(ty, delay, cache_only, listener));
3424 }
3425
3426 fn exec_command_resolve_hostname(
3427 &mut self,
3428 repeating: bool,
3429 hostname: String,
3430 next_delay: u32,
3431 listener: Sender<HostnameResolutionEvent>,
3432 timeout: Option<u64>,
3433 ) {
3434 let addr_list: Vec<_> = self.my_intfs.iter().collect();
3435 if let Err(e) = listener.send(HostnameResolutionEvent::SearchStarted(format!(
3436 "{} on addrs {:?}",
3437 &hostname, &addr_list
3438 ))) {
3439 debug!(
3440 "Failed to send ResolveStarted({})(repeating:{}): {}",
3441 &hostname, repeating, e
3442 );
3443 return;
3444 }
3445 if !repeating {
3446 self.add_hostname_resolver(hostname.to_owned(), listener.clone(), timeout);
3447 self.query_cache_for_hostname(&hostname, listener.clone());
3449 }
3450
3451 self.send_query_vec(&[(&hostname, RRType::A), (&hostname, RRType::AAAA)]);
3452 self.increase_counter(Counter::ResolveHostname, 1);
3453
3454 let now = current_time_millis();
3455 let next_time = now + u64::from(next_delay) * 1000;
3456 let max_delay = 60 * 60;
3457 let delay = cmp::min(next_delay * 2, max_delay);
3458
3459 if self
3461 .hostname_resolvers
3462 .get(&hostname)
3463 .and_then(|(_sender, timeout)| *timeout)
3464 .map(|timeout| next_time < timeout)
3465 .unwrap_or(true)
3466 {
3467 self.add_retransmission(
3468 next_time,
3469 Command::ResolveHostname(hostname, delay, listener, None),
3470 );
3471 }
3472 }
3473
3474 fn exec_command_resolve(&mut self, instance: String, try_count: u16) {
3475 let pending_query = self.query_unresolved(&instance);
3476 let max_try = 3;
3477 if pending_query && try_count < max_try {
3478 let next_time = current_time_millis() + RESOLVE_WAIT_IN_MILLIS;
3481 self.add_retransmission(next_time, Command::Resolve(instance, try_count + 1));
3482 }
3483 }
3484
3485 fn exec_command_unregister(
3486 &mut self,
3487 repeating: bool,
3488 fullname: String,
3489 resp_s: Sender<UnregisterStatus>,
3490 ) {
3491 let response = match self.my_services.remove_entry(&fullname) {
3492 None => {
3493 debug!("unregister: cannot find such service {}", &fullname);
3494 UnregisterStatus::NotFound
3495 }
3496 Some((_k, info)) => {
3497 let mut timers = Vec::new();
3498
3499 for (if_index, intf) in self.my_intfs.iter() {
3500 if let Some(sock) = self.ipv4_sock.as_ref() {
3501 let packet = self.unregister_service(&info, intf, &sock.pktinfo);
3502 if !repeating && !packet.is_empty() {
3504 let next_time = current_time_millis() + 120;
3505 self.retransmissions.push(ReRun {
3506 next_time,
3507 command: Command::UnregisterResend(packet, *if_index, true),
3508 });
3509 timers.push(next_time);
3510 }
3511 }
3512
3513 if let Some(sock) = self.ipv6_sock.as_ref() {
3515 let packet = self.unregister_service(&info, intf, &sock.pktinfo);
3516 if !repeating && !packet.is_empty() {
3517 let next_time = current_time_millis() + 120;
3518 self.retransmissions.push(ReRun {
3519 next_time,
3520 command: Command::UnregisterResend(packet, *if_index, false),
3521 });
3522 timers.push(next_time);
3523 }
3524 }
3525 }
3526
3527 for t in timers {
3528 self.add_timer(t);
3529 }
3530
3531 self.increase_counter(Counter::Unregister, 1);
3532 UnregisterStatus::OK
3533 }
3534 };
3535 if let Err(e) = resp_s.send(response) {
3536 debug!("unregister: failed to send response: {}", e);
3537 }
3538 }
3539
3540 fn exec_command_unregister_resend(&mut self, packet: Vec<u8>, if_index: u32, is_ipv4: bool) {
3541 let Some(intf) = self.my_intfs.get(&if_index) else {
3542 return;
3543 };
3544 let sock_opt = if is_ipv4 {
3545 &self.ipv4_sock
3546 } else {
3547 &self.ipv6_sock
3548 };
3549 let Some(sock) = sock_opt else {
3550 return;
3551 };
3552
3553 let if_addr = if is_ipv4 {
3554 match intf.next_ifaddr_v4() {
3555 Some(addr) => addr,
3556 None => return,
3557 }
3558 } else {
3559 match intf.next_ifaddr_v6() {
3560 Some(addr) => addr,
3561 None => return,
3562 }
3563 };
3564
3565 debug!("UnregisterResend from {:?}", if_addr);
3566 multicast_on_intf(
3567 &packet[..],
3568 &intf.name,
3569 intf.index,
3570 if_addr,
3571 &sock.pktinfo,
3572 self.port,
3573 );
3574
3575 self.increase_counter(Counter::UnregisterResend, 1);
3576 }
3577
3578 fn exec_command_stop_browse(&mut self, ty_domain: String) {
3579 match self.service_queriers.remove_entry(&ty_domain) {
3580 None => debug!("StopBrowse: cannot find querier for {}", &ty_domain),
3581 Some((ty, sender)) => {
3582 trace!("StopBrowse: removed queryer for {}", &ty);
3584 let mut i = 0;
3585 while i < self.retransmissions.len() {
3586 if let Command::Browse(t, _, _, _) = &self.retransmissions[i].command {
3587 if t == &ty {
3588 self.retransmissions.remove(i);
3589 trace!("StopBrowse: removed retransmission for {}", &ty);
3590 continue;
3591 }
3592 }
3593 i += 1;
3594 }
3595
3596 self.cache.remove_service_type(&ty_domain);
3598
3599 match sender.send(ServiceEvent::SearchStopped(ty_domain)) {
3601 Ok(()) => trace!("Sent SearchStopped to the listener"),
3602 Err(e) => debug!("Failed to send SearchStopped: {}", e),
3603 }
3604 }
3605 }
3606 }
3607
3608 fn exec_command_stop_resolve_hostname(&mut self, hostname: String) {
3609 if let Some((host, (sender, _timeout))) = self.hostname_resolvers.remove_entry(&hostname) {
3610 trace!("StopResolve: removed queryer for {}", &host);
3612 let mut i = 0;
3613 while i < self.retransmissions.len() {
3614 if let Command::Resolve(t, _) = &self.retransmissions[i].command {
3615 if t == &host {
3616 self.retransmissions.remove(i);
3617 trace!("StopResolve: removed retransmission for {}", &host);
3618 continue;
3619 }
3620 }
3621 i += 1;
3622 }
3623
3624 match sender.send(HostnameResolutionEvent::SearchStopped(hostname)) {
3626 Ok(()) => trace!("Sent SearchStopped to the listener"),
3627 Err(e) => debug!("Failed to send SearchStopped: {}", e),
3628 }
3629 }
3630 }
3631
3632 fn exec_command_register_resend(&mut self, fullname: String, if_index: u32) -> MyResult<()> {
3633 let Some(info) = self.my_services.get_mut(&fullname) else {
3634 trace!("announce: cannot find such service {}", &fullname);
3635 return Ok(());
3636 };
3637
3638 let Some(dns_registry) = self.dns_registry_map.get_mut(&if_index) else {
3639 return Ok(());
3640 };
3641
3642 let Some(intf) = self.my_intfs.get(&if_index) else {
3643 return Ok(());
3644 };
3645
3646 let announced_v4 = if let Some(sock) = self.ipv4_sock.as_ref() {
3647 announce_service_on_intf(dns_registry, info, intf, &sock.pktinfo, self.port)?
3648 } else {
3649 false
3650 };
3651 let announced_v6 = if let Some(sock) = self.ipv6_sock.as_ref() {
3652 announce_service_on_intf(dns_registry, info, intf, &sock.pktinfo, self.port)?
3653 } else {
3654 false
3655 };
3656
3657 if announced_v4 || announced_v6 {
3658 let hostname = dns_registry.resolve_name(info.get_hostname());
3659 let service_name = dns_registry.resolve_name(&fullname).to_string();
3660
3661 debug!("resend: announce service {service_name} on {}", intf.name);
3662
3663 notify_monitors(
3664 &mut self.monitors,
3665 DaemonEvent::Announce(service_name, format!("{}:{}", hostname, &intf.name)),
3666 );
3667 info.set_status(if_index, ServiceStatus::Announced);
3668 } else {
3669 debug!("register-resend should not fail");
3670 }
3671
3672 self.increase_counter(Counter::RegisterResend, 1);
3673 Ok(())
3674 }
3675
3676 fn exec_command_verify(&mut self, instance: String, timeout: Duration, repeating: bool) {
3677 let now = current_time_millis();
3687 let expire_at = if repeating {
3688 None
3689 } else {
3690 Some(now + timeout.as_millis() as u64)
3691 };
3692
3693 let record_vec = self.cache.service_verify_queries(&instance, expire_at);
3695
3696 if !record_vec.is_empty() {
3697 let query_vec: Vec<(&str, RRType)> = record_vec
3698 .iter()
3699 .map(|(record, rr_type)| (record.as_str(), *rr_type))
3700 .collect();
3701 self.send_query_vec(&query_vec);
3702
3703 if let Some(new_expire) = expire_at {
3704 self.add_timer(new_expire); self.add_retransmission(now + 1000, Command::Verify(instance, timeout));
3708 }
3709 }
3710 }
3711
3712 fn refresh_active_services(&mut self) {
3714 let mut query_ptr_count = 0;
3715 let mut query_srv_count = 0;
3716 let mut new_timers = HashSet::new();
3717 let mut query_addr_count = 0;
3718
3719 for (ty_domain, _sender) in self.service_queriers.iter() {
3720 let refreshed_timers = self.cache.refresh_due_ptr(ty_domain);
3721 if !refreshed_timers.is_empty() {
3722 trace!("sending refresh query for PTR: {}", ty_domain);
3723 self.send_query(ty_domain, RRType::PTR);
3724 query_ptr_count += 1;
3725 new_timers.extend(refreshed_timers);
3726 }
3727
3728 let (instances, timers) = self.cache.refresh_due_srv_txt(ty_domain);
3729 for (instance, types) in instances {
3730 trace!("sending refresh query for: {}", &instance);
3731 let query_vec = types
3732 .into_iter()
3733 .map(|ty| (instance.as_str(), ty))
3734 .collect::<Vec<_>>();
3735 self.send_query_vec(&query_vec);
3736 query_srv_count += 1;
3737 }
3738 new_timers.extend(timers);
3739 let (hostnames, timers) = self.cache.refresh_due_hosts(ty_domain);
3740 for hostname in hostnames.iter() {
3741 trace!("sending refresh queries for A and AAAA: {}", hostname);
3742 self.send_query_vec(&[(hostname, RRType::A), (hostname, RRType::AAAA)]);
3743 query_addr_count += 2;
3744 }
3745 new_timers.extend(timers);
3746 }
3747
3748 for timer in new_timers {
3749 self.add_timer(timer);
3750 }
3751
3752 self.increase_counter(Counter::CacheRefreshPTR, query_ptr_count);
3753 self.increase_counter(Counter::CacheRefreshSrvTxt, query_srv_count);
3754 self.increase_counter(Counter::CacheRefreshAddr, query_addr_count);
3755 }
3756}
3757
3758fn add_answer_of_service(
3760 out: &mut DnsOutgoing,
3761 msg: &DnsIncoming,
3762 entry_name: &str,
3763 service: &ServiceInfo,
3764 qtype: RRType,
3765 intf_addrs: Vec<IpAddr>,
3766) {
3767 if qtype == RRType::SRV || qtype == RRType::ANY {
3768 out.add_answer(
3769 msg,
3770 DnsSrv::new(
3771 entry_name,
3772 CLASS_IN | CLASS_CACHE_FLUSH,
3773 service.get_host_ttl(),
3774 service.get_priority(),
3775 service.get_weight(),
3776 service.get_port(),
3777 service.get_hostname().to_string(),
3778 ),
3779 );
3780 }
3781
3782 if qtype == RRType::TXT || qtype == RRType::ANY {
3783 out.add_answer(
3784 msg,
3785 DnsTxt::new(
3786 entry_name,
3787 CLASS_IN | CLASS_CACHE_FLUSH,
3788 service.get_other_ttl(),
3789 service.generate_txt(),
3790 ),
3791 );
3792 }
3793
3794 if qtype == RRType::SRV {
3795 for address in intf_addrs {
3796 out.add_additional_answer(DnsAddress::new(
3797 service.get_hostname(),
3798 ip_address_rr_type(&address),
3799 CLASS_IN | CLASS_CACHE_FLUSH,
3800 service.get_host_ttl(),
3801 address,
3802 InterfaceId::default(),
3803 ));
3804 }
3805 }
3806}
3807
3808#[derive(Clone, Debug)]
3811#[non_exhaustive]
3812pub enum ServiceEvent {
3813 SearchStarted(String),
3815
3816 ServiceFound(String, String),
3818
3819 ServiceResolved(Box<ResolvedService>),
3821
3822 ServiceRemoved(String, String),
3824
3825 SearchStopped(String),
3827}
3828
3829#[derive(Clone, Debug)]
3832#[non_exhaustive]
3833pub enum HostnameResolutionEvent {
3834 SearchStarted(String),
3836 AddressesFound(String, HashSet<ScopedIp>),
3838 AddressesRemoved(String, HashSet<ScopedIp>),
3840 SearchTimeout(String),
3842 SearchStopped(String),
3844}
3845
3846#[derive(Clone, Debug)]
3849#[non_exhaustive]
3850pub enum DaemonEvent {
3851 Announce(String, String),
3853
3854 Error(Error),
3856
3857 IpAdd(IpAddr),
3859
3860 IpDel(IpAddr),
3862
3863 NameChange(DnsNameChange),
3866
3867 Respond(String),
3869}
3870
3871#[derive(Clone, Debug)]
3874pub struct DnsNameChange {
3875 pub original: String,
3877
3878 pub new_name: String,
3888
3889 pub rr_type: RRType,
3891
3892 pub intf_name: String,
3894}
3895
3896#[derive(Debug)]
3898enum Command {
3899 Browse(String, u32, bool, Sender<ServiceEvent>),
3901
3902 ResolveHostname(String, u32, Sender<HostnameResolutionEvent>, Option<u64>), Register(Box<ServiceInfo>),
3907
3908 Unregister(String, Sender<UnregisterStatus>), RegisterResend(String, u32), UnregisterResend(Vec<u8>, u32, bool), StopBrowse(String), StopResolveHostname(String), Resolve(String, u16), GetMetrics(Sender<Metrics>),
3929
3930 GetStatus(Sender<DaemonStatus>),
3932
3933 Monitor(Sender<DaemonEvent>),
3935
3936 SetOption(DaemonOption),
3937
3938 GetOption(Sender<DaemonOptionVal>),
3939
3940 Verify(String, Duration),
3945
3946 InvalidIntfAddrs(HashSet<Interface>),
3948
3949 Exit(Sender<DaemonStatus>),
3950}
3951
3952impl fmt::Display for Command {
3953 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
3954 match self {
3955 Self::Browse(_, _, _, _) => write!(f, "Command Browse"),
3956 Self::ResolveHostname(_, _, _, _) => write!(f, "Command ResolveHostname"),
3957 Self::Exit(_) => write!(f, "Command Exit"),
3958 Self::GetStatus(_) => write!(f, "Command GetStatus"),
3959 Self::GetMetrics(_) => write!(f, "Command GetMetrics"),
3960 Self::Monitor(_) => write!(f, "Command Monitor"),
3961 Self::Register(_) => write!(f, "Command Register"),
3962 Self::RegisterResend(_, _) => write!(f, "Command RegisterResend"),
3963 Self::SetOption(_) => write!(f, "Command SetOption"),
3964 Self::GetOption(_) => write!(f, "Command GetOption"),
3965 Self::StopBrowse(_) => write!(f, "Command StopBrowse"),
3966 Self::StopResolveHostname(_) => write!(f, "Command StopResolveHostname"),
3967 Self::Unregister(_, _) => write!(f, "Command Unregister"),
3968 Self::UnregisterResend(_, _, _) => write!(f, "Command UnregisterResend"),
3969 Self::Resolve(_, _) => write!(f, "Command Resolve"),
3970 Self::Verify(_, _) => write!(f, "Command VerifyResource"),
3971 Self::InvalidIntfAddrs(_) => write!(f, "Command InvalidIntfAddrs"),
3972 }
3973 }
3974}
3975
3976struct DaemonOptionVal {
3977 _service_name_len_max: u8,
3978 ip_check_interval: u64,
3979}
3980
3981#[derive(Debug)]
3982enum DaemonOption {
3983 ServiceNameLenMax(u8),
3984 IpCheckInterval(u64),
3985 EnableInterface(Vec<IfKind>),
3986 DisableInterface(Vec<IfKind>),
3987 MulticastLoopV4(bool),
3988 MulticastLoopV6(bool),
3989 AcceptUnsolicited(bool),
3990 IncludeAppleP2P(bool),
3991 #[cfg(test)]
3992 TestDownInterface(String),
3993 #[cfg(test)]
3994 TestUpInterface(String),
3995}
3996
3997const DOMAIN_LEN: usize = "._tcp.local.".len();
3999
4000fn check_service_name_length(ty_domain: &str, limit: u8) -> Result<()> {
4002 if ty_domain.len() <= DOMAIN_LEN + 1 {
4003 return Err(e_fmt!("Service type name cannot be empty: {}", ty_domain));
4005 }
4006
4007 let service_name_len = ty_domain.len() - DOMAIN_LEN - 1; if service_name_len > limit as usize {
4009 return Err(e_fmt!("Service name length must be <= {} bytes", limit));
4010 }
4011 Ok(())
4012}
4013
4014fn check_domain_suffix(name: &str) -> Result<()> {
4016 if !(name.ends_with("._tcp.local.") || name.ends_with("._udp.local.")) {
4017 return Err(e_fmt!(
4018 "mDNS service {} must end with '._tcp.local.' or '._udp.local.'",
4019 name
4020 ));
4021 }
4022
4023 Ok(())
4024}
4025
4026fn check_service_name(fullname: &str) -> Result<()> {
4034 check_domain_suffix(fullname)?;
4035
4036 let remaining: Vec<&str> = fullname[..fullname.len() - DOMAIN_LEN].split('.').collect();
4037 let name = remaining.last().ok_or_else(|| e_fmt!("No service name"))?;
4038
4039 if &name[0..1] != "_" {
4040 return Err(e_fmt!("Service name must start with '_'"));
4041 }
4042
4043 let name = &name[1..];
4044
4045 if name.contains("--") {
4046 return Err(e_fmt!("Service name must not contain '--'"));
4047 }
4048
4049 if name.starts_with('-') || name.ends_with('-') {
4050 return Err(e_fmt!("Service name (%s) may not start or end with '-'"));
4051 }
4052
4053 let ascii_count = name.chars().filter(|c| c.is_ascii_alphabetic()).count();
4054 if ascii_count < 1 {
4055 return Err(e_fmt!(
4056 "Service name must contain at least one letter (eg: 'A-Za-z')"
4057 ));
4058 }
4059
4060 Ok(())
4061}
4062
4063fn check_hostname(hostname: &str) -> Result<()> {
4065 if !hostname.ends_with(".local.") {
4066 return Err(e_fmt!("Hostname must end with '.local.': {hostname}"));
4067 }
4068
4069 if hostname == ".local." {
4070 return Err(e_fmt!(
4071 "The part of the hostname before '.local.' cannot be empty"
4072 ));
4073 }
4074
4075 if hostname.len() > 255 {
4076 return Err(e_fmt!("Hostname length must be <= 255 bytes"));
4077 }
4078
4079 Ok(())
4080}
4081
4082fn call_service_listener(
4083 listeners_map: &HashMap<String, Sender<ServiceEvent>>,
4084 ty_domain: &str,
4085 event: ServiceEvent,
4086) {
4087 if let Some(listener) = listeners_map.get(ty_domain) {
4088 match listener.send(event) {
4089 Ok(()) => trace!("Sent event to listener successfully"),
4090 Err(e) => debug!("Failed to send event: {}", e),
4091 }
4092 }
4093}
4094
4095fn call_hostname_resolution_listener(
4096 listeners_map: &HashMap<String, (Sender<HostnameResolutionEvent>, Option<u64>)>,
4097 hostname: &str,
4098 event: HostnameResolutionEvent,
4099) {
4100 let hostname_lower = hostname.to_lowercase();
4101 if let Some(listener) = listeners_map.get(&hostname_lower).map(|(l, _)| l) {
4102 match listener.send(event) {
4103 Ok(()) => trace!("Sent event to listener successfully"),
4104 Err(e) => debug!("Failed to send event: {}", e),
4105 }
4106 }
4107}
4108
4109fn my_ip_interfaces(with_loopback: bool) -> Vec<Interface> {
4113 my_ip_interfaces_inner(with_loopback, false)
4114}
4115
4116fn my_ip_interfaces_inner(with_loopback: bool, with_apple_p2p: bool) -> Vec<Interface> {
4117 if_addrs::get_if_addrs()
4118 .unwrap_or_default()
4119 .into_iter()
4120 .filter(|i| {
4121 i.is_oper_up()
4122 && !i.is_p2p()
4123 && (!i.is_loopback() || with_loopback)
4124 && (with_apple_p2p || !is_apple_p2p_by_name(&i.name))
4125 })
4126 .collect()
4127}
4128
4129fn is_apple_p2p_by_name(name: &str) -> bool {
4132 let p2p_prefixes = ["awdl", "llw"];
4133 p2p_prefixes.iter().any(|prefix| name.starts_with(prefix))
4134}
4135
4136fn send_dns_outgoing(
4139 out: &DnsOutgoing,
4140 my_intf: &MyIntf,
4141 sock: &PktInfoUdpSocket,
4142 port: u16,
4143) -> MyResult<Vec<Vec<u8>>> {
4144 let if_name = &my_intf.name;
4145
4146 let if_addr = if sock.domain() == Domain::IPV4 {
4147 match my_intf.next_ifaddr_v4() {
4148 Some(addr) => addr,
4149 None => return Ok(vec![]),
4150 }
4151 } else {
4152 match my_intf.next_ifaddr_v6() {
4153 Some(addr) => addr,
4154 None => return Ok(vec![]),
4155 }
4156 };
4157
4158 send_dns_outgoing_impl(out, if_name, my_intf.index, if_addr, sock, port)
4159}
4160
4161fn send_dns_outgoing_impl(
4163 out: &DnsOutgoing,
4164 if_name: &str,
4165 if_index: u32,
4166 if_addr: &IfAddr,
4167 sock: &PktInfoUdpSocket,
4168 port: u16,
4169) -> MyResult<Vec<Vec<u8>>> {
4170 let qtype = if out.is_query() {
4171 "query"
4172 } else {
4173 if out.answers_count() == 0 && out.additionals().is_empty() {
4174 return Ok(vec![]); }
4176 "response"
4177 };
4178 trace!(
4179 "send {}: {} questions {} answers {} authorities {} additional",
4180 qtype,
4181 out.questions().len(),
4182 out.answers_count(),
4183 out.authorities().len(),
4184 out.additionals().len()
4185 );
4186
4187 match if_addr.ip() {
4188 IpAddr::V4(ipv4) => {
4189 if let Err(e) = sock.set_multicast_if_v4(&ipv4) {
4190 debug!(
4191 "send_dns_outgoing: failed to set multicast interface for IPv4 {}: {}",
4192 ipv4, e
4193 );
4194 if e.kind() == std::io::ErrorKind::AddrNotAvailable {
4196 let intf_addr = Interface {
4197 name: if_name.to_string(),
4198 addr: if_addr.clone(),
4199 index: Some(if_index),
4200 oper_status: if_addrs::IfOperStatus::Down,
4201 is_p2p: false,
4202 #[cfg(windows)]
4203 adapter_name: String::new(),
4204 };
4205 return Err(InternalError::IntfAddrInvalid(intf_addr));
4206 }
4207 return Ok(vec![]); }
4209 }
4210 IpAddr::V6(ipv6) => {
4211 if let Err(e) = sock.set_multicast_if_v6(if_index) {
4212 debug!(
4213 "send_dns_outgoing: failed to set multicast interface for IPv6 {}: {}",
4214 ipv6, e
4215 );
4216 if e.kind() == std::io::ErrorKind::AddrNotAvailable {
4218 let intf_addr = Interface {
4219 name: if_name.to_string(),
4220 addr: if_addr.clone(),
4221 index: Some(if_index),
4222 oper_status: if_addrs::IfOperStatus::Down,
4223 is_p2p: false,
4224 #[cfg(windows)]
4225 adapter_name: String::new(),
4226 };
4227 return Err(InternalError::IntfAddrInvalid(intf_addr));
4228 }
4229 return Ok(vec![]); }
4231 }
4232 }
4233
4234 let packet_list = out.to_data_on_wire();
4235 for packet in packet_list.iter() {
4236 multicast_on_intf(packet, if_name, if_index, if_addr, sock, port);
4237 }
4238 Ok(packet_list)
4239}
4240
4241fn multicast_on_intf(
4243 packet: &[u8],
4244 if_name: &str,
4245 if_index: u32,
4246 if_addr: &IfAddr,
4247 socket: &PktInfoUdpSocket,
4248 port: u16,
4249) {
4250 if packet.len() > MAX_MSG_ABSOLUTE {
4251 debug!("Drop over-sized packet ({})", packet.len());
4252 return;
4253 }
4254
4255 let addr: SocketAddr = match if_addr {
4256 if_addrs::IfAddr::V4(_) => SocketAddrV4::new(GROUP_ADDR_V4, port).into(),
4257 if_addrs::IfAddr::V6(_) => {
4258 let mut sock = SocketAddrV6::new(GROUP_ADDR_V6, port, 0, 0);
4259 sock.set_scope_id(if_index); sock.into()
4261 }
4262 };
4263
4264 let sock_addr = addr.into();
4266 match socket.send_to(packet, &sock_addr) {
4267 Ok(sz) => trace!(
4268 "sent out {} bytes on interface {} (idx {}) addr {}",
4269 sz,
4270 if_name,
4271 if_index,
4272 if_addr.ip()
4273 ),
4274 Err(e) => trace!("Failed to send to {} via {:?}: {}", addr, &if_name, e),
4275 }
4276}
4277
4278fn valid_instance_name(name: &str) -> bool {
4282 name.split('.').count() >= 5
4283}
4284
4285fn notify_monitors(monitors: &mut Vec<Sender<DaemonEvent>>, event: DaemonEvent) {
4286 monitors.retain(|sender| {
4287 if let Err(e) = sender.try_send(event.clone()) {
4288 debug!("notify_monitors: try_send: {}", &e);
4289 if matches!(e, TrySendError::Disconnected(_)) {
4290 return false; }
4292 }
4293 true
4294 });
4295}
4296
4297fn prepare_announce(
4300 info: &ServiceInfo,
4301 intf: &MyIntf,
4302 dns_registry: &mut DnsRegistry,
4303 is_ipv4: bool,
4304) -> Option<DnsOutgoing> {
4305 let intf_addrs = if is_ipv4 {
4306 info.get_addrs_on_my_intf_v4(intf)
4307 } else {
4308 info.get_addrs_on_my_intf_v6(intf)
4309 };
4310
4311 if intf_addrs.is_empty() {
4312 debug!(
4313 "prepare_announce (ipv4: {is_ipv4}): no valid addrs on interface {}",
4314 &intf.name
4315 );
4316 return None;
4317 }
4318
4319 let service_fullname = dns_registry.resolve_name(info.get_fullname());
4321
4322 debug!(
4323 "prepare to announce service {service_fullname} on {:?}",
4324 &intf_addrs
4325 );
4326
4327 let mut probing_count = 0;
4328 let mut out = DnsOutgoing::new(FLAGS_QR_RESPONSE | FLAGS_AA);
4329 let create_time = current_time_millis() + fastrand::u64(0..250);
4330
4331 out.add_answer_at_time(
4332 DnsPointer::new(
4333 info.get_type(),
4334 RRType::PTR,
4335 CLASS_IN,
4336 info.get_other_ttl(),
4337 service_fullname.to_string(),
4338 ),
4339 0,
4340 );
4341
4342 if let Some(sub) = info.get_subtype() {
4343 trace!("Adding subdomain {}", sub);
4344 out.add_answer_at_time(
4345 DnsPointer::new(
4346 sub,
4347 RRType::PTR,
4348 CLASS_IN,
4349 info.get_other_ttl(),
4350 service_fullname.to_string(),
4351 ),
4352 0,
4353 );
4354 }
4355
4356 let hostname = dns_registry.resolve_name(info.get_hostname()).to_string();
4358
4359 let mut srv = DnsSrv::new(
4360 info.get_fullname(),
4361 CLASS_IN | CLASS_CACHE_FLUSH,
4362 info.get_host_ttl(),
4363 info.get_priority(),
4364 info.get_weight(),
4365 info.get_port(),
4366 hostname,
4367 );
4368
4369 if let Some(new_name) = dns_registry.name_changes.get(info.get_fullname()) {
4370 srv.get_record_mut().set_new_name(new_name.to_string());
4371 }
4372
4373 if !info.requires_probe()
4374 || dns_registry.is_probing_done(&srv, info.get_fullname(), create_time)
4375 {
4376 out.add_answer_at_time(srv, 0);
4377 } else {
4378 probing_count += 1;
4379 }
4380
4381 let mut txt = DnsTxt::new(
4384 info.get_fullname(),
4385 CLASS_IN | CLASS_CACHE_FLUSH,
4386 info.get_other_ttl(),
4387 info.generate_txt(),
4388 );
4389
4390 if let Some(new_name) = dns_registry.name_changes.get(info.get_fullname()) {
4391 txt.get_record_mut().set_new_name(new_name.to_string());
4392 }
4393
4394 if !info.requires_probe()
4395 || dns_registry.is_probing_done(&txt, info.get_fullname(), create_time)
4396 {
4397 out.add_answer_at_time(txt, 0);
4398 } else {
4399 probing_count += 1;
4400 }
4401
4402 let hostname = info.get_hostname();
4405 for address in intf_addrs {
4406 let mut dns_addr = DnsAddress::new(
4407 hostname,
4408 ip_address_rr_type(&address),
4409 CLASS_IN | CLASS_CACHE_FLUSH,
4410 info.get_host_ttl(),
4411 address,
4412 intf.into(),
4413 );
4414
4415 if let Some(new_name) = dns_registry.name_changes.get(hostname) {
4416 dns_addr.get_record_mut().set_new_name(new_name.to_string());
4417 }
4418
4419 if !info.requires_probe()
4420 || dns_registry.is_probing_done(&dns_addr, info.get_fullname(), create_time)
4421 {
4422 out.add_answer_at_time(dns_addr, 0);
4423 } else {
4424 probing_count += 1;
4425 }
4426 }
4427
4428 if probing_count > 0 {
4429 return None;
4430 }
4431
4432 Some(out)
4433}
4434
4435fn announce_service_on_intf(
4438 dns_registry: &mut DnsRegistry,
4439 info: &ServiceInfo,
4440 intf: &MyIntf,
4441 sock: &PktInfoUdpSocket,
4442 port: u16,
4443) -> MyResult<bool> {
4444 let is_ipv4 = sock.domain() == Domain::IPV4;
4445 if let Some(out) = prepare_announce(info, intf, dns_registry, is_ipv4) {
4446 let _ = send_dns_outgoing(&out, intf, sock, port)?;
4447 return Ok(true);
4448 }
4449
4450 Ok(false)
4451}
4452
4453fn name_change(original: &str) -> String {
4461 let mut parts: Vec<_> = original.split('.').collect();
4462 let Some(first_part) = parts.get_mut(0) else {
4463 return format!("{original} (2)");
4464 };
4465
4466 let mut new_name = format!("{first_part} (2)");
4467
4468 if let Some(paren_pos) = first_part.rfind(" (") {
4470 if let Some(end_paren) = first_part[paren_pos..].find(')') {
4472 let absolute_end_pos = paren_pos + end_paren;
4473 if absolute_end_pos == first_part.len() - 1 {
4475 let num_start = paren_pos + 2; if let Ok(number) = first_part[num_start..absolute_end_pos].parse::<u32>() {
4478 let base_name = &first_part[..paren_pos];
4479 new_name = format!("{} ({})", base_name, number + 1)
4480 }
4481 }
4482 }
4483 }
4484
4485 *first_part = &new_name;
4486 parts.join(".")
4487}
4488
4489fn hostname_change(original: &str) -> String {
4497 let mut parts: Vec<_> = original.split('.').collect();
4498 let Some(first_part) = parts.get_mut(0) else {
4499 return format!("{original}-2");
4500 };
4501
4502 let mut new_name = format!("{first_part}-2");
4503
4504 if let Some(hyphen_pos) = first_part.rfind('-') {
4506 if let Ok(number) = first_part[hyphen_pos + 1..].parse::<u32>() {
4508 let base_name = &first_part[..hyphen_pos];
4509 new_name = format!("{}-{}", base_name, number + 1);
4510 }
4511 }
4512
4513 *first_part = &new_name;
4514 parts.join(".")
4515}
4516
4517fn check_probing(
4520 dns_registry: &mut DnsRegistry,
4521 timers: &mut BinaryHeap<Reverse<u64>>,
4522 now: u64,
4523) -> (DnsOutgoing, Vec<String>) {
4524 let mut expired_probes = Vec::new();
4525 let mut out = DnsOutgoing::new(FLAGS_QR_QUERY);
4526
4527 for (name, probe) in dns_registry.probing.iter_mut() {
4528 if now >= probe.next_send {
4529 if probe.expired(now) {
4530 expired_probes.push(name.clone());
4532 } else {
4533 out.add_question(name, RRType::ANY);
4534
4535 for record in probe.records.iter() {
4543 out.add_authority(record.clone());
4544 }
4545
4546 probe.update_next_send(now);
4547
4548 timers.push(Reverse(probe.next_send));
4550 }
4551 }
4552 }
4553
4554 (out, expired_probes)
4555}
4556
4557fn handle_expired_probes(
4562 expired_probes: Vec<String>,
4563 intf_name: &str,
4564 dns_registry: &mut DnsRegistry,
4565 monitors: &mut Vec<Sender<DaemonEvent>>,
4566) -> HashSet<String> {
4567 let mut waiting_services = HashSet::new();
4568
4569 for name in expired_probes {
4570 let Some(probe) = dns_registry.probing.remove(&name) else {
4571 continue;
4572 };
4573
4574 for record in probe.records.iter() {
4576 if let Some(new_name) = record.get_record().get_new_name() {
4577 dns_registry
4578 .name_changes
4579 .insert(name.clone(), new_name.to_string());
4580
4581 let event = DnsNameChange {
4582 original: record.get_record().get_original_name().to_string(),
4583 new_name: new_name.to_string(),
4584 rr_type: record.get_type(),
4585 intf_name: intf_name.to_string(),
4586 };
4587 debug!("Name change event: {:?}", &event);
4588 notify_monitors(monitors, DaemonEvent::NameChange(event));
4589 }
4590 }
4591
4592 debug!(
4594 "probe of '{name}' finished: move {} records to active. ({} waiting services)",
4595 probe.records.len(),
4596 probe.waiting_services.len(),
4597 );
4598
4599 if !probe.records.is_empty() {
4601 match dns_registry.active.get_mut(&name) {
4602 Some(records) => {
4603 records.extend(probe.records);
4604 }
4605 None => {
4606 dns_registry.active.insert(name, probe.records);
4607 }
4608 }
4609
4610 waiting_services.extend(probe.waiting_services);
4611 }
4612 }
4613
4614 waiting_services
4615}
4616
4617fn resolve_addr_to_index(if_kind: IfKind, interfaces: &[Interface]) -> IfKind {
4619 if let IfKind::Addr(addr) = &if_kind {
4620 if let Some(intf) = interfaces.iter().find(|intf| &intf.ip() == addr) {
4621 let if_index = intf.index.unwrap_or(0);
4622 return if addr.is_ipv4() {
4623 IfKind::IndexV4(if_index)
4624 } else {
4625 IfKind::IndexV6(if_index)
4626 };
4627 }
4628 }
4629 if_kind
4630}
4631
4632#[cfg(test)]
4633mod tests {
4634 use super::{
4635 _new_socket_bind, check_domain_suffix, check_service_name_length, hostname_change,
4636 my_ip_interfaces, name_change, send_dns_outgoing_impl, valid_instance_name,
4637 HostnameResolutionEvent, ServiceDaemon, ServiceEvent, ServiceInfo, MDNS_PORT,
4638 };
4639 use crate::{
4640 dns_parser::{
4641 DnsIncoming, DnsOutgoing, DnsPointer, InterfaceId, RRType, ScopedIp, CLASS_IN,
4642 FLAGS_AA, FLAGS_QR_RESPONSE,
4643 },
4644 service_daemon::{add_answer_of_service, check_hostname},
4645 };
4646 use std::time::{Duration, SystemTime};
4647 use test_log::test;
4648
4649 #[test]
4650 fn test_instance_name() {
4651 assert!(valid_instance_name("my-laser._printer._tcp.local."));
4652 assert!(valid_instance_name("my-laser.._printer._tcp.local."));
4653 assert!(!valid_instance_name("_printer._tcp.local."));
4654 }
4655
4656 #[test]
4657 fn test_check_service_name_length() {
4658 let result = check_service_name_length("_tcp", 100);
4659 assert!(result.is_err());
4660 if let Err(e) = result {
4661 println!("{}", e);
4662 }
4663 }
4664
4665 #[test]
4666 fn test_check_hostname() {
4667 for hostname in &[
4669 "my_host.local.",
4670 &("A".repeat(255 - ".local.".len()) + ".local."),
4671 ] {
4672 let result = check_hostname(hostname);
4673 assert!(result.is_ok());
4674 }
4675
4676 for hostname in &[
4678 "my_host.local",
4679 ".local.",
4680 &("A".repeat(256 - ".local.".len()) + ".local."),
4681 ] {
4682 let result = check_hostname(hostname);
4683 assert!(result.is_err());
4684 if let Err(e) = result {
4685 println!("{}", e);
4686 }
4687 }
4688 }
4689
4690 #[test]
4691 fn test_check_domain_suffix() {
4692 assert!(check_domain_suffix("_missing_dot._tcp.local").is_err());
4693 assert!(check_domain_suffix("_missing_bar.tcp.local.").is_err());
4694 assert!(check_domain_suffix("_mis_spell._tpp.local.").is_err());
4695 assert!(check_domain_suffix("_mis_spell._upp.local.").is_err());
4696 assert!(check_domain_suffix("_has_dot._tcp.local.").is_ok());
4697 assert!(check_domain_suffix("_goodname._udp.local.").is_ok());
4698 }
4699
4700 #[test]
4701 fn test_service_with_temporarily_invalidated_ptr() {
4702 let d = ServiceDaemon::new().expect("Failed to create daemon");
4704
4705 let service = "_test_inval_ptr._udp.local.";
4706 let host_name = "my_host_tmp_invalidated_ptr.local.";
4707 let intfs: Vec<_> = my_ip_interfaces(false);
4708 let intf_ips: Vec<_> = intfs.iter().map(|intf| intf.ip()).collect();
4709 let port = 5201;
4710 let my_service =
4711 ServiceInfo::new(service, "my_instance", host_name, &intf_ips[..], port, None)
4712 .expect("invalid service info")
4713 .enable_addr_auto();
4714 let result = d.register(my_service.clone());
4715 assert!(result.is_ok());
4716
4717 let browse_chan = d.browse(service).unwrap();
4719 let timeout = Duration::from_secs(2);
4720 let mut resolved = false;
4721
4722 while let Ok(event) = browse_chan.recv_timeout(timeout) {
4723 match event {
4724 ServiceEvent::ServiceResolved(info) => {
4725 resolved = true;
4726 println!("Resolved a service of {}", &info.fullname);
4727 break;
4728 }
4729 e => {
4730 println!("Received event {:?}", e);
4731 }
4732 }
4733 }
4734
4735 assert!(resolved);
4736
4737 println!("Stopping browse of {}", service);
4738 d.stop_browse(service).unwrap();
4741
4742 let mut stopped = false;
4747 while let Ok(event) = browse_chan.recv_timeout(timeout) {
4748 match event {
4749 ServiceEvent::SearchStopped(_) => {
4750 stopped = true;
4751 println!("Stopped browsing service");
4752 break;
4753 }
4754 e => {
4758 println!("Received event {:?}", e);
4759 }
4760 }
4761 }
4762
4763 assert!(stopped);
4764
4765 let invalidate_ptr_packet = DnsPointer::new(
4767 my_service.get_type(),
4768 RRType::PTR,
4769 CLASS_IN,
4770 0,
4771 my_service.get_fullname().to_string(),
4772 );
4773
4774 let mut packet_buffer = DnsOutgoing::new(FLAGS_QR_RESPONSE | FLAGS_AA);
4775 packet_buffer.add_additional_answer(invalidate_ptr_packet);
4776
4777 for intf in intfs {
4778 let sock = _new_socket_bind(&intf, true).unwrap();
4779 send_dns_outgoing_impl(
4780 &packet_buffer,
4781 &intf.name,
4782 intf.index.unwrap_or(0),
4783 &intf.addr,
4784 &sock.pktinfo,
4785 MDNS_PORT,
4786 )
4787 .unwrap();
4788 }
4789
4790 println!(
4791 "Sent PTR record invalidation. Starting second browse for {}",
4792 service
4793 );
4794
4795 let browse_chan = d.browse(service).unwrap();
4797
4798 resolved = false;
4799 while let Ok(event) = browse_chan.recv_timeout(timeout) {
4800 match event {
4801 ServiceEvent::ServiceResolved(info) => {
4802 resolved = true;
4803 println!("Resolved a service of {}", &info.fullname);
4804 break;
4805 }
4806 e => {
4807 println!("Received event {:?}", e);
4808 }
4809 }
4810 }
4811
4812 assert!(resolved);
4813 d.shutdown().unwrap();
4814 }
4815
4816 #[test]
4817 fn test_expired_srv() {
4818 let service_type = "_expired-srv._udp.local.";
4820 let instance = "test_instance";
4821 let host_name = "expired_srv_host.local.";
4822 let mut my_service = ServiceInfo::new(service_type, instance, host_name, "", 5023, None)
4823 .unwrap()
4824 .enable_addr_auto();
4825 let new_ttl = 3; my_service._set_host_ttl(new_ttl);
4830
4831 let mdns_server = ServiceDaemon::new().expect("Failed to create mdns server");
4833 let result = mdns_server.register(my_service);
4834 assert!(result.is_ok());
4835
4836 let mdns_client = ServiceDaemon::new().expect("Failed to create mdns client");
4837 let browse_chan = mdns_client.browse(service_type).unwrap();
4838 let timeout = Duration::from_secs(2);
4839 let mut resolved = false;
4840
4841 while let Ok(event) = browse_chan.recv_timeout(timeout) {
4842 if let ServiceEvent::ServiceResolved(info) = event {
4843 resolved = true;
4844 println!("Resolved a service of {}", &info.fullname);
4845 break;
4846 }
4847 }
4848
4849 assert!(resolved);
4850
4851 mdns_server.shutdown().unwrap();
4853
4854 let expire_timeout = Duration::from_secs(new_ttl as u64);
4856 while let Ok(event) = browse_chan.recv_timeout(expire_timeout) {
4857 if let ServiceEvent::ServiceRemoved(service_type, full_name) = event {
4858 println!("Service removed: {}: {}", &service_type, &full_name);
4859 break;
4860 }
4861 }
4862 }
4863
4864 #[test]
4865 fn test_hostname_resolution_address_removed() {
4866 let server = ServiceDaemon::new().expect("Failed to create server");
4868 let hostname = "addr_remove_host._tcp.local.";
4869 let service_ip_addr: ScopedIp = my_ip_interfaces(false)
4870 .iter()
4871 .find(|iface| iface.ip().is_ipv4())
4872 .map(|iface| iface.into())
4873 .unwrap();
4874
4875 let mut my_service = ServiceInfo::new(
4876 "_host_res_test._tcp.local.",
4877 "my_instance",
4878 hostname,
4879 service_ip_addr.to_ip_addr(),
4880 1234,
4881 None,
4882 )
4883 .expect("invalid service info");
4884
4885 let addr_ttl = 2;
4887 my_service._set_host_ttl(addr_ttl); server.register(my_service).unwrap();
4890
4891 let client = ServiceDaemon::new().expect("Failed to create client");
4893 let event_receiver = client.resolve_hostname(hostname, None).unwrap();
4894 let resolved = loop {
4895 match event_receiver.recv() {
4896 Ok(HostnameResolutionEvent::AddressesFound(found_hostname, addresses)) => {
4897 assert!(found_hostname == hostname);
4898 assert!(addresses.contains(&service_ip_addr));
4899 println!("address found: {:?}", &addresses);
4900 break true;
4901 }
4902 Ok(HostnameResolutionEvent::SearchStopped(_)) => break false,
4903 Ok(_event) => {}
4904 Err(_) => break false,
4905 }
4906 };
4907
4908 assert!(resolved);
4909
4910 server.shutdown().unwrap();
4912
4913 let timeout = Duration::from_secs(addr_ttl as u64 + 1);
4915 let removed = loop {
4916 match event_receiver.recv_timeout(timeout) {
4917 Ok(HostnameResolutionEvent::AddressesRemoved(removed_host, addresses)) => {
4918 assert!(removed_host == hostname);
4919 assert!(addresses.contains(&service_ip_addr));
4920
4921 println!(
4922 "address removed: hostname: {} addresses: {:?}",
4923 &hostname, &addresses
4924 );
4925 break true;
4926 }
4927 Ok(_event) => {}
4928 Err(_) => {
4929 break false;
4930 }
4931 }
4932 };
4933
4934 assert!(removed);
4935
4936 client.shutdown().unwrap();
4937 }
4938
4939 #[test]
4940 fn test_refresh_ptr() {
4941 let service_type = "_refresh-ptr._udp.local.";
4943 let instance = "test_instance";
4944 let host_name = "refresh_ptr_host.local.";
4945 let service_ip_addr = my_ip_interfaces(false)
4946 .iter()
4947 .find(|iface| iface.ip().is_ipv4())
4948 .map(|iface| iface.ip())
4949 .unwrap();
4950
4951 let mut my_service = ServiceInfo::new(
4952 service_type,
4953 instance,
4954 host_name,
4955 service_ip_addr,
4956 5023,
4957 None,
4958 )
4959 .unwrap();
4960
4961 let new_ttl = 3; my_service._set_other_ttl(new_ttl);
4963
4964 let mdns_server = ServiceDaemon::new().expect("Failed to create mdns server");
4966 let result = mdns_server.register(my_service);
4967 assert!(result.is_ok());
4968
4969 let mdns_client = ServiceDaemon::new().expect("Failed to create mdns client");
4970 let browse_chan = mdns_client.browse(service_type).unwrap();
4971 let timeout = Duration::from_millis(1500); let mut resolved = false;
4973
4974 while let Ok(event) = browse_chan.recv_timeout(timeout) {
4976 if let ServiceEvent::ServiceResolved(info) = event {
4977 resolved = true;
4978 println!("Resolved a service of {}", &info.fullname);
4979 break;
4980 }
4981 }
4982
4983 assert!(resolved);
4984
4985 let timeout = Duration::from_millis(new_ttl as u64 * 1000 * 90 / 100);
4987 while let Ok(event) = browse_chan.recv_timeout(timeout) {
4988 println!("event: {:?}", &event);
4989 }
4990
4991 let metrics_chan = mdns_client.get_metrics().unwrap();
4993 let metrics = metrics_chan.recv_timeout(timeout).unwrap();
4994 let ptr_refresh_counter = metrics["cache-refresh-ptr"];
4995 assert_eq!(ptr_refresh_counter, 1);
4996 let srvtxt_refresh_counter = metrics["cache-refresh-srv-txt"];
4997 assert_eq!(srvtxt_refresh_counter, 1);
4998
4999 mdns_server.shutdown().unwrap();
5001 mdns_client.shutdown().unwrap();
5002 }
5003
5004 #[test]
5005 fn test_name_change() {
5006 assert_eq!(name_change("foo.local."), "foo (2).local.");
5007 assert_eq!(name_change("foo (2).local."), "foo (3).local.");
5008 assert_eq!(name_change("foo (9).local."), "foo (10).local.");
5009 assert_eq!(name_change("foo"), "foo (2)");
5010 assert_eq!(name_change("foo (2)"), "foo (3)");
5011 assert_eq!(name_change(""), " (2)");
5012
5013 assert_eq!(name_change("foo (abc)"), "foo (abc) (2)"); assert_eq!(name_change("foo (2"), "foo (2 (2)"); assert_eq!(name_change("foo (2) extra"), "foo (2) extra (2)"); }
5018
5019 #[test]
5020 fn test_hostname_change() {
5021 assert_eq!(hostname_change("foo.local."), "foo-2.local.");
5022 assert_eq!(hostname_change("foo"), "foo-2");
5023 assert_eq!(hostname_change("foo-2.local."), "foo-3.local.");
5024 assert_eq!(hostname_change("foo-9"), "foo-10");
5025 assert_eq!(hostname_change("test-42.domain."), "test-43.domain.");
5026 }
5027
5028 #[test]
5029 fn test_add_answer_txt_ttl() {
5030 let service_type = "_test_add_answer._udp.local.";
5032 let instance = "test_instance";
5033 let host_name = "add_answer_host.local.";
5034 let service_intf = my_ip_interfaces(false)
5035 .into_iter()
5036 .find(|iface| iface.ip().is_ipv4())
5037 .unwrap();
5038 let service_ip_addr = service_intf.ip();
5039 let my_service = ServiceInfo::new(
5040 service_type,
5041 instance,
5042 host_name,
5043 service_ip_addr,
5044 5023,
5045 None,
5046 )
5047 .unwrap();
5048
5049 let mut out = DnsOutgoing::new(FLAGS_QR_RESPONSE | FLAGS_AA);
5051
5052 let mut dummy_data = out.to_data_on_wire();
5054 let interface_id = InterfaceId::from(&service_intf);
5055 let incoming = DnsIncoming::new(dummy_data.pop().unwrap(), interface_id).unwrap();
5056
5057 let if_addrs = vec![service_intf.ip()];
5059 add_answer_of_service(
5060 &mut out,
5061 &incoming,
5062 instance,
5063 &my_service,
5064 RRType::TXT,
5065 if_addrs,
5066 );
5067
5068 assert!(
5070 out.answers_count() > 0,
5071 "No answers added to the outgoing message"
5072 );
5073
5074 let answer = out._answers().first().unwrap();
5076 assert_eq!(answer.0.get_type(), RRType::TXT);
5077
5078 assert_eq!(answer.0.get_record().get_ttl(), my_service.get_other_ttl());
5080 }
5081
5082 #[test]
5083 fn test_interface_flip() {
5084 let ty_domain = "_intf-flip._udp.local.";
5086 let host_name = "intf_flip.local.";
5087 let now = SystemTime::now()
5088 .duration_since(SystemTime::UNIX_EPOCH)
5089 .unwrap();
5090 let instance_name = now.as_micros().to_string(); let port = 5200;
5092
5093 let (ip_addr1, intf_name) = my_ip_interfaces(false)
5095 .iter()
5096 .find(|iface| iface.ip().is_ipv4())
5097 .map(|iface| (iface.ip(), iface.name.clone()))
5098 .unwrap();
5099
5100 println!("Using interface {} with IP {}", intf_name, ip_addr1);
5101
5102 let service1 = ServiceInfo::new(ty_domain, &instance_name, host_name, ip_addr1, port, None)
5104 .expect("valid service info");
5105 let server1 = ServiceDaemon::new().expect("failed to start server");
5106 server1
5107 .register(service1)
5108 .expect("Failed to register service1");
5109
5110 std::thread::sleep(Duration::from_secs(2));
5112
5113 let client = ServiceDaemon::new().expect("failed to start client");
5115
5116 let receiver = client.browse(ty_domain).unwrap();
5117
5118 let timeout = Duration::from_secs(3);
5119 let mut got_data = false;
5120
5121 while let Ok(event) = receiver.recv_timeout(timeout) {
5122 if let ServiceEvent::ServiceResolved(_) = event {
5123 println!("Received ServiceResolved event");
5124 got_data = true;
5125 break;
5126 }
5127 }
5128
5129 assert!(got_data, "Should receive ServiceResolved event");
5130
5131 client.set_ip_check_interval(1).unwrap();
5133
5134 println!("Shutting down interface {}", &intf_name);
5136 client.test_down_interface(&intf_name).unwrap();
5137
5138 let mut got_removed = false;
5139
5140 while let Ok(event) = receiver.recv_timeout(timeout) {
5141 if let ServiceEvent::ServiceRemoved(ty_domain, instance) = event {
5142 got_removed = true;
5143 println!("removed: {ty_domain} : {instance}");
5144 break;
5145 }
5146 }
5147 assert!(got_removed, "Should receive ServiceRemoved event");
5148
5149 println!("Bringing up interface {}", &intf_name);
5150 client.test_up_interface(&intf_name).unwrap();
5151 let mut got_data = false;
5152 while let Ok(event) = receiver.recv_timeout(timeout) {
5153 if let ServiceEvent::ServiceResolved(resolved) = event {
5154 got_data = true;
5155 println!("Received ServiceResolved: {:?}", resolved);
5156 break;
5157 }
5158 }
5159 assert!(
5160 got_data,
5161 "Should receive ServiceResolved event after interface is back up"
5162 );
5163
5164 server1.shutdown().unwrap();
5165 client.shutdown().unwrap();
5166 }
5167
5168 #[test]
5169 fn test_cache_only() {
5170 let service_type = "_cache_only._udp.local.";
5172 let instance = "test_instance";
5173 let host_name = "cache_only_host.local.";
5174 let service_ip_addr = my_ip_interfaces(false)
5175 .iter()
5176 .find(|iface| iface.ip().is_ipv4())
5177 .map(|iface| iface.ip())
5178 .unwrap();
5179
5180 let mut my_service = ServiceInfo::new(
5181 service_type,
5182 instance,
5183 host_name,
5184 service_ip_addr,
5185 5023,
5186 None,
5187 )
5188 .unwrap();
5189
5190 let new_ttl = 3; my_service._set_other_ttl(new_ttl);
5192
5193 let mdns_client = ServiceDaemon::new().expect("Failed to create mdns client");
5194
5195 let browse_chan = mdns_client.browse_cache(service_type).unwrap();
5198 std::thread::sleep(Duration::from_secs(2));
5199
5200 let mdns_server = ServiceDaemon::new().expect("Failed to create mdns server");
5202 let result = mdns_server.register(my_service);
5203 assert!(result.is_ok());
5204
5205 let timeout = Duration::from_millis(1500); let mut resolved = false;
5207
5208 while let Ok(event) = browse_chan.recv_timeout(timeout) {
5210 if let ServiceEvent::ServiceResolved(info) = event {
5211 resolved = true;
5212 println!("Resolved a service of {}", &info.get_fullname());
5213 break;
5214 }
5215 }
5216
5217 assert!(resolved);
5218
5219 mdns_server.shutdown().unwrap();
5221 mdns_client.shutdown().unwrap();
5222 }
5223
5224 #[test]
5225 fn test_cache_only_unsolicited() {
5226 let service_type = "_c_unsolicit._udp.local.";
5227 let instance = "test_instance";
5228 let host_name = "c_unsolicit_host.local.";
5229 let service_ip_addr = my_ip_interfaces(false)
5230 .iter()
5231 .find(|iface| iface.ip().is_ipv4())
5232 .map(|iface| iface.ip())
5233 .unwrap();
5234
5235 let my_service = ServiceInfo::new(
5236 service_type,
5237 instance,
5238 host_name,
5239 service_ip_addr,
5240 5023,
5241 None,
5242 )
5243 .unwrap();
5244
5245 let mdns_server = ServiceDaemon::new().expect("Failed to create mdns server");
5247 let result = mdns_server.register(my_service);
5248 assert!(result.is_ok());
5249
5250 let mdns_client = ServiceDaemon::new().expect("Failed to create mdns client");
5251 mdns_client.accept_unsolicited(true).unwrap();
5252
5253 std::thread::sleep(Duration::from_secs(2));
5256 let browse_chan = mdns_client.browse_cache(service_type).unwrap();
5257 let timeout = Duration::from_millis(1500); let mut resolved = false;
5259
5260 while let Ok(event) = browse_chan.recv_timeout(timeout) {
5262 if let ServiceEvent::ServiceResolved(info) = event {
5263 resolved = true;
5264 println!("Resolved a service of {}", &info.get_fullname());
5265 break;
5266 }
5267 }
5268
5269 assert!(resolved);
5270
5271 mdns_server.shutdown().unwrap();
5273 mdns_client.shutdown().unwrap();
5274 }
5275
5276 #[test]
5277 fn test_custom_port_isolation() {
5278 let service_type = "_custom_port._udp.local.";
5283 let instance_custom = "custom_port_instance";
5284 let instance_default = "default_port_instance";
5285 let host_name = "custom_port_host.local.";
5286
5287 let service_ip_addr = my_ip_interfaces(false)
5288 .iter()
5289 .find(|iface| iface.ip().is_ipv4())
5290 .map(|iface| iface.ip())
5291 .expect("Test requires an IPv4 interface");
5292
5293 let service_custom = ServiceInfo::new(
5295 service_type,
5296 instance_custom,
5297 host_name,
5298 service_ip_addr,
5299 8080,
5300 None,
5301 )
5302 .unwrap();
5303
5304 let service_default = ServiceInfo::new(
5306 service_type,
5307 instance_default,
5308 host_name,
5309 service_ip_addr,
5310 8081,
5311 None,
5312 )
5313 .unwrap();
5314
5315 let custom_port = 5454u16;
5317 let server_custom =
5318 ServiceDaemon::new_with_port(custom_port).expect("Failed to create custom port server");
5319 let client_custom =
5320 ServiceDaemon::new_with_port(custom_port).expect("Failed to create custom port client");
5321
5322 let server_default = ServiceDaemon::new().expect("Failed to create default port server");
5324
5325 server_custom
5327 .register(service_custom.clone())
5328 .expect("Failed to register custom port service");
5329
5330 server_default
5332 .register(service_default.clone())
5333 .expect("Failed to register default port service");
5334
5335 let browse_custom = client_custom
5337 .browse(service_type)
5338 .expect("Failed to browse on custom port");
5339
5340 let timeout = Duration::from_secs(3);
5341 let mut found_custom = false;
5342 let mut found_default_on_custom = false;
5343
5344 while let Ok(event) = browse_custom.recv_timeout(timeout) {
5346 if let ServiceEvent::ServiceResolved(info) = event {
5347 println!(
5348 "Custom port client resolved: {} on port {}",
5349 info.get_fullname(),
5350 info.get_port()
5351 );
5352 if info.get_fullname().starts_with(instance_custom) {
5353 found_custom = true;
5354 assert_eq!(info.get_port(), 8080);
5355 }
5356 if info.get_fullname().starts_with(instance_default) {
5357 found_default_on_custom = true;
5358 }
5359 }
5360 }
5361
5362 assert!(
5363 found_custom,
5364 "Custom port client should find service on custom port"
5365 );
5366 assert!(
5367 !found_default_on_custom,
5368 "Custom port client should NOT find service on default port"
5369 );
5370
5371 let client_default = ServiceDaemon::new().expect("Failed to create default port client");
5374 let browse_default = client_default
5375 .browse(service_type)
5376 .expect("Failed to browse on default port");
5377
5378 let mut found_default = false;
5379 let mut found_custom_on_default = false;
5380
5381 while let Ok(event) = browse_default.recv_timeout(timeout) {
5382 if let ServiceEvent::ServiceResolved(info) = event {
5383 println!(
5384 "Default port client resolved: {} on port {}",
5385 info.get_fullname(),
5386 info.get_port()
5387 );
5388 if info.get_fullname().starts_with(instance_default) {
5389 found_default = true;
5390 assert_eq!(info.get_port(), 8081);
5391 }
5392 if info.get_fullname().starts_with(instance_custom) {
5393 found_custom_on_default = true;
5394 }
5395 }
5396 }
5397
5398 assert!(
5399 found_default,
5400 "Default port client should find service on default port"
5401 );
5402 assert!(
5403 !found_custom_on_default,
5404 "Default port client should NOT find service on custom port"
5405 );
5406
5407 server_custom.shutdown().unwrap();
5409 client_custom.shutdown().unwrap();
5410 server_default.shutdown().unwrap();
5411 client_default.shutdown().unwrap();
5412 }
5413}