use std::{collections::BTreeSet, time::Duration};
use anyhow::{anyhow, Context, Result};
use arrow2::{array::Array, chunk::Chunk};
use filter::filter_out_unselected_data;
use from_arrow::{receipts_from_arrow_data, typed_data_from_arrow_data};
use hyperfuel_format::{Hash, Receipt};
use hyperfuel_net_types::{
hyperfuel_net_types_capnp, ArchiveHeight, FieldSelection, Query, ReceiptSelection,
};
use reqwest::Method;
pub mod config;
mod filter;
mod from_arrow;
mod parquet_out;
mod transport_format;
mod types;
pub use config::Config;
pub use transport_format::{ArrowIpc, TransportFormat};
pub use types::{
ArrowBatch, LogContext, LogResponse, QueryResponse, QueryResponseData, QueryResponseDataTyped,
QueryResponseTyped,
};
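/// A chunk of Arrow data: one boxed Arrow array per column.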
pub type ArrowChunk = Chunk<Box<dyn Array>>;
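/// Client for executing queries against a hyperfuel server over HTTP.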
pub struct Client {
http_client: reqwest::Client,
cfg: Config,
}
impl Client {
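    /// Creates a new client from the given configuration.
    ///
    /// The underlying HTTP client disables gzip, is restricted to HTTP/1, and uses
    /// `http_req_timeout_millis` for both the connect and request timeouts.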
    pub fn new(cfg: Config) -> Result<Self> {
        let http_client = reqwest::Client::builder()
            .no_gzip()
            .http1_only()
            .timeout(Duration::from_millis(cfg.http_req_timeout_millis.get()))
            // Keep idle connections to the server alive between requests.
            .tcp_keepalive(Duration::from_secs(7200))
            .connect_timeout(Duration::from_millis(cfg.http_req_timeout_millis.get()))
            .build()
            .context("build http client")?;

        Ok(Self { http_client, cfg })
    }
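    /// Executes `query` and writes the returned data as parquet files under `path`.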
pub async fn create_parquet_folder(&self, query: Query, path: String) -> Result<()> {
parquet_out::create_parquet_folder(self, query, path).await
}
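    /// Returns the current archive height reported by the server, or 0 if the server
    /// does not have a height yet.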
pub async fn get_height(&self) -> Result<u64> {
let mut url = self.cfg.url.clone();
let mut segments = url.path_segments_mut().ok().context("get path segments")?;
segments.push("height");
std::mem::drop(segments);
let mut req = self.http_client.request(Method::GET, url);
if let Some(bearer_token) = &self.cfg.bearer_token {
req = req.bearer_auth(bearer_token);
}
let res = req.send().await.context("execute http req")?;
let status = res.status();
if !status.is_success() {
return Err(anyhow!("http response status code {}", status));
}
let height: ArchiveHeight = res.json().await.context("read response body json")?;
Ok(height.height.unwrap_or(0))
}
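    /// Like [`Self::get_height`], but retries indefinitely with a capped, jittered
    /// backoff instead of returning an error.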
pub async fn get_height_with_retry(&self) -> Result<u64> {
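        // Base delay in seconds; grows by one second per failed attempt, capped at 5.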
let mut base = 1;
loop {
match self.get_height().await {
Ok(res) => return Ok(res),
Err(e) => {
log::error!("failed to send request to hyperfuel server: {:?}", e);
}
}
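            // Sleep for the base delay plus up to a second of random jitter.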
let secs = Duration::from_secs(base);
let millis = Duration::from_millis(fastrange_rs::fastrange_64(rand::random(), 1000));
tokio::time::sleep(secs + millis).await;
base = std::cmp::min(base + 1, 5);
}
}
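    /// Executes `query` and returns the response decoded into typed data, with receipts
    /// sorted by block height and receipt index.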
pub async fn get_data(&self, query: &Query) -> Result<QueryResponseTyped> {
let res = self.get_arrow_data(query).await.context("get arrow data")?;
let mut typed_data =
typed_data_from_arrow_data(res.data).context("convert arrow data to typed response")?;
sort_receipts(&mut typed_data.receipts);
Ok(QueryResponseTyped {
archive_height: res.archive_height,
next_block: res.next_block,
total_execution_time: res.total_execution_time,
data: typed_data,
})
}
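    /// Like [`Self::get_data`], but adds every column referenced by the selections to the
    /// field selection and filters the returned rows down to those matching the
    /// selections before decoding.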
pub async fn get_selected_data(&self, query: &Query) -> Result<QueryResponseTyped> {
        let query = add_selections_to_field_selection(query);
let res = self
.get_arrow_data(&query)
.await
.context("get arrow data")?;
let filtered_data =
filter_out_unselected_data(res.data, &query).context("filter out unselected data")?;
let mut typed_data = typed_data_from_arrow_data(filtered_data)
.context("convert arrow data to typed response")?;
sort_receipts(&mut typed_data.receipts);
Ok(QueryResponseTyped {
archive_height: res.archive_height,
next_block: res.next_block,
total_execution_time: res.total_execution_time,
data: typed_data,
})
}
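    /// Preset query that fetches Log and LogData receipts emitted by any of
    /// `emitting_contracts`, starting at `from_block` and optionally bounded by
    /// `to_block`, returned as log contexts sorted by block height and receipt index.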
pub async fn preset_query_get_logs<H: Into<Hash>>(
&self,
emitting_contracts: Vec<H>,
from_block: u64,
to_block: Option<u64>,
) -> Result<LogResponse> {
        // Select every receipt column needed to build a log context.
        let receipt_field_selection: BTreeSet<String> = [
            "block_height",
            "tx_id",
            "tx_status",
            "receipt_index",
            "receipt_type",
            "contract_id",
            "root_contract_id",
            "ra",
            "rb",
            "rc",
            "rd",
            "pc",
            "is",
            "ptr",
            "len",
            "digest",
            "data",
        ]
        .into_iter()
        .map(|field| field.to_owned())
        .collect();
let emitting_contracts: Vec<Hash> =
emitting_contracts.into_iter().map(|c| c.into()).collect();
let query = Query {
from_block,
to_block,
            receipts: vec![ReceiptSelection {
                root_contract_id: emitting_contracts.clone(),
                // Receipt types 5 (Log) and 6 (LogData) are the log receipts.
                receipt_type: vec![5, 6],
                // tx_status 1 restricts results to receipts from successful transactions.
                tx_status: vec![1],
                ..Default::default()
            }],
field_selection: FieldSelection {
receipt: receipt_field_selection,
..Default::default()
},
..Default::default()
};
let res = self
.get_arrow_data(&query)
.await
.context("get arrow data")?;
let filtered_data = filter_out_unselected_data(res.data, &query)
.context("filter out unselected receipts")?;
let mut typed_receipts = receipts_from_arrow_data(&filtered_data.receipts)
.context("convert arrow data to receipt response")?;
sort_receipts(&mut typed_receipts);
let logs: Vec<LogContext> = typed_receipts
.into_iter()
.map(|receipt| receipt.into())
.collect();
Ok(LogResponse {
archive_height: res.archive_height,
next_block: res.next_block,
total_execution_time: res.total_execution_time,
data: logs,
})
}
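    /// Executes `query` and returns the raw Arrow data from the response without any
    /// client-side filtering or decoding into typed structures.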
pub async fn get_arrow_data(&self, query: &Query) -> Result<QueryResponse> {
let mut url = self.cfg.url.clone();
let mut segments = url.path_segments_mut().ok().context("get path segments")?;
segments.push("query");
segments.push(ArrowIpc::path());
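        // Release the mutable borrow of `url` before building the request.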
std::mem::drop(segments);
let mut req = self.http_client.request(Method::POST, url);
if let Some(bearer_token) = &self.cfg.bearer_token {
req = req.bearer_auth(bearer_token);
}
log::trace!("sending req to hyperfuel");
let res = req.json(&query).send().await.context("execute http req")?;
log::trace!("got req response");
let status = res.status();
if !status.is_success() {
let text = res.text().await.context("read text to see error")?;
return Err(anyhow!(
"http response status code {}, err body: {}",
status,
text
));
}
log::trace!("starting to get response body bytes");
let bytes = res.bytes().await.context("read response body bytes")?;
log::trace!("starting to parse query response");
let res = tokio::task::block_in_place(|| {
self.parse_query_response::<ArrowIpc>(&bytes)
.context("parse query response")
})?;
log::trace!("got data from hyperfuel");
Ok(res)
}
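    /// Like [`Self::get_arrow_data`], but retries indefinitely with a capped, jittered
    /// backoff instead of returning an error.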
pub async fn get_arrow_data_with_retry(&self, query: &Query) -> Result<QueryResponse> {
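        // Base delay in seconds; grows by one second per failed attempt, capped at 5.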
let mut base = 1;
loop {
match self.get_arrow_data(query).await {
Ok(res) => return Ok(res),
Err(e) => {
log::error!("failed to send request to hyperfuel server: {:?}", e);
}
}
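            // Sleep for the base delay plus up to a second of random jitter.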
let secs = Duration::from_secs(base);
let millis = Duration::from_millis(fastrange_rs::fastrange_64(rand::random(), 1000));
tokio::time::sleep(secs + millis).await;
base = std::cmp::min(base + 1, 5);
}
}
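    /// Decodes a Cap'n Proto encoded query response and parses the embedded data into
    /// Arrow chunks using the given transport `Format`.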
fn parse_query_response<Format: TransportFormat>(&self, bytes: &[u8]) -> Result<QueryResponse> {
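        // Lift the default Cap'n Proto read limits so large responses can be decoded.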
let mut opts = capnp::message::ReaderOptions::new();
opts.nesting_limit(i32::MAX).traversal_limit_in_words(None);
let message_reader =
capnp::serialize_packed::read_message(bytes, opts).context("create message reader")?;
let query_response = message_reader
.get_root::<hyperfuel_net_types_capnp::query_response::Reader>()
.context("get root")?;
let archive_height = match query_response.get_archive_height() {
-1 => None,
h => Some(
h.try_into()
.context("invalid archive height returned from server")?,
),
};
let data = query_response.get_data().context("read data")?;
        let blocks = Format::read_chunks(data.get_blocks().context("get blocks")?)
            .context("parse block data")?;
        let transactions =
            Format::read_chunks(data.get_transactions().context("get transactions")?)
                .context("parse tx data")?;
        let receipts = Format::read_chunks(data.get_receipts().context("get receipts")?)
            .context("parse receipt data")?;
        let inputs = Format::read_chunks(data.get_inputs().context("get inputs")?)
            .context("parse input data")?;
        let outputs = Format::read_chunks(data.get_outputs().context("get outputs")?)
            .context("parse output data")?;
Ok(QueryResponse {
archive_height,
next_block: query_response.get_next_block(),
total_execution_time: query_response.get_total_execution_time(),
data: QueryResponseData {
blocks,
transactions,
receipts,
inputs,
outputs,
},
})
}
}
/// Returns a copy of `query` where every column referenced by a receipt, input, or output
/// selection has been added to the corresponding field selection, so the columns needed
/// for client-side filtering are always present in the response.
fn add_selections_to_field_selection(query: &Query) -> Query {
    let mut query = query.clone();

    for selection in &query.receipts {
if !selection.root_contract_id.is_empty() {
query
.field_selection
.receipt
.insert("root_contract_id".into());
}
if !selection.to_address.is_empty() {
query.field_selection.receipt.insert("to_address".into());
}
if !selection.asset_id.is_empty() {
query.field_selection.receipt.insert("asset_id".into());
}
if !selection.receipt_type.is_empty() {
query.field_selection.receipt.insert("receipt_type".into());
}
if !selection.sender.is_empty() {
query.field_selection.receipt.insert("sender".into());
}
if !selection.recipient.is_empty() {
query.field_selection.receipt.insert("recipient".into());
}
if !selection.contract_id.is_empty() {
query.field_selection.receipt.insert("contract_id".into());
}
if !selection.ra.is_empty() {
query.field_selection.receipt.insert("ra".into());
}
if !selection.rb.is_empty() {
query.field_selection.receipt.insert("rb".into());
}
if !selection.rc.is_empty() {
query.field_selection.receipt.insert("rc".into());
}
if !selection.rd.is_empty() {
query.field_selection.receipt.insert("rd".into());
}
    }

    for selection in &query.inputs {
if !selection.owner.is_empty() {
query.field_selection.input.insert("owner".into());
}
if !selection.asset_id.is_empty() {
query.field_selection.input.insert("asset_id".into());
}
if !selection.contract.is_empty() {
query.field_selection.input.insert("contract".into());
}
if !selection.sender.is_empty() {
query.field_selection.input.insert("sender".into());
}
if !selection.recipient.is_empty() {
query.field_selection.input.insert("recipient".into());
}
if !selection.input_type.is_empty() {
query.field_selection.input.insert("input_type".into());
}
    }

    for selection in &query.outputs {
if !selection.to.is_empty() {
query.field_selection.output.insert("to".into());
}
if !selection.asset_id.is_empty() {
query.field_selection.output.insert("asset_id".into());
}
if !selection.contract.is_empty() {
query.field_selection.output.insert("contract".into());
}
if !selection.output_type.is_empty() {
query.field_selection.output.insert("output_type".into());
}
    }

    query
}
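/// Sorts receipts by block height, breaking ties by receipt index.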
fn sort_receipts(receipts: &mut [Receipt]) {
receipts.sort_by(|a, b| {
a.block_height
.cmp(&b.block_height)
.then_with(|| a.receipt_index.cmp(&b.receipt_index))
});
}
#[cfg(test)]
mod tests {
use hyperfuel_format::Receipt;
use crate::sort_receipts;
#[test]
fn test_sort_receipts() {
let mut receipts: Vec<Receipt> = vec![
Receipt {
block_height: 0.into(),
receipt_index: 1.into(),
..Default::default()
},
Receipt {
block_height: 0.into(),
receipt_index: 0.into(),
..Default::default()
},
Receipt {
block_height: 1.into(),
receipt_index: 0.into(),
..Default::default()
},
Receipt {
block_height: 2.into(),
receipt_index: 2.into(),
..Default::default()
},
Receipt {
block_height: 2.into(),
receipt_index: 3.into(),
..Default::default()
},
Receipt {
block_height: 2.into(),
receipt_index: 1.into(),
..Default::default()
},
];
sort_receipts(&mut receipts);
let correct_order: Vec<Receipt> = vec![
Receipt {
block_height: 0.into(),
receipt_index: 0.into(),
..Default::default()
},
Receipt {
block_height: 0.into(),
receipt_index: 1.into(),
..Default::default()
},
Receipt {
block_height: 1.into(),
receipt_index: 0.into(),
..Default::default()
},
Receipt {
block_height: 2.into(),
receipt_index: 1.into(),
..Default::default()
},
Receipt {
block_height: 2.into(),
receipt_index: 2.into(),
..Default::default()
},
Receipt {
block_height: 2.into(),
receipt_index: 3.into(),
..Default::default()
},
];
assert_eq!(receipts, correct_order)
}
}