1#![allow(unused_imports, unused_variables, dead_code)]
2
3use std::sync::Arc;
4use std::rc::{Rc, Weak};
5use std::cell::RefCell;
6use std::thread::{self, JoinHandle, Thread};
7use futures::{executor, SinkExt, StreamExt, Future, Never, Poll, Async, Stream, FutureExt};
8use futures::stream::FuturesUnordered;
9use futures::task::{Context, Waker, LocalMap, Wake};
10use futures::executor::{enter, Executor, SpawnError, ThreadPool};
11use futures::channel::mpsc::{self, Sender};
12
13
/// Errors that can be returned by the work pool.
///
/// Each variant wraps an underlying error and displays it unchanged
/// (the `display` format is simply `"{}"` applied to the wrapped value).
#[derive(Debug, Fail)]
pub enum WorkPoolError {
    /// An I/O error from the standard library.
    #[fail(display = "{}", _0)]
    StdIo(#[cause] ::std::io::Error),
    /// A failure to send a message over a `futures` mpsc channel.
    #[fail(display = "{}", _0)]
    FuturesMpscSend(#[cause] ::futures::channel::mpsc::SendError),
}
22
23impl From<::std::io::Error> for WorkPoolError {
24 fn from(err: ::std::io::Error) -> WorkPoolError {
25 WorkPoolError::StdIo(err)
26 }
27}
28
29impl From<::futures::channel::mpsc::SendError> for WorkPoolError {
30 fn from(err: ::futures::channel::mpsc::SendError) -> WorkPoolError {
31 WorkPoolError::FuturesMpscSend(err)
32 }
33}
34
35
/// Wake-up handle for a single OS thread.
///
/// Waking it (see the `Wake` impl) unparks the stored thread.
struct ThreadNotify {
    /// Handle of the thread to unpark on wake-up.
    thread: Thread,
}
39
thread_local! {
    // One `ThreadNotify` per thread, built on first access and capturing the
    // handle of the thread that accesses it (`thread::current()`).
    static CURRENT_THREAD_NOTIFY: Arc<ThreadNotify> = Arc::new(ThreadNotify {
        thread: thread::current(),
    });
}
45
46impl ThreadNotify {
47 fn with_current<R, F>(f: F) -> R
48 where F: FnOnce(&Arc<ThreadNotify>) -> R {
49 CURRENT_THREAD_NOTIFY.with(f)
50 }
51
52 fn park(&self) {
53 thread::park();
54 }
55}
56
/// Waking a `ThreadNotify` unparks the thread whose handle it stores.
impl Wake for ThreadNotify {
    fn wake(arc_self: &Arc<Self>) {
        arc_self.thread.unpark();
    }
}
62
63
/// A boxed unit future bundled with its own task-local storage.
struct Task {
    /// The future to drive; it yields `()` and cannot fail (`Never`).
    fut: Box<Future<Item = (), Error = Never>>,
    /// Task-local map installed into the context whenever `fut` is polled.
    map: LocalMap,
}
69
70impl Future for Task {
71 type Item = ();
72 type Error = Never;
73
74 fn poll(&mut self, cx: &mut Context) -> Poll<(), Never> {
75 self.fut.poll(&mut cx.with_locals(&mut self.map))
76 }
77}
78
79
/// Single-threaded task driver paired with a `ThreadPool`.
///
/// Tasks handed to `spawn` are queued in `incoming`, moved into `pool` and
/// polled there; `thread_pool` is passed to the polling context as its
/// executor.
struct WorkPoolCore {
    /// Tasks currently being polled.
    pool: FuturesUnordered<Task>,
    /// Tasks queued by `spawn` that have not yet been moved into `pool`.
    incoming: Rc<RefCell<Vec<Task>>>,
    /// Executor supplied to the `Context` used when polling `pool`.
    thread_pool: ThreadPool,
}
86
87impl WorkPoolCore {
88 pub fn new() -> Result<WorkPoolCore, WorkPoolError> {
90 Ok(WorkPoolCore {
91 pool: FuturesUnordered::new(),
92 incoming: Default::default(),
93 thread_pool: ThreadPool::builder()
94 .name_prefix("work_pool_thread-")
95 .create()?,
96 })
97 }
98
99 fn poll_pool(&mut self, waker: &Waker) -> Async<()> {
102 let mut pool_map = LocalMap::new();
104 let mut pool_cx = Context::new(&mut pool_map, waker, &mut self.thread_pool);
105
106 loop {
107 {
109 let mut incoming = self.incoming.borrow_mut();
110 for task in incoming.drain(..) {
111 self.pool.push(task)
112 }
113 }
114
115 if let Ok(ret) = self.pool.poll_next(&mut pool_cx) {
116 if !self.incoming.borrow().is_empty() {
118 continue;
119 }
120
121 match ret {
123 Async::Pending => return Async::Pending,
124 Async::Ready(None) => return Async::Ready(()),
125 _ => {}
126 }
127 }
128 }
129 }
130
131 pub fn run(&mut self) {
132 let _enter = enter().expect("cannot execute `WorkPool` \
133 executor from within another executor");
134
135 ThreadNotify::with_current(|thread| {
136 let waker = &Waker::from(thread.clone());
137 loop {
138 if let Async::Ready(t) = self.poll_pool(waker) {
139 return t;
140 }
141 thread.park();
142 }
143 })
144 }
145
146 fn spawn(&mut self, f: Box<Future<Item = (), Error = Never> + Send>) -> Result<(), SpawnError> {
147 let task = Task {
148 fut: f,
149 map: LocalMap::new(),
150 };
151
152 self.incoming.borrow_mut().push(task);
153 Ok(())
154 }
155}
156
157
/// Handle to a work pool whose core runs on a dedicated thread.
///
/// Both fields are `Option`s so their contents can be moved out with
/// `take()` (when sending and when dropping).
pub struct WorkPool {
    /// Sending half of the channel that carries boxed futures to the core thread.
    core_tx: Option<Sender<Box<Future<Item = (), Error = Never> + Send>>>,
    /// Join handle of the core thread.
    core_thread: Option<JoinHandle<()>>,
}
168
169impl WorkPool {
170 pub fn new(buffer_size: usize) -> Result<WorkPool, WorkPoolError> {
172 let (core_tx, core_rx) = mpsc::channel(buffer_size);
173 let core_thread_pre = "work_pool_core-".to_owned();
174
175 let core_thread: JoinHandle<_> = thread::Builder::new()
176 .name(core_thread_pre).spawn(move || {
177 let mut core = WorkPoolCore::new().unwrap();
178 let work = Box::new(core_rx.for_each(|_| Ok(())).map(|_| ()));
179 core.spawn(work).unwrap();
180 core.run();
181 }).unwrap();
182
183 Ok(WorkPool {
184 core_tx: Some(core_tx),
185 core_thread: Some(core_thread),
186 })
187 }
188
189 pub fn complete<F>(&mut self, future: F) -> Result<(), WorkPoolError>
192 where F: Future<Item = (), Error = Never> + Send + 'static {
193 let tx = self.core_tx.take().unwrap();
194 self.core_tx.get_or_insert(executor::block_on(tx.send(Box::new(future)))?);
195 Ok(())
196 }
197}
198
199impl Drop for WorkPool {
200 fn drop(&mut self) {
205 self.core_tx.take().unwrap().close_channel();
206 self.core_thread.take().unwrap().join().expect("Error joining `WorkPool` thread");
207 }
208}