1use crossbeam::queue::ArrayQueue;
2use fxhash::FxHashMap;
3use once_cell::sync::Lazy;
4use parking_lot::Mutex;
5use schemars::{schema::InstanceType, JsonSchema};
6use serde::{de::DeserializeOwned, Deserialize, Serialize};
7use std::{
8 any::{Any, TypeId},
9 borrow::Borrow,
10 cell::RefCell,
11 cmp::{Eq, Ord, Ordering, PartialEq, PartialOrd},
12 collections::{HashMap, HashSet, VecDeque},
13 default::Default,
14 fmt::Debug,
15 hash::{BuildHasher, Hash, Hasher},
16 ops::{Deref, DerefMut},
17 sync::{Arc, Weak},
18};
19use triomphe::Arc as TArc;
20
/// Declares a function returning a lazily created, `'static` pool of `$ty`.
///
/// Two forms are accepted:
///
/// * `pool!(VIS, name, Type, max_capacity, max_elt_size)` — the generated
///   function gets the visibility `VIS` (e.g. `pub`).
/// * `pool!(name, Type, max_capacity, max_elt_size)` — the generated
///   function is private.
///
/// The pool is built on first call with `Pool::new($max_capacity,
/// $max_elt_size)` and stored in a `once_cell::race::OnceBox`; every later
/// call returns the same reference.
#[macro_export]
macro_rules! pool {
    ($vis:vis, $name:ident, $ty:ty, $max_capacity:expr, $max_elt_size:expr) => {
        $vis fn $name() -> &'static api::utils::pool::Pool<$ty> {
            static POOL: once_cell::race::OnceBox<api::utils::pool::Pool<$ty>> =
                once_cell::race::OnceBox::new();
            POOL.get_or_init(|| {
                Box::new(api::utils::pool::Pool::new($max_capacity, $max_elt_size))
            })
        }
    };
    ($name:ident, $ty:ty, $max_capacity:expr, $max_elt_size:expr) => {
        fn $name() -> &'static api::utils::pool::Pool<$ty> {
            static POOL: once_cell::race::OnceBox<api::utils::pool::Pool<$ty>> =
                once_cell::race::OnceBox::new();
            POOL.get_or_init(|| {
                Box::new(api::utils::pool::Pool::new($max_capacity, $max_elt_size))
            })
        }
    };
}
43
/// Contract a type must satisfy so that its allocations can be recycled by a
/// pool.
pub trait Poolable {
    /// Builds a fresh, empty value; used when the pool has nothing to hand
    /// out.
    fn empty() -> Self;

    /// Clears the value's contents before it is put back in the pool.
    fn reset(&mut self);

    /// Reports the value's capacity. A dropped pooled value is only recycled
    /// when this is greater than zero and no larger than the pool's
    /// per-element limit.
    fn capacity(&self) -> usize;

    /// Tells whether dropping this value really releases the underlying
    /// object. Defaults to `true`; shared handles override it so that only
    /// the last owner returns the object to the pool.
    fn really_dropped(&self) -> bool {
        true
    }
}
53
54macro_rules! impl_hashmap {
55 ($ty:ident) => {
56 impl<K, V, R> Poolable for $ty<K, V, R>
57 where
58 K: Hash + Eq,
59 R: Default + BuildHasher,
60 {
61 fn empty() -> Self {
62 $ty::default()
63 }
64
65 fn reset(&mut self) {
66 self.clear()
67 }
68
69 fn capacity(&self) -> usize {
70 $ty::capacity(self)
71 }
72 }
73 };
74}
75
76impl_hashmap!(HashMap);
77
78macro_rules! impl_hashset {
79 ($ty:ident) => {
80 impl<K, R> Poolable for $ty<K, R>
81 where
82 K: Hash + Eq,
83 R: Default + BuildHasher,
84 {
85 fn empty() -> Self {
86 $ty::default()
87 }
88
89 fn reset(&mut self) {
90 self.clear()
91 }
92
93 fn capacity(&self) -> usize {
94 $ty::capacity(self)
95 }
96 }
97 };
98}
99
100impl_hashset!(HashSet);
101
102impl<T> Poolable for Vec<T> {
103 fn empty() -> Self {
104 Vec::new()
105 }
106
107 fn reset(&mut self) {
108 self.clear()
109 }
110
111 fn capacity(&self) -> usize {
112 Vec::capacity(self)
113 }
114}
115
116impl<T> Poolable for VecDeque<T> {
117 fn empty() -> Self {
118 VecDeque::new()
119 }
120
121 fn reset(&mut self) {
122 self.clear()
123 }
124
125 fn capacity(&self) -> usize {
126 VecDeque::capacity(self)
127 }
128}
129
130impl Poolable for String {
131 fn empty() -> Self {
132 String::new()
133 }
134
135 fn reset(&mut self) {
136 self.clear()
137 }
138
139 fn capacity(&self) -> usize {
140 self.capacity()
141 }
142}
143
144impl<T: Poolable> Poolable for TArc<T> {
145 fn empty() -> Self {
146 TArc::new(T::empty())
147 }
148
149 fn reset(&mut self) {
150 if let Some(inner) = TArc::get_mut(self) {
151 inner.reset()
152 }
153 }
154
155 fn capacity(&self) -> usize {
156 1
157 }
158
159 fn really_dropped(&self) -> bool {
160 TArc::is_unique(self)
161 }
162}
163
/// Type-erased hook through which the background pruning thread asks a pool
/// to release some of its idle objects.
trait Prune {
    /// Drops some of the objects currently held in reserve.
    fn prune(&self);
}
167
/// Identifier assigned to each pool; used as the key in the global pool
/// registry.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
struct Pid(u32);

impl Pid {
    /// Returns the next identifier from a process-wide counter that starts
    /// at zero.
    fn new() -> Self {
        // Imported locally: the atomic `Ordering` would otherwise clash with
        // `std::cmp::Ordering` used elsewhere in this file.
        use std::sync::atomic::{AtomicU32, Ordering::Relaxed};

        static NEXT: AtomicU32 = AtomicU32::new(0);
        let id = NEXT.fetch_add(1, Relaxed);
        Pid(id)
    }
}
178
179struct GlobalState {
180 pools: FxHashMap<Pid, Box<dyn Prune + Send + 'static>>,
181 pool_shark_running: bool,
182}
183
184static POOLS: Lazy<Mutex<GlobalState>> = Lazy::new(|| {
185 Mutex::new(GlobalState { pools: HashMap::default(), pool_shark_running: false })
186});
187
188#[derive(Debug)]
189struct PoolInner<T: Poolable + Send + 'static> {
190 pool: ArrayQueue<T>,
191 max_elt_capacity: usize,
192 id: Pid,
193}
194
195fn pool_shark() {
196 use std::{
197 thread::{sleep, spawn},
198 time::Duration,
199 };
200 spawn(|| loop {
201 sleep(Duration::from_secs(300));
202 {
203 for p in POOLS.lock().pools.values() {
204 p.prune()
205 }
206 }
207 });
208}
209
210#[derive(Clone, Debug)]
220pub struct Pool<T: Poolable + Send + 'static>(Arc<PoolInner<T>>);
221
222impl<T: Poolable + Send + 'static> Drop for Pool<T> {
223 fn drop(&mut self) {
224 if Arc::strong_count(&self.0) <= 2 {
226 let res = POOLS.lock().pools.remove(&self.0.id);
227 drop(res)
228 }
229 }
230}
231
232impl<T: Poolable + Send + 'static> Prune for Pool<T> {
233 fn prune(&self) {
234 let len = self.0.pool.len();
235 let ten_percent = std::cmp::max(1, self.0.pool.capacity() / 10);
236 let one_percent = std::cmp::max(1, ten_percent / 10);
237 if len > ten_percent {
238 for _ in 0..ten_percent {
239 self.0.pool.pop();
240 }
241 } else if len > one_percent {
242 for _ in 0..one_percent {
243 self.0.pool.pop();
244 }
245 } else if len > 0 {
246 self.0.pool.pop();
247 }
248 }
249}
250
251impl<T: Poolable + Send + 'static> Pool<T> {
252 pub fn new(max_capacity: usize, max_elt_capacity: usize) -> Pool<T> {
257 let id = Pid::new();
258 let t = Pool(Arc::new(PoolInner {
259 pool: ArrayQueue::new(max_capacity),
260 max_elt_capacity,
261 id,
262 }));
263 let mut gs = POOLS.lock();
264 gs.pools.insert(id, Box::new(Pool(Arc::clone(&t.0))));
265 if !gs.pool_shark_running {
266 gs.pool_shark_running = true;
267 pool_shark()
268 }
269 t
270 }
271
272 pub fn take(&self) -> Pooled<T> {
274 let object = self.0.pool.pop().unwrap_or_else(Poolable::empty);
275 Pooled { pool: Arc::downgrade(&self.0), object: Some(object) }
276 }
277}
278
279#[derive(Debug, Clone)]
281pub struct Pooled<T: Poolable + Send + 'static> {
282 pool: Weak<PoolInner<T>>,
283 object: Option<T>,
286}
287
288impl<T: Poolable + Send + 'static> Pooled<T> {
289 #[inline(always)]
290 fn get(&self) -> &T {
291 match &self.object {
292 Some(ref t) => t,
293 None => unreachable!(),
294 }
295 }
296
297 #[inline(always)]
298 fn get_mut(&mut self) -> &mut T {
299 match &mut self.object {
300 Some(ref mut t) => t,
301 None => unreachable!(),
302 }
303 }
304}
305
306impl<T: Poolable + Sync + Send + 'static> Borrow<T> for Pooled<T> {
307 fn borrow(&self) -> &T {
308 self.get()
309 }
310}
311
312impl Borrow<str> for Pooled<String> {
313 fn borrow(&self) -> &str {
314 self.get().borrow()
315 }
316}
317
318impl<T: Poolable + Send + 'static + PartialEq> PartialEq for Pooled<T> {
319 fn eq(&self, other: &Pooled<T>) -> bool {
320 self.get().eq(other.get())
321 }
322}
323
324impl<T: Poolable + Send + 'static + Eq> Eq for Pooled<T> {}
325
326impl<T: Poolable + Send + 'static + PartialOrd> PartialOrd for Pooled<T> {
327 fn partial_cmp(&self, other: &Pooled<T>) -> Option<Ordering> {
328 self.get().partial_cmp(other.get())
329 }
330}
331
332impl<T: Poolable + Send + 'static + Ord> Ord for Pooled<T> {
333 fn cmp(&self, other: &Pooled<T>) -> Ordering {
334 self.get().cmp(other.get())
335 }
336}
337
338impl<T: Poolable + Send + 'static + Hash> Hash for Pooled<T> {
339 fn hash<H>(&self, state: &mut H)
340 where
341 H: Hasher,
342 {
343 Hash::hash(self.get(), state)
344 }
345}
346
347impl<T: Poolable + Send + 'static> Pooled<T> {
348 pub fn orphan(t: T) -> Self {
351 Pooled { pool: Weak::new(), object: Some(t) }
352 }
353
354 pub fn detach(mut self) -> T {
355 self.object.take().unwrap()
356 }
357}
358
359impl<T: Poolable + Send + 'static> AsRef<T> for Pooled<T> {
360 fn as_ref(&self) -> &T {
361 self.get()
362 }
363}
364
365impl<T: Poolable + Send + 'static> Deref for Pooled<T> {
366 type Target = T;
367
368 fn deref(&self) -> &T {
369 self.get()
370 }
371}
372
373impl<T: Poolable + Send + 'static> DerefMut for Pooled<T> {
374 fn deref_mut(&mut self) -> &mut T {
375 self.get_mut()
376 }
377}
378
379impl<T: Poolable + Send + 'static> Drop for Pooled<T> {
380 fn drop(&mut self) {
381 if self.get().really_dropped() {
382 if let Some(inner) = self.pool.upgrade() {
383 let cap = self.get().capacity();
384 if cap > 0 && cap <= inner.max_elt_capacity {
385 let mut object = self.object.take().unwrap();
386 object.reset();
387 inner.pool.push(object).ok();
388 }
389 }
390 }
391 }
392}
393
394impl<T: Poolable + Send + 'static + Serialize> Serialize for Pooled<T> {
395 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
396 where
397 S: serde::Serializer,
398 {
399 self.get().serialize(serializer)
400 }
401}
402
403impl<'de, T: Poolable + Send + 'static + DeserializeOwned> Deserialize<'de>
404 for Pooled<T>
405{
406 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
407 where
408 D: serde::Deserializer<'de>,
409 {
410 let mut t = take_t::<T>(1000, 1000);
411 Self::deserialize_in_place(deserializer, &mut t)?;
412 Ok(t)
413 }
414
415 fn deserialize_in_place<D>(deserializer: D, place: &mut Self) -> Result<(), D::Error>
416 where
417 D: serde::Deserializer<'de>,
418 {
419 <T as Deserialize>::deserialize_in_place(deserializer, place.get_mut())
420 }
421}
422thread_local! {
423 static POOLS_S: RefCell<FxHashMap<TypeId, Box<dyn Any>>> =
424 RefCell::new(HashMap::default());
425}
426
427pub fn take_t<T: Any + Poolable + Send + 'static>(size: usize, max: usize) -> Pooled<T> {
432 POOLS_S.with(|pools| {
433 let mut pools = pools.borrow_mut();
434 let pool: &mut Pool<T> = pools
435 .entry(TypeId::of::<T>())
436 .or_insert_with(|| Box::new(Pool::<T>::new(size, max)))
437 .downcast_mut()
438 .unwrap();
439 pool.take()
440 })
441}
442
443impl<T: Poolable + Send + 'static + JsonSchema> JsonSchema for Pooled<T> {
444 fn schema_name() -> String {
445 "Pooled".to_owned()
447 }
448
449 fn json_schema(
450 _gen: &mut schemars::gen::SchemaGenerator,
451 ) -> schemars::schema::Schema {
452 schemars::schema::SchemaObject {
454 instance_type: Some(InstanceType::Array.into()),
455 ..Default::default()
456 }
457 .into()
458 }
459
460 fn is_referenceable() -> bool {
461 true
462 }
463}