fireblocks_sdk/paged_client/
paged_vault.rs1use {
2 crate::{
3 Client,
4 FireblocksError,
5 apis::vaults_api::GetPagedVaultAccountsParams,
6 models::VaultAccountsPagedResponse,
7 },
8 futures::{FutureExt, Stream, StreamExt, future::BoxFuture, stream::FuturesUnordered},
9 std::{
10 pin::Pin,
11 sync::Arc,
12 task::{Context, Poll},
13 },
14};
15
16type VaultResult = std::result::Result<VaultAccountsPagedResponse, crate::FireblocksError>;
17
18pub struct VaultStream {
19 client: Arc<Client>,
20 batch: u16,
21 after: String,
22 init: bool,
23 fut: FuturesUnordered<BoxFuture<'static, VaultResult>>,
24}
25
26impl VaultStream {
28 pub fn new(client: Arc<Client>, batch: u16) -> Self {
29 Self {
30 client,
31 batch,
32 init: false,
33 after: String::new(),
34 fut: FuturesUnordered::new(),
35 }
36 }
37
38 fn build_params(&self) -> GetPagedVaultAccountsParams {
39 GetPagedVaultAccountsParams::builder()
40 .after(self.after.clone())
41 .limit(self.batch.into())
42 .build()
43 }
44}
45
46impl Stream for VaultStream {
47 type Item = VaultResult;
48
49 #[allow(clippy::cognitive_complexity)]
50 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
51 if !self.init {
52 tracing::debug!("init future");
53 self.init = true;
54 let client = self.client.clone();
55 let params = self.build_params();
56 let fut = async move {
57 client
58 .vaults_api()
59 .get_paged_vault_accounts(params)
60 .await
61 .map_err(|e| FireblocksError::FetchVaultsPagedError(e.to_string()))
62 }
63 .boxed();
64 self.fut.push(fut);
65 cx.waker().wake_by_ref();
66 return Poll::Pending;
67 }
68
69 tracing::trace!("check future poll");
71 match self.fut.poll_next_unpin(cx) {
72 Poll::Ready(opt) => {
73 if let Some(result) = opt {
74 match result {
75 Ok(ref va) => match &va.paging {
76 None => self.after = String::new(),
77 Some(p) => self.after = p.after.clone().unwrap_or_default(),
78 },
79 Err(e) => {
80 return Poll::Ready(Some(Err(e)));
81 }
82 }
83 return Poll::Ready(Some(result));
84 }
85 }
86 Poll::Pending => {
87 tracing::trace!("still pending");
88 cx.waker().wake_by_ref();
89 return Poll::Pending;
90 }
91 }
92
93 tracing::trace!("checking after {:#?}", self.after);
94 if self.after.is_empty() {
96 return Poll::Ready(None);
97 }
98
99 let client = self.client.clone();
100 let params = self.build_params();
101 let fut = async move {
102 client
103 .vaults_api()
104 .get_paged_vault_accounts(params)
105 .await
106 .map_err(|e| FireblocksError::FetchVaultsPagedError(e.to_string()))
107 }
108 .boxed();
109 self.fut.push(fut);
110 cx.waker().wake_by_ref();
111 Poll::Pending
112 }
113}