use std::{convert::TryFrom, env, path::Path, str};
use futures::{StreamExt, TryStreamExt};
use k8s_openapi::api::certificates::v1beta1::CertificateSigningRequest;
use kube::api::{Api, ListParams, PostParams};
use kube::config::Kubeconfig;
use kube::Config;
use kube_runtime::watcher::{watcher, Event};
use rcgen::{
Certificate, CertificateParams, DistinguishedName, DnType, KeyPair, SanType,
PKCS_ECDSA_P256_SHA256,
};
use tokio::fs::{read, write};
use tracing::{debug, info};
use crate::config::Config as KubeletConfig;
use crate::kubeconfig::exists as kubeconfig_exists;
use crate::kubeconfig::KUBECONFIG;
/// Condition type that the CSR watch loops in this file look for when deciding
/// whether a certificate signing request has been approved.
const APPROVED_TYPE: &str = "Approved";
/// Runs the complete bootstrap sequence for this kubelet.
///
/// First obtains the client configuration via [`bootstrap_auth`], then makes
/// sure TLS serving certificates are in place via [`bootstrap_tls`].
///
/// * `config` - kubelet configuration (node name, hostname, cert paths, ...).
/// * `bootstrap_file` - path to the bootstrap kubeconfig used when no regular
///   kubeconfig exists yet.
/// * `notify` - callback handed to the TLS bootstrap step for user-facing
///   progress messages.
///
/// Returns the client configuration produced by the authentication step.
///
/// # Errors
///
/// Propagates any error from either bootstrap step.
pub async fn bootstrap<K: AsRef<Path>>(
    config: &KubeletConfig,
    bootstrap_file: K,
    notify: impl Fn(String),
) -> anyhow::Result<Config> {
    debug!("Starting bootstrap for {}", config.node_name);

    let client_config = bootstrap_auth(config, bootstrap_file).await?;

    // The TLS step consumes its config, so give it a copy and keep ours to return.
    let tls_client_config = client_config.clone();
    bootstrap_tls(config, tls_client_config, notify).await?;

    Ok(client_config)
}
/// Obtains the client (authentication) configuration for the kubelet.
///
/// If a kubeconfig already exists it is simply loaded. Otherwise the bootstrap
/// kubeconfig at `bootstrap_file` is used to submit a certificate signing
/// request, the function waits for it to be approved, writes the resulting
/// kubeconfig to the path named by the `KUBECONFIG` environment variable and
/// loads it.
///
/// While the request is in flight `KUBECONFIG` temporarily points at the
/// bootstrap file; it is restored to its original value whether or not the
/// request succeeds.
///
/// # Errors
///
/// Fails if `KUBECONFIG` is not set, if any API call fails, if the CSR is
/// deleted or never approved, or if the generated kubeconfig cannot be
/// written or loaded.
async fn bootstrap_auth<K: AsRef<Path>>(
    config: &KubeletConfig,
    bootstrap_file: K,
) -> anyhow::Result<Config> {
    if kubeconfig_exists() {
        debug!("Found existing kubeconfig, loading...");
        return Config::infer()
            .await
            .map_err(|e| anyhow::anyhow!("Unable to load config from host: {}", e));
    }

    let original_kubeconfig = env::var(KUBECONFIG)?;
    debug!(
        "No existing kubeconfig found, loading bootstrap config from {:?}",
        bootstrap_file.as_ref()
    );

    // Point config inference at the bootstrap file only for the duration of
    // the request, and restore the variable before looking at the outcome so
    // that a failure does not leave the process environment modified.
    env::set_var(KUBECONFIG, bootstrap_file.as_ref().as_os_str());
    let request_outcome = request_auth_kubeconfig(config, bootstrap_file.as_ref()).await;
    env::set_var(KUBECONFIG, &original_kubeconfig);
    let generated_kubeconfig = request_outcome?;

    write(&original_kubeconfig, &generated_kubeconfig).await?;
    Config::infer()
        .await
        .map_err(|e| anyhow::anyhow!("Unable to load generated config: {}", e))
}

/// Submits the authentication CSR using the currently inferred (bootstrap)
/// configuration and waits for approval, returning the serialized kubeconfig
/// built from the issued certificate.
async fn request_auth_kubeconfig(
    config: &KubeletConfig,
    bootstrap_file: &Path,
) -> anyhow::Result<Vec<u8>> {
    let conf = kube::Config::infer().await?;
    let client = kube::Client::try_from(conf)?;
    let cert_bundle = gen_auth_cert(config)?;

    // The generated kubeconfig reuses the server address and CA of the first
    // cluster listed in the bootstrap file.
    let bootstrap_config = read_from(bootstrap_file).await?;
    let named_cluster = bootstrap_config
        .clusters
        .into_iter()
        .next()
        .ok_or_else(|| {
            anyhow::anyhow!("Unable to find cluster information in bootstrap config")
        })?;
    let server = named_cluster.cluster.server;
    let ca_data = named_cluster
        .cluster
        .certificate_authority_data
        .ok_or_else(|| {
            anyhow::anyhow!(
                "Unable to find certificate authority information in bootstrap config"
            )
        })?;

    let csrs: Api<CertificateSigningRequest> = Api::all(client);
    let csr_json = serde_json::json!({
        "apiVersion": "certificates.k8s.io/v1beta1",
        "kind": "CertificateSigningRequest",
        "metadata": {
            "name": config.node_name,
        },
        "spec": {
            "request": base64::encode(cert_bundle.serialize_request_pem()?.as_bytes()),
            "signerName": "kubernetes.io/kube-apiserver-client-kubelet",
            "usages": [
                "digital signature",
                "key encipherment",
                "client auth"
            ]
        }
    });
    let post_data = serde_json::from_value(csr_json)
        .expect("Invalid CSR JSON, this is a programming error");
    csrs.create(&PostParams::default(), &post_data).await?;

    // Watch only the CSR that was just created.
    let mut events = watcher(
        csrs,
        ListParams::default().fields(&format!("metadata.name={}", config.node_name)),
    )
    .boxed();
    let start = std::time::Instant::now();
    while let Some(event) = events.try_next().await? {
        // A CSR may not carry a status yet; treat that as "not approved yet"
        // rather than panicking.
        let status = match event {
            Event::Applied(csr) => csr.status,
            Event::Restarted(mut certs) => {
                if certs.len() > 1 {
                    return Err(anyhow::anyhow!("On watch restart, got more than 1 authentication CSR. This means something is in an incorrect state"));
                }
                match certs.pop() {
                    Some(csr) => csr.status,
                    // An empty listing means the CSR no longer exists, so
                    // waiting any longer cannot succeed.
                    None => {
                        return Err(anyhow::anyhow!(
                            "Authentication CSR was not found when the watch restarted"
                        ))
                    }
                }
            }
            Event::Deleted(_) => {
                return Err(anyhow::anyhow!(
                    "Authentication CSR was deleted before it was approved"
                ))
            }
        };

        if let Some(status) = status {
            if let Some(cert) = status.certificate {
                let approved = status.conditions.map_or(false, |conditions| {
                    conditions
                        .into_iter()
                        .any(|condition| condition.type_.as_str() == APPROVED_TYPE)
                });
                if approved {
                    return gen_kubeconfig(
                        ca_data,
                        server,
                        cert,
                        cert_bundle.serialize_private_key_pem(),
                    );
                }
            }
        }
        info!("Got modified event, but CSR for authentication certs is not currently approved, {:?} elapsed", start.elapsed());
    }

    // The event stream ended without ever reporting an approved certificate.
    Err(anyhow::anyhow!(
        "Authentication certificates were never approved"
    ))
}
/// Ensures TLS serving certificates exist for the kubelet.
///
/// Does nothing if the configured certificate file already exists. Otherwise
/// a serving CSR named `<hostname>-tls` is created, `notify` is told that the
/// request awaits approval, and the function waits until the request is
/// approved and a certificate has been issued. The private key and the
/// certificate are then written to the configured paths and `notify` is told
/// that approval was received.
///
/// * `config` - kubelet configuration (hostname, serving cert/key paths).
/// * `kubeconfig` - client configuration used to talk to the API server.
/// * `notify` - callback receiving user-facing progress messages.
///
/// # Errors
///
/// Fails if certificate generation or any API call fails, if the CSR is
/// deleted or never approved, if the issued certificate is not valid UTF-8,
/// or if either file cannot be written.
async fn bootstrap_tls(
    config: &KubeletConfig,
    kubeconfig: Config,
    notify: impl Fn(String),
) -> anyhow::Result<()> {
    debug!("Starting bootstrap of TLS serving certs");
    if config.server_config.cert_file.exists() {
        return Ok(());
    }

    let cert_bundle = gen_tls_cert(config)?;
    let csr_name = format!("{}-tls", config.hostname);
    let client = kube::Client::try_from(kubeconfig)?;
    let csrs: Api<CertificateSigningRequest> = Api::all(client);
    let csr_json = serde_json::json!({
        "apiVersion": "certificates.k8s.io/v1beta1",
        "kind": "CertificateSigningRequest",
        "metadata": {
            "name": csr_name,
        },
        "spec": {
            "request": base64::encode(cert_bundle.serialize_request_pem()?.as_bytes()),
            "signerName": "kubernetes.io/kubelet-serving",
            "usages": [
                "digital signature",
                "key encipherment",
                "server auth"
            ]
        }
    });
    let post_data =
        serde_json::from_value(csr_json).expect("Invalid CSR JSON, this is a programming error");
    csrs.create(&PostParams::default(), &post_data).await?;
    notify(awaiting_user_csr_approval("TLS", &csr_name));

    // Watch only the CSR that was just created.
    let mut events = watcher(
        csrs,
        ListParams::default().fields(&format!("metadata.name={}", csr_name)),
    )
    .boxed();
    let mut certificate = None;
    let start = std::time::Instant::now();
    while let Some(event) = events.try_next().await? {
        // A CSR may not carry a status yet; treat that as "not approved yet"
        // rather than panicking.
        let status = match event {
            Event::Applied(csr) => csr.status,
            Event::Restarted(mut certs) => {
                if certs.len() > 1 {
                    return Err(anyhow::anyhow!("On watch restart, got more than 1 serving CSR. This means something is in an incorrect state"));
                }
                match certs.pop() {
                    Some(csr) => csr.status,
                    // An empty listing means the CSR no longer exists, so
                    // waiting any longer cannot succeed.
                    None => {
                        return Err(anyhow::anyhow!(
                            "Serving CSR was not found when the watch restarted"
                        ))
                    }
                }
            }
            Event::Deleted(_) => {
                return Err(anyhow::anyhow!(
                    "Serving CSR was deleted before it was approved"
                ))
            }
        };

        if let Some(status) = status {
            if let Some(cert) = status.certificate {
                let approved = status.conditions.map_or(false, |conditions| {
                    conditions
                        .into_iter()
                        .any(|condition| condition.type_.as_str() == APPROVED_TYPE)
                });
                if approved {
                    certificate = Some(std::str::from_utf8(&cert.0)?.to_owned());
                    break;
                }
            }
        }
        info!("Got modified event, but CSR for serving certs is not currently approved, {:?} elapsed", start.elapsed());
    }

    // `None` here means the event stream ended without an approved certificate.
    let certificate: String = certificate
        .ok_or_else(|| anyhow::anyhow!("Serving certificates were never approved"))?;

    let private_key = cert_bundle.serialize_private_key_pem();
    debug!(
        "Got certificate from API, writing cert to {:?} and private key to {:?}",
        config.server_config.cert_file, config.server_config.private_key_file
    );
    // Write the key first: the existence of the cert file is what makes this
    // function return early on later runs, so it must only appear once the
    // matching private key is already on disk.
    write(&config.server_config.private_key_file, &private_key).await?;
    write(&config.server_config.cert_file, &certificate).await?;
    notify(completed_csr_approval("TLS"));
    Ok(())
}
/// Builds the user-facing message explaining that the named CSR must be
/// approved manually, including the `kubectl` command to run.
///
/// * `cert_description` - short label for the certificate (e.g. `"TLS"`).
/// * `csr_name` - name of the certificate signing request to approve.
fn awaiting_user_csr_approval(cert_description: &str, csr_name: &str) -> String {
    format!(
        "{description} certificate requires manual approval. Run kubectl certificate approve {name}",
        description = cert_description,
        name = csr_name
    )
}
/// Builds the user-facing message announcing that approval for the described
/// certificate has been received.
///
/// * `cert_description` - short label for the certificate (e.g. `"TLS"`).
fn completed_csr_approval(cert_description: &str) -> String {
    format!(
        "received {description} certificate approval: continuing",
        description = cert_description
    )
}
/// Generates the certificate (and a fresh ECDSA P-256 key pair) used to build
/// the authentication CSR.
///
/// The subject is organization `system:nodes` with common name
/// `system:node:<node_name>`, and the validity window runs from now until 52
/// weeks from now.
///
/// # Errors
///
/// Fails if key generation or certificate construction fails.
#[allow(clippy::field_reassign_with_default)]
fn gen_auth_cert(config: &KubeletConfig) -> anyhow::Result<Certificate> {
    let common_name = format!("system:node:{}", config.node_name);
    let mut subject = DistinguishedName::new();
    subject.push(DnType::OrganizationName, "system:nodes");
    subject.push(DnType::CommonName, &common_name);

    let mut cert_params = CertificateParams::default();
    cert_params.not_before = chrono::Utc::now();
    cert_params.not_after = chrono::Utc::now() + chrono::Duration::weeks(52);
    cert_params.distinguished_name = subject;
    cert_params.key_pair = Some(KeyPair::generate(&PKCS_ECDSA_P256_SHA256)?);
    cert_params.alg = &PKCS_ECDSA_P256_SHA256;

    let certificate = Certificate::from_params(cert_params)?;
    Ok(certificate)
}
/// Generates the certificate (and a fresh ECDSA P-256 key pair) used to build
/// the TLS serving CSR.
///
/// The subject is organization `system:nodes` with common name
/// `system:node:<hostname>`. The configured hostname and node IP are added as
/// subject alternative names, and the validity window runs from now until 52
/// weeks from now.
///
/// # Errors
///
/// Fails if key generation or certificate construction fails.
#[allow(clippy::field_reassign_with_default)]
fn gen_tls_cert(config: &KubeletConfig) -> anyhow::Result<Certificate> {
    let common_name = format!("system:node:{}", config.hostname);
    let mut subject = DistinguishedName::new();
    subject.push(DnType::OrganizationName, "system:nodes");
    subject.push(DnType::CommonName, &common_name);

    let alt_names = vec![
        SanType::DnsName(config.hostname.clone()),
        SanType::IpAddress(config.node_ip),
    ];

    let mut cert_params = CertificateParams::default();
    cert_params.not_before = chrono::Utc::now();
    cert_params.not_after = chrono::Utc::now() + chrono::Duration::weeks(52);
    cert_params.distinguished_name = subject;
    cert_params.key_pair = Some(KeyPair::generate(&PKCS_ECDSA_P256_SHA256)?);
    cert_params.alg = &PKCS_ECDSA_P256_SHA256;
    cert_params.subject_alt_names = alt_names;

    let certificate = Certificate::from_params(cert_params)?;
    Ok(certificate)
}
fn gen_kubeconfig(
ca_data: String,
server: String,
client_cert_data: k8s_openapi::ByteString,
client_key: String,
) -> anyhow::Result<Vec<u8>> {
let json = serde_json::json!({
"kind": "Config",
"apiVersion": "v1",
"preferences": {},
"clusters": [{
"name": "krustlet",
"cluster": {
"certificate-authority-data": ca_data,
"server": server,
}
}],
"users":[{
"name": "krustlet",
"user": {
"client-certificate-data": client_cert_data,
"client-key-data": base64::encode(client_key.as_bytes())
}
}],
"contexts": [{
"name": "krustlet",
"context": {
"cluster": "krustlet",
"user": "krustlet",
}
}],
"current-context": "krustlet"
});
serde_json::to_vec(&json)
.map_err(|e| anyhow::anyhow!("Unable to serialize generated kubeconfig: {}", e))
}
async fn read_from<P: AsRef<Path>>(path: P) -> anyhow::Result<Kubeconfig> {
let raw = read(path)
.await
.map_err(|e| anyhow::anyhow!(format!("Error loading bootstrap file: {}", e)))?;
let config = serde_yaml::from_slice(&raw)
.map_err(|e| anyhow::anyhow!(format!("Error parsing bootstrap file: {}", e)))?;
Ok(config)
}