datafusion_functions_aggregate_common/merge_arrays.rs
1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use arrow::compute::SortOptions;
19use datafusion_common::utils::compare_rows;
20use datafusion_common::{exec_err, ScalarValue};
21use std::cmp::Ordering;
22use std::collections::{BinaryHeap, VecDeque};
23
/// This is a wrapper struct to be able to correctly merge `ARRAY_AGG` data from
/// multiple partitions using `BinaryHeap`. When used inside `BinaryHeap`, this
/// struct returns the smallest `CustomElement` first, where "smallest" is
/// determined by the `ordering` values (`Vec<ScalarValue>`) compared according
/// to `sort_options`.
///
/// Note: the derived `PartialEq`/`Eq` compare every field, while the `Ord`
/// implementation of this type only looks at `ordering` (under `sort_options`).
/// Two elements can therefore compare as `Ordering::Equal` without being `==`.
#[derive(Debug, PartialEq, Eq)]
struct CustomElement<'a> {
    /// Stores the partition this entry came from
    branch_idx: usize,
    /// Values to merge
    value: ScalarValue,
    /// Comparison "key": the row of ordering values this entry is ranked by
    ordering: Vec<ScalarValue>,
    /// Options defining the ordering semantics
    sort_options: &'a [SortOptions],
}
39
40impl<'a> CustomElement<'a> {
41 fn new(
42 branch_idx: usize,
43 value: ScalarValue,
44 ordering: Vec<ScalarValue>,
45 sort_options: &'a [SortOptions],
46 ) -> Self {
47 Self {
48 branch_idx,
49 value,
50 ordering,
51 sort_options,
52 }
53 }
54
55 fn ordering(
56 &self,
57 current: &[ScalarValue],
58 target: &[ScalarValue],
59 ) -> datafusion_common::Result<Ordering> {
60 // Calculate ordering according to `sort_options`
61 compare_rows(current, target, self.sort_options)
62 }
63}
64
65// Overwrite ordering implementation such that
66// - `self.ordering` values are used for comparison,
67// - When used inside `BinaryHeap` it is a min-heap.
68impl Ord for CustomElement<'_> {
69 fn cmp(&self, other: &Self) -> Ordering {
70 // Compares according to custom ordering
71 self.ordering(&self.ordering, &other.ordering)
72 // Convert max heap to min heap
73 .map(|ordering| ordering.reverse())
74 // This function return error, when `self.ordering` and `other.ordering`
75 // have different types (such as one is `ScalarValue::Int64`, other is `ScalarValue::Float32`)
76 // Here this case won't happen, because data from each partition will have same type
77 .unwrap()
78 }
79}
80
// `PartialOrd` simply delegates to the `Ord` implementation of this type, so
// both always agree and `partial_cmp` never returns `None`.
impl PartialOrd for CustomElement<'_> {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}
86
87/// This functions merges `values` array (`&[Vec<ScalarValue>]`) into single array `Vec<ScalarValue>`
88/// Merging done according to ordering values stored inside `ordering_values` (`&[Vec<Vec<ScalarValue>>]`)
89/// Inner `Vec<ScalarValue>` in the `ordering_values` can be thought as ordering information for the
90/// each `ScalarValue` in the `values` array.
91/// Desired ordering specified by `sort_options` argument (Should have same size with inner `Vec<ScalarValue>`
92/// of the `ordering_values` array).
93///
94/// As an example
95/// values can be \[
96/// \[1, 2, 3, 4, 5\],
97/// \[1, 2, 3, 4\],
98/// \[1, 2, 3, 4, 5, 6\],
99/// \]
100/// In this case we will be merging three arrays (doesn't have to be same size)
101/// and produce a merged array with size 15 (sum of 5+4+6)
102/// Merging will be done according to ordering at `ordering_values` vector.
103/// As an example `ordering_values` can be [
104/// \[(1, a), (2, b), (3, b), (4, a), (5, b) \],
105/// \[(1, a), (2, b), (3, b), (4, a) \],
106/// \[(1, b), (2, c), (3, d), (4, e), (5, a), (6, b) \],
107/// ]
108/// For each ScalarValue in the `values` we have a corresponding `Vec<ScalarValue>` (like timestamp of it)
109/// for the example above `sort_options` will have size two, that defines ordering requirement of the merge.
110/// Inner `Vec<ScalarValue>`s of the `ordering_values` will be compared according `sort_options` (Their sizes should match)
111pub fn merge_ordered_arrays(
112 // We will merge values into single `Vec<ScalarValue>`.
113 values: &mut [VecDeque<ScalarValue>],
114 // `values` will be merged according to `ordering_values`.
115 // Inner `Vec<ScalarValue>` can be thought as ordering information for the
116 // each `ScalarValue` in the values`.
117 ordering_values: &mut [VecDeque<Vec<ScalarValue>>],
118 // Defines according to which ordering comparisons should be done.
119 sort_options: &[SortOptions],
120) -> datafusion_common::Result<(Vec<ScalarValue>, Vec<Vec<ScalarValue>>)> {
121 // Keep track the most recent data of each branch, in binary heap data structure.
122 let mut heap = BinaryHeap::<CustomElement>::new();
123
124 if values.len() != ordering_values.len()
125 || values
126 .iter()
127 .zip(ordering_values.iter())
128 .any(|(vals, ordering_vals)| vals.len() != ordering_vals.len())
129 {
130 return exec_err!(
131 "Expects values arguments and/or ordering_values arguments to have same size"
132 );
133 }
134 let n_branch = values.len();
135 let mut merged_values = vec![];
136 let mut merged_orderings = vec![];
137 // Continue iterating the loop until consuming data of all branches.
138 loop {
139 let minimum = if let Some(minimum) = heap.pop() {
140 minimum
141 } else {
142 // Heap is empty, fill it with the next entries from each branch.
143 for branch_idx in 0..n_branch {
144 if let Some(orderings) = ordering_values[branch_idx].pop_front() {
145 // Their size should be same, we can safely .unwrap here.
146 let value = values[branch_idx].pop_front().unwrap();
147 // Push the next element to the heap:
148 heap.push(CustomElement::new(
149 branch_idx,
150 value,
151 orderings,
152 sort_options,
153 ));
154 }
155 // If None, we consumed this branch, skip it.
156 }
157
158 // Now we have filled the heap, get the largest entry (this will be
159 // the next element in merge).
160 if let Some(minimum) = heap.pop() {
161 minimum
162 } else {
163 // Heap is empty, this means that all indices are same with
164 // `end_indices`. We have consumed all of the branches, merge
165 // is completed, exit from the loop:
166 break;
167 }
168 };
169 let CustomElement {
170 branch_idx,
171 value,
172 ordering,
173 ..
174 } = minimum;
175 // Add minimum value in the heap to the result
176 merged_values.push(value);
177 merged_orderings.push(ordering);
178
179 // If there is an available entry, push next entry in the most
180 // recently consumed branch to the heap.
181 if let Some(orderings) = ordering_values[branch_idx].pop_front() {
182 // Their size should be same, we can safely .unwrap here.
183 let value = values[branch_idx].pop_front().unwrap();
184 // Push the next element to the heap:
185 heap.push(CustomElement::new(
186 branch_idx,
187 value,
188 orderings,
189 sort_options,
190 ));
191 }
192 }
193
194 Ok((merged_values, merged_orderings))
195}