1extern crate log;
85extern crate parking_lot;
86
87use std::collections::BTreeMap;
88use std::mem::{forget, ManuallyDrop};
89use std::ops::{Deref, DerefMut};
90use std::time::{Duration, Instant};
91
92use parking_lot::RwLock;
93
94#[cfg(not(test))]
95use log::{debug, error, info};
96
97#[cfg(test)]
98use std::{println as info, println as error, println as debug};
99
100const DEFAULT_POOL_INDEXES: usize = 32;
101const DEFAULT_SINGLE_POOL_SIZE: usize = 8;
102const DEFAULT_EXPIRATION: Duration = Duration::from_secs(300);
103
104#[derive(Debug)]
105struct Inner<T> {
106 inner: T,
107 start_time: Instant,
108}
109
110impl<T> Inner<T> {
111 pub fn new(object: T, start_time: Instant) -> Self {
112 Self { inner: object, start_time }
113 }
114
115 pub fn expired(&self, duration: Duration) -> bool {
116 self.start_time.elapsed().as_millis() > duration.as_millis()
117 }
118}
119
120#[derive(Debug)]
121pub struct Pool<T> {
122 max_pool_indexes: usize,
123 max_single_pool_size: usize,
124 expiration: Duration,
125 objects: RwLock<BTreeMap<String, Vec<Inner<T>>>>,
126}
127
128impl<T> Pool<T> {
129 pub fn new(max_pool_indexes: usize, max_single_pool_size: usize, expiration: Duration) -> Pool<T> {
130 Pool { max_pool_indexes, max_single_pool_size, expiration, objects: RwLock::new(BTreeMap::new()) }
131 }
132
133 pub fn default() -> Pool<T> {
134 Pool::new(DEFAULT_POOL_INDEXES, DEFAULT_SINGLE_POOL_SIZE, DEFAULT_EXPIRATION)
135 }
136
137 pub fn size(&self) -> usize {
138 self.objects.read().len()
139 }
140
141 pub fn len(&self, item: &str) -> usize {
142 match self.objects.read().get(item) {
143 Some(item) => item.len(),
144 None => 0,
145 }
146 }
147
148 pub fn is_full(&self) -> bool {
149 self.size() >= self.max_pool_indexes
150 }
151
152 fn expunge_oldest(&self) {
153 if !self.is_full() {
154 debug!("Object pool is not full, nothing to remove");
155 return;
156 }
157 let mut last = String::new();
158 if let Some((obj, _)) = self.objects.read().iter().next() {
159 last = obj.clone();
160 }
161 if !last.is_empty() {
162 debug!("Removing oldest element in the queue: {}", last);
163 self.objects.write().remove(&last);
164 } else {
165 error!("Unable to find an element to remove from the queue, next allocation could fail");
166 }
167 }
168
169 fn try_pull(&self, item: &str) -> Option<Reusable<T>> {
170 match self.objects.write().get_mut(item) {
171 Some(objects) => {
172 info!("Pool for {} is currently of {} objects", item, objects.len());
173 if objects.len() > self.max_single_pool_size {
174 objects.pop();
175 }
176 match objects.pop() {
177 Some(object) => {
178 if object.expired(self.expiration) {
179 info!(
180 "Element {} has reached expiration time of {} ms, evicting from pool",
181 item,
182 object.start_time.elapsed().as_millis()
183 );
184 None
185 } else {
186 info!(
187 "Reusing element pool {} created {} ms ago",
188 item,
189 object.start_time.elapsed().as_millis()
190 );
191 Some(Reusable::new(self, item.to_string(), object.start_time, object.inner))
192 }
193 }
194 None => {
195 debug!("Element {} pool is empty", item);
196 None
197 }
198 }
199 }
200 None => {
201 debug!("Unable to find element {} in objects pool", item);
202 None
203 }
204 }
205 }
206
207 fn attach_time(&self, item: &str, start_time: Instant, t: T) {
208 self.expunge_oldest();
209 if self.objects.read().contains_key(item) {
210 debug!("Creating new pool of {} elements and attatching object to it", self.max_single_pool_size);
211 self.objects.write().get_mut(item).unwrap().push(Inner::new(t, start_time))
212 } else {
213 debug!(
214 "Attatching element {} to existing pool of max {} elements",
215 self.len(item) + 1,
216 self.max_single_pool_size
217 );
218 self.objects.write().insert(item.to_string(), vec![Inner::new(t, start_time)]);
219 }
220 }
221
222 pub fn pull<F: Fn() -> T>(&self, item: &str, fallback: F) -> Reusable<T> {
223 match self.try_pull(item) {
224 Some(object) => object,
225 None => {
226 info!("Creating new element {} with a pool of {} instances", item, self.max_single_pool_size);
227 for _ in 0..self.max_single_pool_size {
228 self.attach(item, fallback())
229 }
230 self.pull(item, fallback)
231 }
232 }
233 }
234
235 pub fn attach(&self, item: &str, t: T) {
236 self.attach_time(item, Instant::now(), t);
237 }
238}
239
240pub struct Reusable<'a, T> {
241 item: String,
242 pool: &'a Pool<T>,
243 data: ManuallyDrop<T>,
244 start_time: Instant,
245}
246
247impl<'a, T> Reusable<'a, T> {
248 pub fn new(pool: &'a Pool<T>, item: String, start_time: Instant, t: T) -> Self {
249 Self { item, pool, data: ManuallyDrop::new(t), start_time }
250 }
251
252 pub fn detach(mut self) -> (&'a Pool<T>, T) {
253 let ret = unsafe { (self.pool, self.take()) };
254 info!("Detaching object from element {} pool", self.item);
255 forget(self);
256 ret
257 }
258
259 unsafe fn take(&mut self) -> T {
260 ManuallyDrop::take(&mut self.data)
261 }
262}
263
264impl<'a, T> Deref for Reusable<'a, T> {
265 type Target = T;
266
267 fn deref(&self) -> &Self::Target {
268 &self.data
269 }
270}
271
272impl<'a, T> DerefMut for Reusable<'a, T> {
273 fn deref_mut(&mut self) -> &mut Self::Target {
274 &mut self.data
275 }
276}
277
278impl<'a, T> Drop for Reusable<'a, T> {
279 fn drop(&mut self) {
280 let value = unsafe { self.take() };
281 info!("Re-attatching object to element {} object pool", self.item);
282 self.pool.attach_time(&self.item, self.start_time, value)
283 }
284}
285
286#[cfg(test)]
287mod tests {
288 use super::*;
289 use std::mem::drop;
290 use std::thread;
291
292 use pretty_assertions::assert_eq;
293
294 #[derive(Debug)]
295 struct Obj {
296 idx: usize,
297 }
298
299 impl Obj {
300 fn new(idx: usize) -> Self {
301 Self { idx }
302 }
303 }
304
305 #[test]
306 fn test_detach() {
307 let pool = Pool::default();
308 let (pool, object) = pool.pull("item1", || Obj::new(1)).detach();
309 assert_eq!(object.idx, 1);
310 assert_eq!(pool.len("item1"), 7);
311 drop(object);
312 assert_eq!(pool.len("item1"), 7);
313 }
314
315 #[test]
316 fn test_detach_then_attach() {
317 let pool = Pool::default();
318 let (pool, object) = pool.pull("item1", || Obj::new(1)).detach();
319 assert_eq!(object.idx, 1);
320 assert_eq!(pool.len("item1"), 7);
321 pool.attach("item1", object);
322 assert_eq!(pool.try_pull("item1").unwrap().idx, 1);
323 assert_eq!(pool.len("item1"), 8);
324 }
325
326 #[test]
327 fn test_pull_and_size() {
328 let pool = Pool::default();
329 pool.attach("item1", Obj::new(1));
330 assert_eq!(pool.size(), 1);
331
332 let object1 = pool.try_pull("item1");
333 let object2 = pool.try_pull("item1");
334 let object3 = pool.pull("item2", || Obj::new(2));
335 assert_eq!(pool.size(), 2);
336
337 assert_eq!(object1.is_some(), true);
338 assert_eq!(object2.is_none(), true);
339
340 assert_eq!(pool.len("item1"), 0);
341 drop(object1);
342 assert_eq!(pool.len("item1"), 1);
343 drop(object2);
344 assert_eq!(pool.len("item1"), 1);
345
346 assert_eq!(object3.idx, 2);
347 assert_eq!(pool.len("item2"), 7);
348 drop(object3);
349 assert_eq!(pool.len("item2"), 8);
350 }
351
352 #[test]
353 fn test_fill_up_pool() {
354 let pool = Pool::default();
355 for x in 0..DEFAULT_POOL_INDEXES {
356 pool.attach(&format!("item{}", x), Obj::new(x));
357 assert_eq!(pool.size(), x + 1)
358 }
359 for (_, obj) in pool.objects.read().iter() {
360 assert_eq!(obj.len(), 1);
361 }
362 }
363
364 #[test]
365 fn test_expire_pool() {
366 let pool = Pool::new(DEFAULT_POOL_INDEXES, DEFAULT_SINGLE_POOL_SIZE, Duration::from_secs(1));
367 for x in 1..7 {
368 pool.attach(&format!("item{}", x), Obj::new(x));
369 }
370 assert_eq!(pool.size(), 6);
371 thread::sleep(Duration::from_millis(1500));
372 for x in 1..7 {
373 assert_eq!(pool.try_pull(&format!("item{})", x)).is_none(), true);
374 }
375 for x in 1..7 {
376 pool.pull(&format!("item{})", x), || Obj::new(x));
377 }
378 for x in 1..7 {
379 assert_eq!(pool.try_pull(&format!("item{})", x)).is_some(), true);
380 }
381 }
382
383 #[test]
384 fn test_smoke() {
385 let pool = Pool::default();
386 for x in 0..10000 {
387 let obj = pool.pull(&format!("item{}", x), || Obj::new(x));
388 assert_eq!(obj.data.idx, x);
389 if x >= 32 {
390 assert!(pool.size() >= 31);
391 }
392 }
393 }
394}