pub mod module_cache;
pub mod package_loader;
pub mod resolver;
pub mod task_manager;
use self::module_cache::CacheError;
pub use self::task_manager::{SpawnType, VirtualTaskManager};
use module_cache::HashedModuleData;
use wasmer_types::{CompilationProgressCallback, ModuleHash};
use std::{
borrow::Cow,
fmt,
ops::Deref,
sync::{Arc, Mutex},
};
use futures::future::BoxFuture;
use virtual_mio::block_on;
use virtual_net::{DynVirtualNetworking, VirtualNetworking};
use wasmer::{Engine, Module, RuntimeError};
use wasmer_wasix_types::wasi::ExitCode;
#[cfg(feature = "journal")]
use crate::journal::{DynJournal, DynReadableJournal};
use crate::{
SpawnError, WasiTtyState,
bin_factory::BinaryPackageCommand,
http::{DynHttpClient, HttpClient},
os::TtyBridge,
runtime::{
module_cache::{
ModuleCache, ThreadLocalCache,
progress::{ModuleLoadProgress, ModuleLoadProgressReporter},
},
package_loader::{PackageLoader, UnsupportedPackageLoader},
resolver::{BackendSource, MultiSource, Source},
},
};
/// Signature of a callback that builds extra imports for a module.
///
/// The callback receives the module and a mutable store handle and returns
/// the [`wasmer::Imports`] to add, or an error. It must be `Send + Sync` and
/// `'static`. [`ImportCallback`] stores an `Arc` of this type.
pub type MakeImportCallback = dyn Fn(&wasmer::Module, &mut wasmer::StoreMut) -> anyhow::Result<wasmer::Imports>
+ Send
+ Sync
+ 'static;
/// Signature of a callback that configures a newly created instance.
///
/// The callback receives the module, a mutable store handle, the instance and
/// an optional memory, and returns `Ok(())` or an error. It must be
/// `Send + Sync` and `'static`. [`InstanceCallback`] stores an `Arc` of this
/// type.
pub type ConfigureInstanceCallback = dyn Fn(
&wasmer::Module,
&mut wasmer::StoreMut,
&wasmer::Instance,
Option<&wasmer::Memory>,
) -> anyhow::Result<()>
+ Send
+ Sync
+ 'static;
/// Cheaply clonable, shared handle to a [`MakeImportCallback`].
#[derive(Clone)]
pub struct ImportCallback(pub Arc<MakeImportCallback>);

impl fmt::Debug for ImportCallback {
    /// Prints a fixed placeholder; the wrapped closure itself is not `Debug`.
    fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(formatter, "ImportCallback(..)")
    }
}
/// Cheaply clonable, shared handle to a [`ConfigureInstanceCallback`].
#[derive(Clone)]
pub struct InstanceCallback(pub Arc<ConfigureInstanceCallback>);

impl fmt::Debug for InstanceCallback {
    /// Prints a fixed placeholder; the wrapped closure itself is not `Debug`.
    fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(formatter, "InstanceCallback(..)")
    }
}
/// Reason value handed to [`Runtime::on_taint`].
#[derive(Clone)]
pub enum TaintReason {
/// NOTE(review): presumably reported when the module's WASI version cannot
/// be determined — confirm at the call site.
UnknownWasiVersion,
/// Carries an [`ExitCode`]; going by the variant name, a non-zero one.
NonZeroExitCode(ExitCode),
/// Carries the [`RuntimeError`] associated with the taint.
RuntimeError(RuntimeError),
/// Carries a `String`; presumably the symbol name or an error message —
/// TODO confirm against the code that constructs it.
DlSymbolResolutionFailed(String),
}
/// Input from which a module can be resolved.
///
/// Every payload is wrapped in a [`Cow`], so callers may hand over either
/// borrowed or owned data.
#[allow(clippy::large_enum_variant)]
pub enum ModuleInput<'a> {
    /// Plain bytes; [`ModuleInput::wasm`] returns them unchanged.
    Bytes(Cow<'a, [u8]>),
    /// Module data that already exposes its own hash and wasm bytes.
    Hashed(Cow<'a, HashedModuleData>),
    /// A package command that exposes a hash and an atom.
    Command(Cow<'a, BinaryPackageCommand>),
}

impl<'a> ModuleInput<'a> {
    /// Produces a `'static` copy of this input.
    ///
    /// The payload is cloned regardless of whether it is currently borrowed
    /// or owned, and the result always holds `Cow::Owned`.
    pub fn to_owned(&'a self) -> ModuleInput<'static> {
        match self {
            Self::Bytes(bytes) => ModuleInput::Bytes(Cow::Owned(bytes.to_vec())),
            Self::Hashed(hashed) => ModuleInput::Hashed(Cow::Owned((**hashed).clone())),
            Self::Command(command) => ModuleInput::Command(Cow::Owned((**command).clone())),
        }
    }

    /// Returns the module hash.
    ///
    /// For `Bytes` the hash is computed with `ModuleHash::new` on every call;
    /// the other variants return the hash they already carry.
    pub fn hash(&self) -> ModuleHash {
        match self {
            Self::Bytes(bytes) => ModuleHash::new(bytes),
            Self::Hashed(hashed) => *hashed.hash(),
            Self::Command(command) => *command.hash(),
        }
    }

    /// Borrows the wasm bytes behind this input.
    pub fn wasm(&self) -> &[u8] {
        match self {
            Self::Bytes(bytes) => bytes,
            Self::Hashed(hashed) => hashed.wasm().as_ref(),
            Self::Command(command) => command.atom_ref().as_ref(),
        }
    }

    /// Converts this input into an owned [`HashedModuleData`].
    ///
    /// `Bytes` goes through `HashedModuleData::new`, `Hashed` is cloned and
    /// `Command` goes through `HashedModuleData::from_command`.
    pub fn to_hashed(&self) -> HashedModuleData {
        match self {
            Self::Bytes(bytes) => HashedModuleData::new(bytes.as_ref()),
            Self::Hashed(hashed) => hashed.as_ref().clone(),
            Self::Command(command) => HashedModuleData::from_command(command),
        }
    }
}
/// Host-provided services used to load and run modules.
///
/// Only [`Runtime::networking`], [`Runtime::task_manager`] and
/// [`Runtime::source`] are required; every other method ships with a default
/// implementation that implementors may override.
#[allow(unused_variables)]
pub trait Runtime
where
    Self: fmt::Debug,
{
    /// Returns the networking implementation.
    fn networking(&self) -> &DynVirtualNetworking;

    /// Returns the task manager.
    fn task_manager(&self) -> &Arc<dyn VirtualTaskManager>;

    /// Returns the package loader.
    ///
    /// The default creates a new [`UnsupportedPackageLoader`] on every call.
    fn package_loader(&self) -> Arc<dyn PackageLoader + Send + Sync> {
        Arc::new(UnsupportedPackageLoader)
    }

    /// Returns the module cache used by [`Runtime::resolve_module`].
    ///
    /// The default creates a new `ThreadLocalCache::default()` on every call.
    fn module_cache(&self) -> Arc<dyn ModuleCache + Send + Sync> {
        Arc::new(ThreadLocalCache::default())
    }

    /// Returns the package [`Source`].
    fn source(&self) -> Arc<dyn Source + Send + Sync>;

    /// Returns the engine used when a caller does not supply one.
    ///
    /// The default is `Engine::default()`.
    fn engine(&self) -> Engine {
        Engine::default()
    }

    /// Creates a new store.
    ///
    /// With the `sys` feature the store is built from [`Runtime::engine`];
    /// otherwise `wasmer::Store::default()` is returned.
    fn new_store(&self) -> wasmer::Store {
        cfg_if::cfg_if! {
            if #[cfg(feature = "sys")] {
                wasmer::Store::new(self.engine())
            } else {
                wasmer::Store::default()
            }
        }
    }

    /// Returns extra imports for `module`.
    ///
    /// The default returns an empty `wasmer::Imports`.
    fn additional_imports(
        &self,
        _module: &wasmer::Module,
        _store: &mut wasmer::StoreMut,
    ) -> anyhow::Result<wasmer::Imports> {
        Ok(wasmer::Imports::new())
    }

    /// Hook for configuring a new instance.
    ///
    /// The default does nothing and returns `Ok(())`.
    fn configure_new_instance(
        &self,
        _module: &wasmer::Module,
        _store: &mut wasmer::StoreMut,
        _instance: &wasmer::Instance,
        _imported_memory: Option<&wasmer::Memory>,
    ) -> anyhow::Result<()> {
        Ok(())
    }

    /// Returns the HTTP client, if any. The default is `None`.
    fn http_client(&self) -> Option<&DynHttpClient> {
        None
    }

    /// Returns the TTY bridge, if any. The default is `None`.
    fn tty(&self) -> Option<&(dyn TtyBridge + Send + Sync)> {
        None
    }

    /// Resolves `input` into a [`Module`].
    ///
    /// Uses `engine` when one is supplied and [`Runtime::engine`] otherwise,
    /// then hands the work to the free function `load_module` together with
    /// this runtime's module cache and the optional progress reporter.
    fn resolve_module<'a>(
        &'a self,
        input: ModuleInput<'a>,
        engine: Option<&Engine>,
        on_progress: Option<ModuleLoadProgressReporter>,
    ) -> BoxFuture<'a, Result<Module, SpawnError>> {
        // Every kind of input falls back to the runtime's own engine, so no
        // inspection of `input` is needed here. The input is passed on as-is;
        // `load_module` derives the hash it needs itself.
        let engine = engine.cloned().unwrap_or_else(|| self.engine());
        let module_cache = self.module_cache();

        let task = async move { load_module(&engine, &module_cache, input, on_progress).await };
        Box::pin(task)
    }

    /// Blocking variant of [`Runtime::resolve_module`]; drives the future
    /// with `block_on`.
    fn resolve_module_sync(
        &self,
        input: ModuleInput<'_>,
        engine: Option<&Engine>,
        on_progress: Option<ModuleLoadProgressReporter>,
    ) -> Result<Module, SpawnError> {
        block_on(self.resolve_module(input, engine, on_progress))
    }

    /// Resolves the module of a package command.
    ///
    /// `cmd` is cloned so the returned future borrows only `self`.
    #[deprecated(since = "0.601.0", note = "Use `resolve_module` instead")]
    fn load_command_module(
        &self,
        cmd: &BinaryPackageCommand,
    ) -> BoxFuture<'_, Result<Module, SpawnError>> {
        self.resolve_module(ModuleInput::Command(Cow::Owned(cmd.clone())), None, None)
    }

    /// Blocking variant of `load_command_module`; borrows `cmd` for the
    /// duration of the call.
    #[deprecated(since = "0.601.0", note = "Use `resolve_module_sync` instead")]
    fn load_command_module_sync(&self, cmd: &BinaryPackageCommand) -> Result<Module, SpawnError> {
        block_on(self.resolve_module(ModuleInput::Command(Cow::Borrowed(cmd)), None, None))
    }

    /// Resolves a module from borrowed bytes.
    #[deprecated(since = "0.601.0", note = "Use `resolve_module` instead")]
    fn load_module<'a>(&'a self, wasm: &'a [u8]) -> BoxFuture<'a, Result<Module, SpawnError>> {
        self.resolve_module(ModuleInput::Bytes(Cow::Borrowed(wasm)), None, None)
    }

    /// Blocking variant of `load_module`.
    // The note points at the non-deprecated synchronous replacements.
    #[deprecated(
        since = "0.601.0",
        note = "Use `resolve_module_sync` or `load_hashed_module_sync` instead - this method can have high overhead"
    )]
    fn load_module_sync(&self, wasm: &[u8]) -> Result<Module, SpawnError> {
        block_on(self.resolve_module(ModuleInput::Bytes(Cow::Borrowed(wasm)), None, None))
    }

    /// Resolves a module from owned [`HashedModuleData`], optionally with an
    /// explicit engine.
    fn load_hashed_module(
        &self,
        module: HashedModuleData,
        engine: Option<&Engine>,
    ) -> BoxFuture<'_, Result<Module, SpawnError>> {
        self.resolve_module(ModuleInput::Hashed(Cow::Owned(module)), engine, None)
    }

    /// Blocking variant of [`Runtime::load_hashed_module`].
    fn load_hashed_module_sync(
        &self,
        wasm: HashedModuleData,
        engine: Option<&Engine>,
    ) -> Result<Module, SpawnError> {
        block_on(self.resolve_module(ModuleInput::Hashed(Cow::Owned(wasm)), engine, None))
    }

    /// Notification hook that receives a [`TaintReason`].
    ///
    /// The default does nothing.
    fn on_taint(&self, _reason: TaintReason) {}

    /// Iterates over the read-only journals. The default yields nothing.
    #[cfg(feature = "journal")]
    fn read_only_journals<'a>(&'a self) -> Box<dyn Iterator<Item = Arc<DynReadableJournal>> + 'a> {
        Box::new(std::iter::empty())
    }

    /// Iterates over the writable journals. The default yields nothing.
    #[cfg(feature = "journal")]
    fn writable_journals<'a>(&'a self) -> Box<dyn Iterator<Item = Arc<DynJournal>> + 'a> {
        Box::new(std::iter::empty())
    }

    /// Returns the active journal, if any. The default is `None`.
    #[cfg(feature = "journal")]
    fn active_journal(&self) -> Option<&'_ DynJournal> {
        None
    }
}
/// Trait-object form of [`Runtime`] that is also `Send + Sync`.
pub type DynRuntime = dyn Runtime + Send + Sync;
/// Loads the module for `input`, consulting `module_cache` first.
///
/// Steps, in order:
/// 1. Look up the input's hash in the cache (with progress reporting when
///    `on_progress` is present) and return immediately on a hit.
/// 2. A `CacheError::NotFound` is ignored silently; any other cache error is
///    logged as a warning. Either way the module is then compiled.
/// 3. Compile the wasm bytes with `engine`. When a reporter is present, the
///    `sys` feature is enabled and the engine is a sys engine, compilation
///    progress is forwarded as `ModuleLoadProgress::CompilingModule`.
/// 4. Store the compiled module in the cache; a failure to save is only
///    logged.
///
/// # Errors
///
/// Returns `SpawnError::CompileError` carrying the module hash when
/// compilation fails.
#[tracing::instrument(level = "debug", skip_all)]
pub async fn load_module(
    engine: &Engine,
    module_cache: &(dyn ModuleCache + Send + Sync),
    input: ModuleInput<'_>,
    on_progress: Option<ModuleLoadProgressReporter>,
) -> Result<Module, crate::SpawnError> {
    let wasm_hash = input.hash();

    let cached = match on_progress.as_ref() {
        Some(reporter) => {
            module_cache
                .load_with_progress(wasm_hash, engine, reporter.clone())
                .await
        }
        None => module_cache.load(wasm_hash, engine).await,
    };

    match cached {
        Ok(module) => return Ok(module),
        // A plain miss is expected and not worth logging.
        Err(CacheError::NotFound) => {}
        Err(cache_error) => {
            tracing::warn!(
                %wasm_hash,
                error=&cache_error as &dyn std::error::Error,
                "Unable to load the cached module",
            );
        }
    }

    let compiled = match on_progress {
        Some(reporter) => {
            // Only consumed when the `sys` feature is enabled.
            #[allow(unused_variables)]
            let callback = CompilationProgressCallback::new(move |progress| {
                reporter.notify(ModuleLoadProgress::CompilingModule(progress))
            });

            #[cfg(feature = "sys")]
            {
                if engine.is_sys() {
                    use wasmer::sys::NativeEngineExt;
                    engine.new_module_with_progress(input.wasm(), callback)
                } else {
                    Module::new(&engine, input.wasm())
                }
            }
            #[cfg(not(feature = "sys"))]
            {
                Module::new(&engine, input.wasm())
            }
        }
        None => Module::new(&engine, input.wasm()),
    };

    let module = match compiled {
        Ok(module) => module,
        Err(error) => {
            return Err(crate::SpawnError::CompileError {
                module_hash: wasm_hash,
                error,
            });
        }
    };

    // Caching is best-effort: a failed save must not fail the load.
    if let Err(save_error) = module_cache.save(wasm_hash, engine, &module).await {
        tracing::warn!(
            %wasm_hash,
            error=&save_error as &dyn std::error::Error,
            "Unable to cache the compiled module",
        );
    }

    Ok(module)
}
/// [`TtyBridge`] implementation that keeps a [`WasiTtyState`] behind a mutex.
#[derive(Debug, Default)]
pub struct DefaultTty {
    state: Mutex<WasiTtyState>,
}

impl TtyBridge for DefaultTty {
    /// Switches off `echo`, `line_buffered` and `line_feeds` in the stored
    /// state; other fields are left untouched.
    ///
    /// Panics if the mutex is poisoned.
    fn reset(&self) {
        let mut guard = self.state.lock().unwrap();
        let tty = &mut *guard;
        tty.echo = false;
        tty.line_buffered = false;
        tty.line_feeds = false;
    }

    /// Returns a clone of the stored state.
    ///
    /// Panics if the mutex is poisoned.
    fn tty_get(&self) -> WasiTtyState {
        self.state.lock().unwrap().clone()
    }

    /// Replaces the stored state with `tty_state`.
    ///
    /// Panics if the mutex is poisoned.
    fn tty_set(&self, tty_state: WasiTtyState) {
        *self.state.lock().unwrap() = tty_state;
    }
}
/// [`Runtime`] implementation assembled from individually replaceable parts.
///
/// Each field backs the `Runtime` accessor of the corresponding name; the
/// `set_*`, `add_*` and `with_*` methods on this type replace or extend them.
#[derive(Debug, Clone)]
pub struct PluggableRuntime {
/// Task manager returned by `Runtime::task_manager`.
pub rt: Arc<dyn VirtualTaskManager>,
/// Networking implementation returned by `Runtime::networking`.
pub networking: DynVirtualNetworking,
/// Optional HTTP client returned by `Runtime::http_client`.
pub http_client: Option<DynHttpClient>,
/// Package loader returned by `Runtime::package_loader`.
pub package_loader: Arc<dyn PackageLoader + Send + Sync>,
/// Package source returned by `Runtime::source`.
pub source: Arc<dyn Source + Send + Sync>,
/// Engine returned by `Runtime::engine` and used by `Runtime::new_store`.
pub engine: Engine,
/// Module cache returned by `Runtime::module_cache`.
pub module_cache: Arc<dyn ModuleCache + Send + Sync>,
/// Optional TTY bridge returned by `Runtime::tty`.
pub tty: Option<Arc<dyn TtyBridge + Send + Sync>>,
/// Journals yielded by `Runtime::read_only_journals`.
#[cfg(feature = "journal")]
pub read_only_journals: Vec<Arc<DynReadableJournal>>,
/// Journals yielded by `Runtime::writable_journals`; the last entry is the
/// one returned by `Runtime::active_journal`.
#[cfg(feature = "journal")]
pub writable_journals: Vec<Arc<DynJournal>>,
/// Callbacks run, in order, by `Runtime::additional_imports`.
pub additional_imports: Vec<ImportCallback>,
/// Callbacks run, in order, by `Runtime::configure_new_instance`.
pub instance_callbacks: Vec<InstanceCallback>,
}
impl PluggableRuntime {
pub fn new(rt: Arc<dyn VirtualTaskManager>) -> Self {
cfg_if::cfg_if! {
if #[cfg(feature = "host-vnet")] {
let networking = Arc::new(virtual_net::host::LocalNetworking::default());
} else {
let networking = Arc::new(virtual_net::UnsupportedVirtualNetworking::default());
}
}
let http_client =
crate::http::default_http_client().map(|client| Arc::new(client) as DynHttpClient);
let loader = UnsupportedPackageLoader;
let mut source = MultiSource::default();
if let Some(client) = &http_client {
source.add_source(BackendSource::new(
BackendSource::WASMER_PROD_ENDPOINT.parse().unwrap(),
client.clone(),
));
}
Self {
rt,
networking,
http_client,
engine: Default::default(),
tty: None,
source: Arc::new(source),
package_loader: Arc::new(loader),
module_cache: Arc::new(module_cache::in_memory()),
#[cfg(feature = "journal")]
read_only_journals: Vec::new(),
#[cfg(feature = "journal")]
writable_journals: Vec::new(),
additional_imports: Vec::new(),
instance_callbacks: Vec::new(),
}
}
pub fn set_networking_implementation<I>(&mut self, net: I) -> &mut Self
where
I: VirtualNetworking + Sync,
{
self.networking = Arc::new(net);
self
}
pub fn set_engine(&mut self, engine: Engine) -> &mut Self {
self.engine = engine;
self
}
pub fn set_tty(&mut self, tty: Arc<dyn TtyBridge + Send + Sync>) -> &mut Self {
self.tty = Some(tty);
self
}
pub fn set_module_cache(
&mut self,
module_cache: impl ModuleCache + Send + Sync + 'static,
) -> &mut Self {
self.module_cache = Arc::new(module_cache);
self
}
pub fn set_source(&mut self, source: impl Source + Send + 'static) -> &mut Self {
self.source = Arc::new(source);
self
}
pub fn set_package_loader(
&mut self,
package_loader: impl PackageLoader + 'static,
) -> &mut Self {
self.package_loader = Arc::new(package_loader);
self
}
pub fn set_http_client(
&mut self,
client: impl HttpClient + Send + Sync + 'static,
) -> &mut Self {
self.http_client = Some(Arc::new(client));
self
}
#[cfg(feature = "journal")]
pub fn add_read_only_journal(&mut self, journal: Arc<DynReadableJournal>) -> &mut Self {
self.read_only_journals.push(journal);
self
}
#[cfg(feature = "journal")]
pub fn add_writable_journal(&mut self, journal: Arc<DynJournal>) -> &mut Self {
self.writable_journals.push(journal);
self
}
pub fn with_additional_imports(
&mut self,
imports: impl Fn(&wasmer::Module, &mut wasmer::StoreMut) -> anyhow::Result<wasmer::Imports>
+ Send
+ Sync
+ 'static,
) -> &mut Self {
self.additional_imports
.push(ImportCallback(Arc::new(imports)));
self
}
pub fn with_instance_setup(
&mut self,
callback: impl Fn(
&wasmer::Module,
&mut wasmer::StoreMut,
&wasmer::Instance,
Option<&wasmer::Memory>,
) -> anyhow::Result<()>
+ Send
+ Sync
+ 'static,
) -> &mut Self {
self.instance_callbacks
.push(InstanceCallback(Arc::new(callback)));
self
}
}
impl Runtime for PluggableRuntime {
fn networking(&self) -> &DynVirtualNetworking {
&self.networking
}
fn http_client(&self) -> Option<&DynHttpClient> {
self.http_client.as_ref()
}
fn package_loader(&self) -> Arc<dyn PackageLoader + Send + Sync> {
Arc::clone(&self.package_loader)
}
fn source(&self) -> Arc<dyn Source + Send + Sync> {
Arc::clone(&self.source)
}
fn engine(&self) -> Engine {
self.engine.clone()
}
fn new_store(&self) -> wasmer::Store {
wasmer::Store::new(self.engine.clone())
}
fn task_manager(&self) -> &Arc<dyn VirtualTaskManager> {
&self.rt
}
fn tty(&self) -> Option<&(dyn TtyBridge + Send + Sync)> {
self.tty.as_deref()
}
fn module_cache(&self) -> Arc<dyn ModuleCache + Send + Sync> {
self.module_cache.clone()
}
fn additional_imports(
&self,
module: &wasmer::Module,
store: &mut wasmer::StoreMut,
) -> anyhow::Result<wasmer::Imports> {
let mut imports = wasmer::Imports::new();
for cb in &self.additional_imports {
imports.extend(&(*(cb.0))(module, store)?);
}
Ok(imports)
}
fn configure_new_instance(
&self,
module: &wasmer::Module,
store: &mut wasmer::StoreMut,
instance: &wasmer::Instance,
imported_memory: Option<&wasmer::Memory>,
) -> anyhow::Result<()> {
for cb in &self.instance_callbacks {
(*(cb.0))(module, store, instance, imported_memory)?;
}
Ok(())
}
#[cfg(feature = "journal")]
fn read_only_journals<'a>(&'a self) -> Box<dyn Iterator<Item = Arc<DynReadableJournal>> + 'a> {
Box::new(self.read_only_journals.iter().cloned())
}
#[cfg(feature = "journal")]
fn writable_journals<'a>(&'a self) -> Box<dyn Iterator<Item = Arc<DynJournal>> + 'a> {
Box::new(self.writable_journals.iter().cloned())
}
#[cfg(feature = "journal")]
fn active_journal(&self) -> Option<&DynJournal> {
self.writable_journals.iter().last().map(|a| a.as_ref())
}
}
/// [`Runtime`] wrapper that replaces selected parts of an inner runtime.
///
/// Each `Option` field holds an override; when it is `None` the matching
/// `Runtime` accessor falls through to `inner`.
#[derive(Clone, Debug)]
pub struct OverriddenRuntime {
/// Runtime consulted for everything that has not been overridden.
inner: Arc<DynRuntime>,
/// Override for `Runtime::task_manager`.
task_manager: Option<Arc<dyn VirtualTaskManager>>,
/// Override for `Runtime::networking`.
networking: Option<DynVirtualNetworking>,
/// Override for `Runtime::http_client`.
http_client: Option<DynHttpClient>,
/// Override for `Runtime::package_loader`.
package_loader: Option<Arc<dyn PackageLoader + Send + Sync>>,
/// Override for `Runtime::source`.
source: Option<Arc<dyn Source + Send + Sync>>,
/// Override for `Runtime::engine`; also used by `Runtime::new_store`.
engine: Option<Engine>,
/// Override for `Runtime::module_cache`.
module_cache: Option<Arc<dyn ModuleCache + Send + Sync>>,
/// Override for `Runtime::tty`.
tty: Option<Arc<dyn TtyBridge + Send + Sync>>,
/// Import callbacks run after the inner runtime's additional imports.
additional_imports: Vec<ImportCallback>,
/// Instance callbacks run after the inner runtime's instance configuration.
instance_callbacks: Vec<InstanceCallback>,
/// Override for `Runtime::read_only_journals`.
#[cfg(feature = "journal")]
pub read_only_journals: Option<Vec<Arc<DynReadableJournal>>>,
/// Override for `Runtime::writable_journals` and `Runtime::active_journal`.
#[cfg(feature = "journal")]
pub writable_journals: Option<Vec<Arc<DynJournal>>>,
}
impl OverriddenRuntime {
pub fn new(inner: Arc<DynRuntime>) -> Self {
Self {
inner,
task_manager: None,
networking: None,
http_client: None,
package_loader: None,
source: None,
engine: None,
module_cache: None,
tty: None,
additional_imports: Vec::new(),
instance_callbacks: Vec::new(),
#[cfg(feature = "journal")]
read_only_journals: None,
#[cfg(feature = "journal")]
writable_journals: None,
}
}
pub fn with_task_manager(mut self, task_manager: Arc<dyn VirtualTaskManager>) -> Self {
self.task_manager.replace(task_manager);
self
}
pub fn with_networking(mut self, networking: DynVirtualNetworking) -> Self {
self.networking.replace(networking);
self
}
pub fn with_http_client(mut self, http_client: DynHttpClient) -> Self {
self.http_client.replace(http_client);
self
}
pub fn with_package_loader(
mut self,
package_loader: Arc<dyn PackageLoader + Send + Sync>,
) -> Self {
self.package_loader.replace(package_loader);
self
}
pub fn with_source(mut self, source: Arc<dyn Source + Send + Sync>) -> Self {
self.source.replace(source);
self
}
pub fn with_engine(mut self, engine: Engine) -> Self {
self.engine.replace(engine);
self
}
pub fn with_module_cache(mut self, module_cache: Arc<dyn ModuleCache + Send + Sync>) -> Self {
self.module_cache.replace(module_cache);
self
}
pub fn with_tty(mut self, tty: Arc<dyn TtyBridge + Send + Sync>) -> Self {
self.tty.replace(tty);
self
}
pub fn with_additional_imports(
mut self,
imports: impl Fn(&wasmer::Module, &mut wasmer::StoreMut) -> anyhow::Result<wasmer::Imports>
+ Send
+ Sync
+ 'static,
) -> Self {
self.additional_imports
.push(ImportCallback(Arc::new(imports)));
self
}
pub fn with_instance_setup(
mut self,
callback: impl Fn(
&wasmer::Module,
&mut wasmer::StoreMut,
&wasmer::Instance,
Option<&wasmer::Memory>,
) -> anyhow::Result<()>
+ Send
+ Sync
+ 'static,
) -> Self {
self.instance_callbacks
.push(InstanceCallback(Arc::new(callback)));
self
}
#[cfg(feature = "journal")]
pub fn with_read_only_journals(mut self, journals: Vec<Arc<DynReadableJournal>>) -> Self {
self.read_only_journals.replace(journals);
self
}
#[cfg(feature = "journal")]
pub fn with_writable_journals(mut self, journals: Vec<Arc<DynJournal>>) -> Self {
self.writable_journals.replace(journals);
self
}
}
impl Runtime for OverriddenRuntime {
fn networking(&self) -> &DynVirtualNetworking {
if let Some(net) = self.networking.as_ref() {
net
} else {
self.inner.networking()
}
}
fn task_manager(&self) -> &Arc<dyn VirtualTaskManager> {
if let Some(rt) = self.task_manager.as_ref() {
rt
} else {
self.inner.task_manager()
}
}
fn source(&self) -> Arc<dyn Source + Send + Sync> {
if let Some(source) = self.source.clone() {
source
} else {
self.inner.source()
}
}
fn package_loader(&self) -> Arc<dyn PackageLoader + Send + Sync> {
if let Some(loader) = self.package_loader.clone() {
loader
} else {
self.inner.package_loader()
}
}
fn module_cache(&self) -> Arc<dyn ModuleCache + Send + Sync> {
if let Some(cache) = self.module_cache.clone() {
cache
} else {
self.inner.module_cache()
}
}
fn engine(&self) -> Engine {
if let Some(engine) = self.engine.clone() {
engine
} else {
self.inner.engine()
}
}
fn new_store(&self) -> wasmer::Store {
if let Some(engine) = self.engine.clone() {
wasmer::Store::new(engine)
} else {
self.inner.new_store()
}
}
fn additional_imports(
&self,
module: &wasmer::Module,
store: &mut wasmer::StoreMut,
) -> anyhow::Result<wasmer::Imports> {
let mut imports = self.inner.additional_imports(module, store)?;
for cb in &self.additional_imports {
imports.extend(&(*(cb.0))(module, store)?);
}
Ok(imports)
}
fn configure_new_instance(
&self,
module: &wasmer::Module,
store: &mut wasmer::StoreMut,
instance: &wasmer::Instance,
imported_memory: Option<&wasmer::Memory>,
) -> anyhow::Result<()> {
self.inner
.configure_new_instance(module, store, instance, imported_memory)?;
for cb in &self.instance_callbacks {
(*(cb.0))(module, store, instance, imported_memory)?;
}
Ok(())
}
fn http_client(&self) -> Option<&DynHttpClient> {
if let Some(client) = self.http_client.as_ref() {
Some(client)
} else {
self.inner.http_client()
}
}
fn tty(&self) -> Option<&(dyn TtyBridge + Send + Sync)> {
if let Some(tty) = self.tty.as_ref() {
Some(tty.deref())
} else {
self.inner.tty()
}
}
#[cfg(feature = "journal")]
fn read_only_journals<'a>(&'a self) -> Box<dyn Iterator<Item = Arc<DynReadableJournal>> + 'a> {
if let Some(journals) = self.read_only_journals.as_ref() {
Box::new(journals.iter().cloned())
} else {
self.inner.read_only_journals()
}
}
#[cfg(feature = "journal")]
fn writable_journals<'a>(&'a self) -> Box<dyn Iterator<Item = Arc<DynJournal>> + 'a> {
if let Some(journals) = self.writable_journals.as_ref() {
Box::new(journals.iter().cloned())
} else {
self.inner.writable_journals()
}
}
#[cfg(feature = "journal")]
fn active_journal(&self) -> Option<&'_ DynJournal> {
if let Some(journals) = self.writable_journals.as_ref() {
journals.iter().last().map(|a| a.as_ref())
} else {
self.inner.active_journal()
}
}
}