cloudflare_dns_operator/
dns_check.rs1use crate::{
2 context::Context,
3 dns::lookup as dns_lookup,
4 resources::CloudflareDNSRecord,
5};
6use futures::Stream;
7use kube::{
8 api::ListParams,
9 runtime::reflector::{
10 Lookup,
11 ObjectRef,
12 },
13 Api,
14};
15use std::{
16 net::SocketAddr,
17 sync::Arc,
18 time::Duration,
19};
20use tokio::sync::mpsc;
21
22pub type DnsCheckSender = mpsc::Sender<DnsCheckRequest>;
23pub type DnsCheckReceiver = mpsc::Receiver<DnsCheckRequest>;
24
/// A request sent to the DNS checker over the check channel.
///
/// `Debug`, `Clone`, `PartialEq` and `Eq` are derived so requests can be
/// logged, duplicated and compared by callers and tests.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum DnsCheckRequest {
    /// Check one record immediately, identified by its Kubernetes object
    /// `name` and `namespace`.
    CheckSingleRecord { name: String, namespace: String },
}
28
29pub fn start_dns_check(
32 ctx: Arc<Context>,
33 mut dns_check_receiver: DnsCheckReceiver,
34 check_interval: Option<Duration>,
35 nameserver: SocketAddr,
36) -> impl Stream<Item = ObjectRef<CloudflareDNSRecord>> + Send + 'static {
37 async_stream::stream! {
38 let Some(check_interval) = check_interval else {
39 return;
40 };
41
42 let mut timer = tokio::time::interval(check_interval);
43 let client = ctx.client.clone();
44
45 loop {
46 let resources = tokio::select! {
47 _ = timer.tick() => {
48 let api = Api::<CloudflareDNSRecord>::all(client.clone());
49 match api.list(&ListParams::default()).await {
50 Ok(resources) => resources.into_iter().collect(),
51 Err(err) => {
52 error!("Failed to list CloudflareDNSRecord resources: {:?}", err);
53 continue;
54 }
55 }
56 },
57
58 Some(request) = dns_check_receiver.recv() => {
59 match request {
60 DnsCheckRequest::CheckSingleRecord { name, namespace } => {
61 trace!("Request to check single DNS record {}/{}", namespace, name);
62 match Api::<CloudflareDNSRecord>::namespaced(client.clone(), &namespace).get(&name).await {
63 Ok(resource) => vec![resource],
64 Err(err) => {
65 error!("Failed to get CloudflareDNSRecord {}/{}: {}", namespace, name, err);
66 continue;
67 }
68 }
69 },
70 }
71 }
72 };
73
74 debug!("Checking DNS {} CloudflareDNSRecord resources", resources.len());
75
76 for resource in resources {
77 let Some(name) = resource.metadata.name.clone() else {
78 error!("Resource has no name: {:?}", resource);
79 continue;
80 };
81 let Some(ns) = resource.metadata.namespace.clone() else {
82 error!("Resource has no namespace: {:?}", resource);
83 continue;
84 };
85
86 let key = format!("{ns}:{name}");
87
88 if resource.status.clone().is_none() {
89 warn!("Resource {key:?} has not yet a status");
91 continue;
92 };
93
94 let qname = &resource.spec.name;
95
96 let Some(content) = resource.spec.lookup_content(&ctx.client, &ns).await.ok().flatten() else {
97 error!("unable to resolve content for CloudflareDNSRecord {key:?}");
98 continue;
99 };
100
101 let ty = resource.spec.ty.unwrap_or_default();
102
103 let dns_record_data = match dns_lookup::resolve(qname, ty,nameserver).await {
104 Ok(Some(it)) => it,
105 Ok(None) => {
106 error!("Unable to resolve unsupported DNS record type: {ty:?} for {key:?}");
107 continue;
108 }
109 Err(err) => {
110 error!("Failed to resolve DNS record: {err:?} for {key:?}");
111 Vec::new()
112 }
113 };
114
115 let matches = dns_record_data.contains(&content);
116
117 trace!(?key, ?dns_record_data, ?content, "Matches DNS record?");
118 let mut dns_lookup_success = ctx.dns_lookup_success.lock().await;
119 let matched_before = dns_lookup_success.get(&key).cloned().unwrap_or(false);
120 let changed = matched_before != matches;
121 trace!(?key, ?matches, matched_before, changed, "DNS record matches");
122 dns_lookup_success.insert(key, matches);
123
124 if changed {
125 yield resource.to_object_ref(());
126 }
127 }
128 }
129 }
130}