Skip to main content

fakecloud_lambda/
runtime.rs

1use std::collections::HashMap;
2use std::path::{Path, PathBuf};
3use std::sync::Arc;
4use std::time::{Duration, Instant};
5
6use base64::Engine;
7use parking_lot::RwLock;
8use sha2::{Digest, Sha256};
9use tempfile::TempDir;
10
11use crate::state::LambdaFunction;
12
13/// A running container kept warm for reuse.
14struct WarmContainer {
15    container_id: String,
16    host_port: u16,
17    last_used: RwLock<Instant>,
18    /// Combined fingerprint of the function's code SHA-256 plus the
19    /// SHA-256 of every attached layer's ZIP bytes, joined in attach
20    /// order. Layers mutate `/opt`, so a layer change invalidates the
21    /// warm container even when the function code is unchanged.
22    deploy_id: String,
23}
24
25/// Compute the warm-container key for a function with its current layer
26/// set. Stable across calls — layer ARNs are immutable in AWS, so the
27/// hash of their bytes is the right cache key.
28fn deploy_id_for(func: &LambdaFunction, layers: &[Vec<u8>]) -> String {
29    let mut hasher = Sha256::new();
30    hasher.update(func.code_sha256.as_bytes());
31    for bytes in layers {
32        let mut layer_hasher = Sha256::new();
33        layer_hasher.update(bytes);
34        hasher.update(b":");
35        hasher.update(layer_hasher.finalize());
36    }
37    base64::Engine::encode(
38        &base64::engine::general_purpose::STANDARD,
39        hasher.finalize(),
40    )
41}
42
43/// Docker/Podman-based Lambda execution engine.
44pub struct ContainerRuntime {
45    cli: String,
46    containers: RwLock<HashMap<String, WarmContainer>>,
47    /// Serializes container startup per function to prevent duplicate containers.
48    starting: RwLock<HashMap<String, Arc<tokio::sync::Mutex<()>>>>,
49    instance_id: String,
50    /// IP address that containers should use to reach the host
51    host_ip: String,
52    /// Port the main fakecloud server bound to. Used to translate AWS
53    /// private-ECR URIs in `PackageType=Image` functions to fakecloud's
54    /// local OCI v2 registry.
55    server_port: u16,
56    /// Isolated DOCKER_CONFIG dir with Basic auth for `127.0.0.1:<port>`.
57    /// Lets `docker pull` talk to fakecloud ECR without mutating the user's
58    /// `~/.docker/config.json`.
59    docker_config: Option<Arc<TempDir>>,
60}
61
62/// Wrapper around an in-flight streaming invocation. Yields raw body
63/// chunks via [`Self::next_chunk`] until the RIE closes the response,
64/// at which point the final `Ok(None)` signals the caller to emit the
65/// terminal `InvokeComplete` frame.
66pub struct StreamingInvocation {
67    resp: reqwest::Response,
68}
69
70impl StreamingInvocation {
71    /// Read the next chunk of the function's response body. Returns
72    /// `Ok(None)` once the RIE has finished streaming. Buffered
73    /// handlers tend to deliver a single chunk; streaming handlers
74    /// deliver one chunk per `responseStream.write(...)` call.
75    pub async fn next_chunk(&mut self) -> Result<Option<bytes::Bytes>, RuntimeError> {
76        match self.resp.chunk().await {
77            Ok(Some(b)) => Ok(Some(b)),
78            Ok(None) => Ok(None),
79            Err(e) => Err(RuntimeError::InvocationFailed(e.to_string())),
80        }
81    }
82}
83
84#[derive(Debug, thiserror::Error)]
85pub enum RuntimeError {
86    #[error("no code ZIP provided for function {0}")]
87    NoCodeZip(String),
88    #[error("unsupported runtime: {0}")]
89    UnsupportedRuntime(String),
90    #[error("container failed to start: {0}")]
91    ContainerStartFailed(String),
92    #[error("invocation failed: {0}")]
93    InvocationFailed(String),
94    #[error("ZIP extraction failed: {0}")]
95    ZipExtractionFailed(String),
96}
97
98impl ContainerRuntime {
99    /// Auto-detect Docker or Podman. Returns `None` if neither is available.
100    /// Override with `FAKECLOUD_CONTAINER_CLI` env var.
101    /// `server_port` is the port the main fakecloud server bound to; used
102    /// to resolve `PackageType=Image` ECR URIs against fakecloud ECR.
103    pub fn new(server_port: u16) -> Option<Self> {
104        let cli = if let Ok(cli) = std::env::var("FAKECLOUD_CONTAINER_CLI") {
105            // Verify the configured CLI works
106            if std::process::Command::new(&cli)
107                .arg("info")
108                .stdout(std::process::Stdio::null())
109                .stderr(std::process::Stdio::null())
110                .status()
111                .map(|s| s.success())
112                .unwrap_or(false)
113            {
114                cli
115            } else {
116                return None;
117            }
118        } else if is_cli_available("docker") {
119            "docker".to_string()
120        } else if is_cli_available("podman") {
121            "podman".to_string()
122        } else {
123            return None;
124        };
125
126        let instance_id = format!("fakecloud-{}", std::process::id());
127
128        // Detect the appropriate host address for containers
129        // On Linux, use the bridge gateway IP directly (more reliable)
130        // On Mac/Windows, use host-gateway which Docker Desktop handles
131        let host_ip = if cfg!(target_os = "linux") {
132            detect_bridge_gateway(&cli).unwrap_or_else(|| "172.17.0.1".to_string())
133        } else {
134            "host-gateway".to_string()
135        };
136
137        let docker_config = build_local_registry_docker_config(server_port).map(Arc::new);
138        Some(Self {
139            cli,
140            containers: RwLock::new(HashMap::new()),
141            starting: RwLock::new(HashMap::new()),
142            instance_id,
143            host_ip,
144            server_port,
145            docker_config,
146        })
147    }
148
149    fn docker_config_path(&self) -> Option<PathBuf> {
150        self.docker_config.as_ref().map(|d| d.path().to_path_buf())
151    }
152
153    pub fn cli_name(&self) -> &str {
154        &self.cli
155    }
156
157    /// Invoke a Lambda function, starting a container if needed. Layer
158    /// ZIPs are extracted into `/opt` of the runtime sandbox; AWS base
159    /// images already include `/opt/python`, `/opt/nodejs/node_modules`,
160    /// `/opt/lib`, and `/opt/bin` on the right import paths.
161    pub async fn invoke(
162        &self,
163        func: &LambdaFunction,
164        payload: &[u8],
165        layers: &[Vec<u8>],
166    ) -> Result<Vec<u8>, RuntimeError> {
167        let port = self.ensure_warm_container(func, layers).await?;
168
169        // POST to the RIE endpoint
170        let url = format!(
171            "http://localhost:{}/2015-03-31/functions/function/invocations",
172            port
173        );
174        let client = reqwest::Client::new();
175        let resp = client
176            .post(&url)
177            .body(payload.to_vec())
178            .timeout(Duration::from_secs(func.timeout as u64 + 5))
179            .send()
180            .await
181            .map_err(|e| RuntimeError::InvocationFailed(e.to_string()))?;
182
183        let body = resp
184            .bytes()
185            .await
186            .map_err(|e| RuntimeError::InvocationFailed(e.to_string()))?;
187
188        Ok(body.to_vec())
189    }
190
191    /// Invoke a Lambda function and yield the raw HTTP body as a stream
192    /// of byte chunks. Each chunk corresponds to one HTTP frame the RIE
193    /// flushed to the wire — for streaming-aware handlers (Node.js
194    /// `awslambda.streamifyResponse`, Python streaming response, custom
195    /// runtimes that flush mid-handler) this preserves the chunk
196    /// boundaries the function emitted. Buffered handlers come back as
197    /// a single chunk, which is still a valid streamed response.
198    pub async fn invoke_streaming(
199        &self,
200        func: &LambdaFunction,
201        payload: &[u8],
202        layers: &[Vec<u8>],
203    ) -> Result<StreamingInvocation, RuntimeError> {
204        let port = self.ensure_warm_container(func, layers).await?;
205
206        let url = format!(
207            "http://localhost:{}/2015-03-31/functions/function/invocations",
208            port
209        );
210        let client = reqwest::Client::new();
211        let resp = client
212            .post(&url)
213            .body(payload.to_vec())
214            .timeout(Duration::from_secs(func.timeout as u64 + 5))
215            .send()
216            .await
217            .map_err(|e| RuntimeError::InvocationFailed(e.to_string()))?;
218
219        Ok(StreamingInvocation { resp })
220    }
221
222    /// Resolve a warm container for `func`, starting one if its
223    /// fingerprint doesn't match (or there isn't one yet). Returns the
224    /// host port the RIE is bound to. Shared by `invoke` and
225    /// `invoke_streaming` so both paths use the same warm-pool logic.
226    async fn ensure_warm_container(
227        &self,
228        func: &LambdaFunction,
229        layers: &[Vec<u8>],
230    ) -> Result<u16, RuntimeError> {
231        // Zip-based functions need code bytes; image-based functions have
232        // everything baked into the image. Defer the zip check until we
233        // know we need to start a fresh container.
234        let is_image = func.package_type == "Image";
235        if !is_image && func.code_zip.is_none() {
236            return Err(RuntimeError::NoCodeZip(func.function_name.clone()));
237        }
238
239        let deploy_id = deploy_id_for(func, layers);
240
241        // Check for warm container with matching deploy fingerprint
242        let port = {
243            let containers = self.containers.read();
244            if let Some(container) = containers.get(&func.function_name) {
245                if container.deploy_id == deploy_id {
246                    *container.last_used.write() = Instant::now();
247                    Some(container.host_port)
248                } else {
249                    None
250                }
251            } else {
252                None
253            }
254        };
255
256        if let Some(p) = port {
257            return Ok(p);
258        }
259
260        // Serialize container startup per function to prevent duplicates
261        let startup_lock = {
262            let mut starting = self.starting.write();
263            starting
264                .entry(func.function_name.clone())
265                .or_insert_with(|| Arc::new(tokio::sync::Mutex::new(())))
266                .clone()
267        };
268        let _guard = startup_lock.lock().await;
269
270        // Re-check after acquiring lock — another task may have started it
271        let existing_port = {
272            let containers = self.containers.read();
273            containers
274                .get(&func.function_name)
275                .filter(|c| c.deploy_id == deploy_id)
276                .map(|c| {
277                    *c.last_used.write() = Instant::now();
278                    c.host_port
279                })
280        };
281        if let Some(p) = existing_port {
282            return Ok(p);
283        }
284
285        self.stop_container(&func.function_name).await;
286        let container = if is_image {
287            self.start_image_container(func, layers, &deploy_id).await?
288        } else {
289            let zip_bytes = func
290                .code_zip
291                .as_ref()
292                .ok_or_else(|| RuntimeError::NoCodeZip(func.function_name.clone()))?;
293            self.start_container(func, zip_bytes, layers, &deploy_id)
294                .await?
295        };
296        let p = container.host_port;
297        self.containers
298            .write()
299            .insert(func.function_name.clone(), container);
300        Ok(p)
301    }
302
303    /// Start a container for a `PackageType=Image` function. The image is
304    /// expected to already embed the Runtime Interface Emulator (RIE) or
305    /// an equivalent, exposing port 8080 — that's the AWS convention for
306    /// container-based Lambda. AWS private-ECR URIs get translated to
307    /// fakecloud's local OCI v2 registry and retagged so the container
308    /// reports its user-visible image name.
309    async fn start_image_container(
310        &self,
311        func: &LambdaFunction,
312        layers: &[Vec<u8>],
313        deploy_id: &str,
314    ) -> Result<WarmContainer, RuntimeError> {
315        let image = func.image_uri.as_deref().ok_or_else(|| {
316            RuntimeError::ContainerStartFailed("PackageType=Image function has no ImageUri".into())
317        })?;
318
319        // Translate AWS private-ECR URIs to fakecloud ECR's local endpoint.
320        let local_pull_uri = fakecloud_core::ecr_uri::translate_to_local(image, self.server_port);
321        let pull_uri = local_pull_uri.as_deref().unwrap_or(image);
322
323        let mut pull_cmd = tokio::process::Command::new(&self.cli);
324        if let Some(p) = self.docker_config_path() {
325            pull_cmd.env("DOCKER_CONFIG", p);
326        }
327        let pull_out = pull_cmd
328            .args(["pull", pull_uri])
329            .output()
330            .await
331            .map_err(|e| RuntimeError::ContainerStartFailed(format!("docker pull: {e}")))?;
332        if !pull_out.status.success() {
333            return Err(RuntimeError::ContainerStartFailed(format!(
334                "docker pull failed: {}",
335                String::from_utf8_lossy(&pull_out.stderr)
336            )));
337        }
338        // Retag the local pull URI to the AWS URI so `docker create`
339        // finds the image under the user-visible name. Digest-pinned
340        // refs can't be `docker tag` targets, so fall through and
341        // create under the local URI instead.
342        let run_image = if let Some(ref local_uri) = local_pull_uri {
343            if fakecloud_core::ecr_uri::is_digest_ref(image) {
344                local_uri.clone()
345            } else {
346                let _ = tokio::process::Command::new(&self.cli)
347                    .args(["tag", local_uri, image])
348                    .output()
349                    .await;
350                image.to_string()
351            }
352        } else {
353            image.to_string()
354        };
355
356        let mut cmd = tokio::process::Command::new(&self.cli);
357        cmd.arg("create")
358            .arg("-p")
359            .arg(":8080")
360            .arg("--label")
361            .arg(format!("fakecloud-lambda={}", func.function_name))
362            .arg("--label")
363            .arg(format!("fakecloud-instance={}", self.instance_id))
364            .arg("--add-host")
365            .arg(format!("host.docker.internal:{}", self.host_ip));
366
367        for (key, value) in &func.environment {
368            let transformed_value = value
369                .replace("http://127.0.0.1:", "http://host.docker.internal:")
370                .replace("https://127.0.0.1:", "https://host.docker.internal:")
371                .replace("http://localhost:", "http://host.docker.internal:")
372                .replace("https://localhost:", "https://host.docker.internal:");
373            cmd.arg("-e").arg(format!("{}={}", key, transformed_value));
374        }
375        cmd.arg("-e")
376            .arg(format!("AWS_LAMBDA_FUNCTION_TIMEOUT={}", func.timeout));
377
378        // EphemeralStorage.Size (MiB) maps to a tmpfs at /tmp so
379        // function code that writes there hits the configured limit
380        // instead of the docker default. Default 512 MiB matches AWS.
381        // `exec` matches AWS Lambda's /tmp behavior (binaries unpacked
382        // there can be invoked); the default `noexec` would break that.
383        let tmpfs_arg = ephemeral_storage_tmpfs_arg(func.ephemeral_storage_size);
384        cmd.arg("--tmpfs").arg(tmpfs_arg);
385
386        cmd.arg(&run_image);
387
388        let output = cmd
389            .output()
390            .await
391            .map_err(|e| RuntimeError::ContainerStartFailed(e.to_string()))?;
392        if !output.status.success() {
393            return Err(RuntimeError::ContainerStartFailed(
394                String::from_utf8_lossy(&output.stderr).to_string(),
395            ));
396        }
397        let container_id = String::from_utf8_lossy(&output.stdout).trim().to_string();
398
399        if let Err(e) = self.copy_layers_into(&container_id, layers).await {
400            let _ = self.remove_container(&container_id).await;
401            return Err(e);
402        }
403
404        let start_result = tokio::process::Command::new(&self.cli)
405            .args(["start", &container_id])
406            .output()
407            .await
408            .map_err(|e| RuntimeError::ContainerStartFailed(e.to_string()))?;
409        if !start_result.status.success() {
410            let _ = self.remove_container(&container_id).await;
411            return Err(RuntimeError::ContainerStartFailed(format!(
412                "docker start failed: {}",
413                String::from_utf8_lossy(&start_result.stderr)
414            )));
415        }
416
417        let port_output = tokio::process::Command::new(&self.cli)
418            .args(["port", &container_id, "8080"])
419            .output()
420            .await
421            .map_err(|e| RuntimeError::ContainerStartFailed(e.to_string()))?;
422        let port_str = String::from_utf8_lossy(&port_output.stdout);
423        let port: u16 = port_str
424            .trim()
425            .rsplit(':')
426            .next()
427            .and_then(|p| p.parse().ok())
428            .ok_or_else(|| {
429                RuntimeError::ContainerStartFailed(format!(
430                    "could not determine port from: {}",
431                    port_str.trim()
432                ))
433            })?;
434
435        let mut ready = false;
436        for _ in 0..20 {
437            tokio::time::sleep(Duration::from_millis(500)).await;
438            if tokio::net::TcpStream::connect(format!("127.0.0.1:{}", port))
439                .await
440                .is_ok()
441            {
442                ready = true;
443                break;
444            }
445        }
446        if !ready {
447            let _ = self.remove_container(&container_id).await;
448            return Err(RuntimeError::ContainerStartFailed(
449                "container did not become ready within 10 seconds".to_string(),
450            ));
451        }
452
453        tracing::info!(
454            function = %func.function_name,
455            container_id = %container_id,
456            port = port,
457            image = %image,
458            "Lambda image container started"
459        );
460
461        Ok(WarmContainer {
462            container_id,
463            host_port: port,
464            last_used: RwLock::new(Instant::now()),
465            deploy_id: deploy_id.to_string(),
466        })
467    }
468
469    async fn start_container(
470        &self,
471        func: &LambdaFunction,
472        zip_bytes: &[u8],
473        layers: &[Vec<u8>],
474        deploy_id: &str,
475    ) -> Result<WarmContainer, RuntimeError> {
476        let image = runtime_to_image(&func.runtime)
477            .ok_or_else(|| RuntimeError::UnsupportedRuntime(func.runtime.clone()))?;
478
479        // Extract ZIP to a temp directory (only needed during container setup).
480        // Run in spawn_blocking to avoid blocking the async runtime with fs I/O.
481        let code_dir =
482            TempDir::new().map_err(|e| RuntimeError::ZipExtractionFailed(e.to_string()))?;
483        let zip_bytes = zip_bytes.to_vec();
484        let code_path = code_dir.path().to_path_buf();
485        tokio::task::spawn_blocking(move || extract_zip(&zip_bytes, &code_path))
486            .await
487            .map_err(|e| RuntimeError::ZipExtractionFailed(e.to_string()))??;
488
489        // Step 1: docker create (no volume mounts — works in Docker-in-Docker)
490        let mut cmd = tokio::process::Command::new(&self.cli);
491        cmd.arg("create")
492            .arg("-p")
493            .arg(":8080")
494            .arg("--label")
495            .arg(format!("fakecloud-lambda={}", func.function_name))
496            .arg("--label")
497            .arg(format!("fakecloud-instance={}", self.instance_id))
498            // Map host.docker.internal to the detected host IP (bridge gateway on Linux, or explicit IP)
499            .arg("--add-host")
500            .arg(format!("host.docker.internal:{}", self.host_ip));
501
502        for (key, value) in &func.environment {
503            // Transform localhost URLs to use host.docker.internal, which we've set up via --add-host
504            let transformed_value = value
505                .replace("http://127.0.0.1:", "http://host.docker.internal:")
506                .replace("https://127.0.0.1:", "https://host.docker.internal:")
507                .replace("http://localhost:", "http://host.docker.internal:")
508                .replace("https://localhost:", "https://host.docker.internal:");
509            cmd.arg("-e").arg(format!("{}={}", key, transformed_value));
510        }
511
512        cmd.arg("-e")
513            .arg(format!("AWS_LAMBDA_FUNCTION_TIMEOUT={}", func.timeout));
514
515        // EphemeralStorage.Size (MiB) maps to a tmpfs at /tmp so
516        // function code that writes there hits the configured limit
517        // instead of the docker default. Default 512 MiB matches AWS.
518        // `exec` matches AWS Lambda's /tmp behavior (binaries unpacked
519        // there can be invoked); the default `noexec` would break that.
520        let tmpfs_arg = ephemeral_storage_tmpfs_arg(func.ephemeral_storage_size);
521        cmd.arg("--tmpfs").arg(tmpfs_arg);
522
523        cmd.arg(&image).arg(&func.handler);
524
525        let output = cmd
526            .output()
527            .await
528            .map_err(|e| RuntimeError::ContainerStartFailed(e.to_string()))?;
529
530        if !output.status.success() {
531            let stderr = String::from_utf8_lossy(&output.stderr);
532            return Err(RuntimeError::ContainerStartFailed(stderr.to_string()));
533        }
534
535        let container_id = String::from_utf8_lossy(&output.stdout).trim().to_string();
536
537        // Step 2: docker cp — copy code into the container
538        let cp_result = tokio::process::Command::new(&self.cli)
539            .arg("cp")
540            .arg(format!("{}/.", code_dir.path().display()))
541            .arg(format!("{}:/var/task", container_id))
542            .output()
543            .await
544            .map_err(|e| RuntimeError::ContainerStartFailed(e.to_string()))?;
545
546        if !cp_result.status.success() {
547            let _ = self.remove_container(&container_id).await;
548            let stderr = String::from_utf8_lossy(&cp_result.stderr);
549            return Err(RuntimeError::ContainerStartFailed(format!(
550                "docker cp failed: {}",
551                stderr
552            )));
553        }
554
555        // For provided/custom runtimes, also copy to /var/runtime
556        if func.runtime.starts_with("provided") {
557            let cp_runtime = tokio::process::Command::new(&self.cli)
558                .arg("cp")
559                .arg(format!("{}/.", code_dir.path().display()))
560                .arg(format!("{}:/var/runtime", container_id))
561                .output()
562                .await
563                .map_err(|e| RuntimeError::ContainerStartFailed(e.to_string()))?;
564
565            if !cp_runtime.status.success() {
566                let _ = self.remove_container(&container_id).await;
567                let stderr = String::from_utf8_lossy(&cp_runtime.stderr);
568                return Err(RuntimeError::ContainerStartFailed(format!(
569                    "docker cp to /var/runtime failed: {}",
570                    stderr
571                )));
572            }
573        }
574
575        if let Err(e) = self.copy_layers_into(&container_id, layers).await {
576            let _ = self.remove_container(&container_id).await;
577            return Err(e);
578        }
579
580        // TempDir is dropped here — code now lives inside the container
581
582        // Step 3: docker start
583        let start_result = tokio::process::Command::new(&self.cli)
584            .args(["start", &container_id])
585            .output()
586            .await
587            .map_err(|e| RuntimeError::ContainerStartFailed(e.to_string()))?;
588
589        if !start_result.status.success() {
590            let _ = self.remove_container(&container_id).await;
591            let stderr = String::from_utf8_lossy(&start_result.stderr);
592            return Err(RuntimeError::ContainerStartFailed(format!(
593                "docker start failed: {}",
594                stderr
595            )));
596        }
597
598        // Query the actual assigned port
599        let port_output = tokio::process::Command::new(&self.cli)
600            .args(["port", &container_id, "8080"])
601            .output()
602            .await
603            .map_err(|e| RuntimeError::ContainerStartFailed(e.to_string()))?;
604
605        let port_str = String::from_utf8_lossy(&port_output.stdout);
606        let port: u16 = port_str
607            .trim()
608            .rsplit(':')
609            .next()
610            .and_then(|p| p.parse().ok())
611            .ok_or_else(|| {
612                RuntimeError::ContainerStartFailed(format!(
613                    "could not determine port from: {}",
614                    port_str.trim()
615                ))
616            })?;
617
618        // Wait for RIE to start accepting connections
619        let mut ready = false;
620        for _ in 0..20 {
621            tokio::time::sleep(Duration::from_millis(500)).await;
622            if tokio::net::TcpStream::connect(format!("127.0.0.1:{}", port))
623                .await
624                .is_ok()
625            {
626                ready = true;
627                break;
628            }
629        }
630
631        if !ready {
632            let _ = self.remove_container(&container_id).await;
633            return Err(RuntimeError::ContainerStartFailed(
634                "container did not become ready within 10 seconds".to_string(),
635            ));
636        }
637
638        tracing::info!(
639            function = %func.function_name,
640            container_id = %container_id,
641            port = port,
642            runtime = %func.runtime,
643            "Lambda container started"
644        );
645
646        Ok(WarmContainer {
647            container_id,
648            host_port: port,
649            last_used: RwLock::new(Instant::now()),
650            deploy_id: deploy_id.to_string(),
651        })
652    }
653
654    /// Extract each layer ZIP into a shared temp directory and `docker cp`
655    /// it into `/opt/` of the target container. Layer ZIPs include
656    /// language-specific subpaths (`python/`, `nodejs/`, `java/`, `lib/`,
657    /// `bin/`) that AWS base images already wire onto the runtime's
658    /// import paths, so plain extraction at the temp root produces the
659    /// correct on-disk layout. Empty `layers` is a no-op.
660    async fn copy_layers_into(
661        &self,
662        container_id: &str,
663        layers: &[Vec<u8>],
664    ) -> Result<(), RuntimeError> {
665        if layers.is_empty() {
666            return Ok(());
667        }
668        let layers_dir =
669            TempDir::new().map_err(|e| RuntimeError::ZipExtractionFailed(e.to_string()))?;
670        let layers_path = layers_dir.path().to_path_buf();
671        let layers_owned: Vec<Vec<u8>> = layers.to_vec();
672        tokio::task::spawn_blocking(move || {
673            for bytes in &layers_owned {
674                extract_zip(bytes, &layers_path)?;
675            }
676            Ok::<_, RuntimeError>(())
677        })
678        .await
679        .map_err(|e| RuntimeError::ZipExtractionFailed(e.to_string()))??;
680
681        let cp_result = tokio::process::Command::new(&self.cli)
682            .arg("cp")
683            .arg(format!("{}/.", layers_dir.path().display()))
684            .arg(format!("{}:/opt", container_id))
685            .output()
686            .await
687            .map_err(|e| RuntimeError::ContainerStartFailed(e.to_string()))?;
688        if !cp_result.status.success() {
689            let stderr = String::from_utf8_lossy(&cp_result.stderr);
690            return Err(RuntimeError::ContainerStartFailed(format!(
691                "docker cp layers to /opt failed: {stderr}"
692            )));
693        }
694        Ok(())
695    }
696
697    /// Remove a container (stop + rm, since we don't use --rm with docker create).
698    async fn remove_container(&self, container_id: &str) {
699        let _ = tokio::process::Command::new(&self.cli)
700            .args(["rm", "-f", container_id])
701            .output()
702            .await;
703    }
704
705    /// Stop and remove a container for a specific function.
706    pub async fn stop_container(&self, function_name: &str) {
707        let container = self.containers.write().remove(function_name);
708        if let Some(container) = container {
709            tracing::info!(
710                function = %function_name,
711                container_id = %container.container_id,
712                "stopping Lambda container"
713            );
714            self.remove_container(&container.container_id).await;
715        }
716    }
717
718    /// Stop and remove all containers (used on server shutdown or reset).
719    pub async fn stop_all(&self) {
720        let containers: Vec<(String, String)> = {
721            let mut map = self.containers.write();
722            map.drain()
723                .map(|(name, c)| (name, c.container_id))
724                .collect()
725        };
726        for (name, container_id) in containers {
727            tracing::info!(
728                function = %name,
729                container_id = %container_id,
730                "stopping Lambda container (cleanup)"
731            );
732            self.remove_container(&container_id).await;
733        }
734    }
735
736    /// List all warm containers and their metadata for introspection.
737    pub fn list_warm_containers(
738        &self,
739        lambda_state: &crate::state::SharedLambdaState,
740    ) -> Vec<serde_json::Value> {
741        let containers = self.containers.read();
742        let accounts = lambda_state.read();
743        containers
744            .iter()
745            .map(|(name, container)| {
746                let runtime = accounts
747                    .iter()
748                    .find_map(|(_, state)| state.functions.get(name).map(|f| f.runtime.clone()))
749                    .unwrap_or_default();
750                let last_used = container.last_used.read();
751                let idle_secs = last_used.elapsed().as_secs();
752                serde_json::json!({
753                    "functionName": name,
754                    "runtime": runtime,
755                    "containerId": container.container_id,
756                    "lastUsedSecsAgo": idle_secs,
757                })
758            })
759            .collect()
760    }
761
762    /// Evict (stop and remove) the warm container for a specific function.
763    /// Returns true if a container was found and evicted.
764    pub async fn evict_container(&self, function_name: &str) -> bool {
765        let container = self.containers.write().remove(function_name);
766        if let Some(container) = container {
767            tracing::info!(
768                function = %function_name,
769                container_id = %container.container_id,
770                "evicting Lambda container via simulation API"
771            );
772            self.remove_container(&container.container_id).await;
773            true
774        } else {
775            false
776        }
777    }
778
779    /// Background loop that stops containers idle longer than `ttl`.
780    pub async fn run_cleanup_loop(self: Arc<Self>, ttl: Duration) {
781        let mut interval = tokio::time::interval(Duration::from_secs(30));
782        loop {
783            interval.tick().await;
784            self.cleanup_idle(ttl).await;
785        }
786    }
787
788    async fn cleanup_idle(&self, ttl: Duration) {
789        let expired: Vec<String> = {
790            let containers = self.containers.read();
791            containers
792                .iter()
793                .filter(|(_, c)| c.last_used.read().elapsed() > ttl)
794                .map(|(name, _)| name.clone())
795                .collect()
796        };
797        for name in expired {
798            tracing::info!(function = %name, "stopping idle Lambda container");
799            self.stop_container(&name).await;
800        }
801    }
802}
803
/// Map an AWS Lambda runtime identifier (e.g. `python3.12`) to the matching
/// AWS base image on public ECR (e.g. `public.ecr.aws/lambda/python:3.12`).
///
/// Returns `None` for identifiers that are not in the table below.
pub fn runtime_to_image(runtime: &str) -> Option<String> {
    // (runtime identifier, image repository, image tag)
    const RUNTIME_IMAGES: &[(&str, &str, &str)] = &[
        ("python3.14", "python", "3.14"),
        ("python3.13", "python", "3.13"),
        ("python3.12", "python", "3.12"),
        ("python3.11", "python", "3.11"),
        ("python3.10", "python", "3.10"),
        ("python3.9", "python", "3.9"),
        ("python3.8", "python", "3.8"),
        ("nodejs24.x", "nodejs", "24"),
        ("nodejs22.x", "nodejs", "22"),
        ("nodejs20.x", "nodejs", "20"),
        ("nodejs18.x", "nodejs", "18"),
        ("nodejs16.x", "nodejs", "16"),
        ("ruby3.4", "ruby", "3.4"),
        ("ruby3.3", "ruby", "3.3"),
        ("java25", "java", "25"),
        ("java21", "java", "21"),
        ("java17", "java", "17"),
        ("java11", "java", "11"),
        ("dotnet10", "dotnet", "10"),
        ("dotnet8", "dotnet", "8"),
        ("go1.x", "go", "1"),
        ("provided.al2023", "provided", "al2023"),
        ("provided.al2", "provided", "al2"),
    ];

    RUNTIME_IMAGES
        .iter()
        .find(|&&(id, _, _)| id == runtime)
        .map(|&(_, repo, tag)| format!("public.ecr.aws/lambda/{repo}:{tag}"))
}
834
/// Build the `--tmpfs` argument for `docker create` that sizes `/tmp` inside
/// the container to the function's `EphemeralStorage.Size` (in MiB).
///
/// A missing size falls back to AWS's default of 512 MiB. A supplied size is
/// raised to at least 64 MiB, so values smuggled in by legacy snapshots still
/// yield a tmpfs Docker accepts. The `exec` mount option mirrors Lambda's
/// `/tmp`: without it, Docker's default `noexec` tmpfs would make handlers
/// that unpack and run binaries from `/tmp` fail with `EACCES`.
///
/// Kept as a pure helper so unit tests can check the flag without Docker.
pub(crate) fn ephemeral_storage_tmpfs_arg(size: Option<i64>) -> String {
    const DEFAULT_MIB: i64 = 512;
    const FLOOR_MIB: i64 = 64;
    let size_mib = match size {
        Some(requested) => requested.max(FLOOR_MIB),
        None => DEFAULT_MIB,
    };
    format!("/tmp:size={}m,exec", size_mib)
}
850
851/// Extract a ZIP archive to a destination directory.
852pub fn extract_zip(zip_bytes: &[u8], dest: &Path) -> Result<(), RuntimeError> {
853    let cursor = std::io::Cursor::new(zip_bytes);
854    let mut archive = zip::ZipArchive::new(cursor)
855        .map_err(|e| RuntimeError::ZipExtractionFailed(e.to_string()))?;
856
857    for i in 0..archive.len() {
858        let mut file = archive
859            .by_index(i)
860            .map_err(|e| RuntimeError::ZipExtractionFailed(e.to_string()))?;
861
862        let out_path = dest.join(file.enclosed_name().ok_or_else(|| {
863            RuntimeError::ZipExtractionFailed("invalid file name in ZIP".to_string())
864        })?);
865
866        if file.is_dir() {
867            std::fs::create_dir_all(&out_path)
868                .map_err(|e| RuntimeError::ZipExtractionFailed(e.to_string()))?;
869        } else {
870            if let Some(parent) = out_path.parent() {
871                std::fs::create_dir_all(parent)
872                    .map_err(|e| RuntimeError::ZipExtractionFailed(e.to_string()))?;
873            }
874            let mut out_file = std::fs::File::create(&out_path)
875                .map_err(|e| RuntimeError::ZipExtractionFailed(e.to_string()))?;
876            std::io::copy(&mut file, &mut out_file)
877                .map_err(|e| RuntimeError::ZipExtractionFailed(e.to_string()))?;
878
879            // Preserve executable permissions
880            #[cfg(unix)]
881            {
882                use std::os::unix::fs::PermissionsExt;
883                if let Some(mode) = file.unix_mode() {
884                    std::fs::set_permissions(&out_path, std::fs::Permissions::from_mode(mode))
885                        .map_err(|e| RuntimeError::ZipExtractionFailed(e.to_string()))?;
886                }
887            }
888        }
889    }
890    Ok(())
891}
892
893/// Detect the Docker bridge gateway IP on Linux.
894/// Returns None if detection fails.
895fn detect_bridge_gateway(cli: &str) -> Option<String> {
896    let output = std::process::Command::new(cli)
897        .args([
898            "network",
899            "inspect",
900            "bridge",
901            "--format",
902            "{{range .IPAM.Config}}{{.Gateway}}{{end}}",
903        ])
904        .output()
905        .ok()?;
906
907    if output.status.success() {
908        let gateway = String::from_utf8_lossy(&output.stdout).trim().to_string();
909        if !gateway.is_empty() && gateway.contains('.') {
910            tracing::info!(
911                gateway = %gateway,
912                "Detected Docker bridge gateway for Lambda containers"
913            );
914            return Some(gateway);
915        }
916    }
917    None
918}
919
/// Report whether `name` is a usable container CLI.
///
/// Returns `true` only when `<name> info` can be spawned and exits
/// successfully. The probe's stdout and stderr are discarded.
fn is_cli_available(name: &str) -> bool {
    let probe = std::process::Command::new(name)
        .arg("info")
        .stdout(std::process::Stdio::null())
        .stderr(std::process::Stdio::null())
        .status();
    matches!(probe, Ok(exit) if exit.success())
}
929
930fn build_local_registry_docker_config(server_port: u16) -> Option<TempDir> {
931    let dir = TempDir::new().ok()?;
932    let auth = base64::engine::general_purpose::STANDARD.encode("AWS:fakecloud-lambda-runtime");
933    let config = serde_json::json!({
934        "auths": {
935            format!("127.0.0.1:{server_port}"): { "auth": auth },
936        }
937    });
938    std::fs::write(dir.path().join("config.json"), config.to_string()).ok()?;
939    Some(dir)
940}
941
#[cfg(test)]
mod tests {
    use std::io::Write;

    use super::*;

    #[test]
    fn test_runtime_to_image() {
        // (runtime identifier, expected public ECR image)
        let cases = [
            ("python3.12", "public.ecr.aws/lambda/python:3.12"),
            ("nodejs20.x", "public.ecr.aws/lambda/nodejs:20"),
            ("provided.al2023", "public.ecr.aws/lambda/provided:al2023"),
            ("ruby3.4", "public.ecr.aws/lambda/ruby:3.4"),
            ("java21", "public.ecr.aws/lambda/java:21"),
            ("dotnet8", "public.ecr.aws/lambda/dotnet:8"),
            ("nodejs16.x", "public.ecr.aws/lambda/nodejs:16"),
            ("python3.10", "public.ecr.aws/lambda/python:3.10"),
            ("python3.9", "public.ecr.aws/lambda/python:3.9"),
            ("python3.8", "public.ecr.aws/lambda/python:3.8"),
            ("java11", "public.ecr.aws/lambda/java:11"),
            ("go1.x", "public.ecr.aws/lambda/go:1"),
            ("nodejs24.x", "public.ecr.aws/lambda/nodejs:24"),
            ("python3.14", "public.ecr.aws/lambda/python:3.14"),
            ("java25", "public.ecr.aws/lambda/java:25"),
            ("dotnet10", "public.ecr.aws/lambda/dotnet:10"),
        ];
        for &(runtime, image) in &cases {
            assert_eq!(
                runtime_to_image(runtime),
                Some(image.to_string()),
                "unexpected image for runtime {runtime}"
            );
        }
        assert_eq!(runtime_to_image("unknown"), None);
    }

    #[test]
    fn test_extract_zip() {
        // Build a one-file ZIP entirely in memory.
        let mut writer = zip::ZipWriter::new(std::io::Cursor::new(Vec::<u8>::new()));
        writer
            .start_file("handler.py", zip::write::SimpleFileOptions::default())
            .unwrap();
        writer
            .write_all(b"def handler(event, context):\n    return {'statusCode': 200}\n")
            .unwrap();
        let zip_bytes = writer.finish().unwrap().into_inner();

        let dir = TempDir::new().unwrap();
        extract_zip(&zip_bytes, dir.path()).unwrap();

        let handler_path = dir.path().join("handler.py");
        assert!(handler_path.exists());
        let content = std::fs::read_to_string(&handler_path).unwrap();
        assert!(content.contains("def handler"));
    }

    #[test]
    fn ephemeral_storage_tmpfs_arg_defaults_to_512_when_none() {
        // No size means AWS's default of 512 MiB. `exec` must be present so
        // handlers that unpack and run binaries from /tmp don't hit EACCES
        // against Docker's default `noexec` tmpfs.
        assert_eq!(ephemeral_storage_tmpfs_arg(None), "/tmp:size=512m,exec");
    }

    #[test]
    fn ephemeral_storage_tmpfs_arg_uses_supplied_size() {
        for (size, expected) in [(2048, "/tmp:size=2048m,exec"), (10240, "/tmp:size=10240m,exec")] {
            assert_eq!(ephemeral_storage_tmpfs_arg(Some(size)), expected);
        }
    }

    #[test]
    fn ephemeral_storage_tmpfs_arg_clamps_to_64_floor() {
        // API validation already rejects values below 512. The runtime still
        // clamps to a 64 MiB floor (which Docker accepts) to defend against
        // legacy snapshots and stale state.
        for size in [0, 32] {
            assert_eq!(ephemeral_storage_tmpfs_arg(Some(size)), "/tmp:size=64m,exec");
        }
    }
}