1use std::{ffi::OsStr, sync::Arc, any::Any};
2
3#[cfg(not(feature = "tokio-host"))]
4pub use std_runtime::*;
5#[cfg(feature = "tokio-host")]
6pub use tokio_runtime::*;
7
8#[cfg(feature = "tokio-host")]
9pub mod tokio_runtime {
10 use super::*;
11 use tokio::sync::mpsc;
12 pub struct Host<T> {
16 plugins: Vec<mpsc::Sender<Message<T>>>,
17 pub tasks: Vec<tokio::task::JoinHandle<Option<u8>>>,
18 }
19
20 impl<T: Sync + Send + 'static> Host<T> {
21 pub fn new() -> Self {
22 Host {
23 plugins: Vec::new(),
24 tasks: Vec::new(),
25 }
26 }
27
28 pub const DEFAULT_CHANNEL_CAPACITY: usize = 4;
30
31 pub async fn send(&mut self, message: impl Into<Message<T>>) {
33 let message = message.into();
34 futures::future::join_all(
35 self.plugins
36 .iter_mut()
37 .map(|plugin| plugin.send(message.clone())),
38 )
39 .await;
40 }
41
42 pub async fn attach(&mut self, plugin: impl Plugin<T>) {
45 self.attach_with_capacity(plugin, Self::DEFAULT_CHANNEL_CAPACITY)
46 .await
47 }
48
49 pub async fn attach_with_capacity(&mut self, mut plugin: impl Plugin<T>, capacity: usize) {
51 let (tx, mut rx) = mpsc::channel(capacity);
52 self.plugins.push(tx);
53 self.tasks.push(tokio::spawn(async move {
54 while let Some(message) = rx.recv().await {
55 if let Some(status) = plugin.handle_message(message) {
56 return Some(status);
57 }
58 }
59 None
60 }))
61 }
62
63 pub fn end(&mut self) -> futures::future::JoinAll<tokio::task::JoinHandle<Option<u8>>> {
65 self.plugins.clear();
66 futures::future::join_all(self.tasks.drain(..))
67 }
68 }
69}
70
71#[cfg(not(feature = "tokio-host"))]
72pub mod std_runtime {
73 use super::*;
74 use std::sync::mpsc;
75 pub struct Host<T> {
79 plugins: Vec<mpsc::SyncSender<Message<T>>>,
80 pub tasks: Vec<std::thread::JoinHandle<Option<u8>>>,
81 }
82
83 impl<T> Drop for Host<T> {
84 #[allow(unused_must_use)]
85 fn drop(&mut self) {
86 self.plugins.clear();
87 for task in self.tasks.drain(..) {
88 task.join();
89 }
90 }
91 }
92
93 impl<T: Sync + Send + 'static> Host<T> {
94 pub fn new() -> Self {
95 Host {
96 plugins: Vec::new(),
97 tasks: Vec::new(),
98 }
99 }
100
101 pub const DEFAULT_CHANNEL_CAPACITY: usize = 4;
103
104 #[allow(unused_must_use)]
106 pub fn send(&mut self, message: impl Into<Message<T>>) {
107 let message = message.into();
108 for plugin in self.plugins.iter() {
109 plugin.send(message.clone());
110 }
111 }
112
113 pub fn attach(&mut self, plugin: impl Plugin<T>) {
116 self.attach_with_capacity(plugin, Self::DEFAULT_CHANNEL_CAPACITY)
117 }
118
119 pub fn attach_with_capacity(&mut self, mut plugin: impl Plugin<T>, capacity: usize) {
121 let (tx, rx) = mpsc::sync_channel(capacity);
122 self.plugins.push(tx);
123 self.tasks.push(std::thread::spawn(move || {
124 while let Ok(message) = rx.recv() {
125 if let Some(status) = plugin.handle_message(message) {
126 return Some(status);
127 }
128 }
129 None
130 }))
131 }
132
133 pub fn end(&mut self) -> Vec<std::thread::Result<Option<u8>>> {
135 self.plugins.clear();
136 self.tasks.drain(..).map(|t| t.join()).collect()
137 }
138 }
139}
140
141pub struct Message<T> {
143 pub content: Arc<T>,
144}
145
146impl<T> AsRef<T> for Message<T> {
147 fn as_ref(&self) -> &T {
148 self.content.as_ref()
149 }
150}
151
152impl<T> Clone for Message<T> {
153 fn clone(&self) -> Self {
154 Message {
155 content: self.content.clone(),
156 }
157 }
158}
159
160impl<T> Message<T> {
161 pub fn new(value: T) -> Self {
162 Message {
163 content: Arc::new(value),
164 }
165 }
166}
167
168impl<T> From<Arc<T>> for Message<T> {
169 fn from(content: Arc<T>) -> Self {
170 Message { content }
171 }
172}
173
174impl<T> From<T> for Message<T> {
175 fn from(content: T) -> Self {
176 Message {
177 content: Arc::new(content),
178 }
179 }
180}
181
182pub trait Plugin<T>: Sync + Send + 'static {
186 fn handle_message(&mut self, message: Message<T>) -> Option<u8>;
187}
188
189#[derive(Debug)]
190pub enum PluginConstructionError {
191 Loading(libloading::Error),
192 Construction,
193}
194
195impl From<libloading::Error> for PluginConstructionError {
196 fn from(e: libloading::Error) -> Self {
197 PluginConstructionError::Loading(e)
198 }
199}
200
201pub fn construct_plugin_with_constructor<T>(
212 path: impl AsRef<OsStr>,
213 constructor: impl AsRef<[u8]>,
214 args: Option<&dyn Any>
215) -> Result<Box<dyn Plugin<T>>, PluginConstructionError> {
216 let lib = libloading::Library::new(path)?;
217 let mut instance = std::mem::MaybeUninit::zeroed();
218 Ok(unsafe {
219 lib.get::<FfiPluginInit<T>>(constructor.as_ref())?(instance.as_mut_ptr(), args);
220 if ((*instance.as_ptr()).as_ref() as *const dyn Plugin<T>).is_null() {
221 return Err(PluginConstructionError::Construction);
222 }
223 instance.assume_init()
224 })
225}
226
227pub fn construct_plugin<T>(
238 path: impl AsRef<OsStr>,
239 args: Option<&dyn Any>
240) -> Result<Box<dyn Plugin<T>>, PluginConstructionError> {
241 construct_plugin_with_constructor(path, b"plugin_constructor", args)
242}
243
244pub fn insert_instance<T>(ptr: *mut Box<dyn Plugin<T>>, mut plugin: Box<dyn Plugin<T>>) {
246 unsafe { std::mem::swap(&mut plugin, &mut *ptr) };
247 std::mem::forget(plugin);
248}
249
250impl<T: 'static, B: AsMut<dyn Plugin<T>> + Sync + Send + 'static> Plugin<T> for B {
251 fn handle_message(&mut self, message: Message<T>) -> Option<u8> {
252 self.as_mut().handle_message(message)
253 }
254}
255
256pub type FfiPluginInit<T> = unsafe extern "C" fn(*mut Box<dyn Plugin<T>>, Option<&dyn Any>);