fakecloud_lambda/runtime/k8s/
mod.rs1pub mod spec;
11
12use std::time::Duration;
13
14use async_trait::async_trait;
15use k8s_openapi::api::core::v1::Pod;
16use kube::api::{Api, DeleteParams, ListParams, PostParams};
17use kube::Client;
18
19use super::backend::{BackendHandle, LambdaBackend, RuntimeError, WarmInstance};
20use crate::state::LambdaFunction;
21use spec::{build_pod_spec, pod_name_for, PodSpecContext};
22
23#[derive(Debug, thiserror::Error)]
26pub enum K8sBackendError {
27 #[error("FAKECLOUD_K8S_SELF_URL must be set when FAKECLOUD_LAMBDA_BACKEND=k8s")]
28 MissingSelfUrl,
29 #[error("FAKECLOUD_K8S_SELF_URL is not a valid URL: {0}")]
30 InvalidSelfUrl(String),
31 #[error("failed to construct Kubernetes client: {0}")]
32 Client(#[from] kube::Error),
33 #[error("failed to read kubeconfig / in-cluster config: {0}")]
34 Config(String),
35}
36
37pub struct K8sBackend {
39 client: Client,
40 namespace: String,
41 instance_id: String,
42 self_url: String,
46 self_host: String,
49 ecr_host: String,
52 ecr_port: u16,
53 internal_token: String,
56 pull_secret: Option<String>,
59}
60
61impl K8sBackend {
62 pub async fn from_env(
67 default_ecr_port: u16,
68 internal_token: String,
69 ) -> Result<Self, K8sBackendError> {
70 ensure_crypto_provider();
71 let self_url =
72 std::env::var("FAKECLOUD_K8S_SELF_URL").map_err(|_| K8sBackendError::MissingSelfUrl)?;
73 let parsed = reqwest::Url::parse(&self_url)
74 .map_err(|e| K8sBackendError::InvalidSelfUrl(e.to_string()))?;
75 let self_host = parsed
76 .host_str()
77 .ok_or_else(|| K8sBackendError::InvalidSelfUrl("missing host".into()))?
78 .to_string();
79 let self_port = parsed.port_or_known_default().unwrap_or(default_ecr_port);
80
81 let (ecr_host, ecr_port) = match std::env::var("FAKECLOUD_K8S_ECR_URL").ok() {
82 Some(raw) => {
83 let u = reqwest::Url::parse(&raw)
84 .map_err(|e| K8sBackendError::InvalidSelfUrl(e.to_string()))?;
85 let h = u
86 .host_str()
87 .ok_or_else(|| K8sBackendError::InvalidSelfUrl("ECR url missing host".into()))?
88 .to_string();
89 let p = u.port_or_known_default().unwrap_or(default_ecr_port);
90 (h, p)
91 }
92 None => (self_host.clone(), self_port),
93 };
94
95 let namespace =
96 std::env::var("FAKECLOUD_K8S_NAMESPACE").unwrap_or_else(|_| "default".to_string());
97 let pull_secret = std::env::var("FAKECLOUD_K8S_PULL_SECRET").ok();
98
99 let client = Client::try_default()
100 .await
101 .map_err(|e| K8sBackendError::Config(e.to_string()))?;
102
103 let instance_id = format!("fakecloud-{}", std::process::id());
104
105 tracing::info!(
106 namespace = %namespace,
107 self_url = %self_url,
108 ecr = %format!("{ecr_host}:{ecr_port}"),
109 "K8s Lambda backend initialized"
110 );
111
112 Ok(Self {
113 client,
114 namespace,
115 instance_id,
116 self_url,
117 self_host,
118 ecr_host,
119 ecr_port,
120 internal_token,
121 pull_secret,
122 })
123 }
124
125 fn pods_api(&self) -> Api<Pod> {
126 Api::namespaced(self.client.clone(), &self.namespace)
127 }
128
129 async fn wait_for_pod_ip(&self, pod_name: &str) -> Result<String, RuntimeError> {
133 let api = self.pods_api();
134 let deadline = std::time::Instant::now() + Duration::from_secs(60);
135 loop {
136 let pod = api.get(pod_name).await.map_err(|e| {
137 RuntimeError::ContainerStartFailed(format!("k8s get pod {pod_name}: {e}"))
138 })?;
139 if let Some(ip) = pod
140 .status
141 .as_ref()
142 .and_then(|s| s.pod_ip.as_ref())
143 .filter(|s| !s.is_empty())
144 {
145 let phase = pod
147 .status
148 .as_ref()
149 .and_then(|s| s.phase.as_deref())
150 .unwrap_or("Unknown");
151 if phase == "Running" {
152 return Ok(ip.clone());
153 }
154 if phase == "Failed" || phase == "Succeeded" {
155 return Err(RuntimeError::ContainerStartFailed(format!(
156 "pod {pod_name} reached terminal phase {phase} during startup"
157 )));
158 }
159 }
160 if std::time::Instant::now() >= deadline {
161 return Err(RuntimeError::ContainerStartFailed(format!(
162 "pod {pod_name} did not become Running with podIP within 60s"
163 )));
164 }
165 tokio::time::sleep(Duration::from_secs(1)).await;
166 }
167 }
168
169 async fn wait_for_rie_ready(&self, pod_ip: &str) -> Result<(), RuntimeError> {
173 for _ in 0..20 {
174 tokio::time::sleep(Duration::from_millis(500)).await;
175 if tokio::net::TcpStream::connect(format!("{pod_ip}:8080"))
176 .await
177 .is_ok()
178 {
179 return Ok(());
180 }
181 }
182 Err(RuntimeError::ContainerStartFailed(format!(
183 "RIE on {pod_ip}:8080 did not accept connections within 10s"
184 )))
185 }
186}
187
188fn ensure_crypto_provider() {
196 use std::sync::Once;
197 static INIT: Once = Once::new();
198 INIT.call_once(|| {
199 let _ = rustls::crypto::ring::default_provider().install_default();
200 });
201}
202
/// Returns the account-id field (the fifth `:`-separated component) of an ARN.
///
/// Falls back to `"000000000000"` when the ARN has fewer than five components
/// or its account field is empty (e.g. `arn:aws:lambda:us-east-1::function:f`),
/// so callers never receive an empty account id.
fn account_id_from_arn(arn: &str) -> &str {
    const DEFAULT_ACCOUNT_ID: &str = "000000000000";
    arn.split(':')
        .nth(4)
        .filter(|account| !account.is_empty())
        .unwrap_or(DEFAULT_ACCOUNT_ID)
}
208
209#[async_trait]
210impl LambdaBackend for K8sBackend {
211 fn name(&self) -> &str {
212 "kubernetes"
213 }
214
215 async fn launch(
216 &self,
217 func: &LambdaFunction,
218 _code_zip: Option<&[u8]>,
219 _layers: &[Vec<u8>],
220 deploy_id: &str,
221 ) -> Result<WarmInstance, RuntimeError> {
222 let account_id = account_id_from_arn(&func.function_arn);
223 let ctx = PodSpecContext {
224 instance_id: &self.instance_id,
225 namespace: &self.namespace,
226 self_url: &self.self_url,
227 self_host: &self.self_host,
228 ecr_host: &self.ecr_host,
229 ecr_port: self.ecr_port,
230 internal_token: &self.internal_token,
231 account_id,
232 pull_secret: self.pull_secret.as_deref(),
233 };
234 let pod =
235 build_pod_spec(func, deploy_id, &ctx).map_err(RuntimeError::ContainerStartFailed)?;
236 let pod_name = pod
237 .metadata
238 .name
239 .clone()
240 .unwrap_or_else(|| pod_name_for(&func.function_name, deploy_id));
241
242 let api = self.pods_api();
243
244 let _ = api.delete(&pod_name, &DeleteParams::default()).await;
248 for attempt in 0..6 {
252 match api.create(&PostParams::default(), &pod).await {
253 Ok(_) => break,
254 Err(kube::Error::Api(e)) if e.code == 409 && attempt < 5 => {
255 tokio::time::sleep(Duration::from_millis(500)).await;
256 let _ = api.delete(&pod_name, &DeleteParams::default()).await;
257 continue;
258 }
259 Err(e) => {
260 return Err(RuntimeError::ContainerStartFailed(format!(
261 "k8s create pod {pod_name}: {e}"
262 )));
263 }
264 }
265 }
266
267 let pod_ip = self.wait_for_pod_ip(&pod_name).await.inspect_err(|_| {
268 let api = self.pods_api();
269 let name = pod_name.clone();
270 tokio::spawn(async move {
271 let _ = api.delete(&name, &DeleteParams::default()).await;
272 });
273 })?;
274 self.wait_for_rie_ready(&pod_ip).await.inspect_err(|_| {
275 let api = self.pods_api();
276 let name = pod_name.clone();
277 tokio::spawn(async move {
278 let _ = api.delete(&name, &DeleteParams::default()).await;
279 });
280 })?;
281
282 tracing::info!(
283 function = %func.function_name,
284 pod = %pod_name,
285 namespace = %self.namespace,
286 pod_ip = %pod_ip,
287 "Lambda Pod started"
288 );
289
290 Ok(WarmInstance {
291 endpoint: format!("{pod_ip}:8080"),
292 handle: BackendHandle::Pod {
293 namespace: self.namespace.clone(),
294 name: pod_name,
295 },
296 })
297 }
298
299 async fn terminate(&self, handle: &BackendHandle) {
300 let (ns, name) = match handle {
301 BackendHandle::Pod { namespace, name } => (namespace.clone(), name.clone()),
302 BackendHandle::Container { .. } => return,
304 };
305 let api: Api<Pod> = Api::namespaced(self.client.clone(), &ns);
306 if let Err(e) = api.delete(&name, &DeleteParams::default()).await {
307 if let kube::Error::Api(api_err) = &e {
309 if api_err.code == 404 {
310 return;
311 }
312 }
313 tracing::warn!(pod = %name, namespace = %ns, error = %e, "k8s delete pod failed");
314 }
315 }
316
317 async fn reap_stale(&self) {
322 let api = self.pods_api();
323 let lp = ListParams::default().labels("fakecloud-managed-by=fakecloud");
324 let list = match api.list(&lp).await {
325 Ok(l) => l,
326 Err(e) => {
327 tracing::warn!(error = %e, "k8s reap_stale: list pods failed");
328 return;
329 }
330 };
331 let mut reaped = 0usize;
332 for pod in list.items {
333 let labels = pod.metadata.labels.as_ref();
334 let inst = labels.and_then(|l| l.get("fakecloud-instance")).cloned();
335 if inst.as_deref() == Some(self.instance_id.as_str()) {
336 continue;
337 }
338 if let Some(name) = pod.metadata.name {
339 if let Err(e) = api.delete(&name, &DeleteParams::default()).await {
340 tracing::warn!(pod = %name, error = %e, "k8s reap_stale: delete failed");
341 } else {
342 reaped += 1;
343 }
344 }
345 }
346 if reaped > 0 {
347 tracing::info!(reaped, "k8s reap_stale: removed orphan Lambda Pods");
348 }
349 }
350}
351
#[cfg(test)]
mod tests {
    use super::account_id_from_arn;

    /// Value returned when no account id can be extracted.
    const FALLBACK: &str = "000000000000";

    #[test]
    fn account_id_from_simple_arn() {
        let arn = "arn:aws:lambda:us-east-1:123456789012:function:my-fn";
        let account = account_id_from_arn(arn);
        assert_eq!(account, "123456789012");
    }

    #[test]
    fn account_id_from_qualified_arn() {
        // The trailing `:PROD` qualifier must not shift the account field.
        let arn = "arn:aws:lambda:us-east-1:000000000000:function:my-fn:PROD";
        let account = account_id_from_arn(arn);
        assert_eq!(account, "000000000000");
    }

    #[test]
    fn account_id_falls_back_for_malformed_arn() {
        let account = account_id_from_arn("not-an-arn");
        assert_eq!(account, FALLBACK);
    }
}