use std::time::Duration;
use anyhow::{anyhow, Context, Result};
use arrow2::{array::Array, chunk::Chunk};
use reqwest::Method;
use skar_net_types::{skar_net_types_capnp, ArchiveHeight, Query};
mod config;
mod parquet_out;
mod transport_format;
mod types;
pub use config::Config;
pub use transport_format::{ArrowIpc, Parquet, TransportFormat};
pub use types::{ArrowBatch, QueryResponse, QueryResponseData};
/// An arrow2 [`Chunk`] whose columns are boxed, dynamically typed [`Array`]s.
pub type ArrowChunk = Chunk<Box<dyn Array>>;
/// HTTP client for talking to a skar server.
pub struct Client {
/// Underlying `reqwest` client used to issue every HTTP request.
http_client: reqwest::Client,
/// Configuration this client was constructed with (server URL, bearer token, timeout).
cfg: Config,
}
impl Client {
pub fn new(cfg: Config) -> Self {
let http_client = reqwest::Client::builder()
.no_gzip()
.http1_only()
.timeout(Duration::from_millis(cfg.http_req_timeout_millis.get()))
.tcp_keepalive(Duration::from_secs(7200))
.connect_timeout(Duration::from_millis(cfg.http_req_timeout_millis.get()))
.build()
.unwrap();
Self { http_client, cfg }
}
pub async fn create_parquet_folder(&self, query: Query, path: String) -> Result<()> {
parquet_out::create_parquet_folder(self, query, path).await
}
pub async fn get_height(&self) -> Result<u64> {
let mut url = self.cfg.url.clone();
let mut segments = url.path_segments_mut().ok().context("get path segments")?;
segments.push("height");
std::mem::drop(segments);
let mut req = self.http_client.request(Method::GET, url);
if let Some(bearer_token) = &self.cfg.bearer_token {
req = req.bearer_auth(bearer_token);
}
let res = req.send().await.context("execute http req")?;
let status = res.status();
if !status.is_success() {
return Err(anyhow!("http response status code {}", status));
}
let height: ArchiveHeight = res.json().await.context("read response body json")?;
Ok(height.height.unwrap_or(0))
}
pub async fn get_height_with_retry(&self) -> Result<u64> {
let mut base = 1;
loop {
match self.get_height().await {
Ok(res) => return Ok(res),
Err(e) => {
log::error!("failed to send request to skar server: {:?}", e);
}
}
let secs = Duration::from_secs(base);
let millis = Duration::from_millis(fastrange_rs::fastrange_64(rand::random(), 1000));
tokio::time::sleep(secs + millis).await;
base = std::cmp::min(base + 1, 5);
}
}
pub async fn send<Format: TransportFormat>(&self, query: &Query) -> Result<QueryResponse> {
let mut url = self.cfg.url.clone();
let mut segments = url.path_segments_mut().ok().context("get path segments")?;
segments.push("query");
segments.push(Format::path());
std::mem::drop(segments);
let mut req = self.http_client.request(Method::POST, url);
if let Some(bearer_token) = &self.cfg.bearer_token {
req = req.bearer_auth(bearer_token);
}
log::trace!("sending req to skar");
let res = req.json(&query).send().await.context("execute http req")?;
log::trace!("got req response");
let status = res.status();
if !status.is_success() {
return Err(anyhow!("http response status code {}", status));
}
log::trace!("starting to get response body bytes");
let bytes = res.bytes().await.context("read response body bytes")?;
log::trace!("starting to parse query response");
let res = tokio::task::block_in_place(|| {
parse_query_response::<Format>(&bytes).context("parse query response")
})?;
log::trace!("got data from skar");
Ok(res)
}
pub async fn send_with_retry<Format: TransportFormat>(
&self,
query: &Query,
) -> Result<QueryResponse> {
let mut base = 1;
loop {
match self.send::<Format>(query).await {
Ok(res) => return Ok(res),
Err(e) => {
log::error!("failed to send request to skar server: {:?}", e);
}
}
let secs = Duration::from_secs(base);
let millis = Duration::from_millis(fastrange_rs::fastrange_64(rand::random(), 1000));
tokio::time::sleep(secs + millis).await;
base = std::cmp::min(base + 1, 5);
}
}
}
/// Decodes a packed Cap'n Proto `query_response` message from `bytes` into a
/// [`QueryResponse`].
///
/// The block, transaction and log payloads are each handed to
/// `Format::read_chunks`. An archive height of `-1` in the message is mapped
/// to `None`.
///
/// # Errors
///
/// Fails if the message cannot be read, a field cannot be accessed, the
/// archive height does not convert to the target integer type, or `Format`
/// fails to read any of the three payloads.
fn parse_query_response<Format: TransportFormat>(bytes: &[u8]) -> Result<QueryResponse> {
    // Lift the reader's nesting and traversal limits.
    let mut reader_opts = capnp::message::ReaderOptions::new();
    reader_opts.nesting_limit(i32::MAX);
    reader_opts.traversal_limit_in_words(None);

    let message = capnp::serialize_packed::read_message(bytes, reader_opts)
        .context("create message reader")?;
    let root = message
        .get_root::<skar_net_types_capnp::query_response::Reader>()
        .context("get root")?;

    // -1 is the value the message uses for "no archive height".
    let raw_height = root.get_archive_height();
    let archive_height = if raw_height == -1 {
        None
    } else {
        Some(
            raw_height
                .try_into()
                .context("invalid archive height returned from server")?,
        )
    };

    let payload = root.get_data().context("read data")?;

    let raw_blocks = payload.get_blocks().context("get data")?;
    let blocks = Format::read_chunks(raw_blocks).context("parse block data")?;

    let raw_transactions = payload.get_transactions().context("get data")?;
    let transactions = Format::read_chunks(raw_transactions).context("parse tx data")?;

    let raw_logs = payload.get_logs().context("get data")?;
    let logs = Format::read_chunks(raw_logs).context("parse log data")?;

    let next_block = root.get_next_block();
    let total_execution_time = root.get_total_execution_time();

    Ok(QueryResponse {
        archive_height,
        next_block,
        total_execution_time,
        data: QueryResponseData {
            blocks,
            transactions,
            logs,
        },
    })
}