Skip to main content

mill_io/
object_pool.rs

1use parking_lot::Mutex;
2#[cfg(feature = "unstable-mpmc")]
3use std::sync::mpmc as channel;
4#[cfg(not(feature = "unstable-mpmc"))]
5use std::sync::mpsc as channel;
6use std::sync::Arc;
7const IO_BUFFER_SIZE: usize = 8192;
8
9#[derive(Clone)]
10pub struct ObjectPool<T> {
11    sender: channel::Sender<T>,
12    receiver: Arc<Mutex<channel::Receiver<T>>>,
13    create_fn: Arc<dyn Fn() -> T + Send + Sync>,
14}
15
16impl<T: Send + 'static> ObjectPool<T> {
17    pub fn new<F>(initial_size: usize, create_fn: F) -> Self
18    where
19        F: Fn() -> T + Send + Sync + 'static,
20    {
21        let (sender, receiver) = channel::channel();
22
23        for _ in 0..initial_size {
24            sender
25                .send(create_fn())
26                .expect("Failed to initialize ObjectPool");
27        }
28
29        Self {
30            sender,
31            receiver: Arc::new(Mutex::new(receiver)),
32            create_fn: Arc::new(create_fn),
33        }
34    }
35
36    pub fn acquire(&self) -> PooledObject<T> {
37        let mut object = {
38            let receiver = self.receiver.lock();
39            match receiver.try_recv() {
40                Ok(obj) => obj,
41                Err(channel::TryRecvError::Empty) => (self.create_fn)(),
42                Err(channel::TryRecvError::Disconnected) => {
43                    panic!("ObjectPool sender disconnected!");
44                }
45            }
46        };
47
48        if let Some(vec) = (&mut object as &mut dyn std::any::Any).downcast_mut::<Vec<u8>>() {
49            vec.clear();
50            vec.resize(IO_BUFFER_SIZE, 0);
51        }
52
53        PooledObject {
54            object: Some(object),
55            pool_sender: self.sender.clone(),
56        }
57    }
58}
59
60pub struct PooledObject<T> {
61    object: Option<T>,
62    pool_sender: channel::Sender<T>,
63}
64
65impl<T> std::convert::AsMut<T> for PooledObject<T> {
66    fn as_mut(&mut self) -> &mut T {
67        self.object.as_mut().expect("PooledObject is empty")
68    }
69}
70
71impl<T> std::convert::AsRef<T> for PooledObject<T> {
72    fn as_ref(&self) -> &T {
73        self.object.as_ref().expect("PooledObject is empty")
74    }
75}
76
77impl<T> Drop for PooledObject<T> {
78    fn drop(&mut self) {
79        if let Some(object) = self.object.take() {
80            let _ = self.pool_sender.send(object);
81        }
82    }
83}