1#[cfg(feature = "logging")]
32use crate::log::{debug, trace};
33use crate::{
34 dns_cache::{current_time_millis, DnsCache},
35 dns_parser::{
36 ip_address_rr_type, DnsAddress, DnsEntryExt, DnsIncoming, DnsOutgoing, DnsPointer,
37 DnsRecordBox, DnsRecordExt, DnsSrv, DnsTxt, InterfaceId, RRType, ScopedIp,
38 CLASS_CACHE_FLUSH, CLASS_IN, FLAGS_AA, FLAGS_QR_QUERY, FLAGS_QR_RESPONSE, MAX_MSG_ABSOLUTE,
39 },
40 error::{e_fmt, Error, Result},
41 service_info::{DnsRegistry, MyIntf, Probe, ServiceInfo, ServiceStatus},
42 Receiver, ResolvedService, TxtProperties,
43};
44use flume::{bounded, Sender, TrySendError};
45use if_addrs::{IfAddr, Interface};
46use mio::{event::Source, net::UdpSocket as MioUdpSocket, Interest, Poll, Registry, Token};
47use socket2::Domain;
48use socket_pktinfo::PktInfoUdpSocket;
49use std::{
50 cmp::{self, Reverse},
51 collections::{hash_map::Entry, BinaryHeap, HashMap, HashSet},
52 fmt, io,
53 net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6, UdpSocket},
54 str, thread,
55 time::Duration,
56 vec,
57};
58
/// Default value of the daemon's maximum service name length option.
pub const SERVICE_NAME_LEN_MAX_DEFAULT: u8 = 15;

/// Default interval, in seconds, between checks for IP address changes.
pub const IP_CHECK_INTERVAL_IN_SECS_DEFAULT: u32 = 5;

/// Default timeout for verifying a service instance.
// NOTE(review): presumably meant as the `timeout` argument of `ServiceDaemon::verify` — confirm.
pub const VERIFY_TIMEOUT_DEFAULT: Duration = Duration::from_secs(10);

/// The UDP port used for mDNS.
pub const MDNS_PORT: u16 = 5353;

/// IPv4 multicast group joined by the daemon's IPv4 socket.
const GROUP_ADDR_V4: Ipv4Addr = Ipv4Addr::new(224, 0, 0, 251);
/// IPv6 multicast group joined by the daemon's IPv6 socket.
const GROUP_ADDR_V6: Ipv6Addr = Ipv6Addr::new(0xff02, 0, 0, 0, 0, 0, 0, 0xfb);
/// IPv4 loopback address (127.0.0.1), used for the daemon's signal socket.
const LOOPBACK_V4: Ipv4Addr = Ipv4Addr::LOCALHOST;

/// A wait time in milliseconds related to resolving.
// NOTE(review): the use site is outside this chunk — confirm exact meaning.
const RESOLVE_WAIT_IN_MILLIS: u64 = 500;
79
/// Outcome of an unregister request, delivered on the channel returned by
/// `ServiceDaemon::unregister`.
///
/// Derives the same comparison/clone traits as `DaemonStatus` so that
/// clients can compare a received status with `==`.
#[derive(Debug, PartialEq, Clone, Eq)]
pub enum UnregisterStatus {
    /// The unregister request was carried out.
    // NOTE(review): exact success semantics are decided by the daemon code
    // outside this chunk — confirm.
    OK,

    /// No matching service was found.
    // NOTE(review): presumably "the full name is not a registered service" — confirm.
    NotFound,
}
88
/// Run state of the daemon, as reported by `ServiceDaemon::status` and
/// `ServiceDaemon::shutdown`.
#[derive(Debug, Clone, PartialEq, Eq)]
#[non_exhaustive]
pub enum DaemonStatus {
    /// The daemon is running; this is its initial state.
    Running,

    /// The daemon has received an exit command, or its command channel is
    /// disconnected.
    Shutdown,
}
99
/// Identifiers of the daemon's internal counters.
///
/// The `Display` form of each variant is its kebab-case label.
// NOTE(review): presumably that label is the key used in `Metrics` — confirm
// against `increase_counter`, which is outside this chunk.
#[derive(Hash, Eq, PartialEq)]
enum Counter {
    Register,
    RegisterResend,
    Unregister,
    UnregisterResend,
    Browse,
    ResolveHostname,
    Respond,
    CacheRefreshPTR,
    CacheRefreshSrvTxt,
    CacheRefreshAddr,
    KnownAnswerSuppression,
    CachedPTR,
    CachedSRV,
    CachedAddr,
    CachedTxt,
    CachedNSec,
    CachedSubtype,
    DnsRegistryProbe,
    DnsRegistryActive,
    DnsRegistryTimer,
    DnsRegistryNameChange,
    Timer,
}

impl fmt::Display for Counter {
    /// Writes the counter's kebab-case label.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let label = match self {
            Self::Register => "register",
            Self::RegisterResend => "register-resend",
            Self::Unregister => "unregister",
            Self::UnregisterResend => "unregister-resend",
            Self::Browse => "browse",
            Self::ResolveHostname => "resolve-hostname",
            Self::Respond => "respond",
            Self::CacheRefreshPTR => "cache-refresh-ptr",
            Self::CacheRefreshSrvTxt => "cache-refresh-srv-txt",
            Self::CacheRefreshAddr => "cache-refresh-addr",
            Self::KnownAnswerSuppression => "known-answer-suppression",
            Self::CachedPTR => "cached-ptr",
            Self::CachedSRV => "cached-srv",
            Self::CachedAddr => "cached-addr",
            Self::CachedTxt => "cached-txt",
            Self::CachedNSec => "cached-nsec",
            Self::CachedSubtype => "cached-subtype",
            Self::DnsRegistryProbe => "dns-registry-probe",
            Self::DnsRegistryActive => "dns-registry-active",
            Self::DnsRegistryTimer => "dns-registry-timer",
            Self::DnsRegistryNameChange => "dns-registry-name-change",
            Self::Timer => "timer",
        };
        f.write_str(label)
    }
}
156
157struct MyUdpSocket {
162 pktinfo: PktInfoUdpSocket,
165
166 mio: MioUdpSocket,
169}
170
171impl MyUdpSocket {
172 pub fn new(pktinfo: PktInfoUdpSocket) -> io::Result<Self> {
173 let std_sock = pktinfo.try_clone_std()?;
174 let mio = MioUdpSocket::from_std(std_sock);
175
176 Ok(Self { pktinfo, mio })
177 }
178}
179
180impl Source for MyUdpSocket {
182 fn register(
183 &mut self,
184 registry: &Registry,
185 token: Token,
186 interests: Interest,
187 ) -> io::Result<()> {
188 self.mio.register(registry, token, interests)
189 }
190
191 fn reregister(
192 &mut self,
193 registry: &Registry,
194 token: Token,
195 interests: Interest,
196 ) -> io::Result<()> {
197 self.mio.reregister(registry, token, interests)
198 }
199
200 fn deregister(&mut self, registry: &Registry) -> std::io::Result<()> {
201 self.mio.deregister(registry)
202 }
203}
204
205pub type Metrics = HashMap<String, i64>;
208
209const IPV4_SOCK_EVENT_KEY: usize = 4; const IPV6_SOCK_EVENT_KEY: usize = 6; const SIGNAL_SOCK_EVENT_KEY: usize = usize::MAX - 1; #[derive(Clone)]
217pub struct ServiceDaemon {
218 sender: Sender<Command>,
220
221 signal_addr: SocketAddr,
227}
228
229impl ServiceDaemon {
230 pub fn new() -> Result<Self> {
236 Self::new_with_port(MDNS_PORT)
237 }
238
239 pub fn new_with_port(port: u16) -> Result<Self> {
262 let signal_addr = SocketAddrV4::new(LOOPBACK_V4, 0);
265
266 let signal_sock = UdpSocket::bind(signal_addr)
267 .map_err(|e| e_fmt!("failed to create signal_sock for daemon: {}", e))?;
268
269 let signal_addr = signal_sock
271 .local_addr()
272 .map_err(|e| e_fmt!("failed to get signal sock addr: {}", e))?;
273
274 signal_sock
276 .set_nonblocking(true)
277 .map_err(|e| e_fmt!("failed to set nonblocking for signal socket: {}", e))?;
278
279 let poller = Poll::new().map_err(|e| e_fmt!("failed to create mio Poll: {e}"))?;
280
281 let (sender, receiver) = bounded(100);
282
283 let mio_sock = MioUdpSocket::from_std(signal_sock);
285 thread::Builder::new()
286 .name("mDNS_daemon".to_string())
287 .spawn(move || Self::daemon_thread(mio_sock, poller, receiver, port))
288 .map_err(|e| e_fmt!("thread builder failed to spawn: {}", e))?;
289
290 Ok(Self {
291 sender,
292 signal_addr,
293 })
294 }
295
296 fn send_cmd(&self, cmd: Command) -> Result<()> {
299 let cmd_name = cmd.to_string();
300
301 self.sender.try_send(cmd).map_err(|e| match e {
303 TrySendError::Full(_) => Error::Again,
304 e => e_fmt!("flume::channel::send failed: {}", e),
305 })?;
306
307 let addr = SocketAddrV4::new(LOOPBACK_V4, 0);
309 let socket = UdpSocket::bind(addr)
310 .map_err(|e| e_fmt!("Failed to create socket to send signal: {}", e))?;
311 socket
312 .send_to(cmd_name.as_bytes(), self.signal_addr)
313 .map_err(|e| {
314 e_fmt!(
315 "signal socket send_to {} ({}) failed: {}",
316 self.signal_addr,
317 cmd_name,
318 e
319 )
320 })?;
321
322 Ok(())
323 }
324
325 pub fn browse(&self, service_type: &str) -> Result<Receiver<ServiceEvent>> {
336 check_domain_suffix(service_type)?;
337
338 let (resp_s, resp_r) = bounded(10);
339 self.send_cmd(Command::Browse(service_type.to_string(), 1, false, resp_s))?;
340 Ok(resp_r)
341 }
342
343 pub fn browse_cache(&self, service_type: &str) -> Result<Receiver<ServiceEvent>> {
352 check_domain_suffix(service_type)?;
353
354 let (resp_s, resp_r) = bounded(10);
355 self.send_cmd(Command::Browse(service_type.to_string(), 1, true, resp_s))?;
356 Ok(resp_r)
357 }
358
359 pub fn stop_browse(&self, ty_domain: &str) -> Result<()> {
364 self.send_cmd(Command::StopBrowse(ty_domain.to_string()))
365 }
366
367 pub fn resolve_hostname(
375 &self,
376 hostname: &str,
377 timeout: Option<u64>,
378 ) -> Result<Receiver<HostnameResolutionEvent>> {
379 check_hostname(hostname)?;
380 let (resp_s, resp_r) = bounded(10);
381 self.send_cmd(Command::ResolveHostname(
382 hostname.to_string(),
383 1,
384 resp_s,
385 timeout,
386 ))?;
387 Ok(resp_r)
388 }
389
390 pub fn stop_resolve_hostname(&self, hostname: &str) -> Result<()> {
395 self.send_cmd(Command::StopResolveHostname(hostname.to_string()))
396 }
397
398 pub fn register(&self, service_info: ServiceInfo) -> Result<()> {
406 check_service_name(service_info.get_fullname())?;
407 check_hostname(service_info.get_hostname())?;
408
409 self.send_cmd(Command::Register(service_info.into()))
410 }
411
412 pub fn unregister(&self, fullname: &str) -> Result<Receiver<UnregisterStatus>> {
420 let (resp_s, resp_r) = bounded(1);
421 self.send_cmd(Command::Unregister(fullname.to_lowercase(), resp_s))?;
422 Ok(resp_r)
423 }
424
425 pub fn monitor(&self) -> Result<Receiver<DaemonEvent>> {
429 let (resp_s, resp_r) = bounded(100);
430 self.send_cmd(Command::Monitor(resp_s))?;
431 Ok(resp_r)
432 }
433
434 pub fn shutdown(&self) -> Result<Receiver<DaemonStatus>> {
439 let (resp_s, resp_r) = bounded(1);
440 self.send_cmd(Command::Exit(resp_s))?;
441 Ok(resp_r)
442 }
443
444 pub fn status(&self) -> Result<Receiver<DaemonStatus>> {
450 let (resp_s, resp_r) = bounded(1);
451
452 if self.sender.is_disconnected() {
453 resp_s
454 .send(DaemonStatus::Shutdown)
455 .map_err(|e| e_fmt!("failed to send daemon status to the client: {}", e))?;
456 } else {
457 self.send_cmd(Command::GetStatus(resp_s))?;
458 }
459
460 Ok(resp_r)
461 }
462
463 pub fn get_metrics(&self) -> Result<Receiver<Metrics>> {
468 let (resp_s, resp_r) = bounded(1);
469 self.send_cmd(Command::GetMetrics(resp_s))?;
470 Ok(resp_r)
471 }
472
473 pub fn set_service_name_len_max(&self, len_max: u8) -> Result<()> {
480 const SERVICE_NAME_LEN_MAX_LIMIT: u8 = 30; if len_max > SERVICE_NAME_LEN_MAX_LIMIT {
483 return Err(Error::Msg(format!(
484 "service name length max {len_max} is too large"
485 )));
486 }
487
488 self.send_cmd(Command::SetOption(DaemonOption::ServiceNameLenMax(len_max)))
489 }
490
491 pub fn set_ip_check_interval(&self, interval_in_secs: u32) -> Result<()> {
497 let interval_in_millis = interval_in_secs as u64 * 1000;
498 self.send_cmd(Command::SetOption(DaemonOption::IpCheckInterval(
499 interval_in_millis,
500 )))
501 }
502
503 pub fn get_ip_check_interval(&self) -> Result<u32> {
505 let (resp_s, resp_r) = bounded(1);
506 self.send_cmd(Command::GetOption(resp_s))?;
507
508 let option = resp_r
509 .recv_timeout(Duration::from_secs(10))
510 .map_err(|e| e_fmt!("failed to receive ip check interval: {}", e))?;
511 let ip_check_interval_in_secs = option.ip_check_interval / 1000;
512 Ok(ip_check_interval_in_secs as u32)
513 }
514
515 pub fn enable_interface(&self, if_kind: impl IntoIfKindVec) -> Result<()> {
522 let if_kind_vec = if_kind.into_vec();
523 self.send_cmd(Command::SetOption(DaemonOption::EnableInterface(
524 if_kind_vec.kinds,
525 )))
526 }
527
528 pub fn disable_interface(&self, if_kind: impl IntoIfKindVec) -> Result<()> {
535 let if_kind_vec = if_kind.into_vec();
536 self.send_cmd(Command::SetOption(DaemonOption::DisableInterface(
537 if_kind_vec.kinds,
538 )))
539 }
540
541 pub fn accept_unsolicited(&self, accept: bool) -> Result<()> {
552 self.send_cmd(Command::SetOption(DaemonOption::AcceptUnsolicited(accept)))
553 }
554
555 #[cfg(test)]
556 pub fn test_down_interface(&self, ifname: &str) -> Result<()> {
557 self.send_cmd(Command::SetOption(DaemonOption::TestDownInterface(
558 ifname.to_string(),
559 )))
560 }
561
562 #[cfg(test)]
563 pub fn test_up_interface(&self, ifname: &str) -> Result<()> {
564 self.send_cmd(Command::SetOption(DaemonOption::TestUpInterface(
565 ifname.to_string(),
566 )))
567 }
568
569 pub fn set_multicast_loop_v4(&self, on: bool) -> Result<()> {
585 self.send_cmd(Command::SetOption(DaemonOption::MulticastLoopV4(on)))
586 }
587
588 pub fn set_multicast_loop_v6(&self, on: bool) -> Result<()> {
604 self.send_cmd(Command::SetOption(DaemonOption::MulticastLoopV6(on)))
605 }
606
607 pub fn verify(&self, instance_fullname: String, timeout: Duration) -> Result<()> {
620 self.send_cmd(Command::Verify(instance_fullname, timeout))
621 }
622
623 fn daemon_thread(
624 signal_sock: MioUdpSocket,
625 poller: Poll,
626 receiver: Receiver<Command>,
627 port: u16,
628 ) {
629 let mut zc = Zeroconf::new(signal_sock, poller, port);
630
631 if let Some(cmd) = zc.run(receiver) {
632 match cmd {
633 Command::Exit(resp_s) => {
634 if let Err(e) = resp_s.send(DaemonStatus::Shutdown) {
637 debug!("exit: failed to send response of shutdown: {}", e);
638 }
639 }
640 _ => {
641 debug!("Unexpected command: {:?}", cmd);
642 }
643 }
644 }
645 }
646}
647
648fn _new_socket_bind(intf: &Interface, should_loop: bool) -> Result<MyUdpSocket> {
650 let intf_ip = &intf.ip();
653 match intf_ip {
654 IpAddr::V4(ip) => {
655 let addr = SocketAddrV4::new(Ipv4Addr::new(0, 0, 0, 0), MDNS_PORT);
656 let sock = new_socket(addr.into(), true)?;
657
658 sock.join_multicast_v4(&GROUP_ADDR_V4, ip)
660 .map_err(|e| e_fmt!("join multicast group on addr {}: {}", intf_ip, e))?;
661
662 sock.set_multicast_if_v4(ip)
664 .map_err(|e| e_fmt!("set multicast_if on addr {}: {}", ip, e))?;
665
666 sock.set_multicast_ttl_v4(255)
671 .map_err(|e| e_fmt!("set set_multicast_ttl_v4 on addr {}: {}", ip, e))?;
672
673 if !should_loop {
674 sock.set_multicast_loop_v4(false)
675 .map_err(|e| e_fmt!("failed to set multicast loop v4 for {ip}: {e}"))?;
676 }
677
678 let multicast_addr = SocketAddrV4::new(GROUP_ADDR_V4, MDNS_PORT).into();
680 let test_packets = DnsOutgoing::new(0).to_data_on_wire();
681 for packet in test_packets {
682 sock.send_to(&packet, &multicast_addr)
683 .map_err(|e| e_fmt!("send multicast packet on addr {}: {}", ip, e))?;
684 }
685 MyUdpSocket::new(sock)
686 .map_err(|e| e_fmt!("failed to create MySocket for interface {}: {e}", intf.name))
687 }
688 IpAddr::V6(ip) => {
689 let addr = SocketAddrV6::new(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 0), MDNS_PORT, 0, 0);
690 let sock = new_socket(addr.into(), true)?;
691
692 let if_index = intf.index.unwrap_or(0);
693
694 sock.join_multicast_v6(&GROUP_ADDR_V6, if_index)
696 .map_err(|e| e_fmt!("join multicast group on addr {}: {}", ip, e))?;
697
698 sock.set_multicast_if_v6(if_index)
700 .map_err(|e| e_fmt!("set multicast_if on addr {}: {}", ip, e))?;
701
702 MyUdpSocket::new(sock)
707 .map_err(|e| e_fmt!("failed to create MySocket for interface {}: {e}", intf.name))
708 }
709 }
710}
711
712fn new_socket(addr: SocketAddr, non_block: bool) -> Result<PktInfoUdpSocket> {
715 let domain = match addr {
716 SocketAddr::V4(_) => socket2::Domain::IPV4,
717 SocketAddr::V6(_) => socket2::Domain::IPV6,
718 };
719
720 let fd = PktInfoUdpSocket::new(domain).map_err(|e| e_fmt!("create socket failed: {}", e))?;
721
722 fd.set_reuse_address(true)
723 .map_err(|e| e_fmt!("set ReuseAddr failed: {}", e))?;
724 #[cfg(unix)] fd.set_reuse_port(true)
726 .map_err(|e| e_fmt!("set ReusePort failed: {}", e))?;
727
728 if non_block {
729 fd.set_nonblocking(true)
730 .map_err(|e| e_fmt!("set O_NONBLOCK: {}", e))?;
731 }
732
733 fd.bind(&addr.into())
734 .map_err(|e| e_fmt!("socket bind to {} failed: {}", &addr, e))?;
735
736 trace!("new socket bind to {}", &addr);
737 Ok(fd)
738}
739
740struct ReRun {
742 next_time: u64,
744 command: Command,
745}
746
/// Describes which network interfaces a selection applies to.
#[derive(Clone, Debug)]
#[non_exhaustive]
pub enum IfKind {
    /// Every interface.
    All,

    /// Interfaces whose address is IPv4.
    IPv4,

    /// Interfaces whose address is IPv6.
    IPv6,

    /// Interfaces with exactly this name.
    Name(String),

    /// Interfaces with exactly this IP address.
    Addr(IpAddr),

    /// Loopback interfaces whose address is IPv4.
    LoopbackV4,

    /// Loopback interfaces whose address is IPv6.
    LoopbackV6,
}
776
777impl IfKind {
778 fn matches(&self, intf: &Interface) -> bool {
780 match self {
781 Self::All => true,
782 Self::IPv4 => intf.ip().is_ipv4(),
783 Self::IPv6 => intf.ip().is_ipv6(),
784 Self::Name(ifname) => ifname == &intf.name,
785 Self::Addr(addr) => addr == &intf.ip(),
786 Self::LoopbackV4 => intf.is_loopback() && intf.ip().is_ipv4(),
787 Self::LoopbackV6 => intf.is_loopback() && intf.ip().is_ipv6(),
788 }
789 }
790}
791
792impl From<&str> for IfKind {
795 fn from(val: &str) -> Self {
796 Self::Name(val.to_string())
797 }
798}
799
800impl From<&String> for IfKind {
801 fn from(val: &String) -> Self {
802 Self::Name(val.to_string())
803 }
804}
805
806impl From<IpAddr> for IfKind {
808 fn from(val: IpAddr) -> Self {
809 Self::Addr(val)
810 }
811}
812
813pub struct IfKindVec {
815 kinds: Vec<IfKind>,
816}
817
818pub trait IntoIfKindVec {
820 fn into_vec(self) -> IfKindVec;
821}
822
823impl<T: Into<IfKind>> IntoIfKindVec for T {
824 fn into_vec(self) -> IfKindVec {
825 let if_kind: IfKind = self.into();
826 IfKindVec {
827 kinds: vec![if_kind],
828 }
829 }
830}
831
832impl<T: Into<IfKind>> IntoIfKindVec for Vec<T> {
833 fn into_vec(self) -> IfKindVec {
834 let kinds: Vec<IfKind> = self.into_iter().map(|x| x.into()).collect();
835 IfKindVec { kinds }
836 }
837}
838
839struct IfSelection {
841 if_kind: IfKind,
843
844 selected: bool,
846}
847
848struct Zeroconf {
850 port: u16,
853
854 my_intfs: HashMap<u32, MyIntf>,
856
857 ipv4_sock: Option<MyUdpSocket>,
859
860 ipv6_sock: Option<MyUdpSocket>,
862
863 my_services: HashMap<String, ServiceInfo>,
865
866 cache: DnsCache,
868
869 dns_registry_map: HashMap<u32, DnsRegistry>,
871
872 service_queriers: HashMap<String, Sender<ServiceEvent>>, hostname_resolvers: HashMap<String, (Sender<HostnameResolutionEvent>, Option<u64>)>, retransmissions: Vec<ReRun>,
883
884 counters: Metrics,
885
886 poller: Poll,
888
889 monitors: Vec<Sender<DaemonEvent>>,
891
892 service_name_len_max: u8,
894
895 ip_check_interval: u64,
897
898 if_selections: Vec<IfSelection>,
900
901 signal_sock: MioUdpSocket,
903
904 timers: BinaryHeap<Reverse<u64>>,
910
911 status: DaemonStatus,
912
913 pending_resolves: HashSet<String>,
915
916 resolved: HashSet<String>,
918
919 multicast_loop_v4: bool,
920
921 multicast_loop_v6: bool,
922
923 accept_unsolicited: bool,
924
925 #[cfg(test)]
926 test_down_interfaces: HashSet<String>,
927}
928
929fn join_multicast_group(my_sock: &PktInfoUdpSocket, intf: &Interface) -> Result<()> {
931 let intf_ip = &intf.ip();
932 match intf_ip {
933 IpAddr::V4(ip) => {
934 debug!("join multicast group V4 on {} addr {ip}", intf.name);
936 my_sock
937 .join_multicast_v4(&GROUP_ADDR_V4, ip)
938 .map_err(|e| e_fmt!("PKT join multicast group on addr {}: {}", intf_ip, e))?;
939 }
940 IpAddr::V6(ip) => {
941 let if_index = intf.index.unwrap_or(0);
942 debug!(
944 "join multicast group V6 on {} addr {ip} with index {if_index}",
945 intf.name
946 );
947 my_sock
948 .join_multicast_v6(&GROUP_ADDR_V6, if_index)
949 .map_err(|e| e_fmt!("PKT join multicast group on addr {}: {}", ip, e))?;
950 }
951 }
952 Ok(())
953}
954
955impl Zeroconf {
956 fn new(signal_sock: MioUdpSocket, poller: Poll, port: u16) -> Self {
957 let my_ifaddrs = my_ip_interfaces(true);
959
960 let mut my_intfs = HashMap::new();
964 let mut dns_registry_map = HashMap::new();
965
966 let mut ipv4_sock = None;
969 let addr = SocketAddrV4::new(Ipv4Addr::new(0, 0, 0, 0), port);
970 match new_socket(addr.into(), true) {
971 Ok(sock) => {
972 sock.set_multicast_ttl_v4(255)
977 .map_err(|e| e_fmt!("set set_multicast_ttl_v4 on addr: {}", e))
978 .ok();
979
980 ipv4_sock = match MyUdpSocket::new(sock) {
982 Ok(s) => Some(s),
983 Err(e) => {
984 debug!("failed to create IPv4 MyUdpSocket: {e}");
985 None
986 }
987 };
988 }
989 Err(e) => debug!("failed to create IPv4 socket: {e}"),
991 }
992
993 let mut ipv6_sock = None;
994 let addr = SocketAddrV6::new(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 0), port, 0, 0);
995 match new_socket(addr.into(), true) {
996 Ok(sock) => {
997 sock.set_multicast_hops_v6(255)
1001 .map_err(|e| e_fmt!("set set_multicast_hops_v6: {}", e))
1002 .ok();
1003
1004 ipv6_sock = match MyUdpSocket::new(sock) {
1006 Ok(s) => Some(s),
1007 Err(e) => {
1008 debug!("failed to create IPv6 MyUdpSocket: {e}");
1009 None
1010 }
1011 };
1012 }
1013 Err(e) => debug!("failed to create IPv6 socket: {e}"),
1014 }
1015
1016 for intf in my_ifaddrs {
1018 let sock_opt = if intf.ip().is_ipv4() {
1019 &ipv4_sock
1020 } else {
1021 &ipv6_sock
1022 };
1023 let Some(sock) = sock_opt else {
1024 debug!(
1025 "no socket available for interface {} with addr {}. Skipped.",
1026 intf.name,
1027 intf.ip()
1028 );
1029 continue;
1030 };
1031
1032 if let Err(e) = join_multicast_group(&sock.pktinfo, &intf) {
1033 debug!("failed to join multicast: {}: {e}. Skipped.", &intf.ip());
1034 }
1035
1036 let if_index = intf.index.unwrap_or(0);
1037
1038 dns_registry_map
1040 .entry(if_index)
1041 .or_insert_with(DnsRegistry::new);
1042
1043 my_intfs
1044 .entry(if_index)
1045 .and_modify(|v: &mut MyIntf| {
1046 v.addrs.insert(intf.addr.clone());
1047 })
1048 .or_insert(MyIntf {
1049 name: intf.name.clone(),
1050 index: if_index,
1051 addrs: HashSet::from([intf.addr]),
1052 });
1053 }
1054
1055 let monitors = Vec::new();
1056 let service_name_len_max = SERVICE_NAME_LEN_MAX_DEFAULT;
1057 let ip_check_interval = IP_CHECK_INTERVAL_IN_SECS_DEFAULT as u64 * 1000;
1058
1059 let timers = BinaryHeap::new();
1060
1061 let if_selections = vec![];
1063
1064 let status = DaemonStatus::Running;
1065
1066 Self {
1067 port,
1068 my_intfs,
1069 ipv4_sock,
1070 ipv6_sock,
1071 my_services: HashMap::new(),
1072 cache: DnsCache::new(),
1073 dns_registry_map,
1074 hostname_resolvers: HashMap::new(),
1075 service_queriers: HashMap::new(),
1076 retransmissions: Vec::new(),
1077 counters: HashMap::new(),
1078 poller,
1079 monitors,
1080 service_name_len_max,
1081 ip_check_interval,
1082 if_selections,
1083 signal_sock,
1084 timers,
1085 status,
1086 pending_resolves: HashSet::new(),
1087 resolved: HashSet::new(),
1088 multicast_loop_v4: true,
1089 multicast_loop_v6: true,
1090 accept_unsolicited: false,
1091
1092 #[cfg(test)]
1093 test_down_interfaces: HashSet::new(),
1094 }
1095 }
1096
1097 fn run(&mut self, receiver: Receiver<Command>) -> Option<Command> {
1106 if let Err(e) = self.poller.registry().register(
1108 &mut self.signal_sock,
1109 mio::Token(SIGNAL_SOCK_EVENT_KEY),
1110 mio::Interest::READABLE,
1111 ) {
1112 debug!("failed to add signal socket to the poller: {}", e);
1113 return None;
1114 }
1115
1116 if let Some(sock) = self.ipv4_sock.as_mut() {
1117 if let Err(e) = self.poller.registry().register(
1118 sock,
1119 mio::Token(IPV4_SOCK_EVENT_KEY),
1120 mio::Interest::READABLE,
1121 ) {
1122 debug!("failed to register ipv4 socket: {}", e);
1123 return None;
1124 }
1125 }
1126
1127 if let Some(sock) = self.ipv6_sock.as_mut() {
1128 if let Err(e) = self.poller.registry().register(
1129 sock,
1130 mio::Token(IPV6_SOCK_EVENT_KEY),
1131 mio::Interest::READABLE,
1132 ) {
1133 debug!("failed to register ipv6 socket: {}", e);
1134 return None;
1135 }
1136 }
1137
1138 let mut next_ip_check = if self.ip_check_interval > 0 {
1140 current_time_millis() + self.ip_check_interval
1141 } else {
1142 0
1143 };
1144
1145 if next_ip_check > 0 {
1146 self.add_timer(next_ip_check);
1147 }
1148
1149 let mut events = mio::Events::with_capacity(1024);
1152 loop {
1153 let now = current_time_millis();
1154
1155 let earliest_timer = self.peek_earliest_timer();
1156 let timeout = earliest_timer.map(|timer| {
1157 let millis = if timer > now { timer - now } else { 1 };
1159 Duration::from_millis(millis)
1160 });
1161
1162 events.clear();
1164 match self.poller.poll(&mut events, timeout) {
1165 Ok(_) => self.handle_poller_events(&events),
1166 Err(e) => debug!("failed to select from sockets: {}", e),
1167 }
1168
1169 let now = current_time_millis();
1170
1171 self.pop_timers_till(now);
1173
1174 for hostname in self
1176 .hostname_resolvers
1177 .clone()
1178 .into_iter()
1179 .filter(|(_, (_, timeout))| timeout.map(|t| now >= t).unwrap_or(false))
1180 .map(|(hostname, _)| hostname)
1181 {
1182 trace!("hostname resolver timeout for {}", &hostname);
1183 call_hostname_resolution_listener(
1184 &self.hostname_resolvers,
1185 &hostname,
1186 HostnameResolutionEvent::SearchTimeout(hostname.to_owned()),
1187 );
1188 call_hostname_resolution_listener(
1189 &self.hostname_resolvers,
1190 &hostname,
1191 HostnameResolutionEvent::SearchStopped(hostname.to_owned()),
1192 );
1193 self.hostname_resolvers.remove(&hostname);
1194 }
1195
1196 while let Ok(command) = receiver.try_recv() {
1198 if matches!(command, Command::Exit(_)) {
1199 self.status = DaemonStatus::Shutdown;
1200 return Some(command);
1201 }
1202 self.exec_command(command, false);
1203 }
1204
1205 let mut i = 0;
1207 while i < self.retransmissions.len() {
1208 if now >= self.retransmissions[i].next_time {
1209 let rerun = self.retransmissions.remove(i);
1210 self.exec_command(rerun.command, true);
1211 } else {
1212 i += 1;
1213 }
1214 }
1215
1216 self.refresh_active_services();
1218
1219 let mut query_count = 0;
1221 for (hostname, _sender) in self.hostname_resolvers.iter() {
1222 for (hostname, ip_addr) in
1223 self.cache.refresh_due_hostname_resolutions(hostname).iter()
1224 {
1225 self.send_query(hostname, ip_address_rr_type(&ip_addr.to_ip_addr()));
1226 query_count += 1;
1227 }
1228 }
1229
1230 self.increase_counter(Counter::CacheRefreshAddr, query_count);
1231
1232 let now = current_time_millis();
1234
1235 let expired_services = self.cache.evict_expired_services(now);
1237 if !expired_services.is_empty() {
1238 debug!(
1239 "run: send {} service removal to listeners",
1240 expired_services.len()
1241 );
1242 self.notify_service_removal(expired_services);
1243 }
1244
1245 let expired_addrs = self.cache.evict_expired_addr(now);
1247 for (hostname, addrs) in expired_addrs {
1248 call_hostname_resolution_listener(
1249 &self.hostname_resolvers,
1250 &hostname,
1251 HostnameResolutionEvent::AddressesRemoved(hostname.clone(), addrs),
1252 );
1253 let instances = self.cache.get_instances_on_host(&hostname);
1254 let instance_set: HashSet<String> = instances.into_iter().collect();
1255 self.resolve_updated_instances(&instance_set);
1256 }
1257
1258 self.probing_handler();
1260
1261 if now >= next_ip_check && next_ip_check > 0 {
1263 next_ip_check = now + self.ip_check_interval;
1264 self.add_timer(next_ip_check);
1265
1266 self.check_ip_changes();
1267 }
1268 }
1269 }
1270
1271 fn process_set_option(&mut self, daemon_opt: DaemonOption) {
1272 match daemon_opt {
1273 DaemonOption::ServiceNameLenMax(length) => self.service_name_len_max = length,
1274 DaemonOption::IpCheckInterval(interval) => self.ip_check_interval = interval,
1275 DaemonOption::EnableInterface(if_kind) => self.enable_interface(if_kind),
1276 DaemonOption::DisableInterface(if_kind) => self.disable_interface(if_kind),
1277 DaemonOption::MulticastLoopV4(on) => self.set_multicast_loop_v4(on),
1278 DaemonOption::MulticastLoopV6(on) => self.set_multicast_loop_v6(on),
1279 DaemonOption::AcceptUnsolicited(accept) => self.set_accept_unsolicited(accept),
1280 #[cfg(test)]
1281 DaemonOption::TestDownInterface(ifname) => {
1282 self.test_down_interfaces.insert(ifname);
1283 }
1284 #[cfg(test)]
1285 DaemonOption::TestUpInterface(ifname) => {
1286 self.test_down_interfaces.remove(&ifname);
1287 }
1288 }
1289 }
1290
1291 fn enable_interface(&mut self, kinds: Vec<IfKind>) {
1292 debug!("enable_interface: {:?}", kinds);
1293 for if_kind in kinds {
1294 self.if_selections.push(IfSelection {
1295 if_kind,
1296 selected: true,
1297 });
1298 }
1299
1300 self.apply_intf_selections(my_ip_interfaces(true));
1301 }
1302
1303 fn disable_interface(&mut self, kinds: Vec<IfKind>) {
1304 debug!("disable_interface: {:?}", kinds);
1305 for if_kind in kinds {
1306 self.if_selections.push(IfSelection {
1307 if_kind,
1308 selected: false,
1309 });
1310 }
1311
1312 self.apply_intf_selections(my_ip_interfaces(true));
1313 }
1314
1315 fn set_multicast_loop_v4(&mut self, on: bool) {
1316 let Some(sock) = self.ipv4_sock.as_mut() else {
1317 return;
1318 };
1319 self.multicast_loop_v4 = on;
1320 sock.pktinfo
1321 .set_multicast_loop_v4(on)
1322 .map_err(|e| e_fmt!("failed to set multicast loop v4: {}", e))
1323 .unwrap();
1324 }
1325
1326 fn set_multicast_loop_v6(&mut self, on: bool) {
1327 let Some(sock) = self.ipv6_sock.as_mut() else {
1328 return;
1329 };
1330 self.multicast_loop_v6 = on;
1331 sock.pktinfo
1332 .set_multicast_loop_v6(on)
1333 .map_err(|e| e_fmt!("failed to set multicast loop v6: {}", e))
1334 .unwrap();
1335 }
1336
1337 fn set_accept_unsolicited(&mut self, accept: bool) {
1338 self.accept_unsolicited = accept;
1339 }
1340
1341 fn notify_monitors(&mut self, event: DaemonEvent) {
1342 self.monitors.retain(|sender| {
1344 if let Err(e) = sender.try_send(event.clone()) {
1345 debug!("notify_monitors: try_send: {}", &e);
1346 if matches!(e, TrySendError::Disconnected(_)) {
1347 return false; }
1349 }
1350 true
1351 });
1352 }
1353
1354 fn del_addr_in_my_services(&mut self, addr: &IpAddr) {
1356 for (_, service_info) in self.my_services.iter_mut() {
1357 if service_info.is_addr_auto() {
1358 service_info.remove_ipaddr(addr);
1359 }
1360 }
1361 }
1362
1363 fn add_timer(&mut self, next_time: u64) {
1364 self.timers.push(Reverse(next_time));
1365 }
1366
1367 fn peek_earliest_timer(&self) -> Option<u64> {
1368 self.timers.peek().map(|Reverse(v)| *v)
1369 }
1370
1371 fn _pop_earliest_timer(&mut self) -> Option<u64> {
1372 self.timers.pop().map(|Reverse(v)| v)
1373 }
1374
1375 fn pop_timers_till(&mut self, now: u64) {
1377 while let Some(Reverse(v)) = self.timers.peek() {
1378 if *v > now {
1379 break;
1380 }
1381 self.timers.pop();
1382 }
1383 }
1384
1385 fn selected_intfs(&self, interfaces: Vec<Interface>) -> HashSet<Interface> {
1387 let intf_count = interfaces.len();
1388 let mut intf_selections = vec![true; intf_count];
1389
1390 for selection in self.if_selections.iter() {
1392 for i in 0..intf_count {
1394 if selection.if_kind.matches(&interfaces[i]) {
1395 intf_selections[i] = selection.selected;
1396 }
1397 }
1398 }
1399
1400 let mut selected_addrs = HashSet::new();
1401 for i in 0..intf_count {
1402 if intf_selections[i] {
1403 selected_addrs.insert(interfaces[i].clone());
1404 }
1405 }
1406
1407 selected_addrs
1408 }
1409
1410 fn apply_intf_selections(&mut self, interfaces: Vec<Interface>) {
1415 let intf_count = interfaces.len();
1417 let mut intf_selections = vec![true; intf_count];
1418
1419 for selection in self.if_selections.iter() {
1421 for i in 0..intf_count {
1423 if selection.if_kind.matches(&interfaces[i]) {
1424 intf_selections[i] = selection.selected;
1425 }
1426 }
1427 }
1428
1429 for (idx, intf) in interfaces.into_iter().enumerate() {
1431 if intf_selections[idx] {
1432 self.add_interface(intf);
1434 } else {
1435 self.del_interface(&intf);
1437 }
1438 }
1439 }
1440
1441 fn del_ip(&mut self, ip: IpAddr) {
1442 self.del_addr_in_my_services(&ip);
1443 self.notify_monitors(DaemonEvent::IpDel(ip));
1444 }
1445
1446 fn check_ip_changes(&mut self) {
1448 let my_ifaddrs = my_ip_interfaces(true);
1450
1451 #[cfg(test)]
1452 let my_ifaddrs: Vec<_> = my_ifaddrs
1453 .into_iter()
1454 .filter(|intf| !self.test_down_interfaces.contains(&intf.name))
1455 .collect();
1456
1457 let ifaddrs_map: HashMap<u32, Vec<&IfAddr>> =
1458 my_ifaddrs.iter().fold(HashMap::new(), |mut acc, intf| {
1459 let if_index = intf.index.unwrap_or(0);
1460 acc.entry(if_index).or_default().push(&intf.addr);
1461 acc
1462 });
1463
1464 let mut deleted_intfs = Vec::new();
1465 let mut deleted_ips = Vec::new();
1466
1467 for (if_index, my_intf) in self.my_intfs.iter_mut() {
1468 let mut last_ipv4 = None;
1469 let mut last_ipv6 = None;
1470
1471 if let Some(current_addrs) = ifaddrs_map.get(if_index) {
1472 my_intf.addrs.retain(|addr| {
1473 if current_addrs.contains(&addr) {
1474 true
1475 } else {
1476 match addr.ip() {
1477 IpAddr::V4(ipv4) => last_ipv4 = Some(ipv4),
1478 IpAddr::V6(ipv6) => last_ipv6 = Some(ipv6),
1479 }
1480 deleted_ips.push(addr.ip());
1481 false
1482 }
1483 });
1484 if my_intf.addrs.is_empty() {
1485 deleted_intfs.push((*if_index, last_ipv4, last_ipv6))
1486 }
1487 } else {
1488 debug!(
1490 "check_ip_changes: interface {} ({}) no longer exists, removing",
1491 my_intf.name, if_index
1492 );
1493 for addr in my_intf.addrs.iter() {
1494 match addr.ip() {
1495 IpAddr::V4(ipv4) => last_ipv4 = Some(ipv4),
1496 IpAddr::V6(ipv6) => last_ipv6 = Some(ipv6),
1497 }
1498 deleted_ips.push(addr.ip())
1499 }
1500 deleted_intfs.push((*if_index, last_ipv4, last_ipv6));
1501 }
1502 }
1503
1504 if !deleted_ips.is_empty() || !deleted_intfs.is_empty() {
1505 debug!(
1506 "check_ip_changes: {} deleted ips {} deleted intfs",
1507 deleted_ips.len(),
1508 deleted_intfs.len()
1509 );
1510 }
1511
1512 for ip in deleted_ips {
1513 self.del_ip(ip);
1514 }
1515
1516 for (if_index, last_ipv4, last_ipv6) in deleted_intfs {
1517 let Some(my_intf) = self.my_intfs.remove(&if_index) else {
1518 continue;
1519 };
1520
1521 if let Some(ipv4) = last_ipv4 {
1522 debug!("leave multicast for {ipv4}");
1523 if let Some(sock) = self.ipv4_sock.as_mut() {
1524 if let Err(e) = sock.pktinfo.leave_multicast_v4(&GROUP_ADDR_V4, &ipv4) {
1525 debug!("leave multicast group for addr {ipv4}: {e}");
1526 }
1527 }
1528 }
1529
1530 if let Some(ipv6) = last_ipv6 {
1531 debug!("leave multicast for {ipv6}");
1532 if let Some(sock) = self.ipv6_sock.as_mut() {
1533 if let Err(e) = sock
1534 .pktinfo
1535 .leave_multicast_v6(&GROUP_ADDR_V6, my_intf.index)
1536 {
1537 debug!("leave multicast group for IPv6: {ipv6}: {e}");
1538 }
1539 }
1540 }
1541
1542 let intf_id = InterfaceId {
1544 name: my_intf.name.to_string(),
1545 index: my_intf.index,
1546 };
1547 let removed_instances = self.cache.remove_records_on_intf(intf_id);
1548 self.notify_service_removal(removed_instances);
1549 }
1550
1551 self.apply_intf_selections(my_ifaddrs);
1553 }
1554
1555 fn del_interface(&mut self, intf: &Interface) {
1556 let if_index = intf.index.unwrap_or(0);
1557 trace!(
1558 "del_interface: {} ({if_index}) addr {}",
1559 intf.name,
1560 intf.ip()
1561 );
1562
1563 let Some(my_intf) = self.my_intfs.get_mut(&if_index) else {
1564 debug!("del_interface: interface {} not found", intf.name);
1565 return;
1566 };
1567
1568 let mut ip_removed = false;
1569
1570 if my_intf.addrs.remove(&intf.addr) {
1571 ip_removed = true;
1572
1573 match intf.addr.ip() {
1574 IpAddr::V4(ipv4) => {
1575 if my_intf.next_ifaddr_v4().is_none() {
1576 if let Some(sock) = self.ipv4_sock.as_mut() {
1577 if let Err(e) = sock.pktinfo.leave_multicast_v4(&GROUP_ADDR_V4, &ipv4) {
1578 debug!("leave multicast group for addr {ipv4}: {e}");
1579 }
1580 }
1581 }
1582 }
1583
1584 IpAddr::V6(ipv6) => {
1585 if my_intf.next_ifaddr_v6().is_none() {
1586 if let Some(sock) = self.ipv6_sock.as_mut() {
1587 if let Err(e) =
1588 sock.pktinfo.leave_multicast_v6(&GROUP_ADDR_V6, if_index)
1589 {
1590 debug!("leave multicast group for addr {ipv6}: {e}");
1591 }
1592 }
1593 }
1594 }
1595 }
1596
1597 if my_intf.addrs.is_empty() {
1598 debug!("del_interface: removing interface {}", intf.name);
1600 self.my_intfs.remove(&if_index);
1601 self.dns_registry_map.remove(&if_index);
1602 self.cache.remove_addrs_on_disabled_intf(if_index);
1603 }
1604 }
1605
1606 if ip_removed {
1607 self.notify_monitors(DaemonEvent::IpDel(intf.ip()));
1609 self.del_addr_in_my_services(&intf.ip());
1611 }
1612 }
1613
1614 fn add_interface(&mut self, intf: Interface) {
1615 let sock_opt = if intf.ip().is_ipv4() {
1616 &self.ipv4_sock
1617 } else {
1618 &self.ipv6_sock
1619 };
1620
1621 let Some(sock) = sock_opt else {
1622 debug!(
1623 "add_interface: no socket available for interface {} with addr {}. Skipped.",
1624 intf.name,
1625 intf.ip()
1626 );
1627 return;
1628 };
1629
1630 let if_index = intf.index.unwrap_or(0);
1631 let mut new_addr = false;
1632
1633 match self.my_intfs.entry(if_index) {
1634 Entry::Occupied(mut entry) => {
1635 let my_intf = entry.get_mut();
1637 if !my_intf.addrs.contains(&intf.addr) {
1638 if let Err(e) = join_multicast_group(&sock.pktinfo, &intf) {
1639 debug!("add_interface: socket_config {}: {e}", &intf.name);
1640 }
1641 my_intf.addrs.insert(intf.addr.clone());
1642 new_addr = true;
1643 }
1644 }
1645 Entry::Vacant(entry) => {
1646 if let Err(e) = join_multicast_group(&sock.pktinfo, &intf) {
1647 debug!("add_interface: socket_config {}: {e}. Skipped.", &intf.name);
1648 return;
1649 }
1650
1651 new_addr = true;
1652 let new_intf = MyIntf {
1653 name: intf.name.clone(),
1654 index: if_index,
1655 addrs: HashSet::from([intf.addr.clone()]),
1656 };
1657 entry.insert(new_intf);
1658 }
1659 }
1660
1661 if !new_addr {
1662 trace!("add_interface: interface {} already exists", &intf.name);
1663 return;
1664 }
1665
1666 debug!("add new interface {}: {}", intf.name, intf.ip());
1667
1668 let Some(my_intf) = self.my_intfs.get(&if_index) else {
1669 debug!("add_interface: cannot find if_index {if_index}");
1670 return;
1671 };
1672
1673 let dns_registry = match self.dns_registry_map.get_mut(&if_index) {
1674 Some(registry) => registry,
1675 None => self
1676 .dns_registry_map
1677 .entry(if_index)
1678 .or_insert_with(DnsRegistry::new),
1679 };
1680
1681 for (_, service_info) in self.my_services.iter_mut() {
1682 if service_info.is_addr_auto() {
1683 service_info.insert_ipaddr(&intf);
1684
1685 if announce_service_on_intf(
1686 dns_registry,
1687 service_info,
1688 my_intf,
1689 &sock.pktinfo,
1690 self.port,
1691 ) {
1692 debug!(
1693 "Announce service {} on {}",
1694 service_info.get_fullname(),
1695 intf.ip()
1696 );
1697 service_info.set_status(if_index, ServiceStatus::Announced);
1698 } else {
1699 for timer in dns_registry.new_timers.drain(..) {
1700 self.timers.push(Reverse(timer));
1701 }
1702 service_info.set_status(if_index, ServiceStatus::Probing);
1703 }
1704 }
1705 }
1706
1707 let mut browse_reruns = Vec::new();
1709 let mut i = 0;
1710 while i < self.retransmissions.len() {
1711 if matches!(self.retransmissions[i].command, Command::Browse(..)) {
1712 browse_reruns.push(self.retransmissions.remove(i));
1713 } else {
1714 i += 1;
1715 }
1716 }
1717
1718 for rerun in browse_reruns {
1719 self.exec_command(rerun.command, true);
1720 }
1721
1722 self.notify_monitors(DaemonEvent::IpAdd(intf.ip()));
1724 }
1725
1726 fn register_service(&mut self, mut info: ServiceInfo) {
1735 if let Err(e) = check_service_name_length(info.get_type(), self.service_name_len_max) {
1737 debug!("check_service_name_length: {}", &e);
1738 self.notify_monitors(DaemonEvent::Error(e));
1739 return;
1740 }
1741
1742 if info.is_addr_auto() {
1743 let selected_intfs = self.selected_intfs(my_ip_interfaces(true));
1744 for intf in selected_intfs {
1745 info.insert_ipaddr(&intf);
1746 }
1747 }
1748
1749 debug!("register service {:?}", &info);
1750
1751 let outgoing_addrs = self.send_unsolicited_response(&mut info);
1752 if !outgoing_addrs.is_empty() {
1753 self.notify_monitors(DaemonEvent::Announce(
1754 info.get_fullname().to_string(),
1755 format!("{:?}", &outgoing_addrs),
1756 ));
1757 }
1758
1759 let service_fullname = info.get_fullname().to_lowercase();
1762 self.my_services.insert(service_fullname, info);
1763 }
1764
1765 fn send_unsolicited_response(&mut self, info: &mut ServiceInfo) -> Vec<IpAddr> {
1768 let mut outgoing_addrs = Vec::new();
1769 let mut outgoing_intfs = HashSet::new();
1770
1771 for (if_index, intf) in self.my_intfs.iter() {
1772 let dns_registry = match self.dns_registry_map.get_mut(if_index) {
1773 Some(registry) => registry,
1774 None => self
1775 .dns_registry_map
1776 .entry(*if_index)
1777 .or_insert_with(DnsRegistry::new),
1778 };
1779
1780 let mut announced = false;
1781
1782 if let Some(sock) = self.ipv4_sock.as_mut() {
1784 if announce_service_on_intf(dns_registry, info, intf, &sock.pktinfo, self.port) {
1785 for addr in intf.addrs.iter().filter(|a| a.ip().is_ipv4()) {
1786 outgoing_addrs.push(addr.ip());
1787 }
1788 outgoing_intfs.insert(intf.index);
1789
1790 debug!(
1791 "Announce service IPv4 {} on {}",
1792 info.get_fullname(),
1793 intf.name
1794 );
1795 announced = true;
1796 }
1797 }
1798
1799 if let Some(sock) = self.ipv6_sock.as_mut() {
1800 if announce_service_on_intf(dns_registry, info, intf, &sock.pktinfo, self.port) {
1801 for addr in intf.addrs.iter().filter(|a| a.ip().is_ipv6()) {
1802 outgoing_addrs.push(addr.ip());
1803 }
1804 outgoing_intfs.insert(intf.index);
1805
1806 debug!(
1807 "Announce service IPv6 {} on {}",
1808 info.get_fullname(),
1809 intf.name
1810 );
1811 announced = true;
1812 }
1813 }
1814
1815 if announced {
1816 info.set_status(intf.index, ServiceStatus::Announced);
1817 } else {
1818 for timer in dns_registry.new_timers.drain(..) {
1819 self.timers.push(Reverse(timer));
1820 }
1821 info.set_status(*if_index, ServiceStatus::Probing);
1822 }
1823 }
1824
1825 let next_time = current_time_millis() + 1000;
1829 for if_index in outgoing_intfs {
1830 self.add_retransmission(
1831 next_time,
1832 Command::RegisterResend(info.get_fullname().to_string(), if_index),
1833 );
1834 }
1835
1836 outgoing_addrs
1837 }
1838
1839 fn probing_handler(&mut self) {
1841 let now = current_time_millis();
1842
1843 for (if_index, intf) in self.my_intfs.iter() {
1844 let Some(dns_registry) = self.dns_registry_map.get_mut(if_index) else {
1845 continue;
1846 };
1847
1848 let (out, expired_probes) = check_probing(dns_registry, &mut self.timers, now);
1849
1850 if !out.questions().is_empty() {
1852 trace!("sending out probing of questions: {:?}", out.questions());
1853 if let Some(sock) = self.ipv4_sock.as_mut() {
1854 send_dns_outgoing(&out, intf, &sock.pktinfo, self.port);
1855 }
1856 if let Some(sock) = self.ipv6_sock.as_mut() {
1857 send_dns_outgoing(&out, intf, &sock.pktinfo, self.port);
1858 }
1859 }
1860
1861 let waiting_services =
1863 handle_expired_probes(expired_probes, &intf.name, dns_registry, &mut self.monitors);
1864
1865 for service_name in waiting_services {
1866 if let Some(info) = self.my_services.get_mut(&service_name.to_lowercase()) {
1868 if info.get_status(*if_index) == ServiceStatus::Announced {
1869 debug!("service {} already announced", info.get_fullname());
1870 continue;
1871 }
1872
1873 let announced_v4 = if let Some(sock) = self.ipv4_sock.as_mut() {
1874 announce_service_on_intf(dns_registry, info, intf, &sock.pktinfo, self.port)
1875 } else {
1876 false
1877 };
1878 let announced_v6 = if let Some(sock) = self.ipv6_sock.as_mut() {
1879 announce_service_on_intf(dns_registry, info, intf, &sock.pktinfo, self.port)
1880 } else {
1881 false
1882 };
1883
1884 if announced_v4 || announced_v6 {
1885 let next_time = now + 1000;
1886 let command =
1887 Command::RegisterResend(info.get_fullname().to_string(), *if_index);
1888 self.retransmissions.push(ReRun { next_time, command });
1889 self.timers.push(Reverse(next_time));
1890
1891 let fullname = match dns_registry.name_changes.get(&service_name) {
1892 Some(new_name) => new_name.to_string(),
1893 None => service_name.to_string(),
1894 };
1895
1896 let mut hostname = info.get_hostname();
1897 if let Some(new_name) = dns_registry.name_changes.get(hostname) {
1898 hostname = new_name;
1899 }
1900
1901 debug!("wake up: announce service {} on {}", fullname, intf.name);
1902 notify_monitors(
1903 &mut self.monitors,
1904 DaemonEvent::Announce(fullname, format!("{}:{}", hostname, &intf.name)),
1905 );
1906
1907 info.set_status(*if_index, ServiceStatus::Announced);
1908 }
1909 }
1910 }
1911 }
1912 }
1913
1914 fn unregister_service(
1915 &self,
1916 info: &ServiceInfo,
1917 intf: &MyIntf,
1918 sock: &PktInfoUdpSocket,
1919 ) -> Vec<u8> {
1920 let is_ipv4 = sock.domain() == Domain::IPV4;
1921
1922 let mut out = DnsOutgoing::new(FLAGS_QR_RESPONSE | FLAGS_AA);
1923 out.add_answer_at_time(
1924 DnsPointer::new(
1925 info.get_type(),
1926 RRType::PTR,
1927 CLASS_IN,
1928 0,
1929 info.get_fullname().to_string(),
1930 ),
1931 0,
1932 );
1933
1934 if let Some(sub) = info.get_subtype() {
1935 trace!("Adding subdomain {}", sub);
1936 out.add_answer_at_time(
1937 DnsPointer::new(
1938 sub,
1939 RRType::PTR,
1940 CLASS_IN,
1941 0,
1942 info.get_fullname().to_string(),
1943 ),
1944 0,
1945 );
1946 }
1947
1948 out.add_answer_at_time(
1949 DnsSrv::new(
1950 info.get_fullname(),
1951 CLASS_IN | CLASS_CACHE_FLUSH,
1952 0,
1953 info.get_priority(),
1954 info.get_weight(),
1955 info.get_port(),
1956 info.get_hostname().to_string(),
1957 ),
1958 0,
1959 );
1960 out.add_answer_at_time(
1961 DnsTxt::new(
1962 info.get_fullname(),
1963 CLASS_IN | CLASS_CACHE_FLUSH,
1964 0,
1965 info.generate_txt(),
1966 ),
1967 0,
1968 );
1969
1970 let if_addrs = if is_ipv4 {
1971 info.get_addrs_on_my_intf_v4(intf)
1972 } else {
1973 info.get_addrs_on_my_intf_v6(intf)
1974 };
1975
1976 if if_addrs.is_empty() {
1977 return vec![];
1978 }
1979
1980 for address in if_addrs {
1981 out.add_answer_at_time(
1982 DnsAddress::new(
1983 info.get_hostname(),
1984 ip_address_rr_type(&address),
1985 CLASS_IN | CLASS_CACHE_FLUSH,
1986 0,
1987 address,
1988 intf.into(),
1989 ),
1990 0,
1991 );
1992 }
1993
1994 send_dns_outgoing(&out, intf, sock, self.port)
1996 .into_iter()
1997 .next()
1998 .unwrap_or_default()
1999 }
2000
2001 fn add_hostname_resolver(
2005 &mut self,
2006 hostname: String,
2007 listener: Sender<HostnameResolutionEvent>,
2008 timeout: Option<u64>,
2009 ) {
2010 let real_timeout = timeout.map(|t| current_time_millis() + t);
2011 self.hostname_resolvers
2012 .insert(hostname.to_lowercase(), (listener, real_timeout));
2013 if let Some(t) = real_timeout {
2014 self.add_timer(t);
2015 }
2016 }
2017
2018 fn send_query(&self, name: &str, qtype: RRType) {
2020 self.send_query_vec(&[(name, qtype)]);
2021 }
2022
2023 fn send_query_vec(&self, questions: &[(&str, RRType)]) {
2025 let mut out = DnsOutgoing::new(FLAGS_QR_QUERY);
2026 let now = current_time_millis();
2027
2028 for (name, qtype) in questions {
2029 out.add_question(name, *qtype);
2030
2031 for record in self.cache.get_known_answers(name, *qtype, now) {
2032 trace!("add known answer: {:?}", record.record);
2040 let mut new_record = record.record.clone();
2041 new_record.get_record_mut().update_ttl(now);
2042 out.add_answer_box(new_record);
2043 }
2044 }
2045
2046 for (_, intf) in self.my_intfs.iter() {
2047 if let Some(sock) = self.ipv4_sock.as_ref() {
2048 send_dns_outgoing(&out, intf, &sock.pktinfo, self.port);
2049 }
2050 if let Some(sock) = self.ipv6_sock.as_ref() {
2051 send_dns_outgoing(&out, intf, &sock.pktinfo, self.port);
2052 }
2053 }
2054 }
2055
2056 fn handle_read(&mut self, event_key: usize) -> bool {
2061 let sock_opt = match event_key {
2062 IPV4_SOCK_EVENT_KEY => &mut self.ipv4_sock,
2063 IPV6_SOCK_EVENT_KEY => &mut self.ipv6_sock,
2064 _ => {
2065 debug!("handle_read: unknown token {}", event_key);
2066 return false;
2067 }
2068 };
2069 let Some(sock) = sock_opt.as_mut() else {
2070 debug!("handle_read: socket not available for token {}", event_key);
2071 return false;
2072 };
2073 let mut buf = vec![0u8; MAX_MSG_ABSOLUTE];
2074
2075 let (sz, pktinfo) = match sock.pktinfo.recv(&mut buf) {
2082 Ok(sz) => sz,
2083 Err(e) => {
2084 if e.kind() != std::io::ErrorKind::WouldBlock {
2085 debug!("listening socket read failed: {}", e);
2086 }
2087 return false;
2088 }
2089 };
2090
2091 let pkt_if_index = pktinfo.if_index as u32;
2093 let Some(my_intf) = self.my_intfs.get(&pkt_if_index) else {
2094 debug!(
2095 "handle_read: no interface found for pktinfo if_index: {}",
2096 pktinfo.if_index
2097 );
2098 return true; };
2100
2101 buf.truncate(sz); match DnsIncoming::new(buf, my_intf.into()) {
2104 Ok(msg) => {
2105 if msg.is_query() {
2106 self.handle_query(msg, pkt_if_index, event_key == IPV4_SOCK_EVENT_KEY);
2107 } else if msg.is_response() {
2108 self.handle_response(msg, pkt_if_index);
2109 } else {
2110 debug!("Invalid message: not query and not response");
2111 }
2112 }
2113 Err(e) => debug!("Invalid incoming DNS message: {}", e),
2114 }
2115
2116 true
2117 }
2118
2119 fn query_unresolved(&mut self, instance: &str) -> bool {
2121 if !valid_instance_name(instance) {
2122 trace!("instance name {} not valid", instance);
2123 return false;
2124 }
2125
2126 if let Some(records) = self.cache.get_srv(instance) {
2127 for record in records {
2128 if let Some(srv) = record.record.any().downcast_ref::<DnsSrv>() {
2129 if self.cache.get_addr(srv.host()).is_none() {
2130 self.send_query_vec(&[(srv.host(), RRType::A), (srv.host(), RRType::AAAA)]);
2131 return true;
2132 }
2133 }
2134 }
2135 } else {
2136 self.send_query(instance, RRType::ANY);
2137 return true;
2138 }
2139
2140 false
2141 }
2142
2143 fn query_cache_for_service(
2146 &mut self,
2147 ty_domain: &str,
2148 sender: &Sender<ServiceEvent>,
2149 now: u64,
2150 ) {
2151 let mut resolved: HashSet<String> = HashSet::new();
2152 let mut unresolved: HashSet<String> = HashSet::new();
2153
2154 if let Some(records) = self.cache.get_ptr(ty_domain) {
2155 for record in records.iter().filter(|r| !r.record.expires_soon(now)) {
2156 if let Some(ptr) = record.record.any().downcast_ref::<DnsPointer>() {
2157 let mut new_event = None;
2158 match self.resolve_service_from_cache(ty_domain, ptr.alias()) {
2159 Ok(resolved_service) => {
2160 if resolved_service.is_valid() {
2161 debug!("Resolved service from cache: {}", ptr.alias());
2162 new_event =
2163 Some(ServiceEvent::ServiceResolved(Box::new(resolved_service)));
2164 } else {
2165 debug!("Resolved service is not valid: {}", ptr.alias());
2166 }
2167 }
2168 Err(err) => {
2169 debug!("Error while resolving service from cache: {}", err);
2170 continue;
2171 }
2172 }
2173
2174 match sender.send(ServiceEvent::ServiceFound(
2175 ty_domain.to_string(),
2176 ptr.alias().to_string(),
2177 )) {
2178 Ok(()) => debug!("sent service found {}", ptr.alias()),
2179 Err(e) => {
2180 debug!("failed to send service found: {}", e);
2181 continue;
2182 }
2183 }
2184
2185 if let Some(event) = new_event {
2186 resolved.insert(ptr.alias().to_string());
2187 match sender.send(event) {
2188 Ok(()) => debug!("sent service resolved: {}", ptr.alias()),
2189 Err(e) => debug!("failed to send service resolved: {}", e),
2190 }
2191 } else {
2192 unresolved.insert(ptr.alias().to_string());
2193 }
2194 }
2195 }
2196 }
2197
2198 for instance in resolved.drain() {
2199 self.pending_resolves.remove(&instance);
2200 self.resolved.insert(instance);
2201 }
2202
2203 for instance in unresolved.drain() {
2204 self.add_pending_resolve(instance);
2205 }
2206 }
2207
2208 fn query_cache_for_hostname(
2211 &mut self,
2212 hostname: &str,
2213 sender: Sender<HostnameResolutionEvent>,
2214 ) {
2215 let addresses_map = self.cache.get_addresses_for_host(hostname);
2216 for (name, addresses) in addresses_map {
2217 match sender.send(HostnameResolutionEvent::AddressesFound(name, addresses)) {
2218 Ok(()) => trace!("sent hostname addresses found"),
2219 Err(e) => debug!("failed to send hostname addresses found: {}", e),
2220 }
2221 }
2222 }
2223
2224 fn add_pending_resolve(&mut self, instance: String) {
2225 if !self.pending_resolves.contains(&instance) {
2226 let next_time = current_time_millis() + RESOLVE_WAIT_IN_MILLIS;
2227 self.add_retransmission(next_time, Command::Resolve(instance.clone(), 1));
2228 self.pending_resolves.insert(instance);
2229 }
2230 }
2231
2232 fn resolve_service_from_cache(
2234 &self,
2235 ty_domain: &str,
2236 fullname: &str,
2237 ) -> Result<ResolvedService> {
2238 let now = current_time_millis();
2239 let mut resolved_service = ResolvedService {
2240 ty_domain: ty_domain.to_string(),
2241 sub_ty_domain: None,
2242 fullname: fullname.to_string(),
2243 host: String::new(),
2244 port: 0,
2245 addresses: HashSet::new(),
2246 txt_properties: TxtProperties::new(),
2247 };
2248
2249 if let Some(subtype) = self.cache.get_subtype(fullname) {
2251 trace!(
2252 "ty_domain: {} found subtype {} for instance: {}",
2253 ty_domain,
2254 subtype,
2255 fullname
2256 );
2257 if resolved_service.sub_ty_domain.is_none() {
2258 resolved_service.sub_ty_domain = Some(subtype.to_string());
2259 }
2260 }
2261
2262 if let Some(records) = self.cache.get_srv(fullname) {
2264 if let Some(answer) = records.iter().find(|r| !r.record.expires_soon(now)) {
2265 if let Some(dns_srv) = answer.record.any().downcast_ref::<DnsSrv>() {
2266 resolved_service.host = dns_srv.host().to_string();
2267 resolved_service.port = dns_srv.port();
2268 }
2269 }
2270 }
2271
2272 if let Some(records) = self.cache.get_txt(fullname) {
2274 if let Some(record) = records.iter().find(|r| !r.record.expires_soon(now)) {
2275 if let Some(dns_txt) = record.record.any().downcast_ref::<DnsTxt>() {
2276 resolved_service.txt_properties = dns_txt.text().into();
2277 }
2278 }
2279 }
2280
2281 if let Some(records) = self.cache.get_addr(&resolved_service.host) {
2283 for answer in records.iter() {
2284 if let Some(dns_a) = answer.record.any().downcast_ref::<DnsAddress>() {
2285 if dns_a.expires_soon(now) {
2286 trace!(
2287 "Addr expired or expires soon: {}",
2288 dns_a.address().to_ip_addr()
2289 );
2290 } else {
2291 resolved_service.addresses.insert(dns_a.address());
2292 }
2293 }
2294 }
2295 }
2296
2297 Ok(resolved_service)
2298 }
2299
2300 fn handle_poller_events(&mut self, events: &mio::Events) {
2301 for ev in events.iter() {
2302 trace!("event received with key {:?}", ev.token());
2303 if ev.token().0 == SIGNAL_SOCK_EVENT_KEY {
2304 self.signal_sock_drain();
2306
2307 if let Err(e) = self.poller.registry().reregister(
2308 &mut self.signal_sock,
2309 ev.token(),
2310 mio::Interest::READABLE,
2311 ) {
2312 debug!("failed to modify poller for signal socket: {}", e);
2313 }
2314 continue; }
2316
2317 while self.handle_read(ev.token().0) {}
2319
2320 if ev.token().0 == IPV4_SOCK_EVENT_KEY {
2322 if let Some(sock) = self.ipv4_sock.as_mut() {
2324 if let Err(e) =
2325 self.poller
2326 .registry()
2327 .reregister(sock, ev.token(), mio::Interest::READABLE)
2328 {
2329 debug!("modify poller for IPv4 socket: {}", e);
2330 }
2331 }
2332 } else if ev.token().0 == IPV6_SOCK_EVENT_KEY {
2333 if let Some(sock) = self.ipv6_sock.as_mut() {
2335 if let Err(e) =
2336 self.poller
2337 .registry()
2338 .reregister(sock, ev.token(), mio::Interest::READABLE)
2339 {
2340 debug!("modify poller for IPv6 socket: {}", e);
2341 }
2342 }
2343 }
2344 }
2345 }
2346
2347 fn handle_response(&mut self, mut msg: DnsIncoming, if_index: u32) {
2350 let now = current_time_millis();
2351
2352 let mut record_predicate = |record: &DnsRecordBox| {
2354 if !record.get_record().is_expired(now) {
2355 return true;
2356 }
2357
2358 debug!("record is expired, removing it from cache.");
2359 if self.cache.remove(record) {
2360 if let Some(dns_ptr) = record.any().downcast_ref::<DnsPointer>() {
2362 call_service_listener(
2363 &self.service_queriers,
2364 dns_ptr.get_name(),
2365 ServiceEvent::ServiceRemoved(
2366 dns_ptr.get_name().to_string(),
2367 dns_ptr.alias().to_string(),
2368 ),
2369 );
2370 }
2371 }
2372 false
2373 };
2374 msg.answers_mut().retain(&mut record_predicate);
2375 msg.authorities_mut().retain(&mut record_predicate);
2376 msg.additionals_mut().retain(&mut record_predicate);
2377
2378 self.conflict_handler(&msg, if_index);
2380
2381 let mut is_for_us = true; for answer in msg.answers() {
2388 if answer.get_type() == RRType::PTR {
2389 if self.service_queriers.contains_key(answer.get_name()) {
2390 is_for_us = true;
2391 break; } else {
2393 is_for_us = false;
2394 }
2395 } else if answer.get_type() == RRType::A || answer.get_type() == RRType::AAAA {
2396 let answer_lowercase = answer.get_name().to_lowercase();
2398 if self.hostname_resolvers.contains_key(&answer_lowercase) {
2399 is_for_us = true;
2400 break; }
2402 }
2403 }
2404
2405 if self.accept_unsolicited {
2407 is_for_us = true;
2408 }
2409
2410 struct InstanceChange {
2412 ty: RRType, name: String, }
2415
2416 let mut changes = Vec::new();
2424 let mut timers = Vec::new();
2425 let Some(my_intf) = self.my_intfs.get(&if_index) else {
2426 return;
2427 };
2428 for record in msg.all_records() {
2429 match self
2430 .cache
2431 .add_or_update(my_intf, record, &mut timers, is_for_us)
2432 {
2433 Some((dns_record, true)) => {
2434 timers.push(dns_record.record.get_record().get_expire_time());
2435 timers.push(dns_record.record.get_record().get_refresh_time());
2436
2437 let ty = dns_record.record.get_type();
2438 let name = dns_record.record.get_name();
2439
2440 if ty == RRType::PTR && dns_record.record.get_record().get_ttl() > 1 {
2442 if self.service_queriers.contains_key(name) {
2443 timers.push(dns_record.record.get_record().get_refresh_time());
2444 }
2445
2446 if let Some(dns_ptr) = dns_record.record.any().downcast_ref::<DnsPointer>()
2448 {
2449 debug!("calling listener with service found: {name}");
2450 call_service_listener(
2451 &self.service_queriers,
2452 name,
2453 ServiceEvent::ServiceFound(
2454 name.to_string(),
2455 dns_ptr.alias().to_string(),
2456 ),
2457 );
2458 changes.push(InstanceChange {
2459 ty,
2460 name: dns_ptr.alias().to_string(),
2461 });
2462 }
2463 } else {
2464 changes.push(InstanceChange {
2465 ty,
2466 name: name.to_string(),
2467 });
2468 }
2469 }
2470 Some((dns_record, false)) => {
2471 timers.push(dns_record.record.get_record().get_expire_time());
2472 timers.push(dns_record.record.get_record().get_refresh_time());
2473 }
2474 _ => {}
2475 }
2476 }
2477
2478 for t in timers {
2480 self.add_timer(t);
2481 }
2482
2483 for change in changes
2485 .iter()
2486 .filter(|change| change.ty == RRType::A || change.ty == RRType::AAAA)
2487 {
2488 let addr_map = self.cache.get_addresses_for_host(&change.name);
2489 for (name, addresses) in addr_map {
2490 call_hostname_resolution_listener(
2491 &self.hostname_resolvers,
2492 &change.name,
2493 HostnameResolutionEvent::AddressesFound(name, addresses),
2494 )
2495 }
2496 }
2497
2498 let mut updated_instances = HashSet::new();
2500 for update in changes {
2501 match update.ty {
2502 RRType::PTR | RRType::SRV | RRType::TXT => {
2503 updated_instances.insert(update.name);
2504 }
2505 RRType::A | RRType::AAAA => {
2506 let instances = self.cache.get_instances_on_host(&update.name);
2507 updated_instances.extend(instances);
2508 }
2509 _ => {}
2510 }
2511 }
2512
2513 self.resolve_updated_instances(&updated_instances);
2514 }
2515
2516 fn conflict_handler(&mut self, msg: &DnsIncoming, if_index: u32) {
2517 let Some(my_intf) = self.my_intfs.get(&if_index) else {
2518 debug!("handle_response: no intf found for index {if_index}");
2519 return;
2520 };
2521
2522 let Some(dns_registry) = self.dns_registry_map.get_mut(&if_index) else {
2523 return;
2524 };
2525
2526 for answer in msg.answers().iter() {
2527 let mut new_records = Vec::new();
2528
2529 let name = answer.get_name();
2530 let Some(probe) = dns_registry.probing.get_mut(name) else {
2531 continue;
2532 };
2533
2534 if answer.get_type() == RRType::A || answer.get_type() == RRType::AAAA {
2536 if let Some(answer_addr) = answer.any().downcast_ref::<DnsAddress>() {
2537 if answer_addr.interface_id.index != if_index {
2538 debug!(
2539 "conflict handler: answer addr {:?} not in the subnet of intf {}",
2540 answer_addr, my_intf.name
2541 );
2542 continue;
2543 }
2544 }
2545
2546 let any_match = probe.records.iter().any(|r| {
2549 r.get_type() == answer.get_type()
2550 && r.get_class() == answer.get_class()
2551 && r.rrdata_match(answer.as_ref())
2552 });
2553 if any_match {
2554 continue; }
2556 }
2557
2558 probe.records.retain(|record| {
2559 if record.get_type() == answer.get_type()
2560 && record.get_class() == answer.get_class()
2561 && !record.rrdata_match(answer.as_ref())
2562 {
2563 debug!(
2564 "found conflict name: '{name}' record: {}: {} PEER: {}",
2565 record.get_type(),
2566 record.rdata_print(),
2567 answer.rdata_print()
2568 );
2569
2570 let mut new_record = record.clone();
2573 let new_name = match record.get_type() {
2574 RRType::A => hostname_change(name),
2575 RRType::AAAA => hostname_change(name),
2576 _ => name_change(name),
2577 };
2578 new_record.get_record_mut().set_new_name(new_name);
2579 new_records.push(new_record);
2580 return false; }
2582
2583 true
2584 });
2585
2586 let create_time = current_time_millis() + fastrand::u64(0..250);
2593
2594 let waiting_services = probe.waiting_services.clone();
2595
2596 for record in new_records {
2597 if dns_registry.update_hostname(name, record.get_name(), create_time) {
2598 self.timers.push(Reverse(create_time));
2599 }
2600
2601 dns_registry.name_changes.insert(
2603 record.get_record().get_original_name().to_string(),
2604 record.get_name().to_string(),
2605 );
2606
2607 let new_probe = match dns_registry.probing.get_mut(record.get_name()) {
2608 Some(p) => p,
2609 None => {
2610 let new_probe = dns_registry
2611 .probing
2612 .entry(record.get_name().to_string())
2613 .or_insert_with(|| {
2614 debug!("conflict handler: new probe of {}", record.get_name());
2615 Probe::new(create_time)
2616 });
2617 self.timers.push(Reverse(new_probe.next_send));
2618 new_probe
2619 }
2620 };
2621
2622 debug!(
2623 "insert record with new name '{}' {} into probe",
2624 record.get_name(),
2625 record.get_type()
2626 );
2627 new_probe.insert_record(record);
2628
2629 new_probe.waiting_services.extend(waiting_services.clone());
2630 }
2631 }
2632 }
2633
2634 fn resolve_updated_instances(&mut self, updated_instances: &HashSet<String>) {
2641 let mut resolved: HashSet<String> = HashSet::new();
2642 let mut unresolved: HashSet<String> = HashSet::new();
2643 let mut removed_instances = HashMap::new();
2644
2645 let now = current_time_millis();
2646
2647 for (ty_domain, records) in self.cache.all_ptr().iter() {
2648 if !self.service_queriers.contains_key(ty_domain) {
2649 continue;
2651 }
2652
2653 for ptr in records.iter().filter(|r| !r.record.expires_soon(now)) {
2654 let Some(dns_ptr) = ptr.record.any().downcast_ref::<DnsPointer>() else {
2655 continue;
2656 };
2657
2658 let instance = dns_ptr.alias();
2659 if !updated_instances.contains(instance) {
2660 continue;
2661 }
2662
2663 let Ok(resolved_service) = self.resolve_service_from_cache(ty_domain, instance)
2664 else {
2665 continue;
2666 };
2667
2668 debug!("resolve_updated_instances: from cache: {instance}");
2669 if resolved_service.is_valid() {
2670 debug!("call queriers to resolve {instance}");
2671 resolved.insert(instance.to_string());
2672 let event = ServiceEvent::ServiceResolved(Box::new(resolved_service));
2673 call_service_listener(&self.service_queriers, ty_domain, event);
2674 } else {
2675 debug!("Resolved service is not valid: {instance}");
2676 if self.resolved.remove(dns_ptr.alias()) {
2677 removed_instances
2678 .entry(ty_domain.to_string())
2679 .or_insert_with(HashSet::new)
2680 .insert(instance.to_string());
2681 }
2682 unresolved.insert(instance.to_string());
2683 }
2684 }
2685 }
2686
2687 for instance in resolved.drain() {
2688 self.pending_resolves.remove(&instance);
2689 self.resolved.insert(instance);
2690 }
2691
2692 for instance in unresolved.drain() {
2693 self.add_pending_resolve(instance);
2694 }
2695
2696 if !removed_instances.is_empty() {
2697 debug!(
2698 "resolve_updated_instances: removed {}",
2699 &removed_instances.len()
2700 );
2701 self.notify_service_removal(removed_instances);
2702 }
2703 }
2704
    /// Handles an incoming DNS query that arrived on interface `if_index`.
    ///
    /// Builds one authoritative response (`FLAGS_QR_RESPONSE | FLAGS_AA`) out of
    /// the services in `my_services` whose status on this interface is
    /// `ServiceStatus::Announced`, and sends it if at least one answer was added.
    ///
    /// * `msg` - the incoming message whose questions are answered.
    /// * `if_index` - interface index, used to look up the DNS registry and the
    ///   `MyIntf` for this interface.
    /// * `is_ipv4` - selects the IPv4 or IPv6 socket and the address family of
    ///   the addresses put into the answers.
    fn handle_query(&mut self, msg: DnsIncoming, if_index: u32, is_ipv4: bool) {
        // Pick the socket that matches the address family of the query.
        let sock_opt = if is_ipv4 {
            &self.ipv4_sock
        } else {
            &self.ipv6_sock
        };
        let Some(sock) = sock_opt.as_ref() else {
            debug!("handle_query: socket not available for intf {}", if_index);
            return;
        };
        let mut out = DnsOutgoing::new(FLAGS_QR_RESPONSE | FLAGS_AA);

        // Name used by the service-type enumeration ("meta") query.
        const META_QUERY: &str = "_services._dns-sd._udp.local.";

        let Some(dns_registry) = self.dns_registry_map.get_mut(&if_index) else {
            debug!("missing dns registry for intf {}", if_index);
            return;
        };

        let Some(intf) = self.my_intfs.get(&if_index) else {
            debug!("handle_query: no intf found for index {if_index}");
            return;
        };

        for question in msg.questions().iter() {
            let qtype = question.entry_type();

            if qtype == RRType::PTR {
                for service in self.my_services.values() {
                    // Only services already announced on this interface may answer.
                    if service.get_status(if_index) != ServiceStatus::Announced {
                        continue;
                    }

                    if question.entry_name() == service.get_type()
                        || service
                            .get_subtype()
                            .as_ref()
                            .is_some_and(|v| v == question.entry_name())
                    {
                        // The question names this service's type or subtype.
                        add_answer_with_additionals(
                            &mut out,
                            &msg,
                            service,
                            intf,
                            dns_registry,
                            is_ipv4,
                        );
                    } else if question.entry_name() == META_QUERY {
                        // Meta-query: answer with a PTR that points at the service type.
                        let ptr_added = out.add_answer(
                            &msg,
                            DnsPointer::new(
                                question.entry_name(),
                                RRType::PTR,
                                CLASS_IN,
                                service.get_other_ttl(),
                                service.get_type().to_string(),
                            ),
                        );
                        if !ptr_added {
                            trace!("answer was not added for meta-query {:?}", &question);
                        }
                    }
                }
            } else {
                // An ANY question that carries authority records (presumably a probe
                // from another responder): if we are probing the same name, run the
                // tiebreaking against the authority records for that name.
                if qtype == RRType::ANY && msg.num_authorities() > 0 {
                    let probe_name = question.entry_name();

                    if let Some(probe) = dns_registry.probing.get_mut(probe_name) {
                        let now = current_time_millis();

                        // Only tiebreak once our own probe's start time has passed.
                        if probe.start_time < now {
                            let incoming_records: Vec<_> = msg
                                .authorities()
                                .iter()
                                .filter(|r| r.get_name() == probe_name)
                                .collect();

                            probe.tiebreaking(&incoming_records, now, probe_name);
                        }
                    }
                }

                // Address questions: answer with the addresses of every announced
                // service whose (possibly renamed) hostname matches the question.
                if qtype == RRType::A || qtype == RRType::AAAA || qtype == RRType::ANY {
                    for service in self.my_services.values() {
                        if service.get_status(if_index) != ServiceStatus::Announced {
                            continue;
                        }

                        // Prefer the new hostname recorded in `name_changes`, if any.
                        let service_hostname =
                            match dns_registry.name_changes.get(service.get_hostname()) {
                                Some(new_name) => new_name,
                                None => service.get_hostname(),
                            };

                        // Hostnames are compared case-insensitively.
                        if service_hostname.to_lowercase() == question.entry_name().to_lowercase() {
                            let intf_addrs = if is_ipv4 {
                                service.get_addrs_on_my_intf_v4(intf)
                            } else {
                                service.get_addrs_on_my_intf_v6(intf)
                            };
                            if intf_addrs.is_empty()
                                && (qtype == RRType::A || qtype == RRType::AAAA)
                            {
                                let t = match qtype {
                                    RRType::A => "TYPE_A",
                                    RRType::AAAA => "TYPE_AAAA",
                                    _ => "invalid_type",
                                };
                                trace!(
                                    "Cannot find valid addrs for {} response on intf {:?}",
                                    t,
                                    &intf
                                );
                                // NOTE(review): this leaves the whole handler, so answers
                                // already collected in `out` are never sent and the counters
                                // below are not updated — confirm `return` (rather than
                                // `continue`) is intended.
                                return;
                            }
                            for address in intf_addrs {
                                out.add_answer(
                                    &msg,
                                    DnsAddress::new(
                                        service_hostname,
                                        ip_address_rr_type(&address),
                                        CLASS_IN | CLASS_CACHE_FLUSH,
                                        service.get_host_ttl(),
                                        address,
                                        intf.into(),
                                    ),
                                );
                            }
                        }
                    }
                }

                // Find the service whose (possibly renamed) registered name equals the
                // lower-cased question name.
                let query_name = question.entry_name().to_lowercase();
                let service_opt = self
                    .my_services
                    .iter()
                    .find(|(k, _v)| {
                        let service_name = match dns_registry.name_changes.get(k.as_str()) {
                            Some(new_name) => new_name,
                            None => k,
                        };
                        service_name == &query_name
                    })
                    .map(|(_, v)| v);

                let Some(service) = service_opt else {
                    continue;
                };

                if service.get_status(if_index) != ServiceStatus::Announced {
                    continue;
                }

                let intf_addrs = if is_ipv4 {
                    service.get_addrs_on_my_intf_v4(intf)
                } else {
                    service.get_addrs_on_my_intf_v6(intf)
                };
                if intf_addrs.is_empty() {
                    debug!(
                        "Cannot find valid addrs for TYPE_SRV response on intf {:?}",
                        &intf
                    );
                    continue;
                }

                add_answer_of_service(
                    &mut out,
                    &msg,
                    question.entry_name(),
                    service,
                    qtype,
                    intf_addrs,
                );
            }
        }

        // Send only when there is something to say; the response reuses the query id.
        if out.answers_count() > 0 {
            debug!("sending response on intf {}", &intf.name);
            out.set_id(msg.id());
            send_dns_outgoing(&out, intf, &sock.pktinfo, self.port);

            // Clone the name so the borrow of `intf` ends before `self` is used mutably.
            let if_name = intf.name.clone();

            self.increase_counter(Counter::Respond, 1);
            self.notify_monitors(DaemonEvent::Respond(if_name));
        }

        self.increase_counter(Counter::KnownAnswerSuppression, out.known_answer_count());
    }
2902
2903 fn increase_counter(&mut self, counter: Counter, count: i64) {
2905 let key = counter.to_string();
2906 match self.counters.get_mut(&key) {
2907 Some(v) => *v += count,
2908 None => {
2909 self.counters.insert(key, count);
2910 }
2911 }
2912 }
2913
2914 fn set_counter(&mut self, counter: Counter, count: i64) {
2916 let key = counter.to_string();
2917 self.counters.insert(key, count);
2918 }
2919
2920 fn signal_sock_drain(&self) {
2921 let mut signal_buf = [0; 1024];
2922
2923 while let Ok(sz) = self.signal_sock.recv(&mut signal_buf) {
2925 trace!(
2926 "signal socket recvd: {}",
2927 String::from_utf8_lossy(&signal_buf[0..sz])
2928 );
2929 }
2930 }
2931
2932 fn add_retransmission(&mut self, next_time: u64, command: Command) {
2933 self.retransmissions.push(ReRun { next_time, command });
2934 self.add_timer(next_time);
2935 }
2936
2937 fn notify_service_removal(&self, expired: HashMap<String, HashSet<String>>) {
2940 for (ty_domain, sender) in self.service_queriers.iter() {
2941 if let Some(instances) = expired.get(ty_domain) {
2942 for instance_name in instances {
2943 let event = ServiceEvent::ServiceRemoved(
2944 ty_domain.to_string(),
2945 instance_name.to_string(),
2946 );
2947 match sender.send(event) {
2948 Ok(()) => debug!("notify_service_removal: sent ServiceRemoved to listener of {ty_domain}: {instance_name}"),
2949 Err(e) => debug!("Failed to send event: {}", e),
2950 }
2951 }
2952 }
2953 }
2954 }
2955
2956 fn exec_command(&mut self, command: Command, repeating: bool) {
2960 trace!("exec_command: {:?} repeating: {}", &command, repeating);
2961 match command {
2962 Command::Browse(ty, next_delay, cache_only, listener) => {
2963 self.exec_command_browse(repeating, ty, next_delay, cache_only, listener);
2964 }
2965
2966 Command::ResolveHostname(hostname, next_delay, listener, timeout) => {
2967 self.exec_command_resolve_hostname(
2968 repeating, hostname, next_delay, listener, timeout,
2969 );
2970 }
2971
2972 Command::Register(service_info) => {
2973 self.register_service(*service_info);
2974 self.increase_counter(Counter::Register, 1);
2975 }
2976
2977 Command::RegisterResend(fullname, intf) => {
2978 trace!("register-resend service: {fullname} on {}", &intf);
2979 self.exec_command_register_resend(fullname, intf);
2980 }
2981
2982 Command::Unregister(fullname, resp_s) => {
2983 trace!("unregister service {} repeat {}", &fullname, &repeating);
2984 self.exec_command_unregister(repeating, fullname, resp_s);
2985 }
2986
2987 Command::UnregisterResend(packet, if_index, is_ipv4) => {
2988 self.exec_command_unregister_resend(packet, if_index, is_ipv4);
2989 }
2990
2991 Command::StopBrowse(ty_domain) => self.exec_command_stop_browse(ty_domain),
2992
2993 Command::StopResolveHostname(hostname) => {
2994 self.exec_command_stop_resolve_hostname(hostname.to_lowercase())
2995 }
2996
2997 Command::Resolve(instance, try_count) => self.exec_command_resolve(instance, try_count),
2998
2999 Command::GetMetrics(resp_s) => self.exec_command_get_metrics(resp_s),
3000
3001 Command::GetStatus(resp_s) => match resp_s.send(self.status.clone()) {
3002 Ok(()) => trace!("Sent status to the client"),
3003 Err(e) => debug!("Failed to send status: {}", e),
3004 },
3005
3006 Command::Monitor(resp_s) => {
3007 self.monitors.push(resp_s);
3008 }
3009
3010 Command::SetOption(daemon_opt) => {
3011 self.process_set_option(daemon_opt);
3012 }
3013
3014 Command::GetOption(resp_s) => {
3015 let val = DaemonOptionVal {
3016 _service_name_len_max: self.service_name_len_max,
3017 ip_check_interval: self.ip_check_interval,
3018 };
3019 if let Err(e) = resp_s.send(val) {
3020 debug!("Failed to send options: {}", e);
3021 }
3022 }
3023
3024 Command::Verify(instance_fullname, timeout) => {
3025 self.exec_command_verify(instance_fullname, timeout, repeating);
3026 }
3027
3028 _ => {
3029 debug!("unexpected command: {:?}", &command);
3030 }
3031 }
3032 }
3033
3034 fn exec_command_get_metrics(&mut self, resp_s: Sender<HashMap<String, i64>>) {
3035 self.set_counter(Counter::CachedPTR, self.cache.ptr_count() as i64);
3036 self.set_counter(Counter::CachedSRV, self.cache.srv_count() as i64);
3037 self.set_counter(Counter::CachedAddr, self.cache.addr_count() as i64);
3038 self.set_counter(Counter::CachedTxt, self.cache.txt_count() as i64);
3039 self.set_counter(Counter::CachedNSec, self.cache.nsec_count() as i64);
3040 self.set_counter(Counter::CachedSubtype, self.cache.subtype_count() as i64);
3041 self.set_counter(Counter::Timer, self.timers.len() as i64);
3042
3043 let dns_registry_probe_count: usize = self
3044 .dns_registry_map
3045 .values()
3046 .map(|r| r.probing.len())
3047 .sum();
3048 self.set_counter(Counter::DnsRegistryProbe, dns_registry_probe_count as i64);
3049
3050 let dns_registry_active_count: usize = self
3051 .dns_registry_map
3052 .values()
3053 .map(|r| r.active.values().map(|a| a.len()).sum::<usize>())
3054 .sum();
3055 self.set_counter(Counter::DnsRegistryActive, dns_registry_active_count as i64);
3056
3057 let dns_registry_timer_count: usize = self
3058 .dns_registry_map
3059 .values()
3060 .map(|r| r.new_timers.len())
3061 .sum();
3062 self.set_counter(Counter::DnsRegistryTimer, dns_registry_timer_count as i64);
3063
3064 let dns_registry_name_change_count: usize = self
3065 .dns_registry_map
3066 .values()
3067 .map(|r| r.name_changes.len())
3068 .sum();
3069 self.set_counter(
3070 Counter::DnsRegistryNameChange,
3071 dns_registry_name_change_count as i64,
3072 );
3073
3074 if let Err(e) = resp_s.send(self.counters.clone()) {
3076 debug!("Failed to send metrics: {}", e);
3077 }
3078 }
3079
3080 fn exec_command_browse(
3081 &mut self,
3082 repeating: bool,
3083 ty: String,
3084 next_delay: u32,
3085 cache_only: bool,
3086 listener: Sender<ServiceEvent>,
3087 ) {
3088 let pretty_addrs: Vec<String> = self
3089 .my_intfs
3090 .iter()
3091 .map(|(if_index, itf)| format!("{} ({if_index})", itf.name))
3092 .collect();
3093
3094 if let Err(e) = listener.send(ServiceEvent::SearchStarted(format!(
3095 "{ty} on {} interfaces [{}]",
3096 pretty_addrs.len(),
3097 pretty_addrs.join(", ")
3098 ))) {
3099 debug!(
3100 "Failed to send SearchStarted({})(repeating:{}): {}",
3101 &ty, repeating, e
3102 );
3103 return;
3104 }
3105
3106 let now = current_time_millis();
3107 if !repeating {
3108 self.service_queriers.insert(ty.clone(), listener.clone());
3112
3113 self.query_cache_for_service(&ty, &listener, now);
3115 }
3116
3117 if cache_only {
3118 match listener.send(ServiceEvent::SearchStopped(ty.clone())) {
3120 Ok(()) => debug!("SearchStopped sent for {}", &ty),
3121 Err(e) => debug!("Failed to send SearchStopped: {}", e),
3122 }
3123 return;
3124 }
3125
3126 self.send_query(&ty, RRType::PTR);
3127 self.increase_counter(Counter::Browse, 1);
3128
3129 let next_time = now + (next_delay * 1000) as u64;
3130 let max_delay = 60 * 60;
3131 let delay = cmp::min(next_delay * 2, max_delay);
3132 self.add_retransmission(next_time, Command::Browse(ty, delay, cache_only, listener));
3133 }
3134
3135 fn exec_command_resolve_hostname(
3136 &mut self,
3137 repeating: bool,
3138 hostname: String,
3139 next_delay: u32,
3140 listener: Sender<HostnameResolutionEvent>,
3141 timeout: Option<u64>,
3142 ) {
3143 let addr_list: Vec<_> = self.my_intfs.iter().collect();
3144 if let Err(e) = listener.send(HostnameResolutionEvent::SearchStarted(format!(
3145 "{} on addrs {:?}",
3146 &hostname, &addr_list
3147 ))) {
3148 debug!(
3149 "Failed to send ResolveStarted({})(repeating:{}): {}",
3150 &hostname, repeating, e
3151 );
3152 return;
3153 }
3154 if !repeating {
3155 self.add_hostname_resolver(hostname.to_owned(), listener.clone(), timeout);
3156 self.query_cache_for_hostname(&hostname, listener.clone());
3158 }
3159
3160 self.send_query_vec(&[(&hostname, RRType::A), (&hostname, RRType::AAAA)]);
3161 self.increase_counter(Counter::ResolveHostname, 1);
3162
3163 let now = current_time_millis();
3164 let next_time = now + u64::from(next_delay) * 1000;
3165 let max_delay = 60 * 60;
3166 let delay = cmp::min(next_delay * 2, max_delay);
3167
3168 if self
3170 .hostname_resolvers
3171 .get(&hostname)
3172 .and_then(|(_sender, timeout)| *timeout)
3173 .map(|timeout| next_time < timeout)
3174 .unwrap_or(true)
3175 {
3176 self.add_retransmission(
3177 next_time,
3178 Command::ResolveHostname(hostname, delay, listener, None),
3179 );
3180 }
3181 }
3182
3183 fn exec_command_resolve(&mut self, instance: String, try_count: u16) {
3184 let pending_query = self.query_unresolved(&instance);
3185 let max_try = 3;
3186 if pending_query && try_count < max_try {
3187 let next_time = current_time_millis() + RESOLVE_WAIT_IN_MILLIS;
3190 self.add_retransmission(next_time, Command::Resolve(instance, try_count + 1));
3191 }
3192 }
3193
3194 fn exec_command_unregister(
3195 &mut self,
3196 repeating: bool,
3197 fullname: String,
3198 resp_s: Sender<UnregisterStatus>,
3199 ) {
3200 let response = match self.my_services.remove_entry(&fullname) {
3201 None => {
3202 debug!("unregister: cannot find such service {}", &fullname);
3203 UnregisterStatus::NotFound
3204 }
3205 Some((_k, info)) => {
3206 let mut timers = Vec::new();
3207
3208 for (if_index, intf) in self.my_intfs.iter() {
3209 if let Some(sock) = self.ipv4_sock.as_ref() {
3210 let packet = self.unregister_service(&info, intf, &sock.pktinfo);
3211 if !repeating && !packet.is_empty() {
3213 let next_time = current_time_millis() + 120;
3214 self.retransmissions.push(ReRun {
3215 next_time,
3216 command: Command::UnregisterResend(packet, *if_index, true),
3217 });
3218 timers.push(next_time);
3219 }
3220 }
3221
3222 if let Some(sock) = self.ipv6_sock.as_ref() {
3224 let packet = self.unregister_service(&info, intf, &sock.pktinfo);
3225 if !repeating && !packet.is_empty() {
3226 let next_time = current_time_millis() + 120;
3227 self.retransmissions.push(ReRun {
3228 next_time,
3229 command: Command::UnregisterResend(packet, *if_index, false),
3230 });
3231 timers.push(next_time);
3232 }
3233 }
3234 }
3235
3236 for t in timers {
3237 self.add_timer(t);
3238 }
3239
3240 self.increase_counter(Counter::Unregister, 1);
3241 UnregisterStatus::OK
3242 }
3243 };
3244 if let Err(e) = resp_s.send(response) {
3245 debug!("unregister: failed to send response: {}", e);
3246 }
3247 }
3248
3249 fn exec_command_unregister_resend(&mut self, packet: Vec<u8>, if_index: u32, is_ipv4: bool) {
3250 let Some(intf) = self.my_intfs.get(&if_index) else {
3251 return;
3252 };
3253 let sock_opt = if is_ipv4 {
3254 &self.ipv4_sock
3255 } else {
3256 &self.ipv6_sock
3257 };
3258 let Some(sock) = sock_opt else {
3259 return;
3260 };
3261
3262 let if_addr = if is_ipv4 {
3263 match intf.next_ifaddr_v4() {
3264 Some(addr) => addr,
3265 None => return,
3266 }
3267 } else {
3268 match intf.next_ifaddr_v6() {
3269 Some(addr) => addr,
3270 None => return,
3271 }
3272 };
3273
3274 debug!("UnregisterResend from {:?}", if_addr);
3275 multicast_on_intf(
3276 &packet[..],
3277 &intf.name,
3278 intf.index,
3279 if_addr,
3280 &sock.pktinfo,
3281 self.port,
3282 );
3283
3284 self.increase_counter(Counter::UnregisterResend, 1);
3285 }
3286
3287 fn exec_command_stop_browse(&mut self, ty_domain: String) {
3288 match self.service_queriers.remove_entry(&ty_domain) {
3289 None => debug!("StopBrowse: cannot find querier for {}", &ty_domain),
3290 Some((ty, sender)) => {
3291 trace!("StopBrowse: removed queryer for {}", &ty);
3293 let mut i = 0;
3294 while i < self.retransmissions.len() {
3295 if let Command::Browse(t, _, _, _) = &self.retransmissions[i].command {
3296 if t == &ty {
3297 self.retransmissions.remove(i);
3298 trace!("StopBrowse: removed retransmission for {}", &ty);
3299 continue;
3300 }
3301 }
3302 i += 1;
3303 }
3304
3305 self.cache.remove_service_type(&ty_domain);
3307
3308 match sender.send(ServiceEvent::SearchStopped(ty_domain)) {
3310 Ok(()) => trace!("Sent SearchStopped to the listener"),
3311 Err(e) => debug!("Failed to send SearchStopped: {}", e),
3312 }
3313 }
3314 }
3315 }
3316
3317 fn exec_command_stop_resolve_hostname(&mut self, hostname: String) {
3318 if let Some((host, (sender, _timeout))) = self.hostname_resolvers.remove_entry(&hostname) {
3319 trace!("StopResolve: removed queryer for {}", &host);
3321 let mut i = 0;
3322 while i < self.retransmissions.len() {
3323 if let Command::Resolve(t, _) = &self.retransmissions[i].command {
3324 if t == &host {
3325 self.retransmissions.remove(i);
3326 trace!("StopResolve: removed retransmission for {}", &host);
3327 continue;
3328 }
3329 }
3330 i += 1;
3331 }
3332
3333 match sender.send(HostnameResolutionEvent::SearchStopped(hostname)) {
3335 Ok(()) => trace!("Sent SearchStopped to the listener"),
3336 Err(e) => debug!("Failed to send SearchStopped: {}", e),
3337 }
3338 }
3339 }
3340
3341 fn exec_command_register_resend(&mut self, fullname: String, if_index: u32) {
3342 let Some(info) = self.my_services.get_mut(&fullname) else {
3343 trace!("announce: cannot find such service {}", &fullname);
3344 return;
3345 };
3346
3347 let Some(dns_registry) = self.dns_registry_map.get_mut(&if_index) else {
3348 return;
3349 };
3350
3351 let Some(intf) = self.my_intfs.get(&if_index) else {
3352 return;
3353 };
3354
3355 let announced_v4 = if let Some(sock) = self.ipv4_sock.as_ref() {
3356 announce_service_on_intf(dns_registry, info, intf, &sock.pktinfo, self.port)
3357 } else {
3358 false
3359 };
3360 let announced_v6 = if let Some(sock) = self.ipv6_sock.as_ref() {
3361 announce_service_on_intf(dns_registry, info, intf, &sock.pktinfo, self.port)
3362 } else {
3363 false
3364 };
3365
3366 if announced_v4 || announced_v6 {
3367 let mut hostname = info.get_hostname();
3368 if let Some(new_name) = dns_registry.name_changes.get(hostname) {
3369 hostname = new_name;
3370 }
3371 let service_name = match dns_registry.name_changes.get(&fullname) {
3372 Some(new_name) => new_name.to_string(),
3373 None => fullname,
3374 };
3375
3376 debug!("resend: announce service {service_name} on {}", intf.name);
3377
3378 notify_monitors(
3379 &mut self.monitors,
3380 DaemonEvent::Announce(service_name, format!("{}:{}", hostname, &intf.name)),
3381 );
3382 info.set_status(if_index, ServiceStatus::Announced);
3383 } else {
3384 debug!("register-resend should not fail");
3385 }
3386
3387 self.increase_counter(Counter::RegisterResend, 1);
3388 }
3389
3390 fn exec_command_verify(&mut self, instance: String, timeout: Duration, repeating: bool) {
3391 let now = current_time_millis();
3401 let expire_at = if repeating {
3402 None
3403 } else {
3404 Some(now + timeout.as_millis() as u64)
3405 };
3406
3407 let record_vec = self.cache.service_verify_queries(&instance, expire_at);
3409
3410 if !record_vec.is_empty() {
3411 let query_vec: Vec<(&str, RRType)> = record_vec
3412 .iter()
3413 .map(|(record, rr_type)| (record.as_str(), *rr_type))
3414 .collect();
3415 self.send_query_vec(&query_vec);
3416
3417 if let Some(new_expire) = expire_at {
3418 self.add_timer(new_expire); self.add_retransmission(now + 1000, Command::Verify(instance, timeout));
3422 }
3423 }
3424 }
3425
3426 fn refresh_active_services(&mut self) {
3428 let mut query_ptr_count = 0;
3429 let mut query_srv_count = 0;
3430 let mut new_timers = HashSet::new();
3431 let mut query_addr_count = 0;
3432
3433 for (ty_domain, _sender) in self.service_queriers.iter() {
3434 let refreshed_timers = self.cache.refresh_due_ptr(ty_domain);
3435 if !refreshed_timers.is_empty() {
3436 trace!("sending refresh query for PTR: {}", ty_domain);
3437 self.send_query(ty_domain, RRType::PTR);
3438 query_ptr_count += 1;
3439 new_timers.extend(refreshed_timers);
3440 }
3441
3442 let (instances, timers) = self.cache.refresh_due_srv_txt(ty_domain);
3443 for (instance, types) in instances {
3444 trace!("sending refresh query for: {}", &instance);
3445 let query_vec = types
3446 .into_iter()
3447 .map(|ty| (instance.as_str(), ty))
3448 .collect::<Vec<_>>();
3449 self.send_query_vec(&query_vec);
3450 query_srv_count += 1;
3451 }
3452 new_timers.extend(timers);
3453 let (hostnames, timers) = self.cache.refresh_due_hosts(ty_domain);
3454 for hostname in hostnames.iter() {
3455 trace!("sending refresh queries for A and AAAA: {}", hostname);
3456 self.send_query_vec(&[(hostname, RRType::A), (hostname, RRType::AAAA)]);
3457 query_addr_count += 2;
3458 }
3459 new_timers.extend(timers);
3460 }
3461
3462 for timer in new_timers {
3463 self.add_timer(timer);
3464 }
3465
3466 self.increase_counter(Counter::CacheRefreshPTR, query_ptr_count);
3467 self.increase_counter(Counter::CacheRefreshSrvTxt, query_srv_count);
3468 self.increase_counter(Counter::CacheRefreshAddr, query_addr_count);
3469 }
3470}
3471
3472fn add_answer_of_service(
3474 out: &mut DnsOutgoing,
3475 msg: &DnsIncoming,
3476 entry_name: &str,
3477 service: &ServiceInfo,
3478 qtype: RRType,
3479 intf_addrs: Vec<IpAddr>,
3480) {
3481 if qtype == RRType::SRV || qtype == RRType::ANY {
3482 out.add_answer(
3483 msg,
3484 DnsSrv::new(
3485 entry_name,
3486 CLASS_IN | CLASS_CACHE_FLUSH,
3487 service.get_host_ttl(),
3488 service.get_priority(),
3489 service.get_weight(),
3490 service.get_port(),
3491 service.get_hostname().to_string(),
3492 ),
3493 );
3494 }
3495
3496 if qtype == RRType::TXT || qtype == RRType::ANY {
3497 out.add_answer(
3498 msg,
3499 DnsTxt::new(
3500 entry_name,
3501 CLASS_IN | CLASS_CACHE_FLUSH,
3502 service.get_other_ttl(),
3503 service.generate_txt(),
3504 ),
3505 );
3506 }
3507
3508 if qtype == RRType::SRV {
3509 for address in intf_addrs {
3510 out.add_additional_answer(DnsAddress::new(
3511 service.get_hostname(),
3512 ip_address_rr_type(&address),
3513 CLASS_IN | CLASS_CACHE_FLUSH,
3514 service.get_host_ttl(),
3515 address,
3516 InterfaceId::default(),
3517 ));
3518 }
3519 }
3520}
3521
#[derive(Clone, Debug)]
/// Events sent to the listener of a service browse (the `Sender<ServiceEvent>`
/// carried by `Command::Browse`).
#[non_exhaustive]
pub enum ServiceEvent {
    /// A browse run has started. The string holds the service type and a
    /// description of the interfaces being searched.
    SearchStarted(String),

    /// NOTE(review): presumably (service type, instance fullname) of a newly
    /// seen instance — not constructed in this part of the file; confirm.
    ServiceFound(String, String),

    /// NOTE(review): presumably the fully resolved service — not constructed
    /// in this part of the file; confirm.
    ServiceResolved(Box<ResolvedService>),

    /// An instance is gone: (service type domain, instance name).
    ServiceRemoved(String, String),

    /// Browsing of the given service type has stopped.
    SearchStopped(String),
}
3542
#[derive(Clone, Debug)]
/// Events sent to the listener of a hostname resolution (the
/// `Sender<HostnameResolutionEvent>` carried by `Command::ResolveHostname`).
#[non_exhaustive]
pub enum HostnameResolutionEvent {
    /// A resolution run has started. The string holds the hostname and a
    /// debug rendering of the interfaces in use.
    SearchStarted(String),
    /// NOTE(review): presumably (hostname, addresses found) — not constructed
    /// in this part of the file; confirm.
    AddressesFound(String, HashSet<ScopedIp>),
    /// NOTE(review): presumably (hostname, addresses no longer valid) — not
    /// constructed in this part of the file; confirm.
    AddressesRemoved(String, HashSet<ScopedIp>),
    /// NOTE(review): presumably sent when the resolver's timeout passes — not
    /// constructed in this part of the file; confirm.
    SearchTimeout(String),
    /// Resolution of the given hostname was stopped on request.
    SearchStopped(String),
}
3559
#[derive(Clone, Debug)]
/// Events sent to the monitors registered through `Command::Monitor`.
#[non_exhaustive]
pub enum DaemonEvent {
    /// A service was announced: (service name, "hostname:interface name").
    Announce(String, String),

    /// NOTE(review): presumably an error the daemon ran into — confirm.
    Error(Error),

    /// NOTE(review): presumably an IP address that appeared on the host — confirm.
    IpAdd(IpAddr),

    /// NOTE(review): presumably an IP address that went away — confirm.
    IpDel(IpAddr),

    /// NOTE(review): presumably a record name changed; see `DnsNameChange` — confirm.
    NameChange(DnsNameChange),

    /// A response was sent on the interface with the given name.
    Respond(String),
}
3584
#[derive(Clone, Debug)]
/// Payload of `DaemonEvent::NameChange`.
///
/// NOTE(review): the values are filled in outside this part of the file; the
/// field notes below are read off the field names and should be confirmed.
pub struct DnsNameChange {
    /// Presumably the name before the change.
    pub original: String,

    /// Presumably the name after the change.
    pub new_name: String,

    /// Presumably the type of the record whose name changed.
    pub rr_type: RRType,

    /// Presumably the name of the interface the change applies to.
    pub intf_name: String,
}
3609
#[derive(Debug)]
/// Commands executed by the daemon; see `exec_command` for the dispatch.
enum Command {
    /// Browse a service type: (type, seconds until the next retransmission,
    /// cache-only flag, event listener).
    Browse(String, u32, bool, Sender<ServiceEvent>),

    /// Resolve a hostname: (hostname, seconds until the next retransmission,
    /// event listener, optional timeout).
    ResolveHostname(String, u32, Sender<HostnameResolutionEvent>, Option<u64>),

    /// Register a service.
    Register(Box<ServiceInfo>),

    /// Unregister a service: (fullname, channel for the resulting status).
    Unregister(String, Sender<UnregisterStatus>),

    /// Announce a registered service again: (fullname, interface index).
    RegisterResend(String, u32),

    /// Send an unregister packet again: (packet, interface index, is-IPv4 flag).
    UnregisterResend(Vec<u8>, u32, bool),

    /// Stop browsing a service type.
    StopBrowse(String),

    /// Stop resolving a hostname.
    StopResolveHostname(String),

    /// Query what is still unresolved for an instance: (instance, try count).
    Resolve(String, u16),

    /// Send a copy of the counters over the channel.
    GetMetrics(Sender<Metrics>),

    /// Send the daemon status over the channel.
    GetStatus(Sender<DaemonStatus>),

    /// Register a monitor that receives `DaemonEvent`s.
    Monitor(Sender<DaemonEvent>),

    /// Apply a daemon option.
    SetOption(DaemonOption),

    /// Send the current option values over the channel.
    GetOption(Sender<DaemonOptionVal>),

    /// Verify a service instance: (instance fullname, timeout).
    Verify(String, Duration),

    /// Not handled by `exec_command`, which only logs it as unexpected.
    Exit(Sender<DaemonStatus>),
}
3662
3663impl fmt::Display for Command {
3664 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
3665 match self {
3666 Self::Browse(_, _, _, _) => write!(f, "Command Browse"),
3667 Self::ResolveHostname(_, _, _, _) => write!(f, "Command ResolveHostname"),
3668 Self::Exit(_) => write!(f, "Command Exit"),
3669 Self::GetStatus(_) => write!(f, "Command GetStatus"),
3670 Self::GetMetrics(_) => write!(f, "Command GetMetrics"),
3671 Self::Monitor(_) => write!(f, "Command Monitor"),
3672 Self::Register(_) => write!(f, "Command Register"),
3673 Self::RegisterResend(_, _) => write!(f, "Command RegisterResend"),
3674 Self::SetOption(_) => write!(f, "Command SetOption"),
3675 Self::GetOption(_) => write!(f, "Command GetOption"),
3676 Self::StopBrowse(_) => write!(f, "Command StopBrowse"),
3677 Self::StopResolveHostname(_) => write!(f, "Command StopResolveHostname"),
3678 Self::Unregister(_, _) => write!(f, "Command Unregister"),
3679 Self::UnregisterResend(_, _, _) => write!(f, "Command UnregisterResend"),
3680 Self::Resolve(_, _) => write!(f, "Command Resolve"),
3681 Self::Verify(_, _) => write!(f, "Command VerifyResource"),
3682 }
3683 }
3684}
3685
/// Current option values, sent back in reply to `Command::GetOption`.
struct DaemonOptionVal {
    /// Copy of the daemon's `service_name_len_max`.
    _service_name_len_max: u8,
    /// Copy of the daemon's `ip_check_interval`.
    ip_check_interval: u64,
}
3690
/// Options carried by `Command::SetOption` and applied by `process_set_option`.
///
/// NOTE(review): the handling of each variant is outside this part of the
/// file; the variant names are the only description available here.
#[derive(Debug)]
enum DaemonOption {
    ServiceNameLenMax(u8),
    IpCheckInterval(u64),
    EnableInterface(Vec<IfKind>),
    DisableInterface(Vec<IfKind>),
    MulticastLoopV4(bool),
    MulticastLoopV6(bool),
    AcceptUnsolicited(bool),
    // The two variants below exist only in test builds.
    #[cfg(test)]
    TestDownInterface(String),
    #[cfg(test)]
    TestUpInterface(String),
}
3705
/// Byte length of the domain suffix "._tcp.local." ("._udp.local." has the
/// same length).
const DOMAIN_LEN: usize = "._tcp.local.".len();
3708
3709fn check_service_name_length(ty_domain: &str, limit: u8) -> Result<()> {
3711 if ty_domain.len() <= DOMAIN_LEN + 1 {
3712 return Err(e_fmt!("Service type name cannot be empty: {}", ty_domain));
3714 }
3715
3716 let service_name_len = ty_domain.len() - DOMAIN_LEN - 1; if service_name_len > limit as usize {
3718 return Err(e_fmt!("Service name length must be <= {} bytes", limit));
3719 }
3720 Ok(())
3721}
3722
3723fn check_domain_suffix(name: &str) -> Result<()> {
3725 if !(name.ends_with("._tcp.local.") || name.ends_with("._udp.local.")) {
3726 return Err(e_fmt!(
3727 "mDNS service {} must end with '._tcp.local.' or '._udp.local.'",
3728 name
3729 ));
3730 }
3731
3732 Ok(())
3733}
3734
3735fn check_service_name(fullname: &str) -> Result<()> {
3743 check_domain_suffix(fullname)?;
3744
3745 let remaining: Vec<&str> = fullname[..fullname.len() - DOMAIN_LEN].split('.').collect();
3746 let name = remaining.last().ok_or_else(|| e_fmt!("No service name"))?;
3747
3748 if &name[0..1] != "_" {
3749 return Err(e_fmt!("Service name must start with '_'"));
3750 }
3751
3752 let name = &name[1..];
3753
3754 if name.contains("--") {
3755 return Err(e_fmt!("Service name must not contain '--'"));
3756 }
3757
3758 if name.starts_with('-') || name.ends_with('-') {
3759 return Err(e_fmt!("Service name (%s) may not start or end with '-'"));
3760 }
3761
3762 let ascii_count = name.chars().filter(|c| c.is_ascii_alphabetic()).count();
3763 if ascii_count < 1 {
3764 return Err(e_fmt!(
3765 "Service name must contain at least one letter (eg: 'A-Za-z')"
3766 ));
3767 }
3768
3769 Ok(())
3770}
3771
3772fn check_hostname(hostname: &str) -> Result<()> {
3774 if !hostname.ends_with(".local.") {
3775 return Err(e_fmt!("Hostname must end with '.local.': {hostname}"));
3776 }
3777
3778 if hostname == ".local." {
3779 return Err(e_fmt!(
3780 "The part of the hostname before '.local.' cannot be empty"
3781 ));
3782 }
3783
3784 if hostname.len() > 255 {
3785 return Err(e_fmt!("Hostname length must be <= 255 bytes"));
3786 }
3787
3788 Ok(())
3789}
3790
3791fn call_service_listener(
3792 listeners_map: &HashMap<String, Sender<ServiceEvent>>,
3793 ty_domain: &str,
3794 event: ServiceEvent,
3795) {
3796 if let Some(listener) = listeners_map.get(ty_domain) {
3797 match listener.send(event) {
3798 Ok(()) => trace!("Sent event to listener successfully"),
3799 Err(e) => debug!("Failed to send event: {}", e),
3800 }
3801 }
3802}
3803
3804fn call_hostname_resolution_listener(
3805 listeners_map: &HashMap<String, (Sender<HostnameResolutionEvent>, Option<u64>)>,
3806 hostname: &str,
3807 event: HostnameResolutionEvent,
3808) {
3809 let hostname_lower = hostname.to_lowercase();
3810 if let Some(listener) = listeners_map.get(&hostname_lower).map(|(l, _)| l) {
3811 match listener.send(event) {
3812 Ok(()) => trace!("Sent event to listener successfully"),
3813 Err(e) => debug!("Failed to send event: {}", e),
3814 }
3815 }
3816}
3817
3818fn my_ip_interfaces(with_loopback: bool) -> Vec<Interface> {
3822 if_addrs::get_if_addrs()
3823 .unwrap_or_default()
3824 .into_iter()
3825 .filter(|i| i.is_oper_up() && (!i.is_loopback() || with_loopback))
3826 .collect()
3827}
3828
3829fn send_dns_outgoing(
3832 out: &DnsOutgoing,
3833 my_intf: &MyIntf,
3834 sock: &PktInfoUdpSocket,
3835 port: u16,
3836) -> Vec<Vec<u8>> {
3837 let if_name = &my_intf.name;
3838
3839 let if_addr = if sock.domain() == Domain::IPV4 {
3840 match my_intf.next_ifaddr_v4() {
3841 Some(addr) => addr,
3842 None => return vec![],
3843 }
3844 } else {
3845 match my_intf.next_ifaddr_v6() {
3846 Some(addr) => addr,
3847 None => return vec![],
3848 }
3849 };
3850
3851 send_dns_outgoing_impl(out, if_name, my_intf.index, if_addr, sock, port)
3852}
3853
3854fn send_dns_outgoing_impl(
3856 out: &DnsOutgoing,
3857 if_name: &str,
3858 if_index: u32,
3859 if_addr: &IfAddr,
3860 sock: &PktInfoUdpSocket,
3861 port: u16,
3862) -> Vec<Vec<u8>> {
3863 let qtype = if out.is_query() {
3864 "query"
3865 } else {
3866 if out.answers_count() == 0 && out.additionals().is_empty() {
3867 return vec![]; }
3869 "response"
3870 };
3871 trace!(
3872 "send {}: {} questions {} answers {} authorities {} additional",
3873 qtype,
3874 out.questions().len(),
3875 out.answers_count(),
3876 out.authorities().len(),
3877 out.additionals().len()
3878 );
3879
3880 match if_addr.ip() {
3881 IpAddr::V4(ipv4) => {
3882 if let Err(e) = sock.set_multicast_if_v4(&ipv4) {
3883 debug!(
3884 "send_dns_outgoing: failed to set multicast interface for IPv4 {}: {}",
3885 ipv4, e
3886 );
3887 return vec![]; }
3889 }
3890 IpAddr::V6(ipv6) => {
3891 if let Err(e) = sock.set_multicast_if_v6(if_index) {
3892 debug!(
3893 "send_dns_outgoing: failed to set multicast interface for IPv6 {}: {}",
3894 ipv6, e
3895 );
3896 return vec![]; }
3898 }
3899 }
3900
3901 let packet_list = out.to_data_on_wire();
3902 for packet in packet_list.iter() {
3903 multicast_on_intf(packet, if_name, if_index, if_addr, sock, port);
3904 }
3905 packet_list
3906}
3907
3908fn multicast_on_intf(
3910 packet: &[u8],
3911 if_name: &str,
3912 if_index: u32,
3913 if_addr: &IfAddr,
3914 socket: &PktInfoUdpSocket,
3915 port: u16,
3916) {
3917 if packet.len() > MAX_MSG_ABSOLUTE {
3918 debug!("Drop over-sized packet ({})", packet.len());
3919 return;
3920 }
3921
3922 let addr: SocketAddr = match if_addr {
3923 if_addrs::IfAddr::V4(_) => SocketAddrV4::new(GROUP_ADDR_V4, port).into(),
3924 if_addrs::IfAddr::V6(_) => {
3925 let mut sock = SocketAddrV6::new(GROUP_ADDR_V6, port, 0, 0);
3926 sock.set_scope_id(if_index); sock.into()
3928 }
3929 };
3930
3931 let sock_addr = addr.into();
3933 match socket.send_to(packet, &sock_addr) {
3934 Ok(sz) => trace!(
3935 "sent out {} bytes on interface {} (idx {}) addr {}",
3936 sz,
3937 if_name,
3938 if_index,
3939 if_addr.ip()
3940 ),
3941 Err(e) => trace!("Failed to send to {} via {:?}: {}", addr, &if_name, e),
3942 }
3943}
3944
/// Returns `true` when `name` splits on '.' into at least five parts, i.e.
/// contains at least four dots.
fn valid_instance_name(name: &str) -> bool {
    // A fifth part exists exactly when there are five or more parts.
    name.split('.').nth(4).is_some()
}
3951
3952fn notify_monitors(monitors: &mut Vec<Sender<DaemonEvent>>, event: DaemonEvent) {
3953 monitors.retain(|sender| {
3954 if let Err(e) = sender.try_send(event.clone()) {
3955 debug!("notify_monitors: try_send: {}", &e);
3956 if matches!(e, TrySendError::Disconnected(_)) {
3957 return false; }
3959 }
3960 true
3961 });
3962}
3963
/// Builds the announcement message of service `info` for interface `intf`.
///
/// The message holds the PTR record of the service type (plus one for the
/// subtype, if any), the SRV and TXT records, and one address record per
/// address of `info` on `intf` in the family selected by `is_ipv4`. Names
/// recorded in `dns_registry.name_changes` replace the original ones.
///
/// Returns `None` when the service has no address of the requested family on
/// the interface, or when the service requires probing and probing is not
/// yet done for at least one of its SRV, TXT or address records.
fn prepare_announce(
    info: &ServiceInfo,
    intf: &MyIntf,
    dns_registry: &mut DnsRegistry,
    is_ipv4: bool,
) -> Option<DnsOutgoing> {
    let intf_addrs = if is_ipv4 {
        info.get_addrs_on_my_intf_v4(intf)
    } else {
        info.get_addrs_on_my_intf_v6(intf)
    };

    if intf_addrs.is_empty() {
        debug!(
            "prepare_announce (ipv4: {is_ipv4}): no valid addrs on interface {}",
            &intf.name
        );
        return None;
    }

    // Use the renamed fullname if one was recorded for this service.
    let service_fullname = match dns_registry.name_changes.get(info.get_fullname()) {
        Some(new_name) => new_name,
        None => info.get_fullname(),
    };

    debug!(
        "prepare to announce service {service_fullname} on {:?}",
        &intf_addrs
    );

    // Number of records for which probing is not done yet.
    let mut probing_count = 0;
    let mut out = DnsOutgoing::new(FLAGS_QR_RESPONSE | FLAGS_AA);
    // Passed to `is_probing_done` for every record below: now plus a random
    // offset in 0..250 ms.
    let create_time = current_time_millis() + fastrand::u64(0..250);

    // PTR: service type -> instance fullname. Added without a probing check.
    out.add_answer_at_time(
        DnsPointer::new(
            info.get_type(),
            RRType::PTR,
            CLASS_IN,
            info.get_other_ttl(),
            service_fullname.to_string(),
        ),
        0,
    );

    // PTR for the subtype, when the service has one.
    if let Some(sub) = info.get_subtype() {
        trace!("Adding subdomain {}", sub);
        out.add_answer_at_time(
            DnsPointer::new(
                sub,
                RRType::PTR,
                CLASS_IN,
                info.get_other_ttl(),
                service_fullname.to_string(),
            ),
            0,
        );
    }

    // SRV target: the renamed hostname if one was recorded.
    let hostname = match dns_registry.name_changes.get(info.get_hostname()) {
        Some(new_name) => new_name.to_string(),
        None => info.get_hostname().to_string(),
    };

    // The SRV record is created under the original fullname; a recorded new
    // name is attached to it afterwards.
    let mut srv = DnsSrv::new(
        info.get_fullname(),
        CLASS_IN | CLASS_CACHE_FLUSH,
        info.get_host_ttl(),
        info.get_priority(),
        info.get_weight(),
        info.get_port(),
        hostname,
    );

    if let Some(new_name) = dns_registry.name_changes.get(info.get_fullname()) {
        srv.get_record_mut().set_new_name(new_name.to_string());
    }

    // A record goes into the message only if the service needs no probing or
    // probing is already done for it; otherwise it is counted as pending.
    if !info.requires_probe()
        || dns_registry.is_probing_done(&srv, info.get_fullname(), create_time)
    {
        out.add_answer_at_time(srv, 0);
    } else {
        probing_count += 1;
    }

    // TXT record, handled the same way as SRV.
    let mut txt = DnsTxt::new(
        info.get_fullname(),
        CLASS_IN | CLASS_CACHE_FLUSH,
        info.get_other_ttl(),
        info.generate_txt(),
    );

    if let Some(new_name) = dns_registry.name_changes.get(info.get_fullname()) {
        txt.get_record_mut().set_new_name(new_name.to_string());
    }

    if !info.requires_probe()
        || dns_registry.is_probing_done(&txt, info.get_fullname(), create_time)
    {
        out.add_answer_at_time(txt, 0);
    } else {
        probing_count += 1;
    }

    // Address records are created under the original hostname; a recorded new
    // name is attached to each one.
    let hostname = info.get_hostname();
    for address in intf_addrs {
        let mut dns_addr = DnsAddress::new(
            hostname,
            ip_address_rr_type(&address),
            CLASS_IN | CLASS_CACHE_FLUSH,
            info.get_host_ttl(),
            address,
            intf.into(),
        );

        if let Some(new_name) = dns_registry.name_changes.get(hostname) {
            dns_addr.get_record_mut().set_new_name(new_name.to_string());
        }

        if !info.requires_probe()
            || dns_registry.is_probing_done(&dns_addr, info.get_fullname(), create_time)
        {
            out.add_answer_at_time(dns_addr, 0);
        } else {
            probing_count += 1;
        }
    }

    // Nothing is announced while any record is still being probed.
    if probing_count > 0 {
        return None;
    }

    Some(out)
}
4107
4108fn announce_service_on_intf(
4111 dns_registry: &mut DnsRegistry,
4112 info: &ServiceInfo,
4113 intf: &MyIntf,
4114 sock: &PktInfoUdpSocket,
4115 port: u16,
4116) -> bool {
4117 let is_ipv4 = sock.domain() == Domain::IPV4;
4118 if let Some(out) = prepare_announce(info, intf, dns_registry, is_ipv4) {
4119 send_dns_outgoing(&out, intf, sock, port);
4120 return true;
4121 }
4122
4123 false
4124}
4125
/// Returns a new name for `original`, derived from its first dot-separated
/// label.
///
/// If the first label ends with `" (<number>)"`, the number is incremented:
/// `"foo (2).local."` becomes `"foo (3).local."`. Otherwise `" (2)"` is
/// appended to the first label: `"foo.local."` becomes `"foo (2).local."`.
///
/// A suffix that is not a valid `u32`, or whose increment would overflow
/// `u32`, is treated as part of the name and `" (2)"` is appended after it.
fn name_change(original: &str) -> String {
    let mut parts: Vec<_> = original.split('.').collect();
    let Some(first_part) = parts.get_mut(0) else {
        return format!("{original} (2)");
    };

    // Recognise a trailing " (<number>)" and bump it. `checked_add` keeps a
    // suffix of `u32::MAX` from panicking (debug) or wrapping to 0 (release).
    let bumped = first_part
        .strip_suffix(')')
        .and_then(|rest| rest.rsplit_once(" ("))
        .and_then(|(base_name, digits)| {
            let next = digits.parse::<u32>().ok()?.checked_add(1)?;
            Some(format!("{base_name} ({next})"))
        });

    let new_name = bumped.unwrap_or_else(|| format!("{first_part} (2)"));

    *first_part = &new_name;
    parts.join(".")
}
4161
/// Returns a new hostname for `original`, derived from its first
/// dot-separated label.
///
/// If the first label ends with `-<number>`, the number is incremented:
/// `"foo-2.local."` becomes `"foo-3.local."`. Otherwise `-2` is appended to
/// the first label: `"foo.local."` becomes `"foo-2.local."`.
///
/// A suffix that is not a valid `u32`, or whose increment would overflow
/// `u32`, is treated as part of the name and `-2` is appended after it.
fn hostname_change(original: &str) -> String {
    let mut parts: Vec<_> = original.split('.').collect();
    let Some(first_part) = parts.get_mut(0) else {
        return format!("{original}-2");
    };

    // Recognise a trailing "-<number>" and bump it. `checked_add` keeps a
    // suffix of `u32::MAX` from panicking (debug) or wrapping to 0 (release).
    let bumped = first_part
        .rsplit_once('-')
        .and_then(|(base_name, digits)| {
            let next = digits.parse::<u32>().ok()?.checked_add(1)?;
            Some(format!("{base_name}-{next}"))
        });

    let new_name = bumped.unwrap_or_else(|| format!("{first_part}-2"));

    *first_part = &new_name;
    parts.join(".")
}
4189
4190fn add_answer_with_additionals(
4191 out: &mut DnsOutgoing,
4192 msg: &DnsIncoming,
4193 service: &ServiceInfo,
4194 intf: &MyIntf,
4195 dns_registry: &DnsRegistry,
4196 is_ipv4: bool,
4197) {
4198 let intf_addrs = if is_ipv4 {
4199 service.get_addrs_on_my_intf_v4(intf)
4200 } else {
4201 service.get_addrs_on_my_intf_v6(intf)
4202 };
4203 if intf_addrs.is_empty() {
4204 trace!("No addrs on LAN of intf {:?}", intf);
4205 return;
4206 }
4207
4208 let service_fullname = match dns_registry.name_changes.get(service.get_fullname()) {
4210 Some(new_name) => new_name,
4211 None => service.get_fullname(),
4212 };
4213
4214 let hostname = match dns_registry.name_changes.get(service.get_hostname()) {
4215 Some(new_name) => new_name,
4216 None => service.get_hostname(),
4217 };
4218
4219 let ptr_added = out.add_answer(
4220 msg,
4221 DnsPointer::new(
4222 service.get_type(),
4223 RRType::PTR,
4224 CLASS_IN,
4225 service.get_other_ttl(),
4226 service_fullname.to_string(),
4227 ),
4228 );
4229
4230 if !ptr_added {
4231 trace!("answer was not added for msg {:?}", msg);
4232 return;
4233 }
4234
4235 if let Some(sub) = service.get_subtype() {
4236 trace!("Adding subdomain {}", sub);
4237 out.add_additional_answer(DnsPointer::new(
4238 sub,
4239 RRType::PTR,
4240 CLASS_IN,
4241 service.get_other_ttl(),
4242 service_fullname.to_string(),
4243 ));
4244 }
4245
4246 out.add_additional_answer(DnsSrv::new(
4249 service_fullname,
4250 CLASS_IN | CLASS_CACHE_FLUSH,
4251 service.get_host_ttl(),
4252 service.get_priority(),
4253 service.get_weight(),
4254 service.get_port(),
4255 hostname.to_string(),
4256 ));
4257
4258 out.add_additional_answer(DnsTxt::new(
4259 service_fullname,
4260 CLASS_IN | CLASS_CACHE_FLUSH,
4261 service.get_other_ttl(),
4262 service.generate_txt(),
4263 ));
4264
4265 for address in intf_addrs {
4266 out.add_additional_answer(DnsAddress::new(
4267 hostname,
4268 ip_address_rr_type(&address),
4269 CLASS_IN | CLASS_CACHE_FLUSH,
4270 service.get_host_ttl(),
4271 address,
4272 intf.into(),
4273 ));
4274 }
4275}
4276
4277fn check_probing(
4280 dns_registry: &mut DnsRegistry,
4281 timers: &mut BinaryHeap<Reverse<u64>>,
4282 now: u64,
4283) -> (DnsOutgoing, Vec<String>) {
4284 let mut expired_probes = Vec::new();
4285 let mut out = DnsOutgoing::new(FLAGS_QR_QUERY);
4286
4287 for (name, probe) in dns_registry.probing.iter_mut() {
4288 if now >= probe.next_send {
4289 if probe.expired(now) {
4290 expired_probes.push(name.clone());
4292 } else {
4293 out.add_question(name, RRType::ANY);
4294
4295 for record in probe.records.iter() {
4303 out.add_authority(record.clone());
4304 }
4305
4306 probe.update_next_send(now);
4307
4308 timers.push(Reverse(probe.next_send));
4310 }
4311 }
4312 }
4313
4314 (out, expired_probes)
4315}
4316
4317fn handle_expired_probes(
4322 expired_probes: Vec<String>,
4323 intf_name: &str,
4324 dns_registry: &mut DnsRegistry,
4325 monitors: &mut Vec<Sender<DaemonEvent>>,
4326) -> HashSet<String> {
4327 let mut waiting_services = HashSet::new();
4328
4329 for name in expired_probes {
4330 let Some(probe) = dns_registry.probing.remove(&name) else {
4331 continue;
4332 };
4333
4334 for record in probe.records.iter() {
4336 if let Some(new_name) = record.get_record().get_new_name() {
4337 dns_registry
4338 .name_changes
4339 .insert(name.clone(), new_name.to_string());
4340
4341 let event = DnsNameChange {
4342 original: record.get_record().get_original_name().to_string(),
4343 new_name: new_name.to_string(),
4344 rr_type: record.get_type(),
4345 intf_name: intf_name.to_string(),
4346 };
4347 debug!("Name change event: {:?}", &event);
4348 notify_monitors(monitors, DaemonEvent::NameChange(event));
4349 }
4350 }
4351
4352 debug!(
4354 "probe of '{name}' finished: move {} records to active. ({} waiting services)",
4355 probe.records.len(),
4356 probe.waiting_services.len(),
4357 );
4358
4359 if !probe.records.is_empty() {
4361 match dns_registry.active.get_mut(&name) {
4362 Some(records) => {
4363 records.extend(probe.records);
4364 }
4365 None => {
4366 dns_registry.active.insert(name, probe.records);
4367 }
4368 }
4369
4370 waiting_services.extend(probe.waiting_services);
4371 }
4372 }
4373
4374 waiting_services
4375}
4376
4377#[cfg(test)]
4378mod tests {
4379 use super::{
4380 _new_socket_bind, check_domain_suffix, check_service_name_length, hostname_change,
4381 my_ip_interfaces, name_change, send_dns_outgoing_impl, valid_instance_name,
4382 HostnameResolutionEvent, ServiceDaemon, ServiceEvent, ServiceInfo, MDNS_PORT,
4383 };
4384 use crate::{
4385 dns_parser::{
4386 DnsIncoming, DnsOutgoing, DnsPointer, InterfaceId, RRType, ScopedIp, CLASS_IN,
4387 FLAGS_AA, FLAGS_QR_RESPONSE,
4388 },
4389 service_daemon::{add_answer_of_service, check_hostname},
4390 };
4391 use std::time::{Duration, SystemTime};
4392 use test_log::test;
4393
#[test]
fn test_instance_name() {
    // Each pair is (full name, whether it counts as a valid instance name).
    let cases = [
        ("my-laser._printer._tcp.local.", true),
        ("my-laser.._printer._tcp.local.", true),
        ("_printer._tcp.local.", false),
    ];

    for (fullname, expected) in cases {
        assert_eq!(valid_instance_name(fullname), expected);
    }
}
4400
#[test]
fn test_check_service_name_length() {
    // This call is expected to be rejected; print the error for the test log.
    let outcome = check_service_name_length("_tcp", 100);
    assert!(outcome.is_err());

    if let Some(err) = outcome.err() {
        println!("{}", err);
    }
}
4409
#[test]
fn test_check_hostname() {
    // Hostnames of 255 and 256 bytes in total, ".local." included.
    let longest_accepted = "A".repeat(255 - ".local.".len()) + ".local.";
    let too_long = "A".repeat(256 - ".local.".len()) + ".local.";

    // Hostnames that must be accepted.
    let accepted = ["my_host.local.", longest_accepted.as_str()];
    for hostname in accepted {
        assert!(check_hostname(hostname).is_ok());
    }

    // Hostnames that must be rejected.
    let rejected = ["my_host.local", ".local.", too_long.as_str()];
    for hostname in rejected {
        let outcome = check_hostname(hostname);
        assert!(outcome.is_err());
        if let Some(err) = outcome.err() {
            println!("{}", err);
        }
    }
}
4434
#[test]
fn test_check_domain_suffix() {
    // Each pair is (domain, whether the check must reject it).
    let cases = [
        ("_missing_dot._tcp.local", true),
        ("_missing_bar.tcp.local.", true),
        ("_mis_spell._tpp.local.", true),
        ("_mis_spell._upp.local.", true),
        ("_has_dot._tcp.local.", false),
        ("_goodname._udp.local.", false),
    ];

    for (domain, rejected) in cases {
        assert_eq!(check_domain_suffix(domain).is_err(), rejected);
    }
}
4444
// Checks that a service can be resolved again by a second browse after a PTR
// record with TTL 0 for it has been sent on every interface.
#[test]
fn test_service_with_temporarily_invalidated_ptr() {
    // A single daemon both registers and browses the service.
    let d = ServiceDaemon::new().expect("Failed to create daemon");

    let service = "_test_inval_ptr._udp.local.";
    let host_name = "my_host_tmp_invalidated_ptr.local.";
    let intfs: Vec<_> = my_ip_interfaces(false);
    let intf_ips: Vec<_> = intfs.iter().map(|intf| intf.ip()).collect();
    let port = 5201;
    let my_service =
        ServiceInfo::new(service, "my_instance", host_name, &intf_ips[..], port, None)
            .expect("invalid service info")
            .enable_addr_auto();
    let result = d.register(my_service.clone());
    assert!(result.is_ok());

    // First browse: wait until the service is resolved.
    let browse_chan = d.browse(service).unwrap();
    let timeout = Duration::from_secs(2);
    let mut resolved = false;

    while let Ok(event) = browse_chan.recv_timeout(timeout) {
        match event {
            ServiceEvent::ServiceResolved(info) => {
                resolved = true;
                println!("Resolved a service of {}", &info.fullname);
                break;
            }
            e => {
                println!("Received event {:?}", e);
            }
        }
    }

    assert!(resolved);

    // Stop the browse and wait for the confirmation event.
    println!("Stopping browse of {}", service);
    d.stop_browse(service).unwrap();

    let mut stopped = false;
    while let Ok(event) = browse_chan.recv_timeout(timeout) {
        match event {
            ServiceEvent::SearchStopped(_) => {
                stopped = true;
                println!("Stopped browsing service");
                break;
            }
            e => {
                println!("Received event {:?}", e);
            }
        }
    }

    assert!(stopped);

    // Build a PTR record for the service with a TTL of 0.
    let invalidate_ptr_packet = DnsPointer::new(
        my_service.get_type(),
        RRType::PTR,
        CLASS_IN,
        0,
        my_service.get_fullname().to_string(),
    );

    let mut packet_buffer = DnsOutgoing::new(FLAGS_QR_RESPONSE | FLAGS_AA);
    packet_buffer.add_additional_answer(invalidate_ptr_packet);

    // Send that record from a fresh socket on every interface.
    for intf in intfs {
        let sock = _new_socket_bind(&intf, true).unwrap();
        send_dns_outgoing_impl(
            &packet_buffer,
            &intf.name,
            intf.index.unwrap_or(0),
            &intf.addr,
            &sock.pktinfo,
            MDNS_PORT,
        );
    }

    println!(
        "Sent PTR record invalidation. Starting second browse for {}",
        service
    );

    // Second browse: the service must be resolved again.
    let browse_chan = d.browse(service).unwrap();

    resolved = false;
    while let Ok(event) = browse_chan.recv_timeout(timeout) {
        match event {
            ServiceEvent::ServiceResolved(info) => {
                resolved = true;
                println!("Resolved a service of {}", &info.fullname);
                break;
            }
            e => {
                println!("Received event {:?}", e);
            }
        }
    }

    assert!(resolved);
    d.shutdown().unwrap();
}
4559
// Checks that a browsing client reports `ServiceRemoved` once the server has
// shut down and the service's short host TTL has run out.
#[test]
fn test_expired_srv() {
    // Register a service whose host TTL is only a few seconds.
    let service_type = "_expired-srv._udp.local.";
    let instance = "test_instance";
    let host_name = "expired_srv_host.local.";
    let mut my_service = ServiceInfo::new(service_type, instance, host_name, "", 5023, None)
        .unwrap()
        .enable_addr_auto();
    let new_ttl = 3;
    my_service._set_host_ttl(new_ttl);

    let mdns_server = ServiceDaemon::new().expect("Failed to create mdns server");
    let result = mdns_server.register(my_service);
    assert!(result.is_ok());

    // Browse from a separate daemon until the service is resolved.
    let mdns_client = ServiceDaemon::new().expect("Failed to create mdns client");
    let browse_chan = mdns_client.browse(service_type).unwrap();
    let timeout = Duration::from_secs(2);
    let mut resolved = false;

    while let Ok(event) = browse_chan.recv_timeout(timeout) {
        if let ServiceEvent::ServiceResolved(info) = event {
            resolved = true;
            println!("Resolved a service of {}", &info.fullname);
            break;
        }
    }

    assert!(resolved);

    // Stop the server so its records are no longer refreshed.
    mdns_server.shutdown().unwrap();

    // Wait for the removal, with one second of margin on top of the TTL
    // (the same margin `test_hostname_resolution_address_removed` uses).
    let expire_timeout = Duration::from_secs(new_ttl as u64 + 1);
    let mut removed = false;
    while let Ok(event) = browse_chan.recv_timeout(expire_timeout) {
        if let ServiceEvent::ServiceRemoved(service_type, full_name) = event {
            removed = true;
            println!("Service removed: {}: {}", &service_type, &full_name);
            break;
        }
    }

    // Without this assertion the test would pass even if no removal arrived.
    assert!(
        removed,
        "Should receive ServiceRemoved after the SRV record expired"
    );

    mdns_client.shutdown().unwrap();
}
4607
// Checks that a hostname-resolution client first receives `AddressesFound`
// for a registered host and then `AddressesRemoved` after the server has shut
// down.
#[test]
fn test_hostname_resolution_address_removed() {
    // Server side: register a service on the first IPv4 interface address.
    let server = ServiceDaemon::new().expect("Failed to create server");
    let hostname = "addr_remove_host._tcp.local.";
    let service_ip_addr: ScopedIp = my_ip_interfaces(false)
        .iter()
        .find(|iface| iface.ip().is_ipv4())
        .map(|iface| iface.ip().into())
        .unwrap();

    let mut my_service = ServiceInfo::new(
        "_host_res_test._tcp.local.",
        "my_instance",
        hostname,
        service_ip_addr.to_ip_addr(),
        1234,
        None,
    )
    .expect("invalid service info");

    // Use a short host TTL so the wait below stays short.
    let addr_ttl = 2;
    my_service._set_host_ttl(addr_ttl);
    server.register(my_service).unwrap();

    // Client side: resolve the hostname and wait (without timeout) for the
    // address to show up.
    let client = ServiceDaemon::new().expect("Failed to create client");
    let event_receiver = client.resolve_hostname(hostname, None).unwrap();
    let resolved = loop {
        match event_receiver.recv() {
            Ok(HostnameResolutionEvent::AddressesFound(found_hostname, addresses)) => {
                assert!(found_hostname == hostname);
                assert!(addresses.contains(&service_ip_addr));
                println!("address found: {:?}", &addresses);
                break true;
            }
            Ok(HostnameResolutionEvent::SearchStopped(_)) => break false,
            Ok(_event) => {}
            Err(_) => break false,
        }
    };

    assert!(resolved);

    // Stop the server so its address records are no longer refreshed.
    server.shutdown().unwrap();

    // Wait for the removal event; each receive may take up to TTL + 1 seconds.
    let timeout = Duration::from_secs(addr_ttl as u64 + 1);
    let removed = loop {
        match event_receiver.recv_timeout(timeout) {
            Ok(HostnameResolutionEvent::AddressesRemoved(removed_host, addresses)) => {
                assert!(removed_host == hostname);
                assert!(addresses.contains(&service_ip_addr));

                println!(
                    "address removed: hostname: {} addresses: {:?}",
                    &hostname, &addresses
                );
                break true;
            }
            Ok(_event) => {}
            Err(_) => {
                break false;
            }
        }
    };

    assert!(removed);

    client.shutdown().unwrap();
}
4682
// Checks the client's cache-refresh counters after a service with a short
// "other" TTL has been resolved: exactly one PTR refresh and one SRV/TXT
// refresh are expected.
#[test]
fn test_refresh_ptr() {
    let service_type = "_refresh-ptr._udp.local.";
    let instance = "test_instance";
    let host_name = "refresh_ptr_host.local.";
    let service_ip_addr = my_ip_interfaces(false)
        .iter()
        .find(|iface| iface.ip().is_ipv4())
        .map(|iface| iface.ip())
        .unwrap();

    let mut my_service = ServiceInfo::new(
        service_type,
        instance,
        host_name,
        service_ip_addr,
        5023,
        None,
    )
    .unwrap();

    // Shorten the "other" TTL (set via `_set_other_ttl`) to a few seconds.
    let new_ttl = 3;
    my_service._set_other_ttl(new_ttl);

    let mdns_server = ServiceDaemon::new().expect("Failed to create mdns server");
    let result = mdns_server.register(my_service);
    assert!(result.is_ok());

    // Browse from a separate daemon until the service is resolved.
    let mdns_client = ServiceDaemon::new().expect("Failed to create mdns client");
    let browse_chan = mdns_client.browse(service_type).unwrap();
    let timeout = Duration::from_millis(1500);
    let mut resolved = false;

    while let Ok(event) = browse_chan.recv_timeout(timeout) {
        if let ServiceEvent::ServiceResolved(info) = event {
            resolved = true;
            println!("Resolved a service of {}", &info.fullname);
            break;
        }
    }

    assert!(resolved);

    // Keep draining events until none arrives within 90% of the TTL.
    let timeout = Duration::from_millis(new_ttl as u64 * 1000 * 90 / 100);
    while let Ok(event) = browse_chan.recv_timeout(timeout) {
        println!("event: {:?}", &event);
    }

    // Read the client's metrics and check the refresh counters.
    let metrics_chan = mdns_client.get_metrics().unwrap();
    let metrics = metrics_chan.recv_timeout(timeout).unwrap();
    let ptr_refresh_counter = metrics["cache-refresh-ptr"];
    assert_eq!(ptr_refresh_counter, 1);
    let srvtxt_refresh_counter = metrics["cache-refresh-srv-txt"];
    assert_eq!(srvtxt_refresh_counter, 1);

    mdns_server.shutdown().unwrap();
    mdns_client.shutdown().unwrap();
}
4747
#[test]
fn test_name_change() {
    // Each pair is (original name, expected changed name).
    let cases = [
        ("foo.local.", "foo (2).local."),
        ("foo (2).local.", "foo (3).local."),
        ("foo (9).local.", "foo (10).local."),
        ("foo", "foo (2)"),
        ("foo (2)", "foo (3)"),
        ("", " (2)"),
        // Suffixes that are not a well-formed trailing "(<number>)".
        ("foo (abc)", "foo (abc) (2)"),
        ("foo (2", "foo (2 (2)"),
        ("foo (2) extra", "foo (2) extra (2)"),
    ];

    for (original, expected) in cases {
        assert_eq!(name_change(original), expected);
    }
}
4762
#[test]
fn test_hostname_change() {
    // Each pair is (original hostname, expected changed hostname).
    let cases = [
        ("foo.local.", "foo-2.local."),
        ("foo", "foo-2"),
        ("foo-2.local.", "foo-3.local."),
        ("foo-9", "foo-10"),
        ("test-42.domain.", "test-43.domain."),
    ];

    for (original, expected) in cases {
        assert_eq!(hostname_change(original), expected);
    }
}
4771
// Checks that the TXT answer produced by `add_answer_of_service` carries the
// service's "other" TTL.
#[test]
fn test_add_answer_txt_ttl() {
    let instance = "test_instance";

    // Build a service on the first IPv4 interface.
    let service_intf = my_ip_interfaces(false)
        .into_iter()
        .find(|iface| iface.ip().is_ipv4())
        .unwrap();
    let service_ip_addr = service_intf.ip();
    let my_service = ServiceInfo::new(
        "_test_add_answer._udp.local.",
        instance,
        "add_answer_host.local.",
        service_ip_addr,
        5023,
        None,
    )
    .unwrap();

    let mut out = DnsOutgoing::new(FLAGS_QR_RESPONSE | FLAGS_AA);

    // Turn the still-empty outgoing message into an incoming one to serve as
    // the message being answered.
    let mut wire_packets = out.to_data_on_wire();
    let interface_id = InterfaceId::from(&service_intf);
    let raw_packet = wire_packets.pop().unwrap();
    let incoming = DnsIncoming::new(raw_packet, interface_id).unwrap();

    // Ask for the TXT record only.
    add_answer_of_service(
        &mut out,
        &incoming,
        instance,
        &my_service,
        RRType::TXT,
        vec![service_intf.ip()],
    );

    assert!(
        out.answers_count() > 0,
        "No answers added to the outgoing message"
    );

    // The first answer must be the TXT record with the expected TTL.
    let txt_answer = out._answers().first().unwrap();
    assert_eq!(txt_answer.0.get_type(), RRType::TXT);
    assert_eq!(
        txt_answer.0.get_record().get_ttl(),
        my_service.get_other_ttl()
    );
}
4825
// Checks the events a browsing client sees when an interface goes down and
// comes back up: resolved, then removed, then resolved again.
#[test]
fn test_interface_flip() {
    let ty_domain = "_intf-flip._udp.local.";
    let host_name = "intf_flip.local.";
    let now = SystemTime::now()
        .duration_since(SystemTime::UNIX_EPOCH)
        .unwrap();
    // Derive the instance name from the current time in microseconds.
    let instance_name = now.as_micros().to_string();
    let port = 5200;

    // Pick the first IPv4 interface and remember its name.
    let (ip_addr1, intf_name) = my_ip_interfaces(false)
        .iter()
        .find(|iface| iface.ip().is_ipv4())
        .map(|iface| (iface.ip(), iface.name.clone()))
        .unwrap();

    println!("Using interface {} with IP {}", intf_name, ip_addr1);

    let service1 = ServiceInfo::new(ty_domain, &instance_name, host_name, ip_addr1, port, None)
        .expect("valid service info");
    let server1 = ServiceDaemon::new().expect("failed to start server");
    server1
        .register(service1)
        .expect("Failed to register service1");

    // Give the server some time before the client starts browsing.
    std::thread::sleep(Duration::from_secs(2));

    let client = ServiceDaemon::new().expect("failed to start client");

    let receiver = client.browse(ty_domain).unwrap();

    let timeout = Duration::from_secs(3);
    let mut got_data = false;

    // Phase 1: the service is resolved while the interface is up.
    while let Ok(event) = receiver.recv_timeout(timeout) {
        if let ServiceEvent::ServiceResolved(_) = event {
            println!("Received ServiceResolved event");
            got_data = true;
            break;
        }
    }

    assert!(got_data, "Should receive ServiceResolved event");

    // Set the client's IP check interval to 1.
    // NOTE(review): presumably seconds, so the client notices the interface
    // change quickly; confirm against `set_ip_check_interval`.
    client.set_ip_check_interval(1).unwrap();

    // Phase 2: take the interface down on the client via the test hook and
    // expect a removal.
    println!("Shutting down interface {}", &intf_name);
    client.test_down_interface(&intf_name).unwrap();

    let mut got_removed = false;

    while let Ok(event) = receiver.recv_timeout(timeout) {
        if let ServiceEvent::ServiceRemoved(ty_domain, instance) = event {
            got_removed = true;
            println!("removed: {ty_domain} : {instance}");
            break;
        }
    }
    assert!(got_removed, "Should receive ServiceRemoved event");

    // Phase 3: bring the interface back and expect the service to resolve
    // again.
    println!("Bringing up interface {}", &intf_name);
    client.test_up_interface(&intf_name).unwrap();
    let mut got_data = false;
    while let Ok(event) = receiver.recv_timeout(timeout) {
        if let ServiceEvent::ServiceResolved(resolved) = event {
            got_data = true;
            println!("Received ServiceResolved: {:?}", resolved);
            break;
        }
    }
    assert!(
        got_data,
        "Should receive ServiceResolved event after interface is back up"
    );

    server1.shutdown().unwrap();
    client.shutdown().unwrap();
}
4911
// Checks that a `browse_cache` started before the service exists still
// resolves the service once a server registers it.
#[test]
fn test_cache_only() {
    let service_type = "_cache_only._udp.local.";
    let instance = "test_instance";
    let host_name = "cache_only_host.local.";
    let service_ip_addr = my_ip_interfaces(false)
        .iter()
        .find(|iface| iface.ip().is_ipv4())
        .map(|iface| iface.ip())
        .unwrap();

    let mut my_service = ServiceInfo::new(
        service_type,
        instance,
        host_name,
        service_ip_addr,
        5023,
        None,
    )
    .unwrap();

    let new_ttl = 3;
    my_service._set_other_ttl(new_ttl);

    let mdns_client = ServiceDaemon::new().expect("Failed to create mdns client");

    // Start the cache-only browse first, then wait before the server exists.
    let browse_chan = mdns_client.browse_cache(service_type).unwrap();
    std::thread::sleep(Duration::from_secs(2));

    // Only now create the server and register the service.
    let mdns_server = ServiceDaemon::new().expect("Failed to create mdns server");
    let result = mdns_server.register(my_service);
    assert!(result.is_ok());

    let timeout = Duration::from_millis(1500);
    let mut resolved = false;

    // The client is expected to resolve the service after the registration.
    while let Ok(event) = browse_chan.recv_timeout(timeout) {
        if let ServiceEvent::ServiceResolved(info) = event {
            resolved = true;
            println!("Resolved a service of {}", &info.get_fullname());
            break;
        }
    }

    assert!(resolved);

    mdns_server.shutdown().unwrap();
    mdns_client.shutdown().unwrap();
}
4967
// Checks that a client which accepts unsolicited messages can resolve a
// service through `browse_cache` started after the service was registered.
#[test]
fn test_cache_only_unsolicited() {
    // Use a service type and hostname of its own: tests run in parallel, and
    // sharing them with `test_cache_only` would let either test be satisfied
    // by the other test's server.
    let service_type = "_c_unsolicit._udp.local.";
    let instance = "test_instance";
    let host_name = "c_unsolicit_host.local.";
    let service_ip_addr = my_ip_interfaces(false)
        .iter()
        .find(|iface| iface.ip().is_ipv4())
        .map(|iface| iface.ip())
        .unwrap();

    let mut my_service = ServiceInfo::new(
        service_type,
        instance,
        host_name,
        service_ip_addr,
        5023,
        None,
    )
    .unwrap();

    let new_ttl = 3;
    my_service._set_other_ttl(new_ttl);

    // Register the service before the client starts browsing.
    let mdns_server = ServiceDaemon::new().expect("Failed to create mdns server");
    let result = mdns_server.register(my_service);
    assert!(result.is_ok());

    let mdns_client = ServiceDaemon::new().expect("Failed to create mdns client");
    mdns_client.accept_unsolicited(true).unwrap();

    // Wait before starting the cache-only browse.
    std::thread::sleep(Duration::from_secs(2));
    let browse_chan = mdns_client.browse_cache(service_type).unwrap();
    let timeout = Duration::from_millis(1500);
    let mut resolved = false;

    while let Ok(event) = browse_chan.recv_timeout(timeout) {
        if let ServiceEvent::ServiceResolved(info) = event {
            resolved = true;
            println!("Resolved a service of {}", &info.get_fullname());
            break;
        }
    }

    assert!(resolved);

    mdns_server.shutdown().unwrap();
    mdns_client.shutdown().unwrap();
}
5023
// Checks that daemons created with a custom port and daemons created with the
// default port do not see each other's services, even for the same service
// type.
#[test]
fn test_custom_port_isolation() {
    let service_type = "_custom_port._udp.local.";
    let instance_custom = "custom_port_instance";
    let instance_default = "default_port_instance";
    let host_name = "custom_port_host.local.";

    let service_ip_addr = my_ip_interfaces(false)
        .iter()
        .find(|iface| iface.ip().is_ipv4())
        .map(|iface| iface.ip())
        .expect("Test requires an IPv4 interface");

    // Service registered on the custom-port server; its own port is 8080.
    let service_custom = ServiceInfo::new(
        service_type,
        instance_custom,
        host_name,
        service_ip_addr,
        8080,
        None,
    )
    .unwrap();

    // Service registered on the default-port server; its own port is 8081.
    let service_default = ServiceInfo::new(
        service_type,
        instance_default,
        host_name,
        service_ip_addr,
        8081,
        None,
    )
    .unwrap();

    // Server and client daemons created with the same custom port.
    let custom_port = 5454u16;
    let server_custom =
        ServiceDaemon::new_with_port(custom_port).expect("Failed to create custom port server");
    let client_custom =
        ServiceDaemon::new_with_port(custom_port).expect("Failed to create custom port client");

    // Server daemon created with the default constructor.
    let server_default = ServiceDaemon::new().expect("Failed to create default port server");

    server_custom
        .register(service_custom.clone())
        .expect("Failed to register custom port service");

    server_default
        .register(service_default.clone())
        .expect("Failed to register default port service");

    // Browse with the custom-port client.
    let browse_custom = client_custom
        .browse(service_type)
        .expect("Failed to browse on custom port");

    let timeout = Duration::from_secs(3);
    let mut found_custom = false;
    let mut found_default_on_custom = false;

    // Drain events until the channel stays quiet for `timeout`; there is no
    // early exit, so a late resolution of the wrong service is still caught.
    while let Ok(event) = browse_custom.recv_timeout(timeout) {
        if let ServiceEvent::ServiceResolved(info) = event {
            println!(
                "Custom port client resolved: {} on port {}",
                info.get_fullname(),
                info.get_port()
            );
            if info.get_fullname().starts_with(instance_custom) {
                found_custom = true;
                assert_eq!(info.get_port(), 8080);
            }
            if info.get_fullname().starts_with(instance_default) {
                found_default_on_custom = true;
            }
        }
    }

    assert!(
        found_custom,
        "Custom port client should find service on custom port"
    );
    assert!(
        !found_default_on_custom,
        "Custom port client should NOT find service on default port"
    );

    // Repeat the check from the other side with a default-port client.
    let client_default = ServiceDaemon::new().expect("Failed to create default port client");
    let browse_default = client_default
        .browse(service_type)
        .expect("Failed to browse on default port");

    let mut found_default = false;
    let mut found_custom_on_default = false;

    while let Ok(event) = browse_default.recv_timeout(timeout) {
        if let ServiceEvent::ServiceResolved(info) = event {
            println!(
                "Default port client resolved: {} on port {}",
                info.get_fullname(),
                info.get_port()
            );
            if info.get_fullname().starts_with(instance_default) {
                found_default = true;
                assert_eq!(info.get_port(), 8081);
            }
            if info.get_fullname().starts_with(instance_custom) {
                found_custom_on_default = true;
            }
        }
    }

    assert!(
        found_default,
        "Default port client should find service on default port"
    );
    assert!(
        !found_custom_on_default,
        "Default port client should NOT find service on custom port"
    );

    server_custom.shutdown().unwrap();
    client_custom.shutdown().unwrap();
    server_default.shutdown().unwrap();
    client_default.shutdown().unwrap();
}
5161}