1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
use std::{ffi::OsStr, sync::Arc, any::Any};

#[cfg(not(feature = "tokio-host"))]
pub use std_runtime::*;
#[cfg(feature = "tokio-host")]
pub use tokio_runtime::*;

#[cfg(feature = "tokio-host")]
pub mod tokio_runtime {
    use super::*;
    use tokio::sync::mpsc;
    /// Meant as the main way of sending commands to plugins.
    /// A structure that holds the input ends of the queues to each plugin,
    /// as well as the `JoinHandle`s to their tasks.
    pub struct Host<T> {
        plugins: Vec<mpsc::Sender<Message<T>>>,
        pub tasks: Vec<tokio::task::JoinHandle<Option<u8>>>,
    }

    impl<T: Sync + Send + 'static> Host<T> {
        pub fn new() -> Self {
            Host {
                plugins: Vec::new(),
                tasks: Vec::new(),
            }
        }

        /// By default, plugins will communicate with the host using a queue capable of holding this many `Message`s.
        pub const DEFAULT_CHANNEL_CAPACITY: usize = 4;

        /// Sends a message to all the attached `Plugin`s.
        pub async fn send(&mut self, message: impl Into<Message<T>>) {
            let message = message.into();
            futures::future::join_all(
                self.plugins
                    .iter_mut()
                    .map(|plugin| plugin.send(message.clone())),
            )
            .await;
        }

        /// Enables a `Plugin` by attaching it to the `Host`: a channel is built, the input is given to the host;
        /// a task running the `Plugin`'s `handle_message` method on every `Message` sent over the channel is spawned, and the `JoinHandle` to this task is added to the host's handles.
        pub async fn attach(&mut self, plugin: impl Plugin<T>) {
            self.attach_with_capacity(plugin, Self::DEFAULT_CHANNEL_CAPACITY)
                .await
        }

        /// Like `attach`, but allows you to choose your own channel capacity.
        pub async fn attach_with_capacity(&mut self, mut plugin: impl Plugin<T>, capacity: usize) {
            let (tx, mut rx) = mpsc::channel(capacity);
            self.plugins.push(tx);
            self.tasks.push(tokio::spawn(async move {
                while let Some(message) = rx.recv().await {
                    if let Some(status) = plugin.handle_message(message) {
                        return Some(status);
                    }
                }
                None
            }))
        }

        /// Drops every channel end, closing them, then waits for all plugins to finish processing the remaining messages.
        pub fn end(&mut self) -> futures::future::JoinAll<tokio::task::JoinHandle<Option<u8>>> {
            self.plugins.clear();
            futures::future::join_all(self.tasks.drain(..))
        }
    }
}

#[cfg(not(feature = "tokio-host"))]
pub mod std_runtime {
    use super::*;
    use std::sync::mpsc;
    /// Meant as the main way of sending commands to plugins.
    /// A structure that holds the input ends of the queues to each plugin,
    /// as well as the `JoinHandle`s to their tasks.
    pub struct Host<T> {
        plugins: Vec<mpsc::SyncSender<Message<T>>>,
        pub tasks: Vec<std::thread::JoinHandle<Option<u8>>>,
    }

    impl<T> Drop for Host<T> {
        #[allow(unused_must_use)]
        fn drop(&mut self) {
            self.plugins.clear();
            for task in self.tasks.drain(..) {
                task.join();
            }
        }
    }

    impl<T: Sync + Send + 'static> Host<T> {
        pub fn new() -> Self {
            Host {
                plugins: Vec::new(),
                tasks: Vec::new(),
            }
        }

        /// By default, plugins will communicate with the host using a queue capable of holding this many `Message`s.
        pub const DEFAULT_CHANNEL_CAPACITY: usize = 4;

        /// Sends a message to all the attached `Plugin`s.
        #[allow(unused_must_use)]
        pub fn send(&mut self, message: impl Into<Message<T>>) {
            let message = message.into();
            for plugin in self.plugins.iter() {
                plugin.send(message.clone());
            }
        }

        /// Enables a `Plugin` by attaching it to the `Host`: a channel is built, the input is given to the host;
        /// a task running the `Plugin`'s `handle_message` method on every `Message` sent over the channel is spawned, and the `JoinHandle` to this task is added to the host's handles.
        pub fn attach(&mut self, plugin: impl Plugin<T>) {
            self.attach_with_capacity(plugin, Self::DEFAULT_CHANNEL_CAPACITY)
        }

        /// Like `attach`, but allows you to choose your own channel capacity.
        pub fn attach_with_capacity(&mut self, mut plugin: impl Plugin<T>, capacity: usize) {
            let (tx, rx) = mpsc::sync_channel(capacity);
            self.plugins.push(tx);
            self.tasks.push(std::thread::spawn(move || {
                while let Ok(message) = rx.recv() {
                    if let Some(status) = plugin.handle_message(message) {
                        return Some(status);
                    }
                }
                None
            }))
        }

        /// Drops every channel end, closing them, then waits for all plugins to finish processing the remaining messages.
        pub fn end(&mut self) -> Vec<std::thread::Result<Option<u8>>> {
            self.plugins.clear();
            self.tasks.drain(..).map(|t| t.join()).collect()
        }
    }
}

/// Represents a single message to be sent to every plugin.
pub struct Message<T> {
    pub content: Arc<T>,
}

impl<T> AsRef<T> for Message<T> {
    fn as_ref(&self) -> &T {
        self.content.as_ref()
    }
}

impl<T> Clone for Message<T> {
    fn clone(&self) -> Self {
        Message {
            content: self.content.clone(),
        }
    }
}

impl<T> Message<T> {
    pub fn new(value: T) -> Self {
        Message {
            content: Arc::new(value),
        }
    }
}

impl<T> From<Arc<T>> for Message<T> {
    fn from(content: Arc<T>) -> Self {
        Message { content }
    }
}

impl<T> From<T> for Message<T> {
    fn from(content: T) -> Self {
        Message {
            content: Arc::new(content),
        }
    }
}

/// In this architectures, plugins are purely slaves: they simply react to messages.
/// Their only way of returning information by default is by returning Some(status) to signal that they wish to be dropped.
/// If you want your plugin to be able to communicate back to your application after some of your messages, you should hand them a channel to do so through your message type.
pub trait Plugin<T>: Sync + Send + 'static {
    fn handle_message(&mut self, message: Message<T>) -> Option<u8>;
}

#[derive(Debug)]
pub enum PluginConstructionError {
    Loading(libloading::Error),
    Construction,
}

impl From<libloading::Error> for PluginConstructionError {
    fn from(e: libloading::Error) -> Self {
        PluginConstructionError::Loading(e)
    }
}

/// Loads a dynamic library at `path`, and calls the function called `constructor` in order to instanciate a `Plugin`.
/// The constructor function is the only function where you need to dirty your hands with `extern "C"`. Its sole purpose is to insert your boxed plugin into a pointer.
/// I suggest writing a constructor of the style:
/// ```rust
/// #[no_mangle]
/// unsafe extern "C" fn plugin_constructor(ptr: *mut Box<dyn Plugin<YourMessageType>>) {
///     let plugin = Box::new(YourPlugin::new());
///     insert_instace(ptr, plugin);
/// }
/// ```
pub fn construct_plugin_with_constructor<T>(
    path: impl AsRef<OsStr>,
    constructor: impl AsRef<[u8]>,
    args: Option<&dyn Any>
) -> Result<Box<dyn Plugin<T>>, PluginConstructionError> {
    let lib = libloading::Library::new(path)?;
    let mut instance = std::mem::MaybeUninit::zeroed();
    Ok(unsafe {
        lib.get::<FfiPluginInit<T>>(constructor.as_ref())?(instance.as_mut_ptr(), args);
        if ((*instance.as_ptr()).as_ref() as *const dyn Plugin<T>).is_null() {
            return Err(PluginConstructionError::Construction);
        }
        instance.assume_init()
    })
}

/// A default for `construct_plugin_with_constructor`, which will call a function named `plugin_constructor`.
/// The constructor function is the only function where you need to dirty your hands with `extern "C"`. Its sole purpose is to insert your boxed plugin into a pointer.
/// I suggest writing a constructor of the style:
/// ```rust
/// #[no_mangle]
/// unsafe extern "C" fn plugin_constructor(ptr: *mut Box<dyn Plugin<YourMessageType>>) {
///     let plugin = Box::new(YourPlugin::new());
///     insert_instace(ptr, plugin);
/// }
/// ```
pub fn construct_plugin<T>(
    path: impl AsRef<OsStr>,
    args: Option<&dyn Any>
) -> Result<Box<dyn Plugin<T>>, PluginConstructionError> {
    construct_plugin_with_constructor(path, b"plugin_constructor", args)
}

/// Inserts a plugin into an uninitialized pointer, preventing the drop on the uninitialized memory that would happen with a simple assignment
pub fn insert_instance<T>(ptr: *mut Box<dyn Plugin<T>>, mut plugin: Box<dyn Plugin<T>>) {
    unsafe { std::mem::swap(&mut plugin, &mut *ptr) };
    std::mem::forget(plugin);
}

impl<T: 'static, B: AsMut<dyn Plugin<T>> + Sync + Send + 'static> Plugin<T> for B {
    fn handle_message(&mut self, message: Message<T>) -> Option<u8> {
        self.as_mut().handle_message(message)
    }
}

pub type FfiPluginInit<T> = unsafe extern "C" fn(*mut Box<dyn Plugin<T>>, Option<&dyn Any>);