use std::collections::VecDeque;
use std::sync::{Arc, Mutex};
use std::time::Duration;
use tokio::sync::{OwnedSemaphorePermit, Semaphore};
use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken;
use wasmtime::Store;
use wasmtime::component::{Instance, InstancePre};
use super::host_state::HostState;
use crate::error::{CapsuleError, CapsuleResult};
const EVICT_INTERVAL: Duration = Duration::from_secs(30);
pub(super) struct PooledInstance {
pub(super) store: Store<HostState>,
pub(super) instance: Instance,
}
pub(super) struct InstanceBuilder {
engine: wasmtime::Engine,
instance_pre: InstancePre<HostState>,
make_state: Arc<dyn Fn() -> HostState + Send + Sync>,
epoch_deadline: u64,
fuel_budget: u64,
}
impl InstanceBuilder {
pub(super) fn new(
engine: wasmtime::Engine,
instance_pre: InstancePre<HostState>,
make_state: Arc<dyn Fn() -> HostState + Send + Sync>,
epoch_deadline: u64,
fuel_budget: u64,
) -> Self {
Self {
engine,
instance_pre,
make_state,
epoch_deadline,
fuel_budget,
}
}
pub(super) async fn build(&self) -> CapsuleResult<PooledInstance> {
let mut store = Store::new(&self.engine, (self.make_state)());
store.limiter(|state| &mut state.store_meter);
store.set_epoch_deadline(self.epoch_deadline);
store.set_fuel(self.fuel_budget).map_err(|e| {
CapsuleError::UnsupportedEntryPoint(format!("Failed to seed store fuel: {e}"))
})?;
let instance = self
.instance_pre
.instantiate_async(&mut store)
.await
.map_err(|e| {
CapsuleError::UnsupportedEntryPoint(format!(
"Failed to instantiate WASM component: {e}"
))
})?;
Ok(PooledInstance { store, instance })
}
}
pub(super) struct CapsuleInstancePool {
available: Arc<Mutex<VecDeque<PooledInstance>>>,
permits: Arc<Semaphore>,
reset_resources_on_return: bool,
builder: Arc<InstanceBuilder>,
allow_grow: bool,
evict_task: Option<JoinHandle<()>>,
}
impl CapsuleInstancePool {
pub(super) fn new(
initial: Vec<PooledInstance>,
max: usize,
min_idle: usize,
reset_resources_on_return: bool,
builder: InstanceBuilder,
cancel_token: &CancellationToken,
) -> Self {
debug_assert!(max >= 1, "pool max must be >= 1");
debug_assert!(min_idle >= 1 && min_idle <= max, "1 <= min_idle <= max");
debug_assert!(initial.len() <= max, "warm-start cannot exceed max");
let available = Arc::new(Mutex::new(VecDeque::from(initial)));
let allow_grow = max > min_idle;
let evict_task = allow_grow.then(|| {
let available = Arc::clone(&available);
let cancel = cancel_token.clone();
tokio::spawn(async move { evict_loop(available, min_idle, cancel).await })
});
Self {
available,
permits: Arc::new(Semaphore::new(max)),
reset_resources_on_return,
builder: Arc::new(builder),
allow_grow,
evict_task,
}
}
pub(super) async fn checkout(&self) -> Option<PoolCheckout> {
let permit = Arc::clone(&self.permits).acquire_owned().await.ok()?;
let warm = self
.available
.lock()
.expect("instance pool mutex poisoned")
.pop_back();
let pooled = match warm {
Some(pooled) => pooled,
None => {
if !self.allow_grow {
return None;
}
match self.builder.build().await {
Ok(pooled) => pooled,
Err(e) => {
tracing::error!(error = %e, "failed to grow capsule instance pool");
return None;
},
}
},
};
Some(PoolCheckout {
pooled: Some(pooled),
available: Arc::clone(&self.available),
reset_resources_on_return: self.reset_resources_on_return,
_permit: permit,
})
}
}
impl Drop for CapsuleInstancePool {
fn drop(&mut self) {
if let Some(task) = self.evict_task.take() {
task.abort();
}
}
}
async fn evict_loop(
available: Arc<Mutex<VecDeque<PooledInstance>>>,
min_idle: usize,
cancel: CancellationToken,
) {
loop {
tokio::select! {
biased;
() = cancel.cancelled() => return,
() = tokio::time::sleep(EVICT_INTERVAL) => {
let evicted = {
let mut q = available.lock().expect("instance pool mutex poisoned");
drain_excess(&mut q, min_idle)
};
if !evicted.is_empty() {
tracing::debug!(
evicted = evicted.len(),
min_idle,
"evicted idle pool instances"
);
}
drop(evicted);
}
}
}
}
fn drain_excess<T>(queue: &mut VecDeque<T>, min_idle: usize) -> Vec<T> {
let mut evicted = Vec::new();
while queue.len() > min_idle {
match queue.pop_front() {
Some(item) => evicted.push(item),
None => break,
}
}
evicted
}
pub(super) struct PoolCheckout {
pooled: Option<PooledInstance>,
available: Arc<Mutex<VecDeque<PooledInstance>>>,
reset_resources_on_return: bool,
_permit: OwnedSemaphorePermit,
}
impl PoolCheckout {
pub(super) fn instance(&self) -> Instance {
self.pooled.as_ref().expect("active checkout").instance
}
pub(super) fn store_mut(&mut self) -> &mut Store<HostState> {
&mut self.pooled.as_mut().expect("active checkout").store
}
}
impl Drop for PoolCheckout {
fn drop(&mut self) {
if let Some(mut pooled) = self.pooled.take() {
clear_on_return(pooled.store.data_mut(), self.reset_resources_on_return);
self.available
.lock()
.expect("instance pool mutex poisoned")
.push_back(pooled);
}
}
}
fn clear_on_return(state: &mut HostState, reset_resources: bool) {
state.caller_context = None;
state.interceptor_active = false;
state.invocation_kv = None;
state.invocation_home = None;
state.invocation_tmp = None;
state.invocation_secret_store = None;
state.invocation_capsule_log = None;
state.invocation_profile = None;
state.invocation_env_overlay = None;
if reset_resources {
state.resource_table = wasmtime::component::ResourceTable::new();
state.active_http_streams.clear();
state.net_stream_count = 0;
state.subscription_count = 0;
state.process_count_total = 0;
state.process_count_by_principal.clear();
}
}
#[cfg(test)]
mod tests {
use std::sync::atomic::{AtomicBool, Ordering};
use wasmtime::component::Resource;
use super::super::test_fixtures::minimal_host_state;
use super::*;
struct DropFlag(Arc<AtomicBool>);
impl Drop for DropFlag {
fn drop(&mut self) {
self.0.store(true, Ordering::SeqCst);
}
}
#[test]
fn clear_on_return_resets_orphaned_resources() {
let rt = tokio::runtime::Builder::new_current_thread()
.build()
.expect("runtime");
let mut state = minimal_host_state(rt.handle().clone());
let dropped = Arc::new(AtomicBool::new(false));
let res = state
.resource_table
.push(DropFlag(Arc::clone(&dropped)))
.expect("push test resource");
state.net_stream_count = 1;
state.subscription_count = 2;
state.process_count_total = 1;
state
.process_count_by_principal
.insert(astrid_core::PrincipalId::default(), 1);
state.interceptor_active = true;
clear_on_return(&mut state, true);
assert!(
dropped.load(Ordering::SeqCst),
"orphaned resource must be dropped on return"
);
assert!(
state
.resource_table
.get::<DropFlag>(&Resource::<DropFlag>::new_borrow(res.rep()))
.is_err(),
"returned instance must observe an empty resource table"
);
assert_eq!(state.net_stream_count, 0);
assert_eq!(state.subscription_count, 0);
assert_eq!(state.process_count_total, 0);
assert!(state.process_count_by_principal.is_empty());
assert!(!state.interceptor_active);
}
#[test]
fn clear_on_return_preserves_resources_for_carveout() {
let rt = tokio::runtime::Builder::new_current_thread()
.build()
.expect("runtime");
let mut state = minimal_host_state(rt.handle().clone());
let dropped = Arc::new(AtomicBool::new(false));
let res = state
.resource_table
.push(DropFlag(Arc::clone(&dropped)))
.expect("push test resource");
state.process_count_total = 1;
state.interceptor_active = true;
clear_on_return(&mut state, false);
assert!(
!dropped.load(Ordering::SeqCst),
"carve-out must not drop cross-invocation resources"
);
assert!(
state
.resource_table
.get::<DropFlag>(&Resource::<DropFlag>::new_borrow(res.rep()))
.is_ok(),
"carve-out resource table must persist across return"
);
assert_eq!(state.process_count_total, 1);
assert!(!state.interceptor_active);
}
#[test]
fn drain_excess_trims_to_min_idle_from_the_front() {
let mut q: VecDeque<i32> = (0..5).collect();
let evicted = drain_excess(&mut q, 2);
assert_eq!(
evicted,
vec![0, 1, 2],
"evict oldest-returned off the front"
);
assert_eq!(q.into_iter().collect::<Vec<_>>(), vec![3, 4]);
let mut q: VecDeque<i32> = (0..2).collect();
assert!(drain_excess(&mut q, 2).is_empty());
assert_eq!(q.len(), 2);
let mut q: VecDeque<i32> = (0..1).collect();
assert!(drain_excess(&mut q, 2).is_empty());
assert_eq!(q.len(), 1);
let mut q: VecDeque<i32> = (0..3).collect();
assert_eq!(drain_excess(&mut q, 0).len(), 3);
assert!(q.is_empty());
}
async fn empty_pool(
max: usize,
min_idle: usize,
cancel: &CancellationToken,
) -> CapsuleInstancePool {
let engine = super::super::build_wasmtime_engine().expect("engine");
let component =
wasmtime::component::Component::new(&engine, "(component)").expect("empty component");
let linker: wasmtime::component::Linker<HostState> =
wasmtime::component::Linker::new(&engine);
let instance_pre = linker.instantiate_pre(&component).expect("instantiate_pre");
let handle = tokio::runtime::Handle::current();
let make_state: Arc<dyn Fn() -> HostState + Send + Sync> =
Arc::new(move || minimal_host_state(handle.clone()));
let builder = InstanceBuilder::new(engine, instance_pre, make_state, u64::MAX, 1_000_000);
let mut initial = Vec::with_capacity(min_idle);
for _ in 0..min_idle {
initial.push(builder.build().await.expect("warm-start build"));
}
CapsuleInstancePool::new(initial, max, min_idle, true, builder, cancel)
}
#[tokio::test(flavor = "multi_thread")]
async fn checkout_grows_lazily_then_bounds_at_max() {
let cancel = CancellationToken::new();
let pool = empty_pool(4, 2, &cancel).await;
let c1 = pool.checkout().await.expect("warm 1");
let c2 = pool.checkout().await.expect("warm 2");
let c3 = pool.checkout().await.expect("lazy grow 3");
let c4 = pool.checkout().await.expect("lazy grow 4");
let blocked = tokio::time::timeout(Duration::from_millis(100), pool.checkout()).await;
assert!(
blocked.is_err(),
"checkout must block once max are in flight"
);
drop(c4);
let c5 = tokio::time::timeout(Duration::from_millis(1000), pool.checkout())
.await
.expect("a returned instance must unblock the waiter")
.expect("checkout after return");
drop((c1, c2, c3, c5));
cancel.cancel();
}
#[tokio::test(flavor = "multi_thread")]
async fn carveout_pool_never_grows() {
let cancel = CancellationToken::new();
let pool = empty_pool(1, 1, &cancel).await;
assert!(!pool.allow_grow, "size-1 pool must not be growable");
assert!(
pool.evict_task.is_none(),
"non-growable pool spawns no evictor"
);
let c1 = pool.checkout().await.expect("the one instance");
let blocked = tokio::time::timeout(Duration::from_millis(100), pool.checkout()).await;
assert!(blocked.is_err(), "carve-out serialises: no second Store");
drop(c1);
let c2 = tokio::time::timeout(Duration::from_millis(1000), pool.checkout())
.await
.expect("unblocks on return")
.expect("same instance again");
drop(c2);
cancel.cancel();
}
}