polite 0.1.3

rusqlite × Polars bridge library
Documentation
//! Destination implementation for Arrow and Polars.

mod arrow_assoc;
mod errors;
mod funcs;
pub mod typesystem;

pub use self::errors::{ArrowDestinationError, Result};
pub use self::typesystem::ArrowTypeSystem;
use super::{Consume, Destination, DestinationPartition};
use crate::connectorx::constants::RECORD_BATCH_SIZE;
use crate::connectorx::data_order::DataOrder;
use crate::connectorx::typesystem::{Realize, TypeAssoc, TypeSystem};
use anyhow::anyhow;
use arrow::{datatypes::Schema, record_batch::RecordBatch};
use arrow_assoc::ArrowAssoc;
use fehler::{throw, throws};
use funcs::{FFinishBuilder, FNewBuilder, FNewField};
use itertools::Itertools;
use std::{
    any::Any,
    sync::{Arc, Mutex},
};

// #[cfg(feature = "dst_polars")]
use {
    arrow::ffi::to_ffi,
    polars::prelude::{concat, DataFrame, IntoLazy, PlSmallStr, Series, UnionArgs},
    polars_arrow::ffi::{import_array_from_c, import_field_from_c},
    std::iter::FromIterator,
    std::mem::transmute,
};

/// A type-erased column builder; it is downcast to the concrete builder type
/// of the value being appended before use.
type Builder = Box<dyn Any + Send>;
/// The builders of one in-progress batch, indexed by column position.
type Builders = Vec<Builder>;

/// Destination that collects written rows as Arrow `RecordBatch`es.
///
/// The finished batches live behind a shared `Arc<Mutex<..>>` so that every
/// partition writer created by `partition` can push into the same list.
pub struct ArrowDestination {
    /// Type of each column, in column order (filled in by `allocate`).
    schema: Vec<ArrowTypeSystem>,
    /// Name of each column, in column order (filled in by `allocate`).
    names: Vec<String>,
    /// Finished record batches, shared with the partition writers.
    data: Arc<Mutex<Vec<RecordBatch>>>,
    /// Arrow schema built from `schema` and `names` (empty until `allocate` runs).
    arrow_schema: Arc<Schema>,
    /// Number of rows a partition writer buffers before it emits a batch.
    batch_size: usize,
}

impl Default for ArrowDestination {
    /// Builds an unallocated destination: no columns, no batches, an empty
    /// Arrow schema, and the crate-wide `RECORD_BATCH_SIZE` as batch size.
    fn default() -> Self {
        let data = Arc::new(Mutex::new(Vec::new()));
        let arrow_schema = Arc::new(Schema::empty());
        Self {
            schema: Vec::new(),
            names: Vec::new(),
            data,
            arrow_schema,
            batch_size: RECORD_BATCH_SIZE,
        }
    }
}

impl ArrowDestination {
    pub fn new() -> Self {
        Self::default()
    }

    pub fn new_with_batch_size(batch_size: usize) -> Self {
        ArrowDestination {
            schema: vec![],
            names: vec![],
            data: Arc::new(Mutex::new(vec![])),
            arrow_schema: Arc::new(Schema::empty()),
            batch_size,
        }
    }
}

impl Destination for ArrowDestination {
    const DATA_ORDERS: &'static [DataOrder] = &[DataOrder::ColumnMajor, DataOrder::RowMajor];
    type TypeSystem = ArrowTypeSystem;
    type Partition<'a> = ArrowPartitionWriter;
    type Error = ArrowDestinationError;

    /// The row count is not needed up front; `allocate` ignores it.
    fn needs_count(&self) -> bool {
        false
    }

    /// Records the column names and types and derives the Arrow schema.
    ///
    /// Only `DataOrder::RowMajor` is accepted; any other order is rejected
    /// with `ConnectorXError::UnsupportedDataOrder`. `_nrow` is unused.
    #[throws(ArrowDestinationError)]
    fn allocate<S: AsRef<str>>(
        &mut self,
        _nrow: usize,
        names: &[S],
        schema: &[ArrowTypeSystem],
        data_order: DataOrder,
    ) {
        // todo: support colmajor
        match data_order {
            DataOrder::RowMajor => {}
            other => {
                throw!(crate::connectorx::errors::ConnectorXError::UnsupportedDataOrder(other))
            }
        }

        // Remember the metadata handed in by the caller.
        self.schema = schema.to_vec();
        self.names = names.iter().map(|name| name.as_ref().to_owned()).collect();

        // One Arrow field per (type, name) pair; `zip_eq` panics if the two
        // lists differ in length.
        let mut fields = Vec::with_capacity(self.schema.len());
        for (&dt, header) in self.schema.iter().zip_eq(&self.names) {
            let make_field = Realize::<FNewField>::realize(dt)?;
            fields.push(make_field(header.as_str()));
        }
        self.arrow_schema = Arc::new(Schema::new(fields));
    }

    /// Creates `counts` partition writers that all push into this
    /// destination's shared batch list.
    #[throws(ArrowDestinationError)]
    fn partition(&mut self, counts: usize) -> Vec<Self::Partition<'_>> {
        (0..counts)
            .map(|_| {
                ArrowPartitionWriter::new(
                    self.schema.clone(),
                    Arc::clone(&self.data),
                    Arc::clone(&self.arrow_schema),
                    self.batch_size,
                )
            })
            .collect::<Result<Vec<_>>>()?
    }

    /// Column types in column order.
    fn schema(&self) -> &[ArrowTypeSystem] {
        &self.schema
    }
}

impl ArrowDestination {
    /// Consumes the destination and returns every record batch it holds.
    ///
    /// # Errors
    ///
    /// Fails with "Partitions are not freed" while any other handle to the
    /// shared batch list (such as a live partition writer) still exists, and
    /// with "mutex poisoned" if the guarding mutex was poisoned.
    #[throws(ArrowDestinationError)]
    pub fn arrow(self) -> Vec<RecordBatch> {
        let lock = Arc::try_unwrap(self.data).map_err(|_| anyhow!("Partitions are not freed"))?;
        lock.into_inner()
            .map_err(|e| anyhow!("mutex poisoned {}", e))?
    }

    // #[cfg(feature = "dst_polars")]
    /// Consumes the destination and returns its contents as one Polars
    /// `DataFrame`.
    ///
    /// Each record batch is exported column by column through the Arrow FFI
    /// structs, re-imported as a Polars array, wrapped in a `Series`, and the
    /// resulting per-batch frames are concatenated.
    ///
    /// # Errors
    ///
    /// Propagates failures from [`Self::arrow`], from the Arrow FFI export,
    /// from the Polars FFI import, from `Series` construction, and from the
    /// final concat/collect.
    ///
    /// NOTE(review): when no batch was ever written the list handed to
    /// `concat` is empty; confirm how Polars treats that case.
    #[throws(ArrowDestinationError)]
    pub fn polars(self) -> DataFrame {
        // Convert to arrow first
        let rbs = self.arrow()?;

        // One LazyFrame per record batch.
        let mut lf_vec = Vec::with_capacity(rbs.len());

        for chunk in rbs {
            // Fetch the schema once per batch rather than once per column.
            let schema = chunk.schema();
            let mut columns = Vec::with_capacity(chunk.num_columns());

            // Arrow stores data by column, so each column is handed over on its own.
            for (col, col_field) in chunk.columns().iter().zip(schema.fields().iter()) {
                // Convert to ArrayData (arrow-rs)
                let array = col.to_data();

                // Export through the arrow-rs FFI structs. A failure here is
                // returned to the caller instead of panicking.
                let (out_array, out_schema) = to_ffi(&array)?;

                // SAFETY: assumes `arrow::ffi::FFI_ArrowSchema` and
                // `polars_arrow::ffi::ArrowSchema` have the same layout (both
                // mirror the Arrow C Data Interface struct); `out_schema`
                // outlives the borrow.
                let field = unsafe {
                    import_field_from_c(transmute::<
                        &arrow::ffi::FFI_ArrowSchema,
                        &polars_arrow::ffi::ArrowSchema,
                    >(&out_schema))
                }?;

                // SAFETY: same layout assumption for the array struct;
                // ownership of `out_array` moves into the imported Polars array.
                let data = unsafe {
                    import_array_from_c(
                        transmute::<arrow::ffi::FFI_ArrowArray, polars_arrow::ffi::ArrowArray>(
                            out_array,
                        ),
                        field.dtype().clone(),
                    )
                }?;

                // Create Polars series from arrow column
                columns.push(Series::from_arrow(
                    PlSmallStr::from(col_field.name()),
                    data,
                )?);
            }

            // Create DataFrame from the columns
            lf_vec.push(DataFrame::from_iter(columns).lazy());
        }

        // Concat the chunks
        let union_args = UnionArgs::default();
        concat(lf_vec, union_args)?.collect()?
    }

    /// Removes and returns the most recently pushed record batch, or `None`
    /// when the list is empty.
    ///
    /// # Errors
    ///
    /// Fails if the mutex guarding the batch list was poisoned.
    #[throws(ArrowDestinationError)]
    pub fn record_batch(&mut self) -> Option<RecordBatch> {
        let mut guard = self
            .data
            .lock()
            .map_err(|e| anyhow!("mutex poisoned {}", e))?;
        guard.pop()
    }

    /// Returns a zero-row record batch carrying this destination's Arrow schema.
    pub fn empty_batch(&self) -> RecordBatch {
        RecordBatch::new_empty(Arc::clone(&self.arrow_schema))
    }

    /// Returns a shared handle to the Arrow schema.
    pub fn arrow_schema(&self) -> Arc<Schema> {
        Arc::clone(&self.arrow_schema)
    }

    /// Column names in column order.
    pub fn names(&self) -> &[String] {
        self.names.as_slice()
    }
}

/// Writer for one partition: buffers values in per-column builders and pushes
/// a finished `RecordBatch` into the shared list on every flush.
pub struct ArrowPartitionWriter {
    /// Type of each column, in column order.
    schema: Vec<ArrowTypeSystem>,
    /// Builders of the batch being filled; `None` after a flush took them.
    builders: Option<Builders>,
    /// Number of complete rows written into the current builders.
    current_row: usize,
    /// Column that the next consumed value goes into.
    current_col: usize,
    /// Finished record batches, shared with the owning destination.
    data: Arc<Mutex<Vec<RecordBatch>>>,
    /// Arrow schema used for every emitted batch.
    arrow_schema: Arc<Schema>,
    /// Row count at which the current builders are flushed.
    batch_size: usize,
}

// unsafe impl Sync for ArrowPartitionWriter {}

impl ArrowPartitionWriter {
    /// Creates a writer positioned at row 0 / column 0 with freshly allocated
    /// builders.
    ///
    /// # Errors
    ///
    /// Fails if a builder cannot be created for one of the column types.
    #[throws(ArrowDestinationError)]
    fn new(
        schema: Vec<ArrowTypeSystem>,
        data: Arc<Mutex<Vec<RecordBatch>>>,
        arrow_schema: Arc<Schema>,
        batch_size: usize,
    ) -> Self {
        let mut pw = ArrowPartitionWriter {
            schema,
            builders: None,
            current_row: 0,
            current_col: 0,
            data,
            arrow_schema,
            batch_size,
        };
        pw.allocate()?;
        pw
    }

    /// Replaces the current builders with one new builder per column, each
    /// created with `batch_size` as its size argument.
    #[throws(ArrowDestinationError)]
    fn allocate(&mut self) {
        let builders = self
            .schema
            .iter()
            .map(|dt| Ok(Realize::<FNewBuilder>::realize(*dt)?(self.batch_size)))
            .collect::<Result<Vec<_>>>()?;
        self.builders.replace(builders);
    }

    /// Finishes the current builders into a `RecordBatch`, pushes it onto the
    /// shared batch list and resets the row/column cursors.
    ///
    /// The builders are taken out of `self`, so the writer holds none until
    /// `allocate` runs again.
    ///
    /// # Errors
    ///
    /// Fails if there are no builders to flush (reported as an error rather
    /// than a panic, since this function is already fallible), if a builder
    /// cannot be finished, if the columns do not form a valid batch for the
    /// schema, or if the shared mutex was poisoned.
    #[throws(ArrowDestinationError)]
    fn flush(&mut self) {
        let builders = self
            .builders
            .take()
            .ok_or_else(|| anyhow!("arrow builder is none when flush!"))?;
        let columns = builders
            .into_iter()
            .zip(self.schema.iter())
            .map(|(builder, &dt)| Realize::<FFinishBuilder>::realize(dt)?(builder))
            .collect::<std::result::Result<Vec<_>, crate::connectorx::errors::ConnectorXError>>()?;
        let rb = RecordBatch::try_new(Arc::clone(&self.arrow_schema), columns)?;
        {
            // Scope the guard so the lock is released before the cursors reset.
            let mut guard = self
                .data
                .lock()
                .map_err(|e| anyhow!("mutex poisoned {}", e))?;
            guard.push(rb);
        }

        self.current_row = 0;
        self.current_col = 0;
    }
}

impl<'a> DestinationPartition<'a> for ArrowPartitionWriter {
    type TypeSystem = ArrowTypeSystem;
    type Error = ArrowDestinationError;

    /// Flushes whatever is still buffered; a no-op when no builders are held.
    #[throws(ArrowDestinationError)]
    fn finalize(&mut self) {
        let has_pending_builders = self.builders.is_some();
        if has_pending_builders {
            self.flush()?;
        }
    }

    /// Reports the row index within the current batch; `_n` is ignored.
    #[throws(ArrowDestinationError)]
    fn aquire_row(&mut self, _n: usize) -> usize {
        let row = self.current_row;
        row
    }

    /// Number of columns this writer expects per row.
    fn ncols(&self) -> usize {
        let column_types = &self.schema;
        column_types.len()
    }
}

/// Appends a single value of type `T` to the column the writer is currently
/// positioned at.
impl<'a, T> Consume<T> for ArrowPartitionWriter
where
    T: TypeAssoc<<Self as DestinationPartition<'a>>::TypeSystem> + ArrowAssoc + 'static,
{
    type Error = ArrowDestinationError;

    /// Writes `value` into the current column and advances the cursor.
    ///
    /// The column cursor moves on (wrapping to 0 after the last column)
    /// before the type check runs. Once it wraps, a row is complete: the row
    /// counter is incremented and, when it reaches `batch_size`, the builders
    /// are flushed into a record batch and reallocated.
    ///
    /// # Errors
    ///
    /// Fails if `T` does not match the column's declared type, if the
    /// column's builder cannot be downcast to `T::Builder`, or if appending,
    /// flushing or allocating fails.
    #[throws(ArrowDestinationError)]
    fn consume(&mut self, value: T) {
        // Remember the target column, then advance the cursor for the next call.
        let col = self.current_col;
        self.current_col = (self.current_col + 1) % self.ncols();
        self.schema[col].check::<T>()?;

        // Loop so that a missing builder set can be allocated and the append retried.
        loop {
            match &mut self.builders {
                Some(builders) => {
                    <T as ArrowAssoc>::append(
                        builders[col]
                            .downcast_mut::<T::Builder>()
                            .ok_or_else(|| anyhow!("cannot cast arrow builder for append"))?,
                        value,
                    )?;
                    break;
                }
                None => self.allocate()?, // allocate if builders are not initialized
            }
        }

        // The cursor wrapped to column 0, so a full row has just been written;
        // flush once the batch holds `batch_size` rows.
        if self.current_col == 0 {
            self.current_row += 1;
            if self.current_row >= self.batch_size {
                self.flush()?;
                self.allocate()?;
            }
        }
    }
}