1use parking_lot::Mutex;
2#[cfg(feature = "unstable-mpmc")]
3use std::sync::mpmc as channel;
4#[cfg(not(feature = "unstable-mpmc"))]
5use std::sync::mpsc as channel;
6use std::sync::Arc;
7const IO_BUFFER_SIZE: usize = 8192;
8
9#[derive(Clone)]
10pub struct ObjectPool<T> {
11 sender: channel::Sender<T>,
12 receiver: Arc<Mutex<channel::Receiver<T>>>,
13 create_fn: Arc<dyn Fn() -> T + Send + Sync>,
14}
15
16impl<T: Send + 'static> ObjectPool<T> {
17 pub fn new<F>(initial_size: usize, create_fn: F) -> Self
18 where
19 F: Fn() -> T + Send + Sync + 'static,
20 {
21 let (sender, receiver) = channel::channel();
22
23 for _ in 0..initial_size {
24 sender
25 .send(create_fn())
26 .expect("Failed to initialize ObjectPool");
27 }
28
29 Self {
30 sender,
31 receiver: Arc::new(Mutex::new(receiver)),
32 create_fn: Arc::new(create_fn),
33 }
34 }
35
36 pub fn acquire(&self) -> PooledObject<T> {
37 let mut object = {
38 let receiver = self.receiver.lock();
39 match receiver.try_recv() {
40 Ok(obj) => obj,
41 Err(channel::TryRecvError::Empty) => (self.create_fn)(),
42 Err(channel::TryRecvError::Disconnected) => {
43 panic!("ObjectPool sender disconnected!");
44 }
45 }
46 };
47
48 if let Some(vec) = (&mut object as &mut dyn std::any::Any).downcast_mut::<Vec<u8>>() {
49 vec.clear();
50 vec.resize(IO_BUFFER_SIZE, 0);
51 }
52
53 PooledObject {
54 object: Some(object),
55 pool_sender: self.sender.clone(),
56 }
57 }
58}
59
60pub struct PooledObject<T> {
61 object: Option<T>,
62 pool_sender: channel::Sender<T>,
63}
64
65impl<T> std::convert::AsMut<T> for PooledObject<T> {
66 fn as_mut(&mut self) -> &mut T {
67 self.object.as_mut().expect("PooledObject is empty")
68 }
69}
70
71impl<T> std::convert::AsRef<T> for PooledObject<T> {
72 fn as_ref(&self) -> &T {
73 self.object.as_ref().expect("PooledObject is empty")
74 }
75}
76
77impl<T> Drop for PooledObject<T> {
78 fn drop(&mut self) {
79 if let Some(object) = self.object.take() {
80 let _ = self.pool_sender.send(object);
81 }
82 }
83}