hypersync-client 1.1.4

client library for hypersync
Documentation
use std::{path::PathBuf, time::Instant};

use anyhow::{Context, Result};
use arrow::array::RecordBatch;
use hypersync_net_types::Query;
use parquet::{
    arrow::async_writer::AsyncArrowWriter,
    file::properties::{EnabledStatistics, WriterProperties, WriterVersion},
};
use tokio::{sync::mpsc, task::JoinHandle};

use crate::{config::StreamConfig, Client};

pub async fn collect_parquet(
    client: &Client,
    path: &str,
    query: Query,
    config: StreamConfig,
) -> Result<()> {
    let path = PathBuf::from(path);

    tokio::fs::create_dir_all(&path)
        .await
        .context("create parquet dir")?;

    let mut blocks_path = path.clone();
    blocks_path.push("blocks.parquet");
    let (mut blocks_sender, blocks_join) = spawn_writer(blocks_path)?;

    let mut transactions_path = path.clone();
    transactions_path.push("transactions.parquet");
    let (mut transactions_sender, transactions_join) = spawn_writer(transactions_path)?;

    let mut logs_path = path.clone();
    logs_path.push("logs.parquet");
    let (mut logs_sender, logs_join) = spawn_writer(logs_path)?;

    let mut traces_path = path.clone();
    traces_path.push("traces.parquet");
    let (mut traces_sender, traces_join) = spawn_writer(traces_path)?;

    let mut decoded_logs_path = path.clone();
    decoded_logs_path.push("decoded_logs.parquet");
    let (mut decoded_logs_sender, decoded_logs_join) = spawn_writer(decoded_logs_path)?;

    let mut rx = client
        .stream_arrow(query, config)
        .await
        .context("start stream")?;

    while let Some(resp) = rx.recv().await {
        let resp = resp.context("get query response")?;

        log::trace!("got data up to block {}", resp.next_block);

        let blocks_fut = async move {
            for batch in resp.data.blocks {
                blocks_sender
                    .send(batch)
                    .await
                    .context("write blocks chunk to parquet")?;
            }

            Ok::<_, anyhow::Error>(blocks_sender)
        };

        let txs_fut = async move {
            for batch in resp.data.transactions {
                transactions_sender
                    .send(batch)
                    .await
                    .context("write transactions chunk to parquet")?;
            }

            Ok::<_, anyhow::Error>(transactions_sender)
        };

        let logs_fut = {
            let data = resp.data.logs.clone();
            async move {
                for batch in data {
                    logs_sender
                        .send(batch)
                        .await
                        .context("write logs chunk to parquet")?;
                }

                Ok::<_, anyhow::Error>(logs_sender)
            }
        };

        let traces_fut = async move {
            for batch in resp.data.traces {
                traces_sender
                    .send(batch)
                    .await
                    .context("write traces chunk to parquet")?;
            }

            Ok::<_, anyhow::Error>(traces_sender)
        };

        let decoded_logs_fut = async move {
            for batch in resp.data.decoded_logs {
                decoded_logs_sender
                    .send(batch)
                    .await
                    .context("write decoded_logs chunk to parquet")?;
            }

            Ok::<_, anyhow::Error>(decoded_logs_sender)
        };

        let start = Instant::now();

        (
            blocks_sender,
            transactions_sender,
            logs_sender,
            traces_sender,
            decoded_logs_sender,
        ) = futures::future::try_join5(blocks_fut, txs_fut, logs_fut, traces_fut, decoded_logs_fut)
            .await
            .context("write to parquet")?;

        log::trace!("wrote to parquet in {} ms", start.elapsed().as_millis());
    }

    std::mem::drop(blocks_sender);
    std::mem::drop(transactions_sender);
    std::mem::drop(logs_sender);
    std::mem::drop(traces_sender);
    std::mem::drop(decoded_logs_sender);

    blocks_join
        .await
        .context("join blocks task")?
        .context("finish blocks file")?;
    transactions_join
        .await
        .context("join transactions task")?
        .context("finish transactions file")?;
    logs_join
        .await
        .context("join logs task")?
        .context("finish logs file")?;
    traces_join
        .await
        .context("join traces task")?
        .context("finish traces file")?;
    decoded_logs_join
        .await
        .context("join decoded_logs task")?
        .context("finish decoded_logs file")?;

    Ok(())
}

/// Spawns a background task that writes the batches it receives to a parquet file at `path`.
///
/// Returns the sending half of a bounded channel (capacity 64) together with the handle of the
/// spawned task. The task runs `run_writer`, logs its error if it fails, and yields its result
/// through the handle.
fn spawn_writer(path: PathBuf) -> Result<(mpsc::Sender<RecordBatch>, JoinHandle<Result<()>>)> {
    let (sender, receiver) = mpsc::channel(64);

    let task = tokio::task::spawn(async move {
        let outcome = run_writer(receiver, path).await;

        // Log here as well as returning the error through the join handle.
        if let Err(e) = &outcome {
            log::error!("failed to run parquet writer: {e:?}");
        }

        outcome
    });

    Ok((sender, task))
}

/// Writes every batch received on `rx` to a parquet file at `path`.
///
/// The file is created only after the first batch arrives, and that batch's schema is used to
/// set up the writer. If the channel closes before any batch is received, this returns `Ok(())`
/// without creating a file. The writer is closed once the channel is closed.
///
/// # Errors
///
/// Returns an error if the file cannot be created, the writer cannot be constructed, or writing
/// a batch or closing the writer fails.
async fn run_writer(mut rx: mpsc::Receiver<RecordBatch>, path: PathBuf) -> Result<()> {
    // Nothing to write if the channel closes before a single batch shows up.
    let head = if let Some(batch) = rx.recv().await {
        batch
    } else {
        return Ok(());
    };

    let raw_file = tokio::fs::File::create(&path)
        .await
        .context("create parquet file")?;
    let sink = tokio::io::BufWriter::new(raw_file);

    let writer_props = WriterProperties::builder()
        .set_writer_version(WriterVersion::PARQUET_2_0)
        .set_statistics_enabled(EnabledStatistics::Chunk)
        .build();

    let mut parquet = AsyncArrowWriter::try_new(sink, head.schema(), Some(writer_props))
        .context("create writer")?;

    parquet.write(&head).await.context("write first batch")?;

    // Keep writing until every sender has been dropped.
    while let Some(next) = rx.recv().await {
        parquet.write(&next).await.context("write batch")?;
    }

    parquet.close().await.context("finish writer")?;

    Ok(())
}