fakecloud_lambda/runtime/k8s/
mod.rs1pub mod spec;
17
18use std::time::Duration;
19
20use async_trait::async_trait;
21use fakecloud_k8s::{K8sClient, K8sEnv, K8sEnvError, K8sPodConfig, K8sPodConfigError};
22
23use super::backend::{BackendHandle, LambdaBackend, RuntimeError, WarmInstance};
24use crate::state::LambdaFunction;
25use spec::{build_pod_spec, unique_pod_name, PodSpecContext};
26
/// Service label this backend hands to `K8sClient::reap_stale` when sweeping
/// leftover resources.
const SERVICE: &str = "lambda";

/// Prefix passed to `K8sPodConfig::resolved_base` when resolving the base Pod
/// configuration (presumably an environment-variable prefix — confirm in
/// `fakecloud_k8s`).
const POD_CONFIG_PREFIX: &str = "FAKECLOUD_LAMBDA_K8S";
35
36#[derive(Debug, thiserror::Error)]
39pub enum K8sBackendError {
40 #[error(transparent)]
41 Env(#[from] K8sEnvError),
42 #[error(transparent)]
43 PodConfig(#[from] K8sPodConfigError),
44 #[error("failed to connect to the Kubernetes cluster: {0}")]
45 Connect(String),
46}
47
48pub struct K8sBackend {
50 client: K8sClient,
51 self_url: String,
55 self_host: String,
58 ecr_host: String,
61 ecr_port: u16,
62 internal_token: String,
65 pull_secret: Option<String>,
68 pod_config: K8sPodConfig,
72}
73
74impl K8sBackend {
75 pub async fn from_env(
80 default_ecr_port: u16,
81 internal_token: String,
82 ) -> Result<Self, K8sBackendError> {
83 let env = K8sEnv::from_env(default_ecr_port)?;
84 let pod_config = K8sPodConfig::resolved_base(POD_CONFIG_PREFIX)?;
85 let client = K8sClient::connect(env.namespace.clone())
86 .await
87 .map_err(|e| K8sBackendError::Connect(e.to_string()))?;
88
89 tracing::info!(
90 namespace = %env.namespace,
91 self_url = %env.self_url,
92 ecr = %format!("{}:{}", env.ecr_host, env.ecr_port),
93 "K8s Lambda backend initialized"
94 );
95
96 Ok(Self {
97 client,
98 self_url: env.self_url,
99 self_host: env.self_host,
100 ecr_host: env.ecr_host,
101 ecr_port: env.ecr_port,
102 internal_token,
103 pull_secret: env.pull_secret,
104 pod_config,
105 })
106 }
107}
108
/// Extracts the account ID — the fifth `:`-separated field — from an ARN.
///
/// Falls back to `"000000000000"` when the ARN has fewer than five fields or
/// when the account field is present but empty (for example
/// `arn:aws:lambda:us-east-1::function:f`), so callers never receive an empty
/// account ID.
fn account_id_from_arn(arn: &str) -> &str {
    const DEFAULT_ACCOUNT_ID: &str = "000000000000";

    arn.split(':')
        .nth(4)
        // An empty segment is as unusable as a missing one; treat both alike.
        .filter(|account_id| !account_id.is_empty())
        .unwrap_or(DEFAULT_ACCOUNT_ID)
}
114
115#[async_trait]
116impl LambdaBackend for K8sBackend {
117 fn name(&self) -> &str {
118 "kubernetes"
119 }
120
121 async fn launch(
122 &self,
123 func: &LambdaFunction,
124 _code_zip: Option<&[u8]>,
125 _layers: &[Vec<u8>],
126 deploy_id: &str,
127 ) -> Result<WarmInstance, RuntimeError> {
128 let account_id = account_id_from_arn(&func.function_arn);
129 let ctx = PodSpecContext {
130 instance_id: self.client.instance_id(),
131 namespace: self.client.namespace(),
132 self_url: &self.self_url,
133 self_host: &self.self_host,
134 ecr_host: &self.ecr_host,
135 ecr_port: self.ecr_port,
136 internal_token: &self.internal_token,
137 account_id,
138 pull_secret: self.pull_secret.as_deref(),
139 };
140 let mut pod =
141 build_pod_spec(func, deploy_id, &ctx).map_err(RuntimeError::ContainerStartFailed)?;
142 let pod_name = unique_pod_name(&func.function_name, deploy_id);
147 pod.metadata.name = Some(pod_name.clone());
148
149 self.pod_config
153 .clone()
154 .merge(K8sPodConfig::from_tags(&func.tags))
155 .apply(&mut pod);
156
157 self.client
158 .create_pod(&pod)
159 .await
160 .map_err(|e| RuntimeError::ContainerStartFailed(format!("k8s create pod: {e}")))?;
161
162 let pod_ip = match self
165 .client
166 .wait_for_pod_ip(&pod_name, Duration::from_secs(60))
167 .await
168 {
169 Ok(ip) => ip,
170 Err(e) => {
171 self.client.delete_pod(&pod_name).await;
172 return Err(RuntimeError::ContainerStartFailed(e.to_string()));
173 }
174 };
175 if let Err(e) = K8sClient::wait_for_tcp(&pod_ip, 8080, Duration::from_secs(10)).await {
178 self.client.delete_pod(&pod_name).await;
179 return Err(RuntimeError::ContainerStartFailed(format!(
180 "RIE on {pod_ip}:8080 not ready: {e}"
181 )));
182 }
183
184 tracing::info!(
185 function = %func.function_name,
186 pod = %pod_name,
187 namespace = %self.client.namespace(),
188 pod_ip = %pod_ip,
189 "Lambda Pod started"
190 );
191
192 Ok(WarmInstance {
193 endpoint: format!("{pod_ip}:8080"),
194 handle: BackendHandle::Pod {
195 namespace: self.client.namespace().to_string(),
196 name: pod_name,
197 },
198 })
199 }
200
201 async fn terminate(&self, handle: &BackendHandle) {
202 match handle {
203 BackendHandle::Pod { name, .. } => self.client.delete_pod(name).await,
204 BackendHandle::Container { .. } => {}
206 }
207 }
208
209 async fn instance_logs(&self, handle: &BackendHandle) -> Option<String> {
210 let BackendHandle::Pod { name, .. } = handle else {
211 return None;
212 };
213 self.client
215 .pod_logs(name, None)
216 .await
217 .ok()
218 .filter(|s| !s.is_empty())
219 }
220
221 async fn reap_stale(&self) {
226 self.client.reap_stale(SERVICE).await;
227 }
228}
229
#[cfg(test)]
mod tests {
    use super::account_id_from_arn;

    /// An unqualified function ARN yields its account field.
    #[test]
    fn account_id_from_simple_arn() {
        let arn = "arn:aws:lambda:us-east-1:123456789012:function:my-fn";
        let expected = "123456789012";
        assert_eq!(account_id_from_arn(arn), expected);
    }

    /// A trailing qualifier (alias/version) does not shift the account field.
    #[test]
    fn account_id_from_qualified_arn() {
        let arn = "arn:aws:lambda:us-east-1:000000000000:function:my-fn:PROD";
        let expected = "000000000000";
        assert_eq!(account_id_from_arn(arn), expected);
    }

    /// Input without enough `:`-separated fields falls back to the default.
    #[test]
    fn account_id_falls_back_for_malformed_arn() {
        let not_an_arn = "not-an-arn";
        assert_eq!(account_id_from_arn(not_an_arn), "000000000000");
    }
}