fibers/executor/
thread_pool.rs1use futures::{Async, Future};
5use nbchan::mpsc as nb_mpsc;
6use num_cpus;
7use std::io;
8use std::sync::mpsc::TryRecvError;
9use std::thread;
10use std::time;
11
12use super::Executor;
13use fiber::Task;
14use fiber::{self, Spawn};
15use io::poll;
16use sync::oneshot::{self, Link};
17
18#[derive(Debug)]
46pub struct ThreadPoolExecutor {
47 pool: SchedulerPool,
48 pollers: PollerPool,
49 spawn_rx: nb_mpsc::Receiver<Task>,
50 spawn_tx: nb_mpsc::Sender<Task>,
51 round: usize,
52 steps: usize,
53}
54impl ThreadPoolExecutor {
55 pub fn new() -> io::Result<Self> {
59 Self::with_thread_count(num_cpus::get() * 2)
60 }
61
62 pub fn with_thread_count(count: usize) -> io::Result<Self> {
79 assert!(count > 0);
80 let pollers = PollerPool::new(count)?;
81 let schedulers = SchedulerPool::new(&pollers);
82 let (tx, rx) = nb_mpsc::channel();
83 Ok(ThreadPoolExecutor {
84 pool: schedulers,
85 pollers,
86 spawn_tx: tx,
87 spawn_rx: rx,
88 round: 0,
89 steps: 0,
90 })
91 }
92}
93impl Executor for ThreadPoolExecutor {
94 type Handle = ThreadPoolExecutorHandle;
95 fn handle(&self) -> Self::Handle {
96 ThreadPoolExecutorHandle {
97 spawn_tx: self.spawn_tx.clone(),
98 }
99 }
100 fn run_once(&mut self) -> io::Result<()> {
101 match self.spawn_rx.try_recv() {
102 Err(TryRecvError::Empty) => {
103 thread::sleep(time::Duration::from_millis(1));
104 }
105 Err(TryRecvError::Disconnected) => unreachable!(),
106 Ok(task) => {
107 let i = self.round % self.pool.schedulers.len();
108 self.pool.schedulers[i].spawn_boxed(task.0);
109 self.round = self.round.wrapping_add(1);
110 }
111 }
112 self.steps = self.steps.wrapping_add(1);
113 let i = self.steps % self.pool.schedulers.len();
114 if self.pool.links[i].poll().is_err() {
115 Err(io::Error::new(
116 io::ErrorKind::Other,
117 format!("The {}-th scheduler thread is aborted", i),
118 ))
119 } else {
120 Ok(())
121 }
122 }
123}
124impl Spawn for ThreadPoolExecutor {
125 fn spawn_boxed(&self, fiber: Box<dyn Future<Item = (), Error = ()> + Send>) {
126 self.handle().spawn_boxed(fiber)
127 }
128}
129
130#[derive(Debug, Clone)]
132pub struct ThreadPoolExecutorHandle {
133 spawn_tx: nb_mpsc::Sender<Task>,
134}
135impl Spawn for ThreadPoolExecutorHandle {
136 fn spawn_boxed(&self, fiber: Box<dyn Future<Item = (), Error = ()> + Send>) {
137 let _ = self.spawn_tx.send(Task(fiber));
138 }
139}
140
141#[derive(Debug)]
142struct PollerPool {
143 pollers: Vec<poll::PollerHandle>,
144 links: Vec<Link<(), io::Error>>,
145}
146impl PollerPool {
147 pub fn new(pool_size: usize) -> io::Result<Self> {
148 let mut pollers = Vec::new();
149 let mut links = Vec::new();
150 for _ in 0..pool_size {
151 let (link0, mut link1) = oneshot::link();
152 let mut poller = poll::Poller::new()?;
153 links.push(link0);
154 pollers.push(poller.handle());
155 thread::spawn(move || {
156 while let Ok(Async::NotReady) = link1.poll() {
157 let timeout = time::Duration::from_millis(1);
158 if let Err(e) = poller.poll(Some(timeout)) {
159 link1.exit(Err(e));
160 return;
161 }
162 }
163 });
164 }
165 Ok(PollerPool { pollers, links })
166 }
167}
168
169#[derive(Debug)]
170struct SchedulerPool {
171 schedulers: Vec<fiber::SchedulerHandle>,
172 links: Vec<Link<(), ()>>,
173}
174impl SchedulerPool {
175 pub fn new(poller_pool: &PollerPool) -> Self {
176 let mut schedulers = Vec::new();
177 let mut links = Vec::new();
178 for poller in &poller_pool.pollers {
179 let (link0, mut link1) = oneshot::link();
180 let mut scheduler = fiber::Scheduler::new(poller.clone());
181 links.push(link0);
182 schedulers.push(scheduler.handle());
183 thread::spawn(move || {
184 while let Ok(Async::NotReady) = link1.poll() {
185 scheduler.run_once(true);
186 }
187 });
188 }
189 SchedulerPool { schedulers, links }
190 }
191}