cellos_supervisor/resolver_refresh/mod.rs
1//! SEC-21 host-controlled resolver refresh + DNS authority drift detection.
2//!
3//! This module is the dataplane companion to the W1 T13 contract layer
4//! (`spec.authority.dnsAuthority.refreshPolicy`). On each tick it:
5//!
6//! 1. For every declared hostname, calls an injectable resolver function and
7//! receives a (possibly empty) set of target strings.
8//! 2. Compares the answer against the prior observation in [`ResolverState`].
9//! 3. When the target set has changed, emits a
10//! `dev.cellos.events.cell.observability.v1.dns_authority_drift` CloudEvent.
11//! 4. Honors `policy.minTtlSeconds` (floor — skip refreshes within this window)
12//! and `policy.maxStaleSeconds` (ceiling — force refresh once exceeded).
13//! 5. With `strategy: manual`, automatic ticks are skipped — caller must opt
14//! in explicitly.
15//!
//! The resolver function is injected via the [`ResolverFn`] type alias so unit
//! tests can run without contacting the network. The supervisor wires the
//! production path to a `hickory-resolver`-backed lookup (see
//! [`hickory_resolve::resolve_with_ttl`] and Phase 3 below; earlier phases
//! used a `to_socket_addrs`-style lookup that could not surface TTLs); see
//! [`crate::supervisor`] for the gating behind `CELLOS_DNS_AUTHORITY_REFRESH=1`.
20//!
21//! ## Scope
22//!
23//! - **Phase 1 (W2 SEC-21, shipped):** the supervisor calls
24//! [`ResolverRefresh::tick`] exactly once at startup, before the workload
25//! runs. `tick()` itself stays sync — callers run it from a context where
26//! blocking is acceptable, e.g. inside `tokio::task::spawn_blocking`.
27//! - **Phase 2 (this slice, shipped):** the [`ticker`] submodule wraps
28//! `tick()` in a long-running tokio task that fires every
29//! `min(refreshPolicy.minTtlSeconds, 60s).max(5s)` for the lifetime of
30//! the cell. The ticker owns its own [`ResolverState`] (independent of
31//! the startup tick) and dispatches drift events through a sync
32//! [`ticker::DriftEmitter`] adapter — see [`sink_emitter`] for the
33//! sync→async bridge to [`cellos_core::ports::EventSink`].
34//! - **Phase 3 (shipped):** real upstream TTL via [`hickory_resolve`]. The
35//! resolver function now returns a [`ResolvedAnswer`] carrying both the
36//! target set AND the minimum TTL the upstream nameserver advertised. Two
37//! downstream effects:
38//!
39//! 1. The `dns_authority_drift` event's `ttlSeconds` field is now real
40//! (was always `0`); `staleSeconds` becomes meaningful when the
41//! ticker observes drift past the prior TTL.
42//! 2. `refreshPolicy.minTtlSeconds` clamps short upstream TTLs as a
43//! DNS-rebinding-mitigation floor (operators can refuse to honour a
44//! TTL=0 fast-flux record by setting a positive floor) — see
45//! [`ticker::TickerConfig`] / [`ResolverRefresh::tick`] for the
46//! clamp logic.
47//!
48//! Per-hostname response-IP allowlisting (the *full* rebinding closure)
49//! remains a separate slice; the floor is the partial-mitigation hook.
50
51pub mod dnssec;
52pub mod hickory_resolve;
53pub mod rebinding;
54pub mod sink_emitter;
55pub mod ticker;
56
57#[allow(unused_imports)]
58pub use dnssec::{TrustAnchors, ENV_TRUST_ANCHORS_PATH, TRUST_ANCHOR_SOURCE_IANA_DEFAULT};
59#[allow(unused_imports)]
60pub use hickory_resolve::{
61 extract_rrsig_metadata, proof_to_validation_result_with_rrsig, resolve_with_ttl,
62 resolve_with_ttl_validated, DnssecValidationResult, ResolvedAnswer, ValidatedResolvedAnswer,
63};
64#[allow(unused_imports)]
65pub use rebinding::{RebindingDecision, RebindingState};
66
67use std::collections::{BTreeSet, HashMap};
68use std::io;
69use std::time::{Duration, SystemTime, UNIX_EPOCH};
70
71use sha2::{Digest, Sha256};
72
73use cellos_core::{
74 cloud_event_v1_dns_authority_dnssec_failed, cloud_event_v1_dns_authority_drift,
75 cloud_event_v1_dns_authority_rebind_rejected, cloud_event_v1_dns_authority_rebind_threshold,
76 CloudEventV1, DnsAuthorityDnssecFailed, DnsAuthorityDnssecFailureReason, DnsAuthorityDrift,
77 DnsAuthorityRebindRejected, DnsAuthorityRebindThreshold, DnsRebindingPolicy, DnsRefreshPolicy,
78 DnsRefreshStrategy, DnsResolver, DnsResolverDnssecPolicy,
79};
80
/// Injectable resolver function.
///
/// Given a hostname, returns a [`ResolvedAnswer`] carrying both the target
/// set AND the upstream TTL. Tests pass a closure returning canned answers;
/// production wires this to a `hickory-resolver`-backed call (see
/// [`hickory_resolve::resolve_with_ttl`]).
///
/// An `Err` is treated as a transient resolver failure: the hostname is
/// skipped for this tick, no drift event is emitted, and prior state
/// (including the last-refresh timestamp) is preserved. Note that this is
/// *not* the same as returning `Ok` with an empty target list — an empty
/// `Ok` answer is processed like any other answer and is recorded as the
/// new observation.
///
/// **Migration note** (legacy signature): an older signature was
/// `Fn(&str) -> io::Result<Vec<String>>`. Existing call sites that have
/// already canonicalized to `Vec<String>` can adapt with a one-line shim:
/// `Ok(ResolvedAnswer { targets: vec, ttl_seconds: 0, resolver_addr: ... })`.
pub type ResolverFn<'a> = dyn Fn(&str) -> io::Result<ResolvedAnswer> + 'a;
97
/// SEC-21 Phase 3h — DNSSEC-validating resolver function.
///
/// Given a hostname, returns a [`ValidatedResolvedAnswer`] carrying both the
/// standard [`ResolvedAnswer`] AND the [`DnssecValidationResult`]
/// discriminator (Validated / Failed / Unsigned). When the
/// [`ResolverRefresh`] has a `dnssec_policy` configured, this callback is
/// preferred over the plain [`ResolverFn`]; otherwise it is unused.
///
/// NOTE(review): `ResolverRefresh::tick_with_dnssec` in this module consumes
/// a pre-resolved `HashMap<String, io::Result<ValidatedResolvedAnswer>>`
/// rather than this callback, so the call site that actually invokes a
/// `ValidatedResolverFn` is presumably elsewhere (e.g. the ticker) — confirm.
///
/// An `Err` is treated identically to the [`ResolverFn`] err path —
/// transient resolver failure, no drift event, prior state preserved.
pub type ValidatedResolverFn<'a> = dyn Fn(&str) -> io::Result<ValidatedResolvedAnswer> + 'a;
109
/// Per-hostname state tracked across ticks.
///
/// The `Default` value represents "never observed": no targets, an empty
/// digest string, no refresh timestamp, and a TTL of `0`.
#[derive(Debug, Clone, Default)]
struct HostState {
    /// Canonicalized target list stored by the last successful tick. When a
    /// rebinding policy is configured this is the effective (post-evaluation)
    /// list, not the raw resolver answer.
    previous_targets: Vec<String>,
    /// Digest of `previous_targets` as stored by the last successful tick.
    /// Stays the empty string until a first answer is recorded; readers map
    /// the empty string to the literal `"empty"` when reporting the prior
    /// digest in an event.
    previous_digest: String,
    /// Wall-clock time of the last *successful* refresh — used to enforce
    /// `min_ttl_seconds` (floor) and `max_stale_seconds` (ceiling).
    last_refresh_at: Option<SystemTime>,
    /// TTL recorded for the last successful answer, already clamped up to
    /// `refreshPolicy.minTtlSeconds` when a positive floor is configured.
    /// `0` until a first answer is recorded, or when the answer carried a
    /// TTL of `0` and no floor applies.
    last_ttl_seconds: u32,
}

/// Persistent state for [`ResolverRefresh::tick`] — one entry per hostname.
#[derive(Debug, Default)]
pub struct ResolverState {
    /// Last recorded observation, keyed by hostname.
    hosts: HashMap<String, HostState>,
}

impl ResolverState {
    /// Create an empty resolver state. Callers reuse a single instance across
    /// ticks for a given cell so prior observations persist.
    #[must_use]
    pub fn new() -> Self {
        Self::default()
    }

    /// Number of hostnames currently tracked. Test affordance.
    #[must_use]
    pub fn len(&self) -> usize {
        self.hosts.len()
    }

    /// Returns `true` when no hostname has been recorded yet.
    #[must_use]
    pub fn is_empty(&self) -> bool {
        self.hosts.is_empty()
    }
}
148
/// One refresh-tick execution.
///
/// Borrows the spec-side configuration (every field is a reference or an
/// `Option` of one), runs each declared hostname through the injected
/// resolver, and produces CloudEvents the caller publishes via the
/// configured event sink.
pub struct ResolverRefresh<'a> {
    /// Refresh policy from `spec.authority.dnsAuthority.refreshPolicy`.
    /// `None` → defaults: no floor, no ceiling, `ttl-honor` strategy.
    pub policy: Option<&'a DnsRefreshPolicy>,
    /// SEC-21 Phase 3e — DNS rebinding mitigation policy from
    /// `spec.authority.dnsAuthority.rebindingPolicy`. `None` → no
    /// per-hostname response-IP tracking. Combined with the P3a TTL floor
    /// (`policy.min_ttl_seconds`), this structurally closes the v0.4.0
    /// rebinding residual.
    pub rebinding_policy: Option<&'a DnsRebindingPolicy>,
    /// Declared resolvers from `spec.authority.dnsAuthority.resolvers[]`.
    /// The first entry's `resolverId` is used as the event's `resolverId`
    /// when present. When none are declared, drift and DNSSEC-failed events
    /// carry the empty string, while rebinding events omit the field
    /// (`None`).
    pub resolvers: &'a [DnsResolver],
    /// Hostnames the supervisor may refresh — typically derived from
    /// `spec.authority.egressRules[].host` ∪
    /// `spec.authority.dnsAuthority.hostnameAllowlist`.
    pub hostnames: &'a [String],
    /// Optional `keysetId` to bind into emitted events.
    pub keyset_id: Option<&'a str>,
    /// Optional `issuerKid` to bind into emitted events.
    pub issuer_kid: Option<&'a str>,
    /// Optional policy-bundle digest (`sha256:<hex>`) to bind into emitted
    /// events. When `None`, drift events omit the digest; rebinding events
    /// fall back to the value returned by `empty_policy_digest()`.
    pub policy_digest: Option<&'a str>,
    /// Optional pass-through correlation id.
    pub correlation_id: Option<&'a str>,
    /// CloudEvent `source` field — defaults to `cellos-supervisor` when the
    /// caller passes `None`.
    pub source: Option<&'a str>,
    /// SEC-21 Phase 3h — opt-in DNSSEC validation policy. `None` (default)
    /// preserves P3a/P3e behaviour exactly: no validation, no
    /// `dns_authority_dnssec_failed` events, no `dnssec_status` tagging on
    /// drift events. When `Some`, the [`Self::tick_with_dnssec`] method
    /// honours `validate` / `failClosed` per resolver — see method docs.
    pub dnssec_policy: Option<&'a DnsResolverDnssecPolicy>,
    /// Trust anchors loaded from the env / spec / IANA-default precedence
    /// chain. Used purely for stamping the source descriptor into emitted
    /// events; hickory 0.24's resolver does not accept custom anchors via
    /// public API. See the [`dnssec`] module docs for the residual.
    pub trust_anchors: Option<&'a TrustAnchors>,
}
195
196impl<'a> ResolverRefresh<'a> {
197 /// Run one refresh tick.
198 ///
199 /// For each declared hostname:
200 ///
201 /// - If the strategy is `manual`, skip (operator must opt in).
202 /// - If `min_ttl_seconds` says we refreshed recently, skip — *unless*
203 /// `max_stale_seconds` has been exceeded (ceiling overrides floor).
204 /// - Call the injected resolver. On error: leave prior state untouched and
205 /// emit no event (transient failures are not drift).
206 /// - Canonicalize, hash, compare against prior digest.
207 /// - On change, build a [`DnsAuthorityDrift`] payload, wrap it in a
208 /// CloudEvent envelope, and append to the returned vector.
209 /// - Update `state` with the new observation.
210 ///
211 /// Returns the events the caller should publish — possibly empty.
212 pub fn tick(
213 &self,
214 state: &mut ResolverState,
215 resolver: &ResolverFn<'_>,
216 now: SystemTime,
217 cell_id: &str,
218 run_id: &str,
219 ) -> Vec<CloudEventV1> {
220 // Backward-compat: callers that don't track rebinding state pass
221 // through to the rebinding-aware variant with a throwaway state.
222 // The throwaway is fine because, when `rebinding_policy is None`,
223 // the rebinding-aware path emits no extra events and the
224 // throwaway state never has anything committed to it.
225 let mut throwaway = RebindingState::new();
226 self.tick_with_rebinding(state, &mut throwaway, resolver, now, cell_id, run_id)
227 }
228
229 /// SEC-21 Phase 3e variant of [`Self::tick`] that also threads a
230 /// per-cell [`RebindingState`] for DNS rebinding mitigation.
231 ///
232 /// Behaves identically to `tick` when `self.rebinding_policy` is
233 /// `None`: emits only `dns_authority_drift` events, never touches
234 /// `rebinding_state`. When `rebinding_policy` is `Some`:
235 ///
236 /// 1. Resolves every hostname (same as `tick`).
237 /// 2. For each successful resolution, calls
238 /// [`RebindingState::evaluate`] to decide which IPs are novel,
239 /// whether the per-hostname cap is exceeded, and which IPs violate
240 /// the operator allowlist.
241 /// 3. Emits one `dns_authority_rebind_threshold` per hostname when
242 /// the cap is exceeded, and one `dns_authority_rebind_rejected`
243 /// per allowlist violation.
244 /// 4. Stamps the EFFECTIVE targets (post-rejection when
245 /// `reject_on_rebind=true`) into the subsequent
246 /// `dns_authority_drift` event so the drift digest reflects what
247 /// the workload actually resolves to.
248 /// 5. Calls [`RebindingState::commit`] with the effective targets so
249 /// the per-cell history persists across ticks.
250 pub fn tick_with_rebinding(
251 &self,
252 state: &mut ResolverState,
253 rebinding_state: &mut RebindingState,
254 resolver: &ResolverFn<'_>,
255 now: SystemTime,
256 cell_id: &str,
257 run_id: &str,
258 ) -> Vec<CloudEventV1> {
259 let mut out: Vec<CloudEventV1> = Vec::new();
260
261 // Manual strategy: caller must opt in explicitly via a different code
262 // path (not yet implemented). For Phase 1 we treat manual as "no-op".
263 let strategy = self
264 .policy
265 .and_then(|p| p.strategy)
266 .unwrap_or(DnsRefreshStrategy::TtlHonor);
267 if matches!(strategy, DnsRefreshStrategy::Manual) {
268 return out;
269 }
270
271 let min_ttl = self.policy.and_then(|p| p.min_ttl_seconds).unwrap_or(0);
272 let max_stale = self.policy.and_then(|p| p.max_stale_seconds);
273
274 let resolver_id_owned: String = self
275 .resolvers
276 .first()
277 .map(|r| r.resolver_id.clone())
278 .unwrap_or_default();
279
280 let observed_at_rfc3339 = system_time_to_rfc3339(now);
281
282 for hostname in self.hostnames {
283 let prior = state.hosts.get(hostname).cloned().unwrap_or_default();
284
285 // Floor (min_ttl_seconds) — skip refresh when we last looked up
286 // this hostname less than `min_ttl_seconds` ago.
287 //
288 // Ceiling (max_stale_seconds) — *override* the floor: if the prior
289 // answer is older than max_stale, refresh regardless.
290 if let Some(last) = prior.last_refresh_at {
291 let age = now.duration_since(last).unwrap_or_default();
292 let age_secs = age.as_secs();
293 let floor_active = age_secs < u64::from(min_ttl);
294 let ceiling_breached = max_stale.is_some_and(|ms| age_secs >= u64::from(ms));
295 if floor_active && !ceiling_breached {
296 // Within the floor and not past the ceiling — skip.
297 continue;
298 }
299 }
300
301 let answer = match resolver(hostname) {
302 Ok(answer) => answer,
303 Err(_) => {
304 // Transient resolver failure — do not emit drift, do not
305 // update prior state, do not bump `last_refresh_at`.
306 continue;
307 }
308 };
309
310 let canonical_targets = canonicalize_targets(&answer.targets);
311 let previous_digest = if prior.previous_digest.is_empty() {
312 "empty".to_string()
313 } else {
314 prior.previous_digest.clone()
315 };
316
317 // scope: DNS rebinding mitigation. When the operator has
318 // declared a `rebindingPolicy`, evaluate the canonical targets
319 // against the per-cell rebinding state BEFORE digest / drift
320 // emission. The decision's `effective_targets` replaces
321 // `canonical_targets` for everything downstream — the digest,
322 // the drift event payload, and the persisted state — so the
323 // operator-visible `dns_authority_drift` reflects what the
324 // workload actually resolves to (and so a `reject_on_rebind`
325 // policy actually withholds attacker IPs from the workload's
326 // view).
327 //
328 // When `rebinding_policy is None`, the decision is a no-op:
329 // effective_targets = canonical_targets, no events fire.
330 let (current_targets, rebind_events) = match self.rebinding_policy {
331 Some(rb_policy) => {
332 let decision =
333 rebinding_state.evaluate(hostname, &canonical_targets, rb_policy);
334 let mut events: Vec<CloudEventV1> = Vec::new();
335
336 if decision.threshold_exceeded {
337 // One threshold event per hostname per tick. We
338 // pick the FIRST novel IP as `novelIp` for SIEM
339 // ergonomics (the cumulative count + max give the
340 // operator everything they need to triage).
341 if let Some(first_novel) = decision.novel_ips.first() {
342 let prior_count = rebinding_state.history(hostname).len() as u32;
343 let cumulative =
344 prior_count.saturating_add(decision.novel_ips.len() as u32);
345 let payload = DnsAuthorityRebindThreshold {
346 schema_version: "1.0.0".into(),
347 cell_id: cell_id.to_string(),
348 run_id: run_id.to_string(),
349 hostname: hostname.clone(),
350 novel_ip: (*first_novel).to_string(),
351 previous_ip_count: prior_count,
352 cumulative_ip_count: cumulative,
353 max_novel_ips_per_hostname: rb_policy.max_novel_ips_per_hostname,
354 policy_digest: self
355 .policy_digest
356 .map(str::to_string)
357 .unwrap_or_else(empty_policy_digest),
358 keyset_id: self.keyset_id.map(str::to_string),
359 issuer_kid: self.issuer_kid.map(str::to_string),
360 correlation_id: self.correlation_id.map(str::to_string),
361 resolver_id: if resolver_id_owned.is_empty() {
362 None
363 } else {
364 Some(resolver_id_owned.clone())
365 },
366 observed_at: observed_at_rfc3339.clone(),
367 };
368 let source = self.source.unwrap_or("cellos-supervisor");
369 if let Ok(ev) = cloud_event_v1_dns_authority_rebind_threshold(
370 source,
371 &observed_at_rfc3339,
372 &payload,
373 ) {
374 events.push(ev);
375 }
376 }
377 }
378
379 if !decision.allowlist_violations.is_empty() {
380 // One rejected event per offending IP. We pre-build
381 // the per-hostname allowlist echo once.
382 let echo: Vec<String> = rb_policy
383 .response_ip_allowlist
384 .iter()
385 .filter(|raw| {
386 raw.split_once(':')
387 .is_some_and(|(prefix, _)| prefix == hostname)
388 })
389 .cloned()
390 .collect();
391 let prior_count = rebinding_state.history(hostname).len() as u32;
392 let cumulative =
393 prior_count.saturating_add(decision.novel_ips.len() as u32);
394 for &offending in &decision.allowlist_violations {
395 let payload = DnsAuthorityRebindRejected {
396 schema_version: "1.0.0".into(),
397 cell_id: cell_id.to_string(),
398 run_id: run_id.to_string(),
399 hostname: hostname.clone(),
400 novel_ip: offending.to_string(),
401 previous_ip_count: prior_count,
402 cumulative_ip_count: cumulative,
403 response_ip_allowlist: echo.clone(),
404 policy_digest: self
405 .policy_digest
406 .map(str::to_string)
407 .unwrap_or_else(empty_policy_digest),
408 keyset_id: self.keyset_id.map(str::to_string),
409 issuer_kid: self.issuer_kid.map(str::to_string),
410 correlation_id: self.correlation_id.map(str::to_string),
411 resolver_id: if resolver_id_owned.is_empty() {
412 None
413 } else {
414 Some(resolver_id_owned.clone())
415 },
416 observed_at: observed_at_rfc3339.clone(),
417 };
418 let source = self.source.unwrap_or("cellos-supervisor");
419 if let Ok(ev) = cloud_event_v1_dns_authority_rebind_rejected(
420 source,
421 &observed_at_rfc3339,
422 &payload,
423 ) {
424 events.push(ev);
425 }
426 }
427 }
428
429 // Use the effective (post-rejection) targets for the
430 // drift event + state commit.
431 let effective = canonicalize_targets(&decision.effective_targets);
432 (effective, events)
433 }
434 None => (canonical_targets, Vec::new()),
435 };
436
437 let current_digest = digest_target_set(¤t_targets);
438
439 // Emit the rebinding events BEFORE the drift event so a SIEM
440 // sees them in causal order ("threshold/rejected → drift").
441 for ev in rebind_events {
442 out.push(ev);
443 }
444
445 // scope: DNS-rebinding-mitigation floor. `refreshPolicy.minTtlSeconds`
446 // doubles as a *clamp* on the upstream TTL we record: an operator
447 // who has explicitly set a positive floor refuses to honour
448 // sub-floor TTLs (typical fast-flux indicator). RFC 1035 §3.2.1
449 // permits resolvers to clamp; the cellos floor is conservative
450 // (it only raises sub-floor TTLs, never lowers above-floor ones).
451 // `min_ttl == 0` is the "no floor" sentinel — pass through verbatim.
452 let clamped_ttl: u32 = if min_ttl > 0 && answer.ttl_seconds < min_ttl {
453 min_ttl
454 } else {
455 answer.ttl_seconds
456 };
457
458 // Stale window observed: how long was the prior answer served past
459 // its TTL before this refresh fired? Phase 3: now uses the *real*
460 // prior TTL (was always 0 in Phase 1, making this trivially 0).
461 let stale_seconds: u32 = match (prior.last_refresh_at, prior.last_ttl_seconds) {
462 (Some(last), ttl) if ttl > 0 => {
463 let age = now.duration_since(last).unwrap_or_default().as_secs();
464 age.saturating_sub(u64::from(ttl)).min(u32::MAX as u64) as u32
465 }
466 _ => 0,
467 };
468
469 // Only emit a drift event when the digest actually changed.
470 if current_digest != previous_digest {
471 let prev_set: BTreeSet<&String> = prior.previous_targets.iter().collect();
472 let curr_set: BTreeSet<&String> = current_targets.iter().collect();
473 let added: Vec<String> = curr_set
474 .difference(&prev_set)
475 .map(|s| (*s).clone())
476 .collect();
477 let removed: Vec<String> = prev_set
478 .difference(&curr_set)
479 .map(|s| (*s).clone())
480 .collect();
481
482 let drift = DnsAuthorityDrift {
483 schema_version: "1.0.0".into(),
484 cell_id: cell_id.to_string(),
485 run_id: run_id.to_string(),
486 resolver_id: resolver_id_owned.clone(),
487 hostname: hostname.clone(),
488 previous_targets: prior.previous_targets.clone(),
489 current_targets: current_targets.clone(),
490 added_targets: added,
491 removed_targets: removed,
492 previous_digest,
493 current_digest: current_digest.clone(),
494 // scope: real upstream TTL via hickory-resolver, clamped
495 // to `refreshPolicy.minTtlSeconds` when the operator has
496 // declared a floor (DNS-rebinding fast-flux mitigation).
497 ttl_seconds: clamped_ttl,
498 stale_seconds,
499 keyset_id: self.keyset_id.map(str::to_string),
500 issuer_kid: self.issuer_kid.map(str::to_string),
501 policy_digest: self.policy_digest.map(str::to_string),
502 correlation_id: self.correlation_id.map(str::to_string),
503 // SEC-21 Phase 3h — `tick_with_rebinding` (the
504 // pre-DNSSEC path) leaves dnssec_status as `None`
505 // so the schema treats the field as omitted. The
506 // dnssec-aware sibling `tick_with_dnssec` populates
507 // it with one of the four enum literals.
508 dnssec_status: None,
509 observed_at: observed_at_rfc3339.clone(),
510 };
511
512 let source = self.source.unwrap_or("cellos-supervisor");
513 match cloud_event_v1_dns_authority_drift(source, &observed_at_rfc3339, &drift) {
514 Ok(ev) => out.push(ev),
515 Err(e) => {
516 // Emit-side serialization failure is reported via tracing
517 // by the caller; the resolver-refresh module is pure
518 // data + has no logger. Drop the event silently here.
519 let _ = e;
520 }
521 }
522 }
523
524 // Always update state on a successful resolution — even when the
525 // digest matched — so `last_refresh_at` reflects the latest probe.
526 // We persist the *clamped* TTL so the next stale-window calc uses
527 // the floor-honouring value, matching the event we just emitted.
528 state.hosts.insert(
529 hostname.clone(),
530 HostState {
531 previous_targets: current_targets.clone(),
532 previous_digest: current_digest,
533 last_refresh_at: Some(now),
534 last_ttl_seconds: clamped_ttl,
535 },
536 );
537
538 // scope: commit the per-hostname rebinding observation.
539 // Takes the EFFECTIVE targets (post-rejection in enforce mode)
540 // so the cumulative history reflects what the workload saw.
541 // No-op when `rebinding_policy is None`.
542 if self.rebinding_policy.is_some() {
543 rebinding_state.commit(hostname, ¤t_targets);
544 }
545 }
546
547 out
548 }
549}
550
// SEC-21 Phase 3h — `dnssec_status` enum literals for the
// `dns_authority_drift` event payload. Centralized so the ticker tests
// can match on the exact string the schema expects without re-typing it.

/// Status selected by `tick_with_dnssec` when a DNSSEC policy is configured
/// and the answer's validation result is `Validated`.
pub const DNSSEC_STATUS_VALIDATED: &str = "validated";
/// Status selected by `tick_with_dnssec` when a DNSSEC policy is configured
/// and the answer's validation result is `Failed`.
pub const DNSSEC_STATUS_VALIDATION_FAILED: &str = "validation_failed";
/// Status selected by `tick_with_dnssec` when a DNSSEC policy is configured
/// and the answer's validation result is `Unsigned`.
pub const DNSSEC_STATUS_UNSIGNED: &str = "unsigned";
/// Status selected by `tick_with_dnssec` when no DNSSEC policy is configured,
/// regardless of the validation result attached to the answer.
pub const DNSSEC_STATUS_NOT_ATTEMPTED: &str = "not_attempted";
558
559impl<'a> ResolverRefresh<'a> {
560 /// SEC-21 Phase 3h variant of [`Self::tick_with_rebinding`] that
561 /// also processes DNSSEC validation outcomes per hostname.
562 ///
563 /// Behaviour:
564 ///
565 /// - When `self.dnssec_policy` is `None`, this method behaves exactly
566 /// like [`Self::tick_with_rebinding`] except it pulls per-hostname
567 /// answers from `validated_resolved` and stamps
568 /// `dnssec_status: not_attempted` on every emitted drift event.
569 /// - When `self.dnssec_policy` is `Some(policy)`:
570 /// - `Validated` → `dnssec_status: validated`, normal flow.
571 /// - `Failed{reason}` → emit `dns_authority_dnssec_failed`
572 /// (reason: `validation_failed`); when `policy.fail_closed`,
573 /// drop `answer.targets` to empty BEFORE the rebinding
574 /// evaluator runs and tag drift `dnssec_status: validation_failed`.
575 /// - `Unsigned` → emit `dns_authority_dnssec_failed`
576 /// (reason: `unsigned_zone`); same `fail_closed` semantics.
577 ///
578 /// Order: dnssec_failed events are emitted BEFORE the drift /
579 /// rebinding events for that hostname so a SIEM sees them in
580 /// causal order ("dnssec_failed → drift").
581 pub fn tick_with_dnssec(
582 &self,
583 state: &mut ResolverState,
584 rebinding_state: &mut RebindingState,
585 validated_resolved: &std::collections::HashMap<String, io::Result<ValidatedResolvedAnswer>>,
586 now: SystemTime,
587 cell_id: &str,
588 run_id: &str,
589 ) -> Vec<CloudEventV1> {
590 let mut out: Vec<CloudEventV1> = Vec::new();
591
592 let strategy = self
593 .policy
594 .and_then(|p| p.strategy)
595 .unwrap_or(DnsRefreshStrategy::TtlHonor);
596 if matches!(strategy, DnsRefreshStrategy::Manual) {
597 return out;
598 }
599
600 let min_ttl = self.policy.and_then(|p| p.min_ttl_seconds).unwrap_or(0);
601 let max_stale = self.policy.and_then(|p| p.max_stale_seconds);
602
603 let resolver_id_owned: String = self
604 .resolvers
605 .first()
606 .map(|r| r.resolver_id.clone())
607 .unwrap_or_default();
608 let observed_at_rfc3339 = system_time_to_rfc3339(now);
609 let trust_anchor_source: String = self
610 .trust_anchors
611 .map(|ta| ta.source.clone())
612 .unwrap_or_else(|| dnssec::TRUST_ANCHOR_SOURCE_IANA_DEFAULT.to_string());
613 let policy_active = self.dnssec_policy.is_some();
614 let fail_closed = self.dnssec_policy.map(|p| p.fail_closed).unwrap_or(false);
615
616 for hostname in self.hostnames {
617 let prior = state.hosts.get(hostname).cloned().unwrap_or_default();
618
619 // Floor / ceiling — same predicate as the non-DNSSEC path.
620 if let Some(last) = prior.last_refresh_at {
621 let age = now.duration_since(last).unwrap_or_default();
622 let age_secs = age.as_secs();
623 let floor_active = age_secs < u64::from(min_ttl);
624 let ceiling_breached = max_stale.is_some_and(|ms| age_secs >= u64::from(ms));
625 if floor_active && !ceiling_breached {
626 continue;
627 }
628 }
629
630 let validated = match validated_resolved.get(hostname) {
631 Some(Ok(v)) => v.clone(),
632 Some(Err(_)) | None => {
633 // Transient resolver failure or missing entry — same
634 // semantics as the unvalidated path: no drift, no
635 // state mutation, no DNSSEC event (can't tell what
636 // went wrong).
637 continue;
638 }
639 };
640
641 // Decide the dnssec_status string + per-hostname effective
642 // answer (drop on fail_closed) + whether to emit a
643 // dns_authority_dnssec_failed event.
644 let (dnssec_status, dnssec_event, effective_answer): (
645 &'static str,
646 Option<CloudEventV1>,
647 ResolvedAnswer,
648 ) = match (&validated.validation, policy_active) {
649 (DnssecValidationResult::Validated { .. }, true) => {
650 (DNSSEC_STATUS_VALIDATED, None, validated.answer.clone())
651 }
652 (DnssecValidationResult::Failed { reason }, true) => {
653 let payload = DnsAuthorityDnssecFailed {
654 schema_version: "1.0.0".into(),
655 cell_id: cell_id.to_string(),
656 run_id: run_id.to_string(),
657 resolver_id: resolver_id_owned.clone(),
658 hostname: hostname.clone(),
659 reason: DnsAuthorityDnssecFailureReason::ValidationFailed
660 .as_str()
661 .to_string(),
662 fail_closed,
663 trust_anchor_source: trust_anchor_source.clone(),
664 policy_digest: self.policy_digest.map(str::to_string),
665 keyset_id: self.keyset_id.map(str::to_string),
666 issuer_kid: self.issuer_kid.map(str::to_string),
667 correlation_id: self.correlation_id.map(str::to_string),
668 // SEC-21 Phase 3h.1 — additive `source` field. The
669 // resolver-refresh path always stamps this literal.
670 // The dataplane (in-netns DNS proxy) emits the same
671 // event with `source = "dataplane"` from
672 // `dns_proxy::dnssec`.
673 source: Some("resolver_refresh".into()),
674 observed_at: observed_at_rfc3339.clone(),
675 };
676 let _ = reason; // reason is logged by the caller via tracing if needed
677 let source = self.source.unwrap_or("cellos-supervisor");
678 let event = cloud_event_v1_dns_authority_dnssec_failed(
679 source,
680 &observed_at_rfc3339,
681 &payload,
682 )
683 .ok();
684 let answer = if fail_closed {
685 ResolvedAnswer {
686 targets: Vec::new(),
687 ttl_seconds: validated.answer.ttl_seconds,
688 resolver_addr: validated.answer.resolver_addr,
689 }
690 } else {
691 validated.answer.clone()
692 };
693 (DNSSEC_STATUS_VALIDATION_FAILED, event, answer)
694 }
695 (DnssecValidationResult::Unsigned, true) => {
696 let payload = DnsAuthorityDnssecFailed {
697 schema_version: "1.0.0".into(),
698 cell_id: cell_id.to_string(),
699 run_id: run_id.to_string(),
700 resolver_id: resolver_id_owned.clone(),
701 hostname: hostname.clone(),
702 reason: DnsAuthorityDnssecFailureReason::UnsignedZone
703 .as_str()
704 .to_string(),
705 fail_closed,
706 trust_anchor_source: trust_anchor_source.clone(),
707 policy_digest: self.policy_digest.map(str::to_string),
708 keyset_id: self.keyset_id.map(str::to_string),
709 issuer_kid: self.issuer_kid.map(str::to_string),
710 correlation_id: self.correlation_id.map(str::to_string),
711 // SEC-21 Phase 3h.1 — additive `source` field; see
712 // matching comment on the Failed-arm above.
713 source: Some("resolver_refresh".into()),
714 observed_at: observed_at_rfc3339.clone(),
715 };
716 let source = self.source.unwrap_or("cellos-supervisor");
717 let event = cloud_event_v1_dns_authority_dnssec_failed(
718 source,
719 &observed_at_rfc3339,
720 &payload,
721 )
722 .ok();
723 let answer = if fail_closed {
724 ResolvedAnswer {
725 targets: Vec::new(),
726 ttl_seconds: validated.answer.ttl_seconds,
727 resolver_addr: validated.answer.resolver_addr,
728 }
729 } else {
730 validated.answer.clone()
731 };
732 (DNSSEC_STATUS_UNSIGNED, event, answer)
733 }
734 // policy off → status:not_attempted, no event, pass-through answer.
735 (_, false) => (DNSSEC_STATUS_NOT_ATTEMPTED, None, validated.answer.clone()),
736 };
737
738 // Emit dnssec_failed BEFORE the drift event so SIEM sees
739 // causal order. When `dnssec_event` is None this is a no-op.
740 if let Some(ev) = dnssec_event {
741 out.push(ev);
742 }
743
744 // From here, the path mirrors `tick_with_rebinding` exactly,
745 // operating on `effective_answer` (the post-DNSSEC-policy
746 // ResolvedAnswer).
747 let canonical_targets = canonicalize_targets(&effective_answer.targets);
748 let previous_digest = if prior.previous_digest.is_empty() {
749 "empty".to_string()
750 } else {
751 prior.previous_digest.clone()
752 };
753
754 let (current_targets, rebind_events) = match self.rebinding_policy {
755 Some(rb_policy) => {
756 let decision =
757 rebinding_state.evaluate(hostname, &canonical_targets, rb_policy);
758 let mut events: Vec<CloudEventV1> = Vec::new();
759 if decision.threshold_exceeded {
760 if let Some(first_novel) = decision.novel_ips.first() {
761 let prior_count = rebinding_state.history(hostname).len() as u32;
762 let cumulative =
763 prior_count.saturating_add(decision.novel_ips.len() as u32);
764 let payload = DnsAuthorityRebindThreshold {
765 schema_version: "1.0.0".into(),
766 cell_id: cell_id.to_string(),
767 run_id: run_id.to_string(),
768 hostname: hostname.clone(),
769 novel_ip: (*first_novel).to_string(),
770 previous_ip_count: prior_count,
771 cumulative_ip_count: cumulative,
772 max_novel_ips_per_hostname: rb_policy.max_novel_ips_per_hostname,
773 policy_digest: self
774 .policy_digest
775 .map(str::to_string)
776 .unwrap_or_else(empty_policy_digest),
777 keyset_id: self.keyset_id.map(str::to_string),
778 issuer_kid: self.issuer_kid.map(str::to_string),
779 correlation_id: self.correlation_id.map(str::to_string),
780 resolver_id: if resolver_id_owned.is_empty() {
781 None
782 } else {
783 Some(resolver_id_owned.clone())
784 },
785 observed_at: observed_at_rfc3339.clone(),
786 };
787 let source = self.source.unwrap_or("cellos-supervisor");
788 if let Ok(ev) = cloud_event_v1_dns_authority_rebind_threshold(
789 source,
790 &observed_at_rfc3339,
791 &payload,
792 ) {
793 events.push(ev);
794 }
795 }
796 }
797 if !decision.allowlist_violations.is_empty() {
798 let echo: Vec<String> = rb_policy
799 .response_ip_allowlist
800 .iter()
801 .filter(|raw| {
802 raw.split_once(':')
803 .is_some_and(|(prefix, _)| prefix == hostname)
804 })
805 .cloned()
806 .collect();
807 let prior_count = rebinding_state.history(hostname).len() as u32;
808 let cumulative =
809 prior_count.saturating_add(decision.novel_ips.len() as u32);
810 for &offending in &decision.allowlist_violations {
811 let payload = DnsAuthorityRebindRejected {
812 schema_version: "1.0.0".into(),
813 cell_id: cell_id.to_string(),
814 run_id: run_id.to_string(),
815 hostname: hostname.clone(),
816 novel_ip: offending.to_string(),
817 previous_ip_count: prior_count,
818 cumulative_ip_count: cumulative,
819 response_ip_allowlist: echo.clone(),
820 policy_digest: self
821 .policy_digest
822 .map(str::to_string)
823 .unwrap_or_else(empty_policy_digest),
824 keyset_id: self.keyset_id.map(str::to_string),
825 issuer_kid: self.issuer_kid.map(str::to_string),
826 correlation_id: self.correlation_id.map(str::to_string),
827 resolver_id: if resolver_id_owned.is_empty() {
828 None
829 } else {
830 Some(resolver_id_owned.clone())
831 },
832 observed_at: observed_at_rfc3339.clone(),
833 };
834 let source = self.source.unwrap_or("cellos-supervisor");
835 if let Ok(ev) = cloud_event_v1_dns_authority_rebind_rejected(
836 source,
837 &observed_at_rfc3339,
838 &payload,
839 ) {
840 events.push(ev);
841 }
842 }
843 }
844 let effective = canonicalize_targets(&decision.effective_targets);
845 (effective, events)
846 }
847 None => (canonical_targets, Vec::new()),
848 };
849
850 let current_digest = digest_target_set(¤t_targets);
851
852 for ev in rebind_events {
853 out.push(ev);
854 }
855
856 let clamped_ttl: u32 = if min_ttl > 0 && effective_answer.ttl_seconds < min_ttl {
857 min_ttl
858 } else {
859 effective_answer.ttl_seconds
860 };
861
862 let stale_seconds: u32 = match (prior.last_refresh_at, prior.last_ttl_seconds) {
863 (Some(last), ttl) if ttl > 0 => {
864 let age = now.duration_since(last).unwrap_or_default().as_secs();
865 age.saturating_sub(u64::from(ttl)).min(u32::MAX as u64) as u32
866 }
867 _ => 0,
868 };
869
870 if current_digest != previous_digest {
871 let prev_set: BTreeSet<&String> = prior.previous_targets.iter().collect();
872 let curr_set: BTreeSet<&String> = current_targets.iter().collect();
873 let added: Vec<String> = curr_set
874 .difference(&prev_set)
875 .map(|s| (*s).clone())
876 .collect();
877 let removed: Vec<String> = prev_set
878 .difference(&curr_set)
879 .map(|s| (*s).clone())
880 .collect();
881
882 let drift = DnsAuthorityDrift {
883 schema_version: "1.0.0".into(),
884 cell_id: cell_id.to_string(),
885 run_id: run_id.to_string(),
886 resolver_id: resolver_id_owned.clone(),
887 hostname: hostname.clone(),
888 previous_targets: prior.previous_targets.clone(),
889 current_targets: current_targets.clone(),
890 added_targets: added,
891 removed_targets: removed,
892 previous_digest,
893 current_digest: current_digest.clone(),
894 ttl_seconds: clamped_ttl,
895 stale_seconds,
896 keyset_id: self.keyset_id.map(str::to_string),
897 issuer_kid: self.issuer_kid.map(str::to_string),
898 policy_digest: self.policy_digest.map(str::to_string),
899 correlation_id: self.correlation_id.map(str::to_string),
900 dnssec_status: Some(dnssec_status.to_string()),
901 observed_at: observed_at_rfc3339.clone(),
902 };
903
904 let source = self.source.unwrap_or("cellos-supervisor");
905 if let Ok(ev) =
906 cloud_event_v1_dns_authority_drift(source, &observed_at_rfc3339, &drift)
907 {
908 out.push(ev);
909 }
910 }
911
912 state.hosts.insert(
913 hostname.clone(),
914 HostState {
915 previous_targets: current_targets.clone(),
916 previous_digest: current_digest,
917 last_refresh_at: Some(now),
918 last_ttl_seconds: clamped_ttl,
919 },
920 );
921
922 if self.rebinding_policy.is_some() {
923 rebinding_state.commit(hostname, ¤t_targets);
924 }
925 }
926
927 out
928 }
929}
930
/// Fallback `policyDigest` for the SEC-21 Phase 3e rebinding events.
///
/// The rebinding payloads call this when [`ResolverRefresh`] carries no
/// `policy_digest`. The event schema requires a digest, so emission proceeds
/// with the deterministic SHA-256 of the empty string instead of failing.
/// Wire `policy_digest` into [`ResolverRefresh`] to get the real digest;
/// production call sites are documented as always supplying it, which leaves
/// this sentinel for unit tests that omit it.
fn empty_policy_digest() -> String {
    // sha256("") rendered in the module's `sha256:<hex>` digest form.
    const EMPTY_INPUT_DIGEST: &str =
        "sha256:e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855";
    EMPTY_INPUT_DIGEST.to_owned()
}
941
/// Produce the canonical form of a target set.
///
/// Each entry is whitespace-trimmed; entries that are empty after trimming
/// are discarded. The survivors are deduplicated and returned in ascending
/// (byte-wise `String`) order.
fn canonicalize_targets(raw: &[String]) -> Vec<String> {
    // The ordered set performs both the dedupe and the sort in one pass.
    let unique: BTreeSet<String> = raw
        .iter()
        .map(|target| target.trim())
        .filter(|target| !target.is_empty())
        .map(String::from)
        .collect();
    unique.into_iter().collect()
}
953
954/// Compute `sha256:<hex>` over the canonicalized target list joined with `\n`.
955/// `[]` (empty set) hashes deterministically — every empty set produces the
956/// same digest.
957fn digest_target_set(canonical: &[String]) -> String {
958 let mut hasher = Sha256::new();
959 for (i, t) in canonical.iter().enumerate() {
960 if i > 0 {
961 hasher.update(b"\n");
962 }
963 hasher.update(t.as_bytes());
964 }
965 let bytes = hasher.finalize();
966 let mut out = String::with_capacity(7 + 64);
967 out.push_str("sha256:");
968 for b in bytes {
969 use std::fmt::Write;
970 let _ = write!(out, "{b:02x}");
971 }
972 out
973}
974
975/// Convert a `SystemTime` to an RFC3339 string. Avoids pulling chrono into the
976/// pure-data section of the module — the supervisor passes its own
977/// chrono-based timestamp through `now` for production paths.
978fn system_time_to_rfc3339(t: SystemTime) -> String {
979 let secs = t
980 .duration_since(UNIX_EPOCH)
981 .unwrap_or(Duration::ZERO)
982 .as_secs() as i64;
983 let dt =
984 chrono::DateTime::<chrono::Utc>::from_timestamp(secs, 0).unwrap_or_else(chrono::Utc::now);
985 dt.to_rfc3339()
986}
987
988#[cfg(test)]
989mod tests {
990 use super::*;
991 use std::cell::RefCell;
992 use std::net::{IpAddr, Ipv4Addr, SocketAddr};
993 use std::time::Duration;
994
995 /// Test helper — wrap a `Vec<String>` answer in the Phase 3 [`ResolvedAnswer`]
996 /// shape. The pre-Phase-3 tests didn't care about TTL, so we default to 0
997 /// here and let the Phase 3 ticker tests build their own answers with
998 /// real TTLs.
999 fn answer_zero_ttl(targets: Vec<String>) -> ResolvedAnswer {
1000 ResolvedAnswer {
1001 targets,
1002 ttl_seconds: 0,
1003 resolver_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 53),
1004 }
1005 }
1006
1007 fn fixed_resolvers() -> Vec<DnsResolver> {
1008 vec![DnsResolver {
1009 resolver_id: "resolver-test-001".into(),
1010 endpoint: "1.1.1.1:53".into(),
1011 protocol: cellos_core::DnsResolverProtocol::Do53Udp,
1012 trust_kid: None,
1013 dnssec: None,
1014 }]
1015 }
1016
1017 fn make<'a>(
1018 policy: Option<&'a DnsRefreshPolicy>,
1019 resolvers: &'a [DnsResolver],
1020 hostnames: &'a [String],
1021 ) -> ResolverRefresh<'a> {
1022 ResolverRefresh {
1023 policy,
1024 rebinding_policy: None,
1025 resolvers,
1026 hostnames,
1027 keyset_id: Some("keyset-test-001"),
1028 issuer_kid: Some("kid-test-001"),
1029 policy_digest: None,
1030 correlation_id: None,
1031 source: Some("cellos-supervisor-test"),
1032 // SEC-21 Phase 3h test default — DNSSEC off so the existing
1033 // P3a tests (~17 of them, untouched) keep their original
1034 // semantics. The DNSSEC-specific tests below build their
1035 // own ResolverRefresh with the policy populated.
1036 dnssec_policy: None,
1037 trust_anchors: None,
1038 }
1039 }
1040
1041 #[test]
1042 fn canonicalize_dedupes_and_sorts() {
1043 let raw: Vec<String> = vec!["1.0.0.1".into(), "1.1.1.1".into(), "1.0.0.1".into()];
1044 let canon = canonicalize_targets(&raw);
1045 assert_eq!(canon, vec!["1.0.0.1".to_string(), "1.1.1.1".to_string()]);
1046 }
1047
1048 #[test]
1049 fn digest_changes_when_targets_change() {
1050 let a = canonicalize_targets(&["1.1.1.1".to_string()]);
1051 let b = canonicalize_targets(&["1.0.0.1".to_string()]);
1052 assert_ne!(digest_target_set(&a), digest_target_set(&b));
1053 }
1054
1055 #[test]
1056 fn first_observation_emits_drift_with_empty_previous() {
1057 let hostnames = vec!["api.example.com".to_string()];
1058 let resolvers = fixed_resolvers();
1059 let refresher = make(None, &resolvers, &hostnames);
1060 let mut state = ResolverState::new();
1061 let resolver = |_h: &str| Ok(answer_zero_ttl(vec!["1.1.1.1".to_string()]));
1062 let events = refresher.tick(
1063 &mut state,
1064 &resolver,
1065 SystemTime::UNIX_EPOCH + Duration::from_secs(1_700_000_000),
1066 "cell-001",
1067 "run-001",
1068 );
1069 assert_eq!(events.len(), 1, "first observation must emit drift");
1070 let data = events[0].data.as_ref().expect("data present");
1071 assert_eq!(data["previousDigest"], "empty");
1072 assert_eq!(data["addedTargets"], serde_json::json!(["1.1.1.1"]));
1073 assert!(data["removedTargets"].as_array().unwrap().is_empty());
1074 }
1075
1076 #[test]
1077 fn stable_targets_emit_no_drift_after_first_observation() {
1078 let hostnames = vec!["api.example.com".to_string()];
1079 let resolvers = fixed_resolvers();
1080 let policy = DnsRefreshPolicy {
1081 min_ttl_seconds: Some(0),
1082 max_stale_seconds: None,
1083 strategy: None,
1084 };
1085 let refresher = make(Some(&policy), &resolvers, &hostnames);
1086 let mut state = ResolverState::new();
1087 let resolver = |_h: &str| Ok(answer_zero_ttl(vec!["1.1.1.1".to_string()]));
1088
1089 let now = SystemTime::UNIX_EPOCH + Duration::from_secs(1_700_000_000);
1090 let _ = refresher.tick(&mut state, &resolver, now, "c", "r");
1091 let events = refresher.tick(
1092 &mut state,
1093 &resolver,
1094 now + Duration::from_secs(60),
1095 "c",
1096 "r",
1097 );
1098 assert!(events.is_empty(), "stable targets must not emit drift");
1099 }
1100
1101 #[test]
1102 fn manual_strategy_emits_nothing() {
1103 let hostnames = vec!["api.example.com".to_string()];
1104 let resolvers = fixed_resolvers();
1105 let policy = DnsRefreshPolicy {
1106 min_ttl_seconds: None,
1107 max_stale_seconds: None,
1108 strategy: Some(DnsRefreshStrategy::Manual),
1109 };
1110 let refresher = make(Some(&policy), &resolvers, &hostnames);
1111 let mut state = ResolverState::new();
1112 let resolver = |_h: &str| Ok(answer_zero_ttl(vec!["1.1.1.1".to_string()]));
1113 let events = refresher.tick(
1114 &mut state,
1115 &resolver,
1116 SystemTime::UNIX_EPOCH + Duration::from_secs(1),
1117 "c",
1118 "r",
1119 );
1120 assert!(events.is_empty(), "manual strategy must skip ticks");
1121 assert!(state.is_empty(), "state must remain untouched");
1122 }
1123
1124 #[test]
1125 fn floor_skips_refresh_within_min_ttl() {
1126 let hostnames = vec!["api.example.com".to_string()];
1127 let resolvers = fixed_resolvers();
1128 let policy = DnsRefreshPolicy {
1129 min_ttl_seconds: Some(300),
1130 max_stale_seconds: None,
1131 strategy: None,
1132 };
1133 let refresher = make(Some(&policy), &resolvers, &hostnames);
1134 let mut state = ResolverState::new();
1135 let calls: RefCell<u32> = RefCell::new(0);
1136 let resolver = |_h: &str| -> io::Result<ResolvedAnswer> {
1137 *calls.borrow_mut() += 1;
1138 Ok(answer_zero_ttl(vec!["1.1.1.1".to_string()]))
1139 };
1140
1141 let t0 = SystemTime::UNIX_EPOCH + Duration::from_secs(1_700_000_000);
1142 let _ = refresher.tick(&mut state, &resolver, t0, "c", "r");
1143 // 60s later — well within min_ttl_seconds (300) — must skip.
1144 let _ = refresher.tick(
1145 &mut state,
1146 &resolver,
1147 t0 + Duration::from_secs(60),
1148 "c",
1149 "r",
1150 );
1151 assert_eq!(
1152 *calls.borrow(),
1153 1,
1154 "floor must skip the second resolver call"
1155 );
1156 }
1157
1158 #[test]
1159 fn ceiling_forces_refresh_after_max_stale() {
1160 let hostnames = vec!["api.example.com".to_string()];
1161 let resolvers = fixed_resolvers();
1162 let policy = DnsRefreshPolicy {
1163 min_ttl_seconds: Some(3_000),
1164 max_stale_seconds: Some(60),
1165 strategy: None,
1166 };
1167 let refresher = make(Some(&policy), &resolvers, &hostnames);
1168 let mut state = ResolverState::new();
1169 let calls: RefCell<u32> = RefCell::new(0);
1170 let resolver = |_h: &str| -> io::Result<ResolvedAnswer> {
1171 *calls.borrow_mut() += 1;
1172 Ok(answer_zero_ttl(vec!["1.1.1.1".to_string()]))
1173 };
1174
1175 let t0 = SystemTime::UNIX_EPOCH + Duration::from_secs(1_700_000_000);
1176 let _ = refresher.tick(&mut state, &resolver, t0, "c", "r");
1177 // 120s later — past max_stale (60s) — ceiling must override the floor.
1178 let _ = refresher.tick(
1179 &mut state,
1180 &resolver,
1181 t0 + Duration::from_secs(120),
1182 "c",
1183 "r",
1184 );
1185 assert_eq!(
1186 *calls.borrow(),
1187 2,
1188 "ceiling must override the floor and force a refresh"
1189 );
1190 }
1191
1192 #[test]
1193 fn resolver_failure_does_not_emit_drift() {
1194 let hostnames = vec!["api.example.com".to_string()];
1195 let resolvers = fixed_resolvers();
1196 let refresher = make(None, &resolvers, &hostnames);
1197 let mut state = ResolverState::new();
1198 let resolver =
1199 |_h: &str| -> io::Result<ResolvedAnswer> { Err(io::Error::other("transient")) };
1200 let events = refresher.tick(
1201 &mut state,
1202 &resolver,
1203 SystemTime::UNIX_EPOCH + Duration::from_secs(1),
1204 "c",
1205 "r",
1206 );
1207 assert!(
1208 events.is_empty(),
1209 "transient resolver error must not be reported as drift"
1210 );
1211 assert!(state.is_empty(), "failed lookup must not commit state");
1212 }
1213
1214 // ============================================================
1215 // scope: TTL-clamp tests for `refreshPolicy.minTtlSeconds`.
1216 // The clamp is the partial DNS-rebinding mitigation: an operator
1217 // who sets `minTtlSeconds: 60` refuses to honour TTL=0 fast-flux
1218 // records, which would otherwise force per-millisecond cache
1219 // invalidation. RFC 1035 §3.2.1 permits the clamp; only the
1220 // recorded `ttlSeconds` field is affected — the resolver still
1221 // probes upstream on every tick.
1222 // ============================================================
1223
1224 fn answer_with_ttl(targets: Vec<String>, ttl: u32) -> ResolvedAnswer {
1225 ResolvedAnswer {
1226 targets,
1227 ttl_seconds: ttl,
1228 resolver_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 53),
1229 }
1230 }
1231
1232 #[test]
1233 fn ticker_floors_ttl_to_min_ttl_seconds() {
1234 let hostnames = vec!["api.example.com".to_string()];
1235 let resolvers = fixed_resolvers();
1236 // Operator floor at 60s; upstream returns TTL=5 (suspect).
1237 let policy = DnsRefreshPolicy {
1238 min_ttl_seconds: Some(60),
1239 max_stale_seconds: None,
1240 strategy: None,
1241 };
1242 let refresher = make(Some(&policy), &resolvers, &hostnames);
1243 let mut state = ResolverState::new();
1244 let resolver = |_h: &str| -> io::Result<ResolvedAnswer> {
1245 Ok(answer_with_ttl(vec!["1.1.1.1".to_string()], 5))
1246 };
1247 let events = refresher.tick(
1248 &mut state,
1249 &resolver,
1250 SystemTime::UNIX_EPOCH + Duration::from_secs(1_700_000_000),
1251 "c",
1252 "r",
1253 );
1254 assert_eq!(events.len(), 1, "first observation emits drift");
1255 let data = events[0].data.as_ref().expect("data");
1256 assert_eq!(
1257 data["ttlSeconds"], 60,
1258 "sub-floor TTL must be clamped up to min_ttl_seconds"
1259 );
1260 }
1261
1262 #[test]
1263 fn ticker_passes_through_ttl_above_floor() {
1264 let hostnames = vec!["api.example.com".to_string()];
1265 let resolvers = fixed_resolvers();
1266 // Operator floor at 60s; upstream returns TTL=300 (above the floor).
1267 let policy = DnsRefreshPolicy {
1268 min_ttl_seconds: Some(60),
1269 max_stale_seconds: None,
1270 strategy: None,
1271 };
1272 let refresher = make(Some(&policy), &resolvers, &hostnames);
1273 let mut state = ResolverState::new();
1274 let resolver = |_h: &str| -> io::Result<ResolvedAnswer> {
1275 Ok(answer_with_ttl(vec!["1.1.1.1".to_string()], 300))
1276 };
1277 let events = refresher.tick(
1278 &mut state,
1279 &resolver,
1280 SystemTime::UNIX_EPOCH + Duration::from_secs(1_700_000_000),
1281 "c",
1282 "r",
1283 );
1284 let data = events[0].data.as_ref().expect("data");
1285 assert_eq!(
1286 data["ttlSeconds"], 300,
1287 "above-floor TTL must pass through verbatim"
1288 );
1289 }
1290
1291 #[test]
1292 fn ticker_zero_min_ttl_means_no_floor() {
1293 let hostnames = vec!["api.example.com".to_string()];
1294 let resolvers = fixed_resolvers();
1295 // Operator declared `minTtlSeconds: 0` — the "no floor" sentinel.
1296 // Sub-second TTLs from upstream pass through unmolested so the
1297 // operator can observe them.
1298 let policy = DnsRefreshPolicy {
1299 min_ttl_seconds: Some(0),
1300 max_stale_seconds: None,
1301 strategy: None,
1302 };
1303 let refresher = make(Some(&policy), &resolvers, &hostnames);
1304 let mut state = ResolverState::new();
1305 let resolver = |_h: &str| -> io::Result<ResolvedAnswer> {
1306 Ok(answer_with_ttl(vec!["1.1.1.1".to_string()], 1))
1307 };
1308 let events = refresher.tick(
1309 &mut state,
1310 &resolver,
1311 SystemTime::UNIX_EPOCH + Duration::from_secs(1_700_000_000),
1312 "c",
1313 "r",
1314 );
1315 let data = events[0].data.as_ref().expect("data");
1316 assert_eq!(
1317 data["ttlSeconds"], 1,
1318 "min_ttl_seconds=0 means no clamp — pass through TTL=1"
1319 );
1320 }
1321}