use std::{
cmp,
collections::BTreeMap,
sync::{
atomic::{AtomicU64, Ordering},
Arc,
},
};
use anyhow::{anyhow, Context, Result};
use hyperfuel_net_types::Query;
use polars_arrow::{
array::{Array, BinaryArray, BooleanArray, UInt64Array, UInt8Array, Utf8Array},
datatypes::ArrowDataType,
record_batch::RecordBatch,
};
use tokio::sync::mpsc;
use tokio::task::JoinSet;
use crate::{
config::HexOutput,
rayon_async,
types::ArrowResponse,
util::{hex_encode_batch, hex_encode_prefixed},
ArrowBatch, ArrowResponseData, StreamConfig,
};
/// Stream query results as Arrow data over a bounded channel.
///
/// Spawns background tasks that split the block range `[query.from_block, to_block)`
/// into chunks, fetch chunks concurrently, re-assemble them in order, and push
/// mapped `ArrowResponse`s into the returned receiver. Errors are delivered
/// in-band as `Err` items on the channel.
///
/// The per-request block-range size adapts at runtime: a shared `AtomicU64`
/// packs the current batch size in the low 32 bits and a "generation" counter
/// in the high 32 bits, so feedback from stale (older-generation) responses is
/// ignored when resizing.
pub async fn stream_arrow(
    client: Arc<crate::Client>,
    query: Query,
    config: StreamConfig,
) -> Result<mpsc::Receiver<Result<ArrowResponse>>> {
    // Tuning knobs with built-in defaults when the config leaves them unset.
    let concurrency = config.concurrency.unwrap_or(10);
    let batch_size = config.batch_size.unwrap_or(1000);
    let max_batch_size = config.max_batch_size.unwrap_or(200_000);
    let min_batch_size = config.min_batch_size.unwrap_or(200);
    let response_size_ceiling = config.response_bytes_ceiling.unwrap_or(500_000);
    let response_size_floor = config.response_bytes_floor.unwrap_or(250_000);
    let reverse = config.reverse.unwrap_or_default();

    // Packed (generation << 32 | batch_size) step shared with BlockRangeIterator.
    // Initially generation is 0, so the raw batch_size is the whole value.
    // NOTE(review): a configured batch_size >= 2^32 would bleed into the
    // generation bits — presumably never the case in practice; confirm.
    let step = Arc::new(AtomicU64::new(batch_size));

    // Output channel handed back to the caller; bounded to apply backpressure.
    let (tx, rx) = mpsc::channel(concurrency * 2);

    // Resolve an open-ended query against the current chain height.
    let to_block = match query.to_block {
        Some(to_block) => to_block,
        None => client.get_height().await.context("get height")?,
    };

    tokio::spawn(async move {
        let mut query = query;

        if !reverse {
            // In forward mode, issue one plain request first so the server can
            // tell us where to start chunking from (res.next_block); this also
            // delivers the first page with minimal latency.
            let initial_res = client.get_arrow(&query).await.context("get initial data");
            match initial_res {
                Ok(res) => {
                    let res = match map_responses(config.clone(), vec![res], reverse).await {
                        Ok(mut resps) => resps.remove(0),
                        Err(e) => {
                            tx.send(Err(e)).await.ok();
                            return;
                        }
                    };
                    // Continue chunked fetching from where the server stopped.
                    query.from_block = res.next_block;
                    if tx.send(Ok(res)).await.is_err() {
                        // Receiver dropped; stop streaming.
                        return;
                    }
                }
                Err(e) => {
                    tx.send(Err(e)).await.ok();
                    return;
                }
            }
        }

        // Yields (start, end, generation) sub-ranges sized by the shared step.
        let range_iter = BlockRangeIterator::new(query.from_block, to_block, step.clone(), reverse);

        // Lazily build one fetch future per sub-range, tagged with its request
        // index so results can be re-ordered after concurrent completion.
        let mut futs = range_iter
            .enumerate()
            .map(move |(req_idx, (start, end, generation))| {
                let mut query = query.clone();
                query.from_block = start;
                query.to_block = Some(end);
                let client = client.clone();
                async move { (generation, req_idx, run_query_to_end(client, query).await) }
            })
            .peekable();

        // Internal channel carrying in-order (generation, result) pairs.
        let (res_tx, mut res_rx) = mpsc::channel(concurrency * 2);

        // Dispatcher task: keeps up to `concurrency` fetches in flight and
        // forwards completed results strictly in request-index order.
        tokio::spawn(async move {
            let mut set = JoinSet::new();
            // Completed-but-not-yet-forwarded results, keyed by request index.
            let mut queue = BTreeMap::new();
            let mut next_req_idx = 0;

            while futs.peek().is_some() {
                // Drain whatever has already finished without blocking.
                while let Some(res) = set.try_join_next() {
                    let (generation, req_idx, resps) = res.unwrap();
                    queue.insert(req_idx, (generation, resps));
                }
                // If we are at the concurrency cap, wait for a slot to free up.
                while set.len() >= concurrency {
                    let (generation, req_idx, resps) = set.join_next().await.unwrap().unwrap();
                    queue.insert(req_idx, (generation, resps));
                }
                if queue.len() < concurrency * 2 {
                    // Top the in-flight set back up from the lazy future stream.
                    futs.by_ref().take(concurrency - set.len()).for_each(|fut| {
                        set.spawn(fut);
                    });
                } else {
                    // Reorder buffer is full (consumer is behind); back off briefly.
                    tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
                }
                // Forward every contiguous run of results starting at next_req_idx.
                while let Some(resps) = queue.remove(&next_req_idx) {
                    if res_tx.send(resps).await.is_err() {
                        return;
                    }
                    next_req_idx += 1;
                }
            }

            // No more ranges to schedule: drain remaining in-flight requests...
            while let Some(res) = set.join_next().await {
                let (generation, req_idx, resps) = res.unwrap();
                queue.insert(req_idx, (generation, resps));
            }
            // ...and flush the tail of the reorder buffer in order.
            while let Some(resps) = queue.remove(&next_req_idx) {
                if res_tx.send(resps).await.is_err() {
                    return;
                }
                next_req_idx += 1;
            }
        });

        // Running totals used to enforce optional per-entity row limits.
        let mut num_blocks = 0;
        let mut num_transactions = 0;
        let mut num_receipts = 0;
        let mut num_inputs = 0;
        let mut num_outputs = 0;

        // Only feedback from the newest generation may resize the step.
        let mut next_generation = 0;

        while let Some((generation, resps)) = res_rx.recv().await {
            let resps = match resps {
                Ok(resps) => resps,
                Err(e) => {
                    tx.send(Err(e)).await.ok();
                    return;
                }
            };

            let (resps, resps_size) = resps;

            // Apply column mapping / hex encoding / reversal on the rayon pool.
            let resps = match map_responses(config.clone(), resps, reverse).await {
                Ok(resps) => resps,
                Err(e) => {
                    tx.send(Err(e)).await.ok();
                    return;
                }
            };

            if generation == next_generation {
                // This response was produced with the current step size, so its
                // byte size is valid feedback. Bump the generation so stale
                // in-flight responses can no longer trigger a resize.
                next_generation += 1;
                if resps_size > response_size_ceiling {
                    // Response too big: shrink the batch size proportionally,
                    // clamped below by min_batch_size.
                    let ratio = response_size_ceiling as f64 / resps_size as f64;
                    step.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |x| {
                        // Low 32 bits are the current batch size; the old
                        // generation tag is discarded and replaced below.
                        let x = x as u32;
                        let batch_size = cmp::max((x as f64 * ratio) as u64, min_batch_size);
                        let step = batch_size | u64::from(next_generation) << 32;
                        Some(step)
                    })
                    .ok();
                } else if resps_size < response_size_floor {
                    // Response small: grow the batch size proportionally,
                    // clamped above by max_batch_size.
                    let ratio = response_size_floor as f64 / resps_size as f64;
                    step.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |x| {
                        let x = x as u32;
                        let batch_size = cmp::min((x as f64 * ratio) as u64, max_batch_size);
                        let step = batch_size | u64::from(next_generation) << 32;
                        Some(step)
                    })
                    .ok();
                }
            }

            for resp in resps {
                num_blocks += count_rows(&resp.data.blocks);
                num_transactions += count_rows(&resp.data.transactions);
                num_receipts += count_rows(&resp.data.receipts);
                num_inputs += count_rows(&resp.data.inputs);
                num_outputs += count_rows(&resp.data.outputs);
                if tx.send(Ok(resp)).await.is_err() {
                    return;
                }
            }

            // Stop streaming once any configured entity limit is reached; the
            // response containing the limit-crossing rows has already been sent.
            if check_entity_limit(num_blocks, config.max_num_blocks)
                || check_entity_limit(num_transactions, config.max_num_transactions)
                || check_entity_limit(num_receipts, config.max_num_receipts)
                || check_entity_limit(num_inputs, config.max_num_inputs)
                || check_entity_limit(num_outputs, config.max_num_outputs)
            {
                return;
            }
        }
    });

    Ok(rx)
}
/// Total number of rows across all batches in the slice.
fn count_rows(batches: &[ArrowBatch]) -> usize {
    let mut total = 0;
    for batch in batches {
        total += batch.chunk.len();
    }
    total
}
/// Returns `true` when a limit is configured and `val` has reached it.
/// An unset limit (`None`) never triggers.
fn check_entity_limit(val: usize, limit: Option<usize>) -> bool {
    match limit {
        Some(limit) => val >= limit,
        None => false,
    }
}
async fn map_responses(
cfg: StreamConfig,
mut responses: Vec<ArrowResponse>,
reverse: bool,
) -> Result<Vec<ArrowResponse>> {
if reverse {
responses.reverse();
for resp in responses.iter_mut() {
resp.data.blocks.reverse();
resp.data.transactions.reverse();
resp.data.receipts.reverse();
resp.data.inputs.reverse();
resp.data.outputs.reverse();
}
}
rayon_async::spawn(move || {
responses
.into_iter()
.map(|resp| {
Ok(ArrowResponse {
data: ArrowResponseData {
blocks: resp
.data
.blocks
.into_iter()
.map(|batch| {
map_batch(
cfg.column_mapping.as_ref().map(|cm| &cm.block),
cfg.hex_output,
batch,
reverse,
)
})
.collect::<Result<Vec<_>>>()?,
transactions: resp
.data
.transactions
.into_iter()
.map(|batch| {
map_batch(
cfg.column_mapping.as_ref().map(|cm| &cm.transaction),
cfg.hex_output,
batch,
reverse,
)
})
.collect::<Result<Vec<_>>>()?,
receipts: resp
.data
.receipts
.into_iter()
.map(|batch| {
map_batch(
cfg.column_mapping.as_ref().map(|cm| &cm.receipt),
cfg.hex_output,
batch,
reverse,
)
})
.collect::<Result<Vec<_>>>()?,
inputs: resp
.data
.inputs
.into_iter()
.map(|batch| {
map_batch(
cfg.column_mapping.as_ref().map(|cm| &cm.input),
cfg.hex_output,
batch,
reverse,
)
})
.collect::<Result<Vec<_>>>()?,
outputs: resp
.data
.outputs
.into_iter()
.map(|batch| {
map_batch(
cfg.column_mapping.as_ref().map(|cm| &cm.output),
cfg.hex_output,
batch,
reverse,
)
})
.collect::<Result<Vec<_>>>()?,
},
..resp
})
})
.collect()
})
.await
.unwrap()
}
/// Transform a single Arrow batch: reverse its rows (when streaming in
/// reverse), apply the optional column mapping, and hex-encode binary columns
/// according to `hex_output`.
fn map_batch(
    column_mapping: Option<&BTreeMap<String, crate::DataType>>,
    hex_output: HexOutput,
    mut batch: ArrowBatch,
    reverse: bool,
) -> Result<ArrowBatch> {
    if reverse {
        // Reverse every column so rows come out newest-first.
        let reversed_columns = batch
            .chunk
            .columns()
            .iter()
            .map(|col| reverse_array(col.as_ref()))
            .collect::<Result<Vec<_>>>()
            .context("reverse the arrays")?;
        batch = ArrowBatch {
            chunk: Arc::new(RecordBatch::new(reversed_columns)),
            schema: batch.schema,
        };
    }

    if let Some(mapping) = column_mapping {
        batch = crate::column_mapping::apply_to_batch(&batch, mapping)
            .context("apply column mapping")?;
    }

    batch = match hex_output {
        HexOutput::NoEncode => batch,
        HexOutput::NonPrefixed => hex_encode_batch(&batch, faster_hex::hex_string),
        HexOutput::Prefixed => hex_encode_batch(&batch, hex_encode_prefixed),
    };

    Ok(batch)
}
/// Build a new array whose values are in reverse order.
///
/// Only the datatypes the schema actually produces are supported; any other
/// datatype yields an error.
fn reverse_array(array: &dyn Array) -> Result<Box<dyn Array>> {
    let reversed: Box<dyn Array> = match array.data_type() {
        ArrowDataType::Binary => {
            let src = array.as_any().downcast_ref::<BinaryArray<i32>>().unwrap();
            BinaryArray::<i32>::from_iter(src.iter().rev()).boxed()
        }
        ArrowDataType::Utf8 => {
            let src = array.as_any().downcast_ref::<Utf8Array<i32>>().unwrap();
            Utf8Array::<i32>::from_iter(src.iter().rev()).boxed()
        }
        ArrowDataType::Boolean => {
            let src = array.as_any().downcast_ref::<BooleanArray>().unwrap();
            BooleanArray::from_iter(src.iter().rev()).boxed()
        }
        ArrowDataType::UInt64 => {
            // iter() yields Option<&u64>; copy out of the references.
            let src = array.as_any().downcast_ref::<UInt64Array>().unwrap();
            UInt64Array::from_iter(src.iter().rev().map(|v| v.copied())).boxed()
        }
        ArrowDataType::UInt8 => {
            let src = array.as_any().downcast_ref::<UInt8Array>().unwrap();
            UInt8Array::from_iter(src.iter().rev().map(|v| v.copied())).boxed()
        }
        dt => {
            return Err(anyhow!(
                "reversing an array of datatype {:?} is not supported",
                dt
            ))
        }
    };
    Ok(reversed)
}
/// Repeatedly query the server until the full range up to `query.to_block`
/// has been fetched, accumulating responses and the total response byte size.
///
/// The caller guarantees `query.to_block` is set (the block-range scheduler
/// always assigns one), hence the `unwrap`.
async fn run_query_to_end(
    client: Arc<crate::Client>,
    query: Query,
) -> Result<(Vec<ArrowResponse>, u64)> {
    let target = query.to_block.unwrap();
    let mut query = query;
    let mut collected = Vec::new();
    let mut total_size = 0;

    loop {
        let (resp, bytes) = client
            .get_arrow_with_size(&query)
            .await
            .context("get data")?;
        total_size += bytes;
        // Where the server stopped; resume from here if short of the target.
        let reached = resp.next_block;
        collected.push(resp);
        if reached >= target {
            return Ok((collected, total_size));
        }
        query.from_block = reached;
    }
}
/// Iterator over `(start, end, generation)` block sub-ranges.
///
/// The range size is read on every step from a shared `AtomicU64` whose low
/// 32 bits hold the current batch size and whose high 32 bits hold a
/// generation counter, so the size can be tuned while iteration is underway.
pub struct BlockRangeIterator {
    // Current cursor: next range starts (forward) or ends (reverse) here.
    offset: u64,
    // Exclusive stopping point of the whole iteration.
    end: u64,
    // Shared packed (generation << 32 | batch_size) value.
    step: Arc<AtomicU64>,
    // When true, ranges are produced from `end` down toward `start`.
    reverse: bool,
}

impl BlockRangeIterator {
    /// Create an iterator over `[start, end)`, walking backwards when
    /// `reverse` is set.
    pub fn new(start: u64, end: u64, step: Arc<AtomicU64>, reverse: bool) -> Self {
        // In reverse mode the cursor begins at the top of the range and the
        // stopping point is the bottom.
        let (offset, end) = if reverse { (end, start) } else { (start, end) };
        Self {
            offset,
            end,
            step,
            reverse,
        }
    }
}

impl Iterator for BlockRangeIterator {
    type Item = (u64, u64, u32);

    fn next(&mut self) -> Option<Self::Item> {
        if self.offset == self.end {
            return None;
        }

        // Unpack the shared step: high 32 bits = generation, low = batch size.
        let packed = self.step.load(Ordering::SeqCst);
        let generation = (packed >> 32) as u32;
        let batch_size = packed as u32;

        let from = self.offset;
        if self.reverse {
            // Move down, never past the bottom of the range.
            self.offset = cmp::max(self.offset.saturating_sub(u64::from(batch_size)), self.end);
            Some((self.offset, from, generation))
        } else {
            // Move up, never past the top of the range.
            self.offset = cmp::min(self.offset + u64::from(batch_size), self.end);
            Some((from, self.offset, generation))
        }
    }
}