#[cfg(any(feature = "alloc", feature = "std"))]
use crate::trace::*;
#[cfg(any(feature = "alloc", feature = "std"))]
mod respond;
mod schedule;
mod state;
#[cfg(any(feature = "alloc", feature = "std"))]
use bytes::Bytes;
/// Which of this service's owner names a known-answer hint belongs to.
///
/// A hint is only used to suppress one of our answers when both the owner
/// category and the record type line up.
#[cfg(any(feature = "alloc", feature = "std"))]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum KasOwner {
/// The known answer's owner name matched `records.service_type()` (used for PTR answers).
ServiceType,
/// The known answer's owner name matched `records.instance()` (used for SRV/TXT answers).
Instance,
/// The known answer's owner name matched `records.host()` (used for A/AAAA answers).
Host,
}
/// One remembered known answer from a querier, stored in the `Service::kas_hints` ring.
#[cfg(any(feature = "alloc", feature = "std"))]
#[derive(Debug, Clone, Copy)]
struct KasHint<I> {
/// Which of our owner names the known answer was recorded against.
owner: KasOwner,
/// Record type of the known answer.
rtype: crate::wire::ResourceType,
/// `hash_rdata` of the known answer's canonical rdata.
rdata_hash: u64,
/// Instant after which the hint is dropped: arrival time plus the record's TTL.
expires_at: I,
}
/// Number of slots in the `Service::kas_hints` ring; the oldest slot is overwritten on wrap.
#[cfg(any(feature = "alloc", feature = "std"))]
const KAS_RING_SIZE: usize = 16;
/// Upper bound on distinct source addresses remembered per response cycle
/// (applies to both `questioner_srcs` and `meta_questioner_srcs`).
#[cfg(any(feature = "alloc", feature = "std"))]
const MAX_QUESTIONER_SRCS: usize = 8;
/// Upper bound on queued legacy replies in `Service::pending_legacy`;
/// further legacy questions are ignored while the queue is full.
#[cfg(any(feature = "alloc", feature = "std"))]
const MAX_LEGACY_RESPONSES: usize = 8;
/// A queued reply to a question whose source port was not `MDNS_PORT`.
///
/// The reply is sent to `dst` and echoes the question's id, name, type and class.
#[cfg(any(feature = "alloc", feature = "std"))]
#[derive(Debug, Clone)]
struct LegacyResp {
/// Source address of the question; the reply is addressed here.
dst: core::net::SocketAddr,
/// Query id copied from the question.
query_id: u16,
/// Our own copy of the name the question matched (service type, instance, host,
/// a subtype, or the DNS-SD meta-query name).
name: crate::Name,
/// Question type copied from the question.
qtype: crate::wire::ResourceType,
/// Question class copied from the question.
qclass: crate::wire::ResourceClass,
/// `true` when the question was for the DNS-SD meta-query name; selects the meta encoder.
is_meta: bool,
}
/// Upper bound on records kept per `PeerProbe`; extra records from that source are dropped.
#[cfg(any(feature = "alloc", feature = "std"))]
const MAX_PEER_PROBE_RECORDS: usize = 16;
/// Upper bound on distinct peer sources tracked in `Service::peer_probes`.
#[cfg(any(feature = "alloc", feature = "std"))]
const MAX_PEER_PROBES: usize = 8;
/// Minimum spacing between conflict-triggered re-probes while Announcing/Established;
/// conflicts arriving sooner than this after the previous re-probe are ignored.
#[cfg(any(feature = "alloc", feature = "std"))]
const CONFLICT_REPROBE_MIN_INTERVAL: core::time::Duration = core::time::Duration::from_secs(1);
/// One SRV or TXT record seen in a peer's conflicting probe, kept for the tiebreak.
#[cfg(any(feature = "alloc", feature = "std"))]
#[derive(Debug, Clone)]
struct PeerRecord {
/// Record type of the peer's record.
rtype: crate::wire::ResourceType,
/// Canonical rdata bytes as produced by `respond::canonical_rdata_for_hash`.
canonical: Bytes,
}
/// All conflicting records collected from a single peer source address
/// since the last tiebreak was evaluated.
#[cfg(any(feature = "alloc", feature = "std"))]
#[derive(Debug)]
struct PeerProbe {
/// Source address the records arrived from; used as the bucket key.
src: core::net::SocketAddr,
/// Records accumulated for this source, capped at `MAX_PEER_PROBE_RECORDS`.
records: std::vec::Vec<PeerRecord>,
}
#[cfg(any(feature = "alloc", feature = "std"))]
/// Appends `name_str` to `out` as a length-prefixed, lower-cased label sequence
/// terminated by a zero byte.
///
/// One trailing dot is ignored, empty labels are skipped, ASCII letters are
/// folded to lower case, and each label contributes at most its first 63 bytes.
/// An empty name (or a lone `"."`) is written as the single terminator byte.
fn write_canonical_wire_name(name_str: &str, out: &mut std::vec::Vec<u8>) {
    const MAX_LABEL_BYTES: usize = 63;

    let dotless = name_str.strip_suffix('.').unwrap_or(name_str);
    // An empty `dotless` yields exactly one empty label, which the filter
    // drops, so only the terminator below is written in that case.
    for label in dotless.split('.').filter(|l| !l.is_empty()) {
        let kept = label.len().min(MAX_LABEL_BYTES);
        // `kept` is at most 63, so the narrowing cast cannot lose bits.
        #[allow(clippy::cast_possible_truncation)]
        out.push(kept as u8);
        out.extend(
            label
                .bytes()
                .take(MAX_LABEL_BYTES)
                .map(|byte| byte.to_ascii_lowercase()),
        );
    }
    out.push(0);
}
#[cfg(any(feature = "alloc", feature = "std"))]
/// Hashes `bytes` with 64-bit FNV-1a (xor the byte in, then multiply by the prime).
///
/// Used to compare rdata cheaply; an empty slice hashes to the offset basis.
fn hash_rdata(bytes: &[u8]) -> u64 {
    const OFFSET_BASIS: u64 = 0xcbf2_9ce4_8422_2325;
    const PRIME: u64 = 0x0000_0100_0000_01b3;

    bytes.iter().fold(OFFSET_BASIS, |acc, &byte| {
        (acc ^ u64::from(byte)).wrapping_mul(PRIME)
    })
}
#[cfg(any(feature = "alloc", feature = "std"))]
#[allow(unused_imports)]
pub(crate) use respond::{EmittedRecords, multicast_dst, write_goodbye};
#[allow(unused_imports)]
pub(crate) use schedule::{announce_deadline, probe_deadline, re_announce_deadline};
pub use state::ServiceState;
#[cfg(any(feature = "alloc", feature = "std"))]
use rand::SeedableRng;
#[cfg(any(feature = "alloc", feature = "std"))]
use crate::error::{HandleTimeoutError, TransmitError};
#[cfg(any(feature = "alloc", feature = "std"))]
use crate::event::{ServiceEvent, ServiceUpdate};
#[cfg(any(feature = "alloc", feature = "std"))]
use crate::records::ServiceRecords;
#[cfg(any(feature = "alloc", feature = "std"))]
use crate::transmit::Transmit;
#[cfg(any(feature = "alloc", feature = "std"))]
use crate::{Instant, Pool, ServiceHandle};
/// Random number generator used by `Service`; it is built from the 32-byte seed
/// handed to `Service::try_new` and feeds the probe and response jitter.
#[cfg(any(feature = "alloc", feature = "std"))]
type Rng = rand::rngs::StdRng;
#[cfg(any(feature = "alloc", feature = "std"))]
/// Builds the next candidate name after a lost conflict by giving the first
/// label a `-<attempt>` suffix.
///
/// Only the text before the first `.` is rewritten. If that label already ends
/// in `-<digits>`, the old number is replaced rather than stacked
/// (`foo-1` with attempt 2 becomes `foo-2`). The remaining labels and an
/// optional trailing dot are carried over unchanged.
fn rename_with_suffix(current: &str, attempt: u32) -> std::string::String {
    use std::string::ToString;

    let without_root = current.strip_suffix('.');
    let body = without_root.unwrap_or(current);

    let (first_label, remainder) = body
        .split_once('.')
        .map_or((body, None), |(head, tail)| (head, Some(tail)));

    // A previous rename leaves a non-empty, all-digit tail after the last '-'.
    let is_counter = |s: &str| !s.is_empty() && s.bytes().all(|b| b.is_ascii_digit());
    let stem = match first_label.rsplit_once('-') {
        Some((stem, digits)) if is_counter(digits) => stem,
        _ => first_label,
    };

    let mut renamed = std::string::String::new();
    renamed.push_str(stem);
    renamed.push('-');
    renamed.push_str(&attempt.to_string());
    if let Some(tail) = remainder {
        renamed.push('.');
        renamed.push_str(tail);
    }
    if without_root.is_some() {
        renamed.push('.');
    }
    renamed
}
#[cfg(any(feature = "alloc", feature = "std"))]
/// Evaluates a simultaneous-probe tiebreak (the "§8.2" tiebreak referenced by
/// the caller's log message) and reports whether we must yield.
///
/// Our side is the SRV and TXT record of `our`; each peer's side is the set of
/// records collected from that peer. Every record is serialised as
/// `type (big-endian u16) || canonical rdata`, the records of one side are
/// sorted, concatenated, and the two byte strings are compared
/// lexicographically.
///
/// Returns `true` when at least one peer's data is *strictly* later than ours.
/// Byte-identical data is not a conflict: the peer is proposing exactly the
/// records we are, so there is nothing to defer to. This also matches how
/// established services treat a peer record whose rdata equals ours.
/// An empty `peer_probes` slice yields `false`.
fn compare_rr_sets_we_lose(
    our: &crate::records::ServiceRecords,
    peer_probes: &[PeerProbe],
) -> bool {
    let mut our_set: std::vec::Vec<std::vec::Vec<u8>> = std::vec::Vec::new();
    {
        // SRV: priority, weight, port (big-endian) followed by the canonical target name.
        let mut buf = std::vec::Vec::new();
        buf.extend_from_slice(&crate::wire::ResourceType::Srv.to_u16().to_be_bytes());
        buf.extend_from_slice(&our.priority().to_be_bytes());
        buf.extend_from_slice(&our.weight().to_be_bytes());
        buf.extend_from_slice(&our.port().to_be_bytes());
        write_canonical_wire_name(our.host().as_str(), &mut buf);
        our_set.push(buf);
    }
    {
        // TXT: canonical segment encoding.
        let mut buf = std::vec::Vec::new();
        buf.extend_from_slice(&crate::wire::ResourceType::Txt.to_u16().to_be_bytes());
        respond::write_canonical_txt(our.txt_segments(), &mut buf);
        our_set.push(buf);
    }
    our_set.sort();
    let our_concat: std::vec::Vec<u8> = our_set.into_iter().flatten().collect();

    peer_probes.iter().any(|probe| {
        let mut peer_set: std::vec::Vec<std::vec::Vec<u8>> = probe
            .records
            .iter()
            .map(|p| {
                let mut buf = std::vec::Vec::new();
                buf.extend_from_slice(&p.rtype.to_u16().to_be_bytes());
                buf.extend_from_slice(&p.canonical[..]);
                buf
            })
            .collect();
        peer_set.sort();
        let peer_concat: std::vec::Vec<u8> = peer_set.into_iter().flatten().collect();
        // Strictly later wins; a tie means identical data and therefore no conflict.
        peer_concat > our_concat
    })
}
/// Kind of multicast packet queued in `Service::pending_transmits`,
/// waiting to be encoded by `poll_transmit`.
#[cfg(any(feature = "alloc", feature = "std"))]
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
enum PendingTransmitKind {
/// A probe, queued while in `ServiceState::Probing`.
Probe,
/// An announcement, queued while Announcing or as a periodic re-announce once Established.
Announcement,
/// A response to collected questions, queued when `response_deadline` fires.
Response,
}
/// What `poll_transmit` last handed out and is still waiting to hear about
/// through `note_transmit_result`.
#[cfg(any(feature = "alloc", feature = "std"))]
#[derive(Debug, Clone)]
enum AwaitingConfirm {
/// A probe packet.
Probe,
/// An announcement, with the records it contained.
Announcement(respond::EmittedRecords),
/// A response (multicast or legacy), with the records it contained and the number
/// of answers that were suppressed by known-answer hints while encoding it.
Response(respond::EmittedRecords, u64),
/// A reply to the DNS-SD meta-query; carries no per-record ownership information.
MetaResponse,
}
/// Tracks which records have been confirmed as sent, so that a later
/// withdrawal only covers records that were actually put on the wire.
#[cfg(any(feature = "alloc", feature = "std"))]
#[derive(Debug, Default, Clone)]
struct GoodbyeOwnership {
/// A PTR record was emitted.
ptr: bool,
/// An SRV record was emitted.
srv: bool,
/// A TXT record was emitted.
txt: bool,
/// Subtype records were emitted.
subtypes: bool,
/// IPv4 addresses emitted so far, without duplicates.
a: std::vec::Vec<core::net::Ipv4Addr>,
/// IPv6 addresses emitted so far, without duplicates.
aaaa: std::vec::Vec<core::net::Ipv6Addr>,
}
#[cfg(any(feature = "alloc", feature = "std"))]
impl GoodbyeOwnership {
    /// Merges the records of one confirmed transmit into the ownership state.
    ///
    /// Flags are OR-ed in; addresses are appended only when not already known.
    fn record_emitted(&mut self, e: &respond::EmittedRecords) {
        self.ptr |= e.ptr();
        self.srv |= e.srv();
        self.txt |= e.txt();
        self.subtypes |= e.subtypes();

        for addr in e.a_slice() {
            let already_known = self.a.iter().any(|seen| seen == addr);
            if !already_known {
                self.a.push(*addr);
            }
        }
        for addr in e.aaaa_slice() {
            let already_known = self.aaaa.iter().any(|seen| seen == addr);
            if !already_known {
                self.aaaa.push(*addr);
            }
        }
    }

    /// Forgets the instance-level flags (PTR, SRV, TXT, subtypes).
    /// The host address lists are left untouched.
    #[inline]
    fn reset_instance(&mut self) {
        (self.ptr, self.srv, self.txt, self.subtypes) = (false, false, false, false);
    }

    /// `true` when any instance-level record has been emitted.
    #[inline]
    fn any_instance(&self) -> bool {
        [self.ptr, self.srv, self.txt, self.subtypes]
            .into_iter()
            .any(|emitted| emitted)
    }

    /// `true` when at least one A or AAAA address has been emitted.
    #[inline]
    fn any_host(&self) -> bool {
        !(self.a.is_empty() && self.aaaa.is_empty())
    }
}
/// Copy of everything a caller needs to withdraw this service's records,
/// as produced by `Service::withdrawal_snapshot`.
#[cfg(any(feature = "alloc", feature = "std"))]
#[cfg_attr(docsrs, doc(cfg(any(feature = "alloc", feature = "std"))))]
#[derive(Debug, Clone)]
pub struct WithdrawalSnapshot {
/// Clone of the service's records at the time of the snapshot.
pub records: crate::records::ServiceRecords,
/// Instance-level ownership flags (PTR/SRV/TXT/subtypes); its address lists are empty
/// because host addresses are carried separately in `host_a` / `host_aaaa`.
#[allow(dead_code)]
pub(crate) owned: respond::EmittedRecords,
/// IPv4 addresses that were confirmed as emitted.
pub host_a: std::vec::Vec<core::net::Ipv4Addr>,
/// IPv6 addresses that were confirmed as emitted.
pub host_aaaa: std::vec::Vec<core::net::Ipv6Addr>,
}
/// Records of the *previous* instance name, captured when a lost tiebreak forces a
/// rename after instance-level records had already been emitted.
///
/// Retrieved once through `Service::take_rename_goodbye_handoff`.
#[cfg(any(feature = "alloc", feature = "std"))]
#[cfg_attr(docsrs, doc(cfg(any(feature = "alloc", feature = "std"))))]
#[derive(Debug, Clone)]
pub struct RenameGoodbyeHandoff {
/// Clone of the records taken before the instance name was replaced.
pub(crate) records: crate::records::ServiceRecords,
/// Instance-level ownership flags for the old name; address lists are empty.
pub(crate) owned: respond::EmittedRecords,
}
/// State machine for one advertised service: probing, announcing, answering
/// questions and handling conflicts.
///
/// `I` is the instant type, `TQ` a pool of `Transmit`, `EV` a pool of `ServiceUpdate`.
/// The type performs no I/O itself: callers feed it events and timeouts, pull
/// encoded packets with `poll_transmit`, and report the send outcome back.
#[cfg(any(feature = "alloc", feature = "std"))]
#[cfg_attr(docsrs, doc(cfg(any(feature = "alloc", feature = "std"))))]
pub struct Service<I, TQ, EV> {
/// Identifier given at construction; also attached to log output.
handle: ServiceHandle,
/// Current lifecycle state.
state: ServiceState,
/// The records being advertised; the instance name is replaced on rename.
records: ServiceRecords,
/// Optional statistics sink, installed with `set_stats`.
#[cfg(feature = "stats")]
stats: Option<std::sync::Arc<hick_trace::stats::Stats>>,
/// Next probe / announce / re-announce deadline, if any.
lifecycle_deadline: Option<I>,
/// When the jittered multicast response to collected questions is due.
response_deadline: Option<I>,
/// Probe counter kept alongside `state` (set on confirmed probes, reset on re-probe).
probe_count: u8,
/// Announcement counter kept alongside `state` (set on confirmed announcements).
announce_count: u8,
/// Number of renames performed so far; used as the numeric name suffix.
rename_attempt: u32,
/// Two-slot FIFO of packets waiting to be encoded; slot 0 is the head.
pending_transmits: [Option<PendingTransmitKind>; 2],
/// Source of jitter for probe and response scheduling.
rng: Rng,
/// Transmit pool; in the visible code it is created in `try_new` and only read
/// once (result discarded) at the end of `poll_transmit`.
pending_tx: TQ,
/// Updates waiting to be drained through `poll`.
pending_updates: EV,
/// Most recent `now` passed to `try_new`, `handle_event` or `handle_timeout`.
last_now: Option<I>,
/// Ring of known-answer hints used to suppress redundant answers.
kas_hints: [Option<KasHint<I>>; KAS_RING_SIZE],
/// Index of the ring slot the next hint is written to.
kas_next_slot: usize,
/// Sources of the multicast questions collected for the pending response.
questioner_srcs: std::vec::Vec<core::net::SocketAddr>,
/// Conflicting records collected from peers while Init/Probing, bucketed by source.
peer_probes: std::vec::Vec<PeerProbe>,
/// Set when `peer_probes` has new data that `handle_timeout` should evaluate.
tiebreak_pending: bool,
/// Which records were confirmed as sent.
goodbye: GoodbyeOwnership,
/// The transmit handed out by `poll_transmit` that has not been confirmed yet.
awaiting_confirm: Option<AwaitingConfirm>,
/// Queue of replies owed to legacy (non-`MDNS_PORT`) questioners.
pending_legacy: std::vec::Vec<LegacyResp>,
/// When the last conflict-triggered re-probe started; used for rate limiting.
last_conflict_reprobe: Option<I>,
/// Records of the previous name after a rename, until the caller takes them.
rename_goodbye_handoff: Option<RenameGoodbyeHandoff>,
/// When the jittered multicast reply to a DNS-SD meta-query is due.
meta_response_deadline: Option<I>,
/// Sources of the meta-queries collected for the pending meta reply.
meta_questioner_srcs: std::vec::Vec<core::net::SocketAddr>,
/// Set when a meta-querier already listed our service type as a known answer.
meta_known_answered: bool,
}
#[cfg(any(feature = "alloc", feature = "std"))]
impl<I, TQ, EV> Service<I, TQ, EV>
where
I: Instant,
TQ: Pool<Transmit>,
EV: Pool<ServiceUpdate>,
{
#[allow(dead_code)]
/// Creates a service in its initial state.
///
/// With `probe == true` the service starts in `ServiceState::Init` and the
/// first deadline comes from `probe_deadline`; otherwise probing is skipped
/// and it starts in `ServiceState::Announcing(0)` with a deadline from
/// `announce_deadline`. `rng_seed` seeds the jitter generator and `now` is
/// remembered as the latest observed time. All queues, hints and ownership
/// state start empty.
pub(crate) fn try_new(
    handle: ServiceHandle,
    records: ServiceRecords,
    now: I,
    rng_seed: [u8; 32],
    probe: bool,
) -> Self {
    let mut rng = Rng::from_seed(rng_seed);

    let state = if probe {
        ServiceState::Init
    } else {
        ServiceState::Announcing(0)
    };
    let lifecycle_deadline = if probe {
        probe_deadline(now, 0, &mut rng)
    } else {
        announce_deadline(now, 0)
    };

    Self {
        // Identity and lifecycle.
        handle,
        state,
        records,
        #[cfg(feature = "stats")]
        stats: None,
        lifecycle_deadline,
        response_deadline: None,
        probe_count: 0,
        announce_count: 0,
        rename_attempt: 0,
        // Outgoing work.
        pending_transmits: [None, None],
        rng,
        pending_tx: TQ::new(),
        pending_updates: EV::new(),
        last_now: Some(now),
        // Known-answer suppression.
        kas_hints: [None; KAS_RING_SIZE],
        kas_next_slot: 0,
        questioner_srcs: std::vec::Vec::new(),
        // Conflict handling.
        peer_probes: std::vec::Vec::new(),
        tiebreak_pending: false,
        goodbye: GoodbyeOwnership::default(),
        awaiting_confirm: None,
        pending_legacy: std::vec::Vec::new(),
        last_conflict_reprobe: None,
        rename_goodbye_handoff: None,
        // DNS-SD meta-query handling.
        meta_response_deadline: None,
        meta_questioner_srcs: std::vec::Vec::new(),
        meta_known_answered: false,
    }
}
/// Installs the statistics sink that counters are reported to from now on.
#[cfg(feature = "stats")]
pub(crate) fn set_stats(&mut self, stats: std::sync::Arc<hick_trace::stats::Stats>) {
self.stats = Some(stats);
}
/// Borrows the statistics sink, or `None` when none has been installed.
#[cfg(feature = "stats")]
#[inline]
fn stat(&self) -> Option<&hick_trace::stats::Stats> {
self.stats.as_deref()
}
/// Returns the handle this service was created with.
#[inline(always)]
pub const fn handle(&self) -> ServiceHandle {
self.handle
}
/// Returns the current lifecycle state.
#[inline(always)]
pub const fn state(&self) -> ServiceState {
self.state
}
/// Returns the current instance name (it changes when the service is renamed).
#[inline(always)]
pub fn name(&self) -> &crate::Name {
self.records.instance()
}
/// Borrows the records this service advertises.
#[inline(always)]
pub const fn records(&self) -> &ServiceRecords {
&self.records
}
/// `true` when at least one A or AAAA address has been confirmed as emitted.
#[inline(always)]
pub fn advertises_host(&self) -> bool {
self.goodbye.any_host()
}
/// `true` when any instance-level record (PTR, SRV, TXT or subtypes) has been
/// confirmed as emitted under the current name.
#[inline(always)]
pub fn advertises_instance(&self) -> bool {
self.goodbye.any_instance()
}
/// IPv4 addresses that have been confirmed as emitted, in first-seen order.
#[inline]
pub fn advertised_a_addrs(&self) -> &[core::net::Ipv4Addr] {
&self.goodbye.a
}
/// IPv6 addresses that have been confirmed as emitted, in first-seen order.
#[inline]
pub fn advertised_aaaa_addrs(&self) -> &[core::net::Ipv6Addr] {
&self.goodbye.aaaa
}
/// Applies the outcome of the transmit most recently handed out by `poll_transmit`.
///
/// The pending `awaiting_confirm` marker is consumed; when there is none the call
/// is a no-op. `delivered` tells whether the packet was actually sent, and `now`
/// is the reference time for any deadline that gets rescheduled.
///
/// Lifecycle progress (probe and announcement counters, the transition to
/// `Established`) and record ownership are only advanced for delivered packets.
pub fn note_transmit_result(&mut self, now: I, delivered: bool) {
let kind = match self.awaiting_confirm.take() {
Some(k) => k,
None => return,
};
match kind {
AwaitingConfirm::Probe => {
// Ignored unless we are still probing (the state may have changed since the send).
if let ServiceState::Probing(n) = self.state {
if !delivered {
// Failed send: stay on the same probe number and schedule another try.
self.lifecycle_deadline = probe_deadline(now, n, &mut self.rng);
} else {
#[cfg(feature = "stats")]
if let Some(s) = self.stat() {
s.probes_tx(1);
}
// Probe numbers are 0-based, so n >= 2 means the third probe went out.
if n >= 2 {
self.state = ServiceState::Announcing(0);
self.probe_count = 3;
self.lifecycle_deadline = announce_deadline(now, 0);
} else {
let new_n = n.saturating_add(1);
self.state = ServiceState::Probing(new_n);
self.probe_count = new_n;
self.lifecycle_deadline = probe_deadline(now, new_n, &mut self.rng);
}
}
}
}
AwaitingConfirm::Announcement(emitted) => {
if !delivered {
// Failed send: schedule a retry, but only while announcing is still relevant.
if matches!(
self.state,
ServiceState::Announcing(_) | ServiceState::Established
) {
self.lifecycle_deadline = announce_deadline(now, 1);
}
return;
}
#[cfg(feature = "stats")]
if let Some(s) = self.stat() {
s.announcements_tx(1);
}
// The records are on the wire now, so a later withdrawal must cover them.
self.goodbye.record_emitted(&emitted);
if let ServiceState::Announcing(n) = self.state {
// n >= 1 means the second announcement was just confirmed.
if n >= 1 {
self.state = ServiceState::Established;
self.announce_count = 2;
let _ = self.pending_updates.insert(ServiceUpdate::Established);
self.lifecycle_deadline = re_announce_deadline(now, self.records.ttl_secs());
debug!(
target: "mdns_proto::service",
handle = self.handle.raw(),
"service: Announcing → Established"
);
#[cfg(feature = "stats")]
if let Some(s) = self.stat() {
s.services_established(1);
}
} else {
let new_n = n.saturating_add(1);
self.state = ServiceState::Announcing(new_n);
self.announce_count = new_n;
self.lifecycle_deadline = announce_deadline(now, new_n);
debug!(
target: "mdns_proto::service",
handle = self.handle.raw(),
announce_n = new_n,
"service: Announcing — first announcement confirmed, scheduling second"
);
}
}
}
AwaitingConfirm::Response(emitted, _kas_suppressed_count) => {
// The count is only consumed when statistics are compiled in.
#[cfg(feature = "stats")]
let kas_suppressed_count = _kas_suppressed_count;
if delivered {
#[cfg(feature = "stats")]
if let Some(s) = self.stat() {
s.responses_tx(1);
if kas_suppressed_count > 0 {
s.answers_suppressed_kas(kas_suppressed_count);
}
}
self.goodbye.record_emitted(&emitted);
}
}
AwaitingConfirm::MetaResponse => {
// Meta replies carry no ownership information; only the counter is updated.
if delivered {
#[cfg(feature = "stats")]
if let Some(s) = self.stat() {
s.responses_tx(1);
}
}
}
}
}
/// Shorthand for `note_transmit_result(now, true)`: the last transmit was sent.
#[inline]
pub fn note_transmit_delivered(&mut self, now: I) {
self.note_transmit_result(now, true);
}
/// Captures what is needed to withdraw this service's records.
///
/// The snapshot holds a clone of the records, the instance-level ownership
/// flags (with empty address lists), and the confirmed host addresses in
/// separate fields. The service itself is not modified.
pub fn withdrawal_snapshot(&mut self) -> WithdrawalSnapshot {
    let confirmed = &self.goodbye;
    // Host addresses travel in `host_a` / `host_aaaa`, so `owned` gets none.
    let instance_only = respond::EmittedRecords::new(
        confirmed.ptr,
        confirmed.srv,
        confirmed.txt,
        std::vec::Vec::new(),
        std::vec::Vec::new(),
        confirmed.subtypes,
    );
    let records = self.records.clone();
    let host_a = confirmed.a.clone();
    let host_aaaa = confirmed.aaaa.clone();
    WithdrawalSnapshot {
        records,
        owned: instance_only,
        host_a,
        host_aaaa,
    }
}
/// Takes the records of the previous instance name left behind by a rename, if any.
///
/// The handoff is removed, so a second call returns `None` until another rename stores one.
pub fn take_rename_goodbye_handoff(&mut self) -> Option<RenameGoodbyeHandoff> {
self.rename_goodbye_handoff.take()
}
/// Returns the instant at which the service next needs attention.
///
/// While legacy replies are queued this is the most recently observed time,
/// i.e. "immediately". Otherwise it is the earliest of the lifecycle,
/// response and meta-response deadlines, or `None` when none is set.
pub fn poll_timeout(&self) -> Option<I> {
    if !self.pending_legacy.is_empty() {
        return self.last_now;
    }
    let deadlines = [
        self.lifecycle_deadline,
        self.response_deadline,
        self.meta_response_deadline,
    ];
    deadlines.into_iter().flatten().reduce(|earliest, candidate| {
        if earliest <= candidate {
            earliest
        } else {
            candidate
        }
    })
}
/// Queues `kind` in the first free slot of the two-slot transmit queue.
///
/// When both slots are occupied the request is dropped.
fn push_pending(&mut self, kind: PendingTransmitKind) {
    let free_slot = self
        .pending_transmits
        .iter_mut()
        .find(|slot| slot.is_none());
    if let Some(slot) = free_slot {
        *slot = Some(kind);
    }
}
/// Removes and returns the head of the transmit queue, moving the second
/// entry (if any) up to the head. Returns `None` when the head is empty.
fn pop_pending(&mut self) -> Option<PendingTransmitKind> {
    let head = self.pending_transmits[0].take()?;
    self.pending_transmits[0] = self.pending_transmits[1].take();
    Some(head)
}
/// Returns the head of the transmit queue without removing it.
fn peek_pending(&self) -> Option<PendingTransmitKind> {
self.pending_transmits[0]
}
/// Removes and returns one queued `ServiceUpdate`, or `None` when the queue is empty.
///
/// The entry taken is the first one yielded by the pool's iterator.
/// NOTE(review): the iteration order is defined by the `Pool` implementation, which is
/// not visible here — confirm it is insertion order if callers rely on FIFO delivery.
pub fn poll(&mut self) -> Option<ServiceUpdate> {
// Grab only the key so the iterator borrow ends before the removal below.
let entry = self.pending_updates.iter().next().map(|(k, _)| k)?;
let upd = self.pending_updates.try_remove(entry);
if upd.is_some() {
debug!(
target: "mdns_proto::service",
handle = self.handle.raw(),
update = ?upd,
"Service::poll emitted update"
);
}
upd
}
/// Produces the canonical rdata bytes of our own SRV or TXT record, for
/// comparison against a peer's record of the same type.
///
/// SRV is priority, weight and port (big-endian) followed by the canonical
/// host name; TXT is the canonical segment encoding. Any other type yields
/// an empty vector.
fn our_canonical_record_for(&self, rtype: crate::wire::ResourceType) -> std::vec::Vec<u8> {
    let ours = &self.records;
    let mut canonical = std::vec::Vec::new();
    match rtype {
        crate::wire::ResourceType::Txt => {
            respond::write_canonical_txt(ours.txt_segments(), &mut canonical);
        }
        crate::wire::ResourceType::Srv => {
            canonical.extend_from_slice(&ours.priority().to_be_bytes());
            canonical.extend_from_slice(&ours.weight().to_be_bytes());
            canonical.extend_from_slice(&ours.port().to_be_bytes());
            write_canonical_wire_name(ours.host().as_str(), &mut canonical);
        }
        _ => {}
    }
    canonical
}
fn clear_response_cycle_state(&mut self) {
self.pending_legacy.clear();
self.kas_hints = [None; KAS_RING_SIZE];
self.kas_next_slot = 0;
self.questioner_srcs.clear();
self.meta_response_deadline = None;
self.meta_questioner_srcs.clear();
self.meta_known_answered = false;
}
/// Resets the state tied to the currently advertised instance name: the instance-level
/// ownership flags and all pending response-cycle state. Host address ownership is kept.
fn reset_advertised_name_state(&mut self) {
self.goodbye.reset_instance();
self.clear_response_cycle_state();
}
/// Decides whether an address record seen for our host name carries one of
/// our own addresses.
///
/// Only A and AAAA records can match. Link-local addresses (IPv4 link-local,
/// IPv6 `fe80::/10`) never count as ours, even when they are listed in our
/// records. Records whose rdata cannot be parsed do not match either.
fn host_record_is_ours(&self, record: &crate::wire::Ref<'_>) -> bool {
    let Ok(view) = record.rdata_view() else {
        return false;
    };
    match view {
        crate::wire::Rdata::A(v4) => {
            let addr = v4.addr();
            if addr.is_link_local() {
                return false;
            }
            self.records.a_addrs_slice().contains(&addr)
        }
        crate::wire::Rdata::AAAA(v6) => {
            let addr = v6.addr();
            // fe80::/10 — compare the top ten bits of the first segment.
            let is_link_local = (addr.segments()[0] & 0xffc0) == 0xfe80;
            if is_link_local {
                return false;
            }
            self.records.aaaa_addrs_slice().contains(&addr)
        }
        _ => false,
    }
}
/// Feeds one network-derived event into the state machine.
///
/// `now` is recorded as the latest observed time. Depending on the current state
/// and the event this may collect peer probe data for a tiebreak, revert to probing
/// after a conflict, schedule a (jittered) response, queue a legacy reply, store a
/// known-answer hint, or queue a `ServiceUpdate::HostConflict`. Nothing is sent here;
/// packets are produced later by `poll_transmit`. Combinations of state and event
/// that are not listed are ignored.
pub fn handle_event(&mut self, event: ServiceEvent<'_>, now: I) {
#[cfg(feature = "tracing")]
let _span = hick_trace::trace_span!("service", handle = self.handle.raw()).entered();
self.last_now = Some(now);
trace!(
target: "mdns_proto::service",
handle = self.handle.raw(),
state = ?self.state,
event = ?core::mem::discriminant(&event),
"service: handle_event"
);
match (self.state, event) {
// While probing: collect the peer's SRV/TXT records per source; the actual
// tiebreak is evaluated in `handle_timeout`.
(ServiceState::Probing(_) | ServiceState::Init, ServiceEvent::ProbeConflict(pc)) => {
if !matches!(
pc.record().rtype(),
crate::wire::ResourceType::Srv | crate::wire::ResourceType::Txt
) {
return;
}
// Unparseable rdata is dropped silently.
let view = match pc.record().rdata_view() {
Ok(v) => v,
Err(_) => return, };
let mut scratch = std::vec::Vec::new();
let canonical = match respond::canonical_rdata_for_hash(&view, &mut scratch) {
Ok(c) => Bytes::copy_from_slice(c),
Err(_) => return, };
let rtype = pc.record().rtype();
let src = pc.src();
// Find this source's bucket, creating it unless the bucket limit is reached.
let bucket_idx = self.peer_probes.iter().position(|b| b.src == src);
let bucket_idx = match bucket_idx {
Some(i) => i,
None => {
if self.peer_probes.len() >= MAX_PEER_PROBES {
return; }
self.peer_probes.push(PeerProbe {
src,
records: std::vec::Vec::new(),
});
self.peer_probes.len().saturating_sub(1)
}
};
let bucket = match self.peer_probes.get_mut(bucket_idx) {
Some(b) => b,
None => return,
};
if bucket.records.len() >= MAX_PEER_PROBE_RECORDS {
return; }
bucket.records.push(PeerRecord { rtype, canonical });
self.tiebreak_pending = true;
}
// After probing: a peer record with different SRV/TXT rdata sends us back to probing.
(
ServiceState::Announcing(_) | ServiceState::Established,
ServiceEvent::ProbeConflict(pc),
) => {
if !matches!(
pc.record().rtype(),
crate::wire::ResourceType::Srv | crate::wire::ResourceType::Txt
) {
return;
}
let view = match pc.record().rdata_view() {
Ok(v) => v,
Err(_) => return,
};
let mut scratch = std::vec::Vec::new();
let peer_canonical = match respond::canonical_rdata_for_hash(&view, &mut scratch) {
Ok(c) => c,
Err(_) => return,
};
// Identical rdata is not a conflict.
if peer_canonical
== self
.our_canonical_record_for(pc.record().rtype())
.as_slice()
{
return;
}
// Rate limit: ignore conflicts that arrive too soon after the previous re-probe.
if let Some(last) = self.last_conflict_reprobe
&& let Some(elapsed) = now.checked_duration_since(last)
&& elapsed < CONFLICT_REPROBE_MIN_INTERVAL
{
return;
}
warn!(
target: "mdns_proto::service",
handle = self.handle.raw(),
state = ?self.state,
rtype = ?pc.record().rtype(),
"service: ProbeConflict (§9 post-establishment) — reverting to probe"
);
#[cfg(feature = "stats")]
if let Some(s) = self.stat() {
s.conflicts(1);
}
// Restart the lifecycle from Init and drop all queued and pending response work.
self.last_conflict_reprobe = Some(now);
self.state = ServiceState::Init;
self.probe_count = 0;
self.announce_count = 0;
self.pending_transmits = [None, None];
self.response_deadline = None;
self.clear_response_cycle_state();
self.lifecycle_deadline = probe_deadline(now, 0, &mut self.rng);
}
(ServiceState::Established | ServiceState::Announcing(_), ServiceEvent::Question(sq)) => {
let src = sq.src();
// DNS-SD meta-query: handled separately from questions about our own names.
if crate::endpoint::is_meta_query_name(sq.question().qname()) {
if src.port() != crate::constants::MDNS_PORT {
// Legacy querier: queue a reply to its address, once per (source, query id).
if self.pending_legacy.len() < MAX_LEGACY_RESPONSES
&& let Ok(meta) = crate::Name::try_from_str(crate::endpoint::DNS_SD_META_QUERY_NAME)
{
let query_id = sq.query_id();
let qtype = sq.question().qtype();
let qclass = sq.question().qclass();
let dup = self
.pending_legacy
.iter()
.any(|l| l.dst == src && l.query_id == query_id && l.is_meta);
if !dup {
self.pending_legacy.push(LegacyResp {
dst: src,
query_id,
name: meta,
qtype,
qclass,
is_meta: true,
});
}
}
} else {
use rand_core::Rng as _;
if !self.meta_questioner_srcs.contains(&src)
&& self.meta_questioner_srcs.len() < MAX_QUESTIONER_SRCS
{
self.meta_questioner_srcs.push(src);
}
// Delay in ms: 400..=500 for truncated queries, 20..=120 otherwise.
let jitter_ms = if sq.truncated() {
400u32.saturating_add(self.rng.next_u32() % 101) } else {
20u32.saturating_add(self.rng.next_u32() % 101) };
if let Some(due) =
now.checked_add_duration(core::time::Duration::from_millis(u64::from(jitter_ms)))
{
// Keep the earlier of the existing and the new deadline.
self.meta_response_deadline = Some(match self.meta_response_deadline {
Some(existing) if existing <= due => existing,
_ => due,
});
}
}
return;
}
if src.port() != crate::constants::MDNS_PORT {
// Legacy querier asking about one of our names.
if self.pending_legacy.len() < MAX_LEGACY_RESPONSES {
let qname = sq.question().qname();
// Echo our own copy of whichever name matched: service type, instance,
// host, or one of the subtype names.
let echo = if crate::endpoint::names_match(self.records.service_type(), qname) {
Some(self.records.service_type().clone())
} else if crate::endpoint::names_match(self.records.instance(), qname) {
Some(self.records.instance().clone())
} else if crate::endpoint::names_match(self.records.host(), qname) {
Some(self.records.host().clone())
} else {
self
.records
.subtype_names()
.iter()
.find(|s| crate::endpoint::names_match(s, qname))
.cloned()
};
if let Some(name) = echo {
let qtype = sq.question().qtype();
let qclass = sq.question().qclass();
let query_id = sq.query_id();
// Skip exact duplicates of an already queued reply.
let dup = self.pending_legacy.iter().any(|l| {
l.dst == src
&& l.query_id == query_id
&& l.qtype == qtype
&& l.qclass == qclass
&& l.name == name
});
if !dup {
self.pending_legacy.push(LegacyResp {
dst: src,
query_id,
name,
qtype,
qclass,
is_meta: false,
});
}
}
}
return;
}
// Regular multicast question: schedule a jittered response.
use rand_core::Rng as _;
// Delay in ms: 400..=500 for truncated queries, 20..=120 otherwise.
let jitter_ms = if sq.truncated() {
400u32.saturating_add(self.rng.next_u32() % 101) } else {
20u32.saturating_add(self.rng.next_u32() % 101) };
let wait = core::time::Duration::from_millis(u64::from(jitter_ms));
let new_rd = match now.checked_add_duration(wait) {
Some(t) => t,
None => return,
};
// Keep the earlier of the existing and the new deadline.
self.response_deadline = Some(match self.response_deadline {
Some(existing) if existing <= new_rd => existing,
_ => new_rd,
});
if !self.questioner_srcs.contains(&src) && self.questioner_srcs.len() < MAX_QUESTIONER_SRCS
{
self.questioner_srcs.push(src);
}
}
(_, ServiceEvent::KnownAnswer(ka)) => {
// Known answer for the meta-query: only a flag is recorded, and only when a meta
// reply is pending for this source and the answer is an IN-class PTR to our
// service type whose TTL is at least half of ours.
if crate::endpoint::is_meta_query_name(ka.record().name()) {
if self.meta_response_deadline.is_some()
&& self.meta_questioner_srcs.contains(&ka.src())
&& ka.record().rclass().is_in()
&& ka.record().rtype() == crate::wire::ResourceType::Ptr
&& ka.record().ttl().saturating_mul(2) >= self.records.ttl_secs()
&& let Ok(crate::wire::Rdata::Ptr(p)) = ka.record().rdata_view()
&& crate::endpoint::names_match(self.records.service_type(), p.target())
{
self.meta_known_answered = true;
}
return;
}
// Hints only matter while a response is pending, and only from a source
// that actually asked.
if self.response_deadline.is_none() {
return;
}
if !self.questioner_srcs.contains(&ka.src()) {
return;
}
if !ka.record().rclass().is_in() {
return;
}
let last_now = now;
let querier_ttl = ka.record().ttl();
let our_ttl = self.records.ttl_secs();
// A known answer whose TTL is below half of ours does not suppress our answer.
if querier_ttl.saturating_mul(2) < our_ttl {
return;
}
let ttl = core::time::Duration::from_secs(u64::from(ka.record().ttl()));
let expires_at = match last_now.checked_add_duration(ttl) {
Some(t) => t,
None => return,
};
let view = match ka.record().rdata_view() {
Ok(v) => v,
Err(_) => return, };
let mut scratch = std::vec::Vec::new();
let canonical = match respond::canonical_rdata_for_hash(&view, &mut scratch) {
Ok(c) => c,
Err(_) => return, };
let rdata_hash = hash_rdata(canonical);
// Classify by which of our names the record is for; unrelated records are ignored.
let owner = if crate::endpoint::names_match_record(self.records.service_type(), ka.record())
{
KasOwner::ServiceType
} else if crate::endpoint::names_match_record(self.records.instance(), ka.record()) {
KasOwner::Instance
} else if crate::endpoint::names_match_record(self.records.host(), ka.record()) {
KasOwner::Host
} else {
return; };
let hint = KasHint {
owner,
rtype: ka.record().rtype(),
rdata_hash,
expires_at,
};
// Ring buffer: overwrite the next slot and advance, wrapping at KAS_RING_SIZE.
if let Some(slot) = self.kas_hints.get_mut(self.kas_next_slot) {
*slot = Some(hint);
self.kas_next_slot = self.kas_next_slot.saturating_add(1) % KAS_RING_SIZE;
trace!(
target: "mdns_proto::service",
handle = self.handle.raw(),
rtype = ?ka.record().rtype(),
"service: KnownAnswer hint stored (§7.1 KAS)"
);
}
}
(_, ServiceEvent::HostConflict(hc)) => {
// A record carrying one of our own (non-link-local) addresses is not a conflict.
if self.host_record_is_ours(hc.record()) {
return;
}
warn!(
target: "mdns_proto::service",
handle = self.handle.raw(),
state = ?self.state,
rtype = ?hc.record().rtype(),
"service: HostConflict — peer claimed our host name with different rdata"
);
#[cfg(feature = "stats")]
if let Some(s) = self.stat() {
s.conflicts(1);
}
// Only reported to the caller; the lifecycle state is left unchanged here.
let _ = self.pending_updates.insert(ServiceUpdate::HostConflict);
}
_ => {}
}
}
/// Advances the state machine to `now`.
///
/// In order: expires known-answer hints, evaluates a pending probe tiebreak (renaming
/// on a loss), fires the response deadline, and fires the lifecycle deadline, which
/// queues a probe or announcement or moves `Init` to `Probing(0)`.
/// Packets are only queued here; `poll_transmit` encodes them.
///
/// In the code visible here every path returns `Ok(())`.
#[allow(clippy::arithmetic_side_effects)]
pub fn handle_timeout(&mut self, now: I) -> Result<(), HandleTimeoutError> {
#[cfg(feature = "tracing")]
let _span = hick_trace::trace_span!("service", handle = self.handle.raw()).entered();
self.last_now = Some(now);
// Drop known-answer hints whose expiry has been reached.
for slot in self.kas_hints.iter_mut() {
if let Some(hint) = slot
&& hint.expires_at <= now
{
*slot = None;
}
}
// Tiebreak against the peer probe data collected by `handle_event`.
// NOTE(review): when the state is neither Init nor Probing, `tiebreak_pending` and
// `peer_probes` are left as they are — confirm stale data cannot be reused later.
if self.tiebreak_pending && matches!(self.state, ServiceState::Init | ServiceState::Probing(_))
{
self.tiebreak_pending = false;
let we_lose = compare_rr_sets_we_lose(&self.records, &self.peer_probes);
self.peer_probes.clear();
if we_lose {
warn!(
target: "mdns_proto::service",
handle = self.handle.raw(),
state = ?self.state,
rename_attempt = self.rename_attempt.saturating_add(1),
"service: probe tiebreak lost (§8.2) — renaming"
);
#[cfg(feature = "stats")]
if let Some(s) = self.stat() {
s.conflicts(1);
s.renames(1);
}
// If records were already emitted under the old name, keep a copy so the
// caller can fetch it via `take_rename_goodbye_handoff`.
if self.goodbye.any_instance() {
let owned = respond::EmittedRecords::new(
self.goodbye.ptr,
self.goodbye.srv,
self.goodbye.txt,
std::vec::Vec::new(),
std::vec::Vec::new(),
self.goodbye.subtypes,
);
self.rename_goodbye_handoff = Some(RenameGoodbyeHandoff {
records: self.records.clone(),
owned,
});
}
self.rename_attempt = self.rename_attempt.saturating_add(1);
let new_name_str =
rename_with_suffix(self.records.instance().as_str(), self.rename_attempt);
match crate::Name::try_from_str(&new_name_str) {
Ok(new_name) => {
// Adopt the new name and restart probing from scratch.
self.records.set_instance(new_name.clone());
let _ = self.pending_updates.insert(ServiceUpdate::Renamed(
crate::event::ServiceRenamed::new(new_name),
));
self.state = ServiceState::Init;
self.probe_count = 0;
self.announce_count = 0;
self.pending_transmits = [None, None];
self.response_deadline = None;
self.lifecycle_deadline = probe_deadline(now, 0, &mut self.rng);
self.reset_advertised_name_state();
}
Err(_) => {
// The generated name is not a valid `Name`: stop and report a conflict.
self.state = ServiceState::Conflicting;
let _ = self.pending_updates.insert(ServiceUpdate::Conflict);
self.lifecycle_deadline = None;
self.pending_transmits = [None, None];
self.response_deadline = None;
self.goodbye.reset_instance();
self.clear_response_cycle_state();
}
}
return Ok(());
}
}
// Consume the response deadline if it is due; the packet is queued further below.
let response_fired = if let Some(rd) = self.response_deadline {
if now >= rd {
self.response_deadline = None;
true
} else {
false
}
} else {
false
};
// Init without a deadline: schedule the first probe so the lifecycle cannot stall.
if self.state == ServiceState::Init && self.lifecycle_deadline.is_none() {
self.lifecycle_deadline = probe_deadline(now, 0, &mut self.rng);
}
let lifecycle_fired = if let Some(due) = self.lifecycle_deadline {
if now >= due {
match self.state {
ServiceState::Init => {
// No packet yet: enter Probing(0) and wait for its deadline.
self.state = ServiceState::Probing(0);
self.lifecycle_deadline = probe_deadline(now, 0, &mut self.rng);
debug!(
target: "mdns_proto::service",
handle = self.handle.raw(),
"service: Init → Probing(0)"
);
false }
ServiceState::Probing(n) => {
debug!(
target: "mdns_proto::service",
handle = self.handle.raw(),
probe_n = n,
"service: Probing — enqueueing probe"
);
// The probe number only advances once the send is confirmed
// (see `note_transmit_result`).
self.push_pending(PendingTransmitKind::Probe);
self.lifecycle_deadline = probe_deadline(now, n, &mut self.rng);
true
}
ServiceState::Announcing(_n) => {
debug!(
target: "mdns_proto::service",
handle = self.handle.raw(),
announce_n = _n,
"service: Announcing — enqueueing announcement"
);
self.push_pending(PendingTransmitKind::Announcement);
self.lifecycle_deadline = announce_deadline(now, 1);
true
}
ServiceState::Established => {
debug!(
target: "mdns_proto::service",
handle = self.handle.raw(),
"service: Established — enqueueing periodic re-announce"
);
self.push_pending(PendingTransmitKind::Announcement);
self.lifecycle_deadline = re_announce_deadline(now, self.records.ttl_secs());
true
}
ServiceState::Conflicting => {
false
}
}
} else {
false
}
} else {
false
};
// Queued after any lifecycle packet, so the lifecycle packet is sent first.
if response_fired {
self.push_pending(PendingTransmitKind::Response);
}
// The value is not used; this binding only discards it.
let _ = lifecycle_fired;
Ok(())
}
/// Encodes the next outgoing packet into `buf`, if one is due.
///
/// Returns `Ok(None)` while a previous transmit is still awaiting
/// `note_transmit_result`, when nothing is queued, or when a response turned out
/// to contain no records. Otherwise the returned `Transmit` carries the destination
/// and the number of bytes written to `buf`, and `awaiting_confirm` is set so the
/// caller must report the outcome before the next packet is produced.
///
/// Priority order: due meta-query reply, then the oldest legacy reply, then the head
/// of the probe/announcement/response queue.
///
/// # Errors
///
/// `TransmitError::BufferTooSmall` when encoding a probe, announcement or response
/// fails; the queued packet stays at the head of the queue. Both values of the
/// error detail are `buf.len()`.
/// NOTE(review): any error from the `respond` encoders is mapped to `BufferTooSmall`
/// — confirm they cannot fail for another reason.
pub fn poll_transmit(
&mut self,
now: I,
buf: &mut [u8],
) -> Result<Option<Transmit>, TransmitError> {
#[cfg(feature = "tracing")]
let _span = hick_trace::trace_span!("service", handle = self.handle.raw()).entered();
// Only one unconfirmed transmit at a time.
if self.awaiting_confirm.is_some() {
return Ok(None);
}
// 1. Multicast reply to the DNS-SD meta-query, once its deadline has passed.
if self.meta_response_deadline.is_some_and(|due| now >= due) {
self.meta_response_deadline = None;
// Suppress only when the single querier already listed our service type.
let suppressed = self.meta_known_answered && self.meta_questioner_srcs.len() == 1;
self.meta_questioner_srcs.clear();
self.meta_known_answered = false;
// Encoding failures are ignored here and fall through to the next source of work.
if !suppressed
&& let Ok(meta) = crate::Name::try_from_str(crate::endpoint::DNS_SD_META_QUERY_NAME)
&& let Ok(n) = respond::write_meta_response(&self.records, &meta, buf)
{
self.awaiting_confirm = Some(AwaitingConfirm::MetaResponse);
return Ok(Some(Transmit::new(respond::multicast_dst(), None, n)));
}
}
// 2. Oldest queued legacy reply, sent to the querier's own address.
if let Some(legacy) = self.pending_legacy.first() {
let encoded = if legacy.is_meta {
respond::write_legacy_meta_response(
&self.records,
legacy.query_id,
&legacy.name,
legacy.qtype,
legacy.qclass,
buf,
)
.map(|n| (n, None::<respond::EmittedRecords>))
} else {
respond::write_legacy_response(
&self.records,
legacy.query_id,
&legacy.name,
legacy.qtype,
legacy.qclass,
buf,
)
.map(|(n, emitted)| (n, Some(emitted)))
};
match encoded {
Ok((n, emitted)) => {
let resp = self.pending_legacy.remove(0);
self.awaiting_confirm = match emitted {
Some(e) => Some(AwaitingConfirm::Response(e, 0)),
None => Some(AwaitingConfirm::MetaResponse),
};
return Ok(Some(Transmit::new(resp.dst, None, n)));
}
Err(_) => {
// A reply that cannot be encoded is dropped; continue with the regular queue.
let _ = self.pending_legacy.remove(0);
}
}
}
// 3. Head of the probe/announcement/response queue. It is only peeked here, so an
// encoding error below leaves it queued.
let kind = match self.peek_pending() {
Some(k) => k,
None => return Ok(None),
};
let mut resp_emitted = respond::EmittedRecords::default();
// A `Cell` lets the filter closure below count suppressions without a mutable borrow.
#[cfg(feature = "stats")]
let kas_suppressed = core::cell::Cell::new(0u64);
let n = match kind {
PendingTransmitKind::Probe => {
let n = respond::write_probe(&self.records, buf).map_err(|_| {
warn!(
target: "mdns_proto::service",
handle = self.handle.raw(),
"service: poll_transmit probe BufferTooSmall"
);
TransmitError::BufferTooSmall(crate::error::BufferTooSmallDetail::new(
buf.len(),
buf.len(),
))
})?;
debug!(
target: "mdns_proto::service",
handle = self.handle.raw(),
bytes = n,
"service: poll_transmit emitting probe"
);
n
}
PendingTransmitKind::Announcement => {
let n = respond::write_announce(&self.records, buf).map_err(|_| {
warn!(
target: "mdns_proto::service",
handle = self.handle.raw(),
"service: poll_transmit announcement BufferTooSmall"
);
TransmitError::BufferTooSmall(crate::error::BufferTooSmallDetail::new(
buf.len(),
buf.len(),
))
})?;
debug!(
target: "mdns_proto::service",
handle = self.handle.raw(),
bytes = n,
"service: poll_transmit emitting announcement"
);
n
}
PendingTransmitKind::Response => {
// Known-answer suppression is only applied when at most one source asked.
let single_questioner = self.questioner_srcs.len() <= 1;
// Copies, so the closure does not borrow `self`.
let hints = self.kas_hints;
let last_now = self.last_now;
let (encoded, emitted) =
respond::write_announce_filtered(&self.records, buf, |rtype, rdata| {
// Returning `true` suppresses (skips) the record.
if !single_questioner {
return false;
}
let h = hash_rdata(rdata);
// Hint expiry is checked against the last observed time, not this call's `now`.
let now_ref = match last_now {
Some(t) => t,
None => return false,
};
let owner = match rtype {
crate::wire::ResourceType::Ptr => KasOwner::ServiceType,
crate::wire::ResourceType::Srv | crate::wire::ResourceType::Txt => KasOwner::Instance,
crate::wire::ResourceType::A | crate::wire::ResourceType::AAAA => KasOwner::Host,
_ => return false,
};
// Suppress when an unexpired hint matches owner, type and rdata hash.
let suppressed = hints.iter().any(|slot| match slot {
Some(hint) => {
hint.owner == owner
&& hint.rtype == rtype
&& hint.rdata_hash == h
&& hint.expires_at > now_ref
}
None => false,
});
#[cfg(feature = "stats")]
if suppressed {
kas_suppressed.set(kas_suppressed.get().saturating_add(1));
}
suppressed
})
.map_err(|_| {
warn!(
target: "mdns_proto::service",
handle = self.handle.raw(),
"service: poll_transmit response BufferTooSmall"
);
TransmitError::BufferTooSmall(crate::error::BufferTooSmallDetail::new(
buf.len(),
buf.len(),
))
})?;
resp_emitted = emitted;
debug!(
target: "mdns_proto::service",
handle = self.handle.raw(),
bytes = encoded,
"service: poll_transmit emitting response"
);
encoded
}
};
// Encoding succeeded: dequeue the entry and remember what needs confirmation.
let kind = self.peek_pending();
self.pop_pending();
self.awaiting_confirm = match kind {
Some(PendingTransmitKind::Probe) => Some(AwaitingConfirm::Probe),
Some(PendingTransmitKind::Announcement) => {
// An announcement is recorded as containing every record we have.
Some(AwaitingConfirm::Announcement(respond::EmittedRecords::new(
true,
true,
true,
self.records.a_addrs_slice().to_vec(),
self.records.aaaa_addrs_slice().to_vec(),
!self.records.subtype_names().is_empty(),
)))
}
Some(PendingTransmitKind::Response) => {
// The response cycle is over: forget hints and questioners.
self.kas_hints = [None; KAS_RING_SIZE];
self.questioner_srcs.clear();
// Everything was suppressed: nothing to send, so report the suppression
// count right away and leave `awaiting_confirm` unset.
if resp_emitted.is_empty() {
#[cfg(feature = "stats")]
if let Some(s) = self.stat() {
let suppressed = kas_suppressed.get();
if suppressed > 0 {
s.answers_suppressed_kas(suppressed);
}
}
return Ok(None);
}
#[cfg(feature = "stats")]
let partial_suppressed = kas_suppressed.get();
#[cfg(not(feature = "stats"))]
let partial_suppressed = 0u64;
Some(AwaitingConfirm::Response(resp_emitted, partial_suppressed))
}
None => None,
};
// The first statement only reads `pending_tx` and discards the result.
let _ = self.pending_tx.iter().next(); Ok(Some(Transmit::new(respond::multicast_dst(), None, n)))
}
}
#[cfg(test)]
#[cfg(all(any(feature = "alloc", feature = "std"), feature = "slab"))]
mod tests;