skar_client/types.rs

use std::sync::Arc;

use crate::{column_mapping::ColumnMapping, ArrowChunk};
use anyhow::{anyhow, Context, Result};
use polars_arrow::datatypes::SchemaRef;
use serde::{Deserialize, Serialize};
use skar_net_types::RollbackGuard;
8
/// Arrow data returned by a query, split by table.
#[derive(Debug, Clone)]
pub struct QueryResponseData {
    pub blocks: Vec<ArrowBatch>,
    pub transactions: Vec<ArrowBatch>,
    pub logs: Vec<ArrowBatch>,
    pub traces: Vec<ArrowBatch>,
}

#[derive(Debug, Clone)]
pub struct QueryResponse {
    /// Current height of the source hypersync instance.
    pub archive_height: Option<u64>,
    /// Next block to query from. Responses are paginated, so the caller
    /// should resume the query from this block if they didn't receive data
    /// up to the `to_block` they specified in the query.
    pub next_block: u64,
    /// Total time it took the hypersync instance to execute the query.
    pub total_execution_time: u64,
    /// Response data.
    pub data: QueryResponseData,
    /// Rollback guard.
    pub rollback_guard: Option<RollbackGuard>,
}
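
// Example: driving a paginated query with `next_block`. A minimal sketch in
// comment form; `client`, `query`, and `handle` are hypothetical (a client
// whose `get` returns a `QueryResponse`, and a query with a `from_block`
// field).
//
//     loop {
//         let res = client.get(&query).await?;
//         handle(&res.data);
//         // Stop once we have caught up with the instance's current height.
//         if let Some(height) = res.archive_height {
//             if res.next_block > height {
//                 break;
//             }
//         }
//         // Resume from where this response left off.
//         query.from_block = res.next_block;
//     }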

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StreamConfig {
    /// Block range size to use when making individual requests.
    #[serde(default = "default_batch_size")]
    pub batch_size: u64,
    /// Controls the number of concurrent requests made to the hypersync server.
    #[serde(default = "default_concurrency")]
    pub concurrency: usize,
    /// When set to true, failed requests are retried internally without limit.
    #[serde(default)]
    pub retry: bool,
}
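
// Example: the serde defaults above let a partial (or empty) config
// deserialize cleanly. A minimal sketch assuming `serde_json` is available.
//
//     let cfg: StreamConfig = serde_json::from_str("{}")?;
//     assert_eq!(cfg.batch_size, 400);
//     assert_eq!(cfg.concurrency, 10);
//     assert!(!cfg.retry);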

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ParquetConfig {
    /// Path to write parquet files to.
    pub path: String,
    /// Type mapping to use for the output columns.
    #[serde(default)]
    pub column_mapping: ColumnMapping,
    /// Event signature to parse the logs with, e.g.
    /// `Transfer(address indexed from, address indexed to, uint256 amount)`.
    pub event_signature: Option<String>,
    /// Convert binary output columns to hex.
    #[serde(default)]
    pub hex_output: bool,
    /// Block range size to use when making individual requests.
    #[serde(default = "default_batch_size")]
    pub batch_size: u64,
    /// Controls the number of concurrent requests made to the hypersync server.
    #[serde(default = "default_concurrency")]
    pub concurrency: usize,
    /// When set to true, failed requests are retried internally without limit.
    #[serde(default)]
    pub retry: bool,
}
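
// Example: a JSON config for parquet output. Field values are illustrative
// only; `column_mapping` is omitted here and falls back to its default.
//
//     {
//         "path": "./out",
//         "event_signature": "Transfer(address indexed from, address indexed to, uint256 amount)",
//         "hex_output": true,
//         "batch_size": 2000,
//         "concurrency": 4,
//         "retry": true
//     }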

fn default_batch_size() -> u64 {
    400
}

fn default_concurrency() -> usize {
    10
}

/// An Arrow record chunk paired with its schema.
#[derive(Debug, Clone)]
pub struct ArrowBatch {
    pub chunk: Arc<ArrowChunk>,
    pub schema: SchemaRef,
}

impl ArrowBatch {
    /// Get the column with the given name, downcast to the concrete array
    /// type `T`.
    pub fn column<T: 'static>(&self, name: &str) -> Result<&T> {
        match self
            .schema
            .fields
            .iter()
            .enumerate()
            .find(|(_, f)| f.name == name)
        {
            Some((idx, _)) => {
                let col = self
                    .chunk
                    .columns()
                    .get(idx)
                    .context("get column")?
                    .as_any()
                    .downcast_ref::<T>()
                    .with_context(|| anyhow!("cast type of column '{}'", name))?;
                Ok(col)
            }
            None => Err(anyhow!("field {} not found in schema", name)),
        }
    }
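
    // Example usage: look up a column and downcast it to a concrete arrow
    // array type. A minimal sketch; the column name and element type here
    // are illustrative and depend on the actual schema.
    //
    //     use polars_arrow::array::PrimitiveArray;
    //
    //     let numbers = batch.column::<PrimitiveArray<u64>>("number")?;
    //     for n in numbers.iter().flatten() {
    //         println!("block number: {n}");
    //     }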

    // pub fn arrow_eq(&self, other: &Self) -> ArrowComparison {
    //     if self.schema != other.schema {
    //         return ArrowComparison::DifferentSchema;
    //     }

    //     let mut conflicts = vec![];
    //     for (col_index, (s, o)) in self
    //         .chunk
    //         .columns()
    //         .iter()
    //         .zip(other.chunk.columns())
    //         .enumerate()
    //     {
    //         if s.data_type() == o.data_type() && can_eq(s.data_type()) {
    //             let valid = polars_compute::eq_and_validity(s.as_ref(), o.as_ref());

    //             for (index, validity) in valid.iter().enumerate() {
    //                 if !validity.unwrap_or(true) {
    //                     conflicts.push(ValueDifferent {
    //                         col_name: self.schema.fields[col_index].name.clone(),
    //                         index,
    //                     });
    //                 }
    //             }
    //         } else {
    //             return ArrowComparison::DifferentDataType {
    //                 col_name: self.schema.fields[col_index].name.clone(),
    //                 s_data_type: s.data_type().clone(),
    //                 o_data_type: o.data_type().clone(),
    //             };
    //         }
    //     }

    //     if conflicts.is_empty() {
    //         ArrowComparison::Equal
    //     } else {
    //         ArrowComparison::ValueDifferent(conflicts)
    //     }
    // }
}

// pub enum ArrowComparison {
//     ValueDifferent(Vec<ValueDifferent>),
//     DifferentSchema,
//     DifferentDataType {
//         col_name: String,
//         s_data_type: DataType,
//         o_data_type: DataType,
//     },
//     Equal,
// }

// pub struct ValueDifferent {
//     pub col_name: String,
//     pub index: usize,
// }