1#[cfg(feature = "unstable-mpmc")]
2use std::sync::mpmc as channel;
3#[cfg(not(feature = "unstable-mpmc"))]
4use std::sync::mpsc as channel;
5use std::sync::{Arc, Mutex};
/// Length, in bytes, that `ObjectPool::acquire` resizes a pooled `Vec<u8>` to
/// (zero-filled) before handing it out.
const IO_BUFFER_SIZE: usize = 8192;
7
/// A pool of reusable `T` values backed by a channel.
///
/// Idle objects sit in the channel; `create_fn` builds a new one whenever the
/// channel is empty. Clones of a pool share the same channel and factory.
pub struct ObjectPool<T> {
    /// Sending half; cloned into every checked-out object so it can be returned.
    sender: channel::Sender<T>,
    /// Receiving half holding the idle objects, shared between pool clones.
    receiver: Arc<Mutex<channel::Receiver<T>>>,
    /// Factory invoked when no idle object is available.
    create_fn: Arc<dyn Fn() -> T + Send + Sync>,
}

// Implemented by hand instead of `#[derive(Clone)]`: the derive would add a
// `T: Clone` bound even though only handles (`Sender`, `Arc`) are cloned, which
// would make pools of non-`Clone` objects impossible to clone.
impl<T> Clone for ObjectPool<T> {
    /// Returns another handle to the same underlying pool.
    fn clone(&self) -> Self {
        Self {
            sender: self.sender.clone(),
            receiver: Arc::clone(&self.receiver),
            create_fn: Arc::clone(&self.create_fn),
        }
    }
}
14
15impl<T: Send + 'static> ObjectPool<T> {
16 pub fn new<F>(initial_size: usize, create_fn: F) -> Self
17 where
18 F: Fn() -> T + Send + Sync + 'static,
19 {
20 let (sender, receiver) = channel::channel();
21
22 for _ in 0..initial_size {
23 sender
24 .send(create_fn())
25 .expect("Failed to initialize ObjectPool");
26 }
27
28 Self {
29 sender,
30 receiver: Arc::new(Mutex::new(receiver)),
31 create_fn: Arc::new(create_fn),
32 }
33 }
34
35 pub fn acquire(&self) -> PooledObject<T> {
36 let mut object = {
37 let receiver = self.receiver.lock().unwrap();
38 match receiver.try_recv() {
39 Ok(obj) => obj,
40 Err(channel::TryRecvError::Empty) => (self.create_fn)(),
41 Err(channel::TryRecvError::Disconnected) => {
42 panic!("ObjectPool sender disconnected!");
43 }
44 }
45 };
46
47 if let Some(vec) = (&mut object as &mut dyn std::any::Any).downcast_mut::<Vec<u8>>() {
48 vec.clear();
49 vec.resize(IO_BUFFER_SIZE, 0);
50 }
51
52 PooledObject {
53 object: Some(object),
54 pool_sender: self.sender.clone(),
55 }
56 }
57}
58
/// Guard around an object checked out of an [`ObjectPool`].
///
/// Dropping the guard sends the wrapped object back through `pool_sender`.
pub struct PooledObject<T> {
    /// The checked-out object; the `Drop` impl takes it, leaving `None`.
    object: Option<T>,
    /// Sender the `Drop` impl uses to hand the object back.
    pool_sender: channel::Sender<T>,
}
63
64impl<T> std::convert::AsMut<T> for PooledObject<T> {
65 fn as_mut(&mut self) -> &mut T {
66 self.object.as_mut().expect("PooledObject is empty")
67 }
68}
69
70impl<T> std::convert::AsRef<T> for PooledObject<T> {
71 fn as_ref(&self) -> &T {
72 self.object.as_ref().expect("PooledObject is empty")
73 }
74}
75
76impl<T> Drop for PooledObject<T> {
77 fn drop(&mut self) {
78 if let Some(object) = self.object.take() {
79 let _ = self.pool_sender.send(object);
80 }
81 }
82}