rs_jsonrpc_server_utils/
reactor.rs

1//! Event Loop Remote
2//! Either spawns a new event loop, or re-uses provided one.
3
4use std::{io, thread};
5use std::sync::mpsc;
6use tokio_core;
7
8use core::futures::{self, Future};
9
10/// Possibly uninitialized event loop remote.
11#[derive(Debug)]
12pub enum UninitializedRemote {
13	/// Shared instance of remote.
14	Shared(tokio_core::reactor::Remote),
15	/// Event Loop should be spawned by the transport.
16	Unspawned,
17}
18
19impl UninitializedRemote {
20	/// Initializes remote.
21	/// In case there is no shared remote, will spawn a new event loop.
22	/// Dropping `Remote` closes the loop.
23	pub fn initialize(self) -> io::Result<Remote> {
24		self.init_with_name("event.loop")
25	}
26
27	/// Initializes remote.
28	/// In case there is no shared remote, will spawn a new event loop.
29	/// Dropping `Remote` closes the loop.
30	pub fn init_with_name<T: Into<String>>(self, name: T) -> io::Result<Remote> {
31		match self {
32			UninitializedRemote::Shared(remote) => Ok(Remote::Shared(remote)),
33			UninitializedRemote::Unspawned => RpcEventLoop::with_name(Some(name.into())).map(Remote::Spawned),
34		}
35	}
36}
37
38/// Initialized Remote
39#[derive(Debug)]
40pub enum Remote {
41	/// Shared instance
42	Shared(tokio_core::reactor::Remote),
43	/// Spawned Event Loop
44	Spawned(RpcEventLoop),
45}
46
47impl Remote {
48	/// Get remote associated with this event loop.
49	pub fn remote(&self) -> tokio_core::reactor::Remote {
50		match *self {
51			Remote::Shared(ref remote) => remote.clone(),
52			Remote::Spawned(ref eloop) => eloop.remote(),
53		}
54	}
55
56	/// Closes underlying event loop (if any!).
57	pub fn close(self) {
58		if let Remote::Spawned(eloop) = self {
59			eloop.close()
60		}
61	}
62
63	/// Wait for underlying event loop to finish (if any!).
64	pub fn wait(self) {
65		if let Remote::Spawned(eloop) = self {
66			let _ = eloop.wait();
67		}
68	}
69}
70
71/// A handle to running event loop. Dropping the handle will cause event loop to finish.
72#[derive(Debug)]
73pub struct RpcEventLoop {
74	remote: tokio_core::reactor::Remote,
75	close: Option<futures::Complete<()>>,
76	handle: Option<thread::JoinHandle<()>>,
77}
78
79impl Drop for RpcEventLoop {
80	fn drop(&mut self) {
81		self.close.take().map(|v| v.send(()));
82	}
83}
84
85impl RpcEventLoop {
86	/// Spawns a new thread with the `EventLoop`.
87	pub fn spawn() -> io::Result<Self> {
88		RpcEventLoop::with_name(None)
89	}
90
91	/// Spawns a new named thread with the `EventLoop`.
92	pub fn with_name(name: Option<String>) -> io::Result<Self> {
93		let (stop, stopped) = futures::oneshot();
94		let (tx, rx) = mpsc::channel();
95		let mut tb = thread::Builder::new();
96		if let Some(name) = name {
97			tb = tb.name(name);
98		}
99		let handle = tb.spawn(move || {
100			let el = tokio_core::reactor::Core::new();
101			match el {
102				Ok(mut el) => {
103					tx.send(Ok(el.remote())).expect("Rx is blocking upper thread.");
104					let _ = el.run(futures::empty().select(stopped));
105				},
106				Err(err) => {
107					tx.send(Err(err)).expect("Rx is blocking upper thread.");
108				}
109			}
110		}).expect("Couldn't spawn a thread.");
111		let remote = rx.recv().expect("tx is transfered to a newly spawned thread.");
112
113		remote.map(|remote| RpcEventLoop {
114			remote: remote,
115			close: Some(stop),
116			handle: Some(handle),
117		})
118	}
119
120	/// Get remote for this event loop.
121	pub fn remote(&self) -> tokio_core::reactor::Remote {
122		self.remote.clone()
123	}
124
125	/// Blocks current thread and waits until the event loop is finished.
126	pub fn wait(mut self) -> thread::Result<()> {
127		self.handle.take().expect("Handle is always set before self is consumed.").join()
128	}
129
130	/// Finishes this event loop.
131	pub fn close(mut self) {
132		let _ = self.close.take().expect("Close is always set before self is consumed.").send(()).map_err(|e| {
133			warn!("Event Loop is already finished. {:?}", e);
134		});
135	}
136}