1use crate::{error::Error as CliError, Result, Signals, SubstrateCli};
20use chrono::prelude::*;
21use futures::{future::FutureExt, Future};
22use log::info;
23use sc_service::{Configuration, Error as ServiceError, TaskManager};
24use sc_utils::metrics::{TOKIO_THREADS_ALIVE, TOKIO_THREADS_TOTAL};
25use std::{marker::PhantomData, time::Duration};
26
27pub fn build_runtime() -> std::result::Result<tokio::runtime::Runtime, std::io::Error> {
29 tokio::runtime::Builder::new_multi_thread()
30 .on_thread_start(|| {
31 TOKIO_THREADS_ALIVE.inc();
32 TOKIO_THREADS_TOTAL.inc();
33 })
34 .on_thread_stop(|| {
35 TOKIO_THREADS_ALIVE.dec();
36 })
37 .enable_all()
38 .build()
39}
40
41pub struct Runner<C: SubstrateCli> {
43 config: Configuration,
44 tokio_runtime: tokio::runtime::Runtime,
45 signals: Signals,
46 phantom: PhantomData<C>,
47}
48
49impl<C: SubstrateCli> Runner<C> {
50 pub fn new(
52 config: Configuration,
53 tokio_runtime: tokio::runtime::Runtime,
54 signals: Signals,
55 ) -> Result<Runner<C>> {
56 Ok(Runner { config, tokio_runtime, signals, phantom: PhantomData })
57 }
58
59 fn print_node_infos(&self) {
74 print_node_infos::<C>(self.config())
75 }
76
77 pub fn run_node_until_exit<F, E>(
80 self,
81 initialize: impl FnOnce(Configuration) -> F,
82 ) -> std::result::Result<(), E>
83 where
84 F: Future<Output = std::result::Result<TaskManager, E>>,
85 E: std::error::Error + Send + Sync + 'static + From<ServiceError>,
86 {
87 self.print_node_infos();
88
89 let mut task_manager = self.tokio_runtime.block_on(initialize(self.config))?;
90
91 let res = self
92 .tokio_runtime
93 .block_on(self.signals.run_until_signal(task_manager.future().fuse()));
94 let task_registry = task_manager.into_task_registry();
99
100 let shutdown_timeout = Duration::from_secs(60);
102 self.tokio_runtime.shutdown_timeout(shutdown_timeout);
103
104 let running_tasks = task_registry.running_tasks();
105
106 if !running_tasks.is_empty() {
107 log::error!("Detected running(potentially stalled) tasks on shutdown:");
108 running_tasks.iter().for_each(|(task, count)| {
109 let instances_desc =
110 if *count > 1 { format!("with {} instances ", count) } else { "".to_string() };
111
112 if task.is_default_group() {
113 log::error!(
114 "Task \"{}\" was still running {}after waiting {} seconds to finish.",
115 task.name,
116 instances_desc,
117 shutdown_timeout.as_secs(),
118 );
119 } else {
120 log::error!(
121 "Task \"{}\" (Group: {}) was still running {}after waiting {} seconds to finish.",
122 task.name,
123 task.group,
124 instances_desc,
125 shutdown_timeout.as_secs(),
126 );
127 }
128 });
129 }
130
131 res.map_err(Into::into)
132 }
133
134 pub fn sync_run<E>(
136 self,
137 runner: impl FnOnce(Configuration) -> std::result::Result<(), E>,
138 ) -> std::result::Result<(), E>
139 where
140 E: std::error::Error + Send + Sync + 'static + From<ServiceError>,
141 {
142 runner(self.config)
143 }
144
145 pub fn async_run<F, E>(
148 self,
149 runner: impl FnOnce(Configuration) -> std::result::Result<(F, TaskManager), E>,
150 ) -> std::result::Result<(), E>
151 where
152 F: Future<Output = std::result::Result<(), E>>,
153 E: std::error::Error + Send + Sync + 'static + From<ServiceError> + From<CliError>,
154 {
155 let (future, task_manager) = runner(self.config)?;
156 self.tokio_runtime.block_on(self.signals.run_until_signal(future.fuse()))?;
157 drop(task_manager);
160 Ok(())
161 }
162
163 pub fn config(&self) -> &Configuration {
165 &self.config
166 }
167
168 pub fn config_mut(&mut self) -> &mut Configuration {
170 &mut self.config
171 }
172}
173
174pub fn print_node_infos<C: SubstrateCli>(config: &Configuration) {
176 info!("{}", C::impl_name());
177 info!("✌️ version {}", C::impl_version());
178 info!("❤️ by {}, {}-{}", C::author(), C::copyright_start_year(), Local::now().year());
179 info!("📋 Chain specification: {}", config.chain_spec.name());
180 info!("🏷 Node name: {}", config.network.node_name);
181 info!("👤 Role: {}", config.display_role());
182 info!(
183 "💾 Database: {} at {}",
184 config.database,
185 config
186 .database
187 .path()
188 .map_or_else(|| "<unknown>".to_owned(), |p| p.display().to_string())
189 );
190}
191
#[cfg(test)]
mod tests {
    use super::*;
    use sc_network::config::NetworkConfiguration;
    use sc_service::{
        config::{ExecutorConfiguration, RpcConfiguration},
        Arc, ChainType, GenericChainSpec, NoExtension,
    };
    use std::{
        path::PathBuf,
        sync::atomic::{AtomicU64, Ordering},
    };

    /// Minimal CLI implementation that only exists to parameterize [`Runner`].
    struct Cli;

    impl SubstrateCli for Cli {
        fn author() -> String {
            String::from("test")
        }

        fn impl_name() -> String {
            String::from("yep")
        }

        fn impl_version() -> String {
            String::from("version")
        }

        fn description() -> String {
            String::from("desc")
        }

        fn support_url() -> String {
            String::from("no.pe")
        }

        fn copyright_start_year() -> i32 {
            2042
        }

        fn load_spec(
            &self,
            _: &str,
        ) -> std::result::Result<Box<dyn sc_service::ChainSpec>, String> {
            Err(String::from("nope"))
        }
    }

    /// Creates a [`Runner`] backed by a fresh tokio runtime, dummy signals and an
    /// in-memory style test configuration.
    fn create_runner() -> Runner<Cli> {
        let runtime = build_runtime().unwrap();
        let root = PathBuf::from("db");

        let chain_spec =
            GenericChainSpec::<NoExtension, ()>::builder(Default::default(), NoExtension::None)
                .with_name("test")
                .with_id("test_id")
                .with_chain_type(ChainType::Development)
                .with_genesis_config_patch(Default::default())
                .build();

        let rpc = RpcConfiguration {
            addr: None,
            max_connections: Default::default(),
            cors: None,
            methods: Default::default(),
            max_request_size: Default::default(),
            max_response_size: Default::default(),
            id_provider: Default::default(),
            max_subs_per_conn: Default::default(),
            message_buffer_capacity: Default::default(),
            port: 9944,
            batch_config: sc_service::config::RpcBatchRequestConfig::Unlimited,
            rate_limit: None,
            rate_limit_whitelisted_ips: Default::default(),
            rate_limit_trust_proxy_headers: Default::default(),
            request_logger_limit: 1024,
        };

        let config = Configuration {
            impl_name: "spec".into(),
            impl_version: "3".into(),
            role: sc_service::Role::Authority,
            tokio_handle: runtime.handle().clone(),
            transaction_pool: Default::default(),
            network: NetworkConfiguration::new_memory(),
            keystore: sc_service::config::KeystoreConfig::InMemory,
            database: sc_client_db::DatabaseSource::ParityDb { path: root.clone() },
            trie_cache_maximum_size: None,
            warm_up_trie_cache: None,
            state_pruning: None,
            blocks_pruning: sc_client_db::BlocksPruning::KeepAll,
            chain_spec: Box::new(chain_spec),
            executor: ExecutorConfiguration::default(),
            wasm_runtime_overrides: None,
            rpc,
            prometheus_config: None,
            telemetry_endpoints: None,
            offchain_worker: Default::default(),
            force_authoring: false,
            disable_grandpa: false,
            dev_key_seed: None,
            tracing_targets: None,
            tracing_receiver: Default::default(),
            announce_block: true,
            base_path: sc_service::BasePath::new(root.clone()),
            data_path: root,
        };

        Runner::new(config, runtime, Signals::dummy()).unwrap()
    }

    /// A cooperative (awaiting) task must not keep `run_node_until_exit` busy for
    /// anything close to the shutdown timeout.
    #[test]
    fn ensure_run_until_exit_informs_tasks_to_end() {
        let runner = create_runner();

        let counter = Arc::new(AtomicU64::new(0));
        let task_counter = Arc::clone(&counter);

        runner
            .run_node_until_exit(move |cfg| async move {
                let task_manager = TaskManager::new(cfg.tokio_handle.clone(), None).unwrap();
                let (started_tx, started_rx) = futures::channel::oneshot::channel();

                // Announces that it started, then ticks every 50 ms forever.
                let spawner = task_manager.spawn_handle();
                spawner.spawn_blocking("test", None, async move {
                    let _ = started_tx.send(());
                    loop {
                        task_counter.fetch_add(1, Ordering::Relaxed);
                        futures_timer::Delay::new(Duration::from_millis(50)).await;
                    }
                });

                // Essential task that finishes as soon as the task above started.
                let essential_spawner = task_manager.spawn_essential_handle();
                essential_spawner.spawn_blocking("test2", None, async {
                    let _ = started_rx.await;
                });

                Ok::<_, sc_service::Error>(task_manager)
            })
            .unwrap_err();

        let ticks = counter.load(Ordering::Relaxed);

        // Fewer ticks than 30 seconds worth of 50 ms intervals must have happened.
        let max_ticks = Duration::from_secs(30).as_millis() / 50;
        assert!(u128::from(ticks) < max_ticks);
    }

    /// Runs `test_body` in a child process of the current test executable.
    ///
    /// When the `RUN_FORKED_TEST` environment variable is set, this already is the
    /// child: `test_body` is executed directly and `None` is returned. Otherwise
    /// the current executable is started with `test_name` as argument and
    /// `RUN_FORKED_TEST=1`; the child must exit successfully and its captured
    /// output is returned.
    fn run_test_in_another_process(
        test_name: &str,
        test_body: impl FnOnce(),
    ) -> Option<std::process::Output> {
        if std::env::var("RUN_FORKED_TEST").is_ok() {
            test_body();
            return None;
        }

        let current_exe = std::env::current_exe().unwrap();
        let output = std::process::Command::new(current_exe)
            .arg(test_name)
            .env("RUN_FORKED_TEST", "1")
            .output()
            .unwrap();

        assert!(output.status.success());
        Some(output)
    }

    /// A task that blocks its thread must not block `run_node_until_exit`
    /// forever; it has to be reported as still running instead.
    #[test]
    fn ensure_run_until_exit_is_not_blocking_indefinitely() {
        /// Body executed in the forked process: runs a node whose non-essential
        /// task sleeps on its thread in an endless loop.
        fn run_node_with_stalled_task() {
            sp_tracing::try_init_simple();

            let runner = create_runner();

            runner
                .run_node_until_exit(move |cfg| async move {
                    let task_manager = TaskManager::new(cfg.tokio_handle.clone(), None).unwrap();
                    let (started_tx, started_rx) = futures::channel::oneshot::channel();

                    // Announces that it started, then never yields again.
                    let spawner = task_manager.spawn_handle();
                    spawner.spawn_blocking("test", None, async move {
                        let _ = started_tx.send(());
                        loop {
                            std::thread::sleep(Duration::from_secs(30));
                        }
                    });

                    // Essential task that finishes as soon as the task above started.
                    let essential_spawner = task_manager.spawn_essential_handle();
                    essential_spawner.spawn_blocking("test2", None, async {
                        let _ = started_rx.await;
                    });

                    Ok::<_, sc_service::Error>(task_manager)
                })
                .unwrap_err();
        }

        let forked = run_test_in_another_process(
            "ensure_run_until_exit_is_not_blocking_indefinitely",
            run_node_with_stalled_task,
        );

        // `None` means this process is the fork itself; nothing left to check.
        let output = match forked {
            Some(output) => output,
            None => return,
        };

        let stderr = dbg!(String::from_utf8(output.stderr).unwrap());

        let stalled_report = "Task \"test\" was still running after waiting 60 seconds to finish.";
        let finished_report = "Task \"test2\" was still running after waiting 60 seconds to finish.";

        assert!(stderr.contains(stalled_report));
        assert!(!stderr.contains(finished_report));
    }
}