polars-stream 0.53.0

Private crate for the streaming execution engine for the Polars DataFrame library
Documentation
use arrow::array::{Array, StructArray};
use arrow::datatypes::{ArrowDataType, Field as ArrowField};
use polars_core::frame::DataFrame;
use polars_core::prelude::CompatLevel;
use polars_error::PolarsResult;

use crate::async_executor::{self, TaskPriority};
use crate::async_primitives::connector;
use crate::nodes::io_sinks::components::par_utils::rechunk_par;
use crate::nodes::io_sinks::components::sink_morsel::{SinkMorsel, SinkMorselPermit};

pub struct MorselSerializerPipeline {
    pub morsel_rx: connector::Receiver<SinkMorsel>,
    pub filled_serializer_tx: tokio::sync::mpsc::Sender<(
        async_executor::AbortOnDropHandle<PolarsResult<MorselSerializer>>,
        SinkMorselPermit,
    )>,
    pub reuse_serializer_rx: tokio::sync::mpsc::Receiver<MorselSerializer>,
    pub max_serializers: usize,
    pub base_allocation_size: usize,
}

impl MorselSerializerPipeline {
    pub async fn run(self) {
        let MorselSerializerPipeline {
            mut morsel_rx,
            filled_serializer_tx,
            mut reuse_serializer_rx,
            max_serializers,
            base_allocation_size,
        } = self;

        let mut num_created_serializers: usize = 0;

        while let Ok(morsel) = morsel_rx.recv().await {
            let morsel_serializer: MorselSerializer =
                if let Ok(serializer) = reuse_serializer_rx.try_recv() {
                    serializer
                } else if num_created_serializers < max_serializers {
                    num_created_serializers += 1;
                    MorselSerializer {
                        serialized_data: Vec::with_capacity(base_allocation_size),
                    }
                } else if let Some(serializer) = reuse_serializer_rx.recv().await {
                    serializer
                } else {
                    break;
                };

            let (df, morsel_permit) = morsel.into_inner();

            let handle = async_executor::AbortOnDropHandle::new(async_executor::spawn(
                TaskPriority::High,
                morsel_serializer.serialize_morsel(df),
            ));

            if filled_serializer_tx
                .send((handle, morsel_permit))
                .await
                .is_err()
            {
                break;
            }
        }
    }
}

pub struct MorselSerializer {
    pub serialized_data: Vec<u8>,
}

impl MorselSerializer {
    pub async fn serialize_morsel(mut self, mut df: DataFrame) -> PolarsResult<Self> {
        let MorselSerializer { serialized_data } = &mut self;

        rechunk_par(unsafe { df.columns_mut_retain_schema() }).await;

        serialized_data.clear();

        let height = df.height();

        let (arrays, fields): (Vec<Box<dyn Array>>, Vec<ArrowField>) = df
            .into_columns()
            .into_iter()
            .map(|c| {
                let name = c.name().clone();
                let arr = c.rechunk_to_arrow(CompatLevel::newest());

                let field = ArrowField::new(name, arr.dtype().clone(), true);

                (arr, field)
            })
            .unzip();

        let array = StructArray::new(ArrowDataType::Struct(fields), height, arrays, None);
        let serializer = polars_json::json::write::new_serializer(&array, 0, usize::MAX);

        serializer.serialize_json_lines_to_vec(serialized_data, height);

        Ok(self)
    }
}