hypersync_client/
types.rs

1use std::sync::Arc;
2
3use crate::{
4    simple_types::{Block, Event, InternalEventJoinStrategy, Log, Trace, Transaction},
5    ArrowChunk, FromArrow,
6};
7use anyhow::{anyhow, Context, Result};
8use hypersync_net_types::RollbackGuard;
9use polars_arrow::datatypes::SchemaRef;
10
11/// Query response in Arrow format
12#[derive(Default, Debug, Clone)]
13pub struct ArrowResponseData {
14    /// Query blocks response
15    pub blocks: Vec<ArrowBatch>,
16    /// Query transactions response
17    pub transactions: Vec<ArrowBatch>,
18    /// Query logs response
19    pub logs: Vec<ArrowBatch>,
20    /// Query traces response
21    pub traces: Vec<ArrowBatch>,
22    /// Query decoded_logs response.
23    ///
24    /// Populated only if event_signature is present.
25    pub decoded_logs: Vec<ArrowBatch>,
26}
27
28/// Query response data in Rust native format
29#[derive(Default, Debug, Clone)]
30pub struct ResponseData {
31    /// Query blocks response
32    pub blocks: Vec<Vec<Block>>,
33    /// Query transactions response
34    pub transactions: Vec<Vec<Transaction>>,
35    /// Query logs response
36    pub logs: Vec<Vec<Log>>,
37    /// Query traces response
38    pub traces: Vec<Vec<Trace>>,
39}
40
41impl EventResponse {
42    /// Create EventResponse from ArrowResponse with the specified event join strategy
43    pub(crate) fn from_arrow_response(
44        arrow_response: &ArrowResponse,
45        event_join_strategy: &InternalEventJoinStrategy,
46    ) -> Self {
47        let r: QueryResponse = arrow_response.into();
48        Self {
49            archive_height: r.archive_height,
50            next_block: r.next_block,
51            total_execution_time: r.total_execution_time,
52            data: event_join_strategy.join_from_response_data(r.data),
53            rollback_guard: r.rollback_guard,
54        }
55    }
56}
57
58impl From<&'_ ArrowResponse> for QueryResponse {
59    fn from(arrow_response: &ArrowResponse) -> Self {
60        let blocks = arrow_response
61            .data
62            .blocks
63            .iter()
64            .map(Block::from_arrow)
65            .collect();
66        let transactions = arrow_response
67            .data
68            .transactions
69            .iter()
70            .map(Transaction::from_arrow)
71            .collect();
72        let logs = arrow_response
73            .data
74            .logs
75            .iter()
76            .map(Log::from_arrow)
77            .collect();
78        let traces = arrow_response
79            .data
80            .traces
81            .iter()
82            .map(Trace::from_arrow)
83            .collect();
84
85        QueryResponse {
86            archive_height: arrow_response.archive_height,
87            next_block: arrow_response.next_block,
88            total_execution_time: arrow_response.total_execution_time,
89            data: ResponseData {
90                blocks,
91                transactions,
92                logs,
93                traces,
94            },
95            rollback_guard: arrow_response.rollback_guard.clone(),
96        }
97    }
98}
99
100/// Query response from hypersync instance.
101/// Contain next_block field in case query didn't process all the block range
102#[derive(Debug, Clone)]
103pub struct QueryResponse<T = ResponseData> {
104    /// Current height of the source hypersync instance
105    pub archive_height: Option<u64>,
106    /// Next block to query for, the responses are paginated so
107    /// the caller should continue the query from this block if they
108    /// didn't get responses up to the to_block they specified in the Query.
109    pub next_block: u64,
110    /// Total time it took the hypersync instance to execute the query.
111    pub total_execution_time: u64,
112    /// Response data
113    pub data: T,
114    /// Rollback guard
115    pub rollback_guard: Option<RollbackGuard>,
116}
117
118/// Alias for Arrow Query response
119pub type ArrowResponse = QueryResponse<ArrowResponseData>;
120/// Alias for Event oriented, vectorized QueryResponse
121pub type EventResponse = QueryResponse<Vec<Event>>;
122
123/// Arrow chunk with schema
124#[derive(Debug, Clone)]
125pub struct ArrowBatch {
126    /// Reference to array chunk
127    pub chunk: Arc<ArrowChunk>,
128    /// Schema reference for the chunk
129    pub schema: SchemaRef,
130}
131
132impl ArrowBatch {
133    /// Extract column from chunk by name if the name exists in the schema
134    pub fn optional_column<T: 'static>(&self, name: &str) -> Result<Option<&T>> {
135        match self
136            .schema
137            .fields
138            .iter()
139            .enumerate()
140            .find(|(_, f)| f.name == name)
141        {
142            Some((idx, _)) => {
143                let col = self
144                    .chunk
145                    .columns()
146                    .get(idx)
147                    .context("get column using index")?;
148                let col = col.as_any().downcast_ref::<T>().with_context(|| {
149                    anyhow!(
150                        "cast type of column '{}', it was {:?}",
151                        name,
152                        col.data_type()
153                    )
154                })?;
155                Ok(Some(col))
156            }
157            None => Ok(None),
158        }
159    }
160    /// Extract column from chunk by name
161    pub fn column<T: 'static>(&self, name: &str) -> Result<&T> {
162        self.optional_column(name)?
163            .with_context(|| anyhow!("field {} not found in schema", name))
164    }
165}