Skip to main content

fakecloud_lambda/
runtime.rs

1use std::collections::HashMap;
2use std::path::Path;
3use std::sync::Arc;
4use std::time::{Duration, Instant};
5
6use parking_lot::RwLock;
7use tempfile::TempDir;
8
9use crate::state::LambdaFunction;
10
11/// A running container kept warm for reuse.
12struct WarmContainer {
13    container_id: String,
14    host_port: u16,
15    last_used: RwLock<Instant>,
16    code_sha256: String,
17}
18
19/// Docker/Podman-based Lambda execution engine.
20pub struct ContainerRuntime {
21    cli: String,
22    containers: RwLock<HashMap<String, WarmContainer>>,
23    /// Serializes container startup per function to prevent duplicate containers.
24    starting: RwLock<HashMap<String, Arc<tokio::sync::Mutex<()>>>>,
25    instance_id: String,
26    /// IP address that containers should use to reach the host
27    host_ip: String,
28}
29
30#[derive(Debug, thiserror::Error)]
31pub enum RuntimeError {
32    #[error("no code ZIP provided for function {0}")]
33    NoCodeZip(String),
34    #[error("unsupported runtime: {0}")]
35    UnsupportedRuntime(String),
36    #[error("container failed to start: {0}")]
37    ContainerStartFailed(String),
38    #[error("invocation failed: {0}")]
39    InvocationFailed(String),
40    #[error("ZIP extraction failed: {0}")]
41    ZipExtractionFailed(String),
42}
43
44impl ContainerRuntime {
45    /// Auto-detect Docker or Podman. Returns `None` if neither is available.
46    /// Override with `FAKECLOUD_CONTAINER_CLI` env var.
47    pub fn new() -> Option<Self> {
48        let cli = if let Ok(cli) = std::env::var("FAKECLOUD_CONTAINER_CLI") {
49            // Verify the configured CLI works
50            if std::process::Command::new(&cli)
51                .arg("info")
52                .stdout(std::process::Stdio::null())
53                .stderr(std::process::Stdio::null())
54                .status()
55                .map(|s| s.success())
56                .unwrap_or(false)
57            {
58                cli
59            } else {
60                return None;
61            }
62        } else if is_cli_available("docker") {
63            "docker".to_string()
64        } else if is_cli_available("podman") {
65            "podman".to_string()
66        } else {
67            return None;
68        };
69
70        let instance_id = format!("fakecloud-{}", std::process::id());
71
72        // Detect the appropriate host address for containers
73        // On Linux, use the bridge gateway IP directly (more reliable)
74        // On Mac/Windows, use host-gateway which Docker Desktop handles
75        let host_ip = if cfg!(target_os = "linux") {
76            detect_bridge_gateway(&cli).unwrap_or_else(|| "172.17.0.1".to_string())
77        } else {
78            "host-gateway".to_string()
79        };
80
81        Some(Self {
82            cli,
83            containers: RwLock::new(HashMap::new()),
84            starting: RwLock::new(HashMap::new()),
85            instance_id,
86            host_ip,
87        })
88    }
89
90    pub fn cli_name(&self) -> &str {
91        &self.cli
92    }
93
94    /// Invoke a Lambda function, starting a container if needed.
95    pub async fn invoke(
96        &self,
97        func: &LambdaFunction,
98        payload: &[u8],
99    ) -> Result<Vec<u8>, RuntimeError> {
100        let zip_bytes = func
101            .code_zip
102            .as_ref()
103            .ok_or_else(|| RuntimeError::NoCodeZip(func.function_name.clone()))?;
104
105        // Check for warm container with matching code
106        let port = {
107            let containers = self.containers.read();
108            if let Some(container) = containers.get(&func.function_name) {
109                if container.code_sha256 == func.code_sha256 {
110                    *container.last_used.write() = Instant::now();
111                    Some(container.host_port)
112                } else {
113                    None
114                }
115            } else {
116                None
117            }
118        };
119
120        let port = match port {
121            Some(p) => p,
122            None => {
123                // Serialize container startup per function to prevent duplicates
124                let startup_lock = {
125                    let mut starting = self.starting.write();
126                    starting
127                        .entry(func.function_name.clone())
128                        .or_insert_with(|| Arc::new(tokio::sync::Mutex::new(())))
129                        .clone()
130                };
131                let _guard = startup_lock.lock().await;
132
133                // Re-check after acquiring lock — another task may have started it
134                let existing_port = {
135                    let containers = self.containers.read();
136                    containers
137                        .get(&func.function_name)
138                        .filter(|c| c.code_sha256 == func.code_sha256)
139                        .map(|c| {
140                            *c.last_used.write() = Instant::now();
141                            c.host_port
142                        })
143                };
144                if let Some(p) = existing_port {
145                    p
146                } else {
147                    self.stop_container(&func.function_name).await;
148                    let container = self.start_container(func, zip_bytes).await?;
149                    let p = container.host_port;
150                    self.containers
151                        .write()
152                        .insert(func.function_name.clone(), container);
153                    p
154                }
155            }
156        };
157
158        // POST to the RIE endpoint
159        let url = format!(
160            "http://localhost:{}/2015-03-31/functions/function/invocations",
161            port
162        );
163        let client = reqwest::Client::new();
164        let resp = client
165            .post(&url)
166            .body(payload.to_vec())
167            .timeout(Duration::from_secs(func.timeout as u64 + 5))
168            .send()
169            .await
170            .map_err(|e| RuntimeError::InvocationFailed(e.to_string()))?;
171
172        let body = resp
173            .bytes()
174            .await
175            .map_err(|e| RuntimeError::InvocationFailed(e.to_string()))?;
176
177        Ok(body.to_vec())
178    }
179
180    async fn start_container(
181        &self,
182        func: &LambdaFunction,
183        zip_bytes: &[u8],
184    ) -> Result<WarmContainer, RuntimeError> {
185        let image = runtime_to_image(&func.runtime)
186            .ok_or_else(|| RuntimeError::UnsupportedRuntime(func.runtime.clone()))?;
187
188        // Extract ZIP to a temp directory (only needed during container setup).
189        // Run in spawn_blocking to avoid blocking the async runtime with fs I/O.
190        let code_dir =
191            TempDir::new().map_err(|e| RuntimeError::ZipExtractionFailed(e.to_string()))?;
192        let zip_bytes = zip_bytes.to_vec();
193        let code_path = code_dir.path().to_path_buf();
194        tokio::task::spawn_blocking(move || extract_zip(&zip_bytes, &code_path))
195            .await
196            .map_err(|e| RuntimeError::ZipExtractionFailed(e.to_string()))??;
197
198        // Step 1: docker create (no volume mounts — works in Docker-in-Docker)
199        let mut cmd = tokio::process::Command::new(&self.cli);
200        cmd.arg("create")
201            .arg("-p")
202            .arg(":8080")
203            .arg("--label")
204            .arg(format!("fakecloud-lambda={}", func.function_name))
205            .arg("--label")
206            .arg(format!("fakecloud-instance={}", self.instance_id))
207            // Map host.docker.internal to the detected host IP (bridge gateway on Linux, or explicit IP)
208            .arg("--add-host")
209            .arg(format!("host.docker.internal:{}", self.host_ip));
210
211        for (key, value) in &func.environment {
212            // Transform localhost URLs to use host.docker.internal, which we've set up via --add-host
213            let transformed_value = value
214                .replace("http://127.0.0.1:", "http://host.docker.internal:")
215                .replace("https://127.0.0.1:", "https://host.docker.internal:")
216                .replace("http://localhost:", "http://host.docker.internal:")
217                .replace("https://localhost:", "https://host.docker.internal:");
218            cmd.arg("-e").arg(format!("{}={}", key, transformed_value));
219        }
220
221        cmd.arg("-e")
222            .arg(format!("AWS_LAMBDA_FUNCTION_TIMEOUT={}", func.timeout));
223
224        cmd.arg(&image).arg(&func.handler);
225
226        let output = cmd
227            .output()
228            .await
229            .map_err(|e| RuntimeError::ContainerStartFailed(e.to_string()))?;
230
231        if !output.status.success() {
232            let stderr = String::from_utf8_lossy(&output.stderr);
233            return Err(RuntimeError::ContainerStartFailed(stderr.to_string()));
234        }
235
236        let container_id = String::from_utf8_lossy(&output.stdout).trim().to_string();
237
238        // Step 2: docker cp — copy code into the container
239        let cp_result = tokio::process::Command::new(&self.cli)
240            .arg("cp")
241            .arg(format!("{}/.", code_dir.path().display()))
242            .arg(format!("{}:/var/task", container_id))
243            .output()
244            .await
245            .map_err(|e| RuntimeError::ContainerStartFailed(e.to_string()))?;
246
247        if !cp_result.status.success() {
248            let _ = self.remove_container(&container_id).await;
249            let stderr = String::from_utf8_lossy(&cp_result.stderr);
250            return Err(RuntimeError::ContainerStartFailed(format!(
251                "docker cp failed: {}",
252                stderr
253            )));
254        }
255
256        // For provided/custom runtimes, also copy to /var/runtime
257        if func.runtime.starts_with("provided") {
258            let cp_runtime = tokio::process::Command::new(&self.cli)
259                .arg("cp")
260                .arg(format!("{}/.", code_dir.path().display()))
261                .arg(format!("{}:/var/runtime", container_id))
262                .output()
263                .await
264                .map_err(|e| RuntimeError::ContainerStartFailed(e.to_string()))?;
265
266            if !cp_runtime.status.success() {
267                let _ = self.remove_container(&container_id).await;
268                let stderr = String::from_utf8_lossy(&cp_runtime.stderr);
269                return Err(RuntimeError::ContainerStartFailed(format!(
270                    "docker cp to /var/runtime failed: {}",
271                    stderr
272                )));
273            }
274        }
275
276        // TempDir is dropped here — code now lives inside the container
277
278        // Step 3: docker start
279        let start_result = tokio::process::Command::new(&self.cli)
280            .args(["start", &container_id])
281            .output()
282            .await
283            .map_err(|e| RuntimeError::ContainerStartFailed(e.to_string()))?;
284
285        if !start_result.status.success() {
286            let _ = self.remove_container(&container_id).await;
287            let stderr = String::from_utf8_lossy(&start_result.stderr);
288            return Err(RuntimeError::ContainerStartFailed(format!(
289                "docker start failed: {}",
290                stderr
291            )));
292        }
293
294        // Query the actual assigned port
295        let port_output = tokio::process::Command::new(&self.cli)
296            .args(["port", &container_id, "8080"])
297            .output()
298            .await
299            .map_err(|e| RuntimeError::ContainerStartFailed(e.to_string()))?;
300
301        let port_str = String::from_utf8_lossy(&port_output.stdout);
302        let port: u16 = port_str
303            .trim()
304            .rsplit(':')
305            .next()
306            .and_then(|p| p.parse().ok())
307            .ok_or_else(|| {
308                RuntimeError::ContainerStartFailed(format!(
309                    "could not determine port from: {}",
310                    port_str.trim()
311                ))
312            })?;
313
314        // Wait for RIE to start accepting connections
315        let mut ready = false;
316        for _ in 0..20 {
317            tokio::time::sleep(Duration::from_millis(500)).await;
318            if tokio::net::TcpStream::connect(format!("127.0.0.1:{}", port))
319                .await
320                .is_ok()
321            {
322                ready = true;
323                break;
324            }
325        }
326
327        if !ready {
328            let _ = self.remove_container(&container_id).await;
329            return Err(RuntimeError::ContainerStartFailed(
330                "container did not become ready within 10 seconds".to_string(),
331            ));
332        }
333
334        tracing::info!(
335            function = %func.function_name,
336            container_id = %container_id,
337            port = port,
338            runtime = %func.runtime,
339            "Lambda container started"
340        );
341
342        Ok(WarmContainer {
343            container_id,
344            host_port: port,
345            last_used: RwLock::new(Instant::now()),
346            code_sha256: func.code_sha256.clone(),
347        })
348    }
349
350    /// Remove a container (stop + rm, since we don't use --rm with docker create).
351    async fn remove_container(&self, container_id: &str) {
352        let _ = tokio::process::Command::new(&self.cli)
353            .args(["rm", "-f", container_id])
354            .output()
355            .await;
356    }
357
358    /// Stop and remove a container for a specific function.
359    pub async fn stop_container(&self, function_name: &str) {
360        let container = self.containers.write().remove(function_name);
361        if let Some(container) = container {
362            tracing::info!(
363                function = %function_name,
364                container_id = %container.container_id,
365                "stopping Lambda container"
366            );
367            self.remove_container(&container.container_id).await;
368        }
369    }
370
371    /// Stop and remove all containers (used on server shutdown or reset).
372    pub async fn stop_all(&self) {
373        let containers: Vec<(String, String)> = {
374            let mut map = self.containers.write();
375            map.drain()
376                .map(|(name, c)| (name, c.container_id))
377                .collect()
378        };
379        for (name, container_id) in containers {
380            tracing::info!(
381                function = %name,
382                container_id = %container_id,
383                "stopping Lambda container (cleanup)"
384            );
385            self.remove_container(&container_id).await;
386        }
387    }
388
389    /// List all warm containers and their metadata for introspection.
390    pub fn list_warm_containers(
391        &self,
392        lambda_state: &crate::state::SharedLambdaState,
393    ) -> Vec<serde_json::Value> {
394        let containers = self.containers.read();
395        let accounts = lambda_state.read();
396        containers
397            .iter()
398            .map(|(name, container)| {
399                let runtime = accounts
400                    .iter()
401                    .find_map(|(_, state)| state.functions.get(name).map(|f| f.runtime.clone()))
402                    .unwrap_or_default();
403                let last_used = container.last_used.read();
404                let idle_secs = last_used.elapsed().as_secs();
405                serde_json::json!({
406                    "functionName": name,
407                    "runtime": runtime,
408                    "containerId": container.container_id,
409                    "lastUsedSecsAgo": idle_secs,
410                })
411            })
412            .collect()
413    }
414
415    /// Evict (stop and remove) the warm container for a specific function.
416    /// Returns true if a container was found and evicted.
417    pub async fn evict_container(&self, function_name: &str) -> bool {
418        let container = self.containers.write().remove(function_name);
419        if let Some(container) = container {
420            tracing::info!(
421                function = %function_name,
422                container_id = %container.container_id,
423                "evicting Lambda container via simulation API"
424            );
425            self.remove_container(&container.container_id).await;
426            true
427        } else {
428            false
429        }
430    }
431
432    /// Background loop that stops containers idle longer than `ttl`.
433    pub async fn run_cleanup_loop(self: Arc<Self>, ttl: Duration) {
434        let mut interval = tokio::time::interval(Duration::from_secs(30));
435        loop {
436            interval.tick().await;
437            self.cleanup_idle(ttl).await;
438        }
439    }
440
441    async fn cleanup_idle(&self, ttl: Duration) {
442        let expired: Vec<String> = {
443            let containers = self.containers.read();
444            containers
445                .iter()
446                .filter(|(_, c)| c.last_used.read().elapsed() > ttl)
447                .map(|(name, _)| name.clone())
448                .collect()
449        };
450        for name in expired {
451            tracing::info!(function = %name, "stopping idle Lambda container");
452            self.stop_container(&name).await;
453        }
454    }
455}
456
/// Translate an AWS runtime identifier into the matching container image
/// reference under `public.ecr.aws/lambda/`.
///
/// Returns `None` for identifiers that are not in the table below.
pub fn runtime_to_image(runtime: &str) -> Option<String> {
    // (runtime identifier, image repository, image tag)
    const IMAGES: &[(&str, &str, &str)] = &[
        ("python3.13", "python", "3.13"),
        ("python3.12", "python", "3.12"),
        ("python3.11", "python", "3.11"),
        ("nodejs22.x", "nodejs", "22"),
        ("nodejs20.x", "nodejs", "20"),
        ("nodejs18.x", "nodejs", "18"),
        ("ruby3.4", "ruby", "3.4"),
        ("ruby3.3", "ruby", "3.3"),
        ("java21", "java", "21"),
        ("java17", "java", "17"),
        ("dotnet8", "dotnet", "8"),
        ("provided.al2023", "provided", "al2023"),
        ("provided.al2", "provided", "al2"),
    ];

    IMAGES
        .iter()
        .find(|(id, _, _)| *id == runtime)
        .map(|(_, repo, tag)| format!("public.ecr.aws/lambda/{}:{}", repo, tag))
}
477
478/// Extract a ZIP archive to a destination directory.
479pub fn extract_zip(zip_bytes: &[u8], dest: &Path) -> Result<(), RuntimeError> {
480    let cursor = std::io::Cursor::new(zip_bytes);
481    let mut archive = zip::ZipArchive::new(cursor)
482        .map_err(|e| RuntimeError::ZipExtractionFailed(e.to_string()))?;
483
484    for i in 0..archive.len() {
485        let mut file = archive
486            .by_index(i)
487            .map_err(|e| RuntimeError::ZipExtractionFailed(e.to_string()))?;
488
489        let out_path = dest.join(file.enclosed_name().ok_or_else(|| {
490            RuntimeError::ZipExtractionFailed("invalid file name in ZIP".to_string())
491        })?);
492
493        if file.is_dir() {
494            std::fs::create_dir_all(&out_path)
495                .map_err(|e| RuntimeError::ZipExtractionFailed(e.to_string()))?;
496        } else {
497            if let Some(parent) = out_path.parent() {
498                std::fs::create_dir_all(parent)
499                    .map_err(|e| RuntimeError::ZipExtractionFailed(e.to_string()))?;
500            }
501            let mut out_file = std::fs::File::create(&out_path)
502                .map_err(|e| RuntimeError::ZipExtractionFailed(e.to_string()))?;
503            std::io::copy(&mut file, &mut out_file)
504                .map_err(|e| RuntimeError::ZipExtractionFailed(e.to_string()))?;
505
506            // Preserve executable permissions
507            #[cfg(unix)]
508            {
509                use std::os::unix::fs::PermissionsExt;
510                if let Some(mode) = file.unix_mode() {
511                    std::fs::set_permissions(&out_path, std::fs::Permissions::from_mode(mode))
512                        .map_err(|e| RuntimeError::ZipExtractionFailed(e.to_string()))?;
513                }
514            }
515        }
516    }
517    Ok(())
518}
519
520/// Detect the Docker bridge gateway IP on Linux.
521/// Returns None if detection fails.
522fn detect_bridge_gateway(cli: &str) -> Option<String> {
523    let output = std::process::Command::new(cli)
524        .args([
525            "network",
526            "inspect",
527            "bridge",
528            "--format",
529            "{{range .IPAM.Config}}{{.Gateway}}{{end}}",
530        ])
531        .output()
532        .ok()?;
533
534    if output.status.success() {
535        let gateway = String::from_utf8_lossy(&output.stdout).trim().to_string();
536        if !gateway.is_empty() && gateway.contains('.') {
537            tracing::info!(
538                gateway = %gateway,
539                "Detected Docker bridge gateway for Lambda containers"
540            );
541            return Some(gateway);
542        }
543    }
544    None
545}
546
/// Report whether running `<name> info` succeeds.
///
/// The command's output is discarded. Returns `false` both when the program
/// cannot be started and when it exits unsuccessfully.
fn is_cli_available(name: &str) -> bool {
    use std::process::{Command, Stdio};

    let mut probe = Command::new(name);
    probe.arg("info").stdout(Stdio::null()).stderr(Stdio::null());

    match probe.status() {
        Ok(status) => status.success(),
        Err(_) => false,
    }
}
556
#[cfg(test)]
mod tests {
    use std::io::Write;

    use super::*;

    /// Known runtime identifiers map to their image; unknown ones to `None`.
    #[test]
    fn test_runtime_to_image() {
        let expected = [
            ("python3.12", "public.ecr.aws/lambda/python:3.12"),
            ("nodejs20.x", "public.ecr.aws/lambda/nodejs:20"),
            ("provided.al2023", "public.ecr.aws/lambda/provided:al2023"),
            ("ruby3.4", "public.ecr.aws/lambda/ruby:3.4"),
            ("java21", "public.ecr.aws/lambda/java:21"),
            ("dotnet8", "public.ecr.aws/lambda/dotnet:8"),
        ];
        for (runtime, image) in expected.iter() {
            assert_eq!(runtime_to_image(runtime), Some(image.to_string()));
        }

        assert_eq!(runtime_to_image("unknown"), None);
    }

    /// A single-file archive is unpacked with its content intact.
    #[test]
    fn test_extract_zip() {
        // Create a minimal ZIP in memory
        let mut writer = zip::ZipWriter::new(std::io::Cursor::new(Vec::new()));
        writer
            .start_file("handler.py", zip::write::SimpleFileOptions::default())
            .unwrap();
        writer
            .write_all(b"def handler(event, context):\n    return {'statusCode': 200}\n")
            .unwrap();
        let zip_bytes = writer.finish().unwrap().into_inner();

        let dir = TempDir::new().unwrap();
        extract_zip(&zip_bytes, dir.path()).unwrap();

        let handler_path = dir.path().join("handler.py");
        assert!(handler_path.exists());

        let content = std::fs::read_to_string(&handler_path).unwrap();
        assert!(content.contains("def handler"));
    }
}