fakecloud_lambda/runtime/
facade.rs1use std::collections::HashMap;
8use std::sync::Arc;
9use std::time::{Duration, Instant};
10
11use base64::Engine;
12use parking_lot::RwLock;
13use sha2::{Digest, Sha256};
14
15use super::backend::{
16 BackendHandle, LambdaBackend, RuntimeError, StreamingInvocation, WarmInstance,
17};
18use super::docker::DockerBackend;
19use crate::state::LambdaFunction;
20
21struct WarmEntry {
23 instance: WarmInstance,
24 last_used: RwLock<Instant>,
25 deploy_id: String,
30}
31
32fn deploy_id_for(func: &LambdaFunction, layers: &[Vec<u8>]) -> String {
36 let mut hasher = Sha256::new();
37 hasher.update(func.code_sha256.as_bytes());
38 for bytes in layers {
39 let mut layer_hasher = Sha256::new();
40 layer_hasher.update(bytes);
41 hasher.update(b":");
42 hasher.update(layer_hasher.finalize());
43 }
44 base64::engine::general_purpose::STANDARD.encode(hasher.finalize())
45}
46
47pub struct LambdaRuntime {
48 backend: Arc<dyn LambdaBackend>,
49 instances: RwLock<HashMap<String, WarmEntry>>,
50 starting: RwLock<HashMap<String, Arc<tokio::sync::Mutex<()>>>>,
52}
53
54impl LambdaRuntime {
55 pub fn from_backend(backend: Arc<dyn LambdaBackend>) -> Self {
59 Self {
60 backend,
61 instances: RwLock::new(HashMap::new()),
62 starting: RwLock::new(HashMap::new()),
63 }
64 }
65
66 pub fn auto_detect_docker(server_port: u16) -> Option<Self> {
69 DockerBackend::auto_detect(server_port)
70 .map(|b| Self::from_backend(Arc::new(b) as Arc<dyn LambdaBackend>))
71 }
72
73 pub fn new(server_port: u16) -> Option<Self> {
76 Self::auto_detect_docker(server_port)
77 }
78
79 pub async fn new_k8s(
91 server_port: u16,
92 internal_token: String,
93 ) -> Result<Self, super::k8s::K8sBackendError> {
94 let backend = super::k8s::K8sBackend::from_env(server_port, internal_token).await?;
95 backend.reap_stale().await;
96 Ok(Self::from_backend(Arc::new(backend)))
97 }
98
99 pub fn cli_name(&self) -> &str {
100 self.backend.name()
101 }
102
103 pub async fn prepull_for_function(
117 &self,
118 func: &LambdaFunction,
119 ) -> Option<Result<(), super::backend::RuntimeError>> {
120 let image = if func.package_type == "Image" {
121 func.image_uri.clone()?
122 } else {
123 super::docker::runtime_to_image(&func.runtime)?
124 };
125 Some(self.backend.prepull_image(&image).await)
126 }
127
128 pub async fn invoke(
133 &self,
134 func: &LambdaFunction,
135 payload: &[u8],
136 layers: &[Vec<u8>],
137 ) -> Result<Vec<u8>, RuntimeError> {
138 let endpoint = self.ensure_warm_instance(func, layers).await?;
139
140 let url = format!("http://{endpoint}/2015-03-31/functions/function/invocations");
141 let client = reqwest::Client::new();
142 let resp = client
143 .post(&url)
144 .body(payload.to_vec())
145 .timeout(Duration::from_secs(func.timeout as u64 + 5))
146 .send()
147 .await
148 .map_err(|e| RuntimeError::InvocationFailed(e.to_string()))?;
149
150 let body = resp
151 .bytes()
152 .await
153 .map_err(|e| RuntimeError::InvocationFailed(e.to_string()))?;
154
155 Ok(body.to_vec())
156 }
157
158 pub async fn invoke_streaming(
165 &self,
166 func: &LambdaFunction,
167 payload: &[u8],
168 layers: &[Vec<u8>],
169 ) -> Result<StreamingInvocation, RuntimeError> {
170 let endpoint = self.ensure_warm_instance(func, layers).await?;
171
172 let url = format!("http://{endpoint}/2015-03-31/functions/function/invocations");
173 let client = reqwest::Client::new();
174 let resp = client
175 .post(&url)
176 .body(payload.to_vec())
177 .timeout(Duration::from_secs(func.timeout as u64 + 5))
178 .send()
179 .await
180 .map_err(|e| RuntimeError::InvocationFailed(e.to_string()))?;
181
182 Ok(StreamingInvocation { resp })
183 }
184
185 async fn ensure_warm_instance(
190 &self,
191 func: &LambdaFunction,
192 layers: &[Vec<u8>],
193 ) -> Result<String, RuntimeError> {
194 let is_image = func.package_type == "Image";
195 if !is_image && func.code_zip.is_none() {
196 return Err(RuntimeError::NoCodeZip(func.function_name.clone()));
197 }
198
199 let deploy_id = deploy_id_for(func, layers);
200
201 let endpoint = {
202 let entries = self.instances.read();
203 entries.get(&func.function_name).and_then(|e| {
204 if e.deploy_id == deploy_id {
205 *e.last_used.write() = Instant::now();
206 Some(e.instance.endpoint.clone())
207 } else {
208 None
209 }
210 })
211 };
212 if let Some(ep) = endpoint {
213 return Ok(ep);
214 }
215
216 let startup_lock = {
217 let mut starting = self.starting.write();
218 starting
219 .entry(func.function_name.clone())
220 .or_insert_with(|| Arc::new(tokio::sync::Mutex::new(())))
221 .clone()
222 };
223 let _guard = startup_lock.lock().await;
224
225 let existing = {
227 let entries = self.instances.read();
228 entries
229 .get(&func.function_name)
230 .filter(|e| e.deploy_id == deploy_id)
231 .map(|e| {
232 *e.last_used.write() = Instant::now();
233 e.instance.endpoint.clone()
234 })
235 };
236 if let Some(ep) = existing {
237 return Ok(ep);
238 }
239
240 self.stop_container(&func.function_name).await;
241
242 let instance = self
243 .backend
244 .launch(func, func.code_zip.as_deref(), layers, &deploy_id)
245 .await?;
246 let endpoint = instance.endpoint.clone();
247 let entry = WarmEntry {
248 instance,
249 last_used: RwLock::new(Instant::now()),
250 deploy_id,
251 };
252 self.instances
253 .write()
254 .insert(func.function_name.clone(), entry);
255 Ok(endpoint)
256 }
257
258 pub async fn stop_container(&self, function_name: &str) {
260 let entry = self.instances.write().remove(function_name);
261 if let Some(entry) = entry {
262 tracing::info!(
263 function = %function_name,
264 handle = ?entry.instance.handle,
265 "stopping Lambda runtime instance"
266 );
267 self.backend.terminate(&entry.instance.handle).await;
268 }
269 }
270
271 pub async fn stop_all(&self) {
273 let entries: Vec<(String, WarmInstance)> = {
274 let mut map = self.instances.write();
275 map.drain().map(|(name, e)| (name, e.instance)).collect()
276 };
277 for (name, instance) in entries {
278 tracing::info!(
279 function = %name,
280 handle = ?instance.handle,
281 "stopping Lambda runtime instance (cleanup)"
282 );
283 self.backend.terminate(&instance.handle).await;
284 }
285 }
286
287 pub fn list_warm_containers(
289 &self,
290 lambda_state: &crate::state::SharedLambdaState,
291 ) -> Vec<serde_json::Value> {
292 let entries = self.instances.read();
293 let accounts = lambda_state.read();
294 entries
295 .iter()
296 .map(|(name, entry)| {
297 let runtime = accounts
298 .iter()
299 .find_map(|(_, state)| state.functions.get(name).map(|f| f.runtime.clone()))
300 .unwrap_or_default();
301 let last_used = entry.last_used.read();
302 let idle_secs = last_used.elapsed().as_secs();
303 let mut row = serde_json::json!({
304 "functionName": name,
305 "runtime": runtime,
306 "backend": self.backend.name(),
307 "lastUsedSecsAgo": idle_secs,
308 });
309 let obj = row.as_object_mut().expect("json object");
310 match &entry.instance.handle {
311 BackendHandle::Container { id } => {
312 obj.insert("containerId".into(), serde_json::Value::String(id.clone()));
313 }
314 BackendHandle::Pod { namespace, name } => {
315 obj.insert("podName".into(), serde_json::Value::String(name.clone()));
316 obj.insert(
317 "namespace".into(),
318 serde_json::Value::String(namespace.clone()),
319 );
320 }
321 }
322 row
323 })
324 .collect()
325 }
326
327 pub async fn evict_container(&self, function_name: &str) -> bool {
330 let entry = self.instances.write().remove(function_name);
331 if let Some(entry) = entry {
332 tracing::info!(
333 function = %function_name,
334 handle = ?entry.instance.handle,
335 "evicting Lambda runtime instance via simulation API"
336 );
337 self.backend.terminate(&entry.instance.handle).await;
338 true
339 } else {
340 false
341 }
342 }
343
344 pub async fn run_cleanup_loop(self: Arc<Self>, ttl: Duration) {
346 let mut interval = tokio::time::interval(Duration::from_secs(30));
347 loop {
348 interval.tick().await;
349 self.cleanup_idle(ttl).await;
350 }
351 }
352
353 async fn cleanup_idle(&self, ttl: Duration) {
354 let expired: Vec<String> = {
355 let entries = self.instances.read();
356 entries
357 .iter()
358 .filter(|(_, e)| e.last_used.read().elapsed() > ttl)
359 .map(|(name, _)| name.clone())
360 .collect()
361 };
362 for name in expired {
363 tracing::info!(function = %name, "stopping idle Lambda runtime instance");
364 self.stop_container(&name).await;
365 }
366 }
367}