1use std::collections::HashMap;
2use std::path::Path;
3use std::sync::Arc;
4use std::time::{Duration, Instant};
5
6use parking_lot::RwLock;
7use tempfile::TempDir;
8
9use crate::state::LambdaFunction;
10
11struct WarmContainer {
13 container_id: String,
14 host_port: u16,
15 last_used: RwLock<Instant>,
16 code_sha256: String,
17}
18
19pub struct ContainerRuntime {
21 cli: String,
22 containers: RwLock<HashMap<String, WarmContainer>>,
23 starting: RwLock<HashMap<String, Arc<tokio::sync::Mutex<()>>>>,
25 instance_id: String,
26 host_ip: String,
28}
29
30#[derive(Debug, thiserror::Error)]
31pub enum RuntimeError {
32 #[error("no code ZIP provided for function {0}")]
33 NoCodeZip(String),
34 #[error("unsupported runtime: {0}")]
35 UnsupportedRuntime(String),
36 #[error("container failed to start: {0}")]
37 ContainerStartFailed(String),
38 #[error("invocation failed: {0}")]
39 InvocationFailed(String),
40 #[error("ZIP extraction failed: {0}")]
41 ZipExtractionFailed(String),
42}
43
44impl ContainerRuntime {
45 pub fn new() -> Option<Self> {
48 let cli = if let Ok(cli) = std::env::var("FAKECLOUD_CONTAINER_CLI") {
49 if std::process::Command::new(&cli)
51 .arg("info")
52 .stdout(std::process::Stdio::null())
53 .stderr(std::process::Stdio::null())
54 .status()
55 .map(|s| s.success())
56 .unwrap_or(false)
57 {
58 cli
59 } else {
60 return None;
61 }
62 } else if is_cli_available("docker") {
63 "docker".to_string()
64 } else if is_cli_available("podman") {
65 "podman".to_string()
66 } else {
67 return None;
68 };
69
70 let instance_id = format!("fakecloud-{}", std::process::id());
71
72 let host_ip = if cfg!(target_os = "linux") {
76 detect_bridge_gateway(&cli).unwrap_or_else(|| "172.17.0.1".to_string())
77 } else {
78 "host-gateway".to_string()
79 };
80
81 Some(Self {
82 cli,
83 containers: RwLock::new(HashMap::new()),
84 starting: RwLock::new(HashMap::new()),
85 instance_id,
86 host_ip,
87 })
88 }
89
90 pub fn cli_name(&self) -> &str {
91 &self.cli
92 }
93
94 pub async fn invoke(
96 &self,
97 func: &LambdaFunction,
98 payload: &[u8],
99 ) -> Result<Vec<u8>, RuntimeError> {
100 let zip_bytes = func
101 .code_zip
102 .as_ref()
103 .ok_or_else(|| RuntimeError::NoCodeZip(func.function_name.clone()))?;
104
105 let port = {
107 let containers = self.containers.read();
108 if let Some(container) = containers.get(&func.function_name) {
109 if container.code_sha256 == func.code_sha256 {
110 *container.last_used.write() = Instant::now();
111 Some(container.host_port)
112 } else {
113 None
114 }
115 } else {
116 None
117 }
118 };
119
120 let port = match port {
121 Some(p) => p,
122 None => {
123 let startup_lock = {
125 let mut starting = self.starting.write();
126 starting
127 .entry(func.function_name.clone())
128 .or_insert_with(|| Arc::new(tokio::sync::Mutex::new(())))
129 .clone()
130 };
131 let _guard = startup_lock.lock().await;
132
133 let existing_port = {
135 let containers = self.containers.read();
136 containers
137 .get(&func.function_name)
138 .filter(|c| c.code_sha256 == func.code_sha256)
139 .map(|c| {
140 *c.last_used.write() = Instant::now();
141 c.host_port
142 })
143 };
144 if let Some(p) = existing_port {
145 p
146 } else {
147 self.stop_container(&func.function_name).await;
148 let container = self.start_container(func, zip_bytes).await?;
149 let p = container.host_port;
150 self.containers
151 .write()
152 .insert(func.function_name.clone(), container);
153 p
154 }
155 }
156 };
157
158 let url = format!(
160 "http://localhost:{}/2015-03-31/functions/function/invocations",
161 port
162 );
163 let client = reqwest::Client::new();
164 let resp = client
165 .post(&url)
166 .body(payload.to_vec())
167 .timeout(Duration::from_secs(func.timeout as u64 + 5))
168 .send()
169 .await
170 .map_err(|e| RuntimeError::InvocationFailed(e.to_string()))?;
171
172 let body = resp
173 .bytes()
174 .await
175 .map_err(|e| RuntimeError::InvocationFailed(e.to_string()))?;
176
177 Ok(body.to_vec())
178 }
179
180 async fn start_container(
181 &self,
182 func: &LambdaFunction,
183 zip_bytes: &[u8],
184 ) -> Result<WarmContainer, RuntimeError> {
185 let image = runtime_to_image(&func.runtime)
186 .ok_or_else(|| RuntimeError::UnsupportedRuntime(func.runtime.clone()))?;
187
188 let code_dir =
191 TempDir::new().map_err(|e| RuntimeError::ZipExtractionFailed(e.to_string()))?;
192 let zip_bytes = zip_bytes.to_vec();
193 let code_path = code_dir.path().to_path_buf();
194 tokio::task::spawn_blocking(move || extract_zip(&zip_bytes, &code_path))
195 .await
196 .map_err(|e| RuntimeError::ZipExtractionFailed(e.to_string()))??;
197
198 let mut cmd = tokio::process::Command::new(&self.cli);
200 cmd.arg("create")
201 .arg("-p")
202 .arg(":8080")
203 .arg("--label")
204 .arg(format!("fakecloud-lambda={}", func.function_name))
205 .arg("--label")
206 .arg(format!("fakecloud-instance={}", self.instance_id))
207 .arg("--add-host")
209 .arg(format!("host.docker.internal:{}", self.host_ip));
210
211 for (key, value) in &func.environment {
212 let transformed_value = value
214 .replace("http://127.0.0.1:", "http://host.docker.internal:")
215 .replace("https://127.0.0.1:", "https://host.docker.internal:")
216 .replace("http://localhost:", "http://host.docker.internal:")
217 .replace("https://localhost:", "https://host.docker.internal:");
218 cmd.arg("-e").arg(format!("{}={}", key, transformed_value));
219 }
220
221 cmd.arg("-e")
222 .arg(format!("AWS_LAMBDA_FUNCTION_TIMEOUT={}", func.timeout));
223
224 cmd.arg(&image).arg(&func.handler);
225
226 let output = cmd
227 .output()
228 .await
229 .map_err(|e| RuntimeError::ContainerStartFailed(e.to_string()))?;
230
231 if !output.status.success() {
232 let stderr = String::from_utf8_lossy(&output.stderr);
233 return Err(RuntimeError::ContainerStartFailed(stderr.to_string()));
234 }
235
236 let container_id = String::from_utf8_lossy(&output.stdout).trim().to_string();
237
238 let cp_result = tokio::process::Command::new(&self.cli)
240 .arg("cp")
241 .arg(format!("{}/.", code_dir.path().display()))
242 .arg(format!("{}:/var/task", container_id))
243 .output()
244 .await
245 .map_err(|e| RuntimeError::ContainerStartFailed(e.to_string()))?;
246
247 if !cp_result.status.success() {
248 let _ = self.remove_container(&container_id).await;
249 let stderr = String::from_utf8_lossy(&cp_result.stderr);
250 return Err(RuntimeError::ContainerStartFailed(format!(
251 "docker cp failed: {}",
252 stderr
253 )));
254 }
255
256 if func.runtime.starts_with("provided") {
258 let cp_runtime = tokio::process::Command::new(&self.cli)
259 .arg("cp")
260 .arg(format!("{}/.", code_dir.path().display()))
261 .arg(format!("{}:/var/runtime", container_id))
262 .output()
263 .await
264 .map_err(|e| RuntimeError::ContainerStartFailed(e.to_string()))?;
265
266 if !cp_runtime.status.success() {
267 let _ = self.remove_container(&container_id).await;
268 let stderr = String::from_utf8_lossy(&cp_runtime.stderr);
269 return Err(RuntimeError::ContainerStartFailed(format!(
270 "docker cp to /var/runtime failed: {}",
271 stderr
272 )));
273 }
274 }
275
276 let start_result = tokio::process::Command::new(&self.cli)
280 .args(["start", &container_id])
281 .output()
282 .await
283 .map_err(|e| RuntimeError::ContainerStartFailed(e.to_string()))?;
284
285 if !start_result.status.success() {
286 let _ = self.remove_container(&container_id).await;
287 let stderr = String::from_utf8_lossy(&start_result.stderr);
288 return Err(RuntimeError::ContainerStartFailed(format!(
289 "docker start failed: {}",
290 stderr
291 )));
292 }
293
294 let port_output = tokio::process::Command::new(&self.cli)
296 .args(["port", &container_id, "8080"])
297 .output()
298 .await
299 .map_err(|e| RuntimeError::ContainerStartFailed(e.to_string()))?;
300
301 let port_str = String::from_utf8_lossy(&port_output.stdout);
302 let port: u16 = port_str
303 .trim()
304 .rsplit(':')
305 .next()
306 .and_then(|p| p.parse().ok())
307 .ok_or_else(|| {
308 RuntimeError::ContainerStartFailed(format!(
309 "could not determine port from: {}",
310 port_str.trim()
311 ))
312 })?;
313
314 let mut ready = false;
316 for _ in 0..20 {
317 tokio::time::sleep(Duration::from_millis(500)).await;
318 if tokio::net::TcpStream::connect(format!("127.0.0.1:{}", port))
319 .await
320 .is_ok()
321 {
322 ready = true;
323 break;
324 }
325 }
326
327 if !ready {
328 let _ = self.remove_container(&container_id).await;
329 return Err(RuntimeError::ContainerStartFailed(
330 "container did not become ready within 10 seconds".to_string(),
331 ));
332 }
333
334 tracing::info!(
335 function = %func.function_name,
336 container_id = %container_id,
337 port = port,
338 runtime = %func.runtime,
339 "Lambda container started"
340 );
341
342 Ok(WarmContainer {
343 container_id,
344 host_port: port,
345 last_used: RwLock::new(Instant::now()),
346 code_sha256: func.code_sha256.clone(),
347 })
348 }
349
350 async fn remove_container(&self, container_id: &str) {
352 let _ = tokio::process::Command::new(&self.cli)
353 .args(["rm", "-f", container_id])
354 .output()
355 .await;
356 }
357
358 pub async fn stop_container(&self, function_name: &str) {
360 let container = self.containers.write().remove(function_name);
361 if let Some(container) = container {
362 tracing::info!(
363 function = %function_name,
364 container_id = %container.container_id,
365 "stopping Lambda container"
366 );
367 self.remove_container(&container.container_id).await;
368 }
369 }
370
371 pub async fn stop_all(&self) {
373 let containers: Vec<(String, String)> = {
374 let mut map = self.containers.write();
375 map.drain()
376 .map(|(name, c)| (name, c.container_id))
377 .collect()
378 };
379 for (name, container_id) in containers {
380 tracing::info!(
381 function = %name,
382 container_id = %container_id,
383 "stopping Lambda container (cleanup)"
384 );
385 self.remove_container(&container_id).await;
386 }
387 }
388
389 pub fn list_warm_containers(
391 &self,
392 lambda_state: &crate::state::SharedLambdaState,
393 ) -> Vec<serde_json::Value> {
394 let containers = self.containers.read();
395 let accounts = lambda_state.read();
396 containers
397 .iter()
398 .map(|(name, container)| {
399 let runtime = accounts
400 .iter()
401 .find_map(|(_, state)| state.functions.get(name).map(|f| f.runtime.clone()))
402 .unwrap_or_default();
403 let last_used = container.last_used.read();
404 let idle_secs = last_used.elapsed().as_secs();
405 serde_json::json!({
406 "functionName": name,
407 "runtime": runtime,
408 "containerId": container.container_id,
409 "lastUsedSecsAgo": idle_secs,
410 })
411 })
412 .collect()
413 }
414
415 pub async fn evict_container(&self, function_name: &str) -> bool {
418 let container = self.containers.write().remove(function_name);
419 if let Some(container) = container {
420 tracing::info!(
421 function = %function_name,
422 container_id = %container.container_id,
423 "evicting Lambda container via simulation API"
424 );
425 self.remove_container(&container.container_id).await;
426 true
427 } else {
428 false
429 }
430 }
431
432 pub async fn run_cleanup_loop(self: Arc<Self>, ttl: Duration) {
434 let mut interval = tokio::time::interval(Duration::from_secs(30));
435 loop {
436 interval.tick().await;
437 self.cleanup_idle(ttl).await;
438 }
439 }
440
441 async fn cleanup_idle(&self, ttl: Duration) {
442 let expired: Vec<String> = {
443 let containers = self.containers.read();
444 containers
445 .iter()
446 .filter(|(_, c)| c.last_used.read().elapsed() > ttl)
447 .map(|(name, _)| name.clone())
448 .collect()
449 };
450 for name in expired {
451 tracing::info!(function = %name, "stopping idle Lambda container");
452 self.stop_container(&name).await;
453 }
454 }
455}
456
/// Maps a Lambda runtime identifier to its `public.ecr.aws/lambda` image
/// reference, e.g. `python3.12` to `public.ecr.aws/lambda/python:3.12`.
///
/// Returns `None` for identifiers that are not in the table below.
pub fn runtime_to_image(runtime: &str) -> Option<String> {
    // (runtime identifier, image repository, image tag)
    const IMAGES: &[(&str, &str, &str)] = &[
        ("python3.13", "python", "3.13"),
        ("python3.12", "python", "3.12"),
        ("python3.11", "python", "3.11"),
        ("nodejs22.x", "nodejs", "22"),
        ("nodejs20.x", "nodejs", "20"),
        ("nodejs18.x", "nodejs", "18"),
        ("ruby3.4", "ruby", "3.4"),
        ("ruby3.3", "ruby", "3.3"),
        ("java21", "java", "21"),
        ("java17", "java", "17"),
        ("dotnet8", "dotnet", "8"),
        ("provided.al2023", "provided", "al2023"),
        ("provided.al2", "provided", "al2"),
    ];

    IMAGES
        .iter()
        .find(|(identifier, _, _)| *identifier == runtime)
        .map(|(_, repository, tag)| format!("public.ecr.aws/lambda/{}:{}", repository, tag))
}
477
478pub fn extract_zip(zip_bytes: &[u8], dest: &Path) -> Result<(), RuntimeError> {
480 let cursor = std::io::Cursor::new(zip_bytes);
481 let mut archive = zip::ZipArchive::new(cursor)
482 .map_err(|e| RuntimeError::ZipExtractionFailed(e.to_string()))?;
483
484 for i in 0..archive.len() {
485 let mut file = archive
486 .by_index(i)
487 .map_err(|e| RuntimeError::ZipExtractionFailed(e.to_string()))?;
488
489 let out_path = dest.join(file.enclosed_name().ok_or_else(|| {
490 RuntimeError::ZipExtractionFailed("invalid file name in ZIP".to_string())
491 })?);
492
493 if file.is_dir() {
494 std::fs::create_dir_all(&out_path)
495 .map_err(|e| RuntimeError::ZipExtractionFailed(e.to_string()))?;
496 } else {
497 if let Some(parent) = out_path.parent() {
498 std::fs::create_dir_all(parent)
499 .map_err(|e| RuntimeError::ZipExtractionFailed(e.to_string()))?;
500 }
501 let mut out_file = std::fs::File::create(&out_path)
502 .map_err(|e| RuntimeError::ZipExtractionFailed(e.to_string()))?;
503 std::io::copy(&mut file, &mut out_file)
504 .map_err(|e| RuntimeError::ZipExtractionFailed(e.to_string()))?;
505
506 #[cfg(unix)]
508 {
509 use std::os::unix::fs::PermissionsExt;
510 if let Some(mode) = file.unix_mode() {
511 std::fs::set_permissions(&out_path, std::fs::Permissions::from_mode(mode))
512 .map_err(|e| RuntimeError::ZipExtractionFailed(e.to_string()))?;
513 }
514 }
515 }
516 }
517 Ok(())
518}
519
520fn detect_bridge_gateway(cli: &str) -> Option<String> {
523 let output = std::process::Command::new(cli)
524 .args([
525 "network",
526 "inspect",
527 "bridge",
528 "--format",
529 "{{range .IPAM.Config}}{{.Gateway}}{{end}}",
530 ])
531 .output()
532 .ok()?;
533
534 if output.status.success() {
535 let gateway = String::from_utf8_lossy(&output.stdout).trim().to_string();
536 if !gateway.is_empty() && gateway.contains('.') {
537 tracing::info!(
538 gateway = %gateway,
539 "Detected Docker bridge gateway for Lambda containers"
540 );
541 return Some(gateway);
542 }
543 }
544 None
545}
546
/// Reports whether `<name> info` can be run and exits successfully.
///
/// The command's output is discarded. A program that cannot be spawned
/// counts as unavailable.
fn is_cli_available(name: &str) -> bool {
    let mut probe = std::process::Command::new(name);
    probe
        .arg("info")
        .stdout(std::process::Stdio::null())
        .stderr(std::process::Stdio::null());

    match probe.status() {
        Ok(status) => status.success(),
        Err(_) => false,
    }
}
556
#[cfg(test)]
mod tests {
    use std::io::Write;

    use super::*;

    /// Known runtimes map to their image; an unknown one maps to `None`.
    #[test]
    fn test_runtime_to_image() {
        let cases: [(&str, Option<&str>); 7] = [
            ("python3.12", Some("public.ecr.aws/lambda/python:3.12")),
            ("nodejs20.x", Some("public.ecr.aws/lambda/nodejs:20")),
            (
                "provided.al2023",
                Some("public.ecr.aws/lambda/provided:al2023"),
            ),
            ("ruby3.4", Some("public.ecr.aws/lambda/ruby:3.4")),
            ("java21", Some("public.ecr.aws/lambda/java:21")),
            ("dotnet8", Some("public.ecr.aws/lambda/dotnet:8")),
            ("unknown", None),
        ];

        for &(runtime, expected) in cases.iter() {
            assert_eq!(runtime_to_image(runtime), expected.map(str::to_string));
        }
    }

    /// A single-file archive is unpacked with its content intact.
    #[test]
    fn test_extract_zip() {
        // Build a one-entry archive in memory.
        let mut writer = zip::ZipWriter::new(std::io::Cursor::new(Vec::new()));
        writer
            .start_file("handler.py", zip::write::SimpleFileOptions::default())
            .unwrap();
        writer
            .write_all(b"def handler(event, context):\n return {'statusCode': 200}\n")
            .unwrap();
        let zip_bytes = writer.finish().unwrap().into_inner();

        let dir = TempDir::new().unwrap();
        extract_zip(&zip_bytes, dir.path()).unwrap();

        let handler_path = dir.path().join("handler.py");
        assert!(handler_path.exists());

        let content = std::fs::read_to_string(&handler_path).unwrap();
        assert!(content.contains("def handler"));
    }
}