1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
//! SEC-21 Phase 3h — DNSSEC validation integration test.
//!
//! Drives the resolver-refresh ticker end-to-end with a synthetic
//! "bogus DNSSEC" upstream and asserts the pipeline emits a
//! `dns_authority_dnssec_failed` event into a JSONL sink. The
//! expected discriminator path is:
//!
//! 1. `spawn_continuous_ticker` is called with a `dnssec_policy`
//! populated (validate=true, failClosed=false — audit mode so we
//! can also observe the drift event with `dnssec_status` tagged).
//! 2. The ticker calls our synthetic `validated_resolver` which always
//! returns a `Failed { reason: "synthetic-bogus" }` outcome.
//! 3. The ticker's per-tick refresher recognises the Failed verdict
//! and emits a `dns_authority_dnssec_failed` event.
//! 4. The drift event still fires (audit-only kept the answer) and
//! carries `dnssecStatus: "validation_failed"`.
//! 5. Both events flow through the in-memory `DriftEmitter` collector
//! (the JSONL sink fan-out adapter is exercised in
//! `supervisor_resolver_refresh.rs`; this test focuses on the
//! Phase 3h-specific signals).
//!
//! ## Why `#[ignore]`
//!
//! The test is plain Rust + tokio with localhost UDP — works on any
//! platform CellOS builds for. We mark it `#[ignore]` per the
//! repository convention for tests in the SEC-21 family that exercise
//! the *integration* surface (vs the unit surface in
//! `crates/cellos-supervisor/src/resolver_refresh/{ticker,
//! hickory_resolve}::tests`). CI runs ignored tests via the
//! quality-gate's `cargo test -- --ignored` invocation; local
//! developers can opt in with the same flag.
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::sync::atomic::Ordering;
use std::sync::{Arc, Mutex};
use std::time::Duration;
use cellos_core::{
CloudEventV1, DnsRebindingPolicy, DnsRefreshPolicy, DnsResolver, DnsResolverDnssecPolicy,
DnsResolverProtocol,
};
use cellos_supervisor::resolver_refresh::ticker::{
spawn_continuous_ticker, DriftEmitter, SharedResolverFn, SharedValidatedResolverFn,
TickerConfig,
};
use cellos_supervisor::resolver_refresh::{
DnssecValidationResult, ResolvedAnswer, TrustAnchors, ValidatedResolvedAnswer,
};
#[derive(Default)]
struct CollectingEmitter {
events: Mutex<Vec<CloudEventV1>>,
}
impl DriftEmitter for CollectingEmitter {
fn emit(&self, event: CloudEventV1) {
self.events.lock().unwrap().push(event);
}
}
fn one_dnssec_resolver() -> Vec<DnsResolver> {
vec![DnsResolver {
resolver_id: "resolver-do53-internal".into(),
endpoint: "127.0.0.1:53".into(),
protocol: DnsResolverProtocol::Do53Udp,
trust_kid: None,
dnssec: Some(DnsResolverDnssecPolicy {
validate: true,
fail_closed: false,
trust_anchors_path: None,
}),
}]
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
#[ignore]
async fn supervisor_dnssec_validation_emits_failed_event_into_sink() {
// Synthetic upstream that ALWAYS returns a "Failed" validation
// outcome — mirrors a real upstream returning a bogus RRSIG that
// hickory's bundled validator would reject. We do NOT spin a
// wire-format DNSSEC server here; the validator's discriminator
// is unit-tested in `hickory_resolve::tests` against synthetic
// localhost UDP. This integration test pins the END-TO-END
// *plumbing*: ticker pickup → refresher emit → emitter receive.
let validated: SharedValidatedResolverFn = Arc::new(|_h: &str| {
Ok(ValidatedResolvedAnswer {
answer: ResolvedAnswer {
targets: vec!["198.51.100.7".to_string()],
ttl_seconds: 60,
resolver_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 53),
},
validation: DnssecValidationResult::Failed {
reason: "synthetic-bogus-rrsig".to_string(),
},
})
});
// Plain (unvalidated) closure is required by the TickerConfig
// even when DNSSEC is active — the ticker uses the validated path
// when `dnssec_active` is true but the field is structurally
// mandatory. A panicking closure proves it is never called.
let unused: SharedResolverFn =
Arc::new(|_h: &str| panic!("plain resolver MUST NOT be called when DNSSEC is active"));
let cfg = TickerConfig {
interval: Duration::from_millis(70),
policy: Some(DnsRefreshPolicy {
min_ttl_seconds: Some(0),
max_stale_seconds: None,
strategy: None,
}),
rebinding_policy: None,
resolvers: one_dnssec_resolver(),
hostnames: vec!["bogus.example.com".into()],
keyset_id: Some("keyset-it-001".into()),
issuer_kid: Some("kid-it-001".into()),
policy_digest: Some(
"sha256:e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855".into(),
),
correlation_id: Some("corr-it-001".into()),
source: "cellos-supervisor-it".into(),
cell_id: "it-cell-dnssec".into(),
run_id: "it-run-dnssec".into(),
dnssec_policy: Some(DnsResolverDnssecPolicy {
validate: true,
fail_closed: false, // audit-only; preserves drift event for cross-correlation
trust_anchors_path: None,
}),
trust_anchors: Some(TrustAnchors::iana_default()),
validated_resolver: Some(validated),
};
let emitter = Arc::new(CollectingEmitter::default());
let handle = spawn_continuous_ticker(cfg, emitter.clone(), unused);
// Two ticks at 70ms — sleep 220ms to comfortably observe both.
tokio::time::sleep(Duration::from_millis(220)).await;
handle.shutdown.store(true, Ordering::SeqCst);
let _ = tokio::time::timeout(Duration::from_secs(2), handle.task)
.await
.expect("ticker join timeout");
let events = emitter.events.lock().unwrap();
let dnssec_failed: Vec<_> = events
.iter()
.filter(|e| e.ty.ends_with("dns_authority_dnssec_failed"))
.collect();
assert!(
!dnssec_failed.is_empty(),
"synthetic bogus DNSSEC upstream must fire dns_authority_dnssec_failed (got {} total events: {:?})",
events.len(),
events.iter().map(|e| &e.ty).collect::<Vec<_>>()
);
let payload = dnssec_failed[0].data.as_ref().expect("event has data");
assert_eq!(payload["reason"], "validation_failed");
assert_eq!(payload["failClosed"], false);
assert_eq!(payload["trustAnchorSource"], "iana-default");
assert_eq!(payload["resolverId"], "resolver-do53-internal");
assert_eq!(payload["hostname"], "bogus.example.com");
assert_eq!(payload["cellId"], "it-cell-dnssec");
assert_eq!(payload["correlationId"], "corr-it-001");
// SEC-21 Phase 3h.1 — additive `source` discriminator. The
// resolver-refresh path stamps `"resolver_refresh"` (the dataplane
// surface stamps `"dataplane"`). Asserting the literal here locks
// in the contract so a future emitter regression is caught at this
// test layer instead of leaking into the SIEM dispatch path.
assert_eq!(
payload["source"], "resolver_refresh",
"P3h resolver-refresh emissions must stamp source=resolver_refresh (P3h.1 contract)"
);
// Drift event must also fire (audit-only kept the answer) and
// carry dnssecStatus="validation_failed" so SIEM correlation works.
let drift_events: Vec<_> = events
.iter()
.filter(|e| e.ty.ends_with("dns_authority_drift"))
.collect();
assert!(
!drift_events.is_empty(),
"audit-only mode keeps the answer; drift must still fire"
);
let drift_data = drift_events[0].data.as_ref().expect("drift has data");
assert_eq!(drift_data["dnssecStatus"], "validation_failed");
// Suppress the unused-import warning for DnsRebindingPolicy when
// reachability changes in the future; keep the import for forward
// additions to this test (e.g. a combined DNSSEC + rebinding scenario).
let _ = std::mem::size_of::<DnsRebindingPolicy>();
}