//! polars-io 0.54.1
//!
//! IO-related logic for the Polars DataFrame library.
use std::io::{Read, Write};
use std::sync::Arc;

use arrow::record_batch::RecordBatch;
use polars_core::prelude::*;

use crate::options::RowIndex;
#[cfg(any(feature = "ipc", feature = "avro", feature = "ipc_streaming",))]
use crate::predicates::PhysicalIoExpr;

/// Common interface for readers that deserialize a source implementing
/// [`Read`] into a [`DataFrame`].
pub trait SerReader<R: Read> {
    /// Build a reader that will consume `reader`.
    fn new(reader: R) -> Self;

    /// Request that every column of the resulting [`DataFrame`] be stored as
    /// one contiguous chunk instead of several.
    ///
    /// The default implementation ignores `_rechunk` and hands `self` back
    /// unchanged; implementors may override it.
    #[must_use]
    fn set_rechunk(self, _rechunk: bool) -> Self
    where
        Self: Sized,
    {
        self
    }

    /// Consume the reader and parse its input into a [`DataFrame`].
    ///
    /// # Errors
    /// Returns an error if the implementation fails to read or parse its input.
    fn finish(self) -> PolarsResult<DataFrame>;
}

/// Common interface for writers that serialize a [`DataFrame`] into a sink
/// implementing [`Write`].
pub trait SerWriter<W: Write> {
    /// Build a writer that will emit its output into `writer`.
    fn new(writer: W) -> Self
    where
        Self: Sized;

    /// Serialize `df` into the underlying sink.
    ///
    /// NOTE(review): `df` is taken by `&mut`, presumably so implementations can
    /// adjust it (e.g. rechunk) before writing — confirm per implementation.
    ///
    /// # Errors
    /// Returns an error if the implementation fails to serialize or write `df`.
    fn finish(&mut self, df: &mut DataFrame) -> PolarsResult<()>;
}

/// A source of Arrow [`RecordBatch`]es that is pulled one batch at a time.
pub trait ArrowReader {
    /// Fetch the next batch, or `Ok(None)` once the source has no more batches.
    fn next_record_batch(&mut self) -> PolarsResult<Option<RecordBatch>>;
}

/// Drain `reader` into a single [`DataFrame`].
///
/// For every record batch pulled from `reader`:
/// 1. if `row_index` is set, a row-index column named `rc.name` is added, whose
///    values start at `rc.offset` plus the number of source rows read before
///    this batch;
/// 2. if `n_rows` is set, the batch is truncated so that at most `n_rows`
///    *source* rows are kept in total, and reading stops afterwards;
/// 3. if `predicate` is set, the batch is filtered with it.
///
/// The `n_rows` limit counts rows read from the source (before filtering),
/// which keeps it consistent with the row-index offsets.
///
/// If no batch was produced, an empty frame with the schema derived from
/// `arrow_schema` is returned. If `rechunk` is true, the result is rechunked.
///
/// # Errors
/// Propagates errors from `reader.next_record_batch`, from predicate
/// evaluation, and from filtering.
///
/// # Panics
/// Panics if `predicate` evaluates to a non-boolean series.
#[cfg(any(feature = "ipc", feature = "avro", feature = "ipc_streaming",))]
pub(crate) fn finish_reader<R: ArrowReader>(
    mut reader: R,
    rechunk: bool,
    n_rows: Option<usize>,
    predicate: Option<Arc<dyn PhysicalIoExpr>>,
    arrow_schema: &ArrowSchema,
    row_index: Option<RowIndex>,
) -> PolarsResult<DataFrame> {
    use polars_core::utils::accumulate_dataframes_vertical_unchecked;

    // Rows pulled from `reader` so far, counted before any filtering.
    let mut num_rows = 0;
    let mut parsed_dfs = Vec::with_capacity(1024);

    while let Some(batch) = reader.next_record_batch()? {
        let rows_before_batch = num_rows;
        num_rows += batch.len();
        let mut df = DataFrame::from(batch);

        if let Some(rc) = &row_index {
            // NOTE(review): the exact safety contract of `with_row_index_mut` is
            // defined elsewhere (presumably: `rc.name` must not collide with an
            // existing column) — confirm and turn this into a `// SAFETY:` note.
            unsafe {
                df.with_row_index_mut(
                    rc.name.clone(),
                    Some(rows_before_batch as IdxSize + rc.offset),
                )
            };
        }

        // Apply the row limit to *source* rows before filtering. Earlier
        // iterations did not hit the limit, so `rows_before_batch < n` (or both
        // are 0) and `n - rows_before_batch` cannot underflow; it is also
        // `<= df.height()` because `num_rows >= n`.
        let limit_reached = match n_rows {
            Some(n) if num_rows >= n => {
                let len = n - rows_before_batch;
                if polars_core::config::verbose() {
                    eprintln!(
                        "sliced off {} rows of the 'DataFrame'. These lines were read because they were in a single chunk.",
                        df.height().saturating_sub(len)
                    )
                }
                df = df.slice(0, len);
                true
            },
            _ => false,
        };

        if let Some(predicate) = &predicate {
            let s = predicate.evaluate_io(&df)?;
            let mask = s.bool().expect("filter predicates was not of type boolean");
            df = df.filter(mask)?;
        }

        parsed_dfs.push(df);
        if limit_reached {
            break;
        }
    }

    let mut df = if parsed_dfs.is_empty() {
        DataFrame::empty_with_schema(&Schema::from_arrow_schema(arrow_schema))
    } else {
        // At least one batch was read: stack them vertically.
        accumulate_dataframes_vertical_unchecked(parsed_dfs)
    };

    if rechunk {
        df.rechunk_mut_par();
    }
    Ok(df)
}

/// Convert a Polars [`Schema`] into an [`ArrowSchema`] at the given
/// `compat_level`, field by field and in schema order.
///
/// # Errors
/// When the `object` feature is enabled, returns a `ComputeError` for the
/// first field whose dtype is `Object`; `_file_name` is used only in that
/// error message.
pub fn schema_to_arrow_checked(
    schema: &Schema,
    compat_level: CompatLevel,
    _file_name: &str,
) -> PolarsResult<ArrowSchema> {
    let mut converted = Vec::new();

    for polars_field in schema.iter_fields() {
        // Object columns have no Arrow representation; reject them up front.
        #[cfg(feature = "object")]
        {
            polars_ensure!(
                !matches!(polars_field.dtype(), DataType::Object(_)),
                ComputeError: "cannot write 'Object' datatype to {}",
                _file_name
            );
        }

        let arrow_field = polars_field
            .dtype()
            .to_arrow_field(polars_field.name().clone(), compat_level);
        converted.push((arrow_field.name.clone(), arrow_field));
    }

    Ok(converted.into_iter().collect())
}