1use crate::{error::Error as CliError, Result, Signals, SubstrateCli};
20use chrono::prelude::*;
21use futures::{future::FutureExt, Future};
22use log::info;
23use sc_service::{Configuration, Error as ServiceError, TaskManager};
24use sc_utils::metrics::{TOKIO_THREADS_ALIVE, TOKIO_THREADS_TOTAL};
25use std::{marker::PhantomData, time::Duration};
26
27pub fn build_runtime() -> std::result::Result<tokio::runtime::Runtime, std::io::Error> {
29 tokio::runtime::Builder::new_multi_thread()
30 .on_thread_start(|| {
31 TOKIO_THREADS_ALIVE.inc();
32 TOKIO_THREADS_TOTAL.inc();
33 })
34 .on_thread_stop(|| {
35 TOKIO_THREADS_ALIVE.dec();
36 })
37 .enable_all()
38 .build()
39}
40
41pub struct Runner<C: SubstrateCli> {
43 config: Configuration,
44 tokio_runtime: tokio::runtime::Runtime,
45 signals: Signals,
46 phantom: PhantomData<C>,
47}
48
49impl<C: SubstrateCli> Runner<C> {
50 pub fn new(
52 config: Configuration,
53 tokio_runtime: tokio::runtime::Runtime,
54 signals: Signals,
55 ) -> Result<Runner<C>> {
56 Ok(Runner { config, tokio_runtime, signals, phantom: PhantomData })
57 }
58
59 fn print_node_infos(&self) {
74 print_node_infos::<C>(self.config())
75 }
76
77 pub fn run_node_until_exit<F, E>(
80 self,
81 initialize: impl FnOnce(Configuration) -> F,
82 ) -> std::result::Result<(), E>
83 where
84 F: Future<Output = std::result::Result<TaskManager, E>>,
85 E: std::error::Error + Send + Sync + 'static + From<ServiceError>,
86 {
87 self.print_node_infos();
88
89 let mut task_manager = self.tokio_runtime.block_on(initialize(self.config))?;
90
91 let res = self
92 .tokio_runtime
93 .block_on(self.signals.run_until_signal(task_manager.future().fuse()));
94 let task_registry = task_manager.into_task_registry();
99
100 let shutdown_timeout = Duration::from_secs(60);
102 self.tokio_runtime.shutdown_timeout(shutdown_timeout);
103
104 let running_tasks = task_registry.running_tasks();
105
106 if !running_tasks.is_empty() {
107 log::error!("Detected running(potentially stalled) tasks on shutdown:");
108 running_tasks.iter().for_each(|(task, count)| {
109 let instances_desc =
110 if *count > 1 { format!("with {} instances ", count) } else { "".to_string() };
111
112 if task.is_default_group() {
113 log::error!(
114 "Task \"{}\" was still running {}after waiting {} seconds to finish.",
115 task.name,
116 instances_desc,
117 shutdown_timeout.as_secs(),
118 );
119 } else {
120 log::error!(
121 "Task \"{}\" (Group: {}) was still running {}after waiting {} seconds to finish.",
122 task.name,
123 task.group,
124 instances_desc,
125 shutdown_timeout.as_secs(),
126 );
127 }
128 });
129 }
130
131 res.map_err(Into::into)
132 }
133
134 pub fn sync_run<E>(
136 self,
137 runner: impl FnOnce(Configuration) -> std::result::Result<(), E>,
138 ) -> std::result::Result<(), E>
139 where
140 E: std::error::Error + Send + Sync + 'static + From<ServiceError>,
141 {
142 runner(self.config)
143 }
144
145 pub fn async_run<F, E>(
148 self,
149 runner: impl FnOnce(Configuration) -> std::result::Result<(F, TaskManager), E>,
150 ) -> std::result::Result<(), E>
151 where
152 F: Future<Output = std::result::Result<(), E>>,
153 E: std::error::Error + Send + Sync + 'static + From<ServiceError> + From<CliError>,
154 {
155 let (future, task_manager) = runner(self.config)?;
156 self.tokio_runtime.block_on(self.signals.run_until_signal(future.fuse()))?;
157 drop(task_manager);
160 Ok(())
161 }
162
163 pub fn config(&self) -> &Configuration {
165 &self.config
166 }
167
168 pub fn config_mut(&mut self) -> &mut Configuration {
170 &mut self.config
171 }
172}
173
174pub fn print_node_infos<C: SubstrateCli>(config: &Configuration) {
176 info!("{}", C::impl_name());
177 info!("✌️ version {}", C::impl_version());
178 info!("❤️ by {}, {}-{}", C::author(), C::copyright_start_year(), Local::now().year());
179 info!("📋 Chain specification: {}", config.chain_spec.name());
180 info!("🏷 Node name: {}", config.network.node_name);
181 info!("👤 Role: {}", config.display_role());
182 info!(
183 "💾 Database: {} at {}",
184 config.database,
185 config
186 .database
187 .path()
188 .map_or_else(|| "<unknown>".to_owned(), |p| p.display().to_string())
189 );
190}
191
192#[cfg(test)]
193mod tests {
194 use super::*;
195 use sc_network::config::NetworkConfiguration;
196 use sc_service::{
197 config::{ExecutorConfiguration, RpcConfiguration},
198 Arc, ChainType, GenericChainSpec, NoExtension,
199 };
200 use std::{
201 path::PathBuf,
202 sync::atomic::{AtomicU64, Ordering},
203 };
204
205 struct Cli;
206
207 impl SubstrateCli for Cli {
208 fn author() -> String {
209 "test".into()
210 }
211
212 fn impl_name() -> String {
213 "yep".into()
214 }
215
216 fn impl_version() -> String {
217 "version".into()
218 }
219
220 fn description() -> String {
221 "desc".into()
222 }
223
224 fn support_url() -> String {
225 "no.pe".into()
226 }
227
228 fn copyright_start_year() -> i32 {
229 2042
230 }
231
232 fn load_spec(
233 &self,
234 _: &str,
235 ) -> std::result::Result<Box<dyn sc_service::ChainSpec>, String> {
236 Err("nope".into())
237 }
238 }
239
240 fn create_runner() -> Runner<Cli> {
241 let runtime = build_runtime().unwrap();
242
243 let root = PathBuf::from("db");
244 let runner = Runner::new(
245 Configuration {
246 impl_name: "spec".into(),
247 impl_version: "3".into(),
248 role: sc_service::Role::Authority,
249 tokio_handle: runtime.handle().clone(),
250 transaction_pool: Default::default(),
251 network: NetworkConfiguration::new_memory(),
252 keystore: sc_service::config::KeystoreConfig::InMemory,
253 database: sc_client_db::DatabaseSource::ParityDb { path: root.clone() },
254 trie_cache_maximum_size: None,
255 state_pruning: None,
256 blocks_pruning: sc_client_db::BlocksPruning::KeepAll,
257 chain_spec: Box::new(
258 GenericChainSpec::<NoExtension, ()>::builder(
259 Default::default(),
260 NoExtension::None,
261 )
262 .with_name("test")
263 .with_id("test_id")
264 .with_chain_type(ChainType::Development)
265 .with_genesis_config_patch(Default::default())
266 .build(),
267 ),
268 executor: ExecutorConfiguration::default(),
269 wasm_runtime_overrides: None,
270 rpc: RpcConfiguration {
271 addr: None,
272 max_connections: Default::default(),
273 cors: None,
274 methods: Default::default(),
275 max_request_size: Default::default(),
276 max_response_size: Default::default(),
277 id_provider: Default::default(),
278 max_subs_per_conn: Default::default(),
279 message_buffer_capacity: Default::default(),
280 port: 9944,
281 batch_config: sc_service::config::RpcBatchRequestConfig::Unlimited,
282 rate_limit: None,
283 rate_limit_whitelisted_ips: Default::default(),
284 rate_limit_trust_proxy_headers: Default::default(),
285 },
286 prometheus_config: None,
287 telemetry_endpoints: None,
288 offchain_worker: Default::default(),
289 force_authoring: false,
290 disable_grandpa: false,
291 dev_key_seed: None,
292 tracing_targets: None,
293 tracing_receiver: Default::default(),
294 announce_block: true,
295 base_path: sc_service::BasePath::new(root.clone()),
296 data_path: root,
297 },
298 runtime,
299 Signals::dummy(),
300 )
301 .unwrap();
302
303 runner
304 }
305
306 #[test]
307 fn ensure_run_until_exit_informs_tasks_to_end() {
308 let runner = create_runner();
309
310 let counter = Arc::new(AtomicU64::new(0));
311 let counter2 = counter.clone();
312
313 runner
314 .run_node_until_exit(move |cfg| async move {
315 let task_manager = TaskManager::new(cfg.tokio_handle.clone(), None).unwrap();
316 let (sender, receiver) = futures::channel::oneshot::channel();
317
318 task_manager.spawn_handle().spawn_blocking("test", None, async move {
322 let _ = sender.send(());
323 loop {
324 counter2.fetch_add(1, Ordering::Relaxed);
325 futures_timer::Delay::new(Duration::from_millis(50)).await;
326 }
327 });
328
329 task_manager.spawn_essential_handle().spawn_blocking("test2", None, async {
330 let _ = receiver.await;
333 });
334
335 Ok::<_, sc_service::Error>(task_manager)
336 })
337 .unwrap_err();
338
339 let count = counter.load(Ordering::Relaxed);
340
341 assert!((count as u128) < (Duration::from_secs(30).as_millis() / 50));
345 }
346
347 fn run_test_in_another_process(
348 test_name: &str,
349 test_body: impl FnOnce(),
350 ) -> Option<std::process::Output> {
351 if std::env::var("RUN_FORKED_TEST").is_ok() {
352 test_body();
353 None
354 } else {
355 let output = std::process::Command::new(std::env::current_exe().unwrap())
356 .arg(test_name)
357 .env("RUN_FORKED_TEST", "1")
358 .output()
359 .unwrap();
360
361 assert!(output.status.success());
362 Some(output)
363 }
364 }
365
366 #[test]
369 fn ensure_run_until_exit_is_not_blocking_indefinitely() {
370 let output = run_test_in_another_process(
371 "ensure_run_until_exit_is_not_blocking_indefinitely",
372 || {
373 sp_tracing::try_init_simple();
374
375 let runner = create_runner();
376
377 runner
378 .run_node_until_exit(move |cfg| async move {
379 let task_manager =
380 TaskManager::new(cfg.tokio_handle.clone(), None).unwrap();
381 let (sender, receiver) = futures::channel::oneshot::channel();
382
383 task_manager.spawn_handle().spawn_blocking("test", None, async move {
386 let _ = sender.send(());
387 loop {
388 std::thread::sleep(Duration::from_secs(30));
389 }
390 });
391
392 task_manager.spawn_essential_handle().spawn_blocking(
393 "test2",
394 None,
395 async {
396 let _ = receiver.await;
399 },
400 );
401
402 Ok::<_, sc_service::Error>(task_manager)
403 })
404 .unwrap_err();
405 },
406 );
407
408 let Some(output) = output else { return };
409
410 let stderr = dbg!(String::from_utf8(output.stderr).unwrap());
411
412 assert!(
413 stderr.contains("Task \"test\" was still running after waiting 60 seconds to finish.")
414 );
415 assert!(!stderr
416 .contains("Task \"test2\" was still running after waiting 60 seconds to finish."));
417 }
418}