#![allow(warnings)]
use core::time::Duration;
use super::*;
use crate::{
Name, ServiceHandle,
event::{KnownAnswer, ProbeConflict, ServiceEvent},
records::ServiceRecords,
wire::Ref,
};
use std::{borrow::ToOwned as _, string::ToString as _};
#[derive(Debug, Copy, Clone, Eq, PartialEq, Ord, PartialOrd)]
struct FakeInstant(u64);
impl FakeInstant {
fn zero() -> Self {
Self(0)
}
fn advance(self, ms: u64) -> Self {
Self(self.0 + ms)
}
}
impl crate::Instant for FakeInstant {
fn checked_add_duration(self, dur: Duration) -> Option<Self> {
let ms = dur.as_millis();
u64::try_from(ms)
.ok()
.and_then(|m| self.0.checked_add(m))
.map(Self)
}
fn checked_duration_since(self, earlier: Self) -> Option<Duration> {
self.0.checked_sub(earlier.0).map(Duration::from_millis)
}
}
fn make_records(ttl_secs: u32) -> ServiceRecords {
let stype = Name::try_from_str("_ipp._tcp.local.").unwrap();
let inst = Name::try_from_str("myprinter._ipp._tcp.local.").unwrap();
let host = Name::try_from_str("host.local.").unwrap();
let mut r = ServiceRecords::new(stype, inst, host, 631, ttl_secs);
r.add_a(core::net::Ipv4Addr::new(192, 168, 1, 10));
r
}
#[test]
fn non_probing_service_announces_without_probing() {
let records = make_records(120);
let mut svc: Service<FakeInstant, slab::Slab<Transmit>, slab::Slab<ServiceUpdate>> =
Service::try_new(
ServiceHandle::from_raw(0),
records,
FakeInstant::zero(),
[0u8; 32],
false, );
assert!(
matches!(svc.state(), ServiceState::Announcing(_)),
"non-probing service must start in Announcing, got {:?}",
svc.state()
);
let mut buf = std::vec![0u8; 4096];
let mut now = FakeInstant::zero();
let mut ever_probed = false;
for _ in 0..20 {
now = now.advance(500);
svc.handle_timeout(now).unwrap();
if matches!(svc.state(), ServiceState::Probing(_)) {
ever_probed = true;
}
if let Ok(Some(_)) = svc.poll_transmit(now, &mut buf) {
svc.note_transmit_delivered(now);
}
if svc.state() == ServiceState::Established {
break;
}
}
assert!(!ever_probed, "non-probing service must never enter Probing");
assert_eq!(
svc.state(),
ServiceState::Established,
"non-probing service must reach Established"
);
assert!(
svc.advertises_host(),
"having announced, the non-probing service advertises its host records"
);
}
fn make_service(
ttl_secs: u32,
) -> Service<FakeInstant, slab::Slab<Transmit>, slab::Slab<ServiceUpdate>> {
let handle = ServiceHandle::from_raw(0);
let records = make_records(ttl_secs);
Service::try_new(handle, records, FakeInstant::zero(), [0u8; 32], true)
}
impl GoodbyeOwnership {
fn mark_instance(&mut self) {
self.ptr = true;
self.srv = true;
self.txt = true;
}
}
fn make_a_record_ref(buf: &mut std::vec::Vec<u8>, name_str: &str, ttl: u32, addr: [u8; 4]) {
buf.clear();
for label in name_str.trim_end_matches('.').split('.') {
buf.push(label.len() as u8);
buf.extend_from_slice(label.as_bytes());
}
buf.push(0u8);
buf.extend_from_slice(&1u16.to_be_bytes()); buf.extend_from_slice(&1u16.to_be_bytes()); buf.extend_from_slice(&ttl.to_be_bytes());
buf.extend_from_slice(&4u16.to_be_bytes()); buf.extend_from_slice(&addr);
}
fn make_srv_record_ref(
buf: &mut std::vec::Vec<u8>,
owner_str: &str,
ttl: u32,
priority: u16,
weight: u16,
port: u16,
target_str: &str,
) {
buf.clear();
for label in owner_str.trim_end_matches('.').split('.') {
buf.push(label.len() as u8);
buf.extend_from_slice(label.as_bytes());
}
buf.push(0u8);
buf.extend_from_slice(&33u16.to_be_bytes()); buf.extend_from_slice(&1u16.to_be_bytes()); buf.extend_from_slice(&ttl.to_be_bytes());
let mut rdata: std::vec::Vec<u8> = std::vec::Vec::new();
rdata.extend_from_slice(&priority.to_be_bytes());
rdata.extend_from_slice(&weight.to_be_bytes());
rdata.extend_from_slice(&port.to_be_bytes());
for label in target_str.trim_end_matches('.').split('.') {
rdata.push(label.len() as u8);
rdata.extend_from_slice(label.as_bytes());
}
rdata.push(0u8);
#[allow(clippy::cast_possible_truncation)]
buf.extend_from_slice(&(rdata.len() as u16).to_be_bytes()); buf.extend_from_slice(&rdata);
}
fn make_txt_record_ref(buf: &mut std::vec::Vec<u8>, owner_str: &str, ttl: u32, segments: &[&[u8]]) {
buf.clear();
for label in owner_str.trim_end_matches('.').split('.') {
buf.push(label.len() as u8);
buf.extend_from_slice(label.as_bytes());
}
buf.push(0u8);
buf.extend_from_slice(&16u16.to_be_bytes()); buf.extend_from_slice(&1u16.to_be_bytes()); buf.extend_from_slice(&ttl.to_be_bytes());
let mut rdata: std::vec::Vec<u8> = std::vec::Vec::new();
for seg in segments {
#[allow(clippy::cast_possible_truncation)]
rdata.push(seg.len() as u8);
rdata.extend_from_slice(seg);
}
#[allow(clippy::cast_possible_truncation)]
buf.extend_from_slice(&(rdata.len() as u16).to_be_bytes()); buf.extend_from_slice(&rdata);
}
#[test]
fn service_resumes_probing_after_rename() {
let mut svc = make_service(120);
let t0 = FakeInstant::zero();
svc.handle_timeout(t0).unwrap();
assert!(
svc.last_now.is_some(),
"last_now should be set after first handle_timeout"
);
let mut buf: std::vec::Vec<u8> = std::vec::Vec::new();
make_srv_record_ref(
&mut buf,
"myprinter._ipp._tcp.local.",
120,
0, 0, 9999, "host.local.",
);
let (record_ref, _) = Ref::try_parse(&buf, 0).unwrap();
let peer_src_a: core::net::SocketAddr = "192.168.1.99:5353".parse().unwrap();
let conflict = ProbeConflict::new(peer_src_a, record_ref);
svc.handle_event(ServiceEvent::ProbeConflict(conflict), t0);
assert!(
svc.tiebreak_pending,
"tiebreak_pending must be set after ProbeConflict"
);
assert_eq!(
svc.peer_probes.len(),
1,
"one peer probe bucket must be created"
);
let t1 = t0.advance(500);
svc.handle_timeout(t1).unwrap();
assert_eq!(
svc.state(),
ServiceState::Init,
"state must return to Init after tiebreak rename"
);
assert!(
svc.lifecycle_deadline.is_some(),
"lifecycle_deadline must be scheduled after rename so probing resumes"
);
assert!(
svc.pending_transmits.iter().all(|s| s.is_none()),
"pending_transmits must be cleared on rename to avoid re-sending a stale probe"
);
assert!(
svc.response_deadline.is_none(),
"response_deadline must be cleared on rename"
);
assert!(
svc.name().as_str().contains("-1"),
"instance name should include a rename suffix: got {}",
svc.name().as_str()
);
let t2 = t1.advance(500);
svc.handle_timeout(t2).unwrap();
assert!(
matches!(svc.state(), ServiceState::Probing(_) | ServiceState::Init),
"service should be Probing or Init (if probe delay scheduled) after second tick; got {:?}",
svc.state()
);
let t3 = t2.advance(500);
svc.handle_timeout(t3).unwrap();
assert!(
svc.state().is_probing(),
"service must be Probing after rename + two handle_timeout ticks; got {:?}",
svc.state()
);
}
fn inject_question_to_set_response_deadline(
svc: &mut Service<FakeInstant, slab::Slab<Transmit>, slab::Slab<ServiceUpdate>>,
now: FakeInstant,
) {
use crate::{event::ServiceQuestion, wire::QuestionRef};
let mut qbuf: std::vec::Vec<u8> = std::vec::Vec::new();
for label in "_ipp._tcp.local.".trim_end_matches('.').split('.') {
qbuf.push(label.len() as u8);
qbuf.extend_from_slice(label.as_bytes());
}
qbuf.push(0u8);
qbuf.extend_from_slice(&12u16.to_be_bytes()); qbuf.extend_from_slice(&1u16.to_be_bytes()); let (qref, _) = QuestionRef::try_parse(&qbuf, 0).unwrap();
let src: core::net::SocketAddr = "0.0.0.0:5353".parse().unwrap();
svc.handle_event(
ServiceEvent::Question(ServiceQuestion::new(qref, src, 0)),
now,
);
}
#[test]
fn kas_does_not_suppress_below_half_ttl() {
let our_ttl: u32 = 120;
let mut svc = make_service(our_ttl);
let now = drive_to_established(&mut svc);
inject_question_to_set_response_deadline(&mut svc, now);
let querier_ttl: u32 = 30;
let mut buf: std::vec::Vec<u8> = std::vec::Vec::new();
make_a_record_ref(&mut buf, "host.local.", querier_ttl, [192, 168, 1, 10]);
let (record_ref, _) = Ref::try_parse(&buf, 0).unwrap();
let ka = KnownAnswer::new("0.0.0.0:5353".parse().unwrap(), record_ref);
svc.handle_event(ServiceEvent::KnownAnswer(ka), now);
let hint_count = svc.kas_hints.iter().filter(|s| s.is_some()).count();
assert_eq!(
hint_count, 0,
"KAS hint with querier TTL {querier_ttl} < half of our TTL {our_ttl} must be dropped; \
found {hint_count} hint(s) stored"
);
}
#[test]
fn kas_suppresses_at_or_above_half_ttl() {
let our_ttl: u32 = 120;
let mut svc = make_service(our_ttl);
let now = drive_to_established(&mut svc);
inject_question_to_set_response_deadline(&mut svc, now);
let querier_ttl: u32 = 60;
let mut buf: std::vec::Vec<u8> = std::vec::Vec::new();
make_a_record_ref(&mut buf, "host.local.", querier_ttl, [192, 168, 1, 10]);
let (record_ref, _) = Ref::try_parse(&buf, 0).unwrap();
let ka = KnownAnswer::new("0.0.0.0:5353".parse().unwrap(), record_ref);
svc.handle_event(ServiceEvent::KnownAnswer(ka), now);
let hint_count = svc.kas_hints.iter().filter(|s| s.is_some()).count();
assert_eq!(
hint_count, 1,
"KAS hint with querier TTL {querier_ttl} == half of our TTL {our_ttl} should be stored; \
found {hint_count} hint(s)"
);
}
#[test]
fn kas_wrong_class_known_answer_does_not_suppress() {
let mut svc = make_service(120);
let now = drive_to_established(&mut svc);
inject_question_to_set_response_deadline(&mut svc, now);
let mut buf: std::vec::Vec<u8> = std::vec::Vec::new();
for label in "host.local.".trim_end_matches('.').split('.') {
buf.push(label.len() as u8);
buf.extend_from_slice(label.as_bytes());
}
buf.push(0u8);
buf.extend_from_slice(&1u16.to_be_bytes()); buf.extend_from_slice(&255u16.to_be_bytes()); buf.extend_from_slice(&60u32.to_be_bytes()); buf.extend_from_slice(&4u16.to_be_bytes()); buf.extend_from_slice(&[192, 168, 1, 10]);
let (record_ref, _) = Ref::try_parse(&buf, 0).unwrap();
let ka = KnownAnswer::new("0.0.0.0:5353".parse().unwrap(), record_ref);
svc.handle_event(ServiceEvent::KnownAnswer(ka), now);
let hint_count = svc.kas_hints.iter().filter(|s| s.is_some()).count();
assert_eq!(
hint_count, 0,
"a CLASS=ANY known-answer must NOT be stored as a KAS suppressor; found {hint_count}"
);
}
fn drive_to_established(
svc: &mut Service<FakeInstant, slab::Slab<Transmit>, slab::Slab<ServiceUpdate>>,
) -> FakeInstant {
let mut buf = std::vec![0u8; 4096];
let mut now = FakeInstant::zero();
for _ in 0..20 {
now = now.advance(500);
svc.handle_timeout(now).unwrap();
if let Ok(Some(_)) = svc.poll_transmit(now, &mut buf) {
svc.note_transmit_delivered(now);
}
if svc.state() == ServiceState::Established {
return now;
}
}
panic!(
"service did not reach Established within 20 ticks; state={:?}",
svc.state()
);
}
#[test]
fn empty_txt_encodes_as_single_zero_length_string() {
let mut svc = make_service(120);
let mut buf = std::vec![0u8; 4096];
let mut now = FakeInstant::zero();
let mut txt_rdata: Option<std::vec::Vec<u8>> = None;
'drive: for _ in 0..20 {
now = now.advance(500);
svc.handle_timeout(now).unwrap();
while let Ok(Some(tx)) = svc.poll_transmit(now, &mut buf) {
let reader = crate::wire::MessageReader::try_parse(buf.get(..tx.size()).unwrap()).unwrap();
for rec in reader.answers() {
let rec = rec.unwrap();
if rec.rtype() == crate::wire::ResourceType::Txt {
txt_rdata = Some(rec.rdata().to_vec());
break 'drive;
}
}
svc.note_transmit_result(now, true);
}
}
assert_eq!(
txt_rdata.as_deref(),
Some(&[0u8][..]),
"an empty TXT record must encode as a single zero-length string (one 0x00 byte)"
);
}
#[test]
fn rename_handoff_withdraws_only_advertised_instance_records() {
let mut svc = make_service(120);
svc.handle_timeout(FakeInstant::zero()).unwrap(); svc.goodbye.ptr = true;
let mut sbuf: std::vec::Vec<u8> = std::vec::Vec::new();
make_srv_record_ref(
&mut sbuf,
"myprinter._ipp._tcp.local.",
120,
0,
0,
9999,
"host.local.",
);
let (rec, _) = Ref::try_parse(&sbuf, 0).unwrap();
let peer: core::net::SocketAddr = "192.168.1.200:5353".parse().unwrap();
svc.handle_event(
ServiceEvent::ProbeConflict(ProbeConflict::new(peer, rec)),
FakeInstant::zero(),
);
let now = FakeInstant::zero().advance(500);
svc.handle_timeout(now).unwrap();
assert!(
svc.name().as_str().contains("-1"),
"service should have renamed"
);
let RenameGoodbyeHandoff {
owned: old_owned, ..
} = svc
.take_rename_goodbye_handoff()
.expect("a rename of an (PTR-)announced service must hand off the old-name goodbye");
assert!(
old_owned.ptr(),
"the advertised PTR is in the handoff ownership"
);
assert!(
!old_owned.srv() && !old_owned.txt(),
"the KAS-suppressed SRV/TXT must NOT be in the handoff ownership"
);
assert!(
old_owned.a_slice().is_empty() && old_owned.aaaa_slice().is_empty(),
"a rename handoff is instance-only — never host A/AAAA"
);
}
#[test]
fn advertised_host_addrs_are_the_emitted_subset_not_configured() {
let stype = Name::try_from_str("_ipp._tcp.local.").unwrap();
let inst = Name::try_from_str("p._ipp._tcp.local.").unwrap();
let host = Name::try_from_str("h.local.").unwrap();
let mut records = ServiceRecords::new(stype, inst, host, 631, 120);
let a1 = core::net::Ipv4Addr::new(10, 0, 0, 2);
let a2 = core::net::Ipv4Addr::new(10, 0, 0, 3);
records.add_a(a1);
records.add_a(a2);
let mut svc: Service<FakeInstant, slab::Slab<Transmit>, slab::Slab<ServiceUpdate>> =
Service::try_new(
ServiceHandle::from_raw(0),
records,
FakeInstant::zero(),
[0u8; 32],
true,
);
assert!(
svc.advertised_a_addrs().is_empty(),
"nothing advertised before any confirmed send"
);
svc.goodbye.record_emitted(&respond::EmittedRecords::new(
false,
false,
false,
std::vec![a2],
std::vec::Vec::new(),
false,
));
assert_eq!(
svc.advertised_a_addrs(),
[a2],
"only the emitted address is advertised"
);
assert_eq!(
svc.records().a_addrs_slice(),
[a1, a2],
"the configured set still has both — advertised must NOT equal configured"
);
}
#[test]
fn announce_guards_latch_only_on_confirmed_delivery() {
let mut svc = make_service(120);
let mut buf = std::vec![0u8; 4096];
let mut now = FakeInstant::zero();
let mut held_unconfirmed_announcement = false;
'drive: for _ in 0..20 {
now = now.advance(500);
svc.handle_timeout(now).unwrap();
while let Ok(Some(_)) = svc.poll_transmit(now, &mut buf) {
if matches!(svc.awaiting_confirm, Some(AwaitingConfirm::Announcement(_))) {
held_unconfirmed_announcement = true;
break 'drive;
}
svc.note_transmit_result(now, true);
}
}
assert!(
held_unconfirmed_announcement,
"service should have produced an announcement within 20 ticks"
);
assert!(
!svc.advertises_host(),
"host ownership must NOT latch until a send is confirmed"
);
svc.note_transmit_delivered(now);
assert!(
svc.advertises_host(),
"host ownership must latch on confirmed delivery"
);
}
#[test]
fn announce_phase_does_not_advance_without_confirmed_send() {
let mut svc = make_service(120);
let mut buf = std::vec![0u8; 4096];
let mut now = FakeInstant::zero();
for _ in 0..20 {
now = now.advance(500);
svc.handle_timeout(now).unwrap();
while let Ok(Some(_)) = svc.poll_transmit(now, &mut buf) {
svc.note_transmit_result(now, true);
}
if matches!(svc.state(), ServiceState::Announcing(0)) {
break;
}
}
assert!(
matches!(svc.state(), ServiceState::Announcing(0)),
"should reach Announcing(0); got {:?}",
svc.state()
);
for _ in 0..10 {
now = now.advance(1000);
svc.handle_timeout(now).unwrap();
assert!(
svc.poll_transmit(now, &mut buf).unwrap().is_some(),
"an announcement must be (re)emitted each cycle while unconfirmed"
);
svc.note_transmit_result(now, false); assert!(
matches!(svc.state(), ServiceState::Announcing(0)),
"phase must NOT advance without a confirmed send; got {:?}",
svc.state()
);
}
let mut saw_established = false;
while let Some(u) = svc.poll() {
if matches!(u, ServiceUpdate::Established) {
saw_established = true;
}
}
assert!(
!saw_established,
"Established must NOT be emitted while no announcement was confirmed"
);
now = now.advance(1000);
svc.handle_timeout(now).unwrap();
assert!(svc.poll_transmit(now, &mut buf).unwrap().is_some());
svc.note_transmit_delivered(now);
assert!(
matches!(svc.state(), ServiceState::Announcing(1)),
"phase advances on the first confirmed announcement; got {:?}",
svc.state()
);
}
#[test]
fn probe_sequence_does_not_advance_without_confirmed_send() {
let mut svc = make_service(120);
let mut buf = std::vec![0u8; 4096];
let mut now = FakeInstant::zero();
let mut probes_emitted = 0usize;
for _ in 0..50 {
now = now.advance(500);
svc.handle_timeout(now).unwrap();
while let Ok(Some(_)) = svc.poll_transmit(now, &mut buf) {
if matches!(svc.awaiting_confirm, Some(AwaitingConfirm::Probe)) {
probes_emitted += 1;
}
svc.note_transmit_result(now, false);
}
assert!(
matches!(svc.state(), ServiceState::Init | ServiceState::Probing(_)),
"a service whose probes never reach the link must not leave probing; got {:?}",
svc.state()
);
}
assert!(
probes_emitted >= 3,
"the probe must be RETRIED (re-emitted) when its send is never confirmed; emitted {probes_emitted}"
);
assert!(
matches!(svc.state(), ServiceState::Probing(0)),
"with no confirmed probe the service stays at the first probe; got {:?}",
svc.state()
);
assert!(
!svc.advertises_host(),
"an un-probed service must never latch host advertisement (no goodbye ownership)"
);
now = now.advance(500);
svc.handle_timeout(now).unwrap();
assert!(svc.poll_transmit(now, &mut buf).unwrap().is_some());
svc.note_transmit_result(now, true);
assert!(
matches!(svc.state(), ServiceState::Probing(1)),
"a confirmed probe advances the sequence; got {:?}",
svc.state()
);
}
#[test]
fn no_goodbye_after_final_probe_before_first_announcement() {
let mut svc = make_service(120);
let mut buf = std::vec![0u8; 4096];
let mut now = FakeInstant::zero();
let mut reached = false;
for _ in 0..20 {
now = now.advance(500);
svc.handle_timeout(now).unwrap();
if svc.state() == ServiceState::Announcing(0) {
reached = true;
break;
}
while let Ok(Some(_)) = svc.poll_transmit(now, &mut buf) {
svc.note_transmit_result(now, true);
}
}
assert!(
reached,
"service should reach Announcing(0) within 20 ticks"
);
let snap = svc.withdrawal_snapshot();
assert!(
!snap.owned.ptr()
&& !snap.owned.srv()
&& !snap.owned.txt()
&& !snap.owned.subtypes()
&& snap.host_a.is_empty()
&& snap.host_aaaa.is_empty(),
"no goodbye until an announcement has actually been emitted"
);
}
#[test]
fn delivered_response_before_first_announcement_latches_goodbye_ownership() {
use crate::{event::ServiceQuestion, wire::QuestionRef};
let mut svc = make_service(120);
let mut buf = std::vec![0u8; 4096];
let mut now = FakeInstant::zero();
'drive: for _ in 0..20 {
now = now.advance(500);
svc.handle_timeout(now).unwrap();
while let Ok(Some(_)) = svc.poll_transmit(now, &mut buf) {
svc.note_transmit_result(now, true);
if matches!(svc.state(), ServiceState::Announcing(0)) {
break 'drive;
}
}
}
assert!(matches!(svc.state(), ServiceState::Announcing(0)));
assert!(
!svc.advertises_host() && !svc.goodbye.any_instance(),
"precondition: nothing advertised/withdrawable before any send"
);
let legacy_src: core::net::SocketAddr = "192.0.2.9:40000".parse().unwrap();
let mut qbuf: std::vec::Vec<u8> = std::vec::Vec::new();
for label in "_ipp._tcp.local.".trim_end_matches('.').split('.') {
qbuf.push(label.len() as u8);
qbuf.extend_from_slice(label.as_bytes());
}
qbuf.push(0u8);
qbuf.extend_from_slice(&12u16.to_be_bytes()); qbuf.extend_from_slice(&1u16.to_be_bytes()); let (qref, _) = QuestionRef::try_parse(&qbuf, 0).unwrap();
svc.handle_event(
ServiceEvent::Question(ServiceQuestion::new(qref, legacy_src, 0x4242)),
now,
);
let tx = svc
.poll_transmit(now, &mut buf)
.unwrap()
.expect("a legacy reply should be emitted");
assert_eq!(
tx.dst(),
legacy_src,
"legacy reply is unicast to the querier"
);
match &svc.awaiting_confirm {
Some(AwaitingConfirm::Response(e, _)) => assert!(
e.ptr() && e.srv() && e.txt() && !e.a_slice().is_empty(),
"a legacy reply emits all instance records plus the host A"
),
other => panic!("expected a Response commit token, got {other:?}"),
}
assert!(matches!(svc.state(), ServiceState::Announcing(0)));
svc.note_transmit_result(now, true);
assert!(
svc.goodbye.any_instance(),
"a delivered legacy reply latches the instance records it emitted"
);
assert!(
svc.goodbye.any_host(),
"the legacy reply also emitted the host A, so host ownership latches"
);
assert!(
matches!(svc.state(), ServiceState::Announcing(0)),
"a response must NOT advance the announce phase; got {:?}",
svc.state()
);
}
#[test]
fn legacy_a_query_reply_latches_full_set() {
use crate::{event::ServiceQuestion, wire::QuestionRef};
let mut svc = make_service(120);
let mut buf = std::vec![0u8; 4096];
let mut now = FakeInstant::zero();
'drive: for _ in 0..20 {
now = now.advance(500);
svc.handle_timeout(now).unwrap();
while let Ok(Some(_)) = svc.poll_transmit(now, &mut buf) {
svc.note_transmit_result(now, true);
if matches!(svc.state(), ServiceState::Announcing(0)) {
break 'drive;
}
}
}
assert!(matches!(svc.state(), ServiceState::Announcing(0)));
let legacy_src: core::net::SocketAddr = "192.0.2.9:40000".parse().unwrap();
let host_str = svc.records.host().as_str().to_string();
let mut qbuf: std::vec::Vec<u8> = std::vec::Vec::new();
for label in host_str.trim_end_matches('.').split('.') {
qbuf.push(label.len() as u8);
qbuf.extend_from_slice(label.as_bytes());
}
qbuf.push(0u8);
qbuf.extend_from_slice(&1u16.to_be_bytes()); qbuf.extend_from_slice(&1u16.to_be_bytes()); let (qref, _) = QuestionRef::try_parse(&qbuf, 0).unwrap();
svc.handle_event(
ServiceEvent::Question(ServiceQuestion::new(qref, legacy_src, 0x55)),
now,
);
svc
.poll_transmit(now, &mut buf)
.unwrap()
.expect("a legacy A reply should be emitted");
match &svc.awaiting_confirm {
Some(AwaitingConfirm::Response(e, _)) => assert!(
e.ptr() && e.srv() && e.txt() && !e.a_slice().is_empty(),
"an A-query legacy reply still emits the instance records and the host A"
),
other => panic!("expected a Response commit token, got {other:?}"),
}
svc.note_transmit_result(now, true);
assert!(
svc.goodbye.any_host(),
"a host A reply latches host ownership"
);
assert!(
svc.goodbye.any_instance(),
"the full legacy reply also emitted the instance records, so they latch too"
);
}
#[test]
fn goodbye_ownership_accumulates_and_resets_instance_only() {
let ip = core::net::Ipv4Addr::new(192, 168, 1, 10);
let mut g = GoodbyeOwnership::default();
assert!(!g.any_instance() && !g.any_host());
g.record_emitted(&respond::EmittedRecords::new(
true,
false,
true,
std::vec::Vec::new(),
std::vec::Vec::new(),
false,
));
assert!(
g.ptr && !g.srv && g.txt,
"only the emitted instance records latch"
);
assert!(g.any_instance() && !g.any_host());
g.record_emitted(&respond::EmittedRecords::new(
false,
false,
false,
std::vec![ip],
std::vec::Vec::new(),
false,
));
assert!(
g.any_instance() && g.any_host(),
"records accumulate independently"
);
assert_eq!(g.a, [ip], "the emitted address is tracked");
g.record_emitted(&respond::EmittedRecords::new(
false,
false,
false,
std::vec![ip],
std::vec::Vec::new(),
false,
));
assert_eq!(g.a, [ip], "duplicate address emit is idempotent");
g.reset_instance(); assert!(
!g.any_instance() && g.any_host(),
"rename drops instance records but the host name is unchanged"
);
assert_eq!(g.a, [ip], "host addresses survive the rename");
}
fn response_dst_for(qclass_raw: u16, src: core::net::SocketAddr) -> core::net::SocketAddr {
use crate::{event::ServiceQuestion, wire::QuestionRef};
let mut svc = make_service(120);
let mut now = drive_to_established(&mut svc);
let mut buf = std::vec![0u8; 4096];
let mut qbuf: std::vec::Vec<u8> = std::vec::Vec::new();
for label in "_ipp._tcp.local.".trim_end_matches('.').split('.') {
qbuf.push(label.len() as u8);
qbuf.extend_from_slice(label.as_bytes());
}
qbuf.push(0u8);
qbuf.extend_from_slice(&12u16.to_be_bytes()); qbuf.extend_from_slice(&qclass_raw.to_be_bytes());
let (qref, _) = QuestionRef::try_parse(&qbuf, 0).unwrap();
svc.handle_event(
ServiceEvent::Question(ServiceQuestion::new(qref, src, 0)),
now,
);
now = now.advance(200); svc.handle_timeout(now).unwrap();
match svc.poll_transmit(now, &mut buf).unwrap() {
Some(t) => t.dst(),
None => panic!("expected a response transmit"),
}
}
#[test]
fn unicast_response_routing() {
let legacy: core::net::SocketAddr = "192.0.2.5:40000".parse().unwrap();
assert_eq!(response_dst_for(0x0001, legacy), legacy);
let qu: core::net::SocketAddr = "192.0.2.6:5353".parse().unwrap();
assert_eq!(response_dst_for(0x8001, qu), respond::multicast_dst());
let qm: core::net::SocketAddr = "192.0.2.7:5353".parse().unwrap();
assert_eq!(response_dst_for(0x0001, qm), respond::multicast_dst());
}
#[test]
fn truncated_query_delays_response_to_400_500ms() {
use crate::{event::ServiceQuestion, wire::QuestionRef};
let mut svc = make_service(120);
let now = drive_to_established(&mut svc);
let mut buf = std::vec![0u8; 4096];
let mut qbuf: std::vec::Vec<u8> = std::vec::Vec::new();
for label in "_ipp._tcp.local.".trim_end_matches('.').split('.') {
qbuf.push(label.len() as u8);
qbuf.extend_from_slice(label.as_bytes());
}
qbuf.push(0u8);
qbuf.extend_from_slice(&12u16.to_be_bytes()); qbuf.extend_from_slice(&1u16.to_be_bytes()); let (qref, _) = QuestionRef::try_parse(&qbuf, 0).unwrap();
let src: core::net::SocketAddr = "192.0.2.7:5353".parse().unwrap();
svc.handle_event(
ServiceEvent::Question(ServiceQuestion::new(qref, src, 0).with_truncated(true)),
now,
);
let t200 = now.advance(200);
svc.handle_timeout(t200).unwrap();
assert!(
svc.poll_transmit(t200, &mut buf).unwrap().is_none(),
"§7.2: a TC-bit response must not fire within the normal 20–120 ms window"
);
let t500 = now.advance(500);
svc.handle_timeout(t500).unwrap();
assert!(
svc.poll_transmit(t500, &mut buf).unwrap().is_some(),
"§7.2: the delayed TC-bit response must fire by 500 ms"
);
}
#[test]
fn truncated_then_normal_question_coalesces_to_earliest_deadline() {
use crate::{event::ServiceQuestion, wire::QuestionRef};
let mut svc = make_service(120);
let now = drive_to_established(&mut svc);
let mut buf = std::vec![0u8; 4096];
let mut qbuf: std::vec::Vec<u8> = std::vec::Vec::new();
for label in "_ipp._tcp.local.".trim_end_matches('.').split('.') {
qbuf.push(label.len() as u8);
qbuf.extend_from_slice(label.as_bytes());
}
qbuf.push(0u8);
qbuf.extend_from_slice(&12u16.to_be_bytes()); qbuf.extend_from_slice(&1u16.to_be_bytes()); let (qref, _) = QuestionRef::try_parse(&qbuf, 0).unwrap();
let src: core::net::SocketAddr = "192.0.2.7:5353".parse().unwrap();
svc.handle_event(
ServiceEvent::Question(ServiceQuestion::new(qref, src, 0).with_truncated(true)),
now,
);
svc.handle_event(
ServiceEvent::Question(ServiceQuestion::new(qref, src, 0)),
now,
);
let t200 = now.advance(200);
svc.handle_timeout(t200).unwrap();
assert!(
svc.poll_transmit(t200, &mut buf).unwrap().is_some(),
"coalesced response must fire in the normal 20–120 ms window, not wait for TC"
);
}
#[test]
fn truncated_meta_query_delays_reply_to_400_500ms() {
use crate::{event::ServiceQuestion, wire::QuestionRef};
let mut svc = make_service(120);
let now = drive_to_established(&mut svc);
let mut buf = std::vec![0u8; 4096];
let mut qbuf: std::vec::Vec<u8> = std::vec::Vec::new();
for label in "_services._dns-sd._udp.local."
.trim_end_matches('.')
.split('.')
{
qbuf.push(label.len() as u8);
qbuf.extend_from_slice(label.as_bytes());
}
qbuf.push(0u8);
qbuf.extend_from_slice(&12u16.to_be_bytes()); qbuf.extend_from_slice(&1u16.to_be_bytes()); let (qref, _) = QuestionRef::try_parse(&qbuf, 0).unwrap();
let src: core::net::SocketAddr = "192.0.2.7:5353".parse().unwrap();
svc.handle_event(
ServiceEvent::Question(ServiceQuestion::new(qref, src, 0).with_truncated(true)),
now,
);
let t200 = now.advance(200);
svc.handle_timeout(t200).unwrap();
assert!(
svc.poll_transmit(t200, &mut buf).unwrap().is_none(),
"§7.2: a TC meta-query reply must not fire within 20–120 ms"
);
let t500 = now.advance(500);
svc.handle_timeout(t500).unwrap();
assert!(
svc.poll_transmit(t500, &mut buf).unwrap().is_some(),
"§7.2: the delayed TC meta-query reply must fire by 500 ms"
);
}
fn meta_reply_fires(with_ka: bool, ka_from_questioner: bool, ka_ttl: u32) -> bool {
use crate::{
event::{KnownAnswer, ServiceQuestion},
wire::{QuestionRef, Ref},
};
let mut svc = make_service(120); let now = drive_to_established(&mut svc);
let qsrc: core::net::SocketAddr = "192.0.2.7:5353".parse().unwrap();
let mut qbuf: std::vec::Vec<u8> = std::vec::Vec::new();
for label in "_services._dns-sd._udp.local."
.trim_end_matches('.')
.split('.')
{
qbuf.push(label.len() as u8);
qbuf.extend_from_slice(label.as_bytes());
}
qbuf.push(0u8);
qbuf.extend_from_slice(&12u16.to_be_bytes()); qbuf.extend_from_slice(&1u16.to_be_bytes()); let (qref, _) = QuestionRef::try_parse(&qbuf, 0).unwrap();
svc.handle_event(
ServiceEvent::Question(ServiceQuestion::new(qref, qsrc, 0)),
now,
);
let mut kbuf: std::vec::Vec<u8> = std::vec::Vec::new();
if with_ka {
for label in "_services._dns-sd._udp.local."
.trim_end_matches('.')
.split('.')
{
kbuf.push(label.len() as u8);
kbuf.extend_from_slice(label.as_bytes());
}
kbuf.push(0u8);
kbuf.extend_from_slice(&12u16.to_be_bytes()); kbuf.extend_from_slice(&1u16.to_be_bytes()); kbuf.extend_from_slice(&ka_ttl.to_be_bytes());
let mut rdata: std::vec::Vec<u8> = std::vec::Vec::new();
for label in "_ipp._tcp.local.".trim_end_matches('.').split('.') {
rdata.push(label.len() as u8);
rdata.extend_from_slice(label.as_bytes());
}
rdata.push(0u8);
kbuf.extend_from_slice(&(rdata.len() as u16).to_be_bytes());
kbuf.extend_from_slice(&rdata);
let ka_src: core::net::SocketAddr = if ka_from_questioner {
qsrc
} else {
"192.0.2.99:5353".parse().unwrap()
};
let (rref, _) = Ref::try_parse(&kbuf, 0).unwrap();
svc.handle_event(
ServiceEvent::KnownAnswer(KnownAnswer::new(ka_src, rref)),
now,
);
}
let t = now.advance(200); svc.handle_timeout(t).unwrap();
let mut buf = std::vec![0u8; 4096];
svc.poll_transmit(t, &mut buf).unwrap().is_some()
}
#[test]
fn meta_query_known_answer_suppression() {
assert!(
meta_reply_fires(false, false, 0),
"baseline: a meta-query with no known-answer must elicit our meta-PTR reply"
);
assert!(
!meta_reply_fires(true, true, 120),
"§7.1: a meta questioner already holding our service-type PTR suppresses the reply"
);
assert!(
meta_reply_fires(true, false, 120),
"the meta known-answer must come from a meta questioner source"
);
assert!(
meta_reply_fires(true, true, 10),
"§7.1 half-TTL: a low-TTL known-answer must NOT suppress (our TTL 120, half 60)"
);
}
#[test]
fn meta_kas_not_suppressed_when_multiple_meta_questioners() {
use crate::{
event::{KnownAnswer, ServiceQuestion},
wire::{QuestionRef, Ref},
};
let mut svc = make_service(120);
let now = drive_to_established(&mut svc);
let src_a: core::net::SocketAddr = "192.0.2.7:5353".parse().unwrap();
let src_b: core::net::SocketAddr = "192.0.2.8:5353".parse().unwrap();
let mut qbuf: std::vec::Vec<u8> = std::vec::Vec::new();
for label in "_services._dns-sd._udp.local."
.trim_end_matches('.')
.split('.')
{
qbuf.push(label.len() as u8);
qbuf.extend_from_slice(label.as_bytes());
}
qbuf.push(0u8);
qbuf.extend_from_slice(&12u16.to_be_bytes()); qbuf.extend_from_slice(&1u16.to_be_bytes()); let (qref, _) = QuestionRef::try_parse(&qbuf, 0).unwrap();
svc.handle_event(
ServiceEvent::Question(ServiceQuestion::new(qref, src_a, 0)),
now,
);
svc.handle_event(
ServiceEvent::Question(ServiceQuestion::new(qref, src_b, 0)),
now,
);
let mut kbuf: std::vec::Vec<u8> = std::vec::Vec::new();
for label in "_services._dns-sd._udp.local."
.trim_end_matches('.')
.split('.')
{
kbuf.push(label.len() as u8);
kbuf.extend_from_slice(label.as_bytes());
}
kbuf.push(0u8);
kbuf.extend_from_slice(&12u16.to_be_bytes()); kbuf.extend_from_slice(&1u16.to_be_bytes()); kbuf.extend_from_slice(&120u32.to_be_bytes());
let mut rdata: std::vec::Vec<u8> = std::vec::Vec::new();
for label in "_ipp._tcp.local.".trim_end_matches('.').split('.') {
rdata.push(label.len() as u8);
rdata.extend_from_slice(label.as_bytes());
}
rdata.push(0u8);
kbuf.extend_from_slice(&(rdata.len() as u16).to_be_bytes());
kbuf.extend_from_slice(&rdata);
let (rref, _) = Ref::try_parse(&kbuf, 0).unwrap();
svc.handle_event(
ServiceEvent::KnownAnswer(KnownAnswer::new(src_a, rref)),
now,
);
let t = now.advance(200);
svc.handle_timeout(t).unwrap();
let mut buf = std::vec![0u8; 4096];
assert!(
svc.poll_transmit(t, &mut buf).unwrap().is_some(),
"two coalesced meta questioners — one source's known-answer must NOT \
suppress the meta reply the other still needs"
);
}
#[test]
fn legacy_response_echoes_id_and_question_and_caps_ttl() {
use crate::{
event::ServiceQuestion,
wire::{MessageReader, QuestionRef},
};
let mut svc = make_service(120);
let now = drive_to_established(&mut svc);
let mut buf = std::vec![0u8; 4096];
let mut qbuf: std::vec::Vec<u8> = std::vec::Vec::new();
for label in "_ipp._tcp.local.".trim_end_matches('.').split('.') {
qbuf.push(label.len() as u8);
qbuf.extend_from_slice(label.as_bytes());
}
qbuf.push(0u8);
qbuf.extend_from_slice(&12u16.to_be_bytes()); qbuf.extend_from_slice(&1u16.to_be_bytes()); let (qref, _) = QuestionRef::try_parse(&qbuf, 0).unwrap();
let src: core::net::SocketAddr = "192.0.2.9:33333".parse().unwrap();
svc.handle_event(
ServiceEvent::Question(ServiceQuestion::new(qref, src, 0x1234)),
now,
);
let tx = svc
.poll_transmit(now, &mut buf)
.unwrap()
.expect("a legacy querier must get a unicast response");
assert_eq!(tx.dst(), src);
let msg = buf.get(..tx.size()).unwrap();
let reader = MessageReader::try_parse(msg).unwrap();
assert_eq!(reader.header().id(), 0x1234, "must echo the query ID");
assert_eq!(
reader.header().question_count(),
1,
"must echo the question"
);
let q = reader.questions().next().unwrap().unwrap();
assert!(q.qtype().is_ptr(), "echoed question keeps its qtype");
let mut answers = 0usize;
for rec in reader.answers() {
let rec = rec.unwrap();
assert!(
rec.ttl() <= respond::LEGACY_UNICAST_MAX_TTL_SECS,
"legacy answer TTL must be capped at 10s, got {}",
rec.ttl()
);
answers += 1;
}
assert!(answers > 0, "legacy response must carry the answers");
}
#[test]
fn coalesced_legacy_queriers_each_get_a_response() {
use crate::{event::ServiceQuestion, wire::QuestionRef};
let mut svc = make_service(120);
let now = drive_to_established(&mut svc);
let mut buf = std::vec![0u8; 4096];
let mut qbuf: std::vec::Vec<u8> = std::vec::Vec::new();
for label in "_ipp._tcp.local.".trim_end_matches('.').split('.') {
qbuf.push(label.len() as u8);
qbuf.extend_from_slice(label.as_bytes());
}
qbuf.push(0u8);
qbuf.extend_from_slice(&12u16.to_be_bytes());
qbuf.extend_from_slice(&1u16.to_be_bytes());
let a: core::net::SocketAddr = "192.0.2.10:40000".parse().unwrap();
let b: core::net::SocketAddr = "192.0.2.11:40001".parse().unwrap();
for s in [a, b] {
let (qref, _) = QuestionRef::try_parse(&qbuf, 0).unwrap();
svc.handle_event(
ServiceEvent::Question(ServiceQuestion::new(qref, s, 7)),
now,
);
}
let mut dsts: std::vec::Vec<core::net::SocketAddr> = std::vec::Vec::new();
while let Some(t) = svc.poll_transmit(now, &mut buf).unwrap() {
dsts.push(t.dst());
svc.note_transmit_result(now, true); }
assert!(
dsts.contains(&a) && dsts.contains(&b),
"both coalesced legacy queriers must get a reply; got {:?}",
dsts
);
}
#[test]
fn same_source_distinct_legacy_transactions_each_reply() {
use crate::{
event::ServiceQuestion,
wire::{MessageReader, QuestionRef},
};
let mut svc = make_service(120);
let now = drive_to_established(&mut svc);
let mut buf = std::vec![0u8; 4096];
let mut qbuf: std::vec::Vec<u8> = std::vec::Vec::new();
for label in "_ipp._tcp.local.".trim_end_matches('.').split('.') {
qbuf.push(label.len() as u8);
qbuf.extend_from_slice(label.as_bytes());
}
qbuf.push(0u8);
qbuf.extend_from_slice(&12u16.to_be_bytes());
qbuf.extend_from_slice(&1u16.to_be_bytes());
let src: core::net::SocketAddr = "192.0.2.12:40000".parse().unwrap();
for id in [11u16, 22u16] {
let (qref, _) = QuestionRef::try_parse(&qbuf, 0).unwrap();
svc.handle_event(
ServiceEvent::Question(ServiceQuestion::new(qref, src, id)),
now,
);
}
let mut ids: std::vec::Vec<u16> = std::vec::Vec::new();
while let Some(t) = svc.poll_transmit(now, &mut buf).unwrap() {
assert_eq!(t.dst(), src);
let msg = buf.get(..t.size()).unwrap();
ids.push(MessageReader::try_parse(msg).unwrap().header().id());
svc.note_transmit_result(now, true); }
assert!(
ids.contains(&11) && ids.contains(&22),
"each distinct transaction (by query ID) must get its own reply; got {:?}",
ids
);
}
#[test]
fn oversized_legacy_response_is_dropped_not_errored() {
use crate::{event::ServiceQuestion, wire::QuestionRef};
let mut svc = make_service(120);
let now = drive_to_established(&mut svc);
let mut qbuf: std::vec::Vec<u8> = std::vec::Vec::new();
for label in "_ipp._tcp.local.".trim_end_matches('.').split('.') {
qbuf.push(label.len() as u8);
qbuf.extend_from_slice(label.as_bytes());
}
qbuf.push(0u8);
qbuf.extend_from_slice(&12u16.to_be_bytes());
qbuf.extend_from_slice(&1u16.to_be_bytes());
let (qref, _) = QuestionRef::try_parse(&qbuf, 0).unwrap();
let src: core::net::SocketAddr = "192.0.2.9:40000".parse().unwrap();
svc.handle_event(
ServiceEvent::Question(ServiceQuestion::new(qref, src, 1)),
now,
);
assert!(!svc.pending_legacy.is_empty(), "legacy reply was queued");
let mut tiny = [0u8; 16];
match svc.poll_transmit(now, &mut tiny) {
Ok(None) => {}
Ok(Some(_)) => panic!("did not expect a transmit into a too-small buffer"),
Err(e) => panic!("legacy encode failure must not surface as Err: {e:?}"),
}
assert!(
svc.pending_legacy.is_empty(),
"the un-encodable legacy entry must be dropped, not left stuck"
);
}
#[test]
fn kas_rejects_hints_from_non_questioner_source() {
use crate::{event::ServiceQuestion, wire::QuestionRef};
let our_ttl: u32 = 120;
let mut svc = make_service(our_ttl);
let now = drive_to_established(&mut svc);
let mut qbuf: std::vec::Vec<u8> = std::vec::Vec::new();
for label in "_ipp._tcp.local.".trim_end_matches('.').split('.') {
qbuf.push(label.len() as u8);
qbuf.extend_from_slice(label.as_bytes());
}
qbuf.push(0u8);
qbuf.extend_from_slice(&12u16.to_be_bytes()); qbuf.extend_from_slice(&1u16.to_be_bytes()); let (qref, _) = QuestionRef::try_parse(&qbuf, 0).unwrap();
let src_a: core::net::SocketAddr = "10.0.0.1:5353".parse().unwrap();
svc.handle_event(
ServiceEvent::Question(ServiceQuestion::new(qref, src_a, 0)),
now,
);
assert!(svc.response_deadline.is_some());
let querier_ttl: u32 = our_ttl;
let mut rec_buf: std::vec::Vec<u8> = std::vec::Vec::new();
make_a_record_ref(&mut rec_buf, "host.local.", querier_ttl, [192, 168, 1, 10]);
let (record_ref, _) = Ref::try_parse(&rec_buf, 0).unwrap();
let src_b: core::net::SocketAddr = "10.0.0.99:5353".parse().unwrap();
let ka = KnownAnswer::new(src_b, record_ref);
svc.handle_event(ServiceEvent::KnownAnswer(ka), now);
let hint_count = svc.kas_hints.iter().filter(|s| s.is_some()).count();
assert_eq!(
hint_count, 0,
"KAS hints from a source that did not ask a question must be dropped; \
found {hint_count} hint(s)"
);
let ka2 = KnownAnswer::new(src_a, record_ref);
svc.handle_event(ServiceEvent::KnownAnswer(ka2), now);
let hint_count = svc.kas_hints.iter().filter(|s| s.is_some()).count();
assert_eq!(
hint_count, 1,
"control: hint from the legitimate questioner src_a must land; got {hint_count}"
);
}
#[test]
fn kas_disabled_when_multiple_questioners_coalesced() {
use crate::{
event::ServiceQuestion,
wire::{MessageReader, QuestionRef, ResourceType},
};
let our_ttl: u32 = 120;
let mut svc = make_service(our_ttl);
let now = drive_to_established(&mut svc);
let inject_q =
|svc: &mut Service<FakeInstant, slab::Slab<Transmit>, slab::Slab<ServiceUpdate>>,
src: &str,
now: FakeInstant| {
let mut qbuf: std::vec::Vec<u8> = std::vec::Vec::new();
for label in "_ipp._tcp.local.".trim_end_matches('.').split('.') {
qbuf.push(label.len() as u8);
qbuf.extend_from_slice(label.as_bytes());
}
qbuf.push(0u8);
qbuf.extend_from_slice(&12u16.to_be_bytes());
qbuf.extend_from_slice(&1u16.to_be_bytes());
let (qref, _) = QuestionRef::try_parse(&qbuf, 0).unwrap();
let src: core::net::SocketAddr = src.parse().unwrap();
svc.handle_event(
ServiceEvent::Question(ServiceQuestion::new(qref, src, 0)),
now,
);
};
inject_q(&mut svc, "10.0.0.1:5353", now);
inject_q(&mut svc, "10.0.0.2:5353", now);
let querier_ttl: u32 = our_ttl;
let mut rec_buf: std::vec::Vec<u8> = std::vec::Vec::new();
make_a_record_ref(&mut rec_buf, "host.local.", querier_ttl, [192, 168, 1, 10]);
let (record_ref, _) = Ref::try_parse(&rec_buf, 0).unwrap();
let src_b: core::net::SocketAddr = "10.0.0.2:5353".parse().unwrap();
let ka = KnownAnswer::new(src_b, record_ref);
svc.handle_event(ServiceEvent::KnownAnswer(ka), now);
let rd = svc.response_deadline.unwrap();
svc.handle_timeout(rd).unwrap();
let mut buf = std::vec![0u8; 4096];
let tx = svc
.poll_transmit(rd, &mut buf)
.unwrap()
.expect("response must be emitted");
let written = &buf[..tx.size()];
let reader = MessageReader::try_parse(written).expect("valid DNS");
let mut found_a = false;
for rr in reader.answers() {
let rr = rr.expect("answer must parse");
if rr.rtype() == ResourceType::A && rr.rdata() == [192, 168, 1, 10] {
found_a = true;
break;
}
}
assert!(
found_a,
"A record must NOT be suppressed when multiple questioners coalesce \
and only one supplied a matching KAS hint"
);
}
#[test]
fn kas_does_not_suppress_unsolicited_announcement() {
let our_ttl: u32 = 120;
let mut svc = make_service(our_ttl);
let now = drive_to_established(&mut svc);
assert_eq!(svc.state(), ServiceState::Established);
inject_question_to_set_response_deadline(&mut svc, now);
let querier_ttl: u32 = our_ttl;
let mut rec_buf: std::vec::Vec<u8> = std::vec::Vec::new();
make_a_record_ref(&mut rec_buf, "host.local.", querier_ttl, [192, 168, 1, 10]);
let (record_ref, _) = Ref::try_parse(&rec_buf, 0).unwrap();
let ka = KnownAnswer::new("0.0.0.0:5353".parse().unwrap(), record_ref);
svc.handle_event(ServiceEvent::KnownAnswer(ka), now);
let hint_count = svc.kas_hints.iter().filter(|s| s.is_some()).count();
assert_eq!(hint_count, 1, "KAS hint for A record should be stored");
let now_reannounce = now.advance(u64::from(our_ttl) * 1000 + 1000);
svc.handle_timeout(now_reannounce).unwrap();
assert_eq!(
svc.pending_transmits[0],
Some(PendingTransmitKind::Announcement),
"re-announce after deadline must produce Announcement kind"
);
let mut out = std::vec![0u8; 4096];
let transmit = svc
.poll_transmit(now_reannounce, &mut out)
.unwrap()
.expect("poll_transmit must return Some for a pending Announcement");
let written = &out[..transmit.size()];
let reader =
crate::wire::MessageReader::try_parse(written).expect("datagram must be a valid DNS message");
let a_found = reader.answers().any(|rr| {
if let Ok(rr) = rr {
matches!(rr.rtype(), crate::wire::ResourceType::A)
} else {
false
}
});
assert!(
a_found,
"unsolicited re-announcement must include the A record even when a fresh KAS hint exists; \
KAS filtering must not be applied to Announcement kind"
);
}
#[test]
fn host_conflict_does_not_rename_instance() {
use crate::event::{HostConflict, ServiceEvent};
let mut svc = make_service(120);
let t0 = FakeInstant::zero();
svc.handle_timeout(t0).unwrap();
let original_name = svc.name().as_str().to_owned();
let original_state = svc.state();
let mut buf: std::vec::Vec<u8> = std::vec::Vec::new();
make_a_record_ref(&mut buf, "host.local.", 120, [192, 168, 1, 99]);
let (record_ref, _) = Ref::try_parse(&buf, 0).unwrap();
let hc = HostConflict::new(record_ref);
svc.handle_event(ServiceEvent::HostConflict(hc), t0);
assert_eq!(
svc.name().as_str(),
original_name,
"HostConflict must NOT rename the service instance"
);
assert_eq!(
svc.state(),
original_state,
"HostConflict must NOT change the service state"
);
let update = svc
.poll()
.expect("Service::poll() must return Some(ServiceUpdate::HostConflict) after HostConflict");
assert!(
update.is_host_conflict(),
"poll() must return ServiceUpdate::HostConflict, got {:?}",
update
);
assert!(
svc.poll().is_none(),
"only one update must be queued per HostConflict event"
);
}
#[test]
fn host_conflict_ignores_our_own_advertised_address() {
use crate::event::{HostConflict, ServiceEvent};
let mut svc = make_service(120); svc.handle_timeout(FakeInstant::zero()).unwrap();
let mut buf: std::vec::Vec<u8> = std::vec::Vec::new();
make_a_record_ref(&mut buf, "host.local.", 120, [192, 168, 1, 10]); let (rec, _) = Ref::try_parse(&buf, 0).unwrap();
svc.handle_event(
ServiceEvent::HostConflict(HostConflict::new(rec)),
FakeInstant::zero(),
);
assert!(
svc.poll().is_none(),
"an identical (our own) host A must not surface a HostConflict"
);
}
#[test]
fn host_conflict_surfaces_for_different_address() {
use crate::event::{HostConflict, ServiceEvent};
let mut svc = make_service(120);
svc.handle_timeout(FakeInstant::zero()).unwrap();
let mut buf: std::vec::Vec<u8> = std::vec::Vec::new();
make_a_record_ref(&mut buf, "host.local.", 120, [10, 0, 0, 99]); let (rec, _) = Ref::try_parse(&buf, 0).unwrap();
svc.handle_event(
ServiceEvent::HostConflict(HostConflict::new(rec)),
FakeInstant::zero(),
);
assert!(
svc.poll().is_some_and(|u| u.is_host_conflict()),
"a different host A must surface HostConflict"
);
}
#[test]
fn section9_reprobe_clears_queued_legacy_reply() {
use crate::{event::ServiceQuestion, wire::QuestionRef};
let mut svc = make_service(120);
drive_to_established(&mut svc);
let now = FakeInstant::zero().advance(100_000);
let mut qbuf: std::vec::Vec<u8> = std::vec::Vec::new();
for label in "myprinter._ipp._tcp.local."
.trim_end_matches('.')
.split('.')
{
qbuf.push(label.len() as u8);
qbuf.extend_from_slice(label.as_bytes());
}
qbuf.push(0u8);
qbuf.extend_from_slice(&255u16.to_be_bytes()); qbuf.extend_from_slice(&1u16.to_be_bytes()); let (qref, _) = QuestionRef::try_parse(&qbuf, 0).unwrap();
let legacy_src: core::net::SocketAddr = "192.168.1.50:40000".parse().unwrap();
svc.handle_event(
ServiceEvent::Question(ServiceQuestion::new(qref, legacy_src, 0x99)),
now,
);
assert!(
!svc.pending_legacy.is_empty(),
"a legacy querier must queue a unicast reply"
);
let mut sbuf: std::vec::Vec<u8> = std::vec::Vec::new();
make_srv_record_ref(
&mut sbuf,
"myprinter._ipp._tcp.local.",
120,
0,
0,
9999,
"host.local.",
);
let (srec, _) = Ref::try_parse(&sbuf, 0).unwrap();
let peer: core::net::SocketAddr = "192.168.1.200:5353".parse().unwrap();
svc.handle_event(
ServiceEvent::ProbeConflict(ProbeConflict::new(peer, srec)),
now,
);
assert_eq!(
svc.state(),
ServiceState::Init,
"§9 conflict must revert to re-probing"
);
assert!(
svc.pending_legacy.is_empty(),
"§9 revert must clear the queued legacy reply (don't answer an unverified name)"
);
}
#[test]
fn conflict_rename_hands_off_old_announced_name() {
let mut svc = make_service(120);
svc.handle_timeout(FakeInstant::zero()).unwrap(); svc.goodbye.mark_instance();
let mut buf: std::vec::Vec<u8> = std::vec::Vec::new();
make_srv_record_ref(
&mut buf,
"myprinter._ipp._tcp.local.",
120,
0,
0,
9999,
"host.local.",
);
let (rec, _) = Ref::try_parse(&buf, 0).unwrap();
let peer: core::net::SocketAddr = "192.168.1.200:5353".parse().unwrap();
svc.handle_event(
ServiceEvent::ProbeConflict(ProbeConflict::new(peer, rec)),
FakeInstant::zero(),
);
svc
.handle_timeout(FakeInstant::zero().advance(500))
.unwrap();
assert!(
svc.name().as_str().contains("-1"),
"service should have renamed"
);
let RenameGoodbyeHandoff {
records: old_records,
owned: old_owned,
} = svc
.take_rename_goodbye_handoff()
.expect("a rename of an announced service must hand off the old-name goodbye");
assert_eq!(
old_records.instance().as_str(),
"myprinter._ipp._tcp.local.",
"the handoff carries the OLD instance name (captured before set_instance)"
);
assert!(
old_owned.ptr() && old_owned.srv() && old_owned.txt(),
"the OLD name's advertised instance records (PTR/SRV/TXT) are handed off"
);
assert!(
old_owned.a_slice().is_empty() && old_owned.aaaa_slice().is_empty(),
"a rename never withdraws host A/AAAA — the handoff is instance-only"
);
assert!(
svc.take_rename_goodbye_handoff().is_none(),
"the handoff is consumed by the first take"
);
let mut out = std::vec![0u8; 4096];
if let Ok(Some(t)) = svc.poll_transmit(FakeInstant::zero().advance(500), &mut out) {
let reader = crate::wire::MessageReader::try_parse(&out[..t.size()]).unwrap();
for rr in reader.answers() {
let rr = rr.unwrap();
assert!(
rr.ttl() != 0,
"the Service must not emit any TTL=0 goodbye after a rename — that moved to the endpoint"
);
}
}
}
#[test]
fn host_conflict_for_link_local_address_is_not_suppressed() {
use crate::event::{HostConflict, ServiceEvent};
let stype = Name::try_from_str("_ipp._tcp.local.").unwrap();
let inst = Name::try_from_str("myprinter._ipp._tcp.local.").unwrap();
let host = Name::try_from_str("host.local.").unwrap();
let mut r = ServiceRecords::new(stype, inst, host, 631, 120);
r.add_a(core::net::Ipv4Addr::new(169, 254, 1, 1)); let mut svc: Service<FakeInstant, slab::Slab<Transmit>, slab::Slab<ServiceUpdate>> =
Service::try_new(
ServiceHandle::from_raw(0),
r,
FakeInstant::zero(),
[0u8; 32],
true,
);
svc.handle_timeout(FakeInstant::zero()).unwrap();
let mut buf: std::vec::Vec<u8> = std::vec::Vec::new();
make_a_record_ref(&mut buf, "host.local.", 120, [169, 254, 1, 1]); let (rec, _) = Ref::try_parse(&buf, 0).unwrap();
svc.handle_event(
ServiceEvent::HostConflict(HostConflict::new(rec)),
FakeInstant::zero(),
);
assert!(
svc.poll().is_some_and(|u| u.is_host_conflict()),
"a link-local host A must surface HostConflict even when the raw address matches"
);
}
#[test]
fn failed_conflict_rename_clears_stale_transmit_state() {
let long_label = "a".repeat(63);
let stype = Name::try_from_str("_ipp._tcp.local.").unwrap();
let inst = Name::try_from_str(&std::format!("{long_label}._ipp._tcp.local.")).unwrap();
let host = Name::try_from_str("host.local.").unwrap();
let mut r = ServiceRecords::new(stype, inst, host, 631, 120);
r.add_a(core::net::Ipv4Addr::new(192, 168, 1, 10));
let mut svc: Service<FakeInstant, slab::Slab<Transmit>, slab::Slab<ServiceUpdate>> =
Service::try_new(
ServiceHandle::from_raw(0),
r,
FakeInstant::zero(),
[0u8; 32],
true,
);
svc.handle_timeout(FakeInstant::zero()).unwrap(); svc.pending_transmits[0] = Some(PendingTransmitKind::Probe);
svc.response_deadline = Some(FakeInstant::zero().advance(50));
let mut buf: std::vec::Vec<u8> = std::vec::Vec::new();
make_srv_record_ref(
&mut buf,
&std::format!("{long_label}._ipp._tcp.local."),
120,
0,
0,
9999,
"host.local.",
);
let (rec, _) = Ref::try_parse(&buf, 0).unwrap();
let peer: core::net::SocketAddr = "192.168.1.200:5353".parse().unwrap();
svc.handle_event(
ServiceEvent::ProbeConflict(ProbeConflict::new(peer, rec)),
FakeInstant::zero(),
);
svc
.handle_timeout(FakeInstant::zero().advance(500))
.unwrap();
assert_eq!(
svc.state(),
ServiceState::Conflicting,
"invalid rename must go Conflicting"
);
assert_eq!(
svc.pending_transmits,
[None, None],
"failed rename must clear pending transmits"
);
assert!(
svc.response_deadline.is_none(),
"failed rename must clear response_deadline"
);
}
#[test]
fn tiebreak_we_win_continues_probing() {
let mut svc = make_service(120);
let t0 = FakeInstant::zero();
svc.handle_timeout(t0).unwrap();
{
let mut buf_a: std::vec::Vec<u8> = std::vec::Vec::new();
make_a_record_ref(
&mut buf_a,
"myprinter._ipp._tcp.local.",
120,
[192, 168, 1, 10],
);
let (rref_a, _) = Ref::try_parse(&buf_a, 0).unwrap();
let src_a: core::net::SocketAddr = "192.168.1.50:5353".parse().unwrap();
svc.handle_event(
ServiceEvent::ProbeConflict(ProbeConflict::new(src_a, rref_a)),
t0,
);
assert!(
!svc.tiebreak_pending,
"ProbeConflict with an A record must NOT set tiebreak_pending"
);
assert_eq!(
svc.peer_probes.len(),
0,
"A-record ProbeConflict must NOT create a peer-probe bucket"
);
}
let peer_src_win: core::net::SocketAddr = "192.168.1.10:5353".parse().unwrap();
let mut buf: std::vec::Vec<u8> = std::vec::Vec::new();
make_srv_record_ref(
&mut buf,
"myprinter._ipp._tcp.local.",
120,
0, 0, 80, "host.local.",
);
let (record_ref, _) = Ref::try_parse(&buf, 0).unwrap();
let conflict = ProbeConflict::new(peer_src_win, record_ref);
svc.handle_event(ServiceEvent::ProbeConflict(conflict), t0);
let mut buf_txt: std::vec::Vec<u8> = std::vec::Vec::new();
make_txt_record_ref(&mut buf_txt, "myprinter._ipp._tcp.local.", 120, &[]);
let (txt_ref, _) = Ref::try_parse(&buf_txt, 0).unwrap();
let conflict_txt = ProbeConflict::new(peer_src_win, txt_ref);
svc.handle_event(ServiceEvent::ProbeConflict(conflict_txt), t0);
assert!(svc.tiebreak_pending, "tiebreak_pending must be set");
let state_before = svc.state();
let name_before = svc.name().as_str().to_owned();
let t1 = t0.advance(500);
svc.handle_timeout(t1).unwrap();
assert_eq!(
svc.name().as_str(),
name_before,
"tiebreak win must NOT rename the service"
);
assert!(
!svc.tiebreak_pending,
"tiebreak_pending must be cleared after comparison"
);
assert_eq!(
svc.peer_probes.len(),
0,
"peer_probes must be cleared after tiebreak"
);
assert!(
svc.poll().is_none(),
"no ServiceUpdate::Renamed should be queued when we win the tiebreak"
);
let _ = state_before; assert!(
matches!(svc.state(), ServiceState::Init | ServiceState::Probing(_)),
"state must remain in probing sequence after winning tiebreak; got {:?}",
svc.state()
);
}
#[test]
fn tiebreak_we_lose_renames() {
let mut svc = make_service(120);
let t0 = FakeInstant::zero();
svc.handle_timeout(t0).unwrap();
let mut buf: std::vec::Vec<u8> = std::vec::Vec::new();
make_srv_record_ref(
&mut buf,
"myprinter._ipp._tcp.local.",
120,
0, 0, 9999, "host.local.",
);
let (record_ref, _) = Ref::try_parse(&buf, 0).unwrap();
let peer_src_lose: core::net::SocketAddr = "192.168.1.200:5353".parse().unwrap();
let conflict = ProbeConflict::new(peer_src_lose, record_ref);
svc.handle_event(ServiceEvent::ProbeConflict(conflict), t0);
assert!(svc.tiebreak_pending);
let original_name = svc.name().as_str().to_owned();
let t1 = t0.advance(500);
svc.handle_timeout(t1).unwrap();
assert!(
svc.name().as_str().contains("-1"),
"tiebreak loss must rename the service (expected '-1' suffix); got {}",
svc.name().as_str()
);
assert_ne!(
svc.name().as_str(),
original_name,
"name must change after tiebreak loss"
);
assert_eq!(
svc.state(),
ServiceState::Init,
"state must reset to Init after tiebreak rename"
);
assert!(!svc.tiebreak_pending, "tiebreak_pending must be cleared");
assert_eq!(svc.peer_probes.len(), 0, "buffer must be cleared");
let update = svc
.poll()
.expect("ServiceUpdate::Renamed must be queued after tiebreak loss");
assert!(
update.is_renamed(),
"update must be Renamed, got {:?}",
update
);
}
#[test]
fn established_service_reprobes_on_different_rdata_conflict() {
let mut svc = make_service(120); drive_to_established(&mut svc);
assert_eq!(svc.state(), ServiceState::Established);
let original = svc.name().as_str().to_owned();
let mut buf: std::vec::Vec<u8> = std::vec::Vec::new();
make_srv_record_ref(
&mut buf,
"myprinter._ipp._tcp.local.",
120,
0,
0,
9999,
"host.local.",
);
let (rec, _) = Ref::try_parse(&buf, 0).unwrap();
let peer: core::net::SocketAddr = "192.168.1.200:5353".parse().unwrap();
let t = FakeInstant::zero().advance(100_000);
svc.handle_event(
ServiceEvent::ProbeConflict(ProbeConflict::new(peer, rec)),
t,
);
assert_eq!(
svc.state(),
ServiceState::Init,
"a §9 conflict must revert an Established service to re-probing"
);
assert_eq!(
svc.name().as_str(),
original,
"re-verification must NOT rename the service immediately"
);
assert!(
svc.poll_timeout().is_some(),
"the re-probe deadline must be exposed via poll_timeout"
);
}
#[test]
fn established_service_ignores_identical_rdata() {
let mut svc = make_service(120);
drive_to_established(&mut svc);
assert_eq!(svc.state(), ServiceState::Established);
let mut buf: std::vec::Vec<u8> = std::vec::Vec::new();
make_srv_record_ref(
&mut buf,
"myprinter._ipp._tcp.local.",
120,
0,
0,
631,
"host.local.",
);
let (rec, _) = Ref::try_parse(&buf, 0).unwrap();
let peer: core::net::SocketAddr = "192.168.1.200:5353".parse().unwrap();
svc.handle_event(
ServiceEvent::ProbeConflict(ProbeConflict::new(peer, rec)),
FakeInstant::zero().advance(100_000),
);
assert_eq!(
svc.state(),
ServiceState::Established,
"an identical record is consistent rdata, not a conflict — stay Established"
);
}
#[test]
fn conflict_rename_resets_announce_emitted() {
let mut svc = make_service(120);
svc.handle_timeout(FakeInstant::zero()).unwrap(); svc.goodbye.mark_instance();
let mut buf: std::vec::Vec<u8> = std::vec::Vec::new();
make_srv_record_ref(
&mut buf,
"myprinter._ipp._tcp.local.",
120,
0,
0,
9999,
"host.local.",
);
let (rec, _) = Ref::try_parse(&buf, 0).unwrap();
let peer: core::net::SocketAddr = "192.168.1.200:5353".parse().unwrap();
svc.handle_event(
ServiceEvent::ProbeConflict(ProbeConflict::new(peer, rec)),
FakeInstant::zero(),
);
svc
.handle_timeout(FakeInstant::zero().advance(500))
.unwrap();
assert!(
svc.name().as_str().contains("-1"),
"tiebreak loss must rename"
);
assert!(
!svc.goodbye.any_instance(),
"rename must reset announce_emitted so the new name isn't goodbye'd un-announced"
);
assert!(
svc.pending_legacy.is_empty(),
"rename must clear queued legacy replies bound to the old name"
);
}
#[test]
fn question_during_announcing_does_not_shortcut_sequence() {
use crate::{event::ServiceQuestion, wire::QuestionRef};
let mut svc = make_service(120);
let mut buf4096 = std::vec![0u8; 4096];
let mut now = FakeInstant::zero();
loop {
now = now.advance(500);
svc.handle_timeout(now).unwrap();
if let Ok(Some(_)) = svc.poll_transmit(now, &mut buf4096) {
svc.note_transmit_delivered(now);
}
if matches!(svc.state(), ServiceState::Announcing(_)) {
break;
}
assert!(
now.0 < 10_000,
"service should reach Announcing within 10 s; state={:?}",
svc.state()
);
}
if let Ok(Some(_)) = svc.poll_transmit(now, &mut buf4096) {
svc.note_transmit_delivered(now);
}
now = now.advance(500);
svc.handle_timeout(now).unwrap();
if let Ok(Some(_)) = svc.poll_transmit(now, &mut buf4096) {
svc.note_transmit_delivered(now);
}
for _ in 0..5 {
if matches!(svc.state(), ServiceState::Announcing(1)) {
break;
}
now = now.advance(500);
svc.handle_timeout(now).unwrap();
if let Ok(Some(_)) = svc.poll_transmit(now, &mut buf4096) {
svc.note_transmit_delivered(now);
}
}
assert!(
matches!(svc.state(), ServiceState::Announcing(1)),
"should be in Announcing(1); got {:?}",
svc.state()
);
let mut qbuf: std::vec::Vec<u8> = std::vec::Vec::new();
for label in "_ipp._tcp.local.".trim_end_matches('.').split('.') {
qbuf.push(label.len() as u8);
qbuf.extend_from_slice(label.as_bytes());
}
qbuf.push(0u8); qbuf.extend_from_slice(&12u16.to_be_bytes()); qbuf.extend_from_slice(&1u16.to_be_bytes()); let (qref, _) = QuestionRef::try_parse(&qbuf, 0).unwrap();
let src: core::net::SocketAddr = "0.0.0.0:5353".parse().unwrap();
let sq = ServiceQuestion::new(qref, src, 0);
svc.handle_event(ServiceEvent::Question(sq), now);
assert!(
svc.response_deadline.is_some(),
"response_deadline must be set after Question"
);
now = now.advance(1); svc.handle_timeout(now).unwrap();
now = now.advance(200); svc.handle_timeout(now).unwrap();
assert_eq!(
svc.pending_transmits[0],
Some(PendingTransmitKind::Response),
"question during Announcing must produce Response kind, not Announcement"
);
if let Ok(Some(_)) = svc.poll_transmit(now, &mut buf4096) {
svc.note_transmit_delivered(now);
}
assert!(
matches!(svc.state(), ServiceState::Announcing(1)),
"state must remain Announcing(1) after response; got {:?}",
svc.state()
);
now = now.advance(2000); svc.handle_timeout(now).unwrap();
assert_eq!(
svc.pending_transmits[0],
Some(PendingTransmitKind::Announcement),
"second announce must produce Announcement kind"
);
if let Ok(Some(_)) = svc.poll_transmit(now, &mut buf4096) {
svc.note_transmit_delivered(now);
}
assert_eq!(
svc.state(),
ServiceState::Established,
"service must reach Established after second announce"
);
}
#[test]
fn tiebreak_two_peers_one_wins_we_lose() {
let mut svc = make_service(120); let t0 = FakeInstant::zero();
svc.handle_timeout(t0).unwrap();
let peer_a: core::net::SocketAddr = "192.168.1.10:5353".parse().unwrap();
let mut buf_a: std::vec::Vec<u8> = std::vec::Vec::new();
make_srv_record_ref(
&mut buf_a,
"myprinter._ipp._tcp.local.",
120,
0, 0, 80, "host.local.",
);
let (rref_a, _) = Ref::try_parse(&buf_a, 0).unwrap();
svc.handle_event(
ServiceEvent::ProbeConflict(ProbeConflict::new(peer_a, rref_a)),
t0,
);
let peer_b: core::net::SocketAddr = "192.168.1.200:5353".parse().unwrap();
let mut buf_b: std::vec::Vec<u8> = std::vec::Vec::new();
make_srv_record_ref(
&mut buf_b,
"myprinter._ipp._tcp.local.",
120,
0, 0, 9999, "host.local.",
);
let (rref_b, _) = Ref::try_parse(&buf_b, 0).unwrap();
svc.handle_event(
ServiceEvent::ProbeConflict(ProbeConflict::new(peer_b, rref_b)),
t0,
);
assert_eq!(svc.peer_probes.len(), 2, "should have 2 peer probe buckets");
assert!(svc.tiebreak_pending, "tiebreak_pending must be set");
let original_name = svc.name().as_str().to_owned();
let t1 = t0.advance(500);
svc.handle_timeout(t1).unwrap();
assert!(
svc.name().as_str().contains("-1"),
"service must rename when any peer wins the tiebreak; got: {}",
svc.name().as_str()
);
assert_ne!(svc.name().as_str(), original_name, "name must change");
assert_eq!(svc.state(), ServiceState::Init, "state must reset to Init");
assert!(!svc.tiebreak_pending, "tiebreak_pending must be cleared");
assert_eq!(svc.peer_probes.len(), 0, "peer buckets must be cleared");
let update = svc.poll().expect("ServiceUpdate::Renamed must be queued");
assert!(
update.is_renamed(),
"update must be Renamed, got {:?}",
update
);
}
#[test]
fn srv_wire_form_canonical() {
let mut out_aa: std::vec::Vec<u8> = std::vec::Vec::new();
write_canonical_wire_name("aa.local.", &mut out_aa);
assert_eq!(
out_aa,
std::vec![2u8, b'a', b'a', 5, b'l', b'o', b'c', b'a', b'l', 0],
"wire form for 'aa.local.' must be \\x02aa\\x05local\\x00"
);
let mut out_b: std::vec::Vec<u8> = std::vec::Vec::new();
write_canonical_wire_name("b.local.", &mut out_b);
assert_eq!(
out_b,
std::vec![1u8, b'b', 5, b'l', b'o', b'c', b'a', b'l', 0],
"wire form for 'b.local.' must be \\x01b\\x05local\\x00"
);
assert!(
out_aa > out_b,
"wire-form 'aa.local.' must be > 'b.local.' in byte order (length prefix 2 > 1)"
);
}
#[test]
fn question_does_not_push_out_announce_deadline() {
use crate::{event::ServiceQuestion, wire::QuestionRef};
let mut svc = make_service(120);
let mut buf4096 = std::vec![0u8; 4096];
let mut now = FakeInstant::zero();
loop {
now = now.advance(500);
svc.handle_timeout(now).unwrap();
if let Ok(Some(_)) = svc.poll_transmit(now, &mut buf4096) {
svc.note_transmit_delivered(now);
}
if matches!(svc.state(), ServiceState::Announcing(1)) {
break;
}
assert!(
now.0 < 10_000,
"should reach Announcing(1) within 10 s; state={:?}",
svc.state()
);
}
if let Ok(Some(_)) = svc.poll_transmit(now, &mut buf4096) {
svc.note_transmit_delivered(now);
}
let announce_deadline_before = svc.lifecycle_deadline;
assert!(
announce_deadline_before.is_some(),
"lifecycle_deadline must be set in Announcing(1)"
);
let min_lifecycle = now.advance(300); assert!(
announce_deadline_before.unwrap() >= min_lifecycle,
"lifecycle_deadline must be > now+200ms so the question-response test is meaningful; \
lifecycle={:?}, min={:?}",
announce_deadline_before,
min_lifecycle
);
let mut qbuf: std::vec::Vec<u8> = std::vec::Vec::new();
for label in "_ipp._tcp.local.".trim_end_matches('.').split('.') {
qbuf.push(label.len() as u8);
qbuf.extend_from_slice(label.as_bytes());
}
qbuf.push(0u8);
qbuf.extend_from_slice(&12u16.to_be_bytes()); qbuf.extend_from_slice(&1u16.to_be_bytes()); let (qref, _) = QuestionRef::try_parse(&qbuf, 0).unwrap();
let src: core::net::SocketAddr = "0.0.0.0:5353".parse().unwrap();
let sq = ServiceQuestion::new(qref, src, 0);
svc.handle_event(ServiceEvent::Question(sq), now);
assert!(
svc.response_deadline.is_some(),
"response_deadline must be set after Question in Announcing(1)"
);
assert_eq!(
svc.lifecycle_deadline, announce_deadline_before,
"lifecycle_deadline must NOT be modified by a Question event"
);
now = now.advance(200);
svc.handle_timeout(now).unwrap();
assert_eq!(
svc.pending_transmits[0],
Some(PendingTransmitKind::Response),
"firing response_deadline must produce Response kind, not Announcement"
);
if let Ok(Some(_)) = svc.poll_transmit(now, &mut buf4096) {
svc.note_transmit_delivered(now);
}
assert_eq!(
svc.lifecycle_deadline, announce_deadline_before,
"lifecycle_deadline must be unchanged after response fires"
);
assert!(
matches!(svc.state(), ServiceState::Announcing(1)),
"state must remain Announcing(1) after response; got {:?}",
svc.state()
);
let original_announce = announce_deadline_before.unwrap();
svc.handle_timeout(original_announce).unwrap();
assert_eq!(
svc.pending_transmits[0],
Some(PendingTransmitKind::Announcement),
"announce must fire at the original lifecycle_deadline, not at a pushed-out time"
);
if let Ok(Some(_)) = svc.poll_transmit(original_announce, &mut buf4096) {
svc.note_transmit_delivered(original_announce);
}
assert_eq!(
svc.state(),
ServiceState::Established,
"after the Announcing(1) announce fires, state must reach Established; got {:?}",
svc.state()
);
}
#[test]
fn srv_kas_hint_suppresses_srv_in_filtered_response() {
use crate::{
event::ServiceQuestion,
wire::{MessageReader, QuestionRef, ResourceType},
};
let our_ttl: u32 = 120;
let mut svc = make_service(our_ttl);
let now = drive_to_established(&mut svc);
assert_eq!(svc.state(), ServiceState::Established);
inject_question_to_set_response_deadline(&mut svc, now);
let mut srv_buf: std::vec::Vec<u8> = std::vec::Vec::new();
make_srv_record_ref(
&mut srv_buf,
"myprinter._ipp._tcp.local.",
our_ttl,
0, 0, 631, "host.local.", );
let (srv_ref, _) = Ref::try_parse(&srv_buf, 0).unwrap();
let ka = KnownAnswer::new("0.0.0.0:5353".parse().unwrap(), srv_ref);
svc.handle_event(ServiceEvent::KnownAnswer(ka), now);
let srv_hint_count = svc
.kas_hints
.iter()
.filter(|s| {
s.map(|h| h.rtype == crate::wire::ResourceType::Srv)
.unwrap_or(false)
})
.count();
assert_eq!(srv_hint_count, 1, "SRV KAS hint must be stored");
let mut qbuf: std::vec::Vec<u8> = std::vec::Vec::new();
for label in "myprinter._ipp._tcp.local."
.trim_end_matches('.')
.split('.')
{
qbuf.push(label.len() as u8);
qbuf.extend_from_slice(label.as_bytes());
}
qbuf.push(0u8);
qbuf.extend_from_slice(&255u16.to_be_bytes()); qbuf.extend_from_slice(&1u16.to_be_bytes()); let (qref, _) = QuestionRef::try_parse(&qbuf, 0).unwrap();
let src: core::net::SocketAddr = "0.0.0.0:5353".parse().unwrap();
svc.handle_event(
ServiceEvent::Question(ServiceQuestion::new(qref, src, 0)),
now,
);
assert!(
svc.response_deadline.is_some(),
"response_deadline must be set"
);
let now2 = now.advance(200);
svc.handle_timeout(now2).unwrap();
assert_eq!(
svc.pending_transmits[0],
Some(PendingTransmitKind::Response),
"pending_transmits[0] must be Response after question"
);
let mut out = std::vec![0u8; 4096];
let transmit = svc
.poll_transmit(now2, &mut out)
.unwrap()
.expect("poll_transmit must return Some for pending Response");
let written = &out[..transmit.size()];
let reader =
MessageReader::try_parse(written).expect("response datagram must be a valid DNS message");
let srv_present = reader.answers().any(|rr| {
rr.map(|rec| rec.rtype() == ResourceType::Srv)
.unwrap_or(false)
});
assert!(
!srv_present,
"SRV answer must be suppressed by the matching KAS hint; found SRV in response"
);
}
#[test]
fn kas_wrong_owner_known_answer_does_not_suppress() {
use crate::wire::{MessageReader, ResourceType};
let our_ttl: u32 = 120;
let mut svc = make_service(our_ttl); let now = drive_to_established(&mut svc);
inject_question_to_set_response_deadline(&mut svc, now);
let mut a_buf: std::vec::Vec<u8> = std::vec::Vec::new();
make_a_record_ref(&mut a_buf, "_ipp._tcp.local.", our_ttl, [192, 168, 1, 10]);
let (a_ref, _) = Ref::try_parse(&a_buf, 0).unwrap();
let ka = KnownAnswer::new("0.0.0.0:5353".parse().unwrap(), a_ref);
svc.handle_event(ServiceEvent::KnownAnswer(ka), now);
let now2 = now.advance(200);
svc.handle_timeout(now2).unwrap();
let mut out = std::vec![0u8; 4096];
let transmit = svc
.poll_transmit(now2, &mut out)
.unwrap()
.expect("a response must be emitted");
let reader = MessageReader::try_parse(&out[..transmit.size()]).unwrap();
let a_present = reader.answers().any(|rr| {
rr.map(|rec| rec.rtype() == ResourceType::A)
.unwrap_or(false)
});
assert!(
a_present,
"the host A must NOT be suppressed by a wrong-owner (_ipp._tcp.local) A known-answer"
);
}
#[test]
fn same_tick_response_and_lifecycle_both_fire() {
use crate::{event::ServiceQuestion, wire::QuestionRef};
let our_ttl: u32 = 120;
let mut svc = make_service(our_ttl);
let mut now = FakeInstant::zero();
let mut buf4096 = std::vec![0u8; 4096];
loop {
now = now.advance(500);
svc.handle_timeout(now).unwrap();
if let Ok(Some(_)) = svc.poll_transmit(now, &mut buf4096) {
svc.note_transmit_delivered(now);
}
if matches!(svc.state(), ServiceState::Announcing(0)) {
break;
}
assert!(now.0 < 10_000, "should reach Announcing(0) within 10 s");
}
if let Ok(Some(_)) = svc.poll_transmit(now, &mut buf4096) {
svc.note_transmit_delivered(now);
}
let announce_dl = svc
.lifecycle_deadline
.expect("lifecycle_deadline must be set");
{
let mut qbuf: std::vec::Vec<u8> = std::vec::Vec::new();
for label in "_ipp._tcp.local.".trim_end_matches('.').split('.') {
qbuf.push(label.len() as u8);
qbuf.extend_from_slice(label.as_bytes());
}
qbuf.push(0u8);
qbuf.extend_from_slice(&12u16.to_be_bytes()); qbuf.extend_from_slice(&1u16.to_be_bytes()); let (qref, _) = QuestionRef::try_parse(&qbuf, 0).unwrap();
let src: core::net::SocketAddr = "0.0.0.0:5353".parse().unwrap();
svc.handle_event(
ServiceEvent::Question(ServiceQuestion::new(qref, src, 0)),
now,
);
};
svc.response_deadline = Some(announce_dl);
assert_eq!(svc.lifecycle_deadline, Some(announce_dl));
assert_eq!(svc.response_deadline, Some(announce_dl));
let state_before = svc.state();
svc.handle_timeout(announce_dl).unwrap();
assert_eq!(
svc.pending_transmits[0],
Some(PendingTransmitKind::Announcement),
"lifecycle Announcement must be in slot 0 when both deadlines fire at same tick"
);
assert_eq!(
svc.pending_transmits[1],
Some(PendingTransmitKind::Response),
"Response must be in slot 1 when both deadlines fire at same tick"
);
while let Ok(Some(_)) = svc.poll_transmit(announce_dl, &mut buf4096) {
svc.note_transmit_delivered(announce_dl);
}
assert!(
!matches!(svc.state(), ServiceState::Announcing(0)),
"lifecycle must advance once the same-tick announcement is confirmed; \
got {:?} (expected Announcing(1) or Established)",
svc.state()
);
assert!(
svc.response_deadline.is_none(),
"response_deadline must be cleared after firing"
);
assert!(
svc.lifecycle_deadline != Some(announce_dl),
"lifecycle_deadline must be rescheduled after firing"
);
let _ = state_before; }
#[test]
fn same_tick_both_transmits_are_queued_and_drained() {
use crate::{event::ServiceQuestion, wire::QuestionRef};
let our_ttl: u32 = 120;
let mut svc = make_service(our_ttl);
let mut buf4096 = std::vec![0u8; 4096];
let mut now = FakeInstant::zero();
loop {
now = now.advance(500);
svc.handle_timeout(now).unwrap();
if let Ok(Some(_)) = svc.poll_transmit(now, &mut buf4096) {
svc.note_transmit_delivered(now);
}
if matches!(svc.state(), ServiceState::Announcing(0)) {
break;
}
assert!(now.0 < 10_000, "should reach Announcing(0) within 10 s");
}
if let Ok(Some(_)) = svc.poll_transmit(now, &mut buf4096) {
svc.note_transmit_delivered(now);
}
let announce_dl = svc
.lifecycle_deadline
.expect("lifecycle_deadline must be set in Announcing(0)");
{
let mut qbuf: std::vec::Vec<u8> = std::vec::Vec::new();
for label in "_ipp._tcp.local.".trim_end_matches('.').split('.') {
qbuf.push(label.len() as u8);
qbuf.extend_from_slice(label.as_bytes());
}
qbuf.push(0u8);
qbuf.extend_from_slice(&12u16.to_be_bytes()); qbuf.extend_from_slice(&1u16.to_be_bytes()); let (qref, _) = QuestionRef::try_parse(&qbuf, 0).unwrap();
let src: core::net::SocketAddr = "0.0.0.0:5353".parse().unwrap();
svc.handle_event(
ServiceEvent::Question(ServiceQuestion::new(qref, src, 0)),
now,
);
}
svc.response_deadline = Some(announce_dl);
svc.handle_timeout(announce_dl).unwrap();
assert_eq!(
svc.pending_transmits[0],
Some(PendingTransmitKind::Announcement),
"slot 0 must be Announcement when lifecycle fires"
);
assert_eq!(
svc.pending_transmits[1],
Some(PendingTransmitKind::Response),
"slot 1 must be Response when response also fires"
);
let t1 = svc.poll_transmit(announce_dl, &mut buf4096).unwrap();
assert!(
t1.is_some(),
"first poll_transmit must return Some (Announcement)"
);
svc.note_transmit_result(announce_dl, true);
assert!(
svc
.pending_transmits
.contains(&Some(PendingTransmitKind::Response)),
"Response must persist after draining the Announcement"
);
let t2 = svc.poll_transmit(announce_dl, &mut buf4096).unwrap();
assert!(
t2.is_some(),
"second poll_transmit must return Some (Response)"
);
svc.note_transmit_result(announce_dl, true);
let t3 = svc.poll_transmit(announce_dl, &mut buf4096).unwrap();
assert!(
t3.is_none(),
"third poll_transmit must return None (queue empty)"
);
}
#[test]
fn announcement_sets_cache_flush_on_unique_records() {
use crate::wire::{MessageReader, ResourceType};
let our_ttl: u32 = 120;
let mut svc = make_service(our_ttl);
let now = drive_to_established(&mut svc);
assert_eq!(svc.state(), ServiceState::Established);
let now_reannounce = now.advance(u64::from(our_ttl) * 1000 + 1000);
svc.handle_timeout(now_reannounce).unwrap();
assert_eq!(
svc.pending_transmits[0],
Some(PendingTransmitKind::Announcement),
"precondition: pending_transmits[0] must be Announcement"
);
let mut out = std::vec![0u8; 4096];
let transmit = svc
.poll_transmit(now_reannounce, &mut out)
.unwrap()
.expect("poll_transmit must return Some for Announcement");
let written = &out[..transmit.size()];
let reader =
MessageReader::try_parse(written).expect("announcement datagram must be a valid DNS message");
for rr_result in reader.answers() {
let rr = rr_result.expect("answer record must parse cleanly");
match rr.rtype() {
ResourceType::Srv | ResourceType::Txt | ResourceType::A | ResourceType::AAAA => {
assert!(
rr.cache_flush(),
"{:?} record must have cache-flush bit set",
rr.rtype()
);
}
ResourceType::Ptr => {
assert!(
!rr.cache_flush(),
"PTR record must NOT have cache-flush bit set (shared record)"
);
}
_ => {}
}
}
}
#[test]
fn tiebreak_always_includes_empty_txt() {
let stype = Name::try_from_str("_ipp._tcp.local.").unwrap();
let inst = Name::try_from_str("myprinter._ipp._tcp.local.").unwrap();
let host = Name::try_from_str("host.local.").unwrap();
let our = ServiceRecords::new(stype, inst.clone(), host.clone(), 631, 120);
assert_eq!(
our.txt_segments().count(),
0,
"precondition: no TXT segments"
);
let peer_src: core::net::SocketAddr = "192.168.1.99:5353".parse().unwrap();
{
let mut buf_srv: std::vec::Vec<u8> = std::vec::Vec::new();
make_srv_record_ref(
&mut buf_srv,
"myprinter._ipp._tcp.local.",
120,
0, 0, 631, "host.local.",
);
let (srv_ref, _) = Ref::try_parse(&buf_srv, 0).unwrap();
let mut buf_txt: std::vec::Vec<u8> = std::vec::Vec::new();
make_txt_record_ref(&mut buf_txt, "myprinter._ipp._tcp.local.", 120, &[]);
let (txt_ref, _) = Ref::try_parse(&buf_txt, 0).unwrap();
let mut peer_probes_a = std::vec![PeerProbe {
src: peer_src,
records: std::vec![],
}];
for rref in &[srv_ref, txt_ref] {
let view = rref.rdata_view().unwrap();
let mut scratch = std::vec::Vec::new();
let canonical = respond::canonical_rdata_for_hash(&view, &mut scratch)
.unwrap()
.to_vec();
peer_probes_a[0].records.push(PeerRecord {
rtype: rref.rtype(),
canonical: canonical.into(),
});
}
let we_lose = compare_rr_sets_we_lose(&our, &peer_probes_a);
assert!(
we_lose,
"Case A: identical SRV(631)+TXT(empty) on both sides must be a tie \
→ we lose (§8.2.1); we_lose={we_lose}"
);
}
{
let mut buf_srv: std::vec::Vec<u8> = std::vec::Vec::new();
make_srv_record_ref(
&mut buf_srv,
"myprinter._ipp._tcp.local.",
120,
0, 0, 631, "host.local.",
);
let (srv_ref, _) = Ref::try_parse(&buf_srv, 0).unwrap();
let mut peer_probes_b = std::vec![PeerProbe {
src: peer_src,
records: std::vec![],
}];
let view = srv_ref.rdata_view().unwrap();
let mut scratch = std::vec::Vec::new();
let canonical = respond::canonical_rdata_for_hash(&view, &mut scratch)
.unwrap()
.to_vec();
peer_probes_b[0].records.push(PeerRecord {
rtype: srv_ref.rtype(),
canonical: canonical.into(),
});
let we_lose = compare_rr_sets_we_lose(&our, &peer_probes_b);
assert!(
we_lose,
"Case B: peer set starting with SRV(0x0021) > our set starting with \
TXT(0x0010) → we lose; we_lose={we_lose}"
);
}
}
#[test]
fn poll_transmit_does_not_lose_pending_on_buffer_too_small() {
let mut svc = make_service(120);
let mut buf4096 = std::vec![0u8; 4096];
let mut now = FakeInstant::zero();
let mut probe_pending = false;
for _ in 0..20 {
now = now.advance(500);
svc.handle_timeout(now).unwrap();
if svc
.pending_transmits
.contains(&Some(PendingTransmitKind::Probe))
{
probe_pending = true;
break;
}
if let Ok(Some(_)) = svc.poll_transmit(now, &mut buf4096) {
svc.note_transmit_delivered(now);
}
}
assert!(
probe_pending,
"a Probe transmit must be pending before the test can proceed"
);
let mut small_buf = [0u8; 4];
let r = svc.poll_transmit(now, &mut small_buf);
assert!(
r.is_err(),
"poll_transmit with a 4-byte buffer must return an error; got {:?}",
r
);
assert!(
svc
.pending_transmits
.contains(&Some(PendingTransmitKind::Probe)),
"Probe must still be in pending_transmits after failed encode"
);
let mut big_buf = std::vec![0u8; 1500];
let tx = svc.poll_transmit(now, &mut big_buf).unwrap();
assert!(
tx.is_some(),
"retry with large buffer must produce a transmit"
);
assert!(
!svc
.pending_transmits
.contains(&Some(PendingTransmitKind::Probe)),
"Probe must be removed from queue after successful encode"
);
}
#[test]
fn pending_transmits_is_fifo_after_pop_and_push() {
let mut svc = make_service(120);
svc.push_pending(PendingTransmitKind::Probe);
svc.push_pending(PendingTransmitKind::Announcement);
assert_eq!(
svc.pending_transmits[0],
Some(PendingTransmitKind::Probe),
"first push lands in slot 0"
);
assert_eq!(
svc.pending_transmits[1],
Some(PendingTransmitKind::Announcement),
"second push lands in slot 1"
);
let head = svc.pop_pending();
assert_eq!(
head,
Some(PendingTransmitKind::Probe),
"pop must return the oldest item (Probe)"
);
assert_eq!(
svc.pending_transmits[0],
Some(PendingTransmitKind::Announcement),
"after pop, tail (Announcement) must compact into slot 0"
);
assert_eq!(
svc.pending_transmits[1], None,
"slot 1 must be empty after compaction"
);
svc.push_pending(PendingTransmitKind::Response);
assert_eq!(
svc.pending_transmits[0],
Some(PendingTransmitKind::Announcement),
"existing Announcement keeps its head position"
);
assert_eq!(
svc.pending_transmits[1],
Some(PendingTransmitKind::Response),
"newer Response must queue BEHIND Announcement"
);
assert_eq!(
svc.pop_pending(),
Some(PendingTransmitKind::Announcement),
"FIFO order: Announcement before Response"
);
assert_eq!(
svc.pop_pending(),
Some(PendingTransmitKind::Response),
"FIFO order: Response last"
);
assert_eq!(svc.pop_pending(), None, "queue must be empty after drain");
}
#[test]
fn poll_transmit_blocks_until_confirmation() {
let mut svc = make_service(120);
let mut buf = std::vec![0u8; 4096];
let mut now = FakeInstant::zero();
let mut emitted = false;
for _ in 0..10 {
now = now.advance(300);
svc.handle_timeout(now).unwrap();
if svc.poll_transmit(now, &mut buf).unwrap().is_some() {
emitted = true;
break;
}
}
assert!(emitted, "a probe should eventually be emitted");
assert!(
svc.poll_transmit(now, &mut buf).unwrap().is_none(),
"no datagram may be handed out while a prior send is unconfirmed"
);
svc.note_transmit_result(now, true);
assert!(
svc.awaiting_confirm.is_none(),
"confirming must clear the commit token"
);
}
#[test]
fn failed_established_reannounce_retries_within_one_second() {
let mut svc = make_service(120);
let est = drive_to_established(&mut svc);
let due = svc
.poll_timeout()
.expect("Established schedules a re-announce");
assert!(
due.checked_duration_since(est).is_some(),
"the re-announce is scheduled into the future"
);
svc.handle_timeout(due).unwrap();
assert!(
svc
.poll_transmit(due, &mut std::vec![0u8; 4096])
.unwrap()
.is_some(),
"the periodic re-announce must be emitted"
);
svc.note_transmit_result(due, false);
assert!(
matches!(svc.state(), ServiceState::Established),
"a failed re-announce must not leave Established"
);
let next = svc
.poll_timeout()
.expect("a failed re-announce must re-arm");
let gap = next
.checked_duration_since(due)
.expect("the next deadline is at or after the fire time");
assert!(
gap <= core::time::Duration::from_secs(2),
"a failed Established re-announce must retry within ~1 s, got {gap:?}"
);
}
#[test]
fn subtype_ptr_advertised_in_response() {
use crate::wire::{MessageReader, ResourceType};
let stype = Name::try_from_str("_ipp._tcp.local.").unwrap();
let inst = Name::try_from_str("myprinter._ipp._tcp.local.").unwrap();
let host = Name::try_from_str("host.local.").unwrap();
let mut records = ServiceRecords::new(stype, inst, host, 631, 120);
records.add_a(core::net::Ipv4Addr::new(192, 168, 1, 10));
records.add_subtype("_printer").unwrap();
let sub = Name::try_from_str("_printer._sub._ipp._tcp.local.").unwrap();
let mut svc: Service<FakeInstant, slab::Slab<Transmit>, slab::Slab<ServiceUpdate>> =
Service::try_new(
ServiceHandle::from_raw(0),
records,
FakeInstant::zero(),
[0u8; 32],
true,
);
let now = drive_to_established(&mut svc);
inject_question_to_set_response_deadline(&mut svc, now);
let now2 = now.advance(200);
svc.handle_timeout(now2).unwrap();
let mut buf = std::vec![0u8; 4096];
let tx = svc
.poll_transmit(now2, &mut buf)
.unwrap()
.expect("a response must be emitted");
let reader = MessageReader::try_parse(&buf[..tx.size()]).unwrap();
let saw_subtype = reader.answers().any(|rr| {
rr.map(|rec| {
rec.rtype() == ResourceType::Ptr
&& crate::endpoint::names_match(&sub, rec.name())
&& rec.ttl() > 0
})
.unwrap_or(false)
});
assert!(
saw_subtype,
"a response must include the subtype PTR at positive TTL"
);
svc.note_transmit_result(now2, true);
}
#[test]
fn meta_query_is_answered_with_service_type_ptr() {
use crate::{
event::ServiceQuestion,
wire::{MessageReader, QuestionRef, Rdata, ResourceType},
};
let mut svc = make_service(120);
let now = drive_to_established(&mut svc);
let mut qbuf: std::vec::Vec<u8> = std::vec::Vec::new();
for label in "_services._dns-sd._udp.local."
.trim_end_matches('.')
.split('.')
{
qbuf.push(label.len() as u8);
qbuf.extend_from_slice(label.as_bytes());
}
qbuf.push(0u8);
qbuf.extend_from_slice(&12u16.to_be_bytes()); qbuf.extend_from_slice(&1u16.to_be_bytes()); let (qref, _) = QuestionRef::try_parse(&qbuf, 0).unwrap();
let src: core::net::SocketAddr = "0.0.0.0:5353".parse().unwrap();
svc.handle_event(
ServiceEvent::Question(ServiceQuestion::new(qref, src, 0)),
now,
);
let now2 = now.advance(200);
let mut buf = std::vec![0u8; 4096];
let tx = svc
.poll_transmit(now2, &mut buf)
.unwrap()
.expect("a meta-query reply must be emitted");
let reader = MessageReader::try_parse(&buf[..tx.size()]).unwrap();
let meta = Name::try_from_str("_services._dns-sd._udp.local.").unwrap();
let stype = Name::try_from_str("_ipp._tcp.local.").unwrap();
let found = reader.answers().any(|rr| {
let rr = match rr {
Ok(r) => r,
Err(_) => return false,
};
if rr.rtype() != ResourceType::Ptr || !crate::endpoint::names_match(&meta, rr.name()) {
return false;
}
matches!(rr.rdata_view(), Ok(Rdata::Ptr(p)) if crate::endpoint::names_match(&stype, p.target()))
});
assert!(
found,
"the meta-query must be answered with a PTR meta-name → service_type"
);
}
#[test]
fn legacy_subtype_browse_gets_unicast_reply_with_subtype_ptr() {
use crate::{
event::ServiceQuestion,
wire::{MessageReader, QuestionRef, ResourceType},
};
let stype = Name::try_from_str("_ipp._tcp.local.").unwrap();
let inst = Name::try_from_str("myprinter._ipp._tcp.local.").unwrap();
let host = Name::try_from_str("host.local.").unwrap();
let mut records = ServiceRecords::new(stype, inst, host, 631, 120);
records.add_a(core::net::Ipv4Addr::new(192, 168, 1, 10));
records.add_subtype("_printer").unwrap();
let sub = Name::try_from_str("_printer._sub._ipp._tcp.local.").unwrap();
let mut svc: Service<FakeInstant, slab::Slab<Transmit>, slab::Slab<ServiceUpdate>> =
Service::try_new(
ServiceHandle::from_raw(0),
records,
FakeInstant::zero(),
[0u8; 32],
true,
);
let now = drive_to_established(&mut svc);
let mut qbuf: std::vec::Vec<u8> = std::vec::Vec::new();
for label in "_printer._sub._ipp._tcp.local."
.trim_end_matches('.')
.split('.')
{
qbuf.push(label.len() as u8);
qbuf.extend_from_slice(label.as_bytes());
}
qbuf.push(0u8);
qbuf.extend_from_slice(&12u16.to_be_bytes()); qbuf.extend_from_slice(&1u16.to_be_bytes()); let (qref, _) = QuestionRef::try_parse(&qbuf, 0).unwrap();
let legacy_src: core::net::SocketAddr = "192.0.2.9:40000".parse().unwrap();
svc.handle_event(
ServiceEvent::Question(ServiceQuestion::new(qref, legacy_src, 0x33)),
now,
);
let mut buf = std::vec![0u8; 4096];
let tx = svc
.poll_transmit(now, &mut buf)
.unwrap()
.expect("a legacy unicast reply must be queued for the subtype browse");
assert_eq!(
tx.dst(),
legacy_src,
"legacy reply is unicast to the querier"
);
let reader = MessageReader::try_parse(&buf[..tx.size()]).unwrap();
let saw_subtype = reader.answers().any(|rr| {
rr.map(|rec| rec.rtype() == ResourceType::Ptr && crate::endpoint::names_match(&sub, rec.name()))
.unwrap_or(false)
});
assert!(saw_subtype, "the legacy reply must carry the subtype PTR");
}
#[test]
fn legacy_meta_query_gets_unicast_meta_ptr() {
use crate::{
event::ServiceQuestion,
wire::{MessageReader, QuestionRef, Rdata, ResourceType},
};
let mut svc = make_service(120);
let now = drive_to_established(&mut svc);
let mut qbuf: std::vec::Vec<u8> = std::vec::Vec::new();
for label in "_services._dns-sd._udp.local."
.trim_end_matches('.')
.split('.')
{
qbuf.push(label.len() as u8);
qbuf.extend_from_slice(label.as_bytes());
}
qbuf.push(0u8);
qbuf.extend_from_slice(&12u16.to_be_bytes()); qbuf.extend_from_slice(&1u16.to_be_bytes()); let (qref, _) = QuestionRef::try_parse(&qbuf, 0).unwrap();
let legacy_src: core::net::SocketAddr = "192.0.2.9:40000".parse().unwrap();
svc.handle_event(
ServiceEvent::Question(ServiceQuestion::new(qref, legacy_src, 0x44)),
now,
);
let mut buf = std::vec![0u8; 4096];
let tx = svc
.poll_transmit(now, &mut buf)
.unwrap()
.expect("a legacy unicast meta reply must be queued");
assert_eq!(tx.dst(), legacy_src, "legacy meta reply is unicast");
let reader = MessageReader::try_parse(&buf[..tx.size()]).unwrap();
let meta = Name::try_from_str("_services._dns-sd._udp.local.").unwrap();
let stype = Name::try_from_str("_ipp._tcp.local.").unwrap();
let found = reader.answers().any(|rr| {
let rr = match rr {
Ok(r) => r,
Err(_) => return false,
};
rr.rtype() == ResourceType::Ptr
&& crate::endpoint::names_match(&meta, rr.name())
&& matches!(rr.rdata_view(), Ok(Rdata::Ptr(p)) if crate::endpoint::names_match(&stype, p.target()))
});
assert!(
found,
"the legacy reply must carry the meta-PTR → service_type"
);
}
fn make_bad_srv_record_ref(buf: &mut std::vec::Vec<u8>, owner_str: &str) {
buf.clear();
for label in owner_str.trim_end_matches('.').split('.') {
buf.push(label.len() as u8);
buf.extend_from_slice(label.as_bytes());
}
buf.push(0u8); buf.extend_from_slice(&33u16.to_be_bytes()); buf.extend_from_slice(&1u16.to_be_bytes()); buf.extend_from_slice(&120u32.to_be_bytes()); let rdata = [0u8, 0, 0, 0, 0, 80, 5];
#[allow(clippy::cast_possible_truncation)]
buf.extend_from_slice(&(rdata.len() as u16).to_be_bytes()); buf.extend_from_slice(&rdata);
}
#[test]
fn probing_conflict_drops_malformed_rdata() {
let mut svc = make_service(120);
svc.handle_timeout(FakeInstant::zero()).unwrap(); let mut buf = std::vec::Vec::new();
make_bad_srv_record_ref(&mut buf, "myprinter._ipp._tcp.local.");
let (rec, _) = Ref::try_parse(&buf, 0).unwrap();
let src: core::net::SocketAddr = "192.168.1.88:5353".parse().unwrap();
svc.handle_event(
ServiceEvent::ProbeConflict(ProbeConflict::new(src, rec)),
FakeInstant::zero(),
);
assert!(
svc.peer_probes.is_empty(),
"a malformed probe-conflict record must not create a peer-probe bucket"
);
}
#[test]
fn probing_conflict_caps_distinct_peer_sources() {
let mut svc = make_service(120);
svc.handle_timeout(FakeInstant::zero()).unwrap(); let mut buf = std::vec::Vec::new();
make_srv_record_ref(
&mut buf,
"myprinter._ipp._tcp.local.",
120,
0,
0,
9999,
"host.local.",
);
for i in 0..(MAX_PEER_PROBES + 1) {
let (rec, _) = Ref::try_parse(&buf, 0).unwrap();
let src: core::net::SocketAddr = std::format!("192.168.1.{}:5353", 100 + i).parse().unwrap();
svc.handle_event(
ServiceEvent::ProbeConflict(ProbeConflict::new(src, rec)),
FakeInstant::zero(),
);
}
assert_eq!(
svc.peer_probes.len(),
MAX_PEER_PROBES,
"distinct peer-probe sources must be capped at MAX_PEER_PROBES"
);
}
#[test]
fn probing_conflict_caps_records_per_source() {
let mut svc = make_service(120);
svc.handle_timeout(FakeInstant::zero()).unwrap(); let src: core::net::SocketAddr = "192.168.1.77:5353".parse().unwrap();
for port in 0..(MAX_PEER_PROBE_RECORDS + 1) as u16 {
let mut buf = std::vec::Vec::new();
make_srv_record_ref(
&mut buf,
"myprinter._ipp._tcp.local.",
120,
0,
0,
1000 + port,
"host.local.",
);
let (rec, _) = Ref::try_parse(&buf, 0).unwrap();
svc.handle_event(
ServiceEvent::ProbeConflict(ProbeConflict::new(src, rec)),
FakeInstant::zero(),
);
}
let bucket = svc.peer_probes.iter().find(|b| b.src == src).unwrap();
assert_eq!(
bucket.records.len(),
MAX_PEER_PROBE_RECORDS,
"per-source peer-probe records must be capped"
);
}
#[test]
fn post_establishment_conflict_drops_non_srv_txt_record() {
let mut svc = make_service(120);
let now = drive_to_established(&mut svc);
let mut buf = std::vec::Vec::new();
make_a_record_ref(&mut buf, "host.local.", 120, [10, 0, 0, 9]);
let (rec, _) = Ref::try_parse(&buf, 0).unwrap();
let peer: core::net::SocketAddr = "192.168.1.50:5353".parse().unwrap();
svc.handle_event(
ServiceEvent::ProbeConflict(ProbeConflict::new(peer, rec)),
now,
);
assert_eq!(svc.state(), ServiceState::Established);
}
#[test]
fn post_establishment_conflict_ignores_identical_srv() {
let mut svc = make_service(120);
let now = drive_to_established(&mut svc);
let mut buf = std::vec::Vec::new();
make_srv_record_ref(
&mut buf,
"myprinter._ipp._tcp.local.",
120,
0,
0,
631,
"host.local.",
);
let (rec, _) = Ref::try_parse(&buf, 0).unwrap();
let peer: core::net::SocketAddr = "192.168.1.50:5353".parse().unwrap();
svc.handle_event(
ServiceEvent::ProbeConflict(ProbeConflict::new(peer, rec)),
now,
);
assert_eq!(
svc.state(),
ServiceState::Established,
"identical SRV rdata is not a §9 conflict"
);
}
#[test]
fn post_establishment_conflict_drops_malformed_srv() {
let mut svc = make_service(120);
let now = drive_to_established(&mut svc);
let mut buf = std::vec::Vec::new();
make_bad_srv_record_ref(&mut buf, "myprinter._ipp._tcp.local.");
let (rec, _) = Ref::try_parse(&buf, 0).unwrap();
let peer: core::net::SocketAddr = "192.168.1.50:5353".parse().unwrap();
svc.handle_event(
ServiceEvent::ProbeConflict(ProbeConflict::new(peer, rec)),
now,
);
assert_eq!(
svc.state(),
ServiceState::Established,
"a malformed §9 conflict record must be dropped"
);
}
#[test]
fn post_establishment_conflict_is_rate_limited() {
let mut svc = make_service(120);
let now = drive_to_established(&mut svc);
svc.last_conflict_reprobe = Some(now);
let mut buf = std::vec::Vec::new();
make_srv_record_ref(
&mut buf,
"myprinter._ipp._tcp.local.",
120,
0,
0,
9999,
"host.local.",
);
let (rec, _) = Ref::try_parse(&buf, 0).unwrap();
let peer: core::net::SocketAddr = "192.168.1.50:5353".parse().unwrap();
svc.handle_event(
ServiceEvent::ProbeConflict(ProbeConflict::new(peer, rec)),
now,
);
assert_eq!(
svc.state(),
ServiceState::Established,
"a rate-limited §9 conflict must not re-probe"
);
}
#[test]
fn service_handle_and_canonical_record_accessors() {
let svc = make_service(120);
let _ = svc.handle();
let _ = svc.our_canonical_record_for(crate::wire::ResourceType::Srv);
let _ = svc.our_canonical_record_for(crate::wire::ResourceType::Txt);
let _ = svc.our_canonical_record_for(crate::wire::ResourceType::A);
}
#[test]
fn canonical_rdata_for_hash_handles_nsec_and_unknown() {
let mut nbuf: std::vec::Vec<u8> = std::vec::Vec::new();
nbuf.extend_from_slice(&[1, b'x', 5, b'l', b'o', b'c', b'a', b'l', 0]); nbuf.extend_from_slice(&47u16.to_be_bytes()); nbuf.extend_from_slice(&1u16.to_be_bytes()); nbuf.extend_from_slice(&120u32.to_be_bytes()); nbuf.extend_from_slice(&12u16.to_be_bytes()); nbuf.extend_from_slice(&[1, b'x', 5, b'l', b'o', b'c', b'a', b'l', 0, 0, 1, 0x40]);
let (nrec, _) = Ref::try_parse(&nbuf, 0).unwrap();
let nview = nrec.rdata_view().unwrap();
let mut scratch = std::vec::Vec::new();
respond::canonical_rdata_for_hash(&nview, &mut scratch).unwrap();
let mut obuf: std::vec::Vec<u8> = std::vec::Vec::new();
obuf.extend_from_slice(&[1, b'x', 5, b'l', b'o', b'c', b'a', b'l', 0]); obuf.extend_from_slice(&999u16.to_be_bytes()); obuf.extend_from_slice(&1u16.to_be_bytes()); obuf.extend_from_slice(&120u32.to_be_bytes()); obuf.extend_from_slice(&3u16.to_be_bytes()); obuf.extend_from_slice(&[0xAA, 0xBB, 0xCC]);
let (orec, _) = Ref::try_parse(&obuf, 0).unwrap();
let oview = orec.rdata_view().unwrap();
let mut scratch2 = std::vec::Vec::new();
respond::canonical_rdata_for_hash(&oview, &mut scratch2).unwrap();
}
#[test]
fn poll_transmit_announcement_surfaces_buffer_too_small() {
let mut svc = make_service(120);
let mut buf = std::vec![0u8; 4096];
let mut now = FakeInstant::zero();
'drive: for _ in 0..20 {
now = now.advance(500);
svc.handle_timeout(now).unwrap();
while let Ok(Some(_)) = svc.poll_transmit(now, &mut buf) {
svc.note_transmit_result(now, true);
if matches!(svc.state(), ServiceState::Announcing(0)) {
break 'drive;
}
}
}
assert!(matches!(svc.state(), ServiceState::Announcing(0)));
let mut tiny = std::vec![0u8; 12];
for _ in 0..6 {
now = now.advance(500);
svc.handle_timeout(now).unwrap();
if let Err(TransmitError::BufferTooSmall(_)) = svc.poll_transmit(now, &mut tiny) {
return;
}
}
panic!("expected the announcement to surface BufferTooSmall on a header-only buffer");
}
#[test]
fn poll_transmit_question_response_surfaces_buffer_too_small() {
let mut svc = make_service(120);
let now0 = drive_to_established(&mut svc);
inject_question_to_set_response_deadline(&mut svc, now0);
let mut tiny = std::vec![0u8; 12];
let mut now = now0;
for _ in 0..10 {
now = now.advance(500);
svc.handle_timeout(now).unwrap();
if let Err(TransmitError::BufferTooSmall(_)) = svc.poll_transmit(now, &mut tiny) {
return;
}
}
panic!("expected the question response to surface BufferTooSmall on a header-only buffer");
}
#[test]
fn withdrawal_snapshot_after_rename_captures_only_current() {
let mut svc = make_service(120);
svc.handle_timeout(FakeInstant::zero()).unwrap(); svc.goodbye.mark_instance();
let host_addr = core::net::Ipv4Addr::new(192, 168, 1, 10); svc.goodbye.a.push(host_addr);
let mut sbuf: std::vec::Vec<u8> = std::vec::Vec::new();
make_srv_record_ref(
&mut sbuf,
"myprinter._ipp._tcp.local.",
120,
0,
0,
9999,
"host.local.",
);
let (rec, _) = Ref::try_parse(&sbuf, 0).unwrap();
let peer: core::net::SocketAddr = "192.168.1.200:5353".parse().unwrap();
svc.handle_event(
ServiceEvent::ProbeConflict(ProbeConflict::new(peer, rec)),
FakeInstant::zero(),
);
let now = FakeInstant::zero().advance(500);
svc.handle_timeout(now).unwrap();
assert!(
svc.name().as_str().contains("-1"),
"service should have renamed to `myprinter-1`"
);
let RenameGoodbyeHandoff {
records: old_records,
owned: old_owned,
} = svc
.take_rename_goodbye_handoff()
.expect("the rename hands off the OLD announced name's goodbye");
assert_eq!(
old_records.instance().as_str(),
"myprinter._ipp._tcp.local.",
"the handoff carries the OLD instance name"
);
assert!(
old_owned.ptr() && old_owned.srv() && old_owned.txt(),
"the OLD name's advertised instance records are handed off"
);
assert!(
old_owned.a_slice().is_empty() && old_owned.aaaa_slice().is_empty(),
"the OLD-name handoff is instance-only (a rename never withdraws host addrs)"
);
assert!(
!svc.goodbye.any_instance(),
"reset_instance must clear the instance latch after a rename"
);
assert!(
svc.goodbye.a.contains(&host_addr),
"the host A address survives the instance rename"
);
svc.goodbye.mark_instance();
let snap = svc.withdrawal_snapshot();
assert!(
snap.records.instance().as_str().contains("-1"),
"the snapshot's records must be the re-announced `myprinter-1`"
);
assert!(
snap.owned.ptr() && snap.owned.srv() && snap.owned.txt(),
"the CURRENT name's confirmed instance records are captured"
);
assert!(
snap.host_a.contains(&host_addr),
"the CURRENT (still-owned) host A address is captured for withdrawal"
);
assert!(
svc.take_rename_goodbye_handoff().is_none(),
"the rename handoff is consumed exactly once"
);
}
#[test]
fn duplicate_legacy_question_is_deduped() {
use crate::{event::ServiceQuestion, wire::QuestionRef};
let mut svc = make_service(120);
let now = drive_to_established(&mut svc);
let legacy_src: core::net::SocketAddr = "192.0.2.9:40000".parse().unwrap();
let inject = |svc: &mut Service<FakeInstant, slab::Slab<Transmit>, slab::Slab<ServiceUpdate>>| {
let mut qbuf: std::vec::Vec<u8> = std::vec::Vec::new();
for label in "_ipp._tcp.local.".trim_end_matches('.').split('.') {
qbuf.push(label.len() as u8);
qbuf.extend_from_slice(label.as_bytes());
}
qbuf.push(0u8);
qbuf.extend_from_slice(&12u16.to_be_bytes()); qbuf.extend_from_slice(&1u16.to_be_bytes()); let (qref, _) = QuestionRef::try_parse(&qbuf, 0).unwrap();
svc.handle_event(
ServiceEvent::Question(ServiceQuestion::new(qref, legacy_src, 0x4242)),
now,
);
};
inject(&mut svc);
inject(&mut svc);
assert_eq!(
svc.pending_legacy.len(),
1,
"a duplicate legacy question must be deduped"
);
}
#[cfg(feature = "stats")]
#[test]
fn partial_kas_suppression_counter_is_delivery_gated() {
use crate::{
event::{KnownAnswer, ServiceQuestion},
wire::{QuestionRef, Ref},
};
let our_ttl: u32 = 120;
let mut svc = make_service(our_ttl);
let now = drive_to_established(&mut svc);
let stats = std::sync::Arc::new(hick_trace::stats::Stats::default());
svc.set_stats(stats.clone());
let inject_srv_hint =
|svc: &mut Service<FakeInstant, slab::Slab<Transmit>, slab::Slab<ServiceUpdate>>,
now: FakeInstant| {
inject_question_to_set_response_deadline(svc, now);
let mut srv_buf: std::vec::Vec<u8> = std::vec::Vec::new();
make_srv_record_ref(
&mut srv_buf,
"myprinter._ipp._tcp.local.",
our_ttl, 0,
0,
631,
"host.local.",
);
let (srv_ref, _) = Ref::try_parse(&srv_buf, 0).unwrap();
let ka = KnownAnswer::new("0.0.0.0:5353".parse().unwrap(), srv_ref);
svc.handle_event(ServiceEvent::KnownAnswer(ka), now);
};
let inject_any_question =
|svc: &mut Service<FakeInstant, slab::Slab<Transmit>, slab::Slab<ServiceUpdate>>,
now: FakeInstant| {
let mut qbuf: std::vec::Vec<u8> = std::vec::Vec::new();
for label in "myprinter._ipp._tcp.local."
.trim_end_matches('.')
.split('.')
{
qbuf.push(label.len() as u8);
qbuf.extend_from_slice(label.as_bytes());
}
qbuf.push(0u8);
qbuf.extend_from_slice(&255u16.to_be_bytes()); qbuf.extend_from_slice(&1u16.to_be_bytes()); let (qref, _) = QuestionRef::try_parse(&qbuf, 0).unwrap();
let src: core::net::SocketAddr = "0.0.0.0:5353".parse().unwrap();
svc.handle_event(
ServiceEvent::Question(ServiceQuestion::new(qref, src, 0)),
now,
);
};
inject_srv_hint(&mut svc, now);
inject_any_question(&mut svc, now);
svc.handle_timeout(now.advance(200)).unwrap();
let mut buf = std::vec![0u8; 4096];
let now2 = now.advance(200);
let tx = svc.poll_transmit(now2, &mut buf).unwrap();
assert!(
tx.is_some(),
"poll_transmit must return Some (partial suppression leaves a non-empty response)"
);
let before = stats.snapshot().answers_suppressed_kas;
svc.note_transmit_result(now2, false);
let after_fail = stats.snapshot().answers_suppressed_kas;
assert_eq!(
after_fail, before,
"answers_suppressed_kas must NOT be bumped when delivery=false; \
was {before}, now {after_fail}"
);
inject_srv_hint(&mut svc, now2);
inject_any_question(&mut svc, now2);
svc.handle_timeout(now2.advance(200)).unwrap();
let now3 = now2.advance(200);
let tx2 = svc.poll_transmit(now3, &mut buf).unwrap();
assert!(
tx2.is_some(),
"poll_transmit must return Some in the second cycle"
);
svc.note_transmit_result(now3, true);
let after_ok = stats.snapshot().answers_suppressed_kas;
assert!(
after_ok > after_fail,
"answers_suppressed_kas must be bumped when delivery=true; \
before_delivery={after_fail}, after_delivery={after_ok}"
);
}
#[cfg(feature = "stats")]
#[test]
fn full_kas_suppression_counts_at_suppression_not_delivery() {
use crate::{
event::{KnownAnswer, ServiceQuestion},
wire::{QuestionRef, Ref},
};
let our_ttl: u32 = 120;
let mut svc = make_service(our_ttl);
let now = drive_to_established(&mut svc);
let stats = std::sync::Arc::new(hick_trace::stats::Stats::default());
svc.set_stats(stats.clone());
inject_question_to_set_response_deadline(&mut svc, now);
let mut ptr_buf: std::vec::Vec<u8> = std::vec::Vec::new();
{
for label in "_ipp._tcp.local.".trim_end_matches('.').split('.') {
ptr_buf.push(label.len() as u8);
ptr_buf.extend_from_slice(label.as_bytes());
}
ptr_buf.push(0u8);
ptr_buf.extend_from_slice(&12u16.to_be_bytes()); ptr_buf.extend_from_slice(&1u16.to_be_bytes()); ptr_buf.extend_from_slice(&our_ttl.to_be_bytes());
let mut rdata: std::vec::Vec<u8> = std::vec::Vec::new();
for label in "myprinter._ipp._tcp.local."
.trim_end_matches('.')
.split('.')
{
rdata.push(label.len() as u8);
rdata.extend_from_slice(label.as_bytes());
}
rdata.push(0u8);
#[allow(clippy::cast_possible_truncation)]
ptr_buf.extend_from_slice(&(rdata.len() as u16).to_be_bytes());
ptr_buf.extend_from_slice(&rdata);
}
let (ptr_ref, _) = Ref::try_parse(&ptr_buf, 0).unwrap();
let ka = KnownAnswer::new("0.0.0.0:5353".parse().unwrap(), ptr_ref);
svc.handle_event(ServiceEvent::KnownAnswer(ka), now);
{
let mut srv_buf: std::vec::Vec<u8> = std::vec::Vec::new();
make_srv_record_ref(
&mut srv_buf,
"myprinter._ipp._tcp.local.",
our_ttl,
0,
0,
631,
"host.local.",
);
let (srv_ref, _) = Ref::try_parse(&srv_buf, 0).unwrap();
let ka = KnownAnswer::new("0.0.0.0:5353".parse().unwrap(), srv_ref);
svc.handle_event(ServiceEvent::KnownAnswer(ka), now);
}
{
let mut qbuf: std::vec::Vec<u8> = std::vec::Vec::new();
for label in "myprinter._ipp._tcp.local."
.trim_end_matches('.')
.split('.')
{
qbuf.push(label.len() as u8);
qbuf.extend_from_slice(label.as_bytes());
}
qbuf.push(0u8);
qbuf.extend_from_slice(&255u16.to_be_bytes()); qbuf.extend_from_slice(&1u16.to_be_bytes()); let (qref, _) = QuestionRef::try_parse(&qbuf, 0).unwrap();
let src: core::net::SocketAddr = "0.0.0.0:5353".parse().unwrap();
svc.handle_event(
ServiceEvent::Question(ServiceQuestion::new(qref, src, 0)),
now,
);
}
let now2 = now.advance(200);
svc.handle_timeout(now2).unwrap();
let before = stats.snapshot().answers_suppressed_kas;
let mut buf = std::vec![0u8; 4096];
let result = svc.poll_transmit(now2, &mut buf).unwrap();
if result.is_none() {
let after = stats.snapshot().answers_suppressed_kas;
assert!(
after > before,
"answers_suppressed_kas must be bumped at suppression for full Ok(None) case; \
before={before}, after={after}"
);
assert!(
svc.awaiting_confirm.is_none(),
"no awaiting_confirm token must exist after Ok(None)"
);
}
}
#[cfg(feature = "stats")]
fn build_meta_question_bytes() -> std::vec::Vec<u8> {
let mut qbuf: std::vec::Vec<u8> = std::vec::Vec::new();
for label in "_services._dns-sd._udp.local."
.trim_end_matches('.')
.split('.')
{
qbuf.push(label.len() as u8);
qbuf.extend_from_slice(label.as_bytes());
}
qbuf.push(0u8); qbuf.extend_from_slice(&12u16.to_be_bytes()); qbuf.extend_from_slice(&1u16.to_be_bytes()); qbuf
}
#[cfg(feature = "stats")]
#[test]
fn multicast_meta_response_counts_responses_tx() {
use crate::{event::ServiceQuestion, wire::QuestionRef};
let mut svc = make_service(120);
let now = drive_to_established(&mut svc);
let stats = std::sync::Arc::new(hick_trace::stats::Stats::default());
svc.set_stats(stats.clone());
let qbuf = build_meta_question_bytes();
let (qref, _) = QuestionRef::try_parse(&qbuf, 0).unwrap();
let src: core::net::SocketAddr = "192.0.2.1:5353".parse().unwrap();
svc.handle_event(
ServiceEvent::Question(ServiceQuestion::new(qref, src, 0)),
now,
);
let now2 = now.advance(200);
let mut buf = std::vec![0u8; 4096];
let tx = svc.poll_transmit(now2, &mut buf).unwrap();
assert!(
tx.is_some(),
"poll_transmit must produce a meta-response datagram"
);
assert!(
svc.awaiting_confirm.is_some(),
"awaiting_confirm must be set after a meta-response emit"
);
let before = stats.snapshot().responses_tx;
svc.note_transmit_result(now2, false);
let after_fail = stats.snapshot().responses_tx;
assert_eq!(
after_fail, before,
"responses_tx must NOT be bumped on delivery=false (meta); was {before}, now {after_fail}"
);
let qbuf2 = build_meta_question_bytes();
let (qref2, _) = QuestionRef::try_parse(&qbuf2, 0).unwrap();
let src2: core::net::SocketAddr = "192.0.2.2:5353".parse().unwrap();
svc.handle_event(
ServiceEvent::Question(ServiceQuestion::new(qref2, src2, 0)),
now2,
);
let now3 = now2.advance(200);
let tx2 = svc.poll_transmit(now3, &mut buf).unwrap();
assert!(
tx2.is_some(),
"poll_transmit must produce a second meta-response datagram"
);
svc.note_transmit_result(now3, true);
let after_ok = stats.snapshot().responses_tx;
assert_eq!(
after_ok,
before + 1,
"responses_tx must be bumped by 1 on delivery=true (meta); expected {}, got {after_ok}",
before + 1
);
}
#[cfg(feature = "stats")]
#[test]
fn legacy_meta_response_counts_responses_tx() {
use crate::{event::ServiceQuestion, wire::QuestionRef};
let mut svc = make_service(120);
let now = drive_to_established(&mut svc);
let stats = std::sync::Arc::new(hick_trace::stats::Stats::default());
svc.set_stats(stats.clone());
let qbuf = build_meta_question_bytes();
let (qref, _) = QuestionRef::try_parse(&qbuf, 0).unwrap();
let src: core::net::SocketAddr = "192.0.2.50:12345".parse().unwrap();
svc.handle_event(
ServiceEvent::Question(ServiceQuestion::new(qref, src, 42)),
now,
);
let mut buf = std::vec![0u8; 4096];
let tx = svc.poll_transmit(now, &mut buf).unwrap();
assert!(
tx.is_some(),
"poll_transmit must produce a legacy meta-response datagram"
);
assert!(
svc.awaiting_confirm.is_some(),
"awaiting_confirm must be set after a legacy meta-response emit"
);
let before = stats.snapshot().responses_tx;
svc.note_transmit_result(now, false);
let after_fail = stats.snapshot().responses_tx;
assert_eq!(
after_fail, before,
"responses_tx must NOT be bumped on delivery=false (legacy meta); \
was {before}, now {after_fail}"
);
let qbuf2 = build_meta_question_bytes();
let (qref2, _) = QuestionRef::try_parse(&qbuf2, 0).unwrap();
let src2: core::net::SocketAddr = "192.0.2.51:12345".parse().unwrap();
svc.handle_event(
ServiceEvent::Question(ServiceQuestion::new(qref2, src2, 43)),
now,
);
let tx2 = svc.poll_transmit(now, &mut buf).unwrap();
assert!(
tx2.is_some(),
"poll_transmit must produce a second legacy meta-response datagram"
);
svc.note_transmit_result(now, true);
let after_ok = stats.snapshot().responses_tx;
assert_eq!(
after_ok,
before + 1,
"responses_tx must be bumped by 1 on delivery=true (legacy meta); \
expected {}, got {after_ok}",
before + 1
);
}
#[test]
fn withdrawal_snapshot_of_established_service_owns_its_records() {
let mut svc = make_service(120);
drive_to_established(&mut svc);
let snap = svc.withdrawal_snapshot();
assert!(snap.owned.ptr(), "snapshot must own PTR");
assert!(snap.owned.srv(), "snapshot must own SRV");
assert!(snap.owned.txt(), "snapshot must own TXT");
let expected = core::net::Ipv4Addr::new(192, 168, 1, 10);
assert!(
snap.host_a.contains(&expected),
"snapshot host_a must contain {expected}"
);
}
#[test]
fn withdrawal_snapshot_of_never_announced_service_is_empty() {
let mut svc = make_service(120);
svc.handle_timeout(FakeInstant::zero()).unwrap();
let snap = svc.withdrawal_snapshot();
assert!(!snap.owned.ptr(), "unanounced: PTR must not be owned");
assert!(!snap.owned.srv(), "unannounced: SRV must not be owned");
assert!(!snap.owned.txt(), "unannounced: TXT must not be owned");
assert!(
!snap.owned.subtypes(),
"unannounced: subtypes must not be owned"
);
assert!(snap.host_a.is_empty(), "unannounced: host_a must be empty");
assert!(
snap.host_aaaa.is_empty(),
"unannounced: host_aaaa must be empty"
);
}