fuel_core/query/balance/
asset_query.rs

1use crate::graphql_api::database::ReadView;
2use fuel_core_services::stream::IntoBoxStream;
3use fuel_core_storage::{
4    Error as StorageError,
5    Result as StorageResult,
6    iter::IterDirection,
7};
8use fuel_core_types::{
9    entities::coins::{
10        CoinId,
11        CoinType,
12    },
13    fuel_tx::UtxoId,
14    fuel_types::{
15        Address,
16        AssetId,
17        Nonce,
18    },
19};
20use futures::{
21    Stream,
22    TryStreamExt,
23};
24use std::collections::HashSet;
25use tokio_stream::StreamExt;
26
27/// At least required `target` of the query per asset's `id` with `max` coins.
28#[derive(Clone)]
29pub struct AssetSpendTarget {
30    pub id: AssetId,
31    pub target: u128,
32    pub max: u16,
33    pub allow_partial: bool,
34}
35
36impl AssetSpendTarget {
37    pub fn new(id: AssetId, target: u128, max: u16, allow_partial: bool) -> Self {
38        Self {
39            id,
40            target,
41            max,
42            allow_partial,
43        }
44    }
45}
46
47#[derive(Default, Clone)]
48pub struct Exclude {
49    pub coin_ids: HashSet<CoinId>,
50}
51
52impl Exclude {
53    pub fn new(ids: Vec<CoinId>) -> Self {
54        let mut instance = Self::default();
55
56        for id in ids.into_iter() {
57            instance.coin_ids.insert(id);
58        }
59
60        instance
61    }
62
63    pub fn exclude(&mut self, coin: CoinId) {
64        self.coin_ids.insert(coin);
65    }
66
67    pub fn contains_coin(&self, id: &UtxoId) -> bool {
68        self.coin_ids.contains(&CoinId::Utxo(*id))
69    }
70
71    pub fn contains_message(&self, id: &Nonce) -> bool {
72        self.coin_ids.contains(&CoinId::Message(*id))
73    }
74}
75
76#[derive(Clone)]
77pub struct AssetsQuery<'a> {
78    pub owner: &'a Address,
79    pub allowed_assets: Option<HashSet<&'a AssetId>>,
80    pub exclude: Option<&'a Exclude>,
81    pub database: &'a ReadView,
82    pub base_asset_id: &'a AssetId,
83}
84
85impl<'a> AssetsQuery<'a> {
86    pub fn new(
87        owner: &'a Address,
88        allowed_assets: Option<HashSet<&'a AssetId>>,
89        exclude: Option<&'a Exclude>,
90        database: &'a ReadView,
91        base_asset_id: &'a AssetId,
92    ) -> Self {
93        Self {
94            owner,
95            allowed_assets,
96            exclude,
97            database,
98            base_asset_id,
99        }
100    }
101
102    fn coins_iter(mut self) -> impl Stream<Item = StorageResult<CoinType>> + 'a {
103        let allowed_assets = self.allowed_assets.take();
104        let database = self.database;
105        let stream = self
106            .database
107            .owned_coins_ids(self.owner, None, IterDirection::Forward)
108            .map(|id| id.map(CoinId::from))
109            .filter(move |result| {
110                if let Ok(id) = result {
111                    match self.exclude {
112                        Some(exclude) => !exclude.coin_ids.contains(id),
113                        _ => true,
114                    }
115                } else {
116                    true
117                }
118            })
119            .map(move |res| {
120                res.and_then(|id| {
121                    let id = if let CoinId::Utxo(id) = id {
122                        id
123                    } else {
124                        return Err(anyhow::anyhow!("The coin is not UTXO").into());
125                    };
126                    Ok(id)
127                })
128            });
129
130        futures::stream::StreamExt::chunks(stream, database.batch_size)
131            .map(|chunk| {
132                use itertools::Itertools;
133
134                let chunk = chunk.into_iter().try_collect::<_, Vec<_>, _>()?;
135                Ok::<_, StorageError>(chunk)
136            })
137            .try_filter_map(move |chunk| async move {
138                let chunk = database
139                    .coins(chunk)
140                    .await
141                    .map(|result| result.map(CoinType::Coin));
142                Ok(Some(futures::stream::iter(chunk)))
143            })
144            .try_flatten()
145            .filter(move |result| {
146                if let Ok(CoinType::Coin(coin)) = result {
147                    allowed_asset(&allowed_assets, &coin.asset_id)
148                } else {
149                    true
150                }
151            })
152    }
153
154    fn messages_iter(
155        &self,
156    ) -> impl Stream<Item = StorageResult<CoinType>> + 'a + use<'a> {
157        let exclude = self.exclude;
158        let database = self.database;
159        let stream = self
160            .database
161            .owned_message_ids(self.owner, None, IterDirection::Forward)
162            .map(|id| id.map(CoinId::from))
163            .filter(move |result| {
164                if let Ok(id) = result {
165                    if let Some(e) = exclude {
166                        !e.coin_ids.contains(id)
167                    } else {
168                        true
169                    }
170                } else {
171                    true
172                }
173            })
174            .map(move |res| {
175                res.and_then(|id| {
176                    let id = if let CoinId::Message(id) = id {
177                        id
178                    } else {
179                        return Err(anyhow::anyhow!("The coin is not a message").into());
180                    };
181                    Ok(id)
182                })
183            });
184
185        futures::stream::StreamExt::chunks(stream, database.batch_size)
186            .map(|chunk| {
187                use itertools::Itertools;
188
189                let chunk = chunk.into_iter().try_collect::<_, Vec<_>, _>()?;
190                Ok(chunk)
191            })
192            .try_filter_map(move |chunk| async move {
193                let chunk = database.messages(chunk).await;
194                Ok::<_, StorageError>(Some(futures::stream::iter(chunk)))
195            })
196            .try_flatten()
197            .filter(|result| {
198                if let Ok(message) = result {
199                    message.is_non_retryable_message()
200                } else {
201                    true
202                }
203            })
204            .map(|result| {
205                result.map(|message| {
206                    CoinType::MessageCoin(
207                        message
208                            .try_into()
209                            .expect("The checked above that message data is empty."),
210                    )
211                })
212            })
213    }
214
215    /// Returns the iterator over all valid(spendable, allowed by `exclude`) coins of the `owner`.
216    ///
217    /// # Note: The coins of different type are not grouped by the `asset_id`.
218    // TODO: Optimize this by creating an index
219    //  https://github.com/FuelLabs/fuel-core/issues/588
220    pub fn coins(self) -> impl Stream<Item = StorageResult<CoinType>> + 'a {
221        let has_base_asset = allowed_asset(&self.allowed_assets, self.base_asset_id);
222        if has_base_asset {
223            let message_iter = self.messages_iter();
224            self.coins_iter().chain(message_iter).into_boxed_ref()
225        } else {
226            self.coins_iter().into_boxed_ref()
227        }
228    }
229}
230
231#[derive(Clone)]
232pub struct AssetQuery<'a> {
233    pub owner: &'a Address,
234    pub asset: &'a AssetSpendTarget,
235    pub exclude: Option<&'a Exclude>,
236    pub database: &'a ReadView,
237    query: AssetsQuery<'a>,
238}
239
240impl<'a> AssetQuery<'a> {
241    pub fn new(
242        owner: &'a Address,
243        asset: &'a AssetSpendTarget,
244        base_asset_id: &'a AssetId,
245        exclude: Option<&'a Exclude>,
246        database: &'a ReadView,
247    ) -> Self {
248        let mut allowed = HashSet::new();
249        allowed.insert(&asset.id);
250        Self {
251            owner,
252            asset,
253            exclude,
254            database,
255            query: AssetsQuery::new(
256                owner,
257                Some(allowed),
258                exclude,
259                database,
260                base_asset_id,
261            ),
262        }
263    }
264
265    /// Returns the iterator over all valid(spendable, allowed by `exclude`) coins of the `owner`
266    /// for the `asset_id`.
267    pub fn coins(self) -> impl Stream<Item = StorageResult<CoinType>> + 'a {
268        self.query.coins()
269    }
270}
271
272fn allowed_asset(allowed_assets: &Option<HashSet<&AssetId>>, asset_id: &AssetId) -> bool {
273    allowed_assets
274        .as_ref()
275        .map(|allowed_assets| allowed_assets.contains(asset_id))
276        .unwrap_or(true)
277}