1use crate::utils::take_t;
4use crossbeam::queue::ArrayQueue;
5use fxhash::FxHashMap;
6use once_cell::sync::Lazy;
7use parking_lot::Mutex;
8use serde::{de::DeserializeOwned, Deserialize, Serialize};
9use std::{
10 borrow::Borrow,
11 cmp::{Eq, Ord, Ordering, PartialEq, PartialOrd},
12 collections::HashMap,
13 default::Default,
14 fmt::Debug,
15 hash::{Hash, Hasher},
16 mem::{self, ManuallyDrop},
17 ops::{Deref, DerefMut},
18 ptr,
19 sync::{Arc, Weak},
20};
21
22pub mod pooled;
23#[cfg(test)]
24mod test;
25
26pub unsafe trait RawPoolable: Send + Sized {
45 fn empty(pool: WeakPool<Self>) -> Self;
47
48 fn reset(&mut self);
51
52 fn capacity(&self) -> usize;
54
55 fn really_drop(self);
59}
60
61pub trait Poolable {
62 fn empty() -> Self;
64
65 fn reset(&mut self);
68
69 fn capacity(&self) -> usize;
71
72 fn really_dropped(&self) -> bool {
75 true
76 }
77}
78
79#[derive(Debug, Clone)]
85pub struct Pooled<T: Poolable + Send + 'static> {
86 pool: ManuallyDrop<WeakPool<Self>>,
87 object: ManuallyDrop<T>,
88}
89
90unsafe impl<T: Poolable + Send + 'static> RawPoolable for Pooled<T> {
91 fn empty(pool: WeakPool<Self>) -> Self {
92 Pooled {
93 pool: ManuallyDrop::new(pool),
94 object: ManuallyDrop::new(Poolable::empty()),
95 }
96 }
97
98 fn reset(&mut self) {
99 Poolable::reset(&mut *self.object)
100 }
101
102 fn capacity(&self) -> usize {
103 Poolable::capacity(&*self.object)
104 }
105
106 fn really_drop(self) {
107 drop(self.detach())
108 }
109}
110
111impl<T: Poolable + Send + 'static> Borrow<T> for Pooled<T> {
112 fn borrow(&self) -> &T {
113 &self.object
114 }
115}
116
117impl Borrow<str> for Pooled<String> {
118 fn borrow(&self) -> &str {
119 &self.object
120 }
121}
122
123impl<T: Poolable + Send + 'static + PartialEq> PartialEq for Pooled<T> {
124 fn eq(&self, other: &Pooled<T>) -> bool {
125 self.object.eq(&other.object)
126 }
127}
128
129impl<T: Poolable + Send + 'static + Eq> Eq for Pooled<T> {}
130
131impl<T: Poolable + Send + 'static + PartialOrd> PartialOrd for Pooled<T> {
132 fn partial_cmp(&self, other: &Pooled<T>) -> Option<Ordering> {
133 self.object.partial_cmp(&other.object)
134 }
135}
136
137impl<T: Poolable + Send + 'static + Ord> Ord for Pooled<T> {
138 fn cmp(&self, other: &Pooled<T>) -> Ordering {
139 self.object.cmp(&other.object)
140 }
141}
142
143impl<T: Poolable + Send + 'static + Hash> Hash for Pooled<T> {
144 fn hash<H>(&self, state: &mut H)
145 where
146 H: Hasher,
147 {
148 Hash::hash(&self.object, state)
149 }
150}
151
152impl<T: Poolable + Send + 'static> Pooled<T> {
153 pub fn orphan(t: T) -> Self {
156 Pooled { pool: ManuallyDrop::new(WeakPool::new()), object: ManuallyDrop::new(t) }
157 }
158
159 pub fn assign(&mut self, pool: &Pool<T>) {
164 let old = mem::replace(&mut self.pool, ManuallyDrop::new(pool.downgrade()));
165 drop(ManuallyDrop::into_inner(old))
166 }
167
168 pub fn detach(self) -> T {
170 let mut t = ManuallyDrop::new(self);
171 unsafe {
172 ManuallyDrop::drop(&mut t.pool);
173 ManuallyDrop::take(&mut t.object)
174 }
175 }
176}
177
178impl<T: Poolable + Send + 'static> AsRef<T> for Pooled<T> {
179 fn as_ref(&self) -> &T {
180 &self.object
181 }
182}
183
184impl<T: Poolable + Send + 'static> Deref for Pooled<T> {
185 type Target = T;
186
187 fn deref(&self) -> &T {
188 &self.object
189 }
190}
191
192impl<T: Poolable + Send + 'static> DerefMut for Pooled<T> {
193 fn deref_mut(&mut self) -> &mut T {
194 &mut self.object
195 }
196}
197
198impl<T: Poolable + Send + 'static> Drop for Pooled<T> {
199 fn drop(&mut self) {
200 if self.really_dropped() {
201 match self.pool.upgrade() {
202 Some(pool) => pool.insert(unsafe { ptr::read(self) }),
203 None => unsafe {
204 ManuallyDrop::drop(&mut self.pool);
205 ManuallyDrop::drop(&mut self.object);
206 },
207 }
208 }
209 }
210}
211
212impl<T: Poolable + Send + 'static + Serialize> Serialize for Pooled<T> {
213 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
214 where
215 S: serde::Serializer,
216 {
217 self.object.serialize(serializer)
218 }
219}
220
221impl<'de, T: Poolable + Send + 'static + DeserializeOwned> Deserialize<'de>
222 for Pooled<T>
223{
224 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
225 where
226 D: serde::Deserializer<'de>,
227 {
228 let mut t = take_t::<T>(1000, 1000);
229 Self::deserialize_in_place(deserializer, &mut t)?;
230 Ok(t)
231 }
232
233 fn deserialize_in_place<D>(deserializer: D, place: &mut Self) -> Result<(), D::Error>
234 where
235 D: serde::Deserializer<'de>,
236 {
237 <T as Deserialize>::deserialize_in_place(deserializer, &mut place.object)
238 }
239}
240
241trait Prune {
242 fn prune(&self);
243}
244
245#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
246struct Pid(u32);
247
248impl Pid {
249 fn new() -> Self {
250 use std::sync::atomic::{AtomicU32, Ordering};
251 static PID: AtomicU32 = AtomicU32::new(0);
252 Self(PID.fetch_add(1, Ordering::Relaxed))
253 }
254}
255
256struct GlobalState {
257 pools: FxHashMap<Pid, Box<dyn Prune + Send + 'static>>,
258 pool_shark_running: bool,
259}
260
261static POOLS: Lazy<Mutex<GlobalState>> = Lazy::new(|| {
262 Mutex::new(GlobalState { pools: HashMap::default(), pool_shark_running: false })
263});
264
265#[derive(Debug)]
266struct PoolInner<T: RawPoolable> {
267 pool: ArrayQueue<T>,
268 max_elt_capacity: usize,
269 id: Pid,
270}
271
272impl<T: RawPoolable> Drop for PoolInner<T> {
273 fn drop(&mut self) {
274 while let Some(t) = self.pool.pop() {
275 RawPoolable::really_drop(t)
276 }
277 }
278}
279
280fn pool_shark() {
281 use std::{
282 thread::{sleep, spawn},
283 time::Duration,
284 };
285 spawn(|| loop {
286 sleep(Duration::from_secs(300));
287 {
288 for p in POOLS.lock().pools.values() {
289 p.prune()
290 }
291 }
292 });
293}
294
295#[derive(Clone, Debug)]
296pub struct WeakPool<T: RawPoolable>(Weak<PoolInner<T>>);
297
298impl<T: RawPoolable + Send + 'static> WeakPool<T> {
299 pub fn new() -> Self {
300 WeakPool(Weak::new())
301 }
302
303 pub fn upgrade(&self) -> Option<RawPool<T>> {
304 self.0.upgrade().map(RawPool)
305 }
306}
307
308pub type Pool<T> = RawPool<Pooled<T>>;
309
310#[derive(Clone, Debug)]
320pub struct RawPool<T: RawPoolable + Send + 'static>(Arc<PoolInner<T>>);
321
322impl<T: RawPoolable + Send + 'static> Drop for RawPool<T> {
323 fn drop(&mut self) {
324 if Arc::strong_count(&self.0) <= 2 {
326 let res = POOLS.lock().pools.remove(&self.0.id);
327 drop(res)
328 }
329 }
330}
331
332impl<T: RawPoolable + Send + 'static> Prune for RawPool<T> {
333 fn prune(&self) {
334 let len = self.0.pool.len();
335 let ten_percent = std::cmp::max(1, self.0.pool.capacity() / 10);
336 let one_percent = std::cmp::max(1, ten_percent / 10);
337 if len > ten_percent {
338 for _ in 0..ten_percent {
339 if let Some(v) = self.0.pool.pop() {
340 RawPoolable::really_drop(v)
341 }
342 }
343 } else if len > one_percent {
344 for _ in 0..one_percent {
345 if let Some(v) = self.0.pool.pop() {
346 RawPoolable::really_drop(v)
347 }
348 }
349 } else if len > 0 {
350 if let Some(v) = self.0.pool.pop() {
351 RawPoolable::really_drop(v)
352 }
353 }
354 }
355}
356
357impl<T: RawPoolable + Send + 'static> RawPool<T> {
358 pub fn downgrade(&self) -> WeakPool<T> {
359 WeakPool(Arc::downgrade(&self.0))
360 }
361
362 pub fn new(max_capacity: usize, max_elt_capacity: usize) -> RawPool<T> {
367 let id = Pid::new();
368 let t = RawPool(Arc::new(PoolInner {
369 pool: ArrayQueue::new(max_capacity),
370 max_elt_capacity,
371 id,
372 }));
373 let mut gs = POOLS.lock();
374 gs.pools.insert(id, Box::new(RawPool(Arc::clone(&t.0))));
375 if !gs.pool_shark_running {
376 gs.pool_shark_running = true;
377 pool_shark()
378 }
379 t
380 }
381
382 pub fn try_take(&self) -> Option<T> {
384 self.0.pool.pop()
385 }
386
387 pub fn take(&self) -> T {
389 self.0.pool.pop().unwrap_or_else(|| RawPoolable::empty(self.downgrade()))
390 }
391
392 pub fn insert(&self, mut t: T) {
395 let cap = t.capacity();
396 if cap > 0 && cap <= self.0.max_elt_capacity {
397 t.reset();
398 if let Err(t) = self.0.pool.push(t) {
399 RawPoolable::really_drop(t)
400 }
401 } else {
402 RawPoolable::really_drop(t)
403 }
404 }
405}