polymesh_api_client/storage.rs

use core::marker::PhantomData;

use async_stream::try_stream;
use futures_core::stream::Stream;

#[cfg(not(feature = "std"))]
use alloc::format;
use codec::Decode;
use sp_std::prelude::*;

use crate::*;

/// Query chain map/double map storage with a common prefix.
///
/// The `state_getKeysPaged` API is used to get batches of keys.
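///
/// # Example
///
/// A minimal usage sketch (not compiled as a doc-test). `client`, `prefix`,
/// `key_hash_len` and the `AccountId`/`Balance` type parameters are
/// placeholders for values obtained from an established connection and the
/// target storage map's metadata:
///
/// ```ignore
/// let paged = StoragePaged::<AccountId, Balance>::new(&client, prefix, key_hash_len, None)
///   .batch_size(100);
/// // `paged.entries()`, `paged.keys()` and `paged.values()` return async streams.
/// ```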
pub struct StoragePaged<K, V> {
  client: Client,
  prefix: StorageKey,
  /// Byte length of the hasher output that precedes the encoded key,
  /// or `None` if the hasher isn't reversible.
  key_hash_len: Option<usize>,
  at: Option<BlockHash>,
  batch_size: usize,
  /// Paging cursor: the last key returned by the previous page.
  start_key: Option<StorageKey>,
  /// Set once a page returns fewer keys than `batch_size`.
  finished: bool,
  _phantom: PhantomData<(K, V)>,
}

impl<K: Decode, V: Decode> StoragePaged<K, V> {
  /// Create a paged query for the map/double map under `prefix`,
  /// optionally pinned to block `at`.
  pub fn new(
    client: &Client,
    prefix: StorageKey,
    key_hash_len: Option<usize>,
    at: Option<BlockHash>,
  ) -> Self {
    Self {
      client: client.clone(),
      prefix,
      key_hash_len,
      at,
      batch_size: 10,
      start_key: None,
      finished: false,
      _phantom: PhantomData,
    }
  }

  /// Change the `batch_size` (default is 10).
  pub fn batch_size(mut self, batch_size: usize) -> Self {
    self.batch_size = batch_size;
    self
  }

  /// Strip the storage prefix and hasher output from a full storage key,
  /// returning the SCALE-encoded key bytes.
  fn get_hashed_key<'a>(&self, key: &'a StorageKey) -> Result<&'a [u8]> {
    let h_len = match self.key_hash_len {
      Some(l) => l,
      None => {
        return Err(Error::DecodeTypeFailed(format!(
          "Failed to decode storage key: hasher isn't reversible"
        )));
      }
    };
    let p_len = self.prefix.0.len();
    if key.0.len() < (p_len + h_len) {
      return Err(Error::DecodeTypeFailed(format!(
        "Failed to decode storage key: too short"
      )));
    }
    let (key_prefix, key) = key.0.split_at(p_len);
    if key_prefix != self.prefix.0.as_slice() {
      return Err(Error::DecodeTypeFailed(format!(
        "Invalid storage key, the prefix doesn't match"
      )));
    }
    Ok(&key[h_len..])
  }

  /// Fetch the next batch of storage keys, advancing the paging cursor.
  /// Returns `Ok(None)` once all pages have been fetched.
  async fn next_page(&mut self) -> Result<Option<Vec<StorageKey>>> {
    if self.finished {
      return Ok(None);
    }
    let keys = self
      .client
      .get_storage_keys_paged(
        &self.prefix,
        self.batch_size,
        self.start_key.as_ref(),
        self.at,
      )
      .await?;
    if keys.len() < self.batch_size {
      // A short page means there are no more keys after this batch.
      self.finished = true;
    } else {
      self.start_key = keys.last().cloned();
    }
    Ok(Some(keys))
  }

  /// Async stream to get key/value pairs.
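  ///
  /// # Example
  ///
  /// A sketch of consuming the stream (not compiled as a doc-test); it assumes
  /// the `futures` crate is available for `pin_mut!`/`StreamExt::next`, and a
  /// `paged` value built with [`StoragePaged::new`]:
  ///
  /// ```ignore
  /// use futures::{pin_mut, StreamExt};
  ///
  /// let entries = paged.entries();
  /// pin_mut!(entries);
  /// while let Some(entry) = entries.next().await {
  ///   let (key, value) = entry?;
  ///   // `value` is `None` when the key has no stored value.
  /// }
  /// ```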
  pub fn entries(mut self) -> impl Stream<Item = Result<(K, Option<V>)>> {
    try_stream! {
      while let Some(keys) = self.next_page().await? {
        for storage_key in keys {
          // Decode key.
          let mut data = self.get_hashed_key(&storage_key)?;
          let key = K::decode(&mut data)?;
          // Get value from chain storage.
          let value = self.client.get_storage_by_key(storage_key, self.at).await?;
          yield (key, value);
        }
      }
    }
  }

  /// Async stream to get only keys.
  pub fn keys(mut self) -> impl Stream<Item = Result<K>> {
    try_stream! {
      while let Some(keys) = self.next_page().await? {
        for key in keys {
          let mut data = self.get_hashed_key(&key)?;
          yield K::decode(&mut data)?;
        }
      }
    }
  }

  /// Async stream to get only values.
  pub fn values(mut self) -> impl Stream<Item = Result<Option<V>>> {
    try_stream! {
      while let Some(keys) = self.next_page().await? {
        for key in keys {
          let value = self.client.get_storage_by_key(key, self.at).await?;
          yield value;
        }
      }
    }
  }
}