1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
//! # Shared Resource Pool Builder
//!
//! A producer-consumer thread pool builder that will eventually consume all items
//! sequentially with a shared resource.
//!
//! ## Code example
//!
//! The purpose of this library is best described with an example, so let's get
//! straight into it:
//!
//! ```rust
//! use shared_resource_pool_builder::SharedResourcePoolBuilder;
//!
//! // Define a pool builder with a Vector as shared resource, and an action that
//! // should be executed sequentially over all items. Note that the second
//! // parameter of this function is a shared consumer function that takes two
//! // parameters: The shared resource and the items it will be used on. These
//! // items will be returned from the pool consumers.
//! let pool_builder = SharedResourcePoolBuilder::new(
//!     Vec::new(),
//!     |vec, i| vec.push(i)
//! );
//!
//! // Create a producer-consumer thread pool that does some work. In this case,
//! // send the numbers 0 to 2 to the consumer. There, they will be multiplied by
//! // 10. Note that the first parameter is a producer function that takes a
//! // Sender like object, like [`std::sync::mpsc::Sender`]. Every item that gets
//! // sent will be processed by the consumer, which is the second parameter to
//! // this function. This function expects an object of the same type that gets
//! // sent by the producer. The consumer is expected to return an object of the
//! // type that the shared consumer takes as the second parameter and will
//! // ultimately be used on the shared resource.
//! let pool_1 = pool_builder
//!     .create_pool(
//!         |tx| (0..=2).for_each(|i| tx.send(i).unwrap()),
//!         |i| i * 10
//!     );
//!
//! // Create a second pool. Here, the numbers 3 to 5 will be produced, multiplied
//! // by 2, and being processed by the shared consumer function.
//! let pool_2 = pool_builder
//!     .create_pool(
//!         |tx| (3..=5).for_each(|i| tx.send(i).unwrap()),
//!         |i| i * 2
//!     );
//!
//! // Wait for a specific pool to finish before continuing ...
//! pool_1.join().unwrap();
//!
//! // ... or wait for all pools and the shared consumer to finish their work at
//! // once and return the shared resource. Afterwards, the pool builder can no
//! // longer be used, since the shared resource gets moved.
//! let result = {
//!     let mut result = pool_builder.join().unwrap();
//!
//!     // By default, the pool consumers run in as many threads as there are
//!     // cores in the machine, so the result may not be in the same order as
//!     // they were programmed.
//!     result.sort();
//!     result
//! };
//!
//! assert_eq!(result, vec![0, 6, 8, 10, 10, 20]);
//! ```
//!
//! ## Motivation
//!
//! Imagine you have a huge list of items that need to be processed. Every item
//! can be processed independently, so the natural thing to do is using a thread
//! pool to parallelize the work. But now imagine that the resulting items need to
//! be used together with a resource that does not work on multiple threads. The
//! resulting items will have to be processed sequentially, so the resource does
//! not have to be shared with multiple threads.
//!
//! As a more concrete example, imagine a program that uses a SQLite database for
//! storing computational intensive results. Before doing the actual work, you
//! want to remove obsolete results from the database. Then you need to collect
//! all new data that need to get processed. At last, you want all new data to be
//! actually processed.
//!
//! Normally, this involves three sequential steps: Cleaning, collecting, and
//! processing. These steps need to be sequential because writing operations on
//! the same SQLite database can only be done by one thread at a time. You might
//! want to implement synchronization, so that one thread writes and all
//! additional threads are blocked and wait for their turn. But this can be error
//! prone if not done correctly. You will encounter random panics of locked or
//! busy databases. Furthermore, you lose the ability to use prepared statements
//! efficiently.
//!
//! Enter `shared-resource-pool-builder`. With this, you have the ability to
//! define a *shared resource* and a *shared consumer function*. From this builder
//! you can create producer-consumer pools. The consumed items will then be
//! sequentially given to the shared consumer function to apply the result to the
//! shared resource.
//!
//! The pool consumers will all run within a shared thread pool, so you do not need
//! to worry about thrashing your system. You can even limit the number of items
//! that get processed in parallel.
//!
//! To stay with the database example, this means you can define a pool builder
//! with the database connection as the shared resource and a shared consumer
//! function that will remove, insert, or update items. You can do that with an
//! enum, for example.
//!
//! Then, for the cleaning step, you create a pool that produces all items from
//! the database and consumes them by returning an object that causes the item to
//! be either deleted (if obsolete) or ignored by the shared consumer.
//!
//! Next, you can create a pool that produces all new items and consumes them by
//! creating an insert action with each item.
//!
//! Depending on the database layout, these two pools can even be run in parallel.
//! Thanks to the shared consumer, only one delete or insert will run at the same
//! time.
//!
//! Again, depending on the layout, you just need to wait for the inserting pool
//! to finish and create yet another pool that produces all new items that need
//! to be processed, and consumes them by doing the actual work and returning an
//! appropriate update object for the shared consumer. In the meanwhile, the
//! cleaning pool might as well continue its work.
//!
//! Eventually, you wait for all pools to finish before terminating the program.
//!
//! This way, you do not need to worry about synchronizing multiple writer threads
//! or multiple open database connections. Just split the work so the
//! computational intensive tasks get done in the pool consumers and make the
//! shared consumer just do the finalization.

use std::sync::mpsc::{channel, sync_channel, Receiver, Sender, SyncSender};
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
use std::thread::JoinHandle;

use threadpool::ThreadPool;

use crate::job::JobCounter;
pub use crate::job::{Job, JobHandle};
use crate::senderlike::SenderLike;

mod job;
mod senderlike;

/// The builder to create pools on a shared resource.
pub struct SharedResourcePoolBuilder<SType, SR> {
    // Sender half of the channel feeding the shared-consumer thread; a clone of
    // this is handed to every pool/oneshot created by this builder.
    tx: SType,
    // Thread that exclusively owns the shared resource and applies the shared
    // consumer function to incoming jobs sequentially; joined in `join()`,
    // which then yields the resource back to the caller.
    handler_thread: JoinHandle<SR>,
    // Thread pool in which all pool consumer functions run, shared across
    // every pool created from this builder.
    consumer_pool: ThreadPool,
}

impl<SType, Arg, SR> SharedResourcePoolBuilder<SType, SR>
where
    Arg: Send + 'static,
    SType: SenderLike<Item = Job<Arg>> + Clone + Send + 'static,
    SR: Send + 'static,
{
    /// Common constructor logic for the bounded and unbounded builders: spawns
    /// the dedicated shared-consumer thread draining `rx`, and creates the
    /// thread pool in which all pool consumers will later run.
    fn init<SC>(
        tx: SType,
        rx: Receiver<Job<Arg>>,
        mut shared_resource: SR,
        mut shared_consumer_fn: SC,
    ) -> Self
    where
        SC: FnMut(&mut SR, Arg) -> () + Send + Sync + 'static,
    {
        // The handler thread owns the shared resource exclusively and applies
        // the shared consumer to each job strictly sequentially. The loop ends
        // when the channel closes, i.e. once every sender clone (including the
        // builder's own `tx`, dropped in `join()`) is gone; the resource is
        // then returned through the thread's join handle.
        let join_handle = thread::spawn(move || {
            for job in rx {
                shared_consumer_fn(&mut shared_resource, job.0);
            }
            shared_resource
        });

        // Default-configured pool: one thread per core (threadpool's default
        // for `num_threads`).
        let consumer_pool = threadpool::Builder::new().build();

        Self {
            tx,
            handler_thread: join_handle,
            consumer_pool,
        }
    }

    /// Creates an unbounded producer-consumer pool.
    ///
    /// This method takes two parameters:
    ///
    /// -   The producer function
    ///
    ///     This function will generate the items to be further processed by the consumer. It must
    ///     take one parameter, which is a sender like object like [`Sender`]. To generate an item
    ///     for the consumer, use the `send()` method of the sender.
    ///
    ///     The return type of this function will be ignored, i.e. it is `()`.
    ///
    ///     By design, this function will run in only one thread. If you want it to run in multiple
    ///     threads, you need to implement that by yourself. The preferred way is to run only a
    ///     cheap listing algorithm in the producer and let the consumer do the computational
    ///     intensive tasks, which will by default run within a thread pool.
    ///
    /// -   The consumer function
    ///
    ///     This function takes one parameter and will be called for each item that was sent by the
    ///     producer. The return type must be the same type that is used by the shared consumer as
    ///     defined by the [`Self::new()`] method.
    ///
    ///     By design, the consumer functions will be run within a thread pool that is shared with
    ///     every other pool consumers created by this builder. The number of threads is the number
    ///     of cores, as defined by the default value of [`threadpool::Builder::num_threads()`].
    ///
    /// The message channel, in which the producer sends and the consumer receives the items, is
    /// unlimited with this function (in other words, it uses [`channel()`]). On modern systems and
    /// "typical" use-cases, this should not be a problem. If however your system is low on memory
    /// or your producer generates millions over millions of items, then you might be interested in
    /// the [bounded](`Self::create_pool_bounded()`) variant of this method.
    ///
    /// # Example
    ///
    /// Assuming that `heavy_algorithm()` converts an integer to an object that can be used with
    /// the shared consumer function, a pool creation might look like this:
    ///
    /// ```rust
    /// # use std::error::Error;
    /// # use std::sync::mpsc::Sender;
    /// # use shared_resource_pool_builder::SharedResourcePoolBuilder;
    /// #
    /// # fn heavy_algorithm(i: i32) -> i32 { i }
    /// #
    /// # fn example() {
    /// # let pool_builder = SharedResourcePoolBuilder::new(true, |res, item|());
    /// pool_builder.create_pool(
    ///     |tx| (0..10).for_each(|i| tx.send(i).unwrap()),
    ///     |i| heavy_algorithm(i),
    /// );
    /// # }
    /// ```
    pub fn create_pool<P, PArg, C>(&self, producer_fn: P, consumer_fn: C) -> JobHandle
    where
        P: Fn(Sender<PArg>) -> () + Send + 'static,
        PArg: Send + 'static,
        C: Fn(PArg) -> Arg + Clone + Send + Sync + 'static,
    {
        // Unbounded channel between producer and pool consumers.
        let (tx, rx) = channel::<PArg>();
        self.init_pool(tx, rx, producer_fn, consumer_fn)
    }

    /// Creates a bounded producer-consumer pool.
    ///
    /// This method is nearly identical to its [unbounded](`Self::create_pool()`) variant. For a
    /// detailed description of the producer and consumer parameters, please look there.
    ///
    /// The difference is, that this function takes an additional integer parameter, which limits
    /// the size of the message channel, that is used by the producer and consumer (in other words,
    /// it uses [`sync_channel()`]). If the channel is full, the producer will block until the
    /// consumer takes at least one item. This can be useful if you expect a huge amount of items
    /// to be generated, or if your system is low on memory.
    ///
    /// # Example
    ///
    /// Assuming that `crazy_producer()` generates a huge amount of integer values, and that
    /// `heavy_algorithm()` converts an integer to an object that can be used with the shared
    /// consumer function, a bounded pool creation with a buffer of 100 items might look like this:
    ///
    /// ```rust
    /// # use std::error::Error;
    /// # use std::sync::mpsc::Sender;
    /// # use shared_resource_pool_builder::SharedResourcePoolBuilder;
    /// #
    /// # fn crazy_producer() -> Vec<i32> { vec![] }
    /// # fn heavy_algorithm(i: i32) -> i32 { i }
    /// #
    /// # fn example() {
    /// # let pool_builder = SharedResourcePoolBuilder::new(true, |res, item|());
    /// pool_builder.create_pool_bounded(
    ///     100,
    ///     |tx| crazy_producer().into_iter().for_each(|i| tx.send(i).unwrap()),
    ///     |i| heavy_algorithm(i),
    /// );
    /// # }
    /// ```
    pub fn create_pool_bounded<P, PArg, C>(
        &self,
        bound: usize,
        producer_fn: P,
        consumer_fn: C,
    ) -> JobHandle
    where
        P: Fn(SyncSender<PArg>) -> () + Send + 'static,
        PArg: Send + 'static,
        C: Fn(PArg) -> Arg + Clone + Send + Sync + 'static,
    {
        // Bounded channel: a full buffer makes the producer's send() block.
        let (tx, rx) = sync_channel::<PArg>(bound);
        self.init_pool(tx, rx, producer_fn, consumer_fn)
    }

    /// Shared implementation behind [`Self::create_pool()`] and
    /// [`Self::create_pool_bounded()`]: wires up one producer thread, one
    /// dispatcher thread, and a pool job per produced item.
    fn init_pool<P, PArg, PType, C>(
        &self,
        tx: PType,
        rx: Receiver<PArg>,
        producer_fn: P,
        consumer_fn: C,
    ) -> JobHandle
    where
        PType: SenderLike<Item = PArg> + Send + 'static,
        P: Fn(PType) -> () + Send + 'static,
        PArg: Send + 'static,
        C: Fn(PArg) -> Arg + Clone + Send + Sync + 'static,
    {
        // The producer runs in its own thread; when it finishes, `tx` is
        // dropped, the channel closes, and the dispatcher loop below ends.
        let producer_thread = thread::spawn(move || producer_fn(tx));

        // (counter, condvar) pair used to track in-flight pool jobs; consumed
        // by JobCounter/JobHandle (see the `job` module).
        let job_counter = Arc::new((Mutex::new(0), Condvar::new()));

        // Dispatcher thread: forwards every produced item into the shared
        // consumer pool as a job that runs `consumer_fn` and sends the result
        // on to the shared-consumer thread.
        let consumer_thread = {
            let pool = self.consumer_pool.clone();
            let job_counter = Arc::clone(&job_counter);
            let tx = self.tx.clone();
            thread::spawn(move || {
                for item in rx {
                    let tx = tx.clone();
                    let consumer_fn = consumer_fn.clone();
                    // One JobCounter per job — presumably registers the job on
                    // creation and deregisters when dropped inside the pool
                    // task (defined in the `job` module; not visible here).
                    let job_counter = JobCounter::new(Arc::clone(&job_counter));
                    pool.execute(move || {
                        let job = Job(consumer_fn(item), job_counter);
                        tx.send(job).unwrap();
                    });
                }
            })
        };

        // The handle's thread waits for both helper threads; outstanding pool
        // jobs themselves are awaited through `job_counter`.
        let join_handle = thread::spawn(move || {
            producer_thread.join().unwrap();
            consumer_thread.join().unwrap();
        });

        JobHandle::new(join_handle, job_counter)
    }

    /// Send one single item to the shared consumer.
    ///
    /// This method can be used to communicate with the shared resource, before creating another
    /// pool. Maybe you just want to add another item to the shared consumer. Or you want to use
    /// this method to trigger an alternate behavior of the shared consumer.
    ///
    /// Internally, this method behaves much like the [`Self::create_pool()`] method. It returns a
    /// [`JobHandle`], which can be waited on with [`JobHandle::join()`]. The item is sent to the
    /// message queue of the pool builder and might not be consumed immediately if there are other
    /// items waiting.
    ///
    /// # Examples
    ///
    /// ## Add another item
    ///
    /// In the following example, in addition to summing up 0, 1, and 2, as well as 4, 5, and 6, we
    /// add a single 3 to the result:
    ///
    /// ```rust
    /// # use shared_resource_pool_builder::SharedResourcePoolBuilder;
    /// #
    /// let pool_builder = SharedResourcePoolBuilder::new(0, |sum, i| *sum += i);
    ///
    /// pool_builder.create_pool(
    ///     |tx| vec![0, 1, 2].into_iter().for_each(|i| tx.send(i).unwrap()),
    ///     |i| i,
    /// );
    ///
    /// pool_builder.oneshot(3);
    ///
    /// pool_builder.create_pool(
    ///     |tx| vec![4, 5, 6].into_iter().for_each(|i| tx.send(i).unwrap()),
    ///     |i| i,
    /// );
    ///
    /// let result = pool_builder.join().unwrap();
    ///
    /// assert_eq!(result, 21);
    /// ```
    ///
    /// ## Trigger alternate behavior
    ///
    /// In the following example, we change the behavior of the shared consumer in the middle of
    /// the execution:
    ///
    /// ```rust
    /// # use shared_resource_pool_builder::SharedResourcePoolBuilder;
    /// #
    /// let pool_builder = SharedResourcePoolBuilder::new(
    ///     // Here we define our shared resource to be a tuple. The second element will determine
    ///     // our current behavior and can be altered in the shared consumer function.
    ///     (0, true),
    ///
    ///     // The whole tuple must be used in the shared consumer. Add or subtract the received
    ///     // value `i`, depending on the state of `adding`. If `None` is received, swap the state
    ///     // variable, so that upcoming values will be handled by the alternate behavior.
    ///     |(sum, adding), i| {
    ///         match i {
    ///             Some(i) => {
    ///                 if *adding {
    ///                     *sum += i;
    ///                 } else {
    ///                     *sum -= i;
    ///                 }
    ///             }
    ///             None => {
    ///                 *adding = !*adding;
    ///             }
    ///         }
    ///     },
    /// );
    ///
    /// // First we add some numbers
    /// pool_builder.create_pool(
    ///     |tx| vec![0, 1, 2].into_iter().for_each(|i| tx.send(Some(i)).unwrap()),
    ///     |i| i,
    /// ).join().unwrap();
    ///
    /// // Swap summing behavior
    /// pool_builder.oneshot(None).join().unwrap();
    ///
    /// // Now we subtract the same numbers again, with the same code as before
    /// pool_builder.create_pool(
    ///     |tx| vec![0, 1, 2].into_iter().for_each(|i| tx.send(Some(i)).unwrap()),
    ///     |i| i,
    /// );
    ///
    /// // Remember that we defined a boolean in our shared resource tuple. We are not interested
    /// // in this one anymore, so we explicitly ignore it here.
    /// let (result, _) = pool_builder.join().unwrap();
    ///
    /// // Since we subtracted the same amount as we added, the result is now 0
    /// assert_eq!(result, 0);
    ///
    /// // Note that we joined the first pool and the oneshot to await them to be finished. In this
    /// // simple example, it would normally not be necessary, since sending merely three items
    /// // happens quite immediately and the shared consumer handles all items sequentially.
    /// // Nevertheless, there could be a race condition if for some reason the swapping is
    /// // triggered before all items could be added. In more complex scenarios, care should be
    /// // taken to await all previous pools with `join()` before altering the shared consumer
    /// // behavior.
    /// ```
    pub fn oneshot(&self, item: Arg) -> JobHandle {
        // Same tracking mechanism as init_pool(), but for exactly one job.
        let job_counter = Arc::new((Mutex::new(0), Condvar::new()));

        let oneshot_thread = {
            let pool = self.consumer_pool.clone();
            let job_counter = Arc::clone(&job_counter);
            let tx = self.tx.clone();
            // The item skips any consumer function and is wrapped into a Job
            // directly; the pool task just forwards it to the shared consumer.
            thread::spawn(move || {
                let job_counter = JobCounter::new(job_counter);
                pool.execute(move || {
                    let job = Job(item, job_counter);
                    tx.send(job).unwrap();
                });
            })
        };

        JobHandle::new(oneshot_thread, job_counter)
    }

    /// Waits for all tasks to finish their work and return the shared resource afterwards.
    ///
    /// This implicitly waits for all created pools to be finished, since their jobs all end up in
    /// the shared consumer. Note that after this call, this pool builder can no longer be used,
    /// since the shared resource will be moved to the caller.
    ///
    /// This method returns a [`thread::Result`]. Since the `Err` variant of this specialized
    /// `Result` does not implement the [`Error`](std::error::Error) trait, the `?`-operator does
    /// not work here. For more information about how to handle panics in this case, please refer
    /// to the documentation of [`thread::Result`].
    ///
    /// # Example:
    ///
    /// ```rust
    /// # use shared_resource_pool_builder::SharedResourcePoolBuilder;
    /// #
    /// # fn shared_consumer_fn(vec: &mut Vec<i32>, i: i32) {};
    /// # fn heavy_algorithm(i: i32) -> i32 { i }
    /// #
    /// # fn example() {
    /// let pool_builder = SharedResourcePoolBuilder::new(Vec::new(), shared_consumer_fn);
    /// pool_builder.create_pool(
    ///     |tx| (0..10).for_each(|i| tx.send(i).unwrap()),
    ///     |i| heavy_algorithm(i),
    /// );
    ///
    /// let result = pool_builder.join().unwrap();
    /// # }
    /// ```
    pub fn join(self) -> thread::Result<SR> {
        // Drop our sender so the shared-consumer channel can close once all
        // clones held by still-running pool jobs are gone; the handler thread
        // then drains remaining jobs and returns the shared resource.
        drop(self.tx);
        self.handler_thread.join()
    }
}

impl<Arg, SR> SharedResourcePoolBuilder<Sender<Job<Arg>>, SR>
where
    Arg: Send + 'static,
    SR: Send + 'static,
{
    /// Creates a new unbounded pool builder.
    ///
    /// The two parameters are:
    ///
    /// -   The shared resource
    ///
    ///     The value that every item sent from the pool consumer functions will be
    ///     applied to. Typically this is something that cannot easily be used from
    ///     multiple threads at once, like a database connection object — but there
    ///     is no restriction, pretty much anything can act as the shared resource.
    ///
    /// -   The shared consumer function
    ///
    ///     Called for each item coming out of the pool consumers, together with the
    ///     shared resource. To give you the maximum flexibility, it takes two
    ///     parameters: the shared resource and one item. The item type is the same
    ///     for every pool consumer created with this builder.
    ///
    /// Note that the message channel connecting the pool consumers with the shared
    /// consumer is unlimited (in other words, it uses [`channel()`]). On modern
    /// systems and "typical" use-cases, this should not be a problem. If however
    /// your system is low on memory, or your producers generate millions over
    /// millions of items, you might be interested in the
    /// [bounded](`Self::new_bounded()`) variant of this method.
    ///
    /// # Example
    ///
    /// ```rust
    /// # use shared_resource_pool_builder::SharedResourcePoolBuilder;
    /// #
    /// let pool_builder = SharedResourcePoolBuilder::new(
    ///     // Pretty much anything works as the shared resource; here, a Vector:
    ///     Vec::new(),
    ///
    ///     // The shared consumer simply appends each item to the Vector. The type
    ///     // of "i" can be given explicitly, or left for Rust to infer.
    ///     |vec, i| vec.push(i)
    /// );
    ///
    /// // A (very simple) demonstration pool. Since "i" is returned unaltered, the
    /// // parameter type of the shared consumer — and thus the item type of the
    /// // shared Vector — is inferred as "u8".
    /// pool_builder.create_pool(
    ///     |tx| tx.send(1u8).unwrap(),
    ///     |i| i,
    /// );
    /// ```
    pub fn new<SC>(shared_resource: SR, shared_consumer_fn: SC) -> Self
    where
        SC: FnMut(&mut SR, Arg) -> () + Send + Sync + 'static,
    {
        // Unbounded channel into the shared-consumer thread.
        let (sender, receiver) = channel();
        Self::init(sender, receiver, shared_resource, shared_consumer_fn)
    }
}

impl<Arg, SR> SharedResourcePoolBuilder<SyncSender<Job<Arg>>, SR>
where
    Arg: Send + 'static,
    SR: Send + 'static,
{
    /// Creates a new bounded pool builder.
    ///
    /// This method is nearly identical to its [unbounded](`Self::new()`) variant.
    /// For a detailed description of the shared resource and shared consumer
    /// parameters, please look there.
    ///
    /// The difference is the additional integer parameter, which limits the size
    /// of the message channel connecting the pool consumers with the shared
    /// consumer (in other words, it uses [`sync_channel()`]). Whenever the channel
    /// is full, the pool consumers block until the shared consumer takes at least
    /// one item. This can be useful if you expect a huge amount of items to be
    /// generated, or if your system is low on memory.
    ///
    /// # Example
    ///
    /// ```rust
    /// # use shared_resource_pool_builder::SharedResourcePoolBuilder;
    /// #
    /// let pool_builder = SharedResourcePoolBuilder::new_bounded(
    ///     // We are very low on memory
    ///     10,
    ///
    ///     // Pretty much anything works as the shared resource; here, a Vector:
    ///     Vec::new(),
    ///
    ///     // The shared consumer simply appends each item to the Vector. The type
    ///     // of "i" can be given explicitly, or left for Rust to infer.
    ///     |vec, i| vec.push(i)
    /// );
    ///
    /// // A (very simple) demonstration pool. Since "i" is returned unaltered, the
    /// // parameter type of the shared consumer — and thus the item type of the
    /// // shared Vector — is inferred as "u8".
    /// pool_builder.create_pool(
    ///     |tx| tx.send(1u8).unwrap(),
    ///     |i| i,
    /// );
    ///
    /// // No matter how many pools we create, the maximum buffered items will never exceed 10.
    /// ```
    pub fn new_bounded<SC>(bound: usize, shared_resource: SR, shared_consumer_fn: SC) -> Self
    where
        SC: FnMut(&mut SR, Arg) -> () + Send + Sync + 'static,
    {
        // Bounded channel: sends from pool jobs block while `bound` items wait.
        let (sender, receiver) = sync_channel(bound);
        Self::init(sender, receiver, shared_resource, shared_consumer_fn)
    }
}

#[cfg(test)]
mod tests {
    use std::time::{Duration, Instant};

    use super::*;

    // NOTE(review): not referenced by the tests visible in this chunk —
    // presumably used by tests further below; verify before removing.
    struct TestResource(Vec<TestElem>);

    impl TestResource {
        // Appends an element; mirrors the shape of a shared consumer function.
        fn add(&mut self, elem: TestElem) {
            self.0.push(elem);
        }
    }

    // Simple string newtype used as the element type of `TestResource`.
    struct TestElem(String);

    #[test]
    fn test_one_integer() {
        // A single produced value must pass through the consumer (times ten)
        // and end up in the shared Vec.
        let times_ten = |i| i * 10;

        let builder = SharedResourcePoolBuilder::new(Vec::new(), |vec, i| vec.push(i));
        builder.create_pool(
            |tx| {
                for i in vec![1] {
                    tx.send(i).unwrap();
                }
            },
            times_ten,
        );

        let mut result = builder.join().unwrap();
        result.sort();

        assert_eq!(result, vec![10]);
    }

    #[test]
    fn test_few_integers() {
        // A handful of items through one unbounded pool.
        let times_ten = |i| i * 10;

        let builder = SharedResourcePoolBuilder::new(Vec::new(), |vec, i| vec.push(i));
        builder.create_pool(|tx| (0..5).for_each(|i| tx.send(i).unwrap()), times_ten);

        let mut result = builder.join().unwrap();
        result.sort();

        let expected: Vec<_> = (0..5).map(times_ten).collect();
        assert_eq!(result, expected);
    }

    #[test]
    fn test_few_integers_with_oneshot() {
        // A oneshot item must mix into the pool's results seamlessly.
        let times_ten = |i| i * 10;

        let builder = SharedResourcePoolBuilder::new(Vec::new(), |vec, i| vec.push(i));
        builder.create_pool(|tx| (0..5).for_each(|i| tx.send(i).unwrap()), times_ten);
        builder.oneshot(times_ten(5));

        let mut result = builder.join().unwrap();
        result.sort();

        let expected: Vec<_> = (0..6).map(times_ten).collect();
        assert_eq!(result, expected);
    }

    #[test]
    fn test_few_integers_with_delayed_producer() {
        // The consumer side must block on an initially empty channel rather
        // than terminate before the producer gets going.
        let times_ten = |i| i * 10;

        let builder = SharedResourcePoolBuilder::new(Vec::new(), |vec, i| vec.push(i));
        builder.create_pool(
            |tx| {
                thread::sleep(Duration::from_millis(10));
                for i in 0..5 {
                    tx.send(i).unwrap();
                }
            },
            times_ten,
        );

        let mut result = builder.join().unwrap();
        result.sort();

        let expected: Vec<_> = (0..5).map(times_ten).collect();
        assert_eq!(result, expected);
    }

    #[test]
    fn test_many_integers_with_bounded_shared_producer() {
        // 1000 items through a builder whose shared channel holds at most 10.
        let times_ten = |i| i * 10;

        let builder = SharedResourcePoolBuilder::new_bounded(10, Vec::new(), |vec, i| vec.push(i));
        builder.create_pool(|tx| (0..1000).for_each(|i| tx.send(i).unwrap()), times_ten);

        let mut result = builder.join().unwrap();
        result.sort();

        let expected: Vec<_> = (0..1000).map(times_ten).collect();
        assert_eq!(result, expected);
    }

    #[test]
    fn test_many_integers_with_bounded_item_producer() {
        // 1000 items through a pool whose producer channel holds at most 10.
        let times_ten = |i| i * 10;

        let builder = SharedResourcePoolBuilder::new(Vec::new(), |vec, i| vec.push(i));
        builder.create_pool_bounded(
            10,
            |tx| (0..1000).for_each(|i| tx.send(i).unwrap()),
            times_ten,
        );

        let mut result = builder.join().unwrap();
        result.sort();

        let expected: Vec<_> = (0..1000).map(times_ten).collect();
        assert_eq!(result, expected);
    }

    #[test]
    fn test_many_integers_with_bounded_shared_and_item_producer() {
        // Transformation applied by the consumer to every produced integer.
        let consumer_fn = |i| i * 10;

        // Both channels are bounded: the shared-resource channel (capacity 10)
        // and the per-pool item channel (capacity 10).
        let pool_builder =
            SharedResourcePoolBuilder::new_bounded(10, Vec::new(), |vec, i| vec.push(i));
        pool_builder.create_pool_bounded(
            10,
            |tx| {
                for i in 0..1000 {
                    tx.send(i).unwrap();
                }
            },
            consumer_fn,
        );

        let mut collected = pool_builder.join().unwrap();
        collected.sort();

        assert_eq!(collected, (0..1000).map(consumer_fn).collect::<Vec<_>>());
    }

    #[test]
    fn test_many_integers() {
        // Transformation applied by the consumer to every produced integer.
        let consumer_fn = |i| i * 10;

        let pool_builder = SharedResourcePoolBuilder::new(Vec::new(), |vec, i| vec.push(i));
        pool_builder.create_pool(
            |tx| {
                for i in 0..1000 {
                    tx.send(i).unwrap();
                }
            },
            consumer_fn,
        );

        let mut collected = pool_builder.join().unwrap();
        collected.sort();

        assert_eq!(collected, (0..1000).map(consumer_fn).collect::<Vec<_>>());
    }

    #[test]
    fn test_wait_until_pool_finishes() {
        // Verifies that joining the builder after a pool has already finished
        // returns promptly instead of blocking on the pool's work again.
        let consumer_fn = |i| i * 10;

        let pool_builder = SharedResourcePoolBuilder::new(Vec::new(), |vec, i| vec.push(i));
        let pool = pool_builder.create_pool(
            |tx| tx.send(1).unwrap(),
            move |parg| {
                // Slow consumer: gives `pool.join()` below something to wait on.
                thread::sleep(Duration::from_millis(10));
                consumer_fn(parg)
            },
        );

        // Block until this pool's producer and consumer are completely done.
        pool.join().unwrap();

        // Start timing only after the pool has finished.
        let now = Instant::now();

        let result = {
            let mut result = pool_builder.join().unwrap();
            result.sort();
            result
        };

        // The pool already finished, so the builder join must be near-instant.
        // NOTE(review): wall-clock bounds like this can be flaky on loaded CI machines.
        let millis = now.elapsed().as_millis();
        assert!(millis < 10);

        assert_eq!(result, vec![10]);
    }

    #[test]
    fn test_integers_multiple_pools_parallel() {
        let pool_builder = SharedResourcePoolBuilder::new(Vec::new(), |vec, i| vec.push(i));

        // Two pools feed the same shared resource concurrently, each covering
        // a disjoint part of the input range.
        let first =
            pool_builder.create_pool(|tx| (0..2).for_each(|i| tx.send(i).unwrap()), |i| i * 10);
        let second =
            pool_builder.create_pool(|tx| (2..5).for_each(|i| tx.send(i).unwrap()), |i| i * 10);

        first.join().unwrap();
        second.join().unwrap();

        let mut collected = pool_builder.join().unwrap();
        collected.sort();

        assert_eq!(collected, vec![0, 10, 20, 30, 40])
    }

    #[test]
    fn test_integers_multiple_pools_sequential() {
        let pool_builder = SharedResourcePoolBuilder::new(Vec::new(), |vec, i| vec.push(i));

        // First pool runs to completion before the second one is created,
        // showing the builder accepts pools sequentially.
        pool_builder
            .create_pool(|tx| (2..5).for_each(|i| tx.send(i).unwrap()), |i| i * 10)
            .join()
            .unwrap();

        pool_builder
            .create_pool(|tx| (0..2).for_each(|i| tx.send(i).unwrap()), |i| i * 10)
            .join()
            .unwrap();

        let mut collected = pool_builder.join().unwrap();
        collected.sort();

        assert_eq!(collected, vec![0, 10, 20, 30, 40])
    }

    #[test]
    fn test_integers_multiple_pools_with_parallel_producers() {
        // Each producer fans its sends out onto a shared thread pool, so items
        // for one pool may arrive from many threads at once.
        let producer_pool = threadpool::Builder::new().build();

        let pool_builder = SharedResourcePoolBuilder::new(Vec::new(), |vec, i| vec.push(i));
        let pools = vec![
            {
                let producer_pool = producer_pool.clone();
                pool_builder.create_pool(
                    move |tx| {
                        // Ranges are iterators already; `.into_iter()` was redundant.
                        (0..50).for_each(|i| {
                            let tx = tx.clone();
                            producer_pool.execute(move || tx.send(i).unwrap())
                        });
                    },
                    |i| i * 10,
                )
            },
            {
                let producer_pool = producer_pool.clone();
                pool_builder.create_pool(
                    move |tx| {
                        (50..100).for_each(|i| {
                            let tx = tx.clone();
                            producer_pool.execute(move || tx.send(i).unwrap())
                        });
                    },
                    |i| i * 10,
                )
            },
        ];

        // Wait for both pools before collecting the shared result.
        for pool in pools {
            pool.join().unwrap();
        }

        let result = {
            let mut result = pool_builder.join().unwrap();
            result.sort();
            result
        };

        assert_eq!(result, (0..100).map(|i| i * 10).collect::<Vec<_>>())
    }

    #[test]
    fn test_custom_types() {
        // Produces the lowercase alphabet as owned strings.
        // `String::from` is passed directly instead of wrapping it in a closure.
        let producer_fn = || ('a'..='z').map(String::from);

        let pool_builder =
            SharedResourcePoolBuilder::new(TestResource(Vec::new()), |tres, e| tres.add(e));
        pool_builder.create_pool(
            move |tx| producer_fn().for_each(|i| tx.send(i).unwrap()),
            // The tuple-struct constructor is already a fn item; no closure needed.
            TestElem,
        );

        let result = {
            // Consume the joined resource so the inner strings are moved out
            // instead of cloned.
            let mut result = pool_builder
                .join()
                .unwrap()
                .0
                .into_iter()
                .map(|e| e.0)
                .collect::<Vec<_>>();
            result.sort();
            result
        };

        assert_eq!(result, producer_fn().collect::<Vec<_>>());
    }
}