1use crate::{
20 build_network_future, build_system_rpc_future,
21 client::{Client, ClientConfig},
22 config::{Configuration, ExecutorConfiguration, KeystoreConfig, Multiaddr, PrometheusConfig},
23 error::Error,
24 metrics::MetricsService,
25 start_rpc_servers, BuildGenesisBlock, GenesisBlockBuilder, RpcHandlers, SpawnTaskHandle,
26 TaskManager, TransactionPoolAdapter,
27};
28use futures::{channel::oneshot, future::ready, FutureExt, StreamExt};
29use jsonrpsee::RpcModule;
30use log::info;
31use prometheus_endpoint::Registry;
32use sc_chain_spec::{get_extension, ChainSpec};
33use sc_client_api::{
34 execution_extensions::ExecutionExtensions, proof_provider::ProofProvider, BadBlocks,
35 BlockBackend, BlockchainEvents, ExecutorProvider, ForkBlocks, StorageProvider, UsageProvider,
36};
37use sc_client_db::{Backend, BlocksPruning, DatabaseSettings, PruningMode};
38use sc_consensus::import_queue::{ImportQueue, ImportQueueService};
39use sc_executor::{
40 sp_wasm_interface::HostFunctions, HeapAllocStrategy, NativeExecutionDispatch, RuntimeVersionOf,
41 WasmExecutor, DEFAULT_HEAP_ALLOC_STRATEGY,
42};
43use sc_keystore::LocalKeystore;
44use sc_network::{
45 config::{FullNetworkConfiguration, ProtocolId, SyncMode},
46 multiaddr::Protocol,
47 service::{
48 traits::{PeerStore, RequestResponseConfig},
49 NotificationMetrics,
50 },
51 NetworkBackend, NetworkStateInfo,
52};
53use sc_network_common::role::{Role, Roles};
54use sc_network_light::light_client_requests::handler::LightClientRequestHandler;
55use sc_network_sync::{
56 block_relay_protocol::{BlockDownloader, BlockRelayParams},
57 block_request_handler::BlockRequestHandler,
58 engine::SyncingEngine,
59 service::network::{NetworkServiceHandle, NetworkServiceProvider},
60 state_request_handler::StateRequestHandler,
61 strategy::{
62 polkadot::{PolkadotSyncingStrategy, PolkadotSyncingStrategyConfig},
63 SyncingStrategy,
64 },
65 warp_request_handler::RequestHandler as WarpSyncRequestHandler,
66 SyncingService, WarpSyncConfig,
67};
68use sc_rpc::{
69 author::AuthorApiServer,
70 chain::ChainApiServer,
71 offchain::OffchainApiServer,
72 state::{ChildStateApiServer, StateApiServer},
73 system::SystemApiServer,
74 DenyUnsafe, SubscriptionTaskExecutor,
75};
76use sc_rpc_spec_v2::{
77 archive::ArchiveApiServer,
78 chain_head::ChainHeadApiServer,
79 chain_spec::ChainSpecApiServer,
80 transaction::{TransactionApiServer, TransactionBroadcastApiServer},
81};
82use sc_telemetry::{telemetry, ConnectionMessage, Telemetry, TelemetryHandle, SUBSTRATE_INFO};
83use sc_transaction_pool_api::{MaintainedTransactionPool, TransactionPool};
84use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedSender};
85use sp_api::{CallApiAt, ProvideRuntimeApi};
86use sp_blockchain::{HeaderBackend, HeaderMetadata};
87use sp_consensus::block_validation::{
88 BlockAnnounceValidator, Chain, DefaultBlockAnnounceValidator,
89};
90use sp_core::traits::{CodeExecutor, SpawnNamed};
91use sp_keystore::KeystorePtr;
92use sp_runtime::traits::{Block as BlockT, BlockIdTo, NumberFor, Zero};
93use std::{str::FromStr, sync::Arc, time::SystemTime};
94
95pub type TFullClient<TBl, TRtApi, TExec> =
97 Client<TFullBackend<TBl>, TFullCallExecutor<TBl, TExec>, TBl, TRtApi>;
98
99pub type TFullBackend<TBl> = Backend<TBl>;
101
102pub type TFullCallExecutor<TBl, TExec> = crate::client::LocalCallExecutor<TBl, Backend<TBl>, TExec>;
104
105type TFullParts<TBl, TRtApi, TExec> =
106 (TFullClient<TBl, TRtApi, TExec>, Arc<TFullBackend<TBl>>, KeystoreContainer, TaskManager);
107
108pub struct KeystoreContainer(Arc<LocalKeystore>);
110
111impl KeystoreContainer {
112 pub fn new(config: &KeystoreConfig) -> Result<Self, Error> {
114 let keystore = Arc::new(match config {
115 KeystoreConfig::Path { path, password } =>
116 LocalKeystore::open(path.clone(), password.clone())?,
117 KeystoreConfig::InMemory => LocalKeystore::in_memory(),
118 });
119
120 Ok(Self(keystore))
121 }
122
123 pub fn keystore(&self) -> KeystorePtr {
125 self.0.clone()
126 }
127
128 pub fn local_keystore(&self) -> Arc<LocalKeystore> {
130 self.0.clone()
131 }
132}
133
134pub fn new_full_client<TBl, TRtApi, TExec>(
136 config: &Configuration,
137 telemetry: Option<TelemetryHandle>,
138 executor: TExec,
139) -> Result<TFullClient<TBl, TRtApi, TExec>, Error>
140where
141 TBl: BlockT,
142 TExec: CodeExecutor + RuntimeVersionOf + Clone,
143{
144 new_full_parts(config, telemetry, executor).map(|parts| parts.0)
145}
146
147pub fn new_full_parts_record_import<TBl, TRtApi, TExec>(
149 config: &Configuration,
150 telemetry: Option<TelemetryHandle>,
151 executor: TExec,
152 enable_import_proof_recording: bool,
153) -> Result<TFullParts<TBl, TRtApi, TExec>, Error>
154where
155 TBl: BlockT,
156 TExec: CodeExecutor + RuntimeVersionOf + Clone,
157{
158 let backend = new_db_backend(config.db_config())?;
159
160 let genesis_block_builder = GenesisBlockBuilder::new(
161 config.chain_spec.as_storage_builder(),
162 !config.no_genesis(),
163 backend.clone(),
164 executor.clone(),
165 )?;
166
167 new_full_parts_with_genesis_builder(
168 config,
169 telemetry,
170 executor,
171 backend,
172 genesis_block_builder,
173 enable_import_proof_recording,
174 )
175}
176pub fn new_full_parts<TBl, TRtApi, TExec>(
178 config: &Configuration,
179 telemetry: Option<TelemetryHandle>,
180 executor: TExec,
181) -> Result<TFullParts<TBl, TRtApi, TExec>, Error>
182where
183 TBl: BlockT,
184 TExec: CodeExecutor + RuntimeVersionOf + Clone,
185{
186 new_full_parts_record_import(config, telemetry, executor, false)
187}
188
189pub fn new_full_parts_with_genesis_builder<TBl, TRtApi, TExec, TBuildGenesisBlock>(
191 config: &Configuration,
192 telemetry: Option<TelemetryHandle>,
193 executor: TExec,
194 backend: Arc<TFullBackend<TBl>>,
195 genesis_block_builder: TBuildGenesisBlock,
196 enable_import_proof_recording: bool,
197) -> Result<TFullParts<TBl, TRtApi, TExec>, Error>
198where
199 TBl: BlockT,
200 TExec: CodeExecutor + RuntimeVersionOf + Clone,
201 TBuildGenesisBlock: BuildGenesisBlock<
202 TBl,
203 BlockImportOperation = <Backend<TBl> as sc_client_api::backend::Backend<TBl>>::BlockImportOperation
204 >,
205{
206 let keystore_container = KeystoreContainer::new(&config.keystore)?;
207
208 let task_manager = {
209 let registry = config.prometheus_config.as_ref().map(|cfg| &cfg.registry);
210 TaskManager::new(config.tokio_handle.clone(), registry)?
211 };
212
213 let chain_spec = &config.chain_spec;
214 let fork_blocks = get_extension::<ForkBlocks<TBl>>(chain_spec.extensions())
215 .cloned()
216 .unwrap_or_default();
217
218 let bad_blocks = get_extension::<BadBlocks<TBl>>(chain_spec.extensions())
219 .cloned()
220 .unwrap_or_default();
221
222 let client = {
223 let extensions = ExecutionExtensions::new(None, Arc::new(executor.clone()));
224
225 let wasm_runtime_substitutes = config
226 .chain_spec
227 .code_substitutes()
228 .into_iter()
229 .map(|(n, c)| {
230 let number = NumberFor::<TBl>::from_str(&n).map_err(|_| {
231 Error::Application(Box::from(format!(
232 "Failed to parse `{}` as block number for code substitutes. \
233 In an old version the key for code substitute was a block hash. \
234 Please update the chain spec to a version that is compatible with your node.",
235 n
236 )))
237 })?;
238 Ok((number, c))
239 })
240 .collect::<Result<std::collections::HashMap<_, _>, Error>>()?;
241
242 let client = new_client(
243 backend.clone(),
244 executor,
245 genesis_block_builder,
246 fork_blocks,
247 bad_blocks,
248 extensions,
249 Box::new(task_manager.spawn_handle()),
250 config.prometheus_config.as_ref().map(|config| config.registry.clone()),
251 telemetry,
252 ClientConfig {
253 offchain_worker_enabled: config.offchain_worker.enabled,
254 offchain_indexing_api: config.offchain_worker.indexing_enabled,
255 wasm_runtime_overrides: config.wasm_runtime_overrides.clone(),
256 no_genesis: config.no_genesis(),
257 wasm_runtime_substitutes,
258 enable_import_proof_recording,
259 },
260 )?;
261
262 client
263 };
264
265 Ok((client, backend, keystore_container, task_manager))
266}
267
268#[deprecated(note = "Please switch to `new_wasm_executor`. Will be removed at end of 2024.")]
271#[allow(deprecated)]
272pub fn new_native_or_wasm_executor<D: NativeExecutionDispatch>(
273 config: &Configuration,
274) -> sc_executor::NativeElseWasmExecutor<D> {
275 #[allow(deprecated)]
276 sc_executor::NativeElseWasmExecutor::new_with_wasm_executor(new_wasm_executor(&config.executor))
277}
278
279pub fn new_wasm_executor<H: HostFunctions>(config: &ExecutorConfiguration) -> WasmExecutor<H> {
281 let strategy = config
282 .default_heap_pages
283 .map_or(DEFAULT_HEAP_ALLOC_STRATEGY, |p| HeapAllocStrategy::Static { extra_pages: p as _ });
284 WasmExecutor::<H>::builder()
285 .with_execution_method(config.wasm_method)
286 .with_onchain_heap_alloc_strategy(strategy)
287 .with_offchain_heap_alloc_strategy(strategy)
288 .with_max_runtime_instances(config.max_runtime_instances)
289 .with_runtime_cache_size(config.runtime_cache_size)
290 .build()
291}
292
293pub fn new_db_backend<Block>(
295 settings: DatabaseSettings,
296) -> Result<Arc<Backend<Block>>, sp_blockchain::Error>
297where
298 Block: BlockT,
299{
300 const CANONICALIZATION_DELAY: u64 = 4096;
301
302 Ok(Arc::new(Backend::new(settings, CANONICALIZATION_DELAY)?))
303}
304
305pub fn new_client<E, Block, RA, G>(
307 backend: Arc<Backend<Block>>,
308 executor: E,
309 genesis_block_builder: G,
310 fork_blocks: ForkBlocks<Block>,
311 bad_blocks: BadBlocks<Block>,
312 execution_extensions: ExecutionExtensions<Block>,
313 spawn_handle: Box<dyn SpawnNamed>,
314 prometheus_registry: Option<Registry>,
315 telemetry: Option<TelemetryHandle>,
316 config: ClientConfig<Block>,
317) -> Result<
318 Client<
319 Backend<Block>,
320 crate::client::LocalCallExecutor<Block, Backend<Block>, E>,
321 Block,
322 RA,
323 >,
324 sp_blockchain::Error,
325>
326where
327 Block: BlockT,
328 E: CodeExecutor + RuntimeVersionOf,
329 G: BuildGenesisBlock<
330 Block,
331 BlockImportOperation = <Backend<Block> as sc_client_api::backend::Backend<Block>>::BlockImportOperation
332 >,
333{
334 let executor = crate::client::LocalCallExecutor::new(
335 backend.clone(),
336 executor,
337 config.clone(),
338 execution_extensions,
339 )?;
340
341 Client::new(
342 backend,
343 executor,
344 spawn_handle,
345 genesis_block_builder,
346 fork_blocks,
347 bad_blocks,
348 prometheus_registry,
349 telemetry,
350 config,
351 )
352}
353
354pub struct SpawnTasksParams<'a, TBl: BlockT, TCl, TExPool, TRpc, Backend> {
356 pub config: Configuration,
358 pub client: Arc<TCl>,
360 pub backend: Arc<Backend>,
362 pub task_manager: &'a mut TaskManager,
364 pub keystore: KeystorePtr,
366 pub transaction_pool: Arc<TExPool>,
368 pub rpc_builder: Box<dyn Fn(SubscriptionTaskExecutor) -> Result<RpcModule<TRpc>, Error>>,
370 pub network: Arc<dyn sc_network::service::traits::NetworkService>,
372 pub system_rpc_tx: TracingUnboundedSender<sc_rpc::system::Request<TBl>>,
374 pub tx_handler_controller:
376 sc_network_transactions::TransactionsHandlerController<<TBl as BlockT>::Hash>,
377 pub sync_service: Arc<SyncingService<TBl>>,
379 pub telemetry: Option<&'a mut Telemetry>,
381}
382
383pub fn spawn_tasks<TBl, TBackend, TExPool, TRpc, TCl>(
385 params: SpawnTasksParams<TBl, TCl, TExPool, TRpc, TBackend>,
386) -> Result<RpcHandlers, Error>
387where
388 TCl: ProvideRuntimeApi<TBl>
389 + HeaderMetadata<TBl, Error = sp_blockchain::Error>
390 + Chain<TBl>
391 + BlockBackend<TBl>
392 + BlockIdTo<TBl, Error = sp_blockchain::Error>
393 + ProofProvider<TBl>
394 + HeaderBackend<TBl>
395 + BlockchainEvents<TBl>
396 + ExecutorProvider<TBl>
397 + UsageProvider<TBl>
398 + StorageProvider<TBl, TBackend>
399 + CallApiAt<TBl>
400 + Send
401 + 'static,
402 <TCl as ProvideRuntimeApi<TBl>>::Api: sp_api::Metadata<TBl>
403 + sp_transaction_pool::runtime_api::TaggedTransactionQueue<TBl>
404 + sp_session::SessionKeys<TBl>
405 + sp_api::ApiExt<TBl>,
406 TBl: BlockT,
407 TBl::Hash: Unpin,
408 TBl::Header: Unpin,
409 TBackend: 'static + sc_client_api::backend::Backend<TBl> + Send,
410 TExPool: MaintainedTransactionPool<Block = TBl, Hash = <TBl as BlockT>::Hash> + 'static,
411{
412 let SpawnTasksParams {
413 mut config,
414 task_manager,
415 client,
416 backend,
417 keystore,
418 transaction_pool,
419 rpc_builder,
420 network,
421 system_rpc_tx,
422 tx_handler_controller,
423 sync_service,
424 telemetry,
425 } = params;
426
427 let chain_info = client.usage_info().chain;
428
429 sp_session::generate_initial_session_keys(
430 client.clone(),
431 chain_info.best_hash,
432 config.dev_key_seed.clone().map(|s| vec![s]).unwrap_or_default(),
433 keystore.clone(),
434 )
435 .map_err(|e| Error::Application(Box::new(e)))?;
436
437 let sysinfo = sc_sysinfo::gather_sysinfo();
438 sc_sysinfo::print_sysinfo(&sysinfo);
439
440 let telemetry = telemetry
441 .map(|telemetry| {
442 init_telemetry(
443 config.network.node_name.clone(),
444 config.impl_name.clone(),
445 config.impl_version.clone(),
446 config.chain_spec.name().to_string(),
447 config.role.is_authority(),
448 network.clone(),
449 client.clone(),
450 telemetry,
451 Some(sysinfo),
452 )
453 })
454 .transpose()?;
455
456 info!("📦 Highest known block at #{}", chain_info.best_number);
457
458 let spawn_handle = task_manager.spawn_handle();
459
460 spawn_handle.spawn(
462 "txpool-notifications",
463 Some("transaction-pool"),
464 sc_transaction_pool::notification_future(client.clone(), transaction_pool.clone()),
465 );
466
467 spawn_handle.spawn(
468 "on-transaction-imported",
469 Some("transaction-pool"),
470 propagate_transaction_notifications(
471 transaction_pool.clone(),
472 tx_handler_controller,
473 telemetry.clone(),
474 ),
475 );
476
477 let metrics_service =
479 if let Some(PrometheusConfig { port, registry }) = config.prometheus_config.clone() {
480 let metrics = MetricsService::with_prometheus(
482 telemetry,
483 ®istry,
484 config.role,
485 &config.network.node_name,
486 &config.impl_version,
487 )?;
488 spawn_handle.spawn(
489 "prometheus-endpoint",
490 None,
491 prometheus_endpoint::init_prometheus(port, registry).map(drop),
492 );
493
494 metrics
495 } else {
496 MetricsService::new(telemetry)
497 };
498
499 spawn_handle.spawn(
501 "telemetry-periodic-send",
502 None,
503 metrics_service.run(
504 client.clone(),
505 transaction_pool.clone(),
506 network.clone(),
507 sync_service.clone(),
508 ),
509 );
510
511 let rpc_id_provider = config.rpc.id_provider.take();
512
513 let gen_rpc_module = || {
515 gen_rpc_module(
516 task_manager.spawn_handle(),
517 client.clone(),
518 transaction_pool.clone(),
519 keystore.clone(),
520 system_rpc_tx.clone(),
521 config.impl_name.clone(),
522 config.impl_version.clone(),
523 config.chain_spec.as_ref(),
524 &config.state_pruning,
525 config.blocks_pruning,
526 backend.clone(),
527 &*rpc_builder,
528 )
529 };
530
531 let rpc_server_handle = start_rpc_servers(
532 &config.rpc,
533 config.prometheus_registry(),
534 &config.tokio_handle,
535 gen_rpc_module,
536 rpc_id_provider,
537 )?;
538
539 let listen_addrs = rpc_server_handle
540 .listen_addrs()
541 .into_iter()
542 .map(|socket_addr| {
543 let mut multiaddr: Multiaddr = socket_addr.ip().into();
544 multiaddr.push(Protocol::Tcp(socket_addr.port()));
545 multiaddr
546 })
547 .collect();
548
549 let in_memory_rpc = {
550 let mut module = gen_rpc_module()?;
551 module.extensions_mut().insert(DenyUnsafe::No);
552 module
553 };
554
555 let in_memory_rpc_handle = RpcHandlers::new(Arc::new(in_memory_rpc), listen_addrs);
556
557 spawn_handle.spawn(
559 "informant",
560 None,
561 sc_informant::build(client.clone(), network, sync_service.clone()),
562 );
563
564 task_manager.keep_alive((config.base_path, rpc_server_handle));
565
566 Ok(in_memory_rpc_handle)
567}
568
569pub async fn propagate_transaction_notifications<Block, ExPool>(
571 transaction_pool: Arc<ExPool>,
572 tx_handler_controller: sc_network_transactions::TransactionsHandlerController<
573 <Block as BlockT>::Hash,
574 >,
575 telemetry: Option<TelemetryHandle>,
576) where
577 Block: BlockT,
578 ExPool: MaintainedTransactionPool<Block = Block, Hash = <Block as BlockT>::Hash>,
579{
580 transaction_pool
582 .import_notification_stream()
583 .for_each(move |hash| {
584 tx_handler_controller.propagate_transaction(hash);
585 let status = transaction_pool.status();
586 telemetry!(
587 telemetry;
588 SUBSTRATE_INFO;
589 "txpool.import";
590 "ready" => status.ready,
591 "future" => status.future,
592 );
593 ready(())
594 })
595 .await;
596}
597
598pub fn init_telemetry<Block, Client, Network>(
600 name: String,
601 implementation: String,
602 version: String,
603 chain: String,
604 authority: bool,
605 network: Network,
606 client: Arc<Client>,
607 telemetry: &mut Telemetry,
608 sysinfo: Option<sc_telemetry::SysInfo>,
609) -> sc_telemetry::Result<TelemetryHandle>
610where
611 Block: BlockT,
612 Client: BlockBackend<Block>,
613 Network: NetworkStateInfo,
614{
615 let genesis_hash = client.block_hash(Zero::zero()).ok().flatten().unwrap_or_default();
616 let connection_message = ConnectionMessage {
617 name,
618 implementation,
619 version,
620 target_os: sc_sysinfo::TARGET_OS.into(),
621 target_arch: sc_sysinfo::TARGET_ARCH.into(),
622 target_env: sc_sysinfo::TARGET_ENV.into(),
623 config: String::new(),
624 chain,
625 genesis_hash: format!("{:?}", genesis_hash),
626 authority,
627 startup_time: SystemTime::UNIX_EPOCH
628 .elapsed()
629 .map(|dur| dur.as_millis())
630 .unwrap_or(0)
631 .to_string(),
632 network_id: network.local_peer_id().to_base58(),
633 sysinfo,
634 };
635
636 telemetry.start_telemetry(connection_message)?;
637
638 Ok(telemetry.handle())
639}
640
641pub fn gen_rpc_module<TBl, TBackend, TCl, TRpc, TExPool>(
643 spawn_handle: SpawnTaskHandle,
644 client: Arc<TCl>,
645 transaction_pool: Arc<TExPool>,
646 keystore: KeystorePtr,
647 system_rpc_tx: TracingUnboundedSender<sc_rpc::system::Request<TBl>>,
648 impl_name: String,
649 impl_version: String,
650 chain_spec: &dyn ChainSpec,
651 state_pruning: &Option<PruningMode>,
652 blocks_pruning: BlocksPruning,
653 backend: Arc<TBackend>,
654 rpc_builder: &(dyn Fn(SubscriptionTaskExecutor) -> Result<RpcModule<TRpc>, Error>),
655) -> Result<RpcModule<()>, Error>
656where
657 TBl: BlockT,
658 TCl: ProvideRuntimeApi<TBl>
659 + BlockchainEvents<TBl>
660 + HeaderBackend<TBl>
661 + HeaderMetadata<TBl, Error = sp_blockchain::Error>
662 + ExecutorProvider<TBl>
663 + CallApiAt<TBl>
664 + ProofProvider<TBl>
665 + StorageProvider<TBl, TBackend>
666 + BlockBackend<TBl>
667 + Send
668 + Sync
669 + 'static,
670 TBackend: sc_client_api::backend::Backend<TBl> + 'static,
671 <TCl as ProvideRuntimeApi<TBl>>::Api: sp_session::SessionKeys<TBl> + sp_api::Metadata<TBl>,
672 TExPool: MaintainedTransactionPool<Block = TBl, Hash = <TBl as BlockT>::Hash> + 'static,
673 TBl::Hash: Unpin,
674 TBl::Header: Unpin,
675{
676 let system_info = sc_rpc::system::SystemInfo {
677 chain_name: chain_spec.name().into(),
678 impl_name,
679 impl_version,
680 properties: chain_spec.properties(),
681 chain_type: chain_spec.chain_type(),
682 };
683
684 let mut rpc_api = RpcModule::new(());
685 let task_executor = Arc::new(spawn_handle);
686
687 let (chain, state, child_state) = {
688 let chain = sc_rpc::chain::new_full(client.clone(), task_executor.clone()).into_rpc();
689 let (state, child_state) = sc_rpc::state::new_full(client.clone(), task_executor.clone());
690 let state = state.into_rpc();
691 let child_state = child_state.into_rpc();
692
693 (chain, state, child_state)
694 };
695
696 const MAX_TRANSACTION_PER_CONNECTION: usize = 16;
697
698 let transaction_broadcast_rpc_v2 = sc_rpc_spec_v2::transaction::TransactionBroadcast::new(
699 client.clone(),
700 transaction_pool.clone(),
701 task_executor.clone(),
702 MAX_TRANSACTION_PER_CONNECTION,
703 )
704 .into_rpc();
705
706 let transaction_v2 = sc_rpc_spec_v2::transaction::Transaction::new(
707 client.clone(),
708 transaction_pool.clone(),
709 task_executor.clone(),
710 )
711 .into_rpc();
712
713 let chain_head_v2 = sc_rpc_spec_v2::chain_head::ChainHead::new(
714 client.clone(),
715 backend.clone(),
716 task_executor.clone(),
717 sc_rpc_spec_v2::chain_head::ChainHeadConfig::default(),
719 )
720 .into_rpc();
721
722 let is_archive_node = state_pruning.as_ref().map(|sp| sp.is_archive()).unwrap_or(false) &&
727 blocks_pruning.is_archive();
728 let genesis_hash = client.hash(Zero::zero()).ok().flatten().expect("Genesis block exists; qed");
729 if is_archive_node {
730 let archive_v2 = sc_rpc_spec_v2::archive::Archive::new(
731 client.clone(),
732 backend.clone(),
733 genesis_hash,
734 task_executor.clone(),
735 )
736 .into_rpc();
737 rpc_api.merge(archive_v2).map_err(|e| Error::Application(e.into()))?;
738 }
739
740 let chain_spec_v2 = sc_rpc_spec_v2::chain_spec::ChainSpec::new(
742 chain_spec.name().into(),
743 genesis_hash,
744 chain_spec.properties(),
745 )
746 .into_rpc();
747
748 let author = sc_rpc::author::Author::new(
749 client.clone(),
750 transaction_pool,
751 keystore,
752 task_executor.clone(),
753 )
754 .into_rpc();
755
756 let system = sc_rpc::system::System::new(system_info, system_rpc_tx).into_rpc();
757
758 if let Some(storage) = backend.offchain_storage() {
759 let offchain = sc_rpc::offchain::Offchain::new(storage).into_rpc();
760
761 rpc_api.merge(offchain).map_err(|e| Error::Application(e.into()))?;
762 }
763
764 rpc_api.merge(transaction_v2).map_err(|e| Error::Application(e.into()))?;
766 rpc_api
767 .merge(transaction_broadcast_rpc_v2)
768 .map_err(|e| Error::Application(e.into()))?;
769 rpc_api.merge(chain_head_v2).map_err(|e| Error::Application(e.into()))?;
770 rpc_api.merge(chain_spec_v2).map_err(|e| Error::Application(e.into()))?;
771
772 rpc_api.merge(chain).map_err(|e| Error::Application(e.into()))?;
774 rpc_api.merge(author).map_err(|e| Error::Application(e.into()))?;
775 rpc_api.merge(system).map_err(|e| Error::Application(e.into()))?;
776 rpc_api.merge(state).map_err(|e| Error::Application(e.into()))?;
777 rpc_api.merge(child_state).map_err(|e| Error::Application(e.into()))?;
778 let extra_rpcs = rpc_builder(task_executor.clone())?;
780 rpc_api.merge(extra_rpcs).map_err(|e| Error::Application(e.into()))?;
781
782 Ok(rpc_api)
783}
784
785pub struct BuildNetworkParams<'a, Block, Net, TxPool, IQ, Client>
787where
788 Block: BlockT,
789 Net: NetworkBackend<Block, <Block as BlockT>::Hash>,
790{
791 pub config: &'a Configuration,
793 pub net_config: FullNetworkConfiguration<Block, <Block as BlockT>::Hash, Net>,
795 pub client: Arc<Client>,
797 pub transaction_pool: Arc<TxPool>,
799 pub spawn_handle: SpawnTaskHandle,
801 pub import_queue: IQ,
803 pub block_announce_validator_builder: Option<
805 Box<dyn FnOnce(Arc<Client>) -> Box<dyn BlockAnnounceValidator<Block> + Send> + Send>,
806 >,
807 pub warp_sync_config: Option<WarpSyncConfig<Block>>,
809 pub block_relay: Option<BlockRelayParams<Block, Net>>,
812 pub metrics: NotificationMetrics,
814}
815
816pub fn build_network<Block, Net, TxPool, IQ, Client>(
818 params: BuildNetworkParams<Block, Net, TxPool, IQ, Client>,
819) -> Result<
820 (
821 Arc<dyn sc_network::service::traits::NetworkService>,
822 TracingUnboundedSender<sc_rpc::system::Request<Block>>,
823 sc_network_transactions::TransactionsHandlerController<<Block as BlockT>::Hash>,
824 NetworkStarter,
825 Arc<SyncingService<Block>>,
826 ),
827 Error,
828>
829where
830 Block: BlockT,
831 Client: ProvideRuntimeApi<Block>
832 + HeaderMetadata<Block, Error = sp_blockchain::Error>
833 + Chain<Block>
834 + BlockBackend<Block>
835 + BlockIdTo<Block, Error = sp_blockchain::Error>
836 + ProofProvider<Block>
837 + HeaderBackend<Block>
838 + BlockchainEvents<Block>
839 + 'static,
840 TxPool: TransactionPool<Block = Block, Hash = <Block as BlockT>::Hash> + 'static,
841 IQ: ImportQueue<Block> + 'static,
842 Net: NetworkBackend<Block, <Block as BlockT>::Hash>,
843{
844 let BuildNetworkParams {
845 config,
846 mut net_config,
847 client,
848 transaction_pool,
849 spawn_handle,
850 import_queue,
851 block_announce_validator_builder,
852 warp_sync_config,
853 block_relay,
854 metrics,
855 } = params;
856
857 let block_announce_validator = if let Some(f) = block_announce_validator_builder {
858 f(client.clone())
859 } else {
860 Box::new(DefaultBlockAnnounceValidator)
861 };
862
863 let network_service_provider = NetworkServiceProvider::new();
864 let protocol_id = config.protocol_id();
865 let fork_id = config.chain_spec.fork_id();
866 let metrics_registry = config.prometheus_config.as_ref().map(|config| &config.registry);
867
868 let block_downloader = match block_relay {
869 Some(params) => {
870 let BlockRelayParams { mut server, downloader, request_response_config } = params;
871
872 net_config.add_request_response_protocol(request_response_config);
873
874 spawn_handle.spawn("block-request-handler", Some("networking"), async move {
875 server.run().await;
876 });
877
878 downloader
879 },
880 None => build_default_block_downloader(
881 &protocol_id,
882 fork_id,
883 &mut net_config,
884 network_service_provider.handle(),
885 Arc::clone(&client),
886 config.network.default_peers_set.in_peers as usize +
887 config.network.default_peers_set.out_peers as usize,
888 &spawn_handle,
889 ),
890 };
891
892 let syncing_strategy = build_polkadot_syncing_strategy(
893 protocol_id.clone(),
894 fork_id,
895 &mut net_config,
896 warp_sync_config,
897 block_downloader,
898 client.clone(),
899 &spawn_handle,
900 metrics_registry,
901 )?;
902
903 let (syncing_engine, sync_service, block_announce_config) = SyncingEngine::new(
904 Roles::from(&config.role),
905 Arc::clone(&client),
906 metrics_registry,
907 metrics.clone(),
908 &net_config,
909 protocol_id.clone(),
910 fork_id,
911 block_announce_validator,
912 syncing_strategy,
913 network_service_provider.handle(),
914 import_queue.service(),
915 net_config.peer_store_handle(),
916 )?;
917
918 spawn_handle.spawn_blocking("syncing", None, syncing_engine.run());
919
920 build_network_advanced(BuildNetworkAdvancedParams {
921 role: config.role,
922 protocol_id,
923 fork_id,
924 ipfs_server: config.network.ipfs_server,
925 announce_block: config.announce_block,
926 net_config,
927 client,
928 transaction_pool,
929 spawn_handle,
930 import_queue,
931 sync_service,
932 block_announce_config,
933 network_service_provider,
934 metrics_registry,
935 metrics,
936 })
937}
938
939pub struct BuildNetworkAdvancedParams<'a, Block, Net, TxPool, IQ, Client>
941where
942 Block: BlockT,
943 Net: NetworkBackend<Block, <Block as BlockT>::Hash>,
944{
945 pub role: Role,
947 pub protocol_id: ProtocolId,
949 pub fork_id: Option<&'a str>,
951 pub ipfs_server: bool,
953 pub announce_block: bool,
955 pub net_config: FullNetworkConfiguration<Block, <Block as BlockT>::Hash, Net>,
957 pub client: Arc<Client>,
959 pub transaction_pool: Arc<TxPool>,
961 pub spawn_handle: SpawnTaskHandle,
963 pub import_queue: IQ,
965 pub sync_service: SyncingService<Block>,
967 pub block_announce_config: Net::NotificationProtocolConfig,
969 pub network_service_provider: NetworkServiceProvider,
971 pub metrics_registry: Option<&'a Registry>,
973 pub metrics: NotificationMetrics,
975}
976
977pub fn build_network_advanced<Block, Net, TxPool, IQ, Client>(
980 params: BuildNetworkAdvancedParams<Block, Net, TxPool, IQ, Client>,
981) -> Result<
982 (
983 Arc<dyn sc_network::service::traits::NetworkService>,
984 TracingUnboundedSender<sc_rpc::system::Request<Block>>,
985 sc_network_transactions::TransactionsHandlerController<<Block as BlockT>::Hash>,
986 NetworkStarter,
987 Arc<SyncingService<Block>>,
988 ),
989 Error,
990>
991where
992 Block: BlockT,
993 Client: ProvideRuntimeApi<Block>
994 + HeaderMetadata<Block, Error = sp_blockchain::Error>
995 + Chain<Block>
996 + BlockBackend<Block>
997 + BlockIdTo<Block, Error = sp_blockchain::Error>
998 + ProofProvider<Block>
999 + HeaderBackend<Block>
1000 + BlockchainEvents<Block>
1001 + 'static,
1002 TxPool: TransactionPool<Block = Block, Hash = <Block as BlockT>::Hash> + 'static,
1003 IQ: ImportQueue<Block> + 'static,
1004 Net: NetworkBackend<Block, <Block as BlockT>::Hash>,
1005{
1006 let BuildNetworkAdvancedParams {
1007 role,
1008 protocol_id,
1009 fork_id,
1010 ipfs_server,
1011 announce_block,
1012 mut net_config,
1013 client,
1014 transaction_pool,
1015 spawn_handle,
1016 import_queue,
1017 sync_service,
1018 block_announce_config,
1019 network_service_provider,
1020 metrics_registry,
1021 metrics,
1022 } = params;
1023
1024 let genesis_hash = client.info().genesis_hash;
1025
1026 let light_client_request_protocol_config = {
1027 let (handler, protocol_config) =
1029 LightClientRequestHandler::new::<Net>(&protocol_id, fork_id, client.clone());
1030 spawn_handle.spawn("light-client-request-handler", Some("networking"), handler.run());
1031 protocol_config
1032 };
1033
1034 net_config.add_request_response_protocol(light_client_request_protocol_config);
1036
1037 let bitswap_config = ipfs_server.then(|| {
1038 let (handler, config) = Net::bitswap_server(client.clone());
1039 spawn_handle.spawn("bitswap-request-handler", Some("networking"), handler);
1040
1041 config
1042 });
1043
1044 let (transactions_handler_proto, transactions_config) =
1046 sc_network_transactions::TransactionsHandlerPrototype::new::<_, Block, Net>(
1047 protocol_id.clone(),
1048 genesis_hash,
1049 fork_id,
1050 metrics.clone(),
1051 net_config.peer_store_handle(),
1052 );
1053 net_config.add_notification_protocol(transactions_config);
1054
1055 let peer_store = net_config.take_peer_store();
1057 spawn_handle.spawn("peer-store", Some("networking"), peer_store.run());
1058
1059 let sync_service = Arc::new(sync_service);
1060
1061 let network_params = sc_network::config::Params::<Block, <Block as BlockT>::Hash, Net> {
1062 role,
1063 executor: {
1064 let spawn_handle = Clone::clone(&spawn_handle);
1065 Box::new(move |fut| {
1066 spawn_handle.spawn("libp2p-node", Some("networking"), fut);
1067 })
1068 },
1069 network_config: net_config,
1070 genesis_hash,
1071 protocol_id,
1072 fork_id: fork_id.map(ToOwned::to_owned),
1073 metrics_registry: metrics_registry.cloned(),
1074 block_announce_config,
1075 bitswap_config,
1076 notification_metrics: metrics,
1077 };
1078
1079 let has_bootnodes = !network_params.network_config.network_config.boot_nodes.is_empty();
1080 let network_mut = Net::new(network_params)?;
1081 let network = network_mut.network_service().clone();
1082
1083 let (tx_handler, tx_handler_controller) = transactions_handler_proto.build(
1084 network.clone(),
1085 sync_service.clone(),
1086 Arc::new(TransactionPoolAdapter { pool: transaction_pool, client: client.clone() }),
1087 metrics_registry,
1088 )?;
1089 spawn_handle.spawn_blocking(
1090 "network-transactions-handler",
1091 Some("networking"),
1092 tx_handler.run(),
1093 );
1094
1095 spawn_handle.spawn_blocking(
1096 "chain-sync-network-service-provider",
1097 Some("networking"),
1098 network_service_provider.run(Arc::new(network.clone())),
1099 );
1100 spawn_handle.spawn("import-queue", None, {
1101 let sync_service = sync_service.clone();
1102
1103 async move { import_queue.run(sync_service.as_ref()).await }
1104 });
1105
1106 let (system_rpc_tx, system_rpc_rx) = tracing_unbounded("mpsc_system_rpc", 10_000);
1107 spawn_handle.spawn(
1108 "system-rpc-handler",
1109 Some("networking"),
1110 build_system_rpc_future::<_, _, <Block as BlockT>::Hash>(
1111 role,
1112 network_mut.network_service(),
1113 sync_service.clone(),
1114 client.clone(),
1115 system_rpc_rx,
1116 has_bootnodes,
1117 ),
1118 );
1119
1120 let future = build_network_future::<_, _, <Block as BlockT>::Hash, _>(
1121 network_mut,
1122 client,
1123 sync_service.clone(),
1124 announce_block,
1125 );
1126
1127 let (network_start_tx, network_start_rx) = oneshot::channel();
1142
1143 spawn_handle.spawn_blocking("network-worker", Some("networking"), async move {
1151 if network_start_rx.await.is_err() {
1152 log::warn!(
1153 "The NetworkStart returned as part of `build_network` has been silently dropped"
1154 );
1155 return
1158 }
1159
1160 future.await
1161 });
1162
1163 Ok((
1164 network,
1165 system_rpc_tx,
1166 tx_handler_controller,
1167 NetworkStarter(network_start_tx),
1168 sync_service.clone(),
1169 ))
1170}
1171
1172pub struct DefaultSyncingEngineConfig<'a, Block, Client, Net>
1174where
1175 Block: BlockT,
1176 Net: NetworkBackend<Block, <Block as BlockT>::Hash>,
1177{
1178 pub role: Role,
1180 pub protocol_id: ProtocolId,
1182 pub fork_id: Option<&'a str>,
1184 pub net_config: &'a mut FullNetworkConfiguration<Block, <Block as BlockT>::Hash, Net>,
1186 pub block_announce_validator: Box<dyn BlockAnnounceValidator<Block> + Send>,
1188 pub network_service_handle: NetworkServiceHandle,
1190 pub warp_sync_config: Option<WarpSyncConfig<Block>>,
1192 pub client: Arc<Client>,
1194 pub import_queue_service: Box<dyn ImportQueueService<Block>>,
1196 pub num_peers_hint: usize,
1198 pub spawn_handle: &'a SpawnTaskHandle,
1200 pub metrics_registry: Option<&'a Registry>,
1202 pub metrics: NotificationMetrics,
1204}
1205
1206pub fn build_default_syncing_engine<Block, Client, Net>(
1209 config: DefaultSyncingEngineConfig<Block, Client, Net>,
1210) -> Result<(SyncingService<Block>, Net::NotificationProtocolConfig), Error>
1211where
1212 Block: BlockT,
1213 Client: HeaderBackend<Block>
1214 + BlockBackend<Block>
1215 + HeaderMetadata<Block, Error = sp_blockchain::Error>
1216 + ProofProvider<Block>
1217 + Send
1218 + Sync
1219 + 'static,
1220 Net: NetworkBackend<Block, <Block as BlockT>::Hash>,
1221{
1222 let DefaultSyncingEngineConfig {
1223 role,
1224 protocol_id,
1225 fork_id,
1226 net_config,
1227 block_announce_validator,
1228 network_service_handle,
1229 warp_sync_config,
1230 client,
1231 import_queue_service,
1232 num_peers_hint,
1233 spawn_handle,
1234 metrics_registry,
1235 metrics,
1236 } = config;
1237
1238 let block_downloader = build_default_block_downloader(
1239 &protocol_id,
1240 fork_id,
1241 net_config,
1242 network_service_handle.clone(),
1243 client.clone(),
1244 num_peers_hint,
1245 spawn_handle,
1246 );
1247 let syncing_strategy = build_polkadot_syncing_strategy(
1248 protocol_id.clone(),
1249 fork_id,
1250 net_config,
1251 warp_sync_config,
1252 block_downloader,
1253 client.clone(),
1254 spawn_handle,
1255 metrics_registry,
1256 )?;
1257
1258 let (syncing_engine, sync_service, block_announce_config) = SyncingEngine::new(
1259 Roles::from(&role),
1260 client,
1261 metrics_registry,
1262 metrics,
1263 &net_config,
1264 protocol_id,
1265 fork_id,
1266 block_announce_validator,
1267 syncing_strategy,
1268 network_service_handle,
1269 import_queue_service,
1270 net_config.peer_store_handle(),
1271 )?;
1272
1273 spawn_handle.spawn_blocking("syncing", None, syncing_engine.run());
1274
1275 Ok((sync_service, block_announce_config))
1276}
1277
1278pub fn build_default_block_downloader<Block, Client, Net>(
1280 protocol_id: &ProtocolId,
1281 fork_id: Option<&str>,
1282 net_config: &mut FullNetworkConfiguration<Block, <Block as BlockT>::Hash, Net>,
1283 network_service_handle: NetworkServiceHandle,
1284 client: Arc<Client>,
1285 num_peers_hint: usize,
1286 spawn_handle: &SpawnTaskHandle,
1287) -> Arc<dyn BlockDownloader<Block>>
1288where
1289 Block: BlockT,
1290 Client: HeaderBackend<Block> + BlockBackend<Block> + Send + Sync + 'static,
1291 Net: NetworkBackend<Block, <Block as BlockT>::Hash>,
1292{
1293 let BlockRelayParams { mut server, downloader, request_response_config } =
1296 BlockRequestHandler::new::<Net>(
1297 network_service_handle,
1298 &protocol_id,
1299 fork_id,
1300 client.clone(),
1301 num_peers_hint,
1302 );
1303
1304 spawn_handle.spawn("block-request-handler", Some("networking"), async move {
1305 server.run().await;
1306 });
1307
1308 net_config.add_request_response_protocol(request_response_config);
1309
1310 downloader
1311}
1312
1313pub fn build_polkadot_syncing_strategy<Block, Client, Net>(
1315 protocol_id: ProtocolId,
1316 fork_id: Option<&str>,
1317 net_config: &mut FullNetworkConfiguration<Block, <Block as BlockT>::Hash, Net>,
1318 warp_sync_config: Option<WarpSyncConfig<Block>>,
1319 block_downloader: Arc<dyn BlockDownloader<Block>>,
1320 client: Arc<Client>,
1321 spawn_handle: &SpawnTaskHandle,
1322 metrics_registry: Option<&Registry>,
1323) -> Result<Box<dyn SyncingStrategy<Block>>, Error>
1324where
1325 Block: BlockT,
1326 Client: HeaderBackend<Block>
1327 + BlockBackend<Block>
1328 + HeaderMetadata<Block, Error = sp_blockchain::Error>
1329 + ProofProvider<Block>
1330 + Send
1331 + Sync
1332 + 'static,
1333 Net: NetworkBackend<Block, <Block as BlockT>::Hash>,
1334{
1335 if warp_sync_config.is_none() && net_config.network_config.sync_mode.is_warp() {
1336 return Err("Warp sync enabled, but no warp sync provider configured.".into())
1337 }
1338
1339 if client.requires_full_sync() {
1340 match net_config.network_config.sync_mode {
1341 SyncMode::LightState { .. } =>
1342 return Err("Fast sync doesn't work for archive nodes".into()),
1343 SyncMode::Warp => return Err("Warp sync doesn't work for archive nodes".into()),
1344 SyncMode::Full => {},
1345 }
1346 }
1347
1348 let genesis_hash = client.info().genesis_hash;
1349
1350 let (state_request_protocol_config, state_request_protocol_name) = {
1351 let num_peer_hint = net_config.network_config.default_peers_set_num_full as usize +
1352 net_config.network_config.default_peers_set.reserved_nodes.len();
1353 let (handler, protocol_config) =
1355 StateRequestHandler::new::<Net>(&protocol_id, fork_id, client.clone(), num_peer_hint);
1356 let config_name = protocol_config.protocol_name().clone();
1357
1358 spawn_handle.spawn("state-request-handler", Some("networking"), handler.run());
1359 (protocol_config, config_name)
1360 };
1361 net_config.add_request_response_protocol(state_request_protocol_config);
1362
1363 let (warp_sync_protocol_config, warp_sync_protocol_name) = match warp_sync_config.as_ref() {
1364 Some(WarpSyncConfig::WithProvider(warp_with_provider)) => {
1365 let (handler, protocol_config) = WarpSyncRequestHandler::new::<_, Net>(
1367 protocol_id,
1368 genesis_hash,
1369 fork_id,
1370 warp_with_provider.clone(),
1371 );
1372 let config_name = protocol_config.protocol_name().clone();
1373
1374 spawn_handle.spawn("warp-sync-request-handler", Some("networking"), handler.run());
1375 (Some(protocol_config), Some(config_name))
1376 },
1377 _ => (None, None),
1378 };
1379 if let Some(config) = warp_sync_protocol_config {
1380 net_config.add_request_response_protocol(config);
1381 }
1382
1383 let syncing_config = PolkadotSyncingStrategyConfig {
1384 mode: net_config.network_config.sync_mode,
1385 max_parallel_downloads: net_config.network_config.max_parallel_downloads,
1386 max_blocks_per_request: net_config.network_config.max_blocks_per_request,
1387 metrics_registry: metrics_registry.cloned(),
1388 state_request_protocol_name,
1389 block_downloader,
1390 };
1391 Ok(Box::new(PolkadotSyncingStrategy::new(
1392 syncing_config,
1393 client,
1394 warp_sync_config,
1395 warp_sync_protocol_name,
1396 )?))
1397}
1398
1399#[must_use]
1401pub struct NetworkStarter(oneshot::Sender<()>);
1402
1403impl NetworkStarter {
1404 pub fn new(sender: oneshot::Sender<()>) -> Self {
1406 NetworkStarter(sender)
1407 }
1408
1409 pub fn start_network(self) {
1413 let _ = self.0.send(());
1414 }
1415}