1use std::collections::HashMap;
2use std::path::Path;
3use std::sync::Arc;
4use std::time::{Duration, Instant};
5
6use parking_lot::RwLock;
7use tempfile::TempDir;
8
9use crate::state::LambdaFunction;
10
11struct WarmContainer {
13 container_id: String,
14 host_port: u16,
15 last_used: RwLock<Instant>,
16 code_sha256: String,
17}
18
19pub struct ContainerRuntime {
21 cli: String,
22 containers: RwLock<HashMap<String, WarmContainer>>,
23 starting: RwLock<HashMap<String, Arc<tokio::sync::Mutex<()>>>>,
25 instance_id: String,
26 host_ip: String,
28}
29
30#[derive(Debug, thiserror::Error)]
31pub enum RuntimeError {
32 #[error("no code ZIP provided for function {0}")]
33 NoCodeZip(String),
34 #[error("unsupported runtime: {0}")]
35 UnsupportedRuntime(String),
36 #[error("container failed to start: {0}")]
37 ContainerStartFailed(String),
38 #[error("invocation failed: {0}")]
39 InvocationFailed(String),
40 #[error("ZIP extraction failed: {0}")]
41 ZipExtractionFailed(String),
42}
43
44impl ContainerRuntime {
45 pub fn new() -> Option<Self> {
48 let cli = if let Ok(cli) = std::env::var("FAKECLOUD_CONTAINER_CLI") {
49 if std::process::Command::new(&cli)
51 .arg("info")
52 .stdout(std::process::Stdio::null())
53 .stderr(std::process::Stdio::null())
54 .status()
55 .map(|s| s.success())
56 .unwrap_or(false)
57 {
58 cli
59 } else {
60 return None;
61 }
62 } else if is_cli_available("docker") {
63 "docker".to_string()
64 } else if is_cli_available("podman") {
65 "podman".to_string()
66 } else {
67 return None;
68 };
69
70 let instance_id = format!("fakecloud-{}", std::process::id());
71
72 let host_ip = if cfg!(target_os = "linux") {
76 detect_bridge_gateway(&cli).unwrap_or_else(|| "172.17.0.1".to_string())
77 } else {
78 "host-gateway".to_string()
79 };
80
81 Some(Self {
82 cli,
83 containers: RwLock::new(HashMap::new()),
84 starting: RwLock::new(HashMap::new()),
85 instance_id,
86 host_ip,
87 })
88 }
89
90 pub fn cli_name(&self) -> &str {
91 &self.cli
92 }
93
94 pub async fn invoke(
96 &self,
97 func: &LambdaFunction,
98 payload: &[u8],
99 ) -> Result<Vec<u8>, RuntimeError> {
100 let zip_bytes = func
101 .code_zip
102 .as_ref()
103 .ok_or_else(|| RuntimeError::NoCodeZip(func.function_name.clone()))?;
104
105 let port = {
107 let containers = self.containers.read();
108 if let Some(container) = containers.get(&func.function_name) {
109 if container.code_sha256 == func.code_sha256 {
110 *container.last_used.write() = Instant::now();
111 Some(container.host_port)
112 } else {
113 None
114 }
115 } else {
116 None
117 }
118 };
119
120 let port = match port {
121 Some(p) => p,
122 None => {
123 let startup_lock = {
125 let mut starting = self.starting.write();
126 starting
127 .entry(func.function_name.clone())
128 .or_insert_with(|| Arc::new(tokio::sync::Mutex::new(())))
129 .clone()
130 };
131 let _guard = startup_lock.lock().await;
132
133 let existing_port = {
135 let containers = self.containers.read();
136 containers
137 .get(&func.function_name)
138 .filter(|c| c.code_sha256 == func.code_sha256)
139 .map(|c| {
140 *c.last_used.write() = Instant::now();
141 c.host_port
142 })
143 };
144 if let Some(p) = existing_port {
145 p
146 } else {
147 self.stop_container(&func.function_name).await;
148 let container = self.start_container(func, zip_bytes).await?;
149 let p = container.host_port;
150 self.containers
151 .write()
152 .insert(func.function_name.clone(), container);
153 p
154 }
155 }
156 };
157
158 let url = format!(
160 "http://localhost:{}/2015-03-31/functions/function/invocations",
161 port
162 );
163 let client = reqwest::Client::new();
164 let resp = client
165 .post(&url)
166 .body(payload.to_vec())
167 .timeout(Duration::from_secs(func.timeout as u64 + 5))
168 .send()
169 .await
170 .map_err(|e| RuntimeError::InvocationFailed(e.to_string()))?;
171
172 let body = resp
173 .bytes()
174 .await
175 .map_err(|e| RuntimeError::InvocationFailed(e.to_string()))?;
176
177 Ok(body.to_vec())
178 }
179
180 async fn start_container(
181 &self,
182 func: &LambdaFunction,
183 zip_bytes: &[u8],
184 ) -> Result<WarmContainer, RuntimeError> {
185 let image = runtime_to_image(&func.runtime)
186 .ok_or_else(|| RuntimeError::UnsupportedRuntime(func.runtime.clone()))?;
187
188 let code_dir =
190 TempDir::new().map_err(|e| RuntimeError::ZipExtractionFailed(e.to_string()))?;
191 extract_zip(zip_bytes, code_dir.path())?;
192
193 let mut cmd = tokio::process::Command::new(&self.cli);
195 cmd.arg("create")
196 .arg("-p")
197 .arg(":8080")
198 .arg("--label")
199 .arg(format!("fakecloud-lambda={}", func.function_name))
200 .arg("--label")
201 .arg(format!("fakecloud-instance={}", self.instance_id))
202 .arg("--add-host")
204 .arg(format!("host.docker.internal:{}", self.host_ip));
205
206 for (key, value) in &func.environment {
207 let transformed_value = value
209 .replace("http://127.0.0.1:", "http://host.docker.internal:")
210 .replace("https://127.0.0.1:", "https://host.docker.internal:")
211 .replace("http://localhost:", "http://host.docker.internal:")
212 .replace("https://localhost:", "https://host.docker.internal:");
213 cmd.arg("-e").arg(format!("{}={}", key, transformed_value));
214 }
215
216 cmd.arg("-e")
217 .arg(format!("AWS_LAMBDA_FUNCTION_TIMEOUT={}", func.timeout));
218
219 cmd.arg(&image).arg(&func.handler);
220
221 let output = cmd
222 .output()
223 .await
224 .map_err(|e| RuntimeError::ContainerStartFailed(e.to_string()))?;
225
226 if !output.status.success() {
227 let stderr = String::from_utf8_lossy(&output.stderr);
228 return Err(RuntimeError::ContainerStartFailed(stderr.to_string()));
229 }
230
231 let container_id = String::from_utf8_lossy(&output.stdout).trim().to_string();
232
233 let cp_result = tokio::process::Command::new(&self.cli)
235 .arg("cp")
236 .arg(format!("{}/.", code_dir.path().display()))
237 .arg(format!("{}:/var/task", container_id))
238 .output()
239 .await
240 .map_err(|e| RuntimeError::ContainerStartFailed(e.to_string()))?;
241
242 if !cp_result.status.success() {
243 let _ = self.remove_container(&container_id).await;
244 let stderr = String::from_utf8_lossy(&cp_result.stderr);
245 return Err(RuntimeError::ContainerStartFailed(format!(
246 "docker cp failed: {}",
247 stderr
248 )));
249 }
250
251 if func.runtime.starts_with("provided") {
253 let cp_runtime = tokio::process::Command::new(&self.cli)
254 .arg("cp")
255 .arg(format!("{}/.", code_dir.path().display()))
256 .arg(format!("{}:/var/runtime", container_id))
257 .output()
258 .await
259 .map_err(|e| RuntimeError::ContainerStartFailed(e.to_string()))?;
260
261 if !cp_runtime.status.success() {
262 let _ = self.remove_container(&container_id).await;
263 let stderr = String::from_utf8_lossy(&cp_runtime.stderr);
264 return Err(RuntimeError::ContainerStartFailed(format!(
265 "docker cp to /var/runtime failed: {}",
266 stderr
267 )));
268 }
269 }
270
271 let start_result = tokio::process::Command::new(&self.cli)
275 .args(["start", &container_id])
276 .output()
277 .await
278 .map_err(|e| RuntimeError::ContainerStartFailed(e.to_string()))?;
279
280 if !start_result.status.success() {
281 let _ = self.remove_container(&container_id).await;
282 let stderr = String::from_utf8_lossy(&start_result.stderr);
283 return Err(RuntimeError::ContainerStartFailed(format!(
284 "docker start failed: {}",
285 stderr
286 )));
287 }
288
289 let port_output = tokio::process::Command::new(&self.cli)
291 .args(["port", &container_id, "8080"])
292 .output()
293 .await
294 .map_err(|e| RuntimeError::ContainerStartFailed(e.to_string()))?;
295
296 let port_str = String::from_utf8_lossy(&port_output.stdout);
297 let port: u16 = port_str
298 .trim()
299 .rsplit(':')
300 .next()
301 .and_then(|p| p.parse().ok())
302 .ok_or_else(|| {
303 RuntimeError::ContainerStartFailed(format!(
304 "could not determine port from: {}",
305 port_str.trim()
306 ))
307 })?;
308
309 let mut ready = false;
311 for _ in 0..20 {
312 tokio::time::sleep(Duration::from_millis(500)).await;
313 if tokio::net::TcpStream::connect(format!("127.0.0.1:{}", port))
314 .await
315 .is_ok()
316 {
317 ready = true;
318 break;
319 }
320 }
321
322 if !ready {
323 let _ = self.remove_container(&container_id).await;
324 return Err(RuntimeError::ContainerStartFailed(
325 "container did not become ready within 10 seconds".to_string(),
326 ));
327 }
328
329 tracing::info!(
330 function = %func.function_name,
331 container_id = %container_id,
332 port = port,
333 runtime = %func.runtime,
334 "Lambda container started"
335 );
336
337 Ok(WarmContainer {
338 container_id,
339 host_port: port,
340 last_used: RwLock::new(Instant::now()),
341 code_sha256: func.code_sha256.clone(),
342 })
343 }
344
345 async fn remove_container(&self, container_id: &str) {
347 let _ = tokio::process::Command::new(&self.cli)
348 .args(["rm", "-f", container_id])
349 .output()
350 .await;
351 }
352
353 pub async fn stop_container(&self, function_name: &str) {
355 let container = self.containers.write().remove(function_name);
356 if let Some(container) = container {
357 tracing::info!(
358 function = %function_name,
359 container_id = %container.container_id,
360 "stopping Lambda container"
361 );
362 self.remove_container(&container.container_id).await;
363 }
364 }
365
366 pub async fn stop_all(&self) {
368 let containers: Vec<(String, String)> = {
369 let mut map = self.containers.write();
370 map.drain()
371 .map(|(name, c)| (name, c.container_id))
372 .collect()
373 };
374 for (name, container_id) in containers {
375 tracing::info!(
376 function = %name,
377 container_id = %container_id,
378 "stopping Lambda container (cleanup)"
379 );
380 self.remove_container(&container_id).await;
381 }
382 }
383
384 pub fn list_warm_containers(
386 &self,
387 lambda_state: &crate::state::SharedLambdaState,
388 ) -> Vec<serde_json::Value> {
389 let containers = self.containers.read();
390 let state = lambda_state.read();
391 containers
392 .iter()
393 .map(|(name, container)| {
394 let runtime = state
395 .functions
396 .get(name)
397 .map(|f| f.runtime.clone())
398 .unwrap_or_default();
399 let last_used = container.last_used.read();
400 let idle_secs = last_used.elapsed().as_secs();
401 serde_json::json!({
402 "functionName": name,
403 "runtime": runtime,
404 "containerId": container.container_id,
405 "lastUsedSecsAgo": idle_secs,
406 })
407 })
408 .collect()
409 }
410
411 pub async fn evict_container(&self, function_name: &str) -> bool {
414 let container = self.containers.write().remove(function_name);
415 if let Some(container) = container {
416 tracing::info!(
417 function = %function_name,
418 container_id = %container.container_id,
419 "evicting Lambda container via simulation API"
420 );
421 self.remove_container(&container.container_id).await;
422 true
423 } else {
424 false
425 }
426 }
427
428 pub async fn run_cleanup_loop(self: Arc<Self>, ttl: Duration) {
430 let mut interval = tokio::time::interval(Duration::from_secs(30));
431 loop {
432 interval.tick().await;
433 self.cleanup_idle(ttl).await;
434 }
435 }
436
437 async fn cleanup_idle(&self, ttl: Duration) {
438 let expired: Vec<String> = {
439 let containers = self.containers.read();
440 containers
441 .iter()
442 .filter(|(_, c)| c.last_used.read().elapsed() > ttl)
443 .map(|(name, _)| name.clone())
444 .collect()
445 };
446 for name in expired {
447 tracing::info!(function = %name, "stopping idle Lambda container");
448 self.stop_container(&name).await;
449 }
450 }
451}
452
/// Maps a Lambda runtime identifier to its `public.ecr.aws/lambda` base image.
///
/// Returns `None` for any identifier that is not in the table below.
pub fn runtime_to_image(runtime: &str) -> Option<String> {
    // (runtime identifier, image repository, image tag)
    const IMAGES: &[(&str, &str, &str)] = &[
        ("python3.13", "python", "3.13"),
        ("python3.12", "python", "3.12"),
        ("python3.11", "python", "3.11"),
        ("nodejs22.x", "nodejs", "22"),
        ("nodejs20.x", "nodejs", "20"),
        ("nodejs18.x", "nodejs", "18"),
        ("ruby3.4", "ruby", "3.4"),
        ("ruby3.3", "ruby", "3.3"),
        ("java21", "java", "21"),
        ("java17", "java", "17"),
        ("dotnet8", "dotnet", "8"),
        ("provided.al2023", "provided", "al2023"),
        ("provided.al2", "provided", "al2"),
    ];

    IMAGES
        .iter()
        .find(|(name, _, _)| *name == runtime)
        .map(|(_, base, tag)| format!("public.ecr.aws/lambda/{}:{}", base, tag))
}
473
474pub fn extract_zip(zip_bytes: &[u8], dest: &Path) -> Result<(), RuntimeError> {
476 let cursor = std::io::Cursor::new(zip_bytes);
477 let mut archive = zip::ZipArchive::new(cursor)
478 .map_err(|e| RuntimeError::ZipExtractionFailed(e.to_string()))?;
479
480 for i in 0..archive.len() {
481 let mut file = archive
482 .by_index(i)
483 .map_err(|e| RuntimeError::ZipExtractionFailed(e.to_string()))?;
484
485 let out_path = dest.join(file.enclosed_name().ok_or_else(|| {
486 RuntimeError::ZipExtractionFailed("invalid file name in ZIP".to_string())
487 })?);
488
489 if file.is_dir() {
490 std::fs::create_dir_all(&out_path)
491 .map_err(|e| RuntimeError::ZipExtractionFailed(e.to_string()))?;
492 } else {
493 if let Some(parent) = out_path.parent() {
494 std::fs::create_dir_all(parent)
495 .map_err(|e| RuntimeError::ZipExtractionFailed(e.to_string()))?;
496 }
497 let mut out_file = std::fs::File::create(&out_path)
498 .map_err(|e| RuntimeError::ZipExtractionFailed(e.to_string()))?;
499 std::io::copy(&mut file, &mut out_file)
500 .map_err(|e| RuntimeError::ZipExtractionFailed(e.to_string()))?;
501
502 #[cfg(unix)]
504 {
505 use std::os::unix::fs::PermissionsExt;
506 if let Some(mode) = file.unix_mode() {
507 std::fs::set_permissions(&out_path, std::fs::Permissions::from_mode(mode))
508 .map_err(|e| RuntimeError::ZipExtractionFailed(e.to_string()))?;
509 }
510 }
511 }
512 }
513 Ok(())
514}
515
516fn detect_bridge_gateway(cli: &str) -> Option<String> {
519 let output = std::process::Command::new(cli)
520 .args([
521 "network",
522 "inspect",
523 "bridge",
524 "--format",
525 "{{range .IPAM.Config}}{{.Gateway}}{{end}}",
526 ])
527 .output()
528 .ok()?;
529
530 if output.status.success() {
531 let gateway = String::from_utf8_lossy(&output.stdout).trim().to_string();
532 if !gateway.is_empty() && gateway.contains('.') {
533 tracing::info!(
534 gateway = %gateway,
535 "Detected Docker bridge gateway for Lambda containers"
536 );
537 return Some(gateway);
538 }
539 }
540 None
541}
542
/// Reports whether `<name> info` can be run and exits successfully.
///
/// The command's stdout and stderr are discarded. A binary that cannot be
/// spawned counts as unavailable.
fn is_cli_available(name: &str) -> bool {
    use std::process::{Command, Stdio};

    let mut probe = Command::new(name);
    probe.arg("info").stdout(Stdio::null()).stderr(Stdio::null());

    match probe.status() {
        Ok(exit) => exit.success(),
        Err(_) => false,
    }
}
552
#[cfg(test)]
mod tests {
    use std::io::Write;

    use super::*;

    /// Known runtimes map to their ECR image; anything else maps to `None`.
    #[test]
    fn test_runtime_to_image() {
        let supported = [
            ("python3.12", "public.ecr.aws/lambda/python:3.12"),
            ("nodejs20.x", "public.ecr.aws/lambda/nodejs:20"),
            ("provided.al2023", "public.ecr.aws/lambda/provided:al2023"),
            ("ruby3.4", "public.ecr.aws/lambda/ruby:3.4"),
            ("java21", "public.ecr.aws/lambda/java:21"),
            ("dotnet8", "public.ecr.aws/lambda/dotnet:8"),
        ];
        for (runtime, image) in supported.iter() {
            assert_eq!(runtime_to_image(runtime), Some(image.to_string()));
        }
        assert_eq!(runtime_to_image("unknown"), None);
    }

    /// A single-file archive built in memory is unpacked with its contents intact.
    #[test]
    fn test_extract_zip() {
        // Build the archive in memory.
        let mut writer = zip::ZipWriter::new(std::io::Cursor::new(Vec::<u8>::new()));
        writer
            .start_file("handler.py", zip::write::SimpleFileOptions::default())
            .unwrap();
        writer
            .write_all(b"def handler(event, context):\n return {'statusCode': 200}\n")
            .unwrap();
        let zip_bytes = writer.finish().unwrap().into_inner();

        let dir = TempDir::new().unwrap();
        extract_zip(&zip_bytes, dir.path()).unwrap();

        let handler_path = dir.path().join("handler.py");
        assert!(handler_path.exists());

        let content = std::fs::read_to_string(&handler_path).unwrap();
        assert!(content.contains("def handler"));
    }
}
615}