Skip to main content

fakecloud_lambda/
runtime.rs

1use std::collections::HashMap;
2use std::path::Path;
3use std::sync::Arc;
4use std::time::{Duration, Instant};
5
6use parking_lot::RwLock;
7use tempfile::TempDir;
8
9use crate::state::LambdaFunction;
10
11/// A running container kept warm for reuse.
12struct WarmContainer {
13    container_id: String,
14    host_port: u16,
15    last_used: RwLock<Instant>,
16    code_sha256: String,
17}
18
19/// Docker/Podman-based Lambda execution engine.
20pub struct ContainerRuntime {
21    cli: String,
22    containers: RwLock<HashMap<String, WarmContainer>>,
23    /// Serializes container startup per function to prevent duplicate containers.
24    starting: RwLock<HashMap<String, Arc<tokio::sync::Mutex<()>>>>,
25    instance_id: String,
26}
27
28#[derive(Debug, thiserror::Error)]
29pub enum RuntimeError {
30    #[error("no code ZIP provided for function {0}")]
31    NoCodeZip(String),
32    #[error("unsupported runtime: {0}")]
33    UnsupportedRuntime(String),
34    #[error("container failed to start: {0}")]
35    ContainerStartFailed(String),
36    #[error("invocation failed: {0}")]
37    InvocationFailed(String),
38    #[error("ZIP extraction failed: {0}")]
39    ZipExtractionFailed(String),
40}
41
42impl ContainerRuntime {
43    /// Auto-detect Docker or Podman. Returns `None` if neither is available.
44    /// Override with `FAKECLOUD_CONTAINER_CLI` env var.
45    pub fn new() -> Option<Self> {
46        let cli = if let Ok(cli) = std::env::var("FAKECLOUD_CONTAINER_CLI") {
47            // Verify the configured CLI works
48            if std::process::Command::new(&cli)
49                .arg("info")
50                .stdout(std::process::Stdio::null())
51                .stderr(std::process::Stdio::null())
52                .status()
53                .map(|s| s.success())
54                .unwrap_or(false)
55            {
56                cli
57            } else {
58                return None;
59            }
60        } else if is_cli_available("docker") {
61            "docker".to_string()
62        } else if is_cli_available("podman") {
63            "podman".to_string()
64        } else {
65            return None;
66        };
67
68        let instance_id = format!("fakecloud-{}", std::process::id());
69
70        Some(Self {
71            cli,
72            containers: RwLock::new(HashMap::new()),
73            starting: RwLock::new(HashMap::new()),
74            instance_id,
75        })
76    }
77
78    pub fn cli_name(&self) -> &str {
79        &self.cli
80    }
81
82    /// Invoke a Lambda function, starting a container if needed.
83    pub async fn invoke(
84        &self,
85        func: &LambdaFunction,
86        payload: &[u8],
87    ) -> Result<Vec<u8>, RuntimeError> {
88        let zip_bytes = func
89            .code_zip
90            .as_ref()
91            .ok_or_else(|| RuntimeError::NoCodeZip(func.function_name.clone()))?;
92
93        // Check for warm container with matching code
94        let port = {
95            let containers = self.containers.read();
96            if let Some(container) = containers.get(&func.function_name) {
97                if container.code_sha256 == func.code_sha256 {
98                    *container.last_used.write() = Instant::now();
99                    Some(container.host_port)
100                } else {
101                    None
102                }
103            } else {
104                None
105            }
106        };
107
108        let port = match port {
109            Some(p) => p,
110            None => {
111                // Serialize container startup per function to prevent duplicates
112                let startup_lock = {
113                    let mut starting = self.starting.write();
114                    starting
115                        .entry(func.function_name.clone())
116                        .or_insert_with(|| Arc::new(tokio::sync::Mutex::new(())))
117                        .clone()
118                };
119                let _guard = startup_lock.lock().await;
120
121                // Re-check after acquiring lock — another task may have started it
122                let existing_port = {
123                    let containers = self.containers.read();
124                    containers
125                        .get(&func.function_name)
126                        .filter(|c| c.code_sha256 == func.code_sha256)
127                        .map(|c| {
128                            *c.last_used.write() = Instant::now();
129                            c.host_port
130                        })
131                };
132                if let Some(p) = existing_port {
133                    p
134                } else {
135                    self.stop_container(&func.function_name).await;
136                    let container = self.start_container(func, zip_bytes).await?;
137                    let p = container.host_port;
138                    self.containers
139                        .write()
140                        .insert(func.function_name.clone(), container);
141                    p
142                }
143            }
144        };
145
146        // POST to the RIE endpoint
147        let url = format!(
148            "http://localhost:{}/2015-03-31/functions/function/invocations",
149            port
150        );
151        let client = reqwest::Client::new();
152        let resp = client
153            .post(&url)
154            .body(payload.to_vec())
155            .timeout(Duration::from_secs(func.timeout as u64 + 5))
156            .send()
157            .await
158            .map_err(|e| RuntimeError::InvocationFailed(e.to_string()))?;
159
160        let body = resp
161            .bytes()
162            .await
163            .map_err(|e| RuntimeError::InvocationFailed(e.to_string()))?;
164
165        Ok(body.to_vec())
166    }
167
168    async fn start_container(
169        &self,
170        func: &LambdaFunction,
171        zip_bytes: &[u8],
172    ) -> Result<WarmContainer, RuntimeError> {
173        let image = runtime_to_image(&func.runtime)
174            .ok_or_else(|| RuntimeError::UnsupportedRuntime(func.runtime.clone()))?;
175
176        // Extract ZIP to a temp directory (only needed during container setup)
177        let code_dir =
178            TempDir::new().map_err(|e| RuntimeError::ZipExtractionFailed(e.to_string()))?;
179        extract_zip(zip_bytes, code_dir.path())?;
180
181        // Step 1: docker create (no volume mounts — works in Docker-in-Docker)
182        let mut cmd = tokio::process::Command::new(&self.cli);
183        cmd.arg("create")
184            .arg("-p")
185            .arg(":8080")
186            .arg("--label")
187            .arg(format!("fakecloud-lambda={}", func.function_name))
188            .arg("--label")
189            .arg(format!("fakecloud-instance={}", self.instance_id));
190
191        for (key, value) in &func.environment {
192            cmd.arg("-e").arg(format!("{}={}", key, value));
193        }
194
195        cmd.arg("-e")
196            .arg(format!("AWS_LAMBDA_FUNCTION_TIMEOUT={}", func.timeout));
197
198        cmd.arg(&image).arg(&func.handler);
199
200        let output = cmd
201            .output()
202            .await
203            .map_err(|e| RuntimeError::ContainerStartFailed(e.to_string()))?;
204
205        if !output.status.success() {
206            let stderr = String::from_utf8_lossy(&output.stderr);
207            return Err(RuntimeError::ContainerStartFailed(stderr.to_string()));
208        }
209
210        let container_id = String::from_utf8_lossy(&output.stdout).trim().to_string();
211
212        // Step 2: docker cp — copy code into the container
213        let cp_result = tokio::process::Command::new(&self.cli)
214            .arg("cp")
215            .arg(format!("{}/.", code_dir.path().display()))
216            .arg(format!("{}:/var/task", container_id))
217            .output()
218            .await
219            .map_err(|e| RuntimeError::ContainerStartFailed(e.to_string()))?;
220
221        if !cp_result.status.success() {
222            let _ = self.remove_container(&container_id).await;
223            let stderr = String::from_utf8_lossy(&cp_result.stderr);
224            return Err(RuntimeError::ContainerStartFailed(format!(
225                "docker cp failed: {}",
226                stderr
227            )));
228        }
229
230        // For provided/custom runtimes, also copy to /var/runtime
231        if func.runtime.starts_with("provided") {
232            let cp_runtime = tokio::process::Command::new(&self.cli)
233                .arg("cp")
234                .arg(format!("{}/.", code_dir.path().display()))
235                .arg(format!("{}:/var/runtime", container_id))
236                .output()
237                .await
238                .map_err(|e| RuntimeError::ContainerStartFailed(e.to_string()))?;
239
240            if !cp_runtime.status.success() {
241                let _ = self.remove_container(&container_id).await;
242                let stderr = String::from_utf8_lossy(&cp_runtime.stderr);
243                return Err(RuntimeError::ContainerStartFailed(format!(
244                    "docker cp to /var/runtime failed: {}",
245                    stderr
246                )));
247            }
248        }
249
250        // TempDir is dropped here — code now lives inside the container
251
252        // Step 3: docker start
253        let start_result = tokio::process::Command::new(&self.cli)
254            .args(["start", &container_id])
255            .output()
256            .await
257            .map_err(|e| RuntimeError::ContainerStartFailed(e.to_string()))?;
258
259        if !start_result.status.success() {
260            let _ = self.remove_container(&container_id).await;
261            let stderr = String::from_utf8_lossy(&start_result.stderr);
262            return Err(RuntimeError::ContainerStartFailed(format!(
263                "docker start failed: {}",
264                stderr
265            )));
266        }
267
268        // Query the actual assigned port
269        let port_output = tokio::process::Command::new(&self.cli)
270            .args(["port", &container_id, "8080"])
271            .output()
272            .await
273            .map_err(|e| RuntimeError::ContainerStartFailed(e.to_string()))?;
274
275        let port_str = String::from_utf8_lossy(&port_output.stdout);
276        let port: u16 = port_str
277            .trim()
278            .rsplit(':')
279            .next()
280            .and_then(|p| p.parse().ok())
281            .ok_or_else(|| {
282                RuntimeError::ContainerStartFailed(format!(
283                    "could not determine port from: {}",
284                    port_str.trim()
285                ))
286            })?;
287
288        // Wait for RIE to start accepting connections
289        let mut ready = false;
290        for _ in 0..20 {
291            tokio::time::sleep(Duration::from_millis(500)).await;
292            if tokio::net::TcpStream::connect(format!("127.0.0.1:{}", port))
293                .await
294                .is_ok()
295            {
296                ready = true;
297                break;
298            }
299        }
300
301        if !ready {
302            let _ = self.remove_container(&container_id).await;
303            return Err(RuntimeError::ContainerStartFailed(
304                "container did not become ready within 10 seconds".to_string(),
305            ));
306        }
307
308        tracing::info!(
309            function = %func.function_name,
310            container_id = %container_id,
311            port = port,
312            runtime = %func.runtime,
313            "Lambda container started"
314        );
315
316        Ok(WarmContainer {
317            container_id,
318            host_port: port,
319            last_used: RwLock::new(Instant::now()),
320            code_sha256: func.code_sha256.clone(),
321        })
322    }
323
324    /// Remove a container (stop + rm, since we don't use --rm with docker create).
325    async fn remove_container(&self, container_id: &str) {
326        let _ = tokio::process::Command::new(&self.cli)
327            .args(["rm", "-f", container_id])
328            .output()
329            .await;
330    }
331
332    /// Stop and remove a container for a specific function.
333    pub async fn stop_container(&self, function_name: &str) {
334        let container = self.containers.write().remove(function_name);
335        if let Some(container) = container {
336            tracing::info!(
337                function = %function_name,
338                container_id = %container.container_id,
339                "stopping Lambda container"
340            );
341            self.remove_container(&container.container_id).await;
342        }
343    }
344
345    /// Stop and remove all containers (used on server shutdown or reset).
346    pub async fn stop_all(&self) {
347        let containers: Vec<(String, String)> = {
348            let mut map = self.containers.write();
349            map.drain()
350                .map(|(name, c)| (name, c.container_id))
351                .collect()
352        };
353        for (name, container_id) in containers {
354            tracing::info!(
355                function = %name,
356                container_id = %container_id,
357                "stopping Lambda container (cleanup)"
358            );
359            self.remove_container(&container_id).await;
360        }
361    }
362
363    /// List all warm containers and their metadata for introspection.
364    pub fn list_warm_containers(
365        &self,
366        lambda_state: &crate::state::SharedLambdaState,
367    ) -> Vec<serde_json::Value> {
368        let containers = self.containers.read();
369        let state = lambda_state.read();
370        containers
371            .iter()
372            .map(|(name, container)| {
373                let runtime = state
374                    .functions
375                    .get(name)
376                    .map(|f| f.runtime.clone())
377                    .unwrap_or_default();
378                let last_used = container.last_used.read();
379                let idle_secs = last_used.elapsed().as_secs();
380                serde_json::json!({
381                    "functionName": name,
382                    "runtime": runtime,
383                    "containerId": container.container_id,
384                    "lastUsedSecsAgo": idle_secs,
385                })
386            })
387            .collect()
388    }
389
390    /// Evict (stop and remove) the warm container for a specific function.
391    /// Returns true if a container was found and evicted.
392    pub async fn evict_container(&self, function_name: &str) -> bool {
393        let container = self.containers.write().remove(function_name);
394        if let Some(container) = container {
395            tracing::info!(
396                function = %function_name,
397                container_id = %container.container_id,
398                "evicting Lambda container via simulation API"
399            );
400            self.remove_container(&container.container_id).await;
401            true
402        } else {
403            false
404        }
405    }
406
407    /// Background loop that stops containers idle longer than `ttl`.
408    pub async fn run_cleanup_loop(self: Arc<Self>, ttl: Duration) {
409        let mut interval = tokio::time::interval(Duration::from_secs(30));
410        loop {
411            interval.tick().await;
412            self.cleanup_idle(ttl).await;
413        }
414    }
415
416    async fn cleanup_idle(&self, ttl: Duration) {
417        let expired: Vec<String> = {
418            let containers = self.containers.read();
419            containers
420                .iter()
421                .filter(|(_, c)| c.last_used.read().elapsed() > ttl)
422                .map(|(name, _)| name.clone())
423                .collect()
424        };
425        for name in expired {
426            tracing::info!(function = %name, "stopping idle Lambda container");
427            self.stop_container(&name).await;
428        }
429    }
430}
431
/// Map an AWS Lambda runtime identifier (e.g. `python3.12`, `nodejs20.x`,
/// `provided.al2023`) to its base image under `public.ecr.aws/lambda`.
///
/// Returns `None` for identifiers that have no mapping here. The caller reports
/// that as an unsupported runtime.
pub fn runtime_to_image(runtime: &str) -> Option<String> {
    let (base, tag) = match runtime {
        "python3.13" => ("python", "3.13"),
        "python3.12" => ("python", "3.12"),
        "python3.11" => ("python", "3.11"),
        "python3.10" => ("python", "3.10"),
        "python3.9" => ("python", "3.9"),
        "nodejs22.x" => ("nodejs", "22"),
        "nodejs20.x" => ("nodejs", "20"),
        "nodejs18.x" => ("nodejs", "18"),
        "ruby3.4" => ("ruby", "3.4"),
        "ruby3.3" => ("ruby", "3.3"),
        "ruby3.2" => ("ruby", "3.2"),
        "java21" => ("java", "21"),
        "java17" => ("java", "17"),
        "java11" => ("java", "11"),
        // The AWS runtime id and the image tag both carry the `.al2` suffix.
        "java8.al2" => ("java", "8.al2"),
        "dotnet8" => ("dotnet", "8"),
        "provided.al2023" => ("provided", "al2023"),
        "provided.al2" => ("provided", "al2"),
        _ => return None,
    };
    Some(format!("public.ecr.aws/lambda/{}:{}", base, tag))
}
452
453/// Extract a ZIP archive to a destination directory.
454pub fn extract_zip(zip_bytes: &[u8], dest: &Path) -> Result<(), RuntimeError> {
455    let cursor = std::io::Cursor::new(zip_bytes);
456    let mut archive = zip::ZipArchive::new(cursor)
457        .map_err(|e| RuntimeError::ZipExtractionFailed(e.to_string()))?;
458
459    for i in 0..archive.len() {
460        let mut file = archive
461            .by_index(i)
462            .map_err(|e| RuntimeError::ZipExtractionFailed(e.to_string()))?;
463
464        let out_path = dest.join(file.enclosed_name().ok_or_else(|| {
465            RuntimeError::ZipExtractionFailed("invalid file name in ZIP".to_string())
466        })?);
467
468        if file.is_dir() {
469            std::fs::create_dir_all(&out_path)
470                .map_err(|e| RuntimeError::ZipExtractionFailed(e.to_string()))?;
471        } else {
472            if let Some(parent) = out_path.parent() {
473                std::fs::create_dir_all(parent)
474                    .map_err(|e| RuntimeError::ZipExtractionFailed(e.to_string()))?;
475            }
476            let mut out_file = std::fs::File::create(&out_path)
477                .map_err(|e| RuntimeError::ZipExtractionFailed(e.to_string()))?;
478            std::io::copy(&mut file, &mut out_file)
479                .map_err(|e| RuntimeError::ZipExtractionFailed(e.to_string()))?;
480
481            // Preserve executable permissions
482            #[cfg(unix)]
483            {
484                use std::os::unix::fs::PermissionsExt;
485                if let Some(mode) = file.unix_mode() {
486                    std::fs::set_permissions(&out_path, std::fs::Permissions::from_mode(mode))
487                        .map_err(|e| RuntimeError::ZipExtractionFailed(e.to_string()))?;
488                }
489            }
490        }
491    }
492    Ok(())
493}
494
/// Report whether `<name> info` can be spawned and exits successfully.
///
/// The probe's stdout and stderr are discarded. A spawn failure, such as the
/// binary not being found, counts as unavailable.
fn is_cli_available(name: &str) -> bool {
    use std::process::{Command, Stdio};

    let probe = Command::new(name)
        .arg("info")
        .stdout(Stdio::null())
        .stderr(Stdio::null())
        .status();
    matches!(probe, Ok(status) if status.success())
}
504
#[cfg(test)]
mod tests {
    use std::io::Write;

    use super::*;

    #[test]
    fn test_runtime_to_image() {
        // (runtime identifier, expected image reference)
        let cases = [
            ("python3.12", "public.ecr.aws/lambda/python:3.12"),
            ("nodejs20.x", "public.ecr.aws/lambda/nodejs:20"),
            ("provided.al2023", "public.ecr.aws/lambda/provided:al2023"),
            ("ruby3.4", "public.ecr.aws/lambda/ruby:3.4"),
            ("java21", "public.ecr.aws/lambda/java:21"),
            ("dotnet8", "public.ecr.aws/lambda/dotnet:8"),
        ];
        for (runtime, image) in cases {
            assert_eq!(runtime_to_image(runtime), Some(image.to_string()));
        }
        assert_eq!(runtime_to_image("unknown"), None);
    }

    #[test]
    fn test_extract_zip() {
        // Build a single-file archive entirely in memory.
        let mut writer = zip::ZipWriter::new(std::io::Cursor::new(Vec::new()));
        writer
            .start_file("handler.py", zip::write::SimpleFileOptions::default())
            .unwrap();
        writer
            .write_all(b"def handler(event, context):\n    return {'statusCode': 200}\n")
            .unwrap();
        let zip_bytes = writer.finish().unwrap().into_inner();

        let dir = TempDir::new().unwrap();
        extract_zip(&zip_bytes, dir.path()).unwrap();

        let handler_path = dir.path().join("handler.py");
        assert!(handler_path.exists());

        let content = std::fs::read_to_string(&handler_path).unwrap();
        assert!(content.contains("def handler"));
    }
}
567}