1#![ doc = include_str!( concat!( env!( "CARGO_MANIFEST_DIR" ), "/", "README.md" ) ) ]
2use object_id::UniqueId;
3use parking_lot::Mutex;
4use std::collections::{BTreeSet, VecDeque};
5use std::future::Future;
6use std::ops::{Deref, DerefMut};
7use std::pin::Pin;
8use std::sync::Arc;
9use std::task::{Context, Poll, Waker};
10
11type ClientId = usize;
12
13struct ResourcePoolGet<'a, T> {
14 id: UniqueId,
15 pool: &'a ResourcePool<T>,
16 queued: bool,
17}
18
19impl<'a, T> Future for ResourcePoolGet<'a, T> {
20 type Output = ResourcePoolGuard<T>;
21 fn poll(
22 mut self: Pin<&mut ResourcePoolGet<'a, T>>,
23 cx: &mut Context<'_>,
24 ) -> Poll<Self::Output> {
25 let mut holder = self.pool.holder.lock();
26 if self.queued {
28 holder.confirm_waked(self.id());
29 }
30 if holder.wakers.is_empty() || self.queued {
32 if let Some(res) = holder.resources.pop() {
33 self.queued = false;
34 return Poll::Ready(ResourcePoolGuard {
35 resource: Some(res),
36 holder: self.pool.holder.clone(),
37 });
38 }
39 }
40 self.queued = true;
41 holder.append_callback(cx.waker().clone(), self.id());
42 Poll::Pending
43 }
44}
45
46impl<'a, T> ResourcePoolGet<'a, T> {
47 #[inline]
48 fn id(&self) -> ClientId {
49 self.id.as_usize()
50 }
51}
52
53impl<'a, T> Drop for ResourcePoolGet<'a, T> {
54 #[inline]
55 fn drop(&mut self) {
56 if self.queued {
58 self.pool.holder.lock().notify_get_fut_drop(self.id());
59 }
60 }
61}
62
63pub struct ResourceHolder<T> {
65 pub resources: Vec<T>,
66 wakers: VecDeque<(Waker, ClientId)>,
67 waker_ids: BTreeSet<ClientId>,
68 pending: BTreeSet<ClientId>,
69}
70
71impl<T> ResourceHolder<T> {
72 fn new(size: usize) -> Self {
73 Self {
74 resources: Vec::with_capacity(size),
75 wakers: <_>::default(),
76 waker_ids: <_>::default(),
77 pending: <_>::default(),
78 }
79 }
80
81 #[inline]
82 fn append_resource(&mut self, res: T) {
83 self.resources.push(res);
84 self.wake_next();
85 }
86
87 #[inline]
88 fn wake_next(&mut self) {
89 if let Some((waker, id)) = self.wakers.pop_front() {
90 self.pending.insert(id);
91 self.waker_ids.remove(&id);
92 waker.wake();
93 }
94 }
95
96 #[inline]
97 fn notify_get_fut_drop(&mut self, id: ClientId) {
98 if let Some(pos) = self.wakers.iter().position(|(_, i)| *i == id) {
100 self.wakers.remove(pos);
101 self.waker_ids.remove(&id);
102 }
103 if self.pending.remove(&id) {
105 self.wake_next();
106 }
107 }
108
109 #[inline]
110 fn confirm_waked(&mut self, id: ClientId) {
111 self.pending.remove(&id);
113 }
114
115 #[inline]
116 fn append_callback(&mut self, waker: Waker, id: ClientId) {
117 if !self.waker_ids.insert(id) {
118 return;
119 }
120 self.wakers.push_back((waker, id));
121 }
122}
123
124pub struct ResourcePool<T> {
126 pub holder: Arc<Mutex<ResourceHolder<T>>>,
127}
128
129impl<T> Default for ResourcePool<T> {
130 fn default() -> Self {
131 Self::new()
132 }
133}
134
135impl<T> ResourcePool<T> {
136 pub fn new() -> Self {
138 Self {
139 holder: Arc::new(Mutex::new(ResourceHolder::new(0))),
140 }
141 }
142 pub fn with_capacity(size: usize) -> Self {
146 Self {
147 holder: Arc::new(Mutex::new(ResourceHolder::new(size))),
148 }
149 }
150
151 #[inline]
153 pub fn append(&self, res: T) {
154 let mut resources = self.holder.lock();
155 resources.append_resource(res);
156 }
157
158 #[inline]
160 pub fn get(&self) -> impl Future<Output = ResourcePoolGuard<T>> + '_ {
161 ResourcePoolGet {
162 id: <_>::default(),
163 pool: self,
164 queued: false,
165 }
166 }
167}
168
169pub struct ResourcePoolGuard<T> {
173 resource: Option<T>,
174 holder: Arc<Mutex<ResourceHolder<T>>>,
175}
176
177impl<T> ResourcePoolGuard<T> {
178 #[inline]
180 pub fn forget_resource(&mut self) {
181 self.resource.take();
182 }
183 #[inline]
185 pub fn replace_resource(&mut self, resource: T) {
186 self.resource.replace(resource);
187 }
188}
189
190impl<T> Deref for ResourcePoolGuard<T> {
191 type Target = T;
192 #[inline]
193 fn deref(&self) -> &Self::Target {
194 self.resource.as_ref().unwrap()
195 }
196}
197
198impl<T> DerefMut for ResourcePoolGuard<T> {
199 #[inline]
200 fn deref_mut(&mut self) -> &mut Self::Target {
201 self.resource.as_mut().unwrap()
202 }
203}
204
205impl<T> Drop for ResourcePoolGuard<T> {
206 fn drop(&mut self) {
207 if let Some(res) = self.resource.take() {
208 self.holder.lock().append_resource(res);
209 }
210 }
211}
212
213#[cfg(test)]
214mod test {
216 use super::ResourcePool;
217 use std::sync::Arc;
218 use std::time::Duration;
219 use std::time::Instant;
220 use tokio::sync::mpsc;
221 use tokio::time::sleep;
222
223 #[tokio::test(flavor = "multi_thread")]
224 async fn test_ordering() {
225 for _ in 0..5 {
226 let pool = Arc::new(ResourcePool::new());
227 let op = Instant::now();
228 pool.append(());
229 let n = 1_000;
230 let mut futs = Vec::new();
231 let (tx, mut rx) = mpsc::channel(n);
232 for i in 1..=n {
233 let p = pool.clone();
234 let tx = tx.clone();
235 let fut = tokio::spawn(async move {
236 sleep(Duration::from_millis(1)).await;
237 let _lock = p.get().await;
239 tx.send(i).await.unwrap();
240 println!("future {} locked {}", i, op.elapsed().as_millis());
241 sleep(Duration::from_millis(10)).await;
242 });
244 sleep(Duration::from_millis(2)).await;
245 if i > 1 && (i - 2) % 10 == 0 {
246 println!("future {} canceled", i);
247 fut.abort();
248 } else {
249 futs.push(fut);
250 }
251 }
252 for fut in futs {
253 tokio::time::timeout(Duration::from_secs(10), fut)
254 .await
255 .unwrap()
256 .unwrap();
257 }
258 let mut i = 0;
259 loop {
260 i += 1;
261 if i > 1 && (i - 2) % 10 == 0 {
262 i += 1;
263 }
264 if i > n {
265 break;
266 }
267 let fut_n = rx.recv().await.unwrap();
268 assert_eq!(i, fut_n);
269 }
270 assert!(
271 pool.holder.lock().pending.is_empty(),
272 "pool is poisoned (pendings)",
273 );
274 }
275 }
276 #[tokio::test(flavor = "multi_thread")]
277 async fn test_no_poisoning() {
278 let n = 2_000;
279 for i in 1..=n {
280 let pool: Arc<ResourcePool<()>> = Arc::new(ResourcePool::new());
281 let pool_c = pool.clone();
282 let fut1 = tokio::spawn(async move {
283 sleep(Duration::from_millis(1)).await;
284 let _resource = pool_c.get().await;
285 });
287 let pool_c = pool.clone();
288 let _fut2 = tokio::spawn(async move {
289 sleep(Duration::from_millis(2)).await;
290 let _resource = pool_c.get().await;
291 });
293 let pool_c = pool.clone();
294 let _fut3 = tokio::spawn(async move {
295 sleep(Duration::from_millis(3)).await;
296 let _resource = pool_c.get().await;
297 });
299 sleep(Duration::from_millis(2)).await;
300 if i % 2 == 0 {
301 pool.append(());
302 fut1.abort();
303 } else {
304 fut1.abort();
305 pool.append(());
306 }
307 sleep(Duration::from_millis(10)).await;
308 let holder = pool.holder.lock();
309 assert!(
310 holder.wakers.is_empty(),
311 "pool is poisoned {}/{}",
312 holder.wakers.len(),
313 holder.resources.len()
314 );
315 assert!(holder.pending.is_empty(), "pool is poisoned (pendings)",);
316 }
317 }
318}