libmhash/
tag_thread_pool.rs

1use std::{collections::HashMap, hash::Hash, thread};
2
3enum Operation<K>
4where
5    K: Eq + Hash + Send + Clone + 'static,
6{
7    Add(Job<K>),
8    Done(K),
9    DoneBy(Box<dyn Fn(&K) -> bool + Send + 'static>),
10}
11
/// Message sent from the dispatcher thread to a single per-tag executor.
enum ExecutorOperation {
    /// Run the boxed closure on the executor.
    Job(Box<dyn FnOnce() + Send + 'static>),
    /// Leave the executor loop; nothing sent after this is processed.
    Done,
}
16
/// A unit of work together with the tag that selects its executor.
struct Job<K: Eq + Hash + Send + Clone + 'static> {
    /// Key identifying the executor this job is routed to.
    pub tag: K,
    /// The closure to run; it is consumed when called.
    pub job: Box<dyn FnOnce() + Send>,
}
24
25impl<K> Job<K>
26where
27    K: Eq + Hash + Send + Clone + 'static,
28{
29    fn new(tag: K, job: Box<dyn FnOnce() + Send>) -> Self {
30        Self { tag, job }
31    }
32}
33
34pub struct TagThreadPool<K>
35where
36    K: Eq + Hash + Send + Clone + 'static,
37{
38    dispatcher: crossbeam_channel::Sender<Operation<K>>,
39}
40
41impl<K> TagThreadPool<K>
42where
43    K: Eq + Hash + Send + Clone + 'static,
44{
45    pub fn new() -> Self {
46        let (dispatcher, receiver) = crossbeam_channel::unbounded::<Operation<K>>();
47        let thread_pool = threadpool::Builder::new().build();
48        let dispatcher_for_finish = dispatcher.clone();
49        thread::spawn(move || {
50            let mut job_senders = HashMap::new();
51
52            loop {
53                let operation = match receiver.recv() {
54                    Ok(operation) => operation,
55                    Err(_) => break,
56                };
57
58                let _ = match operation {
59                    Operation::Add(job) => job_senders
60                        .entry(job.tag.clone())
61                        .or_insert_with(|| {
62                            let (sender, receiver) =
63                                crossbeam_channel::unbounded::<ExecutorOperation>();
64
65                            thread_pool.execute(move || loop {
66                                if let Ok(operation) = receiver.recv() {
67                                    match operation {
68                                        ExecutorOperation::Job(job) => (job)(),
69                                        ExecutorOperation::Done => break,
70                                    }
71                                }
72                            });
73
74                            sender
75                        })
76                        .send(ExecutorOperation::Job(job.job)),
77                    Operation::Done(tag) => match job_senders.remove(&tag) {
78                        Some(sender) => sender.send(ExecutorOperation::Done),
79                        None => Ok(()),
80                    },
81                    Operation::DoneBy(filter) => {
82                        for key in job_senders.keys().filter(|k| (filter)(k)) {
83                            let _ = dispatcher_for_finish.send(Operation::Done(key.clone()));
84                        }
85                        Ok(())
86                    }
87                };
88            }
89        });
90
91        TagThreadPool { dispatcher }
92    }
93
94    pub fn dispatch<F>(&self, tag: K, job: F)
95    where
96        F: FnOnce() + Send + 'static,
97    {
98        self.dispatcher
99            .send(Operation::Add(Job::new(tag, Box::new(job))))
100            .unwrap();
101    }
102
103    pub fn finish(&self, tag: K) {
104        self.dispatcher.send(Operation::Done(tag)).unwrap();
105    }
106
107    pub fn finish_by<F>(&self, filter: F)
108    where
109        F: Fn(&K) -> bool + Send + 'static,
110    {
111        self.dispatcher
112            .send(Operation::DoneBy(Box::new(filter)))
113            .unwrap();
114    }
115}