// soil_service/builder.rs
1// This file is part of Soil.
2
3// Copyright (C) Soil contributors.
4// Copyright (C) Parity Technologies (UK) Ltd.
5// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
6
7use crate::{
8	build_network_future, build_system_rpc_future,
9	client::{Client, ClientConfig},
10	config::{Configuration, ExecutorConfiguration, KeystoreConfig, Multiaddr, PrometheusConfig},
11	error::Error,
12	metrics::MetricsService,
13	start_rpc_servers, BuildGenesisBlock, GenesisBlockBuilder, RpcHandlers,
14	SpawnEssentialTaskHandle, SpawnTaskHandle, TaskManager, TransactionPoolAdapter,
15};
16use futures::{select, FutureExt, StreamExt};
17use jsonrpsee::RpcModule;
18use log::{debug, error, info};
19use soil_prometheus::Registry;
20use soil_chain_spec::{get_extension, ChainSpec};
21use soil_client::blockchain::{HeaderBackend, HeaderMetadata};
22use soil_client::client_api::{
23	execution_extensions::ExecutionExtensions, proof_provider::ProofProvider, BadBlocks,
24	BlockBackend, BlockchainEvents, ExecutorProvider, ForkBlocks, KeysIter, StorageProvider,
25	TrieCacheContext, UsageProvider,
26};
27use soil_client::consensus::block_validation::{
28	BlockAnnounceValidator, Chain, DefaultBlockAnnounceValidator,
29};
30use soil_client::db::{Backend, BlocksPruning, DatabaseSettings, PruningMode};
31use soil_client::executor::{
32	wasm_interface::HostFunctions, HeapAllocStrategy, NativeExecutionDispatch, RuntimeVersionOf,
33	WasmExecutor, DEFAULT_HEAP_ALLOC_STRATEGY,
34};
35use soil_client::import::{ImportQueue, ImportQueueService};
36use soil_client::keystore::LocalKeystore;
37use soil_client::tracing::block::TracingExecuteBlock;
38use soil_client::transaction_pool::{MaintainedTransactionPool, TransactionPool};
39use soil_client::utils::mpsc::{tracing_unbounded, TracingUnboundedSender};
40use soil_network::common::role::{Role, Roles};
41use soil_network::light::light_client_requests::handler::LightClientRequestHandler;
42use soil_network::sync::{
43	block_relay_protocol::{BlockDownloader, BlockRelayParams},
44	block_request_handler::BlockRequestHandler,
45	engine::SyncingEngine,
46	service::network::{NetworkServiceHandle, NetworkServiceProvider},
47	state_request_handler::StateRequestHandler,
48	strategy::{
49		polkadot::{PolkadotSyncingStrategy, PolkadotSyncingStrategyConfig},
50		SyncingStrategy,
51	},
52	warp_request_handler::RequestHandler as WarpSyncRequestHandler,
53	SyncingService, WarpSyncConfig,
54};
55use soil_network::{
56	config::{FullNetworkConfiguration, ProtocolId, SyncMode},
57	multiaddr::Protocol,
58	service::{
59		traits::{PeerStore, RequestResponseConfig},
60		NotificationMetrics,
61	},
62	NetworkBackend, NetworkStateInfo,
63};
64use soil_rpc::v2::{
65	archive::ArchiveApiServer,
66	chain_head::ChainHeadApiServer,
67	chain_spec::ChainSpecApiServer,
68	transaction::{TransactionApiServer, TransactionBroadcastApiServer},
69};
70use soil_rpc::{
71	author::AuthorApiServer,
72	chain::ChainApiServer,
73	offchain::OffchainApiServer,
74	state::{ChildStateApiServer, StateApiServer},
75	system::SystemApiServer,
76	DenyUnsafe, SubscriptionTaskExecutor,
77};
78use soil_telemetry::{telemetry, ConnectionMessage, Telemetry, TelemetryHandle, SUBSTRATE_INFO};
79use std::{
80	str::FromStr,
81	sync::Arc,
82	time::{Duration, SystemTime},
83};
84use subsoil::api::{CallApiAt, ProvideRuntimeApi};
85use subsoil::core::traits::{CodeExecutor, SpawnNamed};
86use subsoil::keystore::KeystorePtr;
87use subsoil::runtime::traits::{Block as BlockT, BlockIdTo, NumberFor, Zero};
88use subsoil::storage::{ChildInfo, ChildType, PrefixedStorageKey};
89
/// Full client type.
///
/// A [`Client`] parameterized over the full backend and the local call executor.
pub type TFullClient<TBl, TRtApi, TExec> =
	Client<TFullBackend<TBl>, TFullCallExecutor<TBl, TExec>, TBl, TRtApi>;

/// Full client backend type.
pub type TFullBackend<TBl> = Backend<TBl>;

/// Full client call executor type.
pub type TFullCallExecutor<TBl, TExec> = crate::client::LocalCallExecutor<TBl, Backend<TBl>, TExec>;

/// Bundle returned by `new_full_parts` and friends:
/// (client, backend, keystore container, task manager).
type TFullParts<TBl, TRtApi, TExec> =
	(TFullClient<TBl, TRtApi, TExec>, Arc<TFullBackend<TBl>>, KeystoreContainer, TaskManager);
102
/// Construct a local keystore shareable container.
///
/// Wraps an [`Arc<LocalKeystore>`] so the same keystore can be handed out both as a
/// dynamic `Keystore` trait object and as the concrete local keystore.
pub struct KeystoreContainer(Arc<LocalKeystore>);

impl KeystoreContainer {
	/// Construct KeystoreContainer.
	///
	/// Opens an on-disk keystore for [`KeystoreConfig::Path`] (using the optional
	/// password) or creates a purely in-memory one for [`KeystoreConfig::InMemory`].
	pub fn new(config: &KeystoreConfig) -> Result<Self, Error> {
		let keystore = Arc::new(match config {
			KeystoreConfig::Path { path, password } => {
				LocalKeystore::open(path.clone(), password.clone())?
			},
			KeystoreConfig::InMemory => LocalKeystore::in_memory(),
		});

		Ok(Self(keystore))
	}

	/// Returns a shared reference to a dynamic `Keystore` trait implementation.
	pub fn keystore(&self) -> KeystorePtr {
		self.0.clone()
	}

	/// Returns a shared reference to the local keystore.
	pub fn local_keystore(&self) -> Arc<LocalKeystore> {
		self.0.clone()
	}
}
129
130/// Creates a new full client for the given config.
131pub fn new_full_client<TBl, TRtApi, TExec>(
132	config: &Configuration,
133	telemetry: Option<TelemetryHandle>,
134	executor: TExec,
135	pruning_filters: Vec<Arc<dyn soil_client::db::PruningFilter>>,
136) -> Result<TFullClient<TBl, TRtApi, TExec>, Error>
137where
138	TBl: BlockT,
139	TExec: CodeExecutor + RuntimeVersionOf + Clone,
140{
141	new_full_parts(config, telemetry, executor, pruning_filters).map(|parts| parts.0)
142}
143
144/// Create the initial parts of a full node with the default genesis block builder.
145///
146/// The `pruning_filters` parameter allows configuring which blocks should be preserved
147/// during pruning.
148pub fn new_full_parts_record_import<TBl, TRtApi, TExec>(
149	config: &Configuration,
150	telemetry: Option<TelemetryHandle>,
151	executor: TExec,
152	enable_import_proof_recording: bool,
153	pruning_filters: Vec<Arc<dyn soil_client::db::PruningFilter>>,
154) -> Result<TFullParts<TBl, TRtApi, TExec>, Error>
155where
156	TBl: BlockT,
157	TExec: CodeExecutor + RuntimeVersionOf + Clone,
158{
159	let mut db_config = config.db_config();
160	db_config.pruning_filters = pruning_filters;
161	let backend = new_db_backend(db_config)?;
162
163	let genesis_block_builder = GenesisBlockBuilder::new(
164		config.chain_spec.as_storage_builder(),
165		!config.no_genesis(),
166		backend.clone(),
167		executor.clone(),
168	)?;
169
170	new_full_parts_with_genesis_builder(
171		config,
172		telemetry,
173		executor,
174		backend,
175		genesis_block_builder,
176		enable_import_proof_recording,
177	)
178}
179
180/// Create the initial parts of a full node with the default genesis block builder.
181///
182/// The `pruning_filters` parameter allows configuring which blocks should be preserved
183/// during pruning.
184pub fn new_full_parts<TBl, TRtApi, TExec>(
185	config: &Configuration,
186	telemetry: Option<TelemetryHandle>,
187	executor: TExec,
188	pruning_filters: Vec<Arc<dyn soil_client::db::PruningFilter>>,
189) -> Result<TFullParts<TBl, TRtApi, TExec>, Error>
190where
191	TBl: BlockT,
192	TExec: CodeExecutor + RuntimeVersionOf + Clone,
193{
194	new_full_parts_record_import(config, telemetry, executor, false, pruning_filters)
195}
196
/// Create the initial parts of a full node.
///
/// Unlike [`new_full_parts`], the caller supplies the backend and the genesis block
/// builder, and controls whether proofs are recorded during block import. Returns
/// the client, backend, keystore container and task manager.
pub fn new_full_parts_with_genesis_builder<TBl, TRtApi, TExec, TBuildGenesisBlock>(
	config: &Configuration,
	telemetry: Option<TelemetryHandle>,
	executor: TExec,
	backend: Arc<TFullBackend<TBl>>,
	genesis_block_builder: TBuildGenesisBlock,
	enable_import_proof_recording: bool,
) -> Result<TFullParts<TBl, TRtApi, TExec>, Error>
where
	TBl: BlockT,
	TExec: CodeExecutor + RuntimeVersionOf + Clone,
	TBuildGenesisBlock:
		BuildGenesisBlock<
			TBl,
			BlockImportOperation = <Backend<TBl> as soil_client::client_api::backend::Backend<
				TBl,
			>>::BlockImportOperation,
		>,
{
	let keystore_container = KeystoreContainer::new(&config.keystore)?;

	let task_manager = {
		let registry = config.prometheus_config.as_ref().map(|cfg| &cfg.registry);
		TaskManager::new(config.tokio_handle.clone(), registry)?
	};

	// Fork/bad block lists are optional chain-spec extensions; a missing extension
	// defaults to an empty list.
	let chain_spec = &config.chain_spec;
	let fork_blocks = get_extension::<ForkBlocks<TBl>>(chain_spec.extensions())
		.cloned()
		.unwrap_or_default();

	let bad_blocks = get_extension::<BadBlocks<TBl>>(chain_spec.extensions())
		.cloned()
		.unwrap_or_default();

	let client = {
		let extensions = ExecutionExtensions::new(None, Arc::new(executor.clone()));

		// Code substitutes are keyed by block number in the chain spec. Older specs
		// used block hashes as keys, which fail to parse here with an explanatory error.
		let wasm_runtime_substitutes = config
			.chain_spec
			.code_substitutes()
			.into_iter()
			.map(|(n, c)| {
				let number = NumberFor::<TBl>::from_str(&n).map_err(|_| {
					Error::Application(Box::from(format!(
						"Failed to parse `{}` as block number for code substitutes. \
						 In an old version the key for code substitute was a block hash. \
						 Please update the chain spec to a version that is compatible with your node.",
						n
					)))
				})?;
				Ok((number, c))
			})
			.collect::<Result<std::collections::HashMap<_, _>, Error>>()?;

		let client = new_client(
			backend.clone(),
			executor,
			genesis_block_builder,
			fork_blocks,
			bad_blocks,
			extensions,
			Box::new(task_manager.spawn_handle()),
			config.prometheus_config.as_ref().map(|config| config.registry.clone()),
			telemetry,
			ClientConfig {
				offchain_worker_enabled: config.offchain_worker.enabled,
				offchain_indexing_api: config.offchain_worker.indexing_enabled,
				wasm_runtime_overrides: config.wasm_runtime_overrides.clone(),
				no_genesis: config.no_genesis(),
				wasm_runtime_substitutes,
				enable_import_proof_recording,
			},
		)?;

		// Optionally pre-populate the trie cache from the best block's state: either
		// synchronously (the blocking strategy, used for testing) or on a background
		// blocking task where failures are only logged.
		if let Some(warm_up_strategy) = config.warm_up_trie_cache {
			let storage_root = client.usage_info().chain.best_hash;
			let backend_clone = backend.clone();

			if warm_up_strategy.is_blocking() {
				// We use the blocking strategy for testing purposes.
				// So better to error out if it fails.
				warm_up_trie_cache(backend_clone, storage_root)?;
			} else {
				task_manager.spawn_handle().spawn_blocking(
					"warm-up-trie-cache",
					None,
					async move {
						if let Err(e) = warm_up_trie_cache(backend_clone, storage_root) {
							error!("Failed to warm up trie cache: {e}");
						}
					},
				);
			}
		}

		client
	};

	Ok((client, backend, keystore_container, task_manager))
}
299
300fn child_info(key: Vec<u8>) -> Option<ChildInfo> {
301	let prefixed_key = PrefixedStorageKey::new(key);
302	ChildType::from_prefixed_key(&prefixed_key).and_then(|(child_type, storage_key)| {
303		(child_type == ChildType::ParentKeyId).then(|| ChildInfo::new_default(storage_key))
304	})
305}
306
/// Walk every key (and child-trie key) of the state at `storage_root`, reading each
/// value through the trusted state so it lands in the trie cache.
///
/// Keys are enumerated via the untrusted state while values are fetched through the
/// trusted state — the cache being warmed. Unexpectedly missing values are only
/// logged, never treated as errors. Progress is logged every 100k top-level keys.
fn warm_up_trie_cache<TBl: BlockT>(
	backend: Arc<TFullBackend<TBl>>,
	storage_root: TBl::Hash,
) -> Result<(), Error> {
	use soil_client::client_api::backend::Backend;
	use subsoil::state_machine::Backend as StateBackend;

	let untrusted_state = || backend.state_at(storage_root, TrieCacheContext::Untrusted);
	let trusted_state = || backend.state_at(storage_root, TrieCacheContext::Trusted);

	debug!("Populating trie cache started",);
	let start_time = std::time::Instant::now();
	let mut keys_count = 0;
	let mut child_keys_count = 0;
	for key in KeysIter::<_, TBl>::new(untrusted_state()?, None, None)? {
		// Periodic progress logging.
		if keys_count != 0 && keys_count % 100_000 == 0 {
			debug!("{} keys and {} child keys have been warmed", keys_count, child_keys_count);
		}
		match child_info(key.0.clone()) {
			Some(info) => {
				// The key denotes a child trie: warm every key inside it.
				for child_key in
					KeysIter::<_, TBl>::new_child(untrusted_state()?, info.clone(), None, None)?
				{
					if trusted_state()?
						.child_storage(&info, &child_key.0)
						.unwrap_or_default()
						.is_none()
					{
						debug!("Child storage value unexpectedly empty: {child_key:?}");
					}
					child_keys_count += 1;
				}
			},
			None => {
				// Plain top-level key: the read itself populates the cache.
				if trusted_state()?.storage(&key.0).unwrap_or_default().is_none() {
					debug!("Storage value unexpectedly empty: {key:?}");
				}
				keys_count += 1;
			},
		}
	}
	debug!(
		"Trie cache populated with {keys_count} keys and {child_keys_count} child keys in {} s",
		start_time.elapsed().as_secs_f32()
	);

	Ok(())
}
355
/// Creates a [`NativeElseWasmExecutor`](soil_client::executor::NativeElseWasmExecutor) according to
/// [`Configuration`].
///
/// Deprecated shim: it simply wraps a [`new_wasm_executor`]-built executor in the
/// legacy native-else-wasm type.
#[deprecated(note = "Please switch to `new_wasm_executor`. Will be removed at end of 2024.")]
#[allow(deprecated)]
pub fn new_native_or_wasm_executor<D: NativeExecutionDispatch>(
	config: &Configuration,
) -> soil_client::executor::NativeElseWasmExecutor<D> {
	#[allow(deprecated)]
	soil_client::executor::NativeElseWasmExecutor::new_with_wasm_executor(new_wasm_executor(
		&config.executor,
	))
}
368
369/// Creates a [`WasmExecutor`] according to [`ExecutorConfiguration`].
370pub fn new_wasm_executor<H: HostFunctions>(config: &ExecutorConfiguration) -> WasmExecutor<H> {
371	let strategy = config
372		.default_heap_pages
373		.map_or(DEFAULT_HEAP_ALLOC_STRATEGY, |p| HeapAllocStrategy::Static { extra_pages: p as _ });
374	WasmExecutor::<H>::builder()
375		.with_execution_method(config.wasm_method)
376		.with_onchain_heap_alloc_strategy(strategy)
377		.with_offchain_heap_alloc_strategy(strategy)
378		.with_max_runtime_instances(config.max_runtime_instances)
379		.with_runtime_cache_size(config.runtime_cache_size)
380		.build()
381}
382
383/// Create an instance of the default DB-backend.
384///
385/// Pruning filters can be configured via `settings.pruning_filters`.
386/// If any filter returns `true` for a block's justifications, the block will not be pruned.
387pub fn new_db_backend<Block>(
388	settings: DatabaseSettings,
389) -> Result<Arc<Backend<Block>>, soil_client::blockchain::Error>
390where
391	Block: BlockT,
392{
393	const CANONICALIZATION_DELAY: u64 = 4096;
394
395	Ok(Arc::new(Backend::new(settings, CANONICALIZATION_DELAY)?))
396}
397
398/// Create an instance of client backed by given backend.
399pub fn new_client<E, Block, RA, G>(
400	backend: Arc<Backend<Block>>,
401	executor: E,
402	genesis_block_builder: G,
403	fork_blocks: ForkBlocks<Block>,
404	bad_blocks: BadBlocks<Block>,
405	execution_extensions: ExecutionExtensions<Block>,
406	spawn_handle: Box<dyn SpawnNamed>,
407	prometheus_registry: Option<Registry>,
408	telemetry: Option<TelemetryHandle>,
409	config: ClientConfig<Block>,
410) -> Result<
411	Client<Backend<Block>, crate::client::LocalCallExecutor<Block, Backend<Block>, E>, Block, RA>,
412	soil_client::blockchain::Error,
413>
414where
415	Block: BlockT,
416	E: CodeExecutor + RuntimeVersionOf,
417	G: BuildGenesisBlock<
418		Block,
419		BlockImportOperation = <Backend<Block> as soil_client::client_api::backend::Backend<
420			Block,
421		>>::BlockImportOperation,
422	>,
423{
424	let executor = crate::client::LocalCallExecutor::new(
425		backend.clone(),
426		executor,
427		config.clone(),
428		execution_extensions,
429	)?;
430
431	Client::new(
432		backend,
433		executor,
434		spawn_handle,
435		genesis_block_builder,
436		fork_blocks,
437		bad_blocks,
438		prometheus_registry,
439		telemetry,
440		config,
441	)
442}
443
/// Parameters to pass into `build`.
pub struct SpawnTasksParams<'a, TBl: BlockT, TCl, TExPool, TRpc, Backend> {
	/// The service configuration.
	pub config: Configuration,
	/// A shared client returned by `new_full_parts`.
	pub client: Arc<TCl>,
	/// A shared backend returned by `new_full_parts`.
	pub backend: Arc<Backend>,
	/// A task manager returned by `new_full_parts`.
	pub task_manager: &'a mut TaskManager,
	/// A shared keystore returned by `new_full_parts`.
	pub keystore: KeystorePtr,
	/// A shared transaction pool.
	pub transaction_pool: Arc<TExPool>,
	/// Builds additional [`RpcModule`]s that should be added to the server.
	pub rpc_builder: Box<dyn Fn(SubscriptionTaskExecutor) -> Result<RpcModule<TRpc>, Error>>,
	/// A shared network instance.
	pub network: Arc<dyn soil_network::service::traits::NetworkService>,
	/// A Sender for RPC requests.
	pub system_rpc_tx: TracingUnboundedSender<soil_rpc::system::Request<TBl>>,
	/// Controller for transactions handlers.
	pub tx_handler_controller:
		soil_network::transactions::TransactionsHandlerController<<TBl as BlockT>::Hash>,
	/// Syncing service.
	pub sync_service: Arc<SyncingService<TBl>>,
	/// Telemetry instance for this node.
	pub telemetry: Option<&'a mut Telemetry>,
	/// Optional [`TracingExecuteBlock`] handle.
	///
	/// Will be used by the `trace_block` RPC to execute the actual block.
	pub tracing_execute_block: Option<Arc<dyn TracingExecuteBlock<TBl>>>,
}
476
/// Spawn the tasks that are required to run a node.
///
/// Wires together the client, transaction pool, network, metrics and telemetry,
/// starts the RPC servers, and returns an in-memory [`RpcHandlers`] instance that
/// can service RPC calls locally without going through the network server.
pub fn spawn_tasks<TBl, TBackend, TExPool, TRpc, TCl>(
	SpawnTasksParams {
		mut config,
		task_manager,
		client,
		backend,
		keystore,
		transaction_pool,
		rpc_builder,
		network,
		system_rpc_tx,
		tx_handler_controller,
		sync_service,
		telemetry,
		tracing_execute_block: execute_block,
	}: SpawnTasksParams<TBl, TCl, TExPool, TRpc, TBackend>,
) -> Result<RpcHandlers, Error>
where
	TCl: ProvideRuntimeApi<TBl>
		+ HeaderMetadata<TBl, Error = soil_client::blockchain::Error>
		+ Chain<TBl>
		+ BlockBackend<TBl>
		+ BlockIdTo<TBl, Error = soil_client::blockchain::Error>
		+ ProofProvider<TBl>
		+ HeaderBackend<TBl>
		+ BlockchainEvents<TBl>
		+ ExecutorProvider<TBl>
		+ UsageProvider<TBl>
		+ StorageProvider<TBl, TBackend>
		+ CallApiAt<TBl>
		+ Send
		+ 'static,
	<TCl as ProvideRuntimeApi<TBl>>::Api:
		subsoil::api::Metadata<TBl>
			+ subsoil::txpool::runtime_api::TaggedTransactionQueue<TBl>
			+ subsoil::session::SessionKeys<TBl>
			+ subsoil::api::ApiExt<TBl>,
	TBl: BlockT,
	TBl::Hash: Unpin,
	TBl::Header: Unpin,
	TBackend: 'static + soil_client::client_api::backend::Backend<TBl> + Send,
	TExPool: MaintainedTransactionPool<Block = TBl, Hash = <TBl as BlockT>::Hash> + 'static,
{
	let chain_info = client.usage_info().chain;

	// Insert the development session keys (if a dev seed is configured) into the
	// keystore before any other task starts.
	subsoil::session::generate_initial_session_keys(
		client.clone(),
		chain_info.best_hash,
		config.dev_key_seed.clone().map(|s| vec![s]).unwrap_or_default(),
		keystore.clone(),
	)
	.map_err(|e| Error::Application(Box::new(e)))?;

	let sysinfo = crate::sysinfo::gather_sysinfo();
	crate::sysinfo::print_sysinfo(&sysinfo);

	// Send the telemetry connection handshake, if telemetry is enabled.
	let telemetry = telemetry
		.map(|telemetry| {
			init_telemetry(
				config.network.node_name.clone(),
				config.impl_name.clone(),
				config.impl_version.clone(),
				config.chain_spec.name().to_string(),
				config.role.is_authority(),
				network.clone(),
				client.clone(),
				telemetry,
				Some(sysinfo),
			)
		})
		.transpose()?;

	info!("📦 Highest known block at #{}", chain_info.best_number);

	let spawn_handle = task_manager.spawn_handle();

	// Inform the tx pool about imported and finalized blocks.
	spawn_handle.spawn(
		"txpool-notifications",
		Some("transaction-pool"),
		soil_txpool::notification_future(client.clone(), transaction_pool.clone()),
	);

	// Forward newly imported transactions to the transaction networking protocol.
	spawn_handle.spawn(
		"on-transaction-imported",
		Some("transaction-pool"),
		propagate_transaction_notifications(
			transaction_pool.clone(),
			tx_handler_controller,
			telemetry.clone(),
		),
	);

	// Prometheus metrics.
	let metrics_service =
		if let Some(PrometheusConfig { port, registry }) = config.prometheus_config.clone() {
			// Set static metrics.
			let metrics = MetricsService::with_prometheus(
				telemetry,
				&registry,
				config.role,
				&config.network.node_name,
				&config.impl_version,
			)?;
			spawn_handle.spawn(
				"soil-prometheus",
				None,
				soil_prometheus::init_prometheus(port, registry).map(drop),
			);

			metrics
		} else {
			MetricsService::new(telemetry)
		};

	// Periodically updated metrics and telemetry updates.
	spawn_handle.spawn(
		"telemetry-periodic-send",
		None,
		metrics_service.run(
			client.clone(),
			transaction_pool.clone(),
			network.clone(),
			sync_service.clone(),
		),
	);

	// The id provider can only be consumed once; `take` moves it out of the config.
	let rpc_id_provider = config.rpc.id_provider.take();

	// jsonrpsee RPC
	// RPC-V2 specific metrics need to be registered before the RPC server is started,
	// since we might have two instances running (one for the in-memory RPC and one for the network
	// RPC).
	let rpc_v2_metrics = config
		.prometheus_registry()
		.map(|registry| soil_rpc::v2::transaction::TransactionMetrics::new(registry))
		.transpose()?;

	// Closure so the same module definition can back both the network-facing
	// server and the in-memory RPC handle below.
	let gen_rpc_module = || {
		gen_rpc_module(GenRpcModuleParams {
			spawn_handle: task_manager.spawn_handle(),
			client: client.clone(),
			transaction_pool: transaction_pool.clone(),
			keystore: keystore.clone(),
			system_rpc_tx: system_rpc_tx.clone(),
			impl_name: config.impl_name.clone(),
			impl_version: config.impl_version.clone(),
			chain_spec: config.chain_spec.as_ref(),
			state_pruning: &config.state_pruning,
			blocks_pruning: config.blocks_pruning,
			backend: backend.clone(),
			rpc_builder: &*rpc_builder,
			metrics: rpc_v2_metrics.clone(),
			tracing_execute_block: execute_block.clone(),
		})
	};

	let rpc_server_handle = start_rpc_servers(
		&config.rpc,
		config.prometheus_registry(),
		&config.tokio_handle,
		gen_rpc_module,
		rpc_id_provider,
	)?;

	// Convert the socket addresses the server bound to into multiaddresses.
	let listen_addrs = rpc_server_handle
		.listen_addrs()
		.into_iter()
		.map(|socket_addr| {
			let mut multiaddr: Multiaddr = socket_addr.ip().into();
			multiaddr.push(Protocol::Tcp(socket_addr.port()));
			multiaddr
		})
		.collect();

	// The in-memory RPC is trusted, so unsafe methods are allowed on it.
	let in_memory_rpc = {
		let mut module = gen_rpc_module()?;
		module.extensions_mut().insert(DenyUnsafe::No);
		module
	};

	let in_memory_rpc_handle = RpcHandlers::new(Arc::new(in_memory_rpc), listen_addrs);

	// Spawn informant task
	spawn_handle.spawn(
		"informant",
		None,
		crate::informant::build(client.clone(), network, sync_service.clone()),
	);

	// Keep the base path and RPC server alive for the lifetime of the node.
	task_manager.keep_alive((config.base_path, rpc_server_handle));

	Ok(in_memory_rpc_handle)
}
672
/// Returns a future that forwards imported transactions to the transaction networking protocol.
///
/// Also reports transaction-pool status to telemetry at most once per second, and
/// only in intervals during which at least one transaction was imported. The future
/// completes when the pool's import notification stream ends.
pub async fn propagate_transaction_notifications<Block, ExPool>(
	transaction_pool: Arc<ExPool>,
	tx_handler_controller: soil_network::transactions::TransactionsHandlerController<
		<Block as BlockT>::Hash,
	>,
	telemetry: Option<TelemetryHandle>,
) where
	Block: BlockT,
	ExPool: MaintainedTransactionPool<Block = Block, Hash = <Block as BlockT>::Hash>,
{
	// Minimum interval between telemetry status reports.
	const TELEMETRY_INTERVAL: Duration = Duration::from_secs(1);

	// transaction notifications
	let mut notifications = transaction_pool.import_notification_stream().fuse();
	let mut timer = futures_timer::Delay::new(TELEMETRY_INTERVAL).fuse();
	// Tracks whether any transaction was imported since the last telemetry report.
	let mut tx_imported = false;

	loop {
		select! {
			notification = notifications.next() => {
				// Stream ended: the pool is gone, so this task is done.
				let Some(hash) = notification else { return };

				tx_handler_controller.propagate_transaction(hash);

				tx_imported = true;
			},
			_ = timer => {
				// Re-arm the timer for the next interval.
				timer = futures_timer::Delay::new(TELEMETRY_INTERVAL).fuse();

				if !tx_imported {
					continue;
				}

				tx_imported = false;
				let status = transaction_pool.status();

				telemetry!(
					telemetry;
					SUBSTRATE_INFO;
					"txpool.import";
					"ready" => status.ready,
					"future" => status.future,
				);
			}
		}
	}
}
721
722/// Initialize telemetry with provided configuration and return telemetry handle
723pub fn init_telemetry<Block, Client, Network>(
724	name: String,
725	implementation: String,
726	version: String,
727	chain: String,
728	authority: bool,
729	network: Network,
730	client: Arc<Client>,
731	telemetry: &mut Telemetry,
732	sysinfo: Option<soil_telemetry::SysInfo>,
733) -> soil_telemetry::Result<TelemetryHandle>
734where
735	Block: BlockT,
736	Client: BlockBackend<Block>,
737	Network: NetworkStateInfo,
738{
739	let genesis_hash = client.block_hash(Zero::zero()).ok().flatten().unwrap_or_default();
740	let connection_message = ConnectionMessage {
741		name,
742		implementation,
743		version,
744		target_os: crate::sysinfo::TARGET_OS.into(),
745		target_arch: crate::sysinfo::TARGET_ARCH.into(),
746		target_env: crate::sysinfo::TARGET_ENV.into(),
747		config: String::new(),
748		chain,
749		genesis_hash: format!("{:?}", genesis_hash),
750		authority,
751		startup_time: SystemTime::UNIX_EPOCH
752			.elapsed()
753			.map(|dur| dur.as_millis())
754			.unwrap_or(0)
755			.to_string(),
756		network_id: network.local_peer_id().to_base58(),
757		sysinfo,
758	};
759
760	telemetry.start_telemetry(connection_message)?;
761
762	Ok(telemetry.handle())
763}
764
/// Parameters for [`gen_rpc_module`].
pub struct GenRpcModuleParams<'a, TBl: BlockT, TBackend, TCl, TRpc, TExPool> {
	/// The handle to spawn tasks.
	pub spawn_handle: SpawnTaskHandle,
	/// Access to the client.
	pub client: Arc<TCl>,
	/// The transaction pool.
	pub transaction_pool: Arc<TExPool>,
	/// Keystore handle.
	pub keystore: KeystorePtr,
	/// Sender for system requests.
	pub system_rpc_tx: TracingUnboundedSender<soil_rpc::system::Request<TBl>>,
	/// Implementation name of this node.
	pub impl_name: String,
	/// Implementation version of this node.
	pub impl_version: String,
	/// The chain spec.
	pub chain_spec: &'a dyn ChainSpec,
	/// Enabled state pruning mode for this node; `None` means unspecified.
	pub state_pruning: &'a Option<PruningMode>,
	/// Enabled blocks pruning mode.
	pub blocks_pruning: BlocksPruning,
	/// Backend of the node.
	pub backend: Arc<TBackend>,
	/// RPC builder for node-specific additional [`RpcModule`]s.
	pub rpc_builder: &'a dyn Fn(SubscriptionTaskExecutor) -> Result<RpcModule<TRpc>, Error>,
	/// Transaction metrics handle.
	pub metrics: Option<soil_rpc::v2::transaction::TransactionMetrics>,
	/// Optional [`TracingExecuteBlock`] handle.
	///
	/// Will be used by the `trace_block` RPC to execute the actual block.
	pub tracing_execute_block: Option<Arc<dyn TracingExecuteBlock<TBl>>>,
}
798
/// Generate RPC module using provided configuration.
///
/// Assembles the legacy RPC APIs (chain, state, author, system, offchain) and the
/// RPC-v2 APIs (transaction, chainHead, chainSpec, and — on archive nodes —
/// archive) into a single [`RpcModule`], then merges in any node-specific modules
/// from `rpc_builder`.
pub fn gen_rpc_module<TBl, TBackend, TCl, TRpc, TExPool>(
	GenRpcModuleParams {
		spawn_handle,
		client,
		transaction_pool,
		keystore,
		system_rpc_tx,
		impl_name,
		impl_version,
		chain_spec,
		state_pruning,
		blocks_pruning,
		backend,
		rpc_builder,
		metrics,
		tracing_execute_block: execute_block,
	}: GenRpcModuleParams<TBl, TBackend, TCl, TRpc, TExPool>,
) -> Result<RpcModule<()>, Error>
where
	TBl: BlockT,
	TCl: ProvideRuntimeApi<TBl>
		+ BlockchainEvents<TBl>
		+ HeaderBackend<TBl>
		+ HeaderMetadata<TBl, Error = soil_client::blockchain::Error>
		+ ExecutorProvider<TBl>
		+ CallApiAt<TBl>
		+ ProofProvider<TBl>
		+ StorageProvider<TBl, TBackend>
		+ BlockBackend<TBl>
		+ Send
		+ Sync
		+ 'static,
	TBackend: soil_client::client_api::backend::Backend<TBl> + 'static,
	<TCl as ProvideRuntimeApi<TBl>>::Api:
		subsoil::session::SessionKeys<TBl> + subsoil::api::Metadata<TBl>,
	TExPool: MaintainedTransactionPool<Block = TBl, Hash = <TBl as BlockT>::Hash> + 'static,
	TBl::Hash: Unpin,
	TBl::Header: Unpin,
{
	let system_info = soil_rpc::system::SystemInfo {
		chain_name: chain_spec.name().into(),
		impl_name,
		impl_version,
		properties: chain_spec.properties(),
		chain_type: chain_spec.chain_type(),
	};

	let mut rpc_api = RpcModule::new(());
	let task_executor = Arc::new(spawn_handle);

	// Legacy chain/state/childState APIs.
	let (chain, state, child_state) = {
		let chain = soil_rpc::chain::new_full(client.clone(), task_executor.clone()).into_rpc();
		let (state, child_state) =
			soil_rpc::state::new_full(client.clone(), task_executor.clone(), execute_block);
		let state = state.into_rpc();
		let child_state = child_state.into_rpc();

		(chain, state, child_state)
	};

	// Per-connection cap on concurrently broadcast transactions (RPC-v2).
	const MAX_TRANSACTION_PER_CONNECTION: usize = 16;

	let transaction_broadcast_rpc_v2 = soil_rpc::v2::transaction::TransactionBroadcast::new(
		client.clone(),
		transaction_pool.clone(),
		task_executor.clone(),
		MAX_TRANSACTION_PER_CONNECTION,
	)
	.into_rpc();

	let transaction_v2 = soil_rpc::v2::transaction::Transaction::new(
		client.clone(),
		transaction_pool.clone(),
		task_executor.clone(),
		metrics,
	)
	.into_rpc();

	let chain_head_v2 = soil_rpc::v2::chain_head::ChainHead::new(
		client.clone(),
		backend.clone(),
		task_executor.clone(),
		// Defaults to sensible limits for the `ChainHead`.
		soil_rpc::v2::chain_head::ChainHeadConfig::default(),
	)
	.into_rpc();

	// Part of the RPC v2 spec.
	// An archive node that can respond to the `archive` RPC-v2 queries is a node with:
	// - state pruning in archive mode: The storage of blocks is kept around
	// - block pruning in archive mode: The block's body is kept around
	let is_archive_node = state_pruning.as_ref().map(|sp| sp.is_archive()).unwrap_or(false)
		&& blocks_pruning.is_archive();
	let genesis_hash = client.hash(Zero::zero()).ok().flatten().expect("Genesis block exists; qed");
	if is_archive_node {
		let archive_v2 = soil_rpc::v2::archive::Archive::new(
			client.clone(),
			backend.clone(),
			genesis_hash,
			task_executor.clone(),
		)
		.into_rpc();
		rpc_api.merge(archive_v2).map_err(|e| Error::Application(e.into()))?;
	}

	// ChainSpec RPC-v2.
	let chain_spec_v2 = soil_rpc::v2::chain_spec::ChainSpec::new(
		chain_spec.name().into(),
		genesis_hash,
		chain_spec.properties(),
	)
	.into_rpc();

	let author = soil_rpc::author::Author::new(
		client.clone(),
		transaction_pool,
		keystore,
		task_executor.clone(),
	)
	.into_rpc();

	let system = soil_rpc::system::System::new(system_info, system_rpc_tx).into_rpc();

	// The offchain API is only exposed when the backend provides offchain storage.
	if let Some(storage) = backend.offchain_storage() {
		let offchain = soil_rpc::offchain::Offchain::new(storage).into_rpc();

		rpc_api.merge(offchain).map_err(|e| Error::Application(e.into()))?;
	}

	// Part of the RPC v2 spec.
	rpc_api.merge(transaction_v2).map_err(|e| Error::Application(e.into()))?;
	rpc_api
		.merge(transaction_broadcast_rpc_v2)
		.map_err(|e| Error::Application(e.into()))?;
	rpc_api.merge(chain_head_v2).map_err(|e| Error::Application(e.into()))?;
	rpc_api.merge(chain_spec_v2).map_err(|e| Error::Application(e.into()))?;

	// Part of the old RPC spec.
	rpc_api.merge(chain).map_err(|e| Error::Application(e.into()))?;
	rpc_api.merge(author).map_err(|e| Error::Application(e.into()))?;
	rpc_api.merge(system).map_err(|e| Error::Application(e.into()))?;
	rpc_api.merge(state).map_err(|e| Error::Application(e.into()))?;
	rpc_api.merge(child_state).map_err(|e| Error::Application(e.into()))?;
	// Additional [`RpcModule`]s defined in the node to fit the specific blockchain
	let extra_rpcs = rpc_builder(task_executor.clone())?;
	rpc_api.merge(extra_rpcs).map_err(|e| Error::Application(e.into()))?;

	Ok(rpc_api)
}
949
/// Parameters to pass into [`build_network`].
pub struct BuildNetworkParams<'a, Block, Net, TxPool, IQ, Client>
where
	Block: BlockT,
	Net: NetworkBackend<Block, <Block as BlockT>::Hash>,
{
	/// The service configuration.
	pub config: &'a Configuration,
	/// Full network configuration.
	pub net_config: FullNetworkConfiguration<Block, <Block as BlockT>::Hash, Net>,
	/// A shared client returned by `new_full_parts`.
	pub client: Arc<Client>,
	/// A shared transaction pool.
	pub transaction_pool: Arc<TxPool>,
	/// A handle for spawning tasks.
	pub spawn_handle: SpawnTaskHandle,
	/// A handle for spawning essential tasks.
	pub spawn_essential_handle: SpawnEssentialTaskHandle,
	/// An import queue.
	pub import_queue: IQ,
	/// A block announce validator builder. `None` uses the default validator.
	pub block_announce_validator_builder: Option<
		Box<dyn FnOnce(Arc<Client>) -> Box<dyn BlockAnnounceValidator<Block> + Send> + Send>,
	>,
	/// Optional warp sync config.
	pub warp_sync_config: Option<WarpSyncConfig<Block>>,
	/// User specified block relay params. If not specified, the default
	/// block request handler will be used.
	pub block_relay: Option<BlockRelayParams<Block, Net>>,
	/// Metrics.
	pub metrics: NotificationMetrics,
}
982
/// Build the network service, the network status sinks and an RPC sender.
///
/// Wires together the default block downloader, the Polkadot syncing strategy and the
/// [`SyncingEngine`], spawns the syncing task, and then hands the remaining network
/// setup over to [`build_network_advanced`].
pub fn build_network<Block, Net, TxPool, IQ, Client>(
	params: BuildNetworkParams<Block, Net, TxPool, IQ, Client>,
) -> Result<
	(
		Arc<dyn soil_network::service::traits::NetworkService>,
		TracingUnboundedSender<soil_rpc::system::Request<Block>>,
		soil_network::transactions::TransactionsHandlerController<<Block as BlockT>::Hash>,
		Arc<SyncingService<Block>>,
	),
	Error,
>
where
	Block: BlockT,
	Client: ProvideRuntimeApi<Block>
		+ HeaderMetadata<Block, Error = soil_client::blockchain::Error>
		+ Chain<Block>
		+ BlockBackend<Block>
		+ BlockIdTo<Block, Error = soil_client::blockchain::Error>
		+ ProofProvider<Block>
		+ HeaderBackend<Block>
		+ BlockchainEvents<Block>
		+ 'static,
	TxPool: TransactionPool<Block = Block, Hash = <Block as BlockT>::Hash> + 'static,
	IQ: ImportQueue<Block> + 'static,
	Net: NetworkBackend<Block, <Block as BlockT>::Hash>,
{
	let BuildNetworkParams {
		config,
		mut net_config,
		client,
		transaction_pool,
		spawn_handle,
		spawn_essential_handle,
		import_queue,
		block_announce_validator_builder,
		warp_sync_config,
		block_relay,
		metrics,
	} = params;

	// Use the node-provided block announce validator when one was supplied, otherwise
	// fall back to the default validator.
	let block_announce_validator = if let Some(f) = block_announce_validator_builder {
		f(client.clone())
	} else {
		Box::new(DefaultBlockAnnounceValidator)
	};

	let network_service_provider = NetworkServiceProvider::new();
	let protocol_id = config.protocol_id();
	let fork_id = config.chain_spec.fork_id();
	let metrics_registry = config.prometheus_config.as_ref().map(|config| &config.registry);

	// Use the caller-supplied block relay (spawning its server half) when given,
	// otherwise build and spawn the default block request handler.
	let block_downloader = match block_relay {
		Some(params) => {
			let BlockRelayParams { mut server, downloader, request_response_config } = params;

			net_config.add_request_response_protocol(request_response_config);

			spawn_handle.spawn("block-request-handler", Some("networking"), async move {
				server.run().await;
			});

			downloader
		},
		None => build_default_block_downloader(
			&protocol_id,
			fork_id,
			&mut net_config,
			network_service_provider.handle(),
			Arc::clone(&client),
			// Expected maximum number of peer connections (inbound + outbound).
			config.network.default_peers_set.in_peers as usize
				+ config.network.default_peers_set.out_peers as usize,
			&spawn_handle,
		),
	};

	let syncing_strategy = build_polkadot_syncing_strategy(
		protocol_id.clone(),
		fork_id,
		&mut net_config,
		warp_sync_config,
		block_downloader,
		client.clone(),
		&spawn_handle,
		metrics_registry,
		config.blocks_pruning.is_archive(),
	)?;

	let (syncing_engine, sync_service, block_announce_config) = SyncingEngine::new(
		Roles::from(&config.role),
		Arc::clone(&client),
		metrics_registry,
		metrics.clone(),
		&net_config,
		protocol_id.clone(),
		fork_id,
		block_announce_validator,
		syncing_strategy,
		network_service_provider.handle(),
		import_queue.service(),
		net_config.peer_store_handle(),
	)?;

	// The syncing engine runs for the node's whole lifetime; spawned via
	// `spawn_blocking` so it gets its own thread.
	spawn_handle.spawn_blocking("syncing", None, syncing_engine.run());

	// The remainder of the network setup (transactions protocol, system RPC, network
	// worker) is shared with the lower-level entry point.
	build_network_advanced(BuildNetworkAdvancedParams {
		role: config.role,
		protocol_id,
		fork_id,
		ipfs_server: config.network.ipfs_server,
		announce_block: config.announce_block,
		net_config,
		client,
		transaction_pool,
		spawn_handle,
		spawn_essential_handle,
		import_queue,
		sync_service,
		block_announce_config,
		network_service_provider,
		metrics_registry,
		metrics,
	})
}
1107
/// Parameters to pass into [`build_network_advanced`].
pub struct BuildNetworkAdvancedParams<'a, Block, Net, TxPool, IQ, Client>
where
	Block: BlockT,
	Net: NetworkBackend<Block, <Block as BlockT>::Hash>,
{
	/// Role of the local node.
	pub role: Role,
	/// Protocol name prefix.
	pub protocol_id: ProtocolId,
	/// Fork ID.
	pub fork_id: Option<&'a str>,
	/// Enable serving block data over IPFS bitswap.
	pub ipfs_server: bool,
	/// Announce blocks automatically after they have been imported.
	pub announce_block: bool,
	/// Full network configuration.
	pub net_config: FullNetworkConfiguration<Block, <Block as BlockT>::Hash, Net>,
	/// A shared client returned by `new_full_parts`.
	pub client: Arc<Client>,
	/// A shared transaction pool.
	pub transaction_pool: Arc<TxPool>,
	/// A handle for spawning tasks.
	pub spawn_handle: SpawnTaskHandle,
	/// A handle for spawning essential tasks (tasks whose unexpected exit shuts the
	/// service down).
	pub spawn_essential_handle: SpawnEssentialTaskHandle,
	/// An import queue.
	pub import_queue: IQ,
	/// Syncing service to communicate with syncing engine.
	pub sync_service: SyncingService<Block>,
	/// Block announce config.
	pub block_announce_config: Net::NotificationProtocolConfig,
	/// Network service provider to drive with network internally.
	pub network_service_provider: NetworkServiceProvider,
	/// Prometheus metrics registry.
	pub metrics_registry: Option<&'a Registry>,
	/// Metrics for the notification protocols.
	pub metrics: NotificationMetrics,
}
1147
/// Build the network service, the network status sinks and an RPC sender, this is a lower-level
/// version of [`build_network`] for those needing more control.
///
/// Unlike [`build_network`], this expects syncing to be set up already: the caller
/// provides `sync_service` and `block_announce_config`. This function registers the
/// light client, bitswap and transactions protocols, instantiates the network backend
/// and spawns the supporting tasks (transactions handler, import queue, system RPC and
/// the network worker itself).
pub fn build_network_advanced<Block, Net, TxPool, IQ, Client>(
	params: BuildNetworkAdvancedParams<Block, Net, TxPool, IQ, Client>,
) -> Result<
	(
		Arc<dyn soil_network::service::traits::NetworkService>,
		TracingUnboundedSender<soil_rpc::system::Request<Block>>,
		soil_network::transactions::TransactionsHandlerController<<Block as BlockT>::Hash>,
		Arc<SyncingService<Block>>,
	),
	Error,
>
where
	Block: BlockT,
	Client: ProvideRuntimeApi<Block>
		+ HeaderMetadata<Block, Error = soil_client::blockchain::Error>
		+ Chain<Block>
		+ BlockBackend<Block>
		+ BlockIdTo<Block, Error = soil_client::blockchain::Error>
		+ ProofProvider<Block>
		+ HeaderBackend<Block>
		+ BlockchainEvents<Block>
		+ 'static,
	TxPool: TransactionPool<Block = Block, Hash = <Block as BlockT>::Hash> + 'static,
	IQ: ImportQueue<Block> + 'static,
	Net: NetworkBackend<Block, <Block as BlockT>::Hash>,
{
	let BuildNetworkAdvancedParams {
		role,
		protocol_id,
		fork_id,
		ipfs_server,
		announce_block,
		mut net_config,
		client,
		transaction_pool,
		spawn_handle,
		spawn_essential_handle,
		import_queue,
		sync_service,
		block_announce_config,
		network_service_provider,
		metrics_registry,
		metrics,
	} = params;

	let genesis_hash = client.info().genesis_hash;

	let light_client_request_protocol_config = {
		// Allow both outgoing and incoming requests.
		let (handler, protocol_config) =
			LightClientRequestHandler::new::<Net>(&protocol_id, fork_id, client.clone());
		spawn_handle.spawn("light-client-request-handler", Some("networking"), handler.run());
		protocol_config
	};

	// install request handlers to `FullNetworkConfiguration`
	net_config.add_request_response_protocol(light_client_request_protocol_config);

	// Optionally serve block data over IPFS bitswap.
	let bitswap_config = ipfs_server.then(|| {
		let (handler, config) = Net::bitswap_server(client.clone());
		spawn_handle.spawn("bitswap-request-handler", Some("networking"), handler);

		config
	});

	// Create the transactions protocol and add it to the list of protocols supported by
	// the network configuration.
	let (transactions_handler_proto, transactions_config) =
		soil_network::transactions::TransactionsHandlerPrototype::new::<_, Block, Net>(
			protocol_id.clone(),
			genesis_hash,
			fork_id,
			metrics.clone(),
			net_config.peer_store_handle(),
		);
	net_config.add_notification_protocol(transactions_config);

	// Start task for `PeerStore`
	let peer_store = net_config.take_peer_store();
	spawn_handle.spawn("peer-store", Some("networking"), peer_store.run());

	let sync_service = Arc::new(sync_service);

	let network_params = soil_network::config::Params::<Block, <Block as BlockT>::Hash, Net> {
		role,
		// Futures spawned internally by the network backend run on the service's task
		// spawner.
		executor: {
			let spawn_handle = Clone::clone(&spawn_handle);
			Box::new(move |fut| {
				spawn_handle.spawn("libp2p-node", Some("networking"), fut);
			})
		},
		network_config: net_config,
		genesis_hash,
		protocol_id,
		fork_id: fork_id.map(ToOwned::to_owned),
		metrics_registry: metrics_registry.cloned(),
		block_announce_config,
		bitswap_config,
		notification_metrics: metrics,
	};

	// Captured before `network_params` is consumed; forwarded to the system RPC future.
	let has_bootnodes = !network_params.network_config.network_config.boot_nodes.is_empty();
	let network_mut = Net::new(network_params)?;
	let network = network_mut.network_service().clone();

	let (tx_handler, tx_handler_controller) = transactions_handler_proto.build(
		network.clone(),
		sync_service.clone(),
		Arc::new(TransactionPoolAdapter { pool: transaction_pool, client: client.clone() }),
		metrics_registry,
	)?;
	spawn_handle.spawn_blocking(
		"network-transactions-handler",
		Some("networking"),
		tx_handler.run(),
	);

	spawn_handle.spawn_blocking(
		"chain-sync-network-service-provider",
		Some("networking"),
		network_service_provider.run(Arc::new(network.clone())),
	);
	// Drive the import queue, with results reported to the syncing service.
	spawn_handle.spawn("import-queue", None, {
		let sync_service = sync_service.clone();

		async move { import_queue.run(sync_service.as_ref()).await }
	});

	let (system_rpc_tx, system_rpc_rx) = tracing_unbounded("mpsc_system_rpc", 10_000);
	spawn_handle.spawn(
		"system-rpc-handler",
		Some("networking"),
		build_system_rpc_future::<_, _, <Block as BlockT>::Hash>(
			role,
			network_mut.network_service(),
			sync_service.clone(),
			client.clone(),
			system_rpc_rx,
			has_bootnodes,
		),
	);

	let future = build_network_future::<_, _, <Block as BlockT>::Hash, _>(
		network_mut,
		client,
		sync_service.clone(),
		announce_block,
	);

	// The network worker is responsible for gathering all network messages and processing
	// them. This is quite a heavy task, and at the time of the writing of this comment it
	// frequently happens that this future takes several seconds or in some situations
	// even more than a minute until it has processed its entire queue. This is clearly an
	// issue, and ideally we would like to fix the network future to take as little time as
	// possible, but we also take the extra harm-prevention measure to execute the networking
	// future using `spawn_blocking`.
	//
	// The network worker is spawned as an essential task, meaning if it exits unexpectedly
	// the service will shut down.
	spawn_essential_handle.spawn_blocking("network-worker", Some("networking"), future);

	Ok((network, system_rpc_tx, tx_handler_controller, sync_service.clone()))
}
1312
/// Configuration for [`build_default_syncing_engine`].
pub struct DefaultSyncingEngineConfig<'a, Block, Client, Net>
where
	Block: BlockT,
	Net: NetworkBackend<Block, <Block as BlockT>::Hash>,
{
	/// Role of the local node.
	pub role: Role,
	/// Protocol name prefix.
	pub protocol_id: ProtocolId,
	/// Fork ID.
	pub fork_id: Option<&'a str>,
	/// Full network configuration.
	pub net_config: &'a mut FullNetworkConfiguration<Block, <Block as BlockT>::Hash, Net>,
	/// Validator for incoming block announcements.
	pub block_announce_validator: Box<dyn BlockAnnounceValidator<Block> + Send>,
	/// Handle to communicate with `NetworkService`.
	pub network_service_handle: NetworkServiceHandle,
	/// Warp sync configuration (when used).
	pub warp_sync_config: Option<WarpSyncConfig<Block>>,
	/// A shared client returned by `new_full_parts`.
	pub client: Arc<Client>,
	/// Blocks import queue API.
	pub import_queue_service: Box<dyn ImportQueueService<Block>>,
	/// Expected max total number of peer connections (in + out).
	pub num_peers_hint: usize,
	/// A handle for spawning tasks.
	pub spawn_handle: &'a SpawnTaskHandle,
	/// Prometheus metrics registry.
	pub metrics_registry: Option<&'a Registry>,
	/// Metrics for the notification protocols.
	pub metrics: NotificationMetrics,
	/// Whether to archive blocks. When `true`, gap sync requests bodies to maintain complete
	/// block history.
	pub archive_blocks: bool,
}
1349
1350/// Build default syncing engine using [`build_default_block_downloader`] and
1351/// [`build_polkadot_syncing_strategy`] internally.
1352pub fn build_default_syncing_engine<Block, Client, Net>(
1353	config: DefaultSyncingEngineConfig<Block, Client, Net>,
1354) -> Result<(SyncingService<Block>, Net::NotificationProtocolConfig), Error>
1355where
1356	Block: BlockT,
1357	Client: HeaderBackend<Block>
1358		+ BlockBackend<Block>
1359		+ HeaderMetadata<Block, Error = soil_client::blockchain::Error>
1360		+ ProofProvider<Block>
1361		+ Send
1362		+ Sync
1363		+ 'static,
1364	Net: NetworkBackend<Block, <Block as BlockT>::Hash>,
1365{
1366	let DefaultSyncingEngineConfig {
1367		role,
1368		protocol_id,
1369		fork_id,
1370		net_config,
1371		block_announce_validator,
1372		network_service_handle,
1373		warp_sync_config,
1374		client,
1375		import_queue_service,
1376		num_peers_hint,
1377		spawn_handle,
1378		metrics_registry,
1379		metrics,
1380		archive_blocks,
1381	} = config;
1382
1383	let block_downloader = build_default_block_downloader(
1384		&protocol_id,
1385		fork_id,
1386		net_config,
1387		network_service_handle.clone(),
1388		client.clone(),
1389		num_peers_hint,
1390		spawn_handle,
1391	);
1392	let syncing_strategy = build_polkadot_syncing_strategy(
1393		protocol_id.clone(),
1394		fork_id,
1395		net_config,
1396		warp_sync_config,
1397		block_downloader,
1398		client.clone(),
1399		spawn_handle,
1400		metrics_registry,
1401		archive_blocks,
1402	)?;
1403
1404	let (syncing_engine, sync_service, block_announce_config) = SyncingEngine::new(
1405		Roles::from(&role),
1406		client,
1407		metrics_registry,
1408		metrics,
1409		&net_config,
1410		protocol_id,
1411		fork_id,
1412		block_announce_validator,
1413		syncing_strategy,
1414		network_service_handle,
1415		import_queue_service,
1416		net_config.peer_store_handle(),
1417	)?;
1418
1419	spawn_handle.spawn_blocking("syncing", None, syncing_engine.run());
1420
1421	Ok((sync_service, block_announce_config))
1422}
1423
1424/// Build default block downloader
1425pub fn build_default_block_downloader<Block, Client, Net>(
1426	protocol_id: &ProtocolId,
1427	fork_id: Option<&str>,
1428	net_config: &mut FullNetworkConfiguration<Block, <Block as BlockT>::Hash, Net>,
1429	network_service_handle: NetworkServiceHandle,
1430	client: Arc<Client>,
1431	num_peers_hint: usize,
1432	spawn_handle: &SpawnTaskHandle,
1433) -> Arc<dyn BlockDownloader<Block>>
1434where
1435	Block: BlockT,
1436	Client: HeaderBackend<Block> + BlockBackend<Block> + Send + Sync + 'static,
1437	Net: NetworkBackend<Block, <Block as BlockT>::Hash>,
1438{
1439	// Custom protocol was not specified, use the default block handler.
1440	// Allow both outgoing and incoming requests.
1441	let BlockRelayParams { mut server, downloader, request_response_config } =
1442		BlockRequestHandler::new::<Net>(
1443			network_service_handle,
1444			&protocol_id,
1445			fork_id,
1446			client.clone(),
1447			num_peers_hint,
1448		);
1449
1450	spawn_handle.spawn("block-request-handler", Some("networking"), async move {
1451		server.run().await;
1452	});
1453
1454	net_config.add_request_response_protocol(request_response_config);
1455
1456	downloader
1457}
1458
1459/// Build standard polkadot syncing strategy
1460pub fn build_polkadot_syncing_strategy<Block, Client, Net>(
1461	protocol_id: ProtocolId,
1462	fork_id: Option<&str>,
1463	net_config: &mut FullNetworkConfiguration<Block, <Block as BlockT>::Hash, Net>,
1464	warp_sync_config: Option<WarpSyncConfig<Block>>,
1465	block_downloader: Arc<dyn BlockDownloader<Block>>,
1466	client: Arc<Client>,
1467	spawn_handle: &SpawnTaskHandle,
1468	metrics_registry: Option<&Registry>,
1469	archive_blocks: bool,
1470) -> Result<Box<dyn SyncingStrategy<Block>>, Error>
1471where
1472	Block: BlockT,
1473	Client: HeaderBackend<Block>
1474		+ BlockBackend<Block>
1475		+ HeaderMetadata<Block, Error = soil_client::blockchain::Error>
1476		+ ProofProvider<Block>
1477		+ Send
1478		+ Sync
1479		+ 'static,
1480	Net: NetworkBackend<Block, <Block as BlockT>::Hash>,
1481{
1482	if warp_sync_config.is_none() && net_config.network_config.sync_mode.is_warp() {
1483		return Err("Warp sync enabled, but no warp sync provider configured.".into());
1484	}
1485
1486	if client.requires_full_sync() {
1487		match net_config.network_config.sync_mode {
1488			SyncMode::LightState { .. } => {
1489				return Err("Fast sync doesn't work for archive nodes".into())
1490			},
1491			SyncMode::Warp => return Err("Warp sync doesn't work for archive nodes".into()),
1492			SyncMode::Full => {},
1493		}
1494	}
1495
1496	let genesis_hash = client.info().genesis_hash;
1497
1498	let (state_request_protocol_config, state_request_protocol_name) = {
1499		let num_peer_hint = net_config.network_config.default_peers_set_num_full as usize
1500			+ net_config.network_config.default_peers_set.reserved_nodes.len();
1501		// Allow both outgoing and incoming requests.
1502		let (handler, protocol_config) =
1503			StateRequestHandler::new::<Net>(&protocol_id, fork_id, client.clone(), num_peer_hint);
1504		let config_name = protocol_config.protocol_name().clone();
1505
1506		spawn_handle.spawn("state-request-handler", Some("networking"), handler.run());
1507		(protocol_config, config_name)
1508	};
1509	net_config.add_request_response_protocol(state_request_protocol_config);
1510
1511	let (warp_sync_protocol_config, warp_sync_protocol_name) = match warp_sync_config.as_ref() {
1512		Some(WarpSyncConfig::WithProvider(warp_with_provider)) => {
1513			// Allow both outgoing and incoming requests.
1514			let (handler, protocol_config) = WarpSyncRequestHandler::new::<_, Net>(
1515				protocol_id,
1516				genesis_hash,
1517				fork_id,
1518				warp_with_provider.clone(),
1519			);
1520			let config_name = protocol_config.protocol_name().clone();
1521
1522			spawn_handle.spawn("warp-sync-request-handler", Some("networking"), handler.run());
1523			(Some(protocol_config), Some(config_name))
1524		},
1525		_ => (None, None),
1526	};
1527	if let Some(config) = warp_sync_protocol_config {
1528		net_config.add_request_response_protocol(config);
1529	}
1530
1531	let syncing_config = PolkadotSyncingStrategyConfig {
1532		mode: net_config.network_config.sync_mode,
1533		max_parallel_downloads: net_config.network_config.max_parallel_downloads,
1534		max_blocks_per_request: net_config.network_config.max_blocks_per_request,
1535		min_peers_to_start_warp_sync: net_config.network_config.min_peers_to_start_warp_sync,
1536		metrics_registry: metrics_registry.cloned(),
1537		state_request_protocol_name,
1538		block_downloader,
1539		archive_blocks,
1540	};
1541	Ok(Box::new(PolkadotSyncingStrategy::new(
1542		syncing_config,
1543		client,
1544		warp_sync_config,
1545		warp_sync_protocol_name,
1546	)?))
1547}