pub mod spec;
use std::time::Duration;
use async_trait::async_trait;
use fakecloud_k8s::{K8sClient, K8sEnv, K8sEnvError, K8sPodConfig, K8sPodConfigError};
use super::backend::{BackendHandle, LambdaBackend, RuntimeError, WarmInstance};
use crate::state::LambdaFunction;
use spec::{build_pod_spec, unique_pod_name, PodSpecContext};
/// Service name passed to `K8sClient::reap_stale` when this backend cleans up stale Pods.
const SERVICE: &str = "lambda";
/// Prefix handed to `K8sPodConfig::resolved_base` when resolving the base Pod configuration.
// NOTE(review): presumably an environment-variable prefix — confirm in `fakecloud_k8s`.
const POD_CONFIG_PREFIX: &str = "FAKECLOUD_LAMBDA_K8S";
/// Errors that can occur while constructing a [`K8sBackend`] via [`K8sBackend::from_env`].
#[derive(Debug, thiserror::Error)]
pub enum K8sBackendError {
/// `K8sEnv::from_env` failed; the underlying error is displayed as-is.
#[error(transparent)]
Env(#[from] K8sEnvError),
/// `K8sPodConfig::resolved_base` failed; the underlying error is displayed as-is.
#[error(transparent)]
PodConfig(#[from] K8sPodConfigError),
/// `K8sClient::connect` failed; the payload is the stringified connection error.
#[error("failed to connect to the Kubernetes cluster: {0}")]
Connect(String),
}
/// Lambda backend that launches each function instance as a Kubernetes Pod.
///
/// Built by [`K8sBackend::from_env`]; most fields are copied from `K8sEnv` and
/// forwarded unchanged to the Pod spec builder on every launch.
pub struct K8sBackend {
/// Cluster client used to create and delete Pods, wait for Pod IPs, read logs and reap stale Pods.
client: K8sClient,
/// Copied from `K8sEnv::self_url` and forwarded to the Pod spec builder.
// NOTE(review): presumably the URL Pods use to call back into this server — confirm in `K8sEnv`.
self_url: String,
/// Copied from `K8sEnv::self_host` and forwarded to the Pod spec builder.
self_host: String,
/// Copied from `K8sEnv::ecr_host` and forwarded to the Pod spec builder.
// NOTE(review): presumably the registry host images are pulled from — confirm in `spec`.
ecr_host: String,
/// Copied from `K8sEnv::ecr_port` and forwarded to the Pod spec builder.
ecr_port: u16,
/// Token supplied by the caller of `from_env` and forwarded to the Pod spec builder.
internal_token: String,
/// Optional value copied from `K8sEnv::pull_secret` and forwarded to the Pod spec builder.
// NOTE(review): presumably the name of an image pull secret — confirm in `spec`.
pull_secret: Option<String>,
/// Base Pod configuration resolved once at startup; at launch a clone is merged
/// with the configuration derived from the function's tags and applied to the Pod.
pod_config: K8sPodConfig,
}
impl K8sBackend {
    /// Builds a backend from environment-derived settings and connects to the cluster.
    ///
    /// The settings come from `K8sEnv::from_env(default_ecr_port)` and the base Pod
    /// configuration from `K8sPodConfig::resolved_base`. The client is then connected
    /// using the namespace from those settings. `internal_token` is stored as given.
    ///
    /// # Errors
    ///
    /// * [`K8sBackendError::Env`] if the environment settings cannot be read.
    /// * [`K8sBackendError::PodConfig`] if the base Pod configuration cannot be resolved.
    /// * [`K8sBackendError::Connect`] if the cluster connection fails.
    pub async fn from_env(
        default_ecr_port: u16,
        internal_token: String,
    ) -> Result<Self, K8sBackendError> {
        let cluster_env = K8sEnv::from_env(default_ecr_port)?;
        let pod_config = K8sPodConfig::resolved_base(POD_CONFIG_PREFIX)?;

        // The connect error is stringified so the public error type does not
        // expose the client's own error type.
        let client = match K8sClient::connect(cluster_env.namespace.clone()).await {
            Ok(connected) => connected,
            Err(cause) => return Err(K8sBackendError::Connect(cause.to_string())),
        };

        tracing::info!(
            namespace = %cluster_env.namespace,
            self_url = %cluster_env.self_url,
            ecr = %format!("{}:{}", cluster_env.ecr_host, cluster_env.ecr_port),
            "K8s Lambda backend initialized"
        );

        let backend = Self {
            client,
            self_url: cluster_env.self_url,
            self_host: cluster_env.self_host,
            ecr_host: cluster_env.ecr_host,
            ecr_port: cluster_env.ecr_port,
            internal_token,
            pull_secret: cluster_env.pull_secret,
            pod_config,
        };
        Ok(backend)
    }
}
/// Extracts the account ID — the fifth colon-separated field — from an ARN.
///
/// Falls back to the placeholder account `"000000000000"` when the ARN has
/// fewer than five fields, or when the account field is present but empty
/// (e.g. `arn:aws:lambda:us-east-1::function:f`). Callers therefore never
/// receive an empty account ID.
fn account_id_from_arn(arn: &str) -> &str {
    const DEFAULT_ACCOUNT_ID: &str = "000000000000";

    arn.split(':')
        .nth(4)
        // An empty field is as unusable as a missing one; treat both alike.
        .filter(|account| !account.is_empty())
        .unwrap_or(DEFAULT_ACCOUNT_ID)
}
#[async_trait]
impl LambdaBackend for K8sBackend {
    /// Identifier reported for this backend.
    fn name(&self) -> &str {
        "kubernetes"
    }

    /// Starts one Pod for `func` and returns it once its RIE port accepts connections.
    ///
    /// Steps, in order:
    /// 1. Build the Pod spec and give it a unique name.
    /// 2. Apply the base Pod configuration merged with the configuration derived
    ///    from the function's tags.
    /// 3. Create the Pod.
    /// 4. Wait up to 60 seconds for a Pod IP.
    /// 5. Wait up to 10 seconds for TCP port 8080 on that IP.
    ///
    /// If either wait fails, the Pod is deleted before the error is returned.
    /// `_code_zip` and `_layers` are not used by this backend.
    ///
    /// # Errors
    ///
    /// Every failure is reported as [`RuntimeError::ContainerStartFailed`].
    async fn launch(
        &self,
        func: &LambdaFunction,
        _code_zip: Option<&[u8]>,
        _layers: &[Vec<u8>],
        deploy_id: &str,
    ) -> Result<WarmInstance, RuntimeError> {
        let ip_timeout = Duration::from_secs(60);
        let tcp_timeout = Duration::from_secs(10);

        let spec_ctx = PodSpecContext {
            account_id: account_id_from_arn(&func.function_arn),
            instance_id: self.client.instance_id(),
            namespace: self.client.namespace(),
            self_url: &self.self_url,
            self_host: &self.self_host,
            ecr_host: &self.ecr_host,
            ecr_port: self.ecr_port,
            internal_token: &self.internal_token,
            pull_secret: self.pull_secret.as_deref(),
        };

        let mut pod_spec = build_pod_spec(func, deploy_id, &spec_ctx)
            .map_err(RuntimeError::ContainerStartFailed)?;
        let pod_name = unique_pod_name(&func.function_name, deploy_id);
        pod_spec.metadata.name = Some(pod_name.clone());

        // Base configuration first, then whatever the function's tags contribute.
        self.pod_config
            .clone()
            .merge(K8sPodConfig::from_tags(&func.tags))
            .apply(&mut pod_spec);

        if let Err(err) = self.client.create_pod(&pod_spec).await {
            return Err(RuntimeError::ContainerStartFailed(format!(
                "k8s create pod: {err}"
            )));
        }

        // From here on the Pod exists, so every failure path must delete it.
        let ip_wait = self.client.wait_for_pod_ip(&pod_name, ip_timeout).await;
        let pod_ip = match ip_wait {
            Ok(ip) => ip,
            Err(err) => {
                self.client.delete_pod(&pod_name).await;
                return Err(RuntimeError::ContainerStartFailed(err.to_string()));
            }
        };

        let endpoint = format!("{pod_ip}:8080");
        if let Err(err) = K8sClient::wait_for_tcp(&pod_ip, 8080, tcp_timeout).await {
            self.client.delete_pod(&pod_name).await;
            return Err(RuntimeError::ContainerStartFailed(format!(
                "RIE on {endpoint} not ready: {err}"
            )));
        }

        tracing::info!(
            function = %func.function_name,
            pod = %pod_name,
            namespace = %self.client.namespace(),
            pod_ip = %pod_ip,
            "Lambda Pod started"
        );

        let handle = BackendHandle::Pod {
            namespace: self.client.namespace().to_string(),
            name: pod_name,
        };
        Ok(WarmInstance { endpoint, handle })
    }

    /// Deletes the Pod behind `handle`; container handles are ignored.
    async fn terminate(&self, handle: &BackendHandle) {
        match handle {
            BackendHandle::Container { .. } => {}
            BackendHandle::Pod { name, .. } => {
                self.client.delete_pod(name).await;
            }
        }
    }

    /// Returns the logs of the Pod behind `handle`.
    ///
    /// Yields `None` when the handle is not a Pod, when fetching the logs
    /// fails, or when the fetched log text is empty.
    async fn instance_logs(&self, handle: &BackendHandle) -> Option<String> {
        match handle {
            BackendHandle::Pod { name, .. } => match self.client.pod_logs(name, None).await {
                Ok(text) if !text.is_empty() => Some(text),
                _ => None,
            },
            BackendHandle::Container { .. } => None,
        }
    }

    /// Asks the client to reap stale Pods for the `SERVICE` name.
    async fn reap_stale(&self) {
        self.client.reap_stale(SERVICE).await;
    }
}
#[cfg(test)]
mod tests {
    use super::account_id_from_arn;

    /// An unqualified function ARN yields its account field.
    #[test]
    fn account_id_from_simple_arn() {
        let arn = "arn:aws:lambda:us-east-1:123456789012:function:my-fn";
        let account = account_id_from_arn(arn);
        assert_eq!(account, "123456789012");
    }

    /// A trailing qualifier does not shift which field is read.
    #[test]
    fn account_id_from_qualified_arn() {
        let arn = "arn:aws:lambda:us-east-1:000000000000:function:my-fn:PROD";
        let account = account_id_from_arn(arn);
        assert_eq!(account, "000000000000");
    }

    /// Input without enough colon-separated fields falls back to the placeholder.
    #[test]
    fn account_id_falls_back_for_malformed_arn() {
        let account = account_id_from_arn("not-an-arn");
        assert_eq!(account, "000000000000");
    }
}