Skip to main content

sc_service/
builder.rs

1// This file is part of Substrate.
2
3// Copyright (C) Parity Technologies (UK) Ltd.
4// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
5
6// This program is free software: you can redistribute it and/or modify
7// it under the terms of the GNU General Public License as published by
8// the Free Software Foundation, either version 3 of the License, or
9// (at your option) any later version.
10
11// This program is distributed in the hope that it will be useful,
12// but WITHOUT ANY WARRANTY; without even the implied warranty of
13// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14// GNU General Public License for more details.
15
16// You should have received a copy of the GNU General Public License
17// along with this program. If not, see <https://www.gnu.org/licenses/>.
18
19use crate::{
20	build_network_future, build_system_rpc_future,
21	client::{Client, ClientConfig},
22	config::{Configuration, ExecutorConfiguration, KeystoreConfig, Multiaddr, PrometheusConfig},
23	error::Error,
24	metrics::MetricsService,
25	start_rpc_servers, BuildGenesisBlock, GenesisBlockBuilder, RpcHandlers,
26	SpawnEssentialTaskHandle, SpawnTaskHandle, TaskManager, TransactionPoolAdapter,
27};
28use futures::{select, FutureExt, StreamExt};
29use jsonrpsee::RpcModule;
30use log::{debug, error, info};
31use prometheus_endpoint::Registry;
32use sc_chain_spec::{get_extension, ChainSpec};
33use sc_client_api::{
34	execution_extensions::ExecutionExtensions, proof_provider::ProofProvider, BadBlocks,
35	BlockBackend, BlockchainEvents, ExecutorProvider, ForkBlocks, KeysIter, StorageProvider,
36	TrieCacheContext, UsageProvider,
37};
38use sc_client_db::{Backend, BlocksPruning, DatabaseSettings, PruningMode};
39use sc_consensus::import_queue::{ImportQueue, ImportQueueService};
40use sc_executor::{
41	sp_wasm_interface::HostFunctions, HeapAllocStrategy, NativeExecutionDispatch, RuntimeVersionOf,
42	WasmExecutor, DEFAULT_HEAP_ALLOC_STRATEGY,
43};
44use sc_keystore::LocalKeystore;
45use sc_network::{
46	config::{FullNetworkConfiguration, IpfsConfig, ProtocolId, SyncMode},
47	multiaddr::Protocol,
48	service::{
49		traits::{PeerStore, RequestResponseConfig},
50		NotificationMetrics,
51	},
52	IpfsIndexedTransactions, NetworkBackend, NetworkStateInfo,
53};
54use sc_network_common::role::{Role, Roles};
55use sc_network_light::light_client_requests::handler::LightClientRequestHandler;
56use sc_network_sync::{
57	block_relay_protocol::{BlockDownloader, BlockRelayParams},
58	block_request_handler::BlockRequestHandler,
59	engine::SyncingEngine,
60	service::network::{NetworkServiceHandle, NetworkServiceProvider},
61	state_request_handler::StateRequestHandler,
62	strategy::{
63		polkadot::{PolkadotSyncingStrategy, PolkadotSyncingStrategyConfig},
64		SyncingStrategy,
65	},
66	warp_request_handler::RequestHandler as WarpSyncRequestHandler,
67	SyncingService, WarpSyncConfig,
68};
69use sc_rpc::{
70	author::AuthorApiServer,
71	chain::ChainApiServer,
72	offchain::OffchainApiServer,
73	state::{ChildStateApiServer, StateApiServer},
74	system::SystemApiServer,
75	DenyUnsafe, SubscriptionTaskExecutor,
76};
77use sc_rpc_spec_v2::{
78	archive::ArchiveApiServer,
79	bitswap::BitswapApiServer,
80	chain_head::ChainHeadApiServer,
81	chain_spec::ChainSpecApiServer,
82	transaction::{TransactionApiServer, TransactionBroadcastApiServer},
83};
84use sc_telemetry::{telemetry, ConnectionMessage, Telemetry, TelemetryHandle, SUBSTRATE_INFO};
85use sc_tracing::block::TracingExecuteBlock;
86use sc_transaction_pool_api::{MaintainedTransactionPool, TransactionPool};
87use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedSender};
88use sp_api::{CallApiAt, ProvideRuntimeApi};
89use sp_blockchain::{HeaderBackend, HeaderMetadata};
90use sp_consensus::block_validation::{
91	BlockAnnounceValidator, Chain, DefaultBlockAnnounceValidator,
92};
93use sp_core::traits::{CodeExecutor, SpawnNamed};
94use sp_keystore::KeystorePtr;
95use sp_runtime::traits::{Block as BlockT, BlockIdTo, NumberFor, Zero};
96use sp_storage::{ChildInfo, ChildType, PrefixedStorageKey};
97use std::{
98	str::FromStr,
99	sync::Arc,
100	time::{Duration, SystemTime},
101};
102
/// Cap the maximum number of blocks advertised to IPFS to two weeks at 6-second block time
/// (14 days * 24 h * 3600 s / 6 s = 201600 blocks).
/// Block pruning depth will be used if it is shorter.
const IPFS_MAX_BLOCKS: u32 = 201600;
106
/// Full client type: a [`Client`] on top of the full backend and the full call executor.
pub type TFullClient<TBl, TRtApi, TExec> =
	Client<TFullBackend<TBl>, TFullCallExecutor<TBl, TExec>, TBl, TRtApi>;

/// Full client backend type.
pub type TFullBackend<TBl> = Backend<TBl>;

/// Full client call executor type.
pub type TFullCallExecutor<TBl, TExec> = crate::client::LocalCallExecutor<TBl, Backend<TBl>, TExec>;

/// The parts produced when setting up a full node: the client, the shared backend, the keystore
/// container and the task manager, in that order.
type TFullParts<TBl, TRtApi, TExec> =
	(TFullClient<TBl, TRtApi, TExec>, Arc<TFullBackend<TBl>>, KeystoreContainer, TaskManager);
119
120/// Construct a local keystore shareable container
121pub struct KeystoreContainer(Arc<LocalKeystore>);
122
123impl KeystoreContainer {
124	/// Construct KeystoreContainer
125	pub fn new(config: &KeystoreConfig) -> Result<Self, Error> {
126		let keystore = Arc::new(match config {
127			KeystoreConfig::Path { path, password } => {
128				LocalKeystore::open(path.clone(), password.clone())?
129			},
130			KeystoreConfig::InMemory => LocalKeystore::in_memory(),
131		});
132
133		Ok(Self(keystore))
134	}
135
136	/// Returns a shared reference to a dynamic `Keystore` trait implementation.
137	pub fn keystore(&self) -> KeystorePtr {
138		self.0.clone()
139	}
140
141	/// Returns a shared reference to the local keystore .
142	pub fn local_keystore(&self) -> Arc<LocalKeystore> {
143		self.0.clone()
144	}
145}
146
147/// Creates a new full client for the given config.
148pub fn new_full_client<TBl, TRtApi, TExec>(
149	config: &Configuration,
150	telemetry: Option<TelemetryHandle>,
151	executor: TExec,
152	pruning_filters: Vec<Arc<dyn sc_client_db::PruningFilter>>,
153) -> Result<TFullClient<TBl, TRtApi, TExec>, Error>
154where
155	TBl: BlockT,
156	TExec: CodeExecutor + RuntimeVersionOf + Clone,
157{
158	new_full_parts(config, telemetry, executor, pruning_filters).map(|parts| parts.0)
159}
160
161/// Create the initial parts of a full node with the default genesis block builder.
162///
163/// The `pruning_filters` parameter allows configuring which blocks should be preserved
164/// during pruning.
165pub fn new_full_parts_record_import<TBl, TRtApi, TExec>(
166	config: &Configuration,
167	telemetry: Option<TelemetryHandle>,
168	executor: TExec,
169	enable_import_proof_recording: bool,
170	pruning_filters: Vec<Arc<dyn sc_client_db::PruningFilter>>,
171) -> Result<TFullParts<TBl, TRtApi, TExec>, Error>
172where
173	TBl: BlockT,
174	TExec: CodeExecutor + RuntimeVersionOf + Clone,
175{
176	let mut db_config = config.db_config();
177	db_config.pruning_filters = pruning_filters;
178	let backend = new_db_backend(db_config)?;
179
180	let genesis_block_builder = GenesisBlockBuilder::new(
181		config.chain_spec.as_storage_builder(),
182		!config.no_genesis(),
183		backend.clone(),
184		executor.clone(),
185	)?;
186
187	new_full_parts_with_genesis_builder(
188		config,
189		telemetry,
190		executor,
191		backend,
192		genesis_block_builder,
193		enable_import_proof_recording,
194	)
195}
196
197/// Create the initial parts of a full node with the default genesis block builder.
198///
199/// The `pruning_filters` parameter allows configuring which blocks should be preserved
200/// during pruning.
201pub fn new_full_parts<TBl, TRtApi, TExec>(
202	config: &Configuration,
203	telemetry: Option<TelemetryHandle>,
204	executor: TExec,
205	pruning_filters: Vec<Arc<dyn sc_client_db::PruningFilter>>,
206) -> Result<TFullParts<TBl, TRtApi, TExec>, Error>
207where
208	TBl: BlockT,
209	TExec: CodeExecutor + RuntimeVersionOf + Clone,
210{
211	new_full_parts_record_import(config, telemetry, executor, false, pruning_filters)
212}
213
214/// Create the initial parts of a full node.
215pub fn new_full_parts_with_genesis_builder<TBl, TRtApi, TExec, TBuildGenesisBlock>(
216	config: &Configuration,
217	telemetry: Option<TelemetryHandle>,
218	executor: TExec,
219	backend: Arc<TFullBackend<TBl>>,
220	genesis_block_builder: TBuildGenesisBlock,
221	enable_import_proof_recording: bool,
222) -> Result<TFullParts<TBl, TRtApi, TExec>, Error>
223where
224	TBl: BlockT,
225	TExec: CodeExecutor + RuntimeVersionOf + Clone,
226	TBuildGenesisBlock: BuildGenesisBlock<
227		TBl,
228		BlockImportOperation = <Backend<TBl> as sc_client_api::backend::Backend<TBl>>::BlockImportOperation
229	>,
230{
231	let keystore_container = KeystoreContainer::new(&config.keystore)?;
232
233	let task_manager = {
234		let registry = config.prometheus_config.as_ref().map(|cfg| &cfg.registry);
235		TaskManager::new(config.tokio_handle.clone(), registry)?
236	};
237
238	let chain_spec = &config.chain_spec;
239	let fork_blocks = get_extension::<ForkBlocks<TBl>>(chain_spec.extensions())
240		.cloned()
241		.unwrap_or_default();
242
243	let bad_blocks = get_extension::<BadBlocks<TBl>>(chain_spec.extensions())
244		.cloned()
245		.unwrap_or_default();
246
247	let client = {
248		let extensions = ExecutionExtensions::new(None, Arc::new(executor.clone()));
249
250		let wasm_runtime_substitutes = config
251			.chain_spec
252			.code_substitutes()
253			.into_iter()
254			.map(|(n, c)| {
255				let number = NumberFor::<TBl>::from_str(&n).map_err(|_| {
256					Error::Application(Box::from(format!(
257						"Failed to parse `{}` as block number for code substitutes. \
258						 In an old version the key for code substitute was a block hash. \
259						 Please update the chain spec to a version that is compatible with your node.",
260						n
261					)))
262				})?;
263				Ok((number, c))
264			})
265			.collect::<Result<std::collections::HashMap<_, _>, Error>>()?;
266
267		let client = new_client(
268			backend.clone(),
269			executor,
270			genesis_block_builder,
271			fork_blocks,
272			bad_blocks,
273			extensions,
274			Box::new(task_manager.spawn_handle()),
275			config.prometheus_config.as_ref().map(|config| config.registry.clone()),
276			telemetry,
277			ClientConfig {
278				offchain_worker_enabled: config.offchain_worker.enabled,
279				offchain_indexing_api: config.offchain_worker.indexing_enabled,
280				wasm_runtime_overrides: config.wasm_runtime_overrides.clone(),
281				no_genesis: config.no_genesis(),
282				wasm_runtime_substitutes,
283				enable_import_proof_recording,
284			},
285		)?;
286
287		if let Some(warm_up_strategy) = config.warm_up_trie_cache {
288			let storage_root = client.usage_info().chain.best_hash;
289			let backend_clone = backend.clone();
290
291			if warm_up_strategy.is_blocking() {
292				// We use the blocking strategy for testing purposes.
293				// So better to error out if it fails.
294				warm_up_trie_cache(backend_clone, storage_root)?;
295			} else {
296				task_manager.spawn_handle().spawn_blocking(
297					"warm-up-trie-cache",
298					None,
299					async move {
300						if let Err(e) = warm_up_trie_cache(backend_clone, storage_root) {
301							error!("Failed to warm up trie cache: {e}");
302						}
303					},
304				);
305			}
306		}
307
308		client
309	};
310
311	Ok((client, backend, keystore_container, task_manager))
312}
313
314fn child_info(key: Vec<u8>) -> Option<ChildInfo> {
315	let prefixed_key = PrefixedStorageKey::new(key);
316	ChildType::from_prefixed_key(&prefixed_key).and_then(|(child_type, storage_key)| {
317		(child_type == ChildType::ParentKeyId).then(|| ChildInfo::new_default(storage_key))
318	})
319}
320
321fn warm_up_trie_cache<TBl: BlockT>(
322	backend: Arc<TFullBackend<TBl>>,
323	storage_root: TBl::Hash,
324) -> Result<(), Error> {
325	use sc_client_api::backend::Backend;
326	use sp_state_machine::Backend as StateBackend;
327
328	let untrusted_state = || backend.state_at(storage_root, TrieCacheContext::Untrusted);
329	let trusted_state = || backend.state_at(storage_root, TrieCacheContext::Trusted);
330
331	debug!("Populating trie cache started",);
332	let start_time = std::time::Instant::now();
333	let mut keys_count = 0;
334	let mut child_keys_count = 0;
335	for key in KeysIter::<_, TBl>::new(untrusted_state()?, None, None)? {
336		if keys_count != 0 && keys_count % 100_000 == 0 {
337			debug!("{} keys and {} child keys have been warmed", keys_count, child_keys_count);
338		}
339		match child_info(key.0.clone()) {
340			Some(info) => {
341				for child_key in
342					KeysIter::<_, TBl>::new_child(untrusted_state()?, info.clone(), None, None)?
343				{
344					if trusted_state()?
345						.child_storage(&info, &child_key.0)
346						.unwrap_or_default()
347						.is_none()
348					{
349						debug!("Child storage value unexpectedly empty: {child_key:?}");
350					}
351					child_keys_count += 1;
352				}
353			},
354			None => {
355				if trusted_state()?.storage(&key.0).unwrap_or_default().is_none() {
356					debug!("Storage value unexpectedly empty: {key:?}");
357				}
358				keys_count += 1;
359			},
360		}
361	}
362	debug!(
363		"Trie cache populated with {keys_count} keys and {child_keys_count} child keys in {} s",
364		start_time.elapsed().as_secs_f32()
365	);
366
367	Ok(())
368}
369
370/// Creates a [`NativeElseWasmExecutor`](sc_executor::NativeElseWasmExecutor) according to
371/// [`Configuration`].
372#[deprecated(note = "Please switch to `new_wasm_executor`. Will be removed at end of 2024.")]
373#[allow(deprecated)]
374pub fn new_native_or_wasm_executor<D: NativeExecutionDispatch>(
375	config: &Configuration,
376) -> sc_executor::NativeElseWasmExecutor<D> {
377	#[allow(deprecated)]
378	sc_executor::NativeElseWasmExecutor::new_with_wasm_executor(new_wasm_executor(&config.executor))
379}
380
381/// Creates a [`WasmExecutor`] according to [`ExecutorConfiguration`].
382pub fn new_wasm_executor<H: HostFunctions>(config: &ExecutorConfiguration) -> WasmExecutor<H> {
383	let strategy = config
384		.default_heap_pages
385		.map_or(DEFAULT_HEAP_ALLOC_STRATEGY, |p| HeapAllocStrategy::Static { extra_pages: p as _ });
386	WasmExecutor::<H>::builder()
387		.with_execution_method(config.wasm_method)
388		.with_onchain_heap_alloc_strategy(strategy)
389		.with_offchain_heap_alloc_strategy(strategy)
390		.with_max_runtime_instances(config.max_runtime_instances)
391		.with_runtime_cache_size(config.runtime_cache_size)
392		.build()
393}
394
395/// Create an instance of the default DB-backend.
396///
397/// Pruning filters can be configured via `settings.pruning_filters`.
398/// If any filter returns `true` for a block's justifications, the block will not be pruned.
399pub fn new_db_backend<Block>(
400	settings: DatabaseSettings,
401) -> Result<Arc<Backend<Block>>, sp_blockchain::Error>
402where
403	Block: BlockT,
404{
405	const CANONICALIZATION_DELAY: u64 = 4096;
406
407	Ok(Arc::new(Backend::new(settings, CANONICALIZATION_DELAY)?))
408}
409
410/// Create an instance of client backed by given backend.
411pub fn new_client<E, Block, RA, G>(
412	backend: Arc<Backend<Block>>,
413	executor: E,
414	genesis_block_builder: G,
415	fork_blocks: ForkBlocks<Block>,
416	bad_blocks: BadBlocks<Block>,
417	execution_extensions: ExecutionExtensions<Block>,
418	spawn_handle: Box<dyn SpawnNamed>,
419	prometheus_registry: Option<Registry>,
420	telemetry: Option<TelemetryHandle>,
421	config: ClientConfig<Block>,
422) -> Result<
423	Client<
424		Backend<Block>,
425		crate::client::LocalCallExecutor<Block, Backend<Block>, E>,
426		Block,
427		RA,
428	>,
429	sp_blockchain::Error,
430>
431where
432	Block: BlockT,
433	E: CodeExecutor + RuntimeVersionOf,
434	G: BuildGenesisBlock<
435		Block,
436		BlockImportOperation = <Backend<Block> as sc_client_api::backend::Backend<Block>>::BlockImportOperation
437	>,
438{
439	let executor = crate::client::LocalCallExecutor::new(
440		backend.clone(),
441		executor,
442		config.clone(),
443		execution_extensions,
444	)?;
445
446	Client::new(
447		backend,
448		executor,
449		spawn_handle,
450		genesis_block_builder,
451		fork_blocks,
452		bad_blocks,
453		prometheus_registry,
454		telemetry,
455		config,
456	)
457}
458
/// Parameters to pass into [`spawn_tasks`].
pub struct SpawnTasksParams<'a, TBl: BlockT, TCl, TExPool, TRpc, Backend> {
	/// The service configuration.
	pub config: Configuration,
	/// A shared client returned by `new_full_parts`.
	pub client: Arc<TCl>,
	/// A shared backend returned by `new_full_parts`.
	pub backend: Arc<Backend>,
	/// A task manager returned by `new_full_parts`.
	pub task_manager: &'a mut TaskManager,
	/// A shared keystore returned by `new_full_parts`.
	pub keystore: KeystorePtr,
	/// A shared transaction pool.
	pub transaction_pool: Arc<TExPool>,
	/// Builds additional [`RpcModule`]s that should be added to the server.
	pub rpc_builder: Box<dyn Fn(SubscriptionTaskExecutor) -> Result<RpcModule<TRpc>, Error>>,
	/// A shared network instance.
	pub network: Arc<dyn sc_network::service::traits::NetworkService>,
	/// A Sender for RPC requests.
	pub system_rpc_tx: TracingUnboundedSender<sc_rpc::system::Request<TBl>>,
	/// Controller for the transactions handler.
	pub tx_handler_controller:
		sc_network_transactions::TransactionsHandlerController<<TBl as BlockT>::Hash>,
	/// Syncing service.
	pub sync_service: Arc<SyncingService<TBl>>,
	/// Telemetry instance for this node.
	pub telemetry: Option<&'a mut Telemetry>,
	/// Optional [`TracingExecuteBlock`] handle.
	///
	/// Will be used by the `trace_block` RPC to execute the actual block.
	pub tracing_execute_block: Option<Arc<dyn TracingExecuteBlock<TBl>>>,
}
491
/// Spawn the tasks that are required to run a node.
///
/// In order, this:
/// - generates the initial session keys at the best block, seeded from `config.dev_key_seed`
///   when it is set,
/// - gathers and prints system information and, if a [`Telemetry`] instance was passed,
///   initializes telemetry,
/// - spawns the transaction pool notification and transaction propagation tasks,
/// - sets up the metrics service (spawning the Prometheus endpoint when configured) and its
///   periodic update task,
/// - starts the RPC servers on a dedicated RPC runtime and builds a second, in-memory RPC
///   module with [`DenyUnsafe::No`],
/// - spawns the informant.
///
/// Returns the [`RpcHandlers`] of the in-memory RPC. The base path and the RPC server handle
/// are handed to the task manager via `keep_alive`.
///
/// # Errors
///
/// Returns an error if session key generation, telemetry initialization, metrics setup,
/// creation of the RPC runtime, RPC module generation or starting the RPC servers fails.
pub fn spawn_tasks<TBl, TBackend, TExPool, TRpc, TCl>(
	SpawnTasksParams {
		mut config,
		task_manager,
		client,
		backend,
		keystore,
		transaction_pool,
		rpc_builder,
		network,
		system_rpc_tx,
		tx_handler_controller,
		sync_service,
		telemetry,
		tracing_execute_block: execute_block,
	}: SpawnTasksParams<TBl, TCl, TExPool, TRpc, TBackend>,
) -> Result<RpcHandlers, Error>
where
	TCl: ProvideRuntimeApi<TBl>
		+ HeaderMetadata<TBl, Error = sp_blockchain::Error>
		+ Chain<TBl>
		+ BlockBackend<TBl>
		+ BlockIdTo<TBl, Error = sp_blockchain::Error>
		+ ProofProvider<TBl>
		+ HeaderBackend<TBl>
		+ BlockchainEvents<TBl>
		+ ExecutorProvider<TBl>
		+ UsageProvider<TBl>
		+ StorageProvider<TBl, TBackend>
		+ CallApiAt<TBl>
		+ Send
		+ 'static,
	<TCl as ProvideRuntimeApi<TBl>>::Api: sp_api::Metadata<TBl>
		+ sp_transaction_pool::runtime_api::TaggedTransactionQueue<TBl>
		+ sp_session::SessionKeys<TBl>
		+ sp_api::ApiExt<TBl>,
	TBl: BlockT,
	TBl::Hash: Unpin,
	TBl::Header: Unpin,
	TBackend: 'static + sc_client_api::backend::Backend<TBl> + Send,
	TExPool: MaintainedTransactionPool<Block = TBl, Hash = <TBl as BlockT>::Hash> + 'static,
{
	let chain_info = client.usage_info().chain;

	// Generate the initial session keys at the best block. The seed list is empty unless
	// `dev_key_seed` is configured.
	sp_session::generate_initial_session_keys(
		client.clone(),
		chain_info.best_hash,
		config.dev_key_seed.clone().map(|s| vec![s]).unwrap_or_default(),
		keystore.clone(),
	)
	.map_err(|e| Error::Application(Box::new(e)))?;

	let sysinfo = sc_sysinfo::gather_sysinfo();
	sc_sysinfo::print_sysinfo(&sysinfo);

	// Telemetry is only initialized when the caller passed a `Telemetry` instance; from here
	// on `telemetry` is an optional handle.
	let telemetry = telemetry
		.map(|telemetry| {
			init_telemetry(
				config.network.node_name.clone(),
				config.impl_name.clone(),
				config.impl_version.clone(),
				config.chain_spec.name().to_string(),
				config.role.is_authority(),
				network.clone(),
				client.clone(),
				telemetry,
				Some(sysinfo),
			)
		})
		.transpose()?;

	info!("📦 Highest known block at #{}", chain_info.best_number);

	let spawn_handle = task_manager.spawn_handle();

	// Inform the tx pool about imported and finalized blocks.
	spawn_handle.spawn(
		"txpool-notifications",
		Some("transaction-pool"),
		sc_transaction_pool::notification_future(client.clone(), transaction_pool.clone()),
	);

	// Forward transactions imported into the pool to the transactions handler.
	spawn_handle.spawn(
		"on-transaction-imported",
		Some("transaction-pool"),
		propagate_transaction_notifications(
			transaction_pool.clone(),
			tx_handler_controller,
			telemetry.clone(),
		),
	);

	// Prometheus metrics.
	let metrics_service =
		if let Some(PrometheusConfig { port, registry }) = config.prometheus_config.clone() {
			// Set static metrics.
			let metrics = MetricsService::with_prometheus(
				telemetry,
				&registry,
				config.role,
				&config.network.node_name,
				&config.impl_version,
			)?;
			spawn_handle.spawn(
				"prometheus-endpoint",
				None,
				prometheus_endpoint::init_prometheus(port, registry).map(drop),
			);

			metrics
		} else {
			MetricsService::new(telemetry)
		};

	// Periodically updated metrics and telemetry updates.
	spawn_handle.spawn(
		"telemetry-periodic-send",
		None,
		metrics_service.run(
			client.clone(),
			transaction_pool.clone(),
			network.clone(),
			sync_service.clone(),
		),
	);

	// Move the id provider out of the config so it can be passed by value to
	// `start_rpc_servers` below.
	let rpc_id_provider = config.rpc.id_provider.take();

	// jsonrpsee RPC
	// RPC-V2 specific metrics need to be registered before the RPC server is started,
	// since we might have two instances running (one for the in-memory RPC and one for the network
	// RPC).
	let rpc_v2_metrics = config
		.prometheus_registry()
		.map(|registry| sc_rpc_spec_v2::transaction::TransactionMetrics::new(registry))
		.transpose()?;

	// Create dedicated RPC runtime with limited blocking threads.
	// This isolates RPC blocking operations from the rest of the node.
	let rpc_runtime = sc_rpc_server::create_rpc_runtime(config.rpc.max_connections)
		.map_err(|e| Error::Application(Box::new(e)))?;

	// Create spawn handle for RPC tasks
	let rpc_spawn_handle: Arc<dyn sp_core::traits::SpawnNamed> =
		Arc::new(sc_rpc_server::RpcSpawnHandle::new(rpc_runtime.handle().clone()));

	// Factory that creates RPC module. It is called twice below: once for the RPC server and
	// once for the in-memory RPC.
	let gen_rpc_module = || {
		gen_rpc_module(GenRpcModuleParams {
			spawn_handle: rpc_spawn_handle.clone(),
			client: client.clone(),
			transaction_pool: transaction_pool.clone(),
			keystore: keystore.clone(),
			system_rpc_tx: system_rpc_tx.clone(),
			impl_name: config.impl_name.clone(),
			impl_version: config.impl_version.clone(),
			chain_spec: config.chain_spec.as_ref(),
			state_pruning: &config.state_pruning,
			blocks_pruning: config.blocks_pruning,
			backend: backend.clone(),
			rpc_builder: &*rpc_builder,
			metrics: rpc_v2_metrics.clone(),
			sync_oracle: sync_service.clone(),
			tracing_execute_block: execute_block.clone(),
		})
	};

	// Generate the RPC module for the server
	let rpc_api = gen_rpc_module()?;

	let rpc_server_handle = start_rpc_servers(
		&config.rpc,
		config.prometheus_registry(),
		&config.tokio_handle,
		rpc_api,
		rpc_runtime,
		rpc_id_provider,
	)?;

	// Turn the socket addresses the RPC server listens on into multiaddresses
	// (IP followed by the TCP port).
	let listen_addrs = rpc_server_handle
		.listen_addrs()
		.into_iter()
		.map(|socket_addr| {
			let mut multiaddr: Multiaddr = socket_addr.ip().into();
			multiaddr.push(Protocol::Tcp(socket_addr.port()));
			multiaddr
		})
		.collect();

	// In-memory RPC uses the same dedicated RPC runtime
	let in_memory_rpc = {
		let mut module = gen_rpc_module()?;
		module.extensions_mut().insert(DenyUnsafe::No);
		module
	};

	let in_memory_rpc_handle = RpcHandlers::new(Arc::new(in_memory_rpc), listen_addrs);

	// Spawn informant task
	spawn_handle.spawn(
		"informant",
		None,
		sc_informant::build(client.clone(), network, sync_service.clone()),
	);

	// Hand the base path and the RPC server handle over to the task manager
	// (presumably so they live as long as the task manager does; confirm in `keep_alive`).
	task_manager.keep_alive((config.base_path, rpc_server_handle));

	Ok(in_memory_rpc_handle)
}
702
703/// Returns a future that forwards imported transactions to the transaction networking protocol.
704pub async fn propagate_transaction_notifications<Block, ExPool>(
705	transaction_pool: Arc<ExPool>,
706	tx_handler_controller: sc_network_transactions::TransactionsHandlerController<
707		<Block as BlockT>::Hash,
708	>,
709	telemetry: Option<TelemetryHandle>,
710) where
711	Block: BlockT,
712	ExPool: MaintainedTransactionPool<Block = Block, Hash = <Block as BlockT>::Hash>,
713{
714	const TELEMETRY_INTERVAL: Duration = Duration::from_secs(1);
715
716	// transaction notifications
717	let mut notifications = transaction_pool.import_notification_stream().fuse();
718	let mut timer = futures_timer::Delay::new(TELEMETRY_INTERVAL).fuse();
719	let mut tx_imported = false;
720
721	loop {
722		select! {
723			notification = notifications.next() => {
724				let Some(hash) = notification else { return };
725
726				tx_handler_controller.propagate_transaction(hash);
727
728				tx_imported = true;
729			},
730			_ = timer => {
731				timer = futures_timer::Delay::new(TELEMETRY_INTERVAL).fuse();
732
733				if !tx_imported {
734					continue;
735				}
736
737				tx_imported = false;
738				let status = transaction_pool.status();
739
740				telemetry!(
741					telemetry;
742					SUBSTRATE_INFO;
743					"txpool.import";
744					"ready" => status.ready,
745					"future" => status.future,
746				);
747			}
748		}
749	}
750}
751
752/// Initialize telemetry with provided configuration and return telemetry handle
753pub fn init_telemetry<Block, Client, Network>(
754	name: String,
755	implementation: String,
756	version: String,
757	chain: String,
758	authority: bool,
759	network: Network,
760	client: Arc<Client>,
761	telemetry: &mut Telemetry,
762	sysinfo: Option<sc_telemetry::SysInfo>,
763) -> sc_telemetry::Result<TelemetryHandle>
764where
765	Block: BlockT,
766	Client: BlockBackend<Block>,
767	Network: NetworkStateInfo,
768{
769	let genesis_hash = client.block_hash(Zero::zero()).ok().flatten().unwrap_or_default();
770	let connection_message = ConnectionMessage {
771		name,
772		implementation,
773		version,
774		target_os: sc_sysinfo::TARGET_OS.into(),
775		target_arch: sc_sysinfo::TARGET_ARCH.into(),
776		target_env: sc_sysinfo::TARGET_ENV.into(),
777		config: String::new(),
778		chain,
779		genesis_hash: format!("{:?}", genesis_hash),
780		authority,
781		startup_time: SystemTime::UNIX_EPOCH
782			.elapsed()
783			.map(|dur| dur.as_millis())
784			.unwrap_or(0)
785			.to_string(),
786		network_id: network.local_peer_id().to_base58(),
787		sysinfo,
788	};
789
790	telemetry.start_telemetry(connection_message)?;
791
792	Ok(telemetry.handle())
793}
794
/// Parameters for [`gen_rpc_module`].
pub struct GenRpcModuleParams<'a, TBl: BlockT, TBackend, TCl, TRpc, TExPool> {
	/// The handle to spawn tasks on the RPC runtime.
	pub spawn_handle: Arc<dyn sp_core::traits::SpawnNamed>,
	/// Access to the client.
	pub client: Arc<TCl>,
	/// The transaction pool.
	pub transaction_pool: Arc<TExPool>,
	/// Keystore handle.
	pub keystore: KeystorePtr,
	/// Sender for system requests.
	pub system_rpc_tx: TracingUnboundedSender<sc_rpc::system::Request<TBl>>,
	/// Implementation name of this node.
	pub impl_name: String,
	/// Implementation version of this node.
	pub impl_version: String,
	/// The chain spec.
	pub chain_spec: &'a dyn ChainSpec,
	/// Enabled state pruning mode for this node.
	///
	/// `None` is treated as "not an archive node" when deciding whether to expose the
	/// `archive` RPC-v2 API.
	pub state_pruning: &'a Option<PruningMode>,
	/// Enabled blocks pruning mode.
	pub blocks_pruning: BlocksPruning,
	/// Backend of the node.
	pub backend: Arc<TBackend>,
	/// RPC builder producing the additional, node-specific [`RpcModule`].
	pub rpc_builder: &'a dyn Fn(SubscriptionTaskExecutor) -> Result<RpcModule<TRpc>, Error>,
	/// Transaction metrics handle.
	pub metrics: Option<sc_rpc_spec_v2::transaction::TransactionMetrics>,
	/// Sync oracle for determining sync status.
	pub sync_oracle: Arc<dyn sp_consensus::SyncOracle + Send + Sync>,
	/// Optional [`TracingExecuteBlock`] handle.
	///
	/// Will be used by the `trace_block` RPC to execute the actual block.
	pub tracing_execute_block: Option<Arc<dyn TracingExecuteBlock<TBl>>>,
}
830
/// Generate the node's [`RpcModule`] from the provided parameters.
///
/// Merges the RPC-v2 APIs (`transaction`, transaction broadcast, `chain_head`, `chain_spec`,
/// `bitswap`), the old-spec APIs (chain, author, system, state, child state) and finally the
/// module produced by `rpc_builder`. The `archive` RPC-v2 API is only merged when both state
/// pruning and block pruning are in archive mode, and the offchain API only when the backend
/// provides offchain storage.
///
/// # Errors
///
/// Returns [`Error::Application`] when merging a module fails, or the error returned by
/// `rpc_builder`.
///
/// # Panics
///
/// Panics if the hash of block zero cannot be fetched from the client.
pub fn gen_rpc_module<TBl, TBackend, TCl, TRpc, TExPool>(
	GenRpcModuleParams {
		spawn_handle,
		client,
		transaction_pool,
		keystore,
		system_rpc_tx,
		impl_name,
		impl_version,
		chain_spec,
		state_pruning,
		blocks_pruning,
		backend,
		rpc_builder,
		metrics,
		sync_oracle,
		tracing_execute_block: execute_block,
	}: GenRpcModuleParams<TBl, TBackend, TCl, TRpc, TExPool>,
) -> Result<RpcModule<()>, Error>
where
	TBl: BlockT,
	TCl: ProvideRuntimeApi<TBl>
		+ BlockchainEvents<TBl>
		+ HeaderBackend<TBl>
		+ HeaderMetadata<TBl, Error = sp_blockchain::Error>
		+ ExecutorProvider<TBl>
		+ CallApiAt<TBl>
		+ ProofProvider<TBl>
		+ StorageProvider<TBl, TBackend>
		+ BlockBackend<TBl>
		+ Send
		+ Sync
		+ 'static,
	TBackend: sc_client_api::backend::Backend<TBl> + 'static,
	<TCl as ProvideRuntimeApi<TBl>>::Api: sp_session::SessionKeys<TBl> + sp_api::Metadata<TBl>,
	TExPool: MaintainedTransactionPool<Block = TBl, Hash = <TBl as BlockT>::Hash> + 'static,
	TBl::Hash: Unpin,
	TBl::Header: Unpin,
{
	let system_info = sc_rpc::system::SystemInfo {
		chain_name: chain_spec.name().into(),
		impl_name,
		impl_version,
		properties: chain_spec.properties(),
		chain_type: chain_spec.chain_type(),
	};

	let mut rpc_api = RpcModule::new(());
	// All RPC implementations below spawn their tasks through the provided handle.
	let task_executor = spawn_handle;

	let (chain, state, child_state) = {
		let chain = sc_rpc::chain::new_full(client.clone(), task_executor.clone()).into_rpc();
		let (state, child_state) =
			sc_rpc::state::new_full(client.clone(), task_executor.clone(), execute_block);
		let state = state.into_rpc();
		let child_state = child_state.into_rpc();

		(chain, state, child_state)
	};

	// Limit passed to the transaction broadcast RPC below.
	const MAX_TRANSACTION_PER_CONNECTION: usize = 16;

	let transaction_broadcast_rpc_v2 = sc_rpc_spec_v2::transaction::TransactionBroadcast::new(
		client.clone(),
		transaction_pool.clone(),
		task_executor.clone(),
		MAX_TRANSACTION_PER_CONNECTION,
	)
	.into_rpc();

	let transaction_v2 = sc_rpc_spec_v2::transaction::Transaction::new(
		client.clone(),
		transaction_pool.clone(),
		task_executor.clone(),
		metrics,
	)
	.into_rpc();

	let chain_head_v2 = sc_rpc_spec_v2::chain_head::ChainHead::new(
		client.clone(),
		backend.clone(),
		task_executor.clone(),
		// Defaults to sensible limits for the `ChainHead`.
		sc_rpc_spec_v2::chain_head::ChainHeadConfig::default(),
	)
	.into_rpc();

	// Part of the RPC v2 spec.
	// An archive node that can respond to the `archive` RPC-v2 queries is a node with:
	// - state pruning in archive mode: The storage of blocks is kept around
	// - block pruning in archive mode: The block's body is kept around
	let is_archive_node = state_pruning.as_ref().map(|sp| sp.is_archive()).unwrap_or(false) &&
		blocks_pruning.is_archive();
	// The genesis hash is needed by both the archive and the chain spec RPC-v2 APIs.
	let genesis_hash = client.hash(Zero::zero()).ok().flatten().expect("Genesis block exists; qed");
	if is_archive_node {
		let archive_v2 = sc_rpc_spec_v2::archive::Archive::new(
			client.clone(),
			backend.clone(),
			genesis_hash,
			task_executor.clone(),
		)
		.into_rpc();
		rpc_api.merge(archive_v2).map_err(|e| Error::Application(e.into()))?;
	}

	// ChainSpec RPC-v2.
	let chain_spec_v2 = sc_rpc_spec_v2::chain_spec::ChainSpec::new(
		chain_spec.name().into(),
		genesis_hash,
		chain_spec.properties(),
	)
	.into_rpc();

	// Bitswap RPC-v2 (do not confuse with v1 from `bitswap_v1_get`).
	let bitswap_v2 = sc_rpc_spec_v2::bitswap::Bitswap::new(client.clone(), sync_oracle).into_rpc();

	let author = sc_rpc::author::Author::new(
		client.clone(),
		transaction_pool,
		keystore,
		task_executor.clone(),
	)
	.into_rpc();

	let system = sc_rpc::system::System::new(system_info, system_rpc_tx).into_rpc();

	// The offchain RPC is only exposed when the backend provides offchain storage.
	if let Some(storage) = backend.offchain_storage() {
		let offchain = sc_rpc::offchain::Offchain::new(storage).into_rpc();

		rpc_api.merge(offchain).map_err(|e| Error::Application(e.into()))?;
	}

	// Part of the RPC v2 spec.
	rpc_api.merge(transaction_v2).map_err(|e| Error::Application(e.into()))?;
	rpc_api
		.merge(transaction_broadcast_rpc_v2)
		.map_err(|e| Error::Application(e.into()))?;
	rpc_api.merge(chain_head_v2).map_err(|e| Error::Application(e.into()))?;
	rpc_api.merge(chain_spec_v2).map_err(|e| Error::Application(e.into()))?;
	rpc_api.merge(bitswap_v2).map_err(|e| Error::Application(e.into()))?;

	// Part of the old RPC spec.
	rpc_api.merge(chain).map_err(|e| Error::Application(e.into()))?;
	rpc_api.merge(author).map_err(|e| Error::Application(e.into()))?;
	rpc_api.merge(system).map_err(|e| Error::Application(e.into()))?;
	rpc_api.merge(state).map_err(|e| Error::Application(e.into()))?;
	rpc_api.merge(child_state).map_err(|e| Error::Application(e.into()))?;
	// Additional [`RpcModule`]s defined in the node to fit the specific blockchain.
	// They are merged last, after all built-in APIs.
	let extra_rpcs = rpc_builder(task_executor.clone())?;
	rpc_api.merge(extra_rpcs).map_err(|e| Error::Application(e.into()))?;

	Ok(rpc_api)
}
985
986/// Parameters to pass into [`build_network`].
987pub struct BuildNetworkParams<'a, Block, Net, TxPool, IQ, Client>
988where
989	Block: BlockT,
990	Net: NetworkBackend<Block, <Block as BlockT>::Hash>,
991{
992	/// The service configuration.
993	pub config: &'a Configuration,
994	/// Full network configuration.
995	pub net_config: FullNetworkConfiguration<Block, <Block as BlockT>::Hash, Net>,
996	/// A shared client returned by `new_full_parts`.
997	pub client: Arc<Client>,
998	/// A shared transaction pool.
999	pub transaction_pool: Arc<TxPool>,
1000	/// A handle for spawning tasks.
1001	pub spawn_handle: SpawnTaskHandle,
1002	/// A handle for spawning essential tasks.
1003	pub spawn_essential_handle: SpawnEssentialTaskHandle,
1004	/// An import queue.
1005	pub import_queue: IQ,
1006	/// A block announce validator builder.
1007	pub block_announce_validator_builder: Option<
1008		Box<dyn FnOnce(Arc<Client>) -> Box<dyn BlockAnnounceValidator<Block> + Send> + Send>,
1009	>,
1010	/// Optional warp sync config.
1011	pub warp_sync_config: Option<WarpSyncConfig<Block>>,
1012	/// User specified block relay params. If not specified, the default
1013	/// block request handler will be used.
1014	pub block_relay: Option<BlockRelayParams<Block, Net>>,
1015	/// Metrics.
1016	pub metrics: NotificationMetrics,
1017}
1018
1019/// Build the network service, the network status sinks and an RPC sender.
1020pub fn build_network<Block, Net, TxPool, IQ, Client>(
1021	params: BuildNetworkParams<Block, Net, TxPool, IQ, Client>,
1022) -> Result<
1023	(
1024		Arc<dyn sc_network::service::traits::NetworkService>,
1025		TracingUnboundedSender<sc_rpc::system::Request<Block>>,
1026		sc_network_transactions::TransactionsHandlerController<<Block as BlockT>::Hash>,
1027		Arc<SyncingService<Block>>,
1028	),
1029	Error,
1030>
1031where
1032	Block: BlockT,
1033	Client: ProvideRuntimeApi<Block>
1034		+ HeaderMetadata<Block, Error = sp_blockchain::Error>
1035		+ Chain<Block>
1036		+ BlockBackend<Block>
1037		+ BlockIdTo<Block, Error = sp_blockchain::Error>
1038		+ ProofProvider<Block>
1039		+ HeaderBackend<Block>
1040		+ BlockchainEvents<Block>
1041		+ 'static,
1042	TxPool: TransactionPool<Block = Block, Hash = <Block as BlockT>::Hash> + 'static,
1043	IQ: ImportQueue<Block> + 'static,
1044	Net: NetworkBackend<Block, <Block as BlockT>::Hash>,
1045{
1046	let BuildNetworkParams {
1047		config,
1048		mut net_config,
1049		client,
1050		transaction_pool,
1051		spawn_handle,
1052		spawn_essential_handle,
1053		import_queue,
1054		block_announce_validator_builder,
1055		warp_sync_config,
1056		block_relay,
1057		metrics,
1058	} = params;
1059
1060	let block_announce_validator = if let Some(f) = block_announce_validator_builder {
1061		f(client.clone())
1062	} else {
1063		Box::new(DefaultBlockAnnounceValidator)
1064	};
1065
1066	let network_service_provider = NetworkServiceProvider::new();
1067	let protocol_id = config.protocol_id();
1068	let fork_id = config.chain_spec.fork_id();
1069	let metrics_registry = config.prometheus_config.as_ref().map(|config| &config.registry);
1070
1071	let block_downloader = match block_relay {
1072		Some(params) => {
1073			let BlockRelayParams { mut server, downloader, request_response_config } = params;
1074
1075			net_config.add_request_response_protocol(request_response_config);
1076
1077			spawn_handle.spawn("block-request-handler", Some("networking"), async move {
1078				server.run().await;
1079			});
1080
1081			downloader
1082		},
1083		None => build_default_block_downloader(
1084			&protocol_id,
1085			fork_id,
1086			&mut net_config,
1087			network_service_provider.handle(),
1088			Arc::clone(&client),
1089			config.network.default_peers_set.in_peers as usize +
1090				config.network.default_peers_set.out_peers as usize,
1091			&spawn_handle,
1092		),
1093	};
1094
1095	let syncing_strategy = build_polkadot_syncing_strategy(
1096		protocol_id.clone(),
1097		fork_id,
1098		&mut net_config,
1099		warp_sync_config,
1100		block_downloader,
1101		client.clone(),
1102		&spawn_handle,
1103		metrics_registry,
1104		config.blocks_pruning.is_archive(),
1105	)?;
1106
1107	let (syncing_engine, sync_service, block_announce_config) = SyncingEngine::new(
1108		Roles::from(&config.role),
1109		Arc::clone(&client),
1110		metrics_registry,
1111		metrics.clone(),
1112		&net_config,
1113		protocol_id.clone(),
1114		fork_id,
1115		block_announce_validator,
1116		syncing_strategy,
1117		network_service_provider.handle(),
1118		import_queue.service(),
1119		net_config.peer_store_handle(),
1120	)?;
1121
1122	spawn_handle.spawn_blocking("syncing", None, syncing_engine.run());
1123
1124	build_network_advanced(BuildNetworkAdvancedParams {
1125		role: config.role,
1126		protocol_id,
1127		fork_id,
1128		announce_block: config.announce_block,
1129		net_config,
1130		client,
1131		transaction_pool,
1132		spawn_handle,
1133		spawn_essential_handle,
1134		import_queue,
1135		sync_service,
1136		block_announce_config,
1137		network_service_provider,
1138		metrics_registry,
1139		metrics,
1140		blocks_pruning: config.blocks_pruning,
1141	})
1142}
1143
1144/// Parameters to pass into [`build_network_advanced`].
1145pub struct BuildNetworkAdvancedParams<'a, Block, Net, TxPool, IQ, Client>
1146where
1147	Block: BlockT,
1148	Net: NetworkBackend<Block, <Block as BlockT>::Hash>,
1149{
1150	/// Role of the local node.
1151	pub role: Role,
1152	/// Protocol name prefix.
1153	pub protocol_id: ProtocolId,
1154	/// Fork ID.
1155	pub fork_id: Option<&'a str>,
1156	/// Announce block automatically after they have been imported.
1157	pub announce_block: bool,
1158	/// Full network configuration.
1159	pub net_config: FullNetworkConfiguration<Block, <Block as BlockT>::Hash, Net>,
1160	/// A shared client returned by `new_full_parts`.
1161	pub client: Arc<Client>,
1162	/// A shared transaction pool.
1163	pub transaction_pool: Arc<TxPool>,
1164	/// A handle for spawning tasks.
1165	pub spawn_handle: SpawnTaskHandle,
1166	/// A handle for spawning essential tasks.
1167	pub spawn_essential_handle: SpawnEssentialTaskHandle,
1168	/// An import queue.
1169	pub import_queue: IQ,
1170	/// Syncing service to communicate with syncing engine.
1171	pub sync_service: SyncingService<Block>,
1172	/// Block announce config.
1173	pub block_announce_config: Net::NotificationProtocolConfig,
1174	/// Network service provider to drive with network internally.
1175	pub network_service_provider: NetworkServiceProvider,
1176	/// Prometheus metrics registry.
1177	pub metrics_registry: Option<&'a Registry>,
1178	/// Metrics.
1179	pub metrics: NotificationMetrics,
1180	/// Block pruning configuration.
1181	pub blocks_pruning: BlocksPruning,
1182}
1183
1184/// Build the network service, the network status sinks and an RPC sender, this is a lower-level
1185/// version of [`build_network`] for those needing more control.
1186pub fn build_network_advanced<Block, Net, TxPool, IQ, Client>(
1187	params: BuildNetworkAdvancedParams<Block, Net, TxPool, IQ, Client>,
1188) -> Result<
1189	(
1190		Arc<dyn sc_network::service::traits::NetworkService>,
1191		TracingUnboundedSender<sc_rpc::system::Request<Block>>,
1192		sc_network_transactions::TransactionsHandlerController<<Block as BlockT>::Hash>,
1193		Arc<SyncingService<Block>>,
1194	),
1195	Error,
1196>
1197where
1198	Block: BlockT,
1199	Client: ProvideRuntimeApi<Block>
1200		+ HeaderMetadata<Block, Error = sp_blockchain::Error>
1201		+ Chain<Block>
1202		+ BlockBackend<Block>
1203		+ BlockIdTo<Block, Error = sp_blockchain::Error>
1204		+ ProofProvider<Block>
1205		+ HeaderBackend<Block>
1206		+ BlockchainEvents<Block>
1207		+ 'static,
1208	TxPool: TransactionPool<Block = Block, Hash = <Block as BlockT>::Hash> + 'static,
1209	IQ: ImportQueue<Block> + 'static,
1210	Net: NetworkBackend<Block, <Block as BlockT>::Hash>,
1211{
1212	let BuildNetworkAdvancedParams {
1213		role,
1214		protocol_id,
1215		fork_id,
1216		announce_block,
1217		mut net_config,
1218		client,
1219		transaction_pool,
1220		spawn_handle,
1221		spawn_essential_handle,
1222		import_queue,
1223		sync_service,
1224		block_announce_config,
1225		network_service_provider,
1226		metrics_registry,
1227		metrics,
1228		blocks_pruning,
1229	} = params;
1230
1231	let genesis_hash = client.info().genesis_hash;
1232
1233	let light_client_request_protocol_config = {
1234		// Allow both outgoing and incoming requests.
1235		let (handler, protocol_config) =
1236			LightClientRequestHandler::new::<Net>(&protocol_id, fork_id, client.clone());
1237		spawn_handle.spawn("light-client-request-handler", Some("networking"), handler.run());
1238		protocol_config
1239	};
1240
1241	// install request handlers to `FullNetworkConfiguration`
1242	net_config.add_request_response_protocol(light_client_request_protocol_config);
1243
1244	// Initialize IPFS server.
1245	let ipfs_config = net_config.network_config.ipfs_server.then(|| {
1246		let (handler, bitswap_config) = Net::bitswap_server(client.clone());
1247		spawn_handle.spawn("bitswap-request-handler", Some("networking"), handler);
1248
1249		let ipfs_num_blocks = match blocks_pruning {
1250			BlocksPruning::KeepAll | BlocksPruning::KeepFinalized => IPFS_MAX_BLOCKS,
1251			BlocksPruning::Some(num) => std::cmp::min(num, IPFS_MAX_BLOCKS),
1252		};
1253
1254		IpfsConfig {
1255			bitswap_config,
1256			block_provider: Box::new(IpfsIndexedTransactions::new(client.clone(), ipfs_num_blocks)),
1257			bootnodes: net_config.network_config.ipfs_bootnodes.clone(),
1258		}
1259	});
1260
1261	// Create transactions protocol and add it to the list of supported protocols of
1262	let (transactions_handler_proto, transactions_config) =
1263		sc_network_transactions::TransactionsHandlerPrototype::new::<_, Block, Net>(
1264			protocol_id.clone(),
1265			genesis_hash,
1266			fork_id,
1267			metrics.clone(),
1268			net_config.peer_store_handle(),
1269		);
1270	net_config.add_notification_protocol(transactions_config);
1271
1272	// Start task for `PeerStore`
1273	let peer_store = net_config.take_peer_store();
1274	spawn_handle.spawn("peer-store", Some("networking"), peer_store.run());
1275
1276	let sync_service = Arc::new(sync_service);
1277
1278	let network_params = sc_network::config::Params::<Block, <Block as BlockT>::Hash, Net> {
1279		role,
1280		executor: {
1281			let spawn_handle = Clone::clone(&spawn_handle);
1282			Box::new(move |fut| {
1283				spawn_handle.spawn("libp2p-node", Some("networking"), fut);
1284			})
1285		},
1286		network_config: net_config,
1287		genesis_hash,
1288		protocol_id,
1289		fork_id: fork_id.map(ToOwned::to_owned),
1290		metrics_registry: metrics_registry.cloned(),
1291		block_announce_config,
1292		ipfs_config,
1293		notification_metrics: metrics,
1294	};
1295
1296	let has_bootnodes = !network_params.network_config.network_config.boot_nodes.is_empty();
1297	let network_mut = Net::new(network_params)?;
1298	let network = network_mut.network_service().clone();
1299
1300	let (tx_handler, tx_handler_controller) = transactions_handler_proto.build(
1301		network.clone(),
1302		sync_service.clone(),
1303		Arc::new(TransactionPoolAdapter { pool: transaction_pool, client: client.clone() }),
1304		metrics_registry,
1305	)?;
1306	spawn_handle.spawn_blocking(
1307		"network-transactions-handler",
1308		Some("networking"),
1309		tx_handler.run(),
1310	);
1311
1312	spawn_handle.spawn_blocking(
1313		"chain-sync-network-service-provider",
1314		Some("networking"),
1315		network_service_provider.run(Arc::new(network.clone())),
1316	);
1317	spawn_handle.spawn("import-queue", None, {
1318		let sync_service = sync_service.clone();
1319
1320		async move { import_queue.run(sync_service.as_ref()).await }
1321	});
1322
1323	let (system_rpc_tx, system_rpc_rx) = tracing_unbounded("mpsc_system_rpc", 10_000);
1324	spawn_handle.spawn(
1325		"system-rpc-handler",
1326		Some("networking"),
1327		build_system_rpc_future::<_, _, <Block as BlockT>::Hash>(
1328			role,
1329			network_mut.network_service(),
1330			sync_service.clone(),
1331			client.clone(),
1332			system_rpc_rx,
1333			has_bootnodes,
1334		),
1335	);
1336
1337	let future = build_network_future::<_, _, <Block as BlockT>::Hash, _>(
1338		network_mut,
1339		client,
1340		sync_service.clone(),
1341		announce_block,
1342	);
1343
1344	// The network worker is responsible for gathering all network messages and processing
1345	// them. This is quite a heavy task, and at the time of the writing of this comment it
1346	// frequently happens that this future takes several seconds or in some situations
1347	// even more than a minute until it has processed its entire queue. This is clearly an
1348	// issue, and ideally we would like to fix the network future to take as little time as
1349	// possible, but we also take the extra harm-prevention measure to execute the networking
1350	// future using `spawn_blocking`.
1351	//
1352	// The network worker is spawned as an essential task, meaning if it exits unexpectedly
1353	// the service will shut down.
1354	spawn_essential_handle.spawn_blocking("network-worker", Some("networking"), future);
1355
1356	Ok((network, system_rpc_tx, tx_handler_controller, sync_service.clone()))
1357}
1358
1359/// Configuration for [`build_default_syncing_engine`].
1360pub struct DefaultSyncingEngineConfig<'a, Block, Client, Net>
1361where
1362	Block: BlockT,
1363	Net: NetworkBackend<Block, <Block as BlockT>::Hash>,
1364{
1365	/// Role of the local node.
1366	pub role: Role,
1367	/// Protocol name prefix.
1368	pub protocol_id: ProtocolId,
1369	/// Fork ID.
1370	pub fork_id: Option<&'a str>,
1371	/// Full network configuration.
1372	pub net_config: &'a mut FullNetworkConfiguration<Block, <Block as BlockT>::Hash, Net>,
1373	/// Validator for incoming block announcements.
1374	pub block_announce_validator: Box<dyn BlockAnnounceValidator<Block> + Send>,
1375	/// Handle to communicate with `NetworkService`.
1376	pub network_service_handle: NetworkServiceHandle,
1377	/// Warp sync configuration (when used).
1378	pub warp_sync_config: Option<WarpSyncConfig<Block>>,
1379	/// A shared client returned by `new_full_parts`.
1380	pub client: Arc<Client>,
1381	/// Blocks import queue API.
1382	pub import_queue_service: Box<dyn ImportQueueService<Block>>,
1383	/// Expected max total number of peer connections (in + out).
1384	pub num_peers_hint: usize,
1385	/// A handle for spawning tasks.
1386	pub spawn_handle: &'a SpawnTaskHandle,
1387	/// Prometheus metrics registry.
1388	pub metrics_registry: Option<&'a Registry>,
1389	/// Metrics.
1390	pub metrics: NotificationMetrics,
1391	/// Whether to archive blocks. When `true`, gap sync requests bodies to maintain complete
1392	/// block history.
1393	pub archive_blocks: bool,
1394}
1395
1396/// Build default syncing engine using [`build_default_block_downloader`] and
1397/// [`build_polkadot_syncing_strategy`] internally.
1398pub fn build_default_syncing_engine<Block, Client, Net>(
1399	config: DefaultSyncingEngineConfig<Block, Client, Net>,
1400) -> Result<(SyncingService<Block>, Net::NotificationProtocolConfig), Error>
1401where
1402	Block: BlockT,
1403	Client: HeaderBackend<Block>
1404		+ BlockBackend<Block>
1405		+ HeaderMetadata<Block, Error = sp_blockchain::Error>
1406		+ ProofProvider<Block>
1407		+ Send
1408		+ Sync
1409		+ 'static,
1410	Net: NetworkBackend<Block, <Block as BlockT>::Hash>,
1411{
1412	let DefaultSyncingEngineConfig {
1413		role,
1414		protocol_id,
1415		fork_id,
1416		net_config,
1417		block_announce_validator,
1418		network_service_handle,
1419		warp_sync_config,
1420		client,
1421		import_queue_service,
1422		num_peers_hint,
1423		spawn_handle,
1424		metrics_registry,
1425		metrics,
1426		archive_blocks,
1427	} = config;
1428
1429	let block_downloader = build_default_block_downloader(
1430		&protocol_id,
1431		fork_id,
1432		net_config,
1433		network_service_handle.clone(),
1434		client.clone(),
1435		num_peers_hint,
1436		spawn_handle,
1437	);
1438	let syncing_strategy = build_polkadot_syncing_strategy(
1439		protocol_id.clone(),
1440		fork_id,
1441		net_config,
1442		warp_sync_config,
1443		block_downloader,
1444		client.clone(),
1445		spawn_handle,
1446		metrics_registry,
1447		archive_blocks,
1448	)?;
1449
1450	let (syncing_engine, sync_service, block_announce_config) = SyncingEngine::new(
1451		Roles::from(&role),
1452		client,
1453		metrics_registry,
1454		metrics,
1455		&net_config,
1456		protocol_id,
1457		fork_id,
1458		block_announce_validator,
1459		syncing_strategy,
1460		network_service_handle,
1461		import_queue_service,
1462		net_config.peer_store_handle(),
1463	)?;
1464
1465	spawn_handle.spawn_blocking("syncing", None, syncing_engine.run());
1466
1467	Ok((sync_service, block_announce_config))
1468}
1469
1470/// Build default block downloader
1471pub fn build_default_block_downloader<Block, Client, Net>(
1472	protocol_id: &ProtocolId,
1473	fork_id: Option<&str>,
1474	net_config: &mut FullNetworkConfiguration<Block, <Block as BlockT>::Hash, Net>,
1475	network_service_handle: NetworkServiceHandle,
1476	client: Arc<Client>,
1477	num_peers_hint: usize,
1478	spawn_handle: &SpawnTaskHandle,
1479) -> Arc<dyn BlockDownloader<Block>>
1480where
1481	Block: BlockT,
1482	Client: HeaderBackend<Block> + BlockBackend<Block> + Send + Sync + 'static,
1483	Net: NetworkBackend<Block, <Block as BlockT>::Hash>,
1484{
1485	// Custom protocol was not specified, use the default block handler.
1486	// Allow both outgoing and incoming requests.
1487	let BlockRelayParams { mut server, downloader, request_response_config } =
1488		BlockRequestHandler::new::<Net>(
1489			network_service_handle,
1490			&protocol_id,
1491			fork_id,
1492			client.clone(),
1493			num_peers_hint,
1494		);
1495
1496	spawn_handle.spawn("block-request-handler", Some("networking"), async move {
1497		server.run().await;
1498	});
1499
1500	net_config.add_request_response_protocol(request_response_config);
1501
1502	downloader
1503}
1504
1505/// Build standard polkadot syncing strategy
1506pub fn build_polkadot_syncing_strategy<Block, Client, Net>(
1507	protocol_id: ProtocolId,
1508	fork_id: Option<&str>,
1509	net_config: &mut FullNetworkConfiguration<Block, <Block as BlockT>::Hash, Net>,
1510	warp_sync_config: Option<WarpSyncConfig<Block>>,
1511	block_downloader: Arc<dyn BlockDownloader<Block>>,
1512	client: Arc<Client>,
1513	spawn_handle: &SpawnTaskHandle,
1514	metrics_registry: Option<&Registry>,
1515	archive_blocks: bool,
1516) -> Result<Box<dyn SyncingStrategy<Block>>, Error>
1517where
1518	Block: BlockT,
1519	Client: HeaderBackend<Block>
1520		+ BlockBackend<Block>
1521		+ HeaderMetadata<Block, Error = sp_blockchain::Error>
1522		+ ProofProvider<Block>
1523		+ Send
1524		+ Sync
1525		+ 'static,
1526	Net: NetworkBackend<Block, <Block as BlockT>::Hash>,
1527{
1528	if warp_sync_config.is_none() && net_config.network_config.sync_mode.is_warp() {
1529		return Err("Warp sync enabled, but no warp sync provider configured.".into());
1530	}
1531
1532	if client.requires_full_sync() {
1533		match net_config.network_config.sync_mode {
1534			SyncMode::LightState { .. } => {
1535				return Err("Fast sync doesn't work for archive nodes".into())
1536			},
1537			SyncMode::Warp => return Err("Warp sync doesn't work for archive nodes".into()),
1538			SyncMode::Full => {},
1539		}
1540	}
1541
1542	let genesis_hash = client.info().genesis_hash;
1543
1544	let (state_request_protocol_config, state_request_protocol_name) = {
1545		let num_peer_hint = net_config.network_config.default_peers_set_num_full as usize +
1546			net_config.network_config.default_peers_set.reserved_nodes.len();
1547		// Allow both outgoing and incoming requests.
1548		let (handler, protocol_config) =
1549			StateRequestHandler::new::<Net>(&protocol_id, fork_id, client.clone(), num_peer_hint);
1550		let config_name = protocol_config.protocol_name().clone();
1551
1552		spawn_handle.spawn("state-request-handler", Some("networking"), handler.run());
1553		(protocol_config, config_name)
1554	};
1555	net_config.add_request_response_protocol(state_request_protocol_config);
1556
1557	let (warp_sync_protocol_config, warp_sync_protocol_name) = match warp_sync_config.as_ref() {
1558		Some(WarpSyncConfig::WithProvider(warp_with_provider)) => {
1559			// Allow both outgoing and incoming requests.
1560			let (handler, protocol_config) = WarpSyncRequestHandler::new::<_, Net>(
1561				protocol_id,
1562				genesis_hash,
1563				fork_id,
1564				warp_with_provider.clone(),
1565			);
1566			let config_name = protocol_config.protocol_name().clone();
1567
1568			spawn_handle.spawn("warp-sync-request-handler", Some("networking"), handler.run());
1569			(Some(protocol_config), Some(config_name))
1570		},
1571		_ => (None, None),
1572	};
1573	if let Some(config) = warp_sync_protocol_config {
1574		net_config.add_request_response_protocol(config);
1575	}
1576
1577	let syncing_config = PolkadotSyncingStrategyConfig {
1578		mode: net_config.network_config.sync_mode,
1579		max_parallel_downloads: net_config.network_config.max_parallel_downloads,
1580		max_blocks_per_request: net_config.network_config.max_blocks_per_request,
1581		min_peers_to_start_warp_sync: net_config.network_config.min_peers_to_start_warp_sync,
1582		metrics_registry: metrics_registry.cloned(),
1583		state_request_protocol_name,
1584		block_downloader,
1585		archive_blocks,
1586	};
1587	Ok(Box::new(PolkadotSyncingStrategy::new(
1588		syncing_config,
1589		client,
1590		warp_sync_config,
1591		warp_sync_protocol_name,
1592	)?))
1593}