tokio_blocking/
threadpool.rs1use crossbeam::channel::{unbounded, Receiver, Sender};
2use futures::{prelude::*, sync::oneshot};
3use log::*;
4use std::sync::Arc;
5
6enum Command {
7 Task(Box<dyn FnOnce() + Send + 'static>),
8 Close,
9}
10
11fn run(rx: Receiver<Command>) {
12 loop {
13 match rx.recv() {
14 Ok(task) => match task {
15 Command::Task(task) => task(),
16 Command::Close => break,
17 },
18 Err(_) => {
19 info!("Internal channel closed");
20 break;
21 }
22 }
23 }
24}
25
26struct Inner {
27 tx: Sender<Command>,
28 threads: Vec<std::thread::JoinHandle<()>>,
29}
30
31#[derive(Clone)]
32pub struct ThreadPool {
33 inner: Arc<Inner>,
34}
35
36impl ThreadPool {
37 pub fn new(num_threads: usize) -> Self {
38 let (tx, rx) = unbounded();
39
40 let threads = (0..num_threads)
41 .map(|_| {
42 let rx = rx.clone();
43
44 std::thread::spawn(move || run(rx))
45 })
46 .collect();
47
48 let inner = Arc::new(Inner { tx, threads });
49
50 Self { inner }
51 }
52
53 pub(crate) fn block<T, E, A, F>(&self, arg: A, f: F) -> Block<T, E>
54 where
55 F: FnOnce(A) -> Result<T, E> + Send + 'static,
56 T: Send + 'static,
57 E: Send + 'static,
58 A: Send + 'static,
59 {
60 let (tx, rx) = oneshot::channel();
61
62 let _ = self.inner.tx.send(Command::Task(Box::new(move || {
63 let _ = tx.send(f(arg));
64 })));
65
66 Block { inner: rx }
67 }
68
69 pub fn shutdown(self) {
70 for _ in 0..self.inner.threads.len() {
71 match self.inner.tx.send(Command::Close) {
72 Ok(_) => {}
73 Err(_) => error!("Couldn't send close command"),
74 }
75 }
76 }
77}
78
79pub struct Block<T, E> {
80 inner: oneshot::Receiver<Result<T, E>>,
81}
82
83impl<T, E> Future for Block<T, E> {
84 type Item = T;
85 type Error = E;
86
87 fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
88 match self.inner.poll() {
89 Ok(Async::Ready(item)) => match item {
90 Ok(item) => Ok(Async::Ready(item)),
91 Err(err) => Err(err),
92 },
93 Ok(Async::NotReady) => Ok(Async::NotReady),
94 Err(_) => Ok(Async::NotReady),
95 }
96 }
97}