vortex_array/pipeline/driver/allocation.rs
1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4//! Vector allocation strategy for pipelines
5
6use vortex_error::{VortexExpect, VortexResult};
7use vortex_vector::VectorMut;
8
9use crate::Array;
10use crate::pipeline::driver::{Node, NodeId};
11use crate::pipeline::{N, VectorId};
12
13#[derive(Debug)]
14pub struct VectorAllocation {
15 /// Where each node writes its output
16 pub(crate) output_targets: Vec<OutputTarget>,
17 /// The actual allocated vectors
18 pub(crate) vectors: Vec<VectorMut>,
19}
20
21// TODO(joe): support in-place view operations
22// Node mutates its input in-place (input node index, vector idx)
23// add variant InPlace.
24/// Tracks which vector a node outputs to
25#[derive(Debug, Clone)]
26pub(crate) enum OutputTarget {
27 /// Node writes to the top-level provided output
28 ExternalOutput,
29 /// Node writes to an allocated intermediate vector
30 IntermediateVector(VectorId), // vector idx
31}
32
33impl OutputTarget {
34 pub fn vector_id(&self) -> Option<VectorId> {
35 match self {
36 OutputTarget::IntermediateVector(vector_id) => Some(*vector_id),
37 OutputTarget::ExternalOutput => None,
38 }
39 }
40}
41
// ============================================================================
// Vector allocation
// ============================================================================
45
46/// Allocate vectors with lifetime analysis and zero-copy optimization
47pub(super) fn allocate_vectors(
48 dag: &[Node],
49 execution_order: &[NodeId],
50) -> VortexResult<VectorAllocation> {
51 let mut output_targets: Vec<Option<OutputTarget>> = vec![None; dag.len()];
52 let mut allocation_types = Vec::new();
53
54 // Process nodes in reverse execution order (top-down for output propagation)
55 for &node_idx in execution_order.iter().rev() {
56 let node = &dag[node_idx];
57 let array = &node.array;
58
59 // Determine output target
60 let output_target = if node.parents.is_empty() {
61 // Root node - always writes to external output
62 OutputTarget::ExternalOutput
63 } else {
64 // All intermediate nodes need intermediate vector allocation
65 // The previous pass-through optimization was buggy and incorrectly
66 // assigned ExternalOutput to intermediate nodes
67
68 // TODO(joe): Implement vector allocation reuse optimization here:
69 // 1. Identify when intermediate nodes can safely write to ExternalOutput
70 // 2. Check that ALL consumers of this node can handle external output
71 // 3. Verify no conflicts with parallel execution paths
72 // 4. Ensure proper vector lifetime management
73
74 let vector_id = VectorId::new(allocation_types.len());
75 allocation_types.push(array.dtype());
76 OutputTarget::IntermediateVector(vector_id)
77 };
78
79 output_targets[node_idx] = Some(output_target);
80 }
81
82 Ok(VectorAllocation {
83 output_targets: output_targets
84 .into_iter()
85 .map(|target| target.vortex_expect("missing target"))
86 .collect(),
87 vectors: allocation_types
88 .into_iter()
89 .map(|dtype| VectorMut::with_capacity(dtype, N))
90 .collect(),
91 })
92}