1use std::collections::HashMap;
45use std::io;
46use std::sync::atomic::{AtomicBool, Ordering};
47use std::sync::Arc;
48use std::time::{Duration, SystemTime};
49
50use cellos_core::{
51 CloudEventV1, DnsRebindingPolicy, DnsRefreshPolicy, DnsResolver, DnsResolverDnssecPolicy,
52};
53
54use super::{
55 RebindingState, ResolvedAnswer, ResolverRefresh, ResolverState, TrustAnchors,
56 ValidatedResolvedAnswer,
57};
58
59pub type SharedResolverFn = Arc<dyn Fn(&str) -> io::Result<ResolvedAnswer> + Send + Sync>;
68
69pub type SharedValidatedResolverFn =
74 Arc<dyn Fn(&str) -> io::Result<ValidatedResolvedAnswer> + Send + Sync>;
75
76pub trait DriftEmitter: Send + Sync + 'static {
82 fn emit(&self, event: CloudEventV1);
83}
84
85#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
91pub struct TickerStats {
92 pub tick_count: u64,
94 pub events_emitted: u64,
96 pub resolver_errors: u64,
99}
100
101pub struct TickerHandle {
110 pub shutdown: Arc<AtomicBool>,
114 pub task: tokio::task::JoinHandle<TickerStats>,
119}
120
121pub struct TickerConfig {
126 pub interval: Duration,
131 pub policy: Option<DnsRefreshPolicy>,
135 pub rebinding_policy: Option<DnsRebindingPolicy>,
143 pub resolvers: Vec<DnsResolver>,
146 pub hostnames: Vec<String>,
151 pub keyset_id: Option<String>,
153 pub issuer_kid: Option<String>,
155 pub policy_digest: Option<String>,
157 pub correlation_id: Option<String>,
159 pub source: String,
161 pub cell_id: String,
163 pub run_id: String,
165 pub dnssec_policy: Option<DnsResolverDnssecPolicy>,
176 pub trust_anchors: Option<TrustAnchors>,
181 pub validated_resolver: Option<SharedValidatedResolverFn>,
186}
187
188pub fn spawn_continuous_ticker(
201 cfg: TickerConfig,
202 emitter: Arc<dyn DriftEmitter>,
203 resolver: SharedResolverFn,
204) -> TickerHandle {
205 let shutdown = Arc::new(AtomicBool::new(false));
206 let shutdown_for_task = shutdown.clone();
207
208 let task =
209 tokio::spawn(
210 async move { run_ticker_loop(cfg, emitter, resolver, shutdown_for_task).await },
211 );
212
213 TickerHandle { shutdown, task }
214}
215
216async fn run_ticker_loop(
218 cfg: TickerConfig,
219 emitter: Arc<dyn DriftEmitter>,
220 resolver: SharedResolverFn,
221 shutdown: Arc<AtomicBool>,
222) -> TickerStats {
223 let mut stats = TickerStats::default();
224 let mut state = ResolverState::new();
225 let mut rebinding_state = RebindingState::new();
232
233 if cfg.hostnames.is_empty() {
236 return stats;
237 }
238
239 loop {
240 if shutdown.load(Ordering::SeqCst) {
241 break;
242 }
243
244 let dnssec_active = cfg.dnssec_policy.is_some() && cfg.validated_resolver.is_some();
251 let validated_resolved: HashMap<String, io::Result<ValidatedResolvedAnswer>> =
252 if dnssec_active {
253 let hostnames_for_resolve = cfg.hostnames.clone();
254 let validated = cfg.validated_resolver.as_ref().unwrap().clone();
255 match tokio::task::spawn_blocking(move || {
256 let mut out: HashMap<String, io::Result<ValidatedResolvedAnswer>> =
257 HashMap::new();
258 for hostname in &hostnames_for_resolve {
259 out.insert(hostname.clone(), validated(hostname));
260 }
261 out
262 })
263 .await
264 {
265 Ok(map) => map,
266 Err(_) => {
267 if !sleep_or_shutdown(cfg.interval, &shutdown).await {
268 break;
269 }
270 continue;
271 }
272 }
273 } else {
274 HashMap::new()
275 };
276
277 let resolved: HashMap<String, io::Result<ResolvedAnswer>> = if dnssec_active {
289 HashMap::new()
290 } else {
291 let hostnames_for_resolve = cfg.hostnames.clone();
292 let resolver_for_blocking = resolver.clone();
293 let join_result = tokio::task::spawn_blocking(move || {
294 let mut out: HashMap<String, io::Result<ResolvedAnswer>> = HashMap::new();
295 for hostname in &hostnames_for_resolve {
296 out.insert(hostname.clone(), resolver_for_blocking(hostname));
297 }
298 out
299 })
300 .await;
301
302 match join_result {
303 Ok(map) => map,
304 Err(_) => {
305 if !sleep_or_shutdown(cfg.interval, &shutdown).await {
310 break;
311 }
312 continue;
313 }
314 }
315 };
316
317 if dnssec_active {
321 for v in validated_resolved.values() {
322 if v.is_err() {
323 stats.resolver_errors = stats.resolver_errors.saturating_add(1);
324 }
325 }
326 } else {
327 for v in resolved.values() {
328 if v.is_err() {
329 stats.resolver_errors = stats.resolver_errors.saturating_add(1);
330 }
331 }
332 }
333
334 let resolver_for_tick = |hostname: &str| -> io::Result<ResolvedAnswer> {
335 match resolved.get(hostname) {
336 Some(Ok(answer)) => Ok(answer.clone()),
337 Some(Err(e)) => Err(io::Error::new(e.kind(), e.to_string())),
338 None => Err(io::Error::other(
339 "ticker: hostname missing from pre-resolved map",
340 )),
341 }
342 };
343
344 let refresher = ResolverRefresh {
345 policy: cfg.policy.as_ref(),
346 rebinding_policy: cfg.rebinding_policy.as_ref(),
347 resolvers: cfg.resolvers.as_slice(),
348 hostnames: cfg.hostnames.as_slice(),
349 keyset_id: cfg.keyset_id.as_deref(),
350 issuer_kid: cfg.issuer_kid.as_deref(),
351 policy_digest: cfg.policy_digest.as_deref(),
352 correlation_id: cfg.correlation_id.as_deref(),
353 source: Some(cfg.source.as_str()),
354 dnssec_policy: cfg.dnssec_policy.as_ref(),
360 trust_anchors: cfg.trust_anchors.as_ref(),
361 };
362
363 let events = if dnssec_active {
364 refresher.tick_with_dnssec(
369 &mut state,
370 &mut rebinding_state,
371 &validated_resolved,
372 SystemTime::now(),
373 &cfg.cell_id,
374 &cfg.run_id,
375 )
376 } else {
377 refresher.tick_with_rebinding(
378 &mut state,
379 &mut rebinding_state,
380 &resolver_for_tick,
381 SystemTime::now(),
382 &cfg.cell_id,
383 &cfg.run_id,
384 )
385 };
386
387 for ev in events {
388 stats.events_emitted = stats.events_emitted.saturating_add(1);
389 emitter.emit(ev);
390 }
391
392 stats.tick_count = stats.tick_count.saturating_add(1);
393
394 if !sleep_or_shutdown(cfg.interval, &shutdown).await {
395 break;
396 }
397 }
398
399 stats
400}
401
402pub fn clamp_tick_interval_secs(secs: u64) -> u64 {
406 secs.max(5)
407}
408
409async fn sleep_or_shutdown(interval: Duration, shutdown: &AtomicBool) -> bool {
416 let poll_step = Duration::from_millis(50);
421 let deadline = std::time::Instant::now() + interval;
422 loop {
423 if shutdown.load(Ordering::SeqCst) {
424 return false;
425 }
426 let now = std::time::Instant::now();
427 if now >= deadline {
428 return true;
429 }
430 let remaining = deadline - now;
431 tokio::time::sleep(remaining.min(poll_step)).await;
432 }
433}
434
435#[cfg(test)]
436mod tests {
437 use super::*;
438 use std::net::{IpAddr, Ipv4Addr, SocketAddr};
439 use std::sync::atomic::AtomicU64;
440 use std::sync::Mutex;
441
442 fn answer(targets: Vec<String>) -> ResolvedAnswer {
447 ResolvedAnswer {
448 targets,
449 ttl_seconds: 0,
450 resolver_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 53),
451 }
452 }
453
454 #[derive(Default)]
455 struct CollectingEmitter {
456 events: Mutex<Vec<CloudEventV1>>,
457 }
458 impl DriftEmitter for CollectingEmitter {
459 fn emit(&self, event: CloudEventV1) {
460 self.events.lock().unwrap().push(event);
461 }
462 }
463
464 fn one_resolver() -> Vec<DnsResolver> {
465 vec![DnsResolver {
466 resolver_id: "resolver-doh-cloudflare".into(),
467 endpoint: "https://1.1.1.1/dns-query".into(),
468 protocol: cellos_core::DnsResolverProtocol::Doh,
469 trust_kid: None,
470 dnssec: None,
471 }]
472 }
473
474 fn base_cfg(hostnames: Vec<String>, interval: Duration) -> TickerConfig {
475 TickerConfig {
476 interval,
477 policy: Some(DnsRefreshPolicy {
478 min_ttl_seconds: Some(0),
479 max_stale_seconds: None,
480 strategy: None,
481 }),
482 rebinding_policy: None,
483 resolvers: one_resolver(),
484 hostnames,
485 keyset_id: Some("keyset-test".into()),
486 issuer_kid: Some("kid-test".into()),
487 policy_digest: None,
488 correlation_id: None,
489 source: "cellos-supervisor-test".into(),
490 cell_id: "cell-A".into(),
491 run_id: "run-A".into(),
492 dnssec_policy: None,
496 trust_anchors: None,
497 validated_resolver: None,
498 }
499 }
500
501 fn cycling_resolver(sequence: Vec<Vec<String>>) -> SharedResolverFn {
504 let counter = Arc::new(AtomicU64::new(0));
505 let seq = Arc::new(sequence);
506 Arc::new(move |_h: &str| {
507 let idx = counter.fetch_add(1, Ordering::SeqCst) as usize;
508 let pick = if idx >= seq.len() {
509 seq.last().cloned().unwrap_or_default()
510 } else {
511 seq[idx].clone()
512 };
513 Ok(answer(pick))
514 })
515 }
516
517 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
518 async fn ticker_emits_drift_when_targets_change_between_ticks() {
519 let cfg = base_cfg(vec!["api.example.com".into()], Duration::from_millis(100));
520 let emitter = Arc::new(CollectingEmitter::default());
521 let resolver = cycling_resolver(vec![
522 vec!["1.1.1.1".into()],
523 vec!["1.0.0.1".into()],
524 vec!["1.0.0.1".into()],
525 ]);
526
527 let handle = spawn_continuous_ticker(cfg, emitter.clone(), resolver);
528 tokio::time::sleep(Duration::from_millis(250)).await;
530 handle.shutdown.store(true, Ordering::SeqCst);
531 let stats = tokio::time::timeout(Duration::from_secs(1), handle.task)
532 .await
533 .expect("ticker join timeout")
534 .expect("ticker task panicked");
535
536 let events = emitter.events.lock().unwrap();
537 assert!(
538 events.len() >= 2,
539 "expected baseline + change drift events, got {}",
540 events.len()
541 );
542 assert!(stats.tick_count >= 2);
543 assert!(stats.events_emitted >= 2);
544 }
545
546 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
547 async fn ticker_silent_when_targets_stable() {
548 let cfg = base_cfg(vec!["api.example.com".into()], Duration::from_millis(50));
549 let emitter = Arc::new(CollectingEmitter::default());
550 let resolver: SharedResolverFn =
552 Arc::new(|_h: &str| Ok(answer(vec!["203.0.113.10".into()])));
553
554 let handle = spawn_continuous_ticker(cfg, emitter.clone(), resolver);
555 tokio::time::sleep(Duration::from_millis(250)).await;
556 handle.shutdown.store(true, Ordering::SeqCst);
557 let _ = tokio::time::timeout(Duration::from_secs(1), handle.task)
558 .await
559 .expect("ticker join timeout");
560
561 let events = emitter.events.lock().unwrap();
562 assert_eq!(
563 events.len(),
564 1,
565 "stable targets must emit exactly one baseline event, got {}",
566 events.len()
567 );
568 }
569
570 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
571 async fn ticker_respects_shutdown_promptly() {
572 let cfg = base_cfg(vec!["api.example.com".into()], Duration::from_secs(10));
573 let emitter = Arc::new(CollectingEmitter::default());
574 let resolver: SharedResolverFn = Arc::new(|_h: &str| Ok(answer(vec!["1.1.1.1".into()])));
575
576 let handle = spawn_continuous_ticker(cfg, emitter, resolver);
577 tokio::time::sleep(Duration::from_millis(80)).await;
581 handle.shutdown.store(true, Ordering::SeqCst);
582 let started = std::time::Instant::now();
583 let _ = tokio::time::timeout(Duration::from_secs(1), handle.task)
584 .await
585 .expect("ticker did not honour shutdown within 1s");
586 let elapsed = started.elapsed();
587 assert!(
588 elapsed < Duration::from_millis(500),
589 "shutdown took {elapsed:?}, expected <500ms"
590 );
591 }
592
593 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
594 async fn ticker_respects_floor_interval_min() {
595 let floor = crate::resolver_refresh::ticker::clamp_tick_interval_secs(1);
601 assert!(floor >= 5, "tick interval floor must be >=5s; got {floor}");
602 let unbounded = crate::resolver_refresh::ticker::clamp_tick_interval_secs(120);
603 assert_eq!(unbounded, 120, "values >=floor must pass through untouched");
604 let zero = crate::resolver_refresh::ticker::clamp_tick_interval_secs(0);
605 assert!(zero >= 5, "zero must clamp up to floor");
606 }
607
608 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
609 async fn ticker_handles_resolver_failure_gracefully() {
610 let cfg = base_cfg(vec!["api.example.com".into()], Duration::from_millis(50));
611 let emitter = Arc::new(CollectingEmitter::default());
612 let resolver: SharedResolverFn = Arc::new(|_h: &str| Err(io::Error::other("transient")));
613
614 let handle = spawn_continuous_ticker(cfg, emitter.clone(), resolver);
615 tokio::time::sleep(Duration::from_millis(250)).await;
616 handle.shutdown.store(true, Ordering::SeqCst);
617 let stats = tokio::time::timeout(Duration::from_secs(1), handle.task)
618 .await
619 .expect("ticker join timeout")
620 .expect("ticker task panicked");
621
622 let events = emitter.events.lock().unwrap();
623 assert!(
624 events.is_empty(),
625 "resolver failures must not emit drift, got {} events",
626 events.len()
627 );
628 assert!(
629 stats.resolver_errors >= 1,
630 "resolver_errors counter should reflect the failures, got {}",
631 stats.resolver_errors
632 );
633 assert!(
634 stats.tick_count >= 1,
635 "ticker must keep running across resolver errors"
636 );
637 }
638
639 fn count_events_of(events: &[CloudEventV1], suffix: &str) -> usize {
650 events.iter().filter(|e| e.ty.ends_with(suffix)).count()
651 }
652
653 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
654 async fn ticker_no_rebinding_events_when_policy_is_none() {
655 let mut cfg = base_cfg(vec!["api.example.com".into()], Duration::from_millis(80));
660 cfg.rebinding_policy = None;
661 let emitter = Arc::new(CollectingEmitter::default());
662 let resolver = cycling_resolver(vec![
665 vec!["1.0.0.1".into()],
666 vec!["1.0.0.2".into()],
667 vec!["1.0.0.3".into()],
668 vec!["1.0.0.4".into()],
669 vec!["1.0.0.5".into()],
670 ]);
671
672 let handle = spawn_continuous_ticker(cfg, emitter.clone(), resolver);
673 tokio::time::sleep(Duration::from_millis(250)).await;
674 handle.shutdown.store(true, Ordering::SeqCst);
675 let _ = tokio::time::timeout(Duration::from_secs(1), handle.task).await;
676
677 let events = emitter.events.lock().unwrap();
678 assert_eq!(
679 count_events_of(&events, "dns_authority_rebind_threshold"),
680 0,
681 "no rebinding policy → no threshold events"
682 );
683 assert_eq!(
684 count_events_of(&events, "dns_authority_rebind_rejected"),
685 0,
686 "no rebinding policy → no rejected events"
687 );
688 }
689
690 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
691 async fn ticker_threshold_only_audit_mode_emits_threshold_events() {
692 let mut cfg = base_cfg(vec!["api.example.com".into()], Duration::from_millis(60));
697 cfg.rebinding_policy = Some(DnsRebindingPolicy {
698 response_ip_allowlist: Vec::new(),
699 max_novel_ips_per_hostname: 2,
700 reject_on_rebind: false,
701 });
702 cfg.policy_digest =
703 Some("sha256:e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855".into());
704 let emitter = Arc::new(CollectingEmitter::default());
705 let resolver = cycling_resolver(vec![
706 vec!["1.0.0.1".into()],
707 vec!["1.0.0.2".into()],
708 vec!["1.0.0.3".into()],
709 vec!["1.0.0.4".into()],
710 ]);
711
712 let handle = spawn_continuous_ticker(cfg, emitter.clone(), resolver);
713 tokio::time::sleep(Duration::from_millis(280)).await;
714 handle.shutdown.store(true, Ordering::SeqCst);
715 let _ = tokio::time::timeout(Duration::from_secs(1), handle.task).await;
716
717 let events = emitter.events.lock().unwrap();
718 let threshold = count_events_of(&events, "dns_authority_rebind_threshold");
719 let rejected = count_events_of(&events, "dns_authority_rebind_rejected");
720 assert!(
721 threshold >= 2,
722 "audit-only mode must emit at least 2 threshold events when cap is breached over multiple ticks; got {threshold}"
723 );
724 assert_eq!(
725 rejected, 0,
726 "no allowlist set → no rejected events; got {rejected}"
727 );
728 }
729
730 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
731 async fn ticker_reject_on_rebind_filters_drift_targets() {
732 let mut cfg = base_cfg(vec!["api.example.com".into()], Duration::from_millis(60));
738 cfg.rebinding_policy = Some(DnsRebindingPolicy {
739 response_ip_allowlist: Vec::new(),
740 max_novel_ips_per_hostname: 2,
741 reject_on_rebind: true,
742 });
743 cfg.policy_digest =
744 Some("sha256:e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855".into());
745 let emitter = Arc::new(CollectingEmitter::default());
746 let resolver = cycling_resolver(vec![
749 vec!["1.0.0.1".into(), "1.0.0.2".into()],
750 vec!["1.0.0.1".into(), "1.0.0.2".into(), "198.51.100.7".into()],
751 ]);
752
753 let handle = spawn_continuous_ticker(cfg, emitter.clone(), resolver);
754 tokio::time::sleep(Duration::from_millis(220)).await;
755 handle.shutdown.store(true, Ordering::SeqCst);
756 let _ = tokio::time::timeout(Duration::from_secs(1), handle.task).await;
757
758 let events = emitter.events.lock().unwrap();
759 let drift_events: Vec<_> = events
760 .iter()
761 .filter(|e| e.ty.ends_with("dns_authority_drift"))
762 .collect();
763 assert!(
764 !drift_events.is_empty(),
765 "drift events must still fire (rejection only filters targets, not the drift signal)"
766 );
767 let last = drift_events.last().unwrap();
768 let data = last.data.as_ref().expect("data");
769 let current: Vec<&str> = data["currentTargets"]
770 .as_array()
771 .unwrap()
772 .iter()
773 .map(|v| v.as_str().unwrap())
774 .collect();
775 assert!(
776 current.contains(&"1.0.0.1"),
777 "first legitimate IP must survive rejection: {current:?}"
778 );
779 assert!(
780 current.contains(&"1.0.0.2"),
781 "second legitimate IP must survive rejection: {current:?}"
782 );
783 assert!(
784 !current.contains(&"198.51.100.7"),
785 "attacker IP beyond cap=2 must be filtered when reject_on_rebind=true: {current:?}"
786 );
787 }
788
789 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
790 async fn ticker_allowlist_only_emits_rejected_events() {
791 let mut cfg = base_cfg(vec!["api.example.com".into()], Duration::from_millis(60));
796 cfg.rebinding_policy = Some(DnsRebindingPolicy {
797 response_ip_allowlist: vec!["api.example.com:1.1.1.1".into()],
798 max_novel_ips_per_hostname: 100,
799 reject_on_rebind: false,
800 });
801 cfg.policy_digest =
802 Some("sha256:e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855".into());
803 let emitter = Arc::new(CollectingEmitter::default());
804 let resolver: SharedResolverFn =
805 Arc::new(|_h: &str| Ok(answer(vec!["198.51.100.7".into()])));
806
807 let handle = spawn_continuous_ticker(cfg, emitter.clone(), resolver);
808 tokio::time::sleep(Duration::from_millis(220)).await;
809 handle.shutdown.store(true, Ordering::SeqCst);
810 let _ = tokio::time::timeout(Duration::from_secs(1), handle.task).await;
811
812 let events = emitter.events.lock().unwrap();
813 let threshold = count_events_of(&events, "dns_authority_rebind_threshold");
814 let rejected = count_events_of(&events, "dns_authority_rebind_rejected");
815 assert!(
819 rejected >= 1,
820 "allowlist violation must fire at least one rejected event"
821 );
822 assert_eq!(
823 threshold, 0,
824 "cap is far above the IP count → no threshold events; got {threshold}"
825 );
826 }
827
828 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
829 async fn ticker_combined_threshold_and_allowlist_both_fire() {
830 let mut cfg = base_cfg(vec!["api.example.com".into()], Duration::from_millis(60));
833 cfg.rebinding_policy = Some(DnsRebindingPolicy {
834 response_ip_allowlist: vec!["api.example.com:1.1.1.1".into()],
835 max_novel_ips_per_hostname: 1,
836 reject_on_rebind: false,
837 });
838 cfg.policy_digest =
839 Some("sha256:e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855".into());
840 let emitter = Arc::new(CollectingEmitter::default());
841 let resolver = cycling_resolver(vec![vec!["1.1.1.1".into()], vec!["198.51.100.7".into()]]);
844
845 let handle = spawn_continuous_ticker(cfg, emitter.clone(), resolver);
846 tokio::time::sleep(Duration::from_millis(220)).await;
847 handle.shutdown.store(true, Ordering::SeqCst);
848 let _ = tokio::time::timeout(Duration::from_secs(1), handle.task).await;
849
850 let events = emitter.events.lock().unwrap();
851 let threshold = count_events_of(&events, "dns_authority_rebind_threshold");
852 let rejected = count_events_of(&events, "dns_authority_rebind_rejected");
853 assert!(
854 threshold >= 1,
855 "second tick exceeds cap=1 → threshold event expected"
856 );
857 assert!(
858 rejected >= 1,
859 "second tick IP is not in allowlist → rejected event expected"
860 );
861 }
862
863 fn cycling_validated_resolver(
883 sequence: Vec<crate::resolver_refresh::ValidatedResolvedAnswer>,
884 ) -> SharedValidatedResolverFn {
885 let counter = Arc::new(AtomicU64::new(0));
886 let seq = Arc::new(sequence);
887 Arc::new(move |_h: &str| {
888 let idx = counter.fetch_add(1, Ordering::SeqCst) as usize;
889 Ok(if idx >= seq.len() {
890 seq.last().cloned().unwrap_or_else(|| {
891 crate::resolver_refresh::ValidatedResolvedAnswer {
892 answer: ResolvedAnswer {
893 targets: Vec::new(),
894 ttl_seconds: 0,
895 resolver_addr: SocketAddr::new(
896 IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)),
897 53,
898 ),
899 },
900 validation: crate::resolver_refresh::DnssecValidationResult::Unsigned,
901 }
902 })
903 } else {
904 seq[idx].clone()
905 })
906 })
907 }
908
909 fn validated_with(
910 targets: Vec<&str>,
911 validation: crate::resolver_refresh::DnssecValidationResult,
912 ) -> crate::resolver_refresh::ValidatedResolvedAnswer {
913 crate::resolver_refresh::ValidatedResolvedAnswer {
914 answer: ResolvedAnswer {
915 targets: targets.into_iter().map(String::from).collect(),
916 ttl_seconds: 60,
917 resolver_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 53),
918 },
919 validation,
920 }
921 }
922
923 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
924 async fn dnssec_failed_event_emitted_when_validate_true_failclosed_false() {
925 let mut cfg = base_cfg(vec!["api.example.com".into()], Duration::from_millis(70));
930 cfg.dnssec_policy = Some(DnsResolverDnssecPolicy {
931 validate: true,
932 fail_closed: false,
933 trust_anchors_path: None,
934 });
935 cfg.trust_anchors = Some(crate::resolver_refresh::TrustAnchors::iana_default());
936 cfg.validated_resolver = Some(cycling_validated_resolver(vec![validated_with(
937 vec!["1.0.0.1"],
938 crate::resolver_refresh::DnssecValidationResult::Failed {
939 reason: "synthetic-bogus".to_string(),
940 },
941 )]));
942 let emitter = Arc::new(CollectingEmitter::default());
943
944 let handle = spawn_continuous_ticker(
945 cfg,
946 emitter.clone(),
947 Arc::new(|_h: &str| Ok(answer(vec!["1.0.0.1".into()]))),
948 );
949 tokio::time::sleep(Duration::from_millis(220)).await;
950 handle.shutdown.store(true, Ordering::SeqCst);
951 let _ = tokio::time::timeout(Duration::from_secs(1), handle.task).await;
952
953 let events = emitter.events.lock().unwrap();
954 let dnssec_failed = count_events_of(&events, "dns_authority_dnssec_failed");
955 assert!(
956 dnssec_failed >= 1,
957 "audit-only DNSSEC failure must fire dns_authority_dnssec_failed; got {dnssec_failed}"
958 );
959 let drift_events: Vec<_> = events
961 .iter()
962 .filter(|e| e.ty.ends_with("dns_authority_drift"))
963 .collect();
964 assert!(
965 !drift_events.is_empty(),
966 "audit-only mode keeps the answer; drift must still fire"
967 );
968 let last_drift = drift_events.last().unwrap();
970 let data = last_drift.data.as_ref().expect("data");
971 let current: Vec<&str> = data["currentTargets"]
972 .as_array()
973 .unwrap()
974 .iter()
975 .map(|v| v.as_str().unwrap())
976 .collect();
977 assert!(
978 current.contains(&"1.0.0.1"),
979 "audit-only mode preserves the unvalidated answer in the drift event: {current:?}"
980 );
981 }
982
983 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
984 async fn dnssec_drops_answer_when_failclosed_true() {
985 let mut cfg = base_cfg(vec!["api.example.com".into()], Duration::from_millis(70));
989 cfg.dnssec_policy = Some(DnsResolverDnssecPolicy {
990 validate: true,
991 fail_closed: true,
992 trust_anchors_path: None,
993 });
994 cfg.trust_anchors = Some(crate::resolver_refresh::TrustAnchors::iana_default());
995 cfg.validated_resolver = Some(cycling_validated_resolver(vec![validated_with(
996 vec!["198.51.100.7"],
997 crate::resolver_refresh::DnssecValidationResult::Failed {
998 reason: "synthetic-bogus".to_string(),
999 },
1000 )]));
1001 let emitter = Arc::new(CollectingEmitter::default());
1002
1003 let handle = spawn_continuous_ticker(
1004 cfg,
1005 emitter.clone(),
1006 Arc::new(|_h: &str| Ok(answer(vec!["198.51.100.7".into()]))),
1007 );
1008 tokio::time::sleep(Duration::from_millis(220)).await;
1009 handle.shutdown.store(true, Ordering::SeqCst);
1010 let _ = tokio::time::timeout(Duration::from_secs(1), handle.task).await;
1011
1012 let events = emitter.events.lock().unwrap();
1013 let dnssec_failed = count_events_of(&events, "dns_authority_dnssec_failed");
1014 assert!(
1015 dnssec_failed >= 1,
1016 "enforce DNSSEC failure must fire dns_authority_dnssec_failed; got {dnssec_failed}"
1017 );
1018
1019 let drift_events: Vec<_> = events
1020 .iter()
1021 .filter(|e| e.ty.ends_with("dns_authority_drift"))
1022 .collect();
1023 if let Some(last) = drift_events.last() {
1027 let data = last.data.as_ref().expect("data");
1028 let current: Vec<&str> = data["currentTargets"]
1029 .as_array()
1030 .unwrap()
1031 .iter()
1032 .map(|v| v.as_str().unwrap())
1033 .collect();
1034 assert!(
1035 !current.contains(&"198.51.100.7"),
1036 "failClosed=true MUST drop the attacker IP from drift currentTargets: {current:?}"
1037 );
1038 }
1039 }
1040
1041 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1042 async fn dnssec_status_field_set_in_drift_event() {
1043 let mut cfg = base_cfg(vec!["api.example.com".into()], Duration::from_millis(70));
1048 cfg.dnssec_policy = Some(DnsResolverDnssecPolicy {
1049 validate: true,
1050 fail_closed: false,
1051 trust_anchors_path: None,
1052 });
1053 cfg.trust_anchors = Some(crate::resolver_refresh::TrustAnchors::iana_default());
1054 cfg.validated_resolver = Some(cycling_validated_resolver(vec![validated_with(
1055 vec!["1.1.1.1"],
1056 crate::resolver_refresh::DnssecValidationResult::Validated {
1057 algorithm: "RSASHA256".to_string(),
1058 key_tag: 19036,
1059 },
1060 )]));
1061 let emitter = Arc::new(CollectingEmitter::default());
1062
1063 let handle = spawn_continuous_ticker(
1064 cfg,
1065 emitter.clone(),
1066 Arc::new(|_h: &str| Ok(answer(vec!["1.1.1.1".into()]))),
1067 );
1068 tokio::time::sleep(Duration::from_millis(180)).await;
1069 handle.shutdown.store(true, Ordering::SeqCst);
1070 let _ = tokio::time::timeout(Duration::from_secs(1), handle.task).await;
1071
1072 let events = emitter.events.lock().unwrap();
1073 assert_eq!(
1075 count_events_of(&events, "dns_authority_dnssec_failed"),
1076 0,
1077 "Validated path must not emit dns_authority_dnssec_failed"
1078 );
1079 let drift_events: Vec<_> = events
1081 .iter()
1082 .filter(|e| e.ty.ends_with("dns_authority_drift"))
1083 .collect();
1084 assert!(
1085 !drift_events.is_empty(),
1086 "drift must fire on first observation"
1087 );
1088 let data = drift_events[0].data.as_ref().expect("data");
1089 assert_eq!(
1090 data["dnssecStatus"], "validated",
1091 "drift in DNSSEC mode must stamp dnssecStatus=validated"
1092 );
1093 }
1094
1095 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1096 async fn ticker_novel_ip_within_cap_does_not_emit_threshold() {
1097 let mut cfg = base_cfg(vec!["api.example.com".into()], Duration::from_millis(60));
1100 cfg.rebinding_policy = Some(DnsRebindingPolicy {
1101 response_ip_allowlist: Vec::new(),
1102 max_novel_ips_per_hostname: 4,
1103 reject_on_rebind: false,
1104 });
1105 cfg.policy_digest =
1106 Some("sha256:e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855".into());
1107 let emitter = Arc::new(CollectingEmitter::default());
1108 let resolver = cycling_resolver(vec![
1109 vec!["1.0.0.1".into()],
1110 vec!["1.0.0.2".into()],
1111 vec!["1.0.0.3".into()],
1112 ]);
1113
1114 let handle = spawn_continuous_ticker(cfg, emitter.clone(), resolver);
1115 tokio::time::sleep(Duration::from_millis(220)).await;
1116 handle.shutdown.store(true, Ordering::SeqCst);
1117 let _ = tokio::time::timeout(Duration::from_secs(1), handle.task).await;
1118
1119 let events = emitter.events.lock().unwrap();
1120 let threshold = count_events_of(&events, "dns_authority_rebind_threshold");
1121 assert_eq!(
1122 threshold, 0,
1123 "3 distinct IPs under cap=4 must not fire threshold; got {threshold}"
1124 );
1125 }
1126}