1use std::sync::Arc;
2
3use async_trait::async_trait;
4use log::{error, info};
5use parking_lot::RwLock;
6use tracing::trace;
7#[cfg(feature = "wasi")]
8use wapc::WasiParams;
9use wapc::{wapc_functions, ModuleStateAsync, WebAssemblyEngineProviderAsync};
10use wasmtime::{AsContextMut, Engine, Instance, InstancePre, Linker, Module, Store, TypedFunc};
11
12use crate::errors::{Error, Result};
13use crate::store_async::WapcStoreAsync;
14use crate::{callbacks_async, EpochDeadlines};
15
/// State that only exists once a provider has been initialized.
///
/// It is stored in `WasmtimeEngineProviderAsync::inner`, which stays `None`
/// until `init` has run.
struct EngineInner {
    /// The live module instance. `replace` overwrites the value behind the
    /// lock when a new module is swapped in.
    instance: Arc<RwLock<Instance>>,
    /// Typed handle to the guest export named by `wapc_functions::GUEST_CALL`,
    /// taking two `i32` arguments and returning an `i32`.
    guest_call_fn: TypedFunc<(i32, i32), i32>,
    /// Shared host-side state; guest failures are reported to it through
    /// `set_guest_error`.
    host: Arc<ModuleStateAsync>,
}
21
/// Cloneable template from which [`WasmtimeEngineProviderAsync`] values are
/// produced via `rehydrate`.
///
/// The linker setup and the `Linker::instantiate_pre` step are performed once
/// in `new`; `rehydrate` only clones the results and creates a fresh store.
#[allow(missing_debug_implementations)]
#[derive(Clone)]
pub struct WasmtimeEngineProviderAsyncPre {
    /// The module passed to `new`.
    module: Module,
    /// WASI parameters used to build the store of every rehydrated provider
    /// (the default value when `new` received `None`).
    #[cfg(feature = "wasi")]
    wasi_params: WasiParams,
    /// The wasmtime engine the linker and every store are created with.
    engine: Engine,
    /// Linker populated by `callbacks_async::add_to_linker` (and by the WASI
    /// `add_to_linker` when the `wasi` feature is enabled).
    linker: Linker<WapcStoreAsync>,
    /// Result of `linker.instantiate_pre(&module)`.
    instance_pre: InstancePre<WapcStoreAsync>,
    /// Optional epoch deadlines copied into every rehydrated provider.
    epoch_deadlines: Option<EpochDeadlines>,
}
38
39impl WasmtimeEngineProviderAsyncPre {
40 #[cfg(feature = "wasi")]
41 pub(crate) fn new(
42 engine: Engine,
43 module: Module,
44 wasi: Option<WasiParams>,
45 epoch_deadlines: Option<EpochDeadlines>,
46 ) -> Result<Self> {
47 let mut linker: Linker<WapcStoreAsync> = Linker::new(&engine);
48
49 let wasi_params = wasi.unwrap_or_default();
50 wasi_common::tokio::add_to_linker(&mut linker, |s: &mut WapcStoreAsync| &mut s.wasi_ctx).unwrap();
51
52 callbacks_async::add_to_linker(&mut linker)?;
54
55 let instance_pre = linker.instantiate_pre(&module)?;
56
57 Ok(Self {
58 module,
59 wasi_params,
60 engine,
61 linker,
62 instance_pre,
63 epoch_deadlines,
64 })
65 }
66
67 #[cfg(not(feature = "wasi"))]
68 pub(crate) fn new(engine: Engine, module: Module, epoch_deadlines: Option<EpochDeadlines>) -> Result<Self> {
69 let mut linker: Linker<WapcStoreAsync> = Linker::new(&engine);
70
71 callbacks_async::add_to_linker(&mut linker)?;
73
74 let instance_pre = linker.instantiate_pre(&module)?;
75
76 Ok(Self {
77 module,
78 engine,
79 linker,
80 instance_pre,
81 epoch_deadlines,
82 })
83 }
84
85 pub fn rehydrate(&self) -> Result<WasmtimeEngineProviderAsync> {
90 let engine = self.engine.clone();
91
92 #[cfg(feature = "wasi")]
93 let wapc_store = WapcStoreAsync::new(&self.wasi_params, None)?;
94 #[cfg(not(feature = "wasi"))]
95 let wapc_store = WapcStoreAsync::new(None);
96
97 let store = Store::new(&engine, wapc_store);
98
99 Ok(WasmtimeEngineProviderAsync {
100 module: self.module.clone(),
101 inner: None,
102 engine,
103 epoch_deadlines: self.epoch_deadlines,
104 linker: self.linker.clone(),
105 instance_pre: self.instance_pre.clone(),
106 store,
107 #[cfg(feature = "wasi")]
108 wasi_params: self.wasi_params.clone(),
109 })
110 }
111}
112
/// Async waPC engine provider backed by wasmtime.
///
/// A value starts without an instance (`inner` is `None`); `init` creates the
/// store, instantiates the module and fills `inner`.
#[allow(missing_debug_implementations)]
pub struct WasmtimeEngineProviderAsync {
    /// The current module; `replace` overwrites it.
    module: Module,
    /// WASI parameters used whenever a new store is created.
    #[cfg(feature = "wasi")]
    wasi_params: WasiParams,
    /// Instance, guest-call handle and host state; `None` until `init` runs.
    inner: Option<EngineInner>,
    /// The wasmtime engine stores and modules are created with.
    engine: Engine,
    /// Linker used by `replace` to pre-instantiate a new module.
    linker: Linker<WapcStoreAsync>,
    /// Store the instance lives in; `init` replaces it with a fresh one that
    /// carries the host state.
    store: Store<WapcStoreAsync>,
    /// Pre-instantiated form of `module`, used to create instances.
    instance_pre: InstancePre<WapcStoreAsync>,
    /// Optional epoch deadlines applied to the store before the starter
    /// functions (`wapc_init`) and before every guest call (`wapc_func`).
    epoch_deadlines: Option<EpochDeadlines>,
}
178
179impl Clone for WasmtimeEngineProviderAsync {
180 fn clone(&self) -> Self {
181 let engine = self.engine.clone();
182
183 #[cfg(feature = "wasi")]
184 let wapc_store = WapcStoreAsync::new(&self.wasi_params, None).unwrap();
185 #[cfg(not(feature = "wasi"))]
186 let wapc_store = WapcStoreAsync::new(None);
187
188 let store = Store::new(&engine, wapc_store);
189
190 match &self.inner {
191 Some(state) => {
192 let mut new = Self {
193 module: self.module.clone(),
194 inner: None,
195 engine,
196 epoch_deadlines: self.epoch_deadlines,
197 linker: self.linker.clone(),
198 instance_pre: self.instance_pre.clone(),
199 store,
200 #[cfg(feature = "wasi")]
201 wasi_params: self.wasi_params.clone(),
202 };
203
204 tokio::runtime::Handle::current().block_on(async {
205 new.init(state.host.clone()).await.unwrap();
206 });
207
208 new
209 }
210 None => Self {
211 module: self.module.clone(),
212 inner: None,
213 engine,
214 epoch_deadlines: self.epoch_deadlines,
215 linker: self.linker.clone(),
216 instance_pre: self.instance_pre.clone(),
217 store,
218 #[cfg(feature = "wasi")]
219 wasi_params: self.wasi_params.clone(),
220 },
221 }
222 }
223}
224
225#[async_trait]
226impl WebAssemblyEngineProviderAsync for WasmtimeEngineProviderAsync {
227 async fn init(
228 &mut self,
229 host: Arc<ModuleStateAsync>,
230 ) -> std::result::Result<(), Box<dyn std::error::Error + Send + Sync>> {
231 #[cfg(feature = "wasi")]
233 let wapc_store = WapcStoreAsync::new(&self.wasi_params, Some(host.clone()))?;
234 #[cfg(not(feature = "wasi"))]
235 let wapc_store = WapcStoreAsync::new(Some(host.clone()));
236
237 self.store = Store::new(&self.engine, wapc_store);
238
239 let instance = self.instance_pre.instantiate_async(&mut self.store).await?;
240
241 let instance_ref = Arc::new(RwLock::new(instance));
242 let gc = guest_call_fn(&mut self.store, &instance_ref)?;
243 self.inner = Some(EngineInner {
244 instance: instance_ref,
245 guest_call_fn: gc,
246 host,
247 });
248 self.initialize().await?;
249 Ok(())
250 }
251
252 async fn call(
253 &mut self,
254 op_length: i32,
255 msg_length: i32,
256 ) -> std::result::Result<i32, Box<dyn std::error::Error + Send + Sync>> {
257 if let Some(deadlines) = &self.epoch_deadlines {
258 self.store.set_epoch_deadline(deadlines.wapc_func);
260 }
261
262 let engine_inner = self.inner.as_ref().unwrap();
263 let call = engine_inner
264 .guest_call_fn
265 .call_async(&mut self.store, (op_length, msg_length))
266 .await;
267
268 match call {
269 Ok(result) => Ok(result),
270 Err(err) => {
271 error!("Failure invoking guest module handler: {err:?}");
272 let mut guest_error = err.to_string();
273 if let Some(trap) = err.downcast_ref::<wasmtime::Trap>() {
274 if matches!(trap, wasmtime::Trap::Interrupt) {
275 "guest code interrupted, execution deadline exceeded".clone_into(&mut guest_error);
276 }
277 }
278 engine_inner.host.set_guest_error(guest_error).await;
279 Ok(0)
280 }
281 }
282 }
283
284 async fn replace(&mut self, module: &[u8]) -> std::result::Result<(), Box<dyn std::error::Error + Send + Sync>> {
285 info!(
286 "HOT SWAP - Replacing existing WebAssembly module with new buffer, {} bytes",
287 module.len()
288 );
289
290 let module = Module::new(&self.engine, module)?;
291 self.module = module;
292 self.instance_pre = self.linker.instantiate_pre(&self.module)?;
293 let new_instance = self.instance_pre.instantiate_async(&mut self.store).await?;
294 if let Some(inner) = self.inner.as_mut() {
295 *inner.instance.write() = new_instance;
296 let gc = guest_call_fn(&mut self.store, &inner.instance)?;
297 inner.guest_call_fn = gc;
298 }
299
300 Ok(self.initialize().await?)
301 }
302}
303
304impl WasmtimeEngineProviderAsync {
305 async fn initialize(&mut self) -> Result<()> {
306 for starter in wapc_functions::REQUIRED_STARTS.iter() {
307 if let Some(deadlines) = &self.epoch_deadlines {
308 self.store.set_epoch_deadline(deadlines.wapc_init);
310 }
311
312 let engine_inner = self.inner.as_ref().unwrap();
313 if engine_inner
314 .instance
315 .read()
316 .get_export(&mut self.store, starter)
317 .is_some()
318 {
319 let starter_func: TypedFunc<(), ()> = engine_inner.instance.read().get_typed_func(&mut self.store, starter)?;
324
325 if let Err(err) = starter_func.call_async(&mut self.store, ()).await {
326 trace!(function = starter, ?err, "handling error returned by init function");
327 if let Some(trap) = err.downcast_ref::<wasmtime::Trap>() {
328 if matches!(trap, wasmtime::Trap::Interrupt) {
329 return Err(Error::InitializationFailedTimeout((*starter).to_owned()));
330 }
331 return Err(Error::InitializationFailed(err.to_string()));
332 }
333
334 #[cfg(feature = "wasi")]
342 if let Some(exit_err) = err.downcast_ref::<wasi_common::I32Exit>() {
343 if exit_err.0 != 0 {
344 return Err(Error::InitializationFailed(err.to_string()));
345 }
346 trace!("ignoring successful exit trap generated by WASI");
347 continue;
348 }
349
350 return Err(Error::InitializationFailed(err.to_string()));
351 };
352 }
353 }
354 Ok(())
355 }
356}
357
358fn guest_call_fn(store: impl AsContextMut, instance: &Arc<RwLock<Instance>>) -> Result<TypedFunc<(i32, i32), i32>> {
361 instance
362 .read()
363 .get_typed_func::<(i32, i32), i32>(store, wapc_functions::GUEST_CALL)
364 .map_err(|_| Error::GuestCallNotFound)
365}