crabka-operator 0.3.6

Kubernetes operator for Crabka clusters
Documentation
//! TLS-auth helpers for the `KafkaUser` reconciler.
//!
//! Owns:
//! - per-user X.509 cert issuance + renewal,
//! - the per-user TLS-credential Secret render.
//!
//! `controller/user.rs` dispatches into here from its reconcile pipeline
//! when `spec.authentication` is `Authentication::Tls(_)`.

use std::collections::BTreeMap;

use crabka_security::ca::{self, CaMaterial};
use k8s_openapi::ByteString;
use k8s_openapi::api::core::v1::Secret;
use k8s_openapi::apimachinery::pkg::apis::meta::v1::{ObjectMeta, OwnerReference};
use kube::api::{Api, Patch, PatchParams};
use kube::{Resource, ResourceExt as _};
use time::OffsetDateTime;
use time::format_description::well_known::Rfc3339;

use crate::controller::common::{FIELD_MANAGER, ReconcileError, read_pem_key};
use crate::crd::KafkaUser;
use crate::crd::user::TlsAuth;

/// Default cert lifetime (days) when `TlsAuth::validity_days` is absent.
pub(crate) const DEFAULT_VALIDITY_DAYS: u32 = 365;
/// Default renewal window (days) when `TlsAuth::renewal_days` is absent.
pub(crate) const DEFAULT_RENEWAL_DAYS: u32 = 30;

/// Outcome of `ensure_user_cert_secret`. Drives the status update.
#[derive(Debug, Clone)]
pub(crate) struct UserCertStatus {
    /// RFC3339 `notAfter` from the (newly issued or reused) cert.
    pub not_after: String,
    /// Whether the operator issued a new cert this reconcile.
    /// Pure observability; not load-bearing.
    pub issued_new: bool,
}

/// Get-or-create the per-user cert Secret. Idempotent: if the existing
/// Secret carries a cert whose `notAfter` is more than `renewal_days`
/// in the future, returns its status unchanged. Otherwise issues a new
/// cert and PATCH-applies the Secret.
pub(crate) async fn ensure_user_cert_secret(
    secret_api: &Api<Secret>,
    obj: &KafkaUser,
    ca_material: &CaMaterial,
    auth: &TlsAuth,
) -> Result<UserCertStatus, ReconcileError> {
    let name = obj.name_any();
    let validity = auth.validity_days.unwrap_or(DEFAULT_VALIDITY_DAYS);
    let renewal = auth.renewal_days.unwrap_or(DEFAULT_RENEWAL_DAYS);

    if let Some(existing) = secret_api.get_opt(&name).await?
        && let Some(not_after) = read_user_cert_not_after(&existing)
        && !is_cert_expiring_soon(&not_after, renewal, OffsetDateTime::now_utc())
    {
        return Ok(UserCertStatus {
            not_after: format_rfc3339(not_after)?,
            issued_new: false,
        });
    }

    let user_cert =
        ca::issue_user_cert(&ca_material.cert_pem, &ca_material.key_pem, &name, validity)
            .map_err(ReconcileError::Ca)?;

    let secret = render_user_cert_secret(obj, &user_cert, &ca_material.cert_pem)?;
    let params = PatchParams {
        field_manager: Some(FIELD_MANAGER.into()),
        force: true,
        ..Default::default()
    };
    secret_api
        .patch(&name, &params, &Patch::Apply(&secret))
        .await?;
    Ok(UserCertStatus {
        not_after: user_cert.not_after,
        issued_new: true,
    })
}

/// Compose Kafka principal for a TLS user (`User:CN=<name>`). The
/// SCRAM path lives in `controller/user.rs::principal_for` for now;
/// Batch B2 unifies them.
#[must_use]
pub(crate) fn tls_principal(name: &str) -> String {
    format!("User:CN={name}")
}

/// Pure: is `not_after` within `renewal_days` of `now`?
#[must_use]
pub(crate) fn is_cert_expiring_soon(
    not_after: &OffsetDateTime,
    renewal_days: u32,
    now: OffsetDateTime,
) -> bool {
    let window = time::Duration::days(i64::from(renewal_days));
    *not_after <= now + window
}

fn format_rfc3339(t: OffsetDateTime) -> Result<String, ReconcileError> {
    t.format(&Rfc3339)
        .map_err(|e| ReconcileError::CertParse(format!("rfc3339 format: {e}")))
}

/// Parse `user.crt` PEM out of an existing user Secret and return
/// the cert's `notAfter` as a `time::OffsetDateTime`. Returns `None`
/// if the key is missing, the PEM is malformed, or the cert won't
/// parse — caller treats `None` as "reissue".
fn read_user_cert_not_after(secret: &Secret) -> Option<OffsetDateTime> {
    let pem = read_pem_key(secret, "user.crt")?;
    cert_not_after_from_pem(&pem)
}

fn cert_not_after_from_pem(pem: &str) -> Option<OffsetDateTime> {
    use x509_parser::pem::parse_x509_pem;
    let (_, p) = parse_x509_pem(pem.as_bytes()).ok()?;
    let cert = p.parse_x509().ok()?;
    let ts = cert.validity().not_after.timestamp();
    OffsetDateTime::from_unix_timestamp(ts).ok()
}

fn render_user_cert_secret(
    obj: &KafkaUser,
    user_cert: &ca::UserCert,
    ca_cert_pem: &str,
) -> Result<Secret, ReconcileError> {
    let name = obj.name_any();
    let mut labels: BTreeMap<String, String> = BTreeMap::new();
    labels.insert("app.kubernetes.io/name".into(), "crabka-broker".into());
    labels.insert(
        "app.kubernetes.io/managed-by".into(),
        "crabka-operator".into(),
    );
    if let Some(cluster) = obj
        .meta()
        .labels
        .as_ref()
        .and_then(|l| l.get("crabka.io/cluster"))
    {
        labels.insert("crabka.io/cluster".into(), cluster.clone());
    }
    labels.insert("crabka.io/user".into(), name.clone());
    labels.insert("crabka.io/auth".into(), "tls".into());

    let mut data: BTreeMap<String, ByteString> = BTreeMap::new();
    data.insert(
        "user.crt".into(),
        ByteString(user_cert.cert_pem.as_bytes().to_vec()),
    );
    data.insert(
        "user.key".into(),
        ByteString(user_cert.key_pem.as_bytes().to_vec()),
    );
    data.insert("ca.crt".into(), ByteString(ca_cert_pem.as_bytes().to_vec()));

    Ok(Secret {
        metadata: ObjectMeta {
            name: Some(name),
            namespace: obj.meta().namespace.clone(),
            labels: Some(labels),
            owner_references: Some(vec![user_owner_ref(obj)?]),
            ..Default::default()
        },
        type_: Some("Opaque".into()),
        data: Some(data),
        ..Default::default()
    })
}

fn user_owner_ref(obj: &KafkaUser) -> Result<OwnerReference, ReconcileError> {
    let uid = obj
        .meta()
        .uid
        .as_deref()
        .ok_or(ReconcileError::MissingUid)?;
    Ok(OwnerReference {
        api_version: <KafkaUser as Resource>::api_version(&()).to_string(),
        kind: <KafkaUser as Resource>::kind(&()).to_string(),
        name: obj.name_any(),
        uid: uid.to_string(),
        controller: Some(true),
        block_owner_deletion: Some(true),
    })
}

#[cfg(test)]
mod tests {
    use super::*;
    use assert2::assert;

    #[test]
    fn tls_principal_format() {
        assert!(tls_principal("alice") == "User:CN=alice");
    }

    #[test]
    fn is_cert_expiring_soon_boundary_cases() {
        let now = OffsetDateTime::now_utc();

        let in_60_days = now + time::Duration::days(60);
        assert!(
            !is_cert_expiring_soon(&in_60_days, 30, now),
            "60d > 30d window: not expiring"
        );

        let in_30_days = now + time::Duration::days(30);
        assert!(
            is_cert_expiring_soon(&in_30_days, 30, now),
            "30d == 30d window: at boundary, treat as expiring (<=)"
        );

        let in_1_day = now + time::Duration::days(1);
        assert!(
            is_cert_expiring_soon(&in_1_day, 30, now),
            "1d within 30d window: expiring"
        );

        let yesterday = now - time::Duration::days(1);
        assert!(
            is_cert_expiring_soon(&yesterday, 30, now),
            "already past notAfter: expiring"
        );
    }

    #[test]
    fn cert_not_after_round_trips() {
        let ca = ca::generate_clients_ca("test-root", 365).expect("ca");
        let before = OffsetDateTime::now_utc();
        let user = ca::issue_user_cert(&ca.cert_pem, &ca.key_pem, "alice", 365).expect("leaf");

        let parsed = cert_not_after_from_pem(&user.cert_pem).expect("notAfter parses");
        let expected = before + time::Duration::days(365);

        let delta = (parsed - expected).whole_seconds().abs();
        assert!(
            delta <= 5,
            "notAfter delta {delta}s exceeds ±5s tolerance (parsed={parsed}, expected={expected})"
        );
    }

    #[test]
    fn cert_not_after_from_pem_returns_none_on_malformed_input() {
        assert!(cert_not_after_from_pem("not a pem").is_none());
        assert!(cert_not_after_from_pem("").is_none());
        // Valid PEM framing, garbage body — exercises the parse_x509 failure
        // branch.
        assert!(
            cert_not_after_from_pem("-----BEGIN CERTIFICATE-----\nQUFB\n-----END CERTIFICATE-----")
                .is_none()
        );
    }

    #[test]
    fn read_pem_key_returns_some_when_present() {
        let mut data = BTreeMap::new();
        data.insert("ca.key".into(), ByteString(b"abc".to_vec()));
        let s = Secret {
            data: Some(data),
            ..Default::default()
        };
        assert!(read_pem_key(&s, "ca.key").as_deref() == Some("abc"));
    }

    #[test]
    fn read_pem_key_returns_none_when_data_missing() {
        let s = Secret::default();
        assert!(read_pem_key(&s, "ca.key").is_none());
    }

    #[test]
    fn read_pem_key_returns_none_when_key_missing() {
        let mut data = BTreeMap::new();
        data.insert("other".into(), ByteString(b"abc".to_vec()));
        let s = Secret {
            data: Some(data),
            ..Default::default()
        };
        assert!(read_pem_key(&s, "ca.key").is_none());
    }

    #[test]
    fn read_pem_key_returns_none_on_non_utf8() {
        let mut data = BTreeMap::new();
        data.insert("ca.key".into(), ByteString(vec![0xFF, 0xFE, 0xFD]));
        let s = Secret {
            data: Some(data),
            ..Default::default()
        };
        assert!(read_pem_key(&s, "ca.key").is_none());
    }

    #[test]
    fn read_user_cert_not_after_returns_none_when_user_crt_missing() {
        let mut data = BTreeMap::new();
        data.insert("user.key".into(), ByteString(b"junk".to_vec()));
        let s = Secret {
            data: Some(data),
            ..Default::default()
        };
        assert!(read_user_cert_not_after(&s).is_none());
    }

    #[test]
    fn format_rfc3339_round_trips() {
        let t = OffsetDateTime::from_unix_timestamp(1_700_000_000).expect("unix ts");
        let s = format_rfc3339(t).expect("formats");
        assert!(s == "2023-11-14T22:13:20Z");
    }

    fn dummy_ku() -> KafkaUser {
        let mut ku = KafkaUser::new(
            "alice",
            crate::crd::KafkaUserSpec {
                authentication: crate::crd::Authentication::Tls(TlsAuth::default()),
                authorization: None,
                quotas: None,
            },
        );
        ku.metadata.namespace = Some("ns".into());
        ku.metadata.uid = Some("user-uid".into());
        let mut labels = BTreeMap::new();
        labels.insert("crabka.io/cluster".into(), "demo".into());
        ku.metadata.labels = Some(labels);
        ku
    }

    #[test]
    fn render_user_cert_secret_carries_three_keys_and_tls_auth_label() {
        let ku = dummy_ku();
        let user_cert = ca::UserCert {
            cert_pem: "CERT".into(),
            key_pem: "KEY".into(),
            not_after: "2027-01-01T00:00:00Z".into(),
        };
        let secret = render_user_cert_secret(&ku, &user_cert, "CA-CERT").expect("renders");

        assert!(secret.metadata.name.as_deref() == Some("alice"));
        assert!(secret.metadata.namespace.as_deref() == Some("ns"));
        let labels = secret.metadata.labels.as_ref().expect("labels");
        assert!(labels.get("crabka.io/auth").map(String::as_str) == Some("tls"));
        assert!(labels.get("crabka.io/user").map(String::as_str) == Some("alice"));
        assert!(labels.get("crabka.io/cluster").map(String::as_str) == Some("demo"));
        let owners = secret.metadata.owner_references.as_ref().expect("owner");
        assert!(owners.len() == 1);
        assert!(owners[0].kind == "KafkaUser");
        assert!(owners[0].uid == "user-uid");
        let data = secret.data.as_ref().expect("data");
        assert!(data.get("user.crt").map(|bs| bs.0.as_slice()) == Some(b"CERT".as_slice()));
        assert!(data.get("user.key").map(|bs| bs.0.as_slice()) == Some(b"KEY".as_slice()));
        assert!(data.get("ca.crt").map(|bs| bs.0.as_slice()) == Some(b"CA-CERT".as_slice()));
    }

    #[test]
    fn render_user_cert_secret_omits_cluster_label_when_label_absent() {
        let mut ku = dummy_ku();
        ku.metadata.labels = None;
        let user_cert = ca::UserCert {
            cert_pem: "C".into(),
            key_pem: "K".into(),
            not_after: "2027-01-01T00:00:00Z".into(),
        };
        let secret = render_user_cert_secret(&ku, &user_cert, "CA").expect("renders");
        let labels = secret.metadata.labels.as_ref().expect("labels");
        assert!(!labels.contains_key("crabka.io/cluster"));
    }

    #[test]
    fn user_owner_ref_errors_on_missing_uid() {
        let mut ku = dummy_ku();
        ku.metadata.uid = None;
        assert!(matches!(
            user_owner_ref(&ku),
            Err(ReconcileError::MissingUid)
        ));
    }

    #[test]
    fn user_owner_ref_carries_block_owner_deletion() {
        let ku = dummy_ku();
        let owner = user_owner_ref(&ku).expect("owner ref");
        assert!(owner.kind == "KafkaUser");
        assert!(owner.name == "alice");
        assert!(owner.uid == "user-uid");
        assert!(owner.controller == Some(true));
        assert!(owner.block_owner_deletion == Some(true));
    }
}