// shared_resource_pool_builder/lib.rs
//! # Shared Resource Pool Builder
//!
//! A producer-consumer thread pool builder that eventually consumes all items
//! sequentially with a shared resource.
//!
//! ## Code example
//!
//! The purpose of this library is best described with an example, so let's get
//! straight into it:
//!
//! ```rust
//! use shared_resource_pool_builder::SharedResourcePoolBuilder;
//!
//! // Define a pool builder with a Vector as shared resource, and an action that
//! // should be executed sequentially over all items. Note that the second
//! // parameter of this function is a shared consumer function that takes two
//! // parameters: the shared resource and the items it will be used on. These
//! // items will be returned from the pool consumers.
//! let pool_builder = SharedResourcePoolBuilder::new(
//!     Vec::new(),
//!     |vec, i| vec.push(i)
//! );
//!
//! // Create a producer-consumer thread pool that does some work. In this case,
//! // send the numbers 0 to 2 to the consumer, where they will be multiplied by
//! // 10. Note that the first parameter is a producer function that takes a
//! // Sender-like object, like [`std::sync::mpsc::Sender`]. Every item that gets
//! // sent will be processed by the consumer, which is the second parameter to
//! // this function. The consumer expects an object of the same type that gets
//! // sent by the producer, and is expected to return an object of the type that
//! // the shared consumer takes as its second parameter, which will ultimately
//! // be used on the shared resource.
//! let pool_1 = pool_builder
//!     .create_pool(
//!         |tx| (0..=2).for_each(|i| tx.send(i).unwrap()),
//!         |i| i * 10
//!     );
//!
//! // Create a second pool. Here, the numbers 3 to 5 will be produced, multiplied
//! // by 2, and then processed by the shared consumer function.
//! let pool_2 = pool_builder
//!     .create_pool(
//!         |tx| (3..=5).for_each(|i| tx.send(i).unwrap()),
//!         |i| i * 2
//!     );
//!
//! // Wait for a specific pool to finish before continuing ...
//! pool_1.join().unwrap();
//!
//! // ... or wait for all pools and the shared consumer to finish their work at
//! // once and return the shared resource. Afterwards, the pool builder can no
//! // longer be used, since the shared resource gets moved.
//! let result = {
//!     let mut result = pool_builder.join().unwrap();
//!
//!     // By default, the pool consumers run in as many threads as the machine
//!     // has cores, so the results may not arrive in the same order as they
//!     // were produced.
//!     result.sort();
//!     result
//! };
//!
//! assert_eq!(result, vec![0, 6, 8, 10, 10, 20]);
//! ```
//!
//! ## Motivation
//!
//! Imagine you have a huge list of items that need to be processed. Every item
//! can be processed independently, so the natural thing to do is to use a thread
//! pool to parallelize the work. But now imagine that the resulting items need to
//! be used together with a resource that does not work across multiple threads.
//! The resulting items have to be processed sequentially, so the resource does
//! not have to be shared between threads.
//!
//! As a more concrete example, imagine a program that uses a SQLite database for
//! storing computationally intensive results. Before doing the actual work, you
//! want to remove obsolete results from the database. Then you need to collect
//! all new data that needs to get processed. At last, you want all new data to be
//! actually processed.
//!
//! Normally, this involves three sequential steps: cleaning, collecting, and
//! processing. These steps need to be sequential because writing operations on
//! the same SQLite database can only be done by one thread at a time. You might
//! want to implement synchronization, so that one thread writes and all
//! additional threads are blocked and wait for their turn. But this is error
//! prone if not done correctly: you will encounter random panics from locked or
//! busy databases. Furthermore, you lose the ability to use prepared statements
//! efficiently.
//!
//! Enter `shared-resource-pool-builder`. With this, you have the ability to
//! define a *shared resource* and a *shared consumer function*. From this builder
//! you can create producer-consumer pools. The consumed items will then be
//! sequentially given to the shared consumer function to apply the result to the
//! shared resource.
//!
//! The pool consumers all run within a shared thread pool, so you need not worry
//! about thrashing your system. You can even limit the number of items that get
//! processed in parallel.
//!
//! To stay with the database example, this means you can define a pool builder
//! with the database connection as the shared resource and a shared consumer
//! function that will remove, insert, or update items. You can model that with an
//! enum, for example.
//!
//! Then, for the cleaning step, you create a pool that produces all items from
//! the database and consumes them by returning an object that causes the item to
//! be either deleted (if obsolete) or ignored by the shared consumer.
//!
//! Next, you can create a pool that produces all new items and consumes them by
//! creating an insert action for each item.
//!
//! Depending on the database layout, these two pools can even run in parallel.
//! Thanks to the shared consumer, only one delete or insert will run at a time.
//!
//! Again, depending on the layout, you just need to wait for the inserting pool
//! to finish and create yet another pool that produces all new items that need
//! to be processed, and consumes them by doing the actual work and returning an
//! appropriate update object for the shared consumer. In the meantime, the
//! cleaning pool might as well continue its work.
//!
//! Eventually, you wait for all pools to finish before terminating the program.
//!
//! This way, you do not need to worry about synchronizing multiple writer threads
//! or multiple open database connections. Just split the work so that the
//! computationally intensive tasks get done in the pool consumers and make the
//! shared consumer do just the finalization.
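//!
//! The workflow above can be sketched roughly as follows. Note that this is a
//! self-contained sketch, not a real database setup: a plain `Vec` wrapper
//! stands in for the database connection, and the `DbAction` enum is just one
//! assumed way of modeling the consumer actions:
//!
//! ```rust
//! use shared_resource_pool_builder::SharedResourcePoolBuilder;
//!
//! // Mock resource: a plain Vec stands in for a database connection.
//! struct Database(Vec<i32>);
//!
//! enum DbAction {
//!     Insert(i32),
//!     Delete(i32),
//!     Ignore,
//! }
//!
//! let pool_builder = SharedResourcePoolBuilder::new(
//!     Database(vec![1, 2, 3]),
//!     |db: &mut Database, action| match action {
//!         DbAction::Insert(i) => db.0.push(i),
//!         DbAction::Delete(i) => db.0.retain(|&x| x != i),
//!         DbAction::Ignore => (),
//!     },
//! );
//!
//! // Cleaning: delete obsolete entries (here: the odd numbers), keep the rest.
//! // Joining the handle ensures all deletes are done before we continue.
//! pool_builder
//!     .create_pool(
//!         |tx| vec![1, 2, 3].into_iter().for_each(|i| tx.send(i).unwrap()),
//!         |i| if i % 2 == 1 { DbAction::Delete(i) } else { DbAction::Ignore },
//!     )
//!     .join()
//!     .unwrap();
//!
//! // Collecting: insert the new entries.
//! pool_builder.create_pool(
//!     |tx| vec![4, 5].into_iter().for_each(|i| tx.send(i).unwrap()),
//!     |i| DbAction::Insert(i),
//! );
//!
//! let mut contents = pool_builder.join().unwrap().0;
//! contents.sort();
//! assert_eq!(contents, vec![2, 4, 5]);
//! ```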

use std::sync::mpsc::{channel, sync_channel, Receiver, Sender, SyncSender};
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
use std::thread::JoinHandle;

use threadpool::ThreadPool;

use crate::job::JobCounter;
pub use crate::job::{Job, JobHandle};
use crate::senderlike::SenderLike;

mod job;
mod senderlike;

/// The builder to create pools on a shared resource.
pub struct SharedResourcePoolBuilder<SType, SR> {
    tx: SType,
    handler_thread: JoinHandle<SR>,
    consumer_pool: ThreadPool,
}

impl<SType, Arg, SR> SharedResourcePoolBuilder<SType, SR>
where
    Arg: Send + 'static,
    SType: SenderLike<Item = Job<Arg>> + Clone + Send + 'static,
    SR: Send + 'static,
{
    fn init<SC>(
        tx: SType,
        rx: Receiver<Job<Arg>>,
        mut shared_resource: SR,
        mut shared_consumer_fn: SC,
    ) -> Self
    where
        SC: FnMut(&mut SR, Arg) -> () + Send + Sync + 'static,
    {
        let join_handle = thread::spawn(move || {
            for job in rx {
                shared_consumer_fn(&mut shared_resource, job.0);
            }
            shared_resource
        });

        let consumer_pool = threadpool::Builder::new().build();

        Self {
            tx,
            handler_thread: join_handle,
            consumer_pool,
        }
    }

    /// Creates an unbounded producer-consumer pool.
    ///
    /// This method takes two parameters:
    ///
    /// - The producer function
    ///
    ///   This function will generate the items to be further processed by the consumer. It must
    ///   take one parameter, which is a sender-like object such as [`Sender`]. To generate an
    ///   item for the consumer, use the `send()` method of the sender.
    ///
    ///   This function must not return anything, i.e. its return type is `()`.
    ///
    ///   By design, this function will run in only one thread. If you want it to run in multiple
    ///   threads, you need to implement that yourself. The preferred way is to run only a cheap
    ///   listing algorithm in the producer and let the consumer do the computationally intensive
    ///   tasks, which will by default run within a thread pool.
    ///
    /// - The consumer function
    ///
    ///   This function takes one parameter and will be called for each item that was sent by the
    ///   producer. The return type must be the same type that is used by the shared consumer as
    ///   defined by the [`Self::new()`] method.
    ///
    ///   By design, the consumer functions will run within a thread pool that is shared with all
    ///   other pool consumers created by this builder. The number of threads is the number of
    ///   cores, as defined by the default value of [`threadpool::Builder::num_threads()`].
    ///
    /// The message channel, in which the producer sends and the consumer receives the items, is
    /// unbounded with this function (in other words, it uses [`channel()`]). On modern systems
    /// and "typical" use cases, this should not be a problem. If however your system is low on
    /// memory or your producer generates millions upon millions of items, then you might be
    /// interested in the [bounded](`Self::create_pool_bounded()`) variant of this method.
    ///
    /// # Example
    ///
    /// Assuming that `heavy_algorithm()` converts an integer to an object that can be used with
    /// the shared consumer function, a pool creation might look like this:
    ///
    /// ```rust
    /// # use std::error::Error;
    /// # use std::sync::mpsc::Sender;
    /// # use shared_resource_pool_builder::SharedResourcePoolBuilder;
    /// #
    /// # fn heavy_algorithm(i: i32) -> i32 { i }
    /// #
    /// # fn example() {
    /// # let pool_builder = SharedResourcePoolBuilder::new(true, |res, item| ());
    /// pool_builder.create_pool(
    ///     |tx| (0..10).for_each(|i| tx.send(i).unwrap()),
    ///     |i| heavy_algorithm(i),
    /// );
    /// # }
    /// ```
    pub fn create_pool<P, PArg, C>(&self, producer_fn: P, consumer_fn: C) -> JobHandle
    where
        P: Fn(Sender<PArg>) -> () + Send + 'static,
        PArg: Send + 'static,
        C: Fn(PArg) -> Arg + Clone + Send + Sync + 'static,
    {
        let (tx, rx) = channel::<PArg>();
        self.init_pool(tx, rx, producer_fn, consumer_fn)
    }

    /// Creates a bounded producer-consumer pool.
    ///
    /// This method is nearly identical to its [unbounded](`Self::create_pool()`) variant. For a
    /// detailed description of the producer and consumer parameters, please look there.
    ///
    /// The difference is that this method takes an additional integer parameter, which limits
    /// the size of the message channel used by the producer and consumer (in other words, it
    /// uses [`sync_channel()`]). If the channel is full, the producer will block until the
    /// consumer takes at least one item. This can be useful if you expect a huge amount of items
    /// to be generated, or if your system is low on memory.
    ///
    /// # Example
    ///
    /// Assuming that `crazy_producer()` generates a huge amount of integer values, and that
    /// `heavy_algorithm()` converts an integer to an object that can be used with the shared
    /// consumer function, a bounded pool creation with a buffer of 100 items might look like this:
    ///
    /// ```rust
    /// # use std::error::Error;
    /// # use std::sync::mpsc::Sender;
    /// # use shared_resource_pool_builder::SharedResourcePoolBuilder;
    /// #
    /// # fn crazy_producer() -> Vec<i32> { vec![] }
    /// # fn heavy_algorithm(i: i32) -> i32 { i }
    /// #
    /// # fn example() {
    /// # let pool_builder = SharedResourcePoolBuilder::new(true, |res, item| ());
    /// pool_builder.create_pool_bounded(
    ///     100,
    ///     |tx| crazy_producer().into_iter().for_each(|i| tx.send(i).unwrap()),
    ///     |i| heavy_algorithm(i),
    /// );
    /// # }
    /// ```
    pub fn create_pool_bounded<P, PArg, C>(
        &self,
        bound: usize,
        producer_fn: P,
        consumer_fn: C,
    ) -> JobHandle
    where
        P: Fn(SyncSender<PArg>) -> () + Send + 'static,
        PArg: Send + 'static,
        C: Fn(PArg) -> Arg + Clone + Send + Sync + 'static,
    {
        let (tx, rx) = sync_channel::<PArg>(bound);
        self.init_pool(tx, rx, producer_fn, consumer_fn)
    }

    fn init_pool<P, PArg, PType, C>(
        &self,
        tx: PType,
        rx: Receiver<PArg>,
        producer_fn: P,
        consumer_fn: C,
    ) -> JobHandle
    where
        PType: SenderLike<Item = PArg> + Send + 'static,
        P: Fn(PType) -> () + Send + 'static,
        PArg: Send + 'static,
        C: Fn(PArg) -> Arg + Clone + Send + Sync + 'static,
    {
        let producer_thread = thread::spawn(move || producer_fn(tx));

        let job_counter = Arc::new((Mutex::new(0), Condvar::new()));

        let consumer_thread = {
            let pool = self.consumer_pool.clone();
            let job_counter = Arc::clone(&job_counter);
            let tx = self.tx.clone();
            thread::spawn(move || {
                for item in rx {
                    let tx = tx.clone();
                    let consumer_fn = consumer_fn.clone();
                    let job_counter = JobCounter::new(Arc::clone(&job_counter));
                    pool.execute(move || {
                        let job = Job(consumer_fn(item), job_counter);
                        tx.send(job).unwrap();
                    });
                }
            })
        };

        let join_handle = thread::spawn(move || {
            producer_thread.join().unwrap();
            consumer_thread.join().unwrap();
        });

        JobHandle::new(join_handle, job_counter)
    }

    /// Sends a single item to the shared consumer.
    ///
    /// This method can be used to communicate with the shared resource before creating another
    /// pool. Maybe you just want to add another item to the shared consumer. Or you want to use
    /// this method to trigger an alternate behavior of the shared consumer.
    ///
    /// Internally, this method behaves much like the [`Self::create_pool()`] method. It returns a
    /// [`JobHandle`], which can be waited on with [`JobHandle::join()`]. The item is sent to the
    /// message queue of the pool builder and might not be consumed immediately if there are other
    /// items waiting.
    ///
    /// # Examples
    ///
    /// ## Add another item
    ///
    /// In the following example, in addition to summing up 0, 1, and 2, as well as 4, 5, and 6, we
    /// add a single 3 to the result:
    ///
    /// ```rust
    /// # use shared_resource_pool_builder::SharedResourcePoolBuilder;
    /// #
    /// let pool_builder = SharedResourcePoolBuilder::new(0, |sum, i| *sum += i);
    ///
    /// pool_builder.create_pool(
    ///     |tx| vec![0, 1, 2].into_iter().for_each(|i| tx.send(i).unwrap()),
    ///     |i| i,
    /// );
    ///
    /// pool_builder.oneshot(3);
    ///
    /// pool_builder.create_pool(
    ///     |tx| vec![4, 5, 6].into_iter().for_each(|i| tx.send(i).unwrap()),
    ///     |i| i,
    /// );
    ///
    /// let result = pool_builder.join().unwrap();
    ///
    /// assert_eq!(result, 21);
    /// ```
    ///
    /// ## Trigger alternate behavior
    ///
    /// In the following example, we change the behavior of the shared consumer in the middle of
    /// the execution:
    ///
    /// ```rust
    /// # use shared_resource_pool_builder::SharedResourcePoolBuilder;
    /// #
    /// let pool_builder = SharedResourcePoolBuilder::new(
    ///     // Here we define our shared resource to be a tuple. The second element will determine
    ///     // our current behavior and can be altered in the shared consumer function.
    ///     (0, true),
    ///
    ///     // The whole tuple must be used in the shared consumer. Add or subtract the received
    ///     // value `i`, depending on the state of `adding`. If `None` is received, swap the state
    ///     // variable, so that upcoming values will be handled by the alternate behavior.
    ///     |(sum, adding), i| {
    ///         match i {
    ///             Some(i) => {
    ///                 if *adding {
    ///                     *sum += i;
    ///                 } else {
    ///                     *sum -= i;
    ///                 }
    ///             }
    ///             None => {
    ///                 *adding = !*adding;
    ///             }
    ///         }
    ///     },
    /// );
    ///
    /// // First we add some numbers
    /// pool_builder.create_pool(
    ///     |tx| vec![0, 1, 2].into_iter().for_each(|i| tx.send(Some(i)).unwrap()),
    ///     |i| i,
    /// ).join().unwrap();
    ///
    /// // Swap summing behavior
    /// pool_builder.oneshot(None).join().unwrap();
    ///
    /// // Now we subtract the same numbers again, with the same code as before
    /// pool_builder.create_pool(
    ///     |tx| vec![0, 1, 2].into_iter().for_each(|i| tx.send(Some(i)).unwrap()),
    ///     |i| i,
    /// );
    ///
    /// // Remember that we defined a boolean in our shared resource tuple. We are not interested
    /// // in it anymore, so we explicitly ignore it here.
    /// let (result, _) = pool_builder.join().unwrap();
    ///
    /// // Since we subtracted the same amount as we added, the result is now 0
    /// assert_eq!(result, 0);
    ///
    /// // Note that we joined the first pool and the oneshot to wait for them to finish. In this
    /// // simple example that would normally not be necessary, since sending merely three items
    /// // happens almost instantly and the shared consumer handles all items sequentially.
    /// // Nevertheless, there could be a race condition if for some reason the swapping is
    /// // triggered before all items have been added. In more complex scenarios, care should be
    /// // taken to wait for all previous pools with `join()` before altering the shared consumer
    /// // behavior.
    /// ```
    pub fn oneshot(&self, item: Arg) -> JobHandle {
        let job_counter = Arc::new((Mutex::new(0), Condvar::new()));

        let oneshot_thread = {
            let pool = self.consumer_pool.clone();
            let job_counter = Arc::clone(&job_counter);
            let tx = self.tx.clone();
            thread::spawn(move || {
                let job_counter = JobCounter::new(job_counter);
                pool.execute(move || {
                    let job = Job(item, job_counter);
                    tx.send(job).unwrap();
                });
            })
        };

        JobHandle::new(oneshot_thread, job_counter)
    }

    /// Waits for all tasks to finish their work and returns the shared resource afterwards.
    ///
    /// This implicitly waits for all created pools to finish, since their jobs all end up in the
    /// shared consumer. Note that after this call, the pool builder can no longer be used, since
    /// the shared resource is moved to the caller.
    ///
    /// This method returns a [`thread::Result`]. Since the `Err` variant of this specialized
    /// `Result` does not implement the [`Error`](std::error::Error) trait, the `?`-operator does
    /// not work here. For more information about how to handle panics in this case, please refer
    /// to the documentation of [`thread::Result`].
    ///
    /// # Example
    ///
    /// ```rust
    /// # use shared_resource_pool_builder::SharedResourcePoolBuilder;
    /// #
    /// # fn shared_consumer_fn(vec: &mut Vec<i32>, i: i32) {}
    /// # fn heavy_algorithm(i: i32) -> i32 { i }
    /// #
    /// # fn example() {
    /// let pool_builder = SharedResourcePoolBuilder::new(Vec::new(), shared_consumer_fn);
    /// pool_builder.create_pool(
    ///     |tx| (0..10).for_each(|i| tx.send(i).unwrap()),
    ///     |i| heavy_algorithm(i),
    /// );
    ///
    /// let result = pool_builder.join().unwrap();
    /// # }
    /// ```
    pub fn join(self) -> thread::Result<SR> {
        drop(self.tx);
        self.handler_thread.join()
    }
}

impl<Arg, SR> SharedResourcePoolBuilder<Sender<Job<Arg>>, SR>
where
    Arg: Send + 'static,
    SR: Send + 'static,
{
    /// Creates a new unbounded pool builder.
    ///
    /// This function takes two parameters:
    ///
    /// - The shared resource
    ///
    ///   This will be the resource that is used with every item that gets sent from the pool
    ///   consumer functions. Usually this will be something that cannot easily be shared between
    ///   multiple threads, like a database connection object. But this is not a restriction; you
    ///   can use pretty much anything as the shared resource.
    ///
    /// - The shared consumer function
    ///
    ///   This function is called for each item that comes from the pool consumers, together with
    ///   the shared resource. To give you maximum flexibility, the function must take two
    ///   parameters: the shared resource and an item. The type of the item will always be the
    ///   same for every pool consumer you create with this builder.
    ///
    /// Note that the message channel, which is used by the pool consumers to communicate with the
    /// shared consumer, is unbounded (in other words, it uses [`channel()`]). On modern systems
    /// and "typical" use cases, this should not be a problem. If however your system is low on
    /// memory or your producers generate millions upon millions of items, then you might be
    /// interested in the [bounded](`Self::new_bounded()`) variant of this method.
    ///
    /// # Example
    ///
    /// ```rust
    /// # use shared_resource_pool_builder::SharedResourcePoolBuilder;
    /// #
    /// let pool_builder = SharedResourcePoolBuilder::new(
    ///     // We can use pretty much anything as the shared resource. Here, we are using a Vector:
    ///     Vec::new(),
    ///
    ///     // Naturally, our shared consumer will add an item to the Vector. The type of "i" can
    ///     // either be given explicitly, or simply left for Rust to infer.
    ///     |vec, i| vec.push(i)
    /// );
    ///
    /// // Now we create a (very simple) pool for demonstration purposes. Since we return "i"
    /// // unaltered, the parameter type of the shared consumer as well as the item type of the
    /// // shared Vector will be inferred as "u8".
    /// pool_builder.create_pool(
    ///     |tx| tx.send(1u8).unwrap(),
    ///     |i| i,
    /// );
    /// ```
    pub fn new<SC>(shared_resource: SR, shared_consumer_fn: SC) -> Self
    where
        SC: FnMut(&mut SR, Arg) -> () + Send + Sync + 'static,
    {
        let (tx, rx) = channel();
        Self::init(tx, rx, shared_resource, shared_consumer_fn)
    }
}

impl<Arg, SR> SharedResourcePoolBuilder<SyncSender<Job<Arg>>, SR>
where
    Arg: Send + 'static,
    SR: Send + 'static,
{
    /// Creates a new bounded pool builder.
    ///
    /// This method is nearly identical to its [unbounded](`Self::new()`) variant. For a detailed
    /// description of the shared resource and shared consumer parameters, please look there.
    ///
    /// The difference is that this function takes an additional integer parameter, which limits
    /// the size of the message channel used by the pool consumers to communicate with the shared
    /// consumer (in other words, it uses [`sync_channel()`]). If the channel is full, the pool
    /// consumers will block until the shared consumer takes at least one item. This can be
    /// useful if you expect a huge amount of items to be generated, or if your system is low on
    /// memory.
    ///
    /// # Example
    ///
    /// ```rust
    /// # use shared_resource_pool_builder::SharedResourcePoolBuilder;
    /// #
    /// let pool_builder = SharedResourcePoolBuilder::new_bounded(
    ///     // We are very low on memory
    ///     10,
    ///
    ///     // We can use pretty much anything as the shared resource. Here, we are using a Vector:
    ///     Vec::new(),
    ///
    ///     // Naturally, our shared consumer will add an item to the Vector. The type of "i" can
    ///     // either be given explicitly, or simply left for Rust to infer.
    ///     |vec, i| vec.push(i)
    /// );
    ///
    /// // Now we create a (very simple) pool for demonstration purposes. Since we return "i"
    /// // unaltered, the parameter type of the shared consumer as well as the item type of the
    /// // shared Vector will be inferred as "u8".
    /// pool_builder.create_pool(
    ///     |tx| tx.send(1u8).unwrap(),
    ///     |i| i,
    /// );
    ///
    /// // No matter how many pools we create, the number of buffered items will never exceed 10.
    /// ```
    pub fn new_bounded<SC>(bound: usize, shared_resource: SR, shared_consumer_fn: SC) -> Self
    where
        SC: FnMut(&mut SR, Arg) -> () + Send + Sync + 'static,
    {
        let (tx, rx) = sync_channel(bound);
        Self::init(tx, rx, shared_resource, shared_consumer_fn)
    }
}

#[cfg(test)]
mod tests {
    use std::time::{Duration, Instant};

    use super::*;

    struct TestResource(Vec<TestElem>);

    impl TestResource {
        fn add(&mut self, elem: TestElem) {
            self.0.push(elem);
        }
    }

    struct TestElem(String);

    #[test]
    fn test_one_integer() {
        let consumer_fn = |i| i * 10;

        let pool_builder = SharedResourcePoolBuilder::new(Vec::new(), |vec, i| vec.push(i));
        pool_builder.create_pool(
            |tx| vec![1].into_iter().for_each(|i| tx.send(i).unwrap()),
            consumer_fn,
        );

        let result = {
            let mut result = pool_builder.join().unwrap();
            result.sort();
            result
        };

        assert_eq!(result, vec![10]);
    }

    #[test]
    fn test_few_integers() {
        let consumer_fn = |i| i * 10;

        let pool_builder = SharedResourcePoolBuilder::new(Vec::new(), |vec, i| vec.push(i));
        pool_builder.create_pool(|tx| (0..5).for_each(|i| tx.send(i).unwrap()), consumer_fn);

        let result = {
            let mut result = pool_builder.join().unwrap();
            result.sort();
            result
        };

        assert_eq!(result, (0..5).map(consumer_fn).collect::<Vec<_>>());
    }

    #[test]
    fn test_few_integers_with_oneshot() {
        let consumer_fn = |i| i * 10;

        let pool_builder = SharedResourcePoolBuilder::new(Vec::new(), |vec, i| vec.push(i));
        pool_builder.create_pool(|tx| (0..5).for_each(|i| tx.send(i).unwrap()), consumer_fn);
        pool_builder.oneshot(consumer_fn(5));

        let result = {
            let mut result = pool_builder.join().unwrap();
            result.sort();
            result
        };

        assert_eq!(result, (0..6).map(consumer_fn).collect::<Vec<_>>());
    }

    #[test]
    fn test_few_integers_with_delayed_producer() {
        let consumer_fn = |i| i * 10;

        let pool_builder = SharedResourcePoolBuilder::new(Vec::new(), |vec, i| vec.push(i));
        pool_builder.create_pool(
            |tx| {
                thread::sleep(Duration::from_millis(10));
                (0..5).for_each(|i| tx.send(i).unwrap());
            },
            consumer_fn,
        );

        let result = {
            let mut result = pool_builder.join().unwrap();
            result.sort();
            result
        };

        assert_eq!(result, (0..5).map(consumer_fn).collect::<Vec<_>>());
    }

    #[test]
    fn test_many_integers_with_bounded_shared_producer() {
        let consumer_fn = |i| i * 10;

        let pool_builder =
            SharedResourcePoolBuilder::new_bounded(10, Vec::new(), |vec, i| vec.push(i));
        pool_builder.create_pool(
            |tx| (0..1000).for_each(|i| tx.send(i).unwrap()),
            consumer_fn,
        );

        let result = {
            let mut result = pool_builder.join().unwrap();
            result.sort();
            result
        };

        assert_eq!(result, (0..1000).map(consumer_fn).collect::<Vec<_>>());
    }

    #[test]
    fn test_many_integers_with_bounded_item_producer() {
        let consumer_fn = |i| i * 10;

        let pool_builder = SharedResourcePoolBuilder::new(Vec::new(), |vec, i| vec.push(i));
        pool_builder.create_pool_bounded(
            10,
            |tx| (0..1000).for_each(|i| tx.send(i).unwrap()),
            consumer_fn,
        );

        let result = {
            let mut result = pool_builder.join().unwrap();
            result.sort();
            result
        };

        assert_eq!(result, (0..1000).map(consumer_fn).collect::<Vec<_>>());
    }

    #[test]
    fn test_many_integers_with_bounded_shared_and_item_producer() {
        let consumer_fn = |i| i * 10;

        let pool_builder =
            SharedResourcePoolBuilder::new_bounded(10, Vec::new(), |vec, i| vec.push(i));
        pool_builder.create_pool_bounded(
            10,
            |tx| (0..1000).for_each(|i| tx.send(i).unwrap()),
            consumer_fn,
        );

        let result = {
            let mut result = pool_builder.join().unwrap();
            result.sort();
            result
        };

        assert_eq!(result, (0..1000).map(consumer_fn).collect::<Vec<_>>());
    }

    #[test]
    fn test_many_integers() {
        let consumer_fn = |i| i * 10;

        let pool_builder = SharedResourcePoolBuilder::new(Vec::new(), |vec, i| vec.push(i));
        pool_builder.create_pool(
            |tx| (0..1000).for_each(|i| tx.send(i).unwrap()),
            consumer_fn,
        );

        let result = {
            let mut result = pool_builder.join().unwrap();
            result.sort();
            result
        };

        assert_eq!(result, (0..1000).map(consumer_fn).collect::<Vec<_>>());
    }

    #[test]
    fn test_wait_until_pool_finishes() {
        let consumer_fn = |i| i * 10;

        let pool_builder = SharedResourcePoolBuilder::new(Vec::new(), |vec, i| vec.push(i));
        let pool = pool_builder.create_pool(
            |tx| tx.send(1).unwrap(),
            move |parg| {
                thread::sleep(Duration::from_millis(10));
                consumer_fn(parg)
            },
        );

        pool.join().unwrap();

        let now = Instant::now();

        let result = {
            let mut result = pool_builder.join().unwrap();
            result.sort();
            result
        };

        let millis = now.elapsed().as_millis();
        assert!(millis < 10);

        assert_eq!(result, vec![10]);
    }

    #[test]
    fn test_integers_multiple_pools_parallel() {
        let pool_builder = SharedResourcePoolBuilder::new(Vec::new(), |vec, i| vec.push(i));
        let pools = vec![
            pool_builder.create_pool(|tx| (0..2).for_each(|i| tx.send(i).unwrap()), |i| i * 10),
            pool_builder.create_pool(|tx| (2..5).for_each(|i| tx.send(i).unwrap()), |i| i * 10),
        ];

        for pool in pools {
            pool.join().unwrap();
        }

        let result = {
            let mut result = pool_builder.join().unwrap();
            result.sort();
            result
        };

        assert_eq!(result, vec![0, 10, 20, 30, 40])
    }

    #[test]
    fn test_integers_multiple_pools_sequential() {
        let pool_builder = SharedResourcePoolBuilder::new(Vec::new(), |vec, i| vec.push(i));

        let pool =
            pool_builder.create_pool(|tx| (2..5).for_each(|i| tx.send(i).unwrap()), |i| i * 10);

        pool.join().unwrap();

        let pool =
            pool_builder.create_pool(|tx| (0..2).for_each(|i| tx.send(i).unwrap()), |i| i * 10);

        pool.join().unwrap();

        let result = {
            let mut result = pool_builder.join().unwrap();
            result.sort();
            result
        };

        assert_eq!(result, vec![0, 10, 20, 30, 40])
    }

    #[test]
    fn test_integers_multiple_pools_with_parallel_producers() {
        let producer_pool = threadpool::Builder::new().build();

        let pool_builder = SharedResourcePoolBuilder::new(Vec::new(), |vec, i| vec.push(i));
        let pools = vec![
            {
                let producer_pool = producer_pool.clone();
                pool_builder.create_pool(
                    move |tx| {
                        (0..50).into_iter().for_each(|i| {
                            let tx = tx.clone();
                            producer_pool.execute(move || tx.send(i).unwrap())
                        });
                    },
                    |i| i * 10,
                )
            },
            {
                let producer_pool = producer_pool.clone();
                pool_builder.create_pool(
                    move |tx| {
                        (50..100).into_iter().for_each(|i| {
                            let tx = tx.clone();
                            producer_pool.execute(move || tx.send(i).unwrap())
                        });
                    },
                    |i| i * 10,
                )
            },
        ];

        for pool in pools {
            pool.join().unwrap();
        }

        let result = {
            let mut result = pool_builder.join().unwrap();
            result.sort();
            result
        };

        assert_eq!(
            result,
            (0..100).into_iter().map(|i| i * 10).collect::<Vec<_>>()
        )
    }

    #[test]
    fn test_custom_types() {
        let producer_fn = || ('a'..='z').map(|c| String::from(c));

        let pool_builder =
            SharedResourcePoolBuilder::new(TestResource(Vec::new()), |tres, e| tres.add(e));
        pool_builder.create_pool(
            move |tx| producer_fn().for_each(|i| tx.send(i).unwrap()),
            |c| TestElem(c),
        );

        let result = {
            let mut result = pool_builder
                .join()
                .unwrap()
                .0
                .iter()
                .map(|e| e.0.clone())
                .collect::<Vec<_>>();
            result.sort();
            result
        };

        assert_eq!(result, producer_fn().collect::<Vec<_>>());
    }
}