cloudflare_dns_operator/
dns_check.rs

1use crate::{
2    context::Context,
3    dns::lookup as dns_lookup,
4    resources::CloudflareDNSRecord,
5};
6use futures::Stream;
7use kube::{
8    api::ListParams,
9    runtime::reflector::{
10        Lookup,
11        ObjectRef,
12    },
13    Api,
14};
15use std::{
16    net::SocketAddr,
17    sync::Arc,
18    time::Duration,
19};
20use tokio::sync::mpsc;
21
22pub type DnsCheckSender = mpsc::Sender<DnsCheckRequest>;
23pub type DnsCheckReceiver = mpsc::Receiver<DnsCheckRequest>;
24
25pub enum DnsCheckRequest {
26    CheckSingleRecord { name: String, namespace: String },
27}
28
29/// Try to resolve DNS records of `CloudflareDNSRecord` resources and compare them with the specified content. Will emit
30/// object refs that will trigger resource updates through [`kube::runtime::Controller::reconcile_on`].
31pub fn start_dns_check(
32    ctx: Arc<Context>,
33    mut dns_check_receiver: DnsCheckReceiver,
34    check_interval: Option<Duration>,
35    nameserver: SocketAddr,
36) -> impl Stream<Item = ObjectRef<CloudflareDNSRecord>> + Send + 'static {
37    async_stream::stream! {
38        let Some(check_interval) = check_interval else {
39            return;
40        };
41
42        let mut timer = tokio::time::interval(check_interval);
43        let client = ctx.client.clone();
44
45        loop {
46            let resources = tokio::select! {
47                _ = timer.tick() => {
48                    let api = Api::<CloudflareDNSRecord>::all(client.clone());
49                    match api.list(&ListParams::default()).await {
50                        Ok(resources) => resources.into_iter().collect(),
51                        Err(err) => {
52                            error!("Failed to list CloudflareDNSRecord resources: {:?}", err);
53                            continue;
54                        }
55                    }
56                },
57
58                Some(request) = dns_check_receiver.recv() => {
59                    match request {
60                        DnsCheckRequest::CheckSingleRecord { name, namespace } => {
61                            trace!("Request to check single DNS record {}/{}", namespace, name);
62                            match Api::<CloudflareDNSRecord>::namespaced(client.clone(), &namespace).get(&name).await {
63                                Ok(resource) => vec![resource],
64                                Err(err) => {
65                                    error!("Failed to get CloudflareDNSRecord {}/{}: {}", namespace, name, err);
66                                    continue;
67                                }
68                            }
69                        },
70                    }
71                }
72            };
73
74            debug!("Checking DNS {} CloudflareDNSRecord resources", resources.len());
75
76            for resource in resources {
77                let Some(name) = resource.metadata.name.clone() else {
78                    error!("Resource has no name: {:?}", resource);
79                    continue;
80                };
81                let Some(ns) = resource.metadata.namespace.clone() else {
82                    error!("Resource has no namespace: {:?}", resource);
83                    continue;
84                };
85
86                let key = format!("{ns}:{name}");
87
88                if resource.status.clone().is_none() {
89                    // Status should be set on first reconcile
90                    warn!("Resource {key:?} has not yet a status");
91                    continue;
92                };
93
94                let qname = &resource.spec.name;
95
96                let Some(content) = resource.spec.lookup_content(&ctx.client, &ns).await.ok().flatten() else {
97                    error!("unable to resolve content for CloudflareDNSRecord {key:?}");
98                    continue;
99                };
100
101                let ty = resource.spec.ty.unwrap_or_default();
102
103                let dns_record_data = match dns_lookup::resolve(qname, ty,nameserver).await {
104                    Ok(Some(it)) => it,
105                    Ok(None) => {
106                        error!("Unable to resolve unsupported DNS record type: {ty:?} for {key:?}");
107                        continue;
108                    }
109                    Err(err) => {
110                        error!("Failed to resolve DNS record: {err:?} for {key:?}");
111                        Vec::new()
112                    }
113                };
114
115                let matches = dns_record_data.contains(&content);
116
117                trace!(?key, ?dns_record_data, ?content, "Matches DNS record?");
118                let mut dns_lookup_success = ctx.dns_lookup_success.lock().await;
119                let matched_before = dns_lookup_success.get(&key).cloned().unwrap_or(false);
120                let changed = matched_before != matches;
121                trace!(?key, ?matches, matched_before, changed, "DNS record matches");
122                dns_lookup_success.insert(key, matches);
123
124                if changed {
125                    yield resource.to_object_ref(());
126                }
127            }
128        }
129    }
130}