use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::Duration;
use async_trait::async_trait;
use base64::Engine;
use tempfile::TempDir;
use super::backend::{BackendHandle, LambdaBackend, RuntimeError, WarmInstance};
use super::env_rewrite::rewrite_localhost_envs;
use crate::state::LambdaFunction;
pub struct DockerBackend {
cli: String,
instance_id: String,
host_alias: String,
add_host_arg: Option<String>,
server_port: u16,
sibling_host: String,
docker_config: Option<Arc<TempDir>>,
}
impl DockerBackend {
pub fn auto_detect(server_port: u16) -> Option<Self> {
let cli = fakecloud_core::container_net::detect_container_cli()?;
let instance_id = format!("fakecloud-{}", std::process::id());
let net = fakecloud_core::container_net::HostNetworking::detect(&cli);
let docker_config = build_local_registry_docker_config(server_port).map(Arc::new);
Some(Self {
cli,
instance_id,
host_alias: net.host_alias,
add_host_arg: net.add_host_arg,
server_port,
sibling_host: net.sibling_host,
docker_config,
})
}
fn apply_host_alias(&self, cmd: &mut tokio::process::Command) {
if let Some(arg) = &self.add_host_arg {
cmd.arg("--add-host").arg(arg);
}
}
fn docker_config_path(&self) -> Option<PathBuf> {
self.docker_config.as_ref().map(|d| d.path().to_path_buf())
}
async fn start_image_container(
&self,
func: &LambdaFunction,
layers: &[Vec<u8>],
) -> Result<WarmInstance, RuntimeError> {
let image = func.image_uri.as_deref().ok_or_else(|| {
RuntimeError::ContainerStartFailed("PackageType=Image function has no ImageUri".into())
})?;
let local_pull_uri = fakecloud_core::ecr_uri::translate_to_local_at(
image,
&self.sibling_host,
self.server_port,
);
let pull_uri = local_pull_uri.as_deref().unwrap_or(image);
let mut pull_cmd = tokio::process::Command::new(&self.cli);
if let Some(p) = self.docker_config_path() {
pull_cmd.env("DOCKER_CONFIG", p);
}
let pull_out = pull_cmd
.args(["pull", pull_uri])
.output()
.await
.map_err(|e| RuntimeError::ContainerStartFailed(format!("docker pull: {e}")))?;
if !pull_out.status.success() {
return Err(RuntimeError::ContainerStartFailed(format!(
"docker pull failed: {}",
String::from_utf8_lossy(&pull_out.stderr)
)));
}
let run_image = if let Some(ref local_uri) = local_pull_uri {
if fakecloud_core::ecr_uri::is_digest_ref(image) {
local_uri.clone()
} else {
let _ = tokio::process::Command::new(&self.cli)
.args(["tag", local_uri, image])
.output()
.await;
image.to_string()
}
} else {
image.to_string()
};
let mut cmd = tokio::process::Command::new(&self.cli);
cmd.arg("create")
.arg("-p")
.arg(":8080")
.arg("--label")
.arg(format!("fakecloud-lambda={}", func.function_name))
.arg("--label")
.arg(format!("fakecloud-instance={}", self.instance_id));
self.apply_host_alias(&mut cmd);
for (key, value) in rewrite_localhost_envs(&func.environment, &self.host_alias) {
cmd.arg("-e").arg(format!("{key}={value}"));
}
cmd.arg("-e")
.arg(format!("AWS_LAMBDA_FUNCTION_TIMEOUT={}", func.timeout));
let tmpfs_arg = ephemeral_storage_tmpfs_arg(func.ephemeral_storage_size);
cmd.arg("--tmpfs").arg(tmpfs_arg);
cmd.arg(&run_image);
let output = cmd
.output()
.await
.map_err(|e| RuntimeError::ContainerStartFailed(e.to_string()))?;
if !output.status.success() {
return Err(RuntimeError::ContainerStartFailed(
String::from_utf8_lossy(&output.stderr).to_string(),
));
}
let container_id = String::from_utf8_lossy(&output.stdout).trim().to_string();
if let Err(e) = self.copy_layers_into(&container_id, layers).await {
self.remove_container(&container_id).await;
return Err(e);
}
let start_result = tokio::process::Command::new(&self.cli)
.args(["start", &container_id])
.output()
.await
.map_err(|e| RuntimeError::ContainerStartFailed(e.to_string()))?;
if !start_result.status.success() {
self.remove_container(&container_id).await;
return Err(RuntimeError::ContainerStartFailed(format!(
"docker start failed: {}",
String::from_utf8_lossy(&start_result.stderr)
)));
}
let port = self.query_host_port(&container_id).await?;
self.wait_for_ready(&container_id, port).await?;
tracing::info!(
function = %func.function_name,
container_id = %container_id,
port = port,
image = %image,
"Lambda image container started"
);
Ok(WarmInstance {
endpoint: format!("{}:{port}", self.sibling_host),
handle: BackendHandle::Container { id: container_id },
})
}
async fn start_zip_container(
&self,
func: &LambdaFunction,
zip_bytes: &[u8],
layers: &[Vec<u8>],
) -> Result<WarmInstance, RuntimeError> {
let image = runtime_to_image(&func.runtime)
.ok_or_else(|| RuntimeError::UnsupportedRuntime(func.runtime.clone()))?;
let code_dir =
TempDir::new().map_err(|e| RuntimeError::ZipExtractionFailed(e.to_string()))?;
let zip_bytes = zip_bytes.to_vec();
let code_path = code_dir.path().to_path_buf();
tokio::task::spawn_blocking(move || extract_zip(&zip_bytes, &code_path))
.await
.map_err(|e| RuntimeError::ZipExtractionFailed(e.to_string()))??;
let mut cmd = tokio::process::Command::new(&self.cli);
cmd.arg("create")
.arg("-p")
.arg(":8080")
.arg("--label")
.arg(format!("fakecloud-lambda={}", func.function_name))
.arg("--label")
.arg(format!("fakecloud-instance={}", self.instance_id));
self.apply_host_alias(&mut cmd);
for (key, value) in rewrite_localhost_envs(&func.environment, &self.host_alias) {
cmd.arg("-e").arg(format!("{key}={value}"));
}
cmd.arg("-e")
.arg(format!("AWS_LAMBDA_FUNCTION_TIMEOUT={}", func.timeout));
let tmpfs_arg = ephemeral_storage_tmpfs_arg(func.ephemeral_storage_size);
cmd.arg("--tmpfs").arg(tmpfs_arg);
cmd.arg(&image).arg(&func.handler);
let output = cmd
.output()
.await
.map_err(|e| RuntimeError::ContainerStartFailed(e.to_string()))?;
if !output.status.success() {
let stderr = String::from_utf8_lossy(&output.stderr);
return Err(RuntimeError::ContainerStartFailed(stderr.to_string()));
}
let container_id = String::from_utf8_lossy(&output.stdout).trim().to_string();
let cp_result = tokio::process::Command::new(&self.cli)
.arg("cp")
.arg(format!("{}/.", code_dir.path().display()))
.arg(format!("{}:/var/task", container_id))
.output()
.await
.map_err(|e| RuntimeError::ContainerStartFailed(e.to_string()))?;
if !cp_result.status.success() {
self.remove_container(&container_id).await;
let stderr = String::from_utf8_lossy(&cp_result.stderr);
return Err(RuntimeError::ContainerStartFailed(format!(
"docker cp failed: {stderr}"
)));
}
if func.runtime.starts_with("provided") {
let cp_runtime = tokio::process::Command::new(&self.cli)
.arg("cp")
.arg(format!("{}/.", code_dir.path().display()))
.arg(format!("{}:/var/runtime", container_id))
.output()
.await
.map_err(|e| RuntimeError::ContainerStartFailed(e.to_string()))?;
if !cp_runtime.status.success() {
self.remove_container(&container_id).await;
let stderr = String::from_utf8_lossy(&cp_runtime.stderr);
return Err(RuntimeError::ContainerStartFailed(format!(
"docker cp to /var/runtime failed: {stderr}"
)));
}
}
if let Err(e) = self.copy_layers_into(&container_id, layers).await {
self.remove_container(&container_id).await;
return Err(e);
}
let start_result = tokio::process::Command::new(&self.cli)
.args(["start", &container_id])
.output()
.await
.map_err(|e| RuntimeError::ContainerStartFailed(e.to_string()))?;
if !start_result.status.success() {
self.remove_container(&container_id).await;
let stderr = String::from_utf8_lossy(&start_result.stderr);
return Err(RuntimeError::ContainerStartFailed(format!(
"docker start failed: {stderr}"
)));
}
let port = self.query_host_port(&container_id).await?;
self.wait_for_ready(&container_id, port).await?;
tracing::info!(
function = %func.function_name,
container_id = %container_id,
port = port,
runtime = %func.runtime,
"Lambda container started"
);
Ok(WarmInstance {
endpoint: format!("{}:{port}", self.sibling_host),
handle: BackendHandle::Container { id: container_id },
})
}
async fn query_host_port(&self, container_id: &str) -> Result<u16, RuntimeError> {
let port_output = tokio::process::Command::new(&self.cli)
.args(["port", container_id, "8080"])
.output()
.await
.map_err(|e| RuntimeError::ContainerStartFailed(e.to_string()))?;
let port_str = String::from_utf8_lossy(&port_output.stdout);
port_str
.trim()
.rsplit(':')
.next()
.and_then(|p| p.parse().ok())
.ok_or_else(|| {
RuntimeError::ContainerStartFailed(format!(
"could not determine port from: {}",
port_str.trim()
))
})
}
async fn wait_for_ready(&self, container_id: &str, port: u16) -> Result<(), RuntimeError> {
for _ in 0..20 {
tokio::time::sleep(Duration::from_millis(500)).await;
if tokio::net::TcpStream::connect(format!("{}:{port}", self.sibling_host))
.await
.is_ok()
{
return Ok(());
}
}
self.remove_container(container_id).await;
Err(RuntimeError::ContainerStartFailed(
"container did not become ready within 10 seconds".to_string(),
))
}
async fn copy_layers_into(
&self,
container_id: &str,
layers: &[Vec<u8>],
) -> Result<(), RuntimeError> {
if layers.is_empty() {
return Ok(());
}
let layers_dir =
TempDir::new().map_err(|e| RuntimeError::ZipExtractionFailed(e.to_string()))?;
let layers_path = layers_dir.path().to_path_buf();
let layers_owned: Vec<Vec<u8>> = layers.to_vec();
tokio::task::spawn_blocking(move || {
for bytes in &layers_owned {
extract_zip(bytes, &layers_path)?;
}
Ok::<_, RuntimeError>(())
})
.await
.map_err(|e| RuntimeError::ZipExtractionFailed(e.to_string()))??;
let cp_result = tokio::process::Command::new(&self.cli)
.arg("cp")
.arg(format!("{}/.", layers_dir.path().display()))
.arg(format!("{}:/opt", container_id))
.output()
.await
.map_err(|e| RuntimeError::ContainerStartFailed(e.to_string()))?;
if !cp_result.status.success() {
let stderr = String::from_utf8_lossy(&cp_result.stderr);
return Err(RuntimeError::ContainerStartFailed(format!(
"docker cp layers to /opt failed: {stderr}"
)));
}
Ok(())
}
async fn remove_container(&self, container_id: &str) {
let _ = tokio::process::Command::new(&self.cli)
.args(["rm", "-f", container_id])
.output()
.await;
}
}
#[async_trait]
impl LambdaBackend for DockerBackend {
fn name(&self) -> &str {
&self.cli
}
async fn launch(
&self,
func: &LambdaFunction,
code_zip: Option<&[u8]>,
layers: &[Vec<u8>],
_deploy_id: &str,
) -> Result<WarmInstance, RuntimeError> {
if func.package_type == "Image" {
self.start_image_container(func, layers).await
} else {
let bytes =
code_zip.ok_or_else(|| RuntimeError::NoCodeZip(func.function_name.clone()))?;
self.start_zip_container(func, bytes, layers).await
}
}
async fn terminate(&self, handle: &BackendHandle) {
match handle {
BackendHandle::Container { id } => self.remove_container(id).await,
BackendHandle::Pod { .. } => {}
}
}
async fn instance_logs(&self, handle: &BackendHandle) -> Option<String> {
let BackendHandle::Container { id } = handle else {
return None;
};
let output = tokio::process::Command::new(&self.cli)
.args(["logs", "--tail", "200", id])
.output()
.await
.ok()?;
let mut combined = String::from_utf8_lossy(&output.stdout).into_owned();
combined.push_str(&String::from_utf8_lossy(&output.stderr));
if combined.is_empty() {
None
} else {
Some(combined)
}
}
async fn prepull_image(&self, image: &str) -> Result<(), RuntimeError> {
let local_uri = fakecloud_core::ecr_uri::translate_to_local_at(
image,
&self.sibling_host,
self.server_port,
);
let pull_uri = local_uri.as_deref().unwrap_or(image);
let mut cmd = tokio::process::Command::new(&self.cli);
if let Some(p) = self.docker_config_path() {
cmd.env("DOCKER_CONFIG", p);
}
let out = cmd
.args(["pull", pull_uri])
.output()
.await
.map_err(|e| RuntimeError::ContainerStartFailed(format!("docker pull: {e}")))?;
if !out.status.success() {
return Err(RuntimeError::ContainerStartFailed(format!(
"docker pull failed for {pull_uri}: {}",
String::from_utf8_lossy(&out.stderr)
)));
}
Ok(())
}
}
pub fn runtime_to_image(runtime: &str) -> Option<String> {
let (base, tag) = match runtime {
"python3.14" => ("python", "3.14"),
"python3.13" => ("python", "3.13"),
"python3.12" => ("python", "3.12"),
"python3.11" => ("python", "3.11"),
"python3.10" => ("python", "3.10"),
"python3.9" => ("python", "3.9"),
"python3.8" => ("python", "3.8"),
"nodejs24.x" => ("nodejs", "24"),
"nodejs22.x" => ("nodejs", "22"),
"nodejs20.x" => ("nodejs", "20"),
"nodejs18.x" => ("nodejs", "18"),
"nodejs16.x" => ("nodejs", "16"),
"ruby3.4" => ("ruby", "3.4"),
"ruby3.3" => ("ruby", "3.3"),
"java25" => ("java", "25"),
"java21" => ("java", "21"),
"java17" => ("java", "17"),
"java11" => ("java", "11"),
"dotnet10" => ("dotnet", "10"),
"dotnet8" => ("dotnet", "8"),
"go1.x" => ("go", "1"),
"provided.al2023" => ("provided", "al2023"),
"provided.al2" => ("provided", "al2"),
_ => return None,
};
Some(format!("public.ecr.aws/lambda/{base}:{tag}"))
}
pub(crate) fn ephemeral_storage_tmpfs_arg(size: Option<i64>) -> String {
let mib = size.unwrap_or(512).max(64);
format!("/tmp:size={mib}m,exec")
}
pub fn extract_zip(zip_bytes: &[u8], dest: &Path) -> Result<(), RuntimeError> {
let cursor = std::io::Cursor::new(zip_bytes);
let mut archive = zip::ZipArchive::new(cursor)
.map_err(|e| RuntimeError::ZipExtractionFailed(e.to_string()))?;
for i in 0..archive.len() {
let mut file = archive
.by_index(i)
.map_err(|e| RuntimeError::ZipExtractionFailed(e.to_string()))?;
let out_path = dest.join(file.enclosed_name().ok_or_else(|| {
RuntimeError::ZipExtractionFailed("invalid file name in ZIP".to_string())
})?);
if file.is_dir() {
std::fs::create_dir_all(&out_path)
.map_err(|e| RuntimeError::ZipExtractionFailed(e.to_string()))?;
} else {
if let Some(parent) = out_path.parent() {
std::fs::create_dir_all(parent)
.map_err(|e| RuntimeError::ZipExtractionFailed(e.to_string()))?;
}
let mut out_file = std::fs::File::create(&out_path)
.map_err(|e| RuntimeError::ZipExtractionFailed(e.to_string()))?;
std::io::copy(&mut file, &mut out_file)
.map_err(|e| RuntimeError::ZipExtractionFailed(e.to_string()))?;
#[cfg(unix)]
{
use std::os::unix::fs::PermissionsExt;
if let Some(mode) = file.unix_mode() {
std::fs::set_permissions(&out_path, std::fs::Permissions::from_mode(mode))
.map_err(|e| RuntimeError::ZipExtractionFailed(e.to_string()))?;
}
}
}
}
Ok(())
}
fn build_local_registry_docker_config(server_port: u16) -> Option<TempDir> {
let dir = TempDir::new().ok()?;
let auth = base64::engine::general_purpose::STANDARD.encode("AWS:fakecloud-lambda-runtime");
let auths: serde_json::Map<String, serde_json::Value> =
fakecloud_core::container_net::registry_auth_hosts(server_port)
.into_iter()
.map(|host| (host, serde_json::json!({ "auth": auth })))
.collect();
let config = serde_json::json!({ "auths": auths });
std::fs::write(dir.path().join("config.json"), config.to_string()).ok()?;
Some(dir)
}
#[cfg(test)]
mod tests {
use std::io::{Read, Write};
use super::*;
#[test]
fn test_runtime_to_image() {
assert_eq!(
runtime_to_image("python3.12"),
Some("public.ecr.aws/lambda/python:3.12".to_string())
);
assert_eq!(
runtime_to_image("nodejs20.x"),
Some("public.ecr.aws/lambda/nodejs:20".to_string())
);
assert_eq!(
runtime_to_image("provided.al2023"),
Some("public.ecr.aws/lambda/provided:al2023".to_string())
);
assert_eq!(
runtime_to_image("ruby3.4"),
Some("public.ecr.aws/lambda/ruby:3.4".to_string())
);
assert_eq!(
runtime_to_image("java21"),
Some("public.ecr.aws/lambda/java:21".to_string())
);
assert_eq!(
runtime_to_image("dotnet8"),
Some("public.ecr.aws/lambda/dotnet:8".to_string())
);
assert_eq!(
runtime_to_image("nodejs16.x"),
Some("public.ecr.aws/lambda/nodejs:16".to_string())
);
assert_eq!(
runtime_to_image("python3.10"),
Some("public.ecr.aws/lambda/python:3.10".to_string())
);
assert_eq!(
runtime_to_image("python3.9"),
Some("public.ecr.aws/lambda/python:3.9".to_string())
);
assert_eq!(
runtime_to_image("python3.8"),
Some("public.ecr.aws/lambda/python:3.8".to_string())
);
assert_eq!(
runtime_to_image("java11"),
Some("public.ecr.aws/lambda/java:11".to_string())
);
assert_eq!(
runtime_to_image("go1.x"),
Some("public.ecr.aws/lambda/go:1".to_string())
);
assert_eq!(
runtime_to_image("nodejs24.x"),
Some("public.ecr.aws/lambda/nodejs:24".to_string())
);
assert_eq!(
runtime_to_image("python3.14"),
Some("public.ecr.aws/lambda/python:3.14".to_string())
);
assert_eq!(
runtime_to_image("java25"),
Some("public.ecr.aws/lambda/java:25".to_string())
);
assert_eq!(
runtime_to_image("dotnet10"),
Some("public.ecr.aws/lambda/dotnet:10".to_string())
);
assert_eq!(runtime_to_image("unknown"), None);
}
#[test]
fn test_extract_zip() {
let buf = Vec::new();
let cursor = std::io::Cursor::new(buf);
let mut writer = zip::ZipWriter::new(cursor);
let options = zip::write::SimpleFileOptions::default();
writer.start_file("handler.py", options).unwrap();
writer
.write_all(b"def handler(event, context):\n return {'statusCode': 200}\n")
.unwrap();
let cursor = writer.finish().unwrap();
let zip_bytes = cursor.into_inner();
let dir = TempDir::new().unwrap();
extract_zip(&zip_bytes, dir.path()).unwrap();
let handler_path = dir.path().join("handler.py");
assert!(handler_path.exists());
let mut content = String::new();
std::fs::File::open(&handler_path)
.unwrap()
.read_to_string(&mut content)
.unwrap();
assert!(content.contains("def handler"));
}
#[test]
fn ephemeral_storage_tmpfs_arg_defaults_to_512_when_none() {
assert_eq!(ephemeral_storage_tmpfs_arg(None), "/tmp:size=512m,exec");
}
#[test]
fn ephemeral_storage_tmpfs_arg_uses_supplied_size() {
assert_eq!(
ephemeral_storage_tmpfs_arg(Some(2048)),
"/tmp:size=2048m,exec"
);
assert_eq!(
ephemeral_storage_tmpfs_arg(Some(10240)),
"/tmp:size=10240m,exec"
);
}
#[test]
fn ephemeral_storage_tmpfs_arg_clamps_to_64_floor() {
assert_eq!(ephemeral_storage_tmpfs_arg(Some(0)), "/tmp:size=64m,exec");
assert_eq!(ephemeral_storage_tmpfs_arg(Some(32)), "/tmp:size=64m,exec");
}
}