Skip to main content

cellos_supervisor/resolver_refresh/
ticker.rs

1//! SEC-21 Phase 2 — continuous-ticker daemon mode.
2//!
3//! W2 SEC-21 (`super::ResolverRefresh::tick`) runs *one* refresh per cell
4//! run, before the workload starts. Phase 2 keeps that loop running for the
5//! lifetime of the cell so drift between runtime ticks produces events the
6//! operator sees in real time, not just at boot.
7//!
8//! ## Design notes
9//!
10//! - The ticker owns its own [`super::ResolverState`]. The startup one-tick
11//!   path in `supervisor.rs` uses a *separate* state. As a deliberate
12//!   trade-off, the **first observation per hostname inside the continuous
13//!   ticker** emits a baseline drift event with `previousDigest: empty`
14//!   even when the startup tick already observed the same hostname. The
15//!   alternative — sharing state across the startup tick and the
16//!   continuous ticker — would entangle two lifetimes that don't compose
17//!   cleanly; the duplication is bounded to one event per hostname per
18//!   cell run.
19//!
20//! - The resolver function ([`super::ResolverFn`]) is sync. Calling it
21//!   directly inside the tokio task would block a runtime worker for the
22//!   duration of every blocking `getaddrinfo`-class lookup, so each
23//!   per-tick batch resolution is wrapped in
24//!   [`tokio::task::spawn_blocking`] — mirroring the W2 SEC-21 fix in
25//!   `supervisor.rs::Supervisor::maybe_run_dns_authority_refresh`. The
26//!   Phase 3 production wiring routes that closure to
27//!   [`super::hickory_resolve::resolve_with_ttl`] via
28//!   [`tokio::runtime::Handle::block_on`] so the ticker stays sync at the
29//!   call site while still benefiting from the async resolver.
30//!
31//! - Shutdown is coordinated via a shared [`std::sync::atomic::AtomicBool`]
32//!   the supervisor flips to `true` at cell destroy. The loop checks the
33//!   flag at the top of every iteration AND races `shutdown` observation
34//!   against the inter-tick sleep via `tokio::select!` so a teardown
35//!   request returns within milliseconds, not at the next sleep boundary.
36//!
37//! - **Phase 3 — real upstream TTL.** The resolver function returns a
38//!   [`super::ResolvedAnswer`] carrying both the target set and the
39//!   minimum TTL the upstream nameserver advertised. The ticker forwards
40//!   that TTL into [`super::ResolverRefresh::tick`], which clamps it to
41//!   `refreshPolicy.minTtlSeconds` (the operator's DNS-rebinding floor)
42//!   before stamping it into the `dns_authority_drift` event.
43
44use std::collections::HashMap;
45use std::io;
46use std::sync::atomic::{AtomicBool, Ordering};
47use std::sync::Arc;
48use std::time::{Duration, SystemTime};
49
50use cellos_core::{
51    CloudEventV1, DnsRebindingPolicy, DnsRefreshPolicy, DnsResolver, DnsResolverDnssecPolicy,
52};
53
54use super::{
55    RebindingState, ResolvedAnswer, ResolverRefresh, ResolverState, TrustAnchors,
56    ValidatedResolvedAnswer,
57};
58
59/// Sync, send-and-share-able resolver closure shared by both the ticker
60/// internals and the supervisor production wiring. Type-aliased so clippy
61/// `type_complexity` stays quiet at the per-callsite level.
62///
63/// scope: returns [`ResolvedAnswer`] (targets + ttl_seconds +
64/// resolver_addr). The production closure is a thin `Handle::block_on`
65/// wrapper around [`super::hickory_resolve::resolve_with_ttl`]; tests build
66/// fixed answers directly.
67pub type SharedResolverFn = Arc<dyn Fn(&str) -> io::Result<ResolvedAnswer> + Send + Sync>;
68
69/// SEC-21 Phase 3h — DNSSEC-validating resolver closure shared with the
70/// ticker. Wraps [`super::resolve_with_ttl_validated`] in production; the
71/// ticker tests pass synthetic closures that mint deterministic
72/// [`ValidatedResolvedAnswer`] outcomes (Validated / Failed / Unsigned).
73pub type SharedValidatedResolverFn =
74    Arc<dyn Fn(&str) -> io::Result<ValidatedResolvedAnswer> + Send + Sync>;
75
76/// Sink-shape consumer the ticker hands events to. Mirrors the proxy's
77/// [`crate::dns_proxy::DnsQueryEmitter`] — synchronous on the call site so
78/// the ticker loop never blocks on emit, with the bridging to async
79/// [`cellos_core::ports::EventSink`] handled by the
80/// [`super::sink_emitter::EventSinkEmitter`] adapter.
81pub trait DriftEmitter: Send + Sync + 'static {
82    fn emit(&self, event: CloudEventV1);
83}
84
85/// Snapshot of per-cell counters returned when the ticker exits.
86///
87/// Returned by the spawned task as its [`tokio::task::JoinHandle`] output
88/// so the supervisor can log a summary at teardown. None of these counters
89/// drive control flow — they are pure observability.
90#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
91pub struct TickerStats {
92    /// Number of complete tick iterations executed.
93    pub tick_count: u64,
94    /// Number of `dns_authority_drift` events emitted across all ticks.
95    pub events_emitted: u64,
96    /// Number of `Err` returns from the injected resolver across all
97    /// hostname lookups in all ticks.
98    pub resolver_errors: u64,
99}
100
101/// Spawned-task handle the supervisor uses to coordinate shutdown.
102///
103/// On cell destroy the supervisor:
104///
105/// 1. Flips `shutdown` to `true`.
106/// 2. Awaits `task` with a bounded timeout (the supervisor's call site
107///    chooses 2s — matching the W4 SEAM-1 Phase 2b shutdown pattern in
108///    [`crate::dns_proxy::spawn::DnsProxyHandle::join`]).
109pub struct TickerHandle {
110    /// Shared shutdown flag. Setting `true` signals the loop to exit at its
111    /// next iteration. The loop also races this flag against the inter-tick
112    /// sleep so a teardown request collapses worst-case shutdown latency.
113    pub shutdown: Arc<AtomicBool>,
114    /// JoinHandle for the spawned tokio task. The task always returns
115    /// [`TickerStats`]; failures inside the loop are absorbed (resolver
116    /// errors increment a counter, malformed events drop, etc.) so the
117    /// task does not panic out from under the supervisor.
118    pub task: tokio::task::JoinHandle<TickerStats>,
119}
120
121/// Owned, owned-string configuration for one ticker — built by the
122/// supervisor in async context and moved into the spawned task. All
123/// borrowing-from-spec is resolved before spawn so the task's lifetime is
124/// `'static`.
125pub struct TickerConfig {
126    /// Inter-tick interval. Resolved by the supervisor — typically
127    /// `min(refreshPolicy.minTtlSeconds, 60).max(5)` — with a hard floor
128    /// of 5s applied at the call site so a misconfigured spec cannot burn
129    /// CPU here.
130    pub interval: Duration,
131    /// Refresh policy carried by the spec (`spec.authority.dnsAuthority.
132    /// refreshPolicy`). `None` means "no floor / no ceiling, ttl-honor
133    /// strategy", same as the startup tick.
134    pub policy: Option<DnsRefreshPolicy>,
135    /// SEC-21 Phase 3e — rebinding mitigation policy carried by the spec
136    /// (`spec.authority.dnsAuthority.rebindingPolicy`). `None` means "no
137    /// per-hostname response-IP tracking" — the ticker emits only the
138    /// standard `dns_authority_drift` events. When `Some`, the ticker
139    /// owns a per-cell [`RebindingState`] across ticks and emits
140    /// `dns_authority_rebind_threshold` / `dns_authority_rebind_rejected`
141    /// events as the operator-declared cap / allowlist is breached.
142    pub rebinding_policy: Option<DnsRebindingPolicy>,
143    /// Declared resolvers. The first entry's `resolverId` is stamped into
144    /// every emitted event.
145    pub resolvers: Vec<DnsResolver>,
146    /// Hostnames the ticker may refresh — typically the
147    /// `dnsAuthority.hostnameAllowlist` ∪ egress-rule hosts, resolved by
148    /// the supervisor at predicate time so the ticker has nothing to
149    /// re-derive.
150    pub hostnames: Vec<String>,
151    /// Optional `keysetId` to stamp into emitted events.
152    pub keyset_id: Option<String>,
153    /// Optional `issuerKid` to stamp into emitted events.
154    pub issuer_kid: Option<String>,
155    /// Optional policy-bundle digest (`sha256:<hex>`).
156    pub policy_digest: Option<String>,
157    /// Optional pass-through correlation id.
158    pub correlation_id: Option<String>,
159    /// CloudEvent `source` field.
160    pub source: String,
161    /// Cell id for event payloads.
162    pub cell_id: String,
163    /// Run id for event payloads.
164    pub run_id: String,
165    /// SEC-21 Phase 3h — opt-in DNSSEC validation policy. `None`
166    /// preserves P3a/P3e behaviour exactly: the ticker calls the plain
167    /// `SharedResolverFn` and never emits `dns_authority_dnssec_failed`
168    /// events. When `Some`, the ticker uses
169    /// [`Self::validated_resolver`] (which MUST also be set) and tags
170    /// `dnssec_status` on every emitted `dns_authority_drift` event.
171    /// Carries the EFFECTIVE per-tick policy — supervisors with a
172    /// heterogeneous `dnsAuthority.resolvers[]` set today pick a single
173    /// policy (the first opt-in resolver's) per ticker; multi-policy
174    /// per-resolver routing is a future slice.
175    pub dnssec_policy: Option<DnsResolverDnssecPolicy>,
176    /// Trust anchors loaded from env / spec / IANA-default. Used purely
177    /// for stamping the source descriptor into `dns_authority_dnssec_failed`
178    /// event payloads. Hickory 0.24 limitation: the resolver does not
179    /// accept custom anchors via public API; see [`super::dnssec`].
180    pub trust_anchors: Option<TrustAnchors>,
181    /// SEC-21 Phase 3h — DNSSEC-validating resolver closure. MUST be
182    /// set when `dnssec_policy` is `Some`; ignored otherwise. The
183    /// supervisor wires this to [`super::resolve_with_ttl_validated`]
184    /// in production; tests pass synthetic closures.
185    pub validated_resolver: Option<SharedValidatedResolverFn>,
186}
187
188/// Spawn the continuous ticker on the current tokio runtime.
189///
190/// Runs forever (`shutdown.load() == false`) on `cfg.interval` cadence,
191/// invoking the injected `resolver` for each hostname inside
192/// `tokio::task::spawn_blocking`. Each per-hostname Ok/Err is fed to a
193/// [`super::ResolverRefresh::tick`] call against the ticker's *own*
194/// [`super::ResolverState`]; resulting events are dispatched through
195/// `emitter`.
196///
197/// **Tokio-context constraint:** must be called from inside a tokio
198/// runtime (multi-thread or current-thread). The returned handle's
199/// `task` lives on the runtime that was current at spawn time.
200pub fn spawn_continuous_ticker(
201    cfg: TickerConfig,
202    emitter: Arc<dyn DriftEmitter>,
203    resolver: SharedResolverFn,
204) -> TickerHandle {
205    let shutdown = Arc::new(AtomicBool::new(false));
206    let shutdown_for_task = shutdown.clone();
207
208    let task =
209        tokio::spawn(
210            async move { run_ticker_loop(cfg, emitter, resolver, shutdown_for_task).await },
211        );
212
213    TickerHandle { shutdown, task }
214}
215
216/// The actual loop, factored out for testability.
217async fn run_ticker_loop(
218    cfg: TickerConfig,
219    emitter: Arc<dyn DriftEmitter>,
220    resolver: SharedResolverFn,
221    shutdown: Arc<AtomicBool>,
222) -> TickerStats {
223    let mut stats = TickerStats::default();
224    let mut state = ResolverState::new();
225    // SEC-21 Phase 3e — per-cell rebinding state lives ON the ticker so
226    // observations persist across ticks for the lifetime of the cell. A
227    // fresh state is created on every ticker spawn, matching the
228    // ResolverState lifecycle. When `cfg.rebinding_policy is None`, this
229    // state is never read or written by `tick_with_rebinding` (the
230    // method short-circuits the rebinding-aware path).
231    let mut rebinding_state = RebindingState::new();
232
233    // Quick exit if the supervisor handed us nothing to refresh — saves
234    // spinning the loop forever for a misconfigured cell.
235    if cfg.hostnames.is_empty() {
236        return stats;
237    }
238
239    loop {
240        if shutdown.load(Ordering::SeqCst) {
241            break;
242        }
243
244        // SEC-21 Phase 3h decision: when the operator has wired DNSSEC,
245        // pre-resolve through the validating closure so the per-tick
246        // refresher receives the validation discriminator alongside the
247        // standard answer. When DNSSEC is off, take the unchanged P3a
248        // pre-resolution path. Both branches honour the same
249        // `spawn_blocking` discipline.
250        let dnssec_active = cfg.dnssec_policy.is_some() && cfg.validated_resolver.is_some();
251        let validated_resolved: HashMap<String, io::Result<ValidatedResolvedAnswer>> =
252            if dnssec_active {
253                let hostnames_for_resolve = cfg.hostnames.clone();
254                let validated = cfg.validated_resolver.as_ref().unwrap().clone();
255                match tokio::task::spawn_blocking(move || {
256                    let mut out: HashMap<String, io::Result<ValidatedResolvedAnswer>> =
257                        HashMap::new();
258                    for hostname in &hostnames_for_resolve {
259                        out.insert(hostname.clone(), validated(hostname));
260                    }
261                    out
262                })
263                .await
264                {
265                    Ok(map) => map,
266                    Err(_) => {
267                        if !sleep_or_shutdown(cfg.interval, &shutdown).await {
268                            break;
269                        }
270                        continue;
271                    }
272                }
273            } else {
274                HashMap::new()
275            };
276
277        // Resolve every hostname on a blocking thread. Mirrors the W2
278        // SEC-21 startup-tick path — never call sync resolvers on the
279        // runtime thread. Phase 3: the closure itself dispatches to the
280        // hickory-resolver async helper via `Handle::block_on`, but that
281        // helper is fast-running per query and we still want the
282        // spawn_blocking isolation to keep the runtime workers free.
283        //
284        // scope: when DNSSEC is active, the validated_resolved map is
285        // the source of truth and we SKIP this unvalidated pre-resolve
286        // entirely (the unvalidated closure may even be a panicking stub
287        // in tests that prove the validated path is exclusively used).
288        let resolved: HashMap<String, io::Result<ResolvedAnswer>> = if dnssec_active {
289            HashMap::new()
290        } else {
291            let hostnames_for_resolve = cfg.hostnames.clone();
292            let resolver_for_blocking = resolver.clone();
293            let join_result = tokio::task::spawn_blocking(move || {
294                let mut out: HashMap<String, io::Result<ResolvedAnswer>> = HashMap::new();
295                for hostname in &hostnames_for_resolve {
296                    out.insert(hostname.clone(), resolver_for_blocking(hostname));
297                }
298                out
299            })
300            .await;
301
302            match join_result {
303                Ok(map) => map,
304                Err(_) => {
305                    // spawn_blocking join error (panic / cancel). Skip
306                    // this tick — counter not bumped because we did no
307                    // useful work; loop continues so a transient runtime
308                    // hiccup does not silently kill the ticker.
309                    if !sleep_or_shutdown(cfg.interval, &shutdown).await {
310                        break;
311                    }
312                    continue;
313                }
314            }
315        };
316
317        // Count resolver errors before we hand the map to tick().
318        // When DNSSEC is active, the *validated* map is the source of
319        // truth for error counting — the unvalidated map is unused.
320        if dnssec_active {
321            for v in validated_resolved.values() {
322                if v.is_err() {
323                    stats.resolver_errors = stats.resolver_errors.saturating_add(1);
324                }
325            }
326        } else {
327            for v in resolved.values() {
328                if v.is_err() {
329                    stats.resolver_errors = stats.resolver_errors.saturating_add(1);
330                }
331            }
332        }
333
334        let resolver_for_tick = |hostname: &str| -> io::Result<ResolvedAnswer> {
335            match resolved.get(hostname) {
336                Some(Ok(answer)) => Ok(answer.clone()),
337                Some(Err(e)) => Err(io::Error::new(e.kind(), e.to_string())),
338                None => Err(io::Error::other(
339                    "ticker: hostname missing from pre-resolved map",
340                )),
341            }
342        };
343
344        let refresher = ResolverRefresh {
345            policy: cfg.policy.as_ref(),
346            rebinding_policy: cfg.rebinding_policy.as_ref(),
347            resolvers: cfg.resolvers.as_slice(),
348            hostnames: cfg.hostnames.as_slice(),
349            keyset_id: cfg.keyset_id.as_deref(),
350            issuer_kid: cfg.issuer_kid.as_deref(),
351            policy_digest: cfg.policy_digest.as_deref(),
352            correlation_id: cfg.correlation_id.as_deref(),
353            source: Some(cfg.source.as_str()),
354            // SEC-21 Phase 3h — DNSSEC validation policy + loaded
355            // trust anchors. Both default to `None` (P3a/P3e behaviour
356            // unchanged); the supervisor sets these in
357            // `pick_dnssec_resolver_policy` when at least one resolver
358            // has opted in via spec.
359            dnssec_policy: cfg.dnssec_policy.as_ref(),
360            trust_anchors: cfg.trust_anchors.as_ref(),
361        };
362
363        let events = if dnssec_active {
364            // SEC-21 Phase 3h — drive the validating refresher path so
365            // the per-tick decision tree can emit
366            // `dns_authority_dnssec_failed` and tag drift events with
367            // `dnssec_status`.
368            refresher.tick_with_dnssec(
369                &mut state,
370                &mut rebinding_state,
371                &validated_resolved,
372                SystemTime::now(),
373                &cfg.cell_id,
374                &cfg.run_id,
375            )
376        } else {
377            refresher.tick_with_rebinding(
378                &mut state,
379                &mut rebinding_state,
380                &resolver_for_tick,
381                SystemTime::now(),
382                &cfg.cell_id,
383                &cfg.run_id,
384            )
385        };
386
387        for ev in events {
388            stats.events_emitted = stats.events_emitted.saturating_add(1);
389            emitter.emit(ev);
390        }
391
392        stats.tick_count = stats.tick_count.saturating_add(1);
393
394        if !sleep_or_shutdown(cfg.interval, &shutdown).await {
395            break;
396        }
397    }
398
399    stats
400}
401
402/// Clamp an operator-supplied tick interval (in seconds) to the
403/// minimum-5s floor the ticker enforces. Pure helper, exposed so the
404/// supervisor wiring + the property test can both call into it.
405pub fn clamp_tick_interval_secs(secs: u64) -> u64 {
406    secs.max(5)
407}
408
409/// Sleep for `interval` OR return early when `shutdown` flips to `true`.
410///
411/// Returns `true` when the interval elapsed normally, `false` when
412/// shutdown was requested mid-sleep. Polls the shutdown flag on a
413/// 50ms granularity so worst-case shutdown latency is ~50ms even when
414/// `interval` is large (e.g. 60s).
415async fn sleep_or_shutdown(interval: Duration, shutdown: &AtomicBool) -> bool {
416    // Poll-based wake. A `tokio::sync::Notify` would be tighter but adds
417    // a second synchronization primitive on top of the AtomicBool the
418    // supervisor already owns; the proxy spawn module took the same
419    // poll-on-AtomicBool route for the same reason.
420    let poll_step = Duration::from_millis(50);
421    let deadline = std::time::Instant::now() + interval;
422    loop {
423        if shutdown.load(Ordering::SeqCst) {
424            return false;
425        }
426        let now = std::time::Instant::now();
427        if now >= deadline {
428            return true;
429        }
430        let remaining = deadline - now;
431        tokio::time::sleep(remaining.min(poll_step)).await;
432    }
433}
434
435#[cfg(test)]
436mod tests {
437    use super::*;
438    use std::net::{IpAddr, Ipv4Addr, SocketAddr};
439    use std::sync::atomic::AtomicU64;
440    use std::sync::Mutex;
441
442    /// Build a Phase-3-shaped answer. The ticker tests don't probe TTL
443    /// semantics directly — that's covered in `super::tests` for the clamp
444    /// and in [`super::hickory_resolve::tests`] for the wire-format min-TTL —
445    /// so we default to ttl=0 here.
446    fn answer(targets: Vec<String>) -> ResolvedAnswer {
447        ResolvedAnswer {
448            targets,
449            ttl_seconds: 0,
450            resolver_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 53),
451        }
452    }
453
454    #[derive(Default)]
455    struct CollectingEmitter {
456        events: Mutex<Vec<CloudEventV1>>,
457    }
458    impl DriftEmitter for CollectingEmitter {
459        fn emit(&self, event: CloudEventV1) {
460            self.events.lock().unwrap().push(event);
461        }
462    }
463
464    fn one_resolver() -> Vec<DnsResolver> {
465        vec![DnsResolver {
466            resolver_id: "resolver-doh-cloudflare".into(),
467            endpoint: "https://1.1.1.1/dns-query".into(),
468            protocol: cellos_core::DnsResolverProtocol::Doh,
469            trust_kid: None,
470            dnssec: None,
471        }]
472    }
473
474    fn base_cfg(hostnames: Vec<String>, interval: Duration) -> TickerConfig {
475        TickerConfig {
476            interval,
477            policy: Some(DnsRefreshPolicy {
478                min_ttl_seconds: Some(0),
479                max_stale_seconds: None,
480                strategy: None,
481            }),
482            rebinding_policy: None,
483            resolvers: one_resolver(),
484            hostnames,
485            keyset_id: Some("keyset-test".into()),
486            issuer_kid: Some("kid-test".into()),
487            policy_digest: None,
488            correlation_id: None,
489            source: "cellos-supervisor-test".into(),
490            cell_id: "cell-A".into(),
491            run_id: "run-A".into(),
492            // SEC-21 Phase 3h — DNSSEC opt-out for the baseline test
493            // helper. The Phase 3h-specific tests build their own
494            // TickerConfig with these fields populated.
495            dnssec_policy: None,
496            trust_anchors: None,
497            validated_resolver: None,
498        }
499    }
500
501    /// Inject a resolver whose answers cycle through a fixed sequence —
502    /// each call advances by one, the last entry repeats forever.
503    fn cycling_resolver(sequence: Vec<Vec<String>>) -> SharedResolverFn {
504        let counter = Arc::new(AtomicU64::new(0));
505        let seq = Arc::new(sequence);
506        Arc::new(move |_h: &str| {
507            let idx = counter.fetch_add(1, Ordering::SeqCst) as usize;
508            let pick = if idx >= seq.len() {
509                seq.last().cloned().unwrap_or_default()
510            } else {
511                seq[idx].clone()
512            };
513            Ok(answer(pick))
514        })
515    }
516
517    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
518    async fn ticker_emits_drift_when_targets_change_between_ticks() {
519        let cfg = base_cfg(vec!["api.example.com".into()], Duration::from_millis(100));
520        let emitter = Arc::new(CollectingEmitter::default());
521        let resolver = cycling_resolver(vec![
522            vec!["1.1.1.1".into()],
523            vec!["1.0.0.1".into()],
524            vec!["1.0.0.1".into()],
525        ]);
526
527        let handle = spawn_continuous_ticker(cfg, emitter.clone(), resolver);
528        // Two ticks at 100ms — sleep 250ms to comfortably observe both.
529        tokio::time::sleep(Duration::from_millis(250)).await;
530        handle.shutdown.store(true, Ordering::SeqCst);
531        let stats = tokio::time::timeout(Duration::from_secs(1), handle.task)
532            .await
533            .expect("ticker join timeout")
534            .expect("ticker task panicked");
535
536        let events = emitter.events.lock().unwrap();
537        assert!(
538            events.len() >= 2,
539            "expected baseline + change drift events, got {}",
540            events.len()
541        );
542        assert!(stats.tick_count >= 2);
543        assert!(stats.events_emitted >= 2);
544    }
545
546    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
547    async fn ticker_silent_when_targets_stable() {
548        let cfg = base_cfg(vec!["api.example.com".into()], Duration::from_millis(50));
549        let emitter = Arc::new(CollectingEmitter::default());
550        // Stable answer across all ticks.
551        let resolver: SharedResolverFn =
552            Arc::new(|_h: &str| Ok(answer(vec!["203.0.113.10".into()])));
553
554        let handle = spawn_continuous_ticker(cfg, emitter.clone(), resolver);
555        tokio::time::sleep(Duration::from_millis(250)).await;
556        handle.shutdown.store(true, Ordering::SeqCst);
557        let _ = tokio::time::timeout(Duration::from_secs(1), handle.task)
558            .await
559            .expect("ticker join timeout");
560
561        let events = emitter.events.lock().unwrap();
562        assert_eq!(
563            events.len(),
564            1,
565            "stable targets must emit exactly one baseline event, got {}",
566            events.len()
567        );
568    }
569
570    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
571    async fn ticker_respects_shutdown_promptly() {
572        let cfg = base_cfg(vec!["api.example.com".into()], Duration::from_secs(10));
573        let emitter = Arc::new(CollectingEmitter::default());
574        let resolver: SharedResolverFn = Arc::new(|_h: &str| Ok(answer(vec!["1.1.1.1".into()])));
575
576        let handle = spawn_continuous_ticker(cfg, emitter, resolver);
577        // Let the first tick fire, then immediately shutdown — the loop
578        // should be in `sleep_or_shutdown` waiting on a 10s deadline,
579        // and the 50ms poll granularity means we collapse to ~50ms.
580        tokio::time::sleep(Duration::from_millis(80)).await;
581        handle.shutdown.store(true, Ordering::SeqCst);
582        let started = std::time::Instant::now();
583        let _ = tokio::time::timeout(Duration::from_secs(1), handle.task)
584            .await
585            .expect("ticker did not honour shutdown within 1s");
586        let elapsed = started.elapsed();
587        assert!(
588            elapsed < Duration::from_millis(500),
589            "shutdown took {elapsed:?}, expected <500ms"
590        );
591    }
592
593    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
594    async fn ticker_respects_floor_interval_min() {
595        // The ≥5s floor is enforced at the supervisor call site, not in
596        // the ticker module — the ticker honours whatever interval it is
597        // handed. This test pins the supervisor-side helper that performs
598        // the clamp so a future refactor of the call site cannot silently
599        // remove the floor.
600        let floor = crate::resolver_refresh::ticker::clamp_tick_interval_secs(1);
601        assert!(floor >= 5, "tick interval floor must be >=5s; got {floor}");
602        let unbounded = crate::resolver_refresh::ticker::clamp_tick_interval_secs(120);
603        assert_eq!(unbounded, 120, "values >=floor must pass through untouched");
604        let zero = crate::resolver_refresh::ticker::clamp_tick_interval_secs(0);
605        assert!(zero >= 5, "zero must clamp up to floor");
606    }
607
608    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
609    async fn ticker_handles_resolver_failure_gracefully() {
610        let cfg = base_cfg(vec!["api.example.com".into()], Duration::from_millis(50));
611        let emitter = Arc::new(CollectingEmitter::default());
612        let resolver: SharedResolverFn = Arc::new(|_h: &str| Err(io::Error::other("transient")));
613
614        let handle = spawn_continuous_ticker(cfg, emitter.clone(), resolver);
615        tokio::time::sleep(Duration::from_millis(250)).await;
616        handle.shutdown.store(true, Ordering::SeqCst);
617        let stats = tokio::time::timeout(Duration::from_secs(1), handle.task)
618            .await
619            .expect("ticker join timeout")
620            .expect("ticker task panicked");
621
622        let events = emitter.events.lock().unwrap();
623        assert!(
624            events.is_empty(),
625            "resolver failures must not emit drift, got {} events",
626            events.len()
627        );
628        assert!(
629            stats.resolver_errors >= 1,
630            "resolver_errors counter should reflect the failures, got {}",
631            stats.resolver_errors
632        );
633        assert!(
634            stats.tick_count >= 1,
635            "ticker must keep running across resolver errors"
636        );
637    }
638
639    // ============================================================
640    // SEC-21 Phase 3e — DNS rebinding closure tests.
641    //
642    // These exercise the `rebinding_policy` field threaded through
643    // `TickerConfig` → `ResolverRefresh::tick_with_rebinding`. The unit
644    // tests in `super::tests` cover the per-tick decision math; these
645    // tests pin the END-TO-END ticker behaviour so a future refactor
646    // can't silently disconnect the policy from event emission.
647    // ============================================================
648
649    fn count_events_of(events: &[CloudEventV1], suffix: &str) -> usize {
650        events.iter().filter(|e| e.ty.ends_with(suffix)).count()
651    }
652
653    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
654    async fn ticker_no_rebinding_events_when_policy_is_none() {
655        // Baseline contract: `rebinding_policy: None` → ticker emits
656        // only the standard `dns_authority_drift` events. A future
657        // refactor that accidentally wires the rebinding code path to
658        // fire unconditionally will trip this test.
659        let mut cfg = base_cfg(vec!["api.example.com".into()], Duration::from_millis(80));
660        cfg.rebinding_policy = None;
661        let emitter = Arc::new(CollectingEmitter::default());
662        // 5 ticks worth of churning IPs — rebind events would normally
663        // fire for at least the first 4-5 distinct IPs.
664        let resolver = cycling_resolver(vec![
665            vec!["1.0.0.1".into()],
666            vec!["1.0.0.2".into()],
667            vec!["1.0.0.3".into()],
668            vec!["1.0.0.4".into()],
669            vec!["1.0.0.5".into()],
670        ]);
671
672        let handle = spawn_continuous_ticker(cfg, emitter.clone(), resolver);
673        tokio::time::sleep(Duration::from_millis(250)).await;
674        handle.shutdown.store(true, Ordering::SeqCst);
675        let _ = tokio::time::timeout(Duration::from_secs(1), handle.task).await;
676
677        let events = emitter.events.lock().unwrap();
678        assert_eq!(
679            count_events_of(&events, "dns_authority_rebind_threshold"),
680            0,
681            "no rebinding policy → no threshold events"
682        );
683        assert_eq!(
684            count_events_of(&events, "dns_authority_rebind_rejected"),
685            0,
686            "no rebinding policy → no rejected events"
687        );
688    }
689
690    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
691    async fn ticker_threshold_only_audit_mode_emits_threshold_events() {
692        // Audit-only: `reject_on_rebind=false`. Cap at 2 distinct IPs.
693        // The ticker walks 4 distinct IPs across 4 ticks; once the
694        // cumulative count exceeds 2, every subsequent tick that adds
695        // a novel IP fires a threshold event.
696        let mut cfg = base_cfg(vec!["api.example.com".into()], Duration::from_millis(60));
697        cfg.rebinding_policy = Some(DnsRebindingPolicy {
698            response_ip_allowlist: Vec::new(),
699            max_novel_ips_per_hostname: 2,
700            reject_on_rebind: false,
701        });
702        cfg.policy_digest =
703            Some("sha256:e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855".into());
704        let emitter = Arc::new(CollectingEmitter::default());
705        let resolver = cycling_resolver(vec![
706            vec!["1.0.0.1".into()],
707            vec!["1.0.0.2".into()],
708            vec!["1.0.0.3".into()],
709            vec!["1.0.0.4".into()],
710        ]);
711
712        let handle = spawn_continuous_ticker(cfg, emitter.clone(), resolver);
713        tokio::time::sleep(Duration::from_millis(280)).await;
714        handle.shutdown.store(true, Ordering::SeqCst);
715        let _ = tokio::time::timeout(Duration::from_secs(1), handle.task).await;
716
717        let events = emitter.events.lock().unwrap();
718        let threshold = count_events_of(&events, "dns_authority_rebind_threshold");
719        let rejected = count_events_of(&events, "dns_authority_rebind_rejected");
720        assert!(
721            threshold >= 2,
722            "audit-only mode must emit at least 2 threshold events when cap is breached over multiple ticks; got {threshold}"
723        );
724        assert_eq!(
725            rejected, 0,
726            "no allowlist set → no rejected events; got {rejected}"
727        );
728    }
729
730    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
731    async fn ticker_reject_on_rebind_filters_drift_targets() {
732        // Enforce mode: cap=2, reject_on_rebind=true. Resolver returns a
733        // cumulative-style answer that grows on each tick (mirrors a CDN
734        // legitimately serving multiple A records, with an attacker IP
735        // appended on tick 2). After the cap fills, the new IP must be
736        // filtered out of the workload-visible target set.
737        let mut cfg = base_cfg(vec!["api.example.com".into()], Duration::from_millis(60));
738        cfg.rebinding_policy = Some(DnsRebindingPolicy {
739            response_ip_allowlist: Vec::new(),
740            max_novel_ips_per_hostname: 2,
741            reject_on_rebind: true,
742        });
743        cfg.policy_digest =
744            Some("sha256:e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855".into());
745        let emitter = Arc::new(CollectingEmitter::default());
746        // Tick 1: two legitimate CDN IPs (fills cap=2 exactly).
747        // Tick 2: same two PLUS an attacker IP — must be filtered.
748        let resolver = cycling_resolver(vec![
749            vec!["1.0.0.1".into(), "1.0.0.2".into()],
750            vec!["1.0.0.1".into(), "1.0.0.2".into(), "198.51.100.7".into()],
751        ]);
752
753        let handle = spawn_continuous_ticker(cfg, emitter.clone(), resolver);
754        tokio::time::sleep(Duration::from_millis(220)).await;
755        handle.shutdown.store(true, Ordering::SeqCst);
756        let _ = tokio::time::timeout(Duration::from_secs(1), handle.task).await;
757
758        let events = emitter.events.lock().unwrap();
759        let drift_events: Vec<_> = events
760            .iter()
761            .filter(|e| e.ty.ends_with("dns_authority_drift"))
762            .collect();
763        assert!(
764            !drift_events.is_empty(),
765            "drift events must still fire (rejection only filters targets, not the drift signal)"
766        );
767        let last = drift_events.last().unwrap();
768        let data = last.data.as_ref().expect("data");
769        let current: Vec<&str> = data["currentTargets"]
770            .as_array()
771            .unwrap()
772            .iter()
773            .map(|v| v.as_str().unwrap())
774            .collect();
775        assert!(
776            current.contains(&"1.0.0.1"),
777            "first legitimate IP must survive rejection: {current:?}"
778        );
779        assert!(
780            current.contains(&"1.0.0.2"),
781            "second legitimate IP must survive rejection: {current:?}"
782        );
783        assert!(
784            !current.contains(&"198.51.100.7"),
785            "attacker IP beyond cap=2 must be filtered when reject_on_rebind=true: {current:?}"
786        );
787    }
788
789    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
790    async fn ticker_allowlist_only_emits_rejected_events() {
791        // Allowlist set, cap large, reject=false. Resolver returns one
792        // IP NOT in the allowlist on every tick → one rejected event
793        // per tick (deduped per-tick — no flapping when the same IP
794        // repeats). No threshold events because the cap isn't tight.
795        let mut cfg = base_cfg(vec!["api.example.com".into()], Duration::from_millis(60));
796        cfg.rebinding_policy = Some(DnsRebindingPolicy {
797            response_ip_allowlist: vec!["api.example.com:1.1.1.1".into()],
798            max_novel_ips_per_hostname: 100,
799            reject_on_rebind: false,
800        });
801        cfg.policy_digest =
802            Some("sha256:e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855".into());
803        let emitter = Arc::new(CollectingEmitter::default());
804        let resolver: SharedResolverFn =
805            Arc::new(|_h: &str| Ok(answer(vec!["198.51.100.7".into()])));
806
807        let handle = spawn_continuous_ticker(cfg, emitter.clone(), resolver);
808        tokio::time::sleep(Duration::from_millis(220)).await;
809        handle.shutdown.store(true, Ordering::SeqCst);
810        let _ = tokio::time::timeout(Duration::from_secs(1), handle.task).await;
811
812        let events = emitter.events.lock().unwrap();
813        let threshold = count_events_of(&events, "dns_authority_rebind_threshold");
814        let rejected = count_events_of(&events, "dns_authority_rebind_rejected");
815        // The first tick observes the disallowed IP; subsequent ticks
816        // observe the SAME IP (cumulative count stays at 1, well below
817        // the 100 cap) — but the allowlist check fires every tick.
818        assert!(
819            rejected >= 1,
820            "allowlist violation must fire at least one rejected event"
821        );
822        assert_eq!(
823            threshold, 0,
824            "cap is far above the IP count → no threshold events; got {threshold}"
825        );
826    }
827
828    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
829    async fn ticker_combined_threshold_and_allowlist_both_fire() {
830        // Both policies active: tight cap AND allowlist. Resolver
831        // returns IPs that violate both. Assert both event types fire.
832        let mut cfg = base_cfg(vec!["api.example.com".into()], Duration::from_millis(60));
833        cfg.rebinding_policy = Some(DnsRebindingPolicy {
834            response_ip_allowlist: vec!["api.example.com:1.1.1.1".into()],
835            max_novel_ips_per_hostname: 1,
836            reject_on_rebind: false,
837        });
838        cfg.policy_digest =
839            Some("sha256:e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855".into());
840        let emitter = Arc::new(CollectingEmitter::default());
841        // Tick 1: 1.1.1.1 (in allowlist, fills cap).
842        // Tick 2: 198.51.100.7 (not in allowlist, exceeds cap).
843        let resolver = cycling_resolver(vec![vec!["1.1.1.1".into()], vec!["198.51.100.7".into()]]);
844
845        let handle = spawn_continuous_ticker(cfg, emitter.clone(), resolver);
846        tokio::time::sleep(Duration::from_millis(220)).await;
847        handle.shutdown.store(true, Ordering::SeqCst);
848        let _ = tokio::time::timeout(Duration::from_secs(1), handle.task).await;
849
850        let events = emitter.events.lock().unwrap();
851        let threshold = count_events_of(&events, "dns_authority_rebind_threshold");
852        let rejected = count_events_of(&events, "dns_authority_rebind_rejected");
853        assert!(
854            threshold >= 1,
855            "second tick exceeds cap=1 → threshold event expected"
856        );
857        assert!(
858            rejected >= 1,
859            "second tick IP is not in allowlist → rejected event expected"
860        );
861    }
862
863    // ============================================================
864    // SEC-21 Phase 3h — DNSSEC validation ticker tests.
865    //
866    // These exercise the `dnssec_policy` + `validated_resolver`
867    // wiring threaded through `TickerConfig` and pin the load-bearing
868    // properties:
869    //
870    //   1. `dns_authority_dnssec_failed` events fire when the validator
871    //      reports Failed/Unsigned and the policy is active (audit OR
872    //      enforce mode — the event ALWAYS fires; failClosed only
873    //      changes whether the answer is dropped).
874    //   2. `failClosed=true` empties `currentTargets` in the drift
875    //      event so the workload-facing target set never sees the
876    //      attacker IPs.
877    //   3. `dnssec_status` is stamped on every drift event in DNSSEC
878    //      mode (validated / validation_failed / unsigned) so the SIEM
879    //      can correlate without parsing the dnssec_failed event.
880    // ============================================================
881
882    fn cycling_validated_resolver(
883        sequence: Vec<crate::resolver_refresh::ValidatedResolvedAnswer>,
884    ) -> SharedValidatedResolverFn {
885        let counter = Arc::new(AtomicU64::new(0));
886        let seq = Arc::new(sequence);
887        Arc::new(move |_h: &str| {
888            let idx = counter.fetch_add(1, Ordering::SeqCst) as usize;
889            Ok(if idx >= seq.len() {
890                seq.last().cloned().unwrap_or_else(|| {
891                    crate::resolver_refresh::ValidatedResolvedAnswer {
892                        answer: ResolvedAnswer {
893                            targets: Vec::new(),
894                            ttl_seconds: 0,
895                            resolver_addr: SocketAddr::new(
896                                IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)),
897                                53,
898                            ),
899                        },
900                        validation: crate::resolver_refresh::DnssecValidationResult::Unsigned,
901                    }
902                })
903            } else {
904                seq[idx].clone()
905            })
906        })
907    }
908
909    fn validated_with(
910        targets: Vec<&str>,
911        validation: crate::resolver_refresh::DnssecValidationResult,
912    ) -> crate::resolver_refresh::ValidatedResolvedAnswer {
913        crate::resolver_refresh::ValidatedResolvedAnswer {
914            answer: ResolvedAnswer {
915                targets: targets.into_iter().map(String::from).collect(),
916                ttl_seconds: 60,
917                resolver_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 53),
918            },
919            validation,
920        }
921    }
922
923    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
924    async fn dnssec_failed_event_emitted_when_validate_true_failclosed_false() {
925        // Audit-only mode: validate=true, failClosed=false. Validator
926        // reports Failed → ticker MUST emit dns_authority_dnssec_failed
927        // BUT keep the answer (drift fires with the original targets,
928        // dnssec_status=validation_failed).
929        let mut cfg = base_cfg(vec!["api.example.com".into()], Duration::from_millis(70));
930        cfg.dnssec_policy = Some(DnsResolverDnssecPolicy {
931            validate: true,
932            fail_closed: false,
933            trust_anchors_path: None,
934        });
935        cfg.trust_anchors = Some(crate::resolver_refresh::TrustAnchors::iana_default());
936        cfg.validated_resolver = Some(cycling_validated_resolver(vec![validated_with(
937            vec!["1.0.0.1"],
938            crate::resolver_refresh::DnssecValidationResult::Failed {
939                reason: "synthetic-bogus".to_string(),
940            },
941        )]));
942        let emitter = Arc::new(CollectingEmitter::default());
943
944        let handle = spawn_continuous_ticker(
945            cfg,
946            emitter.clone(),
947            Arc::new(|_h: &str| Ok(answer(vec!["1.0.0.1".into()]))),
948        );
949        tokio::time::sleep(Duration::from_millis(220)).await;
950        handle.shutdown.store(true, Ordering::SeqCst);
951        let _ = tokio::time::timeout(Duration::from_secs(1), handle.task).await;
952
953        let events = emitter.events.lock().unwrap();
954        let dnssec_failed = count_events_of(&events, "dns_authority_dnssec_failed");
955        assert!(
956            dnssec_failed >= 1,
957            "audit-only DNSSEC failure must fire dns_authority_dnssec_failed; got {dnssec_failed}"
958        );
959        // Drift event is still present because the answer was kept.
960        let drift_events: Vec<_> = events
961            .iter()
962            .filter(|e| e.ty.ends_with("dns_authority_drift"))
963            .collect();
964        assert!(
965            !drift_events.is_empty(),
966            "audit-only mode keeps the answer; drift must still fire"
967        );
968        // Drift must carry currentTargets (audit-only does not drop).
969        let last_drift = drift_events.last().unwrap();
970        let data = last_drift.data.as_ref().expect("data");
971        let current: Vec<&str> = data["currentTargets"]
972            .as_array()
973            .unwrap()
974            .iter()
975            .map(|v| v.as_str().unwrap())
976            .collect();
977        assert!(
978            current.contains(&"1.0.0.1"),
979            "audit-only mode preserves the unvalidated answer in the drift event: {current:?}"
980        );
981    }
982
983    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
984    async fn dnssec_drops_answer_when_failclosed_true() {
985        // Enforce mode: validate=true, failClosed=true. Validator
986        // reports Failed → ticker MUST drop the answer (drift's
987        // currentTargets is empty) AND fire dns_authority_dnssec_failed.
988        let mut cfg = base_cfg(vec!["api.example.com".into()], Duration::from_millis(70));
989        cfg.dnssec_policy = Some(DnsResolverDnssecPolicy {
990            validate: true,
991            fail_closed: true,
992            trust_anchors_path: None,
993        });
994        cfg.trust_anchors = Some(crate::resolver_refresh::TrustAnchors::iana_default());
995        cfg.validated_resolver = Some(cycling_validated_resolver(vec![validated_with(
996            vec!["198.51.100.7"],
997            crate::resolver_refresh::DnssecValidationResult::Failed {
998                reason: "synthetic-bogus".to_string(),
999            },
1000        )]));
1001        let emitter = Arc::new(CollectingEmitter::default());
1002
1003        let handle = spawn_continuous_ticker(
1004            cfg,
1005            emitter.clone(),
1006            Arc::new(|_h: &str| Ok(answer(vec!["198.51.100.7".into()]))),
1007        );
1008        tokio::time::sleep(Duration::from_millis(220)).await;
1009        handle.shutdown.store(true, Ordering::SeqCst);
1010        let _ = tokio::time::timeout(Duration::from_secs(1), handle.task).await;
1011
1012        let events = emitter.events.lock().unwrap();
1013        let dnssec_failed = count_events_of(&events, "dns_authority_dnssec_failed");
1014        assert!(
1015            dnssec_failed >= 1,
1016            "enforce DNSSEC failure must fire dns_authority_dnssec_failed; got {dnssec_failed}"
1017        );
1018
1019        let drift_events: Vec<_> = events
1020            .iter()
1021            .filter(|e| e.ty.ends_with("dns_authority_drift"))
1022            .collect();
1023        // failClosed=true: the answer is dropped, but the drift event
1024        // still fires (with empty currentTargets) so the SIEM observes
1025        // the workload-visible state.
1026        if let Some(last) = drift_events.last() {
1027            let data = last.data.as_ref().expect("data");
1028            let current: Vec<&str> = data["currentTargets"]
1029                .as_array()
1030                .unwrap()
1031                .iter()
1032                .map(|v| v.as_str().unwrap())
1033                .collect();
1034            assert!(
1035                !current.contains(&"198.51.100.7"),
1036                "failClosed=true MUST drop the attacker IP from drift currentTargets: {current:?}"
1037            );
1038        }
1039    }
1040
1041    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1042    async fn dnssec_status_field_set_in_drift_event() {
1043        // When DNSSEC is active and the validator reports Validated,
1044        // the drift event MUST carry dnssec_status="validated" so the
1045        // SIEM can correlate without parsing dns_authority_dnssec_failed
1046        // (which won't fire on the success path).
1047        let mut cfg = base_cfg(vec!["api.example.com".into()], Duration::from_millis(70));
1048        cfg.dnssec_policy = Some(DnsResolverDnssecPolicy {
1049            validate: true,
1050            fail_closed: false,
1051            trust_anchors_path: None,
1052        });
1053        cfg.trust_anchors = Some(crate::resolver_refresh::TrustAnchors::iana_default());
1054        cfg.validated_resolver = Some(cycling_validated_resolver(vec![validated_with(
1055            vec!["1.1.1.1"],
1056            crate::resolver_refresh::DnssecValidationResult::Validated {
1057                algorithm: "RSASHA256".to_string(),
1058                key_tag: 19036,
1059            },
1060        )]));
1061        let emitter = Arc::new(CollectingEmitter::default());
1062
1063        let handle = spawn_continuous_ticker(
1064            cfg,
1065            emitter.clone(),
1066            Arc::new(|_h: &str| Ok(answer(vec!["1.1.1.1".into()]))),
1067        );
1068        tokio::time::sleep(Duration::from_millis(180)).await;
1069        handle.shutdown.store(true, Ordering::SeqCst);
1070        let _ = tokio::time::timeout(Duration::from_secs(1), handle.task).await;
1071
1072        let events = emitter.events.lock().unwrap();
1073        // No dnssec_failed event on the success path.
1074        assert_eq!(
1075            count_events_of(&events, "dns_authority_dnssec_failed"),
1076            0,
1077            "Validated path must not emit dns_authority_dnssec_failed"
1078        );
1079        // Drift event(s) carry dnssec_status: "validated".
1080        let drift_events: Vec<_> = events
1081            .iter()
1082            .filter(|e| e.ty.ends_with("dns_authority_drift"))
1083            .collect();
1084        assert!(
1085            !drift_events.is_empty(),
1086            "drift must fire on first observation"
1087        );
1088        let data = drift_events[0].data.as_ref().expect("data");
1089        assert_eq!(
1090            data["dnssecStatus"], "validated",
1091            "drift in DNSSEC mode must stamp dnssecStatus=validated"
1092        );
1093    }
1094
1095    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1096    async fn ticker_novel_ip_within_cap_does_not_emit_threshold() {
1097        // Cap=4 (the default), resolver returns 3 distinct IPs over 3
1098        // ticks — cumulative 3, never exceeds. No threshold events.
1099        let mut cfg = base_cfg(vec!["api.example.com".into()], Duration::from_millis(60));
1100        cfg.rebinding_policy = Some(DnsRebindingPolicy {
1101            response_ip_allowlist: Vec::new(),
1102            max_novel_ips_per_hostname: 4,
1103            reject_on_rebind: false,
1104        });
1105        cfg.policy_digest =
1106            Some("sha256:e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855".into());
1107        let emitter = Arc::new(CollectingEmitter::default());
1108        let resolver = cycling_resolver(vec![
1109            vec!["1.0.0.1".into()],
1110            vec!["1.0.0.2".into()],
1111            vec!["1.0.0.3".into()],
1112        ]);
1113
1114        let handle = spawn_continuous_ticker(cfg, emitter.clone(), resolver);
1115        tokio::time::sleep(Duration::from_millis(220)).await;
1116        handle.shutdown.store(true, Ordering::SeqCst);
1117        let _ = tokio::time::timeout(Duration::from_secs(1), handle.task).await;
1118
1119        let events = emitter.events.lock().unwrap();
1120        let threshold = count_events_of(&events, "dns_authority_rebind_threshold");
1121        assert_eq!(
1122            threshold, 0,
1123            "3 distinct IPs under cap=4 must not fire threshold; got {threshold}"
1124        );
1125    }
1126}