pub mod dnssec;
pub mod hickory_resolve;
pub mod rebinding;
pub mod sink_emitter;
pub mod ticker;
#[allow(unused_imports)]
pub use dnssec::{TrustAnchors, ENV_TRUST_ANCHORS_PATH, TRUST_ANCHOR_SOURCE_IANA_DEFAULT};
#[allow(unused_imports)]
pub use hickory_resolve::{
extract_rrsig_metadata, proof_to_validation_result_with_rrsig, resolve_with_ttl,
resolve_with_ttl_validated, DnssecValidationResult, ResolvedAnswer, ValidatedResolvedAnswer,
};
#[allow(unused_imports)]
pub use rebinding::{RebindingDecision, RebindingState};
use std::collections::{BTreeSet, HashMap};
use std::io;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use sha2::{Digest, Sha256};
use cellos_core::{
cloud_event_v1_dns_authority_dnssec_failed, cloud_event_v1_dns_authority_drift,
cloud_event_v1_dns_authority_rebind_rejected, cloud_event_v1_dns_authority_rebind_threshold,
CloudEventV1, DnsAuthorityDnssecFailed, DnsAuthorityDnssecFailureReason, DnsAuthorityDrift,
DnsAuthorityRebindRejected, DnsAuthorityRebindThreshold, DnsRebindingPolicy, DnsRefreshPolicy,
DnsRefreshStrategy, DnsResolver, DnsResolverDnssecPolicy,
};
pub type ResolverFn<'a> = dyn Fn(&str) -> io::Result<ResolvedAnswer> + 'a;
pub type ValidatedResolverFn<'a> = dyn Fn(&str) -> io::Result<ValidatedResolvedAnswer> + 'a;
#[derive(Debug, Clone, Default)]
struct HostState {
previous_targets: Vec<String>,
previous_digest: String,
last_refresh_at: Option<SystemTime>,
last_ttl_seconds: u32,
}
#[derive(Debug, Default)]
pub struct ResolverState {
hosts: HashMap<String, HostState>,
}
impl ResolverState {
pub fn new() -> Self {
Self::default()
}
pub fn len(&self) -> usize {
self.hosts.len()
}
pub fn is_empty(&self) -> bool {
self.hosts.is_empty()
}
}
pub struct ResolverRefresh<'a> {
pub policy: Option<&'a DnsRefreshPolicy>,
pub rebinding_policy: Option<&'a DnsRebindingPolicy>,
pub resolvers: &'a [DnsResolver],
pub hostnames: &'a [String],
pub keyset_id: Option<&'a str>,
pub issuer_kid: Option<&'a str>,
pub policy_digest: Option<&'a str>,
pub correlation_id: Option<&'a str>,
pub source: Option<&'a str>,
pub dnssec_policy: Option<&'a DnsResolverDnssecPolicy>,
pub trust_anchors: Option<&'a TrustAnchors>,
}
impl<'a> ResolverRefresh<'a> {
pub fn tick(
&self,
state: &mut ResolverState,
resolver: &ResolverFn<'_>,
now: SystemTime,
cell_id: &str,
run_id: &str,
) -> Vec<CloudEventV1> {
let mut throwaway = RebindingState::new();
self.tick_with_rebinding(state, &mut throwaway, resolver, now, cell_id, run_id)
}
pub fn tick_with_rebinding(
&self,
state: &mut ResolverState,
rebinding_state: &mut RebindingState,
resolver: &ResolverFn<'_>,
now: SystemTime,
cell_id: &str,
run_id: &str,
) -> Vec<CloudEventV1> {
let mut out: Vec<CloudEventV1> = Vec::new();
let strategy = self
.policy
.and_then(|p| p.strategy)
.unwrap_or(DnsRefreshStrategy::TtlHonor);
if matches!(strategy, DnsRefreshStrategy::Manual) {
return out;
}
let min_ttl = self.policy.and_then(|p| p.min_ttl_seconds).unwrap_or(0);
let max_stale = self.policy.and_then(|p| p.max_stale_seconds);
let resolver_id_owned: String = self
.resolvers
.first()
.map(|r| r.resolver_id.clone())
.unwrap_or_default();
let observed_at_rfc3339 = system_time_to_rfc3339(now);
for hostname in self.hostnames {
let prior = state.hosts.get(hostname).cloned().unwrap_or_default();
if let Some(last) = prior.last_refresh_at {
let age = now.duration_since(last).unwrap_or_default();
let age_secs = age.as_secs();
let floor_active = age_secs < u64::from(min_ttl);
let ceiling_breached = max_stale.is_some_and(|ms| age_secs >= u64::from(ms));
if floor_active && !ceiling_breached {
continue;
}
}
let answer = match resolver(hostname) {
Ok(answer) => answer,
Err(_) => {
continue;
}
};
let canonical_targets = canonicalize_targets(&answer.targets);
let previous_digest = if prior.previous_digest.is_empty() {
"empty".to_string()
} else {
prior.previous_digest.clone()
};
let (current_targets, rebind_events) = match self.rebinding_policy {
Some(rb_policy) => {
let decision =
rebinding_state.evaluate(hostname, &canonical_targets, rb_policy);
let mut events: Vec<CloudEventV1> = Vec::new();
if decision.threshold_exceeded {
if let Some(first_novel) = decision.novel_ips.first() {
let prior_count = rebinding_state.history(hostname).len() as u32;
let cumulative =
prior_count.saturating_add(decision.novel_ips.len() as u32);
let payload = DnsAuthorityRebindThreshold {
schema_version: "1.0.0".into(),
cell_id: cell_id.to_string(),
run_id: run_id.to_string(),
hostname: hostname.clone(),
novel_ip: (*first_novel).to_string(),
previous_ip_count: prior_count,
cumulative_ip_count: cumulative,
max_novel_ips_per_hostname: rb_policy.max_novel_ips_per_hostname,
policy_digest: self
.policy_digest
.map(str::to_string)
.unwrap_or_else(empty_policy_digest),
keyset_id: self.keyset_id.map(str::to_string),
issuer_kid: self.issuer_kid.map(str::to_string),
correlation_id: self.correlation_id.map(str::to_string),
resolver_id: if resolver_id_owned.is_empty() {
None
} else {
Some(resolver_id_owned.clone())
},
observed_at: observed_at_rfc3339.clone(),
};
let source = self.source.unwrap_or("cellos-supervisor");
if let Ok(ev) = cloud_event_v1_dns_authority_rebind_threshold(
source,
&observed_at_rfc3339,
&payload,
) {
events.push(ev);
}
}
}
if !decision.allowlist_violations.is_empty() {
let echo: Vec<String> = rb_policy
.response_ip_allowlist
.iter()
.filter(|raw| {
raw.split_once(':')
.is_some_and(|(prefix, _)| prefix == hostname)
})
.cloned()
.collect();
let prior_count = rebinding_state.history(hostname).len() as u32;
let cumulative =
prior_count.saturating_add(decision.novel_ips.len() as u32);
for &offending in &decision.allowlist_violations {
let payload = DnsAuthorityRebindRejected {
schema_version: "1.0.0".into(),
cell_id: cell_id.to_string(),
run_id: run_id.to_string(),
hostname: hostname.clone(),
novel_ip: offending.to_string(),
previous_ip_count: prior_count,
cumulative_ip_count: cumulative,
response_ip_allowlist: echo.clone(),
policy_digest: self
.policy_digest
.map(str::to_string)
.unwrap_or_else(empty_policy_digest),
keyset_id: self.keyset_id.map(str::to_string),
issuer_kid: self.issuer_kid.map(str::to_string),
correlation_id: self.correlation_id.map(str::to_string),
resolver_id: if resolver_id_owned.is_empty() {
None
} else {
Some(resolver_id_owned.clone())
},
observed_at: observed_at_rfc3339.clone(),
};
let source = self.source.unwrap_or("cellos-supervisor");
if let Ok(ev) = cloud_event_v1_dns_authority_rebind_rejected(
source,
&observed_at_rfc3339,
&payload,
) {
events.push(ev);
}
}
}
let effective = canonicalize_targets(&decision.effective_targets);
(effective, events)
}
None => (canonical_targets, Vec::new()),
};
let current_digest = digest_target_set(¤t_targets);
for ev in rebind_events {
out.push(ev);
}
let clamped_ttl: u32 = if min_ttl > 0 && answer.ttl_seconds < min_ttl {
min_ttl
} else {
answer.ttl_seconds
};
let stale_seconds: u32 = match (prior.last_refresh_at, prior.last_ttl_seconds) {
(Some(last), ttl) if ttl > 0 => {
let age = now.duration_since(last).unwrap_or_default().as_secs();
age.saturating_sub(u64::from(ttl)).min(u32::MAX as u64) as u32
}
_ => 0,
};
if current_digest != previous_digest {
let prev_set: BTreeSet<&String> = prior.previous_targets.iter().collect();
let curr_set: BTreeSet<&String> = current_targets.iter().collect();
let added: Vec<String> = curr_set
.difference(&prev_set)
.map(|s| (*s).clone())
.collect();
let removed: Vec<String> = prev_set
.difference(&curr_set)
.map(|s| (*s).clone())
.collect();
let drift = DnsAuthorityDrift {
schema_version: "1.0.0".into(),
cell_id: cell_id.to_string(),
run_id: run_id.to_string(),
resolver_id: resolver_id_owned.clone(),
hostname: hostname.clone(),
previous_targets: prior.previous_targets.clone(),
current_targets: current_targets.clone(),
added_targets: added,
removed_targets: removed,
previous_digest,
current_digest: current_digest.clone(),
ttl_seconds: clamped_ttl,
stale_seconds,
keyset_id: self.keyset_id.map(str::to_string),
issuer_kid: self.issuer_kid.map(str::to_string),
policy_digest: self.policy_digest.map(str::to_string),
correlation_id: self.correlation_id.map(str::to_string),
dnssec_status: None,
observed_at: observed_at_rfc3339.clone(),
};
let source = self.source.unwrap_or("cellos-supervisor");
match cloud_event_v1_dns_authority_drift(source, &observed_at_rfc3339, &drift) {
Ok(ev) => out.push(ev),
Err(e) => {
let _ = e;
}
}
}
state.hosts.insert(
hostname.clone(),
HostState {
previous_targets: current_targets.clone(),
previous_digest: current_digest,
last_refresh_at: Some(now),
last_ttl_seconds: clamped_ttl,
},
);
if self.rebinding_policy.is_some() {
rebinding_state.commit(hostname, ¤t_targets);
}
}
out
}
}
pub const DNSSEC_STATUS_VALIDATED: &str = "validated";
pub const DNSSEC_STATUS_VALIDATION_FAILED: &str = "validation_failed";
pub const DNSSEC_STATUS_UNSIGNED: &str = "unsigned";
pub const DNSSEC_STATUS_NOT_ATTEMPTED: &str = "not_attempted";
impl<'a> ResolverRefresh<'a> {
pub fn tick_with_dnssec(
&self,
state: &mut ResolverState,
rebinding_state: &mut RebindingState,
validated_resolved: &std::collections::HashMap<String, io::Result<ValidatedResolvedAnswer>>,
now: SystemTime,
cell_id: &str,
run_id: &str,
) -> Vec<CloudEventV1> {
let mut out: Vec<CloudEventV1> = Vec::new();
let strategy = self
.policy
.and_then(|p| p.strategy)
.unwrap_or(DnsRefreshStrategy::TtlHonor);
if matches!(strategy, DnsRefreshStrategy::Manual) {
return out;
}
let min_ttl = self.policy.and_then(|p| p.min_ttl_seconds).unwrap_or(0);
let max_stale = self.policy.and_then(|p| p.max_stale_seconds);
let resolver_id_owned: String = self
.resolvers
.first()
.map(|r| r.resolver_id.clone())
.unwrap_or_default();
let observed_at_rfc3339 = system_time_to_rfc3339(now);
let trust_anchor_source: String = self
.trust_anchors
.map(|ta| ta.source.clone())
.unwrap_or_else(|| dnssec::TRUST_ANCHOR_SOURCE_IANA_DEFAULT.to_string());
let policy_active = self.dnssec_policy.is_some();
let fail_closed = self.dnssec_policy.map(|p| p.fail_closed).unwrap_or(false);
for hostname in self.hostnames {
let prior = state.hosts.get(hostname).cloned().unwrap_or_default();
if let Some(last) = prior.last_refresh_at {
let age = now.duration_since(last).unwrap_or_default();
let age_secs = age.as_secs();
let floor_active = age_secs < u64::from(min_ttl);
let ceiling_breached = max_stale.is_some_and(|ms| age_secs >= u64::from(ms));
if floor_active && !ceiling_breached {
continue;
}
}
let validated = match validated_resolved.get(hostname) {
Some(Ok(v)) => v.clone(),
Some(Err(_)) | None => {
continue;
}
};
let (dnssec_status, dnssec_event, effective_answer): (
&'static str,
Option<CloudEventV1>,
ResolvedAnswer,
) = match (&validated.validation, policy_active) {
(DnssecValidationResult::Validated { .. }, true) => {
(DNSSEC_STATUS_VALIDATED, None, validated.answer.clone())
}
(DnssecValidationResult::Failed { reason }, true) => {
let payload = DnsAuthorityDnssecFailed {
schema_version: "1.0.0".into(),
cell_id: cell_id.to_string(),
run_id: run_id.to_string(),
resolver_id: resolver_id_owned.clone(),
hostname: hostname.clone(),
reason: DnsAuthorityDnssecFailureReason::ValidationFailed
.as_str()
.to_string(),
fail_closed,
trust_anchor_source: trust_anchor_source.clone(),
policy_digest: self.policy_digest.map(str::to_string),
keyset_id: self.keyset_id.map(str::to_string),
issuer_kid: self.issuer_kid.map(str::to_string),
correlation_id: self.correlation_id.map(str::to_string),
source: Some("resolver_refresh".into()),
observed_at: observed_at_rfc3339.clone(),
};
let _ = reason; let source = self.source.unwrap_or("cellos-supervisor");
let event = cloud_event_v1_dns_authority_dnssec_failed(
source,
&observed_at_rfc3339,
&payload,
)
.ok();
let answer = if fail_closed {
ResolvedAnswer {
targets: Vec::new(),
ttl_seconds: validated.answer.ttl_seconds,
resolver_addr: validated.answer.resolver_addr,
}
} else {
validated.answer.clone()
};
(DNSSEC_STATUS_VALIDATION_FAILED, event, answer)
}
(DnssecValidationResult::Unsigned, true) => {
let payload = DnsAuthorityDnssecFailed {
schema_version: "1.0.0".into(),
cell_id: cell_id.to_string(),
run_id: run_id.to_string(),
resolver_id: resolver_id_owned.clone(),
hostname: hostname.clone(),
reason: DnsAuthorityDnssecFailureReason::UnsignedZone
.as_str()
.to_string(),
fail_closed,
trust_anchor_source: trust_anchor_source.clone(),
policy_digest: self.policy_digest.map(str::to_string),
keyset_id: self.keyset_id.map(str::to_string),
issuer_kid: self.issuer_kid.map(str::to_string),
correlation_id: self.correlation_id.map(str::to_string),
source: Some("resolver_refresh".into()),
observed_at: observed_at_rfc3339.clone(),
};
let source = self.source.unwrap_or("cellos-supervisor");
let event = cloud_event_v1_dns_authority_dnssec_failed(
source,
&observed_at_rfc3339,
&payload,
)
.ok();
let answer = if fail_closed {
ResolvedAnswer {
targets: Vec::new(),
ttl_seconds: validated.answer.ttl_seconds,
resolver_addr: validated.answer.resolver_addr,
}
} else {
validated.answer.clone()
};
(DNSSEC_STATUS_UNSIGNED, event, answer)
}
(_, false) => (DNSSEC_STATUS_NOT_ATTEMPTED, None, validated.answer.clone()),
};
if let Some(ev) = dnssec_event {
out.push(ev);
}
let canonical_targets = canonicalize_targets(&effective_answer.targets);
let previous_digest = if prior.previous_digest.is_empty() {
"empty".to_string()
} else {
prior.previous_digest.clone()
};
let (current_targets, rebind_events) = match self.rebinding_policy {
Some(rb_policy) => {
let decision =
rebinding_state.evaluate(hostname, &canonical_targets, rb_policy);
let mut events: Vec<CloudEventV1> = Vec::new();
if decision.threshold_exceeded {
if let Some(first_novel) = decision.novel_ips.first() {
let prior_count = rebinding_state.history(hostname).len() as u32;
let cumulative =
prior_count.saturating_add(decision.novel_ips.len() as u32);
let payload = DnsAuthorityRebindThreshold {
schema_version: "1.0.0".into(),
cell_id: cell_id.to_string(),
run_id: run_id.to_string(),
hostname: hostname.clone(),
novel_ip: (*first_novel).to_string(),
previous_ip_count: prior_count,
cumulative_ip_count: cumulative,
max_novel_ips_per_hostname: rb_policy.max_novel_ips_per_hostname,
policy_digest: self
.policy_digest
.map(str::to_string)
.unwrap_or_else(empty_policy_digest),
keyset_id: self.keyset_id.map(str::to_string),
issuer_kid: self.issuer_kid.map(str::to_string),
correlation_id: self.correlation_id.map(str::to_string),
resolver_id: if resolver_id_owned.is_empty() {
None
} else {
Some(resolver_id_owned.clone())
},
observed_at: observed_at_rfc3339.clone(),
};
let source = self.source.unwrap_or("cellos-supervisor");
if let Ok(ev) = cloud_event_v1_dns_authority_rebind_threshold(
source,
&observed_at_rfc3339,
&payload,
) {
events.push(ev);
}
}
}
if !decision.allowlist_violations.is_empty() {
let echo: Vec<String> = rb_policy
.response_ip_allowlist
.iter()
.filter(|raw| {
raw.split_once(':')
.is_some_and(|(prefix, _)| prefix == hostname)
})
.cloned()
.collect();
let prior_count = rebinding_state.history(hostname).len() as u32;
let cumulative =
prior_count.saturating_add(decision.novel_ips.len() as u32);
for &offending in &decision.allowlist_violations {
let payload = DnsAuthorityRebindRejected {
schema_version: "1.0.0".into(),
cell_id: cell_id.to_string(),
run_id: run_id.to_string(),
hostname: hostname.clone(),
novel_ip: offending.to_string(),
previous_ip_count: prior_count,
cumulative_ip_count: cumulative,
response_ip_allowlist: echo.clone(),
policy_digest: self
.policy_digest
.map(str::to_string)
.unwrap_or_else(empty_policy_digest),
keyset_id: self.keyset_id.map(str::to_string),
issuer_kid: self.issuer_kid.map(str::to_string),
correlation_id: self.correlation_id.map(str::to_string),
resolver_id: if resolver_id_owned.is_empty() {
None
} else {
Some(resolver_id_owned.clone())
},
observed_at: observed_at_rfc3339.clone(),
};
let source = self.source.unwrap_or("cellos-supervisor");
if let Ok(ev) = cloud_event_v1_dns_authority_rebind_rejected(
source,
&observed_at_rfc3339,
&payload,
) {
events.push(ev);
}
}
}
let effective = canonicalize_targets(&decision.effective_targets);
(effective, events)
}
None => (canonical_targets, Vec::new()),
};
let current_digest = digest_target_set(¤t_targets);
for ev in rebind_events {
out.push(ev);
}
let clamped_ttl: u32 = if min_ttl > 0 && effective_answer.ttl_seconds < min_ttl {
min_ttl
} else {
effective_answer.ttl_seconds
};
let stale_seconds: u32 = match (prior.last_refresh_at, prior.last_ttl_seconds) {
(Some(last), ttl) if ttl > 0 => {
let age = now.duration_since(last).unwrap_or_default().as_secs();
age.saturating_sub(u64::from(ttl)).min(u32::MAX as u64) as u32
}
_ => 0,
};
if current_digest != previous_digest {
let prev_set: BTreeSet<&String> = prior.previous_targets.iter().collect();
let curr_set: BTreeSet<&String> = current_targets.iter().collect();
let added: Vec<String> = curr_set
.difference(&prev_set)
.map(|s| (*s).clone())
.collect();
let removed: Vec<String> = prev_set
.difference(&curr_set)
.map(|s| (*s).clone())
.collect();
let drift = DnsAuthorityDrift {
schema_version: "1.0.0".into(),
cell_id: cell_id.to_string(),
run_id: run_id.to_string(),
resolver_id: resolver_id_owned.clone(),
hostname: hostname.clone(),
previous_targets: prior.previous_targets.clone(),
current_targets: current_targets.clone(),
added_targets: added,
removed_targets: removed,
previous_digest,
current_digest: current_digest.clone(),
ttl_seconds: clamped_ttl,
stale_seconds,
keyset_id: self.keyset_id.map(str::to_string),
issuer_kid: self.issuer_kid.map(str::to_string),
policy_digest: self.policy_digest.map(str::to_string),
correlation_id: self.correlation_id.map(str::to_string),
dnssec_status: Some(dnssec_status.to_string()),
observed_at: observed_at_rfc3339.clone(),
};
let source = self.source.unwrap_or("cellos-supervisor");
if let Ok(ev) =
cloud_event_v1_dns_authority_drift(source, &observed_at_rfc3339, &drift)
{
out.push(ev);
}
}
state.hosts.insert(
hostname.clone(),
HostState {
previous_targets: current_targets.clone(),
previous_digest: current_digest,
last_refresh_at: Some(now),
last_ttl_seconds: clamped_ttl,
},
);
if self.rebinding_policy.is_some() {
rebinding_state.commit(hostname, ¤t_targets);
}
}
out
}
}
fn empty_policy_digest() -> String {
"sha256:e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855".to_string()
}
fn canonicalize_targets(raw: &[String]) -> Vec<String> {
let mut set: BTreeSet<String> = BTreeSet::new();
for t in raw {
let trimmed = t.trim();
if !trimmed.is_empty() {
set.insert(trimmed.to_string());
}
}
set.into_iter().collect()
}
fn digest_target_set(canonical: &[String]) -> String {
let mut hasher = Sha256::new();
for (i, t) in canonical.iter().enumerate() {
if i > 0 {
hasher.update(b"\n");
}
hasher.update(t.as_bytes());
}
let bytes = hasher.finalize();
let mut out = String::with_capacity(7 + 64);
out.push_str("sha256:");
for b in bytes {
use std::fmt::Write;
let _ = write!(out, "{b:02x}");
}
out
}
fn system_time_to_rfc3339(t: SystemTime) -> String {
let secs = t
.duration_since(UNIX_EPOCH)
.unwrap_or(Duration::ZERO)
.as_secs() as i64;
let dt =
chrono::DateTime::<chrono::Utc>::from_timestamp(secs, 0).unwrap_or_else(chrono::Utc::now);
dt.to_rfc3339()
}
#[cfg(test)]
mod tests {
use super::*;
use std::cell::RefCell;
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::time::Duration;
fn answer_zero_ttl(targets: Vec<String>) -> ResolvedAnswer {
ResolvedAnswer {
targets,
ttl_seconds: 0,
resolver_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 53),
}
}
fn fixed_resolvers() -> Vec<DnsResolver> {
vec![DnsResolver {
resolver_id: "resolver-test-001".into(),
endpoint: "1.1.1.1:53".into(),
protocol: cellos_core::DnsResolverProtocol::Do53Udp,
trust_kid: None,
dnssec: None,
}]
}
fn make<'a>(
policy: Option<&'a DnsRefreshPolicy>,
resolvers: &'a [DnsResolver],
hostnames: &'a [String],
) -> ResolverRefresh<'a> {
ResolverRefresh {
policy,
rebinding_policy: None,
resolvers,
hostnames,
keyset_id: Some("keyset-test-001"),
issuer_kid: Some("kid-test-001"),
policy_digest: None,
correlation_id: None,
source: Some("cellos-supervisor-test"),
dnssec_policy: None,
trust_anchors: None,
}
}
#[test]
fn canonicalize_dedupes_and_sorts() {
let raw: Vec<String> = vec!["1.0.0.1".into(), "1.1.1.1".into(), "1.0.0.1".into()];
let canon = canonicalize_targets(&raw);
assert_eq!(canon, vec!["1.0.0.1".to_string(), "1.1.1.1".to_string()]);
}
#[test]
fn digest_changes_when_targets_change() {
let a = canonicalize_targets(&["1.1.1.1".to_string()]);
let b = canonicalize_targets(&["1.0.0.1".to_string()]);
assert_ne!(digest_target_set(&a), digest_target_set(&b));
}
#[test]
fn first_observation_emits_drift_with_empty_previous() {
let hostnames = vec!["api.example.com".to_string()];
let resolvers = fixed_resolvers();
let refresher = make(None, &resolvers, &hostnames);
let mut state = ResolverState::new();
let resolver = |_h: &str| Ok(answer_zero_ttl(vec!["1.1.1.1".to_string()]));
let events = refresher.tick(
&mut state,
&resolver,
SystemTime::UNIX_EPOCH + Duration::from_secs(1_700_000_000),
"cell-001",
"run-001",
);
assert_eq!(events.len(), 1, "first observation must emit drift");
let data = events[0].data.as_ref().expect("data present");
assert_eq!(data["previousDigest"], "empty");
assert_eq!(data["addedTargets"], serde_json::json!(["1.1.1.1"]));
assert!(data["removedTargets"].as_array().unwrap().is_empty());
}
#[test]
fn stable_targets_emit_no_drift_after_first_observation() {
let hostnames = vec!["api.example.com".to_string()];
let resolvers = fixed_resolvers();
let policy = DnsRefreshPolicy {
min_ttl_seconds: Some(0),
max_stale_seconds: None,
strategy: None,
};
let refresher = make(Some(&policy), &resolvers, &hostnames);
let mut state = ResolverState::new();
let resolver = |_h: &str| Ok(answer_zero_ttl(vec!["1.1.1.1".to_string()]));
let now = SystemTime::UNIX_EPOCH + Duration::from_secs(1_700_000_000);
let _ = refresher.tick(&mut state, &resolver, now, "c", "r");
let events = refresher.tick(
&mut state,
&resolver,
now + Duration::from_secs(60),
"c",
"r",
);
assert!(events.is_empty(), "stable targets must not emit drift");
}
#[test]
fn manual_strategy_emits_nothing() {
let hostnames = vec!["api.example.com".to_string()];
let resolvers = fixed_resolvers();
let policy = DnsRefreshPolicy {
min_ttl_seconds: None,
max_stale_seconds: None,
strategy: Some(DnsRefreshStrategy::Manual),
};
let refresher = make(Some(&policy), &resolvers, &hostnames);
let mut state = ResolverState::new();
let resolver = |_h: &str| Ok(answer_zero_ttl(vec!["1.1.1.1".to_string()]));
let events = refresher.tick(
&mut state,
&resolver,
SystemTime::UNIX_EPOCH + Duration::from_secs(1),
"c",
"r",
);
assert!(events.is_empty(), "manual strategy must skip ticks");
assert!(state.is_empty(), "state must remain untouched");
}
#[test]
fn floor_skips_refresh_within_min_ttl() {
let hostnames = vec!["api.example.com".to_string()];
let resolvers = fixed_resolvers();
let policy = DnsRefreshPolicy {
min_ttl_seconds: Some(300),
max_stale_seconds: None,
strategy: None,
};
let refresher = make(Some(&policy), &resolvers, &hostnames);
let mut state = ResolverState::new();
let calls: RefCell<u32> = RefCell::new(0);
let resolver = |_h: &str| -> io::Result<ResolvedAnswer> {
*calls.borrow_mut() += 1;
Ok(answer_zero_ttl(vec!["1.1.1.1".to_string()]))
};
let t0 = SystemTime::UNIX_EPOCH + Duration::from_secs(1_700_000_000);
let _ = refresher.tick(&mut state, &resolver, t0, "c", "r");
let _ = refresher.tick(
&mut state,
&resolver,
t0 + Duration::from_secs(60),
"c",
"r",
);
assert_eq!(
*calls.borrow(),
1,
"floor must skip the second resolver call"
);
}
#[test]
fn ceiling_forces_refresh_after_max_stale() {
let hostnames = vec!["api.example.com".to_string()];
let resolvers = fixed_resolvers();
let policy = DnsRefreshPolicy {
min_ttl_seconds: Some(3_000),
max_stale_seconds: Some(60),
strategy: None,
};
let refresher = make(Some(&policy), &resolvers, &hostnames);
let mut state = ResolverState::new();
let calls: RefCell<u32> = RefCell::new(0);
let resolver = |_h: &str| -> io::Result<ResolvedAnswer> {
*calls.borrow_mut() += 1;
Ok(answer_zero_ttl(vec!["1.1.1.1".to_string()]))
};
let t0 = SystemTime::UNIX_EPOCH + Duration::from_secs(1_700_000_000);
let _ = refresher.tick(&mut state, &resolver, t0, "c", "r");
let _ = refresher.tick(
&mut state,
&resolver,
t0 + Duration::from_secs(120),
"c",
"r",
);
assert_eq!(
*calls.borrow(),
2,
"ceiling must override the floor and force a refresh"
);
}
#[test]
fn resolver_failure_does_not_emit_drift() {
let hostnames = vec!["api.example.com".to_string()];
let resolvers = fixed_resolvers();
let refresher = make(None, &resolvers, &hostnames);
let mut state = ResolverState::new();
let resolver =
|_h: &str| -> io::Result<ResolvedAnswer> { Err(io::Error::other("transient")) };
let events = refresher.tick(
&mut state,
&resolver,
SystemTime::UNIX_EPOCH + Duration::from_secs(1),
"c",
"r",
);
assert!(
events.is_empty(),
"transient resolver error must not be reported as drift"
);
assert!(state.is_empty(), "failed lookup must not commit state");
}
fn answer_with_ttl(targets: Vec<String>, ttl: u32) -> ResolvedAnswer {
ResolvedAnswer {
targets,
ttl_seconds: ttl,
resolver_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 53),
}
}
#[test]
fn ticker_floors_ttl_to_min_ttl_seconds() {
let hostnames = vec!["api.example.com".to_string()];
let resolvers = fixed_resolvers();
let policy = DnsRefreshPolicy {
min_ttl_seconds: Some(60),
max_stale_seconds: None,
strategy: None,
};
let refresher = make(Some(&policy), &resolvers, &hostnames);
let mut state = ResolverState::new();
let resolver = |_h: &str| -> io::Result<ResolvedAnswer> {
Ok(answer_with_ttl(vec!["1.1.1.1".to_string()], 5))
};
let events = refresher.tick(
&mut state,
&resolver,
SystemTime::UNIX_EPOCH + Duration::from_secs(1_700_000_000),
"c",
"r",
);
assert_eq!(events.len(), 1, "first observation emits drift");
let data = events[0].data.as_ref().expect("data");
assert_eq!(
data["ttlSeconds"], 60,
"sub-floor TTL must be clamped up to min_ttl_seconds"
);
}
#[test]
fn ticker_passes_through_ttl_above_floor() {
let hostnames = vec!["api.example.com".to_string()];
let resolvers = fixed_resolvers();
let policy = DnsRefreshPolicy {
min_ttl_seconds: Some(60),
max_stale_seconds: None,
strategy: None,
};
let refresher = make(Some(&policy), &resolvers, &hostnames);
let mut state = ResolverState::new();
let resolver = |_h: &str| -> io::Result<ResolvedAnswer> {
Ok(answer_with_ttl(vec!["1.1.1.1".to_string()], 300))
};
let events = refresher.tick(
&mut state,
&resolver,
SystemTime::UNIX_EPOCH + Duration::from_secs(1_700_000_000),
"c",
"r",
);
let data = events[0].data.as_ref().expect("data");
assert_eq!(
data["ttlSeconds"], 300,
"above-floor TTL must pass through verbatim"
);
}
#[test]
fn ticker_zero_min_ttl_means_no_floor() {
let hostnames = vec!["api.example.com".to_string()];
let resolvers = fixed_resolvers();
let policy = DnsRefreshPolicy {
min_ttl_seconds: Some(0),
max_stale_seconds: None,
strategy: None,
};
let refresher = make(Some(&policy), &resolvers, &hostnames);
let mut state = ResolverState::new();
let resolver = |_h: &str| -> io::Result<ResolvedAnswer> {
Ok(answer_with_ttl(vec!["1.1.1.1".to_string()], 1))
};
let events = refresher.tick(
&mut state,
&resolver,
SystemTime::UNIX_EPOCH + Duration::from_secs(1_700_000_000),
"c",
"r",
);
let data = events[0].data.as_ref().expect("data");
assert_eq!(
data["ttlSeconds"], 1,
"min_ttl_seconds=0 means no clamp — pass through TTL=1"
);
}
}