// soil_service/lib.rs
1// This file is part of Soil.
2
3// Copyright (C) Soil contributors.
4// Copyright (C) Parity Technologies (UK) Ltd.
5// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
6
7//! Substrate service. Starts a thread that spins up the network, client, and extrinsic pool.
8//! Manages communication between them.
9
10#![warn(missing_docs)]
11#![recursion_limit = "1024"]
12
13pub mod basic_authorship;
14pub mod chain_ops;
15pub mod client;
16pub mod config;
17pub mod error;
18pub mod informant;
19pub mod proposer_metrics;
20pub mod sysinfo;
21
22mod builder;
23mod metrics;
24mod task_manager;
25
26use crate::config::Multiaddr;
27use std::{
28	collections::HashMap,
29	net::{Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6},
30};
31
32use codec::{Decode, Encode};
33use futures::{pin_mut, FutureExt, StreamExt};
34use jsonrpsee::RpcModule;
35use log::{debug, error, trace, warn};
36use soil_client::blockchain::HeaderMetadata;
37use soil_client::client_api::{
38	blockchain::HeaderBackend, BlockBackend, BlockchainEvents, ProofProvider,
39};
40use soil_client::consensus::SyncOracle;
41use soil_client::utils::mpsc::TracingUnboundedReceiver;
42use soil_network::sync::SyncingService;
43use soil_network::types::PeerId;
44use soil_network::{
45	config::MultiaddrWithPeerId, service::traits::NetworkService, NetworkBackend, NetworkBlock,
46	NetworkPeers, NetworkStateInfo,
47};
48use soil_rpc::server::Server;
49use subsoil::runtime::traits::{Block as BlockT, Header as HeaderT};
50
51pub use self::{
52	builder::{
53		build_default_block_downloader, build_default_syncing_engine, build_network,
54		build_network_advanced, build_polkadot_syncing_strategy, gen_rpc_module, init_telemetry,
55		new_client, new_db_backend, new_full_client, new_full_parts, new_full_parts_record_import,
56		new_full_parts_with_genesis_builder, new_wasm_executor,
57		propagate_transaction_notifications, spawn_tasks, BuildNetworkAdvancedParams,
58		BuildNetworkParams, DefaultSyncingEngineConfig, KeystoreContainer, SpawnTasksParams,
59		TFullBackend, TFullCallExecutor, TFullClient,
60	},
61	client::{ClientConfig, LocalCallExecutor},
62	error::Error,
63	metrics::MetricsService,
64};
65#[allow(deprecated)]
66pub use builder::new_native_or_wasm_executor;
67
68pub use soil_chain_spec::{
69	construct_genesis_block, resolve_state_version_from_wasm, BuildGenesisBlock,
70	GenesisBlockBuilder,
71};
72
73pub use config::{
74	BasePath, BlocksPruning, Configuration, DatabaseSource, PruningMode, Role, RpcMethods, TaskType,
75};
76pub use soil_chain_spec::{
77	ChainSpec, ChainType, Extension as ChainSpecExtension, GenericChainSpec, NoExtension,
78	Properties,
79};
80pub use soil_client::db::PruningFilter;
81
82use crate::config::RpcConfiguration;
83use soil_prometheus::Registry;
84pub use soil_client::executor::NativeExecutionDispatch;
85pub use soil_client::import::ImportQueue;
86pub use soil_client::tracing::TracingReceiver;
87pub use soil_client::transaction_pool::{error::IntoPoolError, InPoolTransaction, TransactionPool};
88pub use soil_network::sync::WarpSyncConfig;
89#[doc(hidden)]
90pub use soil_network::transactions::config::{TransactionImport, TransactionImportFuture};
91pub use soil_rpc::{RandomIntegerSubscriptionId, RandomStringSubscriptionId};
92pub use soil_txpool::TransactionPoolOptions;
93#[doc(hidden)]
94pub use std::{ops::Deref, result::Result, sync::Arc};
95pub use task_manager::{
96	SpawnEssentialTaskHandle, SpawnTaskHandle, Task, TaskManager, TaskRegistry, DEFAULT_GROUP_NAME,
97};
98use tokio::runtime::Handle;
99
/// Default network protocol identifier.
// NOTE(review): not referenced in this chunk; presumably consumed by the
// `builder` module when no protocol id is configured — confirm at use sites.
const DEFAULT_PROTOCOL_ID: &str = "sup";
101
/// A running RPC service that can perform in-memory RPC queries.
#[derive(Clone)]
pub struct RpcHandlers {
	// This is legacy and may be removed at some point, it was for WASM stuff before smoldot was a
	// thing. https://github.com/paritytech/polkadot-sdk/pull/5038#discussion_r1694971805
	rpc_module: Arc<RpcModule<()>>,

	// This can be used to introspect the port the RPC server is listening on. SDK consumers are
	// depending on this and it should be supported even if in-memory query support is removed.
	listen_addresses: Vec<Multiaddr>,
}
113
impl RpcHandlers {
	/// Create an RPC handlers instance.
	pub fn new(rpc_module: Arc<RpcModule<()>>, listen_addresses: Vec<Multiaddr>) -> Self {
		Self { rpc_module, listen_addresses }
	}

	/// Starts an RPC query.
	///
	/// The query is passed as a string and must be a valid JSON-RPC request object.
	///
	/// Returns a response and a stream if the call is successful, fails if the
	/// query could not be decoded as a JSON-RPC request object.
	///
	/// If the request subscribes you to events, the `stream` can be used to
	/// retrieve the events.
	pub async fn rpc_query(
		&self,
		json_query: &str,
	) -> Result<(String, tokio::sync::mpsc::Receiver<String>), serde_json::Error> {
		// Because `tokio::sync::mpsc::channel` is used under the hood
		// it will panic if it's set to usize::MAX.
		//
		// This limit is used to prevent panics and is large enough.
		const TOKIO_MPSC_MAX_SIZE: usize = tokio::sync::Semaphore::MAX_PERMITS;

		self.rpc_module.raw_json_request(json_query, TOKIO_MPSC_MAX_SIZE).await
	}

	/// Provides access to the underlying `RpcModule`.
	pub fn handle(&self) -> Arc<RpcModule<()>> {
		self.rpc_module.clone()
	}

	/// Provides access to the addresses the RPC server is listening on.
	pub fn listen_addresses(&self) -> &[Multiaddr] {
		&self.listen_addresses[..]
	}
}
152
/// An incomplete set of chain components, but enough to run the chain ops subcommands.
///
// NOTE(review): typically assembled early in node setup and later consumed by
// the full service builder — confirm against the `builder` module.
pub struct PartialComponents<Client, Backend, SelectChain, ImportQueue, TransactionPool, Other> {
	/// A shared client instance.
	pub client: Arc<Client>,
	/// A shared backend instance.
	pub backend: Arc<Backend>,
	/// The chain task manager.
	pub task_manager: TaskManager,
	/// A keystore container instance.
	pub keystore_container: KeystoreContainer,
	/// A chain selection algorithm instance.
	pub select_chain: SelectChain,
	/// An import queue.
	pub import_queue: ImportQueue,
	/// A shared transaction pool.
	pub transaction_pool: Arc<TransactionPool>,
	/// Everything else that needs to be passed into the main build function.
	pub other: Other,
}
172
/// Builds a future that continuously polls the network.
///
/// Drives the network worker (`network.run()`) while forwarding client events
/// to the syncing service:
/// - imported blocks are announced to peers when `announce_imported_blocks` is
///   set, and new-best blocks are reported via `new_best_block_imported`;
/// - finalized blocks are forwarded via `on_block_finalized`.
///
/// The future resolves when either the client's import notification stream or
/// the network worker itself terminates.
// NOTE(review): the `H` type parameter is unused in the body; presumably kept
// for call-site/turbofish compatibility — confirm before removing.
async fn build_network_future<
	B: BlockT,
	C: BlockchainEvents<B>
		+ HeaderBackend<B>
		+ BlockBackend<B>
		+ HeaderMetadata<B, Error = soil_client::blockchain::Error>
		+ ProofProvider<B>
		+ Send
		+ Sync
		+ 'static,
	H: soil_network::common::ExHashT,
	N: NetworkBackend<B, <B as BlockT>::Hash>,
>(
	network: N,
	client: Arc<C>,
	sync_service: Arc<SyncingService<B>>,
	announce_imported_blocks: bool,
) {
	// Stream of blocks imported by the client.
	let mut imported_blocks_stream = client.import_notification_stream().fuse();

	// Stream of finalized blocks reported by the client.
	let mut finality_notification_stream = client.finality_notification_stream().fuse();

	let network_run = network.run().fuse();
	pin_mut!(network_run);

	loop {
		futures::select! {
			// List of blocks that the client has imported.
			notification = imported_blocks_stream.next() => {
				let notification = match notification {
					Some(n) => n,
					// If this stream is shut down, that means the client has shut down, and the
					// most appropriate thing to do for the network future is to shut down too.
					None => {
						warn!("Block import stream has terminated, shutting down the network future. Ignore if the node is stopping.");
						return
					},
				};

				if announce_imported_blocks {
					sync_service.announce_block(notification.hash, None);
				}

				if notification.is_new_best {
					sync_service.new_best_block_imported(
						notification.hash,
						*notification.header.number(),
					);
				}
			}

			// List of blocks that the client has finalized.
			notification = finality_notification_stream.select_next_some() => {
				sync_service.on_block_finalized(notification.hash, notification.header);
			}

			// Drive the network. Shut down the network future if `NetworkWorker` has terminated.
			_ = network_run => {
				warn!("`NetworkWorker` has terminated, shutting down the network future. Ignore if the node is stopping.");
				return
			}
		}
	}
}
239
/// Builds a future that processes system RPC requests.
///
/// Answers `soil_rpc::system::Request`s arriving on `rpc_rx` one at a time,
/// querying the network service, syncing service, and client as needed.
///
/// Termination:
/// - returns when `rpc_rx` is closed (the RPC layer shut down);
/// - breaks out of the loop (and logs) when `network_state` or
///   `reserved_peers` report the network worker has terminated;
/// - a `SyncingEngine` failure is only logged, the loop keeps running.
// NOTE(review): the `H` type parameter is unused in the body; presumably kept
// for call-site/turbofish compatibility — confirm before removing.
pub async fn build_system_rpc_future<
	B: BlockT,
	C: BlockchainEvents<B>
		+ HeaderBackend<B>
		+ BlockBackend<B>
		+ HeaderMetadata<B, Error = soil_client::blockchain::Error>
		+ ProofProvider<B>
		+ Send
		+ Sync
		+ 'static,
	H: soil_network::common::ExHashT,
>(
	role: Role,
	network_service: Arc<dyn NetworkService>,
	sync_service: Arc<SyncingService<B>>,
	client: Arc<C>,
	mut rpc_rx: TracingUnboundedReceiver<soil_rpc::system::Request<B>>,
	should_have_peers: bool,
) {
	// Current best block at initialization, to report to the RPC layer.
	let starting_block = client.info().best_number;

	loop {
		// Answer incoming RPC requests.
		let Some(req) = rpc_rx.next().await else {
			debug!("RPC requests stream has terminated, shutting down the system RPC future.");
			return;
		};

		match req {
			// Node health: peer count, sync status, and whether peers are expected at all.
			soil_rpc::system::Request::Health(sender) => match sync_service.peers_info().await {
				Ok(info) => {
					let _ = sender.send(soil_rpc::system::Health {
						peers: info.len(),
						is_syncing: sync_service.is_major_syncing(),
						should_have_peers,
					});
				},
				Err(_) => log::error!("`SyncingEngine` shut down"),
			},
			soil_rpc::system::Request::LocalPeerId(sender) => {
				let _ = sender.send(network_service.local_peer_id().to_base58());
			},
			// Listen addresses with the `/p2p/<peer-id>` suffix appended.
			soil_rpc::system::Request::LocalListenAddresses(sender) => {
				let peer_id = (network_service.local_peer_id()).into();
				let p2p_proto_suffix = soil_network::multiaddr::Protocol::P2p(peer_id);
				let addresses = network_service
					.listen_addresses()
					.iter()
					.map(|addr| addr.clone().with(p2p_proto_suffix.clone()).to_string())
					.collect();
				let _ = sender.send(addresses);
			},
			soil_rpc::system::Request::Peers(sender) => match sync_service.peers_info().await {
				Ok(info) => {
					let _ = sender.send(
						info.into_iter()
							.map(|(peer_id, p)| soil_rpc::system::PeerInfo {
								peer_id: peer_id.to_base58(),
								roles: format!("{:?}", p.roles),
								best_hash: p.best_hash,
								best_number: p.best_number,
							})
							.collect(),
					);
				},
				Err(_) => log::error!("`SyncingEngine` shut down"),
			},
			// An Err here means the network worker is gone: exit the loop.
			// A serialization failure simply drops the response.
			soil_rpc::system::Request::NetworkState(sender) => {
				let network_state = network_service.network_state().await;
				if let Ok(network_state) = network_state {
					if let Ok(network_state) = serde_json::to_value(network_state) {
						let _ = sender.send(network_state);
					}
				} else {
					break;
				}
			},
			soil_rpc::system::Request::NetworkAddReservedPeer(peer_addr, sender) => {
				let result = match MultiaddrWithPeerId::try_from(peer_addr) {
					Ok(peer) => network_service.add_reserved_peer(peer),
					Err(err) => Err(err.to_string()),
				};
				let x = result.map_err(soil_rpc::system::error::Error::MalformattedPeerArg);
				let _ = sender.send(x);
			},
			soil_rpc::system::Request::NetworkRemoveReservedPeer(peer_id, sender) => {
				let _ = match peer_id.parse::<PeerId>() {
					Ok(peer_id) => {
						network_service.remove_reserved_peer(peer_id);
						sender.send(Ok(()))
					},
					Err(e) => sender.send(Err(
						soil_rpc::system::error::Error::MalformattedPeerArg(e.to_string()),
					)),
				};
			},
			// An Err here likewise means the network worker is gone: exit the loop.
			soil_rpc::system::Request::NetworkReservedPeers(sender) => {
				let Ok(reserved_peers) = network_service.reserved_peers().await else {
					break;
				};

				let _ =
					sender.send(reserved_peers.iter().map(|peer_id| peer_id.to_base58()).collect());
			},
			soil_rpc::system::Request::NodeRoles(sender) => {
				use soil_rpc::system::NodeRole;

				let node_role = match role {
					Role::Authority { .. } => NodeRole::Authority,
					Role::Full => NodeRole::Full,
				};

				let _ = sender.send(vec![node_role]);
			},
			soil_rpc::system::Request::SyncState(sender) => {
				use soil_rpc::system::SyncState;

				match sync_service.status().await.map(|status| status.best_seen_block) {
					Ok(best_seen_block) => {
						let best_number = client.info().best_number;
						let _ = sender.send(SyncState {
							starting_block,
							current_block: best_number,
							// Fall back to our own best number if no peer reported one.
							highest_block: best_seen_block.unwrap_or(best_number),
						});
					},
					Err(_) => log::error!("`SyncingEngine` shut down"),
				}
			},
		}
	}

	debug!("`NetworkWorker` has terminated, shutting down the system RPC future.");
}
376
377/// Starts RPC servers.
378pub fn start_rpc_servers<R>(
379	rpc_configuration: &RpcConfiguration,
380	registry: Option<&Registry>,
381	tokio_handle: &Handle,
382	gen_rpc_module: R,
383	rpc_id_provider: Option<Box<dyn soil_rpc::server::SubscriptionIdProvider>>,
384) -> Result<Server, error::Error>
385where
386	R: Fn() -> Result<RpcModule<()>, Error>,
387{
388	let endpoints: Vec<soil_rpc::server::RpcEndpoint> = if let Some(endpoints) =
389		rpc_configuration.addr.as_ref()
390	{
391		endpoints.clone()
392	} else {
393		let ipv6 =
394			SocketAddr::V6(SocketAddrV6::new(Ipv6Addr::LOCALHOST, rpc_configuration.port, 0, 0));
395		let ipv4 = SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, rpc_configuration.port));
396
397		vec![
398			soil_rpc::server::RpcEndpoint {
399				batch_config: rpc_configuration.batch_config,
400				cors: rpc_configuration.cors.clone(),
401				listen_addr: ipv4,
402				max_buffer_capacity_per_connection: rpc_configuration.message_buffer_capacity,
403				max_connections: rpc_configuration.max_connections,
404				max_payload_in_mb: rpc_configuration.max_request_size,
405				max_payload_out_mb: rpc_configuration.max_response_size,
406				max_subscriptions_per_connection: rpc_configuration.max_subs_per_conn,
407				rpc_methods: rpc_configuration.methods.into(),
408				rate_limit: rpc_configuration.rate_limit,
409				rate_limit_trust_proxy_headers: rpc_configuration.rate_limit_trust_proxy_headers,
410				rate_limit_whitelisted_ips: rpc_configuration.rate_limit_whitelisted_ips.clone(),
411				retry_random_port: true,
412				is_optional: false,
413			},
414			soil_rpc::server::RpcEndpoint {
415				batch_config: rpc_configuration.batch_config,
416				cors: rpc_configuration.cors.clone(),
417				listen_addr: ipv6,
418				max_buffer_capacity_per_connection: rpc_configuration.message_buffer_capacity,
419				max_connections: rpc_configuration.max_connections,
420				max_payload_in_mb: rpc_configuration.max_request_size,
421				max_payload_out_mb: rpc_configuration.max_response_size,
422				max_subscriptions_per_connection: rpc_configuration.max_subs_per_conn,
423				rpc_methods: rpc_configuration.methods.into(),
424				rate_limit: rpc_configuration.rate_limit,
425				rate_limit_trust_proxy_headers: rpc_configuration.rate_limit_trust_proxy_headers,
426				rate_limit_whitelisted_ips: rpc_configuration.rate_limit_whitelisted_ips.clone(),
427				retry_random_port: true,
428				is_optional: true,
429			},
430		]
431	};
432
433	let metrics = soil_rpc::server::RpcMetrics::new(registry)?;
434	let rpc_api = gen_rpc_module()?;
435
436	let server_config = soil_rpc::server::Config {
437		endpoints,
438		rpc_api,
439		metrics,
440		id_provider: rpc_id_provider,
441		tokio_handle: tokio_handle.clone(),
442		request_logger_limit: rpc_configuration.request_logger_limit,
443	};
444
445	// TODO: https://github.com/paritytech/substrate/issues/13773
446	//
447	// `block_in_place` is a hack to allow callers to call `block_on` prior to
448	// calling `start_rpc_servers`.
449	match tokio::task::block_in_place(|| {
450		tokio_handle.block_on(soil_rpc::server::start_server(server_config))
451	}) {
452		Ok(server) => Ok(server),
453		Err(e) => Err(Error::Application(e)),
454	}
455}
456
/// Transaction pool adapter.
///
/// Pairs a shared transaction pool (`P`) with a shared client (`C`); the
/// network-facing `TransactionPool` trait implemented for it elsewhere in this
/// file uses the client to resolve the current best block on import.
pub struct TransactionPoolAdapter<C, P> {
	// Shared handle to the underlying transaction pool.
	pool: Arc<P>,
	// Shared client handle.
	client: Arc<C>,
}
462
463impl<C, P> TransactionPoolAdapter<C, P> {
464	/// Constructs a new instance of [`TransactionPoolAdapter`].
465	pub fn new(pool: Arc<P>, client: Arc<C>) -> Self {
466		Self { pool, client }
467	}
468}
469
470/// Get transactions for propagation.
471///
472/// Function extracted to simplify the test and prevent creating `ServiceFactory`.
473fn transactions_to_propagate<Pool, B, H, E>(pool: &Pool) -> Vec<(H, Arc<B::Extrinsic>)>
474where
475	Pool: TransactionPool<Block = B, Hash = H, Error = E>,
476	B: BlockT,
477	H: std::hash::Hash
478		+ Eq
479		+ subsoil::runtime::traits::Member
480		+ subsoil::runtime::traits::MaybeSerialize,
481	E: IntoPoolError + From<soil_client::transaction_pool::error::Error>,
482{
483	pool.ready()
484		.filter(|t| t.is_propagable())
485		.map(|t| {
486			let hash = t.hash().clone();
487			let ex = t.data().clone();
488			(hash, ex)
489		})
490		.collect()
491}
492
// Bridges the local transaction pool to the network transaction protocol:
// exposes ready transactions for gossip and imports transactions received
// from peers.
impl<B, H, C, Pool, E> soil_network::transactions::config::TransactionPool<H, B>
	for TransactionPoolAdapter<C, Pool>
where
	C: HeaderBackend<B>
		+ BlockBackend<B>
		+ HeaderMetadata<B, Error = soil_client::blockchain::Error>
		+ ProofProvider<B>
		+ Send
		+ Sync
		+ 'static,
	Pool: 'static + TransactionPool<Block = B, Hash = H, Error = E>,
	B: BlockT,
	H: std::hash::Hash
		+ Eq
		+ subsoil::runtime::traits::Member
		+ subsoil::runtime::traits::MaybeSerialize,
	E: 'static + IntoPoolError + From<soil_client::transaction_pool::error::Error>,
{
	// Hashes and bodies of all ready, propagable transactions in the pool.
	fn transactions(&self) -> Vec<(H, Arc<B::Extrinsic>)> {
		transactions_to_propagate(&*self.pool)
	}

	// Pool-defined hash of an extrinsic, used as the network propagation key.
	fn hash_of(&self, transaction: &B::Extrinsic) -> H {
		self.pool.hash_of(transaction)
	}

	// Imports a transaction received from the network into the local pool.
	fn import(&self, transaction: B::Extrinsic) -> TransactionImportFuture {
		// Encode/decode round-trip: presumably converts the network-level
		// extrinsic into the concrete type `submit_one` expects (inferred at
		// the call site). A decode failure means malformed bytes => `Bad`.
		let encoded = transaction.encode();
		let uxt = match Decode::decode(&mut &encoded[..]) {
			Ok(uxt) => uxt,
			Err(e) => {
				debug!(target: soil_txpool::LOG_TARGET, "Transaction invalid: {:?}", e);
				return Box::pin(futures::future::ready(TransactionImport::Bad));
			},
		};

		// Measure submission latency for the trace log below.
		let start = std::time::Instant::now();
		let pool = self.pool.clone();
		let client = self.client.clone();
		Box::pin(async move {
			// Submit against the current best block, marking the source as
			// external (received over the network, not produced locally).
			match pool
				.submit_one(
					client.info().best_hash,
					soil_client::transaction_pool::TransactionSource::External,
					uxt,
				)
				.await
			{
				Ok(_) => {
					let elapsed = start.elapsed();
					trace!(target: soil_txpool::LOG_TARGET, "import transaction: {elapsed:?}");
					TransactionImport::NewGood
				},
				Err(e) => match e.into_pool_error() {
					Ok(soil_client::transaction_pool::error::Error::AlreadyImported(_)) => {
						TransactionImport::KnownGood
					},
					// Any other pool-level rejection marks the transaction bad.
					Ok(_) => TransactionImport::Bad,
					Err(_) => {
						// it is not bad at least, just some internal node logic error, so peer is
						// innocent.
						TransactionImport::KnownGood
					},
				},
			}
		})
	}

	// Forwards broadcast reports (which peers received which tx) to the pool.
	fn on_broadcasted(&self, propagations: HashMap<H, Vec<String>>) {
		self.pool.on_broadcasted(propagations)
	}

	// Looks up a ready transaction by hash for serving network requests.
	fn transaction(&self, hash: &H) -> Option<Arc<B::Extrinsic>> {
		self.pool.ready_transaction(hash).and_then(
			// Only propagable transactions should be resolved for network service.
			|tx| tx.is_propagable().then(|| tx.data().clone()),
		)
	}
}
572
#[cfg(test)]
mod tests {
	use super::*;
	use futures::executor::block_on;
	use soil_client::consensus::SelectChain;
	use soil_txpool::BasicPool;
	use soil_test_node_runtime_client::{
		prelude::*,
		runtime::{ExtrinsicBuilder, Transfer, TransferData},
	};

	// `transactions_to_propagate` must skip transactions explicitly marked as
	// non-propagable while still returning the regular ones.
	#[test]
	fn should_not_propagate_transactions_that_are_marked_as_such() {
		// given
		let (client, longest_chain) = TestClientBuilder::new().build_with_longest_chain();
		let client = Arc::new(client);
		let spawner = subsoil::core::testing::TaskExecutor::new();
		let pool = Arc::from(BasicPool::new_full(
			Default::default(),
			true.into(),
			None,
			spawner,
			client.clone(),
		));
		let source = subsoil::runtime::transaction_validity::TransactionSource::External;
		let best = block_on(longest_chain.best_chain()).unwrap();
		// One regular (propagable) transfer...
		let transaction = Transfer {
			amount: 5,
			nonce: 0,
			from: Sr25519Keyring::Alice.into(),
			to: Sr25519Keyring::Bob.into(),
		}
		.into_unchecked_extrinsic();
		block_on(pool.submit_one(best.hash(), source, transaction.clone())).unwrap();
		// ...and one built as do-not-propagate.
		block_on(pool.submit_one(
			best.hash(),
			source,
			ExtrinsicBuilder::new_call_do_not_propagate().nonce(1).build(),
		))
		.unwrap();
		assert_eq!(pool.status().ready, 2);

		// when
		let transactions = transactions_to_propagate(&*pool);

		// then: only the propagable transfer survives.
		assert_eq!(transactions.len(), 1);
		assert!(TransferData::try_from(&*transactions[0].1).is_ok());
	}
}