Skip to main content

fakecloud_lambda/runtime/
facade.rs

1//! Backend-agnostic Lambda runtime facade.
2//!
3//! Owns the warm-pool bookkeeping, per-function startup serialization,
4//! and the HTTP invocation path. Dispatches container lifecycle to
5//! whatever [`LambdaBackend`] it was constructed with.
6
7use std::collections::HashMap;
8use std::sync::Arc;
9use std::time::{Duration, Instant};
10
11use base64::Engine;
12use parking_lot::RwLock;
13use sha2::{Digest, Sha256};
14
15use super::backend::{
16    BackendHandle, LambdaBackend, RuntimeError, StreamingInvocation, WarmInstance,
17};
18use super::docker::DockerBackend;
19use crate::state::LambdaFunction;
20
/// A running runtime instance kept warm for reuse.
///
/// Entries are shared as `Arc<WarmEntry>` between the per-function pool in
/// [`LambdaRuntime`] and whichever invocation currently has them reserved.
pub(crate) struct WarmEntry {
    /// Backend-provided record for the instance. This file reads its
    /// `endpoint` (the `host:port` invocations are POSTed to) and its
    /// `handle` (passed back to the backend to terminate the instance).
    instance: WarmInstance,
    /// When the instance was last reserved or last returned a response.
    /// Behind a lock so it can be refreshed through a shared `Arc`; read by
    /// the idle reaper and by the warm-instance listing.
    last_used: RwLock<Instant>,
    /// Combined fingerprint of the function's code SHA-256 plus the
    /// SHA-256 of every attached layer's ZIP bytes, joined in attach
    /// order. Layers mutate `/opt`, so a layer change invalidates the
    /// warm instance even when the function code is unchanged.
    deploy_id: String,
    /// Held for the duration of a single invocation against this
    /// instance. The AWS Runtime Interface Emulator (and real Lambda)
    /// handles exactly one event per execution environment at a time;
    /// the RIE's `rapidcore` server nil-pointer-derefs and the process
    /// exits if two invokes overlap (issue #1644). Acquiring this lock
    /// before forwarding guarantees one in-flight event per instance,
    /// and lets the pool pick a *free* instance via `try_lock`.
    busy: Arc<tokio::sync::Mutex<()>>,
}
39
/// Default cap on warm instances per function. Real Lambda scales
/// execution environments with concurrent demand; we bound it so a
/// burst can't spawn unbounded containers/Pods. Override with
/// `FAKECLOUD_LAMBDA_MAX_CONCURRENCY`; an override that does not parse as
/// an integer `>= 1` is ignored and this default is used instead. Beyond
/// the cap, invocations queue on a busy instance rather than starting a
/// new one.
const DEFAULT_MAX_CONCURRENCY: usize = 10;
46
/// Max attempts per invocation when a reserved warm instance turns out to be
/// unreachable — the first attempt plus up to four failovers. Each failover
/// is a fast probe (or connect error) plus a cold start; the connection
/// provably never reached the handler, so retrying can't double-execute.
/// Bounded so an all-dead pool can't spin forever.
const MAX_INVOKE_ATTEMPTS: u32 = 5;
52
/// Upper bound on the pre-invoke TCP reachability probe, also used as the
/// HTTP client's connect timeout. A black-holed Pod IP (a killed pod or a
/// drained node that drops packets without sending RST) would otherwise
/// stall for the full invoke timeout (~`func.timeout + 5s`); with this bound
/// a dead instance is detected within 1.5s, leaving the state-machine retry
/// enough of its window to succeed.
const REACHABILITY_PROBE_TIMEOUT: Duration = Duration::from_millis(1_500);
58
/// A reserved invocation slot: a warm instance plus the held busy guard
/// that grants exclusive use of it until the guard drops.
///
/// Produced by `acquire_slot` / `try_take_free`. Dropping the slot (or the
/// guard moved out of it) releases the instance for the next invocation.
struct Slot {
    /// The reserved instance.
    entry: Arc<WarmEntry>,
    /// Owned guard on `entry.busy`; owned so it can outlive the borrow of
    /// the pool and be handed to a streaming response.
    guard: tokio::sync::OwnedMutexGuard<()>,
}
65
/// Compute the warm-instance key for a function with its current layer
/// set. Stable across calls — layer ARNs are immutable in AWS, so the
/// hash of their bytes is the right cache key.
///
/// Encoded with `URL_SAFE_NO_PAD` so the result never contains `/`, `+`,
/// or `=`. The id is spliced raw into the init-container artifact URL
/// (`.../_internal/code/{account}/{function}/{deploy}.zip`) and into the
/// `fakecloud-deploy-id` Pod label; standard base64's `/` would grow an
/// extra URL path segment, break the axum route match, and wedge the Pod
/// in a cold-start loop for ~49% of deploys (issue #1643).
///
/// `layers` holds the raw ZIP bytes of each attached layer, in attach
/// order; only `func.code_sha256` is read from the function itself.
fn deploy_id_for(func: &LambdaFunction, layers: &[Vec<u8>]) -> String {
    // Thin adapter: all hashing lives in `deploy_id_from`.
    deploy_id_from(&func.code_sha256, layers)
}
79
80/// Pure core of [`deploy_id_for`], split out so the URL-path-safety
81/// invariant can be tested without constructing a full `LambdaFunction`.
82fn deploy_id_from(code_sha256: &str, layers: &[Vec<u8>]) -> String {
83    let mut hasher = Sha256::new();
84    hasher.update(code_sha256.as_bytes());
85    for bytes in layers {
86        let mut layer_hasher = Sha256::new();
87        layer_hasher.update(bytes);
88        hasher.update(b":");
89        hasher.update(layer_hasher.finalize());
90    }
91    base64::engine::general_purpose::URL_SAFE_NO_PAD.encode(hasher.finalize())
92}
93
94/// Quick liveness check: can a TCP connection to `endpoint` (`host:port`) be
95/// opened within `timeout`? Used before forwarding a payload to a warm instance
96/// so a dead/black-holed Pod is detected in ~1s instead of hanging the full
97/// invoke timeout. A failed connect provably never reached the handler, so the
98/// caller can safely evict and retry.
99async fn endpoint_reachable(endpoint: &str, timeout: Duration) -> bool {
100    matches!(
101        tokio::time::timeout(timeout, tokio::net::TcpStream::connect(endpoint)).await,
102        Ok(Ok(_))
103    )
104}
105
/// Backend-agnostic Lambda runtime: owns the per-function warm pools and
/// forwards invocations over HTTP, delegating instance launch and
/// termination to the configured [`LambdaBackend`].
pub struct LambdaRuntime {
    /// Backend that launches, pre-pulls images for, and terminates instances.
    backend: Arc<dyn LambdaBackend>,
    /// Per-function pool of warm instances, keyed by function name. Each
    /// instance serves one invocation at a time (its `busy` lock); the pool
    /// grows on demand up to `max_concurrency`, and the idle reaper trims it.
    instances: RwLock<HashMap<String, Vec<Arc<WarmEntry>>>>,
    /// Serializes runtime startup per function to prevent duplicate
    /// instances racing into the pool when several cold invokes arrive
    /// together. Keyed by function name.
    starting: RwLock<HashMap<String, Arc<tokio::sync::Mutex<()>>>>,
    /// Cap on warm instances per function.
    max_concurrency: usize,
}
119
120impl LambdaRuntime {
121    /// Construct a runtime over the supplied backend. Callers that want
122    /// auto-detection should use [`Self::auto_detect_docker`] or
123    /// [`Self::new`].
124    pub fn from_backend(backend: Arc<dyn LambdaBackend>) -> Self {
125        let max_concurrency = std::env::var("FAKECLOUD_LAMBDA_MAX_CONCURRENCY")
126            .ok()
127            .and_then(|v| v.parse::<usize>().ok())
128            .filter(|n| *n >= 1)
129            .unwrap_or(DEFAULT_MAX_CONCURRENCY);
130        Self {
131            backend,
132            instances: RwLock::new(HashMap::new()),
133            starting: RwLock::new(HashMap::new()),
134            max_concurrency,
135        }
136    }
137
138    /// Auto-detect Docker or Podman. Returns `None` if neither is available.
139    /// Override with `FAKECLOUD_CONTAINER_CLI` env var.
140    pub fn auto_detect_docker(server_port: u16) -> Option<Self> {
141        DockerBackend::auto_detect(server_port)
142            .map(|b| Self::from_backend(Arc::new(b) as Arc<dyn LambdaBackend>))
143    }
144
    /// Backwards-compatible alias for [`Self::auto_detect_docker`].
    /// Callers across the workspace use `ContainerRuntime::new(port)`.
    ///
    /// Like the method it forwards to, yields `None` when no container CLI
    /// could be detected.
    pub fn new(server_port: u16) -> Option<Self> {
        Self::auto_detect_docker(server_port)
    }
150
151    /// Construct a runtime backed by the Kubernetes backend. Reads
152    /// configuration from env vars (`FAKECLOUD_K8S_SELF_URL`,
153    /// `FAKECLOUD_K8S_NAMESPACE`, etc.) and connects to the cluster
154    /// via in-cluster service account or kubeconfig. Hard-fails on
155    /// any configuration or connectivity issue — we don't silently
156    /// fall back to Docker because the operator explicitly opted in
157    /// to K8s.
158    ///
159    /// `internal_token` is the bearer token the artifact endpoints on
160    /// the fakecloud server expect from Pod init containers — caller
161    /// must register the same token on those endpoints.
162    pub async fn new_k8s(
163        server_port: u16,
164        internal_token: String,
165    ) -> Result<Self, super::k8s::K8sBackendError> {
166        let backend = super::k8s::K8sBackend::from_env(server_port, internal_token).await?;
167        backend.reap_stale().await;
168        Ok(Self::from_backend(Arc::new(backend)))
169    }
170
    /// Name reported by the active backend (forwards to the backend's
    /// `name()`); the same value appears as `"backend"` in the
    /// warm-instance listing.
    pub fn cli_name(&self) -> &str {
        self.backend.name()
    }
174
175    /// Background pre-warm hook: pull the image a Zip-package function
176    /// will need at invoke time, or the `ImageUri` of an Image-package
177    /// function. The first cold pull of an AWS base image (~700 MB)
178    /// frequently exceeds the AWS CLI default 60s read timeout, surfacing
179    /// to users as `Connection was closed` (issue #1539). Call after
180    /// `CreateFunction` persists so the warm path is ready before the
181    /// caller turns around and calls `Invoke`.
182    ///
183    /// Returns `None` if the function has no resolvable image (e.g. an
184    /// unsupported runtime string we can't map to a base image).
185    /// Otherwise returns the result of the backend's `prepull_image` —
186    /// callers log failures and move on, since invoke time still
187    /// re-attempts the pull as a fallback.
188    pub async fn prepull_for_function(
189        &self,
190        func: &LambdaFunction,
191    ) -> Option<Result<(), super::backend::RuntimeError>> {
192        let image = if func.package_type == "Image" {
193            func.image_uri.clone()?
194        } else {
195            super::docker::runtime_to_image(&func.runtime)?
196        };
197        Some(self.backend.prepull_image(&image).await)
198    }
199
200    /// Invoke a Lambda function, starting an instance if needed. Layer
201    /// ZIPs are extracted into `/opt` of the runtime sandbox; AWS base
202    /// images already include `/opt/python`, `/opt/nodejs/node_modules`,
203    /// `/opt/lib`, and `/opt/bin` on the right import paths.
204    ///
205    /// Reserves a warm instance for the call (one in-flight invocation
206    /// per instance — the RIE crashes on overlap, issue #1644). If the
207    /// instance is unreachable (dead Pod/container from a node drain, OOM,
208    /// or prior crash) it is evicted and the call retried (up to four
209    /// times) against a freshly cold-started instance, so a dead instance can't
210    /// wedge the function permanently.
211    pub async fn invoke(
212        &self,
213        func: &LambdaFunction,
214        payload: &[u8],
215        layers: &[Vec<u8>],
216    ) -> Result<Vec<u8>, RuntimeError> {
217        let client = reqwest::Client::builder()
218            .connect_timeout(REACHABILITY_PROBE_TIMEOUT)
219            .build()
220            .unwrap_or_else(|_| reqwest::Client::new());
221        let mut attempt: u32 = 0;
222        loop {
223            attempt += 1;
224            let slot = self.acquire_slot(func, layers).await?;
225
226            // Fast reachability probe before forwarding the payload. A warm Pod
227            // that was killed (FakeCloud recreating it, OOM, node reclaim) often
228            // black-holes its old IP, so the POST would hang the full invoke
229            // timeout. A short TCP probe detects the dead instance in ~1s; the
230            // connection never reached the handler, so we can safely evict and
231            // fail over to a cold start.
232            if !endpoint_reachable(&slot.entry.instance.endpoint, REACHABILITY_PROBE_TIMEOUT).await
233            {
234                let entry = slot.entry.clone();
235                drop(slot);
236                self.evict_entry(&func.function_name, &entry).await;
237                if attempt < MAX_INVOKE_ATTEMPTS {
238                    tracing::warn!(
239                        function = %func.function_name,
240                        endpoint = %entry.instance.endpoint,
241                        "warm Lambda instance failed reachability probe; evicted, retrying with a cold start"
242                    );
243                    continue;
244                }
245                return Err(RuntimeError::InvocationFailed(format!(
246                    "no reachable warm instance for {} after {attempt} attempts",
247                    func.function_name
248                )));
249            }
250
251            let url = format!(
252                "http://{}/2015-03-31/functions/function/invocations",
253                slot.entry.instance.endpoint
254            );
255            let send = client
256                .post(&url)
257                .body(payload.to_vec())
258                .timeout(Duration::from_secs(func.timeout as u64 + 5))
259                .send()
260                .await;
261            match send {
262                Ok(resp) => {
263                    let body = resp.bytes().await;
264                    *slot.entry.last_used.write() = Instant::now();
265                    return match body {
266                        Ok(b) => Ok(b.to_vec()),
267                        Err(e) => {
268                            // Response failed mid-stream — the instance is
269                            // suspect. Evict it but don't retry: the
270                            // function already ran and may have side effects.
271                            let entry = slot.entry.clone();
272                            drop(slot);
273                            self.evict_entry(&func.function_name, &entry).await;
274                            Err(RuntimeError::InvocationFailed(e.to_string()))
275                        }
276                    };
277                }
278                Err(e) => {
279                    // Transport-level failure. Evict the suspect instance.
280                    // Only retry when the connection was never
281                    // established (`is_connect` — e.g. refused by a dead
282                    // Pod): then the request provably never reached the
283                    // function, so a cold-start retry can't double-execute
284                    // it. A reset/timeout mid-flight may have already run
285                    // the handler, so surface those instead of risking a
286                    // duplicate invoke.
287                    let entry = slot.entry.clone();
288                    drop(slot);
289                    self.evict_entry(&func.function_name, &entry).await;
290                    if attempt < MAX_INVOKE_ATTEMPTS && e.is_connect() {
291                        tracing::warn!(
292                            function = %func.function_name,
293                            error = %e,
294                            "warm Lambda instance unreachable; evicted, retrying with a cold start"
295                        );
296                        continue;
297                    }
298                    return Err(RuntimeError::InvocationFailed(e.to_string()));
299                }
300            }
301        }
302    }
303
304    /// Invoke a Lambda function and yield the raw HTTP body as a stream
305    /// of byte chunks. Each chunk corresponds to one HTTP frame the RIE
306    /// flushed to the wire — for streaming-aware handlers this
307    /// preserves the chunk boundaries the function emitted. Buffered
308    /// handlers come back as a single chunk, which is still a valid
309    /// streamed response.
310    ///
311    /// The reserved instance's busy guard travels with the returned
312    /// [`StreamingInvocation`] so the slot stays held until the caller
313    /// finishes draining the stream.
314    pub async fn invoke_streaming(
315        &self,
316        func: &LambdaFunction,
317        payload: &[u8],
318        layers: &[Vec<u8>],
319    ) -> Result<StreamingInvocation, RuntimeError> {
320        let client = reqwest::Client::builder()
321            .connect_timeout(REACHABILITY_PROBE_TIMEOUT)
322            .build()
323            .unwrap_or_else(|_| reqwest::Client::new());
324        let mut attempt: u32 = 0;
325        loop {
326            attempt += 1;
327            let slot = self.acquire_slot(func, layers).await?;
328
329            // Same fast reachability probe as `invoke`: detect a dead/black-holed
330            // warm instance in ~1s and fail over instead of hanging.
331            if !endpoint_reachable(&slot.entry.instance.endpoint, REACHABILITY_PROBE_TIMEOUT).await
332            {
333                let entry = slot.entry.clone();
334                drop(slot);
335                self.evict_entry(&func.function_name, &entry).await;
336                if attempt < MAX_INVOKE_ATTEMPTS {
337                    continue;
338                }
339                return Err(RuntimeError::InvocationFailed(format!(
340                    "no reachable warm instance for {} after {attempt} attempts",
341                    func.function_name
342                )));
343            }
344
345            let url = format!(
346                "http://{}/2015-03-31/functions/function/invocations",
347                slot.entry.instance.endpoint
348            );
349            let send = client
350                .post(&url)
351                .body(payload.to_vec())
352                .timeout(Duration::from_secs(func.timeout as u64 + 5))
353                .send()
354                .await;
355            match send {
356                Ok(resp) => {
357                    *slot.entry.last_used.write() = Instant::now();
358                    let Slot {
359                        entry: _entry,
360                        guard,
361                    } = slot;
362                    return Ok(StreamingInvocation {
363                        resp,
364                        _slot_guard: Some(guard),
365                    });
366                }
367                Err(e) => {
368                    // Same connect-only retry policy as `invoke`: retry
369                    // only when the connection never established, so a
370                    // half-run handler isn't invoked twice.
371                    let entry = slot.entry.clone();
372                    drop(slot);
373                    self.evict_entry(&func.function_name, &entry).await;
374                    if attempt < MAX_INVOKE_ATTEMPTS && e.is_connect() {
375                        continue;
376                    }
377                    return Err(RuntimeError::InvocationFailed(e.to_string()));
378                }
379            }
380        }
381    }
382
383    /// Reserve a warm instance to run exactly one invocation, returning a
384    /// held busy guard that grants exclusive use until it drops. Shared
385    /// by `invoke` and `invoke_streaming`.
386    ///
387    /// Order of preference: (1) a free, current-deploy instance already
388    /// in the pool; (2) a freshly launched instance, if the pool is below
389    /// `max_concurrency`; (3) queue on a busy current-deploy instance.
390    /// Instances whose `deploy_id` no longer matches the function's
391    /// current code+layers are torn down before sizing the pool.
392    async fn acquire_slot(
393        &self,
394        func: &LambdaFunction,
395        layers: &[Vec<u8>],
396    ) -> Result<Slot, RuntimeError> {
397        let is_image = func.package_type == "Image";
398        if !is_image && func.code_zip.is_none() {
399            return Err(RuntimeError::NoCodeZip(func.function_name.clone()));
400        }
401
402        let deploy_id = deploy_id_for(func, layers);
403
404        // (1) Fast path: a free instance already running the right deploy.
405        if let Some(slot) = self.try_take_free(&func.function_name, &deploy_id) {
406            return Ok(slot);
407        }
408
409        // Serialize launch decisions per function so a burst of cold
410        // invokes doesn't each push the pool past the cap.
411        let startup_lock = {
412            let mut starting = self.starting.write();
413            starting
414                .entry(func.function_name.clone())
415                .or_insert_with(|| Arc::new(tokio::sync::Mutex::new(())))
416                .clone()
417        };
418        let startup_guard = startup_lock.lock().await;
419
420        // Re-check under the startup lock — another task may have freed or
421        // launched an instance while we waited.
422        if let Some(slot) = self.try_take_free(&func.function_name, &deploy_id) {
423            return Ok(slot);
424        }
425
426        // Tear down any instances left over from a previous deploy.
427        self.evict_stale_deploy(&func.function_name, &deploy_id)
428            .await;
429
430        let pool_len = self
431            .instances
432            .read()
433            .get(&func.function_name)
434            .map_or(0, |v| v.len());
435
436        // (2) Room to grow: launch a fresh instance and reserve it.
437        if pool_len < self.max_concurrency {
438            let instance = self
439                .backend
440                .launch(func, func.code_zip.as_deref(), layers, &deploy_id)
441                .await?;
442            let entry = Arc::new(WarmEntry {
443                instance,
444                last_used: RwLock::new(Instant::now()),
445                deploy_id,
446                busy: Arc::new(tokio::sync::Mutex::new(())),
447            });
448            let guard = entry
449                .busy
450                .clone()
451                .try_lock_owned()
452                .expect("freshly created busy lock is uncontended");
453            self.instances
454                .write()
455                .entry(func.function_name.clone())
456                .or_default()
457                .push(entry.clone());
458            return Ok(Slot { entry, guard });
459        }
460
461        // (3) At capacity: release the startup lock and wait for whichever
462        // current instance frees up *first*. Racing every instance's lock
463        // (rather than blocking on a fixed one) avoids convoying every
464        // queued caller onto pool[0] while a different instance goes idle.
465        drop(startup_guard);
466        let candidates: Vec<Arc<WarmEntry>> = {
467            let map = self.instances.read();
468            map.get(&func.function_name)
469                .map(|pool| {
470                    pool.iter()
471                        .filter(|e| e.deploy_id == deploy_id)
472                        .cloned()
473                        .collect()
474                })
475                .unwrap_or_default()
476        };
477        if candidates.is_empty() {
478            return Err(RuntimeError::InvocationFailed(format!(
479                "no warm instance available for {}",
480                func.function_name
481            )));
482        }
483        let waiters = candidates.into_iter().map(|entry| {
484            Box::pin(async move {
485                let guard = entry.busy.clone().lock_owned().await;
486                Slot { entry, guard }
487            })
488        });
489        let (slot, _idx, _rest) = futures_util::future::select_all(waiters).await;
490        *slot.entry.last_used.write() = Instant::now();
491        Ok(slot)
492    }
493
494    /// Try to reserve a free, current-deploy instance without launching.
495    /// Returns `None` if every matching instance is busy (or there are
496    /// none).
497    fn try_take_free(&self, function_name: &str, deploy_id: &str) -> Option<Slot> {
498        let map = self.instances.read();
499        let pool = map.get(function_name)?;
500        for entry in pool {
501            if entry.deploy_id != deploy_id {
502                continue;
503            }
504            if let Ok(guard) = entry.busy.clone().try_lock_owned() {
505                *entry.last_used.write() = Instant::now();
506                return Some(Slot {
507                    entry: entry.clone(),
508                    guard,
509                });
510            }
511        }
512        None
513    }
514
515    /// Remove one specific instance from a function's pool and terminate
516    /// it. Used when an invocation finds the instance unreachable.
517    async fn evict_entry(&self, function_name: &str, target: &Arc<WarmEntry>) {
518        let removed = {
519            let mut map = self.instances.write();
520            match map.get_mut(function_name) {
521                Some(pool) => {
522                    let removed = pool
523                        .iter()
524                        .position(|e| Arc::ptr_eq(e, target))
525                        .map(|pos| pool.remove(pos));
526                    if pool.is_empty() {
527                        map.remove(function_name);
528                    }
529                    removed
530                }
531                None => None,
532            }
533        };
534        if let Some(entry) = removed {
535            tracing::info!(
536                function = %function_name,
537                handle = ?entry.instance.handle,
538                "evicting unreachable Lambda runtime instance"
539            );
540            self.backend.terminate(&entry.instance.handle).await;
541        }
542    }
543
544    /// Tear down every instance in a function's pool whose `deploy_id`
545    /// no longer matches the current code+layers fingerprint.
546    async fn evict_stale_deploy(&self, function_name: &str, deploy_id: &str) {
547        let stale: Vec<Arc<WarmEntry>> = {
548            let mut map = self.instances.write();
549            match map.get_mut(function_name) {
550                Some(pool) => {
551                    let mut stale = Vec::new();
552                    pool.retain(|e| {
553                        if e.deploy_id == deploy_id {
554                            true
555                        } else {
556                            stale.push(e.clone());
557                            false
558                        }
559                    });
560                    if pool.is_empty() {
561                        map.remove(function_name);
562                    }
563                    stale
564                }
565                None => Vec::new(),
566            }
567        };
568        for entry in stale {
569            tracing::info!(
570                function = %function_name,
571                handle = ?entry.instance.handle,
572                "stopping stale-deploy Lambda runtime instance"
573            );
574            self.backend.terminate(&entry.instance.handle).await;
575        }
576    }
577
578    /// Remove and return the warm pool for a function **without**
579    /// terminating it. Lets DeleteFunction snapshot exactly the instances
580    /// that exist at delete time and terminate those, so a concurrent
581    /// recreate of the same name (whose fresh warm instance is keyed
582    /// identically) is not reaped by the deferred stop. Synchronous so the
583    /// caller can take the snapshot while still ordered before any recreate
584    /// (bug-hunt 2026-06-13, finding 4.2).
585    pub(crate) fn take_warm_instances(&self, function_name: &str) -> Vec<Arc<WarmEntry>> {
586        self.instances
587            .write()
588            .remove(function_name)
589            .unwrap_or_default()
590    }
591
592    /// Terminate a previously-snapshotted set of warm instances. Pairs with
593    /// [`take_warm_instances`] for the delete path.
594    pub(crate) async fn terminate_instances(&self, pool: Vec<Arc<WarmEntry>>) {
595        for entry in pool {
596            tracing::info!(
597                handle = ?entry.instance.handle,
598                "stopping Lambda runtime instance"
599            );
600            self.backend.terminate(&entry.instance.handle).await;
601        }
602    }
603
604    /// Stop and remove every warm instance for a specific function.
605    pub async fn stop_container(&self, function_name: &str) {
606        let pool = self.take_warm_instances(function_name);
607        self.terminate_instances(pool).await;
608    }
609
610    /// Stop and remove all warm instances (used on server shutdown or reset).
611    pub async fn stop_all(&self) {
612        let pools: Vec<(String, Vec<Arc<WarmEntry>>)> =
613            { self.instances.write().drain().collect() };
614        for (name, pool) in pools {
615            for entry in pool {
616                tracing::info!(
617                    function = %name,
618                    handle = ?entry.instance.handle,
619                    "stopping Lambda runtime instance (cleanup)"
620                );
621                self.backend.terminate(&entry.instance.handle).await;
622            }
623        }
624    }
625
626    /// List all warm instances and their metadata for introspection.
627    /// One row per running instance — a function scaled to several warm
628    /// instances appears once per instance.
629    pub fn list_warm_containers(
630        &self,
631        lambda_state: &crate::state::SharedLambdaState,
632    ) -> Vec<serde_json::Value> {
633        let entries = self.instances.read();
634        let accounts = lambda_state.read();
635        let mut rows = Vec::new();
636        for (name, pool) in entries.iter() {
637            let runtime = accounts
638                .iter()
639                .find_map(|(_, state)| state.functions.get(name).map(|f| f.runtime.clone()))
640                .unwrap_or_default();
641            for entry in pool {
642                let idle_secs = entry.last_used.read().elapsed().as_secs();
643                let mut row = serde_json::json!({
644                    "functionName": name,
645                    "runtime": runtime,
646                    "backend": self.backend.name(),
647                    "lastUsedSecsAgo": idle_secs,
648                });
649                let obj = row.as_object_mut().expect("json object");
650                match &entry.instance.handle {
651                    BackendHandle::Container { id } => {
652                        obj.insert("containerId".into(), serde_json::Value::String(id.clone()));
653                    }
654                    BackendHandle::Pod { namespace, name } => {
655                        obj.insert("podName".into(), serde_json::Value::String(name.clone()));
656                        obj.insert(
657                            "namespace".into(),
658                            serde_json::Value::String(namespace.clone()),
659                        );
660                    }
661                }
662                rows.push(row);
663            }
664        }
665        rows
666    }
667
668    /// Evict (stop and remove) every warm instance for a specific
669    /// function. Returns true if at least one instance was evicted.
670    pub async fn evict_container(&self, function_name: &str) -> bool {
671        let pool = self
672            .instances
673            .write()
674            .remove(function_name)
675            .unwrap_or_default();
676        let found = !pool.is_empty();
677        for entry in pool {
678            tracing::info!(
679                function = %function_name,
680                handle = ?entry.instance.handle,
681                "evicting Lambda runtime instance via simulation API"
682            );
683            self.backend.terminate(&entry.instance.handle).await;
684        }
685        found
686    }
687
688    /// Background loop that stops instances idle longer than `ttl`.
689    pub async fn run_cleanup_loop(self: Arc<Self>, ttl: Duration) {
690        let mut interval = tokio::time::interval(Duration::from_secs(30));
691        loop {
692            interval.tick().await;
693            self.cleanup_idle(ttl).await;
694        }
695    }
696
697    async fn cleanup_idle(&self, ttl: Duration) {
698        // Reap individual instances that are both idle past the TTL and
699        // currently free (a busy instance is mid-invocation, so its
700        // `last_used` is fresh anyway — the `try_lock` check just avoids
701        // racing a slot that's about to be used).
702        let expired: Vec<(String, Arc<WarmEntry>)> = {
703            let mut map = self.instances.write();
704            let mut out = Vec::new();
705            for (name, pool) in map.iter_mut() {
706                let mut i = 0;
707                while i < pool.len() {
708                    let idle = pool[i].last_used.read().elapsed() > ttl;
709                    let free = pool[i].busy.try_lock().is_ok();
710                    if idle && free {
711                        out.push((name.clone(), pool.remove(i)));
712                    } else {
713                        i += 1;
714                    }
715                }
716            }
717            map.retain(|_, pool| !pool.is_empty());
718            out
719        };
720        for (name, entry) in expired {
721            tracing::info!(function = %name, "stopping idle Lambda runtime instance");
722            self.backend.terminate(&entry.instance.handle).await;
723        }
724    }
725}
726
727#[cfg(test)]
728mod tests {
729    use super::deploy_id_from;
730
/// Regression guard for issue #1643. The deploy id is spliced raw into
/// the init-container artifact URL path and into a Pod label, so none of
/// the characters standard base64 can emit (`/`, `+`, `=`) may ever
/// appear in it. Standard base64 produced `/` for ~49% of code hashes;
/// sweeping a wide range of inputs catches any slide back to a
/// non-URL-safe alphabet.
#[test]
fn deploy_id_is_url_path_safe() {
    for i in 0..2_000u32 {
        // Both the code hash and the layer set vary with the seed.
        let code_sha256 = format!("sha256-seed-{i}-{}", i.wrapping_mul(2_654_435_761));
        let layers: Vec<Vec<u8>> = match i % 3 {
            0 => vec![format!("layer-{i}").into_bytes()],
            _ => Vec::new(),
        };
        let id = deploy_id_from(&code_sha256, &layers);
        let has_unsafe_char = id.chars().any(|c| matches!(c, '/' | '+' | '='));
        assert!(
            !has_unsafe_char,
            "deploy id {id:?} (seed {i}) is not URL-path-safe"
        );
    }
}
754
/// The deploy id doubles as a warm-pool cache key, so identical inputs
/// must always yield the identical id (otherwise reuse is defeated),
/// while a different code hash or a different layer set must yield a
/// different one.
#[test]
fn deploy_id_is_stable() {
    let layers = vec![b"layer-a".to_vec(), b"layer-b".to_vec()];

    let first = deploy_id_from("abc123", &layers);
    let second = deploy_id_from("abc123", &layers);
    assert_eq!(first, second);

    let other_code = deploy_id_from("abc124", &layers);
    assert_ne!(first, other_code);

    let without_layers = deploy_id_from("abc123", &[]);
    assert_ne!(first, without_layers);
}
766
767    // ---- warm-pool concurrency + eviction (issue #1644) ----
768
769    use super::LambdaRuntime;
770    use crate::runtime::backend::{BackendHandle, LambdaBackend, RuntimeError, WarmInstance};
771    use crate::state::LambdaFunction;
772    use parking_lot::RwLock;
773    use std::collections::{HashMap, VecDeque};
774    use std::sync::atomic::{AtomicUsize, Ordering::SeqCst};
775    use std::sync::Arc;
776    use std::sync::Mutex as StdMutex;
777    use std::time::Duration;
778
/// Test double for a Lambda backend: it counts launches and terminates,
/// and serves endpoints from a queue before falling back to
/// `default_endpoint`. That lets a test inject a dead endpoint followed
/// by a live one.
struct CountingBackend {
    /// Endpoints to hand out first, front to back.
    endpoints: StdMutex<VecDeque<String>>,
    /// Endpoint used once the queue is empty.
    default_endpoint: String,
    /// Launch counter, starts at zero.
    launches: AtomicUsize,
    /// Terminate counter, starts at zero.
    terminates: AtomicUsize,
}

impl CountingBackend {
    /// Backend with an empty endpoint queue, so only `default_endpoint`
    /// is ever available.
    fn new(default_endpoint: impl Into<String>) -> Arc<Self> {
        Self::with_queue(default_endpoint, Vec::new())
    }

    /// Backend pre-loaded with `queue` (kept in the given order) plus the
    /// fallback `default_endpoint`; both counters start at zero.
    fn with_queue(default_endpoint: impl Into<String>, queue: Vec<String>) -> Arc<Self> {
        let backend = Self {
            endpoints: StdMutex::new(VecDeque::from(queue)),
            default_endpoint: default_endpoint.into(),
            launches: AtomicUsize::new(0),
            terminates: AtomicUsize::new(0),
        };
        Arc::new(backend)
    }
}
807
808    #[async_trait::async_trait]
809    impl LambdaBackend for CountingBackend {
810        fn name(&self) -> &str {
811            "test"
812        }
813        async fn launch(
814            &self,
815            _func: &LambdaFunction,
816            _code_zip: Option<&[u8]>,
817            _layers: &[Vec<u8>],
818            _deploy_id: &str,
819        ) -> Result<WarmInstance, RuntimeError> {
820            let n = self.launches.fetch_add(1, SeqCst);
821            let endpoint = self
822                .endpoints
823                .lock()
824                .unwrap()
825                .pop_front()
826                .unwrap_or_else(|| self.default_endpoint.clone());
827            Ok(WarmInstance {
828                endpoint,
829                handle: BackendHandle::Container {
830                    id: format!("c{n}"),
831                },
832            })
833        }
834        async fn terminate(&self, _handle: &BackendHandle) {
835            self.terminates.fetch_add(1, SeqCst);
836        }
837    }
838
839    /// Spin a minimal in-process "RIE": one TCP accept loop that records
840    /// the peak number of simultaneously-open connections, holds each
841    /// request for `delay`, then returns a tiny 200. The peak directly
842    /// observes how many invocations overlapped on the instance(s) it
843    /// backs. Returns the `host:port` to use as a warm endpoint.
844    async fn spawn_rie(delay: Duration, peak: Arc<AtomicUsize>) -> String {
845        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
846        let addr = listener.local_addr().unwrap();
847        let cur = Arc::new(AtomicUsize::new(0));
848        tokio::spawn(async move {
849            loop {
850                let Ok((mut sock, _)) = listener.accept().await else {
851                    break;
852                };
853                let cur = cur.clone();
854                let peak = peak.clone();
855                tokio::spawn(async move {
856                    use tokio::io::{AsyncReadExt, AsyncWriteExt};
857                    // Only connections that actually send a request count as an
858                    // invocation. A bare reachability probe (TCP connect, no
859                    // bytes) reads EOF and is ignored — it neither overlaps a
860                    // real invoke nor inflates the concurrency peak, mirroring
861                    // the real RIE, which only starts an event on a request.
862                    let mut buf = [0u8; 1024];
863                    let n = sock.read(&mut buf).await.unwrap_or(0);
864                    if n == 0 {
865                        return;
866                    }
867                    let now = cur.fetch_add(1, SeqCst) + 1;
868                    peak.fetch_max(now, SeqCst);
869                    tokio::time::sleep(delay).await;
870                    let _ = sock
871                        .write_all(
872                            b"HTTP/1.1 200 OK\r\nContent-Length: 2\r\nConnection: close\r\n\r\nok",
873                        )
874                        .await;
875                    let _ = sock.flush().await;
876                    cur.fetch_sub(1, SeqCst);
877                });
878            }
879        });
880        format!("{addr}")
881    }
882
883    fn runtime_with(backend: Arc<CountingBackend>, max_concurrency: usize) -> Arc<LambdaRuntime> {
884        Arc::new(LambdaRuntime {
885            backend,
886            instances: RwLock::new(HashMap::new()),
887            starting: RwLock::new(HashMap::new()),
888            max_concurrency,
889        })
890    }
891
892    fn test_func(name: &str, sha: &str) -> LambdaFunction {
893        serde_json::from_value(serde_json::json!({
894            "function_name": name,
895            "function_arn": format!("arn:aws:lambda:us-east-1:123456789012:function:{name}"),
896            "runtime": "python3.12",
897            "role": "arn:aws:iam::123456789012:role/r",
898            "handler": "index.handler",
899            "description": "",
900            "timeout": 5,
901            "memory_size": 128,
902            "code_sha256": sha,
903            "code_size": 1,
904            "version": "$LATEST",
905            "last_modified": "2020-01-01T00:00:00Z",
906            "tags": {},
907            "environment": {},
908            "architectures": ["x86_64"],
909            "package_type": "Zip",
910            "code_zip": [1, 2, 3],
911            "policy": null
912        }))
913        .expect("build test LambdaFunction")
914    }
915
916    /// The core of #1644: with one warm instance, a burst of concurrent
917    /// invocations must be serialized onto it — never delivered in
918    /// parallel (which segfaults the real RIE). Peak overlap must be 1.
919    #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
920    async fn concurrent_invokes_are_serialized_on_a_single_instance() {
921        let peak = Arc::new(AtomicUsize::new(0));
922        let endpoint = spawn_rie(Duration::from_millis(40), peak.clone()).await;
923        let backend = CountingBackend::new(endpoint);
924        let rt = runtime_with(backend.clone(), 1);
925        let func = test_func("conc", "sha-A");
926
927        let mut handles = Vec::new();
928        for _ in 0..8 {
929            let rt = rt.clone();
930            let func = func.clone();
931            handles.push(tokio::spawn(
932                async move { rt.invoke(&func, b"{}", &[]).await },
933            ));
934        }
935        for h in handles {
936            h.await.unwrap().expect("invoke ok");
937        }
938
939        assert_eq!(
940            peak.load(SeqCst),
941            1,
942            "concurrent invokes overlapped on a single RIE instance"
943        );
944        assert_eq!(
945            backend.launches.load(SeqCst),
946            1,
947            "max_concurrency=1 must launch exactly one instance"
948        );
949    }
950
951    /// Under load the pool grows beyond one instance but never past the
952    /// cap, so genuine concurrency is served (AWS semantics) without an
953    /// unbounded container/Pod fan-out.
954    #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
955    async fn pool_scales_under_load_and_respects_cap() {
956        let peak = Arc::new(AtomicUsize::new(0));
957        let endpoint = spawn_rie(Duration::from_millis(60), peak.clone()).await;
958        let backend = CountingBackend::new(endpoint);
959        let rt = runtime_with(backend.clone(), 4);
960        let func = test_func("scale", "sha-A");
961
962        let mut handles = Vec::new();
963        for _ in 0..8 {
964            let rt = rt.clone();
965            let func = func.clone();
966            handles.push(tokio::spawn(
967                async move { rt.invoke(&func, b"{}", &[]).await },
968            ));
969        }
970        for h in handles {
971            h.await.unwrap().expect("invoke ok");
972        }
973
974        let launched = backend.launches.load(SeqCst);
975        assert!(
976            (2..=4).contains(&launched),
977            "expected the pool to scale within the cap, launched={launched}"
978        );
979        assert!(
980            peak.load(SeqCst) > 1,
981            "expected concurrent forwards across the scaled pool"
982        );
983    }
984
985    /// An unreachable instance (dead Pod/container) must be evicted and
986    /// the invocation retried against a fresh cold start, instead of
987    /// wedging the function forever.
988    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
989    async fn dead_instance_is_evicted_and_retried() {
990        let peak = Arc::new(AtomicUsize::new(0));
991        let live = spawn_rie(Duration::from_millis(5), peak.clone()).await;
992        // Two launches in a row hand back refused ports; the third gets the
993        // live one. Connect-refused provably never reached a handler, so the
994        // extra retry budget can't double-execute.
995        let backend = CountingBackend::with_queue(
996            live,
997            vec!["127.0.0.1:1".to_string(), "127.0.0.1:1".to_string()],
998        );
999        let rt = runtime_with(backend.clone(), 1);
1000
1001        let out = rt
1002            .invoke(&test_func("dead", "sha-A"), b"{}", &[])
1003            .await
1004            .expect("should recover via cold-start retry");
1005        assert_eq!(out, b"ok");
1006        assert_eq!(
1007            backend.launches.load(SeqCst),
1008            3,
1009            "expected two dead instances plus one cold-start replacement"
1010        );
1011        assert!(
1012            backend.terminates.load(SeqCst) >= 2,
1013            "both dead instances should have been terminated on eviction"
1014        );
1015    }
1016
1017    /// A black-holed warm instance (a killed Pod whose IP now drops packets,
1018    /// rather than refusing with an RST) must be detected by the fast
1019    /// reachability probe and failed over in seconds — not hang the full invoke
1020    /// timeout (~`func.timeout + 5s`). This is the failure mode that flaked the
1021    /// LOE workflows: a dead pod cost ~305s, blowing the test window.
1022    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1023    async fn black_holed_instance_fails_over_fast() {
1024        let peak = Arc::new(AtomicUsize::new(0));
1025        let live = spawn_rie(Duration::from_millis(5), peak.clone()).await;
1026        // 192.0.2.1 is RFC 5737 TEST-NET-1: unrouted, so a connect black-holes
1027        // (caught by the probe's ~1.5s timeout) instead of getting a fast RST.
1028        let backend = CountingBackend::with_queue(live, vec!["192.0.2.1:9".to_string()]);
1029        let rt = runtime_with(backend.clone(), 1);
1030
1031        // test_func timeout is 5 -> the pre-fix hang would be ~10s.
1032        let func = test_func("blackhole", "sha-A");
1033        let started = std::time::Instant::now();
1034        let out = rt
1035            .invoke(&func, b"{}", &[])
1036            .await
1037            .expect("should recover via cold-start retry");
1038        let elapsed = started.elapsed();
1039
1040        assert_eq!(out, b"ok");
1041        assert_eq!(
1042            backend.launches.load(SeqCst),
1043            2,
1044            "expected one black-holed instance plus one cold-start replacement"
1045        );
1046        assert!(
1047            backend.terminates.load(SeqCst) >= 1,
1048            "the black-holed instance should have been evicted"
1049        );
1050        assert!(
1051            elapsed < Duration::from_secs(5),
1052            "failover took {elapsed:?}; must be far below the ~10s invoke timeout"
1053        );
1054    }
1055
1056    /// Changing the function's code (a new deploy id) tears down the
1057    /// stale warm instance before serving against a fresh one.
1058    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1059    async fn deploy_change_evicts_stale_instance() {
1060        let peak = Arc::new(AtomicUsize::new(0));
1061        let endpoint = spawn_rie(Duration::from_millis(5), peak.clone()).await;
1062        let backend = CountingBackend::new(endpoint);
1063        let rt = runtime_with(backend.clone(), 2);
1064
1065        rt.invoke(&test_func("upd", "sha-A"), b"{}", &[])
1066            .await
1067            .unwrap();
1068        rt.invoke(&test_func("upd", "sha-B"), b"{}", &[])
1069            .await
1070            .unwrap();
1071
1072        assert_eq!(
1073            backend.launches.load(SeqCst),
1074            2,
1075            "a new deploy id should launch a fresh instance"
1076        );
1077        assert!(
1078            backend.terminates.load(SeqCst) >= 1,
1079            "the stale-deploy instance should have been torn down"
1080        );
1081        // Exactly one current instance remains in the pool.
1082        let pool_len = rt.instances.read().get("upd").map_or(0, |v| v.len());
1083        assert_eq!(pool_len, 1);
1084    }
1085
1086    /// bug-hunt 2026-06-13, finding 4.2: DeleteFunction must snapshot the
1087    /// warm pool and terminate *that snapshot*, not whatever pool exists
1088    /// when the deferred stop runs. Otherwise a CreateFunction + warm-up of
1089    /// the same name racing ahead of the stop has its fresh container
1090    /// reaped. This exercises the snapshot primitives directly.
1091    #[tokio::test]
1092    async fn take_warm_instances_snapshot_does_not_reap_recreated_pool() {
1093        let backend = CountingBackend::new("127.0.0.1:1");
1094        let rt = runtime_with(backend.clone(), 10);
1095        let mk = |id: &str| {
1096            Arc::new(super::WarmEntry {
1097                instance: WarmInstance {
1098                    endpoint: "127.0.0.1:1".to_string(),
1099                    handle: BackendHandle::Container { id: id.to_string() },
1100                },
1101                last_used: RwLock::new(std::time::Instant::now()),
1102                deploy_id: "d".to_string(),
1103                busy: Arc::new(tokio::sync::Mutex::new(())),
1104            })
1105        };
1106
1107        // A function "f" with one warm instance.
1108        rt.instances
1109            .write()
1110            .insert("f".to_string(), vec![mk("old")]);
1111
1112        // Delete snapshots the pool synchronously and removes it from the map.
1113        let snapshot = rt.take_warm_instances("f");
1114        assert_eq!(snapshot.len(), 1);
1115        assert!(rt.instances.read().get("f").is_none());
1116
1117        // A recreate + warm-up of the same name wins the race ahead of the
1118        // deferred terminate.
1119        rt.instances
1120            .write()
1121            .insert("f".to_string(), vec![mk("new")]);
1122
1123        // Terminating the snapshot must touch only the old instance.
1124        rt.terminate_instances(snapshot).await;
1125        assert_eq!(
1126            backend.terminates.load(SeqCst),
1127            1,
1128            "only the snapshotted instance is terminated"
1129        );
1130
1131        // The recreated function keeps its fresh warm instance.
1132        let pool = rt.instances.read();
1133        let f = pool.get("f").expect("recreated function pool must survive");
1134        assert_eq!(f.len(), 1);
1135    }
1136}