mill_io/
object_pool.rs

1#[cfg(feature = "unstable-mpmc")]
2use std::sync::mpmc as channel;
3#[cfg(not(feature = "unstable-mpmc"))]
4use std::sync::mpsc as channel;
5use std::sync::{Arc, Mutex};
6const IO_BUFFER_SIZE: usize = 8192;
7
8#[derive(Clone)]
9pub struct ObjectPool<T> {
10    sender: channel::Sender<T>,
11    receiver: Arc<Mutex<channel::Receiver<T>>>,
12    create_fn: Arc<dyn Fn() -> T + Send + Sync>,
13}
14
15impl<T: Send + 'static> ObjectPool<T> {
16    pub fn new<F>(initial_size: usize, create_fn: F) -> Self
17    where
18        F: Fn() -> T + Send + Sync + 'static,
19    {
20        let (sender, receiver) = channel::channel();
21
22        for _ in 0..initial_size {
23            sender
24                .send(create_fn())
25                .expect("Failed to initialize ObjectPool");
26        }
27
28        Self {
29            sender,
30            receiver: Arc::new(Mutex::new(receiver)),
31            create_fn: Arc::new(create_fn),
32        }
33    }
34
35    pub fn acquire(&self) -> PooledObject<T> {
36        let mut object = {
37            let receiver = self.receiver.lock().unwrap();
38            match receiver.try_recv() {
39                Ok(obj) => obj,
40                Err(channel::TryRecvError::Empty) => (self.create_fn)(),
41                Err(channel::TryRecvError::Disconnected) => {
42                    panic!("ObjectPool sender disconnected!");
43                }
44            }
45        };
46
47        if let Some(vec) = (&mut object as &mut dyn std::any::Any).downcast_mut::<Vec<u8>>() {
48            vec.clear();
49            vec.resize(IO_BUFFER_SIZE, 0);
50        }
51
52        PooledObject {
53            object: Some(object),
54            pool_sender: self.sender.clone(),
55        }
56    }
57}
58
59pub struct PooledObject<T> {
60    object: Option<T>,
61    pool_sender: channel::Sender<T>,
62}
63
64impl<T> std::convert::AsMut<T> for PooledObject<T> {
65    fn as_mut(&mut self) -> &mut T {
66        self.object.as_mut().expect("PooledObject is empty")
67    }
68}
69
70impl<T> std::convert::AsRef<T> for PooledObject<T> {
71    fn as_ref(&self) -> &T {
72        self.object.as_ref().expect("PooledObject is empty")
73    }
74}
75
76impl<T> Drop for PooledObject<T> {
77    fn drop(&mut self) {
78        if let Some(object) = self.object.take() {
79            let _ = self.pool_sender.send(object);
80        }
81    }
82}