datafusion_functions_aggregate_common/
merge_arrays.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use arrow::compute::SortOptions;
19use datafusion_common::utils::compare_rows;
20use datafusion_common::{exec_err, ScalarValue};
21use std::cmp::Ordering;
22use std::collections::{BinaryHeap, VecDeque};
23
24/// This is a wrapper struct to be able to correctly merge `ARRAY_AGG` data from
25/// multiple partitions using `BinaryHeap`. When used inside `BinaryHeap`, this
26/// struct returns smallest `CustomElement`, where smallest is determined by
27/// `ordering` values (`Vec<ScalarValue>`) according to `sort_options`.
28#[derive(Debug, PartialEq, Eq)]
29struct CustomElement<'a> {
30    /// Stores the partition this entry came from
31    branch_idx: usize,
32    /// Values to merge
33    value: ScalarValue,
34    // Comparison "key"
35    ordering: Vec<ScalarValue>,
36    /// Options defining the ordering semantics
37    sort_options: &'a [SortOptions],
38}
39
40impl<'a> CustomElement<'a> {
41    fn new(
42        branch_idx: usize,
43        value: ScalarValue,
44        ordering: Vec<ScalarValue>,
45        sort_options: &'a [SortOptions],
46    ) -> Self {
47        Self {
48            branch_idx,
49            value,
50            ordering,
51            sort_options,
52        }
53    }
54
55    fn ordering(
56        &self,
57        current: &[ScalarValue],
58        target: &[ScalarValue],
59    ) -> datafusion_common::Result<Ordering> {
60        // Calculate ordering according to `sort_options`
61        compare_rows(current, target, self.sort_options)
62    }
63}
64
65// Overwrite ordering implementation such that
66// - `self.ordering` values are used for comparison,
67// - When used inside `BinaryHeap` it is a min-heap.
68impl Ord for CustomElement<'_> {
69    fn cmp(&self, other: &Self) -> Ordering {
70        // TODO Ord/PartialOrd is not consistent with PartialEq; PartialOrd contract is violated
71        // Compares according to custom ordering
72        self.ordering(&self.ordering, &other.ordering)
73            // Convert max heap to min heap
74            .map(|ordering| ordering.reverse())
75            // This function return error, when `self.ordering` and `other.ordering`
76            // have different types (such as one is `ScalarValue::Int64`, other is `ScalarValue::Float32`)
77            // Here this case won't happen, because data from each partition will have same type
78            .unwrap()
79    }
80}
81
82impl PartialOrd for CustomElement<'_> {
83    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
84        Some(self.cmp(other))
85    }
86}
87
88/// This functions merges `values` array (`&[Vec<ScalarValue>]`) into single array `Vec<ScalarValue>`
89/// Merging done according to ordering values stored inside `ordering_values` (`&[Vec<Vec<ScalarValue>>]`)
90/// Inner `Vec<ScalarValue>` in the `ordering_values` can be thought as ordering information for
91/// each `ScalarValue` in the `values` array.
92/// Desired ordering specified by `sort_options` argument (Should have same size with inner `Vec<ScalarValue>`
93/// of the `ordering_values` array).
94///
95/// As an example
96/// values can be \[
97///      \[1, 2, 3, 4, 5\],
98///      \[1, 2, 3, 4\],
99///      \[1, 2, 3, 4, 5, 6\],
100/// \]
101/// In this case we will be merging three arrays (doesn't have to be same size)
102/// and produce a merged array with size 15 (sum of 5+4+6)
103/// Merging will be done according to ordering at `ordering_values` vector.
104/// As an example `ordering_values` can be [
105///      \[(1, a), (2, b), (3, b), (4, a), (5, b) \],
106///      \[(1, a), (2, b), (3, b), (4, a) \],
107///      \[(1, b), (2, c), (3, d), (4, e), (5, a), (6, b) \],
108/// ]
109/// For each ScalarValue in the `values` we have a corresponding `Vec<ScalarValue>` (like timestamp of it)
110/// for the example above `sort_options` will have size two, that defines ordering requirement of the merge.
111/// Inner `Vec<ScalarValue>`s of the `ordering_values` will be compared according `sort_options` (Their sizes should match)
112pub fn merge_ordered_arrays(
113    // We will merge values into single `Vec<ScalarValue>`.
114    values: &mut [VecDeque<ScalarValue>],
115    // `values` will be merged according to `ordering_values`.
116    // Inner `Vec<ScalarValue>` can be thought as ordering information for the
117    // each `ScalarValue` in the values`.
118    ordering_values: &mut [VecDeque<Vec<ScalarValue>>],
119    // Defines according to which ordering comparisons should be done.
120    sort_options: &[SortOptions],
121) -> datafusion_common::Result<(Vec<ScalarValue>, Vec<Vec<ScalarValue>>)> {
122    // Keep track of the most recent data of each branch, in a binary heap data structure.
123    let mut heap = BinaryHeap::<CustomElement>::new();
124
125    if values.len() != ordering_values.len() {
126        return exec_err!(
127            "Expects values and ordering_values to have same size but got {} and {}",
128            values.len(),
129            ordering_values.len()
130        );
131    }
132    if let Some((idx, (values, ordering_values))) = values
133        .iter()
134        .zip(ordering_values.iter())
135        .enumerate()
136        .find(|(_, (vals, ordering_vals))| vals.len() != ordering_vals.len())
137    {
138        return exec_err!(
139            "Expects values elements and ordering_values elements to have same size but got {} and {} at index {}",
140            values.len(),
141            ordering_values.len(),
142            idx
143        );
144    }
145    let n_branch = values.len();
146    let mut merged_values = vec![];
147    let mut merged_orderings = vec![];
148    // Continue iterating the loop until consuming data of all branches.
149    loop {
150        let minimum = if let Some(minimum) = heap.pop() {
151            minimum
152        } else {
153            // Heap is empty, fill it with the next entries from each branch.
154            for branch_idx in 0..n_branch {
155                if let Some(orderings) = ordering_values[branch_idx].pop_front() {
156                    // Their size should be same, we can safely .unwrap here.
157                    let value = values[branch_idx].pop_front().unwrap();
158                    // Push the next element to the heap:
159                    heap.push(CustomElement::new(
160                        branch_idx,
161                        value,
162                        orderings,
163                        sort_options,
164                    ));
165                }
166                // If None, we consumed this branch, skip it.
167            }
168
169            // Now we have filled the heap, get the largest entry (this will be
170            // the next element in merge).
171            if let Some(minimum) = heap.pop() {
172                minimum
173            } else {
174                // Heap is empty, this means that all indices are same with
175                // `end_indices`. We have consumed all of the branches, merge
176                // is completed, exit from the loop:
177                break;
178            }
179        };
180        let CustomElement {
181            branch_idx,
182            value,
183            ordering,
184            ..
185        } = minimum;
186        // Add minimum value in the heap to the result
187        merged_values.push(value);
188        merged_orderings.push(ordering);
189
190        // If there is an available entry, push next entry in the most
191        // recently consumed branch to the heap.
192        if let Some(orderings) = ordering_values[branch_idx].pop_front() {
193            // Their size should be same, we can safely .unwrap here.
194            let value = values[branch_idx].pop_front().unwrap();
195            // Push the next element to the heap:
196            heap.push(CustomElement::new(
197                branch_idx,
198                value,
199                orderings,
200                sort_options,
201            ));
202        }
203    }
204
205    Ok((merged_values, merged_orderings))
206}
207
#[cfg(test)]
mod tests {
    use super::*;

    use std::collections::VecDeque;
    use std::sync::Arc;

    use arrow::array::{ArrayRef, Int64Array};

    use datafusion_common::utils::get_row_at_idx;
    use datafusion_common::{Result, ScalarValue};

    /// Builds an `Int64Array` behind an `ArrayRef`.
    fn int64_array(data: Vec<i64>) -> ArrayRef {
        Arc::new(Int64Array::from(data))
    }

    /// Turns ordering columns into one ordering key per row.
    fn ordering_rows(columns: &[ArrayRef]) -> Result<VecDeque<Vec<ScalarValue>>> {
        (0..columns[0].len())
            .map(|row_idx| get_row_at_idx(columns, row_idx))
            .collect()
    }

    /// Turns an array into a queue of scalars, one per element.
    fn scalar_queue(array: &ArrayRef) -> Result<VecDeque<ScalarValue>> {
        (0..array.len())
            .map(|row_idx| ScalarValue::try_from_array(array, row_idx))
            .collect()
    }

    /// Transposes merged ordering keys (row-major) back into one array per
    /// ordering column.
    fn ordering_columns(rows: &[Vec<ScalarValue>]) -> Result<Vec<ArrayRef>> {
        (0..rows[0].len())
            .map(|col_idx| {
                ScalarValue::iter_to_array(rows.iter().map(|row| row[col_idx].clone()))
            })
            .collect()
    }

    #[test]
    fn test_merge_asc() -> Result<()> {
        let ascending = SortOptions {
            descending: false,
            nulls_first: false,
        };
        let sort_options = vec![ascending.clone(), ascending];

        // Both sides carry identical ordering keys and identical values.
        let lhs_orderings = ordering_rows(&[
            int64_array(vec![0, 0, 1, 1, 2]),
            int64_array(vec![0, 1, 2, 3, 4]),
        ])?;
        let rhs_orderings = ordering_rows(&[
            int64_array(vec![0, 0, 1, 1, 2]),
            int64_array(vec![0, 1, 2, 3, 4]),
        ])?;

        let lhs_vals = scalar_queue(&int64_array(vec![0, 1, 2, 3, 4]))?;
        let rhs_vals = scalar_queue(&int64_array(vec![0, 1, 2, 3, 4]))?;

        let expected = int64_array(vec![0, 0, 1, 1, 2, 2, 3, 3, 4, 4]);
        let expected_ts = vec![
            int64_array(vec![0, 0, 0, 0, 1, 1, 1, 1, 2, 2]),
            int64_array(vec![0, 0, 1, 1, 2, 2, 3, 3, 4, 4]),
        ];

        let (merged_vals, merged_ts) = merge_ordered_arrays(
            &mut [lhs_vals, rhs_vals],
            &mut [lhs_orderings, rhs_orderings],
            &sort_options,
        )?;
        let merged_vals = ScalarValue::iter_to_array(merged_vals.into_iter())?;
        let merged_ts = ordering_columns(&merged_ts)?;

        assert_eq!(&merged_vals, &expected);
        assert_eq!(&merged_ts, &expected_ts);

        Ok(())
    }

    #[test]
    fn test_merge_desc() -> Result<()> {
        let descending = SortOptions {
            descending: true,
            nulls_first: false,
        };
        let sort_options = vec![descending.clone(), descending];

        let lhs_orderings = ordering_rows(&[
            int64_array(vec![2, 1, 1, 0, 0]),
            int64_array(vec![4, 3, 2, 1, 0]),
        ])?;
        let rhs_orderings = ordering_rows(&[
            int64_array(vec![2, 1, 1, 0, 0]),
            int64_array(vec![4, 3, 2, 1, 0]),
        ])?;

        // Values (which will be merged) don't have to be ordered.
        let lhs_vals = scalar_queue(&int64_array(vec![0, 1, 2, 1, 2]))?;
        let rhs_vals = scalar_queue(&int64_array(vec![0, 1, 2, 1, 2]))?;

        let expected = int64_array(vec![0, 0, 1, 1, 2, 2, 1, 1, 2, 2]);
        let expected_ts = vec![
            int64_array(vec![2, 2, 1, 1, 1, 1, 0, 0, 0, 0]),
            int64_array(vec![4, 4, 3, 3, 2, 2, 1, 1, 0, 0]),
        ];

        let (merged_vals, merged_ts) = merge_ordered_arrays(
            &mut [lhs_vals, rhs_vals],
            &mut [lhs_orderings, rhs_orderings],
            &sort_options,
        )?;
        let merged_vals = ScalarValue::iter_to_array(merged_vals.into_iter())?;
        let merged_ts = ordering_columns(&merged_ts)?;

        assert_eq!(&merged_vals, &expected);
        assert_eq!(&merged_ts, &expected_ts);
        Ok(())
    }
}