Skip to main content

sc_service/
lib.rs

1// This file is part of Substrate.
2
3// Copyright (C) Parity Technologies (UK) Ltd.
4// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
5
6// This program is free software: you can redistribute it and/or modify
7// it under the terms of the GNU General Public License as published by
8// the Free Software Foundation, either version 3 of the License, or
9// (at your option) any later version.
10
11// This program is distributed in the hope that it will be useful,
12// but WITHOUT ANY WARRANTY; without even the implied warranty of
13// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14// GNU General Public License for more details.
15
16// You should have received a copy of the GNU General Public License
17// along with this program. If not, see <https://www.gnu.org/licenses/>.
18
19//! Substrate service. Starts a thread that spins up the network, client, and extrinsic pool.
20//! Manages communication between them.
21
22#![warn(missing_docs)]
23#![recursion_limit = "1024"]
24
25pub mod chain_ops;
26pub mod client;
27pub mod config;
28pub mod error;
29
30mod builder;
31mod metrics;
32mod task_manager;
33
34use crate::config::Multiaddr;
35use std::{
36	collections::HashMap,
37	net::{Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6},
38};
39
40use codec::{Decode, Encode};
41use futures::{pin_mut, FutureExt, StreamExt};
42use jsonrpsee::RpcModule;
43use log::{debug, error, trace, warn};
44use sc_client_api::{blockchain::HeaderBackend, BlockBackend, BlockchainEvents, ProofProvider};
45use sc_network::{
46	config::MultiaddrWithPeerId, service::traits::NetworkService, NetworkBackend, NetworkBlock,
47	NetworkPeers, NetworkStateInfo,
48};
49use sc_network_sync::SyncingService;
50use sc_network_types::PeerId;
51pub use sc_rpc_server::create_rpc_runtime;
52use sc_rpc_server::Server;
53use sc_utils::mpsc::TracingUnboundedReceiver;
54use sp_blockchain::HeaderMetadata;
55use sp_consensus::SyncOracle;
56use sp_runtime::traits::{Block as BlockT, Header as HeaderT};
57
58pub use self::{
59	builder::{
60		build_default_block_downloader, build_default_syncing_engine, build_network,
61		build_network_advanced, build_polkadot_syncing_strategy, gen_rpc_module, init_telemetry,
62		new_client, new_db_backend, new_full_client, new_full_parts, new_full_parts_record_import,
63		new_full_parts_with_genesis_builder, new_wasm_executor,
64		propagate_transaction_notifications, spawn_tasks, BuildNetworkAdvancedParams,
65		BuildNetworkParams, DefaultSyncingEngineConfig, KeystoreContainer, SpawnTasksParams,
66		TFullBackend, TFullCallExecutor, TFullClient,
67	},
68	client::{ClientConfig, LocalCallExecutor},
69	error::Error,
70	metrics::MetricsService,
71};
72#[allow(deprecated)]
73pub use builder::new_native_or_wasm_executor;
74
75pub use sc_chain_spec::{
76	construct_genesis_block, resolve_state_version_from_wasm, BuildGenesisBlock,
77	GenesisBlockBuilder,
78};
79
80pub use config::{
81	BasePath, BlocksPruning, Configuration, DatabaseSource, PruningMode, Role, RpcMethods, TaskType,
82};
83pub use sc_chain_spec::{
84	ChainSpec, ChainType, Extension as ChainSpecExtension, GenericChainSpec, NoExtension,
85	Properties,
86};
87pub use sc_client_db::PruningFilter;
88
89use crate::config::RpcConfiguration;
90use prometheus_endpoint::Registry;
91pub use sc_consensus::ImportQueue;
92pub use sc_executor::NativeExecutionDispatch;
93pub use sc_network_sync::WarpSyncConfig;
94#[doc(hidden)]
95pub use sc_network_transactions::config::{TransactionImport, TransactionImportFuture};
96pub use sc_rpc::{RandomIntegerSubscriptionId, RandomStringSubscriptionId};
97pub use sc_tracing::TracingReceiver;
98pub use sc_transaction_pool::TransactionPoolOptions;
99pub use sc_transaction_pool_api::{error::IntoPoolError, InPoolTransaction, TransactionPool};
100#[doc(hidden)]
101pub use std::{ops::Deref, result::Result, sync::Arc};
102pub use task_manager::{
103	SpawnEssentialTaskHandle, SpawnTaskHandle, Task, TaskManager, TaskRegistry, DEFAULT_GROUP_NAME,
104};
105use tokio::runtime::Handle;
106
// Protocol identifier string. It is not referenced in the part of the file visible here;
// NOTE(review): presumably sibling modules use it as a fallback protocol id — confirm.
const DEFAULT_PROTOCOL_ID: &str = "sup";
108
/// A running RPC service that can perform in-memory RPC queries.
///
/// Holds the RPC module behind an `Arc` together with the addresses the RPC server listens
/// on. The derived `Clone` shares the module through its `Arc` and clones the address vector.
#[derive(Clone)]
pub struct RpcHandlers {
	// This is legacy and may be removed at some point, it was for WASM stuff before smoldot was a
	// thing. https://github.com/paritytech/polkadot-sdk/pull/5038#discussion_r1694971805
	rpc_module: Arc<RpcModule<()>>,

	// This can be used to introspect the port the RPC server is listening on. SDK consumers are
	// depending on this and it should be supported even if in-memory query support is removed.
	listen_addresses: Vec<Multiaddr>,
}
120
121impl RpcHandlers {
122	/// Create PRC handlers instance.
123	pub fn new(rpc_module: Arc<RpcModule<()>>, listen_addresses: Vec<Multiaddr>) -> Self {
124		Self { rpc_module, listen_addresses }
125	}
126
127	/// Starts an RPC query.
128	///
129	/// The query is passed as a string and must be valid JSON-RPC request object.
130	///
131	/// Returns a response and a stream if the call successful, fails if the
132	/// query could not be decoded as a JSON-RPC request object.
133	///
134	/// If the request subscribes you to events, the `stream` can be used to
135	/// retrieve the events.
136	pub async fn rpc_query(
137		&self,
138		json_query: &str,
139	) -> Result<(String, tokio::sync::mpsc::Receiver<String>), serde_json::Error> {
140		// Because `tokio::sync::mpsc::channel` is used under the hood
141		// it will panic if it's set to usize::MAX.
142		//
143		// This limit is used to prevent panics and is large enough.
144		const TOKIO_MPSC_MAX_SIZE: usize = tokio::sync::Semaphore::MAX_PERMITS;
145
146		self.rpc_module.raw_json_request(json_query, TOKIO_MPSC_MAX_SIZE).await
147	}
148
149	/// Provides access to the underlying `RpcModule`
150	pub fn handle(&self) -> Arc<RpcModule<()>> {
151		self.rpc_module.clone()
152	}
153
154	/// Provides access to listen addresses
155	pub fn listen_addresses(&self) -> &[Multiaddr] {
156		&self.listen_addresses[..]
157	}
158}
159
/// An incomplete set of chain components, but enough to run the chain ops subcommands.
///
/// Every component type is a generic parameter of the struct; the client, backend and
/// transaction pool are stored behind `Arc`s, the remaining components are stored by value.
pub struct PartialComponents<Client, Backend, SelectChain, ImportQueue, TransactionPool, Other> {
	/// A shared client instance.
	pub client: Arc<Client>,
	/// A shared backend instance.
	pub backend: Arc<Backend>,
	/// The chain task manager.
	pub task_manager: TaskManager,
	/// A keystore container instance.
	pub keystore_container: KeystoreContainer,
	/// A chain selection algorithm instance.
	pub select_chain: SelectChain,
	/// An import queue.
	pub import_queue: ImportQueue,
	/// A shared transaction pool.
	pub transaction_pool: Arc<TransactionPool>,
	/// Everything else that needs to be passed into the main build function.
	pub other: Other,
}
179
180/// Builds a future that continuously polls the network.
181async fn build_network_future<
182	B: BlockT,
183	C: BlockchainEvents<B>
184		+ HeaderBackend<B>
185		+ BlockBackend<B>
186		+ HeaderMetadata<B, Error = sp_blockchain::Error>
187		+ ProofProvider<B>
188		+ Send
189		+ Sync
190		+ 'static,
191	H: sc_network_common::ExHashT,
192	N: NetworkBackend<B, <B as BlockT>::Hash>,
193>(
194	network: N,
195	client: Arc<C>,
196	sync_service: Arc<SyncingService<B>>,
197	announce_imported_blocks: bool,
198) {
199	let mut imported_blocks_stream = client.import_notification_stream().fuse();
200
201	// Stream of finalized blocks reported by the client.
202	let mut finality_notification_stream = client.finality_notification_stream().fuse();
203
204	let network_run = network.run().fuse();
205	pin_mut!(network_run);
206
207	loop {
208		futures::select! {
209			// List of blocks that the client has imported.
210			notification = imported_blocks_stream.next() => {
211				let notification = match notification {
212					Some(n) => n,
213					// If this stream is shut down, that means the client has shut down, and the
214					// most appropriate thing to do for the network future is to shut down too.
215					None => {
216						warn!("Block import stream has terminated, shutting down the network future. Ignore if the node is stopping.");
217						return
218					},
219				};
220
221				if announce_imported_blocks {
222					sync_service.announce_block(notification.hash, None);
223				}
224
225				if notification.is_new_best {
226					sync_service.new_best_block_imported(
227						notification.hash,
228						*notification.header.number(),
229					);
230				}
231			}
232
233			// List of blocks that the client has finalized.
234			notification = finality_notification_stream.select_next_some() => {
235				sync_service.on_block_finalized(notification.hash, notification.header);
236			}
237
238			// Drive the network. Shut down the network future if `NetworkWorker` has terminated.
239			_ = network_run => {
240				warn!("`NetworkWorker` has terminated, shutting down the network future. Ignore if the node is stopping.");
241				return
242			}
243		}
244	}
245}
246
247/// Builds a future that processes system RPC requests.
248pub async fn build_system_rpc_future<
249	B: BlockT,
250	C: BlockchainEvents<B>
251		+ HeaderBackend<B>
252		+ BlockBackend<B>
253		+ HeaderMetadata<B, Error = sp_blockchain::Error>
254		+ ProofProvider<B>
255		+ Send
256		+ Sync
257		+ 'static,
258	H: sc_network_common::ExHashT,
259>(
260	role: Role,
261	network_service: Arc<dyn NetworkService>,
262	sync_service: Arc<SyncingService<B>>,
263	client: Arc<C>,
264	mut rpc_rx: TracingUnboundedReceiver<sc_rpc::system::Request<B>>,
265	should_have_peers: bool,
266) {
267	// Current best block at initialization, to report to the RPC layer.
268	let starting_block = client.info().best_number;
269
270	loop {
271		// Answer incoming RPC requests.
272		let Some(req) = rpc_rx.next().await else {
273			debug!("RPC requests stream has terminated, shutting down the system RPC future.");
274			return;
275		};
276
277		match req {
278			sc_rpc::system::Request::Health(sender) => match sync_service.peers_info().await {
279				Ok(info) => {
280					let _ = sender.send(sc_rpc::system::Health {
281						peers: info.len(),
282						is_syncing: sync_service.is_major_syncing(),
283						should_have_peers,
284					});
285				},
286				Err(_) => log::error!("`SyncingEngine` shut down"),
287			},
288			sc_rpc::system::Request::LocalPeerId(sender) => {
289				let _ = sender.send(network_service.local_peer_id().to_base58());
290			},
291			sc_rpc::system::Request::LocalListenAddresses(sender) => {
292				let peer_id = (network_service.local_peer_id()).into();
293				let p2p_proto_suffix = sc_network::multiaddr::Protocol::P2p(peer_id);
294				let addresses = network_service
295					.listen_addresses()
296					.iter()
297					.map(|addr| addr.clone().with(p2p_proto_suffix.clone()).to_string())
298					.collect();
299				let _ = sender.send(addresses);
300			},
301			sc_rpc::system::Request::Peers(sender) => match sync_service.peers_info().await {
302				Ok(info) => {
303					let _ = sender.send(
304						info.into_iter()
305							.map(|(peer_id, p)| sc_rpc::system::PeerInfo {
306								peer_id: peer_id.to_base58(),
307								roles: format!("{:?}", p.roles),
308								best_hash: p.best_hash,
309								best_number: p.best_number,
310							})
311							.collect(),
312					);
313				},
314				Err(_) => log::error!("`SyncingEngine` shut down"),
315			},
316			sc_rpc::system::Request::NetworkState(sender) => {
317				let network_state = network_service.network_state().await;
318				if let Ok(network_state) = network_state {
319					if let Ok(network_state) = serde_json::to_value(network_state) {
320						let _ = sender.send(network_state);
321					}
322				} else {
323					break;
324				}
325			},
326			sc_rpc::system::Request::NetworkAddReservedPeer(peer_addr, sender) => {
327				let result = match MultiaddrWithPeerId::try_from(peer_addr) {
328					Ok(peer) => network_service.add_reserved_peer(peer),
329					Err(err) => Err(err.to_string()),
330				};
331				let x = result.map_err(sc_rpc::system::error::Error::MalformattedPeerArg);
332				let _ = sender.send(x);
333			},
334			sc_rpc::system::Request::NetworkRemoveReservedPeer(peer_id, sender) => {
335				let _ = match peer_id.parse::<PeerId>() {
336					Ok(peer_id) => {
337						network_service.remove_reserved_peer(peer_id);
338						sender.send(Ok(()))
339					},
340					Err(e) => sender.send(Err(sc_rpc::system::error::Error::MalformattedPeerArg(
341						e.to_string(),
342					))),
343				};
344			},
345			sc_rpc::system::Request::NetworkReservedPeers(sender) => {
346				let Ok(reserved_peers) = network_service.reserved_peers().await else {
347					break;
348				};
349
350				let _ =
351					sender.send(reserved_peers.iter().map(|peer_id| peer_id.to_base58()).collect());
352			},
353			sc_rpc::system::Request::NodeRoles(sender) => {
354				use sc_rpc::system::NodeRole;
355
356				let node_role = match role {
357					Role::Authority { .. } => NodeRole::Authority,
358					Role::Full => NodeRole::Full,
359				};
360
361				let _ = sender.send(vec![node_role]);
362			},
363			sc_rpc::system::Request::SyncState(sender) => {
364				use sc_rpc::system::SyncState;
365
366				match sync_service.status().await.map(|status| status.best_seen_block) {
367					Ok(best_seen_block) => {
368						let best_number = client.info().best_number;
369						let _ = sender.send(SyncState {
370							starting_block,
371							current_block: best_number,
372							highest_block: best_seen_block.unwrap_or(best_number),
373						});
374					},
375					Err(_) => log::error!("`SyncingEngine` shut down"),
376				}
377			},
378		}
379	}
380
381	debug!("`NetworkWorker` has terminated, shutting down the system RPC future.");
382}
383
384/// Starts RPC servers.
385pub fn start_rpc_servers(
386	rpc_configuration: &RpcConfiguration,
387	registry: Option<&Registry>,
388	tokio_handle: &Handle,
389	rpc_api: RpcModule<()>,
390	rpc_runtime: tokio::runtime::Runtime,
391	rpc_id_provider: Option<Box<dyn sc_rpc_server::SubscriptionIdProvider>>,
392) -> Result<Server, error::Error> {
393	let endpoints: Vec<sc_rpc_server::RpcEndpoint> = if let Some(endpoints) =
394		rpc_configuration.addr.as_ref()
395	{
396		endpoints.clone()
397	} else {
398		let ipv6 =
399			SocketAddr::V6(SocketAddrV6::new(Ipv6Addr::LOCALHOST, rpc_configuration.port, 0, 0));
400		let ipv4 = SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, rpc_configuration.port));
401
402		vec![
403			sc_rpc_server::RpcEndpoint {
404				batch_config: rpc_configuration.batch_config,
405				cors: rpc_configuration.cors.clone(),
406				listen_addr: ipv4,
407				max_buffer_capacity_per_connection: rpc_configuration.message_buffer_capacity,
408				max_connections: rpc_configuration.max_connections,
409				max_payload_in_mb: rpc_configuration.max_request_size,
410				max_payload_out_mb: rpc_configuration.max_response_size,
411				max_subscriptions_per_connection: rpc_configuration.max_subs_per_conn,
412				rpc_methods: rpc_configuration.methods.into(),
413				rate_limit: rpc_configuration.rate_limit,
414				rate_limit_trust_proxy_headers: rpc_configuration.rate_limit_trust_proxy_headers,
415				rate_limit_whitelisted_ips: rpc_configuration.rate_limit_whitelisted_ips.clone(),
416				retry_random_port: true,
417				is_optional: false,
418			},
419			sc_rpc_server::RpcEndpoint {
420				batch_config: rpc_configuration.batch_config,
421				cors: rpc_configuration.cors.clone(),
422				listen_addr: ipv6,
423				max_buffer_capacity_per_connection: rpc_configuration.message_buffer_capacity,
424				max_connections: rpc_configuration.max_connections,
425				max_payload_in_mb: rpc_configuration.max_request_size,
426				max_payload_out_mb: rpc_configuration.max_response_size,
427				max_subscriptions_per_connection: rpc_configuration.max_subs_per_conn,
428				rpc_methods: rpc_configuration.methods.into(),
429				rate_limit: rpc_configuration.rate_limit,
430				rate_limit_trust_proxy_headers: rpc_configuration.rate_limit_trust_proxy_headers,
431				rate_limit_whitelisted_ips: rpc_configuration.rate_limit_whitelisted_ips.clone(),
432				retry_random_port: true,
433				is_optional: true,
434			},
435		]
436	};
437
438	let metrics = sc_rpc_server::RpcMetrics::new(registry)?;
439
440	let server_config = sc_rpc_server::Config {
441		endpoints,
442		metrics,
443		rpc_api,
444		id_provider: rpc_id_provider,
445		request_logger_limit: rpc_configuration.request_logger_limit,
446		rpc_runtime,
447	};
448
449	// TODO: https://github.com/paritytech/substrate/issues/13773
450	//
451	// `block_in_place` is a hack to allow callers to call `block_on` prior to
452	// calling `start_rpc_servers`.
453	match tokio::task::block_in_place(|| {
454		tokio_handle.block_on(sc_rpc_server::start_server(server_config))
455	}) {
456		Ok(server) => Ok(server),
457		Err(e) => Err(Error::Application(e)),
458	}
459}
460
/// Transaction pool adapter.
///
/// Pairs a shared transaction pool with a shared client.
pub struct TransactionPoolAdapter<C, P> {
	// Shared handle to the wrapped transaction pool.
	pool: Arc<P>,
	// Shared handle to the client.
	client: Arc<C>,
}

impl<C, P> TransactionPoolAdapter<C, P> {
	/// Constructs a new instance of [`TransactionPoolAdapter`] from the given pool and client
	/// handles.
	pub fn new(pool: Arc<P>, client: Arc<C>) -> Self {
		TransactionPoolAdapter { pool, client }
	}
}
473
474/// Get transactions for propagation.
475///
476/// Function extracted to simplify the test and prevent creating `ServiceFactory`.
477fn transactions_to_propagate<Pool, B, H, E>(pool: &Pool) -> Vec<(H, Arc<B::Extrinsic>)>
478where
479	Pool: TransactionPool<Block = B, Hash = H, Error = E>,
480	B: BlockT,
481	H: std::hash::Hash + Eq + sp_runtime::traits::Member + sp_runtime::traits::MaybeSerialize,
482	E: IntoPoolError + From<sc_transaction_pool_api::error::Error>,
483{
484	pool.ready()
485		.filter(|t| t.is_propagable())
486		.map(|t| {
487			let hash = t.hash().clone();
488			let ex = t.data().clone();
489			(hash, ex)
490		})
491		.collect()
492}
493
494impl<B, H, C, Pool, E> sc_network_transactions::config::TransactionPool<H, B>
495	for TransactionPoolAdapter<C, Pool>
496where
497	C: HeaderBackend<B>
498		+ BlockBackend<B>
499		+ HeaderMetadata<B, Error = sp_blockchain::Error>
500		+ ProofProvider<B>
501		+ Send
502		+ Sync
503		+ 'static,
504	Pool: 'static + TransactionPool<Block = B, Hash = H, Error = E>,
505	B: BlockT,
506	H: std::hash::Hash + Eq + sp_runtime::traits::Member + sp_runtime::traits::MaybeSerialize,
507	E: 'static + IntoPoolError + From<sc_transaction_pool_api::error::Error>,
508{
509	fn transactions(&self) -> Vec<(H, Arc<B::Extrinsic>)> {
510		transactions_to_propagate(&*self.pool)
511	}
512
513	fn hash_of(&self, transaction: &B::Extrinsic) -> H {
514		self.pool.hash_of(transaction)
515	}
516
517	fn import(&self, transaction: B::Extrinsic) -> TransactionImportFuture {
518		let encoded = transaction.encode();
519		let uxt = match Decode::decode(&mut &encoded[..]) {
520			Ok(uxt) => uxt,
521			Err(e) => {
522				debug!(target: sc_transaction_pool::LOG_TARGET, "Transaction invalid: {:?}", e);
523				return Box::pin(futures::future::ready(TransactionImport::Bad));
524			},
525		};
526
527		let start = std::time::Instant::now();
528		let pool = self.pool.clone();
529		let client = self.client.clone();
530		Box::pin(async move {
531			match pool
532				.submit_one(
533					client.info().best_hash,
534					sc_transaction_pool_api::TransactionSource::External,
535					uxt,
536				)
537				.await
538			{
539				Ok(_) => {
540					let elapsed = start.elapsed();
541					trace!(target: sc_transaction_pool::LOG_TARGET, "import transaction: {elapsed:?}");
542					TransactionImport::NewGood
543				},
544				Err(e) => match e.into_pool_error() {
545					Ok(sc_transaction_pool_api::error::Error::AlreadyImported(_)) => {
546						TransactionImport::KnownGood
547					},
548					Ok(_) => TransactionImport::Bad,
549					Err(_) => {
550						// it is not bad at least, just some internal node logic error, so peer is
551						// innocent.
552						TransactionImport::KnownGood
553					},
554				},
555			}
556		})
557	}
558
559	fn on_broadcasted(&self, propagations: HashMap<H, Vec<String>>) {
560		self.pool.on_broadcasted(propagations)
561	}
562
563	fn transaction(&self, hash: &H) -> Option<Arc<B::Extrinsic>> {
564		self.pool.ready_transaction(hash).and_then(
565			// Only propagable transactions should be resolved for network service.
566			|tx| tx.is_propagable().then(|| tx.data().clone()),
567		)
568	}
569}
570
#[cfg(test)]
mod tests {
	use super::*;
	use futures::executor::block_on;
	use sc_transaction_pool::BasicPool;
	use sp_consensus::SelectChain;
	use substrate_test_runtime_client::{
		prelude::*,
		runtime::{ExtrinsicBuilder, Transfer, TransferData},
	};

	/// Of two ready transactions, only the one that is not marked "do not propagate" may be
	/// returned by `transactions_to_propagate`.
	#[test]
	fn should_not_propagate_transactions_that_are_marked_as_such() {
		// given: a pool on top of a test client
		let (client, longest_chain) = TestClientBuilder::new().build_with_longest_chain();
		let client = Arc::new(client);
		let pool = Arc::from(BasicPool::new_full(
			Default::default(),
			true.into(),
			None,
			sp_core::testing::TaskExecutor::new(),
			client.clone(),
		));
		let source = sp_runtime::transaction_validity::TransactionSource::External;
		let best_hash = block_on(longest_chain.best_chain()).unwrap().hash();

		// ... holding an ordinary transfer ...
		let transfer = Transfer {
			amount: 5,
			nonce: 0,
			from: Sr25519Keyring::Alice.into(),
			to: Sr25519Keyring::Bob.into(),
		}
		.into_unchecked_extrinsic();
		block_on(pool.submit_one(best_hash, source, transfer)).unwrap();

		// ... and an extrinsic flagged as non-propagable.
		let do_not_propagate = ExtrinsicBuilder::new_call_do_not_propagate().nonce(1).build();
		block_on(pool.submit_one(best_hash, source, do_not_propagate)).unwrap();
		assert_eq!(pool.status().ready, 2);

		// when
		let propagated = transactions_to_propagate(&*pool);

		// then: only the transfer is offered for propagation
		assert_eq!(propagated.len(), 1);
		assert!(TransferData::try_from(&*propagated[0].1).is_ok());
	}
}