1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
use crate::{Error, FromBytesOwned, Plugin, ToBytes};
use std::{
collections::{HashMap, VecDeque},
sync::{Arc, Condvar, Mutex, RwLock, Weak},
};
/// Configuration object used to set up and construct a `Pool`
#[derive(Clone, Debug)]
pub struct PoolBuilder {
    /// Upper bound on the number of plugin instances that may exist at the same time. The
    /// default value is derived from `std::thread::available_parallelism`.
    pub max_instances: usize,
}
impl PoolBuilder {
/// Create a `PoolBuilder` with default values
pub fn new() -> Self {
Self::default()
}
/// Set the max number of parallel instances
pub fn with_max_instances(mut self, n: usize) -> Self {
self.max_instances = n;
self
}
/// Create a new `Pool` with the given configuration
pub fn build<F: 'static + Fn() -> Result<Plugin, Error> + Send + Sync>(
self,
source: F,
) -> Pool {
Pool::new_from_builder(source, self)
}
}
impl Default for PoolBuilder {
    /// Build the default configuration.
    ///
    /// `max_instances` is taken from `std::thread::available_parallelism`. When that query
    /// fails, a single instance is used instead of panicking.
    fn default() -> Self {
        // `available_parallelism` returns `Err` when the platform cannot report a value or the
        // process is not allowed to query it. A `Default` impl should not panic on that, so
        // fall back to the smallest pool that can still hand out a plugin.
        let max_instances = std::thread::available_parallelism()
            .map(usize::from)
            .unwrap_or(1);
        PoolBuilder { max_instances }
    }
}
/// Trait-object type of the factory closure that produces a new `Plugin` (or an `Error`) each
/// time it is called. The `Send + Sync` bounds let it be stored in state shared between threads.
type PluginSource = dyn Fn() -> Result<Plugin, Error> + Send + Sync;
/// Mutable bookkeeping of a pool: the plugin factory, the idle instances and the size counters.
/// `Pool` stores it behind an `Arc<Mutex<..>>`.
struct PoolInner {
/// Factory closure invoked whenever a new plugin instance has to be created
plugin_source: Box<PluginSource>,
/// Available plugins ready to be checked out
available: VecDeque<Plugin>,
/// Current number of plugins (checked out + available)
current_size: usize,
/// Maximum number of plugins
max_size: usize,
}
/// `Pool` manages threadsafe access to a limited number of plugin instances created from a single
/// plugin source. Every field is an `Arc`, so cloning a `Pool` yields another handle to the same
/// instances, condition variable and function cache.
#[derive(Clone)]
pub struct Pool {
/// Shared pool state: plugin source, idle instances and size counters
inner: Arc<Mutex<PoolInner>>,
/// Condition variable notified when a checked-out plugin is returned to the pool
cond: Arc<Condvar>,
/// Cache of `function_exists` results, keyed by function name
existing_functions: Arc<RwLock<HashMap<String, bool>>>,
}
impl Pool {
    /// Create a new pool with the default configuration
    pub fn new<F: 'static + Fn() -> Result<Plugin, Error> + Send + Sync>(source: F) -> Self {
        Self::new_from_builder(source, PoolBuilder::default())
    }

    /// Create a new pool configured using a `PoolBuilder`
    ///
    /// No plugin is created here: the pool starts empty and `source` is only invoked from
    /// `Pool::get` when a new instance is needed.
    pub fn new_from_builder<F: 'static + Fn() -> Result<Plugin, Error> + Send + Sync>(
        source: F,
        builder: PoolBuilder,
    ) -> Self {
        Pool {
            inner: Arc::new(Mutex::new(PoolInner {
                plugin_source: Box::new(source),
                available: VecDeque::new(),
                current_size: 0,
                max_size: builder.max_instances,
            })),
            cond: Arc::new(Condvar::new()),
            existing_functions: Arc::new(RwLock::new(HashMap::new())),
        }
    }

    /// Get the number of live instances for a plugin (both checked out and available)
    ///
    /// # Panics
    ///
    /// Panics if the pool mutex is poisoned.
    pub fn count(&self) -> usize {
        self.inner.lock().unwrap().current_size
    }

    /// Wrap `plugin` in a `PoolPlugin` guard that hands it back to this pool when dropped
    fn wrap_checked_out(&self, plugin: Plugin) -> PoolPlugin {
        PoolPlugin {
            plugin: Some(plugin),
            pool: Arc::downgrade(&self.inner),
            cond: self.cond.clone(),
        }
    }

    /// Get access to a plugin, this will create a new instance if needed (and allowed by the
    /// specified max_instances). `Ok(None)` is returned if the timeout is reached before an
    /// available plugin could be acquired
    ///
    /// # Errors
    ///
    /// Returns the error produced by the plugin source when creating a new instance fails.
    ///
    /// # Panics
    ///
    /// Panics if the pool mutex is poisoned.
    pub fn get(&self, timeout: std::time::Duration) -> Result<Option<PoolPlugin>, Error> {
        let start = std::time::Instant::now();
        // The lock is held for the whole loop except while blocked on the condition variable.
        let mut inner = self.inner.lock().unwrap();
        loop {
            // Prefer an idle instance over creating a new one.
            if let Some(plugin) = inner.available.pop_front() {
                return Ok(Some(self.wrap_checked_out(plugin)));
            }

            // Nothing idle: create a new instance while the pool is under capacity. The size
            // counter is only bumped after the source succeeded, so a failed creation does not
            // use up a slot.
            if inner.current_size < inner.max_size {
                let plugin = (*inner.plugin_source)()?;
                inner.current_size += 1;
                return Ok(Some(self.wrap_checked_out(plugin)));
            }

            // All instances are busy and the pool is at capacity: give up once the deadline
            // has passed.
            let elapsed = start.elapsed();
            if elapsed >= timeout {
                return Ok(None);
            }

            // `wait_timeout` releases the lock while waiting and re-acquires it when woken.
            // The timed-out flag is deliberately ignored: a plugin may have been returned right
            // at the deadline, and bailing out here would leave it idle after its `notify_one`
            // was spent on this thread. Looping back re-checks the queue first; the deadline
            // check above then ends the loop if nothing is available.
            let remaining = timeout - elapsed;
            let (guard, _) = self.cond.wait_timeout(inner, remaining).unwrap();
            inner = guard;
        }
    }

    /// Access a plugin in a callback function. This calls `Pool::get` then the provided callback.
    /// `Ok(None)` is returned if the timeout is reached before an available plugin could be
    /// acquired
    ///
    /// # Errors
    ///
    /// Returns any error from `Pool::get` or from the callback `f`.
    pub fn with_plugin<T>(
        &self,
        timeout: std::time::Duration,
        f: impl FnOnce(&mut Plugin) -> Result<T, Error>,
    ) -> Result<Option<T>, Error> {
        match self.get(timeout)? {
            // The guard goes back to the pool as soon as the callback has returned.
            Some(mut plugin) => f(&mut plugin).map(Some),
            None => Ok(None),
        }
    }

    /// Returns `true` if the given function exists, otherwise `false`. Results are cached after
    /// the first call.
    ///
    /// If no plugin could be acquired before `timeout`, `Ok(false)` is returned and nothing is
    /// cached, because the actual answer is unknown.
    ///
    /// # Errors
    ///
    /// Returns any error from `Pool::get`.
    pub fn function_exists(&self, name: &str, timeout: std::time::Duration) -> Result<bool, Error> {
        // The read guard is a temporary of this statement, so it is released before any plugin
        // is requested.
        let cached = self.existing_functions.read().unwrap().get(name).copied();
        if let Some(exists) = cached {
            return Ok(exists);
        }

        // Ask a real instance. The guard is dropped at the end of the match arm, so the plugin
        // is back in the pool before the cache write lock is taken.
        let exists = match self.get(timeout)? {
            Some(plugin) => plugin.function_exists(name),
            // Timeout - return false but don't cache, since we don't actually know whether
            // the function exists
            None => return Ok(false),
        };

        self.existing_functions
            .write()
            .unwrap()
            .insert(name.to_string(), exists);
        Ok(exists)
    }
}
/// `PoolPlugin` wraps a plugin checked out from a pool. When dropped, the plugin is automatically
/// returned to the pool if the pool still exists; otherwise the plugin is simply dropped.
pub struct PoolPlugin {
/// The checked-out plugin. Wrapped in `Option` so it can be moved out on drop; it is `Some`
/// for the whole lifetime of the guard.
plugin: Option<Plugin>,
/// Weak reference to the pool, used to return the plugin on drop. Using `Weak` allows the pool
/// to be fully dropped even if plugins are still checked out; when those plugins are dropped,
/// they'll see the pool is gone and simply drop themselves.
pool: Weak<Mutex<PoolInner>>,
/// Condition variable to notify waiters when this plugin is returned.
cond: Arc<Condvar>,
}
impl std::fmt::Debug for PoolPlugin {
    /// Render the guard as a `PoolPlugin` struct whose only field, `plugin`, is the optional id
    /// of the wrapped plugin
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let id = self.plugin.as_ref().map(|plugin| plugin.id);
        let mut out = f.debug_struct("PoolPlugin");
        out.field("plugin", &id);
        out.finish()
    }
}
impl PoolPlugin {
/// Helper to call a plugin function on the underlying plugin
pub fn call<'a, Input: ToBytes<'a>, Output: FromBytesOwned>(
&mut self,
name: impl AsRef<str>,
input: Input,
) -> Result<Output, Error> {
self.plugin
.as_mut()
.expect("plugin is Some until Drop runs")
.call(name.as_ref(), input)
}
/// Helper to get the underlying plugin's ID
pub fn id(&self) -> uuid::Uuid {
self.plugin
.as_ref()
.expect("plugin is Some until Drop runs")
.id
}
}
impl std::ops::Deref for PoolPlugin {
    type Target = Plugin;

    /// Borrow the wrapped plugin immutably; panics if it has already been taken out
    fn deref(&self) -> &Self::Target {
        let wrapped: Option<&Plugin> = self.plugin.as_ref();
        wrapped.expect("plugin is Some until Drop runs")
    }
}
impl std::ops::DerefMut for PoolPlugin {
fn deref_mut(&mut self) -> &mut Self::Target {
self.plugin
.as_mut()
.expect("plugin is Some until Drop runs")
}
}
impl Drop for PoolPlugin {
    /// Return the wrapped plugin to its pool and wake one waiter. If the pool has already been
    /// dropped, the plugin is dropped along with the guard.
    fn drop(&mut self) {
        if let Some(plugin) = self.plugin.take() {
            if let Some(inner) = self.pool.upgrade() {
                // A destructor must not panic: if this guard is dropped while the thread is
                // already unwinding, a second panic aborts the process. A poisoned mutex is
                // therefore recovered instead of unwrapped; pushing the plugin back only
                // appends to the idle queue.
                let mut guard = inner
                    .lock()
                    .unwrap_or_else(std::sync::PoisonError::into_inner);
                guard.available.push_back(plugin);
                // Release the lock before notifying so the woken waiter can take it at once.
                drop(guard);
                self.cond.notify_one();
            }
            // If pool is gone, just drop the plugin
        }
    }
}