vortex_array/pipeline/query/
buffers.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4//! Vector allocation strategy for pipelines
5
6use std::cell::RefCell;
7
8use vortex_error::{VortexExpect, VortexResult};
9
10use crate::pipeline::query::QueryPlan;
11use crate::pipeline::query::dag::DagNode;
12use crate::pipeline::types::VType;
13use crate::pipeline::vec::{Vector, VectorId};
14
15#[derive(Debug)]
16pub struct VectorAllocationPlan {
17    /// Where each node writes its output
18    pub(crate) output_targets: Vec<OutputTarget>,
19    /// The actual allocated vectors
20    pub(crate) vectors: Vec<RefCell<Vector>>,
21}
22
// TODO(joe): support in-place view operations by adding an `InPlace` variant,
// where a node mutates one of its inputs (input node index, vector idx).
/// Destination a DAG node writes its output into.
#[derive(Debug, Clone)]
pub(crate) enum OutputTarget {
    /// The node writes into the top-level output provided by the caller.
    ExternalOutput,
    /// The node writes into the allocated intermediate vector with this index.
    IntermediateVector(usize),
}
34
35impl OutputTarget {
36    pub fn vector_id(&self) -> Option<VectorId> {
37        match self {
38            OutputTarget::IntermediateVector(idx) => Some(VectorId(*idx)),
39            OutputTarget::ExternalOutput => None,
40        }
41    }
42}
43
44/// Represents an allocated vector that can be reused
45#[derive(Debug, Clone)]
46struct VectorAllocation {
47    /// Type of elements in this vector
48    element_type: VType,
49}
50
51// ============================================================================
// Vector allocation for query plans
53// ============================================================================
54
55impl<'a> QueryPlan<'a> {
56    /// Allocate vectors with lifetime analysis and zero-copy optimization
57    pub(crate) fn allocate_vectors(
58        dag: &[DagNode<'a>],
59        execution_order: &[usize],
60    ) -> VortexResult<VectorAllocationPlan> {
61        let mut output_targets: Vec<Option<OutputTarget>> = vec![None; dag.len()];
62        let mut allocations = Vec::new();
63
64        // Process nodes in reverse execution order (top-down for output propagation)
65        for &node_idx in execution_order.iter().rev() {
66            let node = &dag[node_idx];
67            let plan_node = node.plan_node;
68
69            // Determine output target
70            let output_target = if node.parents.is_empty() {
71                // Root node - always writes to external output
72                OutputTarget::ExternalOutput
73            } else {
74                // All intermediate nodes need intermediate vector allocation
75                // The previous pass-through optimization was buggy and incorrectly
76                // assigned ExternalOutput to intermediate nodes
77
78                // TODO(joe): Implement vector allocation reuse optimization here:
79                // 1. Identify when intermediate nodes can safely write to ExternalOutput
80                // 2. Check that ALL consumers of this node can handle external output
81                // 3. Verify no conflicts with parallel execution paths
82                // 4. Ensure proper vector lifetime management
83
84                let alloc_id = allocations.len();
85                allocations.push(VectorAllocation {
86                    element_type: plan_node.vtype(),
87                });
88                OutputTarget::IntermediateVector(alloc_id)
89            };
90
91            output_targets[node_idx] = Some(output_target);
92        }
93
94        Ok(VectorAllocationPlan {
95            output_targets: output_targets
96                .into_iter()
97                .map(|target| target.vortex_expect("missing target"))
98                .collect(),
99            vectors: allocations
100                .into_iter()
101                .map(|alloc| RefCell::new(Vector::new_with_vtype(alloc.element_type)))
102                .collect(),
103        })
104    }
105}