Skip to main content

fakecloud_lambda/runtime/
facade.rs

//! Backend-agnostic Lambda runtime facade.
//!
//! Owns the warm-pool bookkeeping, per-function startup serialization,
//! and the HTTP invocation path. Delegates instance lifecycle (launch and
//! termination) to whatever [`LambdaBackend`] it was constructed with.
6
7use std::collections::HashMap;
8use std::sync::Arc;
9use std::time::{Duration, Instant};
10
11use base64::Engine;
12use parking_lot::RwLock;
13use sha2::{Digest, Sha256};
14
15use super::backend::{
16    BackendHandle, LambdaBackend, RuntimeError, StreamingInvocation, WarmInstance,
17};
18use super::docker::DockerBackend;
19use crate::state::LambdaFunction;
20
21/// A running runtime instance kept warm for reuse.
22struct WarmEntry {
23    instance: WarmInstance,
24    last_used: RwLock<Instant>,
25    /// Combined fingerprint of the function's code SHA-256 plus the
26    /// SHA-256 of every attached layer's ZIP bytes, joined in attach
27    /// order. Layers mutate `/opt`, so a layer change invalidates the
28    /// warm instance even when the function code is unchanged.
29    deploy_id: String,
30}
31
32/// Compute the warm-instance key for a function with its current layer
33/// set. Stable across calls — layer ARNs are immutable in AWS, so the
34/// hash of their bytes is the right cache key.
35fn deploy_id_for(func: &LambdaFunction, layers: &[Vec<u8>]) -> String {
36    let mut hasher = Sha256::new();
37    hasher.update(func.code_sha256.as_bytes());
38    for bytes in layers {
39        let mut layer_hasher = Sha256::new();
40        layer_hasher.update(bytes);
41        hasher.update(b":");
42        hasher.update(layer_hasher.finalize());
43    }
44    base64::engine::general_purpose::STANDARD.encode(hasher.finalize())
45}
46
47pub struct LambdaRuntime {
48    backend: Arc<dyn LambdaBackend>,
49    instances: RwLock<HashMap<String, WarmEntry>>,
50    /// Serializes runtime startup per function to prevent duplicate instances.
51    starting: RwLock<HashMap<String, Arc<tokio::sync::Mutex<()>>>>,
52}
53
54impl LambdaRuntime {
55    /// Construct a runtime over the supplied backend. Callers that want
56    /// auto-detection should use [`Self::auto_detect_docker`] or
57    /// [`Self::new`].
58    pub fn from_backend(backend: Arc<dyn LambdaBackend>) -> Self {
59        Self {
60            backend,
61            instances: RwLock::new(HashMap::new()),
62            starting: RwLock::new(HashMap::new()),
63        }
64    }
65
66    /// Auto-detect Docker or Podman. Returns `None` if neither is available.
67    /// Override with `FAKECLOUD_CONTAINER_CLI` env var.
68    pub fn auto_detect_docker(server_port: u16) -> Option<Self> {
69        DockerBackend::auto_detect(server_port)
70            .map(|b| Self::from_backend(Arc::new(b) as Arc<dyn LambdaBackend>))
71    }
72
73    /// Backwards-compatible alias for [`Self::auto_detect_docker`].
74    /// Callers across the workspace use `ContainerRuntime::new(port)`.
75    pub fn new(server_port: u16) -> Option<Self> {
76        Self::auto_detect_docker(server_port)
77    }
78
79    /// Construct a runtime backed by the Kubernetes backend. Reads
80    /// configuration from env vars (`FAKECLOUD_K8S_SELF_URL`,
81    /// `FAKECLOUD_K8S_NAMESPACE`, etc.) and connects to the cluster
82    /// via in-cluster service account or kubeconfig. Hard-fails on
83    /// any configuration or connectivity issue — we don't silently
84    /// fall back to Docker because the operator explicitly opted in
85    /// to K8s.
86    ///
87    /// `internal_token` is the bearer token the artifact endpoints on
88    /// the fakecloud server expect from Pod init containers — caller
89    /// must register the same token on those endpoints.
90    pub async fn new_k8s(
91        server_port: u16,
92        internal_token: String,
93    ) -> Result<Self, super::k8s::K8sBackendError> {
94        let backend = super::k8s::K8sBackend::from_env(server_port, internal_token).await?;
95        backend.reap_stale().await;
96        Ok(Self::from_backend(Arc::new(backend)))
97    }
98
99    pub fn cli_name(&self) -> &str {
100        self.backend.name()
101    }
102
103    /// Invoke a Lambda function, starting an instance if needed. Layer
104    /// ZIPs are extracted into `/opt` of the runtime sandbox; AWS base
105    /// images already include `/opt/python`, `/opt/nodejs/node_modules`,
106    /// `/opt/lib`, and `/opt/bin` on the right import paths.
107    pub async fn invoke(
108        &self,
109        func: &LambdaFunction,
110        payload: &[u8],
111        layers: &[Vec<u8>],
112    ) -> Result<Vec<u8>, RuntimeError> {
113        let endpoint = self.ensure_warm_instance(func, layers).await?;
114
115        let url = format!("http://{endpoint}/2015-03-31/functions/function/invocations");
116        let client = reqwest::Client::new();
117        let resp = client
118            .post(&url)
119            .body(payload.to_vec())
120            .timeout(Duration::from_secs(func.timeout as u64 + 5))
121            .send()
122            .await
123            .map_err(|e| RuntimeError::InvocationFailed(e.to_string()))?;
124
125        let body = resp
126            .bytes()
127            .await
128            .map_err(|e| RuntimeError::InvocationFailed(e.to_string()))?;
129
130        Ok(body.to_vec())
131    }
132
133    /// Invoke a Lambda function and yield the raw HTTP body as a stream
134    /// of byte chunks. Each chunk corresponds to one HTTP frame the RIE
135    /// flushed to the wire — for streaming-aware handlers this
136    /// preserves the chunk boundaries the function emitted. Buffered
137    /// handlers come back as a single chunk, which is still a valid
138    /// streamed response.
139    pub async fn invoke_streaming(
140        &self,
141        func: &LambdaFunction,
142        payload: &[u8],
143        layers: &[Vec<u8>],
144    ) -> Result<StreamingInvocation, RuntimeError> {
145        let endpoint = self.ensure_warm_instance(func, layers).await?;
146
147        let url = format!("http://{endpoint}/2015-03-31/functions/function/invocations");
148        let client = reqwest::Client::new();
149        let resp = client
150            .post(&url)
151            .body(payload.to_vec())
152            .timeout(Duration::from_secs(func.timeout as u64 + 5))
153            .send()
154            .await
155            .map_err(|e| RuntimeError::InvocationFailed(e.to_string()))?;
156
157        Ok(StreamingInvocation { resp })
158    }
159
160    /// Resolve a warm instance for `func`, launching one if its
161    /// fingerprint doesn't match (or there isn't one yet). Returns the
162    /// invocation endpoint. Shared by `invoke` and `invoke_streaming`
163    /// so both paths use the same warm-pool logic.
164    async fn ensure_warm_instance(
165        &self,
166        func: &LambdaFunction,
167        layers: &[Vec<u8>],
168    ) -> Result<String, RuntimeError> {
169        let is_image = func.package_type == "Image";
170        if !is_image && func.code_zip.is_none() {
171            return Err(RuntimeError::NoCodeZip(func.function_name.clone()));
172        }
173
174        let deploy_id = deploy_id_for(func, layers);
175
176        let endpoint = {
177            let entries = self.instances.read();
178            entries.get(&func.function_name).and_then(|e| {
179                if e.deploy_id == deploy_id {
180                    *e.last_used.write() = Instant::now();
181                    Some(e.instance.endpoint.clone())
182                } else {
183                    None
184                }
185            })
186        };
187        if let Some(ep) = endpoint {
188            return Ok(ep);
189        }
190
191        let startup_lock = {
192            let mut starting = self.starting.write();
193            starting
194                .entry(func.function_name.clone())
195                .or_insert_with(|| Arc::new(tokio::sync::Mutex::new(())))
196                .clone()
197        };
198        let _guard = startup_lock.lock().await;
199
200        // Re-check after acquiring lock — another task may have launched it.
201        let existing = {
202            let entries = self.instances.read();
203            entries
204                .get(&func.function_name)
205                .filter(|e| e.deploy_id == deploy_id)
206                .map(|e| {
207                    *e.last_used.write() = Instant::now();
208                    e.instance.endpoint.clone()
209                })
210        };
211        if let Some(ep) = existing {
212            return Ok(ep);
213        }
214
215        self.stop_container(&func.function_name).await;
216
217        let instance = self
218            .backend
219            .launch(func, func.code_zip.as_deref(), layers, &deploy_id)
220            .await?;
221        let endpoint = instance.endpoint.clone();
222        let entry = WarmEntry {
223            instance,
224            last_used: RwLock::new(Instant::now()),
225            deploy_id,
226        };
227        self.instances
228            .write()
229            .insert(func.function_name.clone(), entry);
230        Ok(endpoint)
231    }
232
233    /// Stop and remove a warm instance for a specific function.
234    pub async fn stop_container(&self, function_name: &str) {
235        let entry = self.instances.write().remove(function_name);
236        if let Some(entry) = entry {
237            tracing::info!(
238                function = %function_name,
239                handle = ?entry.instance.handle,
240                "stopping Lambda runtime instance"
241            );
242            self.backend.terminate(&entry.instance.handle).await;
243        }
244    }
245
246    /// Stop and remove all warm instances (used on server shutdown or reset).
247    pub async fn stop_all(&self) {
248        let entries: Vec<(String, WarmInstance)> = {
249            let mut map = self.instances.write();
250            map.drain().map(|(name, e)| (name, e.instance)).collect()
251        };
252        for (name, instance) in entries {
253            tracing::info!(
254                function = %name,
255                handle = ?instance.handle,
256                "stopping Lambda runtime instance (cleanup)"
257            );
258            self.backend.terminate(&instance.handle).await;
259        }
260    }
261
262    /// List all warm instances and their metadata for introspection.
263    pub fn list_warm_containers(
264        &self,
265        lambda_state: &crate::state::SharedLambdaState,
266    ) -> Vec<serde_json::Value> {
267        let entries = self.instances.read();
268        let accounts = lambda_state.read();
269        entries
270            .iter()
271            .map(|(name, entry)| {
272                let runtime = accounts
273                    .iter()
274                    .find_map(|(_, state)| state.functions.get(name).map(|f| f.runtime.clone()))
275                    .unwrap_or_default();
276                let last_used = entry.last_used.read();
277                let idle_secs = last_used.elapsed().as_secs();
278                let mut row = serde_json::json!({
279                    "functionName": name,
280                    "runtime": runtime,
281                    "backend": self.backend.name(),
282                    "lastUsedSecsAgo": idle_secs,
283                });
284                let obj = row.as_object_mut().expect("json object");
285                match &entry.instance.handle {
286                    BackendHandle::Container { id } => {
287                        obj.insert("containerId".into(), serde_json::Value::String(id.clone()));
288                    }
289                    BackendHandle::Pod { namespace, name } => {
290                        obj.insert("podName".into(), serde_json::Value::String(name.clone()));
291                        obj.insert(
292                            "namespace".into(),
293                            serde_json::Value::String(namespace.clone()),
294                        );
295                    }
296                }
297                row
298            })
299            .collect()
300    }
301
302    /// Evict (stop and remove) the warm instance for a specific function.
303    /// Returns true if an instance was found and evicted.
304    pub async fn evict_container(&self, function_name: &str) -> bool {
305        let entry = self.instances.write().remove(function_name);
306        if let Some(entry) = entry {
307            tracing::info!(
308                function = %function_name,
309                handle = ?entry.instance.handle,
310                "evicting Lambda runtime instance via simulation API"
311            );
312            self.backend.terminate(&entry.instance.handle).await;
313            true
314        } else {
315            false
316        }
317    }
318
319    /// Background loop that stops instances idle longer than `ttl`.
320    pub async fn run_cleanup_loop(self: Arc<Self>, ttl: Duration) {
321        let mut interval = tokio::time::interval(Duration::from_secs(30));
322        loop {
323            interval.tick().await;
324            self.cleanup_idle(ttl).await;
325        }
326    }
327
328    async fn cleanup_idle(&self, ttl: Duration) {
329        let expired: Vec<String> = {
330            let entries = self.instances.read();
331            entries
332                .iter()
333                .filter(|(_, e)| e.last_used.read().elapsed() > ttl)
334                .map(|(name, _)| name.clone())
335                .collect()
336        };
337        for name in expired {
338            tracing::info!(function = %name, "stopping idle Lambda runtime instance");
339            self.stop_container(&name).await;
340        }
341    }
342}