1#![deny(missing_docs)]
2use std::{num::NonZeroU64, sync::Arc, time::Duration};
4
5use anyhow::{anyhow, Context, Result};
6use hypersync_net_types::{ArchiveHeight, ChainId, Query};
7use polars_arrow::{array::Array, record_batch::RecordBatchT as Chunk};
8use reqwest::Method;
9
10mod column_mapping;
11mod config;
12mod decode;
13mod decode_call;
14mod from_arrow;
15mod parquet_out;
16mod parse_response;
17pub mod preset_query;
18mod rayon_async;
19pub mod simple_types;
20mod stream;
21#[cfg(feature = "ethers")]
22pub mod to_ethers;
23mod types;
24mod util;
25
26pub use from_arrow::FromArrow;
27pub use hypersync_format as format;
28pub use hypersync_net_types as net_types;
29pub use hypersync_schema as schema;
30
31use parse_response::parse_query_response;
32use simple_types::Event;
33use tokio::sync::mpsc;
34use types::{EventResponse, ResponseData};
35use url::Url;
36
37pub use column_mapping::{ColumnMapping, DataType};
38pub use config::HexOutput;
39pub use config::{ClientConfig, StreamConfig};
40pub use decode::Decoder;
41pub use decode_call::CallDecoder;
42pub use types::{ArrowBatch, ArrowResponse, ArrowResponseData, QueryResponse};
43
/// A polars-arrow record batch (`RecordBatchT`) whose columns are type-erased boxed arrays.
type ArrowChunk = Chunk<Box<dyn Array>>;
45
/// Client for a HyperSync server.
///
/// Holds the HTTP client, the server base URL, an optional bearer token and the
/// retry policy used by the retrying request methods (`get_chain_id`, `get_height`,
/// `get`, `get_events`, `get_arrow`).
#[derive(Clone, Debug)]
pub struct Client {
    /// HTTP client used for every request made by this client.
    http_client: reqwest::Client,
    /// Base URL of the server; endpoint path segments are appended to it.
    url: Url,
    /// Optional token attached as bearer auth to every request.
    bearer_token: Option<String>,
    /// Number of retries after the first failed attempt (so `max_num_retries + 1` attempts total).
    max_num_retries: usize,
    /// Amount the retry base delay grows per attempt; also the upper bound of the random jitter.
    retry_backoff_ms: u64,
    /// Base delay before the first retry, in milliseconds.
    retry_base_ms: u64,
    /// Cap applied to the growing base delay, in milliseconds.
    retry_ceiling_ms: u64,
}
64
65impl Client {
66 pub fn new(cfg: ClientConfig) -> Result<Self> {
68 let timeout = cfg
69 .http_req_timeout_millis
70 .unwrap_or(NonZeroU64::new(30_000).unwrap());
71
72 let http_client = reqwest::Client::builder()
73 .no_gzip()
74 .timeout(Duration::from_millis(timeout.get()))
75 .build()
76 .unwrap();
77
78 Ok(Self {
79 http_client,
80 url: cfg
81 .url
82 .unwrap_or("https://eth.hypersync.xyz".parse().context("parse url")?),
83 bearer_token: cfg.bearer_token,
84 max_num_retries: cfg.max_num_retries.unwrap_or(12),
85 retry_backoff_ms: cfg.retry_backoff_ms.unwrap_or(500),
86 retry_base_ms: cfg.retry_base_ms.unwrap_or(200),
87 retry_ceiling_ms: cfg.retry_ceiling_ms.unwrap_or(5_000),
88 })
89 }
90
91 pub async fn collect(
100 self: Arc<Self>,
101 query: Query,
102 config: StreamConfig,
103 ) -> Result<QueryResponse> {
104 check_simple_stream_params(&config)?;
105
106 let mut recv = stream::stream_arrow(self, query, config)
107 .await
108 .context("start stream")?;
109
110 let mut data = ResponseData::default();
111 let mut archive_height = None;
112 let mut next_block = 0;
113 let mut total_execution_time = 0;
114
115 while let Some(res) = recv.recv().await {
116 let res = res.context("get response")?;
117 let res: QueryResponse = QueryResponse::from(&res);
118
119 for batch in res.data.blocks {
120 data.blocks.push(batch);
121 }
122 for batch in res.data.transactions {
123 data.transactions.push(batch);
124 }
125 for batch in res.data.logs {
126 data.logs.push(batch);
127 }
128 for batch in res.data.traces {
129 data.traces.push(batch);
130 }
131
132 archive_height = res.archive_height;
133 next_block = res.next_block;
134 total_execution_time += res.total_execution_time
135 }
136
137 Ok(QueryResponse {
138 archive_height,
139 next_block,
140 total_execution_time,
141 data,
142 rollback_guard: None,
143 })
144 }
145
146 pub async fn collect_events(
148 self: Arc<Self>,
149 mut query: Query,
150 config: StreamConfig,
151 ) -> Result<EventResponse> {
152 check_simple_stream_params(&config)?;
153
154 add_event_join_fields_to_selection(&mut query);
155
156 let mut recv = stream::stream_arrow(self, query, config)
157 .await
158 .context("start stream")?;
159
160 let mut data = Vec::new();
161 let mut archive_height = None;
162 let mut next_block = 0;
163 let mut total_execution_time = 0;
164
165 while let Some(res) = recv.recv().await {
166 let res = res.context("get response")?;
167 let res: QueryResponse = QueryResponse::from(&res);
168 let events: Vec<Event> = res.data.into();
169
170 data.push(events);
171
172 archive_height = res.archive_height;
173 next_block = res.next_block;
174 total_execution_time += res.total_execution_time
175 }
176
177 Ok(EventResponse {
178 archive_height,
179 next_block,
180 total_execution_time,
181 data,
182 rollback_guard: None,
183 })
184 }
185
186 pub async fn collect_arrow(
189 self: Arc<Self>,
190 query: Query,
191 config: StreamConfig,
192 ) -> Result<ArrowResponse> {
193 let mut recv = stream::stream_arrow(self, query, config)
194 .await
195 .context("start stream")?;
196
197 let mut data = ArrowResponseData::default();
198 let mut archive_height = None;
199 let mut next_block = 0;
200 let mut total_execution_time = 0;
201
202 while let Some(res) = recv.recv().await {
203 let res = res.context("get response")?;
204
205 for batch in res.data.blocks {
206 data.blocks.push(batch);
207 }
208 for batch in res.data.transactions {
209 data.transactions.push(batch);
210 }
211 for batch in res.data.logs {
212 data.logs.push(batch);
213 }
214 for batch in res.data.traces {
215 data.traces.push(batch);
216 }
217 for batch in res.data.decoded_logs {
218 data.decoded_logs.push(batch);
219 }
220
221 archive_height = res.archive_height;
222 next_block = res.next_block;
223 total_execution_time += res.total_execution_time
224 }
225
226 Ok(ArrowResponse {
227 archive_height,
228 next_block,
229 total_execution_time,
230 data,
231 rollback_guard: None,
232 })
233 }
234
235 pub async fn collect_parquet(
238 self: Arc<Self>,
239 path: &str,
240 query: Query,
241 config: StreamConfig,
242 ) -> Result<()> {
243 parquet_out::collect_parquet(self, path, query, config).await
244 }
245
246 async fn get_chain_id_impl(&self) -> Result<u64> {
248 let mut url = self.url.clone();
249 let mut segments = url.path_segments_mut().ok().context("get path segments")?;
250 segments.push("chain_id");
251 std::mem::drop(segments);
252 let mut req = self.http_client.request(Method::GET, url);
253
254 if let Some(bearer_token) = &self.bearer_token {
255 req = req.bearer_auth(bearer_token);
256 }
257
258 let res = req.send().await.context("execute http req")?;
259
260 let status = res.status();
261 if !status.is_success() {
262 return Err(anyhow!("http response status code {}", status));
263 }
264
265 let chain_id: ChainId = res.json().await.context("read response body json")?;
266
267 Ok(chain_id.chain_id)
268 }
269
270 async fn get_height_impl(&self, http_timeout_override: Option<Duration>) -> Result<u64> {
272 let mut url = self.url.clone();
273 let mut segments = url.path_segments_mut().ok().context("get path segments")?;
274 segments.push("height");
275 std::mem::drop(segments);
276 let mut req = self.http_client.request(Method::GET, url);
277
278 if let Some(bearer_token) = &self.bearer_token {
279 req = req.bearer_auth(bearer_token);
280 }
281
282 if let Some(http_timeout_override) = http_timeout_override {
283 req = req.timeout(http_timeout_override);
284 }
285
286 let res = req.send().await.context("execute http req")?;
287
288 let status = res.status();
289 if !status.is_success() {
290 return Err(anyhow!("http response status code {}", status));
291 }
292
293 let height: ArchiveHeight = res.json().await.context("read response body json")?;
294
295 Ok(height.height.unwrap_or(0))
296 }
297
298 pub async fn get_chain_id(&self) -> Result<u64> {
300 let mut base = self.retry_base_ms;
301
302 let mut err = anyhow!("");
303
304 for _ in 0..self.max_num_retries + 1 {
305 match self.get_chain_id_impl().await {
306 Ok(res) => return Ok(res),
307 Err(e) => {
308 log::error!(
309 "failed to get chain_id from server, retrying... The error was: {:?}",
310 e
311 );
312 err = err.context(format!("{:?}", e));
313 }
314 }
315
316 let base_ms = Duration::from_millis(base);
317 let jitter = Duration::from_millis(fastrange_rs::fastrange_64(
318 rand::random(),
319 self.retry_backoff_ms,
320 ));
321
322 tokio::time::sleep(base_ms + jitter).await;
323
324 base = std::cmp::min(base + self.retry_backoff_ms, self.retry_ceiling_ms);
325 }
326
327 Err(err)
328 }
329
330 pub async fn get_height(&self) -> Result<u64> {
332 let mut base = self.retry_base_ms;
333
334 let mut err = anyhow!("");
335
336 for _ in 0..self.max_num_retries + 1 {
337 match self.get_height_impl(None).await {
338 Ok(res) => return Ok(res),
339 Err(e) => {
340 log::error!(
341 "failed to get height from server, retrying... The error was: {:?}",
342 e
343 );
344 err = err.context(format!("{:?}", e));
345 }
346 }
347
348 let base_ms = Duration::from_millis(base);
349 let jitter = Duration::from_millis(fastrange_rs::fastrange_64(
350 rand::random(),
351 self.retry_backoff_ms,
352 ));
353
354 tokio::time::sleep(base_ms + jitter).await;
355
356 base = std::cmp::min(base + self.retry_backoff_ms, self.retry_ceiling_ms);
357 }
358
359 Err(err)
360 }
361
362 pub async fn health_check(&self, http_req_timeout: Option<Duration>) -> Result<u64> {
365 self.get_height_impl(http_req_timeout).await
366 }
367
368 pub async fn get(&self, query: &Query) -> Result<QueryResponse> {
370 let arrow_response = self.get_arrow(query).await.context("get data")?;
371 Ok(QueryResponse::from(&arrow_response))
372 }
373
374 pub async fn get_events(&self, mut query: Query) -> Result<EventResponse> {
377 add_event_join_fields_to_selection(&mut query);
378 let arrow_response = self.get_arrow(&query).await.context("get data")?;
379 Ok(EventResponse::from(&arrow_response))
380 }
381
382 async fn get_arrow_impl(&self, query: &Query) -> Result<(ArrowResponse, u64)> {
384 let mut url = self.url.clone();
385 let mut segments = url.path_segments_mut().ok().context("get path segments")?;
386 segments.push("query");
387 segments.push("arrow-ipc");
388 std::mem::drop(segments);
389 let mut req = self.http_client.request(Method::POST, url);
390
391 if let Some(bearer_token) = &self.bearer_token {
392 req = req.bearer_auth(bearer_token);
393 }
394
395 let res = req.json(&query).send().await.context("execute http req")?;
396
397 let status = res.status();
398 if !status.is_success() {
399 let text = res.text().await.context("read text to see error")?;
400
401 return Err(anyhow!(
402 "http response status code {}, err body: {}",
403 status,
404 text
405 ));
406 }
407
408 let bytes = res.bytes().await.context("read response body bytes")?;
409
410 let res = tokio::task::block_in_place(|| {
411 parse_query_response(&bytes).context("parse query response")
412 })?;
413
414 Ok((res, bytes.len().try_into().unwrap()))
415 }
416
417 pub async fn get_arrow(&self, query: &Query) -> Result<ArrowResponse> {
419 self.get_arrow_with_size(query).await.map(|res| res.0)
420 }
421
422 async fn get_arrow_with_size(&self, query: &Query) -> Result<(ArrowResponse, u64)> {
424 let mut base = self.retry_base_ms;
425
426 let mut err = anyhow!("");
427
428 for _ in 0..self.max_num_retries + 1 {
429 match self.get_arrow_impl(query).await {
430 Ok(res) => return Ok(res),
431 Err(e) => {
432 log::error!(
433 "failed to get arrow data from server, retrying... The error was: {:?}",
434 e
435 );
436 err = err.context(format!("{:?}", e));
437 }
438 }
439
440 let base_ms = Duration::from_millis(base);
441 let jitter = Duration::from_millis(fastrange_rs::fastrange_64(
442 rand::random(),
443 self.retry_backoff_ms,
444 ));
445
446 tokio::time::sleep(base_ms + jitter).await;
447
448 base = std::cmp::min(base + self.retry_backoff_ms, self.retry_ceiling_ms);
449 }
450
451 Err(err)
452 }
453
454 pub async fn stream(
456 self: Arc<Self>,
457 query: Query,
458 config: StreamConfig,
459 ) -> Result<mpsc::Receiver<Result<QueryResponse>>> {
460 check_simple_stream_params(&config)?;
461
462 let (tx, rx): (_, mpsc::Receiver<Result<QueryResponse>>) =
463 mpsc::channel(config.concurrency.unwrap_or(10));
464
465 let mut inner_rx = self
466 .stream_arrow(query, config)
467 .await
468 .context("start inner stream")?;
469
470 tokio::spawn(async move {
471 while let Some(resp) = inner_rx.recv().await {
472 let is_err = resp.is_err();
473 if tx
474 .send(resp.map(|r| QueryResponse::from(&r)))
475 .await
476 .is_err()
477 || is_err
478 {
479 return;
480 }
481 }
482 });
483
484 Ok(rx)
485 }
486
487 pub async fn stream_events(
490 self: Arc<Self>,
491 mut query: Query,
492 config: StreamConfig,
493 ) -> Result<mpsc::Receiver<Result<EventResponse>>> {
494 check_simple_stream_params(&config)?;
495
496 add_event_join_fields_to_selection(&mut query);
497
498 let (tx, rx): (_, mpsc::Receiver<Result<EventResponse>>) =
499 mpsc::channel(config.concurrency.unwrap_or(10));
500
501 let mut inner_rx = self
502 .stream_arrow(query, config)
503 .await
504 .context("start inner stream")?;
505
506 tokio::spawn(async move {
507 while let Some(resp) = inner_rx.recv().await {
508 let is_err = resp.is_err();
509 if tx
510 .send(resp.map(|r| EventResponse::from(&r)))
511 .await
512 .is_err()
513 || is_err
514 {
515 return;
516 }
517 }
518 });
519
520 Ok(rx)
521 }
522
523 pub async fn stream_arrow(
525 self: Arc<Self>,
526 query: Query,
527 config: StreamConfig,
528 ) -> Result<mpsc::Receiver<Result<ArrowResponse>>> {
529 stream::stream_arrow(self, query, config).await
530 }
531
532 pub fn url(&self) -> &Url {
534 &self.url
535 }
536}
537
538fn check_simple_stream_params(config: &StreamConfig) -> Result<()> {
539 if config.event_signature.is_some() {
540 return Err(anyhow!(
541 "config.event_signature can't be passed to simple type function. User is expected to \
542 decode the logs using Decoder."
543 ));
544 }
545 if config.column_mapping.is_some() {
546 return Err(anyhow!(
547 "config.column_mapping can't be passed to single type function. User is expected to \
548 map values manually."
549 ));
550 }
551
552 Ok(())
553}
554
555fn add_event_join_fields_to_selection(query: &mut Query) {
556 const BLOCK_JOIN_FIELDS: &[&str] = &["number"];
559 const TX_JOIN_FIELDS: &[&str] = &["hash"];
560 const LOG_JOIN_FIELDS: &[&str] = &["transaction_hash", "block_number"];
561
562 if !query.field_selection.block.is_empty() {
563 for field in BLOCK_JOIN_FIELDS.iter() {
564 query.field_selection.block.insert(field.to_string());
565 }
566 }
567
568 if !query.field_selection.transaction.is_empty() {
569 for field in TX_JOIN_FIELDS.iter() {
570 query.field_selection.transaction.insert(field.to_string());
571 }
572 }
573
574 if !query.field_selection.log.is_empty() {
575 for field in LOG_JOIN_FIELDS.iter() {
576 query.field_selection.log.insert(field.to_string());
577 }
578 }
579}