use crate::{
	build_network_future, build_system_rpc_future,
	client::{Client, ClientConfig},
	config::{Configuration, ExecutorConfiguration, KeystoreConfig, Multiaddr, PrometheusConfig},
	error::Error,
	metrics::MetricsService,
	start_rpc_servers, BuildGenesisBlock, GenesisBlockBuilder, RpcHandlers, SpawnTaskHandle,
	TaskManager, TransactionPoolAdapter,
};
use futures::{select, FutureExt, StreamExt};
use jsonrpsee::RpcModule;
use log::{debug, error, info};
use prometheus_endpoint::Registry;
use sc_chain_spec::{get_extension, ChainSpec};
use sc_client_api::{
	execution_extensions::ExecutionExtensions, proof_provider::ProofProvider, BadBlocks,
	BlockBackend, BlockchainEvents, ExecutorProvider, ForkBlocks, KeysIter, StorageProvider,
	TrieCacheContext, UsageProvider,
};
use sc_client_db::{Backend, BlocksPruning, DatabaseSettings, PruningMode};
use sc_consensus::import_queue::{ImportQueue, ImportQueueService};
use sc_executor::{
	sp_wasm_interface::HostFunctions, HeapAllocStrategy, NativeExecutionDispatch, RuntimeVersionOf,
	WasmExecutor, DEFAULT_HEAP_ALLOC_STRATEGY,
};
use sc_keystore::LocalKeystore;
use sc_network::{
	config::{FullNetworkConfiguration, ProtocolId, SyncMode},
	multiaddr::Protocol,
	service::{
		traits::{PeerStore, RequestResponseConfig},
		NotificationMetrics,
	},
	NetworkBackend, NetworkStateInfo,
};
use sc_network_common::role::{Role, Roles};
use sc_network_light::light_client_requests::handler::LightClientRequestHandler;
use sc_network_sync::{
	block_relay_protocol::{BlockDownloader, BlockRelayParams},
	block_request_handler::BlockRequestHandler,
	engine::SyncingEngine,
	service::network::{NetworkServiceHandle, NetworkServiceProvider},
	state_request_handler::StateRequestHandler,
	strategy::{
		polkadot::{PolkadotSyncingStrategy, PolkadotSyncingStrategyConfig},
		SyncingStrategy,
	},
	warp_request_handler::RequestHandler as WarpSyncRequestHandler,
	SyncingService, WarpSyncConfig,
};
use sc_rpc::{
	author::AuthorApiServer,
	chain::ChainApiServer,
	offchain::OffchainApiServer,
	state::{ChildStateApiServer, StateApiServer},
	system::SystemApiServer,
	DenyUnsafe, SubscriptionTaskExecutor,
};
use sc_rpc_spec_v2::{
	archive::ArchiveApiServer,
	chain_head::ChainHeadApiServer,
	chain_spec::ChainSpecApiServer,
	transaction::{TransactionApiServer, TransactionBroadcastApiServer},
};
use sc_telemetry::{telemetry, ConnectionMessage, Telemetry, TelemetryHandle, SUBSTRATE_INFO};
use sc_tracing::block::TracingExecuteBlock;
use sc_transaction_pool_api::{MaintainedTransactionPool, TransactionPool};
use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedSender};
use sp_api::{CallApiAt, ProvideRuntimeApi};
use sp_blockchain::{HeaderBackend, HeaderMetadata};
use sp_consensus::block_validation::{
	BlockAnnounceValidator, Chain, DefaultBlockAnnounceValidator,
};
use sp_core::traits::{CodeExecutor, SpawnNamed};
use sp_keystore::KeystorePtr;
use sp_runtime::traits::{Block as BlockT, BlockIdTo, NumberFor, Zero};
use sp_storage::{ChildInfo, ChildType, PrefixedStorageKey};
use std::{
	str::FromStr,
	sync::Arc,
	time::{Duration, SystemTime},
};

pub type TFullClient<TBl, TRtApi, TExec> =
	Client<TFullBackend<TBl>, TFullCallExecutor<TBl, TExec>, TBl, TRtApi>;

pub type TFullBackend<TBl> = Backend<TBl>;

pub type TFullCallExecutor<TBl, TExec> = crate::client::LocalCallExecutor<TBl, Backend<TBl>, TExec>;

type TFullParts<TBl, TRtApi, TExec> =
	(TFullClient<TBl, TRtApi, TExec>, Arc<TFullBackend<TBl>>, KeystoreContainer, TaskManager);
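
/// Owns the node's [`LocalKeystore`] (on-disk or in-memory, depending on [`KeystoreConfig`])
/// and hands out shared handles to it.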
pub struct KeystoreContainer(Arc<LocalKeystore>);

impl KeystoreContainer {
	pub fn new(config: &KeystoreConfig) -> Result<Self, Error> {
		let keystore = Arc::new(match config {
			KeystoreConfig::Path { path, password } =>
				LocalKeystore::open(path.clone(), password.clone())?,
			KeystoreConfig::InMemory => LocalKeystore::in_memory(),
		});

		Ok(Self(keystore))
	}

	pub fn keystore(&self) -> KeystorePtr {
		self.0.clone()
	}

	pub fn local_keystore(&self) -> Arc<LocalKeystore> {
		self.0.clone()
	}
}
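
/// Create a full client. Convenience wrapper around [`new_full_parts`] that keeps only the
/// client and drops the backend, keystore container and task manager built alongside it.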
pub fn new_full_client<TBl, TRtApi, TExec>(
	config: &Configuration,
	telemetry: Option<TelemetryHandle>,
	executor: TExec,
) -> Result<TFullClient<TBl, TRtApi, TExec>, Error>
where
	TBl: BlockT,
	TExec: CodeExecutor + RuntimeVersionOf + Clone,
{
	new_full_parts(config, telemetry, executor).map(|parts| parts.0)
}
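
/// Create the initial parts of a full node with the default genesis block builder, optionally
/// enabling proof recording during block import (`enable_import_proof_recording`).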
pub fn new_full_parts_record_import<TBl, TRtApi, TExec>(
	config: &Configuration,
	telemetry: Option<TelemetryHandle>,
	executor: TExec,
	enable_import_proof_recording: bool,
) -> Result<TFullParts<TBl, TRtApi, TExec>, Error>
where
	TBl: BlockT,
	TExec: CodeExecutor + RuntimeVersionOf + Clone,
{
	let backend = new_db_backend(config.db_config())?;

	let genesis_block_builder = GenesisBlockBuilder::new(
		config.chain_spec.as_storage_builder(),
		!config.no_genesis(),
		backend.clone(),
		executor.clone(),
	)?;

	new_full_parts_with_genesis_builder(
		config,
		telemetry,
		executor,
		backend,
		genesis_block_builder,
		enable_import_proof_recording,
	)
}
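
/// Create the initial parts of a full node, with proof recording during import disabled.
///
/// Illustrative usage sketch (not part of the original docs); `Block`, `RuntimeApi` and the
/// host-function set are assumed to be supplied by the node implementation:
///
/// ```ignore
/// let executor = new_wasm_executor::<sp_io::SubstrateHostFunctions>(&config.executor);
/// let (client, backend, keystore_container, task_manager) =
/// 	new_full_parts::<Block, RuntimeApi, _>(&config, None, executor)?;
/// ```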
pub fn new_full_parts<TBl, TRtApi, TExec>(
	config: &Configuration,
	telemetry: Option<TelemetryHandle>,
	executor: TExec,
) -> Result<TFullParts<TBl, TRtApi, TExec>, Error>
where
	TBl: BlockT,
	TExec: CodeExecutor + RuntimeVersionOf + Clone,
{
	new_full_parts_record_import(config, telemetry, executor, false)
}
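
/// Create the initial parts of a full node using the provided backend and genesis block builder.
/// Also warms up the trie cache, either blocking or in a background task, when
/// `config.warm_up_trie_cache` is set.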
pub fn new_full_parts_with_genesis_builder<TBl, TRtApi, TExec, TBuildGenesisBlock>(
	config: &Configuration,
	telemetry: Option<TelemetryHandle>,
	executor: TExec,
	backend: Arc<TFullBackend<TBl>>,
	genesis_block_builder: TBuildGenesisBlock,
	enable_import_proof_recording: bool,
) -> Result<TFullParts<TBl, TRtApi, TExec>, Error>
where
	TBl: BlockT,
	TExec: CodeExecutor + RuntimeVersionOf + Clone,
	TBuildGenesisBlock: BuildGenesisBlock<
		TBl,
		BlockImportOperation = <Backend<TBl> as sc_client_api::backend::Backend<TBl>>::BlockImportOperation
	>,
{
	let keystore_container = KeystoreContainer::new(&config.keystore)?;

	let task_manager = {
		let registry = config.prometheus_config.as_ref().map(|cfg| &cfg.registry);
		TaskManager::new(config.tokio_handle.clone(), registry)?
	};

	let chain_spec = &config.chain_spec;
	let fork_blocks = get_extension::<ForkBlocks<TBl>>(chain_spec.extensions())
		.cloned()
		.unwrap_or_default();

	let bad_blocks = get_extension::<BadBlocks<TBl>>(chain_spec.extensions())
		.cloned()
		.unwrap_or_default();

	let client = {
		let extensions = ExecutionExtensions::new(None, Arc::new(executor.clone()));

		let wasm_runtime_substitutes = config
			.chain_spec
			.code_substitutes()
			.into_iter()
			.map(|(n, c)| {
				let number = NumberFor::<TBl>::from_str(&n).map_err(|_| {
					Error::Application(Box::from(format!(
						"Failed to parse `{}` as block number for code substitutes. \
						 In an old version the key for code substitute was a block hash. \
						 Please update the chain spec to a version that is compatible with your node.",
						n
					)))
				})?;
				Ok((number, c))
			})
			.collect::<Result<std::collections::HashMap<_, _>, Error>>()?;

		let client = new_client(
			backend.clone(),
			executor,
			genesis_block_builder,
			fork_blocks,
			bad_blocks,
			extensions,
			Box::new(task_manager.spawn_handle()),
			config.prometheus_config.as_ref().map(|config| config.registry.clone()),
			telemetry,
			ClientConfig {
				offchain_worker_enabled: config.offchain_worker.enabled,
				offchain_indexing_api: config.offchain_worker.indexing_enabled,
				wasm_runtime_overrides: config.wasm_runtime_overrides.clone(),
				no_genesis: config.no_genesis(),
				wasm_runtime_substitutes,
				enable_import_proof_recording,
			},
		)?;

		if let Some(warm_up_strategy) = config.warm_up_trie_cache {
			let storage_root = client.usage_info().chain.best_hash;
			let backend_clone = backend.clone();

			if warm_up_strategy.is_blocking() {
				warm_up_trie_cache(backend_clone, storage_root)?;
			} else {
				task_manager.spawn_handle().spawn_blocking(
					"warm-up-trie-cache",
					None,
					async move {
						if let Err(e) = warm_up_trie_cache(backend_clone, storage_root) {
							error!("Failed to warm up trie cache: {e}");
						}
					},
				);
			}
		}

		client
	};

	Ok((client, backend, keystore_container, task_manager))
}
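
/// Interpret a prefixed storage key as a default child trie key, if it is one.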
fn child_info(key: Vec<u8>) -> Option<ChildInfo> {
	let prefixed_key = PrefixedStorageKey::new(key);
	ChildType::from_prefixed_key(&prefixed_key).and_then(|(child_type, storage_key)| {
		(child_type == ChildType::ParentKeyId).then(|| ChildInfo::new_default(storage_key))
	})
}
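
/// Populate the trie cache by iterating all top-level and child storage keys at `storage_root`
/// and reading their values through the trusted cache context.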
fn warm_up_trie_cache<TBl: BlockT>(
	backend: Arc<TFullBackend<TBl>>,
	storage_root: TBl::Hash,
) -> Result<(), Error> {
	use sc_client_api::backend::Backend;
	use sp_state_machine::Backend as StateBackend;

	let untrusted_state = || backend.state_at(storage_root, TrieCacheContext::Untrusted);
	let trusted_state = || backend.state_at(storage_root, TrieCacheContext::Trusted);

	debug!("Populating trie cache started");
	let start_time = std::time::Instant::now();
	let mut keys_count = 0;
	let mut child_keys_count = 0;
	for key in KeysIter::<_, TBl>::new(untrusted_state()?, None, None)? {
		if keys_count != 0 && keys_count % 100_000 == 0 {
			debug!("{} keys and {} child keys have been warmed", keys_count, child_keys_count);
		}
		match child_info(key.0.clone()) {
			Some(info) => {
				for child_key in
					KeysIter::<_, TBl>::new_child(untrusted_state()?, info.clone(), None, None)?
				{
					if trusted_state()?
						.child_storage(&info, &child_key.0)
						.unwrap_or_default()
						.is_none()
					{
						debug!("Child storage value unexpectedly empty: {child_key:?}");
					}
					child_keys_count += 1;
				}
			},
			None => {
				if trusted_state()?.storage(&key.0).unwrap_or_default().is_none() {
					debug!("Storage value unexpectedly empty: {key:?}");
				}
				keys_count += 1;
			},
		}
	}
	debug!(
		"Trie cache populated with {keys_count} keys and {child_keys_count} child keys in {} s",
		start_time.elapsed().as_secs_f32()
	);

	Ok(())
}
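
/// Create a [`sc_executor::NativeElseWasmExecutor`] according to the node [`Configuration`].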
#[deprecated(note = "Please switch to `new_wasm_executor`. Will be removed at end of 2024.")]
#[allow(deprecated)]
pub fn new_native_or_wasm_executor<D: NativeExecutionDispatch>(
	config: &Configuration,
) -> sc_executor::NativeElseWasmExecutor<D> {
	#[allow(deprecated)]
	sc_executor::NativeElseWasmExecutor::new_with_wasm_executor(new_wasm_executor(&config.executor))
}
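
/// Create a [`WasmExecutor`] from the given [`ExecutorConfiguration`], wiring up the execution
/// method, the on-chain/off-chain heap allocation strategy, the maximum number of runtime
/// instances and the runtime cache size.
///
/// Illustrative usage sketch (not part of the original docs); it assumes
/// `sp_io::SubstrateHostFunctions` as the host-function set:
///
/// ```ignore
/// let executor = new_wasm_executor::<sp_io::SubstrateHostFunctions>(&config.executor);
/// ```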
pub fn new_wasm_executor<H: HostFunctions>(config: &ExecutorConfiguration) -> WasmExecutor<H> {
	let strategy = config
		.default_heap_pages
		.map_or(DEFAULT_HEAP_ALLOC_STRATEGY, |p| HeapAllocStrategy::Static { extra_pages: p as _ });
	WasmExecutor::<H>::builder()
		.with_execution_method(config.wasm_method)
		.with_onchain_heap_alloc_strategy(strategy)
		.with_offchain_heap_alloc_strategy(strategy)
		.with_max_runtime_instances(config.max_runtime_instances)
		.with_runtime_cache_size(config.runtime_cache_size)
		.build()
}
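
/// Create a database-backed client [`Backend`] from the given [`DatabaseSettings`].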
pub fn new_db_backend<Block>(
	settings: DatabaseSettings,
) -> Result<Arc<Backend<Block>>, sp_blockchain::Error>
where
	Block: BlockT,
{
	const CANONICALIZATION_DELAY: u64 = 4096;

	Ok(Arc::new(Backend::new(settings, CANONICALIZATION_DELAY)?))
}
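
/// Create an instance of the client backed by the given backend, executor and genesis block
/// builder.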
pub fn new_client<E, Block, RA, G>(
	backend: Arc<Backend<Block>>,
	executor: E,
	genesis_block_builder: G,
	fork_blocks: ForkBlocks<Block>,
	bad_blocks: BadBlocks<Block>,
	execution_extensions: ExecutionExtensions<Block>,
	spawn_handle: Box<dyn SpawnNamed>,
	prometheus_registry: Option<Registry>,
	telemetry: Option<TelemetryHandle>,
	config: ClientConfig<Block>,
) -> Result<
	Client<
		Backend<Block>,
		crate::client::LocalCallExecutor<Block, Backend<Block>, E>,
		Block,
		RA,
	>,
	sp_blockchain::Error,
>
where
	Block: BlockT,
	E: CodeExecutor + RuntimeVersionOf,
	G: BuildGenesisBlock<
		Block,
		BlockImportOperation = <Backend<Block> as sc_client_api::backend::Backend<Block>>::BlockImportOperation
	>,
{
	let executor = crate::client::LocalCallExecutor::new(
		backend.clone(),
		executor,
		config.clone(),
		execution_extensions,
	)?;

	Client::new(
		backend,
		executor,
		spawn_handle,
		genesis_block_builder,
		fork_blocks,
		bad_blocks,
		prometheus_registry,
		telemetry,
		config,
	)
}
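
/// Parameters handed to [`spawn_tasks`].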
pub struct SpawnTasksParams<'a, TBl: BlockT, TCl, TExPool, TRpc, Backend> {
	pub config: Configuration,
	pub client: Arc<TCl>,
	pub backend: Arc<Backend>,
	pub task_manager: &'a mut TaskManager,
	pub keystore: KeystorePtr,
	pub transaction_pool: Arc<TExPool>,
	pub rpc_builder: Box<dyn Fn(SubscriptionTaskExecutor) -> Result<RpcModule<TRpc>, Error>>,
	pub network: Arc<dyn sc_network::service::traits::NetworkService>,
	pub system_rpc_tx: TracingUnboundedSender<sc_rpc::system::Request<TBl>>,
	pub tx_handler_controller:
		sc_network_transactions::TransactionsHandlerController<<TBl as BlockT>::Hash>,
	pub sync_service: Arc<SyncingService<TBl>>,
	pub telemetry: Option<&'a mut Telemetry>,
	pub tracing_execute_block: Option<Arc<dyn TracingExecuteBlock<TBl>>>,
}
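
/// Spawn the tasks that are required to run a node: transaction pool notifications, telemetry,
/// the Prometheus endpoint (if configured), the RPC servers and the informant. Returns in-memory
/// [`RpcHandlers`] for the node's RPC module.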
pub fn spawn_tasks<TBl, TBackend, TExPool, TRpc, TCl>(
	SpawnTasksParams {
		mut config,
		task_manager,
		client,
		backend,
		keystore,
		transaction_pool,
		rpc_builder,
		network,
		system_rpc_tx,
		tx_handler_controller,
		sync_service,
		telemetry,
		tracing_execute_block: execute_block,
	}: SpawnTasksParams<TBl, TCl, TExPool, TRpc, TBackend>,
) -> Result<RpcHandlers, Error>
where
	TCl: ProvideRuntimeApi<TBl>
		+ HeaderMetadata<TBl, Error = sp_blockchain::Error>
		+ Chain<TBl>
		+ BlockBackend<TBl>
		+ BlockIdTo<TBl, Error = sp_blockchain::Error>
		+ ProofProvider<TBl>
		+ HeaderBackend<TBl>
		+ BlockchainEvents<TBl>
		+ ExecutorProvider<TBl>
		+ UsageProvider<TBl>
		+ StorageProvider<TBl, TBackend>
		+ CallApiAt<TBl>
		+ Send
		+ 'static,
	<TCl as ProvideRuntimeApi<TBl>>::Api: sp_api::Metadata<TBl>
		+ sp_transaction_pool::runtime_api::TaggedTransactionQueue<TBl>
		+ sp_session::SessionKeys<TBl>
		+ sp_api::ApiExt<TBl>,
	TBl: BlockT,
	TBl::Hash: Unpin,
	TBl::Header: Unpin,
	TBackend: 'static + sc_client_api::backend::Backend<TBl> + Send,
	TExPool: MaintainedTransactionPool<Block = TBl, Hash = <TBl as BlockT>::Hash> + 'static,
{
	let chain_info = client.usage_info().chain;

	sp_session::generate_initial_session_keys(
		client.clone(),
		chain_info.best_hash,
		config.dev_key_seed.clone().map(|s| vec![s]).unwrap_or_default(),
		keystore.clone(),
	)
	.map_err(|e| Error::Application(Box::new(e)))?;

	let sysinfo = sc_sysinfo::gather_sysinfo();
	sc_sysinfo::print_sysinfo(&sysinfo);

	let telemetry = telemetry
		.map(|telemetry| {
			init_telemetry(
				config.network.node_name.clone(),
				config.impl_name.clone(),
				config.impl_version.clone(),
				config.chain_spec.name().to_string(),
				config.role.is_authority(),
				network.clone(),
				client.clone(),
				telemetry,
				Some(sysinfo),
			)
		})
		.transpose()?;

	info!("📦 Highest known block at #{}", chain_info.best_number);

	let spawn_handle = task_manager.spawn_handle();

	spawn_handle.spawn(
		"txpool-notifications",
		Some("transaction-pool"),
		sc_transaction_pool::notification_future(client.clone(), transaction_pool.clone()),
	);

	spawn_handle.spawn(
		"on-transaction-imported",
		Some("transaction-pool"),
		propagate_transaction_notifications(
			transaction_pool.clone(),
			tx_handler_controller,
			telemetry.clone(),
		),
	);

	let metrics_service =
		if let Some(PrometheusConfig { port, registry }) = config.prometheus_config.clone() {
			let metrics = MetricsService::with_prometheus(
				telemetry,
				&registry,
				config.role,
				&config.network.node_name,
				&config.impl_version,
			)?;
			spawn_handle.spawn(
				"prometheus-endpoint",
				None,
				prometheus_endpoint::init_prometheus(port, registry).map(drop),
			);

			metrics
		} else {
			MetricsService::new(telemetry)
		};

	spawn_handle.spawn(
		"telemetry-periodic-send",
		None,
		metrics_service.run(
			client.clone(),
			transaction_pool.clone(),
			network.clone(),
			sync_service.clone(),
		),
	);

	let rpc_id_provider = config.rpc.id_provider.take();

	let rpc_v2_metrics = config
		.prometheus_registry()
		.map(|registry| sc_rpc_spec_v2::transaction::TransactionMetrics::new(registry))
		.transpose()?;

	let gen_rpc_module = || {
		gen_rpc_module(GenRpcModuleParams {
			spawn_handle: task_manager.spawn_handle(),
			client: client.clone(),
			transaction_pool: transaction_pool.clone(),
			keystore: keystore.clone(),
			system_rpc_tx: system_rpc_tx.clone(),
			impl_name: config.impl_name.clone(),
			impl_version: config.impl_version.clone(),
			chain_spec: config.chain_spec.as_ref(),
			state_pruning: &config.state_pruning,
			blocks_pruning: config.blocks_pruning,
			backend: backend.clone(),
			rpc_builder: &*rpc_builder,
			metrics: rpc_v2_metrics.clone(),
			tracing_execute_block: execute_block.clone(),
		})
	};

	let rpc_server_handle = start_rpc_servers(
		&config.rpc,
		config.prometheus_registry(),
		&config.tokio_handle,
		gen_rpc_module,
		rpc_id_provider,
	)?;

	let listen_addrs = rpc_server_handle
		.listen_addrs()
		.into_iter()
		.map(|socket_addr| {
			let mut multiaddr: Multiaddr = socket_addr.ip().into();
			multiaddr.push(Protocol::Tcp(socket_addr.port()));
			multiaddr
		})
		.collect();

	let in_memory_rpc = {
		let mut module = gen_rpc_module()?;
		module.extensions_mut().insert(DenyUnsafe::No);
		module
	};

	let in_memory_rpc_handle = RpcHandlers::new(Arc::new(in_memory_rpc), listen_addrs);

	spawn_handle.spawn(
		"informant",
		None,
		sc_informant::build(client.clone(), network, sync_service.clone()),
	);

	task_manager.keep_alive((config.base_path, rpc_server_handle));

	Ok(in_memory_rpc_handle)
}
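
/// Forward transaction-pool import notifications to the transactions handler and periodically
/// report the pool status to telemetry.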
pub async fn propagate_transaction_notifications<Block, ExPool>(
	transaction_pool: Arc<ExPool>,
	tx_handler_controller: sc_network_transactions::TransactionsHandlerController<
		<Block as BlockT>::Hash,
	>,
	telemetry: Option<TelemetryHandle>,
) where
	Block: BlockT,
	ExPool: MaintainedTransactionPool<Block = Block, Hash = <Block as BlockT>::Hash>,
{
	const TELEMETRY_INTERVAL: Duration = Duration::from_secs(1);

	let mut notifications = transaction_pool.import_notification_stream().fuse();
	let mut timer = futures_timer::Delay::new(TELEMETRY_INTERVAL).fuse();
	let mut tx_imported = false;

	loop {
		select! {
			notification = notifications.next() => {
				let Some(hash) = notification else { return };

				tx_handler_controller.propagate_transaction(hash);

				tx_imported = true;
			},
			_ = timer => {
				timer = futures_timer::Delay::new(TELEMETRY_INTERVAL).fuse();

				if !tx_imported {
					continue;
				}

				tx_imported = false;
				let status = transaction_pool.status();

				telemetry!(
					telemetry;
					SUBSTRATE_INFO;
					"txpool.import";
					"ready" => status.ready,
					"future" => status.future,
				);
			}
		}
	}
}
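
/// Initialise telemetry by sending the initial [`ConnectionMessage`] and return a handle to the
/// telemetry instance.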
pub fn init_telemetry<Block, Client, Network>(
	name: String,
	implementation: String,
	version: String,
	chain: String,
	authority: bool,
	network: Network,
	client: Arc<Client>,
	telemetry: &mut Telemetry,
	sysinfo: Option<sc_telemetry::SysInfo>,
) -> sc_telemetry::Result<TelemetryHandle>
where
	Block: BlockT,
	Client: BlockBackend<Block>,
	Network: NetworkStateInfo,
{
	let genesis_hash = client.block_hash(Zero::zero()).ok().flatten().unwrap_or_default();
	let connection_message = ConnectionMessage {
		name,
		implementation,
		version,
		target_os: sc_sysinfo::TARGET_OS.into(),
		target_arch: sc_sysinfo::TARGET_ARCH.into(),
		target_env: sc_sysinfo::TARGET_ENV.into(),
		config: String::new(),
		chain,
		genesis_hash: format!("{:?}", genesis_hash),
		authority,
		startup_time: SystemTime::UNIX_EPOCH
			.elapsed()
			.map(|dur| dur.as_millis())
			.unwrap_or(0)
			.to_string(),
		network_id: network.local_peer_id().to_base58(),
		sysinfo,
	};

	telemetry.start_telemetry(connection_message)?;

	Ok(telemetry.handle())
}
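
/// Parameters handed to [`gen_rpc_module`].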
pub struct GenRpcModuleParams<'a, TBl: BlockT, TBackend, TCl, TRpc, TExPool> {
	pub spawn_handle: SpawnTaskHandle,
	pub client: Arc<TCl>,
	pub transaction_pool: Arc<TExPool>,
	pub keystore: KeystorePtr,
	pub system_rpc_tx: TracingUnboundedSender<sc_rpc::system::Request<TBl>>,
	pub impl_name: String,
	pub impl_version: String,
	pub chain_spec: &'a dyn ChainSpec,
	pub state_pruning: &'a Option<PruningMode>,
	pub blocks_pruning: BlocksPruning,
	pub backend: Arc<TBackend>,
	pub rpc_builder: &'a dyn Fn(SubscriptionTaskExecutor) -> Result<RpcModule<TRpc>, Error>,
	pub metrics: Option<sc_rpc_spec_v2::transaction::TransactionMetrics>,
	pub tracing_execute_block: Option<Arc<dyn TracingExecuteBlock<TBl>>>,
}
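
/// Generate the node's RPC module by merging the standard Substrate RPC APIs (author, chain,
/// state, system, offchain, the RPC-v2 transaction/chainHead/chainSpec APIs and, on archive
/// nodes, the archive API) with the extra APIs produced by `rpc_builder`.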
pub fn gen_rpc_module<TBl, TBackend, TCl, TRpc, TExPool>(
	GenRpcModuleParams {
		spawn_handle,
		client,
		transaction_pool,
		keystore,
		system_rpc_tx,
		impl_name,
		impl_version,
		chain_spec,
		state_pruning,
		blocks_pruning,
		backend,
		rpc_builder,
		metrics,
		tracing_execute_block: execute_block,
	}: GenRpcModuleParams<TBl, TBackend, TCl, TRpc, TExPool>,
) -> Result<RpcModule<()>, Error>
where
	TBl: BlockT,
	TCl: ProvideRuntimeApi<TBl>
		+ BlockchainEvents<TBl>
		+ HeaderBackend<TBl>
		+ HeaderMetadata<TBl, Error = sp_blockchain::Error>
		+ ExecutorProvider<TBl>
		+ CallApiAt<TBl>
		+ ProofProvider<TBl>
		+ StorageProvider<TBl, TBackend>
		+ BlockBackend<TBl>
		+ Send
		+ Sync
		+ 'static,
	TBackend: sc_client_api::backend::Backend<TBl> + 'static,
	<TCl as ProvideRuntimeApi<TBl>>::Api: sp_session::SessionKeys<TBl> + sp_api::Metadata<TBl>,
	TExPool: MaintainedTransactionPool<Block = TBl, Hash = <TBl as BlockT>::Hash> + 'static,
	TBl::Hash: Unpin,
	TBl::Header: Unpin,
{
	let system_info = sc_rpc::system::SystemInfo {
		chain_name: chain_spec.name().into(),
		impl_name,
		impl_version,
		properties: chain_spec.properties(),
		chain_type: chain_spec.chain_type(),
	};

	let mut rpc_api = RpcModule::new(());
	let task_executor = Arc::new(spawn_handle);

	let (chain, state, child_state) = {
		let chain = sc_rpc::chain::new_full(client.clone(), task_executor.clone()).into_rpc();
		let (state, child_state) =
			sc_rpc::state::new_full(client.clone(), task_executor.clone(), execute_block);
		let state = state.into_rpc();
		let child_state = child_state.into_rpc();

		(chain, state, child_state)
	};

	const MAX_TRANSACTION_PER_CONNECTION: usize = 16;

	let transaction_broadcast_rpc_v2 = sc_rpc_spec_v2::transaction::TransactionBroadcast::new(
		client.clone(),
		transaction_pool.clone(),
		task_executor.clone(),
		MAX_TRANSACTION_PER_CONNECTION,
	)
	.into_rpc();

	let transaction_v2 = sc_rpc_spec_v2::transaction::Transaction::new(
		client.clone(),
		transaction_pool.clone(),
		task_executor.clone(),
		metrics,
	)
	.into_rpc();

	let chain_head_v2 = sc_rpc_spec_v2::chain_head::ChainHead::new(
		client.clone(),
		backend.clone(),
		task_executor.clone(),
		sc_rpc_spec_v2::chain_head::ChainHeadConfig::default(),
	)
	.into_rpc();

	let is_archive_node = state_pruning.as_ref().map(|sp| sp.is_archive()).unwrap_or(false) &&
		blocks_pruning.is_archive();
	let genesis_hash = client.hash(Zero::zero()).ok().flatten().expect("Genesis block exists; qed");
	if is_archive_node {
		let archive_v2 = sc_rpc_spec_v2::archive::Archive::new(
			client.clone(),
			backend.clone(),
			genesis_hash,
			task_executor.clone(),
		)
		.into_rpc();
		rpc_api.merge(archive_v2).map_err(|e| Error::Application(e.into()))?;
	}

	let chain_spec_v2 = sc_rpc_spec_v2::chain_spec::ChainSpec::new(
		chain_spec.name().into(),
		genesis_hash,
		chain_spec.properties(),
	)
	.into_rpc();

	let author = sc_rpc::author::Author::new(
		client.clone(),
		transaction_pool,
		keystore,
		task_executor.clone(),
	)
	.into_rpc();

	let system = sc_rpc::system::System::new(system_info, system_rpc_tx).into_rpc();

	if let Some(storage) = backend.offchain_storage() {
		let offchain = sc_rpc::offchain::Offchain::new(storage).into_rpc();

		rpc_api.merge(offchain).map_err(|e| Error::Application(e.into()))?;
	}

	rpc_api.merge(transaction_v2).map_err(|e| Error::Application(e.into()))?;
	rpc_api
		.merge(transaction_broadcast_rpc_v2)
		.map_err(|e| Error::Application(e.into()))?;
	rpc_api.merge(chain_head_v2).map_err(|e| Error::Application(e.into()))?;
	rpc_api.merge(chain_spec_v2).map_err(|e| Error::Application(e.into()))?;

	rpc_api.merge(chain).map_err(|e| Error::Application(e.into()))?;
	rpc_api.merge(author).map_err(|e| Error::Application(e.into()))?;
	rpc_api.merge(system).map_err(|e| Error::Application(e.into()))?;
	rpc_api.merge(state).map_err(|e| Error::Application(e.into()))?;
	rpc_api.merge(child_state).map_err(|e| Error::Application(e.into()))?;
	let extra_rpcs = rpc_builder(task_executor.clone())?;
	rpc_api.merge(extra_rpcs).map_err(|e| Error::Application(e.into()))?;

	Ok(rpc_api)
}
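
/// Parameters handed to [`build_network`].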
pub struct BuildNetworkParams<'a, Block, Net, TxPool, IQ, Client>
where
	Block: BlockT,
	Net: NetworkBackend<Block, <Block as BlockT>::Hash>,
{
	pub config: &'a Configuration,
	pub net_config: FullNetworkConfiguration<Block, <Block as BlockT>::Hash, Net>,
	pub client: Arc<Client>,
	pub transaction_pool: Arc<TxPool>,
	pub spawn_handle: SpawnTaskHandle,
	pub import_queue: IQ,
	pub block_announce_validator_builder: Option<
		Box<dyn FnOnce(Arc<Client>) -> Box<dyn BlockAnnounceValidator<Block> + Send> + Send>,
	>,
	pub warp_sync_config: Option<WarpSyncConfig<Block>>,
	pub block_relay: Option<BlockRelayParams<Block, Net>>,
	pub metrics: NotificationMetrics,
}
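
/// Build the network service and spawn the syncing engine, using the default block downloader
/// and the Polkadot syncing strategy unless a custom `block_relay` is provided. Returns the
/// network service, the system RPC sender, the transactions handler controller and the syncing
/// service.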
pub fn build_network<Block, Net, TxPool, IQ, Client>(
	params: BuildNetworkParams<Block, Net, TxPool, IQ, Client>,
) -> Result<
	(
		Arc<dyn sc_network::service::traits::NetworkService>,
		TracingUnboundedSender<sc_rpc::system::Request<Block>>,
		sc_network_transactions::TransactionsHandlerController<<Block as BlockT>::Hash>,
		Arc<SyncingService<Block>>,
	),
	Error,
>
where
	Block: BlockT,
	Client: ProvideRuntimeApi<Block>
		+ HeaderMetadata<Block, Error = sp_blockchain::Error>
		+ Chain<Block>
		+ BlockBackend<Block>
		+ BlockIdTo<Block, Error = sp_blockchain::Error>
		+ ProofProvider<Block>
		+ HeaderBackend<Block>
		+ BlockchainEvents<Block>
		+ 'static,
	TxPool: TransactionPool<Block = Block, Hash = <Block as BlockT>::Hash> + 'static,
	IQ: ImportQueue<Block> + 'static,
	Net: NetworkBackend<Block, <Block as BlockT>::Hash>,
{
	let BuildNetworkParams {
		config,
		mut net_config,
		client,
		transaction_pool,
		spawn_handle,
		import_queue,
		block_announce_validator_builder,
		warp_sync_config,
		block_relay,
		metrics,
	} = params;

	let block_announce_validator = if let Some(f) = block_announce_validator_builder {
		f(client.clone())
	} else {
		Box::new(DefaultBlockAnnounceValidator)
	};

	let network_service_provider = NetworkServiceProvider::new();
	let protocol_id = config.protocol_id();
	let fork_id = config.chain_spec.fork_id();
	let metrics_registry = config.prometheus_config.as_ref().map(|config| &config.registry);

	let block_downloader = match block_relay {
		Some(params) => {
			let BlockRelayParams { mut server, downloader, request_response_config } = params;

			net_config.add_request_response_protocol(request_response_config);

			spawn_handle.spawn("block-request-handler", Some("networking"), async move {
				server.run().await;
			});

			downloader
		},
		None => build_default_block_downloader(
			&protocol_id,
			fork_id,
			&mut net_config,
			network_service_provider.handle(),
			Arc::clone(&client),
			config.network.default_peers_set.in_peers as usize +
				config.network.default_peers_set.out_peers as usize,
			&spawn_handle,
		),
	};

	let syncing_strategy = build_polkadot_syncing_strategy(
		protocol_id.clone(),
		fork_id,
		&mut net_config,
		warp_sync_config,
		block_downloader,
		client.clone(),
		&spawn_handle,
		metrics_registry,
	)?;

	let (syncing_engine, sync_service, block_announce_config) = SyncingEngine::new(
		Roles::from(&config.role),
		Arc::clone(&client),
		metrics_registry,
		metrics.clone(),
		&net_config,
		protocol_id.clone(),
		fork_id,
		block_announce_validator,
		syncing_strategy,
		network_service_provider.handle(),
		import_queue.service(),
		net_config.peer_store_handle(),
	)?;

	spawn_handle.spawn_blocking("syncing", None, syncing_engine.run());

	build_network_advanced(BuildNetworkAdvancedParams {
		role: config.role,
		protocol_id,
		fork_id,
		ipfs_server: config.network.ipfs_server,
		announce_block: config.announce_block,
		net_config,
		client,
		transaction_pool,
		spawn_handle,
		import_queue,
		sync_service,
		block_announce_config,
		network_service_provider,
		metrics_registry,
		metrics,
	})
}
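
/// Parameters handed to [`build_network_advanced`].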
pub struct BuildNetworkAdvancedParams<'a, Block, Net, TxPool, IQ, Client>
where
	Block: BlockT,
	Net: NetworkBackend<Block, <Block as BlockT>::Hash>,
{
	pub role: Role,
	pub protocol_id: ProtocolId,
	pub fork_id: Option<&'a str>,
	pub ipfs_server: bool,
	pub announce_block: bool,
	pub net_config: FullNetworkConfiguration<Block, <Block as BlockT>::Hash, Net>,
	pub client: Arc<Client>,
	pub transaction_pool: Arc<TxPool>,
	pub spawn_handle: SpawnTaskHandle,
	pub import_queue: IQ,
	pub sync_service: SyncingService<Block>,
	pub block_announce_config: Net::NotificationProtocolConfig,
	pub network_service_provider: NetworkServiceProvider,
	pub metrics_registry: Option<&'a Registry>,
	pub metrics: NotificationMetrics,
}
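
/// Build the network service from already constructed syncing parts (sync service, block
/// announce config and network service provider), spawning the request handlers, the
/// transactions handler, the import queue and the network worker.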
pub fn build_network_advanced<Block, Net, TxPool, IQ, Client>(
	params: BuildNetworkAdvancedParams<Block, Net, TxPool, IQ, Client>,
) -> Result<
	(
		Arc<dyn sc_network::service::traits::NetworkService>,
		TracingUnboundedSender<sc_rpc::system::Request<Block>>,
		sc_network_transactions::TransactionsHandlerController<<Block as BlockT>::Hash>,
		Arc<SyncingService<Block>>,
	),
	Error,
>
where
	Block: BlockT,
	Client: ProvideRuntimeApi<Block>
		+ HeaderMetadata<Block, Error = sp_blockchain::Error>
		+ Chain<Block>
		+ BlockBackend<Block>
		+ BlockIdTo<Block, Error = sp_blockchain::Error>
		+ ProofProvider<Block>
		+ HeaderBackend<Block>
		+ BlockchainEvents<Block>
		+ 'static,
	TxPool: TransactionPool<Block = Block, Hash = <Block as BlockT>::Hash> + 'static,
	IQ: ImportQueue<Block> + 'static,
	Net: NetworkBackend<Block, <Block as BlockT>::Hash>,
{
	let BuildNetworkAdvancedParams {
		role,
		protocol_id,
		fork_id,
		ipfs_server,
		announce_block,
		mut net_config,
		client,
		transaction_pool,
		spawn_handle,
		import_queue,
		sync_service,
		block_announce_config,
		network_service_provider,
		metrics_registry,
		metrics,
	} = params;

	let genesis_hash = client.info().genesis_hash;

	let light_client_request_protocol_config = {
		let (handler, protocol_config) =
			LightClientRequestHandler::new::<Net>(&protocol_id, fork_id, client.clone());
		spawn_handle.spawn("light-client-request-handler", Some("networking"), handler.run());
		protocol_config
	};

	net_config.add_request_response_protocol(light_client_request_protocol_config);

	let bitswap_config = ipfs_server.then(|| {
		let (handler, config) = Net::bitswap_server(client.clone());
		spawn_handle.spawn("bitswap-request-handler", Some("networking"), handler);

		config
	});

	let (transactions_handler_proto, transactions_config) =
		sc_network_transactions::TransactionsHandlerPrototype::new::<_, Block, Net>(
			protocol_id.clone(),
			genesis_hash,
			fork_id,
			metrics.clone(),
			net_config.peer_store_handle(),
		);
	net_config.add_notification_protocol(transactions_config);

	let peer_store = net_config.take_peer_store();
	spawn_handle.spawn("peer-store", Some("networking"), peer_store.run());

	let sync_service = Arc::new(sync_service);

	let network_params = sc_network::config::Params::<Block, <Block as BlockT>::Hash, Net> {
		role,
		executor: {
			let spawn_handle = Clone::clone(&spawn_handle);
			Box::new(move |fut| {
				spawn_handle.spawn("libp2p-node", Some("networking"), fut);
			})
		},
		network_config: net_config,
		genesis_hash,
		protocol_id,
		fork_id: fork_id.map(ToOwned::to_owned),
		metrics_registry: metrics_registry.cloned(),
		block_announce_config,
		bitswap_config,
		notification_metrics: metrics,
	};

	let has_bootnodes = !network_params.network_config.network_config.boot_nodes.is_empty();
	let network_mut = Net::new(network_params)?;
	let network = network_mut.network_service().clone();

	let (tx_handler, tx_handler_controller) = transactions_handler_proto.build(
		network.clone(),
		sync_service.clone(),
		Arc::new(TransactionPoolAdapter { pool: transaction_pool, client: client.clone() }),
		metrics_registry,
	)?;
	spawn_handle.spawn_blocking(
		"network-transactions-handler",
		Some("networking"),
		tx_handler.run(),
	);

	spawn_handle.spawn_blocking(
		"chain-sync-network-service-provider",
		Some("networking"),
		network_service_provider.run(Arc::new(network.clone())),
	);
	spawn_handle.spawn("import-queue", None, {
		let sync_service = sync_service.clone();

		async move { import_queue.run(sync_service.as_ref()).await }
	});

	let (system_rpc_tx, system_rpc_rx) = tracing_unbounded("mpsc_system_rpc", 10_000);
	spawn_handle.spawn(
		"system-rpc-handler",
		Some("networking"),
		build_system_rpc_future::<_, _, <Block as BlockT>::Hash>(
			role,
			network_mut.network_service(),
			sync_service.clone(),
			client.clone(),
			system_rpc_rx,
			has_bootnodes,
		),
	);

	let future = build_network_future::<_, _, <Block as BlockT>::Hash, _>(
		network_mut,
		client,
		sync_service.clone(),
		announce_block,
	);

	spawn_handle.spawn_blocking("network-worker", Some("networking"), future);

	Ok((network, system_rpc_tx, tx_handler_controller, sync_service.clone()))
}
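
/// Configuration for [`build_default_syncing_engine`].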
pub struct DefaultSyncingEngineConfig<'a, Block, Client, Net>
where
	Block: BlockT,
	Net: NetworkBackend<Block, <Block as BlockT>::Hash>,
{
	pub role: Role,
	pub protocol_id: ProtocolId,
	pub fork_id: Option<&'a str>,
	pub net_config: &'a mut FullNetworkConfiguration<Block, <Block as BlockT>::Hash, Net>,
	pub block_announce_validator: Box<dyn BlockAnnounceValidator<Block> + Send>,
	pub network_service_handle: NetworkServiceHandle,
	pub warp_sync_config: Option<WarpSyncConfig<Block>>,
	pub client: Arc<Client>,
	pub import_queue_service: Box<dyn ImportQueueService<Block>>,
	pub num_peers_hint: usize,
	pub spawn_handle: &'a SpawnTaskHandle,
	pub metrics_registry: Option<&'a Registry>,
	pub metrics: NotificationMetrics,
}
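
/// Build a [`SyncingEngine`] with the default block downloader and the Polkadot syncing
/// strategy, spawn it, and return the syncing service together with the block announce
/// protocol configuration.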
pub fn build_default_syncing_engine<Block, Client, Net>(
	config: DefaultSyncingEngineConfig<Block, Client, Net>,
) -> Result<(SyncingService<Block>, Net::NotificationProtocolConfig), Error>
where
	Block: BlockT,
	Client: HeaderBackend<Block>
		+ BlockBackend<Block>
		+ HeaderMetadata<Block, Error = sp_blockchain::Error>
		+ ProofProvider<Block>
		+ Send
		+ Sync
		+ 'static,
	Net: NetworkBackend<Block, <Block as BlockT>::Hash>,
{
	let DefaultSyncingEngineConfig {
		role,
		protocol_id,
		fork_id,
		net_config,
		block_announce_validator,
		network_service_handle,
		warp_sync_config,
		client,
		import_queue_service,
		num_peers_hint,
		spawn_handle,
		metrics_registry,
		metrics,
	} = config;

	let block_downloader = build_default_block_downloader(
		&protocol_id,
		fork_id,
		net_config,
		network_service_handle.clone(),
		client.clone(),
		num_peers_hint,
		spawn_handle,
	);
	let syncing_strategy = build_polkadot_syncing_strategy(
		protocol_id.clone(),
		fork_id,
		net_config,
		warp_sync_config,
		block_downloader,
		client.clone(),
		spawn_handle,
		metrics_registry,
	)?;

	let (syncing_engine, sync_service, block_announce_config) = SyncingEngine::new(
		Roles::from(&role),
		client,
		metrics_registry,
		metrics,
		&net_config,
		protocol_id,
		fork_id,
		block_announce_validator,
		syncing_strategy,
		network_service_handle,
		import_queue_service,
		net_config.peer_store_handle(),
	)?;

	spawn_handle.spawn_blocking("syncing", None, syncing_engine.run());

	Ok((sync_service, block_announce_config))
}
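
/// Build the default block downloader: registers the block request/response protocol and spawns
/// the [`BlockRequestHandler`] server.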
pub fn build_default_block_downloader<Block, Client, Net>(
	protocol_id: &ProtocolId,
	fork_id: Option<&str>,
	net_config: &mut FullNetworkConfiguration<Block, <Block as BlockT>::Hash, Net>,
	network_service_handle: NetworkServiceHandle,
	client: Arc<Client>,
	num_peers_hint: usize,
	spawn_handle: &SpawnTaskHandle,
) -> Arc<dyn BlockDownloader<Block>>
where
	Block: BlockT,
	Client: HeaderBackend<Block> + BlockBackend<Block> + Send + Sync + 'static,
	Net: NetworkBackend<Block, <Block as BlockT>::Hash>,
{
	let BlockRelayParams { mut server, downloader, request_response_config } =
		BlockRequestHandler::new::<Net>(
			network_service_handle,
			&protocol_id,
			fork_id,
			client.clone(),
			num_peers_hint,
		);

	spawn_handle.spawn("block-request-handler", Some("networking"), async move {
		server.run().await;
	});

	net_config.add_request_response_protocol(request_response_config);

	downloader
}
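
/// Build the Polkadot-style [`SyncingStrategy`], registering the state request protocol and,
/// when a warp sync provider is configured, the warp sync request protocol.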
pub fn build_polkadot_syncing_strategy<Block, Client, Net>(
	protocol_id: ProtocolId,
	fork_id: Option<&str>,
	net_config: &mut FullNetworkConfiguration<Block, <Block as BlockT>::Hash, Net>,
	warp_sync_config: Option<WarpSyncConfig<Block>>,
	block_downloader: Arc<dyn BlockDownloader<Block>>,
	client: Arc<Client>,
	spawn_handle: &SpawnTaskHandle,
	metrics_registry: Option<&Registry>,
) -> Result<Box<dyn SyncingStrategy<Block>>, Error>
where
	Block: BlockT,
	Client: HeaderBackend<Block>
		+ BlockBackend<Block>
		+ HeaderMetadata<Block, Error = sp_blockchain::Error>
		+ ProofProvider<Block>
		+ Send
		+ Sync
		+ 'static,
	Net: NetworkBackend<Block, <Block as BlockT>::Hash>,
{
	if warp_sync_config.is_none() && net_config.network_config.sync_mode.is_warp() {
		return Err("Warp sync enabled, but no warp sync provider configured.".into())
	}

	if client.requires_full_sync() {
		match net_config.network_config.sync_mode {
			SyncMode::LightState { .. } =>
				return Err("Fast sync doesn't work for archive nodes".into()),
			SyncMode::Warp => return Err("Warp sync doesn't work for archive nodes".into()),
			SyncMode::Full => {},
		}
	}

	let genesis_hash = client.info().genesis_hash;

	let (state_request_protocol_config, state_request_protocol_name) = {
		let num_peer_hint = net_config.network_config.default_peers_set_num_full as usize +
			net_config.network_config.default_peers_set.reserved_nodes.len();
		let (handler, protocol_config) =
			StateRequestHandler::new::<Net>(&protocol_id, fork_id, client.clone(), num_peer_hint);
		let config_name = protocol_config.protocol_name().clone();

		spawn_handle.spawn("state-request-handler", Some("networking"), handler.run());
		(protocol_config, config_name)
	};
	net_config.add_request_response_protocol(state_request_protocol_config);

	let (warp_sync_protocol_config, warp_sync_protocol_name) = match warp_sync_config.as_ref() {
		Some(WarpSyncConfig::WithProvider(warp_with_provider)) => {
			let (handler, protocol_config) = WarpSyncRequestHandler::new::<_, Net>(
				protocol_id,
				genesis_hash,
				fork_id,
				warp_with_provider.clone(),
			);
			let config_name = protocol_config.protocol_name().clone();

			spawn_handle.spawn("warp-sync-request-handler", Some("networking"), handler.run());
			(Some(protocol_config), Some(config_name))
		},
		_ => (None, None),
	};
	if let Some(config) = warp_sync_protocol_config {
		net_config.add_request_response_protocol(config);
	}

	let syncing_config = PolkadotSyncingStrategyConfig {
		mode: net_config.network_config.sync_mode,
		max_parallel_downloads: net_config.network_config.max_parallel_downloads,
		max_blocks_per_request: net_config.network_config.max_blocks_per_request,
		min_peers_to_start_warp_sync: net_config.network_config.min_peers_to_start_warp_sync,
		metrics_registry: metrics_registry.cloned(),
		state_request_protocol_name,
		block_downloader,
	};
	Ok(Box::new(PolkadotSyncingStrategy::new(
		syncing_config,
		client,
		warp_sync_config,
		warp_sync_protocol_name,
	)?))
}