1use crate::{
8 build_network_future, build_system_rpc_future,
9 client::{Client, ClientConfig},
10 config::{Configuration, ExecutorConfiguration, KeystoreConfig, Multiaddr, PrometheusConfig},
11 error::Error,
12 metrics::MetricsService,
13 start_rpc_servers, BuildGenesisBlock, GenesisBlockBuilder, RpcHandlers,
14 SpawnEssentialTaskHandle, SpawnTaskHandle, TaskManager, TransactionPoolAdapter,
15};
16use futures::{select, FutureExt, StreamExt};
17use jsonrpsee::RpcModule;
18use log::{debug, error, info};
19use soil_prometheus::Registry;
20use soil_chain_spec::{get_extension, ChainSpec};
21use soil_client::blockchain::{HeaderBackend, HeaderMetadata};
22use soil_client::client_api::{
23 execution_extensions::ExecutionExtensions, proof_provider::ProofProvider, BadBlocks,
24 BlockBackend, BlockchainEvents, ExecutorProvider, ForkBlocks, KeysIter, StorageProvider,
25 TrieCacheContext, UsageProvider,
26};
27use soil_client::consensus::block_validation::{
28 BlockAnnounceValidator, Chain, DefaultBlockAnnounceValidator,
29};
30use soil_client::db::{Backend, BlocksPruning, DatabaseSettings, PruningMode};
31use soil_client::executor::{
32 wasm_interface::HostFunctions, HeapAllocStrategy, NativeExecutionDispatch, RuntimeVersionOf,
33 WasmExecutor, DEFAULT_HEAP_ALLOC_STRATEGY,
34};
35use soil_client::import::{ImportQueue, ImportQueueService};
36use soil_client::keystore::LocalKeystore;
37use soil_client::tracing::block::TracingExecuteBlock;
38use soil_client::transaction_pool::{MaintainedTransactionPool, TransactionPool};
39use soil_client::utils::mpsc::{tracing_unbounded, TracingUnboundedSender};
40use soil_network::common::role::{Role, Roles};
41use soil_network::light::light_client_requests::handler::LightClientRequestHandler;
42use soil_network::sync::{
43 block_relay_protocol::{BlockDownloader, BlockRelayParams},
44 block_request_handler::BlockRequestHandler,
45 engine::SyncingEngine,
46 service::network::{NetworkServiceHandle, NetworkServiceProvider},
47 state_request_handler::StateRequestHandler,
48 strategy::{
49 polkadot::{PolkadotSyncingStrategy, PolkadotSyncingStrategyConfig},
50 SyncingStrategy,
51 },
52 warp_request_handler::RequestHandler as WarpSyncRequestHandler,
53 SyncingService, WarpSyncConfig,
54};
55use soil_network::{
56 config::{FullNetworkConfiguration, ProtocolId, SyncMode},
57 multiaddr::Protocol,
58 service::{
59 traits::{PeerStore, RequestResponseConfig},
60 NotificationMetrics,
61 },
62 NetworkBackend, NetworkStateInfo,
63};
64use soil_rpc::v2::{
65 archive::ArchiveApiServer,
66 chain_head::ChainHeadApiServer,
67 chain_spec::ChainSpecApiServer,
68 transaction::{TransactionApiServer, TransactionBroadcastApiServer},
69};
70use soil_rpc::{
71 author::AuthorApiServer,
72 chain::ChainApiServer,
73 offchain::OffchainApiServer,
74 state::{ChildStateApiServer, StateApiServer},
75 system::SystemApiServer,
76 DenyUnsafe, SubscriptionTaskExecutor,
77};
78use soil_telemetry::{telemetry, ConnectionMessage, Telemetry, TelemetryHandle, SUBSTRATE_INFO};
79use std::{
80 str::FromStr,
81 sync::Arc,
82 time::{Duration, SystemTime},
83};
84use subsoil::api::{CallApiAt, ProvideRuntimeApi};
85use subsoil::core::traits::{CodeExecutor, SpawnNamed};
86use subsoil::keystore::KeystorePtr;
87use subsoil::runtime::traits::{Block as BlockT, BlockIdTo, NumberFor, Zero};
88use subsoil::storage::{ChildInfo, ChildType, PrefixedStorageKey};
89
90pub type TFullClient<TBl, TRtApi, TExec> =
92 Client<TFullBackend<TBl>, TFullCallExecutor<TBl, TExec>, TBl, TRtApi>;
93
94pub type TFullBackend<TBl> = Backend<TBl>;
96
97pub type TFullCallExecutor<TBl, TExec> = crate::client::LocalCallExecutor<TBl, Backend<TBl>, TExec>;
99
100type TFullParts<TBl, TRtApi, TExec> =
101 (TFullClient<TBl, TRtApi, TExec>, Arc<TFullBackend<TBl>>, KeystoreContainer, TaskManager);
102
103pub struct KeystoreContainer(Arc<LocalKeystore>);
105
106impl KeystoreContainer {
107 pub fn new(config: &KeystoreConfig) -> Result<Self, Error> {
109 let keystore = Arc::new(match config {
110 KeystoreConfig::Path { path, password } => {
111 LocalKeystore::open(path.clone(), password.clone())?
112 },
113 KeystoreConfig::InMemory => LocalKeystore::in_memory(),
114 });
115
116 Ok(Self(keystore))
117 }
118
119 pub fn keystore(&self) -> KeystorePtr {
121 self.0.clone()
122 }
123
124 pub fn local_keystore(&self) -> Arc<LocalKeystore> {
126 self.0.clone()
127 }
128}
129
130pub fn new_full_client<TBl, TRtApi, TExec>(
132 config: &Configuration,
133 telemetry: Option<TelemetryHandle>,
134 executor: TExec,
135 pruning_filters: Vec<Arc<dyn soil_client::db::PruningFilter>>,
136) -> Result<TFullClient<TBl, TRtApi, TExec>, Error>
137where
138 TBl: BlockT,
139 TExec: CodeExecutor + RuntimeVersionOf + Clone,
140{
141 new_full_parts(config, telemetry, executor, pruning_filters).map(|parts| parts.0)
142}
143
144pub fn new_full_parts_record_import<TBl, TRtApi, TExec>(
149 config: &Configuration,
150 telemetry: Option<TelemetryHandle>,
151 executor: TExec,
152 enable_import_proof_recording: bool,
153 pruning_filters: Vec<Arc<dyn soil_client::db::PruningFilter>>,
154) -> Result<TFullParts<TBl, TRtApi, TExec>, Error>
155where
156 TBl: BlockT,
157 TExec: CodeExecutor + RuntimeVersionOf + Clone,
158{
159 let mut db_config = config.db_config();
160 db_config.pruning_filters = pruning_filters;
161 let backend = new_db_backend(db_config)?;
162
163 let genesis_block_builder = GenesisBlockBuilder::new(
164 config.chain_spec.as_storage_builder(),
165 !config.no_genesis(),
166 backend.clone(),
167 executor.clone(),
168 )?;
169
170 new_full_parts_with_genesis_builder(
171 config,
172 telemetry,
173 executor,
174 backend,
175 genesis_block_builder,
176 enable_import_proof_recording,
177 )
178}
179
180pub fn new_full_parts<TBl, TRtApi, TExec>(
185 config: &Configuration,
186 telemetry: Option<TelemetryHandle>,
187 executor: TExec,
188 pruning_filters: Vec<Arc<dyn soil_client::db::PruningFilter>>,
189) -> Result<TFullParts<TBl, TRtApi, TExec>, Error>
190where
191 TBl: BlockT,
192 TExec: CodeExecutor + RuntimeVersionOf + Clone,
193{
194 new_full_parts_record_import(config, telemetry, executor, false, pruning_filters)
195}
196
197pub fn new_full_parts_with_genesis_builder<TBl, TRtApi, TExec, TBuildGenesisBlock>(
199 config: &Configuration,
200 telemetry: Option<TelemetryHandle>,
201 executor: TExec,
202 backend: Arc<TFullBackend<TBl>>,
203 genesis_block_builder: TBuildGenesisBlock,
204 enable_import_proof_recording: bool,
205) -> Result<TFullParts<TBl, TRtApi, TExec>, Error>
206where
207 TBl: BlockT,
208 TExec: CodeExecutor + RuntimeVersionOf + Clone,
209 TBuildGenesisBlock:
210 BuildGenesisBlock<
211 TBl,
212 BlockImportOperation = <Backend<TBl> as soil_client::client_api::backend::Backend<
213 TBl,
214 >>::BlockImportOperation,
215 >,
216{
217 let keystore_container = KeystoreContainer::new(&config.keystore)?;
218
219 let task_manager = {
220 let registry = config.prometheus_config.as_ref().map(|cfg| &cfg.registry);
221 TaskManager::new(config.tokio_handle.clone(), registry)?
222 };
223
224 let chain_spec = &config.chain_spec;
225 let fork_blocks = get_extension::<ForkBlocks<TBl>>(chain_spec.extensions())
226 .cloned()
227 .unwrap_or_default();
228
229 let bad_blocks = get_extension::<BadBlocks<TBl>>(chain_spec.extensions())
230 .cloned()
231 .unwrap_or_default();
232
233 let client = {
234 let extensions = ExecutionExtensions::new(None, Arc::new(executor.clone()));
235
236 let wasm_runtime_substitutes = config
237 .chain_spec
238 .code_substitutes()
239 .into_iter()
240 .map(|(n, c)| {
241 let number = NumberFor::<TBl>::from_str(&n).map_err(|_| {
242 Error::Application(Box::from(format!(
243 "Failed to parse `{}` as block number for code substitutes. \
244 In an old version the key for code substitute was a block hash. \
245 Please update the chain spec to a version that is compatible with your node.",
246 n
247 )))
248 })?;
249 Ok((number, c))
250 })
251 .collect::<Result<std::collections::HashMap<_, _>, Error>>()?;
252
253 let client = new_client(
254 backend.clone(),
255 executor,
256 genesis_block_builder,
257 fork_blocks,
258 bad_blocks,
259 extensions,
260 Box::new(task_manager.spawn_handle()),
261 config.prometheus_config.as_ref().map(|config| config.registry.clone()),
262 telemetry,
263 ClientConfig {
264 offchain_worker_enabled: config.offchain_worker.enabled,
265 offchain_indexing_api: config.offchain_worker.indexing_enabled,
266 wasm_runtime_overrides: config.wasm_runtime_overrides.clone(),
267 no_genesis: config.no_genesis(),
268 wasm_runtime_substitutes,
269 enable_import_proof_recording,
270 },
271 )?;
272
273 if let Some(warm_up_strategy) = config.warm_up_trie_cache {
274 let storage_root = client.usage_info().chain.best_hash;
275 let backend_clone = backend.clone();
276
277 if warm_up_strategy.is_blocking() {
278 warm_up_trie_cache(backend_clone, storage_root)?;
281 } else {
282 task_manager.spawn_handle().spawn_blocking(
283 "warm-up-trie-cache",
284 None,
285 async move {
286 if let Err(e) = warm_up_trie_cache(backend_clone, storage_root) {
287 error!("Failed to warm up trie cache: {e}");
288 }
289 },
290 );
291 }
292 }
293
294 client
295 };
296
297 Ok((client, backend, keystore_container, task_manager))
298}
299
300fn child_info(key: Vec<u8>) -> Option<ChildInfo> {
301 let prefixed_key = PrefixedStorageKey::new(key);
302 ChildType::from_prefixed_key(&prefixed_key).and_then(|(child_type, storage_key)| {
303 (child_type == ChildType::ParentKeyId).then(|| ChildInfo::new_default(storage_key))
304 })
305}
306
307fn warm_up_trie_cache<TBl: BlockT>(
308 backend: Arc<TFullBackend<TBl>>,
309 storage_root: TBl::Hash,
310) -> Result<(), Error> {
311 use soil_client::client_api::backend::Backend;
312 use subsoil::state_machine::Backend as StateBackend;
313
314 let untrusted_state = || backend.state_at(storage_root, TrieCacheContext::Untrusted);
315 let trusted_state = || backend.state_at(storage_root, TrieCacheContext::Trusted);
316
317 debug!("Populating trie cache started",);
318 let start_time = std::time::Instant::now();
319 let mut keys_count = 0;
320 let mut child_keys_count = 0;
321 for key in KeysIter::<_, TBl>::new(untrusted_state()?, None, None)? {
322 if keys_count != 0 && keys_count % 100_000 == 0 {
323 debug!("{} keys and {} child keys have been warmed", keys_count, child_keys_count);
324 }
325 match child_info(key.0.clone()) {
326 Some(info) => {
327 for child_key in
328 KeysIter::<_, TBl>::new_child(untrusted_state()?, info.clone(), None, None)?
329 {
330 if trusted_state()?
331 .child_storage(&info, &child_key.0)
332 .unwrap_or_default()
333 .is_none()
334 {
335 debug!("Child storage value unexpectedly empty: {child_key:?}");
336 }
337 child_keys_count += 1;
338 }
339 },
340 None => {
341 if trusted_state()?.storage(&key.0).unwrap_or_default().is_none() {
342 debug!("Storage value unexpectedly empty: {key:?}");
343 }
344 keys_count += 1;
345 },
346 }
347 }
348 debug!(
349 "Trie cache populated with {keys_count} keys and {child_keys_count} child keys in {} s",
350 start_time.elapsed().as_secs_f32()
351 );
352
353 Ok(())
354}
355
356#[deprecated(note = "Please switch to `new_wasm_executor`. Will be removed at end of 2024.")]
359#[allow(deprecated)]
360pub fn new_native_or_wasm_executor<D: NativeExecutionDispatch>(
361 config: &Configuration,
362) -> soil_client::executor::NativeElseWasmExecutor<D> {
363 #[allow(deprecated)]
364 soil_client::executor::NativeElseWasmExecutor::new_with_wasm_executor(new_wasm_executor(
365 &config.executor,
366 ))
367}
368
369pub fn new_wasm_executor<H: HostFunctions>(config: &ExecutorConfiguration) -> WasmExecutor<H> {
371 let strategy = config
372 .default_heap_pages
373 .map_or(DEFAULT_HEAP_ALLOC_STRATEGY, |p| HeapAllocStrategy::Static { extra_pages: p as _ });
374 WasmExecutor::<H>::builder()
375 .with_execution_method(config.wasm_method)
376 .with_onchain_heap_alloc_strategy(strategy)
377 .with_offchain_heap_alloc_strategy(strategy)
378 .with_max_runtime_instances(config.max_runtime_instances)
379 .with_runtime_cache_size(config.runtime_cache_size)
380 .build()
381}
382
383pub fn new_db_backend<Block>(
388 settings: DatabaseSettings,
389) -> Result<Arc<Backend<Block>>, soil_client::blockchain::Error>
390where
391 Block: BlockT,
392{
393 const CANONICALIZATION_DELAY: u64 = 4096;
394
395 Ok(Arc::new(Backend::new(settings, CANONICALIZATION_DELAY)?))
396}
397
398pub fn new_client<E, Block, RA, G>(
400 backend: Arc<Backend<Block>>,
401 executor: E,
402 genesis_block_builder: G,
403 fork_blocks: ForkBlocks<Block>,
404 bad_blocks: BadBlocks<Block>,
405 execution_extensions: ExecutionExtensions<Block>,
406 spawn_handle: Box<dyn SpawnNamed>,
407 prometheus_registry: Option<Registry>,
408 telemetry: Option<TelemetryHandle>,
409 config: ClientConfig<Block>,
410) -> Result<
411 Client<Backend<Block>, crate::client::LocalCallExecutor<Block, Backend<Block>, E>, Block, RA>,
412 soil_client::blockchain::Error,
413>
414where
415 Block: BlockT,
416 E: CodeExecutor + RuntimeVersionOf,
417 G: BuildGenesisBlock<
418 Block,
419 BlockImportOperation = <Backend<Block> as soil_client::client_api::backend::Backend<
420 Block,
421 >>::BlockImportOperation,
422 >,
423{
424 let executor = crate::client::LocalCallExecutor::new(
425 backend.clone(),
426 executor,
427 config.clone(),
428 execution_extensions,
429 )?;
430
431 Client::new(
432 backend,
433 executor,
434 spawn_handle,
435 genesis_block_builder,
436 fork_blocks,
437 bad_blocks,
438 prometheus_registry,
439 telemetry,
440 config,
441 )
442}
443
444pub struct SpawnTasksParams<'a, TBl: BlockT, TCl, TExPool, TRpc, Backend> {
446 pub config: Configuration,
448 pub client: Arc<TCl>,
450 pub backend: Arc<Backend>,
452 pub task_manager: &'a mut TaskManager,
454 pub keystore: KeystorePtr,
456 pub transaction_pool: Arc<TExPool>,
458 pub rpc_builder: Box<dyn Fn(SubscriptionTaskExecutor) -> Result<RpcModule<TRpc>, Error>>,
460 pub network: Arc<dyn soil_network::service::traits::NetworkService>,
462 pub system_rpc_tx: TracingUnboundedSender<soil_rpc::system::Request<TBl>>,
464 pub tx_handler_controller:
466 soil_network::transactions::TransactionsHandlerController<<TBl as BlockT>::Hash>,
467 pub sync_service: Arc<SyncingService<TBl>>,
469 pub telemetry: Option<&'a mut Telemetry>,
471 pub tracing_execute_block: Option<Arc<dyn TracingExecuteBlock<TBl>>>,
475}
476
477pub fn spawn_tasks<TBl, TBackend, TExPool, TRpc, TCl>(
479 SpawnTasksParams {
480 mut config,
481 task_manager,
482 client,
483 backend,
484 keystore,
485 transaction_pool,
486 rpc_builder,
487 network,
488 system_rpc_tx,
489 tx_handler_controller,
490 sync_service,
491 telemetry,
492 tracing_execute_block: execute_block,
493 }: SpawnTasksParams<TBl, TCl, TExPool, TRpc, TBackend>,
494) -> Result<RpcHandlers, Error>
495where
496 TCl: ProvideRuntimeApi<TBl>
497 + HeaderMetadata<TBl, Error = soil_client::blockchain::Error>
498 + Chain<TBl>
499 + BlockBackend<TBl>
500 + BlockIdTo<TBl, Error = soil_client::blockchain::Error>
501 + ProofProvider<TBl>
502 + HeaderBackend<TBl>
503 + BlockchainEvents<TBl>
504 + ExecutorProvider<TBl>
505 + UsageProvider<TBl>
506 + StorageProvider<TBl, TBackend>
507 + CallApiAt<TBl>
508 + Send
509 + 'static,
510 <TCl as ProvideRuntimeApi<TBl>>::Api:
511 subsoil::api::Metadata<TBl>
512 + subsoil::txpool::runtime_api::TaggedTransactionQueue<TBl>
513 + subsoil::session::SessionKeys<TBl>
514 + subsoil::api::ApiExt<TBl>,
515 TBl: BlockT,
516 TBl::Hash: Unpin,
517 TBl::Header: Unpin,
518 TBackend: 'static + soil_client::client_api::backend::Backend<TBl> + Send,
519 TExPool: MaintainedTransactionPool<Block = TBl, Hash = <TBl as BlockT>::Hash> + 'static,
520{
521 let chain_info = client.usage_info().chain;
522
523 subsoil::session::generate_initial_session_keys(
524 client.clone(),
525 chain_info.best_hash,
526 config.dev_key_seed.clone().map(|s| vec![s]).unwrap_or_default(),
527 keystore.clone(),
528 )
529 .map_err(|e| Error::Application(Box::new(e)))?;
530
531 let sysinfo = crate::sysinfo::gather_sysinfo();
532 crate::sysinfo::print_sysinfo(&sysinfo);
533
534 let telemetry = telemetry
535 .map(|telemetry| {
536 init_telemetry(
537 config.network.node_name.clone(),
538 config.impl_name.clone(),
539 config.impl_version.clone(),
540 config.chain_spec.name().to_string(),
541 config.role.is_authority(),
542 network.clone(),
543 client.clone(),
544 telemetry,
545 Some(sysinfo),
546 )
547 })
548 .transpose()?;
549
550 info!("📦 Highest known block at #{}", chain_info.best_number);
551
552 let spawn_handle = task_manager.spawn_handle();
553
554 spawn_handle.spawn(
556 "txpool-notifications",
557 Some("transaction-pool"),
558 soil_txpool::notification_future(client.clone(), transaction_pool.clone()),
559 );
560
561 spawn_handle.spawn(
562 "on-transaction-imported",
563 Some("transaction-pool"),
564 propagate_transaction_notifications(
565 transaction_pool.clone(),
566 tx_handler_controller,
567 telemetry.clone(),
568 ),
569 );
570
571 let metrics_service =
573 if let Some(PrometheusConfig { port, registry }) = config.prometheus_config.clone() {
574 let metrics = MetricsService::with_prometheus(
576 telemetry,
577 ®istry,
578 config.role,
579 &config.network.node_name,
580 &config.impl_version,
581 )?;
582 spawn_handle.spawn(
583 "soil-prometheus",
584 None,
585 soil_prometheus::init_prometheus(port, registry).map(drop),
586 );
587
588 metrics
589 } else {
590 MetricsService::new(telemetry)
591 };
592
593 spawn_handle.spawn(
595 "telemetry-periodic-send",
596 None,
597 metrics_service.run(
598 client.clone(),
599 transaction_pool.clone(),
600 network.clone(),
601 sync_service.clone(),
602 ),
603 );
604
605 let rpc_id_provider = config.rpc.id_provider.take();
606
607 let rpc_v2_metrics = config
612 .prometheus_registry()
613 .map(|registry| soil_rpc::v2::transaction::TransactionMetrics::new(registry))
614 .transpose()?;
615
616 let gen_rpc_module = || {
617 gen_rpc_module(GenRpcModuleParams {
618 spawn_handle: task_manager.spawn_handle(),
619 client: client.clone(),
620 transaction_pool: transaction_pool.clone(),
621 keystore: keystore.clone(),
622 system_rpc_tx: system_rpc_tx.clone(),
623 impl_name: config.impl_name.clone(),
624 impl_version: config.impl_version.clone(),
625 chain_spec: config.chain_spec.as_ref(),
626 state_pruning: &config.state_pruning,
627 blocks_pruning: config.blocks_pruning,
628 backend: backend.clone(),
629 rpc_builder: &*rpc_builder,
630 metrics: rpc_v2_metrics.clone(),
631 tracing_execute_block: execute_block.clone(),
632 })
633 };
634
635 let rpc_server_handle = start_rpc_servers(
636 &config.rpc,
637 config.prometheus_registry(),
638 &config.tokio_handle,
639 gen_rpc_module,
640 rpc_id_provider,
641 )?;
642
643 let listen_addrs = rpc_server_handle
644 .listen_addrs()
645 .into_iter()
646 .map(|socket_addr| {
647 let mut multiaddr: Multiaddr = socket_addr.ip().into();
648 multiaddr.push(Protocol::Tcp(socket_addr.port()));
649 multiaddr
650 })
651 .collect();
652
653 let in_memory_rpc = {
654 let mut module = gen_rpc_module()?;
655 module.extensions_mut().insert(DenyUnsafe::No);
656 module
657 };
658
659 let in_memory_rpc_handle = RpcHandlers::new(Arc::new(in_memory_rpc), listen_addrs);
660
661 spawn_handle.spawn(
663 "informant",
664 None,
665 crate::informant::build(client.clone(), network, sync_service.clone()),
666 );
667
668 task_manager.keep_alive((config.base_path, rpc_server_handle));
669
670 Ok(in_memory_rpc_handle)
671}
672
673pub async fn propagate_transaction_notifications<Block, ExPool>(
675 transaction_pool: Arc<ExPool>,
676 tx_handler_controller: soil_network::transactions::TransactionsHandlerController<
677 <Block as BlockT>::Hash,
678 >,
679 telemetry: Option<TelemetryHandle>,
680) where
681 Block: BlockT,
682 ExPool: MaintainedTransactionPool<Block = Block, Hash = <Block as BlockT>::Hash>,
683{
684 const TELEMETRY_INTERVAL: Duration = Duration::from_secs(1);
685
686 let mut notifications = transaction_pool.import_notification_stream().fuse();
688 let mut timer = futures_timer::Delay::new(TELEMETRY_INTERVAL).fuse();
689 let mut tx_imported = false;
690
691 loop {
692 select! {
693 notification = notifications.next() => {
694 let Some(hash) = notification else { return };
695
696 tx_handler_controller.propagate_transaction(hash);
697
698 tx_imported = true;
699 },
700 _ = timer => {
701 timer = futures_timer::Delay::new(TELEMETRY_INTERVAL).fuse();
702
703 if !tx_imported {
704 continue;
705 }
706
707 tx_imported = false;
708 let status = transaction_pool.status();
709
710 telemetry!(
711 telemetry;
712 SUBSTRATE_INFO;
713 "txpool.import";
714 "ready" => status.ready,
715 "future" => status.future,
716 );
717 }
718 }
719 }
720}
721
722pub fn init_telemetry<Block, Client, Network>(
724 name: String,
725 implementation: String,
726 version: String,
727 chain: String,
728 authority: bool,
729 network: Network,
730 client: Arc<Client>,
731 telemetry: &mut Telemetry,
732 sysinfo: Option<soil_telemetry::SysInfo>,
733) -> soil_telemetry::Result<TelemetryHandle>
734where
735 Block: BlockT,
736 Client: BlockBackend<Block>,
737 Network: NetworkStateInfo,
738{
739 let genesis_hash = client.block_hash(Zero::zero()).ok().flatten().unwrap_or_default();
740 let connection_message = ConnectionMessage {
741 name,
742 implementation,
743 version,
744 target_os: crate::sysinfo::TARGET_OS.into(),
745 target_arch: crate::sysinfo::TARGET_ARCH.into(),
746 target_env: crate::sysinfo::TARGET_ENV.into(),
747 config: String::new(),
748 chain,
749 genesis_hash: format!("{:?}", genesis_hash),
750 authority,
751 startup_time: SystemTime::UNIX_EPOCH
752 .elapsed()
753 .map(|dur| dur.as_millis())
754 .unwrap_or(0)
755 .to_string(),
756 network_id: network.local_peer_id().to_base58(),
757 sysinfo,
758 };
759
760 telemetry.start_telemetry(connection_message)?;
761
762 Ok(telemetry.handle())
763}
764
765pub struct GenRpcModuleParams<'a, TBl: BlockT, TBackend, TCl, TRpc, TExPool> {
767 pub spawn_handle: SpawnTaskHandle,
769 pub client: Arc<TCl>,
771 pub transaction_pool: Arc<TExPool>,
773 pub keystore: KeystorePtr,
775 pub system_rpc_tx: TracingUnboundedSender<soil_rpc::system::Request<TBl>>,
777 pub impl_name: String,
779 pub impl_version: String,
781 pub chain_spec: &'a dyn ChainSpec,
783 pub state_pruning: &'a Option<PruningMode>,
785 pub blocks_pruning: BlocksPruning,
787 pub backend: Arc<TBackend>,
789 pub rpc_builder: &'a dyn Fn(SubscriptionTaskExecutor) -> Result<RpcModule<TRpc>, Error>,
791 pub metrics: Option<soil_rpc::v2::transaction::TransactionMetrics>,
793 pub tracing_execute_block: Option<Arc<dyn TracingExecuteBlock<TBl>>>,
797}
798
799pub fn gen_rpc_module<TBl, TBackend, TCl, TRpc, TExPool>(
801 GenRpcModuleParams {
802 spawn_handle,
803 client,
804 transaction_pool,
805 keystore,
806 system_rpc_tx,
807 impl_name,
808 impl_version,
809 chain_spec,
810 state_pruning,
811 blocks_pruning,
812 backend,
813 rpc_builder,
814 metrics,
815 tracing_execute_block: execute_block,
816 }: GenRpcModuleParams<TBl, TBackend, TCl, TRpc, TExPool>,
817) -> Result<RpcModule<()>, Error>
818where
819 TBl: BlockT,
820 TCl: ProvideRuntimeApi<TBl>
821 + BlockchainEvents<TBl>
822 + HeaderBackend<TBl>
823 + HeaderMetadata<TBl, Error = soil_client::blockchain::Error>
824 + ExecutorProvider<TBl>
825 + CallApiAt<TBl>
826 + ProofProvider<TBl>
827 + StorageProvider<TBl, TBackend>
828 + BlockBackend<TBl>
829 + Send
830 + Sync
831 + 'static,
832 TBackend: soil_client::client_api::backend::Backend<TBl> + 'static,
833 <TCl as ProvideRuntimeApi<TBl>>::Api:
834 subsoil::session::SessionKeys<TBl> + subsoil::api::Metadata<TBl>,
835 TExPool: MaintainedTransactionPool<Block = TBl, Hash = <TBl as BlockT>::Hash> + 'static,
836 TBl::Hash: Unpin,
837 TBl::Header: Unpin,
838{
839 let system_info = soil_rpc::system::SystemInfo {
840 chain_name: chain_spec.name().into(),
841 impl_name,
842 impl_version,
843 properties: chain_spec.properties(),
844 chain_type: chain_spec.chain_type(),
845 };
846
847 let mut rpc_api = RpcModule::new(());
848 let task_executor = Arc::new(spawn_handle);
849
850 let (chain, state, child_state) = {
851 let chain = soil_rpc::chain::new_full(client.clone(), task_executor.clone()).into_rpc();
852 let (state, child_state) =
853 soil_rpc::state::new_full(client.clone(), task_executor.clone(), execute_block);
854 let state = state.into_rpc();
855 let child_state = child_state.into_rpc();
856
857 (chain, state, child_state)
858 };
859
860 const MAX_TRANSACTION_PER_CONNECTION: usize = 16;
861
862 let transaction_broadcast_rpc_v2 = soil_rpc::v2::transaction::TransactionBroadcast::new(
863 client.clone(),
864 transaction_pool.clone(),
865 task_executor.clone(),
866 MAX_TRANSACTION_PER_CONNECTION,
867 )
868 .into_rpc();
869
870 let transaction_v2 = soil_rpc::v2::transaction::Transaction::new(
871 client.clone(),
872 transaction_pool.clone(),
873 task_executor.clone(),
874 metrics,
875 )
876 .into_rpc();
877
878 let chain_head_v2 = soil_rpc::v2::chain_head::ChainHead::new(
879 client.clone(),
880 backend.clone(),
881 task_executor.clone(),
882 soil_rpc::v2::chain_head::ChainHeadConfig::default(),
884 )
885 .into_rpc();
886
887 let is_archive_node = state_pruning.as_ref().map(|sp| sp.is_archive()).unwrap_or(false)
892 && blocks_pruning.is_archive();
893 let genesis_hash = client.hash(Zero::zero()).ok().flatten().expect("Genesis block exists; qed");
894 if is_archive_node {
895 let archive_v2 = soil_rpc::v2::archive::Archive::new(
896 client.clone(),
897 backend.clone(),
898 genesis_hash,
899 task_executor.clone(),
900 )
901 .into_rpc();
902 rpc_api.merge(archive_v2).map_err(|e| Error::Application(e.into()))?;
903 }
904
905 let chain_spec_v2 = soil_rpc::v2::chain_spec::ChainSpec::new(
907 chain_spec.name().into(),
908 genesis_hash,
909 chain_spec.properties(),
910 )
911 .into_rpc();
912
913 let author = soil_rpc::author::Author::new(
914 client.clone(),
915 transaction_pool,
916 keystore,
917 task_executor.clone(),
918 )
919 .into_rpc();
920
921 let system = soil_rpc::system::System::new(system_info, system_rpc_tx).into_rpc();
922
923 if let Some(storage) = backend.offchain_storage() {
924 let offchain = soil_rpc::offchain::Offchain::new(storage).into_rpc();
925
926 rpc_api.merge(offchain).map_err(|e| Error::Application(e.into()))?;
927 }
928
929 rpc_api.merge(transaction_v2).map_err(|e| Error::Application(e.into()))?;
931 rpc_api
932 .merge(transaction_broadcast_rpc_v2)
933 .map_err(|e| Error::Application(e.into()))?;
934 rpc_api.merge(chain_head_v2).map_err(|e| Error::Application(e.into()))?;
935 rpc_api.merge(chain_spec_v2).map_err(|e| Error::Application(e.into()))?;
936
937 rpc_api.merge(chain).map_err(|e| Error::Application(e.into()))?;
939 rpc_api.merge(author).map_err(|e| Error::Application(e.into()))?;
940 rpc_api.merge(system).map_err(|e| Error::Application(e.into()))?;
941 rpc_api.merge(state).map_err(|e| Error::Application(e.into()))?;
942 rpc_api.merge(child_state).map_err(|e| Error::Application(e.into()))?;
943 let extra_rpcs = rpc_builder(task_executor.clone())?;
945 rpc_api.merge(extra_rpcs).map_err(|e| Error::Application(e.into()))?;
946
947 Ok(rpc_api)
948}
949
950pub struct BuildNetworkParams<'a, Block, Net, TxPool, IQ, Client>
952where
953 Block: BlockT,
954 Net: NetworkBackend<Block, <Block as BlockT>::Hash>,
955{
956 pub config: &'a Configuration,
958 pub net_config: FullNetworkConfiguration<Block, <Block as BlockT>::Hash, Net>,
960 pub client: Arc<Client>,
962 pub transaction_pool: Arc<TxPool>,
964 pub spawn_handle: SpawnTaskHandle,
966 pub spawn_essential_handle: SpawnEssentialTaskHandle,
968 pub import_queue: IQ,
970 pub block_announce_validator_builder: Option<
972 Box<dyn FnOnce(Arc<Client>) -> Box<dyn BlockAnnounceValidator<Block> + Send> + Send>,
973 >,
974 pub warp_sync_config: Option<WarpSyncConfig<Block>>,
976 pub block_relay: Option<BlockRelayParams<Block, Net>>,
979 pub metrics: NotificationMetrics,
981}
982
983pub fn build_network<Block, Net, TxPool, IQ, Client>(
985 params: BuildNetworkParams<Block, Net, TxPool, IQ, Client>,
986) -> Result<
987 (
988 Arc<dyn soil_network::service::traits::NetworkService>,
989 TracingUnboundedSender<soil_rpc::system::Request<Block>>,
990 soil_network::transactions::TransactionsHandlerController<<Block as BlockT>::Hash>,
991 Arc<SyncingService<Block>>,
992 ),
993 Error,
994>
995where
996 Block: BlockT,
997 Client: ProvideRuntimeApi<Block>
998 + HeaderMetadata<Block, Error = soil_client::blockchain::Error>
999 + Chain<Block>
1000 + BlockBackend<Block>
1001 + BlockIdTo<Block, Error = soil_client::blockchain::Error>
1002 + ProofProvider<Block>
1003 + HeaderBackend<Block>
1004 + BlockchainEvents<Block>
1005 + 'static,
1006 TxPool: TransactionPool<Block = Block, Hash = <Block as BlockT>::Hash> + 'static,
1007 IQ: ImportQueue<Block> + 'static,
1008 Net: NetworkBackend<Block, <Block as BlockT>::Hash>,
1009{
1010 let BuildNetworkParams {
1011 config,
1012 mut net_config,
1013 client,
1014 transaction_pool,
1015 spawn_handle,
1016 spawn_essential_handle,
1017 import_queue,
1018 block_announce_validator_builder,
1019 warp_sync_config,
1020 block_relay,
1021 metrics,
1022 } = params;
1023
1024 let block_announce_validator = if let Some(f) = block_announce_validator_builder {
1025 f(client.clone())
1026 } else {
1027 Box::new(DefaultBlockAnnounceValidator)
1028 };
1029
1030 let network_service_provider = NetworkServiceProvider::new();
1031 let protocol_id = config.protocol_id();
1032 let fork_id = config.chain_spec.fork_id();
1033 let metrics_registry = config.prometheus_config.as_ref().map(|config| &config.registry);
1034
1035 let block_downloader = match block_relay {
1036 Some(params) => {
1037 let BlockRelayParams { mut server, downloader, request_response_config } = params;
1038
1039 net_config.add_request_response_protocol(request_response_config);
1040
1041 spawn_handle.spawn("block-request-handler", Some("networking"), async move {
1042 server.run().await;
1043 });
1044
1045 downloader
1046 },
1047 None => build_default_block_downloader(
1048 &protocol_id,
1049 fork_id,
1050 &mut net_config,
1051 network_service_provider.handle(),
1052 Arc::clone(&client),
1053 config.network.default_peers_set.in_peers as usize
1054 + config.network.default_peers_set.out_peers as usize,
1055 &spawn_handle,
1056 ),
1057 };
1058
1059 let syncing_strategy = build_polkadot_syncing_strategy(
1060 protocol_id.clone(),
1061 fork_id,
1062 &mut net_config,
1063 warp_sync_config,
1064 block_downloader,
1065 client.clone(),
1066 &spawn_handle,
1067 metrics_registry,
1068 config.blocks_pruning.is_archive(),
1069 )?;
1070
1071 let (syncing_engine, sync_service, block_announce_config) = SyncingEngine::new(
1072 Roles::from(&config.role),
1073 Arc::clone(&client),
1074 metrics_registry,
1075 metrics.clone(),
1076 &net_config,
1077 protocol_id.clone(),
1078 fork_id,
1079 block_announce_validator,
1080 syncing_strategy,
1081 network_service_provider.handle(),
1082 import_queue.service(),
1083 net_config.peer_store_handle(),
1084 )?;
1085
1086 spawn_handle.spawn_blocking("syncing", None, syncing_engine.run());
1087
1088 build_network_advanced(BuildNetworkAdvancedParams {
1089 role: config.role,
1090 protocol_id,
1091 fork_id,
1092 ipfs_server: config.network.ipfs_server,
1093 announce_block: config.announce_block,
1094 net_config,
1095 client,
1096 transaction_pool,
1097 spawn_handle,
1098 spawn_essential_handle,
1099 import_queue,
1100 sync_service,
1101 block_announce_config,
1102 network_service_provider,
1103 metrics_registry,
1104 metrics,
1105 })
1106}
1107
1108pub struct BuildNetworkAdvancedParams<'a, Block, Net, TxPool, IQ, Client>
1110where
1111 Block: BlockT,
1112 Net: NetworkBackend<Block, <Block as BlockT>::Hash>,
1113{
1114 pub role: Role,
1116 pub protocol_id: ProtocolId,
1118 pub fork_id: Option<&'a str>,
1120 pub ipfs_server: bool,
1122 pub announce_block: bool,
1124 pub net_config: FullNetworkConfiguration<Block, <Block as BlockT>::Hash, Net>,
1126 pub client: Arc<Client>,
1128 pub transaction_pool: Arc<TxPool>,
1130 pub spawn_handle: SpawnTaskHandle,
1132 pub spawn_essential_handle: SpawnEssentialTaskHandle,
1134 pub import_queue: IQ,
1136 pub sync_service: SyncingService<Block>,
1138 pub block_announce_config: Net::NotificationProtocolConfig,
1140 pub network_service_provider: NetworkServiceProvider,
1142 pub metrics_registry: Option<&'a Registry>,
1144 pub metrics: NotificationMetrics,
1146}
1147
1148pub fn build_network_advanced<Block, Net, TxPool, IQ, Client>(
1151 params: BuildNetworkAdvancedParams<Block, Net, TxPool, IQ, Client>,
1152) -> Result<
1153 (
1154 Arc<dyn soil_network::service::traits::NetworkService>,
1155 TracingUnboundedSender<soil_rpc::system::Request<Block>>,
1156 soil_network::transactions::TransactionsHandlerController<<Block as BlockT>::Hash>,
1157 Arc<SyncingService<Block>>,
1158 ),
1159 Error,
1160>
1161where
1162 Block: BlockT,
1163 Client: ProvideRuntimeApi<Block>
1164 + HeaderMetadata<Block, Error = soil_client::blockchain::Error>
1165 + Chain<Block>
1166 + BlockBackend<Block>
1167 + BlockIdTo<Block, Error = soil_client::blockchain::Error>
1168 + ProofProvider<Block>
1169 + HeaderBackend<Block>
1170 + BlockchainEvents<Block>
1171 + 'static,
1172 TxPool: TransactionPool<Block = Block, Hash = <Block as BlockT>::Hash> + 'static,
1173 IQ: ImportQueue<Block> + 'static,
1174 Net: NetworkBackend<Block, <Block as BlockT>::Hash>,
1175{
1176 let BuildNetworkAdvancedParams {
1177 role,
1178 protocol_id,
1179 fork_id,
1180 ipfs_server,
1181 announce_block,
1182 mut net_config,
1183 client,
1184 transaction_pool,
1185 spawn_handle,
1186 spawn_essential_handle,
1187 import_queue,
1188 sync_service,
1189 block_announce_config,
1190 network_service_provider,
1191 metrics_registry,
1192 metrics,
1193 } = params;
1194
1195 let genesis_hash = client.info().genesis_hash;
1196
1197 let light_client_request_protocol_config = {
1198 let (handler, protocol_config) =
1200 LightClientRequestHandler::new::<Net>(&protocol_id, fork_id, client.clone());
1201 spawn_handle.spawn("light-client-request-handler", Some("networking"), handler.run());
1202 protocol_config
1203 };
1204
1205 net_config.add_request_response_protocol(light_client_request_protocol_config);
1207
1208 let bitswap_config = ipfs_server.then(|| {
1209 let (handler, config) = Net::bitswap_server(client.clone());
1210 spawn_handle.spawn("bitswap-request-handler", Some("networking"), handler);
1211
1212 config
1213 });
1214
1215 let (transactions_handler_proto, transactions_config) =
1217 soil_network::transactions::TransactionsHandlerPrototype::new::<_, Block, Net>(
1218 protocol_id.clone(),
1219 genesis_hash,
1220 fork_id,
1221 metrics.clone(),
1222 net_config.peer_store_handle(),
1223 );
1224 net_config.add_notification_protocol(transactions_config);
1225
1226 let peer_store = net_config.take_peer_store();
1228 spawn_handle.spawn("peer-store", Some("networking"), peer_store.run());
1229
1230 let sync_service = Arc::new(sync_service);
1231
1232 let network_params = soil_network::config::Params::<Block, <Block as BlockT>::Hash, Net> {
1233 role,
1234 executor: {
1235 let spawn_handle = Clone::clone(&spawn_handle);
1236 Box::new(move |fut| {
1237 spawn_handle.spawn("libp2p-node", Some("networking"), fut);
1238 })
1239 },
1240 network_config: net_config,
1241 genesis_hash,
1242 protocol_id,
1243 fork_id: fork_id.map(ToOwned::to_owned),
1244 metrics_registry: metrics_registry.cloned(),
1245 block_announce_config,
1246 bitswap_config,
1247 notification_metrics: metrics,
1248 };
1249
1250 let has_bootnodes = !network_params.network_config.network_config.boot_nodes.is_empty();
1251 let network_mut = Net::new(network_params)?;
1252 let network = network_mut.network_service().clone();
1253
1254 let (tx_handler, tx_handler_controller) = transactions_handler_proto.build(
1255 network.clone(),
1256 sync_service.clone(),
1257 Arc::new(TransactionPoolAdapter { pool: transaction_pool, client: client.clone() }),
1258 metrics_registry,
1259 )?;
1260 spawn_handle.spawn_blocking(
1261 "network-transactions-handler",
1262 Some("networking"),
1263 tx_handler.run(),
1264 );
1265
1266 spawn_handle.spawn_blocking(
1267 "chain-sync-network-service-provider",
1268 Some("networking"),
1269 network_service_provider.run(Arc::new(network.clone())),
1270 );
1271 spawn_handle.spawn("import-queue", None, {
1272 let sync_service = sync_service.clone();
1273
1274 async move { import_queue.run(sync_service.as_ref()).await }
1275 });
1276
1277 let (system_rpc_tx, system_rpc_rx) = tracing_unbounded("mpsc_system_rpc", 10_000);
1278 spawn_handle.spawn(
1279 "system-rpc-handler",
1280 Some("networking"),
1281 build_system_rpc_future::<_, _, <Block as BlockT>::Hash>(
1282 role,
1283 network_mut.network_service(),
1284 sync_service.clone(),
1285 client.clone(),
1286 system_rpc_rx,
1287 has_bootnodes,
1288 ),
1289 );
1290
1291 let future = build_network_future::<_, _, <Block as BlockT>::Hash, _>(
1292 network_mut,
1293 client,
1294 sync_service.clone(),
1295 announce_block,
1296 );
1297
1298 spawn_essential_handle.spawn_blocking("network-worker", Some("networking"), future);
1309
1310 Ok((network, system_rpc_tx, tx_handler_controller, sync_service.clone()))
1311}
1312
1313pub struct DefaultSyncingEngineConfig<'a, Block, Client, Net>
1315where
1316 Block: BlockT,
1317 Net: NetworkBackend<Block, <Block as BlockT>::Hash>,
1318{
1319 pub role: Role,
1321 pub protocol_id: ProtocolId,
1323 pub fork_id: Option<&'a str>,
1325 pub net_config: &'a mut FullNetworkConfiguration<Block, <Block as BlockT>::Hash, Net>,
1327 pub block_announce_validator: Box<dyn BlockAnnounceValidator<Block> + Send>,
1329 pub network_service_handle: NetworkServiceHandle,
1331 pub warp_sync_config: Option<WarpSyncConfig<Block>>,
1333 pub client: Arc<Client>,
1335 pub import_queue_service: Box<dyn ImportQueueService<Block>>,
1337 pub num_peers_hint: usize,
1339 pub spawn_handle: &'a SpawnTaskHandle,
1341 pub metrics_registry: Option<&'a Registry>,
1343 pub metrics: NotificationMetrics,
1345 pub archive_blocks: bool,
1348}
1349
1350pub fn build_default_syncing_engine<Block, Client, Net>(
1353 config: DefaultSyncingEngineConfig<Block, Client, Net>,
1354) -> Result<(SyncingService<Block>, Net::NotificationProtocolConfig), Error>
1355where
1356 Block: BlockT,
1357 Client: HeaderBackend<Block>
1358 + BlockBackend<Block>
1359 + HeaderMetadata<Block, Error = soil_client::blockchain::Error>
1360 + ProofProvider<Block>
1361 + Send
1362 + Sync
1363 + 'static,
1364 Net: NetworkBackend<Block, <Block as BlockT>::Hash>,
1365{
1366 let DefaultSyncingEngineConfig {
1367 role,
1368 protocol_id,
1369 fork_id,
1370 net_config,
1371 block_announce_validator,
1372 network_service_handle,
1373 warp_sync_config,
1374 client,
1375 import_queue_service,
1376 num_peers_hint,
1377 spawn_handle,
1378 metrics_registry,
1379 metrics,
1380 archive_blocks,
1381 } = config;
1382
1383 let block_downloader = build_default_block_downloader(
1384 &protocol_id,
1385 fork_id,
1386 net_config,
1387 network_service_handle.clone(),
1388 client.clone(),
1389 num_peers_hint,
1390 spawn_handle,
1391 );
1392 let syncing_strategy = build_polkadot_syncing_strategy(
1393 protocol_id.clone(),
1394 fork_id,
1395 net_config,
1396 warp_sync_config,
1397 block_downloader,
1398 client.clone(),
1399 spawn_handle,
1400 metrics_registry,
1401 archive_blocks,
1402 )?;
1403
1404 let (syncing_engine, sync_service, block_announce_config) = SyncingEngine::new(
1405 Roles::from(&role),
1406 client,
1407 metrics_registry,
1408 metrics,
1409 &net_config,
1410 protocol_id,
1411 fork_id,
1412 block_announce_validator,
1413 syncing_strategy,
1414 network_service_handle,
1415 import_queue_service,
1416 net_config.peer_store_handle(),
1417 )?;
1418
1419 spawn_handle.spawn_blocking("syncing", None, syncing_engine.run());
1420
1421 Ok((sync_service, block_announce_config))
1422}
1423
1424pub fn build_default_block_downloader<Block, Client, Net>(
1426 protocol_id: &ProtocolId,
1427 fork_id: Option<&str>,
1428 net_config: &mut FullNetworkConfiguration<Block, <Block as BlockT>::Hash, Net>,
1429 network_service_handle: NetworkServiceHandle,
1430 client: Arc<Client>,
1431 num_peers_hint: usize,
1432 spawn_handle: &SpawnTaskHandle,
1433) -> Arc<dyn BlockDownloader<Block>>
1434where
1435 Block: BlockT,
1436 Client: HeaderBackend<Block> + BlockBackend<Block> + Send + Sync + 'static,
1437 Net: NetworkBackend<Block, <Block as BlockT>::Hash>,
1438{
1439 let BlockRelayParams { mut server, downloader, request_response_config } =
1442 BlockRequestHandler::new::<Net>(
1443 network_service_handle,
1444 &protocol_id,
1445 fork_id,
1446 client.clone(),
1447 num_peers_hint,
1448 );
1449
1450 spawn_handle.spawn("block-request-handler", Some("networking"), async move {
1451 server.run().await;
1452 });
1453
1454 net_config.add_request_response_protocol(request_response_config);
1455
1456 downloader
1457}
1458
1459pub fn build_polkadot_syncing_strategy<Block, Client, Net>(
1461 protocol_id: ProtocolId,
1462 fork_id: Option<&str>,
1463 net_config: &mut FullNetworkConfiguration<Block, <Block as BlockT>::Hash, Net>,
1464 warp_sync_config: Option<WarpSyncConfig<Block>>,
1465 block_downloader: Arc<dyn BlockDownloader<Block>>,
1466 client: Arc<Client>,
1467 spawn_handle: &SpawnTaskHandle,
1468 metrics_registry: Option<&Registry>,
1469 archive_blocks: bool,
1470) -> Result<Box<dyn SyncingStrategy<Block>>, Error>
1471where
1472 Block: BlockT,
1473 Client: HeaderBackend<Block>
1474 + BlockBackend<Block>
1475 + HeaderMetadata<Block, Error = soil_client::blockchain::Error>
1476 + ProofProvider<Block>
1477 + Send
1478 + Sync
1479 + 'static,
1480 Net: NetworkBackend<Block, <Block as BlockT>::Hash>,
1481{
1482 if warp_sync_config.is_none() && net_config.network_config.sync_mode.is_warp() {
1483 return Err("Warp sync enabled, but no warp sync provider configured.".into());
1484 }
1485
1486 if client.requires_full_sync() {
1487 match net_config.network_config.sync_mode {
1488 SyncMode::LightState { .. } => {
1489 return Err("Fast sync doesn't work for archive nodes".into())
1490 },
1491 SyncMode::Warp => return Err("Warp sync doesn't work for archive nodes".into()),
1492 SyncMode::Full => {},
1493 }
1494 }
1495
1496 let genesis_hash = client.info().genesis_hash;
1497
1498 let (state_request_protocol_config, state_request_protocol_name) = {
1499 let num_peer_hint = net_config.network_config.default_peers_set_num_full as usize
1500 + net_config.network_config.default_peers_set.reserved_nodes.len();
1501 let (handler, protocol_config) =
1503 StateRequestHandler::new::<Net>(&protocol_id, fork_id, client.clone(), num_peer_hint);
1504 let config_name = protocol_config.protocol_name().clone();
1505
1506 spawn_handle.spawn("state-request-handler", Some("networking"), handler.run());
1507 (protocol_config, config_name)
1508 };
1509 net_config.add_request_response_protocol(state_request_protocol_config);
1510
1511 let (warp_sync_protocol_config, warp_sync_protocol_name) = match warp_sync_config.as_ref() {
1512 Some(WarpSyncConfig::WithProvider(warp_with_provider)) => {
1513 let (handler, protocol_config) = WarpSyncRequestHandler::new::<_, Net>(
1515 protocol_id,
1516 genesis_hash,
1517 fork_id,
1518 warp_with_provider.clone(),
1519 );
1520 let config_name = protocol_config.protocol_name().clone();
1521
1522 spawn_handle.spawn("warp-sync-request-handler", Some("networking"), handler.run());
1523 (Some(protocol_config), Some(config_name))
1524 },
1525 _ => (None, None),
1526 };
1527 if let Some(config) = warp_sync_protocol_config {
1528 net_config.add_request_response_protocol(config);
1529 }
1530
1531 let syncing_config = PolkadotSyncingStrategyConfig {
1532 mode: net_config.network_config.sync_mode,
1533 max_parallel_downloads: net_config.network_config.max_parallel_downloads,
1534 max_blocks_per_request: net_config.network_config.max_blocks_per_request,
1535 min_peers_to_start_warp_sync: net_config.network_config.min_peers_to_start_warp_sync,
1536 metrics_registry: metrics_registry.cloned(),
1537 state_request_protocol_name,
1538 block_downloader,
1539 archive_blocks,
1540 };
1541 Ok(Box::new(PolkadotSyncingStrategy::new(
1542 syncing_config,
1543 client,
1544 warp_sync_config,
1545 warp_sync_protocol_name,
1546 )?))
1547}