use crate::PodSecurityStandard;
use anyhow::bail;
use k8s_openapi::api::core::v1::{Namespace, Node};
use k8s_openapi::apimachinery::pkg::apis::meta::v1::ObjectMeta;
use k8s_openapi::{ByteString, NamespaceResourceScope};
use kube::api::{
ApiResource, DeleteParams, DynamicObject, GroupVersionKind, ListParams, Patch, PatchParams,
PostParams,
};
use kube::core::gvk::ParseGroupVersionError;
use kube::discovery::{ApiCapabilities, Scope};
use kube::{Api, Client, Discovery, Resource, ResourceExt};
use saphyr::{LoadableYamlNode, Yaml, YamlEmitter};
use serde_json::json;
use std::collections::BTreeMap;
use thiserror::Error;
/// Builds a dynamically typed [`Api`] handle for the resource described by `ar`.
///
/// The handle's scope is chosen as follows:
/// * if the resource is cluster-scoped (per `caps`) or `all` is set, the
///   handle is created with `Api::all_with`;
/// * otherwise, if `ns` is given, the handle is bound to that namespace;
/// * otherwise the handle is created with `Api::default_namespaced_with`.
fn dynamic_api(
    ar: ApiResource,
    caps: ApiCapabilities,
    client: Client,
    ns: Option<&str>,
    all: bool,
) -> Api<DynamicObject> {
    match ns {
        _ if all || caps.scope == Scope::Cluster => Api::all_with(client, &ar),
        Some(namespace) => Api::namespaced_with(client, namespace, &ar),
        None => Api::default_namespaced_with(client, &ar),
    }
}
/// Errors returned by [`apply_any`].
#[derive(Error, Debug)]
pub enum ApplyError {
/// An error reported by the `kube` client (discovery or patch request).
#[error("A Kubernetes error occurred: {0}")]
Kube(kube::Error),
/// A document could not be converted to or from YAML.
#[error("Failed to parse document: {0}")]
Parse(serde_yaml::Error),
/// The document's type metadata could not be turned into a group/version/kind.
#[error("Failed to parse group version: {0}")]
ParseGv(ParseGroupVersionError),
/// The document carries no type metadata, so its kind is unknown.
#[error("Failed to infer type from document")]
MissingType,
}
impl From<kube::Error> for ApplyError {
fn from(e: kube::Error) -> Self {
ApplyError::Kube(e)
}
}
impl From<serde_yaml::Error> for ApplyError {
fn from(e: serde_yaml::Error) -> Self {
ApplyError::Parse(e)
}
}
impl From<ParseGroupVersionError> for ApplyError {
fn from(e: ParseGroupVersionError) -> Self {
ApplyError::ParseGv(e)
}
}
/// Server-side applies a list of arbitrary YAML documents.
///
/// API discovery is run once up front. Each document is then decoded into a
/// [`DynamicObject`], its group/version/kind is resolved against the
/// discovery result, and the object is applied with a forced server-side
/// apply under the field manager `"n5i"`.
///
/// Documents whose kind is not known to the cluster are skipped with a
/// warning; they do not fail the call.
///
/// # Errors
///
/// * [`ApplyError::MissingType`] if a document has no type metadata.
/// * [`ApplyError::ParseGv`] if the type metadata cannot be converted to a GVK.
/// * [`ApplyError::Parse`] if a document cannot be converted from/to YAML.
/// * [`ApplyError::Kube`] if discovery or a patch request fails.
///
/// Processing stops at the first error; documents applied before it stay applied.
pub async fn apply_any(client: Client, data: Vec<serde_yaml::Value>) -> Result<(), ApplyError> {
    let discovery = Discovery::new(client.clone()).run().await?;
    let apply_params = PatchParams::apply("n5i").force();

    for document in data {
        let object: DynamicObject = serde_yaml::from_value(document)?;

        let Some(type_meta) = object.types.as_ref() else {
            return Err(ApplyError::MissingType);
        };
        let gvk = GroupVersionKind::try_from(type_meta)?;
        let name = object.name_any();

        let Some((ar, caps)) = discovery.resolve_gvk(&gvk) else {
            tracing::warn!("Cannot apply document for unknown {:?}", gvk);
            continue;
        };

        let target_ns = object.metadata.namespace.as_deref();
        let api = dynamic_api(ar, caps, client.clone(), target_ns, false);
        tracing::debug!(
            "Applying {}: \n{}",
            gvk.kind,
            serde_yaml::to_string(&object)?
        );
        let payload: serde_yaml::Value = serde_yaml::to_value(&object)?;
        let _r = api
            .patch(&name, &apply_params, &Patch::Apply(payload))
            .await?;
        tracing::debug!("Applied {} {}", gvk.kind, name);
    }

    Ok(())
}
/// Server-side applies each resource in `resources` through an `Api::all` handle.
///
/// Every resource is patched by its name with a forced server-side apply
/// under the field manager `"n5i"`.
///
/// # Errors
///
/// Returns the first `kube` error encountered; resources applied before the
/// failure are left in place.
pub async fn apply<ResourceType>(client: Client, resources: &[ResourceType]) -> kube::Result<()>
where
    <ResourceType as Resource>::DynamicType: Default,
    ResourceType:
        Resource + Clone + serde::de::DeserializeOwned + std::fmt::Debug + serde::Serialize,
{
    let api = Api::<ResourceType>::all(client);
    let apply_params = PatchParams::apply("n5i").force();

    for item in resources.iter() {
        api.patch(&item.name_any(), &apply_params, &Patch::Apply(item))
            .await?;
    }

    Ok(())
}
/// Server-side applies each resource in `resources` into `namespace`.
///
/// Every resource is patched by its name with a forced server-side apply
/// under the field manager `"n5i"`, through an API handle bound to `namespace`.
///
/// # Errors
///
/// Returns the first `kube` error encountered; resources applied before the
/// failure are left in place.
pub async fn apply_with_ns<ResourceType>(
    client: Client,
    resources: &[ResourceType],
    namespace: &str,
) -> kube::Result<()>
where
    <ResourceType as Resource>::DynamicType: Default,
    ResourceType: Resource<Scope = NamespaceResourceScope>
        + Clone
        + serde::de::DeserializeOwned
        + std::fmt::Debug
        + serde::Serialize,
{
    let api = Api::<ResourceType>::namespaced(client, namespace);
    let apply_params = PatchParams::apply("n5i").force();

    for item in resources.iter() {
        api.patch(&item.name_any(), &apply_params, &Patch::Apply(item))
            .await?;
    }

    Ok(())
}
/// Splits a multi-document YAML string into one [`serde_yaml::Value`] per document.
///
/// # Errors
///
/// Returns the error of the first document that fails to deserialize.
pub fn multidoc_deserialize(data: &str) -> Result<Vec<serde_yaml::Value>, serde_yaml::Error> {
    use serde::Deserialize;

    // Collecting into `Result` stops at the first failing document.
    serde_yaml::Deserializer::from_str(data)
        .map(|document| serde_yaml::Value::deserialize(document))
        .collect()
}
pub fn multidoc_serialize<T: serde::Serialize>(data: Vec<T>) -> Result<String, serde_yaml::Error> {
let mut output = String::new();
for item in data {
let yaml_str = serde_yaml::to_string(&item)?;
let mut saphyr_yml = Yaml::load_from_str(&yaml_str).unwrap();
assert_eq!(saphyr_yml.len(), 1);
{
let mut emitter = YamlEmitter::new(&mut output);
let saphyr_yml = saphyr_yml.remove(0);
emitter.dump(&saphyr_yml).expect("TODO: ");
}
output += "\n";
}
Ok(output)
}
/// Creates the namespace `namespace` unless a namespace with that name is
/// already listed by the API server.
///
/// Existence is checked with a list request filtered by `metadata.name`;
/// an existing namespace is left untouched.
///
/// # Errors
///
/// Returns any `kube` error from the list or create request.
pub async fn create_namespace(client: Client, namespace: &str) -> kube::Result<()> {
    let namespaces: Api<Namespace> = Api::all(client);

    let by_name = ListParams {
        field_selector: Some(format!("metadata.name={namespace}")),
        ..Default::default()
    };
    let existing = namespaces.list(&by_name).await?;
    if !existing.items.is_empty() {
        return Ok(());
    }

    let manifest = Namespace {
        metadata: ObjectMeta {
            name: Some(namespace.to_string()),
            ..Default::default()
        },
        ..Default::default()
    };
    namespaces.create(&PostParams::default(), &manifest).await?;
    Ok(())
}
/// Ensures `namespace` exists and carries the
/// `pod-security.kubernetes.io/enforce` label matching `security_policy`.
///
/// If the namespace is already listed, its current labels are copied, the
/// enforce label is inserted (overwriting any previous value), and the result
/// is sent as a merge patch. Otherwise the namespace is created with only
/// that label.
///
/// # Errors
///
/// Returns any `kube` error from the list, patch or create request.
pub async fn ensure_restricted_namespace(
    client: Client,
    namespace: &str,
    security_policy: PodSecurityStandard,
) -> kube::Result<()> {
    const ENFORCE_LABEL: &str = "pod-security.kubernetes.io/enforce";

    let enforce_level = match security_policy {
        PodSecurityStandard::Restricted => "restricted",
        PodSecurityStandard::Baseline => "baseline",
        PodSecurityStandard::Privileged => "privileged",
    }
    .to_string();

    let namespaces: Api<Namespace> = Api::all(client);
    let by_name = ListParams {
        field_selector: Some(format!("metadata.name={namespace}")),
        ..Default::default()
    };
    let existing = namespaces.list(&by_name).await?;

    match existing.items.first() {
        Some(current) => {
            // Keep the labels already on the namespace and add/overwrite ours.
            let mut labels = current.metadata.labels.clone().unwrap_or_default();
            labels.insert(ENFORCE_LABEL.to_string(), enforce_level);
            let body = json!({
                "metadata": {
                    "labels": labels
                }
            });
            namespaces
                .patch(namespace, &PatchParams::default(), &Patch::Merge(&body))
                .await?;
        }
        None => {
            let manifest = Namespace {
                metadata: ObjectMeta {
                    name: Some(namespace.to_string()),
                    labels: Some(BTreeMap::from([(
                        ENFORCE_LABEL.to_string(),
                        enforce_level,
                    )])),
                    ..Default::default()
                },
                ..Default::default()
            };
            namespaces.create(&PostParams::default(), &manifest).await?;
        }
    }

    Ok(())
}
/// Issues a delete request for the namespace `namespace` with default
/// delete parameters.
///
/// # Errors
///
/// Returns any `kube` error from the delete request.
pub async fn delete_namespace(client: Client, namespace: &str) -> kube::Result<()> {
    let namespaces = Api::<Namespace>::all(client);
    namespaces
        .delete(namespace, &DeleteParams::default())
        .await?;
    Ok(())
}
pub async fn create_secret(
client: &Client,
namespace: &str,
name: &str,
data: &BTreeMap<String, String>,
) -> kube::Result<()> {
let secret = k8s_openapi::api::core::v1::Secret {
data: Some(
data.clone()
.into_iter()
.map(|(k, v)| (k, ByteString(v.into())))
.collect(),
),
metadata: ObjectMeta {
name: Some(name.into()),
..Default::default()
},
..Default::default()
};
let api: Api<k8s_openapi::api::core::v1::Secret> = Api::namespaced(client.clone(), namespace);
let lp = ListParams::default()
.fields(&format!("metadata.name={name}"))
.timeout(10);
let secrets = api.list(&lp).await?;
if let Some(original_secret) = secrets.items.into_iter().next() {
let mut secret = secret;
secret.metadata.resource_version = original_secret.metadata.resource_version;
api.replace(name, &Default::default(), &secret).await?;
} else {
api.create(&Default::default(), &secret).await?;
}
Ok(())
}
/// Deletes every resource of type `ResourceType` listed in `namespace`.
///
/// Resources are listed once and then deleted one by one, by name, with
/// default delete parameters.
///
/// # Errors
///
/// Returns the first `kube` error from the list or a delete request;
/// resources deleted before the failure stay deleted.
pub async fn remove_all_from_ns<ResourceType>(client: Client, namespace: &str) -> kube::Result<()>
where
    <ResourceType as Resource>::DynamicType: Default,
    ResourceType: Resource<Scope = NamespaceResourceScope>
        + Clone
        + serde::de::DeserializeOwned
        + std::fmt::Debug
        + serde::Serialize,
{
    let api = Api::<ResourceType>::namespaced(client, namespace);
    let delete_params = DeleteParams::default();

    let listed = api.list(&ListParams::default()).await?;
    for item in listed.items {
        api.delete(&item.name_any(), &delete_params).await?;
    }

    Ok(())
}
/// Sets `spec.replicas` to `amount` on the scale subresource of every
/// `ResourceType` listed in `namespace`.
///
/// Resources are listed once; each one then receives a merge patch through
/// `patch_scale`.
///
/// # Errors
///
/// Returns the first `kube` error from the list or a patch request;
/// resources patched before the failure keep their new replica count.
pub async fn scale_all_in_ns<ResourceType>(
    client: Client,
    namespace: &str,
    amount: u8,
) -> kube::Result<()>
where
    <ResourceType as Resource>::DynamicType: Default,
    ResourceType: Resource<Scope = NamespaceResourceScope>
        + Clone
        + serde::de::DeserializeOwned
        + std::fmt::Debug
        + serde::Serialize,
{
    let api = Api::<ResourceType>::namespaced(client, namespace);
    let listed = api.list(&ListParams::default()).await?;

    let scale_patch = json!({
        "spec": { "replicas": amount }
    });
    let patch_params = PatchParams::default();

    for item in listed.items {
        api.patch_scale(&item.name_any(), &patch_params, &Patch::Merge(&scale_patch))
            .await?;
    }

    Ok(())
}
/// Sets `spec.suspend` to `suspend` on every `ResourceType` listed in `namespace`.
///
/// Resources are listed once; each one then receives a merge patch by name.
///
/// # Errors
///
/// Returns the first `kube` error from the list or a patch request;
/// resources patched before the failure keep the new value.
pub async fn set_job_suspense<ResourceType>(
    client: Client,
    namespace: &str,
    suspend: bool,
) -> kube::Result<()>
where
    <ResourceType as Resource>::DynamicType: Default,
    ResourceType: Resource<Scope = NamespaceResourceScope>
        + Clone
        + serde::de::DeserializeOwned
        + std::fmt::Debug
        + serde::Serialize,
{
    let api = Api::<ResourceType>::namespaced(client, namespace);

    let suspend_patch = json!({
        "spec": {
            "suspend": suspend,
        }
    });
    let patch_params = PatchParams::default();

    let listed = api.list(&ListParams::default()).await?;
    for item in listed.items {
        api.patch(&item.name_any(), &patch_params, &Patch::Merge(&suspend_patch))
            .await?;
    }

    Ok(())
}
/// Returns the preferred IP address of the first node listed by the cluster.
///
/// With the `__development` feature enabled, the value of the `NODE_IP`
/// environment variable is returned instead and the cluster is not queried.
///
/// Otherwise the first listed node's
/// `alpha.kubernetes.io/provided-node-ip` annotation is read as a
/// comma-separated list. Entries are trimmed, empty entries are dropped, the
/// rest are ordered with [`sort_ips`], and the first one is returned.
///
/// # Errors
///
/// Fails if listing nodes fails, if no node is listed, if the annotation is
/// missing, or if the annotation contains no non-empty entry.
pub async fn get_node_ip(client: &Client) -> anyhow::Result<String> {
    #[cfg(feature = "__development")]
    return Ok(std::env::var("NODE_IP")?);

    let api: Api<Node> = Api::all(client.clone());
    let nodes = api.list(&ListParams::default()).await?;
    let node = nodes
        .items
        .into_iter()
        .next()
        .ok_or_else(|| anyhow::anyhow!("No nodes found!"))?;

    let annotation = node
        .metadata
        .annotations
        .unwrap_or_default()
        .remove("alpha.kubernetes.io/provided-node-ip")
        .ok_or_else(|| anyhow::anyhow!("Failed to get node IPs"))?;

    // `split` always yields at least one (possibly empty) piece, so blank
    // entries must be filtered out for the emptiness check to mean anything.
    let mut ips: Vec<&str> = annotation
        .split(',')
        .map(str::trim)
        .filter(|ip| !ip.is_empty())
        .collect();
    if ips.is_empty() {
        bail!("Failed to get node IPs");
    }

    sort_ips(&mut ips);
    Ok(ips[0].to_string())
}
/// Sorts textual IP addresses in place so the most preferred address comes first.
///
/// Ordering, from most to least preferred:
/// 1. addresses matching none of the categories below,
/// 2. addresses in `172.16.0.0/12` (second octet 16 through 31),
/// 3. addresses starting with `192.168.`,
/// 4. addresses starting with `10.`,
/// 5. addresses containing `:` (treated as IPv6).
///
/// Ties within a category are broken by plain string comparison.
///
/// Only `172.16.*` through `172.31.*` count as the private 172 range; other
/// `172.*` addresses (for example `172.217.0.1`) are public and rank in the
/// first category.
fn sort_ips(ips: &mut [&str]) {
    /// True when `ip` lies in the private `172.16.0.0/12` block.
    fn in_private_172_block(ip: &str) -> bool {
        let second_octet = ip
            .strip_prefix("172.")
            .and_then(|rest| rest.split('.').next())
            .and_then(|octet| octet.parse::<u8>().ok());
        matches!(second_octet, Some(16..=31))
    }

    /// Category flags in priority order; `false < true`, so an address with
    /// no flag set sorts first and one containing `:` sorts last.
    fn rank(ip: &str) -> (bool, bool, bool, bool) {
        (
            ip.contains(':'),
            ip.starts_with("10."),
            ip.starts_with("192.168."),
            in_private_172_block(ip),
        )
    }

    ips.sort_by(|a, b| rank(a).cmp(&rank(b)).then_with(|| a.cmp(b)));
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Public IPv4 addresses come first, then the 172, 192.168 and 10 ranges,
    /// with the IPv6 address last.
    #[test]
    fn test_sort_ips() {
        let expected = [
            "65.109.113.186",
            "8.8.8.8",
            "172.16.0.1",
            "192.168.1.1",
            "10.42.0.3",
            "2001:db8::1",
        ];

        let mut actual = [
            "10.42.0.3",
            "65.109.113.186",
            "192.168.1.1",
            "172.16.0.1",
            "2001:db8::1",
            "8.8.8.8",
        ];
        sort_ips(&mut actual);

        assert_eq!(actual, expected);
    }
}