use {
crate::{
Client,
FireblocksError,
apis::vaults_api::GetPagedVaultAccountsParams,
models::VaultAccountsPagedResponse,
},
futures::{FutureExt, Stream, StreamExt, future::BoxFuture, stream::FuturesUnordered},
std::{
pin::Pin,
sync::Arc,
task::{Context, Poll},
},
};
type VaultResult = std::result::Result<VaultAccountsPagedResponse, crate::FireblocksError>;
pub struct VaultStream {
client: Arc<Client>,
batch: u16,
after: String,
init: bool,
fut: FuturesUnordered<BoxFuture<'static, VaultResult>>,
}
impl VaultStream {
pub fn new(client: Arc<Client>, batch: u16) -> Self {
Self {
client,
batch,
init: false,
after: String::new(),
fut: FuturesUnordered::new(),
}
}
fn build_params(&self) -> GetPagedVaultAccountsParams {
GetPagedVaultAccountsParams::builder()
.after(self.after.clone())
.limit(self.batch.into())
.build()
}
}
impl Stream for VaultStream {
type Item = VaultResult;
#[allow(clippy::cognitive_complexity)]
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
if !self.init {
tracing::debug!("init future");
self.init = true;
let client = self.client.clone();
let params = self.build_params();
let fut = async move {
client
.vaults_api()
.get_paged_vault_accounts(params)
.await
.map_err(|e| FireblocksError::FetchVaultsPagedError(e.to_string()))
}
.boxed();
self.fut.push(fut);
cx.waker().wake_by_ref();
return Poll::Pending;
}
tracing::trace!("check future poll");
match self.fut.poll_next_unpin(cx) {
Poll::Ready(opt) => {
if let Some(result) = opt {
match result {
Ok(ref va) => match &va.paging {
None => self.after = String::new(),
Some(p) => self.after = p.after.clone().unwrap_or_default(),
},
Err(e) => {
return Poll::Ready(Some(Err(e)));
}
}
return Poll::Ready(Some(result));
}
}
Poll::Pending => {
tracing::trace!("still pending");
cx.waker().wake_by_ref();
return Poll::Pending;
}
}
tracing::trace!("checking after {:#?}", self.after);
if self.after.is_empty() {
return Poll::Ready(None);
}
let client = self.client.clone();
let params = self.build_params();
let fut = async move {
client
.vaults_api()
.get_paged_vault_accounts(params)
.await
.map_err(|e| FireblocksError::FetchVaultsPagedError(e.to_string()))
}
.boxed();
self.fut.push(fut);
cx.waker().wake_by_ref();
Poll::Pending
}
}