hypersync_client/
types.rs1use std::sync::Arc;
2
3use crate::{
4 simple_types::{Block, Event, InternalEventJoinStrategy, Log, Trace, Transaction},
5 ArrowChunk, FromArrow,
6};
7use anyhow::{anyhow, Context, Result};
8use hypersync_net_types::RollbackGuard;
9use polars_arrow::datatypes::SchemaRef;
10
11#[derive(Default, Debug, Clone)]
13pub struct ArrowResponseData {
14 pub blocks: Vec<ArrowBatch>,
16 pub transactions: Vec<ArrowBatch>,
18 pub logs: Vec<ArrowBatch>,
20 pub traces: Vec<ArrowBatch>,
22 pub decoded_logs: Vec<ArrowBatch>,
26}
27
28#[derive(Default, Debug, Clone)]
30pub struct ResponseData {
31 pub blocks: Vec<Vec<Block>>,
33 pub transactions: Vec<Vec<Transaction>>,
35 pub logs: Vec<Vec<Log>>,
37 pub traces: Vec<Vec<Trace>>,
39}
40
41impl EventResponse {
42 pub(crate) fn from_arrow_response(
44 arrow_response: &ArrowResponse,
45 event_join_strategy: &InternalEventJoinStrategy,
46 ) -> Self {
47 let r: QueryResponse = arrow_response.into();
48 Self {
49 archive_height: r.archive_height,
50 next_block: r.next_block,
51 total_execution_time: r.total_execution_time,
52 data: event_join_strategy.join_from_response_data(r.data),
53 rollback_guard: r.rollback_guard,
54 }
55 }
56}
57
58impl From<&'_ ArrowResponse> for QueryResponse {
59 fn from(arrow_response: &ArrowResponse) -> Self {
60 let blocks = arrow_response
61 .data
62 .blocks
63 .iter()
64 .map(Block::from_arrow)
65 .collect();
66 let transactions = arrow_response
67 .data
68 .transactions
69 .iter()
70 .map(Transaction::from_arrow)
71 .collect();
72 let logs = arrow_response
73 .data
74 .logs
75 .iter()
76 .map(Log::from_arrow)
77 .collect();
78 let traces = arrow_response
79 .data
80 .traces
81 .iter()
82 .map(Trace::from_arrow)
83 .collect();
84
85 QueryResponse {
86 archive_height: arrow_response.archive_height,
87 next_block: arrow_response.next_block,
88 total_execution_time: arrow_response.total_execution_time,
89 data: ResponseData {
90 blocks,
91 transactions,
92 logs,
93 traces,
94 },
95 rollback_guard: arrow_response.rollback_guard.clone(),
96 }
97 }
98}
99
100#[derive(Debug, Clone)]
103pub struct QueryResponse<T = ResponseData> {
104 pub archive_height: Option<u64>,
106 pub next_block: u64,
110 pub total_execution_time: u64,
112 pub data: T,
114 pub rollback_guard: Option<RollbackGuard>,
116}
117
118pub type ArrowResponse = QueryResponse<ArrowResponseData>;
120pub type EventResponse = QueryResponse<Vec<Event>>;
122
123#[derive(Debug, Clone)]
125pub struct ArrowBatch {
126 pub chunk: Arc<ArrowChunk>,
128 pub schema: SchemaRef,
130}
131
132impl ArrowBatch {
133 pub fn optional_column<T: 'static>(&self, name: &str) -> Result<Option<&T>> {
135 match self
136 .schema
137 .fields
138 .iter()
139 .enumerate()
140 .find(|(_, f)| f.name == name)
141 {
142 Some((idx, _)) => {
143 let col = self
144 .chunk
145 .columns()
146 .get(idx)
147 .context("get column using index")?;
148 let col = col.as_any().downcast_ref::<T>().with_context(|| {
149 anyhow!(
150 "cast type of column '{}', it was {:?}",
151 name,
152 col.data_type()
153 )
154 })?;
155 Ok(Some(col))
156 }
157 None => Ok(None),
158 }
159 }
160 pub fn column<T: 'static>(&self, name: &str) -> Result<&T> {
162 self.optional_column(name)?
163 .with_context(|| anyhow!("field {} not found in schema", name))
164 }
165}