Skip to main content

cellos_supervisor/resolver_refresh/
mod.rs

1//! SEC-21 host-controlled resolver refresh + DNS authority drift detection.
2//!
3//! This module is the dataplane companion to the W1 T13 contract layer
4//! (`spec.authority.dnsAuthority.refreshPolicy`). On each tick it:
5//!
6//! 1. For every declared hostname, calls an injectable resolver function and
7//!    receives a (possibly empty) set of target strings.
8//! 2. Compares the answer against the prior observation in [`ResolverState`].
9//! 3. When the target set has changed, emits a
10//!    `dev.cellos.events.cell.observability.v1.dns_authority_drift` CloudEvent.
11//! 4. Honors `policy.minTtlSeconds` (floor — skip refreshes within this window)
12//!    and `policy.maxStaleSeconds` (ceiling — force refresh once exceeded).
13//! 5. With `strategy: manual`, automatic ticks are skipped — caller must opt
14//!    in explicitly.
15//!
//! The resolver function is injected via the [`ResolverFn`] type alias so unit
//! tests can run without contacting the network. The supervisor wires the
//! production path to a `hickory-resolver`-backed lookup (see
//! [`hickory_resolve::resolve_with_ttl`]); see [`crate::supervisor`] for the
//! gating behind `CELLOS_DNS_AUTHORITY_REFRESH=1`.
20//!
21//! ## Scope
22//!
23//! - **Phase 1 (W2 SEC-21, shipped):** the supervisor calls
24//!   [`ResolverRefresh::tick`] exactly once at startup, before the workload
25//!   runs. `tick()` itself stays sync — callers run it from a context where
26//!   blocking is acceptable, e.g. inside `tokio::task::spawn_blocking`.
//! - **Phase 2 (shipped):** the [`ticker`] submodule wraps
28//!   `tick()` in a long-running tokio task that fires every
29//!   `min(refreshPolicy.minTtlSeconds, 60s).max(5s)` for the lifetime of
30//!   the cell. The ticker owns its own [`ResolverState`] (independent of
31//!   the startup tick) and dispatches drift events through a sync
32//!   [`ticker::DriftEmitter`] adapter — see [`sink_emitter`] for the
33//!   sync→async bridge to [`cellos_core::ports::EventSink`].
34//! - **Phase 3 (shipped):** real upstream TTL via [`hickory_resolve`]. The
35//!   resolver function now returns a [`ResolvedAnswer`] carrying both the
36//!   target set AND the minimum TTL the upstream nameserver advertised. Two
37//!   downstream effects:
38//!
39//!     1. The `dns_authority_drift` event's `ttlSeconds` field is now real
40//!        (was always `0`); `staleSeconds` becomes meaningful when the
41//!        ticker observes drift past the prior TTL.
42//!     2. `refreshPolicy.minTtlSeconds` clamps short upstream TTLs as a
43//!        DNS-rebinding-mitigation floor (operators can refuse to honour a
44//!        TTL=0 fast-flux record by setting a positive floor) — see
45//!        [`ticker::TickerConfig`] / [`ResolverRefresh::tick`] for the
46//!        clamp logic.
47//!
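//!   Per-hostname response-IP allowlisting (the *full* rebinding closure)
//!   landed separately in Phase 3e — see the [`rebinding`] submodule and
//!   [`ResolverRefresh::tick_with_rebinding`]. The TTL floor remains the
//!   partial-mitigation hook when no `rebindingPolicy` is declared.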
50
51pub mod dnssec;
52pub mod hickory_resolve;
53pub mod rebinding;
54pub mod sink_emitter;
55pub mod ticker;
56
57#[allow(unused_imports)]
58pub use dnssec::{TrustAnchors, ENV_TRUST_ANCHORS_PATH, TRUST_ANCHOR_SOURCE_IANA_DEFAULT};
59#[allow(unused_imports)]
60pub use hickory_resolve::{
61    extract_rrsig_metadata, proof_to_validation_result_with_rrsig, resolve_with_ttl,
62    resolve_with_ttl_validated, DnssecValidationResult, ResolvedAnswer, ValidatedResolvedAnswer,
63};
64#[allow(unused_imports)]
65pub use rebinding::{RebindingDecision, RebindingState};
66
67use std::collections::{BTreeSet, HashMap};
68use std::io;
69use std::time::{Duration, SystemTime, UNIX_EPOCH};
70
71use sha2::{Digest, Sha256};
72
73use cellos_core::{
74    cloud_event_v1_dns_authority_dnssec_failed, cloud_event_v1_dns_authority_drift,
75    cloud_event_v1_dns_authority_rebind_rejected, cloud_event_v1_dns_authority_rebind_threshold,
76    CloudEventV1, DnsAuthorityDnssecFailed, DnsAuthorityDnssecFailureReason, DnsAuthorityDrift,
77    DnsAuthorityRebindRejected, DnsAuthorityRebindThreshold, DnsRebindingPolicy, DnsRefreshPolicy,
78    DnsRefreshStrategy, DnsResolver, DnsResolverDnssecPolicy,
79};
80
81/// Injectable resolver function.
82///
83/// scope: returns a [`ResolvedAnswer`] carrying both the target set AND
84/// the upstream TTL. Tests pass a closure returning canned answers;
85/// production wires this to a `hickory-resolver`-backed call (see
86/// [`hickory_resolve::resolve_with_ttl`]).
87///
88/// An `Err` is treated the same as "no targets observed this tick" — drift
89/// events are not emitted for a transient resolver failure, and prior state
90/// is preserved.
91///
92/// **Migration note** (legacy signature): an older signature was
93/// `Fn(&str) -> io::Result<Vec<String>>`. Existing call sites that have
94/// already canonicalized to `Vec<String>` can adapt with a one-line shim:
95/// `Ok(ResolvedAnswer { targets: vec, ttl_seconds: 0, resolver_addr: ... })`.
96pub type ResolverFn<'a> = dyn Fn(&str) -> io::Result<ResolvedAnswer> + 'a;
97
98/// SEC-21 Phase 3h — DNSSEC-validating resolver function.
99///
100/// Returns a [`ValidatedResolvedAnswer`] carrying both the standard
101/// [`ResolvedAnswer`] AND the [`DnssecValidationResult`] discriminator
102/// (Validated / Failed / Unsigned). When the [`ResolverRefresh`] has a
103/// `dnssec_policy` configured, this callback is preferred over the
104/// plain [`ResolverFn`]; otherwise it is unused.
105///
106/// An `Err` is treated identically to the [`ResolverFn`] err path —
107/// transient resolver failure, no drift event, prior state preserved.
108pub type ValidatedResolverFn<'a> = dyn Fn(&str) -> io::Result<ValidatedResolvedAnswer> + 'a;
109
110/// Per-hostname state tracked across ticks.
111#[derive(Debug, Clone, Default)]
112struct HostState {
113    /// Previous canonicalized target list (sorted, deduped).
114    previous_targets: Vec<String>,
115    /// Previous canonical digest, or `"empty"` on first observation.
116    previous_digest: String,
117    /// Wall-clock time of the last *successful* refresh — used to enforce
118    /// `min_ttl_seconds` (floor) and `max_stale_seconds` (ceiling).
119    last_refresh_at: Option<SystemTime>,
120    /// TTL the upstream returned for the last successful answer; `0` when
121    /// unknown (`to_socket_addrs` does not surface DNS TTL today).
122    last_ttl_seconds: u32,
123}
124
125/// Persistent state for [`ResolverRefresh::tick`] — one entry per hostname.
126#[derive(Debug, Default)]
127pub struct ResolverState {
128    hosts: HashMap<String, HostState>,
129}
130
131impl ResolverState {
132    /// Create an empty resolver state. Callers reuse a single instance across
133    /// ticks for a given cell so prior observations persist.
134    pub fn new() -> Self {
135        Self::default()
136    }
137
138    /// Number of hostnames currently tracked. Test affordance.
139    pub fn len(&self) -> usize {
140        self.hosts.len()
141    }
142
143    /// Convenience: is there any tracked state.
144    pub fn is_empty(&self) -> bool {
145        self.hosts.is_empty()
146    }
147}
148
149/// One refresh-tick execution.
150///
151/// Borrows the spec-side configuration (no allocation), runs each declared
152/// hostname through the injected resolver, and produces drift CloudEvents the
153/// caller publishes via the configured event sink.
154pub struct ResolverRefresh<'a> {
155    /// Refresh policy from `spec.authority.dnsAuthority.refreshPolicy`.
156    /// `None` → defaults: no floor, no ceiling, `ttl-honor` strategy.
157    pub policy: Option<&'a DnsRefreshPolicy>,
158    /// SEC-21 Phase 3e — DNS rebinding mitigation policy from
159    /// `spec.authority.dnsAuthority.rebindingPolicy`. `None` → no
160    /// per-hostname response-IP tracking. Combined with the P3a TTL floor
161    /// (`policy.min_ttl_seconds`), this structurally closes the v0.4.0
162    /// rebinding residual.
163    pub rebinding_policy: Option<&'a DnsRebindingPolicy>,
164    /// Declared resolvers from `spec.authority.dnsAuthority.resolvers[]`.
165    /// The first entry's `resolverId` is used as the event's `resolverId`
166    /// when present; the empty string is used when none are declared.
167    pub resolvers: &'a [DnsResolver],
168    /// Hostnames the supervisor may refresh — typically derived from
169    /// `spec.authority.egressRules[].host` ∪
170    /// `spec.authority.dnsAuthority.hostnameAllowlist`.
171    pub hostnames: &'a [String],
172    /// Optional `keysetId` to bind into emitted events.
173    pub keyset_id: Option<&'a str>,
174    /// Optional `issuerKid` to bind into emitted events.
175    pub issuer_kid: Option<&'a str>,
176    /// Optional policy-bundle digest (`sha256:<hex>`) to bind into emitted events.
177    pub policy_digest: Option<&'a str>,
178    /// Optional pass-through correlation id.
179    pub correlation_id: Option<&'a str>,
180    /// CloudEvent `source` field — defaults to `cellos-supervisor` when the
181    /// caller passes `None`.
182    pub source: Option<&'a str>,
183    /// SEC-21 Phase 3h — opt-in DNSSEC validation policy. `None` (default)
184    /// preserves P3a/P3e behaviour exactly: no validation, no
185    /// `dns_authority_dnssec_failed` events, no `dnssec_status` tagging on
186    /// drift events. When `Some`, the [`Self::tick_with_dnssec`] method
187    /// honours `validate` / `failClosed` per resolver — see method docs.
188    pub dnssec_policy: Option<&'a DnsResolverDnssecPolicy>,
189    /// Trust anchors loaded from the env / spec / IANA-default precedence
190    /// chain. Used purely for stamping the source descriptor into emitted
191    /// events; hickory 0.24's resolver does not accept custom anchors via
192    /// public API. See [`super::dnssec`] module docs for the residual.
193    pub trust_anchors: Option<&'a TrustAnchors>,
194}
195
196impl<'a> ResolverRefresh<'a> {
197    /// Run one refresh tick.
198    ///
199    /// For each declared hostname:
200    ///
201    /// - If the strategy is `manual`, skip (operator must opt in).
202    /// - If `min_ttl_seconds` says we refreshed recently, skip — *unless*
203    ///   `max_stale_seconds` has been exceeded (ceiling overrides floor).
204    /// - Call the injected resolver. On error: leave prior state untouched and
205    ///   emit no event (transient failures are not drift).
206    /// - Canonicalize, hash, compare against prior digest.
207    /// - On change, build a [`DnsAuthorityDrift`] payload, wrap it in a
208    ///   CloudEvent envelope, and append to the returned vector.
209    /// - Update `state` with the new observation.
210    ///
211    /// Returns the events the caller should publish — possibly empty.
212    pub fn tick(
213        &self,
214        state: &mut ResolverState,
215        resolver: &ResolverFn<'_>,
216        now: SystemTime,
217        cell_id: &str,
218        run_id: &str,
219    ) -> Vec<CloudEventV1> {
220        // Backward-compat: callers that don't track rebinding state pass
221        // through to the rebinding-aware variant with a throwaway state.
222        // The throwaway is fine because, when `rebinding_policy is None`,
223        // the rebinding-aware path emits no extra events and the
224        // throwaway state never has anything committed to it.
225        let mut throwaway = RebindingState::new();
226        self.tick_with_rebinding(state, &mut throwaway, resolver, now, cell_id, run_id)
227    }
228
229    /// SEC-21 Phase 3e variant of [`Self::tick`] that also threads a
230    /// per-cell [`RebindingState`] for DNS rebinding mitigation.
231    ///
232    /// Behaves identically to `tick` when `self.rebinding_policy` is
233    /// `None`: emits only `dns_authority_drift` events, never touches
234    /// `rebinding_state`. When `rebinding_policy` is `Some`:
235    ///
236    /// 1. Resolves every hostname (same as `tick`).
237    /// 2. For each successful resolution, calls
238    ///    [`RebindingState::evaluate`] to decide which IPs are novel,
239    ///    whether the per-hostname cap is exceeded, and which IPs violate
240    ///    the operator allowlist.
241    /// 3. Emits one `dns_authority_rebind_threshold` per hostname when
242    ///    the cap is exceeded, and one `dns_authority_rebind_rejected`
243    ///    per allowlist violation.
244    /// 4. Stamps the EFFECTIVE targets (post-rejection when
245    ///    `reject_on_rebind=true`) into the subsequent
246    ///    `dns_authority_drift` event so the drift digest reflects what
247    ///    the workload actually resolves to.
248    /// 5. Calls [`RebindingState::commit`] with the effective targets so
249    ///    the per-cell history persists across ticks.
250    pub fn tick_with_rebinding(
251        &self,
252        state: &mut ResolverState,
253        rebinding_state: &mut RebindingState,
254        resolver: &ResolverFn<'_>,
255        now: SystemTime,
256        cell_id: &str,
257        run_id: &str,
258    ) -> Vec<CloudEventV1> {
259        let mut out: Vec<CloudEventV1> = Vec::new();
260
261        // Manual strategy: caller must opt in explicitly via a different code
262        // path (not yet implemented). For Phase 1 we treat manual as "no-op".
263        let strategy = self
264            .policy
265            .and_then(|p| p.strategy)
266            .unwrap_or(DnsRefreshStrategy::TtlHonor);
267        if matches!(strategy, DnsRefreshStrategy::Manual) {
268            return out;
269        }
270
271        let min_ttl = self.policy.and_then(|p| p.min_ttl_seconds).unwrap_or(0);
272        let max_stale = self.policy.and_then(|p| p.max_stale_seconds);
273
274        let resolver_id_owned: String = self
275            .resolvers
276            .first()
277            .map(|r| r.resolver_id.clone())
278            .unwrap_or_default();
279
280        let observed_at_rfc3339 = system_time_to_rfc3339(now);
281
282        for hostname in self.hostnames {
283            let prior = state.hosts.get(hostname).cloned().unwrap_or_default();
284
285            // Floor (min_ttl_seconds) — skip refresh when we last looked up
286            // this hostname less than `min_ttl_seconds` ago.
287            //
288            // Ceiling (max_stale_seconds) — *override* the floor: if the prior
289            // answer is older than max_stale, refresh regardless.
290            if let Some(last) = prior.last_refresh_at {
291                let age = now.duration_since(last).unwrap_or_default();
292                let age_secs = age.as_secs();
293                let floor_active = age_secs < u64::from(min_ttl);
294                let ceiling_breached = max_stale.is_some_and(|ms| age_secs >= u64::from(ms));
295                if floor_active && !ceiling_breached {
296                    // Within the floor and not past the ceiling — skip.
297                    continue;
298                }
299            }
300
301            let answer = match resolver(hostname) {
302                Ok(answer) => answer,
303                Err(_) => {
304                    // Transient resolver failure — do not emit drift, do not
305                    // update prior state, do not bump `last_refresh_at`.
306                    continue;
307                }
308            };
309
310            let canonical_targets = canonicalize_targets(&answer.targets);
311            let previous_digest = if prior.previous_digest.is_empty() {
312                "empty".to_string()
313            } else {
314                prior.previous_digest.clone()
315            };
316
317            // scope: DNS rebinding mitigation. When the operator has
318            // declared a `rebindingPolicy`, evaluate the canonical targets
319            // against the per-cell rebinding state BEFORE digest / drift
320            // emission. The decision's `effective_targets` replaces
321            // `canonical_targets` for everything downstream — the digest,
322            // the drift event payload, and the persisted state — so the
323            // operator-visible `dns_authority_drift` reflects what the
324            // workload actually resolves to (and so a `reject_on_rebind`
325            // policy actually withholds attacker IPs from the workload's
326            // view).
327            //
328            // When `rebinding_policy is None`, the decision is a no-op:
329            // effective_targets = canonical_targets, no events fire.
330            let (current_targets, rebind_events) = match self.rebinding_policy {
331                Some(rb_policy) => {
332                    let decision =
333                        rebinding_state.evaluate(hostname, &canonical_targets, rb_policy);
334                    let mut events: Vec<CloudEventV1> = Vec::new();
335
336                    if decision.threshold_exceeded {
337                        // One threshold event per hostname per tick. We
338                        // pick the FIRST novel IP as `novelIp` for SIEM
339                        // ergonomics (the cumulative count + max give the
340                        // operator everything they need to triage).
341                        if let Some(first_novel) = decision.novel_ips.first() {
342                            let prior_count = rebinding_state.history(hostname).len() as u32;
343                            let cumulative =
344                                prior_count.saturating_add(decision.novel_ips.len() as u32);
345                            let payload = DnsAuthorityRebindThreshold {
346                                schema_version: "1.0.0".into(),
347                                cell_id: cell_id.to_string(),
348                                run_id: run_id.to_string(),
349                                hostname: hostname.clone(),
350                                novel_ip: (*first_novel).to_string(),
351                                previous_ip_count: prior_count,
352                                cumulative_ip_count: cumulative,
353                                max_novel_ips_per_hostname: rb_policy.max_novel_ips_per_hostname,
354                                policy_digest: self
355                                    .policy_digest
356                                    .map(str::to_string)
357                                    .unwrap_or_else(empty_policy_digest),
358                                keyset_id: self.keyset_id.map(str::to_string),
359                                issuer_kid: self.issuer_kid.map(str::to_string),
360                                correlation_id: self.correlation_id.map(str::to_string),
361                                resolver_id: if resolver_id_owned.is_empty() {
362                                    None
363                                } else {
364                                    Some(resolver_id_owned.clone())
365                                },
366                                observed_at: observed_at_rfc3339.clone(),
367                            };
368                            let source = self.source.unwrap_or("cellos-supervisor");
369                            if let Ok(ev) = cloud_event_v1_dns_authority_rebind_threshold(
370                                source,
371                                &observed_at_rfc3339,
372                                &payload,
373                            ) {
374                                events.push(ev);
375                            }
376                        }
377                    }
378
379                    if !decision.allowlist_violations.is_empty() {
380                        // One rejected event per offending IP. We pre-build
381                        // the per-hostname allowlist echo once.
382                        let echo: Vec<String> = rb_policy
383                            .response_ip_allowlist
384                            .iter()
385                            .filter(|raw| {
386                                raw.split_once(':')
387                                    .is_some_and(|(prefix, _)| prefix == hostname)
388                            })
389                            .cloned()
390                            .collect();
391                        let prior_count = rebinding_state.history(hostname).len() as u32;
392                        let cumulative =
393                            prior_count.saturating_add(decision.novel_ips.len() as u32);
394                        for &offending in &decision.allowlist_violations {
395                            let payload = DnsAuthorityRebindRejected {
396                                schema_version: "1.0.0".into(),
397                                cell_id: cell_id.to_string(),
398                                run_id: run_id.to_string(),
399                                hostname: hostname.clone(),
400                                novel_ip: offending.to_string(),
401                                previous_ip_count: prior_count,
402                                cumulative_ip_count: cumulative,
403                                response_ip_allowlist: echo.clone(),
404                                policy_digest: self
405                                    .policy_digest
406                                    .map(str::to_string)
407                                    .unwrap_or_else(empty_policy_digest),
408                                keyset_id: self.keyset_id.map(str::to_string),
409                                issuer_kid: self.issuer_kid.map(str::to_string),
410                                correlation_id: self.correlation_id.map(str::to_string),
411                                resolver_id: if resolver_id_owned.is_empty() {
412                                    None
413                                } else {
414                                    Some(resolver_id_owned.clone())
415                                },
416                                observed_at: observed_at_rfc3339.clone(),
417                            };
418                            let source = self.source.unwrap_or("cellos-supervisor");
419                            if let Ok(ev) = cloud_event_v1_dns_authority_rebind_rejected(
420                                source,
421                                &observed_at_rfc3339,
422                                &payload,
423                            ) {
424                                events.push(ev);
425                            }
426                        }
427                    }
428
429                    // Use the effective (post-rejection) targets for the
430                    // drift event + state commit.
431                    let effective = canonicalize_targets(&decision.effective_targets);
432                    (effective, events)
433                }
434                None => (canonical_targets, Vec::new()),
435            };
436
437            let current_digest = digest_target_set(&current_targets);
438
439            // Emit the rebinding events BEFORE the drift event so a SIEM
440            // sees them in causal order ("threshold/rejected → drift").
441            for ev in rebind_events {
442                out.push(ev);
443            }
444
445            // scope: DNS-rebinding-mitigation floor. `refreshPolicy.minTtlSeconds`
446            // doubles as a *clamp* on the upstream TTL we record: an operator
447            // who has explicitly set a positive floor refuses to honour
448            // sub-floor TTLs (typical fast-flux indicator). RFC 1035 §3.2.1
449            // permits resolvers to clamp; the cellos floor is conservative
450            // (it only raises sub-floor TTLs, never lowers above-floor ones).
451            // `min_ttl == 0` is the "no floor" sentinel — pass through verbatim.
452            let clamped_ttl: u32 = if min_ttl > 0 && answer.ttl_seconds < min_ttl {
453                min_ttl
454            } else {
455                answer.ttl_seconds
456            };
457
458            // Stale window observed: how long was the prior answer served past
459            // its TTL before this refresh fired? Phase 3: now uses the *real*
460            // prior TTL (was always 0 in Phase 1, making this trivially 0).
461            let stale_seconds: u32 = match (prior.last_refresh_at, prior.last_ttl_seconds) {
462                (Some(last), ttl) if ttl > 0 => {
463                    let age = now.duration_since(last).unwrap_or_default().as_secs();
464                    age.saturating_sub(u64::from(ttl)).min(u32::MAX as u64) as u32
465                }
466                _ => 0,
467            };
468
469            // Only emit a drift event when the digest actually changed.
470            if current_digest != previous_digest {
471                let prev_set: BTreeSet<&String> = prior.previous_targets.iter().collect();
472                let curr_set: BTreeSet<&String> = current_targets.iter().collect();
473                let added: Vec<String> = curr_set
474                    .difference(&prev_set)
475                    .map(|s| (*s).clone())
476                    .collect();
477                let removed: Vec<String> = prev_set
478                    .difference(&curr_set)
479                    .map(|s| (*s).clone())
480                    .collect();
481
482                let drift = DnsAuthorityDrift {
483                    schema_version: "1.0.0".into(),
484                    cell_id: cell_id.to_string(),
485                    run_id: run_id.to_string(),
486                    resolver_id: resolver_id_owned.clone(),
487                    hostname: hostname.clone(),
488                    previous_targets: prior.previous_targets.clone(),
489                    current_targets: current_targets.clone(),
490                    added_targets: added,
491                    removed_targets: removed,
492                    previous_digest,
493                    current_digest: current_digest.clone(),
494                    // scope: real upstream TTL via hickory-resolver, clamped
495                    // to `refreshPolicy.minTtlSeconds` when the operator has
496                    // declared a floor (DNS-rebinding fast-flux mitigation).
497                    ttl_seconds: clamped_ttl,
498                    stale_seconds,
499                    keyset_id: self.keyset_id.map(str::to_string),
500                    issuer_kid: self.issuer_kid.map(str::to_string),
501                    policy_digest: self.policy_digest.map(str::to_string),
502                    correlation_id: self.correlation_id.map(str::to_string),
503                    // SEC-21 Phase 3h — `tick_with_rebinding` (the
504                    // pre-DNSSEC path) leaves dnssec_status as `None`
505                    // so the schema treats the field as omitted. The
506                    // dnssec-aware sibling `tick_with_dnssec` populates
507                    // it with one of the four enum literals.
508                    dnssec_status: None,
509                    observed_at: observed_at_rfc3339.clone(),
510                };
511
512                let source = self.source.unwrap_or("cellos-supervisor");
513                match cloud_event_v1_dns_authority_drift(source, &observed_at_rfc3339, &drift) {
514                    Ok(ev) => out.push(ev),
515                    Err(e) => {
516                        // Emit-side serialization failure is reported via tracing
517                        // by the caller; the resolver-refresh module is pure
518                        // data + has no logger. Drop the event silently here.
519                        let _ = e;
520                    }
521                }
522            }
523
524            // Always update state on a successful resolution — even when the
525            // digest matched — so `last_refresh_at` reflects the latest probe.
526            // We persist the *clamped* TTL so the next stale-window calc uses
527            // the floor-honouring value, matching the event we just emitted.
528            state.hosts.insert(
529                hostname.clone(),
530                HostState {
531                    previous_targets: current_targets.clone(),
532                    previous_digest: current_digest,
533                    last_refresh_at: Some(now),
534                    last_ttl_seconds: clamped_ttl,
535                },
536            );
537
538            // scope: commit the per-hostname rebinding observation.
539            // Takes the EFFECTIVE targets (post-rejection in enforce mode)
540            // so the cumulative history reflects what the workload saw.
541            // No-op when `rebinding_policy is None`.
542            if self.rebinding_policy.is_some() {
543                rebinding_state.commit(hostname, &current_targets);
544            }
545        }
546
547        out
548    }
549}
550
551/// SEC-21 Phase 3h — `dnssec_status` enum literals for the
552/// `dns_authority_drift` event payload. Centralized so the ticker tests
553/// can match on the exact string the schema expects without re-typing it.
554pub const DNSSEC_STATUS_VALIDATED: &str = "validated";
555pub const DNSSEC_STATUS_VALIDATION_FAILED: &str = "validation_failed";
556pub const DNSSEC_STATUS_UNSIGNED: &str = "unsigned";
557pub const DNSSEC_STATUS_NOT_ATTEMPTED: &str = "not_attempted";
558
559impl<'a> ResolverRefresh<'a> {
560    /// SEC-21 Phase 3h variant of [`Self::tick_with_rebinding`] that
561    /// also processes DNSSEC validation outcomes per hostname.
562    ///
563    /// Behaviour:
564    ///
565    /// - When `self.dnssec_policy` is `None`, this method behaves exactly
566    ///   like [`Self::tick_with_rebinding`] except it pulls per-hostname
567    ///   answers from `validated_resolved` and stamps
568    ///   `dnssec_status: not_attempted` on every emitted drift event.
569    /// - When `self.dnssec_policy` is `Some(policy)`:
570    ///     - `Validated` → `dnssec_status: validated`, normal flow.
571    ///     - `Failed{reason}` → emit `dns_authority_dnssec_failed`
572    ///       (reason: `validation_failed`); when `policy.fail_closed`,
573    ///       drop `answer.targets` to empty BEFORE the rebinding
574    ///       evaluator runs and tag drift `dnssec_status: validation_failed`.
575    ///     - `Unsigned` → emit `dns_authority_dnssec_failed`
576    ///       (reason: `unsigned_zone`); same `fail_closed` semantics.
577    ///
578    /// Order: dnssec_failed events are emitted BEFORE the drift /
579    /// rebinding events for that hostname so a SIEM sees them in
580    /// causal order ("dnssec_failed → drift").
581    pub fn tick_with_dnssec(
582        &self,
583        state: &mut ResolverState,
584        rebinding_state: &mut RebindingState,
585        validated_resolved: &std::collections::HashMap<String, io::Result<ValidatedResolvedAnswer>>,
586        now: SystemTime,
587        cell_id: &str,
588        run_id: &str,
589    ) -> Vec<CloudEventV1> {
590        let mut out: Vec<CloudEventV1> = Vec::new();
591
592        let strategy = self
593            .policy
594            .and_then(|p| p.strategy)
595            .unwrap_or(DnsRefreshStrategy::TtlHonor);
596        if matches!(strategy, DnsRefreshStrategy::Manual) {
597            return out;
598        }
599
600        let min_ttl = self.policy.and_then(|p| p.min_ttl_seconds).unwrap_or(0);
601        let max_stale = self.policy.and_then(|p| p.max_stale_seconds);
602
603        let resolver_id_owned: String = self
604            .resolvers
605            .first()
606            .map(|r| r.resolver_id.clone())
607            .unwrap_or_default();
608        let observed_at_rfc3339 = system_time_to_rfc3339(now);
609        let trust_anchor_source: String = self
610            .trust_anchors
611            .map(|ta| ta.source.clone())
612            .unwrap_or_else(|| dnssec::TRUST_ANCHOR_SOURCE_IANA_DEFAULT.to_string());
613        let policy_active = self.dnssec_policy.is_some();
614        let fail_closed = self.dnssec_policy.map(|p| p.fail_closed).unwrap_or(false);
615
616        for hostname in self.hostnames {
617            let prior = state.hosts.get(hostname).cloned().unwrap_or_default();
618
619            // Floor / ceiling — same predicate as the non-DNSSEC path.
620            if let Some(last) = prior.last_refresh_at {
621                let age = now.duration_since(last).unwrap_or_default();
622                let age_secs = age.as_secs();
623                let floor_active = age_secs < u64::from(min_ttl);
624                let ceiling_breached = max_stale.is_some_and(|ms| age_secs >= u64::from(ms));
625                if floor_active && !ceiling_breached {
626                    continue;
627                }
628            }
629
630            let validated = match validated_resolved.get(hostname) {
631                Some(Ok(v)) => v.clone(),
632                Some(Err(_)) | None => {
633                    // Transient resolver failure or missing entry — same
634                    // semantics as the unvalidated path: no drift, no
635                    // state mutation, no DNSSEC event (can't tell what
636                    // went wrong).
637                    continue;
638                }
639            };
640
641            // Decide the dnssec_status string + per-hostname effective
642            // answer (drop on fail_closed) + whether to emit a
643            // dns_authority_dnssec_failed event.
644            let (dnssec_status, dnssec_event, effective_answer): (
645                &'static str,
646                Option<CloudEventV1>,
647                ResolvedAnswer,
648            ) = match (&validated.validation, policy_active) {
649                (DnssecValidationResult::Validated { .. }, true) => {
650                    (DNSSEC_STATUS_VALIDATED, None, validated.answer.clone())
651                }
652                (DnssecValidationResult::Failed { reason }, true) => {
653                    let payload = DnsAuthorityDnssecFailed {
654                        schema_version: "1.0.0".into(),
655                        cell_id: cell_id.to_string(),
656                        run_id: run_id.to_string(),
657                        resolver_id: resolver_id_owned.clone(),
658                        hostname: hostname.clone(),
659                        reason: DnsAuthorityDnssecFailureReason::ValidationFailed
660                            .as_str()
661                            .to_string(),
662                        fail_closed,
663                        trust_anchor_source: trust_anchor_source.clone(),
664                        policy_digest: self.policy_digest.map(str::to_string),
665                        keyset_id: self.keyset_id.map(str::to_string),
666                        issuer_kid: self.issuer_kid.map(str::to_string),
667                        correlation_id: self.correlation_id.map(str::to_string),
668                        // SEC-21 Phase 3h.1 — additive `source` field. The
669                        // resolver-refresh path always stamps this literal.
670                        // The dataplane (in-netns DNS proxy) emits the same
671                        // event with `source = "dataplane"` from
672                        // `dns_proxy::dnssec`.
673                        source: Some("resolver_refresh".into()),
674                        observed_at: observed_at_rfc3339.clone(),
675                    };
676                    let _ = reason; // reason is logged by the caller via tracing if needed
677                    let source = self.source.unwrap_or("cellos-supervisor");
678                    let event = cloud_event_v1_dns_authority_dnssec_failed(
679                        source,
680                        &observed_at_rfc3339,
681                        &payload,
682                    )
683                    .ok();
684                    let answer = if fail_closed {
685                        ResolvedAnswer {
686                            targets: Vec::new(),
687                            ttl_seconds: validated.answer.ttl_seconds,
688                            resolver_addr: validated.answer.resolver_addr,
689                        }
690                    } else {
691                        validated.answer.clone()
692                    };
693                    (DNSSEC_STATUS_VALIDATION_FAILED, event, answer)
694                }
695                (DnssecValidationResult::Unsigned, true) => {
696                    let payload = DnsAuthorityDnssecFailed {
697                        schema_version: "1.0.0".into(),
698                        cell_id: cell_id.to_string(),
699                        run_id: run_id.to_string(),
700                        resolver_id: resolver_id_owned.clone(),
701                        hostname: hostname.clone(),
702                        reason: DnsAuthorityDnssecFailureReason::UnsignedZone
703                            .as_str()
704                            .to_string(),
705                        fail_closed,
706                        trust_anchor_source: trust_anchor_source.clone(),
707                        policy_digest: self.policy_digest.map(str::to_string),
708                        keyset_id: self.keyset_id.map(str::to_string),
709                        issuer_kid: self.issuer_kid.map(str::to_string),
710                        correlation_id: self.correlation_id.map(str::to_string),
711                        // SEC-21 Phase 3h.1 — additive `source` field; see
712                        // matching comment on the Failed-arm above.
713                        source: Some("resolver_refresh".into()),
714                        observed_at: observed_at_rfc3339.clone(),
715                    };
716                    let source = self.source.unwrap_or("cellos-supervisor");
717                    let event = cloud_event_v1_dns_authority_dnssec_failed(
718                        source,
719                        &observed_at_rfc3339,
720                        &payload,
721                    )
722                    .ok();
723                    let answer = if fail_closed {
724                        ResolvedAnswer {
725                            targets: Vec::new(),
726                            ttl_seconds: validated.answer.ttl_seconds,
727                            resolver_addr: validated.answer.resolver_addr,
728                        }
729                    } else {
730                        validated.answer.clone()
731                    };
732                    (DNSSEC_STATUS_UNSIGNED, event, answer)
733                }
734                // policy off → status:not_attempted, no event, pass-through answer.
735                (_, false) => (DNSSEC_STATUS_NOT_ATTEMPTED, None, validated.answer.clone()),
736            };
737
738            // Emit dnssec_failed BEFORE the drift event so SIEM sees
739            // causal order. When `dnssec_event` is None this is a no-op.
740            if let Some(ev) = dnssec_event {
741                out.push(ev);
742            }
743
744            // From here, the path mirrors `tick_with_rebinding` exactly,
745            // operating on `effective_answer` (the post-DNSSEC-policy
746            // ResolvedAnswer).
747            let canonical_targets = canonicalize_targets(&effective_answer.targets);
748            let previous_digest = if prior.previous_digest.is_empty() {
749                "empty".to_string()
750            } else {
751                prior.previous_digest.clone()
752            };
753
754            let (current_targets, rebind_events) = match self.rebinding_policy {
755                Some(rb_policy) => {
756                    let decision =
757                        rebinding_state.evaluate(hostname, &canonical_targets, rb_policy);
758                    let mut events: Vec<CloudEventV1> = Vec::new();
759                    if decision.threshold_exceeded {
760                        if let Some(first_novel) = decision.novel_ips.first() {
761                            let prior_count = rebinding_state.history(hostname).len() as u32;
762                            let cumulative =
763                                prior_count.saturating_add(decision.novel_ips.len() as u32);
764                            let payload = DnsAuthorityRebindThreshold {
765                                schema_version: "1.0.0".into(),
766                                cell_id: cell_id.to_string(),
767                                run_id: run_id.to_string(),
768                                hostname: hostname.clone(),
769                                novel_ip: (*first_novel).to_string(),
770                                previous_ip_count: prior_count,
771                                cumulative_ip_count: cumulative,
772                                max_novel_ips_per_hostname: rb_policy.max_novel_ips_per_hostname,
773                                policy_digest: self
774                                    .policy_digest
775                                    .map(str::to_string)
776                                    .unwrap_or_else(empty_policy_digest),
777                                keyset_id: self.keyset_id.map(str::to_string),
778                                issuer_kid: self.issuer_kid.map(str::to_string),
779                                correlation_id: self.correlation_id.map(str::to_string),
780                                resolver_id: if resolver_id_owned.is_empty() {
781                                    None
782                                } else {
783                                    Some(resolver_id_owned.clone())
784                                },
785                                observed_at: observed_at_rfc3339.clone(),
786                            };
787                            let source = self.source.unwrap_or("cellos-supervisor");
788                            if let Ok(ev) = cloud_event_v1_dns_authority_rebind_threshold(
789                                source,
790                                &observed_at_rfc3339,
791                                &payload,
792                            ) {
793                                events.push(ev);
794                            }
795                        }
796                    }
797                    if !decision.allowlist_violations.is_empty() {
798                        let echo: Vec<String> = rb_policy
799                            .response_ip_allowlist
800                            .iter()
801                            .filter(|raw| {
802                                raw.split_once(':')
803                                    .is_some_and(|(prefix, _)| prefix == hostname)
804                            })
805                            .cloned()
806                            .collect();
807                        let prior_count = rebinding_state.history(hostname).len() as u32;
808                        let cumulative =
809                            prior_count.saturating_add(decision.novel_ips.len() as u32);
810                        for &offending in &decision.allowlist_violations {
811                            let payload = DnsAuthorityRebindRejected {
812                                schema_version: "1.0.0".into(),
813                                cell_id: cell_id.to_string(),
814                                run_id: run_id.to_string(),
815                                hostname: hostname.clone(),
816                                novel_ip: offending.to_string(),
817                                previous_ip_count: prior_count,
818                                cumulative_ip_count: cumulative,
819                                response_ip_allowlist: echo.clone(),
820                                policy_digest: self
821                                    .policy_digest
822                                    .map(str::to_string)
823                                    .unwrap_or_else(empty_policy_digest),
824                                keyset_id: self.keyset_id.map(str::to_string),
825                                issuer_kid: self.issuer_kid.map(str::to_string),
826                                correlation_id: self.correlation_id.map(str::to_string),
827                                resolver_id: if resolver_id_owned.is_empty() {
828                                    None
829                                } else {
830                                    Some(resolver_id_owned.clone())
831                                },
832                                observed_at: observed_at_rfc3339.clone(),
833                            };
834                            let source = self.source.unwrap_or("cellos-supervisor");
835                            if let Ok(ev) = cloud_event_v1_dns_authority_rebind_rejected(
836                                source,
837                                &observed_at_rfc3339,
838                                &payload,
839                            ) {
840                                events.push(ev);
841                            }
842                        }
843                    }
844                    let effective = canonicalize_targets(&decision.effective_targets);
845                    (effective, events)
846                }
847                None => (canonical_targets, Vec::new()),
848            };
849
850            let current_digest = digest_target_set(&current_targets);
851
852            for ev in rebind_events {
853                out.push(ev);
854            }
855
856            let clamped_ttl: u32 = if min_ttl > 0 && effective_answer.ttl_seconds < min_ttl {
857                min_ttl
858            } else {
859                effective_answer.ttl_seconds
860            };
861
862            let stale_seconds: u32 = match (prior.last_refresh_at, prior.last_ttl_seconds) {
863                (Some(last), ttl) if ttl > 0 => {
864                    let age = now.duration_since(last).unwrap_or_default().as_secs();
865                    age.saturating_sub(u64::from(ttl)).min(u32::MAX as u64) as u32
866                }
867                _ => 0,
868            };
869
870            if current_digest != previous_digest {
871                let prev_set: BTreeSet<&String> = prior.previous_targets.iter().collect();
872                let curr_set: BTreeSet<&String> = current_targets.iter().collect();
873                let added: Vec<String> = curr_set
874                    .difference(&prev_set)
875                    .map(|s| (*s).clone())
876                    .collect();
877                let removed: Vec<String> = prev_set
878                    .difference(&curr_set)
879                    .map(|s| (*s).clone())
880                    .collect();
881
882                let drift = DnsAuthorityDrift {
883                    schema_version: "1.0.0".into(),
884                    cell_id: cell_id.to_string(),
885                    run_id: run_id.to_string(),
886                    resolver_id: resolver_id_owned.clone(),
887                    hostname: hostname.clone(),
888                    previous_targets: prior.previous_targets.clone(),
889                    current_targets: current_targets.clone(),
890                    added_targets: added,
891                    removed_targets: removed,
892                    previous_digest,
893                    current_digest: current_digest.clone(),
894                    ttl_seconds: clamped_ttl,
895                    stale_seconds,
896                    keyset_id: self.keyset_id.map(str::to_string),
897                    issuer_kid: self.issuer_kid.map(str::to_string),
898                    policy_digest: self.policy_digest.map(str::to_string),
899                    correlation_id: self.correlation_id.map(str::to_string),
900                    dnssec_status: Some(dnssec_status.to_string()),
901                    observed_at: observed_at_rfc3339.clone(),
902                };
903
904                let source = self.source.unwrap_or("cellos-supervisor");
905                if let Ok(ev) =
906                    cloud_event_v1_dns_authority_drift(source, &observed_at_rfc3339, &drift)
907                {
908                    out.push(ev);
909                }
910            }
911
912            state.hosts.insert(
913                hostname.clone(),
914                HostState {
915                    previous_targets: current_targets.clone(),
916                    previous_digest: current_digest,
917                    last_refresh_at: Some(now),
918                    last_ttl_seconds: clamped_ttl,
919                },
920            );
921
922            if self.rebinding_policy.is_some() {
923                rebinding_state.commit(hostname, &current_targets);
924            }
925        }
926
927        out
928    }
929}
930
931/// Sentinel `policyDigest` used in the SEC-21 Phase 3e rebinding events
932/// when the caller didn't supply one. The schema requires a digest, so we
933/// stamp the deterministic empty-string digest rather than failing the
934/// emission. Operators who want the real digest must wire
935/// `policy_digest` into [`ResolverRefresh`] (every production call site
936/// already does — this fallback only fires in unit tests that omit it).
937fn empty_policy_digest() -> String {
938    // sha256("") — `e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855`
939    "sha256:e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855".to_string()
940}
941
942/// Canonicalize a target set: trim whitespace, drop empties, dedupe, sort.
943fn canonicalize_targets(raw: &[String]) -> Vec<String> {
944    let mut set: BTreeSet<String> = BTreeSet::new();
945    for t in raw {
946        let trimmed = t.trim();
947        if !trimmed.is_empty() {
948            set.insert(trimmed.to_string());
949        }
950    }
951    set.into_iter().collect()
952}
953
954/// Compute `sha256:<hex>` over the canonicalized target list joined with `\n`.
955/// `[]` (empty set) hashes deterministically — every empty set produces the
956/// same digest.
957fn digest_target_set(canonical: &[String]) -> String {
958    let mut hasher = Sha256::new();
959    for (i, t) in canonical.iter().enumerate() {
960        if i > 0 {
961            hasher.update(b"\n");
962        }
963        hasher.update(t.as_bytes());
964    }
965    let bytes = hasher.finalize();
966    let mut out = String::with_capacity(7 + 64);
967    out.push_str("sha256:");
968    for b in bytes {
969        use std::fmt::Write;
970        let _ = write!(out, "{b:02x}");
971    }
972    out
973}
974
975/// Convert a `SystemTime` to an RFC3339 string. Avoids pulling chrono into the
976/// pure-data section of the module — the supervisor passes its own
977/// chrono-based timestamp through `now` for production paths.
978fn system_time_to_rfc3339(t: SystemTime) -> String {
979    let secs = t
980        .duration_since(UNIX_EPOCH)
981        .unwrap_or(Duration::ZERO)
982        .as_secs() as i64;
983    let dt =
984        chrono::DateTime::<chrono::Utc>::from_timestamp(secs, 0).unwrap_or_else(chrono::Utc::now);
985    dt.to_rfc3339()
986}
987
988#[cfg(test)]
989mod tests {
990    use super::*;
991    use std::cell::RefCell;
992    use std::net::{IpAddr, Ipv4Addr, SocketAddr};
993    use std::time::Duration;
994
995    /// Test helper — wrap a `Vec<String>` answer in the Phase 3 [`ResolvedAnswer`]
996    /// shape. The pre-Phase-3 tests didn't care about TTL, so we default to 0
997    /// here and let the Phase 3 ticker tests build their own answers with
998    /// real TTLs.
999    fn answer_zero_ttl(targets: Vec<String>) -> ResolvedAnswer {
1000        ResolvedAnswer {
1001            targets,
1002            ttl_seconds: 0,
1003            resolver_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 53),
1004        }
1005    }
1006
1007    fn fixed_resolvers() -> Vec<DnsResolver> {
1008        vec![DnsResolver {
1009            resolver_id: "resolver-test-001".into(),
1010            endpoint: "1.1.1.1:53".into(),
1011            protocol: cellos_core::DnsResolverProtocol::Do53Udp,
1012            trust_kid: None,
1013            dnssec: None,
1014        }]
1015    }
1016
1017    fn make<'a>(
1018        policy: Option<&'a DnsRefreshPolicy>,
1019        resolvers: &'a [DnsResolver],
1020        hostnames: &'a [String],
1021    ) -> ResolverRefresh<'a> {
1022        ResolverRefresh {
1023            policy,
1024            rebinding_policy: None,
1025            resolvers,
1026            hostnames,
1027            keyset_id: Some("keyset-test-001"),
1028            issuer_kid: Some("kid-test-001"),
1029            policy_digest: None,
1030            correlation_id: None,
1031            source: Some("cellos-supervisor-test"),
1032            // SEC-21 Phase 3h test default — DNSSEC off so the existing
1033            // P3a tests (~17 of them, untouched) keep their original
1034            // semantics. The DNSSEC-specific tests below build their
1035            // own ResolverRefresh with the policy populated.
1036            dnssec_policy: None,
1037            trust_anchors: None,
1038        }
1039    }
1040
1041    #[test]
1042    fn canonicalize_dedupes_and_sorts() {
1043        let raw: Vec<String> = vec!["1.0.0.1".into(), "1.1.1.1".into(), "1.0.0.1".into()];
1044        let canon = canonicalize_targets(&raw);
1045        assert_eq!(canon, vec!["1.0.0.1".to_string(), "1.1.1.1".to_string()]);
1046    }
1047
1048    #[test]
1049    fn digest_changes_when_targets_change() {
1050        let a = canonicalize_targets(&["1.1.1.1".to_string()]);
1051        let b = canonicalize_targets(&["1.0.0.1".to_string()]);
1052        assert_ne!(digest_target_set(&a), digest_target_set(&b));
1053    }
1054
1055    #[test]
1056    fn first_observation_emits_drift_with_empty_previous() {
1057        let hostnames = vec!["api.example.com".to_string()];
1058        let resolvers = fixed_resolvers();
1059        let refresher = make(None, &resolvers, &hostnames);
1060        let mut state = ResolverState::new();
1061        let resolver = |_h: &str| Ok(answer_zero_ttl(vec!["1.1.1.1".to_string()]));
1062        let events = refresher.tick(
1063            &mut state,
1064            &resolver,
1065            SystemTime::UNIX_EPOCH + Duration::from_secs(1_700_000_000),
1066            "cell-001",
1067            "run-001",
1068        );
1069        assert_eq!(events.len(), 1, "first observation must emit drift");
1070        let data = events[0].data.as_ref().expect("data present");
1071        assert_eq!(data["previousDigest"], "empty");
1072        assert_eq!(data["addedTargets"], serde_json::json!(["1.1.1.1"]));
1073        assert!(data["removedTargets"].as_array().unwrap().is_empty());
1074    }
1075
1076    #[test]
1077    fn stable_targets_emit_no_drift_after_first_observation() {
1078        let hostnames = vec!["api.example.com".to_string()];
1079        let resolvers = fixed_resolvers();
1080        let policy = DnsRefreshPolicy {
1081            min_ttl_seconds: Some(0),
1082            max_stale_seconds: None,
1083            strategy: None,
1084        };
1085        let refresher = make(Some(&policy), &resolvers, &hostnames);
1086        let mut state = ResolverState::new();
1087        let resolver = |_h: &str| Ok(answer_zero_ttl(vec!["1.1.1.1".to_string()]));
1088
1089        let now = SystemTime::UNIX_EPOCH + Duration::from_secs(1_700_000_000);
1090        let _ = refresher.tick(&mut state, &resolver, now, "c", "r");
1091        let events = refresher.tick(
1092            &mut state,
1093            &resolver,
1094            now + Duration::from_secs(60),
1095            "c",
1096            "r",
1097        );
1098        assert!(events.is_empty(), "stable targets must not emit drift");
1099    }
1100
1101    #[test]
1102    fn manual_strategy_emits_nothing() {
1103        let hostnames = vec!["api.example.com".to_string()];
1104        let resolvers = fixed_resolvers();
1105        let policy = DnsRefreshPolicy {
1106            min_ttl_seconds: None,
1107            max_stale_seconds: None,
1108            strategy: Some(DnsRefreshStrategy::Manual),
1109        };
1110        let refresher = make(Some(&policy), &resolvers, &hostnames);
1111        let mut state = ResolverState::new();
1112        let resolver = |_h: &str| Ok(answer_zero_ttl(vec!["1.1.1.1".to_string()]));
1113        let events = refresher.tick(
1114            &mut state,
1115            &resolver,
1116            SystemTime::UNIX_EPOCH + Duration::from_secs(1),
1117            "c",
1118            "r",
1119        );
1120        assert!(events.is_empty(), "manual strategy must skip ticks");
1121        assert!(state.is_empty(), "state must remain untouched");
1122    }
1123
1124    #[test]
1125    fn floor_skips_refresh_within_min_ttl() {
1126        let hostnames = vec!["api.example.com".to_string()];
1127        let resolvers = fixed_resolvers();
1128        let policy = DnsRefreshPolicy {
1129            min_ttl_seconds: Some(300),
1130            max_stale_seconds: None,
1131            strategy: None,
1132        };
1133        let refresher = make(Some(&policy), &resolvers, &hostnames);
1134        let mut state = ResolverState::new();
1135        let calls: RefCell<u32> = RefCell::new(0);
1136        let resolver = |_h: &str| -> io::Result<ResolvedAnswer> {
1137            *calls.borrow_mut() += 1;
1138            Ok(answer_zero_ttl(vec!["1.1.1.1".to_string()]))
1139        };
1140
1141        let t0 = SystemTime::UNIX_EPOCH + Duration::from_secs(1_700_000_000);
1142        let _ = refresher.tick(&mut state, &resolver, t0, "c", "r");
1143        // 60s later — well within min_ttl_seconds (300) — must skip.
1144        let _ = refresher.tick(
1145            &mut state,
1146            &resolver,
1147            t0 + Duration::from_secs(60),
1148            "c",
1149            "r",
1150        );
1151        assert_eq!(
1152            *calls.borrow(),
1153            1,
1154            "floor must skip the second resolver call"
1155        );
1156    }
1157
1158    #[test]
1159    fn ceiling_forces_refresh_after_max_stale() {
1160        let hostnames = vec!["api.example.com".to_string()];
1161        let resolvers = fixed_resolvers();
1162        let policy = DnsRefreshPolicy {
1163            min_ttl_seconds: Some(3_000),
1164            max_stale_seconds: Some(60),
1165            strategy: None,
1166        };
1167        let refresher = make(Some(&policy), &resolvers, &hostnames);
1168        let mut state = ResolverState::new();
1169        let calls: RefCell<u32> = RefCell::new(0);
1170        let resolver = |_h: &str| -> io::Result<ResolvedAnswer> {
1171            *calls.borrow_mut() += 1;
1172            Ok(answer_zero_ttl(vec!["1.1.1.1".to_string()]))
1173        };
1174
1175        let t0 = SystemTime::UNIX_EPOCH + Duration::from_secs(1_700_000_000);
1176        let _ = refresher.tick(&mut state, &resolver, t0, "c", "r");
1177        // 120s later — past max_stale (60s) — ceiling must override the floor.
1178        let _ = refresher.tick(
1179            &mut state,
1180            &resolver,
1181            t0 + Duration::from_secs(120),
1182            "c",
1183            "r",
1184        );
1185        assert_eq!(
1186            *calls.borrow(),
1187            2,
1188            "ceiling must override the floor and force a refresh"
1189        );
1190    }
1191
1192    #[test]
1193    fn resolver_failure_does_not_emit_drift() {
1194        let hostnames = vec!["api.example.com".to_string()];
1195        let resolvers = fixed_resolvers();
1196        let refresher = make(None, &resolvers, &hostnames);
1197        let mut state = ResolverState::new();
1198        let resolver =
1199            |_h: &str| -> io::Result<ResolvedAnswer> { Err(io::Error::other("transient")) };
1200        let events = refresher.tick(
1201            &mut state,
1202            &resolver,
1203            SystemTime::UNIX_EPOCH + Duration::from_secs(1),
1204            "c",
1205            "r",
1206        );
1207        assert!(
1208            events.is_empty(),
1209            "transient resolver error must not be reported as drift"
1210        );
1211        assert!(state.is_empty(), "failed lookup must not commit state");
1212    }
1213
1214    // ============================================================
1215    // scope: TTL-clamp tests for `refreshPolicy.minTtlSeconds`.
1216    // The clamp is the partial DNS-rebinding mitigation: an operator
1217    // who sets `minTtlSeconds: 60` refuses to honour TTL=0 fast-flux
1218    // records, which would otherwise force per-millisecond cache
1219    // invalidation. RFC 1035 §3.2.1 permits the clamp; only the
1220    // recorded `ttlSeconds` field is affected — the resolver still
1221    // probes upstream on every tick.
1222    // ============================================================
1223
1224    fn answer_with_ttl(targets: Vec<String>, ttl: u32) -> ResolvedAnswer {
1225        ResolvedAnswer {
1226            targets,
1227            ttl_seconds: ttl,
1228            resolver_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 53),
1229        }
1230    }
1231
1232    #[test]
1233    fn ticker_floors_ttl_to_min_ttl_seconds() {
1234        let hostnames = vec!["api.example.com".to_string()];
1235        let resolvers = fixed_resolvers();
1236        // Operator floor at 60s; upstream returns TTL=5 (suspect).
1237        let policy = DnsRefreshPolicy {
1238            min_ttl_seconds: Some(60),
1239            max_stale_seconds: None,
1240            strategy: None,
1241        };
1242        let refresher = make(Some(&policy), &resolvers, &hostnames);
1243        let mut state = ResolverState::new();
1244        let resolver = |_h: &str| -> io::Result<ResolvedAnswer> {
1245            Ok(answer_with_ttl(vec!["1.1.1.1".to_string()], 5))
1246        };
1247        let events = refresher.tick(
1248            &mut state,
1249            &resolver,
1250            SystemTime::UNIX_EPOCH + Duration::from_secs(1_700_000_000),
1251            "c",
1252            "r",
1253        );
1254        assert_eq!(events.len(), 1, "first observation emits drift");
1255        let data = events[0].data.as_ref().expect("data");
1256        assert_eq!(
1257            data["ttlSeconds"], 60,
1258            "sub-floor TTL must be clamped up to min_ttl_seconds"
1259        );
1260    }
1261
1262    #[test]
1263    fn ticker_passes_through_ttl_above_floor() {
1264        let hostnames = vec!["api.example.com".to_string()];
1265        let resolvers = fixed_resolvers();
1266        // Operator floor at 60s; upstream returns TTL=300 (above the floor).
1267        let policy = DnsRefreshPolicy {
1268            min_ttl_seconds: Some(60),
1269            max_stale_seconds: None,
1270            strategy: None,
1271        };
1272        let refresher = make(Some(&policy), &resolvers, &hostnames);
1273        let mut state = ResolverState::new();
1274        let resolver = |_h: &str| -> io::Result<ResolvedAnswer> {
1275            Ok(answer_with_ttl(vec!["1.1.1.1".to_string()], 300))
1276        };
1277        let events = refresher.tick(
1278            &mut state,
1279            &resolver,
1280            SystemTime::UNIX_EPOCH + Duration::from_secs(1_700_000_000),
1281            "c",
1282            "r",
1283        );
1284        let data = events[0].data.as_ref().expect("data");
1285        assert_eq!(
1286            data["ttlSeconds"], 300,
1287            "above-floor TTL must pass through verbatim"
1288        );
1289    }
1290
1291    #[test]
1292    fn ticker_zero_min_ttl_means_no_floor() {
1293        let hostnames = vec!["api.example.com".to_string()];
1294        let resolvers = fixed_resolvers();
1295        // Operator declared `minTtlSeconds: 0` — the "no floor" sentinel.
1296        // Sub-second TTLs from upstream pass through unmolested so the
1297        // operator can observe them.
1298        let policy = DnsRefreshPolicy {
1299            min_ttl_seconds: Some(0),
1300            max_stale_seconds: None,
1301            strategy: None,
1302        };
1303        let refresher = make(Some(&policy), &resolvers, &hostnames);
1304        let mut state = ResolverState::new();
1305        let resolver = |_h: &str| -> io::Result<ResolvedAnswer> {
1306            Ok(answer_with_ttl(vec!["1.1.1.1".to_string()], 1))
1307        };
1308        let events = refresher.tick(
1309            &mut state,
1310            &resolver,
1311            SystemTime::UNIX_EPOCH + Duration::from_secs(1_700_000_000),
1312            "c",
1313            "r",
1314        );
1315        let data = events[0].data.as_ref().expect("data");
1316        assert_eq!(
1317            data["ttlSeconds"], 1,
1318            "min_ttl_seconds=0 means no clamp — pass through TTL=1"
1319        );
1320    }
1321}