Skip to main content

mdns_proto/query/
mod.rs

1//! Query state machine — retry backoff + answer collection + KAS hints.
2
3mod retry;
4
5use crate::{
6  Instant, Name, Pool, QueryHandle,
7  backend::RdataBuf,
8  error::{HandleTimeoutError, TransmitError},
9  event::{QueryEvent, QueryUpdate},
10  transmit::Transmit,
11  wire::{DEFAULT_COMPRESSION_TABLE, Header, MessageBuilder, ResourceClass, ResourceType},
12};
13
14#[cfg(all(test, feature = "std", feature = "slab"))]
15mod tests;
16
/// Retry budget for a single query.
///
/// Compared against `Query::retry_count`, which counts the datagrams sent so
/// far (including the initial query) plus any slots consumed by
/// duplicate-question suppression: the query is retired with a timeout once
/// that count exceeds this value.
const MAX_RETRIES: u32 = 8;

/// Default maximum number of collected answers per query.
///
/// Used as the initial `max_answers` of a freshly constructed `Query`.
const DEFAULT_MAX_ANSWERS: usize = 256;
22
/// One collected answer record for a Query.
///
/// Stores the resource type, class, and raw rdata bytes so that
/// deduplication, qtype/qclass filtering, and the answer cap can all
/// be applied before inserting into the pool.
#[derive(Debug, Clone)]
pub struct CollectedAnswer {
  /// Resource type of the record.
  rtype: ResourceType,
  /// Resource class of the record.
  rclass: ResourceClass,
  /// The rdata bytes, with the original DNS name case preserved (for display).
  rdata: RdataBuf,
  /// The case-FOLDED identity form of `rdata` (PTR/SRV/NSEC/CNAME names
  /// lowercased), used for dedup, cap accounting, and mailbox coalescing,
  /// while `rdata` keeps the original case for display. Two answers that are
  /// the same logical record differing only in DNS name case share this key,
  /// so a responder cannot evict/flood the bounded answer set with case
  /// permutations.
  ///
  /// `None` means the folded form is byte-identical to `rdata` (the common
  /// case: A/AAAA/TXT/unknown rdata, or a name that is already lowercase), so
  /// ONLY one buffer is stored — folding a large TXT/unknown flood does not
  /// double per-answer memory. [`Self::rdata_key`] resolves `None` to `rdata`.
  rdata_key: Option<RdataBuf>,
  /// Monotonically increasing insertion sequence number within a single Query.
  /// Used to identify the oldest entry for FIFO eviction.
  seq: u64,
}
49
50impl CollectedAnswer {
51  /// Construct an answer from its parts.
52  ///
53  /// Hidden from the documented surface: the `Query` state machine builds
54  /// these internally, but downstream crates need a way to synthesize them
55  /// for tests and synthetic answer feeds. Synthetic answers carry opaque
56  /// rdata with no DNS-name semantics, so the dedup key is the rdata itself
57  /// (`rdata_key` is `None`, i.e. identical to `rdata`).
58  #[doc(hidden)]
59  pub fn from_parts(
60    rtype: ResourceType,
61    rclass: ResourceClass,
62    rdata: impl Into<RdataBuf>,
63    seq: u64,
64  ) -> Self {
65    Self {
66      rtype,
67      rclass,
68      rdata: rdata.into(),
69      rdata_key: None,
70      seq,
71    }
72  }
73
74  /// The resource type of this answer.
75  #[inline(always)]
76  pub fn rtype(&self) -> ResourceType {
77    self.rtype
78  }
79
80  /// The resource class of this answer.
81  #[inline(always)]
82  pub fn rclass(&self) -> ResourceClass {
83    self.rclass
84  }
85
86  /// The raw rdata bytes of this answer, with DNS name case PRESERVED (for
87  /// display). For identity/dedup comparisons use [`Self::rdata_key`].
88  #[inline(always)]
89  pub fn rdata_slice(&self) -> &[u8] {
90    self.rdata.as_ref()
91  }
92
93  /// The case-FOLDED identity form of the rdata. Equal for two
94  /// answers that are the same logical record differing only in DNS name case;
95  /// callers coalescing/deduping answers should compare this, not
96  /// [`Self::rdata_slice`] (which preserves display case). resolves
97  /// to `rdata` when the folded form is identical (no separate buffer stored).
98  #[inline(always)]
99  pub fn rdata_key(&self) -> &[u8] {
100    self.rdata_key.as_deref().unwrap_or(self.rdata.as_ref())
101  }
102
103  /// Insertion sequence number (monotonically increasing per-Query).
104  ///
105  /// Used for FIFO eviction: the entry with the lowest `seq` is the oldest.
106  #[inline(always)]
107  pub fn seq(&self) -> u64 {
108    self.seq
109  }
110}
111
/// Query state machine. One per outstanding query.
pub struct Query<I, AN, EV> {
  /// Handle assigned to this query at construction.
  handle: QueryHandle,
  /// Shared stats handle; `None` until `set_stats` attaches one.
  #[cfg(feature = "stats")]
  stats: Option<std::sync::Arc<hick_trace::stats::Stats>>,
  /// Name being queried.
  qname: Name,
  /// Record type being queried; also used to filter incoming answers.
  qtype: ResourceType,
  /// Class being queried; also used to filter incoming answers.
  qclass: ResourceClass,
  /// Transaction id placed in the header of outgoing queries.
  txid: u16,
  /// Number of datagrams sent so far (including the initial query).
  /// Advanced by `note_transmit_result` on a delivered send and by
  /// `note_duplicate_question` on a suppressed slot; drives both the §5.2
  /// backoff interval and the retry budget (`MAX_RETRIES`).
  retry_count: u32,
  /// Instant at which the next retry becomes due, if one is scheduled.
  next_deadline: Option<I>,
  /// Pool of answers collected so far.
  answers: AN,
  /// Pool of app-level updates waiting to be drained by `poll`.
  pending_updates: EV,
  /// True once the query has reached a terminal state.
  done: bool,
  /// Latch tracking whether the terminal `QueryUpdate` has
  /// already been returned to the caller via `Endpoint::poll_query`.
  /// Used so the terminal is emitted exactly once even when both
  /// `pending_updates` push and the `is_done` backstop would fire — and
  /// to short-circuit subsequent `poll_query` calls on a terminated
  /// query to `None`.
  terminal_emitted: bool,
  /// Maximum number of answers to collect before evicting the oldest (FIFO).
  max_answers: usize,
  /// Monotonic counter incremented on every successful answer insertion.
  /// Each `CollectedAnswer` records the value at the time of its insertion
  /// as its `seq`; eviction picks the entry with the lowest `seq` (i.e. the
  /// oldest).
  next_seq: u64,
  /// True when a datagram is ready to be built and sent.
  /// Set on construction (first send is immediately due) and on each
  /// `handle_timeout` tick that fires a retry; cleared after `poll_transmit`
  /// consumes it. This prevents a driver looping on `poll_transmit` from
  /// sending the same query continuously instead of honoring the backoff.
  transmit_pending: bool,
  /// Set by `poll_transmit` for the datagram it just produced, and
  /// cleared by `note_transmit_result` once the driver reports the send result.
  /// The retry budget (`retry_count`) and the next-retry deadline are advanced
  /// ONLY on a confirmed-delivered send — a datagram that fails on every socket
  /// is re-attempted without consuming the budget, so a transient send failure
  /// can never time out a query that never actually put a question on the wire.
  awaiting_send_confirm: bool,
  /// When `true`, questions are emitted with the QU bit set (RFC 6762 §5.4):
  /// the sender prefers a unicast response rather than a multicast one.
  unicast_response: bool,
  /// Absolute instant at which this query should auto-cancel regardless of
  /// the retry budget (`None` means no hard deadline beyond the retry budget).
  timeout_deadline: Option<I>,
}
162
163impl<I, AN, EV> Query<I, AN, EV>
164where
165  I: Instant,
166  AN: Pool<CollectedAnswer>,
167  EV: Pool<QueryUpdate>,
168{
  /// Construct a new Query. Its first transmission is immediately due (the
  /// next `poll_transmit` emits it); the retry schedule is then driven off
  /// that send's instant, not construction time.
  ///
  /// * `handle` — handle identifying this query.
  /// * `qname`, `qtype`, `qclass` — the question to ask; `qtype`/`qclass` also
  ///   filter which incoming answers are collected.
  /// * `txid` — transaction id placed in the header of outgoing queries.
  /// * `unicast_response` — when `true`, questions carry the QU bit (RFC 6762 §5.4).
  /// * `timeout_deadline` — optional absolute instant at which the query auto-cancels.
  ///
  /// The answer cap starts at `DEFAULT_MAX_ANSWERS` and no stats handle is
  /// attached. Despite the `try_` prefix this constructor is infallible.
  #[allow(dead_code, clippy::too_many_arguments)]
  pub(crate) fn try_new(
    handle: QueryHandle,
    qname: Name,
    qtype: ResourceType,
    qclass: ResourceClass,
    txid: u16,
    unicast_response: bool,
    timeout_deadline: Option<I>,
  ) -> Self {
    Self {
      handle,
      #[cfg(feature = "stats")]
      stats: None,
      qname,
      qtype,
      qclass,
      txid,
      retry_count: 0,
      // No retry is scheduled yet: the first send is driven by
      // `transmit_pending`, and the first retry is scheduled only after the
      // driver confirms that send via `note_transmit_result`. This keeps
      // `poll_timeout` from returning `now` right after the first transmit,
      // which would otherwise make a driver re-fire `handle_timeout` at `now`
      // and collapse the first retry interval to zero / push it to 2s.
      next_deadline: None,
      answers: AN::new(),
      pending_updates: EV::new(),
      done: false,
      terminal_emitted: false,
      max_answers: DEFAULT_MAX_ANSWERS,
      next_seq: 0,
      // The initial query is due immediately.
      transmit_pending: true,
      awaiting_send_confirm: false,
      unicast_response,
      timeout_deadline,
    }
  }
213
  /// Attach the shared [`hick_trace::stats::Stats`] handle from the owning
  /// [`crate::endpoint::Endpoint`]. No allocation — the Arc is cloned from the
  /// endpoint's existing single Arc. Called immediately after construction by
  /// `Endpoint::try_start_query` so that all per-query counters accumulate into
  /// the endpoint-level stats. Before this is called, stats bumps are no-ops
  /// (the field is `None`). Calling it again replaces the stored handle.
  #[cfg(feature = "stats")]
  pub(crate) fn set_stats(&mut self, stats: std::sync::Arc<hick_trace::stats::Stats>) {
    self.stats = Some(stats);
  }
224
  /// Borrow the stats handle if one has been attached.
  ///
  /// Returns `None` until `set_stats` has been called, which lets call sites
  /// skip the counter bump with a single `if let`.
  #[cfg(feature = "stats")]
  #[inline]
  fn stat(&self) -> Option<&hick_trace::stats::Stats> {
    self.stats.as_deref()
  }
231
  /// Override the maximum number of collected answers (default 256).
  ///
  /// When the pool reaches this limit the oldest entry (FIFO) is evicted to
  /// make room for the incoming answer. Setting `max` to 0 disables collection
  /// entirely.
  ///
  /// Builder-style: consumes `self` and returns it with the new cap applied.
  #[must_use]
  pub fn with_max_answers(mut self, max: usize) -> Self {
    self.max_answers = max;
    self
  }
242
  /// Set the maximum number of collected answers in place. Same semantics
  /// as [`Self::with_max_answers`] but for use after construction (e.g. by
  /// `Endpoint::try_start_query` when threading a `QuerySpec::max_answers`).
  ///
  /// Only the cap is stored here; answers already collected are not touched
  /// by this call.
  #[inline(always)]
  pub fn set_max_answers(&mut self, max: usize) {
    self.max_answers = max;
  }
250
  /// Returns the handle this query was constructed with.
  #[inline(always)]
  pub const fn handle(&self) -> QueryHandle {
    self.handle
  }
  /// Returns a borrow of the queried name.
  #[inline(always)]
  pub fn qname(&self) -> &Name {
    &self.qname
  }
  /// Returns the queried record type (also the filter applied to incoming
  /// answers unless it is the "any" type).
  #[inline(always)]
  pub const fn qtype(&self) -> ResourceType {
    self.qtype
  }
  /// Returns the queried class (also the filter applied to incoming answers
  /// unless it is the "any" class).
  #[inline(always)]
  pub const fn qclass(&self) -> ResourceClass {
    self.qclass
  }
  /// Returns the transaction id written into the header of outgoing queries.
  #[inline(always)]
  pub const fn txid(&self) -> u16 {
    self.txid
  }
276
277  /// Process an event routed to this query by the Endpoint.
278  pub fn handle_event(&mut self, event: QueryEvent<'_>) {
279    #[cfg(feature = "tracing")]
280    let _span = hick_trace::trace_span!("query", handle = self.handle.raw()).entered();
281    crate::trace::trace!(
282      target: "mdns_proto::query",
283      handle = self.handle.raw(),
284      event = ?core::mem::discriminant(&event),
285      "query: handle_event"
286    );
287    match event {
288      QueryEvent::Answer(record) => {
289        // TTL=0 records are mDNS "goodbye" / deletion records
290        // (RFC 6762 §10.1).  Treating them as live answers would let a
291        // peer withdrawing a service inject a ghost entry into
292        // `collected_answers`, and under `max_answers` pressure could
293        // evict a real answer via FIFO.  The cache layer already
294        // handles TTL=0 as removal; for active queries we simply
295        // ignore the record.  Callers observe withdrawal indirectly
296        // via the cache.
297        if record.ttl() == 0 {
298          return;
299        }
300        // qtype filter: drop if rtype doesn't match (unless query is Any).
301        let qtype = self.qtype;
302        if !qtype.is_any() && record.rtype() != qtype {
303          return;
304        }
305        // qclass filter: drop if rclass doesn't match (unless query is Any).
306        let qclass = self.qclass;
307        if !qclass.is_any() && record.rclass() != qclass {
308          return;
309        }
310
311        // store the rdata in canonical (decompressed)
312        // wire form. PTR/SRV/NSEC rdata carries a domain name that responders
313        // (and this crate's own builder) may compress with a back-pointer into
314        // the packet; copying the raw slice would leave a dangling pointer the
315        // caller cannot decode once the datagram is gone, and two encodings of
316        // the same logical record would not dedupe. A malformed name drops the
317        // answer rather than storing undecodable bytes.
318        let owned = match record.canonical_rdata() {
319          Ok(v) => v,
320          Err(_) => return,
321        };
322        // the case-FOLDED identity key. Dedup/cap/coalescing compare
323        // this (DNS names are case-insensitive) so a responder can't flood the
324        // bounded answer set with case permutations of one record; `owned`
325        // keeps the original case for display.
326        let folded = match record.canonical_rdata_folded() {
327          Ok(v) => v,
328          Err(_) => return,
329        };
330        // only keep a SEPARATE key buffer when folding actually
331        // changed the bytes (a mixed-case name). For A/AAAA/TXT/unknown rdata —
332        // and names already lowercase — the folded form equals `owned`, so we
333        // store None and avoid doubling memory under a large-rdata flood.
334        let rdata_key = if folded == owned { None } else { Some(folded) };
335        let key: &[u8] = rdata_key.as_deref().unwrap_or(owned.as_ref());
336
337        // Dedupe: skip if a matching (rtype, rclass, folded-rdata) already in.
338        for (_, existing) in self.answers.iter() {
339          if existing.rtype() == record.rtype()
340            && existing.rclass() == record.rclass()
341            && existing.rdata_key() == key
342          {
343            crate::trace::trace!(
344              target: "mdns_proto::query",
345              handle = self.handle.raw(),
346              rtype = ?record.rtype(),
347              "query: answer deduped (already collected)"
348            );
349            return;
350          }
351        }
352
353        // A zero cap collects nothing.
354        if self.max_answers == 0 {
355          return;
356        }
357        // Make room before inserting. Evict the oldest (lowest-seq, true FIFO)
358        // entry when at the logical `max_answers` cap OR when the
359        // underlying pool has no vacant slot (a fixed-capacity pool
360        // smaller than `max_answers` would otherwise reject every new answer
361        // once full, since the `len >= max_answers` check never fires). One
362        // eviction frees room for exactly one insert.
363        if self.answers.len() >= self.max_answers || self.answers.vacant_key().is_err() {
364          let mut victim: Option<(usize, u64)> = None;
365          for (key, entry) in self.answers.iter() {
366            let s = entry.seq();
367            victim = Some(match victim {
368              // Existing candidate has a lower (older) seq — keep it.
369              Some(prev) if prev.1 <= s => prev,
370              _ => (key, s),
371            });
372          }
373          if let Some((victim_key, _)) = victim {
374            crate::trace::trace!(
375              target: "mdns_proto::query",
376              handle = self.handle.raw(),
377              "query: evicting oldest answer (cap reached)"
378            );
379            self.answers.try_remove(victim_key);
380          }
381        }
382
383        // Insert; advance `next_seq` ONLY on a successful insert so
384        // a dropped answer (a degenerate pool that cannot hold it even after
385        // eviction) is never accounted as collected — which would otherwise
386        // leave a gap in the FIFO seq ordering.
387        let new_seq = self.next_seq;
388        if self
389          .answers
390          .insert(CollectedAnswer {
391            rtype: record.rtype(),
392            rclass: record.rclass(),
393            rdata: owned,
394            rdata_key,
395            seq: new_seq,
396          })
397          .is_ok()
398        {
399          self.next_seq = self.next_seq.saturating_add(1);
400          crate::trace::trace!(
401            target: "mdns_proto::query",
402            handle = self.handle.raw(),
403            rtype = ?record.rtype(),
404            seq = new_seq,
405            "query: answer collected"
406          );
407          #[cfg(feature = "stats")]
408          if let Some(s) = self.stat() {
409            s.answers_collected(1);
410          }
411        }
412      }
413      QueryEvent::Truncated => {
414        // Hold off retry — more answers coming.
415      }
416    }
417  }
418
419  /// Next deadline for `handle_timeout`.
420  ///
421  /// Returns the earlier of `next_deadline` (next retry) and `timeout_deadline`
422  /// (absolute query cancellation). A driver that sleeps until this instant is
423  /// guaranteed to wake in time to fire the absolute timeout even when the next
424  /// retry is scheduled far in the future.
425  pub fn poll_timeout(&self) -> Option<I> {
426    match (self.next_deadline, self.timeout_deadline) {
427      (Some(n), Some(t)) => Some(if n < t { n } else { t }),
428      (Some(n), None) => Some(n),
429      (None, Some(t)) => Some(t),
430      (None, None) => None,
431    }
432  }
433
434  /// Route EVERY terminal transition through here. Idempotent: a no-op if
435  /// the query is already `done`. Sets `done = true`, queues the terminal
436  /// `QueryUpdate`, and under `#[cfg(feature="stats")]` bumps the correct
437  /// counter (`queries_timeout` or `queries_done`) and decrements
438  /// `queries_active` exactly once.
439  ///
440  /// Callers must pass the appropriate `update`:
441  /// * [`QueryUpdate::Timeout`] for timeout/retry-exhaustion/duplicate-question paths.
442  /// * [`QueryUpdate::Done`] for voluntary "done" paths (if/when added).
443  fn terminate(&mut self, update: QueryUpdate) {
444    if self.done {
445      return;
446    }
447    self.done = true;
448    self.transmit_pending = false;
449    let _ = self.pending_updates.insert(update);
450    self.next_deadline = None;
451    self.timeout_deadline = None;
452    #[cfg(feature = "stats")]
453    if let Some(s) = self.stat() {
454      match update {
455        QueryUpdate::Timeout => s.queries_timeout(1),
456        QueryUpdate::Done => {}
457      }
458      s.queries_done(1);
459      s.decr_queries_active(1);
460    }
461  }
462
463  /// Drive timer-based transitions.
464  pub fn handle_timeout(&mut self, now: I) -> Result<(), HandleTimeoutError> {
465    #[cfg(feature = "tracing")]
466    let _span = hick_trace::trace_span!("query", handle = self.handle.raw()).entered();
467    if self.done {
468      return Ok(());
469    }
470
471    // Check the absolute deadline before the per-retry deadline. A caller-
472    // supplied timeout takes priority over the built-in retry budget.
473    if let Some(td) = self.timeout_deadline
474      && now >= td
475    {
476      crate::trace::trace!(
477        target: "mdns_proto::query",
478        handle = self.handle.raw(),
479        "query: absolute timeout deadline reached"
480      );
481      self.terminate(QueryUpdate::Timeout);
482      return Ok(());
483    }
484
485    let due = match self.next_deadline {
486      Some(d) => d,
487      None => return Ok(()),
488    };
489    if now < due {
490      return Ok(());
491    }
492    // The scheduled retry is due. The retry budget is measured in datagrams
493    // actually sent (`retry_count`, incremented by `poll_transmit`); once the
494    // full budget is spent, retire the query instead of scheduling more.
495    if self.retry_count > MAX_RETRIES {
496      crate::trace::trace!(
497        target: "mdns_proto::query",
498        handle = self.handle.raw(),
499        retry_count = self.retry_count,
500        "query: retry budget exhausted — timeout"
501      );
502      self.terminate(QueryUpdate::Timeout);
503    } else {
504      // Mark a transmit due now and clear the deadline; `poll_transmit` emits
505      // the datagram and schedules the following retry. Clearing the deadline
506      // makes repeated `handle_timeout` calls before the drain no-ops, so a
507      // single fired tick yields exactly one retransmit.
508      crate::trace::trace!(
509        target: "mdns_proto::query",
510        handle = self.handle.raw(),
511        retry_count = self.retry_count,
512        "query: retry due — arming transmit"
513      );
514      self.transmit_pending = true;
515      self.next_deadline = None;
516    }
517    Ok(())
518  }
519
  /// Force the query to its terminal TIMEOUT state at the DRIVER's request — used
  /// when the transport can never send the question (e.g. a permanently-too-large
  /// datagram on every reachable family), so the query would otherwise hang. This
  /// is exactly the terminal a timer-driven timeout produces: it marks the query
  /// `done` (so [`Self::is_done`] is true and `Endpoint::handle` freezes any late
  /// answers) and queues a terminal [`QueryUpdate::Timeout`]. The collected answers
  /// stay readable until the caller cancels. No-op if already done.
  pub(crate) fn retire(&mut self) {
    // Delegates to `terminate`, which supplies the idempotence and the
    // stats accounting.
    self.terminate(QueryUpdate::Timeout);
  }
530
531  /// Produce the next outgoing datagram, if any. Writes into `buf`.
532  ///
533  /// Returns `Ok(None)` when the query is done or when no send is currently
534  /// due (i.e. `transmit_pending` is false). A single call per scheduled
535  /// deadline tick is guaranteed: the pending flag is cleared after the
536  /// datagram is built, so a driver looping on this method will not
537  /// re-send the query until the next `handle_timeout` fires.
538  pub fn poll_transmit(
539    &mut self,
540    _now: I,
541    buf: &mut [u8],
542  ) -> Result<Option<Transmit>, TransmitError> {
543    #[cfg(feature = "tracing")]
544    let _span = hick_trace::trace_span!("query", handle = self.handle.raw()).entered();
545    if self.done || !self.transmit_pending {
546      return Ok(None);
547    }
548    let buf_len = buf.len();
549    let header = Header::new().with_id(self.txid);
550    let mut b: MessageBuilder<'_, DEFAULT_COMPRESSION_TABLE> = MessageBuilder::try_new(buf, header)
551      .map_err(|_| {
552        TransmitError::BufferTooSmall(crate::error::BufferTooSmallDetail::new(
553          crate::wire::HEADER_SIZE,
554          buf_len,
555        ))
556      })?;
557    b.push_question(&self.qname, self.qtype, self.qclass, self.unicast_response)
558      .map_err(|_| TransmitError::BufferTooSmall(crate::error::BufferTooSmallDetail::new(0, 0)))?;
559    let n = b
560      .finish()
561      .map_err(|_| TransmitError::BufferTooSmall(crate::error::BufferTooSmallDetail::new(0, 0)))?;
562    self.transmit_pending = false;
563    // do NOT advance the retry budget or schedule the next retry
564    // here — the datagram has only been ENCODED. Await the driver's delivery
565    // result (`note_transmit_result`), which schedules the backoff on a
566    // confirmed send and re-attempts (without burning the budget) on failure.
567    self.awaiting_send_confirm = true;
568    crate::trace::debug!(
569      target: "mdns_proto::query",
570      handle = self.handle.raw(),
571      qname = self.qname.as_str(),
572      qtype = ?self.qtype,
573      bytes = n,
574      "query: poll_transmit emitting question"
575    );
576    Ok(Some(Transmit::new(
577      crate::service::multicast_dst(),
578      None,
579      n,
580    )))
581  }
582
583  /// Report the result of sending the datagram most recently produced by
584  /// [`Self::poll_transmit`]. The driver calls this after a send,
585  /// with `delivered = true` when at least one socket send succeeded.
586  ///
587  /// On a delivered send this counts the transmission against the §5.2 retry
588  /// budget and schedules the next retransmit (backoff: +1s, doubling, capped
589  /// at 60s). On a failed send the budget is NOT consumed and the query is
590  /// re-armed to re-attempt at the same interval — so a transient or total send
591  /// failure retries with backoff (no tight spin) and a query can never reach
592  /// its retry-budget timeout having put nothing on the wire.
593  pub fn note_transmit_result(&mut self, now: I, delivered: bool) {
594    if !self.awaiting_send_confirm {
595      return;
596    }
597    self.awaiting_send_confirm = false;
598    if delivered {
599      self.retry_count = self.retry_count.saturating_add(1);
600      self.next_deadline = retry::next_deadline(now, self.retry_count);
601    } else {
602      // Re-attempt this (undelivered) send after its backoff interval without
603      // advancing `retry_count`. `transmit_pending` stays false until the
604      // deadline fires, so the driver's drain loop does not spin.
605      self.next_deadline = retry::next_deadline(now, self.retry_count.saturating_add(1));
606    }
607  }
608
609  /// RFC 6762 §7.3 duplicate-question suppression. Another host has multicast
610  /// the SAME question that this query is ABOUT TO (re)transmit. Treat the
611  /// peer's query as our own ("treat its own query as having been sent"):
612  /// consume this retry slot and arm the next retransmit on the normal backoff,
613  /// without putting a redundant query on the wire — the peer's query elicits
614  /// the multicast answers we want.
615  ///
616  /// A retransmit is "imminent" either when it is already armed
617  /// (`transmit_pending`, the window between `handle_timeout` firing and
618  /// `poll_transmit` draining) OR when its `next_deadline` is already due but
619  /// not yet armed — the latter covers drivers that pump received packets BEFORE
620  /// firing query timeouts, so suppression does not depend on that ordering.
621  /// Either way it consumes exactly one retry slot: `transmit_pending`
622  /// is cleared and the due deadline is pushed forward, so a second duplicate in
623  /// the same slot is a no-op — suppression is idempotent per slot.
624  ///
625  /// The retry budget advances exactly as a real send would (and the query
626  /// retires via `MAX_RETRIES` here too), so a continuously-duplicated query
627  /// still progresses to its terminal timeout instead of being
628  /// deferred forever. An in-flight (awaiting-confirm) send is left alone.
629  ///
630  /// Returns `true` if a transmit slot was actually consumed (i.e. real
631  /// suppression happened) and `false` if the call was a no-op (query is
632  /// done, awaiting send confirmation, or no send was imminent). Callers use
633  /// the return value to decide whether to bump the
634  /// `duplicate_questions_suppressed` counter.
635  pub fn note_duplicate_question(&mut self, now: I) -> bool {
636    if self.done || self.awaiting_send_confirm {
637      return false;
638    }
639    let imminent = self.transmit_pending || self.next_deadline.is_some_and(|d| now >= d);
640    if !imminent {
641      return false;
642    }
643    self.transmit_pending = false;
644    self.retry_count = self.retry_count.saturating_add(1);
645    if self.retry_count > MAX_RETRIES {
646      // Budget spent (counting suppressed slots as our sends) — retire exactly
647      // as `handle_timeout` would after the final retransmit. Route through
648      // `terminate` so stats (queries_timeout, queries_done, decr_queries_active)
649      // are bumped exactly once on this path too.
650      self.terminate(QueryUpdate::Timeout);
651      // The slot was consumed even though the query is now terminal.
652      return true;
653    }
654    self.next_deadline = retry::next_deadline(now, self.retry_count);
655    true
656  }
657
658  /// Drain a pending app-level update.
659  pub fn poll(&mut self) -> Option<QueryUpdate> {
660    let key = self.pending_updates.iter().next().map(|(k, _)| k)?;
661    self.pending_updates.try_remove(key)
662  }
663
  /// Has the query reached a terminal state?  Backstop for
  /// [`Endpoint::poll_query`](crate::endpoint::Endpoint::poll_query) under
  /// EV-pool pressure: if the terminal `QueryUpdate::Timeout` cannot be
  /// pushed onto `pending_updates`, `Endpoint::poll_query` falls back to this
  /// flag and synthesises `Timeout`.  External callers normally do NOT
  /// need to consult this directly — drive `Endpoint::poll_query` and
  /// react to its terminal return value.
  #[inline(always)]
  pub const fn is_done(&self) -> bool {
    self.done
  }
675
  /// Has the terminal `QueryUpdate` already been delivered to the
  /// caller via `Endpoint::poll_query`?  Internal latch — set
  /// the first time `poll_query` returns `Done`/`Timeout` so subsequent
  /// calls return `None` instead of re-emitting (or worse, double-emitting
  /// from both the `pending_updates` push AND the `is_done` backstop).
  ///
  /// This method only reads the latch; it is set by
  /// `mark_terminal_emitted`.
  #[inline(always)]
  pub(crate) const fn terminal_emitted(&self) -> bool {
    self.terminal_emitted
  }
685
  /// Mark the terminal as delivered.  Intended for
  /// `Endpoint::poll_query` to call after returning `Done`/`Timeout`.
  ///
  /// One-way latch: nothing in this type resets the flag afterwards.
  #[inline(always)]
  pub(crate) fn mark_terminal_emitted(&mut self) {
    self.terminal_emitted = true;
  }
692
  /// Iterate the answers collected so far by this query.
  ///
  /// Yields the entries in the answer pool's own iteration order, with the
  /// pool keys stripped. NOTE(review): that order is not necessarily `seq`
  /// order — sort by [`CollectedAnswer::seq`] if insertion order matters.
  pub fn collected_answers(&self) -> impl Iterator<Item = &CollectedAnswer> + '_ {
    self.answers.iter().map(|(_, a)| a)
  }
697
  /// Total number of answers ever accepted by this query, including ones
  /// already evicted by the `max_answers` cap.
  ///
  /// Equal to the next sequence number to be assigned, so it is monotonic and
  /// strictly greater than every `seq` currently in
  /// [`Self::collected_answers`] (short of `u64` saturation). A consumer
  /// that delivers answers by ascending `seq` can compare this against the
  /// count it has observed to detect (and count) answers the cap evicted
  /// before they were read.
  pub fn accepted_count(&self) -> u64 {
    self.next_seq
  }
709}