shared_resource_pool_builder/job.rs
1use std::sync::{Arc, Condvar, Mutex};
2use std::thread;
3use std::thread::JoinHandle;
4
5type Counter = Arc<(Mutex<u32>, Condvar)>;
6
7/// Argument type that gets sent to the shared resource
8pub struct Job<Arg>(pub(crate) Arg, pub(crate) JobCounter);
9
10pub(crate) struct JobCounter(pub(crate) Counter);
11
12/// The handle that can be used to wait for a pool to finish
13pub struct JobHandle {
14 pub(crate) join_handle: JoinHandle<()>,
15 pub(crate) job_counter: Counter,
16}
17
18impl JobCounter {
19 pub(crate) fn new(job_counter: Counter) -> Self {
20 {
21 let (lock, _) = &*job_counter;
22 let mut job_counter = lock.lock().unwrap();
23 *job_counter += 1;
24 }
25
26 Self(job_counter)
27 }
28}
29
30impl Drop for JobCounter {
31 fn drop(&mut self) {
32 let (lock, cvar) = &*self.0;
33 let mut job_counter = lock.lock().unwrap();
34 *job_counter -= 1;
35 cvar.notify_all();
36 }
37}
38
39impl JobHandle {
40 pub(crate) fn new(join_handle: JoinHandle<()>, job_counter: Counter) -> Self {
41 Self {
42 join_handle,
43 job_counter,
44 }
45 }
46
47 /// Waits for a specific pool to finish its work.
48 ///
49 /// This can be used if you need to create another pool, but only after a previous pool finishes
50 /// its work.
51 ///
52 /// This method returns a [`thread::Result`]. Since the `Err` variant of this specialized
53 /// `Result` does not implement the [`Error`](std::error::Error) trait, the `?`-operator does
54 /// not work here. For more information about how to handle panics in this case, please refer
55 /// to the documentation of [`thread::Result`].
56 ///
57 /// # Example:
58 ///
59 /// ```rust
60 /// # use shared_resource_pool_builder::SharedResourcePoolBuilder;
61 /// #
62 /// # fn example() {
63 /// let pool_builder = SharedResourcePoolBuilder::new(Vec::new(), |v, i| v.push(i));
64 ///
65 /// // It is crucial that all additions finish their work, hence the `join`.
66 /// pool_builder.create_pool(
67 /// |tx| (0..10).for_each(|i| tx.send(i).unwrap()),
68 /// |i| i + 3,
69 /// ).join().unwrap();
70 ///
71 /// // Now we are safe to run the multiplications.
72 /// pool_builder.create_pool(
73 /// |tx| (0..10).for_each(|i| tx.send(i).unwrap()),
74 /// |i| i * 2,
75 /// );
76 ///
77 /// let result = pool_builder.join().unwrap();
78 /// # }
79 /// ```
80 pub fn join(self) -> thread::Result<()> {
81 self.join_handle.join()?;
82
83 let (lock, cvar) = &*self.job_counter;
84 let mut job_counter = lock.lock().unwrap();
85 while *job_counter > 0 {
86 job_counter = cvar.wait(job_counter).unwrap();
87 }
88
89 Ok(())
90 }
91}