hyperfuel-client 3.0.1

client library for hyperfuel
Documentation
use std::sync::Arc;

use polars_arrow::{
    array::{
        ArrayFromIter, BinaryArray, BinaryViewArray, MutableUtf8Array, Utf8Array, Utf8ViewArray,
    },
    datatypes::{ArrowDataType as DataType, ArrowSchema as Schema, Field},
};
use rayon::iter::{IndexedParallelIterator, IntoParallelRefIterator, ParallelIterator};

use crate::{ArrowBatch, ArrowChunk};

/// Encodes `bytes` as a hexadecimal string with a leading `0x`.
///
/// The returned string is always `2 + 2 * bytes.len()` bytes long, so an
/// empty slice produces just `"0x"`. The digits themselves are produced by
/// `faster_hex::hex_encode`.
///
/// # Panics
///
/// Panics if `faster_hex::hex_encode` reports an error for the destination
/// buffer (the buffer is sized as two bytes per input byte).
pub fn hex_encode_prefixed(bytes: &[u8]) -> String {
    // Two bytes for the prefix followed by two hex digits per input byte.
    let mut buf = vec![0u8; bytes.len() * 2 + 2];

    // Split once so the prefix and the digit area are written independently.
    let (prefix, digits) = buf.split_at_mut(2);
    prefix.copy_from_slice(b"0x");

    faster_hex::hex_encode(bytes, digits).unwrap();

    // SAFETY: `buf` holds the ASCII prefix `0x` followed by the output of
    // `faster_hex::hex_encode`, which hands its destination back as a `str`,
    // so the whole buffer is valid UTF-8.
    unsafe { String::from_utf8_unchecked(buf) }
}

pub fn hex_encode_batch<F: Fn(&[u8]) -> String + Send + Sync + Copy>(
    batch: &ArrowBatch,
    encode: F,
) -> ArrowBatch {
    let (fields, cols) = batch
        .chunk
        .columns()
        .par_iter()
        .zip(batch.schema.fields.par_iter())
        .map(|(col, field)| {
            let col = match col.data_type() {
                DataType::Binary => {
                    Box::new(hex_encode(col.as_any().downcast_ref().unwrap(), encode))
                }
                _ => col.clone(),
            };

            (
                Field::new(
                    field.name.clone(),
                    col.data_type().clone(),
                    field.is_nullable,
                ),
                col,
            )
        })
        .collect::<(Vec<_>, Vec<_>)>();

    ArrowBatch {
        chunk: ArrowChunk::new(cols).into(),
        schema: Schema::from(fields).into(),
    }
}

fn hex_encode<F: Fn(&[u8]) -> String + Copy>(
    input: &BinaryArray<i32>,
    encode: F,
) -> Utf8Array<i32> {
    let mut arr = MutableUtf8Array::<i32>::new();

    for buf in input.iter() {
        arr.push(buf.map(encode));
    }

    arr.into()
}

/// Converts the `Binary` and `Utf8` columns of `batch` into their view-array
/// counterparts and returns the result as a new batch.
///
/// * `DataType::Binary` columns are rebuilt as `BinaryViewArray`.
/// * `DataType::Utf8` columns are rebuilt as `Utf8ViewArray`.
/// * All other columns are cloned as they are.
///
/// Output fields reuse the input field names and nullability, and take their
/// data type from the output columns. The output schema's metadata is set to
/// its default value rather than copied from the input schema.
///
/// # Panics
///
/// Panics if a column reports `DataType::Binary` or `DataType::Utf8` but
/// cannot be downcast to `BinaryArray<i32>` / `Utf8Array<i32>` respectively.
pub fn map_batch_to_binary_view(batch: ArrowBatch) -> ArrowBatch {
    let arrays = batch.chunk.arrays();

    let mut cols = Vec::with_capacity(arrays.len());
    for col in arrays {
        let converted = match col.data_type() {
            DataType::Binary => {
                let src = col.as_any().downcast_ref::<BinaryArray<i32>>().unwrap();
                BinaryViewArray::arr_from_iter(src.iter()).boxed()
            }
            DataType::Utf8 => {
                let src = col.as_any().downcast_ref::<Utf8Array<i32>>().unwrap();
                Utf8ViewArray::arr_from_iter(src.iter()).boxed()
            }
            _ => col.clone(),
        };
        cols.push(converted);
    }

    // Pair each output column with its original field to rebuild the schema.
    let fields: Vec<_> = batch
        .schema
        .fields
        .iter()
        .zip(cols.iter())
        .map(|(field, col)| {
            let name = field.name.clone();
            let data_type = col.data_type().clone();
            Field::new(name, data_type, field.is_nullable)
        })
        .collect();

    let schema = Schema {
        fields,
        metadata: Default::default(),
    };

    ArrowBatch {
        chunk: ArrowChunk::new(cols).into(),
        schema: schema.into(),
    }
}