1use std::time::{Duration, Instant};
11
12use k8s_openapi::api::core::v1::Pod;
13use k8s_openapi::api::networking::v1::NetworkPolicy;
14use k8s_openapi::apimachinery::pkg::apis::meta::v1::Status;
15use kube::api::{Api, AttachParams, DeleteParams, ListParams, PostParams};
16use kube::Client;
17use tokio::io::AsyncReadExt;
18
19use crate::labels;
20
/// Errors produced by the Kubernetes helpers in this module.
#[derive(Debug, thiserror::Error)]
pub enum K8sError {
    /// An error reported by the `kube` client; `#[from]` lets `?` convert a
    /// `kube::Error` into this variant automatically.
    #[error("kubernetes API error: {0}")]
    Kube(#[from] kube::Error),
    /// The Kubernetes client could not be constructed; carries the
    /// stringified cause.
    #[error("failed to construct Kubernetes client: {0}")]
    Connect(String),
    /// An operation gave up waiting; carries a description of what was
    /// being waited for.
    #[error("timed out: {0}")]
    Timeout(String),
    /// Any other failure, described by a free-form message that is shown
    /// verbatim.
    #[error("{0}")]
    Other(String),
}
33
/// Captured result of a command executed inside a container.
#[derive(Debug, Clone)]
pub struct ExecOutput {
    /// Raw bytes the command wrote to stdout.
    pub stdout: Vec<u8>,
    /// Text the command wrote to stderr.
    pub stderr: String,
    /// Exit code of the command, or `None` when it could not be determined.
    pub exit_code: Option<i32>,
}

impl ExecOutput {
    /// Returns `true` only when the exit code is known and equal to zero.
    pub fn success(&self) -> bool {
        matches!(self.exit_code, Some(0))
    }

    /// Views stdout as text, replacing invalid UTF-8 sequences with U+FFFD.
    ///
    /// Borrows when the bytes are already valid UTF-8 and allocates otherwise.
    pub fn stdout_str(&self) -> std::borrow::Cow<'_, str> {
        let bytes: &[u8] = self.stdout.as_slice();
        String::from_utf8_lossy(bytes)
    }
}
58
/// Handle that bundles a `kube` client with the namespace it operates in and
/// an instance id.
#[derive(Clone)]
pub struct K8sClient {
    /// Underlying `kube` API client.
    client: Client,
    /// Namespace used for the namespaced `Api` handles this type builds.
    namespace: String,
    /// Value obtained from `labels::instance_id()` at construction. It is
    /// compared against the instance label when reaping Pods and used in the
    /// label selector when pruning NetworkPolicies.
    instance_id: String,
}
66
67impl K8sClient {
68 pub async fn connect(namespace: impl Into<String>) -> Result<Self, K8sError> {
72 ensure_crypto_provider();
73 let client = Client::try_default()
74 .await
75 .map_err(|e| K8sError::Connect(e.to_string()))?;
76 Ok(Self::from_client(client, namespace.into()))
77 }
78
79 pub fn from_client(client: Client, namespace: String) -> Self {
82 Self {
83 client,
84 namespace,
85 instance_id: labels::instance_id(),
86 }
87 }
88
89 pub fn namespace(&self) -> &str {
91 &self.namespace
92 }
93
94 pub fn instance_id(&self) -> &str {
97 &self.instance_id
98 }
99
100 pub fn client(&self) -> &Client {
102 &self.client
103 }
104
105 pub fn pods(&self) -> Api<Pod> {
107 Api::namespaced(self.client.clone(), &self.namespace)
108 }
109
110 pub async fn create_pod(&self, pod: &Pod) -> Result<(), K8sError> {
115 let name = pod
116 .metadata
117 .name
118 .clone()
119 .ok_or_else(|| K8sError::Other("pod spec has no metadata.name".into()))?;
120 let api = self.pods();
121 let _ = api.delete(&name, &DeleteParams::default()).await;
122 for attempt in 0..6 {
123 match api.create(&PostParams::default(), pod).await {
124 Ok(_) => return Ok(()),
125 Err(kube::Error::Api(e)) if e.code == 409 && attempt < 5 => {
126 tokio::time::sleep(Duration::from_millis(500)).await;
127 let _ = api.delete(&name, &DeleteParams::default()).await;
128 continue;
129 }
130 Err(e) => return Err(K8sError::Kube(e)),
131 }
132 }
133 Err(K8sError::Timeout(format!(
134 "pod {name} could not be created after repeated 409 conflicts"
135 )))
136 }
137
138 pub async fn wait_for_pod_ip(&self, name: &str, timeout: Duration) -> Result<String, K8sError> {
143 let api = self.pods();
144 let deadline = Instant::now() + timeout;
145 loop {
146 let pod = api.get(name).await?;
147 let status = pod.status.as_ref();
148 let phase = status.and_then(|s| s.phase.as_deref()).unwrap_or("Unknown");
149 if let "Failed" | "Succeeded" = phase {
153 return Err(K8sError::Other(format!(
154 "pod {name} reached terminal phase {phase} during startup"
155 )));
156 }
157 let ip = status
158 .and_then(|s| s.pod_ip.as_ref())
159 .filter(|s| !s.is_empty());
160 if let Some(ip) = ip {
161 if phase == "Running" {
162 return Ok(ip.clone());
163 }
164 }
165 if Instant::now() >= deadline {
166 return Err(K8sError::Timeout(format!(
167 "pod {name} did not become Running with a podIP within {timeout:?}"
168 )));
169 }
170 tokio::time::sleep(Duration::from_secs(1)).await;
171 }
172 }
173
174 pub async fn wait_for_tcp(ip: &str, port: u16, timeout: Duration) -> Result<(), K8sError> {
181 const PER_ATTEMPT: Duration = Duration::from_secs(2);
185 let deadline = Instant::now() + timeout;
186 let addr = format!("{ip}:{port}");
187 loop {
188 let remaining = deadline.saturating_duration_since(Instant::now());
189 let attempt_budget = remaining.min(PER_ATTEMPT);
190 if !attempt_budget.is_zero() {
191 if let Ok(Ok(_)) =
192 tokio::time::timeout(attempt_budget, tokio::net::TcpStream::connect(&addr))
193 .await
194 {
195 return Ok(());
196 }
197 }
198 if Instant::now() >= deadline {
199 return Err(K8sError::Timeout(format!(
200 "{addr} did not accept connections within {timeout:?}"
201 )));
202 }
203 tokio::time::sleep(Duration::from_millis(500)).await;
204 }
205 }
206
207 pub async fn exec(
212 &self,
213 pod: &str,
214 container: Option<&str>,
215 cmd: &[&str],
216 ) -> Result<ExecOutput, K8sError> {
217 let api = self.pods();
218 let mut ap = AttachParams::default()
219 .stdin(false)
220 .stdout(true)
221 .stderr(true);
222 if let Some(c) = container {
223 ap = ap.container(c.to_string());
224 }
225 let mut proc = api.exec(pod, cmd.iter().copied(), &ap).await?;
226
227 let mut stdout = Vec::new();
228 if let Some(mut s) = proc.stdout() {
229 s.read_to_end(&mut stdout)
230 .await
231 .map_err(|e| K8sError::Other(format!("reading exec stdout: {e}")))?;
232 }
233 let mut stderr_buf = Vec::new();
234 if let Some(mut s) = proc.stderr() {
235 let _ = s.read_to_end(&mut stderr_buf).await;
237 }
238 let status = match proc.take_status() {
239 Some(fut) => fut.await,
240 None => None,
241 };
242 let _ = proc.join().await;
244
245 Ok(ExecOutput {
246 stdout,
247 stderr: String::from_utf8_lossy(&stderr_buf).into_owned(),
248 exit_code: exit_code_from_status(status.as_ref()),
249 })
250 }
251
252 pub async fn exec_with_stdin(
257 &self,
258 pod: &str,
259 container: Option<&str>,
260 cmd: &[&str],
261 stdin: &[u8],
262 ) -> Result<ExecOutput, K8sError> {
263 use tokio::io::AsyncWriteExt;
264 let api = self.pods();
265 let mut ap = AttachParams::default()
266 .stdin(true)
267 .stdout(true)
268 .stderr(true);
269 if let Some(c) = container {
270 ap = ap.container(c.to_string());
271 }
272 let mut proc = api.exec(pod, cmd.iter().copied(), &ap).await?;
273
274 if let Some(mut w) = proc.stdin() {
275 w.write_all(stdin)
276 .await
277 .map_err(|e| K8sError::Other(format!("writing exec stdin: {e}")))?;
278 w.shutdown()
279 .await
280 .map_err(|e| K8sError::Other(format!("closing exec stdin: {e}")))?;
281 }
282
283 let mut stdout = Vec::new();
284 if let Some(mut s) = proc.stdout() {
285 s.read_to_end(&mut stdout)
286 .await
287 .map_err(|e| K8sError::Other(format!("reading exec stdout: {e}")))?;
288 }
289 let mut stderr_buf = Vec::new();
290 if let Some(mut s) = proc.stderr() {
291 let _ = s.read_to_end(&mut stderr_buf).await;
292 }
293 let status = match proc.take_status() {
294 Some(fut) => fut.await,
295 None => None,
296 };
297 let _ = proc.join().await;
298
299 Ok(ExecOutput {
300 stdout,
301 stderr: String::from_utf8_lossy(&stderr_buf).into_owned(),
302 exit_code: exit_code_from_status(status.as_ref()),
303 })
304 }
305
306 pub async fn pod_logs(&self, pod: &str, container: Option<&str>) -> Result<String, K8sError> {
310 use kube::api::LogParams;
311 let api = self.pods();
312 let lp = LogParams {
313 container: container.map(|c| c.to_string()),
314 ..LogParams::default()
315 };
316 Ok(api.logs(pod, &lp).await?)
317 }
318
319 pub async fn delete_pod(&self, name: &str) {
323 let api = self.pods();
324 if let Err(e) = api.delete(name, &DeleteParams::default()).await {
325 if let kube::Error::Api(api_err) = &e {
326 if api_err.code == 404 {
327 return;
328 }
329 }
330 tracing::warn!(pod = %name, namespace = %self.namespace, error = %e, "k8s delete pod failed");
331 }
332 }
333
334 pub async fn reap_stale(&self, service: &str) -> usize {
341 let api = self.pods();
342 let selector = format!(
343 "{}={},{}={}",
344 labels::MANAGED_BY,
345 labels::MANAGED_BY_VALUE,
346 labels::SERVICE,
347 service
348 );
349 let lp = ListParams::default().labels(&selector);
350 let list = match api.list(&lp).await {
351 Ok(l) => l,
352 Err(e) => {
353 tracing::warn!(service, error = %e, "k8s reap_stale: list pods failed");
354 return 0;
355 }
356 };
357 let mut reaped = 0usize;
358 for pod in list.items {
359 let inst = pod
360 .metadata
361 .labels
362 .as_ref()
363 .and_then(|l| l.get(labels::INSTANCE))
364 .map(String::as_str);
365 if inst == Some(self.instance_id.as_str()) {
366 continue;
367 }
368 if let Some(name) = pod.metadata.name {
369 if let Err(e) = api.delete(&name, &DeleteParams::default()).await {
370 tracing::warn!(pod = %name, error = %e, "k8s reap_stale: delete failed");
371 } else {
372 reaped += 1;
373 }
374 }
375 }
376 if reaped > 0 {
377 tracing::info!(service, reaped, "k8s reap_stale: removed orphan Pods");
378 }
379 reaped
380 }
381
382 pub fn network_policies(&self) -> Api<NetworkPolicy> {
384 Api::namespaced(self.client.clone(), &self.namespace)
385 }
386
387 pub async fn apply_network_policy(&self, np: &NetworkPolicy) {
392 let Some(name) = np.metadata.name.clone() else {
393 return;
394 };
395 let api = self.network_policies();
396 let _ = api.delete(&name, &DeleteParams::default()).await;
397 if let Err(e) = api.create(&PostParams::default(), np).await {
398 if !matches!(&e, kube::Error::Api(a) if a.code == 409) {
400 tracing::warn!(policy = %name, error = %e, "k8s apply NetworkPolicy failed");
401 }
402 }
403 }
404
405 pub async fn prune_network_policies(&self, keep: &std::collections::HashSet<String>) {
409 let api = self.network_policies();
410 let selector = format!(
411 "{}={},{}={}",
412 labels::MANAGED_BY,
413 labels::MANAGED_BY_VALUE,
414 labels::INSTANCE,
415 self.instance_id,
416 );
417 let lp = ListParams::default().labels(&selector);
418 let list = match api.list(&lp).await {
419 Ok(l) => l,
420 Err(e) => {
421 tracing::warn!(error = %e, "k8s prune NetworkPolicies: list failed");
422 return;
423 }
424 };
425 for np in list.items {
426 if let Some(name) = np.metadata.name {
427 if !keep.contains(&name) {
428 let _ = api.delete(&name, &DeleteParams::default()).await;
429 }
430 }
431 }
432 }
433
434 pub async fn cni_component_names(&self) -> Vec<String> {
446 const CNI_NAMESPACES: [&str; 4] =
447 ["kube-system", "calico-system", "tigera-operator", "cilium"];
448 let mut names = Vec::new();
449 for ns in CNI_NAMESPACES {
450 let api: Api<Pod> = Api::namespaced(self.client.clone(), ns);
451 match api.list(&ListParams::default()).await {
452 Ok(list) => names.extend(list.items.into_iter().filter_map(|p| p.metadata.name)),
453 Err(e) => {
454 tracing::debug!(namespace = ns, error = %e, "k8s CNI detect: list pods failed");
455 }
456 }
457 }
458 names
459 }
460}
461
462pub fn ensure_crypto_provider() {
468 use std::sync::Once;
469 static INIT: Once = Once::new();
470 INIT.call_once(|| {
471 let _ = rustls::crypto::ring::default_provider().install_default();
472 });
473}
474
475fn exit_code_from_status(status: Option<&Status>) -> Option<i32> {
480 let status = status?;
481 match status.status.as_deref() {
482 Some("Success") => Some(0),
483 Some("Failure") => status
484 .details
485 .as_ref()
486 .and_then(|d| d.causes.as_ref())
487 .and_then(|causes| {
488 causes
489 .iter()
490 .find(|c| c.reason.as_deref() == Some("ExitCode"))
491 })
492 .and_then(|c| c.message.as_ref())
493 .and_then(|m| m.parse::<i32>().ok())
494 .or(Some(1)),
495 _ => None,
496 }
497}
498
#[cfg(test)]
mod tests {
    use super::*;
    use k8s_openapi::apimachinery::pkg::apis::meta::v1::{StatusCause, StatusDetails};

    /// Builds a `Status` with the given state string and, when `causes` is
    /// provided, a details section carrying those causes.
    fn status(state: &str, causes: Option<Vec<StatusCause>>) -> Status {
        let details = causes.map(|list| StatusDetails {
            causes: Some(list),
            ..Default::default()
        });
        Status {
            status: Some(state.to_string()),
            details,
            ..Default::default()
        }
    }

    #[test]
    fn success_status_is_exit_zero() {
        let st = status("Success", None);
        assert_eq!(exit_code_from_status(Some(&st)), Some(0));
    }

    #[test]
    fn failure_with_exit_code_cause_parses_code() {
        let exit_cause = StatusCause {
            reason: Some("ExitCode".into()),
            message: Some("137".into()),
            ..Default::default()
        };
        let st = status("Failure", Some(vec![exit_cause]));
        assert_eq!(exit_code_from_status(Some(&st)), Some(137));
    }

    #[test]
    fn failure_without_cause_defaults_to_one() {
        let st = status("Failure", None);
        assert_eq!(exit_code_from_status(Some(&st)), Some(1));
    }

    #[test]
    fn missing_status_is_none() {
        let code = exit_code_from_status(None);
        assert_eq!(code, None);
    }

    #[test]
    fn exec_output_helpers() {
        let out = ExecOutput {
            stdout: Vec::from(&b"hello"[..]),
            stderr: String::new(),
            exit_code: Some(0),
        };
        assert!(out.success());
        assert_eq!(out.stdout_str(), "hello");
    }
}