1use std::collections::HashMap;
8use std::sync::Arc;
9use std::time::{Duration, Instant};
10
11use base64::Engine;
12use parking_lot::RwLock;
13use sha2::{Digest, Sha256};
14
15use super::backend::{
16 BackendHandle, LambdaBackend, RuntimeError, StreamingInvocation, WarmInstance,
17};
18use super::docker::DockerBackend;
19use crate::state::LambdaFunction;
20
21pub(crate) struct WarmEntry {
23 instance: WarmInstance,
24 last_used: RwLock<Instant>,
25 deploy_id: String,
30 busy: Arc<tokio::sync::Mutex<()>>,
38}
39
/// Per-function warm-instance cap used when `FAKECLOUD_LAMBDA_MAX_CONCURRENCY` is unset,
/// unparsable, or zero.
const DEFAULT_MAX_CONCURRENCY: usize = 10;

/// Total number of tries (first attempt included) an invocation gets before giving up on
/// unreachable instances.
const MAX_INVOKE_ATTEMPTS: u32 = 5;

/// Upper bound for the TCP reachability probe and for the HTTP client's connect phase.
const REACHABILITY_PROBE_TIMEOUT: Duration = Duration::from_millis(1_500);
58
59struct Slot {
62 entry: Arc<WarmEntry>,
63 guard: tokio::sync::OwnedMutexGuard<()>,
64}
65
66fn deploy_id_for(func: &LambdaFunction, layers: &[Vec<u8>]) -> String {
77 deploy_id_from(&func.code_sha256, layers)
78}
79
80fn deploy_id_from(code_sha256: &str, layers: &[Vec<u8>]) -> String {
83 let mut hasher = Sha256::new();
84 hasher.update(code_sha256.as_bytes());
85 for bytes in layers {
86 let mut layer_hasher = Sha256::new();
87 layer_hasher.update(bytes);
88 hasher.update(b":");
89 hasher.update(layer_hasher.finalize());
90 }
91 base64::engine::general_purpose::URL_SAFE_NO_PAD.encode(hasher.finalize())
92}
93
94async fn endpoint_reachable(endpoint: &str, timeout: Duration) -> bool {
100 matches!(
101 tokio::time::timeout(timeout, tokio::net::TcpStream::connect(endpoint)).await,
102 Ok(Ok(_))
103 )
104}
105
106pub struct LambdaRuntime {
107 backend: Arc<dyn LambdaBackend>,
108 instances: RwLock<HashMap<String, Vec<Arc<WarmEntry>>>>,
112 starting: RwLock<HashMap<String, Arc<tokio::sync::Mutex<()>>>>,
116 max_concurrency: usize,
118}
119
120impl LambdaRuntime {
121 pub fn from_backend(backend: Arc<dyn LambdaBackend>) -> Self {
125 let max_concurrency = std::env::var("FAKECLOUD_LAMBDA_MAX_CONCURRENCY")
126 .ok()
127 .and_then(|v| v.parse::<usize>().ok())
128 .filter(|n| *n >= 1)
129 .unwrap_or(DEFAULT_MAX_CONCURRENCY);
130 Self {
131 backend,
132 instances: RwLock::new(HashMap::new()),
133 starting: RwLock::new(HashMap::new()),
134 max_concurrency,
135 }
136 }
137
138 pub fn auto_detect_docker(server_port: u16) -> Option<Self> {
141 DockerBackend::auto_detect(server_port)
142 .map(|b| Self::from_backend(Arc::new(b) as Arc<dyn LambdaBackend>))
143 }
144
145 pub fn new(server_port: u16) -> Option<Self> {
148 Self::auto_detect_docker(server_port)
149 }
150
151 pub async fn new_k8s(
163 server_port: u16,
164 internal_token: String,
165 ) -> Result<Self, super::k8s::K8sBackendError> {
166 let backend = super::k8s::K8sBackend::from_env(server_port, internal_token).await?;
167 backend.reap_stale().await;
168 Ok(Self::from_backend(Arc::new(backend)))
169 }
170
171 pub fn cli_name(&self) -> &str {
172 self.backend.name()
173 }
174
175 pub async fn prepull_for_function(
189 &self,
190 func: &LambdaFunction,
191 ) -> Option<Result<(), super::backend::RuntimeError>> {
192 let image = if func.package_type == "Image" {
193 func.image_uri.clone()?
194 } else {
195 super::docker::runtime_to_image(&func.runtime)?
196 };
197 Some(self.backend.prepull_image(&image).await)
198 }
199
200 pub async fn invoke(
212 &self,
213 func: &LambdaFunction,
214 payload: &[u8],
215 layers: &[Vec<u8>],
216 ) -> Result<Vec<u8>, RuntimeError> {
217 let client = reqwest::Client::builder()
218 .connect_timeout(REACHABILITY_PROBE_TIMEOUT)
219 .build()
220 .unwrap_or_else(|_| reqwest::Client::new());
221 let mut attempt: u32 = 0;
222 loop {
223 attempt += 1;
224 let slot = self.acquire_slot(func, layers).await?;
225
226 if !endpoint_reachable(&slot.entry.instance.endpoint, REACHABILITY_PROBE_TIMEOUT).await
233 {
234 let entry = slot.entry.clone();
235 drop(slot);
236 self.evict_entry(&func.function_name, &entry).await;
237 if attempt < MAX_INVOKE_ATTEMPTS {
238 tracing::warn!(
239 function = %func.function_name,
240 endpoint = %entry.instance.endpoint,
241 "warm Lambda instance failed reachability probe; evicted, retrying with a cold start"
242 );
243 continue;
244 }
245 return Err(RuntimeError::InvocationFailed(format!(
246 "no reachable warm instance for {} after {attempt} attempts",
247 func.function_name
248 )));
249 }
250
251 let url = format!(
252 "http://{}/2015-03-31/functions/function/invocations",
253 slot.entry.instance.endpoint
254 );
255 let send = client
256 .post(&url)
257 .body(payload.to_vec())
258 .timeout(Duration::from_secs(func.timeout as u64 + 5))
259 .send()
260 .await;
261 match send {
262 Ok(resp) => {
263 let body = resp.bytes().await;
264 *slot.entry.last_used.write() = Instant::now();
265 return match body {
266 Ok(b) => Ok(b.to_vec()),
267 Err(e) => {
268 let entry = slot.entry.clone();
272 drop(slot);
273 self.evict_entry(&func.function_name, &entry).await;
274 Err(RuntimeError::InvocationFailed(e.to_string()))
275 }
276 };
277 }
278 Err(e) => {
279 let entry = slot.entry.clone();
288 drop(slot);
289 self.evict_entry(&func.function_name, &entry).await;
290 if attempt < MAX_INVOKE_ATTEMPTS && e.is_connect() {
291 tracing::warn!(
292 function = %func.function_name,
293 error = %e,
294 "warm Lambda instance unreachable; evicted, retrying with a cold start"
295 );
296 continue;
297 }
298 return Err(RuntimeError::InvocationFailed(e.to_string()));
299 }
300 }
301 }
302 }
303
304 pub async fn invoke_streaming(
315 &self,
316 func: &LambdaFunction,
317 payload: &[u8],
318 layers: &[Vec<u8>],
319 ) -> Result<StreamingInvocation, RuntimeError> {
320 let client = reqwest::Client::builder()
321 .connect_timeout(REACHABILITY_PROBE_TIMEOUT)
322 .build()
323 .unwrap_or_else(|_| reqwest::Client::new());
324 let mut attempt: u32 = 0;
325 loop {
326 attempt += 1;
327 let slot = self.acquire_slot(func, layers).await?;
328
329 if !endpoint_reachable(&slot.entry.instance.endpoint, REACHABILITY_PROBE_TIMEOUT).await
332 {
333 let entry = slot.entry.clone();
334 drop(slot);
335 self.evict_entry(&func.function_name, &entry).await;
336 if attempt < MAX_INVOKE_ATTEMPTS {
337 continue;
338 }
339 return Err(RuntimeError::InvocationFailed(format!(
340 "no reachable warm instance for {} after {attempt} attempts",
341 func.function_name
342 )));
343 }
344
345 let url = format!(
346 "http://{}/2015-03-31/functions/function/invocations",
347 slot.entry.instance.endpoint
348 );
349 let send = client
350 .post(&url)
351 .body(payload.to_vec())
352 .timeout(Duration::from_secs(func.timeout as u64 + 5))
353 .send()
354 .await;
355 match send {
356 Ok(resp) => {
357 *slot.entry.last_used.write() = Instant::now();
358 let Slot {
359 entry: _entry,
360 guard,
361 } = slot;
362 return Ok(StreamingInvocation {
363 resp,
364 _slot_guard: Some(guard),
365 });
366 }
367 Err(e) => {
368 let entry = slot.entry.clone();
372 drop(slot);
373 self.evict_entry(&func.function_name, &entry).await;
374 if attempt < MAX_INVOKE_ATTEMPTS && e.is_connect() {
375 continue;
376 }
377 return Err(RuntimeError::InvocationFailed(e.to_string()));
378 }
379 }
380 }
381 }
382
383 async fn acquire_slot(
393 &self,
394 func: &LambdaFunction,
395 layers: &[Vec<u8>],
396 ) -> Result<Slot, RuntimeError> {
397 let is_image = func.package_type == "Image";
398 if !is_image && func.code_zip.is_none() {
399 return Err(RuntimeError::NoCodeZip(func.function_name.clone()));
400 }
401
402 let deploy_id = deploy_id_for(func, layers);
403
404 if let Some(slot) = self.try_take_free(&func.function_name, &deploy_id) {
406 return Ok(slot);
407 }
408
409 let startup_lock = {
412 let mut starting = self.starting.write();
413 starting
414 .entry(func.function_name.clone())
415 .or_insert_with(|| Arc::new(tokio::sync::Mutex::new(())))
416 .clone()
417 };
418 let startup_guard = startup_lock.lock().await;
419
420 if let Some(slot) = self.try_take_free(&func.function_name, &deploy_id) {
423 return Ok(slot);
424 }
425
426 self.evict_stale_deploy(&func.function_name, &deploy_id)
428 .await;
429
430 let pool_len = self
431 .instances
432 .read()
433 .get(&func.function_name)
434 .map_or(0, |v| v.len());
435
436 if pool_len < self.max_concurrency {
438 let instance = self
439 .backend
440 .launch(func, func.code_zip.as_deref(), layers, &deploy_id)
441 .await?;
442 let entry = Arc::new(WarmEntry {
443 instance,
444 last_used: RwLock::new(Instant::now()),
445 deploy_id,
446 busy: Arc::new(tokio::sync::Mutex::new(())),
447 });
448 let guard = entry
449 .busy
450 .clone()
451 .try_lock_owned()
452 .expect("freshly created busy lock is uncontended");
453 self.instances
454 .write()
455 .entry(func.function_name.clone())
456 .or_default()
457 .push(entry.clone());
458 return Ok(Slot { entry, guard });
459 }
460
461 drop(startup_guard);
466 let candidates: Vec<Arc<WarmEntry>> = {
467 let map = self.instances.read();
468 map.get(&func.function_name)
469 .map(|pool| {
470 pool.iter()
471 .filter(|e| e.deploy_id == deploy_id)
472 .cloned()
473 .collect()
474 })
475 .unwrap_or_default()
476 };
477 if candidates.is_empty() {
478 return Err(RuntimeError::InvocationFailed(format!(
479 "no warm instance available for {}",
480 func.function_name
481 )));
482 }
483 let waiters = candidates.into_iter().map(|entry| {
484 Box::pin(async move {
485 let guard = entry.busy.clone().lock_owned().await;
486 Slot { entry, guard }
487 })
488 });
489 let (slot, _idx, _rest) = futures_util::future::select_all(waiters).await;
490 *slot.entry.last_used.write() = Instant::now();
491 Ok(slot)
492 }
493
494 fn try_take_free(&self, function_name: &str, deploy_id: &str) -> Option<Slot> {
498 let map = self.instances.read();
499 let pool = map.get(function_name)?;
500 for entry in pool {
501 if entry.deploy_id != deploy_id {
502 continue;
503 }
504 if let Ok(guard) = entry.busy.clone().try_lock_owned() {
505 *entry.last_used.write() = Instant::now();
506 return Some(Slot {
507 entry: entry.clone(),
508 guard,
509 });
510 }
511 }
512 None
513 }
514
515 async fn evict_entry(&self, function_name: &str, target: &Arc<WarmEntry>) {
518 let removed = {
519 let mut map = self.instances.write();
520 match map.get_mut(function_name) {
521 Some(pool) => {
522 let removed = pool
523 .iter()
524 .position(|e| Arc::ptr_eq(e, target))
525 .map(|pos| pool.remove(pos));
526 if pool.is_empty() {
527 map.remove(function_name);
528 }
529 removed
530 }
531 None => None,
532 }
533 };
534 if let Some(entry) = removed {
535 tracing::info!(
536 function = %function_name,
537 handle = ?entry.instance.handle,
538 "evicting unreachable Lambda runtime instance"
539 );
540 self.backend.terminate(&entry.instance.handle).await;
541 }
542 }
543
544 async fn evict_stale_deploy(&self, function_name: &str, deploy_id: &str) {
547 let stale: Vec<Arc<WarmEntry>> = {
548 let mut map = self.instances.write();
549 match map.get_mut(function_name) {
550 Some(pool) => {
551 let mut stale = Vec::new();
552 pool.retain(|e| {
553 if e.deploy_id == deploy_id {
554 true
555 } else {
556 stale.push(e.clone());
557 false
558 }
559 });
560 if pool.is_empty() {
561 map.remove(function_name);
562 }
563 stale
564 }
565 None => Vec::new(),
566 }
567 };
568 for entry in stale {
569 tracing::info!(
570 function = %function_name,
571 handle = ?entry.instance.handle,
572 "stopping stale-deploy Lambda runtime instance"
573 );
574 self.backend.terminate(&entry.instance.handle).await;
575 }
576 }
577
578 pub(crate) fn take_warm_instances(&self, function_name: &str) -> Vec<Arc<WarmEntry>> {
586 self.instances
587 .write()
588 .remove(function_name)
589 .unwrap_or_default()
590 }
591
592 pub(crate) async fn terminate_instances(&self, pool: Vec<Arc<WarmEntry>>) {
595 for entry in pool {
596 tracing::info!(
597 handle = ?entry.instance.handle,
598 "stopping Lambda runtime instance"
599 );
600 self.backend.terminate(&entry.instance.handle).await;
601 }
602 }
603
604 pub async fn stop_container(&self, function_name: &str) {
606 let pool = self.take_warm_instances(function_name);
607 self.terminate_instances(pool).await;
608 }
609
610 pub async fn stop_all(&self) {
612 let pools: Vec<(String, Vec<Arc<WarmEntry>>)> =
613 { self.instances.write().drain().collect() };
614 for (name, pool) in pools {
615 for entry in pool {
616 tracing::info!(
617 function = %name,
618 handle = ?entry.instance.handle,
619 "stopping Lambda runtime instance (cleanup)"
620 );
621 self.backend.terminate(&entry.instance.handle).await;
622 }
623 }
624 }
625
626 pub fn list_warm_containers(
630 &self,
631 lambda_state: &crate::state::SharedLambdaState,
632 ) -> Vec<serde_json::Value> {
633 let entries = self.instances.read();
634 let accounts = lambda_state.read();
635 let mut rows = Vec::new();
636 for (name, pool) in entries.iter() {
637 let runtime = accounts
638 .iter()
639 .find_map(|(_, state)| state.functions.get(name).map(|f| f.runtime.clone()))
640 .unwrap_or_default();
641 for entry in pool {
642 let idle_secs = entry.last_used.read().elapsed().as_secs();
643 let mut row = serde_json::json!({
644 "functionName": name,
645 "runtime": runtime,
646 "backend": self.backend.name(),
647 "lastUsedSecsAgo": idle_secs,
648 });
649 let obj = row.as_object_mut().expect("json object");
650 match &entry.instance.handle {
651 BackendHandle::Container { id } => {
652 obj.insert("containerId".into(), serde_json::Value::String(id.clone()));
653 }
654 BackendHandle::Pod { namespace, name } => {
655 obj.insert("podName".into(), serde_json::Value::String(name.clone()));
656 obj.insert(
657 "namespace".into(),
658 serde_json::Value::String(namespace.clone()),
659 );
660 }
661 }
662 rows.push(row);
663 }
664 }
665 rows
666 }
667
668 pub async fn evict_container(&self, function_name: &str) -> bool {
671 let pool = self
672 .instances
673 .write()
674 .remove(function_name)
675 .unwrap_or_default();
676 let found = !pool.is_empty();
677 for entry in pool {
678 tracing::info!(
679 function = %function_name,
680 handle = ?entry.instance.handle,
681 "evicting Lambda runtime instance via simulation API"
682 );
683 self.backend.terminate(&entry.instance.handle).await;
684 }
685 found
686 }
687
688 pub async fn run_cleanup_loop(self: Arc<Self>, ttl: Duration) {
690 let mut interval = tokio::time::interval(Duration::from_secs(30));
691 loop {
692 interval.tick().await;
693 self.cleanup_idle(ttl).await;
694 }
695 }
696
697 async fn cleanup_idle(&self, ttl: Duration) {
698 let expired: Vec<(String, Arc<WarmEntry>)> = {
703 let mut map = self.instances.write();
704 let mut out = Vec::new();
705 for (name, pool) in map.iter_mut() {
706 let mut i = 0;
707 while i < pool.len() {
708 let idle = pool[i].last_used.read().elapsed() > ttl;
709 let free = pool[i].busy.try_lock().is_ok();
710 if idle && free {
711 out.push((name.clone(), pool.remove(i)));
712 } else {
713 i += 1;
714 }
715 }
716 }
717 map.retain(|_, pool| !pool.is_empty());
718 out
719 };
720 for (name, entry) in expired {
721 tracing::info!(function = %name, "stopping idle Lambda runtime instance");
722 self.backend.terminate(&entry.instance.handle).await;
723 }
724 }
725}
726
#[cfg(test)]
mod tests {
    use super::{deploy_id_from, LambdaRuntime};
    use crate::runtime::backend::{BackendHandle, LambdaBackend, RuntimeError, WarmInstance};
    use crate::state::LambdaFunction;
    use parking_lot::RwLock;
    use std::collections::{HashMap, VecDeque};
    use std::sync::atomic::{AtomicUsize, Ordering::SeqCst};
    use std::sync::Arc;
    use std::sync::Mutex as StdMutex;
    use std::time::Duration;

    /// Deploy ids must never contain the standard-base64 characters `/`, `+`, or `=`.
    #[test]
    fn deploy_id_is_url_path_safe() {
        const FORBIDDEN: [char; 3] = ['/', '+', '='];
        for i in 0..2_000u32 {
            let code_sha256 = format!("sha256-seed-{i}-{}", i.wrapping_mul(2_654_435_761));
            let layers: Vec<Vec<u8>> = if i % 3 != 0 {
                Vec::new()
            } else {
                vec![format!("layer-{i}").into_bytes()]
            };
            let id = deploy_id_from(&code_sha256, &layers);
            let path_safe = !id.chars().any(|c| FORBIDDEN.contains(&c));
            assert!(path_safe, "deploy id {id:?} (seed {i}) is not URL-path-safe");
        }
    }

    /// Equal inputs give equal ids. A different code hash, or dropping the layers, changes
    /// the id.
    #[test]
    fn deploy_id_is_stable() {
        let layers = [b"layer-a".to_vec(), b"layer-b".to_vec()];
        let baseline = deploy_id_from("abc123", &layers);
        assert_eq!(baseline, deploy_id_from("abc123", &layers));
        assert_ne!(baseline, deploy_id_from("abc124", &layers));
        assert_ne!(baseline, deploy_id_from("abc123", &[]));
    }

    /// Fake backend that counts launches and terminations.
    ///
    /// Queued endpoints are handed out first; once the queue is empty, every launch gets
    /// `default_endpoint`.
    struct CountingBackend {
        endpoints: StdMutex<VecDeque<String>>,
        default_endpoint: String,
        launches: AtomicUsize,
        terminates: AtomicUsize,
    }

    impl CountingBackend {
        fn new(default_endpoint: impl Into<String>) -> Arc<Self> {
            Self::with_queue(default_endpoint, Vec::new())
        }

        fn with_queue(default_endpoint: impl Into<String>, queue: Vec<String>) -> Arc<Self> {
            Arc::new(Self {
                endpoints: StdMutex::new(VecDeque::from(queue)),
                default_endpoint: default_endpoint.into(),
                launches: AtomicUsize::new(0),
                terminates: AtomicUsize::new(0),
            })
        }
    }

    #[async_trait::async_trait]
    impl LambdaBackend for CountingBackend {
        fn name(&self) -> &str {
            "test"
        }

        async fn launch(
            &self,
            _func: &LambdaFunction,
            _code_zip: Option<&[u8]>,
            _layers: &[Vec<u8>],
            _deploy_id: &str,
        ) -> Result<WarmInstance, RuntimeError> {
            let n = self.launches.fetch_add(1, SeqCst);
            let queued = self.endpoints.lock().unwrap().pop_front();
            Ok(WarmInstance {
                endpoint: queued.unwrap_or_else(|| self.default_endpoint.clone()),
                handle: BackendHandle::Container {
                    id: format!("c{n}"),
                },
            })
        }

        async fn terminate(&self, _handle: &BackendHandle) {
            self.terminates.fetch_add(1, SeqCst);
        }
    }

    /// Starts a minimal fake RIE on an ephemeral localhost port and returns its `host:port`.
    ///
    /// Every request is answered with `200 ok` after `delay`. `peak` records the largest
    /// number of requests in flight at once. Connections that close without sending data,
    /// such as the runtime's TCP reachability probes, are ignored and not counted.
    async fn spawn_rie(delay: Duration, peak: Arc<AtomicUsize>) -> String {
        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
        let addr = listener.local_addr().unwrap();
        let in_flight = Arc::new(AtomicUsize::new(0));
        tokio::spawn(async move {
            while let Ok((mut sock, _)) = listener.accept().await {
                let in_flight = Arc::clone(&in_flight);
                let peak = Arc::clone(&peak);
                tokio::spawn(async move {
                    use tokio::io::{AsyncReadExt, AsyncWriteExt};
                    let mut buf = [0u8; 1024];
                    if sock.read(&mut buf).await.unwrap_or(0) == 0 {
                        return;
                    }
                    let now = in_flight.fetch_add(1, SeqCst) + 1;
                    peak.fetch_max(now, SeqCst);
                    tokio::time::sleep(delay).await;
                    let _ = sock
                        .write_all(
                            b"HTTP/1.1 200 OK\r\nContent-Length: 2\r\nConnection: close\r\n\r\nok",
                        )
                        .await;
                    let _ = sock.flush().await;
                    in_flight.fetch_sub(1, SeqCst);
                });
            }
        });
        addr.to_string()
    }

    /// Runtime over `backend` with an explicit cap, bypassing the env-var lookup.
    fn runtime_with(backend: Arc<CountingBackend>, max_concurrency: usize) -> Arc<LambdaRuntime> {
        Arc::new(LambdaRuntime {
            backend,
            instances: RwLock::new(HashMap::new()),
            starting: RwLock::new(HashMap::new()),
            max_concurrency,
        })
    }

    /// Minimal zip-packaged function with a 5s timeout.
    fn test_func(name: &str, sha: &str) -> LambdaFunction {
        serde_json::from_value(serde_json::json!({
            "function_name": name,
            "function_arn": format!("arn:aws:lambda:us-east-1:123456789012:function:{name}"),
            "runtime": "python3.12",
            "role": "arn:aws:iam::123456789012:role/r",
            "handler": "index.handler",
            "description": "",
            "timeout": 5,
            "memory_size": 128,
            "code_sha256": sha,
            "code_size": 1,
            "version": "$LATEST",
            "last_modified": "2020-01-01T00:00:00Z",
            "tags": {},
            "environment": {},
            "architectures": ["x86_64"],
            "package_type": "Zip",
            "code_zip": [1, 2, 3],
            "policy": null
        }))
        .expect("build test LambdaFunction")
    }

    /// Spawns `count` concurrent invokes of `func` and requires each to succeed.
    async fn invoke_concurrently(rt: &Arc<LambdaRuntime>, func: &LambdaFunction, count: usize) {
        let handles: Vec<_> = (0..count)
            .map(|_| {
                let rt = Arc::clone(rt);
                let func = func.clone();
                tokio::spawn(async move { rt.invoke(&func, b"{}", &[]).await })
            })
            .collect();
        for handle in handles {
            handle.await.unwrap().expect("invoke ok");
        }
    }

    /// With a cap of one, concurrent callers must queue on the single instance.
    #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
    async fn concurrent_invokes_are_serialized_on_a_single_instance() {
        let peak = Arc::new(AtomicUsize::new(0));
        let endpoint = spawn_rie(Duration::from_millis(40), Arc::clone(&peak)).await;
        let backend = CountingBackend::new(endpoint);
        let rt = runtime_with(Arc::clone(&backend), 1);

        invoke_concurrently(&rt, &test_func("conc", "sha-A"), 8).await;

        assert_eq!(
            peak.load(SeqCst),
            1,
            "concurrent invokes overlapped on a single RIE instance"
        );
        assert_eq!(
            backend.launches.load(SeqCst),
            1,
            "max_concurrency=1 must launch exactly one instance"
        );
    }

    /// Under load the pool grows, but never beyond the configured cap.
    #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
    async fn pool_scales_under_load_and_respects_cap() {
        let peak = Arc::new(AtomicUsize::new(0));
        let endpoint = spawn_rie(Duration::from_millis(60), Arc::clone(&peak)).await;
        let backend = CountingBackend::new(endpoint);
        let rt = runtime_with(Arc::clone(&backend), 4);

        invoke_concurrently(&rt, &test_func("scale", "sha-A"), 8).await;

        let launched = backend.launches.load(SeqCst);
        assert!(
            (2..=4).contains(&launched),
            "expected the pool to scale within the cap, launched={launched}"
        );
        assert!(
            peak.load(SeqCst) > 1,
            "expected concurrent forwards across the scaled pool"
        );
    }

    /// Instances whose endpoint refuses connections are evicted and replaced.
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn dead_instance_is_evicted_and_retried() {
        let peak = Arc::new(AtomicUsize::new(0));
        let live = spawn_rie(Duration::from_millis(5), Arc::clone(&peak)).await;
        let dead = || "127.0.0.1:1".to_string();
        let backend = CountingBackend::with_queue(live, vec![dead(), dead()]);
        let rt = runtime_with(Arc::clone(&backend), 1);

        let out = rt
            .invoke(&test_func("dead", "sha-A"), b"{}", &[])
            .await
            .expect("should recover via cold-start retry");

        assert_eq!(out, b"ok");
        assert_eq!(
            backend.launches.load(SeqCst),
            3,
            "expected two dead instances plus one cold-start replacement"
        );
        assert!(
            backend.terminates.load(SeqCst) >= 2,
            "both dead instances should have been terminated on eviction"
        );
    }

    /// A non-routable (TEST-NET-1) endpoint must fail over within the probe timeout, well
    /// before the invoke timeout elapses.
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn black_holed_instance_fails_over_fast() {
        let peak = Arc::new(AtomicUsize::new(0));
        let live = spawn_rie(Duration::from_millis(5), Arc::clone(&peak)).await;
        let backend = CountingBackend::with_queue(live, vec!["192.0.2.1:9".to_string()]);
        let rt = runtime_with(Arc::clone(&backend), 1);
        let func = test_func("blackhole", "sha-A");

        let started = std::time::Instant::now();
        let out = rt
            .invoke(&func, b"{}", &[])
            .await
            .expect("should recover via cold-start retry");
        let elapsed = started.elapsed();

        assert_eq!(out, b"ok");
        assert_eq!(
            backend.launches.load(SeqCst),
            2,
            "expected one black-holed instance plus one cold-start replacement"
        );
        assert!(
            backend.terminates.load(SeqCst) >= 1,
            "the black-holed instance should have been evicted"
        );
        assert!(
            elapsed < Duration::from_secs(5),
            "failover took {elapsed:?}; must be far below the ~10s invoke timeout"
        );
    }

    /// A new code hash launches a fresh instance and tears down the old one.
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn deploy_change_evicts_stale_instance() {
        let peak = Arc::new(AtomicUsize::new(0));
        let endpoint = spawn_rie(Duration::from_millis(5), Arc::clone(&peak)).await;
        let backend = CountingBackend::new(endpoint);
        let rt = runtime_with(Arc::clone(&backend), 2);

        for sha in ["sha-A", "sha-B"] {
            rt.invoke(&test_func("upd", sha), b"{}", &[]).await.unwrap();
        }

        assert_eq!(
            backend.launches.load(SeqCst),
            2,
            "a new deploy id should launch a fresh instance"
        );
        assert!(
            backend.terminates.load(SeqCst) >= 1,
            "the stale-deploy instance should have been torn down"
        );
        let pool_len = rt.instances.read().get("upd").map_or(0, |v| v.len());
        assert_eq!(pool_len, 1);
    }

    /// Terminating a snapshot must not touch a pool recreated after the snapshot was taken.
    #[tokio::test]
    async fn take_warm_instances_snapshot_does_not_reap_recreated_pool() {
        let backend = CountingBackend::new("127.0.0.1:1");
        let rt = runtime_with(Arc::clone(&backend), 10);
        let mk = |id: &str| {
            Arc::new(super::WarmEntry {
                instance: WarmInstance {
                    endpoint: "127.0.0.1:1".to_string(),
                    handle: BackendHandle::Container { id: id.to_string() },
                },
                last_used: RwLock::new(std::time::Instant::now()),
                deploy_id: "d".to_string(),
                busy: Arc::new(tokio::sync::Mutex::new(())),
            })
        };

        rt.instances
            .write()
            .insert("f".to_string(), vec![mk("old")]);

        let snapshot = rt.take_warm_instances("f");
        assert_eq!(snapshot.len(), 1);
        assert!(rt.instances.read().get("f").is_none());

        rt.instances
            .write()
            .insert("f".to_string(), vec![mk("new")]);

        rt.terminate_instances(snapshot).await;
        assert_eq!(
            backend.terminates.load(SeqCst),
            1,
            "only the snapshotted instance is terminated"
        );

        let pool = rt.instances.read();
        let f = pool.get("f").expect("recreated function pool must survive");
        assert_eq!(f.len(), 1);
    }
}
1136}