use std::net::{SocketAddr, UdpSocket};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};
use std::time::Duration;
use cellos_core::{CloudEventV1, DnsQueryType};
use cellos_supervisor::dns_proxy::dnssec::{DataplaneDnssecBackend, DataplaneDnssecValidator};
use cellos_supervisor::dns_proxy::{run_one_shot, DnsProxyConfig, DnsQueryEmitter};
use cellos_supervisor::resolver_refresh::DnssecValidationResult;
#[derive(Default)]
struct MemEmitter {
events: Mutex<Vec<CloudEventV1>>,
}
impl DnsQueryEmitter for MemEmitter {
fn emit(&self, event: CloudEventV1) {
self.events.lock().unwrap().push(event);
}
}
impl MemEmitter {
fn snapshot(&self) -> Vec<CloudEventV1> {
self.events.lock().unwrap().clone()
}
}
fn build_query_packet(qname: &str, qtype: u16) -> Vec<u8> {
let mut p = Vec::new();
p.extend_from_slice(&[
0xab, 0xcd, 0x01, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
]);
for label in qname.split('.') {
p.push(label.len() as u8);
p.extend_from_slice(label.as_bytes());
}
p.push(0);
p.extend_from_slice(&qtype.to_be_bytes());
p.extend_from_slice(&[0x00, 0x01]);
p
}
fn build_a_response(query: &[u8], ancount: u16) -> Vec<u8> {
let mut resp = query.to_vec();
resp[2] = 0x81;
resp[3] = 0x80;
resp[6] = (ancount >> 8) as u8;
resp[7] = (ancount & 0xff) as u8;
for _ in 0..ancount {
resp.extend_from_slice(&[0xc0, 0x0c]); resp.extend_from_slice(&[0x00, 0x01]); resp.extend_from_slice(&[0x00, 0x01]); resp.extend_from_slice(&[0x00, 0x00, 0x01, 0x2c]); resp.extend_from_slice(&[0x00, 0x04]); resp.extend_from_slice(&[203, 0, 113, 1]);
}
resp
}
fn spawn_synthetic_upstream() -> (SocketAddr, std::thread::JoinHandle<()>) {
let sock = UdpSocket::bind("127.0.0.1:0").unwrap();
let addr = sock.local_addr().unwrap();
sock.set_read_timeout(Some(Duration::from_millis(2000)))
.unwrap();
let h = std::thread::spawn(move || {
let mut buf = [0u8; 1500];
while let Ok((n, peer)) = sock.recv_from(&mut buf) {
let resp = build_a_response(&buf[..n], 1);
let _ = sock.send_to(&resp, peer);
}
});
(addr, h)
}
fn proxy_cfg(
upstream: SocketAddr,
validator: Option<Arc<DataplaneDnssecValidator>>,
) -> DnsProxyConfig {
DnsProxyConfig {
bind_addr: "127.0.0.1:0".parse().unwrap(),
upstream_addr: upstream,
hostname_allowlist: vec!["api.example.com".into()],
allowed_query_types: vec![DnsQueryType::A, DnsQueryType::AAAA],
cell_id: "it-cell-dp-dnssec".into(),
run_id: "it-run-dp-dnssec".into(),
policy_digest: Some(
"sha256:e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855".into(),
),
keyset_id: Some("it-keyset-dp".into()),
issuer_kid: Some("it-kid-dp-001".into()),
correlation_id: Some("it-corr-dp-001".into()),
upstream_resolver_id: "resolver-dp-001".into(),
upstream_timeout: Duration::from_millis(400),
tcp_idle_timeout: Duration::ZERO,
dnssec_validator: validator,
transport: cellos_supervisor::dns_proxy::upstream::UpstreamTransport::Do53Udp,
upstream_extras: cellos_supervisor::dns_proxy::upstream::UpstreamExtras::default(),
}
}
fn validator_returning(
fail_closed: bool,
outcome_factory: impl Fn() -> std::io::Result<DnssecValidationResult> + Send + Sync + 'static,
) -> Arc<DataplaneDnssecValidator> {
let backend: Arc<DataplaneDnssecBackend> = Arc::new(move |_h, _t| outcome_factory());
Arc::new(DataplaneDnssecValidator::with_backend(
fail_closed,
"iana-default".into(),
backend,
))
}
fn run_one_query(cfg: DnsProxyConfig, qname: &str, qtype: u16) -> (u8, Vec<CloudEventV1>) {
let listener = UdpSocket::bind("127.0.0.1:0").unwrap();
listener
.set_read_timeout(Some(Duration::from_millis(150)))
.unwrap();
let listen_addr = listener.local_addr().unwrap();
let upstream_sock = UdpSocket::bind("127.0.0.1:0").unwrap();
let emitter = Arc::new(MemEmitter::default());
let shutdown = Arc::new(AtomicBool::new(false));
let proxy_handle = {
let emitter = emitter.clone();
let shutdown = shutdown.clone();
let cfg = cfg.clone();
std::thread::spawn(move || {
let _ = run_one_shot(&cfg, &listener, &upstream_sock, &*emitter, &shutdown);
})
};
let client = UdpSocket::bind("127.0.0.1:0").unwrap();
client
.set_read_timeout(Some(Duration::from_secs(2)))
.unwrap();
let q = build_query_packet(qname, qtype);
client.send_to(&q, listen_addr).unwrap();
let mut rb = [0u8; 1500];
let (_n, _) = client.recv_from(&mut rb).unwrap();
let rcode = rb[3] & 0x0f;
shutdown.store(true, Ordering::SeqCst);
proxy_handle.join().unwrap();
(rcode, emitter.snapshot())
}
#[test]
fn require_validated_forwards_answer_and_emits_no_dnssec_event() {
let (upstream, _u) = spawn_synthetic_upstream();
let v = validator_returning(true, || {
Ok(DnssecValidationResult::Validated {
algorithm: "RSASHA256".into(),
key_tag: 12345,
})
});
let cfg = proxy_cfg(upstream, Some(v));
let (rcode, events) = run_one_query(cfg, "api.example.com", 1);
assert_eq!(rcode, 0, "Validated MUST yield NOERROR (forwarded answer)");
let dnssec_events: Vec<_> = events
.iter()
.filter(|e| e.ty.ends_with("dns_authority_dnssec_failed"))
.collect();
assert!(
dnssec_events.is_empty(),
"Validated path MUST NOT emit dns_authority_dnssec_failed; got {} events: {:?}",
dnssec_events.len(),
dnssec_events.iter().map(|e| &e.ty).collect::<Vec<_>>()
);
let q_events: Vec<_> = events
.iter()
.filter(|e| e.ty.ends_with("dns_query"))
.collect();
assert_eq!(q_events.len(), 1, "exactly one dns_query event expected");
let q_data = q_events[0].data.as_ref().unwrap();
assert_eq!(q_data["decision"], "allow");
assert_eq!(q_data["reasonCode"], "allowed_by_allowlist");
}
#[test]
fn require_unsigned_servfails_and_emits_unsigned_in_require_event() {
let (upstream, _u) = spawn_synthetic_upstream();
let v = validator_returning(true, || Ok(DnssecValidationResult::Unsigned));
let cfg = proxy_cfg(upstream, Some(v));
let (rcode, events) = run_one_query(cfg, "api.example.com", 1);
assert_eq!(
rcode, 2,
"require + Unsigned MUST yield SERVFAIL; got rcode={rcode}"
);
let dnssec_events: Vec<_> = events
.iter()
.filter(|e| e.ty.ends_with("dns_authority_dnssec_failed"))
.collect();
assert_eq!(
dnssec_events.len(),
1,
"exactly one dns_authority_dnssec_failed event expected"
);
let payload = dnssec_events[0].data.as_ref().unwrap();
assert_eq!(payload["reason"], "unsigned_in_require_mode");
assert_eq!(payload["source"], "dataplane");
assert_eq!(payload["failClosed"], true);
assert_eq!(payload["trustAnchorSource"], "iana-default");
assert_eq!(payload["resolverId"], "resolver-dp-001");
assert_eq!(payload["hostname"], "api.example.com");
assert_eq!(payload["cellId"], "it-cell-dp-dnssec");
assert_eq!(payload["correlationId"], "it-corr-dp-001");
}
#[test]
fn require_bogus_servfails_and_emits_validation_failed_event() {
let (upstream, _u) = spawn_synthetic_upstream();
let v = validator_returning(true, || {
Ok(DnssecValidationResult::Failed {
reason: "synthetic-bogus-rrsig".into(),
})
});
let cfg = proxy_cfg(upstream, Some(v));
let (rcode, events) = run_one_query(cfg, "api.example.com", 1);
assert_eq!(rcode, 2, "require + Failed MUST yield SERVFAIL");
let dnssec_events: Vec<_> = events
.iter()
.filter(|e| e.ty.ends_with("dns_authority_dnssec_failed"))
.collect();
assert_eq!(dnssec_events.len(), 1);
let payload = dnssec_events[0].data.as_ref().unwrap();
assert_eq!(payload["reason"], "validation_failed");
assert_eq!(payload["source"], "dataplane");
assert_eq!(payload["failClosed"], true);
}
#[test]
fn best_effort_unsigned_forwards_answer_and_emits_no_event() {
let (upstream, _u) = spawn_synthetic_upstream();
let v = validator_returning(false, || Ok(DnssecValidationResult::Unsigned));
let cfg = proxy_cfg(upstream, Some(v));
let (rcode, events) = run_one_query(cfg, "api.example.com", 1);
assert_eq!(
rcode, 0,
"best_effort + Unsigned MUST yield NOERROR (forwarded answer)"
);
let dnssec_events: Vec<_> = events
.iter()
.filter(|e| e.ty.ends_with("dns_authority_dnssec_failed"))
.collect();
assert!(
dnssec_events.is_empty(),
"best_effort + Unsigned MUST NOT emit dns_authority_dnssec_failed (explicit tolerate-unsigned branch); got {} events",
dnssec_events.len()
);
}
#[test]
fn best_effort_bogus_servfails_and_emits_event() {
let (upstream, _u) = spawn_synthetic_upstream();
let v = validator_returning(false, || {
Ok(DnssecValidationResult::Failed {
reason: "synthetic-bogus-rrsig".into(),
})
});
let cfg = proxy_cfg(upstream, Some(v));
let (rcode, events) = run_one_query(cfg, "api.example.com", 1);
assert_eq!(
rcode, 2,
"best_effort + Failed MUST yield SERVFAIL — bogus is always rejected"
);
let dnssec_events: Vec<_> = events
.iter()
.filter(|e| e.ty.ends_with("dns_authority_dnssec_failed"))
.collect();
assert_eq!(dnssec_events.len(), 1);
let payload = dnssec_events[0].data.as_ref().unwrap();
assert_eq!(payload["reason"], "validation_failed");
assert_eq!(payload["source"], "dataplane");
assert_eq!(payload["failClosed"], false);
}
#[test]
fn off_mode_query_path_byte_identical() {
let (upstream, _u) = spawn_synthetic_upstream();
let cfg = proxy_cfg(upstream, None); let (rcode, events) = run_one_query(cfg, "api.example.com", 1);
assert_eq!(rcode, 0, "off mode MUST yield NOERROR (forwarded answer)");
let dnssec_events: Vec<_> = events
.iter()
.filter(|e| e.ty.ends_with("dns_authority_dnssec_failed"))
.collect();
assert!(
dnssec_events.is_empty(),
"off mode MUST NOT emit any dns_authority_dnssec_failed events"
);
let q_events: Vec<_> = events
.iter()
.filter(|e| e.ty.ends_with("dns_query"))
.collect();
assert_eq!(q_events.len(), 1);
let q_data = q_events[0].data.as_ref().unwrap();
assert_eq!(q_data["decision"], "allow");
}
#[test]
fn require_skip_non_a_aaaa_servfails() {
let (upstream, _u) = spawn_synthetic_upstream();
let backend: Arc<DataplaneDnssecBackend> = Arc::new(|_h, _t| {
panic!("backend MUST NOT be called for non-A/AAAA query (Skip is decided pre-dispatch)")
});
let v = Arc::new(DataplaneDnssecValidator::with_backend(
true, "iana-default".into(),
backend,
));
let mut cfg = proxy_cfg(upstream, Some(v));
cfg.allowed_query_types = vec![DnsQueryType::A, DnsQueryType::AAAA, DnsQueryType::TXT];
let (rcode, events) = run_one_query(cfg, "api.example.com", 16);
assert_eq!(rcode, 2, "require + Skip (non-A/AAAA) MUST yield SERVFAIL");
let dnssec_events: Vec<_> = events
.iter()
.filter(|e| e.ty.ends_with("dns_authority_dnssec_failed"))
.collect();
assert_eq!(dnssec_events.len(), 1);
let payload = dnssec_events[0].data.as_ref().unwrap();
assert_eq!(payload["reason"], "unsupported_query_type_in_require_mode");
assert_eq!(payload["source"], "dataplane");
}
#[test]
fn best_effort_skip_non_a_aaaa_forwards_unvalidated() {
let (upstream, _u) = spawn_synthetic_upstream();
let backend: Arc<DataplaneDnssecBackend> = Arc::new(|_h, _t| {
panic!("backend MUST NOT be called for non-A/AAAA query (Skip is decided pre-dispatch)")
});
let v = Arc::new(DataplaneDnssecValidator::with_backend(
false, "iana-default".into(),
backend,
));
let mut cfg = proxy_cfg(upstream, Some(v));
cfg.allowed_query_types = vec![DnsQueryType::A, DnsQueryType::AAAA, DnsQueryType::TXT];
let (rcode, events) = run_one_query(cfg, "api.example.com", 16);
assert_eq!(
rcode, 0,
"best_effort + Skip MUST yield NOERROR (forwarded unvalidated)"
);
let dnssec_events: Vec<_> = events
.iter()
.filter(|e| e.ty.ends_with("dns_authority_dnssec_failed"))
.collect();
assert!(dnssec_events.is_empty());
}