1use crate::{
20 build_network_future, build_system_rpc_future,
21 client::{Client, ClientConfig},
22 config::{Configuration, ExecutorConfiguration, KeystoreConfig, Multiaddr, PrometheusConfig},
23 error::Error,
24 metrics::MetricsService,
25 start_rpc_servers, BuildGenesisBlock, GenesisBlockBuilder, RpcHandlers,
26 SpawnEssentialTaskHandle, SpawnTaskHandle, TaskManager, TransactionPoolAdapter,
27};
28use futures::{select, FutureExt, StreamExt};
29use jsonrpsee::RpcModule;
30use log::{debug, error, info};
31use prometheus_endpoint::Registry;
32use sc_chain_spec::{get_extension, ChainSpec};
33use sc_client_api::{
34 execution_extensions::ExecutionExtensions, proof_provider::ProofProvider, BadBlocks,
35 BlockBackend, BlockchainEvents, ExecutorProvider, ForkBlocks, KeysIter, StorageProvider,
36 TrieCacheContext, UsageProvider,
37};
38use sc_client_db::{Backend, BlocksPruning, DatabaseSettings, PruningMode};
39use sc_consensus::import_queue::{ImportQueue, ImportQueueService};
40use sc_executor::{
41 sp_wasm_interface::HostFunctions, HeapAllocStrategy, NativeExecutionDispatch, RuntimeVersionOf,
42 WasmExecutor, DEFAULT_HEAP_ALLOC_STRATEGY,
43};
44use sc_keystore::LocalKeystore;
45use sc_network::{
46 config::{FullNetworkConfiguration, ProtocolId, SyncMode},
47 multiaddr::Protocol,
48 service::{
49 traits::{PeerStore, RequestResponseConfig},
50 NotificationMetrics,
51 },
52 NetworkBackend, NetworkStateInfo,
53};
54use sc_network_common::role::{Role, Roles};
55use sc_network_light::light_client_requests::handler::LightClientRequestHandler;
56use sc_network_sync::{
57 block_relay_protocol::{BlockDownloader, BlockRelayParams},
58 block_request_handler::BlockRequestHandler,
59 engine::SyncingEngine,
60 service::network::{NetworkServiceHandle, NetworkServiceProvider},
61 state_request_handler::StateRequestHandler,
62 strategy::{
63 polkadot::{PolkadotSyncingStrategy, PolkadotSyncingStrategyConfig},
64 SyncingStrategy,
65 },
66 warp_request_handler::RequestHandler as WarpSyncRequestHandler,
67 SyncingService, WarpSyncConfig,
68};
69use sc_rpc::{
70 author::AuthorApiServer,
71 chain::ChainApiServer,
72 offchain::OffchainApiServer,
73 state::{ChildStateApiServer, StateApiServer},
74 system::SystemApiServer,
75 DenyUnsafe, SubscriptionTaskExecutor,
76};
77use sc_rpc_spec_v2::{
78 archive::ArchiveApiServer,
79 chain_head::ChainHeadApiServer,
80 chain_spec::ChainSpecApiServer,
81 transaction::{TransactionApiServer, TransactionBroadcastApiServer},
82};
83use sc_telemetry::{telemetry, ConnectionMessage, Telemetry, TelemetryHandle, SUBSTRATE_INFO};
84use sc_tracing::block::TracingExecuteBlock;
85use sc_transaction_pool_api::{MaintainedTransactionPool, TransactionPool};
86use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedSender};
87use sp_api::{CallApiAt, ProvideRuntimeApi};
88use sp_blockchain::{HeaderBackend, HeaderMetadata};
89use sp_consensus::block_validation::{
90 BlockAnnounceValidator, Chain, DefaultBlockAnnounceValidator,
91};
92use sp_core::traits::{CodeExecutor, SpawnNamed};
93use sp_keystore::KeystorePtr;
94use sp_runtime::traits::{Block as BlockT, BlockIdTo, NumberFor, Zero};
95use sp_storage::{ChildInfo, ChildType, PrefixedStorageKey};
96use std::{
97 str::FromStr,
98 sync::Arc,
99 time::{Duration, SystemTime},
100};
101
/// Full client type: a [`Client`] parameterised with [`TFullBackend`] and
/// [`TFullCallExecutor`].
pub type TFullClient<TBl, TRtApi, TExec> =
    Client<TFullBackend<TBl>, TFullCallExecutor<TBl, TExec>, TBl, TRtApi>;
105
/// Full client backend type: the database [`Backend`] from `sc_client_db`.
pub type TFullBackend<TBl> = Backend<TBl>;
108
/// Full client call executor type: a `LocalCallExecutor` running on top of the database
/// [`Backend`] with the executor `TExec`.
pub type TFullCallExecutor<TBl, TExec> = crate::client::LocalCallExecutor<TBl, Backend<TBl>, TExec>;
111
/// The parts produced by the `new_full_parts*` functions, in order: the client, the shared
/// backend, the keystore container and the task manager.
type TFullParts<TBl, TRtApi, TExec> =
    (TFullClient<TBl, TRtApi, TExec>, Arc<TFullBackend<TBl>>, KeystoreContainer, TaskManager);
114
/// Container that owns a shared [`LocalKeystore`] and hands out handles to it.
pub struct KeystoreContainer(Arc<LocalKeystore>);
117
118impl KeystoreContainer {
119 pub fn new(config: &KeystoreConfig) -> Result<Self, Error> {
121 let keystore = Arc::new(match config {
122 KeystoreConfig::Path { path, password } => {
123 LocalKeystore::open(path.clone(), password.clone())?
124 },
125 KeystoreConfig::InMemory => LocalKeystore::in_memory(),
126 });
127
128 Ok(Self(keystore))
129 }
130
131 pub fn keystore(&self) -> KeystorePtr {
133 self.0.clone()
134 }
135
136 pub fn local_keystore(&self) -> Arc<LocalKeystore> {
138 self.0.clone()
139 }
140}
141
142pub fn new_full_client<TBl, TRtApi, TExec>(
144 config: &Configuration,
145 telemetry: Option<TelemetryHandle>,
146 executor: TExec,
147 pruning_filters: Vec<Arc<dyn sc_client_db::PruningFilter>>,
148) -> Result<TFullClient<TBl, TRtApi, TExec>, Error>
149where
150 TBl: BlockT,
151 TExec: CodeExecutor + RuntimeVersionOf + Clone,
152{
153 new_full_parts(config, telemetry, executor, pruning_filters).map(|parts| parts.0)
154}
155
156pub fn new_full_parts_record_import<TBl, TRtApi, TExec>(
161 config: &Configuration,
162 telemetry: Option<TelemetryHandle>,
163 executor: TExec,
164 enable_import_proof_recording: bool,
165 pruning_filters: Vec<Arc<dyn sc_client_db::PruningFilter>>,
166) -> Result<TFullParts<TBl, TRtApi, TExec>, Error>
167where
168 TBl: BlockT,
169 TExec: CodeExecutor + RuntimeVersionOf + Clone,
170{
171 let mut db_config = config.db_config();
172 db_config.pruning_filters = pruning_filters;
173 let backend = new_db_backend(db_config)?;
174
175 let genesis_block_builder = GenesisBlockBuilder::new(
176 config.chain_spec.as_storage_builder(),
177 !config.no_genesis(),
178 backend.clone(),
179 executor.clone(),
180 )?;
181
182 new_full_parts_with_genesis_builder(
183 config,
184 telemetry,
185 executor,
186 backend,
187 genesis_block_builder,
188 enable_import_proof_recording,
189 )
190}
191
192pub fn new_full_parts<TBl, TRtApi, TExec>(
197 config: &Configuration,
198 telemetry: Option<TelemetryHandle>,
199 executor: TExec,
200 pruning_filters: Vec<Arc<dyn sc_client_db::PruningFilter>>,
201) -> Result<TFullParts<TBl, TRtApi, TExec>, Error>
202where
203 TBl: BlockT,
204 TExec: CodeExecutor + RuntimeVersionOf + Clone,
205{
206 new_full_parts_record_import(config, telemetry, executor, false, pruning_filters)
207}
208
209pub fn new_full_parts_with_genesis_builder<TBl, TRtApi, TExec, TBuildGenesisBlock>(
211 config: &Configuration,
212 telemetry: Option<TelemetryHandle>,
213 executor: TExec,
214 backend: Arc<TFullBackend<TBl>>,
215 genesis_block_builder: TBuildGenesisBlock,
216 enable_import_proof_recording: bool,
217) -> Result<TFullParts<TBl, TRtApi, TExec>, Error>
218where
219 TBl: BlockT,
220 TExec: CodeExecutor + RuntimeVersionOf + Clone,
221 TBuildGenesisBlock: BuildGenesisBlock<
222 TBl,
223 BlockImportOperation = <Backend<TBl> as sc_client_api::backend::Backend<TBl>>::BlockImportOperation
224 >,
225{
226 let keystore_container = KeystoreContainer::new(&config.keystore)?;
227
228 let task_manager = {
229 let registry = config.prometheus_config.as_ref().map(|cfg| &cfg.registry);
230 TaskManager::new(config.tokio_handle.clone(), registry)?
231 };
232
233 let chain_spec = &config.chain_spec;
234 let fork_blocks = get_extension::<ForkBlocks<TBl>>(chain_spec.extensions())
235 .cloned()
236 .unwrap_or_default();
237
238 let bad_blocks = get_extension::<BadBlocks<TBl>>(chain_spec.extensions())
239 .cloned()
240 .unwrap_or_default();
241
242 let client = {
243 let extensions = ExecutionExtensions::new(None, Arc::new(executor.clone()));
244
245 let wasm_runtime_substitutes = config
246 .chain_spec
247 .code_substitutes()
248 .into_iter()
249 .map(|(n, c)| {
250 let number = NumberFor::<TBl>::from_str(&n).map_err(|_| {
251 Error::Application(Box::from(format!(
252 "Failed to parse `{}` as block number for code substitutes. \
253 In an old version the key for code substitute was a block hash. \
254 Please update the chain spec to a version that is compatible with your node.",
255 n
256 )))
257 })?;
258 Ok((number, c))
259 })
260 .collect::<Result<std::collections::HashMap<_, _>, Error>>()?;
261
262 let client = new_client(
263 backend.clone(),
264 executor,
265 genesis_block_builder,
266 fork_blocks,
267 bad_blocks,
268 extensions,
269 Box::new(task_manager.spawn_handle()),
270 config.prometheus_config.as_ref().map(|config| config.registry.clone()),
271 telemetry,
272 ClientConfig {
273 offchain_worker_enabled: config.offchain_worker.enabled,
274 offchain_indexing_api: config.offchain_worker.indexing_enabled,
275 wasm_runtime_overrides: config.wasm_runtime_overrides.clone(),
276 no_genesis: config.no_genesis(),
277 wasm_runtime_substitutes,
278 enable_import_proof_recording,
279 },
280 )?;
281
282 if let Some(warm_up_strategy) = config.warm_up_trie_cache {
283 let storage_root = client.usage_info().chain.best_hash;
284 let backend_clone = backend.clone();
285
286 if warm_up_strategy.is_blocking() {
287 warm_up_trie_cache(backend_clone, storage_root)?;
290 } else {
291 task_manager.spawn_handle().spawn_blocking(
292 "warm-up-trie-cache",
293 None,
294 async move {
295 if let Err(e) = warm_up_trie_cache(backend_clone, storage_root) {
296 error!("Failed to warm up trie cache: {e}");
297 }
298 },
299 );
300 }
301 }
302
303 client
304 };
305
306 Ok((client, backend, keystore_container, task_manager))
307}
308
309fn child_info(key: Vec<u8>) -> Option<ChildInfo> {
310 let prefixed_key = PrefixedStorageKey::new(key);
311 ChildType::from_prefixed_key(&prefixed_key).and_then(|(child_type, storage_key)| {
312 (child_type == ChildType::ParentKeyId).then(|| ChildInfo::new_default(storage_key))
313 })
314}
315
316fn warm_up_trie_cache<TBl: BlockT>(
317 backend: Arc<TFullBackend<TBl>>,
318 storage_root: TBl::Hash,
319) -> Result<(), Error> {
320 use sc_client_api::backend::Backend;
321 use sp_state_machine::Backend as StateBackend;
322
323 let untrusted_state = || backend.state_at(storage_root, TrieCacheContext::Untrusted);
324 let trusted_state = || backend.state_at(storage_root, TrieCacheContext::Trusted);
325
326 debug!("Populating trie cache started",);
327 let start_time = std::time::Instant::now();
328 let mut keys_count = 0;
329 let mut child_keys_count = 0;
330 for key in KeysIter::<_, TBl>::new(untrusted_state()?, None, None)? {
331 if keys_count != 0 && keys_count % 100_000 == 0 {
332 debug!("{} keys and {} child keys have been warmed", keys_count, child_keys_count);
333 }
334 match child_info(key.0.clone()) {
335 Some(info) => {
336 for child_key in
337 KeysIter::<_, TBl>::new_child(untrusted_state()?, info.clone(), None, None)?
338 {
339 if trusted_state()?
340 .child_storage(&info, &child_key.0)
341 .unwrap_or_default()
342 .is_none()
343 {
344 debug!("Child storage value unexpectedly empty: {child_key:?}");
345 }
346 child_keys_count += 1;
347 }
348 },
349 None => {
350 if trusted_state()?.storage(&key.0).unwrap_or_default().is_none() {
351 debug!("Storage value unexpectedly empty: {key:?}");
352 }
353 keys_count += 1;
354 },
355 }
356 }
357 debug!(
358 "Trie cache populated with {keys_count} keys and {child_keys_count} child keys in {} s",
359 start_time.elapsed().as_secs_f32()
360 );
361
362 Ok(())
363}
364
365#[deprecated(note = "Please switch to `new_wasm_executor`. Will be removed at end of 2024.")]
368#[allow(deprecated)]
369pub fn new_native_or_wasm_executor<D: NativeExecutionDispatch>(
370 config: &Configuration,
371) -> sc_executor::NativeElseWasmExecutor<D> {
372 #[allow(deprecated)]
373 sc_executor::NativeElseWasmExecutor::new_with_wasm_executor(new_wasm_executor(&config.executor))
374}
375
376pub fn new_wasm_executor<H: HostFunctions>(config: &ExecutorConfiguration) -> WasmExecutor<H> {
378 let strategy = config
379 .default_heap_pages
380 .map_or(DEFAULT_HEAP_ALLOC_STRATEGY, |p| HeapAllocStrategy::Static { extra_pages: p as _ });
381 WasmExecutor::<H>::builder()
382 .with_execution_method(config.wasm_method)
383 .with_onchain_heap_alloc_strategy(strategy)
384 .with_offchain_heap_alloc_strategy(strategy)
385 .with_max_runtime_instances(config.max_runtime_instances)
386 .with_runtime_cache_size(config.runtime_cache_size)
387 .build()
388}
389
390pub fn new_db_backend<Block>(
395 settings: DatabaseSettings,
396) -> Result<Arc<Backend<Block>>, sp_blockchain::Error>
397where
398 Block: BlockT,
399{
400 const CANONICALIZATION_DELAY: u64 = 4096;
401
402 Ok(Arc::new(Backend::new(settings, CANONICALIZATION_DELAY)?))
403}
404
405pub fn new_client<E, Block, RA, G>(
407 backend: Arc<Backend<Block>>,
408 executor: E,
409 genesis_block_builder: G,
410 fork_blocks: ForkBlocks<Block>,
411 bad_blocks: BadBlocks<Block>,
412 execution_extensions: ExecutionExtensions<Block>,
413 spawn_handle: Box<dyn SpawnNamed>,
414 prometheus_registry: Option<Registry>,
415 telemetry: Option<TelemetryHandle>,
416 config: ClientConfig<Block>,
417) -> Result<
418 Client<
419 Backend<Block>,
420 crate::client::LocalCallExecutor<Block, Backend<Block>, E>,
421 Block,
422 RA,
423 >,
424 sp_blockchain::Error,
425>
426where
427 Block: BlockT,
428 E: CodeExecutor + RuntimeVersionOf,
429 G: BuildGenesisBlock<
430 Block,
431 BlockImportOperation = <Backend<Block> as sc_client_api::backend::Backend<Block>>::BlockImportOperation
432 >,
433{
434 let executor = crate::client::LocalCallExecutor::new(
435 backend.clone(),
436 executor,
437 config.clone(),
438 execution_extensions,
439 )?;
440
441 Client::new(
442 backend,
443 executor,
444 spawn_handle,
445 genesis_block_builder,
446 fork_blocks,
447 bad_blocks,
448 prometheus_registry,
449 telemetry,
450 config,
451 )
452}
453
/// Parameters to pass into [`spawn_tasks`].
pub struct SpawnTasksParams<'a, TBl: BlockT, TCl, TExPool, TRpc, Backend> {
    /// The service configuration.
    pub config: Configuration,
    /// Shared handle to the client.
    pub client: Arc<TCl>,
    /// Shared handle to the client backend.
    pub backend: Arc<Backend>,
    /// Task manager whose spawn handle is used to spawn the tasks.
    pub task_manager: &'a mut TaskManager,
    /// Keystore handle; used when generating the initial session keys and by the RPC module.
    pub keystore: KeystorePtr,
    /// Shared handle to the transaction pool.
    pub transaction_pool: Arc<TExPool>,
    /// Builds the additional RPC methods that are merged into the generated RPC module.
    pub rpc_builder: Box<dyn Fn(SubscriptionTaskExecutor) -> Result<RpcModule<TRpc>, Error>>,
    /// Shared handle to the network service.
    pub network: Arc<dyn sc_network::service::traits::NetworkService>,
    /// Sender for system RPC requests; handed to the `system` RPC API.
    pub system_rpc_tx: TracingUnboundedSender<sc_rpc::system::Request<TBl>>,
    /// Controller used to propagate imported transactions.
    pub tx_handler_controller:
        sc_network_transactions::TransactionsHandlerController<<TBl as BlockT>::Hash>,
    /// Shared handle to the syncing service.
    pub sync_service: Arc<SyncingService<TBl>>,
    /// Telemetry instance; telemetry is only initialised when this is `Some`.
    pub telemetry: Option<&'a mut Telemetry>,
    /// Optional [`TracingExecuteBlock`] implementation forwarded to the state RPC API.
    pub tracing_execute_block: Option<Arc<dyn TracingExecuteBlock<TBl>>>,
}
486
487pub fn spawn_tasks<TBl, TBackend, TExPool, TRpc, TCl>(
489 SpawnTasksParams {
490 mut config,
491 task_manager,
492 client,
493 backend,
494 keystore,
495 transaction_pool,
496 rpc_builder,
497 network,
498 system_rpc_tx,
499 tx_handler_controller,
500 sync_service,
501 telemetry,
502 tracing_execute_block: execute_block,
503 }: SpawnTasksParams<TBl, TCl, TExPool, TRpc, TBackend>,
504) -> Result<RpcHandlers, Error>
505where
506 TCl: ProvideRuntimeApi<TBl>
507 + HeaderMetadata<TBl, Error = sp_blockchain::Error>
508 + Chain<TBl>
509 + BlockBackend<TBl>
510 + BlockIdTo<TBl, Error = sp_blockchain::Error>
511 + ProofProvider<TBl>
512 + HeaderBackend<TBl>
513 + BlockchainEvents<TBl>
514 + ExecutorProvider<TBl>
515 + UsageProvider<TBl>
516 + StorageProvider<TBl, TBackend>
517 + CallApiAt<TBl>
518 + Send
519 + 'static,
520 <TCl as ProvideRuntimeApi<TBl>>::Api: sp_api::Metadata<TBl>
521 + sp_transaction_pool::runtime_api::TaggedTransactionQueue<TBl>
522 + sp_session::SessionKeys<TBl>
523 + sp_api::ApiExt<TBl>,
524 TBl: BlockT,
525 TBl::Hash: Unpin,
526 TBl::Header: Unpin,
527 TBackend: 'static + sc_client_api::backend::Backend<TBl> + Send,
528 TExPool: MaintainedTransactionPool<Block = TBl, Hash = <TBl as BlockT>::Hash> + 'static,
529{
530 let chain_info = client.usage_info().chain;
531
532 sp_session::generate_initial_session_keys(
533 client.clone(),
534 chain_info.best_hash,
535 config.dev_key_seed.clone().map(|s| vec![s]).unwrap_or_default(),
536 keystore.clone(),
537 )
538 .map_err(|e| Error::Application(Box::new(e)))?;
539
540 let sysinfo = sc_sysinfo::gather_sysinfo();
541 sc_sysinfo::print_sysinfo(&sysinfo);
542
543 let telemetry = telemetry
544 .map(|telemetry| {
545 init_telemetry(
546 config.network.node_name.clone(),
547 config.impl_name.clone(),
548 config.impl_version.clone(),
549 config.chain_spec.name().to_string(),
550 config.role.is_authority(),
551 network.clone(),
552 client.clone(),
553 telemetry,
554 Some(sysinfo),
555 )
556 })
557 .transpose()?;
558
559 info!("📦 Highest known block at #{}", chain_info.best_number);
560
561 let spawn_handle = task_manager.spawn_handle();
562
563 spawn_handle.spawn(
565 "txpool-notifications",
566 Some("transaction-pool"),
567 sc_transaction_pool::notification_future(client.clone(), transaction_pool.clone()),
568 );
569
570 spawn_handle.spawn(
571 "on-transaction-imported",
572 Some("transaction-pool"),
573 propagate_transaction_notifications(
574 transaction_pool.clone(),
575 tx_handler_controller,
576 telemetry.clone(),
577 ),
578 );
579
580 let metrics_service =
582 if let Some(PrometheusConfig { port, registry }) = config.prometheus_config.clone() {
583 let metrics = MetricsService::with_prometheus(
585 telemetry,
586 ®istry,
587 config.role,
588 &config.network.node_name,
589 &config.impl_version,
590 )?;
591 spawn_handle.spawn(
592 "prometheus-endpoint",
593 None,
594 prometheus_endpoint::init_prometheus(port, registry).map(drop),
595 );
596
597 metrics
598 } else {
599 MetricsService::new(telemetry)
600 };
601
602 spawn_handle.spawn(
604 "telemetry-periodic-send",
605 None,
606 metrics_service.run(
607 client.clone(),
608 transaction_pool.clone(),
609 network.clone(),
610 sync_service.clone(),
611 ),
612 );
613
614 let rpc_id_provider = config.rpc.id_provider.take();
615
616 let rpc_v2_metrics = config
621 .prometheus_registry()
622 .map(|registry| sc_rpc_spec_v2::transaction::TransactionMetrics::new(registry))
623 .transpose()?;
624
625 let gen_rpc_module = || {
626 gen_rpc_module(GenRpcModuleParams {
627 spawn_handle: task_manager.spawn_handle(),
628 client: client.clone(),
629 transaction_pool: transaction_pool.clone(),
630 keystore: keystore.clone(),
631 system_rpc_tx: system_rpc_tx.clone(),
632 impl_name: config.impl_name.clone(),
633 impl_version: config.impl_version.clone(),
634 chain_spec: config.chain_spec.as_ref(),
635 state_pruning: &config.state_pruning,
636 blocks_pruning: config.blocks_pruning,
637 backend: backend.clone(),
638 rpc_builder: &*rpc_builder,
639 metrics: rpc_v2_metrics.clone(),
640 tracing_execute_block: execute_block.clone(),
641 })
642 };
643
644 let rpc_server_handle = start_rpc_servers(
645 &config.rpc,
646 config.prometheus_registry(),
647 &config.tokio_handle,
648 gen_rpc_module,
649 rpc_id_provider,
650 )?;
651
652 let listen_addrs = rpc_server_handle
653 .listen_addrs()
654 .into_iter()
655 .map(|socket_addr| {
656 let mut multiaddr: Multiaddr = socket_addr.ip().into();
657 multiaddr.push(Protocol::Tcp(socket_addr.port()));
658 multiaddr
659 })
660 .collect();
661
662 let in_memory_rpc = {
663 let mut module = gen_rpc_module()?;
664 module.extensions_mut().insert(DenyUnsafe::No);
665 module
666 };
667
668 let in_memory_rpc_handle = RpcHandlers::new(Arc::new(in_memory_rpc), listen_addrs);
669
670 spawn_handle.spawn(
672 "informant",
673 None,
674 sc_informant::build(client.clone(), network, sync_service.clone()),
675 );
676
677 task_manager.keep_alive((config.base_path, rpc_server_handle));
678
679 Ok(in_memory_rpc_handle)
680}
681
682pub async fn propagate_transaction_notifications<Block, ExPool>(
684 transaction_pool: Arc<ExPool>,
685 tx_handler_controller: sc_network_transactions::TransactionsHandlerController<
686 <Block as BlockT>::Hash,
687 >,
688 telemetry: Option<TelemetryHandle>,
689) where
690 Block: BlockT,
691 ExPool: MaintainedTransactionPool<Block = Block, Hash = <Block as BlockT>::Hash>,
692{
693 const TELEMETRY_INTERVAL: Duration = Duration::from_secs(1);
694
695 let mut notifications = transaction_pool.import_notification_stream().fuse();
697 let mut timer = futures_timer::Delay::new(TELEMETRY_INTERVAL).fuse();
698 let mut tx_imported = false;
699
700 loop {
701 select! {
702 notification = notifications.next() => {
703 let Some(hash) = notification else { return };
704
705 tx_handler_controller.propagate_transaction(hash);
706
707 tx_imported = true;
708 },
709 _ = timer => {
710 timer = futures_timer::Delay::new(TELEMETRY_INTERVAL).fuse();
711
712 if !tx_imported {
713 continue;
714 }
715
716 tx_imported = false;
717 let status = transaction_pool.status();
718
719 telemetry!(
720 telemetry;
721 SUBSTRATE_INFO;
722 "txpool.import";
723 "ready" => status.ready,
724 "future" => status.future,
725 );
726 }
727 }
728 }
729}
730
731pub fn init_telemetry<Block, Client, Network>(
733 name: String,
734 implementation: String,
735 version: String,
736 chain: String,
737 authority: bool,
738 network: Network,
739 client: Arc<Client>,
740 telemetry: &mut Telemetry,
741 sysinfo: Option<sc_telemetry::SysInfo>,
742) -> sc_telemetry::Result<TelemetryHandle>
743where
744 Block: BlockT,
745 Client: BlockBackend<Block>,
746 Network: NetworkStateInfo,
747{
748 let genesis_hash = client.block_hash(Zero::zero()).ok().flatten().unwrap_or_default();
749 let connection_message = ConnectionMessage {
750 name,
751 implementation,
752 version,
753 target_os: sc_sysinfo::TARGET_OS.into(),
754 target_arch: sc_sysinfo::TARGET_ARCH.into(),
755 target_env: sc_sysinfo::TARGET_ENV.into(),
756 config: String::new(),
757 chain,
758 genesis_hash: format!("{:?}", genesis_hash),
759 authority,
760 startup_time: SystemTime::UNIX_EPOCH
761 .elapsed()
762 .map(|dur| dur.as_millis())
763 .unwrap_or(0)
764 .to_string(),
765 network_id: network.local_peer_id().to_base58(),
766 sysinfo,
767 };
768
769 telemetry.start_telemetry(connection_message)?;
770
771 Ok(telemetry.handle())
772}
773
/// Parameters to pass into [`gen_rpc_module`].
pub struct GenRpcModuleParams<'a, TBl: BlockT, TBackend, TCl, TRpc, TExPool> {
    /// The handle used to spawn RPC tasks; it becomes the task executor of the RPC APIs.
    pub spawn_handle: SpawnTaskHandle,
    /// Shared handle to the client.
    pub client: Arc<TCl>,
    /// Shared handle to the transaction pool.
    pub transaction_pool: Arc<TExPool>,
    /// Keystore handle, handed to the `author` RPC API.
    pub keystore: KeystorePtr,
    /// Sender for system RPC requests, handed to the `system` RPC API.
    pub system_rpc_tx: TracingUnboundedSender<sc_rpc::system::Request<TBl>>,
    /// Implementation name reported through the system info.
    pub impl_name: String,
    /// Implementation version reported through the system info.
    pub impl_version: String,
    /// The chain spec; provides the chain name, properties and chain type.
    pub chain_spec: &'a dyn ChainSpec,
    /// State pruning mode; together with `blocks_pruning` it decides whether the archive
    /// RPC API is enabled.
    pub state_pruning: &'a Option<PruningMode>,
    /// Blocks pruning mode; together with `state_pruning` it decides whether the archive
    /// RPC API is enabled.
    pub blocks_pruning: BlocksPruning,
    /// Shared handle to the client backend.
    pub backend: Arc<TBackend>,
    /// Builds the additional RPC methods, which are merged into the module last.
    pub rpc_builder: &'a dyn Fn(SubscriptionTaskExecutor) -> Result<RpcModule<TRpc>, Error>,
    /// Optional transaction metrics handed to the `transaction` v2 RPC API.
    pub metrics: Option<sc_rpc_spec_v2::transaction::TransactionMetrics>,
    /// Optional [`TracingExecuteBlock`] implementation forwarded to the state RPC API.
    pub tracing_execute_block: Option<Arc<dyn TracingExecuteBlock<TBl>>>,
}
807
/// Generate the RPC module that exposes all RPC APIs of the node.
///
/// The returned module contains the spec-v2 APIs (`transaction`, transaction broadcast,
/// `chainHead`, `chainSpec`, and `archive` when both state and blocks pruning are in archive
/// mode), the legacy APIs (`chain`, `author`, `system`, `state`, child state, and `offchain`
/// when the backend provides offchain storage), and finally the methods produced by
/// `rpc_builder`.
///
/// # Errors
///
/// Returns an error when `rpc_builder` fails or when merging any API into the module fails.
///
/// # Panics
///
/// Panics when the genesis block hash cannot be obtained from the client.
pub fn gen_rpc_module<TBl, TBackend, TCl, TRpc, TExPool>(
    GenRpcModuleParams {
        spawn_handle,
        client,
        transaction_pool,
        keystore,
        system_rpc_tx,
        impl_name,
        impl_version,
        chain_spec,
        state_pruning,
        blocks_pruning,
        backend,
        rpc_builder,
        metrics,
        tracing_execute_block: execute_block,
    }: GenRpcModuleParams<TBl, TBackend, TCl, TRpc, TExPool>,
) -> Result<RpcModule<()>, Error>
where
    TBl: BlockT,
    TCl: ProvideRuntimeApi<TBl>
        + BlockchainEvents<TBl>
        + HeaderBackend<TBl>
        + HeaderMetadata<TBl, Error = sp_blockchain::Error>
        + ExecutorProvider<TBl>
        + CallApiAt<TBl>
        + ProofProvider<TBl>
        + StorageProvider<TBl, TBackend>
        + BlockBackend<TBl>
        + Send
        + Sync
        + 'static,
    TBackend: sc_client_api::backend::Backend<TBl> + 'static,
    <TCl as ProvideRuntimeApi<TBl>>::Api: sp_session::SessionKeys<TBl> + sp_api::Metadata<TBl>,
    TExPool: MaintainedTransactionPool<Block = TBl, Hash = <TBl as BlockT>::Hash> + 'static,
    TBl::Hash: Unpin,
    TBl::Header: Unpin,
{
    let system_info = sc_rpc::system::SystemInfo {
        chain_name: chain_spec.name().into(),
        impl_name,
        impl_version,
        properties: chain_spec.properties(),
        chain_type: chain_spec.chain_type(),
    };

    let mut rpc_api = RpcModule::new(());
    // Shared by every RPC API below as its task executor.
    let task_executor = Arc::new(spawn_handle);

    // Legacy `chain`, `state` and child state APIs.
    let (chain, state, child_state) = {
        let chain = sc_rpc::chain::new_full(client.clone(), task_executor.clone()).into_rpc();
        let (state, child_state) =
            sc_rpc::state::new_full(client.clone(), task_executor.clone(), execute_block);
        let state = state.into_rpc();
        let child_state = child_state.into_rpc();

        (chain, state, child_state)
    };

    // Limit handed to the transaction broadcast API.
    const MAX_TRANSACTION_PER_CONNECTION: usize = 16;

    let transaction_broadcast_rpc_v2 = sc_rpc_spec_v2::transaction::TransactionBroadcast::new(
        client.clone(),
        transaction_pool.clone(),
        task_executor.clone(),
        MAX_TRANSACTION_PER_CONNECTION,
    )
    .into_rpc();

    let transaction_v2 = sc_rpc_spec_v2::transaction::Transaction::new(
        client.clone(),
        transaction_pool.clone(),
        task_executor.clone(),
        metrics,
    )
    .into_rpc();

    // `chainHead` is created with its default configuration.
    let chain_head_v2 = sc_rpc_spec_v2::chain_head::ChainHead::new(
        client.clone(),
        backend.clone(),
        task_executor.clone(),
        sc_rpc_spec_v2::chain_head::ChainHeadConfig::default(),
    )
    .into_rpc();

    // The archive API is only exposed when both the state and the blocks are kept in
    // archive mode; an unset state pruning mode counts as non-archive.
    let is_archive_node = state_pruning.as_ref().map(|sp| sp.is_archive()).unwrap_or(false) &&
        blocks_pruning.is_archive();
    let genesis_hash = client.hash(Zero::zero()).ok().flatten().expect("Genesis block exists; qed");
    if is_archive_node {
        let archive_v2 = sc_rpc_spec_v2::archive::Archive::new(
            client.clone(),
            backend.clone(),
            genesis_hash,
            task_executor.clone(),
        )
        .into_rpc();
        rpc_api.merge(archive_v2).map_err(|e| Error::Application(e.into()))?;
    }

    let chain_spec_v2 = sc_rpc_spec_v2::chain_spec::ChainSpec::new(
        chain_spec.name().into(),
        genesis_hash,
        chain_spec.properties(),
    )
    .into_rpc();

    let author = sc_rpc::author::Author::new(
        client.clone(),
        transaction_pool,
        keystore,
        task_executor.clone(),
    )
    .into_rpc();

    let system = sc_rpc::system::System::new(system_info, system_rpc_tx).into_rpc();

    // The offchain API is only exposed when the backend provides offchain storage.
    if let Some(storage) = backend.offchain_storage() {
        let offchain = sc_rpc::offchain::Offchain::new(storage).into_rpc();

        rpc_api.merge(offchain).map_err(|e| Error::Application(e.into()))?;
    }

    // Spec-v2 APIs.
    rpc_api.merge(transaction_v2).map_err(|e| Error::Application(e.into()))?;
    rpc_api
        .merge(transaction_broadcast_rpc_v2)
        .map_err(|e| Error::Application(e.into()))?;
    rpc_api.merge(chain_head_v2).map_err(|e| Error::Application(e.into()))?;
    rpc_api.merge(chain_spec_v2).map_err(|e| Error::Application(e.into()))?;

    // Legacy APIs.
    rpc_api.merge(chain).map_err(|e| Error::Application(e.into()))?;
    rpc_api.merge(author).map_err(|e| Error::Application(e.into()))?;
    rpc_api.merge(system).map_err(|e| Error::Application(e.into()))?;
    rpc_api.merge(state).map_err(|e| Error::Application(e.into()))?;
    rpc_api.merge(child_state).map_err(|e| Error::Application(e.into()))?;
    // Additional methods supplied by the caller are merged last.
    let extra_rpcs = rpc_builder(task_executor.clone())?;
    rpc_api.merge(extra_rpcs).map_err(|e| Error::Application(e.into()))?;

    Ok(rpc_api)
}
957
/// Parameters to pass into [`build_network`].
pub struct BuildNetworkParams<'a, Block, Net, TxPool, IQ, Client>
where
    Block: BlockT,
    Net: NetworkBackend<Block, <Block as BlockT>::Hash>,
{
    /// The service configuration.
    pub config: &'a Configuration,
    /// Full network configuration; request-response protocols are registered on it.
    pub net_config: FullNetworkConfiguration<Block, <Block as BlockT>::Hash, Net>,
    /// Shared handle to the client.
    pub client: Arc<Client>,
    /// Shared handle to the transaction pool.
    pub transaction_pool: Arc<TxPool>,
    /// Handle used to spawn the networking tasks.
    pub spawn_handle: SpawnTaskHandle,
    /// Handle for spawning essential tasks; forwarded to [`build_network_advanced`].
    pub spawn_essential_handle: SpawnEssentialTaskHandle,
    /// The import queue.
    pub import_queue: IQ,
    /// Optional factory for the block announce validator; when `None`,
    /// [`DefaultBlockAnnounceValidator`] is used.
    pub block_announce_validator_builder: Option<
        Box<dyn FnOnce(Arc<Client>) -> Box<dyn BlockAnnounceValidator<Block> + Send> + Send>,
    >,
    /// Optional warp sync configuration, forwarded to the syncing strategy builder.
    pub warp_sync_config: Option<WarpSyncConfig<Block>>,
    /// Optional custom block relay; when `None`, the default block downloader is built.
    pub block_relay: Option<BlockRelayParams<Block, Net>>,
    /// Notification metrics handed to the syncing engine and the network.
    pub metrics: NotificationMetrics,
}
990
/// Build the network service, the network status sinks and an RPC sender.
///
/// Sets up the block announce validator, the block downloader (custom block relay or the
/// default one), the Polkadot syncing strategy and the syncing engine, spawns the syncing
/// engine, and then delegates the rest of the setup to [`build_network_advanced`].
///
/// Returns the network service, the sender for system RPC requests, the transactions handler
/// controller and the syncing service.
///
/// # Errors
///
/// Returns an error when building the syncing strategy or the syncing engine fails, or when
/// [`build_network_advanced`] fails.
pub fn build_network<Block, Net, TxPool, IQ, Client>(
    params: BuildNetworkParams<Block, Net, TxPool, IQ, Client>,
) -> Result<
    (
        Arc<dyn sc_network::service::traits::NetworkService>,
        TracingUnboundedSender<sc_rpc::system::Request<Block>>,
        sc_network_transactions::TransactionsHandlerController<<Block as BlockT>::Hash>,
        Arc<SyncingService<Block>>,
    ),
    Error,
>
where
    Block: BlockT,
    Client: ProvideRuntimeApi<Block>
        + HeaderMetadata<Block, Error = sp_blockchain::Error>
        + Chain<Block>
        + BlockBackend<Block>
        + BlockIdTo<Block, Error = sp_blockchain::Error>
        + ProofProvider<Block>
        + HeaderBackend<Block>
        + BlockchainEvents<Block>
        + 'static,
    TxPool: TransactionPool<Block = Block, Hash = <Block as BlockT>::Hash> + 'static,
    IQ: ImportQueue<Block> + 'static,
    Net: NetworkBackend<Block, <Block as BlockT>::Hash>,
{
    let BuildNetworkParams {
        config,
        mut net_config,
        client,
        transaction_pool,
        spawn_handle,
        spawn_essential_handle,
        import_queue,
        block_announce_validator_builder,
        warp_sync_config,
        block_relay,
        metrics,
    } = params;

    // Use the caller-provided validator factory, falling back to the default validator.
    let block_announce_validator = if let Some(f) = block_announce_validator_builder {
        f(client.clone())
    } else {
        Box::new(DefaultBlockAnnounceValidator)
    };

    let network_service_provider = NetworkServiceProvider::new();
    let protocol_id = config.protocol_id();
    let fork_id = config.chain_spec.fork_id();
    let metrics_registry = config.prometheus_config.as_ref().map(|config| &config.registry);

    let block_downloader = match block_relay {
        // Custom block relay: register its protocol and run its server on a spawned task.
        Some(params) => {
            let BlockRelayParams { mut server, downloader, request_response_config } = params;

            net_config.add_request_response_protocol(request_response_config);

            spawn_handle.spawn("block-request-handler", Some("networking"), async move {
                server.run().await;
            });

            downloader
        },
        // No custom relay: build the default downloader, passing the sum of the configured
        // inbound and outbound peer counts of the default peer set.
        None => build_default_block_downloader(
            &protocol_id,
            fork_id,
            &mut net_config,
            network_service_provider.handle(),
            Arc::clone(&client),
            config.network.default_peers_set.in_peers as usize +
                config.network.default_peers_set.out_peers as usize,
            &spawn_handle,
        ),
    };

    let syncing_strategy = build_polkadot_syncing_strategy(
        protocol_id.clone(),
        fork_id,
        &mut net_config,
        warp_sync_config,
        block_downloader,
        client.clone(),
        &spawn_handle,
        metrics_registry,
        config.blocks_pruning.is_archive(),
    )?;

    let (syncing_engine, sync_service, block_announce_config) = SyncingEngine::new(
        Roles::from(&config.role),
        Arc::clone(&client),
        metrics_registry,
        metrics.clone(),
        &net_config,
        protocol_id.clone(),
        fork_id,
        block_announce_validator,
        syncing_strategy,
        network_service_provider.handle(),
        import_queue.service(),
        net_config.peer_store_handle(),
    )?;

    // The syncing engine is run on a blocking task.
    spawn_handle.spawn_blocking("syncing", None, syncing_engine.run());

    build_network_advanced(BuildNetworkAdvancedParams {
        role: config.role,
        protocol_id,
        fork_id,
        ipfs_server: config.network.ipfs_server,
        announce_block: config.announce_block,
        net_config,
        client,
        transaction_pool,
        spawn_handle,
        spawn_essential_handle,
        import_queue,
        sync_service,
        block_announce_config,
        network_service_provider,
        metrics_registry,
        metrics,
    })
}
1115
/// Parameters to pass into [`build_network_advanced`].
pub struct BuildNetworkAdvancedParams<'a, Block, Net, TxPool, IQ, Client>
where
    Block: BlockT,
    Net: NetworkBackend<Block, <Block as BlockT>::Hash>,
{
    /// Role of the local node.
    pub role: Role,
    /// Protocol name prefix.
    pub protocol_id: ProtocolId,
    /// Optional fork ID.
    pub fork_id: Option<&'a str>,
    /// Whether the bitswap server is set up.
    pub ipfs_server: bool,
    /// Block announcement flag taken from the service configuration's `announce_block`.
    pub announce_block: bool,
    /// Full network configuration.
    pub net_config: FullNetworkConfiguration<Block, <Block as BlockT>::Hash, Net>,
    /// Shared handle to the client.
    pub client: Arc<Client>,
    /// Shared handle to the transaction pool.
    pub transaction_pool: Arc<TxPool>,
    /// Handle used to spawn the networking tasks.
    pub spawn_handle: SpawnTaskHandle,
    /// Handle for spawning essential tasks.
    pub spawn_essential_handle: SpawnEssentialTaskHandle,
    /// The import queue.
    pub import_queue: IQ,
    /// Syncing service used to communicate with the syncing engine.
    pub sync_service: SyncingService<Block>,
    /// Block announce protocol configuration produced when creating the syncing engine.
    pub block_announce_config: Net::NotificationProtocolConfig,
    /// Provider of the network service handle used by the syncing components.
    pub network_service_provider: NetworkServiceProvider,
    /// Optional Prometheus metrics registry.
    pub metrics_registry: Option<&'a Registry>,
    /// Notification metrics.
    pub metrics: NotificationMetrics,
}
1155
1156pub fn build_network_advanced<Block, Net, TxPool, IQ, Client>(
1159 params: BuildNetworkAdvancedParams<Block, Net, TxPool, IQ, Client>,
1160) -> Result<
1161 (
1162 Arc<dyn sc_network::service::traits::NetworkService>,
1163 TracingUnboundedSender<sc_rpc::system::Request<Block>>,
1164 sc_network_transactions::TransactionsHandlerController<<Block as BlockT>::Hash>,
1165 Arc<SyncingService<Block>>,
1166 ),
1167 Error,
1168>
1169where
1170 Block: BlockT,
1171 Client: ProvideRuntimeApi<Block>
1172 + HeaderMetadata<Block, Error = sp_blockchain::Error>
1173 + Chain<Block>
1174 + BlockBackend<Block>
1175 + BlockIdTo<Block, Error = sp_blockchain::Error>
1176 + ProofProvider<Block>
1177 + HeaderBackend<Block>
1178 + BlockchainEvents<Block>
1179 + 'static,
1180 TxPool: TransactionPool<Block = Block, Hash = <Block as BlockT>::Hash> + 'static,
1181 IQ: ImportQueue<Block> + 'static,
1182 Net: NetworkBackend<Block, <Block as BlockT>::Hash>,
1183{
1184 let BuildNetworkAdvancedParams {
1185 role,
1186 protocol_id,
1187 fork_id,
1188 ipfs_server,
1189 announce_block,
1190 mut net_config,
1191 client,
1192 transaction_pool,
1193 spawn_handle,
1194 spawn_essential_handle,
1195 import_queue,
1196 sync_service,
1197 block_announce_config,
1198 network_service_provider,
1199 metrics_registry,
1200 metrics,
1201 } = params;
1202
1203 let genesis_hash = client.info().genesis_hash;
1204
1205 let light_client_request_protocol_config = {
1206 let (handler, protocol_config) =
1208 LightClientRequestHandler::new::<Net>(&protocol_id, fork_id, client.clone());
1209 spawn_handle.spawn("light-client-request-handler", Some("networking"), handler.run());
1210 protocol_config
1211 };
1212
1213 net_config.add_request_response_protocol(light_client_request_protocol_config);
1215
1216 let bitswap_config = ipfs_server.then(|| {
1217 let (handler, config) = Net::bitswap_server(client.clone());
1218 spawn_handle.spawn("bitswap-request-handler", Some("networking"), handler);
1219
1220 config
1221 });
1222
1223 let (transactions_handler_proto, transactions_config) =
1225 sc_network_transactions::TransactionsHandlerPrototype::new::<_, Block, Net>(
1226 protocol_id.clone(),
1227 genesis_hash,
1228 fork_id,
1229 metrics.clone(),
1230 net_config.peer_store_handle(),
1231 );
1232 net_config.add_notification_protocol(transactions_config);
1233
1234 let peer_store = net_config.take_peer_store();
1236 spawn_handle.spawn("peer-store", Some("networking"), peer_store.run());
1237
1238 let sync_service = Arc::new(sync_service);
1239
1240 let network_params = sc_network::config::Params::<Block, <Block as BlockT>::Hash, Net> {
1241 role,
1242 executor: {
1243 let spawn_handle = Clone::clone(&spawn_handle);
1244 Box::new(move |fut| {
1245 spawn_handle.spawn("libp2p-node", Some("networking"), fut);
1246 })
1247 },
1248 network_config: net_config,
1249 genesis_hash,
1250 protocol_id,
1251 fork_id: fork_id.map(ToOwned::to_owned),
1252 metrics_registry: metrics_registry.cloned(),
1253 block_announce_config,
1254 bitswap_config,
1255 notification_metrics: metrics,
1256 };
1257
1258 let has_bootnodes = !network_params.network_config.network_config.boot_nodes.is_empty();
1259 let network_mut = Net::new(network_params)?;
1260 let network = network_mut.network_service().clone();
1261
1262 let (tx_handler, tx_handler_controller) = transactions_handler_proto.build(
1263 network.clone(),
1264 sync_service.clone(),
1265 Arc::new(TransactionPoolAdapter { pool: transaction_pool, client: client.clone() }),
1266 metrics_registry,
1267 )?;
1268 spawn_handle.spawn_blocking(
1269 "network-transactions-handler",
1270 Some("networking"),
1271 tx_handler.run(),
1272 );
1273
1274 spawn_handle.spawn_blocking(
1275 "chain-sync-network-service-provider",
1276 Some("networking"),
1277 network_service_provider.run(Arc::new(network.clone())),
1278 );
1279 spawn_handle.spawn("import-queue", None, {
1280 let sync_service = sync_service.clone();
1281
1282 async move { import_queue.run(sync_service.as_ref()).await }
1283 });
1284
1285 let (system_rpc_tx, system_rpc_rx) = tracing_unbounded("mpsc_system_rpc", 10_000);
1286 spawn_handle.spawn(
1287 "system-rpc-handler",
1288 Some("networking"),
1289 build_system_rpc_future::<_, _, <Block as BlockT>::Hash>(
1290 role,
1291 network_mut.network_service(),
1292 sync_service.clone(),
1293 client.clone(),
1294 system_rpc_rx,
1295 has_bootnodes,
1296 ),
1297 );
1298
1299 let future = build_network_future::<_, _, <Block as BlockT>::Hash, _>(
1300 network_mut,
1301 client,
1302 sync_service.clone(),
1303 announce_block,
1304 );
1305
1306 spawn_essential_handle.spawn_blocking("network-worker", Some("networking"), future);
1317
1318 Ok((network, system_rpc_tx, tx_handler_controller, sync_service.clone()))
1319}
1320
1321pub struct DefaultSyncingEngineConfig<'a, Block, Client, Net>
1323where
1324 Block: BlockT,
1325 Net: NetworkBackend<Block, <Block as BlockT>::Hash>,
1326{
1327 pub role: Role,
1329 pub protocol_id: ProtocolId,
1331 pub fork_id: Option<&'a str>,
1333 pub net_config: &'a mut FullNetworkConfiguration<Block, <Block as BlockT>::Hash, Net>,
1335 pub block_announce_validator: Box<dyn BlockAnnounceValidator<Block> + Send>,
1337 pub network_service_handle: NetworkServiceHandle,
1339 pub warp_sync_config: Option<WarpSyncConfig<Block>>,
1341 pub client: Arc<Client>,
1343 pub import_queue_service: Box<dyn ImportQueueService<Block>>,
1345 pub num_peers_hint: usize,
1347 pub spawn_handle: &'a SpawnTaskHandle,
1349 pub metrics_registry: Option<&'a Registry>,
1351 pub metrics: NotificationMetrics,
1353 pub archive_blocks: bool,
1356}
1357
1358pub fn build_default_syncing_engine<Block, Client, Net>(
1361 config: DefaultSyncingEngineConfig<Block, Client, Net>,
1362) -> Result<(SyncingService<Block>, Net::NotificationProtocolConfig), Error>
1363where
1364 Block: BlockT,
1365 Client: HeaderBackend<Block>
1366 + BlockBackend<Block>
1367 + HeaderMetadata<Block, Error = sp_blockchain::Error>
1368 + ProofProvider<Block>
1369 + Send
1370 + Sync
1371 + 'static,
1372 Net: NetworkBackend<Block, <Block as BlockT>::Hash>,
1373{
1374 let DefaultSyncingEngineConfig {
1375 role,
1376 protocol_id,
1377 fork_id,
1378 net_config,
1379 block_announce_validator,
1380 network_service_handle,
1381 warp_sync_config,
1382 client,
1383 import_queue_service,
1384 num_peers_hint,
1385 spawn_handle,
1386 metrics_registry,
1387 metrics,
1388 archive_blocks,
1389 } = config;
1390
1391 let block_downloader = build_default_block_downloader(
1392 &protocol_id,
1393 fork_id,
1394 net_config,
1395 network_service_handle.clone(),
1396 client.clone(),
1397 num_peers_hint,
1398 spawn_handle,
1399 );
1400 let syncing_strategy = build_polkadot_syncing_strategy(
1401 protocol_id.clone(),
1402 fork_id,
1403 net_config,
1404 warp_sync_config,
1405 block_downloader,
1406 client.clone(),
1407 spawn_handle,
1408 metrics_registry,
1409 archive_blocks,
1410 )?;
1411
1412 let (syncing_engine, sync_service, block_announce_config) = SyncingEngine::new(
1413 Roles::from(&role),
1414 client,
1415 metrics_registry,
1416 metrics,
1417 &net_config,
1418 protocol_id,
1419 fork_id,
1420 block_announce_validator,
1421 syncing_strategy,
1422 network_service_handle,
1423 import_queue_service,
1424 net_config.peer_store_handle(),
1425 )?;
1426
1427 spawn_handle.spawn_blocking("syncing", None, syncing_engine.run());
1428
1429 Ok((sync_service, block_announce_config))
1430}
1431
1432pub fn build_default_block_downloader<Block, Client, Net>(
1434 protocol_id: &ProtocolId,
1435 fork_id: Option<&str>,
1436 net_config: &mut FullNetworkConfiguration<Block, <Block as BlockT>::Hash, Net>,
1437 network_service_handle: NetworkServiceHandle,
1438 client: Arc<Client>,
1439 num_peers_hint: usize,
1440 spawn_handle: &SpawnTaskHandle,
1441) -> Arc<dyn BlockDownloader<Block>>
1442where
1443 Block: BlockT,
1444 Client: HeaderBackend<Block> + BlockBackend<Block> + Send + Sync + 'static,
1445 Net: NetworkBackend<Block, <Block as BlockT>::Hash>,
1446{
1447 let BlockRelayParams { mut server, downloader, request_response_config } =
1450 BlockRequestHandler::new::<Net>(
1451 network_service_handle,
1452 &protocol_id,
1453 fork_id,
1454 client.clone(),
1455 num_peers_hint,
1456 );
1457
1458 spawn_handle.spawn("block-request-handler", Some("networking"), async move {
1459 server.run().await;
1460 });
1461
1462 net_config.add_request_response_protocol(request_response_config);
1463
1464 downloader
1465}
1466
1467pub fn build_polkadot_syncing_strategy<Block, Client, Net>(
1469 protocol_id: ProtocolId,
1470 fork_id: Option<&str>,
1471 net_config: &mut FullNetworkConfiguration<Block, <Block as BlockT>::Hash, Net>,
1472 warp_sync_config: Option<WarpSyncConfig<Block>>,
1473 block_downloader: Arc<dyn BlockDownloader<Block>>,
1474 client: Arc<Client>,
1475 spawn_handle: &SpawnTaskHandle,
1476 metrics_registry: Option<&Registry>,
1477 archive_blocks: bool,
1478) -> Result<Box<dyn SyncingStrategy<Block>>, Error>
1479where
1480 Block: BlockT,
1481 Client: HeaderBackend<Block>
1482 + BlockBackend<Block>
1483 + HeaderMetadata<Block, Error = sp_blockchain::Error>
1484 + ProofProvider<Block>
1485 + Send
1486 + Sync
1487 + 'static,
1488 Net: NetworkBackend<Block, <Block as BlockT>::Hash>,
1489{
1490 if warp_sync_config.is_none() && net_config.network_config.sync_mode.is_warp() {
1491 return Err("Warp sync enabled, but no warp sync provider configured.".into());
1492 }
1493
1494 if client.requires_full_sync() {
1495 match net_config.network_config.sync_mode {
1496 SyncMode::LightState { .. } => {
1497 return Err("Fast sync doesn't work for archive nodes".into())
1498 },
1499 SyncMode::Warp => return Err("Warp sync doesn't work for archive nodes".into()),
1500 SyncMode::Full => {},
1501 }
1502 }
1503
1504 let genesis_hash = client.info().genesis_hash;
1505
1506 let (state_request_protocol_config, state_request_protocol_name) = {
1507 let num_peer_hint = net_config.network_config.default_peers_set_num_full as usize +
1508 net_config.network_config.default_peers_set.reserved_nodes.len();
1509 let (handler, protocol_config) =
1511 StateRequestHandler::new::<Net>(&protocol_id, fork_id, client.clone(), num_peer_hint);
1512 let config_name = protocol_config.protocol_name().clone();
1513
1514 spawn_handle.spawn("state-request-handler", Some("networking"), handler.run());
1515 (protocol_config, config_name)
1516 };
1517 net_config.add_request_response_protocol(state_request_protocol_config);
1518
1519 let (warp_sync_protocol_config, warp_sync_protocol_name) = match warp_sync_config.as_ref() {
1520 Some(WarpSyncConfig::WithProvider(warp_with_provider)) => {
1521 let (handler, protocol_config) = WarpSyncRequestHandler::new::<_, Net>(
1523 protocol_id,
1524 genesis_hash,
1525 fork_id,
1526 warp_with_provider.clone(),
1527 );
1528 let config_name = protocol_config.protocol_name().clone();
1529
1530 spawn_handle.spawn("warp-sync-request-handler", Some("networking"), handler.run());
1531 (Some(protocol_config), Some(config_name))
1532 },
1533 _ => (None, None),
1534 };
1535 if let Some(config) = warp_sync_protocol_config {
1536 net_config.add_request_response_protocol(config);
1537 }
1538
1539 let syncing_config = PolkadotSyncingStrategyConfig {
1540 mode: net_config.network_config.sync_mode,
1541 max_parallel_downloads: net_config.network_config.max_parallel_downloads,
1542 max_blocks_per_request: net_config.network_config.max_blocks_per_request,
1543 min_peers_to_start_warp_sync: net_config.network_config.min_peers_to_start_warp_sync,
1544 metrics_registry: metrics_registry.cloned(),
1545 state_request_protocol_name,
1546 block_downloader,
1547 archive_blocks,
1548 };
1549 Ok(Box::new(PolkadotSyncingStrategy::new(
1550 syncing_config,
1551 client,
1552 warp_sync_config,
1553 warp_sync_protocol_name,
1554 )?))
1555}