use std::{ffi::OsStr, sync::Arc, any::Any};
#[cfg(not(feature = "tokio-host"))]
pub use std_runtime::*;
#[cfg(feature = "tokio-host")]
pub use tokio_runtime::*;
/// Host runtime built on Tokio tasks and `tokio::sync::mpsc` channels.
///
/// Compiled only when the `tokio-host` feature is enabled.
#[cfg(feature = "tokio-host")]
pub mod tokio_runtime {
    use super::*;
    use tokio::sync::mpsc;

    /// Broadcasts messages to plugins, each of which runs in its own spawned task.
    pub struct Host<T> {
        /// Sending half of the channel of every attached plugin that is still reachable.
        plugins: Vec<mpsc::Sender<Message<T>>>,
        /// Join handles of the plugin tasks. Each task resolves to the status its plugin
        /// returned, or to `None` when its channel closed first.
        pub tasks: Vec<tokio::task::JoinHandle<Option<u8>>>,
    }

    impl<T: Sync + Send + 'static> Default for Host<T> {
        /// Equivalent to [`Host::new`].
        fn default() -> Self {
            Self::new()
        }
    }

    impl<T: Sync + Send + 'static> Host<T> {
        /// Channel capacity used by [`Host::attach`].
        pub const DEFAULT_CHANNEL_CAPACITY: usize = 4;

        /// Creates a host with no plugins attached.
        pub fn new() -> Self {
            Host {
                plugins: Vec::new(),
                tasks: Vec::new(),
            }
        }

        /// Sends a clone of `message` to every attached plugin concurrently.
        ///
        /// Resolves once each channel has either accepted the message or reported an error.
        /// A failed send means the receiving task is gone, so that sender is dropped instead
        /// of being retried on every later call.
        pub async fn send(&mut self, message: impl Into<Message<T>>) {
            let message = message.into();
            let outcomes = futures::future::join_all(
                self.plugins
                    .iter_mut()
                    .map(|plugin| plugin.send(message.clone())),
            )
            .await;
            // `join_all` keeps input order and `retain` visits elements in order, so the
            // n-th outcome belongs to the n-th sender.
            let mut outcomes = outcomes.into_iter();
            self.plugins
                .retain(|_| outcomes.next().map_or(true, |outcome| outcome.is_ok()));
        }

        /// Attaches `plugin` with a channel of [`Self::DEFAULT_CHANNEL_CAPACITY`] messages.
        pub async fn attach(&mut self, plugin: impl Plugin<T>) {
            self.attach_with_capacity(plugin, Self::DEFAULT_CHANNEL_CAPACITY)
                .await
        }

        /// Attaches `plugin` behind a channel holding up to `capacity` messages and spawns
        /// the task that feeds it.
        ///
        /// The task ends with `Some(status)` as soon as the plugin returns a status, or with
        /// `None` once the channel is closed.
        pub async fn attach_with_capacity(&mut self, mut plugin: impl Plugin<T>, capacity: usize) {
            let (tx, mut rx) = mpsc::channel(capacity);
            self.plugins.push(tx);
            self.tasks.push(tokio::spawn(async move {
                while let Some(message) = rx.recv().await {
                    if let Some(status) = plugin.handle_message(message) {
                        return Some(status);
                    }
                }
                None
            }))
        }

        /// Drops every sender, which closes the plugin channels, and returns a future that
        /// joins all plugin tasks.
        pub fn end(&mut self) -> futures::future::JoinAll<tokio::task::JoinHandle<Option<u8>>> {
            self.plugins.clear();
            futures::future::join_all(self.tasks.drain(..))
        }
    }
}
/// Host runtime built on OS threads and `std::sync::mpsc` bounded channels.
///
/// Compiled when the `tokio-host` feature is disabled.
#[cfg(not(feature = "tokio-host"))]
pub mod std_runtime {
    use super::*;
    use std::sync::mpsc;

    /// Broadcasts messages to plugins, each of which runs on its own thread.
    pub struct Host<T> {
        /// Sending half of the channel of every attached plugin that is still reachable.
        plugins: Vec<mpsc::SyncSender<Message<T>>>,
        /// Join handles of the plugin threads. Each thread yields the status its plugin
        /// returned, or `None` when its channel closed first.
        pub tasks: Vec<std::thread::JoinHandle<Option<u8>>>,
    }

    impl<T> Drop for Host<T> {
        /// Closes all channels and waits for every plugin thread to finish.
        fn drop(&mut self) {
            self.plugins.clear();
            for task in self.tasks.drain(..) {
                // A plugin that panicked cannot be reported from `drop`; the join result is
                // discarded on purpose.
                let _ = task.join();
            }
        }
    }

    impl<T: Sync + Send + 'static> Default for Host<T> {
        /// Equivalent to [`Host::new`].
        fn default() -> Self {
            Self::new()
        }
    }

    impl<T: Sync + Send + 'static> Host<T> {
        /// Channel capacity used by [`Host::attach`].
        pub const DEFAULT_CHANNEL_CAPACITY: usize = 4;

        /// Creates a host with no plugins attached.
        pub fn new() -> Self {
            Host {
                plugins: Vec::new(),
                tasks: Vec::new(),
            }
        }

        /// Sends a clone of `message` to every attached plugin, in attachment order.
        ///
        /// Blocks while a plugin's channel is full. A send only fails when the receiving
        /// thread is gone, so such senders are dropped instead of being retried on every
        /// later call.
        pub fn send(&mut self, message: impl Into<Message<T>>) {
            let message = message.into();
            self.plugins
                .retain(|plugin| plugin.send(message.clone()).is_ok());
        }

        /// Attaches `plugin` with a channel of [`Self::DEFAULT_CHANNEL_CAPACITY`] messages.
        pub fn attach(&mut self, plugin: impl Plugin<T>) {
            self.attach_with_capacity(plugin, Self::DEFAULT_CHANNEL_CAPACITY)
        }

        /// Attaches `plugin` behind a channel holding up to `capacity` messages and spawns
        /// the thread that feeds it.
        ///
        /// The thread ends with `Some(status)` as soon as the plugin returns a status, or
        /// with `None` once the channel is closed.
        pub fn attach_with_capacity(&mut self, mut plugin: impl Plugin<T>, capacity: usize) {
            let (tx, rx) = mpsc::sync_channel(capacity);
            self.plugins.push(tx);
            self.tasks.push(std::thread::spawn(move || {
                while let Ok(message) = rx.recv() {
                    if let Some(status) = plugin.handle_message(message) {
                        return Some(status);
                    }
                }
                None
            }))
        }

        /// Drops every sender, which closes the plugin channels, then joins all plugin
        /// threads and returns their results in attachment order.
        pub fn end(&mut self) -> Vec<std::thread::Result<Option<u8>>> {
            self.plugins.clear();
            self.tasks.drain(..).map(|task| task.join()).collect()
        }
    }
}
/// A reference-counted payload handed to plugins.
///
/// Cloning a message bumps the `Arc` reference count; the payload itself is never copied.
pub struct Message<T> {
    /// The shared payload.
    pub content: Arc<T>,
}

impl<T> Message<T> {
    /// Moves `value` into a fresh `Arc` and wraps it.
    pub fn new(value: T) -> Self {
        Self {
            content: Arc::new(value),
        }
    }
}

impl<T> Clone for Message<T> {
    /// Returns another handle to the same payload (no `T: Clone` bound needed).
    fn clone(&self) -> Self {
        Self {
            content: Arc::clone(&self.content),
        }
    }
}

impl<T> AsRef<T> for Message<T> {
    /// Borrows the payload.
    fn as_ref(&self) -> &T {
        &*self.content
    }
}

impl<T> From<T> for Message<T> {
    /// Same as [`Message::new`].
    fn from(content: T) -> Self {
        Self::new(content)
    }
}

impl<T> From<Arc<T>> for Message<T> {
    /// Wraps an existing `Arc` without allocating.
    fn from(content: Arc<T>) -> Self {
        Self { content }
    }
}
/// A message handler that a `Host` can drive.
///
/// The `Sync + Send + 'static` supertraits allow an implementation to be moved into the
/// worker (thread or task) that the host runtimes in this file spawn for each plugin.
pub trait Plugin<T>: Sync + Send + 'static {
/// Handles one message.
///
/// The host runtimes in this file stop feeding a plugin as soon as it returns
/// `Some(status)` and use `status` as the result of its worker; returning `None`
/// keeps the plugin running.
fn handle_message(&mut self, message: Message<T>) -> Option<u8>;
}
/// Reasons why building a plugin from a dynamic library can fail.
#[derive(Debug)]
pub enum PluginConstructionError {
    /// `libloading` failed to open the library or to resolve the constructor symbol.
    Loading(libloading::Error),
    /// The constructor was called but the instance slot was still null afterwards.
    Construction,
}

impl From<libloading::Error> for PluginConstructionError {
    /// Lets `?` convert `libloading` errors into [`PluginConstructionError::Loading`].
    fn from(e: libloading::Error) -> Self {
        Self::Loading(e)
    }
}

impl std::fmt::Display for PluginConstructionError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::Loading(e) => write!(f, "failed to load plugin library: {}", e),
            Self::Construction => f.write_str("plugin constructor did not produce an instance"),
        }
    }
}

impl std::error::Error for PluginConstructionError {
    /// Exposes the underlying `libloading` error, if any, to error-chain walkers.
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        match self {
            Self::Loading(e) => Some(e),
            Self::Construction => None,
        }
    }
}
/// Loads the dynamic library at `path` and builds a plugin by calling the exported symbol
/// named `constructor`.
///
/// The symbol must have the [`FfiPluginInit`] signature. It receives a pointer to a zeroed
/// slot for a `Box<dyn Plugin<T>>` together with `args`, and is expected to store the new
/// plugin in that slot.
///
/// On success the library handle is intentionally leaked: the returned box's vtable and drop
/// glue live inside the library, so unloading it while the plugin exists would leave the
/// plugin pointing at unmapped code.
///
/// # Errors
///
/// * [`PluginConstructionError::Loading`] if the library cannot be opened or the symbol
///   cannot be resolved.
/// * [`PluginConstructionError::Construction`] if the constructor returns without having
///   written anything into the slot.
pub fn construct_plugin_with_constructor<T>(
    path: impl AsRef<OsStr>,
    constructor: impl AsRef<[u8]>,
    args: Option<&dyn Any>,
) -> Result<Box<dyn Plugin<T>>, PluginConstructionError> {
    let lib = libloading::Library::new(path)?;
    let mut instance = std::mem::MaybeUninit::<Box<dyn Plugin<T>>>::zeroed();
    // SAFETY: the caller vouches that `constructor` names a symbol with the `FfiPluginInit<T>`
    // signature. The slot pointer is valid for writes for the duration of the call, and the
    // slot is only assumed initialised after the byte check below proved it was written.
    let plugin = unsafe {
        let init = lib.get::<FfiPluginInit<T>>(constructor.as_ref())?;
        init(instance.as_mut_ptr(), args);
        // Inspect the raw bytes rather than the `Box`: a zeroed `Box` is not a valid value,
        // so it must not be referenced or read as one. A written box always has a non-null
        // data pointer, hence an all-zero slot means the constructor did not fill it.
        let slot = std::slice::from_raw_parts(
            instance.as_ptr() as *const u8,
            std::mem::size_of::<Box<dyn Plugin<T>>>(),
        );
        if slot.iter().all(|&byte| byte == 0) {
            return Err(PluginConstructionError::Construction);
        }
        instance.assume_init()
    };
    // Keep the library mapped for the rest of the process; dropping `lib` here would unload
    // the code the plugin's vtable points into.
    std::mem::forget(lib);
    Ok(plugin)
}
/// Loads a plugin from the library at `path` through the default constructor symbol,
/// `plugin_constructor`.
///
/// This is `construct_plugin_with_constructor` with the symbol name filled in; `args` is
/// passed through to the constructor unchanged and errors are returned as-is.
pub fn construct_plugin<T>(
    path: impl AsRef<OsStr>,
    args: Option<&dyn Any>,
) -> Result<Box<dyn Plugin<T>>, PluginConstructionError> {
    // Name of the symbol a plugin library exports by convention.
    const DEFAULT_CONSTRUCTOR: &[u8] = b"plugin_constructor";
    construct_plugin_with_constructor(path, DEFAULT_CONSTRUCTOR, args)
}
/// Stores `plugin` in the slot behind `ptr`.
///
/// Whatever the slot held before is overwritten without being read or dropped, so the slot
/// may be zeroed or uninitialised memory (as it is when a constructor receives it from
/// `construct_plugin_with_constructor`).
///
/// # Safety contract
///
/// The function is not marked `unsafe` so existing callers keep compiling, but it writes
/// through a raw pointer: `ptr` must be non-null, properly aligned and valid for writing one
/// `Box<dyn Plugin<T>>`.
pub fn insert_instance<T>(ptr: *mut Box<dyn Plugin<T>>, plugin: Box<dyn Plugin<T>>) {
    // SAFETY: see the contract above. `write` moves `plugin` into the slot without creating
    // a reference to, reading, or dropping the previous (possibly invalid) contents.
    unsafe { ptr.write(plugin) };
}
/// Any sendable, `'static` value that can hand out a `&mut dyn Plugin<T>` (for example a
/// `Box<dyn Plugin<T>>`) is itself a plugin that forwards every message to that inner plugin.
impl<T, B> Plugin<T> for B
where
    T: 'static,
    B: AsMut<dyn Plugin<T>> + Sync + Send + 'static,
{
    fn handle_message(&mut self, message: Message<T>) -> Option<u8> {
        // Resolve the wrapped plugin first, then delegate to it.
        let target = AsMut::<dyn Plugin<T>>::as_mut(self);
        target.handle_message(message)
    }
}
/// Signature of the constructor symbol that `construct_plugin_with_constructor` looks up in
/// a plugin library.
///
/// The first argument points at zeroed storage for a `Box<dyn Plugin<T>>`; the loader reports
/// `PluginConstructionError::Construction` if that slot still holds a null pointer after the
/// call. The second argument is the optional value passed by the caller as `args`.
/// `insert_instance` takes a pointer of the same type and can be used to fill the slot.
pub type FfiPluginInit<T> = unsafe extern "C" fn(*mut Box<dyn Plugin<T>>, Option<&dyn Any>);