1use std::collections::HashMap;
8use std::sync::Arc;
9use std::time::{Duration, Instant};
10
11use base64::Engine;
12use parking_lot::RwLock;
13use sha2::{Digest, Sha256};
14
15use super::backend::{
16 BackendHandle, LambdaBackend, RuntimeError, StreamingInvocation, WarmInstance,
17};
18use super::docker::DockerBackend;
19use crate::state::LambdaFunction;
20
/// One launched runtime instance kept in a function's warm pool.
struct WarmEntry {
    /// The backend instance: its `endpoint` receives invocations and its
    /// `handle` is what is passed to the backend's `terminate`.
    instance: WarmInstance,
    /// Refreshed whenever the entry is handed out or a response arrives;
    /// `cleanup_idle` compares its age against the idle TTL.
    last_used: RwLock<Instant>,
    /// Deploy id the instance was launched with. Entries whose id differs
    /// from the one being requested are skipped and eventually torn down.
    deploy_id: String,
    /// Locked (through an owned guard) for as long as a caller leases the
    /// instance, so a given instance is used by one caller at a time.
    busy: Arc<tokio::sync::Mutex<()>>,
}
39
/// Per-function pool cap used by `LambdaRuntime::from_backend` when
/// `FAKECLOUD_LAMBDA_MAX_CONCURRENCY` is unset, not a valid `usize`, or zero.
const DEFAULT_MAX_CONCURRENCY: usize = 10;
46
/// An exclusive lease on a single warm instance.
struct Slot {
    /// The pool entry being leased.
    entry: Arc<WarmEntry>,
    /// Owned guard on the entry's `busy` mutex; the lease lasts as long as
    /// this guard is alive.
    guard: tokio::sync::OwnedMutexGuard<()>,
}
53
54fn deploy_id_for(func: &LambdaFunction, layers: &[Vec<u8>]) -> String {
65 deploy_id_from(&func.code_sha256, layers)
66}
67
68fn deploy_id_from(code_sha256: &str, layers: &[Vec<u8>]) -> String {
71 let mut hasher = Sha256::new();
72 hasher.update(code_sha256.as_bytes());
73 for bytes in layers {
74 let mut layer_hasher = Sha256::new();
75 layer_hasher.update(bytes);
76 hasher.update(b":");
77 hasher.update(layer_hasher.finalize());
78 }
79 base64::engine::general_purpose::URL_SAFE_NO_PAD.encode(hasher.finalize())
80}
81
/// Keeps per-function pools of warm runtime instances on top of a pluggable
/// [`LambdaBackend`] and forwards invocations to them over HTTP.
pub struct LambdaRuntime {
    /// Backend used to launch, terminate and pre-pull instances.
    backend: Arc<dyn LambdaBackend>,
    /// Warm pool for each function, keyed by function name.
    instances: RwLock<HashMap<String, Vec<Arc<WarmEntry>>>>,
    /// One async mutex per function name, held around the launch path in
    /// `acquire_slot`.
    starting: RwLock<HashMap<String, Arc<tokio::sync::Mutex<()>>>>,
    /// Upper bound on the number of pooled instances per function.
    max_concurrency: usize,
}
95
96impl LambdaRuntime {
97 pub fn from_backend(backend: Arc<dyn LambdaBackend>) -> Self {
101 let max_concurrency = std::env::var("FAKECLOUD_LAMBDA_MAX_CONCURRENCY")
102 .ok()
103 .and_then(|v| v.parse::<usize>().ok())
104 .filter(|n| *n >= 1)
105 .unwrap_or(DEFAULT_MAX_CONCURRENCY);
106 Self {
107 backend,
108 instances: RwLock::new(HashMap::new()),
109 starting: RwLock::new(HashMap::new()),
110 max_concurrency,
111 }
112 }
113
114 pub fn auto_detect_docker(server_port: u16) -> Option<Self> {
117 DockerBackend::auto_detect(server_port)
118 .map(|b| Self::from_backend(Arc::new(b) as Arc<dyn LambdaBackend>))
119 }
120
    /// Shorthand for [`Self::auto_detect_docker`]: returns `None` when that
    /// detection yields no backend.
    pub fn new(server_port: u16) -> Option<Self> {
        Self::auto_detect_docker(server_port)
    }
126
127 pub async fn new_k8s(
139 server_port: u16,
140 internal_token: String,
141 ) -> Result<Self, super::k8s::K8sBackendError> {
142 let backend = super::k8s::K8sBackend::from_env(server_port, internal_token).await?;
143 backend.reap_stale().await;
144 Ok(Self::from_backend(Arc::new(backend)))
145 }
146
    /// Returns the name reported by the underlying backend.
    pub fn cli_name(&self) -> &str {
        self.backend.name()
    }
150
151 pub async fn prepull_for_function(
165 &self,
166 func: &LambdaFunction,
167 ) -> Option<Result<(), super::backend::RuntimeError>> {
168 let image = if func.package_type == "Image" {
169 func.image_uri.clone()?
170 } else {
171 super::docker::runtime_to_image(&func.runtime)?
172 };
173 Some(self.backend.prepull_image(&image).await)
174 }
175
176 pub async fn invoke(
188 &self,
189 func: &LambdaFunction,
190 payload: &[u8],
191 layers: &[Vec<u8>],
192 ) -> Result<Vec<u8>, RuntimeError> {
193 let client = reqwest::Client::new();
194 let mut attempt = 0;
195 loop {
196 attempt += 1;
197 let slot = self.acquire_slot(func, layers).await?;
198 let url = format!(
199 "http://{}/2015-03-31/functions/function/invocations",
200 slot.entry.instance.endpoint
201 );
202 let send = client
203 .post(&url)
204 .body(payload.to_vec())
205 .timeout(Duration::from_secs(func.timeout as u64 + 5))
206 .send()
207 .await;
208 match send {
209 Ok(resp) => {
210 let body = resp.bytes().await;
211 *slot.entry.last_used.write() = Instant::now();
212 return match body {
213 Ok(b) => Ok(b.to_vec()),
214 Err(e) => {
215 let entry = slot.entry.clone();
219 drop(slot);
220 self.evict_entry(&func.function_name, &entry).await;
221 Err(RuntimeError::InvocationFailed(e.to_string()))
222 }
223 };
224 }
225 Err(e) => {
226 let entry = slot.entry.clone();
235 drop(slot);
236 self.evict_entry(&func.function_name, &entry).await;
237 if attempt < 2 && e.is_connect() {
238 tracing::warn!(
239 function = %func.function_name,
240 error = %e,
241 "warm Lambda instance unreachable; evicted, retrying with a cold start"
242 );
243 continue;
244 }
245 return Err(RuntimeError::InvocationFailed(e.to_string()));
246 }
247 }
248 }
249 }
250
251 pub async fn invoke_streaming(
262 &self,
263 func: &LambdaFunction,
264 payload: &[u8],
265 layers: &[Vec<u8>],
266 ) -> Result<StreamingInvocation, RuntimeError> {
267 let client = reqwest::Client::new();
268 let mut attempt = 0;
269 loop {
270 attempt += 1;
271 let slot = self.acquire_slot(func, layers).await?;
272 let url = format!(
273 "http://{}/2015-03-31/functions/function/invocations",
274 slot.entry.instance.endpoint
275 );
276 let send = client
277 .post(&url)
278 .body(payload.to_vec())
279 .timeout(Duration::from_secs(func.timeout as u64 + 5))
280 .send()
281 .await;
282 match send {
283 Ok(resp) => {
284 *slot.entry.last_used.write() = Instant::now();
285 let Slot {
286 entry: _entry,
287 guard,
288 } = slot;
289 return Ok(StreamingInvocation {
290 resp,
291 _slot_guard: Some(guard),
292 });
293 }
294 Err(e) => {
295 let entry = slot.entry.clone();
299 drop(slot);
300 self.evict_entry(&func.function_name, &entry).await;
301 if attempt < 2 && e.is_connect() {
302 continue;
303 }
304 return Err(RuntimeError::InvocationFailed(e.to_string()));
305 }
306 }
307 }
308 }
309
    /// Leases one warm instance for `func`, launching a new one when allowed.
    ///
    /// Steps, in order:
    /// 1. Take any free instance with the current deploy id.
    /// 2. Otherwise take the per-function startup mutex, re-check for a free
    ///    instance, and evict instances carrying a different deploy id.
    /// 3. If the pool is below `max_concurrency`, launch a new instance and
    ///    return it already leased.
    /// 4. Otherwise release the startup mutex and wait for whichever matching
    ///    instance becomes free first.
    ///
    /// # Errors
    ///
    /// * `RuntimeError::NoCodeZip` when a non-`"Image"` function has no `code_zip`.
    /// * Whatever the backend's `launch` returns on failure.
    /// * `RuntimeError::InvocationFailed` when the pool is at its cap but holds
    ///   no instance with the current deploy id to wait on.
    async fn acquire_slot(
        &self,
        func: &LambdaFunction,
        layers: &[Vec<u8>],
    ) -> Result<Slot, RuntimeError> {
        let is_image = func.package_type == "Image";
        if !is_image && func.code_zip.is_none() {
            return Err(RuntimeError::NoCodeZip(func.function_name.clone()));
        }

        let deploy_id = deploy_id_for(func, layers);

        // Fast path: a matching instance is free right now.
        if let Some(slot) = self.try_take_free(&func.function_name, &deploy_id) {
            return Ok(slot);
        }

        // Fetch (or create) this function's startup mutex. The map guard is
        // confined to this block so it is released before the await below.
        let startup_lock = {
            let mut starting = self.starting.write();
            starting
                .entry(func.function_name.clone())
                .or_insert_with(|| Arc::new(tokio::sync::Mutex::new(())))
                .clone()
        };
        let startup_guard = startup_lock.lock().await;

        // Re-check: an instance may have been freed or launched while this
        // task was waiting for the startup mutex.
        if let Some(slot) = self.try_take_free(&func.function_name, &deploy_id) {
            return Ok(slot);
        }

        // Drop instances launched for a different deploy id before counting.
        self.evict_stale_deploy(&func.function_name, &deploy_id)
            .await;

        let pool_len = self
            .instances
            .read()
            .get(&func.function_name)
            .map_or(0, |v| v.len());

        if pool_len < self.max_concurrency {
            // The startup guard stays held across the launch, so launches for
            // this function happen one at a time.
            let instance = self
                .backend
                .launch(func, func.code_zip.as_deref(), layers, &deploy_id)
                .await?;
            let entry = Arc::new(WarmEntry {
                instance,
                last_used: RwLock::new(Instant::now()),
                deploy_id,
                busy: Arc::new(tokio::sync::Mutex::new(())),
            });
            // Lease the entry before publishing it to the pool so no other
            // caller can grab it first.
            let guard = entry
                .busy
                .clone()
                .try_lock_owned()
                .expect("freshly created busy lock is uncontended");
            self.instances
                .write()
                .entry(func.function_name.clone())
                .or_default()
                .push(entry.clone());
            return Ok(Slot { entry, guard });
        }

        // Pool is at its cap: stop holding the startup mutex while waiting.
        drop(startup_guard);
        let candidates: Vec<Arc<WarmEntry>> = {
            let map = self.instances.read();
            map.get(&func.function_name)
                .map(|pool| {
                    pool.iter()
                        .filter(|e| e.deploy_id == deploy_id)
                        .cloned()
                        .collect()
                })
                .unwrap_or_default()
        };
        if candidates.is_empty() {
            return Err(RuntimeError::InvocationFailed(format!(
                "no warm instance available for {}",
                func.function_name
            )));
        }
        // Wait on every candidate's busy lock at once; the first to resolve wins.
        let waiters = candidates.into_iter().map(|entry| {
            Box::pin(async move {
                let guard = entry.busy.clone().lock_owned().await;
                Slot { entry, guard }
            })
        });
        let (slot, _idx, _rest) = futures_util::future::select_all(waiters).await;
        *slot.entry.last_used.write() = Instant::now();
        Ok(slot)
    }
420
421 fn try_take_free(&self, function_name: &str, deploy_id: &str) -> Option<Slot> {
425 let map = self.instances.read();
426 let pool = map.get(function_name)?;
427 for entry in pool {
428 if entry.deploy_id != deploy_id {
429 continue;
430 }
431 if let Ok(guard) = entry.busy.clone().try_lock_owned() {
432 *entry.last_used.write() = Instant::now();
433 return Some(Slot {
434 entry: entry.clone(),
435 guard,
436 });
437 }
438 }
439 None
440 }
441
442 async fn evict_entry(&self, function_name: &str, target: &Arc<WarmEntry>) {
445 let removed = {
446 let mut map = self.instances.write();
447 match map.get_mut(function_name) {
448 Some(pool) => {
449 let removed = pool
450 .iter()
451 .position(|e| Arc::ptr_eq(e, target))
452 .map(|pos| pool.remove(pos));
453 if pool.is_empty() {
454 map.remove(function_name);
455 }
456 removed
457 }
458 None => None,
459 }
460 };
461 if let Some(entry) = removed {
462 tracing::info!(
463 function = %function_name,
464 handle = ?entry.instance.handle,
465 "evicting unreachable Lambda runtime instance"
466 );
467 self.backend.terminate(&entry.instance.handle).await;
468 }
469 }
470
471 async fn evict_stale_deploy(&self, function_name: &str, deploy_id: &str) {
474 let stale: Vec<Arc<WarmEntry>> = {
475 let mut map = self.instances.write();
476 match map.get_mut(function_name) {
477 Some(pool) => {
478 let mut stale = Vec::new();
479 pool.retain(|e| {
480 if e.deploy_id == deploy_id {
481 true
482 } else {
483 stale.push(e.clone());
484 false
485 }
486 });
487 if pool.is_empty() {
488 map.remove(function_name);
489 }
490 stale
491 }
492 None => Vec::new(),
493 }
494 };
495 for entry in stale {
496 tracing::info!(
497 function = %function_name,
498 handle = ?entry.instance.handle,
499 "stopping stale-deploy Lambda runtime instance"
500 );
501 self.backend.terminate(&entry.instance.handle).await;
502 }
503 }
504
505 pub async fn stop_container(&self, function_name: &str) {
507 let pool = self
508 .instances
509 .write()
510 .remove(function_name)
511 .unwrap_or_default();
512 for entry in pool {
513 tracing::info!(
514 function = %function_name,
515 handle = ?entry.instance.handle,
516 "stopping Lambda runtime instance"
517 );
518 self.backend.terminate(&entry.instance.handle).await;
519 }
520 }
521
522 pub async fn stop_all(&self) {
524 let pools: Vec<(String, Vec<Arc<WarmEntry>>)> =
525 { self.instances.write().drain().collect() };
526 for (name, pool) in pools {
527 for entry in pool {
528 tracing::info!(
529 function = %name,
530 handle = ?entry.instance.handle,
531 "stopping Lambda runtime instance (cleanup)"
532 );
533 self.backend.terminate(&entry.instance.handle).await;
534 }
535 }
536 }
537
538 pub fn list_warm_containers(
542 &self,
543 lambda_state: &crate::state::SharedLambdaState,
544 ) -> Vec<serde_json::Value> {
545 let entries = self.instances.read();
546 let accounts = lambda_state.read();
547 let mut rows = Vec::new();
548 for (name, pool) in entries.iter() {
549 let runtime = accounts
550 .iter()
551 .find_map(|(_, state)| state.functions.get(name).map(|f| f.runtime.clone()))
552 .unwrap_or_default();
553 for entry in pool {
554 let idle_secs = entry.last_used.read().elapsed().as_secs();
555 let mut row = serde_json::json!({
556 "functionName": name,
557 "runtime": runtime,
558 "backend": self.backend.name(),
559 "lastUsedSecsAgo": idle_secs,
560 });
561 let obj = row.as_object_mut().expect("json object");
562 match &entry.instance.handle {
563 BackendHandle::Container { id } => {
564 obj.insert("containerId".into(), serde_json::Value::String(id.clone()));
565 }
566 BackendHandle::Pod { namespace, name } => {
567 obj.insert("podName".into(), serde_json::Value::String(name.clone()));
568 obj.insert(
569 "namespace".into(),
570 serde_json::Value::String(namespace.clone()),
571 );
572 }
573 }
574 rows.push(row);
575 }
576 }
577 rows
578 }
579
580 pub async fn evict_container(&self, function_name: &str) -> bool {
583 let pool = self
584 .instances
585 .write()
586 .remove(function_name)
587 .unwrap_or_default();
588 let found = !pool.is_empty();
589 for entry in pool {
590 tracing::info!(
591 function = %function_name,
592 handle = ?entry.instance.handle,
593 "evicting Lambda runtime instance via simulation API"
594 );
595 self.backend.terminate(&entry.instance.handle).await;
596 }
597 found
598 }
599
600 pub async fn run_cleanup_loop(self: Arc<Self>, ttl: Duration) {
602 let mut interval = tokio::time::interval(Duration::from_secs(30));
603 loop {
604 interval.tick().await;
605 self.cleanup_idle(ttl).await;
606 }
607 }
608
609 async fn cleanup_idle(&self, ttl: Duration) {
610 let expired: Vec<(String, Arc<WarmEntry>)> = {
615 let mut map = self.instances.write();
616 let mut out = Vec::new();
617 for (name, pool) in map.iter_mut() {
618 let mut i = 0;
619 while i < pool.len() {
620 let idle = pool[i].last_used.read().elapsed() > ttl;
621 let free = pool[i].busy.try_lock().is_ok();
622 if idle && free {
623 out.push((name.clone(), pool.remove(i)));
624 } else {
625 i += 1;
626 }
627 }
628 }
629 map.retain(|_, pool| !pool.is_empty());
630 out
631 };
632 for (name, entry) in expired {
633 tracing::info!(function = %name, "stopping idle Lambda runtime instance");
634 self.backend.terminate(&entry.instance.handle).await;
635 }
636 }
637}
638
639#[cfg(test)]
640mod tests {
641 use super::deploy_id_from;
642
643 #[test]
650 fn deploy_id_is_url_path_safe() {
651 for i in 0..2_000u32 {
652 let code_sha256 = format!("sha256-seed-{i}-{}", i.wrapping_mul(2_654_435_761));
654 let layers: Vec<Vec<u8>> = if i % 3 == 0 {
655 vec![format!("layer-{i}").into_bytes()]
656 } else {
657 vec![]
658 };
659 let id = deploy_id_from(&code_sha256, &layers);
660 assert!(
661 !id.contains('/') && !id.contains('+') && !id.contains('='),
662 "deploy id {id:?} (seed {i}) is not URL-path-safe"
663 );
664 }
665 }
666
667 #[test]
670 fn deploy_id_is_stable() {
671 let layers = vec![b"layer-a".to_vec(), b"layer-b".to_vec()];
672 let a = deploy_id_from("abc123", &layers);
673 let b = deploy_id_from("abc123", &layers);
674 assert_eq!(a, b);
675 assert_ne!(a, deploy_id_from("abc124", &layers));
676 assert_ne!(a, deploy_id_from("abc123", &[]));
677 }
678
679 use super::LambdaRuntime;
682 use crate::runtime::backend::{BackendHandle, LambdaBackend, RuntimeError, WarmInstance};
683 use crate::state::LambdaFunction;
684 use parking_lot::RwLock;
685 use std::collections::{HashMap, VecDeque};
686 use std::sync::atomic::{AtomicUsize, Ordering::SeqCst};
687 use std::sync::Arc;
688 use std::sync::Mutex as StdMutex;
689 use std::time::Duration;
690
    /// Test backend that hands out pre-set endpoints and counts calls.
    struct CountingBackend {
        /// Endpoints returned by successive `launch` calls, front first.
        endpoints: StdMutex<VecDeque<String>>,
        /// Endpoint returned by `launch` once `endpoints` is exhausted.
        default_endpoint: String,
        /// Number of `launch` calls made so far.
        launches: AtomicUsize,
        /// Number of `terminate` calls made so far.
        terminates: AtomicUsize,
    }
700
701 impl CountingBackend {
702 fn new(default_endpoint: impl Into<String>) -> Arc<Self> {
703 Arc::new(Self {
704 endpoints: StdMutex::new(VecDeque::new()),
705 default_endpoint: default_endpoint.into(),
706 launches: AtomicUsize::new(0),
707 terminates: AtomicUsize::new(0),
708 })
709 }
710 fn with_queue(default_endpoint: impl Into<String>, queue: Vec<String>) -> Arc<Self> {
711 Arc::new(Self {
712 endpoints: StdMutex::new(queue.into()),
713 default_endpoint: default_endpoint.into(),
714 launches: AtomicUsize::new(0),
715 terminates: AtomicUsize::new(0),
716 })
717 }
718 }
719
720 #[async_trait::async_trait]
721 impl LambdaBackend for CountingBackend {
722 fn name(&self) -> &str {
723 "test"
724 }
725 async fn launch(
726 &self,
727 _func: &LambdaFunction,
728 _code_zip: Option<&[u8]>,
729 _layers: &[Vec<u8>],
730 _deploy_id: &str,
731 ) -> Result<WarmInstance, RuntimeError> {
732 let n = self.launches.fetch_add(1, SeqCst);
733 let endpoint = self
734 .endpoints
735 .lock()
736 .unwrap()
737 .pop_front()
738 .unwrap_or_else(|| self.default_endpoint.clone());
739 Ok(WarmInstance {
740 endpoint,
741 handle: BackendHandle::Container {
742 id: format!("c{n}"),
743 },
744 })
745 }
746 async fn terminate(&self, _handle: &BackendHandle) {
747 self.terminates.fetch_add(1, SeqCst);
748 }
749 }
750
    /// Starts a minimal HTTP responder on an ephemeral localhost port and
    /// returns its `host:port` address.
    ///
    /// Each accepted connection is served in its own task: it reads once,
    /// sleeps for `delay`, then answers `200 OK` with body `ok`. `peak`
    /// records the highest number of connections being served at once.
    async fn spawn_rie(delay: Duration, peak: Arc<AtomicUsize>) -> String {
        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
        let addr = listener.local_addr().unwrap();
        // Number of connections currently being served.
        let cur = Arc::new(AtomicUsize::new(0));
        tokio::spawn(async move {
            loop {
                // Stop accepting on the first accept error.
                let Ok((mut sock, _)) = listener.accept().await else {
                    break;
                };
                let cur = cur.clone();
                let peak = peak.clone();
                tokio::spawn(async move {
                    use tokio::io::{AsyncReadExt, AsyncWriteExt};
                    // Count this connection as in-flight and update the peak.
                    let now = cur.fetch_add(1, SeqCst) + 1;
                    peak.fetch_max(now, SeqCst);
                    let mut buf = [0u8; 1024];
                    // A single read of up to 1024 bytes; the request content is ignored.
                    let _ = sock.read(&mut buf).await;
                    // Simulated handler run time.
                    tokio::time::sleep(delay).await;
                    let _ = sock
                        .write_all(
                            b"HTTP/1.1 200 OK\r\nContent-Length: 2\r\nConnection: close\r\n\r\nok",
                        )
                        .await;
                    let _ = sock.flush().await;
                    cur.fetch_sub(1, SeqCst);
                });
            }
        });
        format!("{addr}")
    }
786
    /// Builds a `LambdaRuntime` directly from its fields, with empty pools and
    /// an explicit `max_concurrency` (no environment variable is consulted).
    fn runtime_with(backend: Arc<CountingBackend>, max_concurrency: usize) -> Arc<LambdaRuntime> {
        Arc::new(LambdaRuntime {
            backend,
            instances: RwLock::new(HashMap::new()),
            starting: RwLock::new(HashMap::new()),
            max_concurrency,
        })
    }
795
    /// Deserializes a Zip-packaged `LambdaFunction` fixture named `name` whose
    /// `code_sha256` is `sha`; every other field is a fixed placeholder.
    ///
    /// Panics when the JSON does not deserialize into `LambdaFunction`.
    fn test_func(name: &str, sha: &str) -> LambdaFunction {
        serde_json::from_value(serde_json::json!({
            "function_name": name,
            "function_arn": format!("arn:aws:lambda:us-east-1:123456789012:function:{name}"),
            "runtime": "python3.12",
            "role": "arn:aws:iam::123456789012:role/r",
            "handler": "index.handler",
            "description": "",
            "timeout": 5,
            "memory_size": 128,
            "code_sha256": sha,
            "code_size": 1,
            "version": "$LATEST",
            "last_modified": "2020-01-01T00:00:00Z",
            "tags": {},
            "environment": {},
            "architectures": ["x86_64"],
            "package_type": "Zip",
            // Non-empty so the `code_zip` presence check in `acquire_slot` passes.
            "code_zip": [1, 2, 3],
            "policy": null
        }))
        .expect("build test LambdaFunction")
    }
819
820 #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
824 async fn concurrent_invokes_are_serialized_on_a_single_instance() {
825 let peak = Arc::new(AtomicUsize::new(0));
826 let endpoint = spawn_rie(Duration::from_millis(40), peak.clone()).await;
827 let backend = CountingBackend::new(endpoint);
828 let rt = runtime_with(backend.clone(), 1);
829 let func = test_func("conc", "sha-A");
830
831 let mut handles = Vec::new();
832 for _ in 0..8 {
833 let rt = rt.clone();
834 let func = func.clone();
835 handles.push(tokio::spawn(
836 async move { rt.invoke(&func, b"{}", &[]).await },
837 ));
838 }
839 for h in handles {
840 h.await.unwrap().expect("invoke ok");
841 }
842
843 assert_eq!(
844 peak.load(SeqCst),
845 1,
846 "concurrent invokes overlapped on a single RIE instance"
847 );
848 assert_eq!(
849 backend.launches.load(SeqCst),
850 1,
851 "max_concurrency=1 must launch exactly one instance"
852 );
853 }
854
855 #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
859 async fn pool_scales_under_load_and_respects_cap() {
860 let peak = Arc::new(AtomicUsize::new(0));
861 let endpoint = spawn_rie(Duration::from_millis(60), peak.clone()).await;
862 let backend = CountingBackend::new(endpoint);
863 let rt = runtime_with(backend.clone(), 4);
864 let func = test_func("scale", "sha-A");
865
866 let mut handles = Vec::new();
867 for _ in 0..8 {
868 let rt = rt.clone();
869 let func = func.clone();
870 handles.push(tokio::spawn(
871 async move { rt.invoke(&func, b"{}", &[]).await },
872 ));
873 }
874 for h in handles {
875 h.await.unwrap().expect("invoke ok");
876 }
877
878 let launched = backend.launches.load(SeqCst);
879 assert!(
880 (2..=4).contains(&launched),
881 "expected the pool to scale within the cap, launched={launched}"
882 );
883 assert!(
884 peak.load(SeqCst) > 1,
885 "expected concurrent forwards across the scaled pool"
886 );
887 }
888
889 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
893 async fn dead_instance_is_evicted_and_retried() {
894 let peak = Arc::new(AtomicUsize::new(0));
895 let live = spawn_rie(Duration::from_millis(5), peak.clone()).await;
896 let backend = CountingBackend::with_queue(live, vec!["127.0.0.1:1".to_string()]);
898 let rt = runtime_with(backend.clone(), 1);
899
900 let out = rt
901 .invoke(&test_func("dead", "sha-A"), b"{}", &[])
902 .await
903 .expect("should recover via cold-start retry");
904 assert_eq!(out, b"ok");
905 assert_eq!(
906 backend.launches.load(SeqCst),
907 2,
908 "expected the dead instance plus one cold-start replacement"
909 );
910 assert!(
911 backend.terminates.load(SeqCst) >= 1,
912 "the dead instance should have been terminated on eviction"
913 );
914 }
915
916 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
919 async fn deploy_change_evicts_stale_instance() {
920 let peak = Arc::new(AtomicUsize::new(0));
921 let endpoint = spawn_rie(Duration::from_millis(5), peak.clone()).await;
922 let backend = CountingBackend::new(endpoint);
923 let rt = runtime_with(backend.clone(), 2);
924
925 rt.invoke(&test_func("upd", "sha-A"), b"{}", &[])
926 .await
927 .unwrap();
928 rt.invoke(&test_func("upd", "sha-B"), b"{}", &[])
929 .await
930 .unwrap();
931
932 assert_eq!(
933 backend.launches.load(SeqCst),
934 2,
935 "a new deploy id should launch a fresh instance"
936 );
937 assert!(
938 backend.terminates.load(SeqCst) >= 1,
939 "the stale-deploy instance should have been torn down"
940 );
941 let pool_len = rt.instances.read().get("upd").map_or(0, |v| v.len());
943 assert_eq!(pool_len, 1);
944 }
945}