sc_service/builder.rs

1// This file is part of Substrate.
2
3// Copyright (C) Parity Technologies (UK) Ltd.
4// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
5
6// This program is free software: you can redistribute it and/or modify
7// it under the terms of the GNU General Public License as published by
8// the Free Software Foundation, either version 3 of the License, or
9// (at your option) any later version.
10
11// This program is distributed in the hope that it will be useful,
12// but WITHOUT ANY WARRANTY; without even the implied warranty of
13// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14// GNU General Public License for more details.
15
16// You should have received a copy of the GNU General Public License
17// along with this program. If not, see <https://www.gnu.org/licenses/>.
18
19use crate::{
20	build_network_future, build_system_rpc_future,
21	client::{Client, ClientConfig},
22	config::{Configuration, ExecutorConfiguration, KeystoreConfig, Multiaddr, PrometheusConfig},
23	error::Error,
24	metrics::MetricsService,
25	start_rpc_servers, BuildGenesisBlock, GenesisBlockBuilder, RpcHandlers, SpawnTaskHandle,
26	TaskManager, TransactionPoolAdapter,
27};
28use futures::{channel::oneshot, future::ready, FutureExt, StreamExt};
29use jsonrpsee::RpcModule;
30use log::info;
31use prometheus_endpoint::Registry;
32use sc_chain_spec::{get_extension, ChainSpec};
33use sc_client_api::{
34	execution_extensions::ExecutionExtensions, proof_provider::ProofProvider, BadBlocks,
35	BlockBackend, BlockchainEvents, ExecutorProvider, ForkBlocks, StorageProvider, UsageProvider,
36};
37use sc_client_db::{Backend, BlocksPruning, DatabaseSettings, PruningMode};
38use sc_consensus::import_queue::{ImportQueue, ImportQueueService};
39use sc_executor::{
40	sp_wasm_interface::HostFunctions, HeapAllocStrategy, NativeExecutionDispatch, RuntimeVersionOf,
41	WasmExecutor, DEFAULT_HEAP_ALLOC_STRATEGY,
42};
43use sc_keystore::LocalKeystore;
44use sc_network::{
45	config::{FullNetworkConfiguration, ProtocolId, SyncMode},
46	multiaddr::Protocol,
47	service::{
48		traits::{PeerStore, RequestResponseConfig},
49		NotificationMetrics,
50	},
51	NetworkBackend, NetworkStateInfo,
52};
53use sc_network_common::role::{Role, Roles};
54use sc_network_light::light_client_requests::handler::LightClientRequestHandler;
55use sc_network_sync::{
56	block_relay_protocol::{BlockDownloader, BlockRelayParams},
57	block_request_handler::BlockRequestHandler,
58	engine::SyncingEngine,
59	service::network::{NetworkServiceHandle, NetworkServiceProvider},
60	state_request_handler::StateRequestHandler,
61	strategy::{
62		polkadot::{PolkadotSyncingStrategy, PolkadotSyncingStrategyConfig},
63		SyncingStrategy,
64	},
65	warp_request_handler::RequestHandler as WarpSyncRequestHandler,
66	SyncingService, WarpSyncConfig,
67};
68use sc_rpc::{
69	author::AuthorApiServer,
70	chain::ChainApiServer,
71	offchain::OffchainApiServer,
72	state::{ChildStateApiServer, StateApiServer},
73	system::SystemApiServer,
74	DenyUnsafe, SubscriptionTaskExecutor,
75};
76use sc_rpc_spec_v2::{
77	archive::ArchiveApiServer,
78	chain_head::ChainHeadApiServer,
79	chain_spec::ChainSpecApiServer,
80	transaction::{TransactionApiServer, TransactionBroadcastApiServer},
81};
82use sc_telemetry::{telemetry, ConnectionMessage, Telemetry, TelemetryHandle, SUBSTRATE_INFO};
83use sc_transaction_pool_api::{MaintainedTransactionPool, TransactionPool};
84use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedSender};
85use sp_api::{CallApiAt, ProvideRuntimeApi};
86use sp_blockchain::{HeaderBackend, HeaderMetadata};
87use sp_consensus::block_validation::{
88	BlockAnnounceValidator, Chain, DefaultBlockAnnounceValidator,
89};
90use sp_core::traits::{CodeExecutor, SpawnNamed};
91use sp_keystore::KeystorePtr;
92use sp_runtime::traits::{Block as BlockT, BlockIdTo, NumberFor, Zero};
93use std::{str::FromStr, sync::Arc, time::SystemTime};
94
95/// Full client type.
96pub type TFullClient<TBl, TRtApi, TExec> =
97	Client<TFullBackend<TBl>, TFullCallExecutor<TBl, TExec>, TBl, TRtApi>;
98
99/// Full client backend type.
100pub type TFullBackend<TBl> = Backend<TBl>;
101
102/// Full client call executor type.
103pub type TFullCallExecutor<TBl, TExec> = crate::client::LocalCallExecutor<TBl, Backend<TBl>, TExec>;
104
105type TFullParts<TBl, TRtApi, TExec> =
106	(TFullClient<TBl, TRtApi, TExec>, Arc<TFullBackend<TBl>>, KeystoreContainer, TaskManager);
107
108/// A shareable container for a local keystore.
109pub struct KeystoreContainer(Arc<LocalKeystore>);
110
111impl KeystoreContainer {
112	/// Construct a `KeystoreContainer` from the given keystore configuration.
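	///
	/// # Example
	///
	/// A minimal sketch of constructing a container and handing the resulting [`KeystorePtr`]
	/// to other components:
	///
	/// ```ignore
	/// use sc_service::config::KeystoreConfig;
	///
	/// // An in-memory keystore is convenient for tests; `KeystoreConfig::Path { .. }` persists keys on disk.
	/// let keystore_container = KeystoreContainer::new(&KeystoreConfig::InMemory)?;
	/// let keystore: sp_keystore::KeystorePtr = keystore_container.keystore();
	/// ```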
113	pub fn new(config: &KeystoreConfig) -> Result<Self, Error> {
114		let keystore = Arc::new(match config {
115			KeystoreConfig::Path { path, password } =>
116				LocalKeystore::open(path.clone(), password.clone())?,
117			KeystoreConfig::InMemory => LocalKeystore::in_memory(),
118		});
119
120		Ok(Self(keystore))
121	}
122
123	/// Returns a shared reference to a dynamic `Keystore` trait implementation.
124	pub fn keystore(&self) -> KeystorePtr {
125		self.0.clone()
126	}
127
128	/// Returns a shared reference to the local keystore.
129	pub fn local_keystore(&self) -> Arc<LocalKeystore> {
130		self.0.clone()
131	}
132}
133
134/// Creates a new full client for the given config.
135pub fn new_full_client<TBl, TRtApi, TExec>(
136	config: &Configuration,
137	telemetry: Option<TelemetryHandle>,
138	executor: TExec,
139) -> Result<TFullClient<TBl, TRtApi, TExec>, Error>
140where
141	TBl: BlockT,
142	TExec: CodeExecutor + RuntimeVersionOf + Clone,
143{
144	new_full_parts(config, telemetry, executor).map(|parts| parts.0)
145}
146
147/// Create the initial parts of a full node with the default genesis block builder, optionally recording proofs during block import.
148pub fn new_full_parts_record_import<TBl, TRtApi, TExec>(
149	config: &Configuration,
150	telemetry: Option<TelemetryHandle>,
151	executor: TExec,
152	enable_import_proof_recording: bool,
153) -> Result<TFullParts<TBl, TRtApi, TExec>, Error>
154where
155	TBl: BlockT,
156	TExec: CodeExecutor + RuntimeVersionOf + Clone,
157{
158	let backend = new_db_backend(config.db_config())?;
159
160	let genesis_block_builder = GenesisBlockBuilder::new(
161		config.chain_spec.as_storage_builder(),
162		!config.no_genesis(),
163		backend.clone(),
164		executor.clone(),
165	)?;
166
167	new_full_parts_with_genesis_builder(
168		config,
169		telemetry,
170		executor,
171		backend,
172		genesis_block_builder,
173		enable_import_proof_recording,
174	)
175}
176/// Create the initial parts of a full node with the default genesis block builder.
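///
/// # Example
///
/// A minimal sketch of the usual call site. `Block` and `RuntimeApi` are placeholders for the
/// node's concrete block and runtime API types, not items from this crate:
///
/// ```ignore
/// // Build a Wasm executor from the executor section of the node configuration.
/// let executor = new_wasm_executor::<sp_io::SubstrateHostFunctions>(&config.executor);
///
/// // Create client, backend, keystore container and task manager in one go.
/// let (client, backend, keystore_container, task_manager) =
/// 	new_full_parts::<Block, RuntimeApi, _>(&config, None, executor)?;
/// ```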
177pub fn new_full_parts<TBl, TRtApi, TExec>(
178	config: &Configuration,
179	telemetry: Option<TelemetryHandle>,
180	executor: TExec,
181) -> Result<TFullParts<TBl, TRtApi, TExec>, Error>
182where
183	TBl: BlockT,
184	TExec: CodeExecutor + RuntimeVersionOf + Clone,
185{
186	new_full_parts_record_import(config, telemetry, executor, false)
187}
188
189/// Create the initial parts of a full node.
190pub fn new_full_parts_with_genesis_builder<TBl, TRtApi, TExec, TBuildGenesisBlock>(
191	config: &Configuration,
192	telemetry: Option<TelemetryHandle>,
193	executor: TExec,
194	backend: Arc<TFullBackend<TBl>>,
195	genesis_block_builder: TBuildGenesisBlock,
196	enable_import_proof_recording: bool,
197) -> Result<TFullParts<TBl, TRtApi, TExec>, Error>
198where
199	TBl: BlockT,
200	TExec: CodeExecutor + RuntimeVersionOf + Clone,
201	TBuildGenesisBlock: BuildGenesisBlock<
202		TBl,
203		BlockImportOperation = <Backend<TBl> as sc_client_api::backend::Backend<TBl>>::BlockImportOperation
204	>,
205{
206	let keystore_container = KeystoreContainer::new(&config.keystore)?;
207
208	let task_manager = {
209		let registry = config.prometheus_config.as_ref().map(|cfg| &cfg.registry);
210		TaskManager::new(config.tokio_handle.clone(), registry)?
211	};
212
213	let chain_spec = &config.chain_spec;
214	let fork_blocks = get_extension::<ForkBlocks<TBl>>(chain_spec.extensions())
215		.cloned()
216		.unwrap_or_default();
217
218	let bad_blocks = get_extension::<BadBlocks<TBl>>(chain_spec.extensions())
219		.cloned()
220		.unwrap_or_default();
221
222	let client = {
223		let extensions = ExecutionExtensions::new(None, Arc::new(executor.clone()));
224
225		let wasm_runtime_substitutes = config
226			.chain_spec
227			.code_substitutes()
228			.into_iter()
229			.map(|(n, c)| {
230				let number = NumberFor::<TBl>::from_str(&n).map_err(|_| {
231					Error::Application(Box::from(format!(
232						"Failed to parse `{}` as block number for code substitutes. \
233						 In an old version the key for code substitute was a block hash. \
234						 Please update the chain spec to a version that is compatible with your node.",
235						n
236					)))
237				})?;
238				Ok((number, c))
239			})
240			.collect::<Result<std::collections::HashMap<_, _>, Error>>()?;
241
242		let client = new_client(
243			backend.clone(),
244			executor,
245			genesis_block_builder,
246			fork_blocks,
247			bad_blocks,
248			extensions,
249			Box::new(task_manager.spawn_handle()),
250			config.prometheus_config.as_ref().map(|config| config.registry.clone()),
251			telemetry,
252			ClientConfig {
253				offchain_worker_enabled: config.offchain_worker.enabled,
254				offchain_indexing_api: config.offchain_worker.indexing_enabled,
255				wasm_runtime_overrides: config.wasm_runtime_overrides.clone(),
256				no_genesis: config.no_genesis(),
257				wasm_runtime_substitutes,
258				enable_import_proof_recording,
259			},
260		)?;
261
262		client
263	};
264
265	Ok((client, backend, keystore_container, task_manager))
266}
267
268/// Creates a [`NativeElseWasmExecutor`](sc_executor::NativeElseWasmExecutor) according to
269/// [`Configuration`].
270#[deprecated(note = "Please switch to `new_wasm_executor`. Will be removed at end of 2024.")]
271#[allow(deprecated)]
272pub fn new_native_or_wasm_executor<D: NativeExecutionDispatch>(
273	config: &Configuration,
274) -> sc_executor::NativeElseWasmExecutor<D> {
275	#[allow(deprecated)]
276	sc_executor::NativeElseWasmExecutor::new_with_wasm_executor(new_wasm_executor(&config.executor))
277}
278
279/// Creates a [`WasmExecutor`] according to [`ExecutorConfiguration`].
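///
/// # Example
///
/// A short sketch, assuming the standard Substrate host functions from `sp_io`:
///
/// ```ignore
/// // `config` is the node's `Configuration`; its `executor` field is the `ExecutorConfiguration`.
/// let executor: WasmExecutor<sp_io::SubstrateHostFunctions> = new_wasm_executor(&config.executor);
/// ```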
280pub fn new_wasm_executor<H: HostFunctions>(config: &ExecutorConfiguration) -> WasmExecutor<H> {
281	let strategy = config
282		.default_heap_pages
283		.map_or(DEFAULT_HEAP_ALLOC_STRATEGY, |p| HeapAllocStrategy::Static { extra_pages: p as _ });
284	WasmExecutor::<H>::builder()
285		.with_execution_method(config.wasm_method)
286		.with_onchain_heap_alloc_strategy(strategy)
287		.with_offchain_heap_alloc_strategy(strategy)
288		.with_max_runtime_instances(config.max_runtime_instances)
289		.with_runtime_cache_size(config.runtime_cache_size)
290		.build()
291}
292
293/// Create an instance of the default DB backend.
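///
/// # Example
///
/// A short sketch; `Block` is a placeholder for the node's block type and `config` is the node's
/// [`Configuration`], whose `db_config()` helper assembles the [`DatabaseSettings`]:
///
/// ```ignore
/// let backend = new_db_backend::<Block>(config.db_config())?;
/// ```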
294pub fn new_db_backend<Block>(
295	settings: DatabaseSettings,
296) -> Result<Arc<Backend<Block>>, sp_blockchain::Error>
297where
298	Block: BlockT,
299{
300	const CANONICALIZATION_DELAY: u64 = 4096;
301
302	Ok(Arc::new(Backend::new(settings, CANONICALIZATION_DELAY)?))
303}
304
305/// Create an instance of a client backed by the given backend.
306pub fn new_client<E, Block, RA, G>(
307	backend: Arc<Backend<Block>>,
308	executor: E,
309	genesis_block_builder: G,
310	fork_blocks: ForkBlocks<Block>,
311	bad_blocks: BadBlocks<Block>,
312	execution_extensions: ExecutionExtensions<Block>,
313	spawn_handle: Box<dyn SpawnNamed>,
314	prometheus_registry: Option<Registry>,
315	telemetry: Option<TelemetryHandle>,
316	config: ClientConfig<Block>,
317) -> Result<
318	Client<
319		Backend<Block>,
320		crate::client::LocalCallExecutor<Block, Backend<Block>, E>,
321		Block,
322		RA,
323	>,
324	sp_blockchain::Error,
325>
326where
327	Block: BlockT,
328	E: CodeExecutor + RuntimeVersionOf,
329	G: BuildGenesisBlock<
330		Block,
331		BlockImportOperation = <Backend<Block> as sc_client_api::backend::Backend<Block>>::BlockImportOperation
332	>,
333{
334	let executor = crate::client::LocalCallExecutor::new(
335		backend.clone(),
336		executor,
337		config.clone(),
338		execution_extensions,
339	)?;
340
341	Client::new(
342		backend,
343		executor,
344		spawn_handle,
345		genesis_block_builder,
346		fork_blocks,
347		bad_blocks,
348		prometheus_registry,
349		telemetry,
350		config,
351	)
352}
353
354/// Parameters to pass into `build`.
355pub struct SpawnTasksParams<'a, TBl: BlockT, TCl, TExPool, TRpc, Backend> {
356	/// The service configuration.
357	pub config: Configuration,
358	/// A shared client returned by `new_full_parts`.
359	pub client: Arc<TCl>,
360	/// A shared backend returned by `new_full_parts`.
361	pub backend: Arc<Backend>,
362	/// A task manager returned by `new_full_parts`.
363	pub task_manager: &'a mut TaskManager,
364	/// A shared keystore returned by `new_full_parts`.
365	pub keystore: KeystorePtr,
366	/// A shared transaction pool.
367	pub transaction_pool: Arc<TExPool>,
368	/// Builds additional [`RpcModule`]s that should be added to the server.
369	pub rpc_builder: Box<dyn Fn(SubscriptionTaskExecutor) -> Result<RpcModule<TRpc>, Error>>,
370	/// A shared network instance.
371	pub network: Arc<dyn sc_network::service::traits::NetworkService>,
372	/// A Sender for RPC requests.
373	pub system_rpc_tx: TracingUnboundedSender<sc_rpc::system::Request<TBl>>,
374	/// Controller for transaction handlers.
375	pub tx_handler_controller:
376		sc_network_transactions::TransactionsHandlerController<<TBl as BlockT>::Hash>,
377	/// Syncing service.
378	pub sync_service: Arc<SyncingService<TBl>>,
379	/// Telemetry instance for this node.
380	pub telemetry: Option<&'a mut Telemetry>,
381}
382
383/// Spawn the tasks that are required to run a node.
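///
/// # Example
///
/// A hedged sketch of wiring the parameters together. The bound variables (`client`, `backend`,
/// `task_manager`, `transaction_pool`, `network`, `system_rpc_tx`, `tx_handler_controller`,
/// `sync_service`, `telemetry`) are assumed to come from the earlier build steps
/// (`new_full_parts`, `build_network`, the chosen transaction pool, ...):
///
/// ```ignore
/// let rpc_handlers = spawn_tasks(SpawnTasksParams {
/// 	config,
/// 	client: client.clone(),
/// 	backend: backend.clone(),
/// 	task_manager: &mut task_manager,
/// 	keystore: keystore_container.keystore(),
/// 	transaction_pool: transaction_pool.clone(),
/// 	// No node-specific RPC extensions in this sketch.
/// 	rpc_builder: Box::new(|_| Ok(jsonrpsee::RpcModule::new(()))),
/// 	network: network.clone(),
/// 	system_rpc_tx,
/// 	tx_handler_controller,
/// 	sync_service: sync_service.clone(),
/// 	telemetry: telemetry.as_mut(),
/// })?;
/// ```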
384pub fn spawn_tasks<TBl, TBackend, TExPool, TRpc, TCl>(
385	params: SpawnTasksParams<TBl, TCl, TExPool, TRpc, TBackend>,
386) -> Result<RpcHandlers, Error>
387where
388	TCl: ProvideRuntimeApi<TBl>
389		+ HeaderMetadata<TBl, Error = sp_blockchain::Error>
390		+ Chain<TBl>
391		+ BlockBackend<TBl>
392		+ BlockIdTo<TBl, Error = sp_blockchain::Error>
393		+ ProofProvider<TBl>
394		+ HeaderBackend<TBl>
395		+ BlockchainEvents<TBl>
396		+ ExecutorProvider<TBl>
397		+ UsageProvider<TBl>
398		+ StorageProvider<TBl, TBackend>
399		+ CallApiAt<TBl>
400		+ Send
401		+ 'static,
402	<TCl as ProvideRuntimeApi<TBl>>::Api: sp_api::Metadata<TBl>
403		+ sp_transaction_pool::runtime_api::TaggedTransactionQueue<TBl>
404		+ sp_session::SessionKeys<TBl>
405		+ sp_api::ApiExt<TBl>,
406	TBl: BlockT,
407	TBl::Hash: Unpin,
408	TBl::Header: Unpin,
409	TBackend: 'static + sc_client_api::backend::Backend<TBl> + Send,
410	TExPool: MaintainedTransactionPool<Block = TBl, Hash = <TBl as BlockT>::Hash> + 'static,
411{
412	let SpawnTasksParams {
413		mut config,
414		task_manager,
415		client,
416		backend,
417		keystore,
418		transaction_pool,
419		rpc_builder,
420		network,
421		system_rpc_tx,
422		tx_handler_controller,
423		sync_service,
424		telemetry,
425	} = params;
426
427	let chain_info = client.usage_info().chain;
428
429	sp_session::generate_initial_session_keys(
430		client.clone(),
431		chain_info.best_hash,
432		config.dev_key_seed.clone().map(|s| vec![s]).unwrap_or_default(),
433		keystore.clone(),
434	)
435	.map_err(|e| Error::Application(Box::new(e)))?;
436
437	let sysinfo = sc_sysinfo::gather_sysinfo();
438	sc_sysinfo::print_sysinfo(&sysinfo);
439
440	let telemetry = telemetry
441		.map(|telemetry| {
442			init_telemetry(
443				config.network.node_name.clone(),
444				config.impl_name.clone(),
445				config.impl_version.clone(),
446				config.chain_spec.name().to_string(),
447				config.role.is_authority(),
448				network.clone(),
449				client.clone(),
450				telemetry,
451				Some(sysinfo),
452			)
453		})
454		.transpose()?;
455
456	info!("📦 Highest known block at #{}", chain_info.best_number);
457
458	let spawn_handle = task_manager.spawn_handle();
459
460	// Inform the tx pool about imported and finalized blocks.
461	spawn_handle.spawn(
462		"txpool-notifications",
463		Some("transaction-pool"),
464		sc_transaction_pool::notification_future(client.clone(), transaction_pool.clone()),
465	);
466
467	spawn_handle.spawn(
468		"on-transaction-imported",
469		Some("transaction-pool"),
470		propagate_transaction_notifications(
471			transaction_pool.clone(),
472			tx_handler_controller,
473			telemetry.clone(),
474		),
475	);
476
477	// Prometheus metrics.
478	let metrics_service =
479		if let Some(PrometheusConfig { port, registry }) = config.prometheus_config.clone() {
480			// Set static metrics.
481			let metrics = MetricsService::with_prometheus(
482				telemetry,
483				&registry,
484				config.role,
485				&config.network.node_name,
486				&config.impl_version,
487			)?;
488			spawn_handle.spawn(
489				"prometheus-endpoint",
490				None,
491				prometheus_endpoint::init_prometheus(port, registry).map(drop),
492			);
493
494			metrics
495		} else {
496			MetricsService::new(telemetry)
497		};
498
499	// Periodically update metrics and send telemetry updates.
500	spawn_handle.spawn(
501		"telemetry-periodic-send",
502		None,
503		metrics_service.run(
504			client.clone(),
505			transaction_pool.clone(),
506			network.clone(),
507			sync_service.clone(),
508		),
509	);
510
511	let rpc_id_provider = config.rpc.id_provider.take();
512
513	// jsonrpsee RPC
514	let gen_rpc_module = || {
515		gen_rpc_module(
516			task_manager.spawn_handle(),
517			client.clone(),
518			transaction_pool.clone(),
519			keystore.clone(),
520			system_rpc_tx.clone(),
521			config.impl_name.clone(),
522			config.impl_version.clone(),
523			config.chain_spec.as_ref(),
524			&config.state_pruning,
525			config.blocks_pruning,
526			backend.clone(),
527			&*rpc_builder,
528		)
529	};
530
531	let rpc_server_handle = start_rpc_servers(
532		&config.rpc,
533		config.prometheus_registry(),
534		&config.tokio_handle,
535		gen_rpc_module,
536		rpc_id_provider,
537	)?;
538
539	let listen_addrs = rpc_server_handle
540		.listen_addrs()
541		.into_iter()
542		.map(|socket_addr| {
543			let mut multiaddr: Multiaddr = socket_addr.ip().into();
544			multiaddr.push(Protocol::Tcp(socket_addr.port()));
545			multiaddr
546		})
547		.collect();
548
549	let in_memory_rpc = {
550		let mut module = gen_rpc_module()?;
551		module.extensions_mut().insert(DenyUnsafe::No);
552		module
553	};
554
555	let in_memory_rpc_handle = RpcHandlers::new(Arc::new(in_memory_rpc), listen_addrs);
556
557	// Spawn informant task
558	spawn_handle.spawn(
559		"informant",
560		None,
561		sc_informant::build(client.clone(), network, sync_service.clone()),
562	);
563
564	task_manager.keep_alive((config.base_path, rpc_server_handle));
565
566	Ok(in_memory_rpc_handle)
567}
568
569/// Returns a future that forwards imported transactions to the transaction networking protocol.
570pub async fn propagate_transaction_notifications<Block, ExPool>(
571	transaction_pool: Arc<ExPool>,
572	tx_handler_controller: sc_network_transactions::TransactionsHandlerController<
573		<Block as BlockT>::Hash,
574	>,
575	telemetry: Option<TelemetryHandle>,
576) where
577	Block: BlockT,
578	ExPool: MaintainedTransactionPool<Block = Block, Hash = <Block as BlockT>::Hash>,
579{
580	// transaction notifications
581	transaction_pool
582		.import_notification_stream()
583		.for_each(move |hash| {
584			tx_handler_controller.propagate_transaction(hash);
585			let status = transaction_pool.status();
586			telemetry!(
587				telemetry;
588				SUBSTRATE_INFO;
589				"txpool.import";
590				"ready" => status.ready,
591				"future" => status.future,
592			);
593			ready(())
594		})
595		.await;
596}
597
598/// Initialize telemetry with the provided configuration and return a telemetry handle.
599pub fn init_telemetry<Block, Client, Network>(
600	name: String,
601	implementation: String,
602	version: String,
603	chain: String,
604	authority: bool,
605	network: Network,
606	client: Arc<Client>,
607	telemetry: &mut Telemetry,
608	sysinfo: Option<sc_telemetry::SysInfo>,
609) -> sc_telemetry::Result<TelemetryHandle>
610where
611	Block: BlockT,
612	Client: BlockBackend<Block>,
613	Network: NetworkStateInfo,
614{
615	let genesis_hash = client.block_hash(Zero::zero()).ok().flatten().unwrap_or_default();
616	let connection_message = ConnectionMessage {
617		name,
618		implementation,
619		version,
620		target_os: sc_sysinfo::TARGET_OS.into(),
621		target_arch: sc_sysinfo::TARGET_ARCH.into(),
622		target_env: sc_sysinfo::TARGET_ENV.into(),
623		config: String::new(),
624		chain,
625		genesis_hash: format!("{:?}", genesis_hash),
626		authority,
627		startup_time: SystemTime::UNIX_EPOCH
628			.elapsed()
629			.map(|dur| dur.as_millis())
630			.unwrap_or(0)
631			.to_string(),
632		network_id: network.local_peer_id().to_base58(),
633		sysinfo,
634	};
635
636	telemetry.start_telemetry(connection_message)?;
637
638	Ok(telemetry.handle())
639}
640
641/// Generate an RPC module using the provided configuration.
642pub fn gen_rpc_module<TBl, TBackend, TCl, TRpc, TExPool>(
643	spawn_handle: SpawnTaskHandle,
644	client: Arc<TCl>,
645	transaction_pool: Arc<TExPool>,
646	keystore: KeystorePtr,
647	system_rpc_tx: TracingUnboundedSender<sc_rpc::system::Request<TBl>>,
648	impl_name: String,
649	impl_version: String,
650	chain_spec: &dyn ChainSpec,
651	state_pruning: &Option<PruningMode>,
652	blocks_pruning: BlocksPruning,
653	backend: Arc<TBackend>,
654	rpc_builder: &(dyn Fn(SubscriptionTaskExecutor) -> Result<RpcModule<TRpc>, Error>),
655) -> Result<RpcModule<()>, Error>
656where
657	TBl: BlockT,
658	TCl: ProvideRuntimeApi<TBl>
659		+ BlockchainEvents<TBl>
660		+ HeaderBackend<TBl>
661		+ HeaderMetadata<TBl, Error = sp_blockchain::Error>
662		+ ExecutorProvider<TBl>
663		+ CallApiAt<TBl>
664		+ ProofProvider<TBl>
665		+ StorageProvider<TBl, TBackend>
666		+ BlockBackend<TBl>
667		+ Send
668		+ Sync
669		+ 'static,
670	TBackend: sc_client_api::backend::Backend<TBl> + 'static,
671	<TCl as ProvideRuntimeApi<TBl>>::Api: sp_session::SessionKeys<TBl> + sp_api::Metadata<TBl>,
672	TExPool: MaintainedTransactionPool<Block = TBl, Hash = <TBl as BlockT>::Hash> + 'static,
673	TBl::Hash: Unpin,
674	TBl::Header: Unpin,
675{
676	let system_info = sc_rpc::system::SystemInfo {
677		chain_name: chain_spec.name().into(),
678		impl_name,
679		impl_version,
680		properties: chain_spec.properties(),
681		chain_type: chain_spec.chain_type(),
682	};
683
684	let mut rpc_api = RpcModule::new(());
685	let task_executor = Arc::new(spawn_handle);
686
687	let (chain, state, child_state) = {
688		let chain = sc_rpc::chain::new_full(client.clone(), task_executor.clone()).into_rpc();
689		let (state, child_state) = sc_rpc::state::new_full(client.clone(), task_executor.clone());
690		let state = state.into_rpc();
691		let child_state = child_state.into_rpc();
692
693		(chain, state, child_state)
694	};
695
696	const MAX_TRANSACTION_PER_CONNECTION: usize = 16;
697
698	let transaction_broadcast_rpc_v2 = sc_rpc_spec_v2::transaction::TransactionBroadcast::new(
699		client.clone(),
700		transaction_pool.clone(),
701		task_executor.clone(),
702		MAX_TRANSACTION_PER_CONNECTION,
703	)
704	.into_rpc();
705
706	let transaction_v2 = sc_rpc_spec_v2::transaction::Transaction::new(
707		client.clone(),
708		transaction_pool.clone(),
709		task_executor.clone(),
710	)
711	.into_rpc();
712
713	let chain_head_v2 = sc_rpc_spec_v2::chain_head::ChainHead::new(
714		client.clone(),
715		backend.clone(),
716		task_executor.clone(),
717		// Defaults to sensible limits for the `ChainHead`.
718		sc_rpc_spec_v2::chain_head::ChainHeadConfig::default(),
719	)
720	.into_rpc();
721
722	// Part of the RPC v2 spec.
723	// An archive node that can respond to the `archive` RPC-v2 queries is a node with:
724	// - state pruning in archive mode: The storage of blocks is kept around
725	// - block pruning in archive mode: The block's body is kept around
726	let is_archive_node = state_pruning.as_ref().map(|sp| sp.is_archive()).unwrap_or(false) &&
727		blocks_pruning.is_archive();
728	let genesis_hash = client.hash(Zero::zero()).ok().flatten().expect("Genesis block exists; qed");
729	if is_archive_node {
730		let archive_v2 = sc_rpc_spec_v2::archive::Archive::new(
731			client.clone(),
732			backend.clone(),
733			genesis_hash,
734			task_executor.clone(),
735		)
736		.into_rpc();
737		rpc_api.merge(archive_v2).map_err(|e| Error::Application(e.into()))?;
738	}
739
740	// ChainSpec RPC-v2.
741	let chain_spec_v2 = sc_rpc_spec_v2::chain_spec::ChainSpec::new(
742		chain_spec.name().into(),
743		genesis_hash,
744		chain_spec.properties(),
745	)
746	.into_rpc();
747
748	let author = sc_rpc::author::Author::new(
749		client.clone(),
750		transaction_pool,
751		keystore,
752		task_executor.clone(),
753	)
754	.into_rpc();
755
756	let system = sc_rpc::system::System::new(system_info, system_rpc_tx).into_rpc();
757
758	if let Some(storage) = backend.offchain_storage() {
759		let offchain = sc_rpc::offchain::Offchain::new(storage).into_rpc();
760
761		rpc_api.merge(offchain).map_err(|e| Error::Application(e.into()))?;
762	}
763
764	// Part of the RPC v2 spec.
765	rpc_api.merge(transaction_v2).map_err(|e| Error::Application(e.into()))?;
766	rpc_api
767		.merge(transaction_broadcast_rpc_v2)
768		.map_err(|e| Error::Application(e.into()))?;
769	rpc_api.merge(chain_head_v2).map_err(|e| Error::Application(e.into()))?;
770	rpc_api.merge(chain_spec_v2).map_err(|e| Error::Application(e.into()))?;
771
772	// Part of the old RPC spec.
773	rpc_api.merge(chain).map_err(|e| Error::Application(e.into()))?;
774	rpc_api.merge(author).map_err(|e| Error::Application(e.into()))?;
775	rpc_api.merge(system).map_err(|e| Error::Application(e.into()))?;
776	rpc_api.merge(state).map_err(|e| Error::Application(e.into()))?;
777	rpc_api.merge(child_state).map_err(|e| Error::Application(e.into()))?;
778	// Additional [`RpcModule`]s defined in the node to fit the specific blockchain.
779	let extra_rpcs = rpc_builder(task_executor.clone())?;
780	rpc_api.merge(extra_rpcs).map_err(|e| Error::Application(e.into()))?;
781
782	Ok(rpc_api)
783}
784
785/// Parameters to pass into [`build_network`].
786pub struct BuildNetworkParams<'a, Block, Net, TxPool, IQ, Client>
787where
788	Block: BlockT,
789	Net: NetworkBackend<Block, <Block as BlockT>::Hash>,
790{
791	/// The service configuration.
792	pub config: &'a Configuration,
793	/// Full network configuration.
794	pub net_config: FullNetworkConfiguration<Block, <Block as BlockT>::Hash, Net>,
795	/// A shared client returned by `new_full_parts`.
796	pub client: Arc<Client>,
797	/// A shared transaction pool.
798	pub transaction_pool: Arc<TxPool>,
799	/// A handle for spawning tasks.
800	pub spawn_handle: SpawnTaskHandle,
801	/// An import queue.
802	pub import_queue: IQ,
803	/// A block announce validator builder.
804	pub block_announce_validator_builder: Option<
805		Box<dyn FnOnce(Arc<Client>) -> Box<dyn BlockAnnounceValidator<Block> + Send> + Send>,
806	>,
807	/// Optional warp sync config.
808	pub warp_sync_config: Option<WarpSyncConfig<Block>>,
809	/// User-specified block relay params. If not specified, the default
810	/// block request handler will be used.
811	pub block_relay: Option<BlockRelayParams<Block, Net>>,
812	/// Metrics.
813	pub metrics: NotificationMetrics,
814}
815
816/// Build the network service, the network status sinks and an RPC sender.
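///
/// # Example
///
/// A hedged sketch of the common call site. `net_config`, `client`, `transaction_pool`,
/// `import_queue` and `task_manager` are assumed to exist from the earlier build steps:
///
/// ```ignore
/// // `NotificationMetrics::new` takes the optional Prometheus registry (assumed here).
/// let metrics = NotificationMetrics::new(config.prometheus_registry());
/// let (network, system_rpc_tx, tx_handler_controller, network_starter, sync_service) =
/// 	build_network(BuildNetworkParams {
/// 		config: &config,
/// 		net_config,
/// 		client: client.clone(),
/// 		transaction_pool: transaction_pool.clone(),
/// 		spawn_handle: task_manager.spawn_handle(),
/// 		import_queue,
/// 		block_announce_validator_builder: None,
/// 		warp_sync_config: None,
/// 		block_relay: None,
/// 		metrics,
/// 	})?;
///
/// // The network only starts processing once every component has been wired up.
/// network_starter.start_network();
/// ```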
817pub fn build_network<Block, Net, TxPool, IQ, Client>(
818	params: BuildNetworkParams<Block, Net, TxPool, IQ, Client>,
819) -> Result<
820	(
821		Arc<dyn sc_network::service::traits::NetworkService>,
822		TracingUnboundedSender<sc_rpc::system::Request<Block>>,
823		sc_network_transactions::TransactionsHandlerController<<Block as BlockT>::Hash>,
824		NetworkStarter,
825		Arc<SyncingService<Block>>,
826	),
827	Error,
828>
829where
830	Block: BlockT,
831	Client: ProvideRuntimeApi<Block>
832		+ HeaderMetadata<Block, Error = sp_blockchain::Error>
833		+ Chain<Block>
834		+ BlockBackend<Block>
835		+ BlockIdTo<Block, Error = sp_blockchain::Error>
836		+ ProofProvider<Block>
837		+ HeaderBackend<Block>
838		+ BlockchainEvents<Block>
839		+ 'static,
840	TxPool: TransactionPool<Block = Block, Hash = <Block as BlockT>::Hash> + 'static,
841	IQ: ImportQueue<Block> + 'static,
842	Net: NetworkBackend<Block, <Block as BlockT>::Hash>,
843{
844	let BuildNetworkParams {
845		config,
846		mut net_config,
847		client,
848		transaction_pool,
849		spawn_handle,
850		import_queue,
851		block_announce_validator_builder,
852		warp_sync_config,
853		block_relay,
854		metrics,
855	} = params;
856
857	let block_announce_validator = if let Some(f) = block_announce_validator_builder {
858		f(client.clone())
859	} else {
860		Box::new(DefaultBlockAnnounceValidator)
861	};
862
863	let network_service_provider = NetworkServiceProvider::new();
864	let protocol_id = config.protocol_id();
865	let fork_id = config.chain_spec.fork_id();
866	let metrics_registry = config.prometheus_config.as_ref().map(|config| &config.registry);
867
868	let block_downloader = match block_relay {
869		Some(params) => {
870			let BlockRelayParams { mut server, downloader, request_response_config } = params;
871
872			net_config.add_request_response_protocol(request_response_config);
873
874			spawn_handle.spawn("block-request-handler", Some("networking"), async move {
875				server.run().await;
876			});
877
878			downloader
879		},
880		None => build_default_block_downloader(
881			&protocol_id,
882			fork_id,
883			&mut net_config,
884			network_service_provider.handle(),
885			Arc::clone(&client),
886			config.network.default_peers_set.in_peers as usize +
887				config.network.default_peers_set.out_peers as usize,
888			&spawn_handle,
889		),
890	};
891
892	let syncing_strategy = build_polkadot_syncing_strategy(
893		protocol_id.clone(),
894		fork_id,
895		&mut net_config,
896		warp_sync_config,
897		block_downloader,
898		client.clone(),
899		&spawn_handle,
900		metrics_registry,
901	)?;
902
903	let (syncing_engine, sync_service, block_announce_config) = SyncingEngine::new(
904		Roles::from(&config.role),
905		Arc::clone(&client),
906		metrics_registry,
907		metrics.clone(),
908		&net_config,
909		protocol_id.clone(),
910		fork_id,
911		block_announce_validator,
912		syncing_strategy,
913		network_service_provider.handle(),
914		import_queue.service(),
915		net_config.peer_store_handle(),
916	)?;
917
918	spawn_handle.spawn_blocking("syncing", None, syncing_engine.run());
919
920	build_network_advanced(BuildNetworkAdvancedParams {
921		role: config.role,
922		protocol_id,
923		fork_id,
924		ipfs_server: config.network.ipfs_server,
925		announce_block: config.announce_block,
926		net_config,
927		client,
928		transaction_pool,
929		spawn_handle,
930		import_queue,
931		sync_service,
932		block_announce_config,
933		network_service_provider,
934		metrics_registry,
935		metrics,
936	})
937}
938
939/// Parameters to pass into [`build_network_advanced`].
940pub struct BuildNetworkAdvancedParams<'a, Block, Net, TxPool, IQ, Client>
941where
942	Block: BlockT,
943	Net: NetworkBackend<Block, <Block as BlockT>::Hash>,
944{
945	/// Role of the local node.
946	pub role: Role,
947	/// Protocol name prefix.
948	pub protocol_id: ProtocolId,
949	/// Fork ID.
950	pub fork_id: Option<&'a str>,
951	/// Enable serving block data over IPFS bitswap.
952	pub ipfs_server: bool,
953	/// Announce blocks automatically after they have been imported.
954	pub announce_block: bool,
955	/// Full network configuration.
956	pub net_config: FullNetworkConfiguration<Block, <Block as BlockT>::Hash, Net>,
957	/// A shared client returned by `new_full_parts`.
958	pub client: Arc<Client>,
959	/// A shared transaction pool.
960	pub transaction_pool: Arc<TxPool>,
961	/// A handle for spawning tasks.
962	pub spawn_handle: SpawnTaskHandle,
963	/// An import queue.
964	pub import_queue: IQ,
965	/// Syncing service to communicate with syncing engine.
966	pub sync_service: SyncingService<Block>,
967	/// Block announce config.
968	pub block_announce_config: Net::NotificationProtocolConfig,
969	/// Network service provider used to drive the network internally.
970	pub network_service_provider: NetworkServiceProvider,
971	/// Prometheus metrics registry.
972	pub metrics_registry: Option<&'a Registry>,
973	/// Metrics.
974	pub metrics: NotificationMetrics,
975}
976
977/// Build the network service, the network status sinks and an RPC sender. This is a lower-level
978/// version of [`build_network`] for those needing more control.
979pub fn build_network_advanced<Block, Net, TxPool, IQ, Client>(
980	params: BuildNetworkAdvancedParams<Block, Net, TxPool, IQ, Client>,
981) -> Result<
982	(
983		Arc<dyn sc_network::service::traits::NetworkService>,
984		TracingUnboundedSender<sc_rpc::system::Request<Block>>,
985		sc_network_transactions::TransactionsHandlerController<<Block as BlockT>::Hash>,
986		NetworkStarter,
987		Arc<SyncingService<Block>>,
988	),
989	Error,
990>
991where
992	Block: BlockT,
993	Client: ProvideRuntimeApi<Block>
994		+ HeaderMetadata<Block, Error = sp_blockchain::Error>
995		+ Chain<Block>
996		+ BlockBackend<Block>
997		+ BlockIdTo<Block, Error = sp_blockchain::Error>
998		+ ProofProvider<Block>
999		+ HeaderBackend<Block>
1000		+ BlockchainEvents<Block>
1001		+ 'static,
1002	TxPool: TransactionPool<Block = Block, Hash = <Block as BlockT>::Hash> + 'static,
1003	IQ: ImportQueue<Block> + 'static,
1004	Net: NetworkBackend<Block, <Block as BlockT>::Hash>,
1005{
1006	let BuildNetworkAdvancedParams {
1007		role,
1008		protocol_id,
1009		fork_id,
1010		ipfs_server,
1011		announce_block,
1012		mut net_config,
1013		client,
1014		transaction_pool,
1015		spawn_handle,
1016		import_queue,
1017		sync_service,
1018		block_announce_config,
1019		network_service_provider,
1020		metrics_registry,
1021		metrics,
1022	} = params;
1023
1024	let genesis_hash = client.info().genesis_hash;
1025
1026	let light_client_request_protocol_config = {
1027		// Allow both outgoing and incoming requests.
1028		let (handler, protocol_config) =
1029			LightClientRequestHandler::new::<Net>(&protocol_id, fork_id, client.clone());
1030		spawn_handle.spawn("light-client-request-handler", Some("networking"), handler.run());
1031		protocol_config
1032	};
1033
1034	// Install the request handlers into the `FullNetworkConfiguration`.
1035	net_config.add_request_response_protocol(light_client_request_protocol_config);
1036
1037	let bitswap_config = ipfs_server.then(|| {
1038		let (handler, config) = Net::bitswap_server(client.clone());
1039		spawn_handle.spawn("bitswap-request-handler", Some("networking"), handler);
1040
1041		config
1042	});
1043
1044	// Create the transactions protocol and add it to the list of supported protocols.
1045	let (transactions_handler_proto, transactions_config) =
1046		sc_network_transactions::TransactionsHandlerPrototype::new::<_, Block, Net>(
1047			protocol_id.clone(),
1048			genesis_hash,
1049			fork_id,
1050			metrics.clone(),
1051			net_config.peer_store_handle(),
1052		);
1053	net_config.add_notification_protocol(transactions_config);
1054
1055	// Start task for `PeerStore`
1056	let peer_store = net_config.take_peer_store();
1057	spawn_handle.spawn("peer-store", Some("networking"), peer_store.run());
1058
1059	let sync_service = Arc::new(sync_service);
1060
1061	let network_params = sc_network::config::Params::<Block, <Block as BlockT>::Hash, Net> {
1062		role,
1063		executor: {
1064			let spawn_handle = Clone::clone(&spawn_handle);
1065			Box::new(move |fut| {
1066				spawn_handle.spawn("libp2p-node", Some("networking"), fut);
1067			})
1068		},
1069		network_config: net_config,
1070		genesis_hash,
1071		protocol_id,
1072		fork_id: fork_id.map(ToOwned::to_owned),
1073		metrics_registry: metrics_registry.cloned(),
1074		block_announce_config,
1075		bitswap_config,
1076		notification_metrics: metrics,
1077	};
1078
1079	let has_bootnodes = !network_params.network_config.network_config.boot_nodes.is_empty();
1080	let network_mut = Net::new(network_params)?;
1081	let network = network_mut.network_service().clone();
1082
1083	let (tx_handler, tx_handler_controller) = transactions_handler_proto.build(
1084		network.clone(),
1085		sync_service.clone(),
1086		Arc::new(TransactionPoolAdapter { pool: transaction_pool, client: client.clone() }),
1087		metrics_registry,
1088	)?;
1089	spawn_handle.spawn_blocking(
1090		"network-transactions-handler",
1091		Some("networking"),
1092		tx_handler.run(),
1093	);
1094
1095	spawn_handle.spawn_blocking(
1096		"chain-sync-network-service-provider",
1097		Some("networking"),
1098		network_service_provider.run(Arc::new(network.clone())),
1099	);
1100	spawn_handle.spawn("import-queue", None, {
1101		let sync_service = sync_service.clone();
1102
1103		async move { import_queue.run(sync_service.as_ref()).await }
1104	});
1105
1106	let (system_rpc_tx, system_rpc_rx) = tracing_unbounded("mpsc_system_rpc", 10_000);
1107	spawn_handle.spawn(
1108		"system-rpc-handler",
1109		Some("networking"),
1110		build_system_rpc_future::<_, _, <Block as BlockT>::Hash>(
1111			role,
1112			network_mut.network_service(),
1113			sync_service.clone(),
1114			client.clone(),
1115			system_rpc_rx,
1116			has_bootnodes,
1117		),
1118	);
1119
1120	let future = build_network_future::<_, _, <Block as BlockT>::Hash, _>(
1121		network_mut,
1122		client,
1123		sync_service.clone(),
1124		announce_block,
1125	);
1126
1127	// TODO: Normally, one is supposed to pass a list of notifications protocols supported by the
1128	// node through the `NetworkConfiguration` struct. But because this function doesn't know in
1129	// advance which components, such as GrandPa or Polkadot, will be plugged on top of the
1130	// service, it is unfortunately not possible to do so without some deep refactoring. To
1131	// bypass this problem, the `NetworkService` provides a `register_notifications_protocol`
1132	// method that can be called even after the network has been initialized. However, we want to
1133	// avoid the situation where `register_notifications_protocol` is called *after* the network
1134	// actually connects to other peers. For this reason, we delay the process of the network
1135	// future until the user calls `NetworkStarter::start_network`.
1136	//
1137	// This entire hack should eventually be removed in favour of passing the list of protocols
1138	// through the configuration.
1139	//
1140	// See also https://github.com/paritytech/substrate/issues/6827
1141	let (network_start_tx, network_start_rx) = oneshot::channel();
1142
1143	// The network worker is responsible for gathering all network messages and processing
1144	// them. This is quite a heavy task, and at the time of the writing of this comment it
1145	// frequently happens that this future takes several seconds or in some situations
1146	// even more than a minute until it has processed its entire queue. This is clearly an
1147	// issue, and ideally we would like to fix the network future to take as little time as
1148	// possible, but we also take the extra harm-prevention measure to execute the networking
1149	// future using `spawn_blocking`.
1150	spawn_handle.spawn_blocking("network-worker", Some("networking"), async move {
1151		if network_start_rx.await.is_err() {
1152			log::warn!(
1153				"The NetworkStarter returned as part of `build_network` has been silently dropped"
1154			);
1155			// This `return` might seem unnecessary, but we don't want to make it look like
1156			// everything is working as normal even though the user is clearly misusing the API.
1157			return
1158		}
1159
1160		future.await
1161	});
1162
1163	Ok((
1164		network,
1165		system_rpc_tx,
1166		tx_handler_controller,
1167		NetworkStarter(network_start_tx),
1168		sync_service.clone(),
1169	))
1170}
1171
1172/// Configuration for [`build_default_syncing_engine`].
1173pub struct DefaultSyncingEngineConfig<'a, Block, Client, Net>
1174where
1175	Block: BlockT,
1176	Net: NetworkBackend<Block, <Block as BlockT>::Hash>,
1177{
1178	/// Role of the local node.
1179	pub role: Role,
1180	/// Protocol name prefix.
1181	pub protocol_id: ProtocolId,
1182	/// Fork ID.
1183	pub fork_id: Option<&'a str>,
1184	/// Full network configuration.
1185	pub net_config: &'a mut FullNetworkConfiguration<Block, <Block as BlockT>::Hash, Net>,
1186	/// Validator for incoming block announcements.
1187	pub block_announce_validator: Box<dyn BlockAnnounceValidator<Block> + Send>,
1188	/// Handle to communicate with `NetworkService`.
1189	pub network_service_handle: NetworkServiceHandle,
1190	/// Warp sync configuration (when used).
1191	pub warp_sync_config: Option<WarpSyncConfig<Block>>,
1192	/// A shared client returned by `new_full_parts`.
1193	pub client: Arc<Client>,
1194	/// Blocks import queue API.
1195	pub import_queue_service: Box<dyn ImportQueueService<Block>>,
1196	/// Expected max total number of peer connections (in + out).
1197	pub num_peers_hint: usize,
1198	/// A handle for spawning tasks.
1199	pub spawn_handle: &'a SpawnTaskHandle,
1200	/// Prometheus metrics registry.
1201	pub metrics_registry: Option<&'a Registry>,
1202	/// Metrics.
1203	pub metrics: NotificationMetrics,
1204}
1205
1206/// Build default syncing engine using [`build_default_block_downloader`] and
1207/// [`build_polkadot_syncing_strategy`] internally.
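///
/// # Example
///
/// A hedged sketch; `net_config`, `client`, `import_queue` and `network_service_provider` are
/// assumed to exist from earlier build steps, and the peer-count hint is illustrative:
///
/// ```ignore
/// let spawn_handle = task_manager.spawn_handle();
/// let (sync_service, block_announce_config) =
/// 	build_default_syncing_engine(DefaultSyncingEngineConfig {
/// 		role: config.role,
/// 		protocol_id: config.protocol_id(),
/// 		fork_id: config.chain_spec.fork_id(),
/// 		net_config: &mut net_config,
/// 		block_announce_validator: Box::new(DefaultBlockAnnounceValidator),
/// 		network_service_handle: network_service_provider.handle(),
/// 		warp_sync_config: None,
/// 		client: client.clone(),
/// 		import_queue_service: import_queue.service(),
/// 		num_peers_hint: 64,
/// 		spawn_handle: &spawn_handle,
/// 		metrics_registry: None,
/// 		metrics: NotificationMetrics::new(None),
/// 	})?;
/// ```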
1208pub fn build_default_syncing_engine<Block, Client, Net>(
1209	config: DefaultSyncingEngineConfig<Block, Client, Net>,
1210) -> Result<(SyncingService<Block>, Net::NotificationProtocolConfig), Error>
1211where
1212	Block: BlockT,
1213	Client: HeaderBackend<Block>
1214		+ BlockBackend<Block>
1215		+ HeaderMetadata<Block, Error = sp_blockchain::Error>
1216		+ ProofProvider<Block>
1217		+ Send
1218		+ Sync
1219		+ 'static,
1220	Net: NetworkBackend<Block, <Block as BlockT>::Hash>,
1221{
1222	let DefaultSyncingEngineConfig {
1223		role,
1224		protocol_id,
1225		fork_id,
1226		net_config,
1227		block_announce_validator,
1228		network_service_handle,
1229		warp_sync_config,
1230		client,
1231		import_queue_service,
1232		num_peers_hint,
1233		spawn_handle,
1234		metrics_registry,
1235		metrics,
1236	} = config;
1237
1238	let block_downloader = build_default_block_downloader(
1239		&protocol_id,
1240		fork_id,
1241		net_config,
1242		network_service_handle.clone(),
1243		client.clone(),
1244		num_peers_hint,
1245		spawn_handle,
1246	);
1247	let syncing_strategy = build_polkadot_syncing_strategy(
1248		protocol_id.clone(),
1249		fork_id,
1250		net_config,
1251		warp_sync_config,
1252		block_downloader,
1253		client.clone(),
1254		spawn_handle,
1255		metrics_registry,
1256	)?;
1257
1258	let (syncing_engine, sync_service, block_announce_config) = SyncingEngine::new(
1259		Roles::from(&role),
1260		client,
1261		metrics_registry,
1262		metrics,
1263		&net_config,
1264		protocol_id,
1265		fork_id,
1266		block_announce_validator,
1267		syncing_strategy,
1268		network_service_handle,
1269		import_queue_service,
1270		net_config.peer_store_handle(),
1271	)?;
1272
1273	spawn_handle.spawn_blocking("syncing", None, syncing_engine.run());
1274
1275	Ok((sync_service, block_announce_config))
1276}
1277
1278/// Build the default block downloader.
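///
/// # Example
///
/// A hedged sketch mirroring how [`build_network`] uses this helper; all bound variables are
/// assumed to come from earlier build steps:
///
/// ```ignore
/// let block_downloader = build_default_block_downloader(
/// 	&config.protocol_id(),
/// 	config.chain_spec.fork_id(),
/// 	&mut net_config,
/// 	network_service_provider.handle(),
/// 	client.clone(),
/// 	num_peers_hint,
/// 	&task_manager.spawn_handle(),
/// );
/// ```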
1279pub fn build_default_block_downloader<Block, Client, Net>(
1280	protocol_id: &ProtocolId,
1281	fork_id: Option<&str>,
1282	net_config: &mut FullNetworkConfiguration<Block, <Block as BlockT>::Hash, Net>,
1283	network_service_handle: NetworkServiceHandle,
1284	client: Arc<Client>,
1285	num_peers_hint: usize,
1286	spawn_handle: &SpawnTaskHandle,
1287) -> Arc<dyn BlockDownloader<Block>>
1288where
1289	Block: BlockT,
1290	Client: HeaderBackend<Block> + BlockBackend<Block> + Send + Sync + 'static,
1291	Net: NetworkBackend<Block, <Block as BlockT>::Hash>,
1292{
1293	// No custom protocol was specified; use the default block request handler.
1294	// Allow both outgoing and incoming requests.
1295	let BlockRelayParams { mut server, downloader, request_response_config } =
1296		BlockRequestHandler::new::<Net>(
1297			network_service_handle,
1298			&protocol_id,
1299			fork_id,
1300			client.clone(),
1301			num_peers_hint,
1302		);
1303
1304	spawn_handle.spawn("block-request-handler", Some("networking"), async move {
1305		server.run().await;
1306	});
1307
1308	net_config.add_request_response_protocol(request_response_config);
1309
1310	downloader
1311}
1312
1313/// Build the standard Polkadot syncing strategy.
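///
/// # Example
///
/// A hedged sketch; the block downloader can come from [`build_default_block_downloader`] and the
/// remaining variables are assumed to exist from earlier build steps:
///
/// ```ignore
/// let syncing_strategy = build_polkadot_syncing_strategy(
/// 	config.protocol_id(),
/// 	config.chain_spec.fork_id(),
/// 	&mut net_config,
/// 	warp_sync_config,
/// 	block_downloader,
/// 	client.clone(),
/// 	&task_manager.spawn_handle(),
/// 	config.prometheus_config.as_ref().map(|cfg| &cfg.registry),
/// )?;
/// ```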
1314pub fn build_polkadot_syncing_strategy<Block, Client, Net>(
1315	protocol_id: ProtocolId,
1316	fork_id: Option<&str>,
1317	net_config: &mut FullNetworkConfiguration<Block, <Block as BlockT>::Hash, Net>,
1318	warp_sync_config: Option<WarpSyncConfig<Block>>,
1319	block_downloader: Arc<dyn BlockDownloader<Block>>,
1320	client: Arc<Client>,
1321	spawn_handle: &SpawnTaskHandle,
1322	metrics_registry: Option<&Registry>,
1323) -> Result<Box<dyn SyncingStrategy<Block>>, Error>
1324where
1325	Block: BlockT,
1326	Client: HeaderBackend<Block>
1327		+ BlockBackend<Block>
1328		+ HeaderMetadata<Block, Error = sp_blockchain::Error>
1329		+ ProofProvider<Block>
1330		+ Send
1331		+ Sync
1332		+ 'static,
1333	Net: NetworkBackend<Block, <Block as BlockT>::Hash>,
1334{
1335	if warp_sync_config.is_none() && net_config.network_config.sync_mode.is_warp() {
1336		return Err("Warp sync enabled, but no warp sync provider configured.".into())
1337	}
1338
1339	if client.requires_full_sync() {
1340		match net_config.network_config.sync_mode {
1341			SyncMode::LightState { .. } =>
1342				return Err("Fast sync doesn't work for archive nodes".into()),
1343			SyncMode::Warp => return Err("Warp sync doesn't work for archive nodes".into()),
1344			SyncMode::Full => {},
1345		}
1346	}
1347
1348	let genesis_hash = client.info().genesis_hash;
1349
1350	let (state_request_protocol_config, state_request_protocol_name) = {
1351		let num_peer_hint = net_config.network_config.default_peers_set_num_full as usize +
1352			net_config.network_config.default_peers_set.reserved_nodes.len();
1353		// Allow both outgoing and incoming requests.
1354		let (handler, protocol_config) =
1355			StateRequestHandler::new::<Net>(&protocol_id, fork_id, client.clone(), num_peer_hint);
1356		let config_name = protocol_config.protocol_name().clone();
1357
1358		spawn_handle.spawn("state-request-handler", Some("networking"), handler.run());
1359		(protocol_config, config_name)
1360	};
1361	net_config.add_request_response_protocol(state_request_protocol_config);
1362
1363	let (warp_sync_protocol_config, warp_sync_protocol_name) = match warp_sync_config.as_ref() {
1364		Some(WarpSyncConfig::WithProvider(warp_with_provider)) => {
1365			// Allow both outgoing and incoming requests.
1366			let (handler, protocol_config) = WarpSyncRequestHandler::new::<_, Net>(
1367				protocol_id,
1368				genesis_hash,
1369				fork_id,
1370				warp_with_provider.clone(),
1371			);
1372			let config_name = protocol_config.protocol_name().clone();
1373
1374			spawn_handle.spawn("warp-sync-request-handler", Some("networking"), handler.run());
1375			(Some(protocol_config), Some(config_name))
1376		},
1377		_ => (None, None),
1378	};
1379	if let Some(config) = warp_sync_protocol_config {
1380		net_config.add_request_response_protocol(config);
1381	}
1382
1383	let syncing_config = PolkadotSyncingStrategyConfig {
1384		mode: net_config.network_config.sync_mode,
1385		max_parallel_downloads: net_config.network_config.max_parallel_downloads,
1386		max_blocks_per_request: net_config.network_config.max_blocks_per_request,
1387		metrics_registry: metrics_registry.cloned(),
1388		state_request_protocol_name,
1389		block_downloader,
1390	};
1391	Ok(Box::new(PolkadotSyncingStrategy::new(
1392		syncing_config,
1393		client,
1394		warp_sync_config,
1395		warp_sync_protocol_name,
1396	)?))
1397}
1398
1399/// Object used to start the network.
1400#[must_use]
1401pub struct NetworkStarter(oneshot::Sender<()>);
1402
1403impl NetworkStarter {
1404	/// Create a new `NetworkStarter`.
1405	pub fn new(sender: oneshot::Sender<()>) -> Self {
1406		NetworkStarter(sender)
1407	}
1408
1409	/// Start the network. Call this after all sub-components have been initialized.
1410	///
1411	/// > **Note**: If you don't call this function, the networking will not work.
1412	pub fn start_network(self) {
1413		let _ = self.0.send(());
1414	}
1415}