1use std::collections::HashMap;
2use std::path::Path;
3use std::sync::Arc;
4use std::time::{Duration, Instant};
5
6use parking_lot::RwLock;
7use tempfile::TempDir;
8
9use crate::state::LambdaFunction;
10
11struct WarmContainer {
13 container_id: String,
14 host_port: u16,
15 last_used: RwLock<Instant>,
16 code_sha256: String,
17}
18
19pub struct ContainerRuntime {
21 cli: String,
22 containers: RwLock<HashMap<String, WarmContainer>>,
23 starting: RwLock<HashMap<String, Arc<tokio::sync::Mutex<()>>>>,
25 instance_id: String,
26}
27
28#[derive(Debug, thiserror::Error)]
29pub enum RuntimeError {
30 #[error("no code ZIP provided for function {0}")]
31 NoCodeZip(String),
32 #[error("unsupported runtime: {0}")]
33 UnsupportedRuntime(String),
34 #[error("container failed to start: {0}")]
35 ContainerStartFailed(String),
36 #[error("invocation failed: {0}")]
37 InvocationFailed(String),
38 #[error("ZIP extraction failed: {0}")]
39 ZipExtractionFailed(String),
40}
41
42impl ContainerRuntime {
43 pub fn new() -> Option<Self> {
46 let cli = if let Ok(cli) = std::env::var("FAKECLOUD_CONTAINER_CLI") {
47 if std::process::Command::new(&cli)
49 .arg("info")
50 .stdout(std::process::Stdio::null())
51 .stderr(std::process::Stdio::null())
52 .status()
53 .map(|s| s.success())
54 .unwrap_or(false)
55 {
56 cli
57 } else {
58 return None;
59 }
60 } else if is_cli_available("docker") {
61 "docker".to_string()
62 } else if is_cli_available("podman") {
63 "podman".to_string()
64 } else {
65 return None;
66 };
67
68 let instance_id = format!("fakecloud-{}", std::process::id());
69
70 Some(Self {
71 cli,
72 containers: RwLock::new(HashMap::new()),
73 starting: RwLock::new(HashMap::new()),
74 instance_id,
75 })
76 }
77
78 pub fn cli_name(&self) -> &str {
79 &self.cli
80 }
81
82 pub async fn invoke(
84 &self,
85 func: &LambdaFunction,
86 payload: &[u8],
87 ) -> Result<Vec<u8>, RuntimeError> {
88 let zip_bytes = func
89 .code_zip
90 .as_ref()
91 .ok_or_else(|| RuntimeError::NoCodeZip(func.function_name.clone()))?;
92
93 let port = {
95 let containers = self.containers.read();
96 if let Some(container) = containers.get(&func.function_name) {
97 if container.code_sha256 == func.code_sha256 {
98 *container.last_used.write() = Instant::now();
99 Some(container.host_port)
100 } else {
101 None
102 }
103 } else {
104 None
105 }
106 };
107
108 let port = match port {
109 Some(p) => p,
110 None => {
111 let startup_lock = {
113 let mut starting = self.starting.write();
114 starting
115 .entry(func.function_name.clone())
116 .or_insert_with(|| Arc::new(tokio::sync::Mutex::new(())))
117 .clone()
118 };
119 let _guard = startup_lock.lock().await;
120
121 let existing_port = {
123 let containers = self.containers.read();
124 containers
125 .get(&func.function_name)
126 .filter(|c| c.code_sha256 == func.code_sha256)
127 .map(|c| {
128 *c.last_used.write() = Instant::now();
129 c.host_port
130 })
131 };
132 if let Some(p) = existing_port {
133 p
134 } else {
135 self.stop_container(&func.function_name).await;
136 let container = self.start_container(func, zip_bytes).await?;
137 let p = container.host_port;
138 self.containers
139 .write()
140 .insert(func.function_name.clone(), container);
141 p
142 }
143 }
144 };
145
146 let url = format!(
148 "http://localhost:{}/2015-03-31/functions/function/invocations",
149 port
150 );
151 let client = reqwest::Client::new();
152 let resp = client
153 .post(&url)
154 .body(payload.to_vec())
155 .timeout(Duration::from_secs(func.timeout as u64 + 5))
156 .send()
157 .await
158 .map_err(|e| RuntimeError::InvocationFailed(e.to_string()))?;
159
160 let body = resp
161 .bytes()
162 .await
163 .map_err(|e| RuntimeError::InvocationFailed(e.to_string()))?;
164
165 Ok(body.to_vec())
166 }
167
168 async fn start_container(
169 &self,
170 func: &LambdaFunction,
171 zip_bytes: &[u8],
172 ) -> Result<WarmContainer, RuntimeError> {
173 let image = runtime_to_image(&func.runtime)
174 .ok_or_else(|| RuntimeError::UnsupportedRuntime(func.runtime.clone()))?;
175
176 let code_dir =
178 TempDir::new().map_err(|e| RuntimeError::ZipExtractionFailed(e.to_string()))?;
179 extract_zip(zip_bytes, code_dir.path())?;
180
181 let mut cmd = tokio::process::Command::new(&self.cli);
183 cmd.arg("create")
184 .arg("-p")
185 .arg(":8080")
186 .arg("--label")
187 .arg(format!("fakecloud-lambda={}", func.function_name))
188 .arg("--label")
189 .arg(format!("fakecloud-instance={}", self.instance_id));
190
191 for (key, value) in &func.environment {
192 cmd.arg("-e").arg(format!("{}={}", key, value));
193 }
194
195 cmd.arg("-e")
196 .arg(format!("AWS_LAMBDA_FUNCTION_TIMEOUT={}", func.timeout));
197
198 cmd.arg(&image).arg(&func.handler);
199
200 let output = cmd
201 .output()
202 .await
203 .map_err(|e| RuntimeError::ContainerStartFailed(e.to_string()))?;
204
205 if !output.status.success() {
206 let stderr = String::from_utf8_lossy(&output.stderr);
207 return Err(RuntimeError::ContainerStartFailed(stderr.to_string()));
208 }
209
210 let container_id = String::from_utf8_lossy(&output.stdout).trim().to_string();
211
212 let cp_result = tokio::process::Command::new(&self.cli)
214 .arg("cp")
215 .arg(format!("{}/.", code_dir.path().display()))
216 .arg(format!("{}:/var/task", container_id))
217 .output()
218 .await
219 .map_err(|e| RuntimeError::ContainerStartFailed(e.to_string()))?;
220
221 if !cp_result.status.success() {
222 let _ = self.remove_container(&container_id).await;
223 let stderr = String::from_utf8_lossy(&cp_result.stderr);
224 return Err(RuntimeError::ContainerStartFailed(format!(
225 "docker cp failed: {}",
226 stderr
227 )));
228 }
229
230 if func.runtime.starts_with("provided") {
232 let cp_runtime = tokio::process::Command::new(&self.cli)
233 .arg("cp")
234 .arg(format!("{}/.", code_dir.path().display()))
235 .arg(format!("{}:/var/runtime", container_id))
236 .output()
237 .await
238 .map_err(|e| RuntimeError::ContainerStartFailed(e.to_string()))?;
239
240 if !cp_runtime.status.success() {
241 let _ = self.remove_container(&container_id).await;
242 let stderr = String::from_utf8_lossy(&cp_runtime.stderr);
243 return Err(RuntimeError::ContainerStartFailed(format!(
244 "docker cp to /var/runtime failed: {}",
245 stderr
246 )));
247 }
248 }
249
250 let start_result = tokio::process::Command::new(&self.cli)
254 .args(["start", &container_id])
255 .output()
256 .await
257 .map_err(|e| RuntimeError::ContainerStartFailed(e.to_string()))?;
258
259 if !start_result.status.success() {
260 let _ = self.remove_container(&container_id).await;
261 let stderr = String::from_utf8_lossy(&start_result.stderr);
262 return Err(RuntimeError::ContainerStartFailed(format!(
263 "docker start failed: {}",
264 stderr
265 )));
266 }
267
268 let port_output = tokio::process::Command::new(&self.cli)
270 .args(["port", &container_id, "8080"])
271 .output()
272 .await
273 .map_err(|e| RuntimeError::ContainerStartFailed(e.to_string()))?;
274
275 let port_str = String::from_utf8_lossy(&port_output.stdout);
276 let port: u16 = port_str
277 .trim()
278 .rsplit(':')
279 .next()
280 .and_then(|p| p.parse().ok())
281 .ok_or_else(|| {
282 RuntimeError::ContainerStartFailed(format!(
283 "could not determine port from: {}",
284 port_str.trim()
285 ))
286 })?;
287
288 let mut ready = false;
290 for _ in 0..20 {
291 tokio::time::sleep(Duration::from_millis(500)).await;
292 if tokio::net::TcpStream::connect(format!("127.0.0.1:{}", port))
293 .await
294 .is_ok()
295 {
296 ready = true;
297 break;
298 }
299 }
300
301 if !ready {
302 let _ = self.remove_container(&container_id).await;
303 return Err(RuntimeError::ContainerStartFailed(
304 "container did not become ready within 10 seconds".to_string(),
305 ));
306 }
307
308 tracing::info!(
309 function = %func.function_name,
310 container_id = %container_id,
311 port = port,
312 runtime = %func.runtime,
313 "Lambda container started"
314 );
315
316 Ok(WarmContainer {
317 container_id,
318 host_port: port,
319 last_used: RwLock::new(Instant::now()),
320 code_sha256: func.code_sha256.clone(),
321 })
322 }
323
324 async fn remove_container(&self, container_id: &str) {
326 let _ = tokio::process::Command::new(&self.cli)
327 .args(["rm", "-f", container_id])
328 .output()
329 .await;
330 }
331
332 pub async fn stop_container(&self, function_name: &str) {
334 let container = self.containers.write().remove(function_name);
335 if let Some(container) = container {
336 tracing::info!(
337 function = %function_name,
338 container_id = %container.container_id,
339 "stopping Lambda container"
340 );
341 self.remove_container(&container.container_id).await;
342 }
343 }
344
345 pub async fn stop_all(&self) {
347 let containers: Vec<(String, String)> = {
348 let mut map = self.containers.write();
349 map.drain()
350 .map(|(name, c)| (name, c.container_id))
351 .collect()
352 };
353 for (name, container_id) in containers {
354 tracing::info!(
355 function = %name,
356 container_id = %container_id,
357 "stopping Lambda container (cleanup)"
358 );
359 self.remove_container(&container_id).await;
360 }
361 }
362
363 pub fn list_warm_containers(
365 &self,
366 lambda_state: &crate::state::SharedLambdaState,
367 ) -> Vec<serde_json::Value> {
368 let containers = self.containers.read();
369 let state = lambda_state.read();
370 containers
371 .iter()
372 .map(|(name, container)| {
373 let runtime = state
374 .functions
375 .get(name)
376 .map(|f| f.runtime.clone())
377 .unwrap_or_default();
378 let last_used = container.last_used.read();
379 let idle_secs = last_used.elapsed().as_secs();
380 serde_json::json!({
381 "functionName": name,
382 "runtime": runtime,
383 "containerId": container.container_id,
384 "lastUsedSecsAgo": idle_secs,
385 })
386 })
387 .collect()
388 }
389
390 pub async fn evict_container(&self, function_name: &str) -> bool {
393 let container = self.containers.write().remove(function_name);
394 if let Some(container) = container {
395 tracing::info!(
396 function = %function_name,
397 container_id = %container.container_id,
398 "evicting Lambda container via simulation API"
399 );
400 self.remove_container(&container.container_id).await;
401 true
402 } else {
403 false
404 }
405 }
406
407 pub async fn run_cleanup_loop(self: Arc<Self>, ttl: Duration) {
409 let mut interval = tokio::time::interval(Duration::from_secs(30));
410 loop {
411 interval.tick().await;
412 self.cleanup_idle(ttl).await;
413 }
414 }
415
416 async fn cleanup_idle(&self, ttl: Duration) {
417 let expired: Vec<String> = {
418 let containers = self.containers.read();
419 containers
420 .iter()
421 .filter(|(_, c)| c.last_used.read().elapsed() > ttl)
422 .map(|(name, _)| name.clone())
423 .collect()
424 };
425 for name in expired {
426 tracing::info!(function = %name, "stopping idle Lambda container");
427 self.stop_container(&name).await;
428 }
429 }
430}
431
/// Maps a Lambda runtime identifier to its `public.ecr.aws/lambda/...`
/// image reference.
///
/// Returns `None` for identifiers that are not in the table below.
pub fn runtime_to_image(runtime: &str) -> Option<String> {
    // (runtime identifier, "<repository>:<tag>" under public.ecr.aws/lambda)
    const IMAGES: &[(&str, &str)] = &[
        ("python3.13", "python:3.13"),
        ("python3.12", "python:3.12"),
        ("python3.11", "python:3.11"),
        ("nodejs22.x", "nodejs:22"),
        ("nodejs20.x", "nodejs:20"),
        ("nodejs18.x", "nodejs:18"),
        ("ruby3.4", "ruby:3.4"),
        ("ruby3.3", "ruby:3.3"),
        ("java21", "java:21"),
        ("java17", "java:17"),
        ("dotnet8", "dotnet:8"),
        ("provided.al2023", "provided:al2023"),
        ("provided.al2", "provided:al2"),
    ];

    IMAGES
        .iter()
        .find(|(identifier, _)| *identifier == runtime)
        .map(|(_, image)| format!("public.ecr.aws/lambda/{}", image))
}
452
453pub fn extract_zip(zip_bytes: &[u8], dest: &Path) -> Result<(), RuntimeError> {
455 let cursor = std::io::Cursor::new(zip_bytes);
456 let mut archive = zip::ZipArchive::new(cursor)
457 .map_err(|e| RuntimeError::ZipExtractionFailed(e.to_string()))?;
458
459 for i in 0..archive.len() {
460 let mut file = archive
461 .by_index(i)
462 .map_err(|e| RuntimeError::ZipExtractionFailed(e.to_string()))?;
463
464 let out_path = dest.join(file.enclosed_name().ok_or_else(|| {
465 RuntimeError::ZipExtractionFailed("invalid file name in ZIP".to_string())
466 })?);
467
468 if file.is_dir() {
469 std::fs::create_dir_all(&out_path)
470 .map_err(|e| RuntimeError::ZipExtractionFailed(e.to_string()))?;
471 } else {
472 if let Some(parent) = out_path.parent() {
473 std::fs::create_dir_all(parent)
474 .map_err(|e| RuntimeError::ZipExtractionFailed(e.to_string()))?;
475 }
476 let mut out_file = std::fs::File::create(&out_path)
477 .map_err(|e| RuntimeError::ZipExtractionFailed(e.to_string()))?;
478 std::io::copy(&mut file, &mut out_file)
479 .map_err(|e| RuntimeError::ZipExtractionFailed(e.to_string()))?;
480
481 #[cfg(unix)]
483 {
484 use std::os::unix::fs::PermissionsExt;
485 if let Some(mode) = file.unix_mode() {
486 std::fs::set_permissions(&out_path, std::fs::Permissions::from_mode(mode))
487 .map_err(|e| RuntimeError::ZipExtractionFailed(e.to_string()))?;
488 }
489 }
490 }
491 }
492 Ok(())
493}
494
/// Reports whether `<name> info` can be executed and exits successfully.
///
/// The command's stdout and stderr are discarded. A command that cannot be
/// spawned counts as unavailable.
fn is_cli_available(name: &str) -> bool {
    let mut probe = std::process::Command::new(name);
    probe
        .arg("info")
        .stdout(std::process::Stdio::null())
        .stderr(std::process::Stdio::null());

    match probe.status() {
        Ok(status) => status.success(),
        Err(_) => false,
    }
}
504
#[cfg(test)]
mod tests {
    use std::io::Write;

    use super::*;

    /// Supported runtime identifiers map to their public ECR image; an
    /// unknown identifier maps to `None`.
    #[test]
    fn test_runtime_to_image() {
        let expected = [
            ("python3.12", "public.ecr.aws/lambda/python:3.12"),
            ("nodejs20.x", "public.ecr.aws/lambda/nodejs:20"),
            ("provided.al2023", "public.ecr.aws/lambda/provided:al2023"),
            ("ruby3.4", "public.ecr.aws/lambda/ruby:3.4"),
            ("java21", "public.ecr.aws/lambda/java:21"),
            ("dotnet8", "public.ecr.aws/lambda/dotnet:8"),
        ];
        for (runtime, image) in expected {
            assert_eq!(runtime_to_image(runtime), Some(image.to_string()));
        }

        assert_eq!(runtime_to_image("unknown"), None);
    }

    /// A single-file archive built in memory is unpacked with its contents
    /// intact.
    #[test]
    fn test_extract_zip() {
        let mut writer = zip::ZipWriter::new(std::io::Cursor::new(Vec::new()));
        writer
            .start_file("handler.py", zip::write::SimpleFileOptions::default())
            .unwrap();
        writer
            .write_all(b"def handler(event, context):\n return {'statusCode': 200}\n")
            .unwrap();
        let zip_bytes = writer.finish().unwrap().into_inner();

        let dir = TempDir::new().unwrap();
        extract_zip(&zip_bytes, dir.path()).unwrap();

        let handler_path = dir.path().join("handler.py");
        assert!(handler_path.exists());

        let content = std::fs::read_to_string(&handler_path).unwrap();
        assert!(content.contains("def handler"));
    }
}
567}