#![allow(clippy::missing_safety_doc)]
pub use reflow_pack_macro::reflow_pack;
use std::ffi::{CString, c_void};
use std::sync::Arc;
pub use reflow_actor::message::Message;
pub use reflow_actor::{Actor, ActorContext};
pub use reflow_pack_loader::REFLOW_PACK_ABI_VERSION;
pub use reflow_pack_loader::host::{
PackActorHandle, PackFactoryDropFn, PackFactoryFn, PackHostVtable, PackRegisterStatus,
};
/// Registration handle passed to a native pack while it registers templates.
///
/// Forwards each registration to the host through the borrowed
/// [`PackHostVtable`] and remembers the first non-`Ok` status it sees.
#[cfg(not(target_arch = "wasm32"))]
pub struct PackHost<'a> {
    /// Host-provided callback table used to forward registrations.
    vtable: &'a mut PackHostVtable,
    /// `PackRegisterStatus::Ok as i32` until a registration fails; afterwards
    /// the status code of the first failure.
    status: i32,
}

#[cfg(not(target_arch = "wasm32"))]
impl<'a> PackHost<'a> {
    /// Wraps the host's vtable in a handle whose status starts out as `Ok`.
    #[doc(hidden)]
    pub fn from_vtable(vtable: &'a mut PackHostVtable) -> Self {
        Self {
            vtable,
            status: PackRegisterStatus::Ok as i32,
        }
    }

    /// Consumes the handle and returns the recorded status code.
    #[doc(hidden)]
    pub fn take_status(self) -> i32 {
        self.status
    }

    /// Registers `factory` with the host under `template_id`.
    ///
    /// The factory is boxed and handed to the host as an opaque pointer
    /// together with trampolines that invoke and free it.
    ///
    /// Failures do not panic; they are recorded and surface through
    /// [`PackHost::take_status`]. Only the first failure is kept:
    /// * `template_id` contains an interior NUL byte (so it cannot become a C
    ///   string) — recorded as `PackRegisterStatus::BadUtf8`;
    /// * the host's `register_template` callback returns a non-`Ok` status —
    ///   that status is recorded and the boxed factory is freed here.
    pub fn register<F>(&mut self, template_id: &str, factory: F)
    where
        F: Fn() -> Arc<dyn Actor> + Send + Sync + 'static,
    {
        type BoxedFactory = Box<dyn Fn() -> Arc<dyn Actor> + Send + Sync>;

        let Ok(id) = CString::new(template_id) else {
            self.record_failure(PackRegisterStatus::BadUtf8 as i32);
            return;
        };

        // Box twice: the inner box is a fat trait-object pointer, the outer
        // box yields a thin pointer that fits in a `*mut c_void`.
        let boxed: BoxedFactory = Box::new(factory);
        let user_data = Box::into_raw(Box::new(boxed)).cast::<c_void>();

        // SAFETY: `id` stays alive for the duration of the call and
        // `user_data` points at a live, leaked `BoxedFactory`.
        // NOTE(review): assumes the host copies the id string and only keeps
        // `user_data` when it returns `Ok` — confirm against the loader.
        let status = unsafe {
            (self.vtable.register_template)(
                self.vtable.host_data,
                id.as_ptr(),
                Some(factory_trampoline),
                Some(factory_drop_trampoline),
                user_data,
            )
        };

        if status != PackRegisterStatus::Ok as i32 {
            self.record_failure(status);
            // SAFETY: `user_data` came from `Box::into_raw` above.
            // NOTE(review): assumes a failing host has not already invoked
            // the drop trampoline on it — confirm against the loader.
            drop(unsafe { Box::from_raw(user_data.cast::<BoxedFactory>()) });
        }
    }

    /// Stores `status` unless an earlier failure has already been recorded.
    fn record_failure(&mut self, status: i32) {
        if self.status == PackRegisterStatus::Ok as i32 {
            self.status = status;
        }
    }
}
/// C-ABI thunk the host calls to build one actor from a registered factory.
///
/// `user_data` is the pointer produced by `PackHost::register`. A null
/// pointer yields a null handle; otherwise the boxed factory is invoked and
/// its actor is wrapped with `PackActorHandle::new`.
#[cfg(not(target_arch = "wasm32"))]
unsafe extern "C" fn factory_trampoline(user_data: *mut c_void) -> *mut PackActorHandle {
    type BoxedFactory = Box<dyn Fn() -> Arc<dyn Actor> + Send + Sync>;

    // SAFETY: a non-null `user_data` is expected to be the leaked
    // `BoxedFactory` created in `PackHost::register` and not yet dropped.
    match unsafe { user_data.cast::<BoxedFactory>().cast_const().as_ref() } {
        Some(make_actor) => PackActorHandle::new(make_actor()),
        None => std::ptr::null_mut(),
    }
}
/// C-ABI thunk the host calls to release a factory it no longer needs.
///
/// A null `user_data` is ignored; any other value is reclaimed as the boxed
/// factory and dropped.
#[cfg(not(target_arch = "wasm32"))]
unsafe extern "C" fn factory_drop_trampoline(user_data: *mut c_void) {
    type BoxedFactory = Box<dyn Fn() -> Arc<dyn Actor> + Send + Sync>;

    if !user_data.is_null() {
        // SAFETY: a non-null `user_data` is expected to be the pointer
        // obtained from `Box::into_raw` in `PackHost::register`, freed once.
        drop(unsafe { Box::from_raw(user_data.cast::<BoxedFactory>()) });
    }
}
pub use reflow_actor::{ActorBehavior, ActorLoad, ActorPayload, ActorState, MemoryState, Port};
#[cfg(target_arch = "wasm32")]
mod wasm_abi {
use super::*;
use std::collections::HashMap;
use std::sync::Mutex;
use reflow_actor::message::Message;
/// Boxed, type-erased actor constructor kept for each registered template.
type Factory = Box<dyn Fn() -> std::sync::Arc<dyn Actor> + Send + Sync>;
/// One registered template: its constructor plus the port names captured
/// from a probe actor when the template was registered.
struct TemplateEntry {
/// Invoked by `__reflow_pack_create_actor` to build a new actor.
factory: Factory,
/// Value of `inport_names()` on the probe actor.
inports: Vec<String>,
/// Value of `outport_names()` on the probe actor.
outports: Vec<String>,
}
/// A live actor instance together with the state and load handles that were
/// obtained from it when it was created.
struct InstanceEntry {
/// The actor built by the template's factory.
actor: std::sync::Arc<dyn Actor>,
/// Result of `actor.create_state()`, handed to every run's context.
state: std::sync::Arc<parking_lot::Mutex<dyn ActorState>>,
/// Result of `actor.load_count()`, handed to every run's context.
load: std::sync::Arc<reflow_actor::ActorLoad>,
}
/// Transparent wrapper that unconditionally marks its contents as `Send` and
/// `Sync` so they can be stored in a `static`.
#[repr(transparent)]
struct WasmSync<T>(T);
// SAFETY: NOTE(review): nothing here checks that `T` is actually thread-safe.
// This module is only compiled for wasm32, which is presumably run
// single-threaded — confirm this still holds if wasm threads are enabled.
unsafe impl<T> Sync for WasmSync<T> {}
// SAFETY: same assumption as the `Sync` impl above.
unsafe impl<T> Send for WasmSync<T> {}
/// Gives shared access to the wrapped value.
impl<T> std::ops::Deref for WasmSync<T> {
type Target = T;
fn deref(&self) -> &T {
&self.0
}
}
/// Lazily created table of registered templates; a template's index in the
/// vector is the `factory_id` reported to the host.
static TEMPLATES: std::sync::OnceLock<WasmSync<Mutex<Vec<TemplateEntry>>>> =
std::sync::OnceLock::new();
/// Lazily created map from instance id to live actor instance.
static INSTANCES: std::sync::OnceLock<WasmSync<Mutex<HashMap<u32, InstanceEntry>>>> =
std::sync::OnceLock::new();
/// Next instance id to hand out. Starts at 1; `__reflow_pack_create_actor`
/// returns 0 to signal an unknown factory id.
static NEXT_INSTANCE_ID: std::sync::atomic::AtomicU32 =
std::sync::atomic::AtomicU32::new(1);
/// Returns the global template table, creating an empty one on first use.
fn templates() -> &'static Mutex<Vec<TemplateEntry>> {
    let cell = TEMPLATES.get_or_init(|| WasmSync(Mutex::new(Vec::new())));
    &cell.0
}
/// Returns the global instance map, creating an empty one on first use.
fn instances() -> &'static Mutex<HashMap<u32, InstanceEntry>> {
    let cell = INSTANCES.get_or_init(|| WasmSync(Mutex::new(HashMap::new())));
    &cell.0
}
// Host function imported from the `env` wasm module.
#[link(wasm_import_module = "env")]
unsafe extern "C" {
// Announces one template to the host. `WasmPackHost::register` passes the
// address and byte length of a JSON metadata string (keys `name`, `inports`,
// `outports`) and the template's index in the template table.
fn __reflow_pack_register_template(
metadata_ptr: *const u8,
metadata_len: u32,
factory_id: u32,
);
}
/// wasm32 registration handle with the same method surface as the native
/// `PackHost`.
///
/// Templates are stored in this module's global table and announced to the
/// host through the imported `__reflow_pack_register_template` function.
pub struct WasmPackHost<'a> {
    /// Status code returned by `take_status`; `register` never changes it.
    status: i32,
    /// Carries the `'a` lifetime parameter without storing a borrow.
    _phantom: std::marker::PhantomData<&'a ()>,
}

impl<'a> WasmPackHost<'a> {
    /// Creates a handle whose status is `PackRegisterStatus::Ok`.
    #[doc(hidden)]
    pub fn new() -> Self {
        Self {
            status: PackRegisterStatus::Ok as i32,
            _phantom: std::marker::PhantomData,
        }
    }

    /// Consumes the handle and returns its status code.
    #[doc(hidden)]
    pub fn take_status(self) -> i32 {
        self.status
    }

    /// Registers `factory` under `template_id`.
    ///
    /// The factory is called once up front to build a probe actor whose
    /// `inport_names()` / `outport_names()` are recorded. The factory is then
    /// appended to the template table, and its index is sent to the host
    /// along with JSON metadata of the form
    /// `{"name": ..., "inports": [...], "outports": [...]}`.
    pub fn register<F>(&mut self, template_id: &str, factory: F)
    where
        F: Fn() -> std::sync::Arc<dyn Actor> + Send + Sync + 'static,
    {
        let (inports, outports) = {
            let probe = factory();
            (probe.inport_names(), probe.outport_names())
        };

        // `json!` only borrows the port lists, so serialising first lets the
        // vectors be moved into the table entry afterwards without cloning.
        let metadata = serde_json::json!({
            "name": template_id,
            "inports": inports,
            "outports": outports,
        })
        .to_string();

        let factory_id = {
            // A poisoned lock is recovered rather than propagated.
            let mut table = templates()
                .lock()
                .unwrap_or_else(|poisoned| poisoned.into_inner());
            let next_id = table.len() as u32;
            table.push(TemplateEntry {
                factory: Box::new(factory),
                inports,
                outports,
            });
            next_id
        };

        // The table lock is released before calling out to the host.
        let bytes = metadata.as_bytes();
        // SAFETY: `bytes` borrows `metadata`, which outlives the call.
        unsafe {
            __reflow_pack_register_template(bytes.as_ptr(), bytes.len() as u32, factory_id);
        }
    }
}

impl<'a> Default for WasmPackHost<'a> {
    /// Equivalent to [`WasmPackHost::new`].
    fn default() -> Self {
        Self::new()
    }
}
/// Exported allocator: reserves `size` bytes with 8-byte alignment and
/// returns the address as a `u32`.
///
/// Returns 0 when `size` is 0, when the layout is invalid, or when the
/// allocator returns a null pointer.
#[unsafe(no_mangle)]
pub extern "C" fn __reflow_pack_alloc(size: u32) -> u32 {
    if size == 0 {
        return 0;
    }
    std::alloc::Layout::from_size_align(size as usize, 8).map_or(0, |layout| {
        // SAFETY: `layout` has a non-zero size because `size != 0`.
        let block = unsafe { std::alloc::alloc(layout) };
        block as u32
    })
}
/// Exported deallocator matching `__reflow_pack_alloc`.
///
/// Does nothing when `ptr` or `size` is 0, or when `size` does not form a
/// valid 8-byte-aligned layout. Otherwise the block at `ptr` is released
/// with that layout.
#[unsafe(no_mangle)]
pub extern "C" fn __reflow_pack_free(ptr: u32, size: u32) {
    if ptr != 0 && size != 0 {
        if let Ok(layout) = std::alloc::Layout::from_size_align(size as usize, 8) {
            // SAFETY: the caller is expected to pass a pointer and size that
            // came from `__reflow_pack_alloc`, which uses this same layout.
            unsafe { std::alloc::dealloc(ptr as *mut u8, layout) };
        }
    }
}
/// Exported constructor: builds an actor from template `factory_id` and
/// returns the id of the new instance.
///
/// Returns 0 when `factory_id` is not a valid index into the template table.
/// Otherwise the factory is run, the actor's `create_state()` and
/// `load_count()` results are captured, and all three are stored under a
/// freshly drawn instance id.
#[unsafe(no_mangle)]
pub extern "C" fn __reflow_pack_create_actor(factory_id: u32) -> u32 {
    let (actor, state, load) = {
        // A poisoned lock is recovered rather than propagated.
        let registry = templates()
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner());
        let Some(template) = registry.get(factory_id as usize) else {
            return 0;
        };
        let actor = (template.factory)();
        let state = actor.create_state();
        let load = actor.load_count();
        (actor, state, load)
    };

    // The template lock is released before the instance map is touched.
    let id = NEXT_INSTANCE_ID.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
    let mut live = instances()
        .lock()
        .unwrap_or_else(|poisoned| poisoned.into_inner());
    live.insert(id, InstanceEntry { actor, state, load });
    id
}
/// Exported destructor: removes instance `instance_id` from the instance
/// map. Unknown ids are ignored.
#[unsafe(no_mangle)]
pub extern "C" fn __reflow_pack_destroy_actor(instance_id: u32) {
    // A poisoned lock is recovered rather than propagated.
    let mut live = instances()
        .lock()
        .unwrap_or_else(|poisoned| poisoned.into_inner());
    drop(live.remove(&instance_id));
}
#[unsafe(no_mangle)]
pub extern "C" fn __reflow_pack_actor_run(
instance_id: u32,
payload_ptr: u32,
payload_len: u32,
out_ptr_ptr: u32,
out_len_ptr: u32,
) -> i32 {
fn write_out(json: String, out_ptr_ptr: u32, out_len_ptr: u32) {
let bytes = json.into_bytes();
let len = bytes.len() as u32;
let buf_ptr = __reflow_pack_alloc(len.max(1));
if buf_ptr != 0 {
unsafe {
std::ptr::copy_nonoverlapping(bytes.as_ptr(), buf_ptr as *mut u8, bytes.len());
}
}
unsafe {
std::ptr::write_unaligned(out_ptr_ptr as *mut u32, buf_ptr);
std::ptr::write_unaligned(out_len_ptr as *mut u32, len);
}
}
let instance = {
let map = match instances().lock() {
Ok(g) => g,
Err(p) => p.into_inner(),
};
match map.get(&instance_id) {
Some(e) => (e.actor.clone(), e.state.clone(), e.load.clone()),
None => {
write_out(
format!(r#"{{"error":"unknown instance_id {}"}}"#, instance_id),
out_ptr_ptr,
out_len_ptr,
);
return -1;
}
}
};
let payload_bytes = if payload_len == 0 {
&[][..]
} else {
unsafe { std::slice::from_raw_parts(payload_ptr as *const u8, payload_len as usize) }
};
let payload_str = match std::str::from_utf8(payload_bytes) {
Ok(s) => s,
Err(e) => {
write_out(
format!(r#"{{"error":"invalid utf-8 in payload: {}"}}"#, e),
out_ptr_ptr,
out_len_ptr,
);
return -2;
}
};
#[derive(serde::Deserialize, Default)]
struct ActorRunPayload {
#[serde(default)]
input: HashMap<String, Message>,
}
let parsed: ActorRunPayload = match serde_json::from_str(payload_str) {
Ok(p) => p,
Err(e) => {
write_out(
format!(r#"{{"error":"parse payload: {}"}}"#, e),
out_ptr_ptr,
out_len_ptr,
);
return -3;
}
};
let (actor, state, load) = instance;
let channel = flume::unbounded();
let context = reflow_actor::ActorContext::new(
parsed.input,
channel,
state,
reflow_actor::ActorConfig::default(),
load,
);
let future = actor.get_behavior()(context);
let result = match pollster::block_on(future) {
Ok(out) => out,
Err(e) => {
write_out(
format!(
r#"{{"error":"actor run failed: {}"}}"#,
e.to_string().replace('"', "\\\"")
),
out_ptr_ptr,
out_len_ptr,
);
return -4;
}
};
let json = match serde_json::to_string(&result) {
Ok(s) => s,
Err(e) => {
write_out(
format!(r#"{{"error":"serialize output: {}"}}"#, e),
out_ptr_ptr,
out_len_ptr,
);
return -5;
}
};
write_out(json, out_ptr_ptr, out_len_ptr);
0
}
}
#[cfg(target_arch = "wasm32")]
pub use wasm_abi::{
WasmPackHost, __reflow_pack_actor_run, __reflow_pack_alloc, __reflow_pack_create_actor,
__reflow_pack_destroy_actor, __reflow_pack_free,
};
/// On wasm32 builds `PackHost` is an alias for `WasmPackHost`.
#[cfg(target_arch = "wasm32")]
pub type PackHost<'a> = WasmPackHost<'a>;