1#[cfg(feature = "logging")]
32use crate::log::{debug, error, trace};
33use crate::{
34 current_time_millis,
35 dns_cache::{DnsCache, IpType},
36 dns_parser::{
37 ip_address_rr_type, DnsAddress, DnsEntryExt, DnsIncoming, DnsOutgoing, DnsPointer,
38 DnsRecordBox, DnsRecordExt, DnsSrv, DnsTxt, InterfaceId, RRType, ScopedIp,
39 CLASS_CACHE_FLUSH, CLASS_IN, FLAGS_AA, FLAGS_QR_QUERY, FLAGS_QR_RESPONSE, MAX_MSG_ABSOLUTE,
40 },
41 error::{e_fmt, Error, Result},
42 service_info::{valid_ip_on_intf, DnsRegistry, MyIntf, Probe, ServiceInfo, ServiceStatus},
43 Receiver, ResolvedService, TxtProperties,
44};
45use flume::{bounded, Sender, TrySendError};
46use if_addrs::{IfAddr, Interface};
47use mio::{event::Source, net::UdpSocket as MioUdpSocket, Interest, Poll, Registry, Token};
48use socket2::Domain;
49use socket_pktinfo::PktInfoUdpSocket;
50use std::{
51 cmp::{self, Reverse},
52 collections::{hash_map::Entry, BinaryHeap, HashMap, HashSet},
53 fmt, io,
54 net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6, UdpSocket},
55 str, thread,
56 time::Duration,
57 vec,
58};
59
/// Default for the daemon's maximum service name length
/// (initial value of `Zeroconf::service_name_len_max`).
pub const SERVICE_NAME_LEN_MAX_DEFAULT: u8 = 15;

/// Default interval, in seconds, between checks for IP changes.
/// `Zeroconf::new` converts it to milliseconds.
pub const IP_CHECK_INTERVAL_IN_SECS_DEFAULT: u32 = 5;

/// Default verification timeout (10 seconds).
// NOTE(review): presumably meant for `ServiceDaemon::verify`; no use is visible nearby.
pub const VERIFY_TIMEOUT_DEFAULT: Duration = Duration::from_millis(10_000);

/// Port used by [`ServiceDaemon::new`].
pub const MDNS_PORT: u16 = 5353;

/// IPv4 multicast group joined by the daemon's IPv4 socket.
const GROUP_ADDR_V4: Ipv4Addr = Ipv4Addr::new(224, 0, 0, 251);
/// IPv6 multicast group joined by the daemon's IPv6 socket.
const GROUP_ADDR_V6: Ipv6Addr = Ipv6Addr::new(0xff02, 0, 0, 0, 0, 0, 0, 0x00fb);
/// Loopback address the signal sockets are bound to.
const LOOPBACK_V4: Ipv4Addr = Ipv4Addr::LOCALHOST;

/// A wait time in milliseconds.
// NOTE(review): presumably a delay before resolving; its use is not visible nearby.
const RESOLVE_WAIT_IN_MILLIS: u64 = 500;
80
/// Result delivered on the channel returned by [`ServiceDaemon::unregister`].
#[derive(Debug)]
pub enum UnregisterStatus {
    // NOTE(review): presumably "the service was found and unregistered".
    OK,
    // NOTE(review): presumably "no registered service had that name".
    NotFound,
}
89
/// Lifecycle state of the daemon, as reported by [`ServiceDaemon::status`]
/// and [`ServiceDaemon::shutdown`].
#[derive(Clone, Debug, PartialEq, Eq)]
#[non_exhaustive]
pub enum DaemonStatus {
    /// Initial state of a freshly created daemon.
    Running,

    /// Set once the daemon has processed an exit command.
    Shutdown,
}
100
/// Identifies one of the daemon's internal counters.
///
/// The [`fmt::Display`] form is the kebab-case name of the counter.
#[derive(Hash, Eq, PartialEq)]
enum Counter {
    Register,
    RegisterResend,
    Unregister,
    UnregisterResend,
    Browse,
    ResolveHostname,
    Respond,
    CacheRefreshPTR,
    CacheRefreshSrvTxt,
    CacheRefreshAddr,
    KnownAnswerSuppression,
    CachedPTR,
    CachedSRV,
    CachedAddr,
    CachedTxt,
    CachedNSec,
    CachedSubtype,
    DnsRegistryProbe,
    DnsRegistryActive,
    DnsRegistryTimer,
    DnsRegistryNameChange,
    Timer,
}

impl fmt::Display for Counter {
    /// Writes the counter's textual name.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        // Pick the static label first, then emit it in one place.
        let label = match self {
            Self::Register => "register",
            Self::RegisterResend => "register-resend",
            Self::Unregister => "unregister",
            Self::UnregisterResend => "unregister-resend",
            Self::Browse => "browse",
            Self::ResolveHostname => "resolve-hostname",
            Self::Respond => "respond",
            Self::CacheRefreshPTR => "cache-refresh-ptr",
            Self::CacheRefreshSrvTxt => "cache-refresh-srv-txt",
            Self::CacheRefreshAddr => "cache-refresh-addr",
            Self::KnownAnswerSuppression => "known-answer-suppression",
            Self::CachedPTR => "cached-ptr",
            Self::CachedSRV => "cached-srv",
            Self::CachedAddr => "cached-addr",
            Self::CachedTxt => "cached-txt",
            Self::CachedNSec => "cached-nsec",
            Self::CachedSubtype => "cached-subtype",
            Self::DnsRegistryProbe => "dns-registry-probe",
            Self::DnsRegistryActive => "dns-registry-active",
            Self::DnsRegistryTimer => "dns-registry-timer",
            Self::DnsRegistryNameChange => "dns-registry-name-change",
            Self::Timer => "timer",
        };
        f.write_str(label)
    }
}
157
158#[derive(Debug)]
159enum InternalError {
160 IntfAddrInvalid(Interface),
161}
162
163impl fmt::Display for InternalError {
164 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
165 match self {
166 InternalError::IntfAddrInvalid(iface) => write!(f, "interface addr invalid: {iface:?}"),
167 }
168 }
169}
170
171type MyResult<T> = core::result::Result<T, InternalError>;
172
173struct MyUdpSocket {
178 pktinfo: PktInfoUdpSocket,
181
182 mio: MioUdpSocket,
185}
186
187impl MyUdpSocket {
188 pub fn new(pktinfo: PktInfoUdpSocket) -> io::Result<Self> {
189 let std_sock = pktinfo.try_clone_std()?;
190 let mio = MioUdpSocket::from_std(std_sock);
191
192 Ok(Self { pktinfo, mio })
193 }
194}
195
196impl Source for MyUdpSocket {
198 fn register(
199 &mut self,
200 registry: &Registry,
201 token: Token,
202 interests: Interest,
203 ) -> io::Result<()> {
204 self.mio.register(registry, token, interests)
205 }
206
207 fn reregister(
208 &mut self,
209 registry: &Registry,
210 token: Token,
211 interests: Interest,
212 ) -> io::Result<()> {
213 self.mio.reregister(registry, token, interests)
214 }
215
216 fn deregister(&mut self, registry: &Registry) -> std::io::Result<()> {
217 self.mio.deregister(registry)
218 }
219}
220
221pub type Metrics = HashMap<String, i64>;
224
225const IPV4_SOCK_EVENT_KEY: usize = 4; const IPV6_SOCK_EVENT_KEY: usize = 6; const SIGNAL_SOCK_EVENT_KEY: usize = usize::MAX - 1; #[derive(Clone)]
233pub struct ServiceDaemon {
234 sender: Sender<Command>,
236
237 signal_addr: SocketAddr,
243}
244
245impl ServiceDaemon {
246 pub fn new() -> Result<Self> {
252 Self::new_with_port(MDNS_PORT)
253 }
254
255 pub fn new_with_port(port: u16) -> Result<Self> {
278 let signal_addr = SocketAddrV4::new(LOOPBACK_V4, 0);
281
282 let signal_sock = UdpSocket::bind(signal_addr)
283 .map_err(|e| e_fmt!("failed to create signal_sock for daemon: {}", e))?;
284
285 let signal_addr = signal_sock
287 .local_addr()
288 .map_err(|e| e_fmt!("failed to get signal sock addr: {}", e))?;
289
290 signal_sock
292 .set_nonblocking(true)
293 .map_err(|e| e_fmt!("failed to set nonblocking for signal socket: {}", e))?;
294
295 let poller = Poll::new().map_err(|e| e_fmt!("failed to create mio Poll: {e}"))?;
296
297 let (sender, receiver) = bounded(100);
298
299 let mio_sock = MioUdpSocket::from_std(signal_sock);
301 let cmd_sender = sender.clone();
302 thread::Builder::new()
303 .name("mDNS_daemon".to_string())
304 .spawn(move || {
305 Self::daemon_thread(mio_sock, poller, receiver, port, cmd_sender, signal_addr)
306 })
307 .map_err(|e| e_fmt!("thread builder failed to spawn: {}", e))?;
308
309 Ok(Self {
310 sender,
311 signal_addr,
312 })
313 }
314
315 fn send_cmd(&self, cmd: Command) -> Result<()> {
318 let cmd_name = cmd.to_string();
319
320 self.sender.try_send(cmd).map_err(|e| match e {
322 TrySendError::Full(_) => Error::Again,
323 e => e_fmt!("flume::channel::send failed: {}", e),
324 })?;
325
326 let addr = SocketAddrV4::new(LOOPBACK_V4, 0);
328 let socket = UdpSocket::bind(addr)
329 .map_err(|e| e_fmt!("Failed to create socket to send signal: {}", e))?;
330 socket
331 .send_to(cmd_name.as_bytes(), self.signal_addr)
332 .map_err(|e| {
333 e_fmt!(
334 "signal socket send_to {} ({}) failed: {}",
335 self.signal_addr,
336 cmd_name,
337 e
338 )
339 })?;
340
341 Ok(())
342 }
343
344 pub fn browse(&self, service_type: &str) -> Result<Receiver<ServiceEvent>> {
355 check_domain_suffix(service_type)?;
356
357 let (resp_s, resp_r) = bounded(10);
358 self.send_cmd(Command::Browse(service_type.to_string(), 1, false, resp_s))?;
359 Ok(resp_r)
360 }
361
362 pub fn browse_cache(&self, service_type: &str) -> Result<Receiver<ServiceEvent>> {
371 check_domain_suffix(service_type)?;
372
373 let (resp_s, resp_r) = bounded(10);
374 self.send_cmd(Command::Browse(service_type.to_string(), 1, true, resp_s))?;
375 Ok(resp_r)
376 }
377
378 pub fn stop_browse(&self, ty_domain: &str) -> Result<()> {
383 self.send_cmd(Command::StopBrowse(ty_domain.to_string()))
384 }
385
386 pub fn resolve_hostname(
394 &self,
395 hostname: &str,
396 timeout: Option<u64>,
397 ) -> Result<Receiver<HostnameResolutionEvent>> {
398 check_hostname(hostname)?;
399 let (resp_s, resp_r) = bounded(10);
400 self.send_cmd(Command::ResolveHostname(
401 hostname.to_string(),
402 1,
403 resp_s,
404 timeout,
405 ))?;
406 Ok(resp_r)
407 }
408
409 pub fn stop_resolve_hostname(&self, hostname: &str) -> Result<()> {
414 self.send_cmd(Command::StopResolveHostname(hostname.to_string()))
415 }
416
417 pub fn register(&self, service_info: ServiceInfo) -> Result<()> {
425 check_service_name(service_info.get_fullname())?;
426 check_hostname(service_info.get_hostname())?;
427
428 self.send_cmd(Command::Register(service_info.into()))
429 }
430
431 pub fn unregister(&self, fullname: &str) -> Result<Receiver<UnregisterStatus>> {
439 let (resp_s, resp_r) = bounded(1);
440 self.send_cmd(Command::Unregister(fullname.to_lowercase(), resp_s))?;
441 Ok(resp_r)
442 }
443
444 pub fn monitor(&self) -> Result<Receiver<DaemonEvent>> {
448 let (resp_s, resp_r) = bounded(100);
449 self.send_cmd(Command::Monitor(resp_s))?;
450 Ok(resp_r)
451 }
452
453 pub fn shutdown(&self) -> Result<Receiver<DaemonStatus>> {
458 let (resp_s, resp_r) = bounded(1);
459 self.send_cmd(Command::Exit(resp_s))?;
460 Ok(resp_r)
461 }
462
463 pub fn status(&self) -> Result<Receiver<DaemonStatus>> {
469 let (resp_s, resp_r) = bounded(1);
470
471 if self.sender.is_disconnected() {
472 resp_s
473 .send(DaemonStatus::Shutdown)
474 .map_err(|e| e_fmt!("failed to send daemon status to the client: {}", e))?;
475 } else {
476 self.send_cmd(Command::GetStatus(resp_s))?;
477 }
478
479 Ok(resp_r)
480 }
481
482 pub fn get_metrics(&self) -> Result<Receiver<Metrics>> {
487 let (resp_s, resp_r) = bounded(1);
488 self.send_cmd(Command::GetMetrics(resp_s))?;
489 Ok(resp_r)
490 }
491
492 pub fn set_service_name_len_max(&self, len_max: u8) -> Result<()> {
499 const SERVICE_NAME_LEN_MAX_LIMIT: u8 = 30; if len_max > SERVICE_NAME_LEN_MAX_LIMIT {
502 return Err(Error::Msg(format!(
503 "service name length max {len_max} is too large"
504 )));
505 }
506
507 self.send_cmd(Command::SetOption(DaemonOption::ServiceNameLenMax(len_max)))
508 }
509
510 pub fn set_ip_check_interval(&self, interval_in_secs: u32) -> Result<()> {
516 let interval_in_millis = interval_in_secs as u64 * 1000;
517 self.send_cmd(Command::SetOption(DaemonOption::IpCheckInterval(
518 interval_in_millis,
519 )))
520 }
521
522 pub fn get_ip_check_interval(&self) -> Result<u32> {
524 let (resp_s, resp_r) = bounded(1);
525 self.send_cmd(Command::GetOption(resp_s))?;
526
527 let option = resp_r
528 .recv_timeout(Duration::from_secs(10))
529 .map_err(|e| e_fmt!("failed to receive ip check interval: {}", e))?;
530 let ip_check_interval_in_secs = option.ip_check_interval / 1000;
531 Ok(ip_check_interval_in_secs as u32)
532 }
533
534 pub fn enable_interface(&self, if_kind: impl IntoIfKindVec) -> Result<()> {
541 let if_kind_vec = if_kind.into_vec();
542 self.send_cmd(Command::SetOption(DaemonOption::EnableInterface(
543 if_kind_vec.kinds,
544 )))
545 }
546
547 pub fn disable_interface(&self, if_kind: impl IntoIfKindVec) -> Result<()> {
554 let if_kind_vec = if_kind.into_vec();
555 self.send_cmd(Command::SetOption(DaemonOption::DisableInterface(
556 if_kind_vec.kinds,
557 )))
558 }
559
560 pub fn accept_unsolicited(&self, accept: bool) -> Result<()> {
571 self.send_cmd(Command::SetOption(DaemonOption::AcceptUnsolicited(accept)))
572 }
573
574 pub fn include_apple_p2p(&self, include: bool) -> Result<()> {
577 self.send_cmd(Command::SetOption(DaemonOption::IncludeAppleP2P(include)))
578 }
579
580 #[cfg(test)]
581 pub fn test_down_interface(&self, ifname: &str) -> Result<()> {
582 self.send_cmd(Command::SetOption(DaemonOption::TestDownInterface(
583 ifname.to_string(),
584 )))
585 }
586
587 #[cfg(test)]
588 pub fn test_up_interface(&self, ifname: &str) -> Result<()> {
589 self.send_cmd(Command::SetOption(DaemonOption::TestUpInterface(
590 ifname.to_string(),
591 )))
592 }
593
594 pub fn set_multicast_loop_v4(&self, on: bool) -> Result<()> {
610 self.send_cmd(Command::SetOption(DaemonOption::MulticastLoopV4(on)))
611 }
612
613 pub fn set_multicast_loop_v6(&self, on: bool) -> Result<()> {
629 self.send_cmd(Command::SetOption(DaemonOption::MulticastLoopV6(on)))
630 }
631
632 pub fn verify(&self, instance_fullname: String, timeout: Duration) -> Result<()> {
645 self.send_cmd(Command::Verify(instance_fullname, timeout))
646 }
647
648 fn daemon_thread(
649 signal_sock: MioUdpSocket,
650 poller: Poll,
651 receiver: Receiver<Command>,
652 port: u16,
653 cmd_sender: Sender<Command>,
654 signal_addr: SocketAddr,
655 ) {
656 let mut zc = Zeroconf::new(signal_sock, poller, port, cmd_sender, signal_addr);
657
658 if let Some(cmd) = zc.run(receiver) {
659 match cmd {
660 Command::Exit(resp_s) => {
661 if let Err(e) = resp_s.send(DaemonStatus::Shutdown) {
664 debug!("exit: failed to send response of shutdown: {}", e);
665 }
666 }
667 _ => {
668 debug!("Unexpected command: {:?}", cmd);
669 }
670 }
671 }
672 }
673}
674
675fn _new_socket_bind(intf: &Interface, should_loop: bool) -> Result<MyUdpSocket> {
677 let intf_ip = &intf.ip();
680 match intf_ip {
681 IpAddr::V4(ip) => {
682 let addr = SocketAddrV4::new(Ipv4Addr::new(0, 0, 0, 0), MDNS_PORT);
683 let sock = new_socket(addr.into(), true)?;
684
685 sock.join_multicast_v4(&GROUP_ADDR_V4, ip)
687 .map_err(|e| e_fmt!("join multicast group on addr {}: {}", intf_ip, e))?;
688
689 sock.set_multicast_if_v4(ip)
691 .map_err(|e| e_fmt!("set multicast_if on addr {}: {}", ip, e))?;
692
693 sock.set_multicast_ttl_v4(255)
698 .map_err(|e| e_fmt!("set set_multicast_ttl_v4 on addr {}: {}", ip, e))?;
699
700 if !should_loop {
701 sock.set_multicast_loop_v4(false)
702 .map_err(|e| e_fmt!("failed to set multicast loop v4 for {ip}: {e}"))?;
703 }
704
705 let multicast_addr = SocketAddrV4::new(GROUP_ADDR_V4, MDNS_PORT).into();
707 let test_packets = DnsOutgoing::new(0).to_data_on_wire();
708 for packet in test_packets {
709 sock.send_to(&packet, &multicast_addr)
710 .map_err(|e| e_fmt!("send multicast packet on addr {}: {}", ip, e))?;
711 }
712 MyUdpSocket::new(sock)
713 .map_err(|e| e_fmt!("failed to create MySocket for interface {}: {e}", intf.name))
714 }
715 IpAddr::V6(ip) => {
716 let addr = SocketAddrV6::new(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 0), MDNS_PORT, 0, 0);
717 let sock = new_socket(addr.into(), true)?;
718
719 let if_index = intf.index.unwrap_or(0);
720
721 sock.join_multicast_v6(&GROUP_ADDR_V6, if_index)
723 .map_err(|e| e_fmt!("join multicast group on addr {}: {}", ip, e))?;
724
725 sock.set_multicast_if_v6(if_index)
727 .map_err(|e| e_fmt!("set multicast_if on addr {}: {}", ip, e))?;
728
729 MyUdpSocket::new(sock)
734 .map_err(|e| e_fmt!("failed to create MySocket for interface {}: {e}", intf.name))
735 }
736 }
737}
738
739fn new_socket(addr: SocketAddr, non_block: bool) -> Result<PktInfoUdpSocket> {
742 let domain = match addr {
743 SocketAddr::V4(_) => socket2::Domain::IPV4,
744 SocketAddr::V6(_) => socket2::Domain::IPV6,
745 };
746
747 let fd = PktInfoUdpSocket::new(domain).map_err(|e| e_fmt!("create socket failed: {}", e))?;
748
749 fd.set_reuse_address(true)
750 .map_err(|e| e_fmt!("set ReuseAddr failed: {}", e))?;
751 #[cfg(unix)]
752 if let Err(e) = fd.set_reuse_port(true) {
753 debug!(
754 "SO_REUSEPORT is not supported, continuing without it: {}",
755 e
756 );
757 }
758
759 if non_block {
760 fd.set_nonblocking(true)
761 .map_err(|e| e_fmt!("set O_NONBLOCK: {}", e))?;
762 }
763
764 fd.bind(&addr.into())
765 .map_err(|e| e_fmt!("socket bind to {} failed: {}", &addr, e))?;
766
767 trace!("new socket bind to {}", &addr);
768 Ok(fd)
769}
770
771struct ReRun {
773 next_time: u64,
775 command: Command,
776}
777
/// Selects network interfaces; see `IfKind::matches` for the exact rule of
/// each variant.
#[derive(Clone, Debug)]
#[non_exhaustive]
pub enum IfKind {
    /// Every interface.
    All,

    /// Interfaces whose address is IPv4.
    IPv4,

    /// Interfaces whose address is IPv6.
    IPv6,

    /// Interfaces with exactly this name.
    Name(String),

    /// Interfaces with exactly this IP address.
    Addr(IpAddr),

    /// Loopback interfaces with an IPv4 address.
    LoopbackV4,

    /// Loopback interfaces with an IPv6 address.
    LoopbackV6,

    /// Interfaces with this index and an IPv4 address.
    IndexV4(u32),

    /// Interfaces with this index and an IPv6 address.
    IndexV6(u32),
}
815
816impl IfKind {
817 fn matches(&self, intf: &Interface) -> bool {
819 match self {
820 Self::All => true,
821 Self::IPv4 => intf.ip().is_ipv4(),
822 Self::IPv6 => intf.ip().is_ipv6(),
823 Self::Name(ifname) => ifname == &intf.name,
824 Self::Addr(addr) => addr == &intf.ip(),
825 Self::LoopbackV4 => intf.is_loopback() && intf.ip().is_ipv4(),
826 Self::LoopbackV6 => intf.is_loopback() && intf.ip().is_ipv6(),
827 Self::IndexV4(idx) => intf.index == Some(*idx) && intf.ip().is_ipv4(),
828 Self::IndexV6(idx) => intf.index == Some(*idx) && intf.ip().is_ipv6(),
829 }
830 }
831}
832
833impl From<&str> for IfKind {
836 fn from(val: &str) -> Self {
837 Self::Name(val.to_string())
838 }
839}
840
841impl From<&String> for IfKind {
842 fn from(val: &String) -> Self {
843 Self::Name(val.to_string())
844 }
845}
846
847impl From<IpAddr> for IfKind {
849 fn from(val: IpAddr) -> Self {
850 Self::Addr(val)
851 }
852}
853
854pub struct IfKindVec {
856 kinds: Vec<IfKind>,
857}
858
859pub trait IntoIfKindVec {
861 fn into_vec(self) -> IfKindVec;
862}
863
864impl<T: Into<IfKind>> IntoIfKindVec for T {
865 fn into_vec(self) -> IfKindVec {
866 let if_kind: IfKind = self.into();
867 IfKindVec {
868 kinds: vec![if_kind],
869 }
870 }
871}
872
873impl<T: Into<IfKind>> IntoIfKindVec for Vec<T> {
874 fn into_vec(self) -> IfKindVec {
875 let kinds: Vec<IfKind> = self.into_iter().map(|x| x.into()).collect();
876 IfKindVec { kinds }
877 }
878}
879
880struct IfSelection {
882 if_kind: IfKind,
884
885 selected: bool,
887}
888
889struct Zeroconf {
891 port: u16,
894
895 my_intfs: HashMap<u32, MyIntf>,
897
898 ipv4_sock: Option<MyUdpSocket>,
900
901 ipv6_sock: Option<MyUdpSocket>,
903
904 my_services: HashMap<String, ServiceInfo>,
906
907 cache: DnsCache,
909
910 dns_registry_map: HashMap<u32, DnsRegistry>,
912
913 service_queriers: HashMap<String, Sender<ServiceEvent>>, hostname_resolvers: HashMap<String, (Sender<HostnameResolutionEvent>, Option<u64>)>, retransmissions: Vec<ReRun>,
924
925 counters: Metrics,
926
927 poller: Poll,
929
930 monitors: Vec<Sender<DaemonEvent>>,
932
933 service_name_len_max: u8,
935
936 ip_check_interval: u64,
938
939 if_selections: Vec<IfSelection>,
941
942 signal_sock: MioUdpSocket,
944
945 timers: BinaryHeap<Reverse<u64>>,
951
952 status: DaemonStatus,
953
954 pending_resolves: HashSet<String>,
956
957 resolved: HashSet<String>,
959
960 multicast_loop_v4: bool,
961
962 multicast_loop_v6: bool,
963
964 accept_unsolicited: bool,
965
966 include_apple_p2p: bool,
967
968 cmd_sender: Sender<Command>,
969
970 signal_addr: SocketAddr,
971
972 #[cfg(test)]
973 test_down_interfaces: HashSet<String>,
974}
975
976fn join_multicast_group(my_sock: &PktInfoUdpSocket, intf: &Interface) -> Result<()> {
978 let intf_ip = &intf.ip();
979 match intf_ip {
980 IpAddr::V4(ip) => {
981 debug!("join multicast group V4 on {} addr {ip}", intf.name);
983 my_sock
984 .join_multicast_v4(&GROUP_ADDR_V4, ip)
985 .map_err(|e| e_fmt!("PKT join multicast group on addr {}: {}", intf_ip, e))?;
986 }
987 IpAddr::V6(ip) => {
988 let if_index = intf.index.unwrap_or(0);
989 debug!(
991 "join multicast group V6 on {} addr {ip} with index {if_index}",
992 intf.name
993 );
994 my_sock
995 .join_multicast_v6(&GROUP_ADDR_V6, if_index)
996 .map_err(|e| e_fmt!("PKT join multicast group on addr {}: {}", ip, e))?;
997 }
998 }
999 Ok(())
1000}
1001
1002impl Zeroconf {
1003 fn new(
1004 signal_sock: MioUdpSocket,
1005 poller: Poll,
1006 port: u16,
1007 cmd_sender: Sender<Command>,
1008 signal_addr: SocketAddr,
1009 ) -> Self {
1010 let my_ifaddrs = my_ip_interfaces(true);
1012
1013 let mut my_intfs = HashMap::new();
1017 let mut dns_registry_map = HashMap::new();
1018
1019 let mut ipv4_sock = None;
1022 let addr = SocketAddrV4::new(Ipv4Addr::new(0, 0, 0, 0), port);
1023 match new_socket(addr.into(), true) {
1024 Ok(sock) => {
1025 sock.set_multicast_ttl_v4(255)
1030 .map_err(|e| e_fmt!("set set_multicast_ttl_v4 on addr: {}", e))
1031 .ok();
1032
1033 ipv4_sock = match MyUdpSocket::new(sock) {
1035 Ok(s) => Some(s),
1036 Err(e) => {
1037 debug!("failed to create IPv4 MyUdpSocket: {e}");
1038 None
1039 }
1040 };
1041 }
1042 Err(e) => debug!("failed to create IPv4 socket: {e}"),
1044 }
1045
1046 let mut ipv6_sock = None;
1047 let addr = SocketAddrV6::new(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 0), port, 0, 0);
1048 match new_socket(addr.into(), true) {
1049 Ok(sock) => {
1050 sock.set_multicast_hops_v6(255)
1054 .map_err(|e| e_fmt!("set set_multicast_hops_v6: {}", e))
1055 .ok();
1056
1057 ipv6_sock = match MyUdpSocket::new(sock) {
1059 Ok(s) => Some(s),
1060 Err(e) => {
1061 debug!("failed to create IPv6 MyUdpSocket: {e}");
1062 None
1063 }
1064 };
1065 }
1066 Err(e) => debug!("failed to create IPv6 socket: {e}"),
1067 }
1068
1069 for intf in my_ifaddrs {
1071 let sock_opt = if intf.ip().is_ipv4() {
1072 &ipv4_sock
1073 } else {
1074 &ipv6_sock
1075 };
1076 let Some(sock) = sock_opt else {
1077 debug!(
1078 "no socket available for interface {} with addr {}. Skipped.",
1079 intf.name,
1080 intf.ip()
1081 );
1082 continue;
1083 };
1084
1085 if let Err(e) = join_multicast_group(&sock.pktinfo, &intf) {
1086 debug!("failed to join multicast: {}: {e}. Skipped.", &intf.ip());
1087 }
1088
1089 let if_index = intf.index.unwrap_or(0);
1090
1091 dns_registry_map
1093 .entry(if_index)
1094 .or_insert_with(DnsRegistry::new);
1095
1096 my_intfs
1097 .entry(if_index)
1098 .and_modify(|v: &mut MyIntf| {
1099 v.addrs.insert(intf.addr.clone());
1100 })
1101 .or_insert(MyIntf {
1102 name: intf.name.clone(),
1103 index: if_index,
1104 addrs: HashSet::from([intf.addr]),
1105 });
1106 }
1107
1108 let monitors = Vec::new();
1109 let service_name_len_max = SERVICE_NAME_LEN_MAX_DEFAULT;
1110 let ip_check_interval = IP_CHECK_INTERVAL_IN_SECS_DEFAULT as u64 * 1000;
1111
1112 let timers = BinaryHeap::new();
1113
1114 let if_selections = vec![];
1116
1117 let status = DaemonStatus::Running;
1118
1119 Self {
1120 port,
1121 my_intfs,
1122 ipv4_sock,
1123 ipv6_sock,
1124 my_services: HashMap::new(),
1125 cache: DnsCache::new(),
1126 dns_registry_map,
1127 hostname_resolvers: HashMap::new(),
1128 service_queriers: HashMap::new(),
1129 retransmissions: Vec::new(),
1130 counters: HashMap::new(),
1131 poller,
1132 monitors,
1133 service_name_len_max,
1134 ip_check_interval,
1135 if_selections,
1136 signal_sock,
1137 timers,
1138 status,
1139 pending_resolves: HashSet::new(),
1140 resolved: HashSet::new(),
1141 multicast_loop_v4: true,
1142 multicast_loop_v6: true,
1143 accept_unsolicited: false,
1144 include_apple_p2p: false,
1145 cmd_sender,
1146 signal_addr,
1147
1148 #[cfg(test)]
1149 test_down_interfaces: HashSet::new(),
1150 }
1151 }
1152
1153 fn send_cmd_to_self(&self, cmd: Command) -> Result<()> {
1155 let cmd_name = cmd.to_string();
1156
1157 self.cmd_sender.try_send(cmd).map_err(|e| match e {
1158 TrySendError::Full(_) => Error::Again,
1159 e => e_fmt!("flume::channel::send failed: {}", e),
1160 })?;
1161
1162 let addr = SocketAddrV4::new(LOOPBACK_V4, 0);
1163 let socket = UdpSocket::bind(addr)
1164 .map_err(|e| e_fmt!("Failed to create socket to send signal: {}", e))?;
1165 socket
1166 .send_to(cmd_name.as_bytes(), self.signal_addr)
1167 .map_err(|e| {
1168 e_fmt!(
1169 "signal socket send_to {} ({}) failed: {}",
1170 self.signal_addr,
1171 cmd_name,
1172 e
1173 )
1174 })?;
1175
1176 Ok(())
1177 }
1178
1179 fn cleanup(&mut self) {
1187 debug!("Starting cleanup for shutdown");
1188
1189 let service_names: Vec<String> = self.my_services.keys().cloned().collect();
1191 for fullname in service_names {
1192 if let Some(info) = self.my_services.get(&fullname) {
1193 debug!("Unregistering service during shutdown: {}", &fullname);
1194
1195 for intf in self.my_intfs.values() {
1196 if let Some(sock) = self.ipv4_sock.as_ref() {
1197 self.unregister_service(info, intf, &sock.pktinfo);
1198 }
1199
1200 if let Some(sock) = self.ipv6_sock.as_ref() {
1201 self.unregister_service(info, intf, &sock.pktinfo);
1202 }
1203 }
1204 }
1205 }
1206 self.my_services.clear();
1207
1208 let browse_types: Vec<String> = self.service_queriers.keys().cloned().collect();
1210 for ty_domain in browse_types {
1211 debug!("Stopping browse during shutdown: {}", &ty_domain);
1212 if let Some(sender) = self.service_queriers.remove(&ty_domain) {
1213 if let Err(e) = sender.send(ServiceEvent::SearchStopped(ty_domain.clone())) {
1215 debug!("Failed to send SearchStopped during shutdown: {}", e);
1216 }
1217 }
1218 }
1219
1220 let hostnames: Vec<String> = self.hostname_resolvers.keys().cloned().collect();
1222 for hostname in hostnames {
1223 debug!(
1224 "Stopping hostname resolution during shutdown: {}",
1225 &hostname
1226 );
1227 if let Some((sender, _timeout)) = self.hostname_resolvers.remove(&hostname) {
1228 if let Err(e) =
1230 sender.send(HostnameResolutionEvent::SearchStopped(hostname.clone()))
1231 {
1232 debug!(
1233 "Failed to send HostnameResolutionEvent::SearchStopped during shutdown: {}",
1234 e
1235 );
1236 }
1237 }
1238 }
1239
1240 self.retransmissions.clear();
1242
1243 debug!("Cleanup completed");
1244 }
1245
1246 fn run(&mut self, receiver: Receiver<Command>) -> Option<Command> {
1255 if let Err(e) = self.poller.registry().register(
1257 &mut self.signal_sock,
1258 mio::Token(SIGNAL_SOCK_EVENT_KEY),
1259 mio::Interest::READABLE,
1260 ) {
1261 debug!("failed to add signal socket to the poller: {}", e);
1262 return None;
1263 }
1264
1265 if let Some(sock) = self.ipv4_sock.as_mut() {
1266 if let Err(e) = self.poller.registry().register(
1267 sock,
1268 mio::Token(IPV4_SOCK_EVENT_KEY),
1269 mio::Interest::READABLE,
1270 ) {
1271 debug!("failed to register ipv4 socket: {}", e);
1272 return None;
1273 }
1274 }
1275
1276 if let Some(sock) = self.ipv6_sock.as_mut() {
1277 if let Err(e) = self.poller.registry().register(
1278 sock,
1279 mio::Token(IPV6_SOCK_EVENT_KEY),
1280 mio::Interest::READABLE,
1281 ) {
1282 debug!("failed to register ipv6 socket: {}", e);
1283 return None;
1284 }
1285 }
1286
1287 let mut next_ip_check = if self.ip_check_interval > 0 {
1289 current_time_millis() + self.ip_check_interval
1290 } else {
1291 0
1292 };
1293
1294 if next_ip_check > 0 {
1295 self.add_timer(next_ip_check);
1296 }
1297
1298 let mut events = mio::Events::with_capacity(1024);
1301 loop {
1302 let now = current_time_millis();
1303
1304 let earliest_timer = self.peek_earliest_timer();
1305 let timeout = earliest_timer.map(|timer| {
1306 let millis = if timer > now { timer - now } else { 1 };
1308 Duration::from_millis(millis)
1309 });
1310
1311 events.clear();
1313 match self.poller.poll(&mut events, timeout) {
1314 Ok(_) => self.handle_poller_events(&events),
1315 Err(e) => debug!("failed to select from sockets: {}", e),
1316 }
1317
1318 let now = current_time_millis();
1319
1320 self.pop_timers_till(now);
1322
1323 for hostname in self
1325 .hostname_resolvers
1326 .clone()
1327 .into_iter()
1328 .filter(|(_, (_, timeout))| timeout.map(|t| now >= t).unwrap_or(false))
1329 .map(|(hostname, _)| hostname)
1330 {
1331 trace!("hostname resolver timeout for {}", &hostname);
1332 call_hostname_resolution_listener(
1333 &self.hostname_resolvers,
1334 &hostname,
1335 HostnameResolutionEvent::SearchTimeout(hostname.to_owned()),
1336 );
1337 call_hostname_resolution_listener(
1338 &self.hostname_resolvers,
1339 &hostname,
1340 HostnameResolutionEvent::SearchStopped(hostname.to_owned()),
1341 );
1342 self.hostname_resolvers.remove(&hostname);
1343 }
1344
1345 while let Ok(command) = receiver.try_recv() {
1347 if matches!(command, Command::Exit(_)) {
1348 debug!("Exit command received, performing cleanup");
1349 self.cleanup();
1350 self.status = DaemonStatus::Shutdown;
1351 return Some(command);
1352 }
1353 self.exec_command(command, false);
1354 }
1355
1356 let mut i = 0;
1358 while i < self.retransmissions.len() {
1359 if now >= self.retransmissions[i].next_time {
1360 let rerun = self.retransmissions.remove(i);
1361 self.exec_command(rerun.command, true);
1362 } else {
1363 i += 1;
1364 }
1365 }
1366
1367 self.refresh_active_services();
1369
1370 let mut query_count = 0;
1372 for (hostname, _sender) in self.hostname_resolvers.iter() {
1373 for (hostname, ip_addr) in
1374 self.cache.refresh_due_hostname_resolutions(hostname).iter()
1375 {
1376 self.send_query(hostname, ip_address_rr_type(&ip_addr.to_ip_addr()));
1377 query_count += 1;
1378 }
1379 }
1380
1381 self.increase_counter(Counter::CacheRefreshAddr, query_count);
1382
1383 let now = current_time_millis();
1385
1386 let expired_services = self.cache.evict_expired_services(now);
1388 if !expired_services.is_empty() {
1389 debug!(
1390 "run: send {} service removal to listeners",
1391 expired_services.len()
1392 );
1393 self.notify_service_removal(expired_services);
1394 }
1395
1396 let expired_addrs = self.cache.evict_expired_addr(now);
1398 for (hostname, addrs) in expired_addrs {
1399 call_hostname_resolution_listener(
1400 &self.hostname_resolvers,
1401 &hostname,
1402 HostnameResolutionEvent::AddressesRemoved(hostname.clone(), addrs),
1403 );
1404 let instances = self.cache.get_instances_on_host(&hostname);
1405 let instance_set: HashSet<String> = instances.into_iter().collect();
1406 self.resolve_updated_instances(&instance_set);
1407 }
1408
1409 self.probing_handler();
1411
1412 if now >= next_ip_check && next_ip_check > 0 {
1414 next_ip_check = now + self.ip_check_interval;
1415 self.add_timer(next_ip_check);
1416
1417 self.check_ip_changes();
1418 }
1419 }
1420 }
1421
1422 fn process_set_option(&mut self, daemon_opt: DaemonOption) {
1423 match daemon_opt {
1424 DaemonOption::ServiceNameLenMax(length) => self.service_name_len_max = length,
1425 DaemonOption::IpCheckInterval(interval) => self.ip_check_interval = interval,
1426 DaemonOption::EnableInterface(if_kind) => self.enable_interface(if_kind),
1427 DaemonOption::DisableInterface(if_kind) => self.disable_interface(if_kind),
1428 DaemonOption::MulticastLoopV4(on) => self.set_multicast_loop_v4(on),
1429 DaemonOption::MulticastLoopV6(on) => self.set_multicast_loop_v6(on),
1430 DaemonOption::AcceptUnsolicited(accept) => self.set_accept_unsolicited(accept),
1431 DaemonOption::IncludeAppleP2P(enable) => self.set_apple_p2p(enable),
1432 #[cfg(test)]
1433 DaemonOption::TestDownInterface(ifname) => {
1434 self.test_down_interfaces.insert(ifname);
1435 }
1436 #[cfg(test)]
1437 DaemonOption::TestUpInterface(ifname) => {
1438 self.test_down_interfaces.remove(&ifname);
1439 }
1440 }
1441 }
1442
1443 fn enable_interface(&mut self, kinds: Vec<IfKind>) {
1444 debug!("enable_interface: {:?}", kinds);
1445 let interfaces = my_ip_interfaces_inner(true, self.include_apple_p2p);
1446
1447 for if_kind in kinds {
1448 self.if_selections.push(IfSelection {
1449 if_kind: resolve_addr_to_index(if_kind, &interfaces),
1450 selected: true,
1451 });
1452 }
1453
1454 self.apply_intf_selections(interfaces);
1455 }
1456
1457 fn disable_interface(&mut self, kinds: Vec<IfKind>) {
1458 debug!("disable_interface: {:?}", kinds);
1459 let interfaces = my_ip_interfaces_inner(true, self.include_apple_p2p);
1460
1461 for if_kind in kinds {
1462 self.if_selections.push(IfSelection {
1463 if_kind: resolve_addr_to_index(if_kind, &interfaces),
1464 selected: false,
1465 });
1466 }
1467
1468 self.apply_intf_selections(interfaces);
1469 }
1470
1471 fn set_multicast_loop_v4(&mut self, on: bool) {
1472 let Some(sock) = self.ipv4_sock.as_mut() else {
1473 return;
1474 };
1475 self.multicast_loop_v4 = on;
1476 sock.pktinfo
1477 .set_multicast_loop_v4(on)
1478 .map_err(|e| e_fmt!("failed to set multicast loop v4: {}", e))
1479 .unwrap();
1480 }
1481
1482 fn set_multicast_loop_v6(&mut self, on: bool) {
1483 let Some(sock) = self.ipv6_sock.as_mut() else {
1484 return;
1485 };
1486 self.multicast_loop_v6 = on;
1487 sock.pktinfo
1488 .set_multicast_loop_v6(on)
1489 .map_err(|e| e_fmt!("failed to set multicast loop v6: {}", e))
1490 .unwrap();
1491 }
1492
1493 fn set_accept_unsolicited(&mut self, accept: bool) {
1494 self.accept_unsolicited = accept;
1495 }
1496
1497 fn set_apple_p2p(&mut self, include: bool) {
1498 if self.include_apple_p2p != include {
1499 self.include_apple_p2p = include;
1500 self.apply_intf_selections(my_ip_interfaces_inner(true, self.include_apple_p2p));
1501 }
1502 }
1503
1504 fn notify_monitors(&mut self, event: DaemonEvent) {
1505 self.monitors.retain(|sender| {
1507 if let Err(e) = sender.try_send(event.clone()) {
1508 debug!("notify_monitors: try_send: {}", &e);
1509 if matches!(e, TrySendError::Disconnected(_)) {
1510 return false; }
1512 }
1513 true
1514 });
1515 }
1516
1517 fn del_addr_in_my_services(&mut self, addr: &IpAddr) {
1519 for (_, service_info) in self.my_services.iter_mut() {
1520 if service_info.is_addr_auto() {
1521 service_info.remove_ipaddr(addr);
1522 }
1523 }
1524 }
1525
    /// Schedules a wake-up at `next_time` by pushing it onto `self.timers`.
    ///
    /// The value is wrapped in `Reverse` so that the smallest timestamp is the
    /// one surfaced first by `peek`/`pop` (this assumes `timers` is a max-heap
    /// such as `BinaryHeap` — confirm against the field declaration).
    /// Callers in this file pass values derived from `current_time_millis()`.
    fn add_timer(&mut self, next_time: u64) {
        self.timers.push(Reverse(next_time));
    }
1529
1530 fn peek_earliest_timer(&self) -> Option<u64> {
1531 self.timers.peek().map(|Reverse(v)| *v)
1532 }
1533
1534 fn _pop_earliest_timer(&mut self) -> Option<u64> {
1535 self.timers.pop().map(|Reverse(v)| v)
1536 }
1537
1538 fn pop_timers_till(&mut self, now: u64) {
1540 while let Some(Reverse(v)) = self.timers.peek() {
1541 if *v > now {
1542 break;
1543 }
1544 self.timers.pop();
1545 }
1546 }
1547
1548 fn selected_intfs(&self, interfaces: Vec<Interface>) -> HashSet<Interface> {
1550 let intf_count = interfaces.len();
1551 let mut intf_selections = vec![true; intf_count];
1552
1553 for selection in self.if_selections.iter() {
1555 for i in 0..intf_count {
1557 if selection.if_kind.matches(&interfaces[i]) {
1558 intf_selections[i] = selection.selected;
1559 }
1560 }
1561 }
1562
1563 let mut selected_addrs = HashSet::new();
1564 for i in 0..intf_count {
1565 if intf_selections[i] {
1566 selected_addrs.insert(interfaces[i].clone());
1567 }
1568 }
1569
1570 selected_addrs
1571 }
1572
1573 fn apply_intf_selections(&mut self, interfaces: Vec<Interface>) {
1578 let intf_count = interfaces.len();
1580 let mut intf_selections = vec![true; intf_count];
1581
1582 for selection in self.if_selections.iter() {
1584 for i in 0..intf_count {
1586 if selection.if_kind.matches(&interfaces[i]) {
1587 intf_selections[i] = selection.selected;
1588 }
1589 }
1590 }
1591
1592 for (idx, intf) in interfaces.into_iter().enumerate() {
1594 if intf_selections[idx] {
1595 self.add_interface(intf);
1597 } else {
1598 self.del_interface_addr(&intf);
1600 }
1601 }
1602 }
1603
    /// Handles the disappearance of a local IP address: removes it from the
    /// registered services that manage their addresses automatically, then
    /// tells monitors about it with `DaemonEvent::IpDel`.
    fn del_ip(&mut self, ip: IpAddr) {
        self.del_addr_in_my_services(&ip);
        self.notify_monitors(DaemonEvent::IpDel(ip));
    }
1608
    /// Re-enumerates the host's IP interfaces and reconciles `self.my_intfs`
    /// with the result.
    ///
    /// Steps, in order:
    /// 1. Drop every tracked address that is no longer reported, and collect
    ///    interfaces that either vanished or were left without addresses.
    /// 2. Call `del_ip` for each dropped address.
    /// 3. For each collected interface: remove it from `self.my_intfs`, leave
    ///    the multicast groups, purge its cache records and notify listeners
    ///    about removed / modified service instances.
    /// 4. Re-apply the interface selections to the fresh interface list, which
    ///    picks up newly appeared interfaces and addresses.
    fn check_ip_changes(&mut self) {
        let my_ifaddrs = my_ip_interfaces_inner(true, self.include_apple_p2p);

        // Test builds only: hide the interfaces listed in
        // `test_down_interfaces` so they look like they went away.
        #[cfg(test)]
        let my_ifaddrs: Vec<_> = my_ifaddrs
            .into_iter()
            .filter(|intf| !self.test_down_interfaces.contains(&intf.name))
            .collect();

        // Group the currently reported addresses by interface index.
        // An interface without an index is filed under index 0.
        let ifaddrs_map: HashMap<u32, Vec<&IfAddr>> =
            my_ifaddrs.iter().fold(HashMap::new(), |mut acc, intf| {
                let if_index = intf.index.unwrap_or(0);
                acc.entry(if_index).or_default().push(&intf.addr);
                acc
            });

        // Interfaces to drop entirely: (index, last removed IPv4, last removed IPv6).
        let mut deleted_intfs = Vec::new();
        // Every individual address that disappeared.
        let mut deleted_ips = Vec::new();

        for (if_index, my_intf) in self.my_intfs.iter_mut() {
            // Remember one removed address per family; they are used below
            // when leaving the multicast groups.
            let mut last_ipv4 = None;
            let mut last_ipv6 = None;

            if let Some(current_addrs) = ifaddrs_map.get(if_index) {
                // Interface still exists: keep only the addresses still reported.
                my_intf.addrs.retain(|addr| {
                    if current_addrs.contains(&addr) {
                        true
                    } else {
                        match addr.ip() {
                            IpAddr::V4(ipv4) => last_ipv4 = Some(ipv4),
                            IpAddr::V6(ipv6) => last_ipv6 = Some(ipv6),
                        }
                        deleted_ips.push(addr.ip());
                        false
                    }
                });
                if my_intf.addrs.is_empty() {
                    deleted_intfs.push((*if_index, last_ipv4, last_ipv6))
                }
            } else {
                // Interface index is no longer reported at all: all of its
                // addresses are considered deleted.
                debug!(
                    "check_ip_changes: interface {} ({}) no longer exists, removing",
                    my_intf.name, if_index
                );
                for addr in my_intf.addrs.iter() {
                    match addr.ip() {
                        IpAddr::V4(ipv4) => last_ipv4 = Some(ipv4),
                        IpAddr::V6(ipv6) => last_ipv6 = Some(ipv6),
                    }
                    deleted_ips.push(addr.ip())
                }
                deleted_intfs.push((*if_index, last_ipv4, last_ipv6));
            }
        }

        if !deleted_ips.is_empty() || !deleted_intfs.is_empty() {
            debug!(
                "check_ip_changes: {} deleted ips {} deleted intfs",
                deleted_ips.len(),
                deleted_intfs.len()
            );
        }

        // Done after the loop above because `del_ip` needs `&mut self`.
        for ip in deleted_ips {
            self.del_ip(ip);
        }

        for (if_index, last_ipv4, last_ipv6) in deleted_intfs {
            let Some(my_intf) = self.my_intfs.remove(&if_index) else {
                continue;
            };

            // Leaving the group is best-effort: failures are only logged.
            if let Some(ipv4) = last_ipv4 {
                debug!("leave multicast for {ipv4}");
                if let Some(sock) = self.ipv4_sock.as_mut() {
                    if let Err(e) = sock.pktinfo.leave_multicast_v4(&GROUP_ADDR_V4, &ipv4) {
                        debug!("leave multicast group for addr {ipv4}: {e}");
                    }
                }
            }

            // The IPv6 group is left by interface index rather than by address.
            if let Some(ipv6) = last_ipv6 {
                debug!("leave multicast for {ipv6}");
                if let Some(sock) = self.ipv6_sock.as_mut() {
                    if let Err(e) = sock
                        .pktinfo
                        .leave_multicast_v6(&GROUP_ADDR_V6, my_intf.index)
                    {
                        debug!("leave multicast group for IPv6: {ipv6}: {e}");
                    }
                }
            }

            // Purge cached records tied to the removed interface and let
            // listeners know which instances were removed or changed.
            let intf_id = InterfaceId {
                name: my_intf.name.to_string(),
                index: my_intf.index,
            };
            let result = self.cache.remove_records_on_intf(intf_id);
            self.notify_service_removal(result.removed_instances);
            self.resolve_updated_instances(&result.modified_instances);
        }

        // Finally pick up new interfaces / addresses, honoring the selections.
        self.apply_intf_selections(my_ifaddrs);
    }
1718
1719 fn del_interface_addr(&mut self, intf: &Interface) {
1722 let if_index = intf.index.unwrap_or(0);
1723 debug!(
1724 "del_interface_addr: {} ({if_index}) addr {}",
1725 intf.name,
1726 intf.ip()
1727 );
1728
1729 let Some(my_intf) = self.my_intfs.get_mut(&if_index) else {
1730 debug!("del_interface_addr: interface {} not found", intf.name);
1731 return;
1732 };
1733
1734 let mut ip_removed = false;
1735
1736 if my_intf.addrs.remove(&intf.addr) {
1737 ip_removed = true;
1738
1739 match intf.addr.ip() {
1740 IpAddr::V4(ipv4) => {
1741 if my_intf.next_ifaddr_v4().is_none() {
1742 if let Some(sock) = self.ipv4_sock.as_mut() {
1743 if let Err(e) = sock.pktinfo.leave_multicast_v4(&GROUP_ADDR_V4, &ipv4) {
1744 debug!("leave multicast group for addr {ipv4}: {e}");
1745 } else {
1746 debug!("leave multicast for {ipv4}");
1747 }
1748 }
1749 }
1750 }
1751
1752 IpAddr::V6(ipv6) => {
1753 if my_intf.next_ifaddr_v6().is_none() {
1754 if let Some(sock) = self.ipv6_sock.as_mut() {
1755 if let Err(e) =
1756 sock.pktinfo.leave_multicast_v6(&GROUP_ADDR_V6, if_index)
1757 {
1758 debug!("leave multicast group for addr {ipv6}: {e}");
1759 }
1760 }
1761 }
1762 }
1763 }
1764
1765 if my_intf.addrs.is_empty() {
1766 debug!("del_interface_addr: removing interface {}", intf.name);
1768 self.my_intfs.remove(&if_index);
1769 self.dns_registry_map.remove(&if_index);
1770 self.cache
1771 .remove_addrs_on_disabled_intf(if_index, IpType::BOTH);
1772 } else {
1773 let is_v4 = intf.addr.ip().is_ipv4();
1777 let version_gone = if is_v4 {
1778 my_intf.next_ifaddr_v4().is_none()
1779 } else {
1780 my_intf.next_ifaddr_v6().is_none()
1781 };
1782 if version_gone {
1783 let ip_type = if is_v4 { IpType::V4 } else { IpType::V6 };
1784 self.cache.remove_addrs_on_disabled_intf(if_index, ip_type);
1785 }
1786 }
1787 }
1788
1789 if ip_removed {
1790 self.notify_monitors(DaemonEvent::IpDel(intf.ip()));
1792 self.del_addr_in_my_services(&intf.ip());
1794 }
1795 }
1796
1797 fn add_interface(&mut self, intf: Interface) {
1798 let sock_opt = if intf.ip().is_ipv4() {
1799 &self.ipv4_sock
1800 } else {
1801 &self.ipv6_sock
1802 };
1803
1804 let Some(sock) = sock_opt else {
1805 debug!(
1806 "add_interface: no socket available for interface {} with addr {}. Skipped.",
1807 intf.name,
1808 intf.ip()
1809 );
1810 return;
1811 };
1812
1813 let if_index = intf.index.unwrap_or(0);
1814 let mut new_addr = false;
1815
1816 match self.my_intfs.entry(if_index) {
1817 Entry::Occupied(mut entry) => {
1818 let my_intf = entry.get_mut();
1820 if !my_intf.addrs.contains(&intf.addr) {
1821 if let Err(e) = join_multicast_group(&sock.pktinfo, &intf) {
1822 debug!("add_interface: socket_config {}: {e}", &intf.name);
1823 }
1824 my_intf.addrs.insert(intf.addr.clone());
1825 new_addr = true;
1826 }
1827 }
1828 Entry::Vacant(entry) => {
1829 if let Err(e) = join_multicast_group(&sock.pktinfo, &intf) {
1830 debug!("add_interface: socket_config {}: {e}. Skipped.", &intf.name);
1831 return;
1832 }
1833
1834 new_addr = true;
1835 let new_intf = MyIntf {
1836 name: intf.name.clone(),
1837 index: if_index,
1838 addrs: HashSet::from([intf.addr.clone()]),
1839 };
1840 entry.insert(new_intf);
1841 }
1842 }
1843
1844 if !new_addr {
1845 trace!("add_interface: interface {} already exists", &intf.name);
1846 return;
1847 }
1848
1849 debug!("add new interface {}: {}", intf.name, intf.ip());
1850
1851 let Some(my_intf) = self.my_intfs.get(&if_index) else {
1852 debug!("add_interface: cannot find if_index {if_index}");
1853 return;
1854 };
1855
1856 let dns_registry = match self.dns_registry_map.get_mut(&if_index) {
1857 Some(registry) => registry,
1858 None => self
1859 .dns_registry_map
1860 .entry(if_index)
1861 .or_insert_with(DnsRegistry::new),
1862 };
1863
1864 for (_, service_info) in self.my_services.iter_mut() {
1865 if service_info.is_addr_auto() {
1866 service_info.insert_ipaddr(&intf);
1867
1868 if let Ok(true) = announce_service_on_intf(
1869 dns_registry,
1870 service_info,
1871 my_intf,
1872 &sock.pktinfo,
1873 self.port,
1874 ) {
1875 debug!(
1876 "Announce service {} on {}",
1877 service_info.get_fullname(),
1878 intf.ip()
1879 );
1880 service_info.set_status(if_index, ServiceStatus::Announced);
1881 } else {
1882 for timer in dns_registry.new_timers.drain(..) {
1883 self.timers.push(Reverse(timer));
1884 }
1885 service_info.set_status(if_index, ServiceStatus::Probing);
1886 }
1887 }
1888 }
1889
1890 if let Some(my_intf) = self.my_intfs.get(&if_index) {
1895 for ty in self.service_queriers.keys() {
1896 self.send_query_on_intf(ty, RRType::PTR, my_intf);
1897 }
1898 }
1899
1900 self.notify_monitors(DaemonEvent::IpAdd(intf.ip()));
1902 }
1903
1904 fn register_service(&mut self, mut info: ServiceInfo) {
1913 if let Err(e) = check_service_name_length(info.get_type(), self.service_name_len_max) {
1915 error!("check_service_name_length: {}", &e);
1916 self.notify_monitors(DaemonEvent::Error(e));
1917 return;
1918 }
1919
1920 if info.is_addr_auto() {
1921 let selected_intfs =
1922 self.selected_intfs(my_ip_interfaces_inner(true, self.include_apple_p2p));
1923 for intf in selected_intfs {
1924 info.insert_ipaddr(&intf);
1925 }
1926 }
1927
1928 debug!("register service {:?}", &info);
1929
1930 let outgoing_addrs = self.send_unsolicited_response(&mut info);
1931 if !outgoing_addrs.is_empty() {
1932 self.notify_monitors(DaemonEvent::Announce(
1933 info.get_fullname().to_string(),
1934 format!("{:?}", &outgoing_addrs),
1935 ));
1936 }
1937
1938 let service_fullname = info.get_fullname().to_lowercase();
1941 self.my_services.insert(service_fullname, info);
1942 }
1943
1944 fn send_unsolicited_response(&mut self, info: &mut ServiceInfo) -> Vec<IpAddr> {
1947 let mut outgoing_addrs = Vec::new();
1948 let mut outgoing_intfs = HashSet::new();
1949
1950 let mut invalid_intf_addrs = HashSet::new();
1951
1952 for (if_index, intf) in self.my_intfs.iter() {
1953 let dns_registry = match self.dns_registry_map.get_mut(if_index) {
1954 Some(registry) => registry,
1955 None => self
1956 .dns_registry_map
1957 .entry(*if_index)
1958 .or_insert_with(DnsRegistry::new),
1959 };
1960
1961 let mut announced = false;
1962
1963 if let Some(sock) = self.ipv4_sock.as_mut() {
1965 match announce_service_on_intf(dns_registry, info, intf, &sock.pktinfo, self.port) {
1966 Ok(true) => {
1967 for addr in intf.addrs.iter().filter(|a| a.ip().is_ipv4()) {
1968 outgoing_addrs.push(addr.ip());
1969 }
1970 outgoing_intfs.insert(intf.index);
1971
1972 debug!(
1973 "Announce service IPv4 {} on {}",
1974 info.get_fullname(),
1975 intf.name
1976 );
1977 announced = true;
1978 }
1979 Ok(false) => {}
1980 Err(InternalError::IntfAddrInvalid(intf_addr)) => {
1981 invalid_intf_addrs.insert(intf_addr);
1982 }
1983 }
1984 }
1985
1986 if let Some(sock) = self.ipv6_sock.as_mut() {
1987 match announce_service_on_intf(dns_registry, info, intf, &sock.pktinfo, self.port) {
1988 Ok(true) => {
1989 for addr in intf.addrs.iter().filter(|a| a.ip().is_ipv6()) {
1990 outgoing_addrs.push(addr.ip());
1991 }
1992 outgoing_intfs.insert(intf.index);
1993
1994 debug!(
1995 "Announce service IPv6 {} on {}",
1996 info.get_fullname(),
1997 intf.name
1998 );
1999 announced = true;
2000 }
2001 Ok(false) => {}
2002 Err(InternalError::IntfAddrInvalid(intf_addr)) => {
2003 invalid_intf_addrs.insert(intf_addr);
2004 }
2005 }
2006 }
2007
2008 if announced {
2009 info.set_status(intf.index, ServiceStatus::Announced);
2010 } else {
2011 for timer in dns_registry.new_timers.drain(..) {
2012 self.timers.push(Reverse(timer));
2013 }
2014 info.set_status(*if_index, ServiceStatus::Probing);
2015 }
2016 }
2017
2018 if !invalid_intf_addrs.is_empty() {
2019 let _ = self.send_cmd_to_self(Command::InvalidIntfAddrs(invalid_intf_addrs));
2020 }
2021
2022 let next_time = current_time_millis() + 1000;
2026 for if_index in outgoing_intfs {
2027 self.add_retransmission(
2028 next_time,
2029 Command::RegisterResend(info.get_fullname().to_string(), if_index),
2030 );
2031 }
2032
2033 outgoing_addrs
2034 }
2035
    /// Drives name probing on every interface that has a DNS registry.
    ///
    /// For each such interface this:
    /// 1. asks `check_probing` for the probe questions due at `now` and for
    ///    the probes that have expired;
    /// 2. sends the questions, if any, over the IPv4 and IPv6 sockets;
    /// 3. passes the expired probes to `handle_expired_probes`, which returns
    ///    the names of services waiting to be announced;
    /// 4. announces each waiting service that is not yet `Announced` on this
    ///    interface, schedules a `RegisterResend` 1000 ms later, and notifies
    ///    monitors with `DaemonEvent::Announce`.
    ///
    /// Interface addresses reported as invalid while sending are collected and
    /// forwarded to the daemon itself via `Command::InvalidIntfAddrs`.
    fn probing_handler(&mut self) {
        let now = current_time_millis();
        let mut invalid_intf_addrs = HashSet::new();

        for (if_index, intf) in self.my_intfs.iter() {
            // Interfaces without a registry have nothing to probe.
            let Some(dns_registry) = self.dns_registry_map.get_mut(if_index) else {
                continue;
            };

            let (out, expired_probes) = check_probing(dns_registry, &mut self.timers, now);

            if !out.questions().is_empty() {
                trace!("sending out probing of questions: {:?}", out.questions());
                if let Some(sock) = self.ipv4_sock.as_mut() {
                    if let Err(InternalError::IntfAddrInvalid(intf_addr)) =
                        send_dns_outgoing(&out, intf, &sock.pktinfo, self.port, None)
                    {
                        invalid_intf_addrs.insert(intf_addr);
                    }
                }
                if let Some(sock) = self.ipv6_sock.as_mut() {
                    if let Err(InternalError::IntfAddrInvalid(intf_addr)) =
                        send_dns_outgoing(&out, intf, &sock.pktinfo, self.port, None)
                    {
                        invalid_intf_addrs.insert(intf_addr);
                    }
                }
            }

            // Names of services whose probing finished and that now wait for
            // their announcement.
            let waiting_services =
                handle_expired_probes(expired_probes, &intf.name, dns_registry, &mut self.monitors);

            for service_name in waiting_services {
                // Services are stored under their lower-cased full name.
                if let Some(info) = self.my_services.get_mut(&service_name.to_lowercase()) {
                    if info.get_status(*if_index) == ServiceStatus::Announced {
                        debug!("service {} already announced", info.get_fullname());
                        continue;
                    }

                    let announced_v4 = if let Some(sock) = self.ipv4_sock.as_mut() {
                        match announce_service_on_intf(
                            dns_registry,
                            info,
                            intf,
                            &sock.pktinfo,
                            self.port,
                        ) {
                            Ok(announced) => announced,
                            Err(InternalError::IntfAddrInvalid(intf_addr)) => {
                                invalid_intf_addrs.insert(intf_addr);
                                false
                            }
                        }
                    } else {
                        false
                    };
                    let announced_v6 = if let Some(sock) = self.ipv6_sock.as_mut() {
                        match announce_service_on_intf(
                            dns_registry,
                            info,
                            intf,
                            &sock.pktinfo,
                            self.port,
                        ) {
                            Ok(announced) => announced,
                            Err(InternalError::IntfAddrInvalid(intf_addr)) => {
                                invalid_intf_addrs.insert(intf_addr);
                                false
                            }
                        }
                    } else {
                        false
                    };

                    if announced_v4 || announced_v6 {
                        // Queue the repeat announcement directly on the fields:
                        // `self` is still borrowed by the loop, so `&mut self`
                        // helper methods cannot be called here.
                        let next_time = now + 1000;
                        let command =
                            Command::RegisterResend(info.get_fullname().to_string(), *if_index);
                        self.retransmissions.push(ReRun { next_time, command });
                        self.timers.push(Reverse(next_time));

                        // Report the names as resolved by the registry.
                        let fullname = dns_registry.resolve_name(&service_name).to_string();

                        let hostname = dns_registry.resolve_name(info.get_hostname());

                        debug!("wake up: announce service {} on {}", fullname, intf.name);
                        notify_monitors(
                            &mut self.monitors,
                            DaemonEvent::Announce(fullname, format!("{}:{}", hostname, &intf.name)),
                        );

                        info.set_status(*if_index, ServiceStatus::Announced);
                    }
                }
            }
        }

        if !invalid_intf_addrs.is_empty() {
            let _ = self.send_cmd_to_self(Command::InvalidIntfAddrs(invalid_intf_addrs));
        }
    }
2141
2142 fn unregister_service(
2143 &self,
2144 info: &ServiceInfo,
2145 intf: &MyIntf,
2146 sock: &PktInfoUdpSocket,
2147 ) -> Vec<u8> {
2148 let is_ipv4 = sock.domain() == Domain::IPV4;
2149
2150 let mut out = DnsOutgoing::new(FLAGS_QR_RESPONSE | FLAGS_AA);
2151 out.add_answer_at_time(
2152 DnsPointer::new(
2153 info.get_type(),
2154 RRType::PTR,
2155 CLASS_IN,
2156 0,
2157 info.get_fullname().to_string(),
2158 ),
2159 0,
2160 );
2161
2162 if let Some(sub) = info.get_subtype() {
2163 trace!("Adding subdomain {}", sub);
2164 out.add_answer_at_time(
2165 DnsPointer::new(
2166 sub,
2167 RRType::PTR,
2168 CLASS_IN,
2169 0,
2170 info.get_fullname().to_string(),
2171 ),
2172 0,
2173 );
2174 }
2175
2176 out.add_answer_at_time(
2177 DnsSrv::new(
2178 info.get_fullname(),
2179 CLASS_IN | CLASS_CACHE_FLUSH,
2180 0,
2181 info.get_priority(),
2182 info.get_weight(),
2183 info.get_port(),
2184 info.get_hostname().to_string(),
2185 ),
2186 0,
2187 );
2188 out.add_answer_at_time(
2189 DnsTxt::new(
2190 info.get_fullname(),
2191 CLASS_IN | CLASS_CACHE_FLUSH,
2192 0,
2193 info.generate_txt(),
2194 ),
2195 0,
2196 );
2197
2198 let if_addrs = if is_ipv4 {
2199 info.get_addrs_on_my_intf_v4(intf)
2200 } else {
2201 info.get_addrs_on_my_intf_v6(intf)
2202 };
2203
2204 if if_addrs.is_empty() {
2205 return vec![];
2206 }
2207
2208 for address in if_addrs {
2209 out.add_answer_at_time(
2210 DnsAddress::new(
2211 info.get_hostname(),
2212 ip_address_rr_type(&address),
2213 CLASS_IN | CLASS_CACHE_FLUSH,
2214 0,
2215 address,
2216 intf.into(),
2217 ),
2218 0,
2219 );
2220 }
2221
2222 let sent_vec = match send_dns_outgoing(&out, intf, sock, self.port, None) {
2224 Ok(sent_vec) => sent_vec,
2225 Err(InternalError::IntfAddrInvalid(intf_addr)) => {
2226 let invalid_intf_addrs = HashSet::from([intf_addr]);
2227 let _ = self.send_cmd_to_self(Command::InvalidIntfAddrs(invalid_intf_addrs));
2228 vec![]
2229 }
2230 };
2231 sent_vec.into_iter().next().unwrap_or_default()
2232 }
2233
2234 fn add_hostname_resolver(
2238 &mut self,
2239 hostname: String,
2240 listener: Sender<HostnameResolutionEvent>,
2241 timeout: Option<u64>,
2242 ) {
2243 let real_timeout = timeout.map(|t| current_time_millis() + t);
2244 self.hostname_resolvers
2245 .insert(hostname.to_lowercase(), (listener, real_timeout));
2246 if let Some(t) = real_timeout {
2247 self.add_timer(t);
2248 }
2249 }
2250
    /// Sends a single question (`name`, `qtype`) by delegating to
    /// `send_query_vec` with a one-element slice.
    fn send_query(&self, name: &str, qtype: RRType) {
        self.send_query_vec(&[(name, qtype)]);
    }
2255
2256 fn send_query_on_intf(&self, name: &str, qtype: RRType, intf: &MyIntf) {
2261 let mut out = DnsOutgoing::new(FLAGS_QR_QUERY);
2262 out.add_question(name, qtype);
2263
2264 let mut invalid_intf_addrs = HashSet::new();
2265 if let Some(sock) = self.ipv4_sock.as_ref() {
2266 if let Err(InternalError::IntfAddrInvalid(intf_addr)) =
2267 send_dns_outgoing(&out, intf, &sock.pktinfo, self.port, None)
2268 {
2269 invalid_intf_addrs.insert(intf_addr);
2270 }
2271 }
2272 if let Some(sock) = self.ipv6_sock.as_ref() {
2273 if let Err(InternalError::IntfAddrInvalid(intf_addr)) =
2274 send_dns_outgoing(&out, intf, &sock.pktinfo, self.port, None)
2275 {
2276 invalid_intf_addrs.insert(intf_addr);
2277 }
2278 }
2279 if !invalid_intf_addrs.is_empty() {
2280 let _ = self.send_cmd_to_self(Command::InvalidIntfAddrs(invalid_intf_addrs));
2281 }
2282 }
2283
2284 fn send_query_vec(&self, questions: &[(&str, RRType)]) {
2286 let mut out = DnsOutgoing::new(FLAGS_QR_QUERY);
2287 let now = current_time_millis();
2288
2289 for (name, qtype) in questions {
2290 out.add_question(name, *qtype);
2291
2292 for record in self.cache.get_known_answers(name, *qtype, now) {
2293 trace!("add known answer: {:?}", record.record);
2301 let mut new_record = record.record.clone();
2302 new_record.get_record_mut().update_ttl(now);
2303 out.add_answer_box(new_record);
2304 }
2305 }
2306
2307 let mut invalid_intf_addrs = HashSet::new();
2308 for (_, intf) in self.my_intfs.iter() {
2309 if let Some(sock) = self.ipv4_sock.as_ref() {
2310 if let Err(InternalError::IntfAddrInvalid(intf_addr)) =
2311 send_dns_outgoing(&out, intf, &sock.pktinfo, self.port, None)
2312 {
2313 invalid_intf_addrs.insert(intf_addr);
2314 }
2315 }
2316 if let Some(sock) = self.ipv6_sock.as_ref() {
2317 if let Err(InternalError::IntfAddrInvalid(intf_addr)) =
2318 send_dns_outgoing(&out, intf, &sock.pktinfo, self.port, None)
2319 {
2320 invalid_intf_addrs.insert(intf_addr);
2321 }
2322 }
2323 }
2324
2325 if !invalid_intf_addrs.is_empty() {
2326 let _ = self.send_cmd_to_self(Command::InvalidIntfAddrs(invalid_intf_addrs));
2327 }
2328 }
2329
2330 fn handle_read(&mut self, event_key: usize) -> bool {
2335 let sock_opt = match event_key {
2336 IPV4_SOCK_EVENT_KEY => &mut self.ipv4_sock,
2337 IPV6_SOCK_EVENT_KEY => &mut self.ipv6_sock,
2338 _ => {
2339 debug!("handle_read: unknown token {}", event_key);
2340 return false;
2341 }
2342 };
2343 let Some(sock) = sock_opt.as_mut() else {
2344 debug!("handle_read: socket not available for token {}", event_key);
2345 return false;
2346 };
2347 let mut buf = vec![0u8; MAX_MSG_ABSOLUTE];
2348
2349 let (sz, pktinfo) = match sock.pktinfo.recv(&mut buf) {
2356 Ok(sz) => sz,
2357 Err(e) => {
2358 if e.kind() != std::io::ErrorKind::WouldBlock {
2359 debug!("listening socket read failed: {}", e);
2360 }
2361 return false;
2362 }
2363 };
2364
2365 let pkt_if_index = pktinfo.if_index as u32;
2367 let Some(my_intf) = self.my_intfs.get(&pkt_if_index) else {
2368 debug!(
2369 "handle_read: no interface found for pktinfo if_index: {}",
2370 pktinfo.if_index
2371 );
2372 return true; };
2374
2375 let is_ipv4 = event_key == IPV4_SOCK_EVENT_KEY;
2380 if (is_ipv4 && my_intf.next_ifaddr_v4().is_none())
2381 || (!is_ipv4 && my_intf.next_ifaddr_v6().is_none())
2382 {
2383 debug!(
2384 "handle_read: dropping {} packet on intf {} (disabled)",
2385 if is_ipv4 { "IPv4" } else { "IPv6" },
2386 my_intf.name
2387 );
2388 return true;
2389 }
2390
2391 buf.truncate(sz); match DnsIncoming::new(buf, my_intf.into()) {
2394 Ok(msg) => {
2395 if msg.is_query() {
2396 let querier_ip = pktinfo.addr_src.ip();
2397 self.handle_query(msg, pkt_if_index, querier_ip);
2398 } else if msg.is_response() {
2399 self.handle_response(msg, pkt_if_index);
2400 } else {
2401 debug!("Invalid message: not query and not response");
2402 }
2403 }
2404 Err(e) => debug!("Invalid incoming DNS message: {}", e),
2405 }
2406
2407 true
2408 }
2409
2410 fn query_unresolved(&mut self, instance: &str) -> bool {
2412 if !valid_instance_name(instance) {
2413 trace!("instance name {} not valid", instance);
2414 return false;
2415 }
2416
2417 if let Some(records) = self.cache.get_srv(instance) {
2418 for record in records {
2419 if let Some(srv) = record.record.any().downcast_ref::<DnsSrv>() {
2420 if self.cache.get_addr(srv.host()).is_none() {
2421 self.send_query_vec(&[(srv.host(), RRType::A), (srv.host(), RRType::AAAA)]);
2422 return true;
2423 }
2424 }
2425 }
2426 } else {
2427 self.send_query(instance, RRType::ANY);
2428 return true;
2429 }
2430
2431 false
2432 }
2433
2434 fn query_cache_for_service(
2437 &mut self,
2438 ty_domain: &str,
2439 sender: &Sender<ServiceEvent>,
2440 now: u64,
2441 ) {
2442 let mut resolved: HashSet<String> = HashSet::new();
2443 let mut unresolved: HashSet<String> = HashSet::new();
2444
2445 if let Some(records) = self.cache.get_ptr(ty_domain) {
2446 for record in records.iter().filter(|r| !r.record.expires_soon(now)) {
2447 if let Some(ptr) = record.record.any().downcast_ref::<DnsPointer>() {
2448 let mut new_event = None;
2449 match self.resolve_service_from_cache(ty_domain, ptr.alias()) {
2450 Ok(resolved_service) => {
2451 if resolved_service.is_valid() {
2452 debug!("Resolved service from cache: {}", ptr.alias());
2453 new_event =
2454 Some(ServiceEvent::ServiceResolved(Box::new(resolved_service)));
2455 } else {
2456 debug!("Resolved service is not valid: {}", ptr.alias());
2457 }
2458 }
2459 Err(err) => {
2460 debug!("Error while resolving service from cache: {}", err);
2461 continue;
2462 }
2463 }
2464
2465 match sender.send(ServiceEvent::ServiceFound(
2466 ty_domain.to_string(),
2467 ptr.alias().to_string(),
2468 )) {
2469 Ok(()) => debug!("sent service found {}", ptr.alias()),
2470 Err(e) => {
2471 debug!("failed to send service found: {}", e);
2472 continue;
2473 }
2474 }
2475
2476 if let Some(event) = new_event {
2477 resolved.insert(ptr.alias().to_string());
2478 match sender.send(event) {
2479 Ok(()) => debug!("sent service resolved: {}", ptr.alias()),
2480 Err(e) => debug!("failed to send service resolved: {}", e),
2481 }
2482 } else {
2483 unresolved.insert(ptr.alias().to_string());
2484 }
2485 }
2486 }
2487 }
2488
2489 for instance in resolved.drain() {
2490 self.pending_resolves.remove(&instance);
2491 self.resolved.insert(instance);
2492 }
2493
2494 for instance in unresolved.drain() {
2495 self.add_pending_resolve(instance);
2496 }
2497 }
2498
2499 fn query_cache_for_hostname(
2502 &mut self,
2503 hostname: &str,
2504 sender: Sender<HostnameResolutionEvent>,
2505 ) {
2506 let addresses_map = self.cache.get_addresses_for_host(hostname);
2507 for (name, addresses) in addresses_map {
2508 match sender.send(HostnameResolutionEvent::AddressesFound(name, addresses)) {
2509 Ok(()) => trace!("sent hostname addresses found"),
2510 Err(e) => debug!("failed to send hostname addresses found: {}", e),
2511 }
2512 }
2513 }
2514
2515 fn add_pending_resolve(&mut self, instance: String) {
2516 if !self.pending_resolves.contains(&instance) {
2517 let next_time = current_time_millis() + RESOLVE_WAIT_IN_MILLIS;
2518 self.add_retransmission(next_time, Command::Resolve(instance.clone(), 1));
2519 self.pending_resolves.insert(instance);
2520 }
2521 }
2522
2523 fn resolve_service_from_cache(
2525 &self,
2526 ty_domain: &str,
2527 fullname: &str,
2528 ) -> Result<ResolvedService> {
2529 let now = current_time_millis();
2530 let mut resolved_service = ResolvedService {
2531 ty_domain: ty_domain.to_string(),
2532 sub_ty_domain: None,
2533 fullname: fullname.to_string(),
2534 host: String::new(),
2535 port: 0,
2536 addresses: HashSet::new(),
2537 txt_properties: TxtProperties::new(),
2538 };
2539
2540 if let Some(subtype) = self.cache.get_subtype(fullname) {
2542 trace!(
2543 "ty_domain: {} found subtype {} for instance: {}",
2544 ty_domain,
2545 subtype,
2546 fullname
2547 );
2548 if resolved_service.sub_ty_domain.is_none() {
2549 resolved_service.sub_ty_domain = Some(subtype.to_string());
2550 }
2551 }
2552
2553 if let Some(records) = self.cache.get_srv(fullname) {
2555 if let Some(answer) = records.iter().find(|r| !r.record.expires_soon(now)) {
2556 if let Some(dns_srv) = answer.record.any().downcast_ref::<DnsSrv>() {
2557 resolved_service.host = dns_srv.host().to_string();
2558 resolved_service.port = dns_srv.port();
2559 }
2560 }
2561 }
2562
2563 if let Some(records) = self.cache.get_txt(fullname) {
2565 if let Some(record) = records.iter().find(|r| !r.record.expires_soon(now)) {
2566 if let Some(dns_txt) = record.record.any().downcast_ref::<DnsTxt>() {
2567 resolved_service.txt_properties = dns_txt.text().into();
2568 }
2569 }
2570 }
2571
2572 if let Some(records) = self.cache.get_addr(&resolved_service.host) {
2574 for answer in records.iter() {
2575 if let Some(dns_a) = answer.record.any().downcast_ref::<DnsAddress>() {
2576 if dns_a.expires_soon(now) {
2577 trace!(
2578 "Addr expired or expires soon: {}",
2579 dns_a.address().to_ip_addr()
2580 );
2581 } else {
2582 let scoped = dns_a.address();
2583 if let ScopedIp::V4(v4) = &scoped {
2584 let existing = resolved_service
2587 .addresses
2588 .iter()
2589 .find(|a| a.to_ip_addr() == IpAddr::V4(*v4.addr()))
2590 .cloned();
2591 if let Some(mut existing) = existing {
2592 resolved_service.addresses.remove(&existing);
2593 if let ScopedIp::V4(existing_v4) = &mut existing {
2594 for id in v4.interface_ids() {
2595 existing_v4.add_interface_id(id.clone());
2596 }
2597 }
2598 resolved_service.addresses.insert(existing);
2599 } else {
2600 resolved_service.addresses.insert(scoped);
2601 }
2602 } else {
2603 resolved_service.addresses.insert(scoped);
2604 }
2605 }
2606 }
2607 }
2608 }
2609
2610 Ok(resolved_service)
2611 }
2612
2613 fn handle_poller_events(&mut self, events: &mio::Events) {
2614 for ev in events.iter() {
2615 trace!("event received with key {:?}", ev.token());
2616 if ev.token().0 == SIGNAL_SOCK_EVENT_KEY {
2617 self.signal_sock_drain();
2619
2620 if let Err(e) = self.poller.registry().reregister(
2621 &mut self.signal_sock,
2622 ev.token(),
2623 mio::Interest::READABLE,
2624 ) {
2625 debug!("failed to modify poller for signal socket: {}", e);
2626 }
2627 continue; }
2629
2630 while self.handle_read(ev.token().0) {}
2632
2633 if ev.token().0 == IPV4_SOCK_EVENT_KEY {
2635 if let Some(sock) = self.ipv4_sock.as_mut() {
2637 if let Err(e) =
2638 self.poller
2639 .registry()
2640 .reregister(sock, ev.token(), mio::Interest::READABLE)
2641 {
2642 debug!("modify poller for IPv4 socket: {}", e);
2643 }
2644 }
2645 } else if ev.token().0 == IPV6_SOCK_EVENT_KEY {
2646 if let Some(sock) = self.ipv6_sock.as_mut() {
2648 if let Err(e) =
2649 self.poller
2650 .registry()
2651 .reregister(sock, ev.token(), mio::Interest::READABLE)
2652 {
2653 debug!("modify poller for IPv6 socket: {}", e);
2654 }
2655 }
2656 }
2657 }
2658 }
2659
    /// Processes an incoming DNS response received on interface `if_index`.
    ///
    /// Steps, in order:
    /// 1. Strip expired records from the message; each is also removed from
    ///    the cache, and a removed PTR record triggers
    ///    `ServiceEvent::ServiceRemoved`.
    /// 2. Run `conflict_handler` on the remaining message.
    /// 3. Decide whether the response is "for us", i.e. relates to a browsed
    ///    service type or a hostname being resolved (always true when
    ///    `accept_unsolicited` is set).
    /// 4. Add or update every record in the cache, collecting timers and the
    ///    list of changes; new PTR records trigger
    ///    `ServiceEvent::ServiceFound`.
    /// 5. Schedule the collected timers, notify hostname listeners about
    ///    changed addresses, and resolve the affected service instances.
    fn handle_response(&mut self, mut msg: DnsIncoming, if_index: u32) {
        let now = current_time_millis();

        // Returns `true` for records to keep. Expired records are dropped
        // from the message and from the cache.
        let mut record_predicate = |record: &DnsRecordBox| {
            if !record.get_record().is_expired(now) {
                return true;
            }

            debug!("record is expired, removing it from cache.");
            if self.cache.remove(record) {
                // Only a PTR record that was actually in the cache produces a
                // removal event.
                if let Some(dns_ptr) = record.any().downcast_ref::<DnsPointer>() {
                    call_service_listener(
                        &self.service_queriers,
                        dns_ptr.get_name(),
                        ServiceEvent::ServiceRemoved(
                            dns_ptr.get_name().to_string(),
                            dns_ptr.alias().to_string(),
                        ),
                    );
                }
            }
            false
        };
        msg.answers_mut().retain(&mut record_predicate);
        msg.authorities_mut().retain(&mut record_predicate);
        msg.additionals_mut().retain(&mut record_predicate);

        self.conflict_handler(&msg, if_index);

        // Defaults to `true`, so a response without PTR answers stays "for
        // us". A PTR answer for a type nobody browses flips it to `false`
        // unless a later answer matches a browsed type or a hostname being
        // resolved.
        let mut is_for_us = true;
        for answer in msg.answers() {
            if answer.get_type() == RRType::PTR {
                if self.service_queriers.contains_key(answer.get_name()) {
                    is_for_us = true;
                    break;
                } else {
                    is_for_us = false;
                }
            } else if answer.get_type() == RRType::A || answer.get_type() == RRType::AAAA {
                // Hostname resolvers are keyed by lower-cased hostname.
                let answer_lowercase = answer.get_name().to_lowercase();
                if self.hostname_resolvers.contains_key(&answer_lowercase) {
                    is_for_us = true;
                    break;
                }
            }
        }

        if self.accept_unsolicited {
            is_for_us = true;
        }

        // One cache change worth reacting to: the record type and the name
        // affected (the instance name for PTR records, else the record name).
        struct InstanceChange {
            ty: RRType,
            name: String,
        }

        let mut changes = Vec::new();
        // Collected here and scheduled after the loop, because `add_timer`
        // needs `&mut self` while `my_intf` still borrows `self.my_intfs`.
        let mut timers = Vec::new();
        let Some(my_intf) = self.my_intfs.get(&if_index) else {
            return;
        };
        for record in msg.all_records() {
            match self
                .cache
                .add_or_update(my_intf, record, &mut timers, is_for_us)
            {
                // Second tuple field `true`: treated here as a new or changed
                // record (NOTE(review): confirm against `add_or_update`).
                Some((dns_record, true)) => {
                    timers.push(dns_record.record.get_record().get_expire_time());
                    timers.push(dns_record.record.get_record().get_refresh_time());

                    let ty = dns_record.record.get_type();
                    let name = dns_record.record.get_name();

                    if ty == RRType::PTR && dns_record.record.get_record().get_ttl() > 1 {
                        if self.service_queriers.contains_key(name) {
                            timers.push(dns_record.record.get_record().get_refresh_time());
                        }

                        if let Some(dns_ptr) = dns_record.record.any().downcast_ref::<DnsPointer>()
                        {
                            debug!("calling listener with service found: {name}");
                            call_service_listener(
                                &self.service_queriers,
                                name,
                                ServiceEvent::ServiceFound(
                                    name.to_string(),
                                    dns_ptr.alias().to_string(),
                                ),
                            );
                            changes.push(InstanceChange {
                                ty,
                                name: dns_ptr.alias().to_string(),
                            });
                        }
                    } else {
                        changes.push(InstanceChange {
                            ty,
                            name: name.to_string(),
                        });
                    }
                }
                // Second tuple field `false`: only the timers are refreshed.
                Some((dns_record, false)) => {
                    timers.push(dns_record.record.get_record().get_expire_time());
                    timers.push(dns_record.record.get_record().get_refresh_time());
                }
                _ => {}
            }
        }

        for t in timers {
            self.add_timer(t);
        }

        // Tell hostname listeners about hosts whose address records changed.
        for change in changes
            .iter()
            .filter(|change| change.ty == RRType::A || change.ty == RRType::AAAA)
        {
            let addr_map = self.cache.get_addresses_for_host(&change.name);
            for (name, addresses) in addr_map {
                call_hostname_resolution_listener(
                    &self.hostname_resolvers,
                    &change.name,
                    HostnameResolutionEvent::AddressesFound(name, addresses),
                )
            }
        }

        // Work out which service instances are affected by the changes.
        let mut updated_instances = HashSet::new();
        for update in changes {
            match update.ty {
                RRType::PTR | RRType::SRV | RRType::TXT => {
                    updated_instances.insert(update.name);
                }
                // An address change affects every instance hosted on that host.
                RRType::A | RRType::AAAA => {
                    let instances = self.cache.get_instances_on_host(&update.name);
                    updated_instances.extend(instances);
                }
                _ => {}
            }
        }

        self.resolve_updated_instances(&updated_instances);
    }
2828
2829 fn conflict_handler(&mut self, msg: &DnsIncoming, if_index: u32) {
2830 let Some(my_intf) = self.my_intfs.get(&if_index) else {
2831 debug!("handle_response: no intf found for index {if_index}");
2832 return;
2833 };
2834
2835 let Some(dns_registry) = self.dns_registry_map.get_mut(&if_index) else {
2836 return;
2837 };
2838
2839 for answer in msg.answers().iter() {
2840 let mut new_records = Vec::new();
2841
2842 let name = answer.get_name();
2843 let Some(probe) = dns_registry.probing.get_mut(name) else {
2844 continue;
2845 };
2846
2847 if answer.get_type() == RRType::A || answer.get_type() == RRType::AAAA {
2849 if let Some(answer_addr) = answer.any().downcast_ref::<DnsAddress>() {
2850 if answer_addr.interface_id.index != if_index {
2851 debug!(
2852 "conflict handler: answer addr {:?} not in the subnet of intf {}",
2853 answer_addr, my_intf.name
2854 );
2855 continue;
2856 }
2857 }
2858
2859 let any_match = probe.records.iter().any(|r| {
2862 r.get_type() == answer.get_type()
2863 && r.get_class() == answer.get_class()
2864 && r.rrdata_match(answer.as_ref())
2865 });
2866 if any_match {
2867 continue; }
2869 }
2870
2871 probe.records.retain(|record| {
2872 if record.get_type() == answer.get_type()
2873 && record.get_class() == answer.get_class()
2874 && !record.rrdata_match(answer.as_ref())
2875 {
2876 debug!(
2877 "found conflict name: '{name}' record: {}: {} PEER: {}",
2878 record.get_type(),
2879 record.rdata_print(),
2880 answer.rdata_print()
2881 );
2882
2883 let mut new_record = record.clone();
2886 let new_name = match record.get_type() {
2887 RRType::A => hostname_change(name),
2888 RRType::AAAA => hostname_change(name),
2889 _ => name_change(name),
2890 };
2891 new_record.get_record_mut().set_new_name(new_name);
2892 new_records.push(new_record);
2893 return false; }
2895
2896 true
2897 });
2898
2899 let create_time = current_time_millis() + fastrand::u64(0..250);
2906
2907 let waiting_services = probe.waiting_services.clone();
2908
2909 for record in new_records {
2910 if dns_registry.update_hostname(name, record.get_name(), create_time) {
2911 self.timers.push(Reverse(create_time));
2912 }
2913
2914 dns_registry.name_changes.insert(
2916 record.get_record().get_original_name().to_string(),
2917 record.get_name().to_string(),
2918 );
2919
2920 let new_probe = match dns_registry.probing.get_mut(record.get_name()) {
2921 Some(p) => p,
2922 None => {
2923 let new_probe = dns_registry
2924 .probing
2925 .entry(record.get_name().to_string())
2926 .or_insert_with(|| {
2927 debug!("conflict handler: new probe of {}", record.get_name());
2928 Probe::new(create_time)
2929 });
2930 self.timers.push(Reverse(new_probe.next_send));
2931 new_probe
2932 }
2933 };
2934
2935 debug!(
2936 "insert record with new name '{}' {} into probe",
2937 record.get_name(),
2938 record.get_type()
2939 );
2940 new_probe.insert_record(record);
2941
2942 new_probe.waiting_services.extend(waiting_services.clone());
2943 }
2944 }
2945 }
2946
2947 fn resolve_updated_instances(&mut self, updated_instances: &HashSet<String>) {
2954 if updated_instances.is_empty() {
2955 return;
2956 }
2957
2958 let mut resolved: HashSet<String> = HashSet::new();
2959 let mut unresolved: HashSet<String> = HashSet::new();
2960 let mut removed_instances = HashMap::new();
2961
2962 let now = current_time_millis();
2963
2964 for (ty_domain, records) in self.cache.all_ptr().iter() {
2965 if !self.service_queriers.contains_key(ty_domain) {
2966 continue;
2968 }
2969
2970 for ptr in records.iter().filter(|r| !r.record.expires_soon(now)) {
2971 let Some(dns_ptr) = ptr.record.any().downcast_ref::<DnsPointer>() else {
2972 continue;
2973 };
2974
2975 let instance = dns_ptr.alias();
2976 if !updated_instances.contains(instance) {
2977 continue;
2978 }
2979
2980 let Ok(resolved_service) = self.resolve_service_from_cache(ty_domain, instance)
2981 else {
2982 continue;
2983 };
2984
2985 debug!("resolve_updated_instances: from cache: {instance}");
2986 if resolved_service.is_valid() {
2987 debug!("call queriers to resolve {instance}");
2988 resolved.insert(instance.to_string());
2989 let event = ServiceEvent::ServiceResolved(Box::new(resolved_service));
2990 call_service_listener(&self.service_queriers, ty_domain, event);
2991 } else {
2992 debug!("Resolved service is not valid: {instance}");
2993 if self.resolved.remove(dns_ptr.alias()) {
2994 removed_instances
2995 .entry(ty_domain.to_string())
2996 .or_insert_with(HashSet::new)
2997 .insert(instance.to_string());
2998 }
2999 unresolved.insert(instance.to_string());
3000 }
3001 }
3002 }
3003
3004 for instance in resolved.drain() {
3005 self.pending_resolves.remove(&instance);
3006 self.resolved.insert(instance);
3007 }
3008
3009 for instance in unresolved.drain() {
3010 self.add_pending_resolve(instance);
3011 }
3012
3013 if !removed_instances.is_empty() {
3014 debug!(
3015 "resolve_updated_instances: removed {}",
3016 &removed_instances.len()
3017 );
3018 self.notify_service_removal(removed_instances);
3019 }
3020 }
3021
3022 fn handle_query(&mut self, msg: DnsIncoming, if_index: u32, querier_ip: IpAddr) {
3024 let is_ipv4 = querier_ip.is_ipv4();
3025 let sock_opt = if is_ipv4 {
3026 &self.ipv4_sock
3027 } else {
3028 &self.ipv6_sock
3029 };
3030 let Some(sock) = sock_opt.as_ref() else {
3031 debug!("handle_query: socket not available for intf {}", if_index);
3032 return;
3033 };
3034 let mut out = DnsOutgoing::new(FLAGS_QR_RESPONSE | FLAGS_AA);
3035
3036 const META_QUERY: &str = "_services._dns-sd._udp.local.";
3039
3040 let Some(dns_registry) = self.dns_registry_map.get_mut(&if_index) else {
3041 debug!("missing dns registry for intf {}", if_index);
3042 return;
3043 };
3044
3045 let Some(intf) = self.my_intfs.get(&if_index) else {
3046 debug!("handle_query: no intf found for index {if_index}");
3047 return;
3048 };
3049
3050 for question in msg.questions().iter() {
3051 let qtype = question.entry_type();
3052 let q_name = question.entry_name();
3053
3054 if qtype == RRType::PTR {
3055 for service in self.my_services.values() {
3056 if service.get_status(if_index) != ServiceStatus::Announced {
3057 continue;
3058 }
3059
3060 if service.matches_type_or_subtype(q_name) {
3061 out.add_answer_with_additionals(&msg, service, intf, dns_registry, is_ipv4);
3062 } else if q_name == META_QUERY {
3063 let ttl = service.get_other_ttl();
3064 let alias = service.get_type().to_string();
3065 let ptr = DnsPointer::new(q_name, RRType::PTR, CLASS_IN, ttl, alias);
3066 if !out.add_answer(&msg, ptr) {
3067 trace!("answer was not added for meta-query {:?}", &question);
3068 }
3069 }
3070 }
3071 } else {
3072 if qtype == RRType::ANY && msg.num_authorities() > 0 {
3074 if let Some(probe) = dns_registry.probing.get_mut(q_name) {
3075 probe.tiebreaking(&msg, q_name);
3076 }
3077 }
3078
3079 if qtype == RRType::A || qtype == RRType::AAAA || qtype == RRType::ANY {
3080 for service in self.my_services.values() {
3081 if service.get_status(if_index) != ServiceStatus::Announced {
3082 continue;
3083 }
3084
3085 let service_hostname = dns_registry.resolve_name(service.get_hostname());
3086
3087 if service_hostname.to_lowercase() == question.entry_name().to_lowercase() {
3088 let intf_addrs = if is_ipv4 {
3089 service.get_addrs_on_my_intf_v4(intf)
3090 } else {
3091 service.get_addrs_on_my_intf_v6(intf)
3092 };
3093 if intf_addrs.is_empty()
3094 && (qtype == RRType::A || qtype == RRType::AAAA)
3095 {
3096 let t = match qtype {
3097 RRType::A => "TYPE_A",
3098 RRType::AAAA => "TYPE_AAAA",
3099 _ => "invalid_type",
3100 };
3101 trace!(
3102 "Cannot find valid addrs for {} response on intf {:?}",
3103 t,
3104 &intf
3105 );
3106 return;
3107 }
3108 for address in intf_addrs {
3109 out.add_answer(
3110 &msg,
3111 DnsAddress::new(
3112 service_hostname,
3113 ip_address_rr_type(&address),
3114 CLASS_IN | CLASS_CACHE_FLUSH,
3115 service.get_host_ttl(),
3116 address,
3117 intf.into(),
3118 ),
3119 );
3120 }
3121 }
3122 }
3123 }
3124
3125 let query_name = q_name.to_lowercase();
3126 let service_opt = self
3127 .my_services
3128 .iter()
3129 .find(|(k, _v)| dns_registry.resolve_name(k.as_str()) == query_name)
3130 .map(|(_, v)| v);
3131
3132 let Some(service) = service_opt else {
3133 continue;
3134 };
3135
3136 if service.get_status(if_index) != ServiceStatus::Announced {
3137 continue;
3138 }
3139
3140 let intf_addrs = if is_ipv4 {
3141 service.get_addrs_on_my_intf_v4(intf)
3142 } else {
3143 service.get_addrs_on_my_intf_v6(intf)
3144 };
3145 if intf_addrs.is_empty() {
3146 debug!(
3147 "Cannot find valid addrs for TYPE_SRV response on intf {:?}",
3148 &intf
3149 );
3150 continue;
3151 }
3152
3153 add_answer_of_service(
3154 &mut out,
3155 &msg,
3156 question.entry_name(),
3157 service,
3158 qtype,
3159 intf_addrs,
3160 );
3161 }
3162 }
3163
3164 if out.answers_count() > 0 {
3165 debug!("sending response on intf {}", &intf.name);
3166 out.set_id(msg.id());
3167
3168 let matched_source = intf
3171 .addrs
3172 .iter()
3173 .find(|if_addr| valid_ip_on_intf(&querier_ip, if_addr));
3174
3175 if let Err(InternalError::IntfAddrInvalid(intf_addr)) =
3176 send_dns_outgoing(&out, intf, &sock.pktinfo, self.port, matched_source)
3177 {
3178 let invalid_intf_addr = HashSet::from([intf_addr]);
3179 let _ = self.send_cmd_to_self(Command::InvalidIntfAddrs(invalid_intf_addr));
3180 }
3181
3182 let if_name = intf.name.clone();
3183
3184 self.increase_counter(Counter::Respond, 1);
3185 self.notify_monitors(DaemonEvent::Respond(if_name));
3186 }
3187
3188 self.increase_counter(Counter::KnownAnswerSuppression, out.known_answer_count());
3189 }
3190
3191 fn increase_counter(&mut self, counter: Counter, count: i64) {
3193 let key = counter.to_string();
3194 match self.counters.get_mut(&key) {
3195 Some(v) => *v += count,
3196 None => {
3197 self.counters.insert(key, count);
3198 }
3199 }
3200 }
3201
3202 fn set_counter(&mut self, counter: Counter, count: i64) {
3204 let key = counter.to_string();
3205 self.counters.insert(key, count);
3206 }
3207
3208 fn signal_sock_drain(&self) {
3209 let mut signal_buf = [0; 1024];
3210
3211 while let Ok(sz) = self.signal_sock.recv(&mut signal_buf) {
3213 trace!(
3214 "signal socket recvd: {}",
3215 String::from_utf8_lossy(&signal_buf[0..sz])
3216 );
3217 }
3218 }
3219
3220 fn add_retransmission(&mut self, next_time: u64, command: Command) {
3221 self.retransmissions.push(ReRun { next_time, command });
3222 self.add_timer(next_time);
3223 }
3224
3225 fn notify_service_removal(&self, expired: HashMap<String, HashSet<String>>) {
3228 for (ty_domain, sender) in self.service_queriers.iter() {
3229 if let Some(instances) = expired.get(ty_domain) {
3230 for instance_name in instances {
3231 let event = ServiceEvent::ServiceRemoved(
3232 ty_domain.to_string(),
3233 instance_name.to_string(),
3234 );
3235 match sender.send(event) {
3236 Ok(()) => debug!("notify_service_removal: sent ServiceRemoved to listener of {ty_domain}: {instance_name}"),
3237 Err(e) => debug!("Failed to send event: {}", e),
3238 }
3239 }
3240 }
3241 }
3242 }
3243
3244 fn exec_command(&mut self, command: Command, repeating: bool) {
3248 trace!("exec_command: {:?} repeating: {}", &command, repeating);
3249 match command {
3250 Command::Browse(ty, next_delay, cache_only, listener) => {
3251 self.exec_command_browse(repeating, ty, next_delay, cache_only, listener);
3252 }
3253
3254 Command::ResolveHostname(hostname, next_delay, listener, timeout) => {
3255 self.exec_command_resolve_hostname(
3256 repeating, hostname, next_delay, listener, timeout,
3257 );
3258 }
3259
3260 Command::Register(service_info) => {
3261 self.register_service(*service_info);
3262 self.increase_counter(Counter::Register, 1);
3263 }
3264
3265 Command::RegisterResend(fullname, intf) => {
3266 trace!("register-resend service: {fullname} on {}", &intf);
3267 if let Err(InternalError::IntfAddrInvalid(intf_addr)) =
3268 self.exec_command_register_resend(fullname, intf)
3269 {
3270 let invalid_intf_addr = HashSet::from([intf_addr]);
3271 let _ = self.send_cmd_to_self(Command::InvalidIntfAddrs(invalid_intf_addr));
3272 }
3273 }
3274
3275 Command::Unregister(fullname, resp_s) => {
3276 trace!("unregister service {} repeat {}", &fullname, &repeating);
3277 self.exec_command_unregister(repeating, fullname, resp_s);
3278 }
3279
3280 Command::UnregisterResend(packet, if_index, is_ipv4) => {
3281 self.exec_command_unregister_resend(packet, if_index, is_ipv4);
3282 }
3283
3284 Command::StopBrowse(ty_domain) => self.exec_command_stop_browse(ty_domain),
3285
3286 Command::StopResolveHostname(hostname) => {
3287 self.exec_command_stop_resolve_hostname(hostname.to_lowercase())
3288 }
3289
3290 Command::Resolve(instance, try_count) => self.exec_command_resolve(instance, try_count),
3291
3292 Command::GetMetrics(resp_s) => self.exec_command_get_metrics(resp_s),
3293
3294 Command::GetStatus(resp_s) => match resp_s.send(self.status.clone()) {
3295 Ok(()) => trace!("Sent status to the client"),
3296 Err(e) => debug!("Failed to send status: {}", e),
3297 },
3298
3299 Command::Monitor(resp_s) => {
3300 self.monitors.push(resp_s);
3301 }
3302
3303 Command::SetOption(daemon_opt) => {
3304 self.process_set_option(daemon_opt);
3305 }
3306
3307 Command::GetOption(resp_s) => {
3308 let val = DaemonOptionVal {
3309 _service_name_len_max: self.service_name_len_max,
3310 ip_check_interval: self.ip_check_interval,
3311 };
3312 if let Err(e) = resp_s.send(val) {
3313 debug!("Failed to send options: {}", e);
3314 }
3315 }
3316
3317 Command::Verify(instance_fullname, timeout) => {
3318 self.exec_command_verify(instance_fullname, timeout, repeating);
3319 }
3320
3321 Command::InvalidIntfAddrs(invalid_intf_addrs) => {
3322 for intf_addr in invalid_intf_addrs {
3323 self.del_interface_addr(&intf_addr);
3324 }
3325
3326 self.check_ip_changes();
3327 }
3328
3329 _ => {
3330 debug!("unexpected command: {:?}", &command);
3331 }
3332 }
3333 }
3334
3335 fn exec_command_get_metrics(&mut self, resp_s: Sender<HashMap<String, i64>>) {
3336 self.set_counter(Counter::CachedPTR, self.cache.ptr_count() as i64);
3337 self.set_counter(Counter::CachedSRV, self.cache.srv_count() as i64);
3338 self.set_counter(Counter::CachedAddr, self.cache.addr_count() as i64);
3339 self.set_counter(Counter::CachedTxt, self.cache.txt_count() as i64);
3340 self.set_counter(Counter::CachedNSec, self.cache.nsec_count() as i64);
3341 self.set_counter(Counter::CachedSubtype, self.cache.subtype_count() as i64);
3342 self.set_counter(Counter::Timer, self.timers.len() as i64);
3343
3344 let dns_registry_probe_count: usize = self
3345 .dns_registry_map
3346 .values()
3347 .map(|r| r.probing.len())
3348 .sum();
3349 self.set_counter(Counter::DnsRegistryProbe, dns_registry_probe_count as i64);
3350
3351 let dns_registry_active_count: usize = self
3352 .dns_registry_map
3353 .values()
3354 .map(|r| r.active.values().map(|a| a.len()).sum::<usize>())
3355 .sum();
3356 self.set_counter(Counter::DnsRegistryActive, dns_registry_active_count as i64);
3357
3358 let dns_registry_timer_count: usize = self
3359 .dns_registry_map
3360 .values()
3361 .map(|r| r.new_timers.len())
3362 .sum();
3363 self.set_counter(Counter::DnsRegistryTimer, dns_registry_timer_count as i64);
3364
3365 let dns_registry_name_change_count: usize = self
3366 .dns_registry_map
3367 .values()
3368 .map(|r| r.name_changes.len())
3369 .sum();
3370 self.set_counter(
3371 Counter::DnsRegistryNameChange,
3372 dns_registry_name_change_count as i64,
3373 );
3374
3375 if let Err(e) = resp_s.send(self.counters.clone()) {
3377 debug!("Failed to send metrics: {}", e);
3378 }
3379 }
3380
3381 fn exec_command_browse(
3382 &mut self,
3383 repeating: bool,
3384 ty: String,
3385 next_delay: u32,
3386 cache_only: bool,
3387 listener: Sender<ServiceEvent>,
3388 ) {
3389 let pretty_addrs: Vec<String> = self
3390 .my_intfs
3391 .iter()
3392 .map(|(if_index, itf)| format!("{} ({if_index})", itf.name))
3393 .collect();
3394
3395 if let Err(e) = listener.send(ServiceEvent::SearchStarted(format!(
3396 "{ty} on {} interfaces [{}]",
3397 pretty_addrs.len(),
3398 pretty_addrs.join(", ")
3399 ))) {
3400 debug!(
3401 "Failed to send SearchStarted({})(repeating:{}): {}",
3402 &ty, repeating, e
3403 );
3404 return;
3405 }
3406
3407 let now = current_time_millis();
3408 if !repeating {
3409 self.service_queriers.insert(ty.clone(), listener.clone());
3413
3414 self.query_cache_for_service(&ty, &listener, now);
3416 }
3417
3418 if cache_only {
3419 match listener.send(ServiceEvent::SearchStopped(ty.clone())) {
3421 Ok(()) => debug!("SearchStopped sent for {}", &ty),
3422 Err(e) => debug!("Failed to send SearchStopped: {}", e),
3423 }
3424 return;
3425 }
3426
3427 self.send_query(&ty, RRType::PTR);
3428 self.increase_counter(Counter::Browse, 1);
3429
3430 let next_time = now + (next_delay * 1000) as u64;
3431 let max_delay = 60 * 60;
3432 let delay = cmp::min(next_delay * 2, max_delay);
3433 self.add_retransmission(next_time, Command::Browse(ty, delay, cache_only, listener));
3434 }
3435
3436 fn exec_command_resolve_hostname(
3437 &mut self,
3438 repeating: bool,
3439 hostname: String,
3440 next_delay: u32,
3441 listener: Sender<HostnameResolutionEvent>,
3442 timeout: Option<u64>,
3443 ) {
3444 let addr_list: Vec<_> = self.my_intfs.iter().collect();
3445 if let Err(e) = listener.send(HostnameResolutionEvent::SearchStarted(format!(
3446 "{} on addrs {:?}",
3447 &hostname, &addr_list
3448 ))) {
3449 debug!(
3450 "Failed to send ResolveStarted({})(repeating:{}): {}",
3451 &hostname, repeating, e
3452 );
3453 return;
3454 }
3455 if !repeating {
3456 self.add_hostname_resolver(hostname.to_owned(), listener.clone(), timeout);
3457 self.query_cache_for_hostname(&hostname, listener.clone());
3459 }
3460
3461 self.send_query_vec(&[(&hostname, RRType::A), (&hostname, RRType::AAAA)]);
3462 self.increase_counter(Counter::ResolveHostname, 1);
3463
3464 let now = current_time_millis();
3465 let next_time = now + u64::from(next_delay) * 1000;
3466 let max_delay = 60 * 60;
3467 let delay = cmp::min(next_delay * 2, max_delay);
3468
3469 if self
3471 .hostname_resolvers
3472 .get(&hostname)
3473 .and_then(|(_sender, timeout)| *timeout)
3474 .map(|timeout| next_time < timeout)
3475 .unwrap_or(true)
3476 {
3477 self.add_retransmission(
3478 next_time,
3479 Command::ResolveHostname(hostname, delay, listener, None),
3480 );
3481 }
3482 }
3483
3484 fn exec_command_resolve(&mut self, instance: String, try_count: u16) {
3485 let pending_query = self.query_unresolved(&instance);
3486 let max_try = 3;
3487 if pending_query && try_count < max_try {
3488 let next_time = current_time_millis() + RESOLVE_WAIT_IN_MILLIS;
3491 self.add_retransmission(next_time, Command::Resolve(instance, try_count + 1));
3492 }
3493 }
3494
3495 fn exec_command_unregister(
3496 &mut self,
3497 repeating: bool,
3498 fullname: String,
3499 resp_s: Sender<UnregisterStatus>,
3500 ) {
3501 let response = match self.my_services.remove_entry(&fullname) {
3502 None => {
3503 debug!("unregister: cannot find such service {}", &fullname);
3504 UnregisterStatus::NotFound
3505 }
3506 Some((_k, info)) => {
3507 let mut timers = Vec::new();
3508
3509 for (if_index, intf) in self.my_intfs.iter() {
3510 if let Some(sock) = self.ipv4_sock.as_ref() {
3511 let packet = self.unregister_service(&info, intf, &sock.pktinfo);
3512 if !repeating && !packet.is_empty() {
3514 let next_time = current_time_millis() + 120;
3515 self.retransmissions.push(ReRun {
3516 next_time,
3517 command: Command::UnregisterResend(packet, *if_index, true),
3518 });
3519 timers.push(next_time);
3520 }
3521 }
3522
3523 if let Some(sock) = self.ipv6_sock.as_ref() {
3525 let packet = self.unregister_service(&info, intf, &sock.pktinfo);
3526 if !repeating && !packet.is_empty() {
3527 let next_time = current_time_millis() + 120;
3528 self.retransmissions.push(ReRun {
3529 next_time,
3530 command: Command::UnregisterResend(packet, *if_index, false),
3531 });
3532 timers.push(next_time);
3533 }
3534 }
3535 }
3536
3537 for t in timers {
3538 self.add_timer(t);
3539 }
3540
3541 self.increase_counter(Counter::Unregister, 1);
3542 UnregisterStatus::OK
3543 }
3544 };
3545 if let Err(e) = resp_s.send(response) {
3546 debug!("unregister: failed to send response: {}", e);
3547 }
3548 }
3549
3550 fn exec_command_unregister_resend(&mut self, packet: Vec<u8>, if_index: u32, is_ipv4: bool) {
3551 let Some(intf) = self.my_intfs.get(&if_index) else {
3552 return;
3553 };
3554 let sock_opt = if is_ipv4 {
3555 &self.ipv4_sock
3556 } else {
3557 &self.ipv6_sock
3558 };
3559 let Some(sock) = sock_opt else {
3560 return;
3561 };
3562
3563 let if_addr = if is_ipv4 {
3564 match intf.next_ifaddr_v4() {
3565 Some(addr) => addr,
3566 None => return,
3567 }
3568 } else {
3569 match intf.next_ifaddr_v6() {
3570 Some(addr) => addr,
3571 None => return,
3572 }
3573 };
3574
3575 debug!("UnregisterResend from {:?}", if_addr);
3576 multicast_on_intf(
3577 &packet[..],
3578 &intf.name,
3579 intf.index,
3580 if_addr,
3581 &sock.pktinfo,
3582 self.port,
3583 );
3584
3585 self.increase_counter(Counter::UnregisterResend, 1);
3586 }
3587
3588 fn exec_command_stop_browse(&mut self, ty_domain: String) {
3589 match self.service_queriers.remove_entry(&ty_domain) {
3590 None => debug!("StopBrowse: cannot find querier for {}", &ty_domain),
3591 Some((ty, sender)) => {
3592 trace!("StopBrowse: removed queryer for {}", &ty);
3594 let mut i = 0;
3595 while i < self.retransmissions.len() {
3596 if let Command::Browse(t, _, _, _) = &self.retransmissions[i].command {
3597 if t == &ty {
3598 self.retransmissions.remove(i);
3599 trace!("StopBrowse: removed retransmission for {}", &ty);
3600 continue;
3601 }
3602 }
3603 i += 1;
3604 }
3605
3606 self.cache.remove_service_type(&ty_domain);
3608
3609 match sender.send(ServiceEvent::SearchStopped(ty_domain)) {
3611 Ok(()) => trace!("Sent SearchStopped to the listener"),
3612 Err(e) => debug!("Failed to send SearchStopped: {}", e),
3613 }
3614 }
3615 }
3616 }
3617
3618 fn exec_command_stop_resolve_hostname(&mut self, hostname: String) {
3619 if let Some((host, (sender, _timeout))) = self.hostname_resolvers.remove_entry(&hostname) {
3620 trace!("StopResolve: removed queryer for {}", &host);
3622 let mut i = 0;
3623 while i < self.retransmissions.len() {
3624 if let Command::Resolve(t, _) = &self.retransmissions[i].command {
3625 if t == &host {
3626 self.retransmissions.remove(i);
3627 trace!("StopResolve: removed retransmission for {}", &host);
3628 continue;
3629 }
3630 }
3631 i += 1;
3632 }
3633
3634 match sender.send(HostnameResolutionEvent::SearchStopped(hostname)) {
3636 Ok(()) => trace!("Sent SearchStopped to the listener"),
3637 Err(e) => debug!("Failed to send SearchStopped: {}", e),
3638 }
3639 }
3640 }
3641
3642 fn exec_command_register_resend(&mut self, fullname: String, if_index: u32) -> MyResult<()> {
3643 let Some(info) = self.my_services.get_mut(&fullname) else {
3644 trace!("announce: cannot find such service {}", &fullname);
3645 return Ok(());
3646 };
3647
3648 let Some(dns_registry) = self.dns_registry_map.get_mut(&if_index) else {
3649 return Ok(());
3650 };
3651
3652 let Some(intf) = self.my_intfs.get(&if_index) else {
3653 return Ok(());
3654 };
3655
3656 let announced_v4 = if let Some(sock) = self.ipv4_sock.as_ref() {
3657 announce_service_on_intf(dns_registry, info, intf, &sock.pktinfo, self.port)?
3658 } else {
3659 false
3660 };
3661 let announced_v6 = if let Some(sock) = self.ipv6_sock.as_ref() {
3662 announce_service_on_intf(dns_registry, info, intf, &sock.pktinfo, self.port)?
3663 } else {
3664 false
3665 };
3666
3667 if announced_v4 || announced_v6 {
3668 let hostname = dns_registry.resolve_name(info.get_hostname());
3669 let service_name = dns_registry.resolve_name(&fullname).to_string();
3670
3671 debug!("resend: announce service {service_name} on {}", intf.name);
3672
3673 notify_monitors(
3674 &mut self.monitors,
3675 DaemonEvent::Announce(service_name, format!("{}:{}", hostname, &intf.name)),
3676 );
3677 info.set_status(if_index, ServiceStatus::Announced);
3678 } else {
3679 debug!("register-resend should not fail");
3680 }
3681
3682 self.increase_counter(Counter::RegisterResend, 1);
3683 Ok(())
3684 }
3685
3686 fn exec_command_verify(&mut self, instance: String, timeout: Duration, repeating: bool) {
3687 let now = current_time_millis();
3697 let expire_at = if repeating {
3698 None
3699 } else {
3700 Some(now + timeout.as_millis() as u64)
3701 };
3702
3703 let record_vec = self.cache.service_verify_queries(&instance, expire_at);
3705
3706 if !record_vec.is_empty() {
3707 let query_vec: Vec<(&str, RRType)> = record_vec
3708 .iter()
3709 .map(|(record, rr_type)| (record.as_str(), *rr_type))
3710 .collect();
3711 self.send_query_vec(&query_vec);
3712
3713 if let Some(new_expire) = expire_at {
3714 self.add_timer(new_expire); self.add_retransmission(now + 1000, Command::Verify(instance, timeout));
3718 }
3719 }
3720 }
3721
3722 fn refresh_active_services(&mut self) {
3724 let mut query_ptr_count = 0;
3725 let mut query_srv_count = 0;
3726 let mut new_timers = HashSet::new();
3727 let mut query_addr_count = 0;
3728
3729 for (ty_domain, _sender) in self.service_queriers.iter() {
3730 let refreshed_timers = self.cache.refresh_due_ptr(ty_domain);
3731 if !refreshed_timers.is_empty() {
3732 trace!("sending refresh query for PTR: {}", ty_domain);
3733 self.send_query(ty_domain, RRType::PTR);
3734 query_ptr_count += 1;
3735 new_timers.extend(refreshed_timers);
3736 }
3737
3738 let (instances, timers) = self.cache.refresh_due_srv_txt(ty_domain);
3739 for (instance, types) in instances {
3740 trace!("sending refresh query for: {}", &instance);
3741 let query_vec = types
3742 .into_iter()
3743 .map(|ty| (instance.as_str(), ty))
3744 .collect::<Vec<_>>();
3745 self.send_query_vec(&query_vec);
3746 query_srv_count += 1;
3747 }
3748 new_timers.extend(timers);
3749 let (hostnames, timers) = self.cache.refresh_due_hosts(ty_domain);
3750 for hostname in hostnames.iter() {
3751 trace!("sending refresh queries for A and AAAA: {}", hostname);
3752 self.send_query_vec(&[(hostname, RRType::A), (hostname, RRType::AAAA)]);
3753 query_addr_count += 2;
3754 }
3755 new_timers.extend(timers);
3756 }
3757
3758 for timer in new_timers {
3759 self.add_timer(timer);
3760 }
3761
3762 self.increase_counter(Counter::CacheRefreshPTR, query_ptr_count);
3763 self.increase_counter(Counter::CacheRefreshSrvTxt, query_srv_count);
3764 self.increase_counter(Counter::CacheRefreshAddr, query_addr_count);
3765 }
3766}
3767
3768fn add_answer_of_service(
3770 out: &mut DnsOutgoing,
3771 msg: &DnsIncoming,
3772 entry_name: &str,
3773 service: &ServiceInfo,
3774 qtype: RRType,
3775 intf_addrs: Vec<IpAddr>,
3776) {
3777 if qtype == RRType::SRV || qtype == RRType::ANY {
3778 out.add_answer(
3779 msg,
3780 DnsSrv::new(
3781 entry_name,
3782 CLASS_IN | CLASS_CACHE_FLUSH,
3783 service.get_host_ttl(),
3784 service.get_priority(),
3785 service.get_weight(),
3786 service.get_port(),
3787 service.get_hostname().to_string(),
3788 ),
3789 );
3790 }
3791
3792 if qtype == RRType::TXT || qtype == RRType::ANY {
3793 out.add_answer(
3794 msg,
3795 DnsTxt::new(
3796 entry_name,
3797 CLASS_IN | CLASS_CACHE_FLUSH,
3798 service.get_other_ttl(),
3799 service.generate_txt(),
3800 ),
3801 );
3802 }
3803
3804 if qtype == RRType::SRV {
3805 for address in intf_addrs {
3806 out.add_additional_answer(DnsAddress::new(
3807 service.get_hostname(),
3808 ip_address_rr_type(&address),
3809 CLASS_IN | CLASS_CACHE_FLUSH,
3810 service.get_host_ttl(),
3811 address,
3812 InterfaceId::default(),
3813 ));
3814 }
3815 }
3816}
3817
/// Events sent to the listener of a service browse.
#[derive(Clone, Debug)]
#[non_exhaustive]
pub enum ServiceEvent {
    /// A browse round was started. The string names the service type and the
    /// interfaces being searched.
    SearchStarted(String),

    /// An instance of a browsed service type was found:
    /// (service type domain, instance fullname).
    ServiceFound(String, String),

    /// An instance was resolved from the cache into a valid service.
    ServiceResolved(Box<ResolvedService>),

    /// An instance is no longer available:
    /// (service type domain, instance fullname).
    ServiceRemoved(String, String),

    /// The browse for the given service type domain has stopped.
    SearchStopped(String),
}
3838
/// Events sent to the listener of a hostname resolution.
#[derive(Clone, Debug)]
#[non_exhaustive]
pub enum HostnameResolutionEvent {
    /// A resolution round was started. The string names the hostname and the
    /// interfaces being searched.
    SearchStarted(String),
    /// Addresses were found for a hostname: (hostname, addresses).
    AddressesFound(String, HashSet<ScopedIp>),
    /// Presumably the counterpart of `AddressesFound` for addresses that are
    /// no longer valid: (hostname, addresses) — TODO confirm against the sender.
    AddressesRemoved(String, HashSet<ScopedIp>),
    /// Presumably sent when the resolver's timeout is reached, carrying the
    /// hostname — TODO confirm against the sender.
    SearchTimeout(String),
    /// The resolution of the given hostname was stopped.
    SearchStopped(String),
}
3855
/// Events sent to the monitors of the daemon.
#[derive(Clone, Debug)]
#[non_exhaustive]
pub enum DaemonEvent {
    /// A service was announced: (service name, "hostname:interface name").
    Announce(String, String),

    /// NOTE(review): presumably an error encountered by the daemon — confirm
    /// against the sender.
    Error(Error),

    /// NOTE(review): presumably an IP address the daemon started using —
    /// confirm against the sender.
    IpAdd(IpAddr),

    /// NOTE(review): presumably an IP address the daemon stopped using —
    /// confirm against the sender.
    IpDel(IpAddr),

    /// NOTE(review): presumably a name change made by the daemon, e.g. after
    /// a conflict — confirm against the sender.
    NameChange(DnsNameChange),

    /// The daemon sent a response on the interface with the given name.
    Respond(String),
}
3880
/// Describes a DNS name change carried by `DaemonEvent::NameChange`.
///
/// NOTE(review): the field descriptions below are inferred from the field
/// names; confirm them where this struct is constructed.
#[derive(Clone, Debug)]
pub struct DnsNameChange {
    /// Presumably the name before the change.
    pub original: String,

    /// Presumably the name after the change.
    pub new_name: String,

    /// Presumably the type of the record whose name changed.
    pub rr_type: RRType,

    /// Presumably the name of the interface on which the change happened.
    pub intf_name: String,
}
3905
/// Commands executed by the daemon. Every variant except `Exit` is
/// dispatched in `exec_command`.
#[derive(Debug)]
enum Command {
    /// Browse for a service type:
    /// (service type domain, next delay in seconds, cache only, listener).
    Browse(String, u32, bool, Sender<ServiceEvent>),

    /// Resolve a hostname to addresses:
    /// (hostname, next delay in seconds, listener, optional timeout).
    ResolveHostname(String, u32, Sender<HostnameResolutionEvent>, Option<u64>),

    /// Register a service.
    Register(Box<ServiceInfo>),

    /// Unregister a service: (service fullname, response channel).
    Unregister(String, Sender<UnregisterStatus>),

    /// Announce a registered service again: (service fullname, interface index).
    RegisterResend(String, u32),

    /// Send an unregister packet again: (packet, interface index, is IPv4).
    UnregisterResend(Vec<u8>, u32, bool),

    /// Stop browsing for a service type domain.
    StopBrowse(String),

    /// Stop resolving a hostname.
    StopResolveHostname(String),

    /// Resolve a service instance: (instance fullname, try count).
    Resolve(String, u16),

    /// Send the current metrics to the given channel.
    GetMetrics(Sender<Metrics>),

    /// Send the current daemon status to the given channel.
    GetStatus(Sender<DaemonStatus>),

    /// Add a monitor that receives daemon events.
    Monitor(Sender<DaemonEvent>),

    /// Apply a daemon option.
    SetOption(DaemonOption),

    /// Send the current option values to the given channel.
    GetOption(Sender<DaemonOptionVal>),

    /// Verify the cached records of a service instance:
    /// (instance fullname, timeout).
    Verify(String, Duration),

    /// Remove the given interface addresses, then re-check the IP addresses.
    InvalidIntfAddrs(HashSet<Interface>),

    /// Not dispatched by `exec_command` (it falls into the catch-all there);
    /// presumably asks the daemon to exit and report its status — TODO confirm.
    Exit(Sender<DaemonStatus>),
}
3961
3962impl fmt::Display for Command {
3963 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
3964 match self {
3965 Self::Browse(_, _, _, _) => write!(f, "Command Browse"),
3966 Self::ResolveHostname(_, _, _, _) => write!(f, "Command ResolveHostname"),
3967 Self::Exit(_) => write!(f, "Command Exit"),
3968 Self::GetStatus(_) => write!(f, "Command GetStatus"),
3969 Self::GetMetrics(_) => write!(f, "Command GetMetrics"),
3970 Self::Monitor(_) => write!(f, "Command Monitor"),
3971 Self::Register(_) => write!(f, "Command Register"),
3972 Self::RegisterResend(_, _) => write!(f, "Command RegisterResend"),
3973 Self::SetOption(_) => write!(f, "Command SetOption"),
3974 Self::GetOption(_) => write!(f, "Command GetOption"),
3975 Self::StopBrowse(_) => write!(f, "Command StopBrowse"),
3976 Self::StopResolveHostname(_) => write!(f, "Command StopResolveHostname"),
3977 Self::Unregister(_, _) => write!(f, "Command Unregister"),
3978 Self::UnregisterResend(_, _, _) => write!(f, "Command UnregisterResend"),
3979 Self::Resolve(_, _) => write!(f, "Command Resolve"),
3980 Self::Verify(_, _) => write!(f, "Command VerifyResource"),
3981 Self::InvalidIntfAddrs(_) => write!(f, "Command InvalidIntfAddrs"),
3982 }
3983 }
3984}
3985
/// Snapshot of daemon option values, sent in reply to `Command::GetOption`.
struct DaemonOptionVal {
    /// The daemon's `service_name_len_max` at the time of the request.
    _service_name_len_max: u8,
    /// The daemon's `ip_check_interval` at the time of the request.
    ip_check_interval: u64,
}
3990
/// A daemon setting to change, carried by `Command::SetOption`.
///
/// NOTE(review): the effect of each option is implemented elsewhere; the notes below
/// only restate what the names and payload types show.
#[derive(Debug)]
enum DaemonOption {
    /// New maximum service name length.
    ServiceNameLenMax(u8),
    /// New IP check interval (unit not visible here — confirm).
    IpCheckInterval(u64),
    /// Interfaces to enable, described by `IfKind`.
    EnableInterface(Vec<IfKind>),
    /// Interfaces to disable, described by `IfKind`.
    DisableInterface(Vec<IfKind>),
    /// Toggle for IPv4 multicast loop.
    MulticastLoopV4(bool),
    /// Toggle for IPv6 multicast loop.
    MulticastLoopV6(bool),
    /// Toggle for accepting unsolicited responses (presumably — confirm).
    AcceptUnsolicited(bool),
    /// Toggle for including Apple P2P interfaces.
    IncludeAppleP2P(bool),
    /// Test-only: takes the named interface down.
    #[cfg(test)]
    TestDownInterface(String),
    /// Test-only: brings the named interface up.
    #[cfg(test)]
    TestUpInterface(String),
}
4006
/// Byte length of the `"._tcp.local."` suffix; `"._udp.local."` has the same length.
const DOMAIN_LEN: usize = "._tcp.local.".len();
4009
4010fn check_service_name_length(ty_domain: &str, limit: u8) -> Result<()> {
4012 if ty_domain.len() <= DOMAIN_LEN + 1 {
4013 return Err(e_fmt!("Service type name cannot be empty: {}", ty_domain));
4015 }
4016
4017 let service_name_len = ty_domain.len() - DOMAIN_LEN - 1; if service_name_len > limit as usize {
4019 return Err(e_fmt!("Service name length must be <= {} bytes", limit));
4020 }
4021 Ok(())
4022}
4023
4024fn check_domain_suffix(name: &str) -> Result<()> {
4026 if !(name.ends_with("._tcp.local.") || name.ends_with("._udp.local.")) {
4027 return Err(e_fmt!(
4028 "mDNS service {} must end with '._tcp.local.' or '._udp.local.'",
4029 name
4030 ));
4031 }
4032
4033 Ok(())
4034}
4035
4036fn check_service_name(fullname: &str) -> Result<()> {
4044 check_domain_suffix(fullname)?;
4045
4046 let remaining: Vec<&str> = fullname[..fullname.len() - DOMAIN_LEN].split('.').collect();
4047 let name = remaining.last().ok_or_else(|| e_fmt!("No service name"))?;
4048
4049 if &name[0..1] != "_" {
4050 return Err(e_fmt!("Service name must start with '_'"));
4051 }
4052
4053 let name = &name[1..];
4054
4055 if name.contains("--") {
4056 return Err(e_fmt!("Service name must not contain '--'"));
4057 }
4058
4059 if name.starts_with('-') || name.ends_with('-') {
4060 return Err(e_fmt!("Service name (%s) may not start or end with '-'"));
4061 }
4062
4063 let ascii_count = name.chars().filter(|c| c.is_ascii_alphabetic()).count();
4064 if ascii_count < 1 {
4065 return Err(e_fmt!(
4066 "Service name must contain at least one letter (eg: 'A-Za-z')"
4067 ));
4068 }
4069
4070 Ok(())
4071}
4072
4073fn check_hostname(hostname: &str) -> Result<()> {
4075 if !hostname.ends_with(".local.") {
4076 return Err(e_fmt!("Hostname must end with '.local.': {hostname}"));
4077 }
4078
4079 if hostname == ".local." {
4080 return Err(e_fmt!(
4081 "The part of the hostname before '.local.' cannot be empty"
4082 ));
4083 }
4084
4085 if hostname.len() > 255 {
4086 return Err(e_fmt!("Hostname length must be <= 255 bytes"));
4087 }
4088
4089 Ok(())
4090}
4091
4092fn call_service_listener(
4093 listeners_map: &HashMap<String, Sender<ServiceEvent>>,
4094 ty_domain: &str,
4095 event: ServiceEvent,
4096) {
4097 if let Some(listener) = listeners_map.get(ty_domain) {
4098 match listener.send(event) {
4099 Ok(()) => trace!("Sent event to listener successfully"),
4100 Err(e) => debug!("Failed to send event: {}", e),
4101 }
4102 }
4103}
4104
4105fn call_hostname_resolution_listener(
4106 listeners_map: &HashMap<String, (Sender<HostnameResolutionEvent>, Option<u64>)>,
4107 hostname: &str,
4108 event: HostnameResolutionEvent,
4109) {
4110 let hostname_lower = hostname.to_lowercase();
4111 if let Some(listener) = listeners_map.get(&hostname_lower).map(|(l, _)| l) {
4112 match listener.send(event) {
4113 Ok(()) => trace!("Sent event to listener successfully"),
4114 Err(e) => debug!("Failed to send event: {}", e),
4115 }
4116 }
4117}
4118
4119fn my_ip_interfaces(with_loopback: bool) -> Vec<Interface> {
4123 my_ip_interfaces_inner(with_loopback, false)
4124}
4125
4126fn my_ip_interfaces_inner(with_loopback: bool, with_apple_p2p: bool) -> Vec<Interface> {
4127 if_addrs::get_if_addrs()
4128 .unwrap_or_default()
4129 .into_iter()
4130 .filter(|i| {
4131 i.is_oper_up()
4132 && !i.is_p2p()
4133 && (!i.is_loopback() || with_loopback)
4134 && (with_apple_p2p || !is_apple_p2p_by_name(&i.name))
4135 })
4136 .collect()
4137}
4138
/// Returns true when the interface name starts with `"awdl"` or `"llw"`.
fn is_apple_p2p_by_name(name: &str) -> bool {
    name.starts_with("awdl") || name.starts_with("llw")
}
4145
4146fn send_dns_outgoing(
4149 out: &DnsOutgoing,
4150 my_intf: &MyIntf,
4151 sock: &PktInfoUdpSocket,
4152 port: u16,
4153 source: Option<&IfAddr>,
4154) -> MyResult<Vec<Vec<u8>>> {
4155 let if_name = &my_intf.name;
4156
4157 let if_addr = match source {
4158 Some(addr) => addr,
4159 None => {
4160 if sock.domain() == Domain::IPV4 {
4161 match my_intf.next_ifaddr_v4() {
4162 Some(addr) => addr,
4163 None => return Ok(vec![]),
4164 }
4165 } else {
4166 match my_intf.next_ifaddr_v6() {
4167 Some(addr) => addr,
4168 None => return Ok(vec![]),
4169 }
4170 }
4171 }
4172 };
4173
4174 send_dns_outgoing_impl(out, if_name, my_intf.index, if_addr, sock, port)
4175}
4176
4177fn send_dns_outgoing_impl(
4179 out: &DnsOutgoing,
4180 if_name: &str,
4181 if_index: u32,
4182 if_addr: &IfAddr,
4183 sock: &PktInfoUdpSocket,
4184 port: u16,
4185) -> MyResult<Vec<Vec<u8>>> {
4186 let qtype = if out.is_query() {
4187 "query"
4188 } else {
4189 if out.answers_count() == 0 && out.additionals().is_empty() {
4190 return Ok(vec![]); }
4192 "response"
4193 };
4194 trace!(
4195 "send {}: {} questions {} answers {} authorities {} additional",
4196 qtype,
4197 out.questions().len(),
4198 out.answers_count(),
4199 out.authorities().len(),
4200 out.additionals().len()
4201 );
4202
4203 match if_addr.ip() {
4204 IpAddr::V4(ipv4) => {
4205 if let Err(e) = sock.set_multicast_if_v4(&ipv4) {
4206 debug!(
4207 "send_dns_outgoing: failed to set multicast interface for IPv4 {}: {}",
4208 ipv4, e
4209 );
4210 if e.kind() == std::io::ErrorKind::AddrNotAvailable {
4212 let intf_addr = Interface {
4213 name: if_name.to_string(),
4214 addr: if_addr.clone(),
4215 index: Some(if_index),
4216 oper_status: if_addrs::IfOperStatus::Down,
4217 is_p2p: false,
4218 #[cfg(windows)]
4219 adapter_name: String::new(),
4220 };
4221 return Err(InternalError::IntfAddrInvalid(intf_addr));
4222 }
4223 return Ok(vec![]); }
4225 }
4226 IpAddr::V6(ipv6) => {
4227 if let Err(e) = sock.set_multicast_if_v6(if_index) {
4228 debug!(
4229 "send_dns_outgoing: failed to set multicast interface for IPv6 {}: {}",
4230 ipv6, e
4231 );
4232 if e.kind() == std::io::ErrorKind::AddrNotAvailable {
4234 let intf_addr = Interface {
4235 name: if_name.to_string(),
4236 addr: if_addr.clone(),
4237 index: Some(if_index),
4238 oper_status: if_addrs::IfOperStatus::Down,
4239 is_p2p: false,
4240 #[cfg(windows)]
4241 adapter_name: String::new(),
4242 };
4243 return Err(InternalError::IntfAddrInvalid(intf_addr));
4244 }
4245 return Ok(vec![]); }
4247 }
4248 }
4249
4250 let packet_list = out.to_data_on_wire();
4251 for packet in packet_list.iter() {
4252 multicast_on_intf(packet, if_name, if_index, if_addr, sock, port);
4253 }
4254 Ok(packet_list)
4255}
4256
4257fn multicast_on_intf(
4259 packet: &[u8],
4260 if_name: &str,
4261 if_index: u32,
4262 if_addr: &IfAddr,
4263 socket: &PktInfoUdpSocket,
4264 port: u16,
4265) {
4266 if packet.len() > MAX_MSG_ABSOLUTE {
4267 debug!("Drop over-sized packet ({})", packet.len());
4268 return;
4269 }
4270
4271 let addr: SocketAddr = match if_addr {
4272 if_addrs::IfAddr::V4(_) => SocketAddrV4::new(GROUP_ADDR_V4, port).into(),
4273 if_addrs::IfAddr::V6(_) => {
4274 let mut sock = SocketAddrV6::new(GROUP_ADDR_V6, port, 0, 0);
4275 sock.set_scope_id(if_index); sock.into()
4277 }
4278 };
4279
4280 let sock_addr = addr.into();
4282 match socket.send_to(packet, &sock_addr) {
4283 Ok(sz) => trace!(
4284 "sent out {} bytes on interface {} (idx {}) addr {}",
4285 sz,
4286 if_name,
4287 if_index,
4288 if_addr.ip()
4289 ),
4290 Err(e) => trace!("Failed to send to {} via {:?}: {}", addr, &if_name, e),
4291 }
4292}
4293
/// Returns true when `name` has at least five dot-separated parts, i.e. contains at
/// least four `'.'` characters.
fn valid_instance_name(name: &str) -> bool {
    const MIN_DOTS: usize = 4;
    name.matches('.').count() >= MIN_DOTS
}
4300
4301fn notify_monitors(monitors: &mut Vec<Sender<DaemonEvent>>, event: DaemonEvent) {
4302 monitors.retain(|sender| {
4303 if let Err(e) = sender.try_send(event.clone()) {
4304 debug!("notify_monitors: try_send: {}", &e);
4305 if matches!(e, TrySendError::Disconnected(_)) {
4306 return false; }
4308 }
4309 true
4310 });
4311}
4312
4313fn prepare_announce(
4316 info: &ServiceInfo,
4317 intf: &MyIntf,
4318 dns_registry: &mut DnsRegistry,
4319 is_ipv4: bool,
4320) -> Option<DnsOutgoing> {
4321 let intf_addrs = if is_ipv4 {
4322 info.get_addrs_on_my_intf_v4(intf)
4323 } else {
4324 info.get_addrs_on_my_intf_v6(intf)
4325 };
4326
4327 if intf_addrs.is_empty() {
4328 debug!(
4329 "prepare_announce (ipv4: {is_ipv4}): no valid addrs on interface {}",
4330 &intf.name
4331 );
4332 return None;
4333 }
4334
4335 let service_fullname = dns_registry.resolve_name(info.get_fullname());
4337
4338 debug!(
4339 "prepare to announce service {service_fullname} on {:?}",
4340 &intf_addrs
4341 );
4342
4343 let mut probing_count = 0;
4344 let mut out = DnsOutgoing::new(FLAGS_QR_RESPONSE | FLAGS_AA);
4345 let create_time = current_time_millis() + fastrand::u64(0..250);
4346
4347 out.add_answer_at_time(
4348 DnsPointer::new(
4349 info.get_type(),
4350 RRType::PTR,
4351 CLASS_IN,
4352 info.get_other_ttl(),
4353 service_fullname.to_string(),
4354 ),
4355 0,
4356 );
4357
4358 if let Some(sub) = info.get_subtype() {
4359 trace!("Adding subdomain {}", sub);
4360 out.add_answer_at_time(
4361 DnsPointer::new(
4362 sub,
4363 RRType::PTR,
4364 CLASS_IN,
4365 info.get_other_ttl(),
4366 service_fullname.to_string(),
4367 ),
4368 0,
4369 );
4370 }
4371
4372 let hostname = dns_registry.resolve_name(info.get_hostname()).to_string();
4374
4375 let mut srv = DnsSrv::new(
4376 info.get_fullname(),
4377 CLASS_IN | CLASS_CACHE_FLUSH,
4378 info.get_host_ttl(),
4379 info.get_priority(),
4380 info.get_weight(),
4381 info.get_port(),
4382 hostname,
4383 );
4384
4385 if let Some(new_name) = dns_registry.name_changes.get(info.get_fullname()) {
4386 srv.get_record_mut().set_new_name(new_name.to_string());
4387 }
4388
4389 if !info.requires_probe()
4390 || dns_registry.is_probing_done(&srv, info.get_fullname(), create_time)
4391 {
4392 out.add_answer_at_time(srv, 0);
4393 } else {
4394 probing_count += 1;
4395 }
4396
4397 let mut txt = DnsTxt::new(
4400 info.get_fullname(),
4401 CLASS_IN | CLASS_CACHE_FLUSH,
4402 info.get_other_ttl(),
4403 info.generate_txt(),
4404 );
4405
4406 if let Some(new_name) = dns_registry.name_changes.get(info.get_fullname()) {
4407 txt.get_record_mut().set_new_name(new_name.to_string());
4408 }
4409
4410 if !info.requires_probe()
4411 || dns_registry.is_probing_done(&txt, info.get_fullname(), create_time)
4412 {
4413 out.add_answer_at_time(txt, 0);
4414 } else {
4415 probing_count += 1;
4416 }
4417
4418 let hostname = info.get_hostname();
4421 for address in intf_addrs {
4422 let mut dns_addr = DnsAddress::new(
4423 hostname,
4424 ip_address_rr_type(&address),
4425 CLASS_IN | CLASS_CACHE_FLUSH,
4426 info.get_host_ttl(),
4427 address,
4428 intf.into(),
4429 );
4430
4431 if let Some(new_name) = dns_registry.name_changes.get(hostname) {
4432 dns_addr.get_record_mut().set_new_name(new_name.to_string());
4433 }
4434
4435 if !info.requires_probe()
4436 || dns_registry.is_probing_done(&dns_addr, info.get_fullname(), create_time)
4437 {
4438 out.add_answer_at_time(dns_addr, 0);
4439 } else {
4440 probing_count += 1;
4441 }
4442 }
4443
4444 if probing_count > 0 {
4445 return None;
4446 }
4447
4448 Some(out)
4449}
4450
4451fn announce_service_on_intf(
4454 dns_registry: &mut DnsRegistry,
4455 info: &ServiceInfo,
4456 intf: &MyIntf,
4457 sock: &PktInfoUdpSocket,
4458 port: u16,
4459) -> MyResult<bool> {
4460 let is_ipv4 = sock.domain() == Domain::IPV4;
4461 if let Some(out) = prepare_announce(info, intf, dns_registry, is_ipv4) {
4462 let _ = send_dns_outgoing(&out, intf, sock, port, None)?;
4463 return Ok(true);
4464 }
4465
4466 Ok(false)
4467}
4468
/// Produces the next candidate name by bumping a `" (N)"` suffix on the first label.
///
/// Only the part of `original` before the first `'.'` is changed:
/// - `"foo.local."` becomes `"foo (2).local."`
/// - `"foo (2).local."` becomes `"foo (3).local."`
///
/// When the first label does not end with `" (N)"` where `N` parses as a `u32`, or when
/// `N + 1` would not fit in a `u32`, `" (2)"` is appended instead.
fn name_change(original: &str) -> String {
    let (first_part, rest) = match original.split_once('.') {
        Some((first, rest)) => (first, Some(rest)),
        None => (original, None),
    };

    let bumped = first_part
        .strip_suffix(')')
        .and_then(|head| head.rsplit_once(" ("))
        .and_then(|(base_name, digits)| {
            // `checked_add` avoids the overflow panic/wrap for `u32::MAX`.
            let next = digits.parse::<u32>().ok()?.checked_add(1)?;
            Some(format!("{} ({})", base_name, next))
        })
        .unwrap_or_else(|| format!("{first_part} (2)"));

    match rest {
        Some(rest) => format!("{bumped}.{rest}"),
        None => bumped,
    }
}
4504
/// Produces the next candidate hostname by bumping a `-N` suffix on the first label.
///
/// Only the part of `original` before the first `'.'` is changed:
/// - `"foo.local."` becomes `"foo-2.local."`
/// - `"foo-2.local."` becomes `"foo-3.local."`
///
/// When the text after the last `'-'` of the first label does not parse as a `u32`, or
/// when adding one would not fit in a `u32`, `"-2"` is appended instead.
fn hostname_change(original: &str) -> String {
    let (first_part, rest) = match original.split_once('.') {
        Some((first, rest)) => (first, Some(rest)),
        None => (original, None),
    };

    let bumped = first_part
        .rsplit_once('-')
        .and_then(|(base_name, digits)| {
            // `checked_add` avoids the overflow panic/wrap for `u32::MAX`.
            let next = digits.parse::<u32>().ok()?.checked_add(1)?;
            Some(format!("{}-{}", base_name, next))
        })
        .unwrap_or_else(|| format!("{first_part}-2"));

    match rest {
        Some(rest) => format!("{bumped}.{rest}"),
        None => bumped,
    }
}
4532
4533fn check_probing(
4536 dns_registry: &mut DnsRegistry,
4537 timers: &mut BinaryHeap<Reverse<u64>>,
4538 now: u64,
4539) -> (DnsOutgoing, Vec<String>) {
4540 let mut expired_probes = Vec::new();
4541 let mut out = DnsOutgoing::new(FLAGS_QR_QUERY);
4542
4543 for (name, probe) in dns_registry.probing.iter_mut() {
4544 if now >= probe.next_send {
4545 if probe.expired(now) {
4546 expired_probes.push(name.clone());
4548 } else {
4549 out.add_question(name, RRType::ANY);
4550
4551 for record in probe.records.iter() {
4559 out.add_authority(record.clone());
4560 }
4561
4562 probe.update_next_send(now);
4563
4564 timers.push(Reverse(probe.next_send));
4566 }
4567 }
4568 }
4569
4570 (out, expired_probes)
4571}
4572
4573fn handle_expired_probes(
4578 expired_probes: Vec<String>,
4579 intf_name: &str,
4580 dns_registry: &mut DnsRegistry,
4581 monitors: &mut Vec<Sender<DaemonEvent>>,
4582) -> HashSet<String> {
4583 let mut waiting_services = HashSet::new();
4584
4585 for name in expired_probes {
4586 let Some(probe) = dns_registry.probing.remove(&name) else {
4587 continue;
4588 };
4589
4590 for record in probe.records.iter() {
4592 if let Some(new_name) = record.get_record().get_new_name() {
4593 dns_registry
4594 .name_changes
4595 .insert(name.clone(), new_name.to_string());
4596
4597 let event = DnsNameChange {
4598 original: record.get_record().get_original_name().to_string(),
4599 new_name: new_name.to_string(),
4600 rr_type: record.get_type(),
4601 intf_name: intf_name.to_string(),
4602 };
4603 debug!("Name change event: {:?}", &event);
4604 notify_monitors(monitors, DaemonEvent::NameChange(event));
4605 }
4606 }
4607
4608 debug!(
4610 "probe of '{name}' finished: move {} records to active. ({} waiting services)",
4611 probe.records.len(),
4612 probe.waiting_services.len(),
4613 );
4614
4615 if !probe.records.is_empty() {
4617 match dns_registry.active.get_mut(&name) {
4618 Some(records) => {
4619 records.extend(probe.records);
4620 }
4621 None => {
4622 dns_registry.active.insert(name, probe.records);
4623 }
4624 }
4625
4626 waiting_services.extend(probe.waiting_services);
4627 }
4628 }
4629
4630 waiting_services
4631}
4632
4633fn resolve_addr_to_index(if_kind: IfKind, interfaces: &[Interface]) -> IfKind {
4635 if let IfKind::Addr(addr) = &if_kind {
4636 if let Some(intf) = interfaces.iter().find(|intf| &intf.ip() == addr) {
4637 let if_index = intf.index.unwrap_or(0);
4638 return if addr.is_ipv4() {
4639 IfKind::IndexV4(if_index)
4640 } else {
4641 IfKind::IndexV6(if_index)
4642 };
4643 }
4644 }
4645 if_kind
4646}
4647
4648#[cfg(test)]
4649mod tests {
4650 use super::{
4651 _new_socket_bind, check_domain_suffix, check_service_name_length, hostname_change,
4652 my_ip_interfaces, name_change, send_dns_outgoing_impl, valid_instance_name,
4653 valid_ip_on_intf, HostnameResolutionEvent, MyIntf, ServiceDaemon, ServiceEvent,
4654 ServiceInfo, MDNS_PORT,
4655 };
4656 use crate::{
4657 dns_parser::{
4658 DnsIncoming, DnsOutgoing, DnsPointer, InterfaceId, RRType, ScopedIp, CLASS_IN,
4659 FLAGS_AA, FLAGS_QR_RESPONSE,
4660 },
4661 service_daemon::{add_answer_of_service, check_hostname},
4662 };
4663 use if_addrs::{IfAddr, Ifv4Addr};
4664 use std::{
4665 collections::HashSet,
4666 net::{IpAddr, Ipv4Addr},
4667 time::{Duration, SystemTime},
4668 };
4669 use test_log::test;
4670
    /// Checks that `valid_ip_on_intf` selects, among several addresses of one interface,
    /// the address matching the querier's IP, and none for an unrelated querier.
    #[test]
    fn test_response_source_ifaddr_match() {
        // Two /24 addresses on the same interface.
        let ifaddr_a = IfAddr::V4(Ifv4Addr {
            ip: Ipv4Addr::new(192, 168, 1, 148),
            netmask: Ipv4Addr::new(255, 255, 255, 0),
            broadcast: None,
            prefixlen: 24,
        });
        let ifaddr_b = IfAddr::V4(Ifv4Addr {
            ip: Ipv4Addr::new(10, 238, 0, 51),
            netmask: Ipv4Addr::new(255, 255, 255, 0),
            broadcast: None,
            prefixlen: 24,
        });

        let intf = MyIntf {
            name: "dummy0".to_string(),
            index: 1,
            addrs: HashSet::from([ifaddr_a.clone(), ifaddr_b.clone()]),
        };

        // First interface address accepted by `valid_ip_on_intf` for the querier.
        let pick = |querier: IpAddr| -> Option<IfAddr> {
            intf.addrs
                .iter()
                .find(|a| valid_ip_on_intf(&querier, a))
                .cloned()
        };

        assert_eq!(
            pick(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 2))),
            Some(ifaddr_a)
        );
        assert_eq!(
            pick(IpAddr::V4(Ipv4Addr::new(10, 238, 0, 99))),
            Some(ifaddr_b)
        );
        // A querier outside both /24 ranges matches nothing.
        assert_eq!(pick(IpAddr::V4(Ipv4Addr::new(172, 16, 0, 1))), None);
    }
4713
4714 #[test]
4715 fn test_instance_name() {
4716 assert!(valid_instance_name("my-laser._printer._tcp.local."));
4717 assert!(valid_instance_name("my-laser.._printer._tcp.local."));
4718 assert!(!valid_instance_name("_printer._tcp.local."));
4719 }
4720
4721 #[test]
4722 fn test_check_service_name_length() {
4723 let result = check_service_name_length("_tcp", 100);
4724 assert!(result.is_err());
4725 if let Err(e) = result {
4726 println!("{}", e);
4727 }
4728 }
4729
4730 #[test]
4731 fn test_check_hostname() {
4732 for hostname in &[
4734 "my_host.local.",
4735 &("A".repeat(255 - ".local.".len()) + ".local."),
4736 ] {
4737 let result = check_hostname(hostname);
4738 assert!(result.is_ok());
4739 }
4740
4741 for hostname in &[
4743 "my_host.local",
4744 ".local.",
4745 &("A".repeat(256 - ".local.".len()) + ".local."),
4746 ] {
4747 let result = check_hostname(hostname);
4748 assert!(result.is_err());
4749 if let Err(e) = result {
4750 println!("{}", e);
4751 }
4752 }
4753 }
4754
4755 #[test]
4756 fn test_check_domain_suffix() {
4757 assert!(check_domain_suffix("_missing_dot._tcp.local").is_err());
4758 assert!(check_domain_suffix("_missing_bar.tcp.local.").is_err());
4759 assert!(check_domain_suffix("_mis_spell._tpp.local.").is_err());
4760 assert!(check_domain_suffix("_mis_spell._upp.local.").is_err());
4761 assert!(check_domain_suffix("_has_dot._tcp.local.").is_ok());
4762 assert!(check_domain_suffix("_goodname._udp.local.").is_ok());
4763 }
4764
    /// Registers a service, resolves it, stops browsing, sends a PTR record with TTL 0
    /// for the service on every interface, then checks that a second browse resolves
    /// the service again.
    #[test]
    fn test_service_with_temporarily_invalidated_ptr() {
        // Create a daemon.
        let d = ServiceDaemon::new().expect("Failed to create daemon");

        // Register a service on the addresses of all non-loopback interfaces.
        let service = "_test_inval_ptr._udp.local.";
        let host_name = "my_host_tmp_invalidated_ptr.local.";
        let intfs: Vec<_> = my_ip_interfaces(false);
        let intf_ips: Vec<_> = intfs.iter().map(|intf| intf.ip()).collect();
        let port = 5201;
        let my_service =
            ServiceInfo::new(service, "my_instance", host_name, &intf_ips[..], port, None)
                .expect("invalid service info")
                .enable_addr_auto();
        let result = d.register(my_service.clone());
        assert!(result.is_ok());

        // First browse: wait until the service is resolved.
        let browse_chan = d.browse(service).unwrap();
        let timeout = Duration::from_secs(2);
        let mut resolved = false;

        while let Ok(event) = browse_chan.recv_timeout(timeout) {
            match event {
                ServiceEvent::ServiceResolved(info) => {
                    resolved = true;
                    println!("Resolved a service of {}", &info.fullname);
                    break;
                }
                e => {
                    println!("Received event {:?}", e);
                }
            }
        }

        assert!(resolved);

        println!("Stopping browse of {}", service);
        d.stop_browse(service).unwrap();

        // Drain events until the stop is confirmed.
        let mut stopped = false;
        while let Ok(event) = browse_chan.recv_timeout(timeout) {
            match event {
                ServiceEvent::SearchStopped(_) => {
                    stopped = true;
                    println!("Stopped browsing service");
                    break;
                }
                e => {
                    println!("Received event {:?}", e);
                }
            }
        }

        assert!(stopped);

        // Build a PTR record for the service with TTL 0.
        let invalidate_ptr_packet = DnsPointer::new(
            my_service.get_type(),
            RRType::PTR,
            CLASS_IN,
            0,
            my_service.get_fullname().to_string(),
        );

        let mut packet_buffer = DnsOutgoing::new(FLAGS_QR_RESPONSE | FLAGS_AA);
        packet_buffer.add_additional_answer(invalidate_ptr_packet);

        // Send the record on every interface through a freshly bound socket.
        for intf in intfs {
            let sock = _new_socket_bind(&intf, true).unwrap();
            send_dns_outgoing_impl(
                &packet_buffer,
                &intf.name,
                intf.index.unwrap_or(0),
                &intf.addr,
                &sock.pktinfo,
                MDNS_PORT,
            )
            .unwrap();
        }

        println!(
            "Sent PTR record invalidation. Starting second browse for {}",
            service
        );

        // Second browse: the service must be resolved again.
        let browse_chan = d.browse(service).unwrap();

        resolved = false;
        while let Ok(event) = browse_chan.recv_timeout(timeout) {
            match event {
                ServiceEvent::ServiceResolved(info) => {
                    resolved = true;
                    println!("Resolved a service of {}", &info.fullname);
                    break;
                }
                e => {
                    println!("Received event {:?}", e);
                }
            }
        }

        assert!(resolved);
        d.shutdown().unwrap();
    }
4880
    /// Registers a service with a short host TTL, resolves it from a second daemon,
    /// shuts the server down and then waits for a `ServiceRemoved` event.
    ///
    /// NOTE(review): the final loop only prints the removal; nothing asserts that the
    /// event was actually received, and the client daemon is never shut down — confirm
    /// whether that is intentional.
    #[test]
    fn test_expired_srv() {
        // Service with automatic addresses and a short host TTL.
        let service_type = "_expired-srv._udp.local.";
        let instance = "test_instance";
        let host_name = "expired_srv_host.local.";
        let mut my_service = ServiceInfo::new(service_type, instance, host_name, "", 5023, None)
            .unwrap()
            .enable_addr_auto();
        let new_ttl = 3;
        my_service._set_host_ttl(new_ttl);

        // Server side: register the service.
        let mdns_server = ServiceDaemon::new().expect("Failed to create mdns server");
        let result = mdns_server.register(my_service);
        assert!(result.is_ok());

        // Client side: browse until resolved.
        let mdns_client = ServiceDaemon::new().expect("Failed to create mdns client");
        let browse_chan = mdns_client.browse(service_type).unwrap();
        let timeout = Duration::from_secs(2);
        let mut resolved = false;

        while let Ok(event) = browse_chan.recv_timeout(timeout) {
            if let ServiceEvent::ServiceResolved(info) = event {
                resolved = true;
                println!("Resolved a service of {}", &info.fullname);
                break;
            }
        }

        assert!(resolved);

        // Take the server away.
        mdns_server.shutdown().unwrap();

        // Wait up to one TTL between events for the removal.
        let expire_timeout = Duration::from_secs(new_ttl as u64);
        while let Ok(event) = browse_chan.recv_timeout(expire_timeout) {
            if let ServiceEvent::ServiceRemoved(service_type, full_name) = event {
                println!("Service removed: {}: {}", &service_type, &full_name);
                break;
            }
        }
    }
4928
    /// Resolves a hostname served by one daemon from another, shuts the server down and
    /// expects an `AddressesRemoved` event for the same hostname and address.
    #[test]
    fn test_hostname_resolution_address_removed() {
        // Server registering a service on the first IPv4 interface address.
        let server = ServiceDaemon::new().expect("Failed to create server");
        let hostname = "addr_remove_host._tcp.local.";
        let service_ip_addr: ScopedIp = my_ip_interfaces(false)
            .iter()
            .find(|iface| iface.ip().is_ipv4())
            .map(|iface| iface.into())
            .unwrap();

        let mut my_service = ServiceInfo::new(
            "_host_res_test._tcp.local.",
            "my_instance",
            hostname,
            service_ip_addr.to_ip_addr(),
            1234,
            None,
        )
        .expect("invalid service info");

        // Short host TTL so the address goes away quickly once the server is gone.
        let addr_ttl = 2;
        my_service._set_host_ttl(addr_ttl);
        server.register(my_service).unwrap();

        // Client resolving the hostname.
        let client = ServiceDaemon::new().expect("Failed to create client");
        let event_receiver = client.resolve_hostname(hostname, None).unwrap();
        let resolved = loop {
            match event_receiver.recv() {
                Ok(HostnameResolutionEvent::AddressesFound(found_hostname, addresses)) => {
                    assert!(found_hostname == hostname);
                    assert!(addresses.contains(&service_ip_addr));
                    println!("address found: {:?}", &addresses);
                    break true;
                }
                Ok(HostnameResolutionEvent::SearchStopped(_)) => break false,
                Ok(_event) => {}
                Err(_) => break false,
            }
        };

        assert!(resolved);

        // Take the server away.
        server.shutdown().unwrap();

        // Wait a little longer than the TTL between events for the removal.
        let timeout = Duration::from_secs(addr_ttl as u64 + 1);
        let removed = loop {
            match event_receiver.recv_timeout(timeout) {
                Ok(HostnameResolutionEvent::AddressesRemoved(removed_host, addresses)) => {
                    assert!(removed_host == hostname);
                    assert!(addresses.contains(&service_ip_addr));

                    println!(
                        "address removed: hostname: {} addresses: {:?}",
                        &hostname, &addresses
                    );
                    break true;
                }
                Ok(_event) => {}
                Err(_) => {
                    break false;
                }
            }
        };

        assert!(removed);

        client.shutdown().unwrap();
    }
5003
    /// Resolves a service with a short "other" TTL, keeps listening for 90% of that
    /// TTL, and then expects the client's `cache-refresh-ptr` and
    /// `cache-refresh-srv-txt` metrics to both be 1.
    #[test]
    fn test_refresh_ptr() {
        // Service on the first IPv4 interface address.
        let service_type = "_refresh-ptr._udp.local.";
        let instance = "test_instance";
        let host_name = "refresh_ptr_host.local.";
        let service_ip_addr = my_ip_interfaces(false)
            .iter()
            .find(|iface| iface.ip().is_ipv4())
            .map(|iface| iface.ip())
            .unwrap();

        let mut my_service = ServiceInfo::new(
            service_type,
            instance,
            host_name,
            service_ip_addr,
            5023,
            None,
        )
        .unwrap();

        let new_ttl = 3;
        my_service._set_other_ttl(new_ttl);

        // Server side: register the service.
        let mdns_server = ServiceDaemon::new().expect("Failed to create mdns server");
        let result = mdns_server.register(my_service);
        assert!(result.is_ok());

        // Client side: browse until resolved.
        let mdns_client = ServiceDaemon::new().expect("Failed to create mdns client");
        let browse_chan = mdns_client.browse(service_type).unwrap();
        let timeout = Duration::from_millis(1500);
        let mut resolved = false;

        while let Ok(event) = browse_chan.recv_timeout(timeout) {
            if let ServiceEvent::ServiceResolved(info) = event {
                resolved = true;
                println!("Resolved a service of {}", &info.fullname);
                break;
            }
        }

        assert!(resolved);

        // Keep draining events until none arrives within 90% of the TTL.
        let timeout = Duration::from_millis(new_ttl as u64 * 1000 * 90 / 100);
        while let Ok(event) = browse_chan.recv_timeout(timeout) {
            println!("event: {:?}", &event);
        }

        // Exactly one refresh of each kind is expected by now.
        let metrics_chan = mdns_client.get_metrics().unwrap();
        let metrics = metrics_chan.recv_timeout(timeout).unwrap();
        let ptr_refresh_counter = metrics["cache-refresh-ptr"];
        assert_eq!(ptr_refresh_counter, 1);
        let srvtxt_refresh_counter = metrics["cache-refresh-srv-txt"];
        assert_eq!(srvtxt_refresh_counter, 1);

        mdns_server.shutdown().unwrap();
        mdns_client.shutdown().unwrap();
    }
5068
5069 #[test]
5070 fn test_name_change() {
5071 assert_eq!(name_change("foo.local."), "foo (2).local.");
5072 assert_eq!(name_change("foo (2).local."), "foo (3).local.");
5073 assert_eq!(name_change("foo (9).local."), "foo (10).local.");
5074 assert_eq!(name_change("foo"), "foo (2)");
5075 assert_eq!(name_change("foo (2)"), "foo (3)");
5076 assert_eq!(name_change(""), " (2)");
5077
5078 assert_eq!(name_change("foo (abc)"), "foo (abc) (2)"); assert_eq!(name_change("foo (2"), "foo (2 (2)"); assert_eq!(name_change("foo (2) extra"), "foo (2) extra (2)"); }
5083
5084 #[test]
5085 fn test_hostname_change() {
5086 assert_eq!(hostname_change("foo.local."), "foo-2.local.");
5087 assert_eq!(hostname_change("foo"), "foo-2");
5088 assert_eq!(hostname_change("foo-2.local."), "foo-3.local.");
5089 assert_eq!(hostname_change("foo-9"), "foo-10");
5090 assert_eq!(hostname_change("test-42.domain."), "test-43.domain.");
5091 }
5092
    /// Checks that the TXT answer produced by `add_answer_of_service` carries the
    /// service's "other" TTL.
    #[test]
    fn test_add_answer_txt_ttl() {
        // Service on the first IPv4 interface.
        let service_type = "_test_add_answer._udp.local.";
        let instance = "test_instance";
        let host_name = "add_answer_host.local.";
        let service_intf = my_ip_interfaces(false)
            .into_iter()
            .find(|iface| iface.ip().is_ipv4())
            .unwrap();
        let service_ip_addr = service_intf.ip();
        let my_service = ServiceInfo::new(
            service_type,
            instance,
            host_name,
            service_ip_addr,
            5023,
            None,
        )
        .unwrap();

        // Outgoing message that will receive the answer.
        let mut out = DnsOutgoing::new(FLAGS_QR_RESPONSE | FLAGS_AA);

        // Build a placeholder incoming message from the wire form of the empty `out`.
        let mut dummy_data = out.to_data_on_wire();
        let interface_id = InterfaceId::from(&service_intf);
        let incoming = DnsIncoming::new(dummy_data.pop().unwrap(), interface_id).unwrap();

        // Ask for the TXT answer of the service.
        let if_addrs = vec![service_intf.ip()];
        add_answer_of_service(
            &mut out,
            &incoming,
            instance,
            &my_service,
            RRType::TXT,
            if_addrs,
        );

        assert!(
            out.answers_count() > 0,
            "No answers added to the outgoing message"
        );

        // The first answer must be the TXT record.
        let answer = out._answers().first().unwrap();
        assert_eq!(answer.0.get_type(), RRType::TXT);

        // Its TTL must equal the service's "other" TTL.
        assert_eq!(answer.0.get_record().get_ttl(), my_service.get_other_ttl());
    }
5146
    /// Resolves a service, takes the interface down on the client (test hook) and
    /// expects `ServiceRemoved`, then brings it back up and expects `ServiceResolved`.
    #[test]
    fn test_interface_flip() {
        // Service with an instance name derived from the current time in microseconds.
        let ty_domain = "_intf-flip._udp.local.";
        let host_name = "intf_flip.local.";
        let now = SystemTime::now()
            .duration_since(SystemTime::UNIX_EPOCH)
            .unwrap();
        let instance_name = now.as_micros().to_string();
        let port = 5200;

        // Pick the first IPv4 interface.
        let (ip_addr1, intf_name) = my_ip_interfaces(false)
            .iter()
            .find(|iface| iface.ip().is_ipv4())
            .map(|iface| (iface.ip(), iface.name.clone()))
            .unwrap();

        println!("Using interface {} with IP {}", intf_name, ip_addr1);

        // Server side: register the service.
        let service1 = ServiceInfo::new(ty_domain, &instance_name, host_name, ip_addr1, port, None)
            .expect("valid service info");
        let server1 = ServiceDaemon::new().expect("failed to start server");
        server1
            .register(service1)
            .expect("Failed to register service1");

        // Give the server time before the client starts.
        std::thread::sleep(Duration::from_secs(2));

        // Client side.
        let client = ServiceDaemon::new().expect("failed to start client");

        let receiver = client.browse(ty_domain).unwrap();

        let timeout = Duration::from_secs(3);
        let mut got_data = false;

        while let Ok(event) = receiver.recv_timeout(timeout) {
            if let ServiceEvent::ServiceResolved(_) = event {
                println!("Received ServiceResolved event");
                got_data = true;
                break;
            }
        }

        assert!(got_data, "Should receive ServiceResolved event");

        // Set the client's IP check interval to 1.
        client.set_ip_check_interval(1).unwrap();

        // Take the interface down through the test hook.
        println!("Shutting down interface {}", &intf_name);
        client.test_down_interface(&intf_name).unwrap();

        let mut got_removed = false;

        while let Ok(event) = receiver.recv_timeout(timeout) {
            if let ServiceEvent::ServiceRemoved(ty_domain, instance) = event {
                got_removed = true;
                println!("removed: {ty_domain} : {instance}");
                break;
            }
        }
        assert!(got_removed, "Should receive ServiceRemoved event");

        // Bring the interface back and expect the service to be resolved again.
        println!("Bringing up interface {}", &intf_name);
        client.test_up_interface(&intf_name).unwrap();
        let mut got_data = false;
        while let Ok(event) = receiver.recv_timeout(timeout) {
            if let ServiceEvent::ServiceResolved(resolved) = event {
                got_data = true;
                println!("Received ServiceResolved: {:?}", resolved);
                break;
            }
        }
        assert!(
            got_data,
            "Should receive ServiceResolved event after interface is back up"
        );

        server1.shutdown().unwrap();
        client.shutdown().unwrap();
    }
5232
    /// Starts a `browse_cache` on the client before the service is registered and
    /// expects the service to be resolved once the server registers it.
    #[test]
    fn test_cache_only() {
        // Service on the first IPv4 interface address.
        let service_type = "_cache_only._udp.local.";
        let instance = "test_instance";
        let host_name = "cache_only_host.local.";
        let service_ip_addr = my_ip_interfaces(false)
            .iter()
            .find(|iface| iface.ip().is_ipv4())
            .map(|iface| iface.ip())
            .unwrap();

        let mut my_service = ServiceInfo::new(
            service_type,
            instance,
            host_name,
            service_ip_addr,
            5023,
            None,
        )
        .unwrap();

        let new_ttl = 3;
        my_service._set_other_ttl(new_ttl);

        let mdns_client = ServiceDaemon::new().expect("Failed to create mdns client");

        // Start the cache browse first, then wait before the server appears.
        let browse_chan = mdns_client.browse_cache(service_type).unwrap();
        std::thread::sleep(Duration::from_secs(2));

        // Server side: register the service.
        let mdns_server = ServiceDaemon::new().expect("Failed to create mdns server");
        let result = mdns_server.register(my_service);
        assert!(result.is_ok());

        let timeout = Duration::from_millis(1500);
        let mut resolved = false;

        while let Ok(event) = browse_chan.recv_timeout(timeout) {
            if let ServiceEvent::ServiceResolved(info) = event {
                resolved = true;
                println!("Resolved a service of {}", &info.get_fullname());
                break;
            }
        }

        assert!(resolved);

        mdns_server.shutdown().unwrap();
        mdns_client.shutdown().unwrap();
    }
5288
/// Checks that a client with `accept_unsolicited(true)` set can resolve a
/// service through `browse_cache` when the browse starts about two seconds
/// after another daemon registered that service.
///
/// Requires at least one IPv4 interface: the first IPv4 address found in
/// `my_ip_interfaces(false)` is used as the service address.
#[test]
fn test_cache_only_unsolicited() {
    let service_type = "_c_unsolicit._udp.local.";
    let instance = "test_instance";
    let host_name = "c_unsolicit_host.local.";
    let service_ip_addr = my_ip_interfaces(false)
        .iter()
        .find(|iface| iface.ip().is_ipv4())
        .map(|iface| iface.ip())
        .expect("Test requires an IPv4 interface");

    let my_service = ServiceInfo::new(
        service_type,
        instance,
        host_name,
        service_ip_addr,
        5023,
        None,
    )
    .unwrap();

    // The server registers first, so the client has not browsed for this type beforehand.
    let mdns_server = ServiceDaemon::new().expect("Failed to create mdns server");
    // Surface the actual registration error instead of a bare `is_ok()` assertion.
    mdns_server
        .register(my_service)
        .expect("Failed to register service");

    let mdns_client = ServiceDaemon::new().expect("Failed to create mdns client");
    mdns_client.accept_unsolicited(true).unwrap();

    // NOTE(review): presumably gives the client time to receive the server's
    // announcements before the cache is consulted — confirm.
    std::thread::sleep(Duration::from_secs(2));

    let browse_chan = mdns_client.browse_cache(service_type).unwrap();

    // The timeout applies to each `recv_timeout` call, not to the loop as a whole.
    let timeout = Duration::from_millis(1500);
    let mut resolved = false;

    while let Ok(event) = browse_chan.recv_timeout(timeout) {
        if let ServiceEvent::ServiceResolved(info) = event {
            resolved = true;
            println!("Resolved a service of {}", &info.get_fullname());
            break;
        }
    }

    assert!(
        resolved,
        "Should receive ServiceResolved event from cache-only browse"
    );

    mdns_server.shutdown().unwrap();
    mdns_client.shutdown().unwrap();
}
5340
/// Checks that daemons created with `ServiceDaemon::new_with_port(5454)` and
/// daemons created with `ServiceDaemon::new()` do not resolve each other's
/// services.
///
/// Two instances of the same service type are registered, one on each kind of
/// daemon. A browsing client of each kind must resolve the instance registered
/// on its own kind and must not resolve the other one.
///
/// Requires at least one IPv4 interface.
#[test]
fn test_custom_port_isolation() {
    let service_type = "_custom_port._udp.local.";
    let instance_custom = "custom_port_instance";
    let instance_default = "default_port_instance";
    let host_name = "custom_port_host.local.";

    // Service ports advertised by each instance; named so that the values
    // registered and the values asserted below cannot drift apart.
    let service_port_custom = 8080;
    let service_port_default = 8081;

    let service_ip_addr = my_ip_interfaces(false)
        .iter()
        .find(|iface| iface.ip().is_ipv4())
        .map(|iface| iface.ip())
        .expect("Test requires an IPv4 interface");

    let service_custom = ServiceInfo::new(
        service_type,
        instance_custom,
        host_name,
        service_ip_addr,
        service_port_custom,
        None,
    )
    .unwrap();

    let service_default = ServiceInfo::new(
        service_type,
        instance_default,
        host_name,
        service_ip_addr,
        service_port_default,
        None,
    )
    .unwrap();

    // Port handed to `new_with_port`; distinct from the service ports above.
    let custom_port = 5454u16;
    let server_custom =
        ServiceDaemon::new_with_port(custom_port).expect("Failed to create custom port server");
    let client_custom =
        ServiceDaemon::new_with_port(custom_port).expect("Failed to create custom port client");

    let server_default = ServiceDaemon::new().expect("Failed to create default port server");

    // Neither `ServiceInfo` is used after registration, so both are moved rather than cloned.
    server_custom
        .register(service_custom)
        .expect("Failed to register custom port service");

    server_default
        .register(service_default)
        .expect("Failed to register default port service");

    let browse_custom = client_custom
        .browse(service_type)
        .expect("Failed to browse on custom port");

    let timeout = Duration::from_secs(3);
    let mut found_custom = false;
    let mut found_default_on_custom = false;

    // Deliberately no `break`: keep draining until no event arrives within
    // `timeout`, so a late resolution of the wrong instance is still detected.
    while let Ok(event) = browse_custom.recv_timeout(timeout) {
        if let ServiceEvent::ServiceResolved(info) = event {
            println!(
                "Custom port client resolved: {} on port {}",
                info.get_fullname(),
                info.get_port()
            );
            if info.get_fullname().starts_with(instance_custom) {
                found_custom = true;
                assert_eq!(info.get_port(), service_port_custom);
            }
            if info.get_fullname().starts_with(instance_default) {
                found_default_on_custom = true;
            }
        }
    }

    assert!(
        found_custom,
        "Custom port client should find service on custom port"
    );
    assert!(
        !found_default_on_custom,
        "Custom port client should NOT find service on default port"
    );

    let client_default = ServiceDaemon::new().expect("Failed to create default port client");
    let browse_default = client_default
        .browse(service_type)
        .expect("Failed to browse on default port");

    let mut found_default = false;
    let mut found_custom_on_default = false;

    // Same drain-until-quiet strategy as above, for the default-port client.
    while let Ok(event) = browse_default.recv_timeout(timeout) {
        if let ServiceEvent::ServiceResolved(info) = event {
            println!(
                "Default port client resolved: {} on port {}",
                info.get_fullname(),
                info.get_port()
            );
            if info.get_fullname().starts_with(instance_default) {
                found_default = true;
                assert_eq!(info.get_port(), service_port_default);
            }
            if info.get_fullname().starts_with(instance_custom) {
                found_custom_on_default = true;
            }
        }
    }

    assert!(
        found_default,
        "Default port client should find service on default port"
    );
    assert!(
        !found_custom_on_default,
        "Default port client should NOT find service on custom port"
    );

    server_custom.shutdown().unwrap();
    client_custom.shutdown().unwrap();
    server_default.shutdown().unwrap();
    client_default.shutdown().unwrap();
}
5478}