ocl_extras/
work_pool.rs

1#![allow(unused_imports, unused_variables, dead_code)]
2
3use std::sync::Arc;
4use std::rc::{Rc, Weak};
5use std::cell::RefCell;
6use std::thread::{self, JoinHandle, Thread};
7use futures::{executor, SinkExt, StreamExt, Future, Never, Poll, Async, Stream, FutureExt};
8use futures::stream::FuturesUnordered;
9use futures::task::{Context, Waker, LocalMap, Wake};
10use futures::executor::{enter, Executor, SpawnError, ThreadPool};
11use futures::channel::mpsc::{self, Sender};
12
13
14/// An error associated with `WorkPool`.
15#[derive(Debug, Fail)]
16pub enum WorkPoolError {
17    #[fail(display = "{}", _0)]
18    StdIo(#[cause] ::std::io::Error),
19    #[fail(display = "{}", _0)]
20    FuturesMpscSend(#[cause] ::futures::channel::mpsc::SendError),
21}
22
23impl From<::std::io::Error> for WorkPoolError {
24    fn from(err: ::std::io::Error) -> WorkPoolError {
25        WorkPoolError::StdIo(err)
26    }
27}
28
29impl From<::futures::channel::mpsc::SendError> for WorkPoolError {
30    fn from(err: ::futures::channel::mpsc::SendError) -> WorkPoolError {
31        WorkPoolError::FuturesMpscSend(err)
32    }
33}
34
35
/// Holds a handle to a thread so that the thread can later be unparked.
struct ThreadNotify {
    /// Handle of the thread this notifier was created on.
    thread: Thread,
}

thread_local! {
    // One lazily-created notifier per thread, capturing that thread's handle.
    static CURRENT_THREAD_NOTIFY: Arc<ThreadNotify> = Arc::new(ThreadNotify {
        thread: thread::current(),
    });
}

impl ThreadNotify {
    /// Calls `f` with a reference to the calling thread's notifier and
    /// returns whatever `f` returns.
    fn with_current<R, F>(f: F) -> R
    where
        F: FnOnce(&Arc<ThreadNotify>) -> R,
    {
        CURRENT_THREAD_NOTIFY.with(|notify| f(notify))
    }

    /// Parks the *calling* thread via `thread::park()`.
    ///
    /// The stored handle is not consulted; this is only meaningful when
    /// invoked on the notifier obtained from `with_current`.
    fn park(&self) {
        thread::park()
    }
}
56
57impl Wake for ThreadNotify {
58    fn wake(arc_self: &Arc<Self>) {
59        arc_self.thread.unpark();
60    }
61}
62
63
64/// A work pool task.
65struct Task {
66    fut: Box<Future<Item = (), Error = Never>>,
67    map: LocalMap,
68}
69
70impl Future for Task {
71    type Item = ();
72    type Error = Never;
73
74    fn poll(&mut self, cx: &mut Context) -> Poll<(), Never> {
75        self.fut.poll(&mut cx.with_locals(&mut self.map))
76    }
77}
78
79
80/// The event loop components of a `WorkPool`.
81struct WorkPoolCore {
82    pool: FuturesUnordered<Task>,
83    incoming: Rc<RefCell<Vec<Task>>>,
84    thread_pool: ThreadPool,
85}
86
87impl WorkPoolCore {
88    /// Create a new, empty work pool.
89    pub fn new() -> Result<WorkPoolCore, WorkPoolError> {
90        Ok(WorkPoolCore {
91            pool: FuturesUnordered::new(),
92            incoming: Default::default(),
93            thread_pool: ThreadPool::builder()
94                .name_prefix("work_pool_thread-")
95                .create()?,
96        })
97    }
98
99    // Make maximal progress on the entire pool of spawned task, returning `Ready`
100    // if the pool is empty and `Pending` if no further progress can be made.
101    fn poll_pool(&mut self, waker: &Waker) -> Async<()> {
102        // state for the FuturesUnordered, which will never be used
103        let mut pool_map = LocalMap::new();
104        let mut pool_cx = Context::new(&mut pool_map, waker, &mut self.thread_pool);
105
106        loop {
107            // empty the incoming queue of newly-spawned tasks
108            {
109                let mut incoming = self.incoming.borrow_mut();
110                for task in incoming.drain(..) {
111                    self.pool.push(task)
112                }
113            }
114
115            if let Ok(ret) = self.pool.poll_next(&mut pool_cx) {
116                // we queued up some new tasks; add them and poll again
117                if !self.incoming.borrow().is_empty() {
118                    continue;
119                }
120
121                // no queued tasks; we may be done
122                match ret {
123                    Async::Pending => return Async::Pending,
124                    Async::Ready(None) => return Async::Ready(()),
125                    _ => {}
126                }
127            }
128        }
129    }
130
131    pub fn run(&mut self) {
132        let _enter = enter().expect("cannot execute `WorkPool` \
133            executor from within another executor");
134
135        ThreadNotify::with_current(|thread| {
136            let waker = &Waker::from(thread.clone());
137            loop {
138                if let Async::Ready(t) = self.poll_pool(waker) {
139                    return t;
140                }
141                thread.park();
142            }
143        })
144    }
145
146    fn spawn(&mut self, f: Box<Future<Item = (), Error = Never> + Send>) -> Result<(), SpawnError> {
147        let task = Task {
148            fut: f,
149            map: LocalMap::new(),
150        };
151
152        self.incoming.borrow_mut().push(task);
153        Ok(())
154    }
155}
156
157
158/// A general purpose work completion pool.
159///
160/// Contains elements of a single-threaded event loop and a thread pool.
161///
162/// Runs in and manages its own threads. Dropping the `WorkPool` will block
163/// the dropping thread until all submitted and spawned work is complete.
164pub struct WorkPool {
165    core_tx: Option<Sender<Box<Future<Item = (), Error = Never> + Send>>>,
166    core_thread: Option<JoinHandle<()>>,
167}
168
169impl WorkPool {
170    /// Create a new, empty work pool.
171    pub fn new(buffer_size: usize) -> Result<WorkPool, WorkPoolError> {
172        let (core_tx, core_rx) = mpsc::channel(buffer_size);
173        let core_thread_pre = "work_pool_core-".to_owned();
174
175        let core_thread: JoinHandle<_> = thread::Builder::new()
176                .name(core_thread_pre).spawn(move || {
177            let mut core = WorkPoolCore::new().unwrap();
178            let work = Box::new(core_rx.for_each(|_| Ok(())).map(|_| ()));
179            core.spawn(work).unwrap();
180            core.run();
181        }).unwrap();
182
183        Ok(WorkPool {
184            core_tx: Some(core_tx),
185            core_thread: Some(core_thread),
186        })
187    }
188
189    /// Submits a future which need only be polled to completion and that
190    /// contains no intensive CPU work (including memcpy).
191    pub fn complete<F>(&mut self, future: F) -> Result<(), WorkPoolError>
192            where F: Future<Item = (), Error = Never> + Send + 'static {
193        let tx = self.core_tx.take().unwrap();
194        self.core_tx.get_or_insert(executor::block_on(tx.send(Box::new(future)))?);
195        Ok(())
196    }
197}
198
199impl Drop for WorkPool {
200    /// Blocks the dropping thread until all submitted *and* all spawned work
201    /// is complete.
202    //
203    // TODO: Guarantee above.
204    fn drop(&mut self) {
205        self.core_tx.take().unwrap().close_channel();
206        self.core_thread.take().unwrap().join().expect("Error joining `WorkPool` thread");
207    }
208}