Skip to main content

fakecloud_lambda/runtime/
facade.rs

//! Backend-agnostic Lambda runtime facade.
//!
//! Owns the warm-pool bookkeeping, per-function startup serialization,
//! and the HTTP invocation path. Dispatches container lifecycle to
//! whatever [`LambdaBackend`] it was constructed with.
6
7use std::collections::HashMap;
8use std::sync::Arc;
9use std::time::{Duration, Instant};
10
11use base64::Engine;
12use parking_lot::RwLock;
13use sha2::{Digest, Sha256};
14
15use super::backend::{
16    BackendHandle, LambdaBackend, RuntimeError, StreamingInvocation, WarmInstance,
17};
18use super::docker::DockerBackend;
19use crate::state::LambdaFunction;
20
21/// A running runtime instance kept warm for reuse.
22struct WarmEntry {
23    instance: WarmInstance,
24    last_used: RwLock<Instant>,
25    /// Combined fingerprint of the function's code SHA-256 plus the
26    /// SHA-256 of every attached layer's ZIP bytes, joined in attach
27    /// order. Layers mutate `/opt`, so a layer change invalidates the
28    /// warm instance even when the function code is unchanged.
29    deploy_id: String,
30}
31
32/// Compute the warm-instance key for a function with its current layer
33/// set. Stable across calls — layer ARNs are immutable in AWS, so the
34/// hash of their bytes is the right cache key.
35fn deploy_id_for(func: &LambdaFunction, layers: &[Vec<u8>]) -> String {
36    let mut hasher = Sha256::new();
37    hasher.update(func.code_sha256.as_bytes());
38    for bytes in layers {
39        let mut layer_hasher = Sha256::new();
40        layer_hasher.update(bytes);
41        hasher.update(b":");
42        hasher.update(layer_hasher.finalize());
43    }
44    base64::engine::general_purpose::STANDARD.encode(hasher.finalize())
45}
46
47pub struct LambdaRuntime {
48    backend: Arc<dyn LambdaBackend>,
49    instances: RwLock<HashMap<String, WarmEntry>>,
50    /// Serializes runtime startup per function to prevent duplicate instances.
51    starting: RwLock<HashMap<String, Arc<tokio::sync::Mutex<()>>>>,
52}
53
54impl LambdaRuntime {
55    /// Construct a runtime over the supplied backend. Callers that want
56    /// auto-detection should use [`Self::auto_detect_docker`] or
57    /// [`Self::new`].
58    pub fn from_backend(backend: Arc<dyn LambdaBackend>) -> Self {
59        Self {
60            backend,
61            instances: RwLock::new(HashMap::new()),
62            starting: RwLock::new(HashMap::new()),
63        }
64    }
65
66    /// Auto-detect Docker or Podman. Returns `None` if neither is available.
67    /// Override with `FAKECLOUD_CONTAINER_CLI` env var.
68    pub fn auto_detect_docker(server_port: u16) -> Option<Self> {
69        DockerBackend::auto_detect(server_port)
70            .map(|b| Self::from_backend(Arc::new(b) as Arc<dyn LambdaBackend>))
71    }
72
73    /// Backwards-compatible alias for [`Self::auto_detect_docker`].
74    /// Callers across the workspace use `ContainerRuntime::new(port)`.
75    pub fn new(server_port: u16) -> Option<Self> {
76        Self::auto_detect_docker(server_port)
77    }
78
79    /// Construct a runtime backed by the Kubernetes backend. Reads
80    /// configuration from env vars (`FAKECLOUD_K8S_SELF_URL`,
81    /// `FAKECLOUD_K8S_NAMESPACE`, etc.) and connects to the cluster
82    /// via in-cluster service account or kubeconfig. Hard-fails on
83    /// any configuration or connectivity issue — we don't silently
84    /// fall back to Docker because the operator explicitly opted in
85    /// to K8s.
86    ///
87    /// `internal_token` is the bearer token the artifact endpoints on
88    /// the fakecloud server expect from Pod init containers — caller
89    /// must register the same token on those endpoints.
90    pub async fn new_k8s(
91        server_port: u16,
92        internal_token: String,
93    ) -> Result<Self, super::k8s::K8sBackendError> {
94        let backend = super::k8s::K8sBackend::from_env(server_port, internal_token).await?;
95        backend.reap_stale().await;
96        Ok(Self::from_backend(Arc::new(backend)))
97    }
98
99    pub fn cli_name(&self) -> &str {
100        self.backend.name()
101    }
102
103    /// Background pre-warm hook: pull the image a Zip-package function
104    /// will need at invoke time, or the `ImageUri` of an Image-package
105    /// function. The first cold pull of an AWS base image (~700 MB)
106    /// frequently exceeds the AWS CLI default 60s read timeout, surfacing
107    /// to users as `Connection was closed` (issue #1539). Call after
108    /// `CreateFunction` persists so the warm path is ready before the
109    /// caller turns around and calls `Invoke`.
110    ///
111    /// Returns `None` if the function has no resolvable image (e.g. an
112    /// unsupported runtime string we can't map to a base image).
113    /// Otherwise returns the result of the backend's `prepull_image` —
114    /// callers log failures and move on, since invoke time still
115    /// re-attempts the pull as a fallback.
116    pub async fn prepull_for_function(
117        &self,
118        func: &LambdaFunction,
119    ) -> Option<Result<(), super::backend::RuntimeError>> {
120        let image = if func.package_type == "Image" {
121            func.image_uri.clone()?
122        } else {
123            super::docker::runtime_to_image(&func.runtime)?
124        };
125        Some(self.backend.prepull_image(&image).await)
126    }
127
128    /// Invoke a Lambda function, starting an instance if needed. Layer
129    /// ZIPs are extracted into `/opt` of the runtime sandbox; AWS base
130    /// images already include `/opt/python`, `/opt/nodejs/node_modules`,
131    /// `/opt/lib`, and `/opt/bin` on the right import paths.
132    pub async fn invoke(
133        &self,
134        func: &LambdaFunction,
135        payload: &[u8],
136        layers: &[Vec<u8>],
137    ) -> Result<Vec<u8>, RuntimeError> {
138        let endpoint = self.ensure_warm_instance(func, layers).await?;
139
140        let url = format!("http://{endpoint}/2015-03-31/functions/function/invocations");
141        let client = reqwest::Client::new();
142        let resp = client
143            .post(&url)
144            .body(payload.to_vec())
145            .timeout(Duration::from_secs(func.timeout as u64 + 5))
146            .send()
147            .await
148            .map_err(|e| RuntimeError::InvocationFailed(e.to_string()))?;
149
150        let body = resp
151            .bytes()
152            .await
153            .map_err(|e| RuntimeError::InvocationFailed(e.to_string()))?;
154
155        Ok(body.to_vec())
156    }
157
158    /// Invoke a Lambda function and yield the raw HTTP body as a stream
159    /// of byte chunks. Each chunk corresponds to one HTTP frame the RIE
160    /// flushed to the wire — for streaming-aware handlers this
161    /// preserves the chunk boundaries the function emitted. Buffered
162    /// handlers come back as a single chunk, which is still a valid
163    /// streamed response.
164    pub async fn invoke_streaming(
165        &self,
166        func: &LambdaFunction,
167        payload: &[u8],
168        layers: &[Vec<u8>],
169    ) -> Result<StreamingInvocation, RuntimeError> {
170        let endpoint = self.ensure_warm_instance(func, layers).await?;
171
172        let url = format!("http://{endpoint}/2015-03-31/functions/function/invocations");
173        let client = reqwest::Client::new();
174        let resp = client
175            .post(&url)
176            .body(payload.to_vec())
177            .timeout(Duration::from_secs(func.timeout as u64 + 5))
178            .send()
179            .await
180            .map_err(|e| RuntimeError::InvocationFailed(e.to_string()))?;
181
182        Ok(StreamingInvocation { resp })
183    }
184
185    /// Resolve a warm instance for `func`, launching one if its
186    /// fingerprint doesn't match (or there isn't one yet). Returns the
187    /// invocation endpoint. Shared by `invoke` and `invoke_streaming`
188    /// so both paths use the same warm-pool logic.
189    async fn ensure_warm_instance(
190        &self,
191        func: &LambdaFunction,
192        layers: &[Vec<u8>],
193    ) -> Result<String, RuntimeError> {
194        let is_image = func.package_type == "Image";
195        if !is_image && func.code_zip.is_none() {
196            return Err(RuntimeError::NoCodeZip(func.function_name.clone()));
197        }
198
199        let deploy_id = deploy_id_for(func, layers);
200
201        let endpoint = {
202            let entries = self.instances.read();
203            entries.get(&func.function_name).and_then(|e| {
204                if e.deploy_id == deploy_id {
205                    *e.last_used.write() = Instant::now();
206                    Some(e.instance.endpoint.clone())
207                } else {
208                    None
209                }
210            })
211        };
212        if let Some(ep) = endpoint {
213            return Ok(ep);
214        }
215
216        let startup_lock = {
217            let mut starting = self.starting.write();
218            starting
219                .entry(func.function_name.clone())
220                .or_insert_with(|| Arc::new(tokio::sync::Mutex::new(())))
221                .clone()
222        };
223        let _guard = startup_lock.lock().await;
224
225        // Re-check after acquiring lock — another task may have launched it.
226        let existing = {
227            let entries = self.instances.read();
228            entries
229                .get(&func.function_name)
230                .filter(|e| e.deploy_id == deploy_id)
231                .map(|e| {
232                    *e.last_used.write() = Instant::now();
233                    e.instance.endpoint.clone()
234                })
235        };
236        if let Some(ep) = existing {
237            return Ok(ep);
238        }
239
240        self.stop_container(&func.function_name).await;
241
242        let instance = self
243            .backend
244            .launch(func, func.code_zip.as_deref(), layers, &deploy_id)
245            .await?;
246        let endpoint = instance.endpoint.clone();
247        let entry = WarmEntry {
248            instance,
249            last_used: RwLock::new(Instant::now()),
250            deploy_id,
251        };
252        self.instances
253            .write()
254            .insert(func.function_name.clone(), entry);
255        Ok(endpoint)
256    }
257
258    /// Stop and remove a warm instance for a specific function.
259    pub async fn stop_container(&self, function_name: &str) {
260        let entry = self.instances.write().remove(function_name);
261        if let Some(entry) = entry {
262            tracing::info!(
263                function = %function_name,
264                handle = ?entry.instance.handle,
265                "stopping Lambda runtime instance"
266            );
267            self.backend.terminate(&entry.instance.handle).await;
268        }
269    }
270
271    /// Stop and remove all warm instances (used on server shutdown or reset).
272    pub async fn stop_all(&self) {
273        let entries: Vec<(String, WarmInstance)> = {
274            let mut map = self.instances.write();
275            map.drain().map(|(name, e)| (name, e.instance)).collect()
276        };
277        for (name, instance) in entries {
278            tracing::info!(
279                function = %name,
280                handle = ?instance.handle,
281                "stopping Lambda runtime instance (cleanup)"
282            );
283            self.backend.terminate(&instance.handle).await;
284        }
285    }
286
287    /// List all warm instances and their metadata for introspection.
288    pub fn list_warm_containers(
289        &self,
290        lambda_state: &crate::state::SharedLambdaState,
291    ) -> Vec<serde_json::Value> {
292        let entries = self.instances.read();
293        let accounts = lambda_state.read();
294        entries
295            .iter()
296            .map(|(name, entry)| {
297                let runtime = accounts
298                    .iter()
299                    .find_map(|(_, state)| state.functions.get(name).map(|f| f.runtime.clone()))
300                    .unwrap_or_default();
301                let last_used = entry.last_used.read();
302                let idle_secs = last_used.elapsed().as_secs();
303                let mut row = serde_json::json!({
304                    "functionName": name,
305                    "runtime": runtime,
306                    "backend": self.backend.name(),
307                    "lastUsedSecsAgo": idle_secs,
308                });
309                let obj = row.as_object_mut().expect("json object");
310                match &entry.instance.handle {
311                    BackendHandle::Container { id } => {
312                        obj.insert("containerId".into(), serde_json::Value::String(id.clone()));
313                    }
314                    BackendHandle::Pod { namespace, name } => {
315                        obj.insert("podName".into(), serde_json::Value::String(name.clone()));
316                        obj.insert(
317                            "namespace".into(),
318                            serde_json::Value::String(namespace.clone()),
319                        );
320                    }
321                }
322                row
323            })
324            .collect()
325    }
326
327    /// Evict (stop and remove) the warm instance for a specific function.
328    /// Returns true if an instance was found and evicted.
329    pub async fn evict_container(&self, function_name: &str) -> bool {
330        let entry = self.instances.write().remove(function_name);
331        if let Some(entry) = entry {
332            tracing::info!(
333                function = %function_name,
334                handle = ?entry.instance.handle,
335                "evicting Lambda runtime instance via simulation API"
336            );
337            self.backend.terminate(&entry.instance.handle).await;
338            true
339        } else {
340            false
341        }
342    }
343
344    /// Background loop that stops instances idle longer than `ttl`.
345    pub async fn run_cleanup_loop(self: Arc<Self>, ttl: Duration) {
346        let mut interval = tokio::time::interval(Duration::from_secs(30));
347        loop {
348            interval.tick().await;
349            self.cleanup_idle(ttl).await;
350        }
351    }
352
353    async fn cleanup_idle(&self, ttl: Duration) {
354        let expired: Vec<String> = {
355            let entries = self.instances.read();
356            entries
357                .iter()
358                .filter(|(_, e)| e.last_used.read().elapsed() > ttl)
359                .map(|(name, _)| name.clone())
360                .collect()
361        };
362        for name in expired {
363            tracing::info!(function = %name, "stopping idle Lambda runtime instance");
364            self.stop_container(&name).await;
365        }
366    }
367}