vortex_array/pipeline/driver/
allocation.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4//! Vector allocation strategy for pipelines
5
6use vortex_error::{VortexExpect, VortexResult};
7use vortex_vector::VectorMut;
8
9use crate::Array;
10use crate::pipeline::driver::{Node, NodeId};
11use crate::pipeline::{N, VectorId};
12
13#[derive(Debug)]
14pub struct VectorAllocation {
15    /// Where each node writes its output
16    pub(crate) output_targets: Vec<OutputTarget>,
17    /// The actual allocated vectors
18    pub(crate) vectors: Vec<VectorMut>,
19}
20
21// TODO(joe): support in-place view operations
22// Node mutates its input in-place (input node index, vector idx)
23// add variant InPlace.
24/// Tracks which vector a node outputs to
25#[derive(Debug, Clone)]
26pub(crate) enum OutputTarget {
27    /// Node writes to the top-level provided output
28    ExternalOutput,
29    /// Node writes to an allocated intermediate vector
30    IntermediateVector(VectorId), // vector idx
31}
32
33impl OutputTarget {
34    pub fn vector_id(&self) -> Option<VectorId> {
35        match self {
36            OutputTarget::IntermediateVector(vector_id) => Some(*vector_id),
37            OutputTarget::ExternalOutput => None,
38        }
39    }
40}
41
42// ============================================================================
// Vector allocation for pipeline nodes
44// ============================================================================
45
46/// Allocate vectors with lifetime analysis and zero-copy optimization
47pub(super) fn allocate_vectors(
48    dag: &[Node],
49    execution_order: &[NodeId],
50) -> VortexResult<VectorAllocation> {
51    let mut output_targets: Vec<Option<OutputTarget>> = vec![None; dag.len()];
52    let mut allocation_types = Vec::new();
53
54    // Process nodes in reverse execution order (top-down for output propagation)
55    for &node_idx in execution_order.iter().rev() {
56        let node = &dag[node_idx];
57        let array = &node.array;
58
59        // Determine output target
60        let output_target = if node.parents.is_empty() {
61            // Root node - always writes to external output
62            OutputTarget::ExternalOutput
63        } else {
64            // All intermediate nodes need intermediate vector allocation
65            // The previous pass-through optimization was buggy and incorrectly
66            // assigned ExternalOutput to intermediate nodes
67
68            // TODO(joe): Implement vector allocation reuse optimization here:
69            // 1. Identify when intermediate nodes can safely write to ExternalOutput
70            // 2. Check that ALL consumers of this node can handle external output
71            // 3. Verify no conflicts with parallel execution paths
72            // 4. Ensure proper vector lifetime management
73
74            let vector_id = VectorId::new(allocation_types.len());
75            allocation_types.push(array.dtype());
76            OutputTarget::IntermediateVector(vector_id)
77        };
78
79        output_targets[node_idx] = Some(output_target);
80    }
81
82    Ok(VectorAllocation {
83        output_targets: output_targets
84            .into_iter()
85            .map(|target| target.vortex_expect("missing target"))
86            .collect(),
87        vectors: allocation_types
88            .into_iter()
89            .map(|dtype| VectorMut::with_capacity(dtype, N))
90            .collect(),
91    })
92}