1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
// Copyright 2018 MaidSafe.net limited.
//
// This SAFE Network Software is licensed to you under The General Public License (GPL), version 3.
// Unless required by applicable law or agreed to in writing, the SAFE Network Software distributed
// under the GPL Licence is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. Please review the Licences for the specific language governing
// permissions and limitations relating to use of the SAFE Network Software.

use crate::client::Client;
use crate::errors::CoreError;
use futures::stream::Stream;
use futures::sync::mpsc;
use futures::Future;
use tokio::runtime::current_thread::{self, Runtime};

/// Sending half of the unbounded channel that carries `CoreMsg`s to the core
/// event loop. Its counterpart, `CoreMsgRx`, is what `run` consumes.
pub type CoreMsgTx<C, T> = mpsc::UnboundedSender<CoreMsg<C, T>>;
/// Receiving half of the unbounded channel of `CoreMsg`s. `run` takes it by
/// value and processes every message that arrives on it.
pub type CoreMsgRx<C, T> = mpsc::UnboundedReceiver<CoreMsg<C, T>>;

/// The final future which the event loop will run: a boxed future that yields
/// no value and no error. When the closure inside a `CoreMsg` returns one,
/// `run` spawns it on the event loop.
pub type TailFuture = Box<dyn Future<Item = (), Error = ()>>;
// Type-erased closure stored inside a `CoreMsg`. It receives the client and
// the context and optionally hands back a `TailFuture` to be spawned.
// NOTE(review): it is `FnMut` rather than `FnOnce`, presumably because a boxed
// `FnOnce` trait object could not be called when this was written — confirm.
// `CoreMsg::new` adapts a caller-supplied `FnOnce` to this shape.
type TailFutureFn<C, T> = dyn FnMut(&C, &T) -> Option<TailFuture> + Send + 'static;

/// The message format that the core event loop understands.
///
/// The wrapped option is `Some(closure)` for a unit of work to execute with the
/// client and the context, and `None` for the terminator message (see
/// `build_terminator`), which makes `run` stop processing messages.
pub struct CoreMsg<C: Client, T>(Option<Box<TailFutureFn<C, T>>>);

/// Trait-object type for futures returned from core operations: such a future
/// resolves to a `T` on success or to a `CoreError` on failure. As a `dyn`
/// type it is unsized, so it has to be used behind a pointer
/// (e.g. `Box<CoreFuture<T>>`).
pub type CoreFuture<T> = dyn Future<Item = T, Error = CoreError>;

impl<C: Client, T> CoreMsg<C, T> {
    /// Construct a new message to ask core event loop to do something. If the
    /// return value of the given closure is optionally a future, it will be
    /// registered in the event loop.
    pub fn new<F>(f: F) -> Self
    where
        F: FnOnce(&C, &T) -> Option<TailFuture> + Send + 'static,
    {
        let mut f = Some(f);
        CoreMsg(Some(Box::new(
            move |client, context| -> Option<TailFuture> {
                let f = unwrap!(f.take());
                f(client, context)
            },
        )))
    }

    /// Construct a new message which when processed by the event loop will
    /// terminate the event loop. This will be the graceful exit condition.
    pub fn build_terminator() -> Self {
        CoreMsg(None)
    }
}

/// Run the core event loop on the calling thread.
///
/// This blocks for as long as the event loop is alive, i.e. until a terminator
/// message arrives or the message stream is exhausted. It is therefore
/// typically called from a dedicated, spawned thread.
///
/// Each regular message is invoked with `client` and `context`; if it returns
/// a tail future, that future is spawned onto the event loop.
pub fn run<C: Client, T>(mut el: Runtime, client: &C, context: &T, el_rx: CoreMsgRx<C, T>) {
    let keep_alive = el_rx.for_each(|msg| match msg.0 {
        // Terminator: resolving to an error ends `for_each`, and with it the loop.
        None => Err(()),
        Some(mut handler) => {
            if let Some(tail) = handler(client, context) {
                current_thread::spawn(tail);
            }
            Ok(())
        }
    });

    // The `Err(())` produced by the terminator is the normal way out, so the
    // result is deliberately ignored.
    let _ = el.block_on(keep_alive);
    debug!("Exiting Core Event Loop");
}