tokio_blocking/
threadpool.rs

1use crossbeam::channel::{unbounded, Receiver, Sender};
2use futures::{prelude::*, sync::oneshot};
3use log::*;
4use std::sync::Arc;
5
6enum Command {
7    Task(Box<dyn FnOnce() + Send + 'static>),
8    Close,
9}
10
11fn run(rx: Receiver<Command>) {
12    loop {
13        match rx.recv() {
14            Ok(task) => match task {
15                Command::Task(task) => task(),
16                Command::Close => break,
17            },
18            Err(_) => {
19                info!("Internal channel closed");
20                break;
21            }
22        }
23    }
24}
25
26struct Inner {
27    tx: Sender<Command>,
28    threads: Vec<std::thread::JoinHandle<()>>,
29}
30
31#[derive(Clone)]
32pub struct ThreadPool {
33    inner: Arc<Inner>,
34}
35
36impl ThreadPool {
37    pub fn new(num_threads: usize) -> Self {
38        let (tx, rx) = unbounded();
39
40        let threads = (0..num_threads)
41            .map(|_| {
42                let rx = rx.clone();
43
44                std::thread::spawn(move || run(rx))
45            })
46            .collect();
47
48        let inner = Arc::new(Inner { tx, threads });
49
50        Self { inner }
51    }
52
53    pub(crate) fn block<T, E, A, F>(&self, arg: A, f: F) -> Block<T, E>
54    where
55        F: FnOnce(A) -> Result<T, E> + Send + 'static,
56        T: Send + 'static,
57        E: Send + 'static,
58        A: Send + 'static,
59    {
60        let (tx, rx) = oneshot::channel();
61
62        let _ = self.inner.tx.send(Command::Task(Box::new(move || {
63            let _ = tx.send(f(arg));
64        })));
65
66        Block { inner: rx }
67    }
68
69    pub fn shutdown(self) {
70        for _ in 0..self.inner.threads.len() {
71            match self.inner.tx.send(Command::Close) {
72                Ok(_) => {}
73                Err(_) => error!("Couldn't send close command"),
74            }
75        }
76    }
77}
78
79pub struct Block<T, E> {
80    inner: oneshot::Receiver<Result<T, E>>,
81}
82
83impl<T, E> Future for Block<T, E> {
84    type Item = T;
85    type Error = E;
86
87    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
88        match self.inner.poll() {
89            Ok(Async::Ready(item)) => match item {
90                Ok(item) => Ok(Async::Ready(item)),
91                Err(err) => Err(err),
92            },
93            Ok(Async::NotReady) => Ok(Async::NotReady),
94            Err(_) => Ok(Async::NotReady),
95        }
96    }
97}