1use std::{collections::HashMap, marker::PhantomData, sync::Arc, time::Duration};
22
23use super::{
24 client_err,
25 error::{Error, Result},
26 ChildStateBackend, StateBackend,
27};
28use crate::{
29 utils::{spawn_subscription_task, BoundedVecDeque, PendingSubscription},
30 DenyUnsafe, SubscriptionTaskExecutor,
31};
32
33use futures::{future, stream, StreamExt};
34use jsonrpsee::{core::async_trait, types::ErrorObject, PendingSubscriptionSink};
35use sc_client_api::{
36 Backend, BlockBackend, BlockchainEvents, CallExecutor, ExecutorProvider, ProofProvider,
37 StorageProvider,
38};
39use sc_rpc_api::state::ReadProof;
40use sc_tracing::block::TracingExecuteBlock;
41use sp_api::{CallApiAt, Metadata, ProvideRuntimeApi};
42use sp_blockchain::{
43 CachedHeaderMetadata, Error as ClientError, HeaderBackend, HeaderMetadata,
44 Result as ClientResult,
45};
46use sp_core::{
47 storage::{
48 ChildInfo, ChildType, PrefixedStorageKey, StorageChangeSet, StorageData, StorageKey,
49 },
50 traits::CallContext,
51 Bytes,
52};
53use sp_runtime::traits::Block as BlockT;
54use sp_version::RuntimeVersion;
55
/// Time limit handed to the blocking `storage_size` computation when the call is made with
/// `DenyUnsafe::Yes`; with `DenyUnsafe::No` no limit is passed.
const MAXIMUM_SAFE_RPC_CALL_TIMEOUT: Duration = Duration::from_secs(30);
58
/// The blocks a storage query has to visit.
///
/// Produced by `FullState::query_storage_range` and consumed by
/// `FullState::query_storage_unfiltered`.
struct QueryStorageRange<Block: BlockT> {
    /// Hashes of every block in the queried range, ordered from the `from` block (first) to the
    /// `to` block (last).
    pub hashes: Vec<Block::Hash>,
}
64
/// State RPC backend that answers every request directly from a local client.
///
/// NOTE(review): the name suggests this is the backend used by full nodes — confirm against the
/// module that constructs it.
pub struct FullState<BE, Block: BlockT, Client> {
    /// Shared handle to the client all storage/runtime queries are issued against.
    client: Arc<Client>,
    /// Executor on which subscription tasks are spawned.
    executor: SubscriptionTaskExecutor,
    /// Optional block executor forwarded to the tracing `BlockExecutor` in `trace_block`.
    execute_block: Option<Arc<dyn TracingExecuteBlock<Block>>>,
    /// Ties the otherwise unused backend type parameter `BE` to the struct.
    _phantom: PhantomData<BE>,
}
72
73impl<BE, Block: BlockT, Client> FullState<BE, Block, Client>
74where
75 BE: Backend<Block>,
76 Client: StorageProvider<Block, BE>
77 + HeaderBackend<Block>
78 + BlockBackend<Block>
79 + HeaderMetadata<Block, Error = sp_blockchain::Error>,
80 Block: BlockT + 'static,
81{
82 pub fn new(
84 client: Arc<Client>,
85 executor: SubscriptionTaskExecutor,
86 execute_block: Option<Arc<dyn TracingExecuteBlock<Block>>>,
87 ) -> Self {
88 Self { client, executor, execute_block, _phantom: PhantomData }
89 }
90
91 fn block_or_best(&self, hash: Option<Block::Hash>) -> ClientResult<Block::Hash> {
93 Ok(hash.unwrap_or_else(|| self.client.info().best_hash))
94 }
95
96 fn query_storage_range(
98 &self,
99 from: Block::Hash,
100 to: Option<Block::Hash>,
101 ) -> Result<QueryStorageRange<Block>> {
102 let to = self
103 .block_or_best(to)
104 .map_err(|e| invalid_block::<Block>(from, to, e.to_string()))?;
105
106 let invalid_block_err =
107 |e: ClientError| invalid_block::<Block>(from, Some(to), e.to_string());
108 let from_meta = self.client.header_metadata(from).map_err(invalid_block_err)?;
109 let to_meta = self.client.header_metadata(to).map_err(invalid_block_err)?;
110
111 if from_meta.number > to_meta.number {
112 return Err(invalid_block_range(
113 &from_meta,
114 &to_meta,
115 "from number > to number".to_owned(),
116 ));
117 }
118
119 let from_number = from_meta.number;
121 let hashes = {
122 let mut hashes = vec![to_meta.hash];
123 let mut last = to_meta.clone();
124 while last.number > from_number {
125 let header_metadata = self
126 .client
127 .header_metadata(last.parent)
128 .map_err(|e| invalid_block_range::<Block>(&last, &to_meta, e.to_string()))?;
129 hashes.push(header_metadata.hash);
130 last = header_metadata;
131 }
132 if last.hash != from_meta.hash {
133 return Err(invalid_block_range(
134 &from_meta,
135 &to_meta,
136 "from and to are on different forks".to_owned(),
137 ));
138 }
139 hashes.reverse();
140 hashes
141 };
142
143 Ok(QueryStorageRange { hashes })
144 }
145
146 fn query_storage_unfiltered(
148 &self,
149 range: &QueryStorageRange<Block>,
150 keys: &[StorageKey],
151 last_values: &mut HashMap<StorageKey, Option<StorageData>>,
152 changes: &mut Vec<StorageChangeSet<Block::Hash>>,
153 ) -> Result<()> {
154 for block_hash in &range.hashes {
155 let mut block_changes = StorageChangeSet { block: *block_hash, changes: Vec::new() };
156 for key in keys {
157 let (has_changed, data) = {
158 let curr_data = self.client.storage(*block_hash, key).map_err(client_err)?;
159 match last_values.get(key) {
160 Some(prev_data) => (curr_data != *prev_data, curr_data),
161 None => (true, curr_data),
162 }
163 };
164 if has_changed {
165 block_changes.changes.push((key.clone(), data.clone()));
166 }
167 last_values.insert(key.clone(), data);
168 }
169 if !block_changes.changes.is_empty() {
170 changes.push(block_changes);
171 }
172 }
173 Ok(())
174 }
175}
176
177#[async_trait]
178impl<BE, Block, Client> StateBackend<Block, Client> for FullState<BE, Block, Client>
179where
180 Block: BlockT + 'static,
181 Block::Hash: Unpin,
182 BE: Backend<Block> + 'static,
183 Client: ExecutorProvider<Block>
184 + StorageProvider<Block, BE>
185 + ProofProvider<Block>
186 + HeaderBackend<Block>
187 + HeaderMetadata<Block, Error = sp_blockchain::Error>
188 + BlockchainEvents<Block>
189 + CallApiAt<Block>
190 + ProvideRuntimeApi<Block>
191 + BlockBackend<Block>
192 + Send
193 + Sync
194 + 'static,
195 Client::Api: Metadata<Block>,
196{
197 fn call(
198 &self,
199 block: Option<Block::Hash>,
200 method: String,
201 call_data: Bytes,
202 ) -> std::result::Result<Bytes, Error> {
203 self.block_or_best(block)
204 .and_then(|block| {
205 self.client
206 .executor()
207 .call(block, &method, &call_data, CallContext::Offchain)
208 .map(Into::into)
209 })
210 .map_err(client_err)
211 }
212
213 fn storage_keys(
215 &self,
216 block: Option<Block::Hash>,
217 prefix: StorageKey,
218 ) -> std::result::Result<Vec<StorageKey>, Error> {
219 self.block_or_best(block)
221 .and_then(|block| self.client.storage_keys(block, Some(&prefix), None))
222 .map(|iter| iter.collect())
223 .map_err(client_err)
224 }
225
226 fn storage_pairs(
228 &self,
229 block: Option<Block::Hash>,
230 prefix: StorageKey,
231 ) -> std::result::Result<Vec<(StorageKey, StorageData)>, Error> {
232 self.block_or_best(block)
234 .and_then(|block| self.client.storage_pairs(block, Some(&prefix), None))
235 .map(|iter| iter.collect())
236 .map_err(client_err)
237 }
238
239 fn storage_keys_paged(
240 &self,
241 block: Option<Block::Hash>,
242 prefix: Option<StorageKey>,
243 count: u32,
244 start_key: Option<StorageKey>,
245 ) -> std::result::Result<Vec<StorageKey>, Error> {
246 self.block_or_best(block)
247 .and_then(|block| self.client.storage_keys(block, prefix.as_ref(), start_key.as_ref()))
248 .map(|iter| iter.take(count as usize).collect())
249 .map_err(client_err)
250 }
251
252 fn storage(
253 &self,
254 block: Option<Block::Hash>,
255 key: StorageKey,
256 ) -> std::result::Result<Option<StorageData>, Error> {
257 self.block_or_best(block)
258 .and_then(|block| self.client.storage(block, &key))
259 .map_err(client_err)
260 }
261
262 async fn storage_size(
263 &self,
264 block: Option<Block::Hash>,
265 key: StorageKey,
266 deny_unsafe: DenyUnsafe,
267 ) -> std::result::Result<Option<u64>, Error> {
268 let block = match self.block_or_best(block) {
269 Ok(b) => b,
270 Err(e) => return Err(client_err(e)),
271 };
272
273 let client = self.client.clone();
274 let timeout = match deny_unsafe {
275 DenyUnsafe::Yes => Some(MAXIMUM_SAFE_RPC_CALL_TIMEOUT),
276 DenyUnsafe::No => None,
277 };
278
279 super::utils::spawn_blocking_with_timeout(timeout, move |is_timed_out| {
280 match client.storage(block, &key) {
282 Ok(Some(d)) => return Ok(Ok(Some(d.0.len() as u64))),
283 Err(e) => return Ok(Err(client_err(e))),
284 Ok(None) => {},
285 }
286
287 let iter = match client.storage_keys(block, Some(&key), None).map_err(client_err) {
289 Ok(iter) => iter,
290 Err(e) => return Ok(Err(e)),
291 };
292
293 let mut sum = 0;
294 for storage_key in iter {
295 let value = client.storage(block, &storage_key).ok().flatten().unwrap_or_default();
296 sum += value.0.len() as u64;
297
298 is_timed_out.check_if_timed_out()?;
299 }
300
301 if sum > 0 {
302 Ok(Ok(Some(sum)))
303 } else {
304 Ok(Ok(None))
305 }
306 })
307 .await
308 .map_err(|error| Error::Client(Box::new(error)))?
309 }
310
311 fn storage_hash(
312 &self,
313 block: Option<Block::Hash>,
314 key: StorageKey,
315 ) -> std::result::Result<Option<Block::Hash>, Error> {
316 self.block_or_best(block)
317 .and_then(|block| self.client.storage_hash(block, &key))
318 .map_err(client_err)
319 }
320
321 fn metadata(&self, block: Option<Block::Hash>) -> std::result::Result<Bytes, Error> {
322 self.block_or_best(block).map_err(client_err).and_then(|block| {
323 self.client
324 .runtime_api()
325 .metadata(block)
326 .map(Into::into)
327 .map_err(|e| Error::Client(Box::new(e)))
328 })
329 }
330
331 fn runtime_version(
332 &self,
333 block: Option<Block::Hash>,
334 ) -> std::result::Result<RuntimeVersion, Error> {
335 self.block_or_best(block).map_err(client_err).and_then(|block| {
336 self.client
337 .runtime_version_at(block, CallContext::Offchain)
338 .map_err(|e| Error::Client(Box::new(e)))
339 })
340 }
341
342 fn query_storage(
343 &self,
344 from: Block::Hash,
345 to: Option<Block::Hash>,
346 keys: Vec<StorageKey>,
347 ) -> std::result::Result<Vec<StorageChangeSet<Block::Hash>>, Error> {
348 let call_fn = move || {
349 let range = self.query_storage_range(from, to)?;
350 let mut changes = Vec::new();
351 let mut last_values = HashMap::new();
352 self.query_storage_unfiltered(&range, &keys, &mut last_values, &mut changes)?;
353 Ok(changes)
354 };
355 call_fn()
356 }
357
358 fn query_storage_at(
359 &self,
360 keys: Vec<StorageKey>,
361 at: Option<Block::Hash>,
362 ) -> std::result::Result<Vec<StorageChangeSet<Block::Hash>>, Error> {
363 let at = at.unwrap_or_else(|| self.client.info().best_hash);
364 self.query_storage(at, Some(at), keys)
365 }
366
367 fn read_proof(
368 &self,
369 block: Option<Block::Hash>,
370 keys: Vec<StorageKey>,
371 ) -> std::result::Result<ReadProof<Block::Hash>, Error> {
372 self.block_or_best(block)
373 .and_then(|block| {
374 self.client
375 .read_proof(block, &mut keys.iter().map(|key| key.0.as_ref()))
376 .map(|proof| proof.into_iter_nodes().map(|node| node.into()).collect())
377 .map(|proof| ReadProof { at: block, proof })
378 })
379 .map_err(client_err)
380 }
381
382 fn subscribe_runtime_version(&self, pending: PendingSubscriptionSink) {
383 let initial = match self
384 .block_or_best(None)
385 .and_then(|block| {
386 self.client.runtime_version_at(block, CallContext::Offchain).map_err(Into::into)
387 })
388 .map_err(|e| Error::Client(Box::new(e)))
389 {
390 Ok(initial) => initial,
391 Err(e) => {
392 spawn_subscription_task(&self.executor, pending.reject(e));
393 return;
394 },
395 };
396
397 let mut previous_version = initial.clone();
398 let client = self.client.clone();
399
400 let version_stream = client
402 .import_notification_stream()
403 .filter(|n| future::ready(n.is_new_best))
404 .filter_map(move |n| {
405 let version = client
406 .runtime_version_at(n.hash, CallContext::Offchain)
407 .map_err(|e| Error::Client(Box::new(e)));
408
409 match version {
410 Ok(version) if version != previous_version => {
411 previous_version = version.clone();
412 future::ready(Some(version))
413 },
414 _ => future::ready(None),
415 }
416 });
417
418 let stream = futures::stream::once(future::ready(initial)).chain(version_stream);
419 spawn_subscription_task(
420 &self.executor,
421 PendingSubscription::from(pending).pipe_from_stream(stream, BoundedVecDeque::default()),
422 );
423 }
424
425 fn subscribe_storage(
426 &self,
427 pending: PendingSubscriptionSink,
428 keys: Option<Vec<StorageKey>>,
429 deny_unsafe: DenyUnsafe,
430 ) {
431 if keys.is_none() {
432 if let Err(err) = deny_unsafe.check_if_safe() {
433 spawn_subscription_task(&self.executor, pending.reject(ErrorObject::from(err)));
434 return;
435 }
436 }
437
438 let stream = match self.client.storage_changes_notification_stream(keys.as_deref(), None) {
439 Ok(stream) => stream,
440 Err(blockchain_err) => {
441 spawn_subscription_task(
442 &self.executor,
443 pending.reject(Error::Client(Box::new(blockchain_err))),
444 );
445 return;
446 },
447 };
448
449 let initial = stream::iter(keys.map(|keys| {
450 let block = self.client.info().best_hash;
451 let changes = keys
452 .into_iter()
453 .map(|key| {
454 let v = self.client.storage(block, &key).ok().flatten();
455 (key, v)
456 })
457 .collect();
458 StorageChangeSet { block, changes }
459 }));
460
461 let storage_stream = stream.map(|storage_notif| StorageChangeSet {
462 block: storage_notif.block,
463 changes: storage_notif
464 .changes
465 .iter()
466 .filter_map(|(o_sk, k, v)| o_sk.is_none().then(|| (k.clone(), v.cloned())))
467 .collect(),
468 });
469
470 let stream = initial
471 .chain(storage_stream)
472 .filter(|storage| future::ready(!storage.changes.is_empty()));
473
474 spawn_subscription_task(
475 &self.executor,
476 PendingSubscription::from(pending).pipe_from_stream(stream, BoundedVecDeque::default()),
477 );
478 }
479
480 fn trace_block(
481 &self,
482 block: Block::Hash,
483 targets: Option<String>,
484 storage_keys: Option<String>,
485 methods: Option<String>,
486 ) -> std::result::Result<sp_rpc::tracing::TraceBlockResponse, Error> {
487 sc_tracing::block::BlockExecutor::new(
488 self.client.clone(),
489 block,
490 targets,
491 storage_keys,
492 methods,
493 self.execute_block.clone(),
494 )
495 .trace_block()
496 .map_err(|e| invalid_block::<Block>(block, None, e.to_string()))
497 }
498}
499
500impl<BE, Block, Client> ChildStateBackend<Block, Client> for FullState<BE, Block, Client>
501where
502 Block: BlockT + 'static,
503 BE: Backend<Block> + 'static,
504 Client: ExecutorProvider<Block>
505 + StorageProvider<Block, BE>
506 + ProofProvider<Block>
507 + HeaderBackend<Block>
508 + BlockBackend<Block>
509 + HeaderMetadata<Block, Error = sp_blockchain::Error>
510 + BlockchainEvents<Block>
511 + CallApiAt<Block>
512 + ProvideRuntimeApi<Block>
513 + Send
514 + Sync
515 + 'static,
516 Client::Api: Metadata<Block>,
517{
518 fn read_child_proof(
519 &self,
520 block: Option<Block::Hash>,
521 storage_key: PrefixedStorageKey,
522 keys: Vec<StorageKey>,
523 ) -> std::result::Result<ReadProof<Block::Hash>, Error> {
524 self.block_or_best(block)
525 .and_then(|block| {
526 let child_info = match ChildType::from_prefixed_key(&storage_key) {
527 Some((ChildType::ParentKeyId, storage_key)) => {
528 ChildInfo::new_default(storage_key)
529 },
530 None => return Err(sp_blockchain::Error::InvalidChildStorageKey),
531 };
532 self.client
533 .read_child_proof(
534 block,
535 &child_info,
536 &mut keys.iter().map(|key| key.0.as_ref()),
537 )
538 .map(|proof| proof.into_iter_nodes().map(|node| node.into()).collect())
539 .map(|proof| ReadProof { at: block, proof })
540 })
541 .map_err(client_err)
542 }
543
544 fn storage_keys(
545 &self,
546 block: Option<Block::Hash>,
547 storage_key: PrefixedStorageKey,
548 prefix: StorageKey,
549 ) -> std::result::Result<Vec<StorageKey>, Error> {
550 self.block_or_best(block)
552 .and_then(|block| {
553 let child_info = match ChildType::from_prefixed_key(&storage_key) {
554 Some((ChildType::ParentKeyId, storage_key)) => {
555 ChildInfo::new_default(storage_key)
556 },
557 None => return Err(sp_blockchain::Error::InvalidChildStorageKey),
558 };
559 self.client.child_storage_keys(block, child_info, Some(&prefix), None)
560 })
561 .map(|iter| iter.collect())
562 .map_err(client_err)
563 }
564
565 fn storage_keys_paged(
566 &self,
567 block: Option<Block::Hash>,
568 storage_key: PrefixedStorageKey,
569 prefix: Option<StorageKey>,
570 count: u32,
571 start_key: Option<StorageKey>,
572 ) -> std::result::Result<Vec<StorageKey>, Error> {
573 self.block_or_best(block)
574 .and_then(|block| {
575 let child_info = match ChildType::from_prefixed_key(&storage_key) {
576 Some((ChildType::ParentKeyId, storage_key)) => {
577 ChildInfo::new_default(storage_key)
578 },
579 None => return Err(sp_blockchain::Error::InvalidChildStorageKey),
580 };
581 self.client.child_storage_keys(
582 block,
583 child_info,
584 prefix.as_ref(),
585 start_key.as_ref(),
586 )
587 })
588 .map(|iter| iter.take(count as usize).collect())
589 .map_err(client_err)
590 }
591
592 fn storage(
593 &self,
594 block: Option<Block::Hash>,
595 storage_key: PrefixedStorageKey,
596 key: StorageKey,
597 ) -> std::result::Result<Option<StorageData>, Error> {
598 self.block_or_best(block)
599 .and_then(|block| {
600 let child_info = match ChildType::from_prefixed_key(&storage_key) {
601 Some((ChildType::ParentKeyId, storage_key)) => {
602 ChildInfo::new_default(storage_key)
603 },
604 None => return Err(sp_blockchain::Error::InvalidChildStorageKey),
605 };
606 self.client.child_storage(block, &child_info, &key)
607 })
608 .map_err(client_err)
609 }
610
611 fn storage_entries(
612 &self,
613 block: Option<Block::Hash>,
614 storage_key: PrefixedStorageKey,
615 keys: Vec<StorageKey>,
616 ) -> std::result::Result<Vec<Option<StorageData>>, Error> {
617 let child_info = if let Some((ChildType::ParentKeyId, storage_key)) =
618 ChildType::from_prefixed_key(&storage_key)
619 {
620 Arc::new(ChildInfo::new_default(storage_key))
621 } else {
622 return Err(client_err(sp_blockchain::Error::InvalidChildStorageKey));
623 };
624 let block = self.block_or_best(block).map_err(client_err)?;
625 let client = self.client.clone();
626
627 keys.into_iter()
628 .map(move |key| {
629 client.clone().child_storage(block, &child_info, &key).map_err(client_err)
630 })
631 .collect()
632 }
633
634 fn storage_hash(
635 &self,
636 block: Option<Block::Hash>,
637 storage_key: PrefixedStorageKey,
638 key: StorageKey,
639 ) -> std::result::Result<Option<Block::Hash>, Error> {
640 self.block_or_best(block)
641 .and_then(|block| {
642 let child_info = match ChildType::from_prefixed_key(&storage_key) {
643 Some((ChildType::ParentKeyId, storage_key)) => {
644 ChildInfo::new_default(storage_key)
645 },
646 None => return Err(sp_blockchain::Error::InvalidChildStorageKey),
647 };
648 self.client.child_storage_hash(block, &child_info, &key)
649 })
650 .map_err(client_err)
651 }
652}
653
654fn invalid_block_range<B: BlockT>(
655 from: &CachedHeaderMetadata<B>,
656 to: &CachedHeaderMetadata<B>,
657 details: String,
658) -> Error {
659 let to_string = |h: &CachedHeaderMetadata<B>| format!("{} ({:?})", h.number, h.hash);
660
661 Error::InvalidBlockRange { from: to_string(from), to: to_string(to), details }
662}
663
664fn invalid_block<B: BlockT>(from: B::Hash, to: Option<B::Hash>, details: String) -> Error {
665 Error::InvalidBlockRange { from: format!("{:?}", from), to: format!("{:?}", to), details }
666}