#![deny(missing_docs)]
//! Client library for a HyperSync server.
//!
//! The main entry point is [`Client`], which can run a single query
//! ([`Client::get`]), stream query results ([`Client::stream`]), or collect a
//! whole stream into memory or Parquet output ([`Client::collect`],
//! [`Client::collect_parquet`]).
2use std::{num::NonZeroU64, sync::Arc, time::Duration};
4
5use anyhow::{anyhow, Context, Result};
6use hypersync_net_types::{ArchiveHeight, ChainId, Query};
7use polars_arrow::{array::Array, record_batch::RecordBatchT as Chunk};
8use reqwest::Method;
9
10mod column_mapping;
11mod config;
12mod decode;
13mod decode_call;
14mod from_arrow;
15mod parquet_out;
16mod parse_response;
17pub mod preset_query;
18mod rayon_async;
19pub mod simple_types;
20mod stream;
21#[cfg(feature = "ethers")]
22pub mod to_ethers;
23mod types;
24mod util;
25
26pub use from_arrow::FromArrow;
27pub use hypersync_format as format;
28pub use hypersync_net_types as net_types;
29pub use hypersync_schema as schema;
30
31use parse_response::parse_query_response;
32use tokio::sync::mpsc;
33use types::{EventResponse, ResponseData};
34use url::Url;
35
36pub use column_mapping::{ColumnMapping, DataType};
37pub use config::HexOutput;
38pub use config::{ClientConfig, StreamConfig};
39pub use decode::Decoder;
40pub use decode_call::CallDecoder;
41pub use types::{ArrowBatch, ArrowResponse, ArrowResponseData, QueryResponse};
42
43use crate::simple_types::InternalEventJoinStrategy;
44
/// A record batch (`RecordBatchT`) whose columns are boxed, dynamically typed Arrow arrays.
type ArrowChunk = Chunk<Box<dyn Array>>;
46
/// Client for a HyperSync server.
///
/// Holds the HTTP client together with the endpoint, authentication and retry
/// settings resolved from a [`ClientConfig`] by [`Client::new`].
#[derive(Clone, Debug)]
pub struct Client {
    /// HTTP client used for every request.
    http_client: reqwest::Client,
    /// Base URL of the server; endpoint path segments are appended to it.
    url: Url,
    /// Bearer token attached to each request, if set.
    bearer_token: Option<String>,
    /// Number of retries allowed after the first failed attempt.
    max_num_retries: usize,
    /// Amount (ms) the base retry delay grows by after each failure; also the
    /// exclusive upper bound of the random jitter added to each delay.
    retry_backoff_ms: u64,
    /// Initial base retry delay in milliseconds (before jitter).
    retry_base_ms: u64,
    /// Cap (ms) applied to the base retry delay each time it grows.
    retry_ceiling_ms: u64,
}
65
66impl Client {
67 pub fn new(cfg: ClientConfig) -> Result<Self> {
69 let timeout = cfg
70 .http_req_timeout_millis
71 .unwrap_or(NonZeroU64::new(30_000).unwrap());
72
73 let http_client = reqwest::Client::builder()
74 .no_gzip()
75 .timeout(Duration::from_millis(timeout.get()))
76 .build()
77 .unwrap();
78
79 Ok(Self {
80 http_client,
81 url: cfg
82 .url
83 .unwrap_or("https://eth.hypersync.xyz".parse().context("parse url")?),
84 bearer_token: cfg.bearer_token,
85 max_num_retries: cfg.max_num_retries.unwrap_or(12),
86 retry_backoff_ms: cfg.retry_backoff_ms.unwrap_or(500),
87 retry_base_ms: cfg.retry_base_ms.unwrap_or(200),
88 retry_ceiling_ms: cfg.retry_ceiling_ms.unwrap_or(5_000),
89 })
90 }
91
92 pub async fn collect(
101 self: Arc<Self>,
102 query: Query,
103 config: StreamConfig,
104 ) -> Result<QueryResponse> {
105 check_simple_stream_params(&config)?;
106
107 let mut recv = stream::stream_arrow(self, query, config)
108 .await
109 .context("start stream")?;
110
111 let mut data = ResponseData::default();
112 let mut archive_height = None;
113 let mut next_block = 0;
114 let mut total_execution_time = 0;
115
116 while let Some(res) = recv.recv().await {
117 let res = res.context("get response")?;
118 let res: QueryResponse = QueryResponse::from(&res);
119
120 for batch in res.data.blocks {
121 data.blocks.push(batch);
122 }
123 for batch in res.data.transactions {
124 data.transactions.push(batch);
125 }
126 for batch in res.data.logs {
127 data.logs.push(batch);
128 }
129 for batch in res.data.traces {
130 data.traces.push(batch);
131 }
132
133 archive_height = res.archive_height;
134 next_block = res.next_block;
135 total_execution_time += res.total_execution_time
136 }
137
138 Ok(QueryResponse {
139 archive_height,
140 next_block,
141 total_execution_time,
142 data,
143 rollback_guard: None,
144 })
145 }
146
147 pub async fn collect_events(
149 self: Arc<Self>,
150 mut query: Query,
151 config: StreamConfig,
152 ) -> Result<EventResponse> {
153 check_simple_stream_params(&config)?;
154
155 let event_join_strategy = InternalEventJoinStrategy::from(&query.field_selection);
156 event_join_strategy.add_join_fields_to_selection(&mut query.field_selection);
157
158 let mut recv = stream::stream_arrow(self, query, config)
159 .await
160 .context("start stream")?;
161
162 let mut data = Vec::new();
163 let mut archive_height = None;
164 let mut next_block = 0;
165 let mut total_execution_time = 0;
166
167 while let Some(res) = recv.recv().await {
168 let res = res.context("get response")?;
169 let res: QueryResponse = QueryResponse::from(&res);
170 let events = event_join_strategy.join_from_response_data(res.data);
171
172 data.extend(events);
173
174 archive_height = res.archive_height;
175 next_block = res.next_block;
176 total_execution_time += res.total_execution_time
177 }
178
179 Ok(EventResponse {
180 archive_height,
181 next_block,
182 total_execution_time,
183 data,
184 rollback_guard: None,
185 })
186 }
187
188 pub async fn collect_arrow(
191 self: Arc<Self>,
192 query: Query,
193 config: StreamConfig,
194 ) -> Result<ArrowResponse> {
195 let mut recv = stream::stream_arrow(self, query, config)
196 .await
197 .context("start stream")?;
198
199 let mut data = ArrowResponseData::default();
200 let mut archive_height = None;
201 let mut next_block = 0;
202 let mut total_execution_time = 0;
203
204 while let Some(res) = recv.recv().await {
205 let res = res.context("get response")?;
206
207 for batch in res.data.blocks {
208 data.blocks.push(batch);
209 }
210 for batch in res.data.transactions {
211 data.transactions.push(batch);
212 }
213 for batch in res.data.logs {
214 data.logs.push(batch);
215 }
216 for batch in res.data.traces {
217 data.traces.push(batch);
218 }
219 for batch in res.data.decoded_logs {
220 data.decoded_logs.push(batch);
221 }
222
223 archive_height = res.archive_height;
224 next_block = res.next_block;
225 total_execution_time += res.total_execution_time
226 }
227
228 Ok(ArrowResponse {
229 archive_height,
230 next_block,
231 total_execution_time,
232 data,
233 rollback_guard: None,
234 })
235 }
236
237 pub async fn collect_parquet(
240 self: Arc<Self>,
241 path: &str,
242 query: Query,
243 config: StreamConfig,
244 ) -> Result<()> {
245 parquet_out::collect_parquet(self, path, query, config).await
246 }
247
248 async fn get_chain_id_impl(&self) -> Result<u64> {
250 let mut url = self.url.clone();
251 let mut segments = url.path_segments_mut().ok().context("get path segments")?;
252 segments.push("chain_id");
253 std::mem::drop(segments);
254 let mut req = self.http_client.request(Method::GET, url);
255
256 if let Some(bearer_token) = &self.bearer_token {
257 req = req.bearer_auth(bearer_token);
258 }
259
260 let res = req.send().await.context("execute http req")?;
261
262 let status = res.status();
263 if !status.is_success() {
264 return Err(anyhow!("http response status code {}", status));
265 }
266
267 let chain_id: ChainId = res.json().await.context("read response body json")?;
268
269 Ok(chain_id.chain_id)
270 }
271
272 async fn get_height_impl(&self, http_timeout_override: Option<Duration>) -> Result<u64> {
274 let mut url = self.url.clone();
275 let mut segments = url.path_segments_mut().ok().context("get path segments")?;
276 segments.push("height");
277 std::mem::drop(segments);
278 let mut req = self.http_client.request(Method::GET, url);
279
280 if let Some(bearer_token) = &self.bearer_token {
281 req = req.bearer_auth(bearer_token);
282 }
283
284 if let Some(http_timeout_override) = http_timeout_override {
285 req = req.timeout(http_timeout_override);
286 }
287
288 let res = req.send().await.context("execute http req")?;
289
290 let status = res.status();
291 if !status.is_success() {
292 return Err(anyhow!("http response status code {}", status));
293 }
294
295 let height: ArchiveHeight = res.json().await.context("read response body json")?;
296
297 Ok(height.height.unwrap_or(0))
298 }
299
300 pub async fn get_chain_id(&self) -> Result<u64> {
302 let mut base = self.retry_base_ms;
303
304 let mut err = anyhow!("");
305
306 for _ in 0..self.max_num_retries + 1 {
307 match self.get_chain_id_impl().await {
308 Ok(res) => return Ok(res),
309 Err(e) => {
310 log::error!(
311 "failed to get chain_id from server, retrying... The error was: {e:?}"
312 );
313 err = err.context(format!("{e:?}"));
314 }
315 }
316
317 let base_ms = Duration::from_millis(base);
318 let jitter = Duration::from_millis(fastrange_rs::fastrange_64(
319 rand::random(),
320 self.retry_backoff_ms,
321 ));
322
323 tokio::time::sleep(base_ms + jitter).await;
324
325 base = std::cmp::min(base + self.retry_backoff_ms, self.retry_ceiling_ms);
326 }
327
328 Err(err)
329 }
330
331 pub async fn get_height(&self) -> Result<u64> {
333 let mut base = self.retry_base_ms;
334
335 let mut err = anyhow!("");
336
337 for _ in 0..self.max_num_retries + 1 {
338 match self.get_height_impl(None).await {
339 Ok(res) => return Ok(res),
340 Err(e) => {
341 log::error!(
342 "failed to get height from server, retrying... The error was: {e:?}"
343 );
344 err = err.context(format!("{e:?}"));
345 }
346 }
347
348 let base_ms = Duration::from_millis(base);
349 let jitter = Duration::from_millis(fastrange_rs::fastrange_64(
350 rand::random(),
351 self.retry_backoff_ms,
352 ));
353
354 tokio::time::sleep(base_ms + jitter).await;
355
356 base = std::cmp::min(base + self.retry_backoff_ms, self.retry_ceiling_ms);
357 }
358
359 Err(err)
360 }
361
362 pub async fn health_check(&self, http_req_timeout: Option<Duration>) -> Result<u64> {
365 self.get_height_impl(http_req_timeout).await
366 }
367
368 pub async fn get(&self, query: &Query) -> Result<QueryResponse> {
370 let arrow_response = self.get_arrow(query).await.context("get data")?;
371 Ok(QueryResponse::from(&arrow_response))
372 }
373
374 pub async fn get_events(&self, mut query: Query) -> Result<EventResponse> {
377 let event_join_strategy = InternalEventJoinStrategy::from(&query.field_selection);
378 event_join_strategy.add_join_fields_to_selection(&mut query.field_selection);
379 let arrow_response = self.get_arrow(&query).await.context("get data")?;
380 Ok(EventResponse::from_arrow_response(
381 &arrow_response,
382 &event_join_strategy,
383 ))
384 }
385
386 async fn get_arrow_impl(&self, query: &Query) -> Result<(ArrowResponse, u64)> {
388 let mut url = self.url.clone();
389 let mut segments = url.path_segments_mut().ok().context("get path segments")?;
390 segments.push("query");
391 segments.push("arrow-ipc");
392 std::mem::drop(segments);
393 let mut req = self.http_client.request(Method::POST, url);
394
395 if let Some(bearer_token) = &self.bearer_token {
396 req = req.bearer_auth(bearer_token);
397 }
398
399 let res = req.json(&query).send().await.context("execute http req")?;
400
401 let status = res.status();
402 if !status.is_success() {
403 let text = res.text().await.context("read text to see error")?;
404
405 return Err(anyhow!(
406 "http response status code {}, err body: {}",
407 status,
408 text
409 ));
410 }
411
412 let bytes = res.bytes().await.context("read response body bytes")?;
413
414 let res = tokio::task::block_in_place(|| {
415 parse_query_response(&bytes).context("parse query response")
416 })?;
417
418 Ok((res, bytes.len().try_into().unwrap()))
419 }
420
421 pub async fn get_arrow(&self, query: &Query) -> Result<ArrowResponse> {
423 self.get_arrow_with_size(query).await.map(|res| res.0)
424 }
425
426 async fn get_arrow_with_size(&self, query: &Query) -> Result<(ArrowResponse, u64)> {
428 let mut base = self.retry_base_ms;
429
430 let mut err = anyhow!("");
431
432 for _ in 0..self.max_num_retries + 1 {
433 match self.get_arrow_impl(query).await {
434 Ok(res) => return Ok(res),
435 Err(e) => {
436 log::error!(
437 "failed to get arrow data from server, retrying... The error was: {e:?}"
438 );
439 err = err.context(format!("{e:?}"));
440 }
441 }
442
443 let base_ms = Duration::from_millis(base);
444 let jitter = Duration::from_millis(fastrange_rs::fastrange_64(
445 rand::random(),
446 self.retry_backoff_ms,
447 ));
448
449 tokio::time::sleep(base_ms + jitter).await;
450
451 base = std::cmp::min(base + self.retry_backoff_ms, self.retry_ceiling_ms);
452 }
453
454 Err(err)
455 }
456
457 pub async fn stream(
459 self: Arc<Self>,
460 query: Query,
461 config: StreamConfig,
462 ) -> Result<mpsc::Receiver<Result<QueryResponse>>> {
463 check_simple_stream_params(&config)?;
464
465 let (tx, rx): (_, mpsc::Receiver<Result<QueryResponse>>) =
466 mpsc::channel(config.concurrency.unwrap_or(10));
467
468 let mut inner_rx = self
469 .stream_arrow(query, config)
470 .await
471 .context("start inner stream")?;
472
473 tokio::spawn(async move {
474 while let Some(resp) = inner_rx.recv().await {
475 let is_err = resp.is_err();
476 if tx
477 .send(resp.map(|r| QueryResponse::from(&r)))
478 .await
479 .is_err()
480 || is_err
481 {
482 return;
483 }
484 }
485 });
486
487 Ok(rx)
488 }
489
490 pub async fn stream_events(
493 self: Arc<Self>,
494 mut query: Query,
495 config: StreamConfig,
496 ) -> Result<mpsc::Receiver<Result<EventResponse>>> {
497 check_simple_stream_params(&config)?;
498
499 let event_join_strategy = InternalEventJoinStrategy::from(&query.field_selection);
500
501 event_join_strategy.add_join_fields_to_selection(&mut query.field_selection);
502
503 let (tx, rx): (_, mpsc::Receiver<Result<EventResponse>>) =
504 mpsc::channel(config.concurrency.unwrap_or(10));
505
506 let mut inner_rx = self
507 .stream_arrow(query, config)
508 .await
509 .context("start inner stream")?;
510
511 tokio::spawn(async move {
512 while let Some(resp) = inner_rx.recv().await {
513 let is_err = resp.is_err();
514 if tx
515 .send(
516 resp.map(|r| EventResponse::from_arrow_response(&r, &event_join_strategy)),
517 )
518 .await
519 .is_err()
520 || is_err
521 {
522 return;
523 }
524 }
525 });
526
527 Ok(rx)
528 }
529
530 pub async fn stream_arrow(
532 self: Arc<Self>,
533 query: Query,
534 config: StreamConfig,
535 ) -> Result<mpsc::Receiver<Result<ArrowResponse>>> {
536 stream::stream_arrow(self, query, config).await
537 }
538
539 pub fn url(&self) -> &Url {
541 &self.url
542 }
543}
544
545fn check_simple_stream_params(config: &StreamConfig) -> Result<()> {
546 if config.event_signature.is_some() {
547 return Err(anyhow!(
548 "config.event_signature can't be passed to simple type function. User is expected to \
549 decode the logs using Decoder."
550 ));
551 }
552 if config.column_mapping.is_some() {
553 return Err(anyhow!(
554 "config.column_mapping can't be passed to single type function. User is expected to \
555 map values manually."
556 ));
557 }
558
559 Ok(())
560}