gj/
lib.rs

1// Copyright (c) 2013-2015 Sandstorm Development Group, Inc. and contributors
2// Licensed under the MIT License:
3//
4// Permission is hereby granted, free of charge, to any person obtaining a copy
5// of this software and associated documentation files (the "Software"), to deal
6// in the Software without restriction, including without limitation the rights
7// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
8// copies of the Software, and to permit persons to whom the Software is
9// furnished to do so, subject to the following conditions:
10//
11// The above copyright notice and this permission notice shall be included in
12// all copies or substantial portions of the Software.
13//
14// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
19// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
20// THE SOFTWARE.
21
22//! A library providing high-level abstractions for event loop concurrency,
23//! heavily borrowing ideas from [KJ](https://capnproto.org/cxxrpc.html#kj-concurrency-framework).
24//! Allows for coordination of asynchronous tasks using [promises](struct.Promise.html) as
25//! a basic building block.
26//!
27//! # Example
28//!
29//! ```
30//! use gj::{EventLoop, Promise, ClosedEventPort};
31//! EventLoop::top_level(|wait_scope| -> Result<(),()> {
32//!     let (promise1, fulfiller1) = Promise::<(),()>::and_fulfiller();
33//!     let (promise2, fulfiller2) = Promise::<(),()>::and_fulfiller();
34//!     let promise3 = promise2.then(|_| {
35//!         println!("world");
36//!         Promise::ok(())
37//!     });
38//!     let promise4 = promise1.then(move |_| {
39//!         println!("hello ");
40//!         fulfiller2.fulfill(());
41//!         Promise::ok(())
42//!     });
43//!     fulfiller1.fulfill(());
44//!     Promise::all(vec![promise3, promise4].into_iter())
45//!         .map(|_| Ok(()))
46//!         .wait(wait_scope, &mut ClosedEventPort(()))
47//! }).expect("top level");
48//! ```
49
50
51use std::cell::{Cell, RefCell};
52use std::rc::Rc;
53use std::result::Result;
54use private::{promise_node, Event, BoolEvent, PromiseAndFulfillerHub, PromiseAndFulfillerWrapper,
55              EVENT_LOOP, with_current_event_loop, PromiseNode};
56
57
58/// Like `try!()`, but for functions that return a [`Promise<T, E>`](struct.Promise.html) rather
59/// than a `Result<T, E>`.
60///
61/// Unwraps a `Result<T, E>`. In the case of an error `Err(e)`, immediately returns from the
62/// enclosing function with `Promise::err(e)`.
63#[macro_export]
64macro_rules! pry {
65    ($expr:expr) => (match $expr {
66        ::std::result::Result::Ok(val) => val,
67        ::std::result::Result::Err(err) => {
68            return $crate::Promise::err(::std::convert::From::from(err))
69        }
70    })
71}
72
73mod private;
74mod handle_table;
75
76/// A computation that might eventually resolve to a value of type `T` or to an error
77///  of type `E`. Dropping the promise cancels the computation.
78#[must_use]
79pub struct Promise<T, E> where T: 'static, E: 'static {
80    node: Box<PromiseNode<T, E>>,
81}
82
83impl <T, E> Promise <T, E> {
84    /// Creates a new promise that has already been fulfilled with the given value.
85    pub fn ok(value: T) -> Promise<T, E> {
86        Promise { node: Box::new(promise_node::Immediate::new(Ok(value))) }
87    }
88
89    /// Creates a new promise that has already been rejected with the given error.
90    pub fn err(error: E) -> Promise<T, E> {
91        Promise { node: Box::new(promise_node::Immediate::new(Err(error))) }
92    }
93
94    /// Creates a new promise that never resolves.
95    pub fn never_done() -> Promise<T, E> {
96        Promise { node: Box::new(promise_node::NeverDone::new()) }
97    }
98
99    /// Creates a new promise/fulfiller pair.
100    pub fn and_fulfiller() -> (Promise<T, E>, PromiseFulfiller<T, E>)
101        where E: FulfillerDropped
102    {
103        let result = Rc::new(RefCell::new(PromiseAndFulfillerHub::new()));
104        let result_promise: Promise<T, E> =
105            Promise { node: Box::new(PromiseAndFulfillerWrapper::new(result.clone()))};
106        (result_promise, PromiseFulfiller{ hub: result, done: false })
107    }
108
109    /// Chains further computation to be executed once the promise resolves.
110    /// When the promise is fulfilled or rejected, invokes `func` on its result.
111    ///
112    /// If the returned promise is dropped before the chained computation runs, the chained
113    /// computation will be cancelled.
114    ///
115    /// Always returns immediately, even if the promise is already resolved. The earliest that
116    /// `func` might be invoked is during the next `turn()` of the event loop.
117    pub fn then_else<F, T1, E1>(self, func: F) -> Promise<T1, E1>
118        where F: 'static,
119              F: FnOnce(Result<T, E>) -> Promise<T1, E1>,
120    {
121        let intermediate = Box::new(promise_node::Transform::new(self.node, |x| Ok(func(x)) ));
122        Promise { node: Box::new(promise_node::Chain::new(intermediate)) }
123    }
124
125    /// Calls `then_else()` with a default error handler that simply propagates all errors.
126    pub fn then<F, T1>(self, func: F) -> Promise<T1, E>
127        where F: 'static,
128              F: FnOnce(T) -> Promise<T1, E>,
129    {
130        self.then_else(|r| { match r { Ok(v) => func(v), Err(e) => Promise::err(e) } })
131    }
132
133    /// Like `then_else()` but for a `func` that returns a direct value rather than a promise. As an
134    /// optimization, execution of `func` is delayed until its result is known to be needed. The
135    /// expectation here is that `func` is just doing some transformation on the results, not
136    /// scheduling any other actions, and therefore the system does not need to be proactive about
137    /// evaluating it. This way, a chain of trivial `map()` transformations can be executed all at
138    /// once without repeated rescheduling through the event loop. Use the `eagerly_evaluate()`
139    /// method to suppress this behavior.
140    pub fn map_else<F, T1, E1>(self, func: F) -> Promise<T1, E1>
141        where F: 'static,
142              F: FnOnce(Result<T, E>) -> Result<T1, E1>,
143    {
144        Promise { node: Box::new(promise_node::Transform::new(self.node, func)) }
145    }
146
147    /// Calls `map_else()` with a default error handler that simply propagates all errors.
148    pub fn map<F, R>(self, func: F) -> Promise<R, E>
149        where F: 'static,
150              F: FnOnce(T) -> Result<R, E>,
151              R: 'static
152    {
153        self.map_else(|r| { match r { Ok(v) => func(v), Err(e) =>  Err(e) } } )
154    }
155
156    /// Transforms the error branch of the promise.
157    pub fn map_err<E1, F>(self, func: F) -> Promise<T, E1>
158        where F: 'static,
159              F: FnOnce(E) -> E1
160    {
161        self.map_else(|r| { match r { Ok(v) => Ok(v), Err(e) =>  Err(func(e)) } } )
162    }
163
164    /// Maps errors into a more general type.
165    pub fn lift<E1>(self) -> Promise<T, E1> where E: Into<E1> {
166        self.map_err(|e| e.into())
167    }
168
169    /// Returns a new promise that resolves when either `self` or `other` resolves. The promise that
170    /// doesn't resolve first is cancelled.
171    pub fn exclusive_join(self, other: Promise<T, E>) -> Promise<T, E> {
172        Promise { node: Box::new(private::promise_node::ExclusiveJoin::new(self.node, other.node)) }
173    }
174
175    /// Transforms a collection of promises into a promise for a vector. If any of
176    /// the promises fails, immediately cancels the remaining promises.
177    pub fn all<I>(promises: I) -> Promise<Vec<T>, E>
178        where I: Iterator<Item=Promise<T, E>>
179    {
180        Promise { node: Box::new(private::promise_node::ArrayJoin::new(promises)) }
181    }
182
183    /// Forks the promise, so that multiple different clients can independently wait on the result.
184    pub fn fork(self) -> ForkedPromise<T, E> where T: Clone, E: Clone {
185        ForkedPromise::new(self)
186    }
187
188    /// Holds onto `value` until the promise resolves, then drops `value`.
189    pub fn attach<U>(self, value: U) -> Promise<T, E> where U: 'static {
190        self.map_else(move |result| { drop(value); result })
191    }
192
193    /// Forces eager evaluation of this promise.  Use this if you are going to hold on to the promise
194    /// for a while without consuming the result, but you want to make sure that the system actually
195    /// processes it.
196    pub fn eagerly_evaluate(self) -> Promise<T, E> {
197        self.then(|v| { Promise::ok(v) })
198    }
199
200    /// Runs the event loop until the promise is fulfilled.
201    ///
202    /// The `WaitScope` argument ensures that `wait()` can only be called at the top level of a program.
203    /// Waiting within event callbacks is disallowed.
204    pub fn wait<E1>(mut self, wait_scope: &WaitScope, event_source: &mut EventPort<E1>) -> Result<T, E>
205        where E: From<E1>
206    {
207        drop(wait_scope);
208        with_current_event_loop(move |event_loop| {
209            let fired = ::std::rc::Rc::new(::std::cell::Cell::new(false));
210            let done_event = BoolEvent::new(fired.clone());
211            let (handle, _dropper) = private::GuardedEventHandle::new();
212            handle.set(Box::new(done_event));
213            self.node.on_ready(handle);
214
215            //event_loop.running = true;
216
217            while !fired.get() {
218                if !event_loop.turn() {
219                    // No events in the queue.
220                    try!(event_source.wait());
221                }
222            }
223
224            self.node.get()
225        })
226    }
227}
228
229/// A scope in which asynchronous programming can occur. Corresponds to the top level scope
230/// of some [event loop](struct.EventLoop.html). Can be used to [wait](struct.Promise.html#method.wait)
231/// for the result of a promise.
232pub struct WaitScope(::std::marker::PhantomData<*mut u8>); // impl !Sync for WaitScope {}
233
234/// The result of `Promise::fork()`. Allows branches to be created. Dropping the `ForkedPromise`
235/// along with any branches created through `add_branch()` will cancel the computation.
236pub struct ForkedPromise<T, E> where T: 'static + Clone, E: 'static + Clone {
237    hub: Rc<RefCell<promise_node::ForkHub<T, E>>>,
238}
239
240impl <T, E> ForkedPromise<T, E> where T: 'static + Clone, E: 'static + Clone {
241    fn new(inner: Promise<T, E>) -> ForkedPromise<T, E> {
242        ForkedPromise {
243            hub: Rc::new(RefCell::new(promise_node::ForkHub::new(inner.node))),
244        }
245    }
246
247    /// Creates a new promise that will resolve when the originally forked promise resolves.
248    pub fn add_branch(&mut self) -> Promise<T, E> {
249        promise_node::ForkHub::add_branch(&self.hub)
250    }
251}
252
253/// Interface between an `EventLoop` and events originating from outside of the loop's thread.
254/// Needed in [`Promise::wait()`](struct.Promise.html#method.wait).
255pub trait EventPort<E> {
256    /// Waits for an external event to arrive, blocking the thread if necessary.
257    fn wait(&mut self) -> Result<(), E>;
258}
259
260/// An event port that never emits any events. On wait() it returns the error it was constructed with.
261pub struct ClosedEventPort<E: Clone>(pub E);
262
263impl <E: Clone> EventPort<E> for ClosedEventPort<E> {
264    fn wait(&mut self) -> Result<(), E> {
265        Err(self.0.clone())
266    }
267}
268
269/// A queue of events being executed in a loop on a single thread.
270pub struct EventLoop {
271//    daemons: TaskSetImpl,
272    _running: bool,
273    _last_runnable_state: bool,
274    events: RefCell<handle_table::HandleTable<private::EventNode>>,
275    head: private::EventHandle,
276    tail: Cell<private::EventHandle>,
277    depth_first_insertion_point: Cell<private::EventHandle>,
278    currently_firing: Cell<Option<private::EventHandle>>,
279    to_destroy: Cell<Option<private::EventHandle>>,
280}
281
282impl EventLoop {
283    /// Creates an event loop for the current thread, panicking if one already exists. Runs the given
284    /// closure and then drops the event loop.
285    pub fn top_level<R, F>(main: F) -> R
286        where F: FnOnce(&WaitScope) -> R,
287    {
288        let mut events = handle_table::HandleTable::<private::EventNode>::new();
289        let dummy = private::EventNode { event: None, next: None, prev: None };
290        let head_handle = private::EventHandle(events.push(dummy));
291
292        EVENT_LOOP.with(move |maybe_event_loop| {
293            let event_loop = EventLoop {
294                _running: false,
295                _last_runnable_state: false,
296                events: RefCell::new(events),
297                head: head_handle,
298                tail: Cell::new(head_handle),
299                depth_first_insertion_point: Cell::new(head_handle), // insert after this node
300                currently_firing: Cell::new(None),
301                to_destroy: Cell::new(None),
302            };
303
304            assert!(maybe_event_loop.borrow().is_none());
305            *maybe_event_loop.borrow_mut() = Some(event_loop);
306        });
307
308        let wait_scope = WaitScope(::std::marker::PhantomData);
309        let result = main(&wait_scope);
310
311        EVENT_LOOP.with(move |maybe_event_loop| {
312            let el = ::std::mem::replace(&mut *maybe_event_loop.borrow_mut(), None);
313            match el {
314                None => unreachable!(),
315                Some(event_loop) => {
316                    // If there is still an event other than the head event, then there must have
317                    // been a memory leak.
318                    let remaining_events = event_loop.events.borrow().len();
319                    if remaining_events > 1 {
320                        ::std::mem::forget(event_loop); // Prevent double panic.
321                        panic!("{} leaked events found when cleaning up event loop. \
322                               Perhaps there is a reference cycle containing promises?",
323                               remaining_events - 1)
324                    }
325                }
326            }
327        });
328
329        result
330    }
331
332    fn arm_depth_first(&self, event_handle: private::EventHandle) {
333        let insertion_node_next = self.events.borrow()[self.depth_first_insertion_point.get().0].next;
334
335        match insertion_node_next {
336            Some(next_handle) => {
337                self.events.borrow_mut()[next_handle.0].prev = Some(event_handle);
338                self.events.borrow_mut()[event_handle.0].next = Some(next_handle);
339            }
340            None => {
341                self.tail.set(event_handle);
342            }
343        }
344
345        self.events.borrow_mut()[event_handle.0].prev = Some(self.depth_first_insertion_point.get());
346        self.events.borrow_mut()[self.depth_first_insertion_point.get().0].next = Some(event_handle);
347        self.depth_first_insertion_point.set(event_handle);
348    }
349
350    fn arm_breadth_first(&self, event_handle: private::EventHandle) {
351        let events = &mut *self.events.borrow_mut();
352        events[self.tail.get().0].next = Some(event_handle);
353        events[event_handle.0].prev = Some(self.tail.get());
354        self.tail.set(event_handle);
355    }
356
357    /// Runs the event loop for `max_turn_count` turns or until there is nothing left to be done,
358    /// whichever comes first. This never calls the `EventPort`'s `sleep()` or `poll()`. It will
359    /// call the `EventPort`'s `set_runnable(false)` if the queue becomes empty.
360    fn _run(&mut self, max_turn_count: u32) {
361        self._running = true;
362
363        for _ in 0..max_turn_count {
364            if !self.turn() {
365                break;
366            }
367        }
368    }
369
370    /// Runs the event loop for a single step.
371    fn turn(&self) -> bool {
372
373        let event_handle = match self.events.borrow()[self.head.0].next {
374            None => return false,
375            Some(event_handle) => { event_handle }
376        };
377        self.depth_first_insertion_point.set(event_handle);
378
379        self.currently_firing.set(Some(event_handle));
380        let mut event = ::std::mem::replace(&mut self.events.borrow_mut()[event_handle.0].event, None)
381            .expect("No event to fire?");
382        event.fire();
383        self.currently_firing.set(None);
384
385        let maybe_next = self.events.borrow()[event_handle.0].next;
386        self.events.borrow_mut()[self.head.0].next = maybe_next;
387        if let Some(e) = maybe_next {
388            self.events.borrow_mut()[e.0].prev = Some(self.head);
389        }
390
391        self.events.borrow_mut()[event_handle.0].next = None;
392        self.events.borrow_mut()[event_handle.0].prev = None;
393
394        if self.tail.get() == event_handle {
395            self.tail.set(self.head);
396        }
397
398        self.depth_first_insertion_point.set(self.head);
399
400        if let Some(event_handle) = self.to_destroy.get() {
401            self.events.borrow_mut().remove(event_handle.0);
402            self.to_destroy.set(None);
403        }
404
405        true
406    }
407}
408
409/// Specifies an error to generate when a [`PromiseFulfiller`](struct.PromiseFulfiller.html) is
410/// dropped.
411pub trait FulfillerDropped {
412    fn fulfiller_dropped() -> Self;
413}
414
415/// A handle that can be used to fulfill or reject a promise. If you think of a promise
416/// as the receiving end of a oneshot channel, then this is the sending end.
417///
418/// When a `PromiseFulfiller<T,E>` is dropped without first receiving a `fulfill()`, `reject()`, or
419/// `resolve()` call, its promise is rejected with the error value `E::fulfiller_dropped()`.
420pub struct PromiseFulfiller<T, E> where T: 'static, E: 'static + FulfillerDropped {
421    hub: Rc<RefCell<private::PromiseAndFulfillerHub<T,E>>>,
422    done: bool,
423}
424
425impl <T, E> PromiseFulfiller<T, E> where T: 'static, E: 'static + FulfillerDropped {
426    pub fn fulfill(mut self, value: T) {
427        self.hub.borrow_mut().resolve(Ok(value));
428        self.done = true;
429    }
430
431    pub fn reject(mut self, error: E) {
432        self.hub.borrow_mut().resolve(Err(error));
433        self.done = true;
434    }
435
436    pub fn resolve(mut self, result: Result<T, E>) {
437        self.hub.borrow_mut().resolve(result);
438        self.done = true;
439    }
440}
441
442impl <T, E> Drop for PromiseFulfiller<T, E> where T: 'static, E: 'static + FulfillerDropped {
443    fn drop(&mut self) {
444        if !self.done {
445            self.hub.borrow_mut().resolve(Err(E::fulfiller_dropped()));
446        }
447    }
448}
449
450impl FulfillerDropped for () {
451    fn fulfiller_dropped() -> () { () }
452}
453
454impl FulfillerDropped for ::std::io::Error {
455    fn fulfiller_dropped() -> ::std::io::Error {
456        ::std::io::Error::new(::std::io::ErrorKind::Other, "Promise fulfiller was dropped.")
457    }
458}
459
460/// Holds a collection of `Promise<T, E>`s and ensures that each executes to completion.
461/// Destroying a `TaskSet` automatically cancels all of its unfinished promises.
462pub struct TaskSet<T, E> where T: 'static, E: 'static {
463    task_set_impl: private::TaskSetImpl<T, E>,
464}
465
466impl <T, E> TaskSet <T, E> {
467    pub fn new(reaper: Box<TaskReaper<T, E>>) -> TaskSet<T, E> {
468        TaskSet { task_set_impl: private::TaskSetImpl::new(reaper) }
469    }
470
471    pub fn add(&mut self, promise: Promise<T, E>) {
472        self.task_set_impl.add(promise.node);
473    }
474}
475
476/// Callbacks to be invoked when a task in a [`TaskSet`](struct.TaskSet.html) finishes. You are
477/// required to implement at least the failure case.
478pub trait TaskReaper<T, E> where T: 'static, E: 'static {
479    fn task_succeeded(&mut self, _value: T) {}
480    fn task_failed(&mut self, error: E);
481}