use std::collections::BTreeMap;
use crabka_security::ca::{self, CaMaterial};
use k8s_openapi::ByteString;
use k8s_openapi::api::core::v1::Secret;
use k8s_openapi::apimachinery::pkg::apis::meta::v1::{ObjectMeta, OwnerReference};
use kube::api::{Api, Patch, PatchParams};
use kube::{Resource, ResourceExt as _};
use time::OffsetDateTime;
use time::format_description::well_known::Rfc3339;
use crate::controller::common::{FIELD_MANAGER, ReconcileError, read_pem_key};
use crate::crd::KafkaUser;
use crate::crd::user::TlsAuth;
pub(crate) const DEFAULT_VALIDITY_DAYS: u32 = 365;
pub(crate) const DEFAULT_RENEWAL_DAYS: u32 = 30;
#[derive(Debug, Clone)]
pub(crate) struct UserCertStatus {
pub not_after: String,
pub issued_new: bool,
}
pub(crate) async fn ensure_user_cert_secret(
secret_api: &Api<Secret>,
obj: &KafkaUser,
ca_material: &CaMaterial,
auth: &TlsAuth,
) -> Result<UserCertStatus, ReconcileError> {
let name = obj.name_any();
let validity = auth.validity_days.unwrap_or(DEFAULT_VALIDITY_DAYS);
let renewal = auth.renewal_days.unwrap_or(DEFAULT_RENEWAL_DAYS);
if let Some(existing) = secret_api.get_opt(&name).await?
&& let Some(not_after) = read_user_cert_not_after(&existing)
&& !is_cert_expiring_soon(¬_after, renewal, OffsetDateTime::now_utc())
{
return Ok(UserCertStatus {
not_after: format_rfc3339(not_after)?,
issued_new: false,
});
}
let user_cert =
ca::issue_user_cert(&ca_material.cert_pem, &ca_material.key_pem, &name, validity)
.map_err(ReconcileError::Ca)?;
let secret = render_user_cert_secret(obj, &user_cert, &ca_material.cert_pem)?;
let params = PatchParams {
field_manager: Some(FIELD_MANAGER.into()),
force: true,
..Default::default()
};
secret_api
.patch(&name, ¶ms, &Patch::Apply(&secret))
.await?;
Ok(UserCertStatus {
not_after: user_cert.not_after,
issued_new: true,
})
}
#[must_use]
pub(crate) fn tls_principal(name: &str) -> String {
format!("User:CN={name}")
}
#[must_use]
pub(crate) fn is_cert_expiring_soon(
not_after: &OffsetDateTime,
renewal_days: u32,
now: OffsetDateTime,
) -> bool {
let window = time::Duration::days(i64::from(renewal_days));
*not_after <= now + window
}
fn format_rfc3339(t: OffsetDateTime) -> Result<String, ReconcileError> {
t.format(&Rfc3339)
.map_err(|e| ReconcileError::CertParse(format!("rfc3339 format: {e}")))
}
fn read_user_cert_not_after(secret: &Secret) -> Option<OffsetDateTime> {
let pem = read_pem_key(secret, "user.crt")?;
cert_not_after_from_pem(&pem)
}
fn cert_not_after_from_pem(pem: &str) -> Option<OffsetDateTime> {
use x509_parser::pem::parse_x509_pem;
let (_, p) = parse_x509_pem(pem.as_bytes()).ok()?;
let cert = p.parse_x509().ok()?;
let ts = cert.validity().not_after.timestamp();
OffsetDateTime::from_unix_timestamp(ts).ok()
}
fn render_user_cert_secret(
obj: &KafkaUser,
user_cert: &ca::UserCert,
ca_cert_pem: &str,
) -> Result<Secret, ReconcileError> {
let name = obj.name_any();
let mut labels: BTreeMap<String, String> = BTreeMap::new();
labels.insert("app.kubernetes.io/name".into(), "crabka-broker".into());
labels.insert(
"app.kubernetes.io/managed-by".into(),
"crabka-operator".into(),
);
if let Some(cluster) = obj
.meta()
.labels
.as_ref()
.and_then(|l| l.get("crabka.io/cluster"))
{
labels.insert("crabka.io/cluster".into(), cluster.clone());
}
labels.insert("crabka.io/user".into(), name.clone());
labels.insert("crabka.io/auth".into(), "tls".into());
let mut data: BTreeMap<String, ByteString> = BTreeMap::new();
data.insert(
"user.crt".into(),
ByteString(user_cert.cert_pem.as_bytes().to_vec()),
);
data.insert(
"user.key".into(),
ByteString(user_cert.key_pem.as_bytes().to_vec()),
);
data.insert("ca.crt".into(), ByteString(ca_cert_pem.as_bytes().to_vec()));
Ok(Secret {
metadata: ObjectMeta {
name: Some(name),
namespace: obj.meta().namespace.clone(),
labels: Some(labels),
owner_references: Some(vec![user_owner_ref(obj)?]),
..Default::default()
},
type_: Some("Opaque".into()),
data: Some(data),
..Default::default()
})
}
fn user_owner_ref(obj: &KafkaUser) -> Result<OwnerReference, ReconcileError> {
let uid = obj
.meta()
.uid
.as_deref()
.ok_or(ReconcileError::MissingUid)?;
Ok(OwnerReference {
api_version: <KafkaUser as Resource>::api_version(&()).to_string(),
kind: <KafkaUser as Resource>::kind(&()).to_string(),
name: obj.name_any(),
uid: uid.to_string(),
controller: Some(true),
block_owner_deletion: Some(true),
})
}
#[cfg(test)]
mod tests {
use super::*;
use assert2::assert;
#[test]
fn tls_principal_format() {
assert!(tls_principal("alice") == "User:CN=alice");
}
#[test]
fn is_cert_expiring_soon_boundary_cases() {
let now = OffsetDateTime::now_utc();
let in_60_days = now + time::Duration::days(60);
assert!(
!is_cert_expiring_soon(&in_60_days, 30, now),
"60d > 30d window: not expiring"
);
let in_30_days = now + time::Duration::days(30);
assert!(
is_cert_expiring_soon(&in_30_days, 30, now),
"30d == 30d window: at boundary, treat as expiring (<=)"
);
let in_1_day = now + time::Duration::days(1);
assert!(
is_cert_expiring_soon(&in_1_day, 30, now),
"1d within 30d window: expiring"
);
let yesterday = now - time::Duration::days(1);
assert!(
is_cert_expiring_soon(&yesterday, 30, now),
"already past notAfter: expiring"
);
}
#[test]
fn cert_not_after_round_trips() {
let ca = ca::generate_clients_ca("test-root", 365).expect("ca");
let before = OffsetDateTime::now_utc();
let user = ca::issue_user_cert(&ca.cert_pem, &ca.key_pem, "alice", 365).expect("leaf");
let parsed = cert_not_after_from_pem(&user.cert_pem).expect("notAfter parses");
let expected = before + time::Duration::days(365);
let delta = (parsed - expected).whole_seconds().abs();
assert!(
delta <= 5,
"notAfter delta {delta}s exceeds ±5s tolerance (parsed={parsed}, expected={expected})"
);
}
#[test]
fn cert_not_after_from_pem_returns_none_on_malformed_input() {
assert!(cert_not_after_from_pem("not a pem").is_none());
assert!(cert_not_after_from_pem("").is_none());
assert!(
cert_not_after_from_pem("-----BEGIN CERTIFICATE-----\nQUFB\n-----END CERTIFICATE-----")
.is_none()
);
}
#[test]
fn read_pem_key_returns_some_when_present() {
let mut data = BTreeMap::new();
data.insert("ca.key".into(), ByteString(b"abc".to_vec()));
let s = Secret {
data: Some(data),
..Default::default()
};
assert!(read_pem_key(&s, "ca.key").as_deref() == Some("abc"));
}
#[test]
fn read_pem_key_returns_none_when_data_missing() {
let s = Secret::default();
assert!(read_pem_key(&s, "ca.key").is_none());
}
#[test]
fn read_pem_key_returns_none_when_key_missing() {
let mut data = BTreeMap::new();
data.insert("other".into(), ByteString(b"abc".to_vec()));
let s = Secret {
data: Some(data),
..Default::default()
};
assert!(read_pem_key(&s, "ca.key").is_none());
}
#[test]
fn read_pem_key_returns_none_on_non_utf8() {
let mut data = BTreeMap::new();
data.insert("ca.key".into(), ByteString(vec![0xFF, 0xFE, 0xFD]));
let s = Secret {
data: Some(data),
..Default::default()
};
assert!(read_pem_key(&s, "ca.key").is_none());
}
#[test]
fn read_user_cert_not_after_returns_none_when_user_crt_missing() {
let mut data = BTreeMap::new();
data.insert("user.key".into(), ByteString(b"junk".to_vec()));
let s = Secret {
data: Some(data),
..Default::default()
};
assert!(read_user_cert_not_after(&s).is_none());
}
#[test]
fn format_rfc3339_round_trips() {
let t = OffsetDateTime::from_unix_timestamp(1_700_000_000).expect("unix ts");
let s = format_rfc3339(t).expect("formats");
assert!(s == "2023-11-14T22:13:20Z");
}
fn dummy_ku() -> KafkaUser {
let mut ku = KafkaUser::new(
"alice",
crate::crd::KafkaUserSpec {
authentication: crate::crd::Authentication::Tls(TlsAuth::default()),
authorization: None,
quotas: None,
},
);
ku.metadata.namespace = Some("ns".into());
ku.metadata.uid = Some("user-uid".into());
let mut labels = BTreeMap::new();
labels.insert("crabka.io/cluster".into(), "demo".into());
ku.metadata.labels = Some(labels);
ku
}
#[test]
fn render_user_cert_secret_carries_three_keys_and_tls_auth_label() {
let ku = dummy_ku();
let user_cert = ca::UserCert {
cert_pem: "CERT".into(),
key_pem: "KEY".into(),
not_after: "2027-01-01T00:00:00Z".into(),
};
let secret = render_user_cert_secret(&ku, &user_cert, "CA-CERT").expect("renders");
assert!(secret.metadata.name.as_deref() == Some("alice"));
assert!(secret.metadata.namespace.as_deref() == Some("ns"));
let labels = secret.metadata.labels.as_ref().expect("labels");
assert!(labels.get("crabka.io/auth").map(String::as_str) == Some("tls"));
assert!(labels.get("crabka.io/user").map(String::as_str) == Some("alice"));
assert!(labels.get("crabka.io/cluster").map(String::as_str) == Some("demo"));
let owners = secret.metadata.owner_references.as_ref().expect("owner");
assert!(owners.len() == 1);
assert!(owners[0].kind == "KafkaUser");
assert!(owners[0].uid == "user-uid");
let data = secret.data.as_ref().expect("data");
assert!(data.get("user.crt").map(|bs| bs.0.as_slice()) == Some(b"CERT".as_slice()));
assert!(data.get("user.key").map(|bs| bs.0.as_slice()) == Some(b"KEY".as_slice()));
assert!(data.get("ca.crt").map(|bs| bs.0.as_slice()) == Some(b"CA-CERT".as_slice()));
}
#[test]
fn render_user_cert_secret_omits_cluster_label_when_label_absent() {
let mut ku = dummy_ku();
ku.metadata.labels = None;
let user_cert = ca::UserCert {
cert_pem: "C".into(),
key_pem: "K".into(),
not_after: "2027-01-01T00:00:00Z".into(),
};
let secret = render_user_cert_secret(&ku, &user_cert, "CA").expect("renders");
let labels = secret.metadata.labels.as_ref().expect("labels");
assert!(!labels.contains_key("crabka.io/cluster"));
}
#[test]
fn user_owner_ref_errors_on_missing_uid() {
let mut ku = dummy_ku();
ku.metadata.uid = None;
assert!(matches!(
user_owner_ref(&ku),
Err(ReconcileError::MissingUid)
));
}
#[test]
fn user_owner_ref_carries_block_owner_deletion() {
let ku = dummy_ku();
let owner = user_owner_ref(&ku).expect("owner ref");
assert!(owner.kind == "KafkaUser");
assert!(owner.name == "alice");
assert!(owner.uid == "user-uid");
assert!(owner.controller == Some(true));
assert!(owner.block_owner_deletion == Some(true));
}
}