mdns_proto/query/mod.rs
1//! Query state machine — retry backoff + answer collection + KAS hints.
2
3mod retry;
4
5use crate::{
6 Instant, Name, Pool, QueryHandle,
7 backend::RdataBuf,
8 error::{HandleTimeoutError, TransmitError},
9 event::{QueryEvent, QueryUpdate},
10 transmit::Transmit,
11 wire::{DEFAULT_COMPRESSION_TABLE, Header, MessageBuilder, ResourceClass, ResourceType},
12};
13
14#[cfg(all(test, feature = "std", feature = "slab"))]
15mod tests;
16
/// Maximum retries before giving up.
///
/// Compared against `Query::retry_count`: the query is retired once that
/// counter exceeds this value (see `handle_timeout` and
/// `note_duplicate_question`).
const MAX_RETRIES: u32 = 8;

/// Default maximum number of collected answers per query.
///
/// Used as the initial `max_answers` of a freshly constructed `Query`; callers
/// may override it with `with_max_answers` / `set_max_answers`.
const DEFAULT_MAX_ANSWERS: usize = 256;
22
/// One collected answer record for a Query.
///
/// Stores the resource type, class, and raw rdata bytes so that
/// deduplication, qtype/qclass filtering, and the answer cap can all
/// be applied before inserting into the pool.
#[derive(Debug, Clone)]
pub struct CollectedAnswer {
    /// Resource type of the answer record.
    rtype: ResourceType,
    /// Resource class of the answer record.
    rclass: ResourceClass,
    /// Rdata bytes with the original DNS name case preserved (for display).
    rdata: RdataBuf,
    /// The case-FOLDED identity form of `rdata` (PTR/SRV/NSEC/CNAME
    /// names lowercased) used for dedup, cap accounting, and mailbox coalescing,
    /// while `rdata` keeps the original case for display. Two answers that are
    /// the same logical record differing only in DNS name case share this key,
    /// so a responder cannot evict/flood the bounded answer set with case
    /// permutations.
    ///
    /// `None` means the folded form is byte-identical to `rdata` (the
    /// common case: A/AAAA/TXT/unknown rdata, or a name already lowercase), so
    /// only ONE buffer is stored and folding a large TXT/unknown flood does not
    /// double per-answer memory. [`Self::rdata_key`] resolves `None` to `rdata`.
    rdata_key: Option<RdataBuf>,
    /// Monotonically increasing insertion sequence number within a single Query.
    /// Used to identify the oldest entry for FIFO eviction.
    seq: u64,
}
49
50impl CollectedAnswer {
51 /// Construct an answer from its parts.
52 ///
53 /// Hidden from the documented surface: the `Query` state machine builds
54 /// these internally, but downstream crates need a way to synthesize them
55 /// for tests and synthetic answer feeds. Synthetic answers carry opaque
56 /// rdata with no DNS-name semantics, so the dedup key is the rdata itself
57 /// (`rdata_key` is `None`, i.e. identical to `rdata`).
58 #[doc(hidden)]
59 pub fn from_parts(
60 rtype: ResourceType,
61 rclass: ResourceClass,
62 rdata: impl Into<RdataBuf>,
63 seq: u64,
64 ) -> Self {
65 Self {
66 rtype,
67 rclass,
68 rdata: rdata.into(),
69 rdata_key: None,
70 seq,
71 }
72 }
73
74 /// The resource type of this answer.
75 #[inline(always)]
76 pub fn rtype(&self) -> ResourceType {
77 self.rtype
78 }
79
80 /// The resource class of this answer.
81 #[inline(always)]
82 pub fn rclass(&self) -> ResourceClass {
83 self.rclass
84 }
85
86 /// The raw rdata bytes of this answer, with DNS name case PRESERVED (for
87 /// display). For identity/dedup comparisons use [`Self::rdata_key`].
88 #[inline(always)]
89 pub fn rdata_slice(&self) -> &[u8] {
90 self.rdata.as_ref()
91 }
92
93 /// The case-FOLDED identity form of the rdata. Equal for two
94 /// answers that are the same logical record differing only in DNS name case;
95 /// callers coalescing/deduping answers should compare this, not
96 /// [`Self::rdata_slice`] (which preserves display case). resolves
97 /// to `rdata` when the folded form is identical (no separate buffer stored).
98 #[inline(always)]
99 pub fn rdata_key(&self) -> &[u8] {
100 self.rdata_key.as_deref().unwrap_or(self.rdata.as_ref())
101 }
102
103 /// Insertion sequence number (monotonically increasing per-Query).
104 ///
105 /// Used for FIFO eviction: the entry with the lowest `seq` is the oldest.
106 #[inline(always)]
107 pub fn seq(&self) -> u64 {
108 self.seq
109 }
110}
111
/// Query state machine. One per outstanding query.
pub struct Query<I, AN, EV> {
    /// Handle assigned to this query at start.
    handle: QueryHandle,
    /// Shared stats handle; `None` until `set_stats` attaches one.
    #[cfg(feature = "stats")]
    stats: Option<std::sync::Arc<hick_trace::stats::Stats>>,
    /// Name being queried.
    qname: Name,
    /// Record type being queried.
    qtype: ResourceType,
    /// Class being queried.
    qclass: ResourceClass,
    /// Transaction id placed in the header of outgoing queries.
    txid: u16,
    /// Number of retry slots consumed so far (including the initial query).
    /// Incremented by `note_transmit_result` on a delivered send and by
    /// `note_duplicate_question` when a slot is suppressed, NOT by
    /// `poll_transmit`, which only encodes the datagram. Drives both the §5.2
    /// backoff interval and the retry budget (`MAX_RETRIES`).
    retry_count: u32,
    /// Instant of the next scheduled retry; `None` while no retry is scheduled.
    next_deadline: Option<I>,
    /// Pool of answers collected so far.
    answers: AN,
    /// Pool of app-level updates waiting to be drained by `poll`.
    pending_updates: EV,
    /// True once the query has reached a terminal state.
    done: bool,
    /// Latch tracking whether the terminal `QueryUpdate` has
    /// already been returned to the caller via `Endpoint::poll_query`.
    /// Used so the terminal is emitted exactly once even when both
    /// `pending_updates` push and the `is_done` backstop would fire, and
    /// to short-circuit subsequent `poll_query` calls on a terminated
    /// query to `None`.
    terminal_emitted: bool,
    /// Maximum number of answers to collect before evicting the oldest (FIFO).
    max_answers: usize,
    /// Monotonic counter incremented on every successful answer insertion.
    /// Each `CollectedAnswer` records the value at the time of its insertion as
    /// its `seq`; eviction picks the entry with the lowest `seq` (i.e. the
    /// oldest).
    next_seq: u64,
    /// True when a datagram is ready to be built and sent.
    /// Set on construction (first send is immediately due) and on each
    /// `handle_timeout` tick that fires a retry; cleared after `poll_transmit`
    /// consumes it. This prevents a driver looping on `poll_transmit` from
    /// sending the same query continuously instead of honoring the backoff.
    transmit_pending: bool,
    /// Set by `poll_transmit` for the datagram it just produced, and
    /// cleared by `note_transmit_result` once the driver reports the send result.
    /// The retry budget (`retry_count`) and the next-retry deadline are advanced
    /// ONLY on a confirmed-delivered send. A datagram that fails on every socket
    /// is re-attempted without consuming the budget, so a transient send failure
    /// can never time out a query that never actually put a question on the wire.
    awaiting_send_confirm: bool,
    /// When `true`, questions are emitted with the QU bit set (RFC 6762 §5.4):
    /// the sender prefers a unicast response rather than a multicast one.
    unicast_response: bool,
    /// Absolute instant at which this query should auto-cancel regardless of
    /// the retry budget (`None` means no hard deadline beyond the retry budget).
    timeout_deadline: Option<I>,
}
162
163impl<I, AN, EV> Query<I, AN, EV>
164where
165 I: Instant,
166 AN: Pool<CollectedAnswer>,
167 EV: Pool<QueryUpdate>,
168{
169 /// Construct a new Query. Its first transmission is immediately due (the
170 /// next `poll_transmit` emits it); the retry schedule is then driven off
171 /// that send's instant, not construction time.
172 ///
173 /// * `unicast_response` — when `true`, questions carry the QU bit (RFC 6762 §5.4).
174 /// * `timeout_deadline` — optional absolute instant at which the query auto-cancels.
175 #[allow(dead_code, clippy::too_many_arguments)]
176 pub(crate) fn try_new(
177 handle: QueryHandle,
178 qname: Name,
179 qtype: ResourceType,
180 qclass: ResourceClass,
181 txid: u16,
182 unicast_response: bool,
183 timeout_deadline: Option<I>,
184 ) -> Self {
185 Self {
186 handle,
187 #[cfg(feature = "stats")]
188 stats: None,
189 qname,
190 qtype,
191 qclass,
192 txid,
193 retry_count: 0,
194 // No retry is scheduled yet: the first send is driven by
195 // `transmit_pending`, and `poll_transmit` schedules the first retry
196 // (+INITIAL_SECS) only after that send actually goes out. This keeps
197 // `poll_timeout` from returning `now` right after the first transmit,
198 // which would otherwise make a driver re-fire `handle_timeout` at `now`
199 // and collapse the first retry interval to zero / push it to 2s.
200 next_deadline: None,
201 answers: AN::new(),
202 pending_updates: EV::new(),
203 done: false,
204 terminal_emitted: false,
205 max_answers: DEFAULT_MAX_ANSWERS,
206 next_seq: 0,
207 transmit_pending: true,
208 awaiting_send_confirm: false,
209 unicast_response,
210 timeout_deadline,
211 }
212 }
213
/// Attach the shared [`hick_trace::stats::Stats`] handle from the owning
/// [`crate::endpoint::Endpoint`]. This method only stores the `Arc` it is
/// handed; it performs no allocation itself. Called immediately after
/// construction by `Endpoint::try_start_query` so that all per-query counters
/// accumulate into the endpoint-level stats. Before this is called, stats
/// bumps are no-ops (the field is `None`).
#[cfg(feature = "stats")]
pub(crate) fn set_stats(&mut self, stats: std::sync::Arc<hick_trace::stats::Stats>) {
    self.stats = Some(stats);
}

/// Borrow the stats handle if one has been attached; `None` otherwise.
#[cfg(feature = "stats")]
#[inline]
fn stat(&self) -> Option<&hick_trace::stats::Stats> {
    self.stats.as_deref()
}
231
232 /// Override the maximum number of collected answers (default 256).
233 ///
234 /// When the pool reaches this limit the oldest entry (FIFO) is evicted to
235 /// make room for the incoming answer. Setting `max` to 0 disables collection
236 /// entirely.
237 #[must_use]
238 pub fn with_max_answers(mut self, max: usize) -> Self {
239 self.max_answers = max;
240 self
241 }
242
/// Set the maximum number of collected answers in place. Same semantics
/// as [`Self::with_max_answers`] but for use after construction (e.g. by
/// `Endpoint::try_start_query` when threading a `QuerySpec::max_answers`).
///
/// Only the stored cap is changed here; answers already in the pool are not
/// touched by this call.
#[inline(always)]
pub fn set_max_answers(&mut self, max: usize) {
    self.max_answers = max;
}
250
/// Returns the handle assigned at start.
#[inline(always)]
pub const fn handle(&self) -> QueryHandle {
    self.handle
}
/// Returns the queried name (borrowed from the query).
#[inline(always)]
pub fn qname(&self) -> &Name {
    &self.qname
}
/// Returns the queried record type.
#[inline(always)]
pub const fn qtype(&self) -> ResourceType {
    self.qtype
}
/// Returns the queried class.
#[inline(always)]
pub const fn qclass(&self) -> ResourceClass {
    self.qclass
}
/// Returns the transaction id used on outgoing queries.
#[inline(always)]
pub const fn txid(&self) -> u16 {
    self.txid
}
275 }
276
277 /// Process an event routed to this query by the Endpoint.
278 pub fn handle_event(&mut self, event: QueryEvent<'_>) {
279 #[cfg(feature = "tracing")]
280 let _span = hick_trace::trace_span!("query", handle = self.handle.raw()).entered();
281 crate::trace::trace!(
282 target: "mdns_proto::query",
283 handle = self.handle.raw(),
284 event = ?core::mem::discriminant(&event),
285 "query: handle_event"
286 );
287 match event {
288 QueryEvent::Answer(record) => {
289 // TTL=0 records are mDNS "goodbye" / deletion records
290 // (RFC 6762 §10.1). Treating them as live answers would let a
291 // peer withdrawing a service inject a ghost entry into
292 // `collected_answers`, and under `max_answers` pressure could
293 // evict a real answer via FIFO. The cache layer already
294 // handles TTL=0 as removal; for active queries we simply
295 // ignore the record. Callers observe withdrawal indirectly
296 // via the cache.
297 if record.ttl() == 0 {
298 return;
299 }
300 // qtype filter: drop if rtype doesn't match (unless query is Any).
301 let qtype = self.qtype;
302 if !qtype.is_any() && record.rtype() != qtype {
303 return;
304 }
305 // qclass filter: drop if rclass doesn't match (unless query is Any).
306 let qclass = self.qclass;
307 if !qclass.is_any() && record.rclass() != qclass {
308 return;
309 }
310
311 // store the rdata in canonical (decompressed)
312 // wire form. PTR/SRV/NSEC rdata carries a domain name that responders
313 // (and this crate's own builder) may compress with a back-pointer into
314 // the packet; copying the raw slice would leave a dangling pointer the
315 // caller cannot decode once the datagram is gone, and two encodings of
316 // the same logical record would not dedupe. A malformed name drops the
317 // answer rather than storing undecodable bytes.
318 let owned = match record.canonical_rdata() {
319 Ok(v) => v,
320 Err(_) => return,
321 };
322 // the case-FOLDED identity key. Dedup/cap/coalescing compare
323 // this (DNS names are case-insensitive) so a responder can't flood the
324 // bounded answer set with case permutations of one record; `owned`
325 // keeps the original case for display.
326 let folded = match record.canonical_rdata_folded() {
327 Ok(v) => v,
328 Err(_) => return,
329 };
330 // only keep a SEPARATE key buffer when folding actually
331 // changed the bytes (a mixed-case name). For A/AAAA/TXT/unknown rdata —
332 // and names already lowercase — the folded form equals `owned`, so we
333 // store None and avoid doubling memory under a large-rdata flood.
334 let rdata_key = if folded == owned { None } else { Some(folded) };
335 let key: &[u8] = rdata_key.as_deref().unwrap_or(owned.as_ref());
336
337 // Dedupe: skip if a matching (rtype, rclass, folded-rdata) already in.
338 for (_, existing) in self.answers.iter() {
339 if existing.rtype() == record.rtype()
340 && existing.rclass() == record.rclass()
341 && existing.rdata_key() == key
342 {
343 crate::trace::trace!(
344 target: "mdns_proto::query",
345 handle = self.handle.raw(),
346 rtype = ?record.rtype(),
347 "query: answer deduped (already collected)"
348 );
349 return;
350 }
351 }
352
353 // A zero cap collects nothing.
354 if self.max_answers == 0 {
355 return;
356 }
357 // Make room before inserting. Evict the oldest (lowest-seq, true FIFO)
358 // entry when at the logical `max_answers` cap OR when the
359 // underlying pool has no vacant slot (a fixed-capacity pool
360 // smaller than `max_answers` would otherwise reject every new answer
361 // once full, since the `len >= max_answers` check never fires). One
362 // eviction frees room for exactly one insert.
363 if self.answers.len() >= self.max_answers || self.answers.vacant_key().is_err() {
364 let mut victim: Option<(usize, u64)> = None;
365 for (key, entry) in self.answers.iter() {
366 let s = entry.seq();
367 victim = Some(match victim {
368 // Existing candidate has a lower (older) seq — keep it.
369 Some(prev) if prev.1 <= s => prev,
370 _ => (key, s),
371 });
372 }
373 if let Some((victim_key, _)) = victim {
374 crate::trace::trace!(
375 target: "mdns_proto::query",
376 handle = self.handle.raw(),
377 "query: evicting oldest answer (cap reached)"
378 );
379 self.answers.try_remove(victim_key);
380 }
381 }
382
383 // Insert; advance `next_seq` ONLY on a successful insert so
384 // a dropped answer (a degenerate pool that cannot hold it even after
385 // eviction) is never accounted as collected — which would otherwise
386 // leave a gap in the FIFO seq ordering.
387 let new_seq = self.next_seq;
388 if self
389 .answers
390 .insert(CollectedAnswer {
391 rtype: record.rtype(),
392 rclass: record.rclass(),
393 rdata: owned,
394 rdata_key,
395 seq: new_seq,
396 })
397 .is_ok()
398 {
399 self.next_seq = self.next_seq.saturating_add(1);
400 crate::trace::trace!(
401 target: "mdns_proto::query",
402 handle = self.handle.raw(),
403 rtype = ?record.rtype(),
404 seq = new_seq,
405 "query: answer collected"
406 );
407 #[cfg(feature = "stats")]
408 if let Some(s) = self.stat() {
409 s.answers_collected(1);
410 }
411 }
412 }
413 QueryEvent::Truncated => {
414 // Hold off retry — more answers coming.
415 }
416 }
417 }
418
419 /// Next deadline for `handle_timeout`.
420 ///
421 /// Returns the earlier of `next_deadline` (next retry) and `timeout_deadline`
422 /// (absolute query cancellation). A driver that sleeps until this instant is
423 /// guaranteed to wake in time to fire the absolute timeout even when the next
424 /// retry is scheduled far in the future.
425 pub fn poll_timeout(&self) -> Option<I> {
426 match (self.next_deadline, self.timeout_deadline) {
427 (Some(n), Some(t)) => Some(if n < t { n } else { t }),
428 (Some(n), None) => Some(n),
429 (None, Some(t)) => Some(t),
430 (None, None) => None,
431 }
432 }
433
434 /// Route EVERY terminal transition through here. Idempotent: a no-op if
435 /// the query is already `done`. Sets `done = true`, queues the terminal
436 /// `QueryUpdate`, and under `#[cfg(feature="stats")]` bumps the correct
437 /// counter (`queries_timeout` or `queries_done`) and decrements
438 /// `queries_active` exactly once.
439 ///
440 /// Callers must pass the appropriate `update`:
441 /// * [`QueryUpdate::Timeout`] for timeout/retry-exhaustion/duplicate-question paths.
442 /// * [`QueryUpdate::Done`] for voluntary "done" paths (if/when added).
443 fn terminate(&mut self, update: QueryUpdate) {
444 if self.done {
445 return;
446 }
447 self.done = true;
448 self.transmit_pending = false;
449 let _ = self.pending_updates.insert(update);
450 self.next_deadline = None;
451 self.timeout_deadline = None;
452 #[cfg(feature = "stats")]
453 if let Some(s) = self.stat() {
454 match update {
455 QueryUpdate::Timeout => s.queries_timeout(1),
456 QueryUpdate::Done => {}
457 }
458 s.queries_done(1);
459 s.decr_queries_active(1);
460 }
461 }
462
463 /// Drive timer-based transitions.
464 pub fn handle_timeout(&mut self, now: I) -> Result<(), HandleTimeoutError> {
465 #[cfg(feature = "tracing")]
466 let _span = hick_trace::trace_span!("query", handle = self.handle.raw()).entered();
467 if self.done {
468 return Ok(());
469 }
470
471 // Check the absolute deadline before the per-retry deadline. A caller-
472 // supplied timeout takes priority over the built-in retry budget.
473 if let Some(td) = self.timeout_deadline
474 && now >= td
475 {
476 crate::trace::trace!(
477 target: "mdns_proto::query",
478 handle = self.handle.raw(),
479 "query: absolute timeout deadline reached"
480 );
481 self.terminate(QueryUpdate::Timeout);
482 return Ok(());
483 }
484
485 let due = match self.next_deadline {
486 Some(d) => d,
487 None => return Ok(()),
488 };
489 if now < due {
490 return Ok(());
491 }
492 // The scheduled retry is due. The retry budget is measured in datagrams
493 // actually sent (`retry_count`, incremented by `poll_transmit`); once the
494 // full budget is spent, retire the query instead of scheduling more.
495 if self.retry_count > MAX_RETRIES {
496 crate::trace::trace!(
497 target: "mdns_proto::query",
498 handle = self.handle.raw(),
499 retry_count = self.retry_count,
500 "query: retry budget exhausted — timeout"
501 );
502 self.terminate(QueryUpdate::Timeout);
503 } else {
504 // Mark a transmit due now and clear the deadline; `poll_transmit` emits
505 // the datagram and schedules the following retry. Clearing the deadline
506 // makes repeated `handle_timeout` calls before the drain no-ops, so a
507 // single fired tick yields exactly one retransmit.
508 crate::trace::trace!(
509 target: "mdns_proto::query",
510 handle = self.handle.raw(),
511 retry_count = self.retry_count,
512 "query: retry due — arming transmit"
513 );
514 self.transmit_pending = true;
515 self.next_deadline = None;
516 }
517 Ok(())
518 }
519
/// Force the query to its terminal TIMEOUT state at the DRIVER's request. Used
/// when the transport can never send the question (e.g. a permanently-too-large
/// datagram on every reachable family), so the query would otherwise hang. This
/// is exactly the terminal a timer-driven timeout produces: it marks the query
/// `done` (so [`Self::is_done`] is true and `Endpoint::handle` freezes any late
/// answers) and queues a terminal [`QueryUpdate::Timeout`]. The collected answers
/// stay readable until the caller cancels. No-op if already done.
///
/// Implemented as a thin wrapper over `terminate(QueryUpdate::Timeout)`.
pub(crate) fn retire(&mut self) {
    self.terminate(QueryUpdate::Timeout);
}
530
531 /// Produce the next outgoing datagram, if any. Writes into `buf`.
532 ///
533 /// Returns `Ok(None)` when the query is done or when no send is currently
534 /// due (i.e. `transmit_pending` is false). A single call per scheduled
535 /// deadline tick is guaranteed: the pending flag is cleared after the
536 /// datagram is built, so a driver looping on this method will not
537 /// re-send the query until the next `handle_timeout` fires.
538 pub fn poll_transmit(
539 &mut self,
540 _now: I,
541 buf: &mut [u8],
542 ) -> Result<Option<Transmit>, TransmitError> {
543 #[cfg(feature = "tracing")]
544 let _span = hick_trace::trace_span!("query", handle = self.handle.raw()).entered();
545 if self.done || !self.transmit_pending {
546 return Ok(None);
547 }
548 let buf_len = buf.len();
549 let header = Header::new().with_id(self.txid);
550 let mut b: MessageBuilder<'_, DEFAULT_COMPRESSION_TABLE> = MessageBuilder::try_new(buf, header)
551 .map_err(|_| {
552 TransmitError::BufferTooSmall(crate::error::BufferTooSmallDetail::new(
553 crate::wire::HEADER_SIZE,
554 buf_len,
555 ))
556 })?;
557 b.push_question(&self.qname, self.qtype, self.qclass, self.unicast_response)
558 .map_err(|_| TransmitError::BufferTooSmall(crate::error::BufferTooSmallDetail::new(0, 0)))?;
559 let n = b
560 .finish()
561 .map_err(|_| TransmitError::BufferTooSmall(crate::error::BufferTooSmallDetail::new(0, 0)))?;
562 self.transmit_pending = false;
563 // do NOT advance the retry budget or schedule the next retry
564 // here — the datagram has only been ENCODED. Await the driver's delivery
565 // result (`note_transmit_result`), which schedules the backoff on a
566 // confirmed send and re-attempts (without burning the budget) on failure.
567 self.awaiting_send_confirm = true;
568 crate::trace::debug!(
569 target: "mdns_proto::query",
570 handle = self.handle.raw(),
571 qname = self.qname.as_str(),
572 qtype = ?self.qtype,
573 bytes = n,
574 "query: poll_transmit emitting question"
575 );
576 Ok(Some(Transmit::new(
577 crate::service::multicast_dst(),
578 None,
579 n,
580 )))
581 }
582
583 /// Report the result of sending the datagram most recently produced by
584 /// [`Self::poll_transmit`]. The driver calls this after a send,
585 /// with `delivered = true` when at least one socket send succeeded.
586 ///
587 /// On a delivered send this counts the transmission against the §5.2 retry
588 /// budget and schedules the next retransmit (backoff: +1s, doubling, capped
589 /// at 60s). On a failed send the budget is NOT consumed and the query is
590 /// re-armed to re-attempt at the same interval — so a transient or total send
591 /// failure retries with backoff (no tight spin) and a query can never reach
592 /// its retry-budget timeout having put nothing on the wire.
593 pub fn note_transmit_result(&mut self, now: I, delivered: bool) {
594 if !self.awaiting_send_confirm {
595 return;
596 }
597 self.awaiting_send_confirm = false;
598 if delivered {
599 self.retry_count = self.retry_count.saturating_add(1);
600 self.next_deadline = retry::next_deadline(now, self.retry_count);
601 } else {
602 // Re-attempt this (undelivered) send after its backoff interval without
603 // advancing `retry_count`. `transmit_pending` stays false until the
604 // deadline fires, so the driver's drain loop does not spin.
605 self.next_deadline = retry::next_deadline(now, self.retry_count.saturating_add(1));
606 }
607 }
608
609 /// RFC 6762 §7.3 duplicate-question suppression. Another host has multicast
610 /// the SAME question that this query is ABOUT TO (re)transmit. Treat the
611 /// peer's query as our own ("treat its own query as having been sent"):
612 /// consume this retry slot and arm the next retransmit on the normal backoff,
613 /// without putting a redundant query on the wire — the peer's query elicits
614 /// the multicast answers we want.
615 ///
616 /// A retransmit is "imminent" either when it is already armed
617 /// (`transmit_pending`, the window between `handle_timeout` firing and
618 /// `poll_transmit` draining) OR when its `next_deadline` is already due but
619 /// not yet armed — the latter covers drivers that pump received packets BEFORE
620 /// firing query timeouts, so suppression does not depend on that ordering.
621 /// Either way it consumes exactly one retry slot: `transmit_pending`
622 /// is cleared and the due deadline is pushed forward, so a second duplicate in
623 /// the same slot is a no-op — suppression is idempotent per slot.
624 ///
625 /// The retry budget advances exactly as a real send would (and the query
626 /// retires via `MAX_RETRIES` here too), so a continuously-duplicated query
627 /// still progresses to its terminal timeout instead of being
628 /// deferred forever. An in-flight (awaiting-confirm) send is left alone.
629 ///
630 /// Returns `true` if a transmit slot was actually consumed (i.e. real
631 /// suppression happened) and `false` if the call was a no-op (query is
632 /// done, awaiting send confirmation, or no send was imminent). Callers use
633 /// the return value to decide whether to bump the
634 /// `duplicate_questions_suppressed` counter.
635 pub fn note_duplicate_question(&mut self, now: I) -> bool {
636 if self.done || self.awaiting_send_confirm {
637 return false;
638 }
639 let imminent = self.transmit_pending || self.next_deadline.is_some_and(|d| now >= d);
640 if !imminent {
641 return false;
642 }
643 self.transmit_pending = false;
644 self.retry_count = self.retry_count.saturating_add(1);
645 if self.retry_count > MAX_RETRIES {
646 // Budget spent (counting suppressed slots as our sends) — retire exactly
647 // as `handle_timeout` would after the final retransmit. Route through
648 // `terminate` so stats (queries_timeout, queries_done, decr_queries_active)
649 // are bumped exactly once on this path too.
650 self.terminate(QueryUpdate::Timeout);
651 // The slot was consumed even though the query is now terminal.
652 return true;
653 }
654 self.next_deadline = retry::next_deadline(now, self.retry_count);
655 true
656 }
657
658 /// Drain a pending app-level update.
659 pub fn poll(&mut self) -> Option<QueryUpdate> {
660 let key = self.pending_updates.iter().next().map(|(k, _)| k)?;
661 self.pending_updates.try_remove(key)
662 }
663
/// Has the query reached a terminal state? Backstop for
/// [`Endpoint::poll_query`](crate::endpoint::Endpoint::poll_query) under
/// EV-pool pressure: if `handle_timeout` cannot push the terminal
/// `QueryUpdate::Timeout`, `Endpoint::poll_query` falls back to this
/// flag and synthesises `Timeout`. External callers normally do NOT
/// need to consult this directly; drive `Endpoint::poll_query` and
/// react to its terminal return value.
#[inline(always)]
pub const fn is_done(&self) -> bool {
    self.done
}

/// Has the terminal `QueryUpdate` already been delivered to the
/// caller via `Endpoint::poll_query`? Internal latch, set
/// the first time `poll_query` returns `Done`/`Timeout` so subsequent
/// calls return `None` instead of re-emitting (or worse, double-emitting
/// from both the `pending_updates` push AND the `is_done` backstop).
#[inline(always)]
pub(crate) const fn terminal_emitted(&self) -> bool {
    self.terminal_emitted
}

/// Mark the terminal as delivered. Intended for
/// `Endpoint::poll_query` to call after returning `Done`/`Timeout`.
/// Nothing in the visible code resets the latch once it is set.
#[inline(always)]
pub(crate) fn mark_terminal_emitted(&mut self) {
    self.terminal_emitted = true;
}
692
693 /// Iterate the answers collected so far by this query.
694 pub fn collected_answers(&self) -> impl Iterator<Item = &CollectedAnswer> + '_ {
695 self.answers.iter().map(|(_, a)| a)
696 }
697
/// Total number of answers ever accepted by this query, including ones
/// already evicted by the `max_answers` cap.
///
/// Equal to the next sequence number to be assigned (`next_seq`, which only
/// advances after a successful insert), so it is monotonic and `>=` the
/// highest `seq` currently in [`Self::collected_answers`]. A consumer that
/// delivers answers by ascending `seq` can compare this against the count it
/// has observed to detect (and count) answers the cap evicted before they
/// were read.
pub fn accepted_count(&self) -> u64 {
    self.next_seq
}
709}