// soil_cli/runner.rs
// This file is part of Soil.

// Copyright (C) Soil contributors.
// Copyright (C) Parity Technologies (UK) Ltd.
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
7use crate::{error::Error as CliError, Result, Signals, SubstrateCli};
8use chrono::prelude::*;
9use futures::{future::FutureExt, Future};
10use log::info;
11use soil_client::utils::metrics::{TOKIO_THREADS_ALIVE, TOKIO_THREADS_TOTAL};
12use soil_service::{Configuration, Error as ServiceError, TaskManager};
13use std::{marker::PhantomData, time::Duration};
14
15/// Build a tokio runtime with all features.
16pub fn build_runtime() -> std::result::Result<tokio::runtime::Runtime, std::io::Error> {
17	tokio::runtime::Builder::new_multi_thread()
18		.on_thread_start(|| {
19			TOKIO_THREADS_ALIVE.inc();
20			TOKIO_THREADS_TOTAL.inc();
21		})
22		.on_thread_stop(|| {
23			TOKIO_THREADS_ALIVE.dec();
24		})
25		.enable_all()
26		.build()
27}
28
29/// A Substrate CLI runtime that can be used to run a node or a command
30pub struct Runner<C: SubstrateCli> {
31	config: Configuration,
32	tokio_runtime: tokio::runtime::Runtime,
33	signals: Signals,
34	phantom: PhantomData<C>,
35}
36
37impl<C: SubstrateCli> Runner<C> {
38	/// Create a new runtime with the command provided in argument
39	pub fn new(
40		config: Configuration,
41		tokio_runtime: tokio::runtime::Runtime,
42		signals: Signals,
43	) -> Result<Runner<C>> {
44		Ok(Runner { config, tokio_runtime, signals, phantom: PhantomData })
45	}
46
47	/// Log information about the node itself.
48	///
49	/// # Example:
50	///
51	/// ```text
52	/// 2020-06-03 16:14:21 Substrate Node
53	/// 2020-06-03 16:14:21 ✌️  version 2.0.0-rc3-f4940588c-x86_64-linux-gnu
54	/// 2020-06-03 16:14:21 ❤️  by Parity Technologies <admin@parity.io>, 2017-2020
55	/// 2020-06-03 16:14:21 📋 Chain specification: Flaming Fir
56	/// 2020-06-03 16:14:21 🏷  Node name: jolly-rod-7462
57	/// 2020-06-03 16:14:21 👤 Role: FULL
58	/// 2020-06-03 16:14:21 💾 Database: RocksDb at /tmp/c/chains/flamingfir7/db
59	/// 2020-06-03 16:14:21 ⛓  Native runtime: node-251 (soil-test-staging-node-1.tx1.au10)
60	/// ```
61	fn print_node_infos(&self) {
62		print_node_infos::<C>(self.config())
63	}
64
65	/// A helper function that runs a node with tokio and stops if the process receives the signal
66	/// `SIGTERM` or `SIGINT`.
67	pub fn run_node_until_exit<F, E>(
68		self,
69		initialize: impl FnOnce(Configuration) -> F,
70	) -> std::result::Result<(), E>
71	where
72		F: Future<Output = std::result::Result<TaskManager, E>>,
73		E: std::error::Error + Send + Sync + 'static + From<ServiceError>,
74	{
75		self.print_node_infos();
76
77		let mut task_manager = self.tokio_runtime.block_on(initialize(self.config))?;
78
79		let res = self
80			.tokio_runtime
81			.block_on(self.signals.run_until_signal(task_manager.future().fuse()));
82		// We need to drop the task manager here to inform all tasks that they should shut down.
83		//
84		// This is important to be done before we instruct the tokio runtime to shutdown. Otherwise
85		// the tokio runtime will wait the full 60 seconds for all tasks to stop.
86		let task_registry = task_manager.into_task_registry();
87
88		// Give all futures 60 seconds to shutdown, before tokio "leaks" them.
89		let shutdown_timeout = Duration::from_secs(60);
90		self.tokio_runtime.shutdown_timeout(shutdown_timeout);
91
92		let running_tasks = task_registry.running_tasks();
93
94		if !running_tasks.is_empty() {
95			log::error!("Detected running(potentially stalled) tasks on shutdown:");
96			running_tasks.iter().for_each(|(task, count)| {
97				let instances_desc =
98					if *count > 1 { format!("with {} instances ", count) } else { "".to_string() };
99
100				if task.is_default_group() {
101					log::error!(
102						"Task \"{}\" was still running {}after waiting {} seconds to finish.",
103						task.name,
104						instances_desc,
105						shutdown_timeout.as_secs(),
106					);
107				} else {
108					log::error!(
109						"Task \"{}\" (Group: {}) was still running {}after waiting {} seconds to finish.",
110						task.name,
111						task.group,
112						instances_desc,
113						shutdown_timeout.as_secs(),
114					);
115				}
116			});
117		}
118
119		res.map_err(Into::into)
120	}
121
122	/// A helper function that runs a command with the configuration of this node.
123	pub fn sync_run<E>(
124		self,
125		runner: impl FnOnce(Configuration) -> std::result::Result<(), E>,
126	) -> std::result::Result<(), E>
127	where
128		E: std::error::Error + Send + Sync + 'static + From<ServiceError>,
129	{
130		runner(self.config)
131	}
132
133	/// A helper function that runs a future with tokio and stops if the process receives
134	/// the signal `SIGTERM` or `SIGINT`.
135	pub fn async_run<F, E>(
136		self,
137		runner: impl FnOnce(Configuration) -> std::result::Result<(F, TaskManager), E>,
138	) -> std::result::Result<(), E>
139	where
140		F: Future<Output = std::result::Result<(), E>>,
141		E: std::error::Error + Send + Sync + 'static + From<ServiceError> + From<CliError>,
142	{
143		let (future, task_manager) = runner(self.config)?;
144		self.tokio_runtime.block_on(self.signals.run_until_signal(future.fuse()))?;
145		// Drop the task manager before dropping the rest, to ensure that all futures were informed
146		// about the shut down.
147		drop(task_manager);
148		Ok(())
149	}
150
151	/// Get an immutable reference to the node Configuration
152	pub fn config(&self) -> &Configuration {
153		&self.config
154	}
155
156	/// Get a mutable reference to the node Configuration
157	pub fn config_mut(&mut self) -> &mut Configuration {
158		&mut self.config
159	}
160}
161
162/// Log information about the node itself.
163pub fn print_node_infos<C: SubstrateCli>(config: &Configuration) {
164	info!("{}", C::impl_name());
165	info!("✌️  version {}", C::impl_version());
166	info!("❤️  by {}, {}-{}", C::author(), C::copyright_start_year(), Local::now().year());
167	info!("📋 Chain specification: {}", config.chain_spec.name());
168	info!("🏷  Node name: {}", config.network.node_name);
169	info!("👤 Role: {}", config.display_role());
170	info!(
171		"💾 Database: {} at {}",
172		config.database,
173		config
174			.database
175			.path()
176			.map_or_else(|| "<unknown>".to_owned(), |p| p.display().to_string())
177	);
178}
179
180#[cfg(test)]
181mod tests {
182	use super::*;
183	use soil_network::config::NetworkConfiguration;
184	use soil_service::{
185		config::{ExecutorConfiguration, RpcConfiguration},
186		Arc, ChainType, GenericChainSpec, NoExtension,
187	};
188	use std::{
189		path::PathBuf,
190		sync::atomic::{AtomicU64, Ordering},
191	};
192
193	struct Cli;
194
195	impl SubstrateCli for Cli {
196		fn author() -> String {
197			"test".into()
198		}
199
200		fn impl_name() -> String {
201			"yep".into()
202		}
203
204		fn impl_version() -> String {
205			"version".into()
206		}
207
208		fn description() -> String {
209			"desc".into()
210		}
211
212		fn support_url() -> String {
213			"no.pe".into()
214		}
215
216		fn copyright_start_year() -> i32 {
217			2042
218		}
219
220		fn load_spec(
221			&self,
222			_: &str,
223		) -> std::result::Result<Box<dyn soil_service::ChainSpec>, String> {
224			Err("nope".into())
225		}
226	}
227
228	fn create_runner() -> Runner<Cli> {
229		let runtime = build_runtime().unwrap();
230
231		let root = PathBuf::from("db");
232		let runner = Runner::new(
233			Configuration {
234				impl_name: "spec".into(),
235				impl_version: "3".into(),
236				role: soil_service::Role::Authority,
237				tokio_handle: runtime.handle().clone(),
238				transaction_pool: Default::default(),
239				network: NetworkConfiguration::new_memory(),
240				keystore: soil_service::config::KeystoreConfig::InMemory,
241				database: soil_client::db::DatabaseSource::ParityDb { path: root.clone() },
242				trie_cache_maximum_size: None,
243				warm_up_trie_cache: None,
244				state_pruning: None,
245				blocks_pruning: soil_client::db::BlocksPruning::KeepAll,
246				chain_spec: Box::new(
247					GenericChainSpec::<NoExtension, ()>::builder(
248						Default::default(),
249						NoExtension::None,
250					)
251					.with_name("test")
252					.with_id("test_id")
253					.with_chain_type(ChainType::Development)
254					.with_genesis_config_patch(Default::default())
255					.build(),
256				),
257				executor: ExecutorConfiguration::default(),
258				wasm_runtime_overrides: None,
259				rpc: RpcConfiguration {
260					addr: None,
261					max_connections: Default::default(),
262					cors: None,
263					methods: Default::default(),
264					max_request_size: Default::default(),
265					max_response_size: Default::default(),
266					id_provider: Default::default(),
267					max_subs_per_conn: Default::default(),
268					message_buffer_capacity: Default::default(),
269					port: 9944,
270					batch_config: soil_service::config::RpcBatchRequestConfig::Unlimited,
271					rate_limit: None,
272					rate_limit_whitelisted_ips: Default::default(),
273					rate_limit_trust_proxy_headers: Default::default(),
274					request_logger_limit: 1024,
275				},
276				prometheus_config: None,
277				telemetry_endpoints: None,
278				offchain_worker: Default::default(),
279				force_authoring: false,
280				disable_grandpa: false,
281				dev_key_seed: None,
282				tracing_targets: None,
283				tracing_receiver: Default::default(),
284				announce_block: true,
285				base_path: soil_service::BasePath::new(root.clone()),
286				data_path: root,
287			},
288			runtime,
289			Signals::dummy(),
290		)
291		.unwrap();
292
293		runner
294	}
295
296	#[test]
297	fn ensure_run_until_exit_informs_tasks_to_end() {
298		let runner = create_runner();
299
300		let counter = Arc::new(AtomicU64::new(0));
301		let counter2 = counter.clone();
302
303		runner
304			.run_node_until_exit(move |cfg| async move {
305				let task_manager = TaskManager::new(cfg.tokio_handle.clone(), None).unwrap();
306				let (sender, receiver) = futures::channel::oneshot::channel();
307
308				// We need to use `spawn_blocking` here so that we get a dedicated thread for our
309				// future. This is important for this test, as otherwise tokio can just "drop" the
310				// future.
311				task_manager.spawn_handle().spawn_blocking("test", None, async move {
312					let _ = sender.send(());
313					loop {
314						counter2.fetch_add(1, Ordering::Relaxed);
315						futures_timer::Delay::new(Duration::from_millis(50)).await;
316					}
317				});
318
319				task_manager.spawn_essential_handle().spawn_blocking("test2", None, async {
320					// Let's stop this essential task directly when our other task started.
321					// It will signal that the task manager should end.
322					let _ = receiver.await;
323				});
324
325				Ok::<_, soil_service::Error>(task_manager)
326			})
327			.unwrap_err();
328
329		let count = counter.load(Ordering::Relaxed);
330
331		// Ensure that our counting task was running for less than 30 seconds.
332		// It should be directly killed, but for CI and whatever we are being a little bit more
333		// "relaxed".
334		assert!((count as u128) < (Duration::from_secs(30).as_millis() / 50));
335	}
336
337	fn run_test_in_another_process(
338		test_name: &str,
339		test_body: impl FnOnce(),
340	) -> Option<std::process::Output> {
341		if std::env::var("RUN_FORKED_TEST").is_ok() {
342			test_body();
343			None
344		} else {
345			let output = std::process::Command::new(std::env::current_exe().unwrap())
346				.arg(test_name)
347				.env("RUN_FORKED_TEST", "1")
348				.output()
349				.unwrap();
350
351			assert!(output.status.success());
352			Some(output)
353		}
354	}
355
356	/// This test ensures that `run_node_until_exit` aborts waiting for "stuck" tasks after 60
357	/// seconds, aka doesn't wait until they are finished (which may never happen).
358	#[test]
359	fn ensure_run_until_exit_is_not_blocking_indefinitely() {
360		let output = run_test_in_another_process(
361			"ensure_run_until_exit_is_not_blocking_indefinitely",
362			|| {
363				subsoil::tracing::try_init_simple();
364
365				let runner = create_runner();
366
367				runner
368					.run_node_until_exit(move |cfg| async move {
369						let task_manager =
370							TaskManager::new(cfg.tokio_handle.clone(), None).unwrap();
371						let (sender, receiver) = futures::channel::oneshot::channel();
372
373						// We need to use `spawn_blocking` here so that we get a dedicated thread
374						// for our future. This future is more blocking code that will never end.
375						task_manager.spawn_handle().spawn_blocking("test", None, async move {
376							let _ = sender.send(());
377							loop {
378								std::thread::sleep(Duration::from_secs(30));
379							}
380						});
381
382						task_manager.spawn_essential_handle().spawn_blocking(
383							"test2",
384							None,
385							async {
386								// Let's stop this essential task directly when our other task
387								// started. It will signal that the task manager should end.
388								let _ = receiver.await;
389							},
390						);
391
392						Ok::<_, soil_service::Error>(task_manager)
393					})
394					.unwrap_err();
395			},
396		);
397
398		let Some(output) = output else { return };
399
400		let stderr = dbg!(String::from_utf8(output.stderr).unwrap());
401
402		assert!(
403			stderr.contains("Task \"test\" was still running after waiting 60 seconds to finish.")
404		);
405		assert!(!stderr
406			.contains("Task \"test2\" was still running after waiting 60 seconds to finish."));
407	}
408}