1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
//! The `RouteEvents` iterator: demuxes one inbound message into routing events.
use super::*;
/// Iterator over routing decisions for a single incoming datagram.
///
/// Borrows the endpoint mutably for the duration of iteration so that
/// `QueryEvent::Answer` events can be applied to the internal
/// [`Query`] state machines as they are yielded —
/// callers do not need to dispatch query events themselves. Service
/// events still flow to the caller via the yielded [`RouteEvent`]s.
pub struct RouteEvents<'a, 'e, I, R, C, SR, QS, EV, AN, EvQ>
where
I: Instant,
R: Rng,
C: Pool<CacheEntry<I>>,
SR: Pool<ServiceRoute>,
QS: Pool<Query<I, AN, EvQ>>,
EV: Pool<EndpointEventEntry>,
AN: Pool<CollectedAnswer>,
EvQ: Pool<QueryUpdate>,
{
pub(crate) src: SocketAddr,
pub(crate) endpoint: &'e mut Endpoint<I, R, C, SR, QS, EV, AN, EvQ>,
pub(crate) reader: MessageReader<'a>,
/// `true` when the QR bit is set (this is a response, not a query).
/// Used to gate KnownAnswer-suppression routing: KAS hints must only be
/// extracted from QUERY packets (QR=0); response packets must not poison
/// the KAS ring.
pub(crate) is_response: bool,
pub(crate) question_idx: u16,
/// Per-question service cursor: the slab key from which to resume iterating
/// services for the current question. Allows ALL matching services to receive
/// a `ServiceEvent::Question` for a single question before advancing to the
/// next question.
pub(crate) service_cursor: usize,
pub(crate) answer_idx: u16,
pub(crate) authority_idx: u16,
/// Stashed query event behind a higher-priority service event (e.g. a
/// `ProbeConflict` or `KnownAnswer` returns first and the first matching
/// `QueryEvent::Answer` for the same record drains on the next call).
pub(crate) pending_query: Option<RouteEvent<'a>>,
/// cursor for fanning out additional matching query routes for
/// the current `answer_idx` across multiple `next()` calls without
/// buffering events. `None` means "have not started query fan-out for
/// this record" — `next()` does the service-side pass plus finds the
/// FIRST matching query. `Some(k)` means "mid fan-out — resume scanning
/// `self.queries` from slab key `k`." Resets to `None` whenever
/// `answer_idx` advances, so the cursor is always paired with the
/// current answer record.
///
/// Replaces the unbounded `std::Vec` buffering — that
/// allocated on the inbound packet path with infallible `push`, which
/// under allocator pressure aborts/panics instead of surfacing an
/// error, and used `Vec::remove(0)` for drain (O(n²) on large
/// fan-outs). The cursor model is O(1) state per record, O(n) total
/// work, and never allocates.
pub(crate) answer_query_cursor: Option<usize>,
/// cursor for fanning out KnownAnswer / ProbeConflict /
/// HostConflict events across multiple registered services that all
/// match the same answer record (e.g. several services sharing a
/// `service_type` for a PTR known-answer hint, or multiple services
/// sharing a host name). Same shape as `answer_query_cursor`:
/// `None` = haven't started service-side fan-out for this record;
/// `Some(k)` = resume scan from slab key `k`. Reset to `None` when
/// `answer_idx` advances.
///
/// Previously the service-side scan stopped at the first matching
/// service, so the actual owning service of a PTR known-answer
/// (which had the right rdata to suppress) never received the hint —
/// the unrelated first-matching service got it instead and ignored
/// it by rdata mismatch.
pub(crate) answer_service_cursor: Option<usize>,
/// whether the answer-record service-phase fan-out is COMPLETE for
/// the current `answer_idx`. `answer_service_cursor` alone is ambiguous
/// (`None` means both "not started" and "exhausted"), so after a query event
/// returns mid-record, re-entry would re-scan services and replay conflict
/// events. This flag gates the service phase; it is reset only when
/// `answer_idx` advances.
pub(crate) answer_service_done: bool,
/// cursor for fanning out authority-section conflict events
/// (ProbeConflict / HostConflict) across multiple services that
/// share a host name. `None` = haven't started fan-out for the
/// current authority record; `Some(k)` = resume scan from slab key
/// `k`. Same shape as the other answer/question cursors.
///
/// Previously the authority-section loop broke on the first
/// matching service and advanced `authority_idx`, so a peer probe
/// for a shared host name reached only one of the services
/// sharing that host; the rest never received the HostConflict
/// signal.
pub(crate) authority_service_cursor: Option<usize>,
/// When a QUERY-packet answer matches a registered service for both a
/// ProbeConflict and a KnownAnswer event, we emit ProbeConflict first and
/// stash the KnownAnswer here for the subsequent call.
pub(crate) pending_service_event: Option<RouteEvent<'a>>,
/// index into the ADDITIONAL section, plus the
/// service-conflict and query fan-out cursors for the current additional
/// record (same shape as the answer-section cursors). DNS-SD responders carry
/// SRV/TXT/A/AAAA here, so QR=1 additionals run conflict detection (instance
/// SRV/TXT → ProbeConflict, host A/AAAA → HostConflict) AND query fan-out —
/// but never KAS (additionals are not known-answer hints).
pub(crate) additional_idx: u16,
pub(crate) additional_service_cursor: Option<usize>,
/// like `answer_service_done`, marks the additional-record
/// service-phase fan-out complete for the current `additional_idx` so a query
/// event mid-record cannot cause the conflict events to replay on re-entry.
pub(crate) additional_service_done: bool,
pub(crate) additional_query_cursor: Option<usize>,
pub(crate) section: Section,
}
#[derive(Copy, Clone)]
pub(crate) enum Section {
Questions,
Answers,
Authority,
Additional,
Done,
}
impl<'a, I, R, C, SR, QS, EV, AN, EvQ> RouteEvents<'a, '_, I, R, C, SR, QS, EV, AN, EvQ>
where
I: Instant,
R: Rng,
C: Pool<CacheEntry<I>>,
SR: Pool<ServiceRoute>,
QS: Pool<Query<I, AN, EvQ>>,
EV: Pool<EndpointEventEntry>,
AN: Pool<CollectedAnswer>,
EvQ: Pool<QueryUpdate>,
{
/// the ONE conflict-routing decision for a QR=1 record `r`,
/// shared by the Answers, Authority, and Additional sections (previously
/// triplicated). Scans registered services from slab key `start` and returns
/// the next `(key, event)`:
/// * instance-name match + SRV/TXT → ProbeConflict (the instance's unique
/// RRset; service-type / shared names are never conflicts);
/// * host-name match + A/AAAA → HostConflict.
///
/// conflicts are only routed for class-IN records — a record with
/// class ANY or an unknown class is not the same-class RRset RFC 6762 §9
/// requires, so it must not drive rename / host-conflict surfacing.
fn next_service_conflict(
&self,
r: &crate::wire::Ref<'a>,
start: usize,
) -> Option<(usize, RouteEvent<'a>)> {
if r.rclass() != ResourceClass::In {
return None;
}
for (key, route) in self.endpoint.services.iter() {
if key < start {
continue;
}
// A withdrawing route's service is being torn down (only its goodbye is still
// draining) — never route a conflict to it. The route is retained for the
// name guard, but dispatching ProbeConflict/HostConflict here would feed
// terminal events into a proto the driver no longer drains (it skips
// withdrawing/errored contexts), letting a peer flood the proto event slab of
// a retiring service until GC — a bounded-time but unbounded-size growth path
//. Mirrors the question-dispatch and known-answer skips.
#[cfg(any(feature = "alloc", feature = "std", feature = "no-atomic"))]
if route.withdrawing {
continue;
}
if names_match_record(route.name(), r) && is_instance_conflict_rtype(r.rtype()) {
return Some((
key,
RouteEvent::ToService(ToService::new(
route.handle(),
ServiceEvent::ProbeConflict(ProbeConflict::new(self.src, *r)),
)),
));
}
if names_match_record(route.host(), r) && is_host_conflict_rtype(r.rtype()) {
return Some((
key,
RouteEvent::ToService(ToService::new(
route.handle(),
ServiceEvent::HostConflict(HostConflict::new(*r)),
)),
));
}
}
None
}
}
impl<'a, I, R, C, SR, QS, EV, AN, EvQ> Iterator
for RouteEvents<'a, '_, I, R, C, SR, QS, EV, AN, EvQ>
where
I: Instant,
R: Rng,
C: Pool<CacheEntry<I>>,
SR: Pool<ServiceRoute>,
QS: Pool<Query<I, AN, EvQ>>,
EV: Pool<EndpointEventEntry>,
AN: Pool<CollectedAnswer>,
EvQ: Pool<QueryUpdate>,
{
type Item = Result<RouteEvent<'a>, HandleError>;
fn next(&mut self) -> Option<Self::Item> {
// Flush pending stashed events in priority order before processing the
// next record. Order: ProbeConflict / KnownAnswer stash (service event)
// first, then query Answer stash.
if let Some(ev) = self.pending_service_event.take() {
return Some(Ok(ev));
}
if let Some(ev) = self.pending_query.take() {
return Some(Ok(ev));
}
loop {
match self.section {
Section::Questions => {
// gate question→service routing on
// `EndpointConfig::answer_questions`. When disabled, no
// `ServiceEvent::Question` events fire at all, so registered
// services never schedule responses to inbound queries.
// This is the "advertise but don't respond" / passive mode.
if !self.endpoint.config.answer_questions() {
self.section = Section::Answers;
continue;
}
if self.question_idx >= self.reader.header().question_count() {
self.section = Section::Answers;
continue;
}
let mut qs = self.reader.questions();
for _ in 0..self.question_idx {
let _ = qs.next();
}
let q = match qs.next() {
Some(Ok(q)) => q,
Some(Err(e)) => {
// skip the rest of the questions section after a
// parse error so the iterator terminates instead of
// looping on the same error indefinitely.
// parse_errors was already bumped by the upfront section-
// validation latch in Endpoint::handle — do NOT bump again.
self.section = Section::Answers;
return Some(Err(HandleError::Parse(e)));
}
None => {
self.section = Section::Answers;
continue;
}
};
// Walk services starting from the saved cursor, looking for the next
// match. This allows ALL services sharing the same PTR name to each
// receive a ServiceEvent::Question for this question before we move on.
let cursor = self.service_cursor;
let mut found: Option<(usize, RouteEvent<'a>)> = None;
for (key, route) in self.endpoint.services.iter() {
if key < cursor {
continue;
}
// A withdrawing route's service is gone (only its goodbye is still
// draining) — never route an incoming question to it, or it could
// emit a positive-TTL answer contradicting its own TTL=0 goodbye.
// The route is still present for the name guard, just not answered.
#[cfg(any(feature = "alloc", feature = "std", feature = "no-atomic"))]
if route.withdrawing {
continue;
}
if names_match(route.name(), q.qname())
|| names_match(route.service_type(), q.qname())
|| names_match(route.host(), q.qname())
|| route
.subtypes
.iter()
.any(|s| names_match(s, q.qname()))
// RFC 6763 §9 service-type enumeration: route the meta-query to
// EVERY service; each answerable (Established/Announcing) one emits
// its own type PTR, and the cursor below delivers it to each in
// turn. An earlier per-type dedup (route only the
// lowest-keyed service of each type) is UNSAFE here — routing has
// no visibility into Service lifecycle state, so it could pick a
// probing/non-answering representative and mask an Established
// same-type sibling, leaving a live type unanswered. Two instances
// of one type emitting the identical meta-PTR is benign (receivers
// dedup the identical RR); true per-type dedup would require
// state-aware, cross-service handling at the driver layer.
|| is_meta_query_name(q.qname())
{
found = Some((
key,
RouteEvent::ToService(ToService::new(
route.handle(),
ServiceEvent::Question(
ServiceQuestion::new(q, self.src, self.reader.header().id())
// RFC 6762 §7.2: a TC-bit query spreads its known answers
// across multiple packets — the responder delays longer so
// the follow-up packets accumulate before it suppresses.
.with_truncated(self.reader.header().flags().is_truncated()),
),
)),
));
break;
}
}
if let Some((key, ev)) = found {
// Advance the service cursor past this key so the next call picks up
// where we left off within the same question.
self.service_cursor = key.saturating_add(1);
return Some(Ok(ev));
}
// No more matching services for this question: advance to the next
// question and reset the per-question service cursor.
self.question_idx = self.question_idx.saturating_add(1);
self.service_cursor = 0;
continue;
}
Section::Answers => {
if self.answer_idx >= self.reader.header().answer_count() {
self.section = Section::Authority;
continue;
}
let mut ans = self.reader.answers();
for _ in 0..self.answer_idx {
let _ = ans.next();
}
let r = match ans.next() {
Some(Ok(r)) => r,
Some(Err(e)) => {
// skip the rest of the answers section after a
// parse error so the iterator terminates instead of
// looping on the same error indefinitely.
// parse_errors was already bumped in the eager walk in
// Endpoint::handle (which covers answers AND additionals);
// do NOT bump it here to avoid double-counting.
self.section = Section::Authority;
return Some(Err(HandleError::Parse(e)));
}
None => {
self.section = Section::Authority;
continue;
}
};
// route-level TTL=0 guard. Records with TTL=0 are
// mDNS "goodbye" / deletion signals (RFC 6762 §10.1) — the
// cache layer already processes them as removals during the
// eager loop in `Endpoint::handle`, and `Query::handle_event`
// rejects them at the eager-mutation step. The
// remaining hazard is the iterator: emitting service events
// (ProbeConflict / HostConflict / KnownAnswer) for a goodbye
// would let a peer withdrawing a record trigger our auto-
// rename or HostConflict surfacing, and emitting ToQuery
// for a goodbye would let callers receive ghost "answers"
// from records being withdrawn. Skip the whole fan-out for
// TTL=0 — cache removal is the only correct side effect.
if r.ttl() == 0 {
self.answer_idx = self.answer_idx.saturating_add(1);
self.answer_service_cursor = None;
self.answer_service_done = false;
self.answer_query_cursor = None;
continue;
}
// Service-side fan-out for answer-section records.
//
// QR=0: records are KAS hints from another querier.
// Emit only KnownAnswer (for KAS suppression on probes).
// ProbeConflict / HostConflict are NEVER emitted here —
// letting a hostile querier trigger our auto-rename by
// mentioning our names in the answer section would be a
// trivial denial-of-service vector.
//
// QR=1: records are AUTHORITATIVE peer responses.
// Per RFC 6762 §8.1, a probing host MUST treat any
// response (solicited or unsolicited) claiming one of its
// tentative names as a conflict event. Emit
// ProbeConflict for instance-name matches and HostConflict
// for host-name matches. Service-type (shared) matches
// are NOT conflicts (multiple services share a type).
//
// Authority-section records (Section::Authority) fire
// ProbeConflict / HostConflict regardless of QR — those are
// tentative-probe records.
if !self.answer_service_done {
let start = self.answer_service_cursor.unwrap_or(0);
let next_event = if self.is_response {
// QR=1: ProbeConflict / HostConflict via the shared conflict
// helper (name + rtype + class gates). Service-type (shared)
// names are never conflicts.
self.next_service_conflict(&r, start)
} else {
// QR=0: records are KAS hints. ANY name match (instance / host /
// service-type) emits a KnownAnswer for suppression — conflicts
// are NEVER routed for QR=0 (a hostile querier mentioning our
// names must not trigger auto-rename).
//
// a QR=0 PTR owned by the DNS-SD service-type enumeration
// meta name is a known-answer for the §9 meta reply. Its owner is
// none of our RRset names, so fan it out to EVERY service (mirrors
// how the meta QUESTION routes to all of them); each
// service decides whether the PTR target matches its own type.
let mut found: Option<(usize, RouteEvent<'a>)> = None;
for (key, route) in self.endpoint.services.iter() {
if key < start {
continue;
}
// A withdrawing route's service is being torn down — never route a
// known-answer to it either, matching the question-dispatch and
// conflict skips (no dispatch after retirement).
#[cfg(any(feature = "alloc", feature = "std", feature = "no-atomic"))]
if route.withdrawing {
continue;
}
if names_match_record(route.name(), &r)
|| names_match_record(route.host(), &r)
|| names_match_record(route.service_type(), &r)
|| is_meta_query_name(r.name())
{
found = Some((
key,
RouteEvent::ToService(ToService::new(
route.handle(),
ServiceEvent::KnownAnswer(KnownAnswer::new(self.src, r)),
)),
));
break;
}
}
found
};
if let Some((key, ev)) = next_event {
self.answer_service_cursor = Some(key.saturating_add(1));
return Some(Ok(ev));
}
// service-side fan-out exhausted for THIS record. Mark
// it done (not just reset the cursor to None, which is ambiguous
// with "not started") so a later query event in the same record
// can't re-enter and replay the conflict/KAS events.
self.answer_service_done = true;
}
// Query-side fan-out (QR=1 only). emit a
// ToQuery for every name/type-compatible active query via
// `answer_query_cursor`. This already applied the answer
// to the Query state eagerly in `Endpoint::handle`; the
// events emitted here are informational only.
if self.is_response {
let start = self.answer_query_cursor.unwrap_or(0);
let mut found: Option<(usize, RouteEvent<'a>)> = None;
for (key, q) in self.endpoint.queries.iter() {
if key < start {
continue;
}
if q.is_done() || q.terminal_emitted() {
continue;
}
if names_match_record(q.qname(), &r) && qry_query_accepts(q, &r) {
found = Some((
key,
RouteEvent::ToQuery(ToQuery::new(q.handle(), QueryEvent::Answer(r))),
));
break;
}
}
if let Some((key, ev)) = found {
self.answer_query_cursor = Some(key.saturating_add(1));
return Some(Ok(ev));
}
// Query-side fan-out exhausted for this record.
self.answer_query_cursor = None;
}
// Both phases exhausted — record fully processed. Advance to the
// next answer record and reset the per-record fan-out state.
self.answer_idx = self.answer_idx.saturating_add(1);
self.answer_service_cursor = None;
self.answer_service_done = false;
self.answer_query_cursor = None;
continue;
}
Section::Authority => {
// authority-section records are tentative-probe claims
// (RFC 6762 §8.2) — a peer asserting ownership of a name. Routing
// them as ProbeConflict / HostConflict forces our service to rename
// or surfaces a host conflict, so they MUST come from a trusted mDNS
// peer. A genuine prober sends from UDP source port 5353; an
// ephemeral-port packet carrying an authority RR for our name is an
// off-path / forged artifact — a legacy §6.7 querier sends only
// questions, never authority records. (QR=1 responses from non-5353
// ports are already fully suppressed upstream; this closes the QR=0
// query path, where the Question section has ALREADY been routed
// above so legacy unicast repliers are unaffected.)
//
// accounting rule: suppressing conflict routing for a non-5353
// source is a SECTION-LEVEL suppression — the datagram's other
// sections (questions, answers, additional) are still processed.
// This is NOT a whole-datagram reject, so `packets_dropped` is NOT
// bumped here. Parse errors in the authority section ARE still
// counted by the upfront section-validation latch (which walks
// authority regardless of source port), so malformed bytes are
// always accounted even when conflict routing is suppressed.
if self.src.port() != crate::constants::MDNS_PORT {
self.section = Section::Additional;
continue;
}
if self.authority_idx >= self.reader.header().authority_count() {
self.section = Section::Additional;
continue;
}
let mut auth = self.reader.authority();
for _ in 0..self.authority_idx {
let _ = auth.next();
}
let r = match auth.next() {
Some(Ok(r)) => r,
Some(Err(e)) => {
// skip the rest of the authority section after a
// parse error so the iterator terminates instead of
// looping on the same error indefinitely.
// parse_errors was already bumped by the upfront section-
// validation latch in Endpoint::handle — do NOT bump again.
self.section = Section::Done;
return Some(Err(HandleError::Parse(e)));
}
None => {
self.section = Section::Additional;
continue;
}
};
// route-level TTL=0 guard for the authority section.
// covered Section::Answers; the same rationale applies
// here. A TTL=0 authority record is a goodbye/withdrawal,
// not a peer claiming the name — emitting ProbeConflict or
// HostConflict for it would let a withdrawing peer trigger
// our auto-rename or HostConflict surfacing.
if r.ttl() == 0 {
self.authority_idx = self.authority_idx.saturating_add(1);
self.authority_service_cursor = None;
continue;
}
// Authority records in mDNS probe messages signal a peer claiming the
// same name — route as ProbeConflict / HostConflict to EVERY matching
// service (multiple services can share a host) via the shared
// conflict helper (name + rtype + class gates centralized there).
let start = self.authority_service_cursor.unwrap_or(0);
if let Some((key, ev)) = self.next_service_conflict(&r, start) {
self.authority_service_cursor = Some(key.saturating_add(1));
return Some(Ok(ev));
}
// No more matching services for this authority record.
// Advance to the next authority record.
self.authority_idx = self.authority_idx.saturating_add(1);
self.authority_service_cursor = None;
continue;
}
Section::Additional => {
// additional-section records are supplementary ANSWERS
// (DNS-SD SRV/TXT/A/AAAA accompanying a PTR), NOT questions or probe
// claims. They fan out to active queries ONLY (QR=1) — never service
// conflicts/KAS. Cache population + eager query-state update already
// happened in `Endpoint::handle`; these events are informational.
if !self.is_response {
self.section = Section::Done;
continue;
}
if self.additional_idx >= self.reader.header().additional_count() {
self.section = Section::Done;
continue;
}
let mut add = self.reader.additional();
for _ in 0..self.additional_idx {
let _ = add.next();
}
let r = match add.next() {
Some(Ok(r)) => r,
Some(Err(e)) => {
// parse_errors was already bumped in the eager walk in
// Endpoint::handle (which covers answers AND additionals);
// do NOT bump it here to avoid double-counting.
self.section = Section::Done;
return Some(Err(HandleError::Parse(e)));
}
None => {
self.section = Section::Done;
continue;
}
};
// TTL=0 additionals are withdrawals — cache removal already handled
// eagerly; do not surface a ghost conflict/answer.
if r.ttl() == 0 {
self.additional_idx = self.additional_idx.saturating_add(1);
self.additional_service_cursor = None;
self.additional_service_done = false;
self.additional_query_cursor = None;
continue;
}
// service-conflict fan-out FIRST. A QR=1 additional record
// can carry a conflicting SRV/TXT (instance) or A/AAAA (host) for one
// of our services — DNS-SD responders place these in the Additional
// section, so missing them here would let duplicate names survive.
// Same unique-record gates as the Answer/Authority sections;
// service-type (shared) matches are never conflicts.
if !self.additional_service_done {
let start = self.additional_service_cursor.unwrap_or(0);
if let Some((key, ev)) = self.next_service_conflict(&r, start) {
self.additional_service_cursor = Some(key.saturating_add(1));
return Some(Ok(ev));
}
// mark the service phase done for this record so a later
// query event can't re-enter and replay the conflict events.
self.additional_service_done = true;
}
// Then query fan-out (informational; eager state update already done).
let start = self.additional_query_cursor.unwrap_or(0);
let mut found: Option<(usize, RouteEvent<'a>)> = None;
for (key, q) in self.endpoint.queries.iter() {
if key < start {
continue;
}
if q.is_done() || q.terminal_emitted() {
continue;
}
if names_match_record(q.qname(), &r) && qry_query_accepts(q, &r) {
found = Some((
key,
RouteEvent::ToQuery(ToQuery::new(q.handle(), QueryEvent::Answer(r))),
));
break;
}
}
if let Some((key, ev)) = found {
self.additional_query_cursor = Some(key.saturating_add(1));
return Some(Ok(ev));
}
// Both fan-outs exhausted for this additional record; advance and
// reset the per-record fan-out state.
self.additional_idx = self.additional_idx.saturating_add(1);
self.additional_service_cursor = None;
self.additional_service_done = false;
self.additional_query_cursor = None;
continue;
}
Section::Done => return None,
}
}
}
}