libmhash/
tag_thread_pool.rs1use std::{collections::HashMap, hash::Hash, thread};
2
3enum Operation<K>
4where
5 K: Eq + Hash + Send + Clone + 'static,
6{
7 Add(Job<K>),
8 Done(K),
9 DoneBy(Box<dyn Fn(&K) -> bool + Send + 'static>),
10}
11
/// Message sent from the dispatcher to the executor loop of a single tag.
enum ExecutorOperation {
    /// A closure for the executor to call.
    Job(Box<dyn FnOnce() + Send + 'static>),
    /// Tells the executor loop to stop.
    Done,
}
16
/// A closure paired with the tag that selects which executor runs it.
struct Job<K: Eq + Hash + Send + Clone + 'static> {
    /// Tag used by the dispatcher to pick the executor.
    pub tag: K,
    /// The work itself; it is called at most once.
    pub job: Box<dyn FnOnce() + Send>,
}

impl<K: Eq + Hash + Send + Clone + 'static> Job<K> {
    /// Bundles `tag` and the boxed closure `job` into a `Job`.
    fn new(tag: K, job: Box<dyn FnOnce() + Send>) -> Self {
        Job { tag, job }
    }
}
33
34pub struct TagThreadPool<K>
35where
36 K: Eq + Hash + Send + Clone + 'static,
37{
38 dispatcher: crossbeam_channel::Sender<Operation<K>>,
39}
40
41impl<K> TagThreadPool<K>
42where
43 K: Eq + Hash + Send + Clone + 'static,
44{
45 pub fn new() -> Self {
46 let (dispatcher, receiver) = crossbeam_channel::unbounded::<Operation<K>>();
47 let thread_pool = threadpool::Builder::new().build();
48 let dispatcher_for_finish = dispatcher.clone();
49 thread::spawn(move || {
50 let mut job_senders = HashMap::new();
51
52 loop {
53 let operation = match receiver.recv() {
54 Ok(operation) => operation,
55 Err(_) => break,
56 };
57
58 let _ = match operation {
59 Operation::Add(job) => job_senders
60 .entry(job.tag.clone())
61 .or_insert_with(|| {
62 let (sender, receiver) =
63 crossbeam_channel::unbounded::<ExecutorOperation>();
64
65 thread_pool.execute(move || loop {
66 if let Ok(operation) = receiver.recv() {
67 match operation {
68 ExecutorOperation::Job(job) => (job)(),
69 ExecutorOperation::Done => break,
70 }
71 }
72 });
73
74 sender
75 })
76 .send(ExecutorOperation::Job(job.job)),
77 Operation::Done(tag) => match job_senders.remove(&tag) {
78 Some(sender) => sender.send(ExecutorOperation::Done),
79 None => Ok(()),
80 },
81 Operation::DoneBy(filter) => {
82 for key in job_senders.keys().filter(|k| (filter)(k)) {
83 let _ = dispatcher_for_finish.send(Operation::Done(key.clone()));
84 }
85 Ok(())
86 }
87 };
88 }
89 });
90
91 TagThreadPool { dispatcher }
92 }
93
94 pub fn dispatch<F>(&self, tag: K, job: F)
95 where
96 F: FnOnce() + Send + 'static,
97 {
98 self.dispatcher
99 .send(Operation::Add(Job::new(tag, Box::new(job))))
100 .unwrap();
101 }
102
103 pub fn finish(&self, tag: K) {
104 self.dispatcher.send(Operation::Done(tag)).unwrap();
105 }
106
107 pub fn finish_by<F>(&self, filter: F)
108 where
109 F: Fn(&K) -> bool + Send + 'static,
110 {
111 self.dispatcher
112 .send(Operation::DoneBy(Box::new(filter)))
113 .unwrap();
114 }
115}