SyncPool

Struct SyncPool 

Source
pub struct SyncPool<T> { /* private fields */ }

Implementations§

Source§

impl<T: Default> SyncPool<T>

Source

pub fn new() -> Self

Create a pool with default size of 64 pre-allocated elements in it.

Source

pub fn with_size(size: usize) -> Self

Create a SyncPool with pre-defined number of elements. Note that we will round-up the size such that the total number of elements in the pool will mod to 8.

Examples found in repository?
examples/basic.rs (line 35)
31unsafe fn pool_setup() -> (
32    Pin<&'static mut SyncPool<ComplexStruct>>,
33    Pin<&'static mut SyncPool<ComplexStruct>>,
34) {
35    POOL.as_mut_ptr().write(SyncPool::with_size(COUNT / 2));
36
37    (
38        Pin::new(&mut *POOL.as_mut_ptr()),
39        Pin::new(&mut *POOL.as_mut_ptr()),
40    )
41}
More examples
Hide additional examples
examples/complex_bench.rs (line 99)
97fn pool_setup() {
98    unsafe {
99        let mut pool = SyncPool::with_size(128);
100
101        // clean up the underlying buffer, this handle can also be used to shrink the underlying
102        // buffer to save for space, though at a cost of extra overhead for doing that.
103        pool.reset_handle(sanitizer);
104
105        /*
106        // Alternatively, use an anonymous function for the same purpose. Closure can't be used as
107        // a handle, though.
108        pool.reset_handle(|slice: &mut [u8; BUF_CAP]| {
109            for i in 0..slice.len() {
110                slice[i] = 0;
111            }
112
113            println!("Byte slice cleared...");
114        });
115        */
116
117        POOL.replace(pool);
118    }
119}
Source§

impl<T> SyncPool<T>

Source

pub fn with_builder(builder: fn() -> T) -> Self

Create a pool with default size of 64 pre-allocated elements in it, which will use the builder handler to obtain the initialized instance of the struct, and then place the object into the syncpool for later use.

Note that the handler shall be responsible for creating and initializing the struct object with all fields being valid. After all, they will be the same objects provided to the caller when invoking the get call.

§Examples
use syncpool::*;
use std::vec;

struct BigStruct {
    a: u32,
    b: u32,
    c: Vec<u8>,
}

let mut pool = SyncPool::with_builder(|| {
    BigStruct {
        a: 1,
        b: 42,
        c: vec::from_elem(0u8, 0x1_000_000),
    }
});

let big_box: Box<BigStruct> = pool.get();

assert_eq!(big_box.a, 1);
assert_eq!(big_box.b, 42);
assert_eq!(big_box.c.len(), 0x1_000_000);

pool.put(big_box);
Examples found in repository?
examples/with_tools.rs (line 36)
35fn call_builder() {
36    let mut pool = SyncPool::with_builder(BigStruct::new);
37
38    println!("Pool created...");
39
40    let big_box = pool.get();
41
42    assert_eq!(big_box.a, 1);
43    assert_eq!(big_box.b, 42);
44    assert_eq!(big_box.c.len(), 0x1_000_000);
45
46    pool.put(big_box);
47}
Source

pub fn with_builder_and_size(size: usize, builder: fn() -> T) -> Self

Create a SyncPool with pre-defined number of elements and a packer handler. The builder handler shall essentially function the same way as in the with_builder, that it shall take the responsibility to create and initialize the element, and return the instance at the end of the builder closure. Note that we will round-up the size such that the total number of elements in the pool will mod to 8.

Source

pub fn with_packer(packer: fn(Box<T>) -> Box<T>) -> Self

Create a pool with default size of 64 pre-allocated elements in it, which will use the packer handler to initialize the element that’s being provided by the pool.

Note that the handler shall take a boxed instance of the element that only contains placeholder fields, and it is the caller/handler’s job to initialize the fields and pack it with valid and meaningful values. If the struct is valid with all-zero values, the handler can just return the input element.

§Examples
use syncpool::*;
use std::vec;

struct BigStruct {
    a: u32,
    b: u32,
    c: Vec<u8>,
}

let mut pool = SyncPool::with_packer(|mut src: Box<BigStruct>| {
    src.a = 1;
    src.b = 42;
    src.c = vec::from_elem(0u8, 0x1_000_000);
    src
});

let big_box: Box<BigStruct> = pool.get();

assert_eq!(big_box.a, 1);
assert_eq!(big_box.b, 42);
assert_eq!(big_box.c.len(), 0x1_000_000);

pool.put(big_box);
Examples found in repository?
examples/with_tools.rs (line 50)
49fn call_packer() {
50    let mut pool = SyncPool::with_packer(BigStruct::initializer);
51
52    println!("Pool created...");
53
54    let big_box = pool.get();
55
56    assert_eq!(big_box.a, 1);
57    assert_eq!(big_box.b, 42);
58    assert_eq!(big_box.c.len(), 0x1_000_000);
59
60    pool.put(big_box);
61}
Source

pub fn with_packer_and_size(size: usize, packer: fn(Box<T>) -> Box<T>) -> Self

Create a SyncPool with pre-defined number of elements and a packer handler. The packer handler shall essentially function the same way as in with_packer, that it shall take the responsibility to initialize all the fields of a placeholder struct on the heap, otherwise the element returned by the pool will be essentially undefined, unless all the struct’s fields can be represented by a 0 value. In addition, we will round-up the size such that the total number of elements in the pool will mod to 8.

Source

pub fn get(&mut self) -> Box<T>

Try to obtain a pre-allocated element from the pool. This method will always succeed even if the pool is empty or not available for anyone to access, and in this case, a new boxed-element will be created.

Examples found in repository?
examples/with_tools.rs (line 40)
35fn call_builder() {
36    let mut pool = SyncPool::with_builder(BigStruct::new);
37
38    println!("Pool created...");
39
40    let big_box = pool.get();
41
42    assert_eq!(big_box.a, 1);
43    assert_eq!(big_box.b, 42);
44    assert_eq!(big_box.c.len(), 0x1_000_000);
45
46    pool.put(big_box);
47}
48
49fn call_packer() {
50    let mut pool = SyncPool::with_packer(BigStruct::initializer);
51
52    println!("Pool created...");
53
54    let big_box = pool.get();
55
56    assert_eq!(big_box.a, 1);
57    assert_eq!(big_box.b, 42);
58    assert_eq!(big_box.c.len(), 0x1_000_000);
59
60    pool.put(big_box);
61}
More examples
Hide additional examples
examples/basic.rs (line 79)
77fn run(pool: &mut SyncPool<ComplexStruct>, chan: &SyncSender<Box<ComplexStruct>>, id: usize) {
78    // take a pre-init struct from the pool
79    let mut content = pool.get();
80    content.id = id;
81
82    // assuming we're doing some stuff in this period
83    thread::sleep(Duration::from_nanos(32));
84
85    // done with the stuff, send the result out.
86    chan.send(content).unwrap_or_default();
87}
examples/complex_bench.rs (line 140)
125fn run(alloc: bool) -> u128 {
126    let (tx, rx) = mpsc::sync_channel(32);
127    let tx_clone = tx.clone();
128
129    let now = Instant::now();
130
131    let send_one = thread::spawn(move || {
132        for i in 0..TEST_SIZE {
133            if i % DENOMINATOR == 0 {
134                thread::sleep(Duration::from_nanos(BUSY_PERIOD));
135            }
136
137            let mut data = if alloc {
138                Default::default()
139            } else {
140                unsafe { POOL.as_mut().unwrap().get() }
141            };
142
143            assert!(data.id == 21 || data.id == 0, "Wrong id: {}", data.id);
144            assert_ne!(data.id, 42);
145            data.id = 42;
146
147            /*            assert_eq!(arr.len(), BUF_CAP);
148            assert_eq!(arr.0[42], 42);*/
149
150            tx_clone.try_send(data).unwrap_or_default();
151        }
152
153        //        println!("Send one done...");
154    });
155
156    let send_two = thread::spawn(move || {
157        for i in 0..TEST_SIZE {
158            if i % DENOMINATOR == 0 {
159                thread::sleep(Duration::from_nanos(BUSY_PERIOD));
160            }
161
162            let mut data = if alloc {
163                Default::default()
164            } else {
165                unsafe { POOL.as_mut().unwrap().get() }
166            };
167
168            assert!(data.id == 21 || data.id == 0, "Wrong id: {}", data.id);
169            assert_ne!(data.id, 42);
170            data.id = 42;
171
172            /*            assert_eq!(arr.len(), BUF_CAP);
173            assert_eq!(arr.0[42], 42);*/
174
175            tx.try_send(data).unwrap_or_default();
176        }
177
178        //        println!("Send two done...");
179    });
180
181    let recv_one = thread::spawn(move || {
182        thread::sleep(Duration::from_micros(5));
183
184        while let Ok(arr) = rx.recv() {
185            //            assert_eq!(arr.len(), BUF_CAP);
186
187            if !alloc {
188                unsafe {
189                    POOL.as_mut().unwrap().put(arr);
190                }
191            }
192        }
193
194        //        println!("Recv one done...");
195    });
196
197    for i in 0..TEST_SIZE {
198        // sleep a bit to create some concurrent actions
199        if i % DENOMINATOR == 1 {
200            thread::sleep(Duration::from_nanos(BUSY_PERIOD));
201        }
202
203        let mut data = if alloc {
204            Default::default()
205        } else {
206            unsafe { POOL.as_mut().unwrap().get() }
207        };
208
209        //        assert!(data.id == 21 || data.id == 0, "Wrong id: {}", data.id);
210        assert_ne!(data.id, 42);
211        data.id = 42;
212
213        /*        assert_eq!(arr.len(), BUF_CAP);
214        assert_eq!(arr.0[42], 42);*/
215
216        if !alloc {
217            // when done using the object, make sure to put it back so the pool won't dry up
218            unsafe { POOL.as_mut().unwrap().put(data) };
219        }
220    }
221
222    //    println!("Main loop done...");
223
224    send_one.join().unwrap_or_default();
225    send_two.join().unwrap_or_default();
226    recv_one.join().unwrap_or_default();
227
228    now.elapsed().as_micros()
229}
Source

pub fn put(&mut self, val: Box<T>) -> Option<Box<T>>

Try to return an element to the SyncPool. If succeed, we will return None to indicate that the value has been placed in an empty slot; otherwise, we will return Option<Box<T>> such that the caller can decide if the element shall be just discarded, or try put it back again.

Examples found in repository?
examples/with_tools.rs (line 46)
35fn call_builder() {
36    let mut pool = SyncPool::with_builder(BigStruct::new);
37
38    println!("Pool created...");
39
40    let big_box = pool.get();
41
42    assert_eq!(big_box.a, 1);
43    assert_eq!(big_box.b, 42);
44    assert_eq!(big_box.c.len(), 0x1_000_000);
45
46    pool.put(big_box);
47}
48
49fn call_packer() {
50    let mut pool = SyncPool::with_packer(BigStruct::initializer);
51
52    println!("Pool created...");
53
54    let big_box = pool.get();
55
56    assert_eq!(big_box.a, 1);
57    assert_eq!(big_box.b, 42);
58    assert_eq!(big_box.c.len(), 0x1_000_000);
59
60    pool.put(big_box);
61}
More examples
Hide additional examples
examples/basic.rs (line 67)
44fn main() {
45    // let's make the pool slightly smaller than the demand, this will simulate a service under pressure
46    // such that the pool can't completely meet the demand without dynamically expand the pool.
47    let (pinned_producer, pinned_consumer) = unsafe { pool_setup() };
48
49    // make the channel that establish a concurrent pipeline.
50    let (tx, rx) = mpsc::sync_channel(64);
51
52    // data producer loop
53    thread::spawn(move || {
54        let producer = pinned_producer.get_mut();
55
56        for i in 0..COUNT {
57            run(producer, &tx, i);
58        }
59    });
60
61    // data consumer logic
62    let handler = thread::spawn(move || {
63        let consumer = pinned_consumer.get_mut();
64
65        for content in rx {
66            println!("Receiving struct with id: {}", content.id);
67            consumer.put(content);
68        }
69    });
70
71    // wait for the receiver to finish and print the result.
72    handler.join().unwrap_or_default();
73
74    println!("All done...");
75}
examples/complex_bench.rs (line 189)
125fn run(alloc: bool) -> u128 {
126    let (tx, rx) = mpsc::sync_channel(32);
127    let tx_clone = tx.clone();
128
129    let now = Instant::now();
130
131    let send_one = thread::spawn(move || {
132        for i in 0..TEST_SIZE {
133            if i % DENOMINATOR == 0 {
134                thread::sleep(Duration::from_nanos(BUSY_PERIOD));
135            }
136
137            let mut data = if alloc {
138                Default::default()
139            } else {
140                unsafe { POOL.as_mut().unwrap().get() }
141            };
142
143            assert!(data.id == 21 || data.id == 0, "Wrong id: {}", data.id);
144            assert_ne!(data.id, 42);
145            data.id = 42;
146
147            /*            assert_eq!(arr.len(), BUF_CAP);
148            assert_eq!(arr.0[42], 42);*/
149
150            tx_clone.try_send(data).unwrap_or_default();
151        }
152
153        //        println!("Send one done...");
154    });
155
156    let send_two = thread::spawn(move || {
157        for i in 0..TEST_SIZE {
158            if i % DENOMINATOR == 0 {
159                thread::sleep(Duration::from_nanos(BUSY_PERIOD));
160            }
161
162            let mut data = if alloc {
163                Default::default()
164            } else {
165                unsafe { POOL.as_mut().unwrap().get() }
166            };
167
168            assert!(data.id == 21 || data.id == 0, "Wrong id: {}", data.id);
169            assert_ne!(data.id, 42);
170            data.id = 42;
171
172            /*            assert_eq!(arr.len(), BUF_CAP);
173            assert_eq!(arr.0[42], 42);*/
174
175            tx.try_send(data).unwrap_or_default();
176        }
177
178        //        println!("Send two done...");
179    });
180
181    let recv_one = thread::spawn(move || {
182        thread::sleep(Duration::from_micros(5));
183
184        while let Ok(arr) = rx.recv() {
185            //            assert_eq!(arr.len(), BUF_CAP);
186
187            if !alloc {
188                unsafe {
189                    POOL.as_mut().unwrap().put(arr);
190                }
191            }
192        }
193
194        //        println!("Recv one done...");
195    });
196
197    for i in 0..TEST_SIZE {
198        // sleep a bit to create some concurrent actions
199        if i % DENOMINATOR == 1 {
200            thread::sleep(Duration::from_nanos(BUSY_PERIOD));
201        }
202
203        let mut data = if alloc {
204            Default::default()
205        } else {
206            unsafe { POOL.as_mut().unwrap().get() }
207        };
208
209        //        assert!(data.id == 21 || data.id == 0, "Wrong id: {}", data.id);
210        assert_ne!(data.id, 42);
211        data.id = 42;
212
213        /*        assert_eq!(arr.len(), BUF_CAP);
214        assert_eq!(arr.0[42], 42);*/
215
216        if !alloc {
217            // when done using the object, make sure to put it back so the pool won't dry up
218            unsafe { POOL.as_mut().unwrap().put(data) };
219        }
220    }
221
222    //    println!("Main loop done...");
223
224    send_one.join().unwrap_or_default();
225    send_two.join().unwrap_or_default();
226    recv_one.join().unwrap_or_default();
227
228    now.elapsed().as_micros()
229}

Trait Implementations§

Source§

impl<T> Default for SyncPool<T>
where T: Default,

Source§

fn default() -> Self

Returns the “default value” for a type. Read more
Source§

impl<T> Drop for SyncPool<T>

Source§

fn drop(&mut self)

Executes the destructor for this type. Read more
Source§

impl<T> PoolManager<T> for SyncPool<T>

The pool manager that provide many useful utilities to keep the SyncPool close to the needs of the caller program.

Source§

fn reset_handle(&mut self, handle: fn(&mut T)) -> &mut Self

Set or update the reset handle. If set, the reset handle will be invoked every time an element has been returned back to the pool (i.e. calling the put method), regardless of if the element is created by the pool or not.

Source§

fn allow_expansion(&mut self, allow: bool) -> &mut Self

Set or update the settings that if we will allow the SyncPool to be expanded.

Source§

fn expand(&mut self, additional: usize, block: bool) -> bool

Try to expand the SyncPool and add more elements to it. Usually invoke this API only when the caller is certain that the pool is under pressure, and that a short block to the access of the pool won’t cause serious issues, since the function will block the current caller’s thread until it’s finished (i.e. get the opportunity to raise the writer’s barrier and wait everyone to leave).

If we’re unable to expand the pool, it’s due to one of the following reasons: 1) someone has already raised the writer’s barrier and is likely modifying the pool, we will leave immediately, and it’s up to the caller if they want to try again; 2) we’ve waited too long but still couldn’t obtain an exclusive access to the pool, and similar to reason 1), we will quit now.

Source§

fn refill(&mut self, additional: usize) -> usize

Due to contentious access to the pool, sometimes the put action could not finish and return the element to the pool successfully. Overtime, this could cause the number of elements in the pool to dwell. This would only happen slowly if we’re running a very contentious multithreading program, but it surely could happen. If the caller detects such situation, they can invoke the refill API and try to refill the pool with elements.

We will try to refill as many elements as requested

Source§

impl<T> PoolState for SyncPool<T>

Auto Trait Implementations§

§

impl<T> !Freeze for SyncPool<T>

§

impl<T> RefUnwindSafe for SyncPool<T>
where T: RefUnwindSafe,

§

impl<T> Send for SyncPool<T>
where T: Send,

§

impl<T> !Sync for SyncPool<T>

§

impl<T> Unpin for SyncPool<T>

§

impl<T> UnwindSafe for SyncPool<T>
where T: RefUnwindSafe,

Blanket Implementations§

Source§

impl<T> Any for T
where T: 'static + ?Sized,

Source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
Source§

impl<T> Borrow<T> for T
where T: ?Sized,

Source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
Source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

Source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
Source§

impl<T> From<T> for T

Source§

fn from(t: T) -> T

Returns the argument unchanged.

Source§

impl<T, U> Into<U> for T
where U: From<T>,

Source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

Source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

Source§

type Error = Infallible

The type returned in the event of a conversion error.
Source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
Source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

Source§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
Source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.