//! Client library for querying a skar server over HTTP and decoding its responses.

use std::{cmp, time::Duration};
use anyhow::{anyhow, Context, Result};
use arrow2::{array::Array, chunk::Chunk};
use futures::StreamExt;
use reqwest::Method;
use skar_net_types::{skar_net_types_capnp, ArchiveHeight, Query, RollbackGuard};
mod column_mapping;
pub mod config;
mod decode;
mod parquet_out;
mod rayon_async;
mod transport_format;
mod types;
pub use column_mapping::{ColumnMapping, DataType};
pub use config::Config;
pub use decode::Decoder;
pub use skar_format as format;
use tokio::sync::mpsc;
pub use transport_format::{ArrowIpc, TransportFormat};
pub use types::{ArrowBatch, ParquetConfig, QueryResponse, QueryResponseData, StreamConfig};
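/// A chunk of Arrow data returned by the server: a set of dynamically typed Arrow arrays.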
pub type ArrowChunk = Chunk<Box<dyn Array>>;
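/// HTTP client for executing queries against a skar server.
///
/// A minimal usage sketch (marked `ignore` because constructing `Config` depends on
/// fields defined in `config.rs` that are not shown here):
///
/// ```ignore
/// let client = Client::new(config)?;
/// let height = client.get_height().await?;
/// println!("server archive height: {height}");
/// ```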
#[derive(Clone)]
pub struct Client {
http_client: reqwest::Client,
cfg: Config,
}
impl Client {
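    /// Create a new client from the given configuration.
    ///
    /// Builds the underlying HTTP client with the configured request and connect timeouts.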
pub fn new(cfg: Config) -> Result<Self> {
        let http_client = reqwest::Client::builder()
            .no_gzip()
            .http1_only()
            .timeout(Duration::from_millis(cfg.http_req_timeout_millis.get()))
            .tcp_keepalive(Duration::from_secs(7200))
            .connect_timeout(Duration::from_millis(cfg.http_req_timeout_millis.get()))
            .build()
            .context("build http client")?;
Ok(Self { http_client, cfg })
}
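    /// Execute `query` against the server and write the returned data to disk as a folder
    /// of parquet files, according to `config`.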
pub async fn create_parquet_folder(&self, query: Query, config: ParquetConfig) -> Result<()> {
parquet_out::create_parquet_folder(self, query, config).await
}
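    /// Get the current archive height from the server's `/height` endpoint.
    ///
    /// Returns `0` if the server reports no height yet.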
pub async fn get_height(&self) -> Result<u64> {
let mut url = self.cfg.url.clone();
let mut segments = url.path_segments_mut().ok().context("get path segments")?;
segments.push("height");
        std::mem::drop(segments); // release the mutable borrow on `url`
let mut req = self.http_client.request(Method::GET, url);
if let Some(bearer_token) = &self.cfg.bearer_token {
req = req.bearer_auth(bearer_token);
}
let res = req.send().await.context("execute http req")?;
let status = res.status();
if !status.is_success() {
return Err(anyhow!("http response status code {}", status));
}
let height: ArchiveHeight = res.json().await.context("read response body json")?;
Ok(height.height.unwrap_or(0))
}
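    /// Same as [`Self::get_height`], but retries indefinitely with a capped, jittered backoff
    /// instead of returning the error.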
pub async fn get_height_with_retry(&self) -> Result<u64> {
let mut base = 1;
loop {
match self.get_height().await {
Ok(res) => return Ok(res),
Err(e) => {
log::error!("failed to send request to skar server: {:?}", e);
}
}
            // Back off before retrying: wait `base` seconds plus up to one second of random
            // jitter, with `base` growing by one each attempt and capped at 5 seconds.
            let secs = Duration::from_secs(base);
            let millis = Duration::from_millis(fastrange_rs::fastrange_64(rand::random(), 1000));
            tokio::time::sleep(secs + millis).await;
            base = std::cmp::min(base + 1, 5);
}
}
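    /// Stream the results of `query` over a channel.
    ///
    /// The block range is split into `config.batch_size` sized sub-queries, executed with up to
    /// `config.concurrency` batches in flight at once; responses are delivered in block order.
    /// If `query.to_block` is `None`, the current archive height is used as the upper bound.
    ///
    /// A hypothetical consumption sketch (how `query` and `stream_config` are built is not
    /// shown):
    ///
    /// ```ignore
    /// let mut rx = client.stream::<ArrowIpc>(query, stream_config).await?;
    /// while let Some(resp) = rx.recv().await {
    ///     let resp = resp?;
    ///     println!("scanned up to block {}", resp.next_block);
    /// }
    /// ```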
pub async fn stream<Format: TransportFormat>(
&self,
query: Query,
config: StreamConfig,
) -> Result<mpsc::Receiver<Result<QueryResponse>>> {
let (tx, rx) = mpsc::channel(config.concurrency);
let to_block = match query.to_block {
Some(to_block) => to_block,
None => {
if config.retry {
self.get_height_with_retry().await.context("get height")?
} else {
self.get_height().await.context("get height")?
}
}
};
let client = self.clone();
        let step = usize::try_from(config.batch_size).context("convert batch_size to usize")?;
tokio::spawn(async move {
let futs = (query.from_block..to_block)
.step_by(step)
.map(move |start| {
let end = cmp::min(start + config.batch_size, to_block);
let mut query = query.clone();
query.from_block = start;
query.to_block = Some(end);
Self::run_query_to_end(client.clone(), query, config.retry)
});
let mut stream = futures::stream::iter(futs).buffered(config.concurrency);
while let Some(resps) = stream.next().await {
let resps = match resps {
Ok(resps) => resps,
Err(e) => {
tx.send(Err(e)).await.ok();
return;
}
};
for resp in resps {
if tx.send(Ok(resp)).await.is_err() {
return;
}
}
}
});
Ok(rx)
}
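    /// Repeatedly send `query`, advancing `from_block` to the server-reported `next_block`,
    /// until `to_block` is reached. Returns every partial response in order.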
async fn run_query_to_end(self, query: Query, retry: bool) -> Result<Vec<QueryResponse>> {
let mut resps = Vec::new();
        // `to_block` is expected to have been set by the caller.
        let to_block = query.to_block.unwrap();
let mut query = query;
loop {
let resp = if retry {
self.send_with_retry::<crate::ArrowIpc>(&query)
.await
.context("send query")?
} else {
self.send::<crate::ArrowIpc>(&query)
.await
.context("send query")?
};
let next_block = resp.next_block;
resps.push(resp);
if next_block >= to_block {
break;
} else {
query.from_block = next_block;
}
}
Ok(resps)
}
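    /// Send `query` as JSON to the server's `query/<format>` endpoint and parse the packed
    /// Cap'n Proto response, decoding the data sections with `Format`.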
pub async fn send<Format: TransportFormat>(&self, query: &Query) -> Result<QueryResponse> {
let mut url = self.cfg.url.clone();
let mut segments = url.path_segments_mut().ok().context("get path segments")?;
segments.push("query");
segments.push(Format::path());
std::mem::drop(segments);
let mut req = self.http_client.request(Method::POST, url);
if let Some(bearer_token) = &self.cfg.bearer_token {
req = req.bearer_auth(bearer_token);
}
let res = req.json(&query).send().await.context("execute http req")?;
let status = res.status();
if !status.is_success() {
let text = res.text().await.context("read text to see error")?;
return Err(anyhow!(
"http response status code {}, err body: {}",
status,
text
));
}
let bytes = res.bytes().await.context("read response body bytes")?;
        let res = tokio::task::block_in_place(|| {
            // Parsing is CPU-bound, so use `block_in_place` to avoid stalling the async
            // runtime's worker thread.
            Self::parse_query_response::<Format>(&bytes).context("parse query response")
        })?;
Ok(res)
}
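    /// Same as [`Self::send`], but retries indefinitely with a capped, jittered backoff
    /// instead of returning the error.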
pub async fn send_with_retry<Format: TransportFormat>(
&self,
query: &Query,
) -> Result<QueryResponse> {
let mut base = 1;
loop {
match self.send::<Format>(query).await {
Ok(res) => return Ok(res),
Err(e) => {
log::error!("failed to send request to skar server: {:?}", e);
}
}
let secs = Duration::from_secs(base);
let millis = Duration::from_millis(fastrange_rs::fastrange_64(rand::random(), 1000));
tokio::time::sleep(secs + millis).await;
base = std::cmp::min(base + 1, 5);
}
}
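    /// Parse a packed Cap'n Proto query response body into a [`QueryResponse`],
    /// decoding the blocks/transactions/logs/traces sections with `Format`.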
fn parse_query_response<Format: TransportFormat>(bytes: &[u8]) -> Result<QueryResponse> {
        let mut opts = capnp::message::ReaderOptions::new();
        // Lift capnp's default traversal/nesting limits so large responses can be read.
        opts.nesting_limit(i32::MAX).traversal_limit_in_words(None);
let message_reader =
capnp::serialize_packed::read_message(bytes, opts).context("create message reader")?;
let query_response = message_reader
.get_root::<skar_net_types_capnp::query_response::Reader>()
.context("get root")?;
        // The server encodes a missing archive height as -1.
        let archive_height = match query_response.get_archive_height() {
            -1 => None,
h => Some(
h.try_into()
.context("invalid archive height returned from server")?,
),
};
let rollback_guard = if query_response.has_rollback_guard() {
let rg = query_response
.get_rollback_guard()
.context("get rollback guard")?;
Some(RollbackGuard {
block_number: rg.get_block_number(),
timestamp: rg.get_timestamp(),
hash: rg
.get_hash()
.context("get rollback guard hash")?
.try_into()
.context("hash size")?,
first_block_number: rg.get_first_block_number(),
first_parent_hash: rg
.get_first_parent_hash()
.context("get rollback guard first parent hash")?
.try_into()
.context("hash size")?,
})
} else {
None
};
let data = query_response.get_data().context("read data")?;
        let blocks = Format::read_chunks(data.get_blocks().context("get blocks")?)
            .context("parse block data")?;
        let transactions =
            Format::read_chunks(data.get_transactions().context("get transactions")?)
                .context("parse tx data")?;
        let logs =
            Format::read_chunks(data.get_logs().context("get logs")?).context("parse log data")?;
        let traces = if data.has_traces() {
            Format::read_chunks(data.get_traces().context("get traces")?)
                .context("parse traces data")?
} else {
Vec::new()
};
Ok(QueryResponse {
archive_height,
next_block: query_response.get_next_block(),
total_execution_time: query_response.get_total_execution_time(),
data: QueryResponseData {
blocks,
transactions,
logs,
traces,
},
rollback_guard,
})
}
}