use std::collections::HashMap;
use std::sync::Arc;
use std::time::{Duration, Instant};
use base64::Engine;
use parking_lot::RwLock;
use sha2::{Digest, Sha256};
use super::backend::{
BackendHandle, LambdaBackend, RuntimeError, StreamingInvocation, WarmInstance,
};
use super::docker::DockerBackend;
use crate::state::LambdaFunction;
/// One launched runtime instance kept in a function's warm pool.
pub(crate) struct WarmEntry {
/// Endpoint and backend handle returned by `LambdaBackend::launch`.
instance: WarmInstance,
/// Updated whenever the entry is handed out and after a response arrives;
/// read by the idle cleanup and by the warm-container listing.
last_used: RwLock<Instant>,
/// Deploy id (see `deploy_id_from`) the instance was launched for. Entries whose
/// id differs from the caller's current deploy id are never handed out.
deploy_id: String,
/// Locked while an invocation is using this instance, so an instance serves one
/// request at a time.
busy: Arc<tokio::sync::Mutex<()>>,
}
/// Per-function pool cap used when `FAKECLOUD_LAMBDA_MAX_CONCURRENCY` is unset,
/// unparsable, or zero.
const DEFAULT_MAX_CONCURRENCY: usize = 10;
/// Upper bound on acquire-and-forward attempts made by one invocation before it
/// returns an error.
const MAX_INVOKE_ATTEMPTS: u32 = 5;
/// Timeout of the TCP reachability probe; also used as the HTTP client's connect
/// timeout.
const REACHABILITY_PROBE_TIMEOUT: Duration = Duration::from_millis(1500);
/// Exclusive lease on one warm instance: the pool entry plus the guard on its
/// `busy` lock. Dropping the guard makes the instance available again.
struct Slot {
/// The leased pool entry.
entry: Arc<WarmEntry>,
/// Owned guard on `entry.busy`, held for as long as the lease lasts.
guard: tokio::sync::OwnedMutexGuard<()>,
}
/// Computes the deploy id of `func` from its recorded code hash and the bytes of
/// the supplied layers.
fn deploy_id_for(func: &LambdaFunction, layers: &[Vec<u8>]) -> String {
    let code_hash: &str = &func.code_sha256;
    deploy_id_from(code_hash, layers)
}
/// Derives a deploy id from a code hash string and the layer contents.
///
/// The id is the SHA-256 of `code_sha256` followed, for each layer in order, by
/// a `:` separator and that layer's own SHA-256 digest. The result is encoded as
/// URL-safe base64 without padding.
fn deploy_id_from(code_sha256: &str, layers: &[Vec<u8>]) -> String {
    let mut combined = Sha256::new();
    combined.update(code_sha256.as_bytes());
    for layer in layers {
        // Hash each layer on its own so only a fixed-size digest is mixed in.
        let layer_digest = Sha256::digest(layer);
        combined.update(b":");
        combined.update(layer_digest);
    }
    let digest = combined.finalize();
    base64::engine::general_purpose::URL_SAFE_NO_PAD.encode(digest)
}
/// Returns `true` when a TCP connection to `endpoint` can be established within
/// `timeout`. A connect error and an elapsed timeout both count as unreachable.
async fn endpoint_reachable(endpoint: &str, timeout: Duration) -> bool {
    let connect = tokio::net::TcpStream::connect(endpoint);
    match tokio::time::timeout(timeout, connect).await {
        // The probe connection is dropped immediately; only success matters.
        Ok(Ok(_stream)) => true,
        Ok(Err(_)) | Err(_) => false,
    }
}
/// Manages pools of warm function instances on top of a [`LambdaBackend`].
pub struct LambdaRuntime {
/// Backend that launches, terminates, and inspects instances.
backend: Arc<dyn LambdaBackend>,
/// Warm pools keyed by function name.
instances: RwLock<HashMap<String, Vec<Arc<WarmEntry>>>>,
/// Per-function lock taken by `acquire_slot` around its launch-or-wait decision.
starting: RwLock<HashMap<String, Arc<tokio::sync::Mutex<()>>>>,
/// Maximum number of instances kept per function.
max_concurrency: usize,
}
impl LambdaRuntime {
/// Builds a runtime around `backend` with empty warm pools.
///
/// The per-function pool cap comes from `FAKECLOUD_LAMBDA_MAX_CONCURRENCY`; a
/// missing, unparsable, or zero value falls back to `DEFAULT_MAX_CONCURRENCY`.
pub fn from_backend(backend: Arc<dyn LambdaBackend>) -> Self {
    let configured = std::env::var("FAKECLOUD_LAMBDA_MAX_CONCURRENCY")
        .ok()
        .and_then(|raw| raw.parse::<usize>().ok());
    let max_concurrency = match configured {
        Some(limit) if limit >= 1 => limit,
        _ => DEFAULT_MAX_CONCURRENCY,
    };
    Self {
        backend,
        instances: RwLock::new(HashMap::new()),
        starting: RwLock::new(HashMap::new()),
        max_concurrency,
    }
}
/// Creates a runtime backed by Docker, or returns `None` when
/// `DockerBackend::auto_detect` yields no backend.
pub fn auto_detect_docker(server_port: u16) -> Option<Self> {
    let docker = DockerBackend::auto_detect(server_port)?;
    let backend: Arc<dyn LambdaBackend> = Arc::new(docker);
    Some(Self::from_backend(backend))
}
/// Equivalent to [`Self::auto_detect_docker`]: a Docker-backed runtime, or `None`
/// when no Docker backend is detected.
pub fn new(server_port: u16) -> Option<Self> {
Self::auto_detect_docker(server_port)
}
pub async fn new_k8s(
server_port: u16,
internal_token: String,
) -> Result<Self, super::k8s::K8sBackendError> {
let backend = super::k8s::K8sBackend::from_env(server_port, internal_token).await?;
backend.reap_stale().await;
Ok(Self::from_backend(Arc::new(backend)))
}
/// Returns the name reported by the underlying backend.
pub fn cli_name(&self) -> &str {
self.backend.name()
}
/// Asks the backend to pre-pull the image `func` would run on.
///
/// For `Image` packages the image is `func.image_uri`; otherwise it is looked up
/// from `func.runtime`. Returns `None` when no image can be determined, and
/// otherwise `Some` with the result of the backend's pre-pull.
pub async fn prepull_for_function(
    &self,
    func: &LambdaFunction,
) -> Option<Result<(), super::backend::RuntimeError>> {
    let uses_container_image = func.package_type == "Image";
    let image = if uses_container_image {
        func.image_uri.clone()?
    } else {
        super::docker::runtime_to_image(&func.runtime)?
    };
    let outcome = self.backend.prepull_image(&image).await;
    Some(outcome)
}
/// Invokes `func` with `payload` and returns the response body.
///
/// Logs are not captured on this path.
///
/// # Errors
/// Returns whatever `invoke_inner` reports.
pub async fn invoke(
    &self,
    func: &LambdaFunction,
    payload: &[u8],
    layers: &[Vec<u8>],
) -> Result<Vec<u8>, RuntimeError> {
    let (response, _logs) = self.invoke_inner(func, payload, layers, false).await?;
    Ok(response)
}
/// Invokes `func` with `payload` and returns the response body together with the
/// instance logs fetched from the backend, if it provides any.
///
/// # Errors
/// Returns whatever `invoke_inner` reports.
pub async fn invoke_with_log_tail(
&self,
func: &LambdaFunction,
payload: &[u8],
layers: &[Vec<u8>],
) -> Result<(Vec<u8>, Option<String>), RuntimeError> {
self.invoke_inner(func, payload, layers, true).await
}
async fn invoke_inner(
&self,
func: &LambdaFunction,
payload: &[u8],
layers: &[Vec<u8>],
capture_logs: bool,
) -> Result<(Vec<u8>, Option<String>), RuntimeError> {
let client = reqwest::Client::builder()
.connect_timeout(REACHABILITY_PROBE_TIMEOUT)
.build()
.unwrap_or_else(|_| reqwest::Client::new());
let mut attempt: u32 = 0;
loop {
attempt += 1;
let slot = self.acquire_slot(func, layers).await?;
if !endpoint_reachable(&slot.entry.instance.endpoint, REACHABILITY_PROBE_TIMEOUT).await
{
let entry = slot.entry.clone();
drop(slot);
self.evict_entry(&func.function_name, &entry).await;
if attempt < MAX_INVOKE_ATTEMPTS {
tracing::warn!(
function = %func.function_name,
endpoint = %entry.instance.endpoint,
"warm Lambda instance failed reachability probe; evicted, retrying with a cold start"
);
continue;
}
return Err(RuntimeError::InvocationFailed(format!(
"no reachable warm instance for {} after {attempt} attempts",
func.function_name
)));
}
let url = format!(
"http://{}/2015-03-31/functions/function/invocations",
slot.entry.instance.endpoint
);
let send = client
.post(&url)
.body(payload.to_vec())
.timeout(Duration::from_secs(func.timeout as u64 + 5))
.send()
.await;
match send {
Ok(resp) => {
let body = resp.bytes().await;
*slot.entry.last_used.write() = Instant::now();
return match body {
Ok(b) => {
let logs = if capture_logs {
self.backend
.instance_logs(&slot.entry.instance.handle)
.await
} else {
None
};
Ok((b.to_vec(), logs))
}
Err(e) => {
let entry = slot.entry.clone();
drop(slot);
self.evict_entry(&func.function_name, &entry).await;
Err(RuntimeError::InvocationFailed(e.to_string()))
}
};
}
Err(e) => {
let entry = slot.entry.clone();
drop(slot);
self.evict_entry(&func.function_name, &entry).await;
if attempt < MAX_INVOKE_ATTEMPTS && e.is_connect() {
tracing::warn!(
function = %func.function_name,
error = %e,
"warm Lambda instance unreachable; evicted, retrying with a cold start"
);
continue;
}
return Err(RuntimeError::InvocationFailed(e.to_string()));
}
}
}
}
pub async fn invoke_streaming(
&self,
func: &LambdaFunction,
payload: &[u8],
layers: &[Vec<u8>],
) -> Result<StreamingInvocation, RuntimeError> {
let client = reqwest::Client::builder()
.connect_timeout(REACHABILITY_PROBE_TIMEOUT)
.build()
.unwrap_or_else(|_| reqwest::Client::new());
let mut attempt: u32 = 0;
loop {
attempt += 1;
let slot = self.acquire_slot(func, layers).await?;
if !endpoint_reachable(&slot.entry.instance.endpoint, REACHABILITY_PROBE_TIMEOUT).await
{
let entry = slot.entry.clone();
drop(slot);
self.evict_entry(&func.function_name, &entry).await;
if attempt < MAX_INVOKE_ATTEMPTS {
continue;
}
return Err(RuntimeError::InvocationFailed(format!(
"no reachable warm instance for {} after {attempt} attempts",
func.function_name
)));
}
let url = format!(
"http://{}/2015-03-31/functions/function/invocations",
slot.entry.instance.endpoint
);
let send = client
.post(&url)
.body(payload.to_vec())
.timeout(Duration::from_secs(func.timeout as u64 + 5))
.send()
.await;
match send {
Ok(resp) => {
*slot.entry.last_used.write() = Instant::now();
let Slot {
entry: _entry,
guard,
} = slot;
return Ok(StreamingInvocation {
resp,
_slot_guard: Some(guard),
});
}
Err(e) => {
let entry = slot.entry.clone();
drop(slot);
self.evict_entry(&func.function_name, &entry).await;
if attempt < MAX_INVOKE_ATTEMPTS && e.is_connect() {
continue;
}
return Err(RuntimeError::InvocationFailed(e.to_string()));
}
}
}
}
/// Obtains an exclusive lease on one instance of `func` for the current deploy.
///
/// Tries, in order: an idle instance already in the pool; a newly launched
/// instance when the pool is below `max_concurrency`; otherwise waits until any
/// current-deploy instance becomes free.
///
/// # Errors
/// Returns `RuntimeError::NoCodeZip` for a non-image function without a code
/// archive, propagates launch errors from the backend, and returns
/// `RuntimeError::InvocationFailed` when the pool is full but holds no instance
/// of the current deploy to wait on.
async fn acquire_slot(
&self,
func: &LambdaFunction,
layers: &[Vec<u8>],
) -> Result<Slot, RuntimeError> {
// Only `Image` packages can run without a code archive.
let is_image = func.package_type == "Image";
if !is_image && func.code_zip.is_none() {
return Err(RuntimeError::NoCodeZip(func.function_name.clone()));
}
let deploy_id = deploy_id_for(func, layers);
// Fast path: an idle instance of this deploy already exists.
if let Some(slot) = self.try_take_free(&func.function_name, &deploy_id) {
return Ok(slot);
}
// Per-function startup lock: one caller at a time runs the launch decision below.
let startup_lock = {
let mut starting = self.starting.write();
starting
.entry(func.function_name.clone())
.or_insert_with(|| Arc::new(tokio::sync::Mutex::new(())))
.clone()
};
let startup_guard = startup_lock.lock().await;
// Re-check under the startup lock: an instance may have been launched or freed meanwhile.
if let Some(slot) = self.try_take_free(&func.function_name, &deploy_id) {
return Ok(slot);
}
// Remove instances of other deploy ids before measuring the pool against the cap.
self.evict_stale_deploy(&func.function_name, &deploy_id)
.await;
let pool_len = self
.instances
.read()
.get(&func.function_name)
.map_or(0, |v| v.len());
if pool_len < self.max_concurrency {
// Below the cap: launch a new instance and hand it out already locked.
let instance = self
.backend
.launch(func, func.code_zip.as_deref(), layers, &deploy_id)
.await?;
let entry = Arc::new(WarmEntry {
instance,
last_used: RwLock::new(Instant::now()),
deploy_id,
busy: Arc::new(tokio::sync::Mutex::new(())),
});
// Lock before publishing the entry so no other caller can take it first.
let guard = entry
.busy
.clone()
.try_lock_owned()
.expect("freshly created busy lock is uncontended");
self.instances
.write()
.entry(func.function_name.clone())
.or_default()
.push(entry.clone());
return Ok(Slot { entry, guard });
}
// At the cap: release the startup lock and wait for a current-deploy instance to free up.
drop(startup_guard);
let candidates: Vec<Arc<WarmEntry>> = {
let map = self.instances.read();
map.get(&func.function_name)
.map(|pool| {
pool.iter()
.filter(|e| e.deploy_id == deploy_id)
.cloned()
.collect()
})
.unwrap_or_default()
};
if candidates.is_empty() {
return Err(RuntimeError::InvocationFailed(format!(
"no warm instance available for {}",
func.function_name
)));
}
// One waiter per candidate; the first busy lock acquired wins and the rest are dropped.
let waiters = candidates.into_iter().map(|entry| {
Box::pin(async move {
let guard = entry.busy.clone().lock_owned().await;
Slot { entry, guard }
})
});
let (slot, _idx, _rest) = futures_util::future::select_all(waiters).await;
*slot.entry.last_used.write() = Instant::now();
Ok(slot)
}
/// Leases the first instance of `function_name` that matches `deploy_id` and is
/// not busy, without waiting. Returns `None` when the function has no pool or
/// every matching instance is in use.
fn try_take_free(&self, function_name: &str, deploy_id: &str) -> Option<Slot> {
    let pools = self.instances.read();
    let pool = pools.get(function_name)?;
    let taken = pool
        .iter()
        .filter(|candidate| candidate.deploy_id == deploy_id)
        .find_map(|candidate| {
            // A failed `try_lock_owned` means the instance is serving a request.
            let guard = candidate.busy.clone().try_lock_owned().ok()?;
            *candidate.last_used.write() = Instant::now();
            Some(Slot {
                entry: candidate.clone(),
                guard,
            })
        });
    taken
}
/// Removes exactly `target` (matched by pointer identity) from the pool of
/// `function_name` and terminates it through the backend.
///
/// A pool left empty is removed from the map. When `target` is no longer in the
/// pool, nothing is terminated.
async fn evict_entry(&self, function_name: &str, target: &Arc<WarmEntry>) {
    // Mutate the pool in its own scope so the write lock is released before the
    // asynchronous terminate call.
    let evicted = {
        let mut pools = self.instances.write();
        let evicted = pools.get_mut(function_name).and_then(|pool| {
            pool.iter()
                .position(|candidate| Arc::ptr_eq(candidate, target))
                .map(|index| pool.remove(index))
        });
        if matches!(pools.get(function_name), Some(pool) if pool.is_empty()) {
            pools.remove(function_name);
        }
        evicted
    };
    let Some(entry) = evicted else {
        return;
    };
    tracing::info!(
        function = %function_name,
        handle = ?entry.instance.handle,
        "evicting unreachable Lambda runtime instance"
    );
    self.backend.terminate(&entry.instance.handle).await;
}
/// Removes every instance of `function_name` whose deploy id differs from
/// `deploy_id` and terminates each one, in pool order.
///
/// A pool left empty is removed from the map.
async fn evict_stale_deploy(&self, function_name: &str, deploy_id: &str) {
    // Split the pool under the write lock; terminate only after releasing it.
    let stale: Vec<Arc<WarmEntry>> = {
        let mut pools = self.instances.write();
        let mut stale = Vec::new();
        if let Some(pool) = pools.get_mut(function_name) {
            let (current, outdated): (Vec<_>, Vec<_>) = std::mem::take(pool)
                .into_iter()
                .partition(|entry| entry.deploy_id == deploy_id);
            *pool = current;
            stale = outdated;
            if pool.is_empty() {
                pools.remove(function_name);
            }
        }
        stale
    };
    for entry in stale {
        tracing::info!(
            function = %function_name,
            handle = ?entry.instance.handle,
            "stopping stale-deploy Lambda runtime instance"
        );
        self.backend.terminate(&entry.instance.handle).await;
    }
}
/// Detaches and returns the whole pool of `function_name`, leaving no entry for
/// it in the map. Returns an empty vector when the function has no pool. The
/// instances themselves are not terminated here.
pub(crate) fn take_warm_instances(&self, function_name: &str) -> Vec<Arc<WarmEntry>> {
    let mut pools = self.instances.write();
    match pools.remove(function_name) {
        Some(pool) => pool,
        None => Vec::new(),
    }
}
/// Terminates every instance in `pool` through the backend, one after another.
///
/// Works on the vector it is given and never touches the runtime's pool map, so
/// a pool recreated under the same name in the meantime is unaffected.
pub(crate) async fn terminate_instances(&self, pool: Vec<Arc<WarmEntry>>) {
for entry in pool {
tracing::info!(
handle = ?entry.instance.handle,
"stopping Lambda runtime instance"
);
self.backend.terminate(&entry.instance.handle).await;
}
}
/// Removes the pool of `function_name` from the runtime and terminates all of
/// its instances.
pub async fn stop_container(&self, function_name: &str) {
// Detach first so the map lock is not held while instances are terminated.
let pool = self.take_warm_instances(function_name);
self.terminate_instances(pool).await;
}
/// Empties every pool and terminates all instances of all functions.
pub async fn stop_all(&self) {
    // Drain under the write lock, then terminate without holding it.
    let drained: Vec<(String, Vec<Arc<WarmEntry>>)> = {
        let mut pools = self.instances.write();
        pools.drain().collect()
    };
    for (function_name, pool) in drained {
        for entry in pool {
            let handle = &entry.instance.handle;
            tracing::info!(
                function = %function_name,
                handle = ?handle,
                "stopping Lambda runtime instance (cleanup)"
            );
            self.backend.terminate(handle).await;
        }
    }
}
/// Returns one JSON object per warm instance.
///
/// Every row has `functionName`, `runtime`, `backend`, and `lastUsedSecsAgo`.
/// Container-backed instances add `containerId`; pod-backed instances add
/// `podName` and `namespace`. `runtime` is taken from the first account in
/// `lambda_state` that knows the function, or is the default value when none does.
pub fn list_warm_containers(
    &self,
    lambda_state: &crate::state::SharedLambdaState,
) -> Vec<serde_json::Value> {
    let pools = self.instances.read();
    let accounts = lambda_state.read();
    let mut rows = Vec::new();
    for (function_name, pool) in pools.iter() {
        let runtime = accounts
            .iter()
            .find_map(|(_, state)| {
                state
                    .functions
                    .get(function_name)
                    .map(|f| f.runtime.clone())
            })
            .unwrap_or_default();
        for entry in pool {
            let idle_secs = entry.last_used.read().elapsed().as_secs();
            let mut row = serde_json::json!({
                "functionName": function_name,
                "runtime": runtime,
                "backend": self.backend.name(),
                "lastUsedSecsAgo": idle_secs,
            });
            // Backend-specific identifiers, in the order they are added to the row.
            let extra_fields = match &entry.instance.handle {
                BackendHandle::Container { id } => vec![("containerId", id)],
                BackendHandle::Pod {
                    namespace,
                    name: pod_name,
                } => vec![("podName", pod_name), ("namespace", namespace)],
            };
            let obj = row.as_object_mut().expect("json object");
            for (key, value) in extra_fields {
                obj.insert(key.into(), serde_json::Value::String(value.clone()));
            }
            rows.push(row);
        }
    }
    rows
}
/// Removes the pool of `function_name` and terminates its instances.
///
/// Returns `true` when at least one instance was evicted, `false` when the
/// function had no warm instances.
pub async fn evict_container(&self, function_name: &str) -> bool {
    let pool = self.take_warm_instances(function_name);
    if pool.is_empty() {
        return false;
    }
    for entry in pool {
        tracing::info!(
            function = %function_name,
            handle = ?entry.instance.handle,
            "evicting Lambda runtime instance via simulation API"
        );
        self.backend.terminate(&entry.instance.handle).await;
    }
    true
}
/// Runs `cleanup_idle(ttl)` on a 30-second interval, forever. This future never
/// completes on its own.
pub async fn run_cleanup_loop(self: Arc<Self>, ttl: Duration) {
let mut interval = tokio::time::interval(Duration::from_secs(30));
loop {
// NOTE(review): per tokio's `interval` docs the first tick completes
// immediately, so the first cleanup pass runs at startup.
interval.tick().await;
self.cleanup_idle(ttl).await;
}
}
/// Terminates every instance that has been unused for longer than `ttl` and is
/// not busy at the moment of the check.
///
/// Expired entries are removed from their pools under the write lock, emptied
/// pools are dropped from the map, and termination happens after the lock is
/// released.
async fn cleanup_idle(&self, ttl: Duration) {
    let expired: Vec<(String, Arc<WarmEntry>)> = {
        let mut pools = self.instances.write();
        let mut expired = Vec::new();
        for (function_name, pool) in pools.iter_mut() {
            pool.retain(|entry| {
                let idle = entry.last_used.read().elapsed() > ttl;
                // The probe guard is dropped at once; it only tests for current use.
                let free = entry.busy.try_lock().is_ok();
                if idle && free {
                    expired.push((function_name.clone(), entry.clone()));
                    false
                } else {
                    true
                }
            });
        }
        pools.retain(|_, pool| !pool.is_empty());
        expired
    };
    for (name, entry) in expired {
        tracing::info!(function = %name, "stopping idle Lambda runtime instance");
        self.backend.terminate(&entry.instance.handle).await;
    }
}
}
#[cfg(test)]
mod tests {
use super::deploy_id_from;
/// Deploy ids must never contain `/`, `+`, or `=`, checked over 2000 generated
/// inputs with and without a layer.
#[test]
fn deploy_id_is_url_path_safe() {
    for i in 0..2_000u32 {
        let code_sha256 = format!("sha256-seed-{i}-{}", i.wrapping_mul(2_654_435_761));
        // Every third seed also hashes a single layer.
        let mut layers: Vec<Vec<u8>> = Vec::new();
        if i % 3 == 0 {
            layers.push(format!("layer-{i}").into_bytes());
        }
        let id = deploy_id_from(&code_sha256, &layers);
        let has_unsafe_char = id.contains(|c: char| matches!(c, '/' | '+' | '='));
        assert!(
            !has_unsafe_char,
            "deploy id {id:?} (seed {i}) is not URL-path-safe"
        );
    }
}
/// The deploy id is deterministic for identical inputs and changes when either
/// the code hash or the layer set changes.
#[test]
fn deploy_id_is_stable() {
let layers = vec![b"layer-a".to_vec(), b"layer-b".to_vec()];
let a = deploy_id_from("abc123", &layers);
let b = deploy_id_from("abc123", &layers);
assert_eq!(a, b);
// A different code hash or a different layer list must give a different id.
assert_ne!(a, deploy_id_from("abc124", &layers));
assert_ne!(a, deploy_id_from("abc123", &[]));
}
use super::LambdaRuntime;
use crate::runtime::backend::{BackendHandle, LambdaBackend, RuntimeError, WarmInstance};
use crate::state::LambdaFunction;
use parking_lot::RwLock;
use std::collections::{HashMap, VecDeque};
use std::sync::atomic::{AtomicUsize, Ordering::SeqCst};
use std::sync::Arc;
use std::sync::Mutex as StdMutex;
use std::time::Duration;
/// Test backend that launches nothing real: it hands out preconfigured endpoints
/// and counts `launch` and `terminate` calls.
struct CountingBackend {
/// Endpoints returned by successive launches, consumed front to back.
endpoints: StdMutex<VecDeque<String>>,
/// Endpoint returned once `endpoints` is empty.
default_endpoint: String,
/// Number of `launch` calls so far.
launches: AtomicUsize,
/// Number of `terminate` calls so far.
terminates: AtomicUsize,
}
impl CountingBackend {
    /// Backend whose every launch reports `default_endpoint`.
    fn new(default_endpoint: impl Into<String>) -> Arc<Self> {
        Self::with_queue(default_endpoint, Vec::new())
    }
    /// Backend whose launches report the endpoints in `queue` first, in order,
    /// and `default_endpoint` once the queue is exhausted.
    fn with_queue(default_endpoint: impl Into<String>, queue: Vec<String>) -> Arc<Self> {
        let backend = Self {
            endpoints: StdMutex::new(VecDeque::from(queue)),
            default_endpoint: default_endpoint.into(),
            launches: AtomicUsize::new(0),
            terminates: AtomicUsize::new(0),
        };
        Arc::new(backend)
    }
}
/// `LambdaBackend` implementation that only records calls; nothing is started
/// or stopped.
#[async_trait::async_trait]
impl LambdaBackend for CountingBackend {
fn name(&self) -> &str {
"test"
}
/// Counts the launch and returns an instance pointing at the next queued
/// endpoint, or at the default endpoint when the queue is empty.
async fn launch(
&self,
_func: &LambdaFunction,
_code_zip: Option<&[u8]>,
_layers: &[Vec<u8>],
_deploy_id: &str,
) -> Result<WarmInstance, RuntimeError> {
// `n` is the count before this launch, so container ids run c0, c1, ...
let n = self.launches.fetch_add(1, SeqCst);
let endpoint = self
.endpoints
.lock()
.unwrap()
.pop_front()
.unwrap_or_else(|| self.default_endpoint.clone());
Ok(WarmInstance {
endpoint,
handle: BackendHandle::Container {
id: format!("c{n}"),
},
})
}
/// Counts the termination; the handle is ignored.
async fn terminate(&self, _handle: &BackendHandle) {
self.terminates.fetch_add(1, SeqCst);
}
}
/// Starts a minimal fake runtime endpoint on an ephemeral localhost port and
/// returns its `host:port` address.
///
/// Each connection that sends at least one byte is counted as in flight; `peak`
/// records the highest in-flight count seen. After `delay` the server answers
/// with a fixed `200 OK` response whose body is `ok`.
async fn spawn_rie(delay: Duration, peak: Arc<AtomicUsize>) -> String {
let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
let addr = listener.local_addr().unwrap();
// Number of requests currently being served.
let cur = Arc::new(AtomicUsize::new(0));
tokio::spawn(async move {
loop {
let Ok((mut sock, _)) = listener.accept().await else {
break;
};
let cur = cur.clone();
let peak = peak.clone();
tokio::spawn(async move {
use tokio::io::{AsyncReadExt, AsyncWriteExt};
let mut buf = [0u8; 1024];
let n = sock.read(&mut buf).await.unwrap_or(0);
// Connections closed without sending data (such as a bare TCP connect) are not counted.
if n == 0 {
return;
}
let now = cur.fetch_add(1, SeqCst) + 1;
peak.fetch_max(now, SeqCst);
tokio::time::sleep(delay).await;
let _ = sock
.write_all(
b"HTTP/1.1 200 OK\r\nContent-Length: 2\r\nConnection: close\r\n\r\nok",
)
.await;
let _ = sock.flush().await;
cur.fetch_sub(1, SeqCst);
});
}
});
format!("{addr}")
}
/// Builds a `LambdaRuntime` directly from `backend` with an explicit
/// `max_concurrency`, bypassing the environment lookup done by `from_backend`.
fn runtime_with(backend: Arc<CountingBackend>, max_concurrency: usize) -> Arc<LambdaRuntime> {
Arc::new(LambdaRuntime {
backend,
instances: RwLock::new(HashMap::new()),
starting: RwLock::new(HashMap::new()),
max_concurrency,
})
}
/// Builds a Zip-packaged `LambdaFunction` named `name` with code hash `sha`, a
/// 5-second timeout, and a three-byte code archive, by deserializing JSON.
fn test_func(name: &str, sha: &str) -> LambdaFunction {
serde_json::from_value(serde_json::json!({
"function_name": name,
"function_arn": format!("arn:aws:lambda:us-east-1:123456789012:function:{name}"),
"runtime": "python3.12",
"role": "arn:aws:iam::123456789012:role/r",
"handler": "index.handler",
"description": "",
"timeout": 5,
"memory_size": 128,
"code_sha256": sha,
"code_size": 1,
"version": "$LATEST",
"last_modified": "2020-01-01T00:00:00Z",
"tags": {},
"environment": {},
"architectures": ["x86_64"],
"package_type": "Zip",
"code_zip": [1, 2, 3],
"policy": null
}))
.expect("build test LambdaFunction")
}
/// With a pool cap of 1, eight concurrent invocations must share one instance
/// and never overlap on it.
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn concurrent_invokes_are_serialized_on_a_single_instance() {
let peak = Arc::new(AtomicUsize::new(0));
let endpoint = spawn_rie(Duration::from_millis(40), peak.clone()).await;
let backend = CountingBackend::new(endpoint);
let rt = runtime_with(backend.clone(), 1);
let func = test_func("conc", "sha-A");
let mut handles = Vec::new();
for _ in 0..8 {
let rt = rt.clone();
let func = func.clone();
handles.push(tokio::spawn(
async move { rt.invoke(&func, b"{}", &[]).await },
));
}
for h in handles {
h.await.unwrap().expect("invoke ok");
}
// `peak` is the highest number of requests the fake endpoint served at once.
assert_eq!(
peak.load(SeqCst),
1,
"concurrent invokes overlapped on a single RIE instance"
);
assert_eq!(
backend.launches.load(SeqCst),
1,
"max_concurrency=1 must launch exactly one instance"
);
}
/// With a pool cap of 4, eight concurrent invocations must launch more than one
/// instance but never more than the cap, and must actually run in parallel.
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn pool_scales_under_load_and_respects_cap() {
let peak = Arc::new(AtomicUsize::new(0));
let endpoint = spawn_rie(Duration::from_millis(60), peak.clone()).await;
// Every launched instance reports the same fake endpoint.
let backend = CountingBackend::new(endpoint);
let rt = runtime_with(backend.clone(), 4);
let func = test_func("scale", "sha-A");
let mut handles = Vec::new();
for _ in 0..8 {
let rt = rt.clone();
let func = func.clone();
handles.push(tokio::spawn(
async move { rt.invoke(&func, b"{}", &[]).await },
));
}
for h in handles {
h.await.unwrap().expect("invoke ok");
}
let launched = backend.launches.load(SeqCst);
assert!(
(2..=4).contains(&launched),
"expected the pool to scale within the cap, launched={launched}"
);
assert!(
peak.load(SeqCst) > 1,
"expected concurrent forwards across the scaled pool"
);
}
/// When the first two launched instances point at an endpoint that cannot be
/// connected to, a single invocation must evict both and succeed on the third.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn dead_instance_is_evicted_and_retried() {
let peak = Arc::new(AtomicUsize::new(0));
let live = spawn_rie(Duration::from_millis(5), peak.clone()).await;
// The first two launches report `127.0.0.1:1`; later launches use the live endpoint.
let backend = CountingBackend::with_queue(
live,
vec!["127.0.0.1:1".to_string(), "127.0.0.1:1".to_string()],
);
let rt = runtime_with(backend.clone(), 1);
let out = rt
.invoke(&test_func("dead", "sha-A"), b"{}", &[])
.await
.expect("should recover via cold-start retry");
assert_eq!(out, b"ok");
assert_eq!(
backend.launches.load(SeqCst),
3,
"expected two dead instances plus one cold-start replacement"
);
assert!(
backend.terminates.load(SeqCst) >= 2,
"both dead instances should have been terminated on eviction"
);
}
/// An instance whose endpoint never answers must be given up on quickly, well
/// before the full request timeout, and replaced by a working one.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn black_holed_instance_fails_over_fast() {
let peak = Arc::new(AtomicUsize::new(0));
let live = spawn_rie(Duration::from_millis(5), peak.clone()).await;
// NOTE(review): 192.0.2.1 is presumably chosen as a documentation-range address
// that drops packets, so the connect hangs rather than being refused — confirm.
let backend = CountingBackend::with_queue(live, vec!["192.0.2.1:9".to_string()]);
let rt = runtime_with(backend.clone(), 1);
let func = test_func("blackhole", "sha-A");
let started = std::time::Instant::now();
let out = rt
.invoke(&func, b"{}", &[])
.await
.expect("should recover via cold-start retry");
let elapsed = started.elapsed();
assert_eq!(out, b"ok");
assert_eq!(
backend.launches.load(SeqCst),
2,
"expected one black-holed instance plus one cold-start replacement"
);
assert!(
backend.terminates.load(SeqCst) >= 1,
"the black-holed instance should have been evicted"
);
assert!(
elapsed < Duration::from_secs(5),
"failover took {elapsed:?}; must be far below the ~10s invoke timeout"
);
}
/// Invoking the same function name with a different code hash must launch a
/// fresh instance and tear down the one from the previous deploy.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn deploy_change_evicts_stale_instance() {
let peak = Arc::new(AtomicUsize::new(0));
let endpoint = spawn_rie(Duration::from_millis(5), peak.clone()).await;
let backend = CountingBackend::new(endpoint);
let rt = runtime_with(backend.clone(), 2);
// Same function name, two different code hashes, hence two deploy ids.
rt.invoke(&test_func("upd", "sha-A"), b"{}", &[])
.await
.unwrap();
rt.invoke(&test_func("upd", "sha-B"), b"{}", &[])
.await
.unwrap();
assert_eq!(
backend.launches.load(SeqCst),
2,
"a new deploy id should launch a fresh instance"
);
assert!(
backend.terminates.load(SeqCst) >= 1,
"the stale-deploy instance should have been torn down"
);
// Only the instance of the newer deploy may remain in the pool.
let pool_len = rt.instances.read().get("upd").map_or(0, |v| v.len());
assert_eq!(pool_len, 1);
}
/// Terminating a snapshot returned by `take_warm_instances` must not affect a
/// pool that was recreated under the same function name afterwards.
#[tokio::test]
async fn take_warm_instances_snapshot_does_not_reap_recreated_pool() {
let backend = CountingBackend::new("127.0.0.1:1");
let rt = runtime_with(backend.clone(), 10);
// Builds a pool entry by hand; no backend launch is involved.
let mk = |id: &str| {
Arc::new(super::WarmEntry {
instance: WarmInstance {
endpoint: "127.0.0.1:1".to_string(),
handle: BackendHandle::Container { id: id.to_string() },
},
last_used: RwLock::new(std::time::Instant::now()),
deploy_id: "d".to_string(),
busy: Arc::new(tokio::sync::Mutex::new(())),
})
};
rt.instances
.write()
.insert("f".to_string(), vec![mk("old")]);
let snapshot = rt.take_warm_instances("f");
assert_eq!(snapshot.len(), 1);
assert!(rt.instances.read().get("f").is_none());
// Recreate the pool before the snapshot is terminated.
rt.instances
.write()
.insert("f".to_string(), vec![mk("new")]);
rt.terminate_instances(snapshot).await;
assert_eq!(
backend.terminates.load(SeqCst),
1,
"only the snapshotted instance is terminated"
);
let pool = rt.instances.read();
let f = pool.get("f").expect("recreated function pool must survive");
assert_eq!(f.len(), 1);
}
}