1use std::path::{Path, PathBuf};
7use std::sync::Arc;
8use std::time::Duration;
9
10use async_trait::async_trait;
11use base64::Engine;
12use tempfile::TempDir;
13
14use super::backend::{BackendHandle, LambdaBackend, RuntimeError, WarmInstance};
15use super::env_rewrite::rewrite_localhost_envs;
16use crate::state::LambdaFunction;
17
/// Lambda backend that runs function instances as containers through a
/// Docker-compatible command-line client.
pub struct DockerBackend {
    /// Executable invoked for every container command (selected in `auto_detect`).
    cli: String,
    /// Per-process identifier (`fakecloud-<pid>`) attached to each container as the
    /// `fakecloud-instance` label.
    instance_id: String,
    /// Address that `host.docker.internal` is mapped to via `--add-host`.
    host_ip: String,
    /// Port passed to the ECR URI translation and used to key the credentials in the
    /// generated Docker config.
    server_port: u16,
    /// Temporary directory holding a generated `config.json`; exported as
    /// `DOCKER_CONFIG` for `pull`. `None` when the directory could not be created.
    docker_config: Option<Arc<TempDir>>,
}
33
34impl DockerBackend {
35 pub fn auto_detect(server_port: u16) -> Option<Self> {
40 let cli = if let Ok(cli) = std::env::var("FAKECLOUD_CONTAINER_CLI") {
41 if std::process::Command::new(&cli)
42 .arg("info")
43 .stdout(std::process::Stdio::null())
44 .stderr(std::process::Stdio::null())
45 .status()
46 .map(|s| s.success())
47 .unwrap_or(false)
48 {
49 cli
50 } else {
51 return None;
52 }
53 } else if is_cli_available("docker") {
54 "docker".to_string()
55 } else if is_cli_available("podman") {
56 "podman".to_string()
57 } else {
58 return None;
59 };
60
61 let instance_id = format!("fakecloud-{}", std::process::id());
62
63 let host_ip = if cfg!(target_os = "linux") {
66 detect_bridge_gateway(&cli).unwrap_or_else(|| "172.17.0.1".to_string())
67 } else {
68 "host-gateway".to_string()
69 };
70
71 let docker_config = build_local_registry_docker_config(server_port).map(Arc::new);
72 Some(Self {
73 cli,
74 instance_id,
75 host_ip,
76 server_port,
77 docker_config,
78 })
79 }
80
81 fn docker_config_path(&self) -> Option<PathBuf> {
82 self.docker_config.as_ref().map(|d| d.path().to_path_buf())
83 }
84
85 async fn start_image_container(
91 &self,
92 func: &LambdaFunction,
93 layers: &[Vec<u8>],
94 ) -> Result<WarmInstance, RuntimeError> {
95 let image = func.image_uri.as_deref().ok_or_else(|| {
96 RuntimeError::ContainerStartFailed("PackageType=Image function has no ImageUri".into())
97 })?;
98
99 let local_pull_uri = fakecloud_core::ecr_uri::translate_to_local(image, self.server_port);
100 let pull_uri = local_pull_uri.as_deref().unwrap_or(image);
101
102 let mut pull_cmd = tokio::process::Command::new(&self.cli);
103 if let Some(p) = self.docker_config_path() {
104 pull_cmd.env("DOCKER_CONFIG", p);
105 }
106 let pull_out = pull_cmd
107 .args(["pull", pull_uri])
108 .output()
109 .await
110 .map_err(|e| RuntimeError::ContainerStartFailed(format!("docker pull: {e}")))?;
111 if !pull_out.status.success() {
112 return Err(RuntimeError::ContainerStartFailed(format!(
113 "docker pull failed: {}",
114 String::from_utf8_lossy(&pull_out.stderr)
115 )));
116 }
117 let run_image = if let Some(ref local_uri) = local_pull_uri {
122 if fakecloud_core::ecr_uri::is_digest_ref(image) {
123 local_uri.clone()
124 } else {
125 let _ = tokio::process::Command::new(&self.cli)
126 .args(["tag", local_uri, image])
127 .output()
128 .await;
129 image.to_string()
130 }
131 } else {
132 image.to_string()
133 };
134
135 let mut cmd = tokio::process::Command::new(&self.cli);
136 cmd.arg("create")
137 .arg("-p")
138 .arg(":8080")
139 .arg("--label")
140 .arg(format!("fakecloud-lambda={}", func.function_name))
141 .arg("--label")
142 .arg(format!("fakecloud-instance={}", self.instance_id))
143 .arg("--add-host")
144 .arg(format!("host.docker.internal:{}", self.host_ip));
145
146 for (key, value) in rewrite_localhost_envs(&func.environment, "host.docker.internal") {
147 cmd.arg("-e").arg(format!("{key}={value}"));
148 }
149 cmd.arg("-e")
150 .arg(format!("AWS_LAMBDA_FUNCTION_TIMEOUT={}", func.timeout));
151
152 let tmpfs_arg = ephemeral_storage_tmpfs_arg(func.ephemeral_storage_size);
153 cmd.arg("--tmpfs").arg(tmpfs_arg);
154
155 cmd.arg(&run_image);
156
157 let output = cmd
158 .output()
159 .await
160 .map_err(|e| RuntimeError::ContainerStartFailed(e.to_string()))?;
161 if !output.status.success() {
162 return Err(RuntimeError::ContainerStartFailed(
163 String::from_utf8_lossy(&output.stderr).to_string(),
164 ));
165 }
166 let container_id = String::from_utf8_lossy(&output.stdout).trim().to_string();
167
168 if let Err(e) = self.copy_layers_into(&container_id, layers).await {
169 self.remove_container(&container_id).await;
170 return Err(e);
171 }
172
173 let start_result = tokio::process::Command::new(&self.cli)
174 .args(["start", &container_id])
175 .output()
176 .await
177 .map_err(|e| RuntimeError::ContainerStartFailed(e.to_string()))?;
178 if !start_result.status.success() {
179 self.remove_container(&container_id).await;
180 return Err(RuntimeError::ContainerStartFailed(format!(
181 "docker start failed: {}",
182 String::from_utf8_lossy(&start_result.stderr)
183 )));
184 }
185
186 let port = self.query_host_port(&container_id).await?;
187 self.wait_for_ready(&container_id, port).await?;
188
189 tracing::info!(
190 function = %func.function_name,
191 container_id = %container_id,
192 port = port,
193 image = %image,
194 "Lambda image container started"
195 );
196
197 Ok(WarmInstance {
198 endpoint: format!("127.0.0.1:{port}"),
199 handle: BackendHandle::Container { id: container_id },
200 })
201 }
202
203 async fn start_zip_container(
204 &self,
205 func: &LambdaFunction,
206 zip_bytes: &[u8],
207 layers: &[Vec<u8>],
208 ) -> Result<WarmInstance, RuntimeError> {
209 let image = runtime_to_image(&func.runtime)
210 .ok_or_else(|| RuntimeError::UnsupportedRuntime(func.runtime.clone()))?;
211
212 let code_dir =
215 TempDir::new().map_err(|e| RuntimeError::ZipExtractionFailed(e.to_string()))?;
216 let zip_bytes = zip_bytes.to_vec();
217 let code_path = code_dir.path().to_path_buf();
218 tokio::task::spawn_blocking(move || extract_zip(&zip_bytes, &code_path))
219 .await
220 .map_err(|e| RuntimeError::ZipExtractionFailed(e.to_string()))??;
221
222 let mut cmd = tokio::process::Command::new(&self.cli);
224 cmd.arg("create")
225 .arg("-p")
226 .arg(":8080")
227 .arg("--label")
228 .arg(format!("fakecloud-lambda={}", func.function_name))
229 .arg("--label")
230 .arg(format!("fakecloud-instance={}", self.instance_id))
231 .arg("--add-host")
232 .arg(format!("host.docker.internal:{}", self.host_ip));
233
234 for (key, value) in rewrite_localhost_envs(&func.environment, "host.docker.internal") {
235 cmd.arg("-e").arg(format!("{key}={value}"));
236 }
237
238 cmd.arg("-e")
239 .arg(format!("AWS_LAMBDA_FUNCTION_TIMEOUT={}", func.timeout));
240
241 let tmpfs_arg = ephemeral_storage_tmpfs_arg(func.ephemeral_storage_size);
242 cmd.arg("--tmpfs").arg(tmpfs_arg);
243
244 cmd.arg(&image).arg(&func.handler);
245
246 let output = cmd
247 .output()
248 .await
249 .map_err(|e| RuntimeError::ContainerStartFailed(e.to_string()))?;
250
251 if !output.status.success() {
252 let stderr = String::from_utf8_lossy(&output.stderr);
253 return Err(RuntimeError::ContainerStartFailed(stderr.to_string()));
254 }
255
256 let container_id = String::from_utf8_lossy(&output.stdout).trim().to_string();
257
258 let cp_result = tokio::process::Command::new(&self.cli)
260 .arg("cp")
261 .arg(format!("{}/.", code_dir.path().display()))
262 .arg(format!("{}:/var/task", container_id))
263 .output()
264 .await
265 .map_err(|e| RuntimeError::ContainerStartFailed(e.to_string()))?;
266
267 if !cp_result.status.success() {
268 self.remove_container(&container_id).await;
269 let stderr = String::from_utf8_lossy(&cp_result.stderr);
270 return Err(RuntimeError::ContainerStartFailed(format!(
271 "docker cp failed: {stderr}"
272 )));
273 }
274
275 if func.runtime.starts_with("provided") {
277 let cp_runtime = tokio::process::Command::new(&self.cli)
278 .arg("cp")
279 .arg(format!("{}/.", code_dir.path().display()))
280 .arg(format!("{}:/var/runtime", container_id))
281 .output()
282 .await
283 .map_err(|e| RuntimeError::ContainerStartFailed(e.to_string()))?;
284
285 if !cp_runtime.status.success() {
286 self.remove_container(&container_id).await;
287 let stderr = String::from_utf8_lossy(&cp_runtime.stderr);
288 return Err(RuntimeError::ContainerStartFailed(format!(
289 "docker cp to /var/runtime failed: {stderr}"
290 )));
291 }
292 }
293
294 if let Err(e) = self.copy_layers_into(&container_id, layers).await {
295 self.remove_container(&container_id).await;
296 return Err(e);
297 }
298
299 let start_result = tokio::process::Command::new(&self.cli)
302 .args(["start", &container_id])
303 .output()
304 .await
305 .map_err(|e| RuntimeError::ContainerStartFailed(e.to_string()))?;
306
307 if !start_result.status.success() {
308 self.remove_container(&container_id).await;
309 let stderr = String::from_utf8_lossy(&start_result.stderr);
310 return Err(RuntimeError::ContainerStartFailed(format!(
311 "docker start failed: {stderr}"
312 )));
313 }
314
315 let port = self.query_host_port(&container_id).await?;
316 self.wait_for_ready(&container_id, port).await?;
317
318 tracing::info!(
319 function = %func.function_name,
320 container_id = %container_id,
321 port = port,
322 runtime = %func.runtime,
323 "Lambda container started"
324 );
325
326 Ok(WarmInstance {
327 endpoint: format!("127.0.0.1:{port}"),
328 handle: BackendHandle::Container { id: container_id },
329 })
330 }
331
332 async fn query_host_port(&self, container_id: &str) -> Result<u16, RuntimeError> {
333 let port_output = tokio::process::Command::new(&self.cli)
334 .args(["port", container_id, "8080"])
335 .output()
336 .await
337 .map_err(|e| RuntimeError::ContainerStartFailed(e.to_string()))?;
338 let port_str = String::from_utf8_lossy(&port_output.stdout);
339 port_str
340 .trim()
341 .rsplit(':')
342 .next()
343 .and_then(|p| p.parse().ok())
344 .ok_or_else(|| {
345 RuntimeError::ContainerStartFailed(format!(
346 "could not determine port from: {}",
347 port_str.trim()
348 ))
349 })
350 }
351
352 async fn wait_for_ready(&self, container_id: &str, port: u16) -> Result<(), RuntimeError> {
353 for _ in 0..20 {
354 tokio::time::sleep(Duration::from_millis(500)).await;
355 if tokio::net::TcpStream::connect(format!("127.0.0.1:{port}"))
356 .await
357 .is_ok()
358 {
359 return Ok(());
360 }
361 }
362 self.remove_container(container_id).await;
363 Err(RuntimeError::ContainerStartFailed(
364 "container did not become ready within 10 seconds".to_string(),
365 ))
366 }
367
368 async fn copy_layers_into(
375 &self,
376 container_id: &str,
377 layers: &[Vec<u8>],
378 ) -> Result<(), RuntimeError> {
379 if layers.is_empty() {
380 return Ok(());
381 }
382 let layers_dir =
383 TempDir::new().map_err(|e| RuntimeError::ZipExtractionFailed(e.to_string()))?;
384 let layers_path = layers_dir.path().to_path_buf();
385 let layers_owned: Vec<Vec<u8>> = layers.to_vec();
386 tokio::task::spawn_blocking(move || {
387 for bytes in &layers_owned {
388 extract_zip(bytes, &layers_path)?;
389 }
390 Ok::<_, RuntimeError>(())
391 })
392 .await
393 .map_err(|e| RuntimeError::ZipExtractionFailed(e.to_string()))??;
394
395 let cp_result = tokio::process::Command::new(&self.cli)
396 .arg("cp")
397 .arg(format!("{}/.", layers_dir.path().display()))
398 .arg(format!("{}:/opt", container_id))
399 .output()
400 .await
401 .map_err(|e| RuntimeError::ContainerStartFailed(e.to_string()))?;
402 if !cp_result.status.success() {
403 let stderr = String::from_utf8_lossy(&cp_result.stderr);
404 return Err(RuntimeError::ContainerStartFailed(format!(
405 "docker cp layers to /opt failed: {stderr}"
406 )));
407 }
408 Ok(())
409 }
410
411 async fn remove_container(&self, container_id: &str) {
413 let _ = tokio::process::Command::new(&self.cli)
414 .args(["rm", "-f", container_id])
415 .output()
416 .await;
417 }
418}
419
420#[async_trait]
421impl LambdaBackend for DockerBackend {
422 fn name(&self) -> &str {
423 &self.cli
424 }
425
426 async fn launch(
427 &self,
428 func: &LambdaFunction,
429 code_zip: Option<&[u8]>,
430 layers: &[Vec<u8>],
431 _deploy_id: &str,
432 ) -> Result<WarmInstance, RuntimeError> {
433 if func.package_type == "Image" {
434 self.start_image_container(func, layers).await
435 } else {
436 let bytes =
437 code_zip.ok_or_else(|| RuntimeError::NoCodeZip(func.function_name.clone()))?;
438 self.start_zip_container(func, bytes, layers).await
439 }
440 }
441
442 async fn terminate(&self, handle: &BackendHandle) {
443 match handle {
444 BackendHandle::Container { id } => self.remove_container(id).await,
445 BackendHandle::Pod { .. } => {}
448 }
449 }
450}
451
/// Maps a Lambda runtime identifier to its base image under `public.ecr.aws/lambda`.
///
/// Returns `None` for identifiers that are not in the table below; matching is exact
/// and case-sensitive.
pub fn runtime_to_image(runtime: &str) -> Option<String> {
    /// `(runtime identifier, image repository, image tag)`.
    const RUNTIME_IMAGES: &[(&str, &str, &str)] = &[
        ("python3.14", "python", "3.14"),
        ("python3.13", "python", "3.13"),
        ("python3.12", "python", "3.12"),
        ("python3.11", "python", "3.11"),
        ("python3.10", "python", "3.10"),
        ("python3.9", "python", "3.9"),
        ("python3.8", "python", "3.8"),
        ("nodejs24.x", "nodejs", "24"),
        ("nodejs22.x", "nodejs", "22"),
        ("nodejs20.x", "nodejs", "20"),
        ("nodejs18.x", "nodejs", "18"),
        ("nodejs16.x", "nodejs", "16"),
        ("ruby3.4", "ruby", "3.4"),
        ("ruby3.3", "ruby", "3.3"),
        ("java25", "java", "25"),
        ("java21", "java", "21"),
        ("java17", "java", "17"),
        ("java11", "java", "11"),
        ("dotnet10", "dotnet", "10"),
        ("dotnet8", "dotnet", "8"),
        ("go1.x", "go", "1"),
        ("provided.al2023", "provided", "al2023"),
        ("provided.al2", "provided", "al2"),
    ];

    RUNTIME_IMAGES
        .iter()
        .find(|(id, _, _)| *id == runtime)
        .map(|(_, base, tag)| format!("public.ecr.aws/lambda/{base}:{tag}"))
}
482
/// Builds the `--tmpfs` argument that mounts `/tmp` with the function's ephemeral
/// storage size in MiB.
///
/// A missing size defaults to 512; any supplied size below 64 is raised to 64. The
/// mount is created with the `exec` option.
pub(crate) fn ephemeral_storage_tmpfs_arg(size: Option<i64>) -> String {
    let mib = match size {
        None => 512,
        Some(requested) if requested < 64 => 64,
        Some(requested) => requested,
    };
    format!("/tmp:size={mib}m,exec")
}
498
499pub fn extract_zip(zip_bytes: &[u8], dest: &Path) -> Result<(), RuntimeError> {
501 let cursor = std::io::Cursor::new(zip_bytes);
502 let mut archive = zip::ZipArchive::new(cursor)
503 .map_err(|e| RuntimeError::ZipExtractionFailed(e.to_string()))?;
504
505 for i in 0..archive.len() {
506 let mut file = archive
507 .by_index(i)
508 .map_err(|e| RuntimeError::ZipExtractionFailed(e.to_string()))?;
509
510 let out_path = dest.join(file.enclosed_name().ok_or_else(|| {
511 RuntimeError::ZipExtractionFailed("invalid file name in ZIP".to_string())
512 })?);
513
514 if file.is_dir() {
515 std::fs::create_dir_all(&out_path)
516 .map_err(|e| RuntimeError::ZipExtractionFailed(e.to_string()))?;
517 } else {
518 if let Some(parent) = out_path.parent() {
519 std::fs::create_dir_all(parent)
520 .map_err(|e| RuntimeError::ZipExtractionFailed(e.to_string()))?;
521 }
522 let mut out_file = std::fs::File::create(&out_path)
523 .map_err(|e| RuntimeError::ZipExtractionFailed(e.to_string()))?;
524 std::io::copy(&mut file, &mut out_file)
525 .map_err(|e| RuntimeError::ZipExtractionFailed(e.to_string()))?;
526
527 #[cfg(unix)]
528 {
529 use std::os::unix::fs::PermissionsExt;
530 if let Some(mode) = file.unix_mode() {
531 std::fs::set_permissions(&out_path, std::fs::Permissions::from_mode(mode))
532 .map_err(|e| RuntimeError::ZipExtractionFailed(e.to_string()))?;
533 }
534 }
535 }
536 }
537 Ok(())
538}
539
540fn detect_bridge_gateway(cli: &str) -> Option<String> {
542 let output = std::process::Command::new(cli)
543 .args([
544 "network",
545 "inspect",
546 "bridge",
547 "--format",
548 "{{range .IPAM.Config}}{{.Gateway}}{{end}}",
549 ])
550 .output()
551 .ok()?;
552
553 if output.status.success() {
554 let gateway = String::from_utf8_lossy(&output.stdout).trim().to_string();
555 if !gateway.is_empty() && gateway.contains('.') {
556 tracing::info!(
557 gateway = %gateway,
558 "Detected Docker bridge gateway for Lambda containers"
559 );
560 return Some(gateway);
561 }
562 }
563 None
564}
565
/// Reports whether `<name> info` can be run and exits successfully.
///
/// The command's standard output and standard error are discarded. A CLI that cannot
/// be spawned counts as unavailable.
fn is_cli_available(name: &str) -> bool {
    use std::process::{Command, Stdio};

    let mut probe = Command::new(name);
    probe.arg("info").stdout(Stdio::null()).stderr(Stdio::null());
    matches!(probe.status(), Ok(status) if status.success())
}
575
576fn build_local_registry_docker_config(server_port: u16) -> Option<TempDir> {
577 let dir = TempDir::new().ok()?;
578 let auth = base64::engine::general_purpose::STANDARD.encode("AWS:fakecloud-lambda-runtime");
579 let config = serde_json::json!({
580 "auths": {
581 format!("127.0.0.1:{server_port}"): { "auth": auth },
582 }
583 });
584 std::fs::write(dir.path().join("config.json"), config.to_string()).ok()?;
585 Some(dir)
586}
587
#[cfg(test)]
mod tests {
    use std::io::{Read, Write};

    use super::*;

    /// Every listed runtime maps to its expected image; an unknown runtime maps to `None`.
    #[test]
    fn test_runtime_to_image() {
        let expected = [
            ("python3.12", "public.ecr.aws/lambda/python:3.12"),
            ("nodejs20.x", "public.ecr.aws/lambda/nodejs:20"),
            ("provided.al2023", "public.ecr.aws/lambda/provided:al2023"),
            ("ruby3.4", "public.ecr.aws/lambda/ruby:3.4"),
            ("java21", "public.ecr.aws/lambda/java:21"),
            ("dotnet8", "public.ecr.aws/lambda/dotnet:8"),
            ("nodejs16.x", "public.ecr.aws/lambda/nodejs:16"),
            ("python3.10", "public.ecr.aws/lambda/python:3.10"),
            ("python3.9", "public.ecr.aws/lambda/python:3.9"),
            ("python3.8", "public.ecr.aws/lambda/python:3.8"),
            ("java11", "public.ecr.aws/lambda/java:11"),
            ("go1.x", "public.ecr.aws/lambda/go:1"),
            ("nodejs24.x", "public.ecr.aws/lambda/nodejs:24"),
            ("python3.14", "public.ecr.aws/lambda/python:3.14"),
            ("java25", "public.ecr.aws/lambda/java:25"),
            ("dotnet10", "public.ecr.aws/lambda/dotnet:10"),
        ];
        for (runtime, image) in expected {
            assert_eq!(runtime_to_image(runtime), Some(image.to_string()));
        }
        assert_eq!(runtime_to_image("unknown"), None);
    }

    /// A single-file archive is extracted and its content is readable afterwards.
    #[test]
    fn test_extract_zip() {
        let mut writer = zip::ZipWriter::new(std::io::Cursor::new(Vec::new()));
        writer
            .start_file("handler.py", zip::write::SimpleFileOptions::default())
            .unwrap();
        writer
            .write_all(b"def handler(event, context):\n return {'statusCode': 200}\n")
            .unwrap();
        let zip_bytes = writer.finish().unwrap().into_inner();

        let dir = TempDir::new().unwrap();
        extract_zip(&zip_bytes, dir.path()).unwrap();

        let handler_path = dir.path().join("handler.py");
        assert!(handler_path.exists());

        let mut content = String::new();
        let mut extracted = std::fs::File::open(&handler_path).unwrap();
        extracted.read_to_string(&mut content).unwrap();
        assert!(content.contains("def handler"));
    }

    /// With no size configured the tmpfs defaults to 512 MiB.
    #[test]
    fn ephemeral_storage_tmpfs_arg_defaults_to_512_when_none() {
        let arg = ephemeral_storage_tmpfs_arg(None);
        assert_eq!(arg, "/tmp:size=512m,exec");
    }

    /// Sizes at or above the floor are passed through unchanged.
    #[test]
    fn ephemeral_storage_tmpfs_arg_uses_supplied_size() {
        let cases = [
            (Some(2048), "/tmp:size=2048m,exec"),
            (Some(10240), "/tmp:size=10240m,exec"),
        ];
        for (size, expected) in cases {
            assert_eq!(ephemeral_storage_tmpfs_arg(size), expected);
        }
    }

    /// Sizes below 64 MiB are raised to 64 MiB.
    #[test]
    fn ephemeral_storage_tmpfs_arg_clamps_to_64_floor() {
        for size in [Some(0), Some(32)] {
            assert_eq!(ephemeral_storage_tmpfs_arg(size), "/tmp:size=64m,exec");
        }
    }
}