1use std::collections::HashMap;
8use std::sync::Arc;
9use std::time::{Duration, Instant};
10
11use base64::Engine;
12use parking_lot::RwLock;
13use sha2::{Digest, Sha256};
14
15use super::backend::{
16 BackendHandle, LambdaBackend, RuntimeError, StreamingInvocation, WarmInstance,
17};
18use super::docker::DockerBackend;
19use crate::state::LambdaFunction;
20
/// One warm runtime instance tracked in a function's pool.
pub(crate) struct WarmEntry {
    /// Backend-provided instance: its `endpoint` receives invocations and its
    /// `handle` is what gets passed to the backend to terminate it.
    instance: WarmInstance,
    /// Refreshed whenever the entry is handed out or a response arrives; read
    /// by idle cleanup and by the warm-container listing.
    last_used: RwLock<Instant>,
    /// Deploy identifier the instance was launched with. Entries whose id does
    /// not match the caller's current deploy id are never handed out.
    deploy_id: String,
    /// Locked (via an owned guard stored in a `Slot`) while an invocation is
    /// using this instance, so only one invocation uses it at a time.
    busy: Arc<tokio::sync::Mutex<()>>,
}
39
/// Per-function pool cap used when `FAKECLOUD_LAMBDA_MAX_CONCURRENCY` is unset
/// or does not parse to an integer >= 1.
const DEFAULT_MAX_CONCURRENCY: usize = 10;

/// Upper bound on acquire-and-forward attempts made by a single invocation.
const MAX_INVOKE_ATTEMPTS: u32 = 5;

/// Time budget for the TCP reachability probe; also used as the HTTP client's
/// connect timeout.
const REACHABILITY_PROBE_TIMEOUT: Duration = Duration::from_millis(1500);
58
/// A warm entry together with the owned lock guard proving the caller has
/// exclusive use of it. Dropping the guard releases the entry's `busy` lock.
struct Slot {
    /// The pool entry that was claimed.
    entry: Arc<WarmEntry>,
    /// Owned guard on `entry.busy`; held for the duration of the invocation.
    guard: tokio::sync::OwnedMutexGuard<()>,
}
65
/// Computes the deploy identifier for `func` from its `code_sha256` and the
/// raw bytes of each layer; see `deploy_id_from` for the exact derivation.
fn deploy_id_for(func: &LambdaFunction, layers: &[Vec<u8>]) -> String {
    deploy_id_from(&func.code_sha256, layers)
}
79
80fn deploy_id_from(code_sha256: &str, layers: &[Vec<u8>]) -> String {
83 let mut hasher = Sha256::new();
84 hasher.update(code_sha256.as_bytes());
85 for bytes in layers {
86 let mut layer_hasher = Sha256::new();
87 layer_hasher.update(bytes);
88 hasher.update(b":");
89 hasher.update(layer_hasher.finalize());
90 }
91 base64::engine::general_purpose::URL_SAFE_NO_PAD.encode(hasher.finalize())
92}
93
94async fn endpoint_reachable(endpoint: &str, timeout: Duration) -> bool {
100 matches!(
101 tokio::time::timeout(timeout, tokio::net::TcpStream::connect(endpoint)).await,
102 Ok(Ok(_))
103 )
104}
105
/// Manages per-function pools of warm runtime instances on top of a
/// pluggable backend.
pub struct LambdaRuntime {
    /// Backend used to launch and terminate instances (and to pre-pull images
    /// and fetch instance logs).
    backend: Arc<dyn LambdaBackend>,
    /// Warm instances keyed by function name.
    instances: RwLock<HashMap<String, Vec<Arc<WarmEntry>>>>,
    /// Per-function async mutex taken while deciding whether to launch a new
    /// instance, so launches for one function are serialized.
    starting: RwLock<HashMap<String, Arc<tokio::sync::Mutex<()>>>>,
    /// Maximum number of warm instances kept per function.
    max_concurrency: usize,
}
119
120impl LambdaRuntime {
121 pub fn from_backend(backend: Arc<dyn LambdaBackend>) -> Self {
125 let max_concurrency = std::env::var("FAKECLOUD_LAMBDA_MAX_CONCURRENCY")
126 .ok()
127 .and_then(|v| v.parse::<usize>().ok())
128 .filter(|n| *n >= 1)
129 .unwrap_or(DEFAULT_MAX_CONCURRENCY);
130 Self {
131 backend,
132 instances: RwLock::new(HashMap::new()),
133 starting: RwLock::new(HashMap::new()),
134 max_concurrency,
135 }
136 }
137
138 pub fn auto_detect_docker(server_port: u16) -> Option<Self> {
141 DockerBackend::auto_detect(server_port)
142 .map(|b| Self::from_backend(Arc::new(b) as Arc<dyn LambdaBackend>))
143 }
144
    /// Convenience constructor; currently identical to
    /// `auto_detect_docker(server_port)`.
    pub fn new(server_port: u16) -> Option<Self> {
        Self::auto_detect_docker(server_port)
    }
150
151 pub async fn new_k8s(
163 server_port: u16,
164 internal_token: String,
165 ) -> Result<Self, super::k8s::K8sBackendError> {
166 let backend = super::k8s::K8sBackend::from_env(server_port, internal_token).await?;
167 backend.reap_stale().await;
168 Ok(Self::from_backend(Arc::new(backend)))
169 }
170
    /// Returns the backend's name, as reported by `LambdaBackend::name`.
    pub fn cli_name(&self) -> &str {
        self.backend.name()
    }
174
175 pub async fn prepull_for_function(
189 &self,
190 func: &LambdaFunction,
191 ) -> Option<Result<(), super::backend::RuntimeError>> {
192 let image = if func.package_type == "Image" {
193 func.image_uri.clone()?
194 } else {
195 super::docker::runtime_to_image(&func.runtime)?
196 };
197 Some(self.backend.prepull_image(&image).await)
198 }
199
200 pub async fn invoke(
212 &self,
213 func: &LambdaFunction,
214 payload: &[u8],
215 layers: &[Vec<u8>],
216 ) -> Result<Vec<u8>, RuntimeError> {
217 self.invoke_inner(func, payload, layers, false)
218 .await
219 .map(|(bytes, _)| bytes)
220 }
221
    /// Invokes `func` like `invoke`, but with log capture enabled: on success
    /// the second tuple element is whatever the backend's `instance_logs`
    /// returned for the instance that served the request.
    pub async fn invoke_with_log_tail(
        &self,
        func: &LambdaFunction,
        payload: &[u8],
        layers: &[Vec<u8>],
    ) -> Result<(Vec<u8>, Option<String>), RuntimeError> {
        self.invoke_inner(func, payload, layers, true).await
    }
233
    /// Shared implementation behind `invoke` and `invoke_with_log_tail`.
    ///
    /// Each attempt acquires a slot, probes the instance's endpoint over TCP,
    /// and then POSTs `payload` to the instance's invocation URL. Instances
    /// that fail the probe or whose request fails are evicted (removed from
    /// the pool and terminated). At most `MAX_INVOKE_ATTEMPTS` attempts are
    /// made.
    ///
    /// Returns the response body and, when `capture_logs` is set, the result
    /// of the backend's `instance_logs` for the serving instance.
    ///
    /// # Errors
    /// - Any error from `acquire_slot`.
    /// - `RuntimeError::InvocationFailed` when every attempt failed the probe,
    ///   when the request failed with a non-connect error (or attempts ran
    ///   out), or when reading the response body failed.
    async fn invoke_inner(
        &self,
        func: &LambdaFunction,
        payload: &[u8],
        layers: &[Vec<u8>],
        capture_logs: bool,
    ) -> Result<(Vec<u8>, Option<String>), RuntimeError> {
        // Bound the connect phase separately from the per-request timeout; if
        // the builder fails, fall back to a default client.
        let client = reqwest::Client::builder()
            .connect_timeout(REACHABILITY_PROBE_TIMEOUT)
            .build()
            .unwrap_or_else(|_| reqwest::Client::new());
        let mut attempt: u32 = 0;
        loop {
            attempt += 1;
            let slot = self.acquire_slot(func, layers).await?;

            // Cheap TCP probe before sending the request.
            if !endpoint_reachable(&slot.entry.instance.endpoint, REACHABILITY_PROBE_TIMEOUT).await
            {
                // Release the busy guard before evicting the entry.
                let entry = slot.entry.clone();
                drop(slot);
                self.evict_entry(&func.function_name, &entry).await;
                if attempt < MAX_INVOKE_ATTEMPTS {
                    tracing::warn!(
                        function = %func.function_name,
                        endpoint = %entry.instance.endpoint,
                        "warm Lambda instance failed reachability probe; evicted, retrying with a cold start"
                    );
                    continue;
                }
                return Err(RuntimeError::InvocationFailed(format!(
                    "no reachable warm instance for {} after {attempt} attempts",
                    func.function_name
                )));
            }

            let url = format!(
                "http://{}/2015-03-31/functions/function/invocations",
                slot.entry.instance.endpoint
            );
            // The request timeout is the function's timeout plus 5 seconds.
            let send = client
                .post(&url)
                .body(payload.to_vec())
                .timeout(Duration::from_secs(func.timeout as u64 + 5))
                .send()
                .await;
            match send {
                Ok(resp) => {
                    let body = resp.bytes().await;
                    *slot.entry.last_used.write() = Instant::now();
                    return match body {
                        Ok(b) => {
                            // `slot` is still alive here, so the instance
                            // stays marked busy while logs are fetched.
                            let logs = if capture_logs {
                                self.backend
                                    .instance_logs(&slot.entry.instance.handle)
                                    .await
                            } else {
                                None
                            };
                            Ok((b.to_vec(), logs))
                        }
                        Err(e) => {
                            // Reading the body failed after the request was
                            // accepted: evict the instance and report the
                            // failure without retrying.
                            let entry = slot.entry.clone();
                            drop(slot);
                            self.evict_entry(&func.function_name, &entry).await;
                            Err(RuntimeError::InvocationFailed(e.to_string()))
                        }
                    };
                }
                Err(e) => {
                    // The request itself failed: evict unconditionally, but
                    // only retry when the failure was a connect error.
                    let entry = slot.entry.clone();
                    drop(slot);
                    self.evict_entry(&func.function_name, &entry).await;
                    if attempt < MAX_INVOKE_ATTEMPTS && e.is_connect() {
                        tracing::warn!(
                            function = %func.function_name,
                            error = %e,
                            "warm Lambda instance unreachable; evicted, retrying with a cold start"
                        );
                        continue;
                    }
                    return Err(RuntimeError::InvocationFailed(e.to_string()));
                }
            }
        }
    }
338
339 pub async fn invoke_streaming(
350 &self,
351 func: &LambdaFunction,
352 payload: &[u8],
353 layers: &[Vec<u8>],
354 ) -> Result<StreamingInvocation, RuntimeError> {
355 let client = reqwest::Client::builder()
356 .connect_timeout(REACHABILITY_PROBE_TIMEOUT)
357 .build()
358 .unwrap_or_else(|_| reqwest::Client::new());
359 let mut attempt: u32 = 0;
360 loop {
361 attempt += 1;
362 let slot = self.acquire_slot(func, layers).await?;
363
364 if !endpoint_reachable(&slot.entry.instance.endpoint, REACHABILITY_PROBE_TIMEOUT).await
367 {
368 let entry = slot.entry.clone();
369 drop(slot);
370 self.evict_entry(&func.function_name, &entry).await;
371 if attempt < MAX_INVOKE_ATTEMPTS {
372 continue;
373 }
374 return Err(RuntimeError::InvocationFailed(format!(
375 "no reachable warm instance for {} after {attempt} attempts",
376 func.function_name
377 )));
378 }
379
380 let url = format!(
381 "http://{}/2015-03-31/functions/function/invocations",
382 slot.entry.instance.endpoint
383 );
384 let send = client
385 .post(&url)
386 .body(payload.to_vec())
387 .timeout(Duration::from_secs(func.timeout as u64 + 5))
388 .send()
389 .await;
390 match send {
391 Ok(resp) => {
392 *slot.entry.last_used.write() = Instant::now();
393 let Slot {
394 entry: _entry,
395 guard,
396 } = slot;
397 return Ok(StreamingInvocation {
398 resp,
399 _slot_guard: Some(guard),
400 });
401 }
402 Err(e) => {
403 let entry = slot.entry.clone();
407 drop(slot);
408 self.evict_entry(&func.function_name, &entry).await;
409 if attempt < MAX_INVOKE_ATTEMPTS && e.is_connect() {
410 continue;
411 }
412 return Err(RuntimeError::InvocationFailed(e.to_string()));
413 }
414 }
415 }
416 }
417
    /// Obtains exclusive use of one warm instance for `func`, launching a new
    /// one when the pool is below `max_concurrency` and otherwise waiting for
    /// an existing instance to become free.
    ///
    /// # Errors
    /// - `RuntimeError::NoCodeZip` for non-`Image` functions without a code zip.
    /// - Any error from the backend's `launch`.
    /// - `RuntimeError::InvocationFailed` when the pool is at capacity but no
    ///   entry with the current deploy id is left to wait on.
    async fn acquire_slot(
        &self,
        func: &LambdaFunction,
        layers: &[Vec<u8>],
    ) -> Result<Slot, RuntimeError> {
        let is_image = func.package_type == "Image";
        if !is_image && func.code_zip.is_none() {
            return Err(RuntimeError::NoCodeZip(func.function_name.clone()));
        }

        let deploy_id = deploy_id_for(func, layers);

        // Fast path: an idle instance of the current deploy already exists.
        if let Some(slot) = self.try_take_free(&func.function_name, &deploy_id) {
            return Ok(slot);
        }

        // Fetch (or create) this function's startup mutex. The map's write
        // lock is released at the end of this block, before awaiting.
        let startup_lock = {
            let mut starting = self.starting.write();
            starting
                .entry(func.function_name.clone())
                .or_insert_with(|| Arc::new(tokio::sync::Mutex::new(())))
                .clone()
        };
        let startup_guard = startup_lock.lock().await;

        // Re-check now that the startup lock is held: an instance may have
        // been launched or freed while this task was waiting for it.
        if let Some(slot) = self.try_take_free(&func.function_name, &deploy_id) {
            return Ok(slot);
        }

        // Remove and terminate instances launched for a different deploy id
        // before counting the pool.
        self.evict_stale_deploy(&func.function_name, &deploy_id)
            .await;

        let pool_len = self
            .instances
            .read()
            .get(&func.function_name)
            .map_or(0, |v| v.len());

        if pool_len < self.max_concurrency {
            // Below the cap: launch a new instance while still holding the
            // startup lock.
            let instance = self
                .backend
                .launch(func, func.code_zip.as_deref(), layers, &deploy_id)
                .await?;
            let entry = Arc::new(WarmEntry {
                instance,
                last_used: RwLock::new(Instant::now()),
                deploy_id,
                busy: Arc::new(tokio::sync::Mutex::new(())),
            });
            // Claim the entry before it is added to the pool.
            let guard = entry
                .busy
                .clone()
                .try_lock_owned()
                .expect("freshly created busy lock is uncontended");
            self.instances
                .write()
                .entry(func.function_name.clone())
                .or_default()
                .push(entry.clone());
            return Ok(Slot { entry, guard });
        }

        // At capacity: release the startup lock before waiting on busy locks.
        drop(startup_guard);
        let candidates: Vec<Arc<WarmEntry>> = {
            let map = self.instances.read();
            map.get(&func.function_name)
                .map(|pool| {
                    pool.iter()
                        .filter(|e| e.deploy_id == deploy_id)
                        .cloned()
                        .collect()
                })
                .unwrap_or_default()
        };
        // `select_all` panics on an empty iterator, so bail out explicitly.
        if candidates.is_empty() {
            return Err(RuntimeError::InvocationFailed(format!(
                "no warm instance available for {}",
                func.function_name
            )));
        }
        // Wait on every candidate's busy lock at once and take whichever
        // frees up first; the remaining lock futures are dropped.
        let waiters = candidates.into_iter().map(|entry| {
            Box::pin(async move {
                let guard = entry.busy.clone().lock_owned().await;
                Slot { entry, guard }
            })
        });
        let (slot, _idx, _rest) = futures_util::future::select_all(waiters).await;
        *slot.entry.last_used.write() = Instant::now();
        Ok(slot)
    }
528
529 fn try_take_free(&self, function_name: &str, deploy_id: &str) -> Option<Slot> {
533 let map = self.instances.read();
534 let pool = map.get(function_name)?;
535 for entry in pool {
536 if entry.deploy_id != deploy_id {
537 continue;
538 }
539 if let Ok(guard) = entry.busy.clone().try_lock_owned() {
540 *entry.last_used.write() = Instant::now();
541 return Some(Slot {
542 entry: entry.clone(),
543 guard,
544 });
545 }
546 }
547 None
548 }
549
550 async fn evict_entry(&self, function_name: &str, target: &Arc<WarmEntry>) {
553 let removed = {
554 let mut map = self.instances.write();
555 match map.get_mut(function_name) {
556 Some(pool) => {
557 let removed = pool
558 .iter()
559 .position(|e| Arc::ptr_eq(e, target))
560 .map(|pos| pool.remove(pos));
561 if pool.is_empty() {
562 map.remove(function_name);
563 }
564 removed
565 }
566 None => None,
567 }
568 };
569 if let Some(entry) = removed {
570 tracing::info!(
571 function = %function_name,
572 handle = ?entry.instance.handle,
573 "evicting unreachable Lambda runtime instance"
574 );
575 self.backend.terminate(&entry.instance.handle).await;
576 }
577 }
578
579 async fn evict_stale_deploy(&self, function_name: &str, deploy_id: &str) {
582 let stale: Vec<Arc<WarmEntry>> = {
583 let mut map = self.instances.write();
584 match map.get_mut(function_name) {
585 Some(pool) => {
586 let mut stale = Vec::new();
587 pool.retain(|e| {
588 if e.deploy_id == deploy_id {
589 true
590 } else {
591 stale.push(e.clone());
592 false
593 }
594 });
595 if pool.is_empty() {
596 map.remove(function_name);
597 }
598 stale
599 }
600 None => Vec::new(),
601 }
602 };
603 for entry in stale {
604 tracing::info!(
605 function = %function_name,
606 handle = ?entry.instance.handle,
607 "stopping stale-deploy Lambda runtime instance"
608 );
609 self.backend.terminate(&entry.instance.handle).await;
610 }
611 }
612
613 pub(crate) fn take_warm_instances(&self, function_name: &str) -> Vec<Arc<WarmEntry>> {
621 self.instances
622 .write()
623 .remove(function_name)
624 .unwrap_or_default()
625 }
626
627 pub(crate) async fn terminate_instances(&self, pool: Vec<Arc<WarmEntry>>) {
630 for entry in pool {
631 tracing::info!(
632 handle = ?entry.instance.handle,
633 "stopping Lambda runtime instance"
634 );
635 self.backend.terminate(&entry.instance.handle).await;
636 }
637 }
638
    /// Detaches the warm pool of `function_name` and terminates every instance
    /// that was in it.
    pub async fn stop_container(&self, function_name: &str) {
        // Detach first so the map lock is not held while terminating.
        let pool = self.take_warm_instances(function_name);
        self.terminate_instances(pool).await;
    }
644
645 pub async fn stop_all(&self) {
647 let pools: Vec<(String, Vec<Arc<WarmEntry>>)> =
648 { self.instances.write().drain().collect() };
649 for (name, pool) in pools {
650 for entry in pool {
651 tracing::info!(
652 function = %name,
653 handle = ?entry.instance.handle,
654 "stopping Lambda runtime instance (cleanup)"
655 );
656 self.backend.terminate(&entry.instance.handle).await;
657 }
658 }
659 }
660
661 pub fn list_warm_containers(
665 &self,
666 lambda_state: &crate::state::SharedLambdaState,
667 ) -> Vec<serde_json::Value> {
668 let entries = self.instances.read();
669 let accounts = lambda_state.read();
670 let mut rows = Vec::new();
671 for (name, pool) in entries.iter() {
672 let runtime = accounts
673 .iter()
674 .find_map(|(_, state)| state.functions.get(name).map(|f| f.runtime.clone()))
675 .unwrap_or_default();
676 for entry in pool {
677 let idle_secs = entry.last_used.read().elapsed().as_secs();
678 let mut row = serde_json::json!({
679 "functionName": name,
680 "runtime": runtime,
681 "backend": self.backend.name(),
682 "lastUsedSecsAgo": idle_secs,
683 });
684 let obj = row.as_object_mut().expect("json object");
685 match &entry.instance.handle {
686 BackendHandle::Container { id } => {
687 obj.insert("containerId".into(), serde_json::Value::String(id.clone()));
688 }
689 BackendHandle::Pod { namespace, name } => {
690 obj.insert("podName".into(), serde_json::Value::String(name.clone()));
691 obj.insert(
692 "namespace".into(),
693 serde_json::Value::String(namespace.clone()),
694 );
695 }
696 }
697 rows.push(row);
698 }
699 }
700 rows
701 }
702
703 pub async fn evict_container(&self, function_name: &str) -> bool {
706 let pool = self
707 .instances
708 .write()
709 .remove(function_name)
710 .unwrap_or_default();
711 let found = !pool.is_empty();
712 for entry in pool {
713 tracing::info!(
714 function = %function_name,
715 handle = ?entry.instance.handle,
716 "evicting Lambda runtime instance via simulation API"
717 );
718 self.backend.terminate(&entry.instance.handle).await;
719 }
720 found
721 }
722
    /// Runs forever, calling `cleanup_idle(ttl)` on every tick of a 30-second
    /// interval. This future never completes on its own.
    pub async fn run_cleanup_loop(self: Arc<Self>, ttl: Duration) {
        let mut interval = tokio::time::interval(Duration::from_secs(30));
        loop {
            interval.tick().await;
            self.cleanup_idle(ttl).await;
        }
    }
731
732 async fn cleanup_idle(&self, ttl: Duration) {
733 let expired: Vec<(String, Arc<WarmEntry>)> = {
738 let mut map = self.instances.write();
739 let mut out = Vec::new();
740 for (name, pool) in map.iter_mut() {
741 let mut i = 0;
742 while i < pool.len() {
743 let idle = pool[i].last_used.read().elapsed() > ttl;
744 let free = pool[i].busy.try_lock().is_ok();
745 if idle && free {
746 out.push((name.clone(), pool.remove(i)));
747 } else {
748 i += 1;
749 }
750 }
751 }
752 map.retain(|_, pool| !pool.is_empty());
753 out
754 };
755 for (name, entry) in expired {
756 tracing::info!(function = %name, "stopping idle Lambda runtime instance");
757 self.backend.terminate(&entry.instance.handle).await;
758 }
759 }
760}
761
762#[cfg(test)]
763mod tests {
764 use super::deploy_id_from;
765
766 #[test]
773 fn deploy_id_is_url_path_safe() {
774 for i in 0..2_000u32 {
775 let code_sha256 = format!("sha256-seed-{i}-{}", i.wrapping_mul(2_654_435_761));
777 let layers: Vec<Vec<u8>> = if i % 3 == 0 {
778 vec![format!("layer-{i}").into_bytes()]
779 } else {
780 vec![]
781 };
782 let id = deploy_id_from(&code_sha256, &layers);
783 assert!(
784 !id.contains('/') && !id.contains('+') && !id.contains('='),
785 "deploy id {id:?} (seed {i}) is not URL-path-safe"
786 );
787 }
788 }
789
790 #[test]
793 fn deploy_id_is_stable() {
794 let layers = vec![b"layer-a".to_vec(), b"layer-b".to_vec()];
795 let a = deploy_id_from("abc123", &layers);
796 let b = deploy_id_from("abc123", &layers);
797 assert_eq!(a, b);
798 assert_ne!(a, deploy_id_from("abc124", &layers));
799 assert_ne!(a, deploy_id_from("abc123", &[]));
800 }
801
802 use super::LambdaRuntime;
805 use crate::runtime::backend::{BackendHandle, LambdaBackend, RuntimeError, WarmInstance};
806 use crate::state::LambdaFunction;
807 use parking_lot::RwLock;
808 use std::collections::{HashMap, VecDeque};
809 use std::sync::atomic::{AtomicUsize, Ordering::SeqCst};
810 use std::sync::Arc;
811 use std::sync::Mutex as StdMutex;
812 use std::time::Duration;
813
    /// Test backend that records how often it is asked to launch and
    /// terminate, and hands out scripted endpoints.
    struct CountingBackend {
        /// Endpoints handed out by successive launches, front first.
        endpoints: StdMutex<VecDeque<String>>,
        /// Endpoint used once `endpoints` is exhausted.
        default_endpoint: String,
        /// Number of `launch` calls so far.
        launches: AtomicUsize,
        /// Number of `terminate` calls so far.
        terminates: AtomicUsize,
    }
823
824 impl CountingBackend {
825 fn new(default_endpoint: impl Into<String>) -> Arc<Self> {
826 Arc::new(Self {
827 endpoints: StdMutex::new(VecDeque::new()),
828 default_endpoint: default_endpoint.into(),
829 launches: AtomicUsize::new(0),
830 terminates: AtomicUsize::new(0),
831 })
832 }
833 fn with_queue(default_endpoint: impl Into<String>, queue: Vec<String>) -> Arc<Self> {
834 Arc::new(Self {
835 endpoints: StdMutex::new(queue.into()),
836 default_endpoint: default_endpoint.into(),
837 launches: AtomicUsize::new(0),
838 terminates: AtomicUsize::new(0),
839 })
840 }
841 }
842
843 #[async_trait::async_trait]
844 impl LambdaBackend for CountingBackend {
845 fn name(&self) -> &str {
846 "test"
847 }
848 async fn launch(
849 &self,
850 _func: &LambdaFunction,
851 _code_zip: Option<&[u8]>,
852 _layers: &[Vec<u8>],
853 _deploy_id: &str,
854 ) -> Result<WarmInstance, RuntimeError> {
855 let n = self.launches.fetch_add(1, SeqCst);
856 let endpoint = self
857 .endpoints
858 .lock()
859 .unwrap()
860 .pop_front()
861 .unwrap_or_else(|| self.default_endpoint.clone());
862 Ok(WarmInstance {
863 endpoint,
864 handle: BackendHandle::Container {
865 id: format!("c{n}"),
866 },
867 })
868 }
869 async fn terminate(&self, _handle: &BackendHandle) {
870 self.terminates.fetch_add(1, SeqCst);
871 }
872 }
873
    /// Starts a minimal HTTP responder on an ephemeral loopback port and
    /// returns its address as a string.
    ///
    /// Each connection that sends at least one byte is counted as in flight,
    /// held for `delay`, answered with a fixed `200 OK` / `ok` response and
    /// then uncounted. `peak` records the highest number of simultaneously
    /// in-flight requests seen.
    async fn spawn_rie(delay: Duration, peak: Arc<AtomicUsize>) -> String {
        // Port 0 lets the OS pick a free port.
        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
        let addr = listener.local_addr().unwrap();
        // Number of requests currently being served.
        let cur = Arc::new(AtomicUsize::new(0));
        tokio::spawn(async move {
            loop {
                let Ok((mut sock, _)) = listener.accept().await else {
                    break;
                };
                let cur = cur.clone();
                let peak = peak.clone();
                tokio::spawn(async move {
                    use tokio::io::{AsyncReadExt, AsyncWriteExt};
                    let mut buf = [0u8; 1024];
                    // A connection that closes without sending anything (such
                    // as a connect-only reachability probe) reads 0 bytes and
                    // is not counted as a request.
                    let n = sock.read(&mut buf).await.unwrap_or(0);
                    if n == 0 {
                        return;
                    }
                    let now = cur.fetch_add(1, SeqCst) + 1;
                    peak.fetch_max(now, SeqCst);
                    tokio::time::sleep(delay).await;
                    let _ = sock
                        .write_all(
                            b"HTTP/1.1 200 OK\r\nContent-Length: 2\r\nConnection: close\r\n\r\nok",
                        )
                        .await;
                    let _ = sock.flush().await;
                    cur.fetch_sub(1, SeqCst);
                });
            }
        });
        format!("{addr}")
    }
917
918 fn runtime_with(backend: Arc<CountingBackend>, max_concurrency: usize) -> Arc<LambdaRuntime> {
919 Arc::new(LambdaRuntime {
920 backend,
921 instances: RwLock::new(HashMap::new()),
922 starting: RwLock::new(HashMap::new()),
923 max_concurrency,
924 })
925 }
926
927 fn test_func(name: &str, sha: &str) -> LambdaFunction {
928 serde_json::from_value(serde_json::json!({
929 "function_name": name,
930 "function_arn": format!("arn:aws:lambda:us-east-1:123456789012:function:{name}"),
931 "runtime": "python3.12",
932 "role": "arn:aws:iam::123456789012:role/r",
933 "handler": "index.handler",
934 "description": "",
935 "timeout": 5,
936 "memory_size": 128,
937 "code_sha256": sha,
938 "code_size": 1,
939 "version": "$LATEST",
940 "last_modified": "2020-01-01T00:00:00Z",
941 "tags": {},
942 "environment": {},
943 "architectures": ["x86_64"],
944 "package_type": "Zip",
945 "code_zip": [1, 2, 3],
946 "policy": null
947 }))
948 .expect("build test LambdaFunction")
949 }
950
951 #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
955 async fn concurrent_invokes_are_serialized_on_a_single_instance() {
956 let peak = Arc::new(AtomicUsize::new(0));
957 let endpoint = spawn_rie(Duration::from_millis(40), peak.clone()).await;
958 let backend = CountingBackend::new(endpoint);
959 let rt = runtime_with(backend.clone(), 1);
960 let func = test_func("conc", "sha-A");
961
962 let mut handles = Vec::new();
963 for _ in 0..8 {
964 let rt = rt.clone();
965 let func = func.clone();
966 handles.push(tokio::spawn(
967 async move { rt.invoke(&func, b"{}", &[]).await },
968 ));
969 }
970 for h in handles {
971 h.await.unwrap().expect("invoke ok");
972 }
973
974 assert_eq!(
975 peak.load(SeqCst),
976 1,
977 "concurrent invokes overlapped on a single RIE instance"
978 );
979 assert_eq!(
980 backend.launches.load(SeqCst),
981 1,
982 "max_concurrency=1 must launch exactly one instance"
983 );
984 }
985
986 #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
990 async fn pool_scales_under_load_and_respects_cap() {
991 let peak = Arc::new(AtomicUsize::new(0));
992 let endpoint = spawn_rie(Duration::from_millis(60), peak.clone()).await;
993 let backend = CountingBackend::new(endpoint);
994 let rt = runtime_with(backend.clone(), 4);
995 let func = test_func("scale", "sha-A");
996
997 let mut handles = Vec::new();
998 for _ in 0..8 {
999 let rt = rt.clone();
1000 let func = func.clone();
1001 handles.push(tokio::spawn(
1002 async move { rt.invoke(&func, b"{}", &[]).await },
1003 ));
1004 }
1005 for h in handles {
1006 h.await.unwrap().expect("invoke ok");
1007 }
1008
1009 let launched = backend.launches.load(SeqCst);
1010 assert!(
1011 (2..=4).contains(&launched),
1012 "expected the pool to scale within the cap, launched={launched}"
1013 );
1014 assert!(
1015 peak.load(SeqCst) > 1,
1016 "expected concurrent forwards across the scaled pool"
1017 );
1018 }
1019
1020 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1024 async fn dead_instance_is_evicted_and_retried() {
1025 let peak = Arc::new(AtomicUsize::new(0));
1026 let live = spawn_rie(Duration::from_millis(5), peak.clone()).await;
1027 let backend = CountingBackend::with_queue(
1031 live,
1032 vec!["127.0.0.1:1".to_string(), "127.0.0.1:1".to_string()],
1033 );
1034 let rt = runtime_with(backend.clone(), 1);
1035
1036 let out = rt
1037 .invoke(&test_func("dead", "sha-A"), b"{}", &[])
1038 .await
1039 .expect("should recover via cold-start retry");
1040 assert_eq!(out, b"ok");
1041 assert_eq!(
1042 backend.launches.load(SeqCst),
1043 3,
1044 "expected two dead instances plus one cold-start replacement"
1045 );
1046 assert!(
1047 backend.terminates.load(SeqCst) >= 2,
1048 "both dead instances should have been terminated on eviction"
1049 );
1050 }
1051
    /// The first launched instance points at an address that is expected not
    /// to answer; the invoke must evict it, succeed on a replacement, and
    /// finish in under five seconds.
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn black_holed_instance_fails_over_fast() {
        let peak = Arc::new(AtomicUsize::new(0));
        let live = spawn_rie(Duration::from_millis(5), peak.clone()).await;
        // NOTE(review): 192.0.2.1 is in the TEST-NET-1 documentation range;
        // presumably connects to it hang rather than get refused - confirm
        // this holds in the CI network.
        let backend = CountingBackend::with_queue(live, vec!["192.0.2.1:9".to_string()]);
        let rt = runtime_with(backend.clone(), 1);

        let func = test_func("blackhole", "sha-A");
        // Time the whole invoke, including the failed first attempt.
        let started = std::time::Instant::now();
        let out = rt
            .invoke(&func, b"{}", &[])
            .await
            .expect("should recover via cold-start retry");
        let elapsed = started.elapsed();

        assert_eq!(out, b"ok");
        assert_eq!(
            backend.launches.load(SeqCst),
            2,
            "expected one black-holed instance plus one cold-start replacement"
        );
        assert!(
            backend.terminates.load(SeqCst) >= 1,
            "the black-holed instance should have been evicted"
        );
        // The test function's timeout is 5s, so the request timeout is 10s;
        // failover has to be bounded by the much shorter probe timeout.
        assert!(
            elapsed < Duration::from_secs(5),
            "failover took {elapsed:?}; must be far below the ~10s invoke timeout"
        );
    }
1090
1091 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1094 async fn deploy_change_evicts_stale_instance() {
1095 let peak = Arc::new(AtomicUsize::new(0));
1096 let endpoint = spawn_rie(Duration::from_millis(5), peak.clone()).await;
1097 let backend = CountingBackend::new(endpoint);
1098 let rt = runtime_with(backend.clone(), 2);
1099
1100 rt.invoke(&test_func("upd", "sha-A"), b"{}", &[])
1101 .await
1102 .unwrap();
1103 rt.invoke(&test_func("upd", "sha-B"), b"{}", &[])
1104 .await
1105 .unwrap();
1106
1107 assert_eq!(
1108 backend.launches.load(SeqCst),
1109 2,
1110 "a new deploy id should launch a fresh instance"
1111 );
1112 assert!(
1113 backend.terminates.load(SeqCst) >= 1,
1114 "the stale-deploy instance should have been torn down"
1115 );
1116 let pool_len = rt.instances.read().get("upd").map_or(0, |v| v.len());
1118 assert_eq!(pool_len, 1);
1119 }
1120
1121 #[tokio::test]
1127 async fn take_warm_instances_snapshot_does_not_reap_recreated_pool() {
1128 let backend = CountingBackend::new("127.0.0.1:1");
1129 let rt = runtime_with(backend.clone(), 10);
1130 let mk = |id: &str| {
1131 Arc::new(super::WarmEntry {
1132 instance: WarmInstance {
1133 endpoint: "127.0.0.1:1".to_string(),
1134 handle: BackendHandle::Container { id: id.to_string() },
1135 },
1136 last_used: RwLock::new(std::time::Instant::now()),
1137 deploy_id: "d".to_string(),
1138 busy: Arc::new(tokio::sync::Mutex::new(())),
1139 })
1140 };
1141
1142 rt.instances
1144 .write()
1145 .insert("f".to_string(), vec![mk("old")]);
1146
1147 let snapshot = rt.take_warm_instances("f");
1149 assert_eq!(snapshot.len(), 1);
1150 assert!(rt.instances.read().get("f").is_none());
1151
1152 rt.instances
1155 .write()
1156 .insert("f".to_string(), vec![mk("new")]);
1157
1158 rt.terminate_instances(snapshot).await;
1160 assert_eq!(
1161 backend.terminates.load(SeqCst),
1162 1,
1163 "only the snapshotted instance is terminated"
1164 );
1165
1166 let pool = rt.instances.read();
1168 let f = pool.get("f").expect("recreated function pool must survive");
1169 assert_eq!(f.len(), 1);
1170 }
1171}