susydev_jsonrpc_server_utils/
reactor.rs

1//! Event Loop Executor
2//! Either spawns a new event loop, or re-uses provided one.
3
4use num_cpus;
5use std::sync::mpsc;
6use std::{io, thread};
7use tokio;
8
9use crate::core::futures::{self, Future};
10
11/// Possibly uninitialized event loop executor.
12#[derive(Debug)]
13pub enum UninitializedExecutor {
14	/// Shared instance of executor.
15	Shared(tokio::runtime::TaskExecutor),
16	/// Event Loop should be spawned by the transport.
17	Unspawned,
18}
19
20impl UninitializedExecutor {
21	/// Initializes executor.
22	/// In case there is no shared executor, will spawn a new event loop.
23	/// Dropping `Executor` closes the loop.
24	pub fn initialize(self) -> io::Result<Executor> {
25		self.init_with_name("event.loop")
26	}
27
28	/// Initializes executor.
29	/// In case there is no shared executor, will spawn a new event loop.
30	/// Dropping `Executor` closes the loop.
31	pub fn init_with_name<T: Into<String>>(self, name: T) -> io::Result<Executor> {
32		match self {
33			UninitializedExecutor::Shared(executor) => Ok(Executor::Shared(executor)),
34			UninitializedExecutor::Unspawned => RpcEventLoop::with_name(Some(name.into())).map(Executor::Spawned),
35		}
36	}
37}
38
39/// Initialized Executor
40#[derive(Debug)]
41pub enum Executor {
42	/// Shared instance
43	Shared(tokio::runtime::TaskExecutor),
44	/// Spawned Event Loop
45	Spawned(RpcEventLoop),
46}
47
48impl Executor {
49	/// Get tokio executor associated with this event loop.
50	pub fn executor(&self) -> tokio::runtime::TaskExecutor {
51		match *self {
52			Executor::Shared(ref executor) => executor.clone(),
53			Executor::Spawned(ref eloop) => eloop.executor(),
54		}
55	}
56
57	/// Spawn a future onto the Tokio runtime.
58	pub fn spawn<F>(&self, future: F)
59	where
60		F: Future<Item = (), Error = ()> + Send + 'static,
61	{
62		self.executor().spawn(future)
63	}
64
65	/// Closes underlying event loop (if any!).
66	pub fn close(self) {
67		if let Executor::Spawned(eloop) = self {
68			eloop.close()
69		}
70	}
71
72	/// Wait for underlying event loop to finish (if any!).
73	pub fn wait(self) {
74		if let Executor::Spawned(eloop) = self {
75			let _ = eloop.wait();
76		}
77	}
78}
79
80/// A handle to running event loop. Dropping the handle will cause event loop to finish.
81#[derive(Debug)]
82pub struct RpcEventLoop {
83	executor: tokio::runtime::TaskExecutor,
84	close: Option<futures::Complete<()>>,
85	handle: Option<thread::JoinHandle<()>>,
86}
87
88impl Drop for RpcEventLoop {
89	fn drop(&mut self) {
90		self.close.take().map(|v| v.send(()));
91	}
92}
93
94impl RpcEventLoop {
95	/// Spawns a new thread with the `EventLoop`.
96	pub fn spawn() -> io::Result<Self> {
97		RpcEventLoop::with_name(None)
98	}
99
100	/// Spawns a new named thread with the `EventLoop`.
101	pub fn with_name(name: Option<String>) -> io::Result<Self> {
102		let (stop, stopped) = futures::oneshot();
103		let (tx, rx) = mpsc::channel();
104		let mut tb = thread::Builder::new();
105		if let Some(name) = name {
106			tb = tb.name(name);
107		}
108
109		let handle = tb
110			.spawn(move || {
111				let core_threads = match num_cpus::get_physical() {
112					1 => 1,
113					2..=4 => 2,
114					_ => 3,
115				};
116
117				let runtime = tokio::runtime::Builder::new()
118					.core_threads(core_threads)
119					.name_prefix("susydev-jsonrpc-eventloop-")
120					.build();
121
122				match runtime {
123					Ok(mut runtime) => {
124						tx.send(Ok(runtime.executor())).expect("Rx is blocking upper thread.");
125						let terminate = futures::empty().select(stopped).map(|_| ()).map_err(|_| ());
126						runtime.spawn(terminate);
127						runtime.shutdown_on_idle().wait().unwrap();
128					}
129					Err(err) => {
130						tx.send(Err(err)).expect("Rx is blocking upper thread.");
131					}
132				}
133			})
134			.expect("Couldn't spawn a thread.");
135
136		let exec = rx.recv().expect("tx is transfered to a newly spawned thread.");
137
138		exec.map(|executor| RpcEventLoop {
139			executor,
140			close: Some(stop),
141			handle: Some(handle),
142		})
143	}
144
145	/// Get executor for this event loop.
146	pub fn executor(&self) -> tokio::runtime::TaskExecutor {
147		self.executor.clone()
148	}
149
150	/// Blocks current thread and waits until the event loop is finished.
151	pub fn wait(mut self) -> thread::Result<()> {
152		self.handle
153			.take()
154			.expect("Handle is always set before self is consumed.")
155			.join()
156	}
157
158	/// Finishes this event loop.
159	pub fn close(mut self) {
160		let _ = self
161			.close
162			.take()
163			.expect("Close is always set before self is consumed.")
164			.send(())
165			.map_err(|e| {
166				warn!("Event Loop is already finished. {:?}", e);
167			});
168	}
169}