polars-stream 0.53.0

Private crate for the streaming execution engine for the Polars DataFrame library
Documentation
pub mod callback_sink;
#[cfg(feature = "cum_agg")]
pub mod cum_agg;
#[cfg(feature = "dynamic_group_by")]
pub mod dynamic_group_by;
pub mod dynamic_slice;
#[cfg(feature = "ewma")]
pub mod ewm;
pub mod filter;
pub mod gather_every;
pub mod group_by;
pub mod in_memory_map;
pub mod in_memory_sink;
pub mod in_memory_source;
pub mod input_independent_select;
pub mod io_sinks;
pub mod io_sources;
pub mod joins;
pub mod map;
#[cfg(feature = "merge_sorted")]
pub mod merge_sorted;
pub mod multiplexer;
pub mod negative_slice;
pub mod ordered_union;
pub mod peak_minmax;
pub mod reduce;
pub mod repeat;
pub mod rle;
pub mod rle_id;
#[cfg(feature = "dynamic_group_by")]
pub mod rolling_group_by;
pub mod select;
pub mod shift;
pub mod simple_projection;
pub mod sorted_group_by;
pub mod streaming_slice;
pub mod top_k;
pub mod unordered_union;
pub mod with_row_index;
pub mod zip;

/// The imports you'll always need for implementing a ComputeNode.
mod compute_node_prelude {
    pub use polars_core::frame::DataFrame;
    pub use polars_error::PolarsResult;
    pub use polars_expr::state::ExecutionState;

    pub use super::ComputeNode;
    pub use crate::async_executor::{JoinHandle, TaskPriority, TaskScope};
    pub use crate::execute::StreamingExecutionState;
    pub use crate::graph::PortState;
    pub use crate::morsel::{Morsel, MorselSeq};
    pub use crate::pipe::{PortReceiver, PortSender, RecvPort, SendPort};
}

use compute_node_prelude::*;

use crate::execute::StreamingExecutionState;
use crate::metrics::MetricsBuilder;

/// A node of the streaming execution engine.
///
/// A node is driven through three kinds of calls:
/// 1. [`update_state`] negotiates port states and may run several times per execution phase.
/// 2. [`spawn`] starts the node's tasks once per execution phase.
/// 3. [`get_output`] is called once after the last phase to collect any in-memory result.
///
/// [`update_state`]: ComputeNode::update_state
/// [`spawn`]: ComputeNode::spawn
/// [`get_output`]: ComputeNode::get_output
pub trait ComputeNode: Send {
    /// The name of this node.
    fn name(&self) -> &str;

    /// Update the state of this node given the state of its input and output
    /// ports. May be called multiple times per execution phase until the
    /// states are fully resolved.
    ///
    /// On entry, each element of `recv` holds the state of the send port that
    /// the corresponding input pipe is connected to. Before returning,
    /// overwrite it with this node's computed state for that receive port.
    ///
    /// Likewise, on entry each element of `send` holds the state of the
    /// receive port that the corresponding output pipe is connected to.
    /// Before returning, overwrite it with this node's desired state for that
    /// send port.
    fn update_state(
        &mut self,
        recv: &mut [PortState],
        send: &mut [PortState],
        state: &StreamingExecutionState,
    ) -> PolarsResult<()>;

    /// Whether this node, in its current state, is a pipeline blocker that is
    /// also memory intensive.
    ///
    /// The method reports a single flag, so `true` means both conditions hold.
    /// The default implementation returns `false`.
    fn is_memory_intensive_pipeline_blocker(&self) -> bool {
        false
    }

    /// Spawn the tasks that this compute node needs to receive its input(s),
    /// process them and send the results to its output(s). Called once per
    /// execution phase.
    ///
    /// Tasks are spawned through `scope`. Their [`JoinHandle`]s go into
    /// `join_handles`, presumably so the caller can await them.
    /// NOTE(review): this is inferred from the signature; confirm against the executor.
    fn spawn<'env, 's>(
        &'env mut self,
        scope: &'s TaskScope<'s, 'env>,
        recv_ports: &mut [Option<RecvPort<'_>>],
        send_ports: &mut [Option<SendPort<'_>>],
        state: &'s StreamingExecutionState,
        join_handles: &mut Vec<JoinHandle<PolarsResult<()>>>,
    );

    /// Provide a [`MetricsBuilder`] to this node.
    ///
    /// The default implementation ignores it.
    fn set_metrics_builder(&mut self, _metrics_builder: MetricsBuilder) {}

    /// Called once after the last execution phase to extract output from
    /// in-memory nodes.
    ///
    /// The default implementation returns `Ok(None)`, meaning no output.
    fn get_output(&mut self) -> PolarsResult<Option<DataFrame>> {
        Ok(None)
    }
}