shared_resource_pool_builder/
lib.rs

//! # Shared Resource Pool Builder
//!
//! A builder for producer-consumer thread pools whose results are eventually
//! consumed sequentially, together with a shared resource.
//!
//! ## Code example
//!
//! The purpose of this library is best described with an example, so let's get
//! straight into it:
//!
//! ```rust
//! use shared_resource_pool_builder::SharedResourcePoolBuilder;
//!
//! // Define a pool builder with a Vector as shared resource, and an action that
//! // should be executed sequentially over all items. Note that the second
//! // parameter of this function is a shared consumer function that takes two
//! // parameters: The shared resource and the items it will be used on. These
//! // items will be returned from the pool consumers.
//! let pool_builder = SharedResourcePoolBuilder::new(
//!     Vec::new(),
//!     |vec, i| vec.push(i)
//! );
//!
//! // Create a producer-consumer thread pool that does some work. In this case,
//! // send the numbers 0 to 2 to the consumer. There, they will be multiplied by
//! // 10. Note that the first parameter is a producer function that takes a
//! // sender-like object, such as [`std::sync::mpsc::Sender`]. Every item that
//! // gets sent will be processed by the consumer, which is the second parameter
//! // to this function. This function expects an object of the same type that
//! // gets sent by the producer. The consumer is expected to return an object of
//! // the type that the shared consumer takes as the second parameter and that
//! // will ultimately be used on the shared resource.
//! let pool_1 = pool_builder
//!     .create_pool(
//!         |tx| (0..=2).for_each(|i| tx.send(i).unwrap()),
//!         |i| i * 10
//!     );
//!
//! // Create a second pool. Here, the numbers 3 to 5 will be produced, multiplied
//! // by 2, and then processed by the shared consumer function.
//! let pool_2 = pool_builder
//!     .create_pool(
//!         |tx| (3..=5).for_each(|i| tx.send(i).unwrap()),
//!         |i| i * 2
//!     );
//!
//! // Wait for a specific pool to finish before continuing ...
//! pool_1.join().unwrap();
//!
//! // ... or wait for all pools and the shared consumer to finish their work at
//! // once and return the shared resource. Afterwards, the pool builder can no
//! // longer be used, since the shared resource gets moved.
//! let result = {
//!     let mut result = pool_builder.join().unwrap();
//!
//!     // By default, the pool consumers run in as many threads as there are
//!     // cores in the machine, so the results may not arrive in the order in
//!     // which they were produced.
//!     result.sort();
//!     result
//! };
//!
//! assert_eq!(result, vec![0, 6, 8, 10, 10, 20]);
//! ```
//!
//! ## Motivation
//!
//! Imagine you have a huge list of items that need to be processed. Every item
//! can be processed independently, so the natural thing to do is to use a thread
//! pool to parallelize the work. But now imagine that the resulting items need to
//! be used together with a resource that does not work on multiple threads. The
//! resulting items will have to be processed sequentially, so the resource does
//! not have to be shared with multiple threads.
//!
//! As a more concrete example, imagine a program that uses a SQLite database for
//! storing computationally intensive results. Before doing the actual work, you
//! want to remove obsolete results from the database. Then you need to collect
//! all new data that needs to be processed. Finally, you want all new data to
//! actually be processed.
//!
//! Normally, this involves three sequential steps: cleaning, collecting, and
//! processing. These steps need to be sequential because write operations on
//! the same SQLite database can only be done by one thread at a time. You might
//! want to implement synchronization, so that one thread writes and all
//! additional threads are blocked and wait for their turn. But this is error
//! prone: if it is not done correctly, you will encounter random panics caused
//! by locked or busy databases. Furthermore, you lose the ability to use
//! prepared statements efficiently.
//!
//! Enter `shared-resource-pool-builder`. With this, you have the ability to
//! define a *shared resource* and a *shared consumer function*. From this builder
//! you can create producer-consumer pools. The consumed items will then be
//! given sequentially to the shared consumer function, which applies each result
//! to the shared resource.
//!
//! The pool consumers will all run within a shared thread pool, so you need not
//! worry about thrashing your system. You can even limit the number of items
//! that get processed in parallel.
//!
//! To stay with the database example, this means you can define a pool builder
//! with the database connection as the shared resource and a shared consumer
//! function that will remove, insert, or update items. You can do that with an
//! enum, for example.
//!
//! Then, for the cleaning step, you create a pool that produces all items from
//! the database and consumes them by returning an object that causes the item to
//! be either deleted (if obsolete) or ignored by the shared consumer.
//!
//! Next, you can create a pool that produces all new items and consumes them by
//! creating an insert action with each item.
//!
//! Depending on the database layout, these two pools can even be run in parallel.
//! Thanks to the shared consumer, only one delete or insert will run at the same
//! time.
//!
//! Again, depending on the layout, you just need to wait for the inserting pool
//! to finish and create yet another pool that produces all new items that need
//! to be processed, and consumes them by doing the actual work and returning an
//! appropriate update object for the shared consumer. In the meantime, the
//! cleaning pool might as well continue its work.
//!
//! Eventually, you wait for all pools to finish before terminating the program.
//!
//! This way, you do not need to worry about synchronizing multiple writer threads
//! or multiple open database connections. Just split the work so the
//! computationally intensive tasks get done in the pool consumers and make the
//! shared consumer just do the finalization.
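//!
//! The enum idea from the database example above can be sketched with the
//! standard library alone. This is only an illustration of the pattern, not the
//! API of this crate: `Action`, `apply()`, and `run_cleanup_and_insert()` are
//! hypothetical names, a `BTreeSet` stands in for the database, and a plain
//! channel with one writer thread stands in for the shared consumer.
//!
//! ```rust
//! use std::collections::BTreeSet;
//! use std::sync::mpsc::channel;
//! use std::thread;
//!
//! // Hypothetical action type that tells the single writer what to do.
//! enum Action {
//!     Insert(u32),
//!     Delete(u32),
//!     Ignore,
//! }
//!
//! // Hypothetical shared consumer: the only place that mutates the "database".
//! fn apply(db: &mut BTreeSet<u32>, action: Action) {
//!     match action {
//!         Action::Insert(id) => {
//!             db.insert(id);
//!         }
//!         Action::Delete(id) => {
//!             db.remove(&id);
//!         }
//!         Action::Ignore => {}
//!     }
//! }
//!
//! fn run_cleanup_and_insert() -> Vec<u32> {
//!     let (tx, rx) = channel::<Action>();
//!
//!     // The writer thread owns the resource and applies all actions one by one.
//!     let writer = thread::spawn(move || {
//!         let mut db: BTreeSet<u32> = (1..=4).collect();
//!         for action in rx {
//!             apply(&mut db, action);
//!         }
//!         db
//!     });
//!
//!     // Cleaning worker: treats even ids as obsolete and ignores the others.
//!     let cleaner = {
//!         let tx = tx.clone();
//!         thread::spawn(move || {
//!             for id in 1..=4 {
//!                 let action = if id % 2 == 0 {
//!                     Action::Delete(id)
//!                 } else {
//!                     Action::Ignore
//!                 };
//!                 tx.send(action).unwrap();
//!             }
//!         })
//!     };
//!
//!     // Inserting worker: runs in parallel and takes over the original sender.
//!     let inserter = thread::spawn(move || {
//!         for id in 5..=6 {
//!             tx.send(Action::Insert(id)).unwrap();
//!         }
//!     });
//!
//!     cleaner.join().unwrap();
//!     inserter.join().unwrap();
//!
//!     // All senders are dropped by now, so the writer loop ends and hands the
//!     // resource back.
//!     writer.join().unwrap().into_iter().collect()
//! }
//! ```
//!
//! Because the deleted and inserted ids do not overlap, the outcome of
//! `run_cleanup_and_insert()` does not depend on how the two workers interleave.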

use std::sync::mpsc::{channel, sync_channel, Receiver, Sender, SyncSender};
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
use std::thread::JoinHandle;

use threadpool::ThreadPool;

use crate::job::JobCounter;
pub use crate::job::{Job, JobHandle};
use crate::senderlike::SenderLike;

mod job;
mod senderlike;

/// The builder to create pools on a shared resource.
pub struct SharedResourcePoolBuilder<SType, SR> {
    tx: SType,
    handler_thread: JoinHandle<SR>,
    consumer_pool: ThreadPool,
}

impl<SType, Arg, SR> SharedResourcePoolBuilder<SType, SR>
where
    Arg: Send + 'static,
    SType: SenderLike<Item = Job<Arg>> + Clone + Send + 'static,
    SR: Send + 'static,
{
    fn init<SC>(
        tx: SType,
        rx: Receiver<Job<Arg>>,
        mut shared_resource: SR,
        mut shared_consumer_fn: SC,
    ) -> Self
    where
        SC: FnMut(&mut SR, Arg) -> () + Send + Sync + 'static,
    {
        let join_handle = thread::spawn(move || {
            for job in rx {
                shared_consumer_fn(&mut shared_resource, job.0);
            }
            shared_resource
        });

        let consumer_pool = threadpool::Builder::new().build();

        Self {
            tx,
            handler_thread: join_handle,
            consumer_pool,
        }
    }

    /// Creates an unbounded producer-consumer pool.
    ///
    /// This method takes two parameters:
    ///
    /// -   The producer function
    ///
    ///     This function will generate the items to be further processed by the consumer. It must
    ///     take one parameter, which is a sender-like object such as [`Sender`]. To generate an
    ///     item for the consumer, use the `send()` method of the sender.
    ///
    ///     The return value of this function is ignored, i.e. its return type is `()`.
    ///
    ///     By design, this function will run in only one thread. If you want it to run in multiple
    ///     threads, you need to implement that yourself. The preferred way is to run only a
    ///     cheap listing algorithm in the producer and let the consumer do the computationally
    ///     intensive tasks, which will by default run within a thread pool.
    ///
    /// -   The consumer function
    ///
    ///     This function takes one parameter and will be called for each item that was sent by the
    ///     producer. The return type must be the same type that is used by the shared consumer as
    ///     defined by the [`Self::new()`] method.
    ///
    ///     By design, the consumer functions will be run within a thread pool that is shared with
    ///     all other pool consumers created by this builder. The number of threads is the number
    ///     of cores, as defined by the default value of [`threadpool::Builder::num_threads()`].
    ///
    /// The message channel, in which the producer sends and the consumer receives the items, is
    /// unlimited with this function (in other words, it uses [`channel()`]). On modern systems and
    /// "typical" use-cases, this should not be a problem. If however your system is low on memory
    /// or your producer generates millions upon millions of items, then you might be interested in
    /// the [bounded](`Self::create_pool_bounded()`) variant of this method.
    ///
    /// # Example
    ///
    /// Assuming that `heavy_algorithm()` converts an integer to an object that can be used with
    /// the shared consumer function, a pool creation might look like this:
    ///
    /// ```rust
    /// # use std::error::Error;
    /// # use std::sync::mpsc::Sender;
    /// # use shared_resource_pool_builder::SharedResourcePoolBuilder;
    /// #
    /// # fn heavy_algorithm(i: i32) -> i32 { i }
    /// #
    /// # fn example() {
    /// # let pool_builder = SharedResourcePoolBuilder::new(true, |res, item|());
    /// pool_builder.create_pool(
    ///     |tx| (0..10).for_each(|i| tx.send(i).unwrap()),
    ///     |i| heavy_algorithm(i),
    /// );
    /// # }
    /// ```
    pub fn create_pool<P, PArg, C>(&self, producer_fn: P, consumer_fn: C) -> JobHandle
    where
        P: Fn(Sender<PArg>) -> () + Send + 'static,
        PArg: Send + 'static,
        C: Fn(PArg) -> Arg + Clone + Send + Sync + 'static,
    {
        let (tx, rx) = channel::<PArg>();
        self.init_pool(tx, rx, producer_fn, consumer_fn)
    }

    /// Creates a bounded producer-consumer pool.
    ///
    /// This method is nearly identical to its [unbounded](`Self::create_pool()`) variant. For a
    /// detailed description of the producer and consumer parameters, please look there.
    ///
    /// The difference is that this function takes an additional integer parameter, which limits
    /// the size of the message channel that is used by the producer and consumer (in other words,
    /// it uses [`sync_channel()`]). If the channel is full, the producer will block until the
    /// consumer takes at least one item. This can be useful if you expect a huge amount of items
    /// to be generated, or if your system is low on memory.
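    ///
    /// The limit itself comes from the bounded channel of the standard library. As a minimal
    /// sketch that is independent of this crate (the helper name `fill_bounded_channel()` is
    /// hypothetical), the following shows a channel with a bound of 2 refusing a third item
    /// until the receiver has taken one. It uses `try_send()`, so the full channel is reported
    /// as an error instead of blocking the sender:
    ///
    /// ```rust
    /// use std::sync::mpsc::{sync_channel, TrySendError};
    ///
    /// // Hypothetical helper: reports whether the third item was rejected while the channel
    /// // was full, and whether a fourth item was accepted after one item had been received.
    /// fn fill_bounded_channel() -> (bool, bool) {
    ///     let (tx, rx) = sync_channel::<u32>(2);
    ///
    ///     // The first two items fit into the buffer.
    ///     tx.try_send(1).unwrap();
    ///     tx.try_send(2).unwrap();
    ///
    ///     // The buffer is full now; a blocking `send()` would wait at this point.
    ///     let third_rejected = matches!(tx.try_send(3), Err(TrySendError::Full(3)));
    ///
    ///     // Taking one item frees a slot again.
    ///     let _ = rx.recv().unwrap();
    ///     let fourth_accepted = tx.try_send(4).is_ok();
    ///
    ///     (third_rejected, fourth_accepted)
    /// }
    /// ```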
    ///
    /// # Example
    ///
    /// Assuming that `crazy_producer()` generates a huge amount of integer values, and that
    /// `heavy_algorithm()` converts an integer to an object that can be used with the shared
    /// consumer function, a bounded pool creation with a buffer of 100 items might look like this:
    ///
    /// ```rust
    /// # use std::error::Error;
    /// # use std::sync::mpsc::Sender;
    /// # use shared_resource_pool_builder::SharedResourcePoolBuilder;
    /// #
    /// # fn crazy_producer() -> Vec<i32> { vec![] }
    /// # fn heavy_algorithm(i: i32) -> i32 { i }
    /// #
    /// # fn example() {
    /// # let pool_builder = SharedResourcePoolBuilder::new(true, |res, item|());
    /// pool_builder.create_pool_bounded(
    ///     100,
    ///     |tx| crazy_producer().into_iter().for_each(|i| tx.send(i).unwrap()),
    ///     |i| heavy_algorithm(i),
    /// );
    /// # }
    /// ```
    pub fn create_pool_bounded<P, PArg, C>(
        &self,
        bound: usize,
        producer_fn: P,
        consumer_fn: C,
    ) -> JobHandle
    where
        P: Fn(SyncSender<PArg>) -> () + Send + 'static,
        PArg: Send + 'static,
        C: Fn(PArg) -> Arg + Clone + Send + Sync + 'static,
    {
        let (tx, rx) = sync_channel::<PArg>(bound);
        self.init_pool(tx, rx, producer_fn, consumer_fn)
    }

    fn init_pool<P, PArg, PType, C>(
        &self,
        tx: PType,
        rx: Receiver<PArg>,
        producer_fn: P,
        consumer_fn: C,
    ) -> JobHandle
    where
        PType: SenderLike<Item = PArg> + Send + 'static,
        P: Fn(PType) -> () + Send + 'static,
        PArg: Send + 'static,
        C: Fn(PArg) -> Arg + Clone + Send + Sync + 'static,
    {
        let producer_thread = thread::spawn(move || producer_fn(tx));

        let job_counter = Arc::new((Mutex::new(0), Condvar::new()));

        let consumer_thread = {
            let pool = self.consumer_pool.clone();
            let job_counter = Arc::clone(&job_counter);
            let tx = self.tx.clone();
            thread::spawn(move || {
                for item in rx {
                    let tx = tx.clone();
                    let consumer_fn = consumer_fn.clone();
                    let job_counter = JobCounter::new(Arc::clone(&job_counter));
                    pool.execute(move || {
                        let job = Job(consumer_fn(item), job_counter);
                        tx.send(job).unwrap();
                    });
                }
            })
        };

        let join_handle = thread::spawn(move || {
            producer_thread.join().unwrap();
            consumer_thread.join().unwrap();
        });

        JobHandle::new(join_handle, job_counter)
    }

    /// Send one single item to the shared consumer.
    ///
    /// This method can be used to communicate with the shared resource before creating another
    /// pool. Maybe you just want to add another item to the shared consumer. Or you want to use
    /// this method to trigger an alternate behavior of the shared consumer.
    ///
    /// Internally, this method behaves much like the [`Self::create_pool()`] method. It returns a
    /// [`JobHandle`], which can be waited on with [`JobHandle::join()`]. The item is sent to the
    /// message queue of the pool builder and might not be consumed immediately if there are other
    /// items waiting.
    ///
    /// # Examples
    ///
    /// ## Add another item
    ///
    /// In the following example, in addition to summing up 0, 1, and 2, as well as 4, 5, and 6, we
    /// add a single 3 to the result:
    ///
    /// ```rust
    /// # use shared_resource_pool_builder::SharedResourcePoolBuilder;
    /// #
    /// let pool_builder = SharedResourcePoolBuilder::new(0, |sum, i| *sum += i);
    ///
    /// pool_builder.create_pool(
    ///     |tx| vec![0, 1, 2].into_iter().for_each(|i| tx.send(i).unwrap()),
    ///     |i| i,
    /// );
    ///
    /// pool_builder.oneshot(3);
    ///
    /// pool_builder.create_pool(
    ///     |tx| vec![4, 5, 6].into_iter().for_each(|i| tx.send(i).unwrap()),
    ///     |i| i,
    /// );
    ///
    /// let result = pool_builder.join().unwrap();
    ///
    /// assert_eq!(result, 21);
    /// ```
    ///
    /// ## Trigger alternate behavior
    ///
    /// In the following example, we change the behavior of the shared consumer in the middle of
    /// the execution:
    ///
    /// ```rust
    /// # use shared_resource_pool_builder::SharedResourcePoolBuilder;
    /// #
    /// let pool_builder = SharedResourcePoolBuilder::new(
    ///     // Here we define our shared resource to be a tuple. The second element will determine
    ///     // our current behavior and can be altered in the shared consumer function.
    ///     (0, true),
    ///
    ///     // The whole tuple must be used in the shared consumer. Add or subtract the received
    ///     // value `i`, depending on the state of `adding`. If `None` is received, swap the state
    ///     // variable, so that upcoming values will be handled by the alternate behavior.
    ///     |(sum, adding), i| {
    ///         match i {
    ///             Some(i) => {
    ///                 if *adding {
    ///                     *sum += i;
    ///                 } else {
    ///                     *sum -= i;
    ///                 }
    ///             }
    ///             None => {
    ///                 *adding = !*adding;
    ///             }
    ///         }
    ///     },
    /// );
    ///
    /// // First we add some numbers
    /// pool_builder.create_pool(
    ///     |tx| vec![0, 1, 2].into_iter().for_each(|i| tx.send(Some(i)).unwrap()),
    ///     |i| i,
    /// ).join().unwrap();
    ///
    /// // Swap summing behavior
    /// pool_builder.oneshot(None).join().unwrap();
    ///
    /// // Now we subtract the same numbers again, with the same code as before
    /// pool_builder.create_pool(
    ///     |tx| vec![0, 1, 2].into_iter().for_each(|i| tx.send(Some(i)).unwrap()),
    ///     |i| i,
    /// );
    ///
    /// // Remember that we defined a boolean in our shared resource tuple. We are not interested
    /// // in this one anymore, so we explicitly ignore it here.
    /// let (result, _) = pool_builder.join().unwrap();
    ///
    /// // Since we subtracted the same amount as we added, the result is now 0
    /// assert_eq!(result, 0);
    ///
    /// // Note that we joined the first pool and the oneshot to wait for them to finish. In this
    /// // simple example, it would normally not be necessary, since sending merely three items
    /// // happens almost immediately and the shared consumer handles all items sequentially.
    /// // Nevertheless, there could be a race condition if for some reason the swapping is
    /// // triggered before all items could be added. In more complex scenarios, care should be
    /// // taken to await all previous pools with `join()` before altering the shared consumer
    /// // behavior.
    /// ```
    pub fn oneshot(&self, item: Arg) -> JobHandle {
        let job_counter = Arc::new((Mutex::new(0), Condvar::new()));

        let oneshot_thread = {
            let pool = self.consumer_pool.clone();
            let job_counter = Arc::clone(&job_counter);
            let tx = self.tx.clone();
            thread::spawn(move || {
                let job_counter = JobCounter::new(job_counter);
                pool.execute(move || {
                    let job = Job(item, job_counter);
                    tx.send(job).unwrap();
                });
            })
        };

        JobHandle::new(oneshot_thread, job_counter)
    }

    /// Waits for all tasks to finish their work and returns the shared resource afterwards.
    ///
    /// This implicitly waits for all created pools to be finished, since their jobs all end up in
    /// the shared consumer. Note that after this call, this pool builder can no longer be used,
    /// since the shared resource will be moved to the caller.
    ///
    /// This method returns a [`thread::Result`]. Since the `Err` variant of this specialized
    /// `Result` does not implement the [`Error`](std::error::Error) trait, the `?` operator
    /// cannot convert it into error types such as `Box<dyn Error>`. For more information about
    /// how to handle panics in this case, please refer to the documentation of
    /// [`thread::Result`].
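    ///
    /// As a minimal sketch of one way to handle such a result, the panic payload can be turned
    /// into a message, which converts the whole value into an ordinary `Result`. The helpers
    /// `panic_message()` and `join_to_result()` are hypothetical names and not part of this
    /// crate; they rely only on the standard library:
    ///
    /// ```rust
    /// use std::any::Any;
    /// use std::thread;
    ///
    /// // Hypothetical helper: extract a readable message from a panic payload. Panics raised
    /// // with a string literal carry a `&str`, formatted panics carry a `String`.
    /// fn panic_message(payload: Box<dyn Any + Send>) -> String {
    ///     if let Some(msg) = payload.downcast_ref::<&str>() {
    ///         msg.to_string()
    ///     } else if let Some(msg) = payload.downcast_ref::<String>() {
    ///         msg.clone()
    ///     } else {
    ///         String::from("unknown panic payload")
    ///     }
    /// }
    ///
    /// // Hypothetical helper: convert a `thread::Result` into a `Result` with a `String` error,
    /// // which is easier to combine with other error handling.
    /// fn join_to_result<T>(handle: thread::JoinHandle<T>) -> Result<T, String> {
    ///     handle.join().map_err(panic_message)
    /// }
    /// ```
    ///
    /// The same `map_err(panic_message)` call could be applied to the value returned by this
    /// method.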
    ///
    /// # Example
    ///
    /// ```rust
    /// # use shared_resource_pool_builder::SharedResourcePoolBuilder;
    /// #
    /// # fn shared_consumer_fn(vec: &mut Vec<i32>, i: i32) {};
    /// # fn heavy_algorithm(i: i32) -> i32 { i }
    /// #
    /// # fn example() {
    /// let pool_builder = SharedResourcePoolBuilder::new(Vec::new(), shared_consumer_fn);
    /// pool_builder.create_pool(
    ///     |tx| (0..10).for_each(|i| tx.send(i).unwrap()),
    ///     |i| heavy_algorithm(i),
    /// );
    ///
    /// let result = pool_builder.join().unwrap();
    /// # }
    /// ```
    pub fn join(self) -> thread::Result<SR> {
        drop(self.tx);
        self.handler_thread.join()
    }
}

impl<Arg, SR> SharedResourcePoolBuilder<Sender<Job<Arg>>, SR>
where
    Arg: Send + 'static,
    SR: Send + 'static,
{
    /// Creates a new unbounded pool builder.
    ///
    /// This function takes two parameters:
    ///
    /// -   The shared resource
    ///
    ///     This will be the resource that is used with every item that gets sent from the pool
    ///     consumer functions. Usually this will be something that cannot be easily shared with
    ///     multiple threads, like a database connection object. This is not a restriction, though:
    ///     you can use pretty much anything as the shared resource.
    ///
    /// -   The shared consumer function
    ///
    ///     This function is called for each item that comes from the pool consumers, together with
    ///     the shared resource. To give you the maximum flexibility, the function must take two
    ///     parameters: The shared resource and an item. The type of the item will always be the
    ///     same for every pool consumer you create with this builder.
    ///
    /// Note that the message channel, which is used by the pool consumers to communicate with the
    /// shared consumer, is unlimited (in other words, it uses [`channel()`]). On modern systems
    /// and "typical" use-cases, this should not be a problem. If however your system is low on
    /// memory or your producers generate millions upon millions of items, then you might be
    /// interested in the [bounded](`Self::new_bounded()`) variant of this method.
    ///
    /// # Example
    ///
    /// ```rust
    /// # use shared_resource_pool_builder::SharedResourcePoolBuilder;
    /// #
    /// let pool_builder = SharedResourcePoolBuilder::new(
    ///     // We can use pretty much anything as the shared resource. Here, we are using a Vector:
    ///     Vec::new(),
    ///
    ///     // Naturally, our shared consumer will add an item to the Vector. The type of "i" can
    ///     // either be given explicitly, or simply let Rust infer it.
    ///     |vec, i| vec.push(i)
    /// );
    ///
    /// // Now we create a (very simple) pool for demonstration purposes. Since we return "i"
    /// // unaltered, the parameter type of the shared consumer as well as the item type of the
    /// // shared Vector will be inferred to "u8".
    /// pool_builder.create_pool(
    ///     |tx| tx.send(1u8).unwrap(),
    ///     |i| i,
    /// );
    /// ```
    pub fn new<SC>(shared_resource: SR, shared_consumer_fn: SC) -> Self
    where
        SC: FnMut(&mut SR, Arg) -> () + Send + Sync + 'static,
    {
        let (tx, rx) = channel();
        Self::init(tx, rx, shared_resource, shared_consumer_fn)
    }
}

impl<Arg, SR> SharedResourcePoolBuilder<SyncSender<Job<Arg>>, SR>
where
    Arg: Send + 'static,
    SR: Send + 'static,
{
    /// Creates a new bounded pool builder.
    ///
    /// This method is nearly identical to its [unbounded](`Self::new()`) variant. For a detailed
    /// description of the shared resource and shared consumer parameters, please look there.
    ///
    /// The difference is that this function takes an additional integer parameter, which limits
    /// the size of the message channel that is used by the pool consumers to communicate with the
    /// shared consumer (in other words, it uses [`sync_channel()`]). If the channel is full, the
    /// pool consumers will block until the shared consumer takes at least one item. This can be
    /// useful if you expect a huge amount of items to be generated, or if your system is low on
    /// memory.
    ///
    /// # Example
    ///
    /// ```rust
    /// # use shared_resource_pool_builder::SharedResourcePoolBuilder;
    /// #
    /// let pool_builder = SharedResourcePoolBuilder::new_bounded(
    ///     // We are very low on memory
    ///     10,
    ///
    ///     // We can use pretty much anything as the shared resource. Here, we are using a Vector:
    ///     Vec::new(),
    ///
    ///     // Naturally, our shared consumer will add an item to the Vector. The type of "i" can
    ///     // either be given explicitly, or simply let Rust infer it.
    ///     |vec, i| vec.push(i)
    /// );
    ///
    /// // Now we create a (very simple) pool for demonstration purposes. Since we return "i"
    /// // unaltered, the parameter type of the shared consumer as well as the item type of the
    /// // shared Vector will be inferred to "u8".
    /// pool_builder.create_pool(
    ///     |tx| tx.send(1u8).unwrap(),
    ///     |i| i,
    /// );
    ///
    /// // No matter how many pools we create, the channel to the shared consumer will never
    /// // buffer more than 10 items.
    /// ```
    pub fn new_bounded<SC>(bound: usize, shared_resource: SR, shared_consumer_fn: SC) -> Self
    where
        SC: FnMut(&mut SR, Arg) -> () + Send + Sync + 'static,
    {
        let (tx, rx) = sync_channel(bound);
        Self::init(tx, rx, shared_resource, shared_consumer_fn)
    }
}

#[cfg(test)]
mod tests {
    use std::time::{Duration, Instant};

    use super::*;

    struct TestResource(Vec<TestElem>);

    impl TestResource {
        fn add(&mut self, elem: TestElem) {
            self.0.push(elem);
        }
    }

    struct TestElem(String);

    #[test]
    fn test_one_integer() {
        let consumer_fn = |i| i * 10;

        let pool_builder = SharedResourcePoolBuilder::new(Vec::new(), |vec, i| vec.push(i));
        pool_builder.create_pool(
            |tx| vec![1].into_iter().for_each(|i| tx.send(i).unwrap()),
            consumer_fn,
        );

        let result = {
            let mut result = pool_builder.join().unwrap();
            result.sort();
            result
        };

        assert_eq!(result, vec![10]);
    }

    #[test]
    fn test_few_integers() {
        let consumer_fn = |i| i * 10;

        let pool_builder = SharedResourcePoolBuilder::new(Vec::new(), |vec, i| vec.push(i));
        pool_builder.create_pool(|tx| (0..5).for_each(|i| tx.send(i).unwrap()), consumer_fn);

        let result = {
            let mut result = pool_builder.join().unwrap();
            result.sort();
            result
        };

        assert_eq!(result, (0..5).map(consumer_fn).collect::<Vec<_>>());
    }

    #[test]
    fn test_few_integers_with_oneshot() {
        let consumer_fn = |i| i * 10;

        let pool_builder = SharedResourcePoolBuilder::new(Vec::new(), |vec, i| vec.push(i));
        pool_builder.create_pool(|tx| (0..5).for_each(|i| tx.send(i).unwrap()), consumer_fn);
        pool_builder.oneshot(consumer_fn(5));

        let result = {
            let mut result = pool_builder.join().unwrap();
            result.sort();
            result
        };

        assert_eq!(result, (0..6).map(consumer_fn).collect::<Vec<_>>());
    }

    #[test]
    fn test_few_integers_with_delayed_producer() {
        let consumer_fn = |i| i * 10;

        let pool_builder = SharedResourcePoolBuilder::new(Vec::new(), |vec, i| vec.push(i));
        pool_builder.create_pool(
            |tx| {
                thread::sleep(Duration::from_millis(10));
                (0..5).for_each(|i| tx.send(i).unwrap());
            },
            consumer_fn,
        );

        let result = {
            let mut result = pool_builder.join().unwrap();
            result.sort();
            result
        };

        assert_eq!(result, (0..5).map(consumer_fn).collect::<Vec<_>>());
    }

    #[test]
    fn test_many_integers_with_bounded_shared_producer() {
        let consumer_fn = |i| i * 10;

        let pool_builder =
            SharedResourcePoolBuilder::new_bounded(10, Vec::new(), |vec, i| vec.push(i));
        pool_builder.create_pool(
            |tx| (0..1000).for_each(|i| tx.send(i).unwrap()),
            consumer_fn,
        );

        let result = {
            let mut result = pool_builder.join().unwrap();
            result.sort();
            result
        };

        assert_eq!(result, (0..1000).map(consumer_fn).collect::<Vec<_>>());
    }

    #[test]
715    fn test_many_integers_with_bounded_item_producer() {
716        let consumer_fn = |i| i * 10;
717
718        let pool_builder = SharedResourcePoolBuilder::new(Vec::new(), |vec, i| vec.push(i));
719        pool_builder.create_pool_bounded(
720            10,
721            |tx| (0..1000).for_each(|i| tx.send(i).unwrap()),
722            consumer_fn,
723        );
724
725        let result = {
726            let mut result = pool_builder.join().unwrap();
727            result.sort();
728            result
729        };
730
731        assert_eq!(result, (0..1000).map(consumer_fn).collect::<Vec<_>>());
732    }
733
734    #[test]
735    fn test_many_integers_with_bounded_shared_and_item_producer() {
736        let consumer_fn = |i| i * 10;
737
738        let pool_builder =
739            SharedResourcePoolBuilder::new_bounded(10, Vec::new(), |vec, i| vec.push(i));
740        pool_builder.create_pool_bounded(
741            10,
742            |tx| (0..1000).for_each(|i| tx.send(i).unwrap()),
743            consumer_fn,
744        );
745
746        let result = {
747            let mut result = pool_builder.join().unwrap();
748            result.sort();
749            result
750        };
751
752        assert_eq!(result, (0..1000).map(consumer_fn).collect::<Vec<_>>());
753    }
754
755    #[test]
    fn test_many_integers() {
        let consumer_fn = |i| i * 10;

        let pool_builder = SharedResourcePoolBuilder::new(Vec::new(), |vec, i| vec.push(i));
        pool_builder.create_pool(
            |tx| (0..1000).for_each(|i| tx.send(i).unwrap()),
            consumer_fn,
        );

        let result = {
            let mut result = pool_builder.join().unwrap();
            result.sort();
            result
        };

        assert_eq!(result, (0..1000).map(consumer_fn).collect::<Vec<_>>());
    }

    #[test]
    fn test_wait_until_pool_finishes() {
        let consumer_fn = |i| i * 10;

        let pool_builder = SharedResourcePoolBuilder::new(Vec::new(), |vec, i| vec.push(i));
        let pool = pool_builder.create_pool(
            |tx| tx.send(1).unwrap(),
            move |parg| {
                // Simulate a slow consumer.
                thread::sleep(Duration::from_millis(10));
                consumer_fn(parg)
            },
        );

        // Block until this pool has finished its work.
        pool.join().unwrap();

        let now = Instant::now();

        let result = {
            let mut result = pool_builder.join().unwrap();
            result.sort();
            result
        };

        // The slow work already finished above, so joining the builder should
        // take less than the 10 ms sleep.
        let millis = now.elapsed().as_millis();
        assert!(millis < 10);

        assert_eq!(result, vec![10]);
    }

    #[test]
    fn test_integers_multiple_pools_parallel() {
        let pool_builder = SharedResourcePoolBuilder::new(Vec::new(), |vec, i| vec.push(i));
        let pools = vec![
            pool_builder.create_pool(|tx| (0..2).for_each(|i| tx.send(i).unwrap()), |i| i * 10),
            pool_builder.create_pool(|tx| (2..5).for_each(|i| tx.send(i).unwrap()), |i| i * 10),
        ];

        for pool in pools {
            pool.join().unwrap();
        }

        let result = {
            let mut result = pool_builder.join().unwrap();
            result.sort();
            result
        };

        assert_eq!(result, vec![0, 10, 20, 30, 40]);
    }

    #[test]
    fn test_integers_multiple_pools_sequential() {
        let pool_builder = SharedResourcePoolBuilder::new(Vec::new(), |vec, i| vec.push(i));

        let pool =
            pool_builder.create_pool(|tx| (2..5).for_each(|i| tx.send(i).unwrap()), |i| i * 10);

        pool.join().unwrap();

        let pool =
            pool_builder.create_pool(|tx| (0..2).for_each(|i| tx.send(i).unwrap()), |i| i * 10);

        pool.join().unwrap();

        let result = {
            let mut result = pool_builder.join().unwrap();
            result.sort();
            result
        };

        assert_eq!(result, vec![0, 10, 20, 30, 40]);
    }

    #[test]
    fn test_integers_multiple_pools_with_parallel_producers() {
        let producer_pool = threadpool::Builder::new().build();

        let pool_builder = SharedResourcePoolBuilder::new(Vec::new(), |vec, i| vec.push(i));
        let pools = vec![
            {
                let producer_pool = producer_pool.clone();
                pool_builder.create_pool(
                    move |tx| {
                        // Hand each send off to the shared producer thread pool.
                        (0..50).for_each(|i| {
                            let tx = tx.clone();
                            producer_pool.execute(move || tx.send(i).unwrap())
                        });
                    },
                    |i| i * 10,
                )
            },
            {
                let producer_pool = producer_pool.clone();
                pool_builder.create_pool(
                    move |tx| {
                        (50..100).for_each(|i| {
                            let tx = tx.clone();
                            producer_pool.execute(move || tx.send(i).unwrap())
                        });
                    },
                    |i| i * 10,
                )
            },
        ];

        for pool in pools {
            pool.join().unwrap();
        }

        let result = {
            let mut result = pool_builder.join().unwrap();
            result.sort();
            result
        };

        assert_eq!(result, (0..100).map(|i| i * 10).collect::<Vec<_>>());
    }

    #[test]
    fn test_custom_types() {
        let producer_fn = || ('a'..='z').map(String::from);

        let pool_builder =
            SharedResourcePoolBuilder::new(TestResource(Vec::new()), |tres, e| tres.add(e));
        pool_builder.create_pool(
            move |tx| producer_fn().for_each(|i| tx.send(i).unwrap()),
            TestElem,
        );

        let result = {
            let mut result = pool_builder
                .join()
                .unwrap()
                .0
                .iter()
                .map(|e| e.0.clone())
                .collect::<Vec<_>>();
            result.sort();
            result
        };

        assert_eq!(result, producer_fn().collect::<Vec<_>>());
    }
}