Skip to main content

fakecloud_lambda/runtime/k8s/
mod.rs

1//! Kubernetes [`LambdaBackend`] implementation.
2//!
3//! Spawns Lambda function runtimes as native Pods in a Kubernetes
4//! cluster instead of as Docker containers. Gated by
5//! `FAKECLOUD_LAMBDA_BACKEND=k8s` on the fakecloud server.
6//!
7//! See `website/content/docs/guides/kubernetes-backend.md` for the
8//! operator-facing setup (ServiceAccount, RBAC, Deployment yaml).
9
10pub mod spec;
11
12use std::time::Duration;
13
14use async_trait::async_trait;
15use k8s_openapi::api::core::v1::Pod;
16use kube::api::{Api, DeleteParams, ListParams, PostParams};
17use kube::Client;
18
19use super::backend::{BackendHandle, LambdaBackend, RuntimeError, WarmInstance};
20use crate::state::LambdaFunction;
21use spec::{build_pod_spec, pod_name_for, PodSpecContext};
22
23/// Errors that can prevent the K8s backend from initializing. Surfaced
24/// to the operator at fakecloud startup; never silently swallowed.
25#[derive(Debug, thiserror::Error)]
26pub enum K8sBackendError {
27    #[error("FAKECLOUD_K8S_SELF_URL must be set when FAKECLOUD_LAMBDA_BACKEND=k8s")]
28    MissingSelfUrl,
29    #[error("FAKECLOUD_K8S_SELF_URL is not a valid URL: {0}")]
30    InvalidSelfUrl(String),
31    #[error("failed to construct Kubernetes client: {0}")]
32    Client(#[from] kube::Error),
33    #[error("failed to read kubeconfig / in-cluster config: {0}")]
34    Config(String),
35}
36
37/// Native Kubernetes Lambda execution backend.
38pub struct K8sBackend {
39    client: Client,
40    namespace: String,
41    instance_id: String,
42    /// In-cluster URL of the fakecloud server (e.g.
43    /// `http://fakecloud.fakecloud.svc.cluster.local:4566`). Init
44    /// containers fetch code + layers from this host.
45    self_url: String,
46    /// Just the host part of `self_url` — used to rewrite localhost env
47    /// values so user code can talk to fakecloud from inside the Pod.
48    self_host: String,
49    /// Host:port for the fakecloud ECR endpoint (defaults to the host
50    /// of `self_url` when `FAKECLOUD_K8S_ECR_URL` is unset).
51    ecr_host: String,
52    ecr_port: u16,
53    /// Bearer token the init container presents when fetching code +
54    /// layers. Generated at server startup, kept in process memory only.
55    internal_token: String,
56    /// Optional `imagePullSecrets` reference for image-package functions
57    /// that pull from a registry needing credentials.
58    pull_secret: Option<String>,
59}
60
61impl K8sBackend {
62    /// Read configuration from env vars and connect to the cluster.
63    /// Fails fast on missing required config — never silently degrades.
64    /// `default_ecr_port` is fakecloud's bound port; used as the ECR
65    /// port when `FAKECLOUD_K8S_ECR_URL` is unset.
66    pub async fn from_env(
67        default_ecr_port: u16,
68        internal_token: String,
69    ) -> Result<Self, K8sBackendError> {
70        ensure_crypto_provider();
71        let self_url =
72            std::env::var("FAKECLOUD_K8S_SELF_URL").map_err(|_| K8sBackendError::MissingSelfUrl)?;
73        let parsed = reqwest::Url::parse(&self_url)
74            .map_err(|e| K8sBackendError::InvalidSelfUrl(e.to_string()))?;
75        let self_host = parsed
76            .host_str()
77            .ok_or_else(|| K8sBackendError::InvalidSelfUrl("missing host".into()))?
78            .to_string();
79        let self_port = parsed.port_or_known_default().unwrap_or(default_ecr_port);
80
81        let (ecr_host, ecr_port) = match std::env::var("FAKECLOUD_K8S_ECR_URL").ok() {
82            Some(raw) => {
83                let u = reqwest::Url::parse(&raw)
84                    .map_err(|e| K8sBackendError::InvalidSelfUrl(e.to_string()))?;
85                let h = u
86                    .host_str()
87                    .ok_or_else(|| K8sBackendError::InvalidSelfUrl("ECR url missing host".into()))?
88                    .to_string();
89                let p = u.port_or_known_default().unwrap_or(default_ecr_port);
90                (h, p)
91            }
92            None => (self_host.clone(), self_port),
93        };
94
95        let namespace =
96            std::env::var("FAKECLOUD_K8S_NAMESPACE").unwrap_or_else(|_| "default".to_string());
97        let pull_secret = std::env::var("FAKECLOUD_K8S_PULL_SECRET").ok();
98
99        let client = Client::try_default()
100            .await
101            .map_err(|e| K8sBackendError::Config(e.to_string()))?;
102
103        let instance_id = format!("fakecloud-{}", std::process::id());
104
105        tracing::info!(
106            namespace = %namespace,
107            self_url = %self_url,
108            ecr = %format!("{ecr_host}:{ecr_port}"),
109            "K8s Lambda backend initialized"
110        );
111
112        Ok(Self {
113            client,
114            namespace,
115            instance_id,
116            self_url,
117            self_host,
118            ecr_host,
119            ecr_port,
120            internal_token,
121            pull_secret,
122        })
123    }
124
125    fn pods_api(&self) -> Api<Pod> {
126        Api::namespaced(self.client.clone(), &self.namespace)
127    }
128
129    /// Watch the pod until it has a non-empty `status.podIP`. Polled
130    /// rather than streamed — simpler, and the polling cadence (1s) is
131    /// well below the typical 5-30s pod boot time for an RIE image.
132    async fn wait_for_pod_ip(&self, pod_name: &str) -> Result<String, RuntimeError> {
133        let api = self.pods_api();
134        let deadline = std::time::Instant::now() + Duration::from_secs(60);
135        loop {
136            let pod = api.get(pod_name).await.map_err(|e| {
137                RuntimeError::ContainerStartFailed(format!("k8s get pod {pod_name}: {e}"))
138            })?;
139            if let Some(ip) = pod
140                .status
141                .as_ref()
142                .and_then(|s| s.pod_ip.as_ref())
143                .filter(|s| !s.is_empty())
144            {
145                // Pod might have IP but not yet be Running — check phase
146                let phase = pod
147                    .status
148                    .as_ref()
149                    .and_then(|s| s.phase.as_deref())
150                    .unwrap_or("Unknown");
151                if phase == "Running" {
152                    return Ok(ip.clone());
153                }
154                if phase == "Failed" || phase == "Succeeded" {
155                    return Err(RuntimeError::ContainerStartFailed(format!(
156                        "pod {pod_name} reached terminal phase {phase} during startup"
157                    )));
158                }
159            }
160            if std::time::Instant::now() >= deadline {
161                return Err(RuntimeError::ContainerStartFailed(format!(
162                    "pod {pod_name} did not become Running with podIP within 60s"
163                )));
164            }
165            tokio::time::sleep(Duration::from_secs(1)).await;
166        }
167    }
168
169    /// TCP-handshake the RIE port. Pod-Running doesn't guarantee the
170    /// RIE inside the main container is listening yet — same logic as
171    /// Docker's `wait_for_ready`.
172    async fn wait_for_rie_ready(&self, pod_ip: &str) -> Result<(), RuntimeError> {
173        for _ in 0..20 {
174            tokio::time::sleep(Duration::from_millis(500)).await;
175            if tokio::net::TcpStream::connect(format!("{pod_ip}:8080"))
176                .await
177                .is_ok()
178            {
179                return Ok(());
180            }
181        }
182        Err(RuntimeError::ContainerStartFailed(format!(
183            "RIE on {pod_ip}:8080 did not accept connections within 10s"
184        )))
185    }
186}
187
188/// Install rustls' `ring` CryptoProvider once per process. Rustls
189/// 0.23 dropped the implicit default and every TLS connection now
190/// panics until something installs one. Kube's `rustls-tls` feature
191/// doesn't pull in a provider on our behalf, so we do it here. Safe
192/// to call concurrently; the `.ok()` swallows the "already installed"
193/// error in case another component (a different test, a future
194/// service) beat us to it.
195fn ensure_crypto_provider() {
196    use std::sync::Once;
197    static INIT: Once = Once::new();
198    INIT.call_once(|| {
199        let _ = rustls::crypto::ring::default_provider().install_default();
200    });
201}
202
/// Extract the account ID from a function ARN
/// (`arn:aws:lambda:<region>:<account>:function:<name>[:<qual>]`).
///
/// Returns the fifth `:`-separated segment. Falls back to the placeholder
/// account `000000000000` when that segment is missing (not an ARN) or
/// empty (e.g. `arn:aws:lambda:us-east-1::function:f`). An empty account
/// ID would otherwise flow into the Pod spec.
fn account_id_from_arn(arn: &str) -> &str {
    const DEFAULT_ACCOUNT_ID: &str = "000000000000";
    arn.split(':')
        .nth(4)
        .filter(|account| !account.is_empty())
        .unwrap_or(DEFAULT_ACCOUNT_ID)
}
208
209#[async_trait]
210impl LambdaBackend for K8sBackend {
211    fn name(&self) -> &str {
212        "kubernetes"
213    }
214
215    async fn launch(
216        &self,
217        func: &LambdaFunction,
218        _code_zip: Option<&[u8]>,
219        _layers: &[Vec<u8>],
220        deploy_id: &str,
221    ) -> Result<WarmInstance, RuntimeError> {
222        let account_id = account_id_from_arn(&func.function_arn);
223        let ctx = PodSpecContext {
224            instance_id: &self.instance_id,
225            namespace: &self.namespace,
226            self_url: &self.self_url,
227            self_host: &self.self_host,
228            ecr_host: &self.ecr_host,
229            ecr_port: self.ecr_port,
230            internal_token: &self.internal_token,
231            account_id,
232            pull_secret: self.pull_secret.as_deref(),
233        };
234        let pod =
235            build_pod_spec(func, deploy_id, &ctx).map_err(RuntimeError::ContainerStartFailed)?;
236        let pod_name = pod
237            .metadata
238            .name
239            .clone()
240            .unwrap_or_else(|| pod_name_for(&func.function_name, deploy_id));
241
242        let api = self.pods_api();
243
244        // Best-effort delete of any stale pod with the same name (e.g.
245        // a previous fakecloud process left it behind). `Created` from
246        // a stale pod would otherwise come back as `Conflict (409)`.
247        let _ = api.delete(&pod_name, &DeleteParams::default()).await;
248        // Give the API server a moment to actually delete; otherwise
249        // create can race and 409. Polling for absence would be more
250        // correct — keep simple for now and retry create on 409.
251        for attempt in 0..6 {
252            match api.create(&PostParams::default(), &pod).await {
253                Ok(_) => break,
254                Err(kube::Error::Api(e)) if e.code == 409 && attempt < 5 => {
255                    tokio::time::sleep(Duration::from_millis(500)).await;
256                    let _ = api.delete(&pod_name, &DeleteParams::default()).await;
257                    continue;
258                }
259                Err(e) => {
260                    return Err(RuntimeError::ContainerStartFailed(format!(
261                        "k8s create pod {pod_name}: {e}"
262                    )));
263                }
264            }
265        }
266
267        let pod_ip = self.wait_for_pod_ip(&pod_name).await.inspect_err(|_| {
268            let api = self.pods_api();
269            let name = pod_name.clone();
270            tokio::spawn(async move {
271                let _ = api.delete(&name, &DeleteParams::default()).await;
272            });
273        })?;
274        self.wait_for_rie_ready(&pod_ip).await.inspect_err(|_| {
275            let api = self.pods_api();
276            let name = pod_name.clone();
277            tokio::spawn(async move {
278                let _ = api.delete(&name, &DeleteParams::default()).await;
279            });
280        })?;
281
282        tracing::info!(
283            function = %func.function_name,
284            pod = %pod_name,
285            namespace = %self.namespace,
286            pod_ip = %pod_ip,
287            "Lambda Pod started"
288        );
289
290        Ok(WarmInstance {
291            endpoint: format!("{pod_ip}:8080"),
292            handle: BackendHandle::Pod {
293                namespace: self.namespace.clone(),
294                name: pod_name,
295            },
296        })
297    }
298
299    async fn terminate(&self, handle: &BackendHandle) {
300        let (ns, name) = match handle {
301            BackendHandle::Pod { namespace, name } => (namespace.clone(), name.clone()),
302            // Docker handles aren't ours to manage — defensive no-op.
303            BackendHandle::Container { .. } => return,
304        };
305        let api: Api<Pod> = Api::namespaced(self.client.clone(), &ns);
306        if let Err(e) = api.delete(&name, &DeleteParams::default()).await {
307            // 404 is normal on idempotent re-delete; surface anything else.
308            if let kube::Error::Api(api_err) = &e {
309                if api_err.code == 404 {
310                    return;
311                }
312            }
313            tracing::warn!(pod = %name, namespace = %ns, error = %e, "k8s delete pod failed");
314        }
315    }
316
317    /// Sweep Lambda Pods that belong to a previous fakecloud process.
318    /// Without this, a fakecloud restart leaks the previous run's Pods
319    /// and `Create` collides on function names. Mirrors the docker
320    /// `reaper` semantics.
321    async fn reap_stale(&self) {
322        let api = self.pods_api();
323        let lp = ListParams::default().labels("fakecloud-managed-by=fakecloud");
324        let list = match api.list(&lp).await {
325            Ok(l) => l,
326            Err(e) => {
327                tracing::warn!(error = %e, "k8s reap_stale: list pods failed");
328                return;
329            }
330        };
331        let mut reaped = 0usize;
332        for pod in list.items {
333            let labels = pod.metadata.labels.as_ref();
334            let inst = labels.and_then(|l| l.get("fakecloud-instance")).cloned();
335            if inst.as_deref() == Some(self.instance_id.as_str()) {
336                continue;
337            }
338            if let Some(name) = pod.metadata.name {
339                if let Err(e) = api.delete(&name, &DeleteParams::default()).await {
340                    tracing::warn!(pod = %name, error = %e, "k8s reap_stale: delete failed");
341                } else {
342                    reaped += 1;
343                }
344            }
345        }
346        if reaped > 0 {
347            tracing::info!(reaped, "k8s reap_stale: removed orphan Lambda Pods");
348        }
349    }
350}
351
#[cfg(test)]
mod tests {
    use super::account_id_from_arn;

    /// Assert that `arn` yields `expected` as its account ID.
    fn expect_account(arn: &str, expected: &str) {
        let actual = account_id_from_arn(arn);
        assert_eq!(actual, expected, "account id parsed from {arn:?}");
    }

    #[test]
    fn account_id_from_simple_arn() {
        expect_account(
            "arn:aws:lambda:us-east-1:123456789012:function:my-fn",
            "123456789012",
        );
    }

    #[test]
    fn account_id_from_qualified_arn() {
        expect_account(
            "arn:aws:lambda:us-east-1:000000000000:function:my-fn:PROD",
            "000000000000",
        );
    }

    #[test]
    fn account_id_falls_back_for_malformed_arn() {
        expect_account("not-an-arn", "000000000000");
    }
}