1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
use std::{io::Cursor, sync::Arc};

use crate::{types::ArrowResponse, ArrowBatch, ArrowResponseData, QueryResponse};
use anyhow::{Context, Result};
use hypersync_net_types::{hypersync_net_types_capnp, RollbackGuard};
use polars_arrow::io::ipc;

/// Decodes an in-memory Arrow IPC file into a list of [`ArrowBatch`]es.
///
/// The file metadata is read first; its schema handle is then attached to
/// every chunk yielded by the IPC file reader.
///
/// # Errors
///
/// Fails with the context `"read metadata"` when the IPC metadata cannot be
/// read, and with `"read chunk"` on the first chunk that fails to decode.
fn read_chunks(bytes: &[u8]) -> Result<Vec<ArrowBatch>> {
    let mut cursor = Cursor::new(bytes);
    let file_metadata = ipc::read::read_file_metadata(&mut cursor).context("read metadata")?;

    // Grab the schema before the metadata is moved into the reader.
    let shared_schema = file_metadata.schema.clone();

    let mut batches = Vec::new();
    for maybe_chunk in ipc::read::FileReader::new(cursor, file_metadata, None, None) {
        // Stop at the first chunk that fails to decode.
        let decoded = maybe_chunk.context("read chunk")?;
        batches.push(ArrowBatch {
            chunk: Arc::new(decoded),
            schema: shared_schema.clone(),
        });
    }

    Ok(batches)
}

/// Parses a packed Cap'n Proto query response into an [`ArrowResponse`].
///
/// The message is read with the nesting limit set to `i32::MAX` and the
/// traversal limit set to `None`. An archive height of `-1` is mapped to
/// `None`; any other value is converted with `try_into`. The rollback guard
/// is populated only when the message has one. Blocks, transactions and logs
/// are always decoded via `read_chunks`; traces are decoded only when present
/// and otherwise left empty. `decoded_logs` is always returned empty.
///
/// # Errors
///
/// Returns an error when the message cannot be read, a field cannot be
/// fetched, the archive height or a hash fails its conversion, or any of the
/// embedded Arrow payloads fails to parse.
pub fn parse_query_response(bytes: &[u8]) -> Result<ArrowResponse> {
    let mut reader_options = capnp::message::ReaderOptions::new();
    reader_options
        .nesting_limit(i32::MAX)
        .traversal_limit_in_words(None);

    let message = capnp::serialize_packed::read_message(bytes, reader_options)
        .context("create message reader")?;
    let response = message
        .get_root::<hypersync_net_types_capnp::query_response::Reader>()
        .context("get root")?;

    // -1 is the wire sentinel for "no archive height".
    let raw_height = response.get_archive_height();
    let archive_height = if raw_height == -1 {
        None
    } else {
        Some(
            raw_height
                .try_into()
                .context("invalid archive height returned from server")?,
        )
    };

    let rollback_guard = if response.has_rollback_guard() {
        let guard = response
            .get_rollback_guard()
            .context("get rollback guard")?;

        // Fallible conversions are resolved first, in the same order as the
        // corresponding struct fields, so the first failure reported is unchanged.
        let hash = guard
            .get_hash()
            .context("get rollback guard hash")?
            .try_into()
            .context("hash size")?;
        let first_parent_hash = guard
            .get_first_parent_hash()
            .context("get rollback guard first parent hash")?
            .try_into()
            .context("hash size")?;

        Some(RollbackGuard {
            block_number: guard.get_block_number(),
            timestamp: guard.get_timestamp(),
            hash,
            first_block_number: guard.get_first_block_number(),
            first_parent_hash,
        })
    } else {
        None
    };

    let payload = response.get_data().context("read data")?;

    let block_bytes = payload.get_blocks().context("get data")?;
    let blocks = read_chunks(block_bytes).context("parse block data")?;

    let tx_bytes = payload.get_transactions().context("get data")?;
    let transactions = read_chunks(tx_bytes).context("parse tx data")?;

    let log_bytes = payload.get_logs().context("get data")?;
    let logs = read_chunks(log_bytes).context("parse log data")?;

    // Traces are optional in the message; absent traces yield an empty list.
    let traces = if payload.has_traces() {
        let trace_bytes = payload.get_traces().context("get data")?;
        read_chunks(trace_bytes).context("parse traces data")?
    } else {
        Vec::new()
    };

    let data = ArrowResponseData {
        blocks,
        transactions,
        logs,
        traces,
        decoded_logs: Vec::new(),
    };

    Ok(QueryResponse {
        archive_height,
        next_block: response.get_next_block(),
        total_execution_time: response.get_total_execution_time(),
        data,
        rollback_guard,
    })
}