simple_pool/
lib.rs

1#![ doc = include_str!( concat!( env!( "CARGO_MANIFEST_DIR" ), "/", "README.md" ) ) ]
2use object_id::UniqueId;
3use parking_lot::Mutex;
4use std::collections::{BTreeSet, VecDeque};
5use std::future::Future;
6use std::ops::{Deref, DerefMut};
7use std::pin::Pin;
8use std::sync::Arc;
9use std::task::{Context, Poll, Waker};
10
11type ClientId = usize;
12
13struct ResourcePoolGet<'a, T> {
14    id: UniqueId,
15    pool: &'a ResourcePool<T>,
16    queued: bool,
17}
18
19impl<'a, T> Future for ResourcePoolGet<'a, T> {
20    type Output = ResourcePoolGuard<T>;
21    fn poll(
22        mut self: Pin<&mut ResourcePoolGet<'a, T>>,
23        cx: &mut Context<'_>,
24    ) -> Poll<Self::Output> {
25        let mut holder = self.pool.holder.lock();
26        // notify the pool the waker is waked and we were not aborted
27        if self.queued {
28            holder.confirm_waked(self.id());
29        }
30        // there are no other futures waiting or we are queued and started from a waker
31        if holder.wakers.is_empty() || self.queued {
32            if let Some(res) = holder.resources.pop() {
33                self.queued = false;
34                return Poll::Ready(ResourcePoolGuard {
35                    resource: Some(res),
36                    holder: self.pool.holder.clone(),
37                });
38            }
39        }
40        self.queued = true;
41        holder.append_callback(cx.waker().clone(), self.id());
42        Poll::Pending
43    }
44}
45
46impl<'a, T> ResourcePoolGet<'a, T> {
47    #[inline]
48    fn id(&self) -> ClientId {
49        self.id.as_usize()
50    }
51}
52
53impl<'a, T> Drop for ResourcePoolGet<'a, T> {
54    #[inline]
55    fn drop(&mut self) {
56        // notify the pool we are dropped
57        if self.queued {
58            self.pool.holder.lock().notify_get_fut_drop(self.id());
59        }
60    }
61}
62
63/// Access directly only if you know what you are doing
64pub struct ResourceHolder<T> {
65    pub resources: Vec<T>,
66    wakers: VecDeque<(Waker, ClientId)>,
67    waker_ids: BTreeSet<ClientId>,
68    pending: BTreeSet<ClientId>,
69}
70
71impl<T> ResourceHolder<T> {
72    fn new(size: usize) -> Self {
73        Self {
74            resources: Vec::with_capacity(size),
75            wakers: <_>::default(),
76            waker_ids: <_>::default(),
77            pending: <_>::default(),
78        }
79    }
80
81    #[inline]
82    fn append_resource(&mut self, res: T) {
83        self.resources.push(res);
84        self.wake_next();
85    }
86
87    #[inline]
88    fn wake_next(&mut self) {
89        if let Some((waker, id)) = self.wakers.pop_front() {
90            self.pending.insert(id);
91            self.waker_ids.remove(&id);
92            waker.wake();
93        }
94    }
95
96    #[inline]
97    fn notify_get_fut_drop(&mut self, id: ClientId) {
98        // remove the future from wakers
99        if let Some(pos) = self.wakers.iter().position(|(_, i)| *i == id) {
100            self.wakers.remove(pos);
101            self.waker_ids.remove(&id);
102        }
103        // if the future was pending to get a resource, wake the next one
104        if self.pending.remove(&id) {
105            self.wake_next();
106        }
107    }
108
109    #[inline]
110    fn confirm_waked(&mut self, id: ClientId) {
111        // the resource is taken, remove from pending
112        self.pending.remove(&id);
113    }
114
115    #[inline]
116    fn append_callback(&mut self, waker: Waker, id: ClientId) {
117        if !self.waker_ids.insert(id) {
118            return;
119        }
120        self.wakers.push_back((waker, id));
121    }
122}
123
124/// Versatile resource pool
125pub struct ResourcePool<T> {
126    pub holder: Arc<Mutex<ResourceHolder<T>>>,
127}
128
129impl<T> Default for ResourcePool<T> {
130    fn default() -> Self {
131        Self::new()
132    }
133}
134
135impl<T> ResourcePool<T> {
136    /// Create a new resource pool
137    pub fn new() -> Self {
138        Self {
139            holder: Arc::new(Mutex::new(ResourceHolder::new(0))),
140        }
141    }
142    /// Create a new resource pool with pre-allocated capacity
143    ///
144    /// The size parameter is used to pre-allocate memory for the resource holder only
145    pub fn with_capacity(size: usize) -> Self {
146        Self {
147            holder: Arc::new(Mutex::new(ResourceHolder::new(size))),
148        }
149    }
150
151    /// Append a resource to the pool
152    #[inline]
153    pub fn append(&self, res: T) {
154        let mut resources = self.holder.lock();
155        resources.append_resource(res);
156    }
157
158    /// Get a resource from the pool or wait until one is available
159    #[inline]
160    pub fn get(&self) -> impl Future<Output = ResourcePoolGuard<T>> + '_ {
161        ResourcePoolGet {
162            id: <_>::default(),
163            pool: self,
164            queued: false,
165        }
166    }
167}
168
169/// Returns a container with a resource
170///
171/// When dropped, the resource is sent back to the pool
172pub struct ResourcePoolGuard<T> {
173    resource: Option<T>,
174    holder: Arc<Mutex<ResourceHolder<T>>>,
175}
176
177impl<T> ResourcePoolGuard<T> {
178    /// Do not return resource back to the pool when dropped
179    #[inline]
180    pub fn forget_resource(&mut self) {
181        self.resource.take();
182    }
183    /// Replace resource with a new one
184    #[inline]
185    pub fn replace_resource(&mut self, resource: T) {
186        self.resource.replace(resource);
187    }
188}
189
190impl<T> Deref for ResourcePoolGuard<T> {
191    type Target = T;
192    #[inline]
193    fn deref(&self) -> &Self::Target {
194        self.resource.as_ref().unwrap()
195    }
196}
197
198impl<T> DerefMut for ResourcePoolGuard<T> {
199    #[inline]
200    fn deref_mut(&mut self) -> &mut Self::Target {
201        self.resource.as_mut().unwrap()
202    }
203}
204
205impl<T> Drop for ResourcePoolGuard<T> {
206    fn drop(&mut self) {
207        if let Some(res) = self.resource.take() {
208            self.holder.lock().append_resource(res);
209        }
210    }
211}
212
213#[cfg(test)]
214// the tests test the pool for various problems and may go pretty long
215mod test {
216    use super::ResourcePool;
217    use std::sync::Arc;
218    use std::time::Duration;
219    use std::time::Instant;
220    use tokio::sync::mpsc;
221    use tokio::time::sleep;
222
223    #[tokio::test(flavor = "multi_thread")]
224    async fn test_ordering() {
225        for _ in 0..5 {
226            let pool = Arc::new(ResourcePool::new());
227            let op = Instant::now();
228            pool.append(());
229            let n = 1_000;
230            let mut futs = Vec::new();
231            let (tx, mut rx) = mpsc::channel(n);
232            for i in 1..=n {
233                let p = pool.clone();
234                let tx = tx.clone();
235                let fut = tokio::spawn(async move {
236                    sleep(Duration::from_millis(1)).await;
237                    //println!("future {} started {}", i, op.elapsed().as_millis());
238                    let _lock = p.get().await;
239                    tx.send(i).await.unwrap();
240                    println!("future {} locked {}", i, op.elapsed().as_millis());
241                    sleep(Duration::from_millis(10)).await;
242                    //println!("future {} finished", i);
243                });
244                sleep(Duration::from_millis(2)).await;
245                if i > 1 && (i - 2) % 10 == 0 {
246                    println!("future {} canceled", i);
247                    fut.abort();
248                } else {
249                    futs.push(fut);
250                }
251            }
252            for fut in futs {
253                tokio::time::timeout(Duration::from_secs(10), fut)
254                    .await
255                    .unwrap()
256                    .unwrap();
257            }
258            let mut i = 0;
259            loop {
260                i += 1;
261                if i > 1 && (i - 2) % 10 == 0 {
262                    i += 1;
263                }
264                if i > n {
265                    break;
266                }
267                let fut_n = rx.recv().await.unwrap();
268                assert_eq!(i, fut_n);
269            }
270            assert!(
271                pool.holder.lock().pending.is_empty(),
272                "pool is poisoned (pendings)",
273            );
274        }
275    }
276    #[tokio::test(flavor = "multi_thread")]
277    async fn test_no_poisoning() {
278        let n = 2_000;
279        for i in 1..=n {
280            let pool: Arc<ResourcePool<()>> = Arc::new(ResourcePool::new());
281            let pool_c = pool.clone();
282            let fut1 = tokio::spawn(async move {
283                sleep(Duration::from_millis(1)).await;
284                let _resource = pool_c.get().await;
285                //println!("fut1 completed");
286            });
287            let pool_c = pool.clone();
288            let _fut2 = tokio::spawn(async move {
289                sleep(Duration::from_millis(2)).await;
290                let _resource = pool_c.get().await;
291                //println!("fut2 completed");
292            });
293            let pool_c = pool.clone();
294            let _fut3 = tokio::spawn(async move {
295                sleep(Duration::from_millis(3)).await;
296                let _resource = pool_c.get().await;
297                //println!("fut3 completed");
298            });
299            sleep(Duration::from_millis(2)).await;
300            if i % 2 == 0 {
301                pool.append(());
302                fut1.abort();
303            } else {
304                fut1.abort();
305                pool.append(());
306            }
307            sleep(Duration::from_millis(10)).await;
308            let holder = pool.holder.lock();
309            assert!(
310                holder.wakers.is_empty(),
311                "pool is poisoned {}/{}",
312                holder.wakers.len(),
313                holder.resources.len()
314            );
315            assert!(holder.pending.is_empty(), "pool is poisoned (pendings)",);
316        }
317    }
318}