polars-stream 0.53.0

Private crate for the streaming execution engine for the Polars DataFrame library
Documentation
use std::num::NonZeroUsize;
use std::sync::Arc;

use polars_error::PolarsResult;
use polars_io::pl_async;
use polars_io::utils::sync_on_close::SyncOnCloseType;
use polars_plan::dsl::file_provider::FileProviderArgs;

use crate::async_executor;
use crate::async_primitives::connector;
use crate::nodes::TaskPriority;
use crate::nodes::io_sinks::components::file_provider::FileProvider;
use crate::nodes::io_sinks::components::file_sink::{FileSinkPermit, FileSinkTaskData};
use crate::nodes::io_sinks::components::size::RowCountAndSize;
use crate::nodes::io_sinks::writers::interface::{FileOpenTaskHandle, FileWriterStarter};
use crate::utils::tokio_handle_ext;

#[derive(Clone)]
pub struct PartitionSinkStarter {
    pub file_provider: Arc<FileProvider>,
    pub writer_starter: Arc<dyn FileWriterStarter>,
    pub sync_on_close: SyncOnCloseType,
    pub num_pipelines_per_sink: NonZeroUsize,
}

impl PartitionSinkStarter {
    pub fn start_sink(
        &self,
        file_provider_args: FileProviderArgs,
        start_position: RowCountAndSize,
        file_permit: FileSinkPermit,
    ) -> PolarsResult<FileSinkTaskData> {
        let file_provider = Arc::clone(&self.file_provider);
        let file_open_task = tokio_handle_ext::AbortOnDropHandle(
            pl_async::get_runtime()
                .spawn(async move { file_provider.open_file(file_provider_args).await }),
        );

        let (morsel_tx, morsel_rx) = connector::connector();

        let writer_handle = self.writer_starter.start_file_writer(
            morsel_rx,
            FileOpenTaskHandle::new(file_open_task, self.sync_on_close),
            self.num_pipelines_per_sink,
        )?;

        let task_handle = async_executor::spawn(TaskPriority::High, async move {
            writer_handle.await?;
            Ok(file_permit)
        });

        Ok(FileSinkTaskData {
            morsel_tx,
            start_position,
            task_handle,
        })
    }
}