1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83
use std::time::Duration; use futures::{Async, Future, Poll}; use futures::unsync::oneshot; use tokio_core::reactor::Timeout; use fut::ActorFuture; use actor::Actor; use arbiter::Arbiter; pub struct Condition<T> where T: Clone { waiters: Vec<oneshot::Sender<T>>, } impl<T> Condition<T> where T: Clone { pub fn wait(&mut self) -> oneshot::Receiver<T> { let (tx, rx) = oneshot::channel(); self.waiters.push(tx); rx } pub fn set(self, result: T) { for waiter in self.waiters { let _ = waiter.send(result.clone()); } } } impl<T> Default for Condition<T> where T: Clone { fn default() -> Self { Condition { waiters: Vec::new() } } } pub(crate) struct TimerFunc<A> where A: Actor { f: Option<Box<TimerFuncBox<A>>>, timeout: Timeout, } impl<A> TimerFunc<A> where A: Actor { pub fn new<F>(timeout: Duration, f: F) -> TimerFunc<A> where F: FnOnce(& mut A, & mut A::Context) + 'static { TimerFunc { f: Some(Box::new(f)), timeout: Timeout::new(timeout, Arbiter::handle()).unwrap()} } } trait TimerFuncBox<A: Actor>: 'static { fn call(self: Box<Self>, &mut A, &mut A::Context); } impl<A: Actor, F: FnOnce(&mut A, &mut A::Context) + 'static> TimerFuncBox<A> for F { #[cfg_attr(feature="cargo-clippy", allow(boxed_local))] fn call(self: Box<Self>, act: &mut A, ctx: &mut A::Context) { (*self)(act, ctx) } } #[doc(hidden)] impl<A> ActorFuture for TimerFunc<A> where A: Actor { type Item = (); type Error = (); type Actor = A; fn poll(&mut self, act: &mut Self::Actor, ctx: &mut <Self::Actor as Actor>::Context) -> Poll<Self::Item, Self::Error> { match self.timeout.poll() { Ok(Async::Ready(_)) => { if let Some(f) = self.f.take() { f.call(act, ctx); } Ok(Async::Ready(())) } Ok(Async::NotReady) => Ok(Async::NotReady), Err(_) => unreachable!(), } } }