1use std::path::{Path, PathBuf};
7use std::sync::Arc;
8use std::time::Duration;
9
10use async_trait::async_trait;
11use base64::Engine;
12use tempfile::TempDir;
13
14use super::backend::{BackendHandle, LambdaBackend, RuntimeError, WarmInstance};
15use super::env_rewrite::rewrite_localhost_envs;
16use crate::state::LambdaFunction;
17
18pub struct DockerBackend {
20 cli: String,
21 instance_id: String,
22 host_alias: String,
32 add_host_arg: Option<String>,
35 server_port: u16,
39 sibling_host: String,
49 docker_config: Option<Arc<TempDir>>,
53}
54
55impl DockerBackend {
56 pub fn auto_detect(server_port: u16) -> Option<Self> {
61 let cli = fakecloud_core::container_net::detect_container_cli()?;
69 let instance_id = format!("fakecloud-{}", std::process::id());
70 let net = fakecloud_core::container_net::HostNetworking::detect(&cli);
71
72 let docker_config = build_local_registry_docker_config(server_port).map(Arc::new);
73 Some(Self {
74 cli,
75 instance_id,
76 host_alias: net.host_alias,
77 add_host_arg: net.add_host_arg,
78 server_port,
79 sibling_host: net.sibling_host,
80 docker_config,
81 })
82 }
83
84 fn apply_host_alias(&self, cmd: &mut tokio::process::Command) {
88 if let Some(arg) = &self.add_host_arg {
89 cmd.arg("--add-host").arg(arg);
90 }
91 }
92
93 fn docker_config_path(&self) -> Option<PathBuf> {
94 self.docker_config.as_ref().map(|d| d.path().to_path_buf())
95 }
96
97 async fn start_image_container(
103 &self,
104 func: &LambdaFunction,
105 layers: &[Vec<u8>],
106 ) -> Result<WarmInstance, RuntimeError> {
107 let image = func.image_uri.as_deref().ok_or_else(|| {
108 RuntimeError::ContainerStartFailed("PackageType=Image function has no ImageUri".into())
109 })?;
110
111 let local_pull_uri = fakecloud_core::ecr_uri::translate_to_local_at(
118 image,
119 &self.sibling_host,
120 self.server_port,
121 );
122 let pull_uri = local_pull_uri.as_deref().unwrap_or(image);
123
124 let mut pull_cmd = tokio::process::Command::new(&self.cli);
125 if let Some(p) = self.docker_config_path() {
126 pull_cmd.env("DOCKER_CONFIG", p);
127 }
128 let pull_out = pull_cmd
129 .args(["pull", pull_uri])
130 .output()
131 .await
132 .map_err(|e| RuntimeError::ContainerStartFailed(format!("docker pull: {e}")))?;
133 if !pull_out.status.success() {
134 return Err(RuntimeError::ContainerStartFailed(format!(
135 "docker pull failed: {}",
136 String::from_utf8_lossy(&pull_out.stderr)
137 )));
138 }
139 let run_image = if let Some(ref local_uri) = local_pull_uri {
144 if fakecloud_core::ecr_uri::is_digest_ref(image) {
145 local_uri.clone()
146 } else {
147 let _ = tokio::process::Command::new(&self.cli)
148 .args(["tag", local_uri, image])
149 .output()
150 .await;
151 image.to_string()
152 }
153 } else {
154 image.to_string()
155 };
156
157 let mut cmd = tokio::process::Command::new(&self.cli);
158 cmd.arg("create")
159 .arg("-p")
160 .arg(":8080")
161 .arg("--label")
162 .arg(format!("fakecloud-lambda={}", func.function_name))
163 .arg("--label")
164 .arg(format!("fakecloud-instance={}", self.instance_id));
165 self.apply_host_alias(&mut cmd);
166
167 for (key, value) in rewrite_localhost_envs(&func.environment, &self.host_alias) {
168 cmd.arg("-e").arg(format!("{key}={value}"));
169 }
170 cmd.arg("-e")
171 .arg(format!("AWS_LAMBDA_FUNCTION_TIMEOUT={}", func.timeout));
172
173 let tmpfs_arg = ephemeral_storage_tmpfs_arg(func.ephemeral_storage_size);
174 cmd.arg("--tmpfs").arg(tmpfs_arg);
175
176 cmd.arg(&run_image);
177
178 let output = cmd
179 .output()
180 .await
181 .map_err(|e| RuntimeError::ContainerStartFailed(e.to_string()))?;
182 if !output.status.success() {
183 return Err(RuntimeError::ContainerStartFailed(
184 String::from_utf8_lossy(&output.stderr).to_string(),
185 ));
186 }
187 let container_id = String::from_utf8_lossy(&output.stdout).trim().to_string();
188
189 if let Err(e) = self.copy_layers_into(&container_id, layers).await {
190 self.remove_container(&container_id).await;
191 return Err(e);
192 }
193
194 let start_result = tokio::process::Command::new(&self.cli)
195 .args(["start", &container_id])
196 .output()
197 .await
198 .map_err(|e| RuntimeError::ContainerStartFailed(e.to_string()))?;
199 if !start_result.status.success() {
200 self.remove_container(&container_id).await;
201 return Err(RuntimeError::ContainerStartFailed(format!(
202 "docker start failed: {}",
203 String::from_utf8_lossy(&start_result.stderr)
204 )));
205 }
206
207 let port = self.query_host_port(&container_id).await?;
208 self.wait_for_ready(&container_id, port).await?;
209
210 tracing::info!(
211 function = %func.function_name,
212 container_id = %container_id,
213 port = port,
214 image = %image,
215 "Lambda image container started"
216 );
217
218 Ok(WarmInstance {
219 endpoint: format!("{}:{port}", self.sibling_host),
220 handle: BackendHandle::Container { id: container_id },
221 })
222 }
223
224 async fn start_zip_container(
225 &self,
226 func: &LambdaFunction,
227 zip_bytes: &[u8],
228 layers: &[Vec<u8>],
229 ) -> Result<WarmInstance, RuntimeError> {
230 let image = runtime_to_image(&func.runtime)
231 .ok_or_else(|| RuntimeError::UnsupportedRuntime(func.runtime.clone()))?;
232
233 let code_dir =
236 TempDir::new().map_err(|e| RuntimeError::ZipExtractionFailed(e.to_string()))?;
237 let zip_bytes = zip_bytes.to_vec();
238 let code_path = code_dir.path().to_path_buf();
239 tokio::task::spawn_blocking(move || extract_zip(&zip_bytes, &code_path))
240 .await
241 .map_err(|e| RuntimeError::ZipExtractionFailed(e.to_string()))??;
242
243 let mut cmd = tokio::process::Command::new(&self.cli);
245 cmd.arg("create")
246 .arg("-p")
247 .arg(":8080")
248 .arg("--label")
249 .arg(format!("fakecloud-lambda={}", func.function_name))
250 .arg("--label")
251 .arg(format!("fakecloud-instance={}", self.instance_id));
252 self.apply_host_alias(&mut cmd);
253
254 for (key, value) in rewrite_localhost_envs(&func.environment, &self.host_alias) {
255 cmd.arg("-e").arg(format!("{key}={value}"));
256 }
257
258 cmd.arg("-e")
259 .arg(format!("AWS_LAMBDA_FUNCTION_TIMEOUT={}", func.timeout));
260
261 let tmpfs_arg = ephemeral_storage_tmpfs_arg(func.ephemeral_storage_size);
262 cmd.arg("--tmpfs").arg(tmpfs_arg);
263
264 cmd.arg(&image).arg(&func.handler);
265
266 let output = cmd
267 .output()
268 .await
269 .map_err(|e| RuntimeError::ContainerStartFailed(e.to_string()))?;
270
271 if !output.status.success() {
272 let stderr = String::from_utf8_lossy(&output.stderr);
273 return Err(RuntimeError::ContainerStartFailed(stderr.to_string()));
274 }
275
276 let container_id = String::from_utf8_lossy(&output.stdout).trim().to_string();
277
278 let cp_result = tokio::process::Command::new(&self.cli)
280 .arg("cp")
281 .arg(format!("{}/.", code_dir.path().display()))
282 .arg(format!("{}:/var/task", container_id))
283 .output()
284 .await
285 .map_err(|e| RuntimeError::ContainerStartFailed(e.to_string()))?;
286
287 if !cp_result.status.success() {
288 self.remove_container(&container_id).await;
289 let stderr = String::from_utf8_lossy(&cp_result.stderr);
290 return Err(RuntimeError::ContainerStartFailed(format!(
291 "docker cp failed: {stderr}"
292 )));
293 }
294
295 if func.runtime.starts_with("provided") {
297 let cp_runtime = tokio::process::Command::new(&self.cli)
298 .arg("cp")
299 .arg(format!("{}/.", code_dir.path().display()))
300 .arg(format!("{}:/var/runtime", container_id))
301 .output()
302 .await
303 .map_err(|e| RuntimeError::ContainerStartFailed(e.to_string()))?;
304
305 if !cp_runtime.status.success() {
306 self.remove_container(&container_id).await;
307 let stderr = String::from_utf8_lossy(&cp_runtime.stderr);
308 return Err(RuntimeError::ContainerStartFailed(format!(
309 "docker cp to /var/runtime failed: {stderr}"
310 )));
311 }
312 }
313
314 if let Err(e) = self.copy_layers_into(&container_id, layers).await {
315 self.remove_container(&container_id).await;
316 return Err(e);
317 }
318
319 let start_result = tokio::process::Command::new(&self.cli)
322 .args(["start", &container_id])
323 .output()
324 .await
325 .map_err(|e| RuntimeError::ContainerStartFailed(e.to_string()))?;
326
327 if !start_result.status.success() {
328 self.remove_container(&container_id).await;
329 let stderr = String::from_utf8_lossy(&start_result.stderr);
330 return Err(RuntimeError::ContainerStartFailed(format!(
331 "docker start failed: {stderr}"
332 )));
333 }
334
335 let port = self.query_host_port(&container_id).await?;
336 self.wait_for_ready(&container_id, port).await?;
337
338 tracing::info!(
339 function = %func.function_name,
340 container_id = %container_id,
341 port = port,
342 runtime = %func.runtime,
343 "Lambda container started"
344 );
345
346 Ok(WarmInstance {
347 endpoint: format!("{}:{port}", self.sibling_host),
348 handle: BackendHandle::Container { id: container_id },
349 })
350 }
351
352 async fn query_host_port(&self, container_id: &str) -> Result<u16, RuntimeError> {
353 let port_output = tokio::process::Command::new(&self.cli)
354 .args(["port", container_id, "8080"])
355 .output()
356 .await
357 .map_err(|e| RuntimeError::ContainerStartFailed(e.to_string()))?;
358 let port_str = String::from_utf8_lossy(&port_output.stdout);
359 port_str
360 .trim()
361 .rsplit(':')
362 .next()
363 .and_then(|p| p.parse().ok())
364 .ok_or_else(|| {
365 RuntimeError::ContainerStartFailed(format!(
366 "could not determine port from: {}",
367 port_str.trim()
368 ))
369 })
370 }
371
372 async fn wait_for_ready(&self, container_id: &str, port: u16) -> Result<(), RuntimeError> {
373 for _ in 0..20 {
374 tokio::time::sleep(Duration::from_millis(500)).await;
375 if tokio::net::TcpStream::connect(format!("{}:{port}", self.sibling_host))
376 .await
377 .is_ok()
378 {
379 return Ok(());
380 }
381 }
382 self.remove_container(container_id).await;
383 Err(RuntimeError::ContainerStartFailed(
384 "container did not become ready within 10 seconds".to_string(),
385 ))
386 }
387
388 async fn copy_layers_into(
395 &self,
396 container_id: &str,
397 layers: &[Vec<u8>],
398 ) -> Result<(), RuntimeError> {
399 if layers.is_empty() {
400 return Ok(());
401 }
402 let layers_dir =
403 TempDir::new().map_err(|e| RuntimeError::ZipExtractionFailed(e.to_string()))?;
404 let layers_path = layers_dir.path().to_path_buf();
405 let layers_owned: Vec<Vec<u8>> = layers.to_vec();
406 tokio::task::spawn_blocking(move || {
407 for bytes in &layers_owned {
408 extract_zip(bytes, &layers_path)?;
409 }
410 Ok::<_, RuntimeError>(())
411 })
412 .await
413 .map_err(|e| RuntimeError::ZipExtractionFailed(e.to_string()))??;
414
415 let cp_result = tokio::process::Command::new(&self.cli)
416 .arg("cp")
417 .arg(format!("{}/.", layers_dir.path().display()))
418 .arg(format!("{}:/opt", container_id))
419 .output()
420 .await
421 .map_err(|e| RuntimeError::ContainerStartFailed(e.to_string()))?;
422 if !cp_result.status.success() {
423 let stderr = String::from_utf8_lossy(&cp_result.stderr);
424 return Err(RuntimeError::ContainerStartFailed(format!(
425 "docker cp layers to /opt failed: {stderr}"
426 )));
427 }
428 Ok(())
429 }
430
431 async fn remove_container(&self, container_id: &str) {
433 let _ = tokio::process::Command::new(&self.cli)
434 .args(["rm", "-f", container_id])
435 .output()
436 .await;
437 }
438}
439
440#[async_trait]
441impl LambdaBackend for DockerBackend {
442 fn name(&self) -> &str {
443 &self.cli
444 }
445
446 async fn launch(
447 &self,
448 func: &LambdaFunction,
449 code_zip: Option<&[u8]>,
450 layers: &[Vec<u8>],
451 _deploy_id: &str,
452 ) -> Result<WarmInstance, RuntimeError> {
453 if func.package_type == "Image" {
454 self.start_image_container(func, layers).await
455 } else {
456 let bytes =
457 code_zip.ok_or_else(|| RuntimeError::NoCodeZip(func.function_name.clone()))?;
458 self.start_zip_container(func, bytes, layers).await
459 }
460 }
461
462 async fn terminate(&self, handle: &BackendHandle) {
463 match handle {
464 BackendHandle::Container { id } => self.remove_container(id).await,
465 BackendHandle::Pod { .. } => {}
468 }
469 }
470
471 async fn prepull_image(&self, image: &str) -> Result<(), RuntimeError> {
472 let local_uri = fakecloud_core::ecr_uri::translate_to_local_at(
476 image,
477 &self.sibling_host,
478 self.server_port,
479 );
480 let pull_uri = local_uri.as_deref().unwrap_or(image);
481
482 let mut cmd = tokio::process::Command::new(&self.cli);
483 if let Some(p) = self.docker_config_path() {
484 cmd.env("DOCKER_CONFIG", p);
485 }
486 let out = cmd
487 .args(["pull", pull_uri])
488 .output()
489 .await
490 .map_err(|e| RuntimeError::ContainerStartFailed(format!("docker pull: {e}")))?;
491 if !out.status.success() {
492 return Err(RuntimeError::ContainerStartFailed(format!(
493 "docker pull failed for {pull_uri}: {}",
494 String::from_utf8_lossy(&out.stderr)
495 )));
496 }
497 Ok(())
498 }
499}
500
/// Maps a Lambda runtime identifier to its `public.ecr.aws/lambda` base image.
///
/// The match is exact and case-sensitive. Returns `None` for any identifier
/// that is not in the table below.
pub fn runtime_to_image(runtime: &str) -> Option<String> {
    /// `(runtime identifier, image repository, image tag)`.
    const RUNTIME_IMAGES: &[(&str, &str, &str)] = &[
        ("python3.14", "python", "3.14"),
        ("python3.13", "python", "3.13"),
        ("python3.12", "python", "3.12"),
        ("python3.11", "python", "3.11"),
        ("python3.10", "python", "3.10"),
        ("python3.9", "python", "3.9"),
        ("python3.8", "python", "3.8"),
        ("nodejs24.x", "nodejs", "24"),
        ("nodejs22.x", "nodejs", "22"),
        ("nodejs20.x", "nodejs", "20"),
        ("nodejs18.x", "nodejs", "18"),
        ("nodejs16.x", "nodejs", "16"),
        ("ruby3.4", "ruby", "3.4"),
        ("ruby3.3", "ruby", "3.3"),
        ("java25", "java", "25"),
        ("java21", "java", "21"),
        ("java17", "java", "17"),
        ("java11", "java", "11"),
        ("dotnet10", "dotnet", "10"),
        ("dotnet8", "dotnet", "8"),
        ("go1.x", "go", "1"),
        ("provided.al2023", "provided", "al2023"),
        ("provided.al2", "provided", "al2"),
    ];

    RUNTIME_IMAGES
        .iter()
        .find(|entry| entry.0 == runtime)
        .map(|&(_, base, tag)| format!("public.ecr.aws/lambda/{base}:{tag}"))
}
531
/// Builds the `--tmpfs` argument that mounts `/tmp` with the function's
/// ephemeral storage size.
///
/// `size` is interpreted as MiB. A missing size defaults to 512, and any
/// value below 64 is raised to 64. The mount is created with `exec`.
pub(crate) fn ephemeral_storage_tmpfs_arg(size: Option<i64>) -> String {
    const DEFAULT_MIB: i64 = 512;
    const FLOOR_MIB: i64 = 64;

    let requested = match size {
        Some(value) => value,
        None => DEFAULT_MIB,
    };
    let mib = if requested < FLOOR_MIB {
        FLOOR_MIB
    } else {
        requested
    };
    format!("/tmp:size={mib}m,exec")
}
547
548pub fn extract_zip(zip_bytes: &[u8], dest: &Path) -> Result<(), RuntimeError> {
550 let cursor = std::io::Cursor::new(zip_bytes);
551 let mut archive = zip::ZipArchive::new(cursor)
552 .map_err(|e| RuntimeError::ZipExtractionFailed(e.to_string()))?;
553
554 for i in 0..archive.len() {
555 let mut file = archive
556 .by_index(i)
557 .map_err(|e| RuntimeError::ZipExtractionFailed(e.to_string()))?;
558
559 let out_path = dest.join(file.enclosed_name().ok_or_else(|| {
560 RuntimeError::ZipExtractionFailed("invalid file name in ZIP".to_string())
561 })?);
562
563 if file.is_dir() {
564 std::fs::create_dir_all(&out_path)
565 .map_err(|e| RuntimeError::ZipExtractionFailed(e.to_string()))?;
566 } else {
567 if let Some(parent) = out_path.parent() {
568 std::fs::create_dir_all(parent)
569 .map_err(|e| RuntimeError::ZipExtractionFailed(e.to_string()))?;
570 }
571 let mut out_file = std::fs::File::create(&out_path)
572 .map_err(|e| RuntimeError::ZipExtractionFailed(e.to_string()))?;
573 std::io::copy(&mut file, &mut out_file)
574 .map_err(|e| RuntimeError::ZipExtractionFailed(e.to_string()))?;
575
576 #[cfg(unix)]
577 {
578 use std::os::unix::fs::PermissionsExt;
579 if let Some(mode) = file.unix_mode() {
580 std::fs::set_permissions(&out_path, std::fs::Permissions::from_mode(mode))
581 .map_err(|e| RuntimeError::ZipExtractionFailed(e.to_string()))?;
582 }
583 }
584 }
585 }
586 Ok(())
587}
588
589fn build_local_registry_docker_config(server_port: u16) -> Option<TempDir> {
590 let dir = TempDir::new().ok()?;
591 let auth = base64::engine::general_purpose::STANDARD.encode("AWS:fakecloud-lambda-runtime");
592 let auths: serde_json::Map<String, serde_json::Value> =
598 fakecloud_core::container_net::registry_auth_hosts(server_port)
599 .into_iter()
600 .map(|host| (host, serde_json::json!({ "auth": auth })))
601 .collect();
602 let config = serde_json::json!({ "auths": auths });
603 std::fs::write(dir.path().join("config.json"), config.to_string()).ok()?;
604 Some(dir)
605}
606
#[cfg(test)]
mod tests {
    use std::io::Write;

    use super::*;

    /// Known runtime identifiers resolve to their public ECR image; anything
    /// else resolves to `None`.
    #[test]
    fn test_runtime_to_image() {
        let expectations = [
            ("python3.12", "public.ecr.aws/lambda/python:3.12"),
            ("nodejs20.x", "public.ecr.aws/lambda/nodejs:20"),
            ("provided.al2023", "public.ecr.aws/lambda/provided:al2023"),
            ("ruby3.4", "public.ecr.aws/lambda/ruby:3.4"),
            ("java21", "public.ecr.aws/lambda/java:21"),
            ("dotnet8", "public.ecr.aws/lambda/dotnet:8"),
            ("nodejs16.x", "public.ecr.aws/lambda/nodejs:16"),
            ("python3.10", "public.ecr.aws/lambda/python:3.10"),
            ("python3.9", "public.ecr.aws/lambda/python:3.9"),
            ("python3.8", "public.ecr.aws/lambda/python:3.8"),
            ("java11", "public.ecr.aws/lambda/java:11"),
            ("go1.x", "public.ecr.aws/lambda/go:1"),
            ("nodejs24.x", "public.ecr.aws/lambda/nodejs:24"),
            ("python3.14", "public.ecr.aws/lambda/python:3.14"),
            ("java25", "public.ecr.aws/lambda/java:25"),
            ("dotnet10", "public.ecr.aws/lambda/dotnet:10"),
        ];
        for (runtime, image) in expectations {
            assert_eq!(runtime_to_image(runtime), Some(image.to_string()));
        }

        assert_eq!(runtime_to_image("unknown"), None);
    }

    /// A single-file archive is extracted with its contents intact.
    #[test]
    fn test_extract_zip() {
        let mut writer = zip::ZipWriter::new(std::io::Cursor::new(Vec::new()));
        writer
            .start_file("handler.py", zip::write::SimpleFileOptions::default())
            .unwrap();
        writer
            .write_all(b"def handler(event, context):\n return {'statusCode': 200}\n")
            .unwrap();
        let zip_bytes = writer.finish().unwrap().into_inner();

        let dir = TempDir::new().unwrap();
        extract_zip(&zip_bytes, dir.path()).unwrap();

        let handler_path = dir.path().join("handler.py");
        assert!(handler_path.exists());

        let content = std::fs::read_to_string(&handler_path).unwrap();
        assert!(content.contains("def handler"));
    }

    /// A missing size falls back to 512 MiB.
    #[test]
    fn ephemeral_storage_tmpfs_arg_defaults_to_512_when_none() {
        let arg = ephemeral_storage_tmpfs_arg(None);
        assert_eq!(arg, "/tmp:size=512m,exec");
    }

    /// Sizes at or above the floor are passed through unchanged.
    #[test]
    fn ephemeral_storage_tmpfs_arg_uses_supplied_size() {
        for (size, expected) in [
            (2048, "/tmp:size=2048m,exec"),
            (10240, "/tmp:size=10240m,exec"),
        ] {
            assert_eq!(ephemeral_storage_tmpfs_arg(Some(size)), expected);
        }
    }

    /// Sizes below 64 MiB are raised to 64 MiB.
    #[test]
    fn ephemeral_storage_tmpfs_arg_clamps_to_64_floor() {
        for size in [0, 32] {
            assert_eq!(ephemeral_storage_tmpfs_arg(Some(size)), "/tmp:size=64m,exec");
        }
    }
}