1use crate::{error::Error as CliError, Result, Signals, SubstrateCli};
8use chrono::prelude::*;
9use futures::{future::FutureExt, Future};
10use log::info;
11use soil_client::utils::metrics::{TOKIO_THREADS_ALIVE, TOKIO_THREADS_TOTAL};
12use soil_service::{Configuration, Error as ServiceError, TaskManager};
13use std::{marker::PhantomData, time::Duration};
14
15pub fn build_runtime() -> std::result::Result<tokio::runtime::Runtime, std::io::Error> {
17 tokio::runtime::Builder::new_multi_thread()
18 .on_thread_start(|| {
19 TOKIO_THREADS_ALIVE.inc();
20 TOKIO_THREADS_TOTAL.inc();
21 })
22 .on_thread_stop(|| {
23 TOKIO_THREADS_ALIVE.dec();
24 })
25 .enable_all()
26 .build()
27}
28
29pub struct Runner<C: SubstrateCli> {
31 config: Configuration,
32 tokio_runtime: tokio::runtime::Runtime,
33 signals: Signals,
34 phantom: PhantomData<C>,
35}
36
37impl<C: SubstrateCli> Runner<C> {
38 pub fn new(
40 config: Configuration,
41 tokio_runtime: tokio::runtime::Runtime,
42 signals: Signals,
43 ) -> Result<Runner<C>> {
44 Ok(Runner { config, tokio_runtime, signals, phantom: PhantomData })
45 }
46
47 fn print_node_infos(&self) {
62 print_node_infos::<C>(self.config())
63 }
64
65 pub fn run_node_until_exit<F, E>(
68 self,
69 initialize: impl FnOnce(Configuration) -> F,
70 ) -> std::result::Result<(), E>
71 where
72 F: Future<Output = std::result::Result<TaskManager, E>>,
73 E: std::error::Error + Send + Sync + 'static + From<ServiceError>,
74 {
75 self.print_node_infos();
76
77 let mut task_manager = self.tokio_runtime.block_on(initialize(self.config))?;
78
79 let res = self
80 .tokio_runtime
81 .block_on(self.signals.run_until_signal(task_manager.future().fuse()));
82 let task_registry = task_manager.into_task_registry();
87
88 let shutdown_timeout = Duration::from_secs(60);
90 self.tokio_runtime.shutdown_timeout(shutdown_timeout);
91
92 let running_tasks = task_registry.running_tasks();
93
94 if !running_tasks.is_empty() {
95 log::error!("Detected running(potentially stalled) tasks on shutdown:");
96 running_tasks.iter().for_each(|(task, count)| {
97 let instances_desc =
98 if *count > 1 { format!("with {} instances ", count) } else { "".to_string() };
99
100 if task.is_default_group() {
101 log::error!(
102 "Task \"{}\" was still running {}after waiting {} seconds to finish.",
103 task.name,
104 instances_desc,
105 shutdown_timeout.as_secs(),
106 );
107 } else {
108 log::error!(
109 "Task \"{}\" (Group: {}) was still running {}after waiting {} seconds to finish.",
110 task.name,
111 task.group,
112 instances_desc,
113 shutdown_timeout.as_secs(),
114 );
115 }
116 });
117 }
118
119 res.map_err(Into::into)
120 }
121
122 pub fn sync_run<E>(
124 self,
125 runner: impl FnOnce(Configuration) -> std::result::Result<(), E>,
126 ) -> std::result::Result<(), E>
127 where
128 E: std::error::Error + Send + Sync + 'static + From<ServiceError>,
129 {
130 runner(self.config)
131 }
132
133 pub fn async_run<F, E>(
136 self,
137 runner: impl FnOnce(Configuration) -> std::result::Result<(F, TaskManager), E>,
138 ) -> std::result::Result<(), E>
139 where
140 F: Future<Output = std::result::Result<(), E>>,
141 E: std::error::Error + Send + Sync + 'static + From<ServiceError> + From<CliError>,
142 {
143 let (future, task_manager) = runner(self.config)?;
144 self.tokio_runtime.block_on(self.signals.run_until_signal(future.fuse()))?;
145 drop(task_manager);
148 Ok(())
149 }
150
151 pub fn config(&self) -> &Configuration {
153 &self.config
154 }
155
156 pub fn config_mut(&mut self) -> &mut Configuration {
158 &mut self.config
159 }
160}
161
162pub fn print_node_infos<C: SubstrateCli>(config: &Configuration) {
164 info!("{}", C::impl_name());
165 info!("✌️ version {}", C::impl_version());
166 info!("❤️ by {}, {}-{}", C::author(), C::copyright_start_year(), Local::now().year());
167 info!("📋 Chain specification: {}", config.chain_spec.name());
168 info!("🏷 Node name: {}", config.network.node_name);
169 info!("👤 Role: {}", config.display_role());
170 info!(
171 "💾 Database: {} at {}",
172 config.database,
173 config
174 .database
175 .path()
176 .map_or_else(|| "<unknown>".to_owned(), |p| p.display().to_string())
177 );
178}
179
#[cfg(test)]
mod tests {
    use super::*;
    use soil_network::config::NetworkConfiguration;
    use soil_service::{
        config::{ExecutorConfiguration, RpcConfiguration},
        Arc, ChainType, GenericChainSpec, NoExtension,
    };
    use std::{
        path::PathBuf,
        sync::atomic::{AtomicU64, Ordering},
    };

    /// Minimal `SubstrateCli` implementation used only to instantiate `Runner`.
    struct Cli;

    impl SubstrateCli for Cli {
        fn author() -> String {
            String::from("test")
        }

        fn impl_name() -> String {
            String::from("yep")
        }

        fn impl_version() -> String {
            String::from("version")
        }

        fn description() -> String {
            String::from("desc")
        }

        fn support_url() -> String {
            String::from("no.pe")
        }

        fn copyright_start_year() -> i32 {
            2042
        }

        /// Chain specs are never loaded through this test CLI.
        fn load_spec(
            &self,
            _: &str,
        ) -> std::result::Result<Box<dyn soil_service::ChainSpec>, String> {
            Err(String::from("nope"))
        }
    }

    /// Builds a `Runner<Cli>` with a fresh runtime, an in-memory network and
    /// keystore, a development chain spec, and dummy signals.
    fn create_runner() -> Runner<Cli> {
        let runtime = build_runtime().unwrap();
        let root = PathBuf::from("db");

        let chain_spec: Box<dyn soil_service::ChainSpec> = Box::new(
            GenericChainSpec::<NoExtension, ()>::builder(Default::default(), NoExtension::None)
                .with_name("test")
                .with_id("test_id")
                .with_chain_type(ChainType::Development)
                .with_genesis_config_patch(Default::default())
                .build(),
        );

        let rpc = RpcConfiguration {
            addr: None,
            max_connections: Default::default(),
            cors: None,
            methods: Default::default(),
            max_request_size: Default::default(),
            max_response_size: Default::default(),
            id_provider: Default::default(),
            max_subs_per_conn: Default::default(),
            message_buffer_capacity: Default::default(),
            port: 9944,
            batch_config: soil_service::config::RpcBatchRequestConfig::Unlimited,
            rate_limit: None,
            rate_limit_whitelisted_ips: Default::default(),
            rate_limit_trust_proxy_headers: Default::default(),
            request_logger_limit: 1024,
        };

        let config = Configuration {
            impl_name: "spec".into(),
            impl_version: "3".into(),
            role: soil_service::Role::Authority,
            tokio_handle: runtime.handle().clone(),
            transaction_pool: Default::default(),
            network: NetworkConfiguration::new_memory(),
            keystore: soil_service::config::KeystoreConfig::InMemory,
            database: soil_client::db::DatabaseSource::ParityDb { path: root.clone() },
            trie_cache_maximum_size: None,
            warm_up_trie_cache: None,
            state_pruning: None,
            blocks_pruning: soil_client::db::BlocksPruning::KeepAll,
            chain_spec,
            executor: ExecutorConfiguration::default(),
            wasm_runtime_overrides: None,
            rpc,
            prometheus_config: None,
            telemetry_endpoints: None,
            offchain_worker: Default::default(),
            force_authoring: false,
            disable_grandpa: false,
            dev_key_seed: None,
            tracing_targets: None,
            tracing_receiver: Default::default(),
            announce_block: true,
            base_path: soil_service::BasePath::new(root.clone()),
            data_path: root,
        };

        Runner::new(config, runtime, Signals::dummy()).unwrap()
    }

    /// Checks that a task looping every 50ms does not keep counting for the
    /// equivalent of 30 seconds once `run_node_until_exit` has returned.
    #[test]
    fn ensure_run_until_exit_informs_tasks_to_end() {
        let runner = create_runner();

        let counter = Arc::new(AtomicU64::new(0));
        let task_counter = counter.clone();

        runner
            .run_node_until_exit(move |cfg| async move {
                let task_manager = TaskManager::new(cfg.tokio_handle.clone(), None).unwrap();
                let (sender, receiver) = futures::channel::oneshot::channel();

                // Signals once that it has started, then counts forever.
                task_manager.spawn_handle().spawn_blocking("test", None, async move {
                    let _ = sender.send(());
                    loop {
                        task_counter.fetch_add(1, Ordering::Relaxed);
                        futures_timer::Delay::new(Duration::from_millis(50)).await;
                    }
                });

                // Finishes as soon as the first task has signalled; presumably
                // the end of this essential task is what makes the run return
                // an error (see the `unwrap_err` below).
                task_manager.spawn_essential_handle().spawn_blocking("test2", None, async {
                    let _ = receiver.await;
                });

                Ok::<_, soil_service::Error>(task_manager)
            })
            .unwrap_err();

        let count = counter.load(Ordering::Relaxed);
        let iterations_in_30_secs = Duration::from_secs(30).as_millis() / 50;

        assert!((count as u128) < iterations_in_30_secs);
    }

    /// Runs `test_body` in a child process of the current test binary.
    ///
    /// When the `RUN_FORKED_TEST` environment variable is set, this *is* the
    /// child: the body runs in-process and `None` is returned. Otherwise the
    /// current executable is re-run with `test_name` as its argument and the
    /// variable set; the child must exit successfully and its captured output
    /// is returned.
    fn run_test_in_another_process(
        test_name: &str,
        test_body: impl FnOnce(),
    ) -> Option<std::process::Output> {
        if std::env::var("RUN_FORKED_TEST").is_ok() {
            test_body();
            return None;
        }

        let current_exe = std::env::current_exe().unwrap();
        let output = std::process::Command::new(current_exe)
            .arg(test_name)
            .env("RUN_FORKED_TEST", "1")
            .output()
            .unwrap();

        assert!(output.status.success());
        Some(output)
    }

    /// Checks that `run_node_until_exit` returns even though one task sleeps
    /// in 30 second steps forever, and that only this task is reported as
    /// still running after the 60 second shutdown timeout.
    #[test]
    fn ensure_run_until_exit_is_not_blocking_indefinitely() {
        let forked_body = || {
            subsoil::tracing::try_init_simple();

            let runner = create_runner();

            runner
                .run_node_until_exit(move |cfg| async move {
                    let task_manager = TaskManager::new(cfg.tokio_handle.clone(), None).unwrap();
                    let (sender, receiver) = futures::channel::oneshot::channel();

                    // Signals once, then blocks its thread over and over.
                    task_manager.spawn_handle().spawn_blocking("test", None, async move {
                        let _ = sender.send(());
                        loop {
                            std::thread::sleep(Duration::from_secs(30));
                        }
                    });

                    task_manager.spawn_essential_handle().spawn_blocking(
                        "test2",
                        None,
                        async {
                            let _ = receiver.await;
                        },
                    );

                    Ok::<_, soil_service::Error>(task_manager)
                })
                .unwrap_err();
        };

        let output = run_test_in_another_process(
            "ensure_run_until_exit_is_not_blocking_indefinitely",
            forked_body,
        );

        // `None` means we are the forked child; the assertions belong to the parent.
        let output = match output {
            Some(output) => output,
            None => return,
        };

        let stderr = dbg!(String::from_utf8(output.stderr).unwrap());

        let stalled_report = "Task \"test\" was still running after waiting 60 seconds to finish.";
        let finished_report = "Task \"test2\" was still running after waiting 60 seconds to finish.";

        assert!(stderr.contains(stalled_report));
        assert!(!stderr.contains(finished_report));
    }
}