mod decommit_queue;
mod index_allocator;
mod memory_pool;
mod table_pool;
#[cfg(feature = "gc")]
mod gc_heap_pool;
#[cfg(all(feature = "async"))]
mod generic_stack_pool;
#[cfg(all(feature = "async", unix, not(miri)))]
mod unix_stack_pool;
#[cfg(all(feature = "async"))]
cfg_if::cfg_if! {
if #[cfg(all(unix, not(miri), not(asan)))] {
use unix_stack_pool as stack_pool;
} else {
use generic_stack_pool as stack_pool;
}
}
use self::decommit_queue::DecommitQueue;
use self::memory_pool::MemoryPool;
use self::table_pool::TablePool;
use super::{
InstanceAllocationRequest, InstanceAllocatorImpl, MemoryAllocationIndex, TableAllocationIndex,
};
use crate::prelude::*;
use crate::runtime::vm::{
instance::Instance,
mpk::{self, MpkEnabled, ProtectionKey, ProtectionMask},
CompiledModuleId, Memory, Table,
};
use anyhow::{bail, Result};
use std::borrow::Cow;
use std::fmt::Display;
use std::sync::{Mutex, MutexGuard};
use std::{
mem,
sync::atomic::{AtomicU64, Ordering},
};
use wasmtime_environ::{
DefinedMemoryIndex, DefinedTableIndex, HostPtr, MemoryPlan, Module, TablePlan, Tunables,
VMOffsets,
};
#[cfg(feature = "gc")]
use super::GcHeapAllocationIndex;
#[cfg(feature = "gc")]
use crate::runtime::vm::{GcHeap, GcRuntime};
#[cfg(feature = "gc")]
use gc_heap_pool::GcHeapPool;
#[cfg(feature = "async")]
use stack_pool::StackPool;
#[cfg(feature = "component-model")]
use wasmtime_environ::{
component::{Component, VMComponentOffsets},
StaticModuleIndex,
};
/// Rounds `n` up to the nearest multiple of `to`.
///
/// `to` must be a non-zero power of two; this is only checked in debug
/// builds. Overflows (wraps in release) if `n + to - 1` exceeds `usize::MAX`.
fn round_up_to_pow2(n: usize, to: usize) -> usize {
    debug_assert!(to > 0);
    debug_assert!(to.is_power_of_two());
    // For a power-of-two `to`, adding `to - 1` and clearing the low bits
    // rounds up without any division.
    let mask = to - 1;
    (n + mask) & !mask
}
/// Limits enforced by the pooling instance allocator: how many of each
/// entity may be live concurrently and how large each allocation may be.
#[derive(Debug, Copy, Clone)]
pub struct InstanceLimits {
    /// Maximum number of concurrently-live component instances.
    pub total_component_instances: u32,
    /// Maximum size, in bytes, of a component's `VMComponentContext`
    /// allocation.
    pub component_instance_size: usize,
    /// Maximum number of concurrently-live core module instances.
    pub total_core_instances: u32,
    /// Maximum number of core module instances a single component may
    /// transitively contain.
    pub max_core_instances_per_component: u32,
    /// Maximum number of linear memories a single component may
    /// transitively contain.
    pub max_memories_per_component: u32,
    /// Maximum number of tables a single component may transitively
    /// contain.
    pub max_tables_per_component: u32,
    /// Maximum number of concurrently-live linear memories.
    pub total_memories: u32,
    /// Maximum number of concurrently-live tables.
    pub total_tables: u32,
    /// Maximum number of concurrently-live async fiber stacks.
    #[cfg(feature = "async")]
    pub total_stacks: u32,
    /// Maximum size, in bytes, of a core instance's `Instance` allocation
    /// (rounded up to `Instance`'s alignment by the allocator).
    pub core_instance_size: usize,
    /// Maximum number of defined tables per core module
    /// (enforced by `TablePool::validate`).
    pub max_tables_per_module: u32,
    /// Maximum number of elements in each pooled table slot — interpreted
    /// by the table pool; see `TablePool`.
    pub table_elements: u32,
    /// Maximum number of defined linear memories per core module
    /// (enforced by `MemoryPool::validate`).
    pub max_memories_per_module: u32,
    /// Maximum size, in bytes, that any pooled linear memory may reach.
    pub max_memory_size: usize,
    /// Maximum number of concurrently-live GC heaps.
    #[cfg(feature = "gc")]
    pub total_gc_heaps: u32,
}
impl Default for InstanceLimits {
fn default() -> Self {
Self {
total_component_instances: 1000,
component_instance_size: 1 << 20, total_core_instances: 1000,
max_core_instances_per_component: 20,
max_memories_per_component: 20,
max_tables_per_component: 20,
total_memories: 1000,
total_tables: 1000,
#[cfg(feature = "async")]
total_stacks: 1000,
core_instance_size: 1 << 20, max_tables_per_module: 1,
table_elements: 20_000,
max_memories_per_module: 1,
max_memory_size: 10 * (1 << 20), #[cfg(feature = "gc")]
total_gc_heaps: 1000,
}
}
}
/// Construction-time configuration for a `PoolingInstanceAllocator`.
#[derive(Copy, Clone, Debug)]
pub struct PoolingInstanceAllocatorConfig {
    /// Maximum number of previously-used ("warm") slots kept around for
    /// reuse — interpreted by the individual pools; see `MemoryPool`.
    pub max_unused_warm_slots: u32,
    /// Number of queued raw regions at which the shared decommit queue is
    /// flushed (see `PoolingInstanceAllocator::merge_or_flush`).
    pub decommit_batch_size: usize,
    /// Size, in bytes, of each async fiber stack.
    pub stack_size: usize,
    /// Limits on entity counts and sizes enforced by the allocator.
    pub limits: InstanceLimits,
    /// Whether recycled fiber stacks are zeroed before reuse.
    pub async_stack_zeroing: bool,
    /// Bytes of a deallocated stack to keep resident rather than decommit —
    /// presumably consumed by the stack pool; see `StackPool::new`.
    pub async_stack_keep_resident: usize,
    /// Bytes of a deallocated linear memory to keep resident rather than
    /// decommit (surfaces as `MemoryPool::keep_resident`).
    pub linear_memory_keep_resident: usize,
    /// Bytes of a deallocated table to keep resident rather than decommit —
    /// consumed by `TablePool::new`.
    pub table_keep_resident: usize,
    /// Whether memory protection keys (MPK) are enabled for the memory pool.
    pub memory_protection_keys: MpkEnabled,
    /// Upper bound on the number of memory protection keys to use.
    pub max_memory_protection_keys: usize,
}
impl Default for PoolingInstanceAllocatorConfig {
    /// Default configuration: 100 warm slots, no decommit batching
    /// (batch size 1), 2 MiB stacks, stack zeroing off, nothing kept
    /// resident, and MPK disabled.
    fn default() -> Self {
        Self {
            max_unused_warm_slots: 100,
            decommit_batch_size: 1,
            stack_size: 2 << 20,
            limits: InstanceLimits::default(),
            async_stack_zeroing: false,
            async_stack_keep_resident: 0,
            linear_memory_keep_resident: 0,
            table_keep_resident: 0,
            memory_protection_keys: MpkEnabled::Disable,
            max_memory_protection_keys: 16,
        }
    }
}
/// Error returned when a pool has reached its configured concurrency limit
/// and cannot satisfy a new allocation until an existing one is released.
#[derive(Debug)]
pub struct PoolConcurrencyLimitError {
    /// The configured limit that was hit.
    limit: usize,
    /// Human-readable name of the exhausted resource
    /// (e.g. "core instances").
    kind: Cow<'static, str>,
}

// Marker impl; `Debug` is derived and `Display` is implemented below.
impl std::error::Error for PoolConcurrencyLimitError {}
impl Display for PoolConcurrencyLimitError {
    /// Formats the error as
    /// "maximum concurrent limit of {limit} for {kind} reached".
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(
            f,
            "maximum concurrent limit of {limit} for {kind} reached",
            limit = self.limit,
            kind = self.kind,
        )
    }
}
impl PoolConcurrencyLimitError {
    /// Creates a new limit error for the given resource `kind` and the
    /// configured `limit` that was reached.
    fn new(limit: usize, kind: impl Into<Cow<'static, str>>) -> Self {
        let kind = kind.into();
        Self { limit, kind }
    }
}
/// An instance allocator backed by fixed-size pools of memories, tables,
/// GC heaps, and fiber stacks, plus a shared queue of pending decommit work
/// that is flushed in batches.
#[derive(Debug)]
pub struct PoolingInstanceAllocator {
    /// Number of queued raw regions at which the decommit queue is flushed.
    decommit_batch_size: usize,
    /// Configured limits this allocator enforces.
    limits: InstanceLimits,
    /// Count of currently-live core module instances.
    live_core_instances: AtomicU64,
    /// Count of currently-live component instances.
    live_component_instances: AtomicU64,
    /// Shared queue of regions waiting to be decommitted, along with the
    /// pool entities that are returned to their pools when it is flushed.
    decommit_queue: Mutex<DecommitQueue>,
    /// Pool of linear memory slots.
    memories: MemoryPool,
    /// Pool of table slots.
    tables: TablePool,
    /// Pool of GC heaps.
    #[cfg(feature = "gc")]
    gc_heaps: GcHeapPool,
    /// Pool of async fiber stacks.
    #[cfg(feature = "async")]
    stacks: StackPool,
}
// Debug-build-only sanity check: when the allocator is dropped, everything
// that was allocated must have been returned to its pool.
#[cfg(debug_assertions)]
impl Drop for PoolingInstanceAllocator {
    fn drop(&mut self) {
        // Flush the decommit queue first: entities sitting in the queue have
        // been deallocated by callers but not yet handed back to their
        // pools, so the `is_empty` checks below would spuriously fail.
        let queue = self.decommit_queue.lock().unwrap();
        self.flush_decommit_queue(queue);

        // No instances of either kind may still be live.
        debug_assert_eq!(self.live_component_instances.load(Ordering::Acquire), 0);
        debug_assert_eq!(self.live_core_instances.load(Ordering::Acquire), 0);

        // Every pool must have had all of its slots returned.
        debug_assert!(self.memories.is_empty());
        debug_assert!(self.tables.is_empty());
        #[cfg(feature = "gc")]
        debug_assert!(self.gc_heaps.is_empty());
        #[cfg(feature = "async")]
        debug_assert!(self.stacks.is_empty());
    }
}
impl PoolingInstanceAllocator {
    /// Creates a new pooling allocator from `config`, constructing each
    /// underlying pool up front.
    ///
    /// # Errors
    ///
    /// Returns an error if any pool cannot be created with the given
    /// configuration (e.g. incompatible limits vs. `tunables`).
    pub fn new(config: &PoolingInstanceAllocatorConfig, tunables: &Tunables) -> Result<Self> {
        Ok(Self {
            decommit_batch_size: config.decommit_batch_size,
            limits: config.limits,
            live_component_instances: AtomicU64::new(0),
            live_core_instances: AtomicU64::new(0),
            decommit_queue: Mutex::new(DecommitQueue::default()),
            memories: MemoryPool::new(config, tunables)?,
            tables: TablePool::new(config)?,
            #[cfg(feature = "gc")]
            gc_heaps: GcHeapPool::new(config)?,
            #[cfg(feature = "async")]
            stacks: StackPool::new(config)?,
        })
    }

    /// The configured per-core-instance allocation size, rounded up so that
    /// `Instance` values are properly aligned.
    fn core_instance_size(&self) -> usize {
        round_up_to_pow2(self.limits.core_instance_size, mem::align_of::<Instance>())
    }

    /// Checks that `module`'s tables fit within the table pool's limits.
    fn validate_table_plans(&self, module: &Module) -> Result<()> {
        self.tables.validate(module)
    }

    /// Checks that `module`'s memories fit within the memory pool's limits.
    fn validate_memory_plans(&self, module: &Module) -> Result<()> {
        self.memories.validate(module)
    }

    /// Ensures the `Instance` allocation layout for a module fits within
    /// the configured `core_instance_size`.
    ///
    /// # Errors
    ///
    /// On failure the error message itemizes, per region, where the bytes
    /// go so users can tune their configuration.
    fn validate_core_instance_size(&self, offsets: &VMOffsets<HostPtr>) -> Result<()> {
        let layout = Instance::alloc_layout(offsets);
        if layout.size() <= self.core_instance_size() {
            return Ok(());
        }

        let mut message = format!(
            "instance allocation for this module \
             requires {} bytes which exceeds the configured maximum \
             of {} bytes; breakdown of allocation requirement:\n\n",
            layout.size(),
            self.core_instance_size(),
        );

        // Walk every region of the layout, subtracting from `remaining` so
        // we can assert the regions account for the entire allocation.
        let mut remaining = layout.size();
        let mut push = |name: &str, bytes: usize| {
            assert!(remaining >= bytes);
            remaining -= bytes;

            // Only itemize regions larger than 5% of the total size to keep
            // the message readable.
            if bytes > layout.size() / 20 {
                message.push_str(&format!(
                    " * {:.02}% - {} bytes - {}\n",
                    ((bytes as f32) / (layout.size() as f32)) * 100.0,
                    bytes,
                    name,
                ));
            }
        };

        push("instance state management", mem::size_of::<Instance>());
        for (desc, size) in offsets.region_sizes() {
            push(desc, size as usize);
        }

        // Sanity check: all regions must sum to exactly the layout size.
        assert_eq!(remaining, 0);

        bail!("{}", message)
    }

    /// Ensures a component's `VMComponentContext` fits within the
    /// configured `component_instance_size`.
    #[cfg(feature = "component-model")]
    fn validate_component_instance_size(
        &self,
        offsets: &VMComponentOffsets<HostPtr>,
    ) -> Result<()> {
        if usize::try_from(offsets.size_of_vmctx()).unwrap() <= self.limits.component_instance_size
        {
            return Ok(());
        }
        bail!(
            "instance allocation for this component requires {} bytes of `VMComponentContext` \
             space which exceeds the configured maximum of {} bytes",
            offsets.size_of_vmctx(),
            self.limits.component_instance_size
        )
    }

    /// Takes ownership of the shared queue's contents, releases the lock,
    /// and performs the flush outside of it.
    ///
    /// Returns the result of `DecommitQueue::flush` — used by callers to
    /// decide whether a retry after flushing is worthwhile.
    fn flush_decommit_queue(&self, mut locked_queue: MutexGuard<'_, DecommitQueue>) -> bool {
        // Swap out the queue's contents while holding the lock, but drop the
        // lock before the potentially-slow flush so other threads can keep
        // enqueueing.
        let queue = mem::take(&mut *locked_queue);
        drop(locked_queue);
        queue.flush(self)
    }

    /// Runs `f`; if it fails with a `PoolConcurrencyLimitError`, flushes
    /// the shared decommit queue (which may return slots to the pools) and
    /// retries `f` exactly once. Any other error is returned unchanged.
    fn with_flush_and_retry<T>(&self, mut f: impl FnMut() -> Result<T>) -> Result<T> {
        f().or_else(|e| {
            if e.is::<PoolConcurrencyLimitError>() {
                let queue = self.decommit_queue.lock().unwrap();
                if self.flush_decommit_queue(queue) {
                    return f();
                }
            }
            Err(e)
        })
    }

    /// Disposes of a thread-local decommit queue built during a single
    /// deallocation: flush it directly when batching is pointless or
    /// already satisfied, otherwise merge it into the shared queue.
    fn merge_or_flush(&self, mut local_queue: DecommitQueue) {
        match local_queue.raw_len() {
            // No raw regions to decommit, so there is nothing to batch:
            // flush immediately to return the entities to their pools.
            0 => {
                local_queue.flush(self);
            }

            // The local queue alone meets the batch size; flush it without
            // contending on the shared queue's lock.
            n if n >= self.decommit_batch_size => {
                local_queue.flush(self);
            }

            // Otherwise merge into the shared queue, flushing the combined
            // queue only once it reaches the batch size.
            n => {
                debug_assert!(n < self.decommit_batch_size);
                let mut shared_queue = self.decommit_queue.lock().unwrap();
                shared_queue.append(&mut local_queue);
                if shared_queue.raw_len() >= self.decommit_batch_size {
                    self.flush_decommit_queue(shared_queue);
                }
            }
        }
    }
}
unsafe impl InstanceAllocatorImpl for PoolingInstanceAllocator {
    /// Validates that `component`, and every core module it transitively
    /// instantiates, fits within this allocator's per-component limits.
    #[cfg(feature = "component-model")]
    fn validate_component_impl<'a>(
        &self,
        component: &Component,
        offsets: &VMComponentOffsets<HostPtr>,
        get_module: &'a dyn Fn(StaticModuleIndex) -> &'a Module,
    ) -> Result<()> {
        self.validate_component_instance_size(offsets)?;

        // Tally how many core instances, defined memories, and defined
        // tables instantiating this component would create.
        let mut num_core_instances = 0;
        let mut num_memories = 0;
        let mut num_tables = 0;
        for init in &component.initializers {
            use wasmtime_environ::component::GlobalInitializer::*;
            use wasmtime_environ::component::InstantiateModule;
            match init {
                // Imported modules are not statically known here, so only
                // the instance itself can be counted.
                InstantiateModule(InstantiateModule::Import(_, _)) => {
                    num_core_instances += 1;
                }
                InstantiateModule(InstantiateModule::Static(static_module_index, _)) => {
                    let module = get_module(*static_module_index);
                    let offsets = VMOffsets::new(HostPtr, &module);
                    self.validate_module_impl(module, &offsets)?;
                    num_core_instances += 1;
                    // Count only defined (non-imported) memories and tables.
                    num_memories += module.memory_plans.len() - module.num_imported_memories;
                    num_tables += module.table_plans.len() - module.num_imported_tables;
                }
                // These initializers don't allocate pooled resources.
                LowerImport { .. }
                | ExtractMemory(_)
                | ExtractRealloc(_)
                | ExtractPostReturn(_)
                | Resource(_) => {}
            }
        }

        if num_core_instances
            > usize::try_from(self.limits.max_core_instances_per_component).unwrap()
        {
            bail!(
                "The component transitively contains {num_core_instances} core module instances, \
                 which exceeds the configured maximum of {}",
                self.limits.max_core_instances_per_component
            );
        }

        if num_memories > usize::try_from(self.limits.max_memories_per_component).unwrap() {
            bail!(
                "The component transitively contains {num_memories} Wasm linear memories, which \
                 exceeds the configured maximum of {}",
                self.limits.max_memories_per_component
            );
        }

        if num_tables > usize::try_from(self.limits.max_tables_per_component).unwrap() {
            bail!(
                "The component transitively contains {num_tables} tables, which exceeds the \
                 configured maximum of {}",
                self.limits.max_tables_per_component
            );
        }

        Ok(())
    }

    /// Validates that a single core `module` fits within the memory/table
    /// pool limits and the configured core instance size.
    fn validate_module_impl(&self, module: &Module, offsets: &VMOffsets<HostPtr>) -> Result<()> {
        self.validate_memory_plans(module)?;
        self.validate_table_plans(module)?;
        self.validate_core_instance_size(offsets)?;
        Ok(())
    }

    /// Reserves a component-instance slot, failing with a
    /// `PoolConcurrencyLimitError` once the configured total is live.
    fn increment_component_instance_count(&self) -> Result<()> {
        let old_count = self.live_component_instances.fetch_add(1, Ordering::AcqRel);
        if old_count >= u64::from(self.limits.total_component_instances) {
            // Roll back the speculative increment before reporting failure.
            self.decrement_component_instance_count();
            return Err(PoolConcurrencyLimitError::new(
                usize::try_from(self.limits.total_component_instances).unwrap(),
                "component instances",
            )
            .into());
        }
        Ok(())
    }

    /// Releases a slot reserved by `increment_component_instance_count`.
    fn decrement_component_instance_count(&self) {
        self.live_component_instances.fetch_sub(1, Ordering::AcqRel);
    }

    /// Reserves a core-instance slot, failing with a
    /// `PoolConcurrencyLimitError` once the configured total is live.
    fn increment_core_instance_count(&self) -> Result<()> {
        let old_count = self.live_core_instances.fetch_add(1, Ordering::AcqRel);
        if old_count >= u64::from(self.limits.total_core_instances) {
            // Roll back the speculative increment before reporting failure.
            self.decrement_core_instance_count();
            return Err(PoolConcurrencyLimitError::new(
                usize::try_from(self.limits.total_core_instances).unwrap(),
                "core instances",
            )
            .into());
        }
        Ok(())
    }

    /// Releases a slot reserved by `increment_core_instance_count`.
    fn decrement_core_instance_count(&self) {
        self.live_core_instances.fetch_sub(1, Ordering::AcqRel);
    }

    /// Allocates a linear memory from the memory pool, flushing the shared
    /// decommit queue and retrying once if the pool reports it is at its
    /// concurrency limit.
    unsafe fn allocate_memory(
        &self,
        request: &mut InstanceAllocationRequest,
        memory_plan: &MemoryPlan,
        memory_index: DefinedMemoryIndex,
    ) -> Result<(MemoryAllocationIndex, Memory)> {
        self.with_flush_and_retry(|| self.memories.allocate(request, memory_plan, memory_index))
    }

    /// Resets a deallocated memory's image and queues its pages for
    /// decommit; the slot returns to the pool when the queue is flushed.
    unsafe fn deallocate_memory(
        &self,
        _memory_index: DefinedMemoryIndex,
        allocation_index: MemoryAllocationIndex,
        memory: Memory,
    ) {
        // Pool-allocated memories are backed by static images.
        let mut image = memory.unwrap_static_image();
        let mut queue = DecommitQueue::default();
        image
            .clear_and_remain_ready(self.memories.keep_resident, |ptr, len| {
                // Regions beyond `keep_resident` are queued for decommit
                // rather than decommitted synchronously.
                queue.push_raw(ptr, len);
            })
            .expect("failed to reset memory image");
        queue.push_memory(allocation_index, image);
        self.merge_or_flush(queue);
    }

    /// Allocates a table from the table pool, flushing the shared decommit
    /// queue and retrying once on a concurrency-limit failure.
    unsafe fn allocate_table(
        &self,
        request: &mut InstanceAllocationRequest,
        table_plan: &TablePlan,
        _table_index: DefinedTableIndex,
    ) -> Result<(super::TableAllocationIndex, Table)> {
        self.with_flush_and_retry(|| self.tables.allocate(request, table_plan))
    }

    /// Zeroes a deallocated table's pages and queues them for decommit; the
    /// slot returns to the pool when the queue is flushed.
    unsafe fn deallocate_table(
        &self,
        _table_index: DefinedTableIndex,
        allocation_index: TableAllocationIndex,
        mut table: Table,
    ) {
        let mut queue = DecommitQueue::default();
        self.tables
            .reset_table_pages_to_zero(allocation_index, &mut table, |ptr, len| {
                queue.push_raw(ptr, len);
            });
        queue.push_table(allocation_index, table);
        self.merge_or_flush(queue);
    }

    /// Allocates a fiber stack from the stack pool, flushing the shared
    /// decommit queue and retrying once on a concurrency-limit failure.
    #[cfg(feature = "async")]
    fn allocate_fiber_stack(&self) -> Result<wasmtime_fiber::FiberStack> {
        self.with_flush_and_retry(|| self.stacks.allocate())
    }

    /// Zeroes a deallocated fiber stack (per the pool's configuration) and
    /// queues it for return to the pool.
    #[cfg(feature = "async")]
    unsafe fn deallocate_fiber_stack(&self, mut stack: wasmtime_fiber::FiberStack) {
        let mut queue = DecommitQueue::default();
        self.stacks
            .zero_stack(&mut stack, |ptr, len| queue.push_raw(ptr, len));
        queue.push_stack(stack);
        self.merge_or_flush(queue);
    }

    /// Evicts any cached state associated with `module` from the memory
    /// pool (e.g. warm slots keyed by module).
    fn purge_module(&self, module: CompiledModuleId) {
        self.memories.purge_module(module);
    }

    /// Returns the next memory protection key available for striping, if
    /// any — delegated to the memory pool.
    fn next_available_pkey(&self) -> Option<ProtectionKey> {
        self.memories.next_available_pkey()
    }

    /// Restricts the current thread's MPK mask to only `pkey`.
    fn restrict_to_pkey(&self, pkey: ProtectionKey) {
        mpk::allow(ProtectionMask::zero().or(pkey));
    }

    /// Re-enables access to all protection keys on the current thread.
    fn allow_all_pkeys(&self) {
        mpk::allow(ProtectionMask::all());
    }

    /// Allocates a GC heap from the GC heap pool.
    #[cfg(feature = "gc")]
    fn allocate_gc_heap(
        &self,
        gc_runtime: &dyn GcRuntime,
    ) -> Result<(GcHeapAllocationIndex, Box<dyn GcHeap>)> {
        self.gc_heaps.allocate(gc_runtime)
    }

    /// Returns a GC heap to the GC heap pool.
    #[cfg(feature = "gc")]
    fn deallocate_gc_heap(
        &self,
        allocation_index: GcHeapAllocationIndex,
        gc_heap: Box<dyn GcHeap>,
    ) {
        self.gc_heaps.deallocate(allocation_index, gc_heap);
    }
}
#[cfg(test)]
mod test {
    use super::*;

    /// Constructing the allocator must fail when the configured maximum
    /// memory size exceeds the static memory reservation, with the exact
    /// error message asserted below.
    #[test]
    fn test_pooling_allocator_with_memory_pages_exceeded() {
        let config = PoolingInstanceAllocatorConfig {
            limits: InstanceLimits {
                total_memories: 1,
                // Deliberately larger than the 0x10000-byte reservation.
                max_memory_size: 0x100010000,
                ..Default::default()
            },
            ..PoolingInstanceAllocatorConfig::default()
        };
        assert_eq!(
            PoolingInstanceAllocator::new(
                &config,
                &Tunables {
                    static_memory_reservation: 0x10000,
                    ..Tunables::default_host()
                },
            )
            .map_err(|e| e.to_string())
            .expect_err("expected a failure constructing instance allocator"),
            "maximum memory size of 0x100010000 bytes exceeds the configured \
             static memory reservation of 0x10000 bytes"
        );
    }

    /// With `async_stack_zeroing` enabled and a single-stack pool (so the
    /// same slot is reused each iteration), a recycled stack must read back
    /// as zero even after the previous user wrote to it.
    #[cfg(all(unix, target_pointer_width = "64", feature = "async", not(miri)))]
    #[test]
    fn test_stack_zeroed() -> Result<()> {
        let config = PoolingInstanceAllocatorConfig {
            max_unused_warm_slots: 0,
            limits: InstanceLimits {
                total_stacks: 1,
                total_memories: 0,
                total_tables: 0,
                ..Default::default()
            },
            stack_size: 128,
            async_stack_zeroing: true,
            ..PoolingInstanceAllocatorConfig::default()
        };
        let allocator = PoolingInstanceAllocator::new(&config, &Tunables::default_host())?;

        unsafe {
            for _ in 0..255 {
                let stack = allocator.allocate_fiber_stack()?;

                // The byte just below the stack top must have been zeroed
                // on recycle despite the write below.
                let addr = stack.top().unwrap().sub(1);
                assert_eq!(*addr, 0);
                *addr = 1;

                allocator.deallocate_fiber_stack(stack);
            }
        }

        Ok(())
    }

    /// With zeroing disabled, the recycled stack retains the value written
    /// by the previous iteration — demonstrating the slot really is reused
    /// and no zeroing work is performed.
    #[cfg(all(unix, target_pointer_width = "64", feature = "async", not(miri)))]
    #[test]
    fn test_stack_unzeroed() -> Result<()> {
        let config = PoolingInstanceAllocatorConfig {
            max_unused_warm_slots: 0,
            limits: InstanceLimits {
                total_stacks: 1,
                total_memories: 0,
                total_tables: 0,
                ..Default::default()
            },
            stack_size: 128,
            async_stack_zeroing: false,
            ..PoolingInstanceAllocatorConfig::default()
        };
        let allocator = PoolingInstanceAllocator::new(&config, &Tunables::default_host())?;

        unsafe {
            for i in 0..255 {
                let stack = allocator.allocate_fiber_stack()?;

                // The previous iteration's write is still visible.
                let addr = stack.top().unwrap().sub(1);
                assert_eq!(*addr, i);
                *addr = i + 1;

                allocator.deallocate_fiber_stack(stack);
            }
        }

        Ok(())
    }
}