syncpool/
pool.rs

1use crate::bucket::*;
2use crate::utils::{cpu_relax, make_elem};
3use std::ops::Add;
4use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
5use std::thread;
6use std::time::{Duration, Instant};
7
/// Number of buckets a pool is created with when the caller does not request a size.
const POOL_SIZE: usize = 8;

/// Bucket count above which `expand` refuses to grow the pool any further.
const EXPANSION_CAP: usize = 512;

/// Spin period handed to `cpu_relax` between two bucket probes in `get` and `put`.
const SPIN_PERIOD: usize = 4;

/// Bit mask into the pool's `configure` flags: when the bit is set, the pool is allowed to
/// expand while under pressure.
const CONFIG_ALLOW_EXPANSION: usize = 1;
15
/// The strategy a pool uses to produce a fresh element when it cannot serve one from storage.
pub(crate) enum ElemBuilder<T> {
    /// Produces an already boxed element; the pool constructors that require `T: Default`
    /// install `Default::default` here.
    Default(fn() -> Box<T>),

    /// Produces the element by value.
    Builder(fn() -> T),

    /// Receives a boxed placeholder element and returns it once its fields are initialized.
    Packer(fn(Box<T>) -> Box<T>),
}
21
22struct VisitorGuard<'a>(&'a AtomicUsize);
23
24impl<'a> VisitorGuard<'a> {
25    fn register(base: &'a (AtomicUsize, AtomicBool), get: bool) -> Option<Self> {
26        let mut count = 8;
27
28        // wait if the underlying storage is in protection mode
29        while base.1.load(Ordering::Relaxed) {
30            if get {
31                return None;
32            }
33
34            cpu_relax(count);
35
36            if count > 4 {
37                count -= 1;
38            }
39        }
40
41        base.0.fetch_add(1, Ordering::AcqRel);
42
43        Some(VisitorGuard(&base.0))
44    }
45}
46
47impl<'a> Drop for VisitorGuard<'a> {
48    fn drop(&mut self) {
49        self.0.fetch_sub(1, Ordering::AcqRel);
50    }
51}
52
53pub struct SyncPool<T> {
54    /// The slots storage
55    slots: Vec<Bucket2<T>>,
56
57    /// the next bucket to try
58    curr: (AtomicUsize, AtomicUsize),
59
60    /// First node: how many threads are concurrently accessing the struct:
61    ///   0   -> updating the `slots` field;
62    ///   1   -> no one is using the pool;
63    ///   num -> number of visitors
64    /// Second node: write barrier:
65    ///   true  -> write barrier raised
66    ///   false -> no write barrier
67    visitor_counter: (AtomicUsize, AtomicBool),
68
69    /// the number of times we failed to find an in-store struct to offer
70    miss_count: AtomicUsize,
71
72    /// if we allow expansion of the pool
73    configure: AtomicUsize,
74
75    /// the handle to be invoked before putting the struct back
76    reset_handle: Option<fn(&mut T)>,
77
78    /// The builder that will be tasked to create a new instance of the data when the pool is unable
79    /// to render one.
80    builder: ElemBuilder<T>,
81}
82
83impl<T: Default> SyncPool<T> {
84    /// Create a pool with default size of 64 pre-allocated elements in it.
85    pub fn new() -> Self {
86        Self::make_pool(POOL_SIZE, ElemBuilder::Default(Default::default))
87    }
88
89    /// Create a `SyncPool` with pre-defined number of elements. Note that we will round-up
90    /// the size such that the total number of elements in the pool will mod to 8.
91    pub fn with_size(size: usize) -> Self {
92        let mut pool_size = size / SLOT_CAP;
93        if pool_size < 1 {
94            pool_size = 1
95        }
96
97        Self::make_pool(pool_size, ElemBuilder::Default(Default::default))
98    }
99}
100
101impl<T> SyncPool<T> {
102    /// Create a pool with default size of 64 pre-allocated elements in it, which will use the `builder`
103    /// handler to obtain the initialized instance of the struct, and then place the object into the
104    /// `syncpool` for later use.
105    ///
106    /// Note that the handler shall be responsible for creating and initializing the struct object
107    /// with all fields being valid. After all, they will be the same objects provided to the caller
108    /// when invoking the `get` call.
109    ///
110    /// # Examples
111    ///
112    /// ```rust
113    /// use syncpool::*;
114    /// use std::vec;
115    ///
116    /// struct BigStruct {
117    ///     a: u32,
118    ///     b: u32,
119    ///     c: Vec<u8>,
120    /// }
121    ///
122    /// let mut pool = SyncPool::with_builder(|| {
123    ///     BigStruct {
124    ///         a: 1,
125    ///         b: 42,
126    ///         c: vec::from_elem(0u8, 0x1_000_000),
127    ///     }
128    /// });
129    ///
130    /// let big_box: Box<BigStruct> = pool.get();
131    ///
132    /// assert_eq!(big_box.a, 1);
133    /// assert_eq!(big_box.b, 42);
134    /// assert_eq!(big_box.c.len(), 0x1_000_000);
135    ///
136    /// pool.put(big_box);
137    /// ```
138    pub fn with_builder(builder: fn() -> T) -> Self {
139        Self::make_pool(POOL_SIZE, ElemBuilder::Builder(builder))
140    }
141
142    /// Create a `SyncPool` with pre-defined number of elements and a packer handler. The `builder`
143    /// handler shall essentially function the same way as in the `with_builder`, that it shall take
144    /// the responsibility to create and initialize the element, and return the instance at the end
145    /// of the `builder` closure. Note that we will round-up the size such that the total number of
146    /// elements in the pool will mod to 8.
147    pub fn with_builder_and_size(size: usize, builder: fn() -> T) -> Self {
148        let mut pool_size = size / SLOT_CAP;
149        if pool_size < 1 {
150            pool_size = 1
151        }
152
153        Self::make_pool(pool_size, ElemBuilder::Builder(builder))
154    }
155
156    /// Create a pool with default size of 64 pre-allocated elements in it, which will use the `packer`
157    /// handler to initialize the element that's being provided by the pool.
158    ///
159    /// Note that the handler shall take a boxed instance of the element that only contains
160    /// placeholder fields, and it is the caller/handler's job to initialize the fields and pack it
161    /// with valid and meaningful values. If the struct is valid with all-zero values, the handler
162    /// can just return the input element.
163    ///
164    /// # Examples
165    ///
166    /// ```rust
167    /// use syncpool::*;
168    /// use std::vec;
169    ///
170    /// struct BigStruct {
171    ///     a: u32,
172    ///     b: u32,
173    ///     c: Vec<u8>,
174    /// }
175    ///
176    /// let mut pool = SyncPool::with_packer(|mut src: Box<BigStruct>| {
177    ///     src.a = 1;
178    ///     src.b = 42;
179    ///     src.c = vec::from_elem(0u8, 0x1_000_000);
180    ///     src
181    /// });
182    ///
183    /// let big_box: Box<BigStruct> = pool.get();
184    ///
185    /// assert_eq!(big_box.a, 1);
186    /// assert_eq!(big_box.b, 42);
187    /// assert_eq!(big_box.c.len(), 0x1_000_000);
188    ///
189    /// pool.put(big_box);
190    /// ```
191    pub fn with_packer(packer: fn(Box<T>) -> Box<T>) -> Self {
192        Self::make_pool(POOL_SIZE, ElemBuilder::Packer(packer))
193    }
194
195    /// Create a `SyncPool` with pre-defined number of elements and a packer handler. The `packer`
196    /// handler shall essentially function the same way as in `with_packer`, that it shall take the
197    /// responsibility to initialize all the fields of a placeholder struct on the heap, otherwise
198    /// the element returned by the pool will be essentially undefined, unless all the struct's
199    /// fields can be represented by a 0 value. In addition, we will round-up the size such that
200    /// the total number of elements in the pool will mod to 8.
201    pub fn with_packer_and_size(size: usize, packer: fn(Box<T>) -> Box<T>) -> Self {
202        let mut pool_size = size / SLOT_CAP;
203        if pool_size < 1 {
204            pool_size = 1
205        }
206
207        Self::make_pool(pool_size, ElemBuilder::Packer(packer))
208    }
209
210    /// Try to obtain a pre-allocated element from the pool. This method will always succeed even if
211    /// the pool is empty or not available for anyone to access, and in this case, a new boxed-element
212    /// will be created.
213    pub fn get(&mut self) -> Box<T> {
214        // update user count
215        let guard = VisitorGuard::register(&self.visitor_counter, true);
216        if guard.is_none() {
217            return make_elem(&self.builder);
218        }
219
220        // start from where we're left
221        let cap = self.slots.len();
222        let mut trials = cap;
223        let mut pos: usize = self.curr.0.load(Ordering::Acquire) % cap;
224
225        loop {
226            // check this slot
227            let slot = &mut self.slots[pos];
228
229            // try the access or move on
230            if let Ok(i) = slot.access(true) {
231                // try to checkout one slot
232                let checkout = slot.checkout(i);
233                slot.leave(i as u16);
234
235                /*            if slot.access(true) {
236                // try to checkout one slot
237                let checkout = slot.checkout();
238                slot.leave();*/
239
240                if let Ok(val) = checkout {
241                    // now we're locked, get the val and update internal states
242                    self.curr.0.store(pos, Ordering::Release);
243
244                    // done
245                    return val;
246                }
247
248                // failed to checkout, break and let the remainder logic to handle the rest
249                break;
250            }
251
252            // hold off a bit to reduce contentions
253            cpu_relax(SPIN_PERIOD);
254
255            // update to the next position now.
256            pos = self.curr.0.fetch_add(1, Ordering::AcqRel) % cap;
257            trials -= 1;
258
259            // we've finished 1 loop but not finding a value to extract, quit
260            if trials == 0 {
261                break;
262            }
263        }
264
265        // make sure our guard has been returned if we want the correct visitor count
266        drop(guard);
267        self.miss_count.fetch_add(1, Ordering::Relaxed);
268
269        // create a new object
270        make_elem(&self.builder)
271    }
272
273    /// Try to return an element to the `SyncPool`. If succeed, we will return `None` to indicate that
274    /// the value has been placed in an empty slot; otherwise, we will return `Option<Box<T>>` such
275    /// that the caller can decide if the element shall be just discarded, or try put it back again.
276    pub fn put(&mut self, val: Box<T>) -> Option<Box<T>> {
277        // update user count
278        let _guard = VisitorGuard::register(&self.visitor_counter, false);
279
280        // start from where we're left
281        let cap = self.slots.len();
282        let mut trials = 2 * cap;
283        let mut pos: usize = self.curr.1.load(Ordering::Acquire) % cap;
284
285        loop {
286            // check this slot
287            let slot = &mut self.slots[pos];
288
289            // try the access or move on
290            if let Ok(i) = slot.access(false) {
291                // now we're locked, get the val and update internal states
292                self.curr.1.store(pos, Ordering::Release);
293
294                // put the value back and reset
295                slot.release(i, val, self.reset_handle);
296                slot.leave(i as u16);
297
298                return None;
299            }
300
301            /*            if slot.access(false) {
302                // now we're locked, get the val and update internal states
303                self.curr.1.store(pos, Ordering::Release);
304
305                // put the value back into the slot
306                slot.release(val, self.reset_handle.load(Ordering::Acquire));
307                slot.leave();
308
309                return true;
310            }*/
311
312            // hold off a bit to reduce contentions
313            if trials < cap {
314                cpu_relax(SPIN_PERIOD);
315            } else {
316                thread::yield_now();
317            }
318
319            // update states
320            pos = self.curr.1.fetch_add(1, Ordering::AcqRel) % cap;
321            trials -= 1;
322
323            // we've finished 1 loop but not finding a value to extract, quit
324            if trials == 0 {
325                return Some(val);
326            }
327        }
328    }
329
330    fn make_pool(size: usize, builder: ElemBuilder<T>) -> Self {
331        let mut pool = SyncPool {
332            slots: Vec::with_capacity(size),
333            curr: (AtomicUsize::new(0), AtomicUsize::new(0)),
334            visitor_counter: (AtomicUsize::new(1), AtomicBool::new(false)),
335            miss_count: AtomicUsize::new(0),
336            configure: AtomicUsize::new(0),
337            reset_handle: None,
338            builder,
339        };
340
341        pool.add_slots(size, true);
342        pool
343    }
344
345    #[inline]
346    fn add_slots(&mut self, count: usize, fill: bool) {
347        let filler = if fill { Some(&self.builder) } else { None };
348
349        for _ in 0..count {
350            // self.slots.push(Bucket::new(fill));
351            self.slots.push(Bucket2::new(filler));
352        }
353    }
354
355    fn update_config(&mut self, mask: usize, target: bool) {
356        let mut config = self.configure.load(Ordering::SeqCst);
357
358        while let Err(old) = self.configure.compare_exchange(
359            config,
360            config ^ mask,
361            Ordering::SeqCst,
362            Ordering::Relaxed,
363        ) {
364            if !((old & mask > 0) ^ target) {
365                // the configure already matches, we're done
366                return;
367            }
368
369            config = old;
370        }
371    }
372}
373
374impl<T> Default for SyncPool<T>
375where
376    T: Default,
377{
378    fn default() -> Self {
379        SyncPool::new()
380    }
381}
382
383impl<T> Drop for SyncPool<T> {
384    fn drop(&mut self) {
385        self.slots.clear();
386
387        // now drop the reset handle if it's not null
388        self.reset_handle.take();
389    }
390}
391
/// Read-only view over the current state of a pool.
pub trait PoolState {
    /// If the pool is allowed to expand.
    fn expansion_enabled(&self) -> bool;

    /// The number of misses recorded by the pool.
    fn miss_count(&self) -> usize;

    /// The number of elements the pool can hold.
    fn capacity(&self) -> usize;

    /// The number of elements reported to be in the pool.
    fn len(&self) -> usize;

    /// If `len` reports that there is no element in the pool.
    fn is_empty(&self) -> bool {
        self.len() == 0
    }
}
405
406impl<T> PoolState for SyncPool<T> {
407    fn expansion_enabled(&self) -> bool {
408        let configure = self.configure.load(Ordering::SeqCst);
409        configure & CONFIG_ALLOW_EXPANSION > 0
410    }
411
412    fn miss_count(&self) -> usize {
413        self.miss_count.load(Ordering::Acquire)
414    }
415
416    fn capacity(&self) -> usize {
417        self.slots.len() * SLOT_CAP
418    }
419
420    fn len(&self) -> usize {
421        self.slots
422            .iter()
423            .fold(0, |sum, item| sum + item.size_hint())
424    }
425}
426
/// Utilities to tune a pool of `T` elements at runtime.
pub trait PoolManager<T> {
    /// Set or update the handle associated with elements being put back into the pool.
    fn reset_handle(&mut self, handle: fn(&mut T)) -> &mut Self;

    /// Allow or forbid the pool to expand.
    fn allow_expansion(&mut self, allow: bool) -> &mut Self;

    /// Try to grow the pool by `additional`; `block` tells if the caller accepts to wait for the
    /// operation. Returns whether the pool has been expanded.
    fn expand(&mut self, additional: usize, block: bool) -> bool;

    /// Try to put up to `additional` new elements into the pool. Returns how many elements have
    /// actually been added.
    fn refill(&mut self, additional: usize) -> usize;
}
433
/// The pool manager, which provides utilities to keep the `SyncPool` close to the needs of the
/// caller program.
436impl<T> PoolManager<T> for SyncPool<T> {
437    /// Set or update the reset handle. If set, the reset handle will be invoked every time an element
438    /// has been returned back to the pool (i.e. calling the `put` method), regardless of if the element
439    /// is created by the pool or not.
440    fn reset_handle(&mut self, handle: fn(&mut T)) -> &mut Self {
441        // busy waiting ... for the first chance a barrier owned by someone else is lowered
442        let mut count: usize = 8;
443        let timeout = Instant::now().add(Duration::from_millis(16));
444
445        loop {
446            match self.visitor_counter.1.compare_exchange(
447                false,
448                true,
449                Ordering::SeqCst,
450                Ordering::Relaxed,
451            ) {
452                Ok(_) => break,
453                Err(_) => {
454                    cpu_relax(count);
455
456                    // update the counter (and the busy wait period)
457                    count -= 1;
458
459                    if count < 4 {
460                        // yield the thread for later try
461                        thread::yield_now();
462                    } else if Instant::now() > timeout {
463                        // don't block for more than 16ms
464                        return self;
465                    }
466                }
467            }
468        }
469
470        self.reset_handle.replace(handle);
471
472        self.visitor_counter.1.store(false, Ordering::SeqCst);
473        self
474    }
475
476    /// Set or update the settings that if we will allow the `SyncPool` to be expanded.
477    fn allow_expansion(&mut self, allow: bool) -> &mut Self {
478        if !(self.expansion_enabled() ^ allow) {
479            // not flipping the configuration, return
480            return self;
481        }
482
483        self.update_config(CONFIG_ALLOW_EXPANSION, allow);
484        self
485    }
486
487    /// Try to expand the `SyncPool` and add more elements to it. Usually invoke this API only when
488    /// the caller is certain that the pool is under pressure, and that a short block to the access
489    /// of the pool won't cause serious issues, since the function will block the current caller's
490    /// thread until it's finished (i.e. get the opportunity to raise the writer's barrier and wait
491    /// everyone to leave).
492    ///
493    /// If we're unable to expand the pool, it's due to one of the following reasons: 1) someone has
494    /// already raised the writer's barrier and is likely modifying the pool, we will leave immediately,
495    /// and it's up to the caller if they want to try again; 2) we've waited too long but still couldn't
496    /// obtain an exclusive access to the pool, and similar to reason 1), we will quit now.
497    fn expand(&mut self, additional: usize, block: bool) -> bool {
498        // if the pool isn't allowed to expand, just return
499        if !self.expansion_enabled() {
500            return false;
501        }
502
503        // if exceeding the upper limit, quit
504        if self.slots.len() > EXPANSION_CAP {
505            return false;
506        }
507
508        // raise the write barrier now, if someone has already raised the flag to indicate the
509        // intention to write, let me go away.
510        if self
511            .visitor_counter
512            .1
513            .compare_exchange_weak(false, true, Ordering::Acquire, Ordering::Acquire)
514            .is_err()
515        {
516            return false;
517        }
518
519        // busy waiting ... for all visitors to leave
520        let mut count: usize = 8;
521        let safe = loop {
522            match self
523                .visitor_counter
524                .0
525                .compare_exchange(1, 0, Ordering::SeqCst, Ordering::Relaxed)
526            {
527                Ok(_) => break true,
528                Err(_) => {
529                    cpu_relax(2);
530                    count -= 1;
531
532                    if count < 4 {
533                        thread::yield_now();
534                    } else if !block {
535                        break false;
536                    }
537                }
538            }
539        };
540
541        if safe {
542            // update the slots by pushing `additional` slots
543            self.add_slots(additional, true);
544            self.miss_count.store(0, Ordering::Release);
545        }
546
547        // update the internal states
548        self.visitor_counter.0.store(1, Ordering::SeqCst);
549        self.visitor_counter.1.store(false, Ordering::Release);
550
551        safe
552    }
553
554    /// Due to contentious access to the pool, sometimes the `put` action could not finish and return
555    /// the element to the pool successfully. Overtime, this could cause the number of elements in the
556    /// pool to dwell. This would only happen slowly if we're running a very contentious multithreading
557    /// program, but it surely could happen. If the caller detects such situation, they can invoke the
558    /// `refill` API and try to refill the pool with elements.
559    ///
560    /// We will try to refill as many elements as requested
561    fn refill(&mut self, additional: usize) -> usize {
562        let cap = self.capacity();
563        let empty_slots = cap - self.len();
564
565        if empty_slots == 0 {
566            return 0;
567        }
568
569        let quota = if additional > empty_slots {
570            empty_slots
571        } else {
572            additional
573        };
574
575        let mut count = 0;
576        let timeout = Instant::now().add(Duration::from_millis(16));
577
578        // try to put `quota` number of elements into the pool
579        while count < quota {
580            let mut val = make_elem(&self.builder);
581            let mut runs = 0;
582
583            // retry to put the allocated element into the pool.
584            while let Some(ret) = self.put(val) {
585                val = ret;
586                runs += 1;
587
588                // timeout
589                if Instant::now() > timeout {
590                    return count;
591                }
592
593                // check the pool length for every 4 failed attempts to put the element into the pool.
594                if runs % 4 == 0 && self.len() == cap {
595                    return count;
596                }
597
598                // relax a bit
599                if runs > 8 {
600                    thread::yield_now();
601                } else {
602                    cpu_relax(runs / 2);
603                }
604            }
605
606            count += 1;
607        }
608
609        count
610    }
611}
612
#[cfg(test)]
mod pool_tests {
    use super::*;

    /// Length of the byte buffer carried by every test element.
    const PAYLOAD_LEN: usize = 0x1_000_000;

    struct BigStruct {
        a: u32,
        b: u32,
        c: Vec<u8>,
    }

    impl BigStruct {
        /// Builder-style constructor: creates a fully initialized value.
        fn new() -> Self {
            Self {
                a: 1,
                b: 42,
                c: vec![0u8; PAYLOAD_LEN],
            }
        }

        /// Packer-style initializer: fills in the fields of an already boxed value.
        fn initializer(mut self: Box<Self>) -> Box<Self> {
            self.a = 1;
            self.b = 42;
            self.c = vec![0u8; PAYLOAD_LEN];

            self
        }
    }

    /// Check that `elem` carries the values both initialization paths are expected to produce.
    fn assert_initialized(elem: &BigStruct) {
        assert_eq!(elem.a, 1);
        assert_eq!(elem.b, 42);
        assert_eq!(elem.c.len(), PAYLOAD_LEN);
    }

    #[test]
    fn use_packer() {
        let mut pool = SyncPool::with_packer(BigStruct::initializer);

        let big_box = pool.get();

        assert_initialized(&big_box);
    }

    #[test]
    fn use_builder() {
        let mut pool = SyncPool::with_builder(BigStruct::new);

        let big_box = pool.get();

        assert_initialized(&big_box);
    }
}