1use std::cell::{Cell, RefCell};
52use std::rc::Rc;
53use std::result::Result;
54use private::{promise_node, Event, BoolEvent, PromiseAndFulfillerHub, PromiseAndFulfillerWrapper,
55 EVENT_LOOP, with_current_event_loop, PromiseNode};
56
57
/// Early-return helper for functions whose return type is a `Promise`.
///
/// Evaluates `$expr`, which must be a `Result`. On `Ok(v)` the macro expression yields `v`.
/// On `Err(e)` it returns from the enclosing function with
/// `Promise::err(From::from(e))`, so the error is converted into the promise's error
/// type through `From`.
#[macro_export]
macro_rules! pry {
    ($expr:expr) => {
        match $expr {
            ::std::result::Result::Err(failure) => {
                return $crate::Promise::err(::std::convert::From::from(failure))
            }
            ::std::result::Result::Ok(value) => value,
        }
    };
}
72
73mod private;
74mod handle_table;
75
76#[must_use]
79pub struct Promise<T, E> where T: 'static, E: 'static {
80 node: Box<PromiseNode<T, E>>,
81}
82
83impl <T, E> Promise <T, E> {
84 pub fn ok(value: T) -> Promise<T, E> {
86 Promise { node: Box::new(promise_node::Immediate::new(Ok(value))) }
87 }
88
89 pub fn err(error: E) -> Promise<T, E> {
91 Promise { node: Box::new(promise_node::Immediate::new(Err(error))) }
92 }
93
94 pub fn never_done() -> Promise<T, E> {
96 Promise { node: Box::new(promise_node::NeverDone::new()) }
97 }
98
99 pub fn and_fulfiller() -> (Promise<T, E>, PromiseFulfiller<T, E>)
101 where E: FulfillerDropped
102 {
103 let result = Rc::new(RefCell::new(PromiseAndFulfillerHub::new()));
104 let result_promise: Promise<T, E> =
105 Promise { node: Box::new(PromiseAndFulfillerWrapper::new(result.clone()))};
106 (result_promise, PromiseFulfiller{ hub: result, done: false })
107 }
108
109 pub fn then_else<F, T1, E1>(self, func: F) -> Promise<T1, E1>
118 where F: 'static,
119 F: FnOnce(Result<T, E>) -> Promise<T1, E1>,
120 {
121 let intermediate = Box::new(promise_node::Transform::new(self.node, |x| Ok(func(x)) ));
122 Promise { node: Box::new(promise_node::Chain::new(intermediate)) }
123 }
124
125 pub fn then<F, T1>(self, func: F) -> Promise<T1, E>
127 where F: 'static,
128 F: FnOnce(T) -> Promise<T1, E>,
129 {
130 self.then_else(|r| { match r { Ok(v) => func(v), Err(e) => Promise::err(e) } })
131 }
132
133 pub fn map_else<F, T1, E1>(self, func: F) -> Promise<T1, E1>
141 where F: 'static,
142 F: FnOnce(Result<T, E>) -> Result<T1, E1>,
143 {
144 Promise { node: Box::new(promise_node::Transform::new(self.node, func)) }
145 }
146
147 pub fn map<F, R>(self, func: F) -> Promise<R, E>
149 where F: 'static,
150 F: FnOnce(T) -> Result<R, E>,
151 R: 'static
152 {
153 self.map_else(|r| { match r { Ok(v) => func(v), Err(e) => Err(e) } } )
154 }
155
156 pub fn map_err<E1, F>(self, func: F) -> Promise<T, E1>
158 where F: 'static,
159 F: FnOnce(E) -> E1
160 {
161 self.map_else(|r| { match r { Ok(v) => Ok(v), Err(e) => Err(func(e)) } } )
162 }
163
164 pub fn lift<E1>(self) -> Promise<T, E1> where E: Into<E1> {
166 self.map_err(|e| e.into())
167 }
168
169 pub fn exclusive_join(self, other: Promise<T, E>) -> Promise<T, E> {
172 Promise { node: Box::new(private::promise_node::ExclusiveJoin::new(self.node, other.node)) }
173 }
174
175 pub fn all<I>(promises: I) -> Promise<Vec<T>, E>
178 where I: Iterator<Item=Promise<T, E>>
179 {
180 Promise { node: Box::new(private::promise_node::ArrayJoin::new(promises)) }
181 }
182
183 pub fn fork(self) -> ForkedPromise<T, E> where T: Clone, E: Clone {
185 ForkedPromise::new(self)
186 }
187
188 pub fn attach<U>(self, value: U) -> Promise<T, E> where U: 'static {
190 self.map_else(move |result| { drop(value); result })
191 }
192
193 pub fn eagerly_evaluate(self) -> Promise<T, E> {
197 self.then(|v| { Promise::ok(v) })
198 }
199
200 pub fn wait<E1>(mut self, wait_scope: &WaitScope, event_source: &mut EventPort<E1>) -> Result<T, E>
205 where E: From<E1>
206 {
207 drop(wait_scope);
208 with_current_event_loop(move |event_loop| {
209 let fired = ::std::rc::Rc::new(::std::cell::Cell::new(false));
210 let done_event = BoolEvent::new(fired.clone());
211 let (handle, _dropper) = private::GuardedEventHandle::new();
212 handle.set(Box::new(done_event));
213 self.node.on_ready(handle);
214
215 while !fired.get() {
218 if !event_loop.turn() {
219 try!(event_source.wait());
221 }
222 }
223
224 self.node.get()
225 })
226 }
227}
228
229pub struct WaitScope(::std::marker::PhantomData<*mut u8>); pub struct ForkedPromise<T, E> where T: 'static + Clone, E: 'static + Clone {
237 hub: Rc<RefCell<promise_node::ForkHub<T, E>>>,
238}
239
240impl <T, E> ForkedPromise<T, E> where T: 'static + Clone, E: 'static + Clone {
241 fn new(inner: Promise<T, E>) -> ForkedPromise<T, E> {
242 ForkedPromise {
243 hub: Rc::new(RefCell::new(promise_node::ForkHub::new(inner.node))),
244 }
245 }
246
247 pub fn add_branch(&mut self) -> Promise<T, E> {
249 promise_node::ForkHub::add_branch(&self.hub)
250 }
251}
252
/// A source of external events that `Promise::wait` blocks on when the event loop has
/// nothing left to fire.
pub trait EventPort<E> {
    /// Waits for an external event.
    ///
    /// Returns `Ok(())` to let the caller resume turning the event loop, or `Err` to abort
    /// the wait with that error.
    fn wait(&mut self) -> Result<(), E>;
}
259
260pub struct ClosedEventPort<E: Clone>(pub E);
262
263impl <E: Clone> EventPort<E> for ClosedEventPort<E> {
264 fn wait(&mut self) -> Result<(), E> {
265 Err(self.0.clone())
266 }
267}
268
269pub struct EventLoop {
271_running: bool,
273 _last_runnable_state: bool,
274 events: RefCell<handle_table::HandleTable<private::EventNode>>,
275 head: private::EventHandle,
276 tail: Cell<private::EventHandle>,
277 depth_first_insertion_point: Cell<private::EventHandle>,
278 currently_firing: Cell<Option<private::EventHandle>>,
279 to_destroy: Cell<Option<private::EventHandle>>,
280}
281
282impl EventLoop {
283 pub fn top_level<R, F>(main: F) -> R
286 where F: FnOnce(&WaitScope) -> R,
287 {
288 let mut events = handle_table::HandleTable::<private::EventNode>::new();
289 let dummy = private::EventNode { event: None, next: None, prev: None };
290 let head_handle = private::EventHandle(events.push(dummy));
291
292 EVENT_LOOP.with(move |maybe_event_loop| {
293 let event_loop = EventLoop {
294 _running: false,
295 _last_runnable_state: false,
296 events: RefCell::new(events),
297 head: head_handle,
298 tail: Cell::new(head_handle),
299 depth_first_insertion_point: Cell::new(head_handle), currently_firing: Cell::new(None),
301 to_destroy: Cell::new(None),
302 };
303
304 assert!(maybe_event_loop.borrow().is_none());
305 *maybe_event_loop.borrow_mut() = Some(event_loop);
306 });
307
308 let wait_scope = WaitScope(::std::marker::PhantomData);
309 let result = main(&wait_scope);
310
311 EVENT_LOOP.with(move |maybe_event_loop| {
312 let el = ::std::mem::replace(&mut *maybe_event_loop.borrow_mut(), None);
313 match el {
314 None => unreachable!(),
315 Some(event_loop) => {
316 let remaining_events = event_loop.events.borrow().len();
319 if remaining_events > 1 {
320 ::std::mem::forget(event_loop); panic!("{} leaked events found when cleaning up event loop. \
322 Perhaps there is a reference cycle containing promises?",
323 remaining_events - 1)
324 }
325 }
326 }
327 });
328
329 result
330 }
331
332 fn arm_depth_first(&self, event_handle: private::EventHandle) {
333 let insertion_node_next = self.events.borrow()[self.depth_first_insertion_point.get().0].next;
334
335 match insertion_node_next {
336 Some(next_handle) => {
337 self.events.borrow_mut()[next_handle.0].prev = Some(event_handle);
338 self.events.borrow_mut()[event_handle.0].next = Some(next_handle);
339 }
340 None => {
341 self.tail.set(event_handle);
342 }
343 }
344
345 self.events.borrow_mut()[event_handle.0].prev = Some(self.depth_first_insertion_point.get());
346 self.events.borrow_mut()[self.depth_first_insertion_point.get().0].next = Some(event_handle);
347 self.depth_first_insertion_point.set(event_handle);
348 }
349
350 fn arm_breadth_first(&self, event_handle: private::EventHandle) {
351 let events = &mut *self.events.borrow_mut();
352 events[self.tail.get().0].next = Some(event_handle);
353 events[event_handle.0].prev = Some(self.tail.get());
354 self.tail.set(event_handle);
355 }
356
357 fn _run(&mut self, max_turn_count: u32) {
361 self._running = true;
362
363 for _ in 0..max_turn_count {
364 if !self.turn() {
365 break;
366 }
367 }
368 }
369
370 fn turn(&self) -> bool {
372
373 let event_handle = match self.events.borrow()[self.head.0].next {
374 None => return false,
375 Some(event_handle) => { event_handle }
376 };
377 self.depth_first_insertion_point.set(event_handle);
378
379 self.currently_firing.set(Some(event_handle));
380 let mut event = ::std::mem::replace(&mut self.events.borrow_mut()[event_handle.0].event, None)
381 .expect("No event to fire?");
382 event.fire();
383 self.currently_firing.set(None);
384
385 let maybe_next = self.events.borrow()[event_handle.0].next;
386 self.events.borrow_mut()[self.head.0].next = maybe_next;
387 if let Some(e) = maybe_next {
388 self.events.borrow_mut()[e.0].prev = Some(self.head);
389 }
390
391 self.events.borrow_mut()[event_handle.0].next = None;
392 self.events.borrow_mut()[event_handle.0].prev = None;
393
394 if self.tail.get() == event_handle {
395 self.tail.set(self.head);
396 }
397
398 self.depth_first_insertion_point.set(self.head);
399
400 if let Some(event_handle) = self.to_destroy.get() {
401 self.events.borrow_mut().remove(event_handle.0);
402 self.to_destroy.set(None);
403 }
404
405 true
406 }
407}
408
/// Implemented by error types that can represent "the fulfiller was dropped".
///
/// `PromiseFulfiller`'s `Drop` impl resolves its promise with this error when the fulfiller
/// is dropped without having been resolved.
pub trait FulfillerDropped {
    /// Produces the error value used for a promise whose fulfiller was dropped.
    fn fulfiller_dropped() -> Self;
}
414
415pub struct PromiseFulfiller<T, E> where T: 'static, E: 'static + FulfillerDropped {
421 hub: Rc<RefCell<private::PromiseAndFulfillerHub<T,E>>>,
422 done: bool,
423}
424
425impl <T, E> PromiseFulfiller<T, E> where T: 'static, E: 'static + FulfillerDropped {
426 pub fn fulfill(mut self, value: T) {
427 self.hub.borrow_mut().resolve(Ok(value));
428 self.done = true;
429 }
430
431 pub fn reject(mut self, error: E) {
432 self.hub.borrow_mut().resolve(Err(error));
433 self.done = true;
434 }
435
436 pub fn resolve(mut self, result: Result<T, E>) {
437 self.hub.borrow_mut().resolve(result);
438 self.done = true;
439 }
440}
441
442impl <T, E> Drop for PromiseFulfiller<T, E> where T: 'static, E: 'static + FulfillerDropped {
443 fn drop(&mut self) {
444 if !self.done {
445 self.hub.borrow_mut().resolve(Err(E::fulfiller_dropped()));
446 }
447 }
448}
449
450impl FulfillerDropped for () {
451 fn fulfiller_dropped() -> () { () }
452}
453
454impl FulfillerDropped for ::std::io::Error {
455 fn fulfiller_dropped() -> ::std::io::Error {
456 ::std::io::Error::new(::std::io::ErrorKind::Other, "Promise fulfiller was dropped.")
457 }
458}
459
460pub struct TaskSet<T, E> where T: 'static, E: 'static {
463 task_set_impl: private::TaskSetImpl<T, E>,
464}
465
466impl <T, E> TaskSet <T, E> {
467 pub fn new(reaper: Box<TaskReaper<T, E>>) -> TaskSet<T, E> {
468 TaskSet { task_set_impl: private::TaskSetImpl::new(reaper) }
469 }
470
471 pub fn add(&mut self, promise: Promise<T, E>) {
472 self.task_set_impl.add(promise.node);
473 }
474}
475
/// Callbacks for the outcome of tasks; a boxed reaper is supplied to `TaskSet::new`.
pub trait TaskReaper<T, E> where T: 'static, E: 'static {
    /// Called with the value of a task that succeeded. The default implementation ignores
    /// the value.
    fn task_succeeded(&mut self, _value: T) {}

    /// Called with the error of a task that failed. Implementors must provide this method.
    fn task_failed(&mut self, error: E);
}