message_plugins/
lib.rs

1use std::{ffi::OsStr, sync::Arc, any::Any};
2
3#[cfg(not(feature = "tokio-host"))]
4pub use std_runtime::*;
5#[cfg(feature = "tokio-host")]
6pub use tokio_runtime::*;
7
#[cfg(feature = "tokio-host")]
pub mod tokio_runtime {
    use super::*;
    use tokio::sync::mpsc;

    /// Meant as the main way of sending commands to plugins.
    ///
    /// Holds the sending end of the queue feeding each attached plugin, as well as
    /// the `JoinHandle`s of the tokio tasks that run those plugins.
    pub struct Host<T> {
        /// One sender per plugin whose channel is still believed to be open.
        plugins: Vec<mpsc::Sender<Message<T>>>,
        /// Handles of the spawned plugin tasks; each resolves to the status the
        /// plugin returned, or `None` if its channel was closed first.
        pub tasks: Vec<tokio::task::JoinHandle<Option<u8>>>,
    }

    impl<T: Sync + Send + 'static> Default for Host<T> {
        /// Equivalent to [`Host::new`].
        fn default() -> Self {
            Self::new()
        }
    }

    impl<T: Sync + Send + 'static> Host<T> {
        /// Creates a host with no plugin attached.
        pub fn new() -> Self {
            Host {
                plugins: Vec::new(),
                tasks: Vec::new(),
            }
        }

        /// By default, plugins will communicate with the host using a queue capable of holding this many `Message`s.
        pub const DEFAULT_CHANNEL_CAPACITY: usize = 4;

        /// Sends a message to all the attached `Plugin`s.
        ///
        /// The message is converted once and then cloned per plugin. A send only
        /// fails when the receiving task has ended; such senders are forgotten so
        /// later calls stop targeting them. The task's `JoinHandle` stays in
        /// `tasks`, so its status can still be collected.
        pub async fn send(&mut self, message: impl Into<Message<T>>) {
            let message = message.into();
            let outcomes = futures::future::join_all(
                self.plugins
                    .iter_mut()
                    .map(|plugin| plugin.send(message.clone())),
            )
            .await;
            // `join_all` yields results in the order of `self.plugins`, and `retain`
            // visits elements in that same order, so the two can be zipped by position.
            let mut outcomes = outcomes.into_iter();
            self.plugins
                .retain(|_| outcomes.next().map_or(true, |sent| sent.is_ok()));
        }

        /// Enables a `Plugin` by attaching it to the `Host` with a queue of
        /// `DEFAULT_CHANNEL_CAPACITY` messages. See `attach_with_capacity`.
        pub async fn attach(&mut self, plugin: impl Plugin<T>) {
            self.attach_with_capacity(plugin, Self::DEFAULT_CHANNEL_CAPACITY)
                .await
        }

        /// Like `attach`, but allows you to choose your own channel capacity.
        ///
        /// A channel is built and its sending end is kept by the host. A task is
        /// spawned that feeds every received `Message` to the plugin's
        /// `handle_message`; it finishes with `Some(status)` as soon as the plugin
        /// returns one, or with `None` once the channel is closed.
        pub async fn attach_with_capacity(&mut self, mut plugin: impl Plugin<T>, capacity: usize) {
            let (tx, mut rx) = mpsc::channel(capacity);
            self.plugins.push(tx);
            self.tasks.push(tokio::spawn(async move {
                while let Some(message) = rx.recv().await {
                    if let Some(status) = plugin.handle_message(message) {
                        return Some(status);
                    }
                }
                None
            }))
        }

        /// Drops every channel end, closing them, then returns a future that waits for all plugins to finish processing the remaining messages.
        pub fn end(&mut self) -> futures::future::JoinAll<tokio::task::JoinHandle<Option<u8>>> {
            self.plugins.clear();
            futures::future::join_all(self.tasks.drain(..))
        }
    }
}
70
71#[cfg(not(feature = "tokio-host"))]
72pub mod std_runtime {
73    use super::*;
74    use std::sync::mpsc;
75    /// Meant as the main way of sending commands to plugins.
76    /// A structure that holds the input ends of the queues to each plugin,
77    /// as well as the `JoinHandle`s to their tasks.
78    pub struct Host<T> {
79        plugins: Vec<mpsc::SyncSender<Message<T>>>,
80        pub tasks: Vec<std::thread::JoinHandle<Option<u8>>>,
81    }
82
83    impl<T> Drop for Host<T> {
84        #[allow(unused_must_use)]
85        fn drop(&mut self) {
86            self.plugins.clear();
87            for task in self.tasks.drain(..) {
88                task.join();
89            }
90        }
91    }
92
93    impl<T: Sync + Send + 'static> Host<T> {
94        pub fn new() -> Self {
95            Host {
96                plugins: Vec::new(),
97                tasks: Vec::new(),
98            }
99        }
100
101        /// By default, plugins will communicate with the host using a queue capable of holding this many `Message`s.
102        pub const DEFAULT_CHANNEL_CAPACITY: usize = 4;
103
104        /// Sends a message to all the attached `Plugin`s.
105        #[allow(unused_must_use)]
106        pub fn send(&mut self, message: impl Into<Message<T>>) {
107            let message = message.into();
108            for plugin in self.plugins.iter() {
109                plugin.send(message.clone());
110            }
111        }
112
113        /// Enables a `Plugin` by attaching it to the `Host`: a channel is built, the input is given to the host;
114        /// a task running the `Plugin`'s `handle_message` method on every `Message` sent over the channel is spawned, and the `JoinHandle` to this task is added to the host's handles.
115        pub fn attach(&mut self, plugin: impl Plugin<T>) {
116            self.attach_with_capacity(plugin, Self::DEFAULT_CHANNEL_CAPACITY)
117        }
118
119        /// Like `attach`, but allows you to choose your own channel capacity.
120        pub fn attach_with_capacity(&mut self, mut plugin: impl Plugin<T>, capacity: usize) {
121            let (tx, rx) = mpsc::sync_channel(capacity);
122            self.plugins.push(tx);
123            self.tasks.push(std::thread::spawn(move || {
124                while let Ok(message) = rx.recv() {
125                    if let Some(status) = plugin.handle_message(message) {
126                        return Some(status);
127                    }
128                }
129                None
130            }))
131        }
132
133        /// Drops every channel end, closing them, then waits for all plugins to finish processing the remaining messages.
134        pub fn end(&mut self) -> Vec<std::thread::Result<Option<u8>>> {
135            self.plugins.clear();
136            self.tasks.drain(..).map(|t| t.join()).collect()
137        }
138    }
139}
140
/// A single message to be delivered to every plugin.
///
/// The payload is kept behind an `Arc`, so cloning a `Message` only bumps a
/// reference count and never requires `T: Clone`.
pub struct Message<T> {
    /// Shared handle on the payload.
    pub content: Arc<T>,
}

impl<T> Message<T> {
    /// Wraps `value` in a fresh `Arc` and builds a message around it.
    pub fn new(value: T) -> Self {
        Self {
            content: Arc::new(value),
        }
    }
}

impl<T> Clone for Message<T> {
    /// Produces another handle on the same payload.
    fn clone(&self) -> Self {
        Self {
            content: Arc::clone(&self.content),
        }
    }
}

impl<T> AsRef<T> for Message<T> {
    /// Borrows the payload.
    fn as_ref(&self) -> &T {
        &*self.content
    }
}

impl<T> From<T> for Message<T> {
    /// Takes ownership of `content` and wraps it in a new `Arc`.
    fn from(content: T) -> Self {
        Self::new(content)
    }
}

impl<T> From<Arc<T>> for Message<T> {
    /// Reuses an existing `Arc` without re-allocating.
    fn from(content: Arc<T>) -> Self {
        Self { content }
    }
}
181
182/// In this architectures, plugins are purely slaves: they simply react to messages.
183/// Their only way of returning information by default is by returning Some(status) to signal that they wish to be dropped.
184/// If you want your plugin to be able to communicate back to your application after some of your messages, you should hand them a channel to do so through your message type.
185pub trait Plugin<T>: Sync + Send + 'static {
186    fn handle_message(&mut self, message: Message<T>) -> Option<u8>;
187}
188
189#[derive(Debug)]
190pub enum PluginConstructionError {
191    Loading(libloading::Error),
192    Construction,
193}
194
195impl From<libloading::Error> for PluginConstructionError {
196    fn from(e: libloading::Error) -> Self {
197        PluginConstructionError::Loading(e)
198    }
199}
200
201/// Loads a dynamic library at `path`, and calls the function called `constructor` in order to instanciate a `Plugin`.
202/// The constructor function is the only function where you need to dirty your hands with `extern "C"`. Its sole purpose is to insert your boxed plugin into a pointer.
203/// I suggest writing a constructor of the style:
204/// ```rust
205/// #[no_mangle]
206/// unsafe extern "C" fn plugin_constructor(ptr: *mut Box<dyn Plugin<YourMessageType>>) {
207///     let plugin = Box::new(YourPlugin::new());
208///     insert_instace(ptr, plugin);
209/// }
210/// ```
211pub fn construct_plugin_with_constructor<T>(
212    path: impl AsRef<OsStr>,
213    constructor: impl AsRef<[u8]>,
214    args: Option<&dyn Any>
215) -> Result<Box<dyn Plugin<T>>, PluginConstructionError> {
216    let lib = libloading::Library::new(path)?;
217    let mut instance = std::mem::MaybeUninit::zeroed();
218    Ok(unsafe {
219        lib.get::<FfiPluginInit<T>>(constructor.as_ref())?(instance.as_mut_ptr(), args);
220        if ((*instance.as_ptr()).as_ref() as *const dyn Plugin<T>).is_null() {
221            return Err(PluginConstructionError::Construction);
222        }
223        instance.assume_init()
224    })
225}
226
227/// A default for `construct_plugin_with_constructor`, which will call a function named `plugin_constructor`.
228/// The constructor function is the only function where you need to dirty your hands with `extern "C"`. Its sole purpose is to insert your boxed plugin into a pointer.
229/// I suggest writing a constructor of the style:
230/// ```rust
231/// #[no_mangle]
232/// unsafe extern "C" fn plugin_constructor(ptr: *mut Box<dyn Plugin<YourMessageType>>) {
233///     let plugin = Box::new(YourPlugin::new());
234///     insert_instace(ptr, plugin);
235/// }
236/// ```
237pub fn construct_plugin<T>(
238    path: impl AsRef<OsStr>,
239    args: Option<&dyn Any>
240) -> Result<Box<dyn Plugin<T>>, PluginConstructionError> {
241    construct_plugin_with_constructor(path, b"plugin_constructor", args)
242}
243
244/// Inserts a plugin into an uninitialized pointer, preventing the drop on the uninitialized memory that would happen with a simple assignment
245pub fn insert_instance<T>(ptr: *mut Box<dyn Plugin<T>>, mut plugin: Box<dyn Plugin<T>>) {
246    unsafe { std::mem::swap(&mut plugin, &mut *ptr) };
247    std::mem::forget(plugin);
248}
249
250impl<T: 'static, B: AsMut<dyn Plugin<T>> + Sync + Send + 'static> Plugin<T> for B {
251    fn handle_message(&mut self, message: Message<T>) -> Option<u8> {
252        self.as_mut().handle_message(message)
253    }
254}
255
256pub type FfiPluginInit<T> = unsafe extern "C" fn(*mut Box<dyn Plugin<T>>, Option<&dyn Any>);