1use arrow::compute::SortOptions;
19use datafusion_common::utils::compare_rows;
20use datafusion_common::{exec_err, ScalarValue};
21use std::cmp::Ordering;
22use std::collections::{BinaryHeap, VecDeque};
23
24#[derive(Debug, PartialEq, Eq)]
29struct CustomElement<'a> {
30 branch_idx: usize,
32 value: ScalarValue,
34 ordering: Vec<ScalarValue>,
36 sort_options: &'a [SortOptions],
38}
39
40impl<'a> CustomElement<'a> {
41 fn new(
42 branch_idx: usize,
43 value: ScalarValue,
44 ordering: Vec<ScalarValue>,
45 sort_options: &'a [SortOptions],
46 ) -> Self {
47 Self {
48 branch_idx,
49 value,
50 ordering,
51 sort_options,
52 }
53 }
54
55 fn ordering(
56 &self,
57 current: &[ScalarValue],
58 target: &[ScalarValue],
59 ) -> datafusion_common::Result<Ordering> {
60 compare_rows(current, target, self.sort_options)
62 }
63}
64
65impl Ord for CustomElement<'_> {
69 fn cmp(&self, other: &Self) -> Ordering {
70 self.ordering(&self.ordering, &other.ordering)
73 .map(|ordering| ordering.reverse())
75 .unwrap()
79 }
80}
81
82impl PartialOrd for CustomElement<'_> {
83 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
84 Some(self.cmp(other))
85 }
86}
87
88pub fn merge_ordered_arrays(
113 values: &mut [VecDeque<ScalarValue>],
115 ordering_values: &mut [VecDeque<Vec<ScalarValue>>],
119 sort_options: &[SortOptions],
121) -> datafusion_common::Result<(Vec<ScalarValue>, Vec<Vec<ScalarValue>>)> {
122 let mut heap = BinaryHeap::<CustomElement>::new();
124
125 if values.len() != ordering_values.len()
126 || values
127 .iter()
128 .zip(ordering_values.iter())
129 .any(|(vals, ordering_vals)| vals.len() != ordering_vals.len())
130 {
131 return exec_err!(
132 "Expects values arguments and/or ordering_values arguments to have same size"
133 );
134 }
135 let n_branch = values.len();
136 let mut merged_values = vec![];
137 let mut merged_orderings = vec![];
138 loop {
140 let minimum = if let Some(minimum) = heap.pop() {
141 minimum
142 } else {
143 for branch_idx in 0..n_branch {
145 if let Some(orderings) = ordering_values[branch_idx].pop_front() {
146 let value = values[branch_idx].pop_front().unwrap();
148 heap.push(CustomElement::new(
150 branch_idx,
151 value,
152 orderings,
153 sort_options,
154 ));
155 }
156 }
158
159 if let Some(minimum) = heap.pop() {
162 minimum
163 } else {
164 break;
168 }
169 };
170 let CustomElement {
171 branch_idx,
172 value,
173 ordering,
174 ..
175 } = minimum;
176 merged_values.push(value);
178 merged_orderings.push(ordering);
179
180 if let Some(orderings) = ordering_values[branch_idx].pop_front() {
183 let value = values[branch_idx].pop_front().unwrap();
185 heap.push(CustomElement::new(
187 branch_idx,
188 value,
189 orderings,
190 sort_options,
191 ));
192 }
193 }
194
195 Ok((merged_values, merged_orderings))
196}
197
#[cfg(test)]
mod tests {
    use super::*;

    use std::collections::VecDeque;
    use std::sync::Arc;

    use arrow::array::{ArrayRef, Int64Array};

    use datafusion_common::utils::get_row_at_idx;
    use datafusion_common::{Result, ScalarValue};

    /// Wraps `data` in an `Int64Array` behind an `ArrayRef`.
    fn int64_column(data: Vec<i64>) -> ArrayRef {
        Arc::new(Int64Array::from(data))
    }

    /// Reads `columns` row by row, producing one sort key per row.
    fn ordering_rows(columns: &[ArrayRef]) -> Result<VecDeque<Vec<ScalarValue>>> {
        let row_count = columns[0].len();
        (0..row_count)
            .map(|row| get_row_at_idx(columns, row))
            .collect()
    }

    /// Converts every entry of `array` into a `ScalarValue`, front to back.
    fn scalar_queue(array: &ArrayRef) -> Result<VecDeque<ScalarValue>> {
        (0..array.len())
            .map(|row| ScalarValue::try_from_array(array, row))
            .collect()
    }

    /// Transposes row-oriented sort keys back into one array per column.
    fn rows_to_columns(rows: &[Vec<ScalarValue>]) -> Result<Vec<ArrayRef>> {
        let column_count = rows[0].len();
        (0..column_count)
            .map(|col| {
                ScalarValue::iter_to_array(rows.iter().map(|row| row[col].clone()))
            })
            .collect()
    }

    /// Merging two identical ascending branches interleaves them pairwise.
    #[test]
    fn test_merge_asc() -> Result<()> {
        let sort_options = vec![
            SortOptions {
                descending: false,
                nulls_first: false,
            };
            2
        ];

        let lhs_orderings = ordering_rows(&[
            int64_column(vec![0, 0, 1, 1, 2]),
            int64_column(vec![0, 1, 2, 3, 4]),
        ])?;
        let rhs_orderings = ordering_rows(&[
            int64_column(vec![0, 0, 1, 1, 2]),
            int64_column(vec![0, 1, 2, 3, 4]),
        ])?;

        let lhs_vals = scalar_queue(&int64_column(vec![0, 1, 2, 3, 4]))?;
        let rhs_vals = scalar_queue(&int64_column(vec![0, 1, 2, 3, 4]))?;

        let expected = int64_column(vec![0, 0, 1, 1, 2, 2, 3, 3, 4, 4]);
        let expected_ts = vec![
            int64_column(vec![0, 0, 0, 0, 1, 1, 1, 1, 2, 2]),
            int64_column(vec![0, 0, 1, 1, 2, 2, 3, 3, 4, 4]),
        ];

        let (merged_vals, merged_ts) = merge_ordered_arrays(
            &mut [lhs_vals, rhs_vals],
            &mut [lhs_orderings, rhs_orderings],
            &sort_options,
        )?;
        let merged_vals = ScalarValue::iter_to_array(merged_vals)?;
        let merged_ts = rows_to_columns(&merged_ts)?;

        assert_eq!(&merged_vals, &expected);
        assert_eq!(&merged_ts, &expected_ts);

        Ok(())
    }

    /// Merging two identical descending branches interleaves them pairwise,
    /// and the payload follows the sort keys rather than its own values.
    #[test]
    fn test_merge_desc() -> Result<()> {
        let sort_options = vec![
            SortOptions {
                descending: true,
                nulls_first: false,
            };
            2
        ];

        let lhs_orderings = ordering_rows(&[
            int64_column(vec![2, 1, 1, 0, 0]),
            int64_column(vec![4, 3, 2, 1, 0]),
        ])?;
        let rhs_orderings = ordering_rows(&[
            int64_column(vec![2, 1, 1, 0, 0]),
            int64_column(vec![4, 3, 2, 1, 0]),
        ])?;

        let lhs_vals = scalar_queue(&int64_column(vec![0, 1, 2, 1, 2]))?;
        let rhs_vals = scalar_queue(&int64_column(vec![0, 1, 2, 1, 2]))?;

        let expected = int64_column(vec![0, 0, 1, 1, 2, 2, 1, 1, 2, 2]);
        let expected_ts = vec![
            int64_column(vec![2, 2, 1, 1, 1, 1, 0, 0, 0, 0]),
            int64_column(vec![4, 4, 3, 3, 2, 2, 1, 1, 0, 0]),
        ];

        let (merged_vals, merged_ts) = merge_ordered_arrays(
            &mut [lhs_vals, rhs_vals],
            &mut [lhs_orderings, rhs_orderings],
            &sort_options,
        )?;
        let merged_vals = ScalarValue::iter_to_array(merged_vals)?;
        let merged_ts = rows_to_columns(&merged_ts)?;

        assert_eq!(&merged_vals, &expected);
        assert_eq!(&merged_ts, &expected_ts);
        Ok(())
    }
}