pub struct SyncPool<T> { /* private fields */ }Implementations§
Source§impl<T: Default> SyncPool<T>
impl<T: Default> SyncPool<T>
Sourcepub fn with_size(size: usize) -> Self
pub fn with_size(size: usize) -> Self
Create a SyncPool with pre-defined number of elements. Note that we will round-up
the size such that the total number of elements in the pool will mod to 8.
Examples found in repository?
More examples
97fn pool_setup() {
98 unsafe {
99 let mut pool = SyncPool::with_size(128);
100
101 // clean up the underlying buffer, this handle can also be used to shrink the underlying
102 // buffer to save for space, though at a cost of extra overhead for doing that.
103 pool.reset_handle(sanitizer);
104
105 /*
106 // Alternatively, use an anonymous function for the same purpose. Closure can't be used as
107 // a handle, though.
108 pool.reset_handle(|slice: &mut [u8; BUF_CAP]| {
109 for i in 0..slice.len() {
110 slice[i] = 0;
111 }
112
113 println!("Byte slice cleared...");
114 });
115 */
116
117 POOL.replace(pool);
118 }
119}Source§impl<T> SyncPool<T>
impl<T> SyncPool<T>
Sourcepub fn with_builder(builder: fn() -> T) -> Self
pub fn with_builder(builder: fn() -> T) -> Self
Create a pool with default size of 64 pre-allocated elements in it, which will use the builder
handler to obtain the initialized instance of the struct, and then place the object into the
syncpool for later use.
Note that the handler shall be responsible for creating and initializing the struct object
with all fields being valid. After all, they will be the same objects provided to the caller
when invoking the get call.
§Examples
use syncpool::*;
use std::vec;
struct BigStruct {
a: u32,
b: u32,
c: Vec<u8>,
}
let mut pool = SyncPool::with_builder(|| {
BigStruct {
a: 1,
b: 42,
c: vec::from_elem(0u8, 0x1_000_000),
}
});
let big_box: Box<BigStruct> = pool.get();
assert_eq!(big_box.a, 1);
assert_eq!(big_box.b, 42);
assert_eq!(big_box.c.len(), 0x1_000_000);
pool.put(big_box);Examples found in repository?
35fn call_builder() {
36 let mut pool = SyncPool::with_builder(BigStruct::new);
37
38 println!("Pool created...");
39
40 let big_box = pool.get();
41
42 assert_eq!(big_box.a, 1);
43 assert_eq!(big_box.b, 42);
44 assert_eq!(big_box.c.len(), 0x1_000_000);
45
46 pool.put(big_box);
47}Sourcepub fn with_builder_and_size(size: usize, builder: fn() -> T) -> Self
pub fn with_builder_and_size(size: usize, builder: fn() -> T) -> Self
Create a SyncPool with pre-defined number of elements and a packer handler. The builder
handler shall essentially function the same way as in the with_builder, that it shall take
the responsibility to create and initialize the element, and return the instance at the end
of the builder closure. Note that we will round-up the size such that the total number of
elements in the pool will mod to 8.
Sourcepub fn with_packer(packer: fn(Box<T>) -> Box<T>) -> Self
pub fn with_packer(packer: fn(Box<T>) -> Box<T>) -> Self
Create a pool with default size of 64 pre-allocated elements in it, which will use the packer
handler to initialize the element that’s being provided by the pool.
Note that the handler shall take a boxed instance of the element that only contains placeholder fields, and it is the caller/handler’s job to initialize the fields and pack it with valid and meaningful values. If the struct is valid with all-zero values, the handler can just return the input element.
§Examples
use syncpool::*;
use std::vec;
struct BigStruct {
a: u32,
b: u32,
c: Vec<u8>,
}
let mut pool = SyncPool::with_packer(|mut src: Box<BigStruct>| {
src.a = 1;
src.b = 42;
src.c = vec::from_elem(0u8, 0x1_000_000);
src
});
let big_box: Box<BigStruct> = pool.get();
assert_eq!(big_box.a, 1);
assert_eq!(big_box.b, 42);
assert_eq!(big_box.c.len(), 0x1_000_000);
pool.put(big_box);Examples found in repository?
49fn call_packer() {
50 let mut pool = SyncPool::with_packer(BigStruct::initializer);
51
52 println!("Pool created...");
53
54 let big_box = pool.get();
55
56 assert_eq!(big_box.a, 1);
57 assert_eq!(big_box.b, 42);
58 assert_eq!(big_box.c.len(), 0x1_000_000);
59
60 pool.put(big_box);
61}Sourcepub fn with_packer_and_size(size: usize, packer: fn(Box<T>) -> Box<T>) -> Self
pub fn with_packer_and_size(size: usize, packer: fn(Box<T>) -> Box<T>) -> Self
Create a SyncPool with pre-defined number of elements and a packer handler. The packer
handler shall essentially function the same way as in with_packer, that it shall take the
responsibility to initialize all the fields of a placeholder struct on the heap, otherwise
the element returned by the pool will be essentially undefined, unless all the struct’s
fields can be represented by a 0 value. In addition, we will round-up the size such that
the total number of elements in the pool will mod to 8.
Sourcepub fn get(&mut self) -> Box<T>
pub fn get(&mut self) -> Box<T>
Try to obtain a pre-allocated element from the pool. This method will always succeed even if the pool is empty or not available for anyone to access, and in this case, a new boxed-element will be created.
Examples found in repository?
35fn call_builder() {
36 let mut pool = SyncPool::with_builder(BigStruct::new);
37
38 println!("Pool created...");
39
40 let big_box = pool.get();
41
42 assert_eq!(big_box.a, 1);
43 assert_eq!(big_box.b, 42);
44 assert_eq!(big_box.c.len(), 0x1_000_000);
45
46 pool.put(big_box);
47}
48
49fn call_packer() {
50 let mut pool = SyncPool::with_packer(BigStruct::initializer);
51
52 println!("Pool created...");
53
54 let big_box = pool.get();
55
56 assert_eq!(big_box.a, 1);
57 assert_eq!(big_box.b, 42);
58 assert_eq!(big_box.c.len(), 0x1_000_000);
59
60 pool.put(big_box);
61}More examples
125fn run(alloc: bool) -> u128 {
126 let (tx, rx) = mpsc::sync_channel(32);
127 let tx_clone = tx.clone();
128
129 let now = Instant::now();
130
131 let send_one = thread::spawn(move || {
132 for i in 0..TEST_SIZE {
133 if i % DENOMINATOR == 0 {
134 thread::sleep(Duration::from_nanos(BUSY_PERIOD));
135 }
136
137 let mut data = if alloc {
138 Default::default()
139 } else {
140 unsafe { POOL.as_mut().unwrap().get() }
141 };
142
143 assert!(data.id == 21 || data.id == 0, "Wrong id: {}", data.id);
144 assert_ne!(data.id, 42);
145 data.id = 42;
146
147 /* assert_eq!(arr.len(), BUF_CAP);
148 assert_eq!(arr.0[42], 42);*/
149
150 tx_clone.try_send(data).unwrap_or_default();
151 }
152
153 // println!("Send one done...");
154 });
155
156 let send_two = thread::spawn(move || {
157 for i in 0..TEST_SIZE {
158 if i % DENOMINATOR == 0 {
159 thread::sleep(Duration::from_nanos(BUSY_PERIOD));
160 }
161
162 let mut data = if alloc {
163 Default::default()
164 } else {
165 unsafe { POOL.as_mut().unwrap().get() }
166 };
167
168 assert!(data.id == 21 || data.id == 0, "Wrong id: {}", data.id);
169 assert_ne!(data.id, 42);
170 data.id = 42;
171
172 /* assert_eq!(arr.len(), BUF_CAP);
173 assert_eq!(arr.0[42], 42);*/
174
175 tx.try_send(data).unwrap_or_default();
176 }
177
178 // println!("Send two done...");
179 });
180
181 let recv_one = thread::spawn(move || {
182 thread::sleep(Duration::from_micros(5));
183
184 while let Ok(arr) = rx.recv() {
185 // assert_eq!(arr.len(), BUF_CAP);
186
187 if !alloc {
188 unsafe {
189 POOL.as_mut().unwrap().put(arr);
190 }
191 }
192 }
193
194 // println!("Recv one done...");
195 });
196
197 for i in 0..TEST_SIZE {
198 // sleep a bit to create some concurrent actions
199 if i % DENOMINATOR == 1 {
200 thread::sleep(Duration::from_nanos(BUSY_PERIOD));
201 }
202
203 let mut data = if alloc {
204 Default::default()
205 } else {
206 unsafe { POOL.as_mut().unwrap().get() }
207 };
208
209 // assert!(data.id == 21 || data.id == 0, "Wrong id: {}", data.id);
210 assert_ne!(data.id, 42);
211 data.id = 42;
212
213 /* assert_eq!(arr.len(), BUF_CAP);
214 assert_eq!(arr.0[42], 42);*/
215
216 if !alloc {
217 // when done using the object, make sure to put it back so the pool won't dry up
218 unsafe { POOL.as_mut().unwrap().put(data) };
219 }
220 }
221
222 // println!("Main loop done...");
223
224 send_one.join().unwrap_or_default();
225 send_two.join().unwrap_or_default();
226 recv_one.join().unwrap_or_default();
227
228 now.elapsed().as_micros()
229}Sourcepub fn put(&mut self, val: Box<T>) -> Option<Box<T>>
pub fn put(&mut self, val: Box<T>) -> Option<Box<T>>
Try to return an element to the SyncPool. If succeed, we will return None to indicate that
the value has been placed in an empty slot; otherwise, we will return Option<Box<T>> such
that the caller can decide if the element shall be just discarded, or try put it back again.
Examples found in repository?
35fn call_builder() {
36 let mut pool = SyncPool::with_builder(BigStruct::new);
37
38 println!("Pool created...");
39
40 let big_box = pool.get();
41
42 assert_eq!(big_box.a, 1);
43 assert_eq!(big_box.b, 42);
44 assert_eq!(big_box.c.len(), 0x1_000_000);
45
46 pool.put(big_box);
47}
48
49fn call_packer() {
50 let mut pool = SyncPool::with_packer(BigStruct::initializer);
51
52 println!("Pool created...");
53
54 let big_box = pool.get();
55
56 assert_eq!(big_box.a, 1);
57 assert_eq!(big_box.b, 42);
58 assert_eq!(big_box.c.len(), 0x1_000_000);
59
60 pool.put(big_box);
61}More examples
44fn main() {
45 // let's make the pool slightly smaller than the demand, this will simulate a service under pressure
46 // such that the pool can't completely meet the demand without dynamically expand the pool.
47 let (pinned_producer, pinned_consumer) = unsafe { pool_setup() };
48
49 // make the channel that establish a concurrent pipeline.
50 let (tx, rx) = mpsc::sync_channel(64);
51
52 // data producer loop
53 thread::spawn(move || {
54 let producer = pinned_producer.get_mut();
55
56 for i in 0..COUNT {
57 run(producer, &tx, i);
58 }
59 });
60
61 // data consumer logic
62 let handler = thread::spawn(move || {
63 let consumer = pinned_consumer.get_mut();
64
65 for content in rx {
66 println!("Receiving struct with id: {}", content.id);
67 consumer.put(content);
68 }
69 });
70
71 // wait for the receiver to finish and print the result.
72 handler.join().unwrap_or_default();
73
74 println!("All done...");
75}125fn run(alloc: bool) -> u128 {
126 let (tx, rx) = mpsc::sync_channel(32);
127 let tx_clone = tx.clone();
128
129 let now = Instant::now();
130
131 let send_one = thread::spawn(move || {
132 for i in 0..TEST_SIZE {
133 if i % DENOMINATOR == 0 {
134 thread::sleep(Duration::from_nanos(BUSY_PERIOD));
135 }
136
137 let mut data = if alloc {
138 Default::default()
139 } else {
140 unsafe { POOL.as_mut().unwrap().get() }
141 };
142
143 assert!(data.id == 21 || data.id == 0, "Wrong id: {}", data.id);
144 assert_ne!(data.id, 42);
145 data.id = 42;
146
147 /* assert_eq!(arr.len(), BUF_CAP);
148 assert_eq!(arr.0[42], 42);*/
149
150 tx_clone.try_send(data).unwrap_or_default();
151 }
152
153 // println!("Send one done...");
154 });
155
156 let send_two = thread::spawn(move || {
157 for i in 0..TEST_SIZE {
158 if i % DENOMINATOR == 0 {
159 thread::sleep(Duration::from_nanos(BUSY_PERIOD));
160 }
161
162 let mut data = if alloc {
163 Default::default()
164 } else {
165 unsafe { POOL.as_mut().unwrap().get() }
166 };
167
168 assert!(data.id == 21 || data.id == 0, "Wrong id: {}", data.id);
169 assert_ne!(data.id, 42);
170 data.id = 42;
171
172 /* assert_eq!(arr.len(), BUF_CAP);
173 assert_eq!(arr.0[42], 42);*/
174
175 tx.try_send(data).unwrap_or_default();
176 }
177
178 // println!("Send two done...");
179 });
180
181 let recv_one = thread::spawn(move || {
182 thread::sleep(Duration::from_micros(5));
183
184 while let Ok(arr) = rx.recv() {
185 // assert_eq!(arr.len(), BUF_CAP);
186
187 if !alloc {
188 unsafe {
189 POOL.as_mut().unwrap().put(arr);
190 }
191 }
192 }
193
194 // println!("Recv one done...");
195 });
196
197 for i in 0..TEST_SIZE {
198 // sleep a bit to create some concurrent actions
199 if i % DENOMINATOR == 1 {
200 thread::sleep(Duration::from_nanos(BUSY_PERIOD));
201 }
202
203 let mut data = if alloc {
204 Default::default()
205 } else {
206 unsafe { POOL.as_mut().unwrap().get() }
207 };
208
209 // assert!(data.id == 21 || data.id == 0, "Wrong id: {}", data.id);
210 assert_ne!(data.id, 42);
211 data.id = 42;
212
213 /* assert_eq!(arr.len(), BUF_CAP);
214 assert_eq!(arr.0[42], 42);*/
215
216 if !alloc {
217 // when done using the object, make sure to put it back so the pool won't dry up
218 unsafe { POOL.as_mut().unwrap().put(data) };
219 }
220 }
221
222 // println!("Main loop done...");
223
224 send_one.join().unwrap_or_default();
225 send_two.join().unwrap_or_default();
226 recv_one.join().unwrap_or_default();
227
228 now.elapsed().as_micros()
229}Trait Implementations§
Source§impl<T> PoolManager<T> for SyncPool<T>
The pool manager that provide many useful utilities to keep the SyncPool close to the needs of
the caller program.
impl<T> PoolManager<T> for SyncPool<T>
The pool manager that provide many useful utilities to keep the SyncPool close to the needs of the caller program.
Source§fn reset_handle(&mut self, handle: fn(&mut T)) -> &mut Self
fn reset_handle(&mut self, handle: fn(&mut T)) -> &mut Self
Set or update the reset handle. If set, the reset handle will be invoked every time an element
has been returned back to the pool (i.e. calling the put method), regardless of if the element
is created by the pool or not.
Source§fn allow_expansion(&mut self, allow: bool) -> &mut Self
fn allow_expansion(&mut self, allow: bool) -> &mut Self
Set or update the settings that if we will allow the SyncPool to be expanded.
Source§fn expand(&mut self, additional: usize, block: bool) -> bool
fn expand(&mut self, additional: usize, block: bool) -> bool
Try to expand the SyncPool and add more elements to it. Usually invoke this API only when
the caller is certain that the pool is under pressure, and that a short block to the access
of the pool won’t cause serious issues, since the function will block the current caller’s
thread until it’s finished (i.e. get the opportunity to raise the writer’s barrier and wait
everyone to leave).
If we’re unable to expand the pool, it’s due to one of the following reasons: 1) someone has already raised the writer’s barrier and is likely modifying the pool, we will leave immediately, and it’s up to the caller if they want to try again; 2) we’ve waited too long but still couldn’t obtain an exclusive access to the pool, and similar to reason 1), we will quit now.
Source§fn refill(&mut self, additional: usize) -> usize
fn refill(&mut self, additional: usize) -> usize
Due to contentious access to the pool, sometimes the put action could not finish and return
the element to the pool successfully. Overtime, this could cause the number of elements in the
pool to dwell. This would only happen slowly if we’re running a very contentious multithreading
program, but it surely could happen. If the caller detects such situation, they can invoke the
refill API and try to refill the pool with elements.
We will try to refill as many elements as requested