bitconch_jsonrpc_server_utils/
reactor.rs

1//! Event Loop Executor
2//! Either spawns a new event loop, or re-uses provided one.
3
4use std::{io, thread};
5use std::sync::mpsc;
6use tokio;
7use num_cpus;
8
9use core::futures::{self, Future};
10
11/// Possibly uninitialized event loop executor.
12#[derive(Debug)]
13pub enum UninitializedExecutor {
14	/// Shared instance of executor.
15	Shared(tokio::runtime::TaskExecutor),
16	/// Event Loop should be spawned by the transport.
17	Unspawned,
18}
19
20impl UninitializedExecutor {
21	/// Initializes executor.
22	/// In case there is no shared executor, will spawn a new event loop.
23	/// Dropping `Executor` closes the loop.
24	pub fn initialize(self) -> io::Result<Executor> {
25		self.init_with_name("event.loop")
26	}
27
28	/// Initializes executor.
29	/// In case there is no shared executor, will spawn a new event loop.
30	/// Dropping `Executor` closes the loop.
31	pub fn init_with_name<T: Into<String>>(self, name: T) -> io::Result<Executor> {
32		match self {
33			UninitializedExecutor::Shared(executor) => Ok(Executor::Shared(executor)),
34			UninitializedExecutor::Unspawned => RpcEventLoop::with_name(Some(name.into())).map(Executor::Spawned),
35		}
36	}
37}
38
39/// Initialized Executor
40#[derive(Debug)]
41pub enum Executor {
42	/// Shared instance
43	Shared(tokio::runtime::TaskExecutor),
44	/// Spawned Event Loop
45	Spawned(RpcEventLoop),
46}
47
48impl Executor {
49	/// Get tokio executor associated with this event loop.
50	pub fn executor(&self) -> tokio::runtime::TaskExecutor {
51		match *self {
52			Executor::Shared(ref executor) => executor.clone(),
53			Executor::Spawned(ref eloop) => eloop.executor(),
54		}
55	}
56
57	/// Spawn a future onto the Tokio runtime.
58	pub fn spawn<F>(&self, future: F)
59	where
60		F: Future<Item = (), Error = ()> + Send + 'static,
61	{
62		self.executor().spawn(future)
63	}
64
65	/// Closes underlying event loop (if any!).
66	pub fn close(self) {
67		if let Executor::Spawned(eloop) = self {
68			eloop.close()
69		}
70	}
71
72	/// Wait for underlying event loop to finish (if any!).
73	pub fn wait(self) {
74		if let Executor::Spawned(eloop) = self {
75			let _ = eloop.wait();
76		}
77	}
78}
79
80/// A handle to running event loop. Dropping the handle will cause event loop to finish.
81#[derive(Debug)]
82pub struct RpcEventLoop {
83	executor: tokio::runtime::TaskExecutor,
84	close: Option<futures::Complete<()>>,
85	handle: Option<thread::JoinHandle<()>>,
86}
87
88impl Drop for RpcEventLoop {
89	fn drop(&mut self) {
90		self.close.take().map(|v| v.send(()));
91	}
92}
93
94impl RpcEventLoop {
95	/// Spawns a new thread with the `EventLoop`.
96	pub fn spawn() -> io::Result<Self> {
97		RpcEventLoop::with_name(None)
98	}
99
100	/// Spawns a new named thread with the `EventLoop`.
101	pub fn with_name(name: Option<String>) -> io::Result<Self> {
102		let (stop, stopped) = futures::oneshot();
103		let (tx, rx) = mpsc::channel();
104		let mut tb = thread::Builder::new();
105		if let Some(name) = name {
106			tb = tb.name(name);
107		}
108
109		let handle = tb.spawn(move || {
110			let mut tp_builder = tokio::executor::thread_pool::Builder::new();
111
112			let pool_size = match num_cpus::get_physical() {
113				1 => 1,
114				2...4 => 2,
115				_ => 3,
116			};
117
118			tp_builder
119				.pool_size(pool_size)
120				.name_prefix("jsonrpc-eventloop-");
121
122			let runtime = tokio::runtime::Builder::new()
123				.threadpool_builder(tp_builder)
124				.build();
125
126			match runtime {
127				Ok(mut runtime) => {
128					tx.send(Ok(runtime.executor())).expect("Rx is blocking upper thread.");
129					let terminate = futures::empty().select(stopped)
130						.map(|_| ())
131						.map_err(|_| ());
132					runtime.spawn(terminate);
133					runtime.shutdown_on_idle().wait().unwrap();
134				},
135				Err(err) => {
136					tx.send(Err(err)).expect("Rx is blocking upper thread.");
137				}
138			}
139		}).expect("Couldn't spawn a thread.");
140
141		let exec = rx.recv().expect("tx is transfered to a newly spawned thread.");
142
143		exec.map(|executor| RpcEventLoop {
144			executor,
145			close: Some(stop),
146			handle: Some(handle),
147		})
148	}
149
150	/// Get executor for this event loop.
151	pub fn executor(&self) -> tokio::runtime::TaskExecutor {
152		self.executor.clone()
153	}
154
155	/// Blocks current thread and waits until the event loop is finished.
156	pub fn wait(mut self) -> thread::Result<()> {
157		self.handle.take().expect("Handle is always set before self is consumed.").join()
158	}
159
160	/// Finishes this event loop.
161	pub fn close(mut self) {
162		let _ = self.close.take().expect("Close is always set before self is consumed.").send(()).map_err(|e| {
163			warn!("Event Loop is already finished. {:?}", e);
164		});
165	}
166}