Skip to main content

sc_service/
builder.rs

1// This file is part of Substrate.
2
3// Copyright (C) Parity Technologies (UK) Ltd.
4// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
5
6// This program is free software: you can redistribute it and/or modify
7// it under the terms of the GNU General Public License as published by
8// the Free Software Foundation, either version 3 of the License, or
9// (at your option) any later version.
10
11// This program is distributed in the hope that it will be useful,
12// but WITHOUT ANY WARRANTY; without even the implied warranty of
13// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14// GNU General Public License for more details.
15
16// You should have received a copy of the GNU General Public License
17// along with this program. If not, see <https://www.gnu.org/licenses/>.
18
19use crate::{
20	build_network_future, build_system_rpc_future,
21	client::{Client, ClientConfig},
22	config::{Configuration, ExecutorConfiguration, KeystoreConfig, Multiaddr, PrometheusConfig},
23	error::Error,
24	metrics::MetricsService,
25	start_rpc_servers, BuildGenesisBlock, GenesisBlockBuilder, RpcHandlers,
26	SpawnEssentialTaskHandle, SpawnTaskHandle, TaskManager, TransactionPoolAdapter,
27};
28use futures::{select, FutureExt, StreamExt};
29use jsonrpsee::RpcModule;
30use log::{debug, error, info};
31use prometheus_endpoint::Registry;
32use sc_chain_spec::{get_extension, ChainSpec};
33use sc_client_api::{
34	execution_extensions::ExecutionExtensions, proof_provider::ProofProvider, BadBlocks,
35	BlockBackend, BlockchainEvents, ExecutorProvider, ForkBlocks, KeysIter, StorageProvider,
36	TrieCacheContext, UsageProvider,
37};
38use sc_client_db::{Backend, BlocksPruning, DatabaseSettings, PruningMode};
39use sc_consensus::import_queue::{ImportQueue, ImportQueueService};
40use sc_executor::{
41	sp_wasm_interface::HostFunctions, HeapAllocStrategy, NativeExecutionDispatch, RuntimeVersionOf,
42	WasmExecutor, DEFAULT_HEAP_ALLOC_STRATEGY,
43};
44use sc_keystore::LocalKeystore;
45use sc_network::{
46	config::{FullNetworkConfiguration, ProtocolId, SyncMode},
47	multiaddr::Protocol,
48	service::{
49		traits::{PeerStore, RequestResponseConfig},
50		NotificationMetrics,
51	},
52	NetworkBackend, NetworkStateInfo,
53};
54use sc_network_common::role::{Role, Roles};
55use sc_network_light::light_client_requests::handler::LightClientRequestHandler;
56use sc_network_sync::{
57	block_relay_protocol::{BlockDownloader, BlockRelayParams},
58	block_request_handler::BlockRequestHandler,
59	engine::SyncingEngine,
60	service::network::{NetworkServiceHandle, NetworkServiceProvider},
61	state_request_handler::StateRequestHandler,
62	strategy::{
63		polkadot::{PolkadotSyncingStrategy, PolkadotSyncingStrategyConfig},
64		SyncingStrategy,
65	},
66	warp_request_handler::RequestHandler as WarpSyncRequestHandler,
67	SyncingService, WarpSyncConfig,
68};
69use sc_rpc::{
70	author::AuthorApiServer,
71	chain::ChainApiServer,
72	offchain::OffchainApiServer,
73	state::{ChildStateApiServer, StateApiServer},
74	system::SystemApiServer,
75	DenyUnsafe, SubscriptionTaskExecutor,
76};
77use sc_rpc_spec_v2::{
78	archive::ArchiveApiServer,
79	chain_head::ChainHeadApiServer,
80	chain_spec::ChainSpecApiServer,
81	transaction::{TransactionApiServer, TransactionBroadcastApiServer},
82};
83use sc_telemetry::{telemetry, ConnectionMessage, Telemetry, TelemetryHandle, SUBSTRATE_INFO};
84use sc_tracing::block::TracingExecuteBlock;
85use sc_transaction_pool_api::{MaintainedTransactionPool, TransactionPool};
86use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedSender};
87use sp_api::{CallApiAt, ProvideRuntimeApi};
88use sp_blockchain::{HeaderBackend, HeaderMetadata};
89use sp_consensus::block_validation::{
90	BlockAnnounceValidator, Chain, DefaultBlockAnnounceValidator,
91};
92use sp_core::traits::{CodeExecutor, SpawnNamed};
93use sp_keystore::KeystorePtr;
94use sp_runtime::traits::{Block as BlockT, BlockIdTo, NumberFor, Zero};
95use sp_storage::{ChildInfo, ChildType, PrefixedStorageKey};
96use std::{
97	str::FromStr,
98	sync::Arc,
99	time::{Duration, SystemTime},
100};
101
102/// Full client type.
103pub type TFullClient<TBl, TRtApi, TExec> =
104	Client<TFullBackend<TBl>, TFullCallExecutor<TBl, TExec>, TBl, TRtApi>;
105
106/// Full client backend type.
107pub type TFullBackend<TBl> = Backend<TBl>;
108
109/// Full client call executor type.
110pub type TFullCallExecutor<TBl, TExec> = crate::client::LocalCallExecutor<TBl, Backend<TBl>, TExec>;
111
112type TFullParts<TBl, TRtApi, TExec> =
113	(TFullClient<TBl, TRtApi, TExec>, Arc<TFullBackend<TBl>>, KeystoreContainer, TaskManager);
114
115/// Construct a local keystore shareable container
116pub struct KeystoreContainer(Arc<LocalKeystore>);
117
118impl KeystoreContainer {
119	/// Construct KeystoreContainer
120	pub fn new(config: &KeystoreConfig) -> Result<Self, Error> {
121		let keystore = Arc::new(match config {
122			KeystoreConfig::Path { path, password } => {
123				LocalKeystore::open(path.clone(), password.clone())?
124			},
125			KeystoreConfig::InMemory => LocalKeystore::in_memory(),
126		});
127
128		Ok(Self(keystore))
129	}
130
131	/// Returns a shared reference to a dynamic `Keystore` trait implementation.
132	pub fn keystore(&self) -> KeystorePtr {
133		self.0.clone()
134	}
135
136	/// Returns a shared reference to the local keystore .
137	pub fn local_keystore(&self) -> Arc<LocalKeystore> {
138		self.0.clone()
139	}
140}
141
142/// Creates a new full client for the given config.
143pub fn new_full_client<TBl, TRtApi, TExec>(
144	config: &Configuration,
145	telemetry: Option<TelemetryHandle>,
146	executor: TExec,
147	pruning_filters: Vec<Arc<dyn sc_client_db::PruningFilter>>,
148) -> Result<TFullClient<TBl, TRtApi, TExec>, Error>
149where
150	TBl: BlockT,
151	TExec: CodeExecutor + RuntimeVersionOf + Clone,
152{
153	new_full_parts(config, telemetry, executor, pruning_filters).map(|parts| parts.0)
154}
155
156/// Create the initial parts of a full node with the default genesis block builder.
157///
158/// The `pruning_filters` parameter allows configuring which blocks should be preserved
159/// during pruning.
160pub fn new_full_parts_record_import<TBl, TRtApi, TExec>(
161	config: &Configuration,
162	telemetry: Option<TelemetryHandle>,
163	executor: TExec,
164	enable_import_proof_recording: bool,
165	pruning_filters: Vec<Arc<dyn sc_client_db::PruningFilter>>,
166) -> Result<TFullParts<TBl, TRtApi, TExec>, Error>
167where
168	TBl: BlockT,
169	TExec: CodeExecutor + RuntimeVersionOf + Clone,
170{
171	let mut db_config = config.db_config();
172	db_config.pruning_filters = pruning_filters;
173	let backend = new_db_backend(db_config)?;
174
175	let genesis_block_builder = GenesisBlockBuilder::new(
176		config.chain_spec.as_storage_builder(),
177		!config.no_genesis(),
178		backend.clone(),
179		executor.clone(),
180	)?;
181
182	new_full_parts_with_genesis_builder(
183		config,
184		telemetry,
185		executor,
186		backend,
187		genesis_block_builder,
188		enable_import_proof_recording,
189	)
190}
191
192/// Create the initial parts of a full node with the default genesis block builder.
193///
194/// The `pruning_filters` parameter allows configuring which blocks should be preserved
195/// during pruning.
196pub fn new_full_parts<TBl, TRtApi, TExec>(
197	config: &Configuration,
198	telemetry: Option<TelemetryHandle>,
199	executor: TExec,
200	pruning_filters: Vec<Arc<dyn sc_client_db::PruningFilter>>,
201) -> Result<TFullParts<TBl, TRtApi, TExec>, Error>
202where
203	TBl: BlockT,
204	TExec: CodeExecutor + RuntimeVersionOf + Clone,
205{
206	new_full_parts_record_import(config, telemetry, executor, false, pruning_filters)
207}
208
209/// Create the initial parts of a full node.
210pub fn new_full_parts_with_genesis_builder<TBl, TRtApi, TExec, TBuildGenesisBlock>(
211	config: &Configuration,
212	telemetry: Option<TelemetryHandle>,
213	executor: TExec,
214	backend: Arc<TFullBackend<TBl>>,
215	genesis_block_builder: TBuildGenesisBlock,
216	enable_import_proof_recording: bool,
217) -> Result<TFullParts<TBl, TRtApi, TExec>, Error>
218where
219	TBl: BlockT,
220	TExec: CodeExecutor + RuntimeVersionOf + Clone,
221	TBuildGenesisBlock: BuildGenesisBlock<
222		TBl,
223		BlockImportOperation = <Backend<TBl> as sc_client_api::backend::Backend<TBl>>::BlockImportOperation
224	>,
225{
226	let keystore_container = KeystoreContainer::new(&config.keystore)?;
227
228	let task_manager = {
229		let registry = config.prometheus_config.as_ref().map(|cfg| &cfg.registry);
230		TaskManager::new(config.tokio_handle.clone(), registry)?
231	};
232
233	let chain_spec = &config.chain_spec;
234	let fork_blocks = get_extension::<ForkBlocks<TBl>>(chain_spec.extensions())
235		.cloned()
236		.unwrap_or_default();
237
238	let bad_blocks = get_extension::<BadBlocks<TBl>>(chain_spec.extensions())
239		.cloned()
240		.unwrap_or_default();
241
242	let client = {
243		let extensions = ExecutionExtensions::new(None, Arc::new(executor.clone()));
244
245		let wasm_runtime_substitutes = config
246			.chain_spec
247			.code_substitutes()
248			.into_iter()
249			.map(|(n, c)| {
250				let number = NumberFor::<TBl>::from_str(&n).map_err(|_| {
251					Error::Application(Box::from(format!(
252						"Failed to parse `{}` as block number for code substitutes. \
253						 In an old version the key for code substitute was a block hash. \
254						 Please update the chain spec to a version that is compatible with your node.",
255						n
256					)))
257				})?;
258				Ok((number, c))
259			})
260			.collect::<Result<std::collections::HashMap<_, _>, Error>>()?;
261
262		let client = new_client(
263			backend.clone(),
264			executor,
265			genesis_block_builder,
266			fork_blocks,
267			bad_blocks,
268			extensions,
269			Box::new(task_manager.spawn_handle()),
270			config.prometheus_config.as_ref().map(|config| config.registry.clone()),
271			telemetry,
272			ClientConfig {
273				offchain_worker_enabled: config.offchain_worker.enabled,
274				offchain_indexing_api: config.offchain_worker.indexing_enabled,
275				wasm_runtime_overrides: config.wasm_runtime_overrides.clone(),
276				no_genesis: config.no_genesis(),
277				wasm_runtime_substitutes,
278				enable_import_proof_recording,
279			},
280		)?;
281
282		if let Some(warm_up_strategy) = config.warm_up_trie_cache {
283			let storage_root = client.usage_info().chain.best_hash;
284			let backend_clone = backend.clone();
285
286			if warm_up_strategy.is_blocking() {
287				// We use the blocking strategy for testing purposes.
288				// So better to error out if it fails.
289				warm_up_trie_cache(backend_clone, storage_root)?;
290			} else {
291				task_manager.spawn_handle().spawn_blocking(
292					"warm-up-trie-cache",
293					None,
294					async move {
295						if let Err(e) = warm_up_trie_cache(backend_clone, storage_root) {
296							error!("Failed to warm up trie cache: {e}");
297						}
298					},
299				);
300			}
301		}
302
303		client
304	};
305
306	Ok((client, backend, keystore_container, task_manager))
307}
308
309fn child_info(key: Vec<u8>) -> Option<ChildInfo> {
310	let prefixed_key = PrefixedStorageKey::new(key);
311	ChildType::from_prefixed_key(&prefixed_key).and_then(|(child_type, storage_key)| {
312		(child_type == ChildType::ParentKeyId).then(|| ChildInfo::new_default(storage_key))
313	})
314}
315
316fn warm_up_trie_cache<TBl: BlockT>(
317	backend: Arc<TFullBackend<TBl>>,
318	storage_root: TBl::Hash,
319) -> Result<(), Error> {
320	use sc_client_api::backend::Backend;
321	use sp_state_machine::Backend as StateBackend;
322
323	let untrusted_state = || backend.state_at(storage_root, TrieCacheContext::Untrusted);
324	let trusted_state = || backend.state_at(storage_root, TrieCacheContext::Trusted);
325
326	debug!("Populating trie cache started",);
327	let start_time = std::time::Instant::now();
328	let mut keys_count = 0;
329	let mut child_keys_count = 0;
330	for key in KeysIter::<_, TBl>::new(untrusted_state()?, None, None)? {
331		if keys_count != 0 && keys_count % 100_000 == 0 {
332			debug!("{} keys and {} child keys have been warmed", keys_count, child_keys_count);
333		}
334		match child_info(key.0.clone()) {
335			Some(info) => {
336				for child_key in
337					KeysIter::<_, TBl>::new_child(untrusted_state()?, info.clone(), None, None)?
338				{
339					if trusted_state()?
340						.child_storage(&info, &child_key.0)
341						.unwrap_or_default()
342						.is_none()
343					{
344						debug!("Child storage value unexpectedly empty: {child_key:?}");
345					}
346					child_keys_count += 1;
347				}
348			},
349			None => {
350				if trusted_state()?.storage(&key.0).unwrap_or_default().is_none() {
351					debug!("Storage value unexpectedly empty: {key:?}");
352				}
353				keys_count += 1;
354			},
355		}
356	}
357	debug!(
358		"Trie cache populated with {keys_count} keys and {child_keys_count} child keys in {} s",
359		start_time.elapsed().as_secs_f32()
360	);
361
362	Ok(())
363}
364
365/// Creates a [`NativeElseWasmExecutor`](sc_executor::NativeElseWasmExecutor) according to
366/// [`Configuration`].
367#[deprecated(note = "Please switch to `new_wasm_executor`. Will be removed at end of 2024.")]
368#[allow(deprecated)]
369pub fn new_native_or_wasm_executor<D: NativeExecutionDispatch>(
370	config: &Configuration,
371) -> sc_executor::NativeElseWasmExecutor<D> {
372	#[allow(deprecated)]
373	sc_executor::NativeElseWasmExecutor::new_with_wasm_executor(new_wasm_executor(&config.executor))
374}
375
376/// Creates a [`WasmExecutor`] according to [`ExecutorConfiguration`].
377pub fn new_wasm_executor<H: HostFunctions>(config: &ExecutorConfiguration) -> WasmExecutor<H> {
378	let strategy = config
379		.default_heap_pages
380		.map_or(DEFAULT_HEAP_ALLOC_STRATEGY, |p| HeapAllocStrategy::Static { extra_pages: p as _ });
381	WasmExecutor::<H>::builder()
382		.with_execution_method(config.wasm_method)
383		.with_onchain_heap_alloc_strategy(strategy)
384		.with_offchain_heap_alloc_strategy(strategy)
385		.with_max_runtime_instances(config.max_runtime_instances)
386		.with_runtime_cache_size(config.runtime_cache_size)
387		.build()
388}
389
390/// Create an instance of the default DB-backend.
391///
392/// Pruning filters can be configured via `settings.pruning_filters`.
393/// If any filter returns `true` for a block's justifications, the block will not be pruned.
394pub fn new_db_backend<Block>(
395	settings: DatabaseSettings,
396) -> Result<Arc<Backend<Block>>, sp_blockchain::Error>
397where
398	Block: BlockT,
399{
400	const CANONICALIZATION_DELAY: u64 = 4096;
401
402	Ok(Arc::new(Backend::new(settings, CANONICALIZATION_DELAY)?))
403}
404
405/// Create an instance of client backed by given backend.
406pub fn new_client<E, Block, RA, G>(
407	backend: Arc<Backend<Block>>,
408	executor: E,
409	genesis_block_builder: G,
410	fork_blocks: ForkBlocks<Block>,
411	bad_blocks: BadBlocks<Block>,
412	execution_extensions: ExecutionExtensions<Block>,
413	spawn_handle: Box<dyn SpawnNamed>,
414	prometheus_registry: Option<Registry>,
415	telemetry: Option<TelemetryHandle>,
416	config: ClientConfig<Block>,
417) -> Result<
418	Client<
419		Backend<Block>,
420		crate::client::LocalCallExecutor<Block, Backend<Block>, E>,
421		Block,
422		RA,
423	>,
424	sp_blockchain::Error,
425>
426where
427	Block: BlockT,
428	E: CodeExecutor + RuntimeVersionOf,
429	G: BuildGenesisBlock<
430		Block,
431		BlockImportOperation = <Backend<Block> as sc_client_api::backend::Backend<Block>>::BlockImportOperation
432	>,
433{
434	let executor = crate::client::LocalCallExecutor::new(
435		backend.clone(),
436		executor,
437		config.clone(),
438		execution_extensions,
439	)?;
440
441	Client::new(
442		backend,
443		executor,
444		spawn_handle,
445		genesis_block_builder,
446		fork_blocks,
447		bad_blocks,
448		prometheus_registry,
449		telemetry,
450		config,
451	)
452}
453
454/// Parameters to pass into `build`.
455pub struct SpawnTasksParams<'a, TBl: BlockT, TCl, TExPool, TRpc, Backend> {
456	/// The service configuration.
457	pub config: Configuration,
458	/// A shared client returned by `new_full_parts`.
459	pub client: Arc<TCl>,
460	/// A shared backend returned by `new_full_parts`.
461	pub backend: Arc<Backend>,
462	/// A task manager returned by `new_full_parts`.
463	pub task_manager: &'a mut TaskManager,
464	/// A shared keystore returned by `new_full_parts`.
465	pub keystore: KeystorePtr,
466	/// A shared transaction pool.
467	pub transaction_pool: Arc<TExPool>,
468	/// Builds additional [`RpcModule`]s that should be added to the server
469	pub rpc_builder: Box<dyn Fn(SubscriptionTaskExecutor) -> Result<RpcModule<TRpc>, Error>>,
470	/// A shared network instance.
471	pub network: Arc<dyn sc_network::service::traits::NetworkService>,
472	/// A Sender for RPC requests.
473	pub system_rpc_tx: TracingUnboundedSender<sc_rpc::system::Request<TBl>>,
474	/// Controller for transactions handlers
475	pub tx_handler_controller:
476		sc_network_transactions::TransactionsHandlerController<<TBl as BlockT>::Hash>,
477	/// Syncing service.
478	pub sync_service: Arc<SyncingService<TBl>>,
479	/// Telemetry instance for this node.
480	pub telemetry: Option<&'a mut Telemetry>,
481	/// Optional [`TracingExecuteBlock`] handle.
482	///
483	/// Will be used by the `trace_block` RPC to execute the actual block.
484	pub tracing_execute_block: Option<Arc<dyn TracingExecuteBlock<TBl>>>,
485}
486
487/// Spawn the tasks that are required to run a node.
488pub fn spawn_tasks<TBl, TBackend, TExPool, TRpc, TCl>(
489	SpawnTasksParams {
490		mut config,
491		task_manager,
492		client,
493		backend,
494		keystore,
495		transaction_pool,
496		rpc_builder,
497		network,
498		system_rpc_tx,
499		tx_handler_controller,
500		sync_service,
501		telemetry,
502		tracing_execute_block: execute_block,
503	}: SpawnTasksParams<TBl, TCl, TExPool, TRpc, TBackend>,
504) -> Result<RpcHandlers, Error>
505where
506	TCl: ProvideRuntimeApi<TBl>
507		+ HeaderMetadata<TBl, Error = sp_blockchain::Error>
508		+ Chain<TBl>
509		+ BlockBackend<TBl>
510		+ BlockIdTo<TBl, Error = sp_blockchain::Error>
511		+ ProofProvider<TBl>
512		+ HeaderBackend<TBl>
513		+ BlockchainEvents<TBl>
514		+ ExecutorProvider<TBl>
515		+ UsageProvider<TBl>
516		+ StorageProvider<TBl, TBackend>
517		+ CallApiAt<TBl>
518		+ Send
519		+ 'static,
520	<TCl as ProvideRuntimeApi<TBl>>::Api: sp_api::Metadata<TBl>
521		+ sp_transaction_pool::runtime_api::TaggedTransactionQueue<TBl>
522		+ sp_session::SessionKeys<TBl>
523		+ sp_api::ApiExt<TBl>,
524	TBl: BlockT,
525	TBl::Hash: Unpin,
526	TBl::Header: Unpin,
527	TBackend: 'static + sc_client_api::backend::Backend<TBl> + Send,
528	TExPool: MaintainedTransactionPool<Block = TBl, Hash = <TBl as BlockT>::Hash> + 'static,
529{
530	let chain_info = client.usage_info().chain;
531
532	sp_session::generate_initial_session_keys(
533		client.clone(),
534		chain_info.best_hash,
535		config.dev_key_seed.clone().map(|s| vec![s]).unwrap_or_default(),
536		keystore.clone(),
537	)
538	.map_err(|e| Error::Application(Box::new(e)))?;
539
540	let sysinfo = sc_sysinfo::gather_sysinfo();
541	sc_sysinfo::print_sysinfo(&sysinfo);
542
543	let telemetry = telemetry
544		.map(|telemetry| {
545			init_telemetry(
546				config.network.node_name.clone(),
547				config.impl_name.clone(),
548				config.impl_version.clone(),
549				config.chain_spec.name().to_string(),
550				config.role.is_authority(),
551				network.clone(),
552				client.clone(),
553				telemetry,
554				Some(sysinfo),
555			)
556		})
557		.transpose()?;
558
559	info!("📦 Highest known block at #{}", chain_info.best_number);
560
561	let spawn_handle = task_manager.spawn_handle();
562
563	// Inform the tx pool about imported and finalized blocks.
564	spawn_handle.spawn(
565		"txpool-notifications",
566		Some("transaction-pool"),
567		sc_transaction_pool::notification_future(client.clone(), transaction_pool.clone()),
568	);
569
570	spawn_handle.spawn(
571		"on-transaction-imported",
572		Some("transaction-pool"),
573		propagate_transaction_notifications(
574			transaction_pool.clone(),
575			tx_handler_controller,
576			telemetry.clone(),
577		),
578	);
579
580	// Prometheus metrics.
581	let metrics_service =
582		if let Some(PrometheusConfig { port, registry }) = config.prometheus_config.clone() {
583			// Set static metrics.
584			let metrics = MetricsService::with_prometheus(
585				telemetry,
586				&registry,
587				config.role,
588				&config.network.node_name,
589				&config.impl_version,
590			)?;
591			spawn_handle.spawn(
592				"prometheus-endpoint",
593				None,
594				prometheus_endpoint::init_prometheus(port, registry).map(drop),
595			);
596
597			metrics
598		} else {
599			MetricsService::new(telemetry)
600		};
601
602	// Periodically updated metrics and telemetry updates.
603	spawn_handle.spawn(
604		"telemetry-periodic-send",
605		None,
606		metrics_service.run(
607			client.clone(),
608			transaction_pool.clone(),
609			network.clone(),
610			sync_service.clone(),
611		),
612	);
613
614	let rpc_id_provider = config.rpc.id_provider.take();
615
616	// jsonrpsee RPC
617	// RPC-V2 specific metrics need to be registered before the RPC server is started,
618	// since we might have two instances running (one for the in-memory RPC and one for the network
619	// RPC).
620	let rpc_v2_metrics = config
621		.prometheus_registry()
622		.map(|registry| sc_rpc_spec_v2::transaction::TransactionMetrics::new(registry))
623		.transpose()?;
624
625	let gen_rpc_module = || {
626		gen_rpc_module(GenRpcModuleParams {
627			spawn_handle: task_manager.spawn_handle(),
628			client: client.clone(),
629			transaction_pool: transaction_pool.clone(),
630			keystore: keystore.clone(),
631			system_rpc_tx: system_rpc_tx.clone(),
632			impl_name: config.impl_name.clone(),
633			impl_version: config.impl_version.clone(),
634			chain_spec: config.chain_spec.as_ref(),
635			state_pruning: &config.state_pruning,
636			blocks_pruning: config.blocks_pruning,
637			backend: backend.clone(),
638			rpc_builder: &*rpc_builder,
639			metrics: rpc_v2_metrics.clone(),
640			tracing_execute_block: execute_block.clone(),
641		})
642	};
643
644	let rpc_server_handle = start_rpc_servers(
645		&config.rpc,
646		config.prometheus_registry(),
647		&config.tokio_handle,
648		gen_rpc_module,
649		rpc_id_provider,
650	)?;
651
652	let listen_addrs = rpc_server_handle
653		.listen_addrs()
654		.into_iter()
655		.map(|socket_addr| {
656			let mut multiaddr: Multiaddr = socket_addr.ip().into();
657			multiaddr.push(Protocol::Tcp(socket_addr.port()));
658			multiaddr
659		})
660		.collect();
661
662	let in_memory_rpc = {
663		let mut module = gen_rpc_module()?;
664		module.extensions_mut().insert(DenyUnsafe::No);
665		module
666	};
667
668	let in_memory_rpc_handle = RpcHandlers::new(Arc::new(in_memory_rpc), listen_addrs);
669
670	// Spawn informant task
671	spawn_handle.spawn(
672		"informant",
673		None,
674		sc_informant::build(client.clone(), network, sync_service.clone()),
675	);
676
677	task_manager.keep_alive((config.base_path, rpc_server_handle));
678
679	Ok(in_memory_rpc_handle)
680}
681
682/// Returns a future that forwards imported transactions to the transaction networking protocol.
683pub async fn propagate_transaction_notifications<Block, ExPool>(
684	transaction_pool: Arc<ExPool>,
685	tx_handler_controller: sc_network_transactions::TransactionsHandlerController<
686		<Block as BlockT>::Hash,
687	>,
688	telemetry: Option<TelemetryHandle>,
689) where
690	Block: BlockT,
691	ExPool: MaintainedTransactionPool<Block = Block, Hash = <Block as BlockT>::Hash>,
692{
693	const TELEMETRY_INTERVAL: Duration = Duration::from_secs(1);
694
695	// transaction notifications
696	let mut notifications = transaction_pool.import_notification_stream().fuse();
697	let mut timer = futures_timer::Delay::new(TELEMETRY_INTERVAL).fuse();
698	let mut tx_imported = false;
699
700	loop {
701		select! {
702			notification = notifications.next() => {
703				let Some(hash) = notification else { return };
704
705				tx_handler_controller.propagate_transaction(hash);
706
707				tx_imported = true;
708			},
709			_ = timer => {
710				timer = futures_timer::Delay::new(TELEMETRY_INTERVAL).fuse();
711
712				if !tx_imported {
713					continue;
714				}
715
716				tx_imported = false;
717				let status = transaction_pool.status();
718
719				telemetry!(
720					telemetry;
721					SUBSTRATE_INFO;
722					"txpool.import";
723					"ready" => status.ready,
724					"future" => status.future,
725				);
726			}
727		}
728	}
729}
730
731/// Initialize telemetry with provided configuration and return telemetry handle
732pub fn init_telemetry<Block, Client, Network>(
733	name: String,
734	implementation: String,
735	version: String,
736	chain: String,
737	authority: bool,
738	network: Network,
739	client: Arc<Client>,
740	telemetry: &mut Telemetry,
741	sysinfo: Option<sc_telemetry::SysInfo>,
742) -> sc_telemetry::Result<TelemetryHandle>
743where
744	Block: BlockT,
745	Client: BlockBackend<Block>,
746	Network: NetworkStateInfo,
747{
748	let genesis_hash = client.block_hash(Zero::zero()).ok().flatten().unwrap_or_default();
749	let connection_message = ConnectionMessage {
750		name,
751		implementation,
752		version,
753		target_os: sc_sysinfo::TARGET_OS.into(),
754		target_arch: sc_sysinfo::TARGET_ARCH.into(),
755		target_env: sc_sysinfo::TARGET_ENV.into(),
756		config: String::new(),
757		chain,
758		genesis_hash: format!("{:?}", genesis_hash),
759		authority,
760		startup_time: SystemTime::UNIX_EPOCH
761			.elapsed()
762			.map(|dur| dur.as_millis())
763			.unwrap_or(0)
764			.to_string(),
765		network_id: network.local_peer_id().to_base58(),
766		sysinfo,
767	};
768
769	telemetry.start_telemetry(connection_message)?;
770
771	Ok(telemetry.handle())
772}
773
774/// Parameters for [`gen_rpc_module`].
775pub struct GenRpcModuleParams<'a, TBl: BlockT, TBackend, TCl, TRpc, TExPool> {
776	/// The handle to spawn tasks.
777	pub spawn_handle: SpawnTaskHandle,
778	/// Access to the client.
779	pub client: Arc<TCl>,
780	/// The transaction pool.
781	pub transaction_pool: Arc<TExPool>,
782	/// Keystore handle.
783	pub keystore: KeystorePtr,
784	/// Sender for system requests.
785	pub system_rpc_tx: TracingUnboundedSender<sc_rpc::system::Request<TBl>>,
786	/// Implementation name of this node.
787	pub impl_name: String,
788	/// Implementation version of this node.
789	pub impl_version: String,
790	/// The chain spec.
791	pub chain_spec: &'a dyn ChainSpec,
792	/// Enabled pruning mode for this node.
793	pub state_pruning: &'a Option<PruningMode>,
794	/// Enabled blocks pruning mode.
795	pub blocks_pruning: BlocksPruning,
796	/// Backend of the node.
797	pub backend: Arc<TBackend>,
798	/// RPC builder.
799	pub rpc_builder: &'a dyn Fn(SubscriptionTaskExecutor) -> Result<RpcModule<TRpc>, Error>,
800	/// Transaction metrics handle.
801	pub metrics: Option<sc_rpc_spec_v2::transaction::TransactionMetrics>,
802	/// Optional [`TracingExecuteBlock`] handle.
803	///
804	/// Will be used by the `trace_block` RPC to execute the actual block.
805	pub tracing_execute_block: Option<Arc<dyn TracingExecuteBlock<TBl>>>,
806}
807
808/// Generate RPC module using provided configuration
809pub fn gen_rpc_module<TBl, TBackend, TCl, TRpc, TExPool>(
810	GenRpcModuleParams {
811		spawn_handle,
812		client,
813		transaction_pool,
814		keystore,
815		system_rpc_tx,
816		impl_name,
817		impl_version,
818		chain_spec,
819		state_pruning,
820		blocks_pruning,
821		backend,
822		rpc_builder,
823		metrics,
824		tracing_execute_block: execute_block,
825	}: GenRpcModuleParams<TBl, TBackend, TCl, TRpc, TExPool>,
826) -> Result<RpcModule<()>, Error>
827where
828	TBl: BlockT,
829	TCl: ProvideRuntimeApi<TBl>
830		+ BlockchainEvents<TBl>
831		+ HeaderBackend<TBl>
832		+ HeaderMetadata<TBl, Error = sp_blockchain::Error>
833		+ ExecutorProvider<TBl>
834		+ CallApiAt<TBl>
835		+ ProofProvider<TBl>
836		+ StorageProvider<TBl, TBackend>
837		+ BlockBackend<TBl>
838		+ Send
839		+ Sync
840		+ 'static,
841	TBackend: sc_client_api::backend::Backend<TBl> + 'static,
842	<TCl as ProvideRuntimeApi<TBl>>::Api: sp_session::SessionKeys<TBl> + sp_api::Metadata<TBl>,
843	TExPool: MaintainedTransactionPool<Block = TBl, Hash = <TBl as BlockT>::Hash> + 'static,
844	TBl::Hash: Unpin,
845	TBl::Header: Unpin,
846{
847	let system_info = sc_rpc::system::SystemInfo {
848		chain_name: chain_spec.name().into(),
849		impl_name,
850		impl_version,
851		properties: chain_spec.properties(),
852		chain_type: chain_spec.chain_type(),
853	};
854
855	let mut rpc_api = RpcModule::new(());
856	let task_executor = Arc::new(spawn_handle);
857
858	let (chain, state, child_state) = {
859		let chain = sc_rpc::chain::new_full(client.clone(), task_executor.clone()).into_rpc();
860		let (state, child_state) =
861			sc_rpc::state::new_full(client.clone(), task_executor.clone(), execute_block);
862		let state = state.into_rpc();
863		let child_state = child_state.into_rpc();
864
865		(chain, state, child_state)
866	};
867
868	const MAX_TRANSACTION_PER_CONNECTION: usize = 16;
869
870	let transaction_broadcast_rpc_v2 = sc_rpc_spec_v2::transaction::TransactionBroadcast::new(
871		client.clone(),
872		transaction_pool.clone(),
873		task_executor.clone(),
874		MAX_TRANSACTION_PER_CONNECTION,
875	)
876	.into_rpc();
877
878	let transaction_v2 = sc_rpc_spec_v2::transaction::Transaction::new(
879		client.clone(),
880		transaction_pool.clone(),
881		task_executor.clone(),
882		metrics,
883	)
884	.into_rpc();
885
886	let chain_head_v2 = sc_rpc_spec_v2::chain_head::ChainHead::new(
887		client.clone(),
888		backend.clone(),
889		task_executor.clone(),
890		// Defaults to sensible limits for the `ChainHead`.
891		sc_rpc_spec_v2::chain_head::ChainHeadConfig::default(),
892	)
893	.into_rpc();
894
895	// Part of the RPC v2 spec.
896	// An archive node that can respond to the `archive` RPC-v2 queries is a node with:
897	// - state pruning in archive mode: The storage of blocks is kept around
898	// - block pruning in archive mode: The block's body is kept around
899	let is_archive_node = state_pruning.as_ref().map(|sp| sp.is_archive()).unwrap_or(false) &&
900		blocks_pruning.is_archive();
901	let genesis_hash = client.hash(Zero::zero()).ok().flatten().expect("Genesis block exists; qed");
902	if is_archive_node {
903		let archive_v2 = sc_rpc_spec_v2::archive::Archive::new(
904			client.clone(),
905			backend.clone(),
906			genesis_hash,
907			task_executor.clone(),
908		)
909		.into_rpc();
910		rpc_api.merge(archive_v2).map_err(|e| Error::Application(e.into()))?;
911	}
912
913	// ChainSpec RPC-v2.
914	let chain_spec_v2 = sc_rpc_spec_v2::chain_spec::ChainSpec::new(
915		chain_spec.name().into(),
916		genesis_hash,
917		chain_spec.properties(),
918	)
919	.into_rpc();
920
921	let author = sc_rpc::author::Author::new(
922		client.clone(),
923		transaction_pool,
924		keystore,
925		task_executor.clone(),
926	)
927	.into_rpc();
928
929	let system = sc_rpc::system::System::new(system_info, system_rpc_tx).into_rpc();
930
931	if let Some(storage) = backend.offchain_storage() {
932		let offchain = sc_rpc::offchain::Offchain::new(storage).into_rpc();
933
934		rpc_api.merge(offchain).map_err(|e| Error::Application(e.into()))?;
935	}
936
937	// Part of the RPC v2 spec.
938	rpc_api.merge(transaction_v2).map_err(|e| Error::Application(e.into()))?;
939	rpc_api
940		.merge(transaction_broadcast_rpc_v2)
941		.map_err(|e| Error::Application(e.into()))?;
942	rpc_api.merge(chain_head_v2).map_err(|e| Error::Application(e.into()))?;
943	rpc_api.merge(chain_spec_v2).map_err(|e| Error::Application(e.into()))?;
944
945	// Part of the old RPC spec.
946	rpc_api.merge(chain).map_err(|e| Error::Application(e.into()))?;
947	rpc_api.merge(author).map_err(|e| Error::Application(e.into()))?;
948	rpc_api.merge(system).map_err(|e| Error::Application(e.into()))?;
949	rpc_api.merge(state).map_err(|e| Error::Application(e.into()))?;
950	rpc_api.merge(child_state).map_err(|e| Error::Application(e.into()))?;
951	// Additional [`RpcModule`]s defined in the node to fit the specific blockchain
952	let extra_rpcs = rpc_builder(task_executor.clone())?;
953	rpc_api.merge(extra_rpcs).map_err(|e| Error::Application(e.into()))?;
954
955	Ok(rpc_api)
956}
957
958/// Parameters to pass into [`build_network`].
959pub struct BuildNetworkParams<'a, Block, Net, TxPool, IQ, Client>
960where
961	Block: BlockT,
962	Net: NetworkBackend<Block, <Block as BlockT>::Hash>,
963{
964	/// The service configuration.
965	pub config: &'a Configuration,
966	/// Full network configuration.
967	pub net_config: FullNetworkConfiguration<Block, <Block as BlockT>::Hash, Net>,
968	/// A shared client returned by `new_full_parts`.
969	pub client: Arc<Client>,
970	/// A shared transaction pool.
971	pub transaction_pool: Arc<TxPool>,
972	/// A handle for spawning tasks.
973	pub spawn_handle: SpawnTaskHandle,
974	/// A handle for spawning essential tasks.
975	pub spawn_essential_handle: SpawnEssentialTaskHandle,
976	/// An import queue.
977	pub import_queue: IQ,
978	/// A block announce validator builder.
979	pub block_announce_validator_builder: Option<
980		Box<dyn FnOnce(Arc<Client>) -> Box<dyn BlockAnnounceValidator<Block> + Send> + Send>,
981	>,
982	/// Optional warp sync config.
983	pub warp_sync_config: Option<WarpSyncConfig<Block>>,
984	/// User specified block relay params. If not specified, the default
985	/// block request handler will be used.
986	pub block_relay: Option<BlockRelayParams<Block, Net>>,
987	/// Metrics.
988	pub metrics: NotificationMetrics,
989}
990
991/// Build the network service, the network status sinks and an RPC sender.
992pub fn build_network<Block, Net, TxPool, IQ, Client>(
993	params: BuildNetworkParams<Block, Net, TxPool, IQ, Client>,
994) -> Result<
995	(
996		Arc<dyn sc_network::service::traits::NetworkService>,
997		TracingUnboundedSender<sc_rpc::system::Request<Block>>,
998		sc_network_transactions::TransactionsHandlerController<<Block as BlockT>::Hash>,
999		Arc<SyncingService<Block>>,
1000	),
1001	Error,
1002>
1003where
1004	Block: BlockT,
1005	Client: ProvideRuntimeApi<Block>
1006		+ HeaderMetadata<Block, Error = sp_blockchain::Error>
1007		+ Chain<Block>
1008		+ BlockBackend<Block>
1009		+ BlockIdTo<Block, Error = sp_blockchain::Error>
1010		+ ProofProvider<Block>
1011		+ HeaderBackend<Block>
1012		+ BlockchainEvents<Block>
1013		+ 'static,
1014	TxPool: TransactionPool<Block = Block, Hash = <Block as BlockT>::Hash> + 'static,
1015	IQ: ImportQueue<Block> + 'static,
1016	Net: NetworkBackend<Block, <Block as BlockT>::Hash>,
1017{
1018	let BuildNetworkParams {
1019		config,
1020		mut net_config,
1021		client,
1022		transaction_pool,
1023		spawn_handle,
1024		spawn_essential_handle,
1025		import_queue,
1026		block_announce_validator_builder,
1027		warp_sync_config,
1028		block_relay,
1029		metrics,
1030	} = params;
1031
1032	let block_announce_validator = if let Some(f) = block_announce_validator_builder {
1033		f(client.clone())
1034	} else {
1035		Box::new(DefaultBlockAnnounceValidator)
1036	};
1037
1038	let network_service_provider = NetworkServiceProvider::new();
1039	let protocol_id = config.protocol_id();
1040	let fork_id = config.chain_spec.fork_id();
1041	let metrics_registry = config.prometheus_config.as_ref().map(|config| &config.registry);
1042
1043	let block_downloader = match block_relay {
1044		Some(params) => {
1045			let BlockRelayParams { mut server, downloader, request_response_config } = params;
1046
1047			net_config.add_request_response_protocol(request_response_config);
1048
1049			spawn_handle.spawn("block-request-handler", Some("networking"), async move {
1050				server.run().await;
1051			});
1052
1053			downloader
1054		},
1055		None => build_default_block_downloader(
1056			&protocol_id,
1057			fork_id,
1058			&mut net_config,
1059			network_service_provider.handle(),
1060			Arc::clone(&client),
1061			config.network.default_peers_set.in_peers as usize +
1062				config.network.default_peers_set.out_peers as usize,
1063			&spawn_handle,
1064		),
1065	};
1066
1067	let syncing_strategy = build_polkadot_syncing_strategy(
1068		protocol_id.clone(),
1069		fork_id,
1070		&mut net_config,
1071		warp_sync_config,
1072		block_downloader,
1073		client.clone(),
1074		&spawn_handle,
1075		metrics_registry,
1076		config.blocks_pruning.is_archive(),
1077	)?;
1078
1079	let (syncing_engine, sync_service, block_announce_config) = SyncingEngine::new(
1080		Roles::from(&config.role),
1081		Arc::clone(&client),
1082		metrics_registry,
1083		metrics.clone(),
1084		&net_config,
1085		protocol_id.clone(),
1086		fork_id,
1087		block_announce_validator,
1088		syncing_strategy,
1089		network_service_provider.handle(),
1090		import_queue.service(),
1091		net_config.peer_store_handle(),
1092	)?;
1093
1094	spawn_handle.spawn_blocking("syncing", None, syncing_engine.run());
1095
1096	build_network_advanced(BuildNetworkAdvancedParams {
1097		role: config.role,
1098		protocol_id,
1099		fork_id,
1100		ipfs_server: config.network.ipfs_server,
1101		announce_block: config.announce_block,
1102		net_config,
1103		client,
1104		transaction_pool,
1105		spawn_handle,
1106		spawn_essential_handle,
1107		import_queue,
1108		sync_service,
1109		block_announce_config,
1110		network_service_provider,
1111		metrics_registry,
1112		metrics,
1113	})
1114}
1115
1116/// Parameters to pass into [`build_network_advanced`].
1117pub struct BuildNetworkAdvancedParams<'a, Block, Net, TxPool, IQ, Client>
1118where
1119	Block: BlockT,
1120	Net: NetworkBackend<Block, <Block as BlockT>::Hash>,
1121{
1122	/// Role of the local node.
1123	pub role: Role,
1124	/// Protocol name prefix.
1125	pub protocol_id: ProtocolId,
1126	/// Fork ID.
1127	pub fork_id: Option<&'a str>,
1128	/// Enable serving block data over IPFS bitswap.
1129	pub ipfs_server: bool,
1130	/// Announce block automatically after they have been imported.
1131	pub announce_block: bool,
1132	/// Full network configuration.
1133	pub net_config: FullNetworkConfiguration<Block, <Block as BlockT>::Hash, Net>,
1134	/// A shared client returned by `new_full_parts`.
1135	pub client: Arc<Client>,
1136	/// A shared transaction pool.
1137	pub transaction_pool: Arc<TxPool>,
1138	/// A handle for spawning tasks.
1139	pub spawn_handle: SpawnTaskHandle,
1140	/// A handle for spawning essential tasks.
1141	pub spawn_essential_handle: SpawnEssentialTaskHandle,
1142	/// An import queue.
1143	pub import_queue: IQ,
1144	/// Syncing service to communicate with syncing engine.
1145	pub sync_service: SyncingService<Block>,
1146	/// Block announce config.
1147	pub block_announce_config: Net::NotificationProtocolConfig,
1148	/// Network service provider to drive with network internally.
1149	pub network_service_provider: NetworkServiceProvider,
1150	/// Prometheus metrics registry.
1151	pub metrics_registry: Option<&'a Registry>,
1152	/// Metrics.
1153	pub metrics: NotificationMetrics,
1154}
1155
1156/// Build the network service, the network status sinks and an RPC sender, this is a lower-level
1157/// version of [`build_network`] for those needing more control.
1158pub fn build_network_advanced<Block, Net, TxPool, IQ, Client>(
1159	params: BuildNetworkAdvancedParams<Block, Net, TxPool, IQ, Client>,
1160) -> Result<
1161	(
1162		Arc<dyn sc_network::service::traits::NetworkService>,
1163		TracingUnboundedSender<sc_rpc::system::Request<Block>>,
1164		sc_network_transactions::TransactionsHandlerController<<Block as BlockT>::Hash>,
1165		Arc<SyncingService<Block>>,
1166	),
1167	Error,
1168>
1169where
1170	Block: BlockT,
1171	Client: ProvideRuntimeApi<Block>
1172		+ HeaderMetadata<Block, Error = sp_blockchain::Error>
1173		+ Chain<Block>
1174		+ BlockBackend<Block>
1175		+ BlockIdTo<Block, Error = sp_blockchain::Error>
1176		+ ProofProvider<Block>
1177		+ HeaderBackend<Block>
1178		+ BlockchainEvents<Block>
1179		+ 'static,
1180	TxPool: TransactionPool<Block = Block, Hash = <Block as BlockT>::Hash> + 'static,
1181	IQ: ImportQueue<Block> + 'static,
1182	Net: NetworkBackend<Block, <Block as BlockT>::Hash>,
1183{
1184	let BuildNetworkAdvancedParams {
1185		role,
1186		protocol_id,
1187		fork_id,
1188		ipfs_server,
1189		announce_block,
1190		mut net_config,
1191		client,
1192		transaction_pool,
1193		spawn_handle,
1194		spawn_essential_handle,
1195		import_queue,
1196		sync_service,
1197		block_announce_config,
1198		network_service_provider,
1199		metrics_registry,
1200		metrics,
1201	} = params;
1202
1203	let genesis_hash = client.info().genesis_hash;
1204
1205	let light_client_request_protocol_config = {
1206		// Allow both outgoing and incoming requests.
1207		let (handler, protocol_config) =
1208			LightClientRequestHandler::new::<Net>(&protocol_id, fork_id, client.clone());
1209		spawn_handle.spawn("light-client-request-handler", Some("networking"), handler.run());
1210		protocol_config
1211	};
1212
1213	// install request handlers to `FullNetworkConfiguration`
1214	net_config.add_request_response_protocol(light_client_request_protocol_config);
1215
1216	let bitswap_config = ipfs_server.then(|| {
1217		let (handler, config) = Net::bitswap_server(client.clone());
1218		spawn_handle.spawn("bitswap-request-handler", Some("networking"), handler);
1219
1220		config
1221	});
1222
1223	// Create transactions protocol and add it to the list of supported protocols of
1224	let (transactions_handler_proto, transactions_config) =
1225		sc_network_transactions::TransactionsHandlerPrototype::new::<_, Block, Net>(
1226			protocol_id.clone(),
1227			genesis_hash,
1228			fork_id,
1229			metrics.clone(),
1230			net_config.peer_store_handle(),
1231		);
1232	net_config.add_notification_protocol(transactions_config);
1233
1234	// Start task for `PeerStore`
1235	let peer_store = net_config.take_peer_store();
1236	spawn_handle.spawn("peer-store", Some("networking"), peer_store.run());
1237
1238	let sync_service = Arc::new(sync_service);
1239
1240	let network_params = sc_network::config::Params::<Block, <Block as BlockT>::Hash, Net> {
1241		role,
1242		executor: {
1243			let spawn_handle = Clone::clone(&spawn_handle);
1244			Box::new(move |fut| {
1245				spawn_handle.spawn("libp2p-node", Some("networking"), fut);
1246			})
1247		},
1248		network_config: net_config,
1249		genesis_hash,
1250		protocol_id,
1251		fork_id: fork_id.map(ToOwned::to_owned),
1252		metrics_registry: metrics_registry.cloned(),
1253		block_announce_config,
1254		bitswap_config,
1255		notification_metrics: metrics,
1256	};
1257
1258	let has_bootnodes = !network_params.network_config.network_config.boot_nodes.is_empty();
1259	let network_mut = Net::new(network_params)?;
1260	let network = network_mut.network_service().clone();
1261
1262	let (tx_handler, tx_handler_controller) = transactions_handler_proto.build(
1263		network.clone(),
1264		sync_service.clone(),
1265		Arc::new(TransactionPoolAdapter { pool: transaction_pool, client: client.clone() }),
1266		metrics_registry,
1267	)?;
1268	spawn_handle.spawn_blocking(
1269		"network-transactions-handler",
1270		Some("networking"),
1271		tx_handler.run(),
1272	);
1273
1274	spawn_handle.spawn_blocking(
1275		"chain-sync-network-service-provider",
1276		Some("networking"),
1277		network_service_provider.run(Arc::new(network.clone())),
1278	);
1279	spawn_handle.spawn("import-queue", None, {
1280		let sync_service = sync_service.clone();
1281
1282		async move { import_queue.run(sync_service.as_ref()).await }
1283	});
1284
1285	let (system_rpc_tx, system_rpc_rx) = tracing_unbounded("mpsc_system_rpc", 10_000);
1286	spawn_handle.spawn(
1287		"system-rpc-handler",
1288		Some("networking"),
1289		build_system_rpc_future::<_, _, <Block as BlockT>::Hash>(
1290			role,
1291			network_mut.network_service(),
1292			sync_service.clone(),
1293			client.clone(),
1294			system_rpc_rx,
1295			has_bootnodes,
1296		),
1297	);
1298
1299	let future = build_network_future::<_, _, <Block as BlockT>::Hash, _>(
1300		network_mut,
1301		client,
1302		sync_service.clone(),
1303		announce_block,
1304	);
1305
1306	// The network worker is responsible for gathering all network messages and processing
1307	// them. This is quite a heavy task, and at the time of the writing of this comment it
1308	// frequently happens that this future takes several seconds or in some situations
1309	// even more than a minute until it has processed its entire queue. This is clearly an
1310	// issue, and ideally we would like to fix the network future to take as little time as
1311	// possible, but we also take the extra harm-prevention measure to execute the networking
1312	// future using `spawn_blocking`.
1313	//
1314	// The network worker is spawned as an essential task, meaning if it exits unexpectedly
1315	// the service will shut down.
1316	spawn_essential_handle.spawn_blocking("network-worker", Some("networking"), future);
1317
1318	Ok((network, system_rpc_tx, tx_handler_controller, sync_service.clone()))
1319}
1320
1321/// Configuration for [`build_default_syncing_engine`].
1322pub struct DefaultSyncingEngineConfig<'a, Block, Client, Net>
1323where
1324	Block: BlockT,
1325	Net: NetworkBackend<Block, <Block as BlockT>::Hash>,
1326{
1327	/// Role of the local node.
1328	pub role: Role,
1329	/// Protocol name prefix.
1330	pub protocol_id: ProtocolId,
1331	/// Fork ID.
1332	pub fork_id: Option<&'a str>,
1333	/// Full network configuration.
1334	pub net_config: &'a mut FullNetworkConfiguration<Block, <Block as BlockT>::Hash, Net>,
1335	/// Validator for incoming block announcements.
1336	pub block_announce_validator: Box<dyn BlockAnnounceValidator<Block> + Send>,
1337	/// Handle to communicate with `NetworkService`.
1338	pub network_service_handle: NetworkServiceHandle,
1339	/// Warp sync configuration (when used).
1340	pub warp_sync_config: Option<WarpSyncConfig<Block>>,
1341	/// A shared client returned by `new_full_parts`.
1342	pub client: Arc<Client>,
1343	/// Blocks import queue API.
1344	pub import_queue_service: Box<dyn ImportQueueService<Block>>,
1345	/// Expected max total number of peer connections (in + out).
1346	pub num_peers_hint: usize,
1347	/// A handle for spawning tasks.
1348	pub spawn_handle: &'a SpawnTaskHandle,
1349	/// Prometheus metrics registry.
1350	pub metrics_registry: Option<&'a Registry>,
1351	/// Metrics.
1352	pub metrics: NotificationMetrics,
1353	/// Whether to archive blocks. When `true`, gap sync requests bodies to maintain complete
1354	/// block history.
1355	pub archive_blocks: bool,
1356}
1357
1358/// Build default syncing engine using [`build_default_block_downloader`] and
1359/// [`build_polkadot_syncing_strategy`] internally.
1360pub fn build_default_syncing_engine<Block, Client, Net>(
1361	config: DefaultSyncingEngineConfig<Block, Client, Net>,
1362) -> Result<(SyncingService<Block>, Net::NotificationProtocolConfig), Error>
1363where
1364	Block: BlockT,
1365	Client: HeaderBackend<Block>
1366		+ BlockBackend<Block>
1367		+ HeaderMetadata<Block, Error = sp_blockchain::Error>
1368		+ ProofProvider<Block>
1369		+ Send
1370		+ Sync
1371		+ 'static,
1372	Net: NetworkBackend<Block, <Block as BlockT>::Hash>,
1373{
1374	let DefaultSyncingEngineConfig {
1375		role,
1376		protocol_id,
1377		fork_id,
1378		net_config,
1379		block_announce_validator,
1380		network_service_handle,
1381		warp_sync_config,
1382		client,
1383		import_queue_service,
1384		num_peers_hint,
1385		spawn_handle,
1386		metrics_registry,
1387		metrics,
1388		archive_blocks,
1389	} = config;
1390
1391	let block_downloader = build_default_block_downloader(
1392		&protocol_id,
1393		fork_id,
1394		net_config,
1395		network_service_handle.clone(),
1396		client.clone(),
1397		num_peers_hint,
1398		spawn_handle,
1399	);
1400	let syncing_strategy = build_polkadot_syncing_strategy(
1401		protocol_id.clone(),
1402		fork_id,
1403		net_config,
1404		warp_sync_config,
1405		block_downloader,
1406		client.clone(),
1407		spawn_handle,
1408		metrics_registry,
1409		archive_blocks,
1410	)?;
1411
1412	let (syncing_engine, sync_service, block_announce_config) = SyncingEngine::new(
1413		Roles::from(&role),
1414		client,
1415		metrics_registry,
1416		metrics,
1417		&net_config,
1418		protocol_id,
1419		fork_id,
1420		block_announce_validator,
1421		syncing_strategy,
1422		network_service_handle,
1423		import_queue_service,
1424		net_config.peer_store_handle(),
1425	)?;
1426
1427	spawn_handle.spawn_blocking("syncing", None, syncing_engine.run());
1428
1429	Ok((sync_service, block_announce_config))
1430}
1431
1432/// Build default block downloader
1433pub fn build_default_block_downloader<Block, Client, Net>(
1434	protocol_id: &ProtocolId,
1435	fork_id: Option<&str>,
1436	net_config: &mut FullNetworkConfiguration<Block, <Block as BlockT>::Hash, Net>,
1437	network_service_handle: NetworkServiceHandle,
1438	client: Arc<Client>,
1439	num_peers_hint: usize,
1440	spawn_handle: &SpawnTaskHandle,
1441) -> Arc<dyn BlockDownloader<Block>>
1442where
1443	Block: BlockT,
1444	Client: HeaderBackend<Block> + BlockBackend<Block> + Send + Sync + 'static,
1445	Net: NetworkBackend<Block, <Block as BlockT>::Hash>,
1446{
1447	// Custom protocol was not specified, use the default block handler.
1448	// Allow both outgoing and incoming requests.
1449	let BlockRelayParams { mut server, downloader, request_response_config } =
1450		BlockRequestHandler::new::<Net>(
1451			network_service_handle,
1452			&protocol_id,
1453			fork_id,
1454			client.clone(),
1455			num_peers_hint,
1456		);
1457
1458	spawn_handle.spawn("block-request-handler", Some("networking"), async move {
1459		server.run().await;
1460	});
1461
1462	net_config.add_request_response_protocol(request_response_config);
1463
1464	downloader
1465}
1466
1467/// Build standard polkadot syncing strategy
1468pub fn build_polkadot_syncing_strategy<Block, Client, Net>(
1469	protocol_id: ProtocolId,
1470	fork_id: Option<&str>,
1471	net_config: &mut FullNetworkConfiguration<Block, <Block as BlockT>::Hash, Net>,
1472	warp_sync_config: Option<WarpSyncConfig<Block>>,
1473	block_downloader: Arc<dyn BlockDownloader<Block>>,
1474	client: Arc<Client>,
1475	spawn_handle: &SpawnTaskHandle,
1476	metrics_registry: Option<&Registry>,
1477	archive_blocks: bool,
1478) -> Result<Box<dyn SyncingStrategy<Block>>, Error>
1479where
1480	Block: BlockT,
1481	Client: HeaderBackend<Block>
1482		+ BlockBackend<Block>
1483		+ HeaderMetadata<Block, Error = sp_blockchain::Error>
1484		+ ProofProvider<Block>
1485		+ Send
1486		+ Sync
1487		+ 'static,
1488	Net: NetworkBackend<Block, <Block as BlockT>::Hash>,
1489{
1490	if warp_sync_config.is_none() && net_config.network_config.sync_mode.is_warp() {
1491		return Err("Warp sync enabled, but no warp sync provider configured.".into());
1492	}
1493
1494	if client.requires_full_sync() {
1495		match net_config.network_config.sync_mode {
1496			SyncMode::LightState { .. } => {
1497				return Err("Fast sync doesn't work for archive nodes".into())
1498			},
1499			SyncMode::Warp => return Err("Warp sync doesn't work for archive nodes".into()),
1500			SyncMode::Full => {},
1501		}
1502	}
1503
1504	let genesis_hash = client.info().genesis_hash;
1505
1506	let (state_request_protocol_config, state_request_protocol_name) = {
1507		let num_peer_hint = net_config.network_config.default_peers_set_num_full as usize +
1508			net_config.network_config.default_peers_set.reserved_nodes.len();
1509		// Allow both outgoing and incoming requests.
1510		let (handler, protocol_config) =
1511			StateRequestHandler::new::<Net>(&protocol_id, fork_id, client.clone(), num_peer_hint);
1512		let config_name = protocol_config.protocol_name().clone();
1513
1514		spawn_handle.spawn("state-request-handler", Some("networking"), handler.run());
1515		(protocol_config, config_name)
1516	};
1517	net_config.add_request_response_protocol(state_request_protocol_config);
1518
1519	let (warp_sync_protocol_config, warp_sync_protocol_name) = match warp_sync_config.as_ref() {
1520		Some(WarpSyncConfig::WithProvider(warp_with_provider)) => {
1521			// Allow both outgoing and incoming requests.
1522			let (handler, protocol_config) = WarpSyncRequestHandler::new::<_, Net>(
1523				protocol_id,
1524				genesis_hash,
1525				fork_id,
1526				warp_with_provider.clone(),
1527			);
1528			let config_name = protocol_config.protocol_name().clone();
1529
1530			spawn_handle.spawn("warp-sync-request-handler", Some("networking"), handler.run());
1531			(Some(protocol_config), Some(config_name))
1532		},
1533		_ => (None, None),
1534	};
1535	if let Some(config) = warp_sync_protocol_config {
1536		net_config.add_request_response_protocol(config);
1537	}
1538
1539	let syncing_config = PolkadotSyncingStrategyConfig {
1540		mode: net_config.network_config.sync_mode,
1541		max_parallel_downloads: net_config.network_config.max_parallel_downloads,
1542		max_blocks_per_request: net_config.network_config.max_blocks_per_request,
1543		min_peers_to_start_warp_sync: net_config.network_config.min_peers_to_start_warp_sync,
1544		metrics_registry: metrics_registry.cloned(),
1545		state_request_protocol_name,
1546		block_downloader,
1547		archive_blocks,
1548	};
1549	Ok(Box::new(PolkadotSyncingStrategy::new(
1550		syncing_config,
1551		client,
1552		warp_sync_config,
1553		warp_sync_protocol_name,
1554	)?))
1555}