cloudflare-dns-operator 0.1.11

Kubernetes operator to manage Cloudflare DNS records
Documentation
#[macro_use]
extern crate tracing;

use clap::Parser;
use cloudflare_dns_operator::{
    context,
    dns,
    dns_check,
    reconcile::{
        self,
        ReconcileError,
    },
    resources,
    services,
};
use context::Context;
use eyre::Result;
use futures::StreamExt as _;
use k8s_openapi::api::core::v1::Service;
use kube::{
    runtime::{
        controller::Action,
        finalizer,
        finalizer::Event,
        watcher,
        Controller,
    },
    Api,
    CustomResourceExt as _,
    Resource as _,
};
use services::is_suitable_service;
use std::{
    net::SocketAddr,
    sync::Arc,
    time::Duration,
};
use tokio::sync::mpsc;

// Top-level command line interface: each variant is one subcommand of the binary.
//
// NOTE: plain `//` comments are used here on purpose. clap's derive macros turn `///` doc
// comments into help/about text, so doc comments would change the CLI output.
#[derive(Parser)]
#[command(version, about)]
enum Args {
    // Serialize the `CloudflareDNSRecord` custom resource definition to YAML and print it.
    Crds,
    // Run the controller (see `run_controller`).
    Controller(ArgsController),
    // List the Cloudflare zones reachable with the given API token. Reuses `ArgsController`,
    // although only `cloudflare_api_token` is read for this subcommand.
    ListZones(ArgsController),
}

#[derive(Parser)]
struct ArgsController {
    #[clap(long, env = "CLOUDFLARE_API_TOKEN", help = "Cloudflare API token")]
    cloudflare_api_token: String,

    #[clap(
        long = "dns-check",
        env = "CHECK_DNS_RESOLUTION",
        help = "Do active DNS checks by querying 1.1.1.1? If not set, DNS check is disabled",
        value_parser = humantime::parse_duration
    )]
    dns_checks: Option<Duration>,

    #[clap(
        long,
        env = "NAMESERVER_FOR_DNS_CHECK",
        help = "Nameserver and port to use for DNS checks",
        default_value = "1.1.1.1:53"
    )]
    nameserver: SocketAddr,
}

/// Process entry point: installs error reporting and logging, then dispatches on the
/// parsed subcommand.
///
/// * `Crds` prints the `CloudflareDNSRecord` CRD as YAML on stdout.
/// * `Controller` runs the controller until it stops.
/// * `ListZones` prints the zones returned by the Cloudflare API on stdout.
///
/// # Errors
///
/// Returns any error produced by the controller or by the Cloudflare zone listing.
///
/// # Panics
///
/// Panics if `color_eyre` cannot be installed or if the CRD cannot be serialized; both are
/// treated as programming errors rather than runtime conditions.
#[tokio::main]
async fn main() -> Result<()> {
    color_eyre::install().expect("color_eyre init");
    tracing_subscriber::fmt::init();

    match Args::parse() {
        Args::Crds => {
            let yaml = serde_yaml::to_string(&resources::CloudflareDNSRecord::crd()).expect("Failed to serialize CRD");
            print!("{yaml}");
        }
        Args::Controller(args) => {
            run_controller(args).await?;
        }
        Args::ListZones(args) => {
            let zones = dns::cloudflare::list_zones(&args.cloudflare_api_token).await?;
            // User-facing output belongs on stdout; `dbg!` is a debugging aid that writes to
            // stderr and prefixes the value with the source location.
            println!("{zones:#?}");
        }
    }

    Ok(())
}

async fn run_controller(
    ArgsController {
        cloudflare_api_token,
        dns_checks,
        nameserver,
    }: ArgsController,
) -> Result<(), ReconcileError> {
    let client = kube::Client::try_default().await?;

    let dns_resources = Api::<resources::CloudflareDNSRecord>::all(client.clone());

    let (dns_check_tx, dns_check_rx) = mpsc::channel(64);

    let context = Arc::new(Context {
        client: client.clone(),
        cloudflare_api_token,
        do_dns_check: dns_checks.is_some(),
        dns_check_tx,
        dns_lookup_success: Default::default(),
    });

    let dns_change = dns_check::start_dns_check(context.clone(), dns_check_rx, dns_checks, nameserver);

    info!("Starting controller");

    Controller::new(dns_resources, Default::default())
        // watch load balancers / external ip services to adjust dns <-> public ip
        .watches(
            Api::<Service>::all(client),
            watcher::Config::default(),
            is_suitable_service,
        )
        .reconcile_on(dns_change)
        .shutdown_on_signal()
        .run(reconcile, error_policy, context)
        .for_each(|msg| async move { info!("Reconciled: {:?}", msg) })
        .await;

    info!("Controller stopped");

    Ok(())
}

async fn reconcile(
    resource: Arc<resources::CloudflareDNSRecord>,
    ctx: Arc<Context>,
) -> Result<Action, finalizer::Error<ReconcileError>> {
    let ns = resource.meta().namespace.as_deref().unwrap_or("default");
    let api: Api<resources::CloudflareDNSRecord> = Api::namespaced(ctx.client.clone(), ns);

    finalizer(&api, "dns.cloudflare.com/delete-dns-record", resource, |event| async {
        let result = match event {
            Event::Apply(server) => reconcile::apply(server, ctx.clone()).await,
            Event::Cleanup(server) => reconcile::cleanup(server, ctx.clone()).await,
        };

        if let Err(err) = result {
            match err {
                reconcile::ReconcileError::Kube(kube::Error::Api(err)) if err.code == 409 => {
                    warn!("Conflict when reconciling object: {err}");
                }
                reconcile::ReconcileError::Kube(kube::Error::Api(err)) if err.code == 404 => {
                    warn!("Object not found when reconciling object: {err}");
                }
                reconcile::ReconcileError::Deletion(err) => {
                    error!("Failed to delete object: {err:?}");
                }
                err => {
                    return Err(err);
                }
            }
        }

        Ok(Action::requeue(Duration::from_secs(60)))
    })
    .await
}

/// Error policy handed to the controller: logs the failure and requeues the object after
/// a fixed 15 second delay.
///
/// The failed object and the shared context are not inspected.
fn error_policy(
    _object: Arc<resources::CloudflareDNSRecord>,
    err: &finalizer::Error<ReconcileError>,
    _ctx: Arc<Context>,
) -> Action {
    // Delay before a failed reconciliation is retried.
    const RETRY_DELAY: Duration = Duration::from_secs(15);

    error!("Error reconciling: {:?}", err);
    Action::requeue(RETRY_DELAY)
}