1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
use core::marker::PhantomData;

use async_stream::try_stream;
use futures_core::stream::Stream;

#[cfg(not(feature = "std"))]
use alloc::format;
use codec::Decode;
use sp_std::prelude::*;

use crate::*;

/// Query chain map/double map storage with a common prefix.
///
/// The `state_getKeysPaged` API is used to get batches of keys.
///
/// `K` is the type decoded from each storage key and `V` is the type of the
/// stored values; neither is stored in the struct itself.
pub struct StoragePaged<K, V> {
  /// Client used for every key page and value request.
  client: Client,
  /// Prefix that every returned storage key is expected to start with.
  prefix: StorageKey,
  /// Number of bytes to skip after `prefix` before the encoded key starts.
  /// `None` means the key can't be decoded from the storage key.
  key_hash_len: Option<usize>,
  /// Optional block hash passed along with every request.
  at: Option<BlockHash>,
  /// Number of keys requested per page.
  batch_size: usize,
  /// Last key of the previous page; passed as the start key of the next request.
  start_key: Option<StorageKey>,
  /// Set once the last page has been fetched.
  finished: bool,
  /// Marker tying the struct to the `K`/`V` types without storing them.
  _phantom: PhantomData<(K, V)>,
}

impl<K: Decode, V: Decode> StoragePaged<K, V> {
  pub fn new(
    client: &Client,
    prefix: StorageKey,
    key_hash_len: Option<usize>,
    at: Option<BlockHash>,
  ) -> Self {
    Self {
      client: client.clone(),
      prefix,
      key_hash_len,
      at,
      batch_size: 10,
      start_key: None,
      finished: false,
      _phantom: PhantomData::default(),
    }
  }

  /// Change the `batch_size` (default is 10).
  pub fn batch_size(mut self, batch_size: usize) -> Self {
    self.batch_size = batch_size;
    self
  }

  fn get_hashed_key<'a>(&self, key: &'a StorageKey) -> Result<&'a [u8]> {
    let h_len = match self.key_hash_len {
      Some(l) => l,
      None => {
        return Err(Error::DecodeTypeFailed(format!(
          "Failed to decode storage key: hasher isn't reversible"
        )));
      }
    };
    let p_len = self.prefix.0.len();
    if key.0.len() < (p_len + h_len) {
      return Err(Error::DecodeTypeFailed(format!(
        "Failed to decode storage key: too short"
      )));
    }
    let (key_prefix, key) = key.0.split_at(p_len);
    if key_prefix != self.prefix.0.as_slice() {
      return Err(Error::DecodeTypeFailed(format!(
        "Invalid storage key, the prefix doesn't match"
      )));
    }
    Ok(&key[h_len..])
  }

  async fn next_page(&mut self) -> Result<Option<Vec<StorageKey>>> {
    if self.finished {
      return Ok(None);
    }
    let keys = self
      .client
      .get_storage_keys_paged(
        &self.prefix,
        self.batch_size,
        self.start_key.as_ref(),
        self.at,
      )
      .await?;
    if keys.len() < self.batch_size {
      self.finished = true;
    } else {
      self.start_key = keys.last().cloned();
    }
    Ok(Some(keys))
  }

  /// Async stream to get key/value pairs.
  pub fn entries(mut self) -> impl Stream<Item = Result<(K, Option<V>)>> {
    try_stream! {
      while let Some(keys) = self.next_page().await? {
        for storage_key in keys {
          // Decode key.
          let mut data = self.get_hashed_key(&storage_key)?;
          let key = K::decode(&mut data)?;
          // Get value from chain storage.
          let value = self.client.get_storage_by_key(storage_key, self.at).await?;
          yield (key, value);
        }
      }
    }
  }

  /// Async stream to get only keys.
  pub fn keys(mut self) -> impl Stream<Item = Result<K>> {
    try_stream! {
      while let Some(keys) = self.next_page().await? {
        for key in keys {
          let mut data = self.get_hashed_key(&key)?;
          yield K::decode(&mut data)?;
        }
      }
    }
  }

  /// Async stream to get only values.
  pub fn values(mut self) -> impl Stream<Item = Result<Option<V>>> {
    try_stream! {
      while let Some(keys) = self.next_page().await? {
        for key in keys {
          let value = self.client.get_storage_by_key(key, self.at).await?;
          yield value;
        }
      }
    }
  }
}