wasmtime_provider/
provider_async.rs

1use std::sync::Arc;
2
3use log::{error, info};
4
5use async_trait::async_trait;
6use parking_lot::RwLock;
7use tracing::trace;
8#[cfg(feature = "wasi")]
9use wapc::WasiParams;
10use wapc::{wapc_functions, ModuleStateAsync, WebAssemblyEngineProviderAsync};
11use wasmtime::{AsContextMut, Engine, Instance, InstancePre, Linker, Module, Store, TypedFunc};
12
13use crate::callbacks_async;
14use crate::errors::{Error, Result};
15use crate::store_async::WapcStoreAsync;
16use crate::EpochDeadlines;
17
/// Runtime state that exists only after the provider has been initialized.
///
/// It is created by `init` and its `instance` / `guest_call_fn` members are
/// refreshed by `replace`.
struct EngineInner {
  /// The instantiated module, shared behind a read/write lock.
  instance: Arc<RwLock<Instance>>,
  /// Typed handle to the guest export named by `wapc_functions::GUEST_CALL`.
  guest_call_fn: TypedFunc<(i32, i32), i32>,
  /// Host-side waPC state; `call` records guest errors on it.
  host: Arc<ModuleStateAsync>,
}
23
/// A pre initialized [`WasmtimeEngineProviderAsync`]
///
/// Can be used to quickly create a new instance of [`WasmtimeEngineProviderAsync`]
///
/// Refer to [`WasmtimeEngineProviderBuilder::build_async_pre`](crate::WasmtimeEngineProviderBuilder::build_async_pre) to create an instance of this struct.
#[allow(missing_debug_implementations)]
#[derive(Clone)]
pub struct WasmtimeEngineProviderAsyncPre {
  /// The WebAssembly module `instance_pre` was built from.
  module: Module,
  /// WASI parameters used to build the store state of every rehydrated provider.
  #[cfg(feature = "wasi")]
  wasi_params: WasiParams,
  /// Engine the module, the linker and every rehydrated store belong to.
  engine: Engine,
  /// Linker with the waPC host functions (and WASI, when enabled) registered.
  linker: Linker<WapcStoreAsync>,
  /// Result of `linker.instantiate_pre(&module)`, cloned into every rehydrated provider.
  instance_pre: InstancePre<WapcStoreAsync>,
  /// Optional epoch deadlines handed over to every rehydrated provider.
  epoch_deadlines: Option<EpochDeadlines>,
}
40
41impl WasmtimeEngineProviderAsyncPre {
42  #[cfg(feature = "wasi")]
43  pub(crate) fn new(
44    engine: Engine,
45    module: Module,
46    wasi: Option<WasiParams>,
47    epoch_deadlines: Option<EpochDeadlines>,
48  ) -> Result<Self> {
49    let mut linker: Linker<WapcStoreAsync> = Linker::new(&engine);
50
51    let wasi_params = wasi.unwrap_or_default();
52    wasi_common::tokio::add_to_linker(&mut linker, |s: &mut WapcStoreAsync| &mut s.wasi_ctx).unwrap();
53
54    // register all the waPC host functions
55    callbacks_async::add_to_linker(&mut linker)?;
56
57    let instance_pre = linker.instantiate_pre(&module)?;
58
59    Ok(Self {
60      module,
61      wasi_params,
62      engine,
63      linker,
64      instance_pre,
65      epoch_deadlines,
66    })
67  }
68
69  #[cfg(not(feature = "wasi"))]
70  pub(crate) fn new(engine: Engine, module: Module, epoch_deadlines: Option<EpochDeadlines>) -> Result<Self> {
71    let mut linker: Linker<WapcStoreAsync> = Linker::new(&engine);
72
73    // register all the waPC host functions
74    callbacks_async::add_to_linker(&mut linker)?;
75
76    let instance_pre = linker.instantiate_pre(&module)?;
77
78    Ok(Self {
79      module,
80      engine,
81      linker,
82      instance_pre,
83      epoch_deadlines,
84    })
85  }
86
87  /// Create an instance of [`WasmtimeEngineProviderAsync`] ready to be consumed
88  ///
89  /// Note: from micro-benchmarking, this method is 10 microseconds faster than
90  /// `WasmtimeEngineProviderAsync::clone`.
91  pub fn rehydrate(&self) -> Result<WasmtimeEngineProviderAsync> {
92    let engine = self.engine.clone();
93
94    #[cfg(feature = "wasi")]
95    let wapc_store = WapcStoreAsync::new(&self.wasi_params, None)?;
96    #[cfg(not(feature = "wasi"))]
97    let wapc_store = WapcStoreAsync::new(None);
98
99    let store = Store::new(&engine, wapc_store);
100
101    Ok(WasmtimeEngineProviderAsync {
102      module: self.module.clone(),
103      inner: None,
104      engine,
105      epoch_deadlines: self.epoch_deadlines,
106      linker: self.linker.clone(),
107      instance_pre: self.instance_pre.clone(),
108      store,
109      #[cfg(feature = "wasi")]
110      wasi_params: self.wasi_params.clone(),
111    })
112  }
113}
114
/// A waPC engine provider that encapsulates the Wasmtime WebAssembly runtime.
/// This can be used inside of async contexts.
///
/// Refer to
/// [`WasmtimeEngineProviderBuilder::build_async`](crate::WasmtimeEngineProviderBuilder::build_async) to create an instance of this struct.
///
/// ## Example
///
/// ```rust
/// use wasmtime_provider::WasmtimeEngineProviderBuilder;
/// use wapc::WapcHostAsync;
/// use std::error::Error;
///
/// // Sample host callback that prints the operation a WASM module requested.
/// async fn host_callback(
///   id: u64,
///   bd: String,
///   ns: String,
///   op: String,
///   payload: Vec<u8>,
/// ) -> Result<Vec<u8>, Box<dyn std::error::Error + Send + Sync>> {
///   println!(
///     "Guest {} invoked '{}->{}:{}' on the host with a payload of '{}'",
///     id,
///     bd,
///     ns,
///     op,
///     ::std::str::from_utf8(&payload).unwrap()
///   );
///   Ok(vec![])
/// }
///
/// #[tokio::main]
/// pub async fn main() -> Result<(), Box<dyn Error>> {
///   let callback: Box<wapc::HostCallbackAsync> = Box::new(move |id, bd, ns, op, payload| {
///     let fut = host_callback(id, bd, ns, op, payload);
///     Box::pin(fut)
///   });
///
///   let file = "../../wasm/crates/wasm-basic/build/wasm_basic.wasm";
///   let module_bytes = std::fs::read(file)?;
///
///   let engine = WasmtimeEngineProviderBuilder::new()
///     .module_bytes(&module_bytes)
///     .build_async()?;
///   let host = WapcHostAsync::new(Box::new(engine), Some(callback)).await?;
///
///   let res = host.call("ping", b"payload bytes").await?;
///   assert_eq!(res, b"payload bytes");
///
///   Ok(())
/// }
/// ```
#[allow(missing_debug_implementations)]
pub struct WasmtimeEngineProviderAsync {
  /// The WebAssembly module currently loaded; swapped by `replace`.
  module: Module,
  /// WASI parameters used whenever a new store state is created.
  #[cfg(feature = "wasi")]
  wasi_params: WasiParams,
  /// Instance, guest-call handle and host state; `None` until `init` has run.
  inner: Option<EngineInner>,
  /// Engine the module, the linker and the store belong to.
  engine: Engine,
  /// Linker used by `replace` to pre-instantiate a new module.
  linker: Linker<WapcStoreAsync>,
  /// Store the module instance lives in; rebuilt by `init`.
  store: Store<WapcStoreAsync>,
  /// Pre-instantiated form of `module`, used to create the instance.
  instance_pre: InstancePre<WapcStoreAsync>,
  /// Optional epoch deadlines applied to the store before guest code is invoked.
  epoch_deadlines: Option<EpochDeadlines>,
}
180
181impl Clone for WasmtimeEngineProviderAsync {
182  fn clone(&self) -> Self {
183    let engine = self.engine.clone();
184
185    #[cfg(feature = "wasi")]
186    let wapc_store = WapcStoreAsync::new(&self.wasi_params, None).unwrap();
187    #[cfg(not(feature = "wasi"))]
188    let wapc_store = WapcStoreAsync::new(None);
189
190    let store = Store::new(&engine, wapc_store);
191
192    match &self.inner {
193      Some(state) => {
194        let mut new = Self {
195          module: self.module.clone(),
196          inner: None,
197          engine,
198          epoch_deadlines: self.epoch_deadlines,
199          linker: self.linker.clone(),
200          instance_pre: self.instance_pre.clone(),
201          store,
202          #[cfg(feature = "wasi")]
203          wasi_params: self.wasi_params.clone(),
204        };
205
206        tokio::runtime::Handle::current().block_on(async {
207          new.init(state.host.clone()).await.unwrap();
208        });
209
210        new
211      }
212      None => Self {
213        module: self.module.clone(),
214        inner: None,
215        engine,
216        epoch_deadlines: self.epoch_deadlines,
217        linker: self.linker.clone(),
218        instance_pre: self.instance_pre.clone(),
219        store,
220        #[cfg(feature = "wasi")]
221        wasi_params: self.wasi_params.clone(),
222      },
223    }
224  }
225}
226
227#[async_trait]
228impl WebAssemblyEngineProviderAsync for WasmtimeEngineProviderAsync {
229  async fn init(
230    &mut self,
231    host: Arc<ModuleStateAsync>,
232  ) -> std::result::Result<(), Box<dyn std::error::Error + Send + Sync>> {
233    // create the proper store, now we have a value for `host`
234    #[cfg(feature = "wasi")]
235    let wapc_store = WapcStoreAsync::new(&self.wasi_params, Some(host.clone()))?;
236    #[cfg(not(feature = "wasi"))]
237    let wapc_store = WapcStoreAsync::new(Some(host.clone()));
238
239    self.store = Store::new(&self.engine, wapc_store);
240
241    let instance = self.instance_pre.instantiate_async(&mut self.store).await?;
242
243    let instance_ref = Arc::new(RwLock::new(instance));
244    let gc = guest_call_fn(&mut self.store, &instance_ref)?;
245    self.inner = Some(EngineInner {
246      instance: instance_ref,
247      guest_call_fn: gc,
248      host,
249    });
250    self.initialize().await?;
251    Ok(())
252  }
253
254  async fn call(
255    &mut self,
256    op_length: i32,
257    msg_length: i32,
258  ) -> std::result::Result<i32, Box<dyn std::error::Error + Send + Sync>> {
259    if let Some(deadlines) = &self.epoch_deadlines {
260      // the deadline counter must be set before invoking the wasm function
261      self.store.set_epoch_deadline(deadlines.wapc_func);
262    }
263
264    let engine_inner = self.inner.as_ref().unwrap();
265    let call = engine_inner
266      .guest_call_fn
267      .call_async(&mut self.store, (op_length, msg_length))
268      .await;
269
270    match call {
271      Ok(result) => Ok(result),
272      Err(err) => {
273        error!("Failure invoking guest module handler: {:?}", err);
274        let mut guest_error = err.to_string();
275        if let Some(trap) = err.downcast_ref::<wasmtime::Trap>() {
276          if matches!(trap, wasmtime::Trap::Interrupt) {
277            "guest code interrupted, execution deadline exceeded".clone_into(&mut guest_error);
278          }
279        }
280        engine_inner.host.set_guest_error(guest_error).await;
281        Ok(0)
282      }
283    }
284  }
285
286  async fn replace(&mut self, module: &[u8]) -> std::result::Result<(), Box<(dyn std::error::Error + Send + Sync)>> {
287    info!(
288      "HOT SWAP - Replacing existing WebAssembly module with new buffer, {} bytes",
289      module.len()
290    );
291
292    let module = Module::new(&self.engine, module)?;
293    self.module = module;
294    self.instance_pre = self.linker.instantiate_pre(&self.module)?;
295    let new_instance = self.instance_pre.instantiate_async(&mut self.store).await?;
296    if let Some(inner) = self.inner.as_mut() {
297      *inner.instance.write() = new_instance;
298      let gc = guest_call_fn(&mut self.store, &inner.instance)?;
299      inner.guest_call_fn = gc;
300    }
301
302    Ok(self.initialize().await?)
303  }
304}
305
306impl WasmtimeEngineProviderAsync {
307  async fn initialize(&mut self) -> Result<()> {
308    for starter in wapc_functions::REQUIRED_STARTS.iter() {
309      if let Some(deadlines) = &self.epoch_deadlines {
310        // the deadline counter must be set before invoking the wasm function
311        self.store.set_epoch_deadline(deadlines.wapc_init);
312      }
313
314      let engine_inner = self.inner.as_ref().unwrap();
315      if engine_inner
316        .instance
317        .read()
318        .get_export(&mut self.store, starter)
319        .is_some()
320      {
321        // Need to get a `wasmtime::TypedFunc` because its `call` method
322        // can return a Trap error. Non-typed functions instead return a
323        // generic `anyhow::Error` that doesn't allow nice handling of
324        // errors
325        let starter_func: TypedFunc<(), ()> = engine_inner.instance.read().get_typed_func(&mut self.store, starter)?;
326
327        if let Err(err) = starter_func.call_async(&mut self.store, ()).await {
328          trace!(function = starter, ?err, "handling error returned by init function");
329          if let Some(trap) = err.downcast_ref::<wasmtime::Trap>() {
330            if matches!(trap, wasmtime::Trap::Interrupt) {
331              return Err(Error::InitializationFailedTimeout((*starter).to_owned()));
332            }
333            return Err(Error::InitializationFailed(err.to_string()));
334          }
335
336          // WASI programs built by tinygo have to be written with a `main` function, even if it's empty.
337          // Starting from tinygo >= 0.35.0, the `main` function calls the WASI process exit function,
338          // which is handled by wasmtime as an Error.
339          //
340          // We must check if this error can be converted into a WASI exit
341          // error and, if the exit code is 0, we can ignore it. Otherwise the waPC initialization
342          // will fail.
343          #[cfg(feature = "wasi")]
344          if let Some(exit_err) = err.downcast_ref::<wasi_common::I32Exit>() {
345            if exit_err.0 != 0 {
346              return Err(Error::InitializationFailed(err.to_string()));
347            }
348            trace!("ignoring successful exit trap generated by WASI");
349            continue;
350          }
351
352          return Err(Error::InitializationFailed(err.to_string()));
353        };
354      }
355    }
356    Ok(())
357  }
358}
359
360// Called once, then the result is cached. This returns a `Func` that corresponds
361// to the `__guest_call` export
362fn guest_call_fn(store: impl AsContextMut, instance: &Arc<RwLock<Instance>>) -> Result<TypedFunc<(i32, i32), i32>> {
363  instance
364    .read()
365    .get_typed_func::<(i32, i32), i32>(store, wapc_functions::GUEST_CALL)
366    .map_err(|_| Error::GuestCallNotFound)
367}