Skip to main content

fakecloud_lambda/runtime/
facade.rs

1//! Backend-agnostic Lambda runtime facade.
2//!
3//! Owns the warm-pool bookkeeping, per-function startup serialization,
4//! and the HTTP invocation path. Dispatches container lifecycle to
5//! whatever [`LambdaBackend`] it was constructed with.
6
7use std::collections::HashMap;
8use std::sync::Arc;
9use std::time::{Duration, Instant};
10
11use base64::Engine;
12use parking_lot::RwLock;
13use sha2::{Digest, Sha256};
14
15use super::backend::{
16    BackendHandle, LambdaBackend, RuntimeError, StreamingInvocation, WarmInstance,
17};
18use super::docker::DockerBackend;
19use crate::state::LambdaFunction;
20
21/// A running runtime instance kept warm for reuse.
22pub(crate) struct WarmEntry {
23    instance: WarmInstance,
24    last_used: RwLock<Instant>,
25    /// Combined fingerprint of the function's code SHA-256 plus the
26    /// SHA-256 of every attached layer's ZIP bytes, joined in attach
27    /// order. Layers mutate `/opt`, so a layer change invalidates the
28    /// warm instance even when the function code is unchanged.
29    deploy_id: String,
30    /// Held for the duration of a single invocation against this
31    /// instance. The AWS Runtime Interface Emulator (and real Lambda)
32    /// handles exactly one event per execution environment at a time;
33    /// the RIE's `rapidcore` server nil-pointer-derefs and the process
34    /// exits if two invokes overlap (issue #1644). Acquiring this lock
35    /// before forwarding guarantees one in-flight event per instance,
36    /// and lets the pool pick a *free* instance via `try_lock`.
37    busy: Arc<tokio::sync::Mutex<()>>,
38}
39
/// Upper bound on warm instances kept per function when no override is
/// configured. Real Lambda grows execution environments with concurrent
/// demand; the emulator caps that growth so a burst cannot spawn an
/// unbounded number of containers/Pods. `FAKECLOUD_LAMBDA_MAX_CONCURRENCY`
/// overrides it. Once the cap is reached, invocations queue on a busy
/// instance instead of launching another.
const DEFAULT_MAX_CONCURRENCY: usize = 10;

/// How many times a single invocation may try when the warm instance it
/// reserved proves unreachable. Every failover costs a fast probe (or a
/// connect error) plus a cold start; because the connection provably
/// never reached the handler, retrying cannot double-execute it. The
/// bound keeps a pool of entirely dead instances from spinning forever.
const MAX_INVOKE_ATTEMPTS: u32 = 5;

/// Deadline for the TCP reachability probe made before each invoke. A
/// black-holed Pod IP (a killed pod or drained node that drops packets
/// without sending RST) would otherwise stall for the full invoke timeout
/// (~`func.timeout + 5s`); the probe notices it in ~1s so the
/// state-machine retry can still succeed within its window.
const REACHABILITY_PROBE_TIMEOUT: Duration = Duration::from_millis(1_500);
58
59/// A reserved invocation slot: a warm instance plus the held busy guard
60/// that grants exclusive use of it until the guard drops.
61struct Slot {
62    entry: Arc<WarmEntry>,
63    guard: tokio::sync::OwnedMutexGuard<()>,
64}
65
66/// Compute the warm-instance key for a function with its current layer
67/// set. Stable across calls — layer ARNs are immutable in AWS, so the
68/// hash of their bytes is the right cache key.
69///
70/// Encoded with `URL_SAFE_NO_PAD` so the result never contains `/`, `+`,
71/// or `=`. The id is spliced raw into the init-container artifact URL
72/// (`.../_internal/code/{account}/{function}/{deploy}.zip`) and into the
73/// `fakecloud-deploy-id` Pod label; standard base64's `/` would grow an
74/// extra URL path segment, break the axum route match, and wedge the Pod
75/// in a cold-start loop for ~49% of deploys (issue #1643).
76fn deploy_id_for(func: &LambdaFunction, layers: &[Vec<u8>]) -> String {
77    deploy_id_from(&func.code_sha256, layers)
78}
79
80/// Pure core of [`deploy_id_for`], split out so the URL-path-safety
81/// invariant can be tested without constructing a full `LambdaFunction`.
82fn deploy_id_from(code_sha256: &str, layers: &[Vec<u8>]) -> String {
83    let mut hasher = Sha256::new();
84    hasher.update(code_sha256.as_bytes());
85    for bytes in layers {
86        let mut layer_hasher = Sha256::new();
87        layer_hasher.update(bytes);
88        hasher.update(b":");
89        hasher.update(layer_hasher.finalize());
90    }
91    base64::engine::general_purpose::URL_SAFE_NO_PAD.encode(hasher.finalize())
92}
93
94/// Quick liveness check: can a TCP connection to `endpoint` (`host:port`) be
95/// opened within `timeout`? Used before forwarding a payload to a warm instance
96/// so a dead/black-holed Pod is detected in ~1s instead of hanging the full
97/// invoke timeout. A failed connect provably never reached the handler, so the
98/// caller can safely evict and retry.
99async fn endpoint_reachable(endpoint: &str, timeout: Duration) -> bool {
100    matches!(
101        tokio::time::timeout(timeout, tokio::net::TcpStream::connect(endpoint)).await,
102        Ok(Ok(_))
103    )
104}
105
106pub struct LambdaRuntime {
107    backend: Arc<dyn LambdaBackend>,
108    /// Per-function pool of warm instances. Each instance serves one
109    /// invocation at a time (its `busy` lock); the pool grows on demand
110    /// up to `max_concurrency`, and the idle reaper trims it.
111    instances: RwLock<HashMap<String, Vec<Arc<WarmEntry>>>>,
112    /// Serializes runtime startup per function to prevent duplicate
113    /// instances racing into the pool when several cold invokes arrive
114    /// together.
115    starting: RwLock<HashMap<String, Arc<tokio::sync::Mutex<()>>>>,
116    /// Cap on warm instances per function.
117    max_concurrency: usize,
118}
119
120impl LambdaRuntime {
121    /// Construct a runtime over the supplied backend. Callers that want
122    /// auto-detection should use [`Self::auto_detect_docker`] or
123    /// [`Self::new`].
124    pub fn from_backend(backend: Arc<dyn LambdaBackend>) -> Self {
125        let max_concurrency = std::env::var("FAKECLOUD_LAMBDA_MAX_CONCURRENCY")
126            .ok()
127            .and_then(|v| v.parse::<usize>().ok())
128            .filter(|n| *n >= 1)
129            .unwrap_or(DEFAULT_MAX_CONCURRENCY);
130        Self {
131            backend,
132            instances: RwLock::new(HashMap::new()),
133            starting: RwLock::new(HashMap::new()),
134            max_concurrency,
135        }
136    }
137
138    /// Auto-detect Docker or Podman. Returns `None` if neither is available.
139    /// Override with `FAKECLOUD_CONTAINER_CLI` env var.
140    pub fn auto_detect_docker(server_port: u16) -> Option<Self> {
141        DockerBackend::auto_detect(server_port)
142            .map(|b| Self::from_backend(Arc::new(b) as Arc<dyn LambdaBackend>))
143    }
144
145    /// Backwards-compatible alias for [`Self::auto_detect_docker`].
146    /// Callers across the workspace use `ContainerRuntime::new(port)`.
147    pub fn new(server_port: u16) -> Option<Self> {
148        Self::auto_detect_docker(server_port)
149    }
150
151    /// Construct a runtime backed by the Kubernetes backend. Reads
152    /// configuration from env vars (`FAKECLOUD_K8S_SELF_URL`,
153    /// `FAKECLOUD_K8S_NAMESPACE`, etc.) and connects to the cluster
154    /// via in-cluster service account or kubeconfig. Hard-fails on
155    /// any configuration or connectivity issue — we don't silently
156    /// fall back to Docker because the operator explicitly opted in
157    /// to K8s.
158    ///
159    /// `internal_token` is the bearer token the artifact endpoints on
160    /// the fakecloud server expect from Pod init containers — caller
161    /// must register the same token on those endpoints.
162    pub async fn new_k8s(
163        server_port: u16,
164        internal_token: String,
165    ) -> Result<Self, super::k8s::K8sBackendError> {
166        let backend = super::k8s::K8sBackend::from_env(server_port, internal_token).await?;
167        backend.reap_stale().await;
168        Ok(Self::from_backend(Arc::new(backend)))
169    }
170
171    pub fn cli_name(&self) -> &str {
172        self.backend.name()
173    }
174
175    /// Background pre-warm hook: pull the image a Zip-package function
176    /// will need at invoke time, or the `ImageUri` of an Image-package
177    /// function. The first cold pull of an AWS base image (~700 MB)
178    /// frequently exceeds the AWS CLI default 60s read timeout, surfacing
179    /// to users as `Connection was closed` (issue #1539). Call after
180    /// `CreateFunction` persists so the warm path is ready before the
181    /// caller turns around and calls `Invoke`.
182    ///
183    /// Returns `None` if the function has no resolvable image (e.g. an
184    /// unsupported runtime string we can't map to a base image).
185    /// Otherwise returns the result of the backend's `prepull_image` —
186    /// callers log failures and move on, since invoke time still
187    /// re-attempts the pull as a fallback.
188    pub async fn prepull_for_function(
189        &self,
190        func: &LambdaFunction,
191    ) -> Option<Result<(), super::backend::RuntimeError>> {
192        let image = if func.package_type == "Image" {
193            func.image_uri.clone()?
194        } else {
195            super::docker::runtime_to_image(&func.runtime)?
196        };
197        Some(self.backend.prepull_image(&image).await)
198    }
199
200    /// Invoke a Lambda function, starting an instance if needed. Layer
201    /// ZIPs are extracted into `/opt` of the runtime sandbox; AWS base
202    /// images already include `/opt/python`, `/opt/nodejs/node_modules`,
203    /// `/opt/lib`, and `/opt/bin` on the right import paths.
204    ///
205    /// Reserves a warm instance for the call (one in-flight invocation
206    /// per instance — the RIE crashes on overlap, issue #1644). If the
207    /// instance is unreachable (dead Pod/container from a node drain, OOM,
208    /// or prior crash) it is evicted and the call retried (up to four
209    /// times) against a freshly cold-started instance, so a dead instance can't
210    /// wedge the function permanently.
211    pub async fn invoke(
212        &self,
213        func: &LambdaFunction,
214        payload: &[u8],
215        layers: &[Vec<u8>],
216    ) -> Result<Vec<u8>, RuntimeError> {
217        self.invoke_inner(func, payload, layers, false)
218            .await
219            .map(|(bytes, _)| bytes)
220    }
221
222    /// Like [`Self::invoke`] but also returns the instance's recent log tail
223    /// (for `Invoke` with `LogType=Tail` -> `X-Amz-Log-Result`). `None` when the
224    /// backend can't supply logs.
225    pub async fn invoke_with_log_tail(
226        &self,
227        func: &LambdaFunction,
228        payload: &[u8],
229        layers: &[Vec<u8>],
230    ) -> Result<(Vec<u8>, Option<String>), RuntimeError> {
231        self.invoke_inner(func, payload, layers, true).await
232    }
233
234    async fn invoke_inner(
235        &self,
236        func: &LambdaFunction,
237        payload: &[u8],
238        layers: &[Vec<u8>],
239        capture_logs: bool,
240    ) -> Result<(Vec<u8>, Option<String>), RuntimeError> {
241        let client = reqwest::Client::builder()
242            .connect_timeout(REACHABILITY_PROBE_TIMEOUT)
243            .build()
244            .unwrap_or_else(|_| reqwest::Client::new());
245        let mut attempt: u32 = 0;
246        loop {
247            attempt += 1;
248            let slot = self.acquire_slot(func, layers).await?;
249
250            // Fast reachability probe before forwarding the payload. A warm Pod
251            // that was killed (FakeCloud recreating it, OOM, node reclaim) often
252            // black-holes its old IP, so the POST would hang the full invoke
253            // timeout. A short TCP probe detects the dead instance in ~1s; the
254            // connection never reached the handler, so we can safely evict and
255            // fail over to a cold start.
256            if !endpoint_reachable(&slot.entry.instance.endpoint, REACHABILITY_PROBE_TIMEOUT).await
257            {
258                let entry = slot.entry.clone();
259                drop(slot);
260                self.evict_entry(&func.function_name, &entry).await;
261                if attempt < MAX_INVOKE_ATTEMPTS {
262                    tracing::warn!(
263                        function = %func.function_name,
264                        endpoint = %entry.instance.endpoint,
265                        "warm Lambda instance failed reachability probe; evicted, retrying with a cold start"
266                    );
267                    continue;
268                }
269                return Err(RuntimeError::InvocationFailed(format!(
270                    "no reachable warm instance for {} after {attempt} attempts",
271                    func.function_name
272                )));
273            }
274
275            let url = format!(
276                "http://{}/2015-03-31/functions/function/invocations",
277                slot.entry.instance.endpoint
278            );
279            let send = client
280                .post(&url)
281                .body(payload.to_vec())
282                .timeout(Duration::from_secs(func.timeout as u64 + 5))
283                .send()
284                .await;
285            match send {
286                Ok(resp) => {
287                    let body = resp.bytes().await;
288                    *slot.entry.last_used.write() = Instant::now();
289                    return match body {
290                        Ok(b) => {
291                            // Capture the instance's log tail while the slot
292                            // (and thus the container/Pod) is still alive.
293                            let logs = if capture_logs {
294                                self.backend
295                                    .instance_logs(&slot.entry.instance.handle)
296                                    .await
297                            } else {
298                                None
299                            };
300                            Ok((b.to_vec(), logs))
301                        }
302                        Err(e) => {
303                            // Response failed mid-stream — the instance is
304                            // suspect. Evict it but don't retry: the
305                            // function already ran and may have side effects.
306                            let entry = slot.entry.clone();
307                            drop(slot);
308                            self.evict_entry(&func.function_name, &entry).await;
309                            Err(RuntimeError::InvocationFailed(e.to_string()))
310                        }
311                    };
312                }
313                Err(e) => {
314                    // Transport-level failure. Evict the suspect instance.
315                    // Only retry when the connection was never
316                    // established (`is_connect` — e.g. refused by a dead
317                    // Pod): then the request provably never reached the
318                    // function, so a cold-start retry can't double-execute
319                    // it. A reset/timeout mid-flight may have already run
320                    // the handler, so surface those instead of risking a
321                    // duplicate invoke.
322                    let entry = slot.entry.clone();
323                    drop(slot);
324                    self.evict_entry(&func.function_name, &entry).await;
325                    if attempt < MAX_INVOKE_ATTEMPTS && e.is_connect() {
326                        tracing::warn!(
327                            function = %func.function_name,
328                            error = %e,
329                            "warm Lambda instance unreachable; evicted, retrying with a cold start"
330                        );
331                        continue;
332                    }
333                    return Err(RuntimeError::InvocationFailed(e.to_string()));
334                }
335            }
336        }
337    }
338
339    /// Invoke a Lambda function and yield the raw HTTP body as a stream
340    /// of byte chunks. Each chunk corresponds to one HTTP frame the RIE
341    /// flushed to the wire — for streaming-aware handlers this
342    /// preserves the chunk boundaries the function emitted. Buffered
343    /// handlers come back as a single chunk, which is still a valid
344    /// streamed response.
345    ///
346    /// The reserved instance's busy guard travels with the returned
347    /// [`StreamingInvocation`] so the slot stays held until the caller
348    /// finishes draining the stream.
349    pub async fn invoke_streaming(
350        &self,
351        func: &LambdaFunction,
352        payload: &[u8],
353        layers: &[Vec<u8>],
354    ) -> Result<StreamingInvocation, RuntimeError> {
355        let client = reqwest::Client::builder()
356            .connect_timeout(REACHABILITY_PROBE_TIMEOUT)
357            .build()
358            .unwrap_or_else(|_| reqwest::Client::new());
359        let mut attempt: u32 = 0;
360        loop {
361            attempt += 1;
362            let slot = self.acquire_slot(func, layers).await?;
363
364            // Same fast reachability probe as `invoke`: detect a dead/black-holed
365            // warm instance in ~1s and fail over instead of hanging.
366            if !endpoint_reachable(&slot.entry.instance.endpoint, REACHABILITY_PROBE_TIMEOUT).await
367            {
368                let entry = slot.entry.clone();
369                drop(slot);
370                self.evict_entry(&func.function_name, &entry).await;
371                if attempt < MAX_INVOKE_ATTEMPTS {
372                    continue;
373                }
374                return Err(RuntimeError::InvocationFailed(format!(
375                    "no reachable warm instance for {} after {attempt} attempts",
376                    func.function_name
377                )));
378            }
379
380            let url = format!(
381                "http://{}/2015-03-31/functions/function/invocations",
382                slot.entry.instance.endpoint
383            );
384            let send = client
385                .post(&url)
386                .body(payload.to_vec())
387                .timeout(Duration::from_secs(func.timeout as u64 + 5))
388                .send()
389                .await;
390            match send {
391                Ok(resp) => {
392                    *slot.entry.last_used.write() = Instant::now();
393                    let Slot {
394                        entry: _entry,
395                        guard,
396                    } = slot;
397                    return Ok(StreamingInvocation {
398                        resp,
399                        _slot_guard: Some(guard),
400                    });
401                }
402                Err(e) => {
403                    // Same connect-only retry policy as `invoke`: retry
404                    // only when the connection never established, so a
405                    // half-run handler isn't invoked twice.
406                    let entry = slot.entry.clone();
407                    drop(slot);
408                    self.evict_entry(&func.function_name, &entry).await;
409                    if attempt < MAX_INVOKE_ATTEMPTS && e.is_connect() {
410                        continue;
411                    }
412                    return Err(RuntimeError::InvocationFailed(e.to_string()));
413                }
414            }
415        }
416    }
417
418    /// Reserve a warm instance to run exactly one invocation, returning a
419    /// held busy guard that grants exclusive use until it drops. Shared
420    /// by `invoke` and `invoke_streaming`.
421    ///
422    /// Order of preference: (1) a free, current-deploy instance already
423    /// in the pool; (2) a freshly launched instance, if the pool is below
424    /// `max_concurrency`; (3) queue on a busy current-deploy instance.
425    /// Instances whose `deploy_id` no longer matches the function's
426    /// current code+layers are torn down before sizing the pool.
427    async fn acquire_slot(
428        &self,
429        func: &LambdaFunction,
430        layers: &[Vec<u8>],
431    ) -> Result<Slot, RuntimeError> {
432        let is_image = func.package_type == "Image";
433        if !is_image && func.code_zip.is_none() {
434            return Err(RuntimeError::NoCodeZip(func.function_name.clone()));
435        }
436
437        let deploy_id = deploy_id_for(func, layers);
438
439        // (1) Fast path: a free instance already running the right deploy.
440        if let Some(slot) = self.try_take_free(&func.function_name, &deploy_id) {
441            return Ok(slot);
442        }
443
444        // Serialize launch decisions per function so a burst of cold
445        // invokes doesn't each push the pool past the cap.
446        let startup_lock = {
447            let mut starting = self.starting.write();
448            starting
449                .entry(func.function_name.clone())
450                .or_insert_with(|| Arc::new(tokio::sync::Mutex::new(())))
451                .clone()
452        };
453        let startup_guard = startup_lock.lock().await;
454
455        // Re-check under the startup lock — another task may have freed or
456        // launched an instance while we waited.
457        if let Some(slot) = self.try_take_free(&func.function_name, &deploy_id) {
458            return Ok(slot);
459        }
460
461        // Tear down any instances left over from a previous deploy.
462        self.evict_stale_deploy(&func.function_name, &deploy_id)
463            .await;
464
465        let pool_len = self
466            .instances
467            .read()
468            .get(&func.function_name)
469            .map_or(0, |v| v.len());
470
471        // (2) Room to grow: launch a fresh instance and reserve it.
472        if pool_len < self.max_concurrency {
473            let instance = self
474                .backend
475                .launch(func, func.code_zip.as_deref(), layers, &deploy_id)
476                .await?;
477            let entry = Arc::new(WarmEntry {
478                instance,
479                last_used: RwLock::new(Instant::now()),
480                deploy_id,
481                busy: Arc::new(tokio::sync::Mutex::new(())),
482            });
483            let guard = entry
484                .busy
485                .clone()
486                .try_lock_owned()
487                .expect("freshly created busy lock is uncontended");
488            self.instances
489                .write()
490                .entry(func.function_name.clone())
491                .or_default()
492                .push(entry.clone());
493            return Ok(Slot { entry, guard });
494        }
495
496        // (3) At capacity: release the startup lock and wait for whichever
497        // current instance frees up *first*. Racing every instance's lock
498        // (rather than blocking on a fixed one) avoids convoying every
499        // queued caller onto pool[0] while a different instance goes idle.
500        drop(startup_guard);
501        let candidates: Vec<Arc<WarmEntry>> = {
502            let map = self.instances.read();
503            map.get(&func.function_name)
504                .map(|pool| {
505                    pool.iter()
506                        .filter(|e| e.deploy_id == deploy_id)
507                        .cloned()
508                        .collect()
509                })
510                .unwrap_or_default()
511        };
512        if candidates.is_empty() {
513            return Err(RuntimeError::InvocationFailed(format!(
514                "no warm instance available for {}",
515                func.function_name
516            )));
517        }
518        let waiters = candidates.into_iter().map(|entry| {
519            Box::pin(async move {
520                let guard = entry.busy.clone().lock_owned().await;
521                Slot { entry, guard }
522            })
523        });
524        let (slot, _idx, _rest) = futures_util::future::select_all(waiters).await;
525        *slot.entry.last_used.write() = Instant::now();
526        Ok(slot)
527    }
528
529    /// Try to reserve a free, current-deploy instance without launching.
530    /// Returns `None` if every matching instance is busy (or there are
531    /// none).
532    fn try_take_free(&self, function_name: &str, deploy_id: &str) -> Option<Slot> {
533        let map = self.instances.read();
534        let pool = map.get(function_name)?;
535        for entry in pool {
536            if entry.deploy_id != deploy_id {
537                continue;
538            }
539            if let Ok(guard) = entry.busy.clone().try_lock_owned() {
540                *entry.last_used.write() = Instant::now();
541                return Some(Slot {
542                    entry: entry.clone(),
543                    guard,
544                });
545            }
546        }
547        None
548    }
549
550    /// Remove one specific instance from a function's pool and terminate
551    /// it. Used when an invocation finds the instance unreachable.
552    async fn evict_entry(&self, function_name: &str, target: &Arc<WarmEntry>) {
553        let removed = {
554            let mut map = self.instances.write();
555            match map.get_mut(function_name) {
556                Some(pool) => {
557                    let removed = pool
558                        .iter()
559                        .position(|e| Arc::ptr_eq(e, target))
560                        .map(|pos| pool.remove(pos));
561                    if pool.is_empty() {
562                        map.remove(function_name);
563                    }
564                    removed
565                }
566                None => None,
567            }
568        };
569        if let Some(entry) = removed {
570            tracing::info!(
571                function = %function_name,
572                handle = ?entry.instance.handle,
573                "evicting unreachable Lambda runtime instance"
574            );
575            self.backend.terminate(&entry.instance.handle).await;
576        }
577    }
578
579    /// Tear down every instance in a function's pool whose `deploy_id`
580    /// no longer matches the current code+layers fingerprint.
581    async fn evict_stale_deploy(&self, function_name: &str, deploy_id: &str) {
582        let stale: Vec<Arc<WarmEntry>> = {
583            let mut map = self.instances.write();
584            match map.get_mut(function_name) {
585                Some(pool) => {
586                    let mut stale = Vec::new();
587                    pool.retain(|e| {
588                        if e.deploy_id == deploy_id {
589                            true
590                        } else {
591                            stale.push(e.clone());
592                            false
593                        }
594                    });
595                    if pool.is_empty() {
596                        map.remove(function_name);
597                    }
598                    stale
599                }
600                None => Vec::new(),
601            }
602        };
603        for entry in stale {
604            tracing::info!(
605                function = %function_name,
606                handle = ?entry.instance.handle,
607                "stopping stale-deploy Lambda runtime instance"
608            );
609            self.backend.terminate(&entry.instance.handle).await;
610        }
611    }
612
613    /// Remove and return the warm pool for a function **without**
614    /// terminating it. Lets DeleteFunction snapshot exactly the instances
615    /// that exist at delete time and terminate those, so a concurrent
616    /// recreate of the same name (whose fresh warm instance is keyed
617    /// identically) is not reaped by the deferred stop. Synchronous so the
618    /// caller can take the snapshot while still ordered before any recreate
619    /// (bug-hunt 2026-06-13, finding 4.2).
620    pub(crate) fn take_warm_instances(&self, function_name: &str) -> Vec<Arc<WarmEntry>> {
621        self.instances
622            .write()
623            .remove(function_name)
624            .unwrap_or_default()
625    }
626
627    /// Terminate a previously-snapshotted set of warm instances. Pairs with
628    /// [`take_warm_instances`] for the delete path.
629    pub(crate) async fn terminate_instances(&self, pool: Vec<Arc<WarmEntry>>) {
630        for entry in pool {
631            tracing::info!(
632                handle = ?entry.instance.handle,
633                "stopping Lambda runtime instance"
634            );
635            self.backend.terminate(&entry.instance.handle).await;
636        }
637    }
638
639    /// Stop and remove every warm instance for a specific function.
640    pub async fn stop_container(&self, function_name: &str) {
641        let pool = self.take_warm_instances(function_name);
642        self.terminate_instances(pool).await;
643    }
644
645    /// Stop and remove all warm instances (used on server shutdown or reset).
646    pub async fn stop_all(&self) {
647        let pools: Vec<(String, Vec<Arc<WarmEntry>>)> =
648            { self.instances.write().drain().collect() };
649        for (name, pool) in pools {
650            for entry in pool {
651                tracing::info!(
652                    function = %name,
653                    handle = ?entry.instance.handle,
654                    "stopping Lambda runtime instance (cleanup)"
655                );
656                self.backend.terminate(&entry.instance.handle).await;
657            }
658        }
659    }
660
661    /// List all warm instances and their metadata for introspection.
662    /// One row per running instance — a function scaled to several warm
663    /// instances appears once per instance.
664    pub fn list_warm_containers(
665        &self,
666        lambda_state: &crate::state::SharedLambdaState,
667    ) -> Vec<serde_json::Value> {
668        let entries = self.instances.read();
669        let accounts = lambda_state.read();
670        let mut rows = Vec::new();
671        for (name, pool) in entries.iter() {
672            let runtime = accounts
673                .iter()
674                .find_map(|(_, state)| state.functions.get(name).map(|f| f.runtime.clone()))
675                .unwrap_or_default();
676            for entry in pool {
677                let idle_secs = entry.last_used.read().elapsed().as_secs();
678                let mut row = serde_json::json!({
679                    "functionName": name,
680                    "runtime": runtime,
681                    "backend": self.backend.name(),
682                    "lastUsedSecsAgo": idle_secs,
683                });
684                let obj = row.as_object_mut().expect("json object");
685                match &entry.instance.handle {
686                    BackendHandle::Container { id } => {
687                        obj.insert("containerId".into(), serde_json::Value::String(id.clone()));
688                    }
689                    BackendHandle::Pod { namespace, name } => {
690                        obj.insert("podName".into(), serde_json::Value::String(name.clone()));
691                        obj.insert(
692                            "namespace".into(),
693                            serde_json::Value::String(namespace.clone()),
694                        );
695                    }
696                }
697                rows.push(row);
698            }
699        }
700        rows
701    }
702
703    /// Evict (stop and remove) every warm instance for a specific
704    /// function. Returns true if at least one instance was evicted.
705    pub async fn evict_container(&self, function_name: &str) -> bool {
706        let pool = self
707            .instances
708            .write()
709            .remove(function_name)
710            .unwrap_or_default();
711        let found = !pool.is_empty();
712        for entry in pool {
713            tracing::info!(
714                function = %function_name,
715                handle = ?entry.instance.handle,
716                "evicting Lambda runtime instance via simulation API"
717            );
718            self.backend.terminate(&entry.instance.handle).await;
719        }
720        found
721    }
722
723    /// Background loop that stops instances idle longer than `ttl`.
724    pub async fn run_cleanup_loop(self: Arc<Self>, ttl: Duration) {
725        let mut interval = tokio::time::interval(Duration::from_secs(30));
726        loop {
727            interval.tick().await;
728            self.cleanup_idle(ttl).await;
729        }
730    }
731
732    async fn cleanup_idle(&self, ttl: Duration) {
733        // Reap individual instances that are both idle past the TTL and
734        // currently free (a busy instance is mid-invocation, so its
735        // `last_used` is fresh anyway — the `try_lock` check just avoids
736        // racing a slot that's about to be used).
737        let expired: Vec<(String, Arc<WarmEntry>)> = {
738            let mut map = self.instances.write();
739            let mut out = Vec::new();
740            for (name, pool) in map.iter_mut() {
741                let mut i = 0;
742                while i < pool.len() {
743                    let idle = pool[i].last_used.read().elapsed() > ttl;
744                    let free = pool[i].busy.try_lock().is_ok();
745                    if idle && free {
746                        out.push((name.clone(), pool.remove(i)));
747                    } else {
748                        i += 1;
749                    }
750                }
751            }
752            map.retain(|_, pool| !pool.is_empty());
753            out
754        };
755        for (name, entry) in expired {
756            tracing::info!(function = %name, "stopping idle Lambda runtime instance");
757            self.backend.terminate(&entry.instance.handle).await;
758        }
759    }
760}
761
762#[cfg(test)]
763mod tests {
764    use super::deploy_id_from;
765
766    /// The deploy id is spliced raw into the init-container artifact URL
767    /// path and into a Pod label, so it must never contain characters
768    /// that standard base64 emits (`/`, `+`, `=`). Standard base64
769    /// produced `/` for ~49% of code hashes (issue #1643); sweep a wide
770    /// range of inputs to catch any regression back to a non-URL-safe
771    /// alphabet.
772    #[test]
773    fn deploy_id_is_url_path_safe() {
774        for i in 0..2_000u32 {
775            // Vary both the code hash and the layer set.
776            let code_sha256 = format!("sha256-seed-{i}-{}", i.wrapping_mul(2_654_435_761));
777            let layers: Vec<Vec<u8>> = if i % 3 == 0 {
778                vec![format!("layer-{i}").into_bytes()]
779            } else {
780                vec![]
781            };
782            let id = deploy_id_from(&code_sha256, &layers);
783            assert!(
784                !id.contains('/') && !id.contains('+') && !id.contains('='),
785                "deploy id {id:?} (seed {i}) is not URL-path-safe"
786            );
787        }
788    }
789
790    /// Same inputs must always map to the same deploy id — the value is a
791    /// warm-pool cache key, so instability would defeat reuse.
792    #[test]
793    fn deploy_id_is_stable() {
794        let layers = vec![b"layer-a".to_vec(), b"layer-b".to_vec()];
795        let a = deploy_id_from("abc123", &layers);
796        let b = deploy_id_from("abc123", &layers);
797        assert_eq!(a, b);
798        assert_ne!(a, deploy_id_from("abc124", &layers));
799        assert_ne!(a, deploy_id_from("abc123", &[]));
800    }
801
802    // ---- warm-pool concurrency + eviction (issue #1644) ----
803
804    use super::LambdaRuntime;
805    use crate::runtime::backend::{BackendHandle, LambdaBackend, RuntimeError, WarmInstance};
806    use crate::state::LambdaFunction;
807    use parking_lot::RwLock;
808    use std::collections::{HashMap, VecDeque};
809    use std::sync::atomic::{AtomicUsize, Ordering::SeqCst};
810    use std::sync::Arc;
811    use std::sync::Mutex as StdMutex;
812    use std::time::Duration;
813
814    /// Backend double: counts launches/terminates and hands out endpoints
815    /// from a queue (falling back to `default_endpoint`), so a test can
816    /// inject a dead endpoint followed by a live one.
817    struct CountingBackend {
818        endpoints: StdMutex<VecDeque<String>>,
819        default_endpoint: String,
820        launches: AtomicUsize,
821        terminates: AtomicUsize,
822    }
823
824    impl CountingBackend {
825        fn new(default_endpoint: impl Into<String>) -> Arc<Self> {
826            Arc::new(Self {
827                endpoints: StdMutex::new(VecDeque::new()),
828                default_endpoint: default_endpoint.into(),
829                launches: AtomicUsize::new(0),
830                terminates: AtomicUsize::new(0),
831            })
832        }
833        fn with_queue(default_endpoint: impl Into<String>, queue: Vec<String>) -> Arc<Self> {
834            Arc::new(Self {
835                endpoints: StdMutex::new(queue.into()),
836                default_endpoint: default_endpoint.into(),
837                launches: AtomicUsize::new(0),
838                terminates: AtomicUsize::new(0),
839            })
840        }
841    }
842
843    #[async_trait::async_trait]
844    impl LambdaBackend for CountingBackend {
845        fn name(&self) -> &str {
846            "test"
847        }
848        async fn launch(
849            &self,
850            _func: &LambdaFunction,
851            _code_zip: Option<&[u8]>,
852            _layers: &[Vec<u8>],
853            _deploy_id: &str,
854        ) -> Result<WarmInstance, RuntimeError> {
855            let n = self.launches.fetch_add(1, SeqCst);
856            let endpoint = self
857                .endpoints
858                .lock()
859                .unwrap()
860                .pop_front()
861                .unwrap_or_else(|| self.default_endpoint.clone());
862            Ok(WarmInstance {
863                endpoint,
864                handle: BackendHandle::Container {
865                    id: format!("c{n}"),
866                },
867            })
868        }
869        async fn terminate(&self, _handle: &BackendHandle) {
870            self.terminates.fetch_add(1, SeqCst);
871        }
872    }
873
874    /// Spin a minimal in-process "RIE": one TCP accept loop that records
875    /// the peak number of simultaneously-open connections, holds each
876    /// request for `delay`, then returns a tiny 200. The peak directly
877    /// observes how many invocations overlapped on the instance(s) it
878    /// backs. Returns the `host:port` to use as a warm endpoint.
879    async fn spawn_rie(delay: Duration, peak: Arc<AtomicUsize>) -> String {
880        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
881        let addr = listener.local_addr().unwrap();
882        let cur = Arc::new(AtomicUsize::new(0));
883        tokio::spawn(async move {
884            loop {
885                let Ok((mut sock, _)) = listener.accept().await else {
886                    break;
887                };
888                let cur = cur.clone();
889                let peak = peak.clone();
890                tokio::spawn(async move {
891                    use tokio::io::{AsyncReadExt, AsyncWriteExt};
892                    // Only connections that actually send a request count as an
893                    // invocation. A bare reachability probe (TCP connect, no
894                    // bytes) reads EOF and is ignored — it neither overlaps a
895                    // real invoke nor inflates the concurrency peak, mirroring
896                    // the real RIE, which only starts an event on a request.
897                    let mut buf = [0u8; 1024];
898                    let n = sock.read(&mut buf).await.unwrap_or(0);
899                    if n == 0 {
900                        return;
901                    }
902                    let now = cur.fetch_add(1, SeqCst) + 1;
903                    peak.fetch_max(now, SeqCst);
904                    tokio::time::sleep(delay).await;
905                    let _ = sock
906                        .write_all(
907                            b"HTTP/1.1 200 OK\r\nContent-Length: 2\r\nConnection: close\r\n\r\nok",
908                        )
909                        .await;
910                    let _ = sock.flush().await;
911                    cur.fetch_sub(1, SeqCst);
912                });
913            }
914        });
915        format!("{addr}")
916    }
917
918    fn runtime_with(backend: Arc<CountingBackend>, max_concurrency: usize) -> Arc<LambdaRuntime> {
919        Arc::new(LambdaRuntime {
920            backend,
921            instances: RwLock::new(HashMap::new()),
922            starting: RwLock::new(HashMap::new()),
923            max_concurrency,
924        })
925    }
926
927    fn test_func(name: &str, sha: &str) -> LambdaFunction {
928        serde_json::from_value(serde_json::json!({
929            "function_name": name,
930            "function_arn": format!("arn:aws:lambda:us-east-1:123456789012:function:{name}"),
931            "runtime": "python3.12",
932            "role": "arn:aws:iam::123456789012:role/r",
933            "handler": "index.handler",
934            "description": "",
935            "timeout": 5,
936            "memory_size": 128,
937            "code_sha256": sha,
938            "code_size": 1,
939            "version": "$LATEST",
940            "last_modified": "2020-01-01T00:00:00Z",
941            "tags": {},
942            "environment": {},
943            "architectures": ["x86_64"],
944            "package_type": "Zip",
945            "code_zip": [1, 2, 3],
946            "policy": null
947        }))
948        .expect("build test LambdaFunction")
949    }
950
951    /// The core of #1644: with one warm instance, a burst of concurrent
952    /// invocations must be serialized onto it — never delivered in
953    /// parallel (which segfaults the real RIE). Peak overlap must be 1.
954    #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
955    async fn concurrent_invokes_are_serialized_on_a_single_instance() {
956        let peak = Arc::new(AtomicUsize::new(0));
957        let endpoint = spawn_rie(Duration::from_millis(40), peak.clone()).await;
958        let backend = CountingBackend::new(endpoint);
959        let rt = runtime_with(backend.clone(), 1);
960        let func = test_func("conc", "sha-A");
961
962        let mut handles = Vec::new();
963        for _ in 0..8 {
964            let rt = rt.clone();
965            let func = func.clone();
966            handles.push(tokio::spawn(
967                async move { rt.invoke(&func, b"{}", &[]).await },
968            ));
969        }
970        for h in handles {
971            h.await.unwrap().expect("invoke ok");
972        }
973
974        assert_eq!(
975            peak.load(SeqCst),
976            1,
977            "concurrent invokes overlapped on a single RIE instance"
978        );
979        assert_eq!(
980            backend.launches.load(SeqCst),
981            1,
982            "max_concurrency=1 must launch exactly one instance"
983        );
984    }
985
986    /// Under load the pool grows beyond one instance but never past the
987    /// cap, so genuine concurrency is served (AWS semantics) without an
988    /// unbounded container/Pod fan-out.
989    #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
990    async fn pool_scales_under_load_and_respects_cap() {
991        let peak = Arc::new(AtomicUsize::new(0));
992        let endpoint = spawn_rie(Duration::from_millis(60), peak.clone()).await;
993        let backend = CountingBackend::new(endpoint);
994        let rt = runtime_with(backend.clone(), 4);
995        let func = test_func("scale", "sha-A");
996
997        let mut handles = Vec::new();
998        for _ in 0..8 {
999            let rt = rt.clone();
1000            let func = func.clone();
1001            handles.push(tokio::spawn(
1002                async move { rt.invoke(&func, b"{}", &[]).await },
1003            ));
1004        }
1005        for h in handles {
1006            h.await.unwrap().expect("invoke ok");
1007        }
1008
1009        let launched = backend.launches.load(SeqCst);
1010        assert!(
1011            (2..=4).contains(&launched),
1012            "expected the pool to scale within the cap, launched={launched}"
1013        );
1014        assert!(
1015            peak.load(SeqCst) > 1,
1016            "expected concurrent forwards across the scaled pool"
1017        );
1018    }
1019
1020    /// An unreachable instance (dead Pod/container) must be evicted and
1021    /// the invocation retried against a fresh cold start, instead of
1022    /// wedging the function forever.
1023    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1024    async fn dead_instance_is_evicted_and_retried() {
1025        let peak = Arc::new(AtomicUsize::new(0));
1026        let live = spawn_rie(Duration::from_millis(5), peak.clone()).await;
1027        // Two launches in a row hand back refused ports; the third gets the
1028        // live one. Connect-refused provably never reached a handler, so the
1029        // extra retry budget can't double-execute.
1030        let backend = CountingBackend::with_queue(
1031            live,
1032            vec!["127.0.0.1:1".to_string(), "127.0.0.1:1".to_string()],
1033        );
1034        let rt = runtime_with(backend.clone(), 1);
1035
1036        let out = rt
1037            .invoke(&test_func("dead", "sha-A"), b"{}", &[])
1038            .await
1039            .expect("should recover via cold-start retry");
1040        assert_eq!(out, b"ok");
1041        assert_eq!(
1042            backend.launches.load(SeqCst),
1043            3,
1044            "expected two dead instances plus one cold-start replacement"
1045        );
1046        assert!(
1047            backend.terminates.load(SeqCst) >= 2,
1048            "both dead instances should have been terminated on eviction"
1049        );
1050    }
1051
1052    /// A black-holed warm instance (a killed Pod whose IP now drops packets,
1053    /// rather than refusing with an RST) must be detected by the fast
1054    /// reachability probe and failed over in seconds — not hang the full invoke
1055    /// timeout (~`func.timeout + 5s`). This is the failure mode that flaked the
1056    /// LOE workflows: a dead pod cost ~305s, blowing the test window.
1057    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1058    async fn black_holed_instance_fails_over_fast() {
1059        let peak = Arc::new(AtomicUsize::new(0));
1060        let live = spawn_rie(Duration::from_millis(5), peak.clone()).await;
1061        // 192.0.2.1 is RFC 5737 TEST-NET-1: unrouted, so a connect black-holes
1062        // (caught by the probe's ~1.5s timeout) instead of getting a fast RST.
1063        let backend = CountingBackend::with_queue(live, vec!["192.0.2.1:9".to_string()]);
1064        let rt = runtime_with(backend.clone(), 1);
1065
1066        // test_func timeout is 5 -> the pre-fix hang would be ~10s.
1067        let func = test_func("blackhole", "sha-A");
1068        let started = std::time::Instant::now();
1069        let out = rt
1070            .invoke(&func, b"{}", &[])
1071            .await
1072            .expect("should recover via cold-start retry");
1073        let elapsed = started.elapsed();
1074
1075        assert_eq!(out, b"ok");
1076        assert_eq!(
1077            backend.launches.load(SeqCst),
1078            2,
1079            "expected one black-holed instance plus one cold-start replacement"
1080        );
1081        assert!(
1082            backend.terminates.load(SeqCst) >= 1,
1083            "the black-holed instance should have been evicted"
1084        );
1085        assert!(
1086            elapsed < Duration::from_secs(5),
1087            "failover took {elapsed:?}; must be far below the ~10s invoke timeout"
1088        );
1089    }
1090
1091    /// Changing the function's code (a new deploy id) tears down the
1092    /// stale warm instance before serving against a fresh one.
1093    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1094    async fn deploy_change_evicts_stale_instance() {
1095        let peak = Arc::new(AtomicUsize::new(0));
1096        let endpoint = spawn_rie(Duration::from_millis(5), peak.clone()).await;
1097        let backend = CountingBackend::new(endpoint);
1098        let rt = runtime_with(backend.clone(), 2);
1099
1100        rt.invoke(&test_func("upd", "sha-A"), b"{}", &[])
1101            .await
1102            .unwrap();
1103        rt.invoke(&test_func("upd", "sha-B"), b"{}", &[])
1104            .await
1105            .unwrap();
1106
1107        assert_eq!(
1108            backend.launches.load(SeqCst),
1109            2,
1110            "a new deploy id should launch a fresh instance"
1111        );
1112        assert!(
1113            backend.terminates.load(SeqCst) >= 1,
1114            "the stale-deploy instance should have been torn down"
1115        );
1116        // Exactly one current instance remains in the pool.
1117        let pool_len = rt.instances.read().get("upd").map_or(0, |v| v.len());
1118        assert_eq!(pool_len, 1);
1119    }
1120
1121    /// bug-hunt 2026-06-13, finding 4.2: DeleteFunction must snapshot the
1122    /// warm pool and terminate *that snapshot*, not whatever pool exists
1123    /// when the deferred stop runs. Otherwise a CreateFunction + warm-up of
1124    /// the same name racing ahead of the stop has its fresh container
1125    /// reaped. This exercises the snapshot primitives directly.
1126    #[tokio::test]
1127    async fn take_warm_instances_snapshot_does_not_reap_recreated_pool() {
1128        let backend = CountingBackend::new("127.0.0.1:1");
1129        let rt = runtime_with(backend.clone(), 10);
1130        let mk = |id: &str| {
1131            Arc::new(super::WarmEntry {
1132                instance: WarmInstance {
1133                    endpoint: "127.0.0.1:1".to_string(),
1134                    handle: BackendHandle::Container { id: id.to_string() },
1135                },
1136                last_used: RwLock::new(std::time::Instant::now()),
1137                deploy_id: "d".to_string(),
1138                busy: Arc::new(tokio::sync::Mutex::new(())),
1139            })
1140        };
1141
1142        // A function "f" with one warm instance.
1143        rt.instances
1144            .write()
1145            .insert("f".to_string(), vec![mk("old")]);
1146
1147        // Delete snapshots the pool synchronously and removes it from the map.
1148        let snapshot = rt.take_warm_instances("f");
1149        assert_eq!(snapshot.len(), 1);
1150        assert!(rt.instances.read().get("f").is_none());
1151
1152        // A recreate + warm-up of the same name wins the race ahead of the
1153        // deferred terminate.
1154        rt.instances
1155            .write()
1156            .insert("f".to_string(), vec![mk("new")]);
1157
1158        // Terminating the snapshot must touch only the old instance.
1159        rt.terminate_instances(snapshot).await;
1160        assert_eq!(
1161            backend.terminates.load(SeqCst),
1162            1,
1163            "only the snapshotted instance is terminated"
1164        );
1165
1166        // The recreated function keeps its fresh warm instance.
1167        let pool = rt.instances.read();
1168        let f = pool.get("f").expect("recreated function pool must survive");
1169        assert_eq!(f.len(), 1);
1170    }
1171}