use super::*;
use crate::{
cache::CacheEntry,
config::{EndpointConfig, ServiceSpec},
event::{QueryUpdate, ServiceUpdate},
query::Query,
records::ServiceRecords,
transmit::Transmit,
};
use std::{net::Ipv4Addr, time::Instant as StdInstant};
type TestQuery = Query<StdInstant, slab::Slab<CollectedAnswer>, slab::Slab<QueryUpdate>>;
type TestEndp = Endpoint<
StdInstant,
rand::rngs::StdRng,
slab::Slab<CacheEntry<StdInstant>>,
slab::Slab<ServiceRoute>,
slab::Slab<TestQuery>,
slab::Slab<EndpointEventEntry>,
slab::Slab<CollectedAnswer>,
slab::Slab<QueryUpdate>,
>;
fn build_endpoint() -> TestEndp {
use rand::SeedableRng;
let rng = rand::rngs::StdRng::from_seed([99u8; 32]);
TestEndp::try_new(EndpointConfig::new(), rng)
}
#[test]
fn service_route_exposes_advertised_addresses() {
let mut e = build_endpoint();
let st = Name::try_from_str("_ipp._tcp.local.").unwrap();
let inst = Name::try_from_str("P._ipp._tcp.local.").unwrap();
let host = Name::try_from_str("h.local.").unwrap();
let mut recs = ServiceRecords::new(st, inst, host, 631, 120);
recs.add_a(Ipv4Addr::new(10, 0, 0, 5));
let (handle, _svc) = e
.try_register_service::<slab::Slab<Transmit>, slab::Slab<ServiceUpdate>>(
ServiceSpec::new(recs),
StdInstant::now(),
)
.unwrap();
let (_, route) = e
.services
.iter()
.find(|(_, r)| r.handle() == handle)
.unwrap();
assert_eq!(route.a_addrs(), [Ipv4Addr::new(10, 0, 0, 5)].as_slice());
assert!(route.aaaa_addrs().is_empty());
assert!(route.aaaa_scopes().is_empty());
}
#[test]
fn endpoint_event_entry_borrows_inner_event() {
let entry = EndpointEventEntry(crate::event::EndpointEvent::CacheExpired);
assert!(matches!(
entry.event(),
crate::event::EndpointEvent::CacheExpired
));
}
#[test]
fn handle_rejects_a_malformed_packet_with_a_parse_error() {
let mut e = build_endpoint();
let now = StdInstant::now();
let src = "192.0.2.1:5353".parse().unwrap();
let local = "192.0.2.20".parse().unwrap();
let res = e.handle(now, src, local, 0, &[0u8], false);
assert!(matches!(res, Err(HandleError::Parse(_))));
}
#[test]
fn note_service_advertised_is_a_noop_for_an_unknown_handle() {
let mut e = build_endpoint();
e.note_service_advertised(ServiceHandle::from_raw(0xDEAD), &[], &[], false);
}
#[test]
fn sibling_retained_addrs_is_empty_for_an_unknown_handle() {
let e = build_endpoint();
assert!(
e.sibling_retained_addrs(ServiceHandle::from_raw(0xBEEF))
.is_empty()
);
}
#[test]
fn advance_after_encode_failure_is_a_noop_for_an_unknown_index() {
let mut e = build_endpoint();
let now = StdInstant::now();
e.advance_after_encode_failure(9999, now, false);
}
#[test]
fn query_delegation_tolerates_unknown_handles() {
let mut e = build_endpoint();
let bogus = QueryHandle::from_raw(0xDEAD);
let now = StdInstant::now();
let mut buf = std::vec![0u8; 512];
assert!(matches!(
e.poll_query_transmit(bogus, now, &mut buf),
Ok(None)
));
e.note_query_transmit_result(bogus, now, true); assert!(e.handle_query_timeout(bogus, now).is_ok());
}
#[test]
fn endpoint_config_accessor_and_empty_poll_transmit() {
let mut e = build_endpoint();
let _ = e.config();
let mut buf = std::vec![0u8; 64];
assert!(matches!(
e.poll_transmit(StdInstant::now(), &mut buf),
Ok(None)
));
}
#[test]
fn src_matches_advertised_checks_route_addresses() {
let mut e = build_endpoint();
let st = Name::try_from_str("_ipp._tcp.local.").unwrap();
let inst = Name::try_from_str("P._ipp._tcp.local.").unwrap();
let host = Name::try_from_str("h.local.").unwrap();
let mut recs = ServiceRecords::new(st, inst, host, 631, 120);
recs.add_a(Ipv4Addr::new(10, 0, 0, 5));
e.try_register_service::<slab::Slab<Transmit>, slab::Slab<ServiceUpdate>>(
ServiceSpec::new(recs),
StdInstant::now(),
)
.unwrap();
assert!(e.src_matches_advertised(core::net::IpAddr::V4(Ipv4Addr::new(10, 0, 0, 5)), 0));
assert!(!e.src_matches_advertised(core::net::IpAddr::V4(Ipv4Addr::new(10, 0, 0, 99)), 0));
assert!(!e.src_matches_advertised(core::net::IpAddr::V6(core::net::Ipv6Addr::LOCALHOST), 0));
}
#[test]
fn handle_rejects_invalid_opcode_and_response_code() {
let mut e = build_endpoint();
let src: std::net::SocketAddr = "192.168.1.5:5353".parse().unwrap();
let local_ip = core::net::IpAddr::V4(Ipv4Addr::new(192, 168, 1, 10));
let now = StdInstant::now();
let bad_opcode = [0u8, 0, 0x10, 0x00, 0, 0, 0, 0, 0, 0, 0, 0];
assert!(matches!(
e.handle(now, src, local_ip, 0, &bad_opcode, false),
Err(HandleError::InvalidOpcode(_))
));
let bad_rcode = [0u8, 0, 0x00, 0x01, 0, 0, 0, 0, 0, 0, 0, 0];
assert!(matches!(
e.handle(now, src, local_ip, 0, &bad_rcode, false),
Err(HandleError::InvalidResponseCode(_))
));
}
#[test]
fn handle_service_renamed_updates_route_name() {
let mut e = build_endpoint();
let stype = Name::try_from_str("_http._tcp.local.").unwrap();
let inst = Name::try_from_str("WebServer._http._tcp.local.").unwrap();
let host = Name::try_from_str("server.local.").unwrap();
let mut recs = ServiceRecords::new(stype, inst.clone(), host, 80, 120);
recs.add_a(Ipv4Addr::new(10, 0, 0, 1));
let now = StdInstant::now();
let (handle, _svc) = e
.try_register_service::<slab::Slab<Transmit>, slab::Slab<ServiceUpdate>>(
ServiceSpec::new(recs),
now,
)
.unwrap();
let new_name = Name::try_from_str("WebServer-2._http._tcp.local.").unwrap();
e.handle_service_renamed(handle, new_name.clone()).unwrap();
let found = e
.services
.iter()
.find(|(_, route)| route.handle() == handle)
.map(|(_, route)| route.name().clone());
assert_eq!(
found.as_ref().map(Name::as_str),
Some(new_name.as_str()),
"expected route name to be updated to the renamed instance"
);
}
#[test]
fn handle_service_renamed_rejects_duplicate() {
use crate::error::HandleServiceRenamedError;
let mut e = build_endpoint();
let now = StdInstant::now();
let stype1 = Name::try_from_str("_http._tcp.local.").unwrap();
let inst1 = Name::try_from_str("Alpha._http._tcp.local.").unwrap();
let host1 = Name::try_from_str("alpha.local.").unwrap();
let mut recs1 = ServiceRecords::new(stype1, inst1.clone(), host1, 80, 120);
recs1.add_a(Ipv4Addr::new(10, 0, 0, 1));
let (handle1, _svc1) = e
.try_register_service::<slab::Slab<Transmit>, slab::Slab<ServiceUpdate>>(
ServiceSpec::new(recs1),
now,
)
.unwrap();
let stype2 = Name::try_from_str("_http._tcp.local.").unwrap();
let inst2 = Name::try_from_str("Beta._http._tcp.local.").unwrap();
let host2 = Name::try_from_str("beta.local.").unwrap();
let mut recs2 = ServiceRecords::new(stype2, inst2.clone(), host2, 80, 120);
recs2.add_a(Ipv4Addr::new(10, 0, 0, 2));
let (_handle2, _svc2) = e
.try_register_service::<slab::Slab<Transmit>, slab::Slab<ServiceUpdate>>(
ServiceSpec::new(recs2),
now,
)
.unwrap();
let result = e.handle_service_renamed(handle1, inst2.clone());
assert!(
result.is_err(),
"expected an error when renaming to an already-registered name"
);
assert!(
matches!(
result.unwrap_err(),
HandleServiceRenamedError::NameAlreadyRegistered(_)
),
"expected NameAlreadyRegistered variant"
);
let found = e
.services
.iter()
.find(|(_, route)| route.handle() == handle1)
.map(|(_, route)| route.name().clone());
assert_eq!(
found.as_ref().map(Name::as_str),
Some(inst1.as_str()),
"handle1 name must remain unchanged after rejected rename"
);
}
#[test]
fn service_route_has_host_field() {
let mut e = build_endpoint();
let st = Name::try_from_str("_ipp._tcp.local.").unwrap();
let inst = Name::try_from_str("Printer._ipp._tcp.local.").unwrap();
let host = Name::try_from_str("printer-host.local.").unwrap();
let recs = ServiceRecords::new(st, inst, host.clone(), 631, 120);
let now = StdInstant::now();
let _ = e
.try_register_service::<slab::Slab<Transmit>, slab::Slab<ServiceUpdate>>(
ServiceSpec::new(recs),
now,
)
.unwrap();
let route = e
.services
.iter()
.next()
.map(|(_, r)| r.clone())
.expect("expected one registered route");
assert_eq!(
route.host().as_str(),
host.as_str(),
"ServiceRoute::host() must reflect the host name from ServiceRecords"
);
}
fn build_query_for_host(buf: &mut [u8; 512], host_str: &str) -> usize {
use crate::wire::{Header, MessageBuilder, ResourceClass, ResourceType};
let hdr = Header::new();
let mut b = MessageBuilder::<'_, 32>::try_new(buf, hdr).unwrap();
let name = Name::try_from_str(host_str).unwrap();
b.push_question(&name, ResourceType::A, ResourceClass::In, false)
.unwrap();
b.finish().unwrap()
}
fn build_probe_authority_for_host(buf: &mut [u8; 512], host_str: &str) -> usize {
use crate::wire::{Header, MessageBuilder};
let hdr = Header::new();
let mut b = MessageBuilder::<'_, 32>::try_new(buf, hdr).unwrap();
let name = Name::try_from_str(host_str).unwrap();
b.push_a_authority(&name, 120, Ipv4Addr::new(192, 168, 1, 99))
.unwrap();
b.finish().unwrap()
}
fn build_probe_srv_authority(buf: &mut [u8; 512], instance_str: &str) -> usize {
use crate::wire::{Header, MessageBuilder};
let hdr = Header::new();
let mut b = MessageBuilder::<'_, 32>::try_new(buf, hdr).unwrap();
let name = Name::try_from_str(instance_str).unwrap();
let target = Name::try_from_str("other-host.local.").unwrap();
b.push_srv_authority(&name, 120, 0, 0, 8080, &target)
.unwrap();
b.finish().unwrap()
}
fn build_endpoint_with_printer() -> (TestEndp, ServiceHandle) {
let mut e = build_endpoint();
let st = Name::try_from_str("_ipp._tcp.local.").unwrap();
let inst = Name::try_from_str("Printer._ipp._tcp.local.").unwrap();
let host = Name::try_from_str("printer-host.local.").unwrap();
let recs = ServiceRecords::new(st, inst, host, 631, 120);
let now = StdInstant::now();
let (handle, _svc) = e
.try_register_service::<slab::Slab<Transmit>, slab::Slab<ServiceUpdate>>(
ServiceSpec::new(recs),
now,
)
.unwrap();
(e, handle)
}
#[test]
fn host_question_routes_to_service() {
use crate::event::RouteEvent;
use core::net::SocketAddr;
let (mut e, expected_handle) = build_endpoint_with_printer();
let src: SocketAddr = SocketAddr::from((Ipv4Addr::new(192, 168, 1, 55), 5353));
let local_ip = core::net::IpAddr::V4(Ipv4Addr::new(192, 168, 1, 10));
let mut buf = [0u8; 512];
let n = build_query_for_host(&mut buf, "printer-host.local.");
let data = &buf[..n];
let mut events = e
.handle(StdInstant::now(), src, local_ip, 0, data, false)
.unwrap();
let ev = events
.next()
.expect("expected at least one routing event")
.expect("expected Ok");
match ev {
RouteEvent::ToService(ts) => {
assert_eq!(
ts.handle(),
expected_handle,
"event must be addressed to the registered service handle"
);
assert!(
ts.event().is_question(),
"event must be ServiceEvent::Question, got {:?}",
ts.event()
);
}
other => panic!("expected RouteEvent::ToService(Question), got {:?}", other),
}
}
#[test]
fn authority_instance_name_routes_as_probe_conflict() {
use crate::event::RouteEvent;
use core::net::SocketAddr;
let (mut e, expected_handle) = build_endpoint_with_printer();
let src: SocketAddr = SocketAddr::from((Ipv4Addr::new(192, 168, 1, 55), 5353));
let local_ip = core::net::IpAddr::V4(Ipv4Addr::new(192, 168, 1, 10));
let mut buf = [0u8; 512];
let n = build_probe_srv_authority(&mut buf, "Printer._ipp._tcp.local.");
let data = &buf[..n];
let mut events = e
.handle(StdInstant::now(), src, local_ip, 0, data, false)
.unwrap();
let ev = events
.next()
.expect("expected at least one routing event")
.expect("expected Ok");
match ev {
RouteEvent::ToService(ts) => {
assert_eq!(ts.handle(), expected_handle);
assert!(
ts.event().is_probe_conflict(),
"expected ProbeConflict for an instance-name authority record, got {:?}",
ts.event()
);
}
other => panic!(
"expected RouteEvent::ToService(ProbeConflict), got {:?}",
other
),
}
}
#[test]
fn ephemeral_port_authority_record_does_not_trigger_conflict() {
use crate::event::RouteEvent;
use core::net::SocketAddr;
let (mut e, _handle) = build_endpoint_with_printer();
let src: SocketAddr = SocketAddr::from((Ipv4Addr::new(192, 168, 1, 55), 40000));
let local_ip = core::net::IpAddr::V4(Ipv4Addr::new(192, 168, 1, 10));
let mut buf = [0u8; 512];
let n = build_probe_srv_authority(&mut buf, "Printer._ipp._tcp.local.");
let data = &buf[..n];
let events = e
.handle(StdInstant::now(), src, local_ip, 0, data, false)
.unwrap();
for ev in events {
let ev = ev.expect("expected Ok");
if let RouteEvent::ToService(ts) = ev {
assert!(
!ts.event().is_probe_conflict() && !ts.event().is_host_conflict(),
"ephemeral-port authority record must not route as a conflict, got {:?}",
ts.event()
);
}
}
}
#[test]
fn authority_host_name_routes_as_host_conflict() {
use crate::event::RouteEvent;
use core::net::SocketAddr;
let (mut e, expected_handle) = build_endpoint_with_printer();
let src: SocketAddr = SocketAddr::from((Ipv4Addr::new(192, 168, 1, 55), 5353));
let local_ip = core::net::IpAddr::V4(Ipv4Addr::new(192, 168, 1, 10));
let mut buf = [0u8; 512];
let n = build_probe_authority_for_host(&mut buf, "printer-host.local.");
let data = &buf[..n];
let mut events = e
.handle(StdInstant::now(), src, local_ip, 0, data, false)
.unwrap();
let ev = events
.next()
.expect("expected at least one routing event")
.expect("expected Ok");
match ev {
RouteEvent::ToService(ts) => {
assert_eq!(ts.handle(), expected_handle);
assert!(
ts.event().is_host_conflict(),
"expected HostConflict for a host-name authority record, got {:?}",
ts.event()
);
}
other => panic!(
"expected RouteEvent::ToService(HostConflict), got {:?}",
other
),
}
}
#[test]
fn txt_owned_by_host_name_does_not_route_host_conflict() {
use crate::{
event::RouteEvent,
wire::{DEFAULT_COMPRESSION_TABLE, Header, MessageBuilder},
};
use core::net::SocketAddr;
let (mut e, _handle) = build_endpoint_with_printer();
let src: SocketAddr = SocketAddr::from((Ipv4Addr::new(192, 168, 1, 55), 5353));
let local_ip = core::net::IpAddr::V4(Ipv4Addr::new(192, 168, 1, 10));
let mut buf = [0u8; 512];
let host = Name::try_from_str("printer-host.local.").unwrap();
let hdr = Header::new(); let mut b: MessageBuilder<'_, DEFAULT_COMPRESSION_TABLE> =
MessageBuilder::try_new(&mut buf, hdr).unwrap();
b.push_txt_authority(&host, 120, [b"k=v".as_slice()])
.unwrap();
let n = b.finish().unwrap();
let events = e
.handle(StdInstant::now(), src, local_ip, 0, &buf[..n], false)
.unwrap();
for ev in events {
if let Ok(RouteEvent::ToService(ts)) = ev {
assert!(
!ts.event().is_host_conflict() && !ts.event().is_probe_conflict(),
"a TXT owned by the host name must not route a conflict, got {:?}",
ts.event()
);
}
}
}
#[test]
fn additional_section_records_are_cached_and_delivered() {
use crate::{
config::QuerySpec,
wire::{ResourceClass, ResourceType},
};
use core::net::SocketAddr;
let mut e = build_endpoint();
let now = StdInstant::now();
let qname = Name::try_from_str("printer.local.").unwrap();
let h = e
.try_start_query(QuerySpec::new(qname.clone(), ResourceType::A), now)
.unwrap();
let mut msg: std::vec::Vec<u8> = std::vec::Vec::new();
msg.extend_from_slice(&[0, 0, 0x84, 0x00, 0, 0, 0, 0, 0, 0, 0, 1]);
msg.extend_from_slice(&[
7, b'p', b'r', b'i', b'n', b't', b'e', b'r', 5, b'l', b'o', b'c', b'a', b'l', 0,
]);
msg.extend_from_slice(&1u16.to_be_bytes()); msg.extend_from_slice(&1u16.to_be_bytes()); msg.extend_from_slice(&120u32.to_be_bytes()); msg.extend_from_slice(&4u16.to_be_bytes()); msg.extend_from_slice(&[10, 0, 0, 7]);
let src: SocketAddr = "192.168.1.77:5353".parse().unwrap();
let local_ip: core::net::IpAddr = "192.168.1.1".parse().unwrap();
let to_query = e
.handle(now, src, local_ip, 0, &msg, false)
.unwrap()
.filter(|r| matches!(r, Ok(ev) if ev.is_to_query()))
.count();
assert!(
to_query >= 1,
"additional-section A must emit a ToQuery for the matching query"
);
let answers: std::vec::Vec<_> = e.collected_answers(h).cloned().collect();
assert_eq!(
answers.len(),
1,
"additional-section A must reach the active query; got {answers:?}"
);
assert!(
e.cache.contains(&qname, ResourceType::A, ResourceClass::In),
"additional-section A must be cached"
);
}
#[test]
fn additional_section_srv_for_instance_routes_probe_conflict() {
use crate::{
event::RouteEvent,
wire::{DEFAULT_COMPRESSION_TABLE, Header, MessageBuilder},
};
use core::net::SocketAddr;
let (mut e, expected) = build_endpoint_with_printer();
let inst = Name::try_from_str("Printer._ipp._tcp.local.").unwrap();
let mut buf = [0u8; 512];
let mut hdr = Header::new();
hdr.flags_mut().set_response();
let mut b: MessageBuilder<'_, DEFAULT_COMPRESSION_TABLE> =
MessageBuilder::try_new(&mut buf, hdr).unwrap();
let target = Name::try_from_str("other-host.local.").unwrap();
b.push_srv_answer(&inst, 120, 0, 0, 8080, &target, false)
.unwrap();
let n = b.finish().unwrap();
buf[7] = 0; buf[11] = 1;
let src: SocketAddr = "192.168.1.55:5353".parse().unwrap();
let local_ip: core::net::IpAddr = "192.168.1.1".parse().unwrap();
let saw_conflict = e
.handle(StdInstant::now(), src, local_ip, 0, &buf[..n], false)
.unwrap()
.filter_map(Result::ok)
.any(|ev| {
matches!(ev, RouteEvent::ToService(ts) if ts.handle() == expected && ts.event().is_probe_conflict())
});
assert!(
saw_conflict,
"an SRV for our instance name in the ADDITIONAL section must route a ProbeConflict"
);
}
#[test]
fn additional_conflict_not_replayed_across_query_events() {
use crate::{
config::QuerySpec,
event::RouteEvent,
wire::{DEFAULT_COMPRESSION_TABLE, Header, MessageBuilder, ResourceType},
};
use core::net::SocketAddr;
let (mut e, _h) = build_endpoint_with_printer();
let inst = Name::try_from_str("Printer._ipp._tcp.local.").unwrap();
let now = StdInstant::now();
let _q1 = e
.try_start_query(QuerySpec::new(inst.clone(), ResourceType::Any), now)
.unwrap();
let _q2 = e
.try_start_query(QuerySpec::new(inst.clone(), ResourceType::Any), now)
.unwrap();
let mut buf = [0u8; 512];
let mut hdr = Header::new();
hdr.flags_mut().set_response();
let mut b: MessageBuilder<'_, DEFAULT_COMPRESSION_TABLE> =
MessageBuilder::try_new(&mut buf, hdr).unwrap();
let target = Name::try_from_str("other-host.local.").unwrap();
b.push_srv_answer(&inst, 120, 0, 0, 8080, &target, false)
.unwrap();
let n = b.finish().unwrap();
buf[7] = 0; buf[11] = 1;
let src: SocketAddr = "192.168.1.55:5353".parse().unwrap();
let local_ip: core::net::IpAddr = "192.168.1.1".parse().unwrap();
let mut conflicts = 0usize;
let mut to_query = 0usize;
for ev in e.handle(now, src, local_ip, 0, &buf[..n], false).unwrap() {
match ev.unwrap() {
RouteEvent::ToService(ts) if ts.event().is_probe_conflict() => conflicts += 1,
RouteEvent::ToQuery(_) => to_query += 1,
_ => {}
}
}
assert_eq!(
conflicts, 1,
"the conflict must fire exactly once, not replay per query"
);
assert_eq!(
to_query, 2,
"both active queries must receive the additional SRV"
);
}
#[test]
fn non_in_class_record_does_not_route_conflict() {
use crate::event::RouteEvent;
use core::net::SocketAddr;
let (mut e, _h) = build_endpoint_with_printer();
let mut msg: std::vec::Vec<u8> = std::vec::Vec::new();
msg.extend_from_slice(&[0, 0, 0x84, 0x00, 0, 0, 0, 1, 0, 0, 0, 0]); msg.extend_from_slice(&[
7, b'P', b'r', b'i', b'n', b't', b'e', b'r', 4, b'_', b'i', b'p', b'p', 4, b'_', b't', b'c',
b'p', 5, b'l', b'o', b'c', b'a', b'l', 0,
]);
msg.extend_from_slice(&33u16.to_be_bytes()); msg.extend_from_slice(&255u16.to_be_bytes()); msg.extend_from_slice(&120u32.to_be_bytes()); msg.extend_from_slice(&15u16.to_be_bytes()); msg.extend_from_slice(&[0, 0, 0, 0, 0x1F, 0x90]); msg.extend_from_slice(&[1, b'x', 5, b'l', b'o', b'c', b'a', b'l', 0]);
let src: SocketAddr = "192.168.1.55:5353".parse().unwrap();
let local_ip: core::net::IpAddr = "192.168.1.1".parse().unwrap();
for ev in e
.handle(StdInstant::now(), src, local_ip, 0, &msg, false)
.unwrap()
{
if let Ok(RouteEvent::ToService(ts)) = ev {
assert!(
!ts.event().is_probe_conflict() && !ts.event().is_host_conflict(),
"a non-IN-class record must not route a conflict, got {:?}",
ts.event()
);
}
}
}
#[test]
fn query_answer_for_instance_name_emits_known_answer_only() {
use crate::wire::{
DEFAULT_COMPRESSION_TABLE, Header, MessageBuilder, ResourceClass, ResourceType,
};
use core::net::SocketAddr;
let mut e = build_endpoint();
let st = Name::try_from_str("_http._tcp.local.").unwrap();
let inst = Name::try_from_str("Alpha._http._tcp.local.").unwrap();
let host = Name::try_from_str("alpha.local.").unwrap();
let mut recs = ServiceRecords::new(st, inst.clone(), host.clone(), 80, 120);
recs.add_a(Ipv4Addr::new(10, 0, 0, 1));
let now = StdInstant::now();
let (_handle, _svc) = e
.try_register_service::<slab::Slab<Transmit>, slab::Slab<ServiceUpdate>>(
ServiceSpec::new(recs),
now,
)
.unwrap();
let mut buf = [0u8; 512];
let header = Header::new(); let mut b: MessageBuilder<'_, DEFAULT_COMPRESSION_TABLE> =
MessageBuilder::try_new(&mut buf, header).unwrap();
b.push_question(&inst, ResourceType::Any, ResourceClass::In, true)
.unwrap();
b.push_a_answer(&inst, 120, Ipv4Addr::new(10, 0, 0, 2), false)
.unwrap();
let n = b.finish().unwrap();
let pkt = &buf[..n];
let src: SocketAddr = "192.168.1.99:5353".parse().unwrap();
let local_ip: core::net::IpAddr = "192.168.1.1".parse().unwrap();
let events: std::vec::Vec<_> = e
.handle(now, src, local_ip, 0, pkt, false)
.unwrap()
.map(Result::unwrap)
.collect();
for ev in &events {
if let RouteEvent::ToService(ts) = ev {
assert!(
!ts.event().is_probe_conflict(),
"QR=0 answer-section MUST NOT emit ProbeConflict; got {events:?}"
);
}
}
let kas_count = events
.iter()
.filter(|ev| matches!(ev, RouteEvent::ToService(ts) if ts.event().is_known_answer()))
.count();
assert!(
kas_count >= 1,
"at least one KnownAnswer must fire for the instance-name match; got {events:?}"
);
}
#[test]
fn qr0_answer_for_host_name_emits_host_conflict_not_probe_conflict() {
use crate::wire::{DEFAULT_COMPRESSION_TABLE, Header, MessageBuilder};
use core::net::SocketAddr;
let mut e = build_endpoint();
let st = Name::try_from_str("_http._tcp.local.").unwrap();
let inst = Name::try_from_str("Alpha._http._tcp.local.").unwrap();
let host = Name::try_from_str("alpha.local.").unwrap();
let mut recs = ServiceRecords::new(st, inst.clone(), host.clone(), 80, 120);
recs.add_a(Ipv4Addr::new(10, 0, 0, 1));
let now = StdInstant::now();
let (expected_handle, _svc) = e
.try_register_service::<slab::Slab<Transmit>, slab::Slab<ServiceUpdate>>(
ServiceSpec::new(recs),
now,
)
.unwrap();
let mut buf = [0u8; 512];
let header = Header::new(); let mut b: MessageBuilder<'_, DEFAULT_COMPRESSION_TABLE> =
MessageBuilder::try_new(&mut buf, header).unwrap();
b.push_a_answer(&host, 120, Ipv4Addr::new(10, 0, 0, 2), false)
.unwrap();
let n = b.finish().unwrap();
let pkt = &buf[..n];
let src: SocketAddr = "192.168.1.99:5353".parse().unwrap();
let local_ip: core::net::IpAddr = "192.168.1.1".parse().unwrap();
let events: std::vec::Vec<_> = e
.handle(now, src, local_ip, 0, pkt, false)
.unwrap()
.map(Result::unwrap)
.collect();
for ev in &events {
if let RouteEvent::ToService(ts) = ev {
assert!(
!ts.event().is_host_conflict() && !ts.event().is_probe_conflict(),
"QR=0 answer-section MUST NOT emit conflict events; got {events:?}"
);
assert_eq!(
ts.handle(),
expected_handle,
"event must target the registered service"
);
}
}
let kas_count = events
.iter()
.filter(|ev| matches!(ev, RouteEvent::ToService(ts) if ts.event().is_known_answer()))
.count();
assert!(
kas_count >= 1,
"at least one KnownAnswer must fire for the host-name match; got {events:?}"
);
}
#[test]
fn qr0_answer_does_not_populate_query() {
use crate::{
config::QuerySpec,
wire::{DEFAULT_COMPRESSION_TABLE, Header, MessageBuilder, ResourceType},
};
use core::net::SocketAddr;
let mut e = build_endpoint();
let qname = Name::try_from_str("myhost.local.").unwrap();
let now = StdInstant::now();
let spec = QuerySpec::new(qname.clone(), ResourceType::A);
let _qhandle = e.try_start_query(spec, now).unwrap();
let mut buf = [0u8; 512];
let header = Header::new(); let mut b: MessageBuilder<'_, DEFAULT_COMPRESSION_TABLE> =
MessageBuilder::try_new(&mut buf, header).unwrap();
b.push_a_answer(&qname, 120, Ipv4Addr::new(10, 0, 0, 9), false)
.unwrap();
let n = b.finish().unwrap();
let pkt = &buf[..n];
let src: SocketAddr = "192.168.1.77:5353".parse().unwrap();
let local_ip: core::net::IpAddr = "192.168.1.1".parse().unwrap();
let events = e.handle(now, src, local_ip, 0, pkt, false).unwrap();
for ev in events {
let ev = ev.unwrap();
assert!(
!matches!(ev, RouteEvent::ToQuery(ref tq) if matches!(tq.event(), QueryEvent::Answer(_))),
"QR=0 answer records must NOT produce QueryEvent::Answer; got: {:?}",
ev
);
}
}
#[test]
fn duplicate_qm_question_suppresses_planned_query() {
use crate::{
config::QuerySpec,
wire::{DEFAULT_COMPRESSION_TABLE, Header, MessageBuilder, ResourceClass, ResourceType},
};
use core::net::SocketAddr;
let qname = Name::try_from_str("_ipp._tcp.local.").unwrap();
let src: SocketAddr = "192.168.1.77:5353".parse().unwrap();
let local_ip: core::net::IpAddr = "192.168.1.1".parse().unwrap();
{
let mut e = build_endpoint();
let now = StdInstant::now();
let h = e
.try_start_query(QuerySpec::new(qname.clone(), ResourceType::Ptr), now)
.unwrap();
let mut buf = [0u8; 512];
assert!(
e.poll_query_transmit(h, now, &mut buf).unwrap().is_some(),
"control: a started query transmits when no duplicate is seen"
);
}
let mut e = build_endpoint();
let now = StdInstant::now();
let h = e
.try_start_query(QuerySpec::new(qname.clone(), ResourceType::Ptr), now)
.unwrap();
let mut qbuf = [0u8; 512];
let mut b: MessageBuilder<'_, DEFAULT_COMPRESSION_TABLE> =
MessageBuilder::try_new(&mut qbuf, Header::new()).unwrap(); b.push_question(&qname, ResourceType::Ptr, ResourceClass::In, false) .unwrap();
let n = b.finish().unwrap();
let _ = e.handle(now, src, local_ip, 0, &qbuf[..n], false).unwrap();
let mut buf = [0u8; 512];
assert!(
e.poll_query_transmit(h, now, &mut buf).unwrap().is_none(),
"§7.3: observing a duplicate QM question must suppress our planned query"
);
assert!(
e.poll_query_timeout(h).is_some(),
"§7.3: the suppressed query is deferred (rescheduled), not retired"
);
}
#[test]
fn qu_duplicate_question_does_not_suppress_query() {
use crate::{
config::QuerySpec,
wire::{DEFAULT_COMPRESSION_TABLE, Header, MessageBuilder, ResourceClass, ResourceType},
};
use core::net::SocketAddr;
let qname = Name::try_from_str("_ipp._tcp.local.").unwrap();
let src: SocketAddr = "192.168.1.77:5353".parse().unwrap();
let local_ip: core::net::IpAddr = "192.168.1.1".parse().unwrap();
let mut e = build_endpoint();
let now = StdInstant::now();
let h = e
.try_start_query(QuerySpec::new(qname.clone(), ResourceType::Ptr), now)
.unwrap();
let mut qbuf = [0u8; 512];
let mut b: MessageBuilder<'_, DEFAULT_COMPRESSION_TABLE> =
MessageBuilder::try_new(&mut qbuf, Header::new()).unwrap();
b.push_question(&qname, ResourceType::Ptr, ResourceClass::In, true) .unwrap();
let n = b.finish().unwrap();
let _ = e.handle(now, src, local_ip, 0, &qbuf[..n], false).unwrap();
let mut buf = [0u8; 512];
assert!(
e.poll_query_transmit(h, now, &mut buf).unwrap().is_some(),
"§7.3: a QU duplicate must NOT suppress our query (it elicits no multicast answer)"
);
}
#[test]
fn legacy_source_duplicate_does_not_suppress_query() {
use crate::{
config::QuerySpec,
wire::{DEFAULT_COMPRESSION_TABLE, Header, MessageBuilder, ResourceClass, ResourceType},
};
use core::net::SocketAddr;
let qname = Name::try_from_str("_ipp._tcp.local.").unwrap();
let legacy_src: SocketAddr = "192.168.1.77:40000".parse().unwrap(); let local_ip: core::net::IpAddr = "192.168.1.1".parse().unwrap();
let mut e = build_endpoint();
let now = StdInstant::now();
let h = e
.try_start_query(QuerySpec::new(qname.clone(), ResourceType::Ptr), now)
.unwrap();
let mut qbuf = [0u8; 512];
let mut b: MessageBuilder<'_, DEFAULT_COMPRESSION_TABLE> =
MessageBuilder::try_new(&mut qbuf, Header::new()).unwrap();
b.push_question(&qname, ResourceType::Ptr, ResourceClass::In, false) .unwrap();
let n = b.finish().unwrap();
let _ = e
.handle(now, legacy_src, local_ip, 0, &qbuf[..n], false)
.unwrap();
let mut buf = [0u8; 512];
assert!(
e.poll_query_transmit(h, now, &mut buf).unwrap().is_some(),
"§7.3: a legacy-source (non-5353) duplicate must NOT suppress our query"
);
}
#[test]
fn repeated_duplicate_questions_do_not_stall_query_forever() {
use crate::{
config::QuerySpec,
wire::{DEFAULT_COMPRESSION_TABLE, Header, MessageBuilder, ResourceClass, ResourceType},
};
use core::net::SocketAddr;
let qname = Name::try_from_str("_ipp._tcp.local.").unwrap();
let src: SocketAddr = "192.168.1.77:5353".parse().unwrap();
let local_ip: core::net::IpAddr = "192.168.1.1".parse().unwrap();
let mut e = build_endpoint();
let mut now = StdInstant::now();
let h = e
.try_start_query(QuerySpec::new(qname.clone(), ResourceType::Ptr), now)
.unwrap();
let mut qbuf = [0u8; 512];
let mut b: MessageBuilder<'_, DEFAULT_COMPRESSION_TABLE> =
MessageBuilder::try_new(&mut qbuf, Header::new()).unwrap();
b.push_question(&qname, ResourceType::Ptr, ResourceClass::In, false)
.unwrap();
let n = b.finish().unwrap();
let mut buf = [0u8; 512];
let mut retired = false;
for _ in 0..32 {
let _ = e.handle(now, src, local_ip, 0, &qbuf[..n], false).unwrap();
assert!(
e.poll_query_transmit(h, now, &mut buf).unwrap().is_none(),
"each duplicate suppresses the planned transmit"
);
match e.poll_query_timeout(h) {
Some(due) => {
now = due;
e.handle_query_timeout(h, now).unwrap();
}
None => {
retired = true;
break;
}
}
}
assert!(
retired,
"§7.3: a continuously-duplicated query must retire via the retry budget, not defer forever"
);
}
#[test]
fn duplicate_suppresses_due_retry_independent_of_driver_order() {
use crate::{
config::QuerySpec,
wire::{DEFAULT_COMPRESSION_TABLE, Header, MessageBuilder, ResourceClass, ResourceType},
};
use core::net::SocketAddr;
let qname = Name::try_from_str("_ipp._tcp.local.").unwrap();
let src: SocketAddr = "192.168.1.77:5353".parse().unwrap();
let local_ip: core::net::IpAddr = "192.168.1.1".parse().unwrap();
let mut e = build_endpoint();
let now = StdInstant::now();
let h = e
.try_start_query(QuerySpec::new(qname.clone(), ResourceType::Ptr), now)
.unwrap();
let mut buf = [0u8; 512];
assert!(e.poll_query_transmit(h, now, &mut buf).unwrap().is_some());
e.note_query_transmit_result(h, now, true);
let t1 = e
.poll_query_timeout(h)
.expect("a retransmit must be scheduled");
let mut qbuf = [0u8; 512];
let mut b: MessageBuilder<'_, DEFAULT_COMPRESSION_TABLE> =
MessageBuilder::try_new(&mut qbuf, Header::new()).unwrap();
b.push_question(&qname, ResourceType::Ptr, ResourceClass::In, false)
.unwrap();
let n = b.finish().unwrap();
let _ = e.handle(t1, src, local_ip, 0, &qbuf[..n], false).unwrap();
let t2 = e
.poll_query_timeout(h)
.expect("query still active, retry rescheduled");
assert!(
t2 > t1,
"§7.3: a duplicate at a due retry must consume the slot and defer it"
);
e.handle_query_timeout(h, t1).unwrap();
assert!(
e.poll_query_transmit(h, t1, &mut buf).unwrap().is_none(),
"§7.3: no redundant transmit after the due slot was suppressed"
);
}
#[test]
fn self_packet_does_not_route_as_probe_conflict() {
use crate::event::RouteEvent;
use core::net::SocketAddr;
let (mut e, _expected_handle) = build_endpoint_with_printer();
let local_ip: core::net::IpAddr = core::net::IpAddr::V4(Ipv4Addr::new(192, 168, 1, 10));
let mut buf = [0u8; 512];
let n = build_probe_srv_authority(&mut buf, "Printer._ipp._tcp.local.");
let data = &buf[..n];
let now = StdInstant::now();
let self_src: SocketAddr = SocketAddr::from((Ipv4Addr::new(192, 168, 1, 10), 5353));
let mut self_events = e.handle(now, self_src, local_ip, 0, data, true).unwrap();
assert!(
self_events.next().is_none(),
"self-packet (caller_is_self = true) must yield zero routing events"
);
let peer_src: SocketAddr = SocketAddr::from((Ipv4Addr::new(192, 168, 1, 55), 5353));
let mut peer_events = e
.handle(StdInstant::now(), peer_src, local_ip, 0, data, false)
.unwrap();
let ev = peer_events
.next()
.expect("control: foreign-source probe MUST still produce a routing event")
.expect("control: routing event must be Ok");
match ev {
RouteEvent::ToService(ts) => assert!(
ts.event().is_probe_conflict(),
"control: foreign-source probe must still emit ProbeConflict; got {:?}",
ts.event()
),
other => panic!(
"control: expected RouteEvent::ToService(ProbeConflict), got {:?}",
other
),
}
}
#[test]
fn self_packet_does_not_populate_cache() {
use crate::wire::{DEFAULT_COMPRESSION_TABLE, Header, MessageBuilder, ResourceType};
use core::net::SocketAddr;
let mut e = build_endpoint();
let now = StdInstant::now();
let local_ip: core::net::IpAddr = core::net::IpAddr::V4(Ipv4Addr::new(192, 168, 1, 10));
let observed = Name::try_from_str("printer.local.").unwrap();
let mut buf = [0u8; 512];
let mut hdr = Header::new();
hdr.flags_mut().set_response();
let mut b: MessageBuilder<'_, DEFAULT_COMPRESSION_TABLE> =
MessageBuilder::try_new(&mut buf, hdr).unwrap();
b.push_a_answer(&observed, 120, Ipv4Addr::new(10, 0, 0, 9), false)
.unwrap();
let n = b.finish().unwrap();
let data = &buf[..n];
let self_src: SocketAddr = SocketAddr::from((Ipv4Addr::new(192, 168, 1, 10), 5353));
let _ = e.handle(now, self_src, local_ip, 0, data, true).unwrap();
assert!(
!e.cache
.contains(&observed, ResourceType::A, ResourceClass::In),
"self-packet must not populate cache; cache contained {:?}",
observed.as_str()
);
let peer_src: SocketAddr = SocketAddr::from((Ipv4Addr::new(192, 168, 1, 55), 5353));
let _ = e.handle(now, peer_src, local_ip, 0, data, false).unwrap();
assert!(
e.cache
.contains(&observed, ResourceType::A, ResourceClass::In),
"control: foreign-source response must populate the cache"
);
}
#[test]
fn cache_goodbye_matches_differently_encoded_and_cased_ptr() {
use crate::wire::{ResourceClass, ResourceType};
use core::net::SocketAddr;
let mut e = build_endpoint();
let now = StdInstant::now();
let local_ip: core::net::IpAddr = core::net::IpAddr::V4(Ipv4Addr::new(192, 168, 1, 10));
let src: SocketAddr = SocketAddr::from((Ipv4Addr::new(192, 168, 1, 55), 5353));
let owner = Name::try_from_str("svc.local.").unwrap();
let header_an1 = [0u8, 0, 0x84, 0x00, 0, 0, 0, 1, 0, 0, 0, 0];
let owner_wire = [3u8, b's', b'v', b'c', 5, b'l', b'o', b'c', b'a', b'l', 0];
let mut insert = std::vec::Vec::new();
insert.extend_from_slice(&header_an1);
insert.extend_from_slice(&owner_wire);
insert.extend_from_slice(&12u16.to_be_bytes()); insert.extend_from_slice(&1u16.to_be_bytes()); insert.extend_from_slice(&120u32.to_be_bytes()); insert.extend_from_slice(&7u16.to_be_bytes()); insert.extend_from_slice(&[4, b'i', b'n', b's', b't', 0xC0, 0x0C]);
let _ = e.handle(now, src, local_ip, 0, &insert, false).unwrap();
assert!(
e.cache
.contains(&owner, ResourceType::Ptr, ResourceClass::In),
"compressed-target PTR response must populate the cache"
);
let mut goodbye = std::vec::Vec::new();
goodbye.extend_from_slice(&header_an1);
goodbye.extend_from_slice(&owner_wire);
goodbye.extend_from_slice(&12u16.to_be_bytes()); goodbye.extend_from_slice(&1u16.to_be_bytes()); goodbye.extend_from_slice(&0u32.to_be_bytes()); goodbye.extend_from_slice(&16u16.to_be_bytes()); goodbye.extend_from_slice(&[
4, b'I', b'N', b'S', b'T', 3, b'S', b'V', b'C', 5, b'L', b'O', b'C', b'A', b'L', 0,
]);
let _ = e.handle(now, src, local_ip, 0, &goodbye, false).unwrap();
let after_rescue = now + core::time::Duration::from_secs(2);
e.cache.sweep_expired(after_rescue);
assert!(
!e.cache
.contains(&owner, ResourceType::Ptr, ResourceClass::In),
"a differently-encoded/-cased TTL=0 goodbye must match and expire the cached PTR within the §10.1 rescue window"
);
}
#[test]
fn ipv6_self_packet_detected_via_advertised_aaaa() {
use crate::{
event::RouteEvent,
wire::{DEFAULT_COMPRESSION_TABLE, Header, MessageBuilder},
};
use core::net::{Ipv6Addr, SocketAddr};
use rand::SeedableRng;
let rng = rand::rngs::StdRng::from_seed([99u8; 32]);
let mut e = TestEndp::try_new(
EndpointConfig::new().with_trust_advertised_src_as_self(true),
rng,
);
let st = Name::try_from_str("_ipp._tcp.local.").unwrap();
let inst = Name::try_from_str("Printer._ipp._tcp.local.").unwrap();
let host = Name::try_from_str("printer-host.local.").unwrap();
let mut recs = ServiceRecords::new(st, inst.clone(), host, 631, 120);
let our_v6 = Ipv6Addr::new(0xfe80, 0, 0, 0, 0, 0, 0, 1);
recs.add_aaaa(our_v6);
let now = StdInstant::now();
let (_handle, _svc) = e
.try_register_service::<slab::Slab<Transmit>, slab::Slab<ServiceUpdate>>(
ServiceSpec::new(recs),
now,
)
.unwrap();
let mut buf = [0u8; 512];
let hdr = Header::new();
let mut b: MessageBuilder<'_, DEFAULT_COMPRESSION_TABLE> =
MessageBuilder::try_new(&mut buf, hdr).unwrap();
b.push_srv_authority(
&inst,
120,
0,
0,
8080,
&Name::try_from_str("other-host.local.").unwrap(),
)
.unwrap();
let n = b.finish().unwrap();
let data = &buf[..n];
let local_ip: core::net::IpAddr =
core::net::IpAddr::V6(Ipv6Addr::new(0xff02, 0, 0, 0, 0, 0, 0, 0x00fb));
let self_src: SocketAddr = SocketAddr::from((our_v6, 5353));
let mut self_events = e.handle(now, self_src, local_ip, 0, data, false).unwrap();
assert!(
self_events.next().is_none(),
"IPv6 self-packet (src ∈ advertised AAAA) must yield zero routing events; \
local_ip == ff02::fb cannot detect this, so the membership branch must catch it"
);
let peer_v6 = Ipv6Addr::new(0xfe80, 0, 0, 0, 0, 0, 0, 0x0099);
let peer_src: SocketAddr = SocketAddr::from((peer_v6, 5353));
let mut peer_events = e.handle(now, peer_src, local_ip, 0, data, false).unwrap();
let ev = peer_events
.next()
.expect("control: foreign IPv6 probe MUST still produce a routing event")
.expect("control: routing event must be Ok");
match ev {
RouteEvent::ToService(ts) => assert!(
ts.event().is_probe_conflict(),
"control: foreign IPv6 probe must still emit ProbeConflict; got {:?}",
ts.event()
),
other => panic!(
"control: expected RouteEvent::ToService(ProbeConflict), got {:?}",
other
),
}
}
#[test]
fn poll_query_terminal_then_cancel_no_leak() {
use crate::{config::QuerySpec, event::QueryUpdate, wire::ResourceType};
use core::time::Duration;
let mut e = build_endpoint();
let mut now = StdInstant::now();
let qname = Name::try_from_str("printer.local.").unwrap();
for i in 0..1024u32 {
let spec =
QuerySpec::new(qname.clone(), ResourceType::A).with_timeout(Duration::from_millis(100));
let qhandle = e.try_start_query(spec, now).unwrap_or_else(|err| {
panic!(
"try_start_query #{i} must succeed when previous queries are cancelled; \
got {err:?}"
)
});
assert_eq!(
e.queries.len(),
1,
"queries pool len must be 1 after start #{i}, before cancel"
);
now = now.checked_add(Duration::from_millis(200)).unwrap();
e.handle_query_timeout(qhandle, now).unwrap();
let update = e.poll_query(qhandle);
assert!(
matches!(update, Some(QueryUpdate::Timeout | QueryUpdate::Done)),
"poll_query must return Some(Timeout|Done) after deadline; got {update:?}"
);
assert_eq!(
e.queries.len(),
1,
"queries pool len must remain 1 after terminal poll_query #{i} \
(no auto-prune; caller must explicitly cancel)"
);
assert!(
e.poll_query(qhandle).is_none(),
"subsequent poll_query after terminal must return None (latched)"
);
e.cancel_query(qhandle).unwrap();
assert_eq!(
e.queries.len(),
0,
"queries pool len must be 0 after cancel #{i}"
);
}
}
#[test]
fn collected_answers_survive_terminal_until_cancel() {
use crate::{
config::QuerySpec,
event::QueryUpdate,
wire::{DEFAULT_COMPRESSION_TABLE, Header, MessageBuilder, ResourceType},
};
use core::{net::SocketAddr, time::Duration};
let mut e = build_endpoint();
let mut now = StdInstant::now();
let qname = Name::try_from_str("printer.local.").unwrap();
let spec =
QuerySpec::new(qname.clone(), ResourceType::A).with_timeout(Duration::from_millis(100));
let h = e.try_start_query(spec, now).unwrap();
let mut buf = [0u8; 512];
let mut hdr = Header::new();
hdr.flags_mut().set_response();
let mut b: MessageBuilder<'_, DEFAULT_COMPRESSION_TABLE> =
MessageBuilder::try_new(&mut buf, hdr).unwrap();
let addr = Ipv4Addr::new(10, 0, 0, 7);
b.push_a_answer(&qname, 120, addr, false).unwrap();
let n = b.finish().unwrap();
let pkt = &buf[..n];
let src: SocketAddr = "192.168.1.77:5353".parse().unwrap();
let local_ip: core::net::IpAddr = "192.168.1.1".parse().unwrap();
let _ = e.handle(now, src, local_ip, 0, pkt, false).unwrap().count();
let answers_before: std::vec::Vec<_> = e.collected_answers(h).cloned().collect();
assert_eq!(
answers_before.len(),
1,
"answer must land in collected_answers; got {answers_before:?}"
);
now = now.checked_add(Duration::from_millis(200)).unwrap();
e.handle_query_timeout(h, now).unwrap();
let update = e.poll_query(h);
assert!(
matches!(update, Some(QueryUpdate::Timeout | QueryUpdate::Done)),
"poll_query must return terminal; got {update:?}"
);
let answers_after: std::vec::Vec<_> = e.collected_answers(h).cloned().collect();
assert_eq!(
answers_after.len(),
1,
"collected_answers must survive terminal poll_query; \
caller had no chance to read them before they would have been \
auto-pruned; got {answers_after:?}"
);
assert!(
e.poll_query(h).is_none(),
"second poll_query after terminal must return None (latched)"
);
e.cancel_query(h).unwrap();
assert!(e.collected_answers(h).next().is_none());
}
#[test]
fn dropping_route_events_does_not_lose_query_state() {
use crate::{
config::QuerySpec,
wire::{DEFAULT_COMPRESSION_TABLE, Header, MessageBuilder, ResourceType},
};
use core::net::SocketAddr;
let mut e = build_endpoint();
let now = StdInstant::now();
let qname = Name::try_from_str("printer.local.").unwrap();
let h_a = e
.try_start_query(QuerySpec::new(qname.clone(), ResourceType::A), now)
.unwrap();
let h_any = e
.try_start_query(QuerySpec::new(qname.clone(), ResourceType::Any), now)
.unwrap();
let mut buf = [0u8; 512];
let mut hdr = Header::new();
hdr.flags_mut().set_response();
let mut b: MessageBuilder<'_, DEFAULT_COMPRESSION_TABLE> =
MessageBuilder::try_new(&mut buf, hdr).unwrap();
b.push_a_answer(&qname, 120, Ipv4Addr::new(10, 0, 0, 9), false)
.unwrap();
let n = b.finish().unwrap();
let pkt = &buf[..n];
let src: SocketAddr = "192.168.1.77:5353".parse().unwrap();
let local_ip: core::net::IpAddr = "192.168.1.1".parse().unwrap();
{
let _events = e.handle(now, src, local_ip, 0, pkt, false).unwrap();
}
let a_answers: std::vec::Vec<_> = e.collected_answers(h_a).cloned().collect();
let any_answers: std::vec::Vec<_> = e.collected_answers(h_any).cloned().collect();
assert_eq!(
a_answers.len(),
1,
"A-query must have the answer applied even with dropped iterator"
);
assert_eq!(
any_answers.len(),
1,
"Any-query must ALSO have the answer applied even with dropped iterator \
(fan-out is no longer dependent on draining the iterator)"
);
}
#[test]
fn pre_poll_terminal_freeze_closes_race() {
use crate::{
config::QuerySpec,
event::QueryUpdate,
wire::{DEFAULT_COMPRESSION_TABLE, Header, MessageBuilder, ResourceType},
};
use core::{net::SocketAddr, time::Duration};
let mut e = build_endpoint();
let mut now = StdInstant::now();
let qn = Name::try_from_str("printer.local.").unwrap();
let spec = QuerySpec::new(qn.clone(), ResourceType::A).with_timeout(Duration::from_millis(100));
let h = e.try_start_query(spec, now).unwrap();
now = now.checked_add(Duration::from_millis(200)).unwrap();
e.handle_query_timeout(h, now).unwrap();
let mut buf = [0u8; 512];
let mut hdr = Header::new();
hdr.flags_mut().set_response();
let mut b: MessageBuilder<'_, DEFAULT_COMPRESSION_TABLE> =
MessageBuilder::try_new(&mut buf, hdr).unwrap();
b.push_a_answer(&qn, 120, Ipv4Addr::new(10, 0, 0, 7), false)
.unwrap();
let n = b.finish().unwrap();
let pkt = &buf[..n];
let src: SocketAddr = "192.168.1.77:5353".parse().unwrap();
let local_ip: core::net::IpAddr = "192.168.1.1".parse().unwrap();
let to_query_events: std::vec::Vec<_> = e
.handle(now, src, local_ip, 0, pkt, false)
.unwrap()
.map(Result::unwrap)
.filter_map(|ev| match ev {
RouteEvent::ToQuery(tq) if matches!(tq.event(), QueryEvent::Answer(_)) => Some(tq.handle()),
_ => None,
})
.collect();
assert!(
!to_query_events.contains(&h),
"ToQuery events must NOT fire for is_done query (pre-poll); \
got {to_query_events:?}"
);
assert!(matches!(
e.poll_query(h),
Some(QueryUpdate::Timeout | QueryUpdate::Done)
));
let answers: std::vec::Vec<_> = e.collected_answers(h).cloned().collect();
assert!(
answers.is_empty(),
"collected_answers must be empty — the post-done answer must NOT \
have been applied to the Query; got {answers:?}"
);
e.cancel_query(h).unwrap();
}
#[test]
fn query_ignores_ttl_zero_goodbye_records() {
use crate::{
config::QuerySpec,
wire::{DEFAULT_COMPRESSION_TABLE, Header, MessageBuilder, ResourceType},
};
use core::net::SocketAddr;
let mut e = build_endpoint();
let now = StdInstant::now();
let qn = Name::try_from_str("printer.local.").unwrap();
let spec = QuerySpec::new(qn.clone(), ResourceType::A);
let h = e.try_start_query(spec, now).unwrap();
let src: SocketAddr = "192.168.1.77:5353".parse().unwrap();
let local_ip: core::net::IpAddr = "192.168.1.1".parse().unwrap();
{
let mut buf = [0u8; 512];
let mut hdr = Header::new();
hdr.flags_mut().set_response();
let mut b: MessageBuilder<'_, DEFAULT_COMPRESSION_TABLE> =
MessageBuilder::try_new(&mut buf, hdr).unwrap();
b.push_a_answer(&qn, 120, Ipv4Addr::new(10, 0, 0, 7), false)
.unwrap();
let n = b.finish().unwrap();
let _ = e
.handle(now, src, local_ip, 0, &buf[..n], false)
.unwrap()
.count();
}
assert_eq!(
e.collected_answers(h).count(),
1,
"live (TTL=120) answer must be collected"
);
{
let mut buf = [0u8; 512];
let mut hdr = Header::new();
hdr.flags_mut().set_response();
let mut b: MessageBuilder<'_, DEFAULT_COMPRESSION_TABLE> =
MessageBuilder::try_new(&mut buf, hdr).unwrap();
b.push_a_answer(&qn, 0, Ipv4Addr::new(10, 0, 0, 99), false)
.unwrap();
let n = b.finish().unwrap();
let _ = e
.handle(now, src, local_ip, 0, &buf[..n], false)
.unwrap()
.count();
}
let answers: std::vec::Vec<_> = e.collected_answers(h).cloned().collect();
assert_eq!(
answers.len(),
1,
"TTL=0 goodbye record must NOT be collected; \
prior live answer must remain intact. Got: {answers:?}"
);
e.cancel_query(h).unwrap();
}
#[test]
fn qr0_ptr_known_answer_fans_out_to_all_same_type_services() {
use crate::{
config::ServiceSpec,
records::ServiceRecords,
wire::{DEFAULT_COMPRESSION_TABLE, Header, MessageBuilder},
};
use core::net::SocketAddr;
let mut e = build_endpoint();
let now = StdInstant::now();
let stype = Name::try_from_str("_ipp._tcp.local.").unwrap();
let mut handles = std::vec::Vec::new();
for inst_label in ["Alpha", "Beta", "Gamma"] {
let inst_str = std::format!("{inst_label}._ipp._tcp.local.");
let inst = Name::try_from_str(&inst_str).unwrap();
let host_str = std::format!("{}-host.local.", inst_label.to_ascii_lowercase());
let host = Name::try_from_str(&host_str).unwrap();
let recs = ServiceRecords::new(stype.clone(), inst, host, 631, 120);
let (h, _svc) = e
.try_register_service::<slab::Slab<Transmit>, slab::Slab<ServiceUpdate>>(
ServiceSpec::new(recs),
now,
)
.unwrap();
handles.push(h);
}
let mut buf = [0u8; 512];
let header = Header::new();
let mut b: MessageBuilder<'_, DEFAULT_COMPRESSION_TABLE> =
MessageBuilder::try_new(&mut buf, header).unwrap();
let beta_inst = Name::try_from_str("Beta._ipp._tcp.local.").unwrap();
b.push_ptr_answer(&stype, 120, &beta_inst).unwrap();
let n = b.finish().unwrap();
let pkt = &buf[..n];
let src: SocketAddr = "192.168.1.99:5353".parse().unwrap();
let local_ip: core::net::IpAddr = "192.168.1.1".parse().unwrap();
let events: std::vec::Vec<_> = e
.handle(now, src, local_ip, 0, pkt, false)
.unwrap()
.map(Result::unwrap)
.collect();
let kas_handles: std::vec::Vec<_> = events
.iter()
.filter_map(|ev| match ev {
RouteEvent::ToService(ts) if ts.event().is_known_answer() => Some(ts.handle()),
_ => None,
})
.collect();
for h in &handles {
assert!(
kas_handles.contains(h),
"service {h:?} must receive KnownAnswer for shared-PTR; \
got handles {kas_handles:?}"
);
}
assert_eq!(
kas_handles.len(),
3,
"exactly three KnownAnswer events expected (one per same-type service); \
got {kas_handles:?}"
);
}
#[test]
fn meta_ptr_known_answer_fans_out_to_all_services() {
use crate::{
config::ServiceSpec,
records::ServiceRecords,
wire::{DEFAULT_COMPRESSION_TABLE, Header, MessageBuilder},
};
use core::net::SocketAddr;
let mut e = build_endpoint();
let now = StdInstant::now();
let mut handles = std::vec::Vec::new();
for (inst_str, stype_str, host_str) in [
("p._ipp._tcp.local.", "_ipp._tcp.local.", "ph.local."),
("w._http._tcp.local.", "_http._tcp.local.", "wh.local."),
] {
let recs = ServiceRecords::new(
Name::try_from_str(stype_str).unwrap(),
Name::try_from_str(inst_str).unwrap(),
Name::try_from_str(host_str).unwrap(),
631,
120,
);
let (h, _svc) = e
.try_register_service::<slab::Slab<Transmit>, slab::Slab<ServiceUpdate>>(
ServiceSpec::new(recs),
now,
)
.unwrap();
handles.push(h);
}
let mut buf = [0u8; 512];
let mut b: MessageBuilder<'_, DEFAULT_COMPRESSION_TABLE> =
MessageBuilder::try_new(&mut buf, Header::new()).unwrap();
let meta = Name::try_from_str("_services._dns-sd._udp.local.").unwrap();
let ipp = Name::try_from_str("_ipp._tcp.local.").unwrap();
b.push_ptr_answer(&meta, 120, &ipp).unwrap();
let n = b.finish().unwrap();
let src: SocketAddr = "192.168.1.99:5353".parse().unwrap();
let local_ip: core::net::IpAddr = "192.168.1.1".parse().unwrap();
let events: std::vec::Vec<_> = e
.handle(now, src, local_ip, 0, &buf[..n], false)
.unwrap()
.map(Result::unwrap)
.collect();
let kas_handles: std::vec::Vec<_> = events
.iter()
.filter_map(|ev| match ev {
RouteEvent::ToService(ts) if ts.event().is_known_answer() => Some(ts.handle()),
_ => None,
})
.collect();
for h in &handles {
assert!(
kas_handles.contains(h),
"meta-PTR known-answer must fan out to service {h:?}; got {kas_handles:?}"
);
}
assert_eq!(
kas_handles.len(),
2,
"one meta KnownAnswer per registered service; got {kas_handles:?}"
);
}
#[test]
fn qr0_ttl_zero_does_not_emit_service_events() {
use crate::{
config::ServiceSpec,
records::ServiceRecords,
wire::{DEFAULT_COMPRESSION_TABLE, Header, MessageBuilder},
};
use core::net::SocketAddr;
let mut e = build_endpoint();
let st = Name::try_from_str("_ipp._tcp.local.").unwrap();
let inst = Name::try_from_str("Printer._ipp._tcp.local.").unwrap();
let host = Name::try_from_str("printer-host.local.").unwrap();
let mut recs = ServiceRecords::new(st, inst.clone(), host.clone(), 631, 120);
recs.add_a(Ipv4Addr::new(10, 0, 0, 1));
let now = StdInstant::now();
let (_h, _svc) = e
.try_register_service::<slab::Slab<Transmit>, slab::Slab<ServiceUpdate>>(
ServiceSpec::new(recs),
now,
)
.unwrap();
let mut buf = [0u8; 512];
let header = Header::new();
let mut b: MessageBuilder<'_, DEFAULT_COMPRESSION_TABLE> =
MessageBuilder::try_new(&mut buf, header).unwrap();
b.push_a_answer(&host, 0, Ipv4Addr::new(10, 0, 0, 2), false)
.unwrap();
let n = b.finish().unwrap();
let pkt = &buf[..n];
let src: SocketAddr = "192.168.1.99:5353".parse().unwrap();
let local_ip: core::net::IpAddr = "192.168.1.1".parse().unwrap();
let events: std::vec::Vec<_> = e
.handle(now, src, local_ip, 0, pkt, false)
.unwrap()
.map(Result::unwrap)
.collect();
assert!(
events.is_empty(),
"QR=0 TTL=0 record must NOT yield any RouteEvent; got {events:?}"
);
for record_name in [&inst, &Name::try_from_str("_ipp._tcp.local.").unwrap()] {
let mut buf2 = [0u8; 512];
let header = Header::new();
let mut b: MessageBuilder<'_, DEFAULT_COMPRESSION_TABLE> =
MessageBuilder::try_new(&mut buf2, header).unwrap();
b.push_a_answer(record_name, 0, Ipv4Addr::new(10, 0, 0, 3), false)
.unwrap();
let n = b.finish().unwrap();
let events: std::vec::Vec<_> = e
.handle(now, src, local_ip, 0, &buf2[..n], false)
.unwrap()
.map(Result::unwrap)
.collect();
assert!(
events.is_empty(),
"QR=0 TTL=0 for {} must NOT yield any RouteEvent; got {events:?}",
record_name.as_str()
);
}
}
#[test]
fn authority_ttl_zero_does_not_emit_conflict_events() {
use crate::{
config::ServiceSpec,
records::ServiceRecords,
wire::{Header, MessageBuilder},
};
use core::net::SocketAddr;
let mut e = build_endpoint();
let st = Name::try_from_str("_ipp._tcp.local.").unwrap();
let inst = Name::try_from_str("Printer._ipp._tcp.local.").unwrap();
let host = Name::try_from_str("printer-host.local.").unwrap();
let recs = ServiceRecords::new(st, inst.clone(), host.clone(), 631, 120);
let now = StdInstant::now();
let (_h, _svc) = e
.try_register_service::<slab::Slab<Transmit>, slab::Slab<ServiceUpdate>>(
ServiceSpec::new(recs),
now,
)
.unwrap();
let mut buf = [0u8; 512];
let hdr = Header::new();
let mut b = MessageBuilder::<'_, 32>::try_new(&mut buf, hdr).unwrap();
b.push_a_authority(&host, 0, Ipv4Addr::new(192, 168, 1, 99))
.unwrap();
let n = b.finish().unwrap();
let pkt = &buf[..n];
let src: SocketAddr = "192.168.1.99:5353".parse().unwrap();
let local_ip: core::net::IpAddr = "192.168.1.1".parse().unwrap();
let events: std::vec::Vec<_> = e
.handle(now, src, local_ip, 0, pkt, false)
.unwrap()
.map(Result::unwrap)
.collect();
assert!(
events.is_empty(),
"TTL=0 authority record (host) must not emit HostConflict; got {events:?}"
);
let mut buf2 = [0u8; 512];
let hdr = Header::new();
let mut b = MessageBuilder::<'_, 32>::try_new(&mut buf2, hdr).unwrap();
b.push_a_authority(&inst, 0, Ipv4Addr::new(192, 168, 1, 99))
.unwrap();
let n = b.finish().unwrap();
let events: std::vec::Vec<_> = e
.handle(now, src, local_ip, 0, &buf2[..n], false)
.unwrap()
.map(Result::unwrap)
.collect();
assert!(
events.is_empty(),
"TTL=0 authority record (instance) must not emit ProbeConflict; got {events:?}"
);
}
#[test]
fn cache_flush_within_one_packet_preserves_full_rrset() {
use crate::wire::{DEFAULT_COMPRESSION_TABLE, Header, MessageBuilder, ResourceType};
use core::net::SocketAddr;
let mut e = build_endpoint();
let now = StdInstant::now();
let host = Name::try_from_str("multihomed.local.").unwrap();
let mut buf = [0u8; 512];
let mut hdr = Header::new();
hdr.flags_mut().set_response();
let mut b: MessageBuilder<'_, DEFAULT_COMPRESSION_TABLE> =
MessageBuilder::try_new(&mut buf, hdr).unwrap();
b.push_a_answer(&host, 120, Ipv4Addr::new(10, 0, 0, 1), true)
.unwrap();
b.push_a_answer(&host, 120, Ipv4Addr::new(10, 0, 0, 2), true)
.unwrap();
b.push_a_answer(&host, 120, Ipv4Addr::new(10, 0, 0, 3), true)
.unwrap();
let n = b.finish().unwrap();
let pkt = &buf[..n];
let src: SocketAddr = "192.168.1.77:5353".parse().unwrap();
let local_ip: core::net::IpAddr = "192.168.1.1".parse().unwrap();
let _ = e.handle(now, src, local_ip, 0, pkt, false).unwrap().count();
let count = e
.cache
.count_matching(&host, ResourceType::A, ResourceClass::In);
assert_eq!(
count, 3,
"multi-A RRSet with cache_flush must preserve all 3 entries; got {count}"
);
}
#[test]
fn qr1_answer_for_instance_name_emits_probe_conflict() {
use crate::{
config::ServiceSpec,
records::ServiceRecords,
wire::{DEFAULT_COMPRESSION_TABLE, Header, MessageBuilder},
};
use core::net::SocketAddr;
let mut e = build_endpoint();
let st = Name::try_from_str("_http._tcp.local.").unwrap();
let inst = Name::try_from_str("Alpha._http._tcp.local.").unwrap();
let host = Name::try_from_str("alpha.local.").unwrap();
let recs = ServiceRecords::new(st, inst.clone(), host.clone(), 80, 120);
let now = StdInstant::now();
let (_handle, _svc) = e
.try_register_service::<slab::Slab<Transmit>, slab::Slab<ServiceUpdate>>(
ServiceSpec::new(recs),
now,
)
.unwrap();
let mut buf = [0u8; 512];
let mut hdr = Header::new();
hdr.flags_mut().set_response();
let mut b: MessageBuilder<'_, DEFAULT_COMPRESSION_TABLE> =
MessageBuilder::try_new(&mut buf, hdr).unwrap();
let srv_target = Name::try_from_str("other-host.local.").unwrap();
b.push_srv_answer(&inst, 120, 0, 0, 8080, &srv_target, false)
.unwrap();
let n = b.finish().unwrap();
let pkt = &buf[..n];
let src: SocketAddr = "192.168.1.99:5353".parse().unwrap();
let local_ip: core::net::IpAddr = "192.168.1.1".parse().unwrap();
let events: std::vec::Vec<_> = e
.handle(now, src, local_ip, 0, pkt, false)
.unwrap()
.map(Result::unwrap)
.collect();
let has_probe_conflict = events
.iter()
.any(|ev| matches!(ev, RouteEvent::ToService(ts) if ts.event().is_probe_conflict()));
assert!(
has_probe_conflict,
"QR=1 answer claiming our instance name must emit ProbeConflict; got {events:?}"
);
}
#[test]
fn qr1_answer_for_host_name_emits_host_conflict() {
use crate::{
config::ServiceSpec,
records::ServiceRecords,
wire::{DEFAULT_COMPRESSION_TABLE, Header, MessageBuilder},
};
use core::net::SocketAddr;
let mut e = build_endpoint();
let st = Name::try_from_str("_http._tcp.local.").unwrap();
let inst = Name::try_from_str("Alpha._http._tcp.local.").unwrap();
let host = Name::try_from_str("alpha.local.").unwrap();
let recs = ServiceRecords::new(st, inst.clone(), host.clone(), 80, 120);
let now = StdInstant::now();
let (_handle, _svc) = e
.try_register_service::<slab::Slab<Transmit>, slab::Slab<ServiceUpdate>>(
ServiceSpec::new(recs),
now,
)
.unwrap();
let mut buf = [0u8; 512];
let mut hdr = Header::new();
hdr.flags_mut().set_response();
let mut b: MessageBuilder<'_, DEFAULT_COMPRESSION_TABLE> =
MessageBuilder::try_new(&mut buf, hdr).unwrap();
b.push_a_answer(&host, 120, Ipv4Addr::new(10, 0, 0, 2), false)
.unwrap();
let n = b.finish().unwrap();
let pkt = &buf[..n];
let src: SocketAddr = "192.168.1.99:5353".parse().unwrap();
let local_ip: core::net::IpAddr = "192.168.1.1".parse().unwrap();
let events: std::vec::Vec<_> = e
.handle(now, src, local_ip, 0, pkt, false)
.unwrap()
.map(Result::unwrap)
.collect();
let has_host_conflict = events
.iter()
.any(|ev| matches!(ev, RouteEvent::ToService(ts) if ts.event().is_host_conflict()));
assert!(
has_host_conflict,
"QR=1 answer claiming our host name must emit HostConflict; got {events:?}"
);
}
#[test]
fn qr0_answer_does_not_mutate_cache() {
use crate::wire::{DEFAULT_COMPRESSION_TABLE, Header, MessageBuilder, ResourceType};
use core::{net::SocketAddr, time::Duration};
let mut e = build_endpoint();
let now = StdInstant::now();
let host = Name::try_from_str("victim.local.").unwrap();
e.cache
.try_insert(
host.clone(),
ResourceType::A,
ResourceClass::In,
std::vec![10, 0, 0, 1],
Duration::from_secs(120),
now,
false,
)
.unwrap();
assert_eq!(
e.cache
.count_matching(&host, ResourceType::A, ResourceClass::In),
1
);
let src: SocketAddr = "192.168.1.77:5353".parse().unwrap();
let local_ip: core::net::IpAddr = "192.168.1.1".parse().unwrap();
let mut buf = [0u8; 512];
let header = Header::new(); let mut b: MessageBuilder<'_, DEFAULT_COMPRESSION_TABLE> =
MessageBuilder::try_new(&mut buf, header).unwrap();
b.push_a_answer(&host, 120, Ipv4Addr::new(10, 0, 0, 99), false)
.unwrap();
let n = b.finish().unwrap();
let _ = e
.handle(now, src, local_ip, 0, &buf[..n], false)
.unwrap()
.count();
assert_eq!(
e.cache
.count_matching(&host, ResourceType::A, ResourceClass::In),
1,
"QR=0 positive-TTL answer must NOT insert into cache"
);
let mut buf = [0u8; 512];
let header = Header::new();
let mut b: MessageBuilder<'_, DEFAULT_COMPRESSION_TABLE> =
MessageBuilder::try_new(&mut buf, header).unwrap();
b.push_a_answer(&host, 0, Ipv4Addr::new(10, 0, 0, 1), false)
.unwrap();
let n = b.finish().unwrap();
let _ = e
.handle(now, src, local_ip, 0, &buf[..n], false)
.unwrap()
.count();
assert_eq!(
e.cache
.count_matching(&host, ResourceType::A, ResourceClass::In),
1,
"QR=0 TTL=0 answer must NOT delete cached entry"
);
let mut buf = [0u8; 512];
let header = Header::new();
let mut b: MessageBuilder<'_, DEFAULT_COMPRESSION_TABLE> =
MessageBuilder::try_new(&mut buf, header).unwrap();
b.push_a_answer(&host, 120, Ipv4Addr::new(10, 0, 0, 99), true)
.unwrap();
let n = b.finish().unwrap();
let after_grace = now.checked_add(Duration::from_secs(2)).unwrap();
let _ = e
.handle(after_grace, src, local_ip, 0, &buf[..n], false)
.unwrap()
.count();
let after_clamp = after_grace.checked_add(Duration::from_secs(2)).unwrap();
e.cache.sweep_expired(after_clamp);
assert_eq!(
e.cache
.count_matching(&host, ResourceType::A, ResourceClass::In),
1,
"QR=0 cache_flush must NOT clamp legitimate cached siblings"
);
}
#[test]
fn cache_flush_deferred_expiry_preserves_refreshed_rrset() {
use crate::wire::{DEFAULT_COMPRESSION_TABLE, Header, MessageBuilder, ResourceType};
use core::{net::SocketAddr, time::Duration};
let mut e = build_endpoint();
let now = StdInstant::now();
let host = Name::try_from_str("multihomed.local.").unwrap();
let now = now + Duration::from_secs(300);
let long_ago = now.checked_sub(Duration::from_secs(300)).unwrap();
e.cache
.try_insert(
host.clone(),
ResourceType::A,
ResourceClass::In,
std::vec![10, 0, 0, 1],
Duration::from_secs(120),
long_ago,
false,
)
.unwrap();
e.cache
.try_insert(
host.clone(),
ResourceType::A,
ResourceClass::In,
std::vec![10, 0, 0, 2],
Duration::from_secs(120),
long_ago,
false,
)
.unwrap();
assert_eq!(
e.cache
.count_matching(&host, ResourceType::A, ResourceClass::In),
2
);
let src: SocketAddr = "192.168.1.77:5353".parse().unwrap();
let local_ip: core::net::IpAddr = "192.168.1.1".parse().unwrap();
let pkt1_t = now;
{
let mut buf = [0u8; 512];
let mut hdr = Header::new();
hdr.flags_mut().set_response();
let mut b: MessageBuilder<'_, DEFAULT_COMPRESSION_TABLE> =
MessageBuilder::try_new(&mut buf, hdr).unwrap();
b.push_a_answer(&host, 120, Ipv4Addr::new(10, 0, 0, 1), true)
.unwrap();
let n = b.finish().unwrap();
let _ = e
.handle(pkt1_t, src, local_ip, 0, &buf[..n], false)
.unwrap()
.count();
}
assert_eq!(
e.cache
.count_matching(&host, ResourceType::A, ResourceClass::In),
2,
"clamp must NOT remove A2 immediately — only defer its expiry"
);
let pkt2_t = pkt1_t.checked_add(Duration::from_millis(200)).unwrap();
{
let mut buf = [0u8; 512];
let mut hdr = Header::new();
hdr.flags_mut().set_response();
let mut b: MessageBuilder<'_, DEFAULT_COMPRESSION_TABLE> =
MessageBuilder::try_new(&mut buf, hdr).unwrap();
b.push_a_answer(&host, 120, Ipv4Addr::new(10, 0, 0, 2), true)
.unwrap();
let n = b.finish().unwrap();
let _ = e
.handle(pkt2_t, src, local_ip, 0, &buf[..n], false)
.unwrap()
.count();
}
let after_clamp = pkt1_t.checked_add(Duration::from_secs(3)).unwrap();
e.cache.sweep_expired(after_clamp);
assert_eq!(
e.cache
.count_matching(&host, ResourceType::A, ResourceClass::In),
2,
"a refresh burst within the §10.2 grace must preserve the \
full RRSet — both A1 and A2 must survive after sweep"
);
}
#[test]
fn flush_marker_keys_on_rclass_so_mixed_class_does_not_suppress() {
use crate::wire::{DEFAULT_COMPRESSION_TABLE, Header, MessageBuilder, ResourceType};
use core::{net::SocketAddr, time::Duration};
let mut e = build_endpoint();
let now = StdInstant::now();
let host = Name::try_from_str("svc.local.").unwrap();
let now = now + Duration::from_secs(300);
let long_ago = now.checked_sub(Duration::from_secs(300)).unwrap();
e.cache
.try_insert(
host.clone(),
ResourceType::A,
ResourceClass::In,
std::vec![10, 0, 0, 1],
Duration::from_secs(120),
long_ago,
false,
)
.unwrap();
assert_eq!(
e.cache
.count_matching(&host, ResourceType::A, ResourceClass::In),
1
);
let mut buf = [0u8; 512];
let mut hdr = Header::new();
hdr.flags_mut().set_response();
let mut b: MessageBuilder<'_, DEFAULT_COMPRESSION_TABLE> =
MessageBuilder::try_new(&mut buf, hdr).unwrap();
b.push_a_answer(&host, 120, Ipv4Addr::new(10, 0, 0, 2), true)
.unwrap();
b.push_a_answer(&host, 120, Ipv4Addr::new(10, 0, 0, 3), true)
.unwrap();
let n = b.finish().unwrap();
let pkt = &buf[..n];
let src: SocketAddr = "192.168.1.77:5353".parse().unwrap();
let local_ip: core::net::IpAddr = "192.168.1.1".parse().unwrap();
let _ = e.handle(now, src, local_ip, 0, pkt, false).unwrap().count();
let after_clamp = now.checked_add(Duration::from_secs(2)).unwrap();
e.cache.sweep_expired(after_clamp);
let count = e
.cache
.count_matching(&host, ResourceType::A, ResourceClass::In);
assert_eq!(
count, 2,
"old IN sibling must have been clamped + swept; the two \
new IN records from the packet survive. Expected 2 (10.0.0.2 \
and 10.0.0.3); got {count}"
);
}
#[test]
fn cache_class_isolates_in_from_non_in() {
use core::time::Duration;
let mut e = build_endpoint();
let now = StdInstant::now();
let host = Name::try_from_str("svc.local.").unwrap();
e.cache
.try_insert(
host.clone(),
ResourceType::A,
ResourceClass::In,
std::vec![10, 0, 0, 1],
Duration::from_secs(120),
now,
false,
)
.unwrap();
e.cache
.try_insert(
host.clone(),
ResourceType::A,
crate::wire::ResourceClass::Any,
std::vec![10, 0, 0, 1],
Duration::from_secs(120),
now,
false,
)
.unwrap();
assert_eq!(
e.cache
.count_matching(&host, ResourceType::A, ResourceClass::In),
1
);
assert_eq!(
e.cache
.count_matching(&host, ResourceType::A, crate::wire::ResourceClass::Any),
1
);
let after_grace = now.checked_add(Duration::from_secs(2)).unwrap();
e.cache
.try_insert(
host.clone(),
ResourceType::A,
crate::wire::ResourceClass::Any,
std::vec![10, 0, 0, 99],
Duration::from_secs(120),
after_grace,
true,
)
.unwrap();
let after_clamp = after_grace.checked_add(Duration::from_secs(2)).unwrap();
e.cache.sweep_expired(after_clamp);
assert_eq!(
e.cache
.count_matching(&host, ResourceType::A, ResourceClass::In),
1,
"cache_flush in class ANY must NOT touch IN-class entries"
);
}
#[test]
fn cache_flush_preserves_recent_siblings_across_packets() {
use crate::wire::{DEFAULT_COMPRESSION_TABLE, Header, MessageBuilder, ResourceType};
use core::net::SocketAddr;
let mut e = build_endpoint();
let now = StdInstant::now();
let host = Name::try_from_str("multihomed.local.").unwrap();
let src: SocketAddr = "192.168.1.77:5353".parse().unwrap();
let local_ip: core::net::IpAddr = "192.168.1.1".parse().unwrap();
{
let mut buf = [0u8; 512];
let mut hdr = Header::new();
hdr.flags_mut().set_response();
let mut b: MessageBuilder<'_, DEFAULT_COMPRESSION_TABLE> =
MessageBuilder::try_new(&mut buf, hdr).unwrap();
b.push_a_answer(&host, 120, Ipv4Addr::new(10, 0, 0, 1), true)
.unwrap();
let n = b.finish().unwrap();
let _ = e
.handle(now, src, local_ip, 0, &buf[..n], false)
.unwrap()
.count();
}
assert_eq!(
e.cache
.count_matching(&host, ResourceType::A, ResourceClass::In),
1
);
let later = now
.checked_add(core::time::Duration::from_millis(100))
.unwrap();
{
let mut buf = [0u8; 512];
let mut hdr = Header::new();
hdr.flags_mut().set_response();
let mut b: MessageBuilder<'_, DEFAULT_COMPRESSION_TABLE> =
MessageBuilder::try_new(&mut buf, hdr).unwrap();
b.push_a_answer(&host, 120, Ipv4Addr::new(10, 0, 0, 2), true)
.unwrap();
let n = b.finish().unwrap();
let _ = e
.handle(later, src, local_ip, 0, &buf[..n], false)
.unwrap()
.count();
}
let count = e
.cache
.count_matching(&host, ResourceType::A, ResourceClass::In);
assert_eq!(
count, 2,
"cross-packet cache-flush within §10.2 grace must preserve \
fresh siblings. Expected 2 (both 10.0.0.1 and 10.0.0.2); got {count}"
);
}
#[test]
fn ttl_zero_does_not_consume_flush_marker() {
use crate::wire::{DEFAULT_COMPRESSION_TABLE, Header, MessageBuilder, ResourceType};
use core::{net::SocketAddr, time::Duration};
let mut e = build_endpoint();
let now = StdInstant::now();
let host = Name::try_from_str("printer.local.").unwrap();
e.cache
.try_insert(
host.clone(),
ResourceType::A,
ResourceClass::In,
std::vec![10, 0, 0, 1],
Duration::from_secs(120),
now,
false,
)
.unwrap();
e.cache
.try_insert(
host.clone(),
ResourceType::A,
ResourceClass::In,
std::vec![10, 0, 0, 2],
Duration::from_secs(120),
now,
false,
)
.unwrap();
assert_eq!(
e.cache
.count_matching(&host, ResourceType::A, ResourceClass::In),
2
);
let after_grace = now.checked_add(Duration::from_secs(2)).unwrap();
let mut buf = [0u8; 512];
let mut hdr = Header::new();
hdr.flags_mut().set_response();
let mut b: MessageBuilder<'_, DEFAULT_COMPRESSION_TABLE> =
MessageBuilder::try_new(&mut buf, hdr).unwrap();
b.push_a_answer(&host, 0, Ipv4Addr::new(10, 0, 0, 1), true)
.unwrap();
b.push_a_answer(&host, 120, Ipv4Addr::new(10, 0, 0, 3), true)
.unwrap();
let n = b.finish().unwrap();
let pkt = &buf[..n];
let src: SocketAddr = "192.168.1.77:5353".parse().unwrap();
let local_ip: core::net::IpAddr = "192.168.1.1".parse().unwrap();
let _ = e
.handle(after_grace, src, local_ip, 0, pkt, false)
.unwrap()
.count();
let after_clamp = after_grace.checked_add(Duration::from_secs(2)).unwrap();
e.cache.sweep_expired(after_clamp);
let count = e
.cache
.count_matching(&host, ResourceType::A, ResourceClass::In);
assert_eq!(
count, 1,
"TTL=0 goodbye must not consume the per-packet flush \
marker, so the subsequent positive-TTL cache_flush record must \
still evict the unrelated sibling. Expected 1 (only 10.0.0.3); \
got {count}"
);
}
#[test]
fn malformed_record_does_not_loop_forever() {
use crate::wire::Header;
use core::net::SocketAddr;
let mut e = build_endpoint();
let now = StdInstant::now();
let mut buf = [0u8; 32];
let mut hdr = Header::new();
hdr.flags_mut().set_response();
hdr.set_answer_count(1);
let header_len = hdr.write(&mut buf).unwrap();
let pkt = &buf[..header_len];
let src: SocketAddr = "192.168.1.77:5353".parse().unwrap();
let local_ip: core::net::IpAddr = "192.168.1.1".parse().unwrap();
let events = e.handle(now, src, local_ip, 0, pkt, false).unwrap();
let mut total_polls = 0u32;
let mut error_count = 0u32;
for ev in events {
total_polls = total_polls.saturating_add(1);
if ev.is_err() {
error_count = error_count.saturating_add(1);
}
if total_polls > 10 {
panic!(
"iterator must terminate after parse error; \
seen {error_count} errors in {total_polls} polls without None"
);
}
}
assert!(
error_count <= 3,
"at most one parse error per section (3 sections); got {error_count}"
);
}
#[test]
fn answer_questions_false_suppresses_question_events() {
use crate::{
config::ServiceSpec,
records::ServiceRecords,
wire::{DEFAULT_COMPRESSION_TABLE, Header, MessageBuilder, ResourceType},
};
use core::net::SocketAddr;
use rand::SeedableRng;
let rng = rand::rngs::StdRng::from_seed([7u8; 32]);
let cfg = EndpointConfig::new().with_answer_questions(false);
let mut e = TestEndp::try_new(cfg, rng);
let st = Name::try_from_str("_http._tcp.local.").unwrap();
let inst = Name::try_from_str("WebServer._http._tcp.local.").unwrap();
let host = Name::try_from_str("web.local.").unwrap();
let recs = ServiceRecords::new(st.clone(), inst.clone(), host, 80, 120);
let now = StdInstant::now();
let (_h, _svc) = e
.try_register_service::<slab::Slab<Transmit>, slab::Slab<ServiceUpdate>>(
ServiceSpec::new(recs),
now,
)
.unwrap();
let mut buf = [0u8; 512];
let header = Header::new();
let mut b: MessageBuilder<'_, DEFAULT_COMPRESSION_TABLE> =
MessageBuilder::try_new(&mut buf, header).unwrap();
b.push_question(
&inst,
ResourceType::Any,
crate::wire::ResourceClass::In,
false,
)
.unwrap();
let n = b.finish().unwrap();
let pkt = &buf[..n];
let src: SocketAddr = "192.168.1.77:5353".parse().unwrap();
let local_ip: core::net::IpAddr = "192.168.1.1".parse().unwrap();
let events: std::vec::Vec<_> = e
.handle(now, src, local_ip, 0, pkt, false)
.unwrap()
.map(Result::unwrap)
.collect();
let question_events: std::vec::Vec<_> = events
.iter()
.filter(|ev| matches!(ev, RouteEvent::ToService(ts) if ts.event().is_question()))
.collect();
assert!(
question_events.is_empty(),
"answer_questions=false must suppress ServiceEvent::Question; \
got {question_events:?}"
);
}
#[test]
fn authority_host_conflict_fans_out_to_all_same_host_services() {
use crate::{
config::ServiceSpec,
records::ServiceRecords,
wire::{Header, MessageBuilder},
};
use core::net::SocketAddr;
let mut e = build_endpoint();
let host = Name::try_from_str("shared-host.local.").unwrap();
let now = StdInstant::now();
let mut handles = std::vec::Vec::new();
for inst_label in ["A", "B", "C"] {
let inst_str = std::format!("{inst_label}._ipp._tcp.local.");
let st = Name::try_from_str("_ipp._tcp.local.").unwrap();
let inst = Name::try_from_str(&inst_str).unwrap();
let recs = ServiceRecords::new(st, inst, host.clone(), 631, 120);
let (h, _svc) = e
.try_register_service::<slab::Slab<Transmit>, slab::Slab<ServiceUpdate>>(
ServiceSpec::new(recs),
now,
)
.unwrap();
handles.push(h);
}
let mut buf = [0u8; 512];
let hdr = Header::new();
let mut b = MessageBuilder::<'_, 32>::try_new(&mut buf, hdr).unwrap();
b.push_a_authority(&host, 120, Ipv4Addr::new(192, 168, 1, 99))
.unwrap();
let n = b.finish().unwrap();
let pkt = &buf[..n];
let src: SocketAddr = "192.168.1.99:5353".parse().unwrap();
let local_ip: core::net::IpAddr = "192.168.1.1".parse().unwrap();
let events: std::vec::Vec<_> = e
.handle(now, src, local_ip, 0, pkt, false)
.unwrap()
.map(Result::unwrap)
.collect();
let conflict_handles: std::vec::Vec<_> = events
.iter()
.filter_map(|ev| match ev {
RouteEvent::ToService(ts) if ts.event().is_host_conflict() => Some(ts.handle()),
_ => None,
})
.collect();
for h in &handles {
assert!(
conflict_handles.contains(h),
"service {h:?} must receive HostConflict for shared host; \
got handles {conflict_handles:?}"
);
}
assert_eq!(
conflict_handles.len(),
3,
"exactly three HostConflict events expected (one per service); \
got {conflict_handles:?}"
);
}
#[test]
fn qr1_ttl_zero_does_not_emit_to_query_events() {
use crate::{
config::QuerySpec,
wire::{DEFAULT_COMPRESSION_TABLE, Header, MessageBuilder, ResourceType},
};
use core::net::SocketAddr;
let mut e = build_endpoint();
let now = StdInstant::now();
let qname = Name::try_from_str("printer.local.").unwrap();
let h = e
.try_start_query(QuerySpec::new(qname.clone(), ResourceType::A), now)
.unwrap();
let mut buf = [0u8; 512];
let mut hdr = Header::new();
hdr.flags_mut().set_response();
let mut b: MessageBuilder<'_, DEFAULT_COMPRESSION_TABLE> =
MessageBuilder::try_new(&mut buf, hdr).unwrap();
b.push_a_answer(&qname, 0, Ipv4Addr::new(10, 0, 0, 7), false)
.unwrap();
let n = b.finish().unwrap();
let pkt = &buf[..n];
let src: SocketAddr = "192.168.1.99:5353".parse().unwrap();
let local_ip: core::net::IpAddr = "192.168.1.1".parse().unwrap();
let events: std::vec::Vec<_> = e
.handle(now, src, local_ip, 0, pkt, false)
.unwrap()
.map(Result::unwrap)
.collect();
let to_query_count = events
.iter()
.filter(
|ev| matches!(ev, RouteEvent::ToQuery(tq) if matches!(tq.event(), QueryEvent::Answer(_))),
)
.count();
assert_eq!(
to_query_count, 0,
"QR=1 TTL=0 must NOT emit ToQuery(Answer) events; got events {events:?}"
);
assert_eq!(
e.collected_answers(h).count(),
0,
"TTL=0 must not land in collected_answers"
);
e.cancel_query(h).unwrap();
}
#[test]
fn terminated_query_rejects_late_answers() {
use crate::{
config::QuerySpec,
event::QueryUpdate,
wire::{DEFAULT_COMPRESSION_TABLE, Header, MessageBuilder, ResourceType},
};
use core::{net::SocketAddr, time::Duration};
let mut e = build_endpoint();
let mut now = StdInstant::now();
let qn = Name::try_from_str("printer.local.").unwrap();
let spec = QuerySpec::new(qn.clone(), ResourceType::A).with_timeout(Duration::from_millis(100));
let h = e.try_start_query(spec, now).unwrap();
let src: SocketAddr = "192.168.1.77:5353".parse().unwrap();
let local_ip: core::net::IpAddr = "192.168.1.1".parse().unwrap();
let mut buf = [0u8; 512];
let pre_terminal_addr = Ipv4Addr::new(10, 0, 0, 7);
{
let mut hdr = Header::new();
hdr.flags_mut().set_response();
let mut b: MessageBuilder<'_, DEFAULT_COMPRESSION_TABLE> =
MessageBuilder::try_new(&mut buf, hdr).unwrap();
b.push_a_answer(&qn, 120, pre_terminal_addr, false).unwrap();
let n = b.finish().unwrap();
let pkt = &buf[..n];
let _ = e.handle(now, src, local_ip, 0, pkt, false).unwrap().count();
}
assert_eq!(
e.collected_answers(h).count(),
1,
"pre-terminal answer must land in collected_answers"
);
now = now.checked_add(Duration::from_millis(200)).unwrap();
e.handle_query_timeout(h, now).unwrap();
assert!(matches!(
e.poll_query(h),
Some(QueryUpdate::Timeout | QueryUpdate::Done)
));
let answers_at_terminal: std::vec::Vec<_> = e.collected_answers(h).cloned().collect();
assert_eq!(answers_at_terminal.len(), 1);
let mut buf2 = [0u8; 512];
let mut hdr = Header::new();
hdr.flags_mut().set_response();
let mut b: MessageBuilder<'_, DEFAULT_COMPRESSION_TABLE> =
MessageBuilder::try_new(&mut buf2, hdr).unwrap();
b.push_a_answer(&qn, 120, Ipv4Addr::new(10, 0, 0, 99), false)
.unwrap();
let n = b.finish().unwrap();
let pkt = &buf2[..n];
let events: std::vec::Vec<_> = e
.handle(now, src, local_ip, 0, pkt, false)
.unwrap()
.map(Result::unwrap)
.collect();
let to_query_events: std::vec::Vec<_> = events
.iter()
.filter_map(|ev| match ev {
RouteEvent::ToQuery(tq) if matches!(tq.event(), QueryEvent::Answer(_)) => Some(tq.handle()),
_ => None,
})
.collect();
assert!(
!to_query_events.contains(&h),
"terminated query must NOT receive ToQuery(Answer) events; got handles {to_query_events:?}"
);
let answers_after_terminal: std::vec::Vec<_> = e.collected_answers(h).cloned().collect();
assert_eq!(
answers_after_terminal.len(),
1,
"collected_answers must be frozen after terminal; \
got {answers_after_terminal:?}"
);
assert_eq!(
answers_after_terminal[0].rdata_slice(),
&pre_terminal_addr.octets(),
"pre-terminal answer must remain intact (no eviction)"
);
e.cancel_query(h).unwrap();
}
#[test]
fn sweep_terminated_queries_prunes_only_terminated() {
use crate::{config::QuerySpec, event::QueryUpdate, wire::ResourceType};
use core::time::Duration;
let mut e = build_endpoint();
let mut now = StdInstant::now();
let qn = Name::try_from_str("printer.local.").unwrap();
let h_short = e
.try_start_query(
QuerySpec::new(qn.clone(), ResourceType::A).with_timeout(Duration::from_millis(100)),
now,
)
.unwrap();
let h_long = e
.try_start_query(QuerySpec::new(qn.clone(), ResourceType::AAAA), now)
.unwrap();
assert_eq!(e.queries.len(), 2);
assert_eq!(e.sweep_terminated_queries(), 0);
assert_eq!(e.queries.len(), 2);
now = now.checked_add(Duration::from_millis(200)).unwrap();
e.handle_query_timeout(h_short, now).unwrap();
assert!(matches!(
e.poll_query(h_short),
Some(QueryUpdate::Timeout | QueryUpdate::Done)
));
assert_eq!(e.queries.len(), 2, "terminal does not auto-prune");
assert_eq!(e.sweep_terminated_queries(), 1);
assert_eq!(e.queries.len(), 1);
assert!(e.collected_answers(h_short).next().is_none());
e.cancel_query(h_long).unwrap();
}
#[test]
fn cancel_query_removes_route() {
use crate::{config::QuerySpec, error::CancelQueryError, wire::ResourceType};
let mut e = build_endpoint();
let now = StdInstant::now();
let qname = Name::try_from_str("printer.local.").unwrap();
let spec = QuerySpec::new(qname, ResourceType::A);
let h = e.try_start_query(spec, now).unwrap();
assert_eq!(e.queries.len(), 1);
e.cancel_query(h).unwrap();
assert_eq!(e.queries.len(), 0);
let r = e.cancel_query(h);
assert!(
matches!(r, Err(CancelQueryError::QueryNotFound(_))),
"cancel_query on absent handle must return QueryNotFound; got {r:?}"
);
}
#[cfg(feature = "stats")]
#[test]
fn cancel_query_stats_invariant() {
use crate::{config::QuerySpec, wire::ResourceType};
use core::time::Duration;
let check_invariant = |label: &str, snap: &hick_trace::stats::StatsSnapshot| {
assert_eq!(
snap.queries_started,
snap.queries_done + snap.queries_active,
"invariant queries_started == queries_done + queries_active \
violated at '{label}': {snap:?}"
);
};
let mut e = build_endpoint();
let now = StdInstant::now();
let qname = Name::try_from_str("printer.local.").unwrap();
let spec = QuerySpec::new(qname.clone(), ResourceType::A);
let h = e.try_start_query(spec, now).unwrap();
let before = e.stats();
assert_eq!(before.queries_started, 1);
assert_eq!(before.queries_active, 1);
assert_eq!(before.queries_done, 0);
check_invariant("after-start", &before);
e.cancel_query(h).unwrap();
let after_live_cancel = e.stats();
assert_eq!(
after_live_cancel.queries_done, 1,
"live cancel must bump queries_done; got {after_live_cancel:?}"
);
assert_eq!(
after_live_cancel.queries_active, 0,
"live cancel must decrement queries_active; got {after_live_cancel:?}"
);
check_invariant("after-live-cancel", &after_live_cancel);
let mut e2 = build_endpoint();
let mut now2 = StdInstant::now();
let spec2 = QuerySpec::new(qname, ResourceType::A).with_timeout(Duration::from_millis(50));
let h2 = e2.try_start_query(spec2, now2).unwrap();
now2 += Duration::from_millis(100);
e2.handle_query_timeout(h2, now2).unwrap();
let _ = e2.poll_query(h2);
let snap_terminal = e2.stats();
check_invariant("after-terminal", &snap_terminal);
e2.cancel_query(h2).unwrap();
let snap_after_cancel = e2.stats();
assert_eq!(
snap_after_cancel.queries_done, snap_terminal.queries_done,
"cancel-after-terminal must not bump queries_done again; {snap_after_cancel:?}"
);
assert_eq!(
snap_after_cancel.queries_active, snap_terminal.queries_active,
"cancel-after-terminal must not decrement queries_active again; {snap_after_cancel:?}"
);
check_invariant("after-cancel-of-terminal", &snap_after_cancel);
}
#[cfg(feature = "stats")]
#[test]
fn duplicate_questions_suppressed_only_on_real_suppression() {
use crate::{
config::QuerySpec,
wire::{DEFAULT_COMPRESSION_TABLE, Header, MessageBuilder, ResourceClass, ResourceType},
};
use core::{
net::{IpAddr, Ipv4Addr, SocketAddr},
time::Duration,
};
let mut e = build_endpoint();
let mut now = StdInstant::now();
let qname = Name::try_from_str("printer.local.").unwrap();
let spec = QuerySpec::new(qname.clone(), ResourceType::A);
let h = e.try_start_query(spec, now).unwrap();
let mut pkt_buf = [0u8; 512];
let hdr = Header::new();
let mut b: MessageBuilder<'_, DEFAULT_COMPRESSION_TABLE> =
MessageBuilder::try_new(&mut pkt_buf, hdr).unwrap();
b.push_question(&qname, ResourceType::A, ResourceClass::In, false)
.unwrap();
let n = b.finish().unwrap();
let pkt = pkt_buf[..n].to_vec();
let multicast_ip: IpAddr = IpAddr::V4(Ipv4Addr::new(224, 0, 0, 251));
let peer_src: SocketAddr = SocketAddr::from((Ipv4Addr::new(192, 168, 1, 2), 5353u16));
let mut tx_buf = std::vec![0u8; 512];
let tx = e.poll_query_transmit(h, now, &mut tx_buf).unwrap();
assert!(
tx.is_some(),
"newly-started query must have an initial transmit pending"
);
{
let mut events = e
.handle(now, peer_src, multicast_ip, 0, &pkt, false)
.unwrap();
while events.next().is_some() {}
}
let snap_awaiting = e.stats();
assert_eq!(
snap_awaiting.duplicate_questions_suppressed, 0,
"(a) no suppression while awaiting send confirm; got {snap_awaiting:?}"
);
e.note_query_transmit_result(h, now, true); now += Duration::from_secs(10); e.handle_query_timeout(h, now).unwrap();
{
let mut events = e
.handle(now, peer_src, multicast_ip, 0, &pkt, false)
.unwrap();
while events.next().is_some() {}
}
let snap_suppressed = e.stats();
assert_eq!(
snap_suppressed.duplicate_questions_suppressed, 1,
"(b) one suppression expected after arming next retry; got {snap_suppressed:?}"
);
}
#[test]
fn ipv6_link_local_self_check_is_interface_scoped() {
use crate::{
event::RouteEvent,
wire::{DEFAULT_COMPRESSION_TABLE, Header, MessageBuilder},
};
use core::net::{Ipv6Addr, SocketAddr};
use rand::SeedableRng;
let rng = rand::rngs::StdRng::from_seed([99u8; 32]);
let mut e = TestEndp::try_new(
EndpointConfig::new().with_trust_advertised_src_as_self(true),
rng,
);
let st = Name::try_from_str("_ipp._tcp.local.").unwrap();
let inst = Name::try_from_str("Printer._ipp._tcp.local.").unwrap();
let host = Name::try_from_str("printer-host.local.").unwrap();
let mut recs = ServiceRecords::new(st, inst.clone(), host, 631, 120);
let our_v6 = Ipv6Addr::new(0xfe80, 0, 0, 0, 0, 0, 0, 1);
recs.add_aaaa_scoped(our_v6, 2);
let now = StdInstant::now();
let (_handle, _svc) = e
.try_register_service::<slab::Slab<Transmit>, slab::Slab<ServiceUpdate>>(
ServiceSpec::new(recs),
now,
)
.unwrap();
let mut buf = [0u8; 512];
let hdr = Header::new();
let mut b: MessageBuilder<'_, DEFAULT_COMPRESSION_TABLE> =
MessageBuilder::try_new(&mut buf, hdr).unwrap();
b.push_srv_authority(
&inst,
120,
0,
0,
8080,
&Name::try_from_str("other-host.local.").unwrap(),
)
.unwrap();
let n = b.finish().unwrap();
let data = &buf[..n];
let local_ip: core::net::IpAddr =
core::net::IpAddr::V6(Ipv6Addr::new(0xff02, 0, 0, 0, 0, 0, 0, 0x00fb));
let self_src: SocketAddr = SocketAddr::from((our_v6, 5353));
let mut self_events = e.handle(now, self_src, local_ip, 2, data, false).unwrap();
assert!(
self_events.next().is_none(),
"link-local from OUR interface (ifindex=2) must be self-suppressed"
);
let mut peer_events = e.handle(now, self_src, local_ip, 3, data, false).unwrap();
let ev = peer_events
.next()
.expect("link-local from a DIFFERENT interface must still produce a routing event")
.expect("event must be Ok");
match ev {
RouteEvent::ToService(ts) => assert!(
ts.event().is_probe_conflict(),
"link-local from ifindex=3 must emit ProbeConflict (not be misclassified \
as self because of bare-address match); got {:?}",
ts.event()
),
other => panic!(
"expected RouteEvent::ToService(ProbeConflict), got {:?}",
other
),
}
}
#[test]
fn response_answer_fans_out_to_type_compatible_queries() {
use crate::{
config::QuerySpec,
wire::{DEFAULT_COMPRESSION_TABLE, Header, MessageBuilder, ResourceType},
};
use core::net::{Ipv6Addr, SocketAddr};
let mut e = build_endpoint();
let now = StdInstant::now();
let qname = Name::try_from_str("printer.local.").unwrap();
let spec_a = QuerySpec::new(qname.clone(), ResourceType::A);
let h_a = e.try_start_query(spec_a, now).unwrap();
let spec_aaaa = QuerySpec::new(qname.clone(), ResourceType::AAAA);
let h_aaaa = e.try_start_query(spec_aaaa, now).unwrap();
let mut buf = [0u8; 512];
let mut hdr = Header::new();
hdr.flags_mut().set_response();
let mut b: MessageBuilder<'_, DEFAULT_COMPRESSION_TABLE> =
MessageBuilder::try_new(&mut buf, hdr).unwrap();
let aaaa = Ipv6Addr::new(0x2001, 0xdb8, 0, 0, 0, 0, 0, 1);
b.push_aaaa_answer(&qname, 120, aaaa, false).unwrap();
let n = b.finish().unwrap();
let pkt = &buf[..n];
let src: SocketAddr = "192.168.1.77:5353".parse().unwrap();
let local_ip: core::net::IpAddr = "192.168.1.1".parse().unwrap();
let events = e.handle(now, src, local_ip, 0, pkt, false).unwrap();
let mut answer_handles: std::vec::Vec<QueryHandle> = std::vec::Vec::new();
for ev in events {
let ev = ev.unwrap();
if let RouteEvent::ToQuery(tq) = ev
&& let QueryEvent::Answer(_) = tq.event()
{
answer_handles.push(tq.handle());
}
}
assert!(
answer_handles.contains(&h_aaaa),
"AAAA query must receive the AAAA answer; got handles {answer_handles:?}"
);
assert!(
!answer_handles.contains(&h_a),
"A query must NOT receive an AAAA answer (route-level rtype filter); \
got handles {answer_handles:?}"
);
}
#[test]
fn response_answer_fans_out_to_any_and_specific_routes() {
use crate::{
config::QuerySpec,
wire::{DEFAULT_COMPRESSION_TABLE, Header, MessageBuilder, ResourceType},
};
use core::net::SocketAddr;
let mut e = build_endpoint();
let now = StdInstant::now();
let qname = Name::try_from_str("printer.local.").unwrap();
let spec_a = QuerySpec::new(qname.clone(), ResourceType::A);
let h_a = e.try_start_query(spec_a, now).unwrap();
let spec_any = QuerySpec::new(qname.clone(), ResourceType::Any);
let h_any = e.try_start_query(spec_any, now).unwrap();
let mut buf = [0u8; 512];
let mut hdr = Header::new();
hdr.flags_mut().set_response();
let mut b: MessageBuilder<'_, DEFAULT_COMPRESSION_TABLE> =
MessageBuilder::try_new(&mut buf, hdr).unwrap();
b.push_a_answer(&qname, 120, Ipv4Addr::new(10, 0, 0, 9), false)
.unwrap();
let n = b.finish().unwrap();
let pkt = &buf[..n];
let src: SocketAddr = "192.168.1.77:5353".parse().unwrap();
let local_ip: core::net::IpAddr = "192.168.1.1".parse().unwrap();
let events = e.handle(now, src, local_ip, 0, pkt, false).unwrap();
let mut answer_handles: std::vec::Vec<QueryHandle> = std::vec::Vec::new();
for ev in events {
let ev = ev.unwrap();
if let RouteEvent::ToQuery(tq) = ev
&& let QueryEvent::Answer(_) = tq.event()
{
answer_handles.push(tq.handle());
}
}
assert!(
answer_handles.contains(&h_a),
"A-specific query must receive the A answer; handles={answer_handles:?}"
);
assert!(
answer_handles.contains(&h_any),
"Any-wildcard query must also receive the A answer; handles={answer_handles:?}"
);
assert_eq!(
answer_handles.len(),
2,
"exactly two ToQuery(Answer) events expected (one per compatible route); \
got {answer_handles:?}"
);
}
#[cfg(feature = "stats")]
#[test]
fn begin_withdrawal_holds_the_name_and_keeps_services_active() {
let mut ep = build_endpoint();
let now = StdInstant::now();
let st = Name::try_from_str("_ipp._tcp.local.").unwrap();
let inst = Name::try_from_str("Printer._ipp._tcp.local.").unwrap();
let host = Name::try_from_str("printer-host.local.").unwrap();
let recs = ServiceRecords::new(st, inst.clone(), host, 631, 120);
let (handle, mut svc) = ep
.try_register_service::<slab::Slab<Transmit>, slab::Slab<ServiceUpdate>>(
ServiceSpec::new(recs),
now,
)
.unwrap();
let before = ep.stats().services_active;
let snap = svc.withdrawal_snapshot();
ep.begin_withdrawal(handle, snap, now);
assert_eq!(
ep.stats().services_active,
before,
"begin_withdrawal must not decrement services_active"
);
let st2 = Name::try_from_str("_ipp._tcp.local.").unwrap();
let inst2 = inst; let host2 = Name::try_from_str("printer-host.local.").unwrap();
let recs2 = ServiceRecords::new(st2, inst2, host2, 631, 120);
let result = ep.try_register_service::<slab::Slab<Transmit>, slab::Slab<ServiceUpdate>>(
ServiceSpec::new(recs2),
now,
);
assert!(
matches!(result, Err(RegisterServiceError::NameAlreadyRegistered(_))),
"same-name re-registration must be rejected while withdrawal route is held"
);
}
#[test]
fn begin_withdrawal_unknown_handle_is_noop() {
let mut ep = build_endpoint();
let now = StdInstant::now();
let st = Name::try_from_str("_ipp._tcp.local.").unwrap();
let inst = Name::try_from_str("Ghost._ipp._tcp.local.").unwrap();
let host = Name::try_from_str("ghost-host.local.").unwrap();
let recs = ServiceRecords::new(st, inst, host, 631, 120);
let (_, mut svc) = ep
.try_register_service::<slab::Slab<Transmit>, slab::Slab<ServiceUpdate>>(
ServiceSpec::new(recs),
now,
)
.unwrap();
let snap = svc.withdrawal_snapshot();
let bogus = ServiceHandle::from_raw(0xDEAD);
ep.begin_withdrawal(bogus, snap, now); }
#[test]
fn poll_withdrawal_emits_ttl0_and_retains_sibling_host_addr() {
let mut ep = build_endpoint();
let now = StdInstant::now();
let shared = Ipv4Addr::new(192, 168, 1, 5);
let unique = Ipv4Addr::new(192, 168, 1, 6);
let host = Name::try_from_str("h.local.").unwrap();
let mut recs_a = ServiceRecords::new(
Name::try_from_str("_ipp._tcp.local.").unwrap(),
Name::try_from_str("A._ipp._tcp.local.").unwrap(),
host.clone(),
631,
120,
);
recs_a.add_a(shared);
recs_a.add_a(unique);
recs_a.add_subtype("_printer").unwrap();
let sub = Name::try_from_str("_printer._sub._ipp._tcp.local.").unwrap();
let (a_handle, _svc_a) = ep
.try_register_service::<slab::Slab<Transmit>, slab::Slab<ServiceUpdate>>(
ServiceSpec::new(recs_a.clone()),
now,
)
.unwrap();
let mut recs_b = ServiceRecords::new(
Name::try_from_str("_ipp._tcp.local.").unwrap(),
Name::try_from_str("B._ipp._tcp.local.").unwrap(),
host.clone(),
632,
120,
);
recs_b.add_a(shared);
let (b_handle, _svc_b) = ep
.try_register_service::<slab::Slab<Transmit>, slab::Slab<ServiceUpdate>>(
ServiceSpec::new(recs_b),
now,
)
.unwrap();
ep.note_service_advertised(b_handle, &[shared], &[], true);
let snap = crate::service::WithdrawalSnapshot {
records: recs_a,
owned: crate::service::EmittedRecords::new(
true,
true,
true,
std::vec::Vec::new(),
std::vec::Vec::new(),
true,
),
host_a: std::vec![shared, unique],
host_aaaa: std::vec::Vec::new(),
};
ep.begin_withdrawal(a_handle, snap, now);
let mut buf = std::vec![0u8; 4096];
let (_dst, len, got) = ep
.poll_withdrawal_transmit(now, &mut buf)
.expect("a due withdrawal must produce a datagram");
assert_eq!(
Some(got),
ep.route_withdrawal_token(a_handle),
"the route-attached item for the withdrawing handle is the one emitted"
);
let reader = crate::wire::MessageReader::try_parse(buf.get(..len).unwrap()).unwrap();
let mut saw_instance = false;
let mut saw_subtype = false;
let mut withdrawn_v4: std::vec::Vec<Ipv4Addr> = std::vec::Vec::new();
for rec in reader.answers() {
let rec = rec.unwrap();
assert_eq!(rec.ttl(), 0, "every goodbye record must carry TTL 0");
match rec.rtype() {
crate::wire::ResourceType::A => {
let d = rec.rdata();
assert_eq!(d.len(), 4, "A rdata is 4 bytes");
withdrawn_v4.push(Ipv4Addr::new(d[0], d[1], d[2], d[3]));
}
crate::wire::ResourceType::Ptr => {
if names_match(&sub, rec.name()) {
saw_subtype = true;
} else {
saw_instance = true;
}
}
crate::wire::ResourceType::Srv | crate::wire::ResourceType::Txt => saw_instance = true,
_ => {}
}
}
assert!(
saw_instance,
"instance records (PTR/SRV/TXT) must be withdrawn at TTL 0"
);
assert!(saw_subtype, "the subtype PTR must be withdrawn at TTL 0");
assert!(
withdrawn_v4.contains(&unique),
"A's unique address must be withdrawn"
);
assert!(
!withdrawn_v4.contains(&shared),
"the sibling-shared address must be RETAINED (not withdrawn)"
);
}
fn register_host_service(
ep: &mut TestEndp,
instance: &str,
host: &Name,
configured_a: &[Ipv4Addr],
advertised: Option<&[Ipv4Addr]>,
) -> ServiceHandle {
let mut recs = ServiceRecords::new(
Name::try_from_str("_ipp._tcp.local.").unwrap(),
Name::try_from_str(instance).unwrap(),
host.clone(),
631,
120,
);
for a in configured_a {
recs.add_a(*a);
}
let (h, _svc) = ep
.try_register_service::<slab::Slab<Transmit>, slab::Slab<ServiceUpdate>>(
ServiceSpec::new(recs),
StdInstant::now(),
)
.unwrap();
if let Some(adv) = advertised {
ep.note_service_advertised(h, adv, &[], true);
}
h
}
fn poll_withdrawn_v4(
ep: &mut TestEndp,
now: StdInstant,
) -> (std::vec::Vec<Ipv4Addr>, WithdrawalToken) {
let mut buf = std::vec![0u8; 4096];
let (_dst, len, token) = ep
.poll_withdrawal_transmit(now, &mut buf)
.expect("a due withdrawal must produce a datagram");
let reader = crate::wire::MessageReader::try_parse(buf.get(..len).unwrap()).unwrap();
let mut withdrawn = std::vec::Vec::new();
for rec in reader.answers() {
let rec = rec.unwrap();
if rec.rtype() == crate::wire::ResourceType::A {
let d = rec.rdata();
withdrawn.push(Ipv4Addr::new(d[0], d[1], d[2], d[3]));
}
}
(withdrawn, token)
}
fn host_a_snapshot(
host: &Name,
instance: &str,
host_a: &[Ipv4Addr],
) -> crate::service::WithdrawalSnapshot {
let mut recs = ServiceRecords::new(
Name::try_from_str("_ipp._tcp.local.").unwrap(),
Name::try_from_str(instance).unwrap(),
host.clone(),
631,
120,
);
for a in host_a {
recs.add_a(*a);
}
crate::service::WithdrawalSnapshot {
records: recs,
owned: crate::service::EmittedRecords::new(
true,
true,
true,
std::vec::Vec::new(),
std::vec::Vec::new(),
false,
),
host_a: host_a.to_vec(),
host_aaaa: std::vec::Vec::new(),
}
}
#[test]
fn withdrawal_withdraws_addr_when_sibling_never_advertised() {
let mut ep = build_endpoint();
let now = StdInstant::now();
let host = Name::try_from_str("h.local.").unwrap();
let shared = Ipv4Addr::new(192, 168, 1, 5);
let unique = Ipv4Addr::new(192, 168, 1, 6);
let a = register_host_service(
&mut ep,
"A._ipp._tcp.local.",
&host,
&[shared, unique],
Some(&[shared, unique]),
);
let _b = register_host_service(&mut ep, "B._ipp._tcp.local.", &host, &[shared], None);
ep.begin_withdrawal(
a,
host_a_snapshot(&host, "A._ipp._tcp.local.", &[shared, unique]),
now,
);
let (withdrawn, token) = poll_withdrawn_v4(&mut ep, now);
assert_eq!(
token,
ep.route_withdrawal_token(a).unwrap(),
"the datagram is A's route-attached withdrawal item"
);
assert!(
withdrawn.contains(&shared),
"shared addr must be WITHDRAWN: no LIVE sibling actually advertised it"
);
assert!(
withdrawn.contains(&unique),
"A's unique addr must be withdrawn"
);
}
#[test]
fn withdrawal_retains_addr_advertised_by_live_sibling() {
let mut ep = build_endpoint();
let now = StdInstant::now();
let host = Name::try_from_str("h.local.").unwrap();
let shared = Ipv4Addr::new(192, 168, 1, 5);
let unique = Ipv4Addr::new(192, 168, 1, 6);
let a = register_host_service(
&mut ep,
"A._ipp._tcp.local.",
&host,
&[shared, unique],
Some(&[shared, unique]),
);
let _b = register_host_service(
&mut ep,
"B._ipp._tcp.local.",
&host,
&[shared],
Some(&[shared]),
);
ep.begin_withdrawal(
a,
host_a_snapshot(&host, "A._ipp._tcp.local.", &[shared, unique]),
now,
);
let (withdrawn, token) = poll_withdrawn_v4(&mut ep, now);
assert_eq!(
token,
ep.route_withdrawal_token(a).unwrap(),
"the datagram is A's route-attached withdrawal item"
);
assert!(
!withdrawn.contains(&shared),
"shared addr must be RETAINED: live sibling B still advertises it"
);
assert!(
withdrawn.contains(&unique),
"A's unique addr must be withdrawn"
);
}
#[test]
fn simultaneous_same_host_withdrawals_each_withdraw_shared_addr() {
let mut ep = build_endpoint();
let now = StdInstant::now();
let host = Name::try_from_str("h.local.").unwrap();
let shared = Ipv4Addr::new(192, 168, 1, 5);
let a = register_host_service(
&mut ep,
"A._ipp._tcp.local.",
&host,
&[shared],
Some(&[shared]),
);
let b = register_host_service(
&mut ep,
"B._ipp._tcp.local.",
&host,
&[shared],
Some(&[shared]),
);
ep.begin_withdrawal(
a,
host_a_snapshot(&host, "A._ipp._tcp.local.", &[shared]),
now,
);
ep.begin_withdrawal(
b,
host_a_snapshot(&host, "B._ipp._tcp.local.", &[shared]),
now,
);
let (withdrawn_1, tok1) = poll_withdrawn_v4(&mut ep, now);
assert!(
withdrawn_1.contains(&shared),
"first withdrawer ({tok1:?}) must withdraw the shared addr (sibling is also leaving)"
);
ep.note_withdrawal_result(
tok1,
now,
super::WithdrawalSend::Sent,
super::WithdrawalSend::Sent,
);
let (withdrawn_2, tok2) = poll_withdrawn_v4(&mut ep, now);
assert_ne!(
tok1, tok2,
"the second poll must advance to the OTHER withdrawer's item"
);
assert!(
withdrawn_2.contains(&shared),
"second withdrawer ({tok2:?}) must ALSO withdraw the shared addr"
);
}
#[test]
fn note_withdrawal_delivered_spends_failed_rearms() {
let mut ep = build_endpoint();
let now = StdInstant::now();
let recs = ServiceRecords::new(
Name::try_from_str("_ipp._tcp.local.").unwrap(),
Name::try_from_str("A._ipp._tcp.local.").unwrap(),
Name::try_from_str("h.local.").unwrap(),
631,
120,
);
let (h, _svc) = ep
.try_register_service::<slab::Slab<Transmit>, slab::Slab<ServiceUpdate>>(
ServiceSpec::new(recs.clone()),
now,
)
.unwrap();
let snap = crate::service::WithdrawalSnapshot {
records: recs,
owned: crate::service::EmittedRecords::new(
true,
true,
true,
std::vec::Vec::new(),
std::vec::Vec::new(),
false,
),
host_a: std::vec::Vec::new(),
host_aaaa: std::vec::Vec::new(),
};
ep.begin_withdrawal(h, snap, now);
let token = ep.route_withdrawal_token(h).unwrap();
ep.note_withdrawal_result(
token,
now,
super::WithdrawalSend::Retry,
super::WithdrawalSend::Retry,
);
assert_eq!(
ep.route_withdrawal_owed(h),
Some([super::WITHDRAWAL_SENDS, super::WITHDRAWAL_SENDS]),
"a no-send round must not spend either family's resend budget"
);
let backoff_at = ep.route_withdrawal_next_at(h).unwrap();
assert_eq!(
backoff_at,
now
.checked_add_duration(super::WITHDRAWAL_RETRY_BACKOFF)
.unwrap()
);
assert!(
backoff_at
< now
.checked_add_duration(super::WITHDRAWAL_INTERVAL)
.unwrap(),
"a no-send round must NOT delay a full interval"
);
ep.note_withdrawal_result(
token,
now,
super::WithdrawalSend::Sent,
super::WithdrawalSend::Sent,
);
assert_eq!(
ep.route_withdrawal_owed(h),
Some([super::WITHDRAWAL_SENDS - 1, super::WITHDRAWAL_SENDS - 1]),
"a dual-stack delivered round spends exactly one per family"
);
assert_eq!(
ep.route_withdrawal_next_at(h).unwrap(),
now
.checked_add_duration(super::WITHDRAWAL_INTERVAL)
.unwrap()
);
ep.note_route_withdrawal_result(
h,
now,
super::WithdrawalSend::Sent,
super::WithdrawalSend::Retry,
);
assert_eq!(
ep.route_withdrawal_owed(h),
Some([super::WITHDRAWAL_SENDS - 2, super::WITHDRAWAL_SENDS - 1]),
"a v4-only round spends only v4's budget; v6 keeps its debt"
);
assert_eq!(
ep.route_withdrawal_next_at(h).unwrap(),
now
.checked_add_duration(super::WITHDRAWAL_INTERVAL)
.unwrap(),
"a round with >= 1 Sent re-arms at the full interval"
);
}
#[test]
fn withdrawal_send_as_str_slug_for_every_variant() {
assert_eq!(super::WithdrawalSend::Sent.as_str(), "sent");
assert_eq!(super::WithdrawalSend::Retry.as_str(), "retry");
assert_eq!(super::WithdrawalSend::WriteOff.as_str(), "write_off");
assert_eq!(
std::format!("{}", super::WithdrawalSend::WriteOff),
"write_off"
);
}
#[test]
fn withdrawal_not_freed_until_every_family_sent() {
let mut ep = build_endpoint();
let now = StdInstant::now();
let inst = Name::try_from_str("Printer._ipp._tcp.local.").unwrap();
let recs = ServiceRecords::new(
Name::try_from_str("_ipp._tcp.local.").unwrap(),
inst.clone(),
Name::try_from_str("h.local.").unwrap(),
631,
120,
);
let (h, _svc) = ep
.try_register_service::<slab::Slab<Transmit>, slab::Slab<ServiceUpdate>>(
ServiceSpec::new(recs.clone()),
now,
)
.unwrap();
let snap = crate::service::WithdrawalSnapshot {
records: recs,
owned: crate::service::EmittedRecords::new(
true,
true,
true,
std::vec::Vec::new(),
std::vec::Vec::new(),
false,
),
host_a: std::vec::Vec::new(),
host_aaaa: std::vec::Vec::new(),
};
ep.begin_withdrawal(h, snap, now);
let token = ep.route_withdrawal_token(h).unwrap();
for _ in 0..super::WITHDRAWAL_SENDS {
ep.note_withdrawal_result(
token,
now,
super::WithdrawalSend::Sent,
super::WithdrawalSend::Retry,
);
}
assert_eq!(
ep.route_withdrawal_owed(h),
Some([0, super::WITHDRAWAL_SENDS]),
"v4 fully sent but v6 (busy) still owes its whole budget"
);
let mut done: std::vec::Vec<ServiceHandle> = std::vec::Vec::new();
ep.drain_completed_withdrawals(now, &mut done);
assert!(
done.is_empty(),
"a withdrawal whose v6 family still owes must NOT be freed before the ceiling"
);
let dup = ServiceRecords::new(
Name::try_from_str("_ipp._tcp.local.").unwrap(),
inst.clone(),
Name::try_from_str("h2.local.").unwrap(),
631,
120,
);
assert!(
matches!(
ep.try_register_service::<slab::Slab<Transmit>, slab::Slab<ServiceUpdate>>(
ServiceSpec::new(dup),
now,
),
Err(RegisterServiceError::NameAlreadyRegistered(_))
),
"the name must stay held while v6's goodbye debt is unpaid"
);
for _ in 0..super::WITHDRAWAL_SENDS {
ep.note_withdrawal_result(
token,
now,
super::WithdrawalSend::Sent,
super::WithdrawalSend::Sent,
);
}
assert_eq!(
ep.route_withdrawal_owed(h),
Some([0, 0]),
"once v6 sends its budget every family's debt is cleared"
);
ep.drain_completed_withdrawals(now, &mut done);
assert!(
done.contains(&h),
"the withdrawal completes once every family has withdrawn its records"
);
let recs2 = ServiceRecords::new(
Name::try_from_str("_ipp._tcp.local.").unwrap(),
inst,
Name::try_from_str("h2.local.").unwrap(),
631,
120,
);
assert!(
ep.try_register_service::<slab::Slab<Transmit>, slab::Slab<ServiceUpdate>>(
ServiceSpec::new(recs2),
now,
)
.is_ok(),
"the withdrawn name is re-registerable once all families have sent"
);
}
#[test]
fn withdrawal_writeoff_family_completes() {
let mut ep = build_endpoint();
let now = StdInstant::now();
let inst = Name::try_from_str("Printer._ipp._tcp.local.").unwrap();
let recs = ServiceRecords::new(
Name::try_from_str("_ipp._tcp.local.").unwrap(),
inst,
Name::try_from_str("h.local.").unwrap(),
631,
120,
);
let (h, _svc) = ep
.try_register_service::<slab::Slab<Transmit>, slab::Slab<ServiceUpdate>>(
ServiceSpec::new(recs.clone()),
now,
)
.unwrap();
let snap = crate::service::WithdrawalSnapshot {
records: recs,
owned: crate::service::EmittedRecords::new(
true,
true,
true,
std::vec::Vec::new(),
std::vec::Vec::new(),
false,
),
host_a: std::vec::Vec::new(),
host_aaaa: std::vec::Vec::new(),
};
ep.begin_withdrawal(h, snap, now);
let token = ep.route_withdrawal_token(h).unwrap();
ep.note_withdrawal_result(
token,
now,
super::WithdrawalSend::Sent,
super::WithdrawalSend::WriteOff,
);
assert_eq!(
ep.route_withdrawal_owed(h),
Some([super::WITHDRAWAL_SENDS - 1, 0]),
"WriteOff zeroes v6's debt; v4 spent exactly one"
);
for _ in 0..(super::WITHDRAWAL_SENDS - 1) {
ep.note_withdrawal_result(
token,
now,
super::WithdrawalSend::Sent,
super::WithdrawalSend::WriteOff,
);
}
assert_eq!(
ep.route_withdrawal_owed(h),
Some([0, 0]),
"v4 fully sent + v6 written off → every family's debt cleared"
);
let mut done: std::vec::Vec<ServiceHandle> = std::vec::Vec::new();
ep.drain_completed_withdrawals(now, &mut done);
assert!(
done.contains(&h),
"the withdrawal completes via v4 alone once v6 is written off"
);
}
#[test]
fn withdrawal_retries_owed_family_at_backoff_when_other_is_paid() {
let mut ep = build_endpoint();
let now = StdInstant::now();
let recs = ServiceRecords::new(
Name::try_from_str("_ipp._tcp.local.").unwrap(),
Name::try_from_str("A._ipp._tcp.local.").unwrap(),
Name::try_from_str("h.local.").unwrap(),
631,
120,
);
let (h, _svc) = ep
.try_register_service::<slab::Slab<Transmit>, slab::Slab<ServiceUpdate>>(
ServiceSpec::new(recs.clone()),
now,
)
.unwrap();
let snap = crate::service::WithdrawalSnapshot {
records: recs,
owned: crate::service::EmittedRecords::new(
true,
true,
true,
std::vec::Vec::new(),
std::vec::Vec::new(),
false,
),
host_a: std::vec::Vec::new(),
host_aaaa: std::vec::Vec::new(),
};
ep.begin_withdrawal(h, snap, now);
let token = ep.route_withdrawal_token(h).unwrap();
for _ in 0..super::WITHDRAWAL_SENDS {
ep.note_withdrawal_result(
token,
now,
super::WithdrawalSend::Sent,
super::WithdrawalSend::Retry,
);
}
assert_eq!(
ep.route_withdrawal_owed(h),
Some([0, super::WITHDRAWAL_SENDS]),
"v4 fully paid; v6 (busy) still owes its whole budget"
);
ep.note_withdrawal_result(
token,
now,
super::WithdrawalSend::Sent,
super::WithdrawalSend::Retry,
);
assert_eq!(
ep.route_withdrawal_owed(h),
Some([0, super::WITHDRAWAL_SENDS]),
"a redundant `Sent` on the already-paid v4 must not change any debt"
);
let backoff_at = ep.route_withdrawal_next_at(h).unwrap();
assert_eq!(
backoff_at,
now
.checked_add_duration(super::WITHDRAWAL_RETRY_BACKOFF)
.unwrap(),
"an already-paid family's `Sent` is not progress: re-arm at the short backoff"
);
assert!(
backoff_at
< now
.checked_add_duration(super::WITHDRAWAL_INTERVAL)
.unwrap(),
"the still-owed v6 must be retried at the short backoff, not a full interval"
);
for _ in 0..super::WITHDRAWAL_SENDS {
ep.note_withdrawal_result(
token,
now,
super::WithdrawalSend::Sent,
super::WithdrawalSend::Sent,
);
}
assert_eq!(
ep.route_withdrawal_owed(h),
Some([0, 0]),
"v6 draining its budget clears every family's debt"
);
let mut done: std::vec::Vec<ServiceHandle> = std::vec::Vec::new();
ep.drain_completed_withdrawals(now, &mut done);
assert!(
done.contains(&h),
"the withdrawal completes once the previously-owed v6 has sent its budget"
);
}
#[test]
fn writeoff_only_zeroes_its_own_family() {
let mut ep = build_endpoint();
let now = StdInstant::now();
let recs = ServiceRecords::new(
Name::try_from_str("_ipp._tcp.local.").unwrap(),
Name::try_from_str("A._ipp._tcp.local.").unwrap(),
Name::try_from_str("h.local.").unwrap(),
631,
120,
);
let (h, _svc) = ep
.try_register_service::<slab::Slab<Transmit>, slab::Slab<ServiceUpdate>>(
ServiceSpec::new(recs.clone()),
now,
)
.unwrap();
let snap = crate::service::WithdrawalSnapshot {
records: recs,
owned: crate::service::EmittedRecords::new(
true,
true,
true,
std::vec::Vec::new(),
std::vec::Vec::new(),
false,
),
host_a: std::vec::Vec::new(),
host_aaaa: std::vec::Vec::new(),
};
ep.begin_withdrawal(h, snap, now);
ep.note_route_withdrawal_result(
h,
now,
super::WithdrawalSend::WriteOff,
super::WithdrawalSend::Retry,
);
assert_eq!(
ep.route_withdrawal_owed(h),
Some([0, super::WITHDRAWAL_SENDS]),
"WriteOff zeroes ONLY v4; v6's full budget is untouched"
);
}
#[test]
fn encode_failing_withdrawal_does_not_block_a_sibling() {
let mut ep = build_endpoint();
let now = StdInstant::now();
let inst_a = Name::try_from_str("A._ipp._tcp.local.").unwrap();
let host_a = Name::try_from_str("ha.local.").unwrap();
let recs_a = ServiceRecords::new(
Name::try_from_str("_ipp._tcp.local.").unwrap(),
inst_a,
host_a,
631,
120,
);
let big_a: std::vec::Vec<Ipv4Addr> = (0..60u8).map(|i| Ipv4Addr::new(10, 0, 0, i)).collect();
let (a, _svc_a) = ep
.try_register_service::<slab::Slab<Transmit>, slab::Slab<ServiceUpdate>>(
ServiceSpec::new(recs_a.clone()),
now,
)
.unwrap();
let snap_a = crate::service::WithdrawalSnapshot {
records: recs_a,
owned: crate::service::EmittedRecords::new(
true,
false,
false,
std::vec::Vec::new(),
std::vec::Vec::new(),
false,
),
host_a: big_a,
host_aaaa: std::vec::Vec::new(),
};
let inst_b = Name::try_from_str("B._ipp._tcp.local.").unwrap();
let recs_b = ServiceRecords::new(
Name::try_from_str("_ipp._tcp.local.").unwrap(),
inst_b,
Name::try_from_str("hb.local.").unwrap(),
632,
120,
);
let (b, _svc_b) = ep
.try_register_service::<slab::Slab<Transmit>, slab::Slab<ServiceUpdate>>(
ServiceSpec::new(recs_b.clone()),
now,
)
.unwrap();
let snap_b = crate::service::WithdrawalSnapshot {
records: recs_b,
owned: crate::service::EmittedRecords::new(
true,
false,
false,
std::vec::Vec::new(),
std::vec::Vec::new(),
false,
),
host_a: std::vec::Vec::new(),
host_aaaa: std::vec::Vec::new(),
};
ep.begin_withdrawal(a, snap_a, now);
ep.begin_withdrawal(b, snap_b, now);
let mut scratch = std::vec![0u8; 128];
let got = ep.poll_withdrawal_transmit(now, &mut scratch);
let (_dst, _len, got_handle) =
got.expect("the pump must scan past the encode-failing A and return B's goodbye");
assert_eq!(
Some(got_handle),
ep.route_withdrawal_token(b),
"B (encodable) is returned; A (encode-failing) did not head-of-line block"
);
let a_next = ep.route_withdrawal_next_at(a).unwrap();
assert!(
a_next > now,
"the encode-failing A must have its next_at pushed past now, not left due"
);
assert_eq!(
a_next,
now
.checked_add_duration(super::WITHDRAWAL_RETRY_BACKOFF)
.unwrap(),
"A re-arms at the short backoff after an encode failure"
);
assert_eq!(
ep.route_withdrawal_owed(a),
Some([super::WITHDRAWAL_SENDS, super::WITHDRAWAL_SENDS]),
"an encode failure must NOT spend A's resend budget"
);
}
#[test]
fn teardown_during_rename_goodbye_withdraws_old_and_new_name() {
let mut ep = build_endpoint();
let now = StdInstant::now();
let stype = Name::try_from_str("_ipp._tcp.local.").unwrap();
let host = Name::try_from_str("h.local.").unwrap();
let old_name = Name::try_from_str("A._ipp._tcp.local.").unwrap();
let new_name = Name::try_from_str("A-1._ipp._tcp.local.").unwrap();
let host_v4 = Ipv4Addr::new(192, 168, 1, 7);
let host_v6 = std::net::Ipv6Addr::new(0xfd00, 0, 0, 0, 0, 0, 0, 1);
let mut recs_b = ServiceRecords::new(stype.clone(), new_name.clone(), host.clone(), 631, 120);
recs_b.add_a(host_v4);
recs_b.add_aaaa(host_v6);
let (h, _svc) = ep
.try_register_service::<slab::Slab<Transmit>, slab::Slab<ServiceUpdate>>(
ServiceSpec::new(recs_b.clone()),
now,
)
.unwrap();
let old_records = ServiceRecords::new(stype, old_name.clone(), host.clone(), 631, 120);
let old_owned = crate::service::EmittedRecords::new(
true,
true,
false,
std::vec::Vec::new(),
std::vec::Vec::new(),
false,
);
ep.enqueue_rename_withdrawal(
crate::service::RenameGoodbyeHandoff {
records: old_records,
owned: old_owned,
},
now,
false,
);
let snap = crate::service::WithdrawalSnapshot {
records: recs_b,
owned: crate::service::EmittedRecords::new(
true,
true,
true,
std::vec::Vec::new(),
std::vec::Vec::new(),
false,
),
host_a: std::vec![host_v4],
host_aaaa: std::vec![host_v6],
};
ep.begin_withdrawal(h, snap, now);
assert_eq!(
ep.route_withdrawal_owed(h),
Some([super::WITHDRAWAL_SENDS, super::WITHDRAWAL_SENDS]),
"the route-attached current-name (B) item owes a full budget"
);
assert_eq!(
ep.detached_withdrawal_owed_for(&old_name),
Some([super::WITHDRAWAL_SENDS, super::WITHDRAWAL_SENDS]),
"the detached old-name (A) item owes a full budget independently"
);
let mut buf = std::vec![0u8; 4096];
let parse = |bytes: &[u8]| {
let reader = crate::wire::MessageReader::try_parse(bytes).unwrap();
let mut saw_old = false;
let mut saw_new = false;
let mut v4: std::vec::Vec<Ipv4Addr> = std::vec::Vec::new();
let mut v6: std::vec::Vec<std::net::Ipv6Addr> = std::vec::Vec::new();
for rec in reader.answers() {
let rec = rec.unwrap();
assert_eq!(rec.ttl(), 0, "every goodbye record must carry TTL 0");
match rec.rtype() {
crate::wire::ResourceType::A => {
let d = rec.rdata();
assert_eq!(d.len(), 4, "A rdata is 4 bytes");
v4.push(Ipv4Addr::new(d[0], d[1], d[2], d[3]));
}
crate::wire::ResourceType::AAAA => {
let d = rec.rdata();
assert_eq!(d.len(), 16, "AAAA rdata is 16 bytes");
let mut o = [0u8; 16];
o.copy_from_slice(d);
v6.push(std::net::Ipv6Addr::from(o));
}
crate::wire::ResourceType::Srv => {
if names_match(&old_name, rec.name()) {
saw_old = true;
} else if names_match(&new_name, rec.name()) {
saw_new = true;
}
}
_ => {}
}
}
(saw_old, saw_new, v4, v6)
};
let token_b = ep.route_withdrawal_token(h).expect("B's route token");
let token_a_owed = ep.detached_withdrawal_owed_for(&old_name);
assert!(token_a_owed.is_some(), "A's detached item exists");
let mut saw_new_datagram = false;
let mut saw_old_datagram = false;
for _ in 0..2 {
let (_dst, len, token) = ep
.poll_withdrawal_transmit(now, &mut buf)
.expect("each rename-window item is due at now and emits its own datagram");
let (saw_old, saw_new, withdrawn_v4, withdrawn_v6) = parse(buf.get(..len).unwrap());
if saw_new {
assert_eq!(token, token_b, "B's datagram round-trips B's route token");
assert!(!saw_old, "B's datagram does NOT carry the old name A");
assert!(
withdrawn_v4.contains(&host_v4) && withdrawn_v6.contains(&host_v6),
"the confirmed host A/AAAA addresses are withdrawn with B"
);
saw_new_datagram = true;
} else {
assert!(saw_old, "the other datagram carries the old name A");
assert_ne!(
token, token_b,
"A's datagram is a DIFFERENT (detached) item"
);
assert!(
withdrawn_v4.is_empty() && withdrawn_v6.is_empty(),
"a rename (old-name) goodbye never withdraws host addresses"
);
saw_old_datagram = true;
}
ep.note_withdrawal_result(
token,
now,
super::WithdrawalSend::Sent,
super::WithdrawalSend::Sent,
);
}
assert!(
saw_new_datagram && saw_old_datagram,
"BOTH the current name B and the old name A are withdrawn, as separate datagrams"
);
assert_eq!(
ep.route_withdrawal_owed(h),
Some([super::WITHDRAWAL_SENDS - 1, super::WITHDRAWAL_SENDS - 1]),
"B's route item spent exactly one round of its own budget"
);
assert_eq!(
ep.detached_withdrawal_owed_for(&old_name),
Some([super::WITHDRAWAL_SENDS - 1, super::WITHDRAWAL_SENDS - 1]),
"A's detached item spent exactly one round of its own budget"
);
}
#[test]
fn collision_old_name_holds_against_reregister_until_goodbye_completes() {
let mut ep = build_endpoint();
let now = StdInstant::now();
let stype = Name::try_from_str("_ipp._tcp.local.").unwrap();
let old_name = Name::try_from_str("Printer._ipp._tcp.local.").unwrap();
let host = Name::try_from_str("printer.local.").unwrap();
let old_records = ServiceRecords::new(stype.clone(), old_name.clone(), host.clone(), 631, 120);
let old_owned = crate::service::EmittedRecords::new(
true,
true,
true,
std::vec::Vec::new(),
std::vec::Vec::new(),
false,
);
ep.enqueue_rename_withdrawal(
crate::service::RenameGoodbyeHandoff {
records: old_records,
owned: old_owned,
},
now,
true,
);
let recs = ServiceRecords::new(stype.clone(), old_name.clone(), host.clone(), 631, 120);
assert!(
matches!(
ep.try_register_service::<slab::Slab<Transmit>, slab::Slab<ServiceUpdate>>(
ServiceSpec::new(recs),
now
),
Err(RegisterServiceError::NameAlreadyRegistered(_))
),
"a HELD collision old-name must block re-registration until its goodbye completes"
);
let mut buf = std::vec![0u8; 4096];
let mut out: std::vec::Vec<crate::ServiceHandle> = std::vec::Vec::new();
let mut t = now;
for _ in 0..20 {
while let Some((_, _, tok)) = ep.poll_withdrawal_transmit(t, &mut buf) {
ep.note_withdrawal_result(
tok,
t,
super::WithdrawalSend::Sent,
super::WithdrawalSend::Sent,
);
}
ep.drain_completed_withdrawals(t, &mut out);
if ep.detached_withdrawal_owed_for(&old_name).is_none() {
break;
}
t += std::time::Duration::from_millis(400);
}
assert!(
ep.detached_withdrawal_owed_for(&old_name).is_none(),
"the held old-name goodbye must complete"
);
let recs2 = ServiceRecords::new(stype, old_name, host, 631, 120);
assert!(
ep.try_register_service::<slab::Slab<Transmit>, slab::Slab<ServiceUpdate>>(
ServiceSpec::new(recs2),
now
)
.is_ok(),
"once the held goodbye completes, the old name is reusable"
);
}
#[test]
fn surviving_rename_old_name_is_reclaimable_on_announce() {
let mut ep = build_endpoint();
let now = StdInstant::now();
let stype = Name::try_from_str("_ipp._tcp.local.").unwrap();
let old_name = Name::try_from_str("Printer._ipp._tcp.local.").unwrap();
let host = Name::try_from_str("printer.local.").unwrap();
let old_records = ServiceRecords::new(stype.clone(), old_name.clone(), host.clone(), 631, 120);
let old_owned = crate::service::EmittedRecords::new(
true,
true,
true,
std::vec::Vec::new(),
std::vec::Vec::new(),
false,
);
ep.enqueue_rename_withdrawal(
crate::service::RenameGoodbyeHandoff {
records: old_records,
owned: old_owned,
},
now,
false,
);
assert!(
ep.detached_withdrawal_owed_for(&old_name).is_some(),
"the detached goodbye is queued"
);
let recs = ServiceRecords::new(stype, old_name.clone(), host, 631, 120);
let (handle, _svc) = ep
.try_register_service::<slab::Slab<Transmit>, slab::Slab<ServiceUpdate>>(
ServiceSpec::new(recs),
now,
)
.expect("a surviving rename's old name must be reclaimable, not blocked");
assert!(
ep.detached_withdrawal_owed_for(&old_name).is_some(),
"registration must NOT cancel the reclaimable goodbye (it survives until announce)"
);
ep.note_service_advertised(handle, &[Ipv4Addr::new(192, 168, 1, 10)], &[], true);
assert!(
ep.detached_withdrawal_owed_for(&old_name).is_none(),
"the reclaimable goodbye is cancelled when the new service announces the name"
);
}
#[test]
fn probe_does_not_cancel_reclaimed_goodbye_only_a_confirmed_advertise_does() {
let mut ep = build_endpoint();
let now = StdInstant::now();
let stype = Name::try_from_str("_ipp._tcp.local.").unwrap();
let old_name = Name::try_from_str("Printer._ipp._tcp.local.").unwrap();
let host = Name::try_from_str("printer.local.").unwrap();
ep.enqueue_rename_withdrawal(
crate::service::RenameGoodbyeHandoff {
records: ServiceRecords::new(stype.clone(), old_name.clone(), host.clone(), 631, 120),
owned: crate::service::EmittedRecords::new(
true,
true,
true,
std::vec::Vec::new(),
std::vec::Vec::new(),
false,
),
},
now,
false,
);
let recs = ServiceRecords::new(stype, old_name.clone(), host, 631, 120);
let (handle, _svc) = ep
.try_register_service::<slab::Slab<Transmit>, slab::Slab<ServiceUpdate>>(
ServiceSpec::new(recs),
now,
)
.unwrap();
ep.note_service_advertised(handle, &[], &[], false);
assert!(
ep.detached_withdrawal_owed_for(&old_name).is_some(),
"a probe (advertised_instance=false) must NOT cancel the reclaimed goodbye"
);
ep.note_service_advertised(handle, &[], &[], true);
assert!(
ep.detached_withdrawal_owed_for(&old_name).is_none(),
"a confirmed instance-advertise cancels the reclaimed goodbye (even address-less)"
);
}
#[test]
fn rename_enqueues_a_detached_withdrawal_for_the_old_name() {
let mut ep = build_endpoint();
let now = StdInstant::now();
let stype = Name::try_from_str("_ipp._tcp.local.").unwrap();
let host = Name::try_from_str("h.local.").unwrap();
let old_name = Name::try_from_str("Old._ipp._tcp.local.").unwrap();
let new_name = Name::try_from_str("Old-1._ipp._tcp.local.").unwrap();
let recs = ServiceRecords::new(stype.clone(), new_name.clone(), host.clone(), 631, 120);
let (_h, _svc) = ep
.try_register_service::<slab::Slab<Transmit>, slab::Slab<ServiceUpdate>>(
ServiceSpec::new(recs),
now,
)
.unwrap();
assert!(
ep.detached_withdrawal_owed_for(&old_name).is_none(),
"no detached item exists before the rename handoff is enqueued"
);
let old_records = ServiceRecords::new(stype, old_name.clone(), host, 631, 120);
let old_owned = crate::service::EmittedRecords::new(
true,
true,
true,
std::vec::Vec::new(),
std::vec::Vec::new(),
false,
);
ep.enqueue_rename_withdrawal(
crate::service::RenameGoodbyeHandoff {
records: old_records,
owned: old_owned,
},
now,
false,
);
assert_eq!(
ep.detached_withdrawal_owed_for(&old_name),
Some([super::WITHDRAWAL_SENDS, super::WITHDRAWAL_SENDS]),
"the rename enqueues a detached item owning the old name with a full budget"
);
let mut buf = std::vec![0u8; 4096];
let (_dst, len, token) = ep
.poll_withdrawal_transmit(now, &mut buf)
.expect("the detached old-name item is due and emits its goodbye");
let reader = crate::wire::MessageReader::try_parse(buf.get(..len).unwrap()).unwrap();
let mut saw_old_srv = false;
let mut saw_host_addr = false;
for rec in reader.answers() {
let rec = rec.unwrap();
assert_eq!(rec.ttl(), 0, "every rename-goodbye record carries TTL 0");
match rec.rtype() {
crate::wire::ResourceType::Srv => {
if names_match(&old_name, rec.name()) {
saw_old_srv = true;
}
}
crate::wire::ResourceType::A | crate::wire::ResourceType::AAAA => saw_host_addr = true,
_ => {}
}
}
assert!(
saw_old_srv,
"the detached goodbye withdraws the OLD instance's SRV at TTL 0"
);
assert!(
!saw_host_addr,
"a rename (old-name) goodbye never withdraws host A/AAAA"
);
let mut done: std::vec::Vec<ServiceHandle> = std::vec::Vec::new();
ep.drain_completed_withdrawals(now, &mut done);
assert!(
done.is_empty(),
"a detached item reports no handle while still in flight"
);
assert!(
ep.detached_withdrawal_owed_for(&old_name).is_some(),
"the detached item is still owed after one (unconfirmed-by-drain) round"
);
for _ in 0..super::WITHDRAWAL_SENDS {
ep.note_withdrawal_result(
token,
now,
super::WithdrawalSend::Sent,
super::WithdrawalSend::Sent,
);
}
assert_eq!(
ep.detached_withdrawal_owed_for(&old_name),
Some([0, 0]),
"the detached old-name budget is fully spent"
);
let mut done2: std::vec::Vec<ServiceHandle> = std::vec::Vec::new();
ep.drain_completed_withdrawals(now, &mut done2);
assert!(
done2.is_empty(),
"a completed detached item frees no route and reports to nobody"
);
assert!(
ep.detached_withdrawal_owed_for(&old_name).is_none(),
"the completed detached item is removed"
);
let empty_owned = crate::service::EmittedRecords::new(
false,
false,
false,
std::vec::Vec::new(),
std::vec::Vec::new(),
false,
);
let empty_records = ServiceRecords::new(
Name::try_from_str("_ipp._tcp.local.").unwrap(),
Name::try_from_str("Empty._ipp._tcp.local.").unwrap(),
Name::try_from_str("h.local.").unwrap(),
631,
120,
);
ep.enqueue_rename_withdrawal(
crate::service::RenameGoodbyeHandoff {
records: empty_records,
owned: empty_owned,
},
now,
false,
);
assert!(
ep.detached_withdrawal_owed_for(&Name::try_from_str("Empty._ipp._tcp.local.").unwrap())
.is_none(),
"an empty-ownership handoff is a no-op (nothing for peers to evict)"
);
}
#[test]
fn rename_only_withdrawal_emits_old_name_goodbye() {
let mut ep = build_endpoint();
let now = StdInstant::now();
let stype = Name::try_from_str("_ipp._tcp.local.").unwrap();
let host = Name::try_from_str("h.local.").unwrap();
let old_name = Name::try_from_str("Old._ipp._tcp.local.").unwrap();
let cur_name = Name::try_from_str("Cur._ipp._tcp.local.").unwrap();
let cur_recs = ServiceRecords::new(stype.clone(), cur_name, host.clone(), 631, 120);
let (h, _svc) = ep
.try_register_service::<slab::Slab<Transmit>, slab::Slab<ServiceUpdate>>(
ServiceSpec::new(cur_recs.clone()),
now,
)
.unwrap();
let old_records = ServiceRecords::new(stype, old_name.clone(), host, 631, 120);
let old_owned = crate::service::EmittedRecords::new(
true,
true,
false,
std::vec::Vec::new(),
std::vec::Vec::new(),
false,
);
ep.enqueue_rename_withdrawal(
crate::service::RenameGoodbyeHandoff {
records: old_records,
owned: old_owned,
},
now,
false,
);
let snap = crate::service::WithdrawalSnapshot {
records: cur_recs,
owned: crate::service::EmittedRecords::new(
false,
false,
false,
std::vec::Vec::new(),
std::vec::Vec::new(),
false,
),
host_a: std::vec::Vec::new(),
host_aaaa: std::vec::Vec::new(),
};
ep.begin_withdrawal(h, snap, now);
assert_eq!(
ep.route_withdrawal_owed(h),
Some([0, 0]),
"the route-attached current-name item owes nothing"
);
assert_eq!(
ep.detached_withdrawal_owed_for(&old_name),
Some([super::WITHDRAWAL_SENDS, super::WITHDRAWAL_SENDS]),
"the detached old-name item owes a full per-family budget"
);
let mut buf = std::vec![0u8; 4096];
let detached_token = {
let (_dst, len, token) = ep
.poll_withdrawal_transmit(now, &mut buf)
.expect("the detached old-name item must still produce the old-name goodbye");
let reader = crate::wire::MessageReader::try_parse(buf.get(..len).unwrap()).unwrap();
let mut saw_old = false;
for rec in reader.answers() {
let rec = rec.unwrap();
assert_eq!(rec.ttl(), 0);
if rec.rtype() == crate::wire::ResourceType::Srv && names_match(&old_name, rec.name()) {
saw_old = true;
}
}
assert!(
saw_old,
"the OLD name's instance records are withdrawn at TTL 0 (separate detached item)"
);
token
};
let mut done: std::vec::Vec<ServiceHandle> = std::vec::Vec::new();
ep.drain_completed_withdrawals(now, &mut done);
assert!(
done.contains(&h),
"the (empty) route-attached item completes immediately and reports its handle"
);
assert!(
ep.detached_withdrawal_owed_for(&old_name).is_some(),
"the detached old-name item is still in flight (not yet fully sent)"
);
for _ in 0..super::WITHDRAWAL_SENDS {
ep.note_withdrawal_result(
detached_token,
now,
super::WithdrawalSend::Sent,
super::WithdrawalSend::Sent,
);
}
assert_eq!(
ep.detached_withdrawal_owed_for(&old_name),
Some([0, 0]),
"the detached old-name budget is fully spent"
);
let mut done2: std::vec::Vec<ServiceHandle> = std::vec::Vec::new();
ep.drain_completed_withdrawals(now, &mut done2);
assert!(
done2.is_empty(),
"a detached old-name item completes silently — no handle reported"
);
assert!(
ep.detached_withdrawal_owed_for(&old_name).is_none(),
"the detached old-name item is removed once fully sent"
);
}
#[test]
fn dual_name_each_fits_but_combined_would_not() {
let mut ep = build_endpoint();
let now = StdInstant::now();
let stype = Name::try_from_str("_ipp._tcp.local.").unwrap();
let host = Name::try_from_str("h.local.").unwrap();
let old_name = Name::try_from_str("Old._ipp._tcp.local.").unwrap();
let new_name = Name::try_from_str("New._ipp._tcp.local.").unwrap();
let big_seg = || std::vec![b'x'; 240];
let mut recs_b = ServiceRecords::new(stype.clone(), new_name.clone(), host.clone(), 631, 120);
for _ in 0..4 {
recs_b.add_txt_segment(big_seg());
}
let (h, _svc) = ep
.try_register_service::<slab::Slab<Transmit>, slab::Slab<ServiceUpdate>>(
ServiceSpec::new(recs_b.clone()),
now,
)
.unwrap();
let mut old_records = ServiceRecords::new(stype, old_name.clone(), host, 631, 120);
for _ in 0..4 {
old_records.add_txt_segment(big_seg());
}
let owned_full = crate::service::EmittedRecords::new(
true,
true,
true,
std::vec::Vec::new(),
std::vec::Vec::new(),
false,
);
ep.enqueue_rename_withdrawal(
crate::service::RenameGoodbyeHandoff {
records: old_records,
owned: owned_full.clone(),
},
now,
false,
);
let snap = crate::service::WithdrawalSnapshot {
records: recs_b,
owned: owned_full,
host_a: std::vec::Vec::new(),
host_aaaa: std::vec::Vec::new(),
};
ep.begin_withdrawal(h, snap, now);
let mut buf = std::vec![0u8; 1600];
let mut len_new = 0usize;
let mut len_old = 0usize;
for _ in 0..2 {
let (_d, len, token) = ep
.poll_withdrawal_transmit(now, &mut buf)
.expect("each single-name goodbye fits its own datagram");
let reader = crate::wire::MessageReader::try_parse(buf.get(..len).unwrap()).unwrap();
let mut saw_new = false;
let mut saw_old = false;
for r in reader.answers() {
let r = r.unwrap();
if r.rtype() == crate::wire::ResourceType::Srv {
if names_match(&new_name, r.name()) {
saw_new = true;
} else if names_match(&old_name, r.name()) {
saw_old = true;
}
}
}
if saw_new {
assert!(!saw_old, "the current name rides its OWN datagram");
len_new = len;
} else {
assert!(saw_old, "the other datagram carries the old name");
len_old = len;
}
ep.note_withdrawal_result(
token,
now,
super::WithdrawalSend::Sent,
super::WithdrawalSend::Sent,
);
}
assert!(
len_new > 0 && len_old > 0,
"BOTH names were withdrawn, each in its own datagram"
);
assert!(len_new <= buf.len() && len_old <= buf.len());
assert!(
len_new + len_old > buf.len(),
"combined message ({len_new} + {len_old} = {}) would exceed the {}-byte scratch",
len_new + len_old,
buf.len()
);
}
#[test]
fn independent_items_unencodable_current_does_not_starve_rename() {
let mut ep = build_endpoint();
let now = StdInstant::now();
let stype = Name::try_from_str("_ipp._tcp.local.").unwrap();
let host = Name::try_from_str("h.local.").unwrap();
let old_name = Name::try_from_str("Old._ipp._tcp.local.").unwrap();
let cur_name = Name::try_from_str("Cur._ipp._tcp.local.").unwrap();
let mut cur_recs = ServiceRecords::new(stype.clone(), cur_name.clone(), host.clone(), 631, 120);
for _ in 0..4 {
cur_recs.add_txt_segment(std::vec![b'x'; 240]);
}
let (h, _svc) = ep
.try_register_service::<slab::Slab<Transmit>, slab::Slab<ServiceUpdate>>(
ServiceSpec::new(cur_recs.clone()),
now,
)
.unwrap();
let old_records = ServiceRecords::new(stype, old_name.clone(), host, 631, 120);
let old_owned = crate::service::EmittedRecords::new(
true,
true,
false,
std::vec::Vec::new(),
std::vec::Vec::new(),
false,
);
ep.enqueue_rename_withdrawal(
crate::service::RenameGoodbyeHandoff {
records: old_records,
owned: old_owned,
},
now,
false,
);
let snap = crate::service::WithdrawalSnapshot {
records: cur_recs,
owned: crate::service::EmittedRecords::new(
true,
true,
true,
std::vec::Vec::new(),
std::vec::Vec::new(),
false,
),
host_a: std::vec::Vec::new(),
host_aaaa: std::vec::Vec::new(),
};
ep.begin_withdrawal(h, snap, now);
assert!(
ep.route_withdrawal_owed(h).is_some(),
"the current name is a route-attached item"
);
assert_eq!(
ep.detached_withdrawal_owed_for(&old_name),
Some([super::WITHDRAWAL_SENDS, super::WITHDRAWAL_SENDS]),
"the renamed-away old name is a detached item owing a full budget"
);
let mut small = std::vec![0u8; 300];
let (_d, len, tok) = ep
.poll_withdrawal_transmit(now, &mut small)
.expect("the small old-name goodbye is emitted even though the current is unencodable");
let reader = crate::wire::MessageReader::try_parse(small.get(..len).unwrap()).unwrap();
let saw_old = reader.answers().any(|r| {
let r = r.unwrap();
r.rtype() == crate::wire::ResourceType::Srv && names_match(&old_name, r.name())
});
assert!(
saw_old,
"the renamed-away old name is withdrawn — NOT starved by the unencodable current"
);
assert_ne!(
Some(tok),
ep.route_withdrawal_token(h),
"the emitted item is the detached old-name item, not the unencodable route item"
);
let mut done = std::vec::Vec::new();
ep.drain_completed_withdrawals(now, &mut done);
assert!(
!done.contains(&h),
"the route is held while its withdrawal item is still in flight"
);
let past = now
.checked_add_duration(super::WITHDRAWAL_CEILING + core::time::Duration::from_millis(1))
.unwrap();
let mut guard = 0;
while ep.poll_withdrawal_transmit(past, &mut small).is_some() {
guard += 1;
assert!(
guard < 16,
"the past-ceiling pump must terminate (each item's final attempt fires once)"
);
}
let mut done2 = std::vec::Vec::new();
ep.drain_completed_withdrawals(past, &mut done2);
assert!(
done2.contains(&h),
"the route is force-freed at its ceiling even though the current goodbye never encoded"
);
}
#[test]
fn unregister_service_drops_route_attached_withdrawal_no_stale_goodbye() {
let mut ep = build_endpoint();
let now = StdInstant::now();
let stype = Name::try_from_str("_ipp._tcp.local.").unwrap();
let host = Name::try_from_str("h.local.").unwrap();
let inst = Name::try_from_str("Svc._ipp._tcp.local.").unwrap();
let recs = ServiceRecords::new(stype.clone(), inst.clone(), host, 631, 120);
let (h, _svc) = ep
.try_register_service::<slab::Slab<Transmit>, slab::Slab<ServiceUpdate>>(
ServiceSpec::new(recs.clone()),
now,
)
.unwrap();
let snap = crate::service::WithdrawalSnapshot {
records: recs,
owned: crate::service::EmittedRecords::new(
true,
true,
true,
std::vec::Vec::new(),
std::vec::Vec::new(),
false,
),
host_a: std::vec::Vec::new(),
host_aaaa: std::vec::Vec::new(),
};
ep.begin_withdrawal(h, snap, now);
assert!(
ep.route_withdrawal_owed(h).is_some(),
"a route-attached withdrawal item owes a goodbye for the name"
);
assert!(ep.unregister_service(h), "the route was found and removed");
assert!(
ep.route_withdrawal_owed(h).is_none(),
"force-remove dropped the route-attached withdrawal item"
);
ep.try_register_service::<slab::Slab<Transmit>, slab::Slab<ServiceUpdate>>(
ServiceSpec::new(ServiceRecords::new(
stype,
inst,
Name::try_from_str("other.local.").unwrap(),
700,
120,
)),
now,
)
.expect("the name is reusable after force-remove");
let mut buf = std::vec![0u8; 1500];
assert!(
ep.poll_withdrawal_transmit(now, &mut buf).is_none(),
"no stale TTL=0 goodbye is emitted for the force-removed-then-reused name"
);
}
#[test]
fn reclaiming_a_detached_name_cancels_its_goodbye() {
let mut ep = build_endpoint();
let now = StdInstant::now();
let stype = Name::try_from_str("_ipp._tcp.local.").unwrap();
let host = Name::try_from_str("h.local.").unwrap();
let old_name = Name::try_from_str("Old._ipp._tcp.local.").unwrap();
let cur_name = Name::try_from_str("Cur._ipp._tcp.local.").unwrap();
let cur_recs = ServiceRecords::new(stype.clone(), cur_name, host.clone(), 631, 120);
let (h, _svc) = ep
.try_register_service::<slab::Slab<Transmit>, slab::Slab<ServiceUpdate>>(
ServiceSpec::new(cur_recs.clone()),
now,
)
.unwrap();
let old_records = ServiceRecords::new(stype, old_name.clone(), host, 631, 120);
let old_owned = crate::service::EmittedRecords::new(
true,
true,
false,
std::vec::Vec::new(),
std::vec::Vec::new(),
false,
);
ep.enqueue_rename_withdrawal(
crate::service::RenameGoodbyeHandoff {
records: old_records,
owned: old_owned,
},
now,
false,
);
let snap = crate::service::WithdrawalSnapshot {
records: cur_recs,
owned: crate::service::EmittedRecords::new(
false,
false,
false,
std::vec::Vec::new(),
std::vec::Vec::new(),
false,
),
host_a: std::vec::Vec::new(),
host_aaaa: std::vec::Vec::new(),
};
ep.begin_withdrawal(h, snap, now);
assert!(
ep.detached_withdrawal_owed_for(&old_name).is_some(),
"a detached item owns the renamed-away old name"
);
let dup = ServiceRecords::new(
Name::try_from_str("_ipp._tcp.local.").unwrap(),
old_name.clone(),
Name::try_from_str("other.local.").unwrap(),
700,
120,
);
let (dup_h, _dup_svc) = ep
.try_register_service::<slab::Slab<Transmit>, slab::Slab<ServiceUpdate>>(
ServiceSpec::new(dup),
now,
)
.expect("reclaiming a detached-reserved name succeeds (not rejected)");
assert!(
ep.detached_withdrawal_owed_for(&old_name).is_some(),
"registration must NOT cancel the goodbye — it survives until the reclaiming service announces"
);
ep.note_service_advertised(dup_h, &[], &[], true);
assert!(
ep.detached_withdrawal_owed_for(&old_name).is_none(),
"the detached old-name goodbye is cancelled when the reclaiming service announces"
);
}
#[test]
fn rename_onto_a_detached_name_cancels_it_not_kills_the_service() {
let mut ep = build_endpoint();
let now = StdInstant::now();
let stype = Name::try_from_str("_ipp._tcp.local.").unwrap();
let host = Name::try_from_str("h.local.").unwrap();
let target = Name::try_from_str("Target._ipp._tcp.local.").unwrap();
let s_recs = ServiceRecords::new(
stype.clone(),
Name::try_from_str("S._ipp._tcp.local.").unwrap(),
host.clone(),
631,
120,
);
let (s, _svc_s) = ep
.try_register_service::<slab::Slab<Transmit>, slab::Slab<ServiceUpdate>>(
ServiceSpec::new(s_recs),
now,
)
.unwrap();
let c2_recs = ServiceRecords::new(
stype.clone(),
Name::try_from_str("C2._ipp._tcp.local.").unwrap(),
host.clone(),
632,
120,
);
let (h2, _svc2) = ep
.try_register_service::<slab::Slab<Transmit>, slab::Slab<ServiceUpdate>>(
ServiceSpec::new(c2_recs.clone()),
now,
)
.unwrap();
let target_records = ServiceRecords::new(stype, target.clone(), host, 633, 120);
let target_owned = crate::service::EmittedRecords::new(
true,
true,
false,
std::vec::Vec::new(),
std::vec::Vec::new(),
false,
);
ep.enqueue_rename_withdrawal(
crate::service::RenameGoodbyeHandoff {
records: target_records,
owned: target_owned,
},
now,
false,
);
let snap2 = crate::service::WithdrawalSnapshot {
records: c2_recs,
owned: crate::service::EmittedRecords::new(
false,
false,
false,
std::vec::Vec::new(),
std::vec::Vec::new(),
false,
),
host_a: std::vec::Vec::new(),
host_aaaa: std::vec::Vec::new(),
};
ep.begin_withdrawal(h2, snap2, now);
assert!(
ep.detached_withdrawal_owed_for(&target).is_some(),
"a detached item owns `target`"
);
ep.handle_service_renamed(s, target.clone())
.expect("an auto-rename onto a detached-reserved name succeeds (not rejected)");
assert!(
ep.detached_withdrawal_owed_for(&target).is_some(),
"the rename must NOT cancel the goodbye — it survives until S advertises `target`"
);
ep.note_service_advertised(s, &[], &[], true);
assert!(
ep.detached_withdrawal_owed_for(&target).is_none(),
"the detached goodbye is cancelled once S advertises the reclaimed name"
);
}
#[test]
fn rename_onto_a_held_detached_name_is_rejected_reclaimable_is_not() {
let mut ep = build_endpoint();
let now = StdInstant::now();
let stype = Name::try_from_str("_ipp._tcp.local.").unwrap();
let host = Name::try_from_str("h.local.").unwrap();
let held = Name::try_from_str("Held._ipp._tcp.local.").unwrap();
let reclaimable = Name::try_from_str("Reclaim._ipp._tcp.local.").unwrap();
let s_recs = ServiceRecords::new(
stype.clone(),
Name::try_from_str("S._ipp._tcp.local.").unwrap(),
host.clone(),
631,
120,
);
let (s, _svc) = ep
.try_register_service::<slab::Slab<Transmit>, slab::Slab<ServiceUpdate>>(
ServiceSpec::new(s_recs),
now,
)
.unwrap();
let mk = |name: &Name| crate::service::RenameGoodbyeHandoff {
records: ServiceRecords::new(stype.clone(), name.clone(), host.clone(), 631, 120),
owned: crate::service::EmittedRecords::new(
true,
true,
true,
std::vec::Vec::new(),
std::vec::Vec::new(),
false,
),
};
ep.enqueue_rename_withdrawal(mk(&held), now, true);
ep.enqueue_rename_withdrawal(mk(&reclaimable), now, false);
assert!(
ep.handle_service_renamed(s, held.clone()).is_err(),
"a rename onto a HELD collision name must be rejected"
);
assert!(
ep.detached_withdrawal_owed_for(&held).is_some(),
"the held goodbye is untouched by the rejected rename"
);
assert!(
ep.handle_service_renamed(s, reclaimable.clone()).is_ok(),
"a rename onto a RECLAIMABLE detached name must succeed"
);
assert!(
ep.detached_withdrawal_owed_for(&reclaimable).is_some(),
"the reclaimable goodbye survives the rename (cancelled only on advertise)"
);
}
#[test]
fn owed_family_gets_a_final_attempt_at_ceiling() {
let mut ep = build_endpoint();
let t0 = StdInstant::now();
let inst = Name::try_from_str("Printer._ipp._tcp.local.").unwrap();
let recs = ServiceRecords::new(
Name::try_from_str("_ipp._tcp.local.").unwrap(),
inst.clone(),
Name::try_from_str("h.local.").unwrap(),
631,
120,
);
let (h, _svc) = ep
.try_register_service::<slab::Slab<Transmit>, slab::Slab<ServiceUpdate>>(
ServiceSpec::new(recs.clone()),
t0,
)
.unwrap();
let snap = crate::service::WithdrawalSnapshot {
records: recs,
owned: crate::service::EmittedRecords::new(
true,
true,
true,
std::vec::Vec::new(),
std::vec::Vec::new(),
false,
),
host_a: std::vec::Vec::new(),
host_aaaa: std::vec::Vec::new(),
};
ep.begin_withdrawal(h, snap, t0);
let ceiling = t0.checked_add_duration(super::WITHDRAWAL_CEILING).unwrap();
for _ in 0..super::WITHDRAWAL_SENDS {
ep.note_route_withdrawal_result(
h,
t0,
super::WithdrawalSend::Sent,
super::WithdrawalSend::Retry,
);
}
assert_eq!(
ep.route_withdrawal_owed(h),
Some([0, super::WITHDRAWAL_SENDS]),
"v4 paid; v6 still owes its whole budget"
);
let t_near = t0
.checked_add_duration(super::WITHDRAWAL_CEILING - core::time::Duration::from_millis(1))
.unwrap();
ep.note_route_withdrawal_result(
h,
t_near,
super::WithdrawalSend::Sent,
super::WithdrawalSend::Retry,
);
assert_eq!(
ep.route_withdrawal_next_at(h),
Some(ceiling),
"the re-arm must be CLAMPED to ceiling_at, not pushed past it"
);
let mut buf = std::vec![0u8; 4096];
let first = ep.poll_withdrawal_transmit(ceiling, &mut buf);
let (_dst, _len, got) =
first.expect("the owed family must get a FINAL goodbye attempt at the ceiling");
assert_eq!(
Some(got),
ep.route_withdrawal_token(h),
"the final attempt is for the owed withdrawal"
);
assert!(
ep.poll_withdrawal_transmit(ceiling, &mut buf).is_none(),
"the final attempt fires exactly once; a second poll must return None"
);
let mut done: std::vec::Vec<ServiceHandle> = std::vec::Vec::new();
ep.drain_completed_withdrawals(ceiling, &mut done);
assert!(
done.contains(&h),
"after its final ceiling attempt the route is force-completed and freed"
);
let recs2 = ServiceRecords::new(
Name::try_from_str("_ipp._tcp.local.").unwrap(),
inst,
Name::try_from_str("h2.local.").unwrap(),
631,
120,
);
assert!(
ep.try_register_service::<slab::Slab<Transmit>, slab::Slab<ServiceUpdate>>(
ServiceSpec::new(recs2),
ceiling,
)
.is_ok(),
"the withdrawn name is re-registerable after the route is force-freed"
);
}
#[test]
fn past_ceiling_owed_withdrawal_is_held_until_final_attempt() {
let mut ep = build_endpoint();
let t0 = StdInstant::now();
let recs = ServiceRecords::new(
Name::try_from_str("_ipp._tcp.local.").unwrap(),
Name::try_from_str("Printer._ipp._tcp.local.").unwrap(),
Name::try_from_str("h.local.").unwrap(),
631,
120,
);
let (h, _svc) = ep
.try_register_service::<slab::Slab<Transmit>, slab::Slab<ServiceUpdate>>(
ServiceSpec::new(recs.clone()),
t0,
)
.unwrap();
let snap = crate::service::WithdrawalSnapshot {
records: recs,
owned: crate::service::EmittedRecords::new(
true,
true,
true,
std::vec::Vec::new(),
std::vec::Vec::new(),
false,
),
host_a: std::vec::Vec::new(),
host_aaaa: std::vec::Vec::new(),
};
ep.begin_withdrawal(h, snap, t0);
let ceiling = t0.checked_add_duration(super::WITHDRAWAL_CEILING).unwrap();
let mut done: std::vec::Vec<ServiceHandle> = std::vec::Vec::new();
ep.drain_completed_withdrawals(ceiling, &mut done);
assert!(
done.is_empty(),
"a past-ceiling owed withdrawal must be HELD until its final attempt is made"
);
let mut buf = std::vec![0u8; 4096];
assert!(
ep.poll_withdrawal_transmit(ceiling, &mut buf).is_some(),
"the final ceiling attempt is emitted"
);
ep.drain_completed_withdrawals(ceiling, &mut done);
assert!(
done.contains(&h),
"after the final attempt the held route is force-completed"
);
}
#[cfg(feature = "stats")]
#[test]
fn withdrawal_completes_frees_name_and_decrements_active() {
let mut ep = build_endpoint();
let now = StdInstant::now();
let inst = Name::try_from_str("Printer._ipp._tcp.local.").unwrap();
let recs = ServiceRecords::new(
Name::try_from_str("_ipp._tcp.local.").unwrap(),
inst.clone(),
Name::try_from_str("h.local.").unwrap(),
631,
120,
);
let (h, mut svc) = ep
.try_register_service::<slab::Slab<Transmit>, slab::Slab<ServiceUpdate>>(
ServiceSpec::new(recs),
now,
)
.unwrap();
let before = ep.stats().services_active;
ep.begin_withdrawal(h, svc.withdrawal_snapshot(), now);
for _ in 0..super::WITHDRAWAL_SENDS {
ep.note_route_withdrawal_result(
h,
now,
super::WithdrawalSend::Sent,
super::WithdrawalSend::Sent,
);
}
let mut done: std::vec::Vec<ServiceHandle> = std::vec::Vec::new();
ep.drain_completed_withdrawals(now, &mut done);
assert_eq!(
done,
std::vec![h],
"the completed handle is returned for GC"
);
assert_eq!(
ep.stats().services_active,
before - 1,
"services_active is decremented on completion"
);
let recs2 = ServiceRecords::new(
Name::try_from_str("_ipp._tcp.local.").unwrap(),
inst,
Name::try_from_str("h2.local.").unwrap(),
631,
120,
);
assert!(
ep.try_register_service::<slab::Slab<Transmit>, slab::Slab<ServiceUpdate>>(
ServiceSpec::new(recs2),
now,
)
.is_ok(),
"the withdrawn name is re-registerable after completion"
);
}
#[test]
fn withdrawal_force_completes_at_ceiling() {
let mut ep = build_endpoint();
let now = StdInstant::now();
let recs = ServiceRecords::new(
Name::try_from_str("_ipp._tcp.local.").unwrap(),
Name::try_from_str("A._ipp._tcp.local.").unwrap(),
Name::try_from_str("h.local.").unwrap(),
631,
120,
);
let (h, mut svc) = ep
.try_register_service::<slab::Slab<Transmit>, slab::Slab<ServiceUpdate>>(
ServiceSpec::new(recs),
now,
)
.unwrap();
ep.begin_withdrawal(h, svc.withdrawal_snapshot(), now);
let at_ceiling = now.checked_add_duration(super::WITHDRAWAL_CEILING).unwrap();
let mut done: std::vec::Vec<ServiceHandle> = std::vec::Vec::new();
ep.drain_completed_withdrawals(at_ceiling, &mut done);
assert_eq!(
done,
std::vec![h],
"ceiling force-completes a wedged withdrawal"
);
}
fn host_only_snapshot(
host: &Name,
instance: &str,
host_a: &[Ipv4Addr],
) -> crate::service::WithdrawalSnapshot {
let mut recs = ServiceRecords::new(
Name::try_from_str("_ipp._tcp.local.").unwrap(),
Name::try_from_str(instance).unwrap(),
host.clone(),
631,
120,
);
for a in host_a {
recs.add_a(*a);
}
crate::service::WithdrawalSnapshot {
records: recs,
owned: crate::service::EmittedRecords::new(
false,
false,
false,
std::vec::Vec::new(),
std::vec::Vec::new(),
false,
),
host_a: host_a.to_vec(),
host_aaaa: std::vec::Vec::new(),
}
}
#[test]
fn retained_only_withdrawal_completes_and_does_not_block_a_sibling() {
let mut ep = build_endpoint();
let now = StdInstant::now();
let host = Name::try_from_str("h.local.").unwrap();
let shared = Ipv4Addr::new(192, 168, 1, 5);
let _c = register_host_service(
&mut ep,
"C._ipp._tcp.local.",
&host,
&[shared],
Some(&[shared]),
);
let a = register_host_service(
&mut ep,
"A._ipp._tcp.local.",
&host,
&[shared],
Some(&[shared]),
);
let recs_b = ServiceRecords::new(
Name::try_from_str("_ipp._tcp.local.").unwrap(),
Name::try_from_str("B._ipp._tcp.local.").unwrap(),
Name::try_from_str("hb.local.").unwrap(),
632,
120,
);
let (b, _svc_b) = ep
.try_register_service::<slab::Slab<Transmit>, slab::Slab<ServiceUpdate>>(
ServiceSpec::new(recs_b.clone()),
now,
)
.unwrap();
let snap_b = crate::service::WithdrawalSnapshot {
records: recs_b,
owned: crate::service::EmittedRecords::new(
true,
true,
true,
std::vec::Vec::new(),
std::vec::Vec::new(),
false,
),
host_a: std::vec::Vec::new(),
host_aaaa: std::vec::Vec::new(),
};
ep.begin_withdrawal(
a,
host_only_snapshot(&host, "A._ipp._tcp.local.", &[shared]),
now,
);
ep.begin_withdrawal(b, snap_b, now);
let mut buf = std::vec![0u8; 4096];
let (_dst, len, got) = ep
.poll_withdrawal_transmit(now, &mut buf)
.expect("the pump must scan past the retained-only A and return B's goodbye");
assert_eq!(
Some(got),
ep.route_withdrawal_token(b),
"the genuine withdrawal B is the one that emits"
);
let reader = crate::wire::MessageReader::try_parse(buf.get(..len).unwrap()).unwrap();
assert!(
reader.answers().count() > 0,
"B's goodbye must carry its TTL=0 instance records"
);
assert_eq!(
ep.route_withdrawal_owed(a),
Some([0, 0]),
"the retained-only A must be COMPLETED (owed = [0, 0]) by the scan"
);
let mut done: std::vec::Vec<ServiceHandle> = std::vec::Vec::new();
ep.drain_completed_withdrawals(now, &mut done);
assert!(
done.contains(&a),
"the retained-only A must be freed immediately, not pinned to the ceiling"
);
let recs_a2 = ServiceRecords::new(
Name::try_from_str("_ipp._tcp.local.").unwrap(),
Name::try_from_str("A._ipp._tcp.local.").unwrap(),
Name::try_from_str("h2.local.").unwrap(),
633,
120,
);
assert!(
ep.try_register_service::<slab::Slab<Transmit>, slab::Slab<ServiceUpdate>>(
ServiceSpec::new(recs_a2),
now,
)
.is_ok(),
"A's name is released the moment its retained-only withdrawal completes"
);
}
#[test]
fn retained_only_withdrawal_completes_immediately() {
let mut ep = build_endpoint();
let now = StdInstant::now();
let host = Name::try_from_str("h.local.").unwrap();
let shared = Ipv4Addr::new(192, 168, 1, 5);
let _c = register_host_service(
&mut ep,
"C._ipp._tcp.local.",
&host,
&[shared],
Some(&[shared]),
);
let a = register_host_service(
&mut ep,
"A._ipp._tcp.local.",
&host,
&[shared],
Some(&[shared]),
);
ep.begin_withdrawal(
a,
host_only_snapshot(&host, "A._ipp._tcp.local.", &[shared]),
now,
);
let mut buf = std::vec![0u8; 4096];
assert!(
ep.poll_withdrawal_transmit(now, &mut buf).is_none(),
"a retained-only withdrawal has nothing to emit"
);
assert_eq!(
ep.route_withdrawal_owed(a),
Some([0, 0]),
"the retained-only withdrawal must be completed (owed = [0, 0]), not left due"
);
let mut done: std::vec::Vec<ServiceHandle> = std::vec::Vec::new();
ep.drain_completed_withdrawals(now, &mut done);
assert_eq!(
done,
std::vec![a],
"the retained-only withdrawal is freed at once, not at the 2 s ceiling"
);
}
#[test]
fn withdrawing_route_is_not_answered_but_still_blocks_reregister() {
use core::net::SocketAddr;
let mut e = build_endpoint();
let now = StdInstant::now();
let inst = Name::try_from_str("Printer._ipp._tcp.local.").unwrap();
let recs = ServiceRecords::new(
Name::try_from_str("_ipp._tcp.local.").unwrap(),
inst.clone(),
Name::try_from_str("printer-host.local.").unwrap(),
631,
120,
);
let (handle, mut svc) = e
.try_register_service::<slab::Slab<Transmit>, slab::Slab<ServiceUpdate>>(
ServiceSpec::new(recs),
now,
)
.unwrap();
e.begin_withdrawal(handle, svc.withdrawal_snapshot(), now);
let src: SocketAddr = SocketAddr::from((Ipv4Addr::new(192, 168, 1, 55), 5353));
let local_ip = core::net::IpAddr::V4(Ipv4Addr::new(192, 168, 1, 10));
let mut buf = [0u8; 512];
let n = build_query_for_host(&mut buf, "printer-host.local.");
let routed_to_service = e
.handle(StdInstant::now(), src, local_ip, 0, &buf[..n], false)
.unwrap()
.any(|ev| matches!(ev, Ok(crate::event::RouteEvent::ToService(_))));
assert!(
!routed_to_service,
"a withdrawing service must not be routed a question"
);
let recs2 = ServiceRecords::new(
Name::try_from_str("_ipp._tcp.local.").unwrap(),
inst,
Name::try_from_str("h2.local.").unwrap(),
631,
120,
);
assert!(
matches!(
e.try_register_service::<slab::Slab<Transmit>, slab::Slab<ServiceUpdate>>(
ServiceSpec::new(recs2),
now
),
Err(RegisterServiceError::NameAlreadyRegistered(_))
),
"the withdrawing name must still be held"
);
}
#[test]
fn withdrawing_route_receives_no_service_dispatch_but_still_blocks_reregister() {
use core::net::SocketAddr;
use crate::wire::{Header, MessageBuilder};
let mut e = build_endpoint();
let now = StdInstant::now();
let inst = Name::try_from_str("Printer._ipp._tcp.local.").unwrap();
let host = Name::try_from_str("printer-host.local.").unwrap();
let recs = ServiceRecords::new(
Name::try_from_str("_ipp._tcp.local.").unwrap(),
inst.clone(),
host.clone(),
631,
120,
);
let (handle, mut svc) = e
.try_register_service::<slab::Slab<Transmit>, slab::Slab<ServiceUpdate>>(
ServiceSpec::new(recs),
now,
)
.unwrap();
let src: SocketAddr = SocketAddr::from((Ipv4Addr::new(192, 168, 1, 55), 5353));
let local_ip = core::net::IpAddr::V4(Ipv4Addr::new(192, 168, 1, 10));
let host_pkt = {
let mut buf = [0u8; 512];
let mut b = MessageBuilder::<'_, 32>::try_new(&mut buf, Header::new()).unwrap();
b.push_a_authority(&host, 120, Ipv4Addr::new(10, 0, 0, 99))
.unwrap();
let n = b.finish().unwrap();
buf[..n].to_vec()
};
let inst_pkt = {
let target = Name::try_from_str("rival.local.").unwrap();
let mut buf = [0u8; 512];
let mut b = MessageBuilder::<'_, 32>::try_new(&mut buf, Header::new()).unwrap();
b.push_srv_authority(&inst, 120, 0, 0, 9999, &target)
.unwrap();
let n = b.finish().unwrap();
buf[..n].to_vec()
};
let ka_pkt = {
let meta = Name::try_from_str("_services._dns-sd._udp.local.").unwrap();
let stype = Name::try_from_str("_ipp._tcp.local.").unwrap();
let mut buf = [0u8; 512];
let mut b = MessageBuilder::<'_, 32>::try_new(&mut buf, Header::new()).unwrap();
b.push_ptr_answer(&meta, 120, &stype).unwrap();
let n = b.finish().unwrap();
buf[..n].to_vec()
};
let live_host = e
.handle(StdInstant::now(), src, local_ip, 0, &host_pkt, false)
.unwrap()
.any(|ev| matches!(ev, Ok(crate::event::RouteEvent::ToService(_))));
assert!(
live_host,
"sanity: a LIVE service must receive the HostConflict dispatch"
);
let live_inst = e
.handle(StdInstant::now(), src, local_ip, 0, &inst_pkt, false)
.unwrap()
.any(|ev| matches!(ev, Ok(crate::event::RouteEvent::ToService(_))));
assert!(
live_inst,
"sanity: a LIVE service must receive the ProbeConflict dispatch"
);
let live_ka = e
.handle(StdInstant::now(), src, local_ip, 0, &ka_pkt, false)
.unwrap()
.any(|ev| matches!(ev, Ok(crate::event::RouteEvent::ToService(_))));
assert!(
live_ka,
"sanity: a LIVE service must receive the meta-PTR KnownAnswer dispatch"
);
e.begin_withdrawal(handle, svc.withdrawal_snapshot(), now);
let wd_host = e
.handle(StdInstant::now(), src, local_ip, 0, &host_pkt, false)
.unwrap()
.any(|ev| matches!(ev, Ok(crate::event::RouteEvent::ToService(_))));
assert!(
!wd_host,
"a withdrawing service must not receive a HostConflict dispatch"
);
let wd_inst = e
.handle(StdInstant::now(), src, local_ip, 0, &inst_pkt, false)
.unwrap()
.any(|ev| matches!(ev, Ok(crate::event::RouteEvent::ToService(_))));
assert!(
!wd_inst,
"a withdrawing service must not receive a ProbeConflict dispatch"
);
let wd_ka = e
.handle(StdInstant::now(), src, local_ip, 0, &ka_pkt, false)
.unwrap()
.any(|ev| matches!(ev, Ok(crate::event::RouteEvent::ToService(_))));
assert!(
!wd_ka,
"a withdrawing service must not receive a KnownAnswer dispatch"
);
let recs2 = ServiceRecords::new(
Name::try_from_str("_ipp._tcp.local.").unwrap(),
inst,
Name::try_from_str("h2.local.").unwrap(),
631,
120,
);
assert!(
matches!(
e.try_register_service::<slab::Slab<Transmit>, slab::Slab<ServiceUpdate>>(
ServiceSpec::new(recs2),
now
),
Err(RegisterServiceError::NameAlreadyRegistered(_))
),
"the withdrawing name must still be held"
);
}
#[test]
fn poll_timeout_accounts_for_due_withdrawal() {
let mut e = build_endpoint();
let now = StdInstant::now();
let recs = ServiceRecords::new(
Name::try_from_str("_ipp._tcp.local.").unwrap(),
Name::try_from_str("A._ipp._tcp.local.").unwrap(),
Name::try_from_str("h.local.").unwrap(),
631,
120,
);
let (h, mut svc) = e
.try_register_service::<slab::Slab<Transmit>, slab::Slab<ServiceUpdate>>(
ServiceSpec::new(recs),
now,
)
.unwrap();
e.begin_withdrawal(h, svc.withdrawal_snapshot(), now);
assert_eq!(
e.poll_timeout(),
Some(now),
"a due-now withdrawal makes poll_timeout return now"
);
}
#[cfg(feature = "stats")]
#[test]
fn empty_withdrawal_completes_immediately() {
let mut ep = build_endpoint();
let now = StdInstant::now();
let recs = ServiceRecords::new(
Name::try_from_str("_ipp._tcp.local.").unwrap(),
Name::try_from_str("A._ipp._tcp.local.").unwrap(),
Name::try_from_str("h.local.").unwrap(),
631,
120,
);
let (h, mut svc) = ep
.try_register_service::<slab::Slab<Transmit>, slab::Slab<ServiceUpdate>>(
ServiceSpec::new(recs),
now,
)
.unwrap();
let before = ep.stats().services_active;
ep.begin_withdrawal(h, svc.withdrawal_snapshot(), now);
let mut done: std::vec::Vec<ServiceHandle> = std::vec::Vec::new();
ep.drain_completed_withdrawals(now, &mut done);
assert_eq!(
done,
std::vec![h],
"an empty withdrawal completes on the first drain (no ceiling wait)"
);
assert_eq!(ep.stats().services_active, before - 1);
}
#[test]
fn next_withdrawal_deadline_reflects_only_withdrawals() {
let mut ep = build_endpoint();
let now = StdInstant::now();
assert_eq!(
ep.next_withdrawal_deadline(),
None,
"no withdrawal in flight → no withdrawal deadline"
);
assert!(!ep.has_pending_withdrawals());
let stype = Name::try_from_str("_ipp._tcp.local.").unwrap();
let inst = Name::try_from_str("Svc._ipp._tcp.local.").unwrap();
let host = Name::try_from_str("h.local.").unwrap();
let recs = ServiceRecords::new(stype, inst, host, 631, 120);
let (h, _svc) = ep
.try_register_service::<slab::Slab<Transmit>, slab::Slab<ServiceUpdate>>(
ServiceSpec::new(recs.clone()),
now,
)
.unwrap();
let snap = crate::service::WithdrawalSnapshot {
records: recs,
owned: crate::service::EmittedRecords::new(
true,
true,
true,
std::vec::Vec::new(),
std::vec::Vec::new(),
false,
),
host_a: std::vec::Vec::new(),
host_aaaa: std::vec::Vec::new(),
};
ep.begin_withdrawal(h, snap, now);
assert_eq!(
ep.next_withdrawal_deadline(),
Some(now),
"a due-now withdrawal sets the withdrawal deadline"
);
assert!(ep.has_pending_withdrawals());
assert!(ep.unregister_service(h));
assert_eq!(ep.next_withdrawal_deadline(), None);
assert!(!ep.has_pending_withdrawals());
}
#[test]
fn begin_withdrawal_is_idempotent() {
let mut ep = build_endpoint();
let now = StdInstant::now();
let recs = ServiceRecords::new(
Name::try_from_str("_ipp._tcp.local.").unwrap(),
Name::try_from_str("A._ipp._tcp.local.").unwrap(),
Name::try_from_str("h.local.").unwrap(),
631,
120,
);
let (h, mut svc) = ep
.try_register_service::<slab::Slab<Transmit>, slab::Slab<ServiceUpdate>>(
ServiceSpec::new(recs),
now,
)
.unwrap();
ep.begin_withdrawal(h, svc.withdrawal_snapshot(), now);
ep.begin_withdrawal(h, svc.withdrawal_snapshot(), now);
let mut done: std::vec::Vec<ServiceHandle> = std::vec::Vec::new();
ep.drain_completed_withdrawals(now, &mut done);
assert_eq!(
done,
std::vec![h],
"idempotent begin_withdrawal must report the handle exactly once"
);
}
#[test]
fn qr0_known_answer_fans_out_to_a_later_matching_service() {
use crate::{
event::RouteEvent,
wire::{DEFAULT_COMPRESSION_TABLE, Header, MessageBuilder},
};
use core::net::SocketAddr;
let mut e = build_endpoint();
let now = StdInstant::now();
let st0 = Name::try_from_str("_http._tcp.local.").unwrap();
let inst0 = Name::try_from_str("Alpha._http._tcp.local.").unwrap();
let host0 = Name::try_from_str("alpha.local.").unwrap();
let recs0 = ServiceRecords::new(st0, inst0, host0, 80, 120);
let (_h0, _s0) = e
.try_register_service::<slab::Slab<Transmit>, slab::Slab<ServiceUpdate>>(
ServiceSpec::new(recs0),
now,
)
.unwrap();
let st1 = Name::try_from_str("_other._tcp.local.").unwrap();
let inst1 = Name::try_from_str("Beta._other._tcp.local.").unwrap();
let host1 = Name::try_from_str("beta.local.").unwrap();
let recs1 = ServiceRecords::new(st1, inst1.clone(), host1, 81, 120);
let (h1, _s1) = e
.try_register_service::<slab::Slab<Transmit>, slab::Slab<ServiceUpdate>>(
ServiceSpec::new(recs1),
now,
)
.unwrap();
let mut buf = [0u8; 512];
let header = Header::new(); let mut b: MessageBuilder<'_, DEFAULT_COMPRESSION_TABLE> =
MessageBuilder::try_new(&mut buf, header).unwrap();
b.push_a_answer(&inst1, 120, Ipv4Addr::new(10, 0, 0, 2), false)
.unwrap();
let n = b.finish().unwrap();
let src: SocketAddr = "192.168.1.99:5353".parse().unwrap();
let local_ip: core::net::IpAddr = "192.168.1.1".parse().unwrap();
let known_answers: std::vec::Vec<_> = e
.handle(now, src, local_ip, 0, &buf[..n], false)
.unwrap()
.filter_map(Result::ok)
.filter_map(|ev| match ev {
RouteEvent::ToService(ts) if ts.event().is_known_answer() => Some(ts.handle()),
_ => None,
})
.collect();
assert_eq!(
known_answers,
std::vec![h1],
"the KAS hint must fan out past the non-matching first service to Beta"
);
}
#[test]
fn additional_section_malformed_record_surfaces_parse_error() {
use crate::{config::QuerySpec, event::RouteEvent, wire::ResourceType};
use core::net::SocketAddr;
let mut e = build_endpoint();
let now = StdInstant::now();
let qname = Name::try_from_str("printer.local.").unwrap();
let _h = e
.try_start_query(QuerySpec::new(qname.clone(), ResourceType::A), now)
.unwrap();
let mut msg: std::vec::Vec<u8> = std::vec::Vec::new();
msg.extend_from_slice(&[0, 0, 0x84, 0x00, 0, 0, 0, 0, 0, 0, 0, 2]); msg.extend_from_slice(&[
7, b'p', b'r', b'i', b'n', b't', b'e', b'r', 5, b'l', b'o', b'c', b'a', b'l', 0,
]);
msg.extend_from_slice(&1u16.to_be_bytes()); msg.extend_from_slice(&1u16.to_be_bytes()); msg.extend_from_slice(&120u32.to_be_bytes()); msg.extend_from_slice(&4u16.to_be_bytes()); msg.extend_from_slice(&[10, 0, 0, 7]);
msg.extend_from_slice(&[0x10, b'x']);
let src: SocketAddr = "192.168.1.77:5353".parse().unwrap();
let local_ip: core::net::IpAddr = "192.168.1.1".parse().unwrap();
let mut saw_to_query = false;
let mut saw_parse_err = false;
for ev in e.handle(now, src, local_ip, 0, &msg, false).unwrap() {
match ev {
Ok(RouteEvent::ToQuery(_)) => saw_to_query = true,
Err(HandleError::Parse(_)) => saw_parse_err = true,
_ => {}
}
}
assert!(
saw_to_query,
"the well-formed first additional must still reach the active query"
);
assert!(
saw_parse_err,
"a malformed additional record must surface HandleError::Parse from the iterator"
);
}
#[test]
fn additional_section_ttl0_withdrawal_skipped_then_later_record_delivered() {
use crate::{config::QuerySpec, event::RouteEvent, wire::ResourceType};
use core::net::SocketAddr;
let mut e = build_endpoint();
let now = StdInstant::now();
let qname = Name::try_from_str("printer.local.").unwrap();
let _h = e
.try_start_query(QuerySpec::new(qname.clone(), ResourceType::A), now)
.unwrap();
let owner: [u8; 15] = [
7, b'p', b'r', b'i', b'n', b't', b'e', b'r', 5, b'l', b'o', b'c', b'a', b'l', 0,
];
let mut msg: std::vec::Vec<u8> = std::vec::Vec::new();
msg.extend_from_slice(&[0, 0, 0x84, 0x00, 0, 0, 0, 0, 0, 0, 0, 2]); msg.extend_from_slice(&owner);
msg.extend_from_slice(&1u16.to_be_bytes()); msg.extend_from_slice(&1u16.to_be_bytes()); msg.extend_from_slice(&0u32.to_be_bytes()); msg.extend_from_slice(&4u16.to_be_bytes()); msg.extend_from_slice(&[10, 0, 0, 8]);
msg.extend_from_slice(&owner);
msg.extend_from_slice(&1u16.to_be_bytes()); msg.extend_from_slice(&1u16.to_be_bytes()); msg.extend_from_slice(&120u32.to_be_bytes()); msg.extend_from_slice(&4u16.to_be_bytes()); msg.extend_from_slice(&[10, 0, 0, 9]);
let src: SocketAddr = "192.168.1.77:5353".parse().unwrap();
let local_ip: core::net::IpAddr = "192.168.1.1".parse().unwrap();
let to_query = e
.handle(now, src, local_ip, 0, &msg, false)
.unwrap()
.filter(|r| matches!(r, Ok(RouteEvent::ToQuery(_))))
.count();
assert_eq!(
to_query, 1,
"a TTL=0 additional must be skipped while the following positive-TTL one is delivered"
);
}