fireblocks_sdk/paged_client/
paged_transaction.rs

1use {
2    crate::{
3        Client,
4        Epoch,
5        FireblocksError,
6        apis::{
7            ResponseContent,
8            transactions_api::{GetTransactionsError, GetTransactionsParams},
9        },
10        models::{self, ErrorSchema, TransactionResponse},
11    },
12    chrono::{TimeZone, Utc, offset::LocalResult},
13    futures::{FutureExt, Stream, StreamExt, future::BoxFuture, stream::FuturesUnordered},
14    std::{
15        pin::Pin,
16        sync::Arc,
17        task::{Context, Poll},
18    },
19};
20
21type TransactionResults = crate::Result<Vec<TransactionResponse>>;
22const fn epoch(ts: &Epoch) -> i64 {
23    ts.timestamp_millis()
24}
25
26/// Stream transactions from a vault account
27pub struct TransactionStream {
28    client: Arc<Client>,
29    batch: u16,
30    init: bool, // has the stream started?
31    vault_id: i32,
32    after: Epoch,
33    is_source: bool, // are we streaming from source vault account or destination
34    fut: FuturesUnordered<BoxFuture<'static, TransactionResults>>,
35}
36
37impl TransactionStream {
38    pub fn from_source(client: Arc<Client>, batch: u16, vault_id: i32, after: Epoch) -> Self {
39        Self {
40            client,
41            batch,
42            init: false,
43            vault_id,
44            after,
45            fut: FuturesUnordered::new(),
46            is_source: true,
47        }
48    }
49
50    pub fn from_dest(client: Arc<Client>, batch: u16, vault_id: i32, after: Epoch) -> Self {
51        Self {
52            client,
53            batch,
54            init: false,
55            vault_id,
56            after,
57            fut: FuturesUnordered::new(),
58            is_source: false,
59        }
60    }
61
62    fn build_params(&self) -> GetTransactionsParams {
63        let builder = GetTransactionsParams::builder()
64            .limit(self.batch.into())
65            .order_by("createdAt".to_owned())
66            .after(epoch(&self.after).to_string())
67            .status(models::TransactionStatus::Completed)
68            .sort("ASC".to_owned());
69        if self.is_source {
70            return builder.source_id(self.vault_id.to_string()).build();
71        }
72        builder.dest_id(self.vault_id.to_string()).build()
73    }
74}
75
76impl Stream for TransactionStream {
77    type Item = TransactionResults;
78
79    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
80        if !self.init {
81            tracing::debug!("init tracing stream");
82            self.init = true;
83            let client = self.client.clone();
84            let params = self.build_params();
85            let fut = async move {
86                client
87                    .transactions_api()
88                    .get_transactions(params)
89                    .await
90                    .map_err(FireblocksError::FetchTransactionsError)
91            }
92            .boxed();
93            self.fut.push(fut);
94            cx.waker().wake_by_ref();
95            return Poll::Pending;
96        }
97
98        match self.fut.poll_next_unpin(cx) {
99            Poll::Ready(opt) => {
100                if let Some(result) = opt {
101                    match result {
102                        Ok(ref va) => {
103                            if va.is_empty() {
104                                return Poll::Ready(None);
105                            }
106                            if let Some(last) = va.last() {
107                                tracing::trace!(
108                                    "1st after {:#?} last after {:#?}",
109                                    va[0].created_at,
110                                    last.created_at
111                                );
112                                if let Some(millis) = last.created_at {
113                                    #[allow(clippy::cast_possible_truncation)]
114                                    let LocalResult::Single(ts) =
115                                        Utc.timestamp_millis_opt((millis as i64) + 1)
116                                    else {
117                                        let entity: GetTransactionsError =
118                                            GetTransactionsError::DefaultResponse(ErrorSchema {
119                                                message: Some(format!(
120                                                    "invalid timestamp {millis}"
121                                                )),
122                                                code: Some(-1.0),
123                                            });
124                                        let e =
125                                            crate::apis::Error::ResponseError(ResponseContent {
126                                                status: reqwest::StatusCode::GONE,
127                                                content: String::new(),
128                                                entity: Some(entity),
129                                            });
130                                        return Poll::Ready(Some(Err(
131                                            FireblocksError::FetchTransactionsError(e),
132                                        )));
133                                    };
134                                    self.after = ts;
135                                    // last.created_at +
136                                    // chrono::Duration::milliseconds(1);
137                                }
138                            }
139                        }
140                        Err(e) => {
141                            return Poll::Ready(Some(Err(e)));
142                        }
143                    }
144                    return Poll::Ready(Some(result));
145                }
146            }
147            Poll::Pending => {
148                cx.waker().wake_by_ref();
149                return Poll::Pending;
150            }
151        }
152
153        let client = self.client.clone();
154        let params = self.build_params();
155        let fut = async move {
156            client
157                .transactions_api()
158                .get_transactions(params)
159                .await
160                .map_err(FireblocksError::FetchTransactionsError)
161        }
162        .boxed();
163        self.fut.push(fut);
164        cx.waker().wake_by_ref();
165        Poll::Pending
166    }
167}