1use arrow::compute::SortOptions;
19use datafusion_common::utils::compare_rows;
20use datafusion_common::{exec_err, ScalarValue};
21use std::cmp::Ordering;
22use std::collections::{BinaryHeap, VecDeque};
23
24#[derive(Debug, PartialEq, Eq)]
29struct CustomElement<'a> {
30 branch_idx: usize,
32 value: ScalarValue,
34 ordering: Vec<ScalarValue>,
36 sort_options: &'a [SortOptions],
38}
39
40impl<'a> CustomElement<'a> {
41 fn new(
42 branch_idx: usize,
43 value: ScalarValue,
44 ordering: Vec<ScalarValue>,
45 sort_options: &'a [SortOptions],
46 ) -> Self {
47 Self {
48 branch_idx,
49 value,
50 ordering,
51 sort_options,
52 }
53 }
54
55 fn ordering(
56 &self,
57 current: &[ScalarValue],
58 target: &[ScalarValue],
59 ) -> datafusion_common::Result<Ordering> {
60 compare_rows(current, target, self.sort_options)
62 }
63}
64
65impl Ord for CustomElement<'_> {
69 fn cmp(&self, other: &Self) -> Ordering {
70 self.ordering(&self.ordering, &other.ordering)
73 .map(|ordering| ordering.reverse())
75 .unwrap()
79 }
80}
81
82impl PartialOrd for CustomElement<'_> {
83 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
84 Some(self.cmp(other))
85 }
86}
87
88pub fn merge_ordered_arrays(
113 values: &mut [VecDeque<ScalarValue>],
115 ordering_values: &mut [VecDeque<Vec<ScalarValue>>],
119 sort_options: &[SortOptions],
121) -> datafusion_common::Result<(Vec<ScalarValue>, Vec<Vec<ScalarValue>>)> {
122 let mut heap = BinaryHeap::<CustomElement>::new();
124
125 if values.len() != ordering_values.len() {
126 return exec_err!(
127 "Expects values and ordering_values to have same size but got {} and {}",
128 values.len(),
129 ordering_values.len()
130 );
131 }
132 if let Some((idx, (values, ordering_values))) = values
133 .iter()
134 .zip(ordering_values.iter())
135 .enumerate()
136 .find(|(_, (vals, ordering_vals))| vals.len() != ordering_vals.len())
137 {
138 return exec_err!(
139 "Expects values elements and ordering_values elements to have same size but got {} and {} at index {}",
140 values.len(),
141 ordering_values.len(),
142 idx
143 );
144 }
145 let n_branch = values.len();
146 let mut merged_values = vec![];
147 let mut merged_orderings = vec![];
148 loop {
150 let minimum = if let Some(minimum) = heap.pop() {
151 minimum
152 } else {
153 for branch_idx in 0..n_branch {
155 if let Some(orderings) = ordering_values[branch_idx].pop_front() {
156 let value = values[branch_idx].pop_front().unwrap();
158 heap.push(CustomElement::new(
160 branch_idx,
161 value,
162 orderings,
163 sort_options,
164 ));
165 }
166 }
168
169 if let Some(minimum) = heap.pop() {
172 minimum
173 } else {
174 break;
178 }
179 };
180 let CustomElement {
181 branch_idx,
182 value,
183 ordering,
184 ..
185 } = minimum;
186 merged_values.push(value);
188 merged_orderings.push(ordering);
189
190 if let Some(orderings) = ordering_values[branch_idx].pop_front() {
193 let value = values[branch_idx].pop_front().unwrap();
195 heap.push(CustomElement::new(
197 branch_idx,
198 value,
199 orderings,
200 sort_options,
201 ));
202 }
203 }
204
205 Ok((merged_values, merged_orderings))
206}
207
#[cfg(test)]
mod tests {
    use super::*;

    use std::collections::VecDeque;
    use std::sync::Arc;

    use arrow::array::{ArrayRef, Int64Array};

    use datafusion_common::utils::get_row_at_idx;
    use datafusion_common::{Result, ScalarValue};

    /// Wraps the given integers in an `Int64Array` behind an `ArrayRef`.
    fn int64_column(data: Vec<i64>) -> ArrayRef {
        Arc::new(Int64Array::from(data))
    }

    /// Converts column-oriented arrays into a queue of rows, one per index.
    fn rows_of(columns: &[ArrayRef]) -> Result<VecDeque<Vec<ScalarValue>>> {
        let n_row = columns[0].len();
        (0..n_row)
            .map(|idx| get_row_at_idx(columns, idx))
            .collect()
    }

    /// Converts a single array into a queue of scalars, one per element.
    fn scalars_of(array: &ArrayRef) -> Result<VecDeque<ScalarValue>> {
        (0..array.len())
            .map(|idx| ScalarValue::try_from_array(array, idx))
            .collect()
    }

    /// Converts row-oriented scalars back into one array per column.
    fn columns_of(rows: &[Vec<ScalarValue>]) -> Result<Vec<ArrayRef>> {
        let n_col = rows[0].len();
        (0..n_col)
            .map(|col_idx| {
                ScalarValue::iter_to_array(rows.iter().map(|row| row[col_idx].clone()))
            })
            .collect()
    }

    /// Merging two identical ascending branches interleaves their entries.
    #[test]
    fn test_merge_asc() -> Result<()> {
        let lhs_arrays = vec![
            int64_column(vec![0, 0, 1, 1, 2]),
            int64_column(vec![0, 1, 2, 3, 4]),
        ];
        let lhs_orderings = rows_of(&lhs_arrays)?;

        let rhs_arrays = vec![
            int64_column(vec![0, 0, 1, 1, 2]),
            int64_column(vec![0, 1, 2, 3, 4]),
        ];
        let rhs_orderings = rows_of(&rhs_arrays)?;

        let ascending = SortOptions {
            descending: false,
            nulls_first: false,
        };
        let sort_options = vec![ascending; 2];

        let lhs_vals = scalars_of(&int64_column(vec![0, 1, 2, 3, 4]))?;
        let rhs_vals = scalars_of(&int64_column(vec![0, 1, 2, 3, 4]))?;

        let expected = int64_column(vec![0, 0, 1, 1, 2, 2, 3, 3, 4, 4]);
        let expected_ts = vec![
            int64_column(vec![0, 0, 0, 0, 1, 1, 1, 1, 2, 2]),
            int64_column(vec![0, 0, 1, 1, 2, 2, 3, 3, 4, 4]),
        ];

        let (merged_vals, merged_ts) = merge_ordered_arrays(
            &mut [lhs_vals, rhs_vals],
            &mut [lhs_orderings, rhs_orderings],
            &sort_options,
        )?;
        let merged_vals = ScalarValue::iter_to_array(merged_vals.into_iter())?;
        let merged_ts = columns_of(&merged_ts)?;

        assert_eq!(&merged_vals, &expected);
        assert_eq!(&merged_ts, &expected_ts);

        Ok(())
    }

    /// Merging two identical descending branches interleaves their entries.
    #[test]
    fn test_merge_desc() -> Result<()> {
        let lhs_arrays = vec![
            int64_column(vec![2, 1, 1, 0, 0]),
            int64_column(vec![4, 3, 2, 1, 0]),
        ];
        let lhs_orderings = rows_of(&lhs_arrays)?;

        let rhs_arrays = vec![
            int64_column(vec![2, 1, 1, 0, 0]),
            int64_column(vec![4, 3, 2, 1, 0]),
        ];
        let rhs_orderings = rows_of(&rhs_arrays)?;

        let descending = SortOptions {
            descending: true,
            nulls_first: false,
        };
        let sort_options = vec![descending; 2];

        // The values are deliberately not sorted: only the ordering rows
        // drive the merge.
        let lhs_vals = scalars_of(&int64_column(vec![0, 1, 2, 1, 2]))?;
        let rhs_vals = scalars_of(&int64_column(vec![0, 1, 2, 1, 2]))?;

        let expected = int64_column(vec![0, 0, 1, 1, 2, 2, 1, 1, 2, 2]);
        let expected_ts = vec![
            int64_column(vec![2, 2, 1, 1, 1, 1, 0, 0, 0, 0]),
            int64_column(vec![4, 4, 3, 3, 2, 2, 1, 1, 0, 0]),
        ];

        let (merged_vals, merged_ts) = merge_ordered_arrays(
            &mut [lhs_vals, rhs_vals],
            &mut [lhs_orderings, rhs_orderings],
            &sort_options,
        )?;
        let merged_vals = ScalarValue::iter_to_array(merged_vals.into_iter())?;
        let merged_ts = columns_of(&merged_ts)?;

        assert_eq!(&merged_vals, &expected);
        assert_eq!(&merged_ts, &expected_ts);

        Ok(())
    }
}