pub mod executor;
pub mod plugin_manifest;
pub mod wit;
use anyhow::{Context, Result};
use futures::{
channel::{
mpsc::{self, UnboundedSender},
oneshot,
},
future::BoxFuture,
FutureExt, StreamExt,
};
use plugin_manifest::PluginManifest;
use std::{
collections::HashMap,
path::PathBuf,
sync::{Arc, Mutex, OnceLock},
};
use wasmtime::{component::Linker, Store};
use wit::Extension;
use extension_http::HttpClient;
/// A boxed, one-shot job sent over the extension channel.
///
/// The job receives exclusive (`&mut`) access to the instantiated [`Extension`]
/// and its [`Store`], and returns a future that borrows both for the duration
/// of the call. Jobs are produced by `WasmExtension::call` and consumed by the
/// loop spawned in `WasmHost::load_plugin`.
type ExtensionCall = Box<
dyn Send + for<'a> FnOnce(&'a mut Extension, &'a mut Store<WasmState>) -> BoxFuture<'a, ()>,
>;
/// Per-plugin data placed inside the wasmtime `Store` created in
/// `WasmHost::load_plugin`.
struct WasmState {
/// Resource table, created empty with `ResourceTable::new()` when the store is built.
// NOTE(review): not read in this file; presumably used by the WASI host bindings — confirm.
pub table: wasmtime_wasi::ResourceTable,
/// WASI context for the plugin (built in `load_plugin`: inherited stdio,
/// one preopened directory, and a `PWD` environment variable).
ctx: wasmtime_wasi::WasiCtx,
/// The host that loaded this plugin.
pub host: Arc<WasmHost>,
}
/// Host-side state shared by the plugins it loads.
pub struct WasmHost {
/// Engine used to compile components and create stores; obtained from `wasm_engine()`.
engine: wasmtime::Engine,
/// HTTP client built in `WasmHost::new`.
// NOTE(review): not read in this file; presumably consumed by host bindings elsewhere — confirm.
http_client: Arc<dyn HttpClient>,
/// Root directory; each plugin's working directory is `work_dir` joined with its manifest id.
pub work_dir: PathBuf,
}
/// Returns a clone of the process-wide wasmtime engine, building it on first use.
///
/// The engine is configured with async support, the component model, debug
/// info and memory64 enabled.
///
/// # Panics
///
/// Panics if wasmtime rejects the configuration when the engine is first built.
fn wasm_engine() -> wasmtime::Engine {
    static WASM_ENGINE: OnceLock<wasmtime::Engine> = OnceLock::new();

    let shared = WASM_ENGINE.get_or_init(|| {
        let mut config = wasmtime::Config::new();
        config.async_support(true);
        config.wasm_component_model(true);
        config.debug_info(true);
        config.wasm_memory64(true);
        wasmtime::Engine::new(&config).unwrap()
    });
    shared.clone()
}
impl WasmHost {
pub fn new(work_dir: PathBuf) -> Arc<Self> {
Arc::new(Self {
engine: wasm_engine(),
http_client: Arc::new(::extension_http::DefaultHttpClient::new(
None,
Some(vec![
"https?://httpbin.org/**".to_string(),
"https?://www.baidu.com".to_string(),
]),
)),
work_dir,
})
}
pub fn load_plugin(
self: &Arc<Self>,
wasm_bytes: Vec<u8>,
manifest: &Arc<PluginManifest>,
executor: executor::BackgroundExecutor,
) -> executor::Task<Result<WasmExtension>> {
let this = self.clone();
let manifest = manifest.clone();
let extension_work_dir = self.work_dir.join(manifest.id.clone());
executor.clone().spawn(async move {
let component = wasmtime::component::Component::from_binary(&this.engine, &wasm_bytes)
.context("failed to compile wasm component")?;
let mut linker: Linker<WasmState> = Linker::<WasmState>::new(&this.engine);
wasmtime_wasi::add_to_linker_async(&mut linker)?;
Extension::add_to_linker(&mut linker, |s: &mut WasmState| s)?;
let wasi_ctx = wasmtime_wasi::WasiCtx::builder()
.inherit_stdio()
.preopened_dir(
&extension_work_dir,
".",
wasmtime_wasi::DirPerms::all(),
wasmtime_wasi::FilePerms::all(),
)?
.env("PWD", extension_work_dir.to_string_lossy())
.build();
let mut store = wasmtime::Store::new(
&this.engine,
WasmState {
ctx: wasi_ctx,
table: wasmtime_wasi::ResourceTable::new(),
host: this.clone(),
},
);
let mut extension =
Extension::instantiate_async(&mut store, &component, &linker).await?;
extension
.call_init_plugin(&mut store)
.await
.context("failed to initialize wasm extension")?;
let (sender, mut receiver) = mpsc::unbounded::<ExtensionCall>();
executor
.spawn(async move {
while let Some(call) = receiver.next().await {
(call)(&mut extension, &mut store).await;
}
})
.detach();
Ok(WasmExtension {
tx: sender,
work_dir: this.work_dir.join(manifest.id.clone()).into(),
manifest: manifest.clone(),
})
})
}
}
/// Handle to a loaded plugin.
///
/// Cloning yields another handle to the same plugin: clones share the channel
/// sender, the manifest and the working-directory path.
#[derive(Clone)]
pub struct WasmExtension {
/// Sending half of the channel drained by the loop spawned in `WasmHost::load_plugin`.
tx: UnboundedSender<ExtensionCall>,
/// Manifest the plugin was loaded with.
pub manifest: Arc<PluginManifest>,
/// The plugin's working directory (host work dir joined with the manifest id).
pub work_dir: Arc<PathBuf>,
}
/// Process-wide cache of loaded extensions, keyed by the wasm file path (as a
/// string) given to `WasmExtension::load`. Initialized lazily on first load.
static GLOBAL_EXTENSION: OnceLock<Mutex<HashMap<String, WasmExtension>>> = OnceLock::new();
impl WasmExtension {
    /// Loads the plugin stored at `wasm_path`, or returns the cached handle if
    /// the same path was loaded before.
    ///
    /// On a cache miss the file is read, its manifest is parsed from the wasm
    /// custom sections, and the plugin is instantiated by a fresh [`WasmHost`]
    /// rooted at `<home>/.config/wasm_extension/plugin`. The resulting handle is
    /// stored in the global cache.
    ///
    /// The cache is best-effort: a poisoned lock is skipped rather than treated
    /// as an error, and concurrent first loads of the same path are not
    /// deduplicated (the last one to finish stays in the cache).
    ///
    /// # Errors
    ///
    /// Fails if the home directory cannot be determined, the file cannot be
    /// read or is empty, the manifest is invalid, or the plugin fails to load.
    pub async fn load(wasm_path: PathBuf) -> Result<Self> {
        // Lossy conversion keeps the key identical for UTF-8 paths and avoids
        // panicking on paths that are not valid UTF-8.
        let key = wasm_path.to_string_lossy().into_owned();
        let ext_map = GLOBAL_EXTENSION.get_or_init(|| Mutex::new(HashMap::new()));
        // The guard lives only inside this block, never across an `.await`.
        if let Ok(table) = ext_map.lock() {
            if let Some(cached) = table.get(&key) {
                return Ok(cached.clone());
            }
        }
        log::info!("wasm load:{}", key);

        let work_dir = dirs::home_dir()
            .context("failed to determine the home directory")?
            .join(".config")
            .join("wasm_extension")
            .join("plugin");
        let wasm_host = WasmHost::new(work_dir);

        let wasm_bytes = async_fs::read(&wasm_path)
            .await
            .with_context(|| format!("failed to read wasm extension file {:?}", wasm_path))?;
        if wasm_bytes.is_empty() {
            anyhow::bail!("wasm extension file:{:#?} is empty", wasm_path)
        }

        let manifest = Arc::new(parse_wasm_manifest(&wasm_bytes)?);
        let wasm_extension = wasm_host
            .load_plugin(wasm_bytes, &manifest, executor::BackgroundExecutor::new())
            .await
            .with_context(|| format!("failed to load wasm extension {}", manifest.id))?;

        if let Ok(mut table) = ext_map.lock() {
            table.insert(key, wasm_extension.clone());
        }
        Ok(wasm_extension)
    }

    /// Invokes the plugin's `run_plugin` export with `input` on the plugin's
    /// background task and returns the converted output.
    ///
    /// # Errors
    ///
    /// Returns an error if the call itself fails, or if the plugin returns an
    /// error value (which is converted using its `Display` text).
    ///
    /// # Panics
    ///
    /// Panics under the same conditions as the internal `call` helper: when the
    /// plugin's background task is no longer running.
    pub async fn run_plugin(&self, input: wit::PluginInput) -> Result<wit::PluginOutput> {
        self.call(|extension, store| {
            async move {
                let plugin_out = extension
                    .call_run_plugin(store, &input)
                    .await?
                    .map_err(|error| anyhow::anyhow!("{error}"))?;
                Ok(plugin_out.into())
            }
            .boxed()
        })
        .await
    }

    /// Sends `f` to the plugin's background task, waits for it to run with
    /// exclusive access to the extension and its store, and returns its result.
    ///
    /// The result travels back over a one-shot channel created per call.
    ///
    /// # Panics
    ///
    /// Panics if the call channel is closed (the background loop has ended) or
    /// if the job is dropped before sending its result.
    async fn call<T, Fn>(&self, f: Fn) -> T
    where
        T: 'static + Send,
        Fn: 'static
            + Send
            + for<'a> FnOnce(&'a mut Extension, &'a mut Store<WasmState>) -> BoxFuture<'a, T>,
    {
        let (return_tx, return_rx) = oneshot::channel::<T>();
        self.tx
            .clone()
            .unbounded_send(Box::new(move |extension, store| {
                async {
                    let result = f(extension, store).await;
                    // The caller may have stopped waiting; a failed send is fine.
                    return_tx.send(result).ok();
                }
                .boxed()
            }))
            .expect("wasm extension channel should not be closed yet");
        return_rx.await.expect("wasm extension channel")
    }
}
fn parse_wasm_manifest(wasm_byte: &[u8]) -> Result<PluginManifest> {
let payloads = wasmparser::Parser::new(0).parse_all(wasm_byte);
let mut api_version = String::new();
let mut api_schema = String::new();
for payload in payloads {
match payload.unwrap() {
wasmparser::Payload::CustomSection(custom_section_reader) => {
match custom_section_reader.name() {
"workoss:api-version" => {
let data = custom_section_reader.data();
let version = String::from_utf8_lossy(data).to_string();
api_version.push_str(&version);
}
"workoss:api-schema" => {
let data = custom_section_reader.data();
let schema = String::from_utf8_lossy(data).to_string();
api_schema.push_str(&schema);
}
_ => {}
}
}
_ => {}
}
}
if api_version.is_empty() || api_schema.is_empty() {
anyhow::bail!("api_version,api_schema can't be null");
}
let schema_map: HashMap<String, String> = api_schema
.split("\n")
.map(|pair| {
let mut kv = pair.split("::");
(
kv.next().unwrap().to_string(),
kv.next().unwrap().to_string(),
)
})
.collect();
let id = schema_map.get("id").unwrap().to_string();
let name = schema_map.get("name").unwrap().to_string();
let version = schema_map.get("version").unwrap().to_string();
let description = schema_map.get("description").map(ToString::to_string);
let repository = schema_map.get("repository").map(ToString::to_string);
let authors = schema_map
.get("authors")
.unwrap()
.split(":")
.into_iter()
.map(ToString::to_string)
.collect::<Vec<String>>();
Ok(PluginManifest::new(
id,
name,
version,
description,
authors,
repository,
Some(api_version),
))
}
/// Manual integration tests.
///
/// NOTE(review): most of these read a prebuilt plugin from an absolute path on
/// a developer machine and will fail wherever that file is missing.
#[cfg(test)]
mod tests {
    use std::{
        path::{Path, PathBuf},
        thread,
        time::Instant,
    };

    use futures::{
        channel::mpsc::{self},
        executor::{self, ThreadPool},
        StreamExt,
    };
    use wasmparser::Parser;
    use wasmtime::{
        component::{Component, Linker},
        Config, Engine, Store,
    };
    use wasmtime_wasi::ResourceTable;

    use crate::{
        parse_wasm_manifest,
        wit::{Extension, PluginInput},
        WasmExtension, WasmHost, WasmState,
    };

    /// Release build of the sample plugin.
    const PLUGIN_WASM: &str = "/Users/workoss/IDE/rustProjects/wasm-extension/target/wasm32-wasip2/release/extension_plugin.wasm";
    /// Minified variant of the same plugin build.
    const PLUGIN_MIN_WASM: &str = "/Users/workoss/IDE/rustProjects/wasm-extension/target/wasm32-wasip2/release/extension_plugin.min.wasm";

    /// Parses the manifest out of the sample plugin and prints it as JSON.
    #[test]
    fn test_parse() -> anyhow::Result<()> {
        let wasm = std::fs::read(PLUGIN_WASM).unwrap();
        let manifest = parse_wasm_manifest(&wasm)?;
        let json_string = serde_json::to_string(&manifest).unwrap();
        println!("{json_string:#?}");
        Ok(())
    }

    /// Exercises the futures thread pool with an unbounded channel, then prints
    /// a few well-known directories.
    #[test]
    fn test_futures() {
        let pool = ThreadPool::builder()
            .pool_size(4)
            .create()
            .expect("failed create pool");
        let (tx, rx) = mpsc::unbounded::<i32>();

        let doubled = async {
            let current = thread::current();
            let thread_name = current.name().expect("----");
            println!("thread-name:{thread_name}");

            // The producer owns `tx`; dropping it after the last send ends the stream.
            let producer = async move {
                for v in 0..100 {
                    tx.unbounded_send(v).expect("Failed to send");
                }
            };
            pool.spawn_ok(producer);

            rx.map(|v| v * 2).collect::<Vec<i32>>().await
        };

        let values: Vec<i32> = executor::block_on(doubled);
        println!("Values={values:?}");
        println!("{:#?}", dirs::home_dir());
        println!("{:#?}", dirs::public_dir());
        println!("{:#?}", dirs::cache_dir());
        println!("{:#?}", dirs::config_dir());
    }

    /// Loads the plugin once, then loads it again and runs it 100 times,
    /// printing each response body and the total elapsed time.
    #[tokio::test]
    async fn test_load() {
        let _wasm_extension = WasmExtension::load(PathBuf::from(PLUGIN_MIN_WASM))
            .await
            .unwrap();

        let started = Instant::now();
        for i in 0..100 {
            let wasm_extension = WasmExtension::load(PathBuf::from(PLUGIN_MIN_WASM))
                .await
                .unwrap();
            let json_string = format!("{{\"id\":\"{i:?}\",\"name\":\"lisi\"}}");
            let input = PluginInput {
                body: Some(json_string.into_bytes()),
                mime_type: crate::wit::MimeType::Json,
                envs: None,
            };
            let out = wasm_extension.run_plugin(input).await.unwrap();
            println!("body:{:#?}", String::from_utf8(out.body.unwrap()));
        }
        println!("run plugin cost:{:?}", started.elapsed());
    }

    /// Drives the plugin directly through wasmtime, without `WasmExtension`.
    #[tokio::test]
    async fn test_wasm() -> anyhow::Result<()> {
        let _now = Instant::now();

        let mut config = Config::new();
        config.async_support(true);
        config.wasm_component_model(true);
        config.debug_info(true);
        let engine = Engine::new(&config)?;

        let component = Component::from_file(&engine, Path::new(PLUGIN_WASM))?;

        let mut linker = Linker::<WasmState>::new(&engine);
        wasmtime_wasi::add_to_linker_async(&mut linker)?;
        Extension::add_to_linker(&mut linker, |s: &mut WasmState| s)?;

        let wasi_ctx = wasmtime_wasi::WasiCtx::builder()
            .inherit_stdio()
            .preopened_dir(
                "/Users/workoss/IDE",
                ".",
                wasmtime_wasi::DirPerms::all(),
                wasmtime_wasi::FilePerms::all(),
            )?
            .env(
                "PWD",
                "/Users/workoss/.config/wasm_extension/plugin/extension_plugin",
            )
            .build();

        let mut store = Store::new(
            &engine,
            WasmState {
                table: ResourceTable::new(),
                ctx: wasi_ctx,
                host: WasmHost::new(PathBuf::from("path")),
            },
        );

        let extension = Extension::instantiate_async(&mut store, &component, &linker)
            .await
            .unwrap();
        extension.call_init_plugin(&mut store).await.unwrap();

        for _ in 0..100 {
            let input = PluginInput {
                body: Some(r#"{"id":"2","name":"lisi"}"#.into()),
                mime_type: crate::wit::MimeType::Json,
                envs: None,
            };
            let _out = extension.call_run_plugin(&mut store, &input).await.unwrap();
        }
        Ok(())
    }

    /// Walks the payloads of the sample plugin and prints its version header
    /// and custom sections.
    #[test]
    fn test_wasmparser() {
        let wasm = std::fs::read(PLUGIN_WASM).unwrap();
        for payload in Parser::new(0).parse_all(&wasm) {
            match payload.unwrap() {
                wasmparser::Payload::Version {
                    num,
                    encoding: _,
                    range,
                } => {
                    println!("num:{},range:{:#?}", num, range);
                }
                wasmparser::Payload::CustomSection(custom_section_reader) => {
                    let name = custom_section_reader.name();
                    match name {
                        "producers" => match custom_section_reader.as_known() {
                            wasmparser::KnownCustom::Producers(section_limited) => {
                                let _data = custom_section_reader.data();
                                let fields = section_limited.into_iter().collect::<Vec<_>>();
                                for field in fields {
                                    let field = field.unwrap();
                                    println!("field {}", field.name);
                                    let values = field.values.into_iter().collect::<Vec<_>>();
                                    for value in values {
                                        let value = value.unwrap();
                                        println!(
                                            "field value:{} version:{}",
                                            value.name, value.version
                                        );
                                    }
                                }
                            }
                            _ => {
                                let data = String::from_utf8_lossy(custom_section_reader.data())
                                    .to_string();
                                println!("CustomSection name:{name:?} - {data:#?}");
                            }
                        },
                        "workoss:api-version" | "workoss:api-schema" => {
                            let text =
                                String::from_utf8_lossy(custom_section_reader.data()).to_string();
                            println!("CustomSection name:{name:?} - {text:#?}");
                        }
                        _ => {
                            println!("CustomSection name:{name:?}");
                        }
                    }
                }
                _ => {}
            }
        }
    }
}