use std::collections::HashSet;
use std::fmt;
use std::marker::PhantomData;
use anyhow::Context;
use kube::ResourceExt;
use crate::k8s;
use crate::k8s::apiextensionsv1::CustomResourceDefinition;
use crate::k8s::appsv1;
use crate::k8s::corev1;
use crate::k8s::rbacv1;
use super::*;
/// Installs and removes the cluster-level RBAC resources used by controller type `C`.
///
/// Dereferences to the wrapped `k8s::Kubectl`, so client methods can be called on it directly.
#[derive(Debug)]
pub struct ControllerRbac<C> {
/// Kubernetes client wrapper used for every API call made by this helper.
pub k8s: k8s::Kubectl,
/// Namespace in which the service account is installed and which the binding subject refers to.
pub namespace: String,
/// Zero-sized marker tying this value to the controller type `C`; carries no data.
pub _c: PhantomData<C>,
}
impl<C> ControllerRbac<C> {
    /// Builds an RBAC helper that talks to the cluster through `k8s` and targets `namespace`.
    pub fn new(k8s: k8s::Kubectl, namespace: impl ToString) -> Self {
        let namespace = namespace.to_string();
        let _c = PhantomData;
        Self { k8s, namespace, _c }
    }
}
impl<C> std::ops::Deref for ControllerRbac<C> {
    type Target = k8s::Kubectl;

    /// Exposes the wrapped client so its methods are callable directly on the helper.
    fn deref(&self) -> &Self::Target {
        let Self { k8s, .. } = self;
        k8s
    }
}
/// The set of cluster resources that make up a controller's RBAC installation.
#[derive(Debug)]
pub struct RbacClusterResources {
/// Image pull secret, either as a full object or referenced by name only.
pub secret: RbacClusterResourcesSecret,
/// Service account the controller runs as.
pub service_account: corev1::ServiceAccount,
/// Cluster role holding the policy rules.
pub cluster_role: rbacv1::ClusterRole,
/// Binding that attaches the cluster role to the service account.
pub cluster_role_binding: rbacv1::ClusterRoleBinding,
}
impl fmt::Display for RbacClusterResources {
    /// Writes a header line followed by one labelled line per resource name.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Every name is resolved before anything is written to the formatter.
        let lines = [
            format!("IMAGE PULL SECRET: {}", self.secret.name()),
            format!("SERVICE ACCOUNT: {}", self.service_account.name()),
            format!("CLUSTER ROLE: {}", self.cluster_role.name()),
            format!("CLUSTER ROLE BINDING: {}", self.cluster_role_binding.name()),
        ];
        write!(f, "rbac cluster resources:\n{}", lines.join("\n"))
    }
}
/// Image pull secret handed to the installer, either as a full object or as a bare name.
#[derive(Debug)]
// `Secret` carries a whole `corev1::Secret` while `SecretName` is a single `String`;
// the size difference between the two variants is accepted here.
#[allow(clippy::large_enum_variant)]
pub enum RbacClusterResourcesSecret {
/// A complete secret object; `install` writes it to the cluster.
Secret(corev1::Secret),
/// Name of a secret; `install` only checks that it can be fetched.
SecretName(String),
}
impl From<corev1::Secret> for RbacClusterResourcesSecret {
fn from(secret: corev1::Secret) -> Self {
Self::Secret(secret)
}
}
impl RbacClusterResourcesSecret {
    /// Writes `secret` to the cluster through `k8s` and returns what `put_secret` yields.
    ///
    /// The secret type falls back to `"Opaque"` when the object does not specify one.
    ///
    /// # Errors
    /// Fails when the secret carries no `data` or when the put request fails.
    pub async fn ensure_image_pull_secret_is_installed(
        k8s: &k8s::Kubectl,
        secret: corev1::Secret,
    ) -> anyhow::Result<corev1::Secret> {
        let secret_name = secret.name();
        let secret_type = secret.type_.as_deref().unwrap_or("Opaque");
        let payload = secret.data.context("secret must have data")?;
        let stored = k8s.put_secret(&secret_name, secret_type, payload).await;
        stored.context("failed to put secret")
    }

    /// Makes sure the secret is available in the cluster.
    ///
    /// A `Secret` is written to the cluster and replaced by the returned object; a `SecretName`
    /// is only looked up and handed back unchanged.
    ///
    /// # Errors
    /// Fails when the secret cannot be written or, for `SecretName`, cannot be fetched.
    pub async fn install(self, k8s: &k8s::Kubectl) -> anyhow::Result<Self> {
        let installed = match self {
            Self::Secret(secret) => {
                Self::Secret(Self::ensure_image_pull_secret_is_installed(k8s, secret).await?)
            }
            Self::SecretName(name) => {
                // The fetched object is discarded: only its existence matters here.
                k8s.get_secret(&name)
                    .await
                    .with_context(|| format!("failed to get secret {name}"))?;
                Self::SecretName(name)
            }
        };
        Ok(installed)
    }

    /// Name of the secret, borrowed when only a name is stored and owned otherwise.
    fn name(&self) -> std::borrow::Cow<'_, str> {
        use std::borrow::Cow;

        match self {
            Self::SecretName(name) => Cow::Borrowed(name),
            Self::Secret(secret) => Cow::Owned(secret.name()),
        }
    }
}
impl RbacClusterResources {
    /// Serializes the resources to `writer` as YAML, one `serde_yaml::to_writer` call per object.
    ///
    /// The secret is emitted only when a full secret object is held; a bare secret name produces
    /// no output. The service account, cluster role and cluster role binding follow in that order.
    ///
    /// # Errors
    /// Returns the first serialization or write error encountered.
    pub fn as_yaml<W>(&self, mut writer: W) -> serde_yaml::Result<()>
    where
        W: std::io::Write,
    {
        match &self.secret {
            RbacClusterResourcesSecret::Secret(secret) => {
                serde_yaml::to_writer(&mut writer, secret)?
            }
            RbacClusterResourcesSecret::SecretName(_) => {}
        }
        serde_yaml::to_writer(&mut writer, &self.service_account)?;
        serde_yaml::to_writer(&mut writer, &self.cluster_role)?;
        serde_yaml::to_writer(&mut writer, &self.cluster_role_binding)
    }
}
impl<C: InstallOpinionatedController> ControllerRbac<C> {
pub async fn uninstall(
&self,
namespace: &str,
resources: RbacClusterResources,
) -> anyhow::Result<()> {
crate::utils::delete_and_await::<rbacv1::ClusterRoleBinding>(
self.api(),
&resources.cluster_role_binding.name(),
)
.await?;
crate::utils::delete_and_await::<rbacv1::ClusterRole>(
self.api(),
&resources.cluster_role.name(),
)
.await?;
crate::utils::delete_and_await::<corev1::ServiceAccount>(
self.api(),
&resources.service_account.name(),
)
.await?;
crate::utils::delete_and_await(self.secrets(namespace), resources.secret.name().as_ref())
.await?;
Ok(())
}
pub async fn install(
&self,
secret: RbacClusterResourcesSecret,
) -> anyhow::Result<RbacClusterResources> {
use k8s::ClusterRoleExt;
let secret = secret.install(self).await?;
let service_account = Self::service_account(secret.name().as_ref());
let service_account = self
.ensure_namespaced_k_is_installed(service_account, &self.namespace)
.await?;
let name = &C::rbac_name();
let cluster_role = k8s::Kubectl::update_with_default(
self.api(),
name,
|role| cluster_role_add_rules(role, Self::cluster_role_rules()),
|| rbacv1::ClusterRole::new(name),
)
.await?;
let cluster_role_binding = self
.ensure_global_k_is_installed(cluster_role_binding(
&service_account.name(),
&cluster_role,
&self.namespace,
))
.await?;
Ok(RbacClusterResources {
secret,
service_account,
cluster_role,
cluster_role_binding,
})
}
fn get_resources(namespace: &str, image_pull_secret_name: &str) -> RbacClusterResources {
let service_account = Self::service_account(image_pull_secret_name);
let service_account_name = &service_account.name();
let cluster_role = Self::cluster_role();
let cluster_role_binding =
cluster_role_binding(service_account_name, &cluster_role, namespace);
RbacClusterResources {
secret: RbacClusterResourcesSecret::SecretName(image_pull_secret_name.to_string()),
service_account,
cluster_role,
cluster_role_binding,
}
}
pub fn cluster_role_rules() -> Vec<rbacv1::PolicyRule> {
use k8s::PolicyRuleExt;
use kube::discovery::verbs::*;
vec![
rbacv1::PolicyRule::new::<CustomResourceDefinition>().verbs([GET, PATCH, CREATE]),
rbacv1::PolicyRule::new::<corev1::Event>().verb(CREATE),
rbacv1::PolicyRule::new::<corev1::Namespace>().all_verbs(),
rbacv1::PolicyRule::new::<corev1::Secret>().all_verbs(),
rbacv1::PolicyRule::new::<appsv1::DaemonSet>()
.with_status()
.all_verbs(),
rbacv1::PolicyRule::new::<appsv1::Deployment>()
.with_status()
.all_verbs(),
rbacv1::PolicyRule::new::<corev1::ConfigMap>().all_verbs(),
]
}
pub fn cluster_role() -> rbacv1::ClusterRole {
use k8s::ClusterRoleExt;
rbacv1::ClusterRole::new(C::rbac_name()).rules(Self::cluster_role_rules())
}
pub async fn ensure_image_pull_secret_is_installed(
&self,
image_pull_secret_name: &str,
secret: corev1::Secret,
) -> anyhow::Result<corev1::Secret> {
let r#type = secret.type_.as_deref().unwrap_or("Opaque");
let data = secret.data.context("secret must have data")?;
self.put_secret(image_pull_secret_name, r#type, data)
.await
.context("failed to put secret")
}
pub fn service_account(secret: &str) -> crate::k8s::corev1::ServiceAccount {
use crate::k8s::ServiceAccountExt;
let name = C::rbac_name();
crate::k8s::corev1::ServiceAccount::new(name).image_pull_secret(secret)
}
}
/// Value returned by `install_self`: the updated cluster role, the created CRD and the
/// installed deployment, in that order.
pub type InstallSelfResult = (
rbacv1::ClusterRole,
CustomResourceDefinition,
appsv1::Deployment,
);
/// Everything produced by a full `install`: the RBAC resources (`.0`), the CRD (`.1`) and the
/// deployment (`.2`).
#[derive(Debug)]
pub struct InstallResult(
pub RbacClusterResources,
pub CustomResourceDefinition,
pub appsv1::Deployment,
);
impl std::fmt::Display for InstallResult {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.0)?;
write!(f, "\nCRD SECRET: {}", self.1.name())?;
write!(f, "\nDEPLOYMENT SECRET: {}", self.2.name())?;
Ok(())
}
}
#[async_trait::async_trait]
pub trait InstallOpinionatedController: OpinionatedController {
fn rbac_name() -> String {
"statehub-common".to_string()
}
fn deployment_app_name() -> String {
Self::CONTROLLER_NAME.to_string()
}
async unsafe fn uninstall(
&self,
namespace: &str,
resources: RbacClusterResources,
deployment: appsv1::Deployment,
) -> anyhow::Result<()> {
let helper = self.helper();
crate::utils::delete_and_await(helper.deployments(namespace), &deployment.name()).await?;
crate::utils::delete_and_await(helper.crds(), Self::K::crd_name()).await?;
ControllerRbac::<Self>::new(helper, namespace)
.uninstall(namespace, resources)
.await
}
async fn install<F>(
&self,
namespace: &str,
secret: RbacClusterResourcesSecret,
controller_image: &str,
rust_log: &str,
deployment_update_f: F,
) -> anyhow::Result<InstallResult>
where
<Self::K as kube::Resource>::DynamicType: Default,
F: FnOnce(appsv1::Deployment) -> appsv1::Deployment + Send + Sync + 'static,
{
let resources = ControllerRbac::<Self>::new(self.helper(), namespace)
.install(secret)
.await
.context("install-common")?;
let service_account_name = &resources.service_account.name();
let (cluster_role, crd, deployment) = self
.install_self(
namespace,
service_account_name,
controller_image,
rust_log,
deployment_update_f,
)
.await
.context("install-self")?;
Ok(InstallResult(
RbacClusterResources {
cluster_role,
..resources
},
crd,
deployment,
))
}
async fn uninstall_self(
&self,
namespace: &str,
deployment: appsv1::Deployment,
) -> anyhow::Result<()>
where
<Self::K as kube::Resource>::DynamicType: Default,
{
let not_found = http::StatusCode::NOT_FOUND.as_u16();
let helper = self.helper();
crate::utils::delete_and_await(helper.deployments(namespace), &deployment.name()).await?;
let name = &Self::rbac_name();
let remove_rule_result = k8s::Kubectl::update(helper.api(), name, Self::remove_rules).await;
match remove_rule_result {
Ok(_) => Ok(()),
Err(kube::Error::Api(e)) if e.code == not_found => Ok(()),
Err(e) => Err(e),
}?;
crate::utils::delete_and_await(helper.crds(), Self::K::crd_name()).await?;
Ok(())
}
async fn install_self<F>(
&self,
namespace: &str,
service_account_name: &str,
controller_image: &str,
rust_log: &str,
deployment_update_f: F,
) -> anyhow::Result<InstallSelfResult>
where
<Self::K as kube::Resource>::DynamicType: Default,
F: FnOnce(appsv1::Deployment) -> appsv1::Deployment + Send + Sync + 'static,
{
use crate::k8s::ClusterRoleExt;
let helper = self.helper();
let crd = helper.create_global_crd(Self::K::crd()).await?;
let name = &Self::rbac_name();
let cluster_role =
k8s::Kubectl::update_with_default(helper.api(), name, Self::add_rules, || {
rbacv1::ClusterRole::new(name).rules(ControllerRbac::<Self>::cluster_role_rules())
})
.await?;
let deployment = Self::deployment(controller_image, service_account_name, rust_log);
let deployment = deployment_update_f(deployment);
let deployment = helper
.ensure_namespaced_k_is_installed(deployment, namespace)
.await?;
Ok((cluster_role, crd, deployment))
}
fn as_yaml<W, F>(
mut writer: W,
namespace: &str,
image_pull_secret_name: &str,
controller_image: &str,
rust_log: &str,
deployment_update_f: F,
) -> serde_yaml::Result<()>
where
<Self::K as kube::Resource>::DynamicType: Default,
W: std::io::Write,
F: FnOnce(appsv1::Deployment) -> appsv1::Deployment + Send + Sync + 'static,
{
let mut resources =
ControllerRbac::<Self>::get_resources(namespace, image_pull_secret_name);
Self::add_rules(&mut resources.cluster_role);
let crd = Self::K::crd();
let service_account_name = &resources.service_account.name();
let deployment = Self::deployment(controller_image, service_account_name, rust_log);
let deployment = deployment_update_f(deployment);
resources.as_yaml(&mut writer)?;
serde_yaml::to_writer(&mut writer, &crd)?;
serde_yaml::to_writer(&mut writer, &deployment)?;
Ok(())
}
async fn get_deployment_ref(
&self,
block_owner_deletion: bool,
) -> anyhow::Result<Option<crate::k8s::metav1::OwnerReference>> {
static NAMESPACE: once_cell::sync::OnceCell<String> = once_cell::sync::OnceCell::new();
let namespace = NAMESPACE
.get_or_try_init(|| std::env::var(crate::env::NAMESPACE))?
.as_str();
let deployment = self
.helper()
.get_deployment(&Self::deployment_app_name(), Some(namespace))
.await?
.object_ref(&());
Ok(crate::k8s::owner_reference(
deployment,
true,
block_owner_deletion,
))
}
fn deployment(image: &str, service_account: &str, rust_log: &str) -> appsv1::Deployment
where
<Self::K as kube::Resource>::DynamicType: Default,
{
use crate::k8s::ContainerExt;
use crate::k8s::DeploymentExt;
use crate::k8s::EnvVarExt;
use crate::k8s::PodSpecExt;
use crate::k8s::PodTemplateSpecExt;
let app_name = &Self::deployment_app_name();
let container = corev1::Container::new(app_name)
.image(image)
.image_pull_policy_always()
.env(std::iter::once(EnvVarExt::value("RUST_LOG", rust_log)).chain(crate::env::all()));
let pod = corev1::PodSpec::container(container).service_account_name(service_account);
let template = corev1::PodTemplateSpec::new(app_name)
.labels([("app", app_name)])
.pod_spec(pod);
appsv1::Deployment::with_labels(app_name, [("app", app_name)])
.replicas(1)
.match_labels([("app", app_name)])
.template(template)
}
fn rules() -> Vec<rbacv1::PolicyRule>
where
<Self::K as kube::Resource>::DynamicType: Default,
{
use crate::k8s::rbacv1;
use crate::k8s::PolicyRuleExt;
let dt = &<Self::K as kube::Resource>::DynamicType::default();
let rule = rbacv1::PolicyRule::default()
.api_group(Self::K::group(dt))
.resource(Self::K::plural(dt))
.with_status()
.all_verbs();
vec![rule]
}
fn add_rules(role: &mut crate::k8s::rbacv1::ClusterRole)
where
<Self::K as kube::Resource>::DynamicType: Default,
{
cluster_role_add_rules(role, Self::rules())
}
fn remove_rules(role: &mut crate::k8s::rbacv1::ClusterRole)
where
<Self::K as kube::Resource>::DynamicType: Default,
{
cluster_role_remove(role, Self::rules())
}
}
/// Merges `new_rules` into `role`, skipping rules the role already contains.
///
/// As a side effect, `resources`, `api_groups` and `verbs` of every rule (existing and new) are
/// sorted, and missing `resources`/`api_groups` become empty lists. If the role ends up with no
/// rules at all, `role.rules` is reset to `None`.
///
/// A new rule counts as already present only when resources, API groups, verbs, resource names
/// and non-resource URLs all match. Comparing resource names matters: a rule limited to specific
/// `resourceNames` grants less than an unrestricted rule over the same resources, so it must not
/// cause the unrestricted rule to be dropped.
pub fn cluster_role_add_rules(
    role: &mut rbacv1::ClusterRole,
    mut new_rules: Vec<rbacv1::PolicyRule>,
) {
    /// Fields that together decide what a rule grants.
    fn rule_key(
        rule: &rbacv1::PolicyRule,
    ) -> (&[String], &[String], &[String], &[String], &[String]) {
        (
            rule.resources.as_deref().unwrap_or_default(),
            rule.api_groups.as_deref().unwrap_or_default(),
            rule.verbs.as_slice(),
            // Compared as stored (not sorted): a differently ordered list merely leads to a
            // redundant rule being kept, never to a needed rule being dropped.
            rule.resource_names.as_deref().unwrap_or_default(),
            rule.non_resource_urls.as_deref().unwrap_or_default(),
        )
    }

    let old_rules = role.rules.get_or_insert_with(Vec::new);

    // Normalise ordering so that equal rules compare equal regardless of how they were written.
    for rule in old_rules.iter_mut().chain(&mut new_rules) {
        rule.resources.get_or_insert_with(Vec::new).sort();
        rule.api_groups.get_or_insert_with(Vec::new).sort();
        rule.verbs.sort();
    }

    let existing: HashSet<_> = old_rules.iter().map(rule_key).collect();
    new_rules.retain(|rule| !existing.contains(&rule_key(rule)));
    old_rules.extend(new_rules);

    if old_rules.is_empty() {
        role.rules = None;
    }
}
/// Removes from `role` every rule whose resources and API groups match one of `removed_rules`.
///
/// Verbs are not part of the comparison. As a side effect, `resources` and `api_groups` of every
/// remaining rule are sorted and missing ones become empty lists. If no rule is left,
/// `role.rules` is reset to `None`.
pub fn cluster_role_remove(
    role: &mut rbacv1::ClusterRole,
    mut removed_rules: Vec<rbacv1::PolicyRule>,
) {
    let current = role.rules.get_or_insert_with(Vec::new);

    // Normalise ordering on both sides so the comparison below is order-insensitive.
    current
        .iter_mut()
        .chain(removed_rules.iter_mut())
        .for_each(|rule| {
            rule.resources.get_or_insert_with(Vec::new).sort();
            rule.api_groups.get_or_insert_with(Vec::new).sort();
        });

    let doomed: HashSet<_> = removed_rules
        .iter()
        .map(|rule| (&rule.resources, &rule.api_groups))
        .collect();
    current.retain(|rule| {
        let key = (&rule.resources, &rule.api_groups);
        !doomed.contains(&key)
    });

    if current.is_empty() {
        role.rules = None;
    }
}
/// Builds a binding that attaches `cluster_role` to the service account `service_account` in
/// `namespace`.
///
/// The binding is named `<service_account>-<cluster role name>`.
pub fn cluster_role_binding(
    service_account: &str,
    cluster_role: &rbacv1::ClusterRole,
    namespace: &str,
) -> rbacv1::ClusterRoleBinding {
    use crate::k8s::{ClusterRoleBindingExt, SubjectExt};

    let role_name = cluster_role.name();
    let binding_name = format!("{}-{}", service_account, role_name);
    let subject = rbacv1::Subject::service_account(service_account).namespace(namespace);
    rbacv1::ClusterRoleBinding::new(binding_name, cluster_role).subjects([subject])
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::controller::test_utils::*;
    use crate::k8s::{Kubectl, SecretExt};
    use std::convert::identity;

    /// Spec of the custom resource used by `basic_installation`.
    #[derive(
        Clone, Debug, Serialize, Deserialize, CustomResource, JsonSchema, PartialEq, Eq, Hash,
    )]
    #[kube(
        group = "statehub.cloud",
        version = "v1alpha1",
        kind = "BuilderCrd",
        status = "IdStatus",
        shortname = "builder",
        crates(k8s_openapi = "k8s::openapi")
    )]
    struct IdSpec {
        id: uuid::Uuid,
    }

    /// Minimal controller whose `apply` and `cleanup` do nothing; only installation is tested.
    #[derive(Clone, Debug)]
    struct BuilderManager<K> {
        client: Kubectl,
        _k: PhantomData<K>,
    }

    impl<K> BuilderManager<K> {
        fn new(client: Kubectl) -> Self {
            Self {
                client,
                _k: PhantomData,
            }
        }
    }

    impl<T> super::InstallOpinionatedController for BuilderManager<T>
    where
        T: kube::Resource
            + CustomResourceExt
            + HasStatus
            + HasSpec
            + ser::Serialize
            + de::DeserializeOwned
            + Clone
            + Send
            + Sync
            + Debug
            + 'static,
        <T as kube::Resource>::DynamicType: Default,
    {
        /// Includes the slugified CRD name, so each test CRD gets its own deployment.
        fn deployment_app_name() -> String {
            let controller_name = Self::CONTROLLER_NAME;
            let crd_name = slug::slugify(T::crd_name());
            format!("{controller_name}-{crd_name}")
        }

        /// Uses the deployment name, so each test CRD also gets its own RBAC resources.
        fn rbac_name() -> String {
            Self::deployment_app_name()
        }
    }

    #[async_trait::async_trait]
    impl<T> super::super::OpinionatedController for BuilderManager<T>
    where
        T: kube::Resource
            + CustomResourceExt
            + HasStatus
            + HasSpec
            + ser::Serialize
            + de::DeserializeOwned
            + Clone
            + Send
            + Sync
            + Debug
            + 'static,
        <T as kube::Resource>::DynamicType: Default,
    {
        type K = T;
        type Error = kube::Error;
        const CONTROLLER_NAME: &'static str = "builder-manager";

        fn api(&self) -> kube::Api<Self::K> {
            self.client.api()
        }

        async fn apply(&self, _x: Arc<Self::K>) -> Result<Option<Duration>, Self::Error> {
            Ok(None)
        }

        async fn cleanup(&self, _x: Arc<Self::K>) -> Result<Option<Duration>, Self::Error> {
            Ok(None)
        }
    }

    /// Full install followed by full uninstall against a live cluster (hence `#[ignore]`).
    #[tokio::test]
    #[ignore]
    async fn basic_installation() {
        init_tracing_subscriber();
        let namespace = "default";
        let secret_name = "image-pull-secret-basic-installation";
        let client = Kubectl::try_default().await.unwrap();
        let stuff = crate::k8s::openapi::ByteString(b"stuff".to_vec());
        let secret = corev1::Secret::new(secret_name).data([("things".to_string(), stuff.clone())]);
        let manager = BuilderManager::<BuilderCrd>::new(client);
        let InstallResult(resources, _crd, deployment) = manager
            .install(namespace, secret.into(), "debian", "info", identity)
            .await
            .unwrap();

        // The secret must have reached the cluster with its payload intact.
        let datum = manager
            .helper()
            .get_secret(secret_name)
            .await
            .unwrap()
            .data
            .unwrap_or_default()
            .remove("things")
            .unwrap();
        assert_eq!(datum, stuff);
        manager
            .helper()
            .get_deployment(&deployment.name(), namespace)
            .await
            .unwrap();
        tracing::trace!(?resources, ?deployment);

        let deployment_name = deployment.name();
        unsafe {
            manager
                .uninstall(namespace, resources, deployment)
                .await
                .unwrap();
        }

        // After uninstall neither the secret nor the deployment may be retrievable.
        manager
            .helper()
            .get_secret(secret_name)
            .await
            .expect_err("secret not deleted");
        manager
            .helper()
            .get_deployment(&deployment_name, namespace)
            .await
            .expect_err("deployment not deleted");
    }

    /// Spec of the custom resource used by `partial_uninstall`.
    #[derive(
        Clone, Debug, Serialize, Deserialize, CustomResource, JsonSchema, PartialEq, Eq, Hash,
    )]
    #[kube(
        group = "statehub.cloud",
        version = "v1alpha1",
        kind = "PartialUninstallCrd",
        status = "IdStatus",
        shortname = "builder",
        crates(k8s_openapi = "k8s::openapi")
    )]
    struct IdSpecPartialUninstall {
        id: uuid::Uuid,
    }

    /// Repeated `uninstall_self` / `install_self` cycles against a live cluster (hence
    /// `#[ignore]`), checking that the controller's rule leaves and re-enters the cluster role.
    #[tokio::test]
    #[ignore]
    async fn partial_uninstall() {
        init_tracing_subscriber();
        let namespace = "default";
        let secret_name = "image-pull-secret-partial-uninstall";
        let client = Kubectl::try_default().await.unwrap();
        let stuff = crate::k8s::openapi::ByteString(b"stuff".to_vec());
        let secret = corev1::Secret::new(secret_name).data([("things".to_string(), stuff.clone())]);
        let manager = BuilderManager::<PartialUninstallCrd>::new(client);
        let InstallResult(resources, _crd, deployment) = manager
            .install(namespace, secret.into(), "debian", "info", identity)
            .await
            .unwrap();
        let helper = manager.helper();
        let datum = helper
            .get_secret(secret_name)
            .await
            .unwrap()
            .data
            .unwrap_or_default()
            .remove("things")
            .unwrap();
        assert_eq!(datum, stuff);
        tracing::trace!(?resources, ?deployment);

        let deployment_name = deployment.name();
        let rules = BuilderManager::<PartialUninstallCrd>::rules();
        let new_rule = rules.first().unwrap();

        for _ in 0..3 {
            let deployment = helper
                .get_deployment(&deployment_name, namespace)
                .await
                .unwrap();
            manager.uninstall_self(namespace, deployment).await.unwrap();
            helper
                .get_deployment(&deployment_name, namespace)
                .await
                .expect_err("deployment not deleted");

            // After the partial uninstall the controller's rule must be gone from the role.
            let rules = helper
                .api::<rbacv1::ClusterRole>()
                .get(&resources.cluster_role.name())
                .await
                .unwrap()
                .rules
                .unwrap_or_default();
            assert!(
                rules
                    .iter()
                    .all(|rule| rule.resources.iter().flatten().sorted().as_slice()
                        != new_rule.resources.iter().flatten().sorted().as_slice()),
                "{new_rule:?} still present in {rules:?}"
            );

            manager
                .install_self(
                    namespace,
                    &resources.service_account.name(),
                    "debian",
                    "info",
                    identity,
                )
                .await
                .unwrap();

            // After re-installing, the rule must be back.
            let rules = helper
                .api::<rbacv1::ClusterRole>()
                .get(&resources.cluster_role.name())
                .await
                .unwrap()
                .rules
                .unwrap_or_default();
            assert!(
                rules
                    .iter()
                    .any(|rule| rule.resources.iter().flatten().sorted().as_slice()
                        == new_rule.resources.iter().flatten().sorted().as_slice()),
                "{new_rule:?} not found in {rules:?}"
            )
        }

        // Final clean-up of everything the test installed.
        let deployment = helper
            .get_deployment(&deployment_name, namespace)
            .await
            .unwrap();
        unsafe {
            manager
                .uninstall(namespace, resources, deployment)
                .await
                .unwrap();
        }
    }
}