wasmtime_provider/
provider_async.rs

1use std::sync::Arc;
2
3use async_trait::async_trait;
4use log::{error, info};
5use parking_lot::RwLock;
6use tracing::trace;
7#[cfg(feature = "wasi")]
8use wapc::WasiParams;
9use wapc::{wapc_functions, ModuleStateAsync, WebAssemblyEngineProviderAsync};
10use wasmtime::{AsContextMut, Engine, Instance, InstancePre, Linker, Module, Store, TypedFunc};
11
12use crate::errors::{Error, Result};
13use crate::store_async::WapcStoreAsync;
14use crate::{callbacks_async, EpochDeadlines};
15
16struct EngineInner {
17  instance: Arc<RwLock<Instance>>,
18  guest_call_fn: TypedFunc<(i32, i32), i32>,
19  host: Arc<ModuleStateAsync>,
20}
21
22/// A pre initialized [`WasmtimeEngineProviderAsync`]
23///
24/// Can be used to quickly create a new instance of [`WasmtimeEngineProviderAsync`]
25///
26/// Refer to [`WasmtimeEngineProviderBuilder::build_async_pre`](crate::WasmtimeEngineProviderBuilder::build_async_pre) to create an instance of this struct.
27#[allow(missing_debug_implementations)]
28#[derive(Clone)]
29pub struct WasmtimeEngineProviderAsyncPre {
30  module: Module,
31  #[cfg(feature = "wasi")]
32  wasi_params: WasiParams,
33  engine: Engine,
34  linker: Linker<WapcStoreAsync>,
35  instance_pre: InstancePre<WapcStoreAsync>,
36  epoch_deadlines: Option<EpochDeadlines>,
37}
38
39impl WasmtimeEngineProviderAsyncPre {
40  #[cfg(feature = "wasi")]
41  pub(crate) fn new(
42    engine: Engine,
43    module: Module,
44    wasi: Option<WasiParams>,
45    epoch_deadlines: Option<EpochDeadlines>,
46  ) -> Result<Self> {
47    let mut linker: Linker<WapcStoreAsync> = Linker::new(&engine);
48
49    let wasi_params = wasi.unwrap_or_default();
50    wasi_common::tokio::add_to_linker(&mut linker, |s: &mut WapcStoreAsync| &mut s.wasi_ctx).unwrap();
51
52    // register all the waPC host functions
53    callbacks_async::add_to_linker(&mut linker)?;
54
55    let instance_pre = linker.instantiate_pre(&module)?;
56
57    Ok(Self {
58      module,
59      wasi_params,
60      engine,
61      linker,
62      instance_pre,
63      epoch_deadlines,
64    })
65  }
66
67  #[cfg(not(feature = "wasi"))]
68  pub(crate) fn new(engine: Engine, module: Module, epoch_deadlines: Option<EpochDeadlines>) -> Result<Self> {
69    let mut linker: Linker<WapcStoreAsync> = Linker::new(&engine);
70
71    // register all the waPC host functions
72    callbacks_async::add_to_linker(&mut linker)?;
73
74    let instance_pre = linker.instantiate_pre(&module)?;
75
76    Ok(Self {
77      module,
78      engine,
79      linker,
80      instance_pre,
81      epoch_deadlines,
82    })
83  }
84
85  /// Create an instance of [`WasmtimeEngineProviderAsync`] ready to be consumed
86  ///
87  /// Note: from micro-benchmarking, this method is 10 microseconds faster than
88  /// `WasmtimeEngineProviderAsync::clone`.
89  pub fn rehydrate(&self) -> Result<WasmtimeEngineProviderAsync> {
90    let engine = self.engine.clone();
91
92    #[cfg(feature = "wasi")]
93    let wapc_store = WapcStoreAsync::new(&self.wasi_params, None)?;
94    #[cfg(not(feature = "wasi"))]
95    let wapc_store = WapcStoreAsync::new(None);
96
97    let store = Store::new(&engine, wapc_store);
98
99    Ok(WasmtimeEngineProviderAsync {
100      module: self.module.clone(),
101      inner: None,
102      engine,
103      epoch_deadlines: self.epoch_deadlines,
104      linker: self.linker.clone(),
105      instance_pre: self.instance_pre.clone(),
106      store,
107      #[cfg(feature = "wasi")]
108      wasi_params: self.wasi_params.clone(),
109    })
110  }
111}
112
113/// A waPC engine provider that encapsulates the Wasmtime WebAssembly runtime.
114/// This can be used inside of async contexts.
115///
116/// Refer to
117/// [`WasmtimeEngineProviderBuilder::build_async`](crate::WasmtimeEngineProviderBuilder::build_async) to create an instance of this struct.
118///
119/// ## Example
120///
121/// ```rust
122/// use wasmtime_provider::WasmtimeEngineProviderBuilder;
123/// use wapc::WapcHostAsync;
124/// use std::error::Error;
125///
126/// // Sample host callback that prints the operation a WASM module requested.
127/// async fn host_callback(
128///   id: u64,
129///   bd: String,
130///   ns: String,
131///   op: String,
132///   payload: Vec<u8>,
133/// ) -> Result<Vec<u8>, Box<dyn std::error::Error + Send + Sync>> {
134///   println!(
135///     "Guest {} invoked '{}->{}:{}' on the host with a payload of '{}'",
136///     id,
137///     bd,
138///     ns,
139///     op,
140///     ::std::str::from_utf8(&payload).unwrap()
141///   );
142///   Ok(vec![])
143/// }
144///
145/// #[tokio::main]
146/// pub async fn main() -> Result<(), Box<dyn Error>> {
147///   let callback: Box<wapc::HostCallbackAsync> = Box::new(move |id, bd, ns, op, payload| {
148///     let fut = host_callback(id, bd, ns, op, payload);
149///     Box::pin(fut)
150///   });
151///
152///   let file = "../../wasm/crates/wasm-basic/build/wasm_basic.wasm";
153///   let module_bytes = std::fs::read(file)?;
154///
155///   let engine = WasmtimeEngineProviderBuilder::new()
156///     .module_bytes(&module_bytes)
157///     .build_async()?;
158///   let host = WapcHostAsync::new(Box::new(engine), Some(callback)).await?;
159///
160///   let res = host.call("ping", b"payload bytes").await?;
161///   assert_eq!(res, b"payload bytes");
162///
163///   Ok(())
164/// }
165/// ```
166#[allow(missing_debug_implementations)]
167pub struct WasmtimeEngineProviderAsync {
168  module: Module,
169  #[cfg(feature = "wasi")]
170  wasi_params: WasiParams,
171  inner: Option<EngineInner>,
172  engine: Engine,
173  linker: Linker<WapcStoreAsync>,
174  store: Store<WapcStoreAsync>,
175  instance_pre: InstancePre<WapcStoreAsync>,
176  epoch_deadlines: Option<EpochDeadlines>,
177}
178
179impl Clone for WasmtimeEngineProviderAsync {
180  fn clone(&self) -> Self {
181    let engine = self.engine.clone();
182
183    #[cfg(feature = "wasi")]
184    let wapc_store = WapcStoreAsync::new(&self.wasi_params, None).unwrap();
185    #[cfg(not(feature = "wasi"))]
186    let wapc_store = WapcStoreAsync::new(None);
187
188    let store = Store::new(&engine, wapc_store);
189
190    match &self.inner {
191      Some(state) => {
192        let mut new = Self {
193          module: self.module.clone(),
194          inner: None,
195          engine,
196          epoch_deadlines: self.epoch_deadlines,
197          linker: self.linker.clone(),
198          instance_pre: self.instance_pre.clone(),
199          store,
200          #[cfg(feature = "wasi")]
201          wasi_params: self.wasi_params.clone(),
202        };
203
204        tokio::runtime::Handle::current().block_on(async {
205          new.init(state.host.clone()).await.unwrap();
206        });
207
208        new
209      }
210      None => Self {
211        module: self.module.clone(),
212        inner: None,
213        engine,
214        epoch_deadlines: self.epoch_deadlines,
215        linker: self.linker.clone(),
216        instance_pre: self.instance_pre.clone(),
217        store,
218        #[cfg(feature = "wasi")]
219        wasi_params: self.wasi_params.clone(),
220      },
221    }
222  }
223}
224
225#[async_trait]
226impl WebAssemblyEngineProviderAsync for WasmtimeEngineProviderAsync {
227  async fn init(
228    &mut self,
229    host: Arc<ModuleStateAsync>,
230  ) -> std::result::Result<(), Box<dyn std::error::Error + Send + Sync>> {
231    // create the proper store, now we have a value for `host`
232    #[cfg(feature = "wasi")]
233    let wapc_store = WapcStoreAsync::new(&self.wasi_params, Some(host.clone()))?;
234    #[cfg(not(feature = "wasi"))]
235    let wapc_store = WapcStoreAsync::new(Some(host.clone()));
236
237    self.store = Store::new(&self.engine, wapc_store);
238
239    let instance = self.instance_pre.instantiate_async(&mut self.store).await?;
240
241    let instance_ref = Arc::new(RwLock::new(instance));
242    let gc = guest_call_fn(&mut self.store, &instance_ref)?;
243    self.inner = Some(EngineInner {
244      instance: instance_ref,
245      guest_call_fn: gc,
246      host,
247    });
248    self.initialize().await?;
249    Ok(())
250  }
251
252  async fn call(
253    &mut self,
254    op_length: i32,
255    msg_length: i32,
256  ) -> std::result::Result<i32, Box<dyn std::error::Error + Send + Sync>> {
257    if let Some(deadlines) = &self.epoch_deadlines {
258      // the deadline counter must be set before invoking the wasm function
259      self.store.set_epoch_deadline(deadlines.wapc_func);
260    }
261
262    let engine_inner = self.inner.as_ref().unwrap();
263    let call = engine_inner
264      .guest_call_fn
265      .call_async(&mut self.store, (op_length, msg_length))
266      .await;
267
268    match call {
269      Ok(result) => Ok(result),
270      Err(err) => {
271        error!("Failure invoking guest module handler: {err:?}");
272        let mut guest_error = err.to_string();
273        if let Some(trap) = err.downcast_ref::<wasmtime::Trap>() {
274          if matches!(trap, wasmtime::Trap::Interrupt) {
275            "guest code interrupted, execution deadline exceeded".clone_into(&mut guest_error);
276          }
277        }
278        engine_inner.host.set_guest_error(guest_error).await;
279        Ok(0)
280      }
281    }
282  }
283
284  async fn replace(&mut self, module: &[u8]) -> std::result::Result<(), Box<dyn std::error::Error + Send + Sync>> {
285    info!(
286      "HOT SWAP - Replacing existing WebAssembly module with new buffer, {} bytes",
287      module.len()
288    );
289
290    let module = Module::new(&self.engine, module)?;
291    self.module = module;
292    self.instance_pre = self.linker.instantiate_pre(&self.module)?;
293    let new_instance = self.instance_pre.instantiate_async(&mut self.store).await?;
294    if let Some(inner) = self.inner.as_mut() {
295      *inner.instance.write() = new_instance;
296      let gc = guest_call_fn(&mut self.store, &inner.instance)?;
297      inner.guest_call_fn = gc;
298    }
299
300    Ok(self.initialize().await?)
301  }
302}
303
304impl WasmtimeEngineProviderAsync {
305  async fn initialize(&mut self) -> Result<()> {
306    for starter in wapc_functions::REQUIRED_STARTS.iter() {
307      if let Some(deadlines) = &self.epoch_deadlines {
308        // the deadline counter must be set before invoking the wasm function
309        self.store.set_epoch_deadline(deadlines.wapc_init);
310      }
311
312      let engine_inner = self.inner.as_ref().unwrap();
313      if engine_inner
314        .instance
315        .read()
316        .get_export(&mut self.store, starter)
317        .is_some()
318      {
319        // Need to get a `wasmtime::TypedFunc` because its `call` method
320        // can return a Trap error. Non-typed functions instead return a
321        // generic `anyhow::Error` that doesn't allow nice handling of
322        // errors
323        let starter_func: TypedFunc<(), ()> = engine_inner.instance.read().get_typed_func(&mut self.store, starter)?;
324
325        if let Err(err) = starter_func.call_async(&mut self.store, ()).await {
326          trace!(function = starter, ?err, "handling error returned by init function");
327          if let Some(trap) = err.downcast_ref::<wasmtime::Trap>() {
328            if matches!(trap, wasmtime::Trap::Interrupt) {
329              return Err(Error::InitializationFailedTimeout((*starter).to_owned()));
330            }
331            return Err(Error::InitializationFailed(err.to_string()));
332          }
333
334          // WASI programs built by tinygo have to be written with a `main` function, even if it's empty.
335          // Starting from tinygo >= 0.35.0, the `main` function calls the WASI process exit function,
336          // which is handled by wasmtime as an Error.
337          //
338          // We must check if this error can be converted into a WASI exit
339          // error and, if the exit code is 0, we can ignore it. Otherwise the waPC initialization
340          // will fail.
341          #[cfg(feature = "wasi")]
342          if let Some(exit_err) = err.downcast_ref::<wasi_common::I32Exit>() {
343            if exit_err.0 != 0 {
344              return Err(Error::InitializationFailed(err.to_string()));
345            }
346            trace!("ignoring successful exit trap generated by WASI");
347            continue;
348          }
349
350          return Err(Error::InitializationFailed(err.to_string()));
351        };
352      }
353    }
354    Ok(())
355  }
356}
357
358// Called once, then the result is cached. This returns a `Func` that corresponds
359// to the `__guest_call` export
360fn guest_call_fn(store: impl AsContextMut, instance: &Arc<RwLock<Instance>>) -> Result<TypedFunc<(i32, i32), i32>> {
361  instance
362    .read()
363    .get_typed_func::<(i32, i32), i32>(store, wapc_functions::GUEST_CALL)
364    .map_err(|_| Error::GuestCallNotFound)
365}