sc_service/
builder.rs

1// This file is part of Substrate.
2
3// Copyright (C) Parity Technologies (UK) Ltd.
4// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
5
6// This program is free software: you can redistribute it and/or modify
7// it under the terms of the GNU General Public License as published by
8// the Free Software Foundation, either version 3 of the License, or
9// (at your option) any later version.
10
11// This program is distributed in the hope that it will be useful,
12// but WITHOUT ANY WARRANTY; without even the implied warranty of
13// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14// GNU General Public License for more details.
15
16// You should have received a copy of the GNU General Public License
17// along with this program. If not, see <https://www.gnu.org/licenses/>.
18
19use crate::{
20	build_network_future, build_system_rpc_future,
21	client::{Client, ClientConfig},
22	config::{Configuration, ExecutorConfiguration, KeystoreConfig, Multiaddr, PrometheusConfig},
23	error::Error,
24	metrics::MetricsService,
25	start_rpc_servers, BuildGenesisBlock, GenesisBlockBuilder, RpcHandlers, SpawnTaskHandle,
26	TaskManager, TransactionPoolAdapter,
27};
28use futures::{select, FutureExt, StreamExt};
29use jsonrpsee::RpcModule;
30use log::{debug, error, info};
31use prometheus_endpoint::Registry;
32use sc_chain_spec::{get_extension, ChainSpec};
33use sc_client_api::{
34	execution_extensions::ExecutionExtensions, proof_provider::ProofProvider, BadBlocks,
35	BlockBackend, BlockchainEvents, ExecutorProvider, ForkBlocks, KeysIter, StorageProvider,
36	TrieCacheContext, UsageProvider,
37};
38use sc_client_db::{Backend, BlocksPruning, DatabaseSettings, PruningMode};
39use sc_consensus::import_queue::{ImportQueue, ImportQueueService};
40use sc_executor::{
41	sp_wasm_interface::HostFunctions, HeapAllocStrategy, NativeExecutionDispatch, RuntimeVersionOf,
42	WasmExecutor, DEFAULT_HEAP_ALLOC_STRATEGY,
43};
44use sc_keystore::LocalKeystore;
45use sc_network::{
46	config::{FullNetworkConfiguration, ProtocolId, SyncMode},
47	multiaddr::Protocol,
48	service::{
49		traits::{PeerStore, RequestResponseConfig},
50		NotificationMetrics,
51	},
52	NetworkBackend, NetworkStateInfo,
53};
54use sc_network_common::role::{Role, Roles};
55use sc_network_light::light_client_requests::handler::LightClientRequestHandler;
56use sc_network_sync::{
57	block_relay_protocol::{BlockDownloader, BlockRelayParams},
58	block_request_handler::BlockRequestHandler,
59	engine::SyncingEngine,
60	service::network::{NetworkServiceHandle, NetworkServiceProvider},
61	state_request_handler::StateRequestHandler,
62	strategy::{
63		polkadot::{PolkadotSyncingStrategy, PolkadotSyncingStrategyConfig},
64		SyncingStrategy,
65	},
66	warp_request_handler::RequestHandler as WarpSyncRequestHandler,
67	SyncingService, WarpSyncConfig,
68};
69use sc_rpc::{
70	author::AuthorApiServer,
71	chain::ChainApiServer,
72	offchain::OffchainApiServer,
73	state::{ChildStateApiServer, StateApiServer},
74	system::SystemApiServer,
75	DenyUnsafe, SubscriptionTaskExecutor,
76};
77use sc_rpc_spec_v2::{
78	archive::ArchiveApiServer,
79	chain_head::ChainHeadApiServer,
80	chain_spec::ChainSpecApiServer,
81	transaction::{TransactionApiServer, TransactionBroadcastApiServer},
82};
83use sc_telemetry::{telemetry, ConnectionMessage, Telemetry, TelemetryHandle, SUBSTRATE_INFO};
84use sc_tracing::block::TracingExecuteBlock;
85use sc_transaction_pool_api::{MaintainedTransactionPool, TransactionPool};
86use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedSender};
87use sp_api::{CallApiAt, ProvideRuntimeApi};
88use sp_blockchain::{HeaderBackend, HeaderMetadata};
89use sp_consensus::block_validation::{
90	BlockAnnounceValidator, Chain, DefaultBlockAnnounceValidator,
91};
92use sp_core::traits::{CodeExecutor, SpawnNamed};
93use sp_keystore::KeystorePtr;
94use sp_runtime::traits::{Block as BlockT, BlockIdTo, NumberFor, Zero};
95use sp_storage::{ChildInfo, ChildType, PrefixedStorageKey};
96use std::{
97	str::FromStr,
98	sync::Arc,
99	time::{Duration, SystemTime},
100};
101
102/// Full client type.
103pub type TFullClient<TBl, TRtApi, TExec> =
104	Client<TFullBackend<TBl>, TFullCallExecutor<TBl, TExec>, TBl, TRtApi>;
105
106/// Full client backend type.
107pub type TFullBackend<TBl> = Backend<TBl>;
108
109/// Full client call executor type.
110pub type TFullCallExecutor<TBl, TExec> = crate::client::LocalCallExecutor<TBl, Backend<TBl>, TExec>;
111
112type TFullParts<TBl, TRtApi, TExec> =
113	(TFullClient<TBl, TRtApi, TExec>, Arc<TFullBackend<TBl>>, KeystoreContainer, TaskManager);
114
115/// Construct a local keystore shareable container
116pub struct KeystoreContainer(Arc<LocalKeystore>);
117
118impl KeystoreContainer {
119	/// Construct KeystoreContainer
120	pub fn new(config: &KeystoreConfig) -> Result<Self, Error> {
121		let keystore = Arc::new(match config {
122			KeystoreConfig::Path { path, password } =>
123				LocalKeystore::open(path.clone(), password.clone())?,
124			KeystoreConfig::InMemory => LocalKeystore::in_memory(),
125		});
126
127		Ok(Self(keystore))
128	}
129
130	/// Returns a shared reference to a dynamic `Keystore` trait implementation.
131	pub fn keystore(&self) -> KeystorePtr {
132		self.0.clone()
133	}
134
135	/// Returns a shared reference to the local keystore .
136	pub fn local_keystore(&self) -> Arc<LocalKeystore> {
137		self.0.clone()
138	}
139}
140
141/// Creates a new full client for the given config.
142pub fn new_full_client<TBl, TRtApi, TExec>(
143	config: &Configuration,
144	telemetry: Option<TelemetryHandle>,
145	executor: TExec,
146) -> Result<TFullClient<TBl, TRtApi, TExec>, Error>
147where
148	TBl: BlockT,
149	TExec: CodeExecutor + RuntimeVersionOf + Clone,
150{
151	new_full_parts(config, telemetry, executor).map(|parts| parts.0)
152}
153
154/// Create the initial parts of a full node with the default genesis block builder.
155pub fn new_full_parts_record_import<TBl, TRtApi, TExec>(
156	config: &Configuration,
157	telemetry: Option<TelemetryHandle>,
158	executor: TExec,
159	enable_import_proof_recording: bool,
160) -> Result<TFullParts<TBl, TRtApi, TExec>, Error>
161where
162	TBl: BlockT,
163	TExec: CodeExecutor + RuntimeVersionOf + Clone,
164{
165	let backend = new_db_backend(config.db_config())?;
166
167	let genesis_block_builder = GenesisBlockBuilder::new(
168		config.chain_spec.as_storage_builder(),
169		!config.no_genesis(),
170		backend.clone(),
171		executor.clone(),
172	)?;
173
174	new_full_parts_with_genesis_builder(
175		config,
176		telemetry,
177		executor,
178		backend,
179		genesis_block_builder,
180		enable_import_proof_recording,
181	)
182}
183/// Create the initial parts of a full node with the default genesis block builder.
184pub fn new_full_parts<TBl, TRtApi, TExec>(
185	config: &Configuration,
186	telemetry: Option<TelemetryHandle>,
187	executor: TExec,
188) -> Result<TFullParts<TBl, TRtApi, TExec>, Error>
189where
190	TBl: BlockT,
191	TExec: CodeExecutor + RuntimeVersionOf + Clone,
192{
193	new_full_parts_record_import(config, telemetry, executor, false)
194}
195
196/// Create the initial parts of a full node.
197pub fn new_full_parts_with_genesis_builder<TBl, TRtApi, TExec, TBuildGenesisBlock>(
198	config: &Configuration,
199	telemetry: Option<TelemetryHandle>,
200	executor: TExec,
201	backend: Arc<TFullBackend<TBl>>,
202	genesis_block_builder: TBuildGenesisBlock,
203	enable_import_proof_recording: bool,
204) -> Result<TFullParts<TBl, TRtApi, TExec>, Error>
205where
206	TBl: BlockT,
207	TExec: CodeExecutor + RuntimeVersionOf + Clone,
208	TBuildGenesisBlock: BuildGenesisBlock<
209		TBl,
210		BlockImportOperation = <Backend<TBl> as sc_client_api::backend::Backend<TBl>>::BlockImportOperation
211	>,
212{
213	let keystore_container = KeystoreContainer::new(&config.keystore)?;
214
215	let task_manager = {
216		let registry = config.prometheus_config.as_ref().map(|cfg| &cfg.registry);
217		TaskManager::new(config.tokio_handle.clone(), registry)?
218	};
219
220	let chain_spec = &config.chain_spec;
221	let fork_blocks = get_extension::<ForkBlocks<TBl>>(chain_spec.extensions())
222		.cloned()
223		.unwrap_or_default();
224
225	let bad_blocks = get_extension::<BadBlocks<TBl>>(chain_spec.extensions())
226		.cloned()
227		.unwrap_or_default();
228
229	let client = {
230		let extensions = ExecutionExtensions::new(None, Arc::new(executor.clone()));
231
232		let wasm_runtime_substitutes = config
233			.chain_spec
234			.code_substitutes()
235			.into_iter()
236			.map(|(n, c)| {
237				let number = NumberFor::<TBl>::from_str(&n).map_err(|_| {
238					Error::Application(Box::from(format!(
239						"Failed to parse `{}` as block number for code substitutes. \
240						 In an old version the key for code substitute was a block hash. \
241						 Please update the chain spec to a version that is compatible with your node.",
242						n
243					)))
244				})?;
245				Ok((number, c))
246			})
247			.collect::<Result<std::collections::HashMap<_, _>, Error>>()?;
248
249		let client = new_client(
250			backend.clone(),
251			executor,
252			genesis_block_builder,
253			fork_blocks,
254			bad_blocks,
255			extensions,
256			Box::new(task_manager.spawn_handle()),
257			config.prometheus_config.as_ref().map(|config| config.registry.clone()),
258			telemetry,
259			ClientConfig {
260				offchain_worker_enabled: config.offchain_worker.enabled,
261				offchain_indexing_api: config.offchain_worker.indexing_enabled,
262				wasm_runtime_overrides: config.wasm_runtime_overrides.clone(),
263				no_genesis: config.no_genesis(),
264				wasm_runtime_substitutes,
265				enable_import_proof_recording,
266			},
267		)?;
268
269		if let Some(warm_up_strategy) = config.warm_up_trie_cache {
270			let storage_root = client.usage_info().chain.best_hash;
271			let backend_clone = backend.clone();
272
273			if warm_up_strategy.is_blocking() {
274				// We use the blocking strategy for testing purposes.
275				// So better to error out if it fails.
276				warm_up_trie_cache(backend_clone, storage_root)?;
277			} else {
278				task_manager.spawn_handle().spawn_blocking(
279					"warm-up-trie-cache",
280					None,
281					async move {
282						if let Err(e) = warm_up_trie_cache(backend_clone, storage_root) {
283							error!("Failed to warm up trie cache: {e}");
284						}
285					},
286				);
287			}
288		}
289
290		client
291	};
292
293	Ok((client, backend, keystore_container, task_manager))
294}
295
296fn child_info(key: Vec<u8>) -> Option<ChildInfo> {
297	let prefixed_key = PrefixedStorageKey::new(key);
298	ChildType::from_prefixed_key(&prefixed_key).and_then(|(child_type, storage_key)| {
299		(child_type == ChildType::ParentKeyId).then(|| ChildInfo::new_default(storage_key))
300	})
301}
302
303fn warm_up_trie_cache<TBl: BlockT>(
304	backend: Arc<TFullBackend<TBl>>,
305	storage_root: TBl::Hash,
306) -> Result<(), Error> {
307	use sc_client_api::backend::Backend;
308	use sp_state_machine::Backend as StateBackend;
309
310	let untrusted_state = || backend.state_at(storage_root, TrieCacheContext::Untrusted);
311	let trusted_state = || backend.state_at(storage_root, TrieCacheContext::Trusted);
312
313	debug!("Populating trie cache started",);
314	let start_time = std::time::Instant::now();
315	let mut keys_count = 0;
316	let mut child_keys_count = 0;
317	for key in KeysIter::<_, TBl>::new(untrusted_state()?, None, None)? {
318		if keys_count != 0 && keys_count % 100_000 == 0 {
319			debug!("{} keys and {} child keys have been warmed", keys_count, child_keys_count);
320		}
321		match child_info(key.0.clone()) {
322			Some(info) => {
323				for child_key in
324					KeysIter::<_, TBl>::new_child(untrusted_state()?, info.clone(), None, None)?
325				{
326					if trusted_state()?
327						.child_storage(&info, &child_key.0)
328						.unwrap_or_default()
329						.is_none()
330					{
331						debug!("Child storage value unexpectedly empty: {child_key:?}");
332					}
333					child_keys_count += 1;
334				}
335			},
336			None => {
337				if trusted_state()?.storage(&key.0).unwrap_or_default().is_none() {
338					debug!("Storage value unexpectedly empty: {key:?}");
339				}
340				keys_count += 1;
341			},
342		}
343	}
344	debug!(
345		"Trie cache populated with {keys_count} keys and {child_keys_count} child keys in {} s",
346		start_time.elapsed().as_secs_f32()
347	);
348
349	Ok(())
350}
351
352/// Creates a [`NativeElseWasmExecutor`](sc_executor::NativeElseWasmExecutor) according to
353/// [`Configuration`].
354#[deprecated(note = "Please switch to `new_wasm_executor`. Will be removed at end of 2024.")]
355#[allow(deprecated)]
356pub fn new_native_or_wasm_executor<D: NativeExecutionDispatch>(
357	config: &Configuration,
358) -> sc_executor::NativeElseWasmExecutor<D> {
359	#[allow(deprecated)]
360	sc_executor::NativeElseWasmExecutor::new_with_wasm_executor(new_wasm_executor(&config.executor))
361}
362
363/// Creates a [`WasmExecutor`] according to [`ExecutorConfiguration`].
364pub fn new_wasm_executor<H: HostFunctions>(config: &ExecutorConfiguration) -> WasmExecutor<H> {
365	let strategy = config
366		.default_heap_pages
367		.map_or(DEFAULT_HEAP_ALLOC_STRATEGY, |p| HeapAllocStrategy::Static { extra_pages: p as _ });
368	WasmExecutor::<H>::builder()
369		.with_execution_method(config.wasm_method)
370		.with_onchain_heap_alloc_strategy(strategy)
371		.with_offchain_heap_alloc_strategy(strategy)
372		.with_max_runtime_instances(config.max_runtime_instances)
373		.with_runtime_cache_size(config.runtime_cache_size)
374		.build()
375}
376
377/// Create an instance of default DB-backend backend.
378pub fn new_db_backend<Block>(
379	settings: DatabaseSettings,
380) -> Result<Arc<Backend<Block>>, sp_blockchain::Error>
381where
382	Block: BlockT,
383{
384	const CANONICALIZATION_DELAY: u64 = 4096;
385
386	Ok(Arc::new(Backend::new(settings, CANONICALIZATION_DELAY)?))
387}
388
389/// Create an instance of client backed by given backend.
390pub fn new_client<E, Block, RA, G>(
391	backend: Arc<Backend<Block>>,
392	executor: E,
393	genesis_block_builder: G,
394	fork_blocks: ForkBlocks<Block>,
395	bad_blocks: BadBlocks<Block>,
396	execution_extensions: ExecutionExtensions<Block>,
397	spawn_handle: Box<dyn SpawnNamed>,
398	prometheus_registry: Option<Registry>,
399	telemetry: Option<TelemetryHandle>,
400	config: ClientConfig<Block>,
401) -> Result<
402	Client<
403		Backend<Block>,
404		crate::client::LocalCallExecutor<Block, Backend<Block>, E>,
405		Block,
406		RA,
407	>,
408	sp_blockchain::Error,
409>
410where
411	Block: BlockT,
412	E: CodeExecutor + RuntimeVersionOf,
413	G: BuildGenesisBlock<
414		Block,
415		BlockImportOperation = <Backend<Block> as sc_client_api::backend::Backend<Block>>::BlockImportOperation
416	>,
417{
418	let executor = crate::client::LocalCallExecutor::new(
419		backend.clone(),
420		executor,
421		config.clone(),
422		execution_extensions,
423	)?;
424
425	Client::new(
426		backend,
427		executor,
428		spawn_handle,
429		genesis_block_builder,
430		fork_blocks,
431		bad_blocks,
432		prometheus_registry,
433		telemetry,
434		config,
435	)
436}
437
438/// Parameters to pass into `build`.
439pub struct SpawnTasksParams<'a, TBl: BlockT, TCl, TExPool, TRpc, Backend> {
440	/// The service configuration.
441	pub config: Configuration,
442	/// A shared client returned by `new_full_parts`.
443	pub client: Arc<TCl>,
444	/// A shared backend returned by `new_full_parts`.
445	pub backend: Arc<Backend>,
446	/// A task manager returned by `new_full_parts`.
447	pub task_manager: &'a mut TaskManager,
448	/// A shared keystore returned by `new_full_parts`.
449	pub keystore: KeystorePtr,
450	/// A shared transaction pool.
451	pub transaction_pool: Arc<TExPool>,
452	/// Builds additional [`RpcModule`]s that should be added to the server
453	pub rpc_builder: Box<dyn Fn(SubscriptionTaskExecutor) -> Result<RpcModule<TRpc>, Error>>,
454	/// A shared network instance.
455	pub network: Arc<dyn sc_network::service::traits::NetworkService>,
456	/// A Sender for RPC requests.
457	pub system_rpc_tx: TracingUnboundedSender<sc_rpc::system::Request<TBl>>,
458	/// Controller for transactions handlers
459	pub tx_handler_controller:
460		sc_network_transactions::TransactionsHandlerController<<TBl as BlockT>::Hash>,
461	/// Syncing service.
462	pub sync_service: Arc<SyncingService<TBl>>,
463	/// Telemetry instance for this node.
464	pub telemetry: Option<&'a mut Telemetry>,
465	/// Optional [`TracingExecuteBlock`] handle.
466	///
467	/// Will be used by the `trace_block` RPC to execute the actual block.
468	pub tracing_execute_block: Option<Arc<dyn TracingExecuteBlock<TBl>>>,
469}
470
471/// Spawn the tasks that are required to run a node.
472pub fn spawn_tasks<TBl, TBackend, TExPool, TRpc, TCl>(
473	SpawnTasksParams {
474		mut config,
475		task_manager,
476		client,
477		backend,
478		keystore,
479		transaction_pool,
480		rpc_builder,
481		network,
482		system_rpc_tx,
483		tx_handler_controller,
484		sync_service,
485		telemetry,
486		tracing_execute_block: execute_block,
487	}: SpawnTasksParams<TBl, TCl, TExPool, TRpc, TBackend>,
488) -> Result<RpcHandlers, Error>
489where
490	TCl: ProvideRuntimeApi<TBl>
491		+ HeaderMetadata<TBl, Error = sp_blockchain::Error>
492		+ Chain<TBl>
493		+ BlockBackend<TBl>
494		+ BlockIdTo<TBl, Error = sp_blockchain::Error>
495		+ ProofProvider<TBl>
496		+ HeaderBackend<TBl>
497		+ BlockchainEvents<TBl>
498		+ ExecutorProvider<TBl>
499		+ UsageProvider<TBl>
500		+ StorageProvider<TBl, TBackend>
501		+ CallApiAt<TBl>
502		+ Send
503		+ 'static,
504	<TCl as ProvideRuntimeApi<TBl>>::Api: sp_api::Metadata<TBl>
505		+ sp_transaction_pool::runtime_api::TaggedTransactionQueue<TBl>
506		+ sp_session::SessionKeys<TBl>
507		+ sp_api::ApiExt<TBl>,
508	TBl: BlockT,
509	TBl::Hash: Unpin,
510	TBl::Header: Unpin,
511	TBackend: 'static + sc_client_api::backend::Backend<TBl> + Send,
512	TExPool: MaintainedTransactionPool<Block = TBl, Hash = <TBl as BlockT>::Hash> + 'static,
513{
514	let chain_info = client.usage_info().chain;
515
516	sp_session::generate_initial_session_keys(
517		client.clone(),
518		chain_info.best_hash,
519		config.dev_key_seed.clone().map(|s| vec![s]).unwrap_or_default(),
520		keystore.clone(),
521	)
522	.map_err(|e| Error::Application(Box::new(e)))?;
523
524	let sysinfo = sc_sysinfo::gather_sysinfo();
525	sc_sysinfo::print_sysinfo(&sysinfo);
526
527	let telemetry = telemetry
528		.map(|telemetry| {
529			init_telemetry(
530				config.network.node_name.clone(),
531				config.impl_name.clone(),
532				config.impl_version.clone(),
533				config.chain_spec.name().to_string(),
534				config.role.is_authority(),
535				network.clone(),
536				client.clone(),
537				telemetry,
538				Some(sysinfo),
539			)
540		})
541		.transpose()?;
542
543	info!("📦 Highest known block at #{}", chain_info.best_number);
544
545	let spawn_handle = task_manager.spawn_handle();
546
547	// Inform the tx pool about imported and finalized blocks.
548	spawn_handle.spawn(
549		"txpool-notifications",
550		Some("transaction-pool"),
551		sc_transaction_pool::notification_future(client.clone(), transaction_pool.clone()),
552	);
553
554	spawn_handle.spawn(
555		"on-transaction-imported",
556		Some("transaction-pool"),
557		propagate_transaction_notifications(
558			transaction_pool.clone(),
559			tx_handler_controller,
560			telemetry.clone(),
561		),
562	);
563
564	// Prometheus metrics.
565	let metrics_service =
566		if let Some(PrometheusConfig { port, registry }) = config.prometheus_config.clone() {
567			// Set static metrics.
568			let metrics = MetricsService::with_prometheus(
569				telemetry,
570				&registry,
571				config.role,
572				&config.network.node_name,
573				&config.impl_version,
574			)?;
575			spawn_handle.spawn(
576				"prometheus-endpoint",
577				None,
578				prometheus_endpoint::init_prometheus(port, registry).map(drop),
579			);
580
581			metrics
582		} else {
583			MetricsService::new(telemetry)
584		};
585
586	// Periodically updated metrics and telemetry updates.
587	spawn_handle.spawn(
588		"telemetry-periodic-send",
589		None,
590		metrics_service.run(
591			client.clone(),
592			transaction_pool.clone(),
593			network.clone(),
594			sync_service.clone(),
595		),
596	);
597
598	let rpc_id_provider = config.rpc.id_provider.take();
599
600	// jsonrpsee RPC
601	// RPC-V2 specific metrics need to be registered before the RPC server is started,
602	// since we might have two instances running (one for the in-memory RPC and one for the network
603	// RPC).
604	let rpc_v2_metrics = config
605		.prometheus_registry()
606		.map(|registry| sc_rpc_spec_v2::transaction::TransactionMetrics::new(registry))
607		.transpose()?;
608
609	let gen_rpc_module = || {
610		gen_rpc_module(GenRpcModuleParams {
611			spawn_handle: task_manager.spawn_handle(),
612			client: client.clone(),
613			transaction_pool: transaction_pool.clone(),
614			keystore: keystore.clone(),
615			system_rpc_tx: system_rpc_tx.clone(),
616			impl_name: config.impl_name.clone(),
617			impl_version: config.impl_version.clone(),
618			chain_spec: config.chain_spec.as_ref(),
619			state_pruning: &config.state_pruning,
620			blocks_pruning: config.blocks_pruning,
621			backend: backend.clone(),
622			rpc_builder: &*rpc_builder,
623			metrics: rpc_v2_metrics.clone(),
624			tracing_execute_block: execute_block.clone(),
625		})
626	};
627
628	let rpc_server_handle = start_rpc_servers(
629		&config.rpc,
630		config.prometheus_registry(),
631		&config.tokio_handle,
632		gen_rpc_module,
633		rpc_id_provider,
634	)?;
635
636	let listen_addrs = rpc_server_handle
637		.listen_addrs()
638		.into_iter()
639		.map(|socket_addr| {
640			let mut multiaddr: Multiaddr = socket_addr.ip().into();
641			multiaddr.push(Protocol::Tcp(socket_addr.port()));
642			multiaddr
643		})
644		.collect();
645
646	let in_memory_rpc = {
647		let mut module = gen_rpc_module()?;
648		module.extensions_mut().insert(DenyUnsafe::No);
649		module
650	};
651
652	let in_memory_rpc_handle = RpcHandlers::new(Arc::new(in_memory_rpc), listen_addrs);
653
654	// Spawn informant task
655	spawn_handle.spawn(
656		"informant",
657		None,
658		sc_informant::build(client.clone(), network, sync_service.clone()),
659	);
660
661	task_manager.keep_alive((config.base_path, rpc_server_handle));
662
663	Ok(in_memory_rpc_handle)
664}
665
666/// Returns a future that forwards imported transactions to the transaction networking protocol.
667pub async fn propagate_transaction_notifications<Block, ExPool>(
668	transaction_pool: Arc<ExPool>,
669	tx_handler_controller: sc_network_transactions::TransactionsHandlerController<
670		<Block as BlockT>::Hash,
671	>,
672	telemetry: Option<TelemetryHandle>,
673) where
674	Block: BlockT,
675	ExPool: MaintainedTransactionPool<Block = Block, Hash = <Block as BlockT>::Hash>,
676{
677	const TELEMETRY_INTERVAL: Duration = Duration::from_secs(1);
678
679	// transaction notifications
680	let mut notifications = transaction_pool.import_notification_stream().fuse();
681	let mut timer = futures_timer::Delay::new(TELEMETRY_INTERVAL).fuse();
682	let mut tx_imported = false;
683
684	loop {
685		select! {
686			notification = notifications.next() => {
687				let Some(hash) = notification else { return };
688
689				tx_handler_controller.propagate_transaction(hash);
690
691				tx_imported = true;
692			},
693			_ = timer => {
694				timer = futures_timer::Delay::new(TELEMETRY_INTERVAL).fuse();
695
696				if !tx_imported {
697					continue;
698				}
699
700				tx_imported = false;
701				let status = transaction_pool.status();
702
703				telemetry!(
704					telemetry;
705					SUBSTRATE_INFO;
706					"txpool.import";
707					"ready" => status.ready,
708					"future" => status.future,
709				);
710			}
711		}
712	}
713}
714
715/// Initialize telemetry with provided configuration and return telemetry handle
716pub fn init_telemetry<Block, Client, Network>(
717	name: String,
718	implementation: String,
719	version: String,
720	chain: String,
721	authority: bool,
722	network: Network,
723	client: Arc<Client>,
724	telemetry: &mut Telemetry,
725	sysinfo: Option<sc_telemetry::SysInfo>,
726) -> sc_telemetry::Result<TelemetryHandle>
727where
728	Block: BlockT,
729	Client: BlockBackend<Block>,
730	Network: NetworkStateInfo,
731{
732	let genesis_hash = client.block_hash(Zero::zero()).ok().flatten().unwrap_or_default();
733	let connection_message = ConnectionMessage {
734		name,
735		implementation,
736		version,
737		target_os: sc_sysinfo::TARGET_OS.into(),
738		target_arch: sc_sysinfo::TARGET_ARCH.into(),
739		target_env: sc_sysinfo::TARGET_ENV.into(),
740		config: String::new(),
741		chain,
742		genesis_hash: format!("{:?}", genesis_hash),
743		authority,
744		startup_time: SystemTime::UNIX_EPOCH
745			.elapsed()
746			.map(|dur| dur.as_millis())
747			.unwrap_or(0)
748			.to_string(),
749		network_id: network.local_peer_id().to_base58(),
750		sysinfo,
751	};
752
753	telemetry.start_telemetry(connection_message)?;
754
755	Ok(telemetry.handle())
756}
757
758/// Parameters for [`gen_rpc_module`].
759pub struct GenRpcModuleParams<'a, TBl: BlockT, TBackend, TCl, TRpc, TExPool> {
760	/// The handle to spawn tasks.
761	pub spawn_handle: SpawnTaskHandle,
762	/// Access to the client.
763	pub client: Arc<TCl>,
764	/// The transaction pool.
765	pub transaction_pool: Arc<TExPool>,
766	/// Keystore handle.
767	pub keystore: KeystorePtr,
768	/// Sender for system requests.
769	pub system_rpc_tx: TracingUnboundedSender<sc_rpc::system::Request<TBl>>,
770	/// Implementation name of this node.
771	pub impl_name: String,
772	/// Implementation version of this node.
773	pub impl_version: String,
774	/// The chain spec.
775	pub chain_spec: &'a dyn ChainSpec,
776	/// Enabled pruning mode for this node.
777	pub state_pruning: &'a Option<PruningMode>,
778	/// Enabled blocks pruning mode.
779	pub blocks_pruning: BlocksPruning,
780	/// Backend of the node.
781	pub backend: Arc<TBackend>,
782	/// RPC builder.
783	pub rpc_builder: &'a dyn Fn(SubscriptionTaskExecutor) -> Result<RpcModule<TRpc>, Error>,
784	/// Transaction metrics handle.
785	pub metrics: Option<sc_rpc_spec_v2::transaction::TransactionMetrics>,
786	/// Optional [`TracingExecuteBlock`] handle.
787	///
788	/// Will be used by the `trace_block` RPC to execute the actual block.
789	pub tracing_execute_block: Option<Arc<dyn TracingExecuteBlock<TBl>>>,
790}
791
792/// Generate RPC module using provided configuration
793pub fn gen_rpc_module<TBl, TBackend, TCl, TRpc, TExPool>(
794	GenRpcModuleParams {
795		spawn_handle,
796		client,
797		transaction_pool,
798		keystore,
799		system_rpc_tx,
800		impl_name,
801		impl_version,
802		chain_spec,
803		state_pruning,
804		blocks_pruning,
805		backend,
806		rpc_builder,
807		metrics,
808		tracing_execute_block: execute_block,
809	}: GenRpcModuleParams<TBl, TBackend, TCl, TRpc, TExPool>,
810) -> Result<RpcModule<()>, Error>
811where
812	TBl: BlockT,
813	TCl: ProvideRuntimeApi<TBl>
814		+ BlockchainEvents<TBl>
815		+ HeaderBackend<TBl>
816		+ HeaderMetadata<TBl, Error = sp_blockchain::Error>
817		+ ExecutorProvider<TBl>
818		+ CallApiAt<TBl>
819		+ ProofProvider<TBl>
820		+ StorageProvider<TBl, TBackend>
821		+ BlockBackend<TBl>
822		+ Send
823		+ Sync
824		+ 'static,
825	TBackend: sc_client_api::backend::Backend<TBl> + 'static,
826	<TCl as ProvideRuntimeApi<TBl>>::Api: sp_session::SessionKeys<TBl> + sp_api::Metadata<TBl>,
827	TExPool: MaintainedTransactionPool<Block = TBl, Hash = <TBl as BlockT>::Hash> + 'static,
828	TBl::Hash: Unpin,
829	TBl::Header: Unpin,
830{
831	let system_info = sc_rpc::system::SystemInfo {
832		chain_name: chain_spec.name().into(),
833		impl_name,
834		impl_version,
835		properties: chain_spec.properties(),
836		chain_type: chain_spec.chain_type(),
837	};
838
839	let mut rpc_api = RpcModule::new(());
840	let task_executor = Arc::new(spawn_handle);
841
842	let (chain, state, child_state) = {
843		let chain = sc_rpc::chain::new_full(client.clone(), task_executor.clone()).into_rpc();
844		let (state, child_state) =
845			sc_rpc::state::new_full(client.clone(), task_executor.clone(), execute_block);
846		let state = state.into_rpc();
847		let child_state = child_state.into_rpc();
848
849		(chain, state, child_state)
850	};
851
852	const MAX_TRANSACTION_PER_CONNECTION: usize = 16;
853
854	let transaction_broadcast_rpc_v2 = sc_rpc_spec_v2::transaction::TransactionBroadcast::new(
855		client.clone(),
856		transaction_pool.clone(),
857		task_executor.clone(),
858		MAX_TRANSACTION_PER_CONNECTION,
859	)
860	.into_rpc();
861
862	let transaction_v2 = sc_rpc_spec_v2::transaction::Transaction::new(
863		client.clone(),
864		transaction_pool.clone(),
865		task_executor.clone(),
866		metrics,
867	)
868	.into_rpc();
869
870	let chain_head_v2 = sc_rpc_spec_v2::chain_head::ChainHead::new(
871		client.clone(),
872		backend.clone(),
873		task_executor.clone(),
874		// Defaults to sensible limits for the `ChainHead`.
875		sc_rpc_spec_v2::chain_head::ChainHeadConfig::default(),
876	)
877	.into_rpc();
878
879	// Part of the RPC v2 spec.
880	// An archive node that can respond to the `archive` RPC-v2 queries is a node with:
881	// - state pruning in archive mode: The storage of blocks is kept around
882	// - block pruning in archive mode: The block's body is kept around
883	let is_archive_node = state_pruning.as_ref().map(|sp| sp.is_archive()).unwrap_or(false) &&
884		blocks_pruning.is_archive();
885	let genesis_hash = client.hash(Zero::zero()).ok().flatten().expect("Genesis block exists; qed");
886	if is_archive_node {
887		let archive_v2 = sc_rpc_spec_v2::archive::Archive::new(
888			client.clone(),
889			backend.clone(),
890			genesis_hash,
891			task_executor.clone(),
892		)
893		.into_rpc();
894		rpc_api.merge(archive_v2).map_err(|e| Error::Application(e.into()))?;
895	}
896
897	// ChainSpec RPC-v2.
898	let chain_spec_v2 = sc_rpc_spec_v2::chain_spec::ChainSpec::new(
899		chain_spec.name().into(),
900		genesis_hash,
901		chain_spec.properties(),
902	)
903	.into_rpc();
904
905	let author = sc_rpc::author::Author::new(
906		client.clone(),
907		transaction_pool,
908		keystore,
909		task_executor.clone(),
910	)
911	.into_rpc();
912
913	let system = sc_rpc::system::System::new(system_info, system_rpc_tx).into_rpc();
914
915	if let Some(storage) = backend.offchain_storage() {
916		let offchain = sc_rpc::offchain::Offchain::new(storage).into_rpc();
917
918		rpc_api.merge(offchain).map_err(|e| Error::Application(e.into()))?;
919	}
920
921	// Part of the RPC v2 spec.
922	rpc_api.merge(transaction_v2).map_err(|e| Error::Application(e.into()))?;
923	rpc_api
924		.merge(transaction_broadcast_rpc_v2)
925		.map_err(|e| Error::Application(e.into()))?;
926	rpc_api.merge(chain_head_v2).map_err(|e| Error::Application(e.into()))?;
927	rpc_api.merge(chain_spec_v2).map_err(|e| Error::Application(e.into()))?;
928
929	// Part of the old RPC spec.
930	rpc_api.merge(chain).map_err(|e| Error::Application(e.into()))?;
931	rpc_api.merge(author).map_err(|e| Error::Application(e.into()))?;
932	rpc_api.merge(system).map_err(|e| Error::Application(e.into()))?;
933	rpc_api.merge(state).map_err(|e| Error::Application(e.into()))?;
934	rpc_api.merge(child_state).map_err(|e| Error::Application(e.into()))?;
935	// Additional [`RpcModule`]s defined in the node to fit the specific blockchain
936	let extra_rpcs = rpc_builder(task_executor.clone())?;
937	rpc_api.merge(extra_rpcs).map_err(|e| Error::Application(e.into()))?;
938
939	Ok(rpc_api)
940}
941
942/// Parameters to pass into [`build_network`].
943pub struct BuildNetworkParams<'a, Block, Net, TxPool, IQ, Client>
944where
945	Block: BlockT,
946	Net: NetworkBackend<Block, <Block as BlockT>::Hash>,
947{
948	/// The service configuration.
949	pub config: &'a Configuration,
950	/// Full network configuration.
951	pub net_config: FullNetworkConfiguration<Block, <Block as BlockT>::Hash, Net>,
952	/// A shared client returned by `new_full_parts`.
953	pub client: Arc<Client>,
954	/// A shared transaction pool.
955	pub transaction_pool: Arc<TxPool>,
956	/// A handle for spawning tasks.
957	pub spawn_handle: SpawnTaskHandle,
958	/// An import queue.
959	pub import_queue: IQ,
960	/// A block announce validator builder.
961	pub block_announce_validator_builder: Option<
962		Box<dyn FnOnce(Arc<Client>) -> Box<dyn BlockAnnounceValidator<Block> + Send> + Send>,
963	>,
964	/// Optional warp sync config.
965	pub warp_sync_config: Option<WarpSyncConfig<Block>>,
966	/// User specified block relay params. If not specified, the default
967	/// block request handler will be used.
968	pub block_relay: Option<BlockRelayParams<Block, Net>>,
969	/// Metrics.
970	pub metrics: NotificationMetrics,
971}
972
973/// Build the network service, the network status sinks and an RPC sender.
974pub fn build_network<Block, Net, TxPool, IQ, Client>(
975	params: BuildNetworkParams<Block, Net, TxPool, IQ, Client>,
976) -> Result<
977	(
978		Arc<dyn sc_network::service::traits::NetworkService>,
979		TracingUnboundedSender<sc_rpc::system::Request<Block>>,
980		sc_network_transactions::TransactionsHandlerController<<Block as BlockT>::Hash>,
981		Arc<SyncingService<Block>>,
982	),
983	Error,
984>
985where
986	Block: BlockT,
987	Client: ProvideRuntimeApi<Block>
988		+ HeaderMetadata<Block, Error = sp_blockchain::Error>
989		+ Chain<Block>
990		+ BlockBackend<Block>
991		+ BlockIdTo<Block, Error = sp_blockchain::Error>
992		+ ProofProvider<Block>
993		+ HeaderBackend<Block>
994		+ BlockchainEvents<Block>
995		+ 'static,
996	TxPool: TransactionPool<Block = Block, Hash = <Block as BlockT>::Hash> + 'static,
997	IQ: ImportQueue<Block> + 'static,
998	Net: NetworkBackend<Block, <Block as BlockT>::Hash>,
999{
1000	let BuildNetworkParams {
1001		config,
1002		mut net_config,
1003		client,
1004		transaction_pool,
1005		spawn_handle,
1006		import_queue,
1007		block_announce_validator_builder,
1008		warp_sync_config,
1009		block_relay,
1010		metrics,
1011	} = params;
1012
1013	let block_announce_validator = if let Some(f) = block_announce_validator_builder {
1014		f(client.clone())
1015	} else {
1016		Box::new(DefaultBlockAnnounceValidator)
1017	};
1018
1019	let network_service_provider = NetworkServiceProvider::new();
1020	let protocol_id = config.protocol_id();
1021	let fork_id = config.chain_spec.fork_id();
1022	let metrics_registry = config.prometheus_config.as_ref().map(|config| &config.registry);
1023
1024	let block_downloader = match block_relay {
1025		Some(params) => {
1026			let BlockRelayParams { mut server, downloader, request_response_config } = params;
1027
1028			net_config.add_request_response_protocol(request_response_config);
1029
1030			spawn_handle.spawn("block-request-handler", Some("networking"), async move {
1031				server.run().await;
1032			});
1033
1034			downloader
1035		},
1036		None => build_default_block_downloader(
1037			&protocol_id,
1038			fork_id,
1039			&mut net_config,
1040			network_service_provider.handle(),
1041			Arc::clone(&client),
1042			config.network.default_peers_set.in_peers as usize +
1043				config.network.default_peers_set.out_peers as usize,
1044			&spawn_handle,
1045		),
1046	};
1047
1048	let syncing_strategy = build_polkadot_syncing_strategy(
1049		protocol_id.clone(),
1050		fork_id,
1051		&mut net_config,
1052		warp_sync_config,
1053		block_downloader,
1054		client.clone(),
1055		&spawn_handle,
1056		metrics_registry,
1057	)?;
1058
1059	let (syncing_engine, sync_service, block_announce_config) = SyncingEngine::new(
1060		Roles::from(&config.role),
1061		Arc::clone(&client),
1062		metrics_registry,
1063		metrics.clone(),
1064		&net_config,
1065		protocol_id.clone(),
1066		fork_id,
1067		block_announce_validator,
1068		syncing_strategy,
1069		network_service_provider.handle(),
1070		import_queue.service(),
1071		net_config.peer_store_handle(),
1072	)?;
1073
1074	spawn_handle.spawn_blocking("syncing", None, syncing_engine.run());
1075
1076	build_network_advanced(BuildNetworkAdvancedParams {
1077		role: config.role,
1078		protocol_id,
1079		fork_id,
1080		ipfs_server: config.network.ipfs_server,
1081		announce_block: config.announce_block,
1082		net_config,
1083		client,
1084		transaction_pool,
1085		spawn_handle,
1086		import_queue,
1087		sync_service,
1088		block_announce_config,
1089		network_service_provider,
1090		metrics_registry,
1091		metrics,
1092	})
1093}
1094
1095/// Parameters to pass into [`build_network_advanced`].
1096pub struct BuildNetworkAdvancedParams<'a, Block, Net, TxPool, IQ, Client>
1097where
1098	Block: BlockT,
1099	Net: NetworkBackend<Block, <Block as BlockT>::Hash>,
1100{
1101	/// Role of the local node.
1102	pub role: Role,
1103	/// Protocol name prefix.
1104	pub protocol_id: ProtocolId,
1105	/// Fork ID.
1106	pub fork_id: Option<&'a str>,
1107	/// Enable serving block data over IPFS bitswap.
1108	pub ipfs_server: bool,
1109	/// Announce block automatically after they have been imported.
1110	pub announce_block: bool,
1111	/// Full network configuration.
1112	pub net_config: FullNetworkConfiguration<Block, <Block as BlockT>::Hash, Net>,
1113	/// A shared client returned by `new_full_parts`.
1114	pub client: Arc<Client>,
1115	/// A shared transaction pool.
1116	pub transaction_pool: Arc<TxPool>,
1117	/// A handle for spawning tasks.
1118	pub spawn_handle: SpawnTaskHandle,
1119	/// An import queue.
1120	pub import_queue: IQ,
1121	/// Syncing service to communicate with syncing engine.
1122	pub sync_service: SyncingService<Block>,
1123	/// Block announce config.
1124	pub block_announce_config: Net::NotificationProtocolConfig,
1125	/// Network service provider to drive with network internally.
1126	pub network_service_provider: NetworkServiceProvider,
1127	/// Prometheus metrics registry.
1128	pub metrics_registry: Option<&'a Registry>,
1129	/// Metrics.
1130	pub metrics: NotificationMetrics,
1131}
1132
1133/// Build the network service, the network status sinks and an RPC sender, this is a lower-level
1134/// version of [`build_network`] for those needing more control.
1135pub fn build_network_advanced<Block, Net, TxPool, IQ, Client>(
1136	params: BuildNetworkAdvancedParams<Block, Net, TxPool, IQ, Client>,
1137) -> Result<
1138	(
1139		Arc<dyn sc_network::service::traits::NetworkService>,
1140		TracingUnboundedSender<sc_rpc::system::Request<Block>>,
1141		sc_network_transactions::TransactionsHandlerController<<Block as BlockT>::Hash>,
1142		Arc<SyncingService<Block>>,
1143	),
1144	Error,
1145>
1146where
1147	Block: BlockT,
1148	Client: ProvideRuntimeApi<Block>
1149		+ HeaderMetadata<Block, Error = sp_blockchain::Error>
1150		+ Chain<Block>
1151		+ BlockBackend<Block>
1152		+ BlockIdTo<Block, Error = sp_blockchain::Error>
1153		+ ProofProvider<Block>
1154		+ HeaderBackend<Block>
1155		+ BlockchainEvents<Block>
1156		+ 'static,
1157	TxPool: TransactionPool<Block = Block, Hash = <Block as BlockT>::Hash> + 'static,
1158	IQ: ImportQueue<Block> + 'static,
1159	Net: NetworkBackend<Block, <Block as BlockT>::Hash>,
1160{
1161	let BuildNetworkAdvancedParams {
1162		role,
1163		protocol_id,
1164		fork_id,
1165		ipfs_server,
1166		announce_block,
1167		mut net_config,
1168		client,
1169		transaction_pool,
1170		spawn_handle,
1171		import_queue,
1172		sync_service,
1173		block_announce_config,
1174		network_service_provider,
1175		metrics_registry,
1176		metrics,
1177	} = params;
1178
1179	let genesis_hash = client.info().genesis_hash;
1180
1181	let light_client_request_protocol_config = {
1182		// Allow both outgoing and incoming requests.
1183		let (handler, protocol_config) =
1184			LightClientRequestHandler::new::<Net>(&protocol_id, fork_id, client.clone());
1185		spawn_handle.spawn("light-client-request-handler", Some("networking"), handler.run());
1186		protocol_config
1187	};
1188
1189	// install request handlers to `FullNetworkConfiguration`
1190	net_config.add_request_response_protocol(light_client_request_protocol_config);
1191
1192	let bitswap_config = ipfs_server.then(|| {
1193		let (handler, config) = Net::bitswap_server(client.clone());
1194		spawn_handle.spawn("bitswap-request-handler", Some("networking"), handler);
1195
1196		config
1197	});
1198
1199	// Create transactions protocol and add it to the list of supported protocols of
1200	let (transactions_handler_proto, transactions_config) =
1201		sc_network_transactions::TransactionsHandlerPrototype::new::<_, Block, Net>(
1202			protocol_id.clone(),
1203			genesis_hash,
1204			fork_id,
1205			metrics.clone(),
1206			net_config.peer_store_handle(),
1207		);
1208	net_config.add_notification_protocol(transactions_config);
1209
1210	// Start task for `PeerStore`
1211	let peer_store = net_config.take_peer_store();
1212	spawn_handle.spawn("peer-store", Some("networking"), peer_store.run());
1213
1214	let sync_service = Arc::new(sync_service);
1215
1216	let network_params = sc_network::config::Params::<Block, <Block as BlockT>::Hash, Net> {
1217		role,
1218		executor: {
1219			let spawn_handle = Clone::clone(&spawn_handle);
1220			Box::new(move |fut| {
1221				spawn_handle.spawn("libp2p-node", Some("networking"), fut);
1222			})
1223		},
1224		network_config: net_config,
1225		genesis_hash,
1226		protocol_id,
1227		fork_id: fork_id.map(ToOwned::to_owned),
1228		metrics_registry: metrics_registry.cloned(),
1229		block_announce_config,
1230		bitswap_config,
1231		notification_metrics: metrics,
1232	};
1233
1234	let has_bootnodes = !network_params.network_config.network_config.boot_nodes.is_empty();
1235	let network_mut = Net::new(network_params)?;
1236	let network = network_mut.network_service().clone();
1237
1238	let (tx_handler, tx_handler_controller) = transactions_handler_proto.build(
1239		network.clone(),
1240		sync_service.clone(),
1241		Arc::new(TransactionPoolAdapter { pool: transaction_pool, client: client.clone() }),
1242		metrics_registry,
1243	)?;
1244	spawn_handle.spawn_blocking(
1245		"network-transactions-handler",
1246		Some("networking"),
1247		tx_handler.run(),
1248	);
1249
1250	spawn_handle.spawn_blocking(
1251		"chain-sync-network-service-provider",
1252		Some("networking"),
1253		network_service_provider.run(Arc::new(network.clone())),
1254	);
1255	spawn_handle.spawn("import-queue", None, {
1256		let sync_service = sync_service.clone();
1257
1258		async move { import_queue.run(sync_service.as_ref()).await }
1259	});
1260
1261	let (system_rpc_tx, system_rpc_rx) = tracing_unbounded("mpsc_system_rpc", 10_000);
1262	spawn_handle.spawn(
1263		"system-rpc-handler",
1264		Some("networking"),
1265		build_system_rpc_future::<_, _, <Block as BlockT>::Hash>(
1266			role,
1267			network_mut.network_service(),
1268			sync_service.clone(),
1269			client.clone(),
1270			system_rpc_rx,
1271			has_bootnodes,
1272		),
1273	);
1274
1275	let future = build_network_future::<_, _, <Block as BlockT>::Hash, _>(
1276		network_mut,
1277		client,
1278		sync_service.clone(),
1279		announce_block,
1280	);
1281
1282	// The network worker is responsible for gathering all network messages and processing
1283	// them. This is quite a heavy task, and at the time of the writing of this comment it
1284	// frequently happens that this future takes several seconds or in some situations
1285	// even more than a minute until it has processed its entire queue. This is clearly an
1286	// issue, and ideally we would like to fix the network future to take as little time as
1287	// possible, but we also take the extra harm-prevention measure to execute the networking
1288	// future using `spawn_blocking`.
1289	spawn_handle.spawn_blocking("network-worker", Some("networking"), future);
1290
1291	Ok((network, system_rpc_tx, tx_handler_controller, sync_service.clone()))
1292}
1293
1294/// Configuration for [`build_default_syncing_engine`].
1295pub struct DefaultSyncingEngineConfig<'a, Block, Client, Net>
1296where
1297	Block: BlockT,
1298	Net: NetworkBackend<Block, <Block as BlockT>::Hash>,
1299{
1300	/// Role of the local node.
1301	pub role: Role,
1302	/// Protocol name prefix.
1303	pub protocol_id: ProtocolId,
1304	/// Fork ID.
1305	pub fork_id: Option<&'a str>,
1306	/// Full network configuration.
1307	pub net_config: &'a mut FullNetworkConfiguration<Block, <Block as BlockT>::Hash, Net>,
1308	/// Validator for incoming block announcements.
1309	pub block_announce_validator: Box<dyn BlockAnnounceValidator<Block> + Send>,
1310	/// Handle to communicate with `NetworkService`.
1311	pub network_service_handle: NetworkServiceHandle,
1312	/// Warp sync configuration (when used).
1313	pub warp_sync_config: Option<WarpSyncConfig<Block>>,
1314	/// A shared client returned by `new_full_parts`.
1315	pub client: Arc<Client>,
1316	/// Blocks import queue API.
1317	pub import_queue_service: Box<dyn ImportQueueService<Block>>,
1318	/// Expected max total number of peer connections (in + out).
1319	pub num_peers_hint: usize,
1320	/// A handle for spawning tasks.
1321	pub spawn_handle: &'a SpawnTaskHandle,
1322	/// Prometheus metrics registry.
1323	pub metrics_registry: Option<&'a Registry>,
1324	/// Metrics.
1325	pub metrics: NotificationMetrics,
1326}
1327
1328/// Build default syncing engine using [`build_default_block_downloader`] and
1329/// [`build_polkadot_syncing_strategy`] internally.
1330pub fn build_default_syncing_engine<Block, Client, Net>(
1331	config: DefaultSyncingEngineConfig<Block, Client, Net>,
1332) -> Result<(SyncingService<Block>, Net::NotificationProtocolConfig), Error>
1333where
1334	Block: BlockT,
1335	Client: HeaderBackend<Block>
1336		+ BlockBackend<Block>
1337		+ HeaderMetadata<Block, Error = sp_blockchain::Error>
1338		+ ProofProvider<Block>
1339		+ Send
1340		+ Sync
1341		+ 'static,
1342	Net: NetworkBackend<Block, <Block as BlockT>::Hash>,
1343{
1344	let DefaultSyncingEngineConfig {
1345		role,
1346		protocol_id,
1347		fork_id,
1348		net_config,
1349		block_announce_validator,
1350		network_service_handle,
1351		warp_sync_config,
1352		client,
1353		import_queue_service,
1354		num_peers_hint,
1355		spawn_handle,
1356		metrics_registry,
1357		metrics,
1358	} = config;
1359
1360	let block_downloader = build_default_block_downloader(
1361		&protocol_id,
1362		fork_id,
1363		net_config,
1364		network_service_handle.clone(),
1365		client.clone(),
1366		num_peers_hint,
1367		spawn_handle,
1368	);
1369	let syncing_strategy = build_polkadot_syncing_strategy(
1370		protocol_id.clone(),
1371		fork_id,
1372		net_config,
1373		warp_sync_config,
1374		block_downloader,
1375		client.clone(),
1376		spawn_handle,
1377		metrics_registry,
1378	)?;
1379
1380	let (syncing_engine, sync_service, block_announce_config) = SyncingEngine::new(
1381		Roles::from(&role),
1382		client,
1383		metrics_registry,
1384		metrics,
1385		&net_config,
1386		protocol_id,
1387		fork_id,
1388		block_announce_validator,
1389		syncing_strategy,
1390		network_service_handle,
1391		import_queue_service,
1392		net_config.peer_store_handle(),
1393	)?;
1394
1395	spawn_handle.spawn_blocking("syncing", None, syncing_engine.run());
1396
1397	Ok((sync_service, block_announce_config))
1398}
1399
1400/// Build default block downloader
1401pub fn build_default_block_downloader<Block, Client, Net>(
1402	protocol_id: &ProtocolId,
1403	fork_id: Option<&str>,
1404	net_config: &mut FullNetworkConfiguration<Block, <Block as BlockT>::Hash, Net>,
1405	network_service_handle: NetworkServiceHandle,
1406	client: Arc<Client>,
1407	num_peers_hint: usize,
1408	spawn_handle: &SpawnTaskHandle,
1409) -> Arc<dyn BlockDownloader<Block>>
1410where
1411	Block: BlockT,
1412	Client: HeaderBackend<Block> + BlockBackend<Block> + Send + Sync + 'static,
1413	Net: NetworkBackend<Block, <Block as BlockT>::Hash>,
1414{
1415	// Custom protocol was not specified, use the default block handler.
1416	// Allow both outgoing and incoming requests.
1417	let BlockRelayParams { mut server, downloader, request_response_config } =
1418		BlockRequestHandler::new::<Net>(
1419			network_service_handle,
1420			&protocol_id,
1421			fork_id,
1422			client.clone(),
1423			num_peers_hint,
1424		);
1425
1426	spawn_handle.spawn("block-request-handler", Some("networking"), async move {
1427		server.run().await;
1428	});
1429
1430	net_config.add_request_response_protocol(request_response_config);
1431
1432	downloader
1433}
1434
1435/// Build standard polkadot syncing strategy
1436pub fn build_polkadot_syncing_strategy<Block, Client, Net>(
1437	protocol_id: ProtocolId,
1438	fork_id: Option<&str>,
1439	net_config: &mut FullNetworkConfiguration<Block, <Block as BlockT>::Hash, Net>,
1440	warp_sync_config: Option<WarpSyncConfig<Block>>,
1441	block_downloader: Arc<dyn BlockDownloader<Block>>,
1442	client: Arc<Client>,
1443	spawn_handle: &SpawnTaskHandle,
1444	metrics_registry: Option<&Registry>,
1445) -> Result<Box<dyn SyncingStrategy<Block>>, Error>
1446where
1447	Block: BlockT,
1448	Client: HeaderBackend<Block>
1449		+ BlockBackend<Block>
1450		+ HeaderMetadata<Block, Error = sp_blockchain::Error>
1451		+ ProofProvider<Block>
1452		+ Send
1453		+ Sync
1454		+ 'static,
1455	Net: NetworkBackend<Block, <Block as BlockT>::Hash>,
1456{
1457	if warp_sync_config.is_none() && net_config.network_config.sync_mode.is_warp() {
1458		return Err("Warp sync enabled, but no warp sync provider configured.".into())
1459	}
1460
1461	if client.requires_full_sync() {
1462		match net_config.network_config.sync_mode {
1463			SyncMode::LightState { .. } =>
1464				return Err("Fast sync doesn't work for archive nodes".into()),
1465			SyncMode::Warp => return Err("Warp sync doesn't work for archive nodes".into()),
1466			SyncMode::Full => {},
1467		}
1468	}
1469
1470	let genesis_hash = client.info().genesis_hash;
1471
1472	let (state_request_protocol_config, state_request_protocol_name) = {
1473		let num_peer_hint = net_config.network_config.default_peers_set_num_full as usize +
1474			net_config.network_config.default_peers_set.reserved_nodes.len();
1475		// Allow both outgoing and incoming requests.
1476		let (handler, protocol_config) =
1477			StateRequestHandler::new::<Net>(&protocol_id, fork_id, client.clone(), num_peer_hint);
1478		let config_name = protocol_config.protocol_name().clone();
1479
1480		spawn_handle.spawn("state-request-handler", Some("networking"), handler.run());
1481		(protocol_config, config_name)
1482	};
1483	net_config.add_request_response_protocol(state_request_protocol_config);
1484
1485	let (warp_sync_protocol_config, warp_sync_protocol_name) = match warp_sync_config.as_ref() {
1486		Some(WarpSyncConfig::WithProvider(warp_with_provider)) => {
1487			// Allow both outgoing and incoming requests.
1488			let (handler, protocol_config) = WarpSyncRequestHandler::new::<_, Net>(
1489				protocol_id,
1490				genesis_hash,
1491				fork_id,
1492				warp_with_provider.clone(),
1493			);
1494			let config_name = protocol_config.protocol_name().clone();
1495
1496			spawn_handle.spawn("warp-sync-request-handler", Some("networking"), handler.run());
1497			(Some(protocol_config), Some(config_name))
1498		},
1499		_ => (None, None),
1500	};
1501	if let Some(config) = warp_sync_protocol_config {
1502		net_config.add_request_response_protocol(config);
1503	}
1504
1505	let syncing_config = PolkadotSyncingStrategyConfig {
1506		mode: net_config.network_config.sync_mode,
1507		max_parallel_downloads: net_config.network_config.max_parallel_downloads,
1508		max_blocks_per_request: net_config.network_config.max_blocks_per_request,
1509		min_peers_to_start_warp_sync: net_config.network_config.min_peers_to_start_warp_sync,
1510		metrics_registry: metrics_registry.cloned(),
1511		state_request_protocol_name,
1512		block_downloader,
1513	};
1514	Ok(Box::new(PolkadotSyncingStrategy::new(
1515		syncing_config,
1516		client,
1517		warp_sync_config,
1518		warp_sync_protocol_name,
1519	)?))
1520}