fireblocks_sdk/paged_client/
paged_transaction.rs1use {
2 crate::{
3 Client,
4 Epoch,
5 FireblocksError,
6 apis::{
7 ResponseContent,
8 transactions_api::{GetTransactionsError, GetTransactionsParams},
9 },
10 models::{self, ErrorSchema, TransactionResponse},
11 },
12 chrono::{TimeZone, Utc, offset::LocalResult},
13 futures::{FutureExt, Stream, StreamExt, future::BoxFuture, stream::FuturesUnordered},
14 std::{
15 pin::Pin,
16 sync::Arc,
17 task::{Context, Poll},
18 },
19};
20
21type TransactionResults = crate::Result<Vec<TransactionResponse>>;
22const fn epoch(ts: &Epoch) -> i64 {
23 ts.timestamp_millis()
24}
25
26pub struct TransactionStream {
28 client: Arc<Client>,
29 batch: u16,
30 init: bool, vault_id: i32,
32 after: Epoch,
33 is_source: bool, fut: FuturesUnordered<BoxFuture<'static, TransactionResults>>,
35}
36
37impl TransactionStream {
38 pub fn from_source(client: Arc<Client>, batch: u16, vault_id: i32, after: Epoch) -> Self {
39 Self {
40 client,
41 batch,
42 init: false,
43 vault_id,
44 after,
45 fut: FuturesUnordered::new(),
46 is_source: true,
47 }
48 }
49
50 pub fn from_dest(client: Arc<Client>, batch: u16, vault_id: i32, after: Epoch) -> Self {
51 Self {
52 client,
53 batch,
54 init: false,
55 vault_id,
56 after,
57 fut: FuturesUnordered::new(),
58 is_source: false,
59 }
60 }
61
62 fn build_params(&self) -> GetTransactionsParams {
63 let builder = GetTransactionsParams::builder()
64 .limit(self.batch.into())
65 .order_by("createdAt".to_owned())
66 .after(epoch(&self.after).to_string())
67 .status(models::TransactionStatus::Completed)
68 .sort("ASC".to_owned());
69 if self.is_source {
70 return builder.source_id(self.vault_id.to_string()).build();
71 }
72 builder.dest_id(self.vault_id.to_string()).build()
73 }
74}
75
76impl Stream for TransactionStream {
77 type Item = TransactionResults;
78
79 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
80 if !self.init {
81 tracing::debug!("init tracing stream");
82 self.init = true;
83 let client = self.client.clone();
84 let params = self.build_params();
85 let fut = async move {
86 client
87 .transactions_api()
88 .get_transactions(params)
89 .await
90 .map_err(FireblocksError::FetchTransactionsError)
91 }
92 .boxed();
93 self.fut.push(fut);
94 cx.waker().wake_by_ref();
95 return Poll::Pending;
96 }
97
98 match self.fut.poll_next_unpin(cx) {
99 Poll::Ready(opt) => {
100 if let Some(result) = opt {
101 match result {
102 Ok(ref va) => {
103 if va.is_empty() {
104 return Poll::Ready(None);
105 }
106 if let Some(last) = va.last() {
107 tracing::trace!(
108 "1st after {:#?} last after {:#?}",
109 va[0].created_at,
110 last.created_at
111 );
112 if let Some(millis) = last.created_at {
113 #[allow(clippy::cast_possible_truncation)]
114 let LocalResult::Single(ts) =
115 Utc.timestamp_millis_opt((millis as i64) + 1)
116 else {
117 let entity: GetTransactionsError =
118 GetTransactionsError::DefaultResponse(ErrorSchema {
119 message: Some(format!(
120 "invalid timestamp {millis}"
121 )),
122 code: Some(-1.0),
123 });
124 let e =
125 crate::apis::Error::ResponseError(ResponseContent {
126 status: reqwest::StatusCode::GONE,
127 content: String::new(),
128 entity: Some(entity),
129 });
130 return Poll::Ready(Some(Err(
131 FireblocksError::FetchTransactionsError(e),
132 )));
133 };
134 self.after = ts;
135 }
138 }
139 }
140 Err(e) => {
141 return Poll::Ready(Some(Err(e)));
142 }
143 }
144 return Poll::Ready(Some(result));
145 }
146 }
147 Poll::Pending => {
148 cx.waker().wake_by_ref();
149 return Poll::Pending;
150 }
151 }
152
153 let client = self.client.clone();
154 let params = self.build_params();
155 let fut = async move {
156 client
157 .transactions_api()
158 .get_transactions(params)
159 .await
160 .map_err(FireblocksError::FetchTransactionsError)
161 }
162 .boxed();
163 self.fut.push(fut);
164 cx.waker().wake_by_ref();
165 Poll::Pending
166 }
167}