shared_resource_pool_builder/
job.rs

1use std::sync::{Arc, Condvar, Mutex};
2use std::thread;
3use std::thread::JoinHandle;
4
5type Counter = Arc<(Mutex<u32>, Condvar)>;
6
7/// Argument type that gets sent to the shared resource
8pub struct Job<Arg>(pub(crate) Arg, pub(crate) JobCounter);
9
10pub(crate) struct JobCounter(pub(crate) Counter);
11
12/// The handle that can be used to wait for a pool to finish
13pub struct JobHandle {
14    pub(crate) join_handle: JoinHandle<()>,
15    pub(crate) job_counter: Counter,
16}
17
18impl JobCounter {
19    pub(crate) fn new(job_counter: Counter) -> Self {
20        {
21            let (lock, _) = &*job_counter;
22            let mut job_counter = lock.lock().unwrap();
23            *job_counter += 1;
24        }
25
26        Self(job_counter)
27    }
28}
29
30impl Drop for JobCounter {
31    fn drop(&mut self) {
32        let (lock, cvar) = &*self.0;
33        let mut job_counter = lock.lock().unwrap();
34        *job_counter -= 1;
35        cvar.notify_all();
36    }
37}
38
39impl JobHandle {
40    pub(crate) fn new(join_handle: JoinHandle<()>, job_counter: Counter) -> Self {
41        Self {
42            join_handle,
43            job_counter,
44        }
45    }
46
47    /// Waits for a specific pool to finish its work.
48    ///
49    /// This can be used if you need to create another pool, but only after a previous pool finishes
50    /// its work.
51    ///
52    /// This method returns a [`thread::Result`]. Since the `Err` variant of this specialized
53    /// `Result` does not implement the [`Error`](std::error::Error) trait, the `?`-operator does
54    /// not work here. For more information about how to handle panics in this case, please refer
55    /// to the documentation of [`thread::Result`].
56    ///
57    /// # Example:
58    ///
59    /// ```rust
60    /// # use shared_resource_pool_builder::SharedResourcePoolBuilder;
61    /// #
62    /// # fn example() {
63    /// let pool_builder = SharedResourcePoolBuilder::new(Vec::new(), |v, i| v.push(i));
64    ///
65    /// // It is crucial that all additions finish their work, hence the `join`.
66    /// pool_builder.create_pool(
67    ///     |tx| (0..10).for_each(|i| tx.send(i).unwrap()),
68    ///     |i| i + 3,
69    /// ).join().unwrap();
70    ///
71    /// // Now we are safe to run the multiplications.
72    /// pool_builder.create_pool(
73    ///     |tx| (0..10).for_each(|i| tx.send(i).unwrap()),
74    ///     |i| i * 2,
75    /// );
76    ///
77    /// let result = pool_builder.join().unwrap();
78    /// # }
79    /// ```
80    pub fn join(self) -> thread::Result<()> {
81        self.join_handle.join()?;
82
83        let (lock, cvar) = &*self.job_counter;
84        let mut job_counter = lock.lock().unwrap();
85        while *job_counter > 0 {
86            job_counter = cvar.wait(job_counter).unwrap();
87        }
88
89        Ok(())
90    }
91}