use crate::PodSecurityStandard;
use anyhow::bail;
use k8s_openapi::api::core::v1::{Namespace, Node};
use k8s_openapi::apimachinery::pkg::apis::meta::v1::ObjectMeta;
use k8s_openapi::{ByteString, NamespaceResourceScope};
use kube::api::{
ApiResource, DeleteParams, DynamicObject, GroupVersionKind, ListParams, Patch, PatchParams,
PostParams,
};
use kube::core::gvk::ParseGroupVersionError;
use kube::discovery::{ApiCapabilities, Scope};
use kube::{Api, Client, Discovery, Resource, ResourceExt};
use saphyr::{LoadableYamlNode, Yaml, YamlEmitter};
use serde_json::json;
use std::collections::BTreeMap;
use thiserror::Error;
/// Builds a dynamically typed [`Api`] handle for the resource described by `ar`.
///
/// Cluster-scoped resources, or any resource when `all` is set, get a handle
/// that is not bound to a namespace. Otherwise the handle is bound to `ns`
/// when one is given, and to the client's default namespace when it is not.
fn dynamic_api(
    ar: &ApiResource,
    caps: &ApiCapabilities,
    client: Client,
    ns: Option<&str>,
    all: bool,
) -> Api<DynamicObject> {
    let cluster_wide = all || caps.scope == Scope::Cluster;
    match ns {
        _ if cluster_wide => Api::all_with(client, ar),
        Some(target) => Api::namespaced_with(client, target, ar),
        None => Api::default_namespaced_with(client, ar),
    }
}
/// Errors that can occur while applying arbitrary YAML documents to the cluster.
#[derive(Error, Debug)]
pub enum ApplyError {
/// A request made through the `kube` client failed.
#[error("A Kubernetes error occurred: {0}")]
Kube(kube::Error),
/// A document could not be converted to or from YAML by `serde_yaml`.
#[error("Failed to parse document: {0}")]
Parse(serde_yaml::Error),
/// The document's type information could not be turned into a group/version/kind.
#[error("Failed to parse group version: {0}")]
ParseGv(ParseGroupVersionError),
/// The document carries no type information, so its kind cannot be determined.
#[error("Failed to infer type from document")]
MissingType,
}
impl From<kube::Error> for ApplyError {
fn from(e: kube::Error) -> Self {
Self::Kube(e)
}
}
impl From<serde_yaml::Error> for ApplyError {
fn from(e: serde_yaml::Error) -> Self {
Self::Parse(e)
}
}
impl From<ParseGroupVersionError> for ApplyError {
fn from(e: ParseGroupVersionError) -> Self {
Self::ParseGv(e)
}
}
/// Server-side applies every YAML document in `data` to the cluster.
///
/// API discovery runs once up front. Each document is then parsed into a
/// [`DynamicObject`], its type information is resolved against the discovery
/// result, and the object is applied with the `n5i` field manager and forced
/// ownership. Documents whose kind the cluster does not know are skipped with
/// a warning.
///
/// # Errors
///
/// Returns [`ApplyError::MissingType`] for a document without type
/// information, and the matching variant when YAML conversion,
/// group/version parsing, or a Kubernetes request fails. Processing stops at
/// the first error.
pub async fn apply_any(client: Client, data: Vec<serde_yaml::Value>) -> Result<(), ApplyError> {
    let discovery = Discovery::new(client.clone()).run().await?;
    let params = PatchParams::apply("n5i").force();

    for doc in data {
        let object: DynamicObject = serde_yaml::from_value(doc)?;

        let Some(type_meta) = object.types.as_ref() else {
            return Err(ApplyError::MissingType);
        };
        let gvk = GroupVersionKind::try_from(type_meta)?;

        let Some((ar, caps)) = discovery.resolve_gvk(&gvk) else {
            tracing::warn!("Cannot apply document for unknown {:?}", gvk);
            continue;
        };

        let name = object.name_any();
        let target_ns = object.metadata.namespace.as_deref();
        let api = dynamic_api(&ar, &caps, client.clone(), target_ns, false);

        tracing::debug!("Applying {}: \n{}", gvk.kind, serde_yaml::to_string(&object)?);
        let payload: serde_yaml::Value = serde_yaml::to_value(&object)?;
        api.patch(&name, &params, &Patch::Apply(payload)).await?;
        tracing::debug!("Applied {} {}", gvk.kind, name);
    }

    Ok(())
}
/// Server-side applies each of `resources` through an [`Api`] handle that is
/// not bound to a namespace.
///
/// Every resource is patched under its own name with the `n5i` field manager
/// and forced ownership. The first failing request aborts the loop and is
/// returned.
pub async fn apply<ResourceType>(client: Client, resources: &[ResourceType]) -> kube::Result<()>
where
    <ResourceType as Resource>::DynamicType: Default,
    ResourceType: Resource
        + Clone
        + serde::de::DeserializeOwned
        + std::fmt::Debug
        + serde::Serialize
        + Send
        + Sync,
{
    let api = Api::<ResourceType>::all(client);
    let params = PatchParams::apply("n5i").force();

    for item in resources {
        let item_name = item.name_any();
        api.patch(&item_name, &params, &Patch::Apply(item)).await?;
    }

    Ok(())
}
/// Server-side applies each of `resources` inside `namespace`.
///
/// Every resource is patched under its own name with the `n5i` field manager
/// and forced ownership. The first failing request aborts the loop and is
/// returned.
pub async fn apply_with_ns<ResourceType>(
    client: Client,
    resources: &[ResourceType],
    namespace: &str,
) -> kube::Result<()>
where
    <ResourceType as Resource>::DynamicType: Default,
    ResourceType: Resource<Scope = NamespaceResourceScope>
        + Clone
        + serde::de::DeserializeOwned
        + std::fmt::Debug
        + serde::Serialize
        + Send
        + Sync,
{
    let api = Api::<ResourceType>::namespaced(client, namespace);
    let params = PatchParams::apply("n5i").force();

    for item in resources {
        let item_name = item.name_any();
        api.patch(&item_name, &params, &Patch::Apply(item)).await?;
    }

    Ok(())
}
/// Splits a multi-document YAML string into one [`serde_yaml::Value`] per document.
///
/// # Errors
///
/// Returns the error of the first document that fails to deserialize.
pub fn multidoc_deserialize(data: &str) -> Result<Vec<serde_yaml::Value>, serde_yaml::Error> {
    use serde::Deserialize;

    serde_yaml::Deserializer::from_str(data)
        .map(|document| serde_yaml::Value::deserialize(document))
        .collect()
}
pub fn multidoc_serialize<T: serde::Serialize>(data: Vec<T>) -> Result<String, serde_yaml::Error> {
let mut output = String::new();
for item in data {
let yaml_str = serde_yaml::to_string(&item)?;
let mut saphyr_yml = Yaml::load_from_str(&yaml_str).unwrap();
assert_eq!(saphyr_yml.len(), 1);
{
let mut emitter = YamlEmitter::new(&mut output);
let saphyr_yml = saphyr_yml.remove(0);
emitter.dump(&saphyr_yml).expect("TODO: ");
}
output += "\n";
}
Ok(output)
}
/// Creates the namespace named after `user_id` unless it already exists.
///
/// Existence is checked with a list request filtered on `metadata.name`. A
/// newly created namespace carries the label `n5i.dev/user-id=<user_id>`; an
/// existing namespace is left untouched.
pub async fn ensure_user_namespace(client: Client, user_id: &str) -> kube::Result<()> {
    let namespaces = Api::<Namespace>::all(client);

    let by_name = ListParams {
        field_selector: Some(format!("metadata.name={user_id}")),
        ..Default::default()
    };
    let already_present = !namespaces.list(&by_name).await?.items.is_empty();
    if already_present {
        return Ok(());
    }

    let owner_label = ("n5i.dev/user-id".to_string(), user_id.to_string());
    let desired = Namespace {
        metadata: ObjectMeta {
            name: Some(user_id.to_string()),
            labels: Some(BTreeMap::from([owner_label])),
            ..Default::default()
        },
        ..Default::default()
    };
    namespaces.create(&PostParams::default(), &desired).await?;
    Ok(())
}
/// Ensures `namespace` exists and carries the labels managed for an app.
///
/// The managed labels are:
/// * `pod-security.kubernetes.io/enforce`, derived from `security_policy`
/// * `n5i.dev/user-id`, set to `user_id`
/// * `n5i.dev/app-id`, set to `app_id`
///
/// When the namespace already exists, its current labels are kept and the
/// managed labels are written over them with a merge patch. When it does not
/// exist, it is created with the managed labels. Previously the create path
/// only set the pod-security label and silently dropped `user_id` and
/// `app_id`.
///
/// # Errors
///
/// Returns the error of the failing list, patch, or create request.
pub async fn ensure_app_namespace(
    client: Client,
    namespace: &str,
    security_policy: PodSecurityStandard,
    user_id: String,
    app_id: String,
) -> kube::Result<()> {
    let namespace_api: Api<Namespace> = Api::all(client);
    let lp = ListParams {
        field_selector: Some(format!("metadata.name={namespace}")),
        ..Default::default()
    };
    let existing = namespace_api.list(&lp).await?.items.into_iter().next();

    let enforce_level = match security_policy {
        PodSecurityStandard::Restricted => "restricted",
        PodSecurityStandard::Baseline => "baseline",
        PodSecurityStandard::Privileged => "privileged",
    };
    // Single source of truth for both the update and the create path.
    let managed_labels = [
        (
            "pod-security.kubernetes.io/enforce".to_string(),
            enforce_level.to_string(),
        ),
        ("n5i.dev/user-id".to_string(), user_id),
        ("n5i.dev/app-id".to_string(), app_id),
    ];

    if let Some(ns) = existing {
        // Keep labels set by others; only overwrite the ones managed here.
        let mut labels = ns.metadata.labels.unwrap_or_default();
        labels.extend(managed_labels);
        let patch = json!({
            "metadata": {
                "labels": labels
            }
        });
        namespace_api
            .patch(namespace, &PatchParams::default(), &Patch::Merge(&patch))
            .await?;
    } else {
        let new_namespace = Namespace {
            metadata: ObjectMeta {
                name: Some(namespace.to_string()),
                labels: Some(BTreeMap::from(managed_labels)),
                ..Default::default()
            },
            ..Default::default()
        };
        namespace_api
            .create(&PostParams::default(), &new_namespace)
            .await?;
    }
    Ok(())
}
/// Issues a delete request for `namespace` using default delete parameters.
///
/// The response body is discarded; only a request failure is reported.
pub async fn delete_namespace(client: Client, namespace: &str) -> kube::Result<()> {
    let namespaces = Api::<Namespace>::all(client);
    namespaces
        .delete(namespace, &DeleteParams::default())
        .await
        .map(|_| ())
}
pub async fn create_secret(
client: &Client,
namespace: &str,
name: &str,
data: &BTreeMap<String, String>,
) -> kube::Result<()> {
let secret = k8s_openapi::api::core::v1::Secret {
data: Some(
data.clone()
.into_iter()
.map(|(k, v)| (k, ByteString(v.into())))
.collect(),
),
metadata: ObjectMeta {
name: Some(name.into()),
..Default::default()
},
..Default::default()
};
let api: Api<k8s_openapi::api::core::v1::Secret> = Api::namespaced(client.clone(), namespace);
let lp = ListParams::default()
.fields(&format!("metadata.name={name}"))
.timeout(10);
let secrets = api.list(&lp).await?;
if let Some(original_secret) = secrets.items.into_iter().next() {
let mut secret = secret;
secret.metadata.resource_version = original_secret.metadata.resource_version;
api.replace(name, &PostParams::default(), &secret).await?;
} else {
api.create(&PostParams::default(), &secret).await?;
}
Ok(())
}
/// Deletes every resource of type `ResourceType` found in `namespace`.
///
/// Resources are listed once and then deleted one by one by name with default
/// delete parameters. The first failing request aborts the loop and is
/// returned.
pub async fn remove_all_from_ns<ResourceType>(client: Client, namespace: &str) -> kube::Result<()>
where
    <ResourceType as Resource>::DynamicType: Default,
    ResourceType: Resource<Scope = NamespaceResourceScope>
        + Clone
        + serde::de::DeserializeOwned
        + std::fmt::Debug
        + serde::Serialize,
{
    let api = Api::<ResourceType>::namespaced(client, namespace);
    let listed = api.list(&ListParams::default()).await?;
    let delete_params = DeleteParams::default();

    for item in listed.items {
        let item_name = item.name_any();
        api.delete(&item_name, &delete_params).await?;
    }

    Ok(())
}
/// Sets `spec.replicas` to `amount` on the scale subresource of every
/// `ResourceType` found in `namespace`.
///
/// Resources are listed once and then patched one by one with a merge patch.
/// The first failing request aborts the loop and is returned.
pub async fn scale_all_in_ns<ResourceType>(
    client: Client,
    namespace: &str,
    amount: u8,
) -> kube::Result<()>
where
    <ResourceType as Resource>::DynamicType: Default,
    ResourceType: Resource<Scope = NamespaceResourceScope>
        + Clone
        + serde::de::DeserializeOwned
        + std::fmt::Debug
        + serde::Serialize,
{
    let api = Api::<ResourceType>::namespaced(client, namespace);
    let listed = api.list(&ListParams::default()).await?;

    let scale_patch = json!({
        "spec": { "replicas": amount }
    });
    let patch_params = PatchParams::default();

    for item in listed.items {
        let item_name = item.name_any();
        api.patch_scale(&item_name, &patch_params, &Patch::Merge(&scale_patch))
            .await?;
    }

    Ok(())
}
/// Sets `spec.suspend` to `suspend` on every `ResourceType` found in `namespace`.
///
/// Resources are listed once and then patched one by one with a merge patch.
/// The first failing request aborts the loop and is returned.
pub async fn set_job_suspense<ResourceType>(
    client: Client,
    namespace: &str,
    suspend: bool,
) -> kube::Result<()>
where
    <ResourceType as Resource>::DynamicType: Default,
    ResourceType: Resource<Scope = NamespaceResourceScope>
        + Clone
        + serde::de::DeserializeOwned
        + std::fmt::Debug
        + serde::Serialize,
{
    let api = Api::<ResourceType>::namespaced(client, namespace);
    let suspend_patch = json!({
        "spec": {
            "suspend": suspend,
        }
    });
    let listed = api.list(&ListParams::default()).await?;
    let patch_params = PatchParams::default();

    for item in listed.items {
        let item_name = item.name_any();
        api.patch(&item_name, &patch_params, &Patch::Merge(&suspend_patch))
            .await?;
    }

    Ok(())
}
/// Returns the IP addresses advertised by the first node of the cluster.
///
/// The addresses are read from the node annotation
/// `alpha.kubernetes.io/provided-node-ip`, falling back to
/// `k3s.io/internal-ip`. The annotation value is treated as a comma-separated
/// list; entries are trimmed and empty entries are dropped.
///
/// With the `__development` feature enabled, the list is read from the
/// `NODE_IP` environment variable instead and the cluster is not queried.
///
/// # Errors
///
/// Fails when the node list request fails, when the cluster reports no nodes,
/// when neither annotation is present, or (development builds) when `NODE_IP`
/// is not set.
// The early return below makes the rest of the body unreachable in development builds.
#[cfg_attr(feature = "__development", allow(unreachable_code))]
pub async fn get_node_ips(client: &Client) -> anyhow::Result<Vec<String>> {
    /// Splits a comma-separated address list, skipping blank entries.
    fn parse_ip_list(raw: &str) -> Vec<String> {
        raw.split(',')
            .map(str::trim)
            .filter(|ip| !ip.is_empty())
            .map(String::from)
            .collect()
    }

    // The override used to return the raw `String`, which does not match the
    // declared return type; parse it the same way as the annotation value.
    #[cfg(feature = "__development")]
    return Ok(parse_ip_list(&std::env::var("NODE_IP")?));

    let api: Api<Node> = Api::all(client.clone());
    let nodes = api.list(&ListParams::default()).await?;
    let node = nodes
        .items
        .into_iter()
        .next()
        .ok_or_else(|| anyhow::anyhow!("No nodes found!"))?;
    let annotations = node.metadata.annotations.unwrap_or_default();
    let ips = annotations
        .get("alpha.kubernetes.io/provided-node-ip")
        .or_else(|| annotations.get("k3s.io/internal-ip"))
        .ok_or_else(|| anyhow::anyhow!("No node IP annotation found!"))?;
    Ok(parse_ip_list(ips))
}
/// Returns the most preferred node IP.
///
/// Fetches the node's addresses with `get_node_ips`, orders them with
/// `sort_ips`, and returns the first one.
///
/// # Errors
///
/// Propagates any error from `get_node_ips` and fails when the resulting list
/// is empty.
pub async fn get_node_ip(client: &Client) -> anyhow::Result<String> {
    let mut candidates = get_node_ips(client).await?;
    sort_ips(&mut candidates);
    match candidates.into_iter().next() {
        Some(preferred) => Ok(preferred),
        None => bail!("No node ips found!"),
    }
}
/// Sorts `ips` in place from most to least preferred.
///
/// Preference order:
/// 1. addresses without a `:` before addresses containing one (IPv6),
/// 2. then public IPv4 addresses,
/// 3. then `172.16.0.0/12`,
/// 4. then `192.168.0.0/16`,
/// 5. then `10.0.0.0/8`.
///
/// Addresses of equal preference are ordered by plain string comparison.
///
/// Only `172.16.*` to `172.31.*` count as private. A bare `172.` prefix test
/// would also demote public addresses such as `172.217.0.1`.
fn sort_ips(ips: &mut [String]) {
    /// True when `ip` starts with `172.` and its second octet is in 16..=31.
    fn is_private_172(ip: &str) -> bool {
        let second_octet = ip
            .strip_prefix("172.")
            .and_then(|rest| rest.split('.').next())
            .and_then(|octet| octet.parse::<u8>().ok());
        matches!(second_octet, Some(16..=31))
    }

    /// Sort key `(is_ipv6, private_class)`; smaller keys are preferred.
    fn preference_key(ip: &str) -> (bool, u8) {
        let private_class = if ip.starts_with("10.") {
            3
        } else if ip.starts_with("192.168.") {
            2
        } else if is_private_172(ip) {
            1
        } else {
            0
        };
        (ip.contains(':'), private_class)
    }

    ips.sort_by(|a, b| {
        preference_key(a)
            .cmp(&preference_key(b))
            .then_with(|| a.cmp(b))
    });
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Public IPv4 addresses come first, followed by the 172.x, 192.168.x and
    /// 10.x addresses, with the IPv6 address last.
    #[test]
    fn test_sort_ips() {
        let mut ips = [
            "10.42.0.3",
            "65.109.113.186",
            "192.168.1.1",
            "172.16.0.1",
            "2001:db8::1",
            "8.8.8.8",
        ]
        .map(String::from);

        sort_ips(&mut ips);

        let expected = [
            "65.109.113.186",
            "8.8.8.8",
            "172.16.0.1",
            "192.168.1.1",
            "10.42.0.3",
            "2001:db8::1",
        ]
        .map(String::from);
        assert_eq!(ips, expected);
    }
}