polymesh_api_client/
storage.rs1use core::marker::PhantomData;
2
3use async_stream::try_stream;
4use futures_core::stream::Stream;
5
6#[cfg(not(feature = "std"))]
7use alloc::format;
8use codec::Decode;
9use sp_std::prelude::*;
10
11use crate::*;
12
13pub struct StoragePaged<K, V> {
17 client: Client,
18 prefix: StorageKey,
19 key_hash_len: Option<usize>,
20 at: Option<BlockHash>,
21 batch_size: usize,
22 start_key: Option<StorageKey>,
23 finished: bool,
24 _phantom: PhantomData<(K, V)>,
25}
26
27impl<K: Decode, V: Decode> StoragePaged<K, V> {
28 pub fn new(
29 client: &Client,
30 prefix: StorageKey,
31 key_hash_len: Option<usize>,
32 at: Option<BlockHash>,
33 ) -> Self {
34 Self {
35 client: client.clone(),
36 prefix,
37 key_hash_len,
38 at,
39 batch_size: 10,
40 start_key: None,
41 finished: false,
42 _phantom: PhantomData::default(),
43 }
44 }
45
46 pub fn batch_size(mut self, batch_size: usize) -> Self {
48 self.batch_size = batch_size;
49 self
50 }
51
52 fn get_hashed_key<'a>(&self, key: &'a StorageKey) -> Result<&'a [u8]> {
53 let h_len = match self.key_hash_len {
54 Some(l) => l,
55 None => {
56 return Err(Error::DecodeTypeFailed(format!(
57 "Failed to decode storage key: hasher isn't reversible"
58 )));
59 }
60 };
61 let p_len = self.prefix.0.len();
62 if key.0.len() < (p_len + h_len) {
63 return Err(Error::DecodeTypeFailed(format!(
64 "Failed to decode storage key: too short"
65 )));
66 }
67 let (key_prefix, key) = key.0.split_at(p_len);
68 if key_prefix != self.prefix.0.as_slice() {
69 return Err(Error::DecodeTypeFailed(format!(
70 "Invalid storage key, the prefix doesn't match"
71 )));
72 }
73 Ok(&key[h_len..])
74 }
75
76 async fn next_page(&mut self) -> Result<Option<Vec<StorageKey>>> {
77 if self.finished {
78 return Ok(None);
79 }
80 let keys = self
81 .client
82 .get_storage_keys_paged(
83 &self.prefix,
84 self.batch_size,
85 self.start_key.as_ref(),
86 self.at,
87 )
88 .await?;
89 if keys.len() < self.batch_size {
90 self.finished = true;
91 } else {
92 self.start_key = keys.last().cloned();
93 }
94 Ok(Some(keys))
95 }
96
97 pub fn entries(mut self) -> impl Stream<Item = Result<(K, Option<V>)>> {
99 try_stream! {
100 while let Some(keys) = self.next_page().await? {
101 for storage_key in keys {
102 let mut data = self.get_hashed_key(&storage_key)?;
104 let key = K::decode(&mut data)?;
105 let value = self.client.get_storage_by_key(storage_key, self.at).await?;
107 yield (key, value);
108 }
109 }
110 }
111 }
112
113 pub fn keys(mut self) -> impl Stream<Item = Result<K>> {
115 try_stream! {
116 while let Some(keys) = self.next_page().await? {
117 for key in keys {
118 let mut data = self.get_hashed_key(&key)?;
119 yield K::decode(&mut data)?;
120 }
121 }
122 }
123 }
124
125 pub fn values(mut self) -> impl Stream<Item = Result<Option<V>>> {
127 try_stream! {
128 while let Some(keys) = self.next_page().await? {
129 for key in keys {
130 let value = self.client.get_storage_by_key(key, self.at).await?;
131 yield value;
132 }
133 }
134 }
135 }
136}