use std::time::{Duration, Instant};
use k8s_openapi::api::core::v1::Pod;
use k8s_openapi::api::networking::v1::NetworkPolicy;
use k8s_openapi::apimachinery::pkg::apis::meta::v1::Status;
use kube::api::{Api, AttachParams, DeleteParams, ListParams, PostParams};
use kube::Client;
use tokio::io::AsyncReadExt;
use crate::labels;
/// Errors produced by [`K8sClient`] operations.
#[derive(Debug, thiserror::Error)]
pub enum K8sError {
/// Error returned by the `kube` client; converted automatically by `?` via `#[from]`.
#[error("kubernetes API error: {0}")]
Kube(#[from] kube::Error),
/// Building the Kubernetes client failed; holds the underlying error's message.
#[error("failed to construct Kubernetes client: {0}")]
Connect(String),
/// An operation gave up after exhausting its time budget; holds a description.
#[error("timed out: {0}")]
Timeout(String),
/// Any other failure, described by a free-form message.
#[error("{0}")]
Other(String),
}
/// Captured result of a command executed inside a Pod container.
#[derive(Debug, Clone)]
pub struct ExecOutput {
    /// Raw bytes the command wrote to stdout.
    pub stdout: Vec<u8>,
    /// Text the command wrote to stderr (decoded lossily by `K8sClient::exec*`).
    pub stderr: String,
    /// Exit code derived from the exec session's final status; `None` when
    /// it could not be determined.
    pub exit_code: Option<i32>,
}

impl ExecOutput {
    /// Returns `true` only when the command is known to have exited with 0.
    ///
    /// An unknown exit code (`None`) counts as not successful.
    pub fn success(&self) -> bool {
        matches!(self.exit_code, Some(0))
    }

    /// Stdout decoded as UTF-8.
    ///
    /// Borrows when the bytes are valid UTF-8. Otherwise it allocates and
    /// replaces invalid sequences with U+FFFD.
    pub fn stdout_str(&self) -> std::borrow::Cow<'_, str> {
        let raw: &[u8] = self.stdout.as_slice();
        String::from_utf8_lossy(raw)
    }
}
/// Namespace-scoped Kubernetes API handle for managing Pods and NetworkPolicies.
#[derive(Clone)]
pub struct K8sClient {
/// Underlying `kube` client from which every `Api` handle is built.
client: Client,
/// Namespace that `pods()` and `network_policies()` are scoped to.
namespace: String,
/// Identifier for this instance, taken from `labels::instance_id()` at
/// construction. `reap_stale` skips Pods labelled with it, and
/// `prune_network_policies` only considers NetworkPolicies labelled with it.
instance_id: String,
}
impl K8sClient {
pub async fn connect(namespace: impl Into<String>) -> Result<Self, K8sError> {
ensure_crypto_provider();
let client = Client::try_default()
.await
.map_err(|e| K8sError::Connect(e.to_string()))?;
Ok(Self::from_client(client, namespace.into()))
}
pub fn from_client(client: Client, namespace: String) -> Self {
Self {
client,
namespace,
instance_id: labels::instance_id(),
}
}
pub fn namespace(&self) -> &str {
&self.namespace
}
pub fn instance_id(&self) -> &str {
&self.instance_id
}
pub fn client(&self) -> &Client {
&self.client
}
pub fn pods(&self) -> Api<Pod> {
Api::namespaced(self.client.clone(), &self.namespace)
}
pub async fn create_pod(&self, pod: &Pod) -> Result<(), K8sError> {
let name = pod
.metadata
.name
.clone()
.ok_or_else(|| K8sError::Other("pod spec has no metadata.name".into()))?;
let api = self.pods();
let _ = api.delete(&name, &DeleteParams::default()).await;
for attempt in 0..6 {
match api.create(&PostParams::default(), pod).await {
Ok(_) => return Ok(()),
Err(kube::Error::Api(e)) if e.code == 409 && attempt < 5 => {
tokio::time::sleep(Duration::from_millis(500)).await;
let _ = api.delete(&name, &DeleteParams::default()).await;
continue;
}
Err(e) => return Err(K8sError::Kube(e)),
}
}
Err(K8sError::Timeout(format!(
"pod {name} could not be created after repeated 409 conflicts"
)))
}
pub async fn wait_for_pod_ip(&self, name: &str, timeout: Duration) -> Result<String, K8sError> {
let api = self.pods();
let deadline = Instant::now() + timeout;
loop {
let pod = api.get(name).await?;
let status = pod.status.as_ref();
let phase = status.and_then(|s| s.phase.as_deref()).unwrap_or("Unknown");
if let "Failed" | "Succeeded" = phase {
return Err(K8sError::Other(format!(
"pod {name} reached terminal phase {phase} during startup"
)));
}
let ip = status
.and_then(|s| s.pod_ip.as_ref())
.filter(|s| !s.is_empty());
if let Some(ip) = ip {
if phase == "Running" {
return Ok(ip.clone());
}
}
if Instant::now() >= deadline {
return Err(K8sError::Timeout(format!(
"pod {name} did not become Running with a podIP within {timeout:?}"
)));
}
tokio::time::sleep(Duration::from_secs(1)).await;
}
}
pub async fn wait_for_tcp(ip: &str, port: u16, timeout: Duration) -> Result<(), K8sError> {
const PER_ATTEMPT: Duration = Duration::from_secs(2);
let deadline = Instant::now() + timeout;
let addr = format!("{ip}:{port}");
loop {
let remaining = deadline.saturating_duration_since(Instant::now());
let attempt_budget = remaining.min(PER_ATTEMPT);
if !attempt_budget.is_zero() {
if let Ok(Ok(_)) =
tokio::time::timeout(attempt_budget, tokio::net::TcpStream::connect(&addr))
.await
{
return Ok(());
}
}
if Instant::now() >= deadline {
return Err(K8sError::Timeout(format!(
"{addr} did not accept connections within {timeout:?}"
)));
}
tokio::time::sleep(Duration::from_millis(500)).await;
}
}
pub async fn exec(
&self,
pod: &str,
container: Option<&str>,
cmd: &[&str],
) -> Result<ExecOutput, K8sError> {
let api = self.pods();
let mut ap = AttachParams::default()
.stdin(false)
.stdout(true)
.stderr(true);
if let Some(c) = container {
ap = ap.container(c.to_string());
}
let mut proc = api.exec(pod, cmd.iter().copied(), &ap).await?;
let mut stdout = Vec::new();
if let Some(mut s) = proc.stdout() {
s.read_to_end(&mut stdout)
.await
.map_err(|e| K8sError::Other(format!("reading exec stdout: {e}")))?;
}
let mut stderr_buf = Vec::new();
if let Some(mut s) = proc.stderr() {
let _ = s.read_to_end(&mut stderr_buf).await;
}
let status = match proc.take_status() {
Some(fut) => fut.await,
None => None,
};
let _ = proc.join().await;
Ok(ExecOutput {
stdout,
stderr: String::from_utf8_lossy(&stderr_buf).into_owned(),
exit_code: exit_code_from_status(status.as_ref()),
})
}
pub async fn exec_with_stdin(
&self,
pod: &str,
container: Option<&str>,
cmd: &[&str],
stdin: &[u8],
) -> Result<ExecOutput, K8sError> {
use tokio::io::AsyncWriteExt;
let api = self.pods();
let mut ap = AttachParams::default()
.stdin(true)
.stdout(true)
.stderr(true);
if let Some(c) = container {
ap = ap.container(c.to_string());
}
let mut proc = api.exec(pod, cmd.iter().copied(), &ap).await?;
if let Some(mut w) = proc.stdin() {
w.write_all(stdin)
.await
.map_err(|e| K8sError::Other(format!("writing exec stdin: {e}")))?;
w.shutdown()
.await
.map_err(|e| K8sError::Other(format!("closing exec stdin: {e}")))?;
}
let mut stdout = Vec::new();
if let Some(mut s) = proc.stdout() {
s.read_to_end(&mut stdout)
.await
.map_err(|e| K8sError::Other(format!("reading exec stdout: {e}")))?;
}
let mut stderr_buf = Vec::new();
if let Some(mut s) = proc.stderr() {
let _ = s.read_to_end(&mut stderr_buf).await;
}
let status = match proc.take_status() {
Some(fut) => fut.await,
None => None,
};
let _ = proc.join().await;
Ok(ExecOutput {
stdout,
stderr: String::from_utf8_lossy(&stderr_buf).into_owned(),
exit_code: exit_code_from_status(status.as_ref()),
})
}
pub async fn pod_logs(&self, pod: &str, container: Option<&str>) -> Result<String, K8sError> {
use kube::api::LogParams;
let api = self.pods();
let lp = LogParams {
container: container.map(|c| c.to_string()),
..LogParams::default()
};
Ok(api.logs(pod, &lp).await?)
}
pub async fn delete_pod(&self, name: &str) {
let api = self.pods();
if let Err(e) = api.delete(name, &DeleteParams::default()).await {
if let kube::Error::Api(api_err) = &e {
if api_err.code == 404 {
return;
}
}
tracing::warn!(pod = %name, namespace = %self.namespace, error = %e, "k8s delete pod failed");
}
}
pub async fn reap_stale(&self, service: &str) -> usize {
let api = self.pods();
let selector = format!(
"{}={},{}={}",
labels::MANAGED_BY,
labels::MANAGED_BY_VALUE,
labels::SERVICE,
service
);
let lp = ListParams::default().labels(&selector);
let list = match api.list(&lp).await {
Ok(l) => l,
Err(e) => {
tracing::warn!(service, error = %e, "k8s reap_stale: list pods failed");
return 0;
}
};
let mut reaped = 0usize;
for pod in list.items {
let inst = pod
.metadata
.labels
.as_ref()
.and_then(|l| l.get(labels::INSTANCE))
.map(String::as_str);
if inst == Some(self.instance_id.as_str()) {
continue;
}
if let Some(name) = pod.metadata.name {
if let Err(e) = api.delete(&name, &DeleteParams::default()).await {
tracing::warn!(pod = %name, error = %e, "k8s reap_stale: delete failed");
} else {
reaped += 1;
}
}
}
if reaped > 0 {
tracing::info!(service, reaped, "k8s reap_stale: removed orphan Pods");
}
reaped
}
pub fn network_policies(&self) -> Api<NetworkPolicy> {
Api::namespaced(self.client.clone(), &self.namespace)
}
pub async fn apply_network_policy(&self, np: &NetworkPolicy) {
let Some(name) = np.metadata.name.clone() else {
return;
};
let api = self.network_policies();
let _ = api.delete(&name, &DeleteParams::default()).await;
if let Err(e) = api.create(&PostParams::default(), np).await {
if !matches!(&e, kube::Error::Api(a) if a.code == 409) {
tracing::warn!(policy = %name, error = %e, "k8s apply NetworkPolicy failed");
}
}
}
pub async fn prune_network_policies(&self, keep: &std::collections::HashSet<String>) {
let api = self.network_policies();
let selector = format!(
"{}={},{}={}",
labels::MANAGED_BY,
labels::MANAGED_BY_VALUE,
labels::INSTANCE,
self.instance_id,
);
let lp = ListParams::default().labels(&selector);
let list = match api.list(&lp).await {
Ok(l) => l,
Err(e) => {
tracing::warn!(error = %e, "k8s prune NetworkPolicies: list failed");
return;
}
};
for np in list.items {
if let Some(name) = np.metadata.name {
if !keep.contains(&name) {
let _ = api.delete(&name, &DeleteParams::default()).await;
}
}
}
}
pub async fn cni_component_names(&self) -> Vec<String> {
const CNI_NAMESPACES: [&str; 4] =
["kube-system", "calico-system", "tigera-operator", "cilium"];
let mut names = Vec::new();
for ns in CNI_NAMESPACES {
let api: Api<Pod> = Api::namespaced(self.client.clone(), ns);
match api.list(&ListParams::default()).await {
Ok(list) => names.extend(list.items.into_iter().filter_map(|p| p.metadata.name)),
Err(e) => {
tracing::debug!(namespace = ns, error = %e, "k8s CNI detect: list pods failed");
}
}
}
names
}
}
/// Installs rustls's `ring` crypto provider as the process-wide default.
///
/// The installation is attempted at most once per process, however many
/// times this is called. If a provider was already installed by someone
/// else, the error from `install_default` is deliberately discarded.
pub fn ensure_crypto_provider() {
    static PROVIDER_INSTALLED: std::sync::Once = std::sync::Once::new();
    PROVIDER_INSTALLED.call_once(|| {
        let provider = rustls::crypto::ring::default_provider();
        // Losing the race to another installer is acceptable.
        let _ = provider.install_default();
    });
}
/// Maps the `Status` delivered at the end of an exec session to an exit code.
///
/// * no status, no `status` field, or a value other than `"Success"` /
///   `"Failure"` → `None`
/// * `"Success"` → `Some(0)`
/// * `"Failure"` → the integer in the message of the first cause whose reason
///   is `"ExitCode"`, or `Some(1)` when there is no such cause or its message
///   is not an integer
fn exit_code_from_status(status: Option<&Status>) -> Option<i32> {
    let status = status?;
    match status.status.as_deref()? {
        "Success" => Some(0),
        "Failure" => {
            let reported = status
                .details
                .iter()
                .filter_map(|details| details.causes.as_ref())
                .flatten()
                .find(|cause| cause.reason.as_deref() == Some("ExitCode"))
                .and_then(|cause| cause.message.as_deref())
                .and_then(|msg| msg.parse::<i32>().ok());
            Some(reported.unwrap_or(1))
        }
        _ => None,
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use k8s_openapi::apimachinery::pkg::apis::meta::v1::{StatusCause, StatusDetails};

    /// Builds a `Status` with the given `status` string and optional causes.
    fn status(state: &str, causes: Option<Vec<StatusCause>>) -> Status {
        Status {
            status: Some(state.to_string()),
            details: causes.map(|c| StatusDetails {
                causes: Some(c),
                ..Default::default()
            }),
            ..Default::default()
        }
    }

    /// Builds a `StatusCause` with the given reason and message.
    fn cause(reason: &str, message: &str) -> StatusCause {
        StatusCause {
            reason: Some(reason.into()),
            message: Some(message.into()),
            ..Default::default()
        }
    }

    #[test]
    fn success_status_is_exit_zero() {
        assert_eq!(
            exit_code_from_status(Some(&status("Success", None))),
            Some(0)
        );
    }

    #[test]
    fn failure_with_exit_code_cause_parses_code() {
        let causes = vec![cause("ExitCode", "137")];
        assert_eq!(
            exit_code_from_status(Some(&status("Failure", Some(causes)))),
            Some(137)
        );
    }

    #[test]
    fn failure_finds_exit_code_among_other_causes() {
        let causes = vec![cause("Other", "5"), cause("ExitCode", "2")];
        assert_eq!(
            exit_code_from_status(Some(&status("Failure", Some(causes)))),
            Some(2)
        );
    }

    #[test]
    fn failure_without_cause_defaults_to_one() {
        assert_eq!(
            exit_code_from_status(Some(&status("Failure", None))),
            Some(1)
        );
    }

    #[test]
    fn failure_with_unparseable_exit_code_defaults_to_one() {
        let causes = vec![cause("ExitCode", "not-a-number")];
        assert_eq!(
            exit_code_from_status(Some(&status("Failure", Some(causes)))),
            Some(1)
        );
    }

    #[test]
    fn failure_ignores_causes_with_other_reasons() {
        let causes = vec![cause("SomethingElse", "42")];
        assert_eq!(
            exit_code_from_status(Some(&status("Failure", Some(causes)))),
            Some(1)
        );
    }

    #[test]
    fn unrecognised_or_absent_status_is_none() {
        assert_eq!(exit_code_from_status(Some(&status("Pending", None))), None);
        assert_eq!(exit_code_from_status(Some(&Status::default())), None);
    }

    #[test]
    fn missing_status_is_none() {
        assert_eq!(exit_code_from_status(None), None);
    }

    #[test]
    fn exec_output_helpers() {
        let out = ExecOutput {
            stdout: b"hello".to_vec(),
            stderr: String::new(),
            exit_code: Some(0),
        };
        assert!(out.success());
        assert_eq!(out.stdout_str(), "hello");
    }

    #[test]
    fn exec_output_failure_and_lossy_stdout() {
        let out = ExecOutput {
            stdout: vec![0xff, b'a'],
            stderr: "boom".to_string(),
            exit_code: Some(3),
        };
        assert!(!out.success());
        assert_eq!(out.stdout_str(), "\u{FFFD}a");

        let unknown = ExecOutput {
            stdout: Vec::new(),
            stderr: String::new(),
            exit_code: None,
        };
        assert!(!unknown.success());
    }
}