hypersync_client/
parse_response.rs

1use std::io::Cursor;
2
3use crate::{types::ArrowResponse, ArrowResponseData, QueryResponse};
4use anyhow::{Context, Result};
5use arrow::{array::RecordBatch, ipc};
6use hypersync_net_types::{hypersync_net_types_capnp, RollbackGuard};
7
8fn read_chunks(bytes: &[u8]) -> Result<Vec<RecordBatch>> {
9    let reader = Cursor::new(bytes);
10
11    let reader = ipc::reader::FileReader::try_new(reader, None).context("create reader")?;
12
13    let chunks = reader
14        .map(|chunk| chunk.context("read chunk"))
15        .collect::<Result<Vec<RecordBatch>>>()?;
16
17    Ok(chunks)
18}
19
20pub fn read_query_response(
21    query_response: &hypersync_net_types_capnp::query_response::Reader,
22) -> Result<ArrowResponse> {
23    let archive_height = match query_response.get_archive_height() {
24        -1 => None,
25        h => Some(
26            h.try_into()
27                .context("invalid archive height returned from server")?,
28        ),
29    };
30
31    let rollback_guard = if query_response.has_rollback_guard() {
32        let rg = query_response
33            .get_rollback_guard()
34            .context("get rollback guard")?;
35
36        Some(RollbackGuard {
37            block_number: rg.get_block_number(),
38            timestamp: rg.get_timestamp(),
39            hash: rg
40                .get_hash()
41                .context("get rollback guard hash")?
42                .try_into()
43                .context("hash size")?,
44            first_block_number: rg.get_first_block_number(),
45            first_parent_hash: rg
46                .get_first_parent_hash()
47                .context("get rollback guard first parent hash")?
48                .try_into()
49                .context("hash size")?,
50        })
51    } else {
52        None
53    };
54
55    let data = query_response.get_data().context("read data")?;
56
57    let blocks =
58        read_chunks(data.get_blocks().context("get block data")?).context("parse block data")?;
59    let transactions = read_chunks(data.get_transactions().context("get transaction data")?)
60        .context("parse tx data")?;
61    let logs = read_chunks(data.get_logs().context("get log data")?).context("parse log data")?;
62    let traces = if data.has_traces() {
63        read_chunks(data.get_traces().context("get trace data")?).context("parse traces data")?
64    } else {
65        Vec::new()
66    };
67
68    Ok(QueryResponse {
69        archive_height,
70        next_block: query_response.get_next_block(),
71        total_execution_time: query_response.get_total_execution_time(),
72        data: ArrowResponseData {
73            blocks,
74            transactions,
75            logs,
76            traces,
77            decoded_logs: Vec::new(),
78        },
79        rollback_guard,
80    })
81}
82
83pub fn parse_query_response(bytes: &[u8]) -> Result<ArrowResponse> {
84    let mut opts = capnp::message::ReaderOptions::new();
85    opts.nesting_limit(i32::MAX).traversal_limit_in_words(None);
86    let message_reader =
87        capnp::serialize_packed::read_message(bytes, opts).context("create message reader")?;
88
89    let query_response = message_reader
90        .get_root::<hypersync_net_types_capnp::query_response::Reader>()
91        .context("get root")?;
92    read_query_response(&query_response).context("read query response")
93}