1use crate::{
20 build_network_future, build_system_rpc_future,
21 client::{Client, ClientConfig},
22 config::{Configuration, ExecutorConfiguration, KeystoreConfig, Multiaddr, PrometheusConfig},
23 error::Error,
24 metrics::MetricsService,
25 start_rpc_servers, BuildGenesisBlock, GenesisBlockBuilder, RpcHandlers,
26 SpawnEssentialTaskHandle, SpawnTaskHandle, TaskManager, TransactionPoolAdapter,
27};
28use futures::{select, FutureExt, StreamExt};
29use jsonrpsee::RpcModule;
30use log::{debug, error, info};
31use prometheus_endpoint::Registry;
32use sc_chain_spec::{get_extension, ChainSpec};
33use sc_client_api::{
34 execution_extensions::ExecutionExtensions, proof_provider::ProofProvider, BadBlocks,
35 BlockBackend, BlockchainEvents, ExecutorProvider, ForkBlocks, KeysIter, StorageProvider,
36 TrieCacheContext, UsageProvider,
37};
38use sc_client_db::{Backend, BlocksPruning, DatabaseSettings, PruningMode};
39use sc_consensus::import_queue::{ImportQueue, ImportQueueService};
40use sc_executor::{
41 sp_wasm_interface::HostFunctions, HeapAllocStrategy, NativeExecutionDispatch, RuntimeVersionOf,
42 WasmExecutor, DEFAULT_HEAP_ALLOC_STRATEGY,
43};
44use sc_keystore::LocalKeystore;
45use sc_network::{
46 config::{FullNetworkConfiguration, IpfsConfig, ProtocolId, SyncMode},
47 multiaddr::Protocol,
48 service::{
49 traits::{PeerStore, RequestResponseConfig},
50 NotificationMetrics,
51 },
52 IpfsIndexedTransactions, NetworkBackend, NetworkStateInfo,
53};
54use sc_network_common::role::{Role, Roles};
55use sc_network_light::light_client_requests::handler::LightClientRequestHandler;
56use sc_network_sync::{
57 block_relay_protocol::{BlockDownloader, BlockRelayParams},
58 block_request_handler::BlockRequestHandler,
59 engine::SyncingEngine,
60 service::network::{NetworkServiceHandle, NetworkServiceProvider},
61 state_request_handler::StateRequestHandler,
62 strategy::{
63 polkadot::{PolkadotSyncingStrategy, PolkadotSyncingStrategyConfig},
64 SyncingStrategy,
65 },
66 warp_request_handler::RequestHandler as WarpSyncRequestHandler,
67 SyncingService, WarpSyncConfig,
68};
69use sc_rpc::{
70 author::AuthorApiServer,
71 chain::ChainApiServer,
72 offchain::OffchainApiServer,
73 state::{ChildStateApiServer, StateApiServer},
74 system::SystemApiServer,
75 DenyUnsafe, SubscriptionTaskExecutor,
76};
77use sc_rpc_spec_v2::{
78 archive::ArchiveApiServer,
79 bitswap::BitswapApiServer,
80 chain_head::ChainHeadApiServer,
81 chain_spec::ChainSpecApiServer,
82 transaction::{TransactionApiServer, TransactionBroadcastApiServer},
83};
84use sc_telemetry::{telemetry, ConnectionMessage, Telemetry, TelemetryHandle, SUBSTRATE_INFO};
85use sc_tracing::block::TracingExecuteBlock;
86use sc_transaction_pool_api::{MaintainedTransactionPool, TransactionPool};
87use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedSender};
88use sp_api::{CallApiAt, ProvideRuntimeApi};
89use sp_blockchain::{HeaderBackend, HeaderMetadata};
90use sp_consensus::block_validation::{
91 BlockAnnounceValidator, Chain, DefaultBlockAnnounceValidator,
92};
93use sp_core::traits::{CodeExecutor, SpawnNamed};
94use sp_keystore::KeystorePtr;
95use sp_runtime::traits::{Block as BlockT, BlockIdTo, NumberFor, Zero};
96use sp_storage::{ChildInfo, ChildType, PrefixedStorageKey};
97use std::{
98 str::FromStr,
99 sync::Arc,
100 time::{Duration, SystemTime},
101};
102
103const IPFS_MAX_BLOCKS: u32 = 201600;
106
107pub type TFullClient<TBl, TRtApi, TExec> =
109 Client<TFullBackend<TBl>, TFullCallExecutor<TBl, TExec>, TBl, TRtApi>;
110
111pub type TFullBackend<TBl> = Backend<TBl>;
113
114pub type TFullCallExecutor<TBl, TExec> = crate::client::LocalCallExecutor<TBl, Backend<TBl>, TExec>;
116
117type TFullParts<TBl, TRtApi, TExec> =
118 (TFullClient<TBl, TRtApi, TExec>, Arc<TFullBackend<TBl>>, KeystoreContainer, TaskManager);
119
120pub struct KeystoreContainer(Arc<LocalKeystore>);
122
123impl KeystoreContainer {
124 pub fn new(config: &KeystoreConfig) -> Result<Self, Error> {
126 let keystore = Arc::new(match config {
127 KeystoreConfig::Path { path, password } => {
128 LocalKeystore::open(path.clone(), password.clone())?
129 },
130 KeystoreConfig::InMemory => LocalKeystore::in_memory(),
131 });
132
133 Ok(Self(keystore))
134 }
135
136 pub fn keystore(&self) -> KeystorePtr {
138 self.0.clone()
139 }
140
141 pub fn local_keystore(&self) -> Arc<LocalKeystore> {
143 self.0.clone()
144 }
145}
146
147pub fn new_full_client<TBl, TRtApi, TExec>(
149 config: &Configuration,
150 telemetry: Option<TelemetryHandle>,
151 executor: TExec,
152 pruning_filters: Vec<Arc<dyn sc_client_db::PruningFilter>>,
153) -> Result<TFullClient<TBl, TRtApi, TExec>, Error>
154where
155 TBl: BlockT,
156 TExec: CodeExecutor + RuntimeVersionOf + Clone,
157{
158 new_full_parts(config, telemetry, executor, pruning_filters).map(|parts| parts.0)
159}
160
161pub fn new_full_parts_record_import<TBl, TRtApi, TExec>(
166 config: &Configuration,
167 telemetry: Option<TelemetryHandle>,
168 executor: TExec,
169 enable_import_proof_recording: bool,
170 pruning_filters: Vec<Arc<dyn sc_client_db::PruningFilter>>,
171) -> Result<TFullParts<TBl, TRtApi, TExec>, Error>
172where
173 TBl: BlockT,
174 TExec: CodeExecutor + RuntimeVersionOf + Clone,
175{
176 let mut db_config = config.db_config();
177 db_config.pruning_filters = pruning_filters;
178 let backend = new_db_backend(db_config)?;
179
180 let genesis_block_builder = GenesisBlockBuilder::new(
181 config.chain_spec.as_storage_builder(),
182 !config.no_genesis(),
183 backend.clone(),
184 executor.clone(),
185 )?;
186
187 new_full_parts_with_genesis_builder(
188 config,
189 telemetry,
190 executor,
191 backend,
192 genesis_block_builder,
193 enable_import_proof_recording,
194 )
195}
196
197pub fn new_full_parts<TBl, TRtApi, TExec>(
202 config: &Configuration,
203 telemetry: Option<TelemetryHandle>,
204 executor: TExec,
205 pruning_filters: Vec<Arc<dyn sc_client_db::PruningFilter>>,
206) -> Result<TFullParts<TBl, TRtApi, TExec>, Error>
207where
208 TBl: BlockT,
209 TExec: CodeExecutor + RuntimeVersionOf + Clone,
210{
211 new_full_parts_record_import(config, telemetry, executor, false, pruning_filters)
212}
213
214pub fn new_full_parts_with_genesis_builder<TBl, TRtApi, TExec, TBuildGenesisBlock>(
216 config: &Configuration,
217 telemetry: Option<TelemetryHandle>,
218 executor: TExec,
219 backend: Arc<TFullBackend<TBl>>,
220 genesis_block_builder: TBuildGenesisBlock,
221 enable_import_proof_recording: bool,
222) -> Result<TFullParts<TBl, TRtApi, TExec>, Error>
223where
224 TBl: BlockT,
225 TExec: CodeExecutor + RuntimeVersionOf + Clone,
226 TBuildGenesisBlock: BuildGenesisBlock<
227 TBl,
228 BlockImportOperation = <Backend<TBl> as sc_client_api::backend::Backend<TBl>>::BlockImportOperation
229 >,
230{
231 let keystore_container = KeystoreContainer::new(&config.keystore)?;
232
233 let task_manager = {
234 let registry = config.prometheus_config.as_ref().map(|cfg| &cfg.registry);
235 TaskManager::new(config.tokio_handle.clone(), registry)?
236 };
237
238 let chain_spec = &config.chain_spec;
239 let fork_blocks = get_extension::<ForkBlocks<TBl>>(chain_spec.extensions())
240 .cloned()
241 .unwrap_or_default();
242
243 let bad_blocks = get_extension::<BadBlocks<TBl>>(chain_spec.extensions())
244 .cloned()
245 .unwrap_or_default();
246
247 let client = {
248 let extensions = ExecutionExtensions::new(None, Arc::new(executor.clone()));
249
250 let wasm_runtime_substitutes = config
251 .chain_spec
252 .code_substitutes()
253 .into_iter()
254 .map(|(n, c)| {
255 let number = NumberFor::<TBl>::from_str(&n).map_err(|_| {
256 Error::Application(Box::from(format!(
257 "Failed to parse `{}` as block number for code substitutes. \
258 In an old version the key for code substitute was a block hash. \
259 Please update the chain spec to a version that is compatible with your node.",
260 n
261 )))
262 })?;
263 Ok((number, c))
264 })
265 .collect::<Result<std::collections::HashMap<_, _>, Error>>()?;
266
267 let client = new_client(
268 backend.clone(),
269 executor,
270 genesis_block_builder,
271 fork_blocks,
272 bad_blocks,
273 extensions,
274 Box::new(task_manager.spawn_handle()),
275 config.prometheus_config.as_ref().map(|config| config.registry.clone()),
276 telemetry,
277 ClientConfig {
278 offchain_worker_enabled: config.offchain_worker.enabled,
279 offchain_indexing_api: config.offchain_worker.indexing_enabled,
280 wasm_runtime_overrides: config.wasm_runtime_overrides.clone(),
281 no_genesis: config.no_genesis(),
282 wasm_runtime_substitutes,
283 enable_import_proof_recording,
284 },
285 )?;
286
287 if let Some(warm_up_strategy) = config.warm_up_trie_cache {
288 let storage_root = client.usage_info().chain.best_hash;
289 let backend_clone = backend.clone();
290
291 if warm_up_strategy.is_blocking() {
292 warm_up_trie_cache(backend_clone, storage_root)?;
295 } else {
296 task_manager.spawn_handle().spawn_blocking(
297 "warm-up-trie-cache",
298 None,
299 async move {
300 if let Err(e) = warm_up_trie_cache(backend_clone, storage_root) {
301 error!("Failed to warm up trie cache: {e}");
302 }
303 },
304 );
305 }
306 }
307
308 client
309 };
310
311 Ok((client, backend, keystore_container, task_manager))
312}
313
314fn child_info(key: Vec<u8>) -> Option<ChildInfo> {
315 let prefixed_key = PrefixedStorageKey::new(key);
316 ChildType::from_prefixed_key(&prefixed_key).and_then(|(child_type, storage_key)| {
317 (child_type == ChildType::ParentKeyId).then(|| ChildInfo::new_default(storage_key))
318 })
319}
320
321fn warm_up_trie_cache<TBl: BlockT>(
322 backend: Arc<TFullBackend<TBl>>,
323 storage_root: TBl::Hash,
324) -> Result<(), Error> {
325 use sc_client_api::backend::Backend;
326 use sp_state_machine::Backend as StateBackend;
327
328 let untrusted_state = || backend.state_at(storage_root, TrieCacheContext::Untrusted);
329 let trusted_state = || backend.state_at(storage_root, TrieCacheContext::Trusted);
330
331 debug!("Populating trie cache started",);
332 let start_time = std::time::Instant::now();
333 let mut keys_count = 0;
334 let mut child_keys_count = 0;
335 for key in KeysIter::<_, TBl>::new(untrusted_state()?, None, None)? {
336 if keys_count != 0 && keys_count % 100_000 == 0 {
337 debug!("{} keys and {} child keys have been warmed", keys_count, child_keys_count);
338 }
339 match child_info(key.0.clone()) {
340 Some(info) => {
341 for child_key in
342 KeysIter::<_, TBl>::new_child(untrusted_state()?, info.clone(), None, None)?
343 {
344 if trusted_state()?
345 .child_storage(&info, &child_key.0)
346 .unwrap_or_default()
347 .is_none()
348 {
349 debug!("Child storage value unexpectedly empty: {child_key:?}");
350 }
351 child_keys_count += 1;
352 }
353 },
354 None => {
355 if trusted_state()?.storage(&key.0).unwrap_or_default().is_none() {
356 debug!("Storage value unexpectedly empty: {key:?}");
357 }
358 keys_count += 1;
359 },
360 }
361 }
362 debug!(
363 "Trie cache populated with {keys_count} keys and {child_keys_count} child keys in {} s",
364 start_time.elapsed().as_secs_f32()
365 );
366
367 Ok(())
368}
369
370#[deprecated(note = "Please switch to `new_wasm_executor`. Will be removed at end of 2024.")]
373#[allow(deprecated)]
374pub fn new_native_or_wasm_executor<D: NativeExecutionDispatch>(
375 config: &Configuration,
376) -> sc_executor::NativeElseWasmExecutor<D> {
377 #[allow(deprecated)]
378 sc_executor::NativeElseWasmExecutor::new_with_wasm_executor(new_wasm_executor(&config.executor))
379}
380
381pub fn new_wasm_executor<H: HostFunctions>(config: &ExecutorConfiguration) -> WasmExecutor<H> {
383 let strategy = config
384 .default_heap_pages
385 .map_or(DEFAULT_HEAP_ALLOC_STRATEGY, |p| HeapAllocStrategy::Static { extra_pages: p as _ });
386 WasmExecutor::<H>::builder()
387 .with_execution_method(config.wasm_method)
388 .with_onchain_heap_alloc_strategy(strategy)
389 .with_offchain_heap_alloc_strategy(strategy)
390 .with_max_runtime_instances(config.max_runtime_instances)
391 .with_runtime_cache_size(config.runtime_cache_size)
392 .build()
393}
394
395pub fn new_db_backend<Block>(
400 settings: DatabaseSettings,
401) -> Result<Arc<Backend<Block>>, sp_blockchain::Error>
402where
403 Block: BlockT,
404{
405 const CANONICALIZATION_DELAY: u64 = 4096;
406
407 Ok(Arc::new(Backend::new(settings, CANONICALIZATION_DELAY)?))
408}
409
410pub fn new_client<E, Block, RA, G>(
412 backend: Arc<Backend<Block>>,
413 executor: E,
414 genesis_block_builder: G,
415 fork_blocks: ForkBlocks<Block>,
416 bad_blocks: BadBlocks<Block>,
417 execution_extensions: ExecutionExtensions<Block>,
418 spawn_handle: Box<dyn SpawnNamed>,
419 prometheus_registry: Option<Registry>,
420 telemetry: Option<TelemetryHandle>,
421 config: ClientConfig<Block>,
422) -> Result<
423 Client<
424 Backend<Block>,
425 crate::client::LocalCallExecutor<Block, Backend<Block>, E>,
426 Block,
427 RA,
428 >,
429 sp_blockchain::Error,
430>
431where
432 Block: BlockT,
433 E: CodeExecutor + RuntimeVersionOf,
434 G: BuildGenesisBlock<
435 Block,
436 BlockImportOperation = <Backend<Block> as sc_client_api::backend::Backend<Block>>::BlockImportOperation
437 >,
438{
439 let executor = crate::client::LocalCallExecutor::new(
440 backend.clone(),
441 executor,
442 config.clone(),
443 execution_extensions,
444 )?;
445
446 Client::new(
447 backend,
448 executor,
449 spawn_handle,
450 genesis_block_builder,
451 fork_blocks,
452 bad_blocks,
453 prometheus_registry,
454 telemetry,
455 config,
456 )
457}
458
459pub struct SpawnTasksParams<'a, TBl: BlockT, TCl, TExPool, TRpc, Backend> {
461 pub config: Configuration,
463 pub client: Arc<TCl>,
465 pub backend: Arc<Backend>,
467 pub task_manager: &'a mut TaskManager,
469 pub keystore: KeystorePtr,
471 pub transaction_pool: Arc<TExPool>,
473 pub rpc_builder: Box<dyn Fn(SubscriptionTaskExecutor) -> Result<RpcModule<TRpc>, Error>>,
475 pub network: Arc<dyn sc_network::service::traits::NetworkService>,
477 pub system_rpc_tx: TracingUnboundedSender<sc_rpc::system::Request<TBl>>,
479 pub tx_handler_controller:
481 sc_network_transactions::TransactionsHandlerController<<TBl as BlockT>::Hash>,
482 pub sync_service: Arc<SyncingService<TBl>>,
484 pub telemetry: Option<&'a mut Telemetry>,
486 pub tracing_execute_block: Option<Arc<dyn TracingExecuteBlock<TBl>>>,
490}
491
492pub fn spawn_tasks<TBl, TBackend, TExPool, TRpc, TCl>(
494 SpawnTasksParams {
495 mut config,
496 task_manager,
497 client,
498 backend,
499 keystore,
500 transaction_pool,
501 rpc_builder,
502 network,
503 system_rpc_tx,
504 tx_handler_controller,
505 sync_service,
506 telemetry,
507 tracing_execute_block: execute_block,
508 }: SpawnTasksParams<TBl, TCl, TExPool, TRpc, TBackend>,
509) -> Result<RpcHandlers, Error>
510where
511 TCl: ProvideRuntimeApi<TBl>
512 + HeaderMetadata<TBl, Error = sp_blockchain::Error>
513 + Chain<TBl>
514 + BlockBackend<TBl>
515 + BlockIdTo<TBl, Error = sp_blockchain::Error>
516 + ProofProvider<TBl>
517 + HeaderBackend<TBl>
518 + BlockchainEvents<TBl>
519 + ExecutorProvider<TBl>
520 + UsageProvider<TBl>
521 + StorageProvider<TBl, TBackend>
522 + CallApiAt<TBl>
523 + Send
524 + 'static,
525 <TCl as ProvideRuntimeApi<TBl>>::Api: sp_api::Metadata<TBl>
526 + sp_transaction_pool::runtime_api::TaggedTransactionQueue<TBl>
527 + sp_session::SessionKeys<TBl>
528 + sp_api::ApiExt<TBl>,
529 TBl: BlockT,
530 TBl::Hash: Unpin,
531 TBl::Header: Unpin,
532 TBackend: 'static + sc_client_api::backend::Backend<TBl> + Send,
533 TExPool: MaintainedTransactionPool<Block = TBl, Hash = <TBl as BlockT>::Hash> + 'static,
534{
535 let chain_info = client.usage_info().chain;
536
537 sp_session::generate_initial_session_keys(
538 client.clone(),
539 chain_info.best_hash,
540 config.dev_key_seed.clone().map(|s| vec![s]).unwrap_or_default(),
541 keystore.clone(),
542 )
543 .map_err(|e| Error::Application(Box::new(e)))?;
544
545 let sysinfo = sc_sysinfo::gather_sysinfo();
546 sc_sysinfo::print_sysinfo(&sysinfo);
547
548 let telemetry = telemetry
549 .map(|telemetry| {
550 init_telemetry(
551 config.network.node_name.clone(),
552 config.impl_name.clone(),
553 config.impl_version.clone(),
554 config.chain_spec.name().to_string(),
555 config.role.is_authority(),
556 network.clone(),
557 client.clone(),
558 telemetry,
559 Some(sysinfo),
560 )
561 })
562 .transpose()?;
563
564 info!("📦 Highest known block at #{}", chain_info.best_number);
565
566 let spawn_handle = task_manager.spawn_handle();
567
568 spawn_handle.spawn(
570 "txpool-notifications",
571 Some("transaction-pool"),
572 sc_transaction_pool::notification_future(client.clone(), transaction_pool.clone()),
573 );
574
575 spawn_handle.spawn(
576 "on-transaction-imported",
577 Some("transaction-pool"),
578 propagate_transaction_notifications(
579 transaction_pool.clone(),
580 tx_handler_controller,
581 telemetry.clone(),
582 ),
583 );
584
585 let metrics_service =
587 if let Some(PrometheusConfig { port, registry }) = config.prometheus_config.clone() {
588 let metrics = MetricsService::with_prometheus(
590 telemetry,
591 ®istry,
592 config.role,
593 &config.network.node_name,
594 &config.impl_version,
595 )?;
596 spawn_handle.spawn(
597 "prometheus-endpoint",
598 None,
599 prometheus_endpoint::init_prometheus(port, registry).map(drop),
600 );
601
602 metrics
603 } else {
604 MetricsService::new(telemetry)
605 };
606
607 spawn_handle.spawn(
609 "telemetry-periodic-send",
610 None,
611 metrics_service.run(
612 client.clone(),
613 transaction_pool.clone(),
614 network.clone(),
615 sync_service.clone(),
616 ),
617 );
618
619 let rpc_id_provider = config.rpc.id_provider.take();
620
621 let rpc_v2_metrics = config
626 .prometheus_registry()
627 .map(|registry| sc_rpc_spec_v2::transaction::TransactionMetrics::new(registry))
628 .transpose()?;
629
630 let rpc_runtime = sc_rpc_server::create_rpc_runtime(config.rpc.max_connections)
633 .map_err(|e| Error::Application(Box::new(e)))?;
634
635 let rpc_spawn_handle: Arc<dyn sp_core::traits::SpawnNamed> =
637 Arc::new(sc_rpc_server::RpcSpawnHandle::new(rpc_runtime.handle().clone()));
638
639 let gen_rpc_module = || {
641 gen_rpc_module(GenRpcModuleParams {
642 spawn_handle: rpc_spawn_handle.clone(),
643 client: client.clone(),
644 transaction_pool: transaction_pool.clone(),
645 keystore: keystore.clone(),
646 system_rpc_tx: system_rpc_tx.clone(),
647 impl_name: config.impl_name.clone(),
648 impl_version: config.impl_version.clone(),
649 chain_spec: config.chain_spec.as_ref(),
650 state_pruning: &config.state_pruning,
651 blocks_pruning: config.blocks_pruning,
652 backend: backend.clone(),
653 rpc_builder: &*rpc_builder,
654 metrics: rpc_v2_metrics.clone(),
655 sync_oracle: sync_service.clone(),
656 tracing_execute_block: execute_block.clone(),
657 })
658 };
659
660 let rpc_api = gen_rpc_module()?;
662
663 let rpc_server_handle = start_rpc_servers(
664 &config.rpc,
665 config.prometheus_registry(),
666 &config.tokio_handle,
667 rpc_api,
668 rpc_runtime,
669 rpc_id_provider,
670 )?;
671
672 let listen_addrs = rpc_server_handle
673 .listen_addrs()
674 .into_iter()
675 .map(|socket_addr| {
676 let mut multiaddr: Multiaddr = socket_addr.ip().into();
677 multiaddr.push(Protocol::Tcp(socket_addr.port()));
678 multiaddr
679 })
680 .collect();
681
682 let in_memory_rpc = {
684 let mut module = gen_rpc_module()?;
685 module.extensions_mut().insert(DenyUnsafe::No);
686 module
687 };
688
689 let in_memory_rpc_handle = RpcHandlers::new(Arc::new(in_memory_rpc), listen_addrs);
690
691 spawn_handle.spawn(
693 "informant",
694 None,
695 sc_informant::build(client.clone(), network, sync_service.clone()),
696 );
697
698 task_manager.keep_alive((config.base_path, rpc_server_handle));
699
700 Ok(in_memory_rpc_handle)
701}
702
703pub async fn propagate_transaction_notifications<Block, ExPool>(
705 transaction_pool: Arc<ExPool>,
706 tx_handler_controller: sc_network_transactions::TransactionsHandlerController<
707 <Block as BlockT>::Hash,
708 >,
709 telemetry: Option<TelemetryHandle>,
710) where
711 Block: BlockT,
712 ExPool: MaintainedTransactionPool<Block = Block, Hash = <Block as BlockT>::Hash>,
713{
714 const TELEMETRY_INTERVAL: Duration = Duration::from_secs(1);
715
716 let mut notifications = transaction_pool.import_notification_stream().fuse();
718 let mut timer = futures_timer::Delay::new(TELEMETRY_INTERVAL).fuse();
719 let mut tx_imported = false;
720
721 loop {
722 select! {
723 notification = notifications.next() => {
724 let Some(hash) = notification else { return };
725
726 tx_handler_controller.propagate_transaction(hash);
727
728 tx_imported = true;
729 },
730 _ = timer => {
731 timer = futures_timer::Delay::new(TELEMETRY_INTERVAL).fuse();
732
733 if !tx_imported {
734 continue;
735 }
736
737 tx_imported = false;
738 let status = transaction_pool.status();
739
740 telemetry!(
741 telemetry;
742 SUBSTRATE_INFO;
743 "txpool.import";
744 "ready" => status.ready,
745 "future" => status.future,
746 );
747 }
748 }
749 }
750}
751
752pub fn init_telemetry<Block, Client, Network>(
754 name: String,
755 implementation: String,
756 version: String,
757 chain: String,
758 authority: bool,
759 network: Network,
760 client: Arc<Client>,
761 telemetry: &mut Telemetry,
762 sysinfo: Option<sc_telemetry::SysInfo>,
763) -> sc_telemetry::Result<TelemetryHandle>
764where
765 Block: BlockT,
766 Client: BlockBackend<Block>,
767 Network: NetworkStateInfo,
768{
769 let genesis_hash = client.block_hash(Zero::zero()).ok().flatten().unwrap_or_default();
770 let connection_message = ConnectionMessage {
771 name,
772 implementation,
773 version,
774 target_os: sc_sysinfo::TARGET_OS.into(),
775 target_arch: sc_sysinfo::TARGET_ARCH.into(),
776 target_env: sc_sysinfo::TARGET_ENV.into(),
777 config: String::new(),
778 chain,
779 genesis_hash: format!("{:?}", genesis_hash),
780 authority,
781 startup_time: SystemTime::UNIX_EPOCH
782 .elapsed()
783 .map(|dur| dur.as_millis())
784 .unwrap_or(0)
785 .to_string(),
786 network_id: network.local_peer_id().to_base58(),
787 sysinfo,
788 };
789
790 telemetry.start_telemetry(connection_message)?;
791
792 Ok(telemetry.handle())
793}
794
795pub struct GenRpcModuleParams<'a, TBl: BlockT, TBackend, TCl, TRpc, TExPool> {
797 pub spawn_handle: Arc<dyn sp_core::traits::SpawnNamed>,
799 pub client: Arc<TCl>,
801 pub transaction_pool: Arc<TExPool>,
803 pub keystore: KeystorePtr,
805 pub system_rpc_tx: TracingUnboundedSender<sc_rpc::system::Request<TBl>>,
807 pub impl_name: String,
809 pub impl_version: String,
811 pub chain_spec: &'a dyn ChainSpec,
813 pub state_pruning: &'a Option<PruningMode>,
815 pub blocks_pruning: BlocksPruning,
817 pub backend: Arc<TBackend>,
819 pub rpc_builder: &'a dyn Fn(SubscriptionTaskExecutor) -> Result<RpcModule<TRpc>, Error>,
821 pub metrics: Option<sc_rpc_spec_v2::transaction::TransactionMetrics>,
823 pub sync_oracle: Arc<dyn sp_consensus::SyncOracle + Send + Sync>,
825 pub tracing_execute_block: Option<Arc<dyn TracingExecuteBlock<TBl>>>,
829}
830
831pub fn gen_rpc_module<TBl, TBackend, TCl, TRpc, TExPool>(
833 GenRpcModuleParams {
834 spawn_handle,
835 client,
836 transaction_pool,
837 keystore,
838 system_rpc_tx,
839 impl_name,
840 impl_version,
841 chain_spec,
842 state_pruning,
843 blocks_pruning,
844 backend,
845 rpc_builder,
846 metrics,
847 sync_oracle,
848 tracing_execute_block: execute_block,
849 }: GenRpcModuleParams<TBl, TBackend, TCl, TRpc, TExPool>,
850) -> Result<RpcModule<()>, Error>
851where
852 TBl: BlockT,
853 TCl: ProvideRuntimeApi<TBl>
854 + BlockchainEvents<TBl>
855 + HeaderBackend<TBl>
856 + HeaderMetadata<TBl, Error = sp_blockchain::Error>
857 + ExecutorProvider<TBl>
858 + CallApiAt<TBl>
859 + ProofProvider<TBl>
860 + StorageProvider<TBl, TBackend>
861 + BlockBackend<TBl>
862 + Send
863 + Sync
864 + 'static,
865 TBackend: sc_client_api::backend::Backend<TBl> + 'static,
866 <TCl as ProvideRuntimeApi<TBl>>::Api: sp_session::SessionKeys<TBl> + sp_api::Metadata<TBl>,
867 TExPool: MaintainedTransactionPool<Block = TBl, Hash = <TBl as BlockT>::Hash> + 'static,
868 TBl::Hash: Unpin,
869 TBl::Header: Unpin,
870{
871 let system_info = sc_rpc::system::SystemInfo {
872 chain_name: chain_spec.name().into(),
873 impl_name,
874 impl_version,
875 properties: chain_spec.properties(),
876 chain_type: chain_spec.chain_type(),
877 };
878
879 let mut rpc_api = RpcModule::new(());
880 let task_executor = spawn_handle;
881
882 let (chain, state, child_state) = {
883 let chain = sc_rpc::chain::new_full(client.clone(), task_executor.clone()).into_rpc();
884 let (state, child_state) =
885 sc_rpc::state::new_full(client.clone(), task_executor.clone(), execute_block);
886 let state = state.into_rpc();
887 let child_state = child_state.into_rpc();
888
889 (chain, state, child_state)
890 };
891
892 const MAX_TRANSACTION_PER_CONNECTION: usize = 16;
893
894 let transaction_broadcast_rpc_v2 = sc_rpc_spec_v2::transaction::TransactionBroadcast::new(
895 client.clone(),
896 transaction_pool.clone(),
897 task_executor.clone(),
898 MAX_TRANSACTION_PER_CONNECTION,
899 )
900 .into_rpc();
901
902 let transaction_v2 = sc_rpc_spec_v2::transaction::Transaction::new(
903 client.clone(),
904 transaction_pool.clone(),
905 task_executor.clone(),
906 metrics,
907 )
908 .into_rpc();
909
910 let chain_head_v2 = sc_rpc_spec_v2::chain_head::ChainHead::new(
911 client.clone(),
912 backend.clone(),
913 task_executor.clone(),
914 sc_rpc_spec_v2::chain_head::ChainHeadConfig::default(),
916 )
917 .into_rpc();
918
919 let is_archive_node = state_pruning.as_ref().map(|sp| sp.is_archive()).unwrap_or(false) &&
924 blocks_pruning.is_archive();
925 let genesis_hash = client.hash(Zero::zero()).ok().flatten().expect("Genesis block exists; qed");
926 if is_archive_node {
927 let archive_v2 = sc_rpc_spec_v2::archive::Archive::new(
928 client.clone(),
929 backend.clone(),
930 genesis_hash,
931 task_executor.clone(),
932 )
933 .into_rpc();
934 rpc_api.merge(archive_v2).map_err(|e| Error::Application(e.into()))?;
935 }
936
937 let chain_spec_v2 = sc_rpc_spec_v2::chain_spec::ChainSpec::new(
939 chain_spec.name().into(),
940 genesis_hash,
941 chain_spec.properties(),
942 )
943 .into_rpc();
944
945 let bitswap_v2 = sc_rpc_spec_v2::bitswap::Bitswap::new(client.clone(), sync_oracle).into_rpc();
947
948 let author = sc_rpc::author::Author::new(
949 client.clone(),
950 transaction_pool,
951 keystore,
952 task_executor.clone(),
953 )
954 .into_rpc();
955
956 let system = sc_rpc::system::System::new(system_info, system_rpc_tx).into_rpc();
957
958 if let Some(storage) = backend.offchain_storage() {
959 let offchain = sc_rpc::offchain::Offchain::new(storage).into_rpc();
960
961 rpc_api.merge(offchain).map_err(|e| Error::Application(e.into()))?;
962 }
963
964 rpc_api.merge(transaction_v2).map_err(|e| Error::Application(e.into()))?;
966 rpc_api
967 .merge(transaction_broadcast_rpc_v2)
968 .map_err(|e| Error::Application(e.into()))?;
969 rpc_api.merge(chain_head_v2).map_err(|e| Error::Application(e.into()))?;
970 rpc_api.merge(chain_spec_v2).map_err(|e| Error::Application(e.into()))?;
971 rpc_api.merge(bitswap_v2).map_err(|e| Error::Application(e.into()))?;
972
973 rpc_api.merge(chain).map_err(|e| Error::Application(e.into()))?;
975 rpc_api.merge(author).map_err(|e| Error::Application(e.into()))?;
976 rpc_api.merge(system).map_err(|e| Error::Application(e.into()))?;
977 rpc_api.merge(state).map_err(|e| Error::Application(e.into()))?;
978 rpc_api.merge(child_state).map_err(|e| Error::Application(e.into()))?;
979 let extra_rpcs = rpc_builder(task_executor.clone())?;
981 rpc_api.merge(extra_rpcs).map_err(|e| Error::Application(e.into()))?;
982
983 Ok(rpc_api)
984}
985
986pub struct BuildNetworkParams<'a, Block, Net, TxPool, IQ, Client>
988where
989 Block: BlockT,
990 Net: NetworkBackend<Block, <Block as BlockT>::Hash>,
991{
992 pub config: &'a Configuration,
994 pub net_config: FullNetworkConfiguration<Block, <Block as BlockT>::Hash, Net>,
996 pub client: Arc<Client>,
998 pub transaction_pool: Arc<TxPool>,
1000 pub spawn_handle: SpawnTaskHandle,
1002 pub spawn_essential_handle: SpawnEssentialTaskHandle,
1004 pub import_queue: IQ,
1006 pub block_announce_validator_builder: Option<
1008 Box<dyn FnOnce(Arc<Client>) -> Box<dyn BlockAnnounceValidator<Block> + Send> + Send>,
1009 >,
1010 pub warp_sync_config: Option<WarpSyncConfig<Block>>,
1012 pub block_relay: Option<BlockRelayParams<Block, Net>>,
1015 pub metrics: NotificationMetrics,
1017}
1018
1019pub fn build_network<Block, Net, TxPool, IQ, Client>(
1021 params: BuildNetworkParams<Block, Net, TxPool, IQ, Client>,
1022) -> Result<
1023 (
1024 Arc<dyn sc_network::service::traits::NetworkService>,
1025 TracingUnboundedSender<sc_rpc::system::Request<Block>>,
1026 sc_network_transactions::TransactionsHandlerController<<Block as BlockT>::Hash>,
1027 Arc<SyncingService<Block>>,
1028 ),
1029 Error,
1030>
1031where
1032 Block: BlockT,
1033 Client: ProvideRuntimeApi<Block>
1034 + HeaderMetadata<Block, Error = sp_blockchain::Error>
1035 + Chain<Block>
1036 + BlockBackend<Block>
1037 + BlockIdTo<Block, Error = sp_blockchain::Error>
1038 + ProofProvider<Block>
1039 + HeaderBackend<Block>
1040 + BlockchainEvents<Block>
1041 + 'static,
1042 TxPool: TransactionPool<Block = Block, Hash = <Block as BlockT>::Hash> + 'static,
1043 IQ: ImportQueue<Block> + 'static,
1044 Net: NetworkBackend<Block, <Block as BlockT>::Hash>,
1045{
1046 let BuildNetworkParams {
1047 config,
1048 mut net_config,
1049 client,
1050 transaction_pool,
1051 spawn_handle,
1052 spawn_essential_handle,
1053 import_queue,
1054 block_announce_validator_builder,
1055 warp_sync_config,
1056 block_relay,
1057 metrics,
1058 } = params;
1059
1060 let block_announce_validator = if let Some(f) = block_announce_validator_builder {
1061 f(client.clone())
1062 } else {
1063 Box::new(DefaultBlockAnnounceValidator)
1064 };
1065
1066 let network_service_provider = NetworkServiceProvider::new();
1067 let protocol_id = config.protocol_id();
1068 let fork_id = config.chain_spec.fork_id();
1069 let metrics_registry = config.prometheus_config.as_ref().map(|config| &config.registry);
1070
1071 let block_downloader = match block_relay {
1072 Some(params) => {
1073 let BlockRelayParams { mut server, downloader, request_response_config } = params;
1074
1075 net_config.add_request_response_protocol(request_response_config);
1076
1077 spawn_handle.spawn("block-request-handler", Some("networking"), async move {
1078 server.run().await;
1079 });
1080
1081 downloader
1082 },
1083 None => build_default_block_downloader(
1084 &protocol_id,
1085 fork_id,
1086 &mut net_config,
1087 network_service_provider.handle(),
1088 Arc::clone(&client),
1089 config.network.default_peers_set.in_peers as usize +
1090 config.network.default_peers_set.out_peers as usize,
1091 &spawn_handle,
1092 ),
1093 };
1094
1095 let syncing_strategy = build_polkadot_syncing_strategy(
1096 protocol_id.clone(),
1097 fork_id,
1098 &mut net_config,
1099 warp_sync_config,
1100 block_downloader,
1101 client.clone(),
1102 &spawn_handle,
1103 metrics_registry,
1104 config.blocks_pruning.is_archive(),
1105 )?;
1106
1107 let (syncing_engine, sync_service, block_announce_config) = SyncingEngine::new(
1108 Roles::from(&config.role),
1109 Arc::clone(&client),
1110 metrics_registry,
1111 metrics.clone(),
1112 &net_config,
1113 protocol_id.clone(),
1114 fork_id,
1115 block_announce_validator,
1116 syncing_strategy,
1117 network_service_provider.handle(),
1118 import_queue.service(),
1119 net_config.peer_store_handle(),
1120 )?;
1121
1122 spawn_handle.spawn_blocking("syncing", None, syncing_engine.run());
1123
1124 build_network_advanced(BuildNetworkAdvancedParams {
1125 role: config.role,
1126 protocol_id,
1127 fork_id,
1128 announce_block: config.announce_block,
1129 net_config,
1130 client,
1131 transaction_pool,
1132 spawn_handle,
1133 spawn_essential_handle,
1134 import_queue,
1135 sync_service,
1136 block_announce_config,
1137 network_service_provider,
1138 metrics_registry,
1139 metrics,
1140 blocks_pruning: config.blocks_pruning,
1141 })
1142}
1143
1144pub struct BuildNetworkAdvancedParams<'a, Block, Net, TxPool, IQ, Client>
1146where
1147 Block: BlockT,
1148 Net: NetworkBackend<Block, <Block as BlockT>::Hash>,
1149{
1150 pub role: Role,
1152 pub protocol_id: ProtocolId,
1154 pub fork_id: Option<&'a str>,
1156 pub announce_block: bool,
1158 pub net_config: FullNetworkConfiguration<Block, <Block as BlockT>::Hash, Net>,
1160 pub client: Arc<Client>,
1162 pub transaction_pool: Arc<TxPool>,
1164 pub spawn_handle: SpawnTaskHandle,
1166 pub spawn_essential_handle: SpawnEssentialTaskHandle,
1168 pub import_queue: IQ,
1170 pub sync_service: SyncingService<Block>,
1172 pub block_announce_config: Net::NotificationProtocolConfig,
1174 pub network_service_provider: NetworkServiceProvider,
1176 pub metrics_registry: Option<&'a Registry>,
1178 pub metrics: NotificationMetrics,
1180 pub blocks_pruning: BlocksPruning,
1182}
1183
1184pub fn build_network_advanced<Block, Net, TxPool, IQ, Client>(
1187 params: BuildNetworkAdvancedParams<Block, Net, TxPool, IQ, Client>,
1188) -> Result<
1189 (
1190 Arc<dyn sc_network::service::traits::NetworkService>,
1191 TracingUnboundedSender<sc_rpc::system::Request<Block>>,
1192 sc_network_transactions::TransactionsHandlerController<<Block as BlockT>::Hash>,
1193 Arc<SyncingService<Block>>,
1194 ),
1195 Error,
1196>
1197where
1198 Block: BlockT,
1199 Client: ProvideRuntimeApi<Block>
1200 + HeaderMetadata<Block, Error = sp_blockchain::Error>
1201 + Chain<Block>
1202 + BlockBackend<Block>
1203 + BlockIdTo<Block, Error = sp_blockchain::Error>
1204 + ProofProvider<Block>
1205 + HeaderBackend<Block>
1206 + BlockchainEvents<Block>
1207 + 'static,
1208 TxPool: TransactionPool<Block = Block, Hash = <Block as BlockT>::Hash> + 'static,
1209 IQ: ImportQueue<Block> + 'static,
1210 Net: NetworkBackend<Block, <Block as BlockT>::Hash>,
1211{
1212 let BuildNetworkAdvancedParams {
1213 role,
1214 protocol_id,
1215 fork_id,
1216 announce_block,
1217 mut net_config,
1218 client,
1219 transaction_pool,
1220 spawn_handle,
1221 spawn_essential_handle,
1222 import_queue,
1223 sync_service,
1224 block_announce_config,
1225 network_service_provider,
1226 metrics_registry,
1227 metrics,
1228 blocks_pruning,
1229 } = params;
1230
1231 let genesis_hash = client.info().genesis_hash;
1232
1233 let light_client_request_protocol_config = {
1234 let (handler, protocol_config) =
1236 LightClientRequestHandler::new::<Net>(&protocol_id, fork_id, client.clone());
1237 spawn_handle.spawn("light-client-request-handler", Some("networking"), handler.run());
1238 protocol_config
1239 };
1240
1241 net_config.add_request_response_protocol(light_client_request_protocol_config);
1243
1244 let ipfs_config = net_config.network_config.ipfs_server.then(|| {
1246 let (handler, bitswap_config) = Net::bitswap_server(client.clone());
1247 spawn_handle.spawn("bitswap-request-handler", Some("networking"), handler);
1248
1249 let ipfs_num_blocks = match blocks_pruning {
1250 BlocksPruning::KeepAll | BlocksPruning::KeepFinalized => IPFS_MAX_BLOCKS,
1251 BlocksPruning::Some(num) => std::cmp::min(num, IPFS_MAX_BLOCKS),
1252 };
1253
1254 IpfsConfig {
1255 bitswap_config,
1256 block_provider: Box::new(IpfsIndexedTransactions::new(client.clone(), ipfs_num_blocks)),
1257 bootnodes: net_config.network_config.ipfs_bootnodes.clone(),
1258 }
1259 });
1260
1261 let (transactions_handler_proto, transactions_config) =
1263 sc_network_transactions::TransactionsHandlerPrototype::new::<_, Block, Net>(
1264 protocol_id.clone(),
1265 genesis_hash,
1266 fork_id,
1267 metrics.clone(),
1268 net_config.peer_store_handle(),
1269 );
1270 net_config.add_notification_protocol(transactions_config);
1271
1272 let peer_store = net_config.take_peer_store();
1274 spawn_handle.spawn("peer-store", Some("networking"), peer_store.run());
1275
1276 let sync_service = Arc::new(sync_service);
1277
1278 let network_params = sc_network::config::Params::<Block, <Block as BlockT>::Hash, Net> {
1279 role,
1280 executor: {
1281 let spawn_handle = Clone::clone(&spawn_handle);
1282 Box::new(move |fut| {
1283 spawn_handle.spawn("libp2p-node", Some("networking"), fut);
1284 })
1285 },
1286 network_config: net_config,
1287 genesis_hash,
1288 protocol_id,
1289 fork_id: fork_id.map(ToOwned::to_owned),
1290 metrics_registry: metrics_registry.cloned(),
1291 block_announce_config,
1292 ipfs_config,
1293 notification_metrics: metrics,
1294 };
1295
1296 let has_bootnodes = !network_params.network_config.network_config.boot_nodes.is_empty();
1297 let network_mut = Net::new(network_params)?;
1298 let network = network_mut.network_service().clone();
1299
1300 let (tx_handler, tx_handler_controller) = transactions_handler_proto.build(
1301 network.clone(),
1302 sync_service.clone(),
1303 Arc::new(TransactionPoolAdapter { pool: transaction_pool, client: client.clone() }),
1304 metrics_registry,
1305 )?;
1306 spawn_handle.spawn_blocking(
1307 "network-transactions-handler",
1308 Some("networking"),
1309 tx_handler.run(),
1310 );
1311
1312 spawn_handle.spawn_blocking(
1313 "chain-sync-network-service-provider",
1314 Some("networking"),
1315 network_service_provider.run(Arc::new(network.clone())),
1316 );
1317 spawn_handle.spawn("import-queue", None, {
1318 let sync_service = sync_service.clone();
1319
1320 async move { import_queue.run(sync_service.as_ref()).await }
1321 });
1322
1323 let (system_rpc_tx, system_rpc_rx) = tracing_unbounded("mpsc_system_rpc", 10_000);
1324 spawn_handle.spawn(
1325 "system-rpc-handler",
1326 Some("networking"),
1327 build_system_rpc_future::<_, _, <Block as BlockT>::Hash>(
1328 role,
1329 network_mut.network_service(),
1330 sync_service.clone(),
1331 client.clone(),
1332 system_rpc_rx,
1333 has_bootnodes,
1334 ),
1335 );
1336
1337 let future = build_network_future::<_, _, <Block as BlockT>::Hash, _>(
1338 network_mut,
1339 client,
1340 sync_service.clone(),
1341 announce_block,
1342 );
1343
1344 spawn_essential_handle.spawn_blocking("network-worker", Some("networking"), future);
1355
1356 Ok((network, system_rpc_tx, tx_handler_controller, sync_service.clone()))
1357}
1358
1359pub struct DefaultSyncingEngineConfig<'a, Block, Client, Net>
1361where
1362 Block: BlockT,
1363 Net: NetworkBackend<Block, <Block as BlockT>::Hash>,
1364{
1365 pub role: Role,
1367 pub protocol_id: ProtocolId,
1369 pub fork_id: Option<&'a str>,
1371 pub net_config: &'a mut FullNetworkConfiguration<Block, <Block as BlockT>::Hash, Net>,
1373 pub block_announce_validator: Box<dyn BlockAnnounceValidator<Block> + Send>,
1375 pub network_service_handle: NetworkServiceHandle,
1377 pub warp_sync_config: Option<WarpSyncConfig<Block>>,
1379 pub client: Arc<Client>,
1381 pub import_queue_service: Box<dyn ImportQueueService<Block>>,
1383 pub num_peers_hint: usize,
1385 pub spawn_handle: &'a SpawnTaskHandle,
1387 pub metrics_registry: Option<&'a Registry>,
1389 pub metrics: NotificationMetrics,
1391 pub archive_blocks: bool,
1394}
1395
1396pub fn build_default_syncing_engine<Block, Client, Net>(
1399 config: DefaultSyncingEngineConfig<Block, Client, Net>,
1400) -> Result<(SyncingService<Block>, Net::NotificationProtocolConfig), Error>
1401where
1402 Block: BlockT,
1403 Client: HeaderBackend<Block>
1404 + BlockBackend<Block>
1405 + HeaderMetadata<Block, Error = sp_blockchain::Error>
1406 + ProofProvider<Block>
1407 + Send
1408 + Sync
1409 + 'static,
1410 Net: NetworkBackend<Block, <Block as BlockT>::Hash>,
1411{
1412 let DefaultSyncingEngineConfig {
1413 role,
1414 protocol_id,
1415 fork_id,
1416 net_config,
1417 block_announce_validator,
1418 network_service_handle,
1419 warp_sync_config,
1420 client,
1421 import_queue_service,
1422 num_peers_hint,
1423 spawn_handle,
1424 metrics_registry,
1425 metrics,
1426 archive_blocks,
1427 } = config;
1428
1429 let block_downloader = build_default_block_downloader(
1430 &protocol_id,
1431 fork_id,
1432 net_config,
1433 network_service_handle.clone(),
1434 client.clone(),
1435 num_peers_hint,
1436 spawn_handle,
1437 );
1438 let syncing_strategy = build_polkadot_syncing_strategy(
1439 protocol_id.clone(),
1440 fork_id,
1441 net_config,
1442 warp_sync_config,
1443 block_downloader,
1444 client.clone(),
1445 spawn_handle,
1446 metrics_registry,
1447 archive_blocks,
1448 )?;
1449
1450 let (syncing_engine, sync_service, block_announce_config) = SyncingEngine::new(
1451 Roles::from(&role),
1452 client,
1453 metrics_registry,
1454 metrics,
1455 &net_config,
1456 protocol_id,
1457 fork_id,
1458 block_announce_validator,
1459 syncing_strategy,
1460 network_service_handle,
1461 import_queue_service,
1462 net_config.peer_store_handle(),
1463 )?;
1464
1465 spawn_handle.spawn_blocking("syncing", None, syncing_engine.run());
1466
1467 Ok((sync_service, block_announce_config))
1468}
1469
1470pub fn build_default_block_downloader<Block, Client, Net>(
1472 protocol_id: &ProtocolId,
1473 fork_id: Option<&str>,
1474 net_config: &mut FullNetworkConfiguration<Block, <Block as BlockT>::Hash, Net>,
1475 network_service_handle: NetworkServiceHandle,
1476 client: Arc<Client>,
1477 num_peers_hint: usize,
1478 spawn_handle: &SpawnTaskHandle,
1479) -> Arc<dyn BlockDownloader<Block>>
1480where
1481 Block: BlockT,
1482 Client: HeaderBackend<Block> + BlockBackend<Block> + Send + Sync + 'static,
1483 Net: NetworkBackend<Block, <Block as BlockT>::Hash>,
1484{
1485 let BlockRelayParams { mut server, downloader, request_response_config } =
1488 BlockRequestHandler::new::<Net>(
1489 network_service_handle,
1490 &protocol_id,
1491 fork_id,
1492 client.clone(),
1493 num_peers_hint,
1494 );
1495
1496 spawn_handle.spawn("block-request-handler", Some("networking"), async move {
1497 server.run().await;
1498 });
1499
1500 net_config.add_request_response_protocol(request_response_config);
1501
1502 downloader
1503}
1504
1505pub fn build_polkadot_syncing_strategy<Block, Client, Net>(
1507 protocol_id: ProtocolId,
1508 fork_id: Option<&str>,
1509 net_config: &mut FullNetworkConfiguration<Block, <Block as BlockT>::Hash, Net>,
1510 warp_sync_config: Option<WarpSyncConfig<Block>>,
1511 block_downloader: Arc<dyn BlockDownloader<Block>>,
1512 client: Arc<Client>,
1513 spawn_handle: &SpawnTaskHandle,
1514 metrics_registry: Option<&Registry>,
1515 archive_blocks: bool,
1516) -> Result<Box<dyn SyncingStrategy<Block>>, Error>
1517where
1518 Block: BlockT,
1519 Client: HeaderBackend<Block>
1520 + BlockBackend<Block>
1521 + HeaderMetadata<Block, Error = sp_blockchain::Error>
1522 + ProofProvider<Block>
1523 + Send
1524 + Sync
1525 + 'static,
1526 Net: NetworkBackend<Block, <Block as BlockT>::Hash>,
1527{
1528 if warp_sync_config.is_none() && net_config.network_config.sync_mode.is_warp() {
1529 return Err("Warp sync enabled, but no warp sync provider configured.".into());
1530 }
1531
1532 if client.requires_full_sync() {
1533 match net_config.network_config.sync_mode {
1534 SyncMode::LightState { .. } => {
1535 return Err("Fast sync doesn't work for archive nodes".into())
1536 },
1537 SyncMode::Warp => return Err("Warp sync doesn't work for archive nodes".into()),
1538 SyncMode::Full => {},
1539 }
1540 }
1541
1542 let genesis_hash = client.info().genesis_hash;
1543
1544 let (state_request_protocol_config, state_request_protocol_name) = {
1545 let num_peer_hint = net_config.network_config.default_peers_set_num_full as usize +
1546 net_config.network_config.default_peers_set.reserved_nodes.len();
1547 let (handler, protocol_config) =
1549 StateRequestHandler::new::<Net>(&protocol_id, fork_id, client.clone(), num_peer_hint);
1550 let config_name = protocol_config.protocol_name().clone();
1551
1552 spawn_handle.spawn("state-request-handler", Some("networking"), handler.run());
1553 (protocol_config, config_name)
1554 };
1555 net_config.add_request_response_protocol(state_request_protocol_config);
1556
1557 let (warp_sync_protocol_config, warp_sync_protocol_name) = match warp_sync_config.as_ref() {
1558 Some(WarpSyncConfig::WithProvider(warp_with_provider)) => {
1559 let (handler, protocol_config) = WarpSyncRequestHandler::new::<_, Net>(
1561 protocol_id,
1562 genesis_hash,
1563 fork_id,
1564 warp_with_provider.clone(),
1565 );
1566 let config_name = protocol_config.protocol_name().clone();
1567
1568 spawn_handle.spawn("warp-sync-request-handler", Some("networking"), handler.run());
1569 (Some(protocol_config), Some(config_name))
1570 },
1571 _ => (None, None),
1572 };
1573 if let Some(config) = warp_sync_protocol_config {
1574 net_config.add_request_response_protocol(config);
1575 }
1576
1577 let syncing_config = PolkadotSyncingStrategyConfig {
1578 mode: net_config.network_config.sync_mode,
1579 max_parallel_downloads: net_config.network_config.max_parallel_downloads,
1580 max_blocks_per_request: net_config.network_config.max_blocks_per_request,
1581 min_peers_to_start_warp_sync: net_config.network_config.min_peers_to_start_warp_sync,
1582 metrics_registry: metrics_registry.cloned(),
1583 state_request_protocol_name,
1584 block_downloader,
1585 archive_blocks,
1586 };
1587 Ok(Box::new(PolkadotSyncingStrategy::new(
1588 syncing_config,
1589 client,
1590 warp_sync_config,
1591 warp_sync_protocol_name,
1592 )?))
1593}