polars-stream 0.53.0

Private crate for the streaming execution engine for the Polars DataFrame library
Documentation
use std::sync::Arc;

use polars_core::POOL;
use polars_core::prelude::IntoColumn;
use rayon::iter::IntoParallelRefIterator;
use rayon::prelude::*;

use super::compute_node_prelude::*;
use crate::expression::StreamExpr;
use crate::nodes::in_memory_source::InMemorySourceNode;

/// Compute node that evaluates a set of expressions which do not depend on
/// any input (the node has no receive ports) and emits the result on its
/// single send port.
///
/// The node is a small state machine: it starts as `ToSelect`, becomes
/// `Source` once the selectors have been evaluated, and becomes `Done` when
/// the downstream port reports it is done.
pub enum InputIndependentSelectNode {
    /// Initial state: the selectors have not been evaluated yet.
    ToSelect { selectors: Vec<StreamExpr> },
    /// The selectors were evaluated into a `DataFrame`, which is now served
    /// by the wrapped in-memory source node.
    Source(InMemorySourceNode),
    /// The send port is done; nothing more will be produced.
    Done,
}

impl InputIndependentSelectNode {
    pub fn new(selectors: Vec<StreamExpr>) -> Self {
        Self::ToSelect { selectors }
    }
}

impl ComputeNode for InputIndependentSelectNode {
    /// Returns the static name identifying this node type.
    fn name(&self) -> &str {
        "input-independent-select"
    }

    /// Advances the node's state machine and updates the port states.
    ///
    /// * If the send port is already done, the node becomes `Done` and returns.
    /// * If the node is still `ToSelect`, every selector is evaluated against
    ///   an empty `DataFrame`, the resulting columns are combined into a
    ///   `DataFrame`, and the node becomes a `Source` wrapping it.
    /// * A `Source` delegates to the wrapped node's `update_state`; a `Done`
    ///   node marks its send port as done.
    ///
    /// # Panics
    ///
    /// Panics unless `recv` is empty and `send` has exactly one element.
    ///
    /// # Errors
    ///
    /// Propagates errors from evaluating a selector, from constructing the
    /// `DataFrame`, and from the wrapped source node's `update_state`.
    fn update_state(
        &mut self,
        recv: &mut [PortState],
        send: &mut [PortState],
        state: &StreamingExecutionState,
    ) -> PolarsResult<()> {
        assert!(recv.is_empty() && send.len() == 1);
        if send[0] == PortState::Done {
            *self = Self::Done;
            return Ok(());
        }

        // Only enter the thread pool while there is still work to do. Once the
        // node has become `Source` or `Done`, later state updates skip the
        // pool hand-off entirely.
        if let Self::ToSelect { selectors } = self {
            let src_node = POOL.install(|| {
                let empty_df = DataFrame::empty();
                // Fresh in-memory execution state; distinct from the streaming
                // `state` passed to this method.
                let exec_state = ExecutionState::new();
                let selected = selectors
                    .par_iter()
                    .map(|selector| {
                        let s = selector.evaluate_blocking(&empty_df, &exec_state)?;
                        PolarsResult::Ok(s.into_column())
                    })
                    .collect::<PolarsResult<Vec<_>>>()?;
                // SAFETY: NOTE(review) - the preconditions of the unchecked
                // constructor are not visible in this file; presumably the
                // planner guarantees the selector outputs form a valid set of
                // columns (e.g. unique names). Confirm against its docs.
                let ret = unsafe { DataFrame::new_unchecked_infer_broadcast(selected)? };
                PolarsResult::Ok(InMemorySourceNode::new(
                    Arc::new(ret),
                    MorselSeq::default(),
                ))
            })?;
            *self = Self::Source(src_node);
        }

        match self {
            Self::ToSelect { .. } => {
                unreachable!("ToSelect is always replaced by Source before this point")
            },
            Self::Source(src) => src.update_state(recv, send, state),
            Self::Done => {
                send[0] = PortState::Done;
                Ok(())
            },
        }
    }

    /// Spawns the tasks of the wrapped in-memory source node.
    ///
    /// # Panics
    ///
    /// Panics unless `recv_ports` is empty and `send_ports` has exactly one
    /// element, or if the node is not in the `Source` state.
    fn spawn<'env, 's>(
        &'env mut self,
        scope: &'s TaskScope<'s, 'env>,
        recv_ports: &mut [Option<RecvPort<'_>>],
        send_ports: &mut [Option<SendPort<'_>>],
        state: &'s StreamingExecutionState,
        join_handles: &mut Vec<JoinHandle<PolarsResult<()>>>,
    ) {
        assert!(recv_ports.is_empty() && send_ports.len() == 1);
        let Self::Source(src) = self else {
            unreachable!("spawn called while the node is not in the Source state")
        };
        src.spawn(scope, recv_ports, send_ports, state, join_handles);
    }
}