fibers/executor/
thread_pool.rs

1// Copyright (c) 2016 DWANGO Co., Ltd. All Rights Reserved.
2// See the LICENSE file at the top-level directory of this distribution.
3
4use futures::{Async, Future};
5use nbchan::mpsc as nb_mpsc;
6use num_cpus;
7use std::io;
8use std::sync::mpsc::TryRecvError;
9use std::thread;
10use std::time;
11
12use super::Executor;
13use fiber::Task;
14use fiber::{self, Spawn};
15use io::poll;
16use sync::oneshot::{self, Link};
17
18/// An executor that executes spawned fibers on pooled threads.
19///
20/// # Examples
21///
22/// An example to calculate fibonacci numbers:
23///
24/// ```
25/// # extern crate fibers;
26/// # extern crate futures;
27/// use fibers::{Spawn, Executor, ThreadPoolExecutor};
28/// use futures::{Async, Future};
29///
30/// fn fib<H: Spawn + Clone>(n: usize, handle: H) -> Box<dyn Future<Item=usize, Error=()> + Send> {
31///     if n < 2 {
32///         Box::new(futures::finished(n))
33///     } else {
34///         let f0 = handle.spawn_monitor(fib(n - 1, handle.clone()));
35///         let f1 = handle.spawn_monitor(fib(n - 2, handle.clone()));
36///         Box::new(f0.join(f1).map(|(a0, a1)| a0 + a1).map_err(|_| ()))
37///     }
38/// }
39///
40/// let mut executor = ThreadPoolExecutor::new().unwrap();
41/// let monitor = executor.spawn_monitor(fib(7, executor.handle()));
42/// let answer = executor.run_fiber(monitor).unwrap();
43/// assert_eq!(answer, Ok(13));
44/// ```
45#[derive(Debug)]
46pub struct ThreadPoolExecutor {
47    pool: SchedulerPool,
48    pollers: PollerPool,
49    spawn_rx: nb_mpsc::Receiver<Task>,
50    spawn_tx: nb_mpsc::Sender<Task>,
51    round: usize,
52    steps: usize,
53}
54impl ThreadPoolExecutor {
55    /// Creates a new instance of `ThreadPoolExecutor`.
56    ///
57    /// This is equivalent to `ThreadPoolExecutor::with_thread_count(num_cpus::get() * 2)`.
58    pub fn new() -> io::Result<Self> {
59        Self::with_thread_count(num_cpus::get() * 2)
60    }
61
62    /// Creates a new instance of `ThreadPoolExecutor` with the specified size of thread pool.
63    ///
64    /// # Implementation Details
65    ///
66    /// Note that current implementation is very naive and
67    /// should be improved in future releases.
68    ///
69    /// Internally, `count` threads are assigned to each of
70    /// the scheduler (i.e., `fibers::fiber::Scheduler`) and
71    /// the I/O poller (i.e., `fibers::io::poll::Poller`).
72    ///
73    /// When `spawn` function is called, the executor will assign a scheduler (thread)
74    /// for the fiber in simple round robin fashion.
75    ///
76    /// If any of those threads are aborted, the executor will return an error as
77    /// a result of `run_once` method call after that.
78    pub fn with_thread_count(count: usize) -> io::Result<Self> {
79        assert!(count > 0);
80        let pollers = PollerPool::new(count)?;
81        let schedulers = SchedulerPool::new(&pollers);
82        let (tx, rx) = nb_mpsc::channel();
83        Ok(ThreadPoolExecutor {
84            pool: schedulers,
85            pollers,
86            spawn_tx: tx,
87            spawn_rx: rx,
88            round: 0,
89            steps: 0,
90        })
91    }
92}
93impl Executor for ThreadPoolExecutor {
94    type Handle = ThreadPoolExecutorHandle;
95    fn handle(&self) -> Self::Handle {
96        ThreadPoolExecutorHandle {
97            spawn_tx: self.spawn_tx.clone(),
98        }
99    }
100    fn run_once(&mut self) -> io::Result<()> {
101        match self.spawn_rx.try_recv() {
102            Err(TryRecvError::Empty) => {
103                thread::sleep(time::Duration::from_millis(1));
104            }
105            Err(TryRecvError::Disconnected) => unreachable!(),
106            Ok(task) => {
107                let i = self.round % self.pool.schedulers.len();
108                self.pool.schedulers[i].spawn_boxed(task.0);
109                self.round = self.round.wrapping_add(1);
110            }
111        }
112        self.steps = self.steps.wrapping_add(1);
113        let i = self.steps % self.pool.schedulers.len();
114        if self.pool.links[i].poll().is_err() {
115            Err(io::Error::new(
116                io::ErrorKind::Other,
117                format!("The {}-th scheduler thread is aborted", i),
118            ))
119        } else {
120            Ok(())
121        }
122    }
123}
124impl Spawn for ThreadPoolExecutor {
125    fn spawn_boxed(&self, fiber: Box<dyn Future<Item = (), Error = ()> + Send>) {
126        self.handle().spawn_boxed(fiber)
127    }
128}
129
130/// A handle of a `ThreadPoolExecutor` instance.
131#[derive(Debug, Clone)]
132pub struct ThreadPoolExecutorHandle {
133    spawn_tx: nb_mpsc::Sender<Task>,
134}
135impl Spawn for ThreadPoolExecutorHandle {
136    fn spawn_boxed(&self, fiber: Box<dyn Future<Item = (), Error = ()> + Send>) {
137        let _ = self.spawn_tx.send(Task(fiber));
138    }
139}
140
141#[derive(Debug)]
142struct PollerPool {
143    pollers: Vec<poll::PollerHandle>,
144    links: Vec<Link<(), io::Error>>,
145}
146impl PollerPool {
147    pub fn new(pool_size: usize) -> io::Result<Self> {
148        let mut pollers = Vec::new();
149        let mut links = Vec::new();
150        for _ in 0..pool_size {
151            let (link0, mut link1) = oneshot::link();
152            let mut poller = poll::Poller::new()?;
153            links.push(link0);
154            pollers.push(poller.handle());
155            thread::spawn(move || {
156                while let Ok(Async::NotReady) = link1.poll() {
157                    let timeout = time::Duration::from_millis(1);
158                    if let Err(e) = poller.poll(Some(timeout)) {
159                        link1.exit(Err(e));
160                        return;
161                    }
162                }
163            });
164        }
165        Ok(PollerPool { pollers, links })
166    }
167}
168
169#[derive(Debug)]
170struct SchedulerPool {
171    schedulers: Vec<fiber::SchedulerHandle>,
172    links: Vec<Link<(), ()>>,
173}
174impl SchedulerPool {
175    pub fn new(poller_pool: &PollerPool) -> Self {
176        let mut schedulers = Vec::new();
177        let mut links = Vec::new();
178        for poller in &poller_pool.pollers {
179            let (link0, mut link1) = oneshot::link();
180            let mut scheduler = fiber::Scheduler::new(poller.clone());
181            links.push(link0);
182            schedulers.push(scheduler.handle());
183            thread::spawn(move || {
184                while let Ok(Async::NotReady) = link1.poll() {
185                    scheduler.run_once(true);
186                }
187            });
188        }
189        SchedulerPool { schedulers, links }
190    }
191}