hypersync_client/
types.rs

1use std::sync::Arc;
2
3use crate::{
4    simple_types::{Block, Event, Log, Trace, Transaction},
5    ArrowChunk, FromArrow,
6};
7use anyhow::{anyhow, Context, Result};
8use hypersync_net_types::RollbackGuard;
9use polars_arrow::datatypes::SchemaRef;
10
11/// Query response in Arrow format
12#[derive(Default, Debug, Clone)]
13pub struct ArrowResponseData {
14    /// Query blocks response
15    pub blocks: Vec<ArrowBatch>,
16    /// Query transactions response
17    pub transactions: Vec<ArrowBatch>,
18    /// Query logs response
19    pub logs: Vec<ArrowBatch>,
20    /// Query traces response
21    pub traces: Vec<ArrowBatch>,
22    /// Query decoded_logs response.
23    ///
24    /// Populated only if event_signature is present.
25    pub decoded_logs: Vec<ArrowBatch>,
26}
27
28/// Query response data in Rust native format
29#[derive(Default, Debug, Clone)]
30pub struct ResponseData {
31    /// Query blocks response
32    pub blocks: Vec<Vec<Block>>,
33    /// Query transactions response
34    pub transactions: Vec<Vec<Transaction>>,
35    /// Query logs response
36    pub logs: Vec<Vec<Log>>,
37    /// Query traces response
38    pub traces: Vec<Vec<Trace>>,
39}
40
41impl From<&'_ ArrowResponse> for EventResponse {
42    fn from(arrow_response: &'_ ArrowResponse) -> Self {
43        let r: QueryResponse = arrow_response.into();
44        Self {
45            archive_height: r.archive_height,
46            next_block: r.next_block,
47            total_execution_time: r.total_execution_time,
48            data: vec![r.data.into()],
49            rollback_guard: r.rollback_guard,
50        }
51    }
52}
53
54impl From<&'_ ArrowResponse> for QueryResponse {
55    fn from(arrow_response: &ArrowResponse) -> Self {
56        let blocks = arrow_response
57            .data
58            .blocks
59            .iter()
60            .map(Block::from_arrow)
61            .collect();
62        let transactions = arrow_response
63            .data
64            .transactions
65            .iter()
66            .map(Transaction::from_arrow)
67            .collect();
68        let logs = arrow_response
69            .data
70            .logs
71            .iter()
72            .map(Log::from_arrow)
73            .collect();
74        let traces = arrow_response
75            .data
76            .traces
77            .iter()
78            .map(Trace::from_arrow)
79            .collect();
80
81        QueryResponse {
82            archive_height: arrow_response.archive_height,
83            next_block: arrow_response.next_block,
84            total_execution_time: arrow_response.total_execution_time,
85            data: ResponseData {
86                blocks,
87                transactions,
88                logs,
89                traces,
90            },
91            rollback_guard: arrow_response.rollback_guard.clone(),
92        }
93    }
94}
95
96/// Query response from hypersync instance.
97/// Contain next_block field in case query didn't process all the block range
98#[derive(Debug, Clone)]
99pub struct QueryResponse<T = ResponseData> {
100    /// Current height of the source hypersync instance
101    pub archive_height: Option<u64>,
102    /// Next block to query for, the responses are paginated so
103    /// the caller should continue the query from this block if they
104    /// didn't get responses up to the to_block they specified in the Query.
105    pub next_block: u64,
106    /// Total time it took the hypersync instance to execute the query.
107    pub total_execution_time: u64,
108    /// Response data
109    pub data: T,
110    /// Rollback guard
111    pub rollback_guard: Option<RollbackGuard>,
112}
113
114/// Alias for Arrow Query response
115pub type ArrowResponse = QueryResponse<ArrowResponseData>;
116/// Alias for Event oriented, vectorized QueryResponse
117pub type EventResponse = QueryResponse<Vec<Vec<Event>>>;
118
119/// Arrow chunk with schema
120#[derive(Debug, Clone)]
121pub struct ArrowBatch {
122    /// Reference to array chunk
123    pub chunk: Arc<ArrowChunk>,
124    /// Schema reference for the chunk
125    pub schema: SchemaRef,
126}
127
128impl ArrowBatch {
129    /// Extract column from chunk by name if the name exists in the schema
130    pub fn optional_column<T: 'static>(&self, name: &str) -> Result<Option<&T>> {
131        match self
132            .schema
133            .fields
134            .iter()
135            .enumerate()
136            .find(|(_, f)| f.name == name)
137        {
138            Some((idx, _)) => {
139                let col = self
140                    .chunk
141                    .columns()
142                    .get(idx)
143                    .context("get column using index")?;
144                let col = col.as_any().downcast_ref::<T>().with_context(|| {
145                    anyhow!(
146                        "cast type of column '{}', it was {:?}",
147                        name,
148                        col.data_type()
149                    )
150                })?;
151                Ok(Some(col))
152            }
153            None => Ok(None),
154        }
155    }
156    /// Extract column from chunk by name
157    pub fn column<T: 'static>(&self, name: &str) -> Result<&T> {
158        self.optional_column(name)?
159            .with_context(|| anyhow!("field {} not found in schema", name))
160    }
161}