Skip to main content

sc_rpc/state/
state_full.rs

1// This file is part of Substrate.
2
3// Copyright (C) Parity Technologies (UK) Ltd.
4// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
5
6// This program is free software: you can redistribute it and/or modify
7// it under the terms of the GNU General Public License as published by
8// the Free Software Foundation, either version 3 of the License, or
9// (at your option) any later version.
10
11// This program is distributed in the hope that it will be useful,
12// but WITHOUT ANY WARRANTY; without even the implied warranty of
13// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14// GNU General Public License for more details.
15
16// You should have received a copy of the GNU General Public License
17// along with this program. If not, see <https://www.gnu.org/licenses/>.
18
19//! State API backend for full nodes.
20
21use std::{collections::HashMap, marker::PhantomData, sync::Arc, time::Duration};
22
23use super::{
24	client_err,
25	error::{Error, Result},
26	ChildStateBackend, StateBackend,
27};
28use crate::{
29	utils::{spawn_subscription_task, BoundedVecDeque, PendingSubscription},
30	DenyUnsafe, SubscriptionTaskExecutor,
31};
32
33use futures::{future, stream, StreamExt};
34use jsonrpsee::{core::async_trait, types::ErrorObject, PendingSubscriptionSink};
35use sc_client_api::{
36	Backend, BlockBackend, BlockchainEvents, CallExecutor, ExecutorProvider, ProofProvider,
37	StorageProvider,
38};
39use sc_rpc_api::state::ReadProof;
40use sc_tracing::block::TracingExecuteBlock;
41use sp_api::{CallApiAt, Metadata, ProvideRuntimeApi};
42use sp_blockchain::{
43	CachedHeaderMetadata, Error as ClientError, HeaderBackend, HeaderMetadata,
44	Result as ClientResult,
45};
46use sp_core::{
47	storage::{
48		ChildInfo, ChildType, PrefixedStorageKey, StorageChangeSet, StorageData, StorageKey,
49	},
50	traits::CallContext,
51	Bytes,
52};
53use sp_runtime::traits::Block as BlockT;
54use sp_version::RuntimeVersion;
55
/// Upper bound on the running time of a single RPC call while unsafe RPCs are denied.
///
/// Applied by `storage_size` when it is invoked with `DenyUnsafe::Yes`.
const MAXIMUM_SAFE_RPC_CALL_TIMEOUT: Duration = Duration::from_secs(30);
58
59/// Ranges to query in `state_queryStorage`.
60struct QueryStorageRange<Block: BlockT> {
61	/// Hashes of all the blocks in the range.
62	pub hashes: Vec<Block::Hash>,
63}
64
65/// State API backend for full nodes.
66pub struct FullState<BE, Block: BlockT, Client> {
67	client: Arc<Client>,
68	executor: SubscriptionTaskExecutor,
69	execute_block: Option<Arc<dyn TracingExecuteBlock<Block>>>,
70	_phantom: PhantomData<BE>,
71}
72
73impl<BE, Block: BlockT, Client> FullState<BE, Block, Client>
74where
75	BE: Backend<Block>,
76	Client: StorageProvider<Block, BE>
77		+ HeaderBackend<Block>
78		+ BlockBackend<Block>
79		+ HeaderMetadata<Block, Error = sp_blockchain::Error>,
80	Block: BlockT + 'static,
81{
82	/// Create new state API backend for full nodes.
83	pub fn new(
84		client: Arc<Client>,
85		executor: SubscriptionTaskExecutor,
86		execute_block: Option<Arc<dyn TracingExecuteBlock<Block>>>,
87	) -> Self {
88		Self { client, executor, execute_block, _phantom: PhantomData }
89	}
90
91	/// Returns given block hash or best block hash if None is passed.
92	fn block_or_best(&self, hash: Option<Block::Hash>) -> ClientResult<Block::Hash> {
93		Ok(hash.unwrap_or_else(|| self.client.info().best_hash))
94	}
95
96	/// Validates block range.
97	fn query_storage_range(
98		&self,
99		from: Block::Hash,
100		to: Option<Block::Hash>,
101	) -> Result<QueryStorageRange<Block>> {
102		let to = self
103			.block_or_best(to)
104			.map_err(|e| invalid_block::<Block>(from, to, e.to_string()))?;
105
106		let invalid_block_err =
107			|e: ClientError| invalid_block::<Block>(from, Some(to), e.to_string());
108		let from_meta = self.client.header_metadata(from).map_err(invalid_block_err)?;
109		let to_meta = self.client.header_metadata(to).map_err(invalid_block_err)?;
110
111		if from_meta.number > to_meta.number {
112			return Err(invalid_block_range(
113				&from_meta,
114				&to_meta,
115				"from number > to number".to_owned(),
116			));
117		}
118
119		// check if we can get from `to` to `from` by going through parent_hashes.
120		let from_number = from_meta.number;
121		let hashes = {
122			let mut hashes = vec![to_meta.hash];
123			let mut last = to_meta.clone();
124			while last.number > from_number {
125				let header_metadata = self
126					.client
127					.header_metadata(last.parent)
128					.map_err(|e| invalid_block_range::<Block>(&last, &to_meta, e.to_string()))?;
129				hashes.push(header_metadata.hash);
130				last = header_metadata;
131			}
132			if last.hash != from_meta.hash {
133				return Err(invalid_block_range(
134					&from_meta,
135					&to_meta,
136					"from and to are on different forks".to_owned(),
137				));
138			}
139			hashes.reverse();
140			hashes
141		};
142
143		Ok(QueryStorageRange { hashes })
144	}
145
146	/// Iterates through range.unfiltered_range and check each block for changes of keys' values.
147	fn query_storage_unfiltered(
148		&self,
149		range: &QueryStorageRange<Block>,
150		keys: &[StorageKey],
151		last_values: &mut HashMap<StorageKey, Option<StorageData>>,
152		changes: &mut Vec<StorageChangeSet<Block::Hash>>,
153	) -> Result<()> {
154		for block_hash in &range.hashes {
155			let mut block_changes = StorageChangeSet { block: *block_hash, changes: Vec::new() };
156			for key in keys {
157				let (has_changed, data) = {
158					let curr_data = self.client.storage(*block_hash, key).map_err(client_err)?;
159					match last_values.get(key) {
160						Some(prev_data) => (curr_data != *prev_data, curr_data),
161						None => (true, curr_data),
162					}
163				};
164				if has_changed {
165					block_changes.changes.push((key.clone(), data.clone()));
166				}
167				last_values.insert(key.clone(), data);
168			}
169			if !block_changes.changes.is_empty() {
170				changes.push(block_changes);
171			}
172		}
173		Ok(())
174	}
175}
176
177#[async_trait]
178impl<BE, Block, Client> StateBackend<Block, Client> for FullState<BE, Block, Client>
179where
180	Block: BlockT + 'static,
181	Block::Hash: Unpin,
182	BE: Backend<Block> + 'static,
183	Client: ExecutorProvider<Block>
184		+ StorageProvider<Block, BE>
185		+ ProofProvider<Block>
186		+ HeaderBackend<Block>
187		+ HeaderMetadata<Block, Error = sp_blockchain::Error>
188		+ BlockchainEvents<Block>
189		+ CallApiAt<Block>
190		+ ProvideRuntimeApi<Block>
191		+ BlockBackend<Block>
192		+ Send
193		+ Sync
194		+ 'static,
195	Client::Api: Metadata<Block>,
196{
197	fn call(
198		&self,
199		block: Option<Block::Hash>,
200		method: String,
201		call_data: Bytes,
202	) -> std::result::Result<Bytes, Error> {
203		self.block_or_best(block)
204			.and_then(|block| {
205				self.client
206					.executor()
207					.call(block, &method, &call_data, CallContext::Offchain)
208					.map(Into::into)
209			})
210			.map_err(client_err)
211	}
212
213	// TODO: This is horribly broken; either remove it, or make it streaming.
214	fn storage_keys(
215		&self,
216		block: Option<Block::Hash>,
217		prefix: StorageKey,
218	) -> std::result::Result<Vec<StorageKey>, Error> {
219		// TODO: Remove the `.collect`.
220		self.block_or_best(block)
221			.and_then(|block| self.client.storage_keys(block, Some(&prefix), None))
222			.map(|iter| iter.collect())
223			.map_err(client_err)
224	}
225
226	// TODO: This is horribly broken; either remove it, or make it streaming.
227	fn storage_pairs(
228		&self,
229		block: Option<Block::Hash>,
230		prefix: StorageKey,
231	) -> std::result::Result<Vec<(StorageKey, StorageData)>, Error> {
232		// TODO: Remove the `.collect`.
233		self.block_or_best(block)
234			.and_then(|block| self.client.storage_pairs(block, Some(&prefix), None))
235			.map(|iter| iter.collect())
236			.map_err(client_err)
237	}
238
239	fn storage_keys_paged(
240		&self,
241		block: Option<Block::Hash>,
242		prefix: Option<StorageKey>,
243		count: u32,
244		start_key: Option<StorageKey>,
245	) -> std::result::Result<Vec<StorageKey>, Error> {
246		self.block_or_best(block)
247			.and_then(|block| self.client.storage_keys(block, prefix.as_ref(), start_key.as_ref()))
248			.map(|iter| iter.take(count as usize).collect())
249			.map_err(client_err)
250	}
251
252	fn storage(
253		&self,
254		block: Option<Block::Hash>,
255		key: StorageKey,
256	) -> std::result::Result<Option<StorageData>, Error> {
257		self.block_or_best(block)
258			.and_then(|block| self.client.storage(block, &key))
259			.map_err(client_err)
260	}
261
262	async fn storage_size(
263		&self,
264		block: Option<Block::Hash>,
265		key: StorageKey,
266		deny_unsafe: DenyUnsafe,
267	) -> std::result::Result<Option<u64>, Error> {
268		let block = match self.block_or_best(block) {
269			Ok(b) => b,
270			Err(e) => return Err(client_err(e)),
271		};
272
273		let client = self.client.clone();
274		let timeout = match deny_unsafe {
275			DenyUnsafe::Yes => Some(MAXIMUM_SAFE_RPC_CALL_TIMEOUT),
276			DenyUnsafe::No => None,
277		};
278
279		super::utils::spawn_blocking_with_timeout(timeout, move |is_timed_out| {
280			// Does the key point to a concrete entry in the database?
281			match client.storage(block, &key) {
282				Ok(Some(d)) => return Ok(Ok(Some(d.0.len() as u64))),
283				Err(e) => return Ok(Err(client_err(e))),
284				Ok(None) => {},
285			}
286
287			// The key doesn't point to anything, so it's probably a prefix.
288			let iter = match client.storage_keys(block, Some(&key), None).map_err(client_err) {
289				Ok(iter) => iter,
290				Err(e) => return Ok(Err(e)),
291			};
292
293			let mut sum = 0;
294			for storage_key in iter {
295				let value = client.storage(block, &storage_key).ok().flatten().unwrap_or_default();
296				sum += value.0.len() as u64;
297
298				is_timed_out.check_if_timed_out()?;
299			}
300
301			if sum > 0 {
302				Ok(Ok(Some(sum)))
303			} else {
304				Ok(Ok(None))
305			}
306		})
307		.await
308		.map_err(|error| Error::Client(Box::new(error)))?
309	}
310
311	fn storage_hash(
312		&self,
313		block: Option<Block::Hash>,
314		key: StorageKey,
315	) -> std::result::Result<Option<Block::Hash>, Error> {
316		self.block_or_best(block)
317			.and_then(|block| self.client.storage_hash(block, &key))
318			.map_err(client_err)
319	}
320
321	fn metadata(&self, block: Option<Block::Hash>) -> std::result::Result<Bytes, Error> {
322		self.block_or_best(block).map_err(client_err).and_then(|block| {
323			self.client
324				.runtime_api()
325				.metadata(block)
326				.map(Into::into)
327				.map_err(|e| Error::Client(Box::new(e)))
328		})
329	}
330
331	fn runtime_version(
332		&self,
333		block: Option<Block::Hash>,
334	) -> std::result::Result<RuntimeVersion, Error> {
335		self.block_or_best(block).map_err(client_err).and_then(|block| {
336			self.client
337				.runtime_version_at(block, CallContext::Offchain)
338				.map_err(|e| Error::Client(Box::new(e)))
339		})
340	}
341
342	fn query_storage(
343		&self,
344		from: Block::Hash,
345		to: Option<Block::Hash>,
346		keys: Vec<StorageKey>,
347	) -> std::result::Result<Vec<StorageChangeSet<Block::Hash>>, Error> {
348		let call_fn = move || {
349			let range = self.query_storage_range(from, to)?;
350			let mut changes = Vec::new();
351			let mut last_values = HashMap::new();
352			self.query_storage_unfiltered(&range, &keys, &mut last_values, &mut changes)?;
353			Ok(changes)
354		};
355		call_fn()
356	}
357
358	fn query_storage_at(
359		&self,
360		keys: Vec<StorageKey>,
361		at: Option<Block::Hash>,
362	) -> std::result::Result<Vec<StorageChangeSet<Block::Hash>>, Error> {
363		let at = at.unwrap_or_else(|| self.client.info().best_hash);
364		self.query_storage(at, Some(at), keys)
365	}
366
367	fn read_proof(
368		&self,
369		block: Option<Block::Hash>,
370		keys: Vec<StorageKey>,
371	) -> std::result::Result<ReadProof<Block::Hash>, Error> {
372		self.block_or_best(block)
373			.and_then(|block| {
374				self.client
375					.read_proof(block, &mut keys.iter().map(|key| key.0.as_ref()))
376					.map(|proof| proof.into_iter_nodes().map(|node| node.into()).collect())
377					.map(|proof| ReadProof { at: block, proof })
378			})
379			.map_err(client_err)
380	}
381
382	fn subscribe_runtime_version(&self, pending: PendingSubscriptionSink) {
383		let initial = match self
384			.block_or_best(None)
385			.and_then(|block| {
386				self.client.runtime_version_at(block, CallContext::Offchain).map_err(Into::into)
387			})
388			.map_err(|e| Error::Client(Box::new(e)))
389		{
390			Ok(initial) => initial,
391			Err(e) => {
392				spawn_subscription_task(&self.executor, pending.reject(e));
393				return;
394			},
395		};
396
397		let mut previous_version = initial.clone();
398		let client = self.client.clone();
399
400		// A stream of new versions
401		let version_stream = client
402			.import_notification_stream()
403			.filter(|n| future::ready(n.is_new_best))
404			.filter_map(move |n| {
405				let version = client
406					.runtime_version_at(n.hash, CallContext::Offchain)
407					.map_err(|e| Error::Client(Box::new(e)));
408
409				match version {
410					Ok(version) if version != previous_version => {
411						previous_version = version.clone();
412						future::ready(Some(version))
413					},
414					_ => future::ready(None),
415				}
416			});
417
418		let stream = futures::stream::once(future::ready(initial)).chain(version_stream);
419		spawn_subscription_task(
420			&self.executor,
421			PendingSubscription::from(pending).pipe_from_stream(stream, BoundedVecDeque::default()),
422		);
423	}
424
425	fn subscribe_storage(
426		&self,
427		pending: PendingSubscriptionSink,
428		keys: Option<Vec<StorageKey>>,
429		deny_unsafe: DenyUnsafe,
430	) {
431		if keys.is_none() {
432			if let Err(err) = deny_unsafe.check_if_safe() {
433				spawn_subscription_task(&self.executor, pending.reject(ErrorObject::from(err)));
434				return;
435			}
436		}
437
438		let stream = match self.client.storage_changes_notification_stream(keys.as_deref(), None) {
439			Ok(stream) => stream,
440			Err(blockchain_err) => {
441				spawn_subscription_task(
442					&self.executor,
443					pending.reject(Error::Client(Box::new(blockchain_err))),
444				);
445				return;
446			},
447		};
448
449		let initial = stream::iter(keys.map(|keys| {
450			let block = self.client.info().best_hash;
451			let changes = keys
452				.into_iter()
453				.map(|key| {
454					let v = self.client.storage(block, &key).ok().flatten();
455					(key, v)
456				})
457				.collect();
458			StorageChangeSet { block, changes }
459		}));
460
461		let storage_stream = stream.map(|storage_notif| StorageChangeSet {
462			block: storage_notif.block,
463			changes: storage_notif
464				.changes
465				.iter()
466				.filter_map(|(o_sk, k, v)| o_sk.is_none().then(|| (k.clone(), v.cloned())))
467				.collect(),
468		});
469
470		let stream = initial
471			.chain(storage_stream)
472			.filter(|storage| future::ready(!storage.changes.is_empty()));
473
474		spawn_subscription_task(
475			&self.executor,
476			PendingSubscription::from(pending).pipe_from_stream(stream, BoundedVecDeque::default()),
477		);
478	}
479
480	fn trace_block(
481		&self,
482		block: Block::Hash,
483		targets: Option<String>,
484		storage_keys: Option<String>,
485		methods: Option<String>,
486	) -> std::result::Result<sp_rpc::tracing::TraceBlockResponse, Error> {
487		sc_tracing::block::BlockExecutor::new(
488			self.client.clone(),
489			block,
490			targets,
491			storage_keys,
492			methods,
493			self.execute_block.clone(),
494		)
495		.trace_block()
496		.map_err(|e| invalid_block::<Block>(block, None, e.to_string()))
497	}
498}
499
500impl<BE, Block, Client> ChildStateBackend<Block, Client> for FullState<BE, Block, Client>
501where
502	Block: BlockT + 'static,
503	BE: Backend<Block> + 'static,
504	Client: ExecutorProvider<Block>
505		+ StorageProvider<Block, BE>
506		+ ProofProvider<Block>
507		+ HeaderBackend<Block>
508		+ BlockBackend<Block>
509		+ HeaderMetadata<Block, Error = sp_blockchain::Error>
510		+ BlockchainEvents<Block>
511		+ CallApiAt<Block>
512		+ ProvideRuntimeApi<Block>
513		+ Send
514		+ Sync
515		+ 'static,
516	Client::Api: Metadata<Block>,
517{
518	fn read_child_proof(
519		&self,
520		block: Option<Block::Hash>,
521		storage_key: PrefixedStorageKey,
522		keys: Vec<StorageKey>,
523	) -> std::result::Result<ReadProof<Block::Hash>, Error> {
524		self.block_or_best(block)
525			.and_then(|block| {
526				let child_info = match ChildType::from_prefixed_key(&storage_key) {
527					Some((ChildType::ParentKeyId, storage_key)) => {
528						ChildInfo::new_default(storage_key)
529					},
530					None => return Err(sp_blockchain::Error::InvalidChildStorageKey),
531				};
532				self.client
533					.read_child_proof(
534						block,
535						&child_info,
536						&mut keys.iter().map(|key| key.0.as_ref()),
537					)
538					.map(|proof| proof.into_iter_nodes().map(|node| node.into()).collect())
539					.map(|proof| ReadProof { at: block, proof })
540			})
541			.map_err(client_err)
542	}
543
544	fn storage_keys(
545		&self,
546		block: Option<Block::Hash>,
547		storage_key: PrefixedStorageKey,
548		prefix: StorageKey,
549	) -> std::result::Result<Vec<StorageKey>, Error> {
550		// TODO: Remove the `.collect`.
551		self.block_or_best(block)
552			.and_then(|block| {
553				let child_info = match ChildType::from_prefixed_key(&storage_key) {
554					Some((ChildType::ParentKeyId, storage_key)) => {
555						ChildInfo::new_default(storage_key)
556					},
557					None => return Err(sp_blockchain::Error::InvalidChildStorageKey),
558				};
559				self.client.child_storage_keys(block, child_info, Some(&prefix), None)
560			})
561			.map(|iter| iter.collect())
562			.map_err(client_err)
563	}
564
565	fn storage_keys_paged(
566		&self,
567		block: Option<Block::Hash>,
568		storage_key: PrefixedStorageKey,
569		prefix: Option<StorageKey>,
570		count: u32,
571		start_key: Option<StorageKey>,
572	) -> std::result::Result<Vec<StorageKey>, Error> {
573		self.block_or_best(block)
574			.and_then(|block| {
575				let child_info = match ChildType::from_prefixed_key(&storage_key) {
576					Some((ChildType::ParentKeyId, storage_key)) => {
577						ChildInfo::new_default(storage_key)
578					},
579					None => return Err(sp_blockchain::Error::InvalidChildStorageKey),
580				};
581				self.client.child_storage_keys(
582					block,
583					child_info,
584					prefix.as_ref(),
585					start_key.as_ref(),
586				)
587			})
588			.map(|iter| iter.take(count as usize).collect())
589			.map_err(client_err)
590	}
591
592	fn storage(
593		&self,
594		block: Option<Block::Hash>,
595		storage_key: PrefixedStorageKey,
596		key: StorageKey,
597	) -> std::result::Result<Option<StorageData>, Error> {
598		self.block_or_best(block)
599			.and_then(|block| {
600				let child_info = match ChildType::from_prefixed_key(&storage_key) {
601					Some((ChildType::ParentKeyId, storage_key)) => {
602						ChildInfo::new_default(storage_key)
603					},
604					None => return Err(sp_blockchain::Error::InvalidChildStorageKey),
605				};
606				self.client.child_storage(block, &child_info, &key)
607			})
608			.map_err(client_err)
609	}
610
611	fn storage_entries(
612		&self,
613		block: Option<Block::Hash>,
614		storage_key: PrefixedStorageKey,
615		keys: Vec<StorageKey>,
616	) -> std::result::Result<Vec<Option<StorageData>>, Error> {
617		let child_info = if let Some((ChildType::ParentKeyId, storage_key)) =
618			ChildType::from_prefixed_key(&storage_key)
619		{
620			Arc::new(ChildInfo::new_default(storage_key))
621		} else {
622			return Err(client_err(sp_blockchain::Error::InvalidChildStorageKey));
623		};
624		let block = self.block_or_best(block).map_err(client_err)?;
625		let client = self.client.clone();
626
627		keys.into_iter()
628			.map(move |key| {
629				client.clone().child_storage(block, &child_info, &key).map_err(client_err)
630			})
631			.collect()
632	}
633
634	fn storage_hash(
635		&self,
636		block: Option<Block::Hash>,
637		storage_key: PrefixedStorageKey,
638		key: StorageKey,
639	) -> std::result::Result<Option<Block::Hash>, Error> {
640		self.block_or_best(block)
641			.and_then(|block| {
642				let child_info = match ChildType::from_prefixed_key(&storage_key) {
643					Some((ChildType::ParentKeyId, storage_key)) => {
644						ChildInfo::new_default(storage_key)
645					},
646					None => return Err(sp_blockchain::Error::InvalidChildStorageKey),
647				};
648				self.client.child_storage_hash(block, &child_info, &key)
649			})
650			.map_err(client_err)
651	}
652}
653
654fn invalid_block_range<B: BlockT>(
655	from: &CachedHeaderMetadata<B>,
656	to: &CachedHeaderMetadata<B>,
657	details: String,
658) -> Error {
659	let to_string = |h: &CachedHeaderMetadata<B>| format!("{} ({:?})", h.number, h.hash);
660
661	Error::InvalidBlockRange { from: to_string(from), to: to_string(to), details }
662}
663
664fn invalid_block<B: BlockT>(from: B::Hash, to: Option<B::Hash>, details: String) -> Error {
665	Error::InvalidBlockRange { from: format!("{:?}", from), to: format!("{:?}", to), details }
666}