1use arrow::compute::SortOptions;
19use datafusion_common::utils::compare_rows;
20use datafusion_common::{exec_err, ScalarValue};
21use std::cmp::Ordering;
22use std::collections::{BinaryHeap, VecDeque};
23
/// One pending entry in the k-way merge heap used by `merge_ordered_arrays`:
/// the current head of a single input branch together with its sort key.
///
/// NOTE(review): `PartialEq`/`Eq` are derived and compare every field, while the
/// hand-written `Ord` impl orders elements by their `ordering` keys, so `a == b`
/// and `a.cmp(&b) == Ordering::Equal` are not guaranteed to agree. The heap only
/// relies on the `Ord`/`PartialOrd` comparison.
#[derive(Debug, PartialEq, Eq)]
struct CustomElement<'a> {
    /// Index of the input branch (position in `values` / `ordering_values`)
    /// this element was taken from.
    branch_idx: usize,
    /// The value being merged.
    value: ScalarValue,
    /// Sort key of `value`; heap comparisons are performed on this field.
    ordering: Vec<ScalarValue>,
    /// Sort options passed to `compare_rows` when comparing `ordering` keys
    /// (presumably one entry per key column — confirm against callers).
    sort_options: &'a [SortOptions],
}
39
40impl<'a> CustomElement<'a> {
41 fn new(
42 branch_idx: usize,
43 value: ScalarValue,
44 ordering: Vec<ScalarValue>,
45 sort_options: &'a [SortOptions],
46 ) -> Self {
47 Self {
48 branch_idx,
49 value,
50 ordering,
51 sort_options,
52 }
53 }
54
55 fn ordering(
56 &self,
57 current: &[ScalarValue],
58 target: &[ScalarValue],
59 ) -> datafusion_common::Result<Ordering> {
60 compare_rows(current, target, self.sort_options)
62 }
63}
64
65impl Ord for CustomElement<'_> {
69 fn cmp(&self, other: &Self) -> Ordering {
70 self.ordering(&self.ordering, &other.ordering)
72 .map(|ordering| ordering.reverse())
74 .unwrap()
78 }
79}
80
81impl PartialOrd for CustomElement<'_> {
82 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
83 Some(self.cmp(other))
84 }
85}
86
87pub fn merge_ordered_arrays(
112 values: &mut [VecDeque<ScalarValue>],
114 ordering_values: &mut [VecDeque<Vec<ScalarValue>>],
118 sort_options: &[SortOptions],
120) -> datafusion_common::Result<(Vec<ScalarValue>, Vec<Vec<ScalarValue>>)> {
121 let mut heap = BinaryHeap::<CustomElement>::new();
123
124 if values.len() != ordering_values.len()
125 || values
126 .iter()
127 .zip(ordering_values.iter())
128 .any(|(vals, ordering_vals)| vals.len() != ordering_vals.len())
129 {
130 return exec_err!(
131 "Expects values arguments and/or ordering_values arguments to have same size"
132 );
133 }
134 let n_branch = values.len();
135 let mut merged_values = vec![];
136 let mut merged_orderings = vec![];
137 loop {
139 let minimum = if let Some(minimum) = heap.pop() {
140 minimum
141 } else {
142 for branch_idx in 0..n_branch {
144 if let Some(orderings) = ordering_values[branch_idx].pop_front() {
145 let value = values[branch_idx].pop_front().unwrap();
147 heap.push(CustomElement::new(
149 branch_idx,
150 value,
151 orderings,
152 sort_options,
153 ));
154 }
155 }
157
158 if let Some(minimum) = heap.pop() {
161 minimum
162 } else {
163 break;
167 }
168 };
169 let CustomElement {
170 branch_idx,
171 value,
172 ordering,
173 ..
174 } = minimum;
175 merged_values.push(value);
177 merged_orderings.push(ordering);
178
179 if let Some(orderings) = ordering_values[branch_idx].pop_front() {
182 let value = values[branch_idx].pop_front().unwrap();
184 heap.push(CustomElement::new(
186 branch_idx,
187 value,
188 orderings,
189 sort_options,
190 ));
191 }
192 }
193
194 Ok((merged_values, merged_orderings))
195}
196
#[cfg(test)]
mod tests {
    use super::*;

    use std::collections::VecDeque;
    use std::sync::Arc;

    use arrow::array::{ArrayRef, Int64Array};

    use datafusion_common::utils::get_row_at_idx;
    use datafusion_common::{Result, ScalarValue};

    /// Builds an `Int64Array` column from plain integers.
    fn int64_column(data: Vec<i64>) -> ArrayRef {
        Arc::new(Int64Array::from(data))
    }

    /// Splits equal-length columns into one ordering row per index.
    fn rows_of(columns: &[ArrayRef]) -> Result<VecDeque<Vec<ScalarValue>>> {
        let n_row = columns[0].len();
        (0..n_row).map(|idx| get_row_at_idx(columns, idx)).collect()
    }

    /// Converts every slot of an `Int64Array` built from `data` into a scalar.
    fn scalars_of(data: Vec<i64>) -> Result<VecDeque<ScalarValue>> {
        let array = int64_column(data);
        (0..array.len())
            .map(|idx| ScalarValue::try_from_array(&array, idx))
            .collect()
    }

    /// Transposes merged ordering rows back into one array per ordering column.
    fn columns_of(rows: &[Vec<ScalarValue>]) -> Result<Vec<ArrayRef>> {
        (0..rows[0].len())
            .map(|col_idx| {
                ScalarValue::iter_to_array(rows.iter().map(|row| row[col_idx].clone()))
            })
            .collect()
    }

    /// Two identical sort options, both with `nulls_first: false`.
    fn uniform_sort_options(descending: bool) -> Vec<SortOptions> {
        vec![
            SortOptions {
                descending,
                nulls_first: false,
            };
            2
        ]
    }

    #[test]
    fn test_merge_asc() -> Result<()> {
        let ordering_columns = vec![
            int64_column(vec![0, 0, 1, 1, 2]),
            int64_column(vec![0, 1, 2, 3, 4]),
        ];
        let lhs_orderings = rows_of(&ordering_columns)?;
        let rhs_orderings = rows_of(&ordering_columns)?;
        let lhs_vals = scalars_of(vec![0, 1, 2, 3, 4])?;
        let rhs_vals = scalars_of(vec![0, 1, 2, 3, 4])?;
        let sort_options = uniform_sort_options(false);

        let (merged_vals, merged_ts) = merge_ordered_arrays(
            &mut [lhs_vals, rhs_vals],
            &mut [lhs_orderings, rhs_orderings],
            &sort_options,
        )?;

        let expected = int64_column(vec![0, 0, 1, 1, 2, 2, 3, 3, 4, 4]);
        let expected_ts = vec![
            int64_column(vec![0, 0, 0, 0, 1, 1, 1, 1, 2, 2]),
            int64_column(vec![0, 0, 1, 1, 2, 2, 3, 3, 4, 4]),
        ];
        assert_eq!(&ScalarValue::iter_to_array(merged_vals)?, &expected);
        assert_eq!(&columns_of(&merged_ts)?, &expected_ts);

        Ok(())
    }

    #[test]
    fn test_merge_desc() -> Result<()> {
        let ordering_columns = vec![
            int64_column(vec![2, 1, 1, 0, 0]),
            int64_column(vec![4, 3, 2, 1, 0]),
        ];
        let lhs_orderings = rows_of(&ordering_columns)?;
        let rhs_orderings = rows_of(&ordering_columns)?;
        let lhs_vals = scalars_of(vec![0, 1, 2, 1, 2])?;
        let rhs_vals = scalars_of(vec![0, 1, 2, 1, 2])?;
        let sort_options = uniform_sort_options(true);

        let (merged_vals, merged_ts) = merge_ordered_arrays(
            &mut [lhs_vals, rhs_vals],
            &mut [lhs_orderings, rhs_orderings],
            &sort_options,
        )?;

        let expected = int64_column(vec![0, 0, 1, 1, 2, 2, 1, 1, 2, 2]);
        let expected_ts = vec![
            int64_column(vec![2, 2, 1, 1, 1, 1, 0, 0, 0, 0]),
            int64_column(vec![4, 4, 3, 3, 2, 2, 1, 1, 0, 0]),
        ];
        assert_eq!(&ScalarValue::iter_to_array(merged_vals)?, &expected);
        assert_eq!(&columns_of(&merged_ts)?, &expected_ts);

        Ok(())
    }
}