use crate::ObjectState;
use crate::Operator;
use anyhow::{bail, ensure, Context};
use k8s_openapi::{
api::{
admissionregistration::v1::MutatingWebhookConfiguration,
core::v1::{Secret, Service},
},
apimachinery::pkg::apis::meta::v1::Status,
};
use k8s_openapi::{apimachinery::pkg::apis::meta::v1::OwnerReference, Metadata};
use kube::{
api::{ObjectMeta, Patch, PatchParams, ResourceExt},
Client, Resource,
};
use serde::{Deserialize, Serialize};
use std::{
fmt::{Display, Formatter},
sync::Arc,
};
use tracing::{info, trace, warn};
use tracing_futures::Instrument;
pub struct WebhookResources(pub Service, pub Secret, pub MutatingWebhookConfiguration);
impl From<(Service, Secret, MutatingWebhookConfiguration)> for WebhookResources {
fn from(tuple: (Service, Secret, MutatingWebhookConfiguration)) -> Self {
WebhookResources(tuple.0, tuple.1, tuple.2)
}
}
impl WebhookResources {
pub fn service(&self) -> &Service {
&self.0
}
pub fn secret(&self) -> &Secret {
&self.1
}
pub fn webhook_config(&self) -> &MutatingWebhookConfiguration {
&self.2
}
pub fn add_owner<T>(&self, owner: &T) -> Self
where
T: Resource + Metadata<Ty = ObjectMeta>,
{
let metadata = owner.metadata();
let owner_references = Some(vec![OwnerReference {
api_version: k8s_openapi::api_version(owner).to_string(),
controller: Some(true),
kind: k8s_openapi::kind(owner).to_string(),
name: metadata.name.clone().unwrap(),
uid: metadata.uid.clone().unwrap(),
..Default::default()
}]);
let mut secret = self.secret().to_owned();
secret.metadata.owner_references = owner_references.clone();
let mut service = self.service().to_owned();
service.metadata.owner_references = owner_references.clone();
let mut webhook_config = self.webhook_config().to_owned();
webhook_config.metadata.owner_references = owner_references;
WebhookResources(service, secret, webhook_config)
}
pub async fn apply_owned<T>(&self, client: &Client, owner: &T) -> anyhow::Result<()>
where
T: Resource + Metadata<Ty = ObjectMeta>,
{
self.add_owner(owner).apply(client).await
}
pub async fn apply(&self, client: &Client) -> anyhow::Result<()> {
let secret_namespace = self.secret().metadata.namespace.as_ref().with_context(|| {
format!(
"secret {} does not have namespace set",
self.secret()
.metadata
.name
.clone()
.unwrap_or_else(|| "".to_string())
)
})?;
let service_namespace = self
.service()
.metadata
.namespace
.as_ref()
.with_context(|| {
format!(
"service {} does not have namespace set",
self.service()
.metadata
.name
.as_ref()
.unwrap_or(&"".to_string())
)
})?;
{
let api: kube::Api<Secret> = kube::Api::namespaced(client.to_owned(), secret_namespace);
let name = self.secret().metadata.name.as_ref().unwrap();
api.patch(
name,
&PatchParams {
dry_run: false,
force: true,
field_manager: Some("krator".to_string()),
},
&Patch::Apply(self.secret()),
)
.await?;
}
{
let api: kube::Api<Service> =
kube::Api::namespaced(client.to_owned(), service_namespace);
let name = self.service().metadata.name.as_ref().unwrap();
api.patch(
name,
&PatchParams {
dry_run: false,
force: true,
field_manager: Some("krator".to_string()),
},
&Patch::Apply(self.service()),
)
.await?;
}
{
let api: kube::Api<MutatingWebhookConfiguration> = kube::Api::all(client.to_owned());
let name = self.webhook_config().metadata.name.as_ref().unwrap();
api.patch(
name,
&PatchParams {
dry_run: false,
force: true,
field_manager: Some("krator".to_string()),
},
&Patch::Apply(self.webhook_config()),
)
.await?;
}
Ok(())
}
}
impl Display for WebhookResources {
fn fmt(&self, f: &mut Formatter<'_>) -> core::fmt::Result {
let service = self.service();
write!(
f,
r#"
# resources necessary to expose the operator's webhook
# the service expects a pod with the labels
#
# {:?}
#
# in namespace {}
#
# the service for the webhook
{}
# the secret containing the certificate and the private key the
# webhook service uses for secure communication
{}
# the webhook configuration
{}
"#,
service.spec.clone().unwrap().selector.unwrap(),
service.metadata.namespace.as_ref().unwrap(),
serde_yaml::to_string(self.service()).unwrap(),
serde_yaml::to_string(self.secret()).unwrap(),
serde_yaml::to_string(self.webhook_config()).unwrap()
)
}
}
pub struct AdmissionTls {
pub cert: String,
pub private_key: String,
}
impl AdmissionTls {
pub fn from(s: &Secret) -> anyhow::Result<Self> {
ensure!(
s.type_.as_ref().unwrap() == "tls",
"only tls secrets can be converted to AdmisstionTLS struct"
);
let metadata = &s.metadata;
let error_msg = |key: &str| {
format!(
"secret data {}/{} does not contain key {}",
metadata.name.as_ref().unwrap_or(&"".to_string()),
metadata.namespace.as_ref().unwrap_or(&"".to_string()),
key
)
};
const TLS_CRT: &str = "tls.crt";
const TLS_KEY: &str = "tls.key";
if let Some(data) = &s.data {
let cert_byte_string = data.get(TLS_CRT).context(error_msg(TLS_CRT))?;
let key_byte_string = data.get(TLS_KEY).context(error_msg(TLS_KEY))?;
return Ok(AdmissionTls {
cert: std::str::from_utf8(&cert_byte_string.0)?.to_string(),
private_key: std::str::from_utf8(&key_byte_string.0)?.to_string(),
});
}
if let Some(string_data) = &s.string_data {
let cert = string_data.get(TLS_CRT).context(error_msg(TLS_CRT))?;
let key = string_data.get(TLS_KEY).context(error_msg(TLS_KEY))?;
return Ok(AdmissionTls {
cert: cert.to_string(),
private_key: key.to_string(),
});
}
bail!(
"secret {}/{} does not contain any data",
metadata.name.as_ref().unwrap_or(&"".to_string()),
metadata.namespace.as_ref().unwrap_or(&"".to_string())
)
}
}
pub type WebhookFn<C> = dyn Fn(
<C as Operator>::Manifest,
<<C as Operator>::ObjectState as ObjectState>::SharedState,
) -> AdmissionResult<<C as Operator>::Manifest>;
#[allow(clippy::large_enum_variant)]
pub enum AdmissionResult<T> {
Allow(T),
Deny(Status),
}
#[derive(Debug)]
enum Operation {
Create,
Update,
Delete,
}
#[derive(Deserialize, Debug)]
struct UserInfo {
username: String,
groups: Vec<String>,
}
#[derive(Deserialize)]
#[serde(tag = "operation")]
#[serde(rename_all = "SCREAMING_SNAKE_CASE")]
enum AdmissionRequestOperation<T> {
Create {
object: T,
},
Update {
object: T,
#[serde(rename = "oldObject")]
old_object: T,
},
Delete {
#[serde(rename = "oldObject")]
old_object: T,
},
}
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
struct AdmissionRequest<T> {
uid: Option<String>,
user_info: UserInfo,
#[serde(flatten)]
operation: AdmissionRequestOperation<T>,
}
impl<T: Resource> AdmissionRequest<T> {
fn name(&self) -> String {
match &self.operation {
AdmissionRequestOperation::Create { object, .. } => object.name(),
AdmissionRequestOperation::Update { object, .. } => object.name(),
AdmissionRequestOperation::Delete { old_object, .. } => old_object.name(),
}
}
fn namespace(&self) -> Option<String> {
match &self.operation {
AdmissionRequestOperation::Create { object, .. } => object.namespace(),
AdmissionRequestOperation::Update { object, .. } => object.namespace(),
AdmissionRequestOperation::Delete { old_object, .. } => old_object.namespace(),
}
}
fn operation(&self) -> Operation {
match &self.operation {
AdmissionRequestOperation::Create { .. } => Operation::Create,
AdmissionRequestOperation::Update { .. } => Operation::Update,
AdmissionRequestOperation::Delete { .. } => Operation::Delete,
}
}
}
#[derive(Serialize)]
#[serde(rename_all = "camelCase")]
struct AdmissionResponse {
uid: Option<String>,
allowed: bool,
status: Option<Status>,
patch: Option<json_patch::Patch>,
patch_type: Option<String>,
}
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
struct AdmissionReviewRequest<T> {
api_version: String,
kind: String,
request: AdmissionRequest<T>,
}
#[derive(Serialize)]
#[serde(rename_all = "camelCase")]
struct AdmissionReviewResponse {
api_version: String,
kind: String,
response: AdmissionResponse,
}
#[tracing::instrument(
level="debug",
skip(operator, request),
fields(
name=%request.request.name(),
namespace=?request.request.namespace(),
api_version=%request.api_version,
operation=?request.request.operation(),
user_info=?request.request.user_info
)
)]
async fn review<O: Operator>(
operator: Arc<O>,
request: AdmissionReviewRequest<O::Manifest>,
) -> warp::reply::Json {
let manifest = match request.request.operation {
AdmissionRequestOperation::Create { object, .. } => object,
AdmissionRequestOperation::Update {
old_object, object, ..
} => {
let value = serde_json::to_value(&object).unwrap();
let old_value = serde_json::to_value(&old_object).unwrap();
let diff = json_patch::diff(&old_value, &value);
if !diff.0.is_empty() {
trace!(
diff=%format!("{:#?}", diff),
"Object changed."
);
}
object
}
AdmissionRequestOperation::Delete { old_object, .. } => old_object,
};
let name = manifest.name();
let namespace = manifest.namespace();
let span = tracing::debug_span!("Operator::admission_hook",);
let result = operator
.admission_hook(manifest.clone())
.instrument(span)
.await;
let response = match result {
AdmissionResult::Allow(new_manifest) => {
let new_value = serde_json::to_value(&new_manifest).unwrap();
let old_value = serde_json::to_value(&manifest).unwrap();
let patch = json_patch::diff(&old_value, &new_value);
let (patch, patch_type) = if !patch.0.is_empty() {
(Some(patch), Some("JSONPatch".to_string()))
} else {
(None, None)
};
info!(
%name,
?namespace,
allowed=true,
?patch,
"Admission request allowed."
);
AdmissionResponse {
uid: request.request.uid,
allowed: true,
status: None,
patch,
patch_type,
}
}
AdmissionResult::Deny(status) => {
warn!(
code=?status.code,
reason=?status.reason,
message=?status.message,
%name,
?namespace,
allowed=false,
"Admission request denied."
);
AdmissionResponse {
uid: request.request.uid,
allowed: false,
status: Some(status),
patch: None,
patch_type: None,
}
}
};
warp::reply::json(&AdmissionReviewResponse {
api_version: request.api_version,
kind: request.kind,
response,
})
}
pub(crate) async fn endpoint<O: Operator>(operator: Arc<O>) {
let tls = operator
.admission_hook_tls()
.await
.expect("getting webhook tls AdmissionTls failed");
use warp::Filter;
let routes = warp::any()
.and(warp::post())
.and(warp::body::json())
.and_then(move |request: AdmissionReviewRequest<O::Manifest>| {
let operator = Arc::clone(&operator);
async move {
let response = review(operator, request).await;
Ok::<_, std::convert::Infallible>(response)
}
});
warp::serve(routes)
.tls()
.cert(tls.cert)
.key(tls.private_key)
.run(([0, 0, 0, 0], 8443))
.await;
}