parity_runtime/
lib.rs

1// Copyright 2015-2020 Parity Technologies (UK) Ltd.
2// This file is part of Parity Ethereum.
3
4// Parity Ethereum is free software: you can redistribute it and/or modify
5// it under the terms of the GNU General Public License as published by
6// the Free Software Foundation, either version 3 of the License, or
7// (at your option) any later version.
8
9// Parity Ethereum is distributed in the hope that it will be useful,
10// but WITHOUT ANY WARRANTY; without even the implied warranty of
11// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12// GNU General Public License for more details.
13
14// You should have received a copy of the GNU General Public License
15// along with Parity Ethereum.  If not, see <http://www.gnu.org/licenses/>.
16
17//! Tokio Runtime wrapper.
18
19use futures::compat::*;
20use futures01::{Future as Future01, IntoFuture as IntoFuture01};
21use std::{fmt, future::Future, thread};
22pub use tokio_compat::runtime::{Builder as TokioRuntimeBuilder, Runtime as TokioRuntime, TaskExecutor};
23
24/// Runtime for futures.
25///
26/// Runs in a separate thread.
27pub struct Runtime {
28	executor: Executor,
29	handle: RuntimeHandle,
30}
31
/// Panic message used whenever building a Tokio runtime fails.
const RUNTIME_BUILD_PROOF: &str =
	"Building a Tokio runtime will only fail when mio components cannot be initialized (catastrophic)";
34
35impl Runtime {
36	fn new(runtime_bldr: &mut TokioRuntimeBuilder) -> Self {
37		let mut runtime = runtime_bldr.build().expect(RUNTIME_BUILD_PROOF);
38
39		let (stop, stopped) = tokio::sync::oneshot::channel();
40		let (tx, rx) = std::sync::mpsc::channel();
41		let handle = thread::spawn(move || {
42			let executor = runtime.executor();
43			runtime.block_on_std(async move {
44				tx.send(executor).expect("Rx is blocking upper thread.");
45				let _ = stopped.await;
46			});
47		});
48		let executor = rx.recv().expect("tx is transfered to a newly spawned thread.");
49
50		Runtime {
51			executor: Executor { inner: Mode::Tokio(executor) },
52			handle: RuntimeHandle { close: Some(stop), handle: Some(handle) },
53		}
54	}
55
56	/// Spawns a new tokio runtime with a default thread count on a background
57	/// thread and returns a `Runtime` which can be used to spawn tasks via
58	/// its executor.
59	pub fn with_default_thread_count() -> Self {
60		let mut runtime_bldr = TokioRuntimeBuilder::new();
61		Self::new(&mut runtime_bldr)
62	}
63
64	/// Spawns a new tokio runtime with a the specified thread count on a
65	/// background thread and returns a `Runtime` which can be used to spawn
66	/// tasks via its executor.
67	#[cfg(any(test, feature = "test-helpers"))]
68	pub fn with_thread_count(thread_count: usize) -> Self {
69		let mut runtime_bldr = TokioRuntimeBuilder::new();
70		runtime_bldr.core_threads(thread_count);
71
72		Self::new(&mut runtime_bldr)
73	}
74
75	/// Returns this runtime raw executor.
76	#[cfg(any(test, feature = "test-helpers"))]
77	pub fn raw_executor(&self) -> TaskExecutor {
78		if let Mode::Tokio(ref executor) = self.executor.inner {
79			executor.clone()
80		} else {
81			panic!("Runtime is not initialized in Tokio mode.")
82		}
83	}
84
85	/// Returns runtime executor.
86	pub fn executor(&self) -> Executor {
87		self.executor.clone()
88	}
89}
90
91#[derive(Clone)]
92enum Mode {
93	Tokio(TaskExecutor),
94	// Mode used in tests
95	#[allow(dead_code)]
96	Sync,
97	// Mode used in tests
98	#[allow(dead_code)]
99	ThreadPerFuture,
100}
101
102impl fmt::Debug for Mode {
103	fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
104		use self::Mode::*;
105
106		match *self {
107			Tokio(_) => write!(fmt, "tokio"),
108			Sync => write!(fmt, "synchronous"),
109			ThreadPerFuture => write!(fmt, "thread per future"),
110		}
111	}
112}
113
114fn block_on<F: Future<Output = ()> + Send + 'static>(r: F) {
115	tokio::runtime::Builder::new().enable_all().basic_scheduler().build().expect(RUNTIME_BUILD_PROOF).block_on(r)
116}
117
118#[derive(Debug, Clone)]
119pub struct Executor {
120	inner: Mode,
121}
122
123impl Executor {
124	/// Synchronous executor, used for tests.
125	#[cfg(any(test, feature = "test-helpers"))]
126	pub fn new_sync() -> Self {
127		Executor { inner: Mode::Sync }
128	}
129
130	/// Spawns a new thread for each future (use only for tests).
131	#[cfg(any(test, feature = "test-helpers"))]
132	pub fn new_thread_per_future() -> Self {
133		Executor { inner: Mode::ThreadPerFuture }
134	}
135
136	/// Spawn a legacy future on this runtime
137	pub fn spawn<R>(&self, r: R)
138	where
139		R: IntoFuture01<Item = (), Error = ()> + Send + 'static,
140		R::Future: Send + 'static,
141	{
142		self.spawn_std(async move {
143			let _ = r.into_future().compat().await;
144		})
145	}
146
147	/// Spawn an std future on this runtime
148	pub fn spawn_std<R>(&self, r: R)
149	where
150		R: Future<Output = ()> + Send + 'static,
151	{
152		match &self.inner {
153			Mode::Tokio(executor) => {
154				let _ = executor.spawn_handle_std(r);
155			}
156			Mode::Sync => block_on(r),
157			Mode::ThreadPerFuture => {
158				thread::spawn(move || block_on(r));
159			}
160		}
161	}
162}
163
164impl<F: Future01<Item = (), Error = ()> + Send + 'static> futures01::future::Executor<F> for Executor {
165	fn execute(&self, future: F) -> Result<(), futures01::future::ExecuteError<F>> {
166		match &self.inner {
167			Mode::Tokio(executor) => executor.execute(future),
168			Mode::Sync => {
169				block_on(async move {
170					let _ = future.compat().await;
171				});
172				Ok(())
173			}
174			Mode::ThreadPerFuture => {
175				thread::spawn(move || {
176					block_on(async move {
177						let _ = future.compat().await;
178					})
179				});
180				Ok(())
181			}
182		}
183	}
184}
185
186/// A handle to a runtime. Dropping the handle will cause runtime to shutdown.
187pub struct RuntimeHandle {
188	close: Option<tokio::sync::oneshot::Sender<()>>,
189	handle: Option<thread::JoinHandle<()>>,
190}
191
192impl From<Runtime> for RuntimeHandle {
193	fn from(el: Runtime) -> Self {
194		el.handle
195	}
196}
197
198impl Drop for RuntimeHandle {
199	fn drop(&mut self) {
200		self.close.take().map(|v| v.send(()));
201	}
202}
203
204impl RuntimeHandle {
205	/// Blocks current thread and waits until the runtime is finished.
206	pub fn wait(mut self) -> thread::Result<()> {
207		self.handle.take().expect("Handle is taken only in `wait`, `wait` is consuming; qed").join()
208	}
209
210	/// Finishes this runtime.
211	pub fn close(mut self) {
212		let _ =
213			self.close.take().expect("Close is taken only in `close` and `drop`. `close` is consuming; qed").send(());
214	}
215}