fireblocks_sdk/paged_client/
paged_vault.rs

1use {
2    crate::{
3        Client,
4        FireblocksError,
5        apis::vaults_api::GetPagedVaultAccountsParams,
6        models::VaultAccountsPagedResponse,
7    },
8    futures::{FutureExt, Stream, StreamExt, future::BoxFuture, stream::FuturesUnordered},
9    std::{
10        pin::Pin,
11        sync::Arc,
12        task::{Context, Poll},
13    },
14};
15
16type VaultResult = std::result::Result<VaultAccountsPagedResponse, crate::FireblocksError>;
17
18pub struct VaultStream {
19    client: Arc<Client>,
20    batch: u16,
21    after: String,
22    init: bool,
23    fut: FuturesUnordered<BoxFuture<'static, VaultResult>>,
24}
25
26/// Stream all vault accounts in batches
27impl VaultStream {
28    pub fn new(client: Arc<Client>, batch: u16) -> Self {
29        Self {
30            client,
31            batch,
32            init: false,
33            after: String::new(),
34            fut: FuturesUnordered::new(),
35        }
36    }
37
38    fn build_params(&self) -> GetPagedVaultAccountsParams {
39        GetPagedVaultAccountsParams::builder()
40            .after(self.after.clone())
41            .limit(self.batch.into())
42            .build()
43    }
44}
45
46impl Stream for VaultStream {
47    type Item = VaultResult;
48
49    #[allow(clippy::cognitive_complexity)]
50    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
51        if !self.init {
52            tracing::debug!("init future");
53            self.init = true;
54            let client = self.client.clone();
55            let params = self.build_params();
56            let fut = async move {
57                client
58                    .vaults_api()
59                    .get_paged_vault_accounts(params)
60                    .await
61                    .map_err(|e| FireblocksError::FetchVaultsPagedError(e.to_string()))
62            }
63            .boxed();
64            self.fut.push(fut);
65            cx.waker().wake_by_ref();
66            return Poll::Pending;
67        }
68
69        // Try to resolve any existing futures first
70        tracing::trace!("check future poll");
71        match self.fut.poll_next_unpin(cx) {
72            Poll::Ready(opt) => {
73                if let Some(result) = opt {
74                    match result {
75                        Ok(ref va) => match &va.paging {
76                            None => self.after = String::new(),
77                            Some(p) => self.after = p.after.clone().unwrap_or_default(),
78                        },
79                        Err(e) => {
80                            return Poll::Ready(Some(Err(e)));
81                        }
82                    }
83                    return Poll::Ready(Some(result));
84                }
85            }
86            Poll::Pending => {
87                tracing::trace!("still pending");
88                cx.waker().wake_by_ref();
89                return Poll::Pending;
90            }
91        }
92
93        tracing::trace!("checking after {:#?}", self.after);
94        // If there are no more pages to fetch and no pending futures, end the stream
95        if self.after.is_empty() {
96            return Poll::Ready(None);
97        }
98
99        let client = self.client.clone();
100        let params = self.build_params();
101        let fut = async move {
102            client
103                .vaults_api()
104                .get_paged_vault_accounts(params)
105                .await
106                .map_err(|e| FireblocksError::FetchVaultsPagedError(e.to_string()))
107        }
108        .boxed();
109        self.fut.push(fut);
110        cx.waker().wake_by_ref();
111        Poll::Pending
112    }
113}