hypersync_client/
parse_response.rs1use std::io::Cursor;
2
3use crate::{types::ArrowResponse, ArrowResponseData, QueryResponse};
4use anyhow::{Context, Result};
5use arrow::{array::RecordBatch, ipc};
6use hypersync_net_types::{hypersync_net_types_capnp, RollbackGuard};
7
8fn read_chunks(bytes: &[u8]) -> Result<Vec<RecordBatch>> {
9 let reader = Cursor::new(bytes);
10
11 let reader = ipc::reader::FileReader::try_new(reader, None).context("create reader")?;
12
13 let chunks = reader
14 .map(|chunk| chunk.context("read chunk"))
15 .collect::<Result<Vec<RecordBatch>>>()?;
16
17 Ok(chunks)
18}
19
20pub fn read_query_response(
21 query_response: &hypersync_net_types_capnp::query_response::Reader,
22) -> Result<ArrowResponse> {
23 let archive_height = match query_response.get_archive_height() {
24 -1 => None,
25 h => Some(
26 h.try_into()
27 .context("invalid archive height returned from server")?,
28 ),
29 };
30
31 let rollback_guard = if query_response.has_rollback_guard() {
32 let rg = query_response
33 .get_rollback_guard()
34 .context("get rollback guard")?;
35
36 Some(RollbackGuard {
37 block_number: rg.get_block_number(),
38 timestamp: rg.get_timestamp(),
39 hash: rg
40 .get_hash()
41 .context("get rollback guard hash")?
42 .try_into()
43 .context("hash size")?,
44 first_block_number: rg.get_first_block_number(),
45 first_parent_hash: rg
46 .get_first_parent_hash()
47 .context("get rollback guard first parent hash")?
48 .try_into()
49 .context("hash size")?,
50 })
51 } else {
52 None
53 };
54
55 let data = query_response.get_data().context("read data")?;
56
57 let blocks =
58 read_chunks(data.get_blocks().context("get block data")?).context("parse block data")?;
59 let transactions = read_chunks(data.get_transactions().context("get transaction data")?)
60 .context("parse tx data")?;
61 let logs = read_chunks(data.get_logs().context("get log data")?).context("parse log data")?;
62 let traces = if data.has_traces() {
63 read_chunks(data.get_traces().context("get trace data")?).context("parse traces data")?
64 } else {
65 Vec::new()
66 };
67
68 Ok(QueryResponse {
69 archive_height,
70 next_block: query_response.get_next_block(),
71 total_execution_time: query_response.get_total_execution_time(),
72 data: ArrowResponseData {
73 blocks,
74 transactions,
75 logs,
76 traces,
77 decoded_logs: Vec::new(),
78 },
79 rollback_guard,
80 })
81}
82
83pub fn parse_query_response(bytes: &[u8]) -> Result<ArrowResponse> {
84 let mut opts = capnp::message::ReaderOptions::new();
85 opts.nesting_limit(i32::MAX).traversal_limit_in_words(None);
86 let message_reader =
87 capnp::serialize_packed::read_message(bytes, opts).context("create message reader")?;
88
89 let query_response = message_reader
90 .get_root::<hypersync_net_types_capnp::query_response::Reader>()
91 .context("get root")?;
92 read_query_response(&query_response).context("read query response")
93}