Skip to main content

fakecloud_k8s/
client.rs

1//! Kube client wrapper + generic Pod lifecycle shared by every backend.
2//!
3//! Wraps a [`kube::Client`] scoped to one namespace and provides the
4//! handful of operations every FakeCloud k8s backend performs: create a
5//! Pod (idempotently, replacing any stale same-named Pod), wait for it
6//! to get an IP and become reachable, `exec` a command inside it, delete
7//! it, and reap Pods orphaned by a previous process. Service-specific
8//! Pod *construction* stays in each service; this is only the plumbing.
9
10use std::time::{Duration, Instant};
11
12use k8s_openapi::api::core::v1::Pod;
13use k8s_openapi::api::networking::v1::NetworkPolicy;
14use k8s_openapi::apimachinery::pkg::apis::meta::v1::Status;
15use kube::api::{Api, AttachParams, DeleteParams, ListParams, PostParams};
16use kube::Client;
17use tokio::io::AsyncReadExt;
18
19use crate::labels;
20
/// Errors from Pod lifecycle operations.
#[derive(Debug, thiserror::Error)]
pub enum K8sError {
    /// An error surfaced by the `kube` client. The `#[from]` conversion lets
    /// `?` turn a `kube::Error` into this variant automatically.
    #[error("kubernetes API error: {0}")]
    Kube(#[from] kube::Error),
    /// The client could not be built from the ambient configuration; carries
    /// the stringified cause.
    #[error("failed to construct Kubernetes client: {0}")]
    Connect(String),
    /// A bounded wait or retry loop gave up; carries a description of what
    /// was being waited for.
    #[error("timed out: {0}")]
    Timeout(String),
    /// Any other failure, described by a free-form message.
    #[error("{0}")]
    Other(String),
}
33
/// Captured result of running a command through [`K8sClient::exec`].
#[derive(Debug, Clone)]
pub struct ExecOutput {
    /// Raw bytes the command wrote to stdout.
    pub stdout: Vec<u8>,
    /// Everything the command wrote to stderr, lossily decoded as UTF-8.
    pub stderr: String,
    /// Exit code reported by Kubernetes: `Some(0)` for success, `Some(n)`
    /// for a non-zero exit, and `None` when no usable status was available.
    pub exit_code: Option<i32>,
}

impl ExecOutput {
    /// `true` only when an exit code was reported and that code is 0.
    pub fn success(&self) -> bool {
        matches!(self.exit_code, Some(0))
    }

    /// View stdout as text, replacing invalid UTF-8 sequences with U+FFFD.
    /// Borrows when the bytes are already valid UTF-8.
    pub fn stdout_str(&self) -> std::borrow::Cow<'_, str> {
        String::from_utf8_lossy(self.stdout.as_slice())
    }
}
58
/// A namespaced kube client plus this process's instance identity.
#[derive(Clone)]
pub struct K8sClient {
    /// Underlying `kube` client used for every API call.
    client: Client,
    /// Namespace that the namespaced API handles built by this wrapper target.
    namespace: String,
    /// Identity of this process, obtained from `labels::instance_id()` when
    /// the wrapper is constructed.
    instance_id: String,
}
66
67impl K8sClient {
68    /// Install the rustls CryptoProvider, connect a client from the
69    /// ambient config (in-cluster ServiceAccount or kubeconfig), and
70    /// scope it to `namespace`.
71    pub async fn connect(namespace: impl Into<String>) -> Result<Self, K8sError> {
72        ensure_crypto_provider();
73        let client = Client::try_default()
74            .await
75            .map_err(|e| K8sError::Connect(e.to_string()))?;
76        Ok(Self::from_client(client, namespace.into()))
77    }
78
79    /// Wrap an already-constructed client (used by tests and callers that
80    /// share a client across backends).
81    pub fn from_client(client: Client, namespace: String) -> Self {
82        Self {
83            client,
84            namespace,
85            instance_id: labels::instance_id(),
86        }
87    }
88
89    /// The namespace Pods are created in.
90    pub fn namespace(&self) -> &str {
91        &self.namespace
92    }
93
94    /// This process's instance identity (`fakecloud-<pid>`), used for the
95    /// [`labels::INSTANCE`] label.
96    pub fn instance_id(&self) -> &str {
97        &self.instance_id
98    }
99
    /// The underlying client, for callers that need raw API access.
    ///
    /// Returns a borrow of the stored client; nothing is cloned here.
    pub fn client(&self) -> &Client {
        &self.client
    }
104
105    /// Namespaced Pod API handle.
106    pub fn pods(&self) -> Api<Pod> {
107        Api::namespaced(self.client.clone(), &self.namespace)
108    }
109
110    /// Create `pod`, first deleting any stale Pod with the same name left
111    /// behind by a previous process (which would otherwise make `create`
112    /// return `409 Conflict`). Retries the create a few times while the
113    /// API server finishes deleting the old Pod.
114    pub async fn create_pod(&self, pod: &Pod) -> Result<(), K8sError> {
115        let name = pod
116            .metadata
117            .name
118            .clone()
119            .ok_or_else(|| K8sError::Other("pod spec has no metadata.name".into()))?;
120        let api = self.pods();
121        let _ = api.delete(&name, &DeleteParams::default()).await;
122        for attempt in 0..6 {
123            match api.create(&PostParams::default(), pod).await {
124                Ok(_) => return Ok(()),
125                Err(kube::Error::Api(e)) if e.code == 409 && attempt < 5 => {
126                    tokio::time::sleep(Duration::from_millis(500)).await;
127                    let _ = api.delete(&name, &DeleteParams::default()).await;
128                    continue;
129                }
130                Err(e) => return Err(K8sError::Kube(e)),
131            }
132        }
133        Err(K8sError::Timeout(format!(
134            "pod {name} could not be created after repeated 409 conflicts"
135        )))
136    }
137
138    /// Poll until the Pod has a non-empty `status.podIP` and phase
139    /// `Running`, returning the IP. Errors if the Pod reaches a terminal
140    /// phase (`Failed`/`Succeeded`) during startup or if `timeout`
141    /// elapses first.
142    pub async fn wait_for_pod_ip(&self, name: &str, timeout: Duration) -> Result<String, K8sError> {
143        let api = self.pods();
144        let deadline = Instant::now() + timeout;
145        loop {
146            let pod = api.get(name).await?;
147            let status = pod.status.as_ref();
148            let phase = status.and_then(|s| s.phase.as_deref()).unwrap_or("Unknown");
149            // Fail fast on a terminal phase regardless of whether an IP was
150            // ever assigned — a Pod that crashes before getting an IP should
151            // error immediately, not wait out the full timeout.
152            if let "Failed" | "Succeeded" = phase {
153                return Err(K8sError::Other(format!(
154                    "pod {name} reached terminal phase {phase} during startup"
155                )));
156            }
157            let ip = status
158                .and_then(|s| s.pod_ip.as_ref())
159                .filter(|s| !s.is_empty());
160            if let Some(ip) = ip {
161                if phase == "Running" {
162                    return Ok(ip.clone());
163                }
164            }
165            if Instant::now() >= deadline {
166                return Err(K8sError::Timeout(format!(
167                    "pod {name} did not become Running with a podIP within {timeout:?}"
168                )));
169            }
170            tokio::time::sleep(Duration::from_secs(1)).await;
171        }
172    }
173
174    /// TCP-handshake `ip:port` until it accepts a connection or `timeout`
175    /// elapses. A Pod being `Running` doesn't guarantee the process
176    /// inside it is listening yet, so backends follow [`wait_for_pod_ip`]
177    /// with this.
178    ///
179    /// [`wait_for_pod_ip`]: Self::wait_for_pod_ip
180    pub async fn wait_for_tcp(ip: &str, port: u16, timeout: Duration) -> Result<(), K8sError> {
181        // Bound each connect so a single hung handshake (SYN dropped, no
182        // RST) can't run past the overall deadline. The kernel's default
183        // connect timeout is tens of seconds, well beyond `timeout`.
184        const PER_ATTEMPT: Duration = Duration::from_secs(2);
185        let deadline = Instant::now() + timeout;
186        let addr = format!("{ip}:{port}");
187        loop {
188            let remaining = deadline.saturating_duration_since(Instant::now());
189            let attempt_budget = remaining.min(PER_ATTEMPT);
190            if !attempt_budget.is_zero() {
191                if let Ok(Ok(_)) =
192                    tokio::time::timeout(attempt_budget, tokio::net::TcpStream::connect(&addr))
193                        .await
194                {
195                    return Ok(());
196                }
197            }
198            if Instant::now() >= deadline {
199                return Err(K8sError::Timeout(format!(
200                    "{addr} did not accept connections within {timeout:?}"
201                )));
202            }
203            tokio::time::sleep(Duration::from_millis(500)).await;
204        }
205    }
206
207    /// Run `cmd` inside `pod` (in `container`, or the default container
208    /// when `None`) and collect stdout/stderr/exit-code. This is the k8s
209    /// equivalent of `docker exec` — used for operations like issuing
210    /// `redis-cli` commands or copying a file out of a Pod.
211    pub async fn exec(
212        &self,
213        pod: &str,
214        container: Option<&str>,
215        cmd: &[&str],
216    ) -> Result<ExecOutput, K8sError> {
217        let api = self.pods();
218        let mut ap = AttachParams::default()
219            .stdin(false)
220            .stdout(true)
221            .stderr(true);
222        if let Some(c) = container {
223            ap = ap.container(c.to_string());
224        }
225        let mut proc = api.exec(pod, cmd.iter().copied(), &ap).await?;
226
227        let mut stdout = Vec::new();
228        if let Some(mut s) = proc.stdout() {
229            s.read_to_end(&mut stdout)
230                .await
231                .map_err(|e| K8sError::Other(format!("reading exec stdout: {e}")))?;
232        }
233        let mut stderr_buf = Vec::new();
234        if let Some(mut s) = proc.stderr() {
235            // stderr being unreadable shouldn't mask a successful command.
236            let _ = s.read_to_end(&mut stderr_buf).await;
237        }
238        let status = match proc.take_status() {
239            Some(fut) => fut.await,
240            None => None,
241        };
242        // Drain the connection so the websocket closes cleanly.
243        let _ = proc.join().await;
244
245        Ok(ExecOutput {
246            stdout,
247            stderr: String::from_utf8_lossy(&stderr_buf).into_owned(),
248            exit_code: exit_code_from_status(status.as_ref()),
249        })
250    }
251
252    /// Like [`exec`](Self::exec) but writes `stdin` to the command's
253    /// standard input first (then closes it). Used for piping a SQL dump
254    /// into `psql`/`mysql` during a restore — the k8s equivalent of
255    /// `docker exec -i ... < dump`.
256    pub async fn exec_with_stdin(
257        &self,
258        pod: &str,
259        container: Option<&str>,
260        cmd: &[&str],
261        stdin: &[u8],
262    ) -> Result<ExecOutput, K8sError> {
263        use tokio::io::AsyncWriteExt;
264        let api = self.pods();
265        let mut ap = AttachParams::default()
266            .stdin(true)
267            .stdout(true)
268            .stderr(true);
269        if let Some(c) = container {
270            ap = ap.container(c.to_string());
271        }
272        let mut proc = api.exec(pod, cmd.iter().copied(), &ap).await?;
273
274        if let Some(mut w) = proc.stdin() {
275            w.write_all(stdin)
276                .await
277                .map_err(|e| K8sError::Other(format!("writing exec stdin: {e}")))?;
278            w.shutdown()
279                .await
280                .map_err(|e| K8sError::Other(format!("closing exec stdin: {e}")))?;
281        }
282
283        let mut stdout = Vec::new();
284        if let Some(mut s) = proc.stdout() {
285            s.read_to_end(&mut stdout)
286                .await
287                .map_err(|e| K8sError::Other(format!("reading exec stdout: {e}")))?;
288        }
289        let mut stderr_buf = Vec::new();
290        if let Some(mut s) = proc.stderr() {
291            let _ = s.read_to_end(&mut stderr_buf).await;
292        }
293        let status = match proc.take_status() {
294            Some(fut) => fut.await,
295            None => None,
296        };
297        let _ = proc.join().await;
298
299        Ok(ExecOutput {
300            stdout,
301            stderr: String::from_utf8_lossy(&stderr_buf).into_owned(),
302            exit_code: exit_code_from_status(status.as_ref()),
303        })
304    }
305
306    /// Fetch a Pod container's logs (the k8s equivalent of `docker logs`)
307    /// — used for log-marker readiness on engines that don't expose a
308    /// connect-based probe (Oracle / SQL Server / Db2).
309    pub async fn pod_logs(&self, pod: &str, container: Option<&str>) -> Result<String, K8sError> {
310        use kube::api::LogParams;
311        let api = self.pods();
312        let lp = LogParams {
313            container: container.map(|c| c.to_string()),
314            ..LogParams::default()
315        };
316        Ok(api.logs(pod, &lp).await?)
317    }
318
319    /// Delete a Pod by name. Idempotent — a `404` (already gone) is
320    /// treated as success; other errors are logged but not returned,
321    /// since teardown is best-effort.
322    pub async fn delete_pod(&self, name: &str) {
323        let api = self.pods();
324        if let Err(e) = api.delete(name, &DeleteParams::default()).await {
325            if let kube::Error::Api(api_err) = &e {
326                if api_err.code == 404 {
327                    return;
328                }
329            }
330            tracing::warn!(pod = %name, namespace = %self.namespace, error = %e, "k8s delete pod failed");
331        }
332    }
333
334    /// Delete Pods of the given `service` left behind by a *different*
335    /// process. Lists Pods labelled with both [`labels::MANAGED_BY`] and
336    /// the `service` value, and deletes those whose [`labels::INSTANCE`]
337    /// differs from this process's. Mirrors the Docker reaper so a
338    /// restart doesn't leak the previous run's Pods. Returns the count
339    /// reaped.
340    pub async fn reap_stale(&self, service: &str) -> usize {
341        let api = self.pods();
342        let selector = format!(
343            "{}={},{}={}",
344            labels::MANAGED_BY,
345            labels::MANAGED_BY_VALUE,
346            labels::SERVICE,
347            service
348        );
349        let lp = ListParams::default().labels(&selector);
350        let list = match api.list(&lp).await {
351            Ok(l) => l,
352            Err(e) => {
353                tracing::warn!(service, error = %e, "k8s reap_stale: list pods failed");
354                return 0;
355            }
356        };
357        let mut reaped = 0usize;
358        for pod in list.items {
359            let inst = pod
360                .metadata
361                .labels
362                .as_ref()
363                .and_then(|l| l.get(labels::INSTANCE))
364                .map(String::as_str);
365            if inst == Some(self.instance_id.as_str()) {
366                continue;
367            }
368            if let Some(name) = pod.metadata.name {
369                if let Err(e) = api.delete(&name, &DeleteParams::default()).await {
370                    tracing::warn!(pod = %name, error = %e, "k8s reap_stale: delete failed");
371                } else {
372                    reaped += 1;
373                }
374            }
375        }
376        if reaped > 0 {
377            tracing::info!(service, reaped, "k8s reap_stale: removed orphan Pods");
378        }
379        reaped
380    }
381
382    /// Namespaced NetworkPolicy API handle.
383    pub fn network_policies(&self) -> Api<NetworkPolicy> {
384        Api::namespaced(self.client.clone(), &self.namespace)
385    }
386
387    /// Create or replace a NetworkPolicy (delete-then-create, like
388    /// [`create_pod`](Self::create_pod), so a re-apply with changed rules
389    /// always lands). Best-effort: errors are logged, not propagated, since a
390    /// failed policy apply must never fail the originating EC2 API call.
391    pub async fn apply_network_policy(&self, np: &NetworkPolicy) {
392        let Some(name) = np.metadata.name.clone() else {
393            return;
394        };
395        let api = self.network_policies();
396        let _ = api.delete(&name, &DeleteParams::default()).await;
397        if let Err(e) = api.create(&PostParams::default(), np).await {
398            // A concurrent re-apply may have recreated it; a 409 is benign.
399            if !matches!(&e, kube::Error::Api(a) if a.code == 409) {
400                tracing::warn!(policy = %name, error = %e, "k8s apply NetworkPolicy failed");
401            }
402        }
403    }
404
405    /// Delete every NetworkPolicy owned by this process (managed-by + this
406    /// instance label) whose name is not in `keep`. Prunes policies for
407    /// instances that have since terminated. Best-effort.
408    pub async fn prune_network_policies(&self, keep: &std::collections::HashSet<String>) {
409        let api = self.network_policies();
410        let selector = format!(
411            "{}={},{}={}",
412            labels::MANAGED_BY,
413            labels::MANAGED_BY_VALUE,
414            labels::INSTANCE,
415            self.instance_id,
416        );
417        let lp = ListParams::default().labels(&selector);
418        let list = match api.list(&lp).await {
419            Ok(l) => l,
420            Err(e) => {
421                tracing::warn!(error = %e, "k8s prune NetworkPolicies: list failed");
422                return;
423            }
424        };
425        for np in list.items {
426            if let Some(name) = np.metadata.name {
427                if !keep.contains(&name) {
428                    let _ = api.delete(&name, &DeleteParams::default()).await;
429                }
430            }
431        }
432    }
433
434    /// Best-effort detection of the cluster CNI from Pod names across the
435    /// namespaces CNIs commonly install into (e.g. `calico-node-*`, `cilium-*`,
436    /// `kindnet-*`). Returns the matched component names; the caller maps them
437    /// to a driver. An empty result (lists failed or no recognizable CNI) maps
438    /// to "unknown".
439    ///
440    /// Scans `kube-system` plus the operator namespaces Calico/Cilium use
441    /// (`calico-system`, `tigera-operator`, `cilium`) so a Tigera-operator or
442    /// dedicated-namespace install isn't mis-reported as non-enforcing
443    /// (bug-hunt 2026-06-18 finding 1.6). Per-namespace list errors (RBAC /
444    /// absent namespace) are swallowed.
445    pub async fn cni_component_names(&self) -> Vec<String> {
446        const CNI_NAMESPACES: [&str; 4] =
447            ["kube-system", "calico-system", "tigera-operator", "cilium"];
448        let mut names = Vec::new();
449        for ns in CNI_NAMESPACES {
450            let api: Api<Pod> = Api::namespaced(self.client.clone(), ns);
451            match api.list(&ListParams::default()).await {
452                Ok(list) => names.extend(list.items.into_iter().filter_map(|p| p.metadata.name)),
453                Err(e) => {
454                    tracing::debug!(namespace = ns, error = %e, "k8s CNI detect: list pods failed");
455                }
456            }
457        }
458        names
459    }
460}
461
462/// Install rustls' `ring` CryptoProvider once per process. Rustls 0.23
463/// dropped the implicit default and every TLS connection panics until
464/// one is installed; kube's `rustls-tls` feature doesn't pull one in on
465/// our behalf. Safe to call concurrently and repeatedly — the `.ok()`
466/// swallows the "already installed" error.
467pub fn ensure_crypto_provider() {
468    use std::sync::Once;
469    static INIT: Once = Once::new();
470    INIT.call_once(|| {
471        let _ = rustls::crypto::ring::default_provider().install_default();
472    });
473}
474
475/// Derive an exit code from the terminal `Status` Kubernetes returns for
476/// an `exec`. `Success` -> 0; a `Failure` carries the real code in
477/// `details.causes[reason=ExitCode].message`, falling back to 1 when the
478/// cause is absent.
479fn exit_code_from_status(status: Option<&Status>) -> Option<i32> {
480    let status = status?;
481    match status.status.as_deref() {
482        Some("Success") => Some(0),
483        Some("Failure") => status
484            .details
485            .as_ref()
486            .and_then(|d| d.causes.as_ref())
487            .and_then(|causes| {
488                causes
489                    .iter()
490                    .find(|c| c.reason.as_deref() == Some("ExitCode"))
491            })
492            .and_then(|c| c.message.as_ref())
493            .and_then(|m| m.parse::<i32>().ok())
494            .or(Some(1)),
495        _ => None,
496    }
497}
498
#[cfg(test)]
mod tests {
    use super::*;
    use k8s_openapi::apimachinery::pkg::apis::meta::v1::{StatusCause, StatusDetails};

    /// Build a `Status` with the given state string and optional causes;
    /// every other field takes its default.
    fn status(state: &str, causes: Option<Vec<StatusCause>>) -> Status {
        let details = causes.map(|list| StatusDetails {
            causes: Some(list),
            ..Default::default()
        });
        Status {
            status: Some(state.to_owned()),
            details,
            ..Default::default()
        }
    }

    #[test]
    fn success_status_is_exit_zero() {
        let st = status("Success", None);
        assert_eq!(exit_code_from_status(Some(&st)), Some(0));
    }

    #[test]
    fn failure_with_exit_code_cause_parses_code() {
        let cause = StatusCause {
            reason: Some("ExitCode".into()),
            message: Some("137".into()),
            ..Default::default()
        };
        let st = status("Failure", Some(vec![cause]));
        assert_eq!(exit_code_from_status(Some(&st)), Some(137));
    }

    #[test]
    fn failure_without_cause_defaults_to_one() {
        let st = status("Failure", None);
        assert_eq!(exit_code_from_status(Some(&st)), Some(1));
    }

    #[test]
    fn missing_status_is_none() {
        assert_eq!(exit_code_from_status(None), None);
    }

    #[test]
    fn exec_output_helpers() {
        let out = ExecOutput {
            stdout: Vec::from(&b"hello"[..]),
            stderr: String::new(),
            exit_code: Some(0),
        };
        assert!(out.success());
        assert_eq!(out.stdout_str(), "hello");
    }
}
559}