mod decommit_queue;
mod index_allocator;
mod memory_pool;
mod metrics;
mod table_pool;
#[cfg(feature = "gc")]
mod gc_heap_pool;
#[cfg(feature = "async")]
mod generic_stack_pool;
#[cfg(all(feature = "async", unix, not(miri), not(asan)))]
mod unix_stack_pool;
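// Select the Unix-specific stack pool when it is available for this build
// (Unix, not running under Miri or ASAN); otherwise fall back to the generic
// implementation.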
#[cfg(feature = "async")]
cfg_if::cfg_if! {
if #[cfg(all(unix, not(miri), not(asan)))] {
use unix_stack_pool as stack_pool;
} else {
use generic_stack_pool as stack_pool;
}
}
use self::decommit_queue::DecommitQueue;
use self::memory_pool::MemoryPool;
use self::table_pool::TablePool;
use super::{
InstanceAllocationRequest, InstanceAllocator, MemoryAllocationIndex, TableAllocationIndex,
};
use crate::Enabled;
use crate::prelude::*;
use crate::runtime::vm::{
CompiledModuleId, Memory, Table,
instance::Instance,
mpk::{self, ProtectionKey, ProtectionMask},
sys::vm::PageMap,
};
use core::future::Future;
use core::pin::Pin;
use core::sync::atomic::AtomicUsize;
use std::borrow::Cow;
use std::fmt::Display;
use std::sync::{Mutex, MutexGuard};
use std::{
mem,
sync::atomic::{AtomicU64, Ordering},
};
use wasmtime_environ::{
DefinedMemoryIndex, DefinedTableIndex, HostPtr, Module, Tunables, VMOffsets,
};
pub use self::metrics::PoolingAllocatorMetrics;
#[cfg(feature = "gc")]
use super::GcHeapAllocationIndex;
#[cfg(feature = "gc")]
use crate::runtime::vm::{GcHeap, GcRuntime};
#[cfg(feature = "gc")]
use gc_heap_pool::GcHeapPool;
#[cfg(feature = "async")]
use stack_pool::StackPool;
#[cfg(feature = "component-model")]
use wasmtime_environ::{
StaticModuleIndex,
component::{Component, VMComponentOffsets},
};
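/// Rounds `n` up to the nearest multiple of `to`, which must be a non-zero
/// power of two.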
fn round_up_to_pow2(n: usize, to: usize) -> usize {
debug_assert!(to > 0);
debug_assert!(to.is_power_of_two());
(n + to - 1) & !(to - 1)
}
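/// Upper limits enforced by the pooling allocator on the number and size of
/// concurrently live component and core instances, memories, tables, stacks,
/// and GC heaps.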
#[derive(Debug, Copy, Clone)]
pub struct InstanceLimits {
pub total_component_instances: u32,
pub component_instance_size: usize,
pub total_core_instances: u32,
pub max_core_instances_per_component: u32,
pub max_memories_per_component: u32,
pub max_tables_per_component: u32,
pub total_memories: u32,
pub total_tables: u32,
#[cfg(feature = "async")]
pub total_stacks: u32,
pub core_instance_size: usize,
pub max_tables_per_module: u32,
pub table_elements: usize,
pub max_memories_per_module: u32,
pub max_memory_size: usize,
#[cfg(feature = "gc")]
pub total_gc_heaps: u32,
}
impl Default for InstanceLimits {
fn default() -> Self {
let total = if cfg!(target_pointer_width = "32") {
100
} else {
1000
};
Self {
total_component_instances: total,
            component_instance_size: 1 << 20,
            total_core_instances: total,
max_core_instances_per_component: u32::MAX,
max_memories_per_component: u32::MAX,
max_tables_per_component: u32::MAX,
total_memories: total,
total_tables: total,
#[cfg(feature = "async")]
total_stacks: total,
            core_instance_size: 1 << 20,
            max_tables_per_module: 1,
table_elements: 20_000,
max_memories_per_module: 1,
#[cfg(target_pointer_width = "64")]
            max_memory_size: 1 << 32,
            #[cfg(target_pointer_width = "32")]
            max_memory_size: 10 << 20,
            #[cfg(feature = "gc")]
total_gc_heaps: total,
}
}
}
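/// Configuration for the pooling instance allocator: pool sizing via
/// `InstanceLimits`, decommit batching, async stack handling, memory
/// protection keys, and pagemap scanning.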
#[derive(Copy, Clone, Debug)]
pub struct PoolingInstanceAllocatorConfig {
pub max_unused_warm_slots: u32,
pub decommit_batch_size: usize,
pub stack_size: usize,
pub limits: InstanceLimits,
pub async_stack_zeroing: bool,
#[cfg(feature = "async")]
pub async_stack_keep_resident: usize,
pub linear_memory_keep_resident: usize,
pub table_keep_resident: usize,
pub memory_protection_keys: Enabled,
pub max_memory_protection_keys: usize,
pub pagemap_scan: Enabled,
}
impl Default for PoolingInstanceAllocatorConfig {
fn default() -> PoolingInstanceAllocatorConfig {
PoolingInstanceAllocatorConfig {
max_unused_warm_slots: 100,
decommit_batch_size: 1,
stack_size: 2 << 20,
limits: InstanceLimits::default(),
async_stack_zeroing: false,
#[cfg(feature = "async")]
async_stack_keep_resident: 0,
linear_memory_keep_resident: 0,
table_keep_resident: 0,
memory_protection_keys: Enabled::No,
max_memory_protection_keys: 16,
pagemap_scan: Enabled::No,
}
}
}
impl PoolingInstanceAllocatorConfig {
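    /// Returns whether pagemap scanning is supported on this host, i.e.
    /// whether a `PageMap` can be created.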
pub fn is_pagemap_scan_available() -> bool {
PageMap::new().is_some()
}
}
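/// Error returned when a pool's configured concurrency limit has been reached
/// and no further allocations of that kind can be made until an existing
/// allocation is released.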
#[derive(Debug)]
pub struct PoolConcurrencyLimitError {
limit: usize,
kind: Cow<'static, str>,
}
impl core::error::Error for PoolConcurrencyLimitError {}
impl Display for PoolConcurrencyLimitError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let limit = self.limit;
let kind = &self.kind;
write!(f, "maximum concurrent limit of {limit} for {kind} reached")
}
}
impl PoolConcurrencyLimitError {
fn new(limit: usize, kind: impl Into<Cow<'static, str>>) -> Self {
Self {
limit,
kind: kind.into(),
}
}
}
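/// An instance allocator that pre-allocates pools of memories, tables, stacks,
/// and GC heaps up front and hands out slots from those pools, deferring page
/// decommits through a shared `DecommitQueue`.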
#[derive(Debug)]
pub struct PoolingInstanceAllocator {
decommit_batch_size: usize,
limits: InstanceLimits,
live_core_instances: AtomicU64,
live_component_instances: AtomicU64,
decommit_queue: Mutex<DecommitQueue>,
memories: MemoryPool,
live_memories: AtomicUsize,
tables: TablePool,
live_tables: AtomicUsize,
#[cfg(feature = "gc")]
gc_heaps: GcHeapPool,
#[cfg(feature = "gc")]
live_gc_heaps: AtomicUsize,
#[cfg(feature = "async")]
stacks: StackPool,
#[cfg(feature = "async")]
live_stacks: AtomicUsize,
pagemap: Option<PageMap>,
}
impl Drop for PoolingInstanceAllocator {
fn drop(&mut self) {
if !cfg!(debug_assertions) {
return;
}
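        // Flush any queued decommits so that previously deallocated entities
        // are returned to their pools before asserting that every pool is
        // empty and every live count is zero.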
let queue = self.decommit_queue.lock().unwrap();
self.flush_decommit_queue(queue);
debug_assert_eq!(self.live_component_instances.load(Ordering::Acquire), 0);
debug_assert_eq!(self.live_core_instances.load(Ordering::Acquire), 0);
debug_assert_eq!(self.live_memories.load(Ordering::Acquire), 0);
debug_assert_eq!(self.live_tables.load(Ordering::Acquire), 0);
debug_assert!(self.memories.is_empty());
debug_assert!(self.tables.is_empty());
#[cfg(feature = "gc")]
{
debug_assert!(self.gc_heaps.is_empty());
debug_assert_eq!(self.live_gc_heaps.load(Ordering::Acquire), 0);
}
#[cfg(feature = "async")]
{
debug_assert!(self.stacks.is_empty());
debug_assert_eq!(self.live_stacks.load(Ordering::Acquire), 0);
}
}
}
impl PoolingInstanceAllocator {
pub fn new(config: &PoolingInstanceAllocatorConfig, tunables: &Tunables) -> Result<Self> {
Ok(Self {
decommit_batch_size: config.decommit_batch_size,
limits: config.limits,
live_component_instances: AtomicU64::new(0),
live_core_instances: AtomicU64::new(0),
decommit_queue: Mutex::new(DecommitQueue::default()),
memories: MemoryPool::new(config, tunables)?,
live_memories: AtomicUsize::new(0),
tables: TablePool::new(config)?,
live_tables: AtomicUsize::new(0),
#[cfg(feature = "gc")]
gc_heaps: GcHeapPool::new(config)?,
#[cfg(feature = "gc")]
live_gc_heaps: AtomicUsize::new(0),
#[cfg(feature = "async")]
stacks: StackPool::new(config)?,
#[cfg(feature = "async")]
live_stacks: AtomicUsize::new(0),
pagemap: match config.pagemap_scan {
Enabled::Auto => PageMap::new(),
Enabled::Yes => Some(PageMap::new().ok_or_else(|| {
format_err!(
"required to enable PAGEMAP_SCAN but this system \
does not support it"
)
})?),
Enabled::No => None,
},
})
}
fn core_instance_size(&self) -> usize {
round_up_to_pow2(self.limits.core_instance_size, mem::align_of::<Instance>())
}
fn validate_table_plans(&self, module: &Module) -> Result<()> {
self.tables.validate(module)
}
fn validate_memory_plans(&self, module: &Module) -> Result<()> {
self.memories.validate_memories(module)
}
fn validate_core_instance_size(&self, offsets: &VMOffsets<HostPtr>) -> Result<()> {
let layout = Instance::alloc_layout(offsets);
if layout.size() <= self.core_instance_size() {
return Ok(());
}
let mut message = format!(
"instance allocation for this module \
requires {} bytes which exceeds the configured maximum \
of {} bytes; breakdown of allocation requirement:\n\n",
layout.size(),
self.core_instance_size(),
);
let mut remaining = layout.size();
let mut push = |name: &str, bytes: usize| {
assert!(remaining >= bytes);
remaining -= bytes;
if bytes > layout.size() / 20 {
message.push_str(&format!(
" * {:.02}% - {} bytes - {}\n",
((bytes as f32) / (layout.size() as f32)) * 100.0,
bytes,
name,
));
}
};
push("instance state management", mem::size_of::<Instance>());
for (desc, size) in offsets.region_sizes() {
push(desc, size as usize);
}
assert_eq!(remaining, 0);
bail!("{message}")
}
#[cfg(feature = "component-model")]
fn validate_component_instance_size(
&self,
offsets: &VMComponentOffsets<HostPtr>,
core_instances_aggregate_size: usize,
) -> Result<()> {
let vmcomponentctx_size = usize::try_from(offsets.size_of_vmctx()).unwrap();
let total_instance_size = core_instances_aggregate_size.saturating_add(vmcomponentctx_size);
if total_instance_size <= self.limits.component_instance_size {
return Ok(());
}
bail!(
"instance allocation for this component requires {total_instance_size} bytes of `VMComponentContext` \
and aggregated core instance runtime space which exceeds the configured maximum of {} bytes. \
`VMComponentContext` used {vmcomponentctx_size} bytes, `core module instances` used \
{core_instances_aggregate_size} bytes.",
self.limits.component_instance_size
)
}
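    /// Takes ownership of the locked decommit queue, releases the lock, and
    /// flushes the queued entries outside of the lock. Returns whether
    /// anything was actually flushed back to the pools.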
fn flush_decommit_queue(&self, mut locked_queue: MutexGuard<'_, DecommitQueue>) -> bool {
let queue = mem::take(&mut *locked_queue);
drop(locked_queue);
queue.flush(self)
}
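    /// Runs `f`, and if it fails with a `PoolConcurrencyLimitError`, flushes
    /// the shared decommit queue and retries once, since slots may have been
    /// deallocated but not yet returned to their pools.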
#[cfg(feature = "async")]
fn with_flush_and_retry<T>(&self, mut f: impl FnMut() -> Result<T>) -> Result<T> {
f().or_else(|e| {
if e.is::<PoolConcurrencyLimitError>() {
let queue = self.decommit_queue.lock().unwrap();
if self.flush_decommit_queue(queue) {
return f();
}
}
Err(e)
})
}
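    /// Either merges `local_queue` into the shared decommit queue or flushes
    /// it immediately: a queue with no raw decommit regions is flushed right
    /// away so any entities it carries are returned to their pools, a queue
    /// at or above the batch size is flushed directly, and anything smaller
    /// is appended to the shared queue, which is itself flushed once it
    /// reaches the batch size.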
fn merge_or_flush(&self, mut local_queue: DecommitQueue) {
match local_queue.raw_len() {
0 => {
local_queue.flush(self);
}
n if n >= self.decommit_batch_size => {
local_queue.flush(self);
}
n => {
debug_assert!(n < self.decommit_batch_size);
let mut shared_queue = self.decommit_queue.lock().unwrap();
shared_queue.append(&mut local_queue);
if shared_queue.raw_len() >= self.decommit_batch_size {
self.flush_decommit_queue(shared_queue);
}
}
}
}
}
unsafe impl InstanceAllocator for PoolingInstanceAllocator {
#[cfg(feature = "component-model")]
fn validate_component<'a>(
&self,
component: &Component,
offsets: &VMComponentOffsets<HostPtr>,
get_module: &'a dyn Fn(StaticModuleIndex) -> &'a Module,
) -> Result<()> {
let mut num_core_instances = 0;
let mut num_memories = 0;
let mut num_tables = 0;
let mut core_instances_aggregate_size: usize = 0;
for init in &component.initializers {
use wasmtime_environ::component::GlobalInitializer::*;
use wasmtime_environ::component::InstantiateModule;
match init {
InstantiateModule(InstantiateModule::Import(_, _), _) => {
num_core_instances += 1;
}
InstantiateModule(InstantiateModule::Static(static_module_index, _), _) => {
let module = get_module(*static_module_index);
let offsets = VMOffsets::new(HostPtr, &module);
let layout = Instance::alloc_layout(&offsets);
self.validate_module(module, &offsets)?;
num_core_instances += 1;
num_memories += module.num_defined_memories();
num_tables += module.num_defined_tables();
core_instances_aggregate_size += layout.size();
}
LowerImport { .. }
| ExtractMemory(_)
| ExtractTable(_)
| ExtractRealloc(_)
| ExtractCallback(_)
| ExtractPostReturn(_)
| Resource(_) => {}
}
}
if num_core_instances
> usize::try_from(self.limits.max_core_instances_per_component).unwrap()
{
bail!(
"The component transitively contains {num_core_instances} core module instances, \
which exceeds the configured maximum of {} in the pooling allocator",
self.limits.max_core_instances_per_component
);
}
if num_memories > usize::try_from(self.limits.max_memories_per_component).unwrap() {
bail!(
"The component transitively contains {num_memories} Wasm linear memories, which \
exceeds the configured maximum of {} in the pooling allocator",
self.limits.max_memories_per_component
);
}
if num_tables > usize::try_from(self.limits.max_tables_per_component).unwrap() {
bail!(
"The component transitively contains {num_tables} tables, which exceeds the \
configured maximum of {} in the pooling allocator",
self.limits.max_tables_per_component
);
}
self.validate_component_instance_size(offsets, core_instances_aggregate_size)
.context("component instance size does not fit in pooling allocator requirements")?;
Ok(())
}
fn validate_module(&self, module: &Module, offsets: &VMOffsets<HostPtr>) -> Result<()> {
self.validate_memory_plans(module)
.context("module memory does not fit in pooling allocator requirements")?;
self.validate_table_plans(module)
.context("module table does not fit in pooling allocator requirements")?;
self.validate_core_instance_size(offsets)
.context("module instance size does not fit in pooling allocator requirements")?;
Ok(())
}
#[cfg(feature = "gc")]
fn validate_memory(&self, memory: &wasmtime_environ::Memory) -> Result<()> {
self.memories.validate_memory(memory)
}
#[cfg(feature = "component-model")]
fn increment_component_instance_count(&self) -> Result<()> {
let old_count = self.live_component_instances.fetch_add(1, Ordering::AcqRel);
if old_count >= u64::from(self.limits.total_component_instances) {
self.decrement_component_instance_count();
return Err(PoolConcurrencyLimitError::new(
usize::try_from(self.limits.total_component_instances).unwrap(),
"component instances",
)
.into());
}
Ok(())
}
#[cfg(feature = "component-model")]
fn decrement_component_instance_count(&self) {
self.live_component_instances.fetch_sub(1, Ordering::AcqRel);
}
fn increment_core_instance_count(&self) -> Result<()> {
let old_count = self.live_core_instances.fetch_add(1, Ordering::AcqRel);
if old_count >= u64::from(self.limits.total_core_instances) {
self.decrement_core_instance_count();
return Err(PoolConcurrencyLimitError::new(
usize::try_from(self.limits.total_core_instances).unwrap(),
"core instances",
)
.into());
}
Ok(())
}
fn decrement_core_instance_count(&self) {
self.live_core_instances.fetch_sub(1, Ordering::AcqRel);
}
fn allocate_memory<'a, 'b: 'a, 'c: 'a>(
&'a self,
request: &'a mut InstanceAllocationRequest<'b, 'c>,
ty: &'a wasmtime_environ::Memory,
memory_index: Option<DefinedMemoryIndex>,
) -> Pin<Box<dyn Future<Output = Result<(MemoryAllocationIndex, Memory)>> + Send + 'a>> {
crate::runtime::box_future(async move {
async {
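                // Mirror `with_flush_and_retry` for the async path: on a
                // pool-limit error, flush pending decommits and retry once.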
let e = match self.memories.allocate(request, ty, memory_index).await {
Ok(result) => return Ok(result),
Err(e) => e,
};
if e.is::<PoolConcurrencyLimitError>() {
let queue = self.decommit_queue.lock().unwrap();
if self.flush_decommit_queue(queue) {
return self.memories.allocate(request, ty, memory_index).await;
}
}
Err(e)
}
.await
.inspect(|_| {
self.live_memories.fetch_add(1, Ordering::Relaxed);
})
})
}
unsafe fn deallocate_memory(
&self,
_memory_index: Option<DefinedMemoryIndex>,
allocation_index: MemoryAllocationIndex,
memory: Memory,
) {
let prev = self.live_memories.fetch_sub(1, Ordering::Relaxed);
debug_assert!(prev > 0);
let mut image = memory.unwrap_static_image();
let mut queue = DecommitQueue::default();
let bytes_resident = image.clear_and_remain_ready(
self.pagemap.as_ref(),
self.memories.keep_resident,
|ptr, len| {
unsafe {
queue.push_raw(ptr, len);
}
},
);
match bytes_resident {
Ok(bytes_resident) => {
unsafe {
queue.push_memory(allocation_index, image, bytes_resident);
}
self.merge_or_flush(queue);
}
Err(e) => {
log::warn!("ignoring clear_and_remain_ready error {e}");
unsafe {
self.memories.deallocate(allocation_index, None, 0);
}
}
}
}
fn allocate_table<'a, 'b: 'a, 'c: 'a>(
&'a self,
request: &'a mut InstanceAllocationRequest<'b, 'c>,
ty: &'a wasmtime_environ::Table,
_table_index: DefinedTableIndex,
) -> Pin<Box<dyn Future<Output = Result<(super::TableAllocationIndex, Table)>> + Send + 'a>>
{
crate::runtime::box_future(async move {
async {
let e = match self.tables.allocate(request, ty).await {
Ok(result) => return Ok(result),
Err(e) => e,
};
if e.is::<PoolConcurrencyLimitError>() {
let queue = self.decommit_queue.lock().unwrap();
if self.flush_decommit_queue(queue) {
return self.tables.allocate(request, ty).await;
}
}
Err(e)
}
.await
.inspect(|_| {
self.live_tables.fetch_add(1, Ordering::Relaxed);
})
})
}
unsafe fn deallocate_table(
&self,
_table_index: DefinedTableIndex,
allocation_index: TableAllocationIndex,
mut table: Table,
) {
let prev = self.live_tables.fetch_sub(1, Ordering::Relaxed);
debug_assert!(prev > 0);
let mut queue = DecommitQueue::default();
let bytes_resident = unsafe {
self.tables.reset_table_pages_to_zero(
self.pagemap.as_ref(),
allocation_index,
&mut table,
|ptr, len| {
queue.push_raw(ptr, len);
},
)
};
unsafe {
queue.push_table(allocation_index, table, bytes_resident);
}
self.merge_or_flush(queue);
}
#[cfg(feature = "async")]
fn allocate_fiber_stack(&self) -> Result<wasmtime_fiber::FiberStack> {
let ret = self.with_flush_and_retry(|| self.stacks.allocate())?;
self.live_stacks.fetch_add(1, Ordering::Relaxed);
Ok(ret)
}
#[cfg(feature = "async")]
unsafe fn deallocate_fiber_stack(&self, mut stack: wasmtime_fiber::FiberStack) {
self.live_stacks.fetch_sub(1, Ordering::Relaxed);
let mut queue = DecommitQueue::default();
let bytes_resident = unsafe {
self.stacks
.zero_stack(&mut stack, |ptr, len| queue.push_raw(ptr, len))
};
unsafe {
queue.push_stack(stack, bytes_resident);
}
self.merge_or_flush(queue);
}
fn purge_module(&self, module: CompiledModuleId) {
self.memories.purge_module(module);
}
fn next_available_pkey(&self) -> Option<ProtectionKey> {
self.memories.next_available_pkey()
}
fn restrict_to_pkey(&self, pkey: ProtectionKey) {
mpk::allow(ProtectionMask::zero().or(pkey));
}
fn allow_all_pkeys(&self) {
mpk::allow(ProtectionMask::all());
}
#[cfg(feature = "gc")]
fn allocate_gc_heap(
&self,
engine: &crate::Engine,
gc_runtime: &dyn GcRuntime,
memory_alloc_index: MemoryAllocationIndex,
memory: Memory,
) -> Result<(GcHeapAllocationIndex, Box<dyn GcHeap>)> {
let ret = self
.gc_heaps
.allocate(engine, gc_runtime, memory_alloc_index, memory)?;
self.live_gc_heaps.fetch_add(1, Ordering::Relaxed);
Ok(ret)
}
#[cfg(feature = "gc")]
fn deallocate_gc_heap(
&self,
allocation_index: GcHeapAllocationIndex,
gc_heap: Box<dyn GcHeap>,
) -> (MemoryAllocationIndex, Memory) {
self.live_gc_heaps.fetch_sub(1, Ordering::Relaxed);
self.gc_heaps.deallocate(allocation_index, gc_heap)
}
fn as_pooling(&self) -> Option<&PoolingInstanceAllocator> {
Some(self)
}
}
#[cfg(test)]
#[cfg(target_pointer_width = "64")]
mod test {
use super::*;
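    // A minimal sanity check (not part of the upstream test suite) for the
    // rounding helper used when sizing instance allocations.
    #[test]
    fn test_round_up_to_pow2() {
        assert_eq!(round_up_to_pow2(0, 4), 0);
        assert_eq!(round_up_to_pow2(1, 4), 4);
        assert_eq!(round_up_to_pow2(4, 4), 4);
        assert_eq!(round_up_to_pow2(5, 8), 8);
        assert_eq!(round_up_to_pow2((1 << 20) + 1, 4096), (1 << 20) + 4096);
    }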
#[test]
fn test_pooling_allocator_with_memory_pages_exceeded() {
let config = PoolingInstanceAllocatorConfig {
limits: InstanceLimits {
total_memories: 1,
max_memory_size: 0x100010000,
..Default::default()
},
..PoolingInstanceAllocatorConfig::default()
};
assert_eq!(
PoolingInstanceAllocator::new(
&config,
&Tunables {
memory_reservation: 0x10000,
..Tunables::default_host()
},
)
.map_err(|e| e.to_string())
.expect_err("expected a failure constructing instance allocator"),
"maximum memory size of 0x100010000 bytes exceeds the configured \
memory reservation of 0x10000 bytes"
);
}
#[cfg(all(
unix,
target_pointer_width = "64",
feature = "async",
not(miri),
not(asan)
))]
#[test]
fn test_stack_zeroed() -> Result<()> {
let config = PoolingInstanceAllocatorConfig {
max_unused_warm_slots: 0,
limits: InstanceLimits {
total_stacks: 1,
total_memories: 0,
total_tables: 0,
..Default::default()
},
stack_size: 128,
async_stack_zeroing: true,
..PoolingInstanceAllocatorConfig::default()
};
let allocator = PoolingInstanceAllocator::new(&config, &Tunables::default_host())?;
unsafe {
for _ in 0..255 {
let stack = allocator.allocate_fiber_stack()?;
let addr = stack.top().unwrap().sub(1);
assert_eq!(*addr, 0);
*addr = 1;
allocator.deallocate_fiber_stack(stack);
}
}
Ok(())
}
#[cfg(all(
unix,
target_pointer_width = "64",
feature = "async",
not(miri),
not(asan)
))]
#[test]
fn test_stack_unzeroed() -> Result<()> {
let config = PoolingInstanceAllocatorConfig {
max_unused_warm_slots: 0,
limits: InstanceLimits {
total_stacks: 1,
total_memories: 0,
total_tables: 0,
..Default::default()
},
stack_size: 128,
async_stack_zeroing: false,
..PoolingInstanceAllocatorConfig::default()
};
let allocator = PoolingInstanceAllocator::new(&config, &Tunables::default_host())?;
unsafe {
for i in 0..255 {
let stack = allocator.allocate_fiber_stack()?;
let addr = stack.top().unwrap().sub(1);
assert_eq!(*addr, i);
*addr = i + 1;
allocator.deallocate_fiber_stack(stack);
}
}
Ok(())
}
}