mdns_proto/endpoint/route.rs
1//! The `RouteEvents` iterator: demuxes one inbound message into routing events.
2
3use super::*;
4
5/// Iterator over routing decisions for a single incoming datagram.
6///
7/// Borrows the endpoint mutably for the duration of iteration so that
8/// `QueryEvent::Answer` events can be applied to the internal
9/// [`Query`] state machines as they are yielded —
10/// callers do not need to dispatch query events themselves. Service
11/// events still flow to the caller via the yielded [`RouteEvent`]s.
12pub struct RouteEvents<'a, 'e, I, R, C, SR, QS, EV, AN, EvQ>
13where
14 I: Instant,
15 R: Rng,
16 C: Pool<CacheEntry<I>>,
17 SR: Pool<ServiceRoute>,
18 QS: Pool<Query<I, AN, EvQ>>,
19 EV: Pool<EndpointEventEntry>,
20 AN: Pool<CollectedAnswer>,
21 EvQ: Pool<QueryUpdate>,
22{
23 pub(crate) src: SocketAddr,
24 pub(crate) endpoint: &'e mut Endpoint<I, R, C, SR, QS, EV, AN, EvQ>,
25 pub(crate) reader: MessageReader<'a>,
26 /// `true` when the QR bit is set (this is a response, not a query).
27 /// Used to gate KnownAnswer-suppression routing: KAS hints must only be
28 /// extracted from QUERY packets (QR=0); response packets must not poison
29 /// the KAS ring.
30 pub(crate) is_response: bool,
31 pub(crate) question_idx: u16,
32 /// Per-question service cursor: the slab key from which to resume iterating
33 /// services for the current question. Allows ALL matching services to receive
34 /// a `ServiceEvent::Question` for a single question before advancing to the
35 /// next question.
36 pub(crate) service_cursor: usize,
37 pub(crate) answer_idx: u16,
38 pub(crate) authority_idx: u16,
39 /// Stashed query event behind a higher-priority service event (e.g. a
40 /// `ProbeConflict` or `KnownAnswer` returns first and the first matching
41 /// `QueryEvent::Answer` for the same record drains on the next call).
42 pub(crate) pending_query: Option<RouteEvent<'a>>,
43 /// cursor for fanning out additional matching query routes for
44 /// the current `answer_idx` across multiple `next()` calls without
45 /// buffering events. `None` means "have not started query fan-out for
46 /// this record" — `next()` does the service-side pass plus finds the
47 /// FIRST matching query. `Some(k)` means "mid fan-out — resume scanning
48 /// `self.queries` from slab key `k`." Resets to `None` whenever
49 /// `answer_idx` advances, so the cursor is always paired with the
50 /// current answer record.
51 ///
52 /// Replaces the unbounded `std::Vec` buffering — that
53 /// allocated on the inbound packet path with infallible `push`, which
54 /// under allocator pressure aborts/panics instead of surfacing an
55 /// error, and used `Vec::remove(0)` for drain (O(n²) on large
56 /// fan-outs). The cursor model is O(1) state per record, O(n) total
57 /// work, and never allocates.
58 pub(crate) answer_query_cursor: Option<usize>,
59 /// cursor for fanning out KnownAnswer / ProbeConflict /
60 /// HostConflict events across multiple registered services that all
61 /// match the same answer record (e.g. several services sharing a
62 /// `service_type` for a PTR known-answer hint, or multiple services
63 /// sharing a host name). Same shape as `answer_query_cursor`:
64 /// `None` = haven't started service-side fan-out for this record;
65 /// `Some(k)` = resume scan from slab key `k`. Reset to `None` when
66 /// `answer_idx` advances.
67 ///
68 /// Previously the service-side scan stopped at the first matching
69 /// service, so the actual owning service of a PTR known-answer
70 /// (which had the right rdata to suppress) never received the hint —
71 /// the unrelated first-matching service got it instead and ignored
72 /// it by rdata mismatch.
73 pub(crate) answer_service_cursor: Option<usize>,
74 /// whether the answer-record service-phase fan-out is COMPLETE for
75 /// the current `answer_idx`. `answer_service_cursor` alone is ambiguous
76 /// (`None` means both "not started" and "exhausted"), so after a query event
77 /// returns mid-record, re-entry would re-scan services and replay conflict
78 /// events. This flag gates the service phase; it is reset only when
79 /// `answer_idx` advances.
80 pub(crate) answer_service_done: bool,
81 /// cursor for fanning out authority-section conflict events
82 /// (ProbeConflict / HostConflict) across multiple services that
83 /// share a host name. `None` = haven't started fan-out for the
84 /// current authority record; `Some(k)` = resume scan from slab key
85 /// `k`. Same shape as the other answer/question cursors.
86 ///
87 /// Previously the authority-section loop broke on the first
88 /// matching service and advanced `authority_idx`, so a peer probe
89 /// for a shared host name reached only one of the services
90 /// sharing that host; the rest never received the HostConflict
91 /// signal.
92 pub(crate) authority_service_cursor: Option<usize>,
93 /// When a QUERY-packet answer matches a registered service for both a
94 /// ProbeConflict and a KnownAnswer event, we emit ProbeConflict first and
95 /// stash the KnownAnswer here for the subsequent call.
96 pub(crate) pending_service_event: Option<RouteEvent<'a>>,
97 /// index into the ADDITIONAL section, plus the
98 /// service-conflict and query fan-out cursors for the current additional
99 /// record (same shape as the answer-section cursors). DNS-SD responders carry
100 /// SRV/TXT/A/AAAA here, so QR=1 additionals run conflict detection (instance
101 /// SRV/TXT → ProbeConflict, host A/AAAA → HostConflict) AND query fan-out —
102 /// but never KAS (additionals are not known-answer hints).
103 pub(crate) additional_idx: u16,
104 pub(crate) additional_service_cursor: Option<usize>,
105 /// like `answer_service_done`, marks the additional-record
106 /// service-phase fan-out complete for the current `additional_idx` so a query
107 /// event mid-record cannot cause the conflict events to replay on re-entry.
108 pub(crate) additional_service_done: bool,
109 pub(crate) additional_query_cursor: Option<usize>,
110 pub(crate) section: Section,
111}
112
/// The message section a routing iterator is currently working through.
///
/// Sections are visited in wire order; `Done` is terminal and makes the
/// iterator return `None`.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub(crate) enum Section {
  /// Routing questions to matching services.
  Questions,
  /// Routing answer-section records (service events, then query fan-out).
  Answers,
  /// Routing authority-section records (conflict events only).
  Authority,
  /// Routing additional-section records (responses only).
  Additional,
  /// Every section has been consumed.
  Done,
}
121
122impl<'a, I, R, C, SR, QS, EV, AN, EvQ> RouteEvents<'a, '_, I, R, C, SR, QS, EV, AN, EvQ>
123where
124 I: Instant,
125 R: Rng,
126 C: Pool<CacheEntry<I>>,
127 SR: Pool<ServiceRoute>,
128 QS: Pool<Query<I, AN, EvQ>>,
129 EV: Pool<EndpointEventEntry>,
130 AN: Pool<CollectedAnswer>,
131 EvQ: Pool<QueryUpdate>,
132{
133 /// the ONE conflict-routing decision for a QR=1 record `r`,
134 /// shared by the Answers, Authority, and Additional sections (previously
135 /// triplicated). Scans registered services from slab key `start` and returns
136 /// the next `(key, event)`:
137 /// * instance-name match + SRV/TXT → ProbeConflict (the instance's unique
138 /// RRset; service-type / shared names are never conflicts);
139 /// * host-name match + A/AAAA → HostConflict.
140 ///
141 /// conflicts are only routed for class-IN records — a record with
142 /// class ANY or an unknown class is not the same-class RRset RFC 6762 §9
143 /// requires, so it must not drive rename / host-conflict surfacing.
144 fn next_service_conflict(
145 &self,
146 r: &crate::wire::Ref<'a>,
147 start: usize,
148 ) -> Option<(usize, RouteEvent<'a>)> {
149 if r.rclass() != ResourceClass::In {
150 return None;
151 }
152 for (key, route) in self.endpoint.services.iter() {
153 if key < start {
154 continue;
155 }
156 // A withdrawing route's service is being torn down (only its goodbye is still
157 // draining) — never route a conflict to it. The route is retained for the
158 // name guard, but dispatching ProbeConflict/HostConflict here would feed
159 // terminal events into a proto the driver no longer drains (it skips
160 // withdrawing/errored contexts), letting a peer flood the proto event slab of
161 // a retiring service until GC — a bounded-time but unbounded-size growth path
162 //. Mirrors the question-dispatch and known-answer skips.
163 #[cfg(any(feature = "alloc", feature = "std", feature = "no-atomic"))]
164 if route.withdrawing {
165 continue;
166 }
167 if names_match_record(route.name(), r) && is_instance_conflict_rtype(r.rtype()) {
168 return Some((
169 key,
170 RouteEvent::ToService(ToService::new(
171 route.handle(),
172 ServiceEvent::ProbeConflict(ProbeConflict::new(self.src, *r)),
173 )),
174 ));
175 }
176 if names_match_record(route.host(), r) && is_host_conflict_rtype(r.rtype()) {
177 return Some((
178 key,
179 RouteEvent::ToService(ToService::new(
180 route.handle(),
181 ServiceEvent::HostConflict(HostConflict::new(*r)),
182 )),
183 ));
184 }
185 }
186 None
187 }
188}
189
190impl<'a, I, R, C, SR, QS, EV, AN, EvQ> Iterator
191 for RouteEvents<'a, '_, I, R, C, SR, QS, EV, AN, EvQ>
192where
193 I: Instant,
194 R: Rng,
195 C: Pool<CacheEntry<I>>,
196 SR: Pool<ServiceRoute>,
197 QS: Pool<Query<I, AN, EvQ>>,
198 EV: Pool<EndpointEventEntry>,
199 AN: Pool<CollectedAnswer>,
200 EvQ: Pool<QueryUpdate>,
201{
202 type Item = Result<RouteEvent<'a>, HandleError>;
203
204 fn next(&mut self) -> Option<Self::Item> {
205 // Flush pending stashed events in priority order before processing the
206 // next record. Order: ProbeConflict / KnownAnswer stash (service event)
207 // first, then query Answer stash.
208 if let Some(ev) = self.pending_service_event.take() {
209 return Some(Ok(ev));
210 }
211 if let Some(ev) = self.pending_query.take() {
212 return Some(Ok(ev));
213 }
214
215 loop {
216 match self.section {
217 Section::Questions => {
218 // gate question→service routing on
219 // `EndpointConfig::answer_questions`. When disabled, no
220 // `ServiceEvent::Question` events fire at all, so registered
221 // services never schedule responses to inbound queries.
222 // This is the "advertise but don't respond" / passive mode.
223 if !self.endpoint.config.answer_questions() {
224 self.section = Section::Answers;
225 continue;
226 }
227 if self.question_idx >= self.reader.header().question_count() {
228 self.section = Section::Answers;
229 continue;
230 }
231 let mut qs = self.reader.questions();
232 for _ in 0..self.question_idx {
233 let _ = qs.next();
234 }
235 let q = match qs.next() {
236 Some(Ok(q)) => q,
237 Some(Err(e)) => {
238 // skip the rest of the questions section after a
239 // parse error so the iterator terminates instead of
240 // looping on the same error indefinitely.
241 // parse_errors was already bumped by the upfront section-
242 // validation latch in Endpoint::handle — do NOT bump again.
243 self.section = Section::Answers;
244 return Some(Err(HandleError::Parse(e)));
245 }
246 None => {
247 self.section = Section::Answers;
248 continue;
249 }
250 };
251 // Walk services starting from the saved cursor, looking for the next
252 // match. This allows ALL services sharing the same PTR name to each
253 // receive a ServiceEvent::Question for this question before we move on.
254 let cursor = self.service_cursor;
255 let mut found: Option<(usize, RouteEvent<'a>)> = None;
256 for (key, route) in self.endpoint.services.iter() {
257 if key < cursor {
258 continue;
259 }
260 // A withdrawing route's service is gone (only its goodbye is still
261 // draining) — never route an incoming question to it, or it could
262 // emit a positive-TTL answer contradicting its own TTL=0 goodbye.
263 // The route is still present for the name guard, just not answered.
264 #[cfg(any(feature = "alloc", feature = "std", feature = "no-atomic"))]
265 if route.withdrawing {
266 continue;
267 }
268 if names_match(route.name(), q.qname())
269 || names_match(route.service_type(), q.qname())
270 || names_match(route.host(), q.qname())
271 || route
272 .subtypes
273 .iter()
274 .any(|s| names_match(s, q.qname()))
275 // RFC 6763 §9 service-type enumeration: route the meta-query to
276 // EVERY service; each answerable (Established/Announcing) one emits
277 // its own type PTR, and the cursor below delivers it to each in
278 // turn. An earlier per-type dedup (route only the
279 // lowest-keyed service of each type) is UNSAFE here — routing has
280 // no visibility into Service lifecycle state, so it could pick a
281 // probing/non-answering representative and mask an Established
282 // same-type sibling, leaving a live type unanswered. Two instances
283 // of one type emitting the identical meta-PTR is benign (receivers
284 // dedup the identical RR); true per-type dedup would require
285 // state-aware, cross-service handling at the driver layer.
286 || is_meta_query_name(q.qname())
287 {
288 found = Some((
289 key,
290 RouteEvent::ToService(ToService::new(
291 route.handle(),
292 ServiceEvent::Question(
293 ServiceQuestion::new(q, self.src, self.reader.header().id())
294 // RFC 6762 §7.2: a TC-bit query spreads its known answers
295 // across multiple packets — the responder delays longer so
296 // the follow-up packets accumulate before it suppresses.
297 .with_truncated(self.reader.header().flags().is_truncated()),
298 ),
299 )),
300 ));
301 break;
302 }
303 }
304 if let Some((key, ev)) = found {
305 // Advance the service cursor past this key so the next call picks up
306 // where we left off within the same question.
307 self.service_cursor = key.saturating_add(1);
308 return Some(Ok(ev));
309 }
310 // No more matching services for this question: advance to the next
311 // question and reset the per-question service cursor.
312 self.question_idx = self.question_idx.saturating_add(1);
313 self.service_cursor = 0;
314 continue;
315 }
316 Section::Answers => {
317 if self.answer_idx >= self.reader.header().answer_count() {
318 self.section = Section::Authority;
319 continue;
320 }
321 let mut ans = self.reader.answers();
322 for _ in 0..self.answer_idx {
323 let _ = ans.next();
324 }
325 let r = match ans.next() {
326 Some(Ok(r)) => r,
327 Some(Err(e)) => {
328 // skip the rest of the answers section after a
329 // parse error so the iterator terminates instead of
330 // looping on the same error indefinitely.
331 // parse_errors was already bumped in the eager walk in
332 // Endpoint::handle (which covers answers AND additionals);
333 // do NOT bump it here to avoid double-counting.
334 self.section = Section::Authority;
335 return Some(Err(HandleError::Parse(e)));
336 }
337 None => {
338 self.section = Section::Authority;
339 continue;
340 }
341 };
342
343 // route-level TTL=0 guard. Records with TTL=0 are
344 // mDNS "goodbye" / deletion signals (RFC 6762 §10.1) — the
345 // cache layer already processes them as removals during the
346 // eager loop in `Endpoint::handle`, and `Query::handle_event`
347 // rejects them at the eager-mutation step. The
348 // remaining hazard is the iterator: emitting service events
349 // (ProbeConflict / HostConflict / KnownAnswer) for a goodbye
350 // would let a peer withdrawing a record trigger our auto-
351 // rename or HostConflict surfacing, and emitting ToQuery
352 // for a goodbye would let callers receive ghost "answers"
353 // from records being withdrawn. Skip the whole fan-out for
354 // TTL=0 — cache removal is the only correct side effect.
355 if r.ttl() == 0 {
356 self.answer_idx = self.answer_idx.saturating_add(1);
357 self.answer_service_cursor = None;
358 self.answer_service_done = false;
359 self.answer_query_cursor = None;
360 continue;
361 }
362
363 // Service-side fan-out for answer-section records.
364 //
365 // QR=0: records are KAS hints from another querier.
366 // Emit only KnownAnswer (for KAS suppression on probes).
367 // ProbeConflict / HostConflict are NEVER emitted here —
368 // letting a hostile querier trigger our auto-rename by
369 // mentioning our names in the answer section would be a
370 // trivial denial-of-service vector.
371 //
372 // QR=1: records are AUTHORITATIVE peer responses.
373 // Per RFC 6762 §8.1, a probing host MUST treat any
374 // response (solicited or unsolicited) claiming one of its
375 // tentative names as a conflict event. Emit
376 // ProbeConflict for instance-name matches and HostConflict
377 // for host-name matches. Service-type (shared) matches
378 // are NOT conflicts (multiple services share a type).
379 //
380 // Authority-section records (Section::Authority) fire
381 // ProbeConflict / HostConflict regardless of QR — those are
382 // tentative-probe records.
383 if !self.answer_service_done {
384 let start = self.answer_service_cursor.unwrap_or(0);
385 let next_event = if self.is_response {
386 // QR=1: ProbeConflict / HostConflict via the shared conflict
387 // helper (name + rtype + class gates). Service-type (shared)
388 // names are never conflicts.
389 self.next_service_conflict(&r, start)
390 } else {
391 // QR=0: records are KAS hints. ANY name match (instance / host /
392 // service-type) emits a KnownAnswer for suppression — conflicts
393 // are NEVER routed for QR=0 (a hostile querier mentioning our
394 // names must not trigger auto-rename).
395 //
396 // a QR=0 PTR owned by the DNS-SD service-type enumeration
397 // meta name is a known-answer for the §9 meta reply. Its owner is
398 // none of our RRset names, so fan it out to EVERY service (mirrors
399 // how the meta QUESTION routes to all of them); each
400 // service decides whether the PTR target matches its own type.
401 let mut found: Option<(usize, RouteEvent<'a>)> = None;
402 for (key, route) in self.endpoint.services.iter() {
403 if key < start {
404 continue;
405 }
406 // A withdrawing route's service is being torn down — never route a
407 // known-answer to it either, matching the question-dispatch and
408 // conflict skips (no dispatch after retirement).
409 #[cfg(any(feature = "alloc", feature = "std", feature = "no-atomic"))]
410 if route.withdrawing {
411 continue;
412 }
413 if names_match_record(route.name(), &r)
414 || names_match_record(route.host(), &r)
415 || names_match_record(route.service_type(), &r)
416 || is_meta_query_name(r.name())
417 {
418 found = Some((
419 key,
420 RouteEvent::ToService(ToService::new(
421 route.handle(),
422 ServiceEvent::KnownAnswer(KnownAnswer::new(self.src, r)),
423 )),
424 ));
425 break;
426 }
427 }
428 found
429 };
430 if let Some((key, ev)) = next_event {
431 self.answer_service_cursor = Some(key.saturating_add(1));
432 return Some(Ok(ev));
433 }
434 // service-side fan-out exhausted for THIS record. Mark
435 // it done (not just reset the cursor to None, which is ambiguous
436 // with "not started") so a later query event in the same record
437 // can't re-enter and replay the conflict/KAS events.
438 self.answer_service_done = true;
439 }
440
441 // Query-side fan-out (QR=1 only). emit a
442 // ToQuery for every name/type-compatible active query via
443 // `answer_query_cursor`. This already applied the answer
444 // to the Query state eagerly in `Endpoint::handle`; the
445 // events emitted here are informational only.
446 if self.is_response {
447 let start = self.answer_query_cursor.unwrap_or(0);
448 let mut found: Option<(usize, RouteEvent<'a>)> = None;
449 for (key, q) in self.endpoint.queries.iter() {
450 if key < start {
451 continue;
452 }
453 if q.is_done() || q.terminal_emitted() {
454 continue;
455 }
456 if names_match_record(q.qname(), &r) && qry_query_accepts(q, &r) {
457 found = Some((
458 key,
459 RouteEvent::ToQuery(ToQuery::new(q.handle(), QueryEvent::Answer(r))),
460 ));
461 break;
462 }
463 }
464 if let Some((key, ev)) = found {
465 self.answer_query_cursor = Some(key.saturating_add(1));
466 return Some(Ok(ev));
467 }
468 // Query-side fan-out exhausted for this record.
469 self.answer_query_cursor = None;
470 }
471
472 // Both phases exhausted — record fully processed. Advance to the
473 // next answer record and reset the per-record fan-out state.
474 self.answer_idx = self.answer_idx.saturating_add(1);
475 self.answer_service_cursor = None;
476 self.answer_service_done = false;
477 self.answer_query_cursor = None;
478 continue;
479 }
480 Section::Authority => {
481 // authority-section records are tentative-probe claims
482 // (RFC 6762 §8.2) — a peer asserting ownership of a name. Routing
483 // them as ProbeConflict / HostConflict forces our service to rename
484 // or surfaces a host conflict, so they MUST come from a trusted mDNS
485 // peer. A genuine prober sends from UDP source port 5353; an
486 // ephemeral-port packet carrying an authority RR for our name is an
487 // off-path / forged artifact — a legacy §6.7 querier sends only
488 // questions, never authority records. (QR=1 responses from non-5353
489 // ports are already fully suppressed upstream; this closes the QR=0
490 // query path, where the Question section has ALREADY been routed
491 // above so legacy unicast repliers are unaffected.)
492 //
493 // accounting rule: suppressing conflict routing for a non-5353
494 // source is a SECTION-LEVEL suppression — the datagram's other
495 // sections (questions, answers, additional) are still processed.
496 // This is NOT a whole-datagram reject, so `packets_dropped` is NOT
497 // bumped here. Parse errors in the authority section ARE still
498 // counted by the upfront section-validation latch (which walks
499 // authority regardless of source port), so malformed bytes are
500 // always accounted even when conflict routing is suppressed.
501 if self.src.port() != crate::constants::MDNS_PORT {
502 self.section = Section::Additional;
503 continue;
504 }
505 if self.authority_idx >= self.reader.header().authority_count() {
506 self.section = Section::Additional;
507 continue;
508 }
509 let mut auth = self.reader.authority();
510 for _ in 0..self.authority_idx {
511 let _ = auth.next();
512 }
513 let r = match auth.next() {
514 Some(Ok(r)) => r,
515 Some(Err(e)) => {
516 // skip the rest of the authority section after a
517 // parse error so the iterator terminates instead of
518 // looping on the same error indefinitely.
519 // parse_errors was already bumped by the upfront section-
520 // validation latch in Endpoint::handle — do NOT bump again.
521 self.section = Section::Done;
522 return Some(Err(HandleError::Parse(e)));
523 }
524 None => {
525 self.section = Section::Additional;
526 continue;
527 }
528 };
529
530 // route-level TTL=0 guard for the authority section.
531 // covered Section::Answers; the same rationale applies
532 // here. A TTL=0 authority record is a goodbye/withdrawal,
533 // not a peer claiming the name — emitting ProbeConflict or
534 // HostConflict for it would let a withdrawing peer trigger
535 // our auto-rename or HostConflict surfacing.
536 if r.ttl() == 0 {
537 self.authority_idx = self.authority_idx.saturating_add(1);
538 self.authority_service_cursor = None;
539 continue;
540 }
541
542 // Authority records in mDNS probe messages signal a peer claiming the
543 // same name — route as ProbeConflict / HostConflict to EVERY matching
544 // service (multiple services can share a host) via the shared
545 // conflict helper (name + rtype + class gates centralized there).
546 let start = self.authority_service_cursor.unwrap_or(0);
547 if let Some((key, ev)) = self.next_service_conflict(&r, start) {
548 self.authority_service_cursor = Some(key.saturating_add(1));
549 return Some(Ok(ev));
550 }
551 // No more matching services for this authority record.
552 // Advance to the next authority record.
553 self.authority_idx = self.authority_idx.saturating_add(1);
554 self.authority_service_cursor = None;
555 continue;
556 }
557 Section::Additional => {
558 // additional-section records are supplementary ANSWERS
559 // (DNS-SD SRV/TXT/A/AAAA accompanying a PTR), NOT questions or probe
560 // claims. They fan out to active queries ONLY (QR=1) — never service
561 // conflicts/KAS. Cache population + eager query-state update already
562 // happened in `Endpoint::handle`; these events are informational.
563 if !self.is_response {
564 self.section = Section::Done;
565 continue;
566 }
567 if self.additional_idx >= self.reader.header().additional_count() {
568 self.section = Section::Done;
569 continue;
570 }
571 let mut add = self.reader.additional();
572 for _ in 0..self.additional_idx {
573 let _ = add.next();
574 }
575 let r = match add.next() {
576 Some(Ok(r)) => r,
577 Some(Err(e)) => {
578 // parse_errors was already bumped in the eager walk in
579 // Endpoint::handle (which covers answers AND additionals);
580 // do NOT bump it here to avoid double-counting.
581 self.section = Section::Done;
582 return Some(Err(HandleError::Parse(e)));
583 }
584 None => {
585 self.section = Section::Done;
586 continue;
587 }
588 };
589 // TTL=0 additionals are withdrawals — cache removal already handled
590 // eagerly; do not surface a ghost conflict/answer.
591 if r.ttl() == 0 {
592 self.additional_idx = self.additional_idx.saturating_add(1);
593 self.additional_service_cursor = None;
594 self.additional_service_done = false;
595 self.additional_query_cursor = None;
596 continue;
597 }
598 // service-conflict fan-out FIRST. A QR=1 additional record
599 // can carry a conflicting SRV/TXT (instance) or A/AAAA (host) for one
600 // of our services — DNS-SD responders place these in the Additional
601 // section, so missing them here would let duplicate names survive.
602 // Same unique-record gates as the Answer/Authority sections;
603 // service-type (shared) matches are never conflicts.
604 if !self.additional_service_done {
605 let start = self.additional_service_cursor.unwrap_or(0);
606 if let Some((key, ev)) = self.next_service_conflict(&r, start) {
607 self.additional_service_cursor = Some(key.saturating_add(1));
608 return Some(Ok(ev));
609 }
610 // mark the service phase done for this record so a later
611 // query event can't re-enter and replay the conflict events.
612 self.additional_service_done = true;
613 }
614 // Then query fan-out (informational; eager state update already done).
615 let start = self.additional_query_cursor.unwrap_or(0);
616 let mut found: Option<(usize, RouteEvent<'a>)> = None;
617 for (key, q) in self.endpoint.queries.iter() {
618 if key < start {
619 continue;
620 }
621 if q.is_done() || q.terminal_emitted() {
622 continue;
623 }
624 if names_match_record(q.qname(), &r) && qry_query_accepts(q, &r) {
625 found = Some((
626 key,
627 RouteEvent::ToQuery(ToQuery::new(q.handle(), QueryEvent::Answer(r))),
628 ));
629 break;
630 }
631 }
632 if let Some((key, ev)) = found {
633 self.additional_query_cursor = Some(key.saturating_add(1));
634 return Some(Ok(ev));
635 }
636 // Both fan-outs exhausted for this additional record; advance and
637 // reset the per-record fan-out state.
638 self.additional_idx = self.additional_idx.saturating_add(1);
639 self.additional_service_cursor = None;
640 self.additional_service_done = false;
641 self.additional_query_cursor = None;
642 continue;
643 }
644 Section::Done => return None,
645 }
646 }
647 }
648}