//! hyperfuel-client 3.0.1
//!
//! Client library for hyperfuel. See the crate documentation for usage details.
use std::{collections::VecDeque, path::PathBuf, sync::Arc};

use anyhow::{Context, Result};
use hyperfuel_net_types::Query;
use hyperfuel_schema::concat_chunks;
use polars_arrow::{datatypes::ArrowSchema as Schema, legacy::error::PolarsError};
use polars_parquet::parquet::write::FileStreamer;
use polars_parquet::write::StatisticsOptions;
use polars_parquet::{
    read::ParquetError,
    write::{
        array_to_columns, to_parquet_schema, to_parquet_type, transverse, CompressedPage, DynIter,
        DynStreamingIterator, Encoding, FallibleStreamingIterator,
        RowGroupIterColumns as RowGroupIter, WriteOptions,
    },
};
use tokio::{sync::mpsc, task::JoinHandle};
use tokio_util::compat::TokioAsyncReadCompatExt;

use crate::{
    config::StreamConfig, rayon_async, util::map_batch_to_binary_view, ArrowBatch, Client,
};

/// Runs `query` as a stream against the hyperfuel server and writes the
/// results into a directory of parquet files.
///
/// Creates `path` (and any missing parents), then writes five files into it:
/// `blocks.parquet`, `transactions.parquet`, `receipts.parquet`,
/// `inputs.parquet` and `outputs.parquet`. Each file is produced by a
/// dedicated background writer task; within each streamed response the five
/// tables are forwarded to their writers concurrently.
///
/// Returns once the stream is exhausted and every writer task has flushed
/// its file footer. Any stream, encode, or I/O error aborts the whole
/// operation and is returned with context.
pub async fn collect_parquet(
    client: Arc<Client>,
    path: &str,
    query: Query,
    config: StreamConfig,
) -> Result<()> {
    let path = PathBuf::from(path);

    tokio::fs::create_dir_all(&path)
        .await
        .context("create parquet dir")?;

    // One background writer task per output table.
    let (mut blocks_sender, blocks_join) = spawn_writer(path.join("blocks.parquet"))?;
    let (mut transactions_sender, transactions_join) =
        spawn_writer(path.join("transactions.parquet"))?;
    let (mut receipts_sender, receipts_join) = spawn_writer(path.join("receipts.parquet"))?;
    let (mut inputs_sender, inputs_join) = spawn_writer(path.join("inputs.parquet"))?;
    let (mut outputs_sender, outputs_join) = spawn_writer(path.join("outputs.parquet"))?;

    let mut rx = client
        .stream_arrow(query, config)
        .await
        .context("start stream")?;

    while let Some(resp) = rx.recv().await {
        let resp = resp.context("get query response")?;

        log::trace!("got data up to block {}", resp.next_block);

        // Each future takes its sender by value and returns it on success so
        // the senders survive across loop iterations.
        let blocks_fut = async move {
            for batch in resp.data.blocks {
                blocks_sender
                    .send(batch)
                    .await
                    .context("write blocks chunk to parquet")?;
            }

            Ok::<_, anyhow::Error>(blocks_sender)
        };

        let txs_fut = async move {
            for batch in resp.data.transactions {
                transactions_sender
                    .send(batch)
                    .await
                    .context("write transactions chunk to parquet")?;
            }

            Ok::<_, anyhow::Error>(transactions_sender)
        };

        let receipts_fut = async move {
            for batch in resp.data.receipts {
                receipts_sender
                    .send(batch)
                    .await
                    .context("write receipts chunk to parquet")?;
            }

            Ok::<_, anyhow::Error>(receipts_sender)
        };

        let inputs_fut = async move {
            for batch in resp.data.inputs {
                inputs_sender
                    .send(batch)
                    .await
                    .context("write inputs chunk to parquet")?;
            }

            Ok::<_, anyhow::Error>(inputs_sender)
        };

        let outputs_fut = async move {
            for batch in resp.data.outputs {
                outputs_sender
                    .send(batch)
                    .await
                    .context("write outputs chunk to parquet")?;
            }

            Ok::<_, anyhow::Error>(outputs_sender)
        };

        // Forward all five tables to their writers concurrently; the first
        // failure aborts everything.
        (
            blocks_sender,
            transactions_sender,
            receipts_sender,
            inputs_sender,
            outputs_sender,
        ) = tokio::try_join!(blocks_fut, txs_fut, receipts_fut, inputs_fut, outputs_fut)?;
    }

    // Dropping the senders closes the channels, which signals the writer
    // tasks to flush their remaining row groups and write the file footers.
    std::mem::drop(blocks_sender);
    std::mem::drop(transactions_sender);
    std::mem::drop(receipts_sender);
    std::mem::drop(inputs_sender);
    std::mem::drop(outputs_sender);

    blocks_join
        .await
        .context("join blocks task")?
        .context("finish blocks file")?;
    transactions_join
        .await
        .context("join transactions task")?
        .context("finish transactions file")?;
    receipts_join
        .await
        .context("join receipts task")?
        .context("finish receipts file")?;
    inputs_join
        .await
        .context("join inputs task")?
        .context("finish inputs file")?;
    outputs_join
        .await
        .context("join outputs task")?
        .context("finish outputs file")?;

    Ok(())
}

/// Spawns a background task that writes every `ArrowBatch` received on the
/// returned channel to a parquet file at `path`.
///
/// The returned `JoinHandle` resolves with the writer's final result once the
/// sender side of the channel is dropped. Writer failures are logged here as
/// well as propagated through the handle.
fn spawn_writer(path: PathBuf) -> Result<(mpsc::Sender<ArrowBatch>, JoinHandle<Result<()>>)> {
    // Bounded channel so a slow writer applies backpressure to the producer.
    let (batch_tx, batch_rx) = mpsc::channel(64);

    let handle = tokio::task::spawn(async move {
        let result = run_writer(batch_rx, path).await;
        if let Err(e) = &result {
            log::error!("failed to run parquet writer: {:?}", e);
        }
        result
    });

    Ok((batch_tx, handle))
}

/// Drains Arrow batches from `rx` and writes them as a parquet file at `path`.
///
/// Batches are buffered until at least `ROW_GROUP_MAX_ROWS` rows have
/// accumulated (or the channel closes), then concatenated and encoded into a
/// single row group on the rayon pool. Up to `num_cpus` encode jobs are kept
/// in flight; finished row groups are written to the file sequentially from
/// this task.
///
/// The file is created lazily when the first row group completes, because the
/// Arrow schema is only known once data has arrived — an empty stream
/// therefore produces no file at all.
async fn run_writer(mut rx: mpsc::Receiver<ArrowBatch>, path: PathBuf) -> Result<()> {
    // Lazily constructs the parquet file writer for a given Arrow schema;
    // deferred because the schema is unknown until the first batch arrives.
    let make_writer = move |schema: &Schema| {
        let schema = schema.clone();
        let path = path.clone();
        async move {
            let write_options = polars_parquet::parquet::write::WriteOptions {
                write_statistics: true,
                version: polars_parquet::parquet::write::Version::V2,
            };

            // `.compat()` adapts tokio's async I/O to the futures-io traits
            // that `FileStreamer` expects.
            let file = tokio::io::BufWriter::new(
                tokio::fs::File::create(&path)
                    .await
                    .context("create parquet file")?,
            )
            .compat();

            let parquet_schema = to_parquet_schema(&schema).context("to parquet schema")?;

            let writer = FileStreamer::new(file, parquet_schema, write_options, None);

            Ok::<_, anyhow::Error>(writer)
        }
    };

    let mut writer = None;

    // Cap the number of concurrently-encoding row groups at the CPU count.
    let num_cpus = num_cpus::get();
    let mut encode_jobs = VecDeque::<EncodeFut>::with_capacity(num_cpus);

    let mut data = Vec::new();
    let mut total_rows = 0;
    loop {
        let mut stop = false;
        if let Some(batch) = rx.recv().await {
            total_rows += batch.chunk.len();
            data.push(batch);
        } else {
            // Channel closed: flush whatever is buffered, then exit the loop.
            stop = true;
        }

        if !data.is_empty() && (stop || total_rows >= ROW_GROUP_MAX_ROWS) {
            let batches = std::mem::take(&mut data);
            // Backpressure: before queueing another encode job, retire the
            // oldest one and write its row group to the file.
            if encode_jobs.len() >= num_cpus {
                let fut = encode_jobs.pop_front().unwrap();
                let (rg, schema) = fut
                    .await
                    .context("join prepare task")?
                    .context("prepare row group")?;
                if writer.is_none() {
                    writer = Some(make_writer(&schema).await.context("create writer")?);
                }
                writer
                    .as_mut()
                    .unwrap()
                    .write(rg)
                    .await
                    .context("write encoded row group to file")?;
            }

            total_rows = 0;
            let schema = batches[0].schema.clone();
            // Merge all buffered batches into one chunk so the whole buffer
            // becomes a single row group.
            let chunks = batches.into_iter().map(|b| b.chunk).collect::<Vec<_>>();
            let chunk = concat_chunks(chunks.as_slice()).context("concat chunks")?;
            let batch = ArrowBatch {
                chunk: Arc::new(chunk),
                schema: schema.clone(),
            };
            let batch = map_batch_to_binary_view(batch);

            // CPU-heavy encode/compress runs on the rayon pool, keeping this
            // task free to receive batches and write finished groups.
            let fut = rayon_async::spawn(move || {
                let rg = encode_row_group(
                    batch,
                    WriteOptions {
                        statistics: StatisticsOptions::default(),
                        version: polars_parquet::write::Version::V2,
                        compression: polars_parquet::write::CompressionOptions::Lz4Raw,
                        data_page_size: None,
                    },
                )
                .context("encode row group")?;

                Ok((rg, schema))
            });

            encode_jobs.push_back(fut);
        }

        if stop {
            break;
        }
    }

    // Drain the encode jobs still in flight and write their row groups.
    while let Some(fut) = encode_jobs.pop_front() {
        let (rg, schema) = fut
            .await
            .context("join prepare task")?
            .context("prepare row group")?;
        if writer.is_none() {
            writer = Some(make_writer(&schema).await.context("create writer")?);
        }
        writer
            .as_mut()
            .unwrap()
            .write(rg)
            .await
            .context("write encoded row group to file")?;
    }

    // Finalize the file (footer + metadata). `writer` is still `None` when no
    // data ever arrived, in which case there is nothing to finish.
    if let Some(writer) = writer.as_mut() {
        let _size = writer.end(None).await.context("write footer")?;
    }

    Ok(())
}

/// Receiver side of a row-group encode job spawned on the rayon pool: yields
/// the encoded, compressed column iterators for one row group together with
/// the Arrow schema they were built from (used to lazily create the writer).
type EncodeFut = tokio::sync::oneshot::Receiver<
    Result<(
        DynIter<
            'static,
            std::result::Result<
                DynStreamingIterator<'static, CompressedPage, PolarsError>,
                PolarsError,
            >,
        >,
        Arc<Schema>,
    )>,
>;

/// Encodes and compresses a single Arrow batch into one parquet row group.
///
/// Each Arrow field may map to multiple parquet leaf columns (nested types),
/// so the returned iterator can yield more columns than the schema has
/// fields. Every page is compressed eagerly so the resulting iterators own
/// their data and are `'static`.
///
/// Fix: encoding failures from `array_to_columns` were previously
/// `.unwrap()`ed inside the iterator chain and would panic the rayon worker;
/// they are now propagated as errors with context.
fn encode_row_group(
    batch: ArrowBatch,
    write_options: WriteOptions,
) -> Result<RowGroupIter<'static, PolarsError>> {
    let fields = batch
        .schema
        .fields
        .iter()
        .map(|field| to_parquet_type(field).context("map to parquet field"))
        .collect::<Result<Vec<_>>>()?;
    let encodings = batch
        .schema
        .fields
        .iter()
        .map(|f| transverse(&f.data_type, |_| Encoding::Plain))
        .collect::<Vec<_>>();

    let mut columns = Vec::new();
    for ((array, type_), encoding) in batch.chunk.arrays().iter().zip(fields).zip(encodings) {
        // Propagate encoding failures instead of panicking (was `.unwrap()`).
        let encoded_columns = array_to_columns(array, type_, write_options, &encoding)
            .context("encode array to parquet columns")?;

        for encoded_pages in encoded_columns {
            let pages = DynIter::new(
                encoded_pages
                    .into_iter()
                    .map(|x| x.map_err(|e| ParquetError::OutOfSpec(e.to_string()))),
            );

            // Compress every page up front so the streaming iterator owns
            // its compressed data; per-page errors are kept as `Err` items.
            let compressed_pages = pages
                .map(|page| {
                    let page = page?;
                    polars_parquet::write::compress(page, vec![], write_options.compression)
                        .map_err(PolarsError::from)
                })
                .collect::<Vec<_>>();

            columns.push(Ok(DynStreamingIterator::new(CompressedPageIter {
                data: compressed_pages.into_iter(),
                current: None,
            })));
        }
    }

    Ok(DynIter::new(columns.into_iter()))
}

/// Adapter that exposes an owned list of already-compressed pages through the
/// `FallibleStreamingIterator` interface required by the parquet writer.
struct CompressedPageIter {
    // Remaining pages (each may itself be an error from compression).
    data: std::vec::IntoIter<std::result::Result<CompressedPage, PolarsError>>,
    // Page currently exposed via `get`; `None` before the first `advance`
    // and after exhaustion.
    current: Option<CompressedPage>,
}

impl FallibleStreamingIterator for CompressedPageIter {
    type Item = CompressedPage;
    type Error = PolarsError;

    fn get(&self) -> Option<&Self::Item> {
        self.current.as_ref()
    }

    fn advance(&mut self) -> std::result::Result<(), Self::Error> {
        // `transpose` turns `Option<Result<T, E>>` into `Result<Option<T>, E>`,
        // so `?` surfaces a page error while `None` marks exhaustion.
        self.current = self.data.next().transpose()?;
        Ok(())
    }
}

/// Target number of rows per parquet row group; incoming batches are buffered
/// until at least this many rows have accumulated before a group is encoded.
const ROW_GROUP_MAX_ROWS: usize = 10_000;