simd_kernels/kernels/
window.rs

1// Copyright Peter Bower 2025. All Rights Reserved.
2// Licensed under Mozilla Public License (MPL) 2.0.
3
4//! # **Window Functions Kernels Module** - *High-Performance Analytical Window Operations*
5//!
6//! Advanced window function kernels for sliding window computations,
7//! ranking operations, and positional analytics with SIMD acceleration and null-aware semantics.
8//! Backbone of time series analysis, analytical SQL window functions, and chunked streaming computations.
9//!
10//! ## Core Operations
11//! - **Moving averages**: Rolling mean calculations with configurable window sizes
12//! - **Cumulative functions**: Running sums, products, and statistical aggregations  
13//! - **Ranking functions**: ROW_NUMBER, RANK, DENSE_RANK with tie-handling strategies
14//! - **Lead/lag operations**: Positional value access with configurable offsets
15//! - **Percentile functions**: Moving quantile calculations with interpolation support
16//! - **Window aggregates**: MIN, MAX, SUM operations over sliding windows
17
18include!(concat!(env!("OUT_DIR"), "/simd_lanes.rs"));
19
20use std::marker::PhantomData;
21
22use minarrow::{
23    Bitmask, BooleanAVT, BooleanArray, FloatArray, Integer, IntegerArray, Length, MaskedArray,
24    Offset, StringArray, Vec64,
25    aliases::{FloatAVT, IntegerAVT},
26    vec64,
27};
28use num_traits::{Float, Num, NumCast, One, Zero};
29
30use crate::{errors::KernelError, utils::confirm_mask_capacity};
31use minarrow::StringAVT;
32
33// Helpers
34#[inline(always)]
35fn new_null_mask(len: usize) -> Bitmask {
36    Bitmask::new_set_all(len, false)
37}
38
39#[inline(always)]
40fn prealloc_vec<T: Copy>(len: usize) -> Vec64<T> {
41    let mut v = Vec64::<T>::with_capacity(len);
42    unsafe { v.set_len(len) };
43    v
44}
45
46// Rolling kernels (sum, product, min, max, mean, count)
47
48/// Generic sliding window aggregator for kernels that allow an
49/// incremental push and pop update (sum, product, etc.).
50/// Always emits the running aggregate, even when the subwindow has nulls.
51/// Only flags “valid” once the full subwindow has been seen.
52#[inline(always)]
53fn rolling_push_pop<T, FAdd, FRem>(
54    data: &[T],
55    mask: Option<&Bitmask>,
56    subwindow: usize,
57    mut add: FAdd,
58    mut remove: FRem,
59    zero: T,
60) -> (Vec64<T>, Bitmask)
61where
62    T: Copy,
63    FAdd: FnMut(T, T) -> T,
64    FRem: FnMut(T, T) -> T,
65{
66    let n = data.len();
67    let mut out = prealloc_vec::<T>(n);
68    let mut out_mask = new_null_mask(n);
69
70    if subwindow == 0 {
71        for slot in &mut out {
72            *slot = zero;
73        }
74        return (out, out_mask);
75    }
76
77    let mut agg = zero;
78    let mut invalids = 0usize;
79    for i in 0..n {
80        if mask.map_or(true, |m| unsafe { m.get_unchecked(i) }) {
81            agg = add(agg, data[i]);
82        } else {
83            invalids += 1;
84        }
85        if i + 1 > subwindow {
86            let j = i + 1 - subwindow - 1;
87            if mask.map_or(true, |m| unsafe { m.get_unchecked(j) }) {
88                agg = remove(agg, data[j]);
89            } else {
90                invalids -= 1;
91            }
92        }
93        if i + 1 < subwindow {
94            unsafe { out_mask.set_unchecked(i, false) };
95            out[i] = zero;
96        } else {
97            let ok = invalids == 0;
98            unsafe { out_mask.set_unchecked(i, ok) };
99            out[i] = agg;
100        }
101    }
102    (out, out_mask)
103}
104
105/// Generic rolling extreme aggregator (min/max) for a subwindow over a slice.
106#[inline(always)]
107fn rolling_extreme<T, F>(
108    data: &[T],
109    mask: Option<&Bitmask>,
110    subwindow: usize,
111    mut better: F,
112    zero: T,
113) -> (Vec64<T>, Bitmask)
114where
115    T: Copy,
116    F: FnMut(&T, &T) -> bool,
117{
118    let n = data.len();
119    let mut out = prealloc_vec::<T>(n);
120    let mut out_mask = new_null_mask(n);
121
122    if subwindow == 0 {
123        return (out, out_mask);
124    }
125
126    for i in 0..n {
127        if i + 1 < subwindow {
128            unsafe { out_mask.set_unchecked(i, false) };
129            out[i] = zero;
130            continue;
131        }
132        let start = i + 1 - subwindow;
133        let mut found = false;
134        let mut extreme = zero;
135        for j in start..=i {
136            if mask.map_or(true, |m| unsafe { m.get_unchecked(j) }) {
137                if !found {
138                    extreme = data[j];
139                    found = true;
140                } else if better(&data[j], &extreme) {
141                    extreme = data[j];
142                }
143            } else {
144                found = false;
145                break;
146            }
147        }
148        unsafe { out_mask.set_unchecked(i, found) };
149        out[i] = if found { extreme } else { zero };
150    }
151    (out, out_mask)
152}
153
154/// Computes rolling sums over a sliding window for integer data with null-aware semantics.
155///
156/// Applies a sliding window of configurable size to compute cumulative sums, employing 
157/// incremental computation to avoid O(n²) complexity through efficient push-pop operations.
158/// Each position in the output represents the sum of values within the preceding window.
159///
160/// ## Parameters
161/// * `window` - Integer array view containing the data, offset, and length information
162/// * `subwindow` - Size of the sliding window (number of elements to sum)
163///
164/// ## Returns
165/// Returns an `IntegerArray<T>` containing:
166/// - Rolling sums for each position where a complete window exists
167/// - Zero values for positions before the window is complete
168/// - Null mask indicating validity (false for incomplete windows or null-contaminated windows)
169///
170/// ## Examples
171/// ```rust,ignore
172/// use minarrow::IntegerArray;
173/// use simd_kernels::kernels::window::rolling_sum_int;
174///
175/// let arr = IntegerArray::<i32>::from_slice(&[1, 2, 3, 4, 5]);
176/// let result = rolling_sum_int((&arr, 0, arr.len()), 3);
177/// ```
178#[inline]
179pub fn rolling_sum_int<T: Num + Copy + Zero>(
180    window: IntegerAVT<'_, T>,
181    subwindow: usize,
182) -> IntegerArray<T> {
183    let (arr, offset, len) = window;
184    let data = &arr.data[offset..offset + len];
185    let mask = arr.null_mask.as_ref().map(|m| m.slice_clone(offset, len));
186    let (mut out, mut out_mask) = rolling_push_pop(
187        data,
188        mask.as_ref(),
189        subwindow,
190        |a, b| a + b,
191        |a, b| a - b,
192        T::zero(),
193    );
194    if arr.null_mask.is_some() && subwindow > 0 && subwindow - 1 < out.len() {
195        unsafe { out_mask.set_unchecked(subwindow - 1, false) };
196        out[subwindow - 1] = T::zero();
197    }
198    IntegerArray {
199        data: out.into(),
200        null_mask: Some(out_mask),
201    }
202}
203
204/// Computes rolling sums over a sliding window for floating-point data with IEEE 754 compliance.
205///
206/// Applies incremental computation to calculate cumulative sums across sliding windows,
207/// maintaining numerical stability through careful accumulation strategies. Handles
208/// special floating-point values (infinity, NaN) according to IEEE 754 semantics.
209///
210/// ## Parameters
211/// * `window` - Float array view containing the data, offset, and length information
212/// * `subwindow` - Size of the sliding window for summation
213///
214/// ## Returns
215/// Returns a `FloatArray<T>` containing:
216/// - Rolling sums computed incrementally for efficiency
217/// - Zero values for positions with incomplete windows
218/// - Proper null mask for window validity tracking
219///
220/// ## Examples
221/// ```rust,ignore
222/// use minarrow::FloatArray;
223/// use simd_kernels::kernels::window::rolling_sum_float;
224///
225/// let arr = FloatArray::<f64>::from_slice(&[1.5, 2.3, 3.7, 4.1]);
226/// let result = rolling_sum_float((&arr, 0, arr.len()), 2);
227/// ```
228#[inline]
229pub fn rolling_sum_float<T: Float + Copy + Zero>(
230    window: FloatAVT<'_, T>,
231    subwindow: usize,
232) -> FloatArray<T> {
233    let (arr, offset, len) = window;
234    let data = &arr.data[offset..offset + len];
235    let mask = arr.null_mask.as_ref().map(|m| m.slice_clone(offset, len));
236    let (mut out, mut out_mask) = rolling_push_pop(
237        data,
238        mask.as_ref(),
239        subwindow,
240        |a, b| a + b,
241        |a, b| a - b,
242        T::zero(),
243    );
244    if subwindow > 0 && subwindow - 1 < out.len() {
245        out_mask.set(subwindow - 1, false);
246        out[subwindow - 1] = T::zero();
247    }
248    FloatArray {
249        data: out.into(),
250        null_mask: Some(out_mask),
251    }
252}
253
254/// Computes rolling sums over boolean data, counting true values within sliding windows.
255///
256/// Treats boolean values as integers (true=1, false=0) and applies sliding window
257/// summation to count true occurrences within each window position. Essential for
258/// constructing conditional aggregations and boolean pattern analysis.
259///
260/// ## Parameters
261/// * `window` - Boolean array view with offset and length specifications
262/// * `subwindow` - Number of boolean values to consider in each sliding window
263///
264/// ## Returns
265/// Returns an `IntegerArray<i32>` containing:
266/// - Count of true values within each complete window
267/// - Zero for positions with incomplete windows
268/// - Null mask indicating window completeness and null contamination
269///
270/// ## Examples
271/// ```rust,ignore
272/// use minarrow::BooleanArray;
273/// use simd_kernels::kernels::window::rolling_sum_bool;
274///
275/// let bools = BooleanArray::from_slice(&[true, false, true, true]);
276/// let result = rolling_sum_bool((&bools, 0, bools.len()), 2);
277/// ```
278#[inline]
279pub fn rolling_sum_bool(window: BooleanAVT<'_, ()>, subwindow: usize) -> IntegerArray<i32> {
280    let (arr, offset, len) = window;
281    let bools: Vec<i32> = arr.iter_range(offset, len).map(|b| b as i32).collect();
282    let mask = arr.null_mask.as_ref().map(|m| m.slice_clone(offset, len));
283    let (mut out, mut out_mask) = rolling_push_pop(
284        &bools,
285        mask.as_ref(),
286        subwindow,
287        |a, b| a + b,
288        |a, b| a - b,
289        0,
290    );
291    if subwindow > 0 && subwindow - 1 < out.len() {
292        out_mask.set(subwindow - 1, false);
293        out[subwindow - 1] = 0;
294    }
295    IntegerArray {
296        data: out.into(),
297        null_mask: Some(out_mask),
298    }
299}
300
301/// Computes rolling products over a sliding window for integer data with overflow protection.
302///
303/// Applies multiplicative aggregation across sliding windows using incremental computation
304/// through division operations. Maintains numerical stability through careful handling of
305/// zero values and potential overflow conditions in integer arithmetic.
306///
307/// ## Parameters
308/// * `window` - Integer array view containing multiplicands and window specification
309/// * `subwindow` - Number of consecutive elements to multiply in each window
310///
311/// ## Returns
312/// Returns an `IntegerArray<T>` containing:
313/// - Rolling products computed via incremental multiplication/division
314/// - Identity value (1) for positions with incomplete windows
315/// - Null mask reflecting window completeness and null contamination
316///
317/// ## Examples
318/// ```rust,ignore
319/// use minarrow::IntegerArray;
320/// use simd_kernels::kernels::window::rolling_product_int;
321///
322/// let arr = IntegerArray::<i32>::from_slice(&[2, 3, 4, 5]);
323/// let result = rolling_product_int((&arr, 0, arr.len()), 2);
324/// ```
325#[inline]
326pub fn rolling_product_int<T: Num + Copy + One + Zero>(
327    window: IntegerAVT<'_, T>,
328    subwindow: usize,
329) -> IntegerArray<T> {
330    let (arr, offset, len) = window;
331    let data = &arr.data[offset..offset + len];
332    let mask = arr.null_mask.as_ref().map(|m| m.slice_clone(offset, len));
333    let (out, out_mask) = rolling_push_pop(
334        data,
335        mask.as_ref(),
336        subwindow,
337        |a, b| a * b,
338        |a, b| a / b,
339        T::one(),
340    );
341    IntegerArray {
342        data: out.into(),
343        null_mask: Some(out_mask),
344    }
345}
346
347/// Computes rolling products over floating-point data with IEEE 754 mathematical semantics.
348///
349/// Performs multiplicative aggregation using incremental computation strategies that
350/// maintain numerical precision through careful handling of special values (infinity,
351/// NaN, zero) according to IEEE 754 standards.
352///
353/// ## Parameters
354/// * `window` - Float array view containing multiplicands for window processing
355/// * `subwindow` - Window size determining number of values to multiply
356///
357/// ## Returns
358/// Returns a `FloatArray<T>` containing:
359/// - Rolling products computed with floating-point precision
360/// - Identity value (1.0) for incomplete window positions
361/// - Comprehensive null mask for validity tracking
362///
363/// ## Examples
364/// ```rust,ignore
365/// use minarrow::FloatArray;
366/// use simd_kernels::kernels::window::rolling_product_float;
367///
368/// let arr = FloatArray::<f64>::from_slice(&[1.5, 2.0, 3.0, 4.0]);
369/// let result = rolling_product_float((&arr, 0, arr.len()), 2);
370/// ```
371#[inline]
372pub fn rolling_product_float<T: Float + Copy + One + Zero>(
373    window: FloatAVT<'_, T>,
374    subwindow: usize,
375) -> FloatArray<T> {
376    let (arr, offset, len) = window;
377    let data = &arr.data[offset..offset + len];
378    let mask = arr.null_mask.as_ref().map(|m| m.slice_clone(offset, len));
379    let (out, out_mask) = rolling_push_pop(
380        data,
381        mask.as_ref(),
382        subwindow,
383        |a, b| a * b,
384        |a, b| a / b,
385        T::one(),
386    );
387    FloatArray {
388        data: out.into(),
389        null_mask: Some(out_mask),
390    }
391}
392
393/// Computes rolling logical AND operations over boolean data within sliding windows.
394///
395/// Treats boolean multiplication as logical AND operations, computing the conjunction
396/// of all boolean values within each sliding window. Essential for constructing
397/// compound logical conditions and boolean pattern validation.
398///
399/// ## Parameters
400/// * `window` - Boolean array view containing logical values for conjunction
401/// * `subwindow` - Number of boolean values to AND together in each window
402///
403/// ## Returns
404/// Returns a `BooleanArray<()>` containing:
405/// - Logical AND results for each complete window position
406/// - False values for positions with incomplete windows
407/// - Null mask indicating window validity and null contamination
408///
409/// ## Examples
410/// ```rust,ignore
411/// use minarrow::BooleanArray;
412/// use simd_kernels::kernels::window::rolling_product_bool;
413///
414/// let bools = BooleanArray::from_slice(&[true, true, false, true]);
415/// let result = rolling_product_bool((&bools, 0, bools.len()), 2);
416/// ```
417#[inline]
418pub fn rolling_product_bool(window: BooleanAVT<'_, ()>, subwindow: usize) -> BooleanArray<()> {
419    let (arr, offset, len) = window;
420    let n = len;
421    let mut out_mask = new_null_mask(n);
422    let mut out = Bitmask::new_set_all(n, false);
423
424    for i in 0..n {
425        let start = if i + 1 >= subwindow {
426            i + 1 - subwindow
427        } else {
428            0
429        };
430        let mut acc = true;
431        let mut valid = subwindow > 0 && i + 1 >= subwindow;
432        for j in start..=i {
433            match unsafe { arr.get_unchecked(offset + j) } {
434                Some(val) => acc &= val,
435                None => {
436                    valid = false;
437                    break;
438                }
439            }
440        }
441        unsafe { out_mask.set_unchecked(i, valid) };
442        out.set(i, valid && acc);
443    }
444
445    BooleanArray {
446        data: out.into(),
447        null_mask: Some(out_mask),
448        len: n,
449        _phantom: PhantomData,
450    }
451}
452
453/// Computes rolling arithmetic means over integer data with high-precision floating-point output.
454///
455/// Calculates moving averages across sliding windows, converting integer inputs to double-precision
456/// floating-point for accurate mean computation. Essential for time series analysis and statistical
457/// smoothing operations over integer sequences.
458///
459/// ## Parameters
460/// * `window` - Integer array view containing values for mean calculation
461/// * `subwindow` - Window size determining number of values to average
462///
463/// ## Returns
464/// Returns a `FloatArray<f64>` containing:
465/// - Rolling arithmetic means computed with double precision
466/// - Zero values for positions with incomplete windows
467/// - Null mask indicating window completeness and null contamination
468///
469/// ## Examples
470/// ```rust,ignore
471/// use minarrow::IntegerArray;
472/// use simd_kernels::kernels::window::rolling_mean_int;
473///
474/// let arr = IntegerArray::<i32>::from_slice(&[1, 2, 3, 4, 5]);
475/// let result = rolling_mean_int((&arr, 0, arr.len()), 3);
476/// ```
477#[inline]
478pub fn rolling_mean_int<T: NumCast + Copy + Zero>(
479    window: IntegerAVT<'_, T>,
480    subwindow: usize,
481) -> FloatArray<f64> {
482    let (arr, offset, len) = window;
483    let n = len;
484    let mask_ref = arr.null_mask.as_ref().map(|m| m.slice_clone(offset, len));
485    let mut out = prealloc_vec::<f64>(n);
486    let mut out_mask = new_null_mask(n);
487
488    if subwindow == 0 {
489        return FloatArray {
490            data: out.into(),
491            null_mask: Some(out_mask),
492        };
493    }
494
495    for i in 0..n {
496        if i + 1 < subwindow {
497            unsafe { out_mask.set_unchecked(i, false) };
498            out[i] = 0.0;
499            continue;
500        }
501        let start = i + 1 - subwindow;
502        let mut sum = 0.0;
503        let mut valid = true;
504        for j in start..=i {
505            if mask_ref
506                .as_ref()
507                .map_or(true, |m| unsafe { m.get_unchecked(j) })
508            {
509                sum += num_traits::cast(arr.data[offset + j]).unwrap_or(0.0);
510            } else {
511                valid = false;
512                break;
513            }
514        }
515        unsafe { out_mask.set_unchecked(i, valid) };
516        out[i] = if valid { sum / subwindow as f64 } else { 0.0 };
517    }
518
519    if subwindow > 0 && subwindow - 1 < out.len() {
520        unsafe { out_mask.set_unchecked(subwindow - 1, false) };
521        out[subwindow - 1] = 0.0;
522    }
523
524    FloatArray {
525        data: out.into(),
526        null_mask: Some(out_mask),
527    }
528}
529
530/// Computes rolling arithmetic means over floating-point data with IEEE 754 compliance.
531///
532/// Performs moving average calculations across sliding windows while maintaining the original
533/// floating-point precision. Implements numerically stable summation strategies and proper
534/// handling of special floating-point values (NaN, infinity).
535///
536/// ## Parameters
537/// * `window` - Float array view containing values for mean calculation
538/// * `subwindow` - Window size specifying number of values to average
539///
540/// ## Returns
541/// Returns a `FloatArray<T>` containing:
542/// - Rolling arithmetic means preserving input precision (f32 or f64)
543/// - Zero values for positions with incomplete windows
544/// - Comprehensive null mask for validity indication
545///
546/// ## Examples
547/// ```rust,ignore
548/// use minarrow::FloatArray;
549/// use simd_kernels::kernels::window::rolling_mean_float;
550///
551/// let arr = FloatArray::<f32>::from_slice(&[1.5, 2.5, 3.5, 4.5]);
552/// let result = rolling_mean_float((&arr, 0, arr.len()), 2);
553/// ```
554#[inline]
555pub fn rolling_mean_float<T: Float + Copy + Zero>(
556    window: FloatAVT<'_, T>,
557    subwindow: usize,
558) -> FloatArray<T> {
559    let (arr, offset, len) = window;
560    let n = len;
561    let mask_ref = arr.null_mask.as_ref().map(|m| m.slice_clone(offset, len));
562    let mut out = prealloc_vec::<T>(n);
563    let mut out_mask = new_null_mask(n);
564
565    if subwindow == 0 {
566        return FloatArray {
567            data: out.into(),
568            null_mask: Some(out_mask),
569        };
570    }
571
572    for i in 0..n {
573        if i + 1 < subwindow {
574            unsafe { out_mask.set_unchecked(i, false) };
575            out[i] = T::zero();
576            continue;
577        }
578        let start = i + 1 - subwindow;
579        let mut sum = T::zero();
580        let mut valid = true;
581        for j in start..=i {
582            if mask_ref
583                .as_ref()
584                .map_or(true, |m| unsafe { m.get_unchecked(j) })
585            {
586                sum = sum + arr.data[offset + j];
587            } else {
588                valid = false;
589                break;
590            }
591        }
592        unsafe { out_mask.set_unchecked(i, valid) };
593        out[i] = if valid {
594            sum / T::from(subwindow as u32).unwrap()
595        } else {
596            T::zero()
597        };
598    }
599
600    if subwindow > 0 && subwindow - 1 < out.len() {
601        unsafe { out_mask.set_unchecked(subwindow - 1, false) };
602        out[subwindow - 1] = T::zero();
603    }
604
605    FloatArray {
606        data: out.into(),
607        null_mask: Some(out_mask),
608    }
609}
610
611/// For min, we skip that very first `window` slot so that
612/// the _first_ non-zero result appears one step later.
613/// Computes rolling minimum values over integer data within sliding windows.
614///
615/// Identifies minimum values across sliding windows using efficient comparison strategies.
616/// Each output position represents the smallest value found within the preceding window,
617/// essential for trend analysis and outlier detection in integer sequences.
618///
619/// ## Parameters
620/// * `window` - Integer array view containing values for minimum detection
621/// * `subwindow` - Window size determining scope of minimum search
622///
623/// ## Returns
624/// Returns an `IntegerArray<T>` containing:
625/// - Rolling minimum values for each complete window position
626/// - Zero values for positions with incomplete windows
627/// - Null mask indicating window completeness and validity
628///
629/// ## Use Cases
630/// - **Trend analysis**: Identifying minimum trends in time series data
631/// - **Outlier detection**: Finding exceptionally low values within windows
632/// - **Signal processing**: Detecting minimum signal levels over time intervals
633/// - **Statistical analysis**: Computing rolling minimum statistics
634///
635/// ## Examples
636/// ```rust,ignore
637/// use minarrow::IntegerArray;
638/// use simd_kernels::kernels::window::rolling_min_int;
639///
640/// let arr = IntegerArray::<i32>::from_slice(&[5, 2, 8, 1, 9]);
641/// let result = rolling_min_int((&arr, 0, arr.len()), 3);
642/// // Output: [0, 0, 2, 1, 1] - minimum values in each 3-element window
643/// ```
644#[inline]
645pub fn rolling_min_int<T: Ord + Copy + Zero>(
646    window: IntegerAVT<'_, T>,
647    subwindow: usize,
648) -> IntegerArray<T> {
649    let (arr, offset, len) = window;
650    let data = &arr.data[offset..offset + len];
651    let mask = arr.null_mask.as_ref().map(|m| m.slice_clone(offset, len));
652    let (mut out, mut out_mask) =
653        rolling_extreme(data, mask.as_ref(), subwindow, |a, b| a < b, T::zero());
654    if subwindow > 0 && subwindow - 1 < out.len() {
655        out_mask.set(subwindow - 1, false);
656        out[subwindow - 1] = T::zero();
657    }
658    IntegerArray {
659        data: out.into(),
660        null_mask: Some(out_mask),
661    }
662}
663
664/// Computes rolling maximum values over integer data within sliding windows.
665///
666/// Identifies maximum values across sliding windows using efficient comparison operations.
667/// Each output position represents the largest value found within the preceding window,
668/// crucial for peak detection and trend analysis in integer data sequences.
669///
670/// ## Parameters
671/// * `window` - Integer array view containing values for maximum detection
672/// * `subwindow` - Window size determining scope of maximum search
673///
674/// ## Returns
675/// Returns an `IntegerArray<T>` containing:
676/// - Rolling maximum values for each complete window position
677/// - Zero values for positions with incomplete windows
678/// - Comprehensive null mask for validity tracking
679///
680/// ## Applications
681/// - **Peak detection**: Identifying maximum peaks in time series data
682/// - **Trend analysis**: Tracking maximum value trends over sliding intervals
683/// - **Threshold monitoring**: Detecting when maximum values exceed thresholds
684/// - **Signal analysis**: Finding maximum signal amplitudes in windows
685///
686/// ## Examples
687/// ```rust,ignore
688/// use minarrow::IntegerArray;
689/// use simd_kernels::kernels::window::rolling_max_int;
690///
691/// let arr = IntegerArray::<i32>::from_slice(&[3, 7, 2, 9, 4]);
692/// let result = rolling_max_int((&arr, 0, arr.len()), 3);
693/// // Output: [0, 0, 7, 9, 9] - maximum values in each 3-element window
694/// ```
695#[inline]
696pub fn rolling_max_int<T: Ord + Copy + Zero>(
697    window: IntegerAVT<'_, T>,
698    subwindow: usize,
699) -> IntegerArray<T> {
700    let (arr, offset, len) = window;
701    let data = &arr.data[offset..offset + len];
702    let mask = arr.null_mask.as_ref().map(|m| m.slice_clone(offset, len));
703    let (out, out_mask) = rolling_extreme(data, mask.as_ref(), subwindow, |a, b| a > b, T::zero());
704    IntegerArray {
705        data: out.into(),
706        null_mask: Some(out_mask),
707    }
708}
709
710/// Computes rolling minimum values over floating-point data with IEEE 754 compliance.
711///
712/// Identifies minimum floating-point values across sliding windows while properly handling
713/// special values (NaN, infinity) according to IEEE 754 standards. Essential for numerical
714/// analysis requiring precise minimum detection in floating-point sequences.
715///
716/// ## Parameters
717/// * `window` - Float array view containing values for minimum computation
718/// * `subwindow` - Window size determining minimum search scope
719///
720/// ## Returns
721/// Returns a `FloatArray<T>` containing:
722/// - Rolling minimum values computed with floating-point precision
723/// - Zero values for positions with incomplete windows
724/// - Null mask reflecting window validity and special value handling
725/// 
726/// ## Applications
727/// - **Scientific computing**: Finding minimum values in numerical simulations
728/// - **Signal processing**: Detecting minimum signal levels with high precision
729/// - **Financial analysis**: Identifying minimum prices in sliding time windows
730/// - **Data analysis**: Computing rolling minimum statistics for trend detection
731///
732/// ## Examples
733/// ```rust,ignore
734/// use minarrow::FloatArray;
735/// use simd_kernels::kernels::window::rolling_min_float;
736///
737/// let arr = FloatArray::<f64>::from_slice(&[3.14, 2.71, 1.41, 1.73]);
738/// let result = rolling_min_float((&arr, 0, arr.len()), 2);
739/// // Output: [0.0, 0.0, 2.71, 1.41] - minimum values in 2-element windows
740/// ```
741#[inline]
742pub fn rolling_min_float<T: Float + Copy + Zero>(
743    window: FloatAVT<'_, T>,
744    subwindow: usize,
745) -> FloatArray<T> {
746    let (arr, offset, len) = window;
747    let data = &arr.data[offset..offset + len];
748    let mask = arr.null_mask.as_ref().map(|m| m.slice_clone(offset, len));
749    let (mut out, mut out_mask) =
750        rolling_extreme(data, mask.as_ref(), subwindow, |a, b| a < b, T::zero());
751    if subwindow > 0 && subwindow - 1 < out.len() {
752        out_mask.set(subwindow - 1, false);
753        out[subwindow - 1] = T::zero();
754    }
755    FloatArray {
756        data: out.into(),
757        null_mask: Some(out_mask),
758    }
759}
760
761/// Computes rolling maximum values over floating-point data with IEEE 754 compliance.
762///
763/// Identifies maximum floating-point values across sliding windows while maintaining
764/// strict adherence to IEEE 754 comparison semantics. Handles special floating-point
765/// values (NaN, infinity) correctly for reliable maximum detection.
766///
767/// ## Parameters
768/// * `window` - Float array view containing values for maximum computation
769/// * `subwindow` - Window size specifying maximum search scope
770///
771/// ## Returns
772/// Returns a `FloatArray<T>` containing:
773/// - Rolling maximum values with full floating-point precision
774/// - Zero values for positions with incomplete windows
775/// - Comprehensive null mask for result validity indication
776///
777/// ## Applications
778/// - **Peak detection**: Identifying maximum peaks in continuous data streams
779/// - **Envelope detection**: Computing upper envelopes of signal data
780/// - **Risk analysis**: Finding maximum risk values in financial time series
781/// - **Scientific measurement**: Detecting maximum readings in sensor data
782///
783/// ## Examples
784/// ```rust,ignore
785/// use minarrow::FloatArray;
786/// use simd_kernels::kernels::window::rolling_max_float;
787///
788/// let arr = FloatArray::<f32>::from_slice(&[1.5, 3.2, 2.1, 4.7]);
789/// let result = rolling_max_float((&arr, 0, arr.len()), 2);
790/// ```
791#[inline]
792pub fn rolling_max_float<T: Float + Copy + Zero>(
793    window: FloatAVT<'_, T>,
794    subwindow: usize,
795) -> FloatArray<T> {
796    let (arr, offset, len) = window;
797    let data = &arr.data[offset..offset + len];
798    let mask = arr.null_mask.as_ref().map(|m| m.slice_clone(offset, len));
799    let (out, out_mask) = rolling_extreme(data, mask.as_ref(), subwindow, |a, b| a > b, T::zero());
800    FloatArray {
801        data: out.into(),
802        null_mask: Some(out_mask),
803    }
804}
805
806/// Computes rolling counts of elements within sliding windows for positional analysis.
807///
808/// Counts the number of elements present within each sliding window position, providing
809/// fundamental cardinality information essential for statistical analysis and data validation.
810/// Unlike other rolling functions, operates on position information rather than data values.
811///
812/// ## Parameters
813/// * `window` - Tuple containing offset and length defining the data window scope
814/// * `subwindow` - Size of sliding window for element counting
815///
816/// ## Returns
817/// Returns an `IntegerArray<i32>` containing:
818/// - Count of elements in each complete window (always equals subwindow size)
819/// - Zero values for positions with incomplete windows
820/// - Null mask indicating where complete windows exist
821///
822/// ## Examples
823/// ```rust,ignore
824/// use simd_kernels::kernels::window::rolling_count;
825///
826/// let result = rolling_count((0, 5), 3); // 5 elements, window size 3
827/// ```
828#[inline]
829pub fn rolling_count(window: (Offset, Length), subwindow: usize) -> IntegerArray<i32> {
830    let (_offset, len) = window;
831    let mut out = prealloc_vec::<i32>(len);
832    let mut out_mask = new_null_mask(len);
833    for i in 0..len {
834        let start = if i + 1 >= subwindow {
835            i + 1 - subwindow
836        } else {
837            0
838        };
839        let count = (i - start + 1) as i32;
840        let valid_row = subwindow > 0 && i + 1 >= subwindow;
841        unsafe { out_mask.set_unchecked(i, valid_row) };
842        out[i] = if valid_row { count } else { 0 };
843    }
844    IntegerArray {
845        data: out.into(),
846        null_mask: Some(out_mask),
847    }
848}
849
850// Rank and Dense-rank kernels
851
852#[inline(always)]
853fn rank_numeric<T, F>(data: &[T], mask: Option<&Bitmask>, mut cmp: F) -> IntegerArray<i32>
854where
855    T: Copy,
856    F: FnMut(&T, &T) -> std::cmp::Ordering,
857{
858    let n = data.len();
859    let mut indices: Vec<usize> = (0..n).collect();
860    indices.sort_by(|&i, &j| cmp(&data[i], &data[j]));
861
862    let mut out = vec64![0i32; n];
863    let mut out_mask = Bitmask::new_set_all(n, false);
864
865    for (rank, &i) in indices.iter().enumerate() {
866        if mask.map_or(true, |m| unsafe { m.get_unchecked(i) }) {
867            out[i] = (rank + 1) as i32;
868            unsafe { out_mask.set_unchecked(i, true) };
869        }
870    }
871
872    IntegerArray {
873        data: out.into(),
874        null_mask: Some(out_mask),
875    }
876}
877
878/// Computes standard SQL ROW_NUMBER() ranking for integer data with tie handling.
879///
880/// Assigns sequential rank values to elements based on their sorted order, providing
881/// standard SQL ROW_NUMBER() semantics where tied values receive different ranks.
882/// Essential for analytical queries requiring unique positional ranking.
883///
884/// ## Parameters
885/// * `window` - Integer array view containing values for ranking
886///
887/// ## Returns
888/// Returns an `IntegerArray<i32>` containing:
889/// - Rank values from 1 to n for valid elements
890/// - Zero values for null elements
891/// - Null mask indicating which positions have valid ranks
892///
893/// ## Ranking Semantics
894/// - **ROW_NUMBER() behaviour**: Each element receives a unique rank (1, 2, 3, ...)
895/// - **Tie breaking**: Tied values receive different ranks based on their position
896/// - **Ascending order**: Smaller values receive lower (better) ranks
897/// - **Null exclusion**: Null values are excluded from ranking and receive rank 0
898///
899/// ## Use Cases
900/// - **Analytical queries**: SQL ROW_NUMBER() window function implementation
901/// - **Leaderboards**: Creating ordered rankings with unique positions
902/// - **Percentile calculation**: Basis for percentile and quartile computations
903/// - **Data analysis**: Establishing ordinality in integer datasets
904///
905/// ## Examples
906/// ```rust,ignore
907/// use minarrow::IntegerArray;
908/// use simd_kernels::kernels::window::rank_int;
909///
910/// let arr = IntegerArray::<i32>::from_slice(&[30, 10, 20, 10]);
911/// let result = rank_int((&arr, 0, arr.len()));
912/// // Output: [4, 1, 3, 2] - ROW_NUMBER() style ranking
913/// ```
914#[inline(always)]
915pub fn rank_int<T: Ord + Copy>(window: IntegerAVT<T>) -> IntegerArray<i32> {
916    let (arr, offset, len) = window;
917    let data = &arr.data[offset..offset + len];
918    let null_mask = if len != arr.data.len() {
919        &arr.null_mask.as_ref().map(|m| m.slice_clone(offset, len))
920    } else {
921        &arr.null_mask
922    };
923    rank_numeric(data, null_mask.as_ref(), |a, b| a.cmp(b))
924}
925
926/// Computes standard SQL ROW_NUMBER() ranking for floating-point data with IEEE 754 compliance.
927///
928/// Assigns sequential rank values based on sorted floating-point order, implementing
929/// ROW_NUMBER() semantics with proper handling of special floating-point values (NaN,
930/// infinity) according to IEEE 754 comparison standards.
931///
932/// ## Parameters
933/// * `window` - Float array view containing values for ranking
934///
935/// ## Returns
936/// Returns an `IntegerArray<i32>` containing:
937/// - Rank values from 1 to n for valid, non-NaN elements
938/// - Zero values for null or NaN elements
939/// - Null mask indicating positions with valid ranks
940///
941/// ## Floating-Point Ranking
942/// - **IEEE 754 ordering**: Uses IEEE 754 compliant comparison operations
943/// - **NaN handling**: NaN values are excluded from ranking (receive rank 0)
944/// - **Infinity treatment**: Positive/negative infinity participate in ranking
945/// - **Precision preservation**: Maintains full floating-point comparison precision
946///
947/// ## Ranking Semantics
948/// - **ROW_NUMBER() style**: Each non-NaN element receives unique sequential rank
949/// - **Ascending order**: Smaller floating-point values receive lower ranks
950/// - **Tie breaking**: Floating-point ties broken by original array position
951/// - **Special value exclusion**: NaN and null values excluded from rank assignment
952///
953/// ## Applications
954/// - **Statistical ranking**: Ranking continuous numerical data
955/// - **Scientific analysis**: Ordered ranking of experimental measurements
956/// - **Financial analysis**: Ranking performance metrics and indicators
957/// - **Data preprocessing**: Establishing ordinality for regression analysis
958///
959/// ## Examples
960/// ```rust,ignore
961/// use minarrow::FloatArray;
962/// use simd_kernels::kernels::window::rank_float;
963///
964/// let arr = FloatArray::<f64>::from_slice(&[3.14, 2.71, 1.41, f64::NAN]);
965/// let result = rank_float((&arr, 0, arr.len()));
966/// // Output: [3, 2, 1, 0] - NaN excluded, others ranked by value
967/// ```
968#[inline(always)]
969pub fn rank_float<T: Float + Copy>(window: FloatAVT<T>) -> IntegerArray<i32> {
970    let (arr, offset, len) = window;
971    let data = &arr.data[offset..offset + len];
972    let null_mask = if len != arr.data.len() {
973        &arr.null_mask.as_ref().map(|m| m.slice_clone(offset, len))
974    } else {
975        &arr.null_mask
976    };
977    rank_numeric(data, null_mask.as_ref(), |a, b| a.partial_cmp(b).unwrap())
978}
979
980/// Computes standard SQL ROW_NUMBER() ranking for string data with lexicographic ordering.
981///
982/// Assigns sequential rank values based on lexicographic string comparison, implementing
983/// ROW_NUMBER() semantics for textual data. Essential for alphabetical ranking and
984/// string-based analytical operations.
985///
986/// ## Parameters
987/// * `arr` - String array view containing textual values for ranking
988///
989/// ## Returns
990/// Returns `Result<IntegerArray<i32>, KernelError>` containing:
991/// - **Success**: Rank values from 1 to n for valid string elements
992/// - **Error**: KernelError if capacity validation fails
993/// - Zero values for null string elements
994/// - Null mask indicating positions with valid ranks
995///
996/// ## String Ranking Semantics
997/// - **Lexicographic order**: Uses standard string comparison (dictionary order)
998/// - **Case sensitivity**: Comparisons are case-sensitive ("A" < "a")
999/// - **Unicode support**: Proper handling of UTF-8 encoded string data
1000/// - **ROW_NUMBER() behaviour**: Tied strings receive different ranks by position
1001///
1002/// ## Error Conditions
1003/// - **Capacity errors**: Returns KernelError if mask capacity validation fails
1004/// - **Memory allocation**: May fail with insufficient memory for large datasets
1005///
1006/// ## Use Cases
1007/// - **Alphabetical ranking**: Creating alphabetically ordered rankings
1008/// - **Text analysis**: Establishing lexicographic ordinality in textual data
1009/// - **Database operations**: SQL ROW_NUMBER() implementation for string columns
1010/// - **Sorting applications**: Providing ranking information for string sorting
1011///
1012/// ## Examples
1013/// ```rust,ignore
1014/// use minarrow::StringArray;
1015/// use simd_kernels::kernels::window::rank_str;
1016///
1017/// let arr = StringArray::<u32>::from_slice(&["zebra", "apple", "banana"]);
1018/// let result = rank_str((&arr, 0, arr.len())).unwrap();
1019/// // Output: [3, 1, 2] - lexicographic ranking
1020/// ```
1021#[inline(always)]
1022pub fn rank_str<T: Integer>(arr: StringAVT<T>) -> Result<IntegerArray<i32>, KernelError> {
1023    let (array, offset, len) = arr;
1024    let mask = array.null_mask.as_ref();
1025    let _ = confirm_mask_capacity(array.len(), mask)?;
1026
1027    // Gather (local_idx, string) pairs for valid elements in the window
1028    let mut tuples: Vec<(usize, &str)> = (0..len)
1029        .filter(|&i| mask.map_or(true, |m| unsafe { m.get_unchecked(offset + i) }))
1030        .map(|i| (i, unsafe { array.get_unchecked(offset + i) }.unwrap_or("")))
1031        .collect();
1032
1033    // Sort by string value
1034    tuples.sort_by(|a, b| a.1.cmp(&b.1));
1035
1036    let mut out = vec64![0i32; len];
1037    let mut out_mask = new_null_mask(len);
1038
1039    // Assign ranks (1-based), using local output indices
1040    for (rank, (i, _)) in tuples.iter().enumerate() {
1041        out[*i] = (rank + 1) as i32;
1042        unsafe { out_mask.set_unchecked(*i, true) };
1043    }
1044
1045    Ok(IntegerArray {
1046        data: out.into(),
1047        null_mask: Some(out_mask),
1048    })
1049}
1050
1051#[inline(always)]
1052fn dense_rank_numeric<T, F, G>(
1053    data: &[T],
1054    mask: Option<&Bitmask>,
1055    mut sort: F,
1056    mut eq: G,
1057) -> Result<IntegerArray<i32>, KernelError>
1058where
1059    T: Copy,
1060    F: FnMut(&T, &T) -> std::cmp::Ordering,
1061    G: FnMut(&T, &T) -> bool,
1062{
1063    let n = data.len();
1064    let _ = confirm_mask_capacity(n, mask)?;
1065    let mut uniqs: Vec<T> = (0..n)
1066        .filter(|&i| mask.map_or(true, |m| unsafe { m.get_unchecked(i) }))
1067        .map(|i| data[i])
1068        .collect();
1069
1070    uniqs.sort_by(&mut sort);
1071    uniqs.dedup_by(|a, b| eq(&*a, &*b));
1072
1073    let mut out = prealloc_vec::<i32>(n);
1074    let mut out_mask = Bitmask::new_set_all(n, false);
1075
1076    for i in 0..n {
1077        if mask.map_or(true, |m| unsafe { m.get_unchecked(i) }) {
1078            let rank = uniqs.binary_search_by(|x| sort(x, &data[i])).unwrap() + 1;
1079            out[i] = rank as i32;
1080            unsafe { out_mask.set_unchecked(i, true) };
1081        } else {
1082            out[i] = 0;
1083        }
1084    }
1085
1086    Ok(IntegerArray {
1087        data: out.into(),
1088        null_mask: Some(out_mask),
1089    })
1090}
1091
1092/// Computes SQL DENSE_RANK() ranking for integer data with consecutive rank assignment.
1093///
1094/// Assigns consecutive rank values where tied values receive identical ranks, implementing
1095/// SQL DENSE_RANK() semantics. Unlike standard ranking, eliminates gaps in rank sequence
1096/// when ties occur, providing compact rank numbering for analytical applications.
1097///
1098/// ## Parameters
1099/// * `window` - Integer array view containing values for dense ranking
1100///
1101/// ## Returns
1102/// Returns `Result<IntegerArray<i32>, KernelError>` containing:
1103/// - **Success**: Dense rank values with no gaps in sequence
1104/// - **Error**: KernelError if capacity validation fails
1105/// - Zero values for null elements
1106/// - Null mask indicating positions with valid ranks
1107///
1108/// ## Dense Ranking Semantics
1109/// - **DENSE_RANK() behaviour**: Tied values receive same rank, next rank is consecutive
1110/// - **No rank gaps**: Eliminates gaps that occur in standard RANK() function
1111/// - **Unique value counting**: Essentially counts distinct values in sorted order
1112/// - **Ascending order**: Smaller integer values receive lower (better) ranks
1113///
1114/// ## Comparison with RANK()
1115/// - **RANK()**: [1, 2, 2, 4] for values [10, 20, 20, 30]
1116/// - **DENSE_RANK()**: [1, 2, 2, 3] for values [10, 20, 20, 30]
1117///
1118/// ## Use Cases
1119/// - **Analytical queries**: SQL DENSE_RANK() window function implementation
1120/// - **Categorical ranking**: Creating compact categorical orderings
1121/// - **Percentile calculation**: Foundation for percentile computations without gaps
1122/// - **Data binning**: Assigning data points to consecutive bins based on value
1123///
1124/// ## Examples
1125/// ```rust,ignore
1126/// use minarrow::IntegerArray;
1127/// use simd_kernels::kernels::window::dense_rank_int;
1128///
1129/// let arr = IntegerArray::<i32>::from_slice(&[10, 30, 20, 30]);
1130/// let result = dense_rank_int((&arr, 0, arr.len())).unwrap();
1131/// // Output: [1, 3, 2, 3] - dense ranking with tied values
1132/// ```
1133#[inline(always)]
1134pub fn dense_rank_int<T: Ord + Copy>(
1135    window: IntegerAVT<T>,
1136) -> Result<IntegerArray<i32>, KernelError> {
1137    let (arr, offset, len) = window;
1138    let data = &arr.data[offset..offset + len];
1139    let null_mask = if len != arr.data.len() {
1140        &arr.null_mask.as_ref().map(|m| m.slice_clone(offset, len))
1141    } else {
1142        &arr.null_mask
1143    };
1144    dense_rank_numeric(data, null_mask.as_ref(), |a, b| a.cmp(b), |a, b| a == b)
1145}
1146
1147/// Computes SQL DENSE_RANK() ranking for floating-point data with IEEE 754 compliance.
1148///
1149/// Implements dense ranking for floating-point values where tied values receive identical
1150/// consecutive ranks. Handles special floating-point values (NaN, infinity) according
1151/// to IEEE 754 standards while maintaining dense rank sequence properties.
1152///
1153/// ## Parameters
1154/// * `window` - Float array view containing values for dense ranking
1155///
1156/// ## Returns
1157/// Returns `Result<IntegerArray<i32>, KernelError>` containing:
1158/// - **Success**: Dense rank values with consecutive numbering
1159/// - **Error**: KernelError if capacity validation fails
1160/// - Zero values for null or NaN elements
1161/// - Null mask indicating positions with valid ranks
1162///
1163/// ## Applications
1164/// - **Scientific ranking**: Dense ranking of experimental measurements
1165/// - **Statistical analysis**: Percentile calculations without rank gaps
1166/// - **Financial modeling**: Dense ranking of performance metrics
1167/// - **Data preprocessing**: Creating ordinal encodings for continuous variables
1168///
1169/// ## Examples
1170/// ```rust,ignore
1171/// use minarrow::FloatArray;
1172/// use simd_kernels::kernels::window::dense_rank_float;
1173///
1174/// let arr = FloatArray::<f64>::from_slice(&[1.5, 3.14, 2.71, 3.14]);
1175/// let result = dense_rank_float((&arr, 0, arr.len())).unwrap();
1176/// // Output: [1, 3, 2, 3] - dense ranking with tied 3.14 values
1177/// ```
1178#[inline(always)]
1179pub fn dense_rank_float<T: Float + Copy>(
1180    window: FloatAVT<T>,
1181) -> Result<IntegerArray<i32>, KernelError> {
1182    let (arr, offset, len) = window;
1183    let data = &arr.data[offset..offset + len];
1184    let null_mask = if len != arr.data.len() {
1185        &arr.null_mask.as_ref().map(|m| m.slice_clone(offset, len))
1186    } else {
1187        &arr.null_mask
1188    };
1189    dense_rank_numeric(
1190        data,
1191        null_mask.as_ref(),
1192        |a, b| a.partial_cmp(b).unwrap(),
1193        |a, b| a == b,
1194    )
1195}
1196
1197/// Computes SQL DENSE_RANK() ranking for string data with lexicographic dense ordering.
1198///
1199/// Implements dense ranking for string values using lexicographic comparison, where
1200/// identical strings receive the same rank and subsequent ranks remain consecutive.
1201/// Essential for alphabetical dense ranking and textual categorical analysis.
1202///
1203/// ## Parameters
1204/// * `arr` - String array view containing textual values for dense ranking
1205///
1206/// ## Returns
1207/// Returns `Result<IntegerArray<i32>, KernelError>` containing:
1208/// - **Success**: Dense rank values with consecutive sequence
1209/// - **Error**: KernelError if capacity validation fails
1210/// - Zero values for null string elements
1211/// - Null mask indicating positions with valid ranks
1212///
1213/// ## Dense String Ranking
1214/// - **DENSE_RANK() semantics**: Identical strings receive same rank, no rank gaps
1215/// - **Lexicographic ordering**: Standard dictionary-style string comparison
1216/// - **Case sensitivity**: Maintains case-sensitive comparison ("Apple" ≠ "apple")
1217/// - **UTF-8 support**: Proper handling of Unicode string sequences
1218///
1219/// ## Use Cases
1220/// - **Alphabetical dense ranking**: Creating compact alphabetical orderings
1221/// - **Categorical encoding**: Converting string categories to dense integer codes
1222/// - **Text analytics**: Establishing lexicographic ordinality for text processing
1223/// - **Database operations**: SQL DENSE_RANK() for string-valued columns
1224///
1225/// ## Examples
1226/// ```rust,ignore
1227/// use minarrow::StringArray;
1228/// use simd_kernels::kernels::window::dense_rank_str;
1229///
1230/// let arr = StringArray::<u32>::from_slice(&["banana", "apple", "cherry", "apple"]);
1231/// let result = dense_rank_str((&arr, 0, arr.len())).unwrap();
1232/// // Output: [2, 1, 3, 1] - dense ranking with tied "apple" values
1233/// ```
1234#[inline(always)]
1235pub fn dense_rank_str<T: Integer>(arr: StringAVT<T>) -> Result<IntegerArray<i32>, KernelError> {
1236    let (array, offset, len) = arr;
1237    let mask = array.null_mask.as_ref();
1238    let _ = confirm_mask_capacity(array.len(), mask)?;
1239
1240    // Collect all unique valid values in window
1241    let mut vals: Vec<&str> = (0..len)
1242        .filter(|&i| mask.map_or(true, |m| unsafe { m.get_unchecked(offset + i) }))
1243        .map(|i| unsafe { array.get_unchecked(offset + i) }.unwrap_or(""))
1244        .collect();
1245    vals.sort();
1246    vals.dedup();
1247
1248    let mut out = prealloc_vec::<i32>(len);
1249    let mut out_mask = Bitmask::new_set_all(len, false);
1250
1251    for i in 0..len {
1252        let valid = mask.map_or(true, |m| unsafe { m.get_unchecked(offset + i) });
1253        if valid {
1254            let rank = vals
1255                .binary_search(&unsafe { array.get_unchecked(offset + i) }.unwrap_or(""))
1256                .unwrap()
1257                + 1;
1258            out[i] = rank as i32;
1259            unsafe { out_mask.set_unchecked(i, true) };
1260        } else {
1261            out[i] = 0;
1262        }
1263    }
1264
1265    Ok(IntegerArray {
1266        data: out.into(),
1267        null_mask: Some(out_mask),
1268    })
1269}
1270
1271// Lag / Lead / Shift kernels
1272
1273#[inline(always)]
1274fn shift_with_bounds<T: Copy>(
1275    data: &[T],
1276    mask: Option<&Bitmask>,
1277    len: usize,
1278    offset_fn: impl Fn(usize) -> Option<usize>,
1279    default: T,
1280) -> (Vec64<T>, Bitmask) {
1281    let mut out = prealloc_vec::<T>(len);
1282    let mut out_mask = Bitmask::new_set_all(len, false);
1283    for i in 0..len {
1284        if let Some(j) = offset_fn(i) {
1285            out[i] = data[j];
1286            let is_valid = mask.map_or(true, |m| unsafe { m.get_unchecked(j) });
1287            unsafe { out_mask.set_unchecked(i, is_valid) };
1288        } else {
1289            out[i] = default;
1290        }
1291    }
1292    (out, out_mask)
1293}
1294
1295#[inline(always)]
1296fn shift_str_with_bounds<T: Integer>(
1297    arr: StringAVT<T>,
1298    offset_fn: impl Fn(usize) -> Option<usize>,
1299) -> Result<StringArray<T>, KernelError> {
1300    let (array, offset, len) = arr;
1301    let src_mask = array.null_mask.as_ref();
1302    let _ = confirm_mask_capacity(array.len(), src_mask)?;
1303
1304    // Determine offsets and total bytes required
1305    let mut offsets = Vec64::<T>::with_capacity(len + 1);
1306    unsafe {
1307        offsets.set_len(len + 1);
1308    }
1309    offsets[0] = T::zero();
1310
1311    let mut total_bytes = 0;
1312    let mut string_lengths = vec![0usize; len];
1313
1314    for i in 0..len {
1315        let byte_len = if let Some(j) = offset_fn(i) {
1316            let src_idx = offset + j;
1317            let valid = src_mask.map_or(true, |m| unsafe { m.get_unchecked(src_idx) });
1318            if valid {
1319                unsafe { array.get_unchecked(src_idx).unwrap_or("").len() }
1320            } else {
1321                0
1322            }
1323        } else {
1324            0
1325        };
1326        total_bytes += byte_len;
1327        string_lengths[i] = byte_len;
1328        offsets[i + 1] = T::from_usize(total_bytes);
1329    }
1330
1331    // Allocate data buffer
1332    let mut data = Vec64::<u8>::with_capacity(total_bytes);
1333    let mut out_mask = Bitmask::new_set_all(len, false);
1334
1335    // Write string content
1336    for i in 0..len {
1337        if let Some(j) = offset_fn(i) {
1338            let src_idx = offset + j;
1339            let valid = src_mask.map_or(true, |m| unsafe { m.get_unchecked(src_idx) });
1340            if valid {
1341                let s = unsafe { array.get_unchecked(src_idx).unwrap_or("") };
1342                data.extend_from_slice(s.as_bytes());
1343                unsafe { out_mask.set_unchecked(i, true) };
1344                continue;
1345            }
1346        }
1347        // Not valid or OOB write nothing
1348    }
1349
1350    Ok(StringArray {
1351        offsets: offsets.into(),
1352        data: data.into(),
1353        null_mask: Some(out_mask),
1354    })
1355}
1356
1357// Integer
1358
1359/// Accesses values from previous positions in integer arrays with configurable offset.
1360///
1361/// Implements SQL LAG() window function semantics, retrieving values from earlier positions
1362/// in the array sequence. Essential for time series analysis, trend detection, and
1363/// comparative analytics requiring access to historical data points.
1364///
1365/// ## Parameters
1366/// * `window` - Integer array view containing sequential data for lag access
1367/// * `n` - Lag offset specifying how many positions to look backward
1368///
1369/// ## Returns
1370/// Returns an `IntegerArray<T>` containing:
1371/// - Values from n positions earlier in the sequence
1372/// - Default values for positions where lag source is unavailable
1373/// - Null mask indicating validity of lagged values
1374///
1375/// ## Examples
1376/// ```rust,ignore
1377/// use minarrow::IntegerArray;
1378/// use simd_kernels::kernels::window::lag_int;
1379///
1380/// let arr = IntegerArray::<i32>::from_slice(&[10, 20, 30, 40]);
1381/// let result = lag_int((&arr, 0, arr.len()), 1);
1382/// ```
1383#[inline]
1384pub fn lag_int<T: Copy + Default>(window: IntegerAVT<T>, n: usize) -> IntegerArray<T> {
1385    let (arr, offset, len) = window;
1386    let data_window = &arr.data[offset..offset + len];
1387    let mask_window = if len != arr.data.len() {
1388        arr.null_mask.as_ref().map(|m| m.slice_clone(offset, len))
1389    } else {
1390        arr.null_mask.clone()
1391    };
1392    let (data, null_mask) = shift_with_bounds(
1393        data_window,
1394        mask_window.as_ref(),
1395        len,
1396        |i| if i >= n { Some(i - n) } else { None },
1397        T::default(),
1398    );
1399    IntegerArray {
1400        data: data.into(),
1401        null_mask: Some(null_mask),
1402    }
1403}
1404
1405/// Accesses values from future positions in integer arrays with configurable offset.
1406///
1407/// Implements SQL LEAD() window function semantics, retrieving values from later positions
1408/// in the array sequence. Essential for predictive analytics, forward-looking comparisons,
1409/// and temporal analysis requiring access to future data points.
1410///
1411/// ## Parameters
1412/// * `window` - Integer array view containing sequential data for lead access
1413/// * `n` - Lead offset specifying how many positions to look forward
1414///
1415/// ## Returns
1416/// Returns an `IntegerArray<T>` containing:
1417/// - Values from n positions later in the sequence
1418/// - Default values for positions where lead source is unavailable
1419/// - Null mask indicating validity of lead values
1420///
1421/// ## Examples
1422/// ```rust,ignore
1423/// use minarrow::IntegerArray;
1424/// use simd_kernels::kernels::window::lead_int;
1425///
1426/// let arr = IntegerArray::<i32>::from_slice(&[10, 20, 30, 40]);
1427/// let result = lead_int((&arr, 0, arr.len()), 2);
1428/// ```
1429#[inline]
1430pub fn lead_int<T: Copy + Default>(window: IntegerAVT<T>, n: usize) -> IntegerArray<T> {
1431    let (arr, offset, len) = window;
1432    let data_window = &arr.data[offset..offset + len];
1433    let mask_window = if len != arr.data.len() {
1434        arr.null_mask.as_ref().map(|m| m.slice_clone(offset, len))
1435    } else {
1436        arr.null_mask.clone()
1437    };
1438    let (data, null_mask) = shift_with_bounds(
1439        data_window,
1440        mask_window.as_ref(),
1441        len,
1442        |i| if i + n < len { Some(i + n) } else { None },
1443        T::default(),
1444    );
1445    IntegerArray {
1446        data: data.into(),
1447        null_mask: Some(null_mask),
1448    }
1449}
1450
1451/// Accesses values from previous positions in floating-point arrays with IEEE 754 compliance.
1452///
1453/// Implements SQL LAG() function for floating-point data, retrieving values from earlier
1454/// positions while maintaining IEEE 754 semantics for special values (NaN, infinity).
1455/// Critical for time series analysis and numerical sequence processing.
1456///
1457/// ## Parameters
1458/// * `window` - Float array view containing sequential floating-point data
1459/// * `n` - Lag offset specifying backward position distance
1460///
1461/// ## Returns
1462/// Returns a `FloatArray<T>` containing:
1463/// - Floating-point values from n positions earlier
1464/// - Zero values for positions with insufficient history
1465/// - Null mask reflecting lag validity and special value handling
1466///
1467/// ## Examples
1468/// ```rust,ignore
1469/// use minarrow::FloatArray;
1470/// use simd_kernels::kernels::window::lag_float;
1471///
1472/// let arr = FloatArray::<f64>::from_slice(&[1.0, 2.5, 3.14, 4.7]);
1473/// let result = lag_float((&arr, 0, arr.len()), 1);
1474/// ```
1475#[inline]
1476pub fn lag_float<T: Copy + num_traits::Zero>(window: FloatAVT<T>, n: usize) -> FloatArray<T> {
1477    let (arr, offset, len) = window;
1478    let data_window = &arr.data[offset..offset + len];
1479    let mask_window = if len != arr.data.len() {
1480        arr.null_mask.as_ref().map(|m| m.slice_clone(offset, len))
1481    } else {
1482        arr.null_mask.clone()
1483    };
1484    let (data, null_mask) = shift_with_bounds(
1485        data_window,
1486        mask_window.as_ref(),
1487        len,
1488        |i| if i >= n { Some(i - n) } else { None },
1489        T::zero(),
1490    );
1491    FloatArray {
1492        data: data.into(),
1493        null_mask: Some(null_mask),
1494    }
1495}
1496
1497/// Accesses values from future positions in floating-point arrays with IEEE 754 compliance.
1498///
1499/// Implements SQL LEAD() function for floating-point data, retrieving values from later
1500/// positions while preserving IEEE 754 semantics. Essential for forward-looking analysis
1501/// and predictive modeling with continuous numerical data.
1502///
1503/// ## Parameters
1504/// * `window` - Float array view containing sequential floating-point data
1505/// * `n` - Lead offset specifying forward position distance
1506///
1507/// ## Returns
1508/// Returns a `FloatArray<T>` containing:
1509/// - Floating-point values from n positions later
1510/// - Zero values for positions beyond available future
1511/// - Null mask indicating lead validity and special value propagation
1512///
1513/// ## Use Cases
1514/// - **Predictive analytics**: Accessing future values for comparison and modeling
1515/// - **Signal analysis**: Forward-looking operations in digital signal processing
1516/// - **Financial modeling**: Computing forward returns and future value analysis
1517/// - **Scientific computing**: Implementing forward difference schemes
1518///
1519/// ## Examples
1520/// ```rust,ignore
1521/// use minarrow::FloatArray;
1522/// use simd_kernels::kernels::window::lead_float;
1523///
1524/// let arr = FloatArray::<f32>::from_slice(&[1.1, 2.2, 3.3, 4.4]);
1525/// let result = lead_float((&arr, 0, arr.len()), 2);
1526/// // Output: [3.3, 4.4, 0.0, 0.0] - lead by 2 positions
1527/// ```
1528#[inline]
1529pub fn lead_float<T: Copy + num_traits::Zero>(window: FloatAVT<T>, n: usize) -> FloatArray<T> {
1530    let (arr, offset, len) = window;
1531    let data_window = &arr.data[offset..offset + len];
1532    let mask_window = if len != arr.data.len() {
1533        arr.null_mask.as_ref().map(|m| m.slice_clone(offset, len))
1534    } else {
1535        arr.null_mask.clone()
1536    };
1537    let (data, null_mask) = shift_with_bounds(
1538        data_window,
1539        mask_window.as_ref(),
1540        len,
1541        |i| if i + n < len { Some(i + n) } else { None },
1542        T::zero(),
1543    );
1544    FloatArray {
1545        data: data.into(),
1546        null_mask: Some(null_mask),
1547    }
1548}
1549
1550// String
1551
1552/// Accesses string values from previous positions with UTF-8 string handling.
1553///
1554/// Implements SQL LAG() function for string data, retrieving textual values from earlier
1555/// positions in the array sequence. Essential for textual analysis, sequential string
1556/// processing, and comparative text analytics.
1557///
1558/// ## Parameters
1559/// * `arr` - String array view containing sequential textual data
1560/// * `n` - Lag offset specifying backward position distance
1561///
1562/// ## Returns
1563/// Returns `Result<StringArray<T>, KernelError>` containing:
1564/// - **Success**: String values from n positions earlier
1565/// - **Error**: KernelError if string processing fails
1566/// - Empty strings for positions with insufficient history
1567/// - Null mask indicating lag validity and source availability
1568///
1569/// ## String Lag Semantics
1570/// - **UTF-8 preservation**: Maintains proper UTF-8 encoding throughout operation
1571/// - **Null propagation**: Null strings in source positions result in null outputs
1572/// - **Memory management**: Efficient string copying and allocation strategies
1573/// - **Boundary handling**: Positions without history receive empty string defaults
1574///
1575/// ## Applications
1576/// - **Text analysis**: Comparing current text with previous entries
1577/// - **Sequential processing**: Analysing patterns in ordered textual data
1578/// - **Log analysis**: Accessing previous log entries for context
1579/// - **Natural language processing**: Context-aware text processing with history
1580///
1581/// ## Examples
1582/// ```rust,ignore
1583/// use minarrow::StringArray;
1584/// use simd_kernels::kernels::window::lag_str;
1585///
1586/// let arr = StringArray::<u32>::from_slice(&["first", "second", "third"]);
1587/// let result = lag_str((&arr, 0, arr.len()), 1).unwrap();
1588/// // Output: ["", "first", "second"] - strings lagged by 1 position
1589/// ```
1590#[inline]
1591pub fn lag_str<T: Integer>(arr: StringAVT<T>, n: usize) -> Result<StringArray<T>, KernelError> {
1592    shift_str_with_bounds(arr, |i| if i >= n { Some(i - n) } else { None })
1593}
1594
1595/// Accesses string values from future positions with efficient UTF-8 processing.
1596///
1597/// Implements SQL LEAD() function for string data, retrieving textual values from later
1598/// positions in the array sequence. Critical for forward-looking text analysis and
1599/// sequential string pattern recognition.
1600///
1601/// ## Parameters
1602/// * `arr` - String array view containing sequential textual data
1603/// * `n` - Lead offset specifying forward position distance
1604///
1605/// ## Returns
1606/// Returns `Result<StringArray<T>, KernelError>` containing:
1607/// - **Success**: String values from n positions later
1608/// - **Error**: KernelError if string processing encounters issues
1609/// - Empty strings for positions beyond available future
1610/// - Null mask indicating lead validity and source availability
1611///
1612/// ## Examples
1613/// ```rust,ignore
1614/// use minarrow::StringArray;
1615/// use simd_kernels::kernels::window::lead_str;
1616///
1617/// let arr = StringArray::<u32>::from_slice(&["alpha", "beta", "gamma"]);
1618/// let result = lead_str((&arr, 0, arr.len()), 1).unwrap();
1619/// ```
1620#[inline]
1621pub fn lead_str<T: Integer>(arr: StringAVT<T>, n: usize) -> Result<StringArray<T>, KernelError> {
1622    let (_array, _offset, len) = arr;
1623    shift_str_with_bounds(arr, |i| if i + n < len { Some(i + n) } else { None })
1624}
1625
1626// Shift variants
1627/// Shifts integer array elements by specified offset with bidirectional support.
1628///
1629/// Provides unified interface for both LAG and LEAD operations through signed offset
1630/// parameter. Positive offsets implement LEAD semantics (forward shift), negative
1631/// offsets implement LAG semantics (backward shift), enabling flexible positional access.
1632///
1633/// ## Parameters
1634/// * `window` - Integer array view containing data for shifting
1635/// * `offset` - Signed offset: positive for LEAD (forward), negative for LAG (backward), zero for identity
1636///
1637/// ## Returns
1638/// Returns an `IntegerArray<T>` containing:
1639/// - Shifted integer values according to offset direction
1640/// - Default values for positions beyond available data
1641/// - Null mask reflecting validity of shifted positions
1642///
1643/// ## Shift Semantics
1644/// - **Positive offset**: LEAD operation (shift left, access future values)
1645/// - **Negative offset**: LAG operation (shift right, access past values)
1646/// - **Zero offset**: Identity operation (returns original array)
1647/// - **Boundary handling**: Out-of-bounds positions receive default values
1648///
1649/// ## Applications
1650/// - **Time series analysis**: Flexible temporal shifting for comparison operations
1651/// - **Sequence processing**: Bidirectional access in ordered integer sequences
1652/// - **Algorithm implementation**: Building blocks for complex windowing operations
1653/// - **Data transformation**: Positional transformations in numerical datasets
1654///
1655/// ## Examples
1656/// ```rust,ignore
1657/// use minarrow::IntegerArray;
1658/// use simd_kernels::kernels::window::shift_int;
1659///
1660/// let arr = IntegerArray::<i32>::from_slice(&[1, 2, 3, 4]);
1661/// let lag = shift_int((&arr, 0, arr.len()), -1);  // LAG by 1
1662/// let lead = shift_int((&arr, 0, arr.len()), 2);  // LEAD by 2
1663/// // lag:  [0, 1, 2, 3] - previous values
1664/// // lead: [3, 4, 0, 0] - future values
1665/// ```
1666#[inline(always)]
1667pub fn shift_int<T: Copy + Default>(window: IntegerAVT<T>, offset: isize) -> IntegerArray<T> {
1668    let (arr, win_offset, win_len) = window;
1669    if offset == 0 {
1670        return IntegerArray {
1671            data: Vec64::from_slice(&arr.data[win_offset..win_offset + win_len]).into(),
1672            null_mask: if win_len != arr.data.len() {
1673                arr.null_mask
1674                    .as_ref()
1675                    .map(|m| m.slice_clone(win_offset, win_len))
1676            } else {
1677                arr.null_mask.clone()
1678            },
1679        };
1680    } else if offset > 0 {
1681        lead_int((arr, win_offset, win_len), offset as usize)
1682    } else {
1683        lag_int((arr, win_offset, win_len), offset.unsigned_abs())
1684    }
1685}
1686
1687/// Shifts floating-point array elements with IEEE 754 compliance and bidirectional support.
1688///
1689/// Unified shifting interface for floating-point data supporting both LAG and LEAD semantics
1690/// through signed offset parameter. Maintains IEEE 754 standards for special value handling
1691/// while providing efficient bidirectional positional access.
1692///
1693/// ## Parameters
1694/// * `window` - Float array view containing data for shifting
1695/// * `offset` - Signed offset: positive for LEAD (forward), negative for LAG (backward), zero for identity
1696///
1697/// ## Returns
1698/// Returns a `FloatArray<T>` containing:
1699/// - Shifted floating-point values preserving IEEE 754 semantics
1700/// - Zero values for positions beyond data boundaries
1701/// - Null mask indicating validity of shifted positions
1702///
1703/// ## Examples
1704/// ```rust,ignore
1705/// use minarrow::FloatArray;
1706/// use simd_kernels::kernels::window::shift_float;
1707///
1708/// let arr = FloatArray::<f64>::from_slice(&[1.1, 2.2, 3.3, 4.4]);
1709/// let backward = shift_float((&arr, 0, arr.len()), -2); // LAG by 2
1710/// let forward = shift_float((&arr, 0, arr.len()), 1);   // LEAD by 1
1711/// ```
1712#[inline(always)]
1713pub fn shift_float<T: Copy + num_traits::Zero>(
1714    window: FloatAVT<T>,
1715    offset: isize,
1716) -> FloatArray<T> {
1717    let (arr, win_offset, win_len) = window;
1718    if offset == 0 {
1719        return FloatArray {
1720            data: Vec64::from_slice(&arr.data[win_offset..win_offset + win_len]).into(),
1721            null_mask: if win_len != arr.data.len() {
1722                arr.null_mask
1723                    .as_ref()
1724                    .map(|m| m.slice_clone(win_offset, win_len))
1725            } else {
1726                arr.null_mask.clone()
1727            },
1728        };
1729    } else if offset > 0 {
1730        lead_float((arr, win_offset, win_len), offset as usize)
1731    } else {
1732        lag_float((arr, win_offset, win_len), offset.unsigned_abs())
1733    }
1734}
1735
1736/// Shifts string array elements with UTF-8 integrity and bidirectional offset support.
1737///
1738/// String shifting function supporting both LAG and LEAD operations through
1739/// signed offset parameter. Maintains UTF-8 encoding integrity while providing flexible
1740/// positional access for textual sequence analysis.
1741///
1742/// ## Parameters
1743/// * `arr` - String array view containing textual data for shifting
1744/// * `shift_offset` - Signed offset: positive for LEAD (forward), negative for LAG (backward), zero for identity
1745///
1746/// ## Returns
1747/// Returns `Result<StringArray<T>, KernelError>` containing:
1748/// - **Success**: Shifted string values maintaining UTF-8 integrity
1749/// - **Error**: KernelError if string processing encounters issues
1750/// - Empty strings for positions beyond data boundaries
1751/// - Null mask reflecting validity of shifted string positions
1752///
1753/// ## Shift Semantics
1754/// - **Positive offset**: LEAD operation accessing future string values
1755/// - **Negative offset**: LAG operation accessing historical string values
1756/// - **Zero offset**: Identity operation (returns cloned array slice)
1757/// - **Boundary conditions**: Out-of-range positions produce empty strings
1758///
1759/// ## Examples
1760/// ```rust,ignore
1761/// use minarrow::StringArray;
1762/// use simd_kernels::kernels::window::shift_str;
1763///
1764/// let arr = StringArray::<u32>::from_slice(&["one", "two", "three"]);
1765/// let back = shift_str((&arr, 0, arr.len()), -1).unwrap(); // LAG
1766/// let forward = shift_str((&arr, 0, arr.len()), 1).unwrap(); // LEAD
1767/// // back:    ["", "one", "two"]
1768/// // forward: ["two", "three", ""]
1769/// ```
1770#[inline(always)]
1771pub fn shift_str<T: Integer>(
1772    arr: StringAVT<T>,
1773    shift_offset: isize,
1774) -> Result<StringArray<T>, KernelError> {
1775    if shift_offset == 0 {
1776        // Return this slice's window as a cloned StringArray
1777        let (array, off, len) = arr;
1778        Ok(array.slice_clone(off, len))
1779    } else if shift_offset > 0 {
1780        lead_str(arr, shift_offset as usize)
1781    } else {
1782        lag_str(arr, shift_offset.unsigned_abs())
1783    }
1784}
1785
#[cfg(test)]
mod tests {
    use minarrow::structs::variants::float::FloatArray;
    use minarrow::structs::variants::integer::IntegerArray;
    use minarrow::structs::variants::string::StringArray;
    use minarrow::{Bitmask, BooleanArray};

    use super::*;

    // ─────────────────────────── Helpers ───────────────────────────

    /// Build a `Bitmask` from booleans (`true` = valid, `false` = null).
    fn bm(bits: &[bool]) -> Bitmask {
        let mut m = Bitmask::new_set_all(bits.len(), false);
        for (i, &b) in bits.iter().enumerate() {
            m.set(i, b);
        }
        m
    }

    /// Asserts that an `IntegerArray<T>` holds exactly `values` and that its
    /// null mask matches `valid` bit-for-bit. A missing mask is a failure.
    fn expect_int<T: PartialEq + std::fmt::Debug + Clone>(
        arr: &IntegerArray<T>,
        values: &[T],
        valid: &[bool],
    ) {
        // Guard against a too-short `valid` silently skipping mask checks.
        assert_eq!(
            valid.len(),
            values.len(),
            "expected values and validity differ in length"
        );
        assert_eq!(arr.data.as_slice(), values);
        let mask = arr.null_mask.as_ref().expect("mask missing");
        for (i, &v) in valid.iter().enumerate() {
            assert_eq!(mask.get(i), v, "mask bit {}", i);
        }
    }

    /// Asserts that a `FloatArray<T>` matches `values` within `eps` and that
    /// its null mask matches `valid` bit-for-bit. A missing mask is a failure.
    fn expect_float<T: num_traits::Float + std::fmt::Debug>(
        arr: &FloatArray<T>,
        values: &[T],
        valid: &[bool],
        eps: T,
    ) {
        // Guard against a too-short `valid` silently skipping mask checks.
        assert_eq!(
            valid.len(),
            values.len(),
            "expected values and validity differ in length"
        );
        let data = arr.data.as_slice();
        assert_eq!(data.len(), values.len());
        for (a, b) in data.iter().zip(values.iter()) {
            assert!((*a - *b).abs() <= eps, "value mismatch {:?} vs {:?}", a, b);
        }
        let mask = arr.null_mask.as_ref().expect("mask missing");
        for (i, &v) in valid.iter().enumerate() {
            assert_eq!(mask.get(i), v, "mask bit {}", i);
        }
    }

    // ───────────────────────── Rolling kernels ─────────────────────────

    #[test]
    fn test_rolling_sum_int_basic() {
        let arr = IntegerArray::<i32>::from_slice(&[1, 2, 3, 4, 5]);
        let res = rolling_sum_int((&arr, 0, arr.len()), 3);
        expect_int(&res, &[0, 0, 6, 9, 12], &[false, false, true, true, true]);
    }

    #[test]
    fn test_rolling_sum_int_masked() {
        let mut arr = IntegerArray::<i32>::from_slice(&[1, 2, 3, 4]);
        arr.null_mask = Some(bm(&[true, false, true, true]));
        let res = rolling_sum_int((&arr, 0, arr.len()), 2);
        expect_int(
            &res,
            &[0, 0, 3, 7],
            &[false, false, false, true], // window valid only when no nulls in window
        );
    }

    #[test]
    fn test_rolling_sum_float() {
        let arr = FloatArray::<f32>::from_slice(&[1.0, 2.0, 3.0]);
        let res = rolling_sum_float((&arr, 0, arr.len()), 2);
        expect_float(&res, &[0.0, 0.0, 5.0], &[false, false, true], 1e-6f32);
    }

    #[test]
    fn test_rolling_sum_bool() {
        let bools = BooleanArray::from_slice(&[true, true, false, true]);
        let res = rolling_sum_bool((&bools, 0, bools.len()), 2); // counts trues over window
        expect_int(&res, &[0, 0, 1, 1], &[false, false, true, true]);
    }

    #[test]
    fn test_rolling_min_max_mean_count() {
        let arr = IntegerArray::<i32>::from_slice(&[3, 1, 4, 1, 5]);
        // min
        let rmin = rolling_min_int((&arr, 0, arr.len()), 2);
        expect_int(&rmin, &[0, 0, 1, 1, 1], &[false, false, true, true, true]);

        // max
        let rmax = rolling_max_int((&arr, 0, arr.len()), 3);
        expect_int(&rmax, &[0, 0, 4, 4, 5], &[false, false, true, true, true]);

        // mean
        let rmean = rolling_mean_int((&arr, 0, arr.len()), 2);
        expect_float(
            &rmean,
            &[0.0, 0.0, 2.5, 2.5, 3.0],
            &[false, false, true, true, true],
            1e-12,
        );

        // count
        let cnt = rolling_count((0, 5), 3);
        expect_int(&cnt, &[0, 0, 3, 3, 3], &[false, false, true, true, true]);
    }

    // ───────────────────────── Rank / Dense-rank ─────────────────────────

    #[test]
    fn test_rank_int_basic() {
        let arr = IntegerArray::<i32>::from_slice(&[30, 10, 20]);
        let res = rank_int((&arr, 0, arr.len()));
        expect_int(&res, &[3, 1, 2], &[true, true, true]);
    }

    #[test]
    fn test_rank_float_with_nulls() {
        let mut arr = FloatArray::<f64>::from_slice(&[2.0, 1.0, 3.0]);
        arr.null_mask = Some(bm(&[true, false, true]));
        let res = rank_float((&arr, 0, arr.len()));
        expect_int(&res, &[2, 0, 3], &[true, false, true]);
    }

    #[test]
    fn test_dense_rank_str_duplicates() {
        let arr = StringArray::<u32>::from_slice(&["b", "a", "b", "c"]);
        let res = dense_rank_str((&arr, 0, arr.len())).unwrap();
        expect_int(&res, &[2, 1, 2, 3], &[true, true, true, true]);
    }

    #[test]
    fn test_dense_rank_str_duplicates_chunk() {
        // Backing array is ["x", "b", "a", "b", "c", "y"]
        let arr = StringArray::<u32>::from_slice(&["x", "b", "a", "b", "c", "y"]);
        let res = dense_rank_str((&arr, 1, 4)).unwrap(); // window is "b", "a", "b", "c"
        expect_int(&res, &[2, 1, 2, 3], &[true, true, true, true]);
    }

    // ───────────────────────── Lag / Lead / Shift ─────────────────────────

    #[test]
    fn test_lag_lead_int() {
        let arr = IntegerArray::<i32>::from_slice(&[10, 20, 30, 40]);
        let lag1 = lag_int((&arr, 0, arr.len()), 1);
        expect_int(&lag1, &[0, 10, 20, 30], &[false, true, true, true]);

        let lead2 = lead_int((&arr, 0, arr.len()), 2);
        expect_int(&lead2, &[30, 40, 0, 0], &[true, true, false, false]);
    }

    #[test]
    fn test_shift_float_positive_negative_zero() {
        let arr = FloatArray::<f32>::from_slice(&[1.0, 2.0, 3.0]);
        let s0 = shift_float((&arr, 0, arr.len()), 0);
        assert_eq!(s0.data, arr.data); // exact copy

        let s1 = shift_float((&arr, 0, arr.len()), 1);
        expect_float(&s1, &[2.0, 3.0, 0.0], &[true, true, false], 1e-6f32);

        let s_neg = shift_float((&arr, 0, arr.len()), -1);
        expect_float(&s_neg, &[0.0, 1.0, 2.0], &[false, true, true], 1e-6f32);
    }

    #[test]
    fn test_lag_lead_str() {
        let arr = StringArray::<u32>::from_slice(&["a", "b", "c"]);
        let l1 = lag_str((&arr, 0, arr.len()), 1).unwrap();
        assert_eq!(l1.get(0), None);
        assert_eq!(l1.get(1), Some("a"));
        assert_eq!(l1.get(2), Some("b"));

        let d1 = lead_str((&arr, 0, arr.len()), 1).unwrap();
        assert_eq!(d1.get(0), Some("b"));
        assert_eq!(d1.get(1), Some("c"));
        assert_eq!(d1.get(2), None);
    }

    #[test]
    fn test_lag_lead_str_chunk() {
        // Backing array is ["x", "a", "b", "c", "y"]; the window is "a", "b", "c"
        let arr = StringArray::<u32>::from_slice(&["x", "a", "b", "c", "y"]);
        let l1 = lag_str((&arr, 1, 3), 1).unwrap();
        assert_eq!(l1.get(0), None);
        assert_eq!(l1.get(1), Some("a"));
        assert_eq!(l1.get(2), Some("b"));

        let d1 = lead_str((&arr, 1, 3), 1).unwrap();
        assert_eq!(d1.get(0), Some("b"));
        assert_eq!(d1.get(1), Some("c"));
        assert_eq!(d1.get(2), None);
    }

    #[test]
    fn test_rolling_sum_int_edge_windows() {
        let arr = IntegerArray::<i32>::from_slice(&[1, 2, 3, 4, 5]);

        // window = 0 → all zeros + mask all false
        let r0 = rolling_sum_int((&arr, 0, arr.len()), 0);
        assert_eq!(r0.data.as_slice(), &[0, 0, 0, 0, 0]);
        assert!(r0.null_mask.as_ref().unwrap().all_unset());

        // window = 1 → identity
        let r1 = rolling_sum_int((&arr, 0, arr.len()), 1);
        assert_eq!(r1.data.as_slice(), &[1, 2, 3, 4, 5]);
        assert!(r1.null_mask.as_ref().unwrap().all_set());

        // window > len → all zero + all false
        let r_large = rolling_sum_int((&arr, 0, arr.len()), 10);
        assert_eq!(r_large.data.as_slice(), &[0, 0, 0, 0, 0]);
        assert!(r_large.null_mask.as_ref().unwrap().all_unset());
    }

    #[test]
    fn test_rolling_sum_float_masked_nulls_propagate() {
        let mut arr = FloatArray::<f32>::from_slice(&[1.0, 2.0, 3.0, 4.0]);
        // null in the middle
        arr.null_mask = Some(bm(&[true, true, false, true]));
        let r = rolling_sum_float((&arr, 0, arr.len()), 2);
        //   i=0: window not yet full → 0.0, false
        //   i=1: first full window → cleared → 0.0, false
        //   i=2: window contains null → mask=false, but value = 2.0
        //   i=3: window contains null → mask=false, but value = 4.0
        expect_float(
            &r,
            &[0.0, 0.0, 2.0, 4.0],
            &[false, false, false, false],
            1e-6,
        );
    }

    #[test]
    fn test_rolling_sum_bool_with_nulls() {
        let mut b = BooleanArray::from_slice(&[true, false, true, true]);
        b.null_mask = Some(bm(&[true, false, true, true]));
        let r = rolling_sum_bool((&b, 0, b.len()), 2);
        // windows [t,f], [f,t], [t,t] → only last window is all non-null
        expect_int(&r, &[0, 0, 1, 2], &[false, false, false, true]);
    }

    #[test]
    fn test_lag_str_null_propagation() {
        let mut arr = StringArray::<u32>::from_slice(&["x", "y", "z"]);
        arr.null_mask = Some(bm(&[true, false, true])); // y is null
        let lag1 = lag_str((&arr, 0, arr.len()), 1).unwrap();
        assert_eq!(lag1.get(0), None); // no source → null
        assert_eq!(lag1.get(1), Some("x"));
        assert_eq!(lag1.get(2), None); // source was null
        let m = lag1.null_mask.unwrap();
        assert!(!m.get(0));
        assert!(m.get(1));
        assert!(!m.get(2));
    }

    #[test]
    fn test_lag_str_null_propagation_chunk() {
        // Backing array is ["w", "x", "y", "z", "q"]; the window is "x", "y", "z"
        let mut arr = StringArray::<u32>::from_slice(&["w", "x", "y", "z", "q"]);
        arr.null_mask = Some(bm(&[true, true, false, true, true]));
        let lag1 = lag_str((&arr, 1, 3), 1).unwrap();
        assert_eq!(lag1.get(0), None); // "x", index 0 in chunk, no source
        assert_eq!(lag1.get(1), Some("x")); // "y", index 1 pulls "x" (valid)
        assert_eq!(lag1.get(2), None); // "z", index 2 pulls "y" (invalid)
        let m = lag1.null_mask.unwrap();
        assert!(!m.get(0));
        assert!(m.get(1));
        assert!(!m.get(2));
    }

    #[test]
    fn test_shift_str_large_offset() {
        let arr = StringArray::<u32>::from_slice(&["a", "b", "c"]);
        let shifted = shift_str((&arr, 0, arr.len()), 10).unwrap(); // > len
        assert_eq!(shifted.len(), 3);
        for i in 0..3 {
            assert_eq!(shifted.get(i), None);
            assert!(!shifted.null_mask.as_ref().unwrap().get(i));
        }
    }

    #[test]
    fn test_shift_str_large_offset_chunk() {
        // Backing array is ["w", "a", "b", "c", "x"]
        let arr = StringArray::<u32>::from_slice(&["w", "a", "b", "c", "x"]);
        let shifted = shift_str((&arr, 1, 3), 10).unwrap(); // window is "a","b","c"
        assert_eq!(shifted.len(), 3);
        for i in 0..3 {
            assert_eq!(shifted.get(i), None);
            assert!(!shifted.null_mask.as_ref().unwrap().get(i));
        }
    }

    #[test]
    fn test_rank_str_ties_and_nulls() {
        let mut arr = StringArray::<u32>::from_slice(&["a", "b", "a", "c"]);
        arr.null_mask = Some(bm(&[true, true, false, true]));
        let r = rank_str((&arr, 0, arr.len())).unwrap();
        // positions: 0="a" (rank 1), 1="b" (rank 2), 2=null (0, masked out), 3="c" (rank 3)
        expect_int(&r, &[1, 2, 0, 3], &[true, true, false, true]);
    }

    #[test]
    fn test_rank_str_ties_and_nulls_chunk() {
        // Backing array is ["w", "a", "b", "a", "c"]
        let mut arr = StringArray::<u32>::from_slice(&["w", "a", "b", "a", "c"]);
        arr.null_mask = Some(bm(&[true, true, true, false, true]));
        let r = rank_str((&arr, 1, 4)).unwrap(); // "a","b","a","c"
        expect_int(&r, &[1, 2, 0, 3], &[true, true, false, true]);
    }
}