1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
use crate::client::Client;
use crate::errors::CoreError;
use futures::stream::Stream;
use futures::sync::mpsc;
use futures::Future;
use tokio::runtime::current_thread::{self, Runtime};
pub type CoreMsgTx<C, T> = mpsc::UnboundedSender<CoreMsg<C, T>>;
pub type CoreMsgRx<C, T> = mpsc::UnboundedReceiver<CoreMsg<C, T>>;
pub type TailFuture = Box<dyn Future<Item = (), Error = ()>>;
type TailFutureFn<C, T> = dyn FnMut(&C, &T) -> Option<TailFuture> + Send + 'static;
pub struct CoreMsg<C: Client, T>(Option<Box<TailFutureFn<C, T>>>);
pub type CoreFuture<T> = dyn Future<Item = T, Error = CoreError>;
impl<C: Client, T> CoreMsg<C, T> {
pub fn new<F>(f: F) -> Self
where
F: FnOnce(&C, &T) -> Option<TailFuture> + Send + 'static,
{
let mut f = Some(f);
CoreMsg(Some(Box::new(
move |client, context| -> Option<TailFuture> {
let f = unwrap!(f.take());
f(client, context)
},
)))
}
pub fn build_terminator() -> Self {
CoreMsg(None)
}
}
pub fn run<C: Client, T>(mut el: Runtime, client: &C, context: &T, el_rx: CoreMsgRx<C, T>) {
let keep_alive = el_rx.for_each(|core_msg| {
if let Some(mut f) = core_msg.0 {
if let Some(tail) = f(client, context) {
current_thread::spawn(tail);
}
Ok(())
} else {
Err(())
}
});
let _ = el.block_on(keep_alive);
debug!("Exiting Core Event Loop");
}