1#![deny(missing_docs)]
2use std::{num::NonZeroU64, sync::Arc, time::Duration};
5
6use anyhow::{anyhow, Context, Result};
7use hyperfuel_net_types::{ArchiveHeight, ChainId, Query};
8use polars_arrow::{array::Array, record_batch::RecordBatchT as Chunk};
9use reqwest::Method;
10
11mod column_mapping;
12mod config;
13mod from_arrow;
14mod parquet_out;
15mod parse_response;
16mod rayon_async;
17mod stream;
19mod types;
20mod util;
21
22pub use from_arrow::FromArrow;
23pub use hyperfuel_format as format;
24pub use hyperfuel_net_types as net_types;
25pub use hyperfuel_schema as schema;
26
27use parse_response::parse_query_response;
28use tokio::sync::mpsc;
30use url::Url;
31
32pub use column_mapping::{ColumnMapping, DataType};
33pub use config::HexOutput;
34pub use config::{ClientConfig, StreamConfig};
35pub use types::{ArrowBatch, ArrowResponse, ArrowResponseData, QueryResponse};
36
/// A record batch whose columns are boxed, dynamically typed Arrow arrays
/// (`polars_arrow`'s `RecordBatchT` over `Box<dyn Array>`).
pub type ArrowChunk = Chunk<Box<dyn Array>>;
39
/// HTTP client for the query server.
///
/// Holds the underlying `reqwest` client, the server base URL, the optional
/// bearer token and the retry/backoff settings used by the retrying request
/// methods.
#[derive(Clone, Debug)]
pub struct Client {
    /// Underlying HTTP client used to send every request.
    http_client: reqwest::Client,
    /// Base URL of the server; endpoint path segments are appended to a clone of it.
    url: Url,
    /// Optional token attached to requests via bearer authentication.
    bearer_token: Option<String>,
    /// Number of retries after the first attempt (the retry loops run
    /// `max_num_retries + 1` attempts).
    max_num_retries: usize,
    /// Milliseconds added to the base delay after each failed attempt; also the
    /// exclusive upper bound passed to the random jitter computation.
    retry_backoff_ms: u64,
    /// Initial base delay between attempts, in milliseconds.
    retry_base_ms: u64,
    /// Upper bound for the base delay between attempts, in milliseconds.
    retry_ceiling_ms: u64,
}
58
59impl Client {
60 pub fn new(cfg: ClientConfig) -> Result<Self> {
62 let timeout = cfg
63 .http_req_timeout_millis
64 .unwrap_or(NonZeroU64::new(30_000).unwrap());
65
66 let http_client = reqwest::Client::builder()
67 .no_gzip()
68 .timeout(Duration::from_millis(timeout.get()))
69 .build()
70 .unwrap();
71
72 Ok(Self {
73 http_client,
74 url: cfg
75 .url
76 .unwrap_or("https://eth.hypersync.xyz".parse().context("parse url")?),
77 bearer_token: cfg.bearer_token,
78 max_num_retries: cfg.max_num_retries.unwrap_or(12),
79 retry_backoff_ms: cfg.retry_backoff_ms.unwrap_or(500),
80 retry_base_ms: cfg.retry_base_ms.unwrap_or(200),
81 retry_ceiling_ms: cfg.retry_ceiling_ms.unwrap_or(5_000),
82 })
83 }
84
85 pub async fn collect_arrow(
185 self: Arc<Self>,
186 query: Query,
187 config: StreamConfig,
188 ) -> Result<ArrowResponse> {
189 let mut recv = stream::stream_arrow(self, query, config)
190 .await
191 .context("start stream")?;
192
193 let mut data = ArrowResponseData::default();
194 let mut archive_height = None;
195 let mut next_block = 0;
196 let mut total_execution_time = 0;
197
198 while let Some(res) = recv.recv().await {
199 let res = res.context("get response")?;
200
201 for batch in res.data.blocks {
202 data.blocks.push(batch);
203 }
204 for batch in res.data.transactions {
205 data.transactions.push(batch);
206 }
207 for batch in res.data.receipts {
208 data.receipts.push(batch);
209 }
210 for batch in res.data.inputs {
211 data.inputs.push(batch);
212 }
213 for batch in res.data.outputs {
214 data.outputs.push(batch);
215 }
216
217 archive_height = res.archive_height;
218 next_block = res.next_block;
219 total_execution_time += res.total_execution_time
220 }
221
222 Ok(ArrowResponse {
223 archive_height,
224 next_block,
225 total_execution_time,
226 data,
227 })
228 }
229
    /// Runs `query` with the given stream `config` and hands the results to
    /// `parquet_out::collect_parquet` together with `path`.
    ///
    /// NOTE(review): presumably writes the streamed data as parquet output at
    /// `path`; the exact file layout is defined in the `parquet_out` module —
    /// confirm there.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by `parquet_out::collect_parquet`.
    pub async fn collect_parquet(
        self: Arc<Self>,
        path: &str,
        query: Query,
        config: StreamConfig,
    ) -> Result<()> {
        parquet_out::collect_parquet(self, path, query, config).await
    }
240
241 async fn get_chain_id_impl(&self) -> Result<u64> {
243 let mut url = self.url.clone();
244 let mut segments = url.path_segments_mut().ok().context("get path segments")?;
245 segments.push("chain_id");
246 std::mem::drop(segments);
247 let mut req = self.http_client.request(Method::GET, url);
248
249 if let Some(bearer_token) = &self.bearer_token {
250 req = req.bearer_auth(bearer_token);
251 }
252
253 let res = req.send().await.context("execute http req")?;
254
255 let status = res.status();
256 if !status.is_success() {
257 return Err(anyhow!("http response status code {}", status));
258 }
259
260 let chain_id: ChainId = res.json().await.context("read response body json")?;
261
262 Ok(chain_id.chain_id)
263 }
264
265 async fn get_height_impl(&self, http_timeout_override: Option<Duration>) -> Result<u64> {
267 let mut url = self.url.clone();
268 let mut segments = url.path_segments_mut().ok().context("get path segments")?;
269 segments.push("height");
270 std::mem::drop(segments);
271 let mut req = self.http_client.request(Method::GET, url);
272
273 if let Some(bearer_token) = &self.bearer_token {
274 req = req.bearer_auth(bearer_token);
275 }
276
277 if let Some(http_timeout_override) = http_timeout_override {
278 req = req.timeout(http_timeout_override);
279 }
280
281 let res = req.send().await.context("execute http req")?;
282
283 let status = res.status();
284 if !status.is_success() {
285 return Err(anyhow!("http response status code {}", status));
286 }
287
288 let height: ArchiveHeight = res.json().await.context("read response body json")?;
289
290 Ok(height.height.unwrap_or(0))
291 }
292
293 pub async fn get_chain_id(&self) -> Result<u64> {
295 let mut base = self.retry_base_ms;
296
297 let mut err = anyhow!("");
298
299 for _ in 0..self.max_num_retries + 1 {
300 match self.get_chain_id_impl().await {
301 Ok(res) => return Ok(res),
302 Err(e) => {
303 log::error!(
304 "failed to get chain_id from server, retrying... The error was: {:?}",
305 e
306 );
307 err = err.context(format!("{:?}", e));
308 }
309 }
310
311 let base_ms = Duration::from_millis(base);
312 let jitter = Duration::from_millis(fastrange_rs::fastrange_64(
313 rand::random(),
314 self.retry_backoff_ms,
315 ));
316
317 tokio::time::sleep(base_ms + jitter).await;
318
319 base = std::cmp::min(base + self.retry_backoff_ms, self.retry_ceiling_ms);
320 }
321
322 Err(err)
323 }
324
325 pub async fn get_height(&self) -> Result<u64> {
327 let mut base = self.retry_base_ms;
328
329 let mut err = anyhow!("");
330
331 for _ in 0..self.max_num_retries + 1 {
332 match self.get_height_impl(None).await {
333 Ok(res) => return Ok(res),
334 Err(e) => {
335 log::error!(
336 "failed to get height from server, retrying... The error was: {:?}",
337 e
338 );
339 err = err.context(format!("{:?}", e));
340 }
341 }
342
343 let base_ms = Duration::from_millis(base);
344 let jitter = Duration::from_millis(fastrange_rs::fastrange_64(
345 rand::random(),
346 self.retry_backoff_ms,
347 ));
348
349 tokio::time::sleep(base_ms + jitter).await;
350
351 base = std::cmp::min(base + self.retry_backoff_ms, self.retry_ceiling_ms);
352 }
353
354 Err(err)
355 }
356
    /// Sends a single height request to the server, without any retries.
    ///
    /// When `http_req_timeout` is `Some`, it is applied as the timeout of this
    /// one request. Returns the height reported by the server, or `0` when the
    /// server reports none.
    ///
    /// # Errors
    ///
    /// Returns an error if the request fails, the server answers with a
    /// non-success status, or the response body cannot be decoded.
    pub async fn health_check(&self, http_req_timeout: Option<Duration>) -> Result<u64> {
        self.get_height_impl(http_req_timeout).await
    }
362
363 pub async fn get(&self, query: &Query) -> Result<QueryResponse> {
365 let arrow_response = self.get_arrow(query).await.context("get data")?;
366 Ok(QueryResponse::from(&arrow_response))
367 }
368 async fn get_arrow_impl(&self, query: &Query) -> Result<(ArrowResponse, u64)> {
379 let mut url = self.url.clone();
380 let mut segments = url.path_segments_mut().ok().context("get path segments")?;
381 segments.push("query");
382 segments.push("arrow-ipc");
383 std::mem::drop(segments);
384 let mut req = self.http_client.request(Method::POST, url);
385
386 if let Some(bearer_token) = &self.bearer_token {
387 req = req.bearer_auth(bearer_token);
388 }
389
390 let res = req.json(&query).send().await.context("execute http req")?;
391
392 let status = res.status();
393 if !status.is_success() {
394 let text = res.text().await.context("read text to see error")?;
395
396 return Err(anyhow!(
397 "http response status code {}, err body: {}",
398 status,
399 text
400 ));
401 }
402
403 let bytes = res.bytes().await.context("read response body bytes")?;
404
405 let res = tokio::task::block_in_place(|| {
406 parse_query_response(&bytes).context("parse query response")
407 })?;
408
409 Ok((res, bytes.len().try_into().unwrap()))
410 }
411
412 pub async fn get_arrow(&self, query: &Query) -> Result<ArrowResponse> {
414 self.get_arrow_with_size(query).await.map(|res| res.0)
415 }
416
417 async fn get_arrow_with_size(&self, query: &Query) -> Result<(ArrowResponse, u64)> {
419 let mut base = self.retry_base_ms;
420
421 let mut err = anyhow!("");
422
423 for _ in 0..self.max_num_retries + 1 {
424 match self.get_arrow_impl(query).await {
425 Ok(res) => return Ok(res),
426 Err(e) => {
427 log::error!(
428 "failed to get arrow data from server, retrying... The error was: {:?}",
429 e
430 );
431 err = err.context(format!("{:?}", e));
432 }
433 }
434
435 let base_ms = Duration::from_millis(base);
436 let jitter = Duration::from_millis(fastrange_rs::fastrange_64(
437 rand::random(),
438 self.retry_backoff_ms,
439 ));
440
441 tokio::time::sleep(base_ms + jitter).await;
442
443 base = std::cmp::min(base + self.retry_backoff_ms, self.retry_ceiling_ms);
444 }
445
446 Err(err)
447 }
448
    /// Starts streaming the results of `query` using the given stream `config`.
    ///
    /// Thin wrapper around `stream::stream_arrow`. On success returns the
    /// receiving half of a channel whose items are `Result<ArrowResponse>`.
    ///
    /// # Errors
    ///
    /// Propagates any error returned while starting the stream.
    pub async fn stream_arrow(
        self: Arc<Self>,
        query: Query,
        config: StreamConfig,
    ) -> Result<mpsc::Receiver<Result<ArrowResponse>>> {
        stream::stream_arrow(self, query, config).await
    }
528
    /// Returns the base URL of the server this client sends requests to.
    pub fn url(&self) -> &Url {
        &self.url
    }
533}
534
535#[allow(dead_code)]
536fn check_simple_stream_params(config: &StreamConfig) -> Result<()> {
537 if config.column_mapping.is_some() {
538 return Err(anyhow!("config.column_mapping can't be passed to single type function. User is expected to map values manually."));
539 }
540
541 Ok(())
542}
543
544#[allow(dead_code)]
545fn add_event_join_fields_to_selection(query: &mut Query) {
546 const BLOCK_JOIN_FIELDS: &[&str] = &["height"]; const TX_JOIN_FIELDS: &[&str] = &["id"];
550 const RECEIPT_JOIN_FIELDS: &[&str] = &["tx_id", "block_height"];
551 const INPUT_JOIN_FIELDS: &[&str] = &["tx_id", "block_height"];
552 const OUTPUT_JOIN_FIELDS: &[&str] = &["tx_id", "block_height"];
553
554 if !query.field_selection.block.is_empty() {
555 for field in BLOCK_JOIN_FIELDS.iter() {
556 query.field_selection.block.insert(field.to_string());
557 }
558 }
559
560 if !query.field_selection.transaction.is_empty() {
561 for field in TX_JOIN_FIELDS.iter() {
562 query.field_selection.transaction.insert(field.to_string());
563 }
564 }
565
566 if !query.field_selection.receipt.is_empty() {
567 for field in RECEIPT_JOIN_FIELDS.iter() {
568 query.field_selection.receipt.insert(field.to_string());
569 }
570 }
571
572 if !query.field_selection.input.is_empty() {
573 for field in INPUT_JOIN_FIELDS.iter() {
574 query.field_selection.input.insert(field.to_string());
575 }
576 }
577
578 if !query.field_selection.output.is_empty() {
579 for field in OUTPUT_JOIN_FIELDS.iter() {
580 query.field_selection.output.insert(field.to_string());
581 }
582 }
583}