Skip to main content

simd_kernels/kernels/
window.rs

1// Copyright (c) 2025 SpaceCell Enterprises Ltd
2// SPDX-License-Identifier: AGPL-3.0-or-later
3// Commercial licensing available. See LICENSE and LICENSING.md.
4
5//! # **Window Functions Kernels Module** - *High-Performance Analytical Window Operations*
6//!
7//! Advanced window function kernels for sliding window computations,
8//! ranking operations, and positional analytics with SIMD acceleration and null-aware semantics.
9//! Backbone of time series analysis, analytical SQL window functions, and chunked streaming computations.
10//!
11//! ## Core Operations
12//! - **Moving averages**: Rolling mean calculations with configurable window sizes
13//! - **Cumulative functions**: Running sums, products, and statistical aggregations  
14//! - **Ranking functions**: ROW_NUMBER, RANK, DENSE_RANK with tie-handling strategies
15//! - **Lead/lag operations**: Positional value access with configurable offsets
16//! - **Percentile functions**: Moving quantile calculations with interpolation support
17//! - **Window aggregates**: MIN, MAX, SUM operations over sliding windows
18
19include!(concat!(env!("OUT_DIR"), "/simd_lanes.rs"));
20
21use std::marker::PhantomData;
22
23use minarrow::{
24    Bitmask, BooleanAVT, BooleanArray, FloatArray, Integer, IntegerArray, Length, MaskedArray,
25    Offset, StringArray, Vec64,
26    aliases::{FloatAVT, IntegerAVT},
27    enums::error::KernelError,
28    vec64,
29};
30use num_traits::{Float, Num, NumCast, One, Zero};
31
32use minarrow::StringAVT;
33use minarrow::utils::confirm_mask_capacity;
34
35// Helpers
36#[inline(always)]
37fn new_null_mask(len: usize) -> Bitmask {
38    Bitmask::new_set_all(len, false)
39}
40
41#[inline(always)]
42fn prealloc_vec<T: Copy>(len: usize) -> Vec64<T> {
43    let mut v = Vec64::<T>::with_capacity(len);
44    unsafe { v.set_len(len) };
45    v
46}
47
48// Rolling kernels (sum, product, min, max, mean, count)
49
50/// Zero-allocation variant: writes directly to caller's output buffers.
51///
52/// Generic sliding window aggregator for kernels that allow an
53/// incremental push and pop update (sum, product, etc.).
54/// Always emits the running aggregate, even when the subwindow has nulls.
55/// Only flags "valid" once the full subwindow has been seen.
56///
57/// Panics if `out.len() != data.len()` or `out_mask.capacity() < data.len()`.
58#[inline(always)]
59fn rolling_push_pop_to<T, FAdd, FRem>(
60    data: &[T],
61    mask: Option<&Bitmask>,
62    subwindow: usize,
63    mut add: FAdd,
64    mut remove: FRem,
65    zero: T,
66    out: &mut [T],
67    out_mask: &mut Bitmask,
68) where
69    T: Copy,
70    FAdd: FnMut(T, T) -> T,
71    FRem: FnMut(T, T) -> T,
72{
73    let n = data.len();
74    assert_eq!(
75        n,
76        out.len(),
77        "rolling_push_pop_to: input/output length mismatch"
78    );
79
80    if subwindow == 0 {
81        for slot in out.iter_mut() {
82            *slot = zero;
83        }
84        return;
85    }
86
87    let mut agg = zero;
88    let mut invalids = 0usize;
89    for i in 0..n {
90        if mask.map_or(true, |m| unsafe { m.get_unchecked(i) }) {
91            agg = add(agg, data[i]);
92        } else {
93            invalids += 1;
94        }
95        if i + 1 > subwindow {
96            let j = i + 1 - subwindow - 1;
97            if mask.map_or(true, |m| unsafe { m.get_unchecked(j) }) {
98                agg = remove(agg, data[j]);
99            } else {
100                invalids -= 1;
101            }
102        }
103        if i + 1 < subwindow {
104            unsafe { out_mask.set_unchecked(i, false) };
105            out[i] = zero;
106        } else {
107            let ok = invalids == 0;
108            unsafe { out_mask.set_unchecked(i, ok) };
109            out[i] = agg;
110        }
111    }
112}
113
114/// Allocating variant: creates new output buffers internally.
115#[inline(always)]
116pub fn rolling_push_pop<T, FAdd, FRem>(
117    data: &[T],
118    mask: Option<&Bitmask>,
119    subwindow: usize,
120    add: FAdd,
121    remove: FRem,
122    zero: T,
123) -> (Vec64<T>, Bitmask)
124where
125    T: Copy,
126    FAdd: FnMut(T, T) -> T,
127    FRem: FnMut(T, T) -> T,
128{
129    let n = data.len();
130    let mut out = prealloc_vec::<T>(n);
131    let mut out_mask = new_null_mask(n);
132    rolling_push_pop_to(
133        data,
134        mask,
135        subwindow,
136        add,
137        remove,
138        zero,
139        &mut out,
140        &mut out_mask,
141    );
142    (out, out_mask)
143}
144
145/// Zero-allocation variant: writes directly to caller's output buffers.
146///
147/// Generic rolling extreme aggregator (min/max) for a subwindow over a slice.
148///
149/// Panics if `out.len() != data.len()`.
150#[inline(always)]
151pub fn rolling_extreme_to<T, F>(
152    data: &[T],
153    mask: Option<&Bitmask>,
154    subwindow: usize,
155    mut better: F,
156    zero: T,
157    out: &mut [T],
158    out_mask: &mut Bitmask,
159) where
160    T: Copy,
161    F: FnMut(&T, &T) -> bool,
162{
163    let n = data.len();
164    assert_eq!(
165        n,
166        out.len(),
167        "rolling_extreme_to: input/output length mismatch"
168    );
169
170    if subwindow == 0 {
171        return;
172    }
173
174    for i in 0..n {
175        if i + 1 < subwindow {
176            unsafe { out_mask.set_unchecked(i, false) };
177            out[i] = zero;
178            continue;
179        }
180        let start = i + 1 - subwindow;
181        let mut found = false;
182        let mut extreme = zero;
183        for j in start..=i {
184            if mask.map_or(true, |m| unsafe { m.get_unchecked(j) }) {
185                if !found {
186                    extreme = data[j];
187                    found = true;
188                } else if better(&data[j], &extreme) {
189                    extreme = data[j];
190                }
191            } else {
192                found = false;
193                break;
194            }
195        }
196        unsafe { out_mask.set_unchecked(i, found) };
197        out[i] = if found { extreme } else { zero };
198    }
199}
200
201/// Allocating variant: creates new output buffers internally.
202#[inline(always)]
203pub fn rolling_extreme<T, F>(
204    data: &[T],
205    mask: Option<&Bitmask>,
206    subwindow: usize,
207    better: F,
208    zero: T,
209) -> (Vec64<T>, Bitmask)
210where
211    T: Copy,
212    F: FnMut(&T, &T) -> bool,
213{
214    let n = data.len();
215    let mut out = prealloc_vec::<T>(n);
216    let mut out_mask = new_null_mask(n);
217    rolling_extreme_to(data, mask, subwindow, better, zero, &mut out, &mut out_mask);
218    (out, out_mask)
219}
220
221/// Zero-allocation variant: writes directly to caller's output buffers.
222///
223/// Computes rolling sums over a sliding window for integer data with null-aware semantics.
224/// Panics if `out.len() != data.len()`.
225#[inline]
226pub fn rolling_sum_int_to<T: Num + Copy + Zero>(
227    data: &[T],
228    mask: Option<&Bitmask>,
229    subwindow: usize,
230    out: &mut [T],
231    out_mask: &mut Bitmask,
232) {
233    rolling_push_pop_to(
234        data,
235        mask,
236        subwindow,
237        |a, b| a + b,
238        |a, b| a - b,
239        T::zero(),
240        out,
241        out_mask,
242    );
243    if mask.is_some() && subwindow > 0 && subwindow - 1 < out.len() {
244        unsafe { out_mask.set_unchecked(subwindow - 1, false) };
245        out[subwindow - 1] = T::zero();
246    }
247}
248
249/// Computes rolling sums over a sliding window for integer data with null-aware semantics.
250///
251/// Applies a sliding window of configurable size to compute cumulative sums, employing
252/// incremental computation to avoid O(n²) complexity through efficient push-pop operations.
253/// Each position in the output represents the sum of values within the preceding window.
254///
255/// ## Parameters
256/// * `window` - Integer array view containing the data, offset, and length information
257/// * `subwindow` - Size of the sliding window (number of elements to sum)
258///
259/// ## Returns
260/// Returns an `IntegerArray<T>` containing:
261/// - Rolling sums for each position where a complete window exists
262/// - Zero values for positions before the window is complete
263/// - Null mask indicating validity (false for incomplete windows or null-contaminated windows)
264#[inline]
265pub fn rolling_sum_int<T: Num + Copy + Zero>(
266    window: IntegerAVT<'_, T>,
267    subwindow: usize,
268) -> IntegerArray<T> {
269    let (arr, offset, len) = window;
270    let data = &arr.data[offset..offset + len];
271    let mask = arr.null_mask.as_ref().map(|m| m.slice_clone(offset, len));
272    let mut out = prealloc_vec::<T>(len);
273    let mut out_mask = new_null_mask(len);
274    rolling_sum_int_to(data, mask.as_ref(), subwindow, &mut out, &mut out_mask);
275    IntegerArray {
276        data: out.into(),
277        null_mask: Some(out_mask),
278    }
279}
280
281/// Zero-allocation variant: writes directly to caller's output buffers.
282///
283/// Computes rolling sums over a sliding window for floating-point data.
284/// Panics if `out.len() != data.len()`.
285#[inline]
286pub fn rolling_sum_float_to<T: Float + Copy + Zero>(
287    data: &[T],
288    mask: Option<&Bitmask>,
289    subwindow: usize,
290    out: &mut [T],
291    out_mask: &mut Bitmask,
292) {
293    rolling_push_pop_to(
294        data,
295        mask,
296        subwindow,
297        |a, b| a + b,
298        |a, b| a - b,
299        T::zero(),
300        out,
301        out_mask,
302    );
303    if subwindow > 0 && subwindow - 1 < out.len() {
304        out_mask.set(subwindow - 1, false);
305        out[subwindow - 1] = T::zero();
306    }
307}
308
309/// Computes rolling sums over a sliding window for floating-point data with IEEE 754 compliance.
310///
311/// Applies incremental computation to calculate cumulative sums across sliding windows,
312/// maintaining numerical stability through careful accumulation strategies. Handles
313/// special floating-point values (infinity, NaN) according to IEEE 754 semantics.
314///
315/// ## Parameters
316/// * `window` - Float array view containing the data, offset, and length information
317/// * `subwindow` - Size of the sliding window for summation
318///
319/// ## Returns
320/// Returns a `FloatArray<T>` containing:
321/// - Rolling sums computed incrementally for efficiency
322/// - Zero values for positions with incomplete windows
323/// - Proper null mask for window validity tracking
324#[inline]
325pub fn rolling_sum_float<T: Float + Copy + Zero>(
326    window: FloatAVT<'_, T>,
327    subwindow: usize,
328) -> FloatArray<T> {
329    let (arr, offset, len) = window;
330    let data = &arr.data[offset..offset + len];
331    let mask = arr.null_mask.as_ref().map(|m| m.slice_clone(offset, len));
332    let mut out = prealloc_vec::<T>(len);
333    let mut out_mask = new_null_mask(len);
334    rolling_sum_float_to(data, mask.as_ref(), subwindow, &mut out, &mut out_mask);
335    FloatArray {
336        data: out.into(),
337        null_mask: Some(out_mask),
338    }
339}
340
341/// Zero-allocation variant: writes directly to caller's output buffers.
342///
343/// Computes rolling sums over i32 data (pre-converted from booleans).
344/// Panics if `out.len() != data.len()`.
345#[inline]
346pub fn rolling_sum_bool_to(
347    data: &[i32],
348    mask: Option<&Bitmask>,
349    subwindow: usize,
350    out: &mut [i32],
351    out_mask: &mut Bitmask,
352) {
353    rolling_push_pop_to(
354        data,
355        mask,
356        subwindow,
357        |a, b| a + b,
358        |a, b| a - b,
359        0,
360        out,
361        out_mask,
362    );
363    if subwindow > 0 && subwindow - 1 < out.len() {
364        out_mask.set(subwindow - 1, false);
365        out[subwindow - 1] = 0;
366    }
367}
368
369/// Computes rolling sums over boolean data, counting true values within sliding windows.
370///
371/// Treats boolean values as integers (true=1, false=0) and applies sliding window
372/// summation to count true occurrences within each window position. Essential for
373/// constructing conditional aggregations and boolean pattern analysis.
374///
375/// ## Parameters
376/// * `window` - Boolean array view with offset and length specifications
377/// * `subwindow` - Number of boolean values to consider in each sliding window
378///
379/// ## Returns
380/// Returns an `IntegerArray<i32>` containing:
381/// - Count of true values within each complete window
382/// - Zero for positions with incomplete windows
383/// - Null mask indicating window completeness and null contamination
384#[inline]
385pub fn rolling_sum_bool(window: BooleanAVT<'_, ()>, subwindow: usize) -> IntegerArray<i32> {
386    let (arr, offset, len) = window;
387    let bools: Vec<i32> = arr.iter_range(offset, len).map(|b| b as i32).collect();
388    let mask = arr.null_mask.as_ref().map(|m| m.slice_clone(offset, len));
389    let mut out = prealloc_vec::<i32>(len);
390    let mut out_mask = new_null_mask(len);
391    rolling_sum_bool_to(&bools, mask.as_ref(), subwindow, &mut out, &mut out_mask);
392    IntegerArray {
393        data: out.into(),
394        null_mask: Some(out_mask),
395    }
396}
397
398/// Zero-allocation variant: writes directly to caller's output buffers.
399#[inline]
400pub fn rolling_product_int_to<T: Num + Copy + One + Zero>(
401    data: &[T],
402    mask: Option<&Bitmask>,
403    subwindow: usize,
404    out: &mut [T],
405    out_mask: &mut Bitmask,
406) {
407    rolling_push_pop_to(
408        data,
409        mask,
410        subwindow,
411        |a, b| a * b,
412        |a, b| a / b,
413        T::one(),
414        out,
415        out_mask,
416    );
417}
418
419/// Computes rolling products over a sliding window for integer data with overflow protection.
420///
421/// Applies multiplicative aggregation across sliding windows using incremental computation
422/// through division operations. Maintains numerical stability through careful handling of
423/// zero values and potential overflow conditions in integer arithmetic.
424#[inline]
425pub fn rolling_product_int<T: Num + Copy + One + Zero>(
426    window: IntegerAVT<'_, T>,
427    subwindow: usize,
428) -> IntegerArray<T> {
429    let (arr, offset, len) = window;
430    let data = &arr.data[offset..offset + len];
431    let mask = arr.null_mask.as_ref().map(|m| m.slice_clone(offset, len));
432    let mut out = prealloc_vec::<T>(len);
433    let mut out_mask = new_null_mask(len);
434    rolling_product_int_to(data, mask.as_ref(), subwindow, &mut out, &mut out_mask);
435    IntegerArray {
436        data: out.into(),
437        null_mask: Some(out_mask),
438    }
439}
440
441/// Zero-allocation variant: writes directly to caller's output buffers.
442#[inline]
443pub fn rolling_product_float_to<T: Float + Copy + One + Zero>(
444    data: &[T],
445    mask: Option<&Bitmask>,
446    subwindow: usize,
447    out: &mut [T],
448    out_mask: &mut Bitmask,
449) {
450    rolling_push_pop_to(
451        data,
452        mask,
453        subwindow,
454        |a, b| a * b,
455        |a, b| a / b,
456        T::one(),
457        out,
458        out_mask,
459    );
460}
461
462/// Computes rolling products over floating-point data with IEEE 754 mathematical semantics.
463///
464/// Performs multiplicative aggregation using incremental computation strategies that
465/// maintain numerical precision through careful handling of special values (infinity,
466/// NaN, zero) according to IEEE 754 standards.
467#[inline]
468pub fn rolling_product_float<T: Float + Copy + One + Zero>(
469    window: FloatAVT<'_, T>,
470    subwindow: usize,
471) -> FloatArray<T> {
472    let (arr, offset, len) = window;
473    let data = &arr.data[offset..offset + len];
474    let mask = arr.null_mask.as_ref().map(|m| m.slice_clone(offset, len));
475    let mut out = prealloc_vec::<T>(len);
476    let mut out_mask = new_null_mask(len);
477    rolling_product_float_to(data, mask.as_ref(), subwindow, &mut out, &mut out_mask);
478    FloatArray {
479        data: out.into(),
480        null_mask: Some(out_mask),
481    }
482}
483
484/// Zero-allocation variant: writes directly to caller's output buffers.
485///
486/// Computes rolling logical AND over boolean data (pre-converted to i32: 1=true, 0=false).
487#[inline]
488pub fn rolling_product_bool_to(
489    data: &[i32],
490    mask: Option<&Bitmask>,
491    subwindow: usize,
492    out: &mut Bitmask,
493    out_mask: &mut Bitmask,
494) {
495    let n = data.len();
496    for i in 0..n {
497        let start = if i + 1 >= subwindow {
498            i + 1 - subwindow
499        } else {
500            0
501        };
502        let mut acc = true;
503        let mut valid = subwindow > 0 && i + 1 >= subwindow;
504        for j in start..=i {
505            let is_valid = mask.map_or(true, |m| unsafe { m.get_unchecked(j) });
506            if is_valid {
507                acc &= data[j] != 0;
508            } else {
509                valid = false;
510                break;
511            }
512        }
513        unsafe { out_mask.set_unchecked(i, valid) };
514        out.set(i, valid && acc);
515    }
516}
517
518/// Computes rolling logical AND operations over boolean data within sliding windows.
519///
520/// Treats boolean multiplication as logical AND operations, computing the conjunction
521/// of all boolean values within each sliding window. Essential for constructing
522/// compound logical conditions and boolean pattern validation.
523#[inline]
524pub fn rolling_product_bool(window: BooleanAVT<'_, ()>, subwindow: usize) -> BooleanArray<()> {
525    let (arr, offset, len) = window;
526    let n = len;
527    let mut out_mask = new_null_mask(n);
528    let mut out = Bitmask::new_set_all(n, false);
529
530    for i in 0..n {
531        let start = if i + 1 >= subwindow {
532            i + 1 - subwindow
533        } else {
534            0
535        };
536        let mut acc = true;
537        let mut valid = subwindow > 0 && i + 1 >= subwindow;
538        for j in start..=i {
539            match unsafe { arr.get_unchecked(offset + j) } {
540                Some(val) => acc &= val,
541                None => {
542                    valid = false;
543                    break;
544                }
545            }
546        }
547        unsafe { out_mask.set_unchecked(i, valid) };
548        out.set(i, valid && acc);
549    }
550
551    BooleanArray {
552        data: out.into(),
553        null_mask: Some(out_mask),
554        len: n,
555        _phantom: PhantomData,
556    }
557}
558
559/// Zero-allocation variant: writes directly to caller's output buffers.
560#[inline]
561pub fn rolling_mean_int_to<T: NumCast + Copy + Zero>(
562    data: &[T],
563    mask: Option<&Bitmask>,
564    subwindow: usize,
565    out: &mut [f64],
566    out_mask: &mut Bitmask,
567) {
568    let n = data.len();
569    if subwindow == 0 {
570        return;
571    }
572    for i in 0..n {
573        if i + 1 < subwindow {
574            unsafe { out_mask.set_unchecked(i, false) };
575            out[i] = 0.0;
576            continue;
577        }
578        let start = i + 1 - subwindow;
579        let mut sum = 0.0;
580        let mut valid = true;
581        for j in start..=i {
582            if mask.map_or(true, |m| unsafe { m.get_unchecked(j) }) {
583                sum += num_traits::cast(data[j]).unwrap_or(0.0);
584            } else {
585                valid = false;
586                break;
587            }
588        }
589        unsafe { out_mask.set_unchecked(i, valid) };
590        out[i] = if valid { sum / subwindow as f64 } else { 0.0 };
591    }
592    if subwindow > 0 && subwindow - 1 < out.len() {
593        unsafe { out_mask.set_unchecked(subwindow - 1, false) };
594        out[subwindow - 1] = 0.0;
595    }
596}
597
598/// Computes rolling arithmetic means over integer data with high-precision floating-point output.
599#[inline]
600pub fn rolling_mean_int<T: NumCast + Copy + Zero>(
601    window: IntegerAVT<'_, T>,
602    subwindow: usize,
603) -> FloatArray<f64> {
604    let (arr, offset, len) = window;
605    let data = &arr.data[offset..offset + len];
606    let mask = arr.null_mask.as_ref().map(|m| m.slice_clone(offset, len));
607    let mut out = prealloc_vec::<f64>(len);
608    let mut out_mask = new_null_mask(len);
609    rolling_mean_int_to(data, mask.as_ref(), subwindow, &mut out, &mut out_mask);
610    FloatArray {
611        data: out.into(),
612        null_mask: Some(out_mask),
613    }
614}
615
616/// Zero-allocation variant: writes directly to caller's output buffers.
617#[inline]
618pub fn rolling_mean_float_to<T: Float + Copy + Zero>(
619    data: &[T],
620    mask: Option<&Bitmask>,
621    subwindow: usize,
622    out: &mut [T],
623    out_mask: &mut Bitmask,
624) {
625    let n = data.len();
626    if subwindow == 0 {
627        return;
628    }
629    for i in 0..n {
630        if i + 1 < subwindow {
631            unsafe { out_mask.set_unchecked(i, false) };
632            out[i] = T::zero();
633            continue;
634        }
635        let start = i + 1 - subwindow;
636        let mut sum = T::zero();
637        let mut valid = true;
638        for j in start..=i {
639            if mask.map_or(true, |m| unsafe { m.get_unchecked(j) }) {
640                sum = sum + data[j];
641            } else {
642                valid = false;
643                break;
644            }
645        }
646        unsafe { out_mask.set_unchecked(i, valid) };
647        out[i] = if valid {
648            sum / T::from(subwindow as u32).unwrap()
649        } else {
650            T::zero()
651        };
652    }
653    if subwindow > 0 && subwindow - 1 < out.len() {
654        unsafe { out_mask.set_unchecked(subwindow - 1, false) };
655        out[subwindow - 1] = T::zero();
656    }
657}
658
659/// Computes rolling arithmetic means over floating-point data with IEEE 754 compliance.
660#[inline]
661pub fn rolling_mean_float<T: Float + Copy + Zero>(
662    window: FloatAVT<'_, T>,
663    subwindow: usize,
664) -> FloatArray<T> {
665    let (arr, offset, len) = window;
666    let data = &arr.data[offset..offset + len];
667    let mask = arr.null_mask.as_ref().map(|m| m.slice_clone(offset, len));
668    let mut out = prealloc_vec::<T>(len);
669    let mut out_mask = new_null_mask(len);
670    rolling_mean_float_to(data, mask.as_ref(), subwindow, &mut out, &mut out_mask);
671    FloatArray {
672        data: out.into(),
673        null_mask: Some(out_mask),
674    }
675}
676
677/// Zero-allocation variant: writes directly to caller's output buffers.
678#[inline]
679pub fn rolling_min_int_to<T: Ord + Copy + Zero>(
680    data: &[T],
681    mask: Option<&Bitmask>,
682    subwindow: usize,
683    out: &mut [T],
684    out_mask: &mut Bitmask,
685) {
686    rolling_extreme_to(
687        data,
688        mask,
689        subwindow,
690        |a, b| a < b,
691        T::zero(),
692        out,
693        out_mask,
694    );
695    if subwindow > 0 && subwindow - 1 < out.len() {
696        out_mask.set(subwindow - 1, false);
697        out[subwindow - 1] = T::zero();
698    }
699}
700
701/// Computes rolling minimum values over integer data within sliding windows.
702#[inline]
703pub fn rolling_min_int<T: Ord + Copy + Zero>(
704    window: IntegerAVT<'_, T>,
705    subwindow: usize,
706) -> IntegerArray<T> {
707    let (arr, offset, len) = window;
708    let data = &arr.data[offset..offset + len];
709    let mask = arr.null_mask.as_ref().map(|m| m.slice_clone(offset, len));
710    let mut out = prealloc_vec::<T>(len);
711    let mut out_mask = new_null_mask(len);
712    rolling_min_int_to(data, mask.as_ref(), subwindow, &mut out, &mut out_mask);
713    IntegerArray {
714        data: out.into(),
715        null_mask: Some(out_mask),
716    }
717}
718
719/// Zero-allocation variant: writes directly to caller's output buffers.
720#[inline]
721pub fn rolling_max_int_to<T: Ord + Copy + Zero>(
722    data: &[T],
723    mask: Option<&Bitmask>,
724    subwindow: usize,
725    out: &mut [T],
726    out_mask: &mut Bitmask,
727) {
728    rolling_extreme_to(
729        data,
730        mask,
731        subwindow,
732        |a, b| a > b,
733        T::zero(),
734        out,
735        out_mask,
736    );
737}
738
739/// Computes rolling maximum values over integer data within sliding windows.
740#[inline]
741pub fn rolling_max_int<T: Ord + Copy + Zero>(
742    window: IntegerAVT<'_, T>,
743    subwindow: usize,
744) -> IntegerArray<T> {
745    let (arr, offset, len) = window;
746    let data = &arr.data[offset..offset + len];
747    let mask = arr.null_mask.as_ref().map(|m| m.slice_clone(offset, len));
748    let mut out = prealloc_vec::<T>(len);
749    let mut out_mask = new_null_mask(len);
750    rolling_max_int_to(data, mask.as_ref(), subwindow, &mut out, &mut out_mask);
751    IntegerArray {
752        data: out.into(),
753        null_mask: Some(out_mask),
754    }
755}
756
757/// Zero-allocation variant: writes directly to caller's output buffers.
758#[inline]
759pub fn rolling_min_float_to<T: Float + Copy + Zero>(
760    data: &[T],
761    mask: Option<&Bitmask>,
762    subwindow: usize,
763    out: &mut [T],
764    out_mask: &mut Bitmask,
765) {
766    rolling_extreme_to(
767        data,
768        mask,
769        subwindow,
770        |a, b| a < b,
771        T::zero(),
772        out,
773        out_mask,
774    );
775    if subwindow > 0 && subwindow - 1 < out.len() {
776        out_mask.set(subwindow - 1, false);
777        out[subwindow - 1] = T::zero();
778    }
779}
780
781/// Computes rolling minimum values over floating-point data with IEEE 754 compliance.
782#[inline]
783pub fn rolling_min_float<T: Float + Copy + Zero>(
784    window: FloatAVT<'_, T>,
785    subwindow: usize,
786) -> FloatArray<T> {
787    let (arr, offset, len) = window;
788    let data = &arr.data[offset..offset + len];
789    let mask = arr.null_mask.as_ref().map(|m| m.slice_clone(offset, len));
790    let mut out = prealloc_vec::<T>(len);
791    let mut out_mask = new_null_mask(len);
792    rolling_min_float_to(data, mask.as_ref(), subwindow, &mut out, &mut out_mask);
793    FloatArray {
794        data: out.into(),
795        null_mask: Some(out_mask),
796    }
797}
798
799/// Zero-allocation variant: writes directly to caller's output buffers.
800#[inline]
801pub fn rolling_max_float_to<T: Float + Copy + Zero>(
802    data: &[T],
803    mask: Option<&Bitmask>,
804    subwindow: usize,
805    out: &mut [T],
806    out_mask: &mut Bitmask,
807) {
808    rolling_extreme_to(
809        data,
810        mask,
811        subwindow,
812        |a, b| a > b,
813        T::zero(),
814        out,
815        out_mask,
816    );
817}
818
819/// Computes rolling maximum values over floating-point data with IEEE 754 compliance.
820#[inline]
821pub fn rolling_max_float<T: Float + Copy + Zero>(
822    window: FloatAVT<'_, T>,
823    subwindow: usize,
824) -> FloatArray<T> {
825    let (arr, offset, len) = window;
826    let data = &arr.data[offset..offset + len];
827    let mask = arr.null_mask.as_ref().map(|m| m.slice_clone(offset, len));
828    let mut out = prealloc_vec::<T>(len);
829    let mut out_mask = new_null_mask(len);
830    rolling_max_float_to(data, mask.as_ref(), subwindow, &mut out, &mut out_mask);
831    FloatArray {
832        data: out.into(),
833        null_mask: Some(out_mask),
834    }
835}
836
837/// Computes rolling counts of elements within sliding windows for positional analysis.
838///
839/// Counts the number of elements present within each sliding window position, providing
840/// fundamental cardinality information essential for statistical analysis and data validation.
841/// Unlike other rolling functions, operates on position information rather than data values.
842///
843/// ## Parameters
844/// * `window` - Tuple containing offset and length defining the data window scope
845/// * `subwindow` - Size of sliding window for element counting
846///
847/// ## Returns
848/// Returns an `IntegerArray<i32>` containing:
849/// - Count of elements in each complete window (always equals subwindow size)
850/// - Zero values for positions with incomplete windows
851/// - Null mask indicating where complete windows exist
852///
853/// ## Examples
854/// ```rust,ignore
855/// use simd_kernels::kernels::window::rolling_count;
856///
857/// let result = rolling_count((0, 5), 3); // 5 elements, window size 3
858/// ```
859#[inline]
860pub fn rolling_count(window: (Offset, Length), subwindow: usize) -> IntegerArray<i32> {
861    let (_offset, len) = window;
862    let mut out = prealloc_vec::<i32>(len);
863    let mut out_mask = new_null_mask(len);
864    for i in 0..len {
865        let start = if i + 1 >= subwindow {
866            i + 1 - subwindow
867        } else {
868            0
869        };
870        let count = (i - start + 1) as i32;
871        let valid_row = subwindow > 0 && i + 1 >= subwindow;
872        unsafe { out_mask.set_unchecked(i, valid_row) };
873        out[i] = if valid_row { count } else { 0 };
874    }
875    IntegerArray {
876        data: out.into(),
877        null_mask: Some(out_mask),
878    }
879}
880
881// Rank and Dense-rank kernels
882
883#[inline(always)]
884fn rank_numeric<T, F>(data: &[T], mask: Option<&Bitmask>, mut cmp: F) -> IntegerArray<i32>
885where
886    T: Copy,
887    F: FnMut(&T, &T) -> std::cmp::Ordering,
888{
889    let n = data.len();
890    let mut indices: Vec<usize> = (0..n).collect();
891    indices.sort_by(|&i, &j| cmp(&data[i], &data[j]));
892
893    let mut out = vec64![0i32; n];
894    let mut out_mask = Bitmask::new_set_all(n, false);
895
896    for (rank, &i) in indices.iter().enumerate() {
897        if mask.map_or(true, |m| unsafe { m.get_unchecked(i) }) {
898            out[i] = (rank + 1) as i32;
899            unsafe { out_mask.set_unchecked(i, true) };
900        }
901    }
902
903    IntegerArray {
904        data: out.into(),
905        null_mask: Some(out_mask),
906    }
907}
908
909/// Computes standard SQL ROW_NUMBER() ranking for integer data with tie handling.
910///
911/// Assigns sequential rank values to elements based on their sorted order, providing
912/// standard SQL ROW_NUMBER() semantics where tied values receive different ranks.
913/// Essential for analytical queries requiring unique positional ranking.
914///
915/// ## Parameters
916/// * `window` - Integer array view containing values for ranking
917///
918/// ## Returns
919/// Returns an `IntegerArray<i32>` containing:
920/// - Rank values from 1 to n for valid elements
921/// - Zero values for null elements
922/// - Null mask indicating which positions have valid ranks
923///
924/// ## Ranking Semantics
925/// - **ROW_NUMBER() behaviour**: Each element receives a unique rank (1, 2, 3, ...)
926/// - **Tie breaking**: Tied values receive different ranks based on their position
927/// - **Ascending order**: Smaller values receive lower (better) ranks
928/// - **Null exclusion**: Null values are excluded from ranking and receive rank 0
929///
930/// ## Use Cases
931/// - **Analytical queries**: SQL ROW_NUMBER() window function implementation
932/// - **Leaderboards**: Creating ordered rankings with unique positions
933/// - **Percentile calculation**: Basis for percentile and quartile computations
934/// - **Data analysis**: Establishing ordinality in integer datasets
935///
936/// ## Examples
937/// ```rust,ignore
938/// use minarrow::IntegerArray;
939/// use simd_kernels::kernels::window::rank_int;
940///
941/// let arr = IntegerArray::<i32>::from_slice(&[30, 10, 20, 10]);
942/// let result = rank_int((&arr, 0, arr.len()));
943/// // Output: [4, 1, 3, 2] - ROW_NUMBER() style ranking
944/// ```
945#[inline(always)]
946pub fn rank_int<T: Ord + Copy>(window: IntegerAVT<T>) -> IntegerArray<i32> {
947    let (arr, offset, len) = window;
948    let data = &arr.data[offset..offset + len];
949    let null_mask = if len != arr.data.len() {
950        &arr.null_mask.as_ref().map(|m| m.slice_clone(offset, len))
951    } else {
952        &arr.null_mask
953    };
954    rank_numeric(data, null_mask.as_ref(), |a, b| a.cmp(b))
955}
956
957/// Computes standard SQL ROW_NUMBER() ranking for floating-point data with IEEE 754 compliance.
958///
959/// Assigns sequential rank values based on sorted floating-point order, implementing
960/// ROW_NUMBER() semantics with proper handling of special floating-point values (NaN,
961/// infinity) according to IEEE 754 comparison standards.
962///
963/// ## Parameters
964/// * `window` - Float array view containing values for ranking
965///
966/// ## Returns
967/// Returns an `IntegerArray<i32>` containing:
968/// - Rank values from 1 to n for valid, non-NaN elements
969/// - Zero values for null or NaN elements
970/// - Null mask indicating positions with valid ranks
971///
972/// ## Floating-Point Ranking
973/// - **IEEE 754 ordering**: Uses IEEE 754 compliant comparison operations
974/// - **NaN handling**: NaN values are excluded from ranking (receive rank 0)
975/// - **Infinity treatment**: Positive/negative infinity participate in ranking
976/// - **Precision preservation**: Maintains full floating-point comparison precision
977///
978/// ## Ranking Semantics
979/// - **ROW_NUMBER() style**: Each non-NaN element receives unique sequential rank
980/// - **Ascending order**: Smaller floating-point values receive lower ranks
981/// - **Tie breaking**: Floating-point ties broken by original array position
982/// - **Special value exclusion**: NaN and null values excluded from rank assignment
983///
984/// ## Applications
985/// - **Statistical ranking**: Ranking continuous numerical data
986/// - **Scientific analysis**: Ordered ranking of experimental measurements
987/// - **Financial analysis**: Ranking performance metrics and indicators
988/// - **Data preprocessing**: Establishing ordinality for regression analysis
989///
990/// ## Examples
991/// ```rust,ignore
992/// use minarrow::FloatArray;
993/// use simd_kernels::kernels::window::rank_float;
994///
995/// let arr = FloatArray::<f64>::from_slice(&[3.14, 2.71, 1.41, f64::NAN]);
996/// let result = rank_float((&arr, 0, arr.len()));
997/// // Output: [3, 2, 1, 0] - NaN excluded, others ranked by value
998/// ```
999#[inline(always)]
1000pub fn rank_float<T: Float + Copy>(window: FloatAVT<T>) -> IntegerArray<i32> {
1001    let (arr, offset, len) = window;
1002    let data = &arr.data[offset..offset + len];
1003    let null_mask = if len != arr.data.len() {
1004        &arr.null_mask.as_ref().map(|m| m.slice_clone(offset, len))
1005    } else {
1006        &arr.null_mask
1007    };
1008    rank_numeric(data, null_mask.as_ref(), |a, b| a.partial_cmp(b).unwrap())
1009}
1010
1011/// Computes standard SQL ROW_NUMBER() ranking for string data with lexicographic ordering.
1012///
1013/// Assigns sequential rank values based on lexicographic string comparison, implementing
1014/// ROW_NUMBER() semantics for textual data. Essential for alphabetical ranking and
1015/// string-based analytical operations.
1016///
1017/// ## Parameters
1018/// * `arr` - String array view containing textual values for ranking
1019///
1020/// ## Returns
1021/// Returns `Result<IntegerArray<i32>, KernelError>` containing:
1022/// - **Success**: Rank values from 1 to n for valid string elements
1023/// - **Error**: KernelError if capacity validation fails
1024/// - Zero values for null string elements
1025/// - Null mask indicating positions with valid ranks
1026///
1027/// ## String Ranking Semantics
1028/// - **Lexicographic order**: Uses standard string comparison (dictionary order)
1029/// - **Case sensitivity**: Comparisons are case-sensitive ("A" < "a")
1030/// - **Unicode support**: Proper handling of UTF-8 encoded string data
1031/// - **ROW_NUMBER() behaviour**: Tied strings receive different ranks by position
1032///
1033/// ## Error Conditions
1034/// - **Capacity errors**: Returns KernelError if mask capacity validation fails
1035/// - **Memory allocation**: May fail with insufficient memory for large datasets
1036///
1037/// ## Use Cases
1038/// - **Alphabetical ranking**: Creating alphabetically ordered rankings
1039/// - **Text analysis**: Establishing lexicographic ordinality in textual data
1040/// - **Database operations**: SQL ROW_NUMBER() implementation for string columns
1041/// - **Sorting applications**: Providing ranking information for string sorting
1042///
1043/// ## Examples
1044/// ```rust,ignore
1045/// use minarrow::StringArray;
1046/// use simd_kernels::kernels::window::rank_str;
1047///
1048/// let arr = StringArray::<u32>::from_slice(&["zebra", "apple", "banana"]);
1049/// let result = rank_str((&arr, 0, arr.len())).unwrap();
1050/// // Output: [3, 1, 2] - lexicographic ranking
1051/// ```
1052#[inline(always)]
1053pub fn rank_str<T: Integer>(arr: StringAVT<T>) -> Result<IntegerArray<i32>, KernelError> {
1054    let (array, offset, len) = arr;
1055    let mask = array.null_mask.as_ref();
1056    let _ = confirm_mask_capacity(array.len(), mask)?;
1057
1058    // Gather (local_idx, string) pairs for valid elements in the window
1059    let mut tuples: Vec<(usize, &str)> = (0..len)
1060        .filter(|&i| mask.map_or(true, |m| unsafe { m.get_unchecked(offset + i) }))
1061        .map(|i| (i, unsafe { array.get_unchecked(offset + i) }.unwrap_or("")))
1062        .collect();
1063
1064    // Sort by string value
1065    tuples.sort_by(|a, b| a.1.cmp(&b.1));
1066
1067    let mut out = vec64![0i32; len];
1068    let mut out_mask = new_null_mask(len);
1069
1070    // Assign ranks (1-based), using local output indices
1071    for (rank, (i, _)) in tuples.iter().enumerate() {
1072        out[*i] = (rank + 1) as i32;
1073        unsafe { out_mask.set_unchecked(*i, true) };
1074    }
1075
1076    Ok(IntegerArray {
1077        data: out.into(),
1078        null_mask: Some(out_mask),
1079    })
1080}
1081
1082#[inline(always)]
1083fn dense_rank_numeric<T, F, G>(
1084    data: &[T],
1085    mask: Option<&Bitmask>,
1086    mut sort: F,
1087    mut eq: G,
1088) -> Result<IntegerArray<i32>, KernelError>
1089where
1090    T: Copy,
1091    F: FnMut(&T, &T) -> std::cmp::Ordering,
1092    G: FnMut(&T, &T) -> bool,
1093{
1094    let n = data.len();
1095    let _ = confirm_mask_capacity(n, mask)?;
1096    let mut uniqs: Vec<T> = (0..n)
1097        .filter(|&i| mask.map_or(true, |m| unsafe { m.get_unchecked(i) }))
1098        .map(|i| data[i])
1099        .collect();
1100
1101    uniqs.sort_by(&mut sort);
1102    uniqs.dedup_by(|a, b| eq(&*a, &*b));
1103
1104    let mut out = prealloc_vec::<i32>(n);
1105    let mut out_mask = Bitmask::new_set_all(n, false);
1106
1107    for i in 0..n {
1108        if mask.map_or(true, |m| unsafe { m.get_unchecked(i) }) {
1109            let rank = uniqs.binary_search_by(|x| sort(x, &data[i])).unwrap() + 1;
1110            out[i] = rank as i32;
1111            unsafe { out_mask.set_unchecked(i, true) };
1112        } else {
1113            out[i] = 0;
1114        }
1115    }
1116
1117    Ok(IntegerArray {
1118        data: out.into(),
1119        null_mask: Some(out_mask),
1120    })
1121}
1122
1123/// Computes SQL DENSE_RANK() ranking for integer data with consecutive rank assignment.
1124///
1125/// Assigns consecutive rank values where tied values receive identical ranks, implementing
1126/// SQL DENSE_RANK() semantics. Unlike standard ranking, eliminates gaps in rank sequence
1127/// when ties occur, providing compact rank numbering for analytical applications.
1128///
1129/// ## Parameters
1130/// * `window` - Integer array view containing values for dense ranking
1131///
1132/// ## Returns
1133/// Returns `Result<IntegerArray<i32>, KernelError>` containing:
1134/// - **Success**: Dense rank values with no gaps in sequence
1135/// - **Error**: KernelError if capacity validation fails
1136/// - Zero values for null elements
1137/// - Null mask indicating positions with valid ranks
1138///
1139/// ## Dense Ranking Semantics
1140/// - **DENSE_RANK() behaviour**: Tied values receive same rank, next rank is consecutive
1141/// - **No rank gaps**: Eliminates gaps that occur in standard RANK() function
1142/// - **Unique value counting**: Essentially counts distinct values in sorted order
1143/// - **Ascending order**: Smaller integer values receive lower (better) ranks
1144///
1145/// ## Comparison with RANK()
1146/// - **RANK()**: [1, 2, 2, 4] for values [10, 20, 20, 30]
1147/// - **DENSE_RANK()**: [1, 2, 2, 3] for values [10, 20, 20, 30]
1148///
1149/// ## Use Cases
1150/// - **Analytical queries**: SQL DENSE_RANK() window function implementation
1151/// - **Categorical ranking**: Creating compact categorical orderings
1152/// - **Percentile calculation**: Foundation for percentile computations without gaps
1153/// - **Data binning**: Assigning data points to consecutive bins based on value
1154///
1155/// ## Examples
1156/// ```rust,ignore
1157/// use minarrow::IntegerArray;
1158/// use simd_kernels::kernels::window::dense_rank_int;
1159///
1160/// let arr = IntegerArray::<i32>::from_slice(&[10, 30, 20, 30]);
1161/// let result = dense_rank_int((&arr, 0, arr.len())).unwrap();
1162/// // Output: [1, 3, 2, 3] - dense ranking with tied values
1163/// ```
1164#[inline(always)]
1165pub fn dense_rank_int<T: Ord + Copy>(
1166    window: IntegerAVT<T>,
1167) -> Result<IntegerArray<i32>, KernelError> {
1168    let (arr, offset, len) = window;
1169    let data = &arr.data[offset..offset + len];
1170    let null_mask = if len != arr.data.len() {
1171        &arr.null_mask.as_ref().map(|m| m.slice_clone(offset, len))
1172    } else {
1173        &arr.null_mask
1174    };
1175    dense_rank_numeric(data, null_mask.as_ref(), |a, b| a.cmp(b), |a, b| a == b)
1176}
1177
1178/// Computes SQL DENSE_RANK() ranking for floating-point data with IEEE 754 compliance.
1179///
1180/// Implements dense ranking for floating-point values where tied values receive identical
1181/// consecutive ranks. Handles special floating-point values (NaN, infinity) according
1182/// to IEEE 754 standards while maintaining dense rank sequence properties.
1183///
1184/// ## Parameters
1185/// * `window` - Float array view containing values for dense ranking
1186///
1187/// ## Returns
1188/// Returns `Result<IntegerArray<i32>, KernelError>` containing:
1189/// - **Success**: Dense rank values with consecutive numbering
1190/// - **Error**: KernelError if capacity validation fails
1191/// - Zero values for null or NaN elements
1192/// - Null mask indicating positions with valid ranks
1193///
1194/// ## Applications
1195/// - **Scientific ranking**: Dense ranking of experimental measurements
1196/// - **Statistical analysis**: Percentile calculations without rank gaps
1197/// - **Financial modeling**: Dense ranking of performance metrics
1198/// - **Data preprocessing**: Creating ordinal encodings for continuous variables
1199///
1200/// ## Examples
1201/// ```rust,ignore
1202/// use minarrow::FloatArray;
1203/// use simd_kernels::kernels::window::dense_rank_float;
1204///
1205/// let arr = FloatArray::<f64>::from_slice(&[1.5, 3.14, 2.71, 3.14]);
1206/// let result = dense_rank_float((&arr, 0, arr.len())).unwrap();
1207/// // Output: [1, 3, 2, 3] - dense ranking with tied 3.14 values
1208/// ```
1209#[inline(always)]
1210pub fn dense_rank_float<T: Float + Copy>(
1211    window: FloatAVT<T>,
1212) -> Result<IntegerArray<i32>, KernelError> {
1213    let (arr, offset, len) = window;
1214    let data = &arr.data[offset..offset + len];
1215    let null_mask = if len != arr.data.len() {
1216        &arr.null_mask.as_ref().map(|m| m.slice_clone(offset, len))
1217    } else {
1218        &arr.null_mask
1219    };
1220    dense_rank_numeric(
1221        data,
1222        null_mask.as_ref(),
1223        |a, b| a.partial_cmp(b).unwrap(),
1224        |a, b| a == b,
1225    )
1226}
1227
1228/// Computes SQL DENSE_RANK() ranking for string data with lexicographic dense ordering.
1229///
1230/// Implements dense ranking for string values using lexicographic comparison, where
1231/// identical strings receive the same rank and subsequent ranks remain consecutive.
1232/// Essential for alphabetical dense ranking and textual categorical analysis.
1233///
1234/// ## Parameters
1235/// * `arr` - String array view containing textual values for dense ranking
1236///
1237/// ## Returns
1238/// Returns `Result<IntegerArray<i32>, KernelError>` containing:
1239/// - **Success**: Dense rank values with consecutive sequence
1240/// - **Error**: KernelError if capacity validation fails
1241/// - Zero values for null string elements
1242/// - Null mask indicating positions with valid ranks
1243///
1244/// ## Dense String Ranking
1245/// - **DENSE_RANK() semantics**: Identical strings receive same rank, no rank gaps
1246/// - **Lexicographic ordering**: Standard dictionary-style string comparison
1247/// - **Case sensitivity**: Maintains case-sensitive comparison ("Apple" ≠ "apple")
1248/// - **UTF-8 support**: Proper handling of Unicode string sequences
1249///
1250/// ## Use Cases
1251/// - **Alphabetical dense ranking**: Creating compact alphabetical orderings
1252/// - **Categorical encoding**: Converting string categories to dense integer codes
1253/// - **Text analytics**: Establishing lexicographic ordinality for text processing
1254/// - **Database operations**: SQL DENSE_RANK() for string-valued columns
1255///
1256/// ## Examples
1257/// ```rust,ignore
1258/// use minarrow::StringArray;
1259/// use simd_kernels::kernels::window::dense_rank_str;
1260///
1261/// let arr = StringArray::<u32>::from_slice(&["banana", "apple", "cherry", "apple"]);
1262/// let result = dense_rank_str((&arr, 0, arr.len())).unwrap();
1263/// // Output: [2, 1, 3, 1] - dense ranking with tied "apple" values
1264/// ```
1265#[inline(always)]
1266pub fn dense_rank_str<T: Integer>(arr: StringAVT<T>) -> Result<IntegerArray<i32>, KernelError> {
1267    let (array, offset, len) = arr;
1268    let mask = array.null_mask.as_ref();
1269    let _ = confirm_mask_capacity(array.len(), mask)?;
1270
1271    // Collect all unique valid values in window
1272    let mut vals: Vec<&str> = (0..len)
1273        .filter(|&i| mask.map_or(true, |m| unsafe { m.get_unchecked(offset + i) }))
1274        .map(|i| unsafe { array.get_unchecked(offset + i) }.unwrap_or(""))
1275        .collect();
1276    vals.sort();
1277    vals.dedup();
1278
1279    let mut out = prealloc_vec::<i32>(len);
1280    let mut out_mask = Bitmask::new_set_all(len, false);
1281
1282    for i in 0..len {
1283        let valid = mask.map_or(true, |m| unsafe { m.get_unchecked(offset + i) });
1284        if valid {
1285            let rank = vals
1286                .binary_search(&unsafe { array.get_unchecked(offset + i) }.unwrap_or(""))
1287                .unwrap()
1288                + 1;
1289            out[i] = rank as i32;
1290            unsafe { out_mask.set_unchecked(i, true) };
1291        } else {
1292            out[i] = 0;
1293        }
1294    }
1295
1296    Ok(IntegerArray {
1297        data: out.into(),
1298        null_mask: Some(out_mask),
1299    })
1300}
1301
1302// Lag / Lead / Shift kernels
1303
1304/// Zero-allocation variant: writes directly to caller's output buffers.
1305///
1306/// Panics if `out.len() != len`.
1307#[inline(always)]
1308fn shift_with_bounds_to<T: Copy>(
1309    data: &[T],
1310    mask: Option<&Bitmask>,
1311    len: usize,
1312    offset_fn: impl Fn(usize) -> Option<usize>,
1313    default: T,
1314    out: &mut [T],
1315    out_mask: &mut Bitmask,
1316) {
1317    assert_eq!(
1318        len,
1319        out.len(),
1320        "shift_with_bounds_to: input/output length mismatch"
1321    );
1322    for i in 0..len {
1323        if let Some(j) = offset_fn(i) {
1324            out[i] = data[j];
1325            let is_valid = mask.map_or(true, |m| unsafe { m.get_unchecked(j) });
1326            unsafe { out_mask.set_unchecked(i, is_valid) };
1327        } else {
1328            out[i] = default;
1329        }
1330    }
1331}
1332
1333/// Allocating variant: creates new output buffers internally.
1334#[inline(always)]
1335fn shift_with_bounds<T: Copy>(
1336    data: &[T],
1337    mask: Option<&Bitmask>,
1338    len: usize,
1339    offset_fn: impl Fn(usize) -> Option<usize>,
1340    default: T,
1341) -> (Vec64<T>, Bitmask) {
1342    let mut out = prealloc_vec::<T>(len);
1343    let mut out_mask = Bitmask::new_set_all(len, false);
1344    shift_with_bounds_to(data, mask, len, offset_fn, default, &mut out, &mut out_mask);
1345    (out, out_mask)
1346}
1347
1348#[inline(always)]
1349fn shift_str_with_bounds<T: Integer>(
1350    arr: StringAVT<T>,
1351    offset_fn: impl Fn(usize) -> Option<usize>,
1352) -> Result<StringArray<T>, KernelError> {
1353    let (array, offset, len) = arr;
1354    let src_mask = array.null_mask.as_ref();
1355    let _ = confirm_mask_capacity(array.len(), src_mask)?;
1356
1357    // Determine offsets and total bytes required
1358    let mut offsets = Vec64::<T>::with_capacity(len + 1);
1359    unsafe {
1360        offsets.set_len(len + 1);
1361    }
1362    offsets[0] = T::zero();
1363
1364    let mut total_bytes = 0;
1365    let mut string_lengths = vec![0usize; len];
1366
1367    for i in 0..len {
1368        let byte_len = if let Some(j) = offset_fn(i) {
1369            let src_idx = offset + j;
1370            let valid = src_mask.map_or(true, |m| unsafe { m.get_unchecked(src_idx) });
1371            if valid {
1372                unsafe { array.get_unchecked(src_idx).unwrap_or("").len() }
1373            } else {
1374                0
1375            }
1376        } else {
1377            0
1378        };
1379        total_bytes += byte_len;
1380        string_lengths[i] = byte_len;
1381        offsets[i + 1] = T::from_usize(total_bytes);
1382    }
1383
1384    // Allocate data buffer
1385    let mut data = Vec64::<u8>::with_capacity(total_bytes);
1386    let mut out_mask = Bitmask::new_set_all(len, false);
1387
1388    // Write string content
1389    for i in 0..len {
1390        if let Some(j) = offset_fn(i) {
1391            let src_idx = offset + j;
1392            let valid = src_mask.map_or(true, |m| unsafe { m.get_unchecked(src_idx) });
1393            if valid {
1394                let s = unsafe { array.get_unchecked(src_idx).unwrap_or("") };
1395                data.extend_from_slice(s.as_bytes());
1396                unsafe { out_mask.set_unchecked(i, true) };
1397                continue;
1398            }
1399        }
1400        // Not valid or OOB write nothing
1401    }
1402
1403    Ok(StringArray {
1404        offsets: offsets.into(),
1405        data: data.into(),
1406        null_mask: Some(out_mask),
1407    })
1408}
1409
1410// Integer
1411
1412/// Accesses values from previous positions in integer arrays with configurable offset.
1413///
1414/// Implements SQL LAG() window function semantics, retrieving values from earlier positions
1415/// in the array sequence. Essential for time series analysis, trend detection, and
1416/// comparative analytics requiring access to historical data points.
1417///
1418/// ## Parameters
1419/// * `window` - Integer array view containing sequential data for lag access
1420/// * `n` - Lag offset specifying how many positions to look backward
1421///
1422/// ## Returns
1423/// Returns an `IntegerArray<T>` containing:
1424/// - Values from n positions earlier in the sequence
1425/// - Default values for positions where lag source is unavailable
1426/// - Null mask indicating validity of lagged values
1427///
1428/// ## Examples
1429/// ```rust,ignore
1430/// use minarrow::IntegerArray;
1431/// use simd_kernels::kernels::window::lag_int;
1432///
1433/// let arr = IntegerArray::<i32>::from_slice(&[10, 20, 30, 40]);
1434/// let result = lag_int((&arr, 0, arr.len()), 1);
1435/// ```
1436/// Zero-allocation variant: writes directly to caller's output buffers.
1437#[inline]
1438pub fn lag_int_to<T: Copy + Default>(
1439    data: &[T],
1440    mask: Option<&Bitmask>,
1441    n: usize,
1442    out: &mut [T],
1443    out_mask: &mut Bitmask,
1444) {
1445    let len = data.len();
1446    shift_with_bounds_to(
1447        data,
1448        mask,
1449        len,
1450        |i| if i >= n { Some(i - n) } else { None },
1451        T::default(),
1452        out,
1453        out_mask,
1454    );
1455}
1456
1457#[inline]
1458pub fn lag_int<T: Copy + Default>(window: IntegerAVT<T>, n: usize) -> IntegerArray<T> {
1459    let (arr, offset, len) = window;
1460    let data_window = &arr.data[offset..offset + len];
1461    let mask = arr.null_mask.as_ref().map(|m| m.slice_clone(offset, len));
1462    let mut out = prealloc_vec::<T>(len);
1463    let mut out_mask = Bitmask::new_set_all(len, false);
1464    lag_int_to(data_window, mask.as_ref(), n, &mut out, &mut out_mask);
1465    IntegerArray {
1466        data: out.into(),
1467        null_mask: Some(out_mask),
1468    }
1469}
1470
1471/// Accesses values from future positions in integer arrays with configurable offset.
1472///
1473/// Implements SQL LEAD() window function semantics, retrieving values from later positions
1474/// in the array sequence. Essential for predictive analytics, forward-looking comparisons,
1475/// and temporal analysis requiring access to future data points.
1476///
1477/// ## Parameters
1478/// * `window` - Integer array view containing sequential data for lead access
1479/// * `n` - Lead offset specifying how many positions to look forward
1480///
1481/// ## Returns
1482/// Returns an `IntegerArray<T>` containing:
1483/// - Values from n positions later in the sequence
1484/// - Default values for positions where lead source is unavailable
1485/// - Null mask indicating validity of lead values
1486///
1487/// ## Examples
1488/// ```rust,ignore
1489/// use minarrow::IntegerArray;
1490/// use simd_kernels::kernels::window::lead_int;
1491///
1492/// let arr = IntegerArray::<i32>::from_slice(&[10, 20, 30, 40]);
1493/// let result = lead_int((&arr, 0, arr.len()), 2);
1494/// ```
1495/// Zero-allocation variant: writes directly to caller's output buffers.
1496#[inline]
1497pub fn lead_int_to<T: Copy + Default>(
1498    data: &[T],
1499    mask: Option<&Bitmask>,
1500    n: usize,
1501    out: &mut [T],
1502    out_mask: &mut Bitmask,
1503) {
1504    let len = data.len();
1505    shift_with_bounds_to(
1506        data,
1507        mask,
1508        len,
1509        |i| if i + n < len { Some(i + n) } else { None },
1510        T::default(),
1511        out,
1512        out_mask,
1513    );
1514}
1515
1516#[inline]
1517pub fn lead_int<T: Copy + Default>(window: IntegerAVT<T>, n: usize) -> IntegerArray<T> {
1518    let (arr, offset, len) = window;
1519    let data_window = &arr.data[offset..offset + len];
1520    let mask = arr.null_mask.as_ref().map(|m| m.slice_clone(offset, len));
1521    let mut out = prealloc_vec::<T>(len);
1522    let mut out_mask = Bitmask::new_set_all(len, false);
1523    lead_int_to(data_window, mask.as_ref(), n, &mut out, &mut out_mask);
1524    IntegerArray {
1525        data: out.into(),
1526        null_mask: Some(out_mask),
1527    }
1528}
1529
1530/// Zero-allocation variant: writes directly to caller's output buffers.
1531#[inline]
1532pub fn lag_float_to<T: Copy + num_traits::Zero>(
1533    data: &[T],
1534    mask: Option<&Bitmask>,
1535    n: usize,
1536    out: &mut [T],
1537    out_mask: &mut Bitmask,
1538) {
1539    let len = data.len();
1540    shift_with_bounds_to(
1541        data,
1542        mask,
1543        len,
1544        |i| if i >= n { Some(i - n) } else { None },
1545        T::zero(),
1546        out,
1547        out_mask,
1548    );
1549}
1550
1551/// Accesses values from previous positions in floating-point arrays with IEEE 754 compliance.
1552#[inline]
1553pub fn lag_float<T: Copy + num_traits::Zero>(window: FloatAVT<T>, n: usize) -> FloatArray<T> {
1554    let (arr, offset, len) = window;
1555    let data_window = &arr.data[offset..offset + len];
1556    let mask_window = if len != arr.data.len() {
1557        arr.null_mask.as_ref().map(|m| m.slice_clone(offset, len))
1558    } else {
1559        arr.null_mask.clone()
1560    };
1561    let (data, null_mask) = shift_with_bounds(
1562        data_window,
1563        mask_window.as_ref(),
1564        len,
1565        |i| if i >= n { Some(i - n) } else { None },
1566        T::zero(),
1567    );
1568    FloatArray {
1569        data: data.into(),
1570        null_mask: Some(null_mask),
1571    }
1572}
1573
1574/// Accesses values from future positions in floating-point arrays with IEEE 754 compliance.
1575///
1576/// Implements SQL LEAD() function for floating-point data, retrieving values from later
1577/// positions while preserving IEEE 754 semantics. Essential for forward-looking analysis
1578/// and predictive modeling with continuous numerical data.
1579///
1580/// ## Parameters
1581/// * `window` - Float array view containing sequential floating-point data
1582/// * `n` - Lead offset specifying forward position distance
1583///
1584/// ## Returns
1585/// Returns a `FloatArray<T>` containing:
1586/// - Floating-point values from n positions later
1587/// - Zero values for positions beyond available future
1588/// - Null mask indicating lead validity and special value propagation
1589///
1590/// ## Use Cases
1591/// - **Predictive analytics**: Accessing future values for comparison and modeling
1592/// - **Signal analysis**: Forward-looking operations in digital signal processing
1593/// - **Financial modeling**: Computing forward returns and future value analysis
1594/// - **Scientific computing**: Implementing forward difference schemes
1595///
1596/// ## Examples
1597/// ```rust,ignore
1598/// use minarrow::FloatArray;
1599/// use simd_kernels::kernels::window::lead_float;
1600///
1601/// let arr = FloatArray::<f32>::from_slice(&[1.1, 2.2, 3.3, 4.4]);
1602/// let result = lead_float((&arr, 0, arr.len()), 2);
1603/// // Output: [3.3, 4.4, 0.0, 0.0] - lead by 2 positions
1604/// ```
1605#[inline]
1606pub fn lead_float<T: Copy + num_traits::Zero>(window: FloatAVT<T>, n: usize) -> FloatArray<T> {
1607    let (arr, offset, len) = window;
1608    let data_window = &arr.data[offset..offset + len];
1609    let mask_window = if len != arr.data.len() {
1610        arr.null_mask.as_ref().map(|m| m.slice_clone(offset, len))
1611    } else {
1612        arr.null_mask.clone()
1613    };
1614    let (data, null_mask) = shift_with_bounds(
1615        data_window,
1616        mask_window.as_ref(),
1617        len,
1618        |i| if i + n < len { Some(i + n) } else { None },
1619        T::zero(),
1620    );
1621    FloatArray {
1622        data: data.into(),
1623        null_mask: Some(null_mask),
1624    }
1625}
1626
1627// String
1628
1629/// Accesses string values from previous positions with UTF-8 string handling.
1630///
1631/// Implements SQL LAG() function for string data, retrieving textual values from earlier
1632/// positions in the array sequence. Essential for textual analysis, sequential string
1633/// processing, and comparative text analytics.
1634///
1635/// ## Parameters
1636/// * `arr` - String array view containing sequential textual data
1637/// * `n` - Lag offset specifying backward position distance
1638///
1639/// ## Returns
1640/// Returns `Result<StringArray<T>, KernelError>` containing:
1641/// - **Success**: String values from n positions earlier
1642/// - **Error**: KernelError if string processing fails
1643/// - Empty strings for positions with insufficient history
1644/// - Null mask indicating lag validity and source availability
1645///
1646/// ## String Lag Semantics
1647/// - **UTF-8 preservation**: Maintains proper UTF-8 encoding throughout operation
1648/// - **Null propagation**: Null strings in source positions result in null outputs
1649/// - **Memory management**: Efficient string copying and allocation strategies
1650/// - **Boundary handling**: Positions without history receive empty string defaults
1651///
1652/// ## Applications
1653/// - **Text analysis**: Comparing current text with previous entries
1654/// - **Sequential processing**: Analysing patterns in ordered textual data
1655/// - **Log analysis**: Accessing previous log entries for context
1656/// - **Natural language processing**: Context-aware text processing with history
1657///
1658/// ## Examples
1659/// ```rust,ignore
1660/// use minarrow::StringArray;
1661/// use simd_kernels::kernels::window::lag_str;
1662///
1663/// let arr = StringArray::<u32>::from_slice(&["first", "second", "third"]);
1664/// let result = lag_str((&arr, 0, arr.len()), 1).unwrap();
1665/// // Output: ["", "first", "second"] - strings lagged by 1 position
1666/// ```
1667#[inline]
1668pub fn lag_str<T: Integer>(arr: StringAVT<T>, n: usize) -> Result<StringArray<T>, KernelError> {
1669    shift_str_with_bounds(arr, |i| if i >= n { Some(i - n) } else { None })
1670}
1671
1672/// Accesses string values from future positions with efficient UTF-8 processing.
1673///
1674/// Implements SQL LEAD() function for string data, retrieving textual values from later
1675/// positions in the array sequence. Critical for forward-looking text analysis and
1676/// sequential string pattern recognition.
1677///
1678/// ## Parameters
1679/// * `arr` - String array view containing sequential textual data
1680/// * `n` - Lead offset specifying forward position distance
1681///
1682/// ## Returns
1683/// Returns `Result<StringArray<T>, KernelError>` containing:
1684/// - **Success**: String values from n positions later
1685/// - **Error**: KernelError if string processing encounters issues
1686/// - Empty strings for positions beyond available future
1687/// - Null mask indicating lead validity and source availability
1688///
1689/// ## Examples
1690/// ```rust,ignore
1691/// use minarrow::StringArray;
1692/// use simd_kernels::kernels::window::lead_str;
1693///
1694/// let arr = StringArray::<u32>::from_slice(&["alpha", "beta", "gamma"]);
1695/// let result = lead_str((&arr, 0, arr.len()), 1).unwrap();
1696/// ```
1697#[inline]
1698pub fn lead_str<T: Integer>(arr: StringAVT<T>, n: usize) -> Result<StringArray<T>, KernelError> {
1699    let (_array, _offset, len) = arr;
1700    shift_str_with_bounds(arr, |i| if i + n < len { Some(i + n) } else { None })
1701}
1702
1703// Shift variants
1704/// Shifts integer array elements by specified offset with bidirectional support.
1705///
1706/// Provides unified interface for both LAG and LEAD operations through signed offset
1707/// parameter. Positive offsets implement LEAD semantics (forward shift), negative
1708/// offsets implement LAG semantics (backward shift), enabling flexible positional access.
1709///
1710/// ## Parameters
1711/// * `window` - Integer array view containing data for shifting
1712/// * `offset` - Signed offset: positive for LEAD (forward), negative for LAG (backward), zero for identity
1713///
1714/// ## Returns
1715/// Returns an `IntegerArray<T>` containing:
1716/// - Shifted integer values according to offset direction
1717/// - Default values for positions beyond available data
1718/// - Null mask reflecting validity of shifted positions
1719///
1720/// ## Shift Semantics
1721/// - **Positive offset**: LEAD operation (shift left, access future values)
1722/// - **Negative offset**: LAG operation (shift right, access past values)
1723/// - **Zero offset**: Identity operation (returns original array)
1724/// - **Boundary handling**: Out-of-bounds positions receive default values
1725///
1726/// ## Applications
1727/// - **Time series analysis**: Flexible temporal shifting for comparison operations
1728/// - **Sequence processing**: Bidirectional access in ordered integer sequences
1729/// - **Algorithm implementation**: Building blocks for complex windowing operations
1730/// - **Data transformation**: Positional transformations in numerical datasets
1731///
1732/// ## Examples
1733/// ```rust,ignore
1734/// use minarrow::IntegerArray;
1735/// use simd_kernels::kernels::window::shift_int;
1736///
1737/// let arr = IntegerArray::<i32>::from_slice(&[1, 2, 3, 4]);
1738/// let lag = shift_int((&arr, 0, arr.len()), -1);  // LAG by 1
1739/// let lead = shift_int((&arr, 0, arr.len()), 2);  // LEAD by 2
1740/// // lag:  [0, 1, 2, 3] - previous values
1741/// // lead: [3, 4, 0, 0] - future values
1742/// ```
1743#[inline(always)]
1744pub fn shift_int<T: Copy + Default>(window: IntegerAVT<T>, offset: isize) -> IntegerArray<T> {
1745    let (arr, win_offset, win_len) = window;
1746    if offset == 0 {
1747        return IntegerArray {
1748            data: Vec64::from_slice(&arr.data[win_offset..win_offset + win_len]).into(),
1749            null_mask: if win_len != arr.data.len() {
1750                arr.null_mask
1751                    .as_ref()
1752                    .map(|m| m.slice_clone(win_offset, win_len))
1753            } else {
1754                arr.null_mask.clone()
1755            },
1756        };
1757    } else if offset > 0 {
1758        lead_int((arr, win_offset, win_len), offset as usize)
1759    } else {
1760        lag_int((arr, win_offset, win_len), offset.unsigned_abs())
1761    }
1762}
1763
1764/// Shifts floating-point array elements with IEEE 754 compliance and bidirectional support.
1765///
1766/// Unified shifting interface for floating-point data supporting both LAG and LEAD semantics
1767/// through signed offset parameter. Maintains IEEE 754 standards for special value handling
1768/// while providing efficient bidirectional positional access.
1769///
1770/// ## Parameters
1771/// * `window` - Float array view containing data for shifting
1772/// * `offset` - Signed offset: positive for LEAD (forward), negative for LAG (backward), zero for identity
1773///
1774/// ## Returns
1775/// Returns a `FloatArray<T>` containing:
1776/// - Shifted floating-point values preserving IEEE 754 semantics
1777/// - Zero values for positions beyond data boundaries
1778/// - Null mask indicating validity of shifted positions
1779///
1780/// ## Examples
1781/// ```rust,ignore
1782/// use minarrow::FloatArray;
1783/// use simd_kernels::kernels::window::shift_float;
1784///
1785/// let arr = FloatArray::<f64>::from_slice(&[1.1, 2.2, 3.3, 4.4]);
1786/// let backward = shift_float((&arr, 0, arr.len()), -2); // LAG by 2
1787/// let forward = shift_float((&arr, 0, arr.len()), 1);   // LEAD by 1
1788/// ```
1789#[inline(always)]
1790pub fn shift_float<T: Copy + num_traits::Zero>(
1791    window: FloatAVT<T>,
1792    offset: isize,
1793) -> FloatArray<T> {
1794    let (arr, win_offset, win_len) = window;
1795    if offset == 0 {
1796        return FloatArray {
1797            data: Vec64::from_slice(&arr.data[win_offset..win_offset + win_len]).into(),
1798            null_mask: if win_len != arr.data.len() {
1799                arr.null_mask
1800                    .as_ref()
1801                    .map(|m| m.slice_clone(win_offset, win_len))
1802            } else {
1803                arr.null_mask.clone()
1804            },
1805        };
1806    } else if offset > 0 {
1807        lead_float((arr, win_offset, win_len), offset as usize)
1808    } else {
1809        lag_float((arr, win_offset, win_len), offset.unsigned_abs())
1810    }
1811}
1812
1813/// Shifts string array elements with UTF-8 integrity and bidirectional offset support.
1814///
1815/// String shifting function supporting both LAG and LEAD operations through
1816/// signed offset parameter. Maintains UTF-8 encoding integrity while providing flexible
1817/// positional access for textual sequence analysis.
1818///
1819/// ## Parameters
1820/// * `arr` - String array view containing textual data for shifting
1821/// * `shift_offset` - Signed offset: positive for LEAD (forward), negative for LAG (backward), zero for identity
1822///
1823/// ## Returns
1824/// Returns `Result<StringArray<T>, KernelError>` containing:
1825/// - **Success**: Shifted string values maintaining UTF-8 integrity
1826/// - **Error**: KernelError if string processing encounters issues
1827/// - Empty strings for positions beyond data boundaries
1828/// - Null mask reflecting validity of shifted string positions
1829///
1830/// ## Shift Semantics
1831/// - **Positive offset**: LEAD operation accessing future string values
1832/// - **Negative offset**: LAG operation accessing historical string values
1833/// - **Zero offset**: Identity operation (returns cloned array slice)
1834/// - **Boundary conditions**: Out-of-range positions produce empty strings
1835///
1836/// ## Examples
1837/// ```rust,ignore
1838/// use minarrow::StringArray;
1839/// use simd_kernels::kernels::window::shift_str;
1840///
1841/// let arr = StringArray::<u32>::from_slice(&["one", "two", "three"]);
1842/// let back = shift_str((&arr, 0, arr.len()), -1).unwrap(); // LAG
1843/// let forward = shift_str((&arr, 0, arr.len()), 1).unwrap(); // LEAD
1844/// // back:    ["", "one", "two"]
1845/// // forward: ["two", "three", ""]
1846/// ```
1847#[inline(always)]
1848pub fn shift_str<T: Integer>(
1849    arr: StringAVT<T>,
1850    shift_offset: isize,
1851) -> Result<StringArray<T>, KernelError> {
1852    if shift_offset == 0 {
1853        // Return this slice's window as a cloned StringArray
1854        let (array, off, len) = arr;
1855        Ok(array.slice_clone(off, len))
1856    } else if shift_offset > 0 {
1857        lead_str(arr, shift_offset as usize)
1858    } else {
1859        lag_str(arr, shift_offset.unsigned_abs())
1860    }
1861}
1862
/// Unit tests for the window kernels: rolling aggregates, ranking, and
/// lag / lead / shift, over both full arrays and offset windows ("chunks").
#[cfg(test)]
mod tests {
    use minarrow::structs::variants::float::FloatArray;
    use minarrow::structs::variants::integer::IntegerArray;
    use minarrow::structs::variants::string::StringArray;
    use minarrow::{Bitmask, BooleanArray};

    use super::*;

    // ─────────────────────────── Helpers ───────────────────────────

    /// Build a `Bitmask` from booleans (`true` = bit set).
    fn bm(bits: &[bool]) -> Bitmask {
        let mut m = Bitmask::new_set_all(bits.len(), false);
        for (i, &b) in bits.iter().enumerate() {
            m.set(i, b);
        }
        m
    }

    /// Asserts that an `IntegerArray<T>` holds exactly `values` and that its
    /// null mask matches `valid` bit for bit. Panics if the mask is absent.
    fn expect_int<T: PartialEq + std::fmt::Debug>(
        arr: &IntegerArray<T>,
        values: &[T],
        valid: &[bool],
    ) {
        // Guard against a test that accidentally checks fewer mask bits than values.
        assert_eq!(values.len(), valid.len(), "expected values/valid length mismatch");
        assert_eq!(arr.data.as_slice(), values);
        let mask = arr.null_mask.as_ref().expect("mask missing");
        for (i, &v) in valid.iter().enumerate() {
            assert_eq!(mask.get(i), v, "mask bit {}", i);
        }
    }

    /// Asserts that a `FloatArray<T>` matches `values` within `eps` per element
    /// and that its null mask matches `valid` bit for bit. Panics if the mask
    /// is absent.
    fn expect_float<T: num_traits::Float + std::fmt::Debug>(
        arr: &FloatArray<T>,
        values: &[T],
        valid: &[bool],
        eps: T,
    ) {
        // Guard against a test that accidentally checks fewer mask bits than values.
        assert_eq!(values.len(), valid.len(), "expected values/valid length mismatch");
        let data = arr.data.as_slice();
        assert_eq!(data.len(), values.len());
        for (a, b) in data.iter().zip(values.iter()) {
            assert!((*a - *b).abs() <= eps, "value mismatch {:?} vs {:?}", a, b);
        }
        let mask = arr.null_mask.as_ref().expect("mask missing");
        for (i, &v) in valid.iter().enumerate() {
            assert_eq!(mask.get(i), v, "mask bit {}", i);
        }
    }

    // ───────────────────────── Rolling kernels ─────────────────────────

    #[test]
    fn test_rolling_sum_int_basic() {
        let arr = IntegerArray::<i32>::from_slice(&[1, 2, 3, 4, 5]);
        let res = rolling_sum_int((&arr, 0, arr.len()), 3);
        expect_int(&res, &[0, 0, 6, 9, 12], &[false, false, true, true, true]);
    }

    #[test]
    fn test_rolling_sum_int_masked() {
        let mut arr = IntegerArray::<i32>::from_slice(&[1, 2, 3, 4]);
        arr.null_mask = Some(bm(&[true, false, true, true]));
        let res = rolling_sum_int((&arr, 0, arr.len()), 2);
        expect_int(
            &res,
            &[0, 0, 3, 7],
            &[false, false, false, true], // window valid only when no nulls in window
        );
    }

    #[test]
    fn test_rolling_sum_float() {
        let arr = FloatArray::<f32>::from_slice(&[1.0, 2.0, 3.0]);
        let res = rolling_sum_float((&arr, 0, arr.len()), 2);
        expect_float(&res, &[0.0, 0.0, 5.0], &[false, false, true], 1e-6f32);
    }

    #[test]
    fn test_rolling_sum_bool() {
        let bools = BooleanArray::from_slice(&[true, true, false, true]);
        let res = rolling_sum_bool((&bools, 0, bools.len()), 2); // counts trues over window
        expect_int(&res, &[0, 0, 1, 1], &[false, false, true, true]);
    }

    #[test]
    fn test_rolling_min_max_mean_count() {
        let arr = IntegerArray::<i32>::from_slice(&[3, 1, 4, 1, 5]);
        // min
        let rmin = rolling_min_int((&arr, 0, arr.len()), 2);
        expect_int(&rmin, &[0, 0, 1, 1, 1], &[false, false, true, true, true]);

        // max
        let rmax = rolling_max_int((&arr, 0, arr.len()), 3);
        expect_int(&rmax, &[0, 0, 4, 4, 5], &[false, false, true, true, true]);

        // mean
        let rmean = rolling_mean_int((&arr, 0, arr.len()), 2);
        expect_float(
            &rmean,
            &[0.0, 0.0, 2.5, 2.5, 3.0],
            &[false, false, true, true, true],
            1e-12,
        );

        // count
        let cnt = rolling_count((0, 5), 3);
        expect_int(&cnt, &[0, 0, 3, 3, 3], &[false, false, true, true, true]);
    }

    // ───────────────────────── Rank / Dense-rank ─────────────────────────

    #[test]
    fn test_rank_int_basic() {
        let arr = IntegerArray::<i32>::from_slice(&[30, 10, 20]);
        let res = rank_int((&arr, 0, arr.len()));
        expect_int(&res, &[3, 1, 2], &[true, true, true]);
    }

    #[test]
    fn test_rank_float_with_nulls() {
        let mut arr = FloatArray::<f64>::from_slice(&[2.0, 1.0, 3.0]);
        arr.null_mask = Some(bm(&[true, false, true]));
        let res = rank_float((&arr, 0, arr.len()));
        expect_int(&res, &[2, 0, 3], &[true, false, true]);
    }

    #[test]
    fn test_dense_rank_str_duplicates() {
        let arr = StringArray::<u32>::from_slice(&["b", "a", "b", "c"]);
        let res = dense_rank_str((&arr, 0, arr.len())).unwrap();
        expect_int(&res, &[2, 1, 2, 3], &[true, true, true, true]);
    }

    #[test]
    fn test_dense_rank_str_duplicates_chunk() {
        // Backing array is ["x", "b", "a", "b", "c", "y"]
        let arr = StringArray::<u32>::from_slice(&["x", "b", "a", "b", "c", "y"]);
        let res = dense_rank_str((&arr, 1, 4)).unwrap(); // window is "b", "a", "b", "c"
        expect_int(&res, &[2, 1, 2, 3], &[true, true, true, true]);
    }

    // ───────────────────────── Lag / Lead / Shift ─────────────────────────

    #[test]
    fn test_lag_lead_int() {
        let arr = IntegerArray::<i32>::from_slice(&[10, 20, 30, 40]);
        let lag1 = lag_int((&arr, 0, arr.len()), 1);
        expect_int(&lag1, &[0, 10, 20, 30], &[false, true, true, true]);

        let lead2 = lead_int((&arr, 0, arr.len()), 2);
        expect_int(&lead2, &[30, 40, 0, 0], &[true, true, false, false]);
    }

    #[test]
    fn test_shift_float_positive_negative_zero() {
        let arr = FloatArray::<f32>::from_slice(&[1.0, 2.0, 3.0]);
        let s0 = shift_float((&arr, 0, arr.len()), 0);
        assert_eq!(s0.data, arr.data); // exact copy

        let s1 = shift_float((&arr, 0, arr.len()), 1);
        expect_float(&s1, &[2.0, 3.0, 0.0], &[true, true, false], 1e-6f32);

        let s_neg = shift_float((&arr, 0, arr.len()), -1);
        expect_float(&s_neg, &[0.0, 1.0, 2.0], &[false, true, true], 1e-6f32);
    }

    #[test]
    fn test_lag_lead_str() {
        let arr = StringArray::<u32>::from_slice(&["a", "b", "c"]);
        let l1 = lag_str((&arr, 0, arr.len()), 1).unwrap();
        assert_eq!(l1.get(0), None);
        assert_eq!(l1.get(1), Some("a"));
        assert_eq!(l1.get(2), Some("b"));

        let d1 = lead_str((&arr, 0, arr.len()), 1).unwrap();
        assert_eq!(d1.get(0), Some("b"));
        assert_eq!(d1.get(1), Some("c"));
        assert_eq!(d1.get(2), None);
    }

    #[test]
    fn test_lag_lead_str_chunk() {
        // Backing array is ["x", "a", "b", "c", "y"]; the window under test is "a", "b", "c"
        let arr = StringArray::<u32>::from_slice(&["x", "a", "b", "c", "y"]);
        let l1 = lag_str((&arr, 1, 3), 1).unwrap();
        assert_eq!(l1.get(0), None);
        assert_eq!(l1.get(1), Some("a"));
        assert_eq!(l1.get(2), Some("b"));

        let d1 = lead_str((&arr, 1, 3), 1).unwrap();
        assert_eq!(d1.get(0), Some("b"));
        assert_eq!(d1.get(1), Some("c"));
        assert_eq!(d1.get(2), None);
    }

    #[test]
    fn test_rolling_sum_int_edge_windows() {
        let arr = IntegerArray::<i32>::from_slice(&[1, 2, 3, 4, 5]);

        // window = 0 -> all zeros + mask all false
        let r0 = rolling_sum_int((&arr, 0, arr.len()), 0);
        assert_eq!(r0.data.as_slice(), &[0, 0, 0, 0, 0]);
        assert!(r0.null_mask.as_ref().unwrap().all_unset());

        // window = 1 -> identity
        let r1 = rolling_sum_int((&arr, 0, arr.len()), 1);
        assert_eq!(r1.data.as_slice(), &[1, 2, 3, 4, 5]);
        assert!(r1.null_mask.as_ref().unwrap().all_set());

        // window > len -> all zero + all false
        let r_large = rolling_sum_int((&arr, 0, arr.len()), 10);
        assert_eq!(r_large.data.as_slice(), &[0, 0, 0, 0, 0]);
        assert!(r_large.null_mask.as_ref().unwrap().all_unset());
    }

    #[test]
    fn test_rolling_sum_float_masked_nulls_propagate() {
        let mut arr = FloatArray::<f32>::from_slice(&[1.0, 2.0, 3.0, 4.0]);
        // null in the middle
        arr.null_mask = Some(bm(&[true, true, false, true]));
        let r = rolling_sum_float((&arr, 0, arr.len()), 2);
        // Expected per-index outcome (window = 2):
        //   i=0: window not yet full          -> 0.0, mask=false
        //   i=1: first full window is cleared -> 0.0, mask=false
        //   i=2: window contains the null     -> mask=false, but value = 2.0
        //   i=3: window contains the null     -> mask=false, but value = 4.0
        expect_float(
            &r,
            &[0.0, 0.0, 2.0, 4.0],
            &[false, false, false, false],
            1e-6,
        );
    }

    #[test]
    fn test_rolling_sum_bool_with_nulls() {
        let mut b = BooleanArray::from_slice(&[true, false, true, true]);
        b.null_mask = Some(bm(&[true, false, true, true]));
        let r = rolling_sum_bool((&b, 0, b.len()), 2);
        // windows [t,f], [f,t], [t,t] -> only last window is all non-null
        expect_int(&r, &[0, 0, 1, 2], &[false, false, false, true]);
    }

    #[test]
    fn test_lag_str_null_propagation() {
        let mut arr = StringArray::<u32>::from_slice(&["x", "y", "z"]);
        arr.null_mask = Some(bm(&[true, false, true])); // y is null
        let lag1 = lag_str((&arr, 0, arr.len()), 1).unwrap();
        assert_eq!(lag1.get(0), None); // no source -> null
        assert_eq!(lag1.get(1), Some("x"));
        assert_eq!(lag1.get(2), None); // source was null
        let m = lag1.null_mask.unwrap();
        assert!(!m.get(0));
        assert!(m.get(1));
        assert!(!m.get(2));
    }

    #[test]
    fn test_lag_str_null_propagation_chunk() {
        // Backing array is ["w", "x", "y", "z", "q"]; the window under test is "x", "y", "z"
        let mut arr = StringArray::<u32>::from_slice(&["w", "x", "y", "z", "q"]);
        arr.null_mask = Some(bm(&[true, true, false, true, true]));
        let lag1 = lag_str((&arr, 1, 3), 1).unwrap();
        assert_eq!(lag1.get(0), None); // "x", index 0 in chunk, no source
        assert_eq!(lag1.get(1), Some("x")); // "y", index 1 pulls "x" (valid)
        assert_eq!(lag1.get(2), None); // "z", index 2 pulls "y" (invalid)
        let m = lag1.null_mask.unwrap();
        assert!(!m.get(0));
        assert!(m.get(1));
        assert!(!m.get(2));
    }

    #[test]
    fn test_shift_str_large_offset() {
        let arr = StringArray::<u32>::from_slice(&["a", "b", "c"]);
        let shifted = shift_str((&arr, 0, arr.len()), 10).unwrap(); // > len
        assert_eq!(shifted.len(), 3);
        for i in 0..3 {
            assert_eq!(shifted.get(i), None);
            assert!(!shifted.null_mask.as_ref().unwrap().get(i));
        }
    }

    #[test]
    fn test_shift_str_large_offset_chunk() {
        // Backing array is ["w", "a", "b", "c", "x"]
        let arr = StringArray::<u32>::from_slice(&["w", "a", "b", "c", "x"]);
        let shifted = shift_str((&arr, 1, 3), 10).unwrap(); // window is "a","b","c"
        assert_eq!(shifted.len(), 3);
        for i in 0..3 {
            assert_eq!(shifted.get(i), None);
            assert!(!shifted.null_mask.as_ref().unwrap().get(i));
        }
    }

    #[test]
    fn test_rank_str_ties_and_nulls() {
        let mut arr = StringArray::<u32>::from_slice(&["a", "b", "a", "c"]);
        arr.null_mask = Some(bm(&[true, true, false, true]));
        let r = rank_str((&arr, 0, arr.len())).unwrap();
        // valid positions: 0="a" -> 1, 1="b" -> 2, 3="c" -> 3;
        // position 2 is null -> value 0 with its mask bit cleared
        expect_int(&r, &[1, 2, 0, 3], &[true, true, false, true]);
    }

    #[test]
    fn test_rank_str_ties_and_nulls_chunk() {
        // Backing array is ["w", "a", "b", "a", "c"]
        let mut arr = StringArray::<u32>::from_slice(&["w", "a", "b", "a", "c"]);
        arr.null_mask = Some(bm(&[true, true, true, false, true]));
        let r = rank_str((&arr, 1, 4)).unwrap(); // "a","b","a","c"
        expect_int(&r, &[1, 2, 0, 3], &[true, true, false, true]);
    }
}