syncpool/pool.rs
1use crate::bucket::*;
2use crate::utils::{cpu_relax, make_elem};
3use std::ops::Add;
4use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
5use std::thread;
6use std::time::{Duration, Instant};
7
/// Number of buckets a pool allocates by default; with `SLOT_CAP` elements per
/// bucket this yields the "default size of 64" mentioned in the constructor
/// docs (assumes `SLOT_CAP == 8` — TODO confirm in bucket.rs).
const POOL_SIZE: usize = 8;

/// Upper bound on the bucket count beyond which `expand` refuses to grow.
const EXPANSION_CAP: usize = 512;

/// Spin length handed to `cpu_relax` between slot retries in `get`/`put`.
const SPIN_PERIOD: usize = 4;

/// Configuration flag (@ bit positions):
/// 1 -> If the pool is allowed to expand when under pressure
const CONFIG_ALLOW_EXPANSION: usize = 1;
15
/// Strategy used to produce a fresh element whenever the pool cannot hand
/// one out from its storage (see `make_elem` in utils).
pub(crate) enum ElemBuilder<T> {
    /// Create a boxed element through the type's `Default` implementation.
    Default(fn() -> Box<T>),
    /// Caller-supplied constructor that returns the element by value.
    Builder(fn() -> T),
    /// Caller-supplied initializer that receives a placeholder boxed element
    /// and is responsible for filling in all of its fields.
    Packer(fn(Box<T>) -> Box<T>),
}
21
22struct VisitorGuard<'a>(&'a AtomicUsize);
23
impl<'a> VisitorGuard<'a> {
    /// Register a visitor against the pool's `(counter, write_barrier)` pair.
    ///
    /// While the write barrier (`base.1`) is raised:
    /// * a `get` caller bails out immediately with `None` so it can fall back
    ///   to allocating a fresh element;
    /// * a `put` caller spins here until the barrier is lowered.
    ///
    /// On success the visitor counter (`base.0`) is incremented; the returned
    /// guard undoes that increment on drop.
    fn register(base: &'a (AtomicUsize, AtomicBool), get: bool) -> Option<Self> {
        let mut count = 8;

        // wait if the underlying storage is in protection mode
        while base.1.load(Ordering::Relaxed) {
            if get {
                return None;
            }

            cpu_relax(count);

            // NOTE(review): the relax period shrinks from 8 down to 4 and then
            // stays there — this looks like an inverted backoff; confirm intent.
            if count > 4 {
                count -= 1;
            }
        }

        base.0.fetch_add(1, Ordering::AcqRel);

        Some(VisitorGuard(&base.0))
    }
}
46
impl<'a> Drop for VisitorGuard<'a> {
    /// De-register the visitor by decrementing the shared visitor counter.
    fn drop(&mut self) {
        self.0.fetch_sub(1, Ordering::AcqRel);
    }
}
52
/// A thread-friendly object pool that hands out boxed, pre-allocated elements
/// via `get` and takes them back for reuse via `put`.
pub struct SyncPool<T> {
    /// The slots storage
    slots: Vec<Bucket2<T>>,

    /// The next bucket to try: `.0` is the cursor used by `get`, `.1` the
    /// cursor used by `put`; both wrap modulo `slots.len()`.
    curr: (AtomicUsize, AtomicUsize),

    /// First node: how many threads are concurrently accessing the struct:
    /// 0 -> updating the `slots` field;
    /// 1 -> no one is using the pool;
    /// num -> number of visitors
    /// Second node: write barrier:
    /// true -> write barrier raised
    /// false -> no write barrier
    visitor_counter: (AtomicUsize, AtomicBool),

    /// the number of times we failed to find an in-store struct to offer
    miss_count: AtomicUsize,

    /// Bit-flag configuration word (see `CONFIG_ALLOW_EXPANSION`).
    configure: AtomicUsize,

    /// the handle to be invoked before putting the struct back
    reset_handle: Option<fn(&mut T)>,

    /// The builder that will be tasked to create a new instance of the data when the pool is unable
    /// to render one.
    builder: ElemBuilder<T>,
}
82
83impl<T: Default> SyncPool<T> {
84 /// Create a pool with default size of 64 pre-allocated elements in it.
85 pub fn new() -> Self {
86 Self::make_pool(POOL_SIZE, ElemBuilder::Default(Default::default))
87 }
88
89 /// Create a `SyncPool` with pre-defined number of elements. Note that we will round-up
90 /// the size such that the total number of elements in the pool will mod to 8.
91 pub fn with_size(size: usize) -> Self {
92 let mut pool_size = size / SLOT_CAP;
93 if pool_size < 1 {
94 pool_size = 1
95 }
96
97 Self::make_pool(pool_size, ElemBuilder::Default(Default::default))
98 }
99}
100
101impl<T> SyncPool<T> {
    /// Create a pool with default size of 64 pre-allocated elements in it, which will use the `builder`
    /// handler to obtain the initialized instance of the struct, and then place the object into the
    /// `syncpool` for later use.
    ///
    /// Note that the handler shall be responsible for creating and initializing the struct object
    /// with all fields being valid. After all, they will be the same objects provided to the caller
    /// when invoking the `get` call.
    ///
    /// The pool starts with `POOL_SIZE` buckets, i.e. the same capacity as `new`.
    ///
    /// # Examples
    ///
    /// ```rust
    /// use syncpool::*;
    /// use std::vec;
    ///
    /// struct BigStruct {
    ///     a: u32,
    ///     b: u32,
    ///     c: Vec<u8>,
    /// }
    ///
    /// let mut pool = SyncPool::with_builder(|| {
    ///     BigStruct {
    ///         a: 1,
    ///         b: 42,
    ///         c: vec::from_elem(0u8, 0x1_000_000),
    ///     }
    /// });
    ///
    /// let big_box: Box<BigStruct> = pool.get();
    ///
    /// assert_eq!(big_box.a, 1);
    /// assert_eq!(big_box.b, 42);
    /// assert_eq!(big_box.c.len(), 0x1_000_000);
    ///
    /// pool.put(big_box);
    /// ```
    pub fn with_builder(builder: fn() -> T) -> Self {
        Self::make_pool(POOL_SIZE, ElemBuilder::Builder(builder))
    }
141
142 /// Create a `SyncPool` with pre-defined number of elements and a packer handler. The `builder`
143 /// handler shall essentially function the same way as in the `with_builder`, that it shall take
144 /// the responsibility to create and initialize the element, and return the instance at the end
145 /// of the `builder` closure. Note that we will round-up the size such that the total number of
146 /// elements in the pool will mod to 8.
147 pub fn with_builder_and_size(size: usize, builder: fn() -> T) -> Self {
148 let mut pool_size = size / SLOT_CAP;
149 if pool_size < 1 {
150 pool_size = 1
151 }
152
153 Self::make_pool(pool_size, ElemBuilder::Builder(builder))
154 }
155
    /// Create a pool with default size of 64 pre-allocated elements in it, which will use the `packer`
    /// handler to initialize the element that's being provided by the pool.
    ///
    /// Note that the handler shall take a boxed instance of the element that only contains
    /// placeholder fields, and it is the caller/handler's job to initialize the fields and pack it
    /// with valid and meaningful values. If the struct is valid with all-zero values, the handler
    /// can just return the input element.
    ///
    /// The pool starts with `POOL_SIZE` buckets, i.e. the same capacity as `new`.
    ///
    /// # Examples
    ///
    /// ```rust
    /// use syncpool::*;
    /// use std::vec;
    ///
    /// struct BigStruct {
    ///     a: u32,
    ///     b: u32,
    ///     c: Vec<u8>,
    /// }
    ///
    /// let mut pool = SyncPool::with_packer(|mut src: Box<BigStruct>| {
    ///     src.a = 1;
    ///     src.b = 42;
    ///     src.c = vec::from_elem(0u8, 0x1_000_000);
    ///     src
    /// });
    ///
    /// let big_box: Box<BigStruct> = pool.get();
    ///
    /// assert_eq!(big_box.a, 1);
    /// assert_eq!(big_box.b, 42);
    /// assert_eq!(big_box.c.len(), 0x1_000_000);
    ///
    /// pool.put(big_box);
    /// ```
    pub fn with_packer(packer: fn(Box<T>) -> Box<T>) -> Self {
        Self::make_pool(POOL_SIZE, ElemBuilder::Packer(packer))
    }
194
195 /// Create a `SyncPool` with pre-defined number of elements and a packer handler. The `packer`
196 /// handler shall essentially function the same way as in `with_packer`, that it shall take the
197 /// responsibility to initialize all the fields of a placeholder struct on the heap, otherwise
198 /// the element returned by the pool will be essentially undefined, unless all the struct's
199 /// fields can be represented by a 0 value. In addition, we will round-up the size such that
200 /// the total number of elements in the pool will mod to 8.
201 pub fn with_packer_and_size(size: usize, packer: fn(Box<T>) -> Box<T>) -> Self {
202 let mut pool_size = size / SLOT_CAP;
203 if pool_size < 1 {
204 pool_size = 1
205 }
206
207 Self::make_pool(pool_size, ElemBuilder::Packer(packer))
208 }
209
    /// Try to obtain a pre-allocated element from the pool. This method will always succeed even if
    /// the pool is empty or not available for anyone to access, and in this case, a new boxed-element
    /// will be created.
    pub fn get(&mut self) -> Box<T> {
        // update user count; `register` returns `None` for getters while the
        // write barrier is raised, in which case we fall back to a fresh allocation
        let guard = VisitorGuard::register(&self.visitor_counter, true);
        if guard.is_none() {
            return make_elem(&self.builder);
        }

        // start from where we're left
        let cap = self.slots.len();
        let mut trials = cap;
        let mut pos: usize = self.curr.0.load(Ordering::Acquire) % cap;

        loop {
            // check this slot
            let slot = &mut self.slots[pos];

            // try the access or move on
            if let Ok(i) = slot.access(true) {
                // try to checkout one slot
                let checkout = slot.checkout(i);
                slot.leave(i as u16);

                if let Ok(val) = checkout {
                    // now we're locked, get the val and update internal states
                    self.curr.0.store(pos, Ordering::Release);

                    // done
                    return val;
                }

                // failed to checkout, break and let the remainder logic to handle the rest
                break;
            }

            // hold off a bit to reduce contentions
            cpu_relax(SPIN_PERIOD);

            // update to the next position now.
            pos = self.curr.0.fetch_add(1, Ordering::AcqRel) % cap;
            trials -= 1;

            // we've finished 1 loop but not finding a value to extract, quit
            if trials == 0 {
                break;
            }
        }

        // make sure our guard has been returned if we want the correct visitor count
        drop(guard);
        self.miss_count.fetch_add(1, Ordering::Relaxed);

        // create a new object
        make_elem(&self.builder)
    }
272
    /// Try to return an element to the `SyncPool`. If succeed, we will return `None` to indicate that
    /// the value has been placed in an empty slot; otherwise, we will return `Option<Box<T>>` such
    /// that the caller can decide if the element shall be just discarded, or try put it back again.
    pub fn put(&mut self, val: Box<T>) -> Option<Box<T>> {
        // update user count; unlike `get`, a putter spins inside `register`
        // until any write barrier is lowered instead of bailing out
        let _guard = VisitorGuard::register(&self.visitor_counter, false);

        // start from where we're left; putters try up to two full laps
        let cap = self.slots.len();
        let mut trials = 2 * cap;
        let mut pos: usize = self.curr.1.load(Ordering::Acquire) % cap;

        loop {
            // check this slot
            let slot = &mut self.slots[pos];

            // try the access or move on
            if let Ok(i) = slot.access(false) {
                // now we're locked, get the val and update internal states
                self.curr.1.store(pos, Ordering::Release);

                // put the value back and reset
                slot.release(i, val, self.reset_handle);
                slot.leave(i as u16);

                return None;
            }

            // hold off a bit to reduce contentions: yield during the first lap
            // (trials >= cap), spin with `cpu_relax` during the second
            if trials < cap {
                cpu_relax(SPIN_PERIOD);
            } else {
                thread::yield_now();
            }

            // update states
            pos = self.curr.1.fetch_add(1, Ordering::AcqRel) % cap;
            trials -= 1;

            // both laps finished without finding an empty slot; hand the
            // element back to the caller
            if trials == 0 {
                return Some(val);
            }
        }
    }
329
330 fn make_pool(size: usize, builder: ElemBuilder<T>) -> Self {
331 let mut pool = SyncPool {
332 slots: Vec::with_capacity(size),
333 curr: (AtomicUsize::new(0), AtomicUsize::new(0)),
334 visitor_counter: (AtomicUsize::new(1), AtomicBool::new(false)),
335 miss_count: AtomicUsize::new(0),
336 configure: AtomicUsize::new(0),
337 reset_handle: None,
338 builder,
339 };
340
341 pool.add_slots(size, true);
342 pool
343 }
344
345 #[inline]
346 fn add_slots(&mut self, count: usize, fill: bool) {
347 let filler = if fill { Some(&self.builder) } else { None };
348
349 for _ in 0..count {
350 // self.slots.push(Bucket::new(fill));
351 self.slots.push(Bucket2::new(filler));
352 }
353 }
354
    /// Flip the configuration bit(s) selected by `mask` so they end up matching
    /// `target`.
    ///
    /// The caller (`allow_expansion`) only invokes this after observing that the
    /// current setting differs from `target`, so the first CAS attempt performs
    /// the flip; on contention we re-check whether another thread already
    /// produced the desired state and bail out early if so.
    fn update_config(&mut self, mask: usize, target: bool) {
        let mut config = self.configure.load(Ordering::SeqCst);

        while let Err(old) = self.configure.compare_exchange(
            config,
            config ^ mask,
            Ordering::SeqCst,
            Ordering::Relaxed,
        ) {
            if !((old & mask > 0) ^ target) {
                // the configure already matches, we're done
                return;
            }

            // retry the flip against the freshly observed value
            config = old;
        }
    }
372}
373
374impl<T> Default for SyncPool<T>
375where
376 T: Default,
377{
378 fn default() -> Self {
379 SyncPool::new()
380 }
381}
382
383impl<T> Drop for SyncPool<T> {
384 fn drop(&mut self) {
385 self.slots.clear();
386
387 // now drop the reset handle if it's not null
388 self.reset_handle.take();
389 }
390}
391
/// Read-only inspection of a pool's configuration and occupancy.
pub trait PoolState {
    /// Whether `expand` is currently permitted to grow the pool.
    fn expansion_enabled(&self) -> bool;

    /// How many `get` calls had to fall back to a fresh allocation.
    fn miss_count(&self) -> usize;

    /// Total number of element slots (filled or not) the pool owns.
    fn capacity(&self) -> usize;

    /// Number of elements currently available, as reported by each bucket's
    /// `size_hint`.
    fn len(&self) -> usize;

    /// `true` when no element is currently available in the pool.
    fn is_empty(&self) -> bool {
        self.len() == 0
    }
}
405
406impl<T> PoolState for SyncPool<T> {
407 fn expansion_enabled(&self) -> bool {
408 let configure = self.configure.load(Ordering::SeqCst);
409 configure & CONFIG_ALLOW_EXPANSION > 0
410 }
411
412 fn miss_count(&self) -> usize {
413 self.miss_count.load(Ordering::Acquire)
414 }
415
416 fn capacity(&self) -> usize {
417 self.slots.len() * SLOT_CAP
418 }
419
420 fn len(&self) -> usize {
421 self.slots
422 .iter()
423 .fold(0, |sum, item| sum + item.size_hint())
424 }
425}
426
/// Mutating management operations on a pool: reset hooks, expansion policy,
/// growth and refilling.
pub trait PoolManager<T> {
    /// Install a handle invoked on every element handed back via `put`.
    fn reset_handle(&mut self, handle: fn(&mut T)) -> &mut Self;
    /// Toggle whether `expand` may grow the pool.
    fn allow_expansion(&mut self, allow: bool) -> &mut Self;
    /// Grow the pool by `additional` buckets; `block` controls whether we wait
    /// for current visitors to leave.
    fn expand(&mut self, additional: usize, block: bool) -> bool;
    /// Allocate and `put` up to `count` fresh elements; returns how many made it in.
    fn refill(&mut self, count: usize) -> usize;
}
433
/// The pool manager that provides many useful utilities to keep the `SyncPool`
/// aligned with the needs of the caller program.
436impl<T> PoolManager<T> for SyncPool<T> {
437 /// Set or update the reset handle. If set, the reset handle will be invoked every time an element
438 /// has been returned back to the pool (i.e. calling the `put` method), regardless of if the element
439 /// is created by the pool or not.
440 fn reset_handle(&mut self, handle: fn(&mut T)) -> &mut Self {
441 // busy waiting ... for the first chance a barrier owned by someone else is lowered
442 let mut count: usize = 8;
443 let timeout = Instant::now().add(Duration::from_millis(16));
444
445 loop {
446 match self.visitor_counter.1.compare_exchange(
447 false,
448 true,
449 Ordering::SeqCst,
450 Ordering::Relaxed,
451 ) {
452 Ok(_) => break,
453 Err(_) => {
454 cpu_relax(count);
455
456 // update the counter (and the busy wait period)
457 count -= 1;
458
459 if count < 4 {
460 // yield the thread for later try
461 thread::yield_now();
462 } else if Instant::now() > timeout {
463 // don't block for more than 16ms
464 return self;
465 }
466 }
467 }
468 }
469
470 self.reset_handle.replace(handle);
471
472 self.visitor_counter.1.store(false, Ordering::SeqCst);
473 self
474 }
475
476 /// Set or update the settings that if we will allow the `SyncPool` to be expanded.
477 fn allow_expansion(&mut self, allow: bool) -> &mut Self {
478 if !(self.expansion_enabled() ^ allow) {
479 // not flipping the configuration, return
480 return self;
481 }
482
483 self.update_config(CONFIG_ALLOW_EXPANSION, allow);
484 self
485 }
486
487 /// Try to expand the `SyncPool` and add more elements to it. Usually invoke this API only when
488 /// the caller is certain that the pool is under pressure, and that a short block to the access
489 /// of the pool won't cause serious issues, since the function will block the current caller's
490 /// thread until it's finished (i.e. get the opportunity to raise the writer's barrier and wait
491 /// everyone to leave).
492 ///
493 /// If we're unable to expand the pool, it's due to one of the following reasons: 1) someone has
494 /// already raised the writer's barrier and is likely modifying the pool, we will leave immediately,
495 /// and it's up to the caller if they want to try again; 2) we've waited too long but still couldn't
496 /// obtain an exclusive access to the pool, and similar to reason 1), we will quit now.
497 fn expand(&mut self, additional: usize, block: bool) -> bool {
498 // if the pool isn't allowed to expand, just return
499 if !self.expansion_enabled() {
500 return false;
501 }
502
503 // if exceeding the upper limit, quit
504 if self.slots.len() > EXPANSION_CAP {
505 return false;
506 }
507
508 // raise the write barrier now, if someone has already raised the flag to indicate the
509 // intention to write, let me go away.
510 if self
511 .visitor_counter
512 .1
513 .compare_exchange_weak(false, true, Ordering::Acquire, Ordering::Acquire)
514 .is_err()
515 {
516 return false;
517 }
518
519 // busy waiting ... for all visitors to leave
520 let mut count: usize = 8;
521 let safe = loop {
522 match self
523 .visitor_counter
524 .0
525 .compare_exchange(1, 0, Ordering::SeqCst, Ordering::Relaxed)
526 {
527 Ok(_) => break true,
528 Err(_) => {
529 cpu_relax(2);
530 count -= 1;
531
532 if count < 4 {
533 thread::yield_now();
534 } else if !block {
535 break false;
536 }
537 }
538 }
539 };
540
541 if safe {
542 // update the slots by pushing `additional` slots
543 self.add_slots(additional, true);
544 self.miss_count.store(0, Ordering::Release);
545 }
546
547 // update the internal states
548 self.visitor_counter.0.store(1, Ordering::SeqCst);
549 self.visitor_counter.1.store(false, Ordering::Release);
550
551 safe
552 }
553
554 /// Due to contentious access to the pool, sometimes the `put` action could not finish and return
555 /// the element to the pool successfully. Overtime, this could cause the number of elements in the
556 /// pool to dwell. This would only happen slowly if we're running a very contentious multithreading
557 /// program, but it surely could happen. If the caller detects such situation, they can invoke the
558 /// `refill` API and try to refill the pool with elements.
559 ///
560 /// We will try to refill as many elements as requested
561 fn refill(&mut self, additional: usize) -> usize {
562 let cap = self.capacity();
563 let empty_slots = cap - self.len();
564
565 if empty_slots == 0 {
566 return 0;
567 }
568
569 let quota = if additional > empty_slots {
570 empty_slots
571 } else {
572 additional
573 };
574
575 let mut count = 0;
576 let timeout = Instant::now().add(Duration::from_millis(16));
577
578 // try to put `quota` number of elements into the pool
579 while count < quota {
580 let mut val = make_elem(&self.builder);
581 let mut runs = 0;
582
583 // retry to put the allocated element into the pool.
584 while let Some(ret) = self.put(val) {
585 val = ret;
586 runs += 1;
587
588 // timeout
589 if Instant::now() > timeout {
590 return count;
591 }
592
593 // check the pool length for every 4 failed attempts to put the element into the pool.
594 if runs % 4 == 0 && self.len() == cap {
595 return count;
596 }
597
598 // relax a bit
599 if runs > 8 {
600 thread::yield_now();
601 } else {
602 cpu_relax(runs / 2);
603 }
604 }
605
606 count += 1;
607 }
608
609 count
610 }
611}
612
613#[cfg(test)]
614mod pool_tests {
615 use super::*;
616 use std::vec;
617
    // A ~1MB fixture used to exercise builder- and packer-based pools.
    struct BigStruct {
        a: u32,
        b: u32,
        c: Vec<u8>,
    }
623
624 impl BigStruct {
625 fn new() -> Self {
626 BigStruct {
627 a: 1,
628 b: 42,
629 c: vec::from_elem(0u8, 0x1_000_000),
630 }
631 }
632
633 fn initializer(mut self: Box<Self>) -> Box<Self> {
634 self.a = 1;
635 self.b = 42;
636 self.c = vec::from_elem(0u8, 0x1_000_000);
637
638 self
639 }
640 }
641
    #[test]
    fn use_packer() {
        // the packer receives a placeholder Box<BigStruct> and must initialize it
        let mut pool = SyncPool::with_packer(BigStruct::initializer);

        let big_box = pool.get();

        // the element handed out must carry the packer-initialized values
        assert_eq!(big_box.a, 1);
        assert_eq!(big_box.b, 42);
        assert_eq!(big_box.c.len(), 0x1_000_000);
    }
652
    #[test]
    fn use_builder() {
        // the builder constructs each pooled element from scratch
        let mut pool = SyncPool::with_builder(BigStruct::new);

        let big_box = pool.get();

        // the element handed out must carry the builder-initialized values
        assert_eq!(big_box.a, 1);
        assert_eq!(big_box.b, 42);
        assert_eq!(big_box.c.len(), 0x1_000_000);
    }
663}