skar_client/types.rs
use std::sync::Arc;

use crate::{column_mapping::ColumnMapping, ArrowChunk};
use anyhow::{anyhow, Context, Result};
use polars_arrow::datatypes::SchemaRef;
use serde::{Deserialize, Serialize};
use skar_net_types::RollbackGuard;

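/// Arrow data returned by a query, grouped into block, transaction, log and
/// trace batches.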
#[derive(Debug, Clone)]
pub struct QueryResponseData {
    pub blocks: Vec<ArrowBatch>,
    pub transactions: Vec<ArrowBatch>,
    pub logs: Vec<ArrowBatch>,
    pub traces: Vec<ArrowBatch>,
}

#[derive(Debug, Clone)]
pub struct QueryResponse {
    /// Current height of the source hypersync instance.
    pub archive_height: Option<u64>,
    /// Next block to query from. Responses are paginated, so the caller
    /// should continue the query from this block if they did not receive data
    /// up to the `to_block` they specified in the query.
    pub next_block: u64,
    /// Total time it took the hypersync instance to execute the query.
    pub total_execution_time: u64,
    /// Response data.
    pub data: QueryResponseData,
    /// Rollback guard.
    pub rollback_guard: Option<RollbackGuard>,
}

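/// Config for streaming data from a hypersync instance.
///
/// Every field has a serde default, so a minimal config can be deserialized
/// from an empty JSON object. A small sketch (assumes the `serde_json` crate
/// is available; it is not imported in this module):
///
/// ```ignore
/// let cfg: StreamConfig = serde_json::from_str("{}").unwrap();
/// assert_eq!(cfg.batch_size, 400);
/// assert_eq!(cfg.concurrency, 10);
/// assert!(!cfg.retry);
/// ```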
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StreamConfig {
    /// Block range size to use when making individual requests.
    #[serde(default = "default_batch_size")]
    pub batch_size: u64,
    /// Controls the number of concurrent requests made to the hypersync server.
    #[serde(default = "default_concurrency")]
    pub concurrency: usize,
    /// Requests are retried forever internally if this param is set to true.
    #[serde(default)]
    pub retry: bool,
}

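/// Config for writing query results to parquet files.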
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ParquetConfig {
    /// Path to write parquet files to.
    pub path: String,
    /// Define type mapping for output columns.
    #[serde(default)]
    pub column_mapping: ColumnMapping,
    /// Event signature to parse the logs with. Example: Transfer(address indexed from, address indexed to, uint256 amount)
    pub event_signature: Option<String>,
    /// Convert binary output columns to hex.
    #[serde(default)]
    pub hex_output: bool,
    /// Block range size to use when making individual requests.
    #[serde(default = "default_batch_size")]
    pub batch_size: u64,
    /// Controls the number of concurrent requests made to the hypersync server.
    #[serde(default = "default_concurrency")]
    pub concurrency: usize,
    /// Requests are retried forever internally if this param is set to true.
    #[serde(default)]
    pub retry: bool,
}

fn default_batch_size() -> u64 {
    400
}

fn default_concurrency() -> usize {
    10
}

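/// A single Arrow record batch: a chunk of column data paired with its schema.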
#[derive(Debug, Clone)]
pub struct ArrowBatch {
    pub chunk: Arc<ArrowChunk>,
    pub schema: SchemaRef,
}

impl ArrowBatch {
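    /// Find the column named `name` in the batch schema and downcast it to
    /// the concrete Arrow array type `T`.
    ///
    /// Fails if the field is not present in the schema or if the underlying
    /// array cannot be downcast to `T`. A minimal usage sketch, assuming a
    /// batch that has a `number` column backed by a `PrimitiveArray<u64>`
    /// (the column name here is hypothetical):
    ///
    /// ```ignore
    /// use polars_arrow::array::PrimitiveArray;
    ///
    /// let numbers = batch.column::<PrimitiveArray<u64>>("number")?;
    /// for n in numbers.iter().flatten() {
    ///     println!("block number: {n}");
    /// }
    /// ```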
    pub fn column<T: 'static>(&self, name: &str) -> Result<&T> {
        match self
            .schema
            .fields
            .iter()
            .enumerate()
            .find(|(_, f)| f.name == name)
        {
            Some((idx, _)) => {
                let col = self
                    .chunk
                    .columns()
                    .get(idx)
                    .context("get column")?
                    .as_any()
                    .downcast_ref::<T>()
                    .with_context(|| anyhow!("cast type of column '{}'", name))?;
                Ok(col)
            }
            None => Err(anyhow!("field {} not found in schema", name)),
        }
    }

    // pub fn arrow_eq(&self, other: &Self) -> ArrowComparison {
    //     if self.schema != other.schema {
    //         return ArrowComparison::DifferentSchema;
    //     }

    //     let mut conflicts = vec![];
    //     for (col_index, (s, o)) in self
    //         .chunk
    //         .columns()
    //         .iter()
    //         .zip(other.chunk.columns())
    //         .enumerate()
    //     {
    //         if s.data_type() == o.data_type() && can_eq(s.data_type()) {
    //             let valid = polars_compute::eq_and_validity(s.as_ref(), o.as_ref());

    //             for (index, validity) in valid.iter().enumerate() {
    //                 if !validity.unwrap_or(true) {
    //                     conflicts.push(ValueDifferent {
    //                         col_name: self.schema.fields[col_index].name.clone(),
    //                         index,
    //                     });
    //                 }
    //             }
    //         } else {
    //             return ArrowComparison::DifferentDataType {
    //                 col_name: self.schema.fields[col_index].name.clone(),
    //                 s_data_type: s.data_type().clone(),
    //                 o_data_type: o.data_type().clone(),
    //             };
    //         }
    //     }

    //     if conflicts.is_empty() {
    //         ArrowComparison::Equal
    //     } else {
    //         ArrowComparison::ValueDifferent(conflicts)
    //     }
    // }
}

// pub enum ArrowComparison {
//     ValueDifferent(Vec<ValueDifferent>),
//     DifferentSchema,
//     DifferentDataType {
//         col_name: String,
//         s_data_type: DataType,
//         o_data_type: DataType,
//     },
//     Equal,
// }

// pub struct ValueDifferent {
//     pub col_name: String,
//     pub index: usize,
// }