simd_kernels/kernels/
window.rs

1// Copyright Peter Bower 2025. All Rights Reserved.
2// Licensed under Mozilla Public License (MPL) 2.0.
3
4//! # **Window Functions Kernels Module** - *High-Performance Analytical Window Operations*
5//!
6//! Advanced window function kernels for sliding window computations,
7//! ranking operations, and positional analytics with SIMD acceleration and null-aware semantics.
8//! Backbone of time series analysis, analytical SQL window functions, and chunked streaming computations.
9//!
10//! ## Core Operations
11//! - **Moving averages**: Rolling mean calculations with configurable window sizes
12//! - **Cumulative functions**: Running sums, products, and statistical aggregations  
13//! - **Ranking functions**: ROW_NUMBER, RANK, DENSE_RANK with tie-handling strategies
14//! - **Lead/lag operations**: Positional value access with configurable offsets
15//! - **Percentile functions**: Moving quantile calculations with interpolation support
16//! - **Window aggregates**: MIN, MAX, SUM operations over sliding windows
17
18include!(concat!(env!("OUT_DIR"), "/simd_lanes.rs"));
19
20use std::marker::PhantomData;
21
22use minarrow::{
23    Bitmask, BooleanAVT, BooleanArray, FloatArray, Integer, IntegerArray, Length, MaskedArray,
24    Offset, StringArray, Vec64,
25    aliases::{FloatAVT, IntegerAVT},
26    enums::error::KernelError,
27    vec64,
28};
29use num_traits::{Float, Num, NumCast, One, Zero};
30
31use minarrow::StringAVT;
32use minarrow::utils::confirm_mask_capacity;
33
34// Helpers
35#[inline(always)]
36fn new_null_mask(len: usize) -> Bitmask {
37    Bitmask::new_set_all(len, false)
38}
39
40#[inline(always)]
41fn prealloc_vec<T: Copy>(len: usize) -> Vec64<T> {
42    let mut v = Vec64::<T>::with_capacity(len);
43    unsafe { v.set_len(len) };
44    v
45}
46
47// Rolling kernels (sum, product, min, max, mean, count)
48
49/// Generic sliding window aggregator for kernels that allow an
50/// incremental push and pop update (sum, product, etc.).
51/// Always emits the running aggregate, even when the subwindow has nulls.
52/// Only flags “valid” once the full subwindow has been seen.
53#[inline(always)]
54fn rolling_push_pop<T, FAdd, FRem>(
55    data: &[T],
56    mask: Option<&Bitmask>,
57    subwindow: usize,
58    mut add: FAdd,
59    mut remove: FRem,
60    zero: T,
61) -> (Vec64<T>, Bitmask)
62where
63    T: Copy,
64    FAdd: FnMut(T, T) -> T,
65    FRem: FnMut(T, T) -> T,
66{
67    let n = data.len();
68    let mut out = prealloc_vec::<T>(n);
69    let mut out_mask = new_null_mask(n);
70
71    if subwindow == 0 {
72        for slot in &mut out {
73            *slot = zero;
74        }
75        return (out, out_mask);
76    }
77
78    let mut agg = zero;
79    let mut invalids = 0usize;
80    for i in 0..n {
81        if mask.map_or(true, |m| unsafe { m.get_unchecked(i) }) {
82            agg = add(agg, data[i]);
83        } else {
84            invalids += 1;
85        }
86        if i + 1 > subwindow {
87            let j = i + 1 - subwindow - 1;
88            if mask.map_or(true, |m| unsafe { m.get_unchecked(j) }) {
89                agg = remove(agg, data[j]);
90            } else {
91                invalids -= 1;
92            }
93        }
94        if i + 1 < subwindow {
95            unsafe { out_mask.set_unchecked(i, false) };
96            out[i] = zero;
97        } else {
98            let ok = invalids == 0;
99            unsafe { out_mask.set_unchecked(i, ok) };
100            out[i] = agg;
101        }
102    }
103    (out, out_mask)
104}
105
106/// Generic rolling extreme aggregator (min/max) for a subwindow over a slice.
107#[inline(always)]
108fn rolling_extreme<T, F>(
109    data: &[T],
110    mask: Option<&Bitmask>,
111    subwindow: usize,
112    mut better: F,
113    zero: T,
114) -> (Vec64<T>, Bitmask)
115where
116    T: Copy,
117    F: FnMut(&T, &T) -> bool,
118{
119    let n = data.len();
120    let mut out = prealloc_vec::<T>(n);
121    let mut out_mask = new_null_mask(n);
122
123    if subwindow == 0 {
124        return (out, out_mask);
125    }
126
127    for i in 0..n {
128        if i + 1 < subwindow {
129            unsafe { out_mask.set_unchecked(i, false) };
130            out[i] = zero;
131            continue;
132        }
133        let start = i + 1 - subwindow;
134        let mut found = false;
135        let mut extreme = zero;
136        for j in start..=i {
137            if mask.map_or(true, |m| unsafe { m.get_unchecked(j) }) {
138                if !found {
139                    extreme = data[j];
140                    found = true;
141                } else if better(&data[j], &extreme) {
142                    extreme = data[j];
143                }
144            } else {
145                found = false;
146                break;
147            }
148        }
149        unsafe { out_mask.set_unchecked(i, found) };
150        out[i] = if found { extreme } else { zero };
151    }
152    (out, out_mask)
153}
154
155/// Computes rolling sums over a sliding window for integer data with null-aware semantics.
156///
157/// Applies a sliding window of configurable size to compute cumulative sums, employing
158/// incremental computation to avoid O(n²) complexity through efficient push-pop operations.
159/// Each position in the output represents the sum of values within the preceding window.
160///
161/// ## Parameters
162/// * `window` - Integer array view containing the data, offset, and length information
163/// * `subwindow` - Size of the sliding window (number of elements to sum)
164///
165/// ## Returns
166/// Returns an `IntegerArray<T>` containing:
167/// - Rolling sums for each position where a complete window exists
168/// - Zero values for positions before the window is complete
169/// - Null mask indicating validity (false for incomplete windows or null-contaminated windows)
170///
171/// ## Examples
172/// ```rust,ignore
173/// use minarrow::IntegerArray;
174/// use simd_kernels::kernels::window::rolling_sum_int;
175///
176/// let arr = IntegerArray::<i32>::from_slice(&[1, 2, 3, 4, 5]);
177/// let result = rolling_sum_int((&arr, 0, arr.len()), 3);
178/// ```
179#[inline]
180pub fn rolling_sum_int<T: Num + Copy + Zero>(
181    window: IntegerAVT<'_, T>,
182    subwindow: usize,
183) -> IntegerArray<T> {
184    let (arr, offset, len) = window;
185    let data = &arr.data[offset..offset + len];
186    let mask = arr.null_mask.as_ref().map(|m| m.slice_clone(offset, len));
187    let (mut out, mut out_mask) = rolling_push_pop(
188        data,
189        mask.as_ref(),
190        subwindow,
191        |a, b| a + b,
192        |a, b| a - b,
193        T::zero(),
194    );
195    if arr.null_mask.is_some() && subwindow > 0 && subwindow - 1 < out.len() {
196        unsafe { out_mask.set_unchecked(subwindow - 1, false) };
197        out[subwindow - 1] = T::zero();
198    }
199    IntegerArray {
200        data: out.into(),
201        null_mask: Some(out_mask),
202    }
203}
204
205/// Computes rolling sums over a sliding window for floating-point data with IEEE 754 compliance.
206///
207/// Applies incremental computation to calculate cumulative sums across sliding windows,
208/// maintaining numerical stability through careful accumulation strategies. Handles
209/// special floating-point values (infinity, NaN) according to IEEE 754 semantics.
210///
211/// ## Parameters
212/// * `window` - Float array view containing the data, offset, and length information
213/// * `subwindow` - Size of the sliding window for summation
214///
215/// ## Returns
216/// Returns a `FloatArray<T>` containing:
217/// - Rolling sums computed incrementally for efficiency
218/// - Zero values for positions with incomplete windows
219/// - Proper null mask for window validity tracking
220///
221/// ## Examples
222/// ```rust,ignore
223/// use minarrow::FloatArray;
224/// use simd_kernels::kernels::window::rolling_sum_float;
225///
226/// let arr = FloatArray::<f64>::from_slice(&[1.5, 2.3, 3.7, 4.1]);
227/// let result = rolling_sum_float((&arr, 0, arr.len()), 2);
228/// ```
229#[inline]
230pub fn rolling_sum_float<T: Float + Copy + Zero>(
231    window: FloatAVT<'_, T>,
232    subwindow: usize,
233) -> FloatArray<T> {
234    let (arr, offset, len) = window;
235    let data = &arr.data[offset..offset + len];
236    let mask = arr.null_mask.as_ref().map(|m| m.slice_clone(offset, len));
237    let (mut out, mut out_mask) = rolling_push_pop(
238        data,
239        mask.as_ref(),
240        subwindow,
241        |a, b| a + b,
242        |a, b| a - b,
243        T::zero(),
244    );
245    if subwindow > 0 && subwindow - 1 < out.len() {
246        out_mask.set(subwindow - 1, false);
247        out[subwindow - 1] = T::zero();
248    }
249    FloatArray {
250        data: out.into(),
251        null_mask: Some(out_mask),
252    }
253}
254
255/// Computes rolling sums over boolean data, counting true values within sliding windows.
256///
257/// Treats boolean values as integers (true=1, false=0) and applies sliding window
258/// summation to count true occurrences within each window position. Essential for
259/// constructing conditional aggregations and boolean pattern analysis.
260///
261/// ## Parameters
262/// * `window` - Boolean array view with offset and length specifications
263/// * `subwindow` - Number of boolean values to consider in each sliding window
264///
265/// ## Returns
266/// Returns an `IntegerArray<i32>` containing:
267/// - Count of true values within each complete window
268/// - Zero for positions with incomplete windows
269/// - Null mask indicating window completeness and null contamination
270///
271/// ## Examples
272/// ```rust,ignore
273/// use minarrow::BooleanArray;
274/// use simd_kernels::kernels::window::rolling_sum_bool;
275///
276/// let bools = BooleanArray::from_slice(&[true, false, true, true]);
277/// let result = rolling_sum_bool((&bools, 0, bools.len()), 2);
278/// ```
279#[inline]
280pub fn rolling_sum_bool(window: BooleanAVT<'_, ()>, subwindow: usize) -> IntegerArray<i32> {
281    let (arr, offset, len) = window;
282    let bools: Vec<i32> = arr.iter_range(offset, len).map(|b| b as i32).collect();
283    let mask = arr.null_mask.as_ref().map(|m| m.slice_clone(offset, len));
284    let (mut out, mut out_mask) = rolling_push_pop(
285        &bools,
286        mask.as_ref(),
287        subwindow,
288        |a, b| a + b,
289        |a, b| a - b,
290        0,
291    );
292    if subwindow > 0 && subwindow - 1 < out.len() {
293        out_mask.set(subwindow - 1, false);
294        out[subwindow - 1] = 0;
295    }
296    IntegerArray {
297        data: out.into(),
298        null_mask: Some(out_mask),
299    }
300}
301
302/// Computes rolling products over a sliding window for integer data with overflow protection.
303///
304/// Applies multiplicative aggregation across sliding windows using incremental computation
305/// through division operations. Maintains numerical stability through careful handling of
306/// zero values and potential overflow conditions in integer arithmetic.
307///
308/// ## Parameters
309/// * `window` - Integer array view containing multiplicands and window specification
310/// * `subwindow` - Number of consecutive elements to multiply in each window
311///
312/// ## Returns
313/// Returns an `IntegerArray<T>` containing:
314/// - Rolling products computed via incremental multiplication/division
315/// - Identity value (1) for positions with incomplete windows
316/// - Null mask reflecting window completeness and null contamination
317///
318/// ## Examples
319/// ```rust,ignore
320/// use minarrow::IntegerArray;
321/// use simd_kernels::kernels::window::rolling_product_int;
322///
323/// let arr = IntegerArray::<i32>::from_slice(&[2, 3, 4, 5]);
324/// let result = rolling_product_int((&arr, 0, arr.len()), 2);
325/// ```
326#[inline]
327pub fn rolling_product_int<T: Num + Copy + One + Zero>(
328    window: IntegerAVT<'_, T>,
329    subwindow: usize,
330) -> IntegerArray<T> {
331    let (arr, offset, len) = window;
332    let data = &arr.data[offset..offset + len];
333    let mask = arr.null_mask.as_ref().map(|m| m.slice_clone(offset, len));
334    let (out, out_mask) = rolling_push_pop(
335        data,
336        mask.as_ref(),
337        subwindow,
338        |a, b| a * b,
339        |a, b| a / b,
340        T::one(),
341    );
342    IntegerArray {
343        data: out.into(),
344        null_mask: Some(out_mask),
345    }
346}
347
348/// Computes rolling products over floating-point data with IEEE 754 mathematical semantics.
349///
350/// Performs multiplicative aggregation using incremental computation strategies that
351/// maintain numerical precision through careful handling of special values (infinity,
352/// NaN, zero) according to IEEE 754 standards.
353///
354/// ## Parameters
355/// * `window` - Float array view containing multiplicands for window processing
356/// * `subwindow` - Window size determining number of values to multiply
357///
358/// ## Returns
359/// Returns a `FloatArray<T>` containing:
360/// - Rolling products computed with floating-point precision
361/// - Identity value (1.0) for incomplete window positions
362/// - Comprehensive null mask for validity tracking
363///
364/// ## Examples
365/// ```rust,ignore
366/// use minarrow::FloatArray;
367/// use simd_kernels::kernels::window::rolling_product_float;
368///
369/// let arr = FloatArray::<f64>::from_slice(&[1.5, 2.0, 3.0, 4.0]);
370/// let result = rolling_product_float((&arr, 0, arr.len()), 2);
371/// ```
372#[inline]
373pub fn rolling_product_float<T: Float + Copy + One + Zero>(
374    window: FloatAVT<'_, T>,
375    subwindow: usize,
376) -> FloatArray<T> {
377    let (arr, offset, len) = window;
378    let data = &arr.data[offset..offset + len];
379    let mask = arr.null_mask.as_ref().map(|m| m.slice_clone(offset, len));
380    let (out, out_mask) = rolling_push_pop(
381        data,
382        mask.as_ref(),
383        subwindow,
384        |a, b| a * b,
385        |a, b| a / b,
386        T::one(),
387    );
388    FloatArray {
389        data: out.into(),
390        null_mask: Some(out_mask),
391    }
392}
393
394/// Computes rolling logical AND operations over boolean data within sliding windows.
395///
396/// Treats boolean multiplication as logical AND operations, computing the conjunction
397/// of all boolean values within each sliding window. Essential for constructing
398/// compound logical conditions and boolean pattern validation.
399///
400/// ## Parameters
401/// * `window` - Boolean array view containing logical values for conjunction
402/// * `subwindow` - Number of boolean values to AND together in each window
403///
404/// ## Returns
405/// Returns a `BooleanArray<()>` containing:
406/// - Logical AND results for each complete window position
407/// - False values for positions with incomplete windows
408/// - Null mask indicating window validity and null contamination
409///
410/// ## Examples
411/// ```rust,ignore
412/// use minarrow::BooleanArray;
413/// use simd_kernels::kernels::window::rolling_product_bool;
414///
415/// let bools = BooleanArray::from_slice(&[true, true, false, true]);
416/// let result = rolling_product_bool((&bools, 0, bools.len()), 2);
417/// ```
418#[inline]
419pub fn rolling_product_bool(window: BooleanAVT<'_, ()>, subwindow: usize) -> BooleanArray<()> {
420    let (arr, offset, len) = window;
421    let n = len;
422    let mut out_mask = new_null_mask(n);
423    let mut out = Bitmask::new_set_all(n, false);
424
425    for i in 0..n {
426        let start = if i + 1 >= subwindow {
427            i + 1 - subwindow
428        } else {
429            0
430        };
431        let mut acc = true;
432        let mut valid = subwindow > 0 && i + 1 >= subwindow;
433        for j in start..=i {
434            match unsafe { arr.get_unchecked(offset + j) } {
435                Some(val) => acc &= val,
436                None => {
437                    valid = false;
438                    break;
439                }
440            }
441        }
442        unsafe { out_mask.set_unchecked(i, valid) };
443        out.set(i, valid && acc);
444    }
445
446    BooleanArray {
447        data: out.into(),
448        null_mask: Some(out_mask),
449        len: n,
450        _phantom: PhantomData,
451    }
452}
453
454/// Computes rolling arithmetic means over integer data with high-precision floating-point output.
455///
456/// Calculates moving averages across sliding windows, converting integer inputs to double-precision
457/// floating-point for accurate mean computation. Essential for time series analysis and statistical
458/// smoothing operations over integer sequences.
459///
460/// ## Parameters
461/// * `window` - Integer array view containing values for mean calculation
462/// * `subwindow` - Window size determining number of values to average
463///
464/// ## Returns
465/// Returns a `FloatArray<f64>` containing:
466/// - Rolling arithmetic means computed with double precision
467/// - Zero values for positions with incomplete windows
468/// - Null mask indicating window completeness and null contamination
469///
470/// ## Examples
471/// ```rust,ignore
472/// use minarrow::IntegerArray;
473/// use simd_kernels::kernels::window::rolling_mean_int;
474///
475/// let arr = IntegerArray::<i32>::from_slice(&[1, 2, 3, 4, 5]);
476/// let result = rolling_mean_int((&arr, 0, arr.len()), 3);
477/// ```
478#[inline]
479pub fn rolling_mean_int<T: NumCast + Copy + Zero>(
480    window: IntegerAVT<'_, T>,
481    subwindow: usize,
482) -> FloatArray<f64> {
483    let (arr, offset, len) = window;
484    let n = len;
485    let mask_ref = arr.null_mask.as_ref().map(|m| m.slice_clone(offset, len));
486    let mut out = prealloc_vec::<f64>(n);
487    let mut out_mask = new_null_mask(n);
488
489    if subwindow == 0 {
490        return FloatArray {
491            data: out.into(),
492            null_mask: Some(out_mask),
493        };
494    }
495
496    for i in 0..n {
497        if i + 1 < subwindow {
498            unsafe { out_mask.set_unchecked(i, false) };
499            out[i] = 0.0;
500            continue;
501        }
502        let start = i + 1 - subwindow;
503        let mut sum = 0.0;
504        let mut valid = true;
505        for j in start..=i {
506            if mask_ref
507                .as_ref()
508                .map_or(true, |m| unsafe { m.get_unchecked(j) })
509            {
510                sum += num_traits::cast(arr.data[offset + j]).unwrap_or(0.0);
511            } else {
512                valid = false;
513                break;
514            }
515        }
516        unsafe { out_mask.set_unchecked(i, valid) };
517        out[i] = if valid { sum / subwindow as f64 } else { 0.0 };
518    }
519
520    if subwindow > 0 && subwindow - 1 < out.len() {
521        unsafe { out_mask.set_unchecked(subwindow - 1, false) };
522        out[subwindow - 1] = 0.0;
523    }
524
525    FloatArray {
526        data: out.into(),
527        null_mask: Some(out_mask),
528    }
529}
530
531/// Computes rolling arithmetic means over floating-point data with IEEE 754 compliance.
532///
533/// Performs moving average calculations across sliding windows while maintaining the original
534/// floating-point precision. Implements numerically stable summation strategies and proper
535/// handling of special floating-point values (NaN, infinity).
536///
537/// ## Parameters
538/// * `window` - Float array view containing values for mean calculation
539/// * `subwindow` - Window size specifying number of values to average
540///
541/// ## Returns
542/// Returns a `FloatArray<T>` containing:
543/// - Rolling arithmetic means preserving input precision (f32 or f64)
544/// - Zero values for positions with incomplete windows
545/// - Comprehensive null mask for validity indication
546///
547/// ## Examples
548/// ```rust,ignore
549/// use minarrow::FloatArray;
550/// use simd_kernels::kernels::window::rolling_mean_float;
551///
552/// let arr = FloatArray::<f32>::from_slice(&[1.5, 2.5, 3.5, 4.5]);
553/// let result = rolling_mean_float((&arr, 0, arr.len()), 2);
554/// ```
555#[inline]
556pub fn rolling_mean_float<T: Float + Copy + Zero>(
557    window: FloatAVT<'_, T>,
558    subwindow: usize,
559) -> FloatArray<T> {
560    let (arr, offset, len) = window;
561    let n = len;
562    let mask_ref = arr.null_mask.as_ref().map(|m| m.slice_clone(offset, len));
563    let mut out = prealloc_vec::<T>(n);
564    let mut out_mask = new_null_mask(n);
565
566    if subwindow == 0 {
567        return FloatArray {
568            data: out.into(),
569            null_mask: Some(out_mask),
570        };
571    }
572
573    for i in 0..n {
574        if i + 1 < subwindow {
575            unsafe { out_mask.set_unchecked(i, false) };
576            out[i] = T::zero();
577            continue;
578        }
579        let start = i + 1 - subwindow;
580        let mut sum = T::zero();
581        let mut valid = true;
582        for j in start..=i {
583            if mask_ref
584                .as_ref()
585                .map_or(true, |m| unsafe { m.get_unchecked(j) })
586            {
587                sum = sum + arr.data[offset + j];
588            } else {
589                valid = false;
590                break;
591            }
592        }
593        unsafe { out_mask.set_unchecked(i, valid) };
594        out[i] = if valid {
595            sum / T::from(subwindow as u32).unwrap()
596        } else {
597            T::zero()
598        };
599    }
600
601    if subwindow > 0 && subwindow - 1 < out.len() {
602        unsafe { out_mask.set_unchecked(subwindow - 1, false) };
603        out[subwindow - 1] = T::zero();
604    }
605
606    FloatArray {
607        data: out.into(),
608        null_mask: Some(out_mask),
609    }
610}
611
612/// For min, we skip that very first `window` slot so that
613/// the _first_ non-zero result appears one step later.
614/// Computes rolling minimum values over integer data within sliding windows.
615///
616/// Identifies minimum values across sliding windows using efficient comparison strategies.
617/// Each output position represents the smallest value found within the preceding window,
618/// essential for trend analysis and outlier detection in integer sequences.
619///
620/// ## Parameters
621/// * `window` - Integer array view containing values for minimum detection
622/// * `subwindow` - Window size determining scope of minimum search
623///
624/// ## Returns
625/// Returns an `IntegerArray<T>` containing:
626/// - Rolling minimum values for each complete window position
627/// - Zero values for positions with incomplete windows
628/// - Null mask indicating window completeness and validity
629///
630/// ## Use Cases
631/// - **Trend analysis**: Identifying minimum trends in time series data
632/// - **Outlier detection**: Finding exceptionally low values within windows
633/// - **Signal processing**: Detecting minimum signal levels over time intervals
634/// - **Statistical analysis**: Computing rolling minimum statistics
635///
636/// ## Examples
637/// ```rust,ignore
638/// use minarrow::IntegerArray;
639/// use simd_kernels::kernels::window::rolling_min_int;
640///
641/// let arr = IntegerArray::<i32>::from_slice(&[5, 2, 8, 1, 9]);
642/// let result = rolling_min_int((&arr, 0, arr.len()), 3);
643/// // Output: [0, 0, 2, 1, 1] - minimum values in each 3-element window
644/// ```
645#[inline]
646pub fn rolling_min_int<T: Ord + Copy + Zero>(
647    window: IntegerAVT<'_, T>,
648    subwindow: usize,
649) -> IntegerArray<T> {
650    let (arr, offset, len) = window;
651    let data = &arr.data[offset..offset + len];
652    let mask = arr.null_mask.as_ref().map(|m| m.slice_clone(offset, len));
653    let (mut out, mut out_mask) =
654        rolling_extreme(data, mask.as_ref(), subwindow, |a, b| a < b, T::zero());
655    if subwindow > 0 && subwindow - 1 < out.len() {
656        out_mask.set(subwindow - 1, false);
657        out[subwindow - 1] = T::zero();
658    }
659    IntegerArray {
660        data: out.into(),
661        null_mask: Some(out_mask),
662    }
663}
664
665/// Computes rolling maximum values over integer data within sliding windows.
666///
667/// Identifies maximum values across sliding windows using efficient comparison operations.
668/// Each output position represents the largest value found within the preceding window,
669/// crucial for peak detection and trend analysis in integer data sequences.
670///
671/// ## Parameters
672/// * `window` - Integer array view containing values for maximum detection
673/// * `subwindow` - Window size determining scope of maximum search
674///
675/// ## Returns
676/// Returns an `IntegerArray<T>` containing:
677/// - Rolling maximum values for each complete window position
678/// - Zero values for positions with incomplete windows
679/// - Comprehensive null mask for validity tracking
680///
681/// ## Applications
682/// - **Peak detection**: Identifying maximum peaks in time series data
683/// - **Trend analysis**: Tracking maximum value trends over sliding intervals
684/// - **Threshold monitoring**: Detecting when maximum values exceed thresholds
685/// - **Signal analysis**: Finding maximum signal amplitudes in windows
686///
687/// ## Examples
688/// ```rust,ignore
689/// use minarrow::IntegerArray;
690/// use simd_kernels::kernels::window::rolling_max_int;
691///
692/// let arr = IntegerArray::<i32>::from_slice(&[3, 7, 2, 9, 4]);
693/// let result = rolling_max_int((&arr, 0, arr.len()), 3);
694/// // Output: [0, 0, 7, 9, 9] - maximum values in each 3-element window
695/// ```
696#[inline]
697pub fn rolling_max_int<T: Ord + Copy + Zero>(
698    window: IntegerAVT<'_, T>,
699    subwindow: usize,
700) -> IntegerArray<T> {
701    let (arr, offset, len) = window;
702    let data = &arr.data[offset..offset + len];
703    let mask = arr.null_mask.as_ref().map(|m| m.slice_clone(offset, len));
704    let (out, out_mask) = rolling_extreme(data, mask.as_ref(), subwindow, |a, b| a > b, T::zero());
705    IntegerArray {
706        data: out.into(),
707        null_mask: Some(out_mask),
708    }
709}
710
711/// Computes rolling minimum values over floating-point data with IEEE 754 compliance.
712///
713/// Identifies minimum floating-point values across sliding windows while properly handling
714/// special values (NaN, infinity) according to IEEE 754 standards. Essential for numerical
715/// analysis requiring precise minimum detection in floating-point sequences.
716///
717/// ## Parameters
718/// * `window` - Float array view containing values for minimum computation
719/// * `subwindow` - Window size determining minimum search scope
720///
721/// ## Returns
722/// Returns a `FloatArray<T>` containing:
723/// - Rolling minimum values computed with floating-point precision
724/// - Zero values for positions with incomplete windows
725/// - Null mask reflecting window validity and special value handling
726///
727/// ## Applications
728/// - **Scientific computing**: Finding minimum values in numerical simulations
729/// - **Signal processing**: Detecting minimum signal levels with high precision
730/// - **Financial analysis**: Identifying minimum prices in sliding time windows
731/// - **Data analysis**: Computing rolling minimum statistics for trend detection
732///
733/// ## Examples
734/// ```rust,ignore
735/// use minarrow::FloatArray;
736/// use simd_kernels::kernels::window::rolling_min_float;
737///
738/// let arr = FloatArray::<f64>::from_slice(&[3.14, 2.71, 1.41, 1.73]);
739/// let result = rolling_min_float((&arr, 0, arr.len()), 2);
740/// // Output: [0.0, 0.0, 2.71, 1.41] - minimum values in 2-element windows
741/// ```
742#[inline]
743pub fn rolling_min_float<T: Float + Copy + Zero>(
744    window: FloatAVT<'_, T>,
745    subwindow: usize,
746) -> FloatArray<T> {
747    let (arr, offset, len) = window;
748    let data = &arr.data[offset..offset + len];
749    let mask = arr.null_mask.as_ref().map(|m| m.slice_clone(offset, len));
750    let (mut out, mut out_mask) =
751        rolling_extreme(data, mask.as_ref(), subwindow, |a, b| a < b, T::zero());
752    if subwindow > 0 && subwindow - 1 < out.len() {
753        out_mask.set(subwindow - 1, false);
754        out[subwindow - 1] = T::zero();
755    }
756    FloatArray {
757        data: out.into(),
758        null_mask: Some(out_mask),
759    }
760}
761
762/// Computes rolling maximum values over floating-point data with IEEE 754 compliance.
763///
764/// Identifies maximum floating-point values across sliding windows while maintaining
765/// strict adherence to IEEE 754 comparison semantics. Handles special floating-point
766/// values (NaN, infinity) correctly for reliable maximum detection.
767///
768/// ## Parameters
769/// * `window` - Float array view containing values for maximum computation
770/// * `subwindow` - Window size specifying maximum search scope
771///
772/// ## Returns
773/// Returns a `FloatArray<T>` containing:
774/// - Rolling maximum values with full floating-point precision
775/// - Zero values for positions with incomplete windows
776/// - Comprehensive null mask for result validity indication
777///
778/// ## Applications
779/// - **Peak detection**: Identifying maximum peaks in continuous data streams
780/// - **Envelope detection**: Computing upper envelopes of signal data
781/// - **Risk analysis**: Finding maximum risk values in financial time series
782/// - **Scientific measurement**: Detecting maximum readings in sensor data
783///
784/// ## Examples
785/// ```rust,ignore
786/// use minarrow::FloatArray;
787/// use simd_kernels::kernels::window::rolling_max_float;
788///
789/// let arr = FloatArray::<f32>::from_slice(&[1.5, 3.2, 2.1, 4.7]);
790/// let result = rolling_max_float((&arr, 0, arr.len()), 2);
791/// ```
792#[inline]
793pub fn rolling_max_float<T: Float + Copy + Zero>(
794    window: FloatAVT<'_, T>,
795    subwindow: usize,
796) -> FloatArray<T> {
797    let (arr, offset, len) = window;
798    let data = &arr.data[offset..offset + len];
799    let mask = arr.null_mask.as_ref().map(|m| m.slice_clone(offset, len));
800    let (out, out_mask) = rolling_extreme(data, mask.as_ref(), subwindow, |a, b| a > b, T::zero());
801    FloatArray {
802        data: out.into(),
803        null_mask: Some(out_mask),
804    }
805}
806
807/// Computes rolling counts of elements within sliding windows for positional analysis.
808///
809/// Counts the number of elements present within each sliding window position, providing
810/// fundamental cardinality information essential for statistical analysis and data validation.
811/// Unlike other rolling functions, operates on position information rather than data values.
812///
813/// ## Parameters
814/// * `window` - Tuple containing offset and length defining the data window scope
815/// * `subwindow` - Size of sliding window for element counting
816///
817/// ## Returns
818/// Returns an `IntegerArray<i32>` containing:
819/// - Count of elements in each complete window (always equals subwindow size)
820/// - Zero values for positions with incomplete windows
821/// - Null mask indicating where complete windows exist
822///
823/// ## Examples
824/// ```rust,ignore
825/// use simd_kernels::kernels::window::rolling_count;
826///
827/// let result = rolling_count((0, 5), 3); // 5 elements, window size 3
828/// ```
829#[inline]
830pub fn rolling_count(window: (Offset, Length), subwindow: usize) -> IntegerArray<i32> {
831    let (_offset, len) = window;
832    let mut out = prealloc_vec::<i32>(len);
833    let mut out_mask = new_null_mask(len);
834    for i in 0..len {
835        let start = if i + 1 >= subwindow {
836            i + 1 - subwindow
837        } else {
838            0
839        };
840        let count = (i - start + 1) as i32;
841        let valid_row = subwindow > 0 && i + 1 >= subwindow;
842        unsafe { out_mask.set_unchecked(i, valid_row) };
843        out[i] = if valid_row { count } else { 0 };
844    }
845    IntegerArray {
846        data: out.into(),
847        null_mask: Some(out_mask),
848    }
849}
850
851// Rank and Dense-rank kernels
852
853#[inline(always)]
854fn rank_numeric<T, F>(data: &[T], mask: Option<&Bitmask>, mut cmp: F) -> IntegerArray<i32>
855where
856    T: Copy,
857    F: FnMut(&T, &T) -> std::cmp::Ordering,
858{
859    let n = data.len();
860    let mut indices: Vec<usize> = (0..n).collect();
861    indices.sort_by(|&i, &j| cmp(&data[i], &data[j]));
862
863    let mut out = vec64![0i32; n];
864    let mut out_mask = Bitmask::new_set_all(n, false);
865
866    for (rank, &i) in indices.iter().enumerate() {
867        if mask.map_or(true, |m| unsafe { m.get_unchecked(i) }) {
868            out[i] = (rank + 1) as i32;
869            unsafe { out_mask.set_unchecked(i, true) };
870        }
871    }
872
873    IntegerArray {
874        data: out.into(),
875        null_mask: Some(out_mask),
876    }
877}
878
879/// Computes standard SQL ROW_NUMBER() ranking for integer data with tie handling.
880///
881/// Assigns sequential rank values to elements based on their sorted order, providing
882/// standard SQL ROW_NUMBER() semantics where tied values receive different ranks.
883/// Essential for analytical queries requiring unique positional ranking.
884///
885/// ## Parameters
886/// * `window` - Integer array view containing values for ranking
887///
888/// ## Returns
889/// Returns an `IntegerArray<i32>` containing:
890/// - Rank values from 1 to n for valid elements
891/// - Zero values for null elements
892/// - Null mask indicating which positions have valid ranks
893///
894/// ## Ranking Semantics
895/// - **ROW_NUMBER() behaviour**: Each element receives a unique rank (1, 2, 3, ...)
896/// - **Tie breaking**: Tied values receive different ranks based on their position
897/// - **Ascending order**: Smaller values receive lower (better) ranks
898/// - **Null exclusion**: Null values are excluded from ranking and receive rank 0
899///
900/// ## Use Cases
901/// - **Analytical queries**: SQL ROW_NUMBER() window function implementation
902/// - **Leaderboards**: Creating ordered rankings with unique positions
903/// - **Percentile calculation**: Basis for percentile and quartile computations
904/// - **Data analysis**: Establishing ordinality in integer datasets
905///
906/// ## Examples
907/// ```rust,ignore
908/// use minarrow::IntegerArray;
909/// use simd_kernels::kernels::window::rank_int;
910///
911/// let arr = IntegerArray::<i32>::from_slice(&[30, 10, 20, 10]);
912/// let result = rank_int((&arr, 0, arr.len()));
913/// // Output: [4, 1, 3, 2] - ROW_NUMBER() style ranking
914/// ```
915#[inline(always)]
916pub fn rank_int<T: Ord + Copy>(window: IntegerAVT<T>) -> IntegerArray<i32> {
917    let (arr, offset, len) = window;
918    let data = &arr.data[offset..offset + len];
919    let null_mask = if len != arr.data.len() {
920        &arr.null_mask.as_ref().map(|m| m.slice_clone(offset, len))
921    } else {
922        &arr.null_mask
923    };
924    rank_numeric(data, null_mask.as_ref(), |a, b| a.cmp(b))
925}
926
927/// Computes standard SQL ROW_NUMBER() ranking for floating-point data with IEEE 754 compliance.
928///
929/// Assigns sequential rank values based on sorted floating-point order, implementing
930/// ROW_NUMBER() semantics with proper handling of special floating-point values (NaN,
931/// infinity) according to IEEE 754 comparison standards.
932///
933/// ## Parameters
934/// * `window` - Float array view containing values for ranking
935///
936/// ## Returns
937/// Returns an `IntegerArray<i32>` containing:
938/// - Rank values from 1 to n for valid, non-NaN elements
939/// - Zero values for null or NaN elements
940/// - Null mask indicating positions with valid ranks
941///
942/// ## Floating-Point Ranking
943/// - **IEEE 754 ordering**: Uses IEEE 754 compliant comparison operations
944/// - **NaN handling**: NaN values are excluded from ranking (receive rank 0)
945/// - **Infinity treatment**: Positive/negative infinity participate in ranking
946/// - **Precision preservation**: Maintains full floating-point comparison precision
947///
948/// ## Ranking Semantics
949/// - **ROW_NUMBER() style**: Each non-NaN element receives unique sequential rank
950/// - **Ascending order**: Smaller floating-point values receive lower ranks
951/// - **Tie breaking**: Floating-point ties broken by original array position
952/// - **Special value exclusion**: NaN and null values excluded from rank assignment
953///
954/// ## Applications
955/// - **Statistical ranking**: Ranking continuous numerical data
956/// - **Scientific analysis**: Ordered ranking of experimental measurements
957/// - **Financial analysis**: Ranking performance metrics and indicators
958/// - **Data preprocessing**: Establishing ordinality for regression analysis
959///
960/// ## Examples
961/// ```rust,ignore
962/// use minarrow::FloatArray;
963/// use simd_kernels::kernels::window::rank_float;
964///
965/// let arr = FloatArray::<f64>::from_slice(&[3.14, 2.71, 1.41, f64::NAN]);
966/// let result = rank_float((&arr, 0, arr.len()));
967/// // Output: [3, 2, 1, 0] - NaN excluded, others ranked by value
968/// ```
969#[inline(always)]
970pub fn rank_float<T: Float + Copy>(window: FloatAVT<T>) -> IntegerArray<i32> {
971    let (arr, offset, len) = window;
972    let data = &arr.data[offset..offset + len];
973    let null_mask = if len != arr.data.len() {
974        &arr.null_mask.as_ref().map(|m| m.slice_clone(offset, len))
975    } else {
976        &arr.null_mask
977    };
978    rank_numeric(data, null_mask.as_ref(), |a, b| a.partial_cmp(b).unwrap())
979}
980
981/// Computes standard SQL ROW_NUMBER() ranking for string data with lexicographic ordering.
982///
983/// Assigns sequential rank values based on lexicographic string comparison, implementing
984/// ROW_NUMBER() semantics for textual data. Essential for alphabetical ranking and
985/// string-based analytical operations.
986///
987/// ## Parameters
988/// * `arr` - String array view containing textual values for ranking
989///
990/// ## Returns
991/// Returns `Result<IntegerArray<i32>, KernelError>` containing:
992/// - **Success**: Rank values from 1 to n for valid string elements
993/// - **Error**: KernelError if capacity validation fails
994/// - Zero values for null string elements
995/// - Null mask indicating positions with valid ranks
996///
997/// ## String Ranking Semantics
998/// - **Lexicographic order**: Uses standard string comparison (dictionary order)
999/// - **Case sensitivity**: Comparisons are case-sensitive ("A" < "a")
1000/// - **Unicode support**: Proper handling of UTF-8 encoded string data
1001/// - **ROW_NUMBER() behaviour**: Tied strings receive different ranks by position
1002///
1003/// ## Error Conditions
1004/// - **Capacity errors**: Returns KernelError if mask capacity validation fails
1005/// - **Memory allocation**: May fail with insufficient memory for large datasets
1006///
1007/// ## Use Cases
1008/// - **Alphabetical ranking**: Creating alphabetically ordered rankings
1009/// - **Text analysis**: Establishing lexicographic ordinality in textual data
1010/// - **Database operations**: SQL ROW_NUMBER() implementation for string columns
1011/// - **Sorting applications**: Providing ranking information for string sorting
1012///
1013/// ## Examples
1014/// ```rust,ignore
1015/// use minarrow::StringArray;
1016/// use simd_kernels::kernels::window::rank_str;
1017///
1018/// let arr = StringArray::<u32>::from_slice(&["zebra", "apple", "banana"]);
1019/// let result = rank_str((&arr, 0, arr.len())).unwrap();
1020/// // Output: [3, 1, 2] - lexicographic ranking
1021/// ```
1022#[inline(always)]
1023pub fn rank_str<T: Integer>(arr: StringAVT<T>) -> Result<IntegerArray<i32>, KernelError> {
1024    let (array, offset, len) = arr;
1025    let mask = array.null_mask.as_ref();
1026    let _ = confirm_mask_capacity(array.len(), mask)?;
1027
1028    // Gather (local_idx, string) pairs for valid elements in the window
1029    let mut tuples: Vec<(usize, &str)> = (0..len)
1030        .filter(|&i| mask.map_or(true, |m| unsafe { m.get_unchecked(offset + i) }))
1031        .map(|i| (i, unsafe { array.get_unchecked(offset + i) }.unwrap_or("")))
1032        .collect();
1033
1034    // Sort by string value
1035    tuples.sort_by(|a, b| a.1.cmp(&b.1));
1036
1037    let mut out = vec64![0i32; len];
1038    let mut out_mask = new_null_mask(len);
1039
1040    // Assign ranks (1-based), using local output indices
1041    for (rank, (i, _)) in tuples.iter().enumerate() {
1042        out[*i] = (rank + 1) as i32;
1043        unsafe { out_mask.set_unchecked(*i, true) };
1044    }
1045
1046    Ok(IntegerArray {
1047        data: out.into(),
1048        null_mask: Some(out_mask),
1049    })
1050}
1051
1052#[inline(always)]
1053fn dense_rank_numeric<T, F, G>(
1054    data: &[T],
1055    mask: Option<&Bitmask>,
1056    mut sort: F,
1057    mut eq: G,
1058) -> Result<IntegerArray<i32>, KernelError>
1059where
1060    T: Copy,
1061    F: FnMut(&T, &T) -> std::cmp::Ordering,
1062    G: FnMut(&T, &T) -> bool,
1063{
1064    let n = data.len();
1065    let _ = confirm_mask_capacity(n, mask)?;
1066    let mut uniqs: Vec<T> = (0..n)
1067        .filter(|&i| mask.map_or(true, |m| unsafe { m.get_unchecked(i) }))
1068        .map(|i| data[i])
1069        .collect();
1070
1071    uniqs.sort_by(&mut sort);
1072    uniqs.dedup_by(|a, b| eq(&*a, &*b));
1073
1074    let mut out = prealloc_vec::<i32>(n);
1075    let mut out_mask = Bitmask::new_set_all(n, false);
1076
1077    for i in 0..n {
1078        if mask.map_or(true, |m| unsafe { m.get_unchecked(i) }) {
1079            let rank = uniqs.binary_search_by(|x| sort(x, &data[i])).unwrap() + 1;
1080            out[i] = rank as i32;
1081            unsafe { out_mask.set_unchecked(i, true) };
1082        } else {
1083            out[i] = 0;
1084        }
1085    }
1086
1087    Ok(IntegerArray {
1088        data: out.into(),
1089        null_mask: Some(out_mask),
1090    })
1091}
1092
1093/// Computes SQL DENSE_RANK() ranking for integer data with consecutive rank assignment.
1094///
1095/// Assigns consecutive rank values where tied values receive identical ranks, implementing
1096/// SQL DENSE_RANK() semantics. Unlike standard ranking, eliminates gaps in rank sequence
1097/// when ties occur, providing compact rank numbering for analytical applications.
1098///
1099/// ## Parameters
1100/// * `window` - Integer array view containing values for dense ranking
1101///
1102/// ## Returns
1103/// Returns `Result<IntegerArray<i32>, KernelError>` containing:
1104/// - **Success**: Dense rank values with no gaps in sequence
1105/// - **Error**: KernelError if capacity validation fails
1106/// - Zero values for null elements
1107/// - Null mask indicating positions with valid ranks
1108///
1109/// ## Dense Ranking Semantics
1110/// - **DENSE_RANK() behaviour**: Tied values receive same rank, next rank is consecutive
1111/// - **No rank gaps**: Eliminates gaps that occur in standard RANK() function
1112/// - **Unique value counting**: Essentially counts distinct values in sorted order
1113/// - **Ascending order**: Smaller integer values receive lower (better) ranks
1114///
1115/// ## Comparison with RANK()
1116/// - **RANK()**: [1, 2, 2, 4] for values [10, 20, 20, 30]
1117/// - **DENSE_RANK()**: [1, 2, 2, 3] for values [10, 20, 20, 30]
1118///
1119/// ## Use Cases
1120/// - **Analytical queries**: SQL DENSE_RANK() window function implementation
1121/// - **Categorical ranking**: Creating compact categorical orderings
1122/// - **Percentile calculation**: Foundation for percentile computations without gaps
1123/// - **Data binning**: Assigning data points to consecutive bins based on value
1124///
1125/// ## Examples
1126/// ```rust,ignore
1127/// use minarrow::IntegerArray;
1128/// use simd_kernels::kernels::window::dense_rank_int;
1129///
1130/// let arr = IntegerArray::<i32>::from_slice(&[10, 30, 20, 30]);
1131/// let result = dense_rank_int((&arr, 0, arr.len())).unwrap();
1132/// // Output: [1, 3, 2, 3] - dense ranking with tied values
1133/// ```
1134#[inline(always)]
1135pub fn dense_rank_int<T: Ord + Copy>(
1136    window: IntegerAVT<T>,
1137) -> Result<IntegerArray<i32>, KernelError> {
1138    let (arr, offset, len) = window;
1139    let data = &arr.data[offset..offset + len];
1140    let null_mask = if len != arr.data.len() {
1141        &arr.null_mask.as_ref().map(|m| m.slice_clone(offset, len))
1142    } else {
1143        &arr.null_mask
1144    };
1145    dense_rank_numeric(data, null_mask.as_ref(), |a, b| a.cmp(b), |a, b| a == b)
1146}
1147
1148/// Computes SQL DENSE_RANK() ranking for floating-point data with IEEE 754 compliance.
1149///
1150/// Implements dense ranking for floating-point values where tied values receive identical
1151/// consecutive ranks. Handles special floating-point values (NaN, infinity) according
1152/// to IEEE 754 standards while maintaining dense rank sequence properties.
1153///
1154/// ## Parameters
1155/// * `window` - Float array view containing values for dense ranking
1156///
1157/// ## Returns
1158/// Returns `Result<IntegerArray<i32>, KernelError>` containing:
1159/// - **Success**: Dense rank values with consecutive numbering
1160/// - **Error**: KernelError if capacity validation fails
1161/// - Zero values for null or NaN elements
1162/// - Null mask indicating positions with valid ranks
1163///
1164/// ## Applications
1165/// - **Scientific ranking**: Dense ranking of experimental measurements
1166/// - **Statistical analysis**: Percentile calculations without rank gaps
1167/// - **Financial modeling**: Dense ranking of performance metrics
1168/// - **Data preprocessing**: Creating ordinal encodings for continuous variables
1169///
1170/// ## Examples
1171/// ```rust,ignore
1172/// use minarrow::FloatArray;
1173/// use simd_kernels::kernels::window::dense_rank_float;
1174///
1175/// let arr = FloatArray::<f64>::from_slice(&[1.5, 3.14, 2.71, 3.14]);
1176/// let result = dense_rank_float((&arr, 0, arr.len())).unwrap();
1177/// // Output: [1, 3, 2, 3] - dense ranking with tied 3.14 values
1178/// ```
1179#[inline(always)]
1180pub fn dense_rank_float<T: Float + Copy>(
1181    window: FloatAVT<T>,
1182) -> Result<IntegerArray<i32>, KernelError> {
1183    let (arr, offset, len) = window;
1184    let data = &arr.data[offset..offset + len];
1185    let null_mask = if len != arr.data.len() {
1186        &arr.null_mask.as_ref().map(|m| m.slice_clone(offset, len))
1187    } else {
1188        &arr.null_mask
1189    };
1190    dense_rank_numeric(
1191        data,
1192        null_mask.as_ref(),
1193        |a, b| a.partial_cmp(b).unwrap(),
1194        |a, b| a == b,
1195    )
1196}
1197
1198/// Computes SQL DENSE_RANK() ranking for string data with lexicographic dense ordering.
1199///
1200/// Implements dense ranking for string values using lexicographic comparison, where
1201/// identical strings receive the same rank and subsequent ranks remain consecutive.
1202/// Essential for alphabetical dense ranking and textual categorical analysis.
1203///
1204/// ## Parameters
1205/// * `arr` - String array view containing textual values for dense ranking
1206///
1207/// ## Returns
1208/// Returns `Result<IntegerArray<i32>, KernelError>` containing:
1209/// - **Success**: Dense rank values with consecutive sequence
1210/// - **Error**: KernelError if capacity validation fails
1211/// - Zero values for null string elements
1212/// - Null mask indicating positions with valid ranks
1213///
1214/// ## Dense String Ranking
1215/// - **DENSE_RANK() semantics**: Identical strings receive same rank, no rank gaps
1216/// - **Lexicographic ordering**: Standard dictionary-style string comparison
1217/// - **Case sensitivity**: Maintains case-sensitive comparison ("Apple" ≠ "apple")
1218/// - **UTF-8 support**: Proper handling of Unicode string sequences
1219///
1220/// ## Use Cases
1221/// - **Alphabetical dense ranking**: Creating compact alphabetical orderings
1222/// - **Categorical encoding**: Converting string categories to dense integer codes
1223/// - **Text analytics**: Establishing lexicographic ordinality for text processing
1224/// - **Database operations**: SQL DENSE_RANK() for string-valued columns
1225///
1226/// ## Examples
1227/// ```rust,ignore
1228/// use minarrow::StringArray;
1229/// use simd_kernels::kernels::window::dense_rank_str;
1230///
1231/// let arr = StringArray::<u32>::from_slice(&["banana", "apple", "cherry", "apple"]);
1232/// let result = dense_rank_str((&arr, 0, arr.len())).unwrap();
1233/// // Output: [2, 1, 3, 1] - dense ranking with tied "apple" values
1234/// ```
1235#[inline(always)]
1236pub fn dense_rank_str<T: Integer>(arr: StringAVT<T>) -> Result<IntegerArray<i32>, KernelError> {
1237    let (array, offset, len) = arr;
1238    let mask = array.null_mask.as_ref();
1239    let _ = confirm_mask_capacity(array.len(), mask)?;
1240
1241    // Collect all unique valid values in window
1242    let mut vals: Vec<&str> = (0..len)
1243        .filter(|&i| mask.map_or(true, |m| unsafe { m.get_unchecked(offset + i) }))
1244        .map(|i| unsafe { array.get_unchecked(offset + i) }.unwrap_or(""))
1245        .collect();
1246    vals.sort();
1247    vals.dedup();
1248
1249    let mut out = prealloc_vec::<i32>(len);
1250    let mut out_mask = Bitmask::new_set_all(len, false);
1251
1252    for i in 0..len {
1253        let valid = mask.map_or(true, |m| unsafe { m.get_unchecked(offset + i) });
1254        if valid {
1255            let rank = vals
1256                .binary_search(&unsafe { array.get_unchecked(offset + i) }.unwrap_or(""))
1257                .unwrap()
1258                + 1;
1259            out[i] = rank as i32;
1260            unsafe { out_mask.set_unchecked(i, true) };
1261        } else {
1262            out[i] = 0;
1263        }
1264    }
1265
1266    Ok(IntegerArray {
1267        data: out.into(),
1268        null_mask: Some(out_mask),
1269    })
1270}
1271
1272// Lag / Lead / Shift kernels
1273
1274#[inline(always)]
1275fn shift_with_bounds<T: Copy>(
1276    data: &[T],
1277    mask: Option<&Bitmask>,
1278    len: usize,
1279    offset_fn: impl Fn(usize) -> Option<usize>,
1280    default: T,
1281) -> (Vec64<T>, Bitmask) {
1282    let mut out = prealloc_vec::<T>(len);
1283    let mut out_mask = Bitmask::new_set_all(len, false);
1284    for i in 0..len {
1285        if let Some(j) = offset_fn(i) {
1286            out[i] = data[j];
1287            let is_valid = mask.map_or(true, |m| unsafe { m.get_unchecked(j) });
1288            unsafe { out_mask.set_unchecked(i, is_valid) };
1289        } else {
1290            out[i] = default;
1291        }
1292    }
1293    (out, out_mask)
1294}
1295
1296#[inline(always)]
1297fn shift_str_with_bounds<T: Integer>(
1298    arr: StringAVT<T>,
1299    offset_fn: impl Fn(usize) -> Option<usize>,
1300) -> Result<StringArray<T>, KernelError> {
1301    let (array, offset, len) = arr;
1302    let src_mask = array.null_mask.as_ref();
1303    let _ = confirm_mask_capacity(array.len(), src_mask)?;
1304
1305    // Determine offsets and total bytes required
1306    let mut offsets = Vec64::<T>::with_capacity(len + 1);
1307    unsafe {
1308        offsets.set_len(len + 1);
1309    }
1310    offsets[0] = T::zero();
1311
1312    let mut total_bytes = 0;
1313    let mut string_lengths = vec![0usize; len];
1314
1315    for i in 0..len {
1316        let byte_len = if let Some(j) = offset_fn(i) {
1317            let src_idx = offset + j;
1318            let valid = src_mask.map_or(true, |m| unsafe { m.get_unchecked(src_idx) });
1319            if valid {
1320                unsafe { array.get_unchecked(src_idx).unwrap_or("").len() }
1321            } else {
1322                0
1323            }
1324        } else {
1325            0
1326        };
1327        total_bytes += byte_len;
1328        string_lengths[i] = byte_len;
1329        offsets[i + 1] = T::from_usize(total_bytes);
1330    }
1331
1332    // Allocate data buffer
1333    let mut data = Vec64::<u8>::with_capacity(total_bytes);
1334    let mut out_mask = Bitmask::new_set_all(len, false);
1335
1336    // Write string content
1337    for i in 0..len {
1338        if let Some(j) = offset_fn(i) {
1339            let src_idx = offset + j;
1340            let valid = src_mask.map_or(true, |m| unsafe { m.get_unchecked(src_idx) });
1341            if valid {
1342                let s = unsafe { array.get_unchecked(src_idx).unwrap_or("") };
1343                data.extend_from_slice(s.as_bytes());
1344                unsafe { out_mask.set_unchecked(i, true) };
1345                continue;
1346            }
1347        }
1348        // Not valid or OOB write nothing
1349    }
1350
1351    Ok(StringArray {
1352        offsets: offsets.into(),
1353        data: data.into(),
1354        null_mask: Some(out_mask),
1355    })
1356}
1357
1358// Integer
1359
1360/// Accesses values from previous positions in integer arrays with configurable offset.
1361///
1362/// Implements SQL LAG() window function semantics, retrieving values from earlier positions
1363/// in the array sequence. Essential for time series analysis, trend detection, and
1364/// comparative analytics requiring access to historical data points.
1365///
1366/// ## Parameters
1367/// * `window` - Integer array view containing sequential data for lag access
1368/// * `n` - Lag offset specifying how many positions to look backward
1369///
1370/// ## Returns
1371/// Returns an `IntegerArray<T>` containing:
1372/// - Values from n positions earlier in the sequence
1373/// - Default values for positions where lag source is unavailable
1374/// - Null mask indicating validity of lagged values
1375///
1376/// ## Examples
1377/// ```rust,ignore
1378/// use minarrow::IntegerArray;
1379/// use simd_kernels::kernels::window::lag_int;
1380///
1381/// let arr = IntegerArray::<i32>::from_slice(&[10, 20, 30, 40]);
1382/// let result = lag_int((&arr, 0, arr.len()), 1);
1383/// ```
1384#[inline]
1385pub fn lag_int<T: Copy + Default>(window: IntegerAVT<T>, n: usize) -> IntegerArray<T> {
1386    let (arr, offset, len) = window;
1387    let data_window = &arr.data[offset..offset + len];
1388    let mask_window = if len != arr.data.len() {
1389        arr.null_mask.as_ref().map(|m| m.slice_clone(offset, len))
1390    } else {
1391        arr.null_mask.clone()
1392    };
1393    let (data, null_mask) = shift_with_bounds(
1394        data_window,
1395        mask_window.as_ref(),
1396        len,
1397        |i| if i >= n { Some(i - n) } else { None },
1398        T::default(),
1399    );
1400    IntegerArray {
1401        data: data.into(),
1402        null_mask: Some(null_mask),
1403    }
1404}
1405
1406/// Accesses values from future positions in integer arrays with configurable offset.
1407///
1408/// Implements SQL LEAD() window function semantics, retrieving values from later positions
1409/// in the array sequence. Essential for predictive analytics, forward-looking comparisons,
1410/// and temporal analysis requiring access to future data points.
1411///
1412/// ## Parameters
1413/// * `window` - Integer array view containing sequential data for lead access
1414/// * `n` - Lead offset specifying how many positions to look forward
1415///
1416/// ## Returns
1417/// Returns an `IntegerArray<T>` containing:
1418/// - Values from n positions later in the sequence
1419/// - Default values for positions where lead source is unavailable
1420/// - Null mask indicating validity of lead values
1421///
1422/// ## Examples
1423/// ```rust,ignore
1424/// use minarrow::IntegerArray;
1425/// use simd_kernels::kernels::window::lead_int;
1426///
1427/// let arr = IntegerArray::<i32>::from_slice(&[10, 20, 30, 40]);
1428/// let result = lead_int((&arr, 0, arr.len()), 2);
1429/// ```
1430#[inline]
1431pub fn lead_int<T: Copy + Default>(window: IntegerAVT<T>, n: usize) -> IntegerArray<T> {
1432    let (arr, offset, len) = window;
1433    let data_window = &arr.data[offset..offset + len];
1434    let mask_window = if len != arr.data.len() {
1435        arr.null_mask.as_ref().map(|m| m.slice_clone(offset, len))
1436    } else {
1437        arr.null_mask.clone()
1438    };
1439    let (data, null_mask) = shift_with_bounds(
1440        data_window,
1441        mask_window.as_ref(),
1442        len,
1443        |i| if i + n < len { Some(i + n) } else { None },
1444        T::default(),
1445    );
1446    IntegerArray {
1447        data: data.into(),
1448        null_mask: Some(null_mask),
1449    }
1450}
1451
1452/// Accesses values from previous positions in floating-point arrays with IEEE 754 compliance.
1453///
1454/// Implements SQL LAG() function for floating-point data, retrieving values from earlier
1455/// positions while maintaining IEEE 754 semantics for special values (NaN, infinity).
1456/// Critical for time series analysis and numerical sequence processing.
1457///
1458/// ## Parameters
1459/// * `window` - Float array view containing sequential floating-point data
1460/// * `n` - Lag offset specifying backward position distance
1461///
1462/// ## Returns
1463/// Returns a `FloatArray<T>` containing:
1464/// - Floating-point values from n positions earlier
1465/// - Zero values for positions with insufficient history
1466/// - Null mask reflecting lag validity and special value handling
1467///
1468/// ## Examples
1469/// ```rust,ignore
1470/// use minarrow::FloatArray;
1471/// use simd_kernels::kernels::window::lag_float;
1472///
1473/// let arr = FloatArray::<f64>::from_slice(&[1.0, 2.5, 3.14, 4.7]);
1474/// let result = lag_float((&arr, 0, arr.len()), 1);
1475/// ```
1476#[inline]
1477pub fn lag_float<T: Copy + num_traits::Zero>(window: FloatAVT<T>, n: usize) -> FloatArray<T> {
1478    let (arr, offset, len) = window;
1479    let data_window = &arr.data[offset..offset + len];
1480    let mask_window = if len != arr.data.len() {
1481        arr.null_mask.as_ref().map(|m| m.slice_clone(offset, len))
1482    } else {
1483        arr.null_mask.clone()
1484    };
1485    let (data, null_mask) = shift_with_bounds(
1486        data_window,
1487        mask_window.as_ref(),
1488        len,
1489        |i| if i >= n { Some(i - n) } else { None },
1490        T::zero(),
1491    );
1492    FloatArray {
1493        data: data.into(),
1494        null_mask: Some(null_mask),
1495    }
1496}
1497
1498/// Accesses values from future positions in floating-point arrays with IEEE 754 compliance.
1499///
1500/// Implements SQL LEAD() function for floating-point data, retrieving values from later
1501/// positions while preserving IEEE 754 semantics. Essential for forward-looking analysis
1502/// and predictive modeling with continuous numerical data.
1503///
1504/// ## Parameters
1505/// * `window` - Float array view containing sequential floating-point data
1506/// * `n` - Lead offset specifying forward position distance
1507///
1508/// ## Returns
1509/// Returns a `FloatArray<T>` containing:
1510/// - Floating-point values from n positions later
1511/// - Zero values for positions beyond available future
1512/// - Null mask indicating lead validity and special value propagation
1513///
1514/// ## Use Cases
1515/// - **Predictive analytics**: Accessing future values for comparison and modeling
1516/// - **Signal analysis**: Forward-looking operations in digital signal processing
1517/// - **Financial modeling**: Computing forward returns and future value analysis
1518/// - **Scientific computing**: Implementing forward difference schemes
1519///
1520/// ## Examples
1521/// ```rust,ignore
1522/// use minarrow::FloatArray;
1523/// use simd_kernels::kernels::window::lead_float;
1524///
1525/// let arr = FloatArray::<f32>::from_slice(&[1.1, 2.2, 3.3, 4.4]);
1526/// let result = lead_float((&arr, 0, arr.len()), 2);
1527/// // Output: [3.3, 4.4, 0.0, 0.0] - lead by 2 positions
1528/// ```
1529#[inline]
1530pub fn lead_float<T: Copy + num_traits::Zero>(window: FloatAVT<T>, n: usize) -> FloatArray<T> {
1531    let (arr, offset, len) = window;
1532    let data_window = &arr.data[offset..offset + len];
1533    let mask_window = if len != arr.data.len() {
1534        arr.null_mask.as_ref().map(|m| m.slice_clone(offset, len))
1535    } else {
1536        arr.null_mask.clone()
1537    };
1538    let (data, null_mask) = shift_with_bounds(
1539        data_window,
1540        mask_window.as_ref(),
1541        len,
1542        |i| if i + n < len { Some(i + n) } else { None },
1543        T::zero(),
1544    );
1545    FloatArray {
1546        data: data.into(),
1547        null_mask: Some(null_mask),
1548    }
1549}
1550
1551// String
1552
1553/// Accesses string values from previous positions with UTF-8 string handling.
1554///
1555/// Implements SQL LAG() function for string data, retrieving textual values from earlier
1556/// positions in the array sequence. Essential for textual analysis, sequential string
1557/// processing, and comparative text analytics.
1558///
1559/// ## Parameters
1560/// * `arr` - String array view containing sequential textual data
1561/// * `n` - Lag offset specifying backward position distance
1562///
1563/// ## Returns
1564/// Returns `Result<StringArray<T>, KernelError>` containing:
1565/// - **Success**: String values from n positions earlier
1566/// - **Error**: KernelError if string processing fails
1567/// - Empty strings for positions with insufficient history
1568/// - Null mask indicating lag validity and source availability
1569///
1570/// ## String Lag Semantics
1571/// - **UTF-8 preservation**: Maintains proper UTF-8 encoding throughout operation
1572/// - **Null propagation**: Null strings in source positions result in null outputs
1573/// - **Memory management**: Efficient string copying and allocation strategies
1574/// - **Boundary handling**: Positions without history receive empty string defaults
1575///
1576/// ## Applications
1577/// - **Text analysis**: Comparing current text with previous entries
1578/// - **Sequential processing**: Analysing patterns in ordered textual data
1579/// - **Log analysis**: Accessing previous log entries for context
1580/// - **Natural language processing**: Context-aware text processing with history
1581///
1582/// ## Examples
1583/// ```rust,ignore
1584/// use minarrow::StringArray;
1585/// use simd_kernels::kernels::window::lag_str;
1586///
1587/// let arr = StringArray::<u32>::from_slice(&["first", "second", "third"]);
1588/// let result = lag_str((&arr, 0, arr.len()), 1).unwrap();
1589/// // Output: ["", "first", "second"] - strings lagged by 1 position
1590/// ```
1591#[inline]
1592pub fn lag_str<T: Integer>(arr: StringAVT<T>, n: usize) -> Result<StringArray<T>, KernelError> {
1593    shift_str_with_bounds(arr, |i| if i >= n { Some(i - n) } else { None })
1594}
1595
1596/// Accesses string values from future positions with efficient UTF-8 processing.
1597///
1598/// Implements SQL LEAD() function for string data, retrieving textual values from later
1599/// positions in the array sequence. Critical for forward-looking text analysis and
1600/// sequential string pattern recognition.
1601///
1602/// ## Parameters
1603/// * `arr` - String array view containing sequential textual data
1604/// * `n` - Lead offset specifying forward position distance
1605///
1606/// ## Returns
1607/// Returns `Result<StringArray<T>, KernelError>` containing:
1608/// - **Success**: String values from n positions later
1609/// - **Error**: KernelError if string processing encounters issues
1610/// - Empty strings for positions beyond available future
1611/// - Null mask indicating lead validity and source availability
1612///
1613/// ## Examples
1614/// ```rust,ignore
1615/// use minarrow::StringArray;
1616/// use simd_kernels::kernels::window::lead_str;
1617///
1618/// let arr = StringArray::<u32>::from_slice(&["alpha", "beta", "gamma"]);
1619/// let result = lead_str((&arr, 0, arr.len()), 1).unwrap();
1620/// ```
1621#[inline]
1622pub fn lead_str<T: Integer>(arr: StringAVT<T>, n: usize) -> Result<StringArray<T>, KernelError> {
1623    let (_array, _offset, len) = arr;
1624    shift_str_with_bounds(arr, |i| if i + n < len { Some(i + n) } else { None })
1625}
1626
1627// Shift variants
1628/// Shifts integer array elements by specified offset with bidirectional support.
1629///
1630/// Provides unified interface for both LAG and LEAD operations through signed offset
1631/// parameter. Positive offsets implement LEAD semantics (forward shift), negative
1632/// offsets implement LAG semantics (backward shift), enabling flexible positional access.
1633///
1634/// ## Parameters
1635/// * `window` - Integer array view containing data for shifting
1636/// * `offset` - Signed offset: positive for LEAD (forward), negative for LAG (backward), zero for identity
1637///
1638/// ## Returns
1639/// Returns an `IntegerArray<T>` containing:
1640/// - Shifted integer values according to offset direction
1641/// - Default values for positions beyond available data
1642/// - Null mask reflecting validity of shifted positions
1643///
1644/// ## Shift Semantics
1645/// - **Positive offset**: LEAD operation (shift left, access future values)
1646/// - **Negative offset**: LAG operation (shift right, access past values)
1647/// - **Zero offset**: Identity operation (returns original array)
1648/// - **Boundary handling**: Out-of-bounds positions receive default values
1649///
1650/// ## Applications
1651/// - **Time series analysis**: Flexible temporal shifting for comparison operations
1652/// - **Sequence processing**: Bidirectional access in ordered integer sequences
1653/// - **Algorithm implementation**: Building blocks for complex windowing operations
1654/// - **Data transformation**: Positional transformations in numerical datasets
1655///
1656/// ## Examples
1657/// ```rust,ignore
1658/// use minarrow::IntegerArray;
1659/// use simd_kernels::kernels::window::shift_int;
1660///
1661/// let arr = IntegerArray::<i32>::from_slice(&[1, 2, 3, 4]);
1662/// let lag = shift_int((&arr, 0, arr.len()), -1);  // LAG by 1
1663/// let lead = shift_int((&arr, 0, arr.len()), 2);  // LEAD by 2
1664/// // lag:  [0, 1, 2, 3] - previous values
1665/// // lead: [3, 4, 0, 0] - future values
1666/// ```
1667#[inline(always)]
1668pub fn shift_int<T: Copy + Default>(window: IntegerAVT<T>, offset: isize) -> IntegerArray<T> {
1669    let (arr, win_offset, win_len) = window;
1670    if offset == 0 {
1671        return IntegerArray {
1672            data: Vec64::from_slice(&arr.data[win_offset..win_offset + win_len]).into(),
1673            null_mask: if win_len != arr.data.len() {
1674                arr.null_mask
1675                    .as_ref()
1676                    .map(|m| m.slice_clone(win_offset, win_len))
1677            } else {
1678                arr.null_mask.clone()
1679            },
1680        };
1681    } else if offset > 0 {
1682        lead_int((arr, win_offset, win_len), offset as usize)
1683    } else {
1684        lag_int((arr, win_offset, win_len), offset.unsigned_abs())
1685    }
1686}
1687
1688/// Shifts floating-point array elements with IEEE 754 compliance and bidirectional support.
1689///
1690/// Unified shifting interface for floating-point data supporting both LAG and LEAD semantics
1691/// through signed offset parameter. Maintains IEEE 754 standards for special value handling
1692/// while providing efficient bidirectional positional access.
1693///
1694/// ## Parameters
1695/// * `window` - Float array view containing data for shifting
1696/// * `offset` - Signed offset: positive for LEAD (forward), negative for LAG (backward), zero for identity
1697///
1698/// ## Returns
1699/// Returns a `FloatArray<T>` containing:
1700/// - Shifted floating-point values preserving IEEE 754 semantics
1701/// - Zero values for positions beyond data boundaries
1702/// - Null mask indicating validity of shifted positions
1703///
1704/// ## Examples
1705/// ```rust,ignore
1706/// use minarrow::FloatArray;
1707/// use simd_kernels::kernels::window::shift_float;
1708///
1709/// let arr = FloatArray::<f64>::from_slice(&[1.1, 2.2, 3.3, 4.4]);
1710/// let backward = shift_float((&arr, 0, arr.len()), -2); // LAG by 2
1711/// let forward = shift_float((&arr, 0, arr.len()), 1);   // LEAD by 1
1712/// ```
1713#[inline(always)]
1714pub fn shift_float<T: Copy + num_traits::Zero>(
1715    window: FloatAVT<T>,
1716    offset: isize,
1717) -> FloatArray<T> {
1718    let (arr, win_offset, win_len) = window;
1719    if offset == 0 {
1720        return FloatArray {
1721            data: Vec64::from_slice(&arr.data[win_offset..win_offset + win_len]).into(),
1722            null_mask: if win_len != arr.data.len() {
1723                arr.null_mask
1724                    .as_ref()
1725                    .map(|m| m.slice_clone(win_offset, win_len))
1726            } else {
1727                arr.null_mask.clone()
1728            },
1729        };
1730    } else if offset > 0 {
1731        lead_float((arr, win_offset, win_len), offset as usize)
1732    } else {
1733        lag_float((arr, win_offset, win_len), offset.unsigned_abs())
1734    }
1735}
1736
1737/// Shifts string array elements with UTF-8 integrity and bidirectional offset support.
1738///
1739/// String shifting function supporting both LAG and LEAD operations through
1740/// signed offset parameter. Maintains UTF-8 encoding integrity while providing flexible
1741/// positional access for textual sequence analysis.
1742///
1743/// ## Parameters
1744/// * `arr` - String array view containing textual data for shifting
1745/// * `shift_offset` - Signed offset: positive for LEAD (forward), negative for LAG (backward), zero for identity
1746///
1747/// ## Returns
1748/// Returns `Result<StringArray<T>, KernelError>` containing:
1749/// - **Success**: Shifted string values maintaining UTF-8 integrity
1750/// - **Error**: KernelError if string processing encounters issues
1751/// - Empty strings for positions beyond data boundaries
1752/// - Null mask reflecting validity of shifted string positions
1753///
1754/// ## Shift Semantics
1755/// - **Positive offset**: LEAD operation accessing future string values
1756/// - **Negative offset**: LAG operation accessing historical string values
1757/// - **Zero offset**: Identity operation (returns cloned array slice)
1758/// - **Boundary conditions**: Out-of-range positions produce empty strings
1759///
1760/// ## Examples
1761/// ```rust,ignore
1762/// use minarrow::StringArray;
1763/// use simd_kernels::kernels::window::shift_str;
1764///
1765/// let arr = StringArray::<u32>::from_slice(&["one", "two", "three"]);
1766/// let back = shift_str((&arr, 0, arr.len()), -1).unwrap(); // LAG
1767/// let forward = shift_str((&arr, 0, arr.len()), 1).unwrap(); // LEAD
1768/// // back:    ["", "one", "two"]
1769/// // forward: ["two", "three", ""]
1770/// ```
1771#[inline(always)]
1772pub fn shift_str<T: Integer>(
1773    arr: StringAVT<T>,
1774    shift_offset: isize,
1775) -> Result<StringArray<T>, KernelError> {
1776    if shift_offset == 0 {
1777        // Return this slice's window as a cloned StringArray
1778        let (array, off, len) = arr;
1779        Ok(array.slice_clone(off, len))
1780    } else if shift_offset > 0 {
1781        lead_str(arr, shift_offset as usize)
1782    } else {
1783        lag_str(arr, shift_offset.unsigned_abs())
1784    }
1785}
1786
1787#[cfg(test)]
1788mod tests {
1789    use minarrow::structs::variants::float::FloatArray;
1790    use minarrow::structs::variants::integer::IntegerArray;
1791    use minarrow::structs::variants::string::StringArray;
1792    use minarrow::{Bitmask, BooleanArray};
1793
1794    use super::*;
1795
1796    // ─────────────────────────── Helpers ───────────────────────────
1797
1798    /// Build a `Bitmask` from booleans.
1799    fn bm(bits: &[bool]) -> Bitmask {
1800        let mut m = Bitmask::new_set_all(bits.len(), false);
1801        for (i, &b) in bits.iter().enumerate() {
1802            m.set(i, b);
1803        }
1804        m
1805    }
1806
1807    /// Simple equality for `IntegerArray<T>`
1808    fn expect_int<T: PartialEq + std::fmt::Debug + Clone>(
1809        arr: &IntegerArray<T>,
1810        values: &[T],
1811        valid: &[bool],
1812    ) {
1813        assert_eq!(arr.data.as_slice(), values);
1814        let mask = arr.null_mask.as_ref().expect("mask missing");
1815        for (i, &v) in valid.iter().enumerate() {
1816            assert_eq!(mask.get(i), v, "mask bit {}", i);
1817        }
1818    }
1819
1820    /// Simple equality for `FloatArray<T>`
1821    fn expect_float<T: num_traits::Float + std::fmt::Debug>(
1822        arr: &FloatArray<T>,
1823        values: &[T],
1824        valid: &[bool],
1825        eps: T,
1826    ) {
1827        let data = arr.data.as_slice();
1828        assert_eq!(data.len(), values.len());
1829        for (a, b) in data.iter().zip(values.iter()) {
1830            assert!((*a - *b).abs() <= eps, "value mismatch {:?} vs {:?}", a, b);
1831        }
1832        let mask = arr.null_mask.as_ref().expect("mask missing");
1833        for (i, &v) in valid.iter().enumerate() {
1834            assert_eq!(mask.get(i), v);
1835        }
1836    }
1837
1838    // ───────────────────────── Rolling kernels ─────────────────────────
1839
1840    #[test]
1841    fn test_rolling_sum_int_basic() {
1842        let arr = IntegerArray::<i32>::from_slice(&[1, 2, 3, 4, 5]);
1843        let res = rolling_sum_int((&arr, 0, arr.len()), 3);
1844        expect_int(&res, &[0, 0, 6, 9, 12], &[false, false, true, true, true]);
1845    }
1846
1847    #[test]
1848    fn test_rolling_sum_int_masked() {
1849        let mut arr = IntegerArray::<i32>::from_slice(&[1, 2, 3, 4]);
1850        arr.null_mask = Some(bm(&[true, false, true, true]));
1851        let res = rolling_sum_int((&arr, 0, arr.len()), 2);
1852        expect_int(
1853            &res,
1854            &[0, 0, 3, 7],
1855            &[false, false, false, true], // window valid only when no nulls in window
1856        );
1857    }
1858
1859    #[test]
1860    fn test_rolling_sum_float() {
1861        let arr = FloatArray::<f32>::from_slice(&[1.0, 2.0, 3.0]);
1862        let res = rolling_sum_float((&arr, 0, arr.len()), 2);
1863        expect_float(&res, &[0.0, 0.0, 5.0], &[false, false, true], 1e-6f32);
1864    }
1865
1866    #[test]
1867    fn test_rolling_sum_bool() {
1868        let bools = BooleanArray::from_slice(&[true, true, false, true]);
1869        let res = rolling_sum_bool((&bools, 0, bools.len()), 2); // counts trues over window
1870        expect_int(&res, &[0, 0, 1, 1], &[false, false, true, true]);
1871    }
1872
1873    #[test]
1874    fn test_rolling_min_max_mean_count() {
1875        let arr = IntegerArray::<i32>::from_slice(&[3, 1, 4, 1, 5]);
1876        // min
1877        let rmin = rolling_min_int((&arr, 0, arr.len()), 2);
1878        expect_int(&rmin, &[0, 0, 1, 1, 1], &[false, false, true, true, true]);
1879
1880        // max
1881        let rmax = rolling_max_int((&arr, 0, arr.len()), 3);
1882        expect_int(&rmax, &[0, 0, 4, 4, 5], &[false, false, true, true, true]);
1883
1884        // mean
1885        let rmean = rolling_mean_int((&arr, 0, arr.len()), 2);
1886        expect_float(
1887            &rmean,
1888            &[0.0, 0.0, 2.5, 2.5, 3.0],
1889            &[false, false, true, true, true],
1890            1e-12,
1891        );
1892
1893        // count
1894        let cnt = rolling_count((0, 5), 3);
1895        expect_int(&cnt, &[0, 0, 3, 3, 3], &[false, false, true, true, true]);
1896    }
1897
1898    // ───────────────────────── Rank / Dense-rank ─────────────────────────
1899
1900    #[test]
1901    fn test_rank_int_basic() {
1902        let arr = IntegerArray::<i32>::from_slice(&[30, 10, 20]);
1903        let res = rank_int((&arr, 0, arr.len()));
1904        expect_int(&res, &[3, 1, 2], &[true, true, true]);
1905    }
1906
1907    #[test]
1908    fn test_rank_float_with_nulls() {
1909        let mut arr = FloatArray::<f64>::from_slice(&[2.0, 1.0, 3.0]);
1910        arr.null_mask = Some(bm(&[true, false, true]));
1911        let res = rank_float((&arr, 0, arr.len()));
1912        expect_int(&res, &[2, 0, 3], &[true, false, true]);
1913    }
1914
1915    #[test]
1916    fn test_dense_rank_str_duplicates() {
1917        let arr = StringArray::<u32>::from_slice(&["b", "a", "b", "c"]);
1918        let res = dense_rank_str((&arr, 0, arr.len())).unwrap();
1919        expect_int(&res, &[2, 1, 2, 3], &[true, true, true, true]);
1920    }
1921
1922    #[test]
1923    fn test_dense_rank_str_duplicates_chunk() {
1924        // Windowed over ["x", "b", "a", "b", "c", "y"]
1925        let arr = StringArray::<u32>::from_slice(&["x", "b", "a", "b", "c", "y"]);
1926        let res = dense_rank_str((&arr, 1, 4)).unwrap(); // window is "b", "a", "b", "c"
1927        expect_int(&res, &[2, 1, 2, 3], &[true, true, true, true]);
1928    }
1929
1930    // ───────────────────────── Lag / Lead / Shift ─────────────────────────
1931
1932    #[test]
1933    fn test_lag_lead_int() {
1934        let arr = IntegerArray::<i32>::from_slice(&[10, 20, 30, 40]);
1935        let lag1 = lag_int((&arr, 0, arr.len()), 1);
1936        expect_int(&lag1, &[0, 10, 20, 30], &[false, true, true, true]);
1937
1938        let lead2 = lead_int((&arr, 0, arr.len()), 2);
1939        expect_int(&lead2, &[30, 40, 0, 0], &[true, true, false, false]);
1940    }
1941
1942    #[test]
1943    fn test_shift_float_positive_negative_zero() {
1944        let arr = FloatArray::<f32>::from_slice(&[1.0, 2.0, 3.0]);
1945        let s0 = shift_float((&arr, 0, arr.len()), 0);
1946        assert_eq!(s0.data, arr.data); // exact copy
1947
1948        let s1 = shift_float((&arr, 0, arr.len()), 1);
1949        expect_float(&s1, &[2.0, 3.0, 0.0], &[true, true, false], 1e-6f32);
1950
1951        let s_neg = shift_float((&arr, 0, arr.len()), -1);
1952        expect_float(&s_neg, &[0.0, 1.0, 2.0], &[false, true, true], 1e-6f32);
1953    }
1954
1955    #[test]
1956    fn test_lag_lead_str() {
1957        let arr = StringArray::<u32>::from_slice(&["a", "b", "c"]);
1958        let l1 = lag_str((&arr, 0, arr.len()), 1).unwrap();
1959        assert_eq!(l1.get(0), None);
1960        assert_eq!(l1.get(1), Some("a"));
1961        assert_eq!(l1.get(2), Some("b"));
1962
1963        let d1 = lead_str((&arr, 0, arr.len()), 1).unwrap();
1964        assert_eq!(d1.get(0), Some("b"));
1965        assert_eq!(d1.get(1), Some("c"));
1966        assert_eq!(d1.get(2), None);
1967    }
1968
1969    #[test]
1970    fn test_lag_lead_str_chunk() {
1971        // Window is ["x", "a", "b", "c", "y"], test on chunk "a", "b", "c"
1972        let arr = StringArray::<u32>::from_slice(&["x", "a", "b", "c", "y"]);
1973        let l1 = lag_str((&arr, 1, 3), 1).unwrap();
1974        assert_eq!(l1.get(0), None);
1975        assert_eq!(l1.get(1), Some("a"));
1976        assert_eq!(l1.get(2), Some("b"));
1977
1978        let d1 = lead_str((&arr, 1, 3), 1).unwrap();
1979        assert_eq!(d1.get(0), Some("b"));
1980        assert_eq!(d1.get(1), Some("c"));
1981        assert_eq!(d1.get(2), None);
1982    }
1983
1984    #[test]
1985    fn test_rolling_sum_int_edge_windows() {
1986        let arr = IntegerArray::<i32>::from_slice(&[1, 2, 3, 4, 5]);
1987
1988        // window = 0 → all zeros + mask all false
1989        let r0 = rolling_sum_int((&arr, 0, arr.len()), 0);
1990        assert_eq!(r0.data.as_slice(), &[0, 0, 0, 0, 0]);
1991        assert_eq!(r0.null_mask.as_ref().unwrap().all_unset(), true);
1992
1993        // window = 1 → identity
1994        let r1 = rolling_sum_int((&arr, 0, arr.len()), 1);
1995        assert_eq!(r1.data.as_slice(), &[1, 2, 3, 4, 5]);
1996        assert!(r1.null_mask.as_ref().unwrap().all_set());
1997
1998        // window > len → all zero + all false
1999        let r_large = rolling_sum_int((&arr, 0, arr.len()), 10);
2000        assert_eq!(r_large.data.as_slice(), &[0, 0, 0, 0, 0]);
2001        assert_eq!(r_large.null_mask.as_ref().unwrap().all_unset(), true);
2002    }
2003
2004    #[test]
2005    fn test_rolling_sum_float_masked_nulls_propagate() {
2006        let mut arr = FloatArray::<f32>::from_slice(&[1.0, 2.0, 3.0, 4.0]);
2007        // null in the middle
2008        arr.null_mask = Some(bm(&[true, true, false, true]));
2009        let r = rolling_sum_float((&arr, 0, arr.len()), 2);
2010        //   i=0: <full-window, 0.0, false>
2011        //   i=1: first full-window → cleared → 0.0, false
2012        //   i=2: window contains null → mask=false, but value = 2.0
2013        //   i=3: window contains null → mask=false, but value = 4.0
2014        expect_float(
2015            &r,
2016            &[0.0, 0.0, 2.0, 4.0],
2017            &[false, false, false, false],
2018            1e-6,
2019        );
2020    }
2021
2022    #[test]
2023    fn test_rolling_sum_bool_with_nulls() {
2024        let mut b = BooleanArray::from_slice(&[true, false, true, true]);
2025        b.null_mask = Some(bm(&[true, false, true, true]));
2026        let r = rolling_sum_bool((&b, 0, b.len()), 2);
2027        // windows [t,f], [f,t], [t,t] → only last window is all non-null
2028        expect_int(&r, &[0, 0, 1, 2], &[false, false, false, true]);
2029    }
2030
2031    #[test]
2032    fn test_lag_str_null_propagation() {
2033        let mut arr = StringArray::<u32>::from_slice(&["x", "y", "z"]);
2034        arr.null_mask = Some(bm(&[true, false, true])); // y is null
2035        let lag1 = lag_str((&arr, 0, arr.len()), 1).unwrap();
2036        assert_eq!(lag1.get(0), None); // no source → null
2037        assert_eq!(lag1.get(1), Some("x"));
2038        assert_eq!(lag1.get(2), None); // source was null
2039        let m = lag1.null_mask.unwrap();
2040        assert_eq!(m.get(0), false);
2041        assert_eq!(m.get(1), true);
2042        assert_eq!(m.get(2), false);
2043    }
2044
2045    #[test]
2046    fn test_lag_str_null_propagation_chunk() {
2047        // Window ["w", "x", "y", "z", "q"], test on chunk "x", "y", "z"
2048        let mut arr = StringArray::<u32>::from_slice(&["w", "x", "y", "z", "q"]);
2049        arr.null_mask = Some(bm(&[true, true, false, true, true]));
2050        let lag1 = lag_str((&arr, 1, 3), 1).unwrap();
2051        assert_eq!(lag1.get(0), None); // "x", index 0 in chunk, no source
2052        assert_eq!(lag1.get(1), Some("x")); // "y", index 1 pulls "x" (valid)
2053        assert_eq!(lag1.get(2), None); // "z", index 2 pulls "y" (invalid)
2054        let m = lag1.null_mask.unwrap();
2055        assert_eq!(m.get(0), false);
2056        assert_eq!(m.get(1), true);
2057        assert_eq!(m.get(2), false);
2058    }
2059
2060    #[test]
2061    fn test_shift_str_large_offset() {
2062        let arr = StringArray::<u32>::from_slice(&["a", "b", "c"]);
2063        let shifted = shift_str((&arr, 0, arr.len()), 10).unwrap(); // > len
2064        assert_eq!(shifted.len(), 3);
2065        for i in 0..3 {
2066            assert_eq!(shifted.get(i), None);
2067            assert_eq!(shifted.null_mask.as_ref().unwrap().get(i), false);
2068        }
2069    }
2070
2071    #[test]
2072    fn test_shift_str_large_offset_chunk() {
2073        // Window ["w", "a", "b", "c", "x"]
2074        let arr = StringArray::<u32>::from_slice(&["w", "a", "b", "c", "x"]);
2075        let shifted = shift_str((&arr, 1, 3), 10).unwrap(); // window is "a","b","c"
2076        assert_eq!(shifted.len(), 3);
2077        for i in 0..3 {
2078            assert_eq!(shifted.get(i), None);
2079            assert_eq!(shifted.null_mask.as_ref().unwrap().get(i), false);
2080        }
2081    }
2082
2083    #[test]
2084    fn test_rank_str_ties_and_nulls() {
2085        let mut arr = StringArray::<u32>::from_slice(&["a", "b", "a", "c"]);
2086        arr.null_mask = Some(bm(&[true, true, false, true]));
2087        let r = rank_str((&arr, 0, arr.len())).unwrap();
2088        // valid positions: 0="a"(rank1),1="b"(rank3),2=null,3="c"(rank4)
2089        expect_int(&r, &[1, 2, 0, 3], &[true, true, false, true]);
2090    }
2091
2092    #[test]
2093    fn test_rank_str_ties_and_nulls_chunk() {
2094        // Window ["w", "a", "b", "a", "c"]
2095        let mut arr = StringArray::<u32>::from_slice(&["w", "a", "b", "a", "c"]);
2096        arr.null_mask = Some(bm(&[true, true, true, false, true]));
2097        let r = rank_str((&arr, 1, 4)).unwrap(); // "a","b","a","c"
2098        expect_int(&r, &[1, 2, 0, 3], &[true, true, false, true]);
2099    }
2100}