#![warn(missing_docs)]
use std::sync::Arc;
use std::sync::RwLock;
use loader::Handler;
use reqwest;
use serde::Deserialize;
use serde::Serialize;
use super::MessageHandler;
use crate::error::Error;
use crate::error::Result;
use crate::prelude::*;
use crate::provider::Provider;
/// Location of an extension module.
#[derive(Deserialize, Serialize, Debug, Clone, PartialEq, Eq)]
pub enum Path {
/// Module on the local filesystem; the string is passed to `loader::load_from_fs`.
Local(String),
/// Module fetched over HTTP; the string is passed to `reqwest::get`.
Remote(String),
}
/// Configuration listing the extension modules to load.
#[derive(Deserialize, Serialize, Debug, Clone, Default, PartialEq, Eq)]
pub struct ExtensionConfig {
/// Module locations; `Extension::new` tries to load each of them in order.
pub paths: Vec<Path>,
}
/// Set of loaded extension handlers that are invoked for incoming messages.
pub struct Extension {
// Handlers that loaded successfully, in the order of the configured paths.
handlers: Vec<Handler>,
}
/// Something that can be invoked with a message and the provider it arrived on.
pub trait ExtensionHandlerCaller {
/// Invokes the handler with the raw message bytes `msg` and the shared `provider`.
fn call(&self, msg: bytes::Bytes, provider: Arc<Provider>) -> Result<()>;
}
/// State handed to wasm handlers: the current message and the provider.
///
/// Both fields sit behind `Arc<RwLock<..>>`, so clones of a container share
/// the same underlying slots.
#[derive(Clone, Default)]
pub struct WasmABIContainer {
// Message slot; `None` until `set_message` (or `new` with `Some`) fills it.
msg: Arc<RwLock<Option<Box<bytes::Bytes>>>>,
// Provider slot; `None` for a `Default` container until `set_provider` fills it.
provider: Arc<RwLock<Option<Arc<Provider>>>>,
}
impl WasmABIContainer {
pub fn set_message(&self, msg: bytes::Bytes) -> Result<()> {
let mut guard = self
.msg
.write()
.map_err(|_| Error::WasmBackendMessageRwLockError)?;
*guard = Some(Box::new(msg));
Ok(())
}
pub fn set_provider(&self, provider: Arc<Provider>) -> Result<()> {
let mut guard = self
.provider
.write()
.map_err(|_| Error::WasmBackendMessageRwLockError)?;
*guard = Some(provider);
Ok(())
}
pub fn new(msg: Option<bytes::Bytes>, provider: Arc<Provider>) -> Self {
Self {
msg: Arc::new(RwLock::new(msg.map(Box::new))),
provider: Arc::new(RwLock::new(Some(provider))),
}
}
}
impl Extension {
async fn load(path: &Path) -> Result<Handler> {
match path {
Path::Local(path) => loader::load_from_fs(path.to_string()).await,
Path::Remote(path) => {
let data: String = reqwest::get(path)
.await
.map_err(|e| Error::HttpRequestError(e.to_string()))?
.text()
.await
.map_err(|e| Error::HttpRequestError(e.to_string()))?;
loader::load(data).await
}
}
}
pub async fn new(config: &ExtensionConfig) -> Result<Self> {
let mut handlers = vec![];
for p in &config.paths {
if let Ok(h) = Self::load(p).await {
handlers.push(h)
} else {
log::error!("Failed on loading extension {:?}", p)
}
}
Ok(Self { handlers })
}
}
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
impl MessageHandler<bytes::Bytes> for Extension {
    /// Passes `data` and `provider` to every loaded handler in order,
    /// stopping at (and returning) the first handler error.
    async fn handle_message(
        &self,
        provider: Arc<Provider>,
        _ctx: &MessagePayload,
        data: &bytes::Bytes,
    ) -> std::result::Result<(), Box<dyn std::error::Error>> {
        self.handlers
            .iter()
            .try_for_each(|handler| handler.call(data.clone(), provider.clone()))?;
        Ok(())
    }
}
pub mod loader {
use core::any::Any;
use std::ffi::CStr;
use std::fs;
use std::os::raw::c_char;
use std::sync::Arc;
use std::sync::RwLock;
use lazy_static::lazy_static;
use wasmer::imports;
use wasmer::AsStoreMut;
use wasmer::ExternRef;
use wasmer::FromToNativeWasmType;
use wasmer::FunctionEnv;
use wasmer::FunctionEnvMut;
use wasmer::FunctionType;
use wasmer::Type;
use wasmer::TypedFunction;
use wasmer::Value;
use super::WasmABIContainer;
use crate::error::Error;
use crate::error::Result;
use crate::provider::Provider;
// Process-wide wasmer store shared by every function in this module.
// All access goes through the `RwLock`; a poisoned lock is reported as
// `Error::WasmGlobalMemoryLockError` by the callers below.
lazy_static! {
static ref WASM_MEM: Arc<RwLock<wasmer::Store>> =
Arc::new(RwLock::new(wasmer::Store::default()));
}
impl TryFrom<WasmABIContainer> for FunctionEnv<WasmABIContainer> {
type Error = Error;
fn try_from(data: WasmABIContainer) -> Result<Self> {
let mut mem = WASM_MEM
.write()
.map_err(|_| Error::WasmGlobalMemoryLockError)?;
Ok(FunctionEnv::new(&mut mem, data))
}
}
/// Types that can expose a host ABI to wasm modules as a set of imports.
pub trait WasmABILander: Sized + Any + Send + 'static {
/// Builds the `wasmer::Imports` for this type, bound to `env` and created in `store`.
fn land_abi(env: &FunctionEnv<Self>, store: &mut impl AsStoreMut) -> wasmer::Imports;
}
impl WasmABILander for WasmABIContainer {
    /// Exposes [`WasmABIContainer::request`] to guests as `message_abi.request`
    /// with the signature `(externref, i32, i32) -> ()`.
    fn land_abi(env: &FunctionEnv<Self>, store: &mut impl AsStoreMut) -> wasmer::Imports {
        let signature = FunctionType::new(vec![Type::ExternRef, Type::I32, Type::I32], vec![]);
        let request_fn =
            wasmer::Function::new_with_env(store, env, signature, WasmABIContainer::request);
        #[rustfmt::skip]
        imports! {
            "message_abi" => {
                "request" => request_fn
            }
        }
    }
}
impl WasmABIContainer {
    /// Copies the NUL-terminated UTF-8 string found at `addr` into a `String`.
    ///
    /// `what` names the argument in error messages. A null address or a
    /// string that is not valid UTF-8 yields a `RuntimeError` instead of
    /// undefined behaviour or a panic.
    fn read_c_string(
        addr: i32,
        what: &str,
    ) -> core::result::Result<String, wasmer::RuntimeError> {
        let ptr = addr as *const c_char;
        if ptr.is_null() {
            return Err(wasmer::RuntimeError::new(format!(
                "Null pointer passed for `{}`",
                what
            )));
        }
        // SAFETY: `ptr` is non-null; the caller must additionally guarantee that it
        // is readable in the host address space and NUL-terminated.
        // NOTE(review): a wasm `i32` is normally an offset into the instance's linear
        // memory, not a host address. The direct cast is kept from the original
        // implementation; reading through the instance `Memory` would be the sound
        // approach but needs the memory to be reachable from the environment.
        let raw = unsafe { CStr::from_ptr(ptr) };
        raw.to_str().map(str::to_owned).map_err(|e| {
            wasmer::RuntimeError::new(format!("`{}` is not valid UTF-8: {}", what, e))
        })
    }

    /// Host side of the `message_abi.request` import.
    ///
    /// Expects `params` to be `[externref, i32, i32]`, where the two integers
    /// address NUL-terminated strings: the method name and a JSON document
    /// with its parameters. The JSON is deserialized and forwarded to
    /// `Provider::request_internal`, blocking the current thread until that
    /// future completes.
    ///
    /// # Errors
    /// Returns a `RuntimeError` when the argument list has the wrong shape, a
    /// string cannot be read, the provider lock is poisoned, no provider is
    /// set, the JSON is invalid, or the request itself fails.
    pub fn request(
        env: FunctionEnvMut<WasmABIContainer>,
        params: &[Value],
    ) -> core::result::Result<Vec<Value>, wasmer::RuntimeError> {
        match params {
            [Value::ExternRef(_), Value::I32(method_ptr), Value::I32(params_ptr)] => {
                let method = Self::read_c_string(*method_ptr, "method")?;
                let raw_params = Self::read_c_string(*params_ptr, "params")?;
                // Clone the provider out of the slot so the read guard is released
                // at the end of this statement; holding it while blocking on the
                // request could deadlock against a concurrent `set_provider`.
                let provider = env
                    .data()
                    .provider
                    .read()
                    .map_err(|_| {
                        wasmer::RuntimeError::new("Failed on lock memory of external ref")
                    })?
                    .clone()
                    .ok_or_else(|| {
                        wasmer::RuntimeError::new(
                            "Failed on call func `request`:: ExternalRef is NULL",
                        )
                    })?;
                let params = serde_json::from_str(&raw_params).map_err(|e| {
                    wasmer::RuntimeError::new(format!("Failed to deserialize params: {}", e))
                })?;
                futures::executor::block_on(provider.request_internal(method, params)).map_err(
                    |_| wasmer::RuntimeError::new("Failed to call async request function"),
                )?;
                Ok(vec![])
            }
            x => Err(wasmer::RuntimeError::new(format!(
                "Expect [ExternRef, i32, i32], got {:?}",
                x
            ))),
        }
    }
}
unsafe impl FromToNativeWasmType for WasmABIContainer {
type Native = Option<ExternRef>;
fn from_native(native: Self::Native) -> Self {
if native.is_none() {
return Self::default();
}
match WASM_MEM
.read()
.map_err(|_| Error::WasmGlobalMemoryLockError)
{
Ok(mem) => {
if let Some(m) = native.unwrap().downcast::<Self>(&mem) {
m.clone()
} else {
Self::default()
}
}
Err(e) => {
log::error!("{:?}", e);
Self::default()
}
}
}
fn to_native(self) -> Self::Native {
match WASM_MEM
.write()
.map_err(|_| Error::WasmGlobalMemoryLockError)
{
Ok(mut mem) => {
let ext_ref = ExternRef::new::<Self>(&mut mem, self);
if ext_ref.is_from_store(&mem) {
Some(ext_ref)
} else {
None
}
}
Err(e) => {
log::error!("{:?}", e);
None
}
}
}
}
// Typed wasm function taking and returning an optional `ExternRef`.
type TyHandler = TypedFunction<Option<ExternRef>, Option<ExternRef>>;
/// A loaded wasm handler together with the state container passed to it.
pub struct Handler {
/// The typed wasm function; `load` fills it with the module's `handler` export.
pub func: TyHandler,
/// Container whose message and provider are set before every call.
pub container: WasmABIContainer,
}
impl super::ExtensionHandlerCaller for Handler {
fn call(&self, msg: bytes::Bytes, provider: Arc<Provider>) -> Result<()> {
self.container.set_message(msg)?;
self.container.set_provider(provider)?;
let native_container = self.container.clone().to_native();
{
let mut mem = WASM_MEM
.write()
.map_err(|_| Error::WasmGlobalMemoryLockError)?;
self.func
.call(&mut mem, native_container)
.map_err(|e| Error::WasmRuntimeError(e.to_string()))?
};
Ok(())
}
}
pub async fn load(bytes: impl AsRef<[u8]>) -> Result<Handler> {
let container = WasmABIContainer::default();
let env: FunctionEnv<WasmABIContainer> = container.clone().try_into()?;
let mut store = WASM_MEM
.write()
.map_err(|_| Error::WasmGlobalMemoryLockError)?;
let module = wasmer::Module::new(&store, &bytes)
.map_err(|e| Error::WasmCompileError(e.to_string()))?;
let import_object = WasmABIContainer::land_abi(&env, &mut store);
let ins = wasmer::Instance::new(&mut store, &module, &import_object)
.map_err(|_| Error::WasmInstantiationError)?;
let exports: wasmer::Exports = ins.exports;
let handler: TyHandler = exports
.get_function("handler")
.map_err(|_| Error::WasmExportError)?
.typed(&store)
.map_err(|_| Error::WasmExportError)?;
Ok(Handler {
func: handler,
container,
})
}
pub async fn load_from_fs(path: String) -> Result<Handler> {
if let Ok(wat) = fs::read_to_string(path) {
load(wat).await
} else {
Err(Error::WasmFailedToLoadFile)
}
}
}
#[cfg(not(feature = "browser"))]
#[cfg(test)]
mod test {
    use crate::backend::native::extension::loader::load;

    /// A minimal textual module exporting an identity `handler` must load.
    #[tokio::test]
    async fn test_load_wasm() {
        let wasm = r#"
(module
;; fn handler(param: ExternRef) -> ExternRef
(func $handler (param externref) (result externref)
(return (local.get 0))
)
(export "handler" (func $handler))
)
"#;
        let source = wasm.to_string();
        let loaded = load(source).await;
        let _handler = loaded.unwrap();
    }
}