vortex_array/pipeline/query/buffers.rs
1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4//! Vector allocation strategy for pipelines
5
6use std::cell::RefCell;
7
8use vortex_error::{VortexExpect, VortexResult};
9
10use crate::pipeline::query::QueryPlan;
11use crate::pipeline::query::dag::DagNode;
12use crate::pipeline::types::VType;
13use crate::pipeline::vec::{Vector, VectorId};
14
15#[derive(Debug)]
16pub struct VectorAllocationPlan {
17 /// Where each node writes its output
18 pub(crate) output_targets: Vec<OutputTarget>,
19 /// The actual allocated vectors
20 pub(crate) vectors: Vec<RefCell<Vector>>,
21}
22
// TODO(joe): Support in-place view operations by adding an `InPlace` variant for
// nodes that mutate their input in place; it would carry (input node index, vector idx).
/// Destination that a node's output is written to.
#[derive(Debug, Clone)]
pub(crate) enum OutputTarget {
    /// The node writes into the output supplied from outside the plan (top level).
    ExternalOutput,
    /// The node writes into an allocated intermediate vector; the payload is the
    /// index of that vector.
    IntermediateVector(usize),
}
34
35impl OutputTarget {
36 pub fn vector_id(&self) -> Option<VectorId> {
37 match self {
38 OutputTarget::IntermediateVector(idx) => Some(VectorId(*idx)),
39 OutputTarget::ExternalOutput => None,
40 }
41 }
42}
43
44/// Represents an allocated vector that can be reused
45#[derive(Debug, Clone)]
46struct VectorAllocation {
47 /// Type of elements in this vector
48 element_type: VType,
49}
50
51// ============================================================================
52// Improved Pipeline with vector allocation
53// ============================================================================
54
55impl<'a> QueryPlan<'a> {
56 /// Allocate vectors with lifetime analysis and zero-copy optimization
57 pub(crate) fn allocate_vectors(
58 dag: &[DagNode<'a>],
59 execution_order: &[usize],
60 ) -> VortexResult<VectorAllocationPlan> {
61 let mut output_targets: Vec<Option<OutputTarget>> = vec![None; dag.len()];
62 let mut allocations = Vec::new();
63
64 // Process nodes in reverse execution order (top-down for output propagation)
65 for &node_idx in execution_order.iter().rev() {
66 let node = &dag[node_idx];
67 let plan_node = node.plan_node;
68
69 // Determine output target
70 let output_target = if node.parents.is_empty() {
71 // Root node - always writes to external output
72 OutputTarget::ExternalOutput
73 } else {
74 // All intermediate nodes need intermediate vector allocation
75 // The previous pass-through optimization was buggy and incorrectly
76 // assigned ExternalOutput to intermediate nodes
77
78 // TODO(joe): Implement vector allocation reuse optimization here:
79 // 1. Identify when intermediate nodes can safely write to ExternalOutput
80 // 2. Check that ALL consumers of this node can handle external output
81 // 3. Verify no conflicts with parallel execution paths
82 // 4. Ensure proper vector lifetime management
83
84 let alloc_id = allocations.len();
85 allocations.push(VectorAllocation {
86 element_type: plan_node.vtype(),
87 });
88 OutputTarget::IntermediateVector(alloc_id)
89 };
90
91 output_targets[node_idx] = Some(output_target);
92 }
93
94 Ok(VectorAllocationPlan {
95 output_targets: output_targets
96 .into_iter()
97 .map(|target| target.vortex_expect("missing target"))
98 .collect(),
99 vectors: allocations
100 .into_iter()
101 .map(|alloc| RefCell::new(Vector::new_with_vtype(alloc.element_type)))
102 .collect(),
103 })
104 }
105}