use std::cell::RefCell;
use std::path::Path;
use std::ptr;
use actr_framework::guest::dynclib_abi::{self as guest_abi, AbiReply, InitPayloadV1};
use libloading::Library;
/// Newtype around a raw `*const T` so the pointer can be moved into a closure that is
/// required to be `Send` (raw pointers are not `Send` on their own).
///
/// The wrapper performs no lifetime tracking: whoever dereferences the pointer is
/// responsible for making sure the pointee is still alive.
struct SendPtr<T>(*const T);

// NOTE(review): this impl is unconditional in `T`; soundness rests entirely on how the
// wrapped pointer is used at the call sites — confirm when adding new users.
unsafe impl<T> Send for SendPtr<T> {}

impl<T> SendPtr<T> {
    /// Returns the wrapped raw pointer unchanged.
    ///
    /// Going through a method (instead of touching the tuple field directly) makes a
    /// closure capture the whole wrapper rather than the bare pointer field.
    fn as_ptr(&self) -> *const T {
        let Self(raw) = self;
        *raw
    }
}
use actr_framework::guest::vtable::HostVTable;
use actr_protocol::{ActrId, DataStream};
use crate::workload::{
HostAbiFn, HostOperation, HostOperationResult, InvocationContext, PackageHookEvent,
encode_guest_data_stream_request, encode_guest_handle_request, encode_guest_hook_request,
encode_guest_lifecycle_request,
};
use super::error::{DynclibError, DynclibResult};
/// Signature of the guest library's `actr_init` export.
///
/// The host passes a pointer to its vtable plus an encoded init payload
/// (`init_payload` / `init_len`). The host treats a return value of `0` as success and
/// any other value as an initialisation failure.
type InitFn = unsafe extern "C" fn(
vtable: *const HostVTable,
init_payload: *const u8,
init_len: usize,
) -> i32;
/// Signature of the guest library's `actr_handle` export.
///
/// `req` / `req_len` describe the encoded request. The guest reports its response buffer
/// through the `resp_out` / `resp_len_out` out-parameters and returns a status code,
/// where the host treats `0` as success.
type HandleFn = unsafe extern "C" fn(
req: *const u8,
req_len: usize,
resp_out: *mut *mut u8,
resp_len_out: *mut usize,
) -> i32;
/// Signature of the guest library's `actr_free_response` export, which the host calls to
/// hand a response buffer back to the guest after copying its contents.
type FreeResponseFn = unsafe extern "C" fn(ptr: *mut u8, len: usize);
// Per-thread dispatch context read by `trampoline_execute`. Both slots are filled by
// `install_thread_locals` and reset to `None` by `clear_thread_locals`.
thread_local! {
// Raw pointer to the host executor that serves guest -> host calls on this thread.
// It is a borrowed pointer: nothing here keeps the pointee alive.
static CURRENT_EXECUTOR: RefCell<Option<*const HostAbiFn>> = const { RefCell::new(None) };
// Tokio runtime handle used to block on the future returned by the executor.
static TOKIO_HANDLE: RefCell<Option<tokio::runtime::Handle>> = const { RefCell::new(None) };
}
/// Publishes the dispatch context for the current thread.
///
/// Stores `executor` in `CURRENT_EXECUTOR` and `handle` in `TOKIO_HANDLE`, replacing
/// whatever was stored before. Panics if either cell is currently borrowed.
fn install_thread_locals(executor: *const HostAbiFn, handle: tokio::runtime::Handle) {
    CURRENT_EXECUTOR.with(|slot| {
        slot.replace(Some(executor));
    });
    TOKIO_HANDLE.with(|slot| {
        slot.replace(Some(handle));
    });
}
/// Resets the current thread's dispatch context.
///
/// Both `CURRENT_EXECUTOR` and `TOKIO_HANDLE` are set back to `None`; any previously
/// stored runtime handle is dropped. Panics if either cell is currently borrowed.
fn clear_thread_locals() {
    CURRENT_EXECUTOR.with(|slot| {
        slot.borrow_mut().take();
    });
    TOKIO_HANDLE.with(|slot| {
        slot.borrow_mut().take();
    });
}
/// Copies `data` into a freshly allocated buffer and reports it through the out-params.
///
/// For non-empty input the buffer is obtained from the global allocator with a layout of
/// `data.len()` bytes and alignment 1; for empty input no allocation happens and a null
/// pointer is reported. In both cases `*out_len` receives `data.len()`.
///
/// # Safety
///
/// `out_ptr` and `out_len` must be valid for writes.
///
/// # Panics
///
/// Panics with "invalid layout" if the layout cannot be constructed; allocation failure
/// is routed to `std::alloc::handle_alloc_error`.
unsafe fn host_alloc_and_write(data: &[u8], out_ptr: *mut *mut u8, out_len: *mut usize) {
    let byte_count = data.len();
    let buffer = match byte_count {
        // Nothing to copy: report "no buffer" instead of a zero-sized allocation.
        0 => ptr::null_mut(),
        _ => {
            let layout =
                std::alloc::Layout::from_size_align(byte_count, 1).expect("invalid layout");
            // SAFETY: `layout` has a non-zero size in this arm.
            let raw = unsafe { std::alloc::alloc(layout) };
            if raw.is_null() {
                std::alloc::handle_alloc_error(layout);
            }
            // SAFETY: `raw` is a fresh allocation of `byte_count` bytes and therefore
            // cannot overlap `data`, which is readable for `byte_count` bytes.
            unsafe { std::ptr::copy_nonoverlapping(data.as_ptr(), raw, byte_count) };
            raw
        }
    };
    // SAFETY: the caller guarantees both out-pointers are valid for writes.
    unsafe {
        *out_ptr = buffer;
        *out_len = byte_count;
    }
}
fn trampoline_execute(pending: HostOperation) -> HostOperationResult {
let maybe_result = TOKIO_HANDLE.with(|h_cell| {
let h_borrow = h_cell.borrow();
let handle = match h_borrow.as_ref() {
Some(h) => h,
None => {
tracing::error!("dynclib trampoline: TOKIO_HANDLE not set");
return None;
}
};
CURRENT_EXECUTOR.with(|e_cell| {
let e_borrow = e_cell.borrow();
let executor_ptr = match *e_borrow {
Some(p) => p,
None => {
tracing::error!("dynclib trampoline: CURRENT_EXECUTOR not set");
return None;
}
};
let executor: &HostAbiFn = unsafe { &*executor_ptr };
let future = executor(pending);
Some(handle.block_on(future))
})
});
maybe_result.unwrap_or(HostOperationResult::Error(guest_abi::code::GENERIC_ERROR))
}
/// Copies `len` bytes starting at `ptr` into an owned `Vec<u8>`.
///
/// A null pointer or a zero length yields an empty vector without reading memory.
///
/// # Safety
///
/// When `ptr` is non-null and `len` is non-zero, `ptr` must be valid for reads of `len`
/// bytes for the duration of the call.
unsafe fn read_raw_bytes(ptr: *const u8, len: usize) -> Vec<u8> {
    match (ptr.is_null(), len) {
        (true, _) | (_, 0) => Vec::new(),
        // SAFETY: non-null and non-empty; the caller vouches for readability of the range.
        _ => unsafe { std::slice::from_raw_parts(ptr, len) }.to_vec(),
    }
}
use crate::workload::decode_host_operation;
unsafe extern "C" fn vtable_invoke(
frame_ptr: *const u8,
frame_len: usize,
resp_ptr_out: *mut *mut u8,
resp_len_out: *mut usize,
) -> i32 {
if resp_ptr_out.is_null() || resp_len_out.is_null() {
return guest_abi::code::PROTOCOL_ERROR;
}
let frame_bytes = unsafe { read_raw_bytes(frame_ptr, frame_len) };
let frame = match guest_abi::decode_message::<guest_abi::AbiFrame>(&frame_bytes) {
Ok(frame) => frame,
Err(code) => return code,
};
let pending = match decode_host_operation(frame) {
Ok(pending) => pending,
Err(code) => return code,
};
let reply = match trampoline_execute(pending) {
HostOperationResult::Bytes(bytes) => AbiReply {
abi_version: guest_abi::version::V1,
status: guest_abi::code::SUCCESS,
payload: bytes,
},
HostOperationResult::Done => AbiReply {
abi_version: guest_abi::version::V1,
status: guest_abi::code::SUCCESS,
payload: Vec::new(),
},
HostOperationResult::Error(code) => AbiReply {
abi_version: guest_abi::version::V1,
status: code,
payload: Vec::new(),
},
};
let reply_bytes = match guest_abi::encode_message(&reply) {
Ok(reply_bytes) => reply_bytes,
Err(code) => return code,
};
unsafe { host_alloc_and_write(&reply_bytes, resp_ptr_out, resp_len_out) };
guest_abi::code::SUCCESS
}
/// `free_host_buf` entry of the host vtable: releases a buffer through the global
/// allocator using a layout of `len` bytes with alignment 1.
///
/// A null pointer or zero length is a no-op.
///
/// # Safety
///
/// A non-null `ptr` must have been allocated by the global allocator with exactly the
/// layout `(len, align = 1)` and must not be used afterwards.
///
/// # Panics
///
/// Panics with "invalid layout in free" if `len` cannot form a valid layout.
unsafe extern "C" fn vtable_free_host_buf(ptr: *mut u8, len: usize) {
    if !ptr.is_null() && len != 0 {
        let layout =
            std::alloc::Layout::from_size_align(len, 1).expect("invalid layout in free");
        // SAFETY: the caller guarantees `ptr` was allocated with this exact layout.
        unsafe { std::alloc::dealloc(ptr, layout) };
    }
}
/// The single host vtable instance; a pointer to it is passed to the guest's `actr_init`.
///
/// `invoke` services guest -> host calls and `free_host_buf` releases the reply buffers
/// that `invoke` allocates.
static HOST_VTABLE: HostVTable = HostVTable {
invoke: vtable_invoke,
free_host_buf: vtable_free_host_buf,
};
/// A loaded guest dynamic library together with its resolved entry points.
///
/// The function pointers are copied out of the library's symbol table at load time, so
/// they are only meaningful while `_library` is kept loaded.
pub struct DynclibHost {
// Keeps the shared library loaded; never read directly.
_library: Library,
// Guest `actr_init` export.
init_fn: InitFn,
// Guest `actr_handle` export.
handle_fn: HandleFn,
// Guest `actr_free_response` export.
free_response_fn: FreeResponseFn,
}
// Opaque `Debug`: prints only the type name, none of the fields.
impl std::fmt::Debug for DynclibHost {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("DynclibHost").finish_non_exhaustive()
}
}
// NOTE(review): these impls assert that the library handle and the guest's exported
// functions may be used from, and shared between, arbitrary threads. Nothing in this file
// enforces that; it is a requirement on the guest library — confirm.
unsafe impl Send for DynclibHost {}
unsafe impl Sync for DynclibHost {}
impl DynclibHost {
pub fn load(path: impl AsRef<Path>) -> DynclibResult<Self> {
let path = path.as_ref();
tracing::info!(path = %path.display(), "loading dynclib actor");
let library = unsafe {
Library::new(path)
.map_err(|e| DynclibError::LoadFailed(format!("{}: {e}", path.display())))?
};
let init_fn: InitFn = unsafe {
let sym =
library
.get::<InitFn>(b"actr_init\0")
.map_err(|e| DynclibError::MissingSymbol {
symbol: "actr_init".into(),
detail: e.to_string(),
})?;
*sym
};
let handle_fn: HandleFn = unsafe {
let sym = library.get::<HandleFn>(b"actr_handle\0").map_err(|e| {
DynclibError::MissingSymbol {
symbol: "actr_handle".into(),
detail: e.to_string(),
}
})?;
*sym
};
let free_response_fn: FreeResponseFn = unsafe {
let sym = library
.get::<FreeResponseFn>(b"actr_free_response\0")
.map_err(|e| DynclibError::MissingSymbol {
symbol: "actr_free_response".into(),
detail: e.to_string(),
})?;
*sym
};
tracing::info!(path = %path.display(), "dynclib symbols resolved successfully");
Ok(Self {
_library: library,
init_fn,
handle_fn,
free_response_fn,
})
}
pub(crate) fn instantiate(
&self,
init_payload: &InitPayloadV1,
) -> DynclibResult<DynclibInstance> {
let init_bytes = guest_abi::encode_message(init_payload).map_err(|code| {
DynclibError::DispatchFailed(format!("init payload encode failed: {code}"))
})?;
let init_ptr = if init_bytes.is_empty() {
ptr::null()
} else {
init_bytes.as_ptr()
};
let result = unsafe { (self.init_fn)(&HOST_VTABLE, init_ptr, init_bytes.len()) };
if result != 0 {
tracing::error!(code = result, "actr_init failed");
return Err(DynclibError::InitFailed(result));
}
tracing::info!("dynclib actor initialised successfully");
Ok(DynclibInstance {
handle_fn: self.handle_fn,
free_response_fn: self.free_response_fn,
})
}
}
/// An initialised guest, reduced to the two entry points needed to serve requests.
///
/// The struct holds bare function pointers and does not itself keep the library loaded.
pub(crate) struct DynclibInstance {
// Guest `actr_handle` export.
handle_fn: HandleFn,
// Guest `actr_free_response` export, used to return response buffers to the guest.
free_response_fn: FreeResponseFn,
}
// Opaque `Debug`: prints only the type name, none of the fields.
impl std::fmt::Debug for DynclibInstance {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("DynclibInstance").finish_non_exhaustive()
}
}
// NOTE(review): asserts that the guest entry points may be called from a thread other than
// the one that created the instance — a requirement on the guest library; confirm.
unsafe impl Send for DynclibInstance {}
/// A guest instance bundled with the host object that owns the loaded library.
///
/// Fields are dropped in declaration order, so `instance` is dropped before `_host`
/// (which holds the library handle).
#[derive(Debug)]
pub(crate) struct DynClibWorkload {
    /// Entry points used to dispatch requests to the guest.
    instance: DynclibInstance,
    /// Never read; held only so the host (and its library) lives as long as `instance`.
    _host: DynclibHost,
}

impl DynClibWorkload {
    /// Pairs an initialised `instance` with the `host` it was created from.
    pub(crate) fn new(host: DynclibHost, instance: DynclibInstance) -> Self {
        let _host = host;
        Self { instance, _host }
    }
}
impl DynclibInstance {
    /// Sends one already-encoded request frame to the guest and returns the payload of
    /// its reply.
    ///
    /// The guest's `actr_handle` is invoked on a blocking-pool thread. For the duration
    /// of that call the thread's dispatch context (executor pointer + runtime handle) is
    /// installed so guest -> host calls can be serviced; a drop guard removes it again on
    /// every exit path, including unwinding, so the raw executor pointer is never left
    /// behind on a pooled thread.
    ///
    /// The guest's response buffer is copied and immediately handed back through
    /// `actr_free_response`, then decoded as an `AbiReply`.
    ///
    /// # Errors
    ///
    /// Returns `DynclibError::DispatchFailed` when the blocking task fails, when
    /// `actr_handle` returns a non-zero code, when the reply cannot be decoded, or when
    /// the reply carries a non-success status. In the last case the message is the
    /// reply payload if it is non-empty UTF-8, otherwise `"guest returned status N"`.
    ///
    /// NOTE(review): the blocking closure holds a raw pointer derived from the borrowed
    /// `call_executor`. If this future is dropped while the blocking task is still
    /// running, the task keeps going and the pointer may dangle — callers should await
    /// this to completion, or the executor should be passed in an owned/shared form.
    async fn handle_encoded_request(
        &mut self,
        request_owned: Vec<u8>,
        call_executor: &HostAbiFn,
    ) -> DynclibResult<Vec<u8>> {
        /// Clears the dispatch thread-locals when dropped.
        struct ThreadLocalGuard;

        impl Drop for ThreadLocalGuard {
            fn drop(&mut self) {
                clear_thread_locals();
            }
        }

        let handle_fn = self.handle_fn;
        let free_response_fn = self.free_response_fn;
        let rt_handle = tokio::runtime::Handle::current();
        let executor_ptr = SendPtr(call_executor as *const HostAbiFn);

        let result = tokio::task::spawn_blocking(move || {
            // The inner scope bounds the lifetime of the guard: the thread-locals are
            // cleared as soon as the guest call and response copy are finished.
            let (code, response) = {
                install_thread_locals(executor_ptr.as_ptr(), rt_handle);
                let _guard = ThreadLocalGuard;

                let mut resp_ptr: *mut u8 = ptr::null_mut();
                let mut resp_len: usize = 0;
                // SAFETY: the request buffer is owned by this closure and outlives the
                // call; the out-pointers refer to live locals.
                let code = unsafe {
                    (handle_fn)(
                        request_owned.as_ptr(),
                        request_owned.len(),
                        &mut resp_ptr,
                        &mut resp_len,
                    )
                };

                let response = if !resp_ptr.is_null() && resp_len > 0 {
                    // SAFETY: the guest reported `resp_len` readable bytes at `resp_ptr`.
                    let data =
                        unsafe { std::slice::from_raw_parts(resp_ptr, resp_len).to_vec() };
                    // SAFETY: the buffer came from the guest and is returned to it
                    // exactly once, after the copy above.
                    unsafe { (free_response_fn)(resp_ptr, resp_len) };
                    data
                } else {
                    Vec::new()
                };
                (code, response)
            };

            if code != 0 {
                tracing::warn!(code, "actr_handle returned error");
                return Err(DynclibError::DispatchFailed(format!(
                    "actr_handle returned error code {code}"
                )));
            }
            tracing::debug!(
                req_bytes = request_owned.len(),
                resp_bytes = response.len(),
                "actr_handle completed"
            );
            Ok(response)
        })
        .await
        .map_err(|e| DynclibError::DispatchFailed(format!("spawn_blocking panicked: {e}")))??;

        let reply = guest_abi::decode_message::<AbiReply>(&result).map_err(|code| {
            DynclibError::DispatchFailed(format!(
                "guest returned malformed AbiReply with code {code}"
            ))
        })?;

        if reply.status != guest_abi::code::SUCCESS {
            let status = reply.status;
            // An empty payload is valid UTF-8 but carries no information; fall back to
            // the status code so the error is never an empty string.
            let message = match String::from_utf8(reply.payload) {
                Ok(text) if !text.is_empty() => text,
                _ => format!("guest returned status {status}"),
            };
            return Err(DynclibError::DispatchFailed(message));
        }
        Ok(reply.payload)
    }

    /// Dispatches a request (`request_bytes` plus invocation context `ctx`) to the guest
    /// and returns the reply payload.
    ///
    /// # Errors
    ///
    /// `DynclibError::DispatchFailed` if the frame cannot be encoded or dispatch fails.
    pub(crate) async fn handle(
        &mut self,
        request_bytes: &[u8],
        ctx: InvocationContext,
        call_executor: &HostAbiFn,
    ) -> DynclibResult<Vec<u8>> {
        let request_owned = encode_guest_handle_request(request_bytes, ctx).map_err(|code| {
            DynclibError::DispatchFailed(format!("guest handle frame serialization failed: {code}"))
        })?;
        self.handle_encoded_request(request_owned, call_executor)
            .await
    }

    /// Delivers a data-stream `chunk` sent by `sender` to the guest; the reply payload
    /// is discarded.
    ///
    /// # Errors
    ///
    /// `DynclibError::DispatchFailed` if the frame cannot be encoded or dispatch fails.
    pub(crate) async fn handle_data_stream(
        &mut self,
        chunk: DataStream,
        sender: ActrId,
        call_executor: &HostAbiFn,
    ) -> DynclibResult<()> {
        let request_owned = encode_guest_data_stream_request(chunk, sender).map_err(|code| {
            DynclibError::DispatchFailed(format!(
                "guest data stream frame serialization failed: {code}"
            ))
        })?;
        self.handle_encoded_request(request_owned, call_executor)
            .await
            .map(|_| ())
    }

    /// Invokes the guest lifecycle hook identified by `hook`; the reply payload is
    /// discarded.
    ///
    /// # Errors
    ///
    /// `DynclibError::DispatchFailed` if the frame cannot be encoded or dispatch fails.
    pub(crate) async fn handle_lifecycle(
        &mut self,
        hook: u32,
        ctx: InvocationContext,
        call_executor: &HostAbiFn,
    ) -> DynclibResult<()> {
        let request_owned = encode_guest_lifecycle_request(hook, ctx).map_err(|code| {
            DynclibError::DispatchFailed(format!(
                "guest lifecycle frame serialization failed: {code}"
            ))
        })?;
        self.handle_encoded_request(request_owned, call_executor)
            .await
            .map(|_| ())
    }

    /// Delivers a package hook `event` to the guest; the reply payload is discarded.
    ///
    /// # Errors
    ///
    /// `DynclibError::DispatchFailed` if the frame cannot be encoded or dispatch fails.
    pub(crate) async fn handle_hook_event(
        &mut self,
        event: PackageHookEvent,
        ctx: InvocationContext,
        call_executor: &HostAbiFn,
    ) -> DynclibResult<()> {
        let request_owned = encode_guest_hook_request(event, ctx).map_err(|code| {
            DynclibError::DispatchFailed(format!("guest hook frame serialization failed: {code}"))
        })?;
        self.handle_encoded_request(request_owned, call_executor)
            .await
            .map(|_| ())
    }
}
impl DynClibWorkload {
    /// Forwards a request to the wrapped guest instance and returns its reply payload.
    pub(crate) async fn handle(
        &mut self,
        request_bytes: &[u8],
        ctx: InvocationContext,
        call_executor: &HostAbiFn,
    ) -> DynclibResult<Vec<u8>> {
        let guest = &mut self.instance;
        guest.handle(request_bytes, ctx, call_executor).await
    }

    /// Forwards a data-stream chunk from `sender` to the wrapped guest instance.
    pub(crate) async fn handle_data_stream(
        &mut self,
        chunk: DataStream,
        sender: ActrId,
        call_executor: &HostAbiFn,
    ) -> DynclibResult<()> {
        let guest = &mut self.instance;
        guest.handle_data_stream(chunk, sender, call_executor).await
    }

    /// Runs the guest's `ON_START` lifecycle hook.
    pub(crate) async fn call_on_start(
        &mut self,
        ctx: InvocationContext,
        call_executor: &HostAbiFn,
    ) -> DynclibResult<()> {
        let hook = guest_abi::lifecycle_hook::ON_START;
        self.dispatch_lifecycle(hook, ctx, call_executor).await
    }

    /// Runs the guest's `ON_READY` lifecycle hook.
    pub(crate) async fn call_on_ready(
        &mut self,
        ctx: InvocationContext,
        call_executor: &HostAbiFn,
    ) -> DynclibResult<()> {
        let hook = guest_abi::lifecycle_hook::ON_READY;
        self.dispatch_lifecycle(hook, ctx, call_executor).await
    }

    /// Runs the guest's `ON_STOP` lifecycle hook.
    pub(crate) async fn call_on_stop(
        &mut self,
        ctx: InvocationContext,
        call_executor: &HostAbiFn,
    ) -> DynclibResult<()> {
        let hook = guest_abi::lifecycle_hook::ON_STOP;
        self.dispatch_lifecycle(hook, ctx, call_executor).await
    }

    /// Forwards a package hook event to the wrapped guest instance.
    pub(crate) async fn call_hook_event(
        &mut self,
        event: PackageHookEvent,
        ctx: InvocationContext,
        call_executor: &HostAbiFn,
    ) -> DynclibResult<()> {
        let guest = &mut self.instance;
        guest.handle_hook_event(event, ctx, call_executor).await
    }

    /// Shared path for the three lifecycle wrappers: forwards `hook` to the instance.
    async fn dispatch_lifecycle(
        &mut self,
        hook: u32,
        ctx: InvocationContext,
        call_executor: &HostAbiFn,
    ) -> DynclibResult<()> {
        self.instance
            .handle_lifecycle(hook, ctx, call_executor)
            .await
    }
}