tetsy_jsonrpc_server_utils/
reactor.rs

1//! Event Loop Executor
2//!
3//! Either spawns a new event loop, or re-uses provided one.
4//! Spawned event loop is always single threaded (mostly for
5//! historical/backward compatibility reasons) despite the fact
6//! that `tokio::runtime` can be multi-threaded.
7
8use std::io;
9use tokio;
10
11use crate::core::futures::{self, Future};
12
13/// Possibly uninitialized event loop executor.
14#[derive(Debug)]
15pub enum UninitializedExecutor {
16	/// Shared instance of executor.
17	Shared(tokio::runtime::TaskExecutor),
18	/// Event Loop should be spawned by the transport.
19	Unspawned,
20}
21
22impl UninitializedExecutor {
23	/// Initializes executor.
24	/// In case there is no shared executor, will spawn a new event loop.
25	/// Dropping `Executor` closes the loop.
26	pub fn initialize(self) -> io::Result<Executor> {
27		self.init_with_name("event.loop")
28	}
29
30	/// Initializes executor.
31	/// In case there is no shared executor, will spawn a new event loop.
32	/// Dropping `Executor` closes the loop.
33	pub fn init_with_name<T: Into<String>>(self, name: T) -> io::Result<Executor> {
34		match self {
35			UninitializedExecutor::Shared(executor) => Ok(Executor::Shared(executor)),
36			UninitializedExecutor::Unspawned => RpcEventLoop::with_name(Some(name.into())).map(Executor::Spawned),
37		}
38	}
39}
40
41/// Initialized Executor
42#[derive(Debug)]
43pub enum Executor {
44	/// Shared instance
45	Shared(tokio::runtime::TaskExecutor),
46	/// Spawned Event Loop
47	Spawned(RpcEventLoop),
48}
49
50impl Executor {
51	/// Get tokio executor associated with this event loop.
52	pub fn executor(&self) -> tokio::runtime::TaskExecutor {
53		match *self {
54			Executor::Shared(ref executor) => executor.clone(),
55			Executor::Spawned(ref eloop) => eloop.executor(),
56		}
57	}
58
59	/// Spawn a future onto the Tokio runtime.
60	pub fn spawn<F>(&self, future: F)
61	where
62		F: Future<Item = (), Error = ()> + Send + 'static,
63	{
64		self.executor().spawn(future)
65	}
66
67	/// Closes underlying event loop (if any!).
68	pub fn close(self) {
69		if let Executor::Spawned(eloop) = self {
70			eloop.close()
71		}
72	}
73
74	/// Wait for underlying event loop to finish (if any!).
75	pub fn wait(self) {
76		if let Executor::Spawned(eloop) = self {
77			let _ = eloop.wait();
78		}
79	}
80}
81
82/// A handle to running event loop. Dropping the handle will cause event loop to finish.
83#[derive(Debug)]
84pub struct RpcEventLoop {
85	executor: tokio::runtime::TaskExecutor,
86	close: Option<futures::Complete<()>>,
87	handle: Option<tokio::runtime::Shutdown>,
88}
89
90impl Drop for RpcEventLoop {
91	fn drop(&mut self) {
92		self.close.take().map(|v| v.send(()));
93	}
94}
95
96impl RpcEventLoop {
97	/// Spawns a new thread with the `EventLoop`.
98	pub fn spawn() -> io::Result<Self> {
99		RpcEventLoop::with_name(None)
100	}
101
102	/// Spawns a new named thread with the `EventLoop`.
103	pub fn with_name(name: Option<String>) -> io::Result<Self> {
104		let (stop, stopped) = futures::oneshot();
105
106		let mut tb = tokio::runtime::Builder::new();
107		tb.core_threads(1);
108
109		if let Some(name) = name {
110			tb.name_prefix(name);
111		}
112
113		let mut runtime = tb.build()?;
114		let executor = runtime.executor();
115		let terminate = futures::empty().select(stopped).map(|_| ()).map_err(|_| ());
116		runtime.spawn(terminate);
117		let handle = runtime.shutdown_on_idle();
118
119		Ok(RpcEventLoop {
120			executor,
121			close: Some(stop),
122			handle: Some(handle),
123		})
124	}
125
126	/// Get executor for this event loop.
127	pub fn executor(&self) -> tokio::runtime::TaskExecutor {
128		self.executor.clone()
129	}
130
131	/// Blocks current thread and waits until the event loop is finished.
132	pub fn wait(mut self) -> Result<(), ()> {
133		self.handle.take().ok_or(())?.wait()
134	}
135
136	/// Finishes this event loop.
137	pub fn close(mut self) {
138		let _ = self
139			.close
140			.take()
141			.expect("Close is always set before self is consumed.")
142			.send(())
143			.map_err(|e| {
144				warn!("Event Loop is already finished. {:?}", e);
145			});
146	}
147}
148
149#[cfg(test)]
150mod tests {
151	use super::*;
152
153	#[test]
154	fn make_sure_rpc_event_loop_is_send_and_sync() {
155		fn is_send_and_sync<T: Send + Sync>() {}
156
157		is_send_and_sync::<RpcEventLoop>();
158	}
159}