Skip to main content

fakecloud_lambda/runtime/
facade.rs

1//! Backend-agnostic Lambda runtime facade.
2//!
3//! Owns the warm-pool bookkeeping, per-function startup serialization,
4//! and the HTTP invocation path. Dispatches container lifecycle to
5//! whatever [`LambdaBackend`] it was constructed with.
6
7use std::collections::HashMap;
8use std::sync::Arc;
9use std::time::{Duration, Instant};
10
11use base64::Engine;
12use parking_lot::RwLock;
13use sha2::{Digest, Sha256};
14
15use super::backend::{
16    BackendHandle, LambdaBackend, RuntimeError, StreamingInvocation, WarmInstance,
17};
18use super::docker::DockerBackend;
19use crate::state::LambdaFunction;
20
21/// A running runtime instance kept warm for reuse.
22struct WarmEntry {
23    instance: WarmInstance,
24    last_used: RwLock<Instant>,
25    /// Combined fingerprint of the function's code SHA-256 plus the
26    /// SHA-256 of every attached layer's ZIP bytes, joined in attach
27    /// order. Layers mutate `/opt`, so a layer change invalidates the
28    /// warm instance even when the function code is unchanged.
29    deploy_id: String,
30    /// Held for the duration of a single invocation against this
31    /// instance. The AWS Runtime Interface Emulator (and real Lambda)
32    /// handles exactly one event per execution environment at a time;
33    /// the RIE's `rapidcore` server nil-pointer-derefs and the process
34    /// exits if two invokes overlap (issue #1644). Acquiring this lock
35    /// before forwarding guarantees one in-flight event per instance,
36    /// and lets the pool pick a *free* instance via `try_lock`.
37    busy: Arc<tokio::sync::Mutex<()>>,
38}
39
/// Warm instances allowed per function when
/// `FAKECLOUD_LAMBDA_MAX_CONCURRENCY` is not set. Real Lambda adds
/// execution environments as concurrent demand grows; capping the pool
/// keeps a burst from spawning an unbounded number of containers/Pods.
/// Once a function reaches the cap, further invocations wait for a busy
/// instance instead of launching another one.
const DEFAULT_MAX_CONCURRENCY: usize = 10;
46
47/// A reserved invocation slot: a warm instance plus the held busy guard
48/// that grants exclusive use of it until the guard drops.
49struct Slot {
50    entry: Arc<WarmEntry>,
51    guard: tokio::sync::OwnedMutexGuard<()>,
52}
53
54/// Compute the warm-instance key for a function with its current layer
55/// set. Stable across calls — layer ARNs are immutable in AWS, so the
56/// hash of their bytes is the right cache key.
57///
58/// Encoded with `URL_SAFE_NO_PAD` so the result never contains `/`, `+`,
59/// or `=`. The id is spliced raw into the init-container artifact URL
60/// (`.../_internal/code/{account}/{function}/{deploy}.zip`) and into the
61/// `fakecloud-deploy-id` Pod label; standard base64's `/` would grow an
62/// extra URL path segment, break the axum route match, and wedge the Pod
63/// in a cold-start loop for ~49% of deploys (issue #1643).
64fn deploy_id_for(func: &LambdaFunction, layers: &[Vec<u8>]) -> String {
65    deploy_id_from(&func.code_sha256, layers)
66}
67
68/// Pure core of [`deploy_id_for`], split out so the URL-path-safety
69/// invariant can be tested without constructing a full `LambdaFunction`.
70fn deploy_id_from(code_sha256: &str, layers: &[Vec<u8>]) -> String {
71    let mut hasher = Sha256::new();
72    hasher.update(code_sha256.as_bytes());
73    for bytes in layers {
74        let mut layer_hasher = Sha256::new();
75        layer_hasher.update(bytes);
76        hasher.update(b":");
77        hasher.update(layer_hasher.finalize());
78    }
79    base64::engine::general_purpose::URL_SAFE_NO_PAD.encode(hasher.finalize())
80}
81
82pub struct LambdaRuntime {
83    backend: Arc<dyn LambdaBackend>,
84    /// Per-function pool of warm instances. Each instance serves one
85    /// invocation at a time (its `busy` lock); the pool grows on demand
86    /// up to `max_concurrency`, and the idle reaper trims it.
87    instances: RwLock<HashMap<String, Vec<Arc<WarmEntry>>>>,
88    /// Serializes runtime startup per function to prevent duplicate
89    /// instances racing into the pool when several cold invokes arrive
90    /// together.
91    starting: RwLock<HashMap<String, Arc<tokio::sync::Mutex<()>>>>,
92    /// Cap on warm instances per function.
93    max_concurrency: usize,
94}
95
96impl LambdaRuntime {
97    /// Construct a runtime over the supplied backend. Callers that want
98    /// auto-detection should use [`Self::auto_detect_docker`] or
99    /// [`Self::new`].
100    pub fn from_backend(backend: Arc<dyn LambdaBackend>) -> Self {
101        let max_concurrency = std::env::var("FAKECLOUD_LAMBDA_MAX_CONCURRENCY")
102            .ok()
103            .and_then(|v| v.parse::<usize>().ok())
104            .filter(|n| *n >= 1)
105            .unwrap_or(DEFAULT_MAX_CONCURRENCY);
106        Self {
107            backend,
108            instances: RwLock::new(HashMap::new()),
109            starting: RwLock::new(HashMap::new()),
110            max_concurrency,
111        }
112    }
113
114    /// Auto-detect Docker or Podman. Returns `None` if neither is available.
115    /// Override with `FAKECLOUD_CONTAINER_CLI` env var.
116    pub fn auto_detect_docker(server_port: u16) -> Option<Self> {
117        DockerBackend::auto_detect(server_port)
118            .map(|b| Self::from_backend(Arc::new(b) as Arc<dyn LambdaBackend>))
119    }
120
121    /// Backwards-compatible alias for [`Self::auto_detect_docker`].
122    /// Callers across the workspace use `ContainerRuntime::new(port)`.
123    pub fn new(server_port: u16) -> Option<Self> {
124        Self::auto_detect_docker(server_port)
125    }
126
127    /// Construct a runtime backed by the Kubernetes backend. Reads
128    /// configuration from env vars (`FAKECLOUD_K8S_SELF_URL`,
129    /// `FAKECLOUD_K8S_NAMESPACE`, etc.) and connects to the cluster
130    /// via in-cluster service account or kubeconfig. Hard-fails on
131    /// any configuration or connectivity issue — we don't silently
132    /// fall back to Docker because the operator explicitly opted in
133    /// to K8s.
134    ///
135    /// `internal_token` is the bearer token the artifact endpoints on
136    /// the fakecloud server expect from Pod init containers — caller
137    /// must register the same token on those endpoints.
138    pub async fn new_k8s(
139        server_port: u16,
140        internal_token: String,
141    ) -> Result<Self, super::k8s::K8sBackendError> {
142        let backend = super::k8s::K8sBackend::from_env(server_port, internal_token).await?;
143        backend.reap_stale().await;
144        Ok(Self::from_backend(Arc::new(backend)))
145    }
146
147    pub fn cli_name(&self) -> &str {
148        self.backend.name()
149    }
150
151    /// Background pre-warm hook: pull the image a Zip-package function
152    /// will need at invoke time, or the `ImageUri` of an Image-package
153    /// function. The first cold pull of an AWS base image (~700 MB)
154    /// frequently exceeds the AWS CLI default 60s read timeout, surfacing
155    /// to users as `Connection was closed` (issue #1539). Call after
156    /// `CreateFunction` persists so the warm path is ready before the
157    /// caller turns around and calls `Invoke`.
158    ///
159    /// Returns `None` if the function has no resolvable image (e.g. an
160    /// unsupported runtime string we can't map to a base image).
161    /// Otherwise returns the result of the backend's `prepull_image` —
162    /// callers log failures and move on, since invoke time still
163    /// re-attempts the pull as a fallback.
164    pub async fn prepull_for_function(
165        &self,
166        func: &LambdaFunction,
167    ) -> Option<Result<(), super::backend::RuntimeError>> {
168        let image = if func.package_type == "Image" {
169            func.image_uri.clone()?
170        } else {
171            super::docker::runtime_to_image(&func.runtime)?
172        };
173        Some(self.backend.prepull_image(&image).await)
174    }
175
176    /// Invoke a Lambda function, starting an instance if needed. Layer
177    /// ZIPs are extracted into `/opt` of the runtime sandbox; AWS base
178    /// images already include `/opt/python`, `/opt/nodejs/node_modules`,
179    /// `/opt/lib`, and `/opt/bin` on the right import paths.
180    ///
181    /// Reserves a warm instance for the call (one in-flight invocation
182    /// per instance — the RIE crashes on overlap, issue #1644). If the
183    /// instance is unreachable (dead Pod/container from a node drain, OOM,
184    /// or prior crash) it is evicted and the call retried once against a
185    /// freshly cold-started instance, so a dead instance can't wedge the
186    /// function permanently.
187    pub async fn invoke(
188        &self,
189        func: &LambdaFunction,
190        payload: &[u8],
191        layers: &[Vec<u8>],
192    ) -> Result<Vec<u8>, RuntimeError> {
193        let client = reqwest::Client::new();
194        let mut attempt = 0;
195        loop {
196            attempt += 1;
197            let slot = self.acquire_slot(func, layers).await?;
198            let url = format!(
199                "http://{}/2015-03-31/functions/function/invocations",
200                slot.entry.instance.endpoint
201            );
202            let send = client
203                .post(&url)
204                .body(payload.to_vec())
205                .timeout(Duration::from_secs(func.timeout as u64 + 5))
206                .send()
207                .await;
208            match send {
209                Ok(resp) => {
210                    let body = resp.bytes().await;
211                    *slot.entry.last_used.write() = Instant::now();
212                    return match body {
213                        Ok(b) => Ok(b.to_vec()),
214                        Err(e) => {
215                            // Response failed mid-stream — the instance is
216                            // suspect. Evict it but don't retry: the
217                            // function already ran and may have side effects.
218                            let entry = slot.entry.clone();
219                            drop(slot);
220                            self.evict_entry(&func.function_name, &entry).await;
221                            Err(RuntimeError::InvocationFailed(e.to_string()))
222                        }
223                    };
224                }
225                Err(e) => {
226                    // Transport-level failure. Evict the suspect instance.
227                    // Only retry when the connection was never
228                    // established (`is_connect` — e.g. refused by a dead
229                    // Pod): then the request provably never reached the
230                    // function, so a cold-start retry can't double-execute
231                    // it. A reset/timeout mid-flight may have already run
232                    // the handler, so surface those instead of risking a
233                    // duplicate invoke.
234                    let entry = slot.entry.clone();
235                    drop(slot);
236                    self.evict_entry(&func.function_name, &entry).await;
237                    if attempt < 2 && e.is_connect() {
238                        tracing::warn!(
239                            function = %func.function_name,
240                            error = %e,
241                            "warm Lambda instance unreachable; evicted, retrying with a cold start"
242                        );
243                        continue;
244                    }
245                    return Err(RuntimeError::InvocationFailed(e.to_string()));
246                }
247            }
248        }
249    }
250
251    /// Invoke a Lambda function and yield the raw HTTP body as a stream
252    /// of byte chunks. Each chunk corresponds to one HTTP frame the RIE
253    /// flushed to the wire — for streaming-aware handlers this
254    /// preserves the chunk boundaries the function emitted. Buffered
255    /// handlers come back as a single chunk, which is still a valid
256    /// streamed response.
257    ///
258    /// The reserved instance's busy guard travels with the returned
259    /// [`StreamingInvocation`] so the slot stays held until the caller
260    /// finishes draining the stream.
261    pub async fn invoke_streaming(
262        &self,
263        func: &LambdaFunction,
264        payload: &[u8],
265        layers: &[Vec<u8>],
266    ) -> Result<StreamingInvocation, RuntimeError> {
267        let client = reqwest::Client::new();
268        let mut attempt = 0;
269        loop {
270            attempt += 1;
271            let slot = self.acquire_slot(func, layers).await?;
272            let url = format!(
273                "http://{}/2015-03-31/functions/function/invocations",
274                slot.entry.instance.endpoint
275            );
276            let send = client
277                .post(&url)
278                .body(payload.to_vec())
279                .timeout(Duration::from_secs(func.timeout as u64 + 5))
280                .send()
281                .await;
282            match send {
283                Ok(resp) => {
284                    *slot.entry.last_used.write() = Instant::now();
285                    let Slot {
286                        entry: _entry,
287                        guard,
288                    } = slot;
289                    return Ok(StreamingInvocation {
290                        resp,
291                        _slot_guard: Some(guard),
292                    });
293                }
294                Err(e) => {
295                    // Same connect-only retry policy as `invoke`: retry
296                    // only when the connection never established, so a
297                    // half-run handler isn't invoked twice.
298                    let entry = slot.entry.clone();
299                    drop(slot);
300                    self.evict_entry(&func.function_name, &entry).await;
301                    if attempt < 2 && e.is_connect() {
302                        continue;
303                    }
304                    return Err(RuntimeError::InvocationFailed(e.to_string()));
305                }
306            }
307        }
308    }
309
310    /// Reserve a warm instance to run exactly one invocation, returning a
311    /// held busy guard that grants exclusive use until it drops. Shared
312    /// by `invoke` and `invoke_streaming`.
313    ///
314    /// Order of preference: (1) a free, current-deploy instance already
315    /// in the pool; (2) a freshly launched instance, if the pool is below
316    /// `max_concurrency`; (3) queue on a busy current-deploy instance.
317    /// Instances whose `deploy_id` no longer matches the function's
318    /// current code+layers are torn down before sizing the pool.
319    async fn acquire_slot(
320        &self,
321        func: &LambdaFunction,
322        layers: &[Vec<u8>],
323    ) -> Result<Slot, RuntimeError> {
324        let is_image = func.package_type == "Image";
325        if !is_image && func.code_zip.is_none() {
326            return Err(RuntimeError::NoCodeZip(func.function_name.clone()));
327        }
328
329        let deploy_id = deploy_id_for(func, layers);
330
331        // (1) Fast path: a free instance already running the right deploy.
332        if let Some(slot) = self.try_take_free(&func.function_name, &deploy_id) {
333            return Ok(slot);
334        }
335
336        // Serialize launch decisions per function so a burst of cold
337        // invokes doesn't each push the pool past the cap.
338        let startup_lock = {
339            let mut starting = self.starting.write();
340            starting
341                .entry(func.function_name.clone())
342                .or_insert_with(|| Arc::new(tokio::sync::Mutex::new(())))
343                .clone()
344        };
345        let startup_guard = startup_lock.lock().await;
346
347        // Re-check under the startup lock — another task may have freed or
348        // launched an instance while we waited.
349        if let Some(slot) = self.try_take_free(&func.function_name, &deploy_id) {
350            return Ok(slot);
351        }
352
353        // Tear down any instances left over from a previous deploy.
354        self.evict_stale_deploy(&func.function_name, &deploy_id)
355            .await;
356
357        let pool_len = self
358            .instances
359            .read()
360            .get(&func.function_name)
361            .map_or(0, |v| v.len());
362
363        // (2) Room to grow: launch a fresh instance and reserve it.
364        if pool_len < self.max_concurrency {
365            let instance = self
366                .backend
367                .launch(func, func.code_zip.as_deref(), layers, &deploy_id)
368                .await?;
369            let entry = Arc::new(WarmEntry {
370                instance,
371                last_used: RwLock::new(Instant::now()),
372                deploy_id,
373                busy: Arc::new(tokio::sync::Mutex::new(())),
374            });
375            let guard = entry
376                .busy
377                .clone()
378                .try_lock_owned()
379                .expect("freshly created busy lock is uncontended");
380            self.instances
381                .write()
382                .entry(func.function_name.clone())
383                .or_default()
384                .push(entry.clone());
385            return Ok(Slot { entry, guard });
386        }
387
388        // (3) At capacity: release the startup lock and wait for whichever
389        // current instance frees up *first*. Racing every instance's lock
390        // (rather than blocking on a fixed one) avoids convoying every
391        // queued caller onto pool[0] while a different instance goes idle.
392        drop(startup_guard);
393        let candidates: Vec<Arc<WarmEntry>> = {
394            let map = self.instances.read();
395            map.get(&func.function_name)
396                .map(|pool| {
397                    pool.iter()
398                        .filter(|e| e.deploy_id == deploy_id)
399                        .cloned()
400                        .collect()
401                })
402                .unwrap_or_default()
403        };
404        if candidates.is_empty() {
405            return Err(RuntimeError::InvocationFailed(format!(
406                "no warm instance available for {}",
407                func.function_name
408            )));
409        }
410        let waiters = candidates.into_iter().map(|entry| {
411            Box::pin(async move {
412                let guard = entry.busy.clone().lock_owned().await;
413                Slot { entry, guard }
414            })
415        });
416        let (slot, _idx, _rest) = futures_util::future::select_all(waiters).await;
417        *slot.entry.last_used.write() = Instant::now();
418        Ok(slot)
419    }
420
421    /// Try to reserve a free, current-deploy instance without launching.
422    /// Returns `None` if every matching instance is busy (or there are
423    /// none).
424    fn try_take_free(&self, function_name: &str, deploy_id: &str) -> Option<Slot> {
425        let map = self.instances.read();
426        let pool = map.get(function_name)?;
427        for entry in pool {
428            if entry.deploy_id != deploy_id {
429                continue;
430            }
431            if let Ok(guard) = entry.busy.clone().try_lock_owned() {
432                *entry.last_used.write() = Instant::now();
433                return Some(Slot {
434                    entry: entry.clone(),
435                    guard,
436                });
437            }
438        }
439        None
440    }
441
442    /// Remove one specific instance from a function's pool and terminate
443    /// it. Used when an invocation finds the instance unreachable.
444    async fn evict_entry(&self, function_name: &str, target: &Arc<WarmEntry>) {
445        let removed = {
446            let mut map = self.instances.write();
447            match map.get_mut(function_name) {
448                Some(pool) => {
449                    let removed = pool
450                        .iter()
451                        .position(|e| Arc::ptr_eq(e, target))
452                        .map(|pos| pool.remove(pos));
453                    if pool.is_empty() {
454                        map.remove(function_name);
455                    }
456                    removed
457                }
458                None => None,
459            }
460        };
461        if let Some(entry) = removed {
462            tracing::info!(
463                function = %function_name,
464                handle = ?entry.instance.handle,
465                "evicting unreachable Lambda runtime instance"
466            );
467            self.backend.terminate(&entry.instance.handle).await;
468        }
469    }
470
471    /// Tear down every instance in a function's pool whose `deploy_id`
472    /// no longer matches the current code+layers fingerprint.
473    async fn evict_stale_deploy(&self, function_name: &str, deploy_id: &str) {
474        let stale: Vec<Arc<WarmEntry>> = {
475            let mut map = self.instances.write();
476            match map.get_mut(function_name) {
477                Some(pool) => {
478                    let mut stale = Vec::new();
479                    pool.retain(|e| {
480                        if e.deploy_id == deploy_id {
481                            true
482                        } else {
483                            stale.push(e.clone());
484                            false
485                        }
486                    });
487                    if pool.is_empty() {
488                        map.remove(function_name);
489                    }
490                    stale
491                }
492                None => Vec::new(),
493            }
494        };
495        for entry in stale {
496            tracing::info!(
497                function = %function_name,
498                handle = ?entry.instance.handle,
499                "stopping stale-deploy Lambda runtime instance"
500            );
501            self.backend.terminate(&entry.instance.handle).await;
502        }
503    }
504
505    /// Stop and remove every warm instance for a specific function.
506    pub async fn stop_container(&self, function_name: &str) {
507        let pool = self
508            .instances
509            .write()
510            .remove(function_name)
511            .unwrap_or_default();
512        for entry in pool {
513            tracing::info!(
514                function = %function_name,
515                handle = ?entry.instance.handle,
516                "stopping Lambda runtime instance"
517            );
518            self.backend.terminate(&entry.instance.handle).await;
519        }
520    }
521
522    /// Stop and remove all warm instances (used on server shutdown or reset).
523    pub async fn stop_all(&self) {
524        let pools: Vec<(String, Vec<Arc<WarmEntry>>)> =
525            { self.instances.write().drain().collect() };
526        for (name, pool) in pools {
527            for entry in pool {
528                tracing::info!(
529                    function = %name,
530                    handle = ?entry.instance.handle,
531                    "stopping Lambda runtime instance (cleanup)"
532                );
533                self.backend.terminate(&entry.instance.handle).await;
534            }
535        }
536    }
537
538    /// List all warm instances and their metadata for introspection.
539    /// One row per running instance — a function scaled to several warm
540    /// instances appears once per instance.
541    pub fn list_warm_containers(
542        &self,
543        lambda_state: &crate::state::SharedLambdaState,
544    ) -> Vec<serde_json::Value> {
545        let entries = self.instances.read();
546        let accounts = lambda_state.read();
547        let mut rows = Vec::new();
548        for (name, pool) in entries.iter() {
549            let runtime = accounts
550                .iter()
551                .find_map(|(_, state)| state.functions.get(name).map(|f| f.runtime.clone()))
552                .unwrap_or_default();
553            for entry in pool {
554                let idle_secs = entry.last_used.read().elapsed().as_secs();
555                let mut row = serde_json::json!({
556                    "functionName": name,
557                    "runtime": runtime,
558                    "backend": self.backend.name(),
559                    "lastUsedSecsAgo": idle_secs,
560                });
561                let obj = row.as_object_mut().expect("json object");
562                match &entry.instance.handle {
563                    BackendHandle::Container { id } => {
564                        obj.insert("containerId".into(), serde_json::Value::String(id.clone()));
565                    }
566                    BackendHandle::Pod { namespace, name } => {
567                        obj.insert("podName".into(), serde_json::Value::String(name.clone()));
568                        obj.insert(
569                            "namespace".into(),
570                            serde_json::Value::String(namespace.clone()),
571                        );
572                    }
573                }
574                rows.push(row);
575            }
576        }
577        rows
578    }
579
580    /// Evict (stop and remove) every warm instance for a specific
581    /// function. Returns true if at least one instance was evicted.
582    pub async fn evict_container(&self, function_name: &str) -> bool {
583        let pool = self
584            .instances
585            .write()
586            .remove(function_name)
587            .unwrap_or_default();
588        let found = !pool.is_empty();
589        for entry in pool {
590            tracing::info!(
591                function = %function_name,
592                handle = ?entry.instance.handle,
593                "evicting Lambda runtime instance via simulation API"
594            );
595            self.backend.terminate(&entry.instance.handle).await;
596        }
597        found
598    }
599
600    /// Background loop that stops instances idle longer than `ttl`.
601    pub async fn run_cleanup_loop(self: Arc<Self>, ttl: Duration) {
602        let mut interval = tokio::time::interval(Duration::from_secs(30));
603        loop {
604            interval.tick().await;
605            self.cleanup_idle(ttl).await;
606        }
607    }
608
609    async fn cleanup_idle(&self, ttl: Duration) {
610        // Reap individual instances that are both idle past the TTL and
611        // currently free (a busy instance is mid-invocation, so its
612        // `last_used` is fresh anyway — the `try_lock` check just avoids
613        // racing a slot that's about to be used).
614        let expired: Vec<(String, Arc<WarmEntry>)> = {
615            let mut map = self.instances.write();
616            let mut out = Vec::new();
617            for (name, pool) in map.iter_mut() {
618                let mut i = 0;
619                while i < pool.len() {
620                    let idle = pool[i].last_used.read().elapsed() > ttl;
621                    let free = pool[i].busy.try_lock().is_ok();
622                    if idle && free {
623                        out.push((name.clone(), pool.remove(i)));
624                    } else {
625                        i += 1;
626                    }
627                }
628            }
629            map.retain(|_, pool| !pool.is_empty());
630            out
631        };
632        for (name, entry) in expired {
633            tracing::info!(function = %name, "stopping idle Lambda runtime instance");
634            self.backend.terminate(&entry.instance.handle).await;
635        }
636    }
637}
638
639#[cfg(test)]
640mod tests {
641    use super::deploy_id_from;
642
643    /// The deploy id is spliced raw into the init-container artifact URL
644    /// path and into a Pod label, so it must never contain characters
645    /// that standard base64 emits (`/`, `+`, `=`). Standard base64
646    /// produced `/` for ~49% of code hashes (issue #1643); sweep a wide
647    /// range of inputs to catch any regression back to a non-URL-safe
648    /// alphabet.
649    #[test]
650    fn deploy_id_is_url_path_safe() {
651        for i in 0..2_000u32 {
652            // Vary both the code hash and the layer set.
653            let code_sha256 = format!("sha256-seed-{i}-{}", i.wrapping_mul(2_654_435_761));
654            let layers: Vec<Vec<u8>> = if i % 3 == 0 {
655                vec![format!("layer-{i}").into_bytes()]
656            } else {
657                vec![]
658            };
659            let id = deploy_id_from(&code_sha256, &layers);
660            assert!(
661                !id.contains('/') && !id.contains('+') && !id.contains('='),
662                "deploy id {id:?} (seed {i}) is not URL-path-safe"
663            );
664        }
665    }
666
667    /// Same inputs must always map to the same deploy id — the value is a
668    /// warm-pool cache key, so instability would defeat reuse.
669    #[test]
670    fn deploy_id_is_stable() {
671        let layers = vec![b"layer-a".to_vec(), b"layer-b".to_vec()];
672        let a = deploy_id_from("abc123", &layers);
673        let b = deploy_id_from("abc123", &layers);
674        assert_eq!(a, b);
675        assert_ne!(a, deploy_id_from("abc124", &layers));
676        assert_ne!(a, deploy_id_from("abc123", &[]));
677    }
678
679    // ---- warm-pool concurrency + eviction (issue #1644) ----
680
681    use super::LambdaRuntime;
682    use crate::runtime::backend::{BackendHandle, LambdaBackend, RuntimeError, WarmInstance};
683    use crate::state::LambdaFunction;
684    use parking_lot::RwLock;
685    use std::collections::{HashMap, VecDeque};
686    use std::sync::atomic::{AtomicUsize, Ordering::SeqCst};
687    use std::sync::Arc;
688    use std::sync::Mutex as StdMutex;
689    use std::time::Duration;
690
691    /// Backend double: counts launches/terminates and hands out endpoints
692    /// from a queue (falling back to `default_endpoint`), so a test can
693    /// inject a dead endpoint followed by a live one.
694    struct CountingBackend {
695        endpoints: StdMutex<VecDeque<String>>,
696        default_endpoint: String,
697        launches: AtomicUsize,
698        terminates: AtomicUsize,
699    }
700
701    impl CountingBackend {
702        fn new(default_endpoint: impl Into<String>) -> Arc<Self> {
703            Arc::new(Self {
704                endpoints: StdMutex::new(VecDeque::new()),
705                default_endpoint: default_endpoint.into(),
706                launches: AtomicUsize::new(0),
707                terminates: AtomicUsize::new(0),
708            })
709        }
710        fn with_queue(default_endpoint: impl Into<String>, queue: Vec<String>) -> Arc<Self> {
711            Arc::new(Self {
712                endpoints: StdMutex::new(queue.into()),
713                default_endpoint: default_endpoint.into(),
714                launches: AtomicUsize::new(0),
715                terminates: AtomicUsize::new(0),
716            })
717        }
718    }
719
720    #[async_trait::async_trait]
721    impl LambdaBackend for CountingBackend {
722        fn name(&self) -> &str {
723            "test"
724        }
725        async fn launch(
726            &self,
727            _func: &LambdaFunction,
728            _code_zip: Option<&[u8]>,
729            _layers: &[Vec<u8>],
730            _deploy_id: &str,
731        ) -> Result<WarmInstance, RuntimeError> {
732            let n = self.launches.fetch_add(1, SeqCst);
733            let endpoint = self
734                .endpoints
735                .lock()
736                .unwrap()
737                .pop_front()
738                .unwrap_or_else(|| self.default_endpoint.clone());
739            Ok(WarmInstance {
740                endpoint,
741                handle: BackendHandle::Container {
742                    id: format!("c{n}"),
743                },
744            })
745        }
746        async fn terminate(&self, _handle: &BackendHandle) {
747            self.terminates.fetch_add(1, SeqCst);
748        }
749    }
750
751    /// Spin a minimal in-process "RIE": one TCP accept loop that records
752    /// the peak number of simultaneously-open connections, holds each
753    /// request for `delay`, then returns a tiny 200. The peak directly
754    /// observes how many invocations overlapped on the instance(s) it
755    /// backs. Returns the `host:port` to use as a warm endpoint.
756    async fn spawn_rie(delay: Duration, peak: Arc<AtomicUsize>) -> String {
757        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
758        let addr = listener.local_addr().unwrap();
759        let cur = Arc::new(AtomicUsize::new(0));
760        tokio::spawn(async move {
761            loop {
762                let Ok((mut sock, _)) = listener.accept().await else {
763                    break;
764                };
765                let cur = cur.clone();
766                let peak = peak.clone();
767                tokio::spawn(async move {
768                    use tokio::io::{AsyncReadExt, AsyncWriteExt};
769                    let now = cur.fetch_add(1, SeqCst) + 1;
770                    peak.fetch_max(now, SeqCst);
771                    let mut buf = [0u8; 1024];
772                    let _ = sock.read(&mut buf).await;
773                    tokio::time::sleep(delay).await;
774                    let _ = sock
775                        .write_all(
776                            b"HTTP/1.1 200 OK\r\nContent-Length: 2\r\nConnection: close\r\n\r\nok",
777                        )
778                        .await;
779                    let _ = sock.flush().await;
780                    cur.fetch_sub(1, SeqCst);
781                });
782            }
783        });
784        format!("{addr}")
785    }
786
787    fn runtime_with(backend: Arc<CountingBackend>, max_concurrency: usize) -> Arc<LambdaRuntime> {
788        Arc::new(LambdaRuntime {
789            backend,
790            instances: RwLock::new(HashMap::new()),
791            starting: RwLock::new(HashMap::new()),
792            max_concurrency,
793        })
794    }
795
796    fn test_func(name: &str, sha: &str) -> LambdaFunction {
797        serde_json::from_value(serde_json::json!({
798            "function_name": name,
799            "function_arn": format!("arn:aws:lambda:us-east-1:123456789012:function:{name}"),
800            "runtime": "python3.12",
801            "role": "arn:aws:iam::123456789012:role/r",
802            "handler": "index.handler",
803            "description": "",
804            "timeout": 5,
805            "memory_size": 128,
806            "code_sha256": sha,
807            "code_size": 1,
808            "version": "$LATEST",
809            "last_modified": "2020-01-01T00:00:00Z",
810            "tags": {},
811            "environment": {},
812            "architectures": ["x86_64"],
813            "package_type": "Zip",
814            "code_zip": [1, 2, 3],
815            "policy": null
816        }))
817        .expect("build test LambdaFunction")
818    }
819
820    /// The core of #1644: with one warm instance, a burst of concurrent
821    /// invocations must be serialized onto it — never delivered in
822    /// parallel (which segfaults the real RIE). Peak overlap must be 1.
823    #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
824    async fn concurrent_invokes_are_serialized_on_a_single_instance() {
825        let peak = Arc::new(AtomicUsize::new(0));
826        let endpoint = spawn_rie(Duration::from_millis(40), peak.clone()).await;
827        let backend = CountingBackend::new(endpoint);
828        let rt = runtime_with(backend.clone(), 1);
829        let func = test_func("conc", "sha-A");
830
831        let mut handles = Vec::new();
832        for _ in 0..8 {
833            let rt = rt.clone();
834            let func = func.clone();
835            handles.push(tokio::spawn(
836                async move { rt.invoke(&func, b"{}", &[]).await },
837            ));
838        }
839        for h in handles {
840            h.await.unwrap().expect("invoke ok");
841        }
842
843        assert_eq!(
844            peak.load(SeqCst),
845            1,
846            "concurrent invokes overlapped on a single RIE instance"
847        );
848        assert_eq!(
849            backend.launches.load(SeqCst),
850            1,
851            "max_concurrency=1 must launch exactly one instance"
852        );
853    }
854
855    /// Under load the pool grows beyond one instance but never past the
856    /// cap, so genuine concurrency is served (AWS semantics) without an
857    /// unbounded container/Pod fan-out.
858    #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
859    async fn pool_scales_under_load_and_respects_cap() {
860        let peak = Arc::new(AtomicUsize::new(0));
861        let endpoint = spawn_rie(Duration::from_millis(60), peak.clone()).await;
862        let backend = CountingBackend::new(endpoint);
863        let rt = runtime_with(backend.clone(), 4);
864        let func = test_func("scale", "sha-A");
865
866        let mut handles = Vec::new();
867        for _ in 0..8 {
868            let rt = rt.clone();
869            let func = func.clone();
870            handles.push(tokio::spawn(
871                async move { rt.invoke(&func, b"{}", &[]).await },
872            ));
873        }
874        for h in handles {
875            h.await.unwrap().expect("invoke ok");
876        }
877
878        let launched = backend.launches.load(SeqCst);
879        assert!(
880            (2..=4).contains(&launched),
881            "expected the pool to scale within the cap, launched={launched}"
882        );
883        assert!(
884            peak.load(SeqCst) > 1,
885            "expected concurrent forwards across the scaled pool"
886        );
887    }
888
889    /// An unreachable instance (dead Pod/container) must be evicted and
890    /// the invocation retried against a fresh cold start, instead of
891    /// wedging the function forever.
892    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
893    async fn dead_instance_is_evicted_and_retried() {
894        let peak = Arc::new(AtomicUsize::new(0));
895        let live = spawn_rie(Duration::from_millis(5), peak.clone()).await;
896        // First launch hands back a refused port; the retry gets the live one.
897        let backend = CountingBackend::with_queue(live, vec!["127.0.0.1:1".to_string()]);
898        let rt = runtime_with(backend.clone(), 1);
899
900        let out = rt
901            .invoke(&test_func("dead", "sha-A"), b"{}", &[])
902            .await
903            .expect("should recover via cold-start retry");
904        assert_eq!(out, b"ok");
905        assert_eq!(
906            backend.launches.load(SeqCst),
907            2,
908            "expected the dead instance plus one cold-start replacement"
909        );
910        assert!(
911            backend.terminates.load(SeqCst) >= 1,
912            "the dead instance should have been terminated on eviction"
913        );
914    }
915
916    /// Changing the function's code (a new deploy id) tears down the
917    /// stale warm instance before serving against a fresh one.
918    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
919    async fn deploy_change_evicts_stale_instance() {
920        let peak = Arc::new(AtomicUsize::new(0));
921        let endpoint = spawn_rie(Duration::from_millis(5), peak.clone()).await;
922        let backend = CountingBackend::new(endpoint);
923        let rt = runtime_with(backend.clone(), 2);
924
925        rt.invoke(&test_func("upd", "sha-A"), b"{}", &[])
926            .await
927            .unwrap();
928        rt.invoke(&test_func("upd", "sha-B"), b"{}", &[])
929            .await
930            .unwrap();
931
932        assert_eq!(
933            backend.launches.load(SeqCst),
934            2,
935            "a new deploy id should launch a fresh instance"
936        );
937        assert!(
938            backend.terminates.load(SeqCst) >= 1,
939            "the stale-deploy instance should have been torn down"
940        );
941        // Exactly one current instance remains in the pool.
942        let pool_len = rt.instances.read().get("upd").map_or(0, |v| v.len());
943        assert_eq!(pool_len, 1);
944    }
945}