hyperfuel_client/
types.rs

1use std::sync::Arc;
2
3use crate::{ArrowChunk, FromArrow};
4use anyhow::{anyhow, Context, Result};
5use hyperfuel_format::{BlockHeader, Input, Output, Receipt, Transaction};
6use polars_arrow::datatypes::SchemaRef;
7
8/// Query response in Arrow format
9#[derive(Default, Debug, Clone)]
10pub struct ArrowResponseData {
11    /// Query blocks response
12    pub blocks: Vec<ArrowBatch>,
13    /// Query transactions response
14    pub transactions: Vec<ArrowBatch>,
15    /// Query receipts response
16    pub receipts: Vec<ArrowBatch>,
17    /// Query inputs response
18    pub inputs: Vec<ArrowBatch>,
19    /// Query outputs response
20    pub outputs: Vec<ArrowBatch>,
21}
22
23/// Query response data in Rust native format
24#[derive(Default, Debug, Clone)]
25pub struct ResponseData {
26    /// Query blocks response
27    pub blocks: Vec<Vec<BlockHeader>>,
28    /// Query transactions response
29    pub transactions: Vec<Vec<Transaction>>,
30    /// Query receipts response
31    pub receipts: Vec<Vec<Receipt>>,
32    /// Query inputs response
33    pub inputs: Vec<Vec<Input>>,
34    /// Query outputs response
35    pub outputs: Vec<Vec<Output>>,
36}
37
38impl From<&'_ ArrowResponse> for QueryResponse {
39    fn from(arrow_response: &ArrowResponse) -> Self {
40        let blocks = arrow_response
41            .data
42            .blocks
43            .iter()
44            .map(BlockHeader::from_arrow)
45            .collect();
46        let transactions = arrow_response
47            .data
48            .transactions
49            .iter()
50            .map(Transaction::from_arrow)
51            .collect();
52        let receipts = arrow_response
53            .data
54            .receipts
55            .iter()
56            .map(Receipt::from_arrow)
57            .collect();
58        let inputs = arrow_response
59            .data
60            .inputs
61            .iter()
62            .map(Input::from_arrow)
63            .collect();
64        let outputs = arrow_response
65            .data
66            .outputs
67            .iter()
68            .map(Output::from_arrow)
69            .collect();
70
71        QueryResponse {
72            archive_height: arrow_response.archive_height,
73            next_block: arrow_response.next_block,
74            total_execution_time: arrow_response.total_execution_time,
75            data: ResponseData {
76                blocks,
77                transactions,
78                receipts,
79                inputs,
80                outputs,
81            },
82            // rollback_guard: arrow_response.rollback_guard.clone(),
83        }
84    }
85}
86
87/// Query response from hypersync instance.
88/// Contain next_block field in case query didn't process all the block range
89#[derive(Debug, Clone)]
90pub struct QueryResponse<T = ResponseData> {
91    /// Current height of the source hypersync instance
92    pub archive_height: Option<u64>,
93    /// Next block to query for, the responses are paginated so
94    /// the caller should continue the query from this block if they
95    /// didn't get responses up to the to_block they specified in the Query.
96    pub next_block: u64,
97    /// Total time it took the hypersync instance to execute the query.
98    pub total_execution_time: u64,
99    /// Response data
100    pub data: T,
101    // /// Rollback guard
102    // pub rollback_guard: Option<RollbackGuard>,
103}
104
105/// Alias for Arrow Query response
106pub type ArrowResponse = QueryResponse<ArrowResponseData>;
107
108/// Arrow chunk with schema
109#[derive(Debug, Clone)]
110pub struct ArrowBatch {
111    /// Reference to array chunk
112    pub chunk: Arc<ArrowChunk>,
113    /// Schema reference for the chunk
114    pub schema: SchemaRef,
115}
116
117impl ArrowBatch {
118    /// Extract column from chunk by name
119    pub fn column<T: 'static>(&self, name: &str) -> Result<&T> {
120        match self
121            .schema
122            .fields
123            .iter()
124            .enumerate()
125            .find(|(_, f)| f.name == name)
126        {
127            Some((idx, _)) => {
128                let col = self
129                    .chunk
130                    .columns()
131                    .get(idx)
132                    .context("get column using index")?;
133                let col = col.as_any().downcast_ref::<T>().with_context(|| {
134                    anyhow!(
135                        "cast type of column '{}', it was {:?}",
136                        name,
137                        col.data_type()
138                    )
139                })?;
140                Ok(col)
141            }
142            None => Err(anyhow!("field {} not found in schema", name)),
143        }
144    }
145}