polars-core 0.53.0

Core of the Polars DataFrame library
Documentation
use std::io::{Read, Seek};
use std::sync::Arc;

use arrow::datatypes::Metadata;
use arrow::io::ipc::read::{StreamReader, StreamState, read_stream_metadata};
use arrow::io::ipc::write::WriteOptions;
use polars_error::{PolarsResult, polars_err, to_compute_err};
use polars_utils::format_pl_smallstr;
use polars_utils::pl_serialize::deserialize_map_bytes;
use polars_utils::pl_str::PlSmallStr;
use serde::de::Error;
use serde::*;

use crate::chunked_array::flags::StatisticsFlags;
use crate::config;
use crate::frame::chunk_df_for_writing;
use crate::prelude::{CompatLevel, DataFrame, SchemaExt};
use crate::schema::Schema;
use crate::utils::accumulate_dataframes_vertical_unchecked;

/// Custom schema-metadata key under which `DataFrame::serialize_into_writer` stores the
/// per-column statistics flags (a JSON array of `u32` bit patterns, one per column), and which
/// `DataFrame::deserialize_from_reader` looks up to restore them.
const FLAGS_KEY: PlSmallStr = PlSmallStr::from_static("_PL_FLAGS");

impl DataFrame {
    pub fn serialize_into_writer(&mut self, writer: &mut dyn std::io::Write) -> PolarsResult<()> {
        let schema = self.schema();

        if schema.iter_values().any(|x| x.is_object()) {
            return Err(polars_err!(
                ComputeError:
                "serializing data of type Object is not supported",
            ));
        }

        let mut ipc_writer =
            arrow::io::ipc::write::StreamWriter::new(writer, WriteOptions { compression: None });

        ipc_writer.set_custom_schema_metadata(Arc::new(Metadata::from_iter(
            self.columns().iter().map(|c| {
                (
                    format_pl_smallstr!("{}{}", FLAGS_KEY, c.name()),
                    PlSmallStr::from(c.get_flags().bits().to_string()),
                )
            }),
        )));

        ipc_writer.set_custom_schema_metadata(Arc::new(Metadata::from([(
            FLAGS_KEY,
            serde_json::to_string(
                &self
                    .columns()
                    .iter()
                    .map(|s| s.get_flags().bits())
                    .collect::<Vec<u32>>(),
            )
            .map_err(to_compute_err)?
            .into(),
        )])));

        ipc_writer.start(&schema.to_arrow(CompatLevel::newest()), None)?;

        for batch in chunk_df_for_writing(self, 512 * 512)?.iter_chunks(CompatLevel::newest(), true)
        {
            ipc_writer.write(&batch, None)?;
        }

        ipc_writer.finish()?;

        Ok(())
    }

    pub fn serialize_to_bytes(&mut self) -> PolarsResult<Vec<u8>> {
        let mut buf = vec![];
        self.serialize_into_writer(&mut buf)?;

        Ok(buf)
    }

    pub fn deserialize_from_reader<T: Read + Seek>(reader: &mut T) -> PolarsResult<Self> {
        let mut md = read_stream_metadata(reader)?;
        let pl_schema = Schema::from_arrow_schema(&md.schema);

        let custom_metadata = md.custom_schema_metadata.take();

        let reader = StreamReader::new(reader, md, None);
        let dfs = reader
            .into_iter()
            .map_while(|batch| match batch {
                Ok(StreamState::Some(batch)) => Some(Ok(DataFrame::from(batch))),
                Ok(StreamState::Waiting) => None,
                Err(e) => Some(Err(e)),
            })
            .collect::<PolarsResult<Vec<DataFrame>>>()?;

        if dfs.is_empty() {
            return Ok(DataFrame::empty_with_schema(&pl_schema));
        }

        let mut df = accumulate_dataframes_vertical_unchecked(dfs);

        // Set custom metadata (fallible)
        (|| {
            let custom_metadata = custom_metadata?;
            let flags = custom_metadata.get(&FLAGS_KEY)?;

            let flags: PolarsResult<Vec<u32>> = serde_json::from_str(flags).map_err(to_compute_err);

            let verbose = config::verbose();

            if let Err(e) = &flags {
                if verbose {
                    eprintln!("DataFrame::read_ipc: Error parsing metadata flags: {e}");
                }
            }

            let flags = flags.ok()?;

            if flags.len() != df.width() {
                if verbose {
                    eprintln!(
                        "DataFrame::read_ipc: Metadata flags width mismatch: {} != {}",
                        flags.len(),
                        df.width()
                    );
                }

                return None;
            }

            let mut n_set = 0;

            for (c, v) in unsafe { df.columns_mut_retain_schema() }
                .iter_mut()
                .zip(flags)
            {
                if let Some(flags) = StatisticsFlags::from_bits(v) {
                    n_set += c.set_flags(flags) as usize;
                }
            }

            if verbose {
                eprintln!(
                    "DataFrame::read_ipc: Loaded metadata for {} / {} columns",
                    n_set,
                    df.width()
                );
            }

            Some(())
        })();

        Ok(df)
    }
}

impl Serialize for DataFrame {
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: Serializer,
    {
        use serde::ser::Error;

        let mut bytes = vec![];
        self.clone()
            .serialize_into_writer(&mut bytes)
            .map_err(S::Error::custom)?;

        serializer.serialize_bytes(bytes.as_slice())
    }
}

impl<'de> Deserialize<'de> for DataFrame {
    /// Decodes a frame from the byte payload written by `Serialize for DataFrame`.
    ///
    /// The bytes are read as an Arrow IPC stream through
    /// [`DataFrame::deserialize_from_reader`]. A Polars error from that step is converted
    /// into `D::Error` with `serde::de::Error::custom`.
    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
    where
        D: Deserializer<'de>,
    {
        let decoded = deserialize_map_bytes(deserializer, |bytes| {
            let slice: &[u8] = bytes.as_ref();
            Self::deserialize_from_reader(&mut std::io::Cursor::new(slice))
        })?;

        decoded.map_err(D::Error::custom)
    }
}

#[cfg(feature = "dsl-schema")]
impl schemars::JsonSchema for DataFrame {
    /// The schema is named plainly `DataFrame`.
    fn schema_name() -> std::borrow::Cow<'static, str> {
        std::borrow::Cow::Borrowed("DataFrame")
    }

    /// Module-qualified identifier, so it cannot clash with another type named `DataFrame`.
    fn schema_id() -> std::borrow::Cow<'static, str> {
        let id: &'static str = concat!(module_path!(), "::", "DataFrame");
        id.into()
    }

    /// A serialized `DataFrame` is an opaque byte payload, so it is described with the
    /// same JSON schema as `Vec<u8>`.
    fn json_schema(generator: &mut schemars::SchemaGenerator) -> schemars::Schema {
        <Vec<u8> as schemars::JsonSchema>::json_schema(generator)
    }
}