1#[cfg(feature = "netidx")]
2use bytes::{Buf, BufMut};
3use crossbeam::queue::ArrayQueue;
4use fxhash::FxHashMap;
5use indexmap::{IndexMap, IndexSet};
6#[cfg(feature = "netidx")]
7use netidx::pack::{Pack, PackError};
8use once_cell::sync::Lazy;
9use parking_lot::Mutex;
10use schemars::{schema::InstanceType, JsonSchema};
11use serde::{de::DeserializeOwned, Deserialize, Serialize};
12use std::{
13 any::{Any, TypeId},
14 borrow::Borrow,
15 cell::RefCell,
16 cmp::{Eq, Ord, Ordering, PartialEq, PartialOrd},
17 collections::{HashMap, HashSet, VecDeque},
18 default::Default,
19 fmt::Debug,
20 hash::{BuildHasher, Hash, Hasher},
21 ops::{Deref, DerefMut},
22 sync::{Arc, Weak},
23};
24use triomphe::Arc as TArc;
25
/// Declares a function that hands out a lazily created, `'static` pool.
///
/// Two forms are accepted:
///
/// * `pool!(<vis>, name, Type, max_capacity, max_elt_size)` generates
///   `<vis> fn name() -> &'static api::utils::pool::Pool<Type>`;
/// * `pool!(name, Type, max_capacity, max_elt_size)` generates the same
///   function without a visibility qualifier.
///
/// The pool is built with `Pool::new($max_capacity, $max_elt_size)` the first
/// time the generated function is called; later calls return the same
/// reference.
#[macro_export]
macro_rules! pool {
    ($vis:vis, $name:ident, $ty:ty, $max_capacity:expr, $max_elt_size:expr) => {
        $vis fn $name() -> &'static api::utils::pool::Pool<$ty> {
            static POOL: once_cell::race::OnceBox<api::utils::pool::Pool<$ty>> =
                once_cell::race::OnceBox::new();
            POOL.get_or_init(|| {
                let pool = api::utils::pool::Pool::new($max_capacity, $max_elt_size);
                Box::new(pool)
            })
        }
    };
    ($name:ident, $ty:ty, $max_capacity:expr, $max_elt_size:expr) => {
        fn $name() -> &'static api::utils::pool::Pool<$ty> {
            static POOL: once_cell::race::OnceBox<api::utils::pool::Pool<$ty>> =
                once_cell::race::OnceBox::new();
            POOL.get_or_init(|| {
                let pool = api::utils::pool::Pool::new($max_capacity, $max_elt_size);
                Box::new(pool)
            })
        }
    };
}
48
/// Behaviour a type must provide so its instances can be recycled by a pool.
pub trait Poolable {
    /// Builds a fresh, empty instance; used when the pool has nothing to reuse.
    fn empty() -> Self;

    /// Clears the contents so the instance can be handed out again.
    fn reset(&mut self);

    /// Reports the capacity of this instance. The pool compares it with its
    /// per-element limit when deciding whether to keep the instance.
    fn capacity(&self) -> usize;

    /// Whether dropping this handle really releases the underlying object.
    ///
    /// Defaults to `true`. Shared wrappers override it so that an object
    /// still referenced elsewhere is not put back into the pool.
    fn really_dropped(&self) -> bool {
        true
    }
}
58
59macro_rules! impl_hashmap {
60 ($ty:ident) => {
61 impl<K, V, R> Poolable for $ty<K, V, R>
62 where
63 K: Hash + Eq,
64 R: Default + BuildHasher,
65 {
66 fn empty() -> Self {
67 $ty::default()
68 }
69
70 fn reset(&mut self) {
71 self.clear()
72 }
73
74 fn capacity(&self) -> usize {
75 $ty::capacity(self)
76 }
77 }
78 };
79}
80
81impl_hashmap!(HashMap);
82impl_hashmap!(IndexMap);
83
84macro_rules! impl_hashset {
85 ($ty:ident) => {
86 impl<K, R> Poolable for $ty<K, R>
87 where
88 K: Hash + Eq,
89 R: Default + BuildHasher,
90 {
91 fn empty() -> Self {
92 $ty::default()
93 }
94
95 fn reset(&mut self) {
96 self.clear()
97 }
98
99 fn capacity(&self) -> usize {
100 $ty::capacity(self)
101 }
102 }
103 };
104}
105
106impl_hashset!(HashSet);
107impl_hashset!(IndexSet);
108
109impl<T> Poolable for Vec<T> {
110 fn empty() -> Self {
111 Vec::new()
112 }
113
114 fn reset(&mut self) {
115 self.clear()
116 }
117
118 fn capacity(&self) -> usize {
119 Vec::capacity(self)
120 }
121}
122
123impl<T> Poolable for VecDeque<T> {
124 fn empty() -> Self {
125 VecDeque::new()
126 }
127
128 fn reset(&mut self) {
129 self.clear()
130 }
131
132 fn capacity(&self) -> usize {
133 VecDeque::capacity(self)
134 }
135}
136
137impl Poolable for String {
138 fn empty() -> Self {
139 String::new()
140 }
141
142 fn reset(&mut self) {
143 self.clear()
144 }
145
146 fn capacity(&self) -> usize {
147 self.capacity()
148 }
149}
150
151impl<T: Poolable> Poolable for TArc<T> {
152 fn empty() -> Self {
153 TArc::new(T::empty())
154 }
155
156 fn reset(&mut self) {
157 if let Some(inner) = TArc::get_mut(self) {
158 inner.reset()
159 }
160 }
161
162 fn capacity(&self) -> usize {
163 1
164 }
165
166 fn really_dropped(&self) -> bool {
167 TArc::is_unique(&self)
168 }
169}
170
/// Type-erased hook through which the background thread trims a pool
/// without knowing its element type.
trait Prune {
    /// Discards some of the idle objects currently held.
    fn prune(&self);
}
174
/// Identifier of a pool, used as the key in the global pool registry.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
struct Pid(u32);

impl Pid {
    /// Returns the next identifier from a process-wide counter starting at 0.
    fn new() -> Self {
        use std::sync::atomic::{AtomicU32, Ordering};
        static NEXT: AtomicU32 = AtomicU32::new(0);
        let raw = NEXT.fetch_add(1, Ordering::Relaxed);
        Pid(raw)
    }
}
185
186struct GlobalState {
187 pools: FxHashMap<Pid, Box<dyn Prune + Send + 'static>>,
188 pool_shark_running: bool,
189}
190
191static POOLS: Lazy<Mutex<GlobalState>> = Lazy::new(|| {
192 Mutex::new(GlobalState { pools: HashMap::default(), pool_shark_running: false })
193});
194
195#[derive(Debug)]
196struct PoolInner<T: Poolable + Send + 'static> {
197 pool: ArrayQueue<T>,
198 max_elt_capacity: usize,
199 id: Pid,
200}
201
202fn pool_shark() {
203 use std::{
204 thread::{sleep, spawn},
205 time::Duration,
206 };
207 spawn(|| loop {
208 sleep(Duration::from_secs(300));
209 {
210 for p in POOLS.lock().pools.values() {
211 p.prune()
212 }
213 }
214 });
215}
216
217#[derive(Clone, Debug)]
227pub struct Pool<T: Poolable + Send + 'static>(Arc<PoolInner<T>>);
228
229impl<T: Poolable + Send + 'static> Drop for Pool<T> {
230 fn drop(&mut self) {
231 if Arc::strong_count(&self.0) <= 2 {
233 let res = POOLS.lock().pools.remove(&self.0.id);
234 drop(res)
235 }
236 }
237}
238
239impl<T: Poolable + Send + 'static> Prune for Pool<T> {
240 fn prune(&self) {
241 let len = self.0.pool.len();
242 let ten_percent = std::cmp::max(1, self.0.pool.capacity() / 10);
243 let one_percent = std::cmp::max(1, ten_percent / 10);
244 if len > ten_percent {
245 for _ in 0..ten_percent {
246 self.0.pool.pop();
247 }
248 } else if len > one_percent {
249 for _ in 0..one_percent {
250 self.0.pool.pop();
251 }
252 } else if len > 0 {
253 self.0.pool.pop();
254 }
255 }
256}
257
258impl<T: Poolable + Send + 'static> Pool<T> {
259 pub fn new(max_capacity: usize, max_elt_capacity: usize) -> Pool<T> {
264 let id = Pid::new();
265 let t = Pool(Arc::new(PoolInner {
266 pool: ArrayQueue::new(max_capacity),
267 max_elt_capacity,
268 id,
269 }));
270 let mut gs = POOLS.lock();
271 gs.pools.insert(id, Box::new(Pool(Arc::clone(&t.0))));
272 if !gs.pool_shark_running {
273 gs.pool_shark_running = true;
274 pool_shark()
275 }
276 t
277 }
278
279 pub fn take(&self) -> Pooled<T> {
281 let object = self.0.pool.pop().unwrap_or_else(Poolable::empty);
282 Pooled { pool: Arc::downgrade(&self.0), object: Some(object) }
283 }
284}
285
286#[derive(Debug, Clone)]
288pub struct Pooled<T: Poolable + Send + 'static> {
289 pool: Weak<PoolInner<T>>,
290 object: Option<T>,
293}
294
295impl<T: Poolable + Send + 'static> Pooled<T> {
296 #[inline(always)]
297 fn get(&self) -> &T {
298 match &self.object {
299 Some(ref t) => t,
300 None => unreachable!(),
301 }
302 }
303
304 #[inline(always)]
305 fn get_mut(&mut self) -> &mut T {
306 match &mut self.object {
307 Some(ref mut t) => t,
308 None => unreachable!(),
309 }
310 }
311}
312
313impl<T: Poolable + Sync + Send + 'static> Borrow<T> for Pooled<T> {
314 fn borrow(&self) -> &T {
315 self.get()
316 }
317}
318
319impl Borrow<str> for Pooled<String> {
320 fn borrow(&self) -> &str {
321 self.get().borrow()
322 }
323}
324
325impl<T: Poolable + Send + 'static + PartialEq> PartialEq for Pooled<T> {
326 fn eq(&self, other: &Pooled<T>) -> bool {
327 self.get().eq(other.get())
328 }
329}
330
331impl<T: Poolable + Send + 'static + Eq> Eq for Pooled<T> {}
332
333impl<T: Poolable + Send + 'static + PartialOrd> PartialOrd for Pooled<T> {
334 fn partial_cmp(&self, other: &Pooled<T>) -> Option<Ordering> {
335 self.get().partial_cmp(other.get())
336 }
337}
338
339impl<T: Poolable + Send + 'static + Ord> Ord for Pooled<T> {
340 fn cmp(&self, other: &Pooled<T>) -> Ordering {
341 self.get().cmp(other.get())
342 }
343}
344
345impl<T: Poolable + Send + 'static + Hash> Hash for Pooled<T> {
346 fn hash<H>(&self, state: &mut H)
347 where
348 H: Hasher,
349 {
350 Hash::hash(self.get(), state)
351 }
352}
353
354impl<T: Poolable + Send + 'static> Pooled<T> {
355 pub fn orphan(t: T) -> Self {
358 Pooled { pool: Weak::new(), object: Some(t) }
359 }
360
361 pub fn detach(mut self) -> T {
362 self.object.take().unwrap()
363 }
364}
365
366impl<T: Poolable + Send + 'static> AsRef<T> for Pooled<T> {
367 fn as_ref(&self) -> &T {
368 self.get()
369 }
370}
371
372impl<T: Poolable + Send + 'static> Deref for Pooled<T> {
373 type Target = T;
374
375 fn deref(&self) -> &T {
376 self.get()
377 }
378}
379
380impl<T: Poolable + Send + 'static> DerefMut for Pooled<T> {
381 fn deref_mut(&mut self) -> &mut T {
382 self.get_mut()
383 }
384}
385
386impl<T: Poolable + Send + 'static> Drop for Pooled<T> {
387 fn drop(&mut self) {
388 if self.get().really_dropped() {
389 if let Some(inner) = self.pool.upgrade() {
390 let cap = self.get().capacity();
391 if cap > 0 && cap <= inner.max_elt_capacity {
392 let mut object = self.object.take().unwrap();
393 object.reset();
394 inner.pool.push(object).ok();
395 }
396 }
397 }
398 }
399}
400
401impl<T: Poolable + Send + 'static + Serialize> Serialize for Pooled<T> {
402 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
403 where
404 S: serde::Serializer,
405 {
406 self.get().serialize(serializer)
407 }
408}
409
410impl<'de, T: Poolable + Send + 'static + DeserializeOwned> Deserialize<'de>
411 for Pooled<T>
412{
413 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
414 where
415 D: serde::Deserializer<'de>,
416 {
417 let mut t = take_t::<T>(1000, 1000);
418 Self::deserialize_in_place(deserializer, &mut t)?;
419 Ok(t)
420 }
421
422 fn deserialize_in_place<D>(deserializer: D, place: &mut Self) -> Result<(), D::Error>
423 where
424 D: serde::Deserializer<'de>,
425 {
426 <T as Deserialize>::deserialize_in_place(deserializer, place.get_mut())
427 }
428}
429thread_local! {
430 static POOLS_S: RefCell<FxHashMap<TypeId, Box<dyn Any>>> =
431 RefCell::new(HashMap::default());
432}
433
434pub fn take_t<T: Any + Poolable + Send + 'static>(size: usize, max: usize) -> Pooled<T> {
439 POOLS_S.with(|pools| {
440 let mut pools = pools.borrow_mut();
441 let pool: &mut Pool<T> = pools
442 .entry(TypeId::of::<T>())
443 .or_insert_with(|| Box::new(Pool::<T>::new(size, max)))
444 .downcast_mut()
445 .unwrap();
446 pool.take()
447 })
448}
449
450impl<T: Poolable + Send + 'static + JsonSchema> JsonSchema for Pooled<T> {
451 fn schema_name() -> String {
452 "Pooled".to_owned()
454 }
455
456 fn json_schema(
457 _gen: &mut schemars::gen::SchemaGenerator,
458 ) -> schemars::schema::Schema {
459 schemars::schema::SchemaObject {
461 instance_type: Some(InstanceType::Array.into()),
462 ..Default::default()
463 }
464 .into()
465 }
466
467 fn is_referenceable() -> bool {
468 true
469 }
470}
471
/// Packs transparently as the wrapped object; decoding reuses an object
/// from the calling thread's pool for `T`.
#[cfg(feature = "netidx")]
impl<T: Pack + Any + Send + Sync + Poolable> Pack for Pooled<T> {
    fn encoded_len(&self) -> usize {
        <T as Pack>::encoded_len(self.get())
    }

    fn encode(&self, buf: &mut impl BufMut) -> Result<(), PackError> {
        <T as Pack>::encode(self.get(), buf)
    }

    /// Takes an object from the thread-local pool for `T` (created with
    /// parameters `1000, 1000` if it does not exist yet) and decodes into
    /// it. On error the object goes out of scope and is dropped.
    fn decode(buf: &mut impl Buf) -> Result<Self, PackError> {
        let mut pooled = take_t::<T>(1000, 1000);
        <T as Pack>::decode_into(pooled.get_mut(), buf)?;
        Ok(pooled)
    }

    fn decode_into(&mut self, buf: &mut impl Buf) -> Result<(), PackError> {
        <T as Pack>::decode_into(self.get_mut(), buf)
    }
}