use client::Client;
use errors::CoreError;
use futures::Future;
use futures::stream::Stream;
use futures::sync::mpsc;
use tokio_core::reactor::Core;
/// Sending half of the unbounded channel that carries `CoreMsg<T>` values.
pub type CoreMsgTx<T> = mpsc::UnboundedSender<CoreMsg<T>>;
/// Receiving half of the unbounded channel that carries `CoreMsg<T>` values.
/// This is the stream consumed by `run`.
pub type CoreMsgRx<T> = mpsc::UnboundedReceiver<CoreMsg<T>>;
/// Boxed future yielding `()` on both success and failure. A message closure
/// may return one of these, in which case `run` spawns it on the event loop.
pub type TailFuture = Box<Future<Item = (), Error = ()>>;
// Signature of the closure stored inside a `CoreMsg`: it is handed the client
// and the context by reference and may return a `TailFuture`.
type TailFutureFn<T> = FnMut(&Client<T>, &T) -> Option<TailFuture> + Send + 'static;
/// Message sent over the core channel.
///
/// Holds `Some` boxed closure to be invoked with the client and context, or
/// `None`, which `run` treats as the signal to stop processing messages.
pub struct CoreMsg<T>(Option<Box<TailFutureFn<T>>>);
/// Future trait-object type whose error type is fixed to `CoreError`.
pub type CoreFuture<T> = Future<Item = T, Error = CoreError>;
impl<T> CoreMsg<T> {
pub fn new<F>(f: F) -> Self
where
F: FnOnce(&Client<T>, &T) -> Option<TailFuture> + Send + 'static,
{
let mut f = Some(f);
CoreMsg(Some(
Box::new(move |client, context| -> Option<TailFuture> {
let f = unwrap!(f.take());
f(client, context)
}),
))
}
pub fn build_terminator() -> Self {
CoreMsg(None)
}
}
pub fn run<T>(mut el: Core, client: &Client<T>, context: &T, el_rx: CoreMsgRx<T>) {
let el_h = el.handle();
let keep_alive = el_rx.for_each(|core_msg| {
if let Some(mut f) = core_msg.0 {
if let Some(tail) = f(client, context) {
el_h.spawn(tail);
}
Ok(())
} else {
Err(())
}
});
let _ = el.run(keep_alive);
debug!("Exiting Core Event Loop");
}