mod bind;
pub mod buffers;
mod input;
mod toposort;
use std::any::Any;
use std::cell::RefCell;
use std::fmt::Formatter;
use std::hash::{BuildHasher, Hash, Hasher};
use std::iter;
use std::marker::PhantomData;
use std::sync::Arc;
use arrow_buffer::BooleanBuffer;
use async_trait::async_trait;
use futures::future::try_join_all;
use itertools::Itertools;
use termtree::Tree;
use vortex_buffer::{Alignment, BufferMut, ByteBuffer};
use vortex_dtype::{DType, NativePType, Nullability, match_each_native_ptype};
use vortex_error::{VortexExpect, VortexResult, vortex_bail};
use vortex_mask::AllOr;
use vortex_utils::aliases::hash_map::{HashMap, RandomState};
use crate::Canonical;
use crate::arrays::{BoolArray, PrimitiveArray};
use crate::operator::{
BatchBindCtx, BatchExecution, BatchExecutionRef, BatchOperator, DisplayFormat, LengthBounds,
Operator, OperatorEq, OperatorHash, OperatorId, OperatorKey, OperatorRef,
};
use crate::pipeline::bits::{BitVector, BitView, BitViewMut};
use crate::pipeline::operator::bind::bind_kernels;
use crate::pipeline::operator::buffers::{OutputTarget, allocate_vectors};
use crate::pipeline::operator::input::PipelineInputOperator;
use crate::pipeline::operator::toposort::topological_sort;
use crate::pipeline::vec::Vector;
use crate::pipeline::view::ViewMut;
use crate::pipeline::{BatchId, Element, Kernel, KernelContext, N, N_WORDS, RowSelection};
use crate::validity::Validity;
/// A batch operator that evaluates a DAG of pipelined operators.
///
/// Instances are built by `PipelineOperator::new`, which flattens an operator
/// tree into `dag` and collects the operators that must be evaluated as batch
/// inputs.
#[derive(Clone, Debug)]
pub(crate) struct PipelineOperator {
/// Index into `dag` of the node that produces the pipeline's output.
root: NodeId,
/// All nodes of the pipeline; `NodeId` values index into this vector.
dag: Vec<PipelineNode>,
/// Operators evaluated as batch inputs. These are what `children()` reports.
batch_inputs: Vec<OperatorRef>,
/// Batch ids of the inputs that were wrapped in a `PipelineInputOperator`
/// during construction. Used to derive the row count when the row selection
/// is `RowSelection::All`.
domain_inputs: Vec<BatchId>,
/// The row selection reported by the root operator at construction time.
row_selection: RowSelection,
}
impl OperatorHash for PipelineOperator {
    /// Feeds the root index, then every DAG node, then every batch input into
    /// `state`, in that order.
    fn operator_hash<H: Hasher>(&self, state: &mut H) {
        self.root.hash(state);
        self.dag.iter().for_each(|node| node.operator_hash(state));
        self.batch_inputs
            .iter()
            .for_each(|input| input.operator_hash(state));
    }
}
impl OperatorEq for PipelineOperator {
    /// Two pipelines are equal when they share the same root index, have
    /// pairwise-equal DAG nodes, and have pairwise-equal batch inputs.
    fn operator_eq(&self, other: &Self) -> bool {
        // The `&&` chain short-circuits in the same order as the checks are listed:
        // cheap shape checks first, element-wise comparisons after.
        self.root == other.root
            && self.dag.len() == other.dag.len()
            && self
                .dag
                .iter()
                .zip(&other.dag)
                .all(|(lhs, rhs)| lhs.operator_eq(rhs))
            && self.batch_inputs.len() == other.batch_inputs.len()
            && self
                .batch_inputs
                .iter()
                .zip(&other.batch_inputs)
                .all(|(lhs, rhs)| lhs.operator_eq(rhs))
    }
}
/// Index of a node within a pipeline's `dag` vector.
type NodeId = usize;
/// A single node of the pipeline DAG.
#[derive(Debug, Clone)]
struct PipelineNode {
/// The operator evaluated at this node.
operator: OperatorRef,
/// DAG indices of this node's vector children, in the order reported by the
/// operator's `vector_children()`.
children: Vec<NodeId>,
/// DAG indices of the nodes that list this node as a child. Filled in after
/// the whole DAG has been built; not part of hashing or equality.
parents: Vec<NodeId>,
/// Ids of the batch inputs this node consumes, i.e. positions in the owning
/// pipeline's `batch_inputs`.
batch_inputs: Vec<BatchId>,
}
impl OperatorHash for PipelineNode {
    /// Hashes the operator, the child indices and the batch input ids.
    /// `parents` is deliberately left out.
    fn operator_hash<H: Hasher>(&self, state: &mut H) {
        let Self {
            operator,
            children,
            batch_inputs,
            ..
        } = self;
        operator.operator_hash(state);
        children.hash(state);
        batch_inputs.hash(state);
    }
}
impl OperatorEq for PipelineNode {
    /// Nodes are equal when their operators, child indices and batch input ids
    /// all match. `parents` does not take part in the comparison.
    fn operator_eq(&self, other: &Self) -> bool {
        if !self.operator.operator_eq(&other.operator) {
            return false;
        }
        if self.children != other.children {
            return false;
        }
        self.batch_inputs == other.batch_inputs
    }
}
impl PipelineOperator {
    /// Builds a pipeline from the operator tree rooted at `operator`.
    ///
    /// Returns `None` when `operator` is not a pipelined operator.
    ///
    /// Vector children that are not pipelined, or whose row selection differs
    /// from the root's, are wrapped in a `PipelineInputOperator` and recorded
    /// as domain inputs. Identical subtrees are shared: they are detected by
    /// hash and then confirmed with `operator_eq`, so a hash collision can
    /// never merge two different operators into one node.
    ///
    /// If the root's row selection is `RowSelection::MaskOperator`, the mask
    /// operator is appended as the last batch input.
    pub fn new(operator: OperatorRef) -> Option<Self> {
        let row_selection = operator.as_pipelined()?.row_selection();

        /// Recursively adds `node` and its vector children to `dag`, returning
        /// the index of `node` (or of an equal node that was added earlier).
        fn visit_node(
            node: OperatorRef,
            row_selection: RowSelection,
            dag: &mut Vec<PipelineNode>,
            batch: &mut Vec<OperatorRef>,
            domain_inputs: &mut Vec<BatchId>,
            hash_to_id: &mut HashMap<u64, NodeId>,
            random_state: &RandomState,
        ) -> NodeId {
            let subtree_hash = random_state.hash_one(OperatorKey(node.clone()));
            // A matching hash is only a hint; reuse the node only if the
            // operators really compare equal.
            let reusable = hash_to_id
                .get(&subtree_hash)
                .copied()
                .filter(|&candidate| dag[candidate].operator.operator_eq(&node));
            if let Some(existing_index) = reusable {
                return existing_index;
            }

            let mut child_indices: Vec<NodeId> = vec![];
            let mut batch_indices: Vec<BatchId> = vec![];
            let node_children = node.children();
            let pipelined = node.as_pipelined().vortex_expect("must be pipelined");

            for child_idx in pipelined.vector_children() {
                let mut child_op = node_children[child_idx].clone();
                let mut is_domain_input = false;
                // Children that cannot run inside this pipeline are fed in
                // through a pipeline input operator instead.
                if child_op
                    .as_pipelined()
                    .is_none_or(|op| op.row_selection() != row_selection)
                {
                    child_op = Arc::new(PipelineInputOperator::new(child_op));
                    is_domain_input = true;
                }
                let child_node_id = visit_node(
                    child_op,
                    row_selection.clone(),
                    dag,
                    batch,
                    domain_inputs,
                    hash_to_id,
                    random_state,
                );
                child_indices.push(child_node_id);
                if is_domain_input {
                    let domain_batch = &dag[child_node_id].batch_inputs;
                    assert_eq!(domain_batch.len(), 1);
                    domain_inputs.push(domain_batch[0]);
                }
            }

            for child_idx in pipelined.batch_children() {
                let child = node_children[child_idx].clone();
                let batch_id = batch.len();
                batch.push(child);
                batch_indices.push(batch_id);
            }

            let node_id: NodeId = dag.len();
            dag.push(PipelineNode {
                operator: node,
                children: child_indices,
                // Parents are wired up once the whole DAG exists.
                parents: vec![],
                batch_inputs: batch_indices,
            });
            hash_to_id.insert(subtree_hash, node_id);
            node_id
        }

        let mut dag = vec![];
        let mut batch = vec![];
        let mut domain_inputs = vec![];
        let mut hash_to_id: HashMap<u64, NodeId> = HashMap::new();
        let random_state = RandomState::default();

        let root_index = visit_node(
            operator,
            row_selection.clone(),
            &mut dag,
            &mut batch,
            &mut domain_inputs,
            &mut hash_to_id,
            &random_state,
        );

        // Record the reverse edges. Indexing (rather than iterating) keeps the
        // borrows disjoint without cloning each child list.
        for parent in 0..dag.len() {
            for slot in 0..dag[parent].children.len() {
                let child = dag[parent].children[slot];
                dag[child].parents.push(parent);
            }
        }

        // The mask operator, when present, is always the last batch input.
        if let RowSelection::MaskOperator(mask_op) = &row_selection {
            batch.push(mask_op.clone());
        }

        Some(PipelineOperator {
            root: root_index,
            dag,
            batch_inputs: batch,
            domain_inputs,
            row_selection,
        })
    }

    /// The operator at the root of the DAG.
    fn root_operator(&self) -> &OperatorRef {
        &self.dag[self.root].operator
    }
}
impl Operator for PipelineOperator {
fn id(&self) -> OperatorId {
OperatorId::from("vortex.operator")
}
fn as_any(&self) -> &dyn Any {
self
}
fn dtype(&self) -> &DType {
self.root_operator().dtype()
}
fn bounds(&self) -> LengthBounds {
self.root_operator().bounds()
}
fn children(&self) -> &[OperatorRef] {
&self.batch_inputs
}
fn fmt_as(&self, _df: DisplayFormat, f: &mut Formatter) -> std::fmt::Result {
writeln!(f, "PipelineOperator")?;
write!(f, "{}", self.root_operator().display_tree(),)
}
fn fmt_all(&self) -> String {
let node_name = "PipelineOperator".to_string();
let child_trees: Vec<_> = iter::once(self.root_operator().fmt_all())
.chain(self.children().iter().map(|child| child.fmt_all()))
.collect();
Tree::new(node_name)
.with_leaves(child_trees)
.with_multiline(true)
.to_string()
}
fn with_children(self: Arc<Self>, children: Vec<OperatorRef>) -> VortexResult<OperatorRef> {
let mut this = self.as_ref().clone();
this.batch_inputs = children;
Ok(Arc::new(this))
}
fn as_batch(&self) -> Option<&dyn BatchOperator> {
Some(self)
}
}
impl BatchOperator for PipelineOperator {
    /// Plans the pipeline (execution order, vector allocation, kernels), binds
    /// every batch input through `ctx`, and returns an execution specialised
    /// for the output dtype.
    ///
    /// # Errors
    ///
    /// Propagates failures from planning and child binding, and fails for any
    /// output dtype other than non-nullable bool or non-nullable primitive.
    fn bind(&self, ctx: &mut dyn BatchBindCtx) -> VortexResult<BatchExecutionRef> {
        let exec_order = topological_sort(&self.dag)?;
        let plan = allocate_vectors(&self.dag, &exec_order)?;
        let kernels = bind_kernels(&self.dag, &plan)?;

        // Bind children in index order, stopping at the first failure.
        let mut batch_inputs = Vec::with_capacity(self.batch_inputs.len());
        for child_idx in 0..self.batch_inputs.len() {
            batch_inputs.push(ctx.child(child_idx)?);
        }

        let row_selection = match &self.row_selection {
            RowSelection::Domain(len) => RowSelectionSource::LeafNode { len: *len },
            RowSelection::All => RowSelectionSource::BatchInputs(self.domain_inputs.clone()),
            RowSelection::MaskOperator(_) => RowSelectionSource::Mask,
        };

        let vectors = plan.vectors;
        let pipeline = Pipeline {
            kernels,
            exec_order,
            output_targets: plan.output_targets,
        };

        match self.dtype() {
            DType::Bool(Nullability::NonNullable) => {
                Ok(Box::new(PipelineExecution::<BoolOutput>::new(
                    row_selection,
                    batch_inputs,
                    vectors,
                    pipeline,
                )))
            }
            DType::Primitive(ptype, Nullability::NonNullable) => {
                match_each_native_ptype!(ptype, |T| {
                    Ok(Box::new(PipelineExecution::<PrimitiveOutput<T>>::new(
                        row_selection,
                        batch_inputs,
                        vectors,
                        pipeline,
                    )))
                })
            }
            _ => vortex_bail!(
                "PipelineOperator currently only supports non-nullable bool or primitive output types {}",
                self.dtype()
            ),
        }
    }
}
/// How an execution determines the number of rows it will produce.
enum RowSelectionSource {
/// The row count is the shared length of the batch inputs with these ids.
BatchInputs(Vec<BatchId>),
/// The row count is known up front.
LeafNode { len: usize },
/// The rows are chosen by a selection mask supplied as the last batch input.
Mask,
}
/// Destination buffer that a pipeline execution writes its result into.
trait PipelineOutput: Send {
/// The element type written by the pipeline.
type Element: Element;
/// Creates an output able to hold `capacity` elements.
fn allocate(capacity: usize) -> Self;
/// Returns a mutable view over the output starting at element `offset`.
fn view_mut(&mut self, offset: usize) -> ViewMut<'_>;
/// Consumes the output and turns its first `len` elements into a canonical array.
fn into_canonical(self, len: usize) -> VortexResult<Canonical>;
}
/// Pipeline output for non-nullable boolean results.
struct BoolOutput {
/// One `bool` per element; converted to an arrow `BooleanBuffer` by `into_canonical`.
buffer: BufferMut<bool>,
}
impl PipelineOutput for BoolOutput {
    type Element = bool;

    /// Reserves room for `capacity` bools and marks them all as in use.
    fn allocate(capacity: usize) -> Self {
        let mut storage = BufferMut::with_capacity(capacity);
        // NOTE(review): the length is raised over memory that has only been
        // reserved; this relies on elements being written before they are read
        // — confirm against the kernels.
        unsafe { storage.set_len(capacity) };
        Self { buffer: storage }
    }

    /// A view over the `N` elements starting at `offset`, with no validity.
    fn view_mut(&mut self, offset: usize) -> ViewMut<'_> {
        let window = &mut self.buffer[offset..][..N];
        ViewMut::new(window, None)
    }

    /// Cuts the buffer down to `len` elements and converts it into a
    /// non-nullable `BoolArray`.
    fn into_canonical(mut self, len: usize) -> VortexResult<Canonical> {
        unsafe { self.buffer.set_len(len) };
        let packed = BooleanBuffer::from(self.buffer.as_ref()).into_inner();
        let bytes = ByteBuffer::from_arrow_buffer(packed, Alignment::of::<u64>());
        let array = BoolArray::try_new(bytes, 0, len, Validity::NonNullable)?;
        Ok(Canonical::Bool(array))
    }
}
/// Pipeline output for non-nullable primitive results of element type `T`.
struct PrimitiveOutput<T> {
/// The element storage; frozen into the resulting `PrimitiveArray` by `into_canonical`.
buffer: BufferMut<T>,
}
impl<T: NativePType + Element> PipelineOutput for PrimitiveOutput<T> {
    type Element = T;

    /// Reserves room for `capacity` elements and marks them all as in use.
    fn allocate(capacity: usize) -> Self {
        let mut storage = BufferMut::with_capacity(capacity);
        // NOTE(review): the length is raised over memory that has only been
        // reserved; this relies on elements being written before they are read
        // — confirm against the kernels.
        unsafe { storage.set_len(capacity) };
        Self { buffer: storage }
    }

    /// A view over the `N` elements starting at `offset`, with no validity.
    fn view_mut(&mut self, offset: usize) -> ViewMut<'_> {
        let window = &mut self.buffer[offset..][..N];
        ViewMut::new(window, None)
    }

    /// Cuts the buffer down to `len` elements and freezes it into a
    /// non-nullable `PrimitiveArray`.
    fn into_canonical(mut self, len: usize) -> VortexResult<Canonical> {
        unsafe { self.buffer.set_len(len) };
        let frozen = self.buffer.freeze();
        Ok(Canonical::Primitive(PrimitiveArray::new(
            frozen,
            Validity::NonNullable,
        )))
    }
}
/// A bound pipeline ready to be executed, producing output of kind `O`.
struct PipelineExecution<O> {
/// How the number of output rows is determined.
row_selection: RowSelectionSource,
/// Bound executions of the batch inputs, in batch id order.
children: Vec<BatchExecutionRef>,
/// Intermediate vectors handed to the kernels through the kernel context.
vectors: Vec<RefCell<Vector>>,
/// The kernels together with their execution order and output targets.
pipeline: Pipeline,
/// Ties the execution to its output type without storing a value of it.
_element: PhantomData<O>,
}
impl<O> PipelineExecution<O> {
fn new(
row_selection: RowSelectionSource,
batch_inputs: Vec<BatchExecutionRef>,
vectors: Vec<RefCell<Vector>>,
pipeline: Pipeline,
) -> Self {
PipelineExecution {
row_selection,
children: batch_inputs,
vectors,
pipeline,
_element: PhantomData,
}
}
}
#[async_trait]
impl<O: PipelineOutput> BatchExecution for PipelineExecution<O> {
/// Executes all batch inputs, determines the output length from the row
/// selection source, then steps the pipeline chunk by chunk, writing into an
/// output buffer that is finally converted into a canonical array.
///
/// # Errors
///
/// Fails if a batch input fails, if the domain inputs do not all share one
/// length, if the selection mask cannot be converted, or if a kernel step fails.
async fn execute(mut self: Box<Self>) -> VortexResult<Canonical> {
// All batch inputs are awaited together; any error aborts the execution.
let mut children =
try_join_all(self.children.into_iter().map(|exec| exec.execute())).await?;
// Only populated when the selection mask keeps some, but not all, rows.
let mut mask: Option<BooleanBuffer> = None;
let len = match &self.row_selection {
RowSelectionSource::BatchInputs(batch_ids) => {
// Every domain input must report the same length; that length is the output length.
match batch_ids
.iter()
.map(|id| children[*id].as_ref().len())
.all_equal_value()
{
Ok(len) => len,
Err(lens) => {
vortex_bail!(
"Mismatched input lengths for pipeline domain inputs: {:?}",
lens
);
}
}
}
RowSelectionSource::LeafNode { len } => *len,
RowSelectionSource::Mask => {
// The mask is the last batch input; it is removed here so that it is not
// passed on to the kernels as a regular batch input.
let selection_mask = children
.pop()
.vortex_expect("mask batch input missing")
.as_ref()
.try_to_mask_fill_null_false()?;
match selection_mask.boolean_buffer() {
// Everything selected: behave like the unmasked path over the full length.
AllOr::All => selection_mask.len(),
// Nothing selected: the output is empty.
AllOr::None => {
0
}
// Partial selection: the output holds one element per selected row.
AllOr::Some(buffer) => {
mask = Some(buffer.clone());
selection_mask.true_count()
}
}
}
};
let ctx = KernelContext {
vectors: self.vectors,
batch_inputs: children,
};
// `view_mut` always exposes `N` elements starting at the write position. In the
// masked path that position is not necessarily a multiple of `N`, hence the
// additional `N` of headroom on top of the rounded-up length.
let capacity = len.next_multiple_of(N) + N;
let mut output = O::allocate(capacity);
match mask {
None => {
// Unmasked path: consecutive chunks of up to `N` rows; only the last chunk
// can be shorter than `N`.
let nchunks = len.div_ceil(N);
let mut position = 0;
for chunk_idx in 0..nchunks {
let mask_len = (len - position).min(N);
// A partial chunk gets its own selection vector; full chunks use the
// shared all-true vector.
let selection_vec = (mask_len < N).then(|| BitVector::true_until(mask_len));
let selection = selection_vec.as_ref().unwrap_or_else(|| BitVector::full());
let mut elements_view = output.view_mut(position);
self.pipeline.step(
&ctx,
chunk_idx,
&selection.as_view(),
&mut elements_view,
)?;
// NOTE(review): presumably compacts the selected elements to the front of
// the view — confirm against `ViewMut::flatten`.
elements_view.flatten::<O::Element>(&selection.as_view());
position += selection.true_count();
}
assert_eq!(position, len);
}
Some(mask) => {
// Masked path: each chunk's selection is read from the next words of the
// mask, and the write position advances by the number of selected rows.
let mut mask_iter = mask.bit_chunks().iter_padded();
let mut selection_words = [0usize; N_WORDS];
let mut selection_view_mut = BitViewMut::new(&mut selection_words);
let mut position = 0;
let mut chunk_idx = 0;
while position < len {
selection_view_mut.clear();
selection_view_mut.fill_with_words(&mut mask_iter);
let mut elements_view = output.view_mut(position);
self.pipeline.step(
&ctx,
chunk_idx,
&selection_view_mut.as_view(),
&mut elements_view,
)?;
chunk_idx += 1;
elements_view.flatten::<O::Element>(&selection_view_mut.as_view());
position += selection_view_mut.true_count();
}
assert_eq!(position, len);
}
}
output.into_canonical(len)
}
}
/// The executable form of a pipeline DAG.
struct Pipeline {
/// One kernel per DAG node, indexed by node id.
kernels: Vec<Box<dyn Kernel>>,
/// The node ids in the order their kernels are stepped.
exec_order: Vec<NodeId>,
/// Where each node's kernel writes its result, indexed by node id.
output_targets: Vec<OutputTarget>,
}
impl Kernel for Pipeline {
    /// Steps every kernel once, in execution order, for chunk `chunk_idx`.
    ///
    /// A kernel whose target is the external output writes straight into
    /// `out`; every other kernel writes into its intermediate vector, whose
    /// selection is then updated from the view the kernel wrote through.
    fn step(
        &self,
        ctx: &KernelContext,
        chunk_idx: usize,
        selection: &BitView,
        out: &mut ViewMut,
    ) -> VortexResult<()> {
        for node_id in self.exec_order.iter().copied() {
            let node_kernel = &self.kernels[node_id];
            match &self.output_targets[node_id] {
                OutputTarget::ExternalOutput => {
                    node_kernel.step(ctx, chunk_idx, selection, out)?;
                }
                OutputTarget::IntermediateVector(slot) => {
                    let mut scratch = ctx.vectors[*slot].borrow_mut();
                    // The view must be gone before the vector itself is
                    // touched again, hence the inner scope.
                    let produced_selection = {
                        let mut scratch_view = scratch.as_view_mut();
                        node_kernel.step(ctx, chunk_idx, selection, &mut scratch_view)?;
                        scratch_view.selection
                    };
                    scratch.set_selection(produced_selection);
                }
            }
        }
        Ok(())
    }
}