fuel_core/query/balance/
asset_query.rs1use crate::graphql_api::database::ReadView;
2use fuel_core_services::stream::IntoBoxStream;
3use fuel_core_storage::{
4 Error as StorageError,
5 Result as StorageResult,
6 iter::IterDirection,
7};
8use fuel_core_types::{
9 entities::coins::{
10 CoinId,
11 CoinType,
12 },
13 fuel_tx::UtxoId,
14 fuel_types::{
15 Address,
16 AssetId,
17 Nonce,
18 },
19};
20use futures::{
21 Stream,
22 TryStreamExt,
23};
24use std::collections::HashSet;
25use tokio_stream::StreamExt;
26
27#[derive(Clone)]
29pub struct AssetSpendTarget {
30 pub id: AssetId,
31 pub target: u128,
32 pub max: u16,
33 pub allow_partial: bool,
34}
35
36impl AssetSpendTarget {
37 pub fn new(id: AssetId, target: u128, max: u16, allow_partial: bool) -> Self {
38 Self {
39 id,
40 target,
41 max,
42 allow_partial,
43 }
44 }
45}
46
47#[derive(Default, Clone)]
48pub struct Exclude {
49 pub coin_ids: HashSet<CoinId>,
50}
51
52impl Exclude {
53 pub fn new(ids: Vec<CoinId>) -> Self {
54 let mut instance = Self::default();
55
56 for id in ids.into_iter() {
57 instance.coin_ids.insert(id);
58 }
59
60 instance
61 }
62
63 pub fn exclude(&mut self, coin: CoinId) {
64 self.coin_ids.insert(coin);
65 }
66
67 pub fn contains_coin(&self, id: &UtxoId) -> bool {
68 self.coin_ids.contains(&CoinId::Utxo(*id))
69 }
70
71 pub fn contains_message(&self, id: &Nonce) -> bool {
72 self.coin_ids.contains(&CoinId::Message(*id))
73 }
74}
75
76#[derive(Clone)]
77pub struct AssetsQuery<'a> {
78 pub owner: &'a Address,
79 pub allowed_assets: Option<HashSet<&'a AssetId>>,
80 pub exclude: Option<&'a Exclude>,
81 pub database: &'a ReadView,
82 pub base_asset_id: &'a AssetId,
83}
84
85impl<'a> AssetsQuery<'a> {
86 pub fn new(
87 owner: &'a Address,
88 allowed_assets: Option<HashSet<&'a AssetId>>,
89 exclude: Option<&'a Exclude>,
90 database: &'a ReadView,
91 base_asset_id: &'a AssetId,
92 ) -> Self {
93 Self {
94 owner,
95 allowed_assets,
96 exclude,
97 database,
98 base_asset_id,
99 }
100 }
101
102 fn coins_iter(mut self) -> impl Stream<Item = StorageResult<CoinType>> + 'a {
103 let allowed_assets = self.allowed_assets.take();
104 let database = self.database;
105 let stream = self
106 .database
107 .owned_coins_ids(self.owner, None, IterDirection::Forward)
108 .map(|id| id.map(CoinId::from))
109 .filter(move |result| {
110 if let Ok(id) = result {
111 match self.exclude {
112 Some(exclude) => !exclude.coin_ids.contains(id),
113 _ => true,
114 }
115 } else {
116 true
117 }
118 })
119 .map(move |res| {
120 res.and_then(|id| {
121 let id = if let CoinId::Utxo(id) = id {
122 id
123 } else {
124 return Err(anyhow::anyhow!("The coin is not UTXO").into());
125 };
126 Ok(id)
127 })
128 });
129
130 futures::stream::StreamExt::chunks(stream, database.batch_size)
131 .map(|chunk| {
132 use itertools::Itertools;
133
134 let chunk = chunk.into_iter().try_collect::<_, Vec<_>, _>()?;
135 Ok::<_, StorageError>(chunk)
136 })
137 .try_filter_map(move |chunk| async move {
138 let chunk = database
139 .coins(chunk)
140 .await
141 .map(|result| result.map(CoinType::Coin));
142 Ok(Some(futures::stream::iter(chunk)))
143 })
144 .try_flatten()
145 .filter(move |result| {
146 if let Ok(CoinType::Coin(coin)) = result {
147 allowed_asset(&allowed_assets, &coin.asset_id)
148 } else {
149 true
150 }
151 })
152 }
153
154 fn messages_iter(
155 &self,
156 ) -> impl Stream<Item = StorageResult<CoinType>> + 'a + use<'a> {
157 let exclude = self.exclude;
158 let database = self.database;
159 let stream = self
160 .database
161 .owned_message_ids(self.owner, None, IterDirection::Forward)
162 .map(|id| id.map(CoinId::from))
163 .filter(move |result| {
164 if let Ok(id) = result {
165 if let Some(e) = exclude {
166 !e.coin_ids.contains(id)
167 } else {
168 true
169 }
170 } else {
171 true
172 }
173 })
174 .map(move |res| {
175 res.and_then(|id| {
176 let id = if let CoinId::Message(id) = id {
177 id
178 } else {
179 return Err(anyhow::anyhow!("The coin is not a message").into());
180 };
181 Ok(id)
182 })
183 });
184
185 futures::stream::StreamExt::chunks(stream, database.batch_size)
186 .map(|chunk| {
187 use itertools::Itertools;
188
189 let chunk = chunk.into_iter().try_collect::<_, Vec<_>, _>()?;
190 Ok(chunk)
191 })
192 .try_filter_map(move |chunk| async move {
193 let chunk = database.messages(chunk).await;
194 Ok::<_, StorageError>(Some(futures::stream::iter(chunk)))
195 })
196 .try_flatten()
197 .filter(|result| {
198 if let Ok(message) = result {
199 message.is_non_retryable_message()
200 } else {
201 true
202 }
203 })
204 .map(|result| {
205 result.map(|message| {
206 CoinType::MessageCoin(
207 message
208 .try_into()
209 .expect("The checked above that message data is empty."),
210 )
211 })
212 })
213 }
214
215 pub fn coins(self) -> impl Stream<Item = StorageResult<CoinType>> + 'a {
221 let has_base_asset = allowed_asset(&self.allowed_assets, self.base_asset_id);
222 if has_base_asset {
223 let message_iter = self.messages_iter();
224 self.coins_iter().chain(message_iter).into_boxed_ref()
225 } else {
226 self.coins_iter().into_boxed_ref()
227 }
228 }
229}
230
231#[derive(Clone)]
232pub struct AssetQuery<'a> {
233 pub owner: &'a Address,
234 pub asset: &'a AssetSpendTarget,
235 pub exclude: Option<&'a Exclude>,
236 pub database: &'a ReadView,
237 query: AssetsQuery<'a>,
238}
239
240impl<'a> AssetQuery<'a> {
241 pub fn new(
242 owner: &'a Address,
243 asset: &'a AssetSpendTarget,
244 base_asset_id: &'a AssetId,
245 exclude: Option<&'a Exclude>,
246 database: &'a ReadView,
247 ) -> Self {
248 let mut allowed = HashSet::new();
249 allowed.insert(&asset.id);
250 Self {
251 owner,
252 asset,
253 exclude,
254 database,
255 query: AssetsQuery::new(
256 owner,
257 Some(allowed),
258 exclude,
259 database,
260 base_asset_id,
261 ),
262 }
263 }
264
265 pub fn coins(self) -> impl Stream<Item = StorageResult<CoinType>> + 'a {
268 self.query.coins()
269 }
270}
271
272fn allowed_asset(allowed_assets: &Option<HashSet<&AssetId>>, asset_id: &AssetId) -> bool {
273 allowed_assets
274 .as_ref()
275 .map(|allowed_assets| allowed_assets.contains(asset_id))
276 .unwrap_or(true)
277}