datafusion_physical_expr/window/
window_expr.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::any::Any;
19use std::fmt::Debug;
20use std::ops::Range;
21use std::sync::Arc;
22
23use crate::PhysicalExpr;
24
25use arrow::array::BooleanArray;
26use arrow::array::{Array, ArrayRef, new_empty_array};
27use arrow::compute::SortOptions;
28use arrow::compute::filter as arrow_filter;
29use arrow::compute::kernels::sort::SortColumn;
30use arrow::datatypes::FieldRef;
31use arrow::record_batch::RecordBatch;
32use datafusion_common::cast::as_boolean_array;
33use datafusion_common::utils::compare_rows;
34use datafusion_common::{
35    Result, ScalarValue, arrow_datafusion_err, exec_datafusion_err, internal_err,
36};
37use datafusion_expr::window_state::{
38    PartitionBatchState, WindowAggState, WindowFrameContext, WindowFrameStateGroups,
39};
40use datafusion_expr::{Accumulator, PartitionEvaluator, WindowFrame, WindowFrameBound};
41use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr;
42
43use datafusion_physical_expr_common::utils::evaluate_expressions_to_arrays;
44use indexmap::IndexMap;
45
46/// Common trait for [window function] implementations
47///
48/// # Aggregate Window Expressions
49///
50/// These expressions take the form
51///
52/// ```text
53/// OVER({ROWS | RANGE| GROUPS} BETWEEN UNBOUNDED PRECEDING AND ...)
54/// ```
55///
56/// For example, cumulative window frames uses `PlainAggregateWindowExpr`.
57///
58/// # Non Aggregate Window Expressions
59///
60/// The expressions have the form
61///
62/// ```text
63/// OVER({ROWS | RANGE| GROUPS} BETWEEN M {PRECEDING| FOLLOWING} AND ...)
64/// ```
65///
66/// For example, sliding window frames use [`SlidingAggregateWindowExpr`].
67///
68/// [window function]: https://en.wikipedia.org/wiki/Window_function_(SQL)
69/// [`PlainAggregateWindowExpr`]: crate::window::PlainAggregateWindowExpr
70/// [`SlidingAggregateWindowExpr`]: crate::window::SlidingAggregateWindowExpr
71pub trait WindowExpr: Send + Sync + Debug {
72    /// Returns the window expression as [`Any`] so that it can be
73    /// downcast to a specific implementation.
74    fn as_any(&self) -> &dyn Any;
75
76    /// The field of the final result of this window function.
77    fn field(&self) -> Result<FieldRef>;
78
79    /// Human readable name such as `"MIN(c2)"` or `"RANK()"`. The default
80    /// implementation returns placeholder text.
81    fn name(&self) -> &str {
82        "WindowExpr: default name"
83    }
84
85    /// Expressions that are passed to the WindowAccumulator.
86    /// Functions which take a single input argument, such as `sum`, return a single [`datafusion_expr::expr::Expr`],
87    /// others (e.g. `cov`) return many.
88    fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>>;
89
90    /// Evaluate the window function arguments against the batch and return
91    /// array ref, normally the resulting `Vec` is a single element one.
92    fn evaluate_args(&self, batch: &RecordBatch) -> Result<Vec<ArrayRef>> {
93        evaluate_expressions_to_arrays(&self.expressions(), batch)
94    }
95
96    /// Evaluate the window function values against the batch
97    fn evaluate(&self, batch: &RecordBatch) -> Result<ArrayRef>;
98
99    /// Evaluate the window function against the batch. This function facilitates
100    /// stateful, bounded-memory implementations.
101    fn evaluate_stateful(
102        &self,
103        _partition_batches: &PartitionBatches,
104        _window_agg_state: &mut PartitionWindowAggStates,
105    ) -> Result<()> {
106        internal_err!("evaluate_stateful is not implemented for {}", self.name())
107    }
108
109    /// Expressions that's from the window function's partition by clause, empty if absent
110    fn partition_by(&self) -> &[Arc<dyn PhysicalExpr>];
111
112    /// Expressions that's from the window function's order by clause, empty if absent
113    fn order_by(&self) -> &[PhysicalSortExpr];
114
115    /// Get order by columns, empty if absent
116    fn order_by_columns(&self, batch: &RecordBatch) -> Result<Vec<SortColumn>> {
117        self.order_by()
118            .iter()
119            .map(|e| e.evaluate_to_sort_column(batch))
120            .collect()
121    }
122
123    /// Get the window frame of this [WindowExpr].
124    fn get_window_frame(&self) -> &Arc<WindowFrame>;
125
126    /// Return a flag indicating whether this [WindowExpr] can run with
127    /// bounded memory.
128    fn uses_bounded_memory(&self) -> bool;
129
130    /// Get the reverse expression of this [WindowExpr].
131    fn get_reverse_expr(&self) -> Option<Arc<dyn WindowExpr>>;
132
133    /// Creates a new instance of the window function evaluator.
134    ///
135    /// Returns `WindowFn::Builtin` for built-in window functions (e.g., ROW_NUMBER, RANK)
136    /// or `WindowFn::Aggregate` for aggregate window functions (e.g., SUM, AVG).
137    fn create_window_fn(&self) -> Result<WindowFn>;
138
139    /// Returns all expressions used in the [`WindowExpr`].
140    /// These expressions are (1) function arguments, (2) partition by expressions, (3) order by expressions.
141    fn all_expressions(&self) -> WindowPhysicalExpressions {
142        let args = self.expressions();
143        let partition_by_exprs = self.partition_by().to_vec();
144        let order_by_exprs = self
145            .order_by()
146            .iter()
147            .map(|sort_expr| Arc::clone(&sort_expr.expr))
148            .collect();
149        WindowPhysicalExpressions {
150            args,
151            partition_by_exprs,
152            order_by_exprs,
153        }
154    }
155
156    /// Rewrites [`WindowExpr`], with new expressions given. The argument should be consistent
157    /// with the return value of the [`WindowExpr::all_expressions`] method.
158    /// Returns `Some(Arc<dyn WindowExpr>)` if re-write is supported, otherwise returns `None`.
159    fn with_new_expressions(
160        &self,
161        _args: Vec<Arc<dyn PhysicalExpr>>,
162        _partition_bys: Vec<Arc<dyn PhysicalExpr>>,
163        _order_by_exprs: Vec<Arc<dyn PhysicalExpr>>,
164    ) -> Option<Arc<dyn WindowExpr>> {
165        None
166    }
167}
168
/// Stores the physical expressions used inside the `WindowExpr`.
///
/// This is the value produced by `WindowExpr::all_expressions`, which fills
/// the fields from the window function arguments, the PARTITION BY
/// expressions, and the expressions of the ORDER BY sort keys respectively.
pub struct WindowPhysicalExpressions {
    /// Window function arguments
    pub args: Vec<Arc<dyn PhysicalExpr>>,
    /// PARTITION BY expressions
    pub partition_by_exprs: Vec<Arc<dyn PhysicalExpr>>,
    /// ORDER BY expressions (only the expressions; sort options are not kept)
    pub order_by_exprs: Vec<Arc<dyn PhysicalExpr>>,
}
178
179/// Extension trait that adds common functionality to [`AggregateWindowExpr`]s
180pub trait AggregateWindowExpr: WindowExpr {
181    /// Get the accumulator for the window expression. Note that distinct
182    /// window expressions may return distinct accumulators; e.g. sliding
183    /// (non-sliding) expressions will return sliding (normal) accumulators.
184    fn get_accumulator(&self) -> Result<Box<dyn Accumulator>>;
185
186    /// Optional FILTER (WHERE ...) predicate for this window aggregate.
187    fn filter_expr(&self) -> Option<&Arc<dyn PhysicalExpr>>;
188
189    /// Given current range and the last range, calculates the accumulator
190    /// result for the range of interest.
191    fn get_aggregate_result_inside_range(
192        &self,
193        last_range: &Range<usize>,
194        cur_range: &Range<usize>,
195        value_slice: &[ArrayRef],
196        accumulator: &mut Box<dyn Accumulator>,
197        filter_mask: Option<&BooleanArray>,
198    ) -> Result<ScalarValue>;
199
200    /// Indicates whether this window function always produces the same result
201    /// for all rows in the partition.
202    fn is_constant_in_partition(&self) -> bool;
203
204    /// Evaluates the window function against the batch.
205    fn aggregate_evaluate(&self, batch: &RecordBatch) -> Result<ArrayRef> {
206        let mut accumulator = self.get_accumulator()?;
207        let mut last_range = Range { start: 0, end: 0 };
208        let sort_options = self.order_by().iter().map(|o| o.options).collect();
209        let mut window_frame_ctx =
210            WindowFrameContext::new(Arc::clone(self.get_window_frame()), sort_options);
211        self.get_result_column(
212            &mut accumulator,
213            batch,
214            None,
215            &mut last_range,
216            &mut window_frame_ctx,
217            0,
218            false,
219        )
220    }
221
222    /// Statefully evaluates the window function against the batch. Maintains
223    /// state so that it can work incrementally over multiple chunks.
224    fn aggregate_evaluate_stateful(
225        &self,
226        partition_batches: &PartitionBatches,
227        window_agg_state: &mut PartitionWindowAggStates,
228    ) -> Result<()> {
229        let field = self.field()?;
230        let out_type = field.data_type();
231        for (partition_row, partition_batch_state) in partition_batches.iter() {
232            if !window_agg_state.contains_key(partition_row) {
233                let accumulator = self.get_accumulator()?;
234                window_agg_state.insert(
235                    partition_row.clone(),
236                    WindowState {
237                        state: WindowAggState::new(out_type)?,
238                        window_fn: WindowFn::Aggregate(accumulator),
239                    },
240                );
241            };
242            let window_state = window_agg_state
243                .get_mut(partition_row)
244                .ok_or_else(|| exec_datafusion_err!("Cannot find state"))?;
245            let accumulator = match &mut window_state.window_fn {
246                WindowFn::Aggregate(accumulator) => accumulator,
247                _ => unreachable!(),
248            };
249            let state = &mut window_state.state;
250            let record_batch = &partition_batch_state.record_batch;
251            let most_recent_row = partition_batch_state.most_recent_row.as_ref();
252
253            // If there is no window state context, initialize it.
254            let window_frame_ctx = state.window_frame_ctx.get_or_insert_with(|| {
255                let sort_options = self.order_by().iter().map(|o| o.options).collect();
256                WindowFrameContext::new(Arc::clone(self.get_window_frame()), sort_options)
257            });
258            let out_col = self.get_result_column(
259                accumulator,
260                record_batch,
261                most_recent_row,
262                // Start search from the last range
263                &mut state.window_frame_range,
264                window_frame_ctx,
265                state.last_calculated_index,
266                !partition_batch_state.is_end,
267            )?;
268            state.update(&out_col, partition_batch_state)?;
269        }
270        Ok(())
271    }
272
273    /// Calculates the window expression result for the given record batch.
274    /// Assumes that `record_batch` belongs to a single partition.
275    ///
276    /// # Arguments
277    /// * `accumulator`: The accumulator to use for the calculation.
278    /// * `record_batch`: batch belonging to the current partition (see [`PartitionBatchState`]).
279    /// * `most_recent_row`: the batch that contains the most recent row, if available (see [`PartitionBatchState`]).
280    /// * `last_range`: The last range of rows that were processed (see [`WindowAggState`]).
281    /// * `window_frame_ctx`: Details about the window frame (see [`WindowFrameContext`]).
282    /// * `idx`: The index of the current row in the record batch.
283    /// * `not_end`: is the current row not the end of the partition (see [`PartitionBatchState`]).
284    #[expect(clippy::too_many_arguments)]
285    fn get_result_column(
286        &self,
287        accumulator: &mut Box<dyn Accumulator>,
288        record_batch: &RecordBatch,
289        most_recent_row: Option<&RecordBatch>,
290        last_range: &mut Range<usize>,
291        window_frame_ctx: &mut WindowFrameContext,
292        mut idx: usize,
293        not_end: bool,
294    ) -> Result<ArrayRef> {
295        let values = self.evaluate_args(record_batch)?;
296
297        // Evaluate filter mask once per record batch if present
298        let filter_mask_arr: Option<ArrayRef> = match self.filter_expr() {
299            Some(expr) => {
300                let value = expr.evaluate(record_batch)?;
301                Some(value.into_array(record_batch.num_rows())?)
302            }
303            None => None,
304        };
305
306        // Borrow boolean view from the owned array
307        let filter_mask: Option<&BooleanArray> = match filter_mask_arr.as_deref() {
308            Some(arr) => Some(as_boolean_array(arr)?),
309            None => None,
310        };
311
312        if self.is_constant_in_partition() {
313            if not_end {
314                let field = self.field()?;
315                let out_type = field.data_type();
316                return Ok(new_empty_array(out_type));
317            }
318            let values = if let Some(mask) = filter_mask {
319                // Apply mask to all argument arrays before a single update
320                filter_arrays(&values, mask)?
321            } else {
322                values
323            };
324            accumulator.update_batch(&values)?;
325            let value = accumulator.evaluate()?;
326            return value.to_array_of_size(record_batch.num_rows());
327        }
328        let order_bys = get_orderby_values(self.order_by_columns(record_batch)?);
329        let most_recent_row_order_bys = most_recent_row
330            .map(|batch| self.order_by_columns(batch))
331            .transpose()?
332            .map(get_orderby_values);
333
334        // We iterate on each row to perform a running calculation.
335        let length = values[0].len();
336        let mut row_wise_results: Vec<ScalarValue> = vec![];
337        let is_causal = self.get_window_frame().is_causal();
338        while idx < length {
339            // Start search from the last_range. This squeezes searched range.
340            let cur_range =
341                window_frame_ctx.calculate_range(&order_bys, last_range, length, idx)?;
342            // Exit if the range is non-causal and extends all the way:
343            if cur_range.end == length
344                && !is_causal
345                && not_end
346                && !is_end_bound_safe(
347                    window_frame_ctx,
348                    &order_bys,
349                    most_recent_row_order_bys.as_deref(),
350                    self.order_by(),
351                    idx,
352                )?
353            {
354                break;
355            }
356            let value = self.get_aggregate_result_inside_range(
357                last_range,
358                &cur_range,
359                &values,
360                accumulator,
361                filter_mask,
362            )?;
363            // Update last range
364            *last_range = cur_range;
365            row_wise_results.push(value);
366            idx += 1;
367        }
368
369        if row_wise_results.is_empty() {
370            let field = self.field()?;
371            let out_type = field.data_type();
372            Ok(new_empty_array(out_type))
373        } else {
374            ScalarValue::iter_to_array(row_wise_results)
375        }
376    }
377}
378
379/// Filters a single array with the provided boolean mask.
380pub(crate) fn filter_array(array: &ArrayRef, mask: &BooleanArray) -> Result<ArrayRef> {
381    arrow_filter(array.as_ref(), mask)
382        .map(|a| a as ArrayRef)
383        .map_err(|e| arrow_datafusion_err!(e))
384}
385
386/// Filters a list of arrays with the provided boolean mask.
387pub(crate) fn filter_arrays(
388    arrays: &[ArrayRef],
389    mask: &BooleanArray,
390) -> Result<Vec<ArrayRef>> {
391    arrays.iter().map(|arr| filter_array(arr, mask)).collect()
392}
393
394/// Determines whether the end bound calculation for a window frame context is
395/// safe, meaning that the end bound stays the same, regardless of future data,
396/// based on the current sort expressions and ORDER BY columns. This function
397/// delegates work to specific functions for each frame type.
398///
399/// # Parameters
400///
401/// * `window_frame_ctx`: The context of the window frame being evaluated.
402/// * `order_bys`: A slice of `ArrayRef` representing the ORDER BY columns.
403/// * `most_recent_order_bys`: An optional reference to the most recent ORDER BY
404///   columns.
405/// * `sort_exprs`: Defines the lexicographical ordering in question.
406/// * `idx`: The current index in the window frame.
407///
408/// # Returns
409///
410/// A `Result` which is `Ok(true)` if the end bound is safe, `Ok(false)` otherwise.
411pub(crate) fn is_end_bound_safe(
412    window_frame_ctx: &WindowFrameContext,
413    order_bys: &[ArrayRef],
414    most_recent_order_bys: Option<&[ArrayRef]>,
415    sort_exprs: &[PhysicalSortExpr],
416    idx: usize,
417) -> Result<bool> {
418    if sort_exprs.is_empty() {
419        // Early return if no sort expressions are present:
420        return Ok(false);
421    };
422
423    match window_frame_ctx {
424        WindowFrameContext::Rows(window_frame) => {
425            is_end_bound_safe_for_rows(&window_frame.end_bound)
426        }
427        WindowFrameContext::Range { window_frame, .. } => is_end_bound_safe_for_range(
428            &window_frame.end_bound,
429            &order_bys[0],
430            most_recent_order_bys.map(|items| &items[0]),
431            &sort_exprs[0].options,
432            idx,
433        ),
434        WindowFrameContext::Groups {
435            window_frame,
436            state,
437        } => is_end_bound_safe_for_groups(
438            &window_frame.end_bound,
439            state,
440            &order_bys[0],
441            most_recent_order_bys.map(|items| &items[0]),
442            &sort_exprs[0].options,
443        ),
444    }
445}
446
447/// For row-based window frames, determines whether the end bound calculation
448/// is safe, which is trivially the case for `Preceding` and `CurrentRow` bounds.
449/// For 'Following' bounds, it compares the bound value to zero to ensure that
450/// it doesn't extend beyond the current row.
451///
452/// # Parameters
453///
454/// * `end_bound`: Reference to the window frame bound in question.
455///
456/// # Returns
457///
458/// A `Result` indicating whether the end bound is safe for row-based window frames.
459fn is_end_bound_safe_for_rows(end_bound: &WindowFrameBound) -> Result<bool> {
460    if let WindowFrameBound::Following(value) = end_bound {
461        let zero = ScalarValue::new_zero(&value.data_type());
462        Ok(zero.map(|zero| value.eq(&zero)).unwrap_or(false))
463    } else {
464        Ok(true)
465    }
466}
467
468/// For row-based window frames, determines whether the end bound calculation
469/// is safe by comparing it against specific values (zero, current row). It uses
470/// the `is_row_ahead` helper function to determine if the current row is ahead
471/// of the most recent row based on the ORDER BY column and sorting options.
472///
473/// # Parameters
474///
475/// * `end_bound`: Reference to the window frame bound in question.
476/// * `orderby_col`: Reference to the column used for ordering.
477/// * `most_recent_ob_col`: Optional reference to the most recent order-by column.
478/// * `sort_options`: The sorting options used in the window frame.
479/// * `idx`: The current index in the window frame.
480///
481/// # Returns
482///
483/// A `Result` indicating whether the end bound is safe for range-based window frames.
484fn is_end_bound_safe_for_range(
485    end_bound: &WindowFrameBound,
486    orderby_col: &ArrayRef,
487    most_recent_ob_col: Option<&ArrayRef>,
488    sort_options: &SortOptions,
489    idx: usize,
490) -> Result<bool> {
491    match end_bound {
492        WindowFrameBound::Preceding(value) => {
493            let zero = ScalarValue::new_zero(&value.data_type())?;
494            if value.eq(&zero) {
495                is_row_ahead(orderby_col, most_recent_ob_col, sort_options)
496            } else {
497                Ok(true)
498            }
499        }
500        WindowFrameBound::CurrentRow => {
501            is_row_ahead(orderby_col, most_recent_ob_col, sort_options)
502        }
503        WindowFrameBound::Following(delta) => {
504            let Some(most_recent_ob_col) = most_recent_ob_col else {
505                return Ok(false);
506            };
507            let most_recent_row_value =
508                ScalarValue::try_from_array(most_recent_ob_col, 0)?;
509            let current_row_value = ScalarValue::try_from_array(orderby_col, idx)?;
510
511            if sort_options.descending {
512                current_row_value
513                    .sub(delta)
514                    .map(|value| value > most_recent_row_value)
515            } else {
516                current_row_value
517                    .add(delta)
518                    .map(|value| most_recent_row_value > value)
519            }
520        }
521    }
522}
523
524/// For group-based window frames, determines whether the end bound calculation
525/// is safe by considering the group offset and whether the current row is ahead
526/// of the most recent row in terms of sorting. It checks if the end bound is
527/// within the bounds of the current group based on group end indices.
528///
529/// # Parameters
530///
531/// * `end_bound`: Reference to the window frame bound in question.
532/// * `state`: The state of the window frame for group calculations.
533/// * `orderby_col`: Reference to the column used for ordering.
534/// * `most_recent_ob_col`: Optional reference to the most recent order-by column.
535/// * `sort_options`: The sorting options used in the window frame.
536///
537/// # Returns
538///
539/// A `Result` indicating whether the end bound is safe for group-based window frames.
540fn is_end_bound_safe_for_groups(
541    end_bound: &WindowFrameBound,
542    state: &WindowFrameStateGroups,
543    orderby_col: &ArrayRef,
544    most_recent_ob_col: Option<&ArrayRef>,
545    sort_options: &SortOptions,
546) -> Result<bool> {
547    match end_bound {
548        WindowFrameBound::Preceding(value) => {
549            let zero = ScalarValue::new_zero(&value.data_type())?;
550            if value.eq(&zero) {
551                is_row_ahead(orderby_col, most_recent_ob_col, sort_options)
552            } else {
553                Ok(true)
554            }
555        }
556        WindowFrameBound::CurrentRow => {
557            is_row_ahead(orderby_col, most_recent_ob_col, sort_options)
558        }
559        WindowFrameBound::Following(ScalarValue::UInt64(Some(offset))) => {
560            let delta = state.group_end_indices.len() - state.current_group_idx;
561            if delta == (*offset as usize) + 1 {
562                is_row_ahead(orderby_col, most_recent_ob_col, sort_options)
563            } else {
564                Ok(false)
565            }
566        }
567        _ => Ok(false),
568    }
569}
570
571/// This utility function checks whether `current_cols` is ahead of the `old_cols`
572/// in terms of `sort_options`.
573fn is_row_ahead(
574    old_col: &ArrayRef,
575    current_col: Option<&ArrayRef>,
576    sort_options: &SortOptions,
577) -> Result<bool> {
578    let Some(current_col) = current_col else {
579        return Ok(false);
580    };
581    if old_col.is_empty() || current_col.is_empty() {
582        return Ok(false);
583    }
584    let last_value = ScalarValue::try_from_array(old_col, old_col.len() - 1)?;
585    let current_value = ScalarValue::try_from_array(current_col, 0)?;
586    let cmp = compare_rows(&[current_value], &[last_value], &[*sort_options])?;
587    Ok(cmp.is_gt())
588}
589
590/// Get order by expression results inside `order_by_columns`.
591pub(crate) fn get_orderby_values(order_by_columns: Vec<SortColumn>) -> Vec<ArrayRef> {
592    order_by_columns.into_iter().map(|s| s.values).collect()
593}
594
/// The evaluator that backs a window expression, as returned by
/// `WindowExpr::create_window_fn` and stored in [`WindowState`].
#[derive(Debug)]
pub enum WindowFn {
    /// Evaluator of a built-in window function (e.g., ROW_NUMBER, RANK).
    Builtin(Box<dyn PartitionEvaluator>),
    /// Accumulator of an aggregate window function (e.g., SUM, AVG).
    Aggregate(Box<dyn Accumulator>),
}
600
/// Key for IndexMap for each unique partition
///
/// For instance, if the window is defined as `OVER(PARTITION BY a,b)`,
/// PartitionKey would consist of unique `[a,b]` pairs
pub type PartitionKey = Vec<ScalarValue>;
606
/// State kept for one partition of one window expression during stateful
/// evaluation (see [`PartitionWindowAggStates`]).
#[derive(Debug)]
pub struct WindowState {
    /// Bookkeeping of the evaluation progress for the partition.
    pub state: WindowAggState,
    /// The evaluator or accumulator that computes the window function values.
    pub window_fn: WindowFn,
}
/// The IndexMap (i.e. an ordered HashMap) holding the [`WindowState`] of each partition.
pub type PartitionWindowAggStates = IndexMap<PartitionKey, WindowState>;
613
/// The IndexMap (i.e. an ordered HashMap) where record batches are separated for each partition.
///
/// Each [`PartitionKey`] maps to the [`PartitionBatchState`] of that partition.
pub type PartitionBatches = IndexMap<PartitionKey, PartitionBatchState>;
616
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use super::is_row_ahead;

    use arrow::array::{ArrayRef, Float64Array};
    use arrow::compute::SortOptions;
    use datafusion_common::Result;

    /// A new row is "ahead" only if it sorts strictly after the last old value.
    #[test]
    fn test_is_row_ahead() -> Result<()> {
        let ascending_nulls_last = SortOptions {
            descending: false,
            nulls_first: false,
        };
        let old_values: ArrayRef =
            Arc::new(Float64Array::from(vec![5.0, 7.0, 8.0, 9., 10.]));

        // Strictly greater than the last old value (10.0): ahead.
        let greater_than_last: ArrayRef = Arc::new(Float64Array::from(vec![11.0]));
        let is_ahead = is_row_ahead(
            &old_values,
            Some(&greater_than_last),
            &ascending_nulls_last,
        )?;
        assert!(is_ahead);

        // Equal to the last old value: not ahead.
        let equal_to_last: ArrayRef = Arc::new(Float64Array::from(vec![10.0]));
        let is_ahead =
            is_row_ahead(&old_values, Some(&equal_to_last), &ascending_nulls_last)?;
        assert!(!is_ahead);

        Ok(())
    }
}