Skip to main content

mdns_proto/service/
mod.rs

1//! Service state machine — probing, announcing, response generation.
2
3cfg_heap! {
4  use crate::trace::*;
5
6  mod respond;
7}
8mod schedule;
9mod state;
10
11cfg_heap! {
12  use crate::backend::{RdataBuf, rdata_from_vec};
13}
14
15cfg_heap! {
16  /// Which of OUR owner names a known-answer's record name matched. §7.1
17  /// suppression is per RRset, and an RRset is identified by (name, type, class,
18  /// rdata). A known-answer with our rtype + rdata but a DIFFERENT owner name is a
19  /// DIFFERENT RRset and must NOT suppress our record — otherwise a querier could
20  /// silence our `host.local A x` by sending a same-rdata `_svc._tcp.local A x`.
21  #[derive(Debug, Clone, Copy, PartialEq, Eq)]
22  enum KasOwner {
23    /// The shared service-type name (owns the PTR).
24    ServiceType,
25    /// The service instance name (owns SRV + TXT).
26    Instance,
27    /// The host name (owns A + AAAA).
28    Host,
29  }
30
31  /// A single observed known-answer hint. KAS suppression checks records
32  /// against this list before emitting.
33  #[derive(Debug, Clone, Copy)]
34  struct KasHint<I> {
35    owner: KasOwner,
36    rtype: crate::wire::ResourceType,
37    rdata_hash: u64,
38    expires_at: I,
39  }
40
41  /// Number of KAS hints we'll remember at once (per service).
42  const KAS_RING_SIZE: usize = 16;
43
44  /// Cap on the number of distinct questioner sources tracked per
45  /// response cycle.  Mirrors `MAX_PEER_PROBES` — bursts of
46  /// queries from more than this many distinct sources within one
47  /// jitter window get the excess sources rejected (no hint storage
48  /// for them), which is conservative but bounded.
49  const MAX_QUESTIONER_SRCS: usize = 8;
50
51  /// Maximum legacy unicast responses queued per response cycle. Each
52  /// distinct legacy querier gets its own reply; beyond this cap, excess legacy
53  /// queriers in the same window are dropped (bounded against a flood).
54  const MAX_LEGACY_RESPONSES: usize = 8;
55
56  /// A pending RFC 6762 §6.7 legacy unicast response: a non-mDNS querier (source
57  /// port != 5353) gets a direct reply that echoes its query ID + question.
58  #[derive(Debug, Clone)]
59  struct LegacyResp {
60    dst: core::net::SocketAddr,
61    query_id: u16,
62    /// The matched owned name to echo in the response's question section (our
63    /// own canonical name; case-insensitively equal to the querier's qname). For
64    /// a meta reply (`is_meta`) this is the `_services._dns-sd._udp.<domain>`
65    /// meta-query name.
66    name: crate::Name,
67    qtype: crate::wire::ResourceType,
68    qclass: crate::wire::ResourceClass,
69    /// this is an RFC 6763 §9 service-type enumeration reply — emit the
70    /// shared meta-PTR (`<meta> -> service_type`) rather than the instance record
71    /// set. A legacy resolver isn't on the multicast group, so the §9 reply it
72    /// needs must go out as a unicast echo too.
73    is_meta: bool,
74  }
75
76  /// Maximum number of peer-probe records buffered per source for a single
77  /// tiebreak decision (RFC §8.2). Incoming records beyond this cap are silently
78  /// dropped.
79  const MAX_PEER_PROBE_RECORDS: usize = 16;
80
81  /// Maximum number of distinct peer sources we track per tiebreak round.
82  /// Records from sources beyond this cap are silently dropped.
83  const MAX_PEER_PROBES: usize = 8;
84
85  /// minimum interval between conflict-driven re-probes of an
86  /// Established/Announcing service (RFC 6762 §9 conflict rate-limiting). A
87  /// conflict flood cannot reset us to Probing faster than this, so a hostile
88  /// peer cannot prevent the service from ever (re)establishing.
89  const CONFLICT_REPROBE_MIN_INTERVAL: core::time::Duration = core::time::Duration::from_secs(1);
90
91  /// One record from a peer's simultaneous probe, retained for the RFC §8.2
92  /// tiebreak comparison (lexicographic comparison of proposed RR sets).
93  #[derive(Debug, Clone)]
94  struct PeerRecord {
95    rtype: crate::wire::ResourceType,
96    /// Canonical byte form of the rdata (same encoding used by KAS hashing).
97    canonical: RdataBuf,
98  }
99
100  /// A per-source bucket of probe records observed during the current probe round.
101  /// Each distinct peer source gets its own bucket so that the RFC §8.2 tiebreak
102  /// compares against each peer independently (we lose if ANY peer wins).
103  #[derive(Debug)]
104  struct PeerProbe {
105    src: core::net::SocketAddr,
106    records: std::vec::Vec<PeerRecord>,
107  }
108}
109
110cfg_heap! {
111  /// Write a DNS name in canonical wire form (length-prefixed labels, root
112  /// terminator). Used for SRV target encoding in RFC §8.2 tiebreak comparison.
113  /// This produces byte-identical output for both OUR outgoing SRV and for a
114  /// peer SRV parsed via `canonical_rdata_for_hash`, ensuring the bytewise
115  /// comparison is correct.
116  fn write_canonical_wire_name(name_str: &str, out: &mut std::vec::Vec<u8>) {
117    let trimmed = match name_str.strip_suffix('.') {
118      Some(t) => t,
119      None => name_str,
120    };
121    if trimmed.is_empty() {
122      out.push(0);
123      return;
124    }
125    for label in trimmed.split('.') {
126      if label.is_empty() {
127        continue;
128      }
129      let len = label.len().min(63);
130      #[allow(clippy::cast_possible_truncation)]
131      out.push(len as u8);
132      for &b in label.as_bytes().iter().take(63) {
133        out.push(b.to_ascii_lowercase());
134      }
135    }
136    out.push(0); // root terminator
137  }
138
139  /// FNV-1a hash of rdata bytes — used to dedupe KAS hints without storing rdata.
140  fn hash_rdata(bytes: &[u8]) -> u64 {
141    const FNV_BASIS: u64 = 0xcbf29ce484222325;
142    const FNV_PRIME: u64 = 0x100000001b3;
143    let mut h: u64 = FNV_BASIS;
144    for &b in bytes {
145      h ^= b as u64;
146      h = h.wrapping_mul(FNV_PRIME);
147    }
148    h
149  }
150}
151
152cfg_heap! {
153  #[allow(unused_imports)]
154  pub(crate) use respond::{EmittedRecords, multicast_dst, write_goodbye};
155}
156#[allow(unused_imports)]
157pub(crate) use schedule::{announce_deadline, probe_deadline, re_announce_deadline};
158pub use state::ServiceState;
159
160cfg_heap! {
161  use rand::SeedableRng;
162
163  use crate::error::{HandleTimeoutError, TransmitError};
164  use crate::event::{ServiceEvent, ServiceUpdate};
165  use crate::records::ServiceRecords;
166  use crate::transmit::Transmit;
167  use crate::{Instant, Pool, ServiceHandle};
168
169  type Rng = rand::rngs::StdRng;
170}
171
172cfg_heap! {
173  /// Build a new instance-name string by appending (or replacing) a `-N` suffix
174  /// on the first DNS label.
175  ///
176  /// `current` is the full FQDN of the instance (e.g. `"myprinter._ipp._tcp.local."`).
177  /// `attempt` is the rename counter (1, 2, …).
178  ///
179  /// For a name like `"myprinter._ipp._tcp.local."` and attempt `2` the result
180  /// is `"myprinter-2._ipp._tcp.local."`.  Any existing `-N` suffix on the
181  /// instance label is stripped first so repeated conflicts don't accumulate.
182  fn rename_with_suffix(current: &str, attempt: u32) -> std::string::String {
183    use std::string::ToString;
184    // Strip optional trailing dot so we can work with the plain label sequence.
185    let (body, trailing_dot) = match current.strip_suffix('.') {
186      Some(b) => (b, true),
187      None => (current, false),
188    };
189    // Split off the first label (the instance name) from the rest of the FQDN.
190    let (instance, rest) = match body.split_once('.') {
191      Some((i, r)) => (i, Some(r)),
192      None => (body, None),
193    };
194    // Strip any existing "-N" suffix from the instance label.
195    let base_instance = match instance.rsplit_once('-') {
196      Some((prefix, n)) if !n.is_empty() && n.chars().all(|c| c.is_ascii_digit()) => prefix,
197      _ => instance,
198    };
199    let mut out = std::string::String::new();
200    out.push_str(base_instance);
201    out.push('-');
202    out.push_str(&attempt.to_string());
203    if let Some(r) = rest {
204      out.push('.');
205      out.push_str(r);
206    }
207    if trailing_dot {
208      out.push('.');
209    }
210    out
211  }
212
213  /// RFC 6762 §8.2 tiebreak comparison.
214  ///
215  /// Returns `true` if WE should lose (i.e. we must rename): the peer's
216  /// proposed RR set is lexicographically >= ours when both sets are
217  /// sorted and concatenated in canonical form. A tie (equal sets) also
218  /// counts as a loss (§8.2.1 — "the host MUST rename itself").
219  ///
220  /// Compares against EACH peer bucket independently; returns `true` (we lose)
221  /// if ANY single peer's set is >= ours. This prevents a peer that claims a
222  /// smaller set from masking a different peer that actually wins.
223  ///
224  /// The local set is restricted to records owned by the service INSTANCE
225  /// (SRV + TXT only) per RFC §8.2, which compares records "owned by the
226  /// conflicting name". A/AAAA records are owned by the host name and are
227  /// excluded from both sides.
228  fn compare_rr_sets_we_lose(
229    our: &crate::records::ServiceRecords,
230    peer_probes: &[PeerProbe],
231  ) -> bool {
232    // Build OUR canonical RR set restricted to SRV + TXT (instance-owned records).
233    // RFC §8.2 says compare records owned by the conflicting NAME; the conflicting
234    // name is the service instance, which owns SRV and TXT — NOT A/AAAA (those
235    // are owned by the host name).
236    let mut our_set: std::vec::Vec<std::vec::Vec<u8>> = std::vec::Vec::new();
237    // SRV — priority(2 BE) + weight(2 BE) + port(2 BE) + wire-form target name.
238    // Wire form: length-octet + label bytes, repeated, terminated by 0x00.
239    {
240      let mut buf = std::vec::Vec::new();
241      buf.extend_from_slice(&crate::wire::ResourceType::Srv.to_u16().to_be_bytes());
242      buf.extend_from_slice(&our.priority().to_be_bytes());
243      buf.extend_from_slice(&our.weight().to_be_bytes());
244      buf.extend_from_slice(&our.port().to_be_bytes());
245      write_canonical_wire_name(our.host().as_str(), &mut buf);
246      our_set.push(buf);
247    }
248    // TXT — always include (matches what write_probe emits unconditionally).
249    // write_probe sends a TXT authority record even with no segments, so a
250    // peer comparing against our probe sees the TXT we sent; omitting it from our
251    // local comparison set would bias the tiebreak. An empty TXT
252    // canonicalizes (like the wire form) to the rtype prefix + a single
253    // zero-length string (one 0x00), so both sides agree byte-for-byte.
254    {
255      let mut buf = std::vec::Vec::new();
256      buf.extend_from_slice(&crate::wire::ResourceType::Txt.to_u16().to_be_bytes());
257      respond::write_canonical_txt(our.txt_segments(), &mut buf);
258      our_set.push(buf);
259    }
260    our_set.sort();
261    let our_concat: std::vec::Vec<u8> = our_set.into_iter().flatten().collect();
262
263    // For EACH peer bucket: if that peer's sorted set >= ours, we lose.
264    for probe in peer_probes {
265      let mut peer_set: std::vec::Vec<std::vec::Vec<u8>> = probe
266        .records
267        .iter()
268        .map(|p| {
269          let mut buf = std::vec::Vec::new();
270          buf.extend_from_slice(&p.rtype.to_u16().to_be_bytes());
271          buf.extend_from_slice(&p.canonical[..]);
272          buf
273        })
274        .collect();
275      peer_set.sort();
276      let peer_concat: std::vec::Vec<u8> = peer_set.into_iter().flatten().collect();
277      // We LOSE when peer_concat >= our_concat (tie = loss per §8.2.1).
278      if peer_concat >= our_concat {
279        return true;
280      }
281    }
282    false
283  }
284}
285
286cfg_heap! {
287  /// What kind of transmit is pending for a service.
288  ///
289  /// Capturing the kind at deadline-fire time (before state is advanced) ensures
290  /// `poll_transmit` encodes the correct packet type even when state has already
291  /// transitioned (e.g., Probing(2) → Announcing(0) on the final probe tick).
292  #[derive(Debug, Copy, Clone, Eq, PartialEq)]
293  enum PendingTransmitKind {
294    /// Send a probe (state was Probing(_) when the deadline fired).
295    Probe,
296    /// Send an unsolicited announcement (state was Announcing(_) or Established
297    /// firing the periodic re-announce deadline). KAS filtering is NOT applied —
298    /// RFC 6762 §7.1 known-answer suppression only applies to question responses,
299    /// not to unsolicited multicast announcements.
300    Announcement,
301    /// Send a jittered question response (the response_pending path in Established
302    /// or Announcing(_) state). KAS filtering IS applied (RFC 6762 §7.1).
303    Response,
304  }
305
306  /// The commit token stamped by `poll_transmit` and resolved by
307  /// `note_transmit_result`. Unlike [`PendingTransmitKind`] (which is
308  /// queued at deadline-fire time), this carries what was ACTUALLY encoded, so a
309  /// response that known-answer-suppression (§7.1) trimmed latches goodbye
310  /// ownership only for the concrete records it really put on the wire
311  /// (per record, not per group).
312  #[derive(Debug, Clone)]
313  enum AwaitingConfirm {
314    /// A probe is awaiting its delivery result (§8.1 sequence advance). A probe is
315    /// a QUESTION — it advertises no records, so it latches no goodbye ownership.
316    Probe,
317    /// An unsolicited announcement is awaiting confirmation (§8.3 phase advance).
318    /// Carries the concrete records it emitted (a full announcement: all of
319    /// PTR/SRV/TXT plus every host address) so a confirmed send latches exactly
320    /// those for goodbye ownership.
321    Announcement(respond::EmittedRecords),
322    /// A question/legacy response is awaiting confirmation. Carries the concrete
323    /// records actually emitted (§7.1 KAS may have trimmed any subset), so only
324    /// those latch goodbye ownership on a confirmed send.  The second field is
325    /// the count of records §7.1 KAS suppressed from THIS response (partial
326    /// suppression); it is bumped into `answers_suppressed_kas` ONLY on a
327    /// confirmed delivery so a socket failure cannot inflate the counter.
328    Response(respond::EmittedRecords, u64),
329    /// A RFC 6763 §9 service-type enumeration meta-response (multicast or legacy
330    /// unicast) is awaiting confirmation. The meta-PTR is a shared record — it
331    /// advertises no instance-owned records and is never withdrawn — so a confirmed
332    /// delivery bumps `responses_tx` WITHOUT touching goodbye ownership or any
333    /// lifecycle state.
334    MetaResponse,
335  }
336
337  /// Goodbye ownership: which CONCRETE records peers may have cached FROM US, and
338  /// therefore what a graceful goodbye (TTL=0) must withdraw. The granularity is
339  /// per record — each instance-owned record (PTR/SRV/TXT) independently, and each
340  /// host-owned address (A/AAAA) independently — matching what the endpoint's
341  /// withdrawal (built from [`Service::withdrawal_snapshot`]) withdraws (host
342  /// addresses are further filtered against sibling-retained addresses).
343  ///
344  /// INVARIANT: a record becomes "advertised" ONLY through a CONFIRMED send that
345  /// actually emitted THAT record ([`Self::record_emitted`], driven by the
346  /// encoder's per-record report via `note_transmit_result`). A send that never
347  /// reaches the link — or whose record was known-answer-suppressed (§7.1) —
348  /// advertises nothing, so a later goodbye never withdraws a record we did not
349  /// put on the wire (which could otherwise flush a peer's matching shared
350  /// record). Per-record (not per-group) granularity closes the over-withdrawal
351  /// class where §7.1 trims a subset of a group or a legacy reply emits a whole
352  /// group the per-group latch mis-attributed.
353  #[derive(Debug, Default, Clone)]
354  struct GoodbyeOwnership {
355    /// The instance PTR (service-type → instance) has been advertised. RESET on a
356    /// conflict rename (the new instance name has not been advertised).
357    ptr: bool,
358    /// The instance SRV has been advertised. Reset on rename.
359    srv: bool,
360    /// The instance TXT has been advertised. Reset on rename.
361    txt: bool,
362    /// The RFC 6763 §7.1 subtype PTRs (`<sub>._sub.<type>` → instance) have been
363    /// advertised. Instance-associated (target = instance), so RESET on rename and
364    /// withdrawn with the instance records. All-or-nothing — subtype PTRs are not
365    /// KAS-filtered, so they are always emitted together.
366    subtypes: bool,
367    /// Host A addresses advertised FROM US, tracked per address. SURVIVES a
368    /// conflict rename: the host name is invariant across instance renames, so
369    /// peers keep caching the host records.
370    a: std::vec::Vec<core::net::Ipv4Addr>,
371    /// Host AAAA addresses advertised FROM US, tracked per address. Survives rename.
372    aaaa: std::vec::Vec<core::net::Ipv6Addr>,
373  }
374
375  impl GoodbyeOwnership {
376    /// Latch the concrete records a confirmed-delivered send actually emitted — the
377    /// SOLE way ownership is gained (besides being reset to none on rename).
378    fn record_emitted(&mut self, e: &respond::EmittedRecords) {
379      self.ptr |= e.ptr();
380      self.srv |= e.srv();
381      self.txt |= e.txt();
382      self.subtypes |= e.subtypes();
383      for ip in e.a_slice() {
384        if !self.a.contains(ip) {
385          self.a.push(*ip);
386        }
387      }
388      for ip in e.aaaa_slice() {
389        if !self.aaaa.contains(ip) {
390          self.aaaa.push(*ip);
391        }
392      }
393    }
394    /// Drop INSTANCE ownership (PTR/SRV/TXT) on a conflict rename; host A/AAAA
395    /// ownership persists (the host name does not change on an instance rename).
396    #[inline]
397    fn reset_instance(&mut self) {
398      self.ptr = false;
399      self.srv = false;
400      self.txt = false;
401      self.subtypes = false;
402    }
403    /// Whether ANY instance-owned record (PTR/SRV/TXT or a subtype PTR) has been
404    /// advertised.
405    #[inline]
406    fn any_instance(&self) -> bool {
407      self.ptr || self.srv || self.txt || self.subtypes
408    }
409    /// Whether ANY host-owned address (A/AAAA) has been advertised.
410    #[inline]
411    fn any_host(&self) -> bool {
412      !self.a.is_empty() || !self.aaaa.is_empty()
413    }
414  }
415}
416
417cfg_heap! {
418  /// A point-in-time snapshot of everything the [`crate::Endpoint`] needs to re-encode
419  /// the TTL=0 goodbye for a service being withdrawn.
420  ///
421  /// Produced by [`Service::withdrawal_snapshot`] and consumed by the endpoint's
422  /// withdrawal state machine. Each resend round calls the
423  /// encoder with the same snapshot so the goodbye is idempotent over multiple
424  /// attempts (RFC 6762 §10.1 recommends at least two sends for loss resilience).
425  ///
426  /// The `#[cfg]` gate matches the goodbye code it supports — the goodbye path is
427  /// only compiled when heap allocation is available.
428  #[derive(Debug, Clone)]
429  pub struct WithdrawalSnapshot {
430    /// The service records (names, port, TXT) for this withdrawal. Carried so
431    /// the encoder can re-encode PTR/SRV/TXT at TTL=0 without a live `Service`.
432    pub records: crate::records::ServiceRecords,
433    /// Which record kinds (PTR/SRV/TXT/subtypes) this service actually put on the
434    /// wire (per-record, not per-group). Mirrors the [`GoodbyeOwnership`] latch
435    /// semantics: only records that reached a peer cache need to be withdrawn.
436    /// `pub(crate)` because `EmittedRecords` is a crate-internal type; the
437    /// endpoint (same crate) reads this directly.
438    // `allow(dead_code)`: the field is read by the endpoint withdrawal state
439    // machine; suppress the false positive here.
440    #[allow(dead_code)]
441    pub(crate) owned: respond::EmittedRecords,
442    /// Host A (IPv4) addresses this service confirmed-emitted. The endpoint will
443    /// further filter these against same-host siblings before re-encoding.
444    pub host_a: std::vec::Vec<core::net::Ipv4Addr>,
445    /// Host AAAA (IPv6) addresses this service confirmed-emitted.
446    pub host_aaaa: std::vec::Vec<core::net::Ipv6Addr>,
447  }
448}
449
450cfg_heap! {
451  /// The one-shot §9 conflict-rename goodbye handoff: the OLD instance name's
452  /// records plus the per-record ownership of what that name actually advertised.
453  ///
454  /// Produced by
455  /// [`Service::take_rename_goodbye_handoff`] the instant a conflict rename
456  /// happens, and handed straight to
457  /// [`Endpoint::enqueue_rename_withdrawal`](crate::Endpoint::enqueue_rename_withdrawal),
458  /// which turns it into an independent DETACHED withdrawal item (the renamed-away
459  /// old name's TTL=0 goodbye). It is **opaque** to the driver — both fields are
460  /// crate-internal (`EmittedRecords` is `pub(crate)`) — so a driver only ever
461  /// moves the whole value between the two calls, exactly like
462  /// [`WithdrawalSnapshot`]. A rename never withdraws host A/AAAA (the host name is
463  /// invariant), so this carries no host addresses.
464  ///
465  /// The `#[cfg]` gate matches the goodbye code it supports.
466  #[derive(Debug, Clone)]
467  pub struct RenameGoodbyeHandoff {
468    /// The OLD instance name's records (names, port, TXT), captured BEFORE the
469    /// rename mutated the instance name. `pub(crate)`: the endpoint (same crate)
470    /// reads it directly.
471    pub(crate) records: crate::records::ServiceRecords,
472    /// Which instance records (PTR/SRV/TXT/subtypes) the OLD name actually put on
473    /// the wire — only these are withdrawn (§7.1 KAS can suppress a subset). The
474    /// address lists are always empty (a rename never withdraws host A/AAAA).
475    /// `pub(crate)`: `EmittedRecords` is a crate-internal type.
476    pub(crate) owned: respond::EmittedRecords,
477  }
478}
479
480cfg_heap! {
481  /// Service state machine. One per registered service.
482  pub struct Service<I, TQ, EV> {
483  handle: ServiceHandle,
484  state: ServiceState,
485  records: ServiceRecords,
486  #[cfg(feature = "stats")]
487  stats: Option<std::sync::Arc<hick_trace::stats::Stats>>,
488  /// The next scheduled lifecycle deadline (probe, announce, re-announce).
489  /// Never modified by response scheduling — only advanced by lifecycle logic.
490  lifecycle_deadline: Option<I>,
491  /// The jittered question-response deadline, if any (RFC 6762 §6).
492  /// Independent of `lifecycle_deadline`; whichever is earlier fires first.
493  /// Set directly by `handle_event(Question)`; cleared when it fires in
494  /// `handle_timeout`. `response_deadline.is_some()` replaces the old
495  /// `response_pending` + `response_deadline_active` flags.
496  response_deadline: Option<I>,
497  probe_count: u8,
498  announce_count: u8,
499  rename_attempt: u32,
500  /// Up to 2 pending transmits (a response can ride alongside an announcement
501  /// when both deadlines fire at the same `now`).  `poll_transmit` drains one
502  /// per call in queue order, so the driver loop emits both in the same poll
503  /// cycle by calling `poll_transmit` until it returns `Ok(None)`.
504  pending_transmits: [Option<PendingTransmitKind>; 2],
505  rng: Rng,
506  pending_tx: TQ,
507  pending_updates: EV,
508  /// Most-recently-seen `now`, cached for use by `poll_transmit`'s KAS
509  /// filtering closure. Updated by both `handle_timeout` and `handle_event`
510  /// (`handle_event` now takes `now` directly).
511  last_now: Option<I>,
512  /// Ring buffer of observed known-answer hints (RFC 6762 §7.1).
513  kas_hints: [Option<KasHint<I>>; KAS_RING_SIZE],
514  /// Next slot index for writing a new KAS hint (wraps at KAS_RING_SIZE).
515  kas_next_slot: usize,
516  /// sources that have issued a Question in the current
517  /// response cycle.  KAS hints are only accepted from sources in
518  /// this set — otherwise an attacker could inject hints during a
519  /// legitimate questioner's jitter window and suppress the
520  /// response.  Cleared alongside `kas_hints` when the Response
521  /// fires.  Bounded by `MAX_QUESTIONER_SRCS`.
522  questioner_srcs: std::vec::Vec<core::net::SocketAddr>,
523  /// Per-source buckets of peer-proposed records observed during the current
524  /// probe round, buffered for RFC §8.2 tiebreak comparison on the next
525  /// `handle_timeout` call. Each entry holds records from a distinct peer source
526  /// so that the tiebreak compares against each peer independently.
527  peer_probes: std::vec::Vec<PeerProbe>,
528  /// Set when a tiebreak decision is pending on the next `handle_timeout`.
529  tiebreak_pending: bool,
530  /// Which owner groups peers may have cached from us, i.e. what a goodbye must
531  /// withdraw. The SOLE source of truth for goodbye ownership; see
532  /// [`GoodbyeOwnership`] for the invariants (confirmed-send-driven, instance
533  /// resets on rename, host persists).
534  goodbye: GoodbyeOwnership,
535  /// the commit token for the datagram `poll_transmit`
536  /// most recently produced — `Some(kind)` while that send is awaiting a
537  /// delivery result, `None` otherwise. This is the structural heart of the
538  /// confirm-on-send invariant: `poll_transmit` ONLY stamps this token and
539  /// advances no lifecycle state; ALL lifecycle progression happens in
540  /// [`Self::note_transmit_result`], keyed on the token. Because of that a send
541  /// that never reaches the link (all sockets error) advances nothing — neither
542  /// the goodbye-ownership latches (`announce_emitted` / `host_advertised`) for
543  /// an announcement, nor the probe sequence (RFC 6762 §8.1) for a probe.
544  awaiting_confirm: Option<AwaitingConfirm>,
545  /// queued legacy unicast responses (RFC 6762 §6.7) for
546  /// non-mDNS queriers (source port != 5353). Each is drained by
547  /// `poll_transmit` into its own unicast, query-shaped datagram. QU-bit
548  /// queriers (§5.4) are on the multicast group and are served by the normal
549  /// multicast response, so they do NOT go here. Bounded by
550  /// [`MAX_LEGACY_RESPONSES`].
551  pending_legacy: std::vec::Vec<LegacyResp>,
552  /// instant of the last conflict-driven revert-to-probe, used to
553  /// rate-limit RFC 6762 §9 re-probing under a conflict flood.
554  last_conflict_reprobe: Option<I>,
555  /// One-shot handoff of the OLD instance name's TTL=0 goodbye when a §9 conflict
556  /// renames an ANNOUNCED service. Set at the rename site (`handle_timeout`) with
557  /// the OLD records and WHICH instance records that name actually advertised
558  /// (`EmittedRecords` with the instance bits set, addresses empty — a rename
559  /// never withdraws host A/AAAA, the host name is invariant). The Service no
560  /// longer drains this itself: the driver takes it via
561  /// [`Self::take_rename_goodbye_handoff`] immediately after observing the
562  /// `Renamed` update and hands it to
563  /// [`crate::Endpoint::enqueue_rename_withdrawal`], which models the old-name
564  /// goodbye as an INDEPENDENT detached withdrawal item (its own per-family debt,
565  /// schedule, and loss-resilience resends). `None` when the renamed name had
566  /// never advertised an instance record (nothing for peers to evict) or after
567  /// the handoff has been taken.
568  rename_goodbye_handoff: Option<RenameGoodbyeHandoff>,
569  /// §9: jittered deadline for a pending RFC 6763 service-type
570  /// enumeration (`_services._dns-sd._udp.<domain>`) reply. Set when a meta-query
571  /// arrives; `poll_transmit` emits a standalone shared meta-PTR when it fires.
572  /// Independent of `response_deadline` — the meta reply carries no instance
573  /// records and latches no goodbye ownership, so it stays isolated from the
574  /// normal response/confirm cycle.
575  meta_response_deadline: Option<I>,
576  /// sources that issued a §9 service-type enumeration meta-query in the
577  /// current meta cycle. A meta known-answer is only honoured from a source in
578  /// this set (mirrors `questioner_srcs`), so an off-cycle peer cannot
579  /// inject a known-answer that suppresses our meta reply. Bounded by
580  /// `MAX_QUESTIONER_SRCS`; cleared when the meta reply fires or is suppressed.
581  meta_questioner_srcs: std::vec::Vec<core::net::SocketAddr>,
582  /// (RFC 6763 §9 + §7.1): set when a meta questioner's known-answer
583  /// section already carries the meta-PTR for OUR service type — our pending
584  /// meta reply is then suppressed. Reset each meta cycle.
585  meta_known_answered: bool,
586  }
587}
588
589cfg_heap! {
590impl<I, TQ, EV> Service<I, TQ, EV>
591where
592  I: Instant,
593  TQ: Pool<Transmit>,
594  EV: Pool<ServiceUpdate>,
595{
596  /// Construct a new Service.
597  ///
598  /// When `probe` is `true` (RFC 6762 §8.1, the conformant default) the service
599  /// starts in `Init` and probes for name uniqueness before announcing. When
600  /// `false` the caller asserts the name is already unique (§8.1 permits
601  /// skipping probing in that case), so the service starts directly in
602  /// `Announcing(0)` and announces without the probe sequence. A later §9
603  /// conflict still reverts it to probing to resolve the collision.
604  #[allow(dead_code)]
605  pub(crate) fn try_new(
606    handle: ServiceHandle,
607    records: ServiceRecords,
608    now: I,
609    rng_seed: [u8; 32],
610    probe: bool,
611  ) -> Self {
612    let mut rng = Rng::from_seed(rng_seed);
613    let (state, lifecycle_deadline) = if probe {
614      (ServiceState::Init, probe_deadline(now, 0, &mut rng))
615    } else {
616      (ServiceState::Announcing(0), announce_deadline(now, 0))
617    };
618    Self {
619      handle,
620      state,
621      records,
622      #[cfg(feature = "stats")]
623      stats: None,
624      lifecycle_deadline,
625      response_deadline: None,
626      probe_count: 0,
627      announce_count: 0,
628      rename_attempt: 0,
629      pending_transmits: [None, None],
630      rng,
631      pending_tx: TQ::new(),
632      pending_updates: EV::new(),
633      last_now: Some(now),
634      kas_hints: [None; KAS_RING_SIZE],
635      kas_next_slot: 0,
636      questioner_srcs: std::vec::Vec::new(),
637      peer_probes: std::vec::Vec::new(),
638      tiebreak_pending: false,
639      goodbye: GoodbyeOwnership::default(),
640      awaiting_confirm: None,
641      pending_legacy: std::vec::Vec::new(),
642      last_conflict_reprobe: None,
643      rename_goodbye_handoff: None,
644      meta_response_deadline: None,
645      meta_questioner_srcs: std::vec::Vec::new(),
646      meta_known_answered: false,
647    }
648  }
649
650  /// Attach the shared [`hick_trace::stats::Stats`] handle from the owning
651  /// [`crate::endpoint::Endpoint`]. No allocation — the Arc is cloned from the
652  /// endpoint's existing single Arc. Called immediately after construction by
653  /// `Endpoint::try_register_service` so that all per-service counters accumulate
654  /// into the endpoint-level stats. Before this is called, stats bumps are no-ops
655  /// (the field is `None`).
656  #[cfg(feature = "stats")]
657  pub(crate) fn set_stats(&mut self, stats: std::sync::Arc<hick_trace::stats::Stats>) {
658    self.stats = Some(stats);
659  }
660
661  /// Borrow the stats handle if one has been attached.
662  #[cfg(feature = "stats")]
663  #[inline]
664  fn stat(&self) -> Option<&hick_trace::stats::Stats> {
665    self.stats.as_deref()
666  }
667
668  /// Returns the handle assigned at registration.
669  #[inline(always)]
670  pub const fn handle(&self) -> ServiceHandle {
671    self.handle
672  }
673  /// Returns the current state.
674  #[inline(always)]
675  pub const fn state(&self) -> ServiceState {
676    self.state
677  }
678  /// Returns the canonical name of this service.
679  #[inline(always)]
680  pub fn name(&self) -> &crate::Name {
681    self.records.instance()
682  }
683  /// Returns the records this service advertises.
684  #[inline(always)]
685  pub const fn records(&self) -> &ServiceRecords {
686    &self.records
687  }
688
689  /// Whether this service has advertised (announced) its host A/AAAA records
690  /// and they may still be cached by peers.
691  ///
692  /// Unlike the instance-level announce state, this latch survives a conflict
693  /// rename (the host name does not change). The driver consults it to decide
694  /// whether a same-host sibling genuinely owns the shared host records: a
695  /// merely-registered (still probing / never announced) sibling has put
696  /// nothing into peer caches and so does NOT keep the withdrawing service from
697  /// retracting the host addresses, whereas a renamed-but-previously-announced
698  /// sibling DOES.
699  ///
700  /// also requires the service to actually carry host A/AAAA — an
701  /// address-less service advertises no host records and so owns none.
702  #[inline(always)]
703  pub fn advertises_host(&self) -> bool {
704    // per-address ownership is non-empty ONLY if we confirmed-emitted at
705    // least one host address, which in turn requires the service to carry one —
706    // so this subsumes the earlier explicit "has addresses" guard.
707    self.goodbye.any_host()
708  }
709
710  /// Whether this service has CONFIRMED-EMITTED at least one INSTANCE record
711  /// (PTR/SRV/TXT) on the wire — i.e. it has truly advertised its name, not merely
712  /// probed for it. Unlike [`Self::advertises_host`] this is set even for an
713  /// address-less service. Drivers gate the endpoint's cancel-on-announce reclaim
714  /// on this so a probe alone cannot cancel a renamed-away old name's goodbye
715  /// before the reclaiming service has actually announced.
716  #[inline(always)]
717  pub fn advertises_instance(&self) -> bool {
718    self.goodbye.any_instance()
719  }
720
721  /// The host IPv4 addresses this service has actually ADVERTISED (confirmed-
722  /// emitted), per address. This is the set a sibling truly owns in peer
723  /// caches — NOT [`ServiceRecords::a_addrs_slice`], which is the configured set
724  /// (a §7.1 KAS-filtered send may have emitted only a subset). The driver
725  /// builds its shared-host retention set from this so a withdrawing service
726  /// retracts only addresses no remaining service actually advertised.
727  #[inline]
728  pub fn advertised_a_addrs(&self) -> &[core::net::Ipv4Addr] {
729    &self.goodbye.a
730  }
731
732  /// The host IPv6 addresses this service has actually ADVERTISED, per address
733  /// (the AAAA counterpart of [`Self::advertised_a_addrs`]).
734  #[inline]
735  pub fn advertised_aaaa_addrs(&self) -> &[core::net::Ipv6Addr] {
736    &self.goodbye.aaaa
737  }
738
739  /// Report the delivery result of the datagram most recently produced by
740  /// [`Self::poll_transmit`] (the confirm-on-send chokepoint).
741  ///
742  /// This is the SOLE place service lifecycle state advances. `poll_transmit`
743  /// only encodes bytes and stamps a commit token (`awaiting_confirm`); the
744  /// driver then calls this with `delivered = true` when at least one socket
745  /// send succeeded (`used > 0`). Behaviour is keyed on the token:
746  ///
747  /// * **Probe, delivered** — advance the §8.1 probe sequence (next probe, or
748  ///   enter `Announcing(0)` after the third). A name is therefore claimed only
749  ///   once a probe has actually reached the link.
750  /// * **Probe, NOT delivered** — re-arm the same probe WITHOUT advancing, so a
751  ///   service whose probes never leave the host never announces (the RFC 6762
752  ///   §8.1 guarantee; the fix).
753  /// * **Announcement, delivered** — latch the goodbye-ownership guards
754  ///   (`announce_emitted` / `host_advertised`) and advance the §8.3
755  ///   announce phase, reaching `Established` after the second.
756  /// * **Announcement, NOT delivered** — re-arm without advancing; the
757  ///   announcement is retried.
758  /// * **Response / nothing pending** — no lifecycle state to advance.
759  pub fn note_transmit_result(&mut self, now: I, delivered: bool) {
760    let kind = match self.awaiting_confirm.take() {
761      Some(k) => k,
762      None => return,
763    };
764    match kind {
765      AwaitingConfirm::Probe => {
766        if let ServiceState::Probing(n) = self.state {
767          if !delivered {
768            // §8.1: the probe never reached the link — do NOT advance the
769            // sequence. Re-arm the SAME probe from post-send time so it
770            // retries, rather than the service progressing toward Announcing
771            // with nothing on the wire.
772            self.lifecycle_deadline = probe_deadline(now, n, &mut self.rng);
773          } else {
774            // Probe reached the link — count it now (confirmed delivery).
775            #[cfg(feature = "stats")]
776            if let Some(s) = self.stat() {
777              s.probes_tx(1);
778            }
779            if n >= 2 {
780              // Third probe confirmed (§8.1: exactly three) → begin announcing.
781              self.state = ServiceState::Announcing(0);
782              self.probe_count = 3;
783              self.lifecycle_deadline = announce_deadline(now, 0);
784            } else {
785              // Probe confirmed → schedule the next one PROBE_INTERVAL later.
786              let new_n = n.saturating_add(1);
787              self.state = ServiceState::Probing(new_n);
788              self.probe_count = new_n;
789              self.lifecycle_deadline = probe_deadline(now, new_n, &mut self.rng);
790            }
791          }
792        }
793      }
794      AwaitingConfirm::Announcement(emitted) => {
795        if !delivered {
796          // The announcement never reached the link — re-arm without advancing.
797          // Retry at the §8.3 inter-announce interval, anchored to
798          // post-send time. This MUST also cover the periodic
799          // `Established` re-announce — otherwise a single transient send failure
800          // leaves the next attempt a full re-announce interval (~80% of TTL)
801          // away, during which peers expire the records and the service silently
802          // disappears. A short 1 s retry keeps the records alive.
803          if matches!(
804            self.state,
805            ServiceState::Announcing(_) | ServiceState::Established
806          ) {
807            self.lifecycle_deadline = announce_deadline(now, 1);
808          }
809          return;
810        }
811        // Confirmed announcement → count it, then latch goodbye ownership for
812        // the records it carried (peers can only have cached our records once a
813        // send truly reached the link). Driven by the encoder's per-record
814        // report, same as a response: a full announcement emits all of
815        // PTR/SRV/TXT plus every host address.
816        #[cfg(feature = "stats")]
817        if let Some(s) = self.stat() {
818          s.announcements_tx(1);
819        }
820        self.goodbye.record_emitted(&emitted);
821        if let ServiceState::Announcing(n) = self.state {
822          if n >= 1 {
823            // Second announcement confirmed → the §8.3 startup sequence is
824            // complete: become Established and notify the caller exactly once.
825            self.state = ServiceState::Established;
826            self.announce_count = 2;
827            let _ = self.pending_updates.insert(ServiceUpdate::Established);
828            self.lifecycle_deadline = re_announce_deadline(now, self.records.ttl_secs());
829            debug!(
830              target: "mdns_proto::service",
831              handle = self.handle.raw(),
832              "service: Announcing → Established"
833            );
834            #[cfg(feature = "stats")]
835            if let Some(s) = self.stat() {
836              s.services_established(1);
837            }
838          } else {
839            // First announcement confirmed → schedule the second one (§8.3: ≥1 s
840            // later).
841            let new_n = n.saturating_add(1);
842            self.state = ServiceState::Announcing(new_n);
843            self.announce_count = new_n;
844            self.lifecycle_deadline = announce_deadline(now, new_n);
845            debug!(
846              target: "mdns_proto::service",
847              handle = self.handle.raw(),
848              announce_n = new_n,
849              "service: Announcing — first announcement confirmed, scheduling second"
850            );
851          }
852        }
853      }
854      AwaitingConfirm::Response(emitted, _kas_suppressed_count) => {
855        #[cfg(feature = "stats")]
856        let kas_suppressed_count = _kas_suppressed_count;
857        // a DELIVERED response (multicast question reply or §6.7
858        // legacy unicast reply) put our positive-TTL records on the wire, so
859        // peers may now cache them — even before the first §8.3 announcement is
860        // confirmed (a query can arrive during `Announcing(0)`). Latch the
861        // goodbye-ownership guards so a later unregister/conflict actually
862        // withdraws those records.
863        //
864        // latch ONLY the concrete records this response actually
865        // emitted. Known-answer suppression (§7.1) can trim any subset — down to
866        // individual PTR/SRV/TXT and individual addresses — so latching a whole
867        // group would let a later TTL=0 goodbye withdraw records this service
868        // never put on the wire, potentially cache-flushing a peer's matching
869        // shared record. NOT a lifecycle PHASE change.
870        //
871        // answers_suppressed_kas (partial suppression) is also deferred here:
872        // a socket failure must not inflate the suppression counter — the
873        // records were encoded but never left the host, so from the network's
874        // perspective they were NOT suppressed.
875        if delivered {
876          #[cfg(feature = "stats")]
877          if let Some(s) = self.stat() {
878            s.responses_tx(1);
879            if kas_suppressed_count > 0 {
880              s.answers_suppressed_kas(kas_suppressed_count);
881            }
882          }
883          self.goodbye.record_emitted(&emitted);
884        }
885      }
886      AwaitingConfirm::MetaResponse => {
887        // A §9 meta-response (multicast or legacy) put a shared meta-PTR on the
888        // wire.  No instance-owned records were emitted, so goodbye ownership is
889        // NOT touched.  On a confirmed delivery bump responses_tx so the
890        // *_tx counters reflect every datagram that left the host.
891        if delivered {
892          #[cfg(feature = "stats")]
893          if let Some(s) = self.stat() {
894            s.responses_tx(1);
895          }
896        }
897      }
898    }
899  }
900
901  /// Convenience wrapper for a CONFIRMED delivery — equivalent to
902  /// `note_transmit_result(now, true)`. Retained so call sites (and tests) that
903  /// always represent a successful send stay terse; all advancement logic lives
904  /// in [`Self::note_transmit_result`].
905  #[inline]
906  pub fn note_transmit_delivered(&mut self, now: I) {
907    self.note_transmit_result(now, true);
908  }
909
910  /// Capture everything the endpoint needs to re-encode a TTL=0 goodbye for
911  /// this service without holding the [`Service`] alive.
912  ///
913  /// **Always captures the CURRENT confirmed-emitted state:** the current
914  /// `ServiceRecords`, which instance record kinds (PTR/SRV/TXT/subtypes) were
915  /// actually put on the wire, and which host A/AAAA addresses were
916  /// confirmed-emitted. The endpoint further filters host addresses against
917  /// same-host siblings before encoding the actual goodbye datagram.
918  ///
919  /// The OLD instance name of an in-flight §9 conflict rename is NOT carried
920  /// here. A rename now hands its old-name goodbye off via
921  /// [`Self::take_rename_goodbye_handoff`] the instant it happens, and the driver
922  /// enqueues it as an INDEPENDENT detached withdrawal item
923  /// ([`crate::Endpoint::enqueue_rename_withdrawal`]). A teardown during that
924  /// window is therefore simply two independent items — the detached old-name
925  /// item already enqueued, plus the route-attached current-name item this
926  /// snapshot produces — with no `snapshot.rename` inheritance.
927  pub fn withdrawal_snapshot(&mut self) -> WithdrawalSnapshot {
928    // Snapshot the CURRENT goodbye-ownership latch (the live name's records).
929    // After a rename the current name is the freshly re-announced one, and its
930    // confirmed instance + host records still need withdrawing; the OLD name is
931    // handled separately as its own detached item.
932    let owned = respond::EmittedRecords::new(
933      self.goodbye.ptr,
934      self.goodbye.srv,
935      self.goodbye.txt,
936      std::vec::Vec::new(), // addresses are passed separately below
937      std::vec::Vec::new(),
938      self.goodbye.subtypes,
939    );
940    WithdrawalSnapshot {
941      records: self.records.clone(),
942      owned,
943      host_a: self.goodbye.a.clone(),
944      host_aaaa: self.goodbye.aaaa.clone(),
945    }
946  }
947
948  /// Take the one-shot §9 rename goodbye handoff, if a conflict rename installed
949  /// one (the OLD instance name advertised ≥1 instance record and so still needs
950  /// a TTL=0 withdrawal so peers evict it).
951  ///
952  /// Returns the OLD name's `ServiceRecords` plus the per-record ownership
953  /// (`EmittedRecords` with the instance bits the old name actually put on the
954  /// wire; host A/AAAA empty — a rename never withdraws host addresses). The
955  /// driver MUST call this immediately after observing
956  /// [`ServiceUpdate::Renamed`](crate::event::ServiceUpdate) from [`Self::poll`]
957  /// and hand the result to [`crate::Endpoint::enqueue_rename_withdrawal`], which
958  /// models the old-name goodbye as an independent detached withdrawal item. The
959  /// field is consumed (`.take()`) so the handoff happens exactly once. Returns
960  /// `None` when the renamed name had never advertised an instance record.
961  pub fn take_rename_goodbye_handoff(&mut self) -> Option<RenameGoodbyeHandoff> {
962    self.rename_goodbye_handoff.take()
963  }
964
965  /// Returns the next deadline at which `handle_timeout` should be called.
966  ///
967  /// This is the minimum of `lifecycle_deadline` and `response_deadline`
968  /// (either or both may be `None`). The caller should drive `handle_timeout`
969  /// when this instant is reached.
970  pub fn poll_timeout(&self) -> Option<I> {
971    // a queued legacy unicast response is due immediately (no jitter).
972    if !self.pending_legacy.is_empty() {
973      return self.last_now;
974    }
975    // Earliest of: lifecycle, response, and the meta-response deadline. The §9
976    // rename goodbye is no longer drained by the Service — it is handed off to
977    // the endpoint as a detached withdrawal item — so it contributes no wakeup
978    // here.
979    let mut best: Option<I> = None;
980    for d in [
981      self.lifecycle_deadline,
982      self.response_deadline,
983      self.meta_response_deadline,
984    ]
985    .into_iter()
986    .flatten()
987    {
988      best = Some(match best {
989        Some(b) if b <= d => b,
990        _ => d,
991      });
992    }
993    best
994  }
995
996  /// Push a transmit kind onto the tail of the FIFO queue.
997  ///
998  /// Invariant: the queue is left-packed — slot 0 is always `Some` whenever
999  /// the queue is non-empty, and slot 1 is `Some` only if slot 0 is.  This
1000  /// makes `peek_pending` a cheap slot-0 read and keeps FIFO order across
1001  /// pop / push interleavings.
1002  ///
1003  /// If both slots are already occupied the entry is silently dropped.  Under
1004  /// normal scheduling at most one lifecycle event + one response are queued
1005  /// per tick, so overflow should not occur.
1006  fn push_pending(&mut self, kind: PendingTransmitKind) {
1007    if self.pending_transmits[0].is_none() {
1008      self.pending_transmits[0] = Some(kind);
1009    } else if self.pending_transmits[1].is_none() {
1010      self.pending_transmits[1] = Some(kind);
1011    }
1012    // Both slots full — drop.
1013  }
1014
1015  /// Pop the head of the FIFO queue, compacting the tail down.
1016  ///
1017  /// a previous implementation cleared whichever slot held the head
1018  /// (leaving a hole at index 0 when the head was popped from there), then
1019  /// `push_pending` re-filled that hole with a NEWER item — overtaking the
1020  /// older item still parked in slot 1.  Compacting on pop preserves true
1021  /// FIFO order: shift slot 1 down to slot 0 every time we drain slot 0.
1022  fn pop_pending(&mut self) -> Option<PendingTransmitKind> {
1023    let head = self.pending_transmits[0].take();
1024    if head.is_some() {
1025      // Shift the tail (slot 1) into the head position so the queue stays
1026      // left-packed.  If slot 1 was None this is a no-op.
1027      self.pending_transmits[0] = self.pending_transmits[1].take();
1028    }
1029    head
1030  }
1031
1032  /// Peek at the head of the FIFO queue without removing it.
1033  ///
1034  /// Relies on the left-packed invariant maintained by `push_pending` and
1035  /// `pop_pending`: if anything is queued, it is in slot 0.
1036  fn peek_pending(&self) -> Option<PendingTransmitKind> {
1037    self.pending_transmits[0]
1038  }
1039
1040  /// Drain a pending app-level update, if any.
1041  pub fn poll(&mut self) -> Option<ServiceUpdate> {
1042    let entry = self.pending_updates.iter().next().map(|(k, _)| k)?;
1043    let upd = self.pending_updates.try_remove(entry);
1044    if upd.is_some() {
1045      debug!(
1046        target: "mdns_proto::service",
1047        handle = self.handle.raw(),
1048        update = ?upd,
1049        "Service::poll emitted update"
1050      );
1051    }
1052    upd
1053  }
1054
1055  /// OUR canonical rdata for `rtype`, in the SAME byte format
1056  /// `respond::canonical_rdata_for_hash` produces for a peer record, so a §9
1057  /// conflict check can tell identical (consistent) rdata from a real conflict.
1058  /// SRV → priority+weight+port (BE) + lowercased wire-form host; TXT →
1059  /// length-prefixed segments. Other types → empty (never matched as conflicts).
1060  fn our_canonical_record_for(&self, rtype: crate::wire::ResourceType) -> std::vec::Vec<u8> {
1061    let mut out = std::vec::Vec::new();
1062    match rtype {
1063      crate::wire::ResourceType::Srv => {
1064        out.extend_from_slice(&self.records.priority().to_be_bytes());
1065        out.extend_from_slice(&self.records.weight().to_be_bytes());
1066        out.extend_from_slice(&self.records.port().to_be_bytes());
1067        write_canonical_wire_name(self.records.host().as_str(), &mut out);
1068      }
1069      crate::wire::ResourceType::Txt => {
1070        // empty TXT → single zero-length string (one 0x00), matching
1071        // both our wire form and a peer's compliant empty TXT canonicalization.
1072        respond::write_canonical_txt(self.records.txt_segments(), &mut out);
1073      }
1074      _ => {}
1075    }
1076    out
1077  }
1078
1079  /// clear pending response-CYCLE state — queued legacy unicast
1080  /// replies and the KAS-hint / questioner-source suppression set. Called when a
1081  /// response cycle is cancelled: on a §9 revert-to-probe (we must NOT answer
1082  /// for a name we are re-verifying — `pending_legacy` is drained by
1083  /// `poll_transmit` before any state check) and on a conflict rename. Does NOT
1084  /// touch `announce_emitted` — see [`Self::reset_advertised_name_state`].
1085  fn clear_response_cycle_state(&mut self) {
1086    self.pending_legacy.clear();
1087    self.kas_hints = [None; KAS_RING_SIZE];
1088    self.kas_next_slot = 0;
1089    self.questioner_srcs.clear();
1090    // §9: a pending meta-query reply belongs to the response cycle of the
1091    // old (Established) name — drop it on a revert-to-probe / rename so we don't
1092    // answer the meta-query while not authoritative.
1093    self.meta_response_deadline = None;
1094    self.meta_questioner_srcs.clear();
1095    self.meta_known_answered = false;
1096  }
1097
1098  /// clear all per-advertised-name generation state on a conflict-
1099  /// driven RENAME. The NEW instance name has not been announced, so the
1100  /// instance goodbye must not fire for it (host ownership persists — the host
1101  /// name is unchanged); the response-cycle state tied to the OLD
1102  /// name must not carry over either.
1103  fn reset_advertised_name_state(&mut self) {
1104    self.goodbye.reset_instance();
1105    self.clear_response_cycle_state();
1106  }
1107
1108  /// whether `record` (an A/AAAA owned by our host name) carries an
1109  /// address WE advertise — CONSISTENT rdata (our own multicast echo, or another
1110  /// instance correctly sharing the host), not a §9 conflict.
1111  ///
1112  /// a LINK-LOCAL address (IPv4 169.254/16, IPv6 fe80::/10) is scoped
1113  /// to a single interface, so the same raw address on a DIFFERENT interface is
1114  /// a genuine conflict. `HostConflict` carries no receive interface to
1115  /// disambiguate, so we do NOT suppress link-local matches — we surface them.
1116  /// (Our own echo is already filtered upstream by self-loopback detection, so
1117  /// surfacing a link-local match never re-reports our own packet.) A different
1118  /// address, or malformed/unparseable rdata, is also treated as a conflict.
1119  fn host_record_is_ours(&self, record: &crate::wire::Ref<'_>) -> bool {
1120    match record.rdata_view() {
1121      Ok(crate::wire::Rdata::A(a)) => {
1122        let addr = a.addr();
1123        !addr.is_link_local() && self.records.a_addrs_slice().contains(&addr)
1124      }
1125      Ok(crate::wire::Rdata::AAAA(a)) => {
1126        let addr = a.addr();
1127        let link_local = (addr.segments()[0] & 0xffc0) == 0xfe80;
1128        !link_local && self.records.aaaa_addrs_slice().contains(&addr)
1129      }
1130      _ => false,
1131    }
1132  }
1133
1134  /// Process an event routed to this service by the Endpoint.
1135  ///
1136  /// `now` is the current time; it is cached so that `handle_event` can
1137  /// compute KAS-hint expiration times and schedule the jittered response
1138  /// deadline without needing `handle_timeout` to have fired first.
1139  pub fn handle_event(&mut self, event: ServiceEvent<'_>, now: I) {
1140    #[cfg(feature = "tracing")]
1141    let _span = hick_trace::trace_span!("service", handle = self.handle.raw()).entered();
1142    // always refresh last_now so that subsequent calls (e.g.
1143    // Question→response_deadline, KnownAnswer→expiry) use a current reference
1144    // even when handle_timeout has not recently fired.
1145    self.last_now = Some(now);
1146    trace!(
1147      target: "mdns_proto::service",
1148      handle = self.handle.raw(),
1149      state = ?self.state,
1150      event = ?core::mem::discriminant(&event),
1151      "service: handle_event"
1152    );
1153    match (self.state, event) {
1154      (ServiceState::Probing(_) | ServiceState::Init, ServiceEvent::ProbeConflict(pc)) => {
1155        // RFC 6762 §8.2 SIMULTANEOUS-PROBE tiebreak: don't rename immediately.
1156        // Buffer the peer's proposed record into a per-source bucket so the next
1157        // handle_timeout can compare per-peer and rename only if any peer wins.
1158        // (Post-establishment §9 conflicts use a SEPARATE arm below — the
1159        // lexicographic tiebreak is wrong for §9.)
1160
1161        // Only SRV and TXT records are owned by the conflicting instance
1162        // name and contribute to the RFC §8.2 tiebreak. NSEC, A, AAAA, Unknown
1163        // etc. are owned by different names or carry no tiebreak semantics.
1164        // Drop anything that isn't SRV or TXT silently.
1165        if !matches!(
1166          pc.record().rtype(),
1167          crate::wire::ResourceType::Srv | crate::wire::ResourceType::Txt
1168        ) {
1169          return;
1170        }
1171
1172        // Canonicalize FIRST. Only create/find a bucket on success.
1173        // This prevents malformed records from consuming a peer-probe slot and
1174        // exhausting the MAX_PEER_PROBES cap before any legitimate record lands.
1175        let view = match pc.record().rdata_view() {
1176          Ok(v) => v,
1177          Err(_) => return, // malformed rdata — drop without touching buckets
1178        };
1179        let mut scratch = std::vec::Vec::new();
1180        let canonical = match respond::canonical_rdata_for_hash(&view, &mut scratch) {
1181          Ok(c) => rdata_from_vec(c.to_vec()),
1182          Err(_) => return, // canonicalization error — drop without touching buckets
1183        };
1184        let rtype = pc.record().rtype();
1185
1186        let src = pc.src();
1187        // Find existing bucket for this source, or create a new one.
1188        let bucket_idx = self.peer_probes.iter().position(|b| b.src == src);
1189        let bucket_idx = match bucket_idx {
1190          Some(i) => i,
1191          None => {
1192            // No existing bucket; only create if under the cap.
1193            if self.peer_probes.len() >= MAX_PEER_PROBES {
1194              return; // too many peer sources — drop
1195            }
1196            self.peer_probes.push(PeerProbe {
1197              src,
1198              records: std::vec::Vec::new(),
1199            });
1200            self.peer_probes.len().saturating_sub(1)
1201          }
1202        };
1203        let bucket = match self.peer_probes.get_mut(bucket_idx) {
1204          Some(b) => b,
1205          None => return,
1206        };
1207        if bucket.records.len() >= MAX_PEER_PROBE_RECORDS {
1208          return; // bucket full — drop
1209        }
1210        bucket.records.push(PeerRecord { rtype, canonical });
1211        self.tiebreak_pending = true;
1212      }
1213      (
1214        ServiceState::Announcing(_) | ServiceState::Established,
1215        ServiceEvent::ProbeConflict(pc),
1216      ) => {
1217        // RFC 6762 §9 post-establishment conflict — NOT the §8.2
1218        // lexicographic probe tiebreak. A §9 conflict is the same name/type/
1219        // class with DIFFERENT rdata; an identical record is consistent and
1220        // MUST be ignored (otherwise a benign duplicate / our own echo would
1221        // force a healthy service to rename). A genuine conflict triggers
1222        // re-verification: revert to Probing, which re-announces the name on
1223        // success (active defense) and renames via the §8.2 tiebreak only if
1224        // the conflict persists during re-probe.
1225        if !matches!(
1226          pc.record().rtype(),
1227          crate::wire::ResourceType::Srv | crate::wire::ResourceType::Txt
1228        ) {
1229          return;
1230        }
1231        let view = match pc.record().rdata_view() {
1232          Ok(v) => v,
1233          Err(_) => return,
1234        };
1235        let mut scratch = std::vec::Vec::new();
1236        let peer_canonical = match respond::canonical_rdata_for_hash(&view, &mut scratch) {
1237          Ok(c) => c,
1238          Err(_) => return,
1239        };
1240        // Identical rdata → consistent, not a conflict (§9). Ignore.
1241        if peer_canonical
1242          == self
1243            .our_canonical_record_for(pc.record().rtype())
1244            .as_slice()
1245        {
1246          return;
1247        }
1248        // Rate-limit (§9): don't thrash on a conflict flood — if we reverted to
1249        // re-probe within the last interval, ignore further conflicts. (Once we
1250        // are back in Probing, subsequent conflicts route through the §8.2 arm.)
1251        if let Some(last) = self.last_conflict_reprobe
1252          && let Some(elapsed) = now.checked_duration_since(last)
1253          && elapsed < CONFLICT_REPROBE_MIN_INTERVAL
1254        {
1255          return;
1256        }
1257        // Genuine §9 conflict: revert to Probing to re-verify the SAME name
1258        // (do NOT rename yet — peers still hold our records, so `announce_emitted`
1259        // stays set for goodbye-on-unregister). But we MUST stop
1260        // serving the name while it is unverified — clear the cancelled response
1261        // cycle (queued legacy replies drained before any state check, plus KAS
1262        // / questioner suppression state) so the re-probe window doesn't answer
1263        // the very name we reverted to re-verify.
1264        warn!(
1265          target: "mdns_proto::service",
1266          handle = self.handle.raw(),
1267          state = ?self.state,
1268          rtype = ?pc.record().rtype(),
1269          "service: ProbeConflict (§9 post-establishment) — reverting to probe"
1270        );
1271        #[cfg(feature = "stats")]
1272        if let Some(s) = self.stat() {
1273          s.conflicts(1);
1274        }
1275        self.last_conflict_reprobe = Some(now);
1276        self.state = ServiceState::Init;
1277        self.probe_count = 0;
1278        self.announce_count = 0;
1279        self.pending_transmits = [None, None];
1280        self.response_deadline = None;
1281        self.clear_response_cycle_state();
1282        self.lifecycle_deadline = probe_deadline(now, 0, &mut self.rng);
1283      }
1284      (ServiceState::Established | ServiceState::Announcing(_), ServiceEvent::Question(sq)) => {
1285        let src = sq.src();
1286        // RFC 6763 §9 service-type enumeration meta-query: reply with a shared
1287        // PTR `_services._dns-sd._udp.<domain>. -> <service_type>`. The reply
1288        // advertises no instance records and latches no goodbye ownership, so it
1289        // is fully independent of the normal response cycle below (§9).
1290        // A 5353 querier is on the multicast group → schedule a jittered
1291        // MULTICAST reply; a legacy (non-5353) resolver is NOT on the group, so
1292        // it gets a UNICAST meta echo instead.
1293        if crate::endpoint::is_meta_query_name(sq.question().qname()) {
1294          if src.port() != crate::constants::MDNS_PORT {
1295            if self.pending_legacy.len() < MAX_LEGACY_RESPONSES
1296              && let Ok(meta) = crate::Name::try_from_str(crate::endpoint::DNS_SD_META_QUERY_NAME)
1297            {
1298              let query_id = sq.query_id();
1299              let qtype = sq.question().qtype();
1300              let qclass = sq.question().qclass();
1301              let dup = self
1302                .pending_legacy
1303                .iter()
1304                .any(|l| l.dst == src && l.query_id == query_id && l.is_meta);
1305              if !dup {
1306                self.pending_legacy.push(LegacyResp {
1307                  dst: src,
1308                  query_id,
1309                  name: meta,
1310                  qtype,
1311                  qclass,
1312                  is_meta: true,
1313                });
1314              }
1315            }
1316          } else {
1317            use rand_core::Rng as _;
1318            // record this meta questioner so a later meta known-answer
1319            // from the SAME source can suppress our reply (§9 + §7.1). Mirrors
1320            // the normal cycle's `questioner_srcs` gate.
1321            if !self.meta_questioner_srcs.contains(&src)
1322              && self.meta_questioner_srcs.len() < MAX_QUESTIONER_SRCS
1323            {
1324              self.meta_questioner_srcs.push(src);
1325            }
1326            // RFC 6762 §7.2: a TC-bit meta-query is also spreading its known
1327            // answers across packets (a large service-type enumeration can carry
1328            // many known PTRs), so delay 400–500 ms instead of 20–120 ms.
1329            let jitter_ms = if sq.truncated() {
1330              400u32.saturating_add(self.rng.next_u32() % 101) // [400, 500]
1331            } else {
1332              20u32.saturating_add(self.rng.next_u32() % 101) // [20, 120]
1333            };
1334            if let Some(due) =
1335              now.checked_add_duration(core::time::Duration::from_millis(u64::from(jitter_ms)))
1336            {
1337              self.meta_response_deadline = Some(match self.meta_response_deadline {
1338                Some(existing) if existing <= due => existing,
1339                _ => due,
1340              });
1341            }
1342          }
1343          return;
1344        }
1345        // RFC 6762 §6.7 legacy unicast. A querier whose source port
1346        // is not 5353 is a non-mDNS resolver — NOT joined to the multicast
1347        // group, so a multicast response never reaches it. Queue a direct,
1348        // query-shaped unicast reply (echoing its query ID + question) drained
1349        // by `poll_transmit`. This is independent of the multicast response
1350        // cycle below, and one entry per distinct querier.
1351        if src.port() != crate::constants::MDNS_PORT {
1352          if self.pending_legacy.len() < MAX_LEGACY_RESPONSES {
1353            let qname = sq.question().qname();
1354            // Echo our matching name: case-insensitively equal to the
1355            // querier's qname, but byte-correct since it is our own
1356            // validated Name (avoids lossy NameRef→Name reconstruction).
1357            let echo = if crate::endpoint::names_match(self.records.service_type(), qname) {
1358              Some(self.records.service_type().clone())
1359            } else if crate::endpoint::names_match(self.records.instance(), qname) {
1360              Some(self.records.instance().clone())
1361            } else if crate::endpoint::names_match(self.records.host(), qname) {
1362              Some(self.records.host().clone())
1363            } else {
1364              // a legacy subtype browse (`<sub>._sub.<type>`). Echo the
1365              // matched subtype name — write_legacy_response emits the subtype
1366              // PTR as part of the full record set, so the resolver gets it.
1367              self
1368                .records
1369                .subtype_names()
1370                .iter()
1371                .find(|s| crate::endpoint::names_match(s, qname))
1372                .cloned()
1373            };
1374            if let Some(name) = echo {
1375              let qtype = sq.question().qtype();
1376              let qclass = sq.question().qclass();
1377              let query_id = sq.query_id();
1378              // dedup on the FULL request key, not just `dst` — a
1379              // resolver reuses one socket for distinct transactions (A+AAAA,
1380              // different query IDs), and each must get its own ID-echoing
1381              // reply. Only a verbatim duplicate (e.g. a retransmit) coalesces.
1382              let dup = self.pending_legacy.iter().any(|l| {
1383                l.dst == src
1384                  && l.query_id == query_id
1385                  && l.qtype == qtype
1386                  && l.qclass == qclass
1387                  && l.name == name
1388              });
1389              if !dup {
1390                self.pending_legacy.push(LegacyResp {
1391                  dst: src,
1392                  query_id,
1393                  name,
1394                  qtype,
1395                  qclass,
1396                  is_meta: false,
1397                });
1398              }
1399            }
1400          }
1401          return;
1402        }
1403
1404        // Item 2: schedule a jittered MULTICAST response (RFC 6762 §6 — 20–120
1405        // ms for shared records). QU-bit queriers (§5.4) are group members, so
1406        // this multicast reply serves them too. The deadline uses `now` so it
1407        // stays independent of the lifecycle deadline, and multiple
1408        // questions in the window coalesce onto the earliest deadline.
1409        //
1410        // RFC 6762 §7.2 (multipacket known-answer suppression): a query with the
1411        // TC bit set means the querier is spreading its known-answer list across
1412        // multiple packets. Delay 400–500 ms instead of 20–120 ms so the
1413        // follow-up known-answer packets (routed as `KnownAnswer` hints from the
1414        // same source) arrive and accumulate before we decide what to suppress.
1415        use rand_core::Rng as _;
1416        let jitter_ms = if sq.truncated() {
1417          400u32.saturating_add(self.rng.next_u32() % 101) // [400, 500]
1418        } else {
1419          20u32.saturating_add(self.rng.next_u32() % 101) // [20, 120]
1420        };
1421        let wait = core::time::Duration::from_millis(u64::from(jitter_ms));
1422        let new_rd = match now.checked_add_duration(wait) {
1423          Some(t) => t,
1424          None => return,
1425        };
1426        self.response_deadline = Some(match self.response_deadline {
1427          Some(existing) if existing <= new_rd => existing,
1428          _ => new_rd,
1429        });
1430        // record the questioner's source so KAS hints from this same
1431        // source can be accepted in the current response cycle (bounded).
1432        if !self.questioner_srcs.contains(&src) && self.questioner_srcs.len() < MAX_QUESTIONER_SRCS
1433        {
1434          self.questioner_srcs.push(src);
1435        }
1436      }
1437      (_, ServiceEvent::KnownAnswer(ka)) => {
1438        // (RFC 6763 §9 + §7.1): a known-answer whose OWNER is the DNS-SD
1439        // service-type enumeration meta name can only ever suppress our meta
1440        // reply — never one of our normal RRsets — so handle it here and return.
1441        // Suppress only when our meta reply is pending, the source is a meta
1442        // questioner from this cycle (questioner-source gate), the record is an IN
1443        // PTR above the §7.1 half-TTL threshold, and its target is OUR service
1444        // type. A meta-owned record that fails any check suppresses nothing.
1445        if crate::endpoint::is_meta_query_name(ka.record().name()) {
1446          if self.meta_response_deadline.is_some()
1447            && self.meta_questioner_srcs.contains(&ka.src())
1448            && ka.record().rclass().is_in()
1449            && ka.record().rtype() == crate::wire::ResourceType::Ptr
1450            && ka.record().ttl().saturating_mul(2) >= self.records.ttl_secs()
1451            && let Ok(crate::wire::Rdata::Ptr(p)) = ka.record().rdata_view()
1452            && crate::endpoint::names_match(self.records.service_type(), p.target())
1453          {
1454            self.meta_known_answered = true;
1455          }
1456          return;
1457        }
1458        // KAS hints are tied to the response cycle initiated by
1459        // a Question.  RFC 6762 §7.1 specifies known-answer suppression
1460        // as a per-query mechanism: the hint applies to the response
1461        // we are about to send for THIS query.  Without that scope, a
1462        // hostile peer could pre-seed long-TTL hints that suppress
1463        // responses to UNRELATED future queriers.
1464        //
1465        // tighten the gate further by also requiring the
1466        // hint's source to be one that issued a Question in the
1467        // current response cycle.  Without this, an attacker could
1468        // wait for a legitimate Question to schedule
1469        // response_deadline and then inject hints from a different
1470        // source during the jitter window, suppressing the response
1471        // to the legitimate questioner.  The hints from an attacker
1472        // who never asked a question are now silently dropped.
1473        if self.response_deadline.is_none() {
1474          return;
1475        }
1476        if !self.questioner_srcs.contains(&ka.src()) {
1477          return;
1478        }
1479        // class is part of RRset identity. We only ever emit CLASS=IN
1480        // records, so a known-answer in a different class (e.g. CLASS=ANY or
1481        // CHAOS) is NOT the same RRset and MUST NOT suppress our IN response —
1482        // otherwise a querier could send a matching-rdata wrong-class answer to
1483        // silence us (§7.1). `rclass()` already strips the cache-flush bit.
1484        if !ka.record().rclass().is_in() {
1485          return;
1486        }
1487
1488        // Item 5: store the KAS hint with expiration based on the record's TTL.
1489        // `now` is available directly (parameter).
1490        let last_now = now;
1491
1492        // RFC 6762 §7.1 half-TTL rule: a known-answer MUST NOT suppress our
1493        // record if the querier's remaining TTL is less than half of our
1494        // authoritative TTL — their cache is about to expire, so suppressing
1495        // would force them to re-query before we re-announce.
1496        let querier_ttl = ka.record().ttl();
1497        let our_ttl = self.records.ttl_secs();
1498        if querier_ttl.saturating_mul(2) < our_ttl {
1499          // Querier's record is below the half-TTL threshold — don't suppress.
1500          return;
1501        }
1502
1503        let ttl = core::time::Duration::from_secs(u64::from(ka.record().ttl()));
1504        let expires_at = match last_now.checked_add_duration(ttl) {
1505          Some(t) => t,
1506          None => return,
1507        };
1508        // Use canonical rdata bytes so the hash matches what write_announce_filtered
1509        // produces, regardless of wire-level name compression in the incoming packet.
1510        // Drop the hint on any parse error rather than storing an incorrect hash.
1511        let view = match ka.record().rdata_view() {
1512          Ok(v) => v,
1513          Err(_) => return, // malformed rdata — drop the hint
1514        };
1515        let mut scratch = std::vec::Vec::new();
1516        let canonical = match respond::canonical_rdata_for_hash(&view, &mut scratch) {
1517          Ok(c) => c,
1518          Err(_) => return, // canonicalization error (e.g. pointer cycle) — drop the hint
1519        };
1520        let rdata_hash = hash_rdata(canonical);
1521        // a known-answer may only suppress an RRset WE own, so bind the
1522        // hint to which of our owner names its record name matches. A KA whose
1523        // name is none of ours suppresses nothing (dropped here); one whose name
1524        // matches but under the wrong type (e.g. `_svc._tcp.local A x`) is kept
1525        // but will never match a candidate, because the filter pairs each
1526        // candidate's owner-kind with its rtype.
1527        let owner = if crate::endpoint::names_match_record(self.records.service_type(), ka.record())
1528        {
1529          KasOwner::ServiceType
1530        } else if crate::endpoint::names_match_record(self.records.instance(), ka.record()) {
1531          KasOwner::Instance
1532        } else if crate::endpoint::names_match_record(self.records.host(), ka.record()) {
1533          KasOwner::Host
1534        } else {
1535          return; // not one of our RRset names — cannot suppress any of our records
1536        };
1537        let hint = KasHint {
1538          owner,
1539          rtype: ka.record().rtype(),
1540          rdata_hash,
1541          expires_at,
1542        };
1543        if let Some(slot) = self.kas_hints.get_mut(self.kas_next_slot) {
1544          *slot = Some(hint);
1545          self.kas_next_slot = self.kas_next_slot.saturating_add(1) % KAS_RING_SIZE;
1546          trace!(
1547            target: "mdns_proto::service",
1548            handle = self.handle.raw(),
1549            rtype = ?ka.record().rtype(),
1550            "service: KnownAnswer hint stored (§7.1 KAS)"
1551          );
1552        }
1553      }
1554      (_, ServiceEvent::HostConflict(hc)) => {
1555        // RFC 6762 §9 only treats DIFFERENT rdata as a conflict. A
1556        // host A/AAAA whose address is one WE advertise is consistent (our own
1557        // multicast echo, or another instance correctly sharing the host) — not
1558        // a conflict. Ignore it; surface HostConflict only for a genuinely
1559        // different address.
1560        if self.host_record_is_ours(hc.record()) {
1561          return;
1562        }
1563        // A peer is claiming our host name (A/AAAA owner) with a DIFFERENT
1564        // address. Unlike an instance-name conflict we do NOT auto-rename —
1565        // renaming only the instance would leave the host conflict unresolved,
1566        // and multiple services may share one host so renaming all of them would
1567        // be incorrect. Surface the event to the caller via
1568        // ServiceUpdate::HostConflict; the caller must intervene (e.g. choose a
1569        // new host name and re-register).
1570        warn!(
1571          target: "mdns_proto::service",
1572          handle = self.handle.raw(),
1573          state = ?self.state,
1574          rtype = ?hc.record().rtype(),
1575          "service: HostConflict — peer claimed our host name with different rdata"
1576        );
1577        #[cfg(feature = "stats")]
1578        if let Some(s) = self.stat() {
1579          s.conflicts(1);
1580        }
1581        let _ = self.pending_updates.insert(ServiceUpdate::HostConflict);
1582      }
1583      _ => {}
1584    }
1585  }
1586
1587  /// Drive timer-based transitions. Returns Ok unless arithmetic overflowed.
1588  #[allow(clippy::arithmetic_side_effects)]
1589  pub fn handle_timeout(&mut self, now: I) -> Result<(), HandleTimeoutError> {
1590    #[cfg(feature = "tracing")]
1591    let _span = hick_trace::trace_span!("service", handle = self.handle.raw()).entered();
1592    // Cache latest `now` for use by poll_transmit's KAS filtering closure.
1593    // handle_event now receives `now` directly, so this is only needed
1594    // for the filtering closure in poll_transmit.
1595    self.last_now = Some(now);
1596
1597    // Item 5: prune expired KAS hints.
1598    for slot in self.kas_hints.iter_mut() {
1599      if let Some(hint) = slot
1600        && hint.expires_at <= now
1601      {
1602        *slot = None;
1603      }
1604    }
1605
1606    // RFC 6762 §8.2 tiebreak: if a ProbeConflict was buffered since the last
1607    // timeout, compare our proposed RR set against the peer's. Only rename if
1608    // we lose (or tie — RFC §8.2.1 treats a tie as a loss). The §8.2
1609    // lexicographic tiebreak applies ONLY to Init/Probing (simultaneous
1610    // probing). Post-establishment (§9) conflicts are handled separately in
1611    // `handle_event` (revert-to-probe), not via this tiebreak.
1612    if self.tiebreak_pending && matches!(self.state, ServiceState::Init | ServiceState::Probing(_))
1613    {
1614      self.tiebreak_pending = false;
1615      let we_lose = compare_rr_sets_we_lose(&self.records, &self.peer_probes);
1616      self.peer_probes.clear();
1617      if we_lose {
1618        // if the OLD name had been announced, peers have its
1619        // PTR/SRV/TXT cached — withdraw them with a TTL=0 goodbye BEFORE
1620        // switching names, or they linger as a ghost/duplicate until TTL.
1621        // Snapshot the old records now (records are about to be mutated /
1622        // instance ownership about to be reset). Probe-time names that were
1623        // never announced have nothing cached, so no goodbye.
1624        warn!(
1625          target: "mdns_proto::service",
1626          handle = self.handle.raw(),
1627          state = ?self.state,
1628          rename_attempt = self.rename_attempt.saturating_add(1),
1629          "service: probe tiebreak lost (§8.2) — renaming"
1630        );
1631        #[cfg(feature = "stats")]
1632        if let Some(s) = self.stat() {
1633          s.conflicts(1);
1634          s.renames(1);
1635        }
1636        if self.goodbye.any_instance() {
1637          // capture WHICH instance records the old name actually put on
1638          // the wire (§7.1 KAS may have emitted only a subset), so the rename
1639          // goodbye withdraws exactly those — not all of PTR/SRV/TXT, which
1640          // could flush a peer's matching same-name record we never sent. Host
1641          // A/AAAA are not withdrawn by a rename (the host name is unchanged).
1642          // Captured BEFORE `set_instance(new_name)` below, so `self.records`
1643          // still names the OLD instance. The Service no longer drains this —
1644          // it is handed off (`take_rename_goodbye_handoff`) to the endpoint as
1645          // an independent detached withdrawal item.
1646          let owned = respond::EmittedRecords::new(
1647            self.goodbye.ptr,
1648            self.goodbye.srv,
1649            self.goodbye.txt,
1650            std::vec::Vec::new(),
1651            std::vec::Vec::new(),
1652            self.goodbye.subtypes,
1653          );
1654          self.rename_goodbye_handoff = Some(RenameGoodbyeHandoff {
1655            records: self.records.clone(),
1656            owned,
1657          });
1658        }
1659        self.rename_attempt = self.rename_attempt.saturating_add(1);
1660        let new_name_str =
1661          rename_with_suffix(self.records.instance().as_str(), self.rename_attempt);
1662        match crate::Name::try_from_str(&new_name_str) {
1663          Ok(new_name) => {
1664            self.records.set_instance(new_name.clone());
1665            let _ = self.pending_updates.insert(ServiceUpdate::Renamed(
1666              crate::event::ServiceRenamed::new(new_name),
1667            ));
1668            self.state = ServiceState::Init;
1669            self.probe_count = 0;
1670            self.announce_count = 0;
1671            self.pending_transmits = [None, None];
1672            self.response_deadline = None;
1673            self.lifecycle_deadline = probe_deadline(now, 0, &mut self.rng);
1674            // the new name has NOT been announced yet, and the
1675            // old name's per-advertised-name state must not leak into it —
1676            // otherwise a later unregister/local-collision could goodbye a
1677            // never-announced name, and queued legacy replies / KAS hints
1678            // would advertise/suppress under the wrong (un-probed) name.
1679            self.reset_advertised_name_state();
1680          }
1681          Err(_) => {
1682            // rename failed (the suffixed name isn't a valid DNS
1683            // name) — give up. Mirror the success-branch cleanup so no stale
1684            // transmit / response-cycle work can still be drained by
1685            // poll_transmit after we've declared Conflicting.
1686            self.state = ServiceState::Conflicting;
1687            let _ = self.pending_updates.insert(ServiceUpdate::Conflict);
1688            self.lifecycle_deadline = None;
1689            self.pending_transmits = [None, None];
1690            self.response_deadline = None;
1691            self.goodbye.reset_instance();
1692            self.clear_response_cycle_state();
1693          }
1694        }
1695        return Ok(());
1696      }
1697      // We win: continue probing as if no conflict happened.
1698    }
1699
1700    // Drain BOTH deadlines if both are due at `now`. The old
1701    // code returned early after firing response_deadline, silently skipping
1702    // lifecycle_deadline if it was also due. Now we check both independently,
1703    // push each kind into the two-slot queue via push_pending, and drain them
1704    // in poll_transmit one-by-one.  Both transmits are preserved — the old
1705    // single-slot design would drop the lifecycle transmit when both fired.
1706
1707    // Step 1: check response deadline.
1708    let response_fired = if let Some(rd) = self.response_deadline {
1709      if now >= rd {
1710        self.response_deadline = None;
1711        true
1712      } else {
1713        false
1714      }
1715    } else {
1716      false
1717    };
1718
1719    // Step 2: check lifecycle deadline (Init-synthesis + normal fire path).
1720    // For the Init state: if lifecycle_deadline is None (e.g. renamed before
1721    // first handle_timeout), synthesise a fresh probe deadline now.
1722    if self.state == ServiceState::Init && self.lifecycle_deadline.is_none() {
1723      self.lifecycle_deadline = probe_deadline(now, 0, &mut self.rng);
1724      // lifecycle didn't "fire" a transmit here — just scheduled; fall through.
1725    }
1726
1727    let lifecycle_fired = if let Some(due) = self.lifecycle_deadline {
1728      if now >= due {
1729        // Advance lifecycle state and push a transmit kind into the queue via
1730        // push_pending.  The state advance MUST happen regardless of whether
1731        // the response deadline also fired at the same tick.
1732        match self.state {
1733          ServiceState::Init => {
1734            // Enter Probing phase; schedule the first probe delay.
1735            // No transmit yet — the probe fires when the delay elapses.
1736            self.state = ServiceState::Probing(0);
1737            self.lifecycle_deadline = probe_deadline(now, 0, &mut self.rng);
1738            debug!(
1739              target: "mdns_proto::service",
1740              handle = self.handle.raw(),
1741              "service: Init → Probing(0)"
1742            );
1743            // Init→Probing(0) schedules the NEXT deadline; no transmit this tick.
1744            false // no lifecycle transmit this tick
1745          }
1746          ServiceState::Probing(n) => {
1747            // a probe deadline fired — ENQUEUE the probe and re-arm a
1748            // fallback retry deadline, but do NOT advance the probe sequence
1749            // here. The §8.1 progression (next probe, or entering Announcing
1750            // after the third) happens in `note_transmit_result` ONLY once the
1751            // driver confirms the probe actually reached the link — mirroring
1752            // the Announcing arm below. An unconfirmed probe is retried at the
1753            // probe interval instead of the service silently marching toward
1754            // Announcing with nothing on the wire (RFC 6762 §8.1: a name must be
1755            // probed before it is claimed).
1756            debug!(
1757              target: "mdns_proto::service",
1758              handle = self.handle.raw(),
1759              probe_n = n,
1760              "service: Probing — enqueueing probe"
1761            );
1762            self.push_pending(PendingTransmitKind::Probe);
1763            self.lifecycle_deadline = probe_deadline(now, n, &mut self.rng);
1764            true
1765          }
1766          ServiceState::Announcing(_n) => {
1767            // an announce deadline fired — schedule the announcement
1768            // transmit but do NOT advance the phase here. The phase progression
1769            // and the Established update happen on CONFIRMED delivery
1770            // (`note_transmit_delivered`); peers learn of us only once a send
1771            // actually reaches the link. Re-arm at the announce interval so an
1772            // unconfirmed (all-socket-failed) send is retried rather than the
1773            // service silently progressing to Established with nothing on the
1774            // wire. A confirmed send overwrites this deadline.
1775            debug!(
1776              target: "mdns_proto::service",
1777              handle = self.handle.raw(),
1778              announce_n = _n,
1779              "service: Announcing — enqueueing announcement"
1780            );
1781            self.push_pending(PendingTransmitKind::Announcement);
1782            self.lifecycle_deadline = announce_deadline(now, 1);
1783            true
1784          }
1785          ServiceState::Established => {
1786            // The lifecycle deadline that fired is the periodic re-announce.
1787            debug!(
1788              target: "mdns_proto::service",
1789              handle = self.handle.raw(),
1790              "service: Established — enqueueing periodic re-announce"
1791            );
1792            self.push_pending(PendingTransmitKind::Announcement);
1793            self.lifecycle_deadline = re_announce_deadline(now, self.records.ttl_secs());
1794            true
1795          }
1796          ServiceState::Conflicting => {
1797            // No automatic progression — caller must intervene.
1798            false
1799          }
1800        }
1801      } else {
1802        false
1803      }
1804    } else {
1805      false
1806    };
1807
1808    // Step 3: push a Response transmit if the response deadline fired.
1809    // The lifecycle arm already pushed its transmit (Probe/Announcement) above
1810    // via push_pending.  When both fire at the same tick, BOTH entries land in
1811    // the two-slot queue and poll_transmit drains them one-by-one.  This
1812    // preserves both transmits — the old single-slot design would silently drop
1813    // the lifecycle transmit by overwriting it with Response (fix).
1814    if response_fired {
1815      self.push_pending(PendingTransmitKind::Response);
1816    }
1817    let _ = lifecycle_fired; // used for clarity
1818
1819    Ok(())
1820  }
1821
1822  /// Produce the next outgoing datagram, if any. Writes into `buf`.
1823  ///
1824  /// Returns `Ok(None)` when the transmit queue is empty.  The caller should
1825  /// loop on this method until it returns `Ok(None)` to drain all pending
1826  /// transmits (at most 2 can be queued when both a response deadline and a
1827  /// lifecycle deadline fired at the same `now`).
1828  pub fn poll_transmit(
1829    &mut self,
1830    now: I,
1831    buf: &mut [u8],
1832  ) -> Result<Option<Transmit>, TransmitError> {
1833    #[cfg(feature = "tracing")]
1834    let _span = hick_trace::trace_span!("service", handle = self.handle.raw()).entered();
1835    // the commit token is a SINGLE slot. If a previously produced
1836    // datagram has not yet been confirmed via `note_transmit_result`, do NOT
1837    // hand out (and silently overwrite the token of) another one — that would
1838    // lose the first send's pending confirmation and mis-apply the next result
1839    // to the wrong datagram. Returning `Ok(None)` makes the documented
1840    // "poll until Ok(None)" drain contract enforce poll→confirm→poll ordering
1841    // for EVERY Sans-I/O caller, not just the tokio driver (which already
1842    // confirms after each send). The token is cleared by `note_transmit_result`
1843    // (`.take()`), so the next poll after a confirm proceeds normally; a probe/
1844    // announce/response branch below re-stamps it, while the early-return
1845    // datagram (legacy unicast) only stamps where it owns lifecycle/ownership
1846    // state.
1847    if self.awaiting_confirm.is_some() {
1848      return Ok(None);
1849    }
1850    // §9: emit a pending service-type enumeration reply (a single shared
1851    // meta-PTR). Standalone like the rename goodbye — stamps NO awaiting_confirm
1852    // (it advertises no instance records and is never withdrawn), so it gates no
1853    // lifecycle/goodbye state. An un-encodable reply (near-MTU) is dropped, not
1854    // surfaced as an error, so a remote meta-query can't poison the service.
1855    if self.meta_response_deadline.is_some_and(|due| now >= due) {
1856      // Consume the meta cycle up-front: clear the deadline, the questioner set,
1857      // and the suppression flag regardless of outcome.
1858      self.meta_response_deadline = None;
1859      // (§9 + §7.1): suppress our redundant meta reply if a meta
1860      // questioner already holds our service-type PTR (sent it as a
1861      // known-answer). Only when EXACTLY ONE meta questioner coalesced
1862      // this window — mirrors the guard for the normal response path. With several
1863      // coalesced meta queriers a single source that already has our type must
1864      // NOT suppress the multicast reply the others still need.
1865      let suppressed = self.meta_known_answered && self.meta_questioner_srcs.len() == 1;
1866      self.meta_questioner_srcs.clear();
1867      self.meta_known_answered = false;
1868      if !suppressed
1869        && let Ok(meta) = crate::Name::try_from_str(crate::endpoint::DNS_SD_META_QUERY_NAME)
1870        && let Ok(n) = respond::write_meta_response(&self.records, &meta, buf)
1871      {
1872        // Stamp the MetaResponse token so note_transmit_result can count
1873        // responses_tx on a confirmed delivery.  No goodbye ownership is
1874        // latched (the meta-PTR is shared and never withdrawn).
1875        self.awaiting_confirm = Some(AwaitingConfirm::MetaResponse);
1876        return Ok(Some(Transmit::new(respond::multicast_dst(), None, n)));
1877      }
1878      // Suppressed, or name build (impossible) / encode failed — drop the reply
1879      // (state already cleared above), do not poison; fall through to the queue.
1880    }
1881    // drain legacy unicast responses (RFC 6762 §6.7) first — one
1882    // query-shaped, ID-echoing, TTL-capped datagram per legacy querier, sent
1883    // to its source.
1884    if let Some(legacy) = self.pending_legacy.first() {
1885      // a §9 meta reply emits only the shared meta-PTR (no instance
1886      // records, no goodbye ownership); a normal reply emits the full record set
1887      // and reports the EmittedRecords to latch on a confirmed delivery.
1888      let encoded = if legacy.is_meta {
1889        respond::write_legacy_meta_response(
1890          &self.records,
1891          legacy.query_id,
1892          &legacy.name,
1893          legacy.qtype,
1894          legacy.qclass,
1895          buf,
1896        )
1897        .map(|n| (n, None::<respond::EmittedRecords>))
1898      } else {
1899        respond::write_legacy_response(
1900          &self.records,
1901          legacy.query_id,
1902          &legacy.name,
1903          legacy.qtype,
1904          legacy.qclass,
1905          buf,
1906        )
1907        .map(|(n, emitted)| (n, Some(emitted)))
1908      };
1909      match encoded {
1910        Ok((n, emitted)) => {
1911          let resp = self.pending_legacy.remove(0);
1912          // a §6.7 legacy reply puts positive-TTL records on
1913          // the wire — the FULL record set, since legacy replies are not
1914          // KAS-filtered. Stamp the Response commit token with exactly what the
1915          // encoder reported it emitted; a confirmed delivery then latches
1916          // goodbye ownership for those records via `note_transmit_result`. A
1917          // meta reply (`emitted` is None) uses MetaResponse — shared PTR, no
1918          // goodbye ownership — but still counts responses_tx on delivery.
1919          // Legacy replies are not KAS-filtered, so the partial-suppression
1920          // count is always 0.
1921          self.awaiting_confirm = match emitted {
1922            Some(e) => Some(AwaitingConfirm::Response(e, 0)),
1923            None => Some(AwaitingConfirm::MetaResponse),
1924          };
1925          return Ok(Some(Transmit::new(resp.dst, None, n)));
1926        }
1927        // a legacy reply echoes the question, so it can exceed the
1928        // buffer for a near-MTU service whose normal announcement still fits.
1929        // DROP the un-encodable entry rather than (a) leaving it stuck at the
1930        // head blocking all transmits, or (b) surfacing BufferTooSmall — which
1931        // the driver counts as a SERVICE encode failure and would use to
1932        // unregister an otherwise-healthy service. A remote query
1933        // must not be able to poison the service. The legacy querier simply
1934        // gets no reply (it retries / falls back); the service is untouched.
1935        Err(_) => {
1936          let _ = self.pending_legacy.remove(0);
1937          // Fall through to the normal (announce/probe/response) queue.
1938        }
1939      }
1940    }
1941    // PEEK without removing — if encoding fails the kind stays in the queue so
1942    // the caller can retry with a larger buffer.
1943    let kind = match self.peek_pending() {
1944      Some(k) => k,
1945      None => return Ok(None),
1946    };
1947    // which owner groups a Response actually emitted (after KAS).
1948    let mut resp_emitted = respond::EmittedRecords::default();
1949    // Per-response KAS suppression count (incremented inside the filter closure
1950    // via shared Cell, then bumped into stats after encoding).
1951    #[cfg(feature = "stats")]
1952    let kas_suppressed = core::cell::Cell::new(0u64);
1953    let n = match kind {
1954      PendingTransmitKind::Probe => {
1955        let n = respond::write_probe(&self.records, buf).map_err(|_| {
1956          warn!(
1957            target: "mdns_proto::service",
1958            handle = self.handle.raw(),
1959            "service: poll_transmit probe BufferTooSmall"
1960          );
1961          TransmitError::BufferTooSmall(crate::error::BufferTooSmallDetail::new(
1962            buf.len(),
1963            buf.len(),
1964          ))
1965        })?;
1966        debug!(
1967          target: "mdns_proto::service",
1968          handle = self.handle.raw(),
1969          bytes = n,
1970          "service: poll_transmit emitting probe"
1971        );
1972        // probes_tx is bumped in note_transmit_result on confirmed delivery.
1973        n
1974      }
1975      PendingTransmitKind::Announcement => {
1976        // Unsolicited announcements (Announcing(_) phase and periodic re-announce
1977        // from Established) are sent without KAS filtering. RFC 6762 §7.1
1978        // known-answer suppression only applies to question responses.
1979        let n = respond::write_announce(&self.records, buf).map_err(|_| {
1980          warn!(
1981            target: "mdns_proto::service",
1982            handle = self.handle.raw(),
1983            "service: poll_transmit announcement BufferTooSmall"
1984          );
1985          TransmitError::BufferTooSmall(crate::error::BufferTooSmallDetail::new(
1986            buf.len(),
1987            buf.len(),
1988          ))
1989        })?;
1990        debug!(
1991          target: "mdns_proto::service",
1992          handle = self.handle.raw(),
1993          bytes = n,
1994          "service: poll_transmit emitting announcement"
1995        );
1996        // announcements_tx is bumped in note_transmit_result on confirmed delivery.
1997        n
1998      }
1999      PendingTransmitKind::Response => {
2000        // Jittered question responses normally apply KAS filtering
2001        // (RFC 6762 §7.1) — skip records the querier already holds.
2002        //
2003        // when MULTIPLE questioners coalesced in the same
2004        // response window, hints from one source must NOT suppress
2005        // records that another source needs.  Per-source KAS state
2006        // is a deeper refactor; this defensive simplification
2007        // disables KAS filtering entirely for coalesced responses.
2008        // The cost is sending a few extra records the single hinter
2009        // already had; the gain is closing the cross-source DoS
2010        // path where peer B's hint suppresses peer A's answer.
2011        let single_questioner = self.questioner_srcs.len() <= 1;
2012        let hints = self.kas_hints;
2013        let last_now = self.last_now;
2014        let (encoded, emitted) =
2015          respond::write_announce_filtered(&self.records, buf, |rtype, rdata| {
2016            if !single_questioner {
2017              return false;
2018            }
2019            let h = hash_rdata(rdata);
2020            let now_ref = match last_now {
2021              Some(t) => t,
2022              None => return false,
2023            };
2024            // a hint may only suppress the RRset it actually names. Map
2025            // this candidate's owner to its kind (PTR↦service-type, SRV/TXT↦
2026            // instance, A/AAAA↦host) and require the hint to share it — so a
2027            // same-rtype/same-rdata known-answer under a DIFFERENT owner name
2028            // cannot silence our record.
2029            let owner = match rtype {
2030              crate::wire::ResourceType::Ptr => KasOwner::ServiceType,
2031              crate::wire::ResourceType::Srv | crate::wire::ResourceType::Txt => KasOwner::Instance,
2032              crate::wire::ResourceType::A | crate::wire::ResourceType::AAAA => KasOwner::Host,
2033              _ => return false,
2034            };
2035            let suppressed = hints.iter().any(|slot| match slot {
2036              Some(hint) => {
2037                hint.owner == owner
2038                  && hint.rtype == rtype
2039                  && hint.rdata_hash == h
2040                  && hint.expires_at > now_ref
2041              }
2042              None => false,
2043            });
2044            #[cfg(feature = "stats")]
2045            if suppressed {
2046              kas_suppressed.set(kas_suppressed.get().saturating_add(1));
2047            }
2048            suppressed
2049          })
2050          .map_err(|_| {
2051            warn!(
2052              target: "mdns_proto::service",
2053              handle = self.handle.raw(),
2054              "service: poll_transmit response BufferTooSmall"
2055            );
2056            TransmitError::BufferTooSmall(crate::error::BufferTooSmallDetail::new(
2057              buf.len(),
2058              buf.len(),
2059            ))
2060          })?;
2061        resp_emitted = emitted;
2062        debug!(
2063          target: "mdns_proto::service",
2064          handle = self.handle.raw(),
2065          bytes = encoded,
2066          "service: poll_transmit emitting response"
2067        );
2068        encoded
2069      }
2070    };
2071    // Encoding succeeded — NOW remove from the queue (peek-then-pop).
2072    let kind = self.peek_pending();
2073    self.pop_pending();
2074    // the datagram has been
2075    // ENCODED, but no lifecycle state advances here. Map the queued kind to the
2076    // commit token the driver resolves via `note_transmit_result` — the SOLE
2077    // place probe/announce progression AND goodbye-ownership latching happen,
2078    // and only on a confirmed send.
2079    self.awaiting_confirm = match kind {
2080      Some(PendingTransmitKind::Probe) => Some(AwaitingConfirm::Probe),
2081      Some(PendingTransmitKind::Announcement) => {
2082        // A full (unfiltered) announcement carries every instance record
2083        // (PTR/SRV/TXT) and every host address — §7.1 known-answer suppression
2084        // does NOT apply to unsolicited announcements. Latch goodbye ownership
2085        // for exactly that record set, same path as a response.
2086        Some(AwaitingConfirm::Announcement(respond::EmittedRecords::new(
2087          true,
2088          true,
2089          true,
2090          self.records.a_addrs_slice().to_vec(),
2091          self.records.aaaa_addrs_slice().to_vec(),
2092          !self.records.subtype_names().is_empty(),
2093        )))
2094      }
2095      Some(PendingTransmitKind::Response) => {
2096        // KAS state is per-response-cycle — clear the hint ring
2097        // and questioner set now that this Response consumed it.
2098        self.kas_hints = [None; KAS_RING_SIZE];
2099        self.questioner_srcs.clear();
2100        // §7.1: if KAS suppressed EVERY record the response is header-only —
2101        // do not put an empty response on the wire, and latch nothing (a
2102        // header-only datagram advertises nothing to withdraw).
2103        //
2104        // Full suppression: no datagram leaves the host, so there is no
2105        // delivery to wait for. Count answers_suppressed_kas immediately at
2106        // the point of suppression — this is a genuine suppression event, not
2107        // a send failure. Document: this is the ONLY counter bump in
2108        // poll_transmit that is NOT deferred to note_transmit_result, because
2109        // Ok(None) means no datagram (and thus no AwaitingConfirm token) is
2110        // ever produced.
2111        if resp_emitted.is_empty() {
2112          #[cfg(feature = "stats")]
2113          if let Some(s) = self.stat() {
2114            let suppressed = kas_suppressed.get();
2115            if suppressed > 0 {
2116              s.answers_suppressed_kas(suppressed);
2117            }
2118          }
2119          return Ok(None);
2120        }
2121        // Partial suppression: carry the suppressed count in the AwaitingConfirm
2122        // token and defer the answers_suppressed_kas bump to note_transmit_result
2123        // so a socket failure does NOT inflate the counter.
2124        // responses_tx is also deferred there.
2125        #[cfg(feature = "stats")]
2126        let partial_suppressed = kas_suppressed.get();
2127        #[cfg(not(feature = "stats"))]
2128        let partial_suppressed = 0u64;
2129        // Latch goodbye ownership only for the concrete records actually emitted.
2130        Some(AwaitingConfirm::Response(resp_emitted, partial_suppressed))
2131      }
2132      None => None,
2133    };
2134    let _ = self.pending_tx.iter().next(); // silence unused-field warning
2135    // Multicast response — serves QM and QU (§5.4) group members. Legacy unicast
2136    // queriers are handled separately via `pending_legacy`.
2137    Ok(Some(Transmit::new(respond::multicast_dst(), None, n)))
2138  }
2139}
2140}
2141
2142#[cfg(test)]
2143#[cfg(all(any(feature = "alloc", feature = "std"), feature = "slab"))]
2144mod tests;