1use std::sync::Arc;
2
3use log::{error, info};
4
5use async_trait::async_trait;
6use parking_lot::RwLock;
7use tracing::trace;
8#[cfg(feature = "wasi")]
9use wapc::WasiParams;
10use wapc::{wapc_functions, ModuleStateAsync, WebAssemblyEngineProviderAsync};
11use wasmtime::{AsContextMut, Engine, Instance, InstancePre, Linker, Module, Store, TypedFunc};
12
13use crate::callbacks_async;
14use crate::errors::{Error, Result};
15use crate::store_async::WapcStoreAsync;
16use crate::EpochDeadlines;
17
18struct EngineInner {
19 instance: Arc<RwLock<Instance>>,
20 guest_call_fn: TypedFunc<(i32, i32), i32>,
21 host: Arc<ModuleStateAsync>,
22}
23
24#[allow(missing_debug_implementations)]
30#[derive(Clone)]
31pub struct WasmtimeEngineProviderAsyncPre {
32 module: Module,
33 #[cfg(feature = "wasi")]
34 wasi_params: WasiParams,
35 engine: Engine,
36 linker: Linker<WapcStoreAsync>,
37 instance_pre: InstancePre<WapcStoreAsync>,
38 epoch_deadlines: Option<EpochDeadlines>,
39}
40
41impl WasmtimeEngineProviderAsyncPre {
42 #[cfg(feature = "wasi")]
43 pub(crate) fn new(
44 engine: Engine,
45 module: Module,
46 wasi: Option<WasiParams>,
47 epoch_deadlines: Option<EpochDeadlines>,
48 ) -> Result<Self> {
49 let mut linker: Linker<WapcStoreAsync> = Linker::new(&engine);
50
51 let wasi_params = wasi.unwrap_or_default();
52 wasi_common::tokio::add_to_linker(&mut linker, |s: &mut WapcStoreAsync| &mut s.wasi_ctx).unwrap();
53
54 callbacks_async::add_to_linker(&mut linker)?;
56
57 let instance_pre = linker.instantiate_pre(&module)?;
58
59 Ok(Self {
60 module,
61 wasi_params,
62 engine,
63 linker,
64 instance_pre,
65 epoch_deadlines,
66 })
67 }
68
69 #[cfg(not(feature = "wasi"))]
70 pub(crate) fn new(engine: Engine, module: Module, epoch_deadlines: Option<EpochDeadlines>) -> Result<Self> {
71 let mut linker: Linker<WapcStoreAsync> = Linker::new(&engine);
72
73 callbacks_async::add_to_linker(&mut linker)?;
75
76 let instance_pre = linker.instantiate_pre(&module)?;
77
78 Ok(Self {
79 module,
80 engine,
81 linker,
82 instance_pre,
83 epoch_deadlines,
84 })
85 }
86
87 pub fn rehydrate(&self) -> Result<WasmtimeEngineProviderAsync> {
92 let engine = self.engine.clone();
93
94 #[cfg(feature = "wasi")]
95 let wapc_store = WapcStoreAsync::new(&self.wasi_params, None)?;
96 #[cfg(not(feature = "wasi"))]
97 let wapc_store = WapcStoreAsync::new(None);
98
99 let store = Store::new(&engine, wapc_store);
100
101 Ok(WasmtimeEngineProviderAsync {
102 module: self.module.clone(),
103 inner: None,
104 engine,
105 epoch_deadlines: self.epoch_deadlines,
106 linker: self.linker.clone(),
107 instance_pre: self.instance_pre.clone(),
108 store,
109 #[cfg(feature = "wasi")]
110 wasi_params: self.wasi_params.clone(),
111 })
112 }
113}
114
115#[allow(missing_debug_implementations)]
169pub struct WasmtimeEngineProviderAsync {
170 module: Module,
171 #[cfg(feature = "wasi")]
172 wasi_params: WasiParams,
173 inner: Option<EngineInner>,
174 engine: Engine,
175 linker: Linker<WapcStoreAsync>,
176 store: Store<WapcStoreAsync>,
177 instance_pre: InstancePre<WapcStoreAsync>,
178 epoch_deadlines: Option<EpochDeadlines>,
179}
180
181impl Clone for WasmtimeEngineProviderAsync {
182 fn clone(&self) -> Self {
183 let engine = self.engine.clone();
184
185 #[cfg(feature = "wasi")]
186 let wapc_store = WapcStoreAsync::new(&self.wasi_params, None).unwrap();
187 #[cfg(not(feature = "wasi"))]
188 let wapc_store = WapcStoreAsync::new(None);
189
190 let store = Store::new(&engine, wapc_store);
191
192 match &self.inner {
193 Some(state) => {
194 let mut new = Self {
195 module: self.module.clone(),
196 inner: None,
197 engine,
198 epoch_deadlines: self.epoch_deadlines,
199 linker: self.linker.clone(),
200 instance_pre: self.instance_pre.clone(),
201 store,
202 #[cfg(feature = "wasi")]
203 wasi_params: self.wasi_params.clone(),
204 };
205
206 tokio::runtime::Handle::current().block_on(async {
207 new.init(state.host.clone()).await.unwrap();
208 });
209
210 new
211 }
212 None => Self {
213 module: self.module.clone(),
214 inner: None,
215 engine,
216 epoch_deadlines: self.epoch_deadlines,
217 linker: self.linker.clone(),
218 instance_pre: self.instance_pre.clone(),
219 store,
220 #[cfg(feature = "wasi")]
221 wasi_params: self.wasi_params.clone(),
222 },
223 }
224 }
225}
226
227#[async_trait]
228impl WebAssemblyEngineProviderAsync for WasmtimeEngineProviderAsync {
229 async fn init(
230 &mut self,
231 host: Arc<ModuleStateAsync>,
232 ) -> std::result::Result<(), Box<dyn std::error::Error + Send + Sync>> {
233 #[cfg(feature = "wasi")]
235 let wapc_store = WapcStoreAsync::new(&self.wasi_params, Some(host.clone()))?;
236 #[cfg(not(feature = "wasi"))]
237 let wapc_store = WapcStoreAsync::new(Some(host.clone()));
238
239 self.store = Store::new(&self.engine, wapc_store);
240
241 let instance = self.instance_pre.instantiate_async(&mut self.store).await?;
242
243 let instance_ref = Arc::new(RwLock::new(instance));
244 let gc = guest_call_fn(&mut self.store, &instance_ref)?;
245 self.inner = Some(EngineInner {
246 instance: instance_ref,
247 guest_call_fn: gc,
248 host,
249 });
250 self.initialize().await?;
251 Ok(())
252 }
253
254 async fn call(
255 &mut self,
256 op_length: i32,
257 msg_length: i32,
258 ) -> std::result::Result<i32, Box<dyn std::error::Error + Send + Sync>> {
259 if let Some(deadlines) = &self.epoch_deadlines {
260 self.store.set_epoch_deadline(deadlines.wapc_func);
262 }
263
264 let engine_inner = self.inner.as_ref().unwrap();
265 let call = engine_inner
266 .guest_call_fn
267 .call_async(&mut self.store, (op_length, msg_length))
268 .await;
269
270 match call {
271 Ok(result) => Ok(result),
272 Err(err) => {
273 error!("Failure invoking guest module handler: {:?}", err);
274 let mut guest_error = err.to_string();
275 if let Some(trap) = err.downcast_ref::<wasmtime::Trap>() {
276 if matches!(trap, wasmtime::Trap::Interrupt) {
277 "guest code interrupted, execution deadline exceeded".clone_into(&mut guest_error);
278 }
279 }
280 engine_inner.host.set_guest_error(guest_error).await;
281 Ok(0)
282 }
283 }
284 }
285
286 async fn replace(&mut self, module: &[u8]) -> std::result::Result<(), Box<(dyn std::error::Error + Send + Sync)>> {
287 info!(
288 "HOT SWAP - Replacing existing WebAssembly module with new buffer, {} bytes",
289 module.len()
290 );
291
292 let module = Module::new(&self.engine, module)?;
293 self.module = module;
294 self.instance_pre = self.linker.instantiate_pre(&self.module)?;
295 let new_instance = self.instance_pre.instantiate_async(&mut self.store).await?;
296 if let Some(inner) = self.inner.as_mut() {
297 *inner.instance.write() = new_instance;
298 let gc = guest_call_fn(&mut self.store, &inner.instance)?;
299 inner.guest_call_fn = gc;
300 }
301
302 Ok(self.initialize().await?)
303 }
304}
305
306impl WasmtimeEngineProviderAsync {
307 async fn initialize(&mut self) -> Result<()> {
308 for starter in wapc_functions::REQUIRED_STARTS.iter() {
309 if let Some(deadlines) = &self.epoch_deadlines {
310 self.store.set_epoch_deadline(deadlines.wapc_init);
312 }
313
314 let engine_inner = self.inner.as_ref().unwrap();
315 if engine_inner
316 .instance
317 .read()
318 .get_export(&mut self.store, starter)
319 .is_some()
320 {
321 let starter_func: TypedFunc<(), ()> = engine_inner.instance.read().get_typed_func(&mut self.store, starter)?;
326
327 if let Err(err) = starter_func.call_async(&mut self.store, ()).await {
328 trace!(function = starter, ?err, "handling error returned by init function");
329 if let Some(trap) = err.downcast_ref::<wasmtime::Trap>() {
330 if matches!(trap, wasmtime::Trap::Interrupt) {
331 return Err(Error::InitializationFailedTimeout((*starter).to_owned()));
332 }
333 return Err(Error::InitializationFailed(err.to_string()));
334 }
335
336 #[cfg(feature = "wasi")]
344 if let Some(exit_err) = err.downcast_ref::<wasi_common::I32Exit>() {
345 if exit_err.0 != 0 {
346 return Err(Error::InitializationFailed(err.to_string()));
347 }
348 trace!("ignoring successful exit trap generated by WASI");
349 continue;
350 }
351
352 return Err(Error::InitializationFailed(err.to_string()));
353 };
354 }
355 }
356 Ok(())
357 }
358}
359
360fn guest_call_fn(store: impl AsContextMut, instance: &Arc<RwLock<Instance>>) -> Result<TypedFunc<(i32, i32), i32>> {
363 instance
364 .read()
365 .get_typed_func::<(i32, i32), i32>(store, wapc_functions::GUEST_CALL)
366 .map_err(|_| Error::GuestCallNotFound)
367}