floe-core 0.3.6

Core library for Floe, a YAML-driven technical ingestion tool.
Documentation
use std::sync::Arc;

use arrow::array::{
    ArrayRef, BooleanArray, Date32Array, Float32Array, Float64Array, Int16Array, Int32Array,
    Int64Array, Int8Array, NullArray, StringArray, Time64MicrosecondArray, Time64NanosecondArray,
    TimestampMicrosecondArray, UInt16Array, UInt32Array, UInt64Array, UInt8Array,
};
use polars::prelude::{DataType, Series, TimeUnit};

use crate::errors::RunError;
use crate::FloeResult;

#[derive(Debug, Clone, Copy)]
pub(crate) enum ArrowTimeEncoding {
    Nanoseconds,
    Microseconds,
}

#[derive(Debug, Clone, Copy)]
pub(crate) struct ArrowConversionOptions {
    pub(crate) upcast_i8_i16_to_i32: bool,
    pub(crate) time_encoding: ArrowTimeEncoding,
}

pub(crate) fn series_to_arrow_array<F>(
    series: &Series,
    options: ArrowConversionOptions,
    unsupported_dtype: F,
) -> FloeResult<ArrayRef>
where
    F: Fn(&DataType) -> RunError,
{
    let array: ArrayRef = match series.dtype() {
        DataType::String => Arc::new(StringArray::from_iter(series.str()?)),
        DataType::Boolean => Arc::new(BooleanArray::from_iter(series.bool()?)),
        DataType::Int8 => {
            let values = series.i8()?;
            if options.upcast_i8_i16_to_i32 {
                Arc::new(Int32Array::from_iter(
                    values.into_iter().map(|opt| opt.map(i32::from)),
                ))
            } else {
                Arc::new(Int8Array::from_iter(values))
            }
        }
        DataType::Int16 => {
            let values = series.i16()?;
            if options.upcast_i8_i16_to_i32 {
                Arc::new(Int32Array::from_iter(
                    values.into_iter().map(|opt| opt.map(i32::from)),
                ))
            } else {
                Arc::new(Int16Array::from_iter(values))
            }
        }
        DataType::Int32 => Arc::new(Int32Array::from_iter(series.i32()?)),
        DataType::Int64 => Arc::new(Int64Array::from_iter(series.i64()?)),
        DataType::UInt8 => Arc::new(UInt8Array::from_iter(series.u8()?)),
        DataType::UInt16 => Arc::new(UInt16Array::from_iter(series.u16()?)),
        DataType::UInt32 => Arc::new(UInt32Array::from_iter(series.u32()?)),
        DataType::UInt64 => Arc::new(UInt64Array::from_iter(series.u64()?)),
        DataType::Float32 => Arc::new(Float32Array::from_iter(series.f32()?)),
        DataType::Float64 => Arc::new(Float64Array::from_iter(series.f64()?)),
        DataType::Date => {
            let values = series.date()?;
            Arc::new(Date32Array::from_iter(values.phys.iter()))
        }
        DataType::Datetime(unit, _) => {
            let values = series.datetime()?;
            let micros = values.phys.iter().map(|opt| match unit {
                TimeUnit::Milliseconds => opt.map(|value| value.saturating_mul(1_000)),
                TimeUnit::Microseconds => opt,
                TimeUnit::Nanoseconds => opt.map(|value| value / 1_000),
            });
            Arc::new(TimestampMicrosecondArray::from_iter(micros))
        }
        DataType::Time => {
            let values = series.time()?;
            match options.time_encoding {
                ArrowTimeEncoding::Nanoseconds => {
                    Arc::new(Time64NanosecondArray::from_iter(values.phys.iter()))
                }
                ArrowTimeEncoding::Microseconds => {
                    let micros = values.phys.iter().map(|opt| opt.map(|value| value / 1_000));
                    Arc::new(Time64MicrosecondArray::from_iter(micros))
                }
            }
        }
        DataType::Null => Arc::new(NullArray::new(series.len())),
        dtype => return Err(Box::new(unsupported_dtype(dtype))),
    };
    Ok(array)
}