1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
use std::thread::JoinHandle;
/// Shared count of live [`JobCounter`] values, paired with a condition variable.
///
/// [`JobCounter::new`] increments the count, dropping a [`JobCounter`] decrements it and notifies
/// the condition variable, and [`JobHandle::join`] blocks on the condition variable until the
/// count reads zero.
type Counter = Arc<(Mutex<u32>, Condvar)>;
/// Argument type that gets sent to the shared resource.
///
/// Field `0` is the argument itself; field `1` is the `JobCounter` that travels with it. When
/// that counter is dropped, the shared count it was registered with is decremented.
pub struct Job<Arg>(pub(crate) Arg, pub(crate) JobCounter);
/// Guard over a shared [`Counter`]: the count is incremented by [`JobCounter::new`] and
/// decremented again (with a notification of the condition variable) when the guard is dropped.
pub(crate) struct JobCounter(pub(crate) Counter);
/// The handle that can be used to wait for a pool to finish.
///
/// Waiting is done through [`JobHandle::join`], which consumes the handle.
pub struct JobHandle {
/// Thread handle that `join` waits on first.
pub(crate) join_handle: JoinHandle<()>,
/// Shared counter that `join` waits on afterwards, until it reads zero.
pub(crate) job_counter: Counter,
}
impl JobCounter {
    /// Registers one more job on `job_counter` and returns the guard that owns the registration.
    ///
    /// The shared count is bumped by one before the guard is built.
    ///
    /// # Panics
    ///
    /// Panics if the counter's mutex is poisoned.
    pub(crate) fn new(job_counter: Counter) -> Self {
        // The temporary guard is released at the end of this statement, so the lock is never
        // held while the `Arc` is moved into the new value.
        *job_counter.0.lock().unwrap() += 1;
        Self(job_counter)
    }
}
impl Drop for JobCounter {
    /// Decrements the shared counter and wakes every thread waiting on its condition variable.
    ///
    /// A destructor can run while its thread is already unwinding from a panic; a second panic
    /// at that point aborts the process. A poisoned mutex is therefore recovered rather than
    /// unwrapped: the protected value is a plain `u32`, so it cannot be left half-updated, and
    /// the decrement plus notification still take place.
    fn drop(&mut self) {
        let (lock, cvar) = &*self.0;
        let mut job_counter = lock
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner);
        *job_counter -= 1;
        // Notify while still holding the guard, exactly as before; waiters re-check the count
        // once they reacquire the lock.
        cvar.notify_all();
    }
}
impl JobHandle {
pub(crate) fn new(join_handle: JoinHandle<()>, job_counter: Counter) -> Self {
Self {
join_handle,
job_counter,
}
}
/// Waits for a specific pool to finish its work.
///
/// This can be used if you need to create another pool, but only after a previous pool finishes
/// its work.
///
/// This method returns a [`thread::Result`]. Since the `Err` variant of this specialized
/// `Result` does not implement the [`Error`](std::error::Error) trait, the `?`-operator does
/// not work here. For more information about how to handle panics in this case, please refer
/// to the documentation of [`thread::Result`].
///
/// # Example:
///
/// ```rust
/// # use shared_resource_pool_builder::SharedResourcePoolBuilder;
/// #
/// # fn example() {
/// let pool_builder = SharedResourcePoolBuilder::new(Vec::new(), |v, i| v.push(i));
///
/// // It is crucial that all additions finish their work, hence the `join`.
/// pool_builder.create_pool(
/// |tx| (0..10).for_each(|i| tx.send(i).unwrap()),
/// |i| i + 3,
/// ).join().unwrap();
///
/// // Now we are safe to run the multiplications.
/// pool_builder.create_pool(
/// |tx| (0..10).for_each(|i| tx.send(i).unwrap()),
/// |i| i * 2,
/// );
///
/// let result = pool_builder.join().unwrap();
/// # }
/// ```
pub fn join(self) -> thread::Result<()> {
self.join_handle.join()?;
let (lock, cvar) = &*self.job_counter;
let mut job_counter = lock.lock().unwrap();
while *job_counter > 0 {
job_counter = cvar.wait(job_counter).unwrap();
}
Ok(())
}
}