// mdns_proto/endpoint/route.rs

//! The `RouteEvents` iterator: demuxes one inbound message into routing events.
2
3use super::*;
4
5/// Iterator over routing decisions for a single incoming datagram.
6///
7/// Borrows the endpoint mutably for the duration of iteration so that
8/// `QueryEvent::Answer` events can be applied to the internal
9/// [`Query`] state machines as they are yielded —
10/// callers do not need to dispatch query events themselves.  Service
11/// events still flow to the caller via the yielded [`RouteEvent`]s.
12pub struct RouteEvents<'a, 'e, I, R, C, SR, QS, EV, AN, EvQ>
13where
14  I: Instant,
15  R: Rng,
16  C: Pool<CacheEntry<I>>,
17  SR: Pool<ServiceRoute>,
18  QS: Pool<Query<I, AN, EvQ>>,
19  EV: Pool<EndpointEventEntry>,
20  AN: Pool<CollectedAnswer>,
21  EvQ: Pool<QueryUpdate>,
22{
23  pub(crate) src: SocketAddr,
24  pub(crate) endpoint: &'e mut Endpoint<I, R, C, SR, QS, EV, AN, EvQ>,
25  pub(crate) reader: MessageReader<'a>,
26  /// `true` when the QR bit is set (this is a response, not a query).
27  /// Used to gate KnownAnswer-suppression routing: KAS hints must only be
28  /// extracted from QUERY packets (QR=0); response packets must not poison
29  /// the KAS ring.
30  pub(crate) is_response: bool,
31  pub(crate) question_idx: u16,
32  /// Per-question service cursor: the slab key from which to resume iterating
33  /// services for the current question. Allows ALL matching services to receive
34  /// a `ServiceEvent::Question` for a single question before advancing to the
35  /// next question.
36  pub(crate) service_cursor: usize,
37  pub(crate) answer_idx: u16,
38  pub(crate) authority_idx: u16,
39  /// Stashed query event behind a higher-priority service event (e.g. a
40  /// `ProbeConflict` or `KnownAnswer` returns first and the first matching
41  /// `QueryEvent::Answer` for the same record drains on the next call).
42  pub(crate) pending_query: Option<RouteEvent<'a>>,
43  /// cursor for fanning out additional matching query routes for
44  /// the current `answer_idx` across multiple `next()` calls without
45  /// buffering events.  `None` means "have not started query fan-out for
46  /// this record" — `next()` does the service-side pass plus finds the
47  /// FIRST matching query.  `Some(k)` means "mid fan-out — resume scanning
48  /// `self.queries` from slab key `k`."  Resets to `None` whenever
49  /// `answer_idx` advances, so the cursor is always paired with the
50  /// current answer record.
51  ///
52  /// Replaces the unbounded `std::Vec` buffering — that
53  /// allocated on the inbound packet path with infallible `push`, which
54  /// under allocator pressure aborts/panics instead of surfacing an
55  /// error, and used `Vec::remove(0)` for drain (O(n²) on large
56  /// fan-outs).  The cursor model is O(1) state per record, O(n) total
57  /// work, and never allocates.
58  pub(crate) answer_query_cursor: Option<usize>,
59  /// cursor for fanning out KnownAnswer / ProbeConflict /
60  /// HostConflict events across multiple registered services that all
61  /// match the same answer record (e.g. several services sharing a
62  /// `service_type` for a PTR known-answer hint, or multiple services
63  /// sharing a host name).  Same shape as `answer_query_cursor`:
64  /// `None` = haven't started service-side fan-out for this record;
65  /// `Some(k)` = resume scan from slab key `k`.  Reset to `None` when
66  /// `answer_idx` advances.
67  ///
68  /// Previously the service-side scan stopped at the first matching
69  /// service, so the actual owning service of a PTR known-answer
70  /// (which had the right rdata to suppress) never received the hint —
71  /// the unrelated first-matching service got it instead and ignored
72  /// it by rdata mismatch.
73  pub(crate) answer_service_cursor: Option<usize>,
74  /// whether the answer-record service-phase fan-out is COMPLETE for
75  /// the current `answer_idx`. `answer_service_cursor` alone is ambiguous
76  /// (`None` means both "not started" and "exhausted"), so after a query event
77  /// returns mid-record, re-entry would re-scan services and replay conflict
78  /// events. This flag gates the service phase; it is reset only when
79  /// `answer_idx` advances.
80  pub(crate) answer_service_done: bool,
81  /// cursor for fanning out authority-section conflict events
82  /// (ProbeConflict / HostConflict) across multiple services that
83  /// share a host name.  `None` = haven't started fan-out for the
84  /// current authority record; `Some(k)` = resume scan from slab key
85  /// `k`.  Same shape as the other answer/question cursors.
86  ///
87  /// Previously the authority-section loop broke on the first
88  /// matching service and advanced `authority_idx`, so a peer probe
89  /// for a shared host name reached only one of the services
90  /// sharing that host; the rest never received the HostConflict
91  /// signal.
92  pub(crate) authority_service_cursor: Option<usize>,
93  /// When a QUERY-packet answer matches a registered service for both a
94  /// ProbeConflict and a KnownAnswer event, we emit ProbeConflict first and
95  /// stash the KnownAnswer here for the subsequent call.
96  pub(crate) pending_service_event: Option<RouteEvent<'a>>,
97  /// index into the ADDITIONAL section, plus the
98  /// service-conflict and query fan-out cursors for the current additional
99  /// record (same shape as the answer-section cursors). DNS-SD responders carry
100  /// SRV/TXT/A/AAAA here, so QR=1 additionals run conflict detection (instance
101  /// SRV/TXT → ProbeConflict, host A/AAAA → HostConflict) AND query fan-out —
102  /// but never KAS (additionals are not known-answer hints).
103  pub(crate) additional_idx: u16,
104  pub(crate) additional_service_cursor: Option<usize>,
105  /// like `answer_service_done`, marks the additional-record
106  /// service-phase fan-out complete for the current `additional_idx` so a query
107  /// event mid-record cannot cause the conflict events to replay on re-entry.
108  pub(crate) additional_service_done: bool,
109  pub(crate) additional_query_cursor: Option<usize>,
110  pub(crate) section: Section,
111}
112
/// Which part of the inbound message the routing iterator is currently
/// working through.  Sections are visited in declaration order and the
/// iterator only ever moves forward.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub(crate) enum Section {
  /// Routing the question section to registered services.
  Questions,
  /// Routing answer-section records (service fan-out, then query fan-out).
  Answers,
  /// Routing authority-section records as probe / host conflicts.
  Authority,
  /// Routing additional-section records of a response.
  Additional,
  /// Every section has been consumed; the iterator yields `None`.
  Done,
}
121
122impl<'a, I, R, C, SR, QS, EV, AN, EvQ> RouteEvents<'a, '_, I, R, C, SR, QS, EV, AN, EvQ>
123where
124  I: Instant,
125  R: Rng,
126  C: Pool<CacheEntry<I>>,
127  SR: Pool<ServiceRoute>,
128  QS: Pool<Query<I, AN, EvQ>>,
129  EV: Pool<EndpointEventEntry>,
130  AN: Pool<CollectedAnswer>,
131  EvQ: Pool<QueryUpdate>,
132{
133  /// the ONE conflict-routing decision for a QR=1 record `r`,
134  /// shared by the Answers, Authority, and Additional sections (previously
135  /// triplicated). Scans registered services from slab key `start` and returns
136  /// the next `(key, event)`:
137  ///   * instance-name match + SRV/TXT → ProbeConflict (the instance's unique
138  ///     RRset; service-type / shared names are never conflicts);
139  ///   * host-name match + A/AAAA → HostConflict.
140  ///
141  /// conflicts are only routed for class-IN records — a record with
142  /// class ANY or an unknown class is not the same-class RRset RFC 6762 §9
143  /// requires, so it must not drive rename / host-conflict surfacing.
144  fn next_service_conflict(
145    &self,
146    r: &crate::wire::Ref<'a>,
147    start: usize,
148  ) -> Option<(usize, RouteEvent<'a>)> {
149    if r.rclass() != ResourceClass::In {
150      return None;
151    }
152    for (key, route) in self.endpoint.services.iter() {
153      if key < start {
154        continue;
155      }
156      // A withdrawing route's service is being torn down (only its goodbye is still
157      // draining) — never route a conflict to it. The route is retained for the
158      // name guard, but dispatching ProbeConflict/HostConflict here would feed
159      // terminal events into a proto the driver no longer drains (it skips
160      // withdrawing/errored contexts), letting a peer flood the proto event slab of
161      // a retiring service until GC — a bounded-time but unbounded-size growth path
162      //. Mirrors the question-dispatch and known-answer skips.
163      #[cfg(any(feature = "alloc", feature = "std", feature = "no-atomic"))]
164      if route.withdrawing {
165        continue;
166      }
167      if names_match_record(route.name(), r) && is_instance_conflict_rtype(r.rtype()) {
168        return Some((
169          key,
170          RouteEvent::ToService(ToService::new(
171            route.handle(),
172            ServiceEvent::ProbeConflict(ProbeConflict::new(self.src, *r)),
173          )),
174        ));
175      }
176      if names_match_record(route.host(), r) && is_host_conflict_rtype(r.rtype()) {
177        return Some((
178          key,
179          RouteEvent::ToService(ToService::new(
180            route.handle(),
181            ServiceEvent::HostConflict(HostConflict::new(*r)),
182          )),
183        ));
184      }
185    }
186    None
187  }
188}
189
190impl<'a, I, R, C, SR, QS, EV, AN, EvQ> Iterator
191  for RouteEvents<'a, '_, I, R, C, SR, QS, EV, AN, EvQ>
192where
193  I: Instant,
194  R: Rng,
195  C: Pool<CacheEntry<I>>,
196  SR: Pool<ServiceRoute>,
197  QS: Pool<Query<I, AN, EvQ>>,
198  EV: Pool<EndpointEventEntry>,
199  AN: Pool<CollectedAnswer>,
200  EvQ: Pool<QueryUpdate>,
201{
202  type Item = Result<RouteEvent<'a>, HandleError>;
203
204  fn next(&mut self) -> Option<Self::Item> {
205    // Flush pending stashed events in priority order before processing the
206    // next record.  Order: ProbeConflict / KnownAnswer stash (service event)
207    // first, then query Answer stash.
208    if let Some(ev) = self.pending_service_event.take() {
209      return Some(Ok(ev));
210    }
211    if let Some(ev) = self.pending_query.take() {
212      return Some(Ok(ev));
213    }
214
215    loop {
216      match self.section {
217        Section::Questions => {
218          // gate question→service routing on
219          // `EndpointConfig::answer_questions`.  When disabled, no
220          // `ServiceEvent::Question` events fire at all, so registered
221          // services never schedule responses to inbound queries.
222          // This is the "advertise but don't respond" / passive mode.
223          if !self.endpoint.config.answer_questions() {
224            self.section = Section::Answers;
225            continue;
226          }
227          if self.question_idx >= self.reader.header().question_count() {
228            self.section = Section::Answers;
229            continue;
230          }
231          let mut qs = self.reader.questions();
232          for _ in 0..self.question_idx {
233            let _ = qs.next();
234          }
235          let q = match qs.next() {
236            Some(Ok(q)) => q,
237            Some(Err(e)) => {
238              // skip the rest of the questions section after a
239              // parse error so the iterator terminates instead of
240              // looping on the same error indefinitely.
241              // parse_errors was already bumped by the upfront section-
242              // validation latch in Endpoint::handle — do NOT bump again.
243              self.section = Section::Answers;
244              return Some(Err(HandleError::Parse(e)));
245            }
246            None => {
247              self.section = Section::Answers;
248              continue;
249            }
250          };
251          // Walk services starting from the saved cursor, looking for the next
252          // match. This allows ALL services sharing the same PTR name to each
253          // receive a ServiceEvent::Question for this question before we move on.
254          let cursor = self.service_cursor;
255          let mut found: Option<(usize, RouteEvent<'a>)> = None;
256          for (key, route) in self.endpoint.services.iter() {
257            if key < cursor {
258              continue;
259            }
260            // A withdrawing route's service is gone (only its goodbye is still
261            // draining) — never route an incoming question to it, or it could
262            // emit a positive-TTL answer contradicting its own TTL=0 goodbye.
263            // The route is still present for the name guard, just not answered.
264            #[cfg(any(feature = "alloc", feature = "std", feature = "no-atomic"))]
265            if route.withdrawing {
266              continue;
267            }
268            if names_match(route.name(), q.qname())
269              || names_match(route.service_type(), q.qname())
270              || names_match(route.host(), q.qname())
271              || route
272                .subtypes
273                .iter()
274                .any(|s| names_match(s, q.qname()))
275              // RFC 6763 §9 service-type enumeration: route the meta-query to
276              // EVERY service; each answerable (Established/Announcing) one emits
277              // its own type PTR, and the cursor below delivers it to each in
278              // turn. An earlier per-type dedup (route only the
279              // lowest-keyed service of each type) is UNSAFE here — routing has
280              // no visibility into Service lifecycle state, so it could pick a
281              // probing/non-answering representative and mask an Established
282              // same-type sibling, leaving a live type unanswered. Two instances
283              // of one type emitting the identical meta-PTR is benign (receivers
284              // dedup the identical RR); true per-type dedup would require
285              // state-aware, cross-service handling at the driver layer.
286              || is_meta_query_name(q.qname())
287            {
288              found = Some((
289                key,
290                RouteEvent::ToService(ToService::new(
291                  route.handle(),
292                  ServiceEvent::Question(
293                    ServiceQuestion::new(q, self.src, self.reader.header().id())
294                      // RFC 6762 §7.2: a TC-bit query spreads its known answers
295                      // across multiple packets — the responder delays longer so
296                      // the follow-up packets accumulate before it suppresses.
297                      .with_truncated(self.reader.header().flags().is_truncated()),
298                  ),
299                )),
300              ));
301              break;
302            }
303          }
304          if let Some((key, ev)) = found {
305            // Advance the service cursor past this key so the next call picks up
306            // where we left off within the same question.
307            self.service_cursor = key.saturating_add(1);
308            return Some(Ok(ev));
309          }
310          // No more matching services for this question: advance to the next
311          // question and reset the per-question service cursor.
312          self.question_idx = self.question_idx.saturating_add(1);
313          self.service_cursor = 0;
314          continue;
315        }
316        Section::Answers => {
317          if self.answer_idx >= self.reader.header().answer_count() {
318            self.section = Section::Authority;
319            continue;
320          }
321          let mut ans = self.reader.answers();
322          for _ in 0..self.answer_idx {
323            let _ = ans.next();
324          }
325          let r = match ans.next() {
326            Some(Ok(r)) => r,
327            Some(Err(e)) => {
328              // skip the rest of the answers section after a
329              // parse error so the iterator terminates instead of
330              // looping on the same error indefinitely.
331              // parse_errors was already bumped in the eager walk in
332              // Endpoint::handle (which covers answers AND additionals);
333              // do NOT bump it here to avoid double-counting.
334              self.section = Section::Authority;
335              return Some(Err(HandleError::Parse(e)));
336            }
337            None => {
338              self.section = Section::Authority;
339              continue;
340            }
341          };
342
343          // route-level TTL=0 guard.  Records with TTL=0 are
344          // mDNS "goodbye" / deletion signals (RFC 6762 §10.1) — the
345          // cache layer already processes them as removals during the
346          // eager loop in `Endpoint::handle`, and `Query::handle_event`
347          // rejects them at the eager-mutation step.  The
348          // remaining hazard is the iterator: emitting service events
349          // (ProbeConflict / HostConflict / KnownAnswer) for a goodbye
350          // would let a peer withdrawing a record trigger our auto-
351          // rename or HostConflict surfacing, and emitting ToQuery
352          // for a goodbye would let callers receive ghost "answers"
353          // from records being withdrawn.  Skip the whole fan-out for
354          // TTL=0 — cache removal is the only correct side effect.
355          if r.ttl() == 0 {
356            self.answer_idx = self.answer_idx.saturating_add(1);
357            self.answer_service_cursor = None;
358            self.answer_service_done = false;
359            self.answer_query_cursor = None;
360            continue;
361          }
362
363          // Service-side fan-out for answer-section records.
364          //
365          // QR=0: records are KAS hints from another querier.
366          //   Emit only KnownAnswer (for KAS suppression on probes).
367          //   ProbeConflict / HostConflict are NEVER emitted here —
368          //   letting a hostile querier trigger our auto-rename by
369          //   mentioning our names in the answer section would be a
370          //   trivial denial-of-service vector.
371          //
372          // QR=1: records are AUTHORITATIVE peer responses.
373          //   Per RFC 6762 §8.1, a probing host MUST treat any
374          //   response (solicited or unsolicited) claiming one of its
375          //   tentative names as a conflict event.  Emit
376          //   ProbeConflict for instance-name matches and HostConflict
377          //   for host-name matches.  Service-type (shared) matches
378          //   are NOT conflicts (multiple services share a type).
379          //
380          // Authority-section records (Section::Authority) fire
381          // ProbeConflict / HostConflict regardless of QR — those are
382          // tentative-probe records.
383          if !self.answer_service_done {
384            let start = self.answer_service_cursor.unwrap_or(0);
385            let next_event = if self.is_response {
386              // QR=1: ProbeConflict / HostConflict via the shared conflict
387              // helper (name + rtype + class gates). Service-type (shared)
388              // names are never conflicts.
389              self.next_service_conflict(&r, start)
390            } else {
391              // QR=0: records are KAS hints. ANY name match (instance / host /
392              // service-type) emits a KnownAnswer for suppression — conflicts
393              // are NEVER routed for QR=0 (a hostile querier mentioning our
394              // names must not trigger auto-rename).
395              //
396              // a QR=0 PTR owned by the DNS-SD service-type enumeration
397              // meta name is a known-answer for the §9 meta reply. Its owner is
398              // none of our RRset names, so fan it out to EVERY service (mirrors
399              // how the meta QUESTION routes to all of them); each
400              // service decides whether the PTR target matches its own type.
401              let mut found: Option<(usize, RouteEvent<'a>)> = None;
402              for (key, route) in self.endpoint.services.iter() {
403                if key < start {
404                  continue;
405                }
406                // A withdrawing route's service is being torn down — never route a
407                // known-answer to it either, matching the question-dispatch and
408                // conflict skips (no dispatch after retirement).
409                #[cfg(any(feature = "alloc", feature = "std", feature = "no-atomic"))]
410                if route.withdrawing {
411                  continue;
412                }
413                if names_match_record(route.name(), &r)
414                  || names_match_record(route.host(), &r)
415                  || names_match_record(route.service_type(), &r)
416                  || is_meta_query_name(r.name())
417                {
418                  found = Some((
419                    key,
420                    RouteEvent::ToService(ToService::new(
421                      route.handle(),
422                      ServiceEvent::KnownAnswer(KnownAnswer::new(self.src, r)),
423                    )),
424                  ));
425                  break;
426                }
427              }
428              found
429            };
430            if let Some((key, ev)) = next_event {
431              self.answer_service_cursor = Some(key.saturating_add(1));
432              return Some(Ok(ev));
433            }
434            // service-side fan-out exhausted for THIS record. Mark
435            // it done (not just reset the cursor to None, which is ambiguous
436            // with "not started") so a later query event in the same record
437            // can't re-enter and replay the conflict/KAS events.
438            self.answer_service_done = true;
439          }
440
441          // Query-side fan-out (QR=1 only).  emit a
442          // ToQuery for every name/type-compatible active query via
443          // `answer_query_cursor`.  This already applied the answer
444          // to the Query state eagerly in `Endpoint::handle`; the
445          // events emitted here are informational only.
446          if self.is_response {
447            let start = self.answer_query_cursor.unwrap_or(0);
448            let mut found: Option<(usize, RouteEvent<'a>)> = None;
449            for (key, q) in self.endpoint.queries.iter() {
450              if key < start {
451                continue;
452              }
453              if q.is_done() || q.terminal_emitted() {
454                continue;
455              }
456              if names_match_record(q.qname(), &r) && qry_query_accepts(q, &r) {
457                found = Some((
458                  key,
459                  RouteEvent::ToQuery(ToQuery::new(q.handle(), QueryEvent::Answer(r))),
460                ));
461                break;
462              }
463            }
464            if let Some((key, ev)) = found {
465              self.answer_query_cursor = Some(key.saturating_add(1));
466              return Some(Ok(ev));
467            }
468            // Query-side fan-out exhausted for this record.
469            self.answer_query_cursor = None;
470          }
471
472          // Both phases exhausted — record fully processed.  Advance to the
473          // next answer record and reset the per-record fan-out state.
474          self.answer_idx = self.answer_idx.saturating_add(1);
475          self.answer_service_cursor = None;
476          self.answer_service_done = false;
477          self.answer_query_cursor = None;
478          continue;
479        }
480        Section::Authority => {
481          // authority-section records are tentative-probe claims
482          // (RFC 6762 §8.2) — a peer asserting ownership of a name. Routing
483          // them as ProbeConflict / HostConflict forces our service to rename
484          // or surfaces a host conflict, so they MUST come from a trusted mDNS
485          // peer. A genuine prober sends from UDP source port 5353; an
486          // ephemeral-port packet carrying an authority RR for our name is an
487          // off-path / forged artifact — a legacy §6.7 querier sends only
488          // questions, never authority records. (QR=1 responses from non-5353
489          // ports are already fully suppressed upstream; this closes the QR=0
490          // query path, where the Question section has ALREADY been routed
491          // above so legacy unicast repliers are unaffected.)
492          //
493          // accounting rule: suppressing conflict routing for a non-5353
494          // source is a SECTION-LEVEL suppression — the datagram's other
495          // sections (questions, answers, additional) are still processed.
496          // This is NOT a whole-datagram reject, so `packets_dropped` is NOT
497          // bumped here.  Parse errors in the authority section ARE still
498          // counted by the upfront section-validation latch (which walks
499          // authority regardless of source port), so malformed bytes are
500          // always accounted even when conflict routing is suppressed.
501          if self.src.port() != crate::constants::MDNS_PORT {
502            self.section = Section::Additional;
503            continue;
504          }
505          if self.authority_idx >= self.reader.header().authority_count() {
506            self.section = Section::Additional;
507            continue;
508          }
509          let mut auth = self.reader.authority();
510          for _ in 0..self.authority_idx {
511            let _ = auth.next();
512          }
513          let r = match auth.next() {
514            Some(Ok(r)) => r,
515            Some(Err(e)) => {
516              // skip the rest of the authority section after a
517              // parse error so the iterator terminates instead of
518              // looping on the same error indefinitely.
519              // parse_errors was already bumped by the upfront section-
520              // validation latch in Endpoint::handle — do NOT bump again.
521              self.section = Section::Done;
522              return Some(Err(HandleError::Parse(e)));
523            }
524            None => {
525              self.section = Section::Additional;
526              continue;
527            }
528          };
529
530          // route-level TTL=0 guard for the authority section.
531          // covered Section::Answers; the same rationale applies
532          // here.  A TTL=0 authority record is a goodbye/withdrawal,
533          // not a peer claiming the name — emitting ProbeConflict or
534          // HostConflict for it would let a withdrawing peer trigger
535          // our auto-rename or HostConflict surfacing.
536          if r.ttl() == 0 {
537            self.authority_idx = self.authority_idx.saturating_add(1);
538            self.authority_service_cursor = None;
539            continue;
540          }
541
542          // Authority records in mDNS probe messages signal a peer claiming the
543          // same name — route as ProbeConflict / HostConflict to EVERY matching
544          // service (multiple services can share a host) via the shared
545          // conflict helper (name + rtype + class gates centralized there).
546          let start = self.authority_service_cursor.unwrap_or(0);
547          if let Some((key, ev)) = self.next_service_conflict(&r, start) {
548            self.authority_service_cursor = Some(key.saturating_add(1));
549            return Some(Ok(ev));
550          }
551          // No more matching services for this authority record.
552          // Advance to the next authority record.
553          self.authority_idx = self.authority_idx.saturating_add(1);
554          self.authority_service_cursor = None;
555          continue;
556        }
557        Section::Additional => {
558          // additional-section records are supplementary ANSWERS
559          // (DNS-SD SRV/TXT/A/AAAA accompanying a PTR), NOT questions or probe
560          // claims. They fan out to active queries ONLY (QR=1) — never service
561          // conflicts/KAS. Cache population + eager query-state update already
562          // happened in `Endpoint::handle`; these events are informational.
563          if !self.is_response {
564            self.section = Section::Done;
565            continue;
566          }
567          if self.additional_idx >= self.reader.header().additional_count() {
568            self.section = Section::Done;
569            continue;
570          }
571          let mut add = self.reader.additional();
572          for _ in 0..self.additional_idx {
573            let _ = add.next();
574          }
575          let r = match add.next() {
576            Some(Ok(r)) => r,
577            Some(Err(e)) => {
578              // parse_errors was already bumped in the eager walk in
579              // Endpoint::handle (which covers answers AND additionals);
580              // do NOT bump it here to avoid double-counting.
581              self.section = Section::Done;
582              return Some(Err(HandleError::Parse(e)));
583            }
584            None => {
585              self.section = Section::Done;
586              continue;
587            }
588          };
589          // TTL=0 additionals are withdrawals — cache removal already handled
590          // eagerly; do not surface a ghost conflict/answer.
591          if r.ttl() == 0 {
592            self.additional_idx = self.additional_idx.saturating_add(1);
593            self.additional_service_cursor = None;
594            self.additional_service_done = false;
595            self.additional_query_cursor = None;
596            continue;
597          }
598          // service-conflict fan-out FIRST. A QR=1 additional record
599          // can carry a conflicting SRV/TXT (instance) or A/AAAA (host) for one
600          // of our services — DNS-SD responders place these in the Additional
601          // section, so missing them here would let duplicate names survive.
602          // Same unique-record gates as the Answer/Authority sections;
603          // service-type (shared) matches are never conflicts.
604          if !self.additional_service_done {
605            let start = self.additional_service_cursor.unwrap_or(0);
606            if let Some((key, ev)) = self.next_service_conflict(&r, start) {
607              self.additional_service_cursor = Some(key.saturating_add(1));
608              return Some(Ok(ev));
609            }
610            // mark the service phase done for this record so a later
611            // query event can't re-enter and replay the conflict events.
612            self.additional_service_done = true;
613          }
614          // Then query fan-out (informational; eager state update already done).
615          let start = self.additional_query_cursor.unwrap_or(0);
616          let mut found: Option<(usize, RouteEvent<'a>)> = None;
617          for (key, q) in self.endpoint.queries.iter() {
618            if key < start {
619              continue;
620            }
621            if q.is_done() || q.terminal_emitted() {
622              continue;
623            }
624            if names_match_record(q.qname(), &r) && qry_query_accepts(q, &r) {
625              found = Some((
626                key,
627                RouteEvent::ToQuery(ToQuery::new(q.handle(), QueryEvent::Answer(r))),
628              ));
629              break;
630            }
631          }
632          if let Some((key, ev)) = found {
633            self.additional_query_cursor = Some(key.saturating_add(1));
634            return Some(Ok(ev));
635          }
636          // Both fan-outs exhausted for this additional record; advance and
637          // reset the per-record fan-out state.
638          self.additional_idx = self.additional_idx.saturating_add(1);
639          self.additional_service_cursor = None;
640          self.additional_service_done = false;
641          self.additional_query_cursor = None;
642          continue;
643        }
644        Section::Done => return None,
645      }
646    }
647  }
648}