1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
// Copyright 2016 MaidSafe.net limited.
//
// This SAFE Network Software is licensed to you under (1) the MaidSafe.net Commercial License,
// version 1.0 or later, or (2) The General Public License (GPL), version 3, depending on which
// licence you accepted on initial access to the Software (the "Licences").
//
// By contributing code to the SAFE Network Software, or to this project generally, you agree to be
// bound by the terms of the MaidSafe Contributor Agreement.  This, along with the Licenses can be
// found in the root directory of this project at LICENSE, COPYING and CONTRIBUTOR.
//
// Unless required by applicable law or agreed to in writing, the SAFE Network Software distributed
// under the GPL Licence is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.
//
// Please review the Licences for the specific language governing permissions and limitations
// relating to use of the SAFE Network Software.

use client::Client;
use errors::CoreError;
use futures::Future;
use futures::stream::Stream;
use futures::sync::mpsc;
use tokio_core::reactor::Core;

/// Transmitter of messages to be run in the core event loop.
pub type CoreMsgTx<T> = mpsc::UnboundedSender<CoreMsg<T>>;
/// Receiver of messages to be run in the core event loop.
pub type CoreMsgRx<T> = mpsc::UnboundedReceiver<CoreMsg<T>>;

/// The final future which the event loop will run.
pub type TailFuture = Box<Future<Item = (), Error = ()>>;
type TailFutureFn<T> = FnMut(&Client<T>, &T) -> Option<TailFuture> + Send + 'static;

/// The message format that core event loop understands.
pub struct CoreMsg<T>(Option<Box<TailFutureFn<T>>>);

/// Future trait returned from core operations.
pub type CoreFuture<T> = Future<Item = T, Error = CoreError>;

impl<T> CoreMsg<T> {
    /// Construct a new message to ask core event loop to do something. If the
    /// return value of the given closure is optionally a future, it will be
    /// registered in the event loop.
    pub fn new<F>(f: F) -> Self
    where
        F: FnOnce(&Client<T>, &T) -> Option<TailFuture> + Send + 'static,
    {
        let mut f = Some(f);
        CoreMsg(Some(
            Box::new(move |client, context| -> Option<TailFuture> {
                let f = unwrap!(f.take());
                f(client, context)
            }),
        ))
    }

    /// Construct a new message which when processed by the event loop will
    /// terminate the event loop. This will be the graceful exit condition.
    pub fn build_terminator() -> Self {
        CoreMsg(None)
    }
}

/// Run the core event loop. This will block until the event loop is alive.
/// Hence must typically be called inside a spawned thread.
pub fn run<T>(mut el: Core, client: &Client<T>, context: &T, el_rx: CoreMsgRx<T>) {
    let el_h = el.handle();

    let keep_alive = el_rx.for_each(|core_msg| {
        if let Some(mut f) = core_msg.0 {
            if let Some(tail) = f(client, context) {
                el_h.spawn(tail);
            }
            Ok(())
        } else {
            // Err(io::Error::new(ErrorKind::Other, "Graceful Termination"))
            Err(())
        }
    });

    let _ = el.run(keep_alive);
    debug!("Exiting Core Event Loop");
}