1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
//! Query state machine — retry backoff + answer collection + KAS hints.
mod retry;
use crate::{
Instant, Name, Pool, QueryHandle,
error::{HandleTimeoutError, TransmitError},
event::{QueryEvent, QueryUpdate},
transmit::Transmit,
wire::{DEFAULT_COMPRESSION_TABLE, Header, MessageBuilder, ResourceClass, ResourceType},
};
use bytes::Bytes;
#[cfg(all(test, feature = "std", feature = "slab"))]
mod tests;
/// Maximum retries before giving up.
const MAX_RETRIES: u32 = 8;
/// Default maximum number of collected answers per query.
const DEFAULT_MAX_ANSWERS: usize = 256;
/// One collected answer record for a Query.
///
/// Stores the resource type, class, and raw rdata bytes so that
/// deduplication, qtype/qclass filtering, and the answer cap can all
/// be applied before inserting into the pool.
#[derive(Debug, Clone)]
pub struct CollectedAnswer {
rtype: ResourceType,
rclass: ResourceClass,
rdata: Bytes,
/// the case-FOLDED identity form of `rdata` (PTR/SRV/NSEC/CNAME
/// names lowercased) used for dedup, cap accounting, and mailbox coalescing —
/// while `rdata` keeps the original case for display. Two answers that are
/// the same logical record differing only in DNS name case share this key,
/// so a responder cannot evict/flood the bounded answer set with case
/// permutations.
///
/// `None` means the folded form is byte-identical to `rdata` (the
/// common case: A/AAAA/TXT/unknown rdata, or a name already lowercase), so we
/// store ONLY one buffer — folding a large TXT/unknown flood does not double
/// per-answer memory. [`Self::rdata_key`] resolves `None` to `rdata`.
rdata_key: Option<Bytes>,
/// Monotonically increasing insertion sequence number within a single Query.
/// Used to identify the oldest entry for FIFO eviction.
seq: u64,
}
impl CollectedAnswer {
/// Construct an answer from its parts.
///
/// Hidden from the documented surface: the `Query` state machine builds
/// these internally, but downstream crates need a way to synthesize them
/// for tests and synthetic answer feeds. Synthetic answers carry opaque
/// rdata with no DNS-name semantics, so the dedup key is the rdata itself
/// (`rdata_key` is `None`, i.e. identical to `rdata`).
#[doc(hidden)]
pub fn from_parts(
rtype: ResourceType,
rclass: ResourceClass,
rdata: impl Into<Bytes>,
seq: u64,
) -> Self {
Self {
rtype,
rclass,
rdata: rdata.into(),
rdata_key: None,
seq,
}
}
/// The resource type of this answer.
#[inline(always)]
pub fn rtype(&self) -> ResourceType {
self.rtype
}
/// The resource class of this answer.
#[inline(always)]
pub fn rclass(&self) -> ResourceClass {
self.rclass
}
/// The raw rdata bytes of this answer, with DNS name case PRESERVED (for
/// display). For identity/dedup comparisons use [`Self::rdata_key`].
#[inline(always)]
pub fn rdata_slice(&self) -> &[u8] {
self.rdata.as_ref()
}
/// The case-FOLDED identity form of the rdata. Equal for two
/// answers that are the same logical record differing only in DNS name case;
/// callers coalescing/deduping answers should compare this, not
/// [`Self::rdata_slice`] (which preserves display case). resolves
/// to `rdata` when the folded form is identical (no separate buffer stored).
#[inline(always)]
pub fn rdata_key(&self) -> &[u8] {
self.rdata_key.as_deref().unwrap_or(self.rdata.as_ref())
}
/// Insertion sequence number (monotonically increasing per-Query).
///
/// Used for FIFO eviction: the entry with the lowest `seq` is the oldest.
#[inline(always)]
pub fn seq(&self) -> u64 {
self.seq
}
}
/// Query state machine. One per outstanding query.
pub struct Query<I, AN, EV> {
handle: QueryHandle,
#[cfg(feature = "stats")]
stats: Option<std::sync::Arc<hick_trace::stats::Stats>>,
qname: Name,
qtype: ResourceType,
qclass: ResourceClass,
txid: u16,
/// Number of datagrams sent so far (including the initial query).
/// Incremented by `poll_transmit`; drives both the §5.2 backoff interval
/// and the retry budget (`MAX_RETRIES`).
retry_count: u32,
next_deadline: Option<I>,
answers: AN,
pending_updates: EV,
done: bool,
/// latch tracking whether the terminal `QueryUpdate` has
/// already been returned to the caller via `Endpoint::poll_query`.
/// Used so the terminal is emitted exactly once even when both
/// `pending_updates` push and the `is_done` backstop would fire — and
/// to short-circuit subsequent `poll_query` calls on a terminated
/// query to `None`.
terminal_emitted: bool,
/// Maximum number of answers to collect before evicting the oldest (FIFO).
max_answers: usize,
/// Monotonic counter incremented on every successful answer insertion.
/// Each `CollectedAnswer` records the value at the time of its insertion;
/// eviction picks the entry with the lowest `next_seq` (i.e. the oldest).
next_seq: u64,
/// True when a datagram is ready to be built and sent.
/// Set on construction (first send is immediately due) and on each
/// `handle_timeout` tick that fires a retry; cleared after `poll_transmit`
/// consumes it. This prevents a driver looping on `poll_transmit` from
/// sending the same query continuously instead of honoring the backoff.
transmit_pending: bool,
/// set by `poll_transmit` for the datagram it just produced, and
/// cleared by `note_transmit_result` once the driver reports the send result.
/// The retry budget (`retry_count`) and the next-retry deadline are advanced
/// ONLY on a confirmed-delivered send — a datagram that fails on every socket
/// is re-attempted without consuming the budget, so a transient send failure
/// can never time out a query that never actually put a question on the wire.
awaiting_send_confirm: bool,
/// When `true`, questions are emitted with the QU bit set (RFC 6762 §5.4):
/// the sender prefers a unicast response rather than a multicast one.
unicast_response: bool,
/// Absolute instant at which this query should auto-cancel regardless of
/// the retry budget (`None` means no hard deadline beyond the retry budget).
timeout_deadline: Option<I>,
}
impl<I, AN, EV> Query<I, AN, EV>
where
I: Instant,
AN: Pool<CollectedAnswer>,
EV: Pool<QueryUpdate>,
{
/// Construct a new Query. Its first transmission is immediately due (the
/// next `poll_transmit` emits it); the retry schedule is then driven off
/// that send's instant, not construction time.
///
/// * `unicast_response` — when `true`, questions carry the QU bit (RFC 6762 §5.4).
/// * `timeout_deadline` — optional absolute instant at which the query auto-cancels.
#[allow(dead_code, clippy::too_many_arguments)]
pub(crate) fn try_new(
handle: QueryHandle,
qname: Name,
qtype: ResourceType,
qclass: ResourceClass,
txid: u16,
unicast_response: bool,
timeout_deadline: Option<I>,
) -> Self {
Self {
handle,
#[cfg(feature = "stats")]
stats: None,
qname,
qtype,
qclass,
txid,
retry_count: 0,
// No retry is scheduled yet: the first send is driven by
// `transmit_pending`, and `poll_transmit` schedules the first retry
// (+INITIAL_SECS) only after that send actually goes out. This keeps
// `poll_timeout` from returning `now` right after the first transmit,
// which would otherwise make a driver re-fire `handle_timeout` at `now`
// and collapse the first retry interval to zero / push it to 2s.
next_deadline: None,
answers: AN::new(),
pending_updates: EV::new(),
done: false,
terminal_emitted: false,
max_answers: DEFAULT_MAX_ANSWERS,
next_seq: 0,
transmit_pending: true,
awaiting_send_confirm: false,
unicast_response,
timeout_deadline,
}
}
/// Attach the shared [`hick_trace::stats::Stats`] handle from the owning
/// [`crate::endpoint::Endpoint`]. No allocation — the Arc is cloned from the
/// endpoint's existing single Arc. Called immediately after construction by
/// `Endpoint::try_start_query` so that all per-query counters accumulate into
/// the endpoint-level stats. Before this is called, stats bumps are no-ops
/// (the field is `None`).
#[cfg(feature = "stats")]
pub(crate) fn set_stats(&mut self, stats: std::sync::Arc<hick_trace::stats::Stats>) {
self.stats = Some(stats);
}
/// Borrow the stats handle if one has been attached.
#[cfg(feature = "stats")]
#[inline]
fn stat(&self) -> Option<&hick_trace::stats::Stats> {
self.stats.as_deref()
}
/// Override the maximum number of collected answers (default 256).
///
/// When the pool reaches this limit the oldest entry (FIFO) is evicted to
/// make room for the incoming answer. Setting `max` to 0 disables collection
/// entirely.
#[must_use]
pub fn with_max_answers(mut self, max: usize) -> Self {
self.max_answers = max;
self
}
/// Set the maximum number of collected answers in place. Same semantics
/// as [`Self::with_max_answers`] but for use after construction (e.g. by
/// `Endpoint::try_start_query` when threading a `QuerySpec::max_answers`).
#[inline(always)]
pub fn set_max_answers(&mut self, max: usize) {
self.max_answers = max;
}
/// Returns the handle assigned at start.
#[inline(always)]
pub const fn handle(&self) -> QueryHandle {
self.handle
}
/// Returns the queried name.
#[inline(always)]
pub fn qname(&self) -> &Name {
&self.qname
}
/// Returns the queried record type.
#[inline(always)]
pub const fn qtype(&self) -> ResourceType {
self.qtype
}
/// Returns the queried class.
#[inline(always)]
pub const fn qclass(&self) -> ResourceClass {
self.qclass
}
/// Returns the transaction id used on outgoing queries.
#[inline(always)]
pub const fn txid(&self) -> u16 {
self.txid
}
/// Process an event routed to this query by the Endpoint.
pub fn handle_event(&mut self, event: QueryEvent<'_>) {
#[cfg(feature = "tracing")]
let _span = hick_trace::trace_span!("query", handle = self.handle.raw()).entered();
crate::trace::trace!(
target: "mdns_proto::query",
handle = self.handle.raw(),
event = ?core::mem::discriminant(&event),
"query: handle_event"
);
match event {
QueryEvent::Answer(record) => {
// TTL=0 records are mDNS "goodbye" / deletion records
// (RFC 6762 §10.1). Treating them as live answers would let a
// peer withdrawing a service inject a ghost entry into
// `collected_answers`, and under `max_answers` pressure could
// evict a real answer via FIFO. The cache layer already
// handles TTL=0 as removal; for active queries we simply
// ignore the record. Callers observe withdrawal indirectly
// via the cache.
if record.ttl() == 0 {
return;
}
// qtype filter: drop if rtype doesn't match (unless query is Any).
let qtype = self.qtype;
if !qtype.is_any() && record.rtype() != qtype {
return;
}
// qclass filter: drop if rclass doesn't match (unless query is Any).
let qclass = self.qclass;
if !qclass.is_any() && record.rclass() != qclass {
return;
}
// store the rdata in canonical (decompressed)
// wire form. PTR/SRV/NSEC rdata carries a domain name that responders
// (and this crate's own builder) may compress with a back-pointer into
// the packet; copying the raw slice would leave a dangling pointer the
// caller cannot decode once the datagram is gone, and two encodings of
// the same logical record would not dedupe. A malformed name drops the
// answer rather than storing undecodable bytes.
let owned = match record.canonical_rdata() {
Ok(v) => v,
Err(_) => return,
};
// the case-FOLDED identity key. Dedup/cap/coalescing compare
// this (DNS names are case-insensitive) so a responder can't flood the
// bounded answer set with case permutations of one record; `owned`
// keeps the original case for display.
let folded = match record.canonical_rdata_folded() {
Ok(v) => v,
Err(_) => return,
};
// only keep a SEPARATE key buffer when folding actually
// changed the bytes (a mixed-case name). For A/AAAA/TXT/unknown rdata —
// and names already lowercase — the folded form equals `owned`, so we
// store None and avoid doubling memory under a large-rdata flood.
let rdata_key = if folded == owned { None } else { Some(folded) };
let key: &[u8] = rdata_key.as_deref().unwrap_or(owned.as_ref());
// Dedupe: skip if a matching (rtype, rclass, folded-rdata) already in.
for (_, existing) in self.answers.iter() {
if existing.rtype() == record.rtype()
&& existing.rclass() == record.rclass()
&& existing.rdata_key() == key
{
crate::trace::trace!(
target: "mdns_proto::query",
handle = self.handle.raw(),
rtype = ?record.rtype(),
"query: answer deduped (already collected)"
);
return;
}
}
// A zero cap collects nothing.
if self.max_answers == 0 {
return;
}
// Make room before inserting. Evict the oldest (lowest-seq, true FIFO)
// entry when at the logical `max_answers` cap OR when the
// underlying pool has no vacant slot (a fixed-capacity pool
// smaller than `max_answers` would otherwise reject every new answer
// once full, since the `len >= max_answers` check never fires). One
// eviction frees room for exactly one insert.
if self.answers.len() >= self.max_answers || self.answers.vacant_key().is_err() {
let mut victim: Option<(usize, u64)> = None;
for (key, entry) in self.answers.iter() {
let s = entry.seq();
victim = Some(match victim {
// Existing candidate has a lower (older) seq — keep it.
Some(prev) if prev.1 <= s => prev,
_ => (key, s),
});
}
if let Some((victim_key, _)) = victim {
crate::trace::trace!(
target: "mdns_proto::query",
handle = self.handle.raw(),
"query: evicting oldest answer (cap reached)"
);
self.answers.try_remove(victim_key);
}
}
// Insert; advance `next_seq` ONLY on a successful insert so
// a dropped answer (a degenerate pool that cannot hold it even after
// eviction) is never accounted as collected — which would otherwise
// leave a gap in the FIFO seq ordering.
let new_seq = self.next_seq;
if self
.answers
.insert(CollectedAnswer {
rtype: record.rtype(),
rclass: record.rclass(),
rdata: owned,
rdata_key,
seq: new_seq,
})
.is_ok()
{
self.next_seq = self.next_seq.saturating_add(1);
crate::trace::trace!(
target: "mdns_proto::query",
handle = self.handle.raw(),
rtype = ?record.rtype(),
seq = new_seq,
"query: answer collected"
);
#[cfg(feature = "stats")]
if let Some(s) = self.stat() {
s.answers_collected(1);
}
}
}
QueryEvent::Truncated => {
// Hold off retry — more answers coming.
}
}
}
/// Next deadline for `handle_timeout`.
///
/// Returns the earlier of `next_deadline` (next retry) and `timeout_deadline`
/// (absolute query cancellation). A driver that sleeps until this instant is
/// guaranteed to wake in time to fire the absolute timeout even when the next
/// retry is scheduled far in the future.
pub fn poll_timeout(&self) -> Option<I> {
match (self.next_deadline, self.timeout_deadline) {
(Some(n), Some(t)) => Some(if n < t { n } else { t }),
(Some(n), None) => Some(n),
(None, Some(t)) => Some(t),
(None, None) => None,
}
}
/// Route EVERY terminal transition through here. Idempotent: a no-op if
/// the query is already `done`. Sets `done = true`, queues the terminal
/// `QueryUpdate`, and under `#[cfg(feature="stats")]` bumps the correct
/// counter (`queries_timeout` or `queries_done`) and decrements
/// `queries_active` exactly once.
///
/// Callers must pass the appropriate `update`:
/// * [`QueryUpdate::Timeout`] for timeout/retry-exhaustion/duplicate-question paths.
/// * [`QueryUpdate::Done`] for voluntary "done" paths (if/when added).
fn terminate(&mut self, update: QueryUpdate) {
if self.done {
return;
}
self.done = true;
self.transmit_pending = false;
let _ = self.pending_updates.insert(update);
self.next_deadline = None;
self.timeout_deadline = None;
#[cfg(feature = "stats")]
if let Some(s) = self.stat() {
match update {
QueryUpdate::Timeout => s.queries_timeout(1),
QueryUpdate::Done => {}
}
s.queries_done(1);
s.decr_queries_active(1);
}
}
/// Drive timer-based transitions.
pub fn handle_timeout(&mut self, now: I) -> Result<(), HandleTimeoutError> {
#[cfg(feature = "tracing")]
let _span = hick_trace::trace_span!("query", handle = self.handle.raw()).entered();
if self.done {
return Ok(());
}
// Check the absolute deadline before the per-retry deadline. A caller-
// supplied timeout takes priority over the built-in retry budget.
if let Some(td) = self.timeout_deadline
&& now >= td
{
crate::trace::trace!(
target: "mdns_proto::query",
handle = self.handle.raw(),
"query: absolute timeout deadline reached"
);
self.terminate(QueryUpdate::Timeout);
return Ok(());
}
let due = match self.next_deadline {
Some(d) => d,
None => return Ok(()),
};
if now < due {
return Ok(());
}
// The scheduled retry is due. The retry budget is measured in datagrams
// actually sent (`retry_count`, incremented by `poll_transmit`); once the
// full budget is spent, retire the query instead of scheduling more.
if self.retry_count > MAX_RETRIES {
crate::trace::trace!(
target: "mdns_proto::query",
handle = self.handle.raw(),
retry_count = self.retry_count,
"query: retry budget exhausted — timeout"
);
self.terminate(QueryUpdate::Timeout);
} else {
// Mark a transmit due now and clear the deadline; `poll_transmit` emits
// the datagram and schedules the following retry. Clearing the deadline
// makes repeated `handle_timeout` calls before the drain no-ops, so a
// single fired tick yields exactly one retransmit.
crate::trace::trace!(
target: "mdns_proto::query",
handle = self.handle.raw(),
retry_count = self.retry_count,
"query: retry due — arming transmit"
);
self.transmit_pending = true;
self.next_deadline = None;
}
Ok(())
}
/// Force the query to its terminal TIMEOUT state at the DRIVER's request — used
/// when the transport can never send the question (e.g. a permanently-too-large
/// datagram on every reachable family), so the query would otherwise hang. This
/// is exactly the terminal a timer-driven timeout produces: it marks the query
/// `done` (so [`Self::is_done`] is true and `Endpoint::handle` freezes any late
/// answers) and queues a terminal [`QueryUpdate::Timeout`]. The collected answers
/// stay readable until the caller cancels. No-op if already done.
pub(crate) fn retire(&mut self) {
self.terminate(QueryUpdate::Timeout);
}
/// Produce the next outgoing datagram, if any. Writes into `buf`.
///
/// Returns `Ok(None)` when the query is done or when no send is currently
/// due (i.e. `transmit_pending` is false). A single call per scheduled
/// deadline tick is guaranteed: the pending flag is cleared after the
/// datagram is built, so a driver looping on this method will not
/// re-send the query until the next `handle_timeout` fires.
pub fn poll_transmit(
&mut self,
_now: I,
buf: &mut [u8],
) -> Result<Option<Transmit>, TransmitError> {
#[cfg(feature = "tracing")]
let _span = hick_trace::trace_span!("query", handle = self.handle.raw()).entered();
if self.done || !self.transmit_pending {
return Ok(None);
}
let buf_len = buf.len();
let header = Header::new().with_id(self.txid);
let mut b: MessageBuilder<'_, DEFAULT_COMPRESSION_TABLE> = MessageBuilder::try_new(buf, header)
.map_err(|_| {
TransmitError::BufferTooSmall(crate::error::BufferTooSmallDetail::new(
crate::wire::HEADER_SIZE,
buf_len,
))
})?;
b.push_question(&self.qname, self.qtype, self.qclass, self.unicast_response)
.map_err(|_| TransmitError::BufferTooSmall(crate::error::BufferTooSmallDetail::new(0, 0)))?;
let n = b
.finish()
.map_err(|_| TransmitError::BufferTooSmall(crate::error::BufferTooSmallDetail::new(0, 0)))?;
self.transmit_pending = false;
// do NOT advance the retry budget or schedule the next retry
// here — the datagram has only been ENCODED. Await the driver's delivery
// result (`note_transmit_result`), which schedules the backoff on a
// confirmed send and re-attempts (without burning the budget) on failure.
self.awaiting_send_confirm = true;
crate::trace::debug!(
target: "mdns_proto::query",
handle = self.handle.raw(),
qname = self.qname.as_str(),
qtype = ?self.qtype,
bytes = n,
"query: poll_transmit emitting question"
);
Ok(Some(Transmit::new(
crate::service::multicast_dst(),
None,
n,
)))
}
/// Report the result of sending the datagram most recently produced by
/// [`Self::poll_transmit`]. The driver calls this after a send,
/// with `delivered = true` when at least one socket send succeeded.
///
/// On a delivered send this counts the transmission against the §5.2 retry
/// budget and schedules the next retransmit (backoff: +1s, doubling, capped
/// at 60s). On a failed send the budget is NOT consumed and the query is
/// re-armed to re-attempt at the same interval — so a transient or total send
/// failure retries with backoff (no tight spin) and a query can never reach
/// its retry-budget timeout having put nothing on the wire.
pub fn note_transmit_result(&mut self, now: I, delivered: bool) {
if !self.awaiting_send_confirm {
return;
}
self.awaiting_send_confirm = false;
if delivered {
self.retry_count = self.retry_count.saturating_add(1);
self.next_deadline = retry::next_deadline(now, self.retry_count);
} else {
// Re-attempt this (undelivered) send after its backoff interval without
// advancing `retry_count`. `transmit_pending` stays false until the
// deadline fires, so the driver's drain loop does not spin.
self.next_deadline = retry::next_deadline(now, self.retry_count.saturating_add(1));
}
}
/// RFC 6762 §7.3 duplicate-question suppression. Another host has multicast
/// the SAME question that this query is ABOUT TO (re)transmit. Treat the
/// peer's query as our own ("treat its own query as having been sent"):
/// consume this retry slot and arm the next retransmit on the normal backoff,
/// without putting a redundant query on the wire — the peer's query elicits
/// the multicast answers we want.
///
/// A retransmit is "imminent" either when it is already armed
/// (`transmit_pending`, the window between `handle_timeout` firing and
/// `poll_transmit` draining) OR when its `next_deadline` is already due but
/// not yet armed — the latter covers drivers that pump received packets BEFORE
/// firing query timeouts, so suppression does not depend on that ordering.
/// Either way it consumes exactly one retry slot: `transmit_pending`
/// is cleared and the due deadline is pushed forward, so a second duplicate in
/// the same slot is a no-op — suppression is idempotent per slot.
///
/// The retry budget advances exactly as a real send would (and the query
/// retires via `MAX_RETRIES` here too), so a continuously-duplicated query
/// still progresses to its terminal timeout instead of being
/// deferred forever. An in-flight (awaiting-confirm) send is left alone.
///
/// Returns `true` if a transmit slot was actually consumed (i.e. real
/// suppression happened) and `false` if the call was a no-op (query is
/// done, awaiting send confirmation, or no send was imminent). Callers use
/// the return value to decide whether to bump the
/// `duplicate_questions_suppressed` counter.
pub fn note_duplicate_question(&mut self, now: I) -> bool {
if self.done || self.awaiting_send_confirm {
return false;
}
let imminent = self.transmit_pending || self.next_deadline.is_some_and(|d| now >= d);
if !imminent {
return false;
}
self.transmit_pending = false;
self.retry_count = self.retry_count.saturating_add(1);
if self.retry_count > MAX_RETRIES {
// Budget spent (counting suppressed slots as our sends) — retire exactly
// as `handle_timeout` would after the final retransmit. Route through
// `terminate` so stats (queries_timeout, queries_done, decr_queries_active)
// are bumped exactly once on this path too.
self.terminate(QueryUpdate::Timeout);
// The slot was consumed even though the query is now terminal.
return true;
}
self.next_deadline = retry::next_deadline(now, self.retry_count);
true
}
/// Drain a pending app-level update.
pub fn poll(&mut self) -> Option<QueryUpdate> {
let key = self.pending_updates.iter().next().map(|(k, _)| k)?;
self.pending_updates.try_remove(key)
}
/// Has the query reached a terminal state? Backstop for
/// [`Endpoint::poll_query`](crate::endpoint::Endpoint::poll_query) under
/// EV-pool pressure: if `handle_timeout` cannot push the terminal
/// `QueryUpdate::Timeout`, `Endpoint::poll_query` falls back to this
/// flag and synthesises `Timeout`. External callers normally do NOT
/// need to consult this directly — drive `Endpoint::poll_query` and
/// react to its terminal return value.
#[inline(always)]
pub const fn is_done(&self) -> bool {
self.done
}
/// Has the terminal `QueryUpdate` already been delivered to the
/// caller via `Endpoint::poll_query`? Internal latch — set
/// the first time `poll_query` returns `Done`/`Timeout` so subsequent
/// calls return `None` instead of re-emitting (or worse, double-emitting
/// from both the `pending_updates` push AND the `is_done` backstop).
#[inline(always)]
pub(crate) const fn terminal_emitted(&self) -> bool {
self.terminal_emitted
}
/// Mark the terminal as delivered. Intended for
/// `Endpoint::poll_query` to call after returning `Done`/`Timeout`.
#[inline(always)]
pub(crate) fn mark_terminal_emitted(&mut self) {
self.terminal_emitted = true;
}
/// Iterate the answers collected so far by this query.
pub fn collected_answers(&self) -> impl Iterator<Item = &CollectedAnswer> + '_ {
self.answers.iter().map(|(_, a)| a)
}
/// Total number of answers ever accepted by this query, including ones
/// already evicted by the `max_answers` cap.
///
/// Equal to the next sequence number to be assigned, so it is monotonic and
/// `>=` the highest `seq` currently in [`Self::collected_answers`]. A
/// consumer that delivers answers by ascending `seq` can compare this
/// against the count it has observed to detect (and count) answers the cap
/// evicted before they were read.
pub fn accepted_count(&self) -> u64 {
self.next_seq
}
}