fakecloud_lambda/runtime/
facade.rs1use std::collections::HashMap;
8use std::sync::Arc;
9use std::time::{Duration, Instant};
10
11use base64::Engine;
12use parking_lot::RwLock;
13use sha2::{Digest, Sha256};
14
15use super::backend::{
16 BackendHandle, LambdaBackend, RuntimeError, StreamingInvocation, WarmInstance,
17};
18use super::docker::DockerBackend;
19use crate::state::LambdaFunction;
20
21struct WarmEntry {
23 instance: WarmInstance,
24 last_used: RwLock<Instant>,
25 deploy_id: String,
30}
31
32fn deploy_id_for(func: &LambdaFunction, layers: &[Vec<u8>]) -> String {
36 let mut hasher = Sha256::new();
37 hasher.update(func.code_sha256.as_bytes());
38 for bytes in layers {
39 let mut layer_hasher = Sha256::new();
40 layer_hasher.update(bytes);
41 hasher.update(b":");
42 hasher.update(layer_hasher.finalize());
43 }
44 base64::engine::general_purpose::STANDARD.encode(hasher.finalize())
45}
46
47pub struct LambdaRuntime {
48 backend: Arc<dyn LambdaBackend>,
49 instances: RwLock<HashMap<String, WarmEntry>>,
50 starting: RwLock<HashMap<String, Arc<tokio::sync::Mutex<()>>>>,
52}
53
54impl LambdaRuntime {
55 pub fn from_backend(backend: Arc<dyn LambdaBackend>) -> Self {
59 Self {
60 backend,
61 instances: RwLock::new(HashMap::new()),
62 starting: RwLock::new(HashMap::new()),
63 }
64 }
65
66 pub fn auto_detect_docker(server_port: u16) -> Option<Self> {
69 DockerBackend::auto_detect(server_port)
70 .map(|b| Self::from_backend(Arc::new(b) as Arc<dyn LambdaBackend>))
71 }
72
73 pub fn new(server_port: u16) -> Option<Self> {
76 Self::auto_detect_docker(server_port)
77 }
78
79 pub async fn new_k8s(
91 server_port: u16,
92 internal_token: String,
93 ) -> Result<Self, super::k8s::K8sBackendError> {
94 let backend = super::k8s::K8sBackend::from_env(server_port, internal_token).await?;
95 backend.reap_stale().await;
96 Ok(Self::from_backend(Arc::new(backend)))
97 }
98
99 pub fn cli_name(&self) -> &str {
100 self.backend.name()
101 }
102
103 pub async fn invoke(
108 &self,
109 func: &LambdaFunction,
110 payload: &[u8],
111 layers: &[Vec<u8>],
112 ) -> Result<Vec<u8>, RuntimeError> {
113 let endpoint = self.ensure_warm_instance(func, layers).await?;
114
115 let url = format!("http://{endpoint}/2015-03-31/functions/function/invocations");
116 let client = reqwest::Client::new();
117 let resp = client
118 .post(&url)
119 .body(payload.to_vec())
120 .timeout(Duration::from_secs(func.timeout as u64 + 5))
121 .send()
122 .await
123 .map_err(|e| RuntimeError::InvocationFailed(e.to_string()))?;
124
125 let body = resp
126 .bytes()
127 .await
128 .map_err(|e| RuntimeError::InvocationFailed(e.to_string()))?;
129
130 Ok(body.to_vec())
131 }
132
133 pub async fn invoke_streaming(
140 &self,
141 func: &LambdaFunction,
142 payload: &[u8],
143 layers: &[Vec<u8>],
144 ) -> Result<StreamingInvocation, RuntimeError> {
145 let endpoint = self.ensure_warm_instance(func, layers).await?;
146
147 let url = format!("http://{endpoint}/2015-03-31/functions/function/invocations");
148 let client = reqwest::Client::new();
149 let resp = client
150 .post(&url)
151 .body(payload.to_vec())
152 .timeout(Duration::from_secs(func.timeout as u64 + 5))
153 .send()
154 .await
155 .map_err(|e| RuntimeError::InvocationFailed(e.to_string()))?;
156
157 Ok(StreamingInvocation { resp })
158 }
159
160 async fn ensure_warm_instance(
165 &self,
166 func: &LambdaFunction,
167 layers: &[Vec<u8>],
168 ) -> Result<String, RuntimeError> {
169 let is_image = func.package_type == "Image";
170 if !is_image && func.code_zip.is_none() {
171 return Err(RuntimeError::NoCodeZip(func.function_name.clone()));
172 }
173
174 let deploy_id = deploy_id_for(func, layers);
175
176 let endpoint = {
177 let entries = self.instances.read();
178 entries.get(&func.function_name).and_then(|e| {
179 if e.deploy_id == deploy_id {
180 *e.last_used.write() = Instant::now();
181 Some(e.instance.endpoint.clone())
182 } else {
183 None
184 }
185 })
186 };
187 if let Some(ep) = endpoint {
188 return Ok(ep);
189 }
190
191 let startup_lock = {
192 let mut starting = self.starting.write();
193 starting
194 .entry(func.function_name.clone())
195 .or_insert_with(|| Arc::new(tokio::sync::Mutex::new(())))
196 .clone()
197 };
198 let _guard = startup_lock.lock().await;
199
200 let existing = {
202 let entries = self.instances.read();
203 entries
204 .get(&func.function_name)
205 .filter(|e| e.deploy_id == deploy_id)
206 .map(|e| {
207 *e.last_used.write() = Instant::now();
208 e.instance.endpoint.clone()
209 })
210 };
211 if let Some(ep) = existing {
212 return Ok(ep);
213 }
214
215 self.stop_container(&func.function_name).await;
216
217 let instance = self
218 .backend
219 .launch(func, func.code_zip.as_deref(), layers, &deploy_id)
220 .await?;
221 let endpoint = instance.endpoint.clone();
222 let entry = WarmEntry {
223 instance,
224 last_used: RwLock::new(Instant::now()),
225 deploy_id,
226 };
227 self.instances
228 .write()
229 .insert(func.function_name.clone(), entry);
230 Ok(endpoint)
231 }
232
233 pub async fn stop_container(&self, function_name: &str) {
235 let entry = self.instances.write().remove(function_name);
236 if let Some(entry) = entry {
237 tracing::info!(
238 function = %function_name,
239 handle = ?entry.instance.handle,
240 "stopping Lambda runtime instance"
241 );
242 self.backend.terminate(&entry.instance.handle).await;
243 }
244 }
245
246 pub async fn stop_all(&self) {
248 let entries: Vec<(String, WarmInstance)> = {
249 let mut map = self.instances.write();
250 map.drain().map(|(name, e)| (name, e.instance)).collect()
251 };
252 for (name, instance) in entries {
253 tracing::info!(
254 function = %name,
255 handle = ?instance.handle,
256 "stopping Lambda runtime instance (cleanup)"
257 );
258 self.backend.terminate(&instance.handle).await;
259 }
260 }
261
262 pub fn list_warm_containers(
264 &self,
265 lambda_state: &crate::state::SharedLambdaState,
266 ) -> Vec<serde_json::Value> {
267 let entries = self.instances.read();
268 let accounts = lambda_state.read();
269 entries
270 .iter()
271 .map(|(name, entry)| {
272 let runtime = accounts
273 .iter()
274 .find_map(|(_, state)| state.functions.get(name).map(|f| f.runtime.clone()))
275 .unwrap_or_default();
276 let last_used = entry.last_used.read();
277 let idle_secs = last_used.elapsed().as_secs();
278 let mut row = serde_json::json!({
279 "functionName": name,
280 "runtime": runtime,
281 "backend": self.backend.name(),
282 "lastUsedSecsAgo": idle_secs,
283 });
284 let obj = row.as_object_mut().expect("json object");
285 match &entry.instance.handle {
286 BackendHandle::Container { id } => {
287 obj.insert("containerId".into(), serde_json::Value::String(id.clone()));
288 }
289 BackendHandle::Pod { namespace, name } => {
290 obj.insert("podName".into(), serde_json::Value::String(name.clone()));
291 obj.insert(
292 "namespace".into(),
293 serde_json::Value::String(namespace.clone()),
294 );
295 }
296 }
297 row
298 })
299 .collect()
300 }
301
302 pub async fn evict_container(&self, function_name: &str) -> bool {
305 let entry = self.instances.write().remove(function_name);
306 if let Some(entry) = entry {
307 tracing::info!(
308 function = %function_name,
309 handle = ?entry.instance.handle,
310 "evicting Lambda runtime instance via simulation API"
311 );
312 self.backend.terminate(&entry.instance.handle).await;
313 true
314 } else {
315 false
316 }
317 }
318
319 pub async fn run_cleanup_loop(self: Arc<Self>, ttl: Duration) {
321 let mut interval = tokio::time::interval(Duration::from_secs(30));
322 loop {
323 interval.tick().await;
324 self.cleanup_idle(ttl).await;
325 }
326 }
327
328 async fn cleanup_idle(&self, ttl: Duration) {
329 let expired: Vec<String> = {
330 let entries = self.instances.read();
331 entries
332 .iter()
333 .filter(|(_, e)| e.last_used.read().elapsed() > ttl)
334 .map(|(name, _)| name.clone())
335 .collect()
336 };
337 for name in expired {
338 tracing::info!(function = %name, "stopping idle Lambda runtime instance");
339 self.stop_container(&name).await;
340 }
341 }
342}