Skip to main content

oxigdal_query/executor/
window.rs

1//! SQL:2003 window-function evaluation kernel.
2//!
3//! This module implements a self-contained window-evaluation engine over an
4//! already-materialized set of rows. It is intentionally decoupled from the SQL
5//! parser/planner: callers drive it directly by constructing a [`WindowFunction`]
6//! and a [`WindowSpec`], then calling [`evaluate_window`] (closure-driven) or
7//! [`evaluate_window_batch`] (over a [`RecordBatch`]).
8//!
9//! # Supported functions
10//!
11//! - `ROW_NUMBER()` — sequential 1-based ordinal within each partition.
12//! - `RANK()` — competition ranking: equal order keys share a rank and the next
13//!   distinct key skips ranks (`1, 1, 3, ...`).
14//! - `DENSE_RANK()` — like `RANK()` but without gaps (`1, 1, 2, ...`).
15//! - `LAG(expr[, offset[, default]])` — value of `expr` `offset` rows before the
16//!   current row inside the sorted partition; out-of-range yields the supplied
17//!   default or `NULL`.
18//! - `LEAD(expr[, offset[, default]])` — symmetric to `LAG` but forward.
19//! - `FIRST_VALUE(expr)` / `LAST_VALUE(expr)` — first / last value of `expr` in
20//!   the sorted partition.
21//! - `NTH_VALUE(expr, n)` — the `n`-th (1-based) value of `expr` in the sorted
22//!   partition, or `NULL` when the partition has fewer than `n` rows.
23//!
24//! # Row model
25//!
26//! The crate stores data column-by-column ([`RecordBatch`] / [`ColumnData`]),
27//! so there is no concrete "row" struct to borrow. The kernel therefore
28//! addresses values by `(row_index, column_index)` pairs and pulls them through
29//! a `value_at` closure. Column indices in [`WindowSpec`] and the `target_column`
30//! arguments are positions into that columnar model, exactly as in
31//! [`crate::executor::aggregate`] and [`crate::executor::sort`].
32//!
33//! # Value ordering
34//!
35//! Ordering follows the same rules as [`crate::executor::sort`]: real values
36//! compare naturally, NULLs sort *after* non-NULLs in ascending order, and a
37//! descending [`OrderKey`] simply reverses the per-key ordering. Numeric values
38//! of differing widths are compared after promotion (matching the coercion used
39//! by [`crate::executor::filter`]). The comparison never assumes the full set of
40//! [`Value`] variants, so newer variants degrade gracefully to "equal".
41
42use crate::error::{QueryError, Result};
43use crate::executor::filter::Value;
44use crate::executor::scan::{ColumnData, RecordBatch};
45use std::cmp::Ordering;
46use std::collections::HashMap;
47
48/// A window function to evaluate over partitioned, ordered rows.
49#[derive(Debug, Clone, PartialEq)]
50pub enum WindowFunction {
51    /// `ROW_NUMBER()` — 1-based sequential ordinal within the partition.
52    RowNumber,
53    /// `RANK()` — competition ranking with gaps after ties.
54    Rank,
55    /// `DENSE_RANK()` — ranking without gaps after ties.
56    DenseRank,
57    /// `LAG(expr[, offset[, default]])`.
58    Lag {
59        /// Number of rows to look backward (defaults to 1 in the constructor).
60        offset: usize,
61        /// Value to use when the lookup falls before the partition start.
62        default: Option<Value>,
63    },
64    /// `LEAD(expr[, offset[, default]])`.
65    Lead {
66        /// Number of rows to look forward (defaults to 1 in the constructor).
67        offset: usize,
68        /// Value to use when the lookup falls past the partition end.
69        default: Option<Value>,
70    },
71    /// `FIRST_VALUE(expr)` — first value of `expr` in the sorted partition.
72    FirstValue,
73    /// `LAST_VALUE(expr)` — last value of `expr` in the sorted partition.
74    LastValue,
75    /// `NTH_VALUE(expr, n)` — the `n`-th (1-based) value in the sorted partition.
76    NthValue {
77        /// 1-based position to fetch; `0` is treated as out of range.
78        n: usize,
79    },
80}
81
82impl WindowFunction {
83    /// `LAG(expr)` with the default offset of 1 and a `NULL` fallback.
84    pub fn lag() -> Self {
85        WindowFunction::Lag {
86            offset: 1,
87            default: None,
88        }
89    }
90
91    /// `LAG(expr, offset)` with a `NULL` fallback.
92    pub fn lag_offset(offset: usize) -> Self {
93        WindowFunction::Lag {
94            offset,
95            default: None,
96        }
97    }
98
99    /// `LAG(expr, offset, default)`.
100    pub fn lag_offset_default(offset: usize, default: Value) -> Self {
101        WindowFunction::Lag {
102            offset,
103            default: Some(default),
104        }
105    }
106
107    /// `LEAD(expr)` with the default offset of 1 and a `NULL` fallback.
108    pub fn lead() -> Self {
109        WindowFunction::Lead {
110            offset: 1,
111            default: None,
112        }
113    }
114
115    /// `LEAD(expr, offset)` with a `NULL` fallback.
116    pub fn lead_offset(offset: usize) -> Self {
117        WindowFunction::Lead {
118            offset,
119            default: None,
120        }
121    }
122
123    /// `LEAD(expr, offset, default)`.
124    pub fn lead_offset_default(offset: usize, default: Value) -> Self {
125        WindowFunction::Lead {
126            offset,
127            default: Some(default),
128        }
129    }
130
131    /// `NTH_VALUE(expr, n)`.
132    pub fn nth_value(n: usize) -> Self {
133        WindowFunction::NthValue { n }
134    }
135
136    /// Whether the function reads a target column (`LAG`/`LEAD`/`*_VALUE`).
137    ///
138    /// Ranking functions (`ROW_NUMBER`, `RANK`, `DENSE_RANK`) ignore the target
139    /// column entirely.
140    pub fn reads_target(&self) -> bool {
141        matches!(
142            self,
143            WindowFunction::Lag { .. }
144                | WindowFunction::Lead { .. }
145                | WindowFunction::FirstValue
146                | WindowFunction::LastValue
147                | WindowFunction::NthValue { .. }
148        )
149    }
150}
151
/// One sort key of an `OVER (... ORDER BY ...)` clause.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct OrderKey {
    /// Index of the column to sort on (a position in the columnar row model).
    pub column: usize,
    /// Sort direction: `true` means `ASC`, `false` means `DESC`.
    pub ascending: bool,
}

impl OrderKey {
    /// Ascending (`ASC`) key on `column`.
    pub fn asc(column: usize) -> Self {
        Self::directed(column, true)
    }

    /// Descending (`DESC`) key on `column`.
    pub fn desc(column: usize) -> Self {
        Self::directed(column, false)
    }

    /// Shared constructor behind [`OrderKey::asc`] and [`OrderKey::desc`].
    fn directed(column: usize, ascending: bool) -> Self {
        OrderKey { column, ascending }
    }
}
178
179/// The `OVER (PARTITION BY ... ORDER BY ...)` specification.
180#[derive(Debug, Clone, Default, PartialEq)]
181pub struct WindowSpec {
182    /// Columns used to partition rows (empty means a single partition).
183    pub partition_by: Vec<usize>,
184    /// Ordering keys applied within each partition.
185    pub order_by: Vec<OrderKey>,
186}
187
188impl WindowSpec {
189    /// Build a spec from partition and ordering keys.
190    pub fn new(partition_by: Vec<usize>, order_by: Vec<OrderKey>) -> Self {
191        Self {
192            partition_by,
193            order_by,
194        }
195    }
196
197    /// A spec with no partitioning and the given ordering keys.
198    pub fn ordered(order_by: Vec<OrderKey>) -> Self {
199        Self {
200            partition_by: Vec::new(),
201            order_by,
202        }
203    }
204}
205
206/// Compare two [`Value`]s using SQL ordering semantics.
207///
208/// This mirrors [`crate::executor::sort`]: non-NULL values compare naturally,
209/// NULLs sort *after* non-NULLs (so ascending order places NULLs last), and
210/// numeric values of differing widths are compared after promotion (matching the
211/// coercion in [`crate::executor::filter`]). The match deliberately ends with a
212/// catch-all arm so that [`Value`] variants this kernel does not model (for
213/// example geometry values on the integration branch) never cause a panic.
214fn compare_values(left: &Value, right: &Value) -> Ordering {
215    // NULL handling first, identical to the sort executor: a present value sorts
216    // before NULL, two NULLs are equal.
217    match (left, right) {
218        (Value::Null, Value::Null) => return Ordering::Equal,
219        (Value::Null, _) => return Ordering::Greater,
220        (_, Value::Null) => return Ordering::Less,
221        _ => {}
222    }
223
224    match (left, right) {
225        (Value::Boolean(a), Value::Boolean(b)) => a.cmp(b),
226        (Value::String(a), Value::String(b)) => a.cmp(b),
227
228        // Same-width integers.
229        (Value::Int32(a), Value::Int32(b)) => a.cmp(b),
230        (Value::Int64(a), Value::Int64(b)) => a.cmp(b),
231
232        // Mixed-width integers: promote to i64.
233        (Value::Int32(a), Value::Int64(b)) => (*a as i64).cmp(b),
234        (Value::Int64(a), Value::Int32(b)) => a.cmp(&(*b as i64)),
235
236        // Same-width floats.
237        (Value::Float32(a), Value::Float32(b)) => float_cmp(*a as f64, *b as f64),
238        (Value::Float64(a), Value::Float64(b)) => float_cmp(*a, *b),
239
240        // Mixed-width floats: promote to f64.
241        (Value::Float32(a), Value::Float64(b)) => float_cmp(*a as f64, *b),
242        (Value::Float64(a), Value::Float32(b)) => float_cmp(*a, *b as f64),
243
244        // Integer vs float: compare as f64 (matches filter coercion).
245        (Value::Int32(a), Value::Float32(b)) => float_cmp(*a as f64, *b as f64),
246        (Value::Int32(a), Value::Float64(b)) => float_cmp(*a as f64, *b),
247        (Value::Int64(a), Value::Float32(b)) => float_cmp(*a as f64, *b as f64),
248        (Value::Int64(a), Value::Float64(b)) => float_cmp(*a as f64, *b),
249        (Value::Float32(a), Value::Int32(b)) => float_cmp(*a as f64, *b as f64),
250        (Value::Float32(a), Value::Int64(b)) => float_cmp(*a as f64, *b as f64),
251        (Value::Float64(a), Value::Int32(b)) => float_cmp(*a, *b as f64),
252        (Value::Float64(a), Value::Int64(b)) => float_cmp(*a, *b as f64),
253
254        // Unknown / incomparable combinations (including variants this kernel
255        // does not model) are treated as equal so ordering stays total-ish and
256        // never panics.
257        _ => Ordering::Equal,
258    }
259}
260
/// Compare two `f64`s for sorting, with NaN placed after every number.
///
/// Ordinary values use `partial_cmp`. That only fails when at least one side
/// is NaN, in which case the "is NaN" flags are compared instead: two NaNs are
/// equal, and a NaN is greater than any non-NaN (`true > false`).
fn float_cmp(a: f64, b: f64) -> Ordering {
    if let Some(ordering) = a.partial_cmp(&b) {
        return ordering;
    }
    a.is_nan().cmp(&b.is_nan())
}
276
277/// Whether two order-key tuples are equal for ranking purposes.
278fn order_keys_equal(a: &[Value], b: &[Value]) -> bool {
279    a.len() == b.len()
280        && a.iter()
281            .zip(b)
282            .all(|(x, y)| compare_values(x, y) == Ordering::Equal)
283}
284
285/// A hashable, order-preserving partition key.
286///
287/// Partition equality must group identical key tuples while preserving the
288/// first-seen order of distinct keys. Rather than require [`Value`] to be
289/// `Hash`/`Eq` (it is neither — it carries floats), we derive a stable string
290/// fingerprint from each key value, in the same spirit as the join executor's
291/// `to_hash_key`. The catch-all arm keeps unmodeled variants from breaking.
292///
293/// The `#[allow(unreachable_patterns)]` is deliberate: the integration branch
294/// may carry additional [`Value`] variants (for example geometry values) that do
295/// not exist at this base, so the trailing `other` arm is required for
296/// forward-compatibility even though every current variant is matched explicitly.
297#[allow(unreachable_patterns)]
298fn partition_fingerprint(values: &[Value]) -> String {
299    let mut key = String::new();
300    for value in values {
301        match value {
302            Value::Null => key.push_str("N|"),
303            Value::Boolean(b) => {
304                key.push_str("b:");
305                key.push_str(if *b { "1" } else { "0" });
306                key.push('|');
307            }
308            Value::Int32(i) => {
309                key.push_str("i:");
310                key.push_str(&(*i as i64).to_string());
311                key.push('|');
312            }
313            Value::Int64(i) => {
314                key.push_str("i:");
315                key.push_str(&i.to_string());
316                key.push('|');
317            }
318            Value::Float32(fl) => {
319                key.push_str("f:");
320                key.push_str(&format!("{:?}", *fl as f64));
321                key.push('|');
322            }
323            Value::Float64(fl) => {
324                key.push_str("f:");
325                key.push_str(&format!("{:?}", fl));
326                key.push('|');
327            }
328            Value::String(s) => {
329                key.push_str("s:");
330                // Length-prefix to avoid delimiter collisions across values.
331                key.push_str(&s.len().to_string());
332                key.push(':');
333                key.push_str(s);
334                key.push('|');
335            }
336            // Unmodeled variants get a distinct, Debug-derived fingerprint so
337            // distinct values stay distinct without an exhaustive match.
338            other => {
339                key.push_str("x:");
340                key.push_str(&format!("{:?}", other));
341                key.push('|');
342            }
343        }
344    }
345    key
346}
347
348/// Evaluate a window function over `num_rows` rows.
349///
350/// `value_at(row_index, column_index)` must return the [`Value`] stored at that
351/// position; this is the closure-based analogue of indexing a concrete row. The
352/// `target_column` is the column referenced by value-returning functions
353/// (`LAG`, `LEAD`, `FIRST_VALUE`, `LAST_VALUE`, `NTH_VALUE`); it is ignored by
354/// the ranking functions but still validated so callers fail fast on bad input.
355///
356/// Returns exactly one [`Value`] per input row, in **input order**.
357pub fn evaluate_window(
358    func: &WindowFunction,
359    spec: &WindowSpec,
360    num_rows: usize,
361    target_column: usize,
362    value_at: impl Fn(usize, usize) -> Value,
363) -> Result<Vec<Value>> {
364    if num_rows == 0 {
365        return Ok(Vec::new());
366    }
367
368    // (1) Group rows into partitions, preserving the first-seen order of keys.
369    let partitions = build_partitions(spec, num_rows, &value_at);
370
371    // Output buffer, scattered back to input order at the end.
372    let mut output = vec![Value::Null; num_rows];
373
374    for partition in &partitions {
375        // (2) Stably sort the partition's row indices by the ORDER BY keys.
376        let sorted = sort_partition(spec, partition, &value_at);
377
378        // (3) Assign per-function values along the sorted order.
379        match func {
380            WindowFunction::RowNumber => {
381                assign_row_number(&sorted, &mut output);
382            }
383            WindowFunction::Rank => {
384                assign_rank(spec, &sorted, &value_at, &mut output, true);
385            }
386            WindowFunction::DenseRank => {
387                assign_rank(spec, &sorted, &value_at, &mut output, false);
388            }
389            WindowFunction::Lag { offset, default } => {
390                assign_lag_lead(
391                    &sorted,
392                    target_column,
393                    &value_at,
394                    default,
395                    *offset as isize,
396                    true,
397                    &mut output,
398                );
399            }
400            WindowFunction::Lead { offset, default } => {
401                assign_lag_lead(
402                    &sorted,
403                    target_column,
404                    &value_at,
405                    default,
406                    *offset as isize,
407                    false,
408                    &mut output,
409                );
410            }
411            WindowFunction::FirstValue => {
412                assign_nth_in_partition(&sorted, target_column, &value_at, Some(0), &mut output);
413            }
414            WindowFunction::LastValue => {
415                let last = sorted.len().checked_sub(1);
416                assign_nth_in_partition(&sorted, target_column, &value_at, last, &mut output);
417            }
418            WindowFunction::NthValue { n } => {
419                // 1-based; 0 (or out of range) yields NULL via checked_sub.
420                let index = n.checked_sub(1);
421                assign_nth_in_partition(&sorted, target_column, &value_at, index, &mut output);
422            }
423        }
424    }
425
426    Ok(output)
427}
428
429/// Evaluate a window function directly over a [`RecordBatch`].
430///
431/// This is a convenience wrapper around [`evaluate_window`] that extracts values
432/// from columns exactly the way [`crate::executor::filter`] does. Column indices
433/// in `spec` and `target_column` are positions into `batch.columns`.
434pub fn evaluate_window_batch(
435    func: &WindowFunction,
436    spec: &WindowSpec,
437    target_column: usize,
438    batch: &RecordBatch,
439) -> Result<Vec<Value>> {
440    let max_column = spec
441        .partition_by
442        .iter()
443        .chain(spec.order_by.iter().map(|key| &key.column))
444        .copied()
445        .max();
446
447    if let Some(max_column) = max_column {
448        if max_column >= batch.columns.len() {
449            return Err(QueryError::execution(format!(
450                "Window column index {} out of bounds (batch has {} columns)",
451                max_column,
452                batch.columns.len()
453            )));
454        }
455    }
456
457    if func.reads_target() && target_column >= batch.columns.len() {
458        return Err(QueryError::execution(format!(
459            "Window target column index {} out of bounds (batch has {} columns)",
460            target_column,
461            batch.columns.len()
462        )));
463    }
464
465    evaluate_window(func, spec, batch.num_rows, target_column, |row, column| {
466        column_value(&batch.columns[column], row)
467    })
468}
469
470/// Extract a [`Value`] from a column at `row_idx`, mirroring the filter executor.
471///
472/// Binary columns have no scalar [`Value`] representation, so they surface as
473/// `Value::Null` here (window ordering/partitioning over binary is not defined).
474fn column_value(column: &ColumnData, row_idx: usize) -> Value {
475    match column {
476        ColumnData::Boolean(data) => data
477            .get(row_idx)
478            .and_then(|v| v.as_ref())
479            .map(|&v| Value::Boolean(v))
480            .unwrap_or(Value::Null),
481        ColumnData::Int32(data) => data
482            .get(row_idx)
483            .and_then(|v| v.as_ref())
484            .map(|&v| Value::Int32(v))
485            .unwrap_or(Value::Null),
486        ColumnData::Int64(data) => data
487            .get(row_idx)
488            .and_then(|v| v.as_ref())
489            .map(|&v| Value::Int64(v))
490            .unwrap_or(Value::Null),
491        ColumnData::Float32(data) => data
492            .get(row_idx)
493            .and_then(|v| v.as_ref())
494            .map(|&v| Value::Float32(v))
495            .unwrap_or(Value::Null),
496        ColumnData::Float64(data) => data
497            .get(row_idx)
498            .and_then(|v| v.as_ref())
499            .map(|&v| Value::Float64(v))
500            .unwrap_or(Value::Null),
501        ColumnData::String(data) => data
502            .get(row_idx)
503            .and_then(|v| v.as_ref())
504            .map(|v| Value::String(v.clone()))
505            .unwrap_or(Value::Null),
506        ColumnData::Binary(_) => Value::Null,
507    }
508}
509
510/// Group rows into partitions, preserving the first-seen order of distinct keys.
511fn build_partitions(
512    spec: &WindowSpec,
513    num_rows: usize,
514    value_at: &impl Fn(usize, usize) -> Value,
515) -> Vec<Vec<usize>> {
516    if spec.partition_by.is_empty() {
517        // A single partition containing every row in input order.
518        return vec![(0..num_rows).collect()];
519    }
520
521    let mut order: Vec<String> = Vec::new();
522    let mut groups: HashMap<String, Vec<usize>> = HashMap::new();
523
524    for row in 0..num_rows {
525        let key_values: Vec<Value> = spec
526            .partition_by
527            .iter()
528            .map(|&column| value_at(row, column))
529            .collect();
530        let fingerprint = partition_fingerprint(&key_values);
531
532        match groups.get_mut(&fingerprint) {
533            Some(bucket) => bucket.push(row),
534            None => {
535                order.push(fingerprint.clone());
536                groups.insert(fingerprint, vec![row]);
537            }
538        }
539    }
540
541    order
542        .into_iter()
543        .filter_map(|fingerprint| groups.remove(&fingerprint))
544        .collect()
545}
546
547/// Stably sort a partition's row indices according to the `ORDER BY` keys.
548///
549/// Stability matters: rows with equal keys (or no `ORDER BY` at all) keep their
550/// relative input order, just as the sort executor relies on `sort_by`.
551fn sort_partition(
552    spec: &WindowSpec,
553    partition: &[usize],
554    value_at: &impl Fn(usize, usize) -> Value,
555) -> Vec<usize> {
556    let mut sorted = partition.to_vec();
557    if spec.order_by.is_empty() {
558        return sorted;
559    }
560
561    sorted.sort_by(|&a, &b| {
562        for key in &spec.order_by {
563            let left = value_at(a, key.column);
564            let right = value_at(b, key.column);
565            let mut ordering = compare_values(&left, &right);
566            if !key.ascending {
567                ordering = ordering.reverse();
568            }
569            if ordering != Ordering::Equal {
570                return ordering;
571            }
572        }
573        Ordering::Equal
574    });
575
576    sorted
577}
578
579/// Materialize the order-key tuple for `row`.
580fn order_key_values(
581    spec: &WindowSpec,
582    row: usize,
583    value_at: &impl Fn(usize, usize) -> Value,
584) -> Vec<Value> {
585    spec.order_by
586        .iter()
587        .map(|key| value_at(row, key.column))
588        .collect()
589}
590
591/// `ROW_NUMBER()`: assign `1, 2, 3, ...` along the sorted order.
592fn assign_row_number(sorted: &[usize], output: &mut [Value]) {
593    for (position, &row) in sorted.iter().enumerate() {
594        output[row] = Value::Int64((position + 1) as i64);
595    }
596}
597
598/// `RANK()` / `DENSE_RANK()` over a sorted partition.
599///
600/// With `competition == true` this is `RANK()` (gaps after ties); with
601/// `competition == false` it is `DENSE_RANK()` (no gaps). Ties are decided by
602/// comparing the full `ORDER BY` key tuple. With no `ORDER BY`, every row is a
603/// tie and shares rank `1`.
604fn assign_rank(
605    spec: &WindowSpec,
606    sorted: &[usize],
607    value_at: &impl Fn(usize, usize) -> Value,
608    output: &mut [Value],
609    competition: bool,
610) {
611    let mut current_rank: i64 = 0;
612    let mut previous_key: Option<Vec<Value>> = None;
613
614    for (position, &row) in sorted.iter().enumerate() {
615        // 1-based ordinal of this row in the sorted partition; for RANK() this is
616        // the rank assigned when a new (distinct) order-key group begins.
617        let ordinal = (position + 1) as i64;
618        let key = order_key_values(spec, row, value_at);
619
620        let is_new_group = match &previous_key {
621            None => true,
622            Some(prev) => !order_keys_equal(prev, &key),
623        };
624
625        if is_new_group {
626            current_rank = if competition {
627                ordinal
628            } else {
629                current_rank + 1
630            };
631            previous_key = Some(key);
632        }
633
634        output[row] = Value::Int64(current_rank);
635    }
636}
637
638/// `LAG` / `LEAD`: read `target_column` at a signed offset within the partition.
639///
640/// `backward == true` implements `LAG` (look at `position - offset`); otherwise
641/// `LEAD` (look at `position + offset`). Out-of-range lookups yield the supplied
642/// `default` if present, else `Value::Null`.
643fn assign_lag_lead(
644    sorted: &[usize],
645    target_column: usize,
646    value_at: &impl Fn(usize, usize) -> Value,
647    default: &Option<Value>,
648    offset: isize,
649    backward: bool,
650    output: &mut [Value],
651) {
652    let len = sorted.len() as isize;
653    let signed_offset = if backward { -offset } else { offset };
654
655    for (position, &row) in sorted.iter().enumerate() {
656        let target_index = position as isize + signed_offset;
657        let value = if target_index >= 0 && target_index < len {
658            let source_row = sorted[target_index as usize];
659            value_at(source_row, target_column)
660        } else {
661            default.clone().unwrap_or(Value::Null)
662        };
663        output[row] = value;
664    }
665}
666
667/// `FIRST_VALUE` / `LAST_VALUE` / `NTH_VALUE`: broadcast one positional value to
668/// every row of the partition.
669///
670/// `index` is the 0-based position within the sorted partition (already adjusted
671/// from the 1-based SQL position by the caller). `None`, or an index past the
672/// end, broadcasts `Value::Null`.
673fn assign_nth_in_partition(
674    sorted: &[usize],
675    target_column: usize,
676    value_at: &impl Fn(usize, usize) -> Value,
677    index: Option<usize>,
678    output: &mut [Value],
679) {
680    let picked = match index {
681        Some(idx) => sorted
682            .get(idx)
683            .map(|&source_row| value_at(source_row, target_column))
684            .unwrap_or(Value::Null),
685        None => Value::Null,
686    };
687
688    for &row in sorted {
689        output[row] = picked.clone();
690    }
691}
692
#[cfg(test)]
mod tests {
    use super::*;

    /// Wrap column-major test data as `(row_count, accessor)`.
    ///
    /// The row count is the length of the first column (0 when there are no
    /// columns); the accessor clones the cell at `(row, col)`.
    fn table(columns: Vec<Vec<Value>>) -> (usize, impl Fn(usize, usize) -> Value) {
        let num_rows = columns.first().map_or(0, Vec::len);
        let accessor = move |row: usize, col: usize| columns[col][row].clone();
        (num_rows, accessor)
    }

    #[test]
    fn unit_row_number_basic() -> Result<()> {
        // One partition, ordered ascending by the only column.
        let ordering_column = vec![Value::Int64(30), Value::Int64(10), Value::Int64(20)];
        let (rows, value_at) = table(vec![ordering_column]);
        let spec = WindowSpec::ordered(vec![OrderKey::asc(0)]);

        let out = evaluate_window(&WindowFunction::RowNumber, &spec, rows, 0, value_at)?;

        // Sorted order is 10 (row 1), 20 (row 2), 30 (row 0); the ordinals are
        // reported back at each row's input position.
        let expected = vec![Value::Int64(3), Value::Int64(1), Value::Int64(2)];
        assert_eq!(out, expected);
        Ok(())
    }

    #[test]
    fn unit_compare_nulls_last() {
        let cases = [
            (Value::Int64(1), Value::Null, Ordering::Less),
            (Value::Null, Value::Int64(1), Ordering::Greater),
            (Value::Null, Value::Null, Ordering::Equal),
        ];
        for (lhs, rhs, expected) in &cases {
            assert_eq!(compare_values(lhs, rhs), *expected);
        }
    }

    #[test]
    fn unit_compare_mixed_numeric() {
        let cases = [
            (Value::Int32(5), Value::Float64(5.5), Ordering::Less),
            (Value::Float64(2.0), Value::Int64(2), Ordering::Equal),
        ];
        for (lhs, rhs, expected) in &cases {
            assert_eq!(compare_values(lhs, rhs), *expected);
        }
    }
}