use k8s_openapi::api::{
core::v1::{Endpoints, Service},
networking::v1::NetworkPolicy,
};
use kube::{
api::{Patch, PatchParams},
runtime::controller::Action,
Api, Client,
};
use serde_json::Value;
use std::time::Duration;
use tracing::{debug, error};
pub async fn reconcile_network_policies(client: Client, namespace: &str) -> Result<(), Action> {
let kubernetes_api_ip_addresses = lookup_kubernetes_api_ips(&client).await?;
let np_api: Api<NetworkPolicy> = Api::namespaced(client, namespace);
let deny_all = serde_json::json!({
"apiVersion": "networking.k8s.io/v1",
"kind": "NetworkPolicy",
"metadata": {
"name": format!("deny-all"),
"namespace": format!("{namespace}"),
},
"spec": {
"podSelector": {},
"policyTypes": [
"Egress",
"Ingress"
],
}
});
apply_network_policy(namespace, &np_api, deny_all).await?;
let allow_dns = serde_json::json!({
"apiVersion": "networking.k8s.io/v1",
"kind": "NetworkPolicy",
"metadata": {
"name": "allow-egress-to-kube-dns",
"namespace": format!("{namespace}"),
},
"spec": {
"podSelector": {},
"policyTypes": [
"Egress"
],
"egress": [
{
"to": [
{
"podSelector": {
"matchLabels": {
"k8s-app": "kube-dns"
}
},
"namespaceSelector": {
"matchLabels": {
"kubernetes.io/metadata.name": "kube-system"
}
}
}
],
"ports": [
{
"protocol": "UDP",
"port": 53
},
{
"protocol": "TCP",
"port": 53
}
]
}
]
}
});
apply_network_policy(namespace, &np_api, allow_dns).await?;
let allow_system_ingress = serde_json::json!({
"apiVersion": "networking.k8s.io/v1",
"kind": "NetworkPolicy",
"metadata": {
"name": "allow-system",
"namespace": format!("{namespace}"),
},
"spec": {
"podSelector": {},
"policyTypes": ["Ingress"],
"ingress": [
{
"from": [
{
"namespaceSelector": {
"matchLabels": {
"kubernetes.io/metadata.name": "monitoring"
}
}
},
{
"namespaceSelector": {
"matchLabels": {
"kubernetes.io/metadata.name": "cnpg-system"
}
}
},
{
"namespaceSelector": {
"matchLabels": {
"kubernetes.io/metadata.name": "coredb-operator"
}
}
},
{
"namespaceSelector": {
"matchLabels": {
"kubernetes.io/metadata.name": "traefik"
}
}
}
]
}
]
}
});
apply_network_policy(namespace, &np_api, allow_system_ingress).await?;
let allow_system_egress = serde_json::json!({
"apiVersion": "networking.k8s.io/v1",
"kind": "NetworkPolicy",
"metadata": {
"name": "allow-system-egress",
"namespace": format!("{namespace}"),
},
"spec": {
"podSelector": {},
"policyTypes": ["Egress"],
"egress": [
{
"to": [
{
"namespaceSelector": {
"matchLabels": {
"kubernetes.io/metadata.name": "minio"
}
}
}
]
}
]
}
});
apply_network_policy(namespace, &np_api, allow_system_egress).await?;
let allow_public_internet = serde_json::json!({
"apiVersion": "networking.k8s.io/v1",
"kind": "NetworkPolicy",
"metadata": {
"name": "allow-egress-to-internet",
"namespace": format!("{namespace}"),
},
"spec": {
"podSelector": {},
"policyTypes": ["Egress"],
"egress": [
{
"to": [
{
"ipBlock": {
"cidr": "0.0.0.0/0",
"except": [
"10.0.0.0/8",
"172.16.0.0/12",
"192.168.0.0/16"
]
}
}
]
}
]
}
});
apply_network_policy(namespace, &np_api, allow_public_internet).await?;
let allow_within_namespace = serde_json::json!({
"apiVersion": "networking.k8s.io/v1",
"kind": "NetworkPolicy",
"metadata": {
"name": "allow-within-namespace",
"namespace": format!("{namespace}"),
},
"spec": {
"podSelector": {},
"policyTypes": ["Ingress", "Egress"],
"ingress": [
{
"from": [
{
"podSelector": {}
}
]
}
],
"egress": [
{
"to": [
{
"podSelector": {}
}
]
}
]
}
});
apply_network_policy(namespace, &np_api, allow_within_namespace).await?;
let mut ip_list_kube_api = Vec::new();
for ip_address in kubernetes_api_ip_addresses {
ip_list_kube_api.push(serde_json::json!({
"ipBlock": {
"cidr": format!("{}/32", ip_address)
}
}));
}
let allow_kube_api = serde_json::json!({
"apiVersion": "networking.k8s.io/v1",
"kind": "NetworkPolicy",
"metadata": {
"name": "allow-kube-api",
"namespace": format!("{namespace}"),
},
"spec": {
"podSelector": {},
"policyTypes": ["Egress"],
"egress": [
{
"to": ip_list_kube_api
}
]
}
});
apply_network_policy(namespace, &np_api, allow_kube_api).await?;
Ok(())
}
async fn lookup_kubernetes_api_ips(client: &Client) -> Result<Vec<String>, Action> {
let service_api = Api::<Service>::namespaced(client.clone(), "default");
let kubernetes_service = match service_api.get("kubernetes").await {
Ok(s) => s,
Err(_) => {
error!("Failed to get kubernetes service");
return Err(Action::requeue(Duration::from_secs(300)));
}
};
let kubernetes_service_spec = match kubernetes_service.spec {
Some(s) => s,
None => {
error!("while discovering kubernetes API IP address, service has no spec");
return Err(Action::requeue(Duration::from_secs(300)));
}
};
let cluster_ip = match kubernetes_service_spec.cluster_ip.clone() {
Some(c) => c,
None => {
error!("while discovering kubernetes API IP address, service has no cluster IP");
return Err(Action::requeue(Duration::from_secs(300)));
}
};
let mut results = Vec::new();
results.push(cluster_ip);
let endpoints_api = Api::<Endpoints>::namespaced(client.clone(), "default");
let kubernetes_endpoint = match endpoints_api.get("kubernetes").await {
Ok(endpoint) => endpoint,
Err(e) => {
error!("Failed to get kubernetes endpoint: {}", e);
return Err(Action::requeue(Duration::from_secs(300)));
}
};
let kubernetes_endpoint_subsets = match kubernetes_endpoint.subsets {
Some(s) => s,
None => {
error!("while discovering kubernetes API IP address, endpoint has no subsets");
return Err(Action::requeue(Duration::from_secs(300)));
}
};
if kubernetes_endpoint_subsets.is_empty() {
error!("While discovering kubernetes API IP address, found no endpoints");
return Err(Action::requeue(Duration::from_secs(300)));
}
for subset in kubernetes_endpoint_subsets {
let addresses = match subset.addresses {
Some(a) => a,
None => {
error!("while discovering kubernetes API IP address, endpoint subset has no addresses");
return Err(Action::requeue(Duration::from_secs(300)));
}
};
for address in addresses {
results.push(address.ip);
}
}
results.sort();
Ok(results)
}
/// Applies a single `NetworkPolicy`, given as JSON, through `np_api`.
///
/// `np` is deserialized into a typed [`NetworkPolicy`] and sent as a `Patch::Apply` under
/// the `conductor` field manager with `force` set. The object name is read from
/// `metadata.name` of the deserialized policy. `namespace` is only used in log messages;
/// the namespace that is actually targeted is the one `np_api` was created for.
///
/// # Errors
///
/// Logs the cause and returns an [`Action`] that requeues after 300 seconds when `np` does
/// not deserialize into a `NetworkPolicy`, when it carries no `metadata.name`, or when the
/// patch request fails.
async fn apply_network_policy(namespace: &str, np_api: &Api<NetworkPolicy>, np: Value) -> Result<(), Action> {
    let network_policy: NetworkPolicy = match serde_json::from_value(np) {
        Ok(policy) => policy,
        Err(e) => {
            error!(
                "Failed to deserialize Network Policy namespace {}: {}",
                namespace, e
            );
            return Err(Action::requeue(Duration::from_secs(300)));
        }
    };

    // Borrow the name instead of cloning it; a missing name is reported like every other
    // failure here rather than panicking inside the controller.
    let name = match network_policy.metadata.name.as_deref() {
        Some(name) => name,
        None => {
            error!(
                "Network Policy in namespace {} has no metadata.name",
                namespace
            );
            return Err(Action::requeue(Duration::from_secs(300)));
        }
    };

    let params = PatchParams::apply("conductor").force();
    debug!("\nApplying Network Policy {} in namespace {}", name, namespace);
    if let Err(e) = np_api
        .patch(name, &params, &Patch::Apply(&network_policy))
        .await
    {
        error!(
            "Failed to create Network Policy {} in namespace {}: {}",
            name, namespace, e
        );
        return Err(Action::requeue(Duration::from_secs(300)));
    }
    Ok(())
}