cloudflare_dns_operator/
dns_check.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
use crate::{
    context::Context,
    dns::lookup as dns_lookup,
    resources::CloudflareDNSRecord,
};
use futures::Stream;
use kube::{
    api::ListParams,
    runtime::reflector::{
        Lookup,
        ObjectRef,
    },
    Api,
};
use std::{
    net::SocketAddr,
    sync::Arc,
    time::Duration,
};
use tokio::sync::mpsc;

/// Sending half of the tokio mpsc channel used to submit [`DnsCheckRequest`]s.
pub type DnsCheckSender = mpsc::Sender<DnsCheckRequest>;
/// Receiving half of the tokio mpsc channel carrying [`DnsCheckRequest`]s; consumed by [`start_dns_check`].
pub type DnsCheckReceiver = mpsc::Receiver<DnsCheckRequest>;

/// A request for a DNS check outside of the regular check interval.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum DnsCheckRequest {
    /// Check the single `CloudflareDNSRecord` resource called `name` that lives in `namespace`.
    CheckSingleRecord { name: String, namespace: String },
}

/// Try to resolve DNS records of `CloudflareDNSRecord` resources and compare them with the specified content. Will emit
/// object refs that will trigger resource updates through [`kube::runtime::Controller::reconcile_on`].
///
/// Checks are triggered in two ways:
/// - on every tick of `check_interval`, all `CloudflareDNSRecord` resources in the cluster are listed and checked;
/// - whenever a [`DnsCheckRequest`] arrives on `dns_check_receiver`, the single requested resource is checked.
///
/// The outcome of each check (whether the resolved records contain the expected content) is stored in
/// `ctx.dns_lookup_success` under the key `"{namespace}:{name}"`. An object ref is emitted only when the outcome
/// differs from the previously stored one; a resource without a stored outcome counts as "did not match before".
/// A failed DNS resolution is treated as "does not match".
///
/// Resources without a name, namespace or status, resources whose expected content cannot be determined, and
/// resources with an unsupported record type are logged and skipped.
///
/// If `check_interval` is `None` the returned stream ends immediately and no checks are ever performed.
///
/// `nameserver` is the address passed to the DNS resolver for every lookup.
pub fn start_dns_check(
    ctx: Arc<Context>,
    mut dns_check_receiver: DnsCheckReceiver,
    check_interval: Option<Duration>,
    nameserver: SocketAddr,
) -> impl Stream<Item = ObjectRef<CloudflareDNSRecord>> + Send + 'static {
    async_stream::stream! {
        let Some(check_interval) = check_interval else {
            return;
        };

        let mut timer = tokio::time::interval(check_interval);
        let client = ctx.client.clone();

        loop {
            // Wait for whichever comes first: the periodic tick (check everything) or an explicit request
            // (check one resource). Once all senders are gone `recv()` yields `None`, which disables that
            // branch and leaves only the timer.
            let resources: Vec<CloudflareDNSRecord> = tokio::select! {
                _ = timer.tick() => {
                    let api = Api::<CloudflareDNSRecord>::all(client.clone());
                    match api.list(&ListParams::default()).await {
                        Ok(resources) => resources.items,
                        Err(err) => {
                            error!("Failed to list CloudflareDNSRecord resources: {:?}", err);
                            continue;
                        }
                    }
                },

                Some(request) = dns_check_receiver.recv() => {
                    match request {
                        DnsCheckRequest::CheckSingleRecord { name, namespace } => {
                            trace!("Request to check single DNS record {}/{}", namespace, name);
                            match Api::<CloudflareDNSRecord>::namespaced(client.clone(), &namespace).get(&name).await {
                                Ok(resource) => vec![resource],
                                Err(err) => {
                                    error!("Failed to get CloudflareDNSRecord {}/{}: {}", namespace, name, err);
                                    continue;
                                }
                            }
                        },
                    }
                }
            };

            debug!("Checking DNS {} CloudflareDNSRecord resources", resources.len());

            for resource in resources {
                let Some(name) = resource.metadata.name.clone() else {
                    error!("Resource has no name: {:?}", resource);
                    continue;
                };
                let Some(ns) = resource.metadata.namespace.clone() else {
                    error!("Resource has no namespace: {:?}", resource);
                    continue;
                };

                let key = format!("{ns}:{name}");

                if resource.status.is_none() {
                    // Status should be set on first reconcile
                    warn!("Resource {key:?} has not yet a status");
                    continue;
                }

                let qname = &resource.spec.name;

                // Keep the lookup error instead of discarding it so that failures can be diagnosed from the logs.
                let content = match resource.spec.lookup_content(&ctx.client, &ns).await {
                    Ok(Some(content)) => content,
                    Ok(None) => {
                        error!("unable to resolve content for CloudflareDNSRecord {key:?}");
                        continue;
                    }
                    Err(err) => {
                        error!("unable to resolve content for CloudflareDNSRecord {key:?}: {err:?}");
                        continue;
                    }
                };

                let ty = resource.spec.ty.unwrap_or_default();

                let dns_record_data = match dns_lookup::resolve(qname, ty, nameserver).await {
                    Ok(Some(it)) => it,
                    Ok(None) => {
                        error!("Unable to resolve unsupported DNS record type: {ty:?} for {key:?}");
                        continue;
                    }
                    Err(err) => {
                        // A failed lookup is recorded as a mismatch rather than skipped.
                        error!("Failed to resolve DNS record: {err:?} for {key:?}");
                        Vec::new()
                    }
                };

                let matches = dns_record_data.contains(&content);

                trace!(?key, ?dns_record_data, ?content, "Matches DNS record?");

                // Scope the lock so the guard is released *before* yielding: `yield` suspends this generator
                // until the consumer polls again, and holding the lock across that point would block every
                // other user of `dns_lookup_success` for as long as the consumer takes.
                let changed = {
                    let mut dns_lookup_success = ctx.dns_lookup_success.lock().await;
                    let matched_before = dns_lookup_success.get(&key).cloned().unwrap_or(false);
                    let changed = matched_before != matches;
                    trace!(?key, ?matches, matched_before, changed, "DNS record matches");
                    dns_lookup_success.insert(key, matches);
                    changed
                };

                if changed {
                    yield resource.to_object_ref(());
                }
            }
        }
    }
}