Skip to main content

fakecloud_lambda/
runtime.rs

1use std::collections::HashMap;
2use std::path::Path;
3use std::sync::Arc;
4use std::time::{Duration, Instant};
5
6use parking_lot::RwLock;
7use tempfile::TempDir;
8
9use crate::state::LambdaFunction;
10
11/// A running container kept warm for reuse.
12struct WarmContainer {
13    container_id: String,
14    host_port: u16,
15    last_used: RwLock<Instant>,
16    code_sha256: String,
17}
18
19/// Docker/Podman-based Lambda execution engine.
20pub struct ContainerRuntime {
21    cli: String,
22    containers: RwLock<HashMap<String, WarmContainer>>,
23    /// Serializes container startup per function to prevent duplicate containers.
24    starting: RwLock<HashMap<String, Arc<tokio::sync::Mutex<()>>>>,
25    instance_id: String,
26    /// IP address that containers should use to reach the host
27    host_ip: String,
28}
29
30#[derive(Debug, thiserror::Error)]
31pub enum RuntimeError {
32    #[error("no code ZIP provided for function {0}")]
33    NoCodeZip(String),
34    #[error("unsupported runtime: {0}")]
35    UnsupportedRuntime(String),
36    #[error("container failed to start: {0}")]
37    ContainerStartFailed(String),
38    #[error("invocation failed: {0}")]
39    InvocationFailed(String),
40    #[error("ZIP extraction failed: {0}")]
41    ZipExtractionFailed(String),
42}
43
44impl ContainerRuntime {
45    /// Auto-detect Docker or Podman. Returns `None` if neither is available.
46    /// Override with `FAKECLOUD_CONTAINER_CLI` env var.
47    pub fn new() -> Option<Self> {
48        let cli = if let Ok(cli) = std::env::var("FAKECLOUD_CONTAINER_CLI") {
49            // Verify the configured CLI works
50            if std::process::Command::new(&cli)
51                .arg("info")
52                .stdout(std::process::Stdio::null())
53                .stderr(std::process::Stdio::null())
54                .status()
55                .map(|s| s.success())
56                .unwrap_or(false)
57            {
58                cli
59            } else {
60                return None;
61            }
62        } else if is_cli_available("docker") {
63            "docker".to_string()
64        } else if is_cli_available("podman") {
65            "podman".to_string()
66        } else {
67            return None;
68        };
69
70        let instance_id = format!("fakecloud-{}", std::process::id());
71
72        // Detect the appropriate host address for containers
73        // On Linux, use the bridge gateway IP directly (more reliable)
74        // On Mac/Windows, use host-gateway which Docker Desktop handles
75        let host_ip = if cfg!(target_os = "linux") {
76            detect_bridge_gateway(&cli).unwrap_or_else(|| "172.17.0.1".to_string())
77        } else {
78            "host-gateway".to_string()
79        };
80
81        Some(Self {
82            cli,
83            containers: RwLock::new(HashMap::new()),
84            starting: RwLock::new(HashMap::new()),
85            instance_id,
86            host_ip,
87        })
88    }
89
90    pub fn cli_name(&self) -> &str {
91        &self.cli
92    }
93
94    /// Invoke a Lambda function, starting a container if needed.
95    pub async fn invoke(
96        &self,
97        func: &LambdaFunction,
98        payload: &[u8],
99    ) -> Result<Vec<u8>, RuntimeError> {
100        let zip_bytes = func
101            .code_zip
102            .as_ref()
103            .ok_or_else(|| RuntimeError::NoCodeZip(func.function_name.clone()))?;
104
105        // Check for warm container with matching code
106        let port = {
107            let containers = self.containers.read();
108            if let Some(container) = containers.get(&func.function_name) {
109                if container.code_sha256 == func.code_sha256 {
110                    *container.last_used.write() = Instant::now();
111                    Some(container.host_port)
112                } else {
113                    None
114                }
115            } else {
116                None
117            }
118        };
119
120        let port = match port {
121            Some(p) => p,
122            None => {
123                // Serialize container startup per function to prevent duplicates
124                let startup_lock = {
125                    let mut starting = self.starting.write();
126                    starting
127                        .entry(func.function_name.clone())
128                        .or_insert_with(|| Arc::new(tokio::sync::Mutex::new(())))
129                        .clone()
130                };
131                let _guard = startup_lock.lock().await;
132
133                // Re-check after acquiring lock — another task may have started it
134                let existing_port = {
135                    let containers = self.containers.read();
136                    containers
137                        .get(&func.function_name)
138                        .filter(|c| c.code_sha256 == func.code_sha256)
139                        .map(|c| {
140                            *c.last_used.write() = Instant::now();
141                            c.host_port
142                        })
143                };
144                if let Some(p) = existing_port {
145                    p
146                } else {
147                    self.stop_container(&func.function_name).await;
148                    let container = self.start_container(func, zip_bytes).await?;
149                    let p = container.host_port;
150                    self.containers
151                        .write()
152                        .insert(func.function_name.clone(), container);
153                    p
154                }
155            }
156        };
157
158        // POST to the RIE endpoint
159        let url = format!(
160            "http://localhost:{}/2015-03-31/functions/function/invocations",
161            port
162        );
163        let client = reqwest::Client::new();
164        let resp = client
165            .post(&url)
166            .body(payload.to_vec())
167            .timeout(Duration::from_secs(func.timeout as u64 + 5))
168            .send()
169            .await
170            .map_err(|e| RuntimeError::InvocationFailed(e.to_string()))?;
171
172        let body = resp
173            .bytes()
174            .await
175            .map_err(|e| RuntimeError::InvocationFailed(e.to_string()))?;
176
177        Ok(body.to_vec())
178    }
179
180    async fn start_container(
181        &self,
182        func: &LambdaFunction,
183        zip_bytes: &[u8],
184    ) -> Result<WarmContainer, RuntimeError> {
185        let image = runtime_to_image(&func.runtime)
186            .ok_or_else(|| RuntimeError::UnsupportedRuntime(func.runtime.clone()))?;
187
188        // Extract ZIP to a temp directory (only needed during container setup)
189        let code_dir =
190            TempDir::new().map_err(|e| RuntimeError::ZipExtractionFailed(e.to_string()))?;
191        extract_zip(zip_bytes, code_dir.path())?;
192
193        // Step 1: docker create (no volume mounts — works in Docker-in-Docker)
194        let mut cmd = tokio::process::Command::new(&self.cli);
195        cmd.arg("create")
196            .arg("-p")
197            .arg(":8080")
198            .arg("--label")
199            .arg(format!("fakecloud-lambda={}", func.function_name))
200            .arg("--label")
201            .arg(format!("fakecloud-instance={}", self.instance_id))
202            // Map host.docker.internal to the detected host IP (bridge gateway on Linux, or explicit IP)
203            .arg("--add-host")
204            .arg(format!("host.docker.internal:{}", self.host_ip));
205
206        for (key, value) in &func.environment {
207            // Transform localhost URLs to use host.docker.internal, which we've set up via --add-host
208            let transformed_value = value
209                .replace("http://127.0.0.1:", "http://host.docker.internal:")
210                .replace("https://127.0.0.1:", "https://host.docker.internal:")
211                .replace("http://localhost:", "http://host.docker.internal:")
212                .replace("https://localhost:", "https://host.docker.internal:");
213            cmd.arg("-e").arg(format!("{}={}", key, transformed_value));
214        }
215
216        cmd.arg("-e")
217            .arg(format!("AWS_LAMBDA_FUNCTION_TIMEOUT={}", func.timeout));
218
219        cmd.arg(&image).arg(&func.handler);
220
221        let output = cmd
222            .output()
223            .await
224            .map_err(|e| RuntimeError::ContainerStartFailed(e.to_string()))?;
225
226        if !output.status.success() {
227            let stderr = String::from_utf8_lossy(&output.stderr);
228            return Err(RuntimeError::ContainerStartFailed(stderr.to_string()));
229        }
230
231        let container_id = String::from_utf8_lossy(&output.stdout).trim().to_string();
232
233        // Step 2: docker cp — copy code into the container
234        let cp_result = tokio::process::Command::new(&self.cli)
235            .arg("cp")
236            .arg(format!("{}/.", code_dir.path().display()))
237            .arg(format!("{}:/var/task", container_id))
238            .output()
239            .await
240            .map_err(|e| RuntimeError::ContainerStartFailed(e.to_string()))?;
241
242        if !cp_result.status.success() {
243            let _ = self.remove_container(&container_id).await;
244            let stderr = String::from_utf8_lossy(&cp_result.stderr);
245            return Err(RuntimeError::ContainerStartFailed(format!(
246                "docker cp failed: {}",
247                stderr
248            )));
249        }
250
251        // For provided/custom runtimes, also copy to /var/runtime
252        if func.runtime.starts_with("provided") {
253            let cp_runtime = tokio::process::Command::new(&self.cli)
254                .arg("cp")
255                .arg(format!("{}/.", code_dir.path().display()))
256                .arg(format!("{}:/var/runtime", container_id))
257                .output()
258                .await
259                .map_err(|e| RuntimeError::ContainerStartFailed(e.to_string()))?;
260
261            if !cp_runtime.status.success() {
262                let _ = self.remove_container(&container_id).await;
263                let stderr = String::from_utf8_lossy(&cp_runtime.stderr);
264                return Err(RuntimeError::ContainerStartFailed(format!(
265                    "docker cp to /var/runtime failed: {}",
266                    stderr
267                )));
268            }
269        }
270
271        // TempDir is dropped here — code now lives inside the container
272
273        // Step 3: docker start
274        let start_result = tokio::process::Command::new(&self.cli)
275            .args(["start", &container_id])
276            .output()
277            .await
278            .map_err(|e| RuntimeError::ContainerStartFailed(e.to_string()))?;
279
280        if !start_result.status.success() {
281            let _ = self.remove_container(&container_id).await;
282            let stderr = String::from_utf8_lossy(&start_result.stderr);
283            return Err(RuntimeError::ContainerStartFailed(format!(
284                "docker start failed: {}",
285                stderr
286            )));
287        }
288
289        // Query the actual assigned port
290        let port_output = tokio::process::Command::new(&self.cli)
291            .args(["port", &container_id, "8080"])
292            .output()
293            .await
294            .map_err(|e| RuntimeError::ContainerStartFailed(e.to_string()))?;
295
296        let port_str = String::from_utf8_lossy(&port_output.stdout);
297        let port: u16 = port_str
298            .trim()
299            .rsplit(':')
300            .next()
301            .and_then(|p| p.parse().ok())
302            .ok_or_else(|| {
303                RuntimeError::ContainerStartFailed(format!(
304                    "could not determine port from: {}",
305                    port_str.trim()
306                ))
307            })?;
308
309        // Wait for RIE to start accepting connections
310        let mut ready = false;
311        for _ in 0..20 {
312            tokio::time::sleep(Duration::from_millis(500)).await;
313            if tokio::net::TcpStream::connect(format!("127.0.0.1:{}", port))
314                .await
315                .is_ok()
316            {
317                ready = true;
318                break;
319            }
320        }
321
322        if !ready {
323            let _ = self.remove_container(&container_id).await;
324            return Err(RuntimeError::ContainerStartFailed(
325                "container did not become ready within 10 seconds".to_string(),
326            ));
327        }
328
329        tracing::info!(
330            function = %func.function_name,
331            container_id = %container_id,
332            port = port,
333            runtime = %func.runtime,
334            "Lambda container started"
335        );
336
337        Ok(WarmContainer {
338            container_id,
339            host_port: port,
340            last_used: RwLock::new(Instant::now()),
341            code_sha256: func.code_sha256.clone(),
342        })
343    }
344
345    /// Remove a container (stop + rm, since we don't use --rm with docker create).
346    async fn remove_container(&self, container_id: &str) {
347        let _ = tokio::process::Command::new(&self.cli)
348            .args(["rm", "-f", container_id])
349            .output()
350            .await;
351    }
352
353    /// Stop and remove a container for a specific function.
354    pub async fn stop_container(&self, function_name: &str) {
355        let container = self.containers.write().remove(function_name);
356        if let Some(container) = container {
357            tracing::info!(
358                function = %function_name,
359                container_id = %container.container_id,
360                "stopping Lambda container"
361            );
362            self.remove_container(&container.container_id).await;
363        }
364    }
365
366    /// Stop and remove all containers (used on server shutdown or reset).
367    pub async fn stop_all(&self) {
368        let containers: Vec<(String, String)> = {
369            let mut map = self.containers.write();
370            map.drain()
371                .map(|(name, c)| (name, c.container_id))
372                .collect()
373        };
374        for (name, container_id) in containers {
375            tracing::info!(
376                function = %name,
377                container_id = %container_id,
378                "stopping Lambda container (cleanup)"
379            );
380            self.remove_container(&container_id).await;
381        }
382    }
383
384    /// List all warm containers and their metadata for introspection.
385    pub fn list_warm_containers(
386        &self,
387        lambda_state: &crate::state::SharedLambdaState,
388    ) -> Vec<serde_json::Value> {
389        let containers = self.containers.read();
390        let state = lambda_state.read();
391        containers
392            .iter()
393            .map(|(name, container)| {
394                let runtime = state
395                    .functions
396                    .get(name)
397                    .map(|f| f.runtime.clone())
398                    .unwrap_or_default();
399                let last_used = container.last_used.read();
400                let idle_secs = last_used.elapsed().as_secs();
401                serde_json::json!({
402                    "functionName": name,
403                    "runtime": runtime,
404                    "containerId": container.container_id,
405                    "lastUsedSecsAgo": idle_secs,
406                })
407            })
408            .collect()
409    }
410
411    /// Evict (stop and remove) the warm container for a specific function.
412    /// Returns true if a container was found and evicted.
413    pub async fn evict_container(&self, function_name: &str) -> bool {
414        let container = self.containers.write().remove(function_name);
415        if let Some(container) = container {
416            tracing::info!(
417                function = %function_name,
418                container_id = %container.container_id,
419                "evicting Lambda container via simulation API"
420            );
421            self.remove_container(&container.container_id).await;
422            true
423        } else {
424            false
425        }
426    }
427
428    /// Background loop that stops containers idle longer than `ttl`.
429    pub async fn run_cleanup_loop(self: Arc<Self>, ttl: Duration) {
430        let mut interval = tokio::time::interval(Duration::from_secs(30));
431        loop {
432            interval.tick().await;
433            self.cleanup_idle(ttl).await;
434        }
435    }
436
437    async fn cleanup_idle(&self, ttl: Duration) {
438        let expired: Vec<String> = {
439            let containers = self.containers.read();
440            containers
441                .iter()
442                .filter(|(_, c)| c.last_used.read().elapsed() > ttl)
443                .map(|(name, _)| name.clone())
444                .collect()
445        };
446        for name in expired {
447            tracing::info!(function = %name, "stopping idle Lambda container");
448            self.stop_container(&name).await;
449        }
450    }
451}
452
/// Resolve an AWS Lambda runtime identifier to its container image reference.
///
/// Returns `Some("public.ecr.aws/lambda/<repository>:<tag>")` for the runtimes
/// listed below and `None` for any other identifier.
pub fn runtime_to_image(runtime: &str) -> Option<String> {
    /// Supported runtime identifiers and their `<repository>:<tag>` suffix.
    const IMAGES: [(&str, &str); 13] = [
        ("python3.13", "python:3.13"),
        ("python3.12", "python:3.12"),
        ("python3.11", "python:3.11"),
        ("nodejs22.x", "nodejs:22"),
        ("nodejs20.x", "nodejs:20"),
        ("nodejs18.x", "nodejs:18"),
        ("ruby3.4", "ruby:3.4"),
        ("ruby3.3", "ruby:3.3"),
        ("java21", "java:21"),
        ("java17", "java:17"),
        ("dotnet8", "dotnet:8"),
        ("provided.al2023", "provided:al2023"),
        ("provided.al2", "provided:al2"),
    ];

    IMAGES
        .iter()
        .find(|(identifier, _)| *identifier == runtime)
        .map(|(_, suffix)| format!("public.ecr.aws/lambda/{}", suffix))
}
473
474/// Extract a ZIP archive to a destination directory.
475pub fn extract_zip(zip_bytes: &[u8], dest: &Path) -> Result<(), RuntimeError> {
476    let cursor = std::io::Cursor::new(zip_bytes);
477    let mut archive = zip::ZipArchive::new(cursor)
478        .map_err(|e| RuntimeError::ZipExtractionFailed(e.to_string()))?;
479
480    for i in 0..archive.len() {
481        let mut file = archive
482            .by_index(i)
483            .map_err(|e| RuntimeError::ZipExtractionFailed(e.to_string()))?;
484
485        let out_path = dest.join(file.enclosed_name().ok_or_else(|| {
486            RuntimeError::ZipExtractionFailed("invalid file name in ZIP".to_string())
487        })?);
488
489        if file.is_dir() {
490            std::fs::create_dir_all(&out_path)
491                .map_err(|e| RuntimeError::ZipExtractionFailed(e.to_string()))?;
492        } else {
493            if let Some(parent) = out_path.parent() {
494                std::fs::create_dir_all(parent)
495                    .map_err(|e| RuntimeError::ZipExtractionFailed(e.to_string()))?;
496            }
497            let mut out_file = std::fs::File::create(&out_path)
498                .map_err(|e| RuntimeError::ZipExtractionFailed(e.to_string()))?;
499            std::io::copy(&mut file, &mut out_file)
500                .map_err(|e| RuntimeError::ZipExtractionFailed(e.to_string()))?;
501
502            // Preserve executable permissions
503            #[cfg(unix)]
504            {
505                use std::os::unix::fs::PermissionsExt;
506                if let Some(mode) = file.unix_mode() {
507                    std::fs::set_permissions(&out_path, std::fs::Permissions::from_mode(mode))
508                        .map_err(|e| RuntimeError::ZipExtractionFailed(e.to_string()))?;
509                }
510            }
511        }
512    }
513    Ok(())
514}
515
516/// Detect the Docker bridge gateway IP on Linux.
517/// Returns None if detection fails.
518fn detect_bridge_gateway(cli: &str) -> Option<String> {
519    let output = std::process::Command::new(cli)
520        .args([
521            "network",
522            "inspect",
523            "bridge",
524            "--format",
525            "{{range .IPAM.Config}}{{.Gateway}}{{end}}",
526        ])
527        .output()
528        .ok()?;
529
530    if output.status.success() {
531        let gateway = String::from_utf8_lossy(&output.stdout).trim().to_string();
532        if !gateway.is_empty() && gateway.contains('.') {
533            tracing::info!(
534                gateway = %gateway,
535                "Detected Docker bridge gateway for Lambda containers"
536            );
537            return Some(gateway);
538        }
539    }
540    None
541}
542
/// Report whether running `<name> info` succeeds.
///
/// The command's output is discarded. Returns `false` both when the binary
/// cannot be launched and when it exits with a non-zero status.
fn is_cli_available(name: &str) -> bool {
    use std::process::{Command, Stdio};

    let probe = Command::new(name)
        .arg("info")
        .stdout(Stdio::null())
        .stderr(Stdio::null())
        .status();

    matches!(probe, Ok(status) if status.success())
}
552
#[cfg(test)]
mod tests {
    use std::io::Write;

    use super::*;

    /// One runtime per image family resolves to the expected image reference;
    /// an unrecognized identifier resolves to `None`.
    #[test]
    fn test_runtime_to_image() {
        let expected = [
            ("python3.12", "public.ecr.aws/lambda/python:3.12"),
            ("nodejs20.x", "public.ecr.aws/lambda/nodejs:20"),
            ("provided.al2023", "public.ecr.aws/lambda/provided:al2023"),
            ("ruby3.4", "public.ecr.aws/lambda/ruby:3.4"),
            ("java21", "public.ecr.aws/lambda/java:21"),
            ("dotnet8", "public.ecr.aws/lambda/dotnet:8"),
        ];
        for (runtime, image) in expected {
            assert_eq!(runtime_to_image(runtime), Some(image.to_string()));
        }
        assert_eq!(runtime_to_image("unknown"), None);
    }

    /// A single-file archive built in memory is extracted with its contents intact.
    #[test]
    fn test_extract_zip() {
        // Create a minimal ZIP in memory
        let mut writer = zip::ZipWriter::new(std::io::Cursor::new(Vec::new()));
        writer
            .start_file("handler.py", zip::write::SimpleFileOptions::default())
            .unwrap();
        writer
            .write_all(b"def handler(event, context):\n    return {'statusCode': 200}\n")
            .unwrap();
        let zip_bytes = writer.finish().unwrap().into_inner();

        let dir = TempDir::new().unwrap();
        extract_zip(&zip_bytes, dir.path()).unwrap();

        let handler_path = dir.path().join("handler.py");
        assert!(handler_path.exists());

        let content = std::fs::read_to_string(&handler_path).unwrap();
        assert!(content.contains("def handler"));
    }
}
615}