1use std::path::{Path, PathBuf};
7use std::sync::Arc;
8use std::time::Duration;
9
10use async_trait::async_trait;
11use base64::Engine;
12use tempfile::TempDir;
13
14use super::backend::{BackendHandle, LambdaBackend, RuntimeError, WarmInstance};
15use super::env_rewrite::rewrite_localhost_envs;
16use crate::state::LambdaFunction;
17
18pub struct DockerBackend {
20 cli: String,
21 instance_id: String,
22 host_alias: String,
32 add_host_arg: Option<String>,
35 server_port: u16,
39 sibling_host: String,
49 docker_config: Option<Arc<TempDir>>,
53}
54
55impl DockerBackend {
56 pub fn auto_detect(server_port: u16) -> Option<Self> {
61 let cli = fakecloud_core::container_net::detect_container_cli()?;
69 let instance_id = format!("fakecloud-{}", std::process::id());
70 let net = fakecloud_core::container_net::HostNetworking::detect(&cli);
71
72 let docker_config = build_local_registry_docker_config(server_port).map(Arc::new);
73 Some(Self {
74 cli,
75 instance_id,
76 host_alias: net.host_alias,
77 add_host_arg: net.add_host_arg,
78 server_port,
79 sibling_host: net.sibling_host,
80 docker_config,
81 })
82 }
83
84 fn apply_host_alias(&self, cmd: &mut tokio::process::Command) {
88 if let Some(arg) = &self.add_host_arg {
89 cmd.arg("--add-host").arg(arg);
90 }
91 }
92
93 fn docker_config_path(&self) -> Option<PathBuf> {
94 self.docker_config.as_ref().map(|d| d.path().to_path_buf())
95 }
96
97 async fn start_image_container(
103 &self,
104 func: &LambdaFunction,
105 layers: &[Vec<u8>],
106 ) -> Result<WarmInstance, RuntimeError> {
107 let image = func.image_uri.as_deref().ok_or_else(|| {
108 RuntimeError::ContainerStartFailed("PackageType=Image function has no ImageUri".into())
109 })?;
110
111 let local_pull_uri = fakecloud_core::ecr_uri::translate_to_local_at(
118 image,
119 &self.sibling_host,
120 self.server_port,
121 );
122 let pull_uri = local_pull_uri.as_deref().unwrap_or(image);
123
124 let mut pull_cmd = tokio::process::Command::new(&self.cli);
125 if let Some(p) = self.docker_config_path() {
126 pull_cmd.env("DOCKER_CONFIG", p);
127 }
128 let pull_out = pull_cmd
129 .args(["pull", pull_uri])
130 .output()
131 .await
132 .map_err(|e| RuntimeError::ContainerStartFailed(format!("docker pull: {e}")))?;
133 if !pull_out.status.success() {
134 return Err(RuntimeError::ContainerStartFailed(format!(
135 "docker pull failed: {}",
136 String::from_utf8_lossy(&pull_out.stderr)
137 )));
138 }
139 let run_image = if let Some(ref local_uri) = local_pull_uri {
144 if fakecloud_core::ecr_uri::is_digest_ref(image) {
145 local_uri.clone()
146 } else {
147 let _ = tokio::process::Command::new(&self.cli)
148 .args(["tag", local_uri, image])
149 .output()
150 .await;
151 image.to_string()
152 }
153 } else {
154 image.to_string()
155 };
156
157 let mut cmd = tokio::process::Command::new(&self.cli);
158 cmd.arg("create")
159 .arg("-p")
160 .arg(":8080")
161 .arg("--label")
162 .arg(format!("fakecloud-lambda={}", func.function_name))
163 .arg("--label")
164 .arg(format!("fakecloud-instance={}", self.instance_id));
165 self.apply_host_alias(&mut cmd);
166
167 for (key, value) in rewrite_localhost_envs(&func.environment, &self.host_alias) {
168 cmd.arg("-e").arg(format!("{key}={value}"));
169 }
170 cmd.arg("-e")
171 .arg(format!("AWS_LAMBDA_FUNCTION_TIMEOUT={}", func.timeout));
172
173 let tmpfs_arg = ephemeral_storage_tmpfs_arg(func.ephemeral_storage_size);
174 cmd.arg("--tmpfs").arg(tmpfs_arg);
175
176 cmd.arg(&run_image);
177
178 let output = cmd
179 .output()
180 .await
181 .map_err(|e| RuntimeError::ContainerStartFailed(e.to_string()))?;
182 if !output.status.success() {
183 return Err(RuntimeError::ContainerStartFailed(
184 String::from_utf8_lossy(&output.stderr).to_string(),
185 ));
186 }
187 let container_id = String::from_utf8_lossy(&output.stdout).trim().to_string();
188
189 if let Err(e) = self.copy_layers_into(&container_id, layers).await {
190 self.remove_container(&container_id).await;
191 return Err(e);
192 }
193
194 let start_result = tokio::process::Command::new(&self.cli)
195 .args(["start", &container_id])
196 .output()
197 .await
198 .map_err(|e| RuntimeError::ContainerStartFailed(e.to_string()))?;
199 if !start_result.status.success() {
200 self.remove_container(&container_id).await;
201 return Err(RuntimeError::ContainerStartFailed(format!(
202 "docker start failed: {}",
203 String::from_utf8_lossy(&start_result.stderr)
204 )));
205 }
206
207 let port = self.query_host_port(&container_id).await?;
208 self.wait_for_ready(&container_id, port).await?;
209
210 tracing::info!(
211 function = %func.function_name,
212 container_id = %container_id,
213 port = port,
214 image = %image,
215 "Lambda image container started"
216 );
217
218 Ok(WarmInstance {
219 endpoint: format!("{}:{port}", self.sibling_host),
220 handle: BackendHandle::Container { id: container_id },
221 })
222 }
223
224 async fn start_zip_container(
225 &self,
226 func: &LambdaFunction,
227 zip_bytes: &[u8],
228 layers: &[Vec<u8>],
229 ) -> Result<WarmInstance, RuntimeError> {
230 let image = runtime_to_image(&func.runtime)
231 .ok_or_else(|| RuntimeError::UnsupportedRuntime(func.runtime.clone()))?;
232
233 let code_dir =
236 TempDir::new().map_err(|e| RuntimeError::ZipExtractionFailed(e.to_string()))?;
237 let zip_bytes = zip_bytes.to_vec();
238 let code_path = code_dir.path().to_path_buf();
239 tokio::task::spawn_blocking(move || extract_zip(&zip_bytes, &code_path))
240 .await
241 .map_err(|e| RuntimeError::ZipExtractionFailed(e.to_string()))??;
242
243 let mut cmd = tokio::process::Command::new(&self.cli);
245 cmd.arg("create")
246 .arg("-p")
247 .arg(":8080")
248 .arg("--label")
249 .arg(format!("fakecloud-lambda={}", func.function_name))
250 .arg("--label")
251 .arg(format!("fakecloud-instance={}", self.instance_id));
252 self.apply_host_alias(&mut cmd);
253
254 for (key, value) in rewrite_localhost_envs(&func.environment, &self.host_alias) {
255 cmd.arg("-e").arg(format!("{key}={value}"));
256 }
257
258 cmd.arg("-e")
259 .arg(format!("AWS_LAMBDA_FUNCTION_TIMEOUT={}", func.timeout));
260
261 let tmpfs_arg = ephemeral_storage_tmpfs_arg(func.ephemeral_storage_size);
262 cmd.arg("--tmpfs").arg(tmpfs_arg);
263
264 cmd.arg(&image).arg(&func.handler);
265
266 let output = cmd
267 .output()
268 .await
269 .map_err(|e| RuntimeError::ContainerStartFailed(e.to_string()))?;
270
271 if !output.status.success() {
272 let stderr = String::from_utf8_lossy(&output.stderr);
273 return Err(RuntimeError::ContainerStartFailed(stderr.to_string()));
274 }
275
276 let container_id = String::from_utf8_lossy(&output.stdout).trim().to_string();
277
278 let cp_result = tokio::process::Command::new(&self.cli)
280 .arg("cp")
281 .arg(format!("{}/.", code_dir.path().display()))
282 .arg(format!("{}:/var/task", container_id))
283 .output()
284 .await
285 .map_err(|e| RuntimeError::ContainerStartFailed(e.to_string()))?;
286
287 if !cp_result.status.success() {
288 self.remove_container(&container_id).await;
289 let stderr = String::from_utf8_lossy(&cp_result.stderr);
290 return Err(RuntimeError::ContainerStartFailed(format!(
291 "docker cp failed: {stderr}"
292 )));
293 }
294
295 if func.runtime.starts_with("provided") {
297 let cp_runtime = tokio::process::Command::new(&self.cli)
298 .arg("cp")
299 .arg(format!("{}/.", code_dir.path().display()))
300 .arg(format!("{}:/var/runtime", container_id))
301 .output()
302 .await
303 .map_err(|e| RuntimeError::ContainerStartFailed(e.to_string()))?;
304
305 if !cp_runtime.status.success() {
306 self.remove_container(&container_id).await;
307 let stderr = String::from_utf8_lossy(&cp_runtime.stderr);
308 return Err(RuntimeError::ContainerStartFailed(format!(
309 "docker cp to /var/runtime failed: {stderr}"
310 )));
311 }
312 }
313
314 if let Err(e) = self.copy_layers_into(&container_id, layers).await {
315 self.remove_container(&container_id).await;
316 return Err(e);
317 }
318
319 let start_result = tokio::process::Command::new(&self.cli)
322 .args(["start", &container_id])
323 .output()
324 .await
325 .map_err(|e| RuntimeError::ContainerStartFailed(e.to_string()))?;
326
327 if !start_result.status.success() {
328 self.remove_container(&container_id).await;
329 let stderr = String::from_utf8_lossy(&start_result.stderr);
330 return Err(RuntimeError::ContainerStartFailed(format!(
331 "docker start failed: {stderr}"
332 )));
333 }
334
335 let port = self.query_host_port(&container_id).await?;
336 self.wait_for_ready(&container_id, port).await?;
337
338 tracing::info!(
339 function = %func.function_name,
340 container_id = %container_id,
341 port = port,
342 runtime = %func.runtime,
343 "Lambda container started"
344 );
345
346 Ok(WarmInstance {
347 endpoint: format!("{}:{port}", self.sibling_host),
348 handle: BackendHandle::Container { id: container_id },
349 })
350 }
351
352 async fn query_host_port(&self, container_id: &str) -> Result<u16, RuntimeError> {
353 let port_output = tokio::process::Command::new(&self.cli)
354 .args(["port", container_id, "8080"])
355 .output()
356 .await
357 .map_err(|e| RuntimeError::ContainerStartFailed(e.to_string()))?;
358 let port_str = String::from_utf8_lossy(&port_output.stdout);
359 port_str
360 .trim()
361 .rsplit(':')
362 .next()
363 .and_then(|p| p.parse().ok())
364 .ok_or_else(|| {
365 RuntimeError::ContainerStartFailed(format!(
366 "could not determine port from: {}",
367 port_str.trim()
368 ))
369 })
370 }
371
372 async fn wait_for_ready(&self, container_id: &str, port: u16) -> Result<(), RuntimeError> {
373 for _ in 0..20 {
374 tokio::time::sleep(Duration::from_millis(500)).await;
375 if tokio::net::TcpStream::connect(format!("{}:{port}", self.sibling_host))
376 .await
377 .is_ok()
378 {
379 return Ok(());
380 }
381 }
382 self.remove_container(container_id).await;
383 Err(RuntimeError::ContainerStartFailed(
384 "container did not become ready within 10 seconds".to_string(),
385 ))
386 }
387
388 async fn copy_layers_into(
395 &self,
396 container_id: &str,
397 layers: &[Vec<u8>],
398 ) -> Result<(), RuntimeError> {
399 if layers.is_empty() {
400 return Ok(());
401 }
402 let layers_dir =
403 TempDir::new().map_err(|e| RuntimeError::ZipExtractionFailed(e.to_string()))?;
404 let layers_path = layers_dir.path().to_path_buf();
405 let layers_owned: Vec<Vec<u8>> = layers.to_vec();
406 tokio::task::spawn_blocking(move || {
407 for bytes in &layers_owned {
408 extract_zip(bytes, &layers_path)?;
409 }
410 Ok::<_, RuntimeError>(())
411 })
412 .await
413 .map_err(|e| RuntimeError::ZipExtractionFailed(e.to_string()))??;
414
415 let cp_result = tokio::process::Command::new(&self.cli)
416 .arg("cp")
417 .arg(format!("{}/.", layers_dir.path().display()))
418 .arg(format!("{}:/opt", container_id))
419 .output()
420 .await
421 .map_err(|e| RuntimeError::ContainerStartFailed(e.to_string()))?;
422 if !cp_result.status.success() {
423 let stderr = String::from_utf8_lossy(&cp_result.stderr);
424 return Err(RuntimeError::ContainerStartFailed(format!(
425 "docker cp layers to /opt failed: {stderr}"
426 )));
427 }
428 Ok(())
429 }
430
431 async fn remove_container(&self, container_id: &str) {
433 let _ = tokio::process::Command::new(&self.cli)
434 .args(["rm", "-f", container_id])
435 .output()
436 .await;
437 }
438}
439
440#[async_trait]
441impl LambdaBackend for DockerBackend {
442 fn name(&self) -> &str {
443 &self.cli
444 }
445
446 async fn launch(
447 &self,
448 func: &LambdaFunction,
449 code_zip: Option<&[u8]>,
450 layers: &[Vec<u8>],
451 _deploy_id: &str,
452 ) -> Result<WarmInstance, RuntimeError> {
453 if func.package_type == "Image" {
454 self.start_image_container(func, layers).await
455 } else {
456 let bytes =
457 code_zip.ok_or_else(|| RuntimeError::NoCodeZip(func.function_name.clone()))?;
458 self.start_zip_container(func, bytes, layers).await
459 }
460 }
461
462 async fn terminate(&self, handle: &BackendHandle) {
463 match handle {
464 BackendHandle::Container { id } => self.remove_container(id).await,
465 BackendHandle::Pod { .. } => {}
468 }
469 }
470
471 async fn instance_logs(&self, handle: &BackendHandle) -> Option<String> {
472 let BackendHandle::Container { id } = handle else {
473 return None;
474 };
475 let output = tokio::process::Command::new(&self.cli)
479 .args(["logs", "--tail", "200", id])
480 .output()
481 .await
482 .ok()?;
483 let mut combined = String::from_utf8_lossy(&output.stdout).into_owned();
484 combined.push_str(&String::from_utf8_lossy(&output.stderr));
485 if combined.is_empty() {
486 None
487 } else {
488 Some(combined)
489 }
490 }
491
492 async fn prepull_image(&self, image: &str) -> Result<(), RuntimeError> {
493 let local_uri = fakecloud_core::ecr_uri::translate_to_local_at(
497 image,
498 &self.sibling_host,
499 self.server_port,
500 );
501 let pull_uri = local_uri.as_deref().unwrap_or(image);
502
503 let mut cmd = tokio::process::Command::new(&self.cli);
504 if let Some(p) = self.docker_config_path() {
505 cmd.env("DOCKER_CONFIG", p);
506 }
507 let out = cmd
508 .args(["pull", pull_uri])
509 .output()
510 .await
511 .map_err(|e| RuntimeError::ContainerStartFailed(format!("docker pull: {e}")))?;
512 if !out.status.success() {
513 return Err(RuntimeError::ContainerStartFailed(format!(
514 "docker pull failed for {pull_uri}: {}",
515 String::from_utf8_lossy(&out.stderr)
516 )));
517 }
518 Ok(())
519 }
520}
521
/// Maps a Lambda runtime identifier to its base image reference under
/// `public.ecr.aws/lambda`.
///
/// Returns `None` for any identifier that is not in the table below. Matching is exact.
pub fn runtime_to_image(runtime: &str) -> Option<String> {
    /// `(runtime identifier, image repository, image tag)`.
    const RUNTIME_IMAGES: &[(&str, &str, &str)] = &[
        ("python3.14", "python", "3.14"),
        ("python3.13", "python", "3.13"),
        ("python3.12", "python", "3.12"),
        ("python3.11", "python", "3.11"),
        ("python3.10", "python", "3.10"),
        ("python3.9", "python", "3.9"),
        ("python3.8", "python", "3.8"),
        ("nodejs24.x", "nodejs", "24"),
        ("nodejs22.x", "nodejs", "22"),
        ("nodejs20.x", "nodejs", "20"),
        ("nodejs18.x", "nodejs", "18"),
        ("nodejs16.x", "nodejs", "16"),
        ("ruby3.4", "ruby", "3.4"),
        ("ruby3.3", "ruby", "3.3"),
        ("java25", "java", "25"),
        ("java21", "java", "21"),
        ("java17", "java", "17"),
        ("java11", "java", "11"),
        ("dotnet10", "dotnet", "10"),
        ("dotnet8", "dotnet", "8"),
        ("go1.x", "go", "1"),
        ("provided.al2023", "provided", "al2023"),
        ("provided.al2", "provided", "al2"),
    ];

    RUNTIME_IMAGES
        .iter()
        .find(|(name, _, _)| *name == runtime)
        .map(|(_, base, tag)| format!("public.ecr.aws/lambda/{base}:{tag}"))
}
552
/// Builds the value for the `--tmpfs` flag that mounts `/tmp` inside the container.
///
/// `size` is treated as a number of mebibytes: `None` falls back to 512, and anything below
/// 64 is raised to 64. The mount is created with the `exec` option.
pub(crate) fn ephemeral_storage_tmpfs_arg(size: Option<i64>) -> String {
    const DEFAULT_MIB: i64 = 512;
    const MIN_MIB: i64 = 64;

    let requested = size.unwrap_or(DEFAULT_MIB);
    let mib = std::cmp::max(requested, MIN_MIB);
    format!("/tmp:size={mib}m,exec")
}
568
569pub fn extract_zip(zip_bytes: &[u8], dest: &Path) -> Result<(), RuntimeError> {
571 let cursor = std::io::Cursor::new(zip_bytes);
572 let mut archive = zip::ZipArchive::new(cursor)
573 .map_err(|e| RuntimeError::ZipExtractionFailed(e.to_string()))?;
574
575 for i in 0..archive.len() {
576 let mut file = archive
577 .by_index(i)
578 .map_err(|e| RuntimeError::ZipExtractionFailed(e.to_string()))?;
579
580 let out_path = dest.join(file.enclosed_name().ok_or_else(|| {
581 RuntimeError::ZipExtractionFailed("invalid file name in ZIP".to_string())
582 })?);
583
584 if file.is_dir() {
585 std::fs::create_dir_all(&out_path)
586 .map_err(|e| RuntimeError::ZipExtractionFailed(e.to_string()))?;
587 } else {
588 if let Some(parent) = out_path.parent() {
589 std::fs::create_dir_all(parent)
590 .map_err(|e| RuntimeError::ZipExtractionFailed(e.to_string()))?;
591 }
592 let mut out_file = std::fs::File::create(&out_path)
593 .map_err(|e| RuntimeError::ZipExtractionFailed(e.to_string()))?;
594 std::io::copy(&mut file, &mut out_file)
595 .map_err(|e| RuntimeError::ZipExtractionFailed(e.to_string()))?;
596
597 #[cfg(unix)]
598 {
599 use std::os::unix::fs::PermissionsExt;
600 if let Some(mode) = file.unix_mode() {
601 std::fs::set_permissions(&out_path, std::fs::Permissions::from_mode(mode))
602 .map_err(|e| RuntimeError::ZipExtractionFailed(e.to_string()))?;
603 }
604 }
605 }
606 }
607 Ok(())
608}
609
610fn build_local_registry_docker_config(server_port: u16) -> Option<TempDir> {
611 let dir = TempDir::new().ok()?;
612 let auth = base64::engine::general_purpose::STANDARD.encode("AWS:fakecloud-lambda-runtime");
613 let auths: serde_json::Map<String, serde_json::Value> =
619 fakecloud_core::container_net::registry_auth_hosts(server_port)
620 .into_iter()
621 .map(|host| (host, serde_json::json!({ "auth": auth })))
622 .collect();
623 let config = serde_json::json!({ "auths": auths });
624 std::fs::write(dir.path().join("config.json"), config.to_string()).ok()?;
625 Some(dir)
626}
627
#[cfg(test)]
mod tests {
    use std::io::Write;

    use super::*;

    /// Known runtimes resolve to their image reference; an unknown one resolves to `None`.
    #[test]
    fn test_runtime_to_image() {
        let expected = [
            ("python3.12", "public.ecr.aws/lambda/python:3.12"),
            ("nodejs20.x", "public.ecr.aws/lambda/nodejs:20"),
            ("provided.al2023", "public.ecr.aws/lambda/provided:al2023"),
            ("ruby3.4", "public.ecr.aws/lambda/ruby:3.4"),
            ("java21", "public.ecr.aws/lambda/java:21"),
            ("dotnet8", "public.ecr.aws/lambda/dotnet:8"),
            ("nodejs16.x", "public.ecr.aws/lambda/nodejs:16"),
            ("python3.10", "public.ecr.aws/lambda/python:3.10"),
            ("python3.9", "public.ecr.aws/lambda/python:3.9"),
            ("python3.8", "public.ecr.aws/lambda/python:3.8"),
            ("java11", "public.ecr.aws/lambda/java:11"),
            ("go1.x", "public.ecr.aws/lambda/go:1"),
            ("nodejs24.x", "public.ecr.aws/lambda/nodejs:24"),
            ("python3.14", "public.ecr.aws/lambda/python:3.14"),
            ("java25", "public.ecr.aws/lambda/java:25"),
            ("dotnet10", "public.ecr.aws/lambda/dotnet:10"),
        ];
        for (runtime, image) in expected {
            assert_eq!(
                runtime_to_image(runtime),
                Some(image.to_string()),
                "runtime {runtime}"
            );
        }
        assert_eq!(runtime_to_image("unknown"), None);
    }

    /// A single-file archive is unpacked with its contents intact.
    #[test]
    fn test_extract_zip() {
        let mut writer = zip::ZipWriter::new(std::io::Cursor::new(Vec::new()));
        writer
            .start_file("handler.py", zip::write::SimpleFileOptions::default())
            .unwrap();
        writer
            .write_all(b"def handler(event, context):\n return {'statusCode': 200}\n")
            .unwrap();
        let zip_bytes = writer.finish().unwrap().into_inner();

        let dir = TempDir::new().unwrap();
        extract_zip(&zip_bytes, dir.path()).unwrap();

        let handler_path = dir.path().join("handler.py");
        assert!(handler_path.exists());

        let content = std::fs::read_to_string(&handler_path).unwrap();
        assert!(content.contains("def handler"));
    }

    /// With no size configured the mount defaults to 512 MiB.
    #[test]
    fn ephemeral_storage_tmpfs_arg_defaults_to_512_when_none() {
        assert_eq!(ephemeral_storage_tmpfs_arg(None), "/tmp:size=512m,exec");
    }

    /// Sizes at or above the floor are passed through unchanged.
    #[test]
    fn ephemeral_storage_tmpfs_arg_uses_supplied_size() {
        for (size, expected) in [
            (2048, "/tmp:size=2048m,exec"),
            (10240, "/tmp:size=10240m,exec"),
        ] {
            assert_eq!(ephemeral_storage_tmpfs_arg(Some(size)), expected);
        }
    }

    /// Sizes below 64 MiB are raised to 64 MiB.
    #[test]
    fn ephemeral_storage_tmpfs_arg_clamps_to_64_floor() {
        for size in [0, 32] {
            assert_eq!(
                ephemeral_storage_tmpfs_arg(Some(size)),
                "/tmp:size=64m,exec"
            );
        }
    }
}